eeuuia committed on
Commit 3f8bf2e · verified · 1 Parent(s): 20cad86

Update api/ltx_server_refactored_complete.py

Files changed (1)
  1. api/ltx_server_refactored_complete.py +189 -577
api/ltx_server_refactored_complete.py CHANGED
@@ -1,602 +1,214 @@
- # FILE: ltx_server_refactored_complete.py
- # DESCRIPTION: Backend service for video generation using LTX-Video pipeline.
- # Features modular generation, narrative chunking, and resource management.
-
- import warnings
- import os, subprocess, shlex, tempfile
- import torch
- import json
- import numpy as np
- import random
  import os
- import io
- import shlex
- import yaml
- from typing import Dict, List, Optional, Tuple
- from pathlib import Path
- import imageio
- from PIL import Image
- import tempfile
- from huggingface_hub import hf_hub_download
  import sys
- import subprocess
  import gc
- import shutil
- import contextlib
- import time
- import traceback
- from api.gpu_manager import gpu_manager
- from einops import rearrange
- import torch.nn.functional as F
- from managers.vae_manager import vae_manager_singleton
- from tools.video_encode_tool import video_encode_tool_singleton
-
- from huggingface_hub import logging
- logging.set_verbosity_error()
- logging.set_verbosity_warning()
- logging.set_verbosity_info()
- logging.set_verbosity_debug()
- # Suppress excessive logs from external libraries
- warnings.filterwarnings("ignore")
-
- # ==============================================================================
- # --- INITIAL SETUP & CONFIGURATION ---
- # ==============================================================================
-
- # --- CONSTANTS ---
- DEPS_DIR = Path("/data")
- LTX_VIDEO_REPO_DIR = DEPS_DIR / "LTX-Video"
- BASE_CONFIG_PATH = LTX_VIDEO_REPO_DIR / "configs"
- DEFAULT_CONFIG_FILE = BASE_CONFIG_PATH / "ltxv-13b-0.9.8-distilled-fp8.yaml"
- LTX_REPO_ID = "Lightricks/LTX-Video"
- RESULTS_DIR = Path("/app/output")
- DEFAULT_FPS = 24.0
- FRAMES_ALIGNMENT = 8
-
- def add_deps_to_path():
-     repo_path = str(LTX_VIDEO_REPO_DIR.resolve())
-     if repo_path not in sys.path:
-         sys.path.insert(0, repo_path)
-     print(f"[DEBUG] Repo added to sys.path: {repo_path}")
- add_deps_to_path()
-
- from ltx_video.pipelines.pipeline_ltx_video import ConditioningItem, LTXMultiScalePipeline
- from ltx_video.utils.skip_layer_strategy import SkipLayerStrategy
- from ltx_video.models.autoencoders.vae_encode import un_normalize_latents, normalize_latents
- from ltx_video.pipelines.pipeline_ltx_video import adain_filter_latent
- from api.ltx.inference import (
-     create_ltx_video_pipeline,
-     create_latent_upsampler,
-     load_image_to_tensor_with_resize_and_crop,
-     seed_everething,
- )
-
- # ==============================================================================
- # --- UTILITY & HELPER FUNCTIONS ---
- # ==============================================================================
-
- def seed_everything(seed: int):
-     """Sets the seed for reproducibility across all relevant libraries."""
-     random.seed(seed)
-     os.environ['PYTHONHASHSEED'] = str(seed)
-     np.random.seed(seed)
-     torch.manual_seed(seed)
-     torch.cuda.manual_seed_all(seed)
-     # Potentially faster, but less reproducible
-     # torch.backends.cudnn.deterministic = False
-     # torch.backends.cudnn.benchmark = True
-
- def calculate_padding(orig_h: int, orig_w: int, target_h: int, target_w: int) -> Tuple[int, int, int, int]:
-     """Calculates symmetric padding values to reach a target dimension."""
-     pad_h = target_h - orig_h
-     pad_w = target_w - orig_w
-     pad_top = pad_h // 2
-     pad_bottom = pad_h - pad_top
-     pad_left = pad_w // 2
-     pad_right = pad_w - pad_left
-     return (pad_left, pad_right, pad_top, pad_bottom)
 
- def log_tensor_info(tensor: torch.Tensor, name: str = "Tensor"):
-     """Logs detailed information about a PyTorch tensor for debugging."""
-     if not isinstance(tensor, torch.Tensor):
-         logging.debug(f"'{name}' is not a tensor.")
-         return
-
-     info_str = (
-         f"--- Tensor: {name} ---\n"
-         f"  - Shape: {tuple(tensor.shape)}\n"
-         f"  - Dtype: {tensor.dtype}\n"
-         f"  - Device: {tensor.device}\n"
-     )
-     if tensor.numel() > 0:
-         try:
-             info_str += (
-                 f"  - Min: {tensor.min().item():.4f} | "
-                 f"Max: {tensor.max().item():.4f} | "
-                 f"Mean: {tensor.mean().item():.4f}\n"
-             )
-         except Exception:
-             pass  # Fails on some dtypes
-     logging.debug(info_str + "----------------------")
-
- # ==============================================================================
- # --- VIDEO SERVICE CLASS ---
- # ==============================================================================
-
- class VideoService:
      """
-     Backend service for orchestrating video generation using the LTX-Video pipeline.
-     Encapsulates model loading, state management, and the logic for multi-stage
-     video generation (low-resolution, upscale).
      """
-
-     def __init__(self):
-         t0 = time.perf_counter()
-         print("[DEBUG] Initializing VideoService...")
-
-         # 1. Get the target device from the manager.
-         #    We do not set `self.device` yet; we only store the target.
-         target_device = gpu_manager.get_ltx_device()
-         print(f"[DEBUG] LTX was allocated to device: {target_device}")
-
-         # 2. Load the configuration and the models (on CPU, as _load_models does).
-         self.config = self._load_config()
-         self.pipeline, self.latent_upsampler = self._load_models()
-
-         # 3. Move the models to the target device and set `self.device`.
-         self.move_to_device(target_device)
-
-         # 4. Configure the remaining components with the correct device.
-         self._apply_precision_policy()
-         vae_manager_singleton.attach_pipeline(
-             self.pipeline,
-             device=self.device,  # `self.device` is correct at this point
-             autocast_dtype=self.runtime_autocast_dtype
-         )
-         self._tmp_dirs = set()
-         print(f"[DEBUG] VideoService ready. boot_time={time.perf_counter()-t0:.3f}s")
-
-     def move_to_device(self, device):
-         """Moves the pipeline models to the specified device."""
-         print(f"[LTX] Moving models to {device}...")
-         self.device = torch.device(device)  # Guarantees a torch.device object
-         self.pipeline.to(self.device)
-         if self.latent_upsampler:
-             self.latent_upsampler.to(self.device)
-         print(f"[LTX] Models are now on {self.device}.")
-
-     def move_to_cpu(self):
-         """Moves the models to the CPU to free VRAM."""
-         self.move_to_device(torch.device("cpu"))
-         if torch.cuda.is_available():
-             torch.cuda.empty_cache()
-
-     # ==========================================================================
-     # --- LIFECYCLE & MODEL MANAGEMENT ---
-     # ==========================================================================
-
-     def _load_config(self):
-         base = LTX_VIDEO_REPO_DIR / "configs"
-         config_path = base / "ltxv-13b-0.9.8-distilled-fp8.yaml"
-         with open(config_path, "r") as file:
-             return yaml.safe_load(file)
-
-     def finalize(self, keep_paths=None, extra_paths=None, clear_gpu=True):
-         print("[DEBUG] Finalize: starting cleanup...")
-         keep = set(keep_paths or []); extras = set(extra_paths or [])
-         gc.collect()
-         try:
-             if clear_gpu and torch.cuda.is_available():
-                 torch.cuda.empty_cache()
-                 try:
-                     torch.cuda.ipc_collect()
-                 except Exception:
-                     pass
-         except Exception as e:
-             print(f"[DEBUG] Finalize: GPU cleanup failed: {e}")
-
-     def _load_models(self):
-         t0 = time.perf_counter()
-         LTX_REPO = "Lightricks/LTX-Video"
-         print("[DEBUG] Downloading main checkpoint...")
-         distilled_model_path = hf_hub_download(
-             repo_id=LTX_REPO,
-             filename=self.config["checkpoint_path"],
-             local_dir=os.getenv("HF_HOME"),
-             cache_dir=os.getenv("HF_HOME_CACHE"),
-             token=os.getenv("HF_TOKEN"),
-         )
-         self.config["checkpoint_path"] = distilled_model_path
-         print(f"[DEBUG] Checkpoint at: {distilled_model_path}")
-
-         print("[DEBUG] Downloading spatial upscaler...")
-         spatial_upscaler_path = hf_hub_download(
-             repo_id=LTX_REPO,
-             filename=self.config["spatial_upscaler_model_path"],
-             local_dir=os.getenv("HF_HOME"),
-             cache_dir=os.getenv("HF_HOME_CACHE"),
-             token=os.getenv("HF_TOKEN")
-         )
-         self.config["spatial_upscaler_model_path"] = spatial_upscaler_path
-         print(f"[DEBUG] Upscaler at: {spatial_upscaler_path}")
-
-         print("[DEBUG] Building pipeline...")
-         pipeline = create_ltx_video_pipeline(
-             ckpt_path=self.config["checkpoint_path"],
-             precision=self.config["precision"],
-             text_encoder_model_name_or_path=self.config["text_encoder_model_name_or_path"],
-             sampler=self.config["sampler"],
-             device="cpu",
-             enhance_prompt=False,
-             prompt_enhancer_image_caption_model_name_or_path=self.config["prompt_enhancer_image_caption_model_name_or_path"],
-             prompt_enhancer_llm_model_name_or_path=self.config["prompt_enhancer_llm_model_name_or_path"],
-         )
-         print("[DEBUG] Pipeline ready.")
-
-         latent_upsampler = None
-         if self.config.get("spatial_upscaler_model_path"):
-             print("[DEBUG] Building latent_upsampler...")
-             latent_upsampler = create_latent_upsampler(self.config["spatial_upscaler_model_path"], device="cpu")
-             print("[DEBUG] Upsampler ready.")
-         print(f"[DEBUG] _load_models() total time={time.perf_counter()-t0:.3f}s")
-         return pipeline, latent_upsampler
-
-     def _apply_precision_policy(self):
-         prec = str(self.config.get("precision", "")).lower()
-         self.runtime_autocast_dtype = torch.float32
-         if prec in ["float8_e4m3fn", "bfloat16"]:
-             self.runtime_autocast_dtype = torch.bfloat16
-         elif prec == "mixed_precision":
-             self.runtime_autocast_dtype = torch.float16
-
-     def _register_tmp_dir(self, d: str):
-         if d and os.path.isdir(d):
-             self._tmp_dirs.add(d)
-             print(f"[DEBUG] Registered tmp dir: {d}")
-
-     @torch.no_grad()
-     def _upsample_latents_internal(self, latents: torch.Tensor) -> torch.Tensor:
-         try:
-             if not self.latent_upsampler:
-                 raise ValueError("Latent upsampler is not loaded.")
-             latents_unnormalized = un_normalize_latents(latents, self.pipeline.vae, vae_per_channel_normalize=True)
-             upsampled_latents = self.latent_upsampler(latents_unnormalized)
-             return normalize_latents(upsampled_latents, self.pipeline.vae, vae_per_channel_normalize=True)
-         finally:
-             torch.cuda.empty_cache()
-             torch.cuda.ipc_collect()
-             self.finalize(keep_paths=[])
-
-     def _prepare_conditioning_tensor(self, filepath, height, width, padding_values):
-         tensor = load_image_to_tensor_with_resize_and_crop(filepath, height, width)
-         tensor = torch.nn.functional.pad(tensor, padding_values)
-         log_tensor_info(tensor, "_prepare_conditioning_tensor")
-         return tensor.to(self.device, dtype=self.runtime_autocast_dtype)
-
-     def _save_and_log_video(self, pixel_tensor, base_filename, fps, temp_dir, results_dir, used_seed, progress_callback=None):
-         output_path = os.path.join(temp_dir, f"{base_filename}_.mp4")
-         video_encode_tool_singleton.save_video_from_tensor(
-             pixel_tensor, output_path, fps=fps, progress_callback=progress_callback
-         )
-         final_path = os.path.join(results_dir, f"{base_filename}_.mp4")
-         shutil.move(output_path, final_path)
-         print(f"[DEBUG] Video saved at: {final_path}")
-         return final_path
-
-     def _load_tensor(self, caminho):
-         # If it is already a tensor, return it directly.
-         if isinstance(caminho, torch.Tensor):
-             return caminho
-         # If it is bytes, load from an in-memory buffer.
-         if isinstance(caminho, (bytes, bytearray)):
-             return torch.load(io.BytesIO(caminho))
-         # Otherwise, assume it is a file path.
-         return torch.load(caminho)
-
-     # ==========================================================================
-     # --- PUBLIC ORCHESTRATORS ---
-     # These are the main entry points called by the frontend.
-     # ==========================================================================
-
-     def generate_narrative_low(self, prompt: str, **kwargs) -> Tuple[Optional[str], Optional[str], Optional[int]]:
          """
-         [ORCHESTRATOR] Generates a video from a multi-line prompt, creating a sequence of scenes.
-
-         Returns:
-             A tuple of (video_path, latents_path, used_seed).
          """
-         logging.info("Starting narrative low-res generation...")
-         used_seed = self._resolve_seed(kwargs.get("seed"))
-         seed_everything(used_seed)
-
-         prompt_list = [p.strip() for p in prompt.splitlines() if p.strip()]
-         if not prompt_list:
-             raise ValueError("Prompt is empty or contains no valid lines.")
-
-         num_chunks = len(prompt_list)
-         total_frames = self._calculate_aligned_frames(kwargs.get("duration", 4.0))
-         frames_per_chunk = (total_frames // num_chunks // FRAMES_ALIGNMENT) * FRAMES_ALIGNMENT
-         overlap_frames = self.config.get("overlap_frames", 8)
-
-         all_latents_paths = []
-         overlap_condition_item = None
-
          try:
-             for i, chunk_prompt in enumerate(prompt_list):
-                 logging.info(f"Generating narrative chunk {i+1}/{num_chunks}: '{chunk_prompt[:50]}...'")
-
-                 current_frames = frames_per_chunk
-                 if i > 0:
-                     current_frames += overlap_frames
-
-                 # Use initial image conditions only for the first chunk
-                 current_conditions = kwargs.get("initial_conditions", []) if i == 0 else []
-                 if overlap_condition_item:
-                     current_conditions.append(overlap_condition_item)
-
-                 chunk_latents = self._generate_single_chunk_low(
-                     prompt=chunk_prompt,
-                     num_frames=current_frames,
-                     seed=used_seed + i,
-                     conditioning_items=current_conditions,
-                     **kwargs
-                 )
-
-                 if chunk_latents is None:
-                     raise RuntimeError(f"Failed to generate latents for chunk {i+1}.")
-
-                 # Create overlap for the next chunk
-                 if i < num_chunks - 1:
-                     overlap_latents = chunk_latents[:, :, -overlap_frames:, :, :].clone()
-                     log_tensor_info(overlap_latents, f"Overlap Latents from chunk {i+1}")
-                     overlap_condition_item = ConditioningItem(
-                         media_item=overlap_latents, media_frame_number=0, conditioning_strength=1.0
-                     )
-
-                 # Trim the overlap from the current chunk before saving
-                 if i > 0:
-                     chunk_latents = chunk_latents[:, :, overlap_frames:, :, :]
-
-                 # Save chunk latents to disk to manage memory
-                 chunk_path = RESULTS_DIR / f"chunk_{i}_{used_seed}.pt"
-                 torch.save(chunk_latents.cpu(), chunk_path)
-                 all_latents_paths.append(chunk_path)
-
-             # Concatenate, decode, and save the final video
-             return self._finalize_generation(all_latents_paths, "narrative_video", used_seed)
-
-         except Exception as e:
-             logging.error(f"Error during narrative generation: {e}")
-             traceback.print_exc()
-             return None, None, None
          finally:
-             # Clean up intermediate chunk files
-             for path in all_latents_paths:
-                 if os.path.exists(path):
-                     os.remove(path)
-             self.finalize()
-
-     def generate_single_low(self, **kwargs) -> Tuple[Optional[str], Optional[str], Optional[int]]:
-         """
-         [ORCHESTRATOR] Generates a video from a single prompt in one go.
-
-         Returns:
-             A tuple of (video_path, latents_path, used_seed).
-         """
-         logging.info("Starting single-prompt low-res generation...")
-         used_seed = self._resolve_seed(kwargs.get("seed"))
-         seed_everything(used_seed)
-
          try:
-             total_frames = self._calculate_aligned_frames(kwargs.get("duration", 4.0), min_frames=9)
-
-             final_latents = self._generate_single_chunk_low(
-                 num_frames=total_frames,
-                 seed=used_seed,
-                 conditioning_items=kwargs.get("initial_conditions", []),
-                 **kwargs
-             )
-
-             if final_latents is None:
-                 raise RuntimeError("Failed to generate latents.")
-
-             # Save latents to a single file, then decode and save video
-             latents_path = RESULTS_DIR / f"single_{used_seed}.pt"
-             torch.save(final_latents.cpu(), latents_path)
-             return self._finalize_generation([latents_path], "single_video", used_seed)
-
-         except Exception as e:
-             logging.error(f"Error during single generation: {e}")
-             traceback.print_exc()
-             return None, None, None
          finally:
-             self.finalize()
-
-     # ==========================================================================
-     # --- INTERNAL WORKER UNITS ---
-     # ==========================================================================
-
-     def _generate_single_chunk_low(
-         self, prompt: str, negative_prompt: str, height: int, width: int, num_frames: int, seed: int,
-         conditioning_items: List[ConditioningItem], ltx_configs_override: Optional[Dict], **kwargs
-     ) -> Optional[torch.Tensor]:
-         """
-         [WORKER] Generates a single chunk of latents. This is the core generation unit.
-         Returns the raw latents tensor on the target device, or None on failure.
-         """
-         height_padded, width_padded = (self._align(d) for d in (height, width))
-         downscale_factor = self.config.get("downscale_factor", 0.6666666)
-         vae_scale_factor = self.pipeline.vae_scale_factor
-
-         downscaled_height = self._align(int(height_padded * downscale_factor), vae_scale_factor)
-         downscaled_width = self._align(int(width_padded * downscale_factor), vae_scale_factor)
-
-         first_pass_config = self.config.get("first_pass", {}).copy()
-         if ltx_configs_override:
-             first_pass_config.update(self._prepare_guidance_overrides(ltx_configs_override))
-
-         pipeline_kwargs = {
-             "prompt": prompt,
-             "negative_prompt": negative_prompt,
-             "height": downscaled_height,
-             "width": downscaled_width,
-             "num_frames": num_frames,
-             "frame_rate": DEFAULT_FPS,
-             "generator": torch.Generator(device=self.device).manual_seed(seed),
-             "output_type": "latent",
-             "conditioning_items": conditioning_items,
-             **first_pass_config
-         }
-
-         logging.debug(f"Pipeline call args: { {k: v for k, v in pipeline_kwargs.items() if k != 'conditioning_items'} }")
-
-         with torch.autocast(device_type=self.device.type, dtype=self.runtime_autocast_dtype, enabled=self.device.type == 'cuda'):
-             latents_raw = self.pipeline(**pipeline_kwargs).images
-
-         log_tensor_info(latents_raw, f"Raw Latents for '{prompt[:40]}...'")
-         return latents_raw
-
-     # ==========================================================================
-     # --- HELPERS & UTILITY METHODS ---
-     # ==========================================================================
-
-     def _finalize_generation(self, latents_paths: List[Path], base_filename: str, seed: int) -> Tuple[str, str, int]:
-         """
-         Loads latents from paths, concatenates them, decodes to video, and saves both.
-         """
-         logging.info("Finalizing generation: decoding latents to video.")
-         # Load all tensors and concatenate them on the CPU first
-         all_tensors_cpu = [torch.load(p) for p in latents_paths]
-         final_latents_cpu = torch.cat(all_tensors_cpu, dim=2)
-
-         # Save final combined latents
-         final_latents_path = RESULTS_DIR / f"latents_{base_filename}_{seed}.pt"
-         torch.save(final_latents_cpu, final_latents_path)
-         logging.info(f"Final latents saved to: {final_latents_path}")
-
-         # Move to GPU for decoding
-         final_latents_gpu = final_latents_cpu.to(self.device)
-         log_tensor_info(final_latents_gpu, "Final Concatenated Latents")
-
-         with torch.autocast(device_type=self.device.type, dtype=self.runtime_autocast_dtype, enabled=self.device.type == 'cuda'):
-             pixel_tensor = vae_manager_singleton.decode(
-                 final_latents_gpu,
-                 decode_timestep=float(self.config.get("decode_timestep", 0.05))
-             )
-
-         video_path = self._save_and_log_video(pixel_tensor, f"{base_filename}_{seed}")
-         return str(video_path), str(final_latents_path), seed
-
-     def prepare_condition_items(self, items_list: List, height: int, width: int, num_frames: int) -> List[ConditioningItem]:
-         """Prepares a list of ConditioningItem objects from file paths or tensors."""
-         if not items_list:
-             return []
-
-         height_padded, width_padded = self._align(height), self._align(width)
-         padding_values = calculate_padding(height, width, height_padded, width_padded)
-
-         conditioning_items = []
-         for media, frame, weight in items_list:
-             tensor = self._prepare_conditioning_tensor(media, height, width, padding_values)
-             safe_frame = max(0, min(int(frame), num_frames - 1))
-             conditioning_items.append(ConditioningItem(tensor, safe_frame, float(weight)))
-         return conditioning_items
-
-     def _prepare_conditioning_tensor(self, media_path: str, height: int, width: int, padding: Tuple) -> torch.Tensor:
-         """Loads and processes an image to be a conditioning tensor."""
-         tensor = load_image_to_tensor_with_resize_and_crop(media_path, height, width)
-         tensor = torch.nn.functional.pad(tensor, padding)
-         log_tensor_info(tensor, f"Prepared Conditioning Tensor from {media_path}")
-         return tensor.to(self.device, dtype=self.runtime_autocast_dtype)
-
-     def _prepare_guidance_overrides(self, ltx_configs: Dict) -> Dict:
-         """Parses UI presets for guidance into pipeline-compatible arguments."""
-         overrides = {}
-         preset = ltx_configs.get("guidance_preset", "Padrão (Recomendado)")
-
-         # Default LTX values are used if the preset is 'Padrão'
-         if preset == "Agressivo":
-             overrides["guidance_scale"] = [1, 2, 8, 12, 8, 2, 1]
-             overrides["stg_scale"] = [0, 0, 5, 6, 5, 3, 2]
-         elif preset == "Suave":
-             overrides["guidance_scale"] = [1, 1, 4, 5, 4, 1, 1]
-             overrides["stg_scale"] = [0, 0, 2, 2, 2, 1, 0]
-         elif preset == "Customizado":
-             try:
-                 overrides["guidance_scale"] = json.loads(ltx_configs["guidance_scale_list"])
-                 overrides["stg_scale"] = json.loads(ltx_configs["stg_scale_list"])
-             except (json.JSONDecodeError, KeyError) as e:
-                 logging.warning(f"Failed to parse custom guidance values: {e}. Falling back to defaults.")
-
-         if overrides:
-             logging.info(f"Applying '{preset}' guidance preset overrides.")
-         return overrides
-
-     def _save_and_log_video(self, pixel_tensor: torch.Tensor, base_filename: str) -> Path:
-         """Saves a pixel tensor to an MP4 file and returns the final path."""
-         # Work in a temporary directory to allow an atomic move
-         with tempfile.TemporaryDirectory() as temp_dir:
-             temp_path = os.path.join(temp_dir, f"{base_filename}.mp4")
-             video_encode_tool_singleton.save_video_from_tensor(
-                 pixel_tensor, temp_path, fps=DEFAULT_FPS
-             )
-             final_path = RESULTS_DIR / f"{base_filename}.mp4"
-             shutil.move(temp_path, final_path)
-             logging.info(f"Video saved successfully to: {final_path}")
-             return final_path
-
-     def _apply_precision_policy(self):
-         """Sets the autocast dtype based on the configuration file."""
-         precision = str(self.config.get("precision", "bfloat16")).lower()
-         if precision in ["float8_e4m3fn", "bfloat16"]:
-             self.runtime_autocast_dtype = torch.bfloat16
-         elif precision == "mixed_precision":
-             self.runtime_autocast_dtype = torch.float16
-         else:
-             self.runtime_autocast_dtype = torch.float32
-         logging.info(f"Runtime precision policy set for autocast: {self.runtime_autocast_dtype}")
 
-     def _align(self, dim: int, alignment: int = FRAMES_ALIGNMENT) -> int:
-         """Aligns a dimension up to the nearest multiple of `alignment`."""
-         return ((dim - 1) // alignment + 1) * alignment
 
-     def _calculate_aligned_frames(self, duration_s: float, min_frames: int = 1) -> int:
-         """Calculates the total number of frames based on duration, ensuring alignment."""
-         num_frames = int(round(duration_s * DEFAULT_FPS))
-         aligned_frames = self._align(num_frames)
-         # Keep at least 1 frame beyond the alignment boundary (some ops require it), and respect min_frames
-         final_frames = max(aligned_frames + 1, min_frames)
-         return final_frames
-
-     def _resolve_seed(self, seed: Optional[int]) -> int:
-         """Returns the given seed or generates a new random one."""
-         return random.randint(0, 2**32 - 1) if seed is None else int(seed)
-
- # ==============================================================================
- # --- SINGLETON INSTANTIATION ---
- # ==============================================================================
- # The service is instantiated once when the module is imported, ensuring a single
- # instance manages the models and GPU resources throughout the application's life.
-
  try:
-     video_generation_service = VideoService()
-     logging.info("Global VideoService instance created successfully.")
  except Exception as e:
-     logging.critical(f"Failed to initialize VideoService: {e}")
      traceback.print_exc()
-     # Exit if the core service fails to start, as the app is non-functional
-     sys.exit(1)
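A note on the chunking scheme in the removed service: `generate_narrative_low` gives every chunk after the first `overlap_frames` extra frames, conditions that chunk on the last `overlap_frames` latent frames of its predecessor, and trims the overlap off before saving, so concatenating along the frame axis (dim=2) never duplicates frames at the seams. A minimal standalone sketch of that bookkeeping, with made-up sizes (the real code derives them from `duration` and FRAMES_ALIGNMENT):

    # Illustrative only; stand-in tensors shaped [B, C, T, H, W] play the role of latents.
    import torch

    frames_per_chunk = 24   # (total_frames // num_chunks // FRAMES_ALIGNMENT) * FRAMES_ALIGNMENT
    overlap_frames = 8
    chunks = []
    for i in range(3):
        t = frames_per_chunk + (overlap_frames if i > 0 else 0)
        latents = torch.randn(1, 4, t, 16, 16)        # stand-in for pipeline output
        if i > 0:
            latents = latents[:, :, overlap_frames:]  # drop frames regenerated from the overlap
        chunks.append(latents)
    final = torch.cat(chunks, dim=2)
    assert final.shape[2] == 3 * frames_per_chunk     # no duplicated frames at the seams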
 
+ # FILE: api/vince_pool_manager.py
+ # DESCRIPTION: Singleton manager for a pool of VINCIE workers, integrated with a central GPU manager.
+
  import os
  import sys
  import gc
+ import subprocess
+ import threading
+ import traceback  # needed by the module-level error handler below
+ from pathlib import Path
+ from typing import List
+
+ import torch
+ from omegaconf import open_dict
+
+ # --- Central GPU manager import ---
+ # This is the key piece of the integration: the pool manager asks it which GPUs to use.
+ try:
+     from api.gpu_manager import gpu_manager
+ except ImportError as e:
+     print(f"CRITICAL ERROR: Could not import gpu_manager. {e}", file=sys.stderr)
+     sys.exit(1)
+
+ # --- Global settings (read from the environment) ---
+ VINCIE_DIR = Path(os.getenv("VINCIE_DIR", "/data/VINCIE"))
+ VINCIE_CKPT_DIR = Path(os.getenv("VINCIE_CKPT_DIR", "/data/ckpt/VINCIE-3B"))
 
+ # --- Worker class (manages a single GPU in isolation) ---
+ class VinceWorker:
      """
+     Manages a single instance of the VINCIE pipeline on a specific GPU device.
+     Operates in an "isolated" environment to guarantee that it sees only its own GPU.
      """
+     def __init__(self, device_id: str, config_path: str):
+         self.device_id_str = device_id
+         self.gpu_index_str = self.device_id_str.split(':')[-1]
+         self.config_path = config_path
+         self.gen = None
+         self.config = None
+         print(f"[VinceWorker-{self.device_id_str}] Initialized. Mapped to physical GPU index {self.gpu_index_str}.")
+
+     def _execute_in_isolated_env(self, function_to_run, *args, **kwargs):
          """
+         Crucial wrapper that sets CUDA_VISIBLE_DEVICES to isolate GPU visibility.
+         This guarantees that PyTorch and VINCIE can only use the GPU assigned to this worker.
          """
+         original_cuda_visible = os.environ.get('CUDA_VISIBLE_DEVICES')
          try:
+             os.environ['CUDA_VISIBLE_DEVICES'] = self.gpu_index_str
+             if torch.cuda.is_available():
+                 # Inside this context, 'cuda:0' refers to our target GPU, since it is the only one visible.
+                 torch.cuda.set_device(0)
+             return function_to_run(*args, **kwargs)
          finally:
+             # Restore the original environment so other threads/processes are not affected.
+             if original_cuda_visible is not None:
+                 os.environ['CUDA_VISIBLE_DEVICES'] = original_cuda_visible
+             elif 'CUDA_VISIBLE_DEVICES' in os.environ:
+                 del os.environ['CUDA_VISIBLE_DEVICES']
+
+     def _load_model_task(self):
+         """Model-loading task, executed in the isolated environment."""
+         print(f"[VinceWorker-{self.device_id_str}] Loading model into VRAM (visible physical GPU: {self.gpu_index_str})...")
+         # The device for VINCIE will be 'cuda:0', because that is the only GPU this process can see.
+         device_for_vincie = 'cuda:0' if torch.cuda.is_available() else 'cpu'
+
+         original_cwd = Path.cwd()
+         try:
+             # VINCIE's code may need to run from its own directory.
+             os.chdir(str(VINCIE_DIR))
+             # Add the directory to the system path so that VINCIE's modules can be found.
+             if str(VINCIE_DIR) not in sys.path:
+                 sys.path.insert(0, str(VINCIE_DIR))
 
+             from common.config import load_config, create_object
 
+             cfg = load_config(self.config_path, [f"device='{device_for_vincie}'"])
+             self.gen = create_object(cfg)
+             self.config = cfg
 
+             # Run VINCIE's internal configuration steps.
+             for name in ("configure_persistence", "configure_models", "configure_diffusion"):
+                 getattr(self.gen, name)()
+
+             self.gen.to(torch.device(device_for_vincie))
+             print(f"[VinceWorker-{self.device_id_str}] ✅ VINCIE model 'hot' and ready on physical GPU {self.gpu_index_str}.")
          finally:
+             os.chdir(original_cwd)  # Restore the original working directory.
+
+     def load_model_to_gpu(self):
+         """Public method to load the model, guaranteeing GPU isolation."""
+         if self.gen is None:
+             self._execute_in_isolated_env(self._load_model_task)
+
+     def _infer_task(self, **kwargs) -> Path:
+         """Inference task, executed in the isolated environment."""
+         original_cwd = Path.cwd()
          try:
+             os.chdir(str(VINCIE_DIR))
+
+             # Update the generator's configuration with the parameters of the current call.
+             with open_dict(self.gen.config):
+                 self.gen.config.generation.output.dir = str(kwargs["output_dir"])
+                 image_paths = kwargs.get("image_path", [])
+                 self.gen.config.generation.positive_prompt.image_path = [str(p) for p in image_paths] if isinstance(image_paths, list) else [str(image_paths)]
+                 if "prompts" in kwargs:
+                     self.gen.config.generation.positive_prompt.prompts = list(kwargs["prompts"])
+                 if "cfg_scale" in kwargs and kwargs["cfg_scale"] is not None:
+                     self.gen.config.diffusion.cfg.scale = float(kwargs["cfg_scale"])
 
+             # Start VINCIE's inference loop.
+             self.gen.inference_loop()
+             return Path(kwargs["output_dir"])
          finally:
+             os.chdir(original_cwd)
+             # Memory cleanup after inference.
+             gc.collect()
+             if torch.cuda.is_available():
+                 torch.cuda.empty_cache()
 
+     def infer(self, **kwargs) -> Path:
+         """Public method to start inference, guaranteeing GPU isolation."""
+         if self.gen is None:
+             raise RuntimeError(f"Model in worker {self.device_id_str} has not been loaded.")
+         return self._execute_in_isolated_env(self._infer_task, **kwargs)
+
+
+ # --- Pool manager class (the singleton orchestrator) ---
+ class VincePoolManager:
+     _instance = None
+     _lock = threading.Lock()
+
+     def __new__(cls, *args, **kwargs):
+         with cls._lock:
+             if cls._instance is None:
+                 cls._instance = super().__new__(cls)
+                 cls._instance._initialized = False
+         return cls._instance
+
+     def __init__(self, output_root: str = "/app/outputs"):
+         if self._initialized: return
+         with self._lock:
+             if self._initialized: return
+
+             print("⚙️ Initializing the VincePoolManager singleton...")
+             self.output_root = Path(output_root)
+             self.output_root.mkdir(parents=True, exist_ok=True)
+             self.worker_lock = threading.Lock()
+             self.next_worker_idx = 0
+
+             # Ask the central manager which GPUs may be used.
+             self.allocated_gpu_indices = gpu_manager.get_vincie_devices()
+
+             if not self.allocated_gpu_indices:
+                 # Without allocated GPUs we cannot continue.
+                 # setup.py should already have run, so dependencies need not be checked here.
+                 print("WARNING: No GPU allocated to VINCIE by the GPUManager. The VINCIE service will be inactive.")
+                 self.workers = []
+                 self._initialized = True
+                 return
+
+             devices = [f'cuda:{i}' for i in self.allocated_gpu_indices]
+             vincie_config_path = VINCIE_DIR / "configs/generate.yaml"
+             if not vincie_config_path.exists():
+                 raise FileNotFoundError(f"VINCIE configuration file not found at {vincie_config_path}")
+
+             self.workers = [VinceWorker(dev_id, str(vincie_config_path)) for dev_id in devices]
+
+             print(f"Starting parallel model loading on {len(self.workers)} VINCIE GPUs...")
+             threads = [threading.Thread(target=worker.load_model_to_gpu) for worker in self.workers]
+             for t in threads: t.start()
+             for t in threads: t.join()
+
+             self._initialized = True
+             print(f"✅ VincePoolManager ready with {len(self.workers)} 'hot' workers.")
+
+     def _get_next_worker(self) -> VinceWorker:
+         """Selects the next available worker using a round-robin strategy."""
+         if not self.workers:
+             raise RuntimeError("There are no VINCIE workers available to process the task.")
+
+         with self.worker_lock:
+             worker = self.workers[self.next_worker_idx]
+             self.next_worker_idx = (self.next_worker_idx + 1) % len(self.workers)
+             print(f"Task dispatched to worker: {worker.device_id_str}")
+             return worker
+
+     def generate_multi_turn(self, input_image: str, turns: List[str], **kwargs) -> Path:
+         """Generates a video from an image and a sequence of prompts (turns)."""
+         worker = self._get_next_worker()
+         out_dir = self.output_root / f"vince_multi_turn_{Path(input_image).stem}_{os.urandom(4).hex()}"
+         out_dir.mkdir(parents=True)
+
+         infer_kwargs = {"output_dir": out_dir, "image_path": input_image, "prompts": turns, **kwargs}
+         return worker.infer(**infer_kwargs)
 
+     def generate_multi_concept(self, concept_images: List[str], concept_prompts: List[str], final_prompt: str, **kwargs) -> Path:
+         """Generates a video from multiple concept images and a final prompt."""
+         worker = self._get_next_worker()
+         out_dir = self.output_root / f"vince_multi_concept_{os.urandom(4).hex()}"
+         out_dir.mkdir(parents=True)
+
+         all_prompts = concept_prompts + [final_prompt]
+         infer_kwargs = {"output_dir": out_dir, "image_path": concept_images, "prompts": all_prompts, **kwargs}
+         return worker.infer(**infer_kwargs)
+
+ # --- Global singleton instance ---
+ # Initialization is wrapped in a try-except so that the whole application does not
+ # crash if VINCIE cannot be initialized for some reason.
  try:
+     output_root_path = os.getenv("OUTPUT_ROOT", "/app/outputs")
+     vince_pool_manager_singleton = VincePoolManager(output_root=output_root_path)
  except Exception as e:
+     print(f"CRITICAL ERROR initializing the VincePoolManager: {e}", file=sys.stderr)
      traceback.print_exc()
+     vince_pool_manager_singleton = None
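Two illustrative sketches follow; neither is part of the commit. First, the CUDA_VISIBLE_DEVICES trick behind `VinceWorker._execute_in_isolated_env`, reduced to a standalone context manager. One hedge worth stating: the remapping only takes full effect if CUDA has not yet been initialized in the process, which is presumably why each worker funnels both model loading and inference through the same wrapper.

    # Minimal sketch of the same isolation idea, not the commit's implementation.
    import os
    from contextlib import contextmanager

    @contextmanager
    def only_gpu(index: str):
        previous = os.environ.get("CUDA_VISIBLE_DEVICES")
        os.environ["CUDA_VISIBLE_DEVICES"] = index   # the chosen GPU becomes 'cuda:0'
        try:
            yield
        finally:
            # Restore the previous visibility, or drop the override entirely.
            if previous is not None:
                os.environ["CUDA_VISIBLE_DEVICES"] = previous
            else:
                os.environ.pop("CUDA_VISIBLE_DEVICES", None)

    # with only_gpu("1"):
    #     ...load or run a model; it can only see physical GPU 1...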
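Second, a hypothetical caller of the singleton, assuming the module path api.vince_pool_manager implied by the file header; the image path and prompts are placeholders:

    # Hypothetical usage sketch (names and paths assumed, not from the commit).
    from api.vince_pool_manager import vince_pool_manager_singleton

    if vince_pool_manager_singleton is not None and vince_pool_manager_singleton.workers:
        # Each call is dispatched round-robin to the next 'hot' worker.
        out_dir = vince_pool_manager_singleton.generate_multi_turn(
            input_image="/app/inputs/portrait.png",   # placeholder
            turns=["make the subject smile", "slowly zoom out"],
            cfg_scale=7.5,
        )
        print(f"VINCIE outputs written under: {out_dir}")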