Spaces:
Sleeping
Sleeping
| # app/services/data_loader.py | |
| import logging | |
| from typing import Dict, Optional, Tuple, Set | |
| # Import settings | |
| from app.core.config import settings | |
| # Import Neo4j utils | |
| from app.utils import neo4j_utils | |
| # Import Neo4jError for specific exception handling | |
| from neo4j.exceptions import Neo4jError | |
| logger = logging.getLogger(__name__) | |
| async def load_chunk_content_map( | |
| required_chunk_ids: Optional[Set[str]] = None | |
| ) -> Tuple[Optional[Dict[str, str]], Optional[Dict[str, Dict]]]: | |
| """ | |
| Fetches Chunk content/metadata from Neo4j for all relevant chunks, | |
| using the Chunk's elementId as the key. | |
| If required_chunk_ids is provided, it efficiently filters for only those IDs. | |
| """ | |
| log_filter_msg = "" | |
| if required_chunk_ids: | |
| log_filter_msg = f" for {len(required_chunk_ids)} required IDs" | |
| logger.info(f"Attempting to fetch chunk content/metadata from Neo4j{log_filter_msg}...") | |
| content_map: Dict[str, str] = {} | |
| metadata_map: Dict[str, Dict] = {} | |
| driver = await neo4j_utils.get_driver() | |
| if not driver: | |
| logger.error("Cannot fetch chunk content: Neo4j driver not available.") | |
| return None, None | |
| try: | |
| # Optimized query: Use a parameter for the list of required IDs | |
| query = """ | |
| MATCH (p:Page)-[:HAS_CHUNK]->(c:Chunk) | |
| WHERE c.id IN $required_ids AND c.content IS NOT NULL | |
| RETURN c.id AS chunk_key, | |
| c.content AS content, | |
| elementId(p) AS page_element_id, | |
| p.original_file AS original_file, | |
| p.page_type as page_type, | |
| p.module_cluster as module_cluster, | |
| p.module_name as module_name | |
| """ | |
| id_list = [str(id_val) for id_val in required_chunk_ids] if required_chunk_ids else [] | |
| params = {"required_ids": id_list} | |
| if not params["required_ids"]: | |
| logger.warning("No required chunk IDs provided to load_chunk_content_map. No content will be loaded.") | |
| return {}, {} | |
| records, _, _ = await driver.execute_query(query, params, database_=settings.NEO4J_DATABASE or "neo4j") | |
| for record in records: | |
| data = record.data() | |
| chunk_key = data.get("chunk_key") | |
| if chunk_key: | |
| content_map[chunk_key] = data.get("content", "") | |
| metadata_map[chunk_key] = { | |
| 'page_element_id': data.get("page_element_id"), | |
| 'original_file': data.get("original_file"), | |
| 'page_type': data.get("page_type"), | |
| 'module_cluster': data.get("module_cluster"), | |
| 'module_name': data.get("module_name") | |
| } | |
| except Neo4jError as ne: | |
| logger.exception(f"Neo4j query failed: {ne.message} (code: {ne.code})") | |
| return None, None | |
| except Exception as e: | |
| logger.exception(f"An unexpected error occurred fetching chunk data from Neo4j: {e}") | |
| return None, None | |
| logger.info(f"Successfully prepared content map with {len(content_map)} entries.") | |
| return content_map, metadata_map | |