Spaces:
Sleeping
Sleeping
from langgraph.graph import StateGraph, END
from langchain.schema import BaseMessage, HumanMessage, AIMessage
from typing import TypedDict, List, Dict, Any, Optional
import json
from llm_client import LLMClient
from soil_analyzer import SoilLayerAnalyzer
class AgentState(TypedDict):
    """State dictionary passed between the nodes of the soil-analysis graph."""

    # Running log of workflow messages; each node appends a status message.
    messages: List[BaseMessage]
    # Output of the LLM document analysis; nodes look for a "soil_layers" key.
    soil_data: Dict[str, Any]
    # Derived results; nodes write "validation_stats", "optimization", "insights".
    analysis_results: Dict[str, Any]
    # Pending user feedback; an empty string means there is none.
    user_feedback: str
    # Label of the most recently completed workflow step.
    current_task: str
    # Number of feedback rounds processed so far.
    iteration_count: int
    # Extracted document text; None when the caller supplied no text.
    text_content: Optional[str]
    # Base64-encoded document image; None when the caller supplied no image.
    image_base64: Optional[str]
class SoilAnalysisAgent:
    """Run a LangGraph workflow over a soil boring log.

    The graph runs ``analyze_document`` -> ``validate_layers`` ->
    ``optimize_layers`` -> ``generate_insights``.  When user feedback is
    pending it detours through ``handle_feedback`` and loops back to
    ``validate_layers``.

    ``llm_client`` must be set (through the constructor or by assigning the
    attribute) before the workflow is run.
    """

    # Maximum number of feedback rounds counted in ``iteration_count``.
    MAX_FEEDBACK_ITERATIONS = 3
    # Superstep budget handed to LangGraph when resuming with feedback.
    FEEDBACK_RECURSION_LIMIT = 10

    def __init__(self, llm_client=None):
        """Create the agent and compile its workflow graph.

        Args:
            llm_client: Optional LLM client.  When omitted the attribute is
                left as ``None`` and has to be assigned before running.
        """
        self.llm_client = llm_client
        self.soil_analyzer = SoilLayerAnalyzer()
        self.graph = self._create_graph()

    def _require_llm_client(self):
        """Return the configured LLM client.

        Raises:
            RuntimeError: If no client has been assigned yet.
        """
        if self.llm_client is None:
            raise RuntimeError(
                "SoilAnalysisAgent.llm_client is not set; "
                "assign an LLM client before running the workflow"
            )
        return self.llm_client

    def _create_graph(self):
        """Create the LangGraph workflow and return the compiled graph."""
        workflow = StateGraph(AgentState)

        nodes = {
            "analyze_document": self._analyze_document,
            "validate_layers": self._validate_layers,
            "optimize_layers": self._optimize_layers,
            "generate_insights": self._generate_insights,
            "handle_feedback": self._handle_feedback,
        }
        for node_name, handler in nodes.items():
            workflow.add_node(node_name, handler)

        # Main pipeline.
        workflow.add_edge("analyze_document", "validate_layers")
        workflow.add_edge("validate_layers", "optimize_layers")
        workflow.add_edge("optimize_layers", "generate_insights")

        # After insights, either finish or take one feedback round.
        workflow.add_conditional_edges(
            "generate_insights",
            self._should_handle_feedback,
            {
                "feedback": "handle_feedback",
                "end": END,
            },
        )
        # Refined layers are re-validated, re-optimized and re-summarized.
        workflow.add_edge("handle_feedback", "validate_layers")

        workflow.set_entry_point("analyze_document")
        return workflow.compile()

    def _analyze_document(self, state: "AgentState") -> "AgentState":
        """Analyze the soil boring log document with the LLM client.

        When the state already carries ``soil_data`` and no new document
        content, the existing data is kept.  This is the case when
        ``process_feedback`` resumes from an earlier result; re-running the
        LLM with no document would overwrite the data being refined.
        """
        text_content = state.get("text_content")
        image_base64 = state.get("image_base64")

        has_new_document = bool(text_content or image_base64)
        if state.get("soil_data") and not has_new_document:
            state["messages"].append(
                AIMessage(content="Document analysis skipped: reusing existing soil data")
            )
            return state

        state["soil_data"] = self._require_llm_client().analyze_soil_boring_log(
            text_content=text_content,
            image_base64=image_base64,
        )
        state["current_task"] = "document_analysis"
        state["messages"].append(AIMessage(content="Document analysis completed"))
        return state

    def _validate_layers(self, state: "AgentState") -> "AgentState":
        """Validate soil layer continuity and record layer statistics.

        Does nothing except update the task label and message log when
        ``soil_data`` has no ``"soil_layers"`` entry.
        """
        soil_data = state["soil_data"]
        if "soil_layers" in soil_data:
            validated_layers = self.soil_analyzer.validate_layer_continuity(
                soil_data["soil_layers"]
            )
            soil_data["soil_layers"] = validated_layers

            stats = self.soil_analyzer.calculate_layer_statistics(validated_layers)
            # Replaces earlier results; later nodes repopulate the other keys.
            state["analysis_results"] = {"validation_stats": stats}

        state["current_task"] = "layer_validation"
        state["messages"].append(AIMessage(content="Layer validation completed"))
        return state

    def _optimize_layers(self, state: "AgentState") -> "AgentState":
        """Ask the soil analyzer to optimize the layer division."""
        soil_data = state["soil_data"]
        if "soil_layers" in soil_data:
            optimization_results = self.soil_analyzer.optimize_layer_division(
                soil_data["soil_layers"]
            )
            # setdefault tolerates a resumed state without "analysis_results".
            state.setdefault("analysis_results", {})["optimization"] = optimization_results

        state["current_task"] = "layer_optimization"
        state["messages"].append(AIMessage(content="Layer optimization completed"))
        return state

    def _generate_insights(self, state: "AgentState") -> "AgentState":
        """Generate insights and recommendations with the LLM.

        Failures of the LLM call are stored as an error string under
        ``analysis_results["insights"]`` instead of aborting the workflow.
        """
        soil_data = state["soil_data"]
        analysis_results = state.setdefault("analysis_results", {})

        # default=str keeps prompt building from failing on values that are
        # not JSON-native; the text is only read by the model.
        soil_json = json.dumps(soil_data, indent=2, default=str)
        results_json = json.dumps(analysis_results, indent=2, default=str)
        insights_prompt = (
            "\n"
            "Based on the soil boring log analysis, provide geotechnical insights and recommendations:\n"
            f"Soil Data: {soil_json}\n"
            f"Analysis Results: {results_json}\n"
            "Please provide:\n"
            "1. Key geotechnical findings\n"
            "2. Foundation recommendations\n"
            "3. Construction considerations\n"
            "4. Potential risks or concerns\n"
            "5. Recommended additional testing\n"
        )

        try:
            llm = self._require_llm_client()
            response = llm.client.chat.completions.create(
                model=llm.model,
                messages=[{"role": "user", "content": insights_prompt}],
                max_tokens=1000,
                temperature=0.3,
            )
            analysis_results["insights"] = response.choices[0].message.content
        except Exception as e:
            # Best effort: insights are optional, so report instead of raising.
            analysis_results["insights"] = f"Error generating insights: {str(e)}"

        state["current_task"] = "insight_generation"
        state["messages"].append(AIMessage(content="Insights generation completed"))
        return state

    def _handle_feedback(self, state: "AgentState") -> "AgentState":
        """Refine the soil data from the pending user feedback.

        A refinement result containing an ``"error"`` key is discarded and
        the previous ``soil_data`` is kept.
        """
        user_feedback = state.get("user_feedback", "")
        if user_feedback:
            refined_data = self._require_llm_client().refine_soil_layers(
                state["soil_data"], user_feedback
            )
            if "error" not in refined_data:
                state["soil_data"] = refined_data

        # The feedback is consumed here.  Leaving it set would make the router
        # send the same feedback through again until the iteration cap.
        state["user_feedback"] = ""
        state["current_task"] = "feedback_handling"
        state["iteration_count"] = state.get("iteration_count", 0) + 1
        state["messages"].append(
            AIMessage(content=f"Feedback processed (iteration {state['iteration_count']})")
        )
        return state

    def _should_handle_feedback(self, state: "AgentState") -> str:
        """Return ``"feedback"`` when feedback is pending and the cap allows it."""
        has_feedback = bool(state.get("user_feedback"))
        under_cap = state.get("iteration_count", 0) < self.MAX_FEEDBACK_ITERATIONS
        return "feedback" if has_feedback and under_cap else "end"

    @staticmethod
    def _build_result(result: Dict[str, Any]) -> Dict[str, Any]:
        """Pick the caller-facing keys out of a final graph state."""
        keys = (
            "soil_data",
            "analysis_results",
            "messages",
            "current_task",
            "iteration_count",
        )
        return {key: result[key] for key in keys}

    def run_analysis(
        self,
        text_content: Optional[str] = None,
        image_base64: Optional[str] = None,
        user_feedback: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Run the complete soil analysis workflow.

        Args:
            text_content: Text of the boring log, if any.
            image_base64: Base64-encoded image of the boring log, if any.
            user_feedback: Optional feedback to apply after the first pass.

        Returns:
            Dict with ``soil_data``, ``analysis_results``, ``messages``,
            ``current_task`` and ``iteration_count``.
        """
        initial_state = {
            "messages": [HumanMessage(content="Starting soil boring log analysis")],
            "soil_data": {},
            "analysis_results": {},
            "user_feedback": user_feedback or "",
            "current_task": "initialization",
            "iteration_count": 0,
            "text_content": text_content,
            "image_base64": image_base64,
        }
        return self._build_result(self.graph.invoke(initial_state))

    def process_feedback(self, current_state, feedback):
        """Apply user feedback to an earlier result and re-run the workflow.

        Args:
            current_state: State dict from a previous run, for example the
                value returned by ``run_analysis``.  Its ``user_feedback``
                entry is set in place.
            feedback: Feedback text to apply.

        Returns:
            Dict with the same keys as the ``run_analysis`` result.
        """
        current_state["user_feedback"] = feedback
        result = self.graph.invoke(
            current_state,
            {"recursion_limit": self.FEEDBACK_RECURSION_LIMIT},
        )
        return self._build_result(result)