from typing import List

from loguru import logger
import json
import io
import os
import base64

from app.utils.config import settings

# Keep the OpenAI symbol to avoid breaking legacy tests that patch it, but do not use it in runtime paths
try:
    from openai import OpenAI
except Exception:  # pragma: no cover - optional import during dev
    OpenAI = None  # type: ignore


def _load_patient_context() -> str:
    try:
        # Prefer CSV summaries for Cameroon context if available
        csv_path = getattr(settings, "CAMEROON_DATA_CSV", None)
        if csv_path and os.path.exists(csv_path):
            import csv

            rows: list[str] = []
            with open(csv_path, "r", encoding="utf-8") as f:
                reader = csv.DictReader(f)
                for i, row in enumerate(reader):
                    if i >= 120:  # cap to avoid oversized prompts
                        break
                    parts = []
                    for k, v in (row or {}).items():
                        if v is None:
                            continue
                        s = str(v).strip()
                        if s:
                            parts.append(f"{k}: {s}")
                    if parts:
                        rows.append(" | ".join(parts))
            text = "\n".join(rows)
            return text[:6000]
        # Fallback to legacy JSON patient data
        with open(settings.PATIENT_DATA_PATH, "r", encoding="utf-8") as f:
            data = json.load(f)
        return json.dumps(data)[:6000]
    except Exception as exc:
        logger.warning(f"Cannot load patient RAG data: {exc}")
        return ""


def build_system_prompt(language: str) -> str:
    if language.lower().startswith("fr"):
        disclaimer = (
            "Tu es Medilang, un assistant médical virtuel compatissant et bienveillant, spécialement conçu pour les utilisateurs camerounais. "
            "Ton rôle est de fournir des conseils de premier recours, des informations sanitaires et de l'orientation, en tenant strictement compte du contexte local camerounais. "
            "N'oublie pas de préciser que tu n'es qu'une IA et recommande de contacter un médecin en cas de situation grave."
        )
    else:
        disclaimer = (
            "You are Medilang, a compassionate medical assistant for Cameroon. "
            "Be clear and adapt advice to local context (malaria, typhoid, vaccination, access to care). "
            "Include a medical disclaimer and recommend seeing a doctor for serious cases."
        )
    rag = _load_patient_context()
    return f"{disclaimer}\nContext (Cameroon RAG): {rag[:4000]}"


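# Illustrative result (sketch): the returned system prompt is the language-specific disclaimer
# followed by up to 4000 characters of local context, e.g.
#   build_system_prompt("fr")  # -> "Tu es Medilang, ...\nContext (Cameroon RAG): <CSV/JSON excerpt>"
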
def detect_language(text: str) -> str:
    try:
        from langdetect import detect

        code = detect(text)
        # Map common codes to our expected values
        if code.startswith("fr"):
            return "fr"
        if code.startswith("en"):
            return "en"
        return code
    except Exception:
        return "fr"


def openai_client():
    """Legacy helper kept for backward compatibility in tests.

    Not used by runtime code after migration to HF/Ollama/LM Studio.
    """
    if not settings.OPENAI_API_KEY:
        raise RuntimeError("OPENAI_API_KEY not configured")
    if OpenAI is None:
        raise RuntimeError("openai package not available")
    return OpenAI(api_key=settings.OPENAI_API_KEY)


# ==========================
# Provider utilities
# ==========================

def _flatten_messages(messages: List[dict], system: str | None) -> str:
    parts: List[str] = []
    if system:
        parts.append(f"System:\n{system}\n")
    for m in messages:
        role = m.get("role") or "user"
        content = m.get("content")
        if isinstance(content, list):
            # Extract text parts if using OpenAI-style content chunks
            text_chunks = []
            for c in content:
                if isinstance(c, dict) and c.get("type") == "text":
                    text_chunks.append(c.get("text") or "")
                elif isinstance(c, dict) and c.get("type") == "image_url":
                    url = (c.get("image_url") or {}).get("url") if isinstance(c.get("image_url"), dict) else c.get("image_url")
                    if url:
                        text_chunks.append(f"[Image: {url}]")
            content = "\n".join([t for t in text_chunks if t])
        parts.append(f"{role.capitalize()}: {content}")
    parts.append("Assistant:")
    return "\n\n".join(parts)


def _hf_generate_text(prompt: str, max_new_tokens: int = 400, temperature: float = 0.3) -> str:
    import httpx

    headers = {"Authorization": f"Bearer {settings.HF_API_TOKEN}"} if settings.HF_API_TOKEN else {}
    url = f"https://api-inference.huggingface.co/models/{settings.HF_TEXT_MODEL}"
    payload = {
        "inputs": prompt,
        "parameters": {
            "max_new_tokens": max_new_tokens,
            "temperature": temperature,
            "return_full_text": False,
        },
    }
    r = httpx.post(url, headers=headers, json=payload, timeout=120)
    r.raise_for_status()
    out = r.json()
    # HF can return list[{generated_text}] or dict/text
    if isinstance(out, list) and out and isinstance(out[0], dict) and out[0].get("generated_text"):
        return out[0]["generated_text"]
    if isinstance(out, dict) and out.get("generated_text"):
        return out.get("generated_text")
    if isinstance(out, str):
        return out
    return json.dumps(out)[:1000]


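# Illustrative call (network-bound; usually requires settings.HF_API_TOKEN):
#   _hf_generate_text("Question: Quels sont les symptômes du paludisme ?\nRéponse:", max_new_tokens=200)
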
def _ollama_chat(messages: List[dict], model: str | None = None, base_url: str | None = None) -> str:
    import httpx

    model = model or settings.OLLAMA_MODEL
    base = (base_url or settings.OLLAMA_BASE_URL).rstrip("/")
    url = f"{base}/api/chat"
    payload = {
        "model": model,
        "messages": messages,
        "stream": False,
        "options": {"temperature": 0.3},
    }
    r = httpx.post(url, json=payload, timeout=120)
    r.raise_for_status()
    data = r.json()
    # Newer Ollama returns {message: {content: "..."}} when stream=False
    if isinstance(data, dict):
        if data.get("message") and isinstance(data["message"], dict):
            return data["message"].get("content", "")
        if data.get("response"):
            return data.get("response", "")
    return str(data)


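# Expected non-streaming response shape (sketch of Ollama's /api/chat output):
#   {"model": "...", "message": {"role": "assistant", "content": "..."}, "done": true}
# so the happy path returns data["message"]["content"].
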
def _lmstudio_chat(messages: List[dict]) -> str:
    import httpx

    base = settings.LMSTUDIO_BASE_URL.rstrip("/")
    url = f"{base}/chat/completions"
    model = settings.LMSTUDIO_MODEL or "local-model"
    payload = {
        "model": model,
        "messages": messages,
        "temperature": 0.3,
    }
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {settings.OPENAI_API_KEY or 'lm-studio'}",
    }
    r = httpx.post(url, headers=headers, json=payload, timeout=120)
    r.raise_for_status()
    data = r.json()
    if isinstance(data, dict) and data.get("choices"):
        ch0 = data["choices"][0]
        # OpenAI-style
        msg = ch0.get("message") if isinstance(ch0, dict) else None
        if msg and isinstance(msg, dict):
            return msg.get("content", "")
        # Some variants return {choices:[{text:"..."}]}
        if ch0.get("text"):
            return ch0.get("text")
    return str(data)


def _unified_chat(messages: List[dict], system: str | None = None) -> str:
    provider = (settings.AI_PROVIDER or "hf").lower()
    if provider == "ollama":
        # Ollama supports chat natively
        final_msgs = ([] if not system else [{"role": "system", "content": system}]) + messages
        return _ollama_chat(final_msgs)
    if provider == "lmstudio":
        final_msgs = ([] if not system else [{"role": "system", "content": system}]) + messages
        return _lmstudio_chat(final_msgs)
    # Default: Hugging Face text generation with flattened chat
    prompt = _flatten_messages(messages, system)
    return _hf_generate_text(prompt, max_new_tokens=400, temperature=0.3)


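# Provider dispatch at a glance (assumes the selected backend is reachable):
#   AI_PROVIDER="ollama"    -> _ollama_chat([system?] + messages)
#   AI_PROVIDER="lmstudio"  -> _lmstudio_chat([system?] + messages)
#   anything else ("hf")    -> _hf_generate_text(_flatten_messages(messages, system))
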
def chat_completion(messages: List[dict], language: str) -> str:
    system = build_system_prompt(language or "fr")
    # Test compatibility: if openai_client is patched in tests, honor it first
    try:
        oc = openai_client()  # patched MagicMock returns a mock without requiring a real API key
        final_messages = [{"role": "system", "content": system}] + messages
        resp = oc.chat.completions.create(
            model=getattr(settings, "OPENAI_MODEL", "gpt-4o-mini"),
            messages=final_messages,
            temperature=0.3,
        )
        # Support MagicMock structure used in tests
        return getattr(resp.choices[0].message, "content", "")
    except Exception:
        pass
    try:
        return _unified_chat(messages, system)
    except Exception as e:
        logger.error(f"Chat completion failed: {e}")
        return ""


def _transcribe_with_huggingface(audio_url: str, language: str | None = None) -> str:
    """Transcribe audio using the Hugging Face Inference API."""
    import httpx
    import librosa
    import soundfile as sf

    # Load and process audio
    content: bytes
    if isinstance(audio_url, str) and os.path.exists(audio_url):
        with open(audio_url, "rb") as f:
            content = f.read()
    else:
        with httpx.Client(timeout=60.0) as client:
            resp = client.get(audio_url)
            resp.raise_for_status()
            content = resp.content

    # Process audio with librosa (mono, 16 kHz)
    raw_buf = io.BytesIO(content)
    raw_buf.seek(0)
    y, sr = librosa.load(raw_buf, sr=None, mono=False)
    if y.ndim > 1:
        y = librosa.to_mono(y)
    if sr != 16000:
        y = librosa.resample(y, orig_sr=sr, target_sr=16000)
        sr = 16000

    # Denoise using the first 0.5 s as a noise profile
    import noisereduce as nr

    noise_frames = int(sr * 0.5)
    if noise_frames > 0 and len(y) > noise_frames:
        noise_clip = y[:noise_frames]
    else:
        noise_clip = y
    y = nr.reduce_noise(y=y, y_noise=noise_clip, sr=sr)

    # Encode to WAV
    wav_buf = io.BytesIO()
    sf.write(wav_buf, y, sr, format="WAV")
    wav_buf.seek(0)
    audio_bytes = wav_buf.read()

    # Try the configured HF ASR model first, then fallbacks
    headers = {"Authorization": f"Bearer {settings.HF_API_TOKEN}"} if settings.HF_API_TOKEN else {}
    models = [
        settings.HF_ASR_MODEL,
        "openai/whisper-large-v3",
        "facebook/wav2vec2-large-960h-lv60-self",
    ]
    for model in models:
        try:
            url = f"https://api-inference.huggingface.co/models/{model}"
            params = {"task": "transcribe"}
            if language:
                params["language"] = language
            with httpx.Client(timeout=120.0) as client:
                r = client.post(
                    url,
                    headers={**headers, "Content-Type": "audio/wav"},
                    params=params,
                    content=audio_bytes,
                )
                r.raise_for_status()
                out = r.json()
            if isinstance(out, dict) and out.get("text"):
                logger.info(f"HF transcription successful with model: {model}")
                return out["text"]
            if isinstance(out, list) and out and isinstance(out[0], dict) and out[0].get("text"):
                logger.info(f"HF transcription successful with model: {model}")
                return out[0]["text"]
        except Exception as e:
            logger.warning(f"HF model {model} failed: {e}")
            continue
    return ""


def transcribe_audio(audio_url: str | None, language: str | None = None) -> str:
    if not audio_url:
        return ""
    # Test compatibility: if openai_client is patched, try it first
    try:
        oc = openai_client()
        import httpx
        import librosa
        import soundfile as sf

        # Load audio bytes (local path or URL)
        if isinstance(audio_url, str) and os.path.exists(audio_url):
            with open(audio_url, "rb") as f:
                raw_bytes = f.read()
        else:
            with httpx.Client(timeout=60.0) as client:
                r = client.get(audio_url)
                r.raise_for_status()
                raw_bytes = r.content
        # Ensure 16 kHz mono and noise reduction before Whisper
        raw_buf = io.BytesIO(raw_bytes)
        raw_buf.seek(0)
        y, sr = librosa.load(raw_buf, sr=None, mono=False)
        if hasattr(y, "ndim") and getattr(y, "ndim", 1) > 1:
            y = librosa.to_mono(y)  # type: ignore
        if sr != 16000:
            y = librosa.resample(y, orig_sr=sr, target_sr=16000)
            sr = 16000
        import noisereduce as nr

        noise_frames = int(sr * 0.5)
        noise_clip = y[:noise_frames] if len(y) > noise_frames else y
        y = nr.reduce_noise(y=y, y_noise=noise_clip, sr=sr)
        # Encode to WAV file-like for the OpenAI Whisper API
        wav_buf = io.BytesIO()
        sf.write(wav_buf, y, sr, format="WAV")
        wav_buf.seek(0)
        wav_buf.name = "input.wav"  # some clients expect a name
        tr = oc.audio.transcriptions.create(
            model=getattr(settings, "OPENAI_WHISPER_MODEL", "whisper-1"),
            file=wav_buf,
            language=language if language else None,
        )
        return getattr(tr, "text", "") or (tr.get("text") if isinstance(tr, dict) else "") or ""
    except Exception:
        pass
    # Prefer HF ASR
    try:
        import httpx
        import librosa
        import soundfile as sf

        # 1) Load audio from local path or URL
        content: bytes
        if isinstance(audio_url, str) and os.path.exists(audio_url):
            with open(audio_url, "rb") as f:
                content = f.read()
        else:
            with httpx.Client(timeout=60.0) as client:
                resp = client.get(audio_url)
                resp.raise_for_status()
                content = resp.content
        # 2) Decode to waveform (mono, 16 kHz)
        raw_buf = io.BytesIO(content)
        raw_buf.seek(0)
        y, sr = librosa.load(raw_buf, sr=None, mono=False)
        if hasattr(y, "ndim") and getattr(y, "ndim", 1) > 1:
            y = librosa.to_mono(y)  # type: ignore
        if sr != 16000:
            y = librosa.resample(y, orig_sr=sr, target_sr=16000)
            sr = 16000
        # 3) Denoise
        import noisereduce as nr

        noise_frames = int(sr * 0.5)
        noise_clip = y[:noise_frames] if len(y) > noise_frames else y
        y = nr.reduce_noise(y=y, y_noise=noise_clip, sr=sr)
        # 4) Encode WAV
        wav_buf = io.BytesIO()
        sf.write(wav_buf, y, sr, format="WAV")
        wav_buf.seek(0)
        audio_bytes = wav_buf.read()
        # 5) Send to the HF Inference API
        headers = {"Authorization": f"Bearer {settings.HF_API_TOKEN}"} if settings.HF_API_TOKEN else {}
        asr_models = [settings.HF_ASR_MODEL, "openai/whisper-large-v3"]
        for model in asr_models:
            try:
                url = f"https://api-inference.huggingface.co/models/{model}"
                with httpx.Client(timeout=180.0) as client:
                    r = client.post(url, headers={**headers, "Content-Type": "audio/wav"}, content=audio_bytes)
                    r.raise_for_status()
                    out = r.json()
                if isinstance(out, dict) and out.get("text"):
                    return out["text"]
                if isinstance(out, list) and out and isinstance(out[0], dict) and out[0].get("text"):
                    return out[0]["text"]
            except Exception as e:
                logger.warning(f"HF ASR model {model} failed: {e}")
                continue
        return ""
    except Exception as exc:
        logger.error(f"HF transcription failed: {exc}")
        return ""


def _hf_image_caption(image_ref: str) -> str:
    """Generate a caption for an image using an HF image-to-text model."""
    import httpx

    # Load bytes from URL, file path, or data URI
    data: bytes
    if isinstance(image_ref, str) and os.path.exists(image_ref):
        with open(image_ref, "rb") as f:
            data = f.read()
    elif isinstance(image_ref, str) and image_ref.startswith("data:"):
        try:
            b64 = image_ref.split(",", 1)[1]
            data = base64.b64decode(b64)
        except Exception:
            data = b""
    else:
        with httpx.Client(timeout=60.0) as client:
            r = client.get(image_ref)
            r.raise_for_status()
            data = r.content
    headers = {"Authorization": f"Bearer {settings.HF_API_TOKEN}"} if settings.HF_API_TOKEN else {}
    url = f"https://api-inference.huggingface.co/models/{settings.HF_VISION_CAPTION_MODEL}"
    r = httpx.post(url, headers=headers, content=data, timeout=120)
    r.raise_for_status()
    out = r.json()
    if isinstance(out, list) and out and isinstance(out[0], dict):
        return out[0].get("generated_text") or out[0].get("caption", "") or ""
    if isinstance(out, dict):
        return out.get("generated_text") or out.get("caption", "") or ""
    return ""


def _ollama_vision(image_ref: str, prompt: str) -> str:
    import httpx

    # Prepare image bytes as base64 for Ollama
    if isinstance(image_ref, str) and os.path.exists(image_ref):
        with open(image_ref, "rb") as f:
            img_bytes = f.read()
    elif isinstance(image_ref, str) and image_ref.startswith("data:"):
        try:
            img_bytes = base64.b64decode(image_ref.split(",", 1)[1])
        except Exception:
            img_bytes = b""
    else:
        with httpx.Client(timeout=60.0) as client:
            r = client.get(image_ref)
            r.raise_for_status()
            img_bytes = r.content
    b64img = base64.b64encode(img_bytes).decode("ascii")
    base = settings.OLLAMA_BASE_URL.rstrip("/")
    url = f"{base}/api/generate"
    payload = {
        "model": settings.OLLAMA_VISION_MODEL,
        "prompt": prompt or "Describe the medically relevant observations in this image.",
        "images": [b64img],
        "stream": False,
        "options": {"temperature": 0.2},
    }
    r = httpx.post(url, json=payload, timeout=180)
    r.raise_for_status()
    data = r.json()
    # Non-streaming responses include the full text in 'response'
    if isinstance(data, dict) and data.get("response"):
        return data["response"]
    return str(data)


def analyze_image(image_url: str, prompt: str | None) -> str:
    # Test compatibility: if openai_client is patched, use it first
    try:
        oc = openai_client()
        content = []
        if prompt:
            content.append({"type": "text", "text": prompt})
        content.append({"type": "image_url", "image_url": {"url": image_url}})
        resp = oc.chat.completions.create(
            model=getattr(settings, "OPENAI_MODEL", "gpt-4o-mini"),
            messages=[{"role": "user", "content": content}],
        )
        return getattr(resp.choices[0].message, "content", "") or ""
    except Exception:
        pass
    provider = (settings.AI_PROVIDER or "hf").lower()
    try:
        if provider == "ollama":
            return _ollama_vision(image_url, prompt or "Analyze this medical image and report relevant findings.")
        # Default HF: caption + chat reasoning
        caption = _hf_image_caption(image_url)
        reasoning_prompt = (
            (prompt or "Analyze this medical image and report relevant findings, red flags, and advice.")
            + f"\n\nImage caption: {caption}"
        )
        return _hf_generate_text(reasoning_prompt, max_new_tokens=250, temperature=0.2)
    except Exception as e:
        logger.error(f"Image analysis failed: {e}")
        return ""


def translate_text(text: str, target_language: str) -> str:
    # Prefer the dedicated HF translation model if available (only when a token is set, to avoid network calls in tests)
    if settings.HF_API_TOKEN:
        try:
            import httpx

            headers = {"Authorization": f"Bearer {settings.HF_API_TOKEN}"}
            payload = {"inputs": text}
            model = settings.HF_TRANSLATION_MODEL
            url = f"https://api-inference.huggingface.co/models/{model}"
            r = httpx.post(url, headers=headers, json=payload, timeout=60)
            if r.status_code == 200:
                out = r.json()
                if isinstance(out, list) and out and isinstance(out[0], dict) and out[0].get("translation_text"):
                    return out[0]["translation_text"]
        except Exception as exc:  # pragma: no cover
            logger.warning(f"HF translation failed: {exc}")
    # Test compatibility: try OpenAI-style client if patched
    try:
        oc = openai_client()
        resp = oc.chat.completions.create(
            model=getattr(settings, "OPENAI_MODEL", "gpt-4o-mini"),
            messages=[
                {"role": "system", "content": "You translate text faithfully."},
                {"role": "user", "content": f"Translate to {target_language}: {text}"},
            ],
        )
        return getattr(resp.choices[0].message, "content", None) or text
    except Exception:
        pass
    # Fallback via unified chat with an explicit instruction
    prompt = f"Translate to {target_language} (preserve meaning and medical accuracy): {text}"
    try:
        return _unified_chat([{"role": "user", "content": prompt}], system=None) or text
    except Exception:
        return text


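# Illustrative usage (returns the input text unchanged if no provider responds):
#   translate_text("J'ai mal à la tête depuis hier", target_language="en")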