import os

os.environ['LIBROSA_CACHE_DIR'] = '/tmp'
os.environ['JOBLIB_TEMP_FOLDER'] = '/tmp'
os.environ['NUMBA_CACHE_DIR'] = '/tmp'

import torch
import librosa
import numpy as np
from typing import Dict, List
from models.load_models import model_loader


class AudioAnalyzer:
    """Analyzes audio chunks for deepfake detection"""

    def __init__(self):
        self.model, self.processor = model_loader.load_audio_model()
        self.device = model_loader.get_device()

    def predict_deepfake(self, audio: np.ndarray, sample_rate: int) -> Dict:
        """Predict if an audio chunk is a deepfake"""
        # Pad short chunks to at least one second so the model sees a minimum context.
        min_length = sample_rate
        if len(audio) < min_length:
            audio = np.pad(audio, (0, min_length - len(audio)))

        inputs = self.processor(
            audio,
            sampling_rate=sample_rate,
            return_tensors="pt",
            padding=True
        )

        if self.device == "cuda":
            inputs = {k: v.to(self.device) for k, v in inputs.items()}

        with torch.no_grad():
            outputs = self.model(**inputs)
            logits = outputs.logits
            probs = torch.nn.functional.softmax(logits, dim=-1)

        # Assumes index 1 is the 'fake' class for two-class heads; falls back
        # to the single output when the model exposes only one logit.
        fake_prob = probs[0][1].item() if probs.shape[1] > 1 else probs[0][0].item()
        confidence = probs[0].max().item()

        return {
            'fake_score': round(fake_prob, 3),
            'confidence': round(confidence, 3),
            'label': 'fake' if fake_prob > 0.5 else 'real'
        }

    def analyze_spectrogram(self, audio: np.ndarray, sample_rate: int,
                            fake_score: float) -> Dict:
        """Analyze audio with adaptive thresholds based on fake_score"""
        try:
            spectral_centroid = librosa.feature.spectral_centroid(y=audio, sr=sample_rate)[0]
            spectral_rolloff = librosa.feature.spectral_rolloff(y=audio, sr=sample_rate)[0]
            zero_crossing_rate = librosa.feature.zero_crossing_rate(audio)[0]
            mfcc = librosa.feature.mfcc(y=audio, sr=sample_rate, n_mfcc=13)

            suspicious_regions = self._identify_audio_anomalies(
                spectral_centroid, spectral_rolloff, zero_crossing_rate, mfcc, fake_score
            )

            return {
                'regions': suspicious_regions,
                'spectral_features': {
                    'avg_spectral_centroid': round(float(np.mean(spectral_centroid)), 2),
                    'avg_spectral_rolloff': round(float(np.mean(spectral_rolloff)), 2),
                    'avg_zero_crossing_rate': round(float(np.mean(zero_crossing_rate)), 3),
                    'mfcc_variance': round(float(np.var(mfcc)), 3)
                }
            }
        except Exception:
            # Feature extraction failed; fall back to a coarse verdict driven
            # by the model's fake_score alone.
            if fake_score > 0.6:
                return {
                    'regions': ['voice_synthesis_detected', 'audio_artifacts'],
                    'spectral_features': {}
                }
            return {
                'regions': ['no_suspicious_patterns'],
                'spectral_features': {}
            }

    def _identify_audio_anomalies(self, spectral_centroid: np.ndarray,
                                  spectral_rolloff: np.ndarray,
                                  zero_crossing: np.ndarray,
                                  mfcc: np.ndarray,
                                  fake_score: float) -> List[str]:
        """Flag spectral anomalies using thresholds adapted to the model's fake_score."""
        suspicious_regions = []

        # Select the threshold set that matches the model's fake_score band.
        if fake_score > 0.7:
            pitch_low, pitch_high = 200, 6000
            mfcc_threshold = 25
            zcr_low, zcr_high = 0.02, 0.25
            rolloff_threshold = 3000
            centroid_jump = 800
        elif fake_score > 0.5:
            pitch_low, pitch_high = 250, 5500
            mfcc_threshold = 28
            zcr_low, zcr_high = 0.025, 0.22
            rolloff_threshold = 2700
            centroid_jump = 900
        else:
            pitch_low, pitch_high = 300, 5000
            mfcc_threshold = 30
            zcr_low, zcr_high = 0.03, 0.20
            rolloff_threshold = 2500
            centroid_jump = 1000

        # Spectral-centroid variance serves as a pitch-variability proxy.
        pitch_variance = np.var(spectral_centroid)
        if pitch_variance < pitch_low:
            suspicious_regions.append('monotone_voice')
        elif pitch_variance > pitch_high:
            suspicious_regions.append('erratic_pitch')

        # Unusually low MFCC variance is treated as a synthesis artifact.
        mfcc_var = np.var(mfcc)
        if mfcc_var < mfcc_threshold:
            suspicious_regions.append('voice_synthesis_artifacts')

        zcr_mean = np.mean(zero_crossing)
        if zcr_mean > zcr_high:
            suspicious_regions.append('high_frequency_noise')
        elif zcr_mean < zcr_low:
            suspicious_regions.append('overly_smooth_audio')

        rolloff_std = np.std(spectral_rolloff)
        if rolloff_std > rolloff_threshold:
            suspicious_regions.append('spectral_artifacts')

        centroid_diff = np.diff(spectral_centroid)
        if len(centroid_diff) > 0 and np.max(np.abs(centroid_diff)) > centroid_jump:
            suspicious_regions.append('audio_splicing')

        if np.std(spectral_centroid) < 50:
            suspicious_regions.append('unnatural_consistency')

        if fake_score > 0.6 and len(suspicious_regions) == 0:
            suspicious_regions.append('general_audio_manipulation')

        return suspicious_regions if suspicious_regions else ['no_suspicious_patterns']

    def analyze_interval(self, interval_data: Dict) -> Dict:
        """Analyze audio for a single interval"""
        audio_data = interval_data['audio_data']

        if not audio_data or not audio_data.get('has_audio', False):
            return {
                'interval_id': interval_data['interval_id'],
                'interval': interval_data['interval'],
                'fake_score': 0.0,
                'confidence': 0.0,
                'suspicious_regions': ['no_audio'],
                'has_audio': False,
                'spectral_features': {}
            }

        audio = audio_data['audio']
        sample_rate = audio_data['sample_rate']

        prediction = self.predict_deepfake(audio, sample_rate)
        spectrogram_analysis = self.analyze_spectrogram(
            audio, sample_rate, prediction['fake_score']
        )

        return {
            'interval_id': interval_data['interval_id'],
            'interval': interval_data['interval'],
            'fake_score': prediction['fake_score'],
            'confidence': prediction['confidence'],
            'suspicious_regions': spectrogram_analysis['regions'],
            'has_audio': True,
            'spectral_features': spectrogram_analysis['spectral_features']
        }
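

# Minimal usage sketch, assuming models/load_models.py can load the audio
# model in this environment. The 440 Hz sine tone is a hypothetical stand-in
# for a real decoded audio chunk; the interval dict mirrors the shape that
# analyze_interval expects.
if __name__ == "__main__":
    analyzer = AudioAnalyzer()

    sample_rate = 16000
    t = np.linspace(0, 1.0, sample_rate, endpoint=False)
    audio = (0.1 * np.sin(2 * np.pi * 440.0 * t)).astype(np.float32)

    result = analyzer.analyze_interval({
        'interval_id': 0,
        'interval': (0.0, 1.0),
        'audio_data': {'has_audio': True, 'audio': audio, 'sample_rate': sample_rate},
    })
    print(result)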