"""
GitHub Storage Utility for Medical RAG Advisor

Handles saving side effects reports and validation results to GitHub repository
"""

import os
import json
import csv
import io
import base64
import time
import traceback
from datetime import datetime
from typing import Dict, List, Any, Optional

import requests

from .config import logger


class GitHubStorage:
    """
    Utility class for storing medical data files in GitHub repository

    Files are read and written through the GitHub "contents" REST endpoint
    (``{api_base}/contents/{path}``); file bodies travel base64-encoded.
    Every method reports failure through its return value (False / None /
    empty container) and logs the cause instead of raising, except
    ``__init__``, which raises ValueError for an unusable repository URL.
    """

    # Timeout (seconds) applied to every GitHub API request so that a stalled
    # connection cannot block the caller indefinitely.
    REQUEST_TIMEOUT = 30

    def __init__(self, repo_url: str = "https://github.com/MoazEldsouky/HBV-AI-Assistant-data", github_token: Optional[str] = None):
        """
        Initialize GitHub storage with repository details

        Args:
            repo_url: GitHub repository URL (default: HBV AI Assistant data repository)
            github_token: GitHub personal access token. When omitted, the
                GITHUB_TOKEN environment variable is used.

        Raises:
            ValueError: If repo_url does not contain "github.com/<owner>/<repo>".
        """
        self.repo_url = repo_url

        # SECURITY: credentials must come from the caller or the environment.
        # No token is embedded in source code as a fallback.
        self.github_token = github_token or os.getenv("GITHUB_TOKEN")

        # Log token status (masked for security)
        if self.github_token:
            token_preview = (
                self.github_token[:7] + "..." + self.github_token[-4:]
                if len(self.github_token) > 11
                else "***"
            )
            logger.info(f"GitHub token configured: {token_preview}")
        else:
            logger.warning("No GitHub token configured - uploads will fail!")

        # Extract owner and repo name from URL
        marker = "github.com/"
        if marker not in repo_url:
            raise ValueError("Invalid GitHub repository URL format")

        # Everything after the host is "<owner>/<repo>[.git][/...]"; empty
        # segments (e.g. from a trailing slash) are discarded.
        path_parts = [part for part in repo_url.split(marker, 1)[1].split("/") if part]
        if len(path_parts) < 2:
            raise ValueError("Invalid GitHub repository URL format")

        owner, repo_name = path_parts[0], path_parts[1]
        # Strip only a trailing ".git": names such as "user.github.io"
        # legitimately contain ".git" in the middle.
        if repo_name.endswith(".git"):
            repo_name = repo_name[:-len(".git")]
        if not repo_name:
            raise ValueError("Invalid GitHub repository URL format")

        self.owner = owner
        self.repo_name = repo_name

        self.api_base = f"https://api.github.com/repos/{self.owner}/{self.repo_name}"
        self.headers = {
            "Accept": "application/vnd.github.v3+json",
            "Content-Type": "application/json"
        }
        # Only send credentials when there are some; a literal "token None"
        # header would just turn every request into an authentication error.
        if self.github_token:
            self.headers["Authorization"] = f"token {self.github_token}"

        logger.info(f"GitHub storage initialized for {self.owner}/{self.repo_name}")

    @staticmethod
    def _coerce_rating(value: Any) -> int:
        """Convert a stored rating to int, treating missing/malformed values as 0."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return 0

    def _get_file_sha(self, file_path: str) -> Optional[str]:
        """
        Get the SHA of an existing file in the repository

        Args:
            file_path: Path to file in repository

        Returns:
            SHA string if file exists, None otherwise (including on any
            request error, which is logged)
        """
        try:
            url = f"{self.api_base}/contents/{file_path}"
            response = requests.get(url, headers=self.headers, timeout=self.REQUEST_TIMEOUT)

            if response.status_code == 200:
                return response.json().get("sha")
            elif response.status_code == 404:
                return None
            else:
                logger.error(f"Error getting file SHA: {response.status_code} - {response.text}")
                return None

        except Exception as e:
            logger.error(f"Exception getting file SHA: {e}")
            return None

    def _upload_file(self, file_path: str, content: str, message: str, sha: Optional[str] = None) -> bool:
        """
        Upload or update a file in the GitHub repository

        Args:
            file_path: Path where file should be stored in repo
            content: File content as string
            message: Commit message
            sha: SHA of existing file (for updates)

        Returns:
            True if successful, False otherwise
        """
        try:
            # Encode content to base64
            raw_bytes = content.encode('utf-8')
            content_encoded = base64.b64encode(raw_bytes).decode('utf-8')

            # Prepare request data
            data = {
                "message": message,
                "content": content_encoded
            }

            # Add SHA if updating existing file
            if sha:
                data["sha"] = sha

            # Make API request with timeout
            url = f"{self.api_base}/contents/{file_path}"
            # Report the encoded byte length, not the character count.
            logger.info(f"Uploading to GitHub: {file_path} (size: {len(raw_bytes)} bytes)")

            response = requests.put(url, headers=self.headers, json=data, timeout=self.REQUEST_TIMEOUT)

            if response.status_code in [200, 201]:
                logger.info(f"✓ Successfully uploaded {file_path} to GitHub")
                return True
            elif response.status_code == 401:
                logger.error(f"❌ Authentication failed uploading {file_path}: Invalid or expired GitHub token")
                logger.error(f"Response: {response.text}")
                return False
            elif response.status_code == 403:
                logger.error(f"❌ Permission denied uploading {file_path}: Token lacks required permissions")
                logger.error(f"Response: {response.text}")
                return False
            elif response.status_code == 404:
                logger.error(f"❌ Repository not found: {self.owner}/{self.repo_name}")
                logger.error(f"Response: {response.text}")
                return False
            elif response.status_code == 409:
                logger.error(f"Conflict error uploading {file_path}: File may have been modified. Status: {response.status_code}")
                logger.error(f"Response: {response.text[:500]}")
                return False
            else:
                logger.error(f"Failed to upload {file_path}. Status: {response.status_code}")
                logger.error(f"Response: {response.text}")
                return False

        except requests.exceptions.Timeout as e:
            logger.error(f"Timeout uploading file to GitHub: {e}")
            return False
        except requests.exceptions.RequestException as e:
            logger.error(f"Request exception uploading file to GitHub: {e}")
            return False
        except Exception as e:
            logger.error(f"Unexpected exception uploading file to GitHub: {e}")
            logger.error(f"Traceback: {traceback.format_exc()}")
            return False

    def _get_file_content(self, file_path: str) -> Optional[str]:
        """
        Get the content of a file from the GitHub repository

        Args:
            file_path: Path to file in repository

        Returns:
            File content as string if successful, None otherwise. None is
            returned both when the file does not exist (404) and when the
            request fails, so callers that are about to overwrite the file
            must not treat None alone as proof that the file is absent.
        """
        try:
            url = f"{self.api_base}/contents/{file_path}"
            response = requests.get(url, headers=self.headers, timeout=self.REQUEST_TIMEOUT)

            if response.status_code == 200:
                content_encoded = response.json().get("content", "")
                content = base64.b64decode(content_encoded).decode('utf-8')
                return content
            elif response.status_code == 404:
                return None
            else:
                logger.error(f"Error getting file content: {response.status_code} - {response.text}")
                return None

        except Exception as e:
            logger.error(f"Exception getting file content: {e}")
            return None

    def save_side_effects_report(self, report_data: Dict[str, Any]) -> bool:
        """
        Save a side effects report to GitHub repository as CSV

        The report is appended as one row to
        medical_data/side_effects_reports.csv; the header row is written when
        the file is missing or empty.

        Args:
            report_data: Dictionary containing side effects report data. Keys
                missing from the dictionary become empty cells; a key outside
                the known column set makes csv.DictWriter raise, which is
                logged and reported as failure.

        Returns:
            True if successful, False otherwise
        """
        try:
            file_path = "medical_data/side_effects_reports.csv"

            # Get existing file content
            existing_content = self._get_file_content(file_path)

            # Define CSV fieldnames
            fieldnames = [
                'timestamp', 'drug_name', 'side_effects', 'patient_age',
                'patient_gender', 'dosage', 'duration', 'severity',
                'outcome', 'additional_details', 'reporter_info', 'raw_input'
            ]

            row_buffer = io.StringIO()
            writer = csv.DictWriter(row_buffer, fieldnames=fieldnames)

            if not existing_content:
                # Missing or empty file: start it with the header row.
                existing_part = ""
                writer.writeheader()
            elif existing_content.endswith(("\n", "\r")):
                existing_part = existing_content
            else:
                # Terminate the last existing row so the new one does not get
                # glued onto it.
                existing_part = existing_content + "\r\n"

            # Append new row
            writer.writerow(report_data)

            # Combine existing content with new row
            final_content = existing_part + row_buffer.getvalue()

            # Get SHA for update
            sha = self._get_file_sha(file_path)

            # A SHA without content means the file exists but could not be
            # read (request error, or it appeared between the two calls).
            # Uploading now would replace every stored report with this one.
            if existing_content is None and sha:
                logger.error("Existing side effects file could not be read; aborting upload to avoid overwriting stored reports.")
                return False

            # Upload file
            commit_message = f"Add side effects report for {report_data.get('drug_name', 'unknown drug')} - {report_data.get('timestamp', 'unknown time')}"

            return self._upload_file(file_path, final_content, commit_message, sha)

        except Exception as e:
            logger.error(f"Error saving side effects report to GitHub: {e}")
            return False

    def save_validation_results(self, evaluation_data: Dict[str, Any]) -> bool:
        """
        Save validation results to GitHub repository as JSON with robust append logic.
        Always loads existing data first, then appends new evaluation without overwriting.

        Each attempt re-reads medical_data/evaluation_results.json, appends the
        evaluation and uploads the whole list. Up to three attempts are made,
        two seconds apart. An evaluation whose interaction_id is already
        stored is skipped and reported as success.

        Args:
            evaluation_data: Dictionary containing evaluation data with interaction_id already set

        Returns:
            True if successful, False otherwise
        """
        max_retries = 3
        retry_delay_seconds = 2
        file_path = "medical_data/evaluation_results.json"

        for attempt in range(max_retries):
            is_last_attempt = attempt == max_retries - 1

            try:
                # STEP 1: Get existing file content with verification
                logger.info(f"Attempt {attempt + 1}/{max_retries}: Loading existing evaluations from GitHub...")
                existing_content = self._get_file_content(file_path)

                # STEP 2: Parse existing data or create new list
                evaluations = []
                if existing_content:
                    try:
                        evaluations = json.loads(existing_content)
                    except json.JSONDecodeError as e:
                        logger.error(f"Failed to parse existing evaluation_results.json: {e}")
                        # Don't start fresh - this could lose data. Instead, fail and retry.
                        if is_last_attempt:
                            logger.error("Max retries reached. Cannot parse existing data.")
                            return False
                        logger.warning("Retrying due to JSON parse error...")
                        time.sleep(retry_delay_seconds)  # Wait before retry
                        continue

                    if not isinstance(evaluations, list):
                        logger.warning("Existing content is not a list, creating new list")
                        evaluations = []
                    else:
                        logger.info(f"Successfully loaded {len(evaluations)} existing evaluations")
                else:
                    logger.info("No existing file found, creating new evaluation list")

                # STEP 3: Verify we're not about to lose data
                new_interaction_id = evaluation_data.get('interaction_id', 'unknown')
                logger.info(f"Adding new evaluation with ID: {new_interaction_id}")

                # Check if this ID already exists (prevent duplicates).
                # Entries that are not dictionaries cannot carry an ID.
                existing_ids = [
                    entry.get('interaction_id')
                    for entry in evaluations
                    if isinstance(entry, dict)
                ]
                if new_interaction_id in existing_ids:
                    logger.warning(f"Evaluation with ID {new_interaction_id} already exists. Skipping duplicate.")
                    return True  # Not an error, just already saved

                # STEP 4: Add new evaluation to the list (APPEND, not replace)
                evaluations.append(evaluation_data)
                logger.info(f"Appended new evaluation. Total count: {len(evaluations)}")

                # STEP 5: Convert to JSON string
                json_content = json.dumps(evaluations, indent=2, ensure_ascii=False)

                # STEP 6: Get SHA for update (must be fresh to avoid conflicts)
                sha = self._get_file_sha(file_path)

                if existing_content and not sha:
                    logger.error("File exists but SHA not found. Possible race condition.")
                    if is_last_attempt:
                        return False
                    logger.warning("Retrying due to SHA retrieval failure...")
                    time.sleep(retry_delay_seconds)  # Wait before retry
                    continue

                # The mirror case: a SHA without content means the file exists
                # but could not be read, so uploading the one-element list
                # would discard every stored evaluation.
                if existing_content is None and sha:
                    logger.error("File has a SHA but its content could not be loaded. Refusing to overwrite existing evaluations.")
                    if is_last_attempt:
                        return False
                    logger.warning("Retrying due to content retrieval failure...")
                    time.sleep(retry_delay_seconds)  # Wait before retry
                    continue

                # STEP 7: Upload file with the complete list
                commit_message = f"Add validation results for interaction {new_interaction_id} - {evaluation_data.get('timestamp', 'unknown time')}"

                if self._upload_file(file_path, json_content, commit_message, sha):
                    logger.info(f"✓ Successfully saved evaluation {new_interaction_id}. Total evaluations now: {len(evaluations)}")
                    return True

                logger.error(f"Failed to upload file (attempt {attempt + 1}/{max_retries})")
                if is_last_attempt:
                    return False
                logger.warning("Retrying upload...")
                time.sleep(retry_delay_seconds)  # Wait before retry

            except Exception as e:
                logger.error(f"Error saving validation results to GitHub (attempt {attempt + 1}/{max_retries}): {e}")
                if is_last_attempt:
                    return False
                logger.warning("Retrying due to exception...")
                time.sleep(retry_delay_seconds)  # Wait before retry

        return False

    def get_side_effects_reports(self) -> List[Dict[str, Any]]:
        """
        Get all side effects reports from GitHub repository

        Returns:
            List of side effects reports as dictionaries (one per CSV row);
            empty when the file is missing, empty, or cannot be read
        """
        try:
            file_path = "medical_data/side_effects_reports.csv"
            content = self._get_file_content(file_path)

            if not content:
                return []

            # Parse CSV content
            return list(csv.DictReader(io.StringIO(content)))

        except Exception as e:
            logger.error(f"Error getting side effects reports from GitHub: {e}")
            return []

    def get_validation_results(self, limit: int = 10) -> Dict[str, Any]:
        """
        Get validation results from GitHub repository

        Args:
            limit: Maximum number of recent evaluations to return. Zero or a
                negative value returns no evaluations.

        Returns:
            Dictionary containing evaluation summary and recent evaluations.
            Averages are computed over the returned evaluations only; a
            rating that is missing or not convertible to int counts as 0.
        """
        try:
            file_path = "medical_data/evaluation_results.json"
            content = self._get_file_content(file_path)

            if not content:
                return {"message": "No evaluations found", "evaluations": []}

            # Parse JSON content
            evaluations = json.loads(content)
            if not isinstance(evaluations, list):
                evaluations = []

            # Get recent evaluations. The explicit guard matters because
            # evaluations[-0:] is the whole list, not an empty one.
            recent_evaluations = evaluations[-limit:] if limit > 0 else []

            # Summary key -> key under which the rating is stored in a report
            rating_keys = {
                "accuracy": "Accuracy_Rating",
                "coherence": "Coherence_Rating",
                "relevance": "Relevance_Rating",
                "completeness": "Completeness_Rating",
                "citations": "Citations_Attribution_Rating",
                "length": "Length_Rating",
                "overall": "Overall_Rating"
            }

            # Calculate average scores
            if recent_evaluations:
                total_scores = {name: 0 for name in rating_keys}
                count = len(recent_evaluations)

                for eval_data in recent_evaluations:
                    report = eval_data.get("validation_report") if isinstance(eval_data, dict) else None
                    if not isinstance(report, dict):
                        # A malformed record contributes zeros instead of
                        # aborting the whole summary.
                        report = {}
                    for name, report_key in rating_keys.items():
                        total_scores[name] += self._coerce_rating(report.get(report_key, 0))

                averages = {key: round(value / count, 1) for key, value in total_scores.items()}
            else:
                averages = {}

            return {
                "total_evaluations": len(evaluations),
                "recent_count": len(recent_evaluations),
                "average_scores": averages,
                "evaluations": recent_evaluations
            }

        except Exception as e:
            logger.error(f"Error getting validation results from GitHub: {e}")
            return {"error": str(e), "evaluations": []}

    def get_drug_reports(self, drug_name: str) -> List[Dict[str, Any]]:
        """
        Get side effects reports for a specific drug from GitHub repository

        Args:
            drug_name: Name of the drug to filter reports

        Returns:
            List of reports for the specified drug
        """
        try:
            all_reports = self.get_side_effects_reports()
            wanted = (drug_name or '').lower()

            # Filter reports for the specific drug (case-insensitive).
            # csv.DictReader yields None for cells missing from a short row,
            # so fall back to '' before lowercasing.
            return [
                report for report in all_reports
                if (report.get('drug_name') or '').lower() == wanted
            ]

        except Exception as e:
            logger.error(f"Error getting drug reports from GitHub: {e}")
            return []


# Global GitHub storage instance
_github_storage = None


def get_github_storage() -> GitHubStorage:
    """Get the global GitHub storage instance with lazy loading."""
    global _github_storage
    if _github_storage is None:
        _github_storage = GitHubStorage()
    return _github_storage