Aduc_sdr / aduc_framework /orchestrator.py
aducsdr's picture
Update aduc_framework/orchestrator.py
909d03d verified
raw
history blame
9.94 kB
# aduc_framework/orchestrator.py
#
# Copyright (C) August 4, 2025 Carlos Rodrigues dos Santos
#
# Versão 3.2.0 (Com Orquestração de Pós-Produção)
#
# Esta versão representa a camada de orquestração do Aduc Framework.
# Ela é agnóstica a qualquer interface (UI ou API) e opera com
# tipos de dados bem definidos (Pydantic) e um estado de geração central.
# Agora, é responsável por inicializar os singletons e orquestrar todas as etapas.
import logging
from typing import List, Dict, Any, Tuple, Callable, Optional, Generator
from PIL import Image, ImageOps
import os
import subprocess
import shutil
from pathlib import Path
import time
import gc
import torch
# Importa componentes internos do framework
from .director import AducDirector
from .types import GenerationState, PreProductionParams, ProductionParams
from .engineers import deformes2d_thinker_singleton, deformes3d_engine_singleton, Deformes4DEngine
# Importa managers diretamente para as tarefas de pós-produção
from .managers.latent_enhancer_manager import latent_enhancer_specialist_singleton
from .managers.seedvr_manager import seedvr_manager_singleton
from .managers.mmaudio_manager import mmaudio_manager_singleton
from .managers.vae_manager import vae_manager_singleton
from .tools.video_encode_tool import video_encode_tool_singleton
logger = logging.getLogger(__name__)
ProgressCallback = Optional[Callable[[float, str], None]]
class AducOrchestrator:
"""
Implementa o Maestro (Γ), a camada de orquestração central do Aduc Framework.
Ele recebe solicitações, atualiza o estado de geração, delega tarefas para os
engenheiros especialistas e retorna o estado atualizado.
"""
def __init__(self, workspace_dir: str):
self.director = AducDirector(workspace_dir)
self.editor = Deformes4DEngine()
self.editor.initialize(workspace_dir)
self.painter = deformes3d_engine_singleton
self.painter.initialize(workspace_dir)
self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
logger.info("ADUC Maestro (Framework Core) está no pódio. Engenheiros especialistas prontos.")
def get_current_state(self) -> GenerationState:
return self.director.get_full_state()
def process_image_for_story(self, image_path: str, size: int, filename: str) -> str:
img = Image.open(image_path).convert("RGB")
img_square = ImageOps.fit(img, (size, size), Image.Resampling.LANCZOS)
processed_path = os.path.join(self.director.workspace_dir, filename)
img_square.save(processed_path)
logger.info(f"Imagem de referência processada e salva em: {processed_path}")
return processed_path
def task_pre_production(self, params: PreProductionParams, progress_callback: ProgressCallback = None) -> Tuple[List[str], List[str], GenerationState]:
logger.info("Maestro: Iniciando tarefa de Pré-Produção.")
self.director.update_parameters("pre_producao", params)
if progress_callback: progress_callback(0.1, "Gerando storyboard...")
storyboard_list = deformes2d_thinker_singleton.generate_storyboard(prompt=params.prompt, num_keyframes=params.num_keyframes, ref_image_paths=params.ref_paths)
self.director.update_pre_production_state(params.prompt, params.ref_paths, storyboard_list)
if progress_callback: progress_callback(0.2, "Iniciando geração de keyframes...")
keyframes_detailed_data = self.painter.generate_keyframes_from_storyboard(generation_state=self.director.get_full_state_as_dict(), progress_callback=progress_callback)
self.director.update_keyframes_state(keyframes_detailed_data)
final_keyframe_paths = [kf["caminho_pixel"] for kf in keyframes_detailed_data]
final_state = self.director.get_full_state()
logger.info("Maestro: Tarefa de Pré-Produção concluída.")
return storyboard_list, final_keyframe_paths, final_state
def task_produce_original_movie(self, params: ProductionParams, progress_callback: ProgressCallback = None) -> Tuple[str, List[str], GenerationState]:
logger.info("Maestro: Iniciando tarefa de Produção do Filme Original.")
self.director.update_parameters("producao", params)
result_data = self.editor.generate_original_movie(full_generation_state=self.director.get_full_state_as_dict(), progress_callback=progress_callback)
self.director.update_video_state(result_data["video_data"])
final_video_path = result_data["final_path"]
latent_paths = result_data["latent_paths"]
final_state = self.director.get_full_state()
logger.info("Maestro: Tarefa de Produção do Filme Original concluída.")
return final_video_path, latent_paths, final_state
def task_run_latent_upscaler(self, latent_paths: List[str], chunk_size: int, progress_callback: ProgressCallback = None) -> Generator[Dict[str, Any], None, None]:
if not self.director.workspace_dir: raise RuntimeError("Orchestrator não inicializado corretamente.")
if not latent_paths:
raise ValueError("Não é possível fazer o upscale: nenhum caminho de latente foi fornecido.")
logger.info("--- ORQUESTRADOR: Tarefa de Upscaling de Latentes ---")
run_timestamp = int(time.time())
temp_dir = os.path.join(self.director.workspace_dir, f"temp_upscaled_clips_{run_timestamp}")
os.makedirs(temp_dir, exist_ok=True)
final_upscaled_clip_paths = []
num_chunks = -(-len(latent_paths) // chunk_size) if chunk_size > 0 else 0
for i in range(num_chunks):
chunk_start_index = i * chunk_size
chunk_end_index = chunk_start_index + chunk_size
chunk_paths = latent_paths[chunk_start_index:chunk_end_index]
if progress_callback: progress_callback(i / num_chunks, f"Upscalando & Decodificando Lote {i+1}/{num_chunks}")
tensors_in_chunk = [torch.load(p, map_location=self.device) for p in chunk_paths]
sub_group_latent = torch.cat(tensors_in_chunk, dim=2)
del tensors_in_chunk; gc.collect(); torch.cuda.empty_cache()
upscaled_latent_chunk = latent_enhancer_specialist_singleton.upscale(sub_group_latent)
del sub_group_latent; gc.collect(); torch.cuda.empty_cache()
pixel_tensor = vae_manager_singleton.decode(upscaled_latent_chunk)
del upscaled_latent_chunk; gc.collect(); torch.cuda.empty_cache()
base_name = f"upscaled_clip_{i:04d}_{run_timestamp}"
current_clip_path = os.path.join(temp_dir, f"{base_name}.mp4")
self.editor.save_video_from_tensor(pixel_tensor, current_clip_path, fps=24)
final_upscaled_clip_paths.append(current_clip_path)
del pixel_tensor; gc.collect(); torch.cuda.empty_cache()
yield {"progress": (i + 1) / num_chunks, "desc": f"Lote {i+1}/{num_chunks} completo."}
if progress_callback: progress_callback(0.98, "Montando vídeo com upscale...")
final_video_path = os.path.join(self.director.workspace_dir, f"upscaled_movie_{run_timestamp}.mp4")
video_encode_tool_singleton.concatenate_videos(final_upscaled_clip_paths, final_video_path, self.director.workspace_dir)
shutil.rmtree(temp_dir)
logger.info(f"Upscaling de latentes completo! Vídeo final em: {final_video_path}")
yield {"final_path": final_video_path}
def task_run_hd_mastering(self, source_video_path: str, model_version: str, steps: int, prompt: str, progress_callback: ProgressCallback = None) -> Generator[Dict[str, Any], None, None]:
if not self.director.workspace_dir: raise RuntimeError("Orchestrator não inicializado corretamente.")
logger.info(f"--- ORQUESTRADOR: Tarefa de Masterização HD com SeedVR {model_version} ---")
run_timestamp = int(time.time())
output_path = os.path.join(self.director.workspace_dir, f"hd_mastered_movie_{model_version}_{run_timestamp}.mp4")
final_path = seedvr_manager_singleton.process_video(
input_video_path=source_video_path,
output_video_path=output_path,
prompt=prompt
)
logger.info(f"Masterização HD completa! Vídeo final em: {final_path}")
yield {"final_path": final_path}
def task_run_audio_generation(self, source_video_path: str, audio_prompt: str, progress_callback: ProgressCallback = None) -> Generator[Dict[str, Any], None, None]:
if not self.director.workspace_dir: raise RuntimeError("Orchestrator não inicializado corretamente.")
logger.info(f"--- ORQUESTRADOR: Tarefa de Geração de Áudio ---")
if progress_callback: progress_callback(0.1, "Preparando para geração de áudio...")
run_timestamp = int(time.time())
source_name = Path(source_video_path).stem
output_path = os.path.join(self.director.workspace_dir, f"{source_name}_with_audio_{run_timestamp}.mp4")
result = subprocess.run(
["ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", source_video_path],
capture_output=True, text=True, check=True
)
duration = float(result.stdout.strip())
if progress_callback: progress_callback(0.5, "Gerando trilha de áudio...")
final_path = mmaudio_manager_singleton.generate_audio_for_video(
video_path=source_video_path,
prompt=audio_prompt,
duration_seconds=duration,
output_path_override=output_path
)
logger.info(f"Geração de áudio completa! Vídeo com áudio em: {final_path}")
if progress_callback: progress_callback(1.0, "Geração de áudio completa!")
yield {"final_path": final_path}