|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import logging |
|
|
from typing import List, Dict, Any, Tuple, Callable, Optional, Generator |
|
|
from PIL import Image, ImageOps |
|
|
import os |
|
|
import subprocess |
|
|
import shutil |
|
|
from pathlib import Path |
|
|
import time |
|
|
import gc |
|
|
import torch |
|
|
|
|
|
|
|
|
from .director import AducDirector |
|
|
from .types import GenerationState, PreProductionParams, ProductionParams |
|
|
|
|
|
|
|
|
from .engineers import deformes2d_thinker_singleton, deformes3d_engine_singleton, Deformes4DEngine |
|
|
|
|
|
|
|
|
from .managers.latent_enhancer_manager import latent_enhancer_specialist_singleton |
|
|
from .managers.seedvr_manager import seedvr_manager_singleton |
|
|
from .managers.mmaudio_manager import mmaudio_manager_singleton |
|
|
from .managers.vae_manager import vae_manager_singleton |
|
|
|
|
|
|
|
|
from .tools.video_encode_tool import video_encode_tool_singleton |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
ProgressCallback = Optional[Callable[[float, str], None]] |
|
|
|
|
|
class AducOrchestrator: |
|
|
""" |
|
|
Implementa o Maestro (Γ), a camada de orquestração central do Aduc Framework. |
|
|
Ele recebe solicitações, atualiza o estado de geração, delega tarefas para os |
|
|
engenheiros e seus pools de especialistas, e retorna o estado atualizado. |
|
|
""" |
|
|
def __init__(self, workspace_dir: str): |
|
|
self.director = AducDirector(workspace_dir) |
|
|
self.editor = Deformes4DEngine() |
|
|
self.editor.initialize(workspace_dir) |
|
|
self.painter = deformes3d_engine_singleton |
|
|
self.painter.initialize(workspace_dir) |
|
|
self.device = 'cuda' if torch.cuda.is_available() else 'cpu' |
|
|
logger.info("ADUC Maestro (Framework Core) pronto para reger a orquestra de especialistas.") |
|
|
|
|
|
def get_current_state(self) -> GenerationState: |
|
|
"""Retorna o estado de geração atual.""" |
|
|
return self.director.get_full_state() |
|
|
|
|
|
def process_image_for_story(self, image_path: str, size: int, filename: str) -> str: |
|
|
"""Processa uma imagem de referência para o formato quadrado padrão.""" |
|
|
img = Image.open(image_path).convert("RGB") |
|
|
img_square = ImageOps.fit(img, (size, size), Image.Resampling.LANCZOS) |
|
|
processed_path = os.path.join(self.director.workspace_dir, filename) |
|
|
img_square.save(processed_path) |
|
|
logger.info(f"Imagem de referência processada e salva em: {processed_path}") |
|
|
return processed_path |
|
|
|
|
|
|
|
|
def task_pre_production(self, params: PreProductionParams, progress_callback: ProgressCallback = None) -> Tuple[List[str], List[str], GenerationState]: |
|
|
"""Orquestra a criação do storyboard e dos keyframes visuais.""" |
|
|
logger.info("Maestro: Iniciando tarefa de Pré-Produção.") |
|
|
self.director.update_parameters("pre_producao", params) |
|
|
|
|
|
if progress_callback: progress_callback(0.1, "Gerando storyboard...") |
|
|
storyboard_list = deformes2d_thinker_singleton.generate_storyboard(prompt=params.prompt, num_keyframes=params.num_keyframes, ref_image_paths=params.ref_paths) |
|
|
self.director.update_pre_production_state(params.prompt, params.ref_paths, storyboard_list) |
|
|
|
|
|
if progress_callback: progress_callback(0.2, "Iniciando geração de keyframes...") |
|
|
keyframes_detailed_data = self.painter.generate_keyframes_from_storyboard(generation_state=self.director.get_full_state_as_dict(), progress_callback=progress_callback) |
|
|
self.director.update_keyframes_state(keyframes_detailed_data) |
|
|
|
|
|
final_keyframe_paths = [kf["caminho_pixel"] for kf in keyframes_detailed_data] |
|
|
final_state = self.director.get_full_state() |
|
|
logger.info("Maestro: Tarefa de Pré-Produção concluída.") |
|
|
return storyboard_list, final_keyframe_paths, final_state |
|
|
|
|
|
|
|
|
def task_produce_original_movie(self, params: ProductionParams, progress_callback: ProgressCallback = None) -> Tuple[str, List[str], GenerationState]: |
|
|
"""Orquestra a geração do vídeo principal a partir dos keyframes.""" |
|
|
logger.info("Maestro: Iniciando tarefa de Produção do Filme Original.") |
|
|
self.director.update_parameters("producao", params) |
|
|
|
|
|
result_data = self.editor.generate_original_movie(full_generation_state=self.director.get_full_state_as_dict(), progress_callback=progress_callback) |
|
|
self.director.update_video_state(result_data["video_data"]) |
|
|
|
|
|
final_video_path = result_data["final_path"] |
|
|
latent_paths = result_data["latent_paths"] |
|
|
final_state = self.director.get_full_state() |
|
|
logger.info("Maestro: Tarefa de Produção do Filme Original concluída.") |
|
|
return final_video_path, latent_paths, final_state |
|
|
|
|
|
|
|
|
|
|
|
def task_run_latent_upscaler(self, latent_paths: List[str], chunk_size: int, progress_callback: ProgressCallback = None) -> Generator[Dict[str, Any], None, None]: |
|
|
"""Aplica upscale 2x nos latentes e os decodifica para um novo vídeo.""" |
|
|
if not self.director.workspace_dir: raise RuntimeError("Orchestrator não inicializado.") |
|
|
if not latent_paths: raise ValueError("Nenhum caminho de latente fornecido para o upscale.") |
|
|
|
|
|
logger.info("--- ORQUESTRADOR: Tarefa de Upscaling de Latentes ---") |
|
|
run_timestamp = int(time.time()) |
|
|
temp_dir = os.path.join(self.director.workspace_dir, f"temp_upscaled_clips_{run_timestamp}") |
|
|
os.makedirs(temp_dir, exist_ok=True) |
|
|
|
|
|
final_upscaled_clip_paths = [] |
|
|
num_chunks = -(-len(latent_paths) // chunk_size) |
|
|
|
|
|
for i in range(num_chunks): |
|
|
chunk_paths = latent_paths[i * chunk_size:(i + 1) * chunk_size] |
|
|
if progress_callback: progress_callback(i / num_chunks, f"Upscalando & Decodificando Lote {i+1}/{num_chunks}") |
|
|
|
|
|
tensors_in_chunk = [torch.load(p, map_location=self.device) for p in chunk_paths] |
|
|
sub_group_latent = torch.cat(tensors_in_chunk, dim=2) |
|
|
|
|
|
upscaled_latent_chunk = latent_enhancer_specialist_singleton.upscale(sub_group_latent) |
|
|
pixel_tensor = vae_manager_singleton.decode(upscaled_latent_chunk) |
|
|
|
|
|
current_clip_path = os.path.join(temp_dir, f"upscaled_clip_{i:04d}.mp4") |
|
|
self.editor.save_video_from_tensor(pixel_tensor, current_clip_path, fps=24) |
|
|
final_upscaled_clip_paths.append(current_clip_path) |
|
|
|
|
|
del tensors_in_chunk, sub_group_latent, upscaled_latent_chunk, pixel_tensor |
|
|
gc.collect(); torch.cuda.empty_cache() |
|
|
yield {"progress": (i + 1) / num_chunks} |
|
|
|
|
|
final_video_path = os.path.join(self.director.workspace_dir, f"upscaled_movie_{run_timestamp}.mp4") |
|
|
video_encode_tool_singleton.concatenate_videos(final_upscaled_clip_paths, final_video_path, self.director.workspace_dir) |
|
|
|
|
|
shutil.rmtree(temp_dir) |
|
|
logger.info(f"Upscaling de latentes completo! Vídeo final em: {final_video_path}") |
|
|
yield {"final_path": final_video_path} |
|
|
|
|
|
def task_run_hd_mastering(self, source_video_path: str, steps: int, prompt: str, progress_callback: ProgressCallback = None) -> Generator[Dict[str, Any], None, None]: |
|
|
"""Aplica masterização em HD usando o pool de GPUs do SeedVR com o modelo 3B.""" |
|
|
if not self.director.workspace_dir: raise RuntimeError("Orchestrator não inicializado.") |
|
|
logger.info(f"--- ORQUESTRADOR: Tarefa de Masterização HD com SeedVR 3B ---") |
|
|
|
|
|
run_timestamp = int(time.time()) |
|
|
output_path = os.path.join(self.director.workspace_dir, f"hd_mastered_movie_3B_{run_timestamp}.mp4") |
|
|
|
|
|
final_path = seedvr_manager_singleton.process_video( |
|
|
input_video_path=source_video_path, |
|
|
output_video_path=output_path, |
|
|
prompt=prompt, |
|
|
steps=steps |
|
|
) |
|
|
logger.info(f"Masterização HD completa! Vídeo final em: {final_path}") |
|
|
yield {"final_path": final_path} |
|
|
|
|
|
def task_run_audio_generation(self, source_video_path: str, audio_prompt: str, progress_callback: ProgressCallback = None) -> Generator[Dict[str, Any], None, None]: |
|
|
"""Gera e adiciona áudio ao vídeo usando o pool de GPUs do MMAudio.""" |
|
|
if not self.director.workspace_dir: raise RuntimeError("Orchestrator não inicializado.") |
|
|
logger.info(f"--- ORQUESTRADOR: Tarefa de Geração de Áudio ---") |
|
|
|
|
|
if progress_callback: progress_callback(0.1, "Preparando para geração de áudio...") |
|
|
|
|
|
run_timestamp = int(time.time()) |
|
|
source_name = Path(source_video_path).stem |
|
|
output_path = os.path.join(self.director.workspace_dir, f"{source_name}_with_audio_{run_timestamp}.mp4") |
|
|
|
|
|
try: |
|
|
result = subprocess.run( |
|
|
["ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", source_video_path], |
|
|
capture_output=True, text=True, check=True |
|
|
) |
|
|
duration = float(result.stdout.strip()) |
|
|
except Exception as e: |
|
|
logger.error(f"Não foi possível obter a duração do vídeo '{source_video_path}': {e}", exc_info=True) |
|
|
yield {"error": "Falha ao obter duração do vídeo."} |
|
|
return |
|
|
|
|
|
if progress_callback: progress_callback(0.5, "Gerando trilha de áudio...") |
|
|
|
|
|
final_path = mmaudio_manager_singleton.generate_audio_for_video( |
|
|
video_path=source_video_path, |
|
|
prompt=audio_prompt, |
|
|
duration_seconds=duration, |
|
|
output_path_override=output_path |
|
|
) |
|
|
|
|
|
logger.info(f"Geração de áudio completa! Vídeo com áudio em: {final_path}") |
|
|
if progress_callback: progress_callback(1.0, "Geração de áudio completa!") |
|
|
yield {"final_path": final_path} |