from abc import ABC, abstractmethod
from typing import Any, AsyncGenerator, List, Mapping, Sequence
from autogen_core import CancellationToken, ComponentBase
from pydantic import BaseModel
from ..base import ChatAgent, Response, TaskResult
from ..messages import (
BaseAgentEvent,
BaseChatMessage,
ModelClientStreamingChunkEvent,
TextMessage,
)
from ..state import BaseState
[文档]
class BaseChatAgent(ChatAgent, ABC, ComponentBase[BaseModel]):
"""聊天代理的基类。
这个抽象类为 :class:`ChatAgent` 提供了基础实现。
要创建新的聊天代理,请继承此类并实现
:meth:`on_messages`、:meth:`on_reset` 和 :attr:`produced_message_types`。
如果需要流式处理,还需实现 :meth:`on_messages_stream` 方法。
代理被视为有状态的,在调用 :meth:`on_messages` 或 :meth:`on_messages_stream` 方法之间会保持其状态。
代理应将其状态存储在代理实例中。代理还应实现 :meth:`on_reset` 方法
以将代理重置为初始化状态。
.. note::
调用者在每次调用 :meth:`on_messages` 或 :meth:`on_messages_stream` 方法时,
应仅向代理传递新消息。
不要在每次调用时将整个对话历史传递给代理。
创建新代理时必须遵循此设计原则。
"""
component_type = "agent"
def __init__(self, name: str, description: str) -> None:
self._name = name
if self._name.isidentifier() is False:
raise ValueError("The agent name must be a valid Python identifier.")
self._description = description
@property
def name(self) -> str:
"""代理的名称。团队使用此名称唯一标识
代理。在团队内应保持唯一。"""
return self._name
@property
def description(self) -> str:
"""代理的描述。团队使用此描述
来决定使用哪些代理。描述应
说明代理的能力以及如何与之交互。"""
return self._description
@property
@abstractmethod
def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:
"""代理在 :attr:`Response.chat_message` 字段中产生的消息类型。
这些类型必须是 :class:`BaseChatMessage` 类型。"""
...
[文档]
@abstractmethod
async def on_messages(self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken) -> Response:
"""处理传入的消息并返回响应。
.. note::
代理是有状态的,传递给此方法的消息应该是自上次调用此方法以来的新消息。
代理应在两次调用之间保持其状态。例如,如果代理需要记住先前的消息以响应当前消息,
它应该将先前的消息存储在代理状态中。
"""
...
[文档]
async def on_messages_stream(
self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken
) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | Response, None]:
"""处理传入的消息并返回消息流,最后一项是响应。
:class:`BaseChatAgent` 中的基础实现只是调用 :meth:`on_messages` 并生成响应中的消息。
.. note::
代理是有状态的,传递给此方法的消息应该是自上次调用此方法以来的新消息。
代理应在两次调用之间保持其状态。例如,如果代理需要记住先前的消息以响应当前消息,
它应该将先前的消息存储在代理状态中。
"""
response = await self.on_messages(messages, cancellation_token)
for inner_message in response.inner_messages or []:
yield inner_message
yield response
[文档]
async def run(
self,
*,
task: str | BaseChatMessage | Sequence[BaseChatMessage] | None = None,
cancellation_token: CancellationToken | None = None,
) -> TaskResult:
"""使用给定任务运行代理并返回结果。"""
if cancellation_token is None:
cancellation_token = CancellationToken()
input_messages: List[BaseChatMessage] = []
output_messages: List[BaseAgentEvent | BaseChatMessage] = []
if task is None:
pass
elif isinstance(task, str):
text_msg = TextMessage(content=task, source="user")
input_messages.append(text_msg)
output_messages.append(text_msg)
elif isinstance(task, BaseChatMessage):
input_messages.append(task)
output_messages.append(task)
else:
if not task:
raise ValueError("Task list cannot be empty.")
# Task is a sequence of messages.
for msg in task:
if isinstance(msg, BaseChatMessage):
input_messages.append(msg)
output_messages.append(msg)
else:
raise ValueError(f"Invalid message type in sequence: {type(msg)}")
response = await self.on_messages(input_messages, cancellation_token)
if response.inner_messages is not None:
output_messages += response.inner_messages
output_messages.append(response.chat_message)
return TaskResult(messages=output_messages)
[文档]
async def run_stream(
self,
*,
task: str | BaseChatMessage | Sequence[BaseChatMessage] | None = None,
cancellation_token: CancellationToken | None = None,
) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | TaskResult, None]:
"""使用给定任务运行代理并返回消息流
以及作为流中最后一项的最终任务结果。"""
if cancellation_token is None:
cancellation_token = CancellationToken()
input_messages: List[BaseChatMessage] = []
output_messages: List[BaseAgentEvent | BaseChatMessage] = []
if task is None:
pass
elif isinstance(task, str):
text_msg = TextMessage(content=task, source="user")
input_messages.append(text_msg)
output_messages.append(text_msg)
yield text_msg
elif isinstance(task, BaseChatMessage):
input_messages.append(task)
output_messages.append(task)
yield task
else:
if not task:
raise ValueError("Task list cannot be empty.")
for msg in task:
if isinstance(msg, BaseChatMessage):
input_messages.append(msg)
output_messages.append(msg)
yield msg
else:
raise ValueError(f"Invalid message type in sequence: {type(msg)}")
async for message in self.on_messages_stream(input_messages, cancellation_token):
if isinstance(message, Response):
yield message.chat_message
output_messages.append(message.chat_message)
yield TaskResult(messages=output_messages)
else:
yield message
if isinstance(message, ModelClientStreamingChunkEvent):
# Skip the model client streaming chunk events.
continue
output_messages.append(message)
[文档]
@abstractmethod
async def on_reset(self, cancellation_token: CancellationToken) -> None:
"""将代理重置到其初始化状态。"""
...
[文档]
async def on_pause(self, cancellation_token: CancellationToken) -> None:
"""当代理在其 :meth:`on_messages` 或 :meth:`on_messages_stream` 方法运行期间被暂停时调用。
在 :class:`BaseChatAgent` 类中默认是一个空操作。子类可以重写此方法以
实现自定义的暂停行为。"""
pass
[文档]
async def on_resume(self, cancellation_token: CancellationToken) -> None:
"""当代理在其 :meth:`on_messages` 或 :meth:`on_messages_stream` 方法运行期间
从暂停状态恢复时调用。在 :class:`BaseChatAgent` 类中默认是一个空操作。
子类可以重写此方法以实现自定义的恢复行为。"""
pass
[文档]
async def save_state(self) -> Mapping[str, Any]:
"""导出状态。无状态代理的默认实现。"""
return BaseState().model_dump()
[文档]
async def load_state(self, state: Mapping[str, Any]) -> None:
"""从保存的状态恢复代理。无状态代理的默认实现。"""
BaseState.model_validate(state)
[文档]
async def close(self) -> None:
"""释放代理持有的所有资源。在 :class:`BaseChatAgent` 类中默认不执行任何操作。
子类可以重写此方法来实现自定义的关闭行为。"""
pass