使用代理提取结果#

当运行多代理系统来解决某些任务时,您可能希望在系统达到终止状态后提取其结果。本指南展示了一种实现方法。由于代理实例无法直接从外部访问,我们将使用一个代理将最终结果发布到可访问的位置。

如果您将系统建模为发布某种FinalResult类型,那么可以创建一个专门订阅该类型并将结果对外提供的代理。对于这类简单代理,ClosureAgent是一个可以减少样板代码的选项。它允许您定义一个函数作为代理的消息处理器。在本例中,我们将使用代理与外部代码共享的队列来传递结果。

备注

在考虑如何从多代理系统中提取结果时,必须始终考虑代理的订阅主题及其发布主题。 这是因为代理只会接收其订阅主题的消息。

import asyncio
from dataclasses import dataclass

from autogen_core import (
    ClosureAgent,
    ClosureContext,
    DefaultSubscription,
    DefaultTopicId,
    MessageContext,
    SingleThreadedAgentRuntime,
)

为最终结果定义一个数据类。

@dataclass
class FinalResult:
    value: str

创建一个队列用于将结果从代理传递到外部代码。

queue = asyncio.Queue[FinalResult]()

创建一个函数闭包用于将最终结果输出到队列。 该函数必须遵循签名 Callable[[AgentRuntime, AgentId, T, MessageContext], Awaitable[Any]] 其中T是代理将接收的消息类型。 您可以使用联合类型来处理多种消息类型。

async def output_result(_agent: ClosureContext, message: FinalResult, ctx: MessageContext) -> None:
    await queue.put(message)

让我们创建一个运行时并注册一个ClosureAgent,它将把最终结果发布到队列中。

runtime = SingleThreadedAgentRuntime()
await ClosureAgent.register_closure(
    runtime, "output_result", output_result, subscriptions=lambda: [DefaultSubscription()]
)
AgentType(type='output_result')

我们可以通过直接将最终结果发布到运行时来模拟它们的收集过程。

runtime.start()
await runtime.publish_message(FinalResult("Result 1"), DefaultTopicId())
await runtime.publish_message(FinalResult("Result 2"), DefaultTopicId())
await runtime.stop_when_idle()

我们可以查看队列来查看最终结果。

while not queue.empty():
    print((result := await queue.get()).value)
Result 1
Result 2