from typing import Dict, List, Optional from pydantic import Field, model_validator from app.agent.browser import BrowserContextHelper from app.agent.toolcall import ToolCallAgent from app.config import config from app.logger import logger from app.prompt.manus import NEXT_STEP_PROMPT, SYSTEM_PROMPT from app.tool import Terminate, ToolCollection from app.tool.ask_human import AskHuman from app.tool.browser_use_tool import BrowserUseTool from app.tool.mcp import MCPClients, MCPClientTool from app.tool.python_execute import PythonExecute from app.tool.str_replace_editor import StrReplaceEditor class Manus(ToolCallAgent): """A versatile general-purpose agent with support for both local and MCP tools.""" name: str = "Manus" description: str = "A versatile agent that can solve various tasks using multiple tools including MCP-based tools" system_prompt: str = SYSTEM_PROMPT.format(directory=config.workspace_root) next_step_prompt: str = NEXT_STEP_PROMPT max_observe: int = 10000 max_steps: int = 20 # MCP clients for remote tool access mcp_clients: MCPClients = Field(default_factory=MCPClients) # Add general-purpose tools to the tool collection available_tools: ToolCollection = Field( default_factory=lambda: ToolCollection( PythonExecute(), BrowserUseTool(), StrReplaceEditor(), AskHuman(), Terminate(), ) ) special_tool_names: list[str] = Field(default_factory=lambda: [Terminate().name]) browser_context_helper: Optional[BrowserContextHelper] = None # Track connected MCP servers connected_servers: Dict[str, str] = Field( default_factory=dict ) # server_id -> url/command _initialized: bool = False @model_validator(mode="after") def initialize_helper(self) -> "Manus": """Initialize basic components synchronously.""" self.browser_context_helper = BrowserContextHelper(self) return self @classmethod async def create(cls, **kwargs) -> "Manus": """Factory method to create and properly initialize a Manus instance.""" instance = cls(**kwargs) await instance.initialize_mcp_servers() instance._initialized = True return instance async def initialize_mcp_servers(self) -> None: """Initialize connections to configured MCP servers.""" for server_id, server_config in config.mcp_config.servers.items(): try: if server_config.type == "sse": if server_config.url: await self.connect_mcp_server(server_config.url, server_id) logger.info( f"Connected to MCP server {server_id} at {server_config.url}" ) elif server_config.type == "stdio": if server_config.command: await self.connect_mcp_server( server_config.command, server_id, use_stdio=True, stdio_args=server_config.args, ) logger.info( f"Connected to MCP server {server_id} using command {server_config.command}" ) except Exception as e: logger.error(f"Failed to connect to MCP server {server_id}: {e}") async def connect_mcp_server( self, server_url: str, server_id: str = "", use_stdio: bool = False, stdio_args: List[str] = None, ) -> None: """Connect to an MCP server and add its tools.""" if use_stdio: await self.mcp_clients.connect_stdio( server_url, stdio_args or [], server_id ) self.connected_servers[server_id or server_url] = server_url else: await self.mcp_clients.connect_sse(server_url, server_id) self.connected_servers[server_id or server_url] = server_url # Update available tools with only the new tools from this server new_tools = [ tool for tool in self.mcp_clients.tools if tool.server_id == server_id ] self.available_tools.add_tools(*new_tools) async def disconnect_mcp_server(self, server_id: str = "") -> None: """Disconnect from an MCP server and remove its tools.""" await self.mcp_clients.disconnect(server_id) if server_id: self.connected_servers.pop(server_id, None) else: self.connected_servers.clear() # Rebuild available tools without the disconnected server's tools base_tools = [ tool for tool in self.available_tools.tools if not isinstance(tool, MCPClientTool) ] self.available_tools = ToolCollection(*base_tools) self.available_tools.add_tools(*self.mcp_clients.tools) async def cleanup(self): """Clean up Manus agent resources.""" if self.browser_context_helper: await self.browser_context_helper.cleanup_browser() # Disconnect from all MCP servers only if we were initialized if self._initialized: await self.disconnect_mcp_server() self._initialized = False async def think(self) -> bool: """Process current state and decide next actions with appropriate context.""" if not self._initialized: await self.initialize_mcp_servers() self._initialized = True original_prompt = self.next_step_prompt recent_messages = self.memory.messages[-3:] if self.memory.messages else [] browser_in_use = any( tc.function.name == BrowserUseTool().name for msg in recent_messages if msg.tool_calls for tc in msg.tool_calls ) if browser_in_use: self.next_step_prompt = ( await self.browser_context_helper.format_next_step_prompt() ) result = await super().think() # Restore original prompt self.next_step_prompt = original_prompt return result