import gradio as gr
import torch
import numpy as np
import tempfile
import os
from torchvision import transforms
from diffusers import LTXLatentUpsamplePipeline, AutoModel, FlowMatchEulerDiscreteScheduler
#from pipeline_ltx_condition_control import LTXConditionPipeline, LTXVideoCondition
from diffusers.pipelines.ltx.pipeline_ltx_condition import LTXConditionPipeline, LTXVideoCondition
from diffusers.utils import export_to_video, load_video
from transformers import T5EncoderModel, AutoTokenizer
import random
import imageio
from PIL import Image, ImageOps
import cv2
import shutil
import glob
from pathlib import Path
import warnings
import logging
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", message=".*")
from huggingface_hub import logging as hf_hub_logging, hf_hub_download
hf_hub_logging.set_verbosity_debug()
logger = logging.getLogger("AducDebug")
logging.basicConfig(level=logging.DEBUG)
logger.setLevel(logging.DEBUG)
FPS = 24
dtype = torch.bfloat16
device = "cuda" if torch.cuda.is_available() else "cpu"
# Pipeline loading
#pipeline = LTXConditionPipeline.from_pretrained(
# "Lightricks/LTX-Video-0.9.8-13B-distilled",
# offload_state_dict=False,
# torch_dtype=torch.bfloat16,
# cache_dir=os.getenv("HF_HOME_CACHE"),
# token=os.getenv("HF_TOKEN"),
#)
base_repo = "Lightricks/LTX-Video"
checkpoint_path = "ltxv-13b-0.9.8-distilled.safetensors"
upscaler_repo = "Lightricks/ltxv-spatial-upscaler-0.9.7"
CACHE_DIR = os.getenv("HF_HOME_CACHE")
# 2. Download the distilled checkpoint file from the base repository (its presence is only
# verified here; the pipeline below is assembled from the repository's component subfolders).
print(f"=== Downloading checkpoint from base repository: {base_repo} ===")
ckpt_path_str = hf_hub_download(repo_id=base_repo, filename=checkpoint_path, cache_dir=CACHE_DIR)
ckpt_path = Path(ckpt_path_str)
if not ckpt_path.is_file():
    raise FileNotFoundError(f"Main checkpoint file not found: {ckpt_path}")
# 3. Load each pipeline component explicitly
print("=== Loading pipeline components... ===")
vae = AutoModel.from_pretrained(
    base_repo,
    subfolder="vae",
    torch_dtype=dtype,
    cache_dir=CACHE_DIR
)
text_encoder = T5EncoderModel.from_pretrained(
    base_repo,
    subfolder="text_encoder",
    torch_dtype=dtype,
    cache_dir=CACHE_DIR
)
# The scheduler and tokenizer are not diffusers models, so AutoModel cannot load them;
# use their dedicated classes (LTX-Video ships a FlowMatchEulerDiscreteScheduler config
# and a T5 tokenizer/text encoder).
scheduler = FlowMatchEulerDiscreteScheduler.from_pretrained(
    base_repo,
    subfolder="scheduler",
    cache_dir=CACHE_DIR
)
tokenizer = AutoTokenizer.from_pretrained(
    base_repo,
    subfolder="tokenizer",
    cache_dir=CACHE_DIR
)
if getattr(scheduler.config, "use_dynamic_shifting", False):
    print("[Config] Disabling 'use_dynamic_shifting' on the scheduler.")
    # register_to_config updates the scheduler's (frozen) config dict in place
    scheduler.register_to_config(use_dynamic_shifting=False)
transformer = AutoModel.from_pretrained(
    base_repo,
    subfolder="transformer",
    torch_dtype=dtype,
    cache_dir=CACHE_DIR
)
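# Store the transformer weights in FP8 (e4m3) and upcast to bfloat16 layer by layer at compute
# time; this trades a small casting overhead for a large reduction in VRAM usage.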
transformer.enable_layerwise_casting(
storage_dtype=torch.float8_e4m3fn, compute_dtype=dtype,
)
# 4. Assemble the main pipeline
print("Assembling the LTXConditionPipeline...")
pipeline = LTXConditionPipeline(
vae=vae, text_encoder=text_encoder, tokenizer=tokenizer,
scheduler=scheduler, transformer=transformer,
)
pipeline.to(device)
pipeline.vae.enable_tiling()
pipe_upsample = LTXLatentUpsamplePipeline.from_pretrained(
    upscaler_repo,
    cache_dir=CACHE_DIR,
    vae=pipeline.vae,
    torch_dtype=dtype
)
pipe_upsample.to(device)
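# Generation strategy (see generate_video below): (1) run the base pipeline at roughly 2/3 of the
# requested resolution, (2) upscale the resulting latents 2x with the spatial upsampler, then
# (3) run a short high-resolution denoise pass and decode. VAE tiling keeps decode memory bounded.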
current_dir = Path(__file__).parent
def cleanup_session_files(request: gr.Request):
    """Removes the session's temporary files when the user disconnects."""
    try:
        session_id = request.session_hash
        session_dir = os.path.join("/tmp/gradio", session_id)
        if os.path.exists(session_dir):
            shutil.rmtree(session_dir)
            print(f"Cleaned up session directory: {session_dir}")
    except Exception as e:
        print(f"Error during session cleanup: {e}")
def read_video(video) -> torch.Tensor:
    """Reads a video file (or an iterable of frames) and converts it to a torch tensor."""
    to_tensor_transform = transforms.ToTensor()
    if isinstance(video, str):
        video_tensor = torch.stack([to_tensor_transform(img) for img in imageio.get_reader(video)])
    else:
        video_tensor = torch.stack([to_tensor_transform(img) for img in video])
    return video_tensor
def round_to_nearest_resolution_acceptable_by_vae(height, width, vae_spatial_compression_ratio):
    """Rounds the resolution down to the nearest multiples accepted by the VAE (its spatial compression factor)."""
    height = height - (height % vae_spatial_compression_ratio)
    width = width - (width % vae_spatial_compression_ratio)
    return height, width
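# e.g. with a compression factor of 32, a 533x800 request rounds down to 512x800.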
# The function signature takes individual arguments again, for compatibility with Gradio
def generate_video(
condition_image_1,
condition_strength_1,
condition_frame_index_1,
condition_image_2,
condition_strength_2,
condition_frame_index_2,
prompt,
duration=3.0,
negative_prompt="worst quality, inconsistent motion, blurry, jittery, distorted",
height=768,
width=1152,
num_inference_steps=7,
guidance_scale=1.0,
seed=0,
randomize_seed=False,
progress=gr.Progress(track_tqdm=True)
):
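    """
    Generates a video in three stages: a low-resolution base pass, a 2x latent upscale,
    and a final high-resolution denoise, optionally conditioned on up to two images.
    Returns the path to the exported MP4 file and the seed that was used.
    """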
if True:
        # Logic to assemble the conditions *inside* the function
        # Frame count and resolution calculation
num_frames = int(duration * FPS) + 1
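        # Snap the frame count so that (num_frames - 1) is a multiple of the VAE temporal compression ratio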
temporal_compression = pipeline.vae_temporal_compression_ratio
num_frames = ((num_frames - 1) // temporal_compression) * temporal_compression + 1
downscale_factor = 2 / 3
downscaled_height = int(height * downscale_factor)
downscaled_width = int(width * downscale_factor)
        downscaled_height, downscaled_width = round_to_nearest_resolution_acceptable_by_vae(
            downscaled_height, downscaled_width, pipeline.vae_spatial_compression_ratio
        )
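        # Build the conditioning list from the provided images, resized to the first-pass working resolution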
conditions = []
if condition_image_1 is not None:
condition_image_1 = ImageOps.fit(condition_image_1, (downscaled_width, downscaled_height), Image.LANCZOS)
conditions.append(LTXVideoCondition(
image=condition_image_1,
strength=condition_strength_1,
frame_index=int(condition_frame_index_1)
))
if condition_image_2 is not None:
condition_image_2 = ImageOps.fit(condition_image_2, (downscaled_width, downscaled_height), Image.LANCZOS)
conditions.append(LTXVideoCondition(
image=condition_image_2,
strength=condition_strength_2,
frame_index=int(condition_frame_index_2)
))
pipeline_args = {}
if conditions:
pipeline_args["conditions"] = conditions
        # Experimental alternative: condition on pre-encoded VAE latents instead of PIL images.
        # NOTE: the stock diffusers LTXVideoCondition does not accept a `latents` field (it takes
        # `image`/`video` plus `frame_index`/`strength`), so this path needs the custom pipeline
        # variant commented out in the imports above (pipeline_ltx_condition_control). It is
        # therefore disabled by default.
        use_latent_conditions = False
        if use_latent_conditions:
            conditions = []

            def image_to_latents(img: Image.Image):
                # PIL -> tensor [1, C, H, W] -> video tensor [1, C, 1, H, W]
                tensor = transforms.ToTensor()(img).unsqueeze(0)
                tensor = tensor.unsqueeze(2).to(device).to(dtype)
                with torch.no_grad():
                    vae_out = pipeline.vae.encode(tensor)       # the video VAE expects 5-D input
                    latents = vae_out.latent_dist.sample()      # 5-D sample [1, C_lat, 1, H', W']
                # apply the scaling factor if the VAE defines one
                if hasattr(pipeline.vae.config, "scaling_factor"):
                    latents = latents * pipeline.vae.config.scaling_factor
                return latents

            # first condition
            if condition_image_1 is not None:
                img1 = ImageOps.fit(condition_image_1, (downscaled_width, downscaled_height), Image.LANCZOS)
                lat1 = image_to_latents(img1)  # already 5-D with a single conditioning frame
                conditions.append(
                    LTXVideoCondition(
                        latents=lat1,
                        strength=condition_strength_1,
                        frame_index=int(condition_frame_index_1),
                    )
                )
                print(f"condition_image_1 {lat1.shape}")
            # same logic for the second condition
            if condition_image_2 is not None:
                img2 = ImageOps.fit(condition_image_2, (downscaled_width, downscaled_height), Image.LANCZOS)
                lat2 = image_to_latents(img2)
                conditions.append(
                    LTXVideoCondition(
                        latents=lat2,
                        strength=condition_strength_2,
                        frame_index=int(condition_frame_index_2),
                    )
                )
                print(f"condition_image_2 {lat2.shape}")
            # make the latent conditions take effect in the pipeline call
            if conditions:
                pipeline_args["conditions"] = conditions
        # Seed handling
if randomize_seed:
seed = random.randint(0, 2**32 - 1)
        # STAGE 1: Low-resolution base generation
latents = pipeline(
prompt=prompt,
negative_prompt=negative_prompt,
width=downscaled_width,
height=downscaled_height,
num_frames=num_frames,
timesteps=[1000, 993, 987, 981, 975, 909, 725, 0.03],
decode_timestep=0.05,
decode_noise_scale=0.025,
image_cond_noise_scale=0.0,
guidance_scale=guidance_scale,
guidance_rescale=0.7,
generator=torch.Generator().manual_seed(seed),
output_type="latent",
**pipeline_args
).frames
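        # The base pass returns latents (output_type="latent"), which feed directly into the upsampler.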
        # STAGE 2: 2x latent upscaling
upscaled_height, upscaled_width = downscaled_height * 2, downscaled_width * 2
upscaled_latents = pipe_upsample(
latents=latents,
output_type="latent"
).frames
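        # Rebuild the image conditions at the upscaled resolution so the final denoise pass
        # receives conditioning frames that match its working size.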
conditions = []
if condition_image_1 is not None:
condition_image_1 = ImageOps.fit(condition_image_1, (upscaled_width, upscaled_height), Image.LANCZOS)
conditions.append(LTXVideoCondition(
image=condition_image_1,
strength=condition_strength_1,
frame_index=int(condition_frame_index_1)
))
if condition_image_2 is not None:
condition_image_2 = ImageOps.fit(condition_image_2, (upscaled_width, upscaled_height), Image.LANCZOS)
conditions.append(LTXVideoCondition(
image=condition_image_2,
strength=condition_strength_2,
frame_index=int(condition_frame_index_2)
))
pipeline_args = {}
if conditions:
pipeline_args["conditions"] = conditions
        # STAGE 3: Final high-resolution denoise
final_video_frames_np = pipeline(
prompt=prompt,
negative_prompt=negative_prompt,
width=upscaled_width,
height=upscaled_height,
num_frames=num_frames,
denoise_strength=0.999,
timesteps=[1000, 909, 725, 421, 0],
latents=upscaled_latents,
decode_timestep=0.05,
decode_noise_scale=0.025,
image_cond_noise_scale=0.0,
guidance_scale=guidance_scale,
guidance_rescale=0.7,
            generator=torch.Generator(device=device).manual_seed(seed),
output_type="np",
**pipeline_args
).frames[0]
        # Export to MP4 (frames are float arrays in [0, 1], so scale to uint8 first)
video_uint8_frames = [(frame * 255).astype(np.uint8) for frame in final_video_frames_np]
output_filename = "output.mp4"
with imageio.get_writer(output_filename, fps=FPS, quality=8, macro_block_size=1) as writer:
for frame_idx, frame_data in enumerate(video_uint8_frames):
                progress((frame_idx + 1) / len(video_uint8_frames), desc="Encoding video frames...")
writer.append_data(frame_data)
return output_filename, seed
# Gradio user interface
with gr.Blocks(theme=gr.themes.Ocean(font=[gr.themes.GoogleFont("Lexend Deca"), "sans-serif"]), delete_cache=(60, 900)) as demo:
gr.Markdown(
"""
        # Video Generation with LTX
        **Create videos from text and condition images using the LTX-Video model.**
"""
)
with gr.Row():
with gr.Column(scale=1):
prompt = gr.Textbox(
label="Prompt",
                placeholder="Describe the video you want to generate...",
                lines=3,
                value="The Joker in his iconic purple suit and green hair, dancing alone in a dark, decaying room. His movements are erratic and unpredictable, shifting between graceful and chaotic as he loses himself in the moment. The camera captures his theatrical gestures, his dance reflecting his unbalanced personality. Moody lighting with shadows dancing across the walls, creating an atmosphere of beautiful madness."
)
            with gr.Accordion("Condition Image 1", open=True):
                condition_image_1 = gr.Image(label="Condition Image 1", type="pil")
                with gr.Row():
                    condition_strength_1 = gr.Slider(label="Strength", minimum=0.0, maximum=1.0, step=0.05, value=1.0)
                    condition_frame_index_1 = gr.Number(label="Frame", value=0, precision=0)
            with gr.Accordion("Condition Image 2", open=False):
                condition_image_2 = gr.Image(label="Condition Image 2", type="pil")
                with gr.Row():
                    condition_strength_2 = gr.Slider(label="Strength", minimum=0.0, maximum=1.0, step=0.05, value=1.0)
                    condition_frame_index_2 = gr.Number(label="Frame", value=0, precision=0)
            duration = gr.Slider(label="Duration (seconds)", minimum=1.0, maximum=10.0, step=0.5, value=2)
            with gr.Accordion("Advanced Settings", open=False):
                negative_prompt = gr.Textbox(label="Negative Prompt", placeholder="What you do not want in the video...", lines=2, value="worst quality, inconsistent motion, blurry, jittery, distorted")
                with gr.Row():
                    height = gr.Slider(label="Height", minimum=256, maximum=1536, step=32, value=768)
                    width = gr.Slider(label="Width", minimum=256, maximum=1536, step=32, value=1152)
                    num_inference_steps = gr.Slider(label="Inference Steps", minimum=5, maximum=10, step=1, value=7, visible=False)
                with gr.Row():
                    guidance_scale = gr.Slider(label="Guidance Scale", minimum=1.0, maximum=5.0, step=0.1, value=1.0)
                with gr.Row():
                    randomize_seed = gr.Checkbox(label="Randomize Seed", value=True)
                    seed = gr.Number(label="Seed", value=0, precision=0)
            generate_btn = gr.Button("Generate Video", variant="primary", size="lg")
with gr.Column(scale=1):
            output_video = gr.Video(label="Generated Video", height=400)
    # FIX: the inputs list is now "flat", containing only Gradio components
generate_btn.click(
fn=generate_video,
inputs=[
condition_image_1,
condition_strength_1,
condition_frame_index_1,
condition_image_2,
condition_strength_2,
condition_frame_index_2,
prompt,
duration,
negative_prompt,
height,
width,
num_inference_steps,
guidance_scale,
seed,
randomize_seed,
],
outputs=[output_video, seed],
show_progress=True
)
demo.unload(cleanup_session_files)
if __name__ == "__main__":
demo.queue().launch(server_name="0.0.0.0", server_port=7860, debug=True, show_error=True)