from __future__ import annotations
from collections.abc import Sequence
from typing import Any, Awaitable, Callable, Mapping, Protocol, Type, TypeVar, overload, runtime_checkable
from ._agent import Agent
from ._agent_id import AgentId
from ._agent_metadata import AgentMetadata
from ._agent_type import AgentType
from ._cancellation_token import CancellationToken
from ._serialization import MessageSerializer
from ._subscription import Subscription
from ._topic import TopicId
# Undeliverable - error
T = TypeVar("T", bound=Agent)
[文档]
@runtime_checkable
class AgentRuntime(Protocol):
[文档]
async def send_message(
self,
message: Any,
recipient: AgentId,
*,
sender: AgentId | None = None,
cancellation_token: CancellationToken | None = None,
message_id: str | None = None,
) -> Any:
"""向代理发送消息并获取响应。
Args:
message (Any): 要发送的消息。
recipient (AgentId): 接收消息的代理。
sender (AgentId | None, optional): 发送消息的代理。**仅**当消息不是由任何代理发送时(例如直接从外部发送到运行时)才应为None。默认为None。
cancellation_token (CancellationToken | None, optional): 用于取消进行中的操作的令牌。默认为None。
Raises:
CantHandleException: 如果接收方无法处理该消息。
UndeliverableException: 如果消息无法送达。
Other: 接收方引发的任何其他异常。
Returns:
Any: 来自代理的响应。
"""
...
[文档]
async def publish_message(
self,
message: Any,
topic_id: TopicId,
*,
sender: AgentId | None = None,
cancellation_token: CancellationToken | None = None,
message_id: str | None = None,
) -> None:
"""向给定命名空间中的所有代理发布消息,如果未提供命名空间,则使用发送者的命名空间。
发布消息不期望获得响应。
Args:
message (Any): 要发布的消息。
topic_id (TopicId): 发布消息的主题。
sender (AgentId | None, optional): 发送消息的代理。默认为None。
cancellation_token (CancellationToken | None, optional): 用于取消进行中的操作的令牌。默认为None。
message_id (str | None, optional): 消息ID。如果为None,将生成新的消息ID。默认为None。此消息ID必须唯一,建议使用UUID。
Raises:
UndeliverableException: 如果消息无法送达。
"""
...
[文档]
async def register_factory(
self,
type: str | AgentType,
agent_factory: Callable[[], T | Awaitable[T]],
*,
expected_class: type[T] | None = None,
) -> AgentType:
"""向运行时注册与特定类型关联的代理工厂。该类型必须唯一。此API不添加任何订阅。
.. note::
这是一个底层API,通常应使用代理类的`register`方法代替,因为该方法还会自动处理订阅。
示例:
.. code-block:: python
from dataclasses import dataclass
from autogen_core import AgentRuntime, MessageContext, RoutedAgent, event
from autogen_core.models import UserMessage
@dataclass
class MyMessage:
content: str
class MyAgent(RoutedAgent):
def __init__(self) -> None:
super().__init__("My core agent")
@event
async def handler(self, message: UserMessage, context: MessageContext) -> None:
print("Event received: ", message.content)
async def my_agent_factory():
return MyAgent()
async def main() -> None:
runtime: AgentRuntime = ... # type: ignore
await runtime.register_factory("my_agent", lambda: MyAgent())
import asyncio
asyncio.run(main())
Args:
type (str): 此工厂创建的代理类型。它与代理类名不同。`type`参数用于区分不同的工厂函数而非代理类。
agent_factory (Callable[[], T]): 创建代理的工厂,其中T是具体的代理类型。在工厂内部,使用`autogen_core.AgentInstantiationContext`访问当前运行时和代理ID等变量。
expected_class (type[T] | None, optional): 代理的预期类,用于运行时验证工厂。默认为None。如果为None,则不执行验证。
"""
...
[文档]
async def register_agent_instance(
self,
agent_instance: Agent,
agent_id: AgentId,
) -> AgentId:
"""向运行时注册一个代理实例。类型可以重复使用,但每个agent_id必须唯一。同一类型下的所有代理实例必须是相同的对象类型。此API不会添加任何订阅。
.. note::
这是一个底层API,通常应该使用代理类的`register_instance`方法,因为该方法还会自动处理订阅。
示例:
.. code-block:: python
from dataclasses import dataclass
from autogen_core import AgentId, AgentRuntime, MessageContext, RoutedAgent, event
from autogen_core.models import UserMessage
@dataclass
class MyMessage:
content: str
class MyAgent(RoutedAgent):
def __init__(self) -> None:
super().__init__("My core agent")
@event
async def handler(self, message: UserMessage, context: MessageContext) -> None:
print("Event received: ", message.content)
async def main() -> None:
runtime: AgentRuntime = ... # type: ignore
agent = MyAgent()
await runtime.register_agent_instance(
agent_instance=agent, agent_id=AgentId(type="my_agent", key="default")
)
import asyncio
asyncio.run(main())
Args:
agent_instance (Agent): 代理的具体实例。
agent_id (AgentId): 代理的标识符。代理的类型是`agent_id.type`。
"""
...
# TODO: uncomment out the following type ignore when this is fixed in mypy: https://github.com/python/mypy/issues/3737
[文档]
async def try_get_underlying_agent_instance(self, id: AgentId, type: Type[T] = Agent) -> T: # type: ignore[assignment]
"""尝试通过名称和命名空间获取底层代理实例。通常不推荐这样做(因此名称较长),但在某些情况下可能有用。
如果底层代理不可访问,将引发异常。
Args:
id (AgentId): 代理ID。
type (Type[T], optional): 代理的预期类型。默认为Agent。
Returns:
T: 具体的代理实例。
Raises:
LookupError: 如果找不到代理。
NotAccessibleError: 如果代理不可访问,例如位于远程。
TypeError: 如果代理不是预期类型。
"""
...
@overload
async def get(self, id: AgentId, /, *, lazy: bool = ...) -> AgentId: ...
@overload
async def get(self, type: AgentType | str, /, key: str = ..., *, lazy: bool = ...) -> AgentId: ...
self, id_or_type: AgentId | AgentType | str, /, key: str = "default", *, lazy: bool = True
) -> AgentId: ...
[文档]
async def save_state(self) -> Mapping[str, Any]:
"""保存整个运行时的状态,包括所有托管代理。恢复状态的唯一方法是将其传递给:meth:`load_state`。
状态的结构由实现定义,可以是任何JSON可序列化对象。
Returns:
Mapping[str, Any]: 保存的状态。
"""
...
[文档]
async def load_state(self, state: Mapping[str, Any]) -> None:
"""加载整个运行时的状态,包括所有托管代理。该状态应与 :meth:`save_state` 方法返回的状态一致。
Args:
state (Mapping[str, Any]): 已保存的状态。
"""
...
[文档]
async def agent_save_state(self, agent: AgentId) -> Mapping[str, Any]:
"""保存单个代理的状态。
状态的结构由实现定义,可以是任何可JSON序列化的对象。
Args:
agent (AgentId): 代理ID。
Returns:
Mapping[str, Any]: 已保存的状态。
"""
...
[文档]
async def agent_load_state(self, agent: AgentId, state: Mapping[str, Any]) -> None:
"""加载单个代理的状态。
Args:
agent (AgentId): 代理ID。
state (Mapping[str, Any]): 保存的状态。
"""
...
[文档]
async def add_subscription(self, subscription: Subscription) -> None:
"""添加运行时在处理发布消息时应满足的新订阅
Args:
subscription (Subscription): 要添加的订阅
"""
...
[文档]
async def remove_subscription(self, id: str) -> None:
"""从运行时移除订阅
Args:
id (str): 要移除的订阅ID
Raises:
LookupError: 如果订阅不存在
"""
...
[文档]
def add_message_serializer(self, serializer: MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) -> None:
"""向运行时添加新的消息序列化序列化器
注意:这将根据 type_name 和 data_content_type 属性对序列化器进行去重
Args:
serializer (MessageSerializer[Any] | Sequence[MessageSerializer[Any]]): 要添加的序列化器/序列化器列表
"""
...