Update managers/gemini_manager.py
Browse files- managers/gemini_manager.py +40 -147
managers/gemini_manager.py
CHANGED
|
@@ -2,12 +2,12 @@
|
|
| 2 |
#
|
| 3 |
# Copyright (C) August 4, 2025 Carlos Rodrigues dos Santos
|
| 4 |
#
|
| 5 |
-
# Version: 1.1.
|
| 6 |
#
|
| 7 |
-
# This file defines the GeminiManager, a specialist responsible for
|
| 8 |
-
#
|
| 9 |
-
#
|
| 10 |
-
#
|
| 11 |
|
| 12 |
import os
|
| 13 |
import logging
|
|
@@ -17,6 +17,7 @@ import gradio as gr
|
|
| 17 |
from PIL import Image
|
| 18 |
import google.generativeai as genai
|
| 19 |
import re
|
|
|
|
| 20 |
|
| 21 |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
| 22 |
logger = logging.getLogger(__name__)
|
|
@@ -28,13 +29,11 @@ def robust_json_parser(raw_text: str) -> dict:
|
|
| 28 |
"""
|
| 29 |
clean_text = raw_text.strip()
|
| 30 |
try:
|
| 31 |
-
# Try to find JSON delimited by ```json ... ```
|
| 32 |
match = re.search(r'```json\s*(\{.*?\})\s*```', clean_text, re.DOTALL)
|
| 33 |
if match:
|
| 34 |
json_str = match.group(1)
|
| 35 |
return json.loads(json_str)
|
| 36 |
|
| 37 |
-
# If not found, try to find the first '{' and the last '}'
|
| 38 |
start_index = clean_text.find('{')
|
| 39 |
end_index = clean_text.rfind('}')
|
| 40 |
if start_index != -1 and end_index != -1 and end_index > start_index:
|
|
@@ -48,169 +47,63 @@ def robust_json_parser(raw_text: str) -> dict:
|
|
| 48 |
|
| 49 |
class GeminiManager:
|
| 50 |
"""
|
| 51 |
-
Manages interactions with the Google Gemini API
|
| 52 |
-
reasoning and language specialist for the ADUC framework.
|
| 53 |
"""
|
| 54 |
def __init__(self):
|
| 55 |
self.api_key = os.environ.get("GEMINI_API_KEY")
|
| 56 |
if self.api_key:
|
| 57 |
genai.configure(api_key=self.api_key)
|
| 58 |
-
self.model = genai.GenerativeModel('gemini-
|
| 59 |
-
logger.info("
|
| 60 |
else:
|
| 61 |
self.model = None
|
| 62 |
-
logger.warning("Gemini API key not found.
|
| 63 |
|
| 64 |
def _check_model(self):
|
| 65 |
"""Raises an error if the Gemini API is not configured."""
|
| 66 |
if not self.model:
|
| 67 |
raise gr.Error("The Google Gemini API key is not configured (GEMINI_API_KEY).")
|
| 68 |
|
| 69 |
-
def
|
| 70 |
-
"""
|
| 71 |
-
try:
|
| 72 |
-
# Assuming the 'prompts' directory is in the root of the project
|
| 73 |
-
prompts_dir = Path(__file__).resolve().parent.parent / "prompts"
|
| 74 |
-
with open(prompts_dir / filename, "r", encoding="utf-8") as f:
|
| 75 |
-
return f.read()
|
| 76 |
-
except FileNotFoundError:
|
| 77 |
-
raise gr.Error(f"Prompt template file not found: prompts/{filename}")
|
| 78 |
-
|
| 79 |
-
def generate_storyboard(self, prompt: str, num_keyframes: int, ref_image_paths: list[str]) -> list[str]:
|
| 80 |
-
"""Delegated task: Acts as a Scriptwriter to generate a storyboard."""
|
| 81 |
self._check_model()
|
| 82 |
-
|
| 83 |
-
|
| 84 |
-
|
| 85 |
-
|
| 86 |
-
|
| 87 |
-
logger.info("Calling Gemini to generate storyboard...")
|
| 88 |
-
response = self.model.generate_content(model_contents)
|
| 89 |
-
logger.info(f"Gemini responded with (raw storyboard): {response.text}")
|
| 90 |
-
|
| 91 |
-
storyboard_data = robust_json_parser(response.text)
|
| 92 |
-
storyboard = storyboard_data.get("scene_storyboard", [])
|
| 93 |
-
if not storyboard or len(storyboard) != num_keyframes:
|
| 94 |
-
raise ValueError(f"Incorrect number of scenes generated. Expected {num_keyframes}, got {len(storyboard)}.")
|
| 95 |
-
return storyboard
|
| 96 |
-
except Exception as e:
|
| 97 |
-
raise gr.Error(f"The Scriptwriter (Gemini) failed: {e}")
|
| 98 |
|
| 99 |
-
def
|
| 100 |
-
"""
|
| 101 |
-
|
| 102 |
-
if not pool_image_paths:
|
| 103 |
-
raise gr.Error("The 'image pool' (Additional Images) is empty.")
|
| 104 |
-
|
| 105 |
-
try:
|
| 106 |
-
template = self._read_prompt_template("keyframe_selection_prompt.txt")
|
| 107 |
-
|
| 108 |
-
image_map = {f"IMG-{i+1}": path for i, path in enumerate(pool_image_paths)}
|
| 109 |
-
base_image_map = {f"BASE-{i+1}": path for i, path in enumerate(base_image_paths)}
|
| 110 |
-
|
| 111 |
-
model_contents = ["# Reference Images (Story Base)"]
|
| 112 |
-
for identifier, path in base_image_map.items():
|
| 113 |
-
model_contents.extend([f"Identifier: {identifier}", Image.open(path)])
|
| 114 |
-
|
| 115 |
-
model_contents.append("\n# Image Pool (Scene Bank)")
|
| 116 |
-
for identifier, path in image_map.items():
|
| 117 |
-
model_contents.extend([f"Identifier: {identifier}", Image.open(path)])
|
| 118 |
-
|
| 119 |
-
storyboard_str = "\n".join([f"- Scene {i+1}: {s}" for i, s in enumerate(storyboard)])
|
| 120 |
-
selection_prompt = template.format(storyboard_str=storyboard_str, image_identifiers=list(image_map.keys()))
|
| 121 |
-
model_contents.append(selection_prompt)
|
| 122 |
-
|
| 123 |
-
logger.info("Calling Gemini to select keyframes from pool...")
|
| 124 |
-
response = self.model.generate_content(model_contents)
|
| 125 |
-
logger.info(f"Gemini responded with (raw keyframe selection): {response.text}")
|
| 126 |
-
|
| 127 |
-
selection_data = robust_json_parser(response.text)
|
| 128 |
-
selected_identifiers = selection_data.get("selected_image_identifiers", [])
|
| 129 |
-
|
| 130 |
-
if len(selected_identifiers) != len(storyboard):
|
| 131 |
-
raise ValueError("The AI did not select the correct number of images for the scenes.")
|
| 132 |
-
|
| 133 |
-
selected_paths = [image_map[identifier] for identifier in selected_identifiers]
|
| 134 |
-
return selected_paths
|
| 135 |
|
| 136 |
-
|
| 137 |
-
|
| 138 |
|
| 139 |
-
|
| 140 |
-
|
| 141 |
-
|
| 142 |
try:
|
| 143 |
-
|
| 144 |
-
|
| 145 |
-
director_prompt = template.format(
|
| 146 |
-
historico_prompt=scene_history,
|
| 147 |
-
cena_atual=current_scene_desc,
|
| 148 |
-
cena_futura=future_scene_desc
|
| 149 |
-
)
|
| 150 |
-
|
| 151 |
-
model_contents = [
|
| 152 |
-
"# CONTEXT:",
|
| 153 |
-
f"- Global Story Goal: {global_prompt}",
|
| 154 |
-
"# VISUAL ASSETS:",
|
| 155 |
-
"Current Base Image [IMG-BASE]:",
|
| 156 |
-
Image.open(last_image_path)
|
| 157 |
-
]
|
| 158 |
-
|
| 159 |
-
ref_counter = 1
|
| 160 |
-
for path in fixed_ref_paths:
|
| 161 |
-
if path != last_image_path:
|
| 162 |
-
model_contents.extend([f"General Reference Image [IMG-REF-{ref_counter}]:", Image.open(path)])
|
| 163 |
-
ref_counter += 1
|
| 164 |
-
|
| 165 |
-
model_contents.append(director_prompt)
|
| 166 |
-
|
| 167 |
-
logger.info("Calling Gemini to generate anticipatory keyframe prompt...")
|
| 168 |
-
response = self.model.generate_content(model_contents)
|
| 169 |
-
logger.info(f"Gemini responded with (raw keyframe prompt): {response.text}")
|
| 170 |
-
|
| 171 |
-
final_flux_prompt = response.text.strip().replace("`", "").replace("\"", "")
|
| 172 |
-
return final_flux_prompt
|
| 173 |
except Exception as e:
|
| 174 |
-
|
|
|
|
| 175 |
|
| 176 |
-
def
|
| 177 |
-
past_keyframe_path: str, present_keyframe_path: str, future_keyframe_path: str,
|
| 178 |
-
past_scene_desc: str, present_scene_desc: str, future_scene_desc: str) -> dict:
|
| 179 |
"""
|
| 180 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 181 |
"""
|
| 182 |
-
self._check_model()
|
| 183 |
try:
|
| 184 |
-
|
| 185 |
-
|
| 186 |
-
global_prompt=global_prompt,
|
| 187 |
-
story_history=story_history,
|
| 188 |
-
past_scene_desc=past_scene_desc,
|
| 189 |
-
present_scene_desc=present_scene_desc,
|
| 190 |
-
future_scene_desc=future_scene_desc
|
| 191 |
-
)
|
| 192 |
-
|
| 193 |
-
model_contents = [
|
| 194 |
-
prompt_text,
|
| 195 |
-
"[PAST_IMAGE]:", Image.open(past_keyframe_path),
|
| 196 |
-
"[PRESENT_IMAGE]:", Image.open(present_keyframe_path),
|
| 197 |
-
"[FUTURE_IMAGE]:", Image.open(future_keyframe_path)
|
| 198 |
-
]
|
| 199 |
-
|
| 200 |
-
logger.info("Calling Gemini to generate cinematic decision...")
|
| 201 |
-
response = self.model.generate_content(model_contents)
|
| 202 |
-
logger.info(f"Gemini responded with (raw cinematic decision): {response.text}")
|
| 203 |
-
|
| 204 |
-
decision_data = robust_json_parser(response.text)
|
| 205 |
-
if "transition_type" not in decision_data or "motion_prompt" not in decision_data:
|
| 206 |
-
raise ValueError("AI response (Cinematographer) is malformed. Missing 'transition_type' or 'motion_prompt'.")
|
| 207 |
-
return decision_data
|
| 208 |
except Exception as e:
|
| 209 |
-
logger.error(f"
|
| 210 |
-
|
| 211 |
-
"transition_type": "continuous",
|
| 212 |
-
"motion_prompt": f"A smooth, continuous cinematic transition from '{present_scene_desc}' to '{future_scene_desc}'."
|
| 213 |
-
}
|
| 214 |
|
| 215 |
# --- Singleton Instance ---
|
| 216 |
-
gemini_manager_singleton = GeminiManager()
|
|
|
|
| 2 |
#
|
| 3 |
# Copyright (C) August 4, 2025 Carlos Rodrigues dos Santos
|
| 4 |
#
|
| 5 |
+
# Version: 1.1.1
|
| 6 |
#
|
| 7 |
+
# This file defines the GeminiManager, a specialist responsible for raw communication
|
| 8 |
+
# with the Google Gemini API. It acts as a lean API client, handling requests,
|
| 9 |
+
# parsing responses, and managing API-level errors. It does not contain any
|
| 10 |
+
# high-level prompt engineering or creative logic.
|
| 11 |
|
| 12 |
import os
|
| 13 |
import logging
|
|
|
|
| 17 |
from PIL import Image
|
| 18 |
import google.generativeai as genai
|
| 19 |
import re
|
| 20 |
+
from typing import List, Union, Any
|
| 21 |
|
| 22 |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
| 23 |
logger = logging.getLogger(__name__)
|
|
|
|
| 29 |
"""
|
| 30 |
clean_text = raw_text.strip()
|
| 31 |
try:
|
|
|
|
| 32 |
match = re.search(r'```json\s*(\{.*?\})\s*```', clean_text, re.DOTALL)
|
| 33 |
if match:
|
| 34 |
json_str = match.group(1)
|
| 35 |
return json.loads(json_str)
|
| 36 |
|
|
|
|
| 37 |
start_index = clean_text.find('{')
|
| 38 |
end_index = clean_text.rfind('}')
|
| 39 |
if start_index != -1 and end_index != -1 and end_index > start_index:
|
|
|
|
| 47 |
|
| 48 |
class GeminiManager:
    """
    Manages raw interactions with the Google Gemini API.

    The manager reads the API key from the ``GEMINI_API_KEY`` environment
    variable. When the key is absent, ``self.model`` is ``None`` and every
    request method raises ``gr.Error``.
    """

    def __init__(self):
        self.api_key = os.environ.get("GEMINI_API_KEY")
        if self.api_key:
            genai.configure(api_key=self.api_key)
            self.model = genai.GenerativeModel('gemini-1.5-pro-latest')
            logger.info("GeminiManager (Communication Layer) initialized successfully.")
        else:
            # No key: keep the object usable for import-time construction, but
            # mark it as disabled so _check_model() can reject requests later.
            self.model = None
            logger.warning("Gemini API key not found. GeminiManager disabled.")

    def _check_model(self):
        """Raises an error if the Gemini API is not configured."""
        if not self.model:
            raise gr.Error("The Google Gemini API key is not configured (GEMINI_API_KEY).")

    def _generate_content(self, prompt_parts: List[Any]) -> str:
        """
        Internal method to make the API call.

        Args:
            prompt_parts (List[Any]): The parts forwarded unchanged to
                ``self.model.generate_content``.

        Returns:
            str: The ``text`` attribute of the API response.

        Raises:
            gr.Error: If the model is not configured (see ``_check_model``).
        """
        self._check_model()
        logger.info("Calling Gemini API...")
        response = self.model.generate_content(prompt_parts)
        # Read `.text` exactly once: it is an SDK accessor, and reading it a
        # second time only for logging would repeat whatever work (or error)
        # the accessor performs. NOTE(review): confirm accessor semantics
        # against the google-generativeai documentation.
        raw_text = response.text
        logger.info("Gemini responded with raw text: %s", raw_text)
        return raw_text

    def get_raw_text(self, prompt_parts: List[Any]) -> str:
        """
        Sends a prompt to the Gemini API and returns the raw text response.

        Args:
            prompt_parts (List[Any]): A list containing strings and/or PIL.Image objects.

        Returns:
            str: The raw string response from the API.

        Raises:
            gr.Error: If the API key is not configured, or if the API call
                fails for any other reason (the original exception is chained
                as the cause).
        """
        try:
            return self._generate_content(prompt_parts)
        except gr.Error:
            # Already a user-facing error (e.g. missing API key from
            # _check_model): propagate as-is instead of logging it as a
            # communication failure and wrapping it in a second gr.Error.
            raise
        except Exception as e:
            logger.error("Gemini API call failed: %s", e, exc_info=True)
            raise gr.Error(f"Gemini API communication failed: {e}") from e

    def get_json_object(self, prompt_parts: List[Any]) -> dict:
        """
        Sends a prompt to the Gemini API, expects a JSON response, parses it, and returns a dictionary.

        Args:
            prompt_parts (List[Any]): A list containing strings and/or PIL.Image objects.

        Returns:
            dict: The parsed JSON object from the API response, as produced by
                ``robust_json_parser``.

        Raises:
            gr.Error: If the API key is not configured, or if the API call or
                the JSON parsing fails (the original exception is chained as
                the cause).
        """
        try:
            raw_response = self._generate_content(prompt_parts)
            return robust_json_parser(raw_response)
        except gr.Error:
            # Keep configuration errors distinct from communication/parsing
            # failures; do not re-wrap them.
            raise
        except Exception as e:
            logger.error("Gemini API call or JSON parsing failed: %s", e, exc_info=True)
            raise gr.Error(f"Gemini API communication or response parsing failed: {e}") from e
|
|
|
|
|
|
|
|
|
|
| 107 |
|
| 108 |
# --- Singleton Instance ---
|
| 109 |
+
gemini_manager_singleton = GeminiManager()
|