人机交互循环#

在前一节团队中,我们已经了解了如何创建、观察和控制一个智能体团队。 本节将重点介绍如何从您的应用程序与团队进行交互,并向团队提供人工反馈。

从应用程序与团队交互主要有两种方式:

  1. 在团队运行期间——执行run()run_stream()时,通过UserProxyAgent提供反馈。

  2. 运行结束后,通过输入到下一次调用run()run_stream()来提供反馈。

我们将在本节中介绍这两种方法。

要直接查看与Web和UI框架集成的代码示例,请参阅以下链接:

在运行期间提供反馈#

UserProxyAgent是一个特殊的内置智能体, 它作为用户的代理向团队提供反馈。

要使用UserProxyAgent,您可以创建其实例 并在运行团队之前将其包含在团队中。 团队会决定何时调用UserProxyAgent 来向用户请求反馈。

例如在RoundRobinGroupChat团队中, UserProxyAgent按照传递给团队的顺序被调用, 而在SelectorGroupChat团队中, 选择器提示或选择器函数决定何时调用UserProxyAgent

下图展示了如何在团队运行期间使用UserProxyAgent 从用户获取反馈:

human-in-the-loop-user-proxy

粗箭头表示团队运行期间的控制流: 当团队调用UserProxyAgent时, 它将控制权转移给应用程序/用户,并等待反馈; 一旦提供反馈,控制权就转移回团队,团队继续执行。

备注

当在运行期间调用UserProxyAgent时, 它会阻塞团队的执行,直到用户提供反馈或出错。 这会阻碍团队的进展,并使团队处于无法保存或恢复的不稳定状态。

由于这种方法的阻塞特性,建议仅将其用于需要用户立即反馈的简短交互, 例如通过点击按钮请求批准或拒绝,或者需要立即关注否则会导致任务失败的警报。

以下是在RoundRobinGroupChat中 使用UserProxyAgent进行诗歌生成任务的示例:

from autogen_agentchat.agents import AssistantAgent, UserProxyAgent
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

# 创建智能体
model_client = OpenAIChatCompletionClient(model="gpt-4o-mini")
assistant = AssistantAgent("assistant", model_client=model_client)
user_proxy = UserProxyAgent("user_proxy", input_func=input)  # 使用input()从控制台获取用户输入

# 创建终止条件,当用户说"APPROVE"时结束对话
termination = TextMentionTermination("APPROVE")

# 创建团队。
team = RoundRobinGroupChat([assistant, user_proxy], termination_condition=termination)

# 运行对话并将结果流式输出到控制台。
stream = team.run_stream(task="Write a 4-line poem about the ocean.")
# 在脚本中运行时使用 asyncio.run(...)。
await Console(stream)
await model_client.close()
---------- user ----------
Write a 4-line poem about the ocean.
---------- assistant ----------
In endless blue where whispers play,  
The ocean's waves dance night and day.  
A world of depths, both calm and wild,  
Nature's heart, forever beguiled.  
TERMINATE
---------- user_proxy ----------
APPROVE
TaskResult(messages=[TextMessage(source='user', models_usage=None, metadata={}, content='Write a 4-line poem about the ocean.', type='TextMessage'), TextMessage(source='assistant', models_usage=RequestUsage(prompt_tokens=46, completion_tokens=43), metadata={}, content="In endless blue where whispers play,  \nThe ocean's waves dance night and day.  \nA world of depths, both calm and wild,  \nNature's heart, forever beguiled.  \nTERMINATE", type='TextMessage'), UserInputRequestedEvent(source='user_proxy', models_usage=None, metadata={}, request_id='2622a0aa-b776-4e54-9e8f-4ecbdf14b78d', content='', type='UserInputRequestedEvent'), TextMessage(source='user_proxy', models_usage=None, metadata={}, content='APPROVE', type='TextMessage')], stop_reason="Text 'APPROVE' mentioned")

从控制台输出可以看到,团队通过 user_proxy 向用户征求反馈 来批准生成的诗作。

您可以为 UserProxyAgent 提供自定义输入函数 来定制反馈流程。 例如,当团队作为网络服务运行时,可以使用自定义输入函数 等待来自 WebSocket 连接的消息。 以下代码片段展示了使用 FastAPI 框架时 自定义输入函数的示例:

@app.websocket("/ws/chat")
async def chat(websocket: WebSocket):
    await websocket.accept()

    async def _user_input(prompt: str, cancellation_token: CancellationToken | None) -> str:
        data = await websocket.receive_json() # 从websocket等待用户消息
        message = TextMessage.model_validate(data) # 假设用户消息是TextMessage类型
        return message.content
    
    # 创建带有自定义输入函数的用户代理
    # 使用该用户代理运行团队
    # ...

完整示例请参考 AgentChat FastAPI 示例

关于 UserProxyAgentChainLit 的集成, 请参阅 AgentChat ChainLit 示例

为下次运行提供反馈#

通常,应用程序或用户会与代理团队进行交互式循环: 团队运行直至终止, 应用程序或用户提供反馈,然后团队带着反馈再次运行。

这种方法在持久会话中非常有用, 可以实现团队与应用程序/用户之间的异步通信: 当团队完成一次运行时,应用程序保存团队状态, 将其放入持久存储,并在收到反馈时恢复团队运行。

备注

关于如何保存和加载团队状态,请参考状态管理。 本节将重点介绍反馈机制。

下图展示了这种方法的控制流程:

human-in-the-loop-termination

有两种实现方式:

  • 设置最大轮次限制,使团队在指定轮次后总是停止

  • 使用终止条件如 TextMentionTerminationHandoffTermination,让团队根据内部状态决定何时停止并交回控制权

您可以同时使用这两种方法来实现所需的行为。

使用最大轮次#

此方法允许通过设置最大轮次来暂停团队以等待用户输入。例如,您可以通过将 max_turns 设为 1 来配置团队在第一个代理响应后停止。这在需要持续用户交互的场景中特别有用,比如聊天机器人。

要实现这一点,请在 RoundRobinGroupChat() 构造函数中设置 max_turns 参数。

team = RoundRobinGroupChat([...], max_turns=1)

当团队停止时,轮次计数将被重置。当您恢复团队时,它将从 0 重新开始。不过,团队的内部状态将被保留,例如 RoundRobinGroupChat 会从列表中的下一个代理继续,并保持相同的对话历史。

备注

max_turn 是团队类特有的功能,目前仅支持 RoundRobinGroupChatSelectorGroupChatSwarm。 当与终止条件一起使用时,团队会在任一条件满足时停止。

以下是一个在 RoundRobinGroupChat 中使用 max_turns 进行诗歌生成任务的示例,最大轮次设为 1:

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

# 创建代理
model_client = OpenAIChatCompletionClient(model="gpt-4o-mini")
assistant = AssistantAgent("assistant", model_client=model_client)

# 创建团队并设置最大轮次为 1
team = RoundRobinGroupChat([assistant], max_turns=1)

task = "Write a 4-line poem about the ocean."
while True:
    # 运行对话并输出到控制台
    stream = team.run_stream(task=task)
    # 在脚本中使用 asyncio.run(...)
    await Console(stream)
    # 获取用户响应。
    task = input("Enter your feedback (type 'exit' to leave): ")
    if task.lower().strip() == "exit":
        break
await model_client.close()
---------- user ----------
Write a 4-line poem about the ocean.
---------- assistant ----------
Endless waves in a dance with the shore,  
Whispers of secrets in tales from the roar,  
Beneath the vast sky, where horizons blend,  
The ocean’s embrace is a timeless friend.  
TERMINATE
[Prompt tokens: 46, Completion tokens: 48]
---------- Summary ----------
Number of messages: 2
Finish reason: Maximum number of turns 1 reached.
Total prompt tokens: 46
Total completion tokens: 48
Duration: 1.63 seconds
---------- user ----------
Can you make it about a person and its relationship with the ocean
---------- assistant ----------
She walks along the tide, where dreams intertwine,  
With every crashing wave, her heart feels aligned,  
In the ocean's embrace, her worries dissolve,  
A symphony of solace, where her spirit evolves.  
TERMINATE
[Prompt tokens: 117, Completion tokens: 49]
---------- Summary ----------
Number of messages: 2
Finish reason: Maximum number of turns 1 reached.
Total prompt tokens: 117
Total completion tokens: 49
Duration: 1.21 seconds

你可以看到团队在一个代理响应后立即停止了。

使用终止条件#

在前面的章节中我们已经看到了几个终止条件的例子。 本节我们将重点介绍 HandoffTermination, 当代理发送 HandoffMessage 消息时,该条件会停止团队运行。

让我们创建一个包含单个 AssistantAgent 代理的团队, 并设置交接功能,然后运行一个需要用户额外输入的任务, 因为该代理没有相关工具来继续处理任务。

备注

AssistantAgent 一起使用的模型必须支持工具调用 才能使用交接功能。

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.base import Handoff
from autogen_agentchat.conditions import HandoffTermination, TextMentionTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

# 创建 OpenAI 模型客户端。
model_client = OpenAIChatCompletionClient(
    model="gpt-4o",
    # api_key="sk-...", # 如果设置了 OPENAI_API_KEY 环境变量则为可选
)

# 创建一个懒惰的助手代理,总是将任务转交给用户。
lazy_agent = AssistantAgent(
    "lazy_assistant",
    model_client=model_client,
    handoffs=[Handoff(target="user", message="Transfer to user.")],
    system_message="If you cannot complete the task, transfer to user. Otherwise, when finished, respond with 'TERMINATE'.",
)

# 定义一个终止条件,用于检查转交消息。
handoff_termination = HandoffTermination(target="user")
# 定义一个终止条件,用于检查特定文本提及。
text_termination = TextMentionTermination("TERMINATE")

# 创建一个包含懒惰助手和两个终止条件的单代理团队。
lazy_agent_team = RoundRobinGroupChat([lazy_agent], termination_condition=handoff_termination | text_termination)

# 运行团队并将结果流式输出到控制台。
task = "What is the weather in New York?"
await Console(lazy_agent_team.run_stream(task=task), output_stats=True)
---------- user ----------
What is the weather in New York?
---------- lazy_assistant ----------
[FunctionCall(id='call_EAcMgrLGHdLw0e7iJGoMgxuu', arguments='{}', name='transfer_to_user')]
[Prompt tokens: 69, Completion tokens: 12]
---------- lazy_assistant ----------
[FunctionExecutionResult(content='Transfer to user.', call_id='call_EAcMgrLGHdLw0e7iJGoMgxuu')]
---------- lazy_assistant ----------
Transfer to user.
---------- Summary ----------
Number of messages: 4
Finish reason: Handoff to user from lazy_assistant detected.
Total prompt tokens: 69
Total completion tokens: 12
Duration: 0.69 seconds
TaskResult(messages=[TextMessage(source='user', models_usage=None, content='What is the weather in New York?', type='TextMessage'), ToolCallRequestEvent(source='lazy_assistant', models_usage=RequestUsage(prompt_tokens=69, completion_tokens=12), content=[FunctionCall(id='call_EAcMgrLGHdLw0e7iJGoMgxuu', arguments='{}', name='transfer_to_user')], type='ToolCallRequestEvent'), ToolCallExecutionEvent(source='lazy_assistant', models_usage=None, content=[FunctionExecutionResult(content='Transfer to user.', call_id='call_EAcMgrLGHdLw0e7iJGoMgxuu')], type='ToolCallExecutionEvent'), HandoffMessage(source='lazy_assistant', models_usage=None, target='user', content='Transfer to user.', context=[], type='HandoffMessage')], stop_reason='Handoff to user from lazy_assistant detected.')

你可以看到团队因检测到交接消息而停止运行。 让我们通过提供代理所需的信息来继续团队工作。

await Console(lazy_agent_team.run_stream(task="The weather in New York is sunny."))
---------- user ----------
The weather in New York is sunny.
---------- lazy_assistant ----------
Great! Enjoy the sunny weather in New York! Is there anything else you'd like to know?
---------- lazy_assistant ----------
TERMINATE
TaskResult(messages=[TextMessage(source='user', models_usage=None, content='The weather in New York is sunny.', type='TextMessage'), TextMessage(source='lazy_assistant', models_usage=RequestUsage(prompt_tokens=110, completion_tokens=21), content="Great! Enjoy the sunny weather in New York! Is there anything else you'd like to know?", type='TextMessage'), TextMessage(source='lazy_assistant', models_usage=RequestUsage(prompt_tokens=137, completion_tokens=5), content='TERMINATE', type='TextMessage')], stop_reason="Text 'TERMINATE' mentioned")

你可以看到在用户提供信息后团队继续运行了。

备注

如果你使用的是针对用户的Swarm团队 配合HandoffTermination条件, 要恢复团队运行,你需要将task设置为一个HandoffMessage 并将target指向你想要运行的下一个代理。 更多详情请参阅Swarm