Spaces:
Sleeping
Sleeping
File size: 10,600 Bytes
2832da8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 |
"""
Database Initialization Module
Handles database creation, schema setup, and health checks
"""
import os
import aiosqlite
import logging
from pathlib import Path
from typing import Dict, Any, List
logger = logging.getLogger(__name__)
class DatabaseInitializer:
"""Handles database initialization and health checks"""
def __init__(self, db_path: str = None):
self.db_path = db_path or os.getenv("DATABASE_PATH", "./ai_tutor.db")
self.schema_path = self._find_schema_file()
def _find_schema_file(self) -> str:
"""Return the path to the schema.sql file.
The schema.sql file is expected to be in the same directory as this script.
"""
schema_path = os.path.join(os.path.dirname(__file__), 'schema.sql')
if not os.path.exists(schema_path):
raise FileNotFoundError(f"schema.sql not found at {schema_path}")
return schema_path
async def check_database_exists(self) -> bool:
"""Check if database file exists"""
return os.path.exists(self.db_path)
async def check_database_health(self) -> Dict[str, Any]:
"""Comprehensive database health check"""
health_status = {
"database_exists": False,
"database_accessible": False,
"schema_loaded": False,
"tables_exist": False,
"views_exist": False,
"can_write": False,
"record_count": {},
"errors": []
}
try:
# Check if database file exists
health_status["database_exists"] = await self.check_database_exists()
if not health_status["database_exists"]:
health_status["errors"].append("Database file does not exist")
return health_status
# Try to connect to database
async with aiosqlite.connect(self.db_path) as db:
health_status["database_accessible"] = True
# Check if required tables exist
required_tables = ['metadata_extractions', 'curricula', 'learning_content', 'api_cache']
existing_tables = await self._get_existing_tables(db)
missing_tables = [table for table in required_tables if table not in existing_tables]
if missing_tables:
health_status["errors"].append(f"Missing tables: {missing_tables}")
else:
health_status["tables_exist"] = True
# Check if views exist
required_views = ['user_learning_journeys', 'curriculum_content_status']
existing_views = await self._get_existing_views(db)
missing_views = [view for view in required_views if view not in existing_views]
if missing_views:
health_status["errors"].append(f"Missing views: {missing_views}")
else:
health_status["views_exist"] = True
# Test write capability
try:
await db.execute("CREATE TEMPORARY TABLE test_write (id INTEGER)")
await db.execute("DROP TABLE test_write")
health_status["can_write"] = True
except Exception as e:
health_status["errors"].append(f"Cannot write to database: {str(e)}")
# Get record counts
if health_status["tables_exist"]:
for table in required_tables:
try:
async with db.execute(f"SELECT COUNT(*) FROM {table}") as cursor:
count = await cursor.fetchone()
health_status["record_count"][table] = count[0] if count else 0
except Exception as e:
health_status["record_count"][table] = f"Error: {str(e)}"
health_status["schema_loaded"] = (
health_status["tables_exist"] and
health_status["views_exist"]
)
except Exception as e:
health_status["errors"].append(f"Database connection error: {str(e)}")
return health_status
async def _get_existing_tables(self, db: aiosqlite.Connection) -> List[str]:
"""Get list of existing tables"""
async with db.execute("""
SELECT name FROM sqlite_master
WHERE type='table' AND name NOT LIKE 'sqlite_%'
""") as cursor:
rows = await cursor.fetchall()
return [row[0] for row in rows]
async def _get_existing_views(self, db: aiosqlite.Connection) -> List[str]:
"""Get list of existing views"""
async with db.execute("""
SELECT name FROM sqlite_master
WHERE type='view'
""") as cursor:
rows = await cursor.fetchall()
return [row[0] for row in rows]
async def create_database(self) -> bool:
"""Create database file and initialize with schema"""
try:
logger.info(f"Creating database at: {self.db_path}")
# Ensure directory exists
db_dir = os.path.dirname(self.db_path)
if db_dir and not os.path.exists(db_dir):
os.makedirs(db_dir, exist_ok=True)
logger.info(f"Created directory: {db_dir}")
# Create database and load schema
async with aiosqlite.connect(self.db_path) as db:
# Read schema file
with open(self.schema_path, 'r') as f:
schema = f.read()
# Execute schema
await db.executescript(schema)
await db.commit()
logger.info("Database created and schema loaded successfully")
return True
except Exception as e:
logger.error(f"Error creating database: {str(e)}")
return False
async def initialize_database(self, force_recreate: bool = False) -> Dict[str, Any]:
"""Initialize database with comprehensive checks and creation"""
result = {
"success": False,
"action_taken": "none",
"health_check": {},
"errors": []
}
try:
# Check current database health
health_check = await self.check_database_health()
result["health_check"] = health_check
# Determine if we need to create/recreate database
needs_creation = (
not health_check["database_exists"] or
not health_check["schema_loaded"] or
force_recreate
)
if needs_creation:
if health_check["database_exists"] and force_recreate:
# Backup existing database
backup_path = f"{self.db_path}.backup"
if os.path.exists(self.db_path):
os.rename(self.db_path, backup_path)
logger.info(f"Backed up existing database to: {backup_path}")
result["action_taken"] = "recreated_with_backup"
else:
result["action_taken"] = "force_recreated"
else:
result["action_taken"] = "created"
# Create database
creation_success = await self.create_database()
if not creation_success:
result["errors"].append("Failed to create database")
return result
# Verify creation
final_health = await self.check_database_health()
result["health_check"] = final_health
if final_health["schema_loaded"] and final_health["can_write"]:
result["success"] = True
logger.info("Database initialization completed successfully")
else:
result["errors"].append("Database created but health check failed")
else:
# Database exists and is healthy
result["success"] = True
result["action_taken"] = "already_exists"
logger.info("Database already exists and is healthy")
except Exception as e:
error_msg = f"Database initialization error: {str(e)}"
logger.error(error_msg)
result["errors"].append(error_msg)
return result
async def repair_database(self) -> Dict[str, Any]:
"""Attempt to repair database issues"""
result = {
"success": False,
"repairs_attempted": [],
"errors": []
}
try:
health_check = await self.check_database_health()
if not health_check["database_exists"]:
# Database doesn't exist - create it
creation_result = await self.initialize_database()
result["repairs_attempted"].append("created_missing_database")
result["success"] = creation_result["success"]
result["errors"].extend(creation_result.get("errors", []))
return result
# Database exists but has issues
async with aiosqlite.connect(self.db_path) as db:
# Check and repair missing tables
if not health_check["tables_exist"]:
with open(self.schema_path, 'r') as f:
schema = f.read()
await db.executescript(schema)
await db.commit()
result["repairs_attempted"].append("recreated_schema")
# Verify repair
final_health = await self.check_database_health()
result["success"] = final_health["schema_loaded"]
except Exception as e:
error_msg = f"Database repair error: {str(e)}"
logger.error(error_msg)
result["errors"].append(error_msg)
return result
# Global instance
db_initializer = DatabaseInitializer() |