分布式代理运行时#
注意
分布式代理运行时是一项实验性功能。API可能会发生破坏性变更。
分布式代理运行时支持跨进程边界的通信和代理生命周期管理。 它由一个主机服务和至少一个工作运行时组成。
主机服务维护与所有活动工作运行时的连接, 促进消息传递,并为所有直接消息(即RPC)保持会话。 工作运行时处理应用程序代码(代理)并连接到主机服务。 它还向主机服务通告其支持的代理, 以便主机服务能将消息传递到正确的工作运行时。
备注
分布式代理运行时需要额外依赖项,请使用以下命令安装:
pip install "autogen-ext[grpc]"
我们可以使用GrpcWorkerAgentRuntimeHost
启动主机服务。
from autogen_ext.runtimes.grpc import GrpcWorkerAgentRuntimeHost
host = GrpcWorkerAgentRuntimeHost(address="localhost:50051")
host.start() # 在后台启动主机服务。
上述代码在后台启动主机服务,并在端口50051上接受工作运行时连接。
在运行工作运行时之前,让我们先定义我们的代理。 该代理将在收到每条消息时发布一条新消息。 它还会跟踪已发布的消息数量, 一旦发布5条消息后就会停止发布新消息。
from dataclasses import dataclass
from autogen_core import DefaultTopicId, MessageContext, RoutedAgent, default_subscription, message_handler
@dataclass
class MyMessage:
content: str
@default_subscription
class MyAgent(RoutedAgent):
def __init__(self, name: str) -> None:
super().__init__("My agent")
self._name = name
self._counter = 0
@message_handler
async def my_message_handler(self, message: MyMessage, ctx: MessageContext) -> None:
self._counter += 1
if self._counter > 5:
return
content = f"{self._name}: Hello x {self._counter}"
print(content)
await self.publish_message(MyMessage(content=content), DefaultTopicId())
现在我们可以设置工作代理运行时。
我们使用GrpcWorkerAgentRuntime
。
我们设置两个工作运行时。每个运行时托管一个代理。
所有代理都发布并订阅默认主题,因此它们可以看到所有
正在发布的消息。
为了运行代理,我们从工作运行时发布一条消息。
import asyncio
from autogen_ext.runtimes.grpc import GrpcWorkerAgentRuntime
worker1 = GrpcWorkerAgentRuntime(host_address="localhost:50051")
await worker1.start()
await MyAgent.register(worker1, "worker1", lambda: MyAgent("worker1"))
worker2 = GrpcWorkerAgentRuntime(host_address="localhost:50051")
await worker2.start()
await MyAgent.register(worker2, "worker2", lambda: MyAgent("worker2"))
await worker2.publish_message(MyMessage(content="Hello!"), DefaultTopicId())
# 让代理运行一段时间。
await asyncio.sleep(5)
worker1: Hello x 1
worker2: Hello x 1
worker2: Hello x 2
worker1: Hello x 2
worker1: Hello x 3
worker2: Hello x 3
worker2: Hello x 4
worker1: Hello x 4
worker1: Hello x 5
worker2: Hello x 5
我们可以看到每个代理都恰好发布了5条消息。
要停止工作器运行时,可以调用 stop()
。
await worker1.stop()
await worker2.stop()
# 要让工作器持续运行直到接收到终止信号(例如SIGTERM),
# 可以执行 await worker1.stop_when_signal()
我们可以调用 stop()
来停止主机服务。
await host.stop()
# 要让主机服务持续运行直到接收到终止信号(例如SIGTERM),
# 可以执行 await host.stop_when_signal()
跨语言运行时#
上述流程大体相同,但所有跨代理消息类型必须使用共享的protobuf模式。
后续步骤#
要查看使用分布式运行时的完整示例,请参考以下示例: