Spaces:
Paused
Paused
| # FILE: api/ltx/ltx_aduc_manager.py | |
| # DESCRIPTION: A singleton manager for the LTX-Video pipeline. | |
| # This module loads the pipeline, places it on the correct devices, and applies a | |
| # targeted runtime monkey patch to delegate conditioning tasks to the specialized | |
| # VaeAducPipeline service, enabling full control for the ADUC-SDR architecture. | |
| import time | |
| import yaml | |
| from pathlib import Path | |
| from typing import List, Optional, Tuple, Union, Dict | |
| import threading | |
| import sys | |
| import torch | |
| from dataclasses import dataclass | |
| # --- Importações da arquitetura ADUC-SDR --- | |
| from managers.gpu_manager import gpu_manager | |
| from api.ltx.ltx_utils import build_ltx_pipeline_on_cpu | |
| from utils.debug_utils import log_function_io | |
| # Importa o serviço VAE que fará o trabalho real | |
| # --- Importações da biblioteca LTX-Video --- | |
| LTX_VIDEO_REPO_DIR = Path("/data/LTX-Video") | |
| repo_path = str(LTX_VIDEO_REPO_DIR.resolve()) | |
| if repo_path not in sys.path: | |
| sys.path.insert(0, repo_path) | |
| from ltx_video.pipelines.pipeline_ltx_video import LTXVideoPipeline | |
| # Importa o tipo original de conditioning item para type hinting | |
| from ltx_video.pipelines.pipeline_ltx_video import ConditioningItem as PipelineConditioningItem | |
| import logging | |
| import warnings | |
| warnings.filterwarnings("ignore", category=UserWarning) | |
| warnings.filterwarnings("ignore", category=FutureWarning) | |
| warnings.filterwarnings("ignore", message=".*") | |
| try: | |
| from huggingface_hub import logging as hf_logging | |
| hf_logging.set_verbosity_error() | |
| except ImportError: | |
| pass | |
| logger = logging.getLogger("AducDebug") | |
| logging.basicConfig(level=logging.DEBUG) | |
| logger.setLevel(logging.DEBUG) | |
| class LatentConditioningItem: | |
| latent_tensor: torch.Tensor | |
| media_frame_number: int | |
| conditioning_strength: float | |
| from ltx_video.pipelines.pipeline_ltx_video import ConditioningItem | |
| # ============================================================================== | |
| # --- O MONKEY PATCH DIRECIONADO E SIMPLES --- | |
| # ============================================================================== | |
| def _aduc_prepare_conditioning_patch( | |
| self: "LTXVideoPipeline", | |
| conditioning_items: Optional[List[Union[ConditioningItem, LatentConditioningItem]]], | |
| init_latents: torch.Tensor, | |
| num_frames: int, | |
| height: int, | |
| width: int, | |
| vae_per_channel_normalize: bool = False, | |
| generator=None, | |
| ) -> Tuple[torch.Tensor, torch.Tensor, Optional[torch.Tensor], int]: | |
| """ | |
| [PATCH] Substitui o método `prepare_conditioning` original da LTXVideoPipeline. | |
| Esta função atua como um proxy (intermediário). Ela não contém lógica de processamento. | |
| Em vez disso, ela delega 100% do trabalho para o `vae_aduc_pipeline`, que é o nosso | |
| serviço especializado e otimizado para essa tarefa. | |
| """ | |
| logging.debug(f"Patch ADUC: Interceptado 'prepare_conditioning'. Delegando para o serviço VaeAducPipeline.") | |
| from api.ltx.vae_aduc_pipeline import vae_aduc_pipeline | |
| # 1. Chama o serviço especializado para fazer todo o trabalho pesado. | |
| # O serviço VAE processa na sua própria GPU dedicada e retorna os tensores na CPU. | |
| latents_cpu, coords_cpu, mask_cpu, num_latents = vae_aduc_pipeline.prepare_conditioning( | |
| conditioning_items=conditioning_items, | |
| init_latents=init_latents, | |
| num_frames=num_frames, | |
| height=height, | |
| width=width, | |
| vae_per_channel_normalize=vae_per_channel_normalize, | |
| generator=generator, | |
| ) | |
| # 2. Move os resultados da CPU para o dispositivo correto que a pipeline principal espera. | |
| # O `init_latents.device` garante que estamos usando o dispositivo principal da pipeline (ex: 'cuda:0'). | |
| device = init_latents.device | |
| latents = latents_cpu.to(device) | |
| pixel_coords = coords_cpu.to(device) | |
| conditioning_mask = mask_cpu.to(device) if mask_cpu is not None else None | |
| # 3. Retorna os tensores prontos. A pipeline principal continua sua execução normalmente, | |
| # sem saber que a lógica de condicionamento foi executada por um serviço externo. | |
| return latents, pixel_coords, conditioning_mask, num_latents | |
| # ============================================================================== | |
| # --- LTX WORKER E POOL MANAGER --- | |
| # ============================================================================== | |
| class LTXWorker: | |
| """ | |
| Gerencia uma instância única da LTXVideoPipeline, aplicando o patch | |
| necessário durante a inicialização. | |
| """ | |
| def __init__(self, main_device_str: str, vae_device_str: str, config: dict): | |
| self.main_device = torch.device(main_device_str) | |
| self.vae_device = torch.device(vae_device_str) | |
| self.config = config | |
| self.pipeline: LTXVideoPipeline = None | |
| self._load_and_patch_pipeline() | |
| def _load_and_patch_pipeline(self): | |
| """ | |
| Orquestra o carregamento da pipeline e a aplicação do monkey patch. | |
| """ | |
| logging.info(f"[LTXWorker-{self.main_device}] Carregando pipeline LTX para a CPU...") | |
| self.pipeline, _ = build_ltx_pipeline_on_cpu(self.config) | |
| logging.info(f"[LTXWorker-{self.main_device}] Movendo pipeline para GPUs (Main: {self.main_device}, VAE: {self.vae_device})...") | |
| self.pipeline.to(self.main_device) # Move a maioria dos componentes | |
| self.pipeline.vae.to(self.vae_device) # Move o VAE para sua GPU dedicada | |
| logging.info(f"[LTXWorker-{self.main_device}] Aplicando patch ADUC-SDR em 'prepare_conditioning'...") | |
| # A "mágica" simples e eficaz acontece aqui: | |
| self.pipeline.prepare_conditioning = _aduc_prepare_conditioning_patch.__get__(self.pipeline, LTXVideoPipeline) | |
| logging.info(f"[LTXWorker-{self.main_device}] ✅ Pipeline 'quente', corrigida e pronta para uso.") | |
| class LtxAducManager: | |
| """ | |
| Implementa o padrão Singleton para garantir que a pipeline LTX seja | |
| carregada e corrigida apenas uma vez durante a vida útil da aplicação. | |
| """ | |
| _instance = None | |
| _lock = threading.Lock() | |
| def __new__(cls, *args, **kwargs): | |
| with cls._lock: | |
| if cls._instance is None: | |
| cls._instance = super().__new__(cls) | |
| cls._instance._initialized = False | |
| return cls._instance | |
| def __init__(self): | |
| if hasattr(self, '_initialized') and self._initialized: | |
| return | |
| with self._lock: | |
| if hasattr(self, '_initialized') and self._initialized: | |
| return | |
| logging.info("⚙️ Inicializando LtxAducManager Singleton...") | |
| self.config = self._load_config() | |
| main_device_str = str(gpu_manager.get_ltx_device()) | |
| vae_device_str = str(gpu_manager.get_ltx_vae_device()) | |
| # Cria o worker que irá carregar e patchear a pipeline | |
| self.worker = LTXWorker(main_device_str, vae_device_str, self.config) | |
| self._initialized = True | |
| logging.info("✅ LtxAducManager pronto.") | |
| def _load_config(self) -> Dict: | |
| """Carrega a configuração YAML principal do LTX.""" | |
| # TODO: Considerar mover o path da configuração para uma variável de ambiente ou config central | |
| config_path = Path("/data/LTX-Video/configs/ltxv-13b-0.9.8-dev-fp8.yaml") | |
| with open(config_path, "r") as file: | |
| return yaml.safe_load(file) | |
| def get_pipeline(self) -> LTXVideoPipeline: | |
| """ | |
| Ponto de acesso principal para obter a instância da pipeline. | |
| Returns: | |
| LTXVideoPipeline: A instância única, carregada e já corrigida. | |
| """ | |
| return self.worker.pipeline | |
| # --- Instância Singleton Global --- | |
| # Outras partes do código importarão esta instância para interagir com a pipeline. | |
| ltx_aduc_manager = LtxAducManager() |