# Redirect librosa/joblib/numba caches to /tmp so the extractor can run in
# read-only or containerized environments.
import os
os.environ['LIBROSA_CACHE_DIR'] = '/tmp'
os.environ['JOBLIB_TEMP_FOLDER'] = '/tmp'
os.environ['NUMBA_CACHE_DIR'] = '/tmp'

import cv2
import librosa
import subprocess
import numpy as np
from typing import List, Dict, Tuple
from extraction.timeline_generator import TimelineGenerator

class MediaExtractor:
    """Extracts per-interval video frames and audio chunks from a video file."""

    def __init__(self, frames_per_interval: int = 5):
        # Number of frames sampled from each timeline interval.
        self.frames_per_interval = frames_per_interval
    
    def get_video_info(self, video_path: str) -> Dict:
        """Return basic metadata: fps, total frame count, and duration in seconds."""
        cap = cv2.VideoCapture(video_path)
        
        if not cap.isOpened():
            raise ValueError(f"Cannot open video: {video_path}")
        
        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        duration = total_frames / fps if fps > 0 else 0
        
        cap.release()
        
        return {
            'fps': fps,
            'total_frames': total_frames,
            'duration': duration
        }
    
    def extract_frames(self, video_path: str, timeline: List[Dict]) -> List[Dict]:
        """Sample frames_per_interval frames from each timeline interval."""
        cap = cv2.VideoCapture(video_path)
        
        if not cap.isOpened():
            raise ValueError(f"Cannot open video: {video_path}")
        
        for interval in timeline:
            # Evenly spaced sample times within the interval (end point excluded).
            sample_times = np.linspace(
                interval['start'],
                interval['end'],
                self.frames_per_interval,
                endpoint=False
            )

            for sample_time in sample_times:
                # Seek by timestamp (milliseconds) and grab the nearest frame.
                cap.set(cv2.CAP_PROP_POS_MSEC, sample_time * 1000)
                ret, frame = cap.read()

                if ret:
                    interval['video_data'].append({
                        'frame': frame,
                        'timestamp': round(sample_time, 2)
                    })
        
        cap.release()
        
        return timeline
    
    def extract_audio(self, video_path: str, timeline: List[Dict]) -> List[Dict]:
        """Extract the audio track with ffmpeg and attach a chunk to each interval."""
        # Decode the audio track to 16 kHz mono PCM in a temporary WAV file.
        temp_audio = "/tmp/temp_audio.wav"
        command = [
            'ffmpeg', '-i', video_path,
            '-vn', '-acodec', 'pcm_s16le',
            '-ar', '16000', '-ac', '1',
            '-y', temp_audio
        ]
        
        try:
            subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True, text=True)
            has_audio = os.path.exists(temp_audio) and os.path.getsize(temp_audio) > 0
        except subprocess.CalledProcessError:
            # ffmpeg fails when the input has no audio stream (or cannot be decoded).
            has_audio = False
        
        if not has_audio:
            print("Warning: No audio track detected in video")
            for interval in timeline:
                interval['audio_data'] = {
                    'audio': np.zeros(16000 * 2),  # 2 seconds of silence at 16 kHz
                    'sample_rate': 16000,
                    'has_audio': False
                }
            return timeline
        
        audio, sr = librosa.load(temp_audio, sr=16000, mono=True)
        
        for interval in timeline:
            # Slice the interval's samples, clamped to the end of the audio track.
            start_sample = int(interval['start'] * sr)
            end_sample = int(interval['end'] * sr)
            end_sample = min(end_sample, len(audio))
            audio_chunk = audio[start_sample:end_sample]

            # Pad chunks shorter than 0.5 s with trailing silence.
            if len(audio_chunk) < sr * 0.5:
                audio_chunk = np.pad(audio_chunk, (0, int(sr * 0.5) - len(audio_chunk)))
            
            interval['audio_data'] = {
                'audio': audio_chunk,
                'sample_rate': sr,
                'has_audio': True
            }
        
        # Remove the temporary WAV file.
        if os.path.exists(temp_audio):
            os.remove(temp_audio)

        return timeline

    def extract_all(self, video_path: str, interval_duration: float = 2.0) -> Tuple[List[Dict], Dict]:
        """Build a timeline for the video and populate it with frames and audio."""
        video_info = self.get_video_info(video_path)
        
        timeline_gen = TimelineGenerator(interval_duration)
        timeline = timeline_gen.create_timeline(video_info['duration'])
        
        timeline = self.extract_frames(video_path, timeline)
        timeline = self.extract_audio(video_path, timeline)
        
        return timeline, video_info
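

# Minimal usage sketch (an illustration, not part of the module's API). It assumes
# that TimelineGenerator.create_timeline(duration) returns a list of dicts with
# 'start', 'end', and a 'video_data' list, as implied by the code above, and that
# a local file "sample.mp4" exists; both names are placeholders.
if __name__ == "__main__":
    extractor = MediaExtractor(frames_per_interval=5)
    timeline, info = extractor.extract_all("sample.mp4", interval_duration=2.0)

    print(f"Video: {info['duration']:.1f}s at {info['fps']:.1f} fps")
    for interval in timeline:
        n_frames = len(interval.get('video_data', []))
        has_audio = interval.get('audio_data', {}).get('has_audio', False)
        print(f"[{interval['start']:.1f}-{interval['end']:.1f}s] "
              f"{n_frames} frames, audio={has_audio}")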