from typing import List, Dict, Any, Optional

try:
    # Newer LC may or may not expose create_react_agent
    from langchain.agents import create_react_agent, initialize_agent, AgentType  # type: ignore
except Exception:  # pragma: no cover - be resilient to LC API changes
    from langchain.agents import initialize_agent, AgentType  # type: ignore
    create_react_agent = None  # type: ignore
from langchain_openai import ChatOpenAI
from langchain_community.chat_models import ChatOllama
from langchain_community.tools import Tool
from langchain_community.utilities import SerpAPIWrapper
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder, PromptTemplate
from pydantic import BaseModel, Field
from langchain_core.output_parsers import PydanticOutputParser
from langchain_huggingface import HuggingFaceEndpoint
from .maps_tool import GoogleMapsTool
from .medical_prompt import medical_system_prompt, medical_direct_prompt
from .cameroon_data import get_cameroon_data
from ..utils.config import settings
from app.ai_services import (
    analyze_image as svc_analyze_image,
    transcribe_audio as svc_transcribe_audio,
    chat_completion as svc_chat_completion,
    translate_text as svc_translate_text,
)
import logging
import base64
import io
import json
import mimetypes
import os
import tempfile
import requests
import secrets
import time

# Robust import for the OpenAI rate-limit error across SDK versions
try:
    from openai import RateLimitError  # Newer SDKs
except Exception:  # pragma: no cover - fallback for older SDKs
    try:
        from openai.error import RateLimitError  # Older SDKs
    except Exception:
        class RateLimitError(Exception):
            pass

logger = logging.getLogger(__name__)


# Pydantic model for structured ReAct output
class ReActOutput(BaseModel):
    thought: str = Field(description="Your internal reasoning about the query")
    action: str = Field(description="Name of the tool to use, or empty if none", default="")
    action_input: str = Field(description="Input for the tool, or empty if no tool", default="")
    observation: str = Field(description="Observation from tool, or empty if no tool used", default="")
    final_answer: str = Field(description="Final response to the user, or empty if continuing", default="")


def calculate_dosage(input_text: str) -> str:
    # Placeholder: to be completed according to real medical dosage rules
    return "Calcul de dosage basé sur: " + input_text


# ==========================
# Helper utilities for tools
# ==========================
MAX_IMAGE_CHARS = 6_000_000  # length cap (~6 MB) for images passed as data URIs
MAX_AUDIO_BYTES = 10 * 1024 * 1024  # 10 MB
MAX_FILE_BYTES = 2 * 1024 * 1024  # 2 MB


def _is_data_uri(s: str) -> bool:
    return isinstance(s, str) and s.startswith("data:")


def _looks_like_url(s: str) -> bool:
    return isinstance(s, str) and (s.startswith("http://") or s.startswith("https://"))


def _to_image_data_ref(value: str) -> str:
    """Return a URL or data URI suitable for OpenAI vision input.

    Raw base64 without a ``data:`` prefix is assumed to be PNG.
    """
    if not isinstance(value, str):
        raise ValueError("Image reference must be a string (URL or base64/data URI)")
    if _is_data_uri(value) or _looks_like_url(value):
        return value
    if value.startswith("attach://"):
        data = resolve_attachment(value)
        b64 = base64.b64encode(data).decode("utf-8")
        return f"data:image/png;base64,{b64}"
    # Assume raw base64
    return f"data:image/png;base64,{value}"


def _download_bytes(url: str, timeout: int = 15, max_bytes: int = MAX_AUDIO_BYTES) -> bytes:
    with requests.get(url, stream=True, timeout=timeout) as r:
        r.raise_for_status()
        data = io.BytesIO()
        total = 0
        for chunk in r.iter_content(chunk_size=8192):
            if not chunk:
                continue
            total += len(chunk)
            if total > max_bytes:
                raise ValueError("Downloaded content exceeds size limit")
            data.write(chunk)
        return data.getvalue()


def _decode_data_uri(data_uri: str) -> bytes:
    # Format: data:<mime>;base64,<payload>
    try:
        header, b64data = data_uri.split(',', 1)
        return base64.b64decode(b64data)
    except Exception:
        raise ValueError("Invalid data URI")


def _to_bytes_from_any(ref: str, max_bytes: int) -> bytes:
    # Handle the in-memory attach:// scheme first
    if isinstance(ref, str) and ref.startswith("attach://"):
        data = resolve_attachment(ref)
        if len(data) > max_bytes:
            raise ValueError("Content exceeds size limit")
        return data
    if _looks_like_url(ref):
        return _download_bytes(ref, max_bytes=max_bytes)
    if _is_data_uri(ref):
        data = _decode_data_uri(ref)
        if len(data) > max_bytes:
            raise ValueError("Content exceeds size limit")
        return data
    # Assume raw base64
    data = base64.b64decode(ref)
    if len(data) > max_bytes:
        raise ValueError("Content exceeds size limit")
    return data


# ==========================
# In-memory attachment registry
# ==========================
ATTACHMENT_STORE: Dict[str, Dict[str, Any]] = {}


def register_attachment(data: bytes, filename: str | None = None, mime: str | None = None) -> str:
    """Store bytes in a temp registry and return an attach:// token URI."""
    token = secrets.token_urlsafe(16)
    ATTACHMENT_STORE[token] = {"data": data, "filename": filename, "mime": mime}
    return f"attach://{token}"


def resolve_attachment(ref: str) -> bytes:
    """Resolve an attach:// token to bytes, or raise KeyError/ValueError."""
    if not isinstance(ref, str) or not ref.startswith("attach://"):
        raise ValueError("Not an attach:// reference")
    token = ref.split("://", 1)[1]
    item = ATTACHMENT_STORE.get(token)
    if not item:
        raise KeyError("Attachment not found or expired")
    return item["data"]


# ==========================
# Tool: Vision (Image analysis)
# ==========================
def analyze_image_tool(input_str: str) -> str:
    """Analyze a medical image (URL or base64/data URI) with an optional question.

    Input can be JSON: {"image": "<url|base64>", "question": "..."} or a
    pipe-separated string: "<image>|<question>".
    Returns concise medical observations and red flags.
    """
    try:
        image = None
        question = (
            "Analyze this image and describe medically relevant observations, differential considerations, "
            "and red flags. If urgent signs are suspected, advise seeking care."
        )
        if input_str.strip().startswith('{'):
            obj = json.loads(input_str)
            image = obj.get('image') or obj.get('url') or obj.get('image_url')
            question = obj.get('question') or question
        else:
            parts = [p.strip() for p in input_str.split('|', 1)]
            if parts:
                image = parts[0]
            if len(parts) > 1 and parts[1]:
                question = parts[1]
        if not image:
            return "Format attendu: JSON {\"image\": \"...\", \"question\": \"...\"} ou 'image|question'"
        image_ref = _to_image_data_ref(image)
        if len(image_ref) > MAX_IMAGE_CHARS:
            return "Image trop volumineuse. Réduisez la taille ou fournissez une URL."
        # Delegate to the unified image analysis service (HF or local)
        return svc_analyze_image(image_ref, question)
    except Exception as e:
        logger.error(f"Error in analyze_image_tool: {e}", exc_info=True)
        return "Impossible d'analyser l'image pour le moment. Essayez une image plus petite ou une meilleure connexion."
# ==========================
# Tool: Audio transcription (Whisper)
# ==========================
def transcribe_audio_tool(input_str: str) -> str:
    """Transcribe an audio file (URL or base64/data URI) to plain text.

    Input can be JSON: {"audio": "<url|base64>", "filename": "name.ext"} or just
    the URL/base64 string. Size cap: 10 MB.
    """
    try:
        audio_ref = None
        if input_str.strip().startswith('{'):
            obj = json.loads(input_str)
            audio_ref = obj.get('audio') or obj.get('url')
        else:
            audio_ref = input_str.strip()
        if not audio_ref:
            return "Format attendu: {\"audio\": \"...\"} ou une chaîne URL/base64/data URI."
        audio_bytes = _to_bytes_from_any(audio_ref, MAX_AUDIO_BYTES)
        # Write to a temp file and delegate to the unified ASR service
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp:
            tmp.write(audio_bytes)
            tmp.flush()
            temp_path = tmp.name
        try:
            return svc_transcribe_audio(temp_path, None)
        finally:
            try:
                os.unlink(temp_path)
            except Exception:
                pass
    except ValueError as ve:
        return f"Audio non valide: {ve}"
    except Exception as e:
        logger.error(f"Error in transcribe_audio_tool: {e}", exc_info=True)
        return "Impossible de transcrire l'audio pour le moment. Fournissez un fichier plus petit ou réessayez."


# ==========================
# Tool: File parse and summarize
# ==========================
def parse_file_tool(input_str: str) -> str:
    """Parse and summarize a small file (text/PDF).

    Input JSON: {"file": "<url|base64>", "filename": "..."} or "<file>|<filename>".
    Size cap: 2 MB. Returns a concise, medically relevant summary.
    """
    try:
        file_ref = None
        filename = None
        if input_str.strip().startswith('{'):
            obj = json.loads(input_str)
            file_ref = obj.get('file') or obj.get('url') or obj.get('content')
            filename = obj.get('filename')
        else:
            parts = [p.strip() for p in input_str.split('|', 1)]
            if parts:
                file_ref = parts[0]
            if len(parts) > 1:
                filename = parts[1]
        if not file_ref:
            return "Format attendu: JSON {\"file\": \"...\", \"filename\": \"...\"} ou 'file|filename'"
        data = _to_bytes_from_any(file_ref, MAX_FILE_BYTES)
        # Determine the content type
        ext = (os.path.splitext(filename)[1].lower() if filename else '')
        text_content = None
        if ext == '.pdf' or (_is_data_uri(file_ref) and 'application/pdf' in file_ref[:64]):
            try:
                import PyPDF2  # type: ignore
                reader = PyPDF2.PdfReader(io.BytesIO(data))
                pages = min(3, len(reader.pages))
                buf = []
                for i in range(pages):
                    try:
                        buf.append(reader.pages[i].extract_text() or '')
                    except Exception:
                        continue
                text_content = "\n".join(buf)
            except ImportError:
                return "Lecture PDF indisponible: installez PyPDF2 pour activer l'analyse des PDF."
            except Exception:
                return "Impossible de lire ce PDF. Assurez-vous qu'il n'est pas corrompu et qu'il est < 2 Mo."
        else:
            # Assume text-like content
            try:
                text_content = data.decode('utf-8', errors='ignore')
            except Exception:
                return "Type de fichier non supporté. Fournissez un texte ou un PDF (PyPDF2 requis)."
        if not text_content or not text_content.strip():
            return "Aucun texte exploitable trouvé dans le fichier."
        snippet = text_content[:50_000]  # keep the prompt small
        language = detect_language(snippet)
        prompt = (
            "Résume de façon concise les informations médicales pertinentes du contenu suivant. "
            "Inclus les signaux d'alerte éventuels et recommande, si nécessaire, une consultation.\n\n"
            f"Contenu:\n{snippet}"
        )
        return svc_chat_completion([{"role": "user", "content": prompt}], language)
    except ValueError as ve:
        return f"Fichier non valide: {ve}"
    except Exception as e:
        logger.error(f"Error in parse_file_tool: {e}", exc_info=True)
        return "Impossible d'analyser le fichier pour le moment. Essayez un fichier texte court ou un PDF léger (<2 Mo)."


def create_medical_agent(scratchpad_style: str = "messages"):
    logger.info(f"Creating medical agent (scratchpad_style={scratchpad_style})...")
    try:
        provider = (settings.AI_PROVIDER or "hf").lower()
        logger.info(f"Agent provider: {provider}")

        # Initialize with the system message (kept in French: the agent's user-facing persona)
        system_message = """
Tu es Medicare, un assistant médical intelligent et bienveillant pour la population camerounaise.

Ton objectif :
- Écouter et rassurer la personne, comme un vrai professionnel de santé empathique.
- Répondre simplement, comme si tu discutais avec un proche ou un patient, mais toujours avec sérieux.
- Utiliser Google Maps pour proposer des hôpitaux ou pharmacies proches si besoin.
- Donner des conseils adaptés au Cameroun (maladies, médicaments, habitudes locales).
- Si tu retrouves des cas similaires dans la base camerounaise, mentionne-les naturellement dans la discussion.
- Si la question sort du médical, explique gentiment que tu es là pour la santé.

Règles :
- Ne pose jamais de diagnostic définitif.
- Pour les symptômes graves, incite à consulter un médecin ou à se rendre aux urgences, sans paniquer l'utilisateur.
- Si tu as besoin de plus d'infos, pose des questions ouvertes et humaines.
- Garde le fil de la conversation et adapte tes réponses à l'historique de l'échange.
- Si des pièces jointes sont listées sous forme de jetons attach://, utilise les outils appropriés en leur passant directement ces références :
  * Analyse_Image pour les images (JSON {{"image": "attach://...", "question": "..."}}).
  * Transcription_Audio pour l'audio (JSON {{"audio": "attach://..."}}).
  * Analyse_Fichier pour les fichiers texte/PDF (JSON {{"file": "attach://...", "filename": "..."}}).

Format de réponse :
- Commence par une phrase chaleureuse ou rassurante.
- Donne l'information ou le conseil principal de façon claire et naturelle.
- Si tu proposes des établissements, présente-les comme tu le ferais à un ami (nom, adresse, statut, téléphone).
- Termine par une phrase d'ouverture ou d'encouragement ("N'hésite pas si tu as d'autres questions !").
- Ajoute un avertissement discret si nécessaire (ex : "Si tu te sens vraiment mal, va vite consulter !").

IMPORTANT: N'utilise PAS de traductions pour les étiquettes. Utilise EXACTEMENT ces étiquettes en anglais:
"Thought:", "Action:", "Action Input:", "Observation:", "Final Answer:".

Toujours répondre en utilisant CE FORMAT EXACT. Après chaque "Thought:", fais l'un des deux:
1) Si tu as besoin d'un outil: fournis "Action:" et "Action Input:", puis attends l'"Observation:" de l'outil.
2) Si tu n'as pas besoin d'outil: termine directement avec "Final Answer:".

Thought: [Tes réflexions internes sur la question et ce que tu vas faire ensuite. Sois bref et logique. Décide si un outil est nécessaire.]
Action: [Nom de l'outil exact, comme "Recherche_Web" ou "Google_Maps". Un seul outil. (N'inclus PAS "Final Answer" ici.)]
Action Input: [Entrée précise pour l'outil.]
Observation: [Résultat de l'outil.]
Final Answer: [Ta réponse finale complète pour l'utilisateur, en respectant le "Format de réponse" ci-dessus.]
Tu peux répéter Thought/Action/Observation au besoin (maximum 3 itérations), puis termine OBLIGATOIREMENT par "Final Answer:".

N'oublie pas : tu es empathique, humain, et tu adaptes toujours ton niveau de langage à la personne en face de toi.
"""

        # Hedged note: the English "Thought:"/"Action:"/"Final Answer:" labels demanded
        # above are not cosmetic. LangChain's ReAct output parser matches these literal
        # English strings, so translated labels would surface as parsing errors even
        # when the model's reasoning is otherwise fine.

        if provider == "ollama":
            logger.info(f"Using ChatOllama model: {settings.OLLAMA_MODEL}")
            llm = ChatOllama(
                temperature=0,
                model=(settings.OLLAMA_MODEL or "llama3.1:8b"),
                base_url=settings.OLLAMA_BASE_URL,
            )
        elif provider == "lmstudio":
            logger.info(f"Using LM Studio base_url: {settings.LMSTUDIO_BASE_URL}")
            llm = ChatOpenAI(
                temperature=0,
                model=(settings.LMSTUDIO_MODEL or "local-model"),
                openai_api_key=(settings.OPENAI_API_KEY or "lm-studio"),
                base_url=settings.LMSTUDIO_BASE_URL,
                streaming=False,
                max_retries=1,
                timeout=30,
                max_tokens=500,
            )
        elif provider == "hf":
            logger.info(f"Using Hugging Face Inference model: {settings.HF_TEXT_MODEL}")
            llm = HuggingFaceEndpoint(
                repo_id=(settings.HF_TEXT_MODEL or "mistralai/Mistral-7B-Instruct-v0.3"),
                task="text-generation",
                max_new_tokens=500,
                temperature=0.0,
                huggingfacehub_api_token=settings.HF_API_TOKEN,
            )
        else:
            # Default: try local Ollama first, then fall back to the ChatOpenAI config
            try:
                llm = ChatOllama(
                    temperature=0,
                    model=(settings.OLLAMA_MODEL or "llama3.1:8b"),
                    base_url=settings.OLLAMA_BASE_URL,
                )
            except Exception:
                llm = ChatOpenAI(
                    temperature=0,
                    model=(settings.OPENAI_MODEL or "gpt-4o-mini"),
                    openai_api_key=settings.OPENAI_API_KEY,
                    streaming=False,
                    max_retries=1,
                    timeout=30,
                    max_tokens=500,
                )

        # Initialize tools
        tools = []

        # Add the Google Maps tool if an API key is available
        if settings.GOOGLE_MAPS_API_KEY:
            logger.info("Adding Google Maps tool")
            maps_tool = GoogleMapsTool()
            tools.append(maps_tool)

        # Add the web search tool if an API key is available
        if settings.SERPAPI_API_KEY:
            logger.info("Adding web search tool")
            search = SerpAPIWrapper(serpapi_api_key=settings.SERPAPI_API_KEY)
            web_search_tool = Tool(
                name="Recherche_Web",
                func=search.run,
                description="Utile pour rechercher des informations médicales générales ou des hôpitaux",
            )
            tools.append(web_search_tool)

        # Add the dosage calculator tool
        logger.info("Adding dosage calculator tool")
        dosage_tool = Tool(
            name="Calculateur_Dosage",
            func=calculate_dosage,
            description="Utile pour calculer des dosages de médicaments basés sur le poids et l'âge",
        )
        tools.append(dosage_tool)

        # Add the vision, audio transcription, and file parser tools.
        # NOTE: the doubled braces in these descriptions are intentional; the strings
        # are interpolated into a prompt template, where single braces would be parsed
        # as template variables.
        vision_tool = Tool(
            name="Analyse_Image",
            func=analyze_image_tool,
            description=(
                "Analyser une image médicale (URL ou base64/data URI). "
                "Entrée: JSON {{\"image\": \"...\", \"question\": \"...\"}} ou 'image|question'. "
                "Retourne des observations cliniques concises."
            ),
        )
        tools.append(vision_tool)

        audio_tool = Tool(
            name="Transcription_Audio",
            func=transcribe_audio_tool,
            description=(
                "Transcrire un audio (URL ou base64/data URI) en texte. "
                "Entrée: JSON {{\"audio\": \"...\"}} ou une chaîne URL/base64. Taille <= 10 Mo."
            ),
        )
        tools.append(audio_tool)

        file_tool = Tool(
            name="Analyse_Fichier",
            func=parse_file_tool,
            description=(
                "Analyser et résumer un petit fichier (texte/PDF). "
                "Entrée: JSON {{\"file\": \"...\", \"filename\": \"...\"}} ou 'file|filename'. "
                "PDF nécessite PyPDF2. Taille <= 2 Mo."
            ),
        )
        tools.append(file_tool)

        logger.info(f"Initialized {len(tools)} tools")
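
        # Construction strategy for what follows (a hedged summary of this module's
        # own fallback chain): an explicit "legacy"/"string" request uses the classic
        # string-scratchpad ReAct loop via initialize_agent; otherwise the messages-based
        # create_react_agent is tried first, with initialize_agent as a fallback, since
        # availability differs across LangChain versions.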
        # Build the agent according to the requested scratchpad style
        requested = str(scratchpad_style).lower()
        logger.info(f"Requested scratchpad style: {requested}")

        # For the HF provider, prefer the legacy ReAct agent, which works with
        # plain text-generation LLMs
        if provider == "hf" and requested != "legacy":
            logger.info("HF provider: switching scratchpad to 'legacy' for compatibility")
            requested = "legacy"

        common_kwargs = {
            "verbose": True,
            "handle_parsing_errors": True,
            "max_iterations": 3,
            "max_execution_time": 15,
        }

        if requested in ("legacy", "string"):
            logger.info("Creating legacy string-based ReAct agent")
            # Use a PromptTemplate for the string scratchpad
            prompt = PromptTemplate.from_template(
                system_message + "\n\n{input}\n{agent_scratchpad}"
            )
            legacy_agent = initialize_agent(
                tools=tools,
                llm=llm,
                agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
                prompt=prompt,
                **common_kwargs,
            )
            logger.info("Legacy/string agent created successfully")
            return legacy_agent

        # Messages-based ReAct agent (with fallbacks if create_react_agent is unavailable)
        logger.info("Creating messages-based ReAct agent")
        # Combine the system prompt with ReAct-style instructions (no JSON)
        template = system_message + """
Tools:
{tools}

Tool names: {tool_names}

Begin!

{input}
{agent_scratchpad}"""
        prompt = ChatPromptTemplate.from_template(template)
        # Supply the tool variables for the prompt
        prompt = prompt.partial(
            tool_names=", ".join([t.name for t in tools]) if tools else "none",
            tools="\n".join([f"- {t.name}: {t.description}" for t in tools]) if tools else "No tools available",
        )

        if create_react_agent is not None:
            try:
                agent = create_react_agent(llm, tools, prompt)
                logger.info("Messages-based agent created successfully (create_react_agent)")
                return agent
            except Exception as e:
                logger.warning(f"create_react_agent unavailable or failed: {e}; falling back to initialize_agent")

        # Fallback: initialize_agent ZERO_SHOT_REACT_DESCRIPTION with our prompt
        try:
            legacy_agent = initialize_agent(
                tools=tools,
                llm=llm,
                agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
                prompt=prompt,
                **common_kwargs,
            )
            logger.info("Messages-based agent created successfully via initialize_agent fallback")
            return legacy_agent
        except Exception as e:
            logger.warning(f"initialize_agent with prompt failed: {e}; trying minimal initialize_agent")
            # Last resort: minimal init without the prompt arg (for very new/old LC variants)
            legacy_agent = initialize_agent(
                tools=tools,
                llm=llm,
                agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
                **common_kwargs,
            )
            logger.info("Messages-based agent created via minimal initialize_agent")
            return legacy_agent

    except Exception as e:
        logger.error(f"Error creating agent: {str(e)}", exc_info=True)
        raise


import asyncio
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError
from functools import partial
import re

# logger is initialized at the top of this module


def _invoke_with_timeout(agent: Any, user_input: str, timeout: int = 60) -> Any:
    """Invoke the agent with a hard timeout in a background thread.

    shutdown(wait=False) keeps the caller from blocking on the worker thread
    after a timeout; the abandoned call still runs to completion in the
    background (the usual ThreadPoolExecutor limitation).
    """
    def _invoke():
        return agent.invoke({"input": user_input})

    executor = ThreadPoolExecutor(max_workers=1)
    try:
        future = executor.submit(_invoke)
        return future.result(timeout=timeout)
    finally:
        executor.shutdown(wait=False)


def search_cases_with_timeout(query: str, timeout: int = 10) -> str:
    """Search for similar Cameroonian clinical cases.

    NOTE: the `timeout` parameter is not enforced here; callers apply it via
    future.result(timeout=...) when submitting this function to an executor.
    """
    try:
        cameroon_data = get_cameroon_data()
        if cameroon_data is None:
            logger.warning("No clinical data available")
            return ""
        similar_cases = cameroon_data.search_similar_cases(query, top_k=3)
        if not similar_cases:
            return ""
        context = "Cas cliniques camerounais similaires trouvés :\n"
        for case in similar_cases:
            # Format the case nicely, showing only non-empty fields
            case_info = []
            for key, value in case.items():
                if value and str(value).strip() and str(value).lower() != 'nan':
                    case_info.append(f"{key}: {value}")
            if case_info:
                context += f"- {' | '.join(case_info)}\n"
        return context
    except Exception as e:
        logger.error(f"Error searching cases: {str(e)}")
        return ""
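

# Hedged illustrative sketches (never called anywhere): how the helpers above fit
# together. All sample values are hypothetical placeholders; calls that reach a
# network-backed service are only sketched, not exercised here.
def _example_image_ref_handling() -> None:
    # URLs and data URIs pass through _to_image_data_ref unchanged.
    assert _to_image_data_ref("https://example.com/x-ray.png") == "https://example.com/x-ray.png"
    # Raw base64 (no "data:" prefix) is wrapped as a PNG data URI.
    raw_b64 = base64.b64encode(b"fake-image-bytes").decode("utf-8")
    assert _to_image_data_ref(raw_b64) == f"data:image/png;base64,{raw_b64}"


def _example_attachment_round_trip() -> str:
    # Register raw bytes once, then hand the attach:// token to any tool input.
    token_uri = register_attachment(b"\x89PNG\r\n...", filename="plaie.png", mime="image/png")
    payload = json.dumps({"image": token_uri, "question": "Y a-t-il des signes d'infection ?"})
    return analyze_image_tool(payload)


def _example_agent_invocation() -> str:
    # Build an agent and invoke it under a hard timeout, mirroring handle_user_query below.
    agent = create_medical_agent("legacy")
    try:
        result = _invoke_with_timeout(agent, "J'ai mal à la tête depuis deux jours.", timeout=30)
    except FuturesTimeoutError:
        return "timeout"
    return result.get("output", str(result)) if isinstance(result, dict) else str(result)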
from langdetect import detect


def detect_language(text: str) -> str:
    """Detect the language of the input text (only 'fr' and 'en' are used)."""
    try:
        lang = detect(text)
        return lang if lang in ['fr', 'en'] else 'fr'  # default to French if neither
    except Exception:
        return 'fr'  # default to French on detection failure
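

# Two more hedged sketches (never called): langdetect is probabilistic, so results
# for very short strings can vary; the report text below is a hypothetical placeholder.
def _example_language_routing() -> None:
    for sample in ("J'ai mal au ventre", "I have a headache", "12345"):
        print(sample, "->", detect_language(sample))


def _example_parse_text_file() -> str:
    # parse_file_tool detects the snippet language itself via detect_language above.
    report = "Patient: fièvre depuis 3 jours, céphalées, frissons."
    b64 = base64.b64encode(report.encode("utf-8")).decode("utf-8")
    return parse_file_tool(json.dumps({"file": b64, "filename": "rapport.txt"}))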
def handle_user_query(
    query: str,
    user_location: Optional[str] = None,
    image: Optional[str] = None,
    audio: Optional[str] = None,
    files: Optional[List[str]] = None,
    file_names: Optional[List[str]] = None,
    images: Optional[List[str]] = None,
    audios: Optional[List[str]] = None,
    agent_mode: Optional[str] = None,  # accepted for API compatibility; not currently used
) -> str:
    try:
        logger.info(f"Handling query: {query[:100]}...")  # log first 100 chars of the query

        # Detect the input language
        input_language = detect_language(query)
        logger.info(f"Detected input language: {input_language}")

        # Build the final input with a language instruction
        try:
            language_instruction = " (répondez en français)" if input_language == 'fr' else " (respond in English)"
            query_with_language = f"{query}{language_instruction}"

            # Quick simple-query bypass BEFORE agent creation: no attachments and no location
            has_attachments = bool(
                (image and image.strip())
                or (images and len(images) > 0)
                or (audio and audio.strip())
                or (audios and len(audios) > 0)
                or (files and len(files) > 0)
            )
            if not has_attachments and not user_location:
                logger.info("Simple query; direct LLM (no agent) via unified provider")
                try:
                    direct_prompt = medical_direct_prompt + f"\n{query_with_language}"
                    return svc_chat_completion([{"role": "user", "content": direct_prompt}], input_language)
                except Exception as e:
                    logger.warning(f"Simple path failed, using agent: {e}")
        except Exception as e:
            logger.warning(f"Error preparing input for simple bypass: {e}")

        # Create the agent only when the bypass did not answer: try messages first,
        # then fall back to the legacy/string style
        try:
            agent = create_medical_agent("messages")
            logger.info("Agent created successfully (messages scratchpad)")
        except Exception as e:
            logger.warning(f"Messages-based agent creation failed, trying legacy fallback: {e}")
            try:
                agent = create_medical_agent("legacy")
                logger.info("Agent created successfully (legacy scratchpad)")
            except Exception as e2:
                logger.error(f"Error creating agent after legacy fallback: {str(e2)}", exc_info=True)
                return "Désolé, une erreur est survenue lors de l'initialisation de l'assistant."

        # Run the Cameroon data search with a timeout, in a separate thread
        context = ""
        try:
            with ThreadPoolExecutor() as executor:
                future = executor.submit(search_cases_with_timeout, query)
                context = future.result(timeout=15)  # 15-second timeout
            logger.info("Successfully searched clinical data")
        except FuturesTimeoutError:
            logger.warning("Cameroon data search timed out")
        except Exception as e:
            logger.error(f"Error in clinical data search: {str(e)}", exc_info=True)

        try:
            # Summarize attachments so the agent is aware of the provided modalities
            attachment_lines: List[str] = []
            all_images = []
            if image:
                all_images.append(image)
            if images:
                all_images.extend(images)
            if all_images:
                # Show attach:// URIs so the agent can pass them to tools
                show = ", ".join(all_images[:5]) + ("..." if len(all_images) > 5 else "")
                attachment_lines.append(f"- Images: {len(all_images)} fournies -> {show}")
            all_audios = []
            if audio:
                all_audios.append(audio)
            if audios:
                all_audios.extend(audios)
            if all_audios:
                show = ", ".join(all_audios[:5]) + ("..." if len(all_audios) > 5 else "")
                attachment_lines.append(f"- Audios: {len(all_audios)} fournis -> {show}")
            if files:
                names = file_names or []
                display_names = ", ".join(names[:5]) + ("..." if len(names) > 5 else "")
                show = ", ".join(files[:5]) + ("..." if len(files) > 5 else "")
                attachment_lines.append(f"- Fichiers: {len(files)} ({display_names}) -> {show}")
            attachments_context = ""
            if attachment_lines:
                attachments_context = "Pièces jointes:\n" + "\n".join(attachment_lines)

            components: List[str] = []
            if context:
                components.append(context)
            if attachments_context:
                components.append(attachments_context)
            components.append(query_with_language)
            user_input = "\n\n".join(components)
            if user_location:
                user_input += f"\nUser location: {user_location}"
            logger.debug(f"Final input to agent: {user_input[:200]}...")  # log first 200 chars

            # Second bypass: reached when the early bypass raised; still avoid the
            # agent when there are no attachments and no explicit location
            if not attachments_context and not user_location:
                try:
                    simple_prompt = medical_direct_prompt + f"\n{user_input}"
                    return svc_chat_completion([{"role": "user", "content": simple_prompt}], input_language)
                except Exception as e:
                    logger.warning(f"Simple path failed, using agent: {e}")

            # Execute the agent with a hard timeout; apply fallbacks on parsing issues
            try:
                response = _invoke_with_timeout(agent, user_input, timeout=60)
            except FuturesTimeoutError:
                logger.warning("Agent invocation timed out; retrying after 10s sleep")
                time.sleep(10)
                try:
                    response = _invoke_with_timeout(agent, user_input, timeout=60)
                except FuturesTimeoutError:
                    logger.error("Agent invocation timed out again after retry")
                    return _offline_fallback_response(query, context, input_language)
                except Exception as e2:
                    logger.error(f"Error after timeout retry: {e2}", exc_info=True)
                    return _offline_fallback_response(query, context, input_language)
            except Exception as e:
                msg = str(e).lower()
                # Timeout-like errors: retry once (FuturesTimeoutError itself is already
                # caught above). Assign rather than return so the retry result still
                # flows through the normalization/translation below.
                if "timeout" in msg:
                    logger.warning("Invocation timed out; retrying after 10s sleep")
                    time.sleep(10)
                    response = _invoke_with_timeout(agent, user_input, timeout=60)
                elif isinstance(e, RateLimitError):
                    wait_time = 180
                    logger.warning(f"Rate limit hit; sleeping {wait_time}s before retry")
                    time.sleep(wait_time)
                    try:
                        response = _invoke_with_timeout(agent, user_input, timeout=60)
                    except Exception as e2:
                        logger.error(f"Retry after rate limit failed: {e2}", exc_info=True)
                        return _offline_fallback_response(query, context, input_language)
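                # Hedged note: the substrings matched in the next branch are those
                # LangChain's ReAct parser typically embeds in OutputParserException
                # messages (e.g. "Invalid Format: Missing 'Action:' after 'Thought:'");
                # other versions may phrase them differently, in which case the final
                # else branch still catches the error.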
                # Parsing-related issues: prefer a direct LLM fallback
                elif re.search(r"(invalid format|missing action|parsing failure)", msg, re.IGNORECASE):
                    logger.warning("Detected parsing issue; using direct LLM fallback first")
                    # Prefer a single-call direct LLM response to avoid further agent loops
                    try:
                        direct_prompt = medical_direct_prompt + f"\n{user_input}"
                        response_text = svc_chat_completion([{"role": "user", "content": direct_prompt}], input_language)
                        return response_text
                    except Exception:
                        logger.warning("Direct LLM fallback failed; trying legacy agent as secondary fallback")
                        try:
                            agent = create_medical_agent("legacy")
                            response = _invoke_with_timeout(agent, user_input, timeout=60)
                        except Exception:
                            return _offline_fallback_response(query, context, input_language)
                else:
                    logger.error(f"Error during agent invocation: {e}", exc_info=True)
                    return _offline_fallback_response(query, context, input_language)

            # Normalize the response across LC versions (dict vs. string)
            if response is None:
                logger.error("Agent returned None")
                return _offline_fallback_response(query, context, input_language)

            # Extract the response text
            if isinstance(response, dict):
                # Dict response (newer LC versions)
                if 'output' in response:
                    response_text = response['output']
                elif 'final_answer' in response:
                    response_text = response['final_answer']
                elif 'result' in response:
                    response_text = response['result']
                else:
                    logger.warning(f"Unexpected response format: {response}")
                    response_text = str(response)
            else:
                # String response (older LC versions)
                response_text = str(response)

            # Ensure the response matches the input language. This is a crude heuristic:
            # a French reply is assumed to contain at least one of these common words.
            if input_language == 'fr' and not any(word in response_text.lower() for word in ['désolé', 'bonjour', 'merci']):
                try:
                    response_text = svc_translate_text(response_text, 'fr')
                except Exception as e:
                    logger.error(f"Error translating response to French: {e}")
                    response_text = (
                        f"[English Response] {response_text}\n\n"
                        "Désolé, je n'ai pas pu traduire la réponse en français. "
                        "Voici la réponse en anglais ci-dessus."
                    )

            return response_text
        except Exception as e:
            logger.error(f"Error in agent execution: {str(e)}", exc_info=True)
            return _offline_fallback_response(query, context, input_language)
    except Exception as e:
        logger.critical(f"Critical error in handle_user_query: {str(e)}", exc_info=True)
        return "Désolé, une erreur inattendue s'est produite. Veuillez réessayer plus tard."
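

# Standalone sketch of the normalization logic above, for illustration only: newer
# LangChain agents return a dict (typically {"output": ...}) while older ones return
# a plain string. handle_user_query keeps its own inline version; this is never called.
def _example_normalize_response(response: Any) -> str:
    if isinstance(response, dict):
        return response.get("output") or response.get("final_answer") or response.get("result") or str(response)
    return str(response)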
def _offline_fallback_response(query: str, context: str, lang: str) -> str:
    """Produce a concise offline fallback response for low-connectivity situations."""
    if lang == 'fr':
        parts = [
            "Connexion instable détectée. Voici une réponse rapide basée sur des bonnes pratiques générales :",
        ]
        if context:
            parts.append(context.strip())
        parts.extend([
            "- Prends soin de toi et évite les efforts inutiles.",
            "- Tu peux envisager des antalgiques en vente libre avec prudence si nécessaire.",
            "- Si les symptômes sont sévères, persistent ou s'aggravent, consulte rapidement un professionnel de santé.",
            "Note: message généré en mode connectivité limitée. Pour un avis personnalisé, consulte un professionnel.",
        ])
        return "\n".join(parts)
    else:
        parts = [
            "Unstable connection detected. Here is a brief response based on general good practices:",
        ]
        if context:
            parts.append(context.strip())
        parts.extend([
            "- Take care and avoid unnecessary strain.",
            "- Consider over-the-counter pain relief responsibly if appropriate.",
            "- If symptoms are severe, persistent, or worsening, seek medical care promptly.",
            "Note: generated in low-connectivity mode. For personalized advice, consult a professional.",
        ])
        return "\n".join(parts)
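

# Hedged end-to-end usage sketch (hypothetical values, never called): a text-only
# query takes the direct-LLM bypass; attaching an image forces the agent/tool path.
def _example_handle_user_query() -> None:
    print(handle_user_query("J'ai de la fièvre et des courbatures depuis hier."))
    img_uri = register_attachment(b"\x89PNG\r\n...", filename="eruption.png", mime="image/png")
    print(handle_user_query("Que penses-tu de cette éruption cutanée ?", images=[img_uri]))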