Test4 / api /ltx_server_refactored.py
euiiiia's picture
Update api/ltx_server_refactored.py
f9a7e20 verified
raw
history blame
26.4 kB
# ltx_server_clean_refactor.py — VideoService (Modular Version with Simple Overlap Chunking)
# ==============================================================================
# 0. CONFIGURAÇÃO DE AMBIENTE E IMPORTAÇÕES
# ==============================================================================
import os
import sys
import gc
import yaml
import time
import json
import random
import shutil
import warnings
import tempfile
import traceback
import subprocess
from pathlib import Path
from typing import List, Dict, Optional, Tuple, Union
import cv2
# --- Configurações de Logging e Avisos ---
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=FutureWarning)
from huggingface_hub import logging as hf_logging
hf_logging.set_verbosity_error()
# --- Importações de Bibliotecas de ML/Processamento ---
import torch
import torch.nn.functional as F
import numpy as np
from PIL import Image
from einops import rearrange
from huggingface_hub import hf_hub_download
from safetensors import safe_open
from managers.vae_manager import vae_manager_singleton
from tools.video_encode_tool import video_encode_tool_singleton
from api.aduc_ltx_latent_patch import LTXLatentConditioningPatch, PatchedConditioningItem
# --- Constantes Globais ---
LTXV_DEBUG = True # Mude para False para desativar logs de debug
LTXV_FRAME_LOG_EVERY = 8
DEPS_DIR = Path("/data")
LTX_VIDEO_REPO_DIR = DEPS_DIR / "LTX-Video"
RESULTS_DIR = Path("/app/output")
DEFAULT_FPS = 24.0
# ==============================================================================
# 1. SETUP E FUNÇÕES AUXILIARES DE AMBIENTE
# ==============================================================================
def _run_setup_script():
"""Executa o script setup.py se o repositório LTX-Video não existir."""
setup_script_path = "setup.py"
if not os.path.exists(setup_script_path):
print("[DEBUG] 'setup.py' não encontrado. Pulando clonagem de dependências.")
return
print(f"[DEBUG] Repositório não encontrado em {LTX_VIDEO_REPO_DIR}. Executando setup.py...")
try:
subprocess.run([sys.executable, setup_script_path], check=True, capture_output=True, text=True)
print("[DEBUG] Script 'setup.py' concluído com sucesso.")
except subprocess.CalledProcessError as e:
print(f"[ERROR] Falha ao executar 'setup.py' (código {e.returncode}).\nOutput:\n{e.stdout}\n{e.stderr}")
sys.exit(1)
def add_deps_to_path(repo_path: Path):
"""Adiciona o diretório do repositório ao sys.path para importações locais."""
resolved_path = str(repo_path.resolve())
if resolved_path not in sys.path:
sys.path.insert(0, resolved_path)
if LTXV_DEBUG:
print(f"[DEBUG] Adicionado ao sys.path: {resolved_path}")
# --- Execução da configuração inicial ---
if not LTX_VIDEO_REPO_DIR.exists():
_run_setup_script()
add_deps_to_path(LTX_VIDEO_REPO_DIR)
# --- Importações Dependentes do Path Adicionado ---
from ltx_video.models.autoencoders.vae_encode import un_normalize_latents, normalize_latents
from ltx_video.pipelines.pipeline_ltx_video import adain_filter_latent
from ltx_video.models.autoencoders.latent_upsampler import LatentUpsampler
from ltx_video.pipelines.pipeline_ltx_video import ConditioningItem, LTXVideoPipeline
from transformers import T5EncoderModel, T5Tokenizer, AutoModelForCausalLM, AutoProcessor, AutoTokenizer
from ltx_video.models.autoencoders.causal_video_autoencoder import CausalVideoAutoencoder
from ltx_video.models.transformers.symmetric_patchifier import SymmetricPatchifier
from ltx_video.models.transformers.transformer3d import Transformer3DModel
from ltx_video.schedulers.rf import RectifiedFlowScheduler
from ltx_video.utils.skip_layer_strategy import SkipLayerStrategy
import ltx_video.pipelines.crf_compressor as crf_compressor
def create_latent_upsampler(latent_upsampler_model_path: str, device: str):
latent_upsampler = LatentUpsampler.from_pretrained(latent_upsampler_model_path)
latent_upsampler.to(device)
latent_upsampler.eval()
return latent_upsampler
def create_ltx_video_pipeline(
ckpt_path: str,
precision: str,
text_encoder_model_name_or_path: str,
sampler: Optional[str] = None,
device: Optional[str] = None,
enhance_prompt: bool = False,
prompt_enhancer_image_caption_model_name_or_path: Optional[str] = None,
prompt_enhancer_llm_model_name_or_path: Optional[str] = None,
) -> LTXVideoPipeline:
ckpt_path = Path(ckpt_path)
assert os.path.exists(
ckpt_path
), f"Ckpt path provided (--ckpt_path) {ckpt_path} does not exist"
with safe_open(ckpt_path, framework="pt") as f:
metadata = f.metadata()
config_str = metadata.get("config")
configs = json.loads(config_str)
allowed_inference_steps = configs.get("allowed_inference_steps", None)
vae = CausalVideoAutoencoder.from_pretrained(ckpt_path)
transformer = Transformer3DModel.from_pretrained(ckpt_path)
# Use constructor if sampler is specified, otherwise use from_pretrained
if sampler == "from_checkpoint" or not sampler:
scheduler = RectifiedFlowScheduler.from_pretrained(ckpt_path)
else:
scheduler = RectifiedFlowScheduler(
sampler=("Uniform" if sampler.lower() == "uniform" else "LinearQuadratic")
)
text_encoder = T5EncoderModel.from_pretrained(
text_encoder_model_name_or_path, subfolder="text_encoder"
)
patchifier = SymmetricPatchifier(patch_size=1)
tokenizer = T5Tokenizer.from_pretrained(
text_encoder_model_name_or_path, subfolder="tokenizer"
)
transformer = transformer.to(device)
vae = vae.to(device)
text_encoder = text_encoder.to(device)
if enhance_prompt:
prompt_enhancer_image_caption_model = AutoModelForCausalLM.from_pretrained(
prompt_enhancer_image_caption_model_name_or_path, trust_remote_code=True
)
prompt_enhancer_image_caption_processor = AutoProcessor.from_pretrained(
prompt_enhancer_image_caption_model_name_or_path, trust_remote_code=True
)
prompt_enhancer_llm_model = AutoModelForCausalLM.from_pretrained(
prompt_enhancer_llm_model_name_or_path,
torch_dtype="bfloat16",
)
prompt_enhancer_llm_tokenizer = AutoTokenizer.from_pretrained(
prompt_enhancer_llm_model_name_or_path,
)
else:
prompt_enhancer_image_caption_model = None
prompt_enhancer_image_caption_processor = None
prompt_enhancer_llm_model = None
prompt_enhancer_llm_tokenizer = None
vae = vae.to(torch.bfloat16)
if precision == "bfloat16" and transformer.dtype != torch.bfloat16:
transformer = transformer.to(torch.bfloat16)
text_encoder = text_encoder.to(torch.bfloat16)
# Use submodels for the pipeline
submodel_dict = {
"transformer": transformer,
"patchifier": patchifier,
"text_encoder": text_encoder,
"tokenizer": tokenizer,
"scheduler": scheduler,
"vae": vae,
"prompt_enhancer_image_caption_model": prompt_enhancer_image_caption_model,
"prompt_enhancer_image_caption_processor": prompt_enhancer_image_caption_processor,
"prompt_enhancer_llm_model": prompt_enhancer_llm_model,
"prompt_enhancer_llm_tokenizer": prompt_enhancer_llm_tokenizer,
"allowed_inference_steps": allowed_inference_steps,
}
pipeline = LTXVideoPipeline(**submodel_dict)
LTXLatentConditioningPatch.apply()
pipeline = pipeline.to(device)
return pipeline
# ==============================================================================
# 2. FUNÇÕES AUXILIARES DE PROCESSAMENTO
# ==============================================================================
def calculate_padding(orig_h: int, orig_w: int, target_h: int, target_w: int) -> Tuple[int, int, int, int]:
"""Calcula o preenchimento para centralizar uma imagem em uma nova dimensão."""
pad_h = target_h - orig_h
pad_w = target_w - orig_w
pad_top = pad_h // 2
pad_bottom = pad_h - pad_top
pad_left = pad_w // 2
pad_right = pad_w - pad_left
return (pad_left, pad_right, pad_top, pad_bottom)
def log_tensor_info(tensor: torch.Tensor, name: str = "Tensor"):
"""Exibe informações detalhadas sobre um tensor para depuração."""
if not isinstance(tensor, torch.Tensor):
print(f"\n[INFO] '{name}' não é um tensor.")
return
print(f"\n--- Tensor Info: {name} ---")
print(f" - Shape: {tuple(tensor.shape)}")
print(f" - Dtype: {tensor.dtype}")
print(f" - Device: {tensor.device}")
if tensor.numel() > 0:
try:
print(f" - Stats: Min={tensor.min().item():.4f}, Max={tensor.max().item():.4f}, Mean={tensor.mean().item():.4f}")
except RuntimeError:
print(" - Stats: Não foi possível calcular (ex: tensores bool).")
print("-" * 30)
# ==============================================================================
# 3. CLASSE PRINCIPAL DO SERVIÇO DE VÍDEO
# ==============================================================================
class VideoService:
"""
Serviço encapsulado para gerar vídeos usando a pipeline LTX-Video.
Gerencia o carregamento de modelos, pré-processamento, geração em múltiplos
passos (baixa resolução, upscale com denoise) e pós-processamento.
"""
def __init__(self):
"""Inicializa o serviço, carregando configurações e modelos."""
t0 = time.perf_counter()
print("[INFO] Inicializando VideoService...")
self.device = "cuda" if torch.cuda.is_available() else "cpu"
self.config = self._load_config("ltxv-13b-0.9.8-distilled-fp8.yaml")
self.pipeline, self.latent_upsampler = self._load_models_from_hub()
self._move_models_to_device()
self.runtime_autocast_dtype = self._get_precision_dtype()
vae_manager_singleton.attach_pipeline(
self.pipeline,
device=self.device,
autocast_dtype=self.runtime_autocast_dtype
)
self._tmp_dirs = set()
RESULTS_DIR.mkdir(exist_ok=True)
print(f"[INFO] VideoService pronto. Tempo de inicialização: {time.perf_counter()-t0:.2f}s")
# --------------------------------------------------------------------------
# --- Métodos Públicos (API do Serviço) ---
# --------------------------------------------------------------------------
def _load_image_to_tensor_with_resize_and_crop(
self,
image_input: Union[str, Image.Image],
target_height: int = 512,
target_width: int = 768,
just_crop: bool = False,
) -> torch.Tensor:
"""Load and process an image into a tensor.
Args:
image_input: Either a file path (str) or a PIL Image object
target_height: Desired height of output tensor
target_width: Desired width of output tensor
just_crop: If True, only crop the image to the target size without resizing
"""
if isinstance(image_input, str):
image = Image.open(image_input).convert("RGB")
elif isinstance(image_input, Image.Image):
image = image_input
else:
raise ValueError("image_input must be either a file path or a PIL Image object")
input_width, input_height = image.size
aspect_ratio_target = target_width / target_height
aspect_ratio_frame = input_width / input_height
if aspect_ratio_frame > aspect_ratio_target:
new_width = int(input_height * aspect_ratio_target)
new_height = input_height
x_start = (input_width - new_width) // 2
y_start = 0
else:
new_width = input_width
new_height = int(input_width / aspect_ratio_target)
x_start = 0
y_start = (input_height - new_height) // 2
image = image.crop((x_start, y_start, x_start + new_width, y_start + new_height))
if not just_crop:
image = image.resize((target_width, target_height))
image = np.array(image)
image = cv2.GaussianBlur(image, (3, 3), 0)
frame_tensor = torch.from_numpy(image).float()
frame_tensor = crf_compressor.compress(frame_tensor / 255.0) * 255.0
frame_tensor = frame_tensor.permute(2, 0, 1)
frame_tensor = (frame_tensor / 127.5) - 1.0
# Create 5D tensor: (batch_size=1, channels=3, num_frames=1, height, width)
return frame_tensor.unsqueeze(0).unsqueeze(2)
# ADICIONE A FUNÇÃO ABAIXO
@torch.no_grad()
def _image_to_latents(self, image_input: Union[str, Image.Image], height: int, width: int) -> torch.Tensor:
"""
Converte uma imagem (caminho ou PIL) em um tensor de latentes 5D.
Retorna: Tensor na forma [1, C_lat, 1, H_lat, W_lat]
"""
print(f"[DEBUG] Codificando imagem para latente ({height}x{width})...")
# 1. Carrega a imagem e a transforma em um tensor de pixel 5D
pixel_tensor = self._load_image_to_tensor_with_resize_and_crop(
image_input, target_height=height, target_width=width
)
pixel_tensor_gpu = pixel_tensor.to(self.device, dtype=self.pipeline.vae.dtype)
# 2. Usa a VAE para codificar o tensor de pixel em um tensor de latentes
with torch.autocast(device_type=self.device.split(':')[0], dtype=self.runtime_autocast_dtype, enabled=(self.device == 'cuda')):
# O vae_encode da pipeline já lida com tensores 5D
latents = self.pipeline.vae.encode(pixel_tensor_gpu).latent_dist.sample()
# 3. Aplica o fator de escala (importante para consistência)
if hasattr(self.pipeline.vae.config, "scaling_factor"):
latents = latents * self.pipeline.vae.config.scaling_factor
print(f"[DEBUG] Imagem codificada para latente com shape: {latents.shape}")
return latents
def _prepare_condition_items(self, items_list: List[Tuple], height: int, width: int) -> List[PatchedConditioningItem]:
"""
Prepara os itens de condicionamento.
Recebe uma lista [Imagem, frame, peso], converte a Imagem para LATENTE
e cria uma lista de PatchedConditioningItem com o tensor em `latents`.
"""
if not items_list:
return []
conditioning_items = []
for media_input, frame_idx, weight in items_list:
# 1. USA A NOVA FUNÇÃO PARA OBTER O TENSOR DE LATENTES DIRETAMENTE
latent_tensor = self._image_to_latents(media_input, height, width)
safe_frame_idx = int(frame_idx)
# 2. CRIA O PatchedConditioningItem COM O CAMPO `latents` PREENCHIDO
item = PatchedConditioningItem(
media_frame_number=safe_frame_idx,
conditioning_strength=float(weight),
media_item=None, # Importante: media_item é None
latents=latent_tensor # O latente pré-calculado vai aqui!
)
conditioning_items.append(item)
print(f"[INFO] Preparados {len(conditioning_items)} itens de condicionamento com latentes pré-codificados.")
return conditioning_items
def generate_low_resolution(
self,
prompt: str,
negative_prompt: str,
height: int,
width: int,
duration_secs: float,
guidance_scale: float,
seed: Optional[int] = None,
conditioning_items: Optional[List[PatchedConditioningItem]] = None
) -> Tuple[str, str, int]:
"""
ETAPA 1: Gera um vídeo e latentes em resolução base a partir de um prompt e
condicionamentos opcionais.
"""
print("[INFO] Iniciando ETAPA 1: Geração de Baixa Resolução...")
# --- Configuração de Seed e Diretórios ---
used_seed = random.randint(0, 2**32 - 1) if seed is None else int(seed)
#seed_everything(used_seed)
print(f" - Usando Seed: {used_seed}")
temp_dir = tempfile.mkdtemp(prefix="ltxv_low_")
self._register_tmp_dir(temp_dir)
results_dir = "/app/output"
os.makedirs(results_dir, exist_ok=True)
# --- Cálculo de Dimensões e Frames ---
actual_num_frames = int(round(duration_secs * DEFAULT_FPS))
downscaled_height = height
downscaled_width = width
#self._calculate_downscaled_dims(height, width)
print(f" - Frames: {actual_num_frames}, Duração: {duration_secs}s")
print(f" - Dimensões de Saída: {downscaled_height}x{downscaled_width}")
# --- Execução da Pipeline ---
with torch.autocast(device_type=self.device.split(':')[0], dtype=self.runtime_autocast_dtype, enabled=(self.device == 'cuda')):
first_pass_kwargs = {
"prompt": prompt,
"negative_prompt": negative_prompt,
"height": downscaled_height,
"width": downscaled_width,
"num_frames": (actual_num_frames//8)+1,
"frame_rate": int(DEFAULT_FPS),
"generator": torch.Generator(device=self.device).manual_seed(used_seed),
"output_type": "latent",
"vae_per_channel_normalize": True,
"is_video": True,
"conditioning_items": conditioning_items,
"guidance_scale": float(guidance_scale),
**(self.config.get("first_pass", {}))
}
print(" - Enviando para a pipeline LTX...")
latents = self.pipeline(**first_pass_kwargs).images
print(f" - Latentes gerados com shape: {latents.shape}")
# Decodifica os latentes para pixels para criar o vídeo de preview
pixel_tensor = vae_manager_singleton.decode(latents, decode_timestep=float(self.config.get("decode_timestep", 0.05)))
tensor_path = self._save_latents_to_disk(latents, "latents_low_res", used_seed)
final_video_path = self._save_video_from_tensor(pixel_tensor, f"final_video_{seed}", seed, temp_dir, fps=DEFAULT_FPS)
return final_video_path
# --- Limpeza ---
self._finalize()
print("[SUCCESS] ETAPA 1 Concluída.")
return final_video_path, tensor_path, used_seed
# --------------------------------------------------------------------------
# --- Métodos Internos e Auxiliares ---
# --------------------------------------------------------------------------
def _finalize(self):
"""Limpa a memória da GPU e os diretórios temporários."""
if LTXV_DEBUG:
print("[DEBUG] Finalize: iniciando limpeza...")
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
torch.cuda.ipc_collect()
# Limpa todos os diretórios temporários registrados
for d in list(self._tmp_dirs):
shutil.rmtree(d, ignore_errors=True)
self._tmp_dirs.remove(d)
if LTXV_DEBUG:
print(f"[DEBUG] Diretório temporário removido: {d}")
def _save_latents_to_disk(self, latents_tensor: torch.Tensor, base_filename: str, seed: int) -> str:
"""Salva um tensor de latentes em um arquivo .pt."""
latents_cpu = latents_tensor.detach().to("cpu")
tensor_path = RESULTS_DIR / f"{base_filename}_{seed}.pt"
torch.save(latents_cpu, tensor_path)
if LTXV_DEBUG:
print(f"[DEBUG] Latentes salvos em: {tensor_path}")
return str(tensor_path)
def _save_video_from_tensor(self, pixel_tensor: torch.Tensor, base_filename: str, seed: int, temp_dir: str, fps: int = int(DEFAULT_FPS)) -> str:
"""Salva um tensor de pixels como um arquivo de vídeo MP4."""
temp_path = os.path.join(temp_dir, f"{base_filename}_{seed}.mp4")
video_encode_tool_singleton.save_video_from_tensor(pixel_tensor, temp_path, fps=DEFAULT_FPS)
final_path = RESULTS_DIR / f"{base_filename}_{seed}.mp4"
shutil.move(temp_path, final_path)
print(f"[INFO] Vídeo final salvo em: {final_path}")
return str(final_path)
def _load_config(self, config_filename: str) -> Dict:
"""Carrega o arquivo de configuração YAML."""
config_path = LTX_VIDEO_REPO_DIR / "configs" / config_filename
print(f"[INFO] Carregando configuração de: {config_path}")
with open(config_path, "r") as file:
return yaml.safe_load(file)
def _load_models_from_hub(self):
"""Baixa e cria as instâncias da pipeline e do upsampler."""
t0 = time.perf_counter()
LTX_REPO = "Lightricks/LTX-Video"
print("[INFO] Baixando checkpoint principal...")
self.config["checkpoint_path"] = hf_hub_download(
repo_id=LTX_REPO, filename=self.config["checkpoint_path"],
token=os.getenv("HF_TOKEN")
)
print(f"[INFO] Checkpoint principal em: {self.config['checkpoint_path']}")
print("[INFO] Construindo pipeline...")
pipeline = create_ltx_video_pipeline(
ckpt_path=self.config["checkpoint_path"],
precision=self.config["precision"],
text_encoder_model_name_or_path=self.config["text_encoder_model_name_or_path"],
sampler=self.config["sampler"],
device="cpu", # Carrega em CPU primeiro
enhance_prompt=False
)
print("[INFO] Pipeline construída.")
latent_upsampler = None
if self.config.get("spatial_upscaler_model_path"):
print("[INFO] Baixando upscaler espacial...")
self.config["spatial_upscaler_model_path"] = hf_hub_download(
repo_id=LTX_REPO, filename=self.config["spatial_upscaler_model_path"],
token=os.getenv("HF_TOKEN")
)
print(f"[INFO] Upscaler em: {self.config['spatial_upscaler_model_path']}")
print("[INFO] Construindo latent_upsampler...")
latent_upsampler = create_latent_upsampler(self.config["spatial_upscaler_model_path"], device="cpu")
print("[INFO] Latent upsampler construído.")
print(f"[INFO] Carregamento de modelos concluído em {time.perf_counter()-t0:.2f}s")
return pipeline, latent_upsampler
def _move_models_to_device(self):
"""Move os modelos carregados para o dispositivo de computação (GPU/CPU)."""
print(f"[INFO] Movendo modelos para o dispositivo: {self.device}")
self.pipeline.to(self.device)
if self.latent_upsampler:
self.latent_upsampler.to(self.device)
def _get_precision_dtype(self) -> torch.dtype:
"""Determina o dtype para autocast com base na configuração de precisão."""
prec = str(self.config.get("precision", "")).lower()
if prec in ["float8_e4m3fn", "bfloat16"]:
return torch.bfloat16
elif prec == "mixed_precision":
return torch.float16
return torch.float32
@torch.no_grad()
def _upsample_and_filter_latents(self, latents: torch.Tensor) -> torch.Tensor:
"""Aplica o upsample espacial e o filtro AdaIN aos latentes."""
if not self.latent_upsampler:
raise ValueError("Latent Upsampler não está carregado para a operação de upscale.")
latents_unnormalized = un_normalize_latents(latents, self.pipeline.vae, vae_per_channel_normalize=True)
upsampled_latents_unnormalized = self.latent_upsampler(latents_unnormalized)
upsampled_latents_normalized = normalize_latents(upsampled_latents_unnormalized, self.pipeline.vae, vae_per_channel_normalize=True)
# Filtro AdaIN para manter consistência de cor/estilo com o vídeo de baixa resolução
return adain_filter_latent(latents=upsampled_latents_normalized, reference_latents=latents)
def _prepare_conditioning_tensor_from_path(self, filepath: str, height: int, width: int, padding: Tuple) -> torch.Tensor:
"""Carrega uma imagem, redimensiona, aplica padding e move para o dispositivo."""
tensor = self._load_image_to_tensor_with_resize_and_crop(filepath, height, width)
tensor = F.pad(tensor, padding)
return tensor.to(self.device, dtype=self.runtime_autocast_dtype)
def _calculate_downscaled_dims(self, height: int, width: int) -> Tuple[int, int]:
"""Calcula as dimensões para o primeiro passo (baixa resolução)."""
height_padded = ((height - 1) // 8 + 1) * 8
width_padded = ((width - 1) // 8 + 1) * 8
downscale_factor = self.config.get("downscale_factor", 0.6666666)
vae_scale_factor = self.pipeline.vae_scale_factor
target_w = int(width_padded * downscale_factor)
downscaled_width = target_w - (target_w % vae_scale_factor)
target_h = int(height_padded * downscale_factor)
downscaled_height = target_h - (target_h % vae_scale_factor)
return downscaled_height, downscaled_width
def _seed_everething(self, seed: int):
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
if torch.cuda.is_available():
torch.cuda.manual_seed(seed)
if torch.backends.mps.is_available():
torch.mps.manual_seed(seed)
def _register_tmp_dir(self, dir_path: str):
"""Registra um diretório temporário para limpeza posterior."""
if dir_path and os.path.isdir(dir_path):
self._tmp_dirs.add(dir_path)
if LTXV_DEBUG:
print(f"[DEBUG] Diretório temporário registrado: {dir_path}")
# ==============================================================================
# 4. INSTANCIAÇÃO E PONTO DE ENTRADA (Exemplo)
# ==============================================================================
print("Criando instância do VideoService. O carregamento do modelo começará agora...")
video_generation_service = VideoService()
print("Instância do VideoService pronta para uso.")