{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# 分布式代理运行时\n\n```{attention}\n分布式代理运行时是一项实验性功能。API可能会发生破坏性变更。\n```\n\n分布式代理运行时支持跨进程边界的通信和代理生命周期管理。\n它由一个主机服务和至少一个工作运行时组成。\n\n主机服务维护与所有活动工作运行时的连接,\n促进消息传递,并为所有直接消息(即RPC)保持会话。\n工作运行时处理应用程序代码(代理)并连接到主机服务。\n它还向主机服务通告其支持的代理,\n以便主机服务能将消息传递到正确的工作运行时。\n\n````{note}\n分布式代理运行时需要额外依赖项,请使用以下命令安装:\n```bash\npip install \"autogen-ext[grpc]\"\n```\n````\n\n我们可以使用{py:class}`~autogen_ext.runtimes.grpc.GrpcWorkerAgentRuntimeHost`启动主机服务。\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from autogen_ext.runtimes.grpc import GrpcWorkerAgentRuntimeHost\n", "\n", "host = GrpcWorkerAgentRuntimeHost(address=\"localhost:50051\")\n", "host.start() # 在后台启动主机服务。" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "上述代码在后台启动主机服务,并在端口50051上接受工作运行时连接。\n\n在运行工作运行时之前,让我们先定义我们的代理。\n该代理将在收到每条消息时发布一条新消息。\n它还会跟踪已发布的消息数量,\n一旦发布5条消息后就会停止发布新消息。\n" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "from dataclasses import dataclass\n", "\n", "from autogen_core import DefaultTopicId, MessageContext, RoutedAgent, default_subscription, message_handler\n", "\n", "\n", "@dataclass\n", "class MyMessage:\n", " content: str\n", "\n", "\n", "@default_subscription\n", "class MyAgent(RoutedAgent):\n", " def __init__(self, name: str) -> None:\n", " super().__init__(\"My agent\")\n", " self._name = name\n", " self._counter = 0\n", "\n", " @message_handler\n", " async def my_message_handler(self, message: MyMessage, ctx: MessageContext) -> None:\n", " self._counter += 1\n", " if self._counter > 5:\n", " return\n", " content = f\"{self._name}: Hello x {self._counter}\"\n", " print(content)\n", " await self.publish_message(MyMessage(content=content), DefaultTopicId())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "现在我们可以设置工作代理运行时。\n我们使用{py:class}`~autogen_ext.runtimes.grpc.GrpcWorkerAgentRuntime`。\n我们设置两个工作运行时。每个运行时托管一个代理。\n所有代理都发布并订阅默认主题,因此它们可以看到所有\n正在发布的消息。\n\n为了运行代理,我们从工作运行时发布一条消息。\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "worker1: Hello x 1\n", "worker2: Hello x 1\n", "worker2: Hello x 2\n", "worker1: Hello x 2\n", "worker1: Hello x 3\n", "worker2: Hello x 3\n", "worker2: Hello x 4\n", "worker1: Hello x 4\n", "worker1: Hello x 5\n", "worker2: Hello x 5\n" ] } ], "source": [ "import asyncio\n", "\n", "from autogen_ext.runtimes.grpc import GrpcWorkerAgentRuntime\n", "\n", "worker1 = GrpcWorkerAgentRuntime(host_address=\"localhost:50051\")\n", "await worker1.start()\n", "await MyAgent.register(worker1, \"worker1\", lambda: MyAgent(\"worker1\"))\n", "\n", "worker2 = GrpcWorkerAgentRuntime(host_address=\"localhost:50051\")\n", "await worker2.start()\n", "await MyAgent.register(worker2, \"worker2\", lambda: MyAgent(\"worker2\"))\n", "\n", "await worker2.publish_message(MyMessage(content=\"Hello!\"), DefaultTopicId())\n", "\n", "# 让代理运行一段时间。\n", "await asyncio.sleep(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "我们可以看到每个代理都恰好发布了5条消息。\n\n要停止工作器运行时,可以调用 {py:meth}`~autogen_ext.runtimes.grpc.GrpcWorkerAgentRuntime.stop`。\n" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "await worker1.stop()\n", "await worker2.stop()\n", "\n", "# 要让工作器持续运行直到接收到终止信号(例如SIGTERM),\n", "# 可以执行 await worker1.stop_when_signal()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "我们可以调用 {py:meth}`~autogen_ext.runtimes.grpc.GrpcWorkerAgentRuntimeHost.stop`\n来停止主机服务。\n" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "await host.stop()\n", "\n", "# 要让主机服务持续运行直到接收到终止信号(例如SIGTERM),\n", "# 可以执行 await host.stop_when_signal()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 跨语言运行时\n上述流程大体相同,但所有跨代理消息类型必须使用共享的protobuf模式。\n\n## 后续步骤\n要查看使用分布式运行时的完整示例,请参考以下示例:\n\n- [分布式工作器](https://github.com/microsoft/autogen/tree/main/python/samples/core_grpc_worker_runtime) \n- [分布式语义路由器](https://github.com/microsoft/autogen/tree/main/python/samples/core_semantic_router) \n- [分布式群聊](https://github.com/microsoft/autogen/tree/main/python/samples/core_distributed-group-chat)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [] } ], "metadata": { "kernelspec": { "display_name": ".venv", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.5" } }, "nbformat": 4, "nbformat_minor": 2 }