Spaces:
Paused
Paused
File size: 10,284 Bytes
1b65cbb c8b13b1 2a81b5b c8b13b1 1b65cbb c8b13b1 1b65cbb c8b13b1 1b65cbb |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 |
# FILE: api/vince_aduc_manager.py
# DESCRIPTION: Singleton manager for a pool of VINCIE workers, integrated with a central GPU manager.
import os
import sys
import gc
import subprocess
import threading
from pathlib import Path
from typing import List
import torch
from omegaconf import open_dict
# --- Import do Gerenciador Central de GPUs ---
# Esta é a peça chave da integração. O Pool Manager perguntará a ele quais GPUs usar.
try:
from managers.gpu_manager import gpu_manager
except ImportError as e:
print(f"ERRO CRÍTICO: Não foi possível importar o gpu_manager. {e}", file=sys.stderr)
sys.exit(1)
# --- Configurações Globais (Lidas do Ambiente) ---
VINCIE_DIR = Path(os.getenv("VINCIE_DIR", "/data/VINCIE"))
VINCIE_CKPT_DIR = Path(os.getenv("VINCIE_CKPT_DIR", "/data/ckpt/VINCIE-3B"))
# --- Classe Worker (Gerencia uma única GPU de forma isolada) ---
class VinceWorker:
"""
Gerencia uma única instância da pipeline VINCIE em um dispositivo GPU específico.
Opera em um ambiente "isolado" para garantir que só veja sua própria GPU.
"""
def __init__(self, device_id: str, config_path: str):
self.device_id_str = device_id
self.gpu_index_str = self.device_id_str.split(':')[-1]
self.config_path = config_path
self.gen = None
self.config = None
print(f"[VinceWorker-{self.device_id_str}] Inicializado. Mapeado para o índice de GPU físico {self.gpu_index_str}.")
def _execute_in_isolated_env(self, function_to_run, *args, **kwargs):
"""
Wrapper crucial que define CUDA_VISIBLE_DEVICES para isolar a visibilidade da GPU.
Isso garante que o PyTorch e o VINCIE só possam usar a GPU designada para este worker.
"""
original_cuda_visible = os.environ.get('CUDA_VISIBLE_DEVICES')
try:
os.environ['CUDA_VISIBLE_DEVICES'] = self.gpu_index_str
if torch.cuda.is_available():
# Dentro deste contexto, 'cuda:0' refere-se à nossa GPU alvo, pois é a única visível.
torch.cuda.set_device(0)
return function_to_run(*args, **kwargs)
finally:
# Restaura o ambiente original para não afetar outros threads/processos.
if original_cuda_visible is not None:
os.environ['CUDA_VISIBLE_DEVICES'] = original_cuda_visible
elif 'CUDA_VISIBLE_DEVICES' in os.environ:
del os.environ['CUDA_VISIBLE_DEVICES']
def _load_model_task(self):
"""Tarefa de carregamento do modelo, executada no ambiente isolado."""
print(f"[VinceWorker-{self.device_id_str}] Carregando modelo para VRAM (GPU física visível: {self.gpu_index_str})...")
# O dispositivo para o VINCIE será 'cuda:0' porque é a única GPU que este processo pode ver.
device_for_vincie = 'cuda:0' if torch.cuda.is_available() else 'cpu'
original_cwd = Path.cwd()
try:
# O código do VINCIE pode precisar ser executado de seu próprio diretório.
os.chdir(str(VINCIE_DIR))
# Adiciona o diretório ao path do sistema para encontrar os módulos do VINCIE.
if str(VINCIE_DIR) not in sys.path: sys.path.insert(0, str(VINCIE_DIR))
from common.config import load_config, create_object
cfg = load_config(self.config_path, [f"device='{device_for_vincie}'"])
self.gen = create_object(cfg)
self.config = cfg
# Executa os passos de configuração internos do VINCIE.
for name in ("configure_persistence", "configure_models", "configure_diffusion"):
getattr(self.gen, name)()
self.gen.to(torch.device(device_for_vincie))
print(f"[VinceWorker-{self.device_id_str}] ✅ Modelo VINCIE 'quente' e pronto na GPU física {self.gpu_index_str}.")
finally:
os.chdir(original_cwd) # Restaura o diretório de trabalho original.
def load_model_to_gpu(self):
"""Método público para carregar o modelo, garantindo o isolamento da GPU."""
if self.gen is None:
self._execute_in_isolated_env(self._load_model_task)
def _infer_task(self, **kwargs) -> Path:
"""Tarefa de inferência, executada no ambiente isolado."""
original_cwd = Path.cwd()
try:
os.chdir(str(VINCIE_DIR))
# Atualiza a configuração do gerador com os parâmetros da chamada atual.
with open_dict(self.gen.config):
self.gen.config.generation.output.dir = str(kwargs["output_dir"])
image_paths = kwargs.get("image_path", [])
self.gen.config.generation.positive_prompt.image_path = [str(p) for p in image_paths] if isinstance(image_paths, list) else [str(image_paths)]
if "prompts" in kwargs:
self.gen.config.generation.positive_prompt.prompts = list(kwargs["prompts"])
if "cfg_scale" in kwargs and kwargs["cfg_scale"] is not None:
self.gen.config.diffusion.cfg.scale = float(kwargs["cfg_scale"])
# Inicia o loop de inferência do VINCIE.
self.gen.inference_loop()
return Path(kwargs["output_dir"])
finally:
os.chdir(original_cwd)
# Limpeza de memória após a inferência.
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
def infer(self, **kwargs) -> Path:
"""Método público para iniciar a inferência, garantindo o isolamento da GPU."""
if self.gen is None:
raise RuntimeError(f"Modelo no worker {self.device_id_str} não foi carregado.")
return self._execute_in_isolated_env(self._infer_task, **kwargs)
# --- Classe Pool Manager (A Orquestradora Singleton) ---
class VinceAducManager:
_instance = None
_lock = threading.Lock()
def __new__(cls, *args, **kwargs):
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self, output_root: str = "/app/outputs"):
if self._initialized: return
with self._lock:
if self._initialized: return
print("⚙️ Inicializando o VincePoolManager Singleton...")
self.output_root = Path(output_root)
self.output_root.mkdir(parents=True, exist_ok=True)
self.worker_lock = threading.Lock()
self.next_worker_idx = 0
# Pergunta ao gerenciador central quais GPUs ele pode usar.
self.allocated_gpu_indices = gpu_manager.get_vincie_devices()
if not self.allocated_gpu_indices:
# Se não houver GPUs alocadas, não podemos continuar.
# O setup.py já deve ter sido executado, então não precisamos verificar dependências aqui.
print("AVISO: Nenhuma GPU alocada para o VINCIE pelo GPUManager. O serviço VINCIE estará inativo.")
self.workers = []
self._initialized = True
return
devices = [f'cuda:{i}' for i in self.allocated_gpu_indices]
vincie_config_path = VINCIE_DIR / "configs/generate.yaml"
if not vincie_config_path.exists():
raise FileNotFoundError(f"Arquivo de configuração do VINCIE não encontrado em {vincie_config_path}")
self.workers = [VinceWorker(dev_id, str(vincie_config_path)) for dev_id in devices]
print(f"Iniciando carregamento dos modelos em paralelo para {len(self.workers)} GPUs VINCIE...")
threads = [threading.Thread(target=worker.load_model_to_gpu) for worker in self.workers]
for t in threads: t.start()
for t in threads: t.join()
self._initialized = True
print(f"✅ VincePoolManager pronto com {len(self.workers)} workers 'quentes'.")
def _get_next_worker(self) -> VinceWorker:
"""Seleciona o próximo worker disponível usando uma estratégia round-robin."""
if not self.workers:
raise RuntimeError("Não há workers VINCIE disponíveis para processar a tarefa.")
with self.worker_lock:
worker = self.workers[self.next_worker_idx]
self.next_worker_idx = (self.next_worker_idx + 1) % len(self.workers)
print(f"Tarefa despachada para o worker: {worker.device_id_str}")
return worker
def generate_multi_turn(self, input_image: str, turns: List[str], **kwargs) -> Path:
"""Gera um vídeo a partir de uma imagem e uma sequência de prompts (turnos)."""
worker = self._get_next_worker()
out_dir = self.output_root / f"vince_multi_turn_{Path(input_image).stem}_{os.urandom(4).hex()}"
out_dir.mkdir(parents=True)
infer_kwargs = {"output_dir": out_dir, "image_path": input_image, "prompts": turns, **kwargs}
return worker.infer(**infer_kwargs)
def generate_multi_concept(self, concept_images: List[str], concept_prompts: List[str], final_prompt: str, **kwargs) -> Path:
"""Gera um vídeo a partir de múltiplas imagens-conceito e um prompt final."""
worker = self._get_next_worker()
out_dir = self.output_root / f"vince_multi_concept_{os.urandom(4).hex()}"
out_dir.mkdir(parents=True)
all_prompts = concept_prompts + [final_prompt]
infer_kwargs = {"output_dir": out_dir, "image_path": concept_images, "prompts": all_prompts, **kwargs}
return worker.infer(**infer_kwargs)
# --- Instância Singleton Global ---
# A inicialização é envolvida em um try-except para evitar que a aplicação inteira quebre
# se o VINCIE não puder ser inicializado por algum motivo.
try:
output_root_path = os.getenv("OUTPUT_ROOT", "/app/outputs")
vince_aduc_manager_singleton = VinceAducManager(output_root=output_root_path)
except Exception as e:
print(f"ERRO CRÍTICO ao inicializar o VincePoolManager: {e}", file=sys.stderr)
traceback.print_exc()
vince_aduc_manager_singleton = None |