samu's picture
temporary database
84efb1f
import aiosqlite
import json
import os
from typing import Optional, List, Dict, Any
from datetime import datetime
import uuid
import logging
logger = logging.getLogger(__name__)
class Database:
"""Pure SQLite database handler for AI Language Tutor"""
def __init__(self, db_path: str = "/tmp/ai_tutor.db"):
self.db_path = db_path
async def initialize(self):
"""Initialize database with schema"""
async with aiosqlite.connect(self.db_path) as db:
# Read and execute schema - look for it in parent directory
schema_path = os.path.join(os.path.dirname(__file__), 'schema.sql')
with open(schema_path, 'r') as f:
schema = f.read()
await db.executescript(schema)
await db.commit()
logger.info("Database initialized successfully")
async def find_existing_curriculum(
self,
query: str,
native_language: str,
target_language: str,
proficiency: str,
user_id: Optional[int] = None
) -> Optional[Dict[str, Any]]:
"""Find existing curriculum for exact query and metadata match"""
logger.info(f"Looking for curriculum: query='{query[:50]}...', native={native_language}, target={target_language}, proficiency={proficiency}, user_id={user_id}")
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
# Always look for exact query matches first, prioritizing user-specific matches
if user_id is not None:
# User-specific search: Find exact query match for the user
logger.info(f"Searching for exact match for user {user_id}")
async with db.execute("""
SELECT c.*, m.native_language, m.target_language, m.proficiency, m.title, m.query
FROM curricula c
JOIN metadata_extractions m ON c.metadata_extraction_id = m.id
WHERE m.user_id = ? AND m.query = ? AND m.native_language = ?
AND m.target_language = ? AND m.proficiency = ?
ORDER BY c.created_at DESC
LIMIT 1
""", (user_id, query, native_language, target_language, proficiency)) as cursor:
row = await cursor.fetchone()
if row:
logger.info(f"Found exact user match: {dict(row)['id']}")
return dict(row)
# Look for exact query match from any user (only if the query is exactly the same)
logger.info("Searching for exact query match (any user)")
async with db.execute("""
SELECT c.*, m.native_language, m.target_language, m.proficiency, m.title, m.query
FROM curricula c
JOIN metadata_extractions m ON c.metadata_extraction_id = m.id
WHERE m.query = ? AND m.native_language = ? AND m.target_language = ? AND m.proficiency = ?
ORDER BY c.created_at DESC
LIMIT 1
""", (query, native_language, target_language, proficiency)) as cursor:
row = await cursor.fetchone()
if row:
logger.info(f"Found exact query match: {dict(row)['id']}")
return dict(row)
else:
logger.info("No exact query match found")
logger.info("No existing curriculum found")
return None
async def save_metadata_extraction(
self,
query: str,
metadata: Dict[str, Any],
user_id: Optional[int] = None
) -> str:
"""Save extracted metadata and return extraction ID"""
extraction_id = str(uuid.uuid4())
# Validate proficiency before inserting into the database
allowed_proficiencies = {"beginner", "intermediate", "advanced"}
proficiency = metadata.get('proficiency')
if proficiency not in allowed_proficiencies:
logger.warning(
f"Unknown proficiency '{proficiency}' received; defaulting to 'beginner'."
)
proficiency = "beginner"
metadata["proficiency"] = "beginner"
async with aiosqlite.connect(self.db_path) as db:
await db.execute("""
INSERT INTO metadata_extractions
(id, user_id, query, native_language, target_language, proficiency, title, description, metadata_json)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
extraction_id,
user_id,
query,
metadata.get('native_language'),
metadata.get('target_language'),
metadata.get('proficiency'),
metadata.get('title'),
metadata.get('description'),
json.dumps(metadata)
))
await db.commit()
logger.info(f"Saved metadata extraction: {extraction_id}")
return extraction_id
async def save_curriculum(
self,
metadata_extraction_id: str,
curriculum: Dict[str, Any],
user_id: Optional[int] = None
) -> str:
"""Save generated curriculum and return curriculum ID"""
curriculum_id = str(uuid.uuid4())
async with aiosqlite.connect(self.db_path) as db:
await db.execute("""
INSERT INTO curricula
(id, metadata_extraction_id, user_id, lesson_topic, curriculum_json, content_generation_status)
VALUES (?, ?, ?, ?, ?, 'pending')
""", (
curriculum_id,
metadata_extraction_id,
user_id,
curriculum.get('lesson_topic', ''),
json.dumps(curriculum)
))
await db.commit()
logger.info(f"Saved curriculum: {curriculum_id}")
return curriculum_id
async def copy_curriculum_for_user(
self,
source_curriculum_id: str,
metadata_extraction_id: str,
user_id: Optional[int] = None
) -> str:
"""Copy an existing curriculum for a new user"""
new_curriculum_id = str(uuid.uuid4())
async with aiosqlite.connect(self.db_path) as db:
# Get source curriculum
async with db.execute("""
SELECT lesson_topic, curriculum_json FROM curricula WHERE id = ?
""", (source_curriculum_id,)) as cursor:
row = await cursor.fetchone()
if not row:
raise ValueError(f"Source curriculum {source_curriculum_id} not found")
lesson_topic, curriculum_json = row
# Create new curriculum
await db.execute("""
INSERT INTO curricula
(id, metadata_extraction_id, user_id, lesson_topic, curriculum_json, is_content_generated, content_generation_status)
VALUES (?, ?, ?, ?, ?, 0, 'pending')
""", (
new_curriculum_id,
metadata_extraction_id,
user_id,
lesson_topic,
curriculum_json
))
# Copy all learning content
await db.execute("""
INSERT INTO learning_content
(id, curriculum_id, content_type, lesson_index, lesson_topic, content_json)
SELECT
lower(hex(randomblob(16))),
?,
content_type,
lesson_index,
lesson_topic,
content_json
FROM learning_content
WHERE curriculum_id = ?
""", (new_curriculum_id, source_curriculum_id))
# Mark as content generated
await db.execute("""
UPDATE curricula
SET is_content_generated = 1,
content_generation_status = 'completed',
content_generation_completed_at = CURRENT_TIMESTAMP
WHERE id = ?
""", (new_curriculum_id,))
await db.commit()
logger.info(f"Copied curriculum {source_curriculum_id} to {new_curriculum_id} for user {user_id}")
return new_curriculum_id
async def save_learning_content(
self,
curriculum_id: str,
content_type: str,
lesson_index: int,
lesson_topic: str,
content: Any
) -> str:
"""Save learning content (flashcards, exercises, or simulation)"""
content_id = str(uuid.uuid4())
async with aiosqlite.connect(self.db_path) as db:
await db.execute("""
INSERT INTO learning_content
(id, curriculum_id, content_type, lesson_index, lesson_topic, content_json)
VALUES (?, ?, ?, ?, ?, ?)
""", (
content_id,
curriculum_id,
content_type,
lesson_index,
lesson_topic,
json.dumps(content) if isinstance(content, (dict, list)) else content
))
await db.commit()
logger.info(f"Saved {content_type} for lesson {lesson_index}")
return content_id
async def mark_curriculum_content_generated(self, curriculum_id: str):
"""Mark curriculum as having all content generated"""
async with aiosqlite.connect(self.db_path) as db:
await db.execute("""
UPDATE curricula
SET is_content_generated = 1,
content_generation_status = 'completed',
content_generation_completed_at = CURRENT_TIMESTAMP
WHERE id = ?
""", (curriculum_id,))
await db.commit()
async def update_content_generation_status(
self,
curriculum_id: str,
status: str,
error_message: Optional[str] = None
):
"""Update content generation status for a curriculum"""
async with aiosqlite.connect(self.db_path) as db:
if status == 'generating':
await db.execute("""
UPDATE curricula
SET content_generation_status = ?,
content_generation_started_at = CURRENT_TIMESTAMP,
content_generation_error = NULL
WHERE id = ?
""", (status, curriculum_id))
elif status == 'completed':
await db.execute("""
UPDATE curricula
SET content_generation_status = ?,
content_generation_completed_at = CURRENT_TIMESTAMP,
content_generation_error = NULL,
is_content_generated = 1
WHERE id = ?
""", (status, curriculum_id))
elif status == 'failed':
await db.execute("""
UPDATE curricula
SET content_generation_status = ?,
content_generation_error = ?
WHERE id = ?
""", (status, error_message, curriculum_id))
else:
await db.execute("""
UPDATE curricula
SET content_generation_status = ?,
content_generation_error = ?
WHERE id = ?
""", (status, error_message, curriculum_id))
await db.commit()
async def get_content_generation_status(self, curriculum_id: str) -> Optional[Dict[str, Any]]:
"""Get content generation status for a curriculum"""
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
async with db.execute("""
SELECT
id,
content_generation_status,
content_generation_error,
content_generation_started_at,
content_generation_completed_at,
is_content_generated
FROM curricula
WHERE id = ?
""", (curriculum_id,)) as cursor:
row = await cursor.fetchone()
if row:
return dict(row)
return None
async def get_metadata_extraction(self, extraction_id: str) -> Optional[Dict[str, Any]]:
"""Get metadata extraction by ID"""
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
async with db.execute("""
SELECT * FROM metadata_extractions WHERE id = ?
""", (extraction_id,)) as cursor:
row = await cursor.fetchone()
if row:
return dict(row)
return None
async def get_curriculum(self, curriculum_id: str) -> Optional[Dict[str, Any]]:
"""Get curriculum by ID"""
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
async with db.execute("""
SELECT c.*, m.native_language, m.target_language, m.proficiency
FROM curricula c
JOIN metadata_extractions m ON c.metadata_extraction_id = m.id
WHERE c.id = ?
""", (curriculum_id,)) as cursor:
row = await cursor.fetchone()
if row:
return dict(row)
return None
async def get_learning_content(
self,
curriculum_id: str,
content_type: Optional[str] = None,
lesson_index: Optional[int] = None
) -> List[Dict[str, Any]]:
"""Get learning content for a curriculum"""
query = "SELECT * FROM learning_content WHERE curriculum_id = ?"
params = [curriculum_id]
if content_type:
query += " AND content_type = ?"
params.append(content_type)
if lesson_index is not None:
query += " AND lesson_index = ?"
params.append(lesson_index)
query += " ORDER BY lesson_index"
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
async with db.execute(query, params) as cursor:
rows = await cursor.fetchall()
return [dict(row) for row in rows]
async def get_user_metadata_extractions(
self,
user_id: int,
limit: int = 20
) -> List[Dict[str, Any]]:
"""Get user's metadata extraction history"""
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
async with db.execute("""
SELECT * FROM metadata_extractions
WHERE user_id = ?
ORDER BY created_at DESC
LIMIT ?
""", (user_id, limit)) as cursor:
rows = await cursor.fetchall()
return [dict(row) for row in rows]
async def get_user_curricula(
self,
user_id: int,
limit: int = 20
) -> List[Dict[str, Any]]:
"""Get user's curricula"""
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
async with db.execute("""
SELECT c.*, m.native_language, m.target_language, m.proficiency, m.title
FROM curricula c
JOIN metadata_extractions m ON c.metadata_extraction_id = m.id
WHERE c.user_id = ?
ORDER BY c.created_at DESC
LIMIT ?
""", (user_id, limit)) as cursor:
rows = await cursor.fetchall()
return [dict(row) for row in rows]
async def get_user_learning_journeys(
self,
user_id: int,
limit: int = 20
) -> List[Dict[str, Any]]:
"""Get user's complete learning journeys"""
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
async with db.execute("""
SELECT * FROM user_learning_journeys
WHERE user_id = ?
LIMIT ?
""", (user_id, limit)) as cursor:
rows = await cursor.fetchall()
return [dict(row) for row in rows]
async def get_curriculum_content_status(self, curriculum_id: str) -> Optional[Dict[str, Any]]:
"""Get content generation status for a curriculum"""
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
async with db.execute("""
SELECT * FROM curriculum_content_status WHERE curriculum_id = ?
""", (curriculum_id,)) as cursor:
row = await cursor.fetchone()
if row:
return dict(row)
return None
async def get_full_curriculum_details(self, curriculum_id: str, include_content: bool = True) -> Optional[Dict[str, Any]]:
"""Get full curriculum details, optionally including all content."""
curriculum = await self.get_curriculum(curriculum_id)
if not curriculum:
return None
try:
curriculum_data = json.loads(curriculum['curriculum_json'])
lessons = curriculum_data.get('sub_topics', [])
except json.JSONDecodeError:
curriculum_data = {}
lessons = []
if include_content:
content_list = await self.get_learning_content(curriculum_id)
content_map = {}
for content in content_list:
lesson_index = content['lesson_index']
content_type = content['content_type']
if lesson_index not in content_map:
content_map[lesson_index] = {}
try:
parsed_content = json.loads(content['content_json'])
except json.JSONDecodeError:
parsed_content = content['content_json']
content_map[lesson_index][content_type] = {
"id": content['id'],
"lesson_topic": content['lesson_topic'],
"content": parsed_content,
"created_at": content['created_at']
}
# Embed content into lessons
for i, lesson in enumerate(lessons):
lesson['content'] = content_map.get(i, {})
curriculum['curriculum'] = curriculum_data
del curriculum['curriculum_json']
return curriculum
async def search_curricula_by_languages(
self,
native_language: str,
target_language: str,
proficiency: Optional[str] = None,
limit: int = 10
) -> List[Dict[str, Any]]:
"""Search for existing curricula by language combination"""
query = """
SELECT c.*, m.native_language, m.target_language, m.proficiency, m.title
FROM curricula c
JOIN metadata_extractions m ON c.metadata_extraction_id = m.id
WHERE m.native_language = ? AND m.target_language = ?
"""
params = [native_language, target_language]
if proficiency:
query += " AND m.proficiency = ?"
params.append(proficiency)
query += " ORDER BY c.created_at DESC LIMIT ?"
params.append(limit)
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
async with db.execute(query, params) as cursor:
rows = await cursor.fetchall()
return [dict(row) for row in rows]
# Global database instance
db = Database()