|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import os |
|
|
import subprocess |
|
|
import logging |
|
|
import random |
|
|
import time |
|
|
import shutil |
|
|
from typing import List, Optional, Tuple |
|
|
|
|
|
import imageio |
|
|
import numpy as np |
|
|
import torch |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
class VideoToolError(Exception): |
|
|
"""Exceção personalizada para erros originados do VideoEncodeTool.""" |
|
|
pass |
|
|
|
|
|
class VideoEncodeTool: |
|
|
""" |
|
|
Um especialista para lidar com tarefas de codificação e manipulação de vídeo. |
|
|
""" |
|
|
|
|
|
def save_video_from_tensor(self, video_tensor: torch.Tensor, path: str, fps: int = 24): |
|
|
""" |
|
|
Salva um tensor de pixel como um arquivo de vídeo .mp4 usando parâmetros otimizados. |
|
|
Espera um tensor no formato (B, C, F, H, W) onde B=1. |
|
|
""" |
|
|
|
|
|
if video_tensor is None or video_tensor.ndim != 5 or video_tensor.shape[0] != 1 or video_tensor.shape[2] == 0: |
|
|
logger.warning(f"Tensor de vídeo inválido ou vazio recebido. Shape: {video_tensor.shape if video_tensor is not None else 'None'}. Pulando salvamento de vídeo para '{path}'.") |
|
|
return |
|
|
|
|
|
logger.info(f"Salvando tensor de vídeo com shape {video_tensor.shape} para '{os.path.basename(path)}'...") |
|
|
|
|
|
try: |
|
|
|
|
|
|
|
|
video_tensor_permuted = video_tensor.squeeze(0).permute(1, 2, 3, 0) |
|
|
|
|
|
|
|
|
video_tensor_normalized = (video_tensor_permuted.clamp(-1, 1) + 1) / 2.0 |
|
|
|
|
|
|
|
|
video_np = (video_tensor_normalized.detach().cpu().float().numpy() * 255).astype(np.uint8) |
|
|
|
|
|
|
|
|
with imageio.get_writer( |
|
|
path, |
|
|
fps=fps, |
|
|
codec='libx264', |
|
|
quality=8, |
|
|
output_params=['-pix_fmt', 'yuv420p'] |
|
|
) as writer: |
|
|
for frame in video_np: |
|
|
writer.append_data(frame) |
|
|
|
|
|
|
|
|
logger.info(f"Vídeo salvo com sucesso em: {path}") |
|
|
except Exception as e: |
|
|
logger.error(f"Falha ao salvar vídeo com imageio para '{path}': {e}", exc_info=True) |
|
|
raise VideoToolError(f"Não foi possível escrever o arquivo de vídeo: {e}") |
|
|
|
|
|
def extract_first_frame(self, video_path: str, output_image_path: str) -> str: |
|
|
""" |
|
|
Extrai o primeiro frame de um arquivo de vídeo e o salva como uma imagem. |
|
|
""" |
|
|
logger.info(f"Extraindo primeiro frame de '{os.path.basename(video_path)}'...") |
|
|
cmd = ['ffmpeg', '-y', '-v', 'error', '-i', video_path, '-vframes', '1', '-q:v', '2', output_image_path] |
|
|
try: |
|
|
subprocess.run(cmd, check=True, capture_output=True, text=True) |
|
|
return output_image_path |
|
|
except subprocess.CalledProcessError as e: |
|
|
logger.error(f"FFmpeg (extract_first_frame) falhou: {e.stderr}") |
|
|
raise VideoToolError(f"Falha ao extrair o primeiro frame de {video_path}") |
|
|
|
|
|
def extract_last_frame(self, video_path: str, output_image_path: str) -> str: |
|
|
""" |
|
|
Extrai o último frame de um arquivo de vídeo e o salva como uma imagem. |
|
|
""" |
|
|
logger.info(f"Extraindo último frame de '{os.path.basename(video_path)}'...") |
|
|
cmd = ['ffmpeg', '-y', '-v', 'error', '-sseof', '-0.1', '-i', video_path, '-vframes', '1', '-q:v', '2', output_image_path] |
|
|
try: |
|
|
subprocess.run(cmd, check=True, capture_output=True, text=True) |
|
|
return output_image_path |
|
|
except subprocess.CalledProcessError as e: |
|
|
logger.error(f"FFmpeg (extract_last_frame) falhou: {e.stderr}") |
|
|
raise VideoToolError(f"Falha ao extrair o último frame de {video_path}") |
|
|
|
|
|
def create_transition_bridge(self, start_image_path: str, end_image_path: str, |
|
|
duration: float, fps: int, target_resolution: Tuple[int, int], |
|
|
workspace_dir: str, effect: Optional[str] = None) -> str: |
|
|
""" |
|
|
Cria um clipe de vídeo curto que transiciona entre duas imagens estáticas. |
|
|
""" |
|
|
output_path = os.path.join(workspace_dir, f"bridge_{int(time.time())}_{random.randint(100, 999)}.mp4") |
|
|
width, height = target_resolution |
|
|
fade_effects = ["fade", "wipeleft", "wiperight", "wipeup", "wipedown", "dissolve", "fadeblack", "fadewhite", "radial", "rectcrop", "circleopen", "circleclose", "horzopen", "horzclose"] |
|
|
selected_effect = effect if effect and effect.strip() else random.choice(fade_effects) |
|
|
transition_duration = max(0.1, duration) |
|
|
cmd = (f"ffmpeg -y -v error -loop 1 -t {transition_duration} -i \"{start_image_path}\" -loop 1 -t {transition_duration} -i \"{end_image_path}\" " |
|
|
f"-filter_complex \"[0:v]scale={width}:{height},setsar=1[v0];[1:v]scale={width}:{height},setsar=1[v1];" |
|
|
f"[v0][v1]xfade=transition={selected_effect}:duration={transition_duration}:offset=0[out]\" " |
|
|
f"-map \"[out]\" -c:v libx264 -r {fps} -pix_fmt yuv420p \"{output_path}\"") |
|
|
logger.info(f"Criando ponte de transição com efeito '{selected_effect}'...") |
|
|
try: |
|
|
subprocess.run(cmd, shell=True, check=True, text=True) |
|
|
except subprocess.CalledProcessError as e: |
|
|
raise VideoToolError(f"Falha ao criar vídeo de transição: {e.stderr}") |
|
|
return output_path |
|
|
|
|
|
def concatenate_videos(self, video_paths: List[str], output_path: str, workspace_dir: str) -> str: |
|
|
""" |
|
|
Concatena múltiplos clipes de vídeo em um único arquivo sem re-codificar. |
|
|
""" |
|
|
if not video_paths: |
|
|
raise VideoToolError("Nenhum fragmento de vídeo fornecido para concatenação.") |
|
|
|
|
|
if len(video_paths) == 1: |
|
|
shutil.copy(video_paths[0], output_path) |
|
|
logger.info(f"Apenas um clipe fornecido. Copiado para '{output_path}'.") |
|
|
return output_path |
|
|
|
|
|
list_file_path = os.path.join(workspace_dir, f"concat_list_{int(time.time())}.txt") |
|
|
try: |
|
|
with open(list_file_path, 'w', encoding='utf-8') as f: |
|
|
for path in video_paths: |
|
|
|
|
|
f.write(f"file '{os.path.abspath(path)}'\n") |
|
|
|
|
|
cmd_list = ['ffmpeg', '-y', '-f', 'concat', '-safe', '0', '-i', list_file_path, '-c', 'copy', output_path] |
|
|
logger.info(f"Concatenando {len(video_paths)} clipes para '{os.path.basename(output_path)}'...") |
|
|
subprocess.run(cmd_list, check=True, capture_output=True, text=True) |
|
|
logger.info("Concatenação FFmpeg bem-sucedida.") |
|
|
return output_path |
|
|
except subprocess.CalledProcessError as e: |
|
|
logger.error(f"Falha ao montar o vídeo final com FFmpeg: {e.stderr}") |
|
|
raise VideoToolError(f"Falha ao montar o vídeo final com FFmpeg.") |
|
|
finally: |
|
|
if os.path.exists(list_file_path): |
|
|
os.remove(list_file_path) |
|
|
|
|
|
|
|
|
video_encode_tool_singleton = VideoEncodeTool() |