"""
GitHub Storage Utility for Medical RAG Advisor
Handles saving side effects reports and validation results to GitHub repository
"""
import base64
import csv
import io
import json
import os
import time
import traceback
from datetime import datetime
from typing import Any, Dict, List, Optional

import requests

from .config import logger
class GitHubStorage:
    """
    Utility class for storing medical data files in a GitHub repository.

    Files are read and written through the GitHub Contents REST API
    (https://api.github.com/repos/{owner}/{repo}/contents/{path}).
    """

    def __init__(self, repo_url: str = "https://github.com/MoazEldsouky/HBV-AI-Assistant-data",
                 github_token: Optional[str] = None):
        """
        Initialize GitHub storage with repository details.

        Args:
            repo_url: GitHub repository URL (default: HBV AI Assistant data repository)
            github_token: GitHub personal access token; falls back to the
                GITHUB_TOKEN environment variable when not provided.

        Raises:
            ValueError: If repo_url is not a recognizable GitHub repository URL.
        """
        self.repo_url = repo_url
        # SECURITY: never hard-code a token fallback here. A PAT committed to
        # source is a leaked credential; the token must come from the caller
        # or from the environment.
        self.github_token = github_token or os.getenv("GITHUB_TOKEN")

        # Log token status (masked for security)
        if self.github_token:
            token_preview = self.github_token[:7] + "..." + self.github_token[-4:] if len(self.github_token) > 11 else "***"
            logger.info(f"GitHub token configured: {token_preview}")
        else:
            logger.warning("No GitHub token configured - uploads will fail!")

        # Extract owner and repo name from URL
        if "github.com/" in repo_url:
            parts = repo_url.replace("https://github.com/", "").replace(".git", "").split("/")
            # Guard the lookup: a URL like "https://github.com/owner" would
            # otherwise raise IndexError instead of the intended ValueError.
            if len(parts) < 2 or not parts[0] or not parts[1]:
                raise ValueError("Invalid GitHub repository URL format")
            self.owner = parts[0]
            self.repo_name = parts[1]
        else:
            raise ValueError("Invalid GitHub repository URL format")

        self.api_base = f"https://api.github.com/repos/{self.owner}/{self.repo_name}"
        self.headers = {
            "Authorization": f"token {self.github_token}",
            "Accept": "application/vnd.github.v3+json",
            "Content-Type": "application/json"
        }
        logger.info(f"GitHub storage initialized for {self.owner}/{self.repo_name}")

    def _get_file_sha(self, file_path: str) -> Optional[str]:
        """
        Get the SHA of an existing file in the repository.

        Args:
            file_path: Path to file in repository

        Returns:
            SHA string if file exists, None otherwise (including on errors).
        """
        try:
            url = f"{self.api_base}/contents/{file_path}"
            # Timeout so a stalled connection cannot hang the caller forever.
            response = requests.get(url, headers=self.headers, timeout=30)
            if response.status_code == 200:
                return response.json().get("sha")
            elif response.status_code == 404:
                # File simply does not exist yet; not an error.
                return None
            else:
                logger.error(f"Error getting file SHA: {response.status_code} - {response.text}")
                return None
        except Exception as e:
            logger.error(f"Exception getting file SHA: {e}")
            return None

    def _upload_file(self, file_path: str, content: str, message: str, sha: Optional[str] = None) -> bool:
        """
        Upload or update a file in the GitHub repository.

        Args:
            file_path: Path where file should be stored in repo
            content: File content as string
            message: Commit message
            sha: SHA of existing file (required by the API when updating)

        Returns:
            True if successful, False otherwise.
        """
        try:
            # The Contents API requires the payload base64-encoded.
            content_encoded = base64.b64encode(content.encode('utf-8')).decode('utf-8')

            # Prepare request data
            data = {
                "message": message,
                "content": content_encoded
            }
            # Add SHA if updating existing file
            if sha:
                data["sha"] = sha

            # Make API request with timeout
            url = f"{self.api_base}/contents/{file_path}"
            logger.info(f"Uploading to GitHub: {file_path} (size: {len(content)} bytes)")
            response = requests.put(url, headers=self.headers, json=data, timeout=30)

            # 200 = updated existing file, 201 = created new file.
            if response.status_code in [200, 201]:
                logger.info(f"✓ Successfully uploaded {file_path} to GitHub")
                return True
            elif response.status_code == 401:
                logger.error(f"❌ Authentication failed uploading {file_path}: Invalid or expired GitHub token")
                logger.error(f"Response: {response.text}")
                return False
            elif response.status_code == 403:
                logger.error(f"❌ Permission denied uploading {file_path}: Token lacks required permissions")
                logger.error(f"Response: {response.text}")
                return False
            elif response.status_code == 404:
                logger.error(f"❌ Repository not found: {self.owner}/{self.repo_name}")
                logger.error(f"Response: {response.text}")
                return False
            elif response.status_code == 409:
                # Stale SHA: somebody else committed to the file in between.
                logger.error(f"Conflict error uploading {file_path}: File may have been modified. Status: {response.status_code}")
                logger.error(f"Response: {response.text[:500]}")
                return False
            else:
                logger.error(f"Failed to upload {file_path}. Status: {response.status_code}")
                logger.error(f"Response: {response.text}")
                return False
        except requests.exceptions.Timeout as e:
            logger.error(f"Timeout uploading file to GitHub: {e}")
            return False
        except requests.exceptions.RequestException as e:
            logger.error(f"Request exception uploading file to GitHub: {e}")
            return False
        except Exception as e:
            logger.error(f"Unexpected exception uploading file to GitHub: {e}")
            logger.error(f"Traceback: {traceback.format_exc()}")
            return False

    def _get_file_content(self, file_path: str) -> Optional[str]:
        """
        Get the content of a file from the GitHub repository.

        Args:
            file_path: Path to file in repository

        Returns:
            File content as a decoded UTF-8 string if successful,
            None if the file does not exist or an error occurred.
        """
        try:
            url = f"{self.api_base}/contents/{file_path}"
            # Timeout so a stalled connection cannot hang the caller forever.
            response = requests.get(url, headers=self.headers, timeout=30)
            if response.status_code == 200:
                # The API returns the file body base64-encoded in "content".
                content_encoded = response.json().get("content", "")
                content = base64.b64decode(content_encoded).decode('utf-8')
                return content
            elif response.status_code == 404:
                return None
            else:
                logger.error(f"Error getting file content: {response.status_code} - {response.text}")
                return None
        except Exception as e:
            logger.error(f"Exception getting file content: {e}")
            return None

    def save_side_effects_report(self, report_data: Dict[str, Any]) -> bool:
        """
        Save a side effects report to GitHub repository as CSV.

        Appends one row to medical_data/side_effects_reports.csv, creating
        the file (with header) on first use.

        Args:
            report_data: Dictionary containing side effects report data;
                keys should match the CSV fieldnames below.

        Returns:
            True if successful, False otherwise.
        """
        try:
            file_path = "medical_data/side_effects_reports.csv"

            # Get existing file content (None means the file does not exist yet)
            existing_content = self._get_file_content(file_path)

            # Define CSV fieldnames
            fieldnames = [
                'timestamp', 'drug_name', 'side_effects', 'patient_age',
                'patient_gender', 'dosage', 'duration', 'severity',
                'outcome', 'additional_details', 'reporter_info', 'raw_input'
            ]

            # Render the header (only for a brand-new file) and the new row
            # with a single writer instead of building two throwaway writers.
            output = io.StringIO()
            writer = csv.DictWriter(output, fieldnames=fieldnames)
            if existing_content is None:
                writer.writeheader()
            writer.writerow(report_data)
            new_rows = output.getvalue()

            if existing_content is None:
                final_content = new_rows
            else:
                # Guard against an existing file whose last line has no
                # trailing newline: blind concatenation would merge two CSV
                # records into one corrupted row.
                if not existing_content.endswith('\n'):
                    existing_content += '\n'
                final_content = existing_content + new_rows

            # Get SHA for update (None when creating the file)
            sha = self._get_file_sha(file_path)

            # Upload file
            commit_message = f"Add side effects report for {report_data.get('drug_name', 'unknown drug')} - {report_data.get('timestamp', 'unknown time')}"
            return self._upload_file(file_path, final_content, commit_message, sha)
        except Exception as e:
            logger.error(f"Error saving side effects report to GitHub: {e}")
            return False

    def save_validation_results(self, evaluation_data: Dict[str, Any]) -> bool:
        """
        Save validation results to GitHub repository as JSON with robust append logic.

        Always loads existing data first, then appends the new evaluation
        without overwriting. Retries (up to 3 attempts, 2 s apart) on parse
        failures, SHA races, and upload failures rather than risking data loss.

        Args:
            evaluation_data: Dictionary containing evaluation data with
                interaction_id already set.

        Returns:
            True if successful (or the evaluation was already saved),
            False otherwise.
        """
        max_retries = 3
        retry_count = 0

        while retry_count < max_retries:
            try:
                file_path = "medical_data/evaluation_results.json"

                # STEP 1: Get existing file content with verification
                logger.info(f"Attempt {retry_count + 1}/{max_retries}: Loading existing evaluations from GitHub...")
                existing_content = self._get_file_content(file_path)

                # STEP 2: Parse existing data or create new list
                evaluations = []
                if existing_content:
                    try:
                        evaluations = json.loads(existing_content)
                        if not isinstance(evaluations, list):
                            logger.warning("Existing content is not a list, creating new list")
                            evaluations = []
                        else:
                            logger.info(f"Successfully loaded {len(evaluations)} existing evaluations")
                    except json.JSONDecodeError as e:
                        logger.error(f"Failed to parse existing evaluation_results.json: {e}")
                        # Don't start fresh - this could lose data. Instead, fail and retry.
                        if retry_count < max_retries - 1:
                            retry_count += 1
                            logger.warning(f"Retrying due to JSON parse error...")
                            time.sleep(2)  # Wait before retry
                            continue
                        else:
                            logger.error("Max retries reached. Cannot parse existing data.")
                            return False
                else:
                    logger.info("No existing file found, creating new evaluation list")

                # STEP 3: Verify we're not about to lose data
                new_interaction_id = evaluation_data.get('interaction_id', 'unknown')
                logger.info(f"Adding new evaluation with ID: {new_interaction_id}")

                # Check if this ID already exists (prevent duplicates)
                existing_ids = [e.get('interaction_id') for e in evaluations]
                if new_interaction_id in existing_ids:
                    logger.warning(f"Evaluation with ID {new_interaction_id} already exists. Skipping duplicate.")
                    return True  # Not an error, just already saved

                # STEP 4: Add new evaluation to the list (APPEND, not replace)
                evaluations.append(evaluation_data)
                logger.info(f"Appended new evaluation. Total count: {len(evaluations)}")

                # STEP 5: Convert to JSON string
                json_content = json.dumps(evaluations, indent=2, ensure_ascii=False)

                # STEP 6: Get SHA for update (must be fresh to avoid conflicts)
                sha = self._get_file_sha(file_path)
                if existing_content and not sha:
                    logger.error("File exists but SHA not found. Possible race condition.")
                    if retry_count < max_retries - 1:
                        retry_count += 1
                        logger.warning("Retrying due to SHA retrieval failure...")
                        time.sleep(2)  # Wait before retry
                        continue
                    else:
                        return False

                # STEP 7: Upload file with the complete list
                commit_message = f"Add validation results for interaction {new_interaction_id} - {evaluation_data.get('timestamp', 'unknown time')}"
                success = self._upload_file(file_path, json_content, commit_message, sha)
                if success:
                    logger.info(f"✓ Successfully saved evaluation {new_interaction_id}. Total evaluations now: {len(evaluations)}")
                    return True
                else:
                    logger.error(f"Failed to upload file (attempt {retry_count + 1}/{max_retries})")
                    if retry_count < max_retries - 1:
                        retry_count += 1
                        logger.warning("Retrying upload...")
                        time.sleep(2)  # Wait before retry
                        continue
                    else:
                        return False
            except Exception as e:
                logger.error(f"Error saving validation results to GitHub (attempt {retry_count + 1}/{max_retries}): {e}")
                if retry_count < max_retries - 1:
                    retry_count += 1
                    logger.warning("Retrying due to exception...")
                    time.sleep(2)  # Wait before retry
                    continue
                else:
                    return False

        return False

    def get_side_effects_reports(self) -> List[Dict[str, Any]]:
        """
        Get all side effects reports from GitHub repository.

        Returns:
            List of side effects reports as dictionaries (empty list when
            the file is missing or an error occurs).
        """
        try:
            file_path = "medical_data/side_effects_reports.csv"
            content = self._get_file_content(file_path)
            if not content:
                return []
            # Parse CSV content
            csv_reader = csv.DictReader(io.StringIO(content))
            reports = list(csv_reader)
            return reports
        except Exception as e:
            logger.error(f"Error getting side effects reports from GitHub: {e}")
            return []

    def get_validation_results(self, limit: int = 10) -> Dict[str, Any]:
        """
        Get validation results from GitHub repository.

        Args:
            limit: Maximum number of recent evaluations to return

        Returns:
            Dictionary containing evaluation summary (average scores over the
            most recent `limit` evaluations) and the recent evaluations, or a
            message/error payload when nothing is available.
        """
        try:
            file_path = "medical_data/evaluation_results.json"
            content = self._get_file_content(file_path)
            if not content:
                return {"message": "No evaluations found", "evaluations": []}

            # Parse JSON content
            evaluations = json.loads(content)
            if not isinstance(evaluations, list):
                evaluations = []

            # Get recent evaluations
            recent_evaluations = evaluations[-limit:] if evaluations else []

            # Map summary keys to the rating fields inside each
            # "validation_report" so the accumulation loop stays data-driven.
            score_fields = {
                "accuracy": "Accuracy_Rating",
                "coherence": "Coherence_Rating",
                "relevance": "Relevance_Rating",
                "completeness": "Completeness_Rating",
                "citations": "Citations_Attribution_Rating",
                "length": "Length_Rating",
                "overall": "Overall_Rating",
            }

            # Calculate average scores
            if recent_evaluations:
                count = len(recent_evaluations)
                total_scores = {key: 0 for key in score_fields}
                for eval_data in recent_evaluations:
                    report = eval_data.get("validation_report", {})
                    for key, field in score_fields.items():
                        total_scores[key] += int(report.get(field, 0))
                averages = {key: round(value / count, 1) for key, value in total_scores.items()}
            else:
                averages = {}

            return {
                "total_evaluations": len(evaluations),
                "recent_count": len(recent_evaluations),
                "average_scores": averages,
                "evaluations": recent_evaluations
            }
        except Exception as e:
            logger.error(f"Error getting validation results from GitHub: {e}")
            return {"error": str(e), "evaluations": []}

    def get_drug_reports(self, drug_name: str) -> List[Dict[str, Any]]:
        """
        Get side effects reports for a specific drug from GitHub repository.

        Args:
            drug_name: Name of the drug to filter reports

        Returns:
            List of reports for the specified drug (case-insensitive match).
        """
        try:
            all_reports = self.get_side_effects_reports()
            # Filter reports for the specific drug (case-insensitive)
            drug_reports = [
                report for report in all_reports
                if report.get('drug_name', '').lower() == drug_name.lower()
            ]
            return drug_reports
        except Exception as e:
            logger.error(f"Error getting drug reports from GitHub: {e}")
            return []
# Lazily-created singleton shared by all callers in this process.
_github_storage = None


def get_github_storage() -> GitHubStorage:
    """Return the process-wide GitHubStorage, creating it on first access."""
    global _github_storage
    if _github_storage is None:
        _github_storage = GitHubStorage()
    return _github_storage