import os

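# Point librosa/joblib/numba caches at /tmp before importing librosa so the
# settings take effect; a sensible default when the app runs in a sandbox with
# a read-only home directory (e.g. a hosted Space).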
os.environ['LIBROSA_CACHE_DIR'] = '/tmp'
os.environ['JOBLIB_TEMP_FOLDER'] = '/tmp'
os.environ['NUMBA_CACHE_DIR'] = '/tmp'

import cv2
import librosa
import subprocess
import numpy as np
from typing import List, Dict, Tuple

from extraction.timeline_generator import TimelineGenerator


class MediaExtractor:
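    """Samples per-interval video frames and audio chunks from a video file."""
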
    def __init__(self, frames_per_interval: int = 5):
        self.frames_per_interval = frames_per_interval

    def get_video_info(self, video_path: str) -> Dict:
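        """Return fps, total frame count, and duration (seconds) via OpenCV."""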
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise ValueError(f"Cannot open video: {video_path}")
        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        duration = total_frames / fps if fps > 0 else 0
        cap.release()
        return {
            'fps': fps,
            'total_frames': total_frames,
            'duration': duration
        }

    def extract_frames(self, video_path: str, timeline: List[Dict]) -> List[Dict]:
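        """Sample evenly spaced frames from each interval into its 'video_data' list."""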
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise ValueError(f"Cannot open video: {video_path}")
        for interval in timeline:
            sample_times = np.linspace(
                interval['start'],
                interval['end'],
                self.frames_per_interval,
                endpoint=False
            )
            for sample_time in sample_times:
                cap.set(cv2.CAP_PROP_POS_MSEC, sample_time * 1000)
                ret, frame = cap.read()
                if ret:
                    interval['video_data'].append({
                        'frame': frame,
                        'timestamp': round(sample_time, 2)
                    })
        cap.release()
        return timeline

    def extract_audio(self, video_path: str, timeline: List[Dict]) -> List[Dict]:
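        """Decode the audio track to 16 kHz mono WAV and slice it per interval."""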
        temp_audio = "/tmp/temp_audio.wav"
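        # ffmpeg: drop video (-vn), decode audio to 16-bit PCM (pcm_s16le) at
        # 16 kHz mono, overwriting any previous temp file (-y). The call exits
        # non-zero when the input has no audio stream, which the except branch
        # below treats as "no audio".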
        command = [
            'ffmpeg', '-i', video_path,
            '-vn', '-acodec', 'pcm_s16le',
            '-ar', '16000', '-ac', '1',
            '-y', temp_audio
        ]
        try:
            subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                           check=True, text=True)
            has_audio = os.path.exists(temp_audio) and os.path.getsize(temp_audio) > 0
        except subprocess.CalledProcessError:
            has_audio = False
        if not has_audio:
            print("Warning: No audio track detected in video")
            for interval in timeline:
                interval['audio_data'] = {
                    'audio': np.zeros(16000 * 2),
                    'sample_rate': 16000,
                    'has_audio': False
                }
            return timeline
        audio, sr = librosa.load(temp_audio, sr=16000, mono=True)
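        # Slice the decoded waveform per interval; chunks shorter than 0.5 s
        # are zero-padded so every interval yields a minimum-length signal.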
        for interval in timeline:
            start_sample = int(interval['start'] * sr)
            end_sample = int(interval['end'] * sr)
            end_sample = min(end_sample, len(audio))
            audio_chunk = audio[start_sample:end_sample]
            if len(audio_chunk) < sr * 0.5:
                audio_chunk = np.pad(audio_chunk, (0, int(sr * 0.5) - len(audio_chunk)))
            interval['audio_data'] = {
                'audio': audio_chunk,
                'sample_rate': sr,
                'has_audio': True
            }
        if os.path.exists(temp_audio):
            os.remove(temp_audio)
        return timeline

    def extract_all(self, video_path: str, interval_duration: float = 2.0) -> Tuple[List[Dict], Dict]:
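        """Build the interval timeline, then attach frames and audio to it."""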
        video_info = self.get_video_info(video_path)
        timeline_gen = TimelineGenerator(interval_duration)
        timeline = timeline_gen.create_timeline(video_info['duration'])
        timeline = self.extract_frames(video_path, timeline)
        timeline = self.extract_audio(video_path, timeline)
        return timeline, video_info
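

# Usage sketch (hypothetical file name; assumes TimelineGenerator.create_timeline()
# returns interval dicts with 'start', 'end', and a 'video_data' list, as the
# methods above expect):
if __name__ == "__main__":
    extractor = MediaExtractor(frames_per_interval=5)
    timeline, info = extractor.extract_all("sample.mp4", interval_duration=2.0)
    print(f"{info['duration']:.1f}s video at {info['fps']:.1f} fps -> "
          f"{len(timeline)} intervals")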