|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import abc |
|
|
import threading |
|
|
from enum import Enum |
|
|
from functools import wraps |
|
|
from typing import Any, Callable, Generic, Optional, TypeVar |
|
|
|
|
|
from pydantic import BaseModel, ConfigDict |
|
|
|
|
|
from camel.agents import ChatAgent |
|
|
from camel.messages import BaseMessage |
|
|
|
|
|
T = TypeVar('T') |
|
|
|
|
|
|
|
|
class ProgrammableAgentRequirement(Enum): |
|
|
r"""Requirements for programmable agent state. |
|
|
|
|
|
Defines the possible requirements that can be used to repair the state |
|
|
of a programmable agent. |
|
|
|
|
|
Attributes: |
|
|
LAST_MESSAGE_NOT_USER (str): Requires that the last message in the |
|
|
conversation was not from the user. |
|
|
""" |
|
|
|
|
|
LAST_MESSAGE_NOT_USER = "LAST_MESSAGE_NOT_USER" |
|
|
|
|
|
|
|
|
class ProgrammedAgentInstructionResult(BaseModel, Generic[T]): |
|
|
r"""Result of a programmable agent instruction execution. |
|
|
|
|
|
Contains the messages exchanged during execution and the computed value. |
|
|
The value type is specified by the generic type parameter T. |
|
|
|
|
|
Attributes: |
|
|
user_message (BaseMessage): The message sent by the user. |
|
|
agent_message (BaseMessage): The message sent by the agent. |
|
|
value (T): The computed result value of type T. |
|
|
""" |
|
|
|
|
|
user_message: BaseMessage |
|
|
agent_message: BaseMessage |
|
|
value: T |
|
|
|
|
|
model_config = ConfigDict(arbitrary_types_allowed=True) |
|
|
|
|
|
|
|
|
class AbstractProgrammableAgent(abc.ABC): |
|
|
r"""Abstract class for a programmable agent. |
|
|
|
|
|
A programmable agent is an agent that can be programmed to perform a |
|
|
specific function or task. This class defines the interface for a |
|
|
programmable agent. |
|
|
|
|
|
These methods should be implemented in order to ensure the agent supports |
|
|
the necessary guarantees to enable a programming interface while |
|
|
maintaining compatibility in a multi-agent system. |
|
|
|
|
|
A programmable agent is responsible for providing and maintaining a |
|
|
programming interface for its functionality. |
|
|
""" |
|
|
|
|
|
@abc.abstractmethod |
|
|
def run_atomic( |
|
|
self, callback: Callable[[], ProgrammedAgentInstructionResult[T]] |
|
|
) -> ProgrammedAgentInstructionResult[T]: |
|
|
r"""Run an atomic operation on the agent. |
|
|
|
|
|
An atomic operation is an operation that is guaranteed to |
|
|
be executed without interruption by any other operation. |
|
|
|
|
|
Args: |
|
|
callback (Callable[[], ProgrammedAgentInstructionResult[T]]): The |
|
|
operation to execute atomically. |
|
|
|
|
|
Returns: |
|
|
ProgrammedAgentInstructionResult[T]: The result of the operation. |
|
|
|
|
|
Raises: |
|
|
RuntimeError: If an operation is already in progress. |
|
|
""" |
|
|
raise NotImplementedError |
|
|
|
|
|
@abc.abstractmethod |
|
|
def repair_state(self, requirement: ProgrammableAgentRequirement) -> None: |
|
|
r"""Repair the state of the agent. |
|
|
|
|
|
Agents may have other non-atomic interfaces, such as a user interface, |
|
|
or chat between other agents. This method should restore the agent to |
|
|
a state where it can perform operations according to the specified |
|
|
requirement. |
|
|
|
|
|
Args: |
|
|
requirement (ProgrammableAgentRequirement): The requirement to |
|
|
repair the state for. |
|
|
""" |
|
|
raise NotImplementedError |
|
|
|
|
|
|
|
|
def programmable_capability( |
|
|
func: Callable[..., ProgrammedAgentInstructionResult[T]], |
|
|
) -> Callable[..., ProgrammedAgentInstructionResult[T]]: |
|
|
r"""Decorator for programmable agent capabilities. |
|
|
|
|
|
This decorator ensures that the decorated method is executed atomically |
|
|
and maintains the agent's state guarantees. |
|
|
|
|
|
Args: |
|
|
func (Callable[..., ProgrammedAgentInstructionResult[T]]): The method |
|
|
to decorate. |
|
|
|
|
|
Returns: |
|
|
Callable[..., ProgrammedAgentInstructionResult[T]]: The decorated |
|
|
method that ensures atomic execution. |
|
|
""" |
|
|
|
|
|
@wraps(func) |
|
|
def wrapper( |
|
|
self, *args: Any, **kwargs: Any |
|
|
) -> ProgrammedAgentInstructionResult[T]: |
|
|
return self.run_atomic(lambda: func(self, *args, **kwargs)) |
|
|
|
|
|
return wrapper |
|
|
|
|
|
|
|
|
class ProgrammableChatAgent(ChatAgent, AbstractProgrammableAgent): |
|
|
r"""A chat agent that can be programmed to perform specific tasks. |
|
|
|
|
|
Provides a default implementation of atomic execution using threading locks |
|
|
and basic state tracking for message roles. Implementing classes need to |
|
|
provide specific repair logic for their use cases. |
|
|
|
|
|
Attributes: |
|
|
_operation_lock (threading.Lock): Lock for ensuring atomic operations. |
|
|
_last_message_role (Optional[str]): Role of the last message in the |
|
|
conversation. |
|
|
""" |
|
|
|
|
|
def __init__(self, **kwargs: Any) -> None: |
|
|
r"""Initialize the ProgrammableChatAgent. |
|
|
|
|
|
Args: |
|
|
**kwargs (Any): Additional keyword arguments to pass to parent |
|
|
class. |
|
|
""" |
|
|
super().__init__(**kwargs) |
|
|
self._operation_lock = threading.Lock() |
|
|
self._last_message_role: Optional[str] = None |
|
|
|
|
|
def run_atomic( |
|
|
self, callback: Callable[[], ProgrammedAgentInstructionResult[T]] |
|
|
) -> ProgrammedAgentInstructionResult[T]: |
|
|
r"""Run an atomic operation on the agent. |
|
|
|
|
|
Ensures thread-safe execution of the callback function by using a lock. |
|
|
|
|
|
Args: |
|
|
callback (Callable[[], ProgrammedAgentInstructionResult[T]]): The |
|
|
operation to execute atomically. |
|
|
|
|
|
Returns: |
|
|
ProgrammedAgentInstructionResult[T]: The result of the operation. |
|
|
|
|
|
Raises: |
|
|
RuntimeError: If an operation is already in progress. |
|
|
""" |
|
|
if not self._operation_lock.acquire(blocking=False): |
|
|
raise RuntimeError("Operation already in progress") |
|
|
|
|
|
try: |
|
|
result = callback() |
|
|
self._last_message_role = result.agent_message.role_name |
|
|
return result |
|
|
finally: |
|
|
self._operation_lock.release() |
|
|
|
|
|
def repair_state(self, requirement: ProgrammableAgentRequirement) -> None: |
|
|
r"""Repair the state of the agent. |
|
|
|
|
|
Implements basic state repair for message role requirements. |
|
|
|
|
|
Args: |
|
|
requirement (ProgrammableAgentRequirement): The requirement to |
|
|
repair the state for. |
|
|
""" |
|
|
if requirement == ProgrammableAgentRequirement.LAST_MESSAGE_NOT_USER: |
|
|
if self._last_message_role == "user": |
|
|
raise NotImplementedError( |
|
|
"Must implement repair for LAST_MESSAGE_NOT_USER" |
|
|
) |
|
|
|