aducsdr's picture
Update aduc_framework/engineers/deformes4D.py
2f4cdcb verified
raw
history blame
12.8 kB
# aduc_framework/engineers/deformes4D.py
#
# Copyright (C) August 4, 2025 Carlos Rodrigues dos Santos
#
# Versão 3.0.2 (Framework-Compliant com Lógica de Geração Restaurada)
#
# Este engenheiro implementa a Câmera (Ψ) e o Destilador (Δ) da arquitetura
# ADUC-SDR. Ele orquestra a geração sequencial de fragmentos de vídeo com base
# em um conjunto de keyframes pré-definido.
import os
import time
import imageio
import numpy as np
import torch
import logging
from PIL import Image, ImageOps
import subprocess
import gc
import shutil
from pathlib import Path
from typing import List, Tuple, Dict, Any, Callable, Optional
# --- Imports Relativos Corrigidos ---
from ..types import LatentConditioningItem
from ..managers.ltx_manager import ltx_manager_singleton
from ..managers.latent_enhancer_manager import latent_enhancer_specialist_singleton
from ..managers.vae_manager import vae_manager_singleton
from .deformes2D_thinker import deformes2d_thinker_singleton
from ..managers.seedvr_manager import seedvr_manager_singleton
from ..managers.mmaudio_manager import mmaudio_manager_singleton
from ..tools.video_encode_tool import video_encode_tool_singleton
logger = logging.getLogger(__name__)
ProgressCallback = Optional[Callable[[float, str], None]]
class Deformes4DEngine:
"""
Orquestra a geração, pós-produção latente e renderização final de fragmentos de vídeo.
"""
def __init__(self):
"""O construtor é leve e não recebe argumentos."""
self.workspace_dir: Optional[str] = None
self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
logger.info("Deformes4DEngine instanciado (não inicializado).")
def initialize(self, workspace_dir: str):
"""Inicializa o engenheiro com as configurações necessárias."""
if self.workspace_dir is not None:
return # Evita reinicialização
self.workspace_dir = workspace_dir
os.makedirs(self.workspace_dir, exist_ok=True)
logger.info(f"Deformes4D Specialist (ADUC-SDR Executor) inicializado com workspace: {self.workspace_dir}.")
def generate_original_movie(
self,
full_generation_state: Dict[str, Any],
progress_callback: ProgressCallback = None
) -> Dict[str, Any]:
"""
Gera o filme principal lendo todos os parâmetros do estado de geração.
"""
if not self.workspace_dir:
raise RuntimeError("Deformes4DEngine não foi inicializado. Chame o método initialize() antes de usar.")
# 1. Extrai todos os parâmetros do estado de geração
pre_prod_params = full_generation_state.get("parametros_geracao", {}).get("pre_producao", {})
prod_params = full_generation_state.get("parametros_geracao", {}).get("producao", {})
keyframes_data = full_generation_state.get("Keyframe_atos", [])
global_prompt = full_generation_state.get("Promt_geral", "")
storyboard = [ato["resumo_ato"] for ato in full_generation_state.get("Atos", [])]
keyframe_paths = [kf["caminho_pixel"] for kf in keyframes_data]
seconds_per_fragment = pre_prod_params.get('duration_per_fragment', 4.0)
video_resolution = pre_prod_params.get('resolution', 480)
trim_percent = prod_params.get('trim_percent', 50)
handler_strength = prod_params.get('handler_strength', 0.5)
destination_convergence_strength = prod_params.get('destination_convergence_strength', 0.75)
guidance_scale = prod_params.get('guidance_scale', 2.0)
stg_scale = prod_params.get('stg_scale', 0.025)
num_inference_steps = prod_params.get('inference_steps', 20)
# 2. Inicia o processo de geração
FPS = 24
FRAMES_PER_LATENT_CHUNK = 8
LATENT_PROCESSING_CHUNK_SIZE = 4
run_timestamp = int(time.time())
temp_latent_dir = os.path.join(self.workspace_dir, f"temp_latents_{run_timestamp}")
temp_video_clips_dir = os.path.join(self.workspace_dir, f"temp_clips_{run_timestamp}")
os.makedirs(temp_latent_dir, exist_ok=True)
os.makedirs(temp_video_clips_dir, exist_ok=True)
total_frames_brutos = self._quantize_to_multiple(int(seconds_per_fragment * FPS), FRAMES_PER_LATENT_CHUNK)
frames_a_podar = self._quantize_to_multiple(int(total_frames_brutos * (trim_percent / 100)), FRAMES_PER_LATENT_CHUNK)
latents_a_podar = frames_a_podar // FRAMES_PER_LATENT_CHUNK
DEJAVU_FRAME_TARGET = frames_a_podar - 1 if frames_a_podar > 0 else 0
DESTINATION_FRAME_TARGET = total_frames_brutos - 1
base_ltx_params = {"guidance_scale": guidance_scale, "stg_scale": stg_scale, "num_inference_steps": num_inference_steps}
story_history = ""
target_resolution_tuple = (video_resolution, video_resolution)
eco_latent_for_next_loop, dejavu_latent_for_next_loop = None, None
latent_fragment_paths = []
video_fragments_data = []
if len(keyframe_paths) < 2:
raise ValueError(f"A geração requer pelo menos 2 keyframes. Fornecidos: {len(keyframe_paths)}.")
num_transitions_to_generate = len(keyframe_paths) - 1
logger.info("--- INICIANDO ESTÁGIO 1: Geração de Fragmentos Latentes ---")
for i in range(num_transitions_to_generate):
fragment_index = i + 1
if progress_callback:
progress_fraction = (i / num_transitions_to_generate) * 0.7 # Geração de latentes usa 70% do tempo
progress_callback(progress_fraction, f"Gerando Latente {fragment_index}/{num_transitions_to_generate}")
past_keyframe_path = keyframe_paths[i - 1] if i > 0 else keyframe_paths[i]
start_keyframe_path = keyframe_paths[i]
destination_keyframe_path = keyframe_paths[i + 1]
future_story_prompt = storyboard[i + 1] if (i + 1) < len(storyboard) else "A cena final."
decision = deformes2d_thinker_singleton.get_cinematic_decision(
global_prompt, story_history, past_keyframe_path, start_keyframe_path,
destination_keyframe_path, storyboard[i - 1] if i > 0 else "O início.",
storyboard[i], future_story_prompt
)
motion_prompt = decision["motion_prompt"]
story_history += f"\n- Ato {fragment_index}: {motion_prompt}"
conditioning_items = []
if eco_latent_for_next_loop is None:
img_start = self._preprocess_image_for_latent_conversion(Image.open(start_keyframe_path).convert("RGB"), target_resolution_tuple)
conditioning_items.append(LatentConditioningItem(self._pil_to_latent(img_start), 0, 1.0))
else:
conditioning_items.append(LatentConditioningItem(eco_latent_for_next_loop, 0, 1.0))
conditioning_items.append(LatentConditioningItem(dejavu_latent_for_next_loop, DEJAVU_FRAME_TARGET, handler_strength))
img_dest = self._preprocess_image_for_latent_conversion(Image.open(destination_keyframe_path).convert("RGB"), target_resolution_tuple)
conditioning_items.append(LatentConditioningItem(self._pil_to_latent(img_dest), DESTINATION_FRAME_TARGET, destination_convergence_strength))
latents_brutos, _ = ltx_manager_singleton.generate_latent_fragment(
height=video_resolution, width=video_resolution,
conditioning_items_data=conditioning_items, motion_prompt=motion_prompt,
video_total_frames=total_frames_brutos, video_fps=FPS,
**base_ltx_params
)
last_trim = latents_brutos[:, :, -(latents_a_podar+1):, :, :].clone()
eco_latent_for_next_loop = last_trim[:, :, :2, :, :].clone()
dejavu_latent_for_next_loop = last_trim[:, :, -1:, :, :].clone()
latents_video = latents_brutos[:, :, :-(latents_a_podar-1), :, :].clone()
del last_trim, latents_brutos; gc.collect(); torch.cuda.empty_cache()
cpu_latent = latents_video.cpu()
latent_path = os.path.join(temp_latent_dir, f"latent_fragment_{i:04d}.pt")
torch.save(cpu_latent, latent_path)
latent_fragment_paths.append(latent_path)
video_fragments_data.append({"id": i, "prompt_video": motion_prompt})
del latents_video, cpu_latent; gc.collect()
del eco_latent_for_next_loop, dejavu_latent_for_next_loop; gc.collect(); torch.cuda.empty_cache()
logger.info(f"--- INICIANDO ESTÁGIO 2: Processando {len(latent_fragment_paths)} latentes ---")
final_video_clip_paths = []
num_chunks = -(-len(latent_fragment_paths) // LATENT_PROCESSING_CHUNK_SIZE) if LATENT_PROCESSING_CHUNK_SIZE > 0 else 0
for i in range(num_chunks):
chunk_start_index = i * LATENT_PROCESSING_CHUNK_SIZE
chunk_end_index = chunk_start_index + LATENT_PROCESSING_CHUNK_SIZE
chunk_paths = latent_fragment_paths[chunk_start_index:chunk_end_index]
if progress_callback:
progress_fraction = 0.7 + (i / num_chunks * 0.28) # Decodificação usa 28% do tempo
progress_callback(progress_fraction, f"Processando & Decodificando Lote {i+1}/{num_chunks}")
tensors_in_chunk = [torch.load(p, map_location=self.device) for p in chunk_paths]
tensors_para_concatenar = [frag[:, :, :-1, :, :] if j < len(tensors_in_chunk) - 1 else frag for j, frag in enumerate(tensors_in_chunk)]
sub_group_latent = torch.cat(tensors_para_concatenar, dim=2)
del tensors_in_chunk, tensors_para_concatenar; gc.collect(); torch.cuda.empty_cache()
pixel_tensor = vae_manager_singleton.decode(sub_group_latent)
del sub_group_latent; gc.collect(); torch.cuda.empty_cache()
base_name = f"clip_{i:04d}_{run_timestamp}"
current_clip_path = os.path.join(temp_video_clips_dir, f"{base_name}.mp4")
self.save_video_from_tensor(pixel_tensor, current_clip_path, fps=FPS)
final_video_clip_paths.append(current_clip_path)
del pixel_tensor; gc.collect(); torch.cuda.empty_cache()
if progress_callback: progress_callback(0.98, "Montando o filme final...")
final_video_path = os.path.join(self.workspace_dir, f"original_movie_{run_timestamp}.mp4")
video_encode_tool_singleton.concatenate_videos(final_video_clip_paths, final_video_path, self.workspace_dir)
try:
shutil.rmtree(temp_video_clips_dir)
shutil.rmtree(temp_latent_dir)
except OSError as e:
logger.warning(f"Não foi possível remover diretórios temporários: {e}")
logger.info(f"Processo completo! Vídeo original salvo em: {final_video_path}")
# 3. Empacota os resultados para o Orchestrator
final_video_data_for_state = {
"id": 0,
"caminho_pixel": final_video_path,
"caminhos_latentes_fragmentos": latent_fragment_paths,
"fragmentos_componentes": video_fragments_data
}
return {
"final_path": final_video_path,
"latent_paths": latent_fragment_paths,
"video_data": final_video_data_for_state
}
def save_video_from_tensor(self, video_tensor: torch.Tensor, path: str, fps: int = 24):
if video_tensor is None or video_tensor.ndim != 5 or video_tensor.shape[2] == 0: return
video_tensor = video_tensor.squeeze(0).permute(1, 2, 3, 0)
video_tensor = (video_tensor.clamp(-1, 1) + 1) / 2.0
video_np = (video_tensor.detach().cpu().float().numpy() * 255).astype(np.uint8)
with imageio.get_writer(path, fps=fps, codec='libx264', quality=8, output_params=['-pix_fmt', 'yuv420p']) as writer:
for frame in video_np: writer.append_data(frame)
def _preprocess_image_for_latent_conversion(self, image: Image.Image, target_resolution: tuple) -> Image.Image:
if image.size != target_resolution:
return ImageOps.fit(image, target_resolution, Image.Resampling.LANCZOS)
return image
def _pil_to_latent(self, pil_image: Image.Image) -> torch.Tensor:
image_np = np.array(pil_image).astype(np.float32) / 255.0
tensor = torch.from_numpy(image_np).permute(2, 0, 1).unsqueeze(0).unsqueeze(2)
tensor = (tensor * 2.0) - 1.0
return vae_manager_singleton.encode(tensor)
def _quantize_to_multiple(self, n: int, m: int) -> int:
if m == 0: return n
quantized = int(round(n / m) * m)
return m if n > 0 and quantized == 0 else quantized