使用代理提取结果#
当运行多代理系统来解决某些任务时,您可能希望在系统达到终止状态后提取其结果。本指南展示了一种实现方法。由于代理实例无法直接从外部访问,我们将使用一个代理将最终结果发布到可访问的位置。
如果您将系统建模为发布某种FinalResult
类型,那么可以创建一个专门订阅该类型并将结果对外提供的代理。对于这类简单代理,ClosureAgent
是一个可以减少样板代码的选项。它允许您定义一个函数作为代理的消息处理器。在本例中,我们将使用代理与外部代码共享的队列来传递结果。
备注
在考虑如何从多代理系统中提取结果时,必须始终考虑代理的订阅主题及其发布主题。 这是因为代理只会接收其订阅主题的消息。
import asyncio
from dataclasses import dataclass
from autogen_core import (
ClosureAgent,
ClosureContext,
DefaultSubscription,
DefaultTopicId,
MessageContext,
SingleThreadedAgentRuntime,
)
为最终结果定义一个数据类。
@dataclass
class FinalResult:
value: str
创建一个队列用于将结果从代理传递到外部代码。
queue = asyncio.Queue[FinalResult]()
创建一个函数闭包用于将最终结果输出到队列。
该函数必须遵循签名
Callable[[AgentRuntime, AgentId, T, MessageContext], Awaitable[Any]]
其中T
是代理将接收的消息类型。
您可以使用联合类型来处理多种消息类型。
async def output_result(_agent: ClosureContext, message: FinalResult, ctx: MessageContext) -> None:
await queue.put(message)
让我们创建一个运行时并注册一个ClosureAgent
,它将把最终结果发布到队列中。
runtime = SingleThreadedAgentRuntime()
await ClosureAgent.register_closure(
runtime, "output_result", output_result, subscriptions=lambda: [DefaultSubscription()]
)
AgentType(type='output_result')
我们可以通过直接将最终结果发布到运行时来模拟它们的收集过程。
runtime.start()
await runtime.publish_message(FinalResult("Result 1"), DefaultTopicId())
await runtime.publish_message(FinalResult("Result 2"), DefaultTopicId())
await runtime.stop_when_idle()
我们可以查看队列来查看最终结果。
while not queue.empty():
print((result := await queue.get()).value)
Result 1
Result 2