# Web file-viewer header (not part of the module): commit 2832da8 "improved backend", 10.6 kB; uploader presumably "samu".
"""
Database Initialization Module
Handles database creation, schema setup, and health checks
"""
import logging
import os
from pathlib import Path
from typing import Any, Dict, List, Optional

import aiosqlite
# Module-level logger named after this module (__name__).
logger = logging.getLogger(__name__)
class DatabaseInitializer:
    """Create the SQLite database, load its schema, and report on its health."""

    # Tables and views that must all be present for the health check to
    # report the schema as loaded. Kept as class attributes so they are
    # defined in one place and can be overridden by a subclass.
    REQUIRED_TABLES = ('metadata_extractions', 'curricula', 'learning_content', 'api_cache')
    REQUIRED_VIEWS = ('user_learning_journeys', 'curriculum_content_status')

    def __init__(self, db_path: Optional[str] = None):
        """Resolve the database location and locate the schema file.

        Args:
            db_path: Path to the SQLite file. When omitted (or empty), the
                DATABASE_PATH environment variable is used, falling back to
                "./ai_tutor.db".

        Raises:
            FileNotFoundError: If schema.sql is not next to this module.
        """
        self.db_path = db_path or os.getenv("DATABASE_PATH", "./ai_tutor.db")
        self.schema_path = self._find_schema_file()

    def _find_schema_file(self) -> str:
        """Return the path to the schema.sql file.

        The schema.sql file is expected to be in the same directory as this
        script.

        Raises:
            FileNotFoundError: If the file does not exist at that location.
        """
        schema_path = os.path.join(os.path.dirname(__file__), 'schema.sql')
        if not os.path.exists(schema_path):
            raise FileNotFoundError(f"schema.sql not found at {schema_path}")
        return schema_path

    def _read_schema(self) -> str:
        """Return the text of the schema file, decoded as UTF-8."""
        # Explicit encoding: the platform default is not UTF-8 everywhere.
        with open(self.schema_path, 'r', encoding="utf-8") as f:
            return f.read()

    async def check_database_exists(self) -> bool:
        """Check if database file exists"""
        return os.path.exists(self.db_path)

    async def check_database_health(self) -> Dict[str, Any]:
        """Run a comprehensive database health check.

        Never raises: any exception is recorded in the "errors" list.

        Returns:
            A dict with boolean flags "database_exists",
            "database_accessible", "schema_loaded", "tables_exist",
            "views_exist" and "can_write"; "record_count", mapping each
            required table to its row count (or an "Error: ..." string); and
            "errors", a list of human-readable problems found.
        """
        health_status: Dict[str, Any] = {
            "database_exists": False,
            "database_accessible": False,
            "schema_loaded": False,
            "tables_exist": False,
            "views_exist": False,
            "can_write": False,
            "record_count": {},
            "errors": []
        }

        try:
            # Without a file there is nothing else to inspect.
            health_status["database_exists"] = await self.check_database_exists()
            if not health_status["database_exists"]:
                health_status["errors"].append("Database file does not exist")
                return health_status

            async with aiosqlite.connect(self.db_path) as db:
                health_status["database_accessible"] = True

                # Required tables
                existing_tables = await self._get_existing_tables(db)
                missing_tables = [
                    table for table in self.REQUIRED_TABLES
                    if table not in existing_tables
                ]
                if missing_tables:
                    health_status["errors"].append(f"Missing tables: {missing_tables}")
                else:
                    health_status["tables_exist"] = True

                # Required views
                existing_views = await self._get_existing_views(db)
                missing_views = [
                    view for view in self.REQUIRED_VIEWS
                    if view not in existing_views
                ]
                if missing_views:
                    health_status["errors"].append(f"Missing views: {missing_views}")
                else:
                    health_status["views_exist"] = True

                # Write probe.
                # NOTE(review): a TEMPORARY table is stored in SQLite's temp
                # database, so this may succeed even when the main database
                # file itself is read-only -- confirm whether a stricter
                # probe is wanted.
                try:
                    await db.execute("CREATE TEMPORARY TABLE test_write (id INTEGER)")
                    await db.execute("DROP TABLE test_write")
                    health_status["can_write"] = True
                except Exception as e:
                    health_status["errors"].append(f"Cannot write to database: {str(e)}")

                # Row counts are only meaningful when every table is there.
                # The table names come from the fixed REQUIRED_TABLES tuple,
                # not from user input, so interpolating them is safe.
                if health_status["tables_exist"]:
                    for table in self.REQUIRED_TABLES:
                        try:
                            async with db.execute(f"SELECT COUNT(*) FROM {table}") as cursor:
                                count = await cursor.fetchone()
                                health_status["record_count"][table] = count[0] if count else 0
                        except Exception as e:
                            health_status["record_count"][table] = f"Error: {str(e)}"

                health_status["schema_loaded"] = (
                    health_status["tables_exist"] and
                    health_status["views_exist"]
                )
        except Exception as e:
            health_status["errors"].append(f"Database connection error: {str(e)}")

        return health_status

    async def _get_existing_tables(self, db: "aiosqlite.Connection") -> List[str]:
        """Return the names of all tables except SQLite's internal sqlite_* ones."""
        async with db.execute("""
            SELECT name FROM sqlite_master
            WHERE type='table' AND name NOT LIKE 'sqlite_%'
        """) as cursor:
            rows = await cursor.fetchall()
            return [row[0] for row in rows]

    async def _get_existing_views(self, db: "aiosqlite.Connection") -> List[str]:
        """Return the names of all views in the database."""
        async with db.execute("""
            SELECT name FROM sqlite_master
            WHERE type='view'
        """) as cursor:
            rows = await cursor.fetchall()
            return [row[0] for row in rows]

    async def create_database(self) -> bool:
        """Create the database file and initialize it with the schema.

        Returns:
            True on success; False if any step failed (the error is logged,
            not raised).
        """
        try:
            logger.info(f"Creating database at: {self.db_path}")

            # Ensure the parent directory exists (db_dir is empty for a bare
            # file name, in which case there is nothing to create).
            db_dir = os.path.dirname(self.db_path)
            if db_dir and not os.path.exists(db_dir):
                os.makedirs(db_dir, exist_ok=True)
                logger.info(f"Created directory: {db_dir}")

            # Read the schema before connecting so that an unreadable schema
            # file does not leave an empty database file behind.
            schema = self._read_schema()

            async with aiosqlite.connect(self.db_path) as db:
                await db.executescript(schema)
                await db.commit()

            logger.info("Database created and schema loaded successfully")
            return True
        except Exception as e:
            logger.error(f"Error creating database: {str(e)}")
            return False

    async def initialize_database(self, force_recreate: bool = False) -> Dict[str, Any]:
        """Initialize the database, creating or recreating it when needed.

        Args:
            force_recreate: When True, an existing database file is moved to
                "<db_path>.backup" and a fresh database is created.

        Returns:
            A dict with "success" (bool), "action_taken" (one of "none",
            "created", "recreated_with_backup", "force_recreated",
            "already_exists"), "health_check" (the latest health report) and
            "errors" (list of messages). Exceptions are logged and reported
            in "errors" rather than raised.
        """
        result: Dict[str, Any] = {
            "success": False,
            "action_taken": "none",
            "health_check": {},
            "errors": []
        }

        try:
            health_check = await self.check_database_health()
            result["health_check"] = health_check

            needs_creation = (
                not health_check["database_exists"] or
                not health_check["schema_loaded"] or
                force_recreate
            )

            if needs_creation:
                if health_check["database_exists"] and force_recreate:
                    backup_path = f"{self.db_path}.backup"
                    # Re-check: the file may have vanished since the health check.
                    if os.path.exists(self.db_path):
                        # os.replace overwrites an earlier backup on every
                        # platform; os.rename fails on Windows if it exists.
                        # NOTE(review): only the main file is moved; any
                        # SQLite journal/WAL sidecar files stay in place --
                        # confirm whether that matters here.
                        os.replace(self.db_path, backup_path)
                        logger.info(f"Backed up existing database to: {backup_path}")
                        result["action_taken"] = "recreated_with_backup"
                    else:
                        result["action_taken"] = "force_recreated"
                else:
                    result["action_taken"] = "created"

                creation_success = await self.create_database()
                if not creation_success:
                    result["errors"].append("Failed to create database")
                    return result

                # Verify that the freshly created database is usable.
                final_health = await self.check_database_health()
                result["health_check"] = final_health

                if final_health["schema_loaded"] and final_health["can_write"]:
                    result["success"] = True
                    logger.info("Database initialization completed successfully")
                else:
                    result["errors"].append("Database created but health check failed")
            else:
                # Database exists and its schema is complete.
                result["success"] = True
                result["action_taken"] = "already_exists"
                logger.info("Database already exists and is healthy")
        except Exception as e:
            error_msg = f"Database initialization error: {str(e)}"
            logger.error(error_msg)
            result["errors"].append(error_msg)

        return result

    async def repair_database(self) -> Dict[str, Any]:
        """Attempt to repair database issues.

        A missing database is created via initialize_database(). An existing
        database whose schema is incomplete (missing tables or views) gets
        the schema script re-applied.

        Returns:
            A dict with "success" (bool), "repairs_attempted" (list of
            "created_missing_database" / "recreated_schema") and "errors"
            (list of messages). Exceptions are logged and reported in
            "errors" rather than raised.
        """
        result: Dict[str, Any] = {
            "success": False,
            "repairs_attempted": [],
            "errors": []
        }

        try:
            health_check = await self.check_database_health()

            if not health_check["database_exists"]:
                # Database doesn't exist - create it
                creation_result = await self.initialize_database()
                result["repairs_attempted"].append("created_missing_database")
                result["success"] = creation_result["success"]
                result["errors"].extend(creation_result.get("errors", []))
                return result

            # Re-apply the schema whenever it is incomplete. Checking
            # "schema_loaded" (tables AND views) rather than only
            # "tables_exist" means missing views are repaired too, matching
            # what initialize_database() does for an incomplete schema.
            if not health_check["schema_loaded"]:
                schema = self._read_schema()
                async with aiosqlite.connect(self.db_path) as db:
                    await db.executescript(schema)
                    await db.commit()
                result["repairs_attempted"].append("recreated_schema")

            # Verify repair
            final_health = await self.check_database_health()
            result["success"] = final_health["schema_loaded"]
            if not result["success"]:
                # Tell the caller why the database is still unhealthy.
                result["errors"].extend(final_health["errors"])
        except Exception as e:
            error_msg = f"Database repair error: {str(e)}"
            logger.error(error_msg)
            result["errors"].append(error_msg)

        return result
# Global instance
# Built with the default path (DATABASE_PATH environment variable, else
# "./ai_tutor.db"). Construction runs at import time and raises
# FileNotFoundError if schema.sql is not next to this module.
db_initializer = DatabaseInitializer()