# FILE: api/ltx_server_refactored_complete.py # DESCRIPTION: Final backend service for LTX-Video generation. # Features dedicated VAE device logic, robust initialization, and narrative chunking. import gc import io import json import logging import os import random import shutil import subprocess import sys import tempfile import time import traceback import warnings from pathlib import Path from typing import Dict, List, Optional, Tuple, Union import torch import yaml import numpy as np from einops import rearrange from huggingface_hub import hf_hub_download from transformers import T5EncoderModel, T5Tokenizer from safetensors import safe_open from managers.gpu_manager import gpu_manager from managers.vae_manager import vae_manager_singleton from transformers import T5EncoderModel, T5Tokenizer from safetensors import safe_open from managers.gpu_manager import gpu_manager from PIL import Image # ============================================================================== # --- INITIAL SETUP & CONFIGURATION --- # ============================================================================== from utils.debug_utils import log_function_io warnings.filterwarnings("ignore") logging.getLogger("huggingface_hub").setLevel(logging.ERROR) logging.basicConfig(level=logging.INFO, format='[%(levelname)s] %(message)s') # --- CONSTANTS --- CACHE_DIR = os.environ.get("HF_HOME") DEPS_DIR = Path("/data") LTX_VIDEO_REPO_DIR = DEPS_DIR / "LTX-Video" BASE_CONFIG_PATH = LTX_VIDEO_REPO_DIR / "configs" DEFAULT_CONFIG_FILE = BASE_CONFIG_PATH / "ltxv-13b-0.9.8-dev-fp8.yaml" LTX_REPO_ID = "Lightricks/LTX-Video" RESULTS_DIR = Path("/app/output") DEFAULT_FPS = 24.0 FRAMES_ALIGNMENT = 8 # --- CRITICAL: DEPENDENCY PATH INJECTION --- repo_path = str(LTX_VIDEO_REPO_DIR.resolve()) if repo_path not in sys.path: sys.path.insert(0, repo_path) logging.info(f"LTX-Video repository added to sys.path: {repo_path}") from ltx_video.pipelines.pipeline_ltx_video import LTXVideoPipeline # E outros... from ltx_video.models.autoencoders.causal_video_autoencoder import CausalVideoAutoencoder from ltx_video.models.transformers.transformer3d import Transformer3DModel from ltx_video.models.transformers.symmetric_patchifier import SymmetricPatchifier from ltx_video.schedulers.rf import RectifiedFlowScheduler from ltx_video.models.autoencoders.vae_encode import (normalize_latents, un_normalize_latents) from ltx_video.pipelines.pipeline_ltx_video import (ConditioningItem, LTXMultiScalePipeline, adain_filter_latent) from ltx_video.models.autoencoders.latent_upsampler import LatentUpsampler from ltx_video.utils.skip_layer_strategy import SkipLayerStrategy # ============================================================================== # --- UTILITY & HELPER FUNCTIONS --- # ============================================================================== def seed_everything(seed: int): """Sets the seed for reproducibility.""" random.seed(seed) os.environ['PYTHONHASHSEED'] = str(seed) np.random.seed(seed) torch.manual_seed(seed) torch.cuda.manual_seed_all(seed) torch.backends.cudnn.deterministic = True torch.backends.cudnn.benchmark = False def calculate_padding(orig_h: int, orig_w: int, target_h: int, target_w: int) -> Tuple[int, int, int, int]: """Calculates symmetric padding values.""" pad_h = target_h - orig_h pad_w = target_w - orig_w pad_top = pad_h // 2 pad_bottom = pad_h - pad_top pad_left = pad_w // 2 pad_right = pad_w - pad_left return (pad_left, pad_right, pad_top, pad_bottom) def log_tensor_info(tensor: torch.Tensor, name: str = "Tensor"): """Logs detailed debug information about a PyTorch tensor.""" if not isinstance(tensor, torch.Tensor): logging.debug(f"'{name}' is not a tensor.") return info_str = ( f"--- Tensor: {name} ---\n" f" - Shape: {tuple(tensor.shape)}\n" f" - Dtype: {tensor.dtype}\n" f" - Device: {tensor.device}\n" ) if tensor.numel() > 0: try: info_str += ( f" - Min: {tensor.min().item():.4f} | " f"Max: {tensor.max().item():.4f} | " f"Mean: {tensor.mean().item():.4f}\n" ) except Exception: pass # Fails on some dtypes logging.debug(info_str + "----------------------") # (O resto das importações e configurações iniciais permanecem as mesmas) import logging warnings.filterwarnings("ignore", category=UserWarning) warnings.filterwarnings("ignore", category=FutureWarning) warnings.filterwarnings("ignore", message=".*") from huggingface_hub import logging as ll ll.set_verbosity_error() ll.set_verbosity_warning() ll.set_verbosity_info() ll.set_verbosity_debug() logger = logging.getLogger("AducDebug") logging.basicConfig(level=logging.DEBUG) logger.setLevel(logging.DEBUG) class LtxAducPipeline: def __init__(self): """Initializes the service with dedicated GPU logic for main pipeline and VAE.""" t0 = time.perf_counter() logging.info("Initializing VideoService...") RESULTS_DIR.mkdir(parents=True, exist_ok=True) target_main_device_str = str(gpu_manager.get_ltx_device()) target_vae_device_str = str(gpu_manager.get_ltx_vae_device()) logging.info(f"LTX allocated to devices: Main='{target_main_device_str}', VAE='{target_vae_device_str}'") self.config = self._load_config() self.pipeline, self.latent_upsampler = self._load_models() self.main_device = torch.device("cpu") self.vae_device = torch.device("cpu") self.move_to_device(main_device_str=target_main_device_str, vae_device_str=target_vae_device_str) self._temp_files = [] self._apply_precision_policy() vae_manager_singleton.attach_pipeline( self.pipeline, device=self.vae_device, autocast_dtype=self.runtime_autocast_dtype ) self._tmp_dirs = set() logging.info(f"VideoService ready. Startup time: {time.perf_counter()-t0:.2f}s") # ========================================================================== # --- LIFECYCLE & MODEL MANAGEMENT --- # ========================================================================== def _create_latent_upsampler(self, latent_upsampler_model_path: str, device: str) -> LatentUpsampler: """Loads the Latent Upsampler model from a checkpoint path.""" logging.info(f"Loading Latent Upsampler from: {latent_upsampler_model_path} to device: {device}") latent_upsampler = LatentUpsampler.from_pretrained(latent_upsampler_model_path) latent_upsampler.to(device) latent_upsampler.eval() return latent_upsampler def _load_config(self) -> Dict: """Loads the YAML configuration file.""" config_path = DEFAULT_CONFIG_FILE logging.info(f"Loading config from: {config_path}") with open(config_path, "r") as file: return yaml.safe_load(file) def _load_models(self) -> Tuple[LTXVideoPipeline, Optional[torch.nn.Module]]: """ Carrega todos os sub-modelos do pipeline na CPU. Esta função substitui a necessidade de chamar a `create_ltx_video_pipeline` externa, dando-nos controle total sobre o processo. """ t0 = time.perf_counter() logging.info("Carregando sub-modelos do LTX para a CPU...") ckpt_path_str = hf_hub_download(repo_id=LTX_REPO_ID, filename=self.config["checkpoint_path"], cache_dir=CACHE_DIR) ckpt_path = Path(ckpt_path_str) if not ckpt_path.is_file(): raise FileNotFoundError(f"Main checkpoint file not found: {ckpt_path}") # 1. Carrega Metadados do Checkpoint with safe_open(ckpt_path, framework="pt") as f: metadata = f.metadata() or {} config_str = metadata.get("config", "{}") configs = json.loads(config_str) allowed_inference_steps = configs.get("allowed_inference_steps") # 2. Carrega os Componentes Individuais (todos na CPU) # O `.from_pretrained(ckpt_path)` é inteligente e carrega os pesos corretos do arquivo .safetensors. logging.info("Carregando VAE...") vae = CausalVideoAutoencoder.from_pretrained(ckpt_path).to("cpu") logging.info("Carregando Transformer...") transformer = Transformer3DModel.from_pretrained(ckpt_path).to("cpu") logging.info("Carregando Scheduler...") scheduler = RectifiedFlowScheduler.from_pretrained(ckpt_path) logging.info("Carregando Text Encoder e Tokenizer...") text_encoder_path = self.config["text_encoder_model_name_or_path"] text_encoder = T5EncoderModel.from_pretrained(text_encoder_path, subfolder="text_encoder").to("cpu") tokenizer = T5Tokenizer.from_pretrained(text_encoder_path, subfolder="tokenizer") patchifier = SymmetricPatchifier(patch_size=1) # 3. Define a precisão dos modelos (ainda na CPU, será aplicado na GPU depois) precision = self.config.get("precision", "bfloat16") if precision == "bfloat16": vae.to(torch.bfloat16) transformer.to(torch.bfloat16) text_encoder.to(torch.bfloat16) # 4. Monta o objeto do Pipeline com os componentes carregados logging.info("Montando o objeto LTXVideoPipeline...") submodel_dict = { "transformer": transformer, "patchifier": patchifier, "text_encoder": text_encoder, "tokenizer": tokenizer, "scheduler": scheduler, "vae": vae, "allowed_inference_steps": allowed_inference_steps, # Os prompt enhancers são opcionais e não são carregados por padrão para economizar memória "prompt_enhancer_image_caption_model": None, "prompt_enhancer_image_caption_processor": None, "prompt_enhancer_llm_model": None, "prompt_enhancer_llm_tokenizer": None, } pipeline = LTXVideoPipeline(**submodel_dict) # 5. Carrega o Latent Upsampler (também na CPU) latent_upsampler = None if self.config["spatial_upscaler_model_path"]: spatial_path_str = hf_hub_download(repo_id=LTX_REPO_ID, filename=self.config["spatial_upscaler_model_path"], cache_dir=CACHE_DIR) spatial_path = Path(spatial_path_str) if not ckpt_path.is_file(): raise FileNotFoundError(f"Main checkpoint file not found: {spatial_path}") latent_upsampler = self._create_latent_upsampler(spatial_path, device="cpu") if precision == "bfloat16": latent_upsampler.to(torch.bfloat16) logging.info(f"Modelos LTX carregados na CPU em {time.perf_counter()-t0:.2f}s") return pipeline, latent_upsampler def move_to_device(self, main_device_str: str, vae_device_str: str): """Moves pipeline components to their target devices.""" target_main_device = torch.device(main_device_str) target_vae_device = torch.device(vae_device_str) logging.info(f"Moving LTX models -> Main Pipeline: {target_main_device}, VAE: {target_vae_device}") self.main_device = target_main_device self.pipeline.to(self.main_device) self.vae_device = target_vae_device self.pipeline.vae.to(self.vae_device) if self.latent_upsampler: self.latent_upsampler.to(self.main_device) logging.info("LTX models successfully moved to target devices.") def move_to_cpu(self): """Moves all LTX components to CPU to free VRAM.""" self.move_to_device(main_device_str="cpu", vae_device_str="cpu") if torch.cuda.is_available(): torch.cuda.empty_cache() def finalize(self): """Cleans up GPU memory after a generation task.""" gc.collect() if torch.cuda.is_available(): torch.cuda.empty_cache() try: torch.cuda.ipc_collect(); except Exception: pass # ========================================================================== # --- PUBLIC ORCHESTRATORS --- # ========================================================================== def generate_narrative_low(self, prompt: str, **kwargs) -> Tuple[Optional[str], Optional[str], Optional[int]]: """[ORCHESTRATOR] Generates a video from a multi-line prompt (sequence of scenes).""" logging.info("Starting narrative low-res generation...") used_seed = self._resolve_seed(kwargs.get("seed")) seed_everything(used_seed) prompt_list = [p.strip() for p in prompt.splitlines() if p.strip()] if not prompt_list: raise ValueError("Prompt is empty or contains no valid lines.") num_chunks = len(prompt_list) total_frames = self._calculate_aligned_frames(kwargs.get("duration", 4.0)) frames_per_chunk = (total_frames // num_chunks // FRAMES_ALIGNMENT) * FRAMES_ALIGNMENT overlap_frames = self.config.get("overlap_frames", 8) all_latents_paths = [] overlap_condition_item = None try: for i, chunk_prompt in enumerate(prompt_list): logging.info(f"Generating narrative chunk {i+1}/{num_chunks}: '{chunk_prompt[:50]}...'") current_frames = frames_per_chunk if i > 0: current_frames += overlap_frames current_conditions = kwargs.get("initial_conditions", []) if i == 0 else [] if overlap_condition_item: current_conditions.append(overlap_condition_item) chunk_latents = self._generate_single_chunk_low( prompt=chunk_prompt, num_frames=current_frames, seed=used_seed + i, conditioning_items=current_conditions, **kwargs ) if chunk_latents is None: raise RuntimeError(f"Failed to generate latents for chunk {i+1}.") if i < num_chunks - 1: overlap_latents = chunk_latents[:, :, -overlap_frames:, :, :].clone() overlap_condition_item = ConditioningItem(media_item=overlap_latents, media_frame_number=0, conditioning_strength=1.0) if i > 0: chunk_latents = chunk_latents[:, :, overlap_frames:, :, :] chunk_path = RESULTS_DIR / f"temp_chunk_{i}_{used_seed}.pt" torch.save(chunk_latents.cpu(), chunk_path) all_latents_paths.append(chunk_path) return self._finalize_generation(all_latents_paths, "narrative_video", used_seed) except Exception as e: logging.error(f"Error during narrative generation: {e}", exc_info=True) return None, None, None finally: for path in all_latents_paths: if path.exists(): path.unlink() self.finalize() def generate_single_low(self, **kwargs) -> Tuple[Optional[str], Optional[str], Optional[int]]: """[ORCHESTRATOR] Generates a video from a single prompt in one go.""" logging.info("Starting single-prompt low-res generation...") used_seed = self._resolve_seed(kwargs.get("seed")) seed_everything(used_seed) try: total_frames = self._calculate_aligned_frames(kwargs.get("duration", 4.0), min_frames=9) final_latents = self._generate_single_chunk_low( num_frames=total_frames, seed=used_seed, conditioning_items=kwargs.get("initial_conditions", []), **kwargs ) if final_latents is None: raise RuntimeError("Failed to generate latents.") latents_path = RESULTS_DIR / f"temp_single_{used_seed}.pt" torch.save(final_latents.cpu(), latents_path) return self._finalize_generation([latents_path], "single_video", used_seed) except Exception as e: logging.error(f"Error during single generation: {e}", exc_info=True) return None, None, None finally: self.finalize() # ========================================================================== # --- INTERNAL WORKER & HELPER METHODS --- # ========================================================================== def _generate_single_chunk_low( self, prompt: str, negative_prompt: str, height: int, width: int, num_frames: int, seed: int, conditioning_items: List[ConditioningItem], ltx_configs_override: Optional[Dict], **kwargs ) -> Optional[torch.Tensor]: """[WORKER] Generates a single chunk of latents. This is the core generation unit.""" height_padded, width_padded = (self._align(d) for d in (height, width)) downscale_factor = self.config.get("downscale_factor", 0.6666666) vae_scale_factor = self.pipeline.vae_scale_factor downscaled_height = self._align(int(height_padded * downscale_factor), vae_scale_factor) downscaled_width = self._align(int(width_padded * downscale_factor), vae_scale_factor) first_pass_config = self.config.get("first_pass", {}).copy() if ltx_configs_override: first_pass_config.update(self._prepare_guidance_overrides(ltx_configs_override)) pipeline_kwargs = { "prompt": prompt, "negative_prompt": negative_prompt, "height": downscaled_height, "width": downscaled_width, "num_frames": num_frames, "frame_rate": DEFAULT_FPS, "generator": torch.Generator(device=self.main_device).manual_seed(seed), "output_type": "latent", "conditioning_items": conditioning_items, **first_pass_config } with torch.autocast(device_type=self.main_device.type, dtype=self.runtime_autocast_dtype, enabled="cuda" in self.main_device.type): latents_raw = self.pipeline(**pipeline_kwargs).images log_tensor_info(latents_raw, f"Raw Latents for '{prompt[:40]}...'") return latents_raw @log_function_io def generate_low_resolution( self, prompt_list: List[str], initial_media_items: Optional[List[Tuple[Union[str, Image.Image, torch.Tensor], int, float]]] = None, **kwargs ) -> Tuple[Optional[str], Optional[str], Optional[int]]: # O bloco try...finally garante que _cleanup() seja sempre chamado. try: logging.info("Starting unified low-resolution generation...") used_seed = self._get_random_seed() seed_everything(used_seed) logging.info(f"Using randomly generated seed: {used_seed}") if not prompt_list: raise ValueError("Prompt list cannot be empty.") is_narrative = len(prompt_list) > 1 num_chunks = len(prompt_list) #total_frames = self._calculate_aligned_frames(kwargs.get("duration", 4.0)) total_frames = max(9, int(round((round(kwargs.get("duration", 1.0) * DEFAULT_FPS) - 1) / 8.0) * 8 + 1)) frames_per_chunk = max(FRAMES_ALIGNMENT, (total_frames // num_chunks // FRAMES_ALIGNMENT) * FRAMES_ALIGNMENT) overlap_frames = 4 if is_narrative else 0 initial_conditions = [] if initial_media_items: initial_conditions = vae_aduc_pipeline.generate_conditioning_items( media_items=[item[0] for item in initial_media_items], target_frames=[item[1] for item in initial_media_items], strengths=[item[2] for item in initial_media_items], target_resolution=(kwargs['height'], kwargs['width']) ) height_padded, width_padded = (self._align(d) for d in (kwargs['height'], kwargs['width'])) downscale_factor = self.config.get("downscale_factor", 0.6666666) vae_scale_factor = self.pipeline.vae_scale_factor downscaled_height = self._align(int(height_padded * downscale_factor), vae_scale_factor) downscaled_width = self._align(int(width_padded * downscale_factor), vae_scale_factor) stg_mode_str = self.config.get("stg_mode", "attention_values") stg_strategy = None if stg_mode_str.lower() in ["stg_av", "attention_values"]: stg_strategy = SkipLayerStrategy.AttentionValues elif stg_mode_str.lower() in ["stg_as", "attention_skip"]: stg_strategy = SkipLayerStrategy.AttentionSkip elif stg_mode_str.lower() in ["stg_r", "residual"]: stg_strategy = SkipLayerStrategy.Residual elif stg_mode_str.lower() in ["stg_t", "transformer_block"]: stg_strategy = SkipLayerStrategy.TransformerBlock height_padded = ((kwargs['height'] - 1) // 8 + 1) * 8 width_padded = ((kwargs['width'] - 1) // 8 + 1) * 8 downscale_factor = self.config.get("downscale_factor", 0.6666666) vae_scale_factor = self.pipeline.vae_scale_factor x_width = int(width_padded * downscale_factor) downscaled_width = x_width - (x_width % vae_scale_factor) x_height = int(height_padded * downscale_factor) downscaled_height = x_height - (x_height % vae_scale_factor) call_kwargs = { "height": downscaled_height, "width": downscaled_width, "skip_initial_inference_steps": 3, "skip_final_inference_steps": 0, "num_inference_steps": 30, "negative_prompt": kwargs['negative_prompt'], "guidance_scale": self.config.get("guidance_scale", [1, 1, 6, 8, 6, 1, 1]), "stg_scale": self.config.get("stg_scale", [0, 0, 4, 4, 4, 2, 1]), "rescaling_scale": self.config.get("rescaling_scale", [1, 1, 0.5, 0.5, 1, 1, 1]), "skip_block_list": self.config.get("skip_block_list", [[], [11, 25, 35, 39], [22, 35, 39], [28], [28], [28], [28]]), "frame_rate": int(DEFAULT_FPS), "generator": torch.Generator(device=self.main_device).manual_seed(self._get_random_seed()), "output_type": "latent", "media_items": None, "decode_timestep": self.config.get("decode_timestep", 0.05), "decode_noise_scale": self.config.get("decode_noise_scale", 0.025), "is_video": True, "vae_per_channel_normalize": True, "offload_to_cpu": False, "enhance_prompt": False, "num_frames": total_frames, "downscale_factor": self.config.get("downscale_factor", 0.6666666), "rescaling_scale": self.config.get("rescaling_scale", [1, 1, 0.5, 0.5, 1, 1, 1]), "guidance_timesteps": self.config.get("guidance_timesteps", [1.0, 0.996, 0.9933, 0.9850, 0.9767, 0.9008, 0.6180]), "skip_block_list": self.config.get("skip_block_list", [[], [11, 25, 35, 39], [22, 35, 39], [28], [28], [28], [28]]), "sampler": self.config.get("sampler", "from_checkpoint"), "precision": self.config.get("precision", "float8_e4m3fn"), "stochastic_sampling": self.config.get("stochastic_sampling", False), "cfg_star_rescale": self.config.get("cfg_star_rescale", True), } ltx_configs_override = kwargs.get("ltx_configs_override", {}) if ltx_configs_override: call_kwargs.update(ltx_configs_override) #if initial_conditions: call_kwargs["conditioning_items"] = initial_conditions overlap_latents = None # --- ETAPA 1: GERAÇÃO DE CHUNKS E SALVAMENTO --- for i, chunk_prompt in enumerate(prompt_list): logging.info(f"Processing scene {i+1}/{num_chunks}: '{chunk_prompt[:50]}...'") call_kwargs.pop("prompt", None) call_kwargs["prompt"] = chunk_prompt print (f"overlap_latents {call_kwargs}") with torch.autocast(device_type=self.main_device.type, dtype=self.runtime_autocast_dtype, enabled="cuda" in self.main_device.type): chunk_latents = self.pipeline(**call_kwargs).images if chunk_latents is None: raise RuntimeError(f"Failed to generate latents for scene {i+1}.") if is_narrative and i < num_chunks - 1: overlap_latents = chunk_latents[:, :, -overlap_frames:, :, :].clone() #call_kwargs["conditioning_items"] = [LatentConditioningItem(overlap_latents, 0, 1.0)] print (f"overlap_latents {overlap_latents.shape}") else: call_kwargs.pop("conditioning_items", None) print (f"chunk_latents {chunk_latents.shape}") #if i > 0: chunk_latents = chunk_latents[:, :, overlap_frames:, :, :] chunk_path = RESULTS_DIR / f"temp_chunk_{i}_{used_seed}.pt" # --- NOVO: Rastreia o arquivo para limpeza --- self._temp_files.append(chunk_path) torch.save(chunk_latents.cpu(), chunk_path) del chunk_latents # --- ETAPA 2: CONCATENAÇÃO DOS LATENTES (CPU) --- logging.info(f"Concatenating {len(self._temp_files)} latent chunks on CPU...") all_tensors_cpu = [torch.load(p) for p in self._temp_files] final_latents_cpu = torch.cat(all_tensors_cpu, dim=2) logging.info(f"Concatenating SuperLat {final_latents_cpu.shape}") # --- ETAPA 3 e 4: FINALIZAÇÃO --- base_filename = "narrative_video" if is_narrative else "single_video" video_path, latents_path = self._finalize_generation(final_latents_cpu, base_filename, used_seed) return video_path, latents_path, used_seed finally: # --- NOVO: A chamada de limpeza inteligente sempre ocorre --- print("") # (O resto das funções de _finalize_generation, _save_and_log_video, etc., permanecem as mesmas) @log_function_io def _finalize_generation(self, final_latents_cpu: torch.Tensor, base_filename: str, seed: int) -> Tuple[str, str]: final_latents_path = RESULTS_DIR / f"latents_{base_filename}_{seed}.pt" torch.save(final_latents_cpu, final_latents_path) logging.info(f"Final latents saved to: {final_latents_path}") logging.info("Delegating to VaeServer for decoding latents to pixels...") pixel_tensor_cpu = vae_aduc_pipeline.decode_to_pixels( final_latents_cpu, decode_timestep=float(self.config.get("decode_timestep", 0.05)) ) logging.info("Delegating to VideoEncodeTool to save pixel tensor as MP4...") video_path = self._save_and_log_video(pixel_tensor_cpu, f"{base_filename}_{seed}") return str(video_path), str(final_latents_path) @log_function_io def _save_and_log_video(self, pixel_tensor_cpu: torch.Tensor, base_filename: str) -> Path: with tempfile.TemporaryDirectory() as temp_dir: temp_path = os.path.join(temp_dir, f"{base_filename}.mp4") video_encode_tool_singleton.save_video_from_tensor(pixel_tensor_cpu, temp_path, fps=DEFAULT_FPS) final_path = RESULTS_DIR / f"{base_filename}.mp4" shutil.move(temp_path, final_path) logging.info(f"Video saved successfully to: {final_path}") return final_path def _apply_precision_policy1(self): precision = str(self.config.get("precision", "bfloat16")).lower() if precision in ["float8_e4m3fn", "bfloat16"]: self.runtime_autocast_dtype = torch.bfloat16 elif precision == "mixed_precision": self.runtime_autocast_dtype = torch.float16 else: self.runtime_autocast_dtype = torch.float32 logging.info(f"Runtime precision policy set for autocast: {self.runtime_autocast_dtype}") def _align1(self, dim: int, alignment: int = FRAMES_ALIGNMENT, alignment_rule: str = 'default') -> int: if alignment_rule == 'n*8+1': return ((dim - 1) // alignment) * alignment + 1 return ((dim - 1) // alignment + 1) * alignment def _calculate_aligned_frames1(self, duration_s: float, min_frames: int = 1) -> int: num_frames = int(round(duration_s * DEFAULT_FPS)) aligned_frames = self._align(num_frames, alignment=FRAMES_ALIGNMENT) return max(aligned_frames, min_frames) def _get_random_seed(self) -> int: return random.randint(0, 2**32 - 1) def _finalize_generation1(self, latents_paths: List[Path], base_filename: str, seed: int) -> Tuple[str, str, int]: """Loads latents, concatenates, decodes to video, and saves both.""" logging.info("Finalizing generation: decoding latents to video.") all_tensors_cpu = [torch.load(p) for p in latents_paths] final_latents = torch.cat(all_tensors_cpu, dim=2) final_latents_path = RESULTS_DIR / f"latents_{base_filename}_{seed}.pt" torch.save(final_latents, final_latents_path) logging.info(f"Final latents saved to: {final_latents_path}") # The decode method in vae_manager now handles moving the tensor to the correct VAE device. pixel_tensor = vae_manager_singleton.decode( final_latents, decode_timestep=float(self.config.get("decode_timestep", 0.05)) ) video_path = self._save_and_log_video(pixel_tensor, f"{base_filename}_{seed}") return str(video_path), str(final_latents_path), seed def prepare_condition_items(self, items_list: List, height: int, width: int, num_frames: int) -> List[ConditioningItem]: """Prepares a list of ConditioningItem objects from file paths or tensors.""" if not items_list: return [] height_padded, width_padded = self._align(height), self._align(width) padding_values = calculate_padding(height, width, height_padded, width_padded) conditioning_items = [] for media, frame, weight in items_list: tensor = self._prepare_conditioning_tensor(media, height, width, padding_values) safe_frame = max(0, min(int(frame), num_frames - 1)) conditioning_items.append(ConditioningItem(tensor, safe_frame, float(weight))) return conditioning_items def _resize_tensor(self, media_items, height, width): n_frames = media_items.shape[2] if media_items.shape[-2:] != (height, width): media_items = rearrange(media_items, "b c n h w -> (b n) c h w") media_items = F.interpolate( media_items, size=(height, width), mode="bilinear", align_corners=False, ) media_items = rearrange(media_items, "(b n) c h w -> b c n h w", n=n_frames) return media_items def _prepare_conditioning_tensor(self, media_path: str, height: int, width: int, padding: Tuple) -> torch.Tensor: """Loads and processes an image to be a conditioning tensor.""" tensor = self._resize_tensor(media_path, height, width) tensor = torch.nn.functional.pad(tensor, padding) # Conditioning tensors are needed on the main device for the transformer pass return tensor.to(self.main_device, dtype=self.runtime_autocast_dtype) def _prepare_guidance_overrides(self, ltx_configs: Dict) -> Dict: """Parses UI presets for guidance into pipeline-compatible arguments.""" overrides = {} preset = ltx_configs.get("guidance_preset", "Padrão (Recomendado)") if preset == "Agressivo": overrides["guidance_scale"] = [1, 2, 8, 12, 8, 2, 1] overrides["stg_scale"] = [0, 0, 5, 6, 5, 3, 2] elif preset == "Suave": overrides["guidance_scale"] = [1, 1, 4, 5, 4, 1, 1] overrides["stg_scale"] = [0, 0, 2, 2, 2, 1, 0] elif preset == "Customizado": try: overrides["guidance_scale"] = json.loads(ltx_configs["guidance_scale_list"]) overrides["stg_scale"] = json.loads(ltx_configs["stg_scale_list"]) except (json.JSONDecodeError, KeyError) as e: logging.warning(f"Failed to parse custom guidance values: {e}. Falling back to defaults.") if overrides: logging.info(f"Applying '{preset}' guidance preset overrides.") return overrides def _save_and_log_video(self, pixel_tensor: torch.Tensor, base_filename: str) -> Path: """Saves a pixel tensor (on CPU) to an MP4 file.""" with tempfile.TemporaryDirectory() as temp_dir: temp_path = os.path.join(temp_dir, f"{base_filename}.mp4") video_encode_tool_singleton.save_video_from_tensor( pixel_tensor, temp_path, fps=DEFAULT_FPS ) final_path = RESULTS_DIR / f"{base_filename}.mp4" shutil.move(temp_path, final_path) logging.info(f"Video saved successfully to: {final_path}") return final_path def _apply_precision_policy(self): """Sets the autocast dtype based on the configuration file.""" precision = str(self.config.get("precision", "bfloat16")).lower() if precision in ["float8_e4m3fn", "bfloat16"]: self.runtime_autocast_dtype = torch.bfloat16 elif precision == "mixed_precision": self.runtime_autocast_dtype = torch.float16 else: self.runtime_autocast_dtype = torch.float32 logging.info(f"Runtime precision policy set for autocast: {self.runtime_autocast_dtype}") def _align(self, dim: int, alignment: int = FRAMES_ALIGNMENT) -> int: """Aligns a dimension to the nearest multiple of `alignment`.""" return ((dim - 1) // alignment + 1) * alignment def _calculate_aligned_frames(self, duration_s: float, min_frames: int = 1) -> int: """Calculates total frames based on duration, ensuring alignment.""" num_frames = int(round(duration_s * DEFAULT_FPS)) aligned_frames = self._align(num_frames) return max(aligned_frames + 1, min_frames) def _resolve_seed(self, seed: Optional[int]) -> int: """Returns the given seed or generates a new random one.""" return random.randint(0, 2**32 - 1) if seed is None else int(seed) ltx_aduc_pipeline = LtxAducPipeline() logging.info("Global VideoService orchestrator instance created successfully.")