autogen_ext.runtimes.grpc#
- class GrpcWorkerAgentRuntime(host_address: str, tracer_provider: TracerProvider | None = None, extra_grpc_config: Sequence[Tuple[str, Any]] | None = None, payload_serialization_format: str = JSON_DATA_CONTENT_TYPE)[源代码]#
基类:
AgentRuntime
用于运行远程或跨语言代理的代理运行时。
代理消息传递使用来自`agent_worker.proto`_的protobuf协议以及来自`cloudevent.proto`_的``CloudEvent``。
跨语言代理还需要所有代理对任何在代理之间传递的消息类型使用共享的protobuf模式。
- add_message_serializer(serializer: MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) None [源代码]#
向运行时添加新的消息序列化序列化器
注意:这将根据 type_name 和 data_content_type 属性对序列化器进行去重
- 参数:
serializer (MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) -- 要添加的序列化器/序列化器列表
- async add_subscription(subscription: Subscription) None [源代码]#
添加运行时在处理发布消息时应满足的新订阅
- 参数:
subscription (Subscription) -- 要添加的订阅
- async agent_metadata(agent: AgentId) AgentMetadata [源代码]#
获取代理的元数据。
- 参数:
agent (AgentId) -- 代理ID。
- Returns:
AgentMetadata -- 代理元数据。
- async agent_save_state(agent: AgentId) Mapping[str, Any] [源代码]#
保存单个代理的状态。
状态的结构由实现定义,可以是任何可JSON序列化的对象。
- 参数:
agent (AgentId) -- 代理ID。
- Returns:
Mapping[str, Any] -- 已保存的状态。
- async get(id_or_type: AgentId | AgentType | str, /, key: str = 'default', *, lazy: bool = True) AgentId [源代码]#
- async load_state(state: Mapping[str, Any]) None [源代码]#
加载整个运行时的状态,包括所有托管代理。该状态应与
save_state()
方法返回的状态一致。- 参数:
state (Mapping[str, Any]) -- 已保存的状态。
- async publish_message(message: Any, topic_id: TopicId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, message_id: str | None = None) None [源代码]#
向给定命名空间中的所有代理发布消息,如果未提供命名空间,则使用发送者的命名空间。
发布消息不期望获得响应。
- 参数:
message (Any) -- 要发布的消息。
topic_id (TopicId) -- 发布消息的主题。
sender (AgentId | None, optional) -- 发送消息的代理。默认为None。
cancellation_token (CancellationToken | None, optional) -- 用于取消进行中的操作的令牌。默认为None。
message_id (str | None, optional) -- 消息ID。如果为None,将生成新的消息ID。默认为None。此消息ID必须唯一,建议使用UUID。
- 抛出:
UndeliverableException -- 如果消息无法送达。
- async register_agent_instance(agent_instance: Agent, agent_id: AgentId) AgentId [源代码]#
向运行时注册一个代理实例。类型可以重复使用,但每个agent_id必须唯一。同一类型下的所有代理实例必须是相同的对象类型。此API不会添加任何订阅。
备注
这是一个底层API,通常应该使用代理类的`register_instance`方法,因为该方法还会自动处理订阅。
示例:
from dataclasses import dataclass from autogen_core import AgentId, AgentRuntime, MessageContext, RoutedAgent, event from autogen_core.models import UserMessage @dataclass class MyMessage: content: str class MyAgent(RoutedAgent): def __init__(self) -> None: super().__init__("My core agent") @event async def handler(self, message: UserMessage, context: MessageContext) -> None: print("Event received: ", message.content) async def main() -> None: runtime: AgentRuntime = ... # type: ignore agent = MyAgent() await runtime.register_agent_instance( agent_instance=agent, agent_id=AgentId(type="my_agent", key="default") ) import asyncio asyncio.run(main())
- async register_factory(type: str | AgentType, agent_factory: Callable[[], T | Awaitable[T]], *, expected_class: type[T] | None = None) AgentType [源代码]#
向运行时注册与特定类型关联的代理工厂。该类型必须唯一。此API不添加任何订阅。
备注
这是一个底层API,通常应使用代理类的`register`方法代替,因为该方法还会自动处理订阅。
示例:
from dataclasses import dataclass from autogen_core import AgentRuntime, MessageContext, RoutedAgent, event from autogen_core.models import UserMessage @dataclass class MyMessage: content: str class MyAgent(RoutedAgent): def __init__(self) -> None: super().__init__("My core agent") @event async def handler(self, message: UserMessage, context: MessageContext) -> None: print("Event received: ", message.content) async def my_agent_factory(): return MyAgent() async def main() -> None: runtime: AgentRuntime = ... # type: ignore await runtime.register_factory("my_agent", lambda: MyAgent()) import asyncio asyncio.run(main())
- async remove_subscription(id: str) None [源代码]#
从运行时移除订阅
- 参数:
id (str) -- 要移除的订阅ID
- 抛出:
LookupError -- 如果订阅不存在
- async save_state() Mapping[str, Any] [源代码]#
保存整个运行时的状态,包括所有托管代理。恢复状态的唯一方法是将其传递给:meth:load_state。
状态的结构由实现定义,可以是任何JSON可序列化对象。
- Returns:
Mapping[str, Any] -- 保存的状态。
- async send_message(message: Any, recipient: AgentId, *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, message_id: str | None = None) Any [源代码]#
向代理发送消息并获取响应。
- 参数:
message (Any) -- 要发送的消息。
recipient (AgentId) -- 接收消息的代理。
sender (AgentId | None, optional) -- 发送消息的代理。**仅**当消息不是由任何代理发送时(例如直接从外部发送到运行时)才应为None。默认为None。
cancellation_token (CancellationToken | None, optional) -- 用于取消进行中的操作的令牌。默认为None。
- 抛出:
CantHandleException -- 如果接收方无法处理该消息。
UndeliverableException -- 如果消息无法送达。
Other -- 接收方引发的任何其他异常。
- Returns:
Any -- 来自代理的响应。
- async stop_when_signal(signals: Sequence[Signals] = (signal.SIGTERM, signal.SIGINT)) None [源代码]#
在接收到信号时停止运行时。
- async try_get_underlying_agent_instance(id: AgentId, type: Type[T] = Agent) T [源代码]#
尝试通过名称和命名空间获取底层代理实例。通常不推荐这样做(因此名称较长),但在某些情况下可能有用。
如果底层代理不可访问,将引发异常。
- 参数:
id (AgentId) -- 代理ID。
type (Type[T], optional) -- 代理的预期类型。默认为Agent。
- Returns:
T -- 具体的代理实例。
- 抛出:
LookupError -- 如果找不到代理。
NotAccessibleError -- 如果代理不可访问,例如位于远程。
TypeError -- 如果代理不是预期类型。
- class GrpcWorkerAgentRuntimeHost(address: str, extra_grpc_config: Sequence[Tuple[str, Any]] | None = None)[源代码]#
基类:
object
- class GrpcWorkerAgentRuntimeHostServicer[源代码]#
基类:
AgentRpcServicer
一个托管代理消息传递服务的gRPC服务端。
- async AddSubscription(request: AddSubscriptionRequest, context: ServicerContext[AddSubscriptionRequest, AddSubscriptionResponse]) AddSubscriptionResponse [源代码]#
在 .proto 文件中缺少关联的文档注释。
- async GetSubscriptions(request: GetSubscriptionsRequest, context: ServicerContext[GetSubscriptionsRequest, GetSubscriptionsResponse]) GetSubscriptionsResponse [源代码]#
在 .proto 文件中缺少关联的文档注释。
- async OpenChannel(request_iterator: AsyncIterator[Message], context: ServicerContext[Message, Message]) AsyncIterator[Message] [源代码]#
在.proto文件中缺少关联的文档注释。
- async OpenControlChannel(request_iterator: AsyncIterator[ControlMessage], context: ServicerContext[ControlMessage, ControlMessage]) AsyncIterator[ControlMessage] [源代码]#
在.proto文件中缺少关联的文档注释。