Spaces:
Running
Running
import json
import os
import re
from typing import Any, Optional

from smolagents import MultiStepAgent, PythonInterpreterTool
from smolagents.agents import ActionOutput
from smolagents.memory import ToolCall
from smolagents.utils import AgentToolExecutionError

from models import ModelManager
from tools import search_web, scrape_website, read_file, get_youtube_transcript
class MonAgent(MultiStepAgent):
    """Multi-step agent implementing every required and optional hook of
    ``MultiStepAgent`` so no "missing implementation" error can occur.

    The agent runs a plan/act loop: the model is asked for tool calls, each
    call is executed, the observation is written back to memory, and the loop
    ends when the model emits a final answer.
    """

    def initialize_system_prompt(self) -> str:
        """Return the system prompt sent to the model.

        NOTE: the prompt text is runtime behavior — do not rephrase casually.
        """
        return """You are a hyper-efficient autonomous agent. Your mission is to answer the user's question with surgical precision and no wasted steps.
**CRITICAL DIRECTIVES - YOU MUST OBEY:**
1. **NO LOOPS:** Before every action, you MUST review your previous steps. NEVER repeat a `search_web` call with similar keywords. If a search yields no result, formulate a COMPLETELY DIFFERENT query or use a different tool. Repeating actions is a critical failure.
2. **IMMEDIATE ACTION:** If a search result provides a promising URL, your IMMEDIATE next action MUST be to use `scrape_website` on that URL. DO NOT perform another web search.
3. **PYTHON FOR LOGIC:** For any task that involves logic, data processing, sorting, strict filtering (like botanical vs. culinary classification), or calculations, your FIRST AND ONLY choice is the `PythonInterpreterTool`. It is more reliable than your own reasoning for these tasks. You MUST use it for such problems.
4. **FINAL ANSWER IS ONLY THE ANSWER:** Your final answer must be ONLY the value requested. No "The answer is...", no explanations, no context.
- If asked "Who nominated the article?", the final answer is "Nimbus".
- If asked "What is the opposite of left?", the final answer is "right".
- If asked for a list, the final answer is "item1, item2, item3".
**RESPONSE FORMAT:**
You MUST respond with a valid JSON object.
- To act: {"plan": [{"tool": "tool_name", "args": {"arg_name": "value"}}]}
- To provide the final answer: {"plan": [], "final_answer": "TheExactFinalAnswer"}
"""

    def _step_stream(self, memory_step):
        """Core of the agent: decide on and execute the next action(s).

        Yields the raw tool-call objects (for streaming consumers), one
        observation dict per executed tool, and finally an ``ActionOutput``
        whose ``is_final_answer`` flag tells the run loop whether to stop.
        """
        memory_messages = self.write_memory_to_messages()
        memory_step.model_input_messages = memory_messages
        try:
            chat_message = self.model.generate(
                memory_messages,
                tools_to_call_from=list(self.tools.values()),
            )
            memory_step.model_output_message = chat_message
            memory_step.token_usage = chat_message.token_usage
        except Exception as e:
            # Chain the original exception so the real traceback is kept.
            raise Exception(f"Erreur lors de la génération du modèle : {e}") from e
        chat_tool_calls = chat_message.tool_calls
        if not chat_tool_calls:
            # No tool call: the model's plain content is the final answer.
            yield ActionOutput(output=chat_message.content, is_final_answer=True)
            return
        # Convert ChatMessageToolCall objects into memory ToolCall records.
        tool_calls_for_memory = [
            ToolCall(name=tc.function.name, arguments=tc.function.arguments, id=tc.id)
            for tc in chat_tool_calls
        ]
        memory_step.tool_calls = tool_calls_for_memory
        final_answer = None
        is_final = False
        for tool_call in chat_tool_calls:
            # Keep yielding the original object for the stream.
            yield tool_call
            tool_name = tool_call.function.name
            tool_arguments = tool_call.function.arguments
            tool_output_value = self.execute_tool_call(tool_name, tool_arguments)
            if tool_name == "final_answer":
                final_answer = tool_output_value
                is_final = True
            observation = self.render_tool_result(tool_output_value)
            # Append this tool's observation to the step memory.
            if memory_step.observations is None:
                memory_step.observations = ""
            memory_step.observations += f"\nObservation de l'outil '{tool_name}':\n{observation}"
            yield {"tool_call_id": tool_call.id, "output": observation}
        yield ActionOutput(output=final_answer, is_final_answer=is_final)

    def execute_tool_call(self, tool_name: str, arguments: Any) -> Any:
        """Execute *tool_name* with *arguments*.

        Arguments may arrive either as a dict or as a JSON-encoded string;
        strings are decoded when possible and passed through verbatim
        otherwise.

        Raises:
            AgentToolExecutionError: if the tool is unknown or its call fails.
        """
        if tool_name not in self.tools:
            raise AgentToolExecutionError(f"Outil inconnu '{tool_name}'.", self.logger)
        tool = self.tools[tool_name]
        parsed_arguments = arguments
        if isinstance(parsed_arguments, str):
            try:
                # Try to decode the string as JSON.
                parsed_arguments = json.loads(parsed_arguments)
            except json.JSONDecodeError:
                # Not JSON: forward the raw string as-is.
                pass
        try:
            if isinstance(parsed_arguments, dict):
                return tool(**parsed_arguments)
            else:
                # Non-dict payloads become a single positional argument.
                return tool(parsed_arguments)
        except Exception as e:
            # Chain the cause so tool failures keep their original traceback.
            raise AgentToolExecutionError(f"Erreur lors de l'exécution de l'outil '{tool_name}' avec les arguments {arguments}: {type(e).__name__}: {e}", self.logger) from e

    def parse_plan(self, response: str) -> list[dict]:
        """Turn the model's raw text into a structured list of actions.

        Returns an empty list when the response cannot be parsed as JSON.
        """
        cleaned_response = response.strip().removeprefix("```json").removesuffix("```").strip()
        try:
            parsed_json = json.loads(cleaned_response)
            return parsed_json.get("plan", [])
        except json.JSONDecodeError:
            print(f"⚠️ Erreur de parsing JSON dans `parse_plan`. Réponse reçue:\n{response}")
            return []

    def render_tool_result(self, tool_output: Any) -> str:
        """Render a tool's output (any Python object) as plain text the model
        can consume in the next step."""
        print(f"⚙️ Formatage du résultat de l'outil: {str(tool_output)[:300]}...")
        if isinstance(tool_output, str):
            return tool_output
        if isinstance(tool_output, (list, dict)):
            try:
                return json.dumps(tool_output, indent=2, ensure_ascii=False)
            except TypeError:
                # Container holds non-serializable objects: fall back to str().
                return str(tool_output)
        return str(tool_output)

    def render_final_answer(self, final_context: dict, final_response: str) -> str:
        """Format the very last response: extract ``final_answer`` from a JSON
        payload when present, otherwise return the response unchanged."""
        cleaned_response = final_response.strip().removeprefix("```json").removesuffix("```").strip()
        try:
            parsed_json = json.loads(cleaned_response)
            return parsed_json.get("final_answer", final_response)
        except json.JSONDecodeError:
            return final_response
class BasicAgent:
    """Compatibility wrapper around :class:`MonAgent`.

    Also performs pre-analysis of multimodal inputs (file content, image
    URLs, YouTube URLs) and injects the resulting context into the prompt.
    """

    # File extensions treated as images when found in a metadata URL.
    _IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp')

    def __init__(self):
        """Build the tool list and the orchestrator agent.

        On any failure ``self.agent`` is left as ``None`` so that ``__call__``
        can report the problem instead of crashing.
        """
        print("Initialisation du BasicAgent...")
        try:
            if not os.getenv("HF_TOKEN"):
                print("⚠️ Attention: Le token Hugging Face (HF_TOKEN) n'est pas défini.")
            self.tools_list = [
                search_web,
                scrape_website,
                read_file,
                PythonInterpreterTool(),
                get_youtube_transcript,
            ]
            self.agent = MonAgent(
                model=ModelManager().get_orchestrator(),
                tools=self.tools_list,
            )
            print("BasicAgent initialisé avec succès")
        except Exception as e:
            print(f"❌ Erreur critique lors de l'initialisation: {e}")
            self.agent = None

    def _describe_image(self, url: str, question: str) -> str:
        """Ask the vision model to describe the image at *url*; return the
        description as context text, or an error message on failure."""
        print("🖼️ C'est une URL d'image, analyse avec le modèle de vision...")
        try:
            vision_model = ModelManager().get_vision_model()
            # The description is tailored to the question so the orchestrator
            # agent can answer without ever seeing the image itself.
            vision_response = vision_model.generate(
                [{"role": "user", "content": [
                    {"type": "text", "text": f"Describe this image in detail. This description will be used to answer the following question: '{question}'"},
                    {"type": "image_url", "image_url": {"url": url}}
                ]}]
            )
            print("✅ Description de l'image obtenue.")
            return f"Here is a detailed description of the image provided:\n{vision_response.content}"
        except Exception as e:
            error_msg = f"Erreur lors de l'analyse de l'image : {e}"
            print(f"❌ {error_msg}")
            return error_msg

    def _transcribe_video(self, url: str) -> str:
        """Fetch the YouTube transcript for *url* and format it as context."""
        print("📹 C'est une URL YouTube, récupération de la transcription...")
        transcript_result = get_youtube_transcript(url)
        # NOTE(review): assumes get_youtube_transcript returns a dict-like with
        # 'error' / 'transcript' keys — confirm against the tools module.
        if "error" in transcript_result:
            return f"Error getting transcript: {transcript_result['error']}"
        return f"Here is the transcript of the video:\n{transcript_result['transcript']}"

    def _context_from_url(self, url: str, question: str) -> str:
        """Build contextual text from a metadata URL (image or YouTube video).

        Returns an empty string when the URL type is not recognized.
        (Fixes the original duplicated image-extension check: the computed
        flag is now actually used.)
        """
        print(f"🔗 URL trouvée dans les métadonnées : {url}")
        is_image = url.lower().endswith(self._IMAGE_EXTENSIONS)
        if is_image:
            return self._describe_image(url, question)
        if url.startswith(("http://googleusercontent.com/youtube.com/", "http://youtube.com/")):
            return self._transcribe_video(url)
        return ""

    def __call__(self, question: str, metadata: Optional[dict] = None, file_content: Optional[str] = None) -> str:
        """Entry point: pre-analyze inputs via metadata, augment the prompt,
        then run the agent.

        Returns the agent's answer, or an error string if the agent could not
        be initialized or the run failed.
        """
        if self.agent is None:
            return "Erreur: L'agent n'a pas pu être initialisé."
        print(f"\n{'='*40}\n🤖 NOUVELLE QUESTION: {question}\n{'='*40}")
        if metadata:
            print(f"🕵️‍♂️ Métadonnées reçues: {metadata}")
        else:
            print("🕵️‍♂️ Aucune métadonnée reçue.")
        augmented_prompt = question
        if file_content:
            # Inline file content takes precedence over any metadata URL.
            print("📄 Contenu de fichier trouvé, ajout au contexte.")
            context_to_add = f"The user has provided the following file content:\n---\n{file_content}\n---\n"
            augmented_prompt = (
                f"CONTEXTUAL INFORMATION:\n{context_to_add}"
                f"Based on the context above, please answer the following question:\n{question}"
            )
            print("✨ Prompt augmenté pour l'agent.")
        elif metadata:
            url_from_meta = metadata.get("url") or metadata.get("video_url") or metadata.get("image_url")
            if url_from_meta:
                context_from_url = self._context_from_url(url_from_meta, question)
                if context_from_url:
                    augmented_prompt = (
                        f"CONTEXTUAL INFORMATION:\n---\n{context_from_url}\n---\n"
                        f"Based on the context above, please answer the following question:\n{question}"
                    )
                    print("✨ Prompt augmenté pour l'agent.")
        try:
            return self.agent.run(augmented_prompt)
        except Exception as e:
            import traceback
            print(f"❌ Erreur irrécupérable lors du traitement par MonAgent: {e}\n{traceback.format_exc()}")
            return f"Une erreur irrécupérable s'est produite: {e}"