import gradio as gr
import torch
import numpy as np
import tempfile
import os
from torchvision import transforms
from diffusers import LTXLatentUpsamplePipeline, AutoModel, FlowMatchEulerDiscreteScheduler
# from pipeline_ltx_condition_control import LTXConditionPipeline, LTXVideoCondition
from diffusers.pipelines.ltx.pipeline_ltx_condition import LTXConditionPipeline, LTXVideoCondition
from diffusers.utils import export_to_video, load_video
from transformers import T5EncoderModel, AutoTokenizer
import random
import imageio
from PIL import Image, ImageOps
import cv2
import shutil
import glob
from pathlib import Path
import warnings
import logging

warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", message=".*")

from huggingface_hub import logging as ll, hf_hub_download

# Only the last verbosity call takes effect, so a single call is enough.
ll.set_verbosity_debug()

logger = logging.getLogger("AducDebug")
logging.basicConfig(level=logging.DEBUG)
logger.setLevel(logging.DEBUG)
FPS = 24
dtype = torch.bfloat16
device = "cuda" if torch.cuda.is_available() else "cpu"

# Pipeline loading
# pipeline = LTXConditionPipeline.from_pretrained(
#     "Lightricks/LTX-Video-0.9.8-13B-distilled",
#     offload_state_dict=False,
#     torch_dtype=torch.bfloat16,
#     cache_dir=os.getenv("HF_HOME_CACHE"),
#     token=os.getenv("HF_TOKEN"),
# )
base_repo = "Lightricks/LTX-Video"
checkpoint_path = "ltxv-13b-0.9.8-distilled.safetensors"
upscaler_repo = "Lightricks/ltxv-spatial-upscaler-0.9.7"
CACHE_DIR = os.getenv("HF_HOME_CACHE")

# 2. Download the base model files
print(f"=== Downloading base repository snapshot: {base_repo} ===")
ckpt_path_str = hf_hub_download(repo_id=base_repo, filename=checkpoint_path, cache_dir=CACHE_DIR)
ckpt_path = Path(ckpt_path_str)
if not ckpt_path.is_file():
    raise FileNotFoundError(f"Main checkpoint file not found: {ckpt_path}")
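# NOTE: the single-file distilled checkpoint above is only downloaded and verified;
# the components below are loaded from the diffusers-format subfolders of base_repo.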
# 3. Load each pipeline component explicitly
print("=== Loading pipeline components... ===")
vae = AutoModel.from_pretrained(
    base_repo,
    subfolder="vae",
    cache_dir=CACHE_DIR,
)
# The text encoder and tokenizer are transformers objects and the scheduler is a
# diffusers scheduler, so they are loaded with their own classes rather than AutoModel.
text_encoder = T5EncoderModel.from_pretrained(
    base_repo,
    subfolder="text_encoder",
    cache_dir=CACHE_DIR,
)
scheduler = FlowMatchEulerDiscreteScheduler.from_pretrained(
    base_repo,
    subfolder="scheduler",
    cache_dir=CACHE_DIR,
)
tokenizer = AutoTokenizer.from_pretrained(
    base_repo,
    subfolder="tokenizer",
    cache_dir=CACHE_DIR,
)
if hasattr(scheduler.config, "use_dynamic_shifting") and scheduler.config.use_dynamic_shifting:
    print("[Config] Disabling 'use_dynamic_shifting' on the scheduler.")
    scheduler.config.use_dynamic_shifting = False

transformer = AutoModel.from_pretrained(
    base_repo,
    subfolder="transformer",
    cache_dir=CACHE_DIR,
)
# Store transformer weights in fp8 and compute in bfloat16 to save memory.
transformer.enable_layerwise_casting(
    storage_dtype=torch.float8_e4m3fn, compute_dtype=dtype,
)

# 4. Assemble the main pipeline
print("Assembling the LTXConditionPipeline...")
pipeline = LTXConditionPipeline(
    vae=vae, text_encoder=text_encoder, tokenizer=tokenizer,
    scheduler=scheduler, transformer=transformer,
)
pipe_upsample = LTXLatentUpsamplePipeline.from_pretrained(
    upscaler_repo,
    cache_dir=CACHE_DIR,
    vae=pipeline.vae,
    torch_dtype=dtype,
)
pipeline.to(device)
pipe_upsample.to(device)
pipeline.vae.enable_tiling()
current_dir = Path(__file__).parent


def cleanup_session_files(request: gr.Request):
    """Cleans up the session's temporary files when the user disconnects."""
    try:
        session_id = request.session_hash
        session_dir = os.path.join("/tmp/gradio", session_id)
        if os.path.exists(session_dir):
            shutil.rmtree(session_dir)
            print(f"Cleaned up session directory: {session_dir}")
    except Exception as e:
        print(f"Error during session cleanup: {e}")


def read_video(video) -> torch.Tensor:
    """Reads a video file (or a sequence of frames) and converts it to a torch tensor."""
    to_tensor_transform = transforms.ToTensor()
    if isinstance(video, str):
        video_tensor = torch.stack([to_tensor_transform(img) for img in imageio.get_reader(video)])
    else:
        video_tensor = torch.stack([to_tensor_transform(img) for img in video])
    return video_tensor
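# Example (illustrative, hypothetical path): read_video("clip.mp4") returns a float
# tensor of shape [num_frames, 3, H, W] with values in [0, 1], one entry per frame.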
def round_to_nearest_resolution_acceptable_by_vae(height, width, vae_temporal_compression_ratio):
    """Rounds the resolution down to values acceptable by the VAE."""
    height = height - (height % vae_temporal_compression_ratio)
    width = width - (width % vae_temporal_compression_ratio)
    return height, width
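# Example: with a compression ratio of 8, (height=512, width=500) -> (512, 496),
# i.e. each dimension is floored to the nearest multiple of the ratio.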
# The function signature keeps individual arguments for compatibility with Gradio
def generate_video(
    condition_image_1,
    condition_strength_1,
    condition_frame_index_1,
    condition_image_2,
    condition_strength_2,
    condition_frame_index_2,
    prompt,
    duration=3.0,
    negative_prompt="worst quality, inconsistent motion, blurry, jittery, distorted",
    height=768,
    width=1152,
    num_inference_steps=7,
    guidance_scale=1.0,
    seed=0,
    randomize_seed=False,
    progress=gr.Progress(track_tqdm=True),
):
    # Logic to group the conditions *inside* the function
    # Frame count and resolution calculation
    num_frames = int(duration * FPS) + 1
    temporal_compression = pipeline.vae_temporal_compression_ratio
    num_frames = ((num_frames - 1) // temporal_compression) * temporal_compression + 1
    downscale_factor = 2 / 3
    downscaled_height = int(height * downscale_factor)
    downscaled_width = int(width * downscale_factor)
    downscaled_height, downscaled_width = round_to_nearest_resolution_acceptable_by_vae(
        downscaled_height, downscaled_width, pipeline.vae_temporal_compression_ratio
    )

    conditions = []
    if condition_image_1 is not None:
        condition_image_1 = ImageOps.fit(condition_image_1, (downscaled_width, downscaled_height), Image.LANCZOS)
        conditions.append(LTXVideoCondition(
            image=condition_image_1,
            strength=condition_strength_1,
            frame_index=int(condition_frame_index_1),
        ))
    if condition_image_2 is not None:
        condition_image_2 = ImageOps.fit(condition_image_2, (downscaled_width, downscaled_height), Image.LANCZOS)
        conditions.append(LTXVideoCondition(
            image=condition_image_2,
            strength=condition_strength_2,
            frame_index=int(condition_frame_index_2),
        ))
    pipeline_args = {}
    if conditions:
        pipeline_args["conditions"] = conditions
    # Experimental path: condition on VAE latents instead of PIL images
    # (originally pasted in after computing downscaled_height, downscaled_width).
    # NOTE: this branch is disabled by default. It assumes an LTXVideoCondition that
    # accepts a `latents=` argument (e.g. the custom pipeline_ltx_condition_control
    # variant commented out in the imports); the stock diffusers class imported above
    # takes image/video inputs. Also, `pipeline_args` was already populated above, so
    # rebinding `conditions` here would not change what is passed to the pipeline.
    use_latent_conditions = False
    if use_latent_conditions:
        conditions = []

        def image_to_latents(img: Image.Image):
            # PIL -> tensor [1, C, H, W], then add a frame axis -> [1, C, 1, H, W]
            tensor = transforms.ToTensor()(img).unsqueeze(0)
            tensor = tensor.unsqueeze(2).to(device).to(dtype)
            with torch.no_grad():
                vae_out = pipeline.vae.encode(tensor)   # accepts the 5-D input
                latents = vae_out.latent_dist.sample()  # 5-D sample [1, C_lat, 1, H', W']
            # Apply the scaling factor if the VAE config defines one
            if hasattr(pipeline.vae.config, "scaling_factor"):
                latents = latents * pipeline.vae.config.scaling_factor
            return latents

        # Example for the first condition
        if condition_image_1 is not None:
            img1 = ImageOps.fit(condition_image_1, (downscaled_width, downscaled_height), Image.LANCZOS)
            # image_to_latents already returns 5-D latents [1, C_lat, 1, H', W'],
            # i.e. one conditioning frame; to use more, repeat along dim 2 or adjust.
            lat1 = image_to_latents(img1)
            conditions.append(
                LTXVideoCondition(
                    latents=lat1,
                    strength=condition_strength_1,
                    frame_index=int(condition_frame_index_1),
                )
            )
            print(f"condition_image_1 {lat1.shape}")

        # Same logic for condition 2
        if condition_image_2 is not None:
            img2 = ImageOps.fit(condition_image_2, (downscaled_width, downscaled_height), Image.LANCZOS)
            lat2 = image_to_latents(img2)
            conditions.append(
                LTXVideoCondition(
                    latents=lat2,
                    strength=condition_strength_2,
                    frame_index=int(condition_frame_index_2),
                )
            )
            print(f"condition_image_2 {lat2.shape}")
    # Seed handling
    if randomize_seed:
        seed = random.randint(0, 2**32 - 1)

    # STAGE 1: Generate the video at low resolution
    latents = pipeline(
        prompt=prompt,
        negative_prompt=negative_prompt,
        width=downscaled_width,
        height=downscaled_height,
        num_frames=num_frames,
        timesteps=[1000, 993, 987, 981, 975, 909, 725, 0.03],
        decode_timestep=0.05,
        decode_noise_scale=0.025,
        image_cond_noise_scale=0.0,
        guidance_scale=guidance_scale,
        guidance_rescale=0.7,
        generator=torch.Generator().manual_seed(seed),
        output_type="latent",
        **pipeline_args
    ).frames

    # STAGE 2: Upscale the latents
    upscaled_height, upscaled_width = downscaled_height * 2, downscaled_width * 2
    upscaled_latents = pipe_upsample(
        latents=latents,
        output_type="latent"
    ).frames
    # Rebuild the conditions at the upscaled resolution (note: condition_image_1/2
    # were already resized to the downscaled resolution above, so they are resized
    # back up here)
    conditions = []
    if condition_image_1 is not None:
        condition_image_1 = ImageOps.fit(condition_image_1, (upscaled_width, upscaled_height), Image.LANCZOS)
        conditions.append(LTXVideoCondition(
            image=condition_image_1,
            strength=condition_strength_1,
            frame_index=int(condition_frame_index_1),
        ))
    if condition_image_2 is not None:
        condition_image_2 = ImageOps.fit(condition_image_2, (upscaled_width, upscaled_height), Image.LANCZOS)
        conditions.append(LTXVideoCondition(
            image=condition_image_2,
            strength=condition_strength_2,
            frame_index=int(condition_frame_index_2),
        ))
    pipeline_args = {}
    if conditions:
        pipeline_args["conditions"] = conditions
    # STAGE 3: Final high-resolution denoise
    final_video_frames_np = pipeline(
        prompt=prompt,
        negative_prompt=negative_prompt,
        width=upscaled_width,
        height=upscaled_height,
        num_frames=num_frames,
        denoise_strength=0.999,
        timesteps=[1000, 909, 725, 421, 0],
        latents=upscaled_latents,
        decode_timestep=0.05,
        decode_noise_scale=0.025,
        image_cond_noise_scale=0.0,
        guidance_scale=guidance_scale,
        guidance_rescale=0.7,
        generator=torch.Generator(device=device).manual_seed(seed),
        output_type="np",
        **pipeline_args
    ).frames[0]

    # Export to an MP4 file
    video_uint8_frames = [(frame * 255).astype(np.uint8) for frame in final_video_frames_np]
    output_filename = "output.mp4"
    with imageio.get_writer(output_filename, fps=FPS, quality=8, macro_block_size=1) as writer:
        for frame_idx, frame_data in enumerate(video_uint8_frames):
            progress((frame_idx + 1) / len(video_uint8_frames), desc="Encoding video frames...")
            writer.append_data(frame_data)

    return output_filename, seed
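
# Illustrative sketch (not part of the Space's execution path): calling
# generate_video() directly, outside the Gradio UI. The solid-color PIL images and
# the prompt below are placeholder values, not recommended settings.
#
#   first_frame = Image.new("RGB", (1152, 768), "black")
#   last_frame = Image.new("RGB", (1152, 768), "white")
#   video_path, used_seed = generate_video(
#       first_frame, 1.0, 0,     # condition 1: image, strength, frame index
#       last_frame, 1.0, 48,     # condition 2: image, strength, frame index
#       prompt="a slow crossfade from black to white",
#       duration=2.0,
#       randomize_seed=True,
#   )
#   print(video_path, used_seed)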
# Gradio user interface
with gr.Blocks(theme=gr.themes.Ocean(font=[gr.themes.GoogleFont("Lexend Deca"), "sans-serif"]), delete_cache=(60, 900)) as demo:
    gr.Markdown(
        """
        # Video Generation with LTX
        **Create videos from text and condition images using the LTX-Video model.**
        """
    )
    with gr.Row():
        with gr.Column(scale=1):
            prompt = gr.Textbox(
                label="Prompt",
                placeholder="Describe the video you want to generate...",
                lines=3,
                value="The Joker in his iconic purple suit and green hair, dancing alone in a dark, decaying room. His movements are erratic and unpredictable, shifting between graceful and chaotic as he loses himself in the moment. The camera captures his theatrical gestures, his dance reflecting his unhinged personality. Moody lighting with shadows dancing across the walls, creating an atmosphere of beautiful madness."
            )
            with gr.Accordion("Condition Image 1", open=True):
                condition_image_1 = gr.Image(label="Condition Image 1", type="pil")
                with gr.Row():
                    condition_strength_1 = gr.Slider(label="Strength", minimum=0.0, maximum=1.0, step=0.05, value=1.0)
                    condition_frame_index_1 = gr.Number(label="Frame", value=0, precision=0)
            with gr.Accordion("Condition Image 2", open=False):
                condition_image_2 = gr.Image(label="Condition Image 2", type="pil")
                with gr.Row():
                    condition_strength_2 = gr.Slider(label="Strength", minimum=0.0, maximum=1.0, step=0.05, value=1.0)
                    condition_frame_index_2 = gr.Number(label="Frame", value=0, precision=0)
            duration = gr.Slider(label="Duration (seconds)", minimum=1.0, maximum=10.0, step=0.5, value=2)
            with gr.Accordion("Advanced Settings", open=False):
                negative_prompt = gr.Textbox(label="Negative Prompt", placeholder="What you do not want in the video...", lines=2, value="worst quality, inconsistent motion, blurry, jittery, distorted")
                with gr.Row():
                    height = gr.Slider(label="Height", minimum=256, maximum=1536, step=32, value=768)
                    width = gr.Slider(label="Width", minimum=256, maximum=1536, step=32, value=1152)
                    num_inference_steps = gr.Slider(label="Inference Steps", minimum=5, maximum=10, step=1, value=7, visible=False)
                with gr.Row():
                    guidance_scale = gr.Slider(label="Guidance Scale", minimum=1.0, maximum=5.0, step=0.1, value=1.0)
                with gr.Row():
                    randomize_seed = gr.Checkbox(label="Random Seed", value=True)
                    seed = gr.Number(label="Seed", value=0, precision=0)
            generate_btn = gr.Button("Generate Video", variant="primary", size="lg")
        with gr.Column(scale=1):
            output_video = gr.Video(label="Generated Video", height=400)
    # FIX: the inputs list is now "flat", containing only Gradio components
    generate_btn.click(
        fn=generate_video,
        inputs=[
            condition_image_1,
            condition_strength_1,
            condition_frame_index_1,
            condition_image_2,
            condition_strength_2,
            condition_frame_index_2,
            prompt,
            duration,
            negative_prompt,
            height,
            width,
            num_inference_steps,
            guidance_scale,
            seed,
            randomize_seed,
        ],
        outputs=[output_video, seed],
        show_progress=True,
    )
    demo.unload(cleanup_session_files)

if __name__ == "__main__":
    demo.queue().launch(server_name="0.0.0.0", server_port=7860, debug=True, show_error=True)