Spaces:
Paused
Paused
File size: 7,870 Bytes
5105909 7ea6441 8534d97 460fa35 b521010 6c5d136 b521010 bb4dfa4 b521010 b54d196 52c58b6 7ea6441 b3b14bc b521010 f07afbc 7ea6441 bb4dfa4 b521010 7ea6441 a7e6912 a3fb976 7ea6441 ff45f7a eefd283 7ea6441 eefd283 c79c04b bbdfd12 9a6b3d7 b54d196 d27849f 89c5a7b 9a6b3d7 7ea6441 9a6b3d7 f07afbc b521010 d68b4ef b521010 7ea6441 b521010 7ea6441 c139ab2 7ea6441 42998d3 b521010 42998d3 b521010 7ea6441 b521010 f07afbc b521010 7ea6441 b521010 7ea6441 b521010 7ea6441 66af513 7ea6441 b521010 a7e6912 7ea6441 460fa35 b521010 42998d3 460fa35 b521010 42998d3 460fa35 7ea6441 b521010 7ea6441 b521010 7ea6441 b521010 7ea6441 b521010 7ea6441 b521010 7ea6441 b38d3ec b521010 7ea6441 b521010 7ea6441 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 |
# FILE: api/ltx/ltx_aduc_manager.py
# DESCRIPTION: A singleton manager for the LTX-Video pipeline.
# This module loads the pipeline, places it on the correct devices, and applies a
# targeted runtime monkey patch to delegate conditioning tasks to the specialized
# VaeAducPipeline service, enabling full control for the ADUC-SDR architecture.
import time
import yaml
from pathlib import Path
from typing import List, Optional, Tuple, Union, Dict
import threading
import sys
import torch
from dataclasses import dataclass
# --- Importações da arquitetura ADUC-SDR ---
from managers.gpu_manager import gpu_manager
from api.ltx.ltx_utils import build_ltx_pipeline_on_cpu
from utils.debug_utils import log_function_io
# Importa o serviço VAE que fará o trabalho real
# --- Importações da biblioteca LTX-Video ---
LTX_VIDEO_REPO_DIR = Path("/data/LTX-Video")
repo_path = str(LTX_VIDEO_REPO_DIR.resolve())
if repo_path not in sys.path:
sys.path.insert(0, repo_path)
from ltx_video.pipelines.pipeline_ltx_video import LTXVideoPipeline
# Importa o tipo original de conditioning item para type hinting
from ltx_video.pipelines.pipeline_ltx_video import ConditioningItem as PipelineConditioningItem
import logging
import warnings
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", message=".*")
try:
from huggingface_hub import logging as hf_logging
hf_logging.set_verbosity_error()
except ImportError:
pass
logger = logging.getLogger("AducDebug")
logging.basicConfig(level=logging.DEBUG)
logger.setLevel(logging.DEBUG)
@dataclass
class LatentConditioningItem:
latent_tensor: torch.Tensor
media_frame_number: int
conditioning_strength: float
from ltx_video.pipelines.pipeline_ltx_video import ConditioningItem
# ==============================================================================
# --- O MONKEY PATCH DIRECIONADO E SIMPLES ---
# ==============================================================================
@log_function_io
def _aduc_prepare_conditioning_patch(
self: "LTXVideoPipeline",
conditioning_items: Optional[List[Union[ConditioningItem, LatentConditioningItem]]],
init_latents: torch.Tensor,
num_frames: int,
height: int,
width: int,
vae_per_channel_normalize: bool = False,
generator=None,
) -> Tuple[torch.Tensor, torch.Tensor, Optional[torch.Tensor], int]:
"""
[PATCH] Substitui o método `prepare_conditioning` original da LTXVideoPipeline.
Esta função atua como um proxy (intermediário). Ela não contém lógica de processamento.
Em vez disso, ela delega 100% do trabalho para o `vae_aduc_pipeline`, que é o nosso
serviço especializado e otimizado para essa tarefa.
"""
logging.debug(f"Patch ADUC: Interceptado 'prepare_conditioning'. Delegando para o serviço VaeAducPipeline.")
from api.ltx.vae_aduc_pipeline import vae_aduc_pipeline
# 1. Chama o serviço especializado para fazer todo o trabalho pesado.
# O serviço VAE processa na sua própria GPU dedicada e retorna os tensores na CPU.
latents_cpu, coords_cpu, mask_cpu, num_latents = vae_aduc_pipeline.prepare_conditioning(
conditioning_items=conditioning_items,
init_latents=init_latents,
num_frames=num_frames,
height=height,
width=width,
vae_per_channel_normalize=vae_per_channel_normalize,
generator=generator,
)
# 2. Move os resultados da CPU para o dispositivo correto que a pipeline principal espera.
# O `init_latents.device` garante que estamos usando o dispositivo principal da pipeline (ex: 'cuda:0').
device = init_latents.device
latents = latents_cpu.to(device)
pixel_coords = coords_cpu.to(device)
conditioning_mask = mask_cpu.to(device) if mask_cpu is not None else None
# 3. Retorna os tensores prontos. A pipeline principal continua sua execução normalmente,
# sem saber que a lógica de condicionamento foi executada por um serviço externo.
return latents, pixel_coords, conditioning_mask, num_latents
# ==============================================================================
# --- LTX WORKER E POOL MANAGER ---
# ==============================================================================
class LTXWorker:
"""
Gerencia uma instância única da LTXVideoPipeline, aplicando o patch
necessário durante a inicialização.
"""
def __init__(self, main_device_str: str, vae_device_str: str, config: dict):
self.main_device = torch.device(main_device_str)
self.vae_device = torch.device(vae_device_str)
self.config = config
self.pipeline: LTXVideoPipeline = None
self._load_and_patch_pipeline()
@log_function_io
def _load_and_patch_pipeline(self):
"""
Orquestra o carregamento da pipeline e a aplicação do monkey patch.
"""
logging.info(f"[LTXWorker-{self.main_device}] Carregando pipeline LTX para a CPU...")
self.pipeline, _ = build_ltx_pipeline_on_cpu(self.config)
logging.info(f"[LTXWorker-{self.main_device}] Movendo pipeline para GPUs (Main: {self.main_device}, VAE: {self.vae_device})...")
self.pipeline.to(self.main_device) # Move a maioria dos componentes
self.pipeline.vae.to(self.vae_device) # Move o VAE para sua GPU dedicada
logging.info(f"[LTXWorker-{self.main_device}] Aplicando patch ADUC-SDR em 'prepare_conditioning'...")
# A "mágica" simples e eficaz acontece aqui:
self.pipeline.prepare_conditioning = _aduc_prepare_conditioning_patch.__get__(self.pipeline, LTXVideoPipeline)
logging.info(f"[LTXWorker-{self.main_device}] ✅ Pipeline 'quente', corrigida e pronta para uso.")
class LtxAducManager:
"""
Implementa o padrão Singleton para garantir que a pipeline LTX seja
carregada e corrigida apenas uma vez durante a vida útil da aplicação.
"""
_instance = None
_lock = threading.Lock()
def __new__(cls, *args, **kwargs):
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if hasattr(self, '_initialized') and self._initialized:
return
with self._lock:
if hasattr(self, '_initialized') and self._initialized:
return
logging.info("⚙️ Inicializando LtxAducManager Singleton...")
self.config = self._load_config()
main_device_str = str(gpu_manager.get_ltx_device())
vae_device_str = str(gpu_manager.get_ltx_vae_device())
# Cria o worker que irá carregar e patchear a pipeline
self.worker = LTXWorker(main_device_str, vae_device_str, self.config)
self._initialized = True
logging.info("✅ LtxAducManager pronto.")
def _load_config(self) -> Dict:
"""Carrega a configuração YAML principal do LTX."""
# TODO: Considerar mover o path da configuração para uma variável de ambiente ou config central
config_path = Path("/data/LTX-Video/configs/ltxv-13b-0.9.8-dev-fp8.yaml")
with open(config_path, "r") as file:
return yaml.safe_load(file)
def get_pipeline(self) -> LTXVideoPipeline:
"""
Ponto de acesso principal para obter a instância da pipeline.
Returns:
LTXVideoPipeline: A instância única, carregada e já corrigida.
"""
return self.worker.pipeline
# --- Instância Singleton Global ---
# Outras partes do código importarão esta instância para interagir com a pipeline.
ltx_aduc_manager = LtxAducManager() |