Spaces:
Sleeping
Sleeping
| import mysql.connector | |
| from mysql.connector import Error | |
| import logging | |
| from datetime import datetime | |
| import json | |
| import uuid | |
| import os | |
| from PIL import Image | |
class DatabaseManager:
    """Database operations manager for SmartHeal application (aligned to existing schema).

    Every public method opens its own short-lived MySQL connection through
    ``get_connection`` and closes it before returning. Failures are logged and
    reported to the caller as ``None`` (or an empty/default list) rather than
    raised.
    """

    # Name of the template row in `questionnaires` that responses are filed under.
    DEFAULT_QUESTIONNAIRE_NAME = "Default Patient Assessment"
    # Used when the caller supplies no `user_id`; the schema expects NOT NULL.
    DEFAULT_PRACTITIONER_ID = 1
    # Directory (relative to the working directory) where wound images are written.
    UPLOAD_DIR = "uploads"

    def __init__(self, mysql_config):
        """Store the connection settings, probe the database and seed defaults.

        Args:
            mysql_config: Keyword arguments forwarded to
                ``mysql.connector.connect``.
        """
        self.mysql_config = mysql_config
        self.test_connection()
        self._ensure_default_questionnaire()

    # ---------------------- Connection helpers ----------------------
    def test_connection(self):
        """Open and immediately close a connection, logging the outcome."""
        try:
            conn = self.get_connection()
            if conn:
                conn.close()
                logging.info("✅ Database connection successful")
            else:
                logging.error("❌ Database connection failed")
        except Exception as e:
            logging.error("Database connection test failed: %s", e)

    def get_connection(self):
        """Return a new MySQL connection, or ``None`` if connecting fails."""
        try:
            return mysql.connector.connect(**self.mysql_config)
        except Error as e:
            logging.error("Error connecting to MySQL: %s", e)
            return None

    @staticmethod
    def _rollback_quietly(conn):
        """Roll back ``conn``; a failing rollback is logged, never raised.

        Rollback runs inside error handlers, so an exception here would hide
        the error that triggered it.
        """
        if conn is None:
            return
        try:
            conn.rollback()
        except Exception as exc:
            logging.warning("Rollback failed: %s", exc)

    @staticmethod
    def _release(cur, conn):
        """Close the cursor and then the connection, ignoring close errors."""
        for resource in (cur, conn):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception as exc:
                logging.debug("Error while closing database resource: %s", exc)

    def execute_query(self, query, params=None, fetch=False):
        """Run one statement on a fresh connection.

        Args:
            query: SQL text with ``%s`` placeholders.
            params: Sequence of values for the placeholders (optional).
            fetch: When true, return all rows and do not commit; otherwise
                commit and return the affected row count.

        Returns:
            A list of dict rows (``fetch=True``), the cursor's ``rowcount``
            (``fetch=False``), or ``None`` if the connection or the statement
            failed.
        """
        conn = self.get_connection()
        if not conn:
            return None
        cur = None
        try:
            cur = conn.cursor(dictionary=True)
            cur.execute(query, params or ())
            if fetch:
                return cur.fetchall()
            conn.commit()
            return cur.rowcount
        except Error as e:
            logging.error("Error executing query: %s | SQL: %s | Params: %s", e, query, params)
            self._rollback_quietly(conn)
            return None
        finally:
            self._release(cur, conn)

    def execute_query_one(self, query, params=None):
        """Run a SELECT on a fresh connection and return its first row.

        Returns:
            The first row as a dict, or ``None`` when there is no row or the
            connection/statement failed.
        """
        conn = self.get_connection()
        if not conn:
            return None
        cur = None
        try:
            # Buffered so that rows left unread after fetchone() (a query
            # without LIMIT 1) do not make closing the cursor fail.
            cur = conn.cursor(dictionary=True, buffered=True)
            cur.execute(query, params or ())
            return cur.fetchone()
        except Error as e:
            logging.error("Error executing query: %s | SQL: %s | Params: %s", e, query, params)
            return None
        finally:
            self._release(cur, conn)

    # ---------------------- One-time ensures ----------------------
    def _ensure_default_questionnaire(self):
        """Ensure a 'Default Patient Assessment' row exists in questionnaires."""
        try:
            row = self.execute_query_one(
                "SELECT id FROM questionnaires WHERE name = %s LIMIT 1",
                (self.DEFAULT_QUESTIONNAIRE_NAME,)
            )
            if not row:
                self.execute_query(
                    "INSERT INTO questionnaires (name, description, created_at, updated_at) VALUES (%s, %s, NOW(), NOW())",
                    (self.DEFAULT_QUESTIONNAIRE_NAME, "Standard patient wound assessment form")
                )
                logging.info("Created default questionnaire 'Default Patient Assessment'")
        except Exception as e:
            logging.error("Error ensuring default questionnaire: %s", e)

    # ---------------------- Business ops ----------------------
    @staticmethod
    def _build_response_data(questionnaire_data):
        """Group the flat questionnaire fields into the stored JSON structure."""
        get = questionnaire_data.get
        return {
            'patient_info': {
                'name': get('patient_name'),
                'age': get('patient_age'),
                'gender': get('patient_gender')
            },
            'wound_details': {
                'location': get('wound_location'),
                'duration': get('wound_duration'),
                'pain_level': get('pain_level'),
                'moisture_level': get('moisture_level'),
                'infection_signs': get('infection_signs'),
                'diabetic_status': get('diabetic_status')
            },
            'medical_history': {
                'previous_treatment': get('previous_treatment'),
                'medical_history': get('medical_history'),
                'medications': get('medications'),
                'allergies': get('allergies'),
                'additional_notes': get('additional_notes')
            }
        }

    def save_questionnaire(self, questionnaire_data):
        """
        Creates/gets patient in EXISTING `patients` table, then creates a row in `questionnaire_responses`.
        Returns the `questionnaire_response_id` (int) or None.

        The patient lookup/insert and the response insert share one
        connection and are committed together; any failure rolls both back.
        """
        conn = None
        cur = None
        try:
            conn = self.get_connection()
            if not conn:
                return None
            cur = conn.cursor(dictionary=True)

            # 1) Create or get patient (EXISTING `patients` table)
            patient_id = self._create_or_get_patient(cur, questionnaire_data)
            if not patient_id:
                raise Exception("Failed to get or create patient")

            # 2) Get template questionnaire id
            cur.execute(
                "SELECT id FROM questionnaires WHERE name = %s LIMIT 1",
                (self.DEFAULT_QUESTIONNAIRE_NAME,)
            )
            row = cur.fetchone()
            questionnaire_id = row["id"] if row else None
            if not questionnaire_id:
                raise Exception("Default questionnaire not found")

            # 3) Build response_data
            response_data = self._build_response_data(questionnaire_data)

            # Fall back gracefully; the schema expects NOT NULL here.
            practitioner_id = questionnaire_data.get('user_id') or self.DEFAULT_PRACTITIONER_ID

            # 4) Insert into `questionnaire_responses`
            insert_sql = """
                INSERT INTO questionnaire_responses
                (questionnaire_id, patient_id, practitioner_id, response_data, submitted_at)
                VALUES (%s, %s, %s, %s, %s)
            """
            cur.execute(insert_sql, (
                questionnaire_id,
                patient_id,  # patients.id of the row found/created in step 1
                practitioner_id,
                json.dumps(response_data, ensure_ascii=False),
                datetime.now()
            ))
            response_id = cur.lastrowid
            conn.commit()
            logging.info("✅ Saved questionnaire response ID %s (patient_id=%s)", response_id, patient_id)
            return response_id
        except Exception as e:
            logging.error("❌ Error saving questionnaire: %s", e)
            self._rollback_quietly(conn)
            return None
        finally:
            self._release(cur, conn)

    @staticmethod
    def _normalize_age(age):
        """Convert a user-supplied age to a non-negative int, or ``None``.

        Accepts ints, integral floats (``45.0``) and digit-only strings
        (surrounding whitespace ignored). Booleans, negatives, fractional
        values and anything else yield ``None``.
        """
        if age is None or isinstance(age, bool):
            return None
        if isinstance(age, int):
            return age if age >= 0 else None
        if isinstance(age, float):
            # is_integer() is False for nan/inf, so those fall through to None.
            return int(age) if age.is_integer() and age >= 0 else None
        if isinstance(age, str):
            text = age.strip()
            if not text.isdigit():
                return None
            try:
                return int(text)
            except ValueError:
                # Some characters pass isdigit() but are not valid for int().
                return None
        return None

    def _create_or_get_patient(self, cur, questionnaire_data):
        """
        Works against the EXISTING `patients` table:
        columns include id (PK), uuid, name, age (int), gender (varchar), illness, allergy, notes, etc.
        Returns patients.id (int), or None on error.

        Runs on the caller's cursor and does not commit; the caller owns the
        transaction. The same normalized age is used for the lookup and the
        insert so that a patient created here is found again next time.
        """
        try:
            name = questionnaire_data.get('patient_name')
            age = self._normalize_age(questionnaire_data.get('patient_age'))
            gender = questionnaire_data.get('patient_gender')

            # Try to find an existing patient via (name, age, gender);
            # a missing age or gender matches any stored value.
            cur.execute("""
                SELECT id FROM patients
                WHERE name = %s AND (age = %s OR %s IS NULL) AND (gender = %s OR %s IS NULL)
                LIMIT 1
            """, (name, age, age, gender, gender))
            row = cur.fetchone()
            if row:
                return row["id"]

            # Create new patient
            cur.execute("""
                INSERT INTO patients (uuid, name, age, gender, illness, allergy, notes, created_at, updated_at)
                VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
            """, (
                str(uuid.uuid4()),
                name,
                age,
                gender,
                questionnaire_data.get('medical_history', ''),
                questionnaire_data.get('allergies', ''),
                questionnaire_data.get('additional_notes', '')
            ))
            return cur.lastrowid
        except Exception as e:
            logging.error("Error creating/getting patient: %s", e)
            return None

    def _get_patient_uuid(self, patient_id, cur=None):
        """Fetch patients.uuid for a given patients.id (helps when tables store patient_id as VARCHAR uuid).

        Args:
            patient_id: Value matched against ``patients.id``.
            cur: Optional dictionary cursor to reuse. When omitted, a
                connection and cursor are opened and closed here.

        Returns:
            The uuid value, or ``None`` if not found or on error.
        """
        owns_cursor = False
        conn = None
        try:
            if cur is None:
                conn = self.get_connection()
                if not conn:
                    return None
                cur = conn.cursor(dictionary=True)
                owns_cursor = True
            cur.execute("SELECT uuid FROM patients WHERE id = %s LIMIT 1", (patient_id,))
            row = cur.fetchone()
            return row["uuid"] if row else None
        except Exception as e:
            logging.error("Error fetching patient uuid: %s", e)
            return None
        finally:
            if owns_cursor:
                self._release(cur, conn)

    @staticmethod
    def _write_jpeg(image, file_path):
        """Write ``image`` to ``file_path`` as an RGB JPEG; return ``(width, height)``.

        ``image`` is either an object exposing ``save`` (and, for non-RGB
        data, ``mode``/``convert``) or a path to an existing image file.
        """
        if hasattr(image, 'save'):
            # Non-RGB images are converted first, matching the path branch
            # below; modes such as RGBA cannot be written as JPEG directly.
            if getattr(image, 'mode', 'RGB') != 'RGB' and hasattr(image, 'convert'):
                image = image.convert('RGB')
            image.save(file_path, format='JPEG', quality=95)
            return image.size
        with Image.open(image) as opened:
            rgb = opened.convert('RGB')
            rgb.save(file_path, format='JPEG', quality=95)
            return rgb.size

    def save_wound_image(self, patient_id, image):
        """
        Save wound image to filesystem and EXISTING `wound_images` table.
        `wound_images.patient_id` is VARCHAR, so patients.uuid is stored there.
        Returns dict {id, filename, path} or None.

        The patient is resolved before anything is written to disk, and the
        file is removed again if the database insert fails, so a failed call
        leaves no orphaned image behind.
        """
        is_image_object = hasattr(image, 'save')
        is_image_path = isinstance(image, str) and os.path.exists(image)
        if not (is_image_object or is_image_path):
            logging.error("Invalid image object/path")
            return None

        conn = self.get_connection()
        if not conn:
            return None

        cur = None
        file_path = None
        try:
            # 1) Resolve patients.uuid (VARCHAR target) on this connection
            cur = conn.cursor(dictionary=True)
            patient_uuid = self._get_patient_uuid(patient_id, cur)
            if not patient_uuid:
                raise Exception("Patient UUID not found for given patient_id")

            # 2) Persist to disk
            filename = f"wound_{uuid.uuid4()}.jpg"
            os.makedirs(self.UPLOAD_DIR, exist_ok=True)
            file_path = os.path.join(self.UPLOAD_DIR, filename)
            width, height = self._write_jpeg(image, file_path)

            # 3) Insert into existing wound_images schema
            cur.execute("""
                INSERT INTO wound_images (
                    uuid, patient_id, image, width, height, area, notes, created_at, updated_at
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
            """, (
                str(uuid.uuid4()),
                patient_uuid,  # VARCHAR column → store patient's UUID
                file_path,
                str(width),
                str(height),
                None,
                None
            ))
            image_db_id = cur.lastrowid
            conn.commit()
            logging.info("Image saved with ID: %s", image_db_id)
            return {'id': image_db_id, 'filename': filename, 'path': file_path}
        except Exception as e:
            logging.error("Image save error: %s", e)
            self._rollback_quietly(conn)
            if file_path and os.path.exists(file_path):
                try:
                    os.remove(file_path)
                except OSError as cleanup_error:
                    logging.warning("Could not remove orphaned image %s: %s", file_path, cleanup_error)
            return None
        finally:
            self._release(cur, conn)

    def _template_id_for_response(self, questionnaire_response_id):
        """Return `questionnaire_id` of the given response row, or None if absent."""
        row = self.execute_query_one(
            "SELECT questionnaire_id FROM questionnaire_responses WHERE id = %s LIMIT 1",
            (questionnaire_response_id,)
        )
        return row["questionnaire_id"] if row else None

    def save_analysis(self, questionnaire_response_id, image_id, analysis_data):
        """
        Save AI analysis results.
        The live `ai_analyses` table expects `questionnaire_id` (the TEMPLATE), not the response id.
        So we first look up the template id from questionnaire_responses and store that.

        Returns the affected row count from the insert, or None on failure.
        """
        try:
            questionnaire_id = self._template_id_for_response(questionnaire_response_id)
            if questionnaire_id is None:
                logging.error("No questionnaire_response found for analysis save")
                return None

            details = analysis_data or {}
            query = """
                INSERT INTO ai_analyses (
                    questionnaire_id, image_id, analysis_data, summary,
                    recommendations, risk_score, risk_level,
                    processing_time, model_version, created_at
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """
            params = (
                questionnaire_id,
                image_id,
                json.dumps(analysis_data) if analysis_data else None,
                details.get('summary', ''),
                details.get('recommendations', ''),
                details.get('risk_score', 0),
                details.get('risk_level', 'Unknown'),
                details.get('processing_time', 0.0),
                details.get('model_version', 'v1.0'),
                datetime.now()
            )
            return self.execute_query(query, params, fetch=False)
        except Exception as e:
            logging.error("Analysis save error: %s", e)
            return None

    def get_user_history(self, user_id):
        """
        Latest 20 submissions for a practitioner, with optional analysis summary if present.
        Aligns to existing schema: joins questionnaire_responses -> questionnaires and LEFT joins ai_analyses (by template).

        Returns a list of dict rows; an empty list on error or when nothing matches.
        """
        try:
            # NOTE(review): ai_analyses is joined on the TEMPLATE id, so an
            # analysis row is not tied to one specific response and a response
            # can appear once per analysis stored for its template. Confirm
            # against the schema whether a per-response key is available.
            query = """
                SELECT
                    r.id AS response_id,
                    r.submitted_at,
                    p.name AS patient_name,
                    p.age AS patient_age,
                    p.gender AS patient_gender,
                    q.name AS questionnaire_name,
                    a.risk_level,
                    a.summary,
                    a.recommendations
                FROM questionnaire_responses r
                JOIN patients p ON p.id = r.patient_id
                JOIN questionnaires q ON q.id = r.questionnaire_id
                LEFT JOIN ai_analyses a ON a.questionnaire_id = r.questionnaire_id
                WHERE r.practitioner_id = %s
                ORDER BY r.submitted_at DESC
                LIMIT 20
            """
            result = self.execute_query(query, (user_id,), fetch=True)
            return result or []
        except Exception as e:
            logging.error("Error fetching user history: %s", e)
            return []

    # Convenience for existing callers (kept signature)
    def create_organization(self, org_data):
        """Insert a row into `organizations` and return its id, or None on failure.

        The id is taken from the insert itself (``lastrowid``). Only if the
        driver reports no generated id does it fall back to looking up the
        newest row with the same name.
        """
        try:
            org_name = org_data.get('org_name', '')
            query = """INSERT INTO organizations (name, email, phone, country_code, department, location, created_at, updated_at)
                VALUES (%s, %s, %s, %s, %s, %s, NOW(), NOW())"""
            params = (
                org_name,
                org_data.get('email', ''),
                org_data.get('phone', ''),
                org_data.get('country_code', ''),
                org_data.get('department', ''),
                org_data.get('location', '')
            )

            conn = self.get_connection()
            if not conn:
                return None
            cur = None
            try:
                cur = conn.cursor(dictionary=True)
                cur.execute(query, params)
                new_id = cur.lastrowid
                conn.commit()
            except Exception as e:
                logging.error("Error creating organization: %s", e)
                self._rollback_quietly(conn)
                return None
            finally:
                self._release(cur, conn)

            if new_id:
                return new_id
            row = self.execute_query_one(
                "SELECT id FROM organizations WHERE name = %s ORDER BY created_at DESC LIMIT 1",
                (org_name,)
            )
            return row['id'] if row else None
        except Exception as e:
            logging.error("Error creating organization: %s", e)
            return None

    @staticmethod
    def _default_organizations():
        """Return a fresh placeholder list used when no organizations can be read."""
        return [{'id': 1, 'org_name': 'Default Hospital', 'location': 'Default Location'}]

    def get_organizations(self):
        """Return all organizations ordered by name.

        Each row has keys ``id``, ``org_name`` and ``location``. When the
        query fails or returns no rows, a single placeholder entry is
        returned instead.
        """
        try:
            query = "SELECT id, name as org_name, location FROM organizations ORDER BY name"
            result = self.execute_query(query, fetch=True)
            return result or self._default_organizations()
        except Exception as e:
            logging.error("Error getting organizations: %s", e)
            return self._default_organizations()

    # Back-compat helper kept (writes same as save_analysis, minimal data)
    def save_analysis_result(self, questionnaire_response_id, analysis_result):
        """Store a minimal analysis row under the response's template id.

        Returns the affected row count from the insert, or None on failure.
        """
        try:
            # store under template id like save_analysis()
            questionnaire_id = self._template_id_for_response(questionnaire_response_id)
            if questionnaire_id is None:
                logging.error("No questionnaire_response found for analysis_result save")
                return None

            query = """INSERT INTO ai_analyses (questionnaire_id, analysis_data, summary, created_at)
                VALUES (%s, %s, %s, %s)"""
            params = (
                questionnaire_id,
                json.dumps(analysis_result) if analysis_result else None,
                (analysis_result or {}).get('summary', 'Analysis completed'),
                datetime.now()
            )
            return self.execute_query(query, params)
        except Exception as e:
            logging.error("Error saving analysis result: %s", e)
            return None