# medilang-tech/app/ai_services.py
from typing import List
from loguru import logger
import json
import io
import os
import base64
from app.utils.config import settings
# Keep OpenAI symbol to avoid breaking legacy tests that patch it, but do not use it in runtime paths
try:
from openai import OpenAI
except Exception: # pragma: no cover - optional import during dev
OpenAI = None # type: ignore
def _load_patient_context() -> str:
try:
# Prefer CSV summaries for Cameroon context if available
csv_path = getattr(settings, "CAMEROON_DATA_CSV", None)
if csv_path and os.path.exists(csv_path):
import csv
rows: list[str] = []
with open(csv_path, "r", encoding="utf-8") as f:
reader = csv.DictReader(f)
for i, row in enumerate(reader):
if i >= 120: # cap to avoid oversized prompts
break
parts = []
for k, v in (row or {}).items():
if v is None:
continue
s = str(v).strip()
if s:
parts.append(f"{k}: {s}")
if parts:
rows.append(" | ".join(parts))
text = "\n".join(rows)
return text[:6000]
# Fallback to legacy JSON patient data
with open(settings.PATIENT_DATA_PATH, "r", encoding="utf-8") as f:
data = json.load(f)
return json.dumps(data)[:6000]
except Exception as exc:
logger.warning(f"Cannot load patient RAG data: {exc}")
return ""
def build_system_prompt(language: str) -> str:
if language.lower().startswith("fr"):
disclaimer = (
"Tu es Medilang, un assistant médical virtuel compatissant et bienveillant, spécialement conçu pour les utilisateurs camerounais. Ton rôle est de fournir des conseils de premier recours, des informations sanitaires et de l'orientation, en tenant strictement compte du contexte local camerounais. "
"N'oublie pas de specifier que tu n'est qu'une ia et recommande le contact au medecin en cas de situation grave"
)
else:
disclaimer = (
"You are Medilang, a compassionate medical assistant for Cameroon. "
"Be clear and adapt advice to local context (malaria, typhoid, vaccination, access to care). "
"Include a medical disclaimer and recommend seeing a doctor for serious cases."
)
rag = _load_patient_context()
return f"{disclaimer}\nContext (Cameroon RAG): {rag[:4000]}"
def detect_language(text: str) -> str:
try:
from langdetect import detect
code = detect(text)
# Map common codes to our expected values
if code.startswith("fr"):
return "fr"
if code.startswith("en"):
return "en"
return code
except Exception:
return "fr"
def openai_client():
"""Legacy helper kept for backward compatibility in tests.
Not used by runtime code after migration to HF/Ollama/LM Studio.
"""
if not settings.OPENAI_API_KEY:
raise RuntimeError("OPENAI_API_KEY not configured")
if OpenAI is None:
raise RuntimeError("openai package not available")
return OpenAI(api_key=settings.OPENAI_API_KEY)
# ==========================
# Provider utilities
# ==========================
def _flatten_messages(messages: List[dict], system: str | None) -> str:
parts: List[str] = []
if system:
parts.append(f"System:\n{system}\n")
for m in messages:
role = m.get("role") or "user"
content = m.get("content")
if isinstance(content, list):
# Extract text parts if using OpenAI-style content chunks
text_chunks = []
for c in content:
if isinstance(c, dict) and c.get("type") == "text":
text_chunks.append(c.get("text") or "")
elif isinstance(c, dict) and c.get("type") == "image_url":
url = (c.get("image_url") or {}).get("url") if isinstance(c.get("image_url"), dict) else c.get("image_url")
if url:
text_chunks.append(f"[Image: {url}]")
content = "\n".join([t for t in text_chunks if t])
parts.append(f"{role.capitalize()}: {content}")
parts.append("Assistant:")
return "\n\n".join(parts)
def _hf_generate_text(prompt: str, max_new_tokens: int = 400, temperature: float = 0.3) -> str:
import httpx
headers = {"Authorization": f"Bearer {settings.HF_API_TOKEN}"} if settings.HF_API_TOKEN else {}
url = f"https://api-inference.huggingface.co/models/{settings.HF_TEXT_MODEL}"
payload = {
"inputs": prompt,
"parameters": {
"max_new_tokens": max_new_tokens,
"temperature": temperature,
"return_full_text": False,
},
}
r = httpx.post(url, headers=headers, json=payload, timeout=120)
r.raise_for_status()
out = r.json()
# HF can return list[{generated_text}] or dict/text
if isinstance(out, list) and out and isinstance(out[0], dict) and out[0].get("generated_text"):
return out[0]["generated_text"]
if isinstance(out, dict) and out.get("generated_text"):
return out.get("generated_text")
if isinstance(out, str):
return out
return json.dumps(out)[:1000]
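
# Response shapes this helper accepts from the HF Inference API (illustrative, not
# exhaustive): [{"generated_text": "..."}], {"generated_text": "..."}, or a bare
# string; anything else is returned as truncated JSON so callers can still log it.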
def _ollama_chat(messages: List[dict], model: str | None = None, base_url: str | None = None) -> str:
import httpx
model = model or settings.OLLAMA_MODEL
base = (base_url or settings.OLLAMA_BASE_URL).rstrip("/")
url = f"{base}/api/chat"
payload = {
"model": model,
"messages": messages,
"stream": False,
"options": {"temperature": 0.3}
}
r = httpx.post(url, json=payload, timeout=120)
r.raise_for_status()
data = r.json()
# Newer Ollama returns {message: {content: "..."}} when stream=False
if isinstance(data, dict):
if data.get("message") and isinstance(data["message"], dict):
return data["message"].get("content", "")
if data.get("response"):
return data.get("response", "")
return str(data)
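
# Illustrative non-streaming exchange with a local Ollama server (model name and
# reply are placeholders):
#   POST {OLLAMA_BASE_URL}/api/chat
#   {"model": "llama3", "messages": [{"role": "user", "content": "Bonjour"}], "stream": false}
#   <- {"message": {"role": "assistant", "content": "Bonjour ! Comment puis-je aider ?"}, ...}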
def _lmstudio_chat(messages: List[dict]) -> str:
import httpx
base = settings.LMSTUDIO_BASE_URL.rstrip("/")
url = f"{base}/chat/completions"
model = settings.LMSTUDIO_MODEL or "local-model"
payload = {
"model": model,
"messages": messages,
"temperature": 0.3,
}
headers = {"Content-Type": "application/json", "Authorization": f"Bearer {settings.OPENAI_API_KEY or 'lm-studio'}"}
r = httpx.post(url, headers=headers, json=payload, timeout=120)
r.raise_for_status()
data = r.json()
if isinstance(data, dict) and data.get("choices"):
ch0 = data["choices"][0]
# OpenAI-style
msg = ch0.get("message") if isinstance(ch0, dict) else None
if msg and isinstance(msg, dict):
return msg.get("content", "")
# Some variants return {choices:[{text:"..."}]}
if ch0.get("text"):
return ch0.get("text")
return str(data)
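
# LM Studio serves an OpenAI-compatible endpoint; the two response shapes handled
# above look roughly like (illustrative):
#   {"choices": [{"message": {"role": "assistant", "content": "..."}}]}
#   {"choices": [{"text": "..."}]}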
def _unified_chat(messages: List[dict], system: str | None = None) -> str:
provider = (settings.AI_PROVIDER or "hf").lower()
if provider == "ollama":
# Ollama supports chat natively
final_msgs = ([] if not system else [{"role": "system", "content": system}]) + messages
return _ollama_chat(final_msgs)
if provider == "lmstudio":
final_msgs = ([] if not system else [{"role": "system", "content": system}]) + messages
return _lmstudio_chat(final_msgs)
# Default: Hugging Face text generation with flattened chat
prompt = _flatten_messages(messages, system)
return _hf_generate_text(prompt, max_new_tokens=400, temperature=0.3)
def chat_completion(messages: List[dict], language: str) -> str:
system = build_system_prompt(language or "fr")
# Test compatibility: if openai_client is patched in tests, honor it first
try:
oc = openai_client() # patched MagicMock returns a mock without requiring real API key
final_messages = ([{"role": "system", "content": system}] + messages)
resp = oc.chat.completions.create(
model=getattr(settings, "OPENAI_MODEL", "gpt-4o-mini"),
messages=final_messages,
temperature=0.3,
)
# Support MagicMock structure used in tests
return getattr(resp.choices[0].message, "content", "")
except Exception:
pass
try:
return _unified_chat(messages, system)
except Exception as e:
logger.error(f"Chat completion failed: {e}")
return ""
def _transcribe_with_huggingface(audio_url: str, language: str | None = None) -> str:
"""Transcribe audio using Hugging Face Inference API"""
import httpx
import librosa
import soundfile as sf
# Load and process audio
content: bytes
if isinstance(audio_url, str) and os.path.exists(audio_url):
with open(audio_url, "rb") as f:
content = f.read()
else:
with httpx.Client(timeout=60.0) as client:
resp = client.get(audio_url)
resp.raise_for_status()
content = resp.content
# Process audio with librosa
raw_buf = io.BytesIO(content)
raw_buf.seek(0)
y, sr = librosa.load(raw_buf, sr=None, mono=False)
if y.ndim > 1:
y = librosa.to_mono(y)
if sr != 16000:
y = librosa.resample(y, orig_sr=sr, target_sr=16000)
sr = 16000
# Denoise
import noisereduce as nr
noise_frames = int(sr * 0.5)
if noise_frames > 0 and len(y) > noise_frames:
noise_clip = y[:noise_frames]
else:
noise_clip = y
y = nr.reduce_noise(y=y, y_noise=noise_clip, sr=sr)
# Encode to WAV
wav_buf = io.BytesIO()
sf.write(wav_buf, y, sr, format="WAV")
wav_buf.seek(0)
audio_bytes = wav_buf.read()
# Try configured HF ASR model first, then fallbacks
headers = {"Authorization": f"Bearer {settings.HF_API_TOKEN}"} if settings.HF_API_TOKEN else {}
models = [
settings.HF_ASR_MODEL,
"openai/whisper-large-v3",
"facebook/wav2vec2-large-960h-lv60-self"
]
for model in models:
try:
url = f"https://api-inference.huggingface.co/models/{model}"
params = {"task": "transcribe"}
if language:
params["language"] = language
with httpx.Client(timeout=120.0) as client:
r = client.post(
url,
headers={**headers, "Content-Type": "audio/wav"},
params=params,
content=audio_bytes,
)
r.raise_for_status()
out = r.json()
if isinstance(out, dict) and out.get("text"):
logger.info(f"HF transcription successful with model: {model}")
return out["text"]
if isinstance(out, list) and out and isinstance(out[0], dict) and out[0].get("text"):
logger.info(f"HF transcription successful with model: {model}")
return out[0]["text"]
except Exception as e:
logger.warning(f"HF model {model} failed: {e}")
continue
return ""
def transcribe_audio(audio_url: str | None, language: str | None = None) -> str:
if not audio_url:
return ""
# Test compatibility: if openai_client is patched, try it first
try:
oc = openai_client()
import httpx
import librosa
import soundfile as sf
# Load audio bytes (local path or URL)
if isinstance(audio_url, str) and os.path.exists(audio_url):
with open(audio_url, "rb") as f:
raw_bytes = f.read()
else:
with httpx.Client(timeout=60.0) as client:
r = client.get(audio_url)
r.raise_for_status()
raw_bytes = r.content
# Ensure 16 kHz mono and noise reduction before Whisper
raw_buf = io.BytesIO(raw_bytes)
raw_buf.seek(0)
y, sr = librosa.load(raw_buf, sr=None, mono=False)
if hasattr(y, "ndim") and getattr(y, "ndim", 1) > 1:
y = librosa.to_mono(y) # type: ignore
if sr != 16000:
y = librosa.resample(y, orig_sr=sr, target_sr=16000)
sr = 16000
import noisereduce as nr
noise_frames = int(sr * 0.5)
noise_clip = y[:noise_frames] if len(y) > noise_frames else y
y = nr.reduce_noise(y=y, y_noise=noise_clip, sr=sr)
# Encode to WAV file-like for OpenAI Whisper API
wav_buf = io.BytesIO()
sf.write(wav_buf, y, sr, format="WAV")
wav_buf.seek(0)
wav_buf.name = "input.wav" # some clients expect a name
tr = oc.audio.transcriptions.create(
model=getattr(settings, "OPENAI_WHISPER_MODEL", "whisper-1"),
file=wav_buf,
language=language if language else None,
)
return getattr(tr, "text", "") or (tr.get("text") if isinstance(tr, dict) else "") or ""
except Exception:
pass
# Prefer HF ASR
try:
        import httpx
        import librosa
        import soundfile as sf
# 1) Load audio from local path or URL
content: bytes
if isinstance(audio_url, str) and os.path.exists(audio_url):
with open(audio_url, "rb") as f:
content = f.read()
else:
with httpx.Client(timeout=60.0) as client:
resp = client.get(audio_url)
resp.raise_for_status()
content = resp.content
# 2) Decode to waveform (mono, 16k)
raw_buf = io.BytesIO(content)
raw_buf.seek(0)
y, sr = librosa.load(raw_buf, sr=None, mono=False)
        if hasattr(y, 'ndim') and getattr(y, 'ndim', 1) > 1:
            y = librosa.to_mono(y)  # type: ignore
if sr != 16000:
y = librosa.resample(y, orig_sr=sr, target_sr=16000)
sr = 16000
# 3) Denoise
import noisereduce as nr
noise_frames = int(sr * 0.5)
noise_clip = y[:noise_frames] if len(y) > noise_frames else y
y = nr.reduce_noise(y=y, y_noise=noise_clip, sr=sr)
# 4) Encode WAV
wav_buf = io.BytesIO()
sf.write(wav_buf, y, sr, format="WAV")
wav_buf.seek(0)
audio_bytes = wav_buf.read()
# 5) HF
headers = {"Authorization": f"Bearer {settings.HF_API_TOKEN}"} if settings.HF_API_TOKEN else {}
asr_models = [settings.HF_ASR_MODEL, "openai/whisper-large-v3"]
for model in asr_models:
try:
url = f"https://api-inference.huggingface.co/models/{model}"
with httpx.Client(timeout=180.0) as client:
r = client.post(url, headers={**headers, "Content-Type": "audio/wav"}, content=audio_bytes)
r.raise_for_status()
out = r.json()
if isinstance(out, dict) and out.get("text"):
return out["text"]
if isinstance(out, list) and out and isinstance(out[0], dict) and out[0].get("text"):
return out[0]["text"]
except Exception as e:
logger.warning(f"HF ASR model {model} failed: {e}")
continue
return ""
except Exception as exc:
logger.error(f"HF transcription failed: {exc}")
return ""
def _hf_image_caption(image_ref: str) -> str:
"""Generate a caption for an image using HF image-to-text model."""
import httpx
# Load bytes from URL, file path, or data URI
data: bytes
if isinstance(image_ref, str) and os.path.exists(image_ref):
with open(image_ref, "rb") as f:
data = f.read()
elif isinstance(image_ref, str) and image_ref.startswith("data:"):
try:
b64 = image_ref.split(",", 1)[1]
data = base64.b64decode(b64)
except Exception:
data = b""
else:
with httpx.Client(timeout=60.0) as client:
r = client.get(image_ref)
r.raise_for_status()
data = r.content
headers = {"Authorization": f"Bearer {settings.HF_API_TOKEN}"} if settings.HF_API_TOKEN else {}
url = f"https://api-inference.huggingface.co/models/{settings.HF_VISION_CAPTION_MODEL}"
r = httpx.post(url, headers=headers, content=data, timeout=120)
r.raise_for_status()
out = r.json()
if isinstance(out, list) and out and isinstance(out[0], dict):
return out[0].get("generated_text") or out[0].get("caption", "") or ""
if isinstance(out, dict):
return out.get("generated_text") or out.get("caption", "") or ""
return ""
def _ollama_vision(image_ref: str, prompt: str) -> str:
import httpx
# Prepare image bytes as base64 for Ollama
if isinstance(image_ref, str) and os.path.exists(image_ref):
with open(image_ref, "rb") as f:
img_bytes = f.read()
elif isinstance(image_ref, str) and image_ref.startswith("data:"):
try:
img_bytes = base64.b64decode(image_ref.split(",", 1)[1])
except Exception:
img_bytes = b""
else:
with httpx.Client(timeout=60.0) as client:
r = client.get(image_ref)
r.raise_for_status()
img_bytes = r.content
b64img = base64.b64encode(img_bytes).decode("ascii")
base = settings.OLLAMA_BASE_URL.rstrip("/")
url = f"{base}/api/generate"
payload = {
"model": settings.OLLAMA_VISION_MODEL,
"prompt": prompt or "Describe the medically relevant observations in this image.",
"images": [b64img],
"stream": False,
"options": {"temperature": 0.2},
}
r = httpx.post(url, json=payload, timeout=180)
r.raise_for_status()
data = r.json()
# Non-stream returns may include 'response'
if isinstance(data, dict) and data.get("response"):
return data["response"]
return str(data)
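
# Illustrative /api/generate payload for a vision-capable local model (values are
# placeholders; the image is sent base64-encoded):
#   {"model": "llava", "prompt": "Describe ...", "images": ["<base64>"], "stream": false}
#   <- {"response": "The image shows ...", ...}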
def analyze_image(image_url: str, prompt: str | None) -> str:
# Test compatibility: if openai_client is patched, use it first
try:
oc = openai_client()
content = []
if prompt:
content.append({"type": "text", "text": prompt})
content.append({"type": "image_url", "image_url": {"url": image_url}})
resp = oc.chat.completions.create(
model=getattr(settings, "OPENAI_MODEL", "gpt-4o-mini"),
messages=[{"role": "user", "content": content}],
)
return getattr(resp.choices[0].message, "content", "") or ""
except Exception:
pass
provider = (settings.AI_PROVIDER or "hf").lower()
try:
if provider == "ollama":
return _ollama_vision(image_url, prompt or "Analyze this medical image and report relevant findings.")
# Default HF: caption + chat reasoning
caption = _hf_image_caption(image_url)
reasoning_prompt = (
(prompt or "Analyze this medical image and report relevant findings, red flags, and advice.")
+ f"\n\nImage caption: {caption}"
)
return _hf_generate_text(reasoning_prompt, max_new_tokens=250, temperature=0.2)
except Exception as e:
logger.error(f"Image analysis failed: {e}")
return ""
def translate_text(text: str, target_language: str) -> str:
# Prefer HF dedicated translation model if available (only if token is set to avoid network in tests)
if settings.HF_API_TOKEN:
try:
import httpx
headers = {"Authorization": f"Bearer {settings.HF_API_TOKEN}"}
payload = {"inputs": text}
model = settings.HF_TRANSLATION_MODEL
url = f"https://api-inference.huggingface.co/models/{model}"
r = httpx.post(url, headers=headers, json=payload, timeout=60)
if r.status_code == 200:
out = r.json()
if isinstance(out, list) and out and isinstance(out[0], dict) and out[0].get("translation_text"):
return out[0]["translation_text"]
except Exception as exc: # pragma: no cover
logger.warning(f"HF translation failed: {exc}")
# Test compatibility: try OpenAI-style client if patched
try:
oc = openai_client()
resp = oc.chat.completions.create(
model=getattr(settings, "OPENAI_MODEL", "gpt-4o-mini"),
messages=[
{"role": "system", "content": "You translate text faithfully."},
{"role": "user", "content": f"Translate to {target_language}: {text}"},
],
)
return getattr(resp.choices[0].message, "content", None) or text
except Exception:
pass
# Fallback via unified chat with explicit instruction
prompt = f"Translate to {target_language} (preserve meaning and medical accuracy): {text}"
try:
return _unified_chat([{"role": "user", "content": prompt}], system=None) or text
except Exception:
return text
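
# Minimal manual smoke test: a sketch that assumes a configured provider
# (HF_API_TOKEN set, or a local Ollama/LM Studio instance reachable). It runs only
# when the module is executed directly, never on import.
if __name__ == "__main__":
    sample = "J'ai de la fièvre et des maux de tête depuis deux jours."
    lang = detect_language(sample)
    logger.info(f"Detected language: {lang}")
    logger.info(chat_completion([{"role": "user", "content": sample}], language=lang))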