Test / api /ltx /ltx_aduc_orchestrator.py
eeuuia's picture
Upload 5 files
9a6b3d7 verified
raw
history blame
7.3 kB
# FILE: api/ltx_aduc_orchestrator.py
# DESCRIPTION: The main workflow orchestrator for the ADUC-SDR LTX suite.
# In this simplified architecture, it coordinates a single unified client (LtxAducPipeline)
# to execute the complete video generation pipeline from prompt to MP4.
import logging
import time
import yaml
import os
import sys
from PIL import Image
from typing import Optional, Dict, Union
# O Orquestrador importa o CLIENTE UNIFICADO que ele vai coordenar.
from api.ltx.ltx_aduc_pipeline import ltx_aduc_pipeline
# O Orquestrador importa as FERRAMENTAS de que precisa para as tarefas finais.
from tools.video_encode_tool import video_encode_tool_singleton
# Importa o Path para carregar a configuração.
from pathlib import Path
LTX_VIDEO_REPO_DIR = Path("/data/LTX-Video")
# ==============================================================================
# --- A CLASSE ORQUESTRADORA (Cérebro do Workflow) ---
# ==============================================================================
class LtxAducOrchestrator:
"""
Orquestra o fluxo de trabalho completo de geração de vídeo,
coordenando o cliente unificado LTX. É o ponto de entrada principal para a UI.
"""
def __init__(self):
"""
Inicializa o orquestrador, carregando a configuração base uma única vez.
"""
self.output_dir = "/app/output"
self.base_config = self._load_base_config()
logging.info("✅ LTX ADUC Orchestrator initialized and ready.")
def _load_base_config(self) -> Dict:
"""Carrega a configuração base do arquivo YAML, que contém os parâmetros padrão."""
try:
config_path = LTX_VIDEO_REPO_DIR / "configs" / "ltxv-13b-0.9.8-distilled-fp8.yaml"
with open(config_path, "r") as file:
return yaml.safe_load(file)
except Exception as e:
logging.error(f"Failed to load base config file. Orchestrator may not function correctly. Error: {e}")
return {}
def __call__(
self,
prompt: str,
initial_image: Optional[Image.Image] = None,
height: int = 432,
width: int = 768,
duration_in_seconds: float = 4.0,
ltx_configs: Optional[Dict] = None,
output_filename_base: str = "ltx_aduc_video"
) -> Optional[str]:
"""
Ponto de entrada principal do Orquestrador. Executa o pipeline completo de geração de vídeo.
Args:
prompt (str): O prompt de texto completo, onde cada nova linha é uma cena.
initial_image (Optional[Image.Image]): Imagem PIL para condicionar a primeira cena.
height (int): Altura do vídeo final.
width (int): Largura do vídeo final.
duration_in_seconds (float): Duração total desejada do vídeo.
ltx_configs (Optional[Dict]): Configurações avançadas da UI para sobrescrever os padrões.
output_filename_base (str): Nome base para o arquivo de vídeo de saída.
Returns:
Optional[str]: O caminho para o arquivo .mp4 gerado, ou None em caso de falha.
"""
t0 = time.time()
logging.info(f"Orchestrator starting new job for prompt: '{prompt.splitlines()[0]}...'")
try:
# =================================================================
# --- ETAPA 1: PREPARAÇÃO DOS INPUTS E CONFIGURAÇÕES ---
# =================================================================
prompt_list = [line.strip() for line in prompt.splitlines() if line.strip()]
if not prompt_list:
raise ValueError("O prompt está vazio ou não contém linhas válidas.")
initial_conditioning_items = []
if initial_image:
logging.info("Orchestrator delegating: create conditioning item.")
conditioning_params = [(0, 1.0)] # (frame_number, strength)
initial_conditioning_items = ltx_aduc_pipeline.encode_to_conditioning_items(
media_list=[initial_image],
params=conditioning_params,
resolution=(height, width)
)
common_ltx_args = self.base_config.get("first_pass", {}).copy()
common_ltx_args.update({
'negative_prompt': "blurry, low quality, bad anatomy, deformed",
'height': height,
'width': width
})
if ltx_configs:
common_ltx_args.update(ltx_configs)
# =================================================================
# --- ETAPA 2: DELEGAR GERAÇÃO DO VÍDEO LATENTE ---
# =================================================================
logging.info("Orchestrator delegating: generate latent video.")
final_latents, used_seed = ltx_aduc_pipeline.generate_latents(
prompt_list=prompt_list,
duration_in_seconds=duration_in_seconds,
common_ltx_args=common_ltx_args,
initial_conditioning_items=initial_conditioning_items
)
if final_latents is None:
raise RuntimeError("LTX client failed to generate a latent tensor.")
logging.info(f"Orchestrator received latent tensor with shape: {final_latents.shape}")
# =================================================================
# --- ETAPA 3: DELEGAR DECODIFICAÇÃO PARA PIXELS ---
# =================================================================
logging.info("Orchestrator delegating: decode latent to pixels.")
pixel_tensor = ltx_aduc_pipeline.decode_to_pixels(final_latents)
if pixel_tensor is None:
raise RuntimeError("LTX client failed to decode the latent tensor.")
logging.info(f"Orchestrator received pixel tensor with shape: {pixel_tensor.shape}")
# =================================================================
# --- ETAPA 4: TAREFA FINAL - CODIFICAR PARA MP4 ---
# =================================================================
video_filename = f"{output_filename_base}_{int(time.time())}_{used_seed}.mp4"
output_path = f"{self.output_dir}/{video_filename}"
logging.info(f"Orchestrator executing final task: saving tensor to MP4 at {output_path}")
video_encode_tool_singleton.save_video_from_tensor(
pixel_5d=pixel_tensor,
path=output_path,
fps=24
)
total_time = time.time() - t0
logging.info(f"🚀🚀🚀 Orchestrator job complete! Video saved to {output_path}. Total time: {total_time:.2f}s")
return output_path
except Exception:
logging.error("ORCHESTRATOR FAILED! A critical error occurred during the workflow.", exc_info=True)
return None
# ==============================================================================
# --- INSTÂNCIA SINGLETON DO ORQUESTRADOR ---
# ==============================================================================
ltx_aduc_orchestrator = LtxAducOrchestrator()