|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
import json |
|
|
from typing import Any, List |
|
|
|
|
|
from colorama import Fore |
|
|
|
|
|
from camel.agents import ChatAgent |
|
|
from camel.messages.base import BaseMessage |
|
|
from camel.societies.workforce.prompts import PROCESS_TASK_PROMPT |
|
|
from camel.societies.workforce.utils import TaskResult |
|
|
from camel.societies.workforce.worker import Worker |
|
|
from camel.tasks.task import Task, TaskState |
|
|
from camel.utils import print_text_animated |
|
|
|
|
|
|
|
|
class SingleAgentWorker(Worker):
    r"""A worker node that consists of a single agent.

    Every task assigned to this node is turned into a prompt and handed to
    the wrapped agent; the agent's structured reply decides whether the
    task is reported as done or failed.

    Args:
        description (str): Description of the node.
        worker (ChatAgent): Worker of the node. A single agent.
    """

    def __init__(
        self,
        description: str,
        worker: ChatAgent,
    ) -> None:
        super().__init__(description)
        self.worker = worker

    def reset(self) -> Any:
        r"""Resets the worker to its initial state.

        Resets the node itself through the parent class and then resets the
        wrapped agent.
        """
        super().reset()
        self.worker.reset()

    async def _process_task(
        self, task: Task, dependencies: List[Task]
    ) -> TaskState:
        r"""Processes a task with its dependencies.

        This method asynchronously processes a given task, considering its
        dependencies, by sending a generated prompt to a worker. It updates
        the task's result based on the agent's response.

        The agent is asked to answer in the :obj:`TaskResult` format. Any
        error raised while querying the agent, or while decoding its reply
        into a :obj:`TaskResult`, is printed and reported as a failure
        instead of being propagated to the caller.

        Args:
            task (Task): The task to process, which includes necessary details
                like content and type.
            dependencies (List[Task]): Tasks that the given task depends on.

        Returns:
            TaskState: `TaskState.DONE` if processed successfully, otherwise
                `TaskState.FAILED`. `task.result` is only assigned when the
                task is done.
        """
        dependency_tasks_info = self._get_dep_tasks_info(dependencies)
        prompt = PROCESS_TASK_PROMPT.format(
            content=task.content,
            dependency_tasks_info=dependency_tasks_info,
            additional_info=task.additional_info,
        )
        req = BaseMessage.make_user_message(
            role_name="User",
            content=prompt,
        )
        try:
            response = self.worker.step(req, response_format=TaskResult)
        except Exception as e:
            print(
                f"{Fore.RED}Error occurred while processing task {task.id}:"
                f"\n{e}{Fore.RESET}"
            )
            return TaskState.FAILED

        print(f"======\n{Fore.GREEN}Reply from {self}:{Fore.RESET}")

        # The reply comes from a language model, so it may be empty, not
        # valid JSON, or not match the TaskResult fields. Treat any such
        # decoding problem as a failed task rather than letting it escape
        # the coroutine, in line with the documented return contract.
        try:
            result_dict = json.loads(response.msg.content)
            task_result = TaskResult(**result_dict)
        except Exception as e:
            print(
                f"{Fore.RED}Error occurred while parsing the reply for task "
                f"{task.id}:\n{e}{Fore.RESET}"
            )
            return TaskState.FAILED

        # Red for a self-reported failure, green for success.
        color = Fore.RED if task_result.failed else Fore.GREEN
        print_text_animated(
            f"\n{color}{task_result.content}{Fore.RESET}\n======",
            delay=0.005,
        )

        if task_result.failed:
            return TaskState.FAILED

        task.result = task_result.content
        return TaskState.DONE
|
|
|