import gc
import logging
import os
import shutil
import time
from pathlib import Path
from typing import List, Tuple, Dict, Any, Callable, Optional

import imageio
import numpy as np
import torch
from PIL import Image, ImageOps

from ..types import LatentConditioningItem
from ..managers.ltx_manager import ltx_manager_singleton
from ..managers.vae_manager import vae_manager_singleton
from .deformes2D_thinker import deformes2d_thinker_singleton
from ..tools.video_encode_tool import video_encode_tool_singleton

logger = logging.getLogger(__name__)

ProgressCallback = Optional[Callable[[float, str], None]]


class Deformes4DEngine:
    """
    Orchestrates the generation and concatenation of video fragments.
    """

    def __init__(self):
        """The constructor is lightweight and takes no arguments."""
        self.workspace_dir: Optional[str] = None
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        logger.info("Deformes4DEngine instantiated (not initialized).")

    def initialize(self, workspace_dir: str):
        """Initializes the engine with the required configuration (idempotent)."""
        if self.workspace_dir is not None:
            return
        self.workspace_dir = workspace_dir
        os.makedirs(self.workspace_dir, exist_ok=True)
        logger.info(f"Deformes4D Specialist (Executor) initialized with workspace: {self.workspace_dir}.")

    def generate_original_movie(
        self,
        full_generation_state: Dict[str, Any],
        progress_callback: ProgressCallback = None
    ) -> Dict[str, Any]:
        """
        Generates the main movie, reading every parameter from the generation state.
        """
        if not self.workspace_dir:
            raise RuntimeError("Deformes4DEngine has not been initialized. Call initialize() before use.")

        pre_prod_params = full_generation_state.get("parametros_geracao", {}).get("pre_producao", {})
        prod_params = full_generation_state.get("parametros_geracao", {}).get("producao", {})

        keyframes_data = full_generation_state.get("Keyframe_atos", [])
        global_prompt = full_generation_state.get("Promt_geral", "")
        storyboard = [ato["resumo_ato"] for ato in full_generation_state.get("Atos", [])]
        keyframe_paths = [kf["caminho_pixel"] for kf in keyframes_data]

        seconds_per_fragment = pre_prod_params.get('duration_per_fragment', 4.0)
        video_resolution = pre_prod_params.get('resolution', 480)

        trim_percent = prod_params.get('trim_percent', 50)
        handler_strength = prod_params.get('handler_strength', 0.5)
        destination_convergence_strength = prod_params.get('destination_convergence_strength', 0.75)
        guidance_scale = prod_params.get('guidance_scale', 2.0)
        stg_scale = prod_params.get('stg_scale', 0.025)
        num_inference_steps = prod_params.get('inference_steps', 20)

        FPS = 24
        FRAMES_PER_LATENT_CHUNK = 8
        LATENT_PROCESSING_CHUNK_SIZE = 4

        run_timestamp = int(time.time())
        temp_latent_dir = os.path.join(self.workspace_dir, f"temp_latents_{run_timestamp}")
        temp_video_clips_dir = os.path.join(self.workspace_dir, f"temp_clips_{run_timestamp}")
        os.makedirs(temp_latent_dir, exist_ok=True)
        os.makedirs(temp_video_clips_dir, exist_ok=True)

        # Quantize frame counts to the latent chunk size so the trim maps onto whole
        # latent frames (1 latent frame == FRAMES_PER_LATENT_CHUNK pixel frames).
        total_frames_brutos = self._quantize_to_multiple(int(seconds_per_fragment * FPS), FRAMES_PER_LATENT_CHUNK)
        frames_a_podar = self._quantize_to_multiple(int(total_frames_brutos * (trim_percent / 100)), FRAMES_PER_LATENT_CHUNK)
        latents_a_podar = frames_a_podar // FRAMES_PER_LATENT_CHUNK
        DEJAVU_FRAME_TARGET = frames_a_podar - 1 if frames_a_podar > 0 else 0
        DESTINATION_FRAME_TARGET = total_frames_brutos - 1
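        # Worked example with the defaults above (a sketch, not captured output):
        # 4.0 s * 24 fps = 96 raw frames (already a multiple of 8); a 50% trim gives
        # 48 frames = 48 // 8 = 6 latent frames to trim, so DEJAVU_FRAME_TARGET = 47
        # and DESTINATION_FRAME_TARGET = 95.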

        base_ltx_params = {"guidance_scale": guidance_scale, "stg_scale": stg_scale, "num_inference_steps": num_inference_steps}
        story_history = ""
        target_resolution_tuple = (video_resolution, video_resolution)
        eco_latent_for_next_loop, dejavu_latent_for_next_loop = None, None
        latent_fragment_paths = []
        video_fragments_data = []

        if len(keyframe_paths) < 2:
            raise ValueError(f"Generation requires at least 2 keyframes; got {len(keyframe_paths)}.")
        # N keyframes define N - 1 transitions, one generated fragment per transition.
        num_transitions_to_generate = len(keyframe_paths) - 1

        logger.info("--- STAGE 1: Latent Fragment Generation ---")

        for i in range(num_transitions_to_generate):
            fragment_index = i + 1
            if progress_callback:
                progress_fraction = (i / num_transitions_to_generate) * 0.7
                progress_callback(progress_fraction, f"Generating latent {fragment_index}/{num_transitions_to_generate}")

            past_keyframe_path = keyframe_paths[i - 1] if i > 0 else keyframe_paths[i]
            start_keyframe_path = keyframe_paths[i]
            destination_keyframe_path = keyframe_paths[i + 1]
            future_story_prompt = storyboard[i + 1] if (i + 1) < len(storyboard) else "The final scene."
            decision = deformes2d_thinker_singleton.get_cinematic_decision(
                global_prompt, story_history, past_keyframe_path, start_keyframe_path,
                destination_keyframe_path, storyboard[i - 1] if i > 0 else "The beginning.",
                storyboard[i], future_story_prompt
            )
            motion_prompt = decision["motion_prompt"]
            story_history += f"\n- Act {fragment_index}: {motion_prompt}"
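
            # Build the conditioning set for this fragment. On the first pass, frame 0
            # is pinned to the start keyframe; afterwards it is pinned to the "eco"
            # (echo) latents carried over from the previous fragment, with the "dejavu"
            # latent re-anchoring the trimmed overlap. As used throughout this file,
            # LatentConditioningItem takes (latent, target_frame_index, strength).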
            conditioning_items = []
            if eco_latent_for_next_loop is None:
                img_start = self._preprocess_image_for_latent_conversion(Image.open(start_keyframe_path).convert("RGB"), target_resolution_tuple)
                conditioning_items.append(LatentConditioningItem(self._pil_to_latent(img_start), 0, 1.0))
            else:
                conditioning_items.append(LatentConditioningItem(eco_latent_for_next_loop, 0, 1.0))
                conditioning_items.append(LatentConditioningItem(dejavu_latent_for_next_loop, DEJAVU_FRAME_TARGET, handler_strength))

            img_dest = self._preprocess_image_for_latent_conversion(Image.open(destination_keyframe_path).convert("RGB"), target_resolution_tuple)
            conditioning_items.append(LatentConditioningItem(self._pil_to_latent(img_dest), DESTINATION_FRAME_TARGET, destination_convergence_strength))

            latents_brutos, _ = ltx_manager_singleton.generate_latent_fragment(
                height=video_resolution, width=video_resolution,
                conditioning_items_data=conditioning_items, motion_prompt=motion_prompt,
                video_total_frames=total_frames_brutos, video_fps=FPS,
                **base_ltx_params
            )
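
            # Split the raw fragment: a trimmed tail becomes the handoff to the next
            # iteration ("eco" = first 2 latents of the tail, "dejavu" = its very last
            # latent), and the rest is kept as this fragment's video. Note: this
            # slicing assumes latents_a_podar >= 2 (true for the default 50% trim);
            # values of 0 or 1 would leave latents_video with 1 or 0 latent frames.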
            last_trim = latents_brutos[:, :, -(latents_a_podar + 1):, :, :].clone()
            eco_latent_for_next_loop = last_trim[:, :, :2, :, :].clone()
            dejavu_latent_for_next_loop = last_trim[:, :, -1:, :, :].clone()
            latents_video = latents_brutos[:, :, :-(latents_a_podar - 1), :, :].clone()
            del last_trim, latents_brutos; gc.collect(); torch.cuda.empty_cache()
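
            # Offload the finished fragment to disk so VRAM stays bounded while the
            # remaining transitions are generated; Stage 2 reloads these in chunks.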
            cpu_latent = latents_video.cpu()
            latent_path = os.path.join(temp_latent_dir, f"latent_fragment_{i:04d}.pt")
            torch.save(cpu_latent, latent_path)
            latent_fragment_paths.append(latent_path)

            video_fragments_data.append({"id": i, "prompt_video": motion_prompt})
            del latents_video, cpu_latent; gc.collect()

        del eco_latent_for_next_loop, dejavu_latent_for_next_loop; gc.collect(); torch.cuda.empty_cache()

        logger.info(f"--- STAGE 2: Processing {len(latent_fragment_paths)} latent fragments ---")
        final_video_clip_paths = []
        # Ceiling division; the last chunk may hold fewer fragments than the chunk size.
        num_chunks = -(-len(latent_fragment_paths) // LATENT_PROCESSING_CHUNK_SIZE) if LATENT_PROCESSING_CHUNK_SIZE > 0 else 0
        for i in range(num_chunks):
            chunk_start_index = i * LATENT_PROCESSING_CHUNK_SIZE
            chunk_end_index = chunk_start_index + LATENT_PROCESSING_CHUNK_SIZE
            chunk_paths = latent_fragment_paths[chunk_start_index:chunk_end_index]

            if progress_callback:
                progress_fraction = 0.7 + (i / num_chunks * 0.28)
                progress_callback(progress_fraction, f"Processing & decoding batch {i+1}/{num_chunks}")
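
            # Reload this chunk's fragments onto the GPU and join them along the
            # temporal axis (dim=2) so the VAE decodes one contiguous clip per chunk.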
            tensors_in_chunk = [torch.load(p, map_location=self.device) for p in chunk_paths]
            sub_group_latent = torch.cat(tensors_in_chunk, dim=2)
            del tensors_in_chunk; gc.collect(); torch.cuda.empty_cache()

            pixel_tensor = vae_manager_singleton.decode(sub_group_latent)
            del sub_group_latent; gc.collect(); torch.cuda.empty_cache()

            base_name = f"clip_{i:04d}_{run_timestamp}"
            current_clip_path = os.path.join(temp_video_clips_dir, f"{base_name}.mp4")
            self.save_video_from_tensor(pixel_tensor, current_clip_path, fps=FPS)
            final_video_clip_paths.append(current_clip_path)
            del pixel_tensor; gc.collect(); torch.cuda.empty_cache()

        if progress_callback: progress_callback(0.98, "Assembling the final movie...")
        final_video_path = os.path.join(self.workspace_dir, f"original_movie_{run_timestamp}.mp4")
        video_encode_tool_singleton.concatenate_videos(final_video_clip_paths, final_video_path, self.workspace_dir)

        try:
            shutil.rmtree(temp_video_clips_dir)
        except OSError as e:
            logger.warning(f"Could not remove the temporary clips directory: {e}")

        logger.info(f"Process complete! Original video saved at: {final_video_path}")

        final_video_data_for_state = {
            "id": 0, "caminho_pixel": final_video_path,
            "caminhos_latentes_fragmentos": latent_fragment_paths,
            "fragmentos_componentes": video_fragments_data
        }

        return {
            "final_path": final_video_path,
            "latent_paths": latent_fragment_paths,
            "video_data": final_video_data_for_state
        }

    def save_video_from_tensor(self, video_tensor: torch.Tensor, path: str, fps: int = 24):
        """Writes a (1, C, F, H, W) tensor with values in [-1, 1] to an H.264 MP4."""
        if video_tensor is None or video_tensor.ndim != 5 or video_tensor.shape[2] == 0: return
        # (1, C, F, H, W) -> (F, H, W, C), then map [-1, 1] to [0, 255] uint8.
        video_tensor = video_tensor.squeeze(0).permute(1, 2, 3, 0)
        video_tensor = (video_tensor.clamp(-1, 1) + 1) / 2.0
        video_np = (video_tensor.detach().cpu().float().numpy() * 255).astype(np.uint8)
        with imageio.get_writer(path, fps=fps, codec='libx264', quality=8, output_params=['-pix_fmt', 'yuv420p']) as writer:
            for frame in video_np: writer.append_data(frame)

    def _preprocess_image_for_latent_conversion(self, image: Image.Image, target_resolution: tuple) -> Image.Image:
        """Center-crops and resizes the image to the target resolution if needed."""
        if image.size != target_resolution:
            return ImageOps.fit(image, target_resolution, Image.Resampling.LANCZOS)
        return image

    def _pil_to_latent(self, pil_image: Image.Image) -> torch.Tensor:
        """Encodes a PIL image as a single-frame video latent via the VAE."""
        # (H, W, C) in [0, 1] -> (1, C, 1, H, W) in [-1, 1].
        image_np = np.array(pil_image).astype(np.float32) / 255.0
        tensor = torch.from_numpy(image_np).permute(2, 0, 1).unsqueeze(0).unsqueeze(2)
        tensor = (tensor * 2.0) - 1.0
        return vae_manager_singleton.encode(tensor)

    def _quantize_to_multiple(self, n: int, m: int) -> int:
        """Rounds n to the nearest multiple of m, never returning 0 for positive n."""
        if m == 0: return n
        quantized = int(round(n / m) * m)
        return m if n > 0 and quantized == 0 else quantized
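

# A minimal usage sketch (a non-authoritative example, not part of the engine):
# it assumes a generation-state dict shaped like the one read by
# generate_original_movie; every path, prompt, and act below is a hypothetical
# placeholder, and running it requires the real LTX/VAE managers and a GPU.
if __name__ == "__main__":
    engine = Deformes4DEngine()
    engine.initialize("/tmp/deformes_workspace")  # hypothetical workspace path
    state = {
        "parametros_geracao": {
            "pre_producao": {"duration_per_fragment": 4.0, "resolution": 480},
            "producao": {"trim_percent": 50, "handler_strength": 0.5},
        },
        "Promt_geral": "A lighthouse at dusk",
        "Atos": [{"resumo_ato": "The sea darkens"}, {"resumo_ato": "The beam lights up"}],
        "Keyframe_atos": [
            {"caminho_pixel": "/tmp/kf_000.png"},  # hypothetical keyframe images
            {"caminho_pixel": "/tmp/kf_001.png"},
        ],
    }
    result = engine.generate_original_movie(
        state, progress_callback=lambda frac, msg: print(f"{frac:.0%} {msg}")
    )
    print(result["final_path"])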