File size: 7,870 Bytes
5105909
7ea6441
 
 
 
8534d97
460fa35
b521010
 
6c5d136
b521010
bb4dfa4
b521010
b54d196
52c58b6
7ea6441
b3b14bc
b521010
f07afbc
7ea6441
bb4dfa4
b521010
7ea6441
a7e6912
 
 
a3fb976
 
7ea6441
 
ff45f7a
eefd283
 
 
 
 
7ea6441
 
 
 
 
 
eefd283
c79c04b
bbdfd12
 
 
9a6b3d7
b54d196
 
 
 
 
 
d27849f
89c5a7b
9a6b3d7
7ea6441
9a6b3d7
 
f07afbc
b521010
 
d68b4ef
b521010
7ea6441
 
 
b521010
 
7ea6441
 
 
 
 
 
 
 
 
 
c139ab2
 
7ea6441
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
42998d3
 
b521010
42998d3
b521010
 
7ea6441
 
 
 
b521010
 
 
 
 
 
 
f07afbc
b521010
7ea6441
 
 
b521010
 
7ea6441
b521010
7ea6441
 
 
 
 
66af513
7ea6441
 
b521010
a7e6912
7ea6441
 
 
 
460fa35
b521010
42998d3
460fa35
b521010
 
 
 
 
42998d3
460fa35
7ea6441
 
b521010
7ea6441
 
 
b521010
 
 
7ea6441
 
b521010
7ea6441
b521010
7ea6441
b521010
 
 
7ea6441
b38d3ec
b521010
 
 
 
7ea6441
 
 
 
 
 
b521010
 
 
7ea6441
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# FILE: api/ltx/ltx_aduc_manager.py
# DESCRIPTION: A singleton manager for the LTX-Video pipeline.
# This module loads the pipeline, places it on the correct devices, and applies a
# targeted runtime monkey patch to delegate conditioning tasks to the specialized
# VaeAducPipeline service, enabling full control for the ADUC-SDR architecture.

import time
import yaml
from pathlib import Path
from typing import List, Optional, Tuple, Union, Dict
import threading
import sys
import torch
from dataclasses import dataclass

# --- Importações da arquitetura ADUC-SDR ---
from managers.gpu_manager import gpu_manager
from api.ltx.ltx_utils import build_ltx_pipeline_on_cpu
from utils.debug_utils import log_function_io
# Importa o serviço VAE que fará o trabalho real

# --- Importações da biblioteca LTX-Video ---
LTX_VIDEO_REPO_DIR = Path("/data/LTX-Video")
repo_path = str(LTX_VIDEO_REPO_DIR.resolve())
if repo_path not in sys.path:
   sys.path.insert(0, repo_path)

from ltx_video.pipelines.pipeline_ltx_video import LTXVideoPipeline
# Importa o tipo original de conditioning item para type hinting
from ltx_video.pipelines.pipeline_ltx_video import ConditioningItem as PipelineConditioningItem
import logging

import warnings
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", message=".*")

try:
    from huggingface_hub import logging as hf_logging
    hf_logging.set_verbosity_error()
except ImportError:
    pass

logger = logging.getLogger("AducDebug")
logging.basicConfig(level=logging.DEBUG)
logger.setLevel(logging.DEBUG)


@dataclass
class LatentConditioningItem:
    latent_tensor: torch.Tensor
    media_frame_number: int
    conditioning_strength: float
    
from ltx_video.pipelines.pipeline_ltx_video import ConditioningItem
   
# ==============================================================================
# --- O MONKEY PATCH DIRECIONADO E SIMPLES ---
# ==============================================================================

@log_function_io
def _aduc_prepare_conditioning_patch(
    self: "LTXVideoPipeline",
    conditioning_items: Optional[List[Union[ConditioningItem, LatentConditioningItem]]],
    init_latents: torch.Tensor,
    num_frames: int,
    height: int,
    width: int,
    vae_per_channel_normalize: bool = False,
    generator=None,
) -> Tuple[torch.Tensor, torch.Tensor, Optional[torch.Tensor], int]:
    """
    [PATCH] Substitui o método `prepare_conditioning` original da LTXVideoPipeline.

    Esta função atua como um proxy (intermediário). Ela não contém lógica de processamento.
    Em vez disso, ela delega 100% do trabalho para o `vae_aduc_pipeline`, que é o nosso
    serviço especializado e otimizado para essa tarefa.
    """
    logging.debug(f"Patch ADUC: Interceptado 'prepare_conditioning'. Delegando para o serviço VaeAducPipeline.")

    from api.ltx.vae_aduc_pipeline import vae_aduc_pipeline
   
    # 1. Chama o serviço especializado para fazer todo o trabalho pesado.
    #    O serviço VAE processa na sua própria GPU dedicada e retorna os tensores na CPU.
    latents_cpu, coords_cpu, mask_cpu, num_latents = vae_aduc_pipeline.prepare_conditioning(
        conditioning_items=conditioning_items,
        init_latents=init_latents,
        num_frames=num_frames,
        height=height,
        width=width,
        vae_per_channel_normalize=vae_per_channel_normalize,
        generator=generator,
    )

    # 2. Move os resultados da CPU para o dispositivo correto que a pipeline principal espera.
    #    O `init_latents.device` garante que estamos usando o dispositivo principal da pipeline (ex: 'cuda:0').
    device = init_latents.device
    latents = latents_cpu.to(device)
    pixel_coords = coords_cpu.to(device)
    conditioning_mask = mask_cpu.to(device) if mask_cpu is not None else None

    # 3. Retorna os tensores prontos. A pipeline principal continua sua execução normalmente,
    #    sem saber que a lógica de condicionamento foi executada por um serviço externo.
    return latents, pixel_coords, conditioning_mask, num_latents

# ==============================================================================
# --- LTX WORKER E POOL MANAGER ---
# ==============================================================================

class LTXWorker:
    """
    Gerencia uma instância única da LTXVideoPipeline, aplicando o patch
    necessário durante a inicialização.
    """
    def __init__(self, main_device_str: str, vae_device_str: str, config: dict):
        self.main_device = torch.device(main_device_str)
        self.vae_device = torch.device(vae_device_str)
        self.config = config
        self.pipeline: LTXVideoPipeline = None
        self._load_and_patch_pipeline()

    @log_function_io
    def _load_and_patch_pipeline(self):
        """
        Orquestra o carregamento da pipeline e a aplicação do monkey patch.
        """
        logging.info(f"[LTXWorker-{self.main_device}] Carregando pipeline LTX para a CPU...")
        self.pipeline, _ = build_ltx_pipeline_on_cpu(self.config)
        
        logging.info(f"[LTXWorker-{self.main_device}] Movendo pipeline para GPUs (Main: {self.main_device}, VAE: {self.vae_device})...")
        self.pipeline.to(self.main_device)  # Move a maioria dos componentes
        self.pipeline.vae.to(self.vae_device) # Move o VAE para sua GPU dedicada
        
        logging.info(f"[LTXWorker-{self.main_device}] Aplicando patch ADUC-SDR em 'prepare_conditioning'...")
        # A "mágica" simples e eficaz acontece aqui:
        self.pipeline.prepare_conditioning = _aduc_prepare_conditioning_patch.__get__(self.pipeline, LTXVideoPipeline)
        
        logging.info(f"[LTXWorker-{self.main_device}] ✅ Pipeline 'quente', corrigida e pronta para uso.")

class LtxAducManager:
    """
    Implementa o padrão Singleton para garantir que a pipeline LTX seja
    carregada e corrigida apenas uma vez durante a vida útil da aplicação.
    """
    _instance = None
    _lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
                cls._instance._initialized = False
            return cls._instance

    def __init__(self):
        if hasattr(self, '_initialized') and self._initialized:
            return
        with self._lock:
            if hasattr(self, '_initialized') and self._initialized:
                return
            logging.info("⚙️ Inicializando LtxAducManager Singleton...")
            self.config = self._load_config()
            main_device_str = str(gpu_manager.get_ltx_device())
            vae_device_str = str(gpu_manager.get_ltx_vae_device())
            
            # Cria o worker que irá carregar e patchear a pipeline
            self.worker = LTXWorker(main_device_str, vae_device_str, self.config)
            
            self._initialized = True
            logging.info("✅ LtxAducManager pronto.")

    def _load_config(self) -> Dict:
        """Carrega a configuração YAML principal do LTX."""
        # TODO: Considerar mover o path da configuração para uma variável de ambiente ou config central
        config_path = Path("/data/LTX-Video/configs/ltxv-13b-0.9.8-dev-fp8.yaml")
        with open(config_path, "r") as file:
            return yaml.safe_load(file)

    def get_pipeline(self) -> LTXVideoPipeline:
        """
        Ponto de acesso principal para obter a instância da pipeline.

        Returns:
            LTXVideoPipeline: A instância única, carregada e já corrigida.
        """
        return self.worker.pipeline

# --- Instância Singleton Global ---
# Outras partes do código importarão esta instância para interagir com a pipeline.
ltx_aduc_manager = LtxAducManager()