File size: 13,209 Bytes
47e3582 8983805 64b8f7a 47e3582 0004b52 47e3582 f73c316 29c313c 5c1fa92 645c97c 8644d71 84d39f9 8644d71 84d39f9 47e3582 84d39f9 47e3582 84d39f9 47e3582 84d39f9 47e3582 84d39f9 47e3582 84d39f9 47e3582 84d39f9 47e3582 84d39f9 a3cb67a 84d39f9 a3cb67a 84d39f9 a3cb67a 47e3582 84d39f9 47e3582 84d39f9 47e3582 84d39f9 47e3582 84d39f9 47e3582 84d39f9 47e3582 645c97c 84d39f9 47e3582 8644d71 f73c316 645c97c 84d39f9 8644d71 64b8f7a 47e3582 8644d71 64b8f7a 47e3582 8644d71 5c1fa92 eb14570 8644d71 47e3582 8644d71 47e3582 8644d71 47e3582 8644d71 47e3582 8644d71 47e3582 8644d71 47e3582 8644d71 47e3582 8644d71 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 |
# src/core/response_pipeline.py
from fastapi import HTTPException, status
from src.core import prompt_builder
from src.core.state import AppState
from src.data.medical_kb import search_medical_kb
from src.models.account import Account
from src.services import local_llm_service
from src.services.gemini import gemini_chat
from src.services.guard import SafetyGuard
from src.utils.logger import logger
from src.utils.rotator import APIKeyRotator
# --- Private Helper Functions ---
def _validate_user_query(message: str, safety_guard: SafetyGuard | None):
    """
    Checks the user's query against the safety guard.

    Args:
        message: The raw user query to validate.
        safety_guard: The guard instance, or None to skip validation entirely.

    Raises:
        HTTPException: 400 if the guard blocks the query; 500 if the guard
            itself fails while checking.
    """
    if not safety_guard:
        return
    try:
        is_safe, reason = safety_guard.check_user_query(message)
        if not is_safe:
            logger().warning(f"Safety guard blocked user query: {reason}")
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Query blocked for safety reasons: {reason}"
            )
        logger().info(f"User query passed safety validation: {reason}")
    except HTTPException:
        # BUG FIX: the 400 raised above was previously swallowed by the
        # generic handler below and re-raised as a 500, so clients never saw
        # the intended "blocked for safety reasons" response. Propagate it.
        raise
    except Exception as e:
        logger().error(f"Safety guard failed on user query: {e}")
        # Re-raise to be caught by the main orchestrator
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to validate user query safety."
        ) from e
def _validate_model_response(
    query: str,
    response: str,
    safety_guard: SafetyGuard | None
) -> str:
    """
    Runs the model's answer through the safety guard.

    An unsafe answer is replaced with a canned fallback message; if the guard
    itself errors out we fail open and hand back the original response.
    """
    if not safety_guard:
        return response
    fallback_message = "I apologize, but I cannot provide a response to that query as it may contain unsafe content. Please consult with a qualified healthcare professional for medical advice."
    try:
        verdict, reason = safety_guard.check_model_answer(query, response)
        if verdict:
            logger().info(f"AI response passed safety validation: {reason}")
            return response
        logger().warning(f"Safety guard blocked AI response: {reason}")
        return fallback_message
    except Exception as e:
        logger().error(f"Safety guard failed on model response: {e}")
        logger().warning("Safety guard failed, allowing response through (fail-open)")
        # Fail open: a broken guard must not block legitimate answers.
        return response
async def _retrieve_context(
    state: AppState,
    session_id: str,
    patient_id: str,
    message: str
) -> str:
    """Retrieves enhanced medical context. This is the entry point for RAG."""
    try:
        context = await state.memory_manager.get_enhanced_context(
            session_id=session_id,
            patient_id=patient_id,
            question=message,
            nvidia_rotator=state.nvidia_rotator
        )
    except Exception as e:
        logger().error(f"Error getting medical context: {e}")
        # Surface retrieval failures to the orchestrator as a server error.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to build medical context."
        ) from e
    return context
def _add_disclaimer(response_text: str) -> str:
"""Adds a standard medical disclaimer if one is not already present."""
if "disclaimer" not in response_text.lower() and "consult" not in response_text.lower():
disclaimer = "\n\n⚠️ **Important Disclaimer:** This information is for educational purposes only and should not replace professional medical advice, diagnosis, or treatment. Always consult with qualified healthcare professionals."
return response_text + disclaimer
return response_text
async def _persist_exchange(
    state: AppState,
    session_id: str,
    patient_id: str,
    account_id: str,
    question: str,
    answer: str
):
    """Summarizes and stores the completed question/answer exchange."""
    stored = await state.memory_manager.process_medical_exchange(
        question=question,
        answer=answer,
        session_id=session_id,
        patient_id=patient_id,
        doctor_id=account_id,
        gemini_rotator=state.gemini_rotator,
        nvidia_rotator=state.nvidia_rotator
    )
    # A falsy result means the exchange was not persisted; log but don't fail.
    if not stored:
        logger().warning(f"Failed to process and store medical exchange for session {session_id}")
# --- Core Response Generation Logic ---
async def generate_llm_response(
    account: Account,
    message: str,
    rotator: APIKeyRotator,
    medical_context: str = ""
) -> str | None:
    """
    Generates an intelligent medical response using the LLM, adding a disclaimer.
    This function is purely for generation, with safety checks handled elsewhere.

    Args:
        account: The requesting user's account, used to shape the prompt.
        message: The user's question.
        rotator: API key rotator for the Gemini fallback path.
        medical_context: Optional RAG context injected into the prompt.

    Returns:
        The generated text with a disclaimer appended, or None if no text
        was produced.
    """
    prompt = prompt_builder.medical_response_prompt(
        account=account,
        user_message=message,
        medical_context=medical_context
    )
    # Prefer the locally hosted model when loaded; Gemini is the fallback.
    if local_llm_service.model_loaded:
        response_text = local_llm_service.get_inference(prompt=prompt)
    else:
        response_text = await gemini_chat(prompt, rotator)
    if not response_text:
        return None
    response_with_disclaimer = _add_disclaimer(response_text)
    # BUG FIX: the old log line claimed "Gemini response generated" even when
    # the local model produced the answer; use a provider-neutral message.
    logger().info(f"LLM response generated, length: {len(response_with_disclaimer)} chars")
    return response_with_disclaimer
# --- Main Pipeline Orchestrator ---
async def generate_chat_response(
    state: AppState,
    message: str,
    session_id: str,
    patient_id: str,
    account_id: str
) -> str:
    """
    Orchestrates the pipeline for generating a chat response.

    Steps: validate the query, retrieve RAG context, load the account,
    generate the LLM answer (with a KB-based fallback), validate the answer,
    and persist the exchange.

    Raises:
        HTTPException: 404 if the account is missing; 500 on context or
            generation failures; safety-validation errors are propagated
            from the guard helpers.
    """
    logger().info(f"Starting response pipeline for session {session_id}")
    safety_guard: SafetyGuard | None = None
    try:
        safety_guard = SafetyGuard(state.nvidia_rotator)
    except Exception as e:
        # BUG FIX: the captured exception was previously dropped, hiding why
        # the guard could not be created; include it in the warning.
        logger().warning(f"Safety guard failed to be created, ignoring: {e}")
    # 1. Validate User Query
    _validate_user_query(message, safety_guard)
    # 2. Retrieve Context (RAG Entry Point)
    medical_context = await _retrieve_context(state, session_id, patient_id, message)
    # 3. Fetch Account Details
    account = state.memory_manager.get_account(account_id)
    if not account:
        logger().error(f"Account not found for account_id: {account_id}")
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Account not found")
    # 4. Generate AI Response
    try:
        response_text = await generate_llm_response(
            message=message,
            account=account,
            rotator=state.gemini_rotator,
            medical_context=medical_context
        )
        # If LLM fails, use a fallback
        if not response_text:
            logger().warning("LLM response failed, using fallback.")
            response_text = _generate_fallback_response(message=message, account=account)
    except Exception as e:
        logger().error(f"Error generating medical response: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to generate AI response."
        ) from e
    # 5. Validate Model's Response
    final_response = _validate_model_response(message, response_text, safety_guard)
    # 6. Persist the Exchange (Asynchronously)
    # This can be done in the background if it's not critical for the user response
    await _persist_exchange(
        state=state,
        session_id=session_id,
        patient_id=patient_id,
        account_id=account_id,
        question=message,
        answer=final_response
    )
    return final_response
def _generate_fallback_response(
    message: str,
    account: Account
) -> str:
    """
    Generates a fallback response using a local knowledge base.

    Combines any KB hit with topic-, role-, and specialty-specific guidance,
    then closes with the standard medical disclaimer.
    """
    kb_info = search_medical_kb(message)
    logger().info("Generating backup response")

    question_lower = message.lower()
    parts = []

    if kb_info:
        parts.append(f"Based on your question about medical topics, here's what I found:\n\n{kb_info}")
        # Topic-specific bullet sections; the first matching entry wins,
        # preserving the original priority order.
        topic_sections = (
            (("fever", "temperature", "hot"), "\n\n**Key Points about Fever:**", (
                "• Normal body temperature is around 98.6°F (37°C)",
                "• Fever is often a sign of infection or inflammation",
                "• Monitor for other symptoms that accompany fever",
                "• Seek medical attention for high fevers (>103°F/39.4°C) or persistent fevers",
            )),
            (("headache", "head pain", "migraine"), "\n\n**Key Points about Headaches:**", (
                "• Tension headaches are the most common type",
                "• Migraines often have specific triggers and symptoms",
                "• Sudden, severe headaches require immediate medical attention",
                "• Keep a headache diary to identify patterns",
            )),
            (("cough", "cold", "respiratory"), "\n\n**Key Points about Respiratory Symptoms:**", (
                "• Dry vs. productive cough have different implications",
                "• Most colds resolve within 7-10 days",
                "• Persistent cough may indicate underlying conditions",
                "• Monitor for difficulty breathing or chest pain",
            )),
            (("hypertension", "blood pressure", "high bp"), "\n\n**Key Points about Hypertension:**", (
                "• Often called the 'silent killer' due to lack of symptoms",
                "• Regular monitoring is essential",
                "• Lifestyle modifications can help control blood pressure",
                "• Medication may be necessary for some individuals",
            )),
            (("diabetes", "blood sugar", "glucose"), "\n\n**Key Points about Diabetes:**", (
                "• Type 1: Autoimmune, requires insulin",
                "• Type 2: Often lifestyle-related, may be managed with diet/exercise",
                "• Regular blood sugar monitoring is crucial",
                "• Complications can affect multiple organ systems",
            )),
        )
        for triggers, heading, bullets in topic_sections:
            if any(word in question_lower for word in triggers):
                parts.append(heading)
                parts.extend(bullets)
                break
    else:
        # No KB hit: acknowledge the question type with general guidance.
        if "what is" in question_lower or "define" in question_lower:
            parts.append("I understand you're asking about a medical topic. While I don't have specific information about this particular condition or symptom, I can provide some general guidance.")
        elif "how to" in question_lower or "treatment" in question_lower:
            parts.append("I understand you're asking about treatment or management of a medical condition. This is an area where professional medical advice is particularly important.")
        elif "symptom" in question_lower or "sign" in question_lower:
            parts.append("I understand you're asking about symptoms or signs of a medical condition. Remember that symptoms can vary between individuals and may indicate different conditions.")
        else:
            parts.append("Thank you for your medical question. While I can provide general information, it's important to consult with healthcare professionals for personalized medical advice.")

    # Role-specific guidance.
    role = account.role.lower()
    if role in ("physician", "doctor", "nurse"):
        parts.append("\n\n**Professional Context:** As a healthcare professional, you're likely familiar with these concepts. Remember to always follow your institution's protocols and guidelines, and consider the latest clinical evidence in your practice.")
    elif role in ("medical student", "student"):
        parts.append("\n\n**Educational Context:** As a medical student, this information can help with your studies. Always verify information with your professors and clinical supervisors, and use this as a starting point for further research.")
    elif role == "patient":
        parts.append("\n\n**Patient Context:** As a patient, this information is for educational purposes only. Please discuss any concerns with your healthcare provider, and don't make treatment decisions based solely on this information.")
    else:
        parts.append("\n\n**General Context:** This information is provided for educational purposes. Always consult with qualified healthcare professionals for medical advice.")

    # Specialty-specific perspective, when the account declares one.
    specialty = account.specialty.lower() if account.specialty else ""
    if specialty in ("cardiology", "cardiac"):
        parts.append("\n\n**Cardiology Perspective:** Given your interest in cardiology, consider how this information relates to cardiovascular health and patient care. Many conditions can have cardiac implications.")
    elif specialty in ("pediatrics", "pediatric"):
        parts.append("\n\n**Pediatric Perspective:** In pediatric care, remember that children may present differently than adults and may require specialized approaches. Consider age-appropriate considerations.")
    elif specialty in ("emergency", "er"):
        parts.append("\n\n**Emergency Medicine Perspective:** In emergency settings, rapid assessment and intervention are crucial. Consider the urgency and severity of presenting symptoms.")

    # Always end with the medical disclaimer.
    parts.append("\n\n⚠️ **Important Disclaimer:** This information is for educational purposes only and should not replace professional medical advice, diagnosis, or treatment. Always consult with qualified healthcare professionals.")
    return "\n".join(parts)
|