"""
Video processing utilities for EceMotion Pictures.

Enhanced text-to-video generation with robust error handling and fallbacks.
"""

import logging
import shutil
from typing import Optional

import numpy as np

from config import (
    MODEL_VIDEO, MODEL_CONFIGS, get_device, VHS_INTENSITY, SCANLINE_OPACITY,
    CHROMATIC_ABERRATION, FILM_GRAIN, get_safe_model_name
)

logger = logging.getLogger(__name__)

# Module-level pipeline cache so repeated calls reuse the loaded model.
t2v_pipe = None
current_model = None


def get_t2v_pipe(device: str, model_name: str = MODEL_VIDEO):
    """Get or create the T2V pipeline, with lazy loading and model switching."""
    global t2v_pipe, current_model

    safe_model_name = get_safe_model_name(model_name, "video")

    if t2v_pipe is None or current_model != safe_model_name:
        logger.info(f"Loading T2V model: {safe_model_name}")
        try:
            if "cogvideox" in safe_model_name.lower():
                t2v_pipe = _load_cogvideox(safe_model_name, device)
            else:
                t2v_pipe = _load_standard_t2v(safe_model_name, device)

            if t2v_pipe is not None:
                current_model = safe_model_name
                logger.info(f"T2V model {safe_model_name} loaded successfully")
            else:
                raise RuntimeError("Failed to load any T2V model")
        except Exception as e:
            logger.error(f"Failed to load {safe_model_name}: {e}")
            # Fall back to a small, widely available checkpoint.
            t2v_pipe = _load_standard_t2v("damo-vilab/text-to-video-ms-1.7b", device)
            current_model = "damo-vilab/text-to-video-ms-1.7b"
            if t2v_pipe is None:
                raise RuntimeError("Could not load any T2V model, including the fallback")

    return t2v_pipe


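# A minimal usage sketch: repeated calls with the same name reuse the cached
# pipeline, while a new name triggers a reload. The CogVideoX model id below
# is illustrative, not a value taken from config.
#
#     pipe = get_t2v_pipe("cuda")                        # loads MODEL_VIDEO
#     pipe = get_t2v_pipe("cuda")                        # cache hit, no reload
#     pipe = get_t2v_pipe("cuda", "THUDM/CogVideoX-2b")  # reloads with CogVideoX

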
def _load_cogvideox(model_name: str, device: str):
    """Load a CogVideoX model."""
    try:
        import torch
        from diffusers import CogVideoXPipeline

        if device == "auto":
            device = "cuda" if torch.cuda.is_available() else "cpu"

        # Half precision on GPU (the CogVideoX model cards suggest bfloat16);
        # float32 on CPU, where half-precision inference is poorly supported.
        pipe = CogVideoXPipeline.from_pretrained(
            model_name,
            torch_dtype=torch.bfloat16 if device == "cuda" else torch.float32,
            trust_remote_code=True
        )

        if device == "cuda":
            pipe = pipe.to(device)

        return pipe

    except Exception as e:
        logger.error(f"Failed to load CogVideoX: {e}")
        return None


def _load_standard_t2v(model_name: str, device: str):
    """Load a standard text-to-video diffusion model."""
    try:
        import torch
        from diffusers import TextToVideoSDPipeline

        if device == "auto":
            device = "cuda" if torch.cuda.is_available() else "cpu"

        # Half precision saves VRAM on GPU; CPU inference needs float32.
        pipe = TextToVideoSDPipeline.from_pretrained(
            model_name,
            torch_dtype=torch.float16 if device == "cuda" else torch.float32
        )

        if device == "cuda":
            pipe = pipe.to(device)

        return pipe

    except Exception as e:
        logger.error(f"Failed to load standard T2V: {e}")
        return None


def synth_t2v(prompt: str, seed: int, num_frames: int = 16, fps: int = 8,
              device: Optional[str] = None, model_name: str = MODEL_VIDEO):
    """
    Generate text-to-video with enhanced model support and frame control.
    """
    if device is None:
        device = get_device()

    pipe = get_t2v_pipe(device, model_name)
    model_config = MODEL_CONFIGS.get(current_model, {})

    # Clamp the requested frame count to the model's supported range.
    max_frames = model_config.get("max_frames", 16)
    min_frames = model_config.get("min_frames", 8)
    num_frames = max(min_frames, min(num_frames, max_frames))

    # Hard cap to keep generation within ZeroGPU time limits.
    if num_frames > 16:
        num_frames = 16
        logger.info(f"Reduced frame count to {num_frames} for ZeroGPU compatibility")

    logger.info(f"Generating {num_frames} frames at {fps}fps with {current_model}")

    try:
        import torch
        generator = torch.Generator(device=device).manual_seed(seed)

        if "cogvideox" in current_model.lower():
            # CogVideoX gets explicit guidance and a reduced step count here.
            result = pipe(
                prompt=prompt,
                num_frames=num_frames,
                generator=generator,
                guidance_scale=5.0,
                num_inference_steps=10
            )
        else:
            result = pipe(
                prompt=prompt,
                num_frames=num_frames,
                generator=generator
            )
        frames = result.frames

        frame_arrays = [np.asarray(frame) for frame in frames]
        # Some diffusers versions batch frames per prompt, yielding a single
        # 4-D (frames, h, w, c) array here; unwrap that singleton batch.
        if len(frame_arrays) == 1 and frame_arrays[0].ndim == 4:
            frame_arrays = list(frame_arrays[0])
        # moviepy expects uint8 RGB frames; pipelines may return floats in [0, 1].
        frame_arrays = [
            (np.clip(f, 0.0, 1.0) * 255).astype(np.uint8) if f.dtype != np.uint8 else f
            for f in frame_arrays
        ]

        from moviepy.editor import ImageSequenceClip
        clip = ImageSequenceClip(frame_arrays, fps=fps)

        logger.info(f"Generated video clip: {clip.duration:.2f}s, {len(frame_arrays)} frames")
        return clip

    except Exception as e:
        logger.error(f"Video generation failed: {e}")
        return _create_fallback_clip(prompt, num_frames, fps)


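# A minimal usage sketch (assumes the default model from config and enough
# VRAM; "raw.mp4" is an illustrative path):
#
#     clip = synth_t2v("neon-lit city at dusk, VHS aesthetic", seed=42,
#                      num_frames=16, fps=8)
#     clip.write_videofile("raw.mp4", codec="libx264", audio=False,
#                          verbose=False, logger=None)

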
def _create_fallback_clip(prompt: str, num_frames: int, fps: int):
    """Create a simple fallback clip when video generation fails."""
    try:
        from moviepy.editor import ColorClip

        # Solid-color placeholder matching the requested duration.
        duration = num_frames / fps
        clip = ColorClip(size=(640, 480), color=(100, 50, 200), duration=duration)

        logger.info(f"Created fallback clip: {clip.duration:.2f}s")
        return clip

    except Exception as e:
        logger.error(f"Failed to create fallback clip: {e}")
        # Last resort: a fixed five-second card.
        from moviepy.editor import ColorClip
        return ColorClip(size=(640, 480), color=(100, 50, 200), duration=5.0)


def apply_retro_filters(input_path: str, output_path: str, intensity: float = VHS_INTENSITY):
    """
    Apply authentic VHS/CRT effects with enhanced visual artifacts.
    """
    logger.info(f"Applying retro filters with intensity {intensity}")

    if not _check_ffmpeg():
        logger.warning("ffmpeg not available, using simple filter")
        _apply_simple_retro_filters(input_path, output_path)
        return

    try:
        import math
        import ffmpeg

        stream = ffmpeg.input(input_path)

        # Normalize the pixel format so later filters behave consistently.
        stream = stream.filter('format', 'yuv420p')

        # Color drift: slight desaturation plus brightness/contrast/gamma shifts.
        stream = stream.filter('hue', s=0.8 + 0.2 * intensity)
        stream = stream.filter(
            'eq',
            brightness=0.02 * intensity,
            contrast=1.0 + 0.1 * intensity,
            saturation=1.0 + 0.2 * intensity,
            gamma=1.0 - 0.05 * intensity
        )

        # Tape artifacts: frame-blend ghosting and temporal noise.
        if intensity > 0.3:
            stream = stream.filter('tblend', all_mode='difference',
                                   all_opacity=0.05 * intensity)
            stream = stream.filter('noise', alls=int(20 * intensity), allf='t')

        # Film grain (uniform noise).
        if FILM_GRAIN > 0:
            grain = FILM_GRAIN * intensity
            stream = stream.filter('noise', alls=int(15 * grain), allf='u')

        # CRT-style vignette. ffmpeg's vignette filter has no separate strength
        # option; the angle controls the falloff (stronger toward PI/2).
        vignette_angle = min(math.pi / 2, (math.pi / 4) * (1.0 + 0.3 * intensity))
        stream = stream.filter('vignette', angle=vignette_angle)

        stream = stream.output(
            output_path,
            vcodec='libx264',
            pix_fmt='yuv420p',
            crf=20,
            preset='medium',
            movflags='+faststart'
        )
        stream.overwrite_output().run(quiet=True)
        logger.info("Retro filters applied successfully")

    except Exception as e:
        logger.error(f"Failed to apply retro filters: {e}")
        _apply_simple_retro_filters(input_path, output_path)


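# For reference, the chain above corresponds roughly to this ffmpeg invocation
# (a sketch with intensity = 0.5, omitting the config-dependent film-grain
# pass; file names are illustrative):
#
#     ffmpeg -i in.mp4 -vf "format=yuv420p,hue=s=0.9,\
#         eq=brightness=0.01:contrast=1.05:saturation=1.1:gamma=0.975,\
#         tblend=all_mode=difference:all_opacity=0.025,noise=alls=10:allf=t,\
#         vignette=angle=0.90" \
#         -c:v libx264 -pix_fmt yuv420p -crf 20 -preset medium \
#         -movflags +faststart out.mp4

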
def _check_ffmpeg() -> bool:
    """Check if the ffmpeg binary is available on PATH."""
    try:
        import subprocess
        subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True)
        return True
    except (subprocess.CalledProcessError, FileNotFoundError):
        return False


def _apply_simple_retro_filters(input_path: str, output_path: str):
    """Fallback simple retro filter application."""
    try:
        import ffmpeg

        (
            ffmpeg
            .input(input_path)
            .filter('format', 'yuv420p')
            .filter('tblend', all_mode='difference', all_opacity=0.05)
            .filter('hue', s=0.9)
            .filter('eq', brightness=0.02, contrast=1.05, saturation=1.1, gamma=0.98)
            .filter('noise', alls=10)
            .output(output_path, vcodec='libx264', pix_fmt='yuv420p', crf=20, movflags='+faststart')
            .overwrite_output()
            .run(quiet=True)
        )
        logger.info("Simple retro filters applied as fallback")
    except Exception as e:
        logger.error(f"Even simple retro filters failed: {e}")
        # Last resort: pass the video through unfiltered.
        shutil.copy2(input_path, output_path)


def mux_audio(video_in: str, audio_in: str, out_path: str):
    """Mux video and audio with error handling."""
    try:
        if _check_ffmpeg():
            _mux_with_ffmpeg(video_in, audio_in, out_path)
        else:
            _mux_with_moviepy(video_in, audio_in, out_path)
    except Exception as e:
        logger.error(f"Audio muxing failed: {e}")
        # Last resort: ship the video without audio.
        shutil.copy2(video_in, out_path)


def _mux_with_ffmpeg(video_in: str, audio_in: str, out_path: str):
    """Mux using ffmpeg."""
    import ffmpeg

    # ffmpeg-python takes multiple inputs as separate streams passed to
    # output(); chaining .input() on a stream is not part of its API.
    video = ffmpeg.input(video_in)
    audio = ffmpeg.input(audio_in)
    (
        ffmpeg
        .output(video, audio, out_path, vcodec='copy', acodec='aac',
                audio_bitrate='128k', movflags='+faststart')
        .overwrite_output()
        .run(quiet=True)
    )


def _mux_with_moviepy(video_in: str, audio_in: str, out_path: str):
    """Mux using moviepy (fallback)."""
    from moviepy.editor import VideoFileClip, AudioFileClip, concatenate_audioclips

    video = VideoFileClip(video_in)
    audio = AudioFileClip(audio_in)

    # Match audio length to the video: trim long audio, pad short audio with silence.
    if audio.duration > video.duration:
        audio = audio.subclip(0, video.duration)
    elif audio.duration < video.duration:
        from moviepy.audio.AudioClip import AudioClip
        silence = AudioClip(lambda t: 0, duration=video.duration - audio.duration)
        # concatenate_audioclips is a module-level function, not a clip method.
        audio = concatenate_audioclips([audio, silence])

    final_video = video.set_audio(audio)
    final_video.write_videofile(
        out_path,
        codec='libx264',
        audio_codec='aac',
        temp_audiofile='temp-audio.m4a',
        remove_temp=True,
        verbose=False,
        logger=None
    )

    # Release file handles.
    video.close()
    audio.close()
    final_video.close()
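

# A minimal end-to-end sketch of the pipeline in this module. The file names
# and the existence of a narration track ("voiceover.wav") are assumptions
# for illustration, not part of the module's API.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # 1. Generate raw footage (falls back to a color card on failure).
    clip = synth_t2v("retro mall at midnight, VHS aesthetic", seed=7)
    clip.write_videofile("raw.mp4", codec="libx264", audio=False,
                         verbose=False, logger=None)

    # 2. Stylize, then mux in narration.
    apply_retro_filters("raw.mp4", "styled.mp4")
    mux_audio("styled.mp4", "voiceover.wav", "final.mp4")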