|
|
|
|
|
""" |
|
|
Audio Processing Module |
|
|
Handles audio extraction, processing, and integration with FFmpeg operations. |
|
|
|
|
|
Upgrades: |
|
|
- Prefer lossless audio stream-copy for muxing (no generational loss). |
|
|
- Safe fallback to AAC re-encode when needed. |
|
|
- Optional EBU R128 loudness normalization (two-pass loudnorm). |
|
|
- Optional audio/video offset with sample-accurate filters. |
|
|
- Robust ffprobe-based audio detection and metadata. |
|
|
- MoviePy fallback when ffmpeg is unavailable. |
|
|
""" |
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
import os |
|
|
import re |
|
|
import json |
|
|
import time |
|
|
import math |
|
|
import shutil |
|
|
import logging |
|
|
import tempfile |
|
|
import subprocess |
|
|
from pathlib import Path |
|
|
from typing import Optional, Dict, Any, List |
|
|
|
|
|
from core.exceptions import AudioProcessingError |
|
|
|
|
|
logger = logging.getLogger(__name__)


class AudioProcessor:
    """
    Comprehensive audio processing for video background replacement.

    Wraps the ``ffmpeg`` / ``ffprobe`` command-line tools (located on PATH at
    construction time) to detect, extract, mux, offset and loudness-normalize
    audio. MoviePy is used as a last-resort muxer when ffmpeg is missing or
    every ffmpeg attempt failed.

    Attributes:
        temp_dir: Directory used for intermediate/extracted audio files.
        ffmpeg_path / ffprobe_path: Resolved executables, or None.
        ffmpeg_available / ffprobe_available: Whether each tool was found.
        stats: Running counters (extractions, merges, failures, seconds spent).
    """

    # Codec arguments per extraction format; unknown formats fall back to AAC.
    _EXTRACT_CODECS = {
        "aac": ["-c:a", "aac"],
        "mp3": ["-c:a", "libmp3lame"],
        "wav": ["-c:a", "pcm_s16le"],
    }
    # Bitrate arguments per (quality, format); PCM/WAV takes no bitrate.
    _EXTRACT_BITRATES = {
        "low": {"aac": ["-b:a", "96k"], "mp3": ["-b:a", "128k"], "wav": []},
        "medium": {"aac": ["-b:a", "192k"], "mp3": ["-b:a", "192k"], "wav": []},
        "high": {"aac": ["-b:a", "320k"], "mp3": ["-b:a", "320k"], "wav": []},
    }
    # AAC bitrate used by the plain re-encode mux fallback.
    _MUX_BITRATES = {
        "low": ["-b:a", "96k"],
        "medium": ["-b:a", "192k"],
        "high": ["-b:a", "320k"],
    }
    # Fixed AAC settings used whenever audio filters force a re-encode.
    _FILTERED_AAC_ARGS = ["-c:a", "aac", "-b:a", "192k", "-ac", "2", "-ar", "48000"]
    # Keys that must be present (and finite) in a loudnorm first-pass report.
    _LOUDNORM_KEYS = ("input_i", "input_tp", "input_lra", "input_thresh", "target_offset")

    def __init__(self, temp_dir: Optional[str] = None):
        """
        Locate ffmpeg/ffprobe and initialise statistics.

        Args:
            temp_dir: Directory for temporary files; defaults to the system
                temp directory.
        """
        self.temp_dir = temp_dir or tempfile.gettempdir()
        self.ffmpeg_path = shutil.which("ffmpeg")
        self.ffprobe_path = shutil.which("ffprobe")
        self.ffmpeg_available = self.ffmpeg_path is not None
        self.ffprobe_available = self.ffprobe_path is not None

        self.stats = {
            "audio_extractions": 0,
            "audio_merges": 0,
            "total_processing_time": 0.0,
            "failed_operations": 0,
        }

        if not self.ffmpeg_available:
            logger.warning("FFmpeg not available - audio processing will be limited")
        logger.info(
            "AudioProcessor initialized (FFmpeg: %s, FFprobe: %s)",
            self.ffmpeg_available,
            self.ffprobe_available,
        )

    # ------------------------------------------------------------------
    # Low-level helpers
    # ------------------------------------------------------------------

    def _run(self, cmd: List[str], tag: str = "") -> subprocess.CompletedProcess:
        """
        Run a command, capturing stdout/stderr as text.

        Args:
            cmd: Argument vector (no shell is involved).
            tag: Optional label included in the log line.

        Returns:
            The CompletedProcess; the return code is NOT checked here.
        """
        logger.info("ffmpeg%s: %s", f"[{tag}]" if tag else "", " ".join(cmd))
        # errors="replace": media metadata echoed by ffmpeg is not guaranteed
        # to be valid UTF-8 and must not abort the call while decoding.
        return subprocess.run(cmd, text=True, capture_output=True, errors="replace")

    @staticmethod
    def _copy_unless_same(src: str, dst: str) -> None:
        """Copy src to dst unless both name the same path (avoids SameFileError)."""
        if os.path.abspath(src) != os.path.abspath(dst):
            shutil.copy2(src, dst)

    @staticmethod
    def _mux_succeeded(proc: subprocess.CompletedProcess, output_path: str) -> bool:
        """Return True when the process exited 0 and left a non-empty output file."""
        return (
            proc.returncode == 0
            and os.path.exists(output_path)
            and os.path.getsize(output_path) > 0
        )

    @staticmethod
    def _parse_number(value: Any, cast) -> Any:
        """
        Convert an ffprobe field with ``cast`` (int/float).

        Returns the string "unknown" for missing, empty or non-numeric values
        so a single odd field cannot invalidate the whole metadata result.
        """
        if not value:
            return "unknown"
        try:
            return cast(value)
        except (TypeError, ValueError):
            return "unknown"

    def _has_audio(self, path: str) -> bool:
        """
        Report whether ``path`` contains at least one audio stream.

        Uses ffprobe when available, otherwise inspects ffmpeg's stream
        listing. Returns False for missing files, when no tool is available,
        or when probing itself fails.
        """
        if not os.path.isfile(path):
            return False

        if self.ffprobe_available:
            try:
                proc = subprocess.run(
                    [
                        self.ffprobe_path, "-v", "error",
                        "-select_streams", "a:0",
                        "-show_entries", "stream=index",
                        "-of", "csv=p=0",
                        path,
                    ],
                    text=True, capture_output=True, check=False, errors="replace",
                )
                return bool(proc.stdout.strip())
            except Exception as e:
                # Best effort: fall through to the ffmpeg-based check.
                logger.debug("ffprobe audio check failed for %s: %s", path, e)

        if self.ffmpeg_available:
            try:
                # No output file is given on purpose: ffmpeg prints the input's
                # stream list to stderr and exits without decoding anything.
                # The default log level is required; the stream list is not
                # emitted at "error" level.
                proc = subprocess.run(
                    [self.ffmpeg_path, "-hide_banner", "-i", path],
                    text=True, capture_output=True, check=False, errors="replace",
                )
                return re.search(r"Stream #[^\n]*Audio:", proc.stderr or "") is not None
            except Exception as e:
                logger.debug("ffmpeg audio check failed for %s: %s", path, e)
                return False
        return False

    # ------------------------------------------------------------------
    # Metadata
    # ------------------------------------------------------------------

    def get_audio_info(self, video_path: str) -> Dict[str, Any]:
        """
        Get comprehensive audio information from a media file.

        Args:
            video_path: Path of the media file to probe.

        Returns:
            On success a dict with ``has_audio=True`` plus ``codec``,
            ``sample_rate``, ``channels``, ``duration`` and ``bit_rate``
            (each "unknown" when ffprobe does not report it). Otherwise
            ``{"has_audio": False, "error": <message>}``. Never raises.
        """
        if not self.ffprobe_available:
            return {"has_audio": False, "error": "FFprobe not available"}

        try:
            proc = subprocess.run(
                [
                    self.ffprobe_path, "-v", "error",
                    "-select_streams", "a:0",
                    "-show_entries", "stream=codec_name,sample_rate,channels,bit_rate,duration",
                    "-of", "json",
                    video_path,
                ],
                text=True, capture_output=True, check=False, errors="replace",
            )
            if proc.returncode != 0:
                return {"has_audio": False, "error": proc.stderr.strip()}

            streams = json.loads(proc.stdout or "{}").get("streams", [])
            if not streams:
                return {"has_audio": False, "error": "No audio stream found"}

            s = streams[0]
            return {
                "has_audio": True,
                "codec": s.get("codec_name", "unknown"),
                "sample_rate": self._parse_number(s.get("sample_rate"), int),
                "channels": self._parse_number(s.get("channels"), int),
                "duration": self._parse_number(s.get("duration"), float),
                "bit_rate": self._parse_number(s.get("bit_rate"), int),
            }
        except Exception as e:
            logger.error("Error getting audio info: %s", e)
            return {"has_audio": False, "error": str(e)}

    # ------------------------------------------------------------------
    # Extraction
    # ------------------------------------------------------------------

    def extract_audio(
        self,
        video_path: str,
        output_path: Optional[str] = None,
        audio_format: str = "aac",
        quality: str = "high",
    ) -> Optional[str]:
        """
        Extract audio from a media file to a separate file.

        Args:
            video_path: Source media file.
            output_path: Destination; defaults to a uniquely named file in
                ``temp_dir`` with ``audio_format`` as extension.
            audio_format: "aac", "mp3" or "wav"; other values are encoded
                with the AAC codec.
            quality: "low", "medium" or "high" (selects the bitrate; unknown
                values add no bitrate argument).

        Returns:
            Path of the extracted file, or None when the source has no audio.

        Raises:
            AudioProcessingError: ffmpeg is unavailable, exits non-zero, or
                produces no output file.
        """
        if not self.ffmpeg_available:
            raise AudioProcessingError("extract", "FFmpeg not available", video_path)

        start = time.time()
        # _has_audio works with ffmpeg alone, so extraction does not depend on
        # ffprobe being installed.
        if not self._has_audio(video_path):
            logger.info("No audio found in %s", video_path)
            return None

        if output_path is None:
            # Nanosecond timestamp: two extractions within the same second
            # must not overwrite each other.
            output_path = os.path.join(
                self.temp_dir, f"extracted_audio_{time.time_ns()}.{audio_format}"
            )

        cmd = [
            self.ffmpeg_path, "-y", "-i", video_path,
            *self._EXTRACT_CODECS.get(audio_format, ["-c:a", "aac"]),
            *self._EXTRACT_BITRATES.get(quality, {}).get(audio_format, []),
            "-vn", output_path,
        ]

        proc = self._run(cmd, "extract")
        if proc.returncode != 0:
            self.stats["failed_operations"] += 1
            raise AudioProcessingError("extract", f"FFmpeg failed: {proc.stderr}", video_path, output_path)

        if not os.path.exists(output_path):
            self.stats["failed_operations"] += 1
            raise AudioProcessingError("extract", "Output audio file was not created", video_path, output_path)

        self.stats["audio_extractions"] += 1
        self.stats["total_processing_time"] += (time.time() - start)
        logger.info("Audio extracted: %s", output_path)
        return output_path

    # ------------------------------------------------------------------
    # Loudness normalization / filter construction
    # ------------------------------------------------------------------

    @classmethod
    def _parse_loudnorm_output(cls, text: str) -> Optional[Dict[str, float]]:
        """
        Extract the loudnorm first-pass JSON report from ffmpeg output.

        Returns a dict of floats for the keys in ``_LOUDNORM_KEYS``, or None
        when the report is absent, malformed, or contains non-finite values
        (e.g. "-inf" for silent audio), which cannot be used as second-pass
        parameters.
        """
        m = re.search(r"\{\s*\"input_i\"[^\}]+\}", text, re.MULTILINE | re.DOTALL)
        if not m:
            logger.warning("Could not parse loudnorm analysis output.")
            return None
        try:
            data = json.loads(m.group(0))
            measured = {key: float(data[key]) for key in cls._LOUDNORM_KEYS}
        except (KeyError, TypeError, ValueError) as e:
            logger.warning("Loudnorm analysis JSON parse error: %s", e)
            return None
        if not all(math.isfinite(v) for v in measured.values()):
            logger.warning("Loudnorm analysis returned non-finite values: %s", measured)
            return None
        return measured

    def _measure_loudness(
        self,
        src_with_audio: str,
        stream_selector: str = "1:a:0",
        target_I: float = -16.0,
        target_TP: float = -1.5,
        target_LRA: float = 11.0,
    ) -> Optional[Dict[str, float]]:
        """
        First pass loudnorm to measure levels.

        Args:
            src_with_audio: Media file whose first audio stream is analysed.
            stream_selector: Unused; kept for backward compatibility. The
                file is the only input of the analysis run.
            target_I / target_TP / target_LRA: Targets of the upcoming second
                pass. They must match it because the reported
                ``target_offset`` depends on the target.

        Returns:
            Dict with input_i, input_tp, input_lra, input_thresh and
            target_offset, or None when measurement failed.
        """
        cmd = [
            # loudnorm prints its JSON report at "info" level, so a quieter
            # log level would hide it; -nostats only drops progress lines.
            self.ffmpeg_path, "-hide_banner", "-nostats", "-loglevel", "info",
            "-i", src_with_audio,
            "-map", "0:a:0",
            "-vn",
            "-af", f"loudnorm=I={target_I}:TP={target_TP}:LRA={target_LRA}:print_format=json",
            "-f", "null", "-",
        ]
        proc = self._run(cmd, "loudnorm-pass1")
        return self._parse_loudnorm_output((proc.stderr or "") + (proc.stdout or ""))

    def _build_loudnorm_filter(self, measured: Dict[str, float], target_I=-16.0, target_TP=-1.5, target_LRA=11.0) -> str:
        """
        Build the second-pass loudnorm filter string using measured values.

        Args:
            measured: Result of ``_measure_loudness``.
            target_I / target_TP / target_LRA: Normalization targets.

        Returns:
            A ``loudnorm=...`` filter description in linear mode.
        """
        return (
            "loudnorm="
            f"I={target_I}:TP={target_TP}:LRA={target_LRA}:"
            f"measured_I={measured['input_i']}:"
            f"measured_TP={measured['input_tp']}:"
            f"measured_LRA={measured['input_lra']}:"
            f"measured_thresh={measured['input_thresh']}:"
            f"offset={measured['target_offset']}:"
            "linear=true:print_format=summary"
        )

    def _loudnorm_filter(self, source: str, target_I: float, target_TP: float, target_LRA: float) -> str:
        """Return a two-pass loudnorm filter for ``source``, or single-pass if measuring fails."""
        measured = self._measure_loudness(
            source, target_I=target_I, target_TP=target_TP, target_LRA=target_LRA
        )
        if measured:
            return self._build_loudnorm_filter(measured, target_I, target_TP, target_LRA)
        return f"loudnorm=I={target_I}:TP={target_TP}:LRA={target_LRA}"

    @staticmethod
    def _offset_filter(offset_ms: float) -> Optional[str]:
        """
        Build the filter that shifts audio by ``offset_ms``.

        Positive values delay the audio, negative values trim its start.
        Offsets of at most a microsecond yield None (no filter).
        """
        if abs(offset_ms) <= 1e-3:
            return None
        if offset_ms > 0:
            # all=1 applies the delay to every channel, not just the first two
            # (requires an ffmpeg whose adelay filter has the "all" option).
            return f"adelay=delays={int(round(offset_ms))}:all=1"
        # Fixed-point formatting keeps tiny offsets out of exponent notation.
        secs = abs(offset_ms) / 1000.0
        return f"atrim=start={secs:.6f},asetpts=PTS-STARTPTS"

    def _build_audio_filters(
        self,
        source: str,
        offset_ms: float,
        normalize: bool,
        normalize_I: float,
        normalize_TP: float,
        normalize_LRA: float,
    ) -> List[str]:
        """Return the ordered audio filters (offset first, then loudnorm); may be empty."""
        filters: List[str] = []
        shift = self._offset_filter(offset_ms)
        if shift:
            filters.append(shift)
        if normalize:
            filters.append(self._loudnorm_filter(source, normalize_I, normalize_TP, normalize_LRA))
        return filters

    # ------------------------------------------------------------------
    # Muxing
    # ------------------------------------------------------------------

    def add_audio_to_video(
        self,
        original_video: str,
        processed_video: str,
        output_path: Optional[str] = None,
        audio_quality: str = "high",
        normalize: bool = False,
        normalize_I: float = -16.0,
        normalize_TP: float = -1.5,
        normalize_LRA: float = 11.0,
        offset_ms: float = 0.0,
    ) -> str:
        """
        Add/mux the audio from original_video into processed_video.

        Strategy:
          1) If no audio in original → return processed (or copy to desired name).
          2) If ffmpeg present:
             a) If normalize/offset requested → re-encode AAC with filters (two-pass loudnorm).
             b) Else try stream-copy (lossless): -c:a copy. If that fails, AAC re-encode.
          3) If ffmpeg missing → fallback to MoviePy (re-encode).

        Args:
            original_video: File providing the audio track.
            processed_video: File providing the video track.
            output_path: Destination; defaults to
                ``<processed basename>_with_audio.mp4`` next to processed_video.
            audio_quality: "low"/"medium"/"high" bitrate for the plain AAC
                re-encode fallback (2b). The filtered path (2a) uses a fixed
                192k bitrate.
            normalize: Apply EBU R128 loudness normalization.
            normalize_I / normalize_TP / normalize_LRA: Normalization targets.
            offset_ms: Positive delays audio; negative trims audio start.

        Returns path to the muxed video (MP4).

        Raises:
            FileNotFoundError: processed_video does not exist.
            AudioProcessingError: every ffmpeg attempt failed and MoviePy is
                unavailable.
        """
        if not os.path.isfile(processed_video):
            raise FileNotFoundError(f"Processed video not found: {processed_video}")

        if output_path is None:
            base = os.path.splitext(os.path.basename(processed_video))[0]
            output_path = os.path.join(os.path.dirname(processed_video), f"{base}_with_audio.mp4")

        # Only trust a negative probe result when a probing tool exists;
        # otherwise let the MoviePy fallback decide whether audio can be read.
        can_probe = self.ffprobe_available or self.ffmpeg_available
        if can_probe and not self._has_audio(original_video):
            logger.info("Original has no audio; returning processed video unchanged.")
            self._copy_unless_same(processed_video, output_path)
            return output_path

        if not self.ffmpeg_available:
            logger.warning("FFmpeg not available – using MoviePy fallback.")
            return self._moviepy_mux(original_video, processed_video, output_path)

        start = time.time()
        head = [
            self.ffmpeg_path, "-hide_banner", "-loglevel", "error",
            "-i", processed_video,
            "-i", original_video,
        ]
        tail = ["-shortest", "-movflags", "+faststart", "-y", output_path]

        def _record_success(message: str, *args: Any) -> str:
            self.stats["audio_merges"] += 1
            self.stats["total_processing_time"] += (time.time() - start)
            logger.info(message, *args)
            return output_path

        # 2a) Filters require re-encoding the audio.
        filters = self._build_audio_filters(
            original_video, offset_ms, normalize, normalize_I, normalize_TP, normalize_LRA
        )
        if filters:
            afilter = ",".join(filters)
            cmd = [
                *head,
                "-map", "0:v:0",
                "-filter_complex", f"[1:a:0]{afilter}[aout]",
                "-map", "[aout]",
                "-c:v", "copy",
                *self._FILTERED_AAC_ARGS,
                *tail,
            ]
            proc = self._run(cmd, "mux-reencode-filters")
            if self._mux_succeeded(proc, output_path):
                return _record_success(
                    "Audio merged with filters (normalize=%s, offset_ms=%.2f): %s",
                    normalize, offset_ms, output_path,
                )
            # NOTE: the fallbacks below mux WITHOUT the requested filters.
            logger.warning("Filtered mux failed; stderr: %s", proc.stderr)

        # 2b) Lossless stream-copy.
        cmd_copy = [
            *head,
            "-map", "0:v:0", "-map", "1:a:0",
            "-c:v", "copy",
            "-c:a", "copy",
            *tail,
        ]
        proc = self._run(cmd_copy, "mux-copy")
        if self._mux_succeeded(proc, output_path):
            return _record_success("Audio merged (stream-copy): %s", output_path)
        logger.info("Stream-copy mux failed; retrying with AAC re-encode. stderr: %s", proc.stderr)

        # 2b, fallback) AAC re-encode, e.g. when the source codec does not fit MP4.
        cmd_aac = [
            *head,
            "-map", "0:v:0", "-map", "1:a:0",
            "-c:v", "copy",
            "-c:a", "aac",
            *self._MUX_BITRATES.get(audio_quality, self._MUX_BITRATES["high"]),
            "-ac", "2", "-ar", "48000",
            *tail,
        ]
        proc = self._run(cmd_aac, "mux-aac")
        if self._mux_succeeded(proc, output_path):
            return _record_success("Audio merged (AAC re-encode): %s", output_path)

        logger.warning("FFmpeg mux failed; using MoviePy fallback.")
        return self._moviepy_mux(original_video, processed_video, output_path)

    def _moviepy_mux(self, original_video: str, processed_video: str, output_path: str) -> str:
        """
        Mux audio from original_video onto processed_video using MoviePy.

        The video is re-encoded with libx264 and the audio with AAC. If the
        audio cannot be loaded, processed_video is copied to output_path
        unchanged.

        Returns:
            output_path.

        Raises:
            AudioProcessingError: MoviePy cannot be imported.
        """
        try:
            try:
                from moviepy.editor import VideoFileClip, AudioFileClip  # MoviePy 1.x
            except ImportError:
                from moviepy import VideoFileClip, AudioFileClip  # MoviePy 2.x
        except Exception as e:
            self.stats["failed_operations"] += 1
            raise AudioProcessingError(
                "mux", f"MoviePy unavailable and ffmpeg failed: {e}", processed_video
            ) from e

        start = time.time()
        # Unique name so concurrent muxes sharing temp_dir do not collide.
        temp_audio = os.path.join(
            self.temp_dir, f"temp-audio-{os.getpid()}-{time.time_ns()}.m4a"
        )

        with VideoFileClip(processed_video) as v_clip:
            try:
                a_clip = AudioFileClip(original_video)
            except Exception as e:
                logger.warning("MoviePy could not load audio from %s (%s). Returning processed video.", original_video, e)
                self._copy_unless_same(processed_video, output_path)
                return output_path

            try:
                # MoviePy 2.x renamed set_audio() to with_audio().
                attach_audio = getattr(v_clip, "with_audio", None) or v_clip.set_audio
                attach_audio(a_clip).write_videofile(
                    output_path,
                    codec="libx264",
                    audio_codec="aac",
                    audio_bitrate="192k",
                    temp_audiofile=temp_audio,
                    remove_temp=True,
                    threads=2,
                    preset="medium",
                )
            except Exception:
                self.stats["failed_operations"] += 1
                raise
            finally:
                # Release the audio reader explicitly.
                a_clip.close()

        self.stats["audio_merges"] += 1
        self.stats["total_processing_time"] += (time.time() - start)
        return output_path

    def sync_audio_video(
        self,
        video_path: str,
        audio_path: str,
        output_path: str,
        offset_ms: float = 0.0,
        normalize: bool = False,
        normalize_I: float = -16.0,
        normalize_TP: float = -1.5,
        normalize_LRA: float = 11.0,
    ) -> bool:
        """
        Synchronize a separate audio file with a video (copy video, re-encode audio AAC).
        Positive offset_ms delays audio; negative trims audio start.

        Args:
            video_path: File providing the video stream.
            audio_path: File providing the audio stream.
            output_path: Destination file.
            offset_ms: Audio shift in milliseconds.
            normalize: Apply EBU R128 loudness normalization.
            normalize_I / normalize_TP / normalize_LRA: Normalization targets.

        Returns:
            True when ffmpeg exited 0 and wrote a non-empty output file.

        Raises:
            AudioProcessingError: ffmpeg is unavailable.
        """
        if not self.ffmpeg_available:
            raise AudioProcessingError("sync", "FFmpeg not available")

        filters = self._build_audio_filters(
            audio_path, offset_ms, normalize, normalize_I, normalize_TP, normalize_LRA
        )
        if filters:
            afilter = ",".join(filters)
            audio_map = ["-map", "0:v:0", "-filter_complex", f"[1:a:0]{afilter}[aout]", "-map", "[aout]"]
        else:
            audio_map = ["-map", "0:v:0", "-map", "1:a:0"]

        cmd = [
            self.ffmpeg_path, "-hide_banner", "-loglevel", "error",
            "-i", video_path,
            "-i", audio_path,
            *audio_map,
            "-c:v", "copy",
            *self._FILTERED_AAC_ARGS,
            "-shortest",
            "-movflags", "+faststart",
            "-y", output_path,
        ]

        proc = self._run(cmd, "sync")
        return self._mux_succeeded(proc, output_path)

    def adjust_audio_levels(
        self,
        input_path: str,
        output_path: str,
        volume_factor: float = 1.0,
        normalize: bool = False,
        normalize_I: float = -16.0,
        normalize_TP: float = -1.5,
        normalize_LRA: float = 11.0,
    ) -> bool:
        """
        Adjust levels on a single-file video (copy video, re-encode audio AAC).

        Args:
            input_path: Source file.
            output_path: Destination file.
            volume_factor: Linear gain; 1.0 leaves the volume untouched.
            normalize: Apply EBU R128 loudness normalization (after the gain).
            normalize_I / normalize_TP / normalize_LRA: Normalization targets.

        Returns:
            True when a non-empty output file exists afterwards. With no
            adjustment requested the input is simply copied.

        Raises:
            AudioProcessingError: ffmpeg is unavailable or exits non-zero.
        """
        if not self.ffmpeg_available:
            raise AudioProcessingError("adjust_levels", "FFmpeg not available")

        filters = []
        if volume_factor != 1.0:
            filters.append(f"volume={volume_factor}")
        if normalize:
            filters.append(self._loudnorm_filter(input_path, normalize_I, normalize_TP, normalize_LRA))

        if not filters:
            # Nothing to change: avoid a needless lossy re-encode.
            self._copy_unless_same(input_path, output_path)
            return True

        cmd = [
            self.ffmpeg_path, "-hide_banner", "-loglevel", "error",
            "-i", input_path,
            "-c:v", "copy",
            "-af", ",".join(filters),
            *self._FILTERED_AAC_ARGS,
            "-movflags", "+faststart",
            "-y", output_path,
        ]

        proc = self._run(cmd, "adjust-levels")
        if proc.returncode != 0:
            raise AudioProcessingError("adjust_levels", proc.stderr, input_path)
        return os.path.exists(output_path) and os.path.getsize(output_path) > 0

    # ------------------------------------------------------------------
    # Statistics / housekeeping
    # ------------------------------------------------------------------

    def get_stats(self) -> Dict[str, Any]:
        """
        Return tool availability plus the running counters.

        ``success_rate`` is the percentage of successful extractions/merges
        among all counted operations (0.0 when nothing has run yet).
        """
        successes = self.stats["audio_extractions"] + self.stats["audio_merges"]
        tot_ops = successes + self.stats["failed_operations"]
        success_rate = (successes / max(1, tot_ops)) * 100.0
        return {
            "ffmpeg_available": self.ffmpeg_available,
            "ffprobe_available": self.ffprobe_available,
            "audio_extractions": self.stats["audio_extractions"],
            "audio_merges": self.stats["audio_merges"],
            "total_processing_time": self.stats["total_processing_time"],
            "failed_operations": self.stats["failed_operations"],
            "success_rate": success_rate,
        }

    def cleanup_temp_files(self, max_age_hours: int = 24):
        """
        Clean up temporary audio/video files older than specified age in temp_dir.

        Only regular files directly inside temp_dir whose name contains
        "audio" and ends in .aac/.mp3/.wav/.mp4/.m4a are considered. Errors
        are logged, never raised.

        Args:
            max_age_hours: Files modified longer ago than this are deleted.
        """
        try:
            temp_path = Path(self.temp_dir)
            cutoff = time.time() - (max_age_hours * 3600)
            cleaned = 0

            for ext in (".aac", ".mp3", ".wav", ".mp4", ".m4a"):
                for p in temp_path.glob(f"*audio*{ext}"):
                    try:
                        # Skip directories that happen to match the pattern.
                        if p.is_file() and p.stat().st_mtime < cutoff:
                            p.unlink()
                            cleaned += 1
                    except Exception as e:
                        logger.warning("Could not delete temp file %s: %s", p, e)
            if cleaned:
                logger.info("Cleaned up %d temporary audio files", cleaned)
        except Exception as e:
            logger.warning("Temp file cleanup error: %s", e)
|
|
|