from typing import List from loguru import logger import json import io import os import base64 from app.utils.config import settings # Keep OpenAI symbol to avoid breaking legacy tests that patch it, but do not use it in runtime paths try: from openai import OpenAI except Exception: # pragma: no cover - optional import during dev OpenAI = None # type: ignore def _load_patient_context() -> str: try: # Prefer CSV summaries for Cameroon context if available csv_path = getattr(settings, "CAMEROON_DATA_CSV", None) if csv_path and os.path.exists(csv_path): import csv rows: list[str] = [] with open(csv_path, "r", encoding="utf-8") as f: reader = csv.DictReader(f) for i, row in enumerate(reader): if i >= 120: # cap to avoid oversized prompts break parts = [] for k, v in (row or {}).items(): if v is None: continue s = str(v).strip() if s: parts.append(f"{k}: {s}") if parts: rows.append(" | ".join(parts)) text = "\n".join(rows) return text[:6000] # Fallback to legacy JSON patient data with open(settings.PATIENT_DATA_PATH, "r", encoding="utf-8") as f: data = json.load(f) return json.dumps(data)[:6000] except Exception as exc: logger.warning(f"Cannot load patient RAG data: {exc}") return "" def build_system_prompt(language: str) -> str: if language.lower().startswith("fr"): disclaimer = ( "Tu es Medilang, un assistant médical virtuel compatissant et bienveillant, spécialement conçu pour les utilisateurs camerounais. Ton rôle est de fournir des conseils de premier recours, des informations sanitaires et de l'orientation, en tenant strictement compte du contexte local camerounais. " "N'oublie pas de specifier que tu n'est qu'une ia et recommande le contact au medecin en cas de situation grave" ) else: disclaimer = ( "You are Medilang, a compassionate medical assistant for Cameroon. " "Be clear and adapt advice to local context (malaria, typhoid, vaccination, access to care). " "Include a medical disclaimer and recommend seeing a doctor for serious cases." ) rag = _load_patient_context() return f"{disclaimer}\nContext (Cameroon RAG): {rag[:4000]}" def detect_language(text: str) -> str: try: from langdetect import detect code = detect(text) # Map common codes to our expected values if code.startswith("fr"): return "fr" if code.startswith("en"): return "en" return code except Exception: return "fr" def openai_client(): """Legacy helper kept for backward compatibility in tests. Not used by runtime code after migration to HF/Ollama/LM Studio. """ if not settings.OPENAI_API_KEY: raise RuntimeError("OPENAI_API_KEY not configured") if OpenAI is None: raise RuntimeError("openai package not available") return OpenAI(api_key=settings.OPENAI_API_KEY) # ========================== # Provider utilities # ========================== def _flatten_messages(messages: List[dict], system: str | None) -> str: parts: List[str] = [] if system: parts.append(f"System:\n{system}\n") for m in messages: role = m.get("role") or "user" content = m.get("content") if isinstance(content, list): # Extract text parts if using OpenAI-style content chunks text_chunks = [] for c in content: if isinstance(c, dict) and c.get("type") == "text": text_chunks.append(c.get("text") or "") elif isinstance(c, dict) and c.get("type") == "image_url": url = (c.get("image_url") or {}).get("url") if isinstance(c.get("image_url"), dict) else c.get("image_url") if url: text_chunks.append(f"[Image: {url}]") content = "\n".join([t for t in text_chunks if t]) parts.append(f"{role.capitalize()}: {content}") parts.append("Assistant:") return "\n\n".join(parts) def _hf_generate_text(prompt: str, max_new_tokens: int = 400, temperature: float = 0.3) -> str: import httpx headers = {"Authorization": f"Bearer {settings.HF_API_TOKEN}"} if settings.HF_API_TOKEN else {} url = f"https://api-inference.huggingface.co/models/{settings.HF_TEXT_MODEL}" payload = { "inputs": prompt, "parameters": { "max_new_tokens": max_new_tokens, "temperature": temperature, "return_full_text": False, }, } r = httpx.post(url, headers=headers, json=payload, timeout=120) r.raise_for_status() out = r.json() # HF can return list[{generated_text}] or dict/text if isinstance(out, list) and out and isinstance(out[0], dict) and out[0].get("generated_text"): return out[0]["generated_text"] if isinstance(out, dict) and out.get("generated_text"): return out.get("generated_text") if isinstance(out, str): return out return json.dumps(out)[:1000] def _ollama_chat(messages: List[dict], model: str | None = None, base_url: str | None = None) -> str: import httpx model = model or settings.OLLAMA_MODEL base = (base_url or settings.OLLAMA_BASE_URL).rstrip("/") url = f"{base}/api/chat" payload = { "model": model, "messages": messages, "stream": False, "options": {"temperature": 0.3} } r = httpx.post(url, json=payload, timeout=120) r.raise_for_status() data = r.json() # Newer Ollama returns {message: {content: "..."}} when stream=False if isinstance(data, dict): if data.get("message") and isinstance(data["message"], dict): return data["message"].get("content", "") if data.get("response"): return data.get("response", "") return str(data) def _lmstudio_chat(messages: List[dict]) -> str: import httpx base = settings.LMSTUDIO_BASE_URL.rstrip("/") url = f"{base}/chat/completions" model = settings.LMSTUDIO_MODEL or "local-model" payload = { "model": model, "messages": messages, "temperature": 0.3, } headers = {"Content-Type": "application/json", "Authorization": f"Bearer {settings.OPENAI_API_KEY or 'lm-studio'}"} r = httpx.post(url, headers=headers, json=payload, timeout=120) r.raise_for_status() data = r.json() if isinstance(data, dict) and data.get("choices"): ch0 = data["choices"][0] # OpenAI-style msg = ch0.get("message") if isinstance(ch0, dict) else None if msg and isinstance(msg, dict): return msg.get("content", "") # Some variants return {choices:[{text:"..."}]} if ch0.get("text"): return ch0.get("text") return str(data) def _unified_chat(messages: List[dict], system: str | None = None) -> str: provider = (settings.AI_PROVIDER or "hf").lower() if provider == "ollama": # Ollama supports chat natively final_msgs = ([] if not system else [{"role": "system", "content": system}]) + messages return _ollama_chat(final_msgs) if provider == "lmstudio": final_msgs = ([] if not system else [{"role": "system", "content": system}]) + messages return _lmstudio_chat(final_msgs) # Default: Hugging Face text generation with flattened chat prompt = _flatten_messages(messages, system) return _hf_generate_text(prompt, max_new_tokens=400, temperature=0.3) def chat_completion(messages: List[dict], language: str) -> str: system = build_system_prompt(language or "fr") # Test compatibility: if openai_client is patched in tests, honor it first try: oc = openai_client() # patched MagicMock returns a mock without requiring real API key final_messages = ([{"role": "system", "content": system}] + messages) resp = oc.chat.completions.create( model=getattr(settings, "OPENAI_MODEL", "gpt-4o-mini"), messages=final_messages, temperature=0.3, ) # Support MagicMock structure used in tests return getattr(resp.choices[0].message, "content", "") except Exception: pass try: return _unified_chat(messages, system) except Exception as e: logger.error(f"Chat completion failed: {e}") return "" def _transcribe_with_huggingface(audio_url: str, language: str | None = None) -> str: """Transcribe audio using Hugging Face Inference API""" import httpx import librosa import soundfile as sf # Load and process audio content: bytes if isinstance(audio_url, str) and os.path.exists(audio_url): with open(audio_url, "rb") as f: content = f.read() else: with httpx.Client(timeout=60.0) as client: resp = client.get(audio_url) resp.raise_for_status() content = resp.content # Process audio with librosa raw_buf = io.BytesIO(content) raw_buf.seek(0) y, sr = librosa.load(raw_buf, sr=None, mono=False) if y.ndim > 1: y = librosa.to_mono(y) if sr != 16000: y = librosa.resample(y, orig_sr=sr, target_sr=16000) sr = 16000 # Denoise import noisereduce as nr noise_frames = int(sr * 0.5) if noise_frames > 0 and len(y) > noise_frames: noise_clip = y[:noise_frames] else: noise_clip = y y = nr.reduce_noise(y=y, y_noise=noise_clip, sr=sr) # Encode to WAV wav_buf = io.BytesIO() sf.write(wav_buf, y, sr, format="WAV") wav_buf.seek(0) audio_bytes = wav_buf.read() # Try configured HF ASR model first, then fallbacks headers = {"Authorization": f"Bearer {settings.HF_API_TOKEN}"} if settings.HF_API_TOKEN else {} models = [ settings.HF_ASR_MODEL, "openai/whisper-large-v3", "facebook/wav2vec2-large-960h-lv60-self" ] for model in models: try: url = f"https://api-inference.huggingface.co/models/{model}" params = {"task": "transcribe"} if language: params["language"] = language with httpx.Client(timeout=120.0) as client: r = client.post( url, headers={**headers, "Content-Type": "audio/wav"}, params=params, content=audio_bytes, ) r.raise_for_status() out = r.json() if isinstance(out, dict) and out.get("text"): logger.info(f"HF transcription successful with model: {model}") return out["text"] if isinstance(out, list) and out and isinstance(out[0], dict) and out[0].get("text"): logger.info(f"HF transcription successful with model: {model}") return out[0]["text"] except Exception as e: logger.warning(f"HF model {model} failed: {e}") continue return "" def transcribe_audio(audio_url: str | None, language: str | None = None) -> str: if not audio_url: return "" # Test compatibility: if openai_client is patched, try it first try: oc = openai_client() import httpx import librosa import soundfile as sf # Load audio bytes (local path or URL) if isinstance(audio_url, str) and os.path.exists(audio_url): with open(audio_url, "rb") as f: raw_bytes = f.read() else: with httpx.Client(timeout=60.0) as client: r = client.get(audio_url) r.raise_for_status() raw_bytes = r.content # Ensure 16 kHz mono and noise reduction before Whisper raw_buf = io.BytesIO(raw_bytes) raw_buf.seek(0) y, sr = librosa.load(raw_buf, sr=None, mono=False) if hasattr(y, "ndim") and getattr(y, "ndim", 1) > 1: y = librosa.to_mono(y) # type: ignore if sr != 16000: y = librosa.resample(y, orig_sr=sr, target_sr=16000) sr = 16000 import noisereduce as nr noise_frames = int(sr * 0.5) noise_clip = y[:noise_frames] if len(y) > noise_frames else y y = nr.reduce_noise(y=y, y_noise=noise_clip, sr=sr) # Encode to WAV file-like for OpenAI Whisper API wav_buf = io.BytesIO() sf.write(wav_buf, y, sr, format="WAV") wav_buf.seek(0) wav_buf.name = "input.wav" # some clients expect a name tr = oc.audio.transcriptions.create( model=getattr(settings, "OPENAI_WHISPER_MODEL", "whisper-1"), file=wav_buf, language=language if language else None, ) return getattr(tr, "text", "") or (tr.get("text") if isinstance(tr, dict) else "") or "" except Exception: pass # Prefer HF ASR try: import httpx import mimetypes import librosa import soundfile as sf # 1) Load audio from local path or URL content: bytes if isinstance(audio_url, str) and os.path.exists(audio_url): with open(audio_url, "rb") as f: content = f.read() else: with httpx.Client(timeout=60.0) as client: resp = client.get(audio_url) resp.raise_for_status() content = resp.content # 2) Decode to waveform (mono, 16k) raw_buf = io.BytesIO(content) raw_buf.seek(0) y, sr = librosa.load(raw_buf, sr=None, mono=False) if hasattr(y, 'ndim') and getattr(y, 'ndim', 1) > 1: import numpy as np y = librosa.to_mono(y) # type: ignore if sr != 16000: y = librosa.resample(y, orig_sr=sr, target_sr=16000) sr = 16000 # 3) Denoise import noisereduce as nr noise_frames = int(sr * 0.5) noise_clip = y[:noise_frames] if len(y) > noise_frames else y y = nr.reduce_noise(y=y, y_noise=noise_clip, sr=sr) # 4) Encode WAV wav_buf = io.BytesIO() sf.write(wav_buf, y, sr, format="WAV") wav_buf.seek(0) audio_bytes = wav_buf.read() # 5) HF headers = {"Authorization": f"Bearer {settings.HF_API_TOKEN}"} if settings.HF_API_TOKEN else {} asr_models = [settings.HF_ASR_MODEL, "openai/whisper-large-v3"] for model in asr_models: try: url = f"https://api-inference.huggingface.co/models/{model}" with httpx.Client(timeout=180.0) as client: r = client.post(url, headers={**headers, "Content-Type": "audio/wav"}, content=audio_bytes) r.raise_for_status() out = r.json() if isinstance(out, dict) and out.get("text"): return out["text"] if isinstance(out, list) and out and isinstance(out[0], dict) and out[0].get("text"): return out[0]["text"] except Exception as e: logger.warning(f"HF ASR model {model} failed: {e}") continue return "" except Exception as exc: logger.error(f"HF transcription failed: {exc}") return "" def _hf_image_caption(image_ref: str) -> str: """Generate a caption for an image using HF image-to-text model.""" import httpx # Load bytes from URL, file path, or data URI data: bytes if isinstance(image_ref, str) and os.path.exists(image_ref): with open(image_ref, "rb") as f: data = f.read() elif isinstance(image_ref, str) and image_ref.startswith("data:"): try: b64 = image_ref.split(",", 1)[1] data = base64.b64decode(b64) except Exception: data = b"" else: with httpx.Client(timeout=60.0) as client: r = client.get(image_ref) r.raise_for_status() data = r.content headers = {"Authorization": f"Bearer {settings.HF_API_TOKEN}"} if settings.HF_API_TOKEN else {} url = f"https://api-inference.huggingface.co/models/{settings.HF_VISION_CAPTION_MODEL}" r = httpx.post(url, headers=headers, content=data, timeout=120) r.raise_for_status() out = r.json() if isinstance(out, list) and out and isinstance(out[0], dict): return out[0].get("generated_text") or out[0].get("caption", "") or "" if isinstance(out, dict): return out.get("generated_text") or out.get("caption", "") or "" return "" def _ollama_vision(image_ref: str, prompt: str) -> str: import httpx # Prepare image bytes as base64 for Ollama if isinstance(image_ref, str) and os.path.exists(image_ref): with open(image_ref, "rb") as f: img_bytes = f.read() elif isinstance(image_ref, str) and image_ref.startswith("data:"): try: img_bytes = base64.b64decode(image_ref.split(",", 1)[1]) except Exception: img_bytes = b"" else: with httpx.Client(timeout=60.0) as client: r = client.get(image_ref) r.raise_for_status() img_bytes = r.content b64img = base64.b64encode(img_bytes).decode("ascii") base = settings.OLLAMA_BASE_URL.rstrip("/") url = f"{base}/api/generate" payload = { "model": settings.OLLAMA_VISION_MODEL, "prompt": prompt or "Describe the medically relevant observations in this image.", "images": [b64img], "stream": False, "options": {"temperature": 0.2}, } r = httpx.post(url, json=payload, timeout=180) r.raise_for_status() data = r.json() # Non-stream returns may include 'response' if isinstance(data, dict) and data.get("response"): return data["response"] return str(data) def analyze_image(image_url: str, prompt: str | None) -> str: # Test compatibility: if openai_client is patched, use it first try: oc = openai_client() content = [] if prompt: content.append({"type": "text", "text": prompt}) content.append({"type": "image_url", "image_url": {"url": image_url}}) resp = oc.chat.completions.create( model=getattr(settings, "OPENAI_MODEL", "gpt-4o-mini"), messages=[{"role": "user", "content": content}], ) return getattr(resp.choices[0].message, "content", "") or "" except Exception: pass provider = (settings.AI_PROVIDER or "hf").lower() try: if provider == "ollama": return _ollama_vision(image_url, prompt or "Analyze this medical image and report relevant findings.") # Default HF: caption + chat reasoning caption = _hf_image_caption(image_url) reasoning_prompt = ( (prompt or "Analyze this medical image and report relevant findings, red flags, and advice.") + f"\n\nImage caption: {caption}" ) return _hf_generate_text(reasoning_prompt, max_new_tokens=250, temperature=0.2) except Exception as e: logger.error(f"Image analysis failed: {e}") return "" def translate_text(text: str, target_language: str) -> str: # Prefer HF dedicated translation model if available (only if token is set to avoid network in tests) if settings.HF_API_TOKEN: try: import httpx headers = {"Authorization": f"Bearer {settings.HF_API_TOKEN}"} payload = {"inputs": text} model = settings.HF_TRANSLATION_MODEL url = f"https://api-inference.huggingface.co/models/{model}" r = httpx.post(url, headers=headers, json=payload, timeout=60) if r.status_code == 200: out = r.json() if isinstance(out, list) and out and isinstance(out[0], dict) and out[0].get("translation_text"): return out[0]["translation_text"] except Exception as exc: # pragma: no cover logger.warning(f"HF translation failed: {exc}") # Test compatibility: try OpenAI-style client if patched try: oc = openai_client() resp = oc.chat.completions.create( model=getattr(settings, "OPENAI_MODEL", "gpt-4o-mini"), messages=[ {"role": "system", "content": "You translate text faithfully."}, {"role": "user", "content": f"Translate to {target_language}: {text}"}, ], ) return getattr(resp.choices[0].message, "content", None) or text except Exception: pass # Fallback via unified chat with explicit instruction prompt = f"Translate to {target_language} (preserve meaning and medical accuracy): {text}" try: return _unified_chat([{"role": "user", "content": prompt}], system=None) or text except Exception: return text