# aduc_framework/engineers/deformes4D.py
#
# Copyright (C) August 4, 2025 Carlos Rodrigues dos Santos
#
# Version 3.1.1 (with file cleanup fix)
#
# This engineer implements the Camera (Ψ) and the Distiller (Δ) of the
# ADUC-SDR architecture. Its sole responsibility is the sequential generation
# of video fragments from a predefined set of keyframes.

import os
import time
import imageio
import numpy as np
import torch
import logging
from PIL import Image, ImageOps
import gc
import shutil
from pathlib import Path
from typing import List, Tuple, Dict, Any, Callable, Optional

# --- Corrected relative imports ---
from ..types import LatentConditioningItem
from ..managers.ltx_manager import ltx_manager_singleton
from ..managers.vae_manager import vae_manager_singleton
from .deformes2D_thinker import deformes2d_thinker_singleton
from ..tools.video_encode_tool import video_encode_tool_singleton

logger = logging.getLogger(__name__)

# Optional callback invoked as callback(progress_fraction, message).
ProgressCallback = Optional[Callable[[float, str], None]]


class Deformes4DEngine:
    """Orchestrates the generation and concatenation of video fragments."""

    def __init__(self):
        """Create a lightweight, not-yet-initialized engine (no arguments)."""
        self.workspace_dir: Optional[str] = None
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        logger.info("Deformes4DEngine instanciado (não inicializado).")

    def initialize(self, workspace_dir: str):
        """Bind the engine to a workspace directory, creating it if needed.

        Calling this again after a successful initialization is a no-op.

        Args:
            workspace_dir: Directory where temporary and final files are written.
        """
        if self.workspace_dir is not None:
            return  # Avoid re-initialization.
        self.workspace_dir = workspace_dir
        os.makedirs(self.workspace_dir, exist_ok=True)
        logger.info(f"Deformes4D Specialist (Executor) inicializado com workspace: {self.workspace_dir}.")

    def generate_original_movie(
        self,
        full_generation_state: Dict[str, Any],
        progress_callback: ProgressCallback = None
    ) -> Dict[str, Any]:
        """Generate the main movie, reading every parameter from the generation state.

        Stage 1 generates one latent fragment per keyframe transition and saves
        each to disk. Stage 2 loads the saved latents in batches, decodes them,
        writes one clip per batch and concatenates the clips into the final file.

        Args:
            full_generation_state: State dictionary. The keys read here are
                "parametros_geracao" (with "pre_producao" and "producao"),
                "Keyframe_atos", "Promt_geral" and "Atos".
            progress_callback: Optional callable receiving (fraction, message).

        Returns:
            A dict with "final_path" (final video path), "latent_paths" (saved
            latent fragment paths) and "video_data" (summary entry for the state).

        Raises:
            RuntimeError: If initialize() has not been called.
            ValueError: If fewer than two keyframes are provided.
        """
        if not self.workspace_dir:
            raise RuntimeError("Deformes4DEngine não foi inicializado. Chame o método initialize() antes de usar.")

        # 1. Extract every parameter from the generation state.
        pre_prod_params = full_generation_state.get("parametros_geracao", {}).get("pre_producao", {})
        prod_params = full_generation_state.get("parametros_geracao", {}).get("producao", {})

        keyframes_data = full_generation_state.get("Keyframe_atos", [])
        global_prompt = full_generation_state.get("Promt_geral", "")
        storyboard = [ato["resumo_ato"] for ato in full_generation_state.get("Atos", [])]
        keyframe_paths = [kf["caminho_pixel"] for kf in keyframes_data]

        # Validate before touching the filesystem so that invalid input does
        # not leave empty temporary directories behind.
        if len(keyframe_paths) < 2:
            raise ValueError(f"A geração requer pelo menos 2 keyframes. Fornecidos: {len(keyframe_paths)}.")

        seconds_per_fragment = pre_prod_params.get('duration_per_fragment', 4.0)
        video_resolution = pre_prod_params.get('resolution', 480)

        trim_percent = prod_params.get('trim_percent', 50)
        handler_strength = prod_params.get('handler_strength', 0.5)
        destination_convergence_strength = prod_params.get('destination_convergence_strength', 0.75)
        guidance_scale = prod_params.get('guidance_scale', 2.0)
        stg_scale = prod_params.get('stg_scale', 0.025)
        num_inference_steps = prod_params.get('inference_steps', 20)

        # 2. Start the generation process.
        FPS = 24
        # NOTE(review): presumably the number of pixel frames represented by one
        # latent frame — confirm against the VAE/LTX configuration.
        FRAMES_PER_LATENT_CHUNK = 8
        # Number of saved latent fragments decoded together in stage 2.
        LATENT_PROCESSING_CHUNK_SIZE = 4

        run_timestamp = int(time.time())
        temp_latent_dir = os.path.join(self.workspace_dir, f"temp_latents_{run_timestamp}")
        temp_video_clips_dir = os.path.join(self.workspace_dir, f"temp_clips_{run_timestamp}")
        os.makedirs(temp_latent_dir, exist_ok=True)
        os.makedirs(temp_video_clips_dir, exist_ok=True)

        total_frames_brutos = self._quantize_to_multiple(int(seconds_per_fragment * FPS), FRAMES_PER_LATENT_CHUNK)
        frames_a_podar = self._quantize_to_multiple(int(total_frames_brutos * (trim_percent / 100)), FRAMES_PER_LATENT_CHUNK)
        latents_a_podar = frames_a_podar // FRAMES_PER_LATENT_CHUNK
        DEJAVU_FRAME_TARGET = frames_a_podar - 1 if frames_a_podar > 0 else 0
        DESTINATION_FRAME_TARGET = total_frames_brutos - 1

        base_ltx_params = {"guidance_scale": guidance_scale, "stg_scale": stg_scale, "num_inference_steps": num_inference_steps}
        story_history = ""
        target_resolution_tuple = (video_resolution, video_resolution)

        # Latents carried from one fragment into the conditioning of the next.
        eco_latent_for_next_loop, dejavu_latent_for_next_loop = None, None
        latent_fragment_paths = []
        video_fragments_data = []

        num_transitions_to_generate = len(keyframe_paths) - 1

        logger.info("--- ESTÁGIO 1: Geração de Fragmentos Latentes ---")
        for i in range(num_transitions_to_generate):
            fragment_index = i + 1
            if progress_callback:
                progress_fraction = (i / num_transitions_to_generate) * 0.7
                progress_callback(progress_fraction, f"Gerando Latente {fragment_index}/{num_transitions_to_generate}")

            past_keyframe_path = keyframe_paths[i - 1] if i > 0 else keyframe_paths[i]
            start_keyframe_path = keyframe_paths[i]
            destination_keyframe_path = keyframe_paths[i + 1]
            future_story_prompt = storyboard[i + 1] if (i + 1) < len(storyboard) else "A cena final."

            decision = deformes2d_thinker_singleton.get_cinematic_decision(
                global_prompt, story_history, past_keyframe_path, start_keyframe_path,
                destination_keyframe_path, storyboard[i - 1] if i > 0 else "O início.",
                storyboard[i], future_story_prompt
            )
            motion_prompt = decision["motion_prompt"]
            story_history += f"\n- Ato {fragment_index}: {motion_prompt}"

            conditioning_items = []
            if eco_latent_for_next_loop is None:
                # First fragment: condition frame 0 on the encoded start keyframe.
                start_latent = self._keyframe_to_latent(start_keyframe_path, target_resolution_tuple)
                conditioning_items.append(LatentConditioningItem(start_latent, 0, 1.0))
            else:
                # Later fragments: condition on latents taken from the previous one.
                conditioning_items.append(LatentConditioningItem(eco_latent_for_next_loop, 0, 1.0))
                conditioning_items.append(LatentConditioningItem(dejavu_latent_for_next_loop, DEJAVU_FRAME_TARGET, handler_strength))

            dest_latent = self._keyframe_to_latent(destination_keyframe_path, target_resolution_tuple)
            conditioning_items.append(LatentConditioningItem(dest_latent, DESTINATION_FRAME_TARGET, destination_convergence_strength))

            latents_brutos, _ = ltx_manager_singleton.generate_latent_fragment(
                height=video_resolution, width=video_resolution,
                conditioning_items_data=conditioning_items, motion_prompt=motion_prompt,
                video_total_frames=total_frames_brutos, video_fps=FPS,
                **base_ltx_params
            )

            latents_video, eco_latent_for_next_loop, dejavu_latent_for_next_loop = (
                self._split_fragment_latents(latents_brutos, latents_a_podar)
            )

            del latents_brutos; gc.collect(); torch.cuda.empty_cache()

            # Persist the fragment on disk (as a CPU tensor) for stage 2.
            cpu_latent = latents_video.cpu()
            latent_path = os.path.join(temp_latent_dir, f"latent_fragment_{i:04d}.pt")
            torch.save(cpu_latent, latent_path)
            latent_fragment_paths.append(latent_path)

            video_fragments_data.append({"id": i, "prompt_video": motion_prompt})
            del latents_video, cpu_latent; gc.collect()

        del eco_latent_for_next_loop, dejavu_latent_for_next_loop; gc.collect(); torch.cuda.empty_cache()

        logger.info(f"--- ESTÁGIO 2: Processando {len(latent_fragment_paths)} latentes ---")
        final_video_clip_paths = []
        # Ceiling division: number of batches needed to cover every fragment.
        num_chunks = -(-len(latent_fragment_paths) // LATENT_PROCESSING_CHUNK_SIZE) if LATENT_PROCESSING_CHUNK_SIZE > 0 else 0
        for i in range(num_chunks):
            chunk_start_index = i * LATENT_PROCESSING_CHUNK_SIZE
            chunk_end_index = chunk_start_index + LATENT_PROCESSING_CHUNK_SIZE
            chunk_paths = latent_fragment_paths[chunk_start_index:chunk_end_index]

            if progress_callback:
                progress_fraction = 0.7 + (i / num_chunks * 0.28)
                progress_callback(progress_fraction, f"Processando & Decodificando Lote {i+1}/{num_chunks}")

            tensors_in_chunk = [torch.load(p, map_location=self.device) for p in chunk_paths]
            # Join the fragments of this batch along dimension 2.
            sub_group_latent = torch.cat(tensors_in_chunk, dim=2)
            del tensors_in_chunk; gc.collect(); torch.cuda.empty_cache()

            pixel_tensor = vae_manager_singleton.decode(sub_group_latent)
            del sub_group_latent; gc.collect(); torch.cuda.empty_cache()

            base_name = f"clip_{i:04d}_{run_timestamp}"
            current_clip_path = os.path.join(temp_video_clips_dir, f"{base_name}.mp4")
            self.save_video_from_tensor(pixel_tensor, current_clip_path, fps=FPS)
            final_video_clip_paths.append(current_clip_path)
            del pixel_tensor; gc.collect(); torch.cuda.empty_cache()

        if progress_callback:
            progress_callback(0.98, "Montando o filme final...")
        final_video_path = os.path.join(self.workspace_dir, f"original_movie_{run_timestamp}.mp4")
        video_encode_tool_singleton.concatenate_videos(final_video_clip_paths, final_video_path, self.workspace_dir)

        try:
            shutil.rmtree(temp_video_clips_dir)
            # 'temp_latent_dir' is intentionally NOT removed so that the saved
            # latents persist (their paths are returned to the caller).
        except OSError as e:
            logger.warning(f"Não foi possível remover o diretório de clipes temporários: {e}")

        logger.info(f"Processo completo! Vídeo original salvo em: {final_video_path}")

        final_video_data_for_state = {
            "id": 0,
            "caminho_pixel": final_video_path,
            "caminhos_latentes_fragmentos": latent_fragment_paths,
            "fragmentos_componentes": video_fragments_data
        }

        return {
            "final_path": final_video_path,
            "latent_paths": latent_fragment_paths,
            "video_data": final_video_data_for_state
        }

    # --- HELPER FUNCTIONS ---

    def _keyframe_to_latent(self, image_path: str, target_resolution: tuple) -> torch.Tensor:
        """Load a keyframe image from disk, fit it to the resolution and encode it."""
        # The context manager closes the underlying file deterministically;
        # convert() returns an independent, fully loaded image.
        with Image.open(image_path) as raw_image:
            rgb_image = raw_image.convert("RGB")
        fitted = self._preprocess_image_for_latent_conversion(rgb_image, target_resolution)
        return self._pil_to_latent(fitted)

    @staticmethod
    def _split_fragment_latents(
        latents: torch.Tensor, latents_to_trim: int
    ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
        """Split raw fragment latents into (video, eco, dejavu) along dimension 2.

        The trailing window is the last ``latents_to_trim + 1`` entries; "eco" is
        its first two entries and "dejavu" its last entry. The video part keeps
        everything except the final ``latents_to_trim - 1`` entries.
        """
        tail = latents[:, :, -(latents_to_trim + 1):, :, :]
        eco = tail[:, :, :2, :, :].clone()
        dejavu = tail[:, :, -1:, :, :].clone()

        # An explicit, non-negative stop index is required here: the previous
        # form `[:-(latents_to_trim - 1)]` degenerates to `[:-0]` (an EMPTY
        # slice) when latents_to_trim == 1 and to `[:1]` when it is 0.
        keep_count = max(latents.shape[2] - (latents_to_trim - 1), 0)
        video = latents[:, :, :keep_count, :, :].clone()
        return video, eco, dejavu

    def save_video_from_tensor(self, video_tensor: torch.Tensor, path: str, fps: int = 24):
        """Write a 5-D video tensor to ``path`` as an H.264 file.

        Values are clamped to [-1, 1] and rescaled to 8-bit before encoding.
        Nothing is written (a warning is logged) when the tensor is None, is not
        5-dimensional, or has size 0 along dimension 2.

        Args:
            video_tensor: Tensor to encode. Assumes layout (1, C, T, H, W) —
                TODO confirm against the VAE decoder output.
            path: Destination file path.
            fps: Frames per second of the output file.
        """
        if video_tensor is None or video_tensor.ndim != 5 or video_tensor.shape[2] == 0:
            logger.warning(f"Tensor de vídeo inválido ou vazio; nenhum arquivo foi escrito em: {path}")
            return
        # Drop the leading dimension and move dimension 0 (after squeeze) last,
        # so that iterating the array yields one frame at a time.
        video_tensor = video_tensor.squeeze(0).permute(1, 2, 3, 0)
        video_tensor = (video_tensor.clamp(-1, 1) + 1) / 2.0
        video_np = (video_tensor.detach().cpu().float().numpy() * 255).astype(np.uint8)
        with imageio.get_writer(path, fps=fps, codec='libx264', quality=8, output_params=['-pix_fmt', 'yuv420p']) as writer:
            for frame in video_np:
                writer.append_data(frame)

    def _preprocess_image_for_latent_conversion(self, image: Image.Image, target_resolution: tuple) -> Image.Image:
        """Return ``image`` fitted to ``target_resolution`` (unchanged if it already matches)."""
        if image.size != target_resolution:
            return ImageOps.fit(image, target_resolution, Image.Resampling.LANCZOS)
        return image

    def _pil_to_latent(self, pil_image: Image.Image) -> torch.Tensor:
        """Convert a PIL image to a [-1, 1] tensor and encode it with the VAE manager."""
        image_np = np.array(pil_image).astype(np.float32) / 255.0
        # (H, W, C) -> (C, H, W), then add two singleton dimensions at 0 and 2.
        tensor = torch.from_numpy(image_np).permute(2, 0, 1).unsqueeze(0).unsqueeze(2)
        tensor = (tensor * 2.0) - 1.0
        return vae_manager_singleton.encode(tensor)

    def _quantize_to_multiple(self, n: int, m: int) -> int:
        """Round ``n`` to the nearest multiple of ``m``.

        Returns ``n`` unchanged when ``m`` is 0, and never rounds a positive
        ``n`` down to 0 (``m`` is returned instead).
        """
        if m == 0:
            return n
        quantized = int(round(n / m) * m)
        return m if n > 0 and quantized == 0 else quantized