eeuuia commited on
Commit
c8b13b1
·
verified ·
1 Parent(s): 3b5ff37

Create vince_pool_manager.py

Browse files
Files changed (1) hide show
  1. api/vince_pool_manager.py +214 -0
api/vince_pool_manager.py ADDED
@@ -0,0 +1,214 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # FILE: api/vince_pool_manager.py
2
+ # DESCRIPTION: Singleton manager for a pool of VINCIE workers, integrated with a central GPU manager.
3
+
4
+ import os
5
+ import sys
6
+ import gc
7
+ import subprocess
8
+ import threading
9
+ from pathlib import Path
10
+ from typing import List
11
+
12
+ import torch
13
+ from omegaconf import open_dict
14
+
15
+ # --- Import do Gerenciador Central de GPUs ---
16
+ # Esta é a peça chave da integração. O Pool Manager perguntará a ele quais GPUs usar.
17
+ try:
18
+ from api.gpu_manager import gpu_manager
19
+ except ImportError as e:
20
+ print(f"ERRO CRÍTICO: Não foi possível importar o gpu_manager. {e}", file=sys.stderr)
21
+ sys.exit(1)
22
+
23
+ # --- Configurações Globais (Lidas do Ambiente) ---
24
+ VINCIE_DIR = Path(os.getenv("VINCIE_DIR", "/data/VINCIE"))
25
+ VINCIE_CKPT_DIR = Path(os.getenv("VINCIE_CKPT_DIR", "/data/ckpt/VINCIE-3B"))
26
+
27
+ # --- Classe Worker (Gerencia uma única GPU de forma isolada) ---
28
+ class VinceWorker:
29
+ """
30
+ Gerencia uma única instância da pipeline VINCIE em um dispositivo GPU específico.
31
+ Opera em um ambiente "isolado" para garantir que só veja sua própria GPU.
32
+ """
33
+ def __init__(self, device_id: str, config_path: str):
34
+ self.device_id_str = device_id
35
+ self.gpu_index_str = self.device_id_str.split(':')[-1]
36
+ self.config_path = config_path
37
+ self.gen = None
38
+ self.config = None
39
+ print(f"[VinceWorker-{self.device_id_str}] Inicializado. Mapeado para o índice de GPU físico {self.gpu_index_str}.")
40
+
41
+ def _execute_in_isolated_env(self, function_to_run, *args, **kwargs):
42
+ """
43
+ Wrapper crucial que define CUDA_VISIBLE_DEVICES para isolar a visibilidade da GPU.
44
+ Isso garante que o PyTorch e o VINCIE só possam usar a GPU designada para este worker.
45
+ """
46
+ original_cuda_visible = os.environ.get('CUDA_VISIBLE_DEVICES')
47
+ try:
48
+ os.environ['CUDA_VISIBLE_DEVICES'] = self.gpu_index_str
49
+ if torch.cuda.is_available():
50
+ # Dentro deste contexto, 'cuda:0' refere-se à nossa GPU alvo, pois é a única visível.
51
+ torch.cuda.set_device(0)
52
+ return function_to_run(*args, **kwargs)
53
+ finally:
54
+ # Restaura o ambiente original para não afetar outros threads/processos.
55
+ if original_cuda_visible is not None:
56
+ os.environ['CUDA_VISIBLE_DEVICES'] = original_cuda_visible
57
+ elif 'CUDA_VISIBLE_DEVICES' in os.environ:
58
+ del os.environ['CUDA_VISIBLE_DEVICES']
59
+
60
+ def _load_model_task(self):
61
+ """Tarefa de carregamento do modelo, executada no ambiente isolado."""
62
+ print(f"[VinceWorker-{self.device_id_str}] Carregando modelo para VRAM (GPU física visível: {self.gpu_index_str})...")
63
+ # O dispositivo para o VINCIE será 'cuda:0' porque é a única GPU que este processo pode ver.
64
+ device_for_vincie = 'cuda:0' if torch.cuda.is_available() else 'cpu'
65
+
66
+ original_cwd = Path.cwd()
67
+ try:
68
+ # O código do VINCIE pode precisar ser executado de seu próprio diretório.
69
+ os.chdir(str(VINCIE_DIR))
70
+ # Adiciona o diretório ao path do sistema para encontrar os módulos do VINCIE.
71
+ if str(VINCIE_DIR) not in sys.path: sys.path.insert(0, str(VINCIE_DIR))
72
+
73
+ from common.config import load_config, create_object
74
+
75
+ cfg = load_config(self.config_path, [f"device='{device_for_vincie}'"])
76
+ self.gen = create_object(cfg)
77
+ self.config = cfg
78
+
79
+ # Executa os passos de configuração internos do VINCIE.
80
+ for name in ("configure_persistence", "configure_models", "configure_diffusion"):
81
+ getattr(self.gen, name)()
82
+
83
+ self.gen.to(torch.device(device_for_vincie))
84
+ print(f"[VinceWorker-{self.device_id_str}] ✅ Modelo VINCIE 'quente' e pronto na GPU física {self.gpu_index_str}.")
85
+ finally:
86
+ os.chdir(original_cwd) # Restaura o diretório de trabalho original.
87
+
88
+ def load_model_to_gpu(self):
89
+ """Método público para carregar o modelo, garantindo o isolamento da GPU."""
90
+ if self.gen is None:
91
+ self._execute_in_isolated_env(self._load_model_task)
92
+
93
+ def _infer_task(self, **kwargs) -> Path:
94
+ """Tarefa de inferência, executada no ambiente isolado."""
95
+ original_cwd = Path.cwd()
96
+ try:
97
+ os.chdir(str(VINCIE_DIR))
98
+
99
+ # Atualiza a configuração do gerador com os parâmetros da chamada atual.
100
+ with open_dict(self.gen.config):
101
+ self.gen.config.generation.output.dir = str(kwargs["output_dir"])
102
+ image_paths = kwargs.get("image_path", [])
103
+ self.gen.config.generation.positive_prompt.image_path = [str(p) for p in image_paths] if isinstance(image_paths, list) else [str(image_paths)]
104
+ if "prompts" in kwargs:
105
+ self.gen.config.generation.positive_prompt.prompts = list(kwargs["prompts"])
106
+ if "cfg_scale" in kwargs and kwargs["cfg_scale"] is not None:
107
+ self.gen.config.diffusion.cfg.scale = float(kwargs["cfg_scale"])
108
+
109
+ # Inicia o loop de inferência do VINCIE.
110
+ self.gen.inference_loop()
111
+ return Path(kwargs["output_dir"])
112
+ finally:
113
+ os.chdir(original_cwd)
114
+ # Limpeza de memória após a inferência.
115
+ gc.collect()
116
+ if torch.cuda.is_available():
117
+ torch.cuda.empty_cache()
118
+
119
+ def infer(self, **kwargs) -> Path:
120
+ """Método público para iniciar a inferência, garantindo o isolamento da GPU."""
121
+ if self.gen is None:
122
+ raise RuntimeError(f"Modelo no worker {self.device_id_str} não foi carregado.")
123
+ return self._execute_in_isolated_env(self._infer_task, **kwargs)
124
+
125
+
126
+ # --- Classe Pool Manager (A Orquestradora Singleton) ---
127
+ class VincePoolManager:
128
+ _instance = None
129
+ _lock = threading.Lock()
130
+
131
+ def __new__(cls, *args, **kwargs):
132
+ with cls._lock:
133
+ if cls._instance is None:
134
+ cls._instance = super().__new__(cls)
135
+ cls._instance._initialized = False
136
+ return cls._instance
137
+
138
+ def __init__(self, output_root: str = "/app/outputs"):
139
+ if self._initialized: return
140
+ with self._lock:
141
+ if self._initialized: return
142
+
143
+ print("⚙️ Inicializando o VincePoolManager Singleton...")
144
+ self.output_root = Path(output_root)
145
+ self.output_root.mkdir(parents=True, exist_ok=True)
146
+ self.worker_lock = threading.Lock()
147
+ self.next_worker_idx = 0
148
+
149
+ # Pergunta ao gerenciador central quais GPUs ele pode usar.
150
+ self.allocated_gpu_indices = gpu_manager.get_vincie_devices()
151
+
152
+ if not self.allocated_gpu_indices:
153
+ # Se não houver GPUs alocadas, não podemos continuar.
154
+ # O setup.py já deve ter sido executado, então não precisamos verificar dependências aqui.
155
+ print("AVISO: Nenhuma GPU alocada para o VINCIE pelo GPUManager. O serviço VINCIE estará inativo.")
156
+ self.workers = []
157
+ self._initialized = True
158
+ return
159
+
160
+ devices = [f'cuda:{i}' for i in self.allocated_gpu_indices]
161
+ vincie_config_path = VINCIE_DIR / "configs/generate.yaml"
162
+ if not vincie_config_path.exists():
163
+ raise FileNotFoundError(f"Arquivo de configuração do VINCIE não encontrado em {vincie_config_path}")
164
+
165
+ self.workers = [VinceWorker(dev_id, str(vincie_config_path)) for dev_id in devices]
166
+
167
+ print(f"Iniciando carregamento dos modelos em paralelo para {len(self.workers)} GPUs VINCIE...")
168
+ threads = [threading.Thread(target=worker.load_model_to_gpu) for worker in self.workers]
169
+ for t in threads: t.start()
170
+ for t in threads: t.join()
171
+
172
+ self._initialized = True
173
+ print(f"✅ VincePoolManager pronto com {len(self.workers)} workers 'quentes'.")
174
+
175
+ def _get_next_worker(self) -> VinceWorker:
176
+ """Seleciona o próximo worker disponível usando uma estratégia round-robin."""
177
+ if not self.workers:
178
+ raise RuntimeError("Não há workers VINCIE disponíveis para processar a tarefa.")
179
+
180
+ with self.worker_lock:
181
+ worker = self.workers[self.next_worker_idx]
182
+ self.next_worker_idx = (self.next_worker_idx + 1) % len(self.workers)
183
+ print(f"Tarefa despachada para o worker: {worker.device_id_str}")
184
+ return worker
185
+
186
+ def generate_multi_turn(self, input_image: str, turns: List[str], **kwargs) -> Path:
187
+ """Gera um vídeo a partir de uma imagem e uma sequência de prompts (turnos)."""
188
+ worker = self._get_next_worker()
189
+ out_dir = self.output_root / f"vince_multi_turn_{Path(input_image).stem}_{os.urandom(4).hex()}"
190
+ out_dir.mkdir(parents=True)
191
+
192
+ infer_kwargs = {"output_dir": out_dir, "image_path": input_image, "prompts": turns, **kwargs}
193
+ return worker.infer(**infer_kwargs)
194
+
195
+ def generate_multi_concept(self, concept_images: List[str], concept_prompts: List[str], final_prompt: str, **kwargs) -> Path:
196
+ """Gera um vídeo a partir de múltiplas imagens-conceito e um prompt final."""
197
+ worker = self._get_next_worker()
198
+ out_dir = self.output_root / f"vince_multi_concept_{os.urandom(4).hex()}"
199
+ out_dir.mkdir(parents=True)
200
+
201
+ all_prompts = concept_prompts + [final_prompt]
202
+ infer_kwargs = {"output_dir": out_dir, "image_path": concept_images, "prompts": all_prompts, **kwargs}
203
+ return worker.infer(**infer_kwargs)
204
+
205
+ # --- Instância Singleton Global ---
206
+ # A inicialização é envolvida em um try-except para evitar que a aplicação inteira quebre
207
+ # se o VINCIE não puder ser inicializado por algum motivo.
208
+ try:
209
+ output_root_path = os.getenv("OUTPUT_ROOT", "/app/outputs")
210
+ vince_pool_manager_singleton = VincePoolManager(output_root=output_root_path)
211
+ except Exception as e:
212
+ print(f"ERRO CRÍTICO ao inicializar o VincePoolManager: {e}", file=sys.stderr)
213
+ traceback.print_exc()
214
+ vince_pool_manager_singleton = None