backend_chatbot / app /services /data_loader.py
helal94hb1's picture
feat: Initial commit for Hugging Face deployment
58de15f
# app/services/data_loader.py
import logging
from typing import Dict, Optional, Tuple, Set
# Import settings
from app.core.config import settings
# Import Neo4j utils
from app.utils import neo4j_utils
# Import Neo4jError for specific exception handling
from neo4j.exceptions import Neo4jError
logger = logging.getLogger(__name__)
async def load_chunk_content_map(
required_chunk_ids: Optional[Set[str]] = None
) -> Tuple[Optional[Dict[str, str]], Optional[Dict[str, Dict]]]:
"""
Fetches Chunk content/metadata from Neo4j for all relevant chunks,
using the Chunk's elementId as the key.
If required_chunk_ids is provided, it efficiently filters for only those IDs.
"""
log_filter_msg = ""
if required_chunk_ids:
log_filter_msg = f" for {len(required_chunk_ids)} required IDs"
logger.info(f"Attempting to fetch chunk content/metadata from Neo4j{log_filter_msg}...")
content_map: Dict[str, str] = {}
metadata_map: Dict[str, Dict] = {}
driver = await neo4j_utils.get_driver()
if not driver:
logger.error("Cannot fetch chunk content: Neo4j driver not available.")
return None, None
try:
# Optimized query: Use a parameter for the list of required IDs
query = """
MATCH (p:Page)-[:HAS_CHUNK]->(c:Chunk)
WHERE c.id IN $required_ids AND c.content IS NOT NULL
RETURN c.id AS chunk_key,
c.content AS content,
elementId(p) AS page_element_id,
p.original_file AS original_file,
p.page_type as page_type,
p.module_cluster as module_cluster,
p.module_name as module_name
"""
id_list = [str(id_val) for id_val in required_chunk_ids] if required_chunk_ids else []
params = {"required_ids": id_list}
if not params["required_ids"]:
logger.warning("No required chunk IDs provided to load_chunk_content_map. No content will be loaded.")
return {}, {}
records, _, _ = await driver.execute_query(query, params, database_=settings.NEO4J_DATABASE or "neo4j")
for record in records:
data = record.data()
chunk_key = data.get("chunk_key")
if chunk_key:
content_map[chunk_key] = data.get("content", "")
metadata_map[chunk_key] = {
'page_element_id': data.get("page_element_id"),
'original_file': data.get("original_file"),
'page_type': data.get("page_type"),
'module_cluster': data.get("module_cluster"),
'module_name': data.get("module_name")
}
except Neo4jError as ne:
logger.exception(f"Neo4j query failed: {ne.message} (code: {ne.code})")
return None, None
except Exception as e:
logger.exception(f"An unexpected error occurred fetching chunk data from Neo4j: {e}")
return None, None
logger.info(f"Successfully prepared content map with {len(content_map)} entries.")
return content_map, metadata_map