EceMotion_Pictures / utils_video.py
"""
Video processing utilities for EceMotion Pictures.
Enhanced text-to-video generation with robust error handling and fallbacks.
"""
import numpy as np
import logging
import os
import shutil
from typing import Optional, Tuple, List
from pathlib import Path
from config import (
MODEL_VIDEO, MODEL_CONFIGS, get_device, VHS_INTENSITY, SCANLINE_OPACITY,
CHROMATIC_ABERRATION, FILM_GRAIN, get_safe_model_name
)
logger = logging.getLogger(__name__)
# Global model cache
t2v_pipe = None
current_model = None
def get_t2v_pipe(device: str, model_name: str = MODEL_VIDEO):
    """Get or create the T2V pipeline, with lazy loading and model switching."""
    global t2v_pipe, current_model
    # Resolve to a known-safe model name
    safe_model_name = get_safe_model_name(model_name, "video")
    if t2v_pipe is None or current_model != safe_model_name:
        logger.info(f"Loading T2V model: {safe_model_name}")
        try:
            if "cogvideox" in safe_model_name.lower():
                # Try CogVideoX first
                t2v_pipe = _load_cogvideox(safe_model_name, device)
            else:
                # Use the standard diffusers pipeline
                t2v_pipe = _load_standard_t2v(safe_model_name, device)
            if t2v_pipe is not None:
                current_model = safe_model_name
                logger.info(f"T2V model {safe_model_name} loaded successfully")
            else:
                raise RuntimeError("Failed to load any T2V model")
        except Exception as e:
            logger.error(f"Failed to load {safe_model_name}: {e}")
            # Fall back to the original ModelScope model
            t2v_pipe = _load_standard_t2v("damo-vilab/text-to-video-ms-1.7b", device)
            current_model = "damo-vilab/text-to-video-ms-1.7b"
    return t2v_pipe
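
# Usage sketch (hedged): the pipeline is cached at module level, so repeated
# calls with the same model name are cheap, and switching names triggers a
# reload. The CogVideoX id below is only an illustrative example.
#
#   pipe = get_t2v_pipe(get_device())                         # loads MODEL_VIDEO once
#   pipe = get_t2v_pipe(get_device())                         # cache hit, no reload
#   pipe = get_t2v_pipe(get_device(), "THUDM/CogVideoX-2b")   # reloads with new model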
def _load_cogvideox(model_name: str, device: str):
    """Load a CogVideoX model."""
    try:
        import torch
        from diffusers import CogVideoXPipeline
        # Resolve "auto" to a concrete device before using it
        if device == "auto":
            device = "cuda" if torch.cuda.is_available() else "cpu"
        # Use an explicit torch.dtype; the string "auto" is not reliably
        # supported by diffusers' from_pretrained
        pipe = CogVideoXPipeline.from_pretrained(
            model_name,
            torch_dtype=torch.float16 if device == "cuda" else torch.float32,
            trust_remote_code=True
        )
        if device == "cuda":
            pipe = pipe.to(device)
        return pipe
    except Exception as e:
        logger.error(f"Failed to load CogVideoX: {e}")
        return None
def _load_standard_t2v(model_name: str, device: str):
    """Load a standard T2V model."""
    try:
        from diffusers import TextToVideoSDPipeline
        import torch
        # Fix device string - convert "auto" to a concrete device
        if device == "auto":
            device = "cuda" if torch.cuda.is_available() else "cpu"
        pipe = TextToVideoSDPipeline.from_pretrained(
            model_name,
            torch_dtype=torch.float16 if device == "cuda" else torch.float32
        )
        if device == "cuda":
            pipe = pipe.to(device)
        return pipe
    except Exception as e:
        logger.error(f"Failed to load standard T2V: {e}")
        return None
def synth_t2v(prompt: str, seed: int, num_frames: int = 16, fps: int = 8,
              device: str = None, model_name: str = MODEL_VIDEO):
    """
    Generate text-to-video with enhanced model support and frame control.
    """
    import torch
    if device is None:
        device = get_device()
    # Resolve "auto" so torch.Generator receives a real device string
    if device == "auto":
        device = "cuda" if torch.cuda.is_available() else "cpu"
    pipe = get_t2v_pipe(device, model_name)
    model_config = MODEL_CONFIGS.get(current_model, {})
    # Validate frame count against model limits (reduced for ZeroGPU)
    max_frames = model_config.get("max_frames", 16)
    min_frames = model_config.get("min_frames", 8)
    num_frames = max(min_frames, min(num_frames, max_frames))
    # Force a lower frame count to stay inside ZeroGPU timeout limits
    if num_frames > 16:
        num_frames = 16
        logger.info(f"Reduced frame count to {num_frames} for ZeroGPU compatibility")
    logger.info(f"Generating {num_frames} frames at {fps}fps with {current_model}")
    try:
        # Set up a seeded generator for reproducible output
        generator = torch.Generator(device=device).manual_seed(seed)
        if "cogvideox" in current_model.lower():
            # CogVideoX-specific generation (settings reduced for ZeroGPU speed)
            result = pipe(
                prompt=prompt,
                num_frames=num_frames,
                generator=generator,
                guidance_scale=5.0,
                num_inference_steps=10
            )
        else:
            # Standard pipeline
            result = pipe(
                prompt=prompt,
                num_frames=num_frames,
                generator=generator
            )
        # Recent diffusers returns .frames with a leading batch dimension
        # (result.frames[0] is the first video); older versions return the
        # frame sequence directly
        frames = result.frames
        if isinstance(frames, (list, tuple)) and len(frames) == 1:
            frames = frames[0]
        elif isinstance(frames, np.ndarray) and frames.ndim == 5:
            frames = frames[0]
        # Convert to uint8 numpy arrays (some pipelines return floats in [0, 1])
        frame_arrays = []
        for frame in frames:
            arr = np.asarray(frame)
            if arr.dtype != np.uint8:
                arr = (np.clip(arr, 0.0, 1.0) * 255).astype(np.uint8)
            frame_arrays.append(arr)
        # Create the clip using moviepy
        from moviepy.editor import ImageSequenceClip
        clip = ImageSequenceClip(frame_arrays, fps=fps)
        logger.info(f"Generated video clip: {clip.duration:.2f}s, {len(frame_arrays)} frames")
        return clip
    except Exception as e:
        logger.error(f"Video generation failed: {e}")
        # Return a simple fallback clip
        return _create_fallback_clip(prompt, num_frames, fps)
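
# Usage sketch (hedged): parameters beyond the prompt and seed are optional;
# the frame count is clamped to the model's limits and the ZeroGPU cap above.
#
#   clip = synth_t2v("a neon city at dusk, VHS aesthetic", seed=42,
#                    num_frames=16, fps=8)
#   clip.write_videofile("raw.mp4", codec="libx264", logger=None)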
def _create_fallback_clip(prompt: str, num_frames: int, fps: int):
    """Create a simple fallback clip when video generation fails."""
    try:
        from moviepy.editor import ColorClip
        # Plain colored background without text (avoids ImageMagick issues)
        duration = num_frames / fps
        clip = ColorClip(size=(640, 480), color=(100, 50, 200), duration=duration)
        logger.info(f"Created fallback clip: {clip.duration:.2f}s")
        return clip
    except Exception as e:
        logger.error(f"Failed to create fallback clip: {e}")
        # Last resort: a fixed-duration color clip
        from moviepy.editor import ColorClip
        return ColorClip(size=(640, 480), color=(100, 50, 200), duration=5.0)
def apply_retro_filters(input_path: str, output_path: str, intensity: float = VHS_INTENSITY):
    """
    Apply authentic VHS/CRT effects with enhanced visual artifacts.
    """
    logger.info(f"Applying retro filters with intensity {intensity}")
    # Check that the ffmpeg binary is available
    if not _check_ffmpeg():
        logger.warning("ffmpeg not available, using simple filter")
        _apply_simple_retro_filters(input_path, output_path)
        return
    try:
        import ffmpeg
        # Build the chain with ffmpeg-python filter calls directly. Building
        # filter strings and re-parsing them is fragile: "PI/4" cannot be
        # parsed with float(), and the eq gamma term was silently dropped.
        stream = ffmpeg.input(input_path)
        # 1. Format conversion
        stream = stream.filter('format', 'yuv420p')
        # 2. Basic color grading for the retro look
        stream = stream.filter('hue', s=0.8 + 0.2 * intensity)
        stream = stream.filter(
            'eq',
            brightness=0.02 * intensity,
            contrast=1.0 + 0.1 * intensity,
            saturation=1.0 + 0.2 * intensity,
            gamma=1.0 - 0.05 * intensity
        )
        # 3. VHS tracking lines and temporal noise
        if intensity > 0.3:
            stream = stream.filter('tblend', all_mode='difference',
                                   all_opacity=0.05 * intensity)
            stream = stream.filter('noise', alls=int(20 * intensity), allf='t')
        # 4. Film grain
        if FILM_GRAIN > 0:
            grain = FILM_GRAIN * intensity
            stream = stream.filter('noise', alls=int(15 * grain), allf='u')
        # 5. Vignetting (ffmpeg's vignette filter takes an angle expression;
        #    it has no "strength" option)
        stream = stream.filter('vignette', angle='PI/4')
        # Output with high quality settings
        stream = stream.output(
            output_path,
            vcodec='libx264',
            pix_fmt='yuv420p',
            crf=20,  # good quality
            preset='medium',
            movflags='+faststart'
        )
        stream.overwrite_output().run(quiet=True)
        logger.info("Retro filters applied successfully")
    except Exception as e:
        logger.error(f"Failed to apply retro filters: {e}")
        # Fall back to the simple filter chain
        _apply_simple_retro_filters(input_path, output_path)
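
# For reference, the chain above corresponds roughly to this raw ffmpeg
# invocation (values shown for intensity = 1.0 and FILM_GRAIN = 1.0, both
# assumed here for illustration):
#
#   ffmpeg -i in.mp4 -vf "format=yuv420p,hue=s=1.0,\
#     eq=brightness=0.02:contrast=1.1:saturation=1.2:gamma=0.95,\
#     tblend=all_mode=difference:all_opacity=0.05,noise=alls=20:allf=t,\
#     noise=alls=15:allf=u,vignette=angle=PI/4" \
#     -c:v libx264 -pix_fmt yuv420p -crf 20 -preset medium \
#     -movflags +faststart out.mp4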
def _check_ffmpeg() -> bool:
    """Check whether the ffmpeg binary is available."""
    try:
        import subprocess
        subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True)
        return True
    except (subprocess.CalledProcessError, FileNotFoundError):
        return False
def _apply_simple_retro_filters(input_path: str, output_path: str):
    """Fallback: apply a simple retro filter chain."""
    try:
        import ffmpeg
        (
            ffmpeg
            .input(input_path)
            .filter('format', 'yuv420p')
            .filter('tblend', all_mode='difference', all_opacity=0.05)
            .filter('hue', s=0.9)
            .filter('eq', brightness=0.02, contrast=1.05, saturation=1.1, gamma=0.98)
            .filter('noise', alls=10)
            .output(output_path, vcodec='libx264', pix_fmt='yuv420p', crf=20, movflags='+faststart')
            .overwrite_output()
            .run(quiet=True)
        )
        logger.info("Simple retro filters applied as fallback")
    except Exception as e:
        logger.error(f"Even simple retro filters failed: {e}")
        # Last resort: copy the input through unchanged
        shutil.copy2(input_path, output_path)
def mux_audio(video_in: str, audio_in: str, out_path: str):
    """Mux video and audio with error handling."""
    try:
        if _check_ffmpeg():
            _mux_with_ffmpeg(video_in, audio_in, out_path)
        else:
            _mux_with_moviepy(video_in, audio_in, out_path)
    except Exception as e:
        logger.error(f"Audio muxing failed: {e}")
        # Fallback: ship the video without audio
        shutil.copy2(video_in, out_path)
def _mux_with_ffmpeg(video_in: str, audio_in: str, out_path: str):
    """Mux using ffmpeg."""
    import ffmpeg
    # ffmpeg-python streams cannot be chained with .input(); each input needs
    # its own stream, and the two are combined at the output node
    video = ffmpeg.input(video_in)
    audio = ffmpeg.input(audio_in)
    (
        ffmpeg
        .output(video.video, audio.audio, out_path,
                vcodec='copy', acodec='aac', audio_bitrate='128k',
                movflags='+faststart')
        .overwrite_output()
        .run(quiet=True)
    )
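
# Equivalent raw ffmpeg command, for reference:
#   ffmpeg -i video.mp4 -i audio.wav -c:v copy -c:a aac -b:a 128k \
#     -movflags +faststart out.mp4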
def _mux_with_moviepy(video_in: str, audio_in: str, out_path: str):
    """Mux using moviepy (fallback)."""
    from moviepy.editor import VideoFileClip, AudioFileClip, concatenate_audioclips
    # Load video and audio
    video = VideoFileClip(video_in)
    audio = AudioFileClip(audio_in)
    # Match the audio duration to the video
    if audio.duration > video.duration:
        audio = audio.subclip(0, video.duration)
    elif audio.duration < video.duration:
        # Pad the audio with silence; note that concatenate_audioclips is a
        # module-level function, not a clip method
        from moviepy.audio.AudioClip import AudioClip
        silence = AudioClip(lambda t: 0, duration=video.duration - audio.duration)
        audio = concatenate_audioclips([audio, silence])
    # Combine and write
    final_video = video.set_audio(audio)
    final_video.write_videofile(
        out_path,
        codec='libx264',
        audio_codec='aac',
        temp_audiofile='temp-audio.m4a',
        remove_temp=True,
        verbose=False,
        logger=None
    )
    # Clean up
    video.close()
    audio.close()
    final_video.close()
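
# Minimal end-to-end sketch (hedged): ties the module together, assuming an
# existing narration track at "voiceover.wav" (hypothetical path) and write
# access to the current directory.
if __name__ == "__main__":
    clip = synth_t2v("retro commercial, neon grid, 1980s VHS look", seed=7)
    clip.write_videofile("raw.mp4", codec="libx264", fps=8, logger=None)
    apply_retro_filters("raw.mp4", "retro.mp4")
    mux_audio("retro.mp4", "voiceover.wav", "final.mp4")
    print("Wrote final.mp4")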