{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# 使用代理提取结果\n\n当运行多代理系统来解决某些任务时,您可能希望在系统达到终止状态后提取其结果。本指南展示了一种实现方法。由于代理实例无法直接从外部访问,我们将使用一个代理将最终结果发布到可访问的位置。\n\n如果您将系统建模为发布某种`FinalResult`类型,那么可以创建一个专门订阅该类型并将结果对外提供的代理。对于这类简单代理,{py:class}`~autogen_core.components.ClosureAgent`是一个可以减少样板代码的选项。它允许您定义一个函数作为代理的消息处理器。在本例中,我们将使用代理与外部代码共享的队列来传递结果。\n\n```{note}\n在考虑如何从多代理系统中提取结果时,必须始终考虑代理的订阅主题及其发布主题。\n这是因为代理只会接收其订阅主题的消息。\n```\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import asyncio\n", "from dataclasses import dataclass\n", "\n", "from autogen_core import (\n", " ClosureAgent,\n", " ClosureContext,\n", " DefaultSubscription,\n", " DefaultTopicId,\n", " MessageContext,\n", " SingleThreadedAgentRuntime,\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "为最终结果定义一个数据类。\n" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "@dataclass\n", "class FinalResult:\n", " value: str" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "创建一个队列用于将结果从代理传递到外部代码。\n" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "queue = asyncio.Queue[FinalResult]()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "创建一个函数闭包用于将最终结果输出到队列。\n该函数必须遵循签名\n`Callable[[AgentRuntime, AgentId, T, MessageContext], Awaitable[Any]]`\n其中`T`是代理将接收的消息类型。\n您可以使用联合类型来处理多种消息类型。\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "async def output_result(_agent: ClosureContext, message: FinalResult, ctx: MessageContext) -> None:\n", " await queue.put(message)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "让我们创建一个运行时并注册一个{py:class}`~autogen_core.components.ClosureAgent`,它将把最终结果发布到队列中。\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "AgentType(type='output_result')" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "runtime = SingleThreadedAgentRuntime()\n", "await ClosureAgent.register_closure(\n", " runtime, \"output_result\", output_result, subscriptions=lambda: [DefaultSubscription()]\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "我们可以通过直接将最终结果发布到运行时来模拟它们的收集过程。\n" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "runtime.start()\n", "await runtime.publish_message(FinalResult(\"Result 1\"), DefaultTopicId())\n", "await runtime.publish_message(FinalResult(\"Result 2\"), DefaultTopicId())\n", "await runtime.stop_when_idle()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "我们可以查看队列来查看最终结果。\n" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Result 1\n", "Result 2\n" ] } ], "source": [ "while not queue.empty():\n", " print((result := await queue.get()).value)" ] } ], "metadata": { "kernelspec": { "display_name": ".venv", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.9" } }, "nbformat": 4, "nbformat_minor": 2 }