Spaces:
Sleeping
Sleeping
| from neo4j import GraphDatabase, Driver, exceptions | |
| from .config import settings | |
| import logging | |
| from typing import List, Dict, Any, Optional | |
| logger = logging.getLogger(__name__) | |
class Neo4jClient:
    """Lazily connected wrapper around a single Neo4j driver.

    The driver is created from ``settings`` on first use, verified once, and
    cached on the instance. ``close()`` discards the cached driver, so the
    next query transparently opens a new one.
    """

    # Cached driver; ``None`` means "not connected" (never opened, or closed).
    _driver: Optional[Driver] = None

    @staticmethod
    def _close_quietly(driver: Optional[Driver]) -> None:
        """Close *driver* if there is one, logging (not raising) any failure."""
        if driver is None:
            return
        try:
            driver.close()
        except Exception as close_error:
            logger.warning(f"Ignoring error while closing Neo4j Driver: {close_error}")

    def _get_driver(self) -> Driver:
        """Return the cached driver, creating and verifying it on first use.

        Raises:
            ConnectionError: If authentication fails, the server is
                unreachable, or any other error occurs while connecting.
        """
        # "Open" is tracked solely by ``_driver`` being set. Driver.close()
        # must not be used as a state check: it returns None and closes the
        # driver as a side effect.
        if self._driver is not None:
            return self._driver

        logger.info(f"Initializing Neo4j Driver for URI: {settings.neo4j_uri}")
        driver: Optional[Driver] = None
        try:
            driver = GraphDatabase.driver(
                settings.neo4j_uri,
                auth=(settings.neo4j_username, settings.neo4j_password.get_secret_value()),
            )
            # Verify connectivity during initialization
            driver.verify_connectivity()
        except exceptions.AuthError as e:
            # Never cache (or leak) a driver that failed verification.
            self._close_quietly(driver)
            logger.error(f"Neo4j Authentication Error: {e}", exc_info=True)
            raise ConnectionError("Neo4j Authentication Failed. Check credentials.") from e
        except exceptions.ServiceUnavailable as e:
            self._close_quietly(driver)
            logger.error(f"Neo4j Service Unavailable: {e}", exc_info=True)
            raise ConnectionError(
                f"Could not connect to Neo4j at {settings.neo4j_uri}. Ensure DB is running and reachable."
            ) from e
        except Exception as e:
            self._close_quietly(driver)
            logger.error(f"Unexpected error initializing Neo4j Driver: {e}", exc_info=True)
            raise ConnectionError("An unexpected error occurred connecting to Neo4j.") from e

        # Only a verified driver is cached for reuse.
        self._driver = driver
        logger.info("Neo4j Driver initialized and connection verified.")
        return driver

    def close(self):
        """Close the cached driver, if any, and forget it.

        Safe to call repeatedly; a later ``query()`` opens a new driver.
        """
        # Detach first so the client never keeps a reference to a driver that
        # is closed (or whose close() raised).
        driver, self._driver = self._driver, None
        if driver is not None:
            logger.info("Closing Neo4j Driver.")
            driver.close()

    def query(self, cypher_query: str, params: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
        """Execute a Cypher query and return each record as a dictionary.

        Args:
            cypher_query: The Cypher statement to run.
            params: Query parameters; ``None`` is treated as no parameters.

        Returns:
            One dictionary per returned record.

        Raises:
            ConnectionError: If the driver cannot be created, or the
                connection fails while the query runs.
            ValueError: If Neo4j reports a Cypher syntax error.
            RuntimeError: For any other failure during execution.
        """
        driver = self._get_driver()
        logger.debug("Executing Cypher: %s with params: %s", cypher_query, params)
        try:
            # Use session/transaction for robust execution
            with driver.session() as session:
                result = session.run(cypher_query, params or {})
                # Convert Neo4j Records to dictionaries while the session is open
                data = [record.data() for record in result]
            logger.debug("Query returned %d records.", len(data))
            return data
        except (exceptions.ServiceUnavailable, exceptions.SessionExpired) as e:
            logger.error(f"Neo4j connection error during query: {e}", exc_info=True)
            # Drop the potentially broken driver so the next call reconnects.
            # A failure while closing must not mask the ConnectionError below.
            try:
                self.close()
            except Exception as close_error:
                logger.warning(f"Ignoring error while closing Neo4j Driver: {close_error}")
            raise ConnectionError("Neo4j connection error during query execution.") from e
        except exceptions.CypherSyntaxError as e:
            logger.error(f"Neo4j Cypher Syntax Error: {e}\nQuery: {cypher_query}", exc_info=True)
            raise ValueError("Invalid Cypher query syntax.") from e
        except Exception as e:
            logger.error(f"Unexpected error during Neo4j query: {e}", exc_info=True)
            raise RuntimeError("An unexpected error occurred during the Neo4j query.") from e

    def get_schema(self, force_refresh: bool = False) -> Dict[str, Any]:
        """Return the graph schema (placeholder: always an empty dict).

        Args:
            force_refresh: Currently unused; kept for interface compatibility.
        """
        # For simplicity, returning empty. Implement actual schema fetching if needed.
        # Consider using langchain_community.graphs.Neo4jGraph for schema handling
        # if complex interactions are needed.
        logger.warning("Neo4jClient.get_schema() is a placeholder. Implement if schema needed.")
        return {}  # Placeholder

    def get_concepts(self) -> List[str]:
        """Return the names of all ``Concept`` nodes, ordered by name."""
        cypher = "MATCH (c:Concept) RETURN c.name AS name ORDER BY name"
        results = self.query(cypher)
        # The "name" key is always present in each record; its value is None
        # for nodes without a name property, so filter on the value.
        return [record['name'] for record in results if record.get('name') is not None]

    def get_concept_description(self, concept_name: str) -> Optional[str]:
        """Return the description of the named concept.

        Returns ``None`` when no concept matches or it has no description.
        """
        cypher = "MATCH (c:Concept {name: $name}) RETURN c.description AS description LIMIT 1"
        params = {"name": concept_name}
        results = self.query(cypher, params)
        return results[0].get('description') if results else None
# Needed only for the shutdown hook registered below.
import atexit

# Module-level singleton: the one client instance the application shares.
neo4j_client = Neo4jClient()

# Close the driver gracefully when the interpreter exits.
atexit.register(neo4j_client.close)