# FILE: api/vince_pool_manager.py # DESCRIPTION: Singleton manager for a pool of VINCIE workers, integrated with a central GPU manager. import os import sys import gc import subprocess import threading from pathlib import Path from typing import List import torch from omegaconf import open_dict # --- Import do Gerenciador Central de GPUs --- # Esta é a peça chave da integração. O Pool Manager perguntará a ele quais GPUs usar. try: from api.gpu_manager import gpu_manager except ImportError as e: print(f"ERRO CRÍTICO: Não foi possível importar o gpu_manager. {e}", file=sys.stderr) sys.exit(1) # --- Configurações Globais (Lidas do Ambiente) --- VINCIE_DIR = Path(os.getenv("VINCIE_DIR", "/data/VINCIE")) VINCIE_CKPT_DIR = Path(os.getenv("VINCIE_CKPT_DIR", "/data/ckpt/VINCIE-3B")) # --- Classe Worker (Gerencia uma única GPU de forma isolada) --- class VinceWorker: """ Gerencia uma única instância da pipeline VINCIE em um dispositivo GPU específico. Opera em um ambiente "isolado" para garantir que só veja sua própria GPU. """ def __init__(self, device_id: str, config_path: str): self.device_id_str = device_id self.gpu_index_str = self.device_id_str.split(':')[-1] self.config_path = config_path self.gen = None self.config = None print(f"[VinceWorker-{self.device_id_str}] Inicializado. Mapeado para o índice de GPU físico {self.gpu_index_str}.") def _execute_in_isolated_env(self, function_to_run, *args, **kwargs): """ Wrapper crucial que define CUDA_VISIBLE_DEVICES para isolar a visibilidade da GPU. Isso garante que o PyTorch e o VINCIE só possam usar a GPU designada para este worker. """ original_cuda_visible = os.environ.get('CUDA_VISIBLE_DEVICES') try: os.environ['CUDA_VISIBLE_DEVICES'] = self.gpu_index_str if torch.cuda.is_available(): # Dentro deste contexto, 'cuda:0' refere-se à nossa GPU alvo, pois é a única visível. torch.cuda.set_device(0) return function_to_run(*args, **kwargs) finally: # Restaura o ambiente original para não afetar outros threads/processos. if original_cuda_visible is not None: os.environ['CUDA_VISIBLE_DEVICES'] = original_cuda_visible elif 'CUDA_VISIBLE_DEVICES' in os.environ: del os.environ['CUDA_VISIBLE_DEVICES'] def _load_model_task(self): """Tarefa de carregamento do modelo, executada no ambiente isolado.""" print(f"[VinceWorker-{self.device_id_str}] Carregando modelo para VRAM (GPU física visível: {self.gpu_index_str})...") # O dispositivo para o VINCIE será 'cuda:0' porque é a única GPU que este processo pode ver. device_for_vincie = 'cuda:0' if torch.cuda.is_available() else 'cpu' original_cwd = Path.cwd() try: # O código do VINCIE pode precisar ser executado de seu próprio diretório. os.chdir(str(VINCIE_DIR)) # Adiciona o diretório ao path do sistema para encontrar os módulos do VINCIE. if str(VINCIE_DIR) not in sys.path: sys.path.insert(0, str(VINCIE_DIR)) from common.config import load_config, create_object cfg = load_config(self.config_path, [f"device='{device_for_vincie}'"]) self.gen = create_object(cfg) self.config = cfg # Executa os passos de configuração internos do VINCIE. for name in ("configure_persistence", "configure_models", "configure_diffusion"): getattr(self.gen, name)() self.gen.to(torch.device(device_for_vincie)) print(f"[VinceWorker-{self.device_id_str}] ✅ Modelo VINCIE 'quente' e pronto na GPU física {self.gpu_index_str}.") finally: os.chdir(original_cwd) # Restaura o diretório de trabalho original. def load_model_to_gpu(self): """Método público para carregar o modelo, garantindo o isolamento da GPU.""" if self.gen is None: self._execute_in_isolated_env(self._load_model_task) def _infer_task(self, **kwargs) -> Path: """Tarefa de inferência, executada no ambiente isolado.""" original_cwd = Path.cwd() try: os.chdir(str(VINCIE_DIR)) # Atualiza a configuração do gerador com os parâmetros da chamada atual. with open_dict(self.gen.config): self.gen.config.generation.output.dir = str(kwargs["output_dir"]) image_paths = kwargs.get("image_path", []) self.gen.config.generation.positive_prompt.image_path = [str(p) for p in image_paths] if isinstance(image_paths, list) else [str(image_paths)] if "prompts" in kwargs: self.gen.config.generation.positive_prompt.prompts = list(kwargs["prompts"]) if "cfg_scale" in kwargs and kwargs["cfg_scale"] is not None: self.gen.config.diffusion.cfg.scale = float(kwargs["cfg_scale"]) # Inicia o loop de inferência do VINCIE. self.gen.inference_loop() return Path(kwargs["output_dir"]) finally: os.chdir(original_cwd) # Limpeza de memória após a inferência. gc.collect() if torch.cuda.is_available(): torch.cuda.empty_cache() def infer(self, **kwargs) -> Path: """Método público para iniciar a inferência, garantindo o isolamento da GPU.""" if self.gen is None: raise RuntimeError(f"Modelo no worker {self.device_id_str} não foi carregado.") return self._execute_in_isolated_env(self._infer_task, **kwargs) # --- Classe Pool Manager (A Orquestradora Singleton) --- class VincePoolManager: _instance = None _lock = threading.Lock() def __new__(cls, *args, **kwargs): with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self, output_root: str = "/app/outputs"): if self._initialized: return with self._lock: if self._initialized: return print("⚙️ Inicializando o VincePoolManager Singleton...") self.output_root = Path(output_root) self.output_root.mkdir(parents=True, exist_ok=True) self.worker_lock = threading.Lock() self.next_worker_idx = 0 # Pergunta ao gerenciador central quais GPUs ele pode usar. self.allocated_gpu_indices = gpu_manager.get_vincie_devices() if not self.allocated_gpu_indices: # Se não houver GPUs alocadas, não podemos continuar. # O setup.py já deve ter sido executado, então não precisamos verificar dependências aqui. print("AVISO: Nenhuma GPU alocada para o VINCIE pelo GPUManager. O serviço VINCIE estará inativo.") self.workers = [] self._initialized = True return devices = [f'cuda:{i}' for i in self.allocated_gpu_indices] vincie_config_path = VINCIE_DIR / "configs/generate.yaml" if not vincie_config_path.exists(): raise FileNotFoundError(f"Arquivo de configuração do VINCIE não encontrado em {vincie_config_path}") self.workers = [VinceWorker(dev_id, str(vincie_config_path)) for dev_id in devices] print(f"Iniciando carregamento dos modelos em paralelo para {len(self.workers)} GPUs VINCIE...") threads = [threading.Thread(target=worker.load_model_to_gpu) for worker in self.workers] for t in threads: t.start() for t in threads: t.join() self._initialized = True print(f"✅ VincePoolManager pronto com {len(self.workers)} workers 'quentes'.") def _get_next_worker(self) -> VinceWorker: """Seleciona o próximo worker disponível usando uma estratégia round-robin.""" if not self.workers: raise RuntimeError("Não há workers VINCIE disponíveis para processar a tarefa.") with self.worker_lock: worker = self.workers[self.next_worker_idx] self.next_worker_idx = (self.next_worker_idx + 1) % len(self.workers) print(f"Tarefa despachada para o worker: {worker.device_id_str}") return worker def generate_multi_turn(self, input_image: str, turns: List[str], **kwargs) -> Path: """Gera um vídeo a partir de uma imagem e uma sequência de prompts (turnos).""" worker = self._get_next_worker() out_dir = self.output_root / f"vince_multi_turn_{Path(input_image).stem}_{os.urandom(4).hex()}" out_dir.mkdir(parents=True) infer_kwargs = {"output_dir": out_dir, "image_path": input_image, "prompts": turns, **kwargs} return worker.infer(**infer_kwargs) def generate_multi_concept(self, concept_images: List[str], concept_prompts: List[str], final_prompt: str, **kwargs) -> Path: """Gera um vídeo a partir de múltiplas imagens-conceito e um prompt final.""" worker = self._get_next_worker() out_dir = self.output_root / f"vince_multi_concept_{os.urandom(4).hex()}" out_dir.mkdir(parents=True) all_prompts = concept_prompts + [final_prompt] infer_kwargs = {"output_dir": out_dir, "image_path": concept_images, "prompts": all_prompts, **kwargs} return worker.infer(**infer_kwargs) # --- Instância Singleton Global --- # A inicialização é envolvida em um try-except para evitar que a aplicação inteira quebre # se o VINCIE não puder ser inicializado por algum motivo. try: output_root_path = os.getenv("OUTPUT_ROOT", "/app/outputs") vince_pool_manager_singleton = VincePoolManager(output_root=output_root_path) except Exception as e: print(f"ERRO CRÍTICO ao inicializar o VincePoolManager: {e}", file=sys.stderr) traceback.print_exc() vince_pool_manager_singleton = None