| """ | |
| Open-Source Multi-LLM Agent System | |
| Uses only free and open-source models - no paid APIs required | |
| """ | |
| import os | |
| import time | |
| import random | |
| import operator | |
| from typing import List, Dict, Any, TypedDict, Annotated, Optional | |
| from dotenv import load_dotenv | |
| # Core LangChain imports | |
| from langchain_core.tools import tool | |
| from langchain_community.tools.tavily_search import TavilySearchResults | |
| from langchain_community.document_loaders import WikipediaLoader | |
| from langgraph.graph import StateGraph, END | |
| from langgraph.checkpoint.memory import MemorySaver | |
| from langchain_core.messages import SystemMessage, HumanMessage, AIMessage | |
| # Open-source model integrations | |
| from langchain_groq import ChatGroq # Free tier available | |
| from langchain_community.llms import Ollama | |
| from langchain_community.chat_models import ChatOllama | |
| # Hugging Face integration for open-source models | |
| try: | |
| from langchain_huggingface import HuggingFacePipeline | |
| from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline | |
| HF_AVAILABLE = True | |
| except ImportError: | |
| HF_AVAILABLE = False | |
| # Vector database imports | |
| import faiss | |
| import numpy as np | |
| from sentence_transformers import SentenceTransformer | |
| import json | |
| load_dotenv() | |
# Enhanced system prompt
ENHANCED_SYSTEM_PROMPT = (
    "You are a helpful assistant tasked with answering questions using available tools. "
    "You must provide accurate, comprehensive answers based on available information. "
    "When answering questions, follow these guidelines:\n"
    "1. Use available tools to gather information when needed\n"
    "2. Provide precise, factual answers\n"
    "3. For numbers: don't use commas or units unless specified\n"
    "4. For strings: don't use articles or abbreviations, write digits in plain text\n"
    "5. For lists: apply the above rules based on element type\n"
    "6. Always end with 'FINAL ANSWER: [YOUR ANSWER]'\n"
    "7. Be concise but thorough in your reasoning\n"
    "8. If you cannot find the answer, state that clearly"
)
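# Illustrative output shape under this prompt (a sketch, not a real model reply):
#   "The discography lists three studio albums in that span... FINAL ANSWER: 3"
# The answer-parsing code below splits on "FINAL ANSWER:" and keeps the tail,
# so everything before the marker is treated as reasoning.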
# ---- Tool Definitions ----
# The @tool decorator wraps each function as a LangChain tool; this is what
# makes the dict-based `.invoke({...})` calls used later in the graph work.
@tool
def multiply(a: int, b: int) -> int:
    """Multiply two integers and return the product."""
    return a * b

@tool
def add(a: int, b: int) -> int:
    """Add two integers and return the sum."""
    return a + b

@tool
def subtract(a: int, b: int) -> int:
    """Subtract the second integer from the first and return the difference."""
    return a - b

@tool
def divide(a: int, b: int) -> float:
    """Divide the first integer by the second and return the quotient."""
    if b == 0:
        raise ValueError("Cannot divide by zero.")
    return a / b

@tool
def modulus(a: int, b: int) -> int:
    """Return the remainder when dividing the first integer by the second."""
    return a % b
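# Sanity check (a sketch; runs only when this file is executed directly):
# @tool-wrapped functions are invoked with a dict keyed by argument name,
# the same calling convention the graph nodes below use for the search tools.
if __name__ == "__main__":
    assert multiply.invoke({"a": 25, "b": 17}) == 425
    assert divide.invoke({"a": 10, "b": 4}) == 2.5
    assert modulus.invoke({"a": 10, "b": 3}) == 1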
@tool
def optimized_web_search(query: str) -> str:
    """Perform web search using free DuckDuckGo (fallback if Tavily not available)."""
    try:
        # Try Tavily first (free tier)
        if os.getenv("TAVILY_API_KEY"):
            time.sleep(random.uniform(0.7, 1.5))
            search_tool = TavilySearchResults(max_results=3)
            docs = search_tool.invoke({"query": query})
            return "\n\n---\n\n".join(
                f"<Doc url='{d.get('url','')}'>{d.get('content','')[:800]}</Doc>"
                for d in docs
            )
        else:
            # Fallback to DuckDuckGo (completely free)
            try:
                from duckduckgo_search import DDGS
                with DDGS() as ddgs:
                    results = list(ddgs.text(query, max_results=3))
                return "\n\n---\n\n".join(
                    f"<Doc url='{r.get('href','')}'>{r.get('body','')[:800]}</Doc>"
                    for r in results
                )
            except ImportError:
                return "Web search not available - install duckduckgo-search for free web search"
    except Exception as e:
        return f"Web search failed: {e}"
@tool
def optimized_wiki_search(query: str) -> str:
    """Perform Wikipedia search - completely free."""
    try:
        time.sleep(random.uniform(0.3, 1))
        docs = WikipediaLoader(query=query, load_max_docs=2).load()
        return "\n\n---\n\n".join(
            f"<Doc src='{d.metadata.get('source','Wikipedia')}'>{d.page_content[:1000]}</Doc>"
            for d in docs
        )
    except Exception as e:
        return f"Wikipedia search failed: {e}"
# ---- Open-Source Model Manager ----
class OpenSourceModelManager:
    """Manages only open-source and free models"""
    def __init__(self):
        self.available_models = {}
        self._initialize_models()

    def _initialize_models(self):
        """Initialize only open-source models"""
        # 1. Groq (Free tier with open-source models)
        if os.getenv("GROQ_API_KEY"):
            try:
                self.available_models['groq_llama3_70b'] = ChatGroq(
                    model="llama3-70b-8192",
                    temperature=0,
                    api_key=os.getenv("GROQ_API_KEY")
                )
                self.available_models['groq_llama3_8b'] = ChatGroq(
                    model="llama3-8b-8192",
                    temperature=0,
                    api_key=os.getenv("GROQ_API_KEY")
                )
                self.available_models['groq_mixtral'] = ChatGroq(
                    model="mixtral-8x7b-32768",
                    temperature=0,
                    api_key=os.getenv("GROQ_API_KEY")
                )
                self.available_models['groq_gemma'] = ChatGroq(
                    model="gemma-7b-it",
                    temperature=0,
                    api_key=os.getenv("GROQ_API_KEY")
                )
                print("Groq models initialized (free tier)")
            except Exception as e:
                print(f"Groq models not available: {e}")

        # 2. Ollama (Completely free local models)
        try:
            # Constructing ChatOllama does not contact the server, so issue a
            # tiny real request to verify that Ollama is actually running
            test_model = ChatOllama(model="llama3", base_url="http://localhost:11434")
            test_model.invoke("ping")
            # If no error, add Ollama models
            self.available_models['ollama_llama3'] = ChatOllama(model="llama3")
            self.available_models['ollama_llama3_70b'] = ChatOllama(model="llama3:70b")
            self.available_models['ollama_mistral'] = ChatOllama(model="mistral")
            self.available_models['ollama_phi3'] = ChatOllama(model="phi3")
            self.available_models['ollama_codellama'] = ChatOllama(model="codellama")
            self.available_models['ollama_gemma'] = ChatOllama(model="gemma")
            self.available_models['ollama_qwen'] = ChatOllama(model="qwen")
            print("Ollama models initialized (local)")
        except Exception as e:
            print(f"Ollama not available: {e}")

        # 3. Hugging Face Transformers (Completely free)
        if HF_AVAILABLE:
            try:
                # Small models that can run on CPU; _create_hf_model returns
                # None on failure, so only register models that actually loaded
                for key, name in [('hf_gpt2', 'gpt2'), ('hf_distilgpt2', 'distilgpt2')]:
                    model = self._create_hf_model(name)
                    if model is not None:
                        self.available_models[key] = model
                print("Hugging Face models initialized (local)")
            except Exception as e:
                print(f"Hugging Face models not available: {e}")

        print(f"Total available open-source models: {len(self.available_models)}")
    def _create_hf_model(self, model_name: str):
        """Create Hugging Face pipeline model"""
        try:
            pipe = pipeline(
                "text-generation",
                model=model_name,
                max_length=512,
                do_sample=True,
                temperature=0.7,
                pad_token_id=50256  # GPT-2's EOS token id, reused for padding
            )
            return HuggingFacePipeline(pipeline=pipe)
        except Exception as e:
            print(f"Failed to create HF model {model_name}: {e}")
            return None
    def get_model(self, model_name: str):
        """Get a specific model by name"""
        return self.available_models.get(model_name)

    def list_available_models(self) -> List[str]:
        """List all available model names"""
        return list(self.available_models.keys())

    def get_best_model_for_task(self, task_type: str):
        """Get the best available model for a specific task type"""
        if task_type == "reasoning":
            # Prefer larger models for reasoning
            for model_name in ['groq_llama3_70b', 'ollama_llama3_70b', 'groq_mixtral', 'ollama_llama3']:
                if model_name in self.available_models:
                    return self.available_models[model_name]
        elif task_type == "coding":
            # Prefer code-specialized models
            for model_name in ['ollama_codellama', 'groq_llama3_70b', 'ollama_llama3']:
                if model_name in self.available_models:
                    return self.available_models[model_name]
        elif task_type == "fast":
            # Prefer fast, smaller models
            for model_name in ['groq_llama3_8b', 'groq_gemma', 'ollama_phi3', 'hf_distilgpt2']:
                if model_name in self.available_models:
                    return self.available_models[model_name]
        # Default fallback to first available
        if self.available_models:
            return list(self.available_models.values())[0]
        return None
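# Usage sketch (assumes at least one backend is configured: a GROQ_API_KEY,
# a running local Ollama server, or the transformers extras):
#   manager = OpenSourceModelManager()
#   print(manager.list_available_models())
#   llm = manager.get_best_model_for_task("fast")
#   if llm is not None:
#       print(llm.invoke("Reply with one word: ready").content)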
# ---- Enhanced Agent State ----
class EnhancedAgentState(TypedDict):
    """State structure for the enhanced multi-LLM agent system."""
    messages: Annotated[List[HumanMessage | AIMessage], operator.add]
    query: str
    agent_type: str
    final_answer: str
    perf: Dict[str, Any]
    tools_used: List[str]
    reasoning: str
    model_used: str
# ---- Open-Source Multi-LLM System ----
class OpenSourceMultiLLMSystem:
    """
    Multi-LLM system using only open-source and free models
    """
    def __init__(self):
        self.model_manager = OpenSourceModelManager()
        self.tools = [
            multiply, add, subtract, divide, modulus,
            optimized_web_search, optimized_wiki_search
        ]
        self.graph = self._build_graph()
    def _build_graph(self) -> StateGraph:
        """Build the LangGraph state machine with open-source models."""
        def router(st: EnhancedAgentState) -> EnhancedAgentState:
            """Route queries to the appropriate model based on complexity and content analysis."""
            q = st["query"].lower()
            # Enhanced routing logic
            if any(keyword in q for keyword in ["calculate", "compute", "math", "multiply", "add", "subtract", "divide"]):
                model_type = "reasoning"
                agent_type = "math"
            elif any(keyword in q for keyword in ["search", "find", "lookup", "wikipedia", "information about"]):
                model_type = "fast"
                agent_type = "search_enhanced"
            elif any(keyword in q for keyword in ["code", "programming", "function", "algorithm"]):
                model_type = "coding"
                agent_type = "coding"
            elif len(q.split()) > 20:  # Complex queries
                model_type = "reasoning"
                agent_type = "complex"
            else:
                model_type = "fast"
                agent_type = "simple"
            # Get the best model for this task and recover its registry name
            selected_model = self.model_manager.get_best_model_for_task(model_type)
            model_name = "unknown"
            for name, model in self.model_manager.available_models.items():
                if model is selected_model:
                    model_name = name
                    break
            return {**st, "agent_type": agent_type, "tools_used": [], "reasoning": "", "model_used": model_name}
        def math_node(st: EnhancedAgentState) -> EnhancedAgentState:
            """Process mathematical queries."""
            return self._process_with_model(st, "reasoning", "Mathematical calculation using open-source model")

        def search_enhanced_node(st: EnhancedAgentState) -> EnhancedAgentState:
            """Process query with search enhancement."""
            t0 = time.time()
            tools_used = []
            try:
                # Determine search strategy
                query = st["query"]
                if any(keyword in query.lower() for keyword in ["wikipedia", "wiki"]):
                    search_results = optimized_wiki_search.invoke({"query": query})
                    tools_used.append("wikipedia_search")
                else:
                    search_results = optimized_web_search.invoke({"query": query})
                    tools_used.append("web_search")
                enhanced_query = f"""
Original Question: {query}

Search Results:
{search_results}

Based on the search results above, provide a direct answer to the original question.
"""
                # Use fast model for search-enhanced queries
                model = self.model_manager.get_best_model_for_task("fast")
                if model:
                    sys = SystemMessage(content=ENHANCED_SYSTEM_PROMPT)
                    res = model.invoke([sys, HumanMessage(content=enhanced_query)])
                    answer = res.content.strip() if hasattr(res, 'content') else str(res).strip()
                    if "FINAL ANSWER:" in answer:
                        answer = answer.split("FINAL ANSWER:")[-1].strip()
                    return {**st,
                            "final_answer": answer,
                            "tools_used": tools_used,
                            "reasoning": "Used search enhancement with open-source model",
                            "perf": {"time": time.time() - t0, "prov": "Search-Enhanced"}}
                else:
                    return {**st, "final_answer": "No models available", "perf": {"error": "No models"}}
            except Exception as e:
                return {**st, "final_answer": f"Error: {e}", "perf": {"error": str(e)}}
        def coding_node(st: EnhancedAgentState) -> EnhancedAgentState:
            """Process coding-related queries."""
            return self._process_with_model(st, "coding", "Code generation using open-source model")

        def complex_node(st: EnhancedAgentState) -> EnhancedAgentState:
            """Process complex queries."""
            return self._process_with_model(st, "reasoning", "Complex reasoning using open-source model")

        def simple_node(st: EnhancedAgentState) -> EnhancedAgentState:
            """Process simple queries."""
            return self._process_with_model(st, "fast", "Simple query using fast open-source model")

        # Build graph
        g = StateGraph(EnhancedAgentState)
        g.add_node("router", router)
        g.add_node("math", math_node)
        g.add_node("search_enhanced", search_enhanced_node)
        g.add_node("coding", coding_node)
        g.add_node("complex", complex_node)
        g.add_node("simple", simple_node)
        g.set_entry_point("router")
        g.add_conditional_edges("router", lambda s: s["agent_type"], {
            "math": "math",
            "search_enhanced": "search_enhanced",
            "coding": "coding",
            "complex": "complex",
            "simple": "simple"
        })
        for node in ["math", "search_enhanced", "coding", "complex", "simple"]:
            g.add_edge(node, END)
        return g.compile(checkpointer=MemorySaver())
    def _process_with_model(self, st: EnhancedAgentState, model_type: str, reasoning: str) -> EnhancedAgentState:
        """Process query with the specified model type"""
        t0 = time.time()
        try:
            model = self.model_manager.get_best_model_for_task(model_type)
            if not model:
                return {**st, "final_answer": "No suitable model available", "perf": {"error": "No model"}}
            enhanced_query = f"""
Question: {st["query"]}

Please provide a direct, accurate answer to this question.
"""
            sys = SystemMessage(content=ENHANCED_SYSTEM_PROMPT)
            res = model.invoke([sys, HumanMessage(content=enhanced_query)])
            answer = res.content.strip() if hasattr(res, 'content') else str(res).strip()
            if "FINAL ANSWER:" in answer:
                answer = answer.split("FINAL ANSWER:")[-1].strip()
            return {**st,
                    "final_answer": answer,
                    "reasoning": reasoning,
                    "perf": {"time": time.time() - t0, "prov": f"OpenSource-{model_type}"}}
        except Exception as e:
            return {**st, "final_answer": f"Error: {e}", "perf": {"error": str(e)}}
    def process_query(self, q: str) -> str:
        """Process a query through the open-source multi-LLM system."""
        state = {
            "messages": [HumanMessage(content=q)],
            "query": q,
            "agent_type": "",
            "final_answer": "",
            "perf": {},
            "tools_used": [],
            "reasoning": "",
            "model_used": ""
        }
        cfg = {"configurable": {"thread_id": f"opensource_qa_{hash(q)}"}}
        try:
            out = self.graph.invoke(state, cfg)
            answer = out.get("final_answer", "").strip()
            # Ensure we don't return the question as the answer
            if answer == q or answer.startswith(q):
                return "Information not available"
            return answer if answer else "No answer generated"
        except Exception as e:
            return f"Error processing query: {e}"
    def get_system_info(self) -> Dict[str, Any]:
        """Get information about available open-source models"""
        return {
            "available_models": self.model_manager.list_available_models(),
            "total_models": len(self.model_manager.available_models),
            "model_types": {
                "groq_free_tier": [m for m in self.model_manager.list_available_models() if m.startswith("groq_")],
                "ollama_local": [m for m in self.model_manager.list_available_models() if m.startswith("ollama_")],
                "huggingface_local": [m for m in self.model_manager.list_available_models() if m.startswith("hf_")]
            }
        }
# ---- Build Graph Function (for compatibility) ----
def build_graph(provider: str = "opensource"):
    """Build graph using only open-source models (provider is accepted for API compatibility but ignored)."""
    return OpenSourceMultiLLMSystem().graph
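# Direct-graph usage sketch: because the graph is compiled with a MemorySaver
# checkpointer, every invoke needs a thread_id in its config. A minimal call,
# using the state keys defined in EnhancedAgentState:
#   graph = build_graph()
#   out = graph.invoke(
#       {"messages": [HumanMessage(content="What is 2+2?")], "query": "What is 2+2?",
#        "agent_type": "", "final_answer": "", "perf": {}, "tools_used": [],
#        "reasoning": "", "model_used": ""},
#       {"configurable": {"thread_id": "demo"}},
#   )
#   print(out["final_answer"])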
# ---- Main execution ----
if __name__ == "__main__":
    # Initialize the open-source system
    system = OpenSourceMultiLLMSystem()

    # Print system information
    info = system.get_system_info()
    print("Open-Source System Information:")
    print(f"Total Models Available: {info['total_models']}")
    for category, models in info['model_types'].items():
        if models:
            print(f"  {category}: {models}")

    # Test queries
    test_questions = [
        "What is 25 multiplied by 17?",
        "Find information about Mercedes Sosa albums between 2000-2009",
        "Write a simple Python function to calculate factorial",
        "Explain quantum computing in simple terms",
        "What is the capital of France?"
    ]

    print("\n" + "="*60)
    print("Testing Open-Source Multi-LLM System")
    print("="*60)
    for i, question in enumerate(test_questions, 1):
        print(f"\nQuestion {i}: {question}")
        print("-" * 50)
        answer = system.process_query(question)
        print(f"Answer: {answer}")