File size: 10,284 Bytes
1b65cbb
c8b13b1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2a81b5b
c8b13b1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1b65cbb
c8b13b1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1b65cbb
c8b13b1
 
 
1b65cbb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
# FILE: api/vince_aduc_manager.py
# DESCRIPTION: Singleton manager for a pool of VINCIE workers, integrated with a central GPU manager.

import os
import sys
import gc
import subprocess
import threading
from pathlib import Path
from typing import List

import torch
from omegaconf import open_dict

# --- Import do Gerenciador Central de GPUs ---
# Esta é a peça chave da integração. O Pool Manager perguntará a ele quais GPUs usar.
try:
    from managers.gpu_manager import gpu_manager
except ImportError as e:
    print(f"ERRO CRÍTICO: Não foi possível importar o gpu_manager. {e}", file=sys.stderr)
    sys.exit(1)

# --- Configurações Globais (Lidas do Ambiente) ---
VINCIE_DIR = Path(os.getenv("VINCIE_DIR", "/data/VINCIE"))
VINCIE_CKPT_DIR = Path(os.getenv("VINCIE_CKPT_DIR", "/data/ckpt/VINCIE-3B"))

# --- Classe Worker (Gerencia uma única GPU de forma isolada) ---
class VinceWorker:
    """
    Gerencia uma única instância da pipeline VINCIE em um dispositivo GPU específico.
    Opera em um ambiente "isolado" para garantir que só veja sua própria GPU.
    """
    def __init__(self, device_id: str, config_path: str):
        self.device_id_str = device_id
        self.gpu_index_str = self.device_id_str.split(':')[-1]
        self.config_path = config_path
        self.gen = None
        self.config = None
        print(f"[VinceWorker-{self.device_id_str}] Inicializado. Mapeado para o índice de GPU físico {self.gpu_index_str}.")

    def _execute_in_isolated_env(self, function_to_run, *args, **kwargs):
        """
        Wrapper crucial que define CUDA_VISIBLE_DEVICES para isolar a visibilidade da GPU.
        Isso garante que o PyTorch e o VINCIE só possam usar a GPU designada para este worker.
        """
        original_cuda_visible = os.environ.get('CUDA_VISIBLE_DEVICES')
        try:
            os.environ['CUDA_VISIBLE_DEVICES'] = self.gpu_index_str
            if torch.cuda.is_available():
                # Dentro deste contexto, 'cuda:0' refere-se à nossa GPU alvo, pois é a única visível.
                torch.cuda.set_device(0)
            return function_to_run(*args, **kwargs)
        finally:
            # Restaura o ambiente original para não afetar outros threads/processos.
            if original_cuda_visible is not None:
                os.environ['CUDA_VISIBLE_DEVICES'] = original_cuda_visible
            elif 'CUDA_VISIBLE_DEVICES' in os.environ:
                del os.environ['CUDA_VISIBLE_DEVICES']

    def _load_model_task(self):
        """Tarefa de carregamento do modelo, executada no ambiente isolado."""
        print(f"[VinceWorker-{self.device_id_str}] Carregando modelo para VRAM (GPU física visível: {self.gpu_index_str})...")
        # O dispositivo para o VINCIE será 'cuda:0' porque é a única GPU que este processo pode ver.
        device_for_vincie = 'cuda:0' if torch.cuda.is_available() else 'cpu'
        
        original_cwd = Path.cwd()
        try:
            # O código do VINCIE pode precisar ser executado de seu próprio diretório.
            os.chdir(str(VINCIE_DIR))
            # Adiciona o diretório ao path do sistema para encontrar os módulos do VINCIE.
            if str(VINCIE_DIR) not in sys.path: sys.path.insert(0, str(VINCIE_DIR))

            from common.config import load_config, create_object
            
            cfg = load_config(self.config_path, [f"device='{device_for_vincie}'"])
            self.gen = create_object(cfg)
            self.config = cfg

            # Executa os passos de configuração internos do VINCIE.
            for name in ("configure_persistence", "configure_models", "configure_diffusion"):
                getattr(self.gen, name)()
            
            self.gen.to(torch.device(device_for_vincie))
            print(f"[VinceWorker-{self.device_id_str}] ✅ Modelo VINCIE 'quente' e pronto na GPU física {self.gpu_index_str}.")
        finally:
            os.chdir(original_cwd) # Restaura o diretório de trabalho original.
    
    def load_model_to_gpu(self):
        """Método público para carregar o modelo, garantindo o isolamento da GPU."""
        if self.gen is None:
            self._execute_in_isolated_env(self._load_model_task)

    def _infer_task(self, **kwargs) -> Path:
        """Tarefa de inferência, executada no ambiente isolado."""
        original_cwd = Path.cwd()
        try:
            os.chdir(str(VINCIE_DIR))

            # Atualiza a configuração do gerador com os parâmetros da chamada atual.
            with open_dict(self.gen.config):
                self.gen.config.generation.output.dir = str(kwargs["output_dir"])
                image_paths = kwargs.get("image_path", [])
                self.gen.config.generation.positive_prompt.image_path = [str(p) for p in image_paths] if isinstance(image_paths, list) else [str(image_paths)]
                if "prompts" in kwargs:
                    self.gen.config.generation.positive_prompt.prompts = list(kwargs["prompts"])
                if "cfg_scale" in kwargs and kwargs["cfg_scale"] is not None:
                    self.gen.config.diffusion.cfg.scale = float(kwargs["cfg_scale"])
            
            # Inicia o loop de inferência do VINCIE.
            self.gen.inference_loop()
            return Path(kwargs["output_dir"])
        finally:
            os.chdir(original_cwd)
            # Limpeza de memória após a inferência.
            gc.collect()
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

    def infer(self, **kwargs) -> Path:
        """Método público para iniciar a inferência, garantindo o isolamento da GPU."""
        if self.gen is None:
            raise RuntimeError(f"Modelo no worker {self.device_id_str} não foi carregado.")
        return self._execute_in_isolated_env(self._infer_task, **kwargs)


# --- Classe Pool Manager (A Orquestradora Singleton) ---
class VinceAducManager:
    _instance = None
    _lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
                cls._instance._initialized = False
            return cls._instance

    def __init__(self, output_root: str = "/app/outputs"):
        if self._initialized: return
        with self._lock:
            if self._initialized: return

            print("⚙️ Inicializando o VincePoolManager Singleton...")
            self.output_root = Path(output_root)
            self.output_root.mkdir(parents=True, exist_ok=True)
            self.worker_lock = threading.Lock()
            self.next_worker_idx = 0

            # Pergunta ao gerenciador central quais GPUs ele pode usar.
            self.allocated_gpu_indices = gpu_manager.get_vincie_devices()
            
            if not self.allocated_gpu_indices:
                # Se não houver GPUs alocadas, não podemos continuar.
                # O setup.py já deve ter sido executado, então não precisamos verificar dependências aqui.
                print("AVISO: Nenhuma GPU alocada para o VINCIE pelo GPUManager. O serviço VINCIE estará inativo.")
                self.workers = []
                self._initialized = True
                return

            devices = [f'cuda:{i}' for i in self.allocated_gpu_indices]
            vincie_config_path = VINCIE_DIR / "configs/generate.yaml"
            if not vincie_config_path.exists():
                raise FileNotFoundError(f"Arquivo de configuração do VINCIE não encontrado em {vincie_config_path}")

            self.workers = [VinceWorker(dev_id, str(vincie_config_path)) for dev_id in devices]

            print(f"Iniciando carregamento dos modelos em paralelo para {len(self.workers)} GPUs VINCIE...")
            threads = [threading.Thread(target=worker.load_model_to_gpu) for worker in self.workers]
            for t in threads: t.start()
            for t in threads: t.join()
            
            self._initialized = True
            print(f"✅ VincePoolManager pronto com {len(self.workers)} workers 'quentes'.")

    def _get_next_worker(self) -> VinceWorker:
        """Seleciona o próximo worker disponível usando uma estratégia round-robin."""
        if not self.workers:
            raise RuntimeError("Não há workers VINCIE disponíveis para processar a tarefa.")
        
        with self.worker_lock:
            worker = self.workers[self.next_worker_idx]
            self.next_worker_idx = (self.next_worker_idx + 1) % len(self.workers)
            print(f"Tarefa despachada para o worker: {worker.device_id_str}")
            return worker

    def generate_multi_turn(self, input_image: str, turns: List[str], **kwargs) -> Path:
        """Gera um vídeo a partir de uma imagem e uma sequência de prompts (turnos)."""
        worker = self._get_next_worker()
        out_dir = self.output_root / f"vince_multi_turn_{Path(input_image).stem}_{os.urandom(4).hex()}"
        out_dir.mkdir(parents=True)
        
        infer_kwargs = {"output_dir": out_dir, "image_path": input_image, "prompts": turns, **kwargs}
        return worker.infer(**infer_kwargs)
    
    def generate_multi_concept(self, concept_images: List[str], concept_prompts: List[str], final_prompt: str, **kwargs) -> Path:
        """Gera um vídeo a partir de múltiplas imagens-conceito e um prompt final."""
        worker = self._get_next_worker()
        out_dir = self.output_root / f"vince_multi_concept_{os.urandom(4).hex()}"
        out_dir.mkdir(parents=True)
        
        all_prompts = concept_prompts + [final_prompt]
        infer_kwargs = {"output_dir": out_dir, "image_path": concept_images, "prompts": all_prompts, **kwargs}
        return worker.infer(**infer_kwargs)

# --- Instância Singleton Global ---
# A inicialização é envolvida em um try-except para evitar que a aplicação inteira quebre
# se o VINCIE não puder ser inicializado por algum motivo.
try:
    output_root_path = os.getenv("OUTPUT_ROOT", "/app/outputs")
    vince_aduc_manager_singleton = VinceAducManager(output_root=output_root_path)
except Exception as e:
    print(f"ERRO CRÍTICO ao inicializar o VincePoolManager: {e}", file=sys.stderr)
    traceback.print_exc()
    vince_aduc_manager_singleton = None