# managers/ltx_manager.py
#
# Copyright (C) August 4, 2025 Carlos Rodrigues dos Santos
#
# Version: 2.1.0
#
# This file manages the LTX-Video specialist pool. It now includes a crucial
# "monkey patch" for the LTX pipeline's `prepare_conditioning` method. This approach
# isolates our ADUC-specific modifications from the original library code, ensuring
# better maintainability and respecting the principle of separation of concerns.
import torch
import gc
import os
import yaml
import logging
import huggingface_hub
import time
import threading
from typing import Optional, List, Tuple, Union
from optimization import optimize_ltx_worker, can_optimize_fp8
from hardware_manager import hardware_manager
from managers.ltx_pipeline_utils import create_ltx_video_pipeline, calculate_padding
from ltx_video.pipelines.pipeline_ltx_video import LTXVideoPipeline, ConditioningItem, LatentConditioningItem
from ltx_video.models.autoencoders.vae_encode import vae_encode, latent_to_pixel_coords
from ltx_video.pipelines.pipeline_ltx_video import LTXMultiScalePipeline
from diffusers.utils.torch_utils import randn_tensor
logger = logging.getLogger(__name__)
# --- MONKEY PATCHING SECTION ---
# This section contains our custom logic that will override the default
# behavior of the LTX pipeline at runtime.
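# The patch is bound onto each pipeline instance at runtime by
# LtxPoolManager._apply_ltx_pipeline_patches (see below), e.g.:
#   pipeline.prepare_conditioning = _aduc_prepare_conditioning_patch.__get__(pipeline, LTXVideoPipeline)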
def _aduc_prepare_conditioning_patch(
self: LTXVideoPipeline, # 'self' will be the instance of the LTXVideoPipeline
conditioning_items: Optional[List[Union[ConditioningItem, "LatentConditioningItem"]]],
init_latents: torch.Tensor,
num_frames: int,
height: int,
width: int,
vae_per_channel_normalize: bool = False,
generator=None,
) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, int]:
"""
This is our custom version of the `prepare_conditioning` method.
It correctly handles both standard ConditioningItem (from pixels) and our
ADUC-specific LatentConditioningItem (from latents), which the original
method does not support. This function replaces the original at runtime.
"""
if not conditioning_items:
init_latents, init_latent_coords = self.patchifier.patchify(latents=init_latents)
init_pixel_coords = latent_to_pixel_coords(
init_latent_coords, self.vae, causal_fix=self.transformer.config.causal_temporal_positioning
)
return init_latents, init_pixel_coords, None, 0
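# Mask with the same spatial layout as the initial latents, recording how strongly
# each region is conditioned (0.0 = free noise, up to the item's strength).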
init_conditioning_mask = torch.zeros(init_latents[:, 0, :, :, :].shape, dtype=torch.float32, device=init_latents.device)
extra_conditioning_latents = []
extra_conditioning_pixel_coords = []
extra_conditioning_mask = []
extra_conditioning_num_latents = 0
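# ADUC items carry pre-encoded latents; detect them by the presence of a
# `latent_tensor` attribute rather than by an isinstance check.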
is_latent_mode = hasattr(conditioning_items[0], 'latent_tensor')
if is_latent_mode:
for item in conditioning_items:
media_item_latents = item.latent_tensor.to(dtype=init_latents.dtype, device=init_latents.device)
media_frame_number = item.media_frame_number
strength = item.conditioning_strength
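# Frame 0: blend the conditioning latents directly into the start of the
# initial latent volume and record the strength in the mask.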
if media_frame_number == 0:
f_l, h_l, w_l = media_item_latents.shape[-3:]
init_latents[:, :, :f_l, :h_l, :w_l] = torch.lerp(init_latents[:, :, :f_l, :h_l, :w_l], media_item_latents, strength)
init_conditioning_mask[:, :f_l, :h_l, :w_l] = strength
else:
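# Later frames: noise-blend the latents by strength, patchify them into extra
# conditioning tokens, and shift their temporal pixel coordinate to the target frame.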
noise = randn_tensor(media_item_latents.shape, generator=generator, device=media_item_latents.device, dtype=media_item_latents.dtype)
media_item_latents = torch.lerp(noise, media_item_latents, strength)
patched_latents, latent_coords = self.patchifier.patchify(latents=media_item_latents)
pixel_coords = latent_to_pixel_coords(latent_coords, self.vae, causal_fix=self.transformer.config.causal_temporal_positioning)
pixel_coords[:, 0] += media_frame_number
extra_conditioning_num_latents += patched_latents.shape[1]
new_mask = torch.full(patched_latents.shape[:2], strength, dtype=torch.float32, device=init_latents.device)
extra_conditioning_latents.append(patched_latents)
extra_conditioning_pixel_coords.append(pixel_coords)
extra_conditioning_mask.append(new_mask)
else: # Original pixel-based logic for fallback
for item in conditioning_items:
if not isinstance(item, ConditioningItem): continue
item = self._resize_conditioning_item(item, height, width)
media_item_latents = vae_encode(
item.media_item.to(dtype=self.vae.dtype, device=self.vae.device),
self.vae, vae_per_channel_normalize=vae_per_channel_normalize
).to(dtype=init_latents.dtype)
media_frame_number = item.media_frame_number
strength = item.conditioning_strength
if media_frame_number == 0:
media_item_latents, l_x, l_y = self._get_latent_spatial_position(media_item_latents, item, height, width, strip_latent_border=True)
f_l, h_l, w_l = media_item_latents.shape[-3:]
init_latents[:, :, :f_l, l_y:l_y+h_l, l_x:l_x+w_l] = torch.lerp(init_latents[:, :, :f_l, l_y:l_y+h_l, l_x:l_x+w_l], media_item_latents, strength)
init_conditioning_mask[:, :f_l, l_y:l_y+h_l, l_x:l_x+w_l] = strength
else:
logger.warning("Pixel-based conditioning for non-zero frames is not fully implemented in this patch.")
pass
init_latents, init_latent_coords = self.patchifier.patchify(latents=init_latents)
init_pixel_coords = latent_to_pixel_coords(init_latent_coords, self.vae, causal_fix=self.transformer.config.causal_temporal_positioning)
init_conditioning_mask, _ = self.patchifier.patchify(latents=init_conditioning_mask.unsqueeze(1))
init_conditioning_mask = init_conditioning_mask.squeeze(-1)
if extra_conditioning_latents:
init_latents = torch.cat([*extra_conditioning_latents, init_latents], dim=1)
init_pixel_coords = torch.cat([*extra_conditioning_pixel_coords, init_pixel_coords], dim=2)
init_conditioning_mask = torch.cat([*extra_conditioning_mask, init_conditioning_mask], dim=1)
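# With TPU flash attention the sequence length must stay fixed, so the extra
# conditioning tokens appended above are trimmed off again (mirrors the upstream pipeline behavior).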
if self.transformer.use_tpu_flash_attention:
init_latents = init_latents[:, :-extra_conditioning_num_latents]
init_pixel_coords = init_pixel_coords[:, :, :-extra_conditioning_num_latents]
init_conditioning_mask = init_conditioning_mask[:, :-extra_conditioning_num_latents]
return init_latents, init_pixel_coords, init_conditioning_mask, extra_conditioning_num_latents
# --- END OF MONKEY PATCHING SECTION ---
class LtxWorker:
"""
Represents a single instance of the LTX-Video pipeline on a specific device.
Manages model loading to CPU and movement to/from GPU.
"""
def __init__(self, device_id, ltx_config_file):
self.cpu_device = torch.device('cpu')
self.device = torch.device(device_id if torch.cuda.is_available() else 'cpu')
logger.info(f"LTX Worker ({self.device}): Initializing with config '{ltx_config_file}'...")
with open(ltx_config_file, "r") as file:
self.config = yaml.safe_load(file)
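# Distilled checkpoints are identified purely by filename convention.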
self.is_distilled = "distilled" in self.config.get("checkpoint_path", "")
models_dir = "downloaded_models_gradio"
logger.info(f"LTX Worker ({self.device}): Loading model to CPU...")
model_path = os.path.join(models_dir, self.config["checkpoint_path"])
if not os.path.exists(model_path):
model_path = huggingface_hub.hf_hub_download(
repo_id="Lightricks/LTX-Video", filename=self.config["checkpoint_path"],
local_dir=models_dir, local_dir_use_symlinks=False
)
self.pipeline = create_ltx_video_pipeline(
ckpt_path=model_path, precision=self.config["precision"],
text_encoder_model_name_or_path=self.config["text_encoder_model_name_or_path"],
sampler=self.config["sampler"], device='cpu'
)
logger.info(f"LTX Worker ({self.device}): Model ready on CPU. Is distilled model? {self.is_distilled}")
def to_gpu(self):
"""Moves the pipeline to the designated GPU AND optimizes if possible."""
if self.device.type == 'cpu': return
logger.info(f"LTX Worker: Moving pipeline to GPU {self.device}...")
self.pipeline.to(self.device)
if self.device.type == 'cuda' and can_optimize_fp8():
logger.info(f"LTX Worker ({self.device}): FP8 supported GPU detected. Optimizing...")
optimize_ltx_worker(self)
logger.info(f"LTX Worker ({self.device}): Optimization complete.")
elif self.device.type == 'cuda':
logger.info(f"LTX Worker ({self.device}): FP8 optimization not supported or disabled.")
def to_cpu(self):
"""Moves the pipeline back to the CPU and frees GPU memory."""
if self.device.type == 'cpu': return
logger.info(f"LTX Worker: Unloading pipeline from GPU {self.device}...")
self.pipeline.to('cpu')
gc.collect()
if torch.cuda.is_available(): torch.cuda.empty_cache()
def generate_video_fragment_internal(self, **kwargs):
"""Invokes the generation pipeline."""
return self.pipeline(**kwargs).images
class LtxPoolManager:
"""
Manages a pool of LtxWorkers for optimized multi-GPU usage.
HOT START MODE: Keeps all models loaded in VRAM for minimum latency.
"""
def __init__(self, device_ids, ltx_config_file):
logger.info(f"LTX POOL MANAGER: Creating workers for devices: {device_ids}")
self.workers = [LtxWorker(dev_id, ltx_config_file) for dev_id in device_ids]
self.current_worker_index = 0
self.lock = threading.Lock()
self._apply_ltx_pipeline_patches()
if all(w.device.type == 'cuda' for w in self.workers):
logger.info("LTX POOL MANAGER: HOT START MODE ENABLED. Pre-warming all GPUs...")
for worker in self.workers:
worker.to_gpu()
logger.info("LTX POOL MANAGER: All GPUs are hot and ready.")
else:
logger.info("LTX POOL MANAGER: Operating in CPU or mixed mode. GPU pre-warming skipped.")
def _apply_ltx_pipeline_patches(self):
"""
Applies runtime patches to the LTX pipeline for ADUC-SDR compatibility.
"""
logger.info("LTX POOL MANAGER: Applying ADUC-SDR patches to LTX pipeline...")
for worker in self.workers:
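# __get__ binds the patch function as a method of this specific pipeline
# instance, so `self` inside the patch resolves to the pipeline.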
worker.pipeline.prepare_conditioning = _aduc_prepare_conditioning_patch.__get__(worker.pipeline, LTXVideoPipeline)
logger.info("LTX POOL MANAGER: All pipeline instances have been patched successfully.")
def _get_next_worker(self):
with self.lock:
worker = self.workers[self.current_worker_index]
self.current_worker_index = (self.current_worker_index + 1) % len(self.workers)
return worker
def _prepare_pipeline_params(self, worker: LtxWorker, **kwargs) -> dict:
pipeline_params = {
"height": kwargs['height'], "width": kwargs['width'], "num_frames": kwargs['video_total_frames'],
"frame_rate": kwargs.get('video_fps', 24),
"generator": torch.Generator(device=worker.device).manual_seed(int(time.time()) + kwargs.get('current_fragment_index', 0)),
"is_video": True, "vae_per_channel_normalize": True,
"prompt": kwargs.get('motion_prompt', ""), "negative_prompt": kwargs.get('negative_prompt', "blurry, distorted, static, bad quality"),
"guidance_scale": kwargs.get('guidance_scale', 1.0), "stg_scale": kwargs.get('stg_scale', 0.0),
"rescaling_scale": kwargs.get('rescaling_scale', 0.15), "num_inference_steps": kwargs.get('num_inference_steps', 20),
"output_type": "latent"
}
if 'latents' in kwargs:
pipeline_params["latents"] = kwargs['latents'].to(worker.device, dtype=worker.pipeline.transformer.dtype)
if 'strength' in kwargs:
pipeline_params["strength"] = kwargs['strength']
if 'conditioning_items_data' in kwargs:
final_conditioning_items = []
for item in kwargs['conditioning_items_data']:
item.latent_tensor = item.latent_tensor.to(worker.device)
final_conditioning_items.append(item)
pipeline_params["conditioning_items"] = final_conditioning_items
if worker.is_distilled:
logger.info(f"Worker {worker.device} is using a distilled model. Using fixed timesteps.")
fixed_timesteps = worker.config.get("first_pass", {}).get("timesteps")
pipeline_params["timesteps"] = fixed_timesteps
if fixed_timesteps:
pipeline_params["num_inference_steps"] = len(fixed_timesteps)
return pipeline_params
def generate_latent_fragment(self, **kwargs) -> Tuple[torch.Tensor, tuple]:
worker_to_use = self._get_next_worker()
try:
height, width = kwargs['height'], kwargs['width']
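# LTX expects spatial dimensions that are multiples of 32; round up and keep
# the padding values so the output can later be cropped back to the requested size.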
padded_h, padded_w = ((height - 1) // 32 + 1) * 32, ((width - 1) // 32 + 1) * 32
padding_vals = calculate_padding(height, width, padded_h, padded_w)
kwargs['height'], kwargs['width'] = padded_h, padded_w
pipeline_params = self._prepare_pipeline_params(worker_to_use, **kwargs)
logger.info(f"Initiating GENERATION on {worker_to_use.device} with shape {padded_w}x{padded_h}")
if isinstance(worker_to_use.pipeline, LTXMultiScalePipeline):
result = worker_to_use.pipeline.video_pipeline(**pipeline_params).images
else:
result = worker_to_use.generate_video_fragment_internal(**pipeline_params)
return result, padding_vals
except Exception as e:
logger.error(f"LTX POOL MANAGER: Error during generation on {worker_to_use.device}: {e}", exc_info=True)
raise e
finally:
if worker_to_use and worker_to_use.device.type == 'cuda':
with torch.cuda.device(worker_to_use.device):
gc.collect(); torch.cuda.empty_cache()
def refine_latents(self, latents_to_refine: torch.Tensor, **kwargs) -> Tuple[Optional[torch.Tensor], Optional[tuple]]:
worker_to_use = self._get_next_worker()
try:
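# Recover the pixel-space target size from the latent shape via the VAE scale factor.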
_b, _c, _f, latent_h, latent_w = latents_to_refine.shape
vae_scale_factor = worker_to_use.pipeline.vae_scale_factor
kwargs['height'] = latent_h * vae_scale_factor
kwargs['width'] = latent_w * vae_scale_factor
kwargs['video_total_frames'] = kwargs.get('video_total_frames', _f * worker_to_use.pipeline.video_scale_factor)
kwargs['latents'] = latents_to_refine
kwargs['strength'] = kwargs.get('denoise_strength', 0.4)
kwargs['num_inference_steps'] = int(kwargs.get('refine_steps', 10))
pipeline_params = self._prepare_pipeline_params(worker_to_use, **kwargs)
logger.info(f"Initiating REFINEMENT on {worker_to_use.device} with shape {kwargs['width']}x{kwargs['height']}")
pipeline_to_call = worker_to_use.pipeline.video_pipeline if isinstance(worker_to_use.pipeline, LTXMultiScalePipeline) else worker_to_use.pipeline
result = pipeline_to_call(**pipeline_params).images
return result, None
except torch.cuda.OutOfMemoryError as e:
logger.error(f"MEMORY FAILURE DURING REFINEMENT on {worker_to_use.device}: {e}")
logger.warning("Clearing VRAM and returning None to signal failure.")
gc.collect(); torch.cuda.empty_cache()
return None, None
except Exception as e:
logger.error(f"LTX POOL MANAGER: Unexpected error during refinement on {worker_to_use.device}: {e}", exc_info=True)
raise e
finally:
if worker_to_use and worker_to_use.device.type == 'cuda':
with torch.cuda.device(worker_to_use.device):
gc.collect(); torch.cuda.empty_cache()
# --- Singleton Instantiation ---
logger.info("Reading config.yaml to initialize LTX Pool Manager...")
with open("config.yaml", 'r') as f:
config = yaml.safe_load(f)
ltx_gpus_required = config['specialists']['ltx']['gpus_required']
ltx_device_ids = hardware_manager.allocate_gpus('LTX', ltx_gpus_required)
ltx_config_path = config['specialists']['ltx']['config_file']
ltx_manager_singleton = LtxPoolManager(device_ids=ltx_device_ids, ltx_config_file=ltx_config_path)
logger.info("Video Specialist (LTX) ready.") |