""" Audio Processing Module Handles audio extraction, processing, and integration with FFmpeg operations """ import os import subprocess import tempfile import logging import time from pathlib import Path from typing import Optional, Dict, Any, List, Tuple # Import from core from core.exceptions import AudioProcessingError logger = logging.getLogger(__name__) class AudioProcessor: """ Comprehensive audio processing for video background replacement """ def __init__(self, temp_dir: Optional[str] = None): self.temp_dir = temp_dir or tempfile.gettempdir() self.ffmpeg_available = self._check_ffmpeg_availability() self.ffprobe_available = self._check_ffprobe_availability() # Audio processing statistics self.stats = { 'audio_extractions': 0, 'audio_merges': 0, 'total_processing_time': 0.0, 'failed_operations': 0 } if not self.ffmpeg_available: logger.warning("FFmpeg not available - audio processing will be limited") logger.info(f"AudioProcessor initialized (FFmpeg: {self.ffmpeg_available}, FFprobe: {self.ffprobe_available})") def _check_ffmpeg_availability(self) -> bool: """Check if FFmpeg is available on the system""" try: result = subprocess.run( ['ffmpeg', '-version'], capture_output=True, text=True, timeout=10 ) return result.returncode == 0 except (subprocess.TimeoutExpired, FileNotFoundError, subprocess.SubprocessError): return False def _check_ffprobe_availability(self) -> bool: """Check if FFprobe is available on the system""" try: result = subprocess.run( ['ffprobe', '-version'], capture_output=True, text=True, timeout=10 ) return result.returncode == 0 except (subprocess.TimeoutExpired, FileNotFoundError, subprocess.SubprocessError): return False def get_audio_info(self, video_path: str) -> Dict[str, Any]: """ Get comprehensive audio information from video file Args: video_path: Path to the video file Returns: Dictionary containing audio information """ if not self.ffprobe_available: return {'has_audio': False, 'error': 'FFprobe not available'} try: # Get audio stream information result = subprocess.run([ 'ffprobe', '-v', 'quiet', '-select_streams', 'a:0', '-show_entries', 'stream=codec_name,sample_rate,channels,duration,bit_rate', '-of', 'csv=p=0', video_path ], capture_output=True, text=True, timeout=30) if result.returncode != 0: return { 'has_audio': False, 'error': 'No audio stream found', 'ffprobe_error': result.stderr } # Parse audio information audio_data = result.stdout.strip().split(',') if len(audio_data) >= 1 and audio_data[0]: info = { 'has_audio': True, 'codec': audio_data[0] if len(audio_data) > 0 else 'unknown', 'sample_rate': audio_data[1] if len(audio_data) > 1 else 'unknown', 'channels': audio_data[2] if len(audio_data) > 2 else 'unknown', 'duration': audio_data[3] if len(audio_data) > 3 else 'unknown', 'bit_rate': audio_data[4] if len(audio_data) > 4 else 'unknown' } # Convert string values to appropriate types try: if info['sample_rate'] != 'unknown': info['sample_rate'] = int(info['sample_rate']) if info['channels'] != 'unknown': info['channels'] = int(info['channels']) if info['duration'] != 'unknown': info['duration'] = float(info['duration']) if info['bit_rate'] != 'unknown': info['bit_rate'] = int(info['bit_rate']) except ValueError: pass # Keep as string if conversion fails return info else: return {'has_audio': False, 'error': 'Audio stream data empty'} except subprocess.TimeoutExpired: return {'has_audio': False, 'error': 'FFprobe timeout'} except Exception as e: logger.error(f"Error getting audio info: {e}") return {'has_audio': False, 'error': str(e)} def extract_audio(self, video_path: str, output_path: Optional[str] = None, audio_format: str = 'aac', quality: str = 'high') -> Optional[str]: """ Extract audio from video file Args: video_path: Path to input video output_path: Output path for audio (auto-generated if None) audio_format: Output audio format (aac, mp3, wav) quality: Audio quality (low, medium, high) Returns: Path to extracted audio file or None if failed """ if not self.ffmpeg_available: raise AudioProcessingError("extract", "FFmpeg not available", video_path) start_time = time.time() try: # Check if input has audio audio_info = self.get_audio_info(video_path) if not audio_info.get('has_audio', False): logger.info(f"No audio found in {video_path}") return None # Generate output path if not provided if output_path is None: timestamp = int(time.time()) output_path = os.path.join( self.temp_dir, f"extracted_audio_{timestamp}.{audio_format}" ) # Quality settings quality_settings = { 'low': {'aac': ['-b:a', '96k'], 'mp3': ['-b:a', '128k'], 'wav': []}, 'medium': {'aac': ['-b:a', '192k'], 'mp3': ['-b:a', '192k'], 'wav': []}, 'high': {'aac': ['-b:a', '320k'], 'mp3': ['-b:a', '320k'], 'wav': []} } codec_settings = { 'aac': ['-c:a', 'aac'], 'mp3': ['-c:a', 'libmp3lame'], 'wav': ['-c:a', 'pcm_s16le'] } # Build FFmpeg command cmd = ['ffmpeg', '-y', '-i', video_path] cmd.extend(codec_settings.get(audio_format, ['-c:a', 'aac'])) cmd.extend(quality_settings.get(quality, {}).get(audio_format, [])) cmd.extend(['-vn', output_path]) # -vn excludes video # Execute command result = subprocess.run( cmd, capture_output=True, text=True, timeout=300 # 5 minute timeout ) if result.returncode != 0: raise AudioProcessingError( "extract", f"FFmpeg failed: {result.stderr}", video_path, output_path ) if not os.path.exists(output_path): raise AudioProcessingError( "extract", "Output audio file was not created", video_path, output_path ) # Update statistics processing_time = time.time() - start_time self.stats['audio_extractions'] += 1 self.stats['total_processing_time'] += processing_time logger.info(f"Audio extracted successfully in {processing_time:.1f}s: {output_path}") return output_path except subprocess.TimeoutExpired: self.stats['failed_operations'] += 1 raise AudioProcessingError("extract", "FFmpeg timeout during extraction", video_path) except Exception as e: self.stats['failed_operations'] += 1 if isinstance(e, AudioProcessingError): raise else: raise AudioProcessingError("extract", f"Unexpected error: {str(e)}", video_path) def add_audio_to_video(self, original_video: str, processed_video: str, output_path: Optional[str] = None, audio_quality: str = 'high') -> str: """ Add audio from original video to processed video Args: original_video: Path to original video with audio processed_video: Path to processed video without audio output_path: Output path (auto-generated if None) audio_quality: Audio quality setting Returns: Path to final video with audio """ if not self.ffmpeg_available: logger.warning("FFmpeg not available - returning processed video without audio") return processed_video start_time = time.time() try: # Check if original video has audio audio_info = self.get_audio_info(original_video) if not audio_info.get('has_audio', False): logger.info("Original video has no audio - returning processed video") return processed_video # Generate output path if not provided if output_path is None: timestamp = int(time.time()) output_path = os.path.join( self.temp_dir, f"final_with_audio_{timestamp}.mp4" ) # Quality settings for audio encoding quality_settings = { 'low': ['-b:a', '96k'], 'medium': ['-b:a', '192k'], 'high': ['-b:a', '320k'] } # Build FFmpeg command to combine video and audio cmd = [ 'ffmpeg', '-y', '-i', processed_video, # Video input '-i', original_video, # Audio source '-c:v', 'copy', # Copy video stream as-is '-c:a', 'aac', # Encode audio as AAC ] # Add quality settings cmd.extend(quality_settings.get(audio_quality, quality_settings['high'])) # Map streams and set duration cmd.extend([ '-map', '0:v:0', # Video from first input '-map', '1:a:0', # Audio from second input '-shortest', # Match shortest stream duration output_path ]) # Execute command result = subprocess.run( cmd, capture_output=True, text=True, timeout=600 # 10 minute timeout ) if result.returncode != 0: logger.warning(f"Audio merge failed: {result.stderr}") logger.warning("Returning processed video without audio") return processed_video if not os.path.exists(output_path): logger.warning("Output video with audio was not created") return processed_video # Verify the output file if os.path.getsize(output_path) == 0: logger.warning("Output video file is empty") try: os.remove(output_path) except: pass return processed_video # Clean up original processed video if successful try: if output_path != processed_video: os.remove(processed_video) logger.debug("Cleaned up intermediate processed video") except Exception as e: logger.warning(f"Could not clean up intermediate file: {e}") # Update statistics processing_time = time.time() - start_time self.stats['audio_merges'] += 1 self.stats['total_processing_time'] += processing_time logger.info(f"Audio merged successfully in {processing_time:.1f}s: {output_path}") return output_path except subprocess.TimeoutExpired: self.stats['failed_operations'] += 1 logger.warning("Audio merge timeout - returning processed video without audio") return processed_video except Exception as e: self.stats['failed_operations'] += 1 logger.warning(f"Audio merge error: {e} - returning processed video without audio") return processed_video def sync_audio_video(self, video_path: str, audio_path: str, output_path: str, offset_ms: float = 0.0) -> bool: """ Synchronize separate audio and video files Args: video_path: Path to video file audio_path: Path to audio file output_path: Output path for synchronized file offset_ms: Audio offset in milliseconds (positive = delay audio) Returns: True if successful, False otherwise """ if not self.ffmpeg_available: raise AudioProcessingError("sync", "FFmpeg not available") try: cmd = ['ffmpeg', '-y', '-i', video_path, '-i', audio_path] # Add audio offset if specified if offset_ms != 0.0: offset_seconds = offset_ms / 1000.0 cmd.extend(['-itsoffset', str(offset_seconds)]) cmd.extend([ '-c:v', 'copy', # Copy video as-is '-c:a', 'aac', # Encode audio as AAC '-b:a', '192k', # Audio bitrate '-shortest', # Match shortest stream output_path ]) result = subprocess.run( cmd, capture_output=True, text=True, timeout=600 ) if result.returncode != 0: raise AudioProcessingError( "sync", f"Synchronization failed: {result.stderr}", video_path ) return os.path.exists(output_path) and os.path.getsize(output_path) > 0 except subprocess.TimeoutExpired: raise AudioProcessingError("sync", "Synchronization timeout", video_path) except Exception as e: if isinstance(e, AudioProcessingError): raise else: raise AudioProcessingError("sync", f"Unexpected error: {str(e)}", video_path) def adjust_audio_levels(self, input_path: str, output_path: str, volume_factor: float = 1.0, normalize: bool = False) -> bool: """ Adjust audio levels in a video file Args: input_path: Input video path output_path: Output video path volume_factor: Volume multiplication factor (1.0 = no change) normalize: Whether to normalize audio levels Returns: True if successful, False otherwise """ if not self.ffmpeg_available: raise AudioProcessingError("adjust_levels", "FFmpeg not available") try: cmd = ['ffmpeg', '-y', '-i', input_path, '-c:v', 'copy'] # Build audio filter audio_filters = [] if volume_factor != 1.0: audio_filters.append(f"volume={volume_factor}") if normalize: audio_filters.append("loudnorm") if audio_filters: cmd.extend(['-af', ','.join(audio_filters)]) cmd.extend(['-c:a', 'aac', '-b:a', '192k', output_path]) result = subprocess.run( cmd, capture_output=True, text=True, timeout=600 ) if result.returncode != 0: raise AudioProcessingError( "adjust_levels", f"Level adjustment failed: {result.stderr}", input_path ) return os.path.exists(output_path) and os.path.getsize(output_path) > 0 except Exception as e: if isinstance(e, AudioProcessingError): raise else: raise AudioProcessingError("adjust_levels", f"Unexpected error: {str(e)}", input_path) def get_supported_formats(self) -> Dict[str, List[str]]: """Get supported audio and video formats""" if not self.ffmpeg_available: return {'audio': [], 'video': []} try: # Get supported formats from FFmpeg result = subprocess.run( ['ffmpeg', '-formats'], capture_output=True, text=True, timeout=30 ) if result.returncode != 0: return {'audio': ['aac', 'mp3', 'wav'], 'video': ['mp4', 'avi', 'mov']} # Parse output (simplified - could be more comprehensive) lines = result.stdout.split('\n') audio_formats = [] video_formats = [] for line in lines: if 'aac' in line.lower(): audio_formats.append('aac') elif 'mp3' in line.lower(): audio_formats.append('mp3') elif 'wav' in line.lower(): audio_formats.append('wav') elif 'mp4' in line.lower(): video_formats.append('mp4') elif 'avi' in line.lower(): video_formats.append('avi') elif 'mov' in line.lower(): video_formats.append('mov') return { 'audio': list(set(audio_formats)) or ['aac', 'mp3', 'wav'], 'video': list(set(video_formats)) or ['mp4', 'avi', 'mov'] } except Exception as e: logger.warning(f"Could not get supported formats: {e}") return {'audio': ['aac', 'mp3', 'wav'], 'video': ['mp4', 'avi', 'mov']} def validate_audio_video_compatibility(self, video_path: str, audio_path: str) -> Dict[str, Any]: """ Validate compatibility between video and audio files Returns: Dictionary with compatibility information """ if not self.ffprobe_available: return {'compatible': False, 'error': 'FFprobe not available'} try: # Get video info video_result = subprocess.run([ 'ffprobe', '-v', 'quiet', '-select_streams', 'v:0', '-show_entries', 'stream=duration', '-of', 'csv=p=0', video_path ], capture_output=True, text=True, timeout=30) # Get audio info audio_result = subprocess.run([ 'ffprobe', '-v', 'quiet', '-select_streams', 'a:0', '-show_entries', 'stream=duration', '-of', 'csv=p=0', audio_path ], capture_output=True, text=True, timeout=30) if video_result.returncode != 0 or audio_result.returncode != 0: return {'compatible': False, 'error': 'Could not read file information'} try: video_duration = float(video_result.stdout.strip()) audio_duration = float(audio_result.stdout.strip()) duration_diff = abs(video_duration - audio_duration) duration_diff_percent = (duration_diff / max(video_duration, audio_duration)) * 100 return { 'compatible': duration_diff_percent < 5.0, # 5% tolerance 'video_duration': video_duration, 'audio_duration': audio_duration, 'duration_difference': duration_diff, 'duration_difference_percent': duration_diff_percent, 'recommendation': ( 'Compatible' if duration_diff_percent < 5.0 else 'Duration mismatch - consider trimming/extending' ) } except ValueError: return {'compatible': False, 'error': 'Invalid duration values'} except Exception as e: return {'compatible': False, 'error': str(e)} def get_stats(self) -> Dict[str, Any]: """Get audio processing statistics""" return { 'ffmpeg_available': self.ffmpeg_available, 'ffprobe_available': self.ffprobe_available, 'audio_extractions': self.stats['audio_extractions'], 'audio_merges': self.stats['audio_merges'], 'total_processing_time': self.stats['total_processing_time'], 'failed_operations': self.stats['failed_operations'], 'success_rate': ( (self.stats['audio_extractions'] + self.stats['audio_merges']) / max(1, self.stats['audio_extractions'] + self.stats['audio_merges'] + self.stats['failed_operations']) ) * 100 } def cleanup_temp_files(self, max_age_hours: int = 24): """Clean up temporary audio files older than specified age""" try: temp_path = Path(self.temp_dir) current_time = time.time() cutoff_time = current_time - (max_age_hours * 3600) cleaned_files = 0 for file_path in temp_path.glob("*audio*.{aac,mp3,wav,mp4}"): if file_path.stat().st_mtime < cutoff_time: try: file_path.unlink() cleaned_files += 1 except Exception as e: logger.warning(f"Could not delete temp file {file_path}: {e}") if cleaned_files > 0: logger.info(f"Cleaned up {cleaned_files} temporary audio files") except Exception as e: logger.warning(f"Error during temp file cleanup: {e}")