aducsdr's picture
Update aduc_framework/engineers/deformes4D.py
41d9d43 verified
# aduc_framework/engineers/deformes4D.py
#
# Copyright (C) August 4, 2025 Carlos Rodrigues dos Santos
#
# Versão 3.1.1 (Com correção de limpeza de arquivos)
#
# Este engenheiro implementa a Câmera (Ψ) e o Destilador (Δ) da arquitetura
# ADUC-SDR. Sua única responsabilidade é a geração sequencial de fragmentos de
# vídeo com base em um conjunto de keyframes pré-definido.
import os
import time
import imageio
import numpy as np
import torch
import logging
from PIL import Image, ImageOps
import gc
import shutil
from pathlib import Path
from typing import List, Tuple, Dict, Any, Callable, Optional
# --- Imports Relativos Corrigidos ---
from ..types import LatentConditioningItem
from ..managers.ltx_manager import ltx_manager_singleton
from ..managers.vae_manager import vae_manager_singleton
from .deformes2D_thinker import deformes2d_thinker_singleton
from ..tools.video_encode_tool import video_encode_tool_singleton
logger = logging.getLogger(__name__)
ProgressCallback = Optional[Callable[[float, str], None]]
class Deformes4DEngine:
"""
Orquestra a geração e concatenação de fragmentos de vídeo.
"""
def __init__(self):
"""O construtor é leve e não recebe argumentos."""
self.workspace_dir: Optional[str] = None
self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
logger.info("Deformes4DEngine instanciado (não inicializado).")
def initialize(self, workspace_dir: str):
"""Inicializa o engenheiro com as configurações necessárias."""
if self.workspace_dir is not None:
return # Evita reinicialização
self.workspace_dir = workspace_dir
os.makedirs(self.workspace_dir, exist_ok=True)
logger.info(f"Deformes4D Specialist (Executor) inicializado com workspace: {self.workspace_dir}.")
def generate_original_movie(
self,
full_generation_state: Dict[str, Any],
progress_callback: ProgressCallback = None
) -> Dict[str, Any]:
"""
Gera o filme principal lendo todos os parâmetros do estado de geração.
"""
if not self.workspace_dir:
raise RuntimeError("Deformes4DEngine não foi inicializado. Chame o método initialize() antes de usar.")
# 1. Extrai todos os parâmetros do estado de geração
pre_prod_params = full_generation_state.get("parametros_geracao", {}).get("pre_producao", {})
prod_params = full_generation_state.get("parametros_geracao", {}).get("producao", {})
keyframes_data = full_generation_state.get("Keyframe_atos", [])
global_prompt = full_generation_state.get("Promt_geral", "")
storyboard = [ato["resumo_ato"] for ato in full_generation_state.get("Atos", [])]
keyframe_paths = [kf["caminho_pixel"] for kf in keyframes_data]
seconds_per_fragment = pre_prod_params.get('duration_per_fragment', 4.0)
video_resolution = pre_prod_params.get('resolution', 480)
trim_percent = prod_params.get('trim_percent', 50)
handler_strength = prod_params.get('handler_strength', 0.5)
destination_convergence_strength = prod_params.get('destination_convergence_strength', 0.75)
guidance_scale = prod_params.get('guidance_scale', 2.0)
stg_scale = prod_params.get('stg_scale', 0.025)
num_inference_steps = prod_params.get('inference_steps', 20)
# 2. Inicia o processo de geração
FPS = 24
FRAMES_PER_LATENT_CHUNK = 8
LATENT_PROCESSING_CHUNK_SIZE = 4
run_timestamp = int(time.time())
temp_latent_dir = os.path.join(self.workspace_dir, f"temp_latents_{run_timestamp}")
temp_video_clips_dir = os.path.join(self.workspace_dir, f"temp_clips_{run_timestamp}")
os.makedirs(temp_latent_dir, exist_ok=True)
os.makedirs(temp_video_clips_dir, exist_ok=True)
total_frames_brutos = self._quantize_to_multiple(int(seconds_per_fragment * FPS), FRAMES_PER_LATENT_CHUNK)
frames_a_podar = self._quantize_to_multiple(int(total_frames_brutos * (trim_percent / 100)), FRAMES_PER_LATENT_CHUNK)
latents_a_podar = frames_a_podar // FRAMES_PER_LATENT_CHUNK
DEJAVU_FRAME_TARGET = frames_a_podar - 1 if frames_a_podar > 0 else 0
DESTINATION_FRAME_TARGET = total_frames_brutos - 1
base_ltx_params = {"guidance_scale": guidance_scale, "stg_scale": stg_scale, "num_inference_steps": num_inference_steps}
story_history = ""
target_resolution_tuple = (video_resolution, video_resolution)
eco_latent_for_next_loop, dejavu_latent_for_next_loop = None, None
latent_fragment_paths = []
video_fragments_data = []
if len(keyframe_paths) < 2:
raise ValueError(f"A geração requer pelo menos 2 keyframes. Fornecidos: {len(keyframe_paths)}.")
num_transitions_to_generate = len(keyframe_paths) - 1
logger.info("--- ESTÁGIO 1: Geração de Fragmentos Latentes ---")
for i in range(num_transitions_to_generate):
fragment_index = i + 1
if progress_callback:
progress_fraction = (i / num_transitions_to_generate) * 0.7
progress_callback(progress_fraction, f"Gerando Latente {fragment_index}/{num_transitions_to_generate}")
past_keyframe_path = keyframe_paths[i - 1] if i > 0 else keyframe_paths[i]
start_keyframe_path = keyframe_paths[i]
destination_keyframe_path = keyframe_paths[i + 1]
future_story_prompt = storyboard[i + 1] if (i + 1) < len(storyboard) else "A cena final."
decision = deformes2d_thinker_singleton.get_cinematic_decision(
global_prompt, story_history, past_keyframe_path, start_keyframe_path,
destination_keyframe_path, storyboard[i - 1] if i > 0 else "O início.",
storyboard[i], future_story_prompt
)
motion_prompt = decision["motion_prompt"]
story_history += f"\n- Ato {fragment_index}: {motion_prompt}"
conditioning_items = []
if eco_latent_for_next_loop is None:
img_start = self._preprocess_image_for_latent_conversion(Image.open(start_keyframe_path).convert("RGB"), target_resolution_tuple)
conditioning_items.append(LatentConditioningItem(self._pil_to_latent(img_start), 0, 1.0))
else:
conditioning_items.append(LatentConditioningItem(eco_latent_for_next_loop, 0, 1.0))
conditioning_items.append(LatentConditioningItem(dejavu_latent_for_next_loop, DEJAVU_FRAME_TARGET, handler_strength))
img_dest = self._preprocess_image_for_latent_conversion(Image.open(destination_keyframe_path).convert("RGB"), target_resolution_tuple)
conditioning_items.append(LatentConditioningItem(self._pil_to_latent(img_dest), DESTINATION_FRAME_TARGET, destination_convergence_strength))
latents_brutos, _ = ltx_manager_singleton.generate_latent_fragment(
height=video_resolution, width=video_resolution,
conditioning_items_data=conditioning_items, motion_prompt=motion_prompt,
video_total_frames=total_frames_brutos, video_fps=FPS,
**base_ltx_params
)
last_trim = latents_brutos[:, :, -(latents_a_podar+1):, :, :].clone()
eco_latent_for_next_loop = last_trim[:, :, :2, :, :].clone()
dejavu_latent_for_next_loop = last_trim[:, :, -1:, :, :].clone()
latents_video = latents_brutos[:, :, :-(latents_a_podar-1), :, :].clone()
del last_trim, latents_brutos; gc.collect(); torch.cuda.empty_cache()
cpu_latent = latents_video.cpu()
latent_path = os.path.join(temp_latent_dir, f"latent_fragment_{i:04d}.pt")
torch.save(cpu_latent, latent_path)
latent_fragment_paths.append(latent_path)
video_fragments_data.append({"id": i, "prompt_video": motion_prompt})
del latents_video, cpu_latent; gc.collect()
del eco_latent_for_next_loop, dejavu_latent_for_next_loop; gc.collect(); torch.cuda.empty_cache()
logger.info(f"--- ESTÁGIO 2: Processando {len(latent_fragment_paths)} latentes ---")
final_video_clip_paths = []
num_chunks = -(-len(latent_fragment_paths) // LATENT_PROCESSING_CHUNK_SIZE) if LATENT_PROCESSING_CHUNK_SIZE > 0 else 0
for i in range(num_chunks):
chunk_start_index = i * LATENT_PROCESSING_CHUNK_SIZE
chunk_end_index = chunk_start_index + LATENT_PROCESSING_CHUNK_SIZE
chunk_paths = latent_fragment_paths[chunk_start_index:chunk_end_index]
if progress_callback:
progress_fraction = 0.7 + (i / num_chunks * 0.28)
progress_callback(progress_fraction, f"Processando & Decodificando Lote {i+1}/{num_chunks}")
tensors_in_chunk = [torch.load(p, map_location=self.device) for p in chunk_paths]
sub_group_latent = torch.cat(tensors_in_chunk, dim=2)
del tensors_in_chunk; gc.collect(); torch.cuda.empty_cache()
pixel_tensor = vae_manager_singleton.decode(sub_group_latent)
del sub_group_latent; gc.collect(); torch.cuda.empty_cache()
base_name = f"clip_{i:04d}_{run_timestamp}"
current_clip_path = os.path.join(temp_video_clips_dir, f"{base_name}.mp4")
self.save_video_from_tensor(pixel_tensor, current_clip_path, fps=FPS)
final_video_clip_paths.append(current_clip_path)
del pixel_tensor; gc.collect(); torch.cuda.empty_cache()
if progress_callback: progress_callback(0.98, "Montando o filme final...")
final_video_path = os.path.join(self.workspace_dir, f"original_movie_{run_timestamp}.mp4")
video_encode_tool_singleton.concatenate_videos(final_video_clip_paths, final_video_path, self.workspace_dir)
try:
shutil.rmtree(temp_video_clips_dir)
# A linha que apagava 'temp_latent_dir' foi removida para persistir os latentes.
except OSError as e:
logger.warning(f"Não foi possível remover o diretório de clipes temporários: {e}")
logger.info(f"Processo completo! Vídeo original salvo em: {final_video_path}")
final_video_data_for_state = {
"id": 0, "caminho_pixel": final_video_path,
"caminhos_latentes_fragmentos": latent_fragment_paths,
"fragmentos_componentes": video_fragments_data
}
return {
"final_path": final_video_path,
"latent_paths": latent_fragment_paths,
"video_data": final_video_data_for_state
}
# --- FUNÇÕES HELPER ---
def save_video_from_tensor(self, video_tensor: torch.Tensor, path: str, fps: int = 24):
if video_tensor is None or video_tensor.ndim != 5 or video_tensor.shape[2] == 0: return
video_tensor = video_tensor.squeeze(0).permute(1, 2, 3, 0)
video_tensor = (video_tensor.clamp(-1, 1) + 1) / 2.0
video_np = (video_tensor.detach().cpu().float().numpy() * 255).astype(np.uint8)
with imageio.get_writer(path, fps=fps, codec='libx264', quality=8, output_params=['-pix_fmt', 'yuv420p']) as writer:
for frame in video_np: writer.append_data(frame)
def _preprocess_image_for_latent_conversion(self, image: Image.Image, target_resolution: tuple) -> Image.Image:
if image.size != target_resolution:
return ImageOps.fit(image, target_resolution, Image.Resampling.LANCZOS)
return image
def _pil_to_latent(self, pil_image: Image.Image) -> torch.Tensor:
image_np = np.array(pil_image).astype(np.float32) / 255.0
tensor = torch.from_numpy(image_np).permute(2, 0, 1).unsqueeze(0).unsqueeze(2)
tensor = (tensor * 2.0) - 1.0
return vae_manager_singleton.encode(tensor)
def _quantize_to_multiple(self, n: int, m: int) -> int:
if m == 0: return n
quantized = int(round(n / m) * m)
return m if n > 0 and quantized == 0 else quantized