SmartHeal-Agentic-AI / src /database_manager.py
SmartHeal's picture
Upload 33 files
185c377 verified
raw
history blame
7.31 kB
import mysql.connector
from mysql.connector import Error
import json
from datetime import datetime
import logging
from src.config import Config
class DatabaseManager:
def __init__(self, config_dict=None):
self.config = config_dict or Config().get_mysql_config()
def get_connection(self):
try:
connection = mysql.connector.connect(**self.config)
return connection
except Error as e:
logging.error(f"Error connecting to MySQL: {e}")
return None
def execute_query(self, query, params=None, fetch=False):
connection = self.get_connection()
if not connection:
return None
try:
cursor = connection.cursor(dictionary=True)
cursor.execute(query, params or ())
if fetch:
return cursor.fetchall()
else:
connection.commit()
return cursor.rowcount
except Error as e:
logging.error(f"Error executing query: {e}")
connection.rollback()
return None
finally:
if connection and connection.is_connected():
cursor.close()
connection.close()
def execute_query_one(self, query, params=None):
connection = self.get_connection()
if not connection:
return None
try:
cursor = connection.cursor(dictionary=True)
cursor.execute(query, params or ())
return cursor.fetchone()
except Error as e:
logging.error(f"Error executing query: {e}")
return None
finally:
if connection and connection.is_connected():
cursor.close()
connection.close()
def save_questionnaire(self, user_id, questionnaire_data):
"""Save questionnaire data to DB and return the inserted ID"""
try:
connection = self.get_connection()
if not connection:
return None
cursor = connection.cursor()
query = """
INSERT INTO questionnaires (
user_id, patient_name, patient_age, patient_gender,
wound_location, wound_duration, pain_level,
previous_treatment, medical_history, medications,
allergies, additional_notes, created_at
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
params = (
user_id,
questionnaire_data.get("patient_name"),
questionnaire_data.get("patient_age"),
questionnaire_data.get("patient_gender"),
questionnaire_data.get("wound_location"),
questionnaire_data.get("wound_duration"),
questionnaire_data.get("pain_level"),
questionnaire_data.get("previous_treatment"),
questionnaire_data.get("medical_history"),
questionnaire_data.get("medications"),
questionnaire_data.get("allergies"),
questionnaire_data.get("additional_notes"),
datetime.now()
)
cursor.execute(query, params)
connection.commit()
return cursor.lastrowid
except Error as e:
logging.error(f"Error saving questionnaire: {e}")
connection.rollback()
return None
finally:
if connection and connection.is_connected():
cursor.close()
connection.close()
def save_analysis_session(self, user_id, questionnaire_data, image_url, ai_analysis):
try:
connection = self.get_connection()
if not connection:
return None
cursor = connection.cursor()
# Save questionnaire
questionnaire_id = self.save_questionnaire(user_id, questionnaire_data)
# Save image
image_id = None
if image_url:
image_query = """
INSERT INTO wound_images (questionnaire_id, image_url, created_at)
VALUES (%s, %s, %s)
"""
cursor.execute(image_query, (questionnaire_id, image_url, datetime.now()))
image_id = cursor.lastrowid
# Save AI analysis
analysis_query = """
INSERT INTO ai_analyses (questionnaire_id, image_id, analysis_data,
summary, recommendations, created_at)
VALUES (%s, %s, %s, %s, %s, %s)
"""
cursor.execute(analysis_query, (
questionnaire_id,
image_id,
json.dumps(ai_analysis),
ai_analysis.get("summary", ""),
ai_analysis.get("recommendations", ""),
datetime.now()
))
analysis_id = cursor.lastrowid
# Save session
session_query = """
INSERT INTO analysis_sessions (user_id, questionnaire_id, image_id,
analysis_id, created_at)
VALUES (%s, %s, %s, %s, %s)
"""
cursor.execute(session_query, (
user_id, questionnaire_id, image_id, analysis_id, datetime.now()
))
connection.commit()
return cursor.lastrowid
except Error as e:
logging.error(f"Error saving analysis session: {e}")
connection.rollback()
return None
finally:
if connection and connection.is_connected():
cursor.close()
connection.close()
def get_user_analysis_history(self, user_id, limit=10):
query = """
SELECT
s.id as session_id,
s.created_at,
q.patient_name,
q.wound_location,
a.summary as ai_summary,
a.recommendations,
wi.image_url
FROM analysis_sessions s
JOIN questionnaires q ON s.questionnaire_id = q.id
LEFT JOIN ai_analyses a ON s.analysis_id = a.id
LEFT JOIN wound_images wi ON s.image_id = wi.id
WHERE s.user_id = %s
ORDER BY s.created_at DESC
LIMIT %s
"""
return self.execute_query(query, (user_id, limit), fetch=True)
def get_organization_stats(self, organization_id):
query = """
SELECT
COUNT(DISTINCT u.id) as total_practitioners,
COUNT(DISTINCT s.id) as total_analyses,
COUNT(DISTINCT q.patient_name) as unique_patients
FROM users u
LEFT JOIN analysis_sessions s ON u.id = s.user_id
LEFT JOIN questionnaires q ON s.questionnaire_id = q.id
WHERE u.org = %s AND u.role = 'practitioner'
"""
return self.execute_query_one(query, (organization_id,))