autogen_core._agent_runtime 源代码

from __future__ import annotations

from collections.abc import Sequence
from typing import Any, Awaitable, Callable, Mapping, Protocol, Type, TypeVar, overload, runtime_checkable

from ._agent import Agent
from ._agent_id import AgentId
from ._agent_metadata import AgentMetadata
from ._agent_type import AgentType
from ._cancellation_token import CancellationToken
from ._serialization import MessageSerializer
from ._subscription import Subscription
from ._topic import TopicId

# Undeliverable - error

T = TypeVar("T", bound=Agent)


[文档] @runtime_checkable class AgentRuntime(Protocol):
[文档] async def send_message( self, message: Any, recipient: AgentId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, message_id: str | None = None, ) -> Any: """向代理发送消息并获取响应。 Args: message (Any): 要发送的消息。 recipient (AgentId): 接收消息的代理。 sender (AgentId | None, optional): 发送消息的代理。**仅**当消息不是由任何代理发送时(例如直接从外部发送到运行时)才应为None。默认为None。 cancellation_token (CancellationToken | None, optional): 用于取消进行中的操作的令牌。默认为None。 Raises: CantHandleException: 如果接收方无法处理该消息。 UndeliverableException: 如果消息无法送达。 Other: 接收方引发的任何其他异常。 Returns: Any: 来自代理的响应。 """ ...
[文档] async def publish_message( self, message: Any, topic_id: TopicId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, message_id: str | None = None, ) -> None: """向给定命名空间中的所有代理发布消息,如果未提供命名空间,则使用发送者的命名空间。 发布消息不期望获得响应。 Args: message (Any): 要发布的消息。 topic_id (TopicId): 发布消息的主题。 sender (AgentId | None, optional): 发送消息的代理。默认为None。 cancellation_token (CancellationToken | None, optional): 用于取消进行中的操作的令牌。默认为None。 message_id (str | None, optional): 消息ID。如果为None,将生成新的消息ID。默认为None。此消息ID必须唯一,建议使用UUID。 Raises: UndeliverableException: 如果消息无法送达。 """ ...
[文档] async def register_factory( self, type: str | AgentType, agent_factory: Callable[[], T | Awaitable[T]], *, expected_class: type[T] | None = None, ) -> AgentType: """向运行时注册与特定类型关联的代理工厂。该类型必须唯一。此API不添加任何订阅。 .. note:: 这是一个底层API,通常应使用代理类的`register`方法代替,因为该方法还会自动处理订阅。 示例: .. code-block:: python from dataclasses import dataclass from autogen_core import AgentRuntime, MessageContext, RoutedAgent, event from autogen_core.models import UserMessage @dataclass class MyMessage: content: str class MyAgent(RoutedAgent): def __init__(self) -> None: super().__init__("My core agent") @event async def handler(self, message: UserMessage, context: MessageContext) -> None: print("Event received: ", message.content) async def my_agent_factory(): return MyAgent() async def main() -> None: runtime: AgentRuntime = ... # type: ignore await runtime.register_factory("my_agent", lambda: MyAgent()) import asyncio asyncio.run(main()) Args: type (str): 此工厂创建的代理类型。它与代理类名不同。`type`参数用于区分不同的工厂函数而非代理类。 agent_factory (Callable[[], T]): 创建代理的工厂,其中T是具体的代理类型。在工厂内部,使用`autogen_core.AgentInstantiationContext`访问当前运行时和代理ID等变量。 expected_class (type[T] | None, optional): 代理的预期类,用于运行时验证工厂。默认为None。如果为None,则不执行验证。 """ ...
[文档] async def register_agent_instance( self, agent_instance: Agent, agent_id: AgentId, ) -> AgentId: """向运行时注册一个代理实例。类型可以重复使用,但每个agent_id必须唯一。同一类型下的所有代理实例必须是相同的对象类型。此API不会添加任何订阅。 .. note:: 这是一个底层API,通常应该使用代理类的`register_instance`方法,因为该方法还会自动处理订阅。 示例: .. code-block:: python from dataclasses import dataclass from autogen_core import AgentId, AgentRuntime, MessageContext, RoutedAgent, event from autogen_core.models import UserMessage @dataclass class MyMessage: content: str class MyAgent(RoutedAgent): def __init__(self) -> None: super().__init__("My core agent") @event async def handler(self, message: UserMessage, context: MessageContext) -> None: print("Event received: ", message.content) async def main() -> None: runtime: AgentRuntime = ... # type: ignore agent = MyAgent() await runtime.register_agent_instance( agent_instance=agent, agent_id=AgentId(type="my_agent", key="default") ) import asyncio asyncio.run(main()) Args: agent_instance (Agent): 代理的具体实例。 agent_id (AgentId): 代理的标识符。代理的类型是`agent_id.type`。 """ ...
# TODO: uncomment out the following type ignore when this is fixed in mypy: https://github.com/python/mypy/issues/3737
[文档] async def try_get_underlying_agent_instance(self, id: AgentId, type: Type[T] = Agent) -> T: # type: ignore[assignment] """尝试通过名称和命名空间获取底层代理实例。通常不推荐这样做(因此名称较长),但在某些情况下可能有用。 如果底层代理不可访问,将引发异常。 Args: id (AgentId): 代理ID。 type (Type[T], optional): 代理的预期类型。默认为Agent。 Returns: T: 具体的代理实例。 Raises: LookupError: 如果找不到代理。 NotAccessibleError: 如果代理不可访问,例如位于远程。 TypeError: 如果代理不是预期类型。 """ ...
@overload async def get(self, id: AgentId, /, *, lazy: bool = ...) -> AgentId: ... @overload async def get(self, type: AgentType | str, /, key: str = ..., *, lazy: bool = ...) -> AgentId: ...
[文档] async def get(
self, id_or_type: AgentId | AgentType | str, /, key: str = "default", *, lazy: bool = True ) -> AgentId: ...
[文档] async def save_state(self) -> Mapping[str, Any]: """保存整个运行时的状态,包括所有托管代理。恢复状态的唯一方法是将其传递给:meth:`load_state`。 状态的结构由实现定义,可以是任何JSON可序列化对象。 Returns: Mapping[str, Any]: 保存的状态。 """ ...
[文档] async def load_state(self, state: Mapping[str, Any]) -> None: """加载整个运行时的状态,包括所有托管代理。该状态应与 :meth:`save_state` 方法返回的状态一致。 Args: state (Mapping[str, Any]): 已保存的状态。 """ ...
[文档] async def agent_metadata(self, agent: AgentId) -> AgentMetadata: """获取代理的元数据。 Args: agent (AgentId): 代理ID。 Returns: AgentMetadata: 代理元数据。 """ ...
[文档] async def agent_save_state(self, agent: AgentId) -> Mapping[str, Any]: """保存单个代理的状态。 状态的结构由实现定义,可以是任何可JSON序列化的对象。 Args: agent (AgentId): 代理ID。 Returns: Mapping[str, Any]: 已保存的状态。 """ ...
[文档] async def agent_load_state(self, agent: AgentId, state: Mapping[str, Any]) -> None: """加载单个代理的状态。 Args: agent (AgentId): 代理ID。 state (Mapping[str, Any]): 保存的状态。 """ ...
[文档] async def add_subscription(self, subscription: Subscription) -> None: """添加运行时在处理发布消息时应满足的新订阅 Args: subscription (Subscription): 要添加的订阅 """ ...
[文档] async def remove_subscription(self, id: str) -> None: """从运行时移除订阅 Args: id (str): 要移除的订阅ID Raises: LookupError: 如果订阅不存在 """ ...
[文档] def add_message_serializer(self, serializer: MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) -> None: """向运行时添加新的消息序列化序列化器 注意:这将根据 type_name 和 data_content_type 属性对序列化器进行去重 Args: serializer (MessageSerializer[Any] | Sequence[MessageSerializer[Any]]): 要添加的序列化器/序列化器列表 """ ...