autogen_agentchat.agents._base_chat_agent 源代码

from abc import ABC, abstractmethod
from typing import Any, AsyncGenerator, List, Mapping, Sequence

from autogen_core import CancellationToken, ComponentBase
from pydantic import BaseModel

from ..base import ChatAgent, Response, TaskResult
from ..messages import (
    BaseAgentEvent,
    BaseChatMessage,
    ModelClientStreamingChunkEvent,
    TextMessage,
)
from ..state import BaseState


[文档] class BaseChatAgent(ChatAgent, ABC, ComponentBase[BaseModel]): """聊天代理的基类。 这个抽象类为 :class:`ChatAgent` 提供了基础实现。 要创建新的聊天代理,请继承此类并实现 :meth:`on_messages`、:meth:`on_reset` 和 :attr:`produced_message_types`。 如果需要流式处理,还需实现 :meth:`on_messages_stream` 方法。 代理被视为有状态的,在调用 :meth:`on_messages` 或 :meth:`on_messages_stream` 方法之间会保持其状态。 代理应将其状态存储在代理实例中。代理还应实现 :meth:`on_reset` 方法 以将代理重置为初始化状态。 .. note:: 调用者在每次调用 :meth:`on_messages` 或 :meth:`on_messages_stream` 方法时, 应仅向代理传递新消息。 不要在每次调用时将整个对话历史传递给代理。 创建新代理时必须遵循此设计原则。 """ component_type = "agent" def __init__(self, name: str, description: str) -> None: self._name = name if self._name.isidentifier() is False: raise ValueError("The agent name must be a valid Python identifier.") self._description = description @property def name(self) -> str: """代理的名称。团队使用此名称唯一标识 代理。在团队内应保持唯一。""" return self._name @property def description(self) -> str: """代理的描述。团队使用此描述 来决定使用哪些代理。描述应 说明代理的能力以及如何与之交互。""" return self._description @property @abstractmethod def produced_message_types(self) -> Sequence[type[BaseChatMessage]]: """代理在 :attr:`Response.chat_message` 字段中产生的消息类型。 这些类型必须是 :class:`BaseChatMessage` 类型。""" ...
[文档] @abstractmethod async def on_messages(self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken) -> Response: """处理传入的消息并返回响应。 .. note:: 代理是有状态的,传递给此方法的消息应该是自上次调用此方法以来的新消息。 代理应在两次调用之间保持其状态。例如,如果代理需要记住先前的消息以响应当前消息, 它应该将先前的消息存储在代理状态中。 """ ...
[文档] async def on_messages_stream( self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken ) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | Response, None]: """处理传入的消息并返回消息流,最后一项是响应。 :class:`BaseChatAgent` 中的基础实现只是调用 :meth:`on_messages` 并生成响应中的消息。 .. note:: 代理是有状态的,传递给此方法的消息应该是自上次调用此方法以来的新消息。 代理应在两次调用之间保持其状态。例如,如果代理需要记住先前的消息以响应当前消息, 它应该将先前的消息存储在代理状态中。 """ response = await self.on_messages(messages, cancellation_token) for inner_message in response.inner_messages or []: yield inner_message yield response
[文档] async def run( self, *, task: str | BaseChatMessage | Sequence[BaseChatMessage] | None = None, cancellation_token: CancellationToken | None = None, ) -> TaskResult: """使用给定任务运行代理并返回结果。""" if cancellation_token is None: cancellation_token = CancellationToken() input_messages: List[BaseChatMessage] = [] output_messages: List[BaseAgentEvent | BaseChatMessage] = [] if task is None: pass elif isinstance(task, str): text_msg = TextMessage(content=task, source="user") input_messages.append(text_msg) output_messages.append(text_msg) elif isinstance(task, BaseChatMessage): input_messages.append(task) output_messages.append(task) else: if not task: raise ValueError("Task list cannot be empty.") # Task is a sequence of messages. for msg in task: if isinstance(msg, BaseChatMessage): input_messages.append(msg) output_messages.append(msg) else: raise ValueError(f"Invalid message type in sequence: {type(msg)}") response = await self.on_messages(input_messages, cancellation_token) if response.inner_messages is not None: output_messages += response.inner_messages output_messages.append(response.chat_message) return TaskResult(messages=output_messages)
[文档] async def run_stream( self, *, task: str | BaseChatMessage | Sequence[BaseChatMessage] | None = None, cancellation_token: CancellationToken | None = None, ) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | TaskResult, None]: """使用给定任务运行代理并返回消息流 以及作为流中最后一项的最终任务结果。""" if cancellation_token is None: cancellation_token = CancellationToken() input_messages: List[BaseChatMessage] = [] output_messages: List[BaseAgentEvent | BaseChatMessage] = [] if task is None: pass elif isinstance(task, str): text_msg = TextMessage(content=task, source="user") input_messages.append(text_msg) output_messages.append(text_msg) yield text_msg elif isinstance(task, BaseChatMessage): input_messages.append(task) output_messages.append(task) yield task else: if not task: raise ValueError("Task list cannot be empty.") for msg in task: if isinstance(msg, BaseChatMessage): input_messages.append(msg) output_messages.append(msg) yield msg else: raise ValueError(f"Invalid message type in sequence: {type(msg)}") async for message in self.on_messages_stream(input_messages, cancellation_token): if isinstance(message, Response): yield message.chat_message output_messages.append(message.chat_message) yield TaskResult(messages=output_messages) else: yield message if isinstance(message, ModelClientStreamingChunkEvent): # Skip the model client streaming chunk events. continue output_messages.append(message)
[文档] @abstractmethod async def on_reset(self, cancellation_token: CancellationToken) -> None: """将代理重置到其初始化状态。""" ...
[文档] async def on_pause(self, cancellation_token: CancellationToken) -> None: """当代理在其 :meth:`on_messages` 或 :meth:`on_messages_stream` 方法运行期间被暂停时调用。 在 :class:`BaseChatAgent` 类中默认是一个空操作。子类可以重写此方法以 实现自定义的暂停行为。""" pass
[文档] async def on_resume(self, cancellation_token: CancellationToken) -> None: """当代理在其 :meth:`on_messages` 或 :meth:`on_messages_stream` 方法运行期间 从暂停状态恢复时调用。在 :class:`BaseChatAgent` 类中默认是一个空操作。 子类可以重写此方法以实现自定义的恢复行为。""" pass
[文档] async def save_state(self) -> Mapping[str, Any]: """导出状态。无状态代理的默认实现。""" return BaseState().model_dump()
[文档] async def load_state(self, state: Mapping[str, Any]) -> None: """从保存的状态恢复代理。无状态代理的默认实现。""" BaseState.model_validate(state)
[文档] async def close(self) -> None: """释放代理持有的所有资源。在 :class:`BaseChatAgent` 类中默认不执行任何操作。 子类可以重写此方法来实现自定义的关闭行为。""" pass