|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import os |
|
|
import subprocess |
|
|
import logging |
|
|
import random |
|
|
import time |
|
|
import shutil |
|
|
from typing import List, Optional, Tuple |
|
|
|
|
|
import imageio |
|
|
import numpy as np |
|
|
import torch |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
class VideoToolError(Exception): |
|
|
"""Exceção personalizada para erros originados do VideoEncodeTool.""" |
|
|
pass |
|
|
|
|
|
|
|
|
class VideoEncodeTool: |
|
|
def __init__(self, frame_log_every=8): |
|
|
self.frame_log_every = frame_log_every |
|
|
|
|
|
""" |
|
|
Um especialista para lidar com tarefas de codificação e manipulação de vídeo. |
|
|
""" |
|
|
|
|
|
|
|
|
@torch.no_grad() |
|
|
def save_video_from_tensor(self, pixel_5d: torch.Tensor, path: str, fps: int = 24, progress_callback=None): |
|
|
""" |
|
|
Espera pixel_5d em [0,1], shape (B,C,T,H,W). |
|
|
Escreve MP4 incremental, convertendo cada frame para (H,W,C) uint8. |
|
|
""" |
|
|
|
|
|
device = "cuda" if pixel_5d.is_cuda else "cpu" |
|
|
B, C, T, H, W = pixel_5d.shape |
|
|
if B != 1: |
|
|
|
|
|
raise ValueError(f"Esperado B=1, recebido B={B}") |
|
|
|
|
|
|
|
|
with imageio.get_writer(path, fps=int(fps), codec="libx264", quality=8) as writer: |
|
|
for i in range(T): |
|
|
frame_chw = pixel_5d[0, :, i] |
|
|
frame_hwc_u8 = (frame_chw.permute(1, 2, 0) |
|
|
.clamp(0, 1) |
|
|
.mul(255) |
|
|
.to(torch.uint8) |
|
|
.cpu() |
|
|
.numpy()) |
|
|
writer.append_data(frame_hwc_u8) |
|
|
if progress_callback: |
|
|
progress_callback(i + 1, T) |
|
|
if i % self.frame_log_every == 0: |
|
|
print(f"[DEBUG] [Encoder] frame {i}/{T} gravado ({H}x{W}@{fps}fps)") |
|
|
|
|
|
|
|
|
def extract_first_frame(self, video_path: str, output_image_path: str) -> str: |
|
|
""" |
|
|
Extrai o primeiro frame de um arquivo de vídeo e o salva como uma imagem. |
|
|
""" |
|
|
logger.info(f"Extraindo primeiro frame de '{os.path.basename(video_path)}'...") |
|
|
cmd = ['ffmpeg', '-y', '-v', 'error', '-i', video_path, '-vframes', '1', '-q:v', '2', output_image_path] |
|
|
try: |
|
|
subprocess.run(cmd, check=True, capture_output=True, text=True) |
|
|
return output_image_path |
|
|
except subprocess.CalledProcessError as e: |
|
|
logger.error(f"FFmpeg (extract_first_frame) falhou: {e.stderr}") |
|
|
raise VideoToolError(f"Falha ao extrair o primeiro frame de {video_path}") |
|
|
|
|
|
def extract_last_frame(self, video_path: str, output_image_path: str) -> str: |
|
|
""" |
|
|
Extrai o último frame de um arquivo de vídeo e o salva como uma imagem. |
|
|
""" |
|
|
logger.info(f"Extraindo último frame de '{os.path.basename(video_path)}'...") |
|
|
cmd = ['ffmpeg', '-y', '-v', 'error', '-sseof', '-0.1', '-i', video_path, '-vframes', '1', '-q:v', '2', output_image_path] |
|
|
try: |
|
|
subprocess.run(cmd, check=True, capture_output=True, text=True) |
|
|
return output_image_path |
|
|
except subprocess.CalledProcessError as e: |
|
|
logger.error(f"FFmpeg (extract_last_frame) falhou: {e.stderr}") |
|
|
raise VideoToolError(f"Falha ao extrair o último frame de {video_path}") |
|
|
|
|
|
def create_transition_bridge(self, start_image_path: str, end_image_path: str, |
|
|
duration: float, fps: int, target_resolution: Tuple[int, int], |
|
|
workspace_dir: str, effect: Optional[str] = None) -> str: |
|
|
""" |
|
|
Cria um clipe de vídeo curto que transiciona entre duas imagens estáticas. |
|
|
""" |
|
|
output_path = os.path.join(workspace_dir, f"bridge_{int(time.time())}_{random.randint(100, 999)}.mp4") |
|
|
width, height = target_resolution |
|
|
fade_effects = ["fade", "wipeleft", "wiperight", "wipeup", "wipedown", "dissolve", "fadeblack", "fadewhite", "radial", "rectcrop", "circleopen", "circleclose", "horzopen", "horzclose"] |
|
|
selected_effect = effect if effect and effect.strip() else random.choice(fade_effects) |
|
|
transition_duration = max(0.1, duration) |
|
|
cmd = (f"ffmpeg -y -v error -loop 1 -t {transition_duration} -i \"{start_image_path}\" -loop 1 -t {transition_duration} -i \"{end_image_path}\" " |
|
|
f"-filter_complex \"[0:v]scale={width}:{height},setsar=1[v0];[1:v]scale={width}:{height},setsar=1[v1];" |
|
|
f"[v0][v1]xfade=transition={selected_effect}:duration={transition_duration}:offset=0[out]\" " |
|
|
f"-map \"[out]\" -c:v libx264 -r {fps} -pix_fmt yuv420p \"{output_path}\"") |
|
|
logger.info(f"Criando ponte de transição com efeito '{selected_effect}'...") |
|
|
try: |
|
|
subprocess.run(cmd, shell=True, check=True, text=True) |
|
|
except subprocess.CalledProcessError as e: |
|
|
raise VideoToolError(f"Falha ao criar vídeo de transição: {e.stderr}") |
|
|
return output_path |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def concatenate_videos(self, video_paths: List[str], output_path: str, workspace_dir: str, start:int= 0, overlap:int=3) -> str: |
|
|
""" |
|
|
Concatena múltiplos vídeos MP4, removendo exatamente o último frame |
|
|
de cada vídeo (exceto o último), salvando os cortes e recriando a lista a cada execução. |
|
|
""" |
|
|
if not video_paths: |
|
|
raise VideoToolError("Nenhum fragmento de vídeo fornecido para concatenação.") |
|
|
|
|
|
|
|
|
if len(video_paths) == 1: |
|
|
shutil.copy(video_paths[0], output_path) |
|
|
print(f"[Concat] Apenas um clipe fornecido. Copiado para '{output_path}'.") |
|
|
return output_path |
|
|
|
|
|
|
|
|
trimmed_dir = os.path.join(workspace_dir, "trimmed_parts") |
|
|
os.makedirs(trimmed_dir, exist_ok=True) |
|
|
|
|
|
|
|
|
for old in os.listdir(trimmed_dir): |
|
|
try: |
|
|
os.remove(os.path.join(trimmed_dir, old)) |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
processed_videos = [] |
|
|
|
|
|
try: |
|
|
for i, base in enumerate(video_paths): |
|
|
abs_base = os.path.abspath(base) |
|
|
base_name = os.path.basename(abs_base) |
|
|
video_podado = os.path.join(trimmed_dir, f"cut_{i}_{base_name}") |
|
|
|
|
|
|
|
|
probe_cmd = [ |
|
|
"ffprobe", "-v", "error", |
|
|
"-select_streams", "v:0", |
|
|
"-count_frames", |
|
|
"-show_entries", "stream=nb_read_frames", |
|
|
"-of", "default=nokey=1:noprint_wrappers=1", |
|
|
abs_base |
|
|
] |
|
|
result = subprocess.run(probe_cmd, capture_output=True, text=True, check=True) |
|
|
total_frames = int(result.stdout.strip()) |
|
|
print(f"[Trim] {base_name} → total_frames={total_frames}") |
|
|
|
|
|
|
|
|
start_frame = start |
|
|
end_frame = total_frames if i == len(video_paths) - 1 else total_frames - overlap |
|
|
|
|
|
if i < len(video_paths) - 1: |
|
|
|
|
|
cmd_fim = ( |
|
|
f'ffmpeg -y -hide_banner -loglevel error -i "{abs_base}" ' |
|
|
f'-vf "trim=start_frame={start_frame}:end_frame={end_frame},setpts=PTS-STARTPTS" ' |
|
|
f'-an "{video_podado}"' |
|
|
) |
|
|
print(f"[CmdTrim] {cmd_fim}") |
|
|
subprocess.run(cmd_fim, shell=True, check=True) |
|
|
print(f"[TrimOK] {base_name}: corte {end_frame}/{total_frames} frames → {os.path.basename(video_podado)}") |
|
|
processed_videos.append(video_podado) |
|
|
else: |
|
|
processed_videos.append(abs_base) |
|
|
print(f"[Keep] Último vídeo sem corte: {base_name}") |
|
|
|
|
|
|
|
|
list_file_path = os.path.join(workspace_dir, "concat_list.txt") |
|
|
if os.path.exists(list_file_path): |
|
|
os.remove(list_file_path) |
|
|
with open(list_file_path, "w", encoding="utf-8") as f: |
|
|
for p in processed_videos: |
|
|
f.write(f"file '{os.path.abspath(p)}'\n") |
|
|
|
|
|
|
|
|
cmd_concat = ( |
|
|
f'ffmpeg -y -hide_banner -loglevel error -f concat -safe 0 ' |
|
|
f'-i "{list_file_path}" -c copy "{output_path}"' |
|
|
) |
|
|
print(f"[Concat] Executando concatenação final:\n{cmd_concat}") |
|
|
subprocess.run(cmd_concat, shell=True, check=True) |
|
|
print("[ConcatOK] Concatenação concluída com sucesso.") |
|
|
return output_path |
|
|
|
|
|
except subprocess.CalledProcessError as e: |
|
|
logger.error(f"[ConcatERR] Erro FFmpeg: {e}") |
|
|
raise VideoToolError("Falha durante concatenação de vídeos.") |
|
|
|
|
|
def concatenate_videos2(self, video_paths: List[str], output_path: str, workspace_dir: str) -> str: |
|
|
""" |
|
|
Concatena múltiplos clipes de vídeo em um único arquivo sem re-codificar. |
|
|
""" |
|
|
if not video_paths: |
|
|
raise VideoToolError("Nenhum fragmento de vídeo fornecido para concatenação.") |
|
|
|
|
|
if len(video_paths) == 1: |
|
|
shutil.copy(video_paths[0], output_path) |
|
|
logger.info(f"Apenas um clipe fornecido. Copiado para '{output_path}'.") |
|
|
return output_path |
|
|
|
|
|
list_file_path = os.path.join(workspace_dir, f"concat_list_{int(time.time())}.txt") |
|
|
try: |
|
|
with open(list_file_path, 'w', encoding='utf-8') as f: |
|
|
for path in video_paths: |
|
|
|
|
|
f.write(f"file '{os.path.abspath(path)}'\n") |
|
|
|
|
|
cmd_list = ['ffmpeg', '-y', '-f', 'concat', '-safe', '0', '-i', list_file_path, '-c', 'copy', output_path] |
|
|
logger.info(f"Concatenando {len(video_paths)} clipes para '{os.path.basename(output_path)}'...") |
|
|
subprocess.run(cmd_list, check=True, capture_output=True, text=True) |
|
|
logger.info("Concatenação FFmpeg bem-sucedida.") |
|
|
return output_path |
|
|
except subprocess.CalledProcessError as e: |
|
|
logger.error(f"Falha ao montar o vídeo final com FFmpeg: {e.stderr}") |
|
|
raise VideoToolError(f"Falha ao montar o vídeo final com FFmpeg.") |
|
|
finally: |
|
|
if os.path.exists(list_file_path): |
|
|
os.remove(list_file_path) |
|
|
|
|
|
|
|
|
video_encode_tool_singleton = VideoEncodeTool() |