Spaces:
Sleeping
Sleeping
| from typing import List, Dict, Any, Optional, Union, Annotated | |
| from typing_extensions import TypedDict | |
| from langchain_core.messages import BaseMessage | |
| from pydantic import BaseModel, Field | |
| from langgraph.graph.message import add_messages | |
# --- Pydantic Models for Structured Output ---
class KeyIssue(BaseModel):
    """Represents a single generated Key Issue.

    Pydantic model describing one key issue. ``id``, ``title`` and
    ``description`` are required (declared with ``Field(...)``);
    ``challenges`` defaults to a fresh empty list per instance and
    ``potential_impact`` defaults to ``None``.
    """

    id: int = Field(..., description="Sequential ID for the key issue")
    title: str = Field(..., description="A concise title for the key issue")
    description: str = Field(..., description="Detailed description of the key issue")
    # default_factory gives every instance its own list instead of a shared one.
    challenges: List[str] = Field(
        default_factory=list,
        description="Specific challenges associated with this issue",
    )
    potential_impact: Optional[str] = Field(
        None,
        description="Potential impact if the issue is not addressed",
    )
    # TODO: add source tracking if possible/needed from the processed docs, e.g.
    # sources: List[str] = Field(default_factory=list, description="Source documents relevant to this issue")
# --- TypedDicts for LangGraph State ---
class GraphConfig(TypedDict):
    """Configuration passed to the graph execution.

    Currently carries a single required key, ``thread_id``.
    """

    thread_id: str
    # Add other config items needed at runtime if not globally available via settings
class BaseState(TypedDict):
    """Base state common across potentially multiple graphs.

    Holds the message history and an optional error string; graph-specific
    states extend this TypedDict with their own keys.
    """

    # The Annotated metadata attaches LangGraph's ``add_messages`` reducer to
    # this key, so it governs how updates to ``messages`` are combined.
    messages: Annotated[List[BaseMessage], add_messages]
    # To store potential errors during execution
    error: Optional[str]
class PlannerState(BaseState):
    """State specific to the main planner graph.

    Extends ``BaseState`` (``messages``, ``error``) with the user's query,
    the generated plan and a cursor into it, per-step outputs, and the final
    list of ``KeyIssue`` results.
    """

    user_query: str
    # The high-level plan steps
    plan: List[str]
    # Index of the current step being executed
    current_plan_step_index: int
    # Stored data from previous steps (e.g., summaries).
    # Use a dictionary to store context relevant to each plan step:
    # stores output (e.g., processed docs) from each step.
    # NOTE(review): keys are presumably plan step indices - confirm against the
    # nodes that write this field.
    step_outputs: Dict[int, Any]
    # Final structured output
    key_issues: List[KeyIssue]
class DataRetrievalState(TypedDict):
    """State for a potential data retrieval sub-graph.

    Tracks the retrieval query, the documents fetched for it, the subset kept
    after relevance grading, and the Cypher queries that were generated.
    """

    # The specific query for this retrieval step
    query_for_retrieval: str
    # Raw docs from Neo4j
    retrieved_docs: List[Dict[str, Any]]
    # Docs after relevance grading
    evaluated_docs: List[Dict[str, Any]]
    # Generated Cypher queries
    cypher_queries: List[str]
class ProcessingState(TypedDict):
    """State for a potential document processing sub-graph.

    Carries the documents handed in for processing, the processed results,
    and the configuration entries that drive the processing steps.
    """

    # Documents passed for processing
    docs_to_process: List[Dict[str, Any]]
    # Processed/summarized docs; each entry is either a string or a dict
    processed_docs: List[Union[str, Dict[str, Any]]]
    # Configuration for processing; each entry is either a string or a dict.
    # Dict entries are typed Dict[str, Any] to match the other dict
    # annotations in this module (previously a bare ``dict``).
    processing_steps_config: List[Union[str, Dict[str, Any]]]