Aduc_sdr / deformes4D_engine.py
euiia's picture
Update deformes4D_engine.py
377170b verified
raw
history blame
20.4 kB
# deformes4D_engine.py
#
# Copyright (C) August 4, 2025 Carlos Rodrigues dos Santos
#
# Version: 2.0.1
#
# This file contains the Deformes4D Engine, which acts as the primary "Editor" or
# "Film Crew" specialist within the ADUC-SDR architecture. It implements the Camera (Ψ)
# and Distiller (Δ) concepts. Its core responsibilities include the low-level orchestration
# of video fragment generation (calling the LTX specialist), latent manipulation (calling
# the enhancer specialist), and final rendering/post-production tasks like HD mastering
# and audio generation. It executes the specific commands delegated by the AducOrchestrator.
import os
import time
import imageio
import numpy as np
import torch
import logging
from PIL import Image, ImageOps
from dataclasses import dataclass
import gradio as gr
import subprocess
import gc
import shutil
from pathlib import Path
from ltx_manager_helpers import ltx_manager_singleton
from gemini_helpers import gemini_singleton
from latent_enhancer_specialist import latent_enhancer_specialist_singleton
from hd_specialist import hd_specialist_singleton
from ltx_video.models.autoencoders.vae_encode import vae_encode, vae_decode
from audio_specialist import audio_specialist_singleton
logger = logging.getLogger(__name__)
@dataclass
class LatentConditioningItem:
"""Represents a conditioning anchor in the latent space for the Camera (Ψ)."""
latent_tensor: torch.Tensor
media_frame_number: int
conditioning_strength: float
class Deformes4DEngine:
"""
Implements the Camera (Ψ) and Distiller (Δ) of the ADUC-SDR architecture.
Orchestrates the generation, latent post-production, and final rendering of video fragments.
"""
def __init__(self, ltx_manager, workspace_dir="deformes_workspace"):
self.ltx_manager = ltx_manager
self.workspace_dir = workspace_dir
self._vae = None
self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
logger.info("Deformes4D Specialist (ADUC-SDR Executor) initialized.")
os.makedirs(self.workspace_dir, exist_ok=True)
@property
def vae(self):
if self._vae is None:
# Assumes the VAE from the first LTX worker is representative
self._vae = self.ltx_manager.workers[0].pipeline.vae
self._vae.to(self.device); self._vae.eval()
return self._vae
# --- HELPER METHODS ---
@torch.no_grad()
def pixels_to_latents(self, tensor: torch.Tensor) -> torch.Tensor:
"""Encodes a pixel-space tensor to the latent space using the VAE."""
tensor = tensor.to(self.device, dtype=self.vae.dtype)
return vae_encode(tensor, self.vae, vae_per_channel_normalize=True)
@torch.no_grad()
def latents_to_pixels(self, latent_tensor: torch.Tensor, decode_timestep: float = 0.05) -> torch.Tensor:
"""Decodes a latent-space tensor to pixels using the VAE."""
latent_tensor = latent_tensor.to(self.device, dtype=self.vae.dtype)
timestep_tensor = torch.tensor([decode_timestep] * latent_tensor.shape[0], device=self.device, dtype=latent_tensor.dtype)
return vae_decode(latent_tensor, self.vae, is_video=True, timestep=timestep_tensor, vae_per_channel_normalize=True)
def save_video_from_tensor(self, video_tensor: torch.Tensor, path: str, fps: int = 24):
"""Saves a pixel-space tensor as an MP4 video file."""
if video_tensor is None or video_tensor.ndim != 5 or video_tensor.shape[2] == 0: return
video_tensor = video_tensor.squeeze(0).permute(1, 2, 3, 0)
video_tensor = (video_tensor.clamp(-1, 1) + 1) / 2.0
video_np = (video_tensor.detach().cpu().float().numpy() * 255).astype(np.uint8)
with imageio.get_writer(path, fps=fps, codec='libx264', quality=8, output_params=['-pix_fmt', 'yuv420p']) as writer:
for frame in video_np: writer.append_data(frame)
def _preprocess_image_for_latent_conversion(self, image: Image.Image, target_resolution: tuple) -> Image.Image:
"""Resizes and fits an image to the target resolution for VAE encoding."""
if image.size != target_resolution:
return ImageOps.fit(image, target_resolution, Image.Resampling.LANCZOS)
return image
def pil_to_latent(self, pil_image: Image.Image) -> torch.Tensor:
"""Converts a PIL Image to a latent tensor."""
image_np = np.array(pil_image).astype(np.float32) / 255.0
tensor = torch.from_numpy(image_np).permute(2, 0, 1).unsqueeze(0).unsqueeze(2)
tensor = (tensor * 2.0) - 1.0
return self.pixels_to_latents(tensor)
def concatenate_videos_ffmpeg(self, video_paths: list[str], output_path: str):
"""Concatenates multiple video clips into a single file using FFmpeg."""
if not video_paths: raise gr.Error("No video fragments to assemble.")
list_file_path = os.path.join(self.workspace_dir, "concat_list.txt")
with open(list_file_path, 'w', encoding='utf-8') as f:
for path in video_paths: f.write(f"file '{os.path.abspath(path)}'\n")
cmd_list = ['ffmpeg', '-y', '-hwaccel', 'auto', '-f', 'concat', '-safe', '0', '-i', list_file_path, '-c', 'copy', output_path]
logger.info(f"Concatenating {len(video_paths)} video clips into {output_path}...")
try:
subprocess.run(cmd_list, check=True, capture_output=True, text=True)
except subprocess.CalledProcessError as e:
logger.error(f"FFmpeg error: {e.stderr}")
logger.info("Attempting concatenation again without hardware acceleration...")
cmd_list = ['ffmpeg', '-y', '-f', 'concat', '-safe', '0', '-i', list_file_path, '-c', 'copy', output_path]
try:
subprocess.run(cmd_list, check=True, capture_output=True, text=True)
except subprocess.CalledProcessError as e_fallback:
logger.error(f"FFmpeg error (fallback): {e_fallback.stderr}")
raise gr.Error(f"Failed to assemble the final video. Details: {e_fallback.stderr}")
# --- CORE ADUC-SDR LOGIC ---
def generate_original_movie(self, keyframes: list, global_prompt: str, storyboard: list,
seconds_per_fragment: float, trim_percent: int,
handler_strength: float, destination_convergence_strength: float,
video_resolution: int, use_continuity_director: bool,
guidance_scale: float, stg_scale: float, num_inference_steps: int,
progress: gr.Progress = gr.Progress()):
"""
Step 3: Production. Generates the original master video from keyframes.
"""
FPS = 24
FRAMES_PER_LATENT_CHUNK = 8
LATENT_PROCESSING_CHUNK_SIZE = 4
run_timestamp = int(time.time())
temp_latent_dir = os.path.join(self.workspace_dir, f"temp_latents_{run_timestamp}")
temp_video_clips_dir = os.path.join(self.workspace_dir, f"temp_clips_{run_timestamp}")
os.makedirs(temp_latent_dir, exist_ok=True)
os.makedirs(temp_video_clips_dir, exist_ok=True)
total_frames_brutos = self._quantize_to_multiple(int(seconds_per_fragment * FPS), FRAMES_PER_LATENT_CHUNK)
frames_a_podar = self._quantize_to_multiple(int(total_frames_brutos * (trim_percent / 100)), FRAMES_PER_LATENT_CHUNK)
latents_a_podar = frames_a_podar // FRAMES_PER_LATENT_CHUNK
DEJAVU_FRAME_TARGET = frames_a_podar - 1 if frames_a_podar > 0 else 0
DESTINATION_FRAME_TARGET = total_frames_brutos - 1
base_ltx_params = {"guidance_scale": guidance_scale, "stg_scale": stg_scale, "num_inference_steps": num_inference_steps, "rescaling_scale": 0.15, "image_cond_noise_scale": 0.00}
keyframe_paths = [item[0] if isinstance(item, tuple) else item for item in keyframes]
story_history = ""
target_resolution_tuple = (video_resolution, video_resolution)
eco_latent_for_next_loop, dejavu_latent_for_next_loop = None, None
latent_fragment_paths = []
if len(keyframe_paths) < 2: raise gr.Error(f"Generation requires at least 2 keyframes. You provided {len(keyframe_paths)}.")
num_transitions_to_generate = len(keyframe_paths) - 1
logger.info("--- STARTING STAGE 1: Latent Fragment Generation ---")
for i in range(num_transitions_to_generate):
fragment_index = i + 1
progress(i / num_transitions_to_generate, desc=f"Generating Latent {fragment_index}/{num_transitions_to_generate}")
past_keyframe_path = keyframe_paths[i - 1] if i > 0 else keyframe_paths[i]
start_keyframe_path = keyframe_paths[i]
destination_keyframe_path = keyframe_paths[i + 1]
future_story_prompt = storyboard[i + 1] if (i + 1) < len(storyboard) else "The final scene."
logger.info(f"Calling Gemini to generate cinematic decision for fragment {fragment_index}...")
decision = gemini_singleton.get_cinematic_decision(global_prompt, story_history, past_keyframe_path, start_keyframe_path, destination_keyframe_path, storyboard[i - 1] if i > 0 else "The beginning.", storyboard[i], future_story_prompt)
transition_type, motion_prompt = decision["transition_type"], decision["motion_prompt"]
story_history += f"\n- Act {fragment_index}: {motion_prompt}"
conditioning_items = []
if eco_latent_for_next_loop is None:
img_start = self._preprocess_image_for_latent_conversion(Image.open(start_keyframe_path).convert("RGB"), target_resolution_tuple)
conditioning_items.append(LatentConditioningItem(self.pil_to_latent(img_start), 0, 1.0))
else:
conditioning_items.append(LatentConditioningItem(eco_latent_for_next_loop, 0, 1.0))
conditioning_items.append(LatentConditioningItem(dejavu_latent_for_next_loop, DEJAVU_FRAME_TARGET, handler_strength))
img_dest = self._preprocess_image_for_latent_conversion(Image.open(destination_keyframe_path).convert("RGB"), target_resolution_tuple)
conditioning_items.append(LatentConditioningItem(self.pil_to_latent(img_dest), DESTINATION_FRAME_TARGET, destination_convergence_strength))
current_ltx_params = {**base_ltx_params, "motion_prompt": motion_prompt}
logger.info(f"Calling LTX to generate video latents for fragment {fragment_index} ({total_frames_brutos} frames)...")
latents_brutos, _ = self._generate_latent_tensor_internal(conditioning_items, current_ltx_params, target_resolution_tuple, total_frames_brutos)
num_latent_frames = latents_brutos.shape[2]
logger.info(f"LTX responded with a latent tensor of shape {latents_brutos.shape}, representing ~{num_latent_frames * 8 + 1} video frames at {FPS} FPS.")
last_trim = latents_brutos[:, :, -(latents_a_podar+1):, :, :].clone()
eco_latent_for_next_loop = last_trim[:, :, :2, :, :].clone()
dejavu_latent_for_next_loop = last_trim[:, :, -1:, :, :].clone()
latents_video = latents_brutos[:, :, :-(latents_a_podar-1), :, :].clone()
latents_video = latents_video[:, :, 1:, :, :]
del last_trim, latents_brutos; gc.collect(); torch.cuda.empty_cache()
if transition_type == "cut":
eco_latent_for_next_loop, dejavu_latent_for_next_loop = None, None
cpu_latent = latents_video.cpu()
latent_path = os.path.join(temp_latent_dir, f"latent_fragment_{i:04d}.pt")
torch.save(cpu_latent, latent_path)
latent_fragment_paths.append(latent_path)
del latents_video, cpu_latent; gc.collect()
del eco_latent_for_next_loop, dejavu_latent_for_next_loop; gc.collect(); torch.cuda.empty_cache()
logger.info(f"--- STARTING STAGE 2: Processing {len(latent_fragment_paths)} latents in chunks of {LATENT_PROCESSING_CHUNK_SIZE} ---")
final_video_clip_paths = []
num_chunks = -(-len(latent_fragment_paths) // LATENT_PROCESSING_CHUNK_SIZE)
for i in range(num_chunks):
chunk_start_index = i * LATENT_PROCESSING_CHUNK_SIZE
chunk_end_index = chunk_start_index + LATENT_PROCESSING_CHUNK_SIZE
chunk_paths = latent_fragment_paths[chunk_start_index:chunk_end_index]
progress(i / num_chunks, desc=f"Processing & Decoding Batch {i+1}/{num_chunks}")
tensors_in_chunk = [torch.load(p, map_location=self.device) for p in chunk_paths]
tensors_para_concatenar = [frag[:, :, :-1, :, :] if j < len(tensors_in_chunk) - 1 else frag for j, frag in enumerate(tensors_in_chunk)]
sub_group_latent = torch.cat(tensors_para_concatenar, dim=2)
del tensors_in_chunk, tensors_para_concatenar; gc.collect(); torch.cuda.empty_cache()
logger.info(f"Batch {i+1} concatenated. Latent shape: {sub_group_latent.shape}")
base_name = f"clip_{i:04d}_{run_timestamp}"
current_clip_path = os.path.join(temp_video_clips_dir, f"{base_name}.mp4")
pixel_tensor = self.latents_to_pixels(sub_group_latent)
self.save_video_from_tensor(pixel_tensor, current_clip_path, fps=FPS)
del pixel_tensor, sub_group_latent; gc.collect(); torch.cuda.empty_cache()
final_video_clip_paths.append(current_clip_path)
progress(0.98, desc="Final assembly of clips...")
final_video_path = os.path.join(self.workspace_dir, f"original_movie_{run_timestamp}.mp4")
self.concatenate_videos_ffmpeg(final_video_clip_paths, final_video_path)
logger.info("Cleaning up temporary clip files...")
try:
shutil.rmtree(temp_video_clips_dir)
except OSError as e:
logger.warning(f"Could not remove temporary clip directory: {e}")
logger.info(f"Process complete! Original video saved to: {final_video_path}")
return {"final_path": final_video_path, "latent_paths": latent_fragment_paths}
def upscale_latents_and_create_video(self, latent_paths: list, chunk_size: int, progress: gr.Progress):
if not latent_paths:
raise gr.Error("Cannot perform upscaling: no latent paths were provided.")
logger.info("--- STARTING POST-PRODUCTION: Latent Upscaling ---")
run_timestamp = int(time.time())
temp_upscaled_clips_dir = os.path.join(self.workspace_dir, f"temp_upscaled_clips_{run_timestamp}")
os.makedirs(temp_upscaled_clips_dir, exist_ok=True)
final_upscaled_clip_paths = []
num_chunks = -(-len(latent_paths) // chunk_size)
for i in range(num_chunks):
chunk_start_index = i * chunk_size
chunk_end_index = chunk_start_index + chunk_size
chunk_paths = latent_paths[chunk_start_index:chunk_end_index]
progress(i / num_chunks, desc=f"Upscaling & Decoding Batch {i+1}/{num_chunks}")
tensors_in_chunk = [torch.load(p, map_location=self.device) for p in chunk_paths]
tensors_para_concatenar = [frag[:, :, :-1, :, :] if j < len(tensors_in_chunk) - 1 else frag for j, frag in enumerate(tensors_in_chunk)]
sub_group_latent = torch.cat(tensors_para_concatenar, dim=2)
del tensors_in_chunk, tensors_para_concatenar; gc.collect(); torch.cuda.empty_cache()
logger.info(f"Batch {i+1} loaded. Original latent shape: {sub_group_latent.shape}")
upscaled_latent_chunk = latent_enhancer_specialist_singleton.upscale(sub_group_latent)
del sub_group_latent; gc.collect(); torch.cuda.empty_cache()
logger.info(f"Batch {i+1} upscaled. New latent shape: {upscaled_latent_chunk.shape}")
pixel_tensor = self.latents_to_pixels(upscaled_latent_chunk)
del upscaled_latent_chunk; gc.collect(); torch.cuda.empty_cache()
base_name = f"upscaled_clip_{i:04d}_{run_timestamp}"
current_clip_path = os.path.join(temp_upscaled_clips_dir, f"{base_name}.mp4")
self.save_video_from_tensor(pixel_tensor, current_clip_path, fps=24)
final_upscaled_clip_paths.append(current_clip_path)
del pixel_tensor; gc.collect(); torch.cuda.empty_cache()
logger.info(f"Saved upscaled clip: {Path(current_clip_path).name}")
progress(0.98, desc="Assembling upscaled clips...")
final_video_path = os.path.join(self.workspace_dir, f"upscaled_movie_{run_timestamp}.mp4")
self.concatenate_videos_ffmpeg(final_upscaled_clip_paths, final_video_path)
logger.info("Cleaning up temporary upscaled clip files...")
try:
shutil.rmtree(temp_upscaled_clips_dir)
concat_list_path = os.path.join(self.workspace_dir, "concat_list.txt")
if os.path.exists(concat_list_path): os.remove(concat_list_path)
except OSError as e:
logger.warning(f"Could not remove temporary upscaled clip directory: {e}")
logger.info(f"Latent upscaling complete! Final video at: {final_video_path}")
yield {"final_path": final_video_path}
def master_video_hd(self, source_video_path: str, model_version: str, steps: int, prompt: str, progress: gr.Progress):
logger.info(f"--- STARTING POST-PRODUCTION: HD Mastering with SeedVR {model_version} ---")
progress(0.1, desc=f"Preparing for HD Mastering with SeedVR {model_version}...")
run_timestamp = int(time.time())
output_path = os.path.join(self.workspace_dir, f"hd_mastered_movie_{model_version}_{run_timestamp}.mp4")
try:
final_path = hd_specialist_singleton.process_video(
input_video_path=source_video_path,
output_video_path=output_path,
prompt=prompt,
model_version=model_version,
steps=steps,
progress=progress
)
logger.info(f"HD Mastering complete! Final video at: {final_path}")
yield {"final_path": final_path}
except Exception as e:
logger.error(f"HD Mastering failed: {e}", exc_info=True)
raise gr.Error(f"HD Mastering failed. Details: {e}")
def generate_audio_for_final_video(self, source_video_path: str, audio_prompt: str, progress: gr.Progress):
logger.info(f"--- STARTING POST-PRODUCTION: Audio Generation ---")
progress(0.1, desc="Preparing for audio generation...")
run_timestamp = int(time.time())
try:
result = subprocess.run(
["ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", source_video_path],
capture_output=True, text=True, check=True)
duration = float(result.stdout.strip())
logger.info(f"Source video duration: {duration:.2f} seconds.")
progress(0.5, desc="Generating audio track...")
output_path = os.path.join(self.workspace_dir, f"final_movie_with_audio_{run_timestamp}.mp4")
final_path = audio_specialist_singleton.generate_audio_for_video(
video_path=source_video_path,
prompt=audio_prompt,
duration_seconds=duration,
output_path_override=output_path
)
logger.info(f"Audio generation complete! Final video with audio at: {final_path}")
progress(1.0, desc="Audio generation complete!")
yield {"final_path": final_path}
except Exception as e:
logger.error(f"Audio generation failed: {e}", exc_info=True)
raise gr.Error(f"Audio generation failed. Details: {e}")
def _generate_latent_tensor_internal(self, conditioning_items, ltx_params, target_resolution, total_frames_to_generate):
final_ltx_params = {**ltx_params, 'width': target_resolution[0], 'height': target_resolution[1], 'video_total_frames': total_frames_to_generate, 'video_fps': 24, 'current_fragment_index': int(time.time()), 'conditioning_items_data': conditioning_items}
return self.ltx_manager.generate_latent_fragment(**final_ltx_params)
def _quantize_to_multiple(self, n, m):
if m == 0: return n
quantized = int(round(n / m) * m)
return m if n > 0 and quantized == 0 else quantized