# deepdefend-api/extraction/media_extractor.py
# (upstream commit 5b7ca6d by nishchandel: "fixed cache error")
import os

# NOTE: these must be set BEFORE librosa/numba/joblib are imported, otherwise
# their cache directories are already resolved and the settings have no effect.
os.environ['LIBROSA_CACHE_DIR'] = '/tmp'
os.environ['JOBLIB_TEMP_FOLDER'] = '/tmp'
os.environ['NUMBA_CACHE_DIR'] = '/tmp'

import subprocess
import tempfile
from typing import List, Dict, Tuple

import cv2
import librosa
import numpy as np

from extraction.timeline_generator import TimelineGenerator
class MediaExtractor:
    """Extracts per-interval video frames and audio chunks from a video file.

    The workflow (see ``extract_all``) is: probe the video, build a timeline of
    fixed-length intervals via ``TimelineGenerator``, then attach sampled frames
    (``video_data``) and a mono 16 kHz audio chunk (``audio_data``) to each
    timeline interval dict.
    """

    def __init__(self, frames_per_interval: int = 5):
        # Number of frames sampled (evenly spaced) from each timeline interval.
        self.frames_per_interval = frames_per_interval

    def get_video_info(self, video_path: str) -> Dict:
        """Return ``{'fps', 'total_frames', 'duration'}`` for the video.

        Raises:
            ValueError: if OpenCV cannot open ``video_path``.
        """
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise ValueError(f"Cannot open video: {video_path}")
        try:
            fps = cap.get(cv2.CAP_PROP_FPS)
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            # Guard against a zero/invalid FPS report from the container metadata.
            duration = total_frames / fps if fps > 0 else 0
        finally:
            # Release even if a property read throws, so the capture handle
            # (and its file descriptor) is never leaked.
            cap.release()
        return {
            'fps': fps,
            'total_frames': total_frames,
            'duration': duration
        }

    def extract_frames(self, video_path: str, timeline: List[Dict]) -> List[Dict]:
        """Sample ``frames_per_interval`` frames into each interval's ``video_data``.

        Frames are taken at evenly spaced timestamps within
        ``[interval['start'], interval['end'])``; each entry is
        ``{'frame': <BGR ndarray>, 'timestamp': <seconds, 2dp>}``.
        Failed reads (e.g. a seek past EOF) are silently skipped.

        Raises:
            ValueError: if OpenCV cannot open ``video_path``.
        """
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise ValueError(f"Cannot open video: {video_path}")
        try:
            for interval in timeline:
                # endpoint=False keeps the last sample inside the interval so
                # it is not duplicated by the next interval's first sample.
                sample_times = np.linspace(
                    interval['start'],
                    interval['end'],
                    self.frames_per_interval,
                    endpoint=False
                )
                for sample_time in sample_times:
                    cap.set(cv2.CAP_PROP_POS_MSEC, sample_time * 1000)
                    ret, frame = cap.read()
                    if ret:
                        interval['video_data'].append({
                            'frame': frame,
                            'timestamp': round(sample_time, 2)
                        })
        finally:
            cap.release()
        return timeline

    def extract_audio(self, video_path: str, timeline: List[Dict]) -> List[Dict]:
        """Attach a mono 16 kHz audio chunk to each interval's ``audio_data``.

        Uses ffmpeg to demux/resample the track to a temp WAV, then slices it
        per interval with librosa. If the video has no audio track (or ffmpeg
        fails), every interval gets 2 s of silence with ``has_audio=False``.
        Chunks shorter than 0.5 s are zero-padded up to 0.5 s.
        """
        # Unique temp path per call: the old fixed "/tmp/temp_audio.wav" made
        # concurrent API requests overwrite each other's audio.
        fd, temp_audio = tempfile.mkstemp(suffix=".wav", dir="/tmp")
        os.close(fd)
        try:
            command = [
                'ffmpeg', '-i', video_path,
                '-vn', '-acodec', 'pcm_s16le',
                '-ar', '16000', '-ac', '1',
                '-y', temp_audio
            ]
            try:
                subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True, text=True)
                has_audio = os.path.exists(temp_audio) and os.path.getsize(temp_audio) > 0
            except subprocess.CalledProcessError:
                # ffmpeg exits non-zero when there is no audio stream; treat it
                # as "silent video" rather than a hard failure.
                print("No audio detected in video")
                has_audio = False

            if not has_audio:
                print("Warning: No audio track detected in video")
                for interval in timeline:
                    interval['audio_data'] = {
                        'audio': np.zeros(16000 * 2),  # 2 s of silence
                        'sample_rate': 16000,
                        'has_audio': False
                    }
                return timeline

            audio, sr = librosa.load(temp_audio, sr=16000, mono=True)
            for interval in timeline:
                start_sample = int(interval['start'] * sr)
                end_sample = int(interval['end'] * sr)
                end_sample = min(end_sample, len(audio))
                audio_chunk = audio[start_sample:end_sample]
                # Downstream models presumably need a minimum window; pad short
                # tail chunks with trailing zeros up to 0.5 s.
                if len(audio_chunk) < sr * 0.5:
                    audio_chunk = np.pad(audio_chunk, (0, int(sr * 0.5) - len(audio_chunk)))
                interval['audio_data'] = {
                    'audio': audio_chunk,
                    'sample_rate': sr,
                    'has_audio': True
                }
            return timeline
        finally:
            # Previously the temp WAV leaked on the no-audio early return and
            # on any exception; always clean it up.
            if os.path.exists(temp_audio):
                os.remove(temp_audio)

    def extract_all(self, video_path: str, interval_duration: float = 2.0) -> Tuple[List[Dict], Dict]:
        """Probe the video, build the timeline, and fill in frames + audio.

        Returns:
            (timeline, video_info) — the populated interval list and the
            ``get_video_info`` dict.
        """
        video_info = self.get_video_info(video_path)
        timeline_gen = TimelineGenerator(interval_duration)
        timeline = timeline_gen.create_timeline(video_info['duration'])
        timeline = self.extract_frames(video_path, timeline)
        timeline = self.extract_audio(video_path, timeline)
        return timeline, video_info