# deepdefend-api/analysis/audio_analyser.py
# (Hugging Face commit 5b7ca6d by nishchandel: "fixed cache error")
import os
# Redirect every on-disk cache to /tmp. These environment variables must be
# set BEFORE importing librosa (and numba/joblib, which it pulls in), because
# those libraries read them at import time. Presumably the default cache
# locations are not writable in the deployment container (this matches the
# "fixed cache error" commit note) — TODO confirm against the deploy target.
os.environ['LIBROSA_CACHE_DIR'] = '/tmp'
os.environ['JOBLIB_TEMP_FOLDER'] = '/tmp'
os.environ['NUMBA_CACHE_DIR'] = '/tmp'
import torch
import librosa
import numpy as np
from typing import Dict, List
# Project-local singleton that supplies the audio model/processor and device.
from models.load_models import model_loader
class AudioAnalyzer:
    """Analyzes audio chunks for deepfake detection.

    Combines a learned classifier (loaded through ``model_loader``) with
    classic spectral-feature heuristics: the model produces a ``fake_score``
    and the heuristics name concrete suspicious regions, with thresholds
    that tighten as the model score rises.
    """

    def __init__(self):
        # Model/processor pair and target device come from the shared loader.
        self.model, self.processor = model_loader.load_audio_model()
        self.device = model_loader.get_device()

    def predict_deepfake(self, audio: np.ndarray, sample_rate: int) -> Dict:
        """Predict whether an audio chunk is a deepfake.

        Args:
            audio: 1-D waveform samples.
            sample_rate: Sample rate of ``audio`` in Hz.

        Returns:
            Dict with ``fake_score`` (probability of the presumed "fake"
            class, rounded to 3 places), ``confidence`` (top class
            probability) and ``label`` ('fake' if fake_score > 0.5).
        """
        # Right-pad chunks shorter than one second with silence so the
        # processor/model always sees a minimum-length input.
        min_length = sample_rate * 1
        if len(audio) < min_length:
            audio = np.pad(audio, (0, min_length - len(audio)))

        inputs = self.processor(
            audio,
            sampling_rate=sample_rate,
            return_tensors="pt",
            padding=True
        )
        # Move tensors to wherever the model lives. `.to()` is a no-op on
        # CPU and, unlike the previous cuda-only branch, also handles
        # torch.device objects and non-CUDA accelerators.
        inputs = {k: v.to(self.device) for k, v in inputs.items()}

        with torch.no_grad():
            outputs = self.model(**inputs)
            logits = outputs.logits
            probs = torch.nn.functional.softmax(logits, dim=-1)

        # Column 1 is assumed to be the "fake" class for multi-logit heads
        # — TODO confirm against the model's id2label; single-logit heads
        # fall back to their only column.
        fake_prob = probs[0][1].item() if probs.shape[1] > 1 else probs[0][0].item()
        confidence = probs[0].max().item()

        return {
            'fake_score': round(fake_prob, 3),
            'confidence': round(confidence, 3),
            'label': 'fake' if fake_prob > 0.5 else 'real'
        }

    def analyze_spectrogram(self, audio: np.ndarray, sample_rate: int, fake_score: float) -> Dict:
        """Extract spectral features and flag anomalies, adapting to ``fake_score``.

        Args:
            audio: 1-D waveform samples.
            sample_rate: Sample rate of ``audio`` in Hz.
            fake_score: Model score used to pick heuristic thresholds.

        Returns:
            Dict with ``regions`` (list of anomaly labels) and
            ``spectral_features`` (summary statistics, empty on failure).
        """
        try:
            spectral_centroid = librosa.feature.spectral_centroid(y=audio, sr=sample_rate)[0]
            spectral_rolloff = librosa.feature.spectral_rolloff(y=audio, sr=sample_rate)[0]
            zero_crossing_rate = librosa.feature.zero_crossing_rate(audio)[0]
            mfcc = librosa.feature.mfcc(y=audio, sr=sample_rate, n_mfcc=13)

            suspicious_regions = self._identify_audio_anomalies(
                spectral_centroid, spectral_rolloff, zero_crossing_rate, mfcc, fake_score
            )

            return {
                'regions': suspicious_regions,
                'spectral_features': {
                    'avg_spectral_centroid': round(float(np.mean(spectral_centroid)), 2),
                    'avg_spectral_rolloff': round(float(np.mean(spectral_rolloff)), 2),
                    'avg_zero_crossing_rate': round(float(np.mean(zero_crossing_rate)), 3),
                    'mfcc_variance': round(float(np.var(mfcc)), 3)
                }
            }
        except Exception:
            # Best-effort fallback: feature extraction failed (e.g. chunk
            # too short or malformed), so report canned regions keyed off
            # the model score rather than crashing the pipeline.
            if fake_score > 0.6:
                regions = ['voice_synthesis_detected', 'audio_artifacts']
            else:
                regions = ['no_suspicious_patterns']
            return {'regions': regions, 'spectral_features': {}}

    def _identify_audio_anomalies(self, spectral_centroid: np.ndarray, spectral_rolloff: np.ndarray, zero_crossing: np.ndarray, mfcc: np.ndarray, fake_score: float) -> List[str]:
        """Apply threshold heuristics to spectral features.

        Thresholds tighten as ``fake_score`` rises, so chunks the model
        already distrusts are inspected more aggressively. Returns a list
        of anomaly labels, or ``['no_suspicious_patterns']`` if clean.
        """
        suspicious_regions = []

        # (pitch_low, pitch_high, mfcc_threshold, zcr_low, zcr_high,
        #  rolloff_threshold, centroid_jump) per fake_score band.
        if fake_score > 0.7:
            pitch_low, pitch_high = 200, 6000
            mfcc_threshold = 25
            zcr_low, zcr_high = 0.02, 0.25
            rolloff_threshold = 3000
            centroid_jump = 800
        elif fake_score > 0.5:
            pitch_low, pitch_high = 250, 5500
            mfcc_threshold = 28
            zcr_low, zcr_high = 0.025, 0.22
            rolloff_threshold = 2700
            centroid_jump = 900
        else:
            pitch_low, pitch_high = 300, 5000
            mfcc_threshold = 30
            zcr_low, zcr_high = 0.03, 0.20
            rolloff_threshold = 2500
            centroid_jump = 1000

        # Centroid variance proxies pitch variability: too flat reads as
        # synthesized monotone, too wild as erratic generation.
        pitch_variance = np.var(spectral_centroid)
        if pitch_variance < pitch_low:
            suspicious_regions.append('monotone_voice')
        elif pitch_variance > pitch_high:
            suspicious_regions.append('erratic_pitch')

        # Low MFCC variance suggests the over-smooth timbre of TTS output.
        mfcc_var = np.var(mfcc)
        if mfcc_var < mfcc_threshold:
            suspicious_regions.append('voice_synthesis_artifacts')

        zcr_mean = np.mean(zero_crossing)
        if zcr_mean > zcr_high:
            suspicious_regions.append('high_frequency_noise')
        elif zcr_mean < zcr_low:
            suspicious_regions.append('overly_smooth_audio')

        rolloff_std = np.std(spectral_rolloff)
        if rolloff_std > rolloff_threshold:
            suspicious_regions.append('spectral_artifacts')

        # A large frame-to-frame centroid jump hints at a splice point.
        centroid_diff = np.diff(spectral_centroid)
        if len(centroid_diff) > 0 and np.max(np.abs(centroid_diff)) > centroid_jump:
            suspicious_regions.append('audio_splicing')

        if np.std(spectral_centroid) < 50:
            suspicious_regions.append('unnatural_consistency')

        # If the model is suspicious but no heuristic fired, still surface
        # a generic flag so downstream consumers see agreement.
        if fake_score > 0.6 and not suspicious_regions:
            suspicious_regions.append('general_audio_manipulation')

        return suspicious_regions if suspicious_regions else ['no_suspicious_patterns']

    def analyze_interval(self, interval_data: Dict) -> Dict:
        """Analyze audio for a single interval.

        Args:
            interval_data: Dict with ``interval_id``, ``interval`` and
                ``audio_data`` (itself holding ``has_audio``, ``audio``,
                ``sample_rate``).

        Returns:
            Dict combining the model prediction and spectrogram analysis;
            a zeroed result with ``has_audio=False`` when no audio exists.
        """
        audio_data = interval_data['audio_data']
        if not audio_data or not audio_data.get('has_audio', False):
            return {
                'interval_id': interval_data['interval_id'],
                'interval': interval_data['interval'],
                'fake_score': 0.0,
                'confidence': 0.0,
                'suspicious_regions': ['no_audio'],
                'has_audio': False,
                'spectral_features': {}
            }

        audio = audio_data['audio']
        sample_rate = audio_data['sample_rate']

        prediction = self.predict_deepfake(audio, sample_rate)
        spectrogram_analysis = self.analyze_spectrogram(
            audio, sample_rate, prediction['fake_score']
        )

        return {
            'interval_id': interval_data['interval_id'],
            'interval': interval_data['interval'],
            'fake_score': prediction['fake_score'],
            'confidence': prediction['confidence'],
            'suspicious_regions': spectrogram_analysis['regions'],
            'has_audio': True,
            'spectral_features': spectrogram_analysis['spectral_features']
        }