# ltx_server_clean_refactor.py — VideoService (Modular Version with Simple Overlap Chunking) # ============================================================================== # 0. CONFIGURAÇÃO DE AMBIENTE E IMPORTAÇÕES # ============================================================================== import os import sys import gc import yaml import time import json import random import shutil import warnings import tempfile import traceback import subprocess from pathlib import Path from typing import List, Dict, Optional, Tuple, Union import cv2 # --- Configurações de Logging e Avisos --- warnings.filterwarnings("ignore", category=UserWarning) warnings.filterwarnings("ignore", category=FutureWarning) from huggingface_hub import logging as hf_logging hf_logging.set_verbosity_error() # --- Importações de Bibliotecas de ML/Processamento --- import torch import torch.nn.functional as F import numpy as np from PIL import Image from einops import rearrange from huggingface_hub import hf_hub_download from safetensors import safe_open from managers.vae_manager import vae_manager_singleton from tools.video_encode_tool import video_encode_tool_singleton from api.aduc_ltx_latent_patch import LTXLatentConditioningPatch, PatchedConditioningItem # --- Constantes Globais --- LTXV_DEBUG = True # Mude para False para desativar logs de debug LTXV_FRAME_LOG_EVERY = 8 DEPS_DIR = Path("/data") LTX_VIDEO_REPO_DIR = DEPS_DIR / "LTX-Video" RESULTS_DIR = Path("/app/output") DEFAULT_FPS = 24.0 # ============================================================================== # 1. SETUP E FUNÇÕES AUXILIARES DE AMBIENTE # ============================================================================== def _run_setup_script(): """Executa o script setup.py se o repositório LTX-Video não existir.""" setup_script_path = "setup.py" if not os.path.exists(setup_script_path): print("[DEBUG] 'setup.py' não encontrado. Pulando clonagem de dependências.") return print(f"[DEBUG] Repositório não encontrado em {LTX_VIDEO_REPO_DIR}. Executando setup.py...") try: subprocess.run([sys.executable, setup_script_path], check=True, capture_output=True, text=True) print("[DEBUG] Script 'setup.py' concluído com sucesso.") except subprocess.CalledProcessError as e: print(f"[ERROR] Falha ao executar 'setup.py' (código {e.returncode}).\nOutput:\n{e.stdout}\n{e.stderr}") sys.exit(1) def add_deps_to_path(repo_path: Path): """Adiciona o diretório do repositório ao sys.path para importações locais.""" resolved_path = str(repo_path.resolve()) if resolved_path not in sys.path: sys.path.insert(0, resolved_path) if LTXV_DEBUG: print(f"[DEBUG] Adicionado ao sys.path: {resolved_path}") # --- Execução da configuração inicial --- if not LTX_VIDEO_REPO_DIR.exists(): _run_setup_script() add_deps_to_path(LTX_VIDEO_REPO_DIR) # --- Importações Dependentes do Path Adicionado --- from ltx_video.models.autoencoders.vae_encode import un_normalize_latents, normalize_latents from ltx_video.pipelines.pipeline_ltx_video import adain_filter_latent from ltx_video.models.autoencoders.latent_upsampler import LatentUpsampler from ltx_video.pipelines.pipeline_ltx_video import ConditioningItem, LTXVideoPipeline from transformers import T5EncoderModel, T5Tokenizer, AutoModelForCausalLM, AutoProcessor, AutoTokenizer from ltx_video.models.autoencoders.causal_video_autoencoder import CausalVideoAutoencoder from ltx_video.models.transformers.symmetric_patchifier import SymmetricPatchifier from ltx_video.models.transformers.transformer3d import Transformer3DModel from ltx_video.schedulers.rf import RectifiedFlowScheduler from ltx_video.utils.skip_layer_strategy import SkipLayerStrategy import ltx_video.pipelines.crf_compressor as crf_compressor def create_latent_upsampler(latent_upsampler_model_path: str, device: str): latent_upsampler = LatentUpsampler.from_pretrained(latent_upsampler_model_path) latent_upsampler.to(device) latent_upsampler.eval() return latent_upsampler def create_ltx_video_pipeline( ckpt_path: str, precision: str, text_encoder_model_name_or_path: str, sampler: Optional[str] = None, device: Optional[str] = None, enhance_prompt: bool = False, prompt_enhancer_image_caption_model_name_or_path: Optional[str] = None, prompt_enhancer_llm_model_name_or_path: Optional[str] = None, ) -> LTXVideoPipeline: ckpt_path = Path(ckpt_path) assert os.path.exists( ckpt_path ), f"Ckpt path provided (--ckpt_path) {ckpt_path} does not exist" with safe_open(ckpt_path, framework="pt") as f: metadata = f.metadata() config_str = metadata.get("config") configs = json.loads(config_str) allowed_inference_steps = configs.get("allowed_inference_steps", None) vae = CausalVideoAutoencoder.from_pretrained(ckpt_path) transformer = Transformer3DModel.from_pretrained(ckpt_path) # Use constructor if sampler is specified, otherwise use from_pretrained if sampler == "from_checkpoint" or not sampler: scheduler = RectifiedFlowScheduler.from_pretrained(ckpt_path) else: scheduler = RectifiedFlowScheduler( sampler=("Uniform" if sampler.lower() == "uniform" else "LinearQuadratic") ) text_encoder = T5EncoderModel.from_pretrained( text_encoder_model_name_or_path, subfolder="text_encoder" ) patchifier = SymmetricPatchifier(patch_size=1) tokenizer = T5Tokenizer.from_pretrained( text_encoder_model_name_or_path, subfolder="tokenizer" ) transformer = transformer.to(device) vae = vae.to(device) text_encoder = text_encoder.to(device) if enhance_prompt: prompt_enhancer_image_caption_model = AutoModelForCausalLM.from_pretrained( prompt_enhancer_image_caption_model_name_or_path, trust_remote_code=True ) prompt_enhancer_image_caption_processor = AutoProcessor.from_pretrained( prompt_enhancer_image_caption_model_name_or_path, trust_remote_code=True ) prompt_enhancer_llm_model = AutoModelForCausalLM.from_pretrained( prompt_enhancer_llm_model_name_or_path, torch_dtype="bfloat16", ) prompt_enhancer_llm_tokenizer = AutoTokenizer.from_pretrained( prompt_enhancer_llm_model_name_or_path, ) else: prompt_enhancer_image_caption_model = None prompt_enhancer_image_caption_processor = None prompt_enhancer_llm_model = None prompt_enhancer_llm_tokenizer = None vae = vae.to(torch.bfloat16) if precision == "bfloat16" and transformer.dtype != torch.bfloat16: transformer = transformer.to(torch.bfloat16) text_encoder = text_encoder.to(torch.bfloat16) # Use submodels for the pipeline submodel_dict = { "transformer": transformer, "patchifier": patchifier, "text_encoder": text_encoder, "tokenizer": tokenizer, "scheduler": scheduler, "vae": vae, "prompt_enhancer_image_caption_model": prompt_enhancer_image_caption_model, "prompt_enhancer_image_caption_processor": prompt_enhancer_image_caption_processor, "prompt_enhancer_llm_model": prompt_enhancer_llm_model, "prompt_enhancer_llm_tokenizer": prompt_enhancer_llm_tokenizer, "allowed_inference_steps": allowed_inference_steps, } pipeline = LTXVideoPipeline(**submodel_dict) LTXLatentConditioningPatch.apply() pipeline = pipeline.to(device) return pipeline # ============================================================================== # 2. FUNÇÕES AUXILIARES DE PROCESSAMENTO # ============================================================================== def calculate_padding(orig_h: int, orig_w: int, target_h: int, target_w: int) -> Tuple[int, int, int, int]: """Calcula o preenchimento para centralizar uma imagem em uma nova dimensão.""" pad_h = target_h - orig_h pad_w = target_w - orig_w pad_top = pad_h // 2 pad_bottom = pad_h - pad_top pad_left = pad_w // 2 pad_right = pad_w - pad_left return (pad_left, pad_right, pad_top, pad_bottom) def log_tensor_info(tensor: torch.Tensor, name: str = "Tensor"): """Exibe informações detalhadas sobre um tensor para depuração.""" if not isinstance(tensor, torch.Tensor): print(f"\n[INFO] '{name}' não é um tensor.") return print(f"\n--- Tensor Info: {name} ---") print(f" - Shape: {tuple(tensor.shape)}") print(f" - Dtype: {tensor.dtype}") print(f" - Device: {tensor.device}") if tensor.numel() > 0: try: print(f" - Stats: Min={tensor.min().item():.4f}, Max={tensor.max().item():.4f}, Mean={tensor.mean().item():.4f}") except RuntimeError: print(" - Stats: Não foi possível calcular (ex: tensores bool).") print("-" * 30) # ============================================================================== # 3. CLASSE PRINCIPAL DO SERVIÇO DE VÍDEO # ============================================================================== class VideoService: """ Serviço encapsulado para gerar vídeos usando a pipeline LTX-Video. Gerencia o carregamento de modelos, pré-processamento, geração em múltiplos passos (baixa resolução, upscale com denoise) e pós-processamento. """ def __init__(self): """Inicializa o serviço, carregando configurações e modelos.""" t0 = time.perf_counter() print("[INFO] Inicializando VideoService...") self.device = "cuda" if torch.cuda.is_available() else "cpu" self.config = self._load_config("ltxv-13b-0.9.8-distilled-fp8.yaml") self.pipeline, self.latent_upsampler = self._load_models_from_hub() self._move_models_to_device() self.runtime_autocast_dtype = self._get_precision_dtype() vae_manager_singleton.attach_pipeline( self.pipeline, device=self.device, autocast_dtype=self.runtime_autocast_dtype ) self._tmp_dirs = set() RESULTS_DIR.mkdir(exist_ok=True) print(f"[INFO] VideoService pronto. Tempo de inicialização: {time.perf_counter()-t0:.2f}s") # -------------------------------------------------------------------------- # --- Métodos Públicos (API do Serviço) --- # -------------------------------------------------------------------------- def _load_image_to_tensor_with_resize_and_crop( self, image_input: Union[str, Image.Image], target_height: int = 512, target_width: int = 768, just_crop: bool = False, ) -> torch.Tensor: """Load and process an image into a tensor. Args: image_input: Either a file path (str) or a PIL Image object target_height: Desired height of output tensor target_width: Desired width of output tensor just_crop: If True, only crop the image to the target size without resizing """ if isinstance(image_input, str): image = Image.open(image_input).convert("RGB") elif isinstance(image_input, Image.Image): image = image_input else: raise ValueError("image_input must be either a file path or a PIL Image object") input_width, input_height = image.size aspect_ratio_target = target_width / target_height aspect_ratio_frame = input_width / input_height if aspect_ratio_frame > aspect_ratio_target: new_width = int(input_height * aspect_ratio_target) new_height = input_height x_start = (input_width - new_width) // 2 y_start = 0 else: new_width = input_width new_height = int(input_width / aspect_ratio_target) x_start = 0 y_start = (input_height - new_height) // 2 image = image.crop((x_start, y_start, x_start + new_width, y_start + new_height)) if not just_crop: image = image.resize((target_width, target_height)) image = np.array(image) image = cv2.GaussianBlur(image, (3, 3), 0) frame_tensor = torch.from_numpy(image).float() frame_tensor = crf_compressor.compress(frame_tensor / 255.0) * 255.0 frame_tensor = frame_tensor.permute(2, 0, 1) frame_tensor = (frame_tensor / 127.5) - 1.0 # Create 5D tensor: (batch_size=1, channels=3, num_frames=1, height, width) return frame_tensor.unsqueeze(0).unsqueeze(2) # ADICIONE A FUNÇÃO ABAIXO @torch.no_grad() def _image_to_latents(self, image_input: Union[str, Image.Image], height: int, width: int) -> torch.Tensor: """ Converte uma imagem (caminho ou PIL) em um tensor de latentes 5D. Retorna: Tensor na forma [1, C_lat, 1, H_lat, W_lat] """ print(f"[DEBUG] Codificando imagem para latente ({height}x{width})...") # 1. Carrega a imagem e a transforma em um tensor de pixel 5D pixel_tensor = self._load_image_to_tensor_with_resize_and_crop( image_input, target_height=height, target_width=width ) pixel_tensor_gpu = pixel_tensor.to(self.device, dtype=self.pipeline.vae.dtype) # 2. Usa a VAE para codificar o tensor de pixel em um tensor de latentes with torch.autocast(device_type=self.device.split(':')[0], dtype=self.runtime_autocast_dtype, enabled=(self.device == 'cuda')): # O vae_encode da pipeline já lida com tensores 5D latents = self.pipeline.vae.encode(pixel_tensor_gpu).latent_dist.sample() # 3. Aplica o fator de escala (importante para consistência) if hasattr(self.pipeline.vae.config, "scaling_factor"): latents = latents * self.pipeline.vae.config.scaling_factor print(f"[DEBUG] Imagem codificada para latente com shape: {latents.shape}") return latents def _prepare_condition_items(self, items_list: List[Tuple], height: int, width: int) -> List[PatchedConditioningItem]: """ Prepara os itens de condicionamento. Recebe uma lista [Imagem, frame, peso], converte a Imagem para LATENTE e cria uma lista de PatchedConditioningItem com o tensor em `latents`. """ if not items_list: return [] conditioning_items = [] for media_input, frame_idx, weight in items_list: # 1. USA A NOVA FUNÇÃO PARA OBTER O TENSOR DE LATENTES DIRETAMENTE latent_tensor = self._image_to_latents(media_input, height, width) safe_frame_idx = int(frame_idx) # 2. CRIA O PatchedConditioningItem COM O CAMPO `latents` PREENCHIDO item = PatchedConditioningItem( media_frame_number=safe_frame_idx, conditioning_strength=float(weight), media_item=None, # Importante: media_item é None latents=latent_tensor # O latente pré-calculado vai aqui! ) conditioning_items.append(item) print(f"[INFO] Preparados {len(conditioning_items)} itens de condicionamento com latentes pré-codificados.") return conditioning_items def generate_low_resolution( self, prompt: str, negative_prompt: str, height: int, width: int, duration_secs: float, guidance_scale: float, seed: Optional[int] = None, conditioning_items: Optional[List[PatchedConditioningItem]] = None ) -> Tuple[str, str, int]: """ ETAPA 1: Gera um vídeo e latentes em resolução base a partir de um prompt e condicionamentos opcionais. """ print("[INFO] Iniciando ETAPA 1: Geração de Baixa Resolução...") # --- Configuração de Seed e Diretórios --- used_seed = random.randint(0, 2**32 - 1) if seed is None else int(seed) #seed_everything(used_seed) print(f" - Usando Seed: {used_seed}") temp_dir = tempfile.mkdtemp(prefix="ltxv_low_") self._register_tmp_dir(temp_dir) results_dir = "/app/output" os.makedirs(results_dir, exist_ok=True) # --- Cálculo de Dimensões e Frames --- actual_num_frames = int(round(duration_secs * DEFAULT_FPS)) downscaled_height = height downscaled_width = width #self._calculate_downscaled_dims(height, width) print(f" - Frames: {actual_num_frames}, Duração: {duration_secs}s") print(f" - Dimensões de Saída: {downscaled_height}x{downscaled_width}") # --- Execução da Pipeline --- with torch.autocast(device_type=self.device.split(':')[0], dtype=self.runtime_autocast_dtype, enabled=(self.device == 'cuda')): first_pass_kwargs = { "prompt": prompt, "negative_prompt": negative_prompt, "height": downscaled_height, "width": downscaled_width, "num_frames": (actual_num_frames//8)+1, "frame_rate": int(DEFAULT_FPS), "generator": torch.Generator(device=self.device).manual_seed(used_seed), "output_type": "latent", "vae_per_channel_normalize": True, "is_video": True, "conditioning_items": conditioning_items, "guidance_scale": float(guidance_scale), **(self.config.get("first_pass", {})) } print(" - Enviando para a pipeline LTX...") latents = self.pipeline(**first_pass_kwargs).images print(f" - Latentes gerados com shape: {latents.shape}") # Decodifica os latentes para pixels para criar o vídeo de preview pixel_tensor = vae_manager_singleton.decode(latents, decode_timestep=float(self.config.get("decode_timestep", 0.05))) tensor_path = self._save_latents_to_disk(latents, "latents_low_res", used_seed) final_video_path = self._save_video_from_tensor(pixel_tensor, f"final_video_{seed}", seed, temp_dir, fps=DEFAULT_FPS) return final_video_path # --- Limpeza --- self._finalize() print("[SUCCESS] ETAPA 1 Concluída.") return final_video_path, tensor_path, used_seed # -------------------------------------------------------------------------- # --- Métodos Internos e Auxiliares --- # -------------------------------------------------------------------------- def _finalize(self): """Limpa a memória da GPU e os diretórios temporários.""" if LTXV_DEBUG: print("[DEBUG] Finalize: iniciando limpeza...") gc.collect() if torch.cuda.is_available(): torch.cuda.empty_cache() torch.cuda.ipc_collect() # Limpa todos os diretórios temporários registrados for d in list(self._tmp_dirs): shutil.rmtree(d, ignore_errors=True) self._tmp_dirs.remove(d) if LTXV_DEBUG: print(f"[DEBUG] Diretório temporário removido: {d}") def _save_latents_to_disk(self, latents_tensor: torch.Tensor, base_filename: str, seed: int) -> str: """Salva um tensor de latentes em um arquivo .pt.""" latents_cpu = latents_tensor.detach().to("cpu") tensor_path = RESULTS_DIR / f"{base_filename}_{seed}.pt" torch.save(latents_cpu, tensor_path) if LTXV_DEBUG: print(f"[DEBUG] Latentes salvos em: {tensor_path}") return str(tensor_path) def _save_video_from_tensor(self, pixel_tensor: torch.Tensor, base_filename: str, seed: int, temp_dir: str, fps: int = int(DEFAULT_FPS)) -> str: """Salva um tensor de pixels como um arquivo de vídeo MP4.""" temp_path = os.path.join(temp_dir, f"{base_filename}_{seed}.mp4") video_encode_tool_singleton.save_video_from_tensor(pixel_tensor, temp_path, fps=DEFAULT_FPS) final_path = RESULTS_DIR / f"{base_filename}_{seed}.mp4" shutil.move(temp_path, final_path) print(f"[INFO] Vídeo final salvo em: {final_path}") return str(final_path) def _load_config(self, config_filename: str) -> Dict: """Carrega o arquivo de configuração YAML.""" config_path = LTX_VIDEO_REPO_DIR / "configs" / config_filename print(f"[INFO] Carregando configuração de: {config_path}") with open(config_path, "r") as file: return yaml.safe_load(file) def _load_models_from_hub(self): """Baixa e cria as instâncias da pipeline e do upsampler.""" t0 = time.perf_counter() LTX_REPO = "Lightricks/LTX-Video" print("[INFO] Baixando checkpoint principal...") self.config["checkpoint_path"] = hf_hub_download( repo_id=LTX_REPO, filename=self.config["checkpoint_path"], token=os.getenv("HF_TOKEN") ) print(f"[INFO] Checkpoint principal em: {self.config['checkpoint_path']}") print("[INFO] Construindo pipeline...") pipeline = create_ltx_video_pipeline( ckpt_path=self.config["checkpoint_path"], precision=self.config["precision"], text_encoder_model_name_or_path=self.config["text_encoder_model_name_or_path"], sampler=self.config["sampler"], device="cpu", # Carrega em CPU primeiro enhance_prompt=False ) print("[INFO] Pipeline construída.") latent_upsampler = None if self.config.get("spatial_upscaler_model_path"): print("[INFO] Baixando upscaler espacial...") self.config["spatial_upscaler_model_path"] = hf_hub_download( repo_id=LTX_REPO, filename=self.config["spatial_upscaler_model_path"], token=os.getenv("HF_TOKEN") ) print(f"[INFO] Upscaler em: {self.config['spatial_upscaler_model_path']}") print("[INFO] Construindo latent_upsampler...") latent_upsampler = create_latent_upsampler(self.config["spatial_upscaler_model_path"], device="cpu") print("[INFO] Latent upsampler construído.") print(f"[INFO] Carregamento de modelos concluído em {time.perf_counter()-t0:.2f}s") return pipeline, latent_upsampler def _move_models_to_device(self): """Move os modelos carregados para o dispositivo de computação (GPU/CPU).""" print(f"[INFO] Movendo modelos para o dispositivo: {self.device}") self.pipeline.to(self.device) if self.latent_upsampler: self.latent_upsampler.to(self.device) def _get_precision_dtype(self) -> torch.dtype: """Determina o dtype para autocast com base na configuração de precisão.""" prec = str(self.config.get("precision", "")).lower() if prec in ["float8_e4m3fn", "bfloat16"]: return torch.bfloat16 elif prec == "mixed_precision": return torch.float16 return torch.float32 @torch.no_grad() def _upsample_and_filter_latents(self, latents: torch.Tensor) -> torch.Tensor: """Aplica o upsample espacial e o filtro AdaIN aos latentes.""" if not self.latent_upsampler: raise ValueError("Latent Upsampler não está carregado para a operação de upscale.") latents_unnormalized = un_normalize_latents(latents, self.pipeline.vae, vae_per_channel_normalize=True) upsampled_latents_unnormalized = self.latent_upsampler(latents_unnormalized) upsampled_latents_normalized = normalize_latents(upsampled_latents_unnormalized, self.pipeline.vae, vae_per_channel_normalize=True) # Filtro AdaIN para manter consistência de cor/estilo com o vídeo de baixa resolução return adain_filter_latent(latents=upsampled_latents_normalized, reference_latents=latents) def _prepare_conditioning_tensor_from_path(self, filepath: str, height: int, width: int, padding: Tuple) -> torch.Tensor: """Carrega uma imagem, redimensiona, aplica padding e move para o dispositivo.""" tensor = self._load_image_to_tensor_with_resize_and_crop(filepath, height, width) tensor = F.pad(tensor, padding) return tensor.to(self.device, dtype=self.runtime_autocast_dtype) def _calculate_downscaled_dims(self, height: int, width: int) -> Tuple[int, int]: """Calcula as dimensões para o primeiro passo (baixa resolução).""" height_padded = ((height - 1) // 8 + 1) * 8 width_padded = ((width - 1) // 8 + 1) * 8 downscale_factor = self.config.get("downscale_factor", 0.6666666) vae_scale_factor = self.pipeline.vae_scale_factor target_w = int(width_padded * downscale_factor) downscaled_width = target_w - (target_w % vae_scale_factor) target_h = int(height_padded * downscale_factor) downscaled_height = target_h - (target_h % vae_scale_factor) return downscaled_height, downscaled_width def _seed_everething(self, seed: int): random.seed(seed) np.random.seed(seed) torch.manual_seed(seed) if torch.cuda.is_available(): torch.cuda.manual_seed(seed) if torch.backends.mps.is_available(): torch.mps.manual_seed(seed) def _register_tmp_dir(self, dir_path: str): """Registra um diretório temporário para limpeza posterior.""" if dir_path and os.path.isdir(dir_path): self._tmp_dirs.add(dir_path) if LTXV_DEBUG: print(f"[DEBUG] Diretório temporário registrado: {dir_path}") # ============================================================================== # 4. INSTANCIAÇÃO E PONTO DE ENTRADA (Exemplo) # ============================================================================== print("Criando instância do VideoService. O carregamento do modelo começará agora...") video_generation_service = VideoService() print("Instância do VideoService pronta para uso.")