|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import json
import logging
import os
import re
from contextlib import ExitStack
from pathlib import Path

import google.generativeai as genai
import gradio as gr
from PIL import Image
|
|
|
|
|
# Configure the root logger at import time: INFO level, timestamped records.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Module-scoped logger used by everything defined in this file.
logger = logging.getLogger(__name__)
|
|
|
|
|
def robust_json_parser(raw_text: str) -> dict:
    """
    Parses a JSON object from a string that might contain extra text,
    such as Markdown code blocks from an LLM's response.

    Extraction is attempted in two steps:

    1. The body of the first ```json fenced block, if one is present.
    2. The first complete JSON object that starts at the first ``{`` of the
       text. Everything after that object is ignored, so trailing commentary
       (even commentary that itself contains ``}``) does not break parsing.

    Args:
        raw_text: Raw model output. ``None`` or an empty string is treated as
            text that contains no JSON.

    Returns:
        The decoded JSON object.

    Raises:
        ValueError: If the text holds no ``{ ... }`` span at all, or if no
            candidate could be decoded as JSON.
    """
    clean_text = (raw_text or "").strip()
    first_error = None

    # Step 1: prefer an explicit ```json fence when the model produced one.
    fenced = re.search(r'```json\s*(\{.*?\})\s*```', clean_text, re.DOTALL)
    if fenced:
        try:
            return json.loads(fenced.group(1))
        except json.JSONDecodeError as exc:
            # Do not give up yet: the fence may wrap several objects or be
            # mis-captured; the brace-based fallback below may still succeed.
            first_error = exc

    start_index = clean_text.find('{')
    end_index = clean_text.rfind('}')
    if start_index == -1 or end_index <= start_index:
        raise ValueError("No valid JSON object could be found in the AI's response.")

    # Step 2: decode exactly one object starting at the first '{'.
    # raw_decode stops at the end of that object instead of requiring the
    # whole remaining text to be JSON, which a first-'{'-to-last-'}' slice
    # would demand.
    try:
        parsed, _ = json.JSONDecoder().raw_decode(clean_text[start_index:])
        return parsed
    except json.JSONDecodeError as exc:
        if first_error is None:
            first_error = exc

    logger.error(f"Failed to decode JSON. The AI returned the following text:\n---\n{raw_text}\n---")
    raise ValueError(f"The AI returned an invalid JSON format: {first_error}") from first_error
|
|
|
|
|
class GeminiManager:
    """
    Manages interactions with the Google Gemini API, acting as the primary
    reasoning and language specialist for the ADUC framework.

    The API key is read from the ``GEMINI_API_KEY`` environment variable when
    the instance is created. Without a key, ``self.model`` is ``None`` and
    every public task raises ``gr.Error`` through ``_check_model``.
    """

    # Single source of truth for the model id, so the startup log line cannot
    # drift away from the model that is actually instantiated.
    MODEL_NAME = 'gemini-2.5-pro'

    def __init__(self):
        self.api_key = os.environ.get("GEMINI_API_KEY")
        if self.api_key:
            genai.configure(api_key=self.api_key)
            self.model = genai.GenerativeModel(self.MODEL_NAME)
            logger.info(f"Gemini Specialist ({self.MODEL_NAME}) initialized successfully.")
        else:
            self.model = None
            logger.warning("Gemini API key not found. Specialist disabled.")

    def _check_model(self):
        """Raises an error if the Gemini API is not configured."""
        if not self.model:
            raise gr.Error("The Google Gemini API key is not configured (GEMINI_API_KEY).")

    def _read_prompt_template(self, filename: str) -> str:
        """Reads a prompt template file from the 'prompts' directory.

        The directory is resolved as ``<parent of this file's directory>/prompts``.

        Raises:
            gr.Error: If the template file does not exist.
        """
        prompts_dir = Path(__file__).resolve().parent.parent / "prompts"
        try:
            return (prompts_dir / filename).read_text(encoding="utf-8")
        except FileNotFoundError as exc:
            raise gr.Error(f"Prompt template file not found: prompts/{filename}") from exc

    @staticmethod
    def _open_image(stack: ExitStack, path: str):
        """Opens ``path`` with PIL and registers it on ``stack``.

        ``Image.open`` keeps the underlying file open; registering the image
        on the stack guarantees the handle is closed when the stack exits.
        """
        return stack.enter_context(Image.open(path))

    def generate_storyboard(self, prompt: str, num_keyframes: int, ref_image_paths: list[str]) -> list[str]:
        """Delegated task: Acts as a Scriptwriter to generate a storyboard.

        Args:
            prompt: User prompt inserted into the storyboard template.
            num_keyframes: Number of scenes the model must return.
            ref_image_paths: Paths of reference images sent after the prompt.

        Returns:
            The ``scene_storyboard`` list from the model's JSON answer, with
            exactly ``num_keyframes`` entries.

        Raises:
            gr.Error: If the API key is missing, or if anything fails while
                building the request, calling the model or validating the
                answer.
        """
        self._check_model()
        try:
            template = self._read_prompt_template("unified_storyboard_prompt.txt")
            storyboard_prompt = template.format(user_prompt=prompt, num_fragments=num_keyframes)

            # Images only need to stay open for the duration of the API call.
            with ExitStack() as stack:
                model_contents = [storyboard_prompt]
                model_contents.extend(self._open_image(stack, p) for p in ref_image_paths)

                logger.info("Calling Gemini to generate storyboard...")
                response = self.model.generate_content(model_contents)
            logger.info(f"Gemini responded with (raw storyboard): {response.text}")

            storyboard_data = robust_json_parser(response.text)
            storyboard = storyboard_data.get("scene_storyboard", [])
            if not storyboard or len(storyboard) != num_keyframes:
                raise ValueError(f"Incorrect number of scenes generated. Expected {num_keyframes}, got {len(storyboard)}.")
            return storyboard
        except Exception as e:
            raise gr.Error(f"The Scriptwriter (Gemini) failed: {e}") from e

    def select_keyframes_from_pool(self, storyboard: list, base_image_paths: list[str], pool_image_paths: list[str]) -> list[str]:
        """Delegated task: Acts as a Photographer/Editor to select keyframes.

        Base images are labelled ``BASE-<n>`` and pool images ``IMG-<n>``
        (1-based, in input order). The model is asked to answer with one pool
        identifier per storyboard scene.

        Args:
            storyboard: Scene descriptions, one per scene.
            base_image_paths: Paths of the story's reference images.
            pool_image_paths: Paths of the candidate images to choose from.

        Returns:
            The pool image paths chosen by the model, one per scene, in the
            order the model returned them.

        Raises:
            gr.Error: If the API key is missing, the pool is empty, or
                anything fails while calling the model or validating its
                answer (wrong count, unknown identifier, malformed JSON).
        """
        self._check_model()
        if not pool_image_paths:
            raise gr.Error("The 'image pool' (Additional Images) is empty.")

        try:
            template = self._read_prompt_template("keyframe_selection_prompt.txt")

            image_map = {f"IMG-{i+1}": path for i, path in enumerate(pool_image_paths)}
            base_image_map = {f"BASE-{i+1}": path for i, path in enumerate(base_image_paths)}

            storyboard_str = "\n".join([f"- Scene {i+1}: {s}" for i, s in enumerate(storyboard)])
            selection_prompt = template.format(storyboard_str=storyboard_str, image_identifiers=list(image_map))

            # Images only need to stay open for the duration of the API call.
            with ExitStack() as stack:
                model_contents = ["# Reference Images (Story Base)"]
                for identifier, path in base_image_map.items():
                    model_contents.extend([f"Identifier: {identifier}", self._open_image(stack, path)])

                model_contents.append("\n# Image Pool (Scene Bank)")
                for identifier, path in image_map.items():
                    model_contents.extend([f"Identifier: {identifier}", self._open_image(stack, path)])

                model_contents.append(selection_prompt)

                logger.info("Calling Gemini to select keyframes from pool...")
                response = self.model.generate_content(model_contents)
            logger.info(f"Gemini responded with (raw keyframe selection): {response.text}")

            selection_data = robust_json_parser(response.text)
            selected_identifiers = selection_data.get("selected_image_identifiers", [])

            if not isinstance(selected_identifiers, list):
                raise ValueError("The AI returned 'selected_image_identifiers' in an unexpected format (expected a list).")
            if len(selected_identifiers) != len(storyboard):
                raise ValueError("The AI did not select the correct number of images for the scenes.")

            # Reject identifiers the model invented, with a readable message
            # instead of a bare KeyError.
            unknown = [identifier for identifier in selected_identifiers if identifier not in image_map]
            if unknown:
                raise ValueError(f"The AI selected identifiers that are not in the image pool: {unknown}")

            return [image_map[identifier] for identifier in selected_identifiers]

        except Exception as e:
            raise gr.Error(f"The Photographer (Gemini) failed to select images: {e}") from e

    def get_anticipatory_keyframe_prompt(self, global_prompt: str, scene_history: str, current_scene_desc: str, future_scene_desc: str, last_image_path: str, fixed_ref_paths: list[str]) -> str:
        """Delegated task: Acts as an Art Director to generate an image prompt.

        Args:
            global_prompt: Overall story goal, sent as context.
            scene_history: History text inserted into the template.
            current_scene_desc: Description of the current scene.
            future_scene_desc: Description of the upcoming scene.
            last_image_path: Path of the current base image (``IMG-BASE``).
            fixed_ref_paths: Paths of general reference images; entries equal
                to ``last_image_path`` are skipped so it is not sent twice.

        Returns:
            The model's text answer, stripped, with backticks and double
            quotes removed.

        Raises:
            gr.Error: If the API key is missing or anything fails while
                building the request or calling the model.
        """
        self._check_model()
        try:
            template = self._read_prompt_template("anticipatory_keyframe_prompt.txt")

            # Placeholder names are dictated by the template file.
            director_prompt = template.format(
                historico_prompt=scene_history,
                cena_atual=current_scene_desc,
                cena_futura=future_scene_desc
            )

            # Images only need to stay open for the duration of the API call.
            with ExitStack() as stack:
                model_contents = [
                    "# CONTEXT:",
                    f"- Global Story Goal: {global_prompt}",
                    "# VISUAL ASSETS:",
                    "Current Base Image [IMG-BASE]:",
                    self._open_image(stack, last_image_path)
                ]

                ref_counter = 1
                for path in fixed_ref_paths:
                    if path != last_image_path:
                        model_contents.extend([f"General Reference Image [IMG-REF-{ref_counter}]:", self._open_image(stack, path)])
                        ref_counter += 1

                model_contents.append(director_prompt)

                logger.info("Calling Gemini to generate anticipatory keyframe prompt...")
                response = self.model.generate_content(model_contents)
            logger.info(f"Gemini responded with (raw keyframe prompt): {response.text}")

            final_flux_prompt = response.text.strip().replace("`", "").replace("\"", "")
            return final_flux_prompt
        except Exception as e:
            raise gr.Error(f"The Art Director (Gemini) failed: {e}") from e

    def get_cinematic_decision(self, global_prompt: str, story_history: str,
                               past_keyframe_path: str, present_keyframe_path: str, future_keyframe_path: str,
                               past_scene_desc: str, present_scene_desc: str, future_scene_desc: str) -> dict:
        """
        Delegated task: Acts as a Film Director to make editing decisions and generate motion prompts.

        Args:
            global_prompt: Overall story goal inserted into the template.
            story_history: History text inserted into the template.
            past_keyframe_path: Path of the image sent as ``[PAST_IMAGE]``.
            present_keyframe_path: Path of the image sent as ``[PRESENT_IMAGE]``.
            future_keyframe_path: Path of the image sent as ``[FUTURE_IMAGE]``.
            past_scene_desc: Description of the previous scene.
            present_scene_desc: Description of the current scene.
            future_scene_desc: Description of the next scene.

        Returns:
            The model's decoded JSON answer, which contains at least
            ``transition_type`` and ``motion_prompt``. If anything fails after
            the API-key check, the error is logged and a fallback decision
            with ``transition_type`` set to ``"continuous"`` is returned.

        Raises:
            gr.Error: Only when the API key is not configured.
        """
        self._check_model()
        try:
            template = self._read_prompt_template("cinematic_director_prompt.txt")
            prompt_text = template.format(
                global_prompt=global_prompt,
                story_history=story_history,
                past_scene_desc=past_scene_desc,
                present_scene_desc=present_scene_desc,
                future_scene_desc=future_scene_desc
            )

            # Images only need to stay open for the duration of the API call.
            with ExitStack() as stack:
                model_contents = [
                    prompt_text,
                    "[PAST_IMAGE]:", self._open_image(stack, past_keyframe_path),
                    "[PRESENT_IMAGE]:", self._open_image(stack, present_keyframe_path),
                    "[FUTURE_IMAGE]:", self._open_image(stack, future_keyframe_path)
                ]

                logger.info("Calling Gemini to generate cinematic decision...")
                response = self.model.generate_content(model_contents)
            logger.info(f"Gemini responded with (raw cinematic decision): {response.text}")

            decision_data = robust_json_parser(response.text)
            if "transition_type" not in decision_data or "motion_prompt" not in decision_data:
                raise ValueError("AI response (Cinematographer) is malformed. Missing 'transition_type' or 'motion_prompt'.")
            return decision_data
        except Exception as e:
            # Deliberate best-effort: a failed decision must not abort the
            # pipeline, so degrade to a plain continuous transition.
            logger.error(f"The Film Director (Gemini) failed: {e}. Using fallback to 'continuous'.")
            return {
                "transition_type": "continuous",
                "motion_prompt": f"A smooth, continuous cinematic transition from '{present_scene_desc}' to '{future_scene_desc}'."
            }
|
|
|
|
|
|
|
|
# Shared module-level instance, created once when this module is imported.
gemini_manager_singleton = GeminiManager()