|
|
""" |
|
|
Audio-Video Synchronization Manager for EceMotion Pictures. |
|
|
Ensures frame-perfect alignment between generated audio and video content. |
|
|
""" |
|
|
|
|
|
import os |
|
|
import tempfile |
|
|
import subprocess |
|
|
import numpy as np |
|
|
import logging |
|
|
from typing import Tuple, Optional, Dict, Any |
|
|
from pathlib import Path |
|
|
import shutil |
|
|
|
|
|
from config import SYNC_TOLERANCE_MS, FORCE_SYNC, AUDIO_SAMPLE_RATE |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
class SyncManager:
    """Align the durations of an audio file and a video file, then mux them.

    The external ``ffmpeg``/``ffprobe`` binaries are used when ``ffmpeg`` can
    be executed. Otherwise the class falls back to ``soundfile``/``moviepy``
    (imported lazily inside the methods that need them) and, as a last
    resort, to rough estimates or a plain file copy.

    Attributes:
        tolerance_ms: Largest duration mismatch, in milliseconds, that is
            still treated as "in sync".
        tolerance_s: ``tolerance_ms`` converted to seconds.
        ffmpeg_available: Whether ``ffmpeg -version`` ran successfully when
            the instance was created.
    """

    # Duration returned when neither a measurement nor an estimate is possible.
    _FALLBACK_DURATION_S = 10.0
    # Heuristic used by ``_estimate_duration_from_size``: one second of audio
    # per MiB. This is a guess; it is not derived from the file's format.
    _GUESSED_BYTES_PER_SECOND = 1024 * 1024
    # Video filter that repeats the last frame indefinitely; the output is
    # bounded by ffmpeg's ``-t`` option (mirrors ``apad`` on the audio side).
    _VIDEO_PAD_FILTER = 'tpad=stop_mode=clone:stop=-1'

    def __init__(self, tolerance_ms: int = SYNC_TOLERANCE_MS):
        """Store the sync tolerance and probe for an ``ffmpeg`` executable.

        Args:
            tolerance_ms: Maximum accepted audio/video duration difference in
                milliseconds.
        """
        self.tolerance_ms = tolerance_ms
        self.tolerance_s = tolerance_ms / 1000.0
        self.ffmpeg_available = self._check_ffmpeg()

    def _check_ffmpeg(self) -> bool:
        """Return True when ``ffmpeg -version`` can be executed successfully."""
        try:
            subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True)
            return True
        except (subprocess.CalledProcessError, OSError):
            # OSError covers a missing binary as well as permission problems.
            logger.warning("ffmpeg not found, using fallback methods")
            return False

    @staticmethod
    def _safe_copy(src_path: str, dst_path: str) -> None:
        """Copy ``src_path`` to ``dst_path``; a no-op when both are the same file."""
        try:
            shutil.copy2(src_path, dst_path)
        except shutil.SameFileError:
            # Source already is the destination, nothing left to do.
            pass

    def _probe_duration(self, media_path: str) -> float:
        """Ask ``ffprobe`` for the ``format=duration`` entry of a media file.

        Raises:
            subprocess.CalledProcessError: ``ffprobe`` exited with an error.
            OSError: ``ffprobe`` could not be started.
            ValueError: The output was not a number (e.g. empty or ``N/A``).
        """
        cmd = [
            'ffprobe', '-v', 'quiet', '-show_entries', 'format=duration',
            '-of', 'csv=p=0', media_path
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        return float(result.stdout.strip())

    def calculate_video_duration(self, num_frames: int, fps: float) -> float:
        """Calculate the video duration in seconds from frame count and FPS.

        Raises:
            ValueError: If ``fps`` is not positive.
        """
        if fps <= 0:
            raise ValueError(f"fps must be positive, got {fps}")
        return num_frames / fps

    def measure_audio_duration(self, audio_path: str) -> float:
        """Measure the duration of an audio file in seconds.

        Raises:
            FileNotFoundError: If ``audio_path`` does not exist.
        """
        if not os.path.exists(audio_path):
            raise FileNotFoundError(f"Audio file not found: {audio_path}")

        if self.ffmpeg_available:
            return self._measure_with_ffmpeg(audio_path)
        return self._measure_with_soundfile(audio_path)

    def _measure_with_ffmpeg(self, audio_path: str) -> float:
        """Measure audio duration with ffprobe, falling back to soundfile."""
        try:
            duration = self._probe_duration(audio_path)
        except (subprocess.CalledProcessError, OSError, ValueError) as e:
            logger.error(f"Failed to measure audio duration with ffmpeg: {e}")
            return self._measure_with_soundfile(audio_path)
        logger.info(f"Audio duration (ffmpeg): {duration:.3f}s")
        return duration

    def _measure_with_soundfile(self, audio_path: str) -> float:
        """Measure audio duration with soundfile, falling back to a size guess."""
        try:
            # Imported lazily so the module loads without soundfile installed.
            import soundfile as sf

            duration = sf.info(audio_path).duration
        except Exception as e:
            # Broad on purpose: covers a missing package and any decode error.
            logger.error(f"Failed to measure audio duration with soundfile: {e}")
            return self._estimate_duration_from_size(audio_path)
        logger.info(f"Audio duration (soundfile): {duration:.3f}s")
        return duration

    def _estimate_duration_from_size(self, audio_path: str) -> float:
        """Estimate duration from file size (very rough estimate).

        Uses a fixed guess of one second per MiB; returns the fallback
        duration when the file size cannot be read.
        """
        try:
            file_size = os.path.getsize(audio_path)
        except OSError as e:
            logger.error(f"Failed to estimate duration: {e}")
            return self._FALLBACK_DURATION_S
        estimated_duration = file_size / self._GUESSED_BYTES_PER_SECOND
        logger.warning(f"Estimated audio duration from file size: {estimated_duration:.3f}s")
        return estimated_duration

    def measure_video_duration(self, video_path: str) -> float:
        """Measure the duration of a video file in seconds.

        Raises:
            FileNotFoundError: If ``video_path`` does not exist.
        """
        if not os.path.exists(video_path):
            raise FileNotFoundError(f"Video file not found: {video_path}")

        if self.ffmpeg_available:
            return self._measure_video_with_ffmpeg(video_path)
        return self._estimate_video_duration(video_path)

    def _measure_video_with_ffmpeg(self, video_path: str) -> float:
        """Measure video duration with ffprobe, falling back to an estimate."""
        try:
            duration = self._probe_duration(video_path)
        except (subprocess.CalledProcessError, OSError, ValueError) as e:
            logger.error(f"Failed to measure video duration with ffmpeg: {e}")
            return self._estimate_video_duration(video_path)
        logger.info(f"Video duration (ffmpeg): {duration:.3f}s")
        return duration

    def _estimate_video_duration(self, video_path: str) -> float:
        """Estimate video duration from the file name (fallback method).

        Looks for an underscore-separated token of the form ``<number>s``
        (for example ``clip_5s.mp4`` or ``ad_2.5s_final.mp4``) and returns
        that number. Returns the fallback duration when no token matches.
        """
        try:
            # Drop the extension first so a trailing token such as "5s.mp4"
            # can still be recognised.
            stem = os.path.splitext(os.path.basename(video_path))[0]
            for token in stem.split('_'):
                number = token[:-1]
                # Only a plain decimal number followed by a single "s" counts.
                if not (token.endswith('s') and number.replace('.', '', 1).isdigit()):
                    continue
                try:
                    duration = float(number)
                except ValueError:
                    continue
                logger.info(f"Estimated video duration from filename: {duration:.3f}s")
                return duration

            logger.warning("Using default video duration estimate: 10.0s")
            return self._FALLBACK_DURATION_S
        except Exception as e:
            logger.error(f"Failed to estimate video duration: {e}")
            return self._FALLBACK_DURATION_S

    def adjust_audio_to_video(self, audio_path: str, target_duration: float,
                              output_path: str) -> str:
        """Pad or trim the audio to ``target_duration`` seconds.

        Returns:
            ``output_path``.
        """
        if self.ffmpeg_available:
            return self._adjust_audio_with_ffmpeg(audio_path, target_duration, output_path)
        return self._adjust_audio_with_soundfile(audio_path, target_duration, output_path)

    def _adjust_audio_with_ffmpeg(self, audio_path: str, target_duration: float,
                                  output_path: str) -> str:
        """Pad/trim audio with ffmpeg, falling back to soundfile on failure."""
        # ``apad`` appends silence; ``-t`` bounds the output length, so the
        # same command both extends short audio and trims long audio.
        cmd = [
            'ffmpeg', '-i', audio_path, '-t', str(target_duration),
            '-af', 'apad', '-c:a', 'pcm_s16le', '-y', output_path
        ]
        try:
            subprocess.run(cmd, check=True, capture_output=True)
        except (subprocess.CalledProcessError, OSError) as e:
            logger.error(f"Failed to adjust audio with ffmpeg: {e}")
            return self._adjust_audio_with_soundfile(audio_path, target_duration, output_path)
        logger.info(f"Adjusted audio to {target_duration:.3f}s using ffmpeg")
        return output_path

    def _adjust_audio_with_soundfile(self, audio_path: str, target_duration: float,
                                     output_path: str) -> str:
        """Pad/trim audio with soundfile (fallback).

        On any failure the input file is copied to ``output_path`` unchanged.
        """
        try:
            import soundfile as sf

            audio_data, sample_rate = sf.read(audio_path)

            # Clamp at zero so a negative target cannot turn into a
            # from-the-end slice below.
            target_samples = max(0, int(target_duration * sample_rate))
            missing = target_samples - len(audio_data)

            if missing > 0:
                # Keep the trailing (channel) dimensions of the source so the
                # silence can be concatenated for mono and multi-channel data.
                padding = np.zeros((missing,) + audio_data.shape[1:],
                                   dtype=audio_data.dtype)
                adjusted_audio = np.concatenate([audio_data, padding])
            else:
                adjusted_audio = audio_data[:target_samples]

            sf.write(output_path, adjusted_audio, sample_rate)
            logger.info(f"Adjusted audio to {target_duration:.3f}s using soundfile")
            return output_path

        except Exception as e:
            logger.error(f"Failed to adjust audio with soundfile: {e}")
            self._safe_copy(audio_path, output_path)
            return output_path

    def adjust_video_to_audio(self, video_path: str, target_duration: float,
                              output_path: str) -> str:
        """Pad or trim the video to ``target_duration`` seconds.

        Without ffmpeg the video is copied unchanged.

        Returns:
            ``output_path``.
        """
        if self.ffmpeg_available:
            return self._adjust_video_with_ffmpeg(video_path, target_duration, output_path)

        logger.warning("ffmpeg not available, copying video without duration adjustment")
        self._safe_copy(video_path, output_path)
        return output_path

    def _adjust_video_with_ffmpeg(self, video_path: str, target_duration: float,
                                  output_path: str) -> str:
        """Pad/trim video with ffmpeg.

        The first attempt repeats the last frame so a video shorter than the
        target is extended. If that command fails (for instance because the
        ffmpeg build lacks the ``tpad`` filter) a trim-only command is tried.
        If both fail the input is copied unchanged.
        """
        head = ['ffmpeg', '-i', video_path, '-t', str(target_duration)]
        tail = ['-c:v', 'libx264', '-pix_fmt', 'yuv420p', '-y', output_path]
        attempts = (
            head + ['-vf', self._VIDEO_PAD_FILTER] + tail,
            head + tail,
        )

        for cmd in attempts:
            try:
                subprocess.run(cmd, check=True, capture_output=True)
            except (subprocess.CalledProcessError, OSError) as e:
                logger.error(f"Failed to adjust video with ffmpeg: {e}")
                continue
            logger.info(f"Adjusted video to {target_duration:.3f}s using ffmpeg")
            return output_path

        self._safe_copy(video_path, output_path)
        return output_path

    def validate_sync(self, video_path: str, audio_path: str) -> Tuple[bool, float]:
        """Check whether two files have durations within the tolerance.

        Returns:
            ``(is_synced, duration_diff)`` where ``duration_diff`` is the
            absolute difference in seconds. ``(False, inf)`` is returned when
            either measurement raises.
        """
        try:
            video_duration = self.measure_video_duration(video_path)
            audio_duration = self.measure_audio_duration(audio_path)

            duration_diff = abs(video_duration - audio_duration)
            is_synced = duration_diff <= self.tolerance_s

            logger.info(f"Sync validation: video={video_duration:.3f}s, "
                        f"audio={audio_duration:.3f}s, diff={duration_diff:.3f}s, "
                        f"synced={is_synced}")

            return is_synced, duration_diff

        except Exception as e:
            logger.error(f"Sync validation failed: {e}")
            return False, float('inf')

    def synchronize_media(self, video_path: str, audio_path: str,
                          output_path: str, prefer_audio_duration: bool = True) -> str:
        """Bring audio and video to the same duration and mux them.

        Args:
            video_path: Source video file.
            audio_path: Source audio file.
            output_path: Where the muxed result is written.
            prefer_audio_duration: When True the video is adjusted to the
                audio's duration; otherwise the audio is adjusted to the
                video's duration.

        Returns:
            ``output_path``. If any step raises, the source video is copied
            to ``output_path`` as a best-effort result.
        """
        try:
            video_duration = self.measure_video_duration(video_path)
            audio_duration = self.measure_audio_duration(audio_path)

            if abs(video_duration - audio_duration) <= self.tolerance_s:
                logger.info("Media already synchronized, copying to output")
                self._copy_media(video_path, audio_path, output_path)
                return output_path

            if prefer_audio_duration:
                target_duration = audio_duration
                logger.info(f"Adjusting video to match audio duration: {target_duration:.3f}s")
            else:
                target_duration = video_duration
                logger.info(f"Adjusting audio to match video duration: {target_duration:.3f}s")

            # Intermediate files live in a temp dir that is removed once the
            # mux has produced ``output_path``.
            with tempfile.TemporaryDirectory() as temp_dir:
                if prefer_audio_duration:
                    mux_video = self.adjust_video_to_audio(
                        video_path, target_duration,
                        os.path.join(temp_dir, "temp_video.mp4"))
                    mux_audio = audio_path
                else:
                    mux_audio = self.adjust_audio_to_video(
                        audio_path, target_duration,
                        os.path.join(temp_dir, "temp_audio.wav"))
                    mux_video = video_path

                self._mux_media(mux_video, mux_audio, output_path)

            self._report_final_sync(output_path, target_duration)
            return output_path

        except Exception as e:
            logger.error(f"Synchronization failed: {e}")
            self._safe_copy(video_path, output_path)
            return output_path

    def _report_final_sync(self, output_path: str, target_duration: float) -> None:
        """Log whether the muxed output's duration matches ``target_duration``.

        Comparing the output with itself would always report a zero
        difference, so the measured output duration is compared with the
        duration the adjustment step was aiming for. Never raises.
        """
        if not self.ffmpeg_available:
            # Without ffprobe the video duration is only a filename guess.
            logger.info("Skipping final sync validation: ffmpeg is not available")
            return

        try:
            final_duration = self.measure_video_duration(output_path)
        except Exception as e:
            logger.warning(f"Final sync validation could not run: {e}")
            return

        final_diff = abs(final_duration - target_duration)
        if final_diff <= self.tolerance_s:
            logger.info("Media successfully synchronized")
        elif FORCE_SYNC:
            logger.warning(f"Final sync validation failed with diff {final_diff:.3f}s")
        else:
            logger.info(f"Output duration differs from target by {final_diff:.3f}s "
                        f"(FORCE_SYNC disabled)")

    def _copy_media(self, video_path: str, audio_path: str, output_path: str):
        """Mux the inputs into ``output_path`` without duration adjustment."""
        self._mux_media(video_path, audio_path, output_path)

    def _mux_media(self, video_path: str, audio_path: str, output_path: str):
        """Mux video and audio, using ffmpeg when available, else moviepy."""
        if self.ffmpeg_available:
            self._mux_with_ffmpeg(video_path, audio_path, output_path)
        else:
            self._mux_with_moviepy(video_path, audio_path, output_path)

    def _mux_with_ffmpeg(self, video_path: str, audio_path: str, output_path: str):
        """Mux using ffmpeg, falling back to moviepy on failure."""
        cmd = [
            'ffmpeg', '-i', video_path, '-i', audio_path,
            # Explicit mapping: picture from the first input, sound from the
            # second, so an audio stream already present in the video file is
            # never chosen over the supplied audio.
            '-map', '0:v:0', '-map', '1:a:0',
            '-c:v', 'copy', '-c:a', 'aac', '-b:a', '128k',
            '-shortest', '-fflags', '+shortest',
            '-movflags', '+faststart', '-y', output_path
        ]
        try:
            subprocess.run(cmd, check=True, capture_output=True)
        except (subprocess.CalledProcessError, OSError) as e:
            logger.error(f"Media muxing with ffmpeg failed: {e}")
            self._mux_with_moviepy(video_path, audio_path, output_path)
            return
        logger.info("Media successfully muxed with ffmpeg")

    def _mux_with_moviepy(self, video_path: str, audio_path: str, output_path: str):
        """Mux using moviepy (fallback).

        Audio longer than the video is trimmed; shorter audio is padded with
        silence. On any failure the video is copied to ``output_path``.
        """
        opened_clips = []
        try:
            # Lazy imports: moviepy is only needed on this fallback path.
            from moviepy.editor import (
                AudioFileClip, VideoFileClip, concatenate_audioclips,
            )
            from moviepy.audio.AudioClip import AudioClip

            video = VideoFileClip(video_path)
            opened_clips.append(video)
            audio = AudioFileClip(audio_path)
            opened_clips.append(audio)

            if audio.duration > video.duration:
                audio = audio.subclip(0, video.duration)
            elif audio.duration < video.duration:
                silence = AudioClip(lambda t: 0, duration=video.duration - audio.duration)
                # concatenate_audioclips is a module-level function, not a
                # method of the clip.
                audio = concatenate_audioclips([audio, silence])

            final_video = video.set_audio(audio)
            opened_clips.append(final_video)

            # Keep moviepy's intermediate audio file out of the working
            # directory so concurrent runs cannot collide on its name.
            with tempfile.TemporaryDirectory() as temp_dir:
                final_video.write_videofile(
                    output_path,
                    codec='libx264',
                    audio_codec='aac',
                    temp_audiofile=os.path.join(temp_dir, 'temp-audio.m4a'),
                    remove_temp=True,
                    verbose=False,
                    logger=None
                )

            logger.info("Media successfully muxed with moviepy")

        except Exception as e:
            logger.error(f"Media muxing with moviepy failed: {e}")
            self._safe_copy(video_path, output_path)

        finally:
            # Release file handles / reader processes even when muxing failed.
            for clip in reversed(opened_clips):
                try:
                    clip.close()
                except Exception as e:
                    logger.debug(f"Failed to close moviepy clip: {e}")

    def get_optimal_frame_count(self, target_duration: float, fps: float,
                                min_frames: int = 8, max_frames: int = 64) -> int:
        """Calculate the frame count for a duration, clamped to a range.

        Args:
            target_duration: Desired duration in seconds.
            fps: Frames per second.
            min_frames: Lower bound of the returned count.
            max_frames: Upper bound of the returned count.

        Returns:
            ``int(target_duration * fps)`` clamped to
            ``[min_frames, max_frames]``.
        """
        frame_count = int(target_duration * fps)
        return max(min_frames, min(frame_count, max_frames))

    def estimate_audio_duration(self, text: str, words_per_minute: int = 150) -> float:
        """Estimate spoken duration in seconds from a whitespace word count.

        Raises:
            ValueError: If ``words_per_minute`` is not positive.
        """
        if words_per_minute <= 0:
            raise ValueError(f"words_per_minute must be positive, got {words_per_minute}")
        word_count = len(text.split())
        duration_minutes = word_count / words_per_minute
        return duration_minutes * 60.0
|
|
|
|
|
def create_sync_manager() -> SyncManager:
    """Build and return a ``SyncManager`` constructed with no arguments."""
    manager = SyncManager()
    return manager
|
|
|