"""Media extraction utilities: sample per-interval video frames and audio chunks."""
import os
os.environ['LIBROSA_CACHE_DIR'] = '/tmp'
os.environ['JOBLIB_TEMP_FOLDER'] = '/tmp'
os.environ['NUMBA_CACHE_DIR'] = '/tmp'
import cv2
import librosa
import subprocess
import numpy as np
from typing import List, Dict, Tuple
from extraction.timeline_generator import TimelineGenerator
class MediaExtractor:
    """Extracts per-interval video frames and audio chunks from a video file.

    Works on a timeline of interval dicts (produced by TimelineGenerator),
    mutating each interval in place with 'video_data' frames and an
    'audio_data' chunk.
    """

    def __init__(self, frames_per_interval: int = 5):
        """
        Args:
            frames_per_interval: number of frames sampled per timeline interval.
        """
        self.frames_per_interval = frames_per_interval

    def get_video_info(self, video_path: str) -> Dict:
        """Return {'fps', 'total_frames', 'duration'} for the video.

        Raises:
            ValueError: if OpenCV cannot open the file.
        """
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise ValueError(f"Cannot open video: {video_path}")
        try:
            fps = cap.get(cv2.CAP_PROP_FPS)
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            # Some containers report fps == 0; avoid a ZeroDivisionError.
            duration = total_frames / fps if fps > 0 else 0
        finally:
            # Release even if a property read raises.
            cap.release()
        return {
            'fps': fps,
            'total_frames': total_frames,
            'duration': duration
        }

    def extract_frames(self, video_path: str, timeline: List[Dict]) -> List[Dict]:
        """Sample ``frames_per_interval`` frames from each timeline interval.

        Appends {'frame': ndarray, 'timestamp': float} entries to each
        interval's pre-existing 'video_data' list. Returns the (mutated)
        timeline.

        Raises:
            ValueError: if OpenCV cannot open the file.
        """
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise ValueError(f"Cannot open video: {video_path}")
        try:
            for interval in timeline:
                # endpoint=False samples within [start, end), so the last
                # sample of one interval never duplicates the next interval's first.
                sample_times = np.linspace(
                    interval['start'],
                    interval['end'],
                    self.frames_per_interval,
                    endpoint=False
                )
                for sample_time in sample_times:
                    cap.set(cv2.CAP_PROP_POS_MSEC, sample_time * 1000)
                    ret, frame = cap.read()
                    # Seeks past EOF (or decode failures) return ret=False;
                    # those samples are silently skipped.
                    if ret:
                        interval['video_data'].append({
                            'frame': frame,
                            'timestamp': round(float(sample_time), 2)
                        })
        finally:
            cap.release()
        return timeline

    @staticmethod
    def _slice_audio_chunk(audio: np.ndarray, sr: int,
                           start: float, end: float) -> np.ndarray:
        """Return the [start, end) slice of audio, zero-padded to >= 0.5 s.

        The 0.5 s floor keeps downstream audio models from receiving
        near-empty chunks at the tail of the video.
        """
        start_sample = max(0, int(start * sr))
        end_sample = min(int(end * sr), len(audio))
        chunk = audio[start_sample:end_sample]
        min_len = int(sr * 0.5)
        if len(chunk) < min_len:
            chunk = np.pad(chunk, (0, min_len - len(chunk)))
        return chunk

    def extract_audio(self, video_path: str, timeline: List[Dict]) -> List[Dict]:
        """Attach a mono 16 kHz audio chunk to each timeline interval.

        Uses ffmpeg to demux the audio track to a temp WAV. Videos without
        audio get 2 s of silence per interval and 'has_audio': False so
        callers can skip audio processing. Returns the (mutated) timeline.
        """
        import tempfile

        # Unique temp path: a fixed name like /tmp/temp_audio.wav races
        # when several extractions run concurrently.
        fd, temp_audio = tempfile.mkstemp(suffix='.wav', dir='/tmp')
        os.close(fd)
        command = [
            'ffmpeg', '-i', video_path,
            '-vn', '-acodec', 'pcm_s16le',
            '-ar', '16000', '-ac', '1',
            '-y', temp_audio
        ]
        try:
            try:
                subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True, text=True)
                # mkstemp pre-creates the file, so only the size tells us
                # whether ffmpeg actually wrote audio.
                has_audio = os.path.getsize(temp_audio) > 0
            except subprocess.CalledProcessError:
                # ffmpeg exits non-zero when there is no audio stream.
                print("No audio detected in video")
                has_audio = False

            if not has_audio:
                print("Warning: No audio track detected in video")
                for interval in timeline:
                    interval['audio_data'] = {
                        'audio': np.zeros(16000 * 2),  # 2 s of silence at 16 kHz
                        'sample_rate': 16000,
                        'has_audio': False
                    }
                return timeline

            audio, sr = librosa.load(temp_audio, sr=16000, mono=True)
            for interval in timeline:
                interval['audio_data'] = {
                    'audio': self._slice_audio_chunk(
                        audio, sr, interval['start'], interval['end']),
                    'sample_rate': sr,
                    'has_audio': True
                }
            return timeline
        finally:
            # Clean up even on the no-audio early return or a librosa failure
            # (the original leaked the temp file on both paths).
            if os.path.exists(temp_audio):
                os.remove(temp_audio)

    def extract_all(self, video_path: str, interval_duration: float = 2.0) -> Tuple[List[Dict], Dict]:
        """Build a timeline for the video and populate frames and audio.

        Returns:
            (timeline, video_info) where video_info is get_video_info()'s dict.
        """
        video_info = self.get_video_info(video_path)
        timeline_gen = TimelineGenerator(interval_duration)
        timeline = timeline_gen.create_timeline(video_info['duration'])
        timeline = self.extract_frames(video_path, timeline)
        timeline = self.extract_audio(video_path, timeline)
        return timeline, video_info