Update managers/gemini_manager.py
Browse files- managers/gemini_manager.py +58 -100
managers/gemini_manager.py
CHANGED
|
@@ -1,13 +1,13 @@
|
|
| 1 |
# managers/gemini_manager.py
|
| 2 |
-
# Copyright (C) 4 de Agosto de 2025 Carlos Rodrigues dos Santos
|
| 3 |
#
|
| 4 |
-
#
|
| 5 |
-
# sob os termos da Licença Pública Geral Affero GNU como publicada pela
|
| 6 |
-
# Free Software Foundation, seja a versão 3 da Licença, ou
|
| 7 |
-
# (a seu critério) qualquer versão posterior.
|
| 8 |
#
|
| 9 |
-
#
|
| 10 |
-
#
|
|
|
|
|
|
|
|
|
|
|
|
|
| 11 |
|
| 12 |
import os
|
| 13 |
import logging
|
|
@@ -21,70 +21,85 @@ logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(
|
|
| 21 |
logger = logging.getLogger(__name__)
|
| 22 |
|
| 23 |
def robust_json_parser(raw_text: str) -> dict:
|
|
|
|
|
|
|
|
|
|
|
|
|
| 24 |
clean_text = raw_text.strip()
|
| 25 |
try:
|
| 26 |
-
#
|
| 27 |
match = re.search(r'```json\s*(\{.*?\})\s*```', clean_text, re.DOTALL)
|
| 28 |
if match:
|
| 29 |
json_str = match.group(1)
|
| 30 |
return json.loads(json_str)
|
| 31 |
|
| 32 |
-
#
|
| 33 |
start_index = clean_text.find('{')
|
| 34 |
end_index = clean_text.rfind('}')
|
| 35 |
if start_index != -1 and end_index != -1 and end_index > start_index:
|
| 36 |
json_str = clean_text[start_index : end_index + 1]
|
| 37 |
return json.loads(json_str)
|
| 38 |
else:
|
| 39 |
-
raise ValueError("
|
| 40 |
except json.JSONDecodeError as e:
|
| 41 |
-
logger.error(f"
|
| 42 |
-
raise ValueError(f"
|
| 43 |
-
|
| 44 |
-
class
|
|
|
|
|
|
|
|
|
|
|
|
|
| 45 |
def __init__(self):
|
| 46 |
self.api_key = os.environ.get("GEMINI_API_KEY")
|
| 47 |
if self.api_key:
|
| 48 |
genai.configure(api_key=self.api_key)
|
| 49 |
-
|
| 50 |
-
|
| 51 |
-
logger.info("Especialista Gemini (1.5 Pro) inicializado com sucesso.")
|
| 52 |
else:
|
| 53 |
self.model = None
|
| 54 |
-
logger.warning("
|
| 55 |
|
| 56 |
def _check_model(self):
|
|
|
|
| 57 |
if not self.model:
|
| 58 |
-
raise gr.Error("
|
| 59 |
|
| 60 |
def _read_prompt_template(self, filename: str) -> str:
|
|
|
|
| 61 |
try:
|
| 62 |
-
|
|
|
|
|
|
|
| 63 |
return f.read()
|
| 64 |
except FileNotFoundError:
|
| 65 |
-
raise gr.Error(f"
|
| 66 |
|
| 67 |
def generate_storyboard(self, prompt: str, num_keyframes: int, ref_image_paths: list[str]) -> list[str]:
|
|
|
|
| 68 |
self._check_model()
|
| 69 |
try:
|
| 70 |
template = self._read_prompt_template("unified_storyboard_prompt.txt")
|
| 71 |
storyboard_prompt = template.format(user_prompt=prompt, num_fragments=num_keyframes)
|
| 72 |
model_contents = [storyboard_prompt] + [Image.open(p) for p in ref_image_paths]
|
| 73 |
-
response = self.model.generate_content(model_contents)
|
| 74 |
|
| 75 |
-
logger.info(
|
|
|
|
|
|
|
| 76 |
|
| 77 |
storyboard_data = robust_json_parser(response.text)
|
| 78 |
storyboard = storyboard_data.get("scene_storyboard", [])
|
| 79 |
-
if not storyboard or len(storyboard) != num_keyframes:
|
|
|
|
| 80 |
return storyboard
|
| 81 |
except Exception as e:
|
| 82 |
-
raise gr.Error(f"
|
| 83 |
|
| 84 |
def select_keyframes_from_pool(self, storyboard: list, base_image_paths: list[str], pool_image_paths: list[str]) -> list[str]:
|
|
|
|
| 85 |
self._check_model()
|
| 86 |
if not pool_image_paths:
|
| 87 |
-
raise gr.Error("
|
| 88 |
|
| 89 |
try:
|
| 90 |
template = self._read_prompt_template("keyframe_selection_prompt.txt")
|
|
@@ -104,23 +119,24 @@ class GeminiSingleton:
|
|
| 104 |
selection_prompt = template.format(storyboard_str=storyboard_str, image_identifiers=list(image_map.keys()))
|
| 105 |
model_contents.append(selection_prompt)
|
| 106 |
|
|
|
|
| 107 |
response = self.model.generate_content(model_contents)
|
| 108 |
-
|
| 109 |
-
logger.info(f"--- RESPOSTA COMPLETA DO GEMINI (select_keyframes_from_pool) ---\n{response.text}\n--------------------")
|
| 110 |
|
| 111 |
selection_data = robust_json_parser(response.text)
|
| 112 |
selected_identifiers = selection_data.get("selected_image_identifiers", [])
|
| 113 |
|
| 114 |
if len(selected_identifiers) != len(storyboard):
|
| 115 |
-
raise ValueError("
|
| 116 |
|
| 117 |
selected_paths = [image_map[identifier] for identifier in selected_identifiers]
|
| 118 |
return selected_paths
|
| 119 |
|
| 120 |
except Exception as e:
|
| 121 |
-
raise gr.Error(f"
|
| 122 |
|
| 123 |
def get_anticipatory_keyframe_prompt(self, global_prompt: str, scene_history: str, current_scene_desc: str, future_scene_desc: str, last_image_path: str, fixed_ref_paths: list[str]) -> str:
|
|
|
|
| 124 |
self._check_model()
|
| 125 |
try:
|
| 126 |
template = self._read_prompt_template("anticipatory_keyframe_prompt.txt")
|
|
@@ -132,7 +148,7 @@ class GeminiSingleton:
|
|
| 132 |
)
|
| 133 |
|
| 134 |
model_contents = [
|
| 135 |
-
"#
|
| 136 |
f"- Global Story Goal: {global_prompt}",
|
| 137 |
"# VISUAL ASSETS:",
|
| 138 |
"Current Base Image [IMG-BASE]:",
|
|
@@ -147,42 +163,20 @@ class GeminiSingleton:
|
|
| 147 |
|
| 148 |
model_contents.append(director_prompt)
|
| 149 |
|
|
|
|
| 150 |
response = self.model.generate_content(model_contents)
|
| 151 |
-
|
| 152 |
-
logger.info(f"--- RESPOSTA COMPLETA DO GEMINI (get_anticipatory_keyframe_prompt) ---\n{response.text}\n--------------------")
|
| 153 |
|
| 154 |
-
final_flux_prompt = response.text.strip()
|
| 155 |
return final_flux_prompt
|
| 156 |
except Exception as e:
|
| 157 |
-
raise gr.Error(f"
|
| 158 |
-
|
| 159 |
-
def get_initial_motion_prompt(self, user_prompt: str, start_image_path: str, destination_image_path: str, dest_scene_desc: str) -> str:
|
| 160 |
-
"""Gera o prompt de movimento para a PRIMEIRA transição, que não tem um 'passado'."""
|
| 161 |
-
self._check_model()
|
| 162 |
-
try:
|
| 163 |
-
template = self._read_prompt_template("initial_motion_prompt.txt")
|
| 164 |
-
prompt_text = template.format(user_prompt=user_prompt, destination_scene_description=dest_scene_desc)
|
| 165 |
-
model_contents = [
|
| 166 |
-
prompt_text,
|
| 167 |
-
"START Image:",
|
| 168 |
-
Image.open(start_image_path),
|
| 169 |
-
"DESTINATION Image:",
|
| 170 |
-
Image.open(destination_image_path)
|
| 171 |
-
]
|
| 172 |
-
response = self.model.generate_content(model_contents)
|
| 173 |
-
|
| 174 |
-
logger.info(f"--- RESPOSTA COMPLETA DO GEMINI (get_initial_motion_prompt) ---\n{response.text}\n--------------------")
|
| 175 |
-
|
| 176 |
-
return response.text.strip()
|
| 177 |
-
except Exception as e:
|
| 178 |
-
raise gr.Error(f"O Cineasta Inicial (Gemini) falhou: {e}")
|
| 179 |
|
| 180 |
def get_cinematic_decision(self, global_prompt: str, story_history: str,
|
| 181 |
past_keyframe_path: str, present_keyframe_path: str, future_keyframe_path: str,
|
| 182 |
past_scene_desc: str, present_scene_desc: str, future_scene_desc: str) -> dict:
|
| 183 |
"""
|
| 184 |
-
|
| 185 |
-
de edição e gerar prompts de movimento detalhados.
|
| 186 |
"""
|
| 187 |
self._check_model()
|
| 188 |
try:
|
|
@@ -202,56 +196,20 @@ class GeminiSingleton:
|
|
| 202 |
"[FUTURE_IMAGE]:", Image.open(future_keyframe_path)
|
| 203 |
]
|
| 204 |
|
|
|
|
| 205 |
response = self.model.generate_content(model_contents)
|
| 206 |
-
|
| 207 |
-
logger.info(f"--- RESPOSTA COMPLETA DO GEMINI (get_cinematic_decision) ---\n{response.text}\n--------------------")
|
| 208 |
|
| 209 |
decision_data = robust_json_parser(response.text)
|
| 210 |
if "transition_type" not in decision_data or "motion_prompt" not in decision_data:
|
| 211 |
-
raise ValueError("
|
| 212 |
return decision_data
|
| 213 |
except Exception as e:
|
| 214 |
-
|
| 215 |
-
logger.error(f"O Diretor de Cinema (Gemini) falhou: {e}. Usando fallback para 'continuous'.")
|
| 216 |
return {
|
| 217 |
"transition_type": "continuous",
|
| 218 |
"motion_prompt": f"A smooth, continuous cinematic transition from '{present_scene_desc}' to '{future_scene_desc}'."
|
| 219 |
}
|
| 220 |
-
|
| 221 |
-
|
| 222 |
-
|
| 223 |
-
def get_sound_director_prompt(self, audio_history: str,
|
| 224 |
-
past_keyframe_path: str, present_keyframe_path: str, future_keyframe_path: str,
|
| 225 |
-
present_scene_desc: str, motion_prompt: str, future_scene_desc: str) -> str:
|
| 226 |
-
"""
|
| 227 |
-
Atua como um 'Diretor de Som', analisando o contexto completo para criar um prompt
|
| 228 |
-
de áudio imersivo e contínuo para a cena atual.
|
| 229 |
-
"""
|
| 230 |
-
self._check_model()
|
| 231 |
-
try:
|
| 232 |
-
template = self._read_prompt_template("sound_director_prompt.txt")
|
| 233 |
-
prompt_text = template.format(
|
| 234 |
-
audio_history=audio_history,
|
| 235 |
-
present_scene_desc=present_scene_desc,
|
| 236 |
-
motion_prompt=motion_prompt,
|
| 237 |
-
future_scene_desc=future_scene_desc
|
| 238 |
-
)
|
| 239 |
-
|
| 240 |
-
model_contents = [
|
| 241 |
-
prompt_text,
|
| 242 |
-
"[PAST_IMAGE]:", Image.open(past_keyframe_path),
|
| 243 |
-
"[PRESENT_IMAGE]:", Image.open(present_keyframe_path),
|
| 244 |
-
"[FUTURE_IMAGE]:", Image.open(future_keyframe_path)
|
| 245 |
-
]
|
| 246 |
-
|
| 247 |
-
response = self.model.generate_content(model_contents)
|
| 248 |
-
|
| 249 |
-
logger.info(f"--- RESPOSTA COMPLETA DO GEMINI (get_sound_director_prompt) ---\n{response.text}\n--------------------")
|
| 250 |
-
|
| 251 |
-
return response.text.strip()
|
| 252 |
-
except Exception as e:
|
| 253 |
-
logger.error(f"O Diretor de Som (Gemini) falhou: {e}. Usando fallback.")
|
| 254 |
-
return f"Sound effects matching the scene: {present_scene_desc}"
|
| 255 |
-
|
| 256 |
|
| 257 |
-
|
|
|
|
|
|
| 1 |
# managers/gemini_manager.py
|
|
|
|
| 2 |
#
|
| 3 |
+
# Copyright (C) August 4, 2025 Carlos Rodrigues dos Santos
|
|
|
|
|
|
|
|
|
|
| 4 |
#
|
| 5 |
+
# Version: 1.1.0
|
| 6 |
+
#
|
| 7 |
+
# This file defines the GeminiManager, a specialist responsible for all Natural
|
| 8 |
+
# Language Processing, reasoning, and vision-language tasks. It acts as the
|
| 9 |
+
# Scriptwriter, Editor, and Cinematic Director for the ADUC framework, generating
|
| 10 |
+
# storyboards, prompts, and making creative decisions.
|
| 11 |
|
| 12 |
import os
|
| 13 |
import logging
|
|
|
|
| 21 |
logger = logging.getLogger(__name__)
|
| 22 |
|
| 23 |
def robust_json_parser(raw_text: str) -> dict:
    """
    Parse a JSON object out of text that may carry extra content around it,
    such as the Markdown code fences an LLM often wraps its answer in.

    Two strategies are tried in order:
      1. the contents of a ```json ... ``` fenced block;
      2. the JSON value that starts at the first '{' in the text, ignoring
         whatever text follows that value.

    Args:
        raw_text: The raw text returned by the model.

    Returns:
        The decoded JSON object.

    Raises:
        ValueError: If the text has no '{' followed by a '}', or if the
            candidate text does not decode as valid JSON.
    """
    clean_text = raw_text.strip()

    # Strategy 1: JSON delimited by ```json ... ```
    fenced = re.search(r'```json\s*(\{.*?\})\s*```', clean_text, re.DOTALL)
    if fenced:
        try:
            return json.loads(fenced.group(1))
        except json.JSONDecodeError:
            # A malformed fenced block is not fatal: the brace scan below
            # gets a chance before the response is rejected.
            pass

    # Strategy 2: decode the value that begins at the first '{'.
    start_index = clean_text.find('{')
    if start_index == -1 or clean_text.rfind('}') < start_index:
        raise ValueError("No valid JSON object could be found in the AI's response.")

    try:
        # raw_decode stops at the end of the first complete JSON value, so
        # trailing commentary (even text containing '}') no longer breaks
        # the parse the way a first-'{' .. last-'}' slice does.
        parsed, _ = json.JSONDecoder().raw_decode(clean_text, start_index)
        return parsed
    except json.JSONDecodeError as e:
        logger.error(f"Failed to decode JSON. The AI returned the following text:\n---\n{raw_text}\n---")
        raise ValueError(f"The AI returned an invalid JSON format: {e}") from e
|
| 47 |
+
|
| 48 |
+
class GeminiManager:
|
| 49 |
+
"""
|
| 50 |
+
Manages interactions with the Google Gemini API, acting as the primary
|
| 51 |
+
reasoning and language specialist for the ADUC framework.
|
| 52 |
+
"""
|
| 53 |
def __init__(self):
    """
    Read GEMINI_API_KEY from the environment and, when it is set, configure
    the Gemini client and create the generative model. Without a key the
    manager stays disabled (``self.model`` is None).
    """
    key = os.environ.get("GEMINI_API_KEY")
    self.api_key = key

    if not key:
        # No credentials: leave the specialist switched off.
        self.model = None
        logger.warning("Gemini API key not found. Specialist disabled.")
        return

    genai.configure(api_key=key)
    self.model = genai.GenerativeModel('gemini-1.5-pro-latest')
    logger.info("Gemini Specialist (1.5 Pro) initialized successfully.")
|
| 62 |
|
| 63 |
def _check_model(self):
|
| 64 |
+
"""Raises an error if the Gemini API is not configured."""
|
| 65 |
if not self.model:
|
| 66 |
+
raise gr.Error("The Google Gemini API key is not configured (GEMINI_API_KEY).")
|
| 67 |
|
| 68 |
def _read_prompt_template(self, filename: str) -> str:
    """
    Return the UTF-8 text of a prompt template from the 'prompts' directory.

    Raises:
        gr.Error: If the template file does not exist.
    """
    # The 'prompts' directory is looked up two levels above this file.
    template_path = Path(__file__).resolve().parent.parent / "prompts" / filename
    try:
        return template_path.read_text(encoding="utf-8")
    except FileNotFoundError:
        raise gr.Error(f"Prompt template file not found: prompts/{filename}")
|
| 77 |
|
| 78 |
def generate_storyboard(self, prompt: str, num_keyframes: int, ref_image_paths: list[str]) -> list[str]:
    """
    Delegated task: Acts as a Scriptwriter to generate a storyboard.

    Fills the "unified_storyboard_prompt.txt" template with the user's prompt
    and the requested number of scenes, sends it to Gemini together with the
    reference images, and reads the "scene_storyboard" entry of the JSON reply.

    Args:
        prompt: The user's story prompt, inserted as ``user_prompt``.
        num_keyframes: Number of scenes requested, inserted as ``num_fragments``.
        ref_image_paths: Paths of the reference images sent with the prompt.

    Returns:
        The value stored under "scene_storyboard", with exactly
        ``num_keyframes`` entries.

    Raises:
        gr.Error: If the model is not configured, or if any step fails
            (missing template, API error, unparsable reply, or a wrong
            number of scenes).
    """
    # Local import keeps this method self-contained.
    from contextlib import ExitStack

    self._check_model()
    try:
        template = self._read_prompt_template("unified_storyboard_prompt.txt")
        storyboard_prompt = template.format(user_prompt=prompt, num_fragments=num_keyframes)

        # The images only need to stay open for the duration of the request;
        # the ExitStack closes every file handle afterwards, even on failure.
        with ExitStack() as open_images:
            model_contents = [storyboard_prompt] + [
                open_images.enter_context(Image.open(p)) for p in ref_image_paths
            ]

            logger.info("Calling Gemini to generate storyboard...")
            response = self.model.generate_content(model_contents)

        logger.info(f"Gemini responded with (raw storyboard): {response.text}")

        storyboard_data = robust_json_parser(response.text)
        storyboard = storyboard_data.get("scene_storyboard", [])
        if not storyboard or len(storyboard) != num_keyframes:
            raise ValueError(f"Incorrect number of scenes generated. Expected {num_keyframes}, got {len(storyboard)}.")
        return storyboard
    except Exception as e:
        # Chain the cause so the original traceback is not lost.
        raise gr.Error(f"The Scriptwriter (Gemini) failed: {e}") from e
|
| 97 |
|
| 98 |
def select_keyframes_from_pool(self, storyboard: list, base_image_paths: list[str], pool_image_paths: list[str]) -> list[str]:
|
| 99 |
+
"""Delegated task: Acts as a Photographer/Editor to select keyframes."""
|
| 100 |
self._check_model()
|
| 101 |
if not pool_image_paths:
|
| 102 |
+
raise gr.Error("The 'image pool' (Additional Images) is empty.")
|
| 103 |
|
| 104 |
try:
|
| 105 |
template = self._read_prompt_template("keyframe_selection_prompt.txt")
|
|
|
|
| 119 |
selection_prompt = template.format(storyboard_str=storyboard_str, image_identifiers=list(image_map.keys()))
|
| 120 |
model_contents.append(selection_prompt)
|
| 121 |
|
| 122 |
+
logger.info("Calling Gemini to select keyframes from pool...")
|
| 123 |
response = self.model.generate_content(model_contents)
|
| 124 |
+
logger.info(f"Gemini responded with (raw keyframe selection): {response.text}")
|
|
|
|
| 125 |
|
| 126 |
selection_data = robust_json_parser(response.text)
|
| 127 |
selected_identifiers = selection_data.get("selected_image_identifiers", [])
|
| 128 |
|
| 129 |
if len(selected_identifiers) != len(storyboard):
|
| 130 |
+
raise ValueError("The AI did not select the correct number of images for the scenes.")
|
| 131 |
|
| 132 |
selected_paths = [image_map[identifier] for identifier in selected_identifiers]
|
| 133 |
return selected_paths
|
| 134 |
|
| 135 |
except Exception as e:
|
| 136 |
+
raise gr.Error(f"The Photographer (Gemini) failed to select images: {e}")
|
| 137 |
|
| 138 |
def get_anticipatory_keyframe_prompt(self, global_prompt: str, scene_history: str, current_scene_desc: str, future_scene_desc: str, last_image_path: str, fixed_ref_paths: list[str]) -> str:
|
| 139 |
+
"""Delegated task: Acts as an Art Director to generate an image prompt."""
|
| 140 |
self._check_model()
|
| 141 |
try:
|
| 142 |
template = self._read_prompt_template("anticipatory_keyframe_prompt.txt")
|
|
|
|
| 148 |
)
|
| 149 |
|
| 150 |
model_contents = [
|
| 151 |
+
"# CONTEXT:",
|
| 152 |
f"- Global Story Goal: {global_prompt}",
|
| 153 |
"# VISUAL ASSETS:",
|
| 154 |
"Current Base Image [IMG-BASE]:",
|
|
|
|
| 163 |
|
| 164 |
model_contents.append(director_prompt)
|
| 165 |
|
| 166 |
+
logger.info("Calling Gemini to generate anticipatory keyframe prompt...")
|
| 167 |
response = self.model.generate_content(model_contents)
|
| 168 |
+
logger.info(f"Gemini responded with (raw keyframe prompt): {response.text}")
|
|
|
|
| 169 |
|
| 170 |
+
final_flux_prompt = response.text.strip().replace("`", "").replace("\"", "")
|
| 171 |
return final_flux_prompt
|
| 172 |
except Exception as e:
|
| 173 |
+
raise gr.Error(f"The Art Director (Gemini) failed: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 174 |
|
| 175 |
def get_cinematic_decision(self, global_prompt: str, story_history: str,
|
| 176 |
past_keyframe_path: str, present_keyframe_path: str, future_keyframe_path: str,
|
| 177 |
past_scene_desc: str, present_scene_desc: str, future_scene_desc: str) -> dict:
|
| 178 |
"""
|
| 179 |
+
Delegated task: Acts as a Film Director to make editing decisions and generate motion prompts.
|
|
|
|
| 180 |
"""
|
| 181 |
self._check_model()
|
| 182 |
try:
|
|
|
|
| 196 |
"[FUTURE_IMAGE]:", Image.open(future_keyframe_path)
|
| 197 |
]
|
| 198 |
|
| 199 |
+
logger.info("Calling Gemini to generate cinematic decision...")
|
| 200 |
response = self.model.generate_content(model_contents)
|
| 201 |
+
logger.info(f"Gemini responded with (raw cinematic decision): {response.text}")
|
|
|
|
| 202 |
|
| 203 |
decision_data = robust_json_parser(response.text)
|
| 204 |
if "transition_type" not in decision_data or "motion_prompt" not in decision_data:
|
| 205 |
+
raise ValueError("AI response (Cinematographer) is malformed. Missing 'transition_type' or 'motion_prompt'.")
|
| 206 |
return decision_data
|
| 207 |
except Exception as e:
|
| 208 |
+
logger.error(f"The Film Director (Gemini) failed: {e}. Using fallback to 'continuous'.")
|
|
|
|
| 209 |
return {
|
| 210 |
"transition_type": "continuous",
|
| 211 |
"motion_prompt": f"A smooth, continuous cinematic transition from '{present_scene_desc}' to '{future_scene_desc}'."
|
| 212 |
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 213 |
|
| 214 |
+
# --- Singleton instance: built once at import time; reads GEMINI_API_KEY from the environment ---
|
| 215 |
+
gemini_manager_singleton = GeminiManager()
|