| """ | |
| Advanced Agentic System Interface | |
| ------------------------------- | |
| Provides a chat interface to interact with the autonomous agent teams: | |
| - Team A: Coders (App/Software Developers) | |
| - Team B: Business (Entrepreneurs) | |
| - Team C: Research (Deep Online Research) | |
| - Team D: Crypto & Sports Trading | |
| """ | |
import os
import socket
import gradio as gr
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
from typing import Dict, Any, List, Tuple, Optional
import logging
from pathlib import Path
import asyncio
from datetime import datetime
import json
import requests
from requests.adapters import HTTPAdapter, Retry
from dataclasses import dataclass
from agentic_system import AgenticSystem
from orchestrator import AgentOrchestrator
from team_management import TeamManager, TeamType
from reasoning import (
    UnifiedReasoningEngine,
    StrategyType,
    UnifiedResult
)
from api.openai_compatible import OpenAICompatibleAPI
from api.venture_api import VentureAPI
from api.groq_api import GroqAPI

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def setup_requests_session():
    """Set up a requests session with retries."""
    session = requests.Session()
    retries = Retry(
        total=5,
        backoff_factor=0.1,
        status_forcelist=[500, 502, 503, 504]
    )
    session.mount('http://', HTTPAdapter(max_retries=retries))
    session.mount('https://', HTTPAdapter(max_retries=retries))
    return session
def check_network():
    """Check network connectivity."""
    try:
        # Try DNS resolution first
        socket.gethostbyname('huggingface.co')
        return True
    except socket.gaierror:
        logger.warning("DNS resolution failed")
    try:
        # Try an HTTP request as backup
        session = setup_requests_session()
        response = session.get('https://huggingface.co', timeout=5)
        return response.status_code == 200
    except (requests.RequestException, socket.gaierror) as e:
        logger.warning(f"Network connectivity check failed: {e}")
        return False
class ChatInterface:
    """Chat interface for interacting with the agentic system."""

    def __init__(self):
        """Initialize the chat interface."""
        # Check network connectivity
        if not check_network():
            raise ConnectionError("No network connectivity. Please check your connection.")

        # Initialize core components with a consistent configuration
        config = {
            "min_confidence": 0.7,
            "parallel_threshold": 3,
            "learning_rate": 0.1,
            "strategy_weights": {
                "LOCAL_LLM": 0.8,
                "CHAIN_OF_THOUGHT": 0.6,
                "TREE_OF_THOUGHTS": 0.5,
                "META_LEARNING": 0.4
            }
        }

        # Initialize chat state
        self.chat_history = []
        self.active_objectives = {}

        # Initialize components
        self.orchestrator = AgentOrchestrator(config)
        self.team_manager = TeamManager(self.orchestrator)
        self.reasoning_engine = UnifiedReasoningEngine()
        self.groq_api = GroqAPI()

        # Set up the agentic system
        self.agentic_system = AgenticSystem(config)

        # Initialize FastAPI app
        self.app = FastAPI()
        self.setup_cors()
        self.setup_routes()

        # Create Gradio interface
        self.interface = self.create_interface()

        # Launch background tasks
        self.background_tasks = []
        self.launch_background_tasks()
    async def initialize(self):
        """Initialize async components."""
        await self.team_manager.initialize_team_agents()

    def launch_background_tasks(self):
        """Launch background tasks, whether or not an event loop is already running."""
        try:
            # Schedule on the running loop when one exists (e.g. inside an ASGI server).
            loop = asyncio.get_running_loop()
            self.background_tasks.append(loop.create_task(self.initialize()))
        except RuntimeError:
            # No running loop (e.g. plain module import): run initialization to completion now.
            asyncio.run(self.initialize())
    def setup_cors(self):
        """Set up CORS middleware."""
        self.app.add_middleware(
            CORSMiddleware,
            allow_origins=["*"],
            allow_credentials=True,
            allow_methods=["*"],
            allow_headers=["*"],
        )
    def setup_routes(self):
        """Set up API routes."""
        # Include OpenAI-compatible routes
        openai_api = OpenAICompatibleAPI(self.reasoning_engine)
        self.app.include_router(openai_api.router, tags=["OpenAI Compatible"])

        # Original API routes. Only "/v1/chat/completions" and "/api/venture" are named
        # in the health payload below; the other paths here are assumed conventions.
        @self.app.get("/api/health")
        async def health_check():
            """Health check endpoint."""
            return {
                "status": "healthy",
                "version": "1.0.0",
                "endpoints": {
                    "openai_compatible": "/v1/chat/completions",
                    "venture": "/api/venture",
                    "ui": "/"
                }
            }

        @self.app.post("/api/reason")
        async def reason(query: str, context: Optional[Dict[str, Any]] = None):
            """Reasoning endpoint."""
            try:
                result = await self.reasoning_engine.reason(query, context or {})
                return result
            except Exception as e:
                logger.error(f"Reasoning error: {e}")
                raise HTTPException(status_code=500, detail=str(e))

        @self.app.post("/api/venture")
        async def analyze_venture(
            venture_type: str,
            description: str,
            metrics: Optional[Dict[str, Any]] = None
        ):
            """Venture analysis endpoint."""
            try:
                result = await VentureAPI(self.reasoning_engine).analyze_venture(
                    venture_type=venture_type,
                    description=description,
                    metrics=metrics or {}
                )
                return result
            except Exception as e:
                logger.error(f"Analysis error: {e}")
                raise HTTPException(status_code=500, detail=str(e))

        @self.app.get("/api/venture/types")
        async def get_venture_types():
            """Get available venture types."""
            return VentureAPI(self.reasoning_engine).get_venture_types()
| def create_interface(self) -> gr.Blocks: | |
| """Create the Gradio interface.""" | |
| with gr.Blocks( | |
| title="Advanced Agentic System", | |
| css=None, # Disable custom CSS | |
| theme=gr.themes.Soft(), # Use built-in theme | |
| ) as interface: | |
| gr.Markdown(""" | |
| # 🤖 Advanced Agentic System Chat Interface | |
| Welcome to our AI-powered autonomous agent teams! Each team specializes in different domains: | |
| - 💻 **Team A: Coders** - Expert software developers and architects | |
| - 💼 **Team B: Business** - Strategic entrepreneurs and analysts | |
| - 🔍 **Team C: Research** - Deep online research specialists | |
| - 📈 **Team D: Trading** - Crypto & sports trading experts | |
| You can: | |
| 1. Ask questions about any domain | |
| 2. Create new objectives for teams | |
| 3. Check status of ongoing work | |
| 4. Get insights and recommendations | |
| --- | |
| """) | |
| chatbot = gr.Chatbot( | |
| value=[], # Initialize with empty list | |
| type="messages", # Use OpenAI-style message format | |
| height=500, | |
| show_label=False, | |
| render_markdown=True, | |
| avatar_images=None, # Disable avatars to prevent asset loading issues | |
| ) | |
| with gr.Row(): | |
| msg = gr.Textbox( | |
| show_label=False, | |
| placeholder="Chat with the Agentic System...", | |
| container=False, | |
| autofocus=True, | |
| ) | |
| submit = gr.Button("Send 🚀") | |
| with gr.Row(): | |
| clear = gr.ClearButton([msg, chatbot], value="Clear") | |
| retry = gr.Button("Retry") | |
            async def respond(message, history):
                """Handle chat responses with proper formatting."""
                history = history or []
                try:
                    # Convert the messages-format history (list of role/content dicts)
                    # into the [user, assistant] pairs expected by process_message.
                    history_list = [
                        [user["content"], assistant["content"]]
                        for user, assistant in zip(history[::2], history[1::2])
                    ]
                    response = await self.process_message(message, history_list)

                    # Format response for markdown rendering
                    formatted_response = response.replace('```', '\n```\n')

                    # Update history with the new message format
                    return "", history + [
                        {"role": "user", "content": message},
                        {"role": "assistant", "content": formatted_response}
                    ]
                except Exception as e:
                    logger.error(f"Error in chat response: {str(e)}")
                    error_msg = "I apologize, but I encountered an error. Please try again."
                    return "", history + [
                        {"role": "user", "content": message},
                        {"role": "assistant", "content": error_msg}
                    ]

            async def retry_last(history):
                """Retry the last message with proper formatting."""
                if not history or len(history) < 2:
                    return history
                last_user_msg = history[-2]["content"]  # Last user message
                history = history[:-2]                  # Remove the last exchange
                # respond returns (textbox_value, history); only the history feeds the chatbot.
                _, new_history = await respond(last_user_msg, history)
                return new_history
            # Submit handlers with loading states
            submit_event = msg.submit(
                fn=respond,
                inputs=[msg, chatbot],
                outputs=[msg, chatbot],
                api_name=False
            ).then(
                lambda: (gr.update(value="", interactive=True), gr.update(interactive=True)),
                None,
                [msg, submit]
            )

            # Click handlers with loading states
            click_event = submit.click(
                fn=respond,
                inputs=[msg, chatbot],
                outputs=[msg, chatbot],
                api_name=False
            ).then(
                lambda: (gr.update(value="", interactive=True), gr.update(interactive=True)),
                None,
                [msg, submit]
            )

            # Retry handler
            retry.click(
                fn=retry_last,
                inputs=[chatbot],
                outputs=[chatbot],
                api_name=False
            )
            # Dynamic submit button state: enable the button only when the textbox
            # has content (the textbox itself stays interactive so the user can type).
            msg.change(
                lambda x: gr.update(
                    interactive=bool(x.strip()),
                    variant="primary" if x.strip() else "secondary"
                ),
                [msg],
                [submit]
            )
            # Example queries with emojis
            gr.Examples(
                examples=[
                    "💻 Can Team A help me build a web application?",
                    "💼 Create a new objective: Analyze market trends for AI startups",
                    "🔍 Research the latest developments in quantum computing",
                    "📈 What's the current status of all teams?"
                ],
                inputs=msg,
                label="Example Queries",
                examples_per_page=4
            )

        return interface
    async def process_message(
        self,
        message: str,
        history: Optional[List[List[str]]] = None
    ) -> str:
        """Process a user message."""
        try:
            # Initialize history if None
            if history is None:
                history = []

            # Update chat history
            self.chat_history = history

            # Analyze message intent
            intent = await self._analyze_intent(message)

            # Process based on intent
            if intent.get('type') == 'objective':
                response = await self._handle_objective(message, intent)
            elif intent.get('type') == 'status':
                response = await self._get_status()
            else:
                # 'chat' and any unrecognized intent both go to the chat handler
                response = await self._handle_chat(message)

            return response

        except Exception as e:
            logger.error(f"Error processing message: {str(e)}")
            return "I encountered an error processing your message. Please try again."
    async def _analyze_intent(self, message: str) -> Dict[str, Any]:
        """Analyze user message intent with error handling."""
        try:
            # Use the reasoning engine to analyze intent
            result = await self.reasoning_engine.reason(
                query=message,
                context={
                    "chat_history": self.chat_history,
                    "active_objectives": self.active_objectives
                }
            )

            # Handle UnifiedResult objects
            if isinstance(result, UnifiedResult):
                return {
                    "type": "chat",
                    "confidence": getattr(result, 'confidence', 0.5),
                    "metadata": getattr(result, 'metadata', {})
                }
            elif isinstance(result, dict):
                return result
            else:
                return {"type": "chat", "confidence": 0.5}

        except Exception as e:
            logger.error(f"Error analyzing intent: {str(e)}")
            return {"type": "chat", "error": str(e)}
    async def _handle_objective(self, message: str, intent: Dict[str, Any]) -> str:
        """Handle objective creation and management."""
        try:
            # Extract objective details
            objective = intent.get('objective', {})

            # Create objective
            objective_id = await self.team_manager.create_cross_team_objective(
                objective=objective.get('description', message),
                required_teams=objective.get('teams', []),
                priority=objective.get('priority', 'MEDIUM')
            )

            # Monitor progress
            status = await self.team_manager.monitor_objective_progress(objective_id)

            return f"Created objective {objective_id}. Current status: {status}"

        except Exception as e:
            logger.error(f"Error handling objective: {str(e)}")
            return "Failed to create objective. Please try again."
| async def _handle_chat(self, message: str) -> str: | |
| """Handle general chat interactions with error recovery.""" | |
| try: | |
| # First try using the reasoning engine | |
| try: | |
| result = await self.reasoning_engine.reason( | |
| query=message, | |
| context={ | |
| "chat_history": self.chat_history, | |
| "active_objectives": self.active_objectives, | |
| "groq_api": self.groq_api | |
| } | |
| ) | |
| # Handle UnifiedResult object | |
| if isinstance(result, UnifiedResult): | |
| if not result.success: | |
| # If reasoning engine fails, fallback to Groq API | |
| groq_result = await self.groq_api.predict(message) | |
| if groq_result["success"]: | |
| return groq_result["answer"] | |
| else: | |
| return "I encountered an error. Please try rephrasing your question." | |
| return result.answer if hasattr(result, 'answer') else str(result) | |
| elif isinstance(result, dict): | |
| return result.get('response', str(result)) | |
| else: | |
| return str(result) | |
| except Exception as reasoning_error: | |
| logger.error(f"Reasoning engine error: {str(reasoning_error)}") | |
| # Fallback to Groq API | |
| groq_result = await self.groq_api.predict(message) | |
| if groq_result["success"]: | |
| return groq_result["answer"] | |
| else: | |
| raise Exception(f"Both reasoning engine and Groq API failed: {groq_result.get('error')}") | |
| except Exception as e: | |
| logger.error(f"Error in chat response: {str(e)}") | |
| return "I encountered an error generating a response. Please try again." | |
| async def _get_status(self) -> str: | |
| """Get system status information.""" | |
| try: | |
| # Get team status | |
| team_status = await self.team_manager.get_team_status() | |
| # Get objective status | |
| objective_status = await self.team_manager.get_objective_status() | |
| # Format status information | |
| status = "Current System Status:\n\n" | |
| # Add team information | |
| status += "Teams:\n" | |
| for team, info in team_status.items(): | |
| status += f"- {team}: {info['status']}\n" | |
| status += f" Active Projects: {info['active_projects']}\n" | |
| status += f" Success Rate: {info['success_rate']}%\n\n" | |
| # Add objective information | |
| status += "\nActive Objectives:\n" | |
| for obj, info in objective_status.items(): | |
| status += f"- {obj}: {info['status']}\n" | |
| status += f" Progress: {info['progress']}%\n" | |
| status += f" Teams: {', '.join(info['teams'])}\n\n" | |
| return status | |
| except Exception as e: | |
| logger.error(f"Error formatting status: {str(e)}") | |
| return "Error formatting status information." | |
| def create_chat_interface() -> gr.Blocks: | |
| """Create Gradio chat interface.""" | |
| chat = ChatInterface() | |
| return chat.interface | |
| # Initialize FastAPI | |
| app = FastAPI( | |
| title="Advanced Agentic System API", | |
| description="API for interacting with the autonomous agent teams", | |
| version="1.0.0" | |
| ) | |
| # Create Gradio interface | |
| interface = create_chat_interface() | |
| # Mount Gradio app to FastAPI | |
| app = gr.mount_gradio_app(app, interface, path="/") | |
| if __name__ == "__main__": | |
| chat_interface = ChatInterface() | |
| interface = chat_interface.create_interface() | |
| interface.queue() | |
| interface.launch( | |
| server_name="0.0.0.0", | |
| server_port=7860, | |
| share=True, | |
| debug=True, | |
| enable_queue=True, | |
| show_error=True, | |
| favicon_path=None, # Disable favicon to prevent 404 | |
| show_api=False, # Disable API docs to reduce asset loading | |
| reload=False, # Disable auto-reload to use workers properly | |
| max_threads=40, | |
| ) | |
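
# Serving note (a sketch, under assumptions): when this module is loaded by an ASGI
# server instead of being run directly, the FastAPI `app` defined above (with the
# Gradio UI mounted at "/") is the entry point, e.g.:
#
#   uvicorn app:app --host 0.0.0.0 --port 7860
#
# The module name "app" in that command is an assumption about this file's name;
# adjust it to match the actual filename.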