import json
import os
import traceback
import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional

import pytz
from langchain.schema import HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI

from .config import logger
from .github_storage import get_github_storage


class MedicalAnswerValidator:
    """
    Medical answer validation system that evaluates responses using a separate LLM instance.

    Produces structured JSON evaluations and persists them through the GitHub
    storage helper, falling back to the local ``evaluation_results.json`` file.
    """

    # Keys every validation report must contain (checked after each LLM call).
    _REQUIRED_FIELDS = (
        "Accuracy_Rating", "Accuracy_Comment",
        "Coherence_Rating", "Coherence_Comment",
        "Relevance_Rating", "Relevance_Comment",
        "Completeness_Rating", "Completeness_Comment",
        "Citations_Attribution_Rating", "Citations_Attribution_Comment",
        "Length_Rating", "Length_Comment",
        "Overall_Rating", "Final_Summary_and_Improvement_Plan",
    )

    # Summary bucket name -> report key holding the corresponding rating.
    _SUMMARY_RATING_KEYS = {
        "accuracy": "Accuracy_Rating",
        "coherence": "Coherence_Rating",
        "relevance": "Relevance_Rating",
        "completeness": "Completeness_Rating",
        "citations": "Citations_Attribution_Rating",
        "length": "Length_Rating",
        "overall": "Overall_Rating",
    }

    def __init__(self):
        """Initialize the validator with LLM and system prompt."""
        self.validator_llm = self._create_validator_llm()
        self.validation_system_prompt = self._create_validation_system_prompt()
        self.evaluation_file = "evaluation_results.json"
        logger.info("Medical answer validator initialized successfully")

    @staticmethod
    def _max_interaction_id(evaluations: Any) -> int:
        """
        Return the highest numeric ``interaction_id`` found in ``evaluations``.

        Entries that are not dicts, or whose ID is not an integer (for example
        the UUIDs used by error evaluations), are ignored. Returns 0 when
        nothing usable is found or ``evaluations`` is not a list.
        """
        if not isinstance(evaluations, list):
            return 0
        highest = 0
        for item in evaluations:
            if not isinstance(item, dict):
                continue
            try:
                highest = max(highest, int(item.get("interaction_id", "0")))
            except (ValueError, TypeError):
                continue
        return highest

    @staticmethod
    def _coerce_rating(value: Any) -> int:
        """
        Convert a stored rating to an int, returning 0 when it cannot be parsed.

        Accepts everything ``int()`` accepts, plus strings such as ``"95%"``
        or ``"92.5"`` (truncated toward zero, like ``int()`` does for floats).
        """
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            pass
        try:
            return int(float(str(value).strip().rstrip("%").strip()))
        except (TypeError, ValueError, OverflowError):
            return 0

    def _get_next_interaction_id(self) -> str:
        """
        Get the next interaction ID by finding the highest existing ID and adding 1.

        The GitHub copy of the evaluation file is consulted first; if it is
        missing, empty, unparsable, or the lookup raises, the local file is
        used instead. Returns ``"1"`` when no previous evaluation is found.
        """
        # Try GitHub first. Any failure here must fall through to the local
        # file rather than restart numbering at 1.
        try:
            github_storage = get_github_storage()
            existing_content = github_storage._get_file_content(
                "medical_data/evaluation_results.json"
            )
            if existing_content:
                evaluations = json.loads(existing_content)
                if evaluations and isinstance(evaluations, list):
                    logger.info(f"Found {len(evaluations)} existing evaluations in GitHub")
                    next_id = str(self._max_interaction_id(evaluations) + 1)
                    logger.info(f"Next interaction ID will be: {next_id}")
                    return next_id
        except json.JSONDecodeError as e:
            logger.warning(f"Failed to parse GitHub evaluation file: {e}")
        except Exception as e:
            logger.warning(f"Could not read evaluations from GitHub: {e}")

        # Fallback to local file check.
        try:
            if not os.path.exists(self.evaluation_file):
                logger.info("No existing evaluation file found, starting with ID 1")
                return "1"

            logger.info("GitHub file not found, checking local file")
            with open(self.evaluation_file, "r", encoding="utf-8") as f:
                evaluations = json.load(f)

            if not evaluations:
                logger.info("Local file is empty, starting with ID 1")
                return "1"

            logger.info(f"Found {len(evaluations)} existing evaluations in local file")
            next_id = str(self._max_interaction_id(evaluations) + 1)
            logger.info(f"Next interaction ID from local file: {next_id}")
            return next_id
        except Exception as e:
            logger.error(f"Error getting next interaction ID: {e}")
            return "1"

    def _clean_documents_for_storage(self, documents: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Clean documents by removing snippets and keeping only essential fields.

        Args:
            documents: Retrieved documents; ``None`` is treated as an empty list.

        Returns:
            One reduced dict per input document, in the same order.
        """
        cleaned_docs = []
        for doc in documents or []:
            is_context_page = doc.get("context_enrichment", False)
            cleaned_docs.append({
                "doc_id": doc.get("doc_id"),
                "source": doc.get("source", "unknown"),
                "provider": doc.get("provider", "unknown"),
                "page_number": doc.get("page_number", "unknown"),
                "disease": doc.get("disease", "unknown"),
                "page_type": "CONTEXT PAGE" if is_context_page else "ORIGINAL PAGE",
                "context_enrichment": is_context_page,
                "content": doc.get("content", ""),
            })
        return cleaned_docs

    def _create_validation_system_prompt(self) -> str:
        """Create the system prompt for the validation LLM."""
        return """Role

You are medical information validator tasked with validating the following answer to ensure it is accurate, complete, relevant, well-structured (coherent), appropriately concise (length), and properly attributed (cited) based STRICTLY AND ONLY on the provided documents.

CRITICAL VALIDATION RULES:
1. **STRICT SOURCE VERIFICATION**: Every claim in the answer MUST be explicitly present in the provided documents. Do NOT accept general medical knowledge or reasonable inferences.
2. **CITATION VERIFICATION**: For EVERY citation (e.g., [SASLT 2021, p. X]), you MUST verify that the specific page number mentioned actually contains that exact information. If a page is cited but does not contain the claim, this is a CRITICAL ERROR that must significantly reduce the Citations_Attribution_Rating.
3. **NO HALLUCINATIONS**: Any information not explicitly stated in the documents is considered a hallucination and must be flagged in the Accuracy_Comment.
4. **EXACT THRESHOLDS**: If the answer mentions numeric thresholds (e.g., "HBV DNA > 100,000 IU/mL"), verify these exact numbers appear in the cited documents. Do not accept paraphrased or generalized thresholds.
5. **PAGE-SPECIFIC VALIDATION**: When evaluating citations, check that the content on the cited page actually supports the claim. Never assume a citation is correct without verification.

Here is your input:

Question: [User's original question]

Retrieved Answer: [The answer generated or retrieved from documents]

Documents: [Provide a link or summary of the relevant document sections]

Validation Task Criteria:

For each criterion below, provide a Score (0-100%) and a detailed Comment explaining the score and noting any necessary improvements, specific issues, or confirming satisfactory performance.

Accuracy (0-100%)

Is the answer factually correct based STRICTLY AND ONLY on the provided documents?

Ensure that no information contradicts what is written in the documents.

CRITICAL: Check if the answer contains ANY information that is NOT explicitly stated in the provided documents. This includes:
- General medical knowledge not present in the documents
- Reasonable inferences or interpretations not explicitly stated
- Information from other sources or guidelines not provided

If you find any discrepancies or factual errors, point them out in the [Accuracy_Comment].

If the answer contains unsupported statements (hallucinations) - information not explicitly present in the provided documents - highlight them SPECIFICALLY in the [Accuracy_Comment] with the exact claim and why it's not supported.

Validation Score Guidelines:

100%: The answer is factually correct, with no contradictions or missing information, and EVERY statement is explicitly supported by the provided documents.

85-99%: The answer is mostly correct, but contains minor inaccuracies or omissions that don't substantially affect the overall accuracy.

70-84%: The answer contains notable factual errors, unsupported claims, or omissions that may affect the response's reliability.

Below 70%: The answer is factually incorrect, contains critical errors, hallucinations, or misrepresents the content of the documents.

Coherence (0-100%)

Is the answer logically structured and clear?

Ensure the answer flows well, uses appropriate language, and makes sense to a human reader.

If the answer is unclear or poorly structured, suggest specific improvements in the [Coherence_Comment].

Coherence Score Guidelines:

100%: The answer is logically structured, easy to understand, and free from confusion or ambiguity.

85-99%: The answer is mostly clear but may have slight issues with flow or readability, such as minor disjointedness.

70-84%: The answer lacks clarity or contains some sections that confuse the reader due to poor structure.

Below 70%: The answer is poorly structured or difficult to follow, requiring significant improvement in clarity and flow.

Relevance (0-100%)

Does the answer address the user's question adequately and fully?

Ensure that the core topic of the question is covered and that no irrelevant or off-topic information is included.

If parts of the question are missed or the answer is irrelevant, identify which parts need improvement in the [Relevance_Comment].

Relevance Score Guidelines:

100%: The answer directly addresses all parts of the user's question without unnecessary deviations.

85-99%: The answer is mostly relevant, but might include slight off-topic information or miss minor aspects of the question.

70-84%: The answer misses key points or includes significant irrelevant details that distract from the question.

Below 70%: The answer is largely irrelevant to the user's question or includes significant off-topic information.

Completeness (0-100%)

Does the answer provide all necessary information that is available in the documents to fully address the question?

Are there any critical details missing?

If the answer is incomplete or vague, suggest what additional details should be included from the documents in the [Completeness_Comment].

Completeness Score Guidelines:

100%: The answer provides all necessary information in sufficient detail, covering all aspects of the question based on the documents.

85-99%: The answer covers most of the required details but may lack some minor points available in the source.

70-84%: The answer is missing critical information available in the documents or lacks important details to fully address the question.

Below 70%: The answer is severely incomplete, leaving out essential information available in the documents.

Citations/Attribution (0-100%)

Is every claim in the answer correctly attributed (cited) to the relevant document(s)?

Are all citations accurate and correctly placed?

CRITICAL CITATION VERIFICATION REQUIREMENTS:
1. **PAGE CONTENT VERIFICATION**: For EVERY citation (e.g., [SASLT 2021, p. X]), you MUST verify that the specific page number cited actually contains that exact information in the provided documents.
2. **INCORRECT CITATIONS ARE CRITICAL ERRORS**: If a claim cites a page that does NOT contain that information, this is a CRITICAL ERROR and must be explicitly identified in the [Citations_Attribution_Comment] with the specific claim and incorrect page number.
3. **NO ASSUMPTIONS**: Never assume a citation is correct. Always verify against the provided document content.
4. **SPECIFIC EXAMPLES REQUIRED**: In your comment, provide specific examples of incorrect citations if found (e.g., "The answer claims 'TDF is used for HIV coinfection [SASLT 2021, p. 6]' but page 6 does not mention HIV coinfection or TDF use for HIV patients").

If any statement lacks a citation or has an incorrect citation, note the SPECIFIC issue in the [Citations_Attribution_Comment] with the exact claim and page number.

Citations/Attribution Score Guidelines:

100%: Every piece of information is correctly and appropriately cited to the supporting document(s), and ALL page numbers have been verified to contain the cited information.

85-99%: Citations are mostly correct, but there are one or two minor errors (e.g., misplaced citation, minor formatting issue). No incorrect page attributions.

70-84%: Several statements are missing citations, OR there are one or more citations that reference pages that do NOT contain the cited information.

Below 70%: The majority of the answer lacks proper citation, or multiple citations reference incorrect pages, making them unreliable and misleading.

Length (0-100%)

Is the answer the right length to fully answer the question, without being too short (lacking detail) or too long (causing distraction or including irrelevant information)?

Provide a rating based on whether the answer strikes the right balance in the [Length_Comment].

Length Score Guidelines:

100%: The answer is appropriately detailed, offering enough information to fully address the question without unnecessary elaboration.

85-99%: The answer is sufficiently detailed but could be slightly more concise or might include minor irrelevant information.

70-84%: The answer is either too brief and lacks necessary detail or too lengthy with excessive, distracting information.

Below 70%: The answer is either too short to be meaningful or too long, causing distractions or loss of focus.

Final Evaluation Output

Based on the above checks, provide a rating and a comment for each aspect, and a final overall rating. Your entire output must be a single JSON object that strictly follows the structure defined below.

CRITICAL INSTRUCTIONS:
- Output ONLY valid JSON - no additional text before or after
- Use double quotes for all strings
- Ensure all rating values are numbers between 0-100 (no quotes around numbers)
- Do not include any markdown formatting or code blocks
- Start your response immediately with { and end with }

Required JSON Output Structure:

{
    "Accuracy_Rating": "95",
    "Accuracy_Comment": "Detailed comment on factual correctness/issues",
    "Coherence_Rating": "90",
    "Coherence_Comment": "Detailed comment on flow, structure, and clarity",
    "Relevance_Rating": "88",
    "Relevance_Comment": "Detailed comment on addressing the question fully/irrelevant info",
    "Completeness_Rating": "92",
    "Completeness_Comment": "Detailed comment on missing critical details available in the documents",
    "Citations_Attribution_Rating": "85",
    "Citations_Attribution_Comment": "Detailed comment on citation accuracy and completeness",
    "Length_Rating": "90",
    "Length_Comment": "Detailed comment on conciseness and appropriate detail",
    "Overall_Rating": "90",
    "Final_Summary_and_Improvement_Plan": "Overall judgment. If rating is below 90%, describe what specific changes are needed to achieve a 100%. If 90% or above, state that the answer is ready."
}

REMEMBER: Output ONLY the JSON object above with your specific ratings and comments. No other text."""

    def _create_validator_llm(self) -> ChatOpenAI:
        """
        Create a separate LLM instance for validation.

        Raises:
            ValueError: If ``OPENAI_API_KEY`` is not set.
            Exception: Any error raised while constructing the client is
                logged and re-raised.
        """
        try:
            openai_key = os.getenv("OPENAI_API_KEY")
            if not openai_key:
                raise ValueError("OpenAI API key is required for validation")

            return ChatOpenAI(
                model="gpt-4o",
                api_key=openai_key,
                # base_url=os.getenv("OPENAI_BASE_URL"),
                temperature=0.0,
                max_tokens=1024,
                request_timeout=60,
                max_retries=3,
                streaming=False,
            )
        except Exception as e:
            logger.error(f"Failed to create validator LLM: {e}")
            raise

    def validate_answer(
        self,
        question: str,
        retrieved_documents: List[Dict[str, Any]],
        generated_answer: str
    ) -> Dict[str, Any]:
        """
        Validate a medical answer and return structured evaluation.

        Args:
            question: The original user question
            retrieved_documents: List of retrieved documents with metadata
            generated_answer: The AI-generated answer to validate

        Returns:
            Dict containing the complete evaluation with metadata. If anything
            unexpected fails, an error evaluation (all ratings "0" plus an
            "error" key) is returned instead of raising.
        """
        try:
            # Generate simple sequential interaction ID
            interaction_id = self._get_next_interaction_id()
            logger.info(f"Starting validation for interaction {interaction_id}")

            # Clean documents (remove snippets) for storage
            cleaned_documents = self._clean_documents_for_storage(retrieved_documents)

            # Format documents for validation
            formatted_docs = self._format_documents_for_validation(retrieved_documents)

            # Create validation prompt
            validation_prompt = (
                f"Question: {question}\n\n"
                f"Retrieved Answer: {generated_answer}\n\n"
                f"Documents:\n{formatted_docs}"
            )

            # Get validation from LLM with retry logic
            validation_report = self._run_validation_with_retries(
                validation_prompt, question, generated_answer, retrieved_documents
            )

            # Create complete evaluation structure
            evaluation = {
                "interaction_id": interaction_id,
                "timestamp": datetime.now(pytz.timezone('Africa/Cairo')).isoformat(),
                "question": question,
                "retrieved_documents": cleaned_documents,
                "generated_answer": generated_answer,
                "validation_report": validation_report
            }

            # Save to JSON file
            self._save_evaluation(evaluation)

            return evaluation

        except Exception as e:
            logger.error(f"Error during validation: {e}")
            return self._create_error_evaluation(question, retrieved_documents, generated_answer, str(e))

    def _run_validation_with_retries(
        self,
        validation_prompt: str,
        question: str,
        generated_answer: str,
        retrieved_documents: List[Dict[str, Any]],
        max_retries: int = 3
    ) -> Dict[str, Any]:
        """
        Ask the validation LLM for a report, retrying on unusable responses.

        An attempt is retried when the response is empty, is not a JSON
        object, lacks required fields, or the call raises. On the final
        attempt the method degrades instead: empty/unparsable responses give
        the all-zero fallback report, missing fields are filled in with
        placeholders, and other errors give the heuristic basic report.
        """
        messages = [
            SystemMessage(content=self.validation_system_prompt),
            HumanMessage(content=validation_prompt)
        ]

        for attempt in range(max_retries):
            is_last_attempt = attempt >= max_retries - 1
            # Reset per attempt so the error log never shows a stale response.
            validation_content = ""
            try:
                response = self.validator_llm.invoke(messages)
                validation_content = response.content.strip()

                # Check if response is empty
                if not validation_content:
                    logger.warning(f"Empty response from validation LLM (attempt {attempt + 1})")
                    if is_last_attempt:
                        return self._create_fallback_validation("Empty response from validation LLM")
                    continue

                validation_report = self._parse_validation_response(validation_content)

                # Validate that all required fields are present
                missing_fields = [
                    field for field in self._REQUIRED_FIELDS if field not in validation_report
                ]
                if missing_fields:
                    logger.warning(f"Missing fields in validation response: {missing_fields}")
                    if not is_last_attempt:
                        continue
                    # Out of retries: keep what we got and fill missing fields
                    for field in missing_fields:
                        if field.endswith("_Rating"):
                            validation_report[field] = "0"
                        else:
                            validation_report[field] = f"Field missing from validation response: {field}"

                return validation_report

            except json.JSONDecodeError as e:
                logger.error(f"Failed to parse validation JSON (attempt {attempt + 1}): {e}")
                logger.error(f"Raw response: {validation_content[:200]}...")
                if is_last_attempt:
                    return self._create_fallback_validation(
                        f"JSON parsing failed after {max_retries} attempts: {str(e)}"
                    )

            except Exception as e:
                logger.error(f"Validation LLM error (attempt {attempt + 1}): {e}")
                if is_last_attempt:
                    # Use basic validation as final fallback
                    logger.info("Using basic heuristic validation as fallback")
                    return self._create_basic_validation(question, generated_answer, retrieved_documents)

        # Only reachable when max_retries <= 0: ensure we have a validation report
        logger.info("Creating basic validation as final fallback")
        return self._create_basic_validation(question, generated_answer, retrieved_documents)

    def _parse_validation_response(self, validation_content: str) -> Dict[str, Any]:
        """
        Parse the LLM response into a report dict.

        Tries strict JSON first, then extracts the outermost ``{...}`` span
        from responses that carry extra text.

        Raises:
            json.JSONDecodeError: If no JSON object can be obtained, including
                when the response is valid JSON but not an object.
        """
        try:
            validation_report = json.loads(validation_content)
        except json.JSONDecodeError:
            # Try to extract JSON from response that might have extra text
            validation_report = self._extract_json_from_response(validation_content)
            if validation_report is None:
                raise json.JSONDecodeError("Could not extract valid JSON", validation_content, 0)

        if not isinstance(validation_report, dict):
            raise json.JSONDecodeError(
                "Validation response is not a JSON object", validation_content, 0
            )
        return validation_report

    def _format_documents_for_validation(self, documents: List[Dict[str, Any]]) -> str:
        """
        Format retrieved documents for validation prompt.

        Each document's ``snippet`` is used as its content when that key is
        present, otherwise ``content``. Returns a placeholder sentence when
        there are no documents.
        """
        if not documents:
            return "No documents provided."

        formatted_docs = []
        for i, doc in enumerate(documents, 1):
            formatted_docs.append(
                f"Document {i}:\n"
                f"Source: {doc.get('source', 'Unknown')}\n"
                f"Provider: {doc.get('provider', 'Unknown')}\n"
                f"Page: {doc.get('page_number', 'Unknown')}\n"
                f"Content: {doc.get('snippet', doc.get('content', 'No content'))}\n"
            )

        return "\n\n".join(formatted_docs)

    def _create_fallback_validation(self, error_msg: str) -> Dict[str, str]:
        """Create a fallback validation report (all ratings "0") when JSON parsing fails."""
        return {
            "Accuracy_Rating": "0",
            "Accuracy_Comment": f"Validation failed due to parsing error: {error_msg}",
            "Coherence_Rating": "0",
            "Coherence_Comment": "Unable to evaluate due to validation system error",
            "Relevance_Rating": "0",
            "Relevance_Comment": "Unable to evaluate due to validation system error",
            "Completeness_Rating": "0",
            "Completeness_Comment": "Unable to evaluate due to validation system error",
            "Citations_Attribution_Rating": "0",
            "Citations_Attribution_Comment": "Unable to evaluate due to validation system error",
            "Length_Rating": "0",
            "Length_Comment": "Unable to evaluate due to validation system error",
            "Overall_Rating": "0",
            "Final_Summary_and_Improvement_Plan": f"Validation system encountered an error: {error_msg}"
        }

    def _extract_json_from_response(self, response_text: str) -> Optional[Dict[str, Any]]:
        """
        Extract JSON from response that might contain extra text.

        Parses the span from the first ``{`` to the last ``}``.

        Returns:
            The parsed value, or ``None`` if no such span exists or it is not
            valid JSON (the failure is logged).
        """
        try:
            # Try to find JSON in the response
            start_idx = response_text.find('{')
            end_idx = response_text.rfind('}')

            if start_idx == -1 or end_idx == -1 or end_idx <= start_idx:
                raise ValueError("No JSON object found in response")

            return json.loads(response_text[start_idx:end_idx + 1])
        except Exception as e:
            logger.error(f"Failed to extract JSON from response: {e}")
            return None

    def _create_basic_validation(self, question: str, answer: str, documents: List[Dict[str, Any]]) -> Dict[str, str]:
        """
        Create a basic validation when LLM fails but we can still provide some assessment.

        Scores come from simple text heuristics on ``question`` and ``answer``
        only (length, punctuation, shared words, a "Source:" marker);
        ``documents`` is accepted but not inspected.
        """
        has_source_marker = "Source:" in answer

        # Basic heuristic scoring
        accuracy_score = "75"  # Assume reasonable accuracy if documents are provided
        coherence_score = "80" if len(answer) > 100 and "." in answer else "60"
        relevance_score = "70" if any(word in answer.lower() for word in question.lower().split()) else "50"
        completeness_score = "70" if len(answer) > 200 else "50"
        citations_score = "80" if has_source_marker else "30"
        length_score = "75" if 100 < len(answer) < 2000 else "60"

        # Calculate overall as average (integer division)
        scores = [int(accuracy_score), int(coherence_score), int(relevance_score),
                  int(completeness_score), int(citations_score), int(length_score)]
        overall_score = str(sum(scores) // len(scores))

        if has_source_marker:
            citations_comment = "Basic heuristic assessment - Citations detected in answer format."
        else:
            citations_comment = "Basic heuristic assessment - Limited citation formatting detected."

        return {
            "Accuracy_Rating": accuracy_score,
            "Accuracy_Comment": "Basic heuristic assessment - LLM validation unavailable. Answer appears to reference provided documents.",
            "Coherence_Rating": coherence_score,
            "Coherence_Comment": "Basic heuristic assessment - Answer structure and length suggest reasonable coherence.",
            "Relevance_Rating": relevance_score,
            "Relevance_Comment": "Basic heuristic assessment - Answer appears to address key terms from the question.",
            "Completeness_Rating": completeness_score,
            "Completeness_Comment": "Basic heuristic assessment - Answer length suggests reasonable completeness.",
            "Citations_Attribution_Rating": citations_score,
            "Citations_Attribution_Comment": citations_comment,
            "Length_Rating": length_score,
            "Length_Comment": "Basic heuristic assessment - Answer length appears appropriate for medical question.",
            "Overall_Rating": overall_score,
            "Final_Summary_and_Improvement_Plan": f"Basic validation completed (Overall: {overall_score}/100). LLM-based validation was unavailable, so heuristic scoring was used. For full validation, ensure the validation LLM service is accessible."
        }

    def _create_error_evaluation(
        self,
        question: str,
        documents: List[Dict[str, Any]],
        answer: str,
        error_msg: str
    ) -> Dict[str, Any]:
        """
        Create an error evaluation when validation completely fails.

        The evaluation gets a random UUID as its ID, carries the documents as
        passed in, and is returned without being saved.
        """
        error_comment = f"Validation error: {error_msg}"
        return {
            "interaction_id": str(uuid.uuid4()),
            "timestamp": datetime.now(pytz.timezone('Africa/Cairo')).isoformat(),
            "question": question,
            "retrieved_documents": documents,
            "generated_answer": answer,
            "validation_report": {
                "Accuracy_Rating": "0",
                "Accuracy_Comment": error_comment,
                "Coherence_Rating": "0",
                "Coherence_Comment": error_comment,
                "Relevance_Rating": "0",
                "Relevance_Comment": error_comment,
                "Completeness_Rating": "0",
                "Completeness_Comment": error_comment,
                "Citations_Attribution_Rating": "0",
                "Citations_Attribution_Comment": error_comment,
                "Length_Rating": "0",
                "Length_Comment": error_comment,
                "Overall_Rating": "0",
                "Final_Summary_and_Improvement_Plan": f"System error prevented validation: {error_msg}"
            },
            "error": error_msg
        }

    def _save_evaluation(self, evaluation: Dict[str, Any]) -> None:
        """
        Save evaluation to GitHub repository, falling back to the local file.

        The local fallback is used both when the GitHub save reports failure
        and when it raises. Errors are logged, never propagated.
        """
        try:
            logger.info(f"Attempting to save evaluation with ID: {evaluation['interaction_id']}")

            # Try to save to GitHub first. An exception here counts as a
            # failed save so the evaluation still reaches local storage.
            success = False
            try:
                github_storage = get_github_storage()
                logger.info("GitHub storage instance obtained, calling save_validation_results...")
                success = github_storage.save_validation_results(evaluation)
            except Exception as e:
                logger.warning(f"GitHub save raised an error for evaluation {evaluation['interaction_id']}: {e}")

            if success:
                logger.info(f"✓ Evaluation saved to GitHub successfully with ID: {evaluation['interaction_id']}")
                return

            logger.warning(f"GitHub save failed for evaluation {evaluation['interaction_id']}, falling back to local storage")

            # Fallback to local storage if GitHub fails
            evaluations = []
            if os.path.exists(self.evaluation_file):
                try:
                    with open(self.evaluation_file, 'r', encoding='utf-8') as f:
                        evaluations = json.load(f)
                    logger.info(f"Loaded {len(evaluations)} existing evaluations from local file")
                except (json.JSONDecodeError, FileNotFoundError) as e:
                    logger.warning(f"Could not load local file: {e}")
                    evaluations = []

            # Add new evaluation
            evaluations.append(evaluation)

            # Save back to local file
            with open(self.evaluation_file, 'w', encoding='utf-8') as f:
                json.dump(evaluations, f, indent=2, ensure_ascii=False)

            logger.info(f"✓ Evaluation saved locally (GitHub failed) with ID: {evaluation['interaction_id']}")

        except Exception as e:
            logger.error(f"Failed to save evaluation: {e}")
            logger.error(f"Traceback: {traceback.format_exc()}")

    def get_evaluation_summary(self, limit: int = 10) -> Dict[str, Any]:
        """
        Get summary of recent evaluations from GitHub repository.

        Falls back to the local file when GitHub returns nothing, returns an
        error payload, or raises.

        Args:
            limit: Maximum number of most recent evaluations to include;
                values <= 0 yield no evaluations.

        Returns:
            The GitHub result as-is when available; otherwise a dict with
            total/recent counts, per-criterion average scores, and the recent
            evaluations. On failure, a dict with an "error" key.
        """
        try:
            # Try to get data from GitHub first
            try:
                github_storage = get_github_storage()
                github_results = github_storage.get_validation_results(limit)
                if github_results and "error" not in github_results:
                    return github_results
            except Exception as e:
                logger.warning(f"Could not get evaluation summary from GitHub: {e}")

            # Fallback to local file if GitHub fails
            if not os.path.exists(self.evaluation_file):
                return {"message": "No evaluations found", "evaluations": []}

            with open(self.evaluation_file, 'r', encoding='utf-8') as f:
                evaluations = json.load(f)

            # Get recent evaluations. A bare [-limit:] would return everything
            # for limit == 0 because -0 == 0.
            recent_evaluations = evaluations[-limit:] if evaluations and limit > 0 else []

            # Calculate average scores
            if recent_evaluations:
                total_scores = {key: 0 for key in self._SUMMARY_RATING_KEYS}
                count = len(recent_evaluations)

                for eval_data in recent_evaluations:
                    report = eval_data.get("validation_report", {}) if isinstance(eval_data, dict) else {}
                    if not isinstance(report, dict):
                        report = {}
                    for key, rating_field in self._SUMMARY_RATING_KEYS.items():
                        total_scores[key] += self._coerce_rating(report.get(rating_field, 0))

                averages = {key: round(value / count, 1) for key, value in total_scores.items()}
            else:
                averages = {}

            return {
                "total_evaluations": len(evaluations),
                "recent_count": len(recent_evaluations),
                "average_scores": averages,
                "evaluations": recent_evaluations
            }

        except Exception as e:
            logger.error(f"Failed to get evaluation summary: {e}")
            return {"error": str(e), "evaluations": []}


# Global validator instance
_validator = None


def get_validator() -> MedicalAnswerValidator:
    """Get the global validator instance with lazy loading."""
    global _validator
    if _validator is None:
        _validator = MedicalAnswerValidator()
    return _validator


def validate_medical_answer(
    question: str,
    retrieved_documents: List[Dict[str, Any]],
    generated_answer: str
) -> Dict[str, Any]:
    """
    Convenience function to validate a medical answer.

    Args:
        question: The original user question
        retrieved_documents: List of retrieved documents with metadata
        generated_answer: The AI-generated answer to validate

    Returns:
        Dict containing the complete evaluation with metadata
    """
    validator = get_validator()
    return validator.validate_answer(question, retrieved_documents, generated_answer)