File size: 7,124 Bytes
fcaa164
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
import abc
import threading
from enum import Enum
from functools import wraps
from typing import Any, Callable, Generic, Optional, TypeVar

from pydantic import BaseModel, ConfigDict

from camel.agents import ChatAgent
from camel.messages import BaseMessage

T = TypeVar('T')


class ProgrammableAgentRequirement(Enum):
    r"""Requirements for programmable agent state.

    Defines the possible requirements that can be used to repair the state
    of a programmable agent.

    Attributes:
        LAST_MESSAGE_NOT_USER (str): Requires that the last message in the
            conversation was not from the user.
    """

    LAST_MESSAGE_NOT_USER = "LAST_MESSAGE_NOT_USER"


class ProgrammedAgentInstructionResult(BaseModel, Generic[T]):
    r"""Result of a programmable agent instruction execution.

    Contains the messages exchanged during execution and the computed value.
    The value type is specified by the generic type parameter T.

    Attributes:
        user_message (BaseMessage): The message sent by the user.
        agent_message (BaseMessage): The message sent by the agent.
        value (T): The computed result value of type T.
    """

    user_message: BaseMessage
    agent_message: BaseMessage
    value: T

    model_config = ConfigDict(arbitrary_types_allowed=True)


class AbstractProgrammableAgent(abc.ABC):
    r"""Abstract class for a programmable agent.

    A programmable agent is an agent that can be programmed to perform a
    specific function or task. This class defines the interface for a
    programmable agent.

    These methods should be implemented in order to ensure the agent supports
    the necessary guarantees to enable a programming interface while
    maintaining compatibility in a multi-agent system.

    A programmable agent is responsible for providing and maintaining a
    programming interface for its functionality.
    """

    @abc.abstractmethod
    def run_atomic(
        self, callback: Callable[[], ProgrammedAgentInstructionResult[T]]
    ) -> ProgrammedAgentInstructionResult[T]:
        r"""Run an atomic operation on the agent.

        An atomic operation is an operation that is guaranteed to
        be executed without interruption by any other operation.

        Args:
            callback (Callable[[], ProgrammedAgentInstructionResult[T]]): The
                operation to execute atomically.

        Returns:
            ProgrammedAgentInstructionResult[T]: The result of the operation.

        Raises:
            RuntimeError: If an operation is already in progress.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def repair_state(self, requirement: ProgrammableAgentRequirement) -> None:
        r"""Repair the state of the agent.

        Agents may have other non-atomic interfaces, such as a user interface,
        or chat between other agents. This method should restore the agent to
        a state where it can perform operations according to the specified
        requirement.

        Args:
            requirement (ProgrammableAgentRequirement): The requirement to
                repair the state for.
        """
        raise NotImplementedError


def programmable_capability(
    func: Callable[..., ProgrammedAgentInstructionResult[T]],
) -> Callable[..., ProgrammedAgentInstructionResult[T]]:
    r"""Decorator for programmable agent capabilities.

    This decorator ensures that the decorated method is executed atomically
    and maintains the agent's state guarantees.

    Args:
        func (Callable[..., ProgrammedAgentInstructionResult[T]]): The method
            to decorate.

    Returns:
        Callable[..., ProgrammedAgentInstructionResult[T]]: The decorated
            method that ensures atomic execution.
    """

    @wraps(func)
    def wrapper(
        self, *args: Any, **kwargs: Any
    ) -> ProgrammedAgentInstructionResult[T]:
        return self.run_atomic(lambda: func(self, *args, **kwargs))

    return wrapper


class ProgrammableChatAgent(ChatAgent, AbstractProgrammableAgent):
    r"""A chat agent that can be programmed to perform specific tasks.

    Provides a default implementation of atomic execution using threading locks
    and basic state tracking for message roles. Implementing classes need to
    provide specific repair logic for their use cases.

    Attributes:
        _operation_lock (threading.Lock): Lock for ensuring atomic operations.
        _last_message_role (Optional[str]): Role of the last message in the
            conversation.
    """

    def __init__(self, **kwargs: Any) -> None:
        r"""Initialize the ProgrammableChatAgent.

        Args:
            **kwargs (Any): Additional keyword arguments to pass to parent
                class.
        """
        super().__init__(**kwargs)
        self._operation_lock = threading.Lock()
        self._last_message_role: Optional[str] = None

    def run_atomic(
        self, callback: Callable[[], ProgrammedAgentInstructionResult[T]]
    ) -> ProgrammedAgentInstructionResult[T]:
        r"""Run an atomic operation on the agent.

        Ensures thread-safe execution of the callback function by using a lock.

        Args:
            callback (Callable[[], ProgrammedAgentInstructionResult[T]]): The
                operation to execute atomically.

        Returns:
            ProgrammedAgentInstructionResult[T]: The result of the operation.

        Raises:
            RuntimeError: If an operation is already in progress.
        """
        if not self._operation_lock.acquire(blocking=False):
            raise RuntimeError("Operation already in progress")

        try:
            result = callback()
            self._last_message_role = result.agent_message.role_name
            return result
        finally:
            self._operation_lock.release()

    def repair_state(self, requirement: ProgrammableAgentRequirement) -> None:
        r"""Repair the state of the agent.

        Implements basic state repair for message role requirements.

        Args:
            requirement (ProgrammableAgentRequirement): The requirement to
                repair the state for.
        """
        if requirement == ProgrammableAgentRequirement.LAST_MESSAGE_NOT_USER:
            if self._last_message_role == "user":
                raise NotImplementedError(
                    "Must implement repair for LAST_MESSAGE_NOT_USER"
                )