# FILE: api/ltx_aduc_orchestrator.py # DESCRIPTION: The main workflow orchestrator for the ADUC-SDR LTX suite. # In this simplified architecture, it coordinates a single unified client (LtxAducPipeline) # to execute the complete video generation pipeline from prompt to MP4. import logging import time import yaml import os import sys from PIL import Image from typing import Optional, Dict, Union # O Orquestrador importa o CLIENTE UNIFICADO que ele vai coordenar. from api.ltx.ltx_aduc_pipeline import ltx_aduc_pipeline # O Orquestrador importa as FERRAMENTAS de que precisa para as tarefas finais. from tools.video_encode_tool import video_encode_tool_singleton # Importa o Path para carregar a configuração. from pathlib import Path LTX_VIDEO_REPO_DIR = Path("/data/LTX-Video") # ============================================================================== # --- A CLASSE ORQUESTRADORA (Cérebro do Workflow) --- # ============================================================================== class LtxAducOrchestrator: """ Orquestra o fluxo de trabalho completo de geração de vídeo, coordenando o cliente unificado LTX. É o ponto de entrada principal para a UI. """ def __init__(self): """ Inicializa o orquestrador, carregando a configuração base uma única vez. """ self.output_dir = "/app/output" self.base_config = self._load_base_config() logging.info("✅ LTX ADUC Orchestrator initialized and ready.") def _load_base_config(self) -> Dict: """Carrega a configuração base do arquivo YAML, que contém os parâmetros padrão.""" try: config_path = LTX_VIDEO_REPO_DIR / "configs" / "ltxv-13b-0.9.8-distilled-fp8.yaml" with open(config_path, "r") as file: return yaml.safe_load(file) except Exception as e: logging.error(f"Failed to load base config file. Orchestrator may not function correctly. Error: {e}") return {} def __call__( self, prompt: str, initial_image: Optional[Image.Image] = None, height: int = 432, width: int = 768, duration_in_seconds: float = 4.0, ltx_configs: Optional[Dict] = None, output_filename_base: str = "ltx_aduc_video" ) -> Optional[str]: """ Ponto de entrada principal do Orquestrador. Executa o pipeline completo de geração de vídeo. Args: prompt (str): O prompt de texto completo, onde cada nova linha é uma cena. initial_image (Optional[Image.Image]): Imagem PIL para condicionar a primeira cena. height (int): Altura do vídeo final. width (int): Largura do vídeo final. duration_in_seconds (float): Duração total desejada do vídeo. ltx_configs (Optional[Dict]): Configurações avançadas da UI para sobrescrever os padrões. output_filename_base (str): Nome base para o arquivo de vídeo de saída. Returns: Optional[str]: O caminho para o arquivo .mp4 gerado, ou None em caso de falha. """ t0 = time.time() logging.info(f"Orchestrator starting new job for prompt: '{prompt.splitlines()[0]}...'") try: # ================================================================= # --- ETAPA 1: PREPARAÇÃO DOS INPUTS E CONFIGURAÇÕES --- # ================================================================= prompt_list = [line.strip() for line in prompt.splitlines() if line.strip()] if not prompt_list: raise ValueError("O prompt está vazio ou não contém linhas válidas.") initial_conditioning_items = [] if initial_image: logging.info("Orchestrator delegating: create conditioning item.") conditioning_params = [(0, 1.0)] # (frame_number, strength) initial_conditioning_items = ltx_aduc_pipeline.encode_to_conditioning_items( media_list=[initial_image], params=conditioning_params, resolution=(height, width) ) common_ltx_args = self.base_config.get("first_pass", {}).copy() common_ltx_args.update({ 'negative_prompt': "blurry, low quality, bad anatomy, deformed", 'height': height, 'width': width }) if ltx_configs: common_ltx_args.update(ltx_configs) # ================================================================= # --- ETAPA 2: DELEGAR GERAÇÃO DO VÍDEO LATENTE --- # ================================================================= logging.info("Orchestrator delegating: generate latent video.") final_latents, used_seed = ltx_aduc_pipeline.generate_latents( prompt_list=prompt_list, duration_in_seconds=duration_in_seconds, common_ltx_args=common_ltx_args, initial_conditioning_items=initial_conditioning_items ) if final_latents is None: raise RuntimeError("LTX client failed to generate a latent tensor.") logging.info(f"Orchestrator received latent tensor with shape: {final_latents.shape}") # ================================================================= # --- ETAPA 3: DELEGAR DECODIFICAÇÃO PARA PIXELS --- # ================================================================= logging.info("Orchestrator delegating: decode latent to pixels.") pixel_tensor = ltx_aduc_pipeline.decode_to_pixels(final_latents) if pixel_tensor is None: raise RuntimeError("LTX client failed to decode the latent tensor.") logging.info(f"Orchestrator received pixel tensor with shape: {pixel_tensor.shape}") # ================================================================= # --- ETAPA 4: TAREFA FINAL - CODIFICAR PARA MP4 --- # ================================================================= video_filename = f"{output_filename_base}_{int(time.time())}_{used_seed}.mp4" output_path = f"{self.output_dir}/{video_filename}" logging.info(f"Orchestrator executing final task: saving tensor to MP4 at {output_path}") video_encode_tool_singleton.save_video_from_tensor( pixel_5d=pixel_tensor, path=output_path, fps=24 ) total_time = time.time() - t0 logging.info(f"🚀🚀🚀 Orchestrator job complete! Video saved to {output_path}. Total time: {total_time:.2f}s") return output_path except Exception: logging.error("ORCHESTRATOR FAILED! A critical error occurred during the workflow.", exc_info=True) return None # ============================================================================== # --- INSTÂNCIA SINGLETON DO ORQUESTRADOR --- # ============================================================================== ltx_aduc_orchestrator = LtxAducOrchestrator()