Spaces:
Paused
Paused
File size: 6,263 Bytes
2f28b62 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
from typing import Dict, List, Optional
from pydantic import Field, model_validator
from app.agent.browser import BrowserContextHelper
from app.agent.toolcall import ToolCallAgent
from app.config import config
from app.logger import logger
from app.prompt.manus import NEXT_STEP_PROMPT, SYSTEM_PROMPT
from app.tool import Terminate, ToolCollection
from app.tool.ask_human import AskHuman
from app.tool.browser_use_tool import BrowserUseTool
from app.tool.mcp import MCPClients, MCPClientTool
from app.tool.python_execute import PythonExecute
from app.tool.str_replace_editor import StrReplaceEditor
class Manus(ToolCallAgent):
"""A versatile general-purpose agent with support for both local and MCP tools."""
name: str = "Manus"
description: str = "A versatile agent that can solve various tasks using multiple tools including MCP-based tools"
system_prompt: str = SYSTEM_PROMPT.format(directory=config.workspace_root)
next_step_prompt: str = NEXT_STEP_PROMPT
max_observe: int = 10000
max_steps: int = 20
# MCP clients for remote tool access
mcp_clients: MCPClients = Field(default_factory=MCPClients)
# Add general-purpose tools to the tool collection
available_tools: ToolCollection = Field(
default_factory=lambda: ToolCollection(
PythonExecute(),
BrowserUseTool(),
StrReplaceEditor(),
AskHuman(),
Terminate(),
)
)
special_tool_names: list[str] = Field(default_factory=lambda: [Terminate().name])
browser_context_helper: Optional[BrowserContextHelper] = None
# Track connected MCP servers
connected_servers: Dict[str, str] = Field(
default_factory=dict
) # server_id -> url/command
_initialized: bool = False
@model_validator(mode="after")
def initialize_helper(self) -> "Manus":
"""Initialize basic components synchronously."""
self.browser_context_helper = BrowserContextHelper(self)
return self
@classmethod
async def create(cls, **kwargs) -> "Manus":
"""Factory method to create and properly initialize a Manus instance."""
instance = cls(**kwargs)
await instance.initialize_mcp_servers()
instance._initialized = True
return instance
async def initialize_mcp_servers(self) -> None:
"""Initialize connections to configured MCP servers."""
for server_id, server_config in config.mcp_config.servers.items():
try:
if server_config.type == "sse":
if server_config.url:
await self.connect_mcp_server(server_config.url, server_id)
logger.info(
f"Connected to MCP server {server_id} at {server_config.url}"
)
elif server_config.type == "stdio":
if server_config.command:
await self.connect_mcp_server(
server_config.command,
server_id,
use_stdio=True,
stdio_args=server_config.args,
)
logger.info(
f"Connected to MCP server {server_id} using command {server_config.command}"
)
except Exception as e:
logger.error(f"Failed to connect to MCP server {server_id}: {e}")
async def connect_mcp_server(
self,
server_url: str,
server_id: str = "",
use_stdio: bool = False,
stdio_args: List[str] = None,
) -> None:
"""Connect to an MCP server and add its tools."""
if use_stdio:
await self.mcp_clients.connect_stdio(
server_url, stdio_args or [], server_id
)
self.connected_servers[server_id or server_url] = server_url
else:
await self.mcp_clients.connect_sse(server_url, server_id)
self.connected_servers[server_id or server_url] = server_url
# Update available tools with only the new tools from this server
new_tools = [
tool for tool in self.mcp_clients.tools if tool.server_id == server_id
]
self.available_tools.add_tools(*new_tools)
async def disconnect_mcp_server(self, server_id: str = "") -> None:
"""Disconnect from an MCP server and remove its tools."""
await self.mcp_clients.disconnect(server_id)
if server_id:
self.connected_servers.pop(server_id, None)
else:
self.connected_servers.clear()
# Rebuild available tools without the disconnected server's tools
base_tools = [
tool
for tool in self.available_tools.tools
if not isinstance(tool, MCPClientTool)
]
self.available_tools = ToolCollection(*base_tools)
self.available_tools.add_tools(*self.mcp_clients.tools)
async def cleanup(self):
"""Clean up Manus agent resources."""
if self.browser_context_helper:
await self.browser_context_helper.cleanup_browser()
# Disconnect from all MCP servers only if we were initialized
if self._initialized:
await self.disconnect_mcp_server()
self._initialized = False
async def think(self) -> bool:
"""Process current state and decide next actions with appropriate context."""
if not self._initialized:
await self.initialize_mcp_servers()
self._initialized = True
original_prompt = self.next_step_prompt
recent_messages = self.memory.messages[-3:] if self.memory.messages else []
browser_in_use = any(
tc.function.name == BrowserUseTool().name
for msg in recent_messages
if msg.tool_calls
for tc in msg.tool_calls
)
if browser_in_use:
self.next_step_prompt = (
await self.browser_context_helper.format_next_step_prompt()
)
result = await super().think()
# Restore original prompt
self.next_step_prompt = original_prompt
return result
|