Spaces:
Sleeping
Sleeping
Add configuration, graph, runner, and tools modules to enhance agent functionality. Introduce a Configuration class for managing parameters, implement an AgentRunner for executing the agent graph, and create tools for general search and mathematical calculations. Update test_agent.py to reflect new import paths and improve overall code organization.
13388e5
unverified
| import logging | |
| import os | |
| import re | |
| import uuid | |
| from langgraph.types import Command | |
| from graph import agent_graph | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) # Default to INFO level | |
| logger = logging.getLogger(__name__) | |
| # Enable LiteLLM debug logging only if environment variable is set | |
| import litellm | |
| if os.getenv("LITELLM_DEBUG", "false").lower() == "true": | |
| litellm.set_verbose = True | |
| logger.setLevel(logging.DEBUG) | |
| else: | |
| litellm.set_verbose = False | |
| logger.setLevel(logging.INFO) | |
| class AgentRunner: | |
| """Runner class for the code agent.""" | |
| def __init__(self): | |
| """Initialize the agent runner with graph and tools.""" | |
| logger.info("Initializing AgentRunner") | |
| self.graph = agent_graph | |
| self.last_state = None # Store the last state for testing/debugging | |
| self.thread_id = str( | |
| uuid.uuid4() | |
| ) # Generate a unique thread_id for this runner | |
| logger.info(f"Created AgentRunner with thread_id: {self.thread_id}") | |
| def _extract_answer(self, state: dict) -> str: | |
| """Extract the answer from the state.""" | |
| if not state: | |
| return None | |
| # First try to get answer from direct answer field | |
| if "answer" in state and state["answer"]: | |
| logger.info(f"Found answer in direct field: {state['answer']}") | |
| return state["answer"] | |
| # Then try to get answer from messages | |
| if "messages" in state and state["messages"]: | |
| for msg in reversed(state["messages"]): | |
| if hasattr(msg, "content") and msg.content: | |
| # Look for code blocks that might contain the answer | |
| if "```" in msg.content: | |
| # Extract code between ```py and ``` or ```python and ``` | |
| code_match = re.search( | |
| r"```(?:py|python)?\s*\n(.*?)\n```", msg.content, re.DOTALL | |
| ) | |
| if code_match: | |
| code = code_match.group(1) | |
| # Look for final_answer call | |
| final_answer_match = re.search( | |
| r"final_answer\((.*?)\)", code | |
| ) | |
| if final_answer_match: | |
| answer = final_answer_match.group(1) | |
| logger.info( | |
| f"Found answer in final_answer call: {answer}" | |
| ) | |
| return answer | |
| # If no code block with final_answer, use the content | |
| logger.info(f"Found answer in message: {msg.content}") | |
| return msg.content | |
| return None | |
| def __call__(self, input_data) -> str: | |
| """Process a question through the agent graph and return the answer. | |
| Args: | |
| input_data: Either a question string or a Command object for resuming | |
| Returns: | |
| str: The agent's response | |
| """ | |
| try: | |
| config = {"configurable": {"thread_id": self.thread_id}} | |
| logger.info(f"Using config: {config}") | |
| if isinstance(input_data, str): | |
| # Initial question | |
| logger.info(f"Processing initial question: {input_data}") | |
| initial_state = { | |
| "question": input_data, | |
| "messages": [], | |
| "answer": None, | |
| "step_logs": [], | |
| "is_complete": False, | |
| "step_count": 0, | |
| # Initialize new memory fields | |
| "context": {}, | |
| "memory_buffer": [], | |
| "last_action": None, | |
| "action_history": [], | |
| "error_count": 0, | |
| "success_count": 0, | |
| } | |
| logger.info(f"Initial state: {initial_state}") | |
| # Use stream to get results | |
| logger.info("Starting graph stream for initial question") | |
| for chunk in self.graph.stream(initial_state, config): | |
| logger.debug(f"Received chunk: {chunk}") | |
| if isinstance(chunk, dict): | |
| if "__interrupt__" in chunk: | |
| logger.info("Detected interrupt in stream") | |
| logger.info(f"Interrupt details: {chunk['__interrupt__']}") | |
| # Let the graph handle the interrupt naturally | |
| continue | |
| answer = self._extract_answer(chunk) | |
| if answer: | |
| self.last_state = chunk | |
| # If the state is complete, return the answer | |
| if chunk.get("is_complete", False): | |
| return answer | |
| else: | |
| logger.debug(f"Skipping chunk without answer: {chunk}") | |
| else: | |
| # Resuming from interrupt | |
| logger.info(f"Resuming from interrupt with input: {input_data}") | |
| for result in self.graph.stream(input_data, config): | |
| logger.debug(f"Received resume result: {result}") | |
| if isinstance(result, dict): | |
| answer = self._extract_answer(result) | |
| if answer: | |
| self.last_state = result | |
| # If the state is complete, return the answer | |
| if result.get("is_complete", False): | |
| return answer | |
| else: | |
| logger.debug(f"Skipping result without answer: {result}") | |
| # If we get here, we didn't find an answer | |
| logger.warning("No answer generated from stream") | |
| return "No answer generated" | |
| except Exception as e: | |
| logger.error(f"Error processing input: {str(e)}") | |
| raise | |
| if __name__ == "__main__": | |
| import argparse | |
| from langgraph.types import Command | |
| # Set up argument parser | |
| parser = argparse.ArgumentParser(description="Run the agent with a question") | |
| parser.add_argument("question", type=str, help="The question to ask the agent") | |
| parser.add_argument( | |
| "--resume", | |
| type=str, | |
| help="Value to resume with after an interrupt", | |
| default=None, | |
| ) | |
| args = parser.parse_args() | |
| # Create agent runner | |
| runner = AgentRunner() | |
| if args.resume: | |
| # Resume from interrupt with provided value | |
| print(f"\nResuming with value: {args.resume}") | |
| response = runner(Command(resume=args.resume)) | |
| else: | |
| # Initial run with question | |
| print(f"\nAsking question: {args.question}") | |
| response = runner(args.question) | |
| print(f"\nFinal response: {response}") | |