| """Enhanced LangGraph + Agno Hybrid Agent System""" | |
| import os, time, random, asyncio | |
| from dotenv import load_dotenv | |
| from typing import List, Dict, Any, TypedDict, Annotated | |
| import operator | |
| # LangGraph imports | |
| from langgraph.graph import START, StateGraph, MessagesState | |
| from langgraph.prebuilt import tools_condition | |
| from langgraph.prebuilt import ToolNode | |
| from langgraph.checkpoint.memory import MemorySaver | |
| # LangChain imports | |
| from langchain_core.messages import SystemMessage, HumanMessage, AIMessage | |
| from langchain_core.tools import tool | |
| from langchain_groq import ChatGroq | |
| from langchain_google_genai import ChatGoogleGenerativeAI | |
| from langchain_nvidia_ai_endpoints import ChatNVIDIA | |
| from langchain_community.tools.tavily_search import TavilySearchResults | |
| from langchain_community.document_loaders import WikipediaLoader, ArxivLoader | |
| from langchain_community.vectorstores import FAISS | |
| from langchain_nvidia_ai_endpoints import NVIDIAEmbeddings | |
| from langchain.tools.retriever import create_retriever_tool | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from langchain_community.document_loaders import JSONLoader | |
| # Agno imports | |
| from agno.agent import Agent | |
| from agno.models.groq import Groq | |
| from agno.models.google import Gemini | |
| from agno.tools.duckduckgo import DuckDuckGoTools | |
| from agno.memory.agent import AgentMemory | |
| from agno.storage.agent import AgentStorage | |
| load_dotenv() | |
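
# Environment variables this module expects (via the .env loaded above):
#   GROQ_API_KEY, GOOGLE_API_KEY, NVIDIA_API_KEY, TAVILY_API_KEY
# TAVILY_API_KEY is never read explicitly here, but TavilySearchResults
# requires it at call time.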

# Enhanced rate limiter with exponential backoff
class PerformanceRateLimiter:
    def __init__(self, requests_per_minute: int, provider_name: str):
        self.requests_per_minute = requests_per_minute
        self.provider_name = provider_name
        self.request_times = []
        self.consecutive_failures = 0
        self.performance_cache = {}  # Cache for repeated queries

    def wait_if_needed(self):
        current_time = time.time()
        # Keep only the requests from the last 60 seconds
        self.request_times = [t for t in self.request_times if current_time - t < 60]
        if len(self.request_times) >= self.requests_per_minute:
            wait_time = 60 - (current_time - self.request_times[0]) + random.uniform(1, 3)
            time.sleep(wait_time)
        if self.consecutive_failures > 0:
            backoff_time = min(2 ** self.consecutive_failures, 30) + random.uniform(0.5, 1.5)
            time.sleep(backoff_time)
        # Record the actual send time (after any sleeping above)
        self.request_times.append(time.time())

    def record_success(self):
        self.consecutive_failures = 0

    def record_failure(self):
        self.consecutive_failures += 1
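
# Illustrative usage sketch for PerformanceRateLimiter (the provider call is a
# placeholder; all other names are from this module):
#
#   limiter = PerformanceRateLimiter(requests_per_minute=10, provider_name="Demo")
#   limiter.wait_if_needed()       # sleeps if the rolling 60s window is full
#   try:
#       ...                        # call the rate-limited provider here
#       limiter.record_success()   # resets the exponential backoff
#   except Exception:
#       limiter.record_failure()   # grows the next backoff, capped at 30s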

# Initialize optimized rate limiters
gemini_limiter = PerformanceRateLimiter(requests_per_minute=28, provider_name="Gemini")
groq_limiter = PerformanceRateLimiter(requests_per_minute=28, provider_name="Groq")
nvidia_limiter = PerformanceRateLimiter(requests_per_minute=4, provider_name="NVIDIA")

# Agno agent setup with performance optimization
def create_agno_agents():
    """Create high-performance Agno agents"""
    # SQLite-backed session storage and memory DB (class names assume Agno 1.x)
    storage = SqliteAgentStorage(
        table_name="agent_sessions",
        db_file="tmp/agent_storage.db"
    )
    memory_db = SqliteMemoryDb(
        table_name="agent_memories",
        db_file="tmp/agent_storage.db"
    )
    # Math specialist using Groq (fastest)
    math_agent = Agent(
        name="MathSpecialist",
        model=Groq(
            model="llama-3.3-70b-versatile",
            api_key=os.getenv("GROQ_API_KEY"),
            temperature=0
        ),
        description="Expert mathematical problem solver",
        instructions=[
            "Solve mathematical problems with precision",
            "Show step-by-step calculations",
            "Use tools for complex computations",
            "Always provide numerical answers"
        ],
        storage=storage,
        memory=AgentMemory(
            db=memory_db,
            create_user_memories=True,
            create_session_summary=True
        ),
        show_tool_calls=False,
        markdown=False
    )
    # Research specialist using Gemini (most capable)
    research_agent = Agent(
        name="ResearchSpecialist",
        model=Gemini(
            model="gemini-2.0-flash-lite",
            api_key=os.getenv("GOOGLE_API_KEY"),
            temperature=0
        ),
        description="Expert research and information gathering specialist",
        instructions=[
            "Conduct thorough research using available tools",
            "Synthesize information from multiple sources",
            "Provide comprehensive, well-cited answers",
            "Focus on accuracy and relevance"
        ],
        tools=[DuckDuckGoTools()],
        storage=storage,
        memory=AgentMemory(
            db=memory_db,
            create_user_memories=True,
            create_session_summary=True
        ),
        show_tool_calls=False,
        markdown=False
    )
    return {
        "math": math_agent,
        "research": research_agent
    }
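
# Illustrative direct call to an Agno agent (assumes, as the graph nodes below
# do, that Agent.run(..., stream=False) returns a RunResponse whose .content
# holds the text):
#
#   agents = create_agno_agents()
#   run = agents["math"].run("What is 15% of 240?", stream=False)
#   print(getattr(run, "content", run))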

# LangGraph tools (optimized)
def multiply(a: int, b: int) -> int:
    """Multiply two numbers."""
    return a * b

def add(a: int, b: int) -> int:
    """Add two numbers."""
    return a + b

def subtract(a: int, b: int) -> int:
    """Subtract two numbers."""
    return a - b

def divide(a: int, b: int) -> float:
    """Divide two numbers."""
    if b == 0:
        raise ValueError("Cannot divide by zero.")
    return a / b

def modulus(a: int, b: int) -> int:
    """Get the modulus of two numbers."""
    return a % b

def optimized_web_search(query: str) -> str:
    """Optimized web search with light pacing."""
    try:
        time.sleep(random.uniform(1, 2))  # Reduced wait time between calls
        # Fewer results for speed; the tool's .invoke takes its input as a dict
        search_docs = TavilySearchResults(max_results=2).invoke({"query": query})
        formatted_search_docs = "\n\n---\n\n".join([
            f'<Document source="{doc.get("url", "")}">\n{doc.get("content", "")[:500]}\n</Document>'  # Truncated for speed
            for doc in search_docs
        ])
        return formatted_search_docs
    except Exception as e:
        return f"Web search failed: {str(e)}"

def optimized_wiki_search(query: str) -> str:
    """Optimized Wikipedia search."""
    try:
        time.sleep(random.uniform(0.5, 1))  # Reduced wait time between calls
        search_docs = WikipediaLoader(query=query, load_max_docs=1).load()
        formatted_search_docs = "\n\n---\n\n".join([
            f'<Document source="{doc.metadata["source"]}">\n{doc.page_content[:800]}\n</Document>'  # Truncated for speed
            for doc in search_docs
        ])
        return formatted_search_docs
    except Exception as e:
        return f"Wikipedia search failed: {str(e)}"

# Optimized FAISS setup
def setup_optimized_faiss():
    """Setup optimized FAISS vector store"""
    try:
        jq_schema = """
        {
            page_content: .Question,
            metadata: {
                task_id: .task_id,
                Final_answer: ."Final answer"
            }
        }
        """
        json_loader = JSONLoader(file_path="metadata.jsonl", jq_schema=jq_schema, json_lines=True, text_content=False)
        json_docs = json_loader.load()
        # Smaller chunks for faster processing
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=256, chunk_overlap=50)
        json_chunks = text_splitter.split_documents(json_docs)
        embeddings = NVIDIAEmbeddings(
            model="nvidia/nv-embedqa-e5-v5",
            api_key=os.getenv("NVIDIA_API_KEY")
        )
        vector_store = FAISS.from_documents(json_chunks, embeddings)
        return vector_store
    except Exception as e:
        print(f"FAISS setup failed: {e}")
        return None
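
# Illustrative query against the store built above ("metadata.jsonl" must exist
# with the fields named in jq_schema; setup returns None on failure):
#
#   store = setup_optimized_faiss()
#   if store is not None:
#       for doc in store.similarity_search("example question", k=2):
#           print(doc.metadata.get("task_id"), doc.page_content[:80])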

# Enhanced state with performance tracking
class EnhancedAgentState(TypedDict):
    messages: Annotated[List[HumanMessage | AIMessage], operator.add]
    query: str
    agent_type: str
    final_answer: str
    performance_metrics: Dict[str, Any]
    agno_response: str

# Hybrid LangGraph + Agno system
class HybridLangGraphAgnoSystem:
    def __init__(self):
        self.agno_agents = create_agno_agents()
        self.vector_store = setup_optimized_faiss()
        self.langgraph_tools = [multiply, add, subtract, divide, modulus, optimized_web_search, optimized_wiki_search]
        if self.vector_store:
            retriever = self.vector_store.as_retriever(search_type="similarity", search_kwargs={"k": 2})
            retriever_tool = create_retriever_tool(
                retriever=retriever,
                name="Question_Search",
                description="Retrieve similar questions from knowledge base."
            )
            self.langgraph_tools.append(retriever_tool)
        self.graph = self._build_hybrid_graph()

    def _build_hybrid_graph(self):
        """Build hybrid LangGraph with Agno integration"""
        # LangGraph LLMs (API keys are read from the environment)
        groq_llm = ChatGroq(model="llama-3.3-70b-versatile", temperature=0)
        gemini_llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash-lite", temperature=0)

        def router_node(state: EnhancedAgentState) -> Dict[str, Any]:
            """Smart routing between LangGraph and Agno"""
            query = state["query"].lower()
            # Route math to LangGraph (faster for calculations)
            if any(word in query for word in ['calculate', 'math', 'multiply', 'add', 'subtract', 'divide']):
                agent_type = "langgraph_math"
            # Route complex research to Agno (better reasoning)
            elif any(word in query for word in ['research', 'analyze', 'explain', 'compare']):
                agent_type = "agno_research"
            # Route factual queries to LangGraph (faster retrieval)
            elif any(word in query for word in ['what is', 'who is', 'when', 'where']):
                agent_type = "langgraph_retrieval"
            else:
                agent_type = "agno_general"
            # Return only the updated key: spreading the whole state back would
            # duplicate "messages", whose operator.add reducer appends returns
            return {"agent_type": agent_type}
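
        # Example routings implied by the keyword lists above (illustrative):
        #   "calculate 12 * 7"                   -> "langgraph_math"
        #   "research transformer architectures" -> "agno_research"
        #   "what is the speed of light"         -> "langgraph_retrieval"
        #   anything without a keyword hit       -> "agno_general"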

        def langgraph_math_node(state: EnhancedAgentState) -> Dict[str, Any]:
            """LangGraph math processing (optimized for speed)"""
            groq_limiter.wait_if_needed()
            start_time = time.time()
            llm_with_tools = groq_llm.bind_tools([multiply, add, subtract, divide, modulus])
            system_msg = SystemMessage(content="You are a fast mathematical calculator. Use tools for calculations. Provide precise numerical answers. Format: FINAL ANSWER: [result]")
            messages = [system_msg, HumanMessage(content=state["query"])]
            try:
                # Single LLM call; any tool calls the model emits are not
                # executed here (there is no ToolNode loop in this graph)
                response = llm_with_tools.invoke(messages)
                processing_time = time.time() - start_time
                return {
                    "messages": [response],  # the reducer appends this to existing messages
                    "final_answer": response.content,
                    "performance_metrics": {"processing_time": processing_time, "provider": "LangGraph-Groq"}
                }
            except Exception as e:
                return {"final_answer": f"Math processing error: {str(e)}"}

        def agno_research_node(state: EnhancedAgentState) -> Dict[str, Any]:
            """Agno research processing (optimized for quality)"""
            gemini_limiter.wait_if_needed()
            start_time = time.time()
            try:
                # Agno's Agent.run returns a RunResponse; extract its text
                run_response = self.agno_agents["research"].run(state["query"], stream=False)
                answer = getattr(run_response, "content", str(run_response))
                processing_time = time.time() - start_time
                return {
                    "agno_response": answer,
                    "final_answer": answer,
                    "performance_metrics": {"processing_time": processing_time, "provider": "Agno-Gemini"}
                }
            except Exception as e:
                return {"final_answer": f"Research processing error: {str(e)}"}

        def langgraph_retrieval_node(state: EnhancedAgentState) -> Dict[str, Any]:
            """LangGraph retrieval processing (optimized for speed)"""
            groq_limiter.wait_if_needed()
            start_time = time.time()
            llm_with_tools = groq_llm.bind_tools(self.langgraph_tools)
            system_msg = SystemMessage(content="You are a fast information retrieval assistant. Use search tools efficiently. Provide concise, accurate answers. Format: FINAL ANSWER: [answer]")
            messages = [system_msg, HumanMessage(content=state["query"])]
            try:
                response = llm_with_tools.invoke(messages)
                processing_time = time.time() - start_time
                return {
                    "messages": [response],  # the reducer appends this to existing messages
                    "final_answer": response.content,
                    "performance_metrics": {"processing_time": processing_time, "provider": "LangGraph-Retrieval"}
                }
            except Exception as e:
                return {"final_answer": f"Retrieval processing error: {str(e)}"}

        def agno_general_node(state: EnhancedAgentState) -> Dict[str, Any]:
            """Agno general processing"""
            gemini_limiter.wait_if_needed()
            start_time = time.time()
            try:
                # Route to the appropriate Agno agent based on query keywords
                if any(word in state["query"].lower() for word in ['calculate', 'compute']):
                    run_response = self.agno_agents["math"].run(state["query"], stream=False)
                else:
                    run_response = self.agno_agents["research"].run(state["query"], stream=False)
                answer = getattr(run_response, "content", str(run_response))
                processing_time = time.time() - start_time
                return {
                    "agno_response": answer,
                    "final_answer": answer,
                    "performance_metrics": {"processing_time": processing_time, "provider": "Agno-General"}
                }
            except Exception as e:
                return {"final_answer": f"General processing error: {str(e)}"}

        def route_agent(state: EnhancedAgentState) -> str:
            """Route to the appropriate processing node"""
            return state.get("agent_type", "agno_general")

        # Build the graph
        builder = StateGraph(EnhancedAgentState)
        builder.add_node("router", router_node)
        builder.add_node("langgraph_math", langgraph_math_node)
        builder.add_node("agno_research", agno_research_node)
        builder.add_node("langgraph_retrieval", langgraph_retrieval_node)
        builder.add_node("agno_general", agno_general_node)
        builder.set_entry_point("router")
        builder.add_conditional_edges(
            "router",
            route_agent,
            {
                "langgraph_math": "langgraph_math",
                "agno_research": "agno_research",
                "langgraph_retrieval": "langgraph_retrieval",
                "agno_general": "agno_general"
            }
        )
        # All processing nodes end the workflow (END is LangGraph's sentinel,
        # not the string "END")
        for node in ["langgraph_math", "agno_research", "langgraph_retrieval", "agno_general"]:
            builder.add_edge(node, END)
        memory = MemorySaver()
        return builder.compile(checkpointer=memory)

    def process_query(self, query: str) -> Dict[str, Any]:
        """Process a query with performance tracking"""
        start_time = time.time()
        initial_state = {
            "messages": [HumanMessage(content=query)],
            "query": query,
            "agent_type": "",
            "final_answer": "",
            "performance_metrics": {},
            "agno_response": ""
        }
        config = {"configurable": {"thread_id": f"hybrid_{hash(query)}"}}
        try:
            result = self.graph.invoke(initial_state, config)
            total_time = time.time() - start_time
            return {
                "answer": result.get("final_answer", "No response generated"),
                "performance_metrics": {
                    **result.get("performance_metrics", {}),
                    "total_time": total_time
                },
                "provider_used": result.get("performance_metrics", {}).get("provider", "Unknown")
            }
        except Exception as e:
            return {
                "answer": f"Error: {str(e)}",
                "performance_metrics": {"total_time": time.time() - start_time, "error": True},
                "provider_used": "Error"
            }

# Build-graph function for compatibility
def build_graph(provider: str = "hybrid"):
    """Build the hybrid graph system"""
    if provider == "hybrid":
        system = HybridLangGraphAgnoSystem()
        return system.graph
    # Fallback to the original implementation
    return build_original_graph(provider)

def build_original_graph(provider: str):
    """Original graph implementation for fallback"""
    # The original graph is not included in this file; fail loudly rather
    # than silently returning None
    raise NotImplementedError("Original graph implementation not included here.")

# Main execution
if __name__ == "__main__":
    # Test the hybrid system; queries are phrased to trigger each route in router_node
    hybrid_system = HybridLangGraphAgnoSystem()
    test_queries = [
        "Calculate 25 * 4 + 10",                                              # 'calculate' -> LangGraph math
        "Explain the economic impacts of AI automation",                      # 'explain' -> Agno research
        "When was Abraham Lincoln assassinated?",                             # 'when' -> LangGraph retrieval
        "Summarize the differences between quantum and classical computing"   # no keyword -> Agno general
    ]
    for query in test_queries:
        print(f"\nQuery: {query}")
        result = hybrid_system.process_query(query)
        print(f"Answer: {result['answer']}")
        print(f"Provider: {result['provider_used']}")
        print(f"Processing Time: {result['performance_metrics'].get('total_time', 0):.2f}s")
        print("-" * 80)