任务交接#

任务交接(Handoff)是OpenAI在名为Swarm的实验性项目中引入的多智能体设计模式。 核心思想是让智能体通过特殊工具调用来将任务委托给其他智能体。

我们可以使用AutoGen核心API通过事件驱动型智能体来实现任务交接模式。 相比OpenAI的实现和早期版本(v0.2),使用AutoGen(v0.4+)具有以下优势:

  1. 通过分布式智能体运行时可以扩展到分布式环境

  2. 提供灵活的自定义智能体实现能力

  3. 原生异步API便于与UI和其他系统集成

本笔记本演示了任务交接模式的简单实现。 建议先阅读主题与订阅 来理解发布-订阅和事件驱动型智能体的基本概念。

备注

我们正在AgentChat中为任务交接模式开发高级API, 以便您能更快速地开始使用。

场景说明#

本场景基于OpenAI示例修改。

考虑一个客户服务场景,客户试图通过聊天机器人获得产品退款或购买新产品。 该聊天机器人是由三个AI智能体和一个人类智能体组成的多智能体团队:

  • 分诊智能体:负责理解客户请求并决定将任务交接给哪个其他智能体

  • 退款智能体:负责处理退款请求

  • 销售智能体:负责处理销售请求

  • 人类智能体:负责处理AI智能体无法处理的复杂请求

在此场景中,客户通过用户代理与聊天机器人交互。

下图展示了该场景中智能体的交互拓扑结构。

任务交接

让我们使用AutoGen核心来实现这个场景。首先需要导入必要的模块。

import json
import uuid
from typing import List, Tuple

from autogen_core import (
    FunctionCall,
    MessageContext,
    RoutedAgent,
    SingleThreadedAgentRuntime,
    TopicId,
    TypeSubscription,
    message_handler,
)
from autogen_core.models import (
    AssistantMessage,
    ChatCompletionClient,
    FunctionExecutionResult,
    FunctionExecutionResultMessage,
    LLMMessage,
    SystemMessage,
    UserMessage,
)
from autogen_core.tools import FunctionTool, Tool
from autogen_ext.models.openai import OpenAIChatCompletionClient
from pydantic import BaseModel

消息协议#

首先需要定义智能体间通信的消息协议。 我们使用事件驱动的发布-订阅通信,因此这些消息类型将作为事件使用。

  • UserLogin:当用户登录并开始新会话时由运行时发布的消息

  • UserTask:包含用户会话聊天历史的消息。当AI智能体将任务交接给其他智能体时,也会发布UserTask消息

  • AgentResponse:由AI智能体和人类智能体发布的消息,包含聊天历史以及供客户回复的主题类型

class UserLogin(BaseModel):
    pass


class UserTask(BaseModel):
    context: List[LLMMessage]


class AgentResponse(BaseModel):
    reply_to_topic_type: str
    context: List[LLMMessage]

AI智能体#

我们从AIAgent类开始,这是多智能体聊天机器人中所有AI智能体 (即分诊、销售和问题处理智能体)的基类。 AIAgent使用ChatCompletionClient 来生成响应。 它可以直接使用常规工具,或通过delegate_tools将任务委托给其他智能体。 它订阅agent_topic_type主题类型来接收来自客户的消息, 并通过发布到user_topic_type主题类型向客户发送消息。

handle_task方法中,智能体首先使用模型生成响应。 如果响应包含交接工具调用,智能体会通过向工具调用结果中指定的主题发布UserTask消息, 将任务委托给另一个智能体。 如果响应是常规工具调用,智能体会执行该工具并再次调用模型生成下一个响应, 直到响应不再是工具调用。

当模型响应不是工具调用时,智能体通过向user_topic_type发布AgentResponse消息 来向客户发送响应。

class AIAgent(RoutedAgent):
    def __init__(
        self,
        description: str,
        system_message: SystemMessage,
        model_client: ChatCompletionClient,
        tools: List[Tool],
        delegate_tools: List[Tool],
        agent_topic_type: str,
        user_topic_type: str,
    ) -> None:
        super().__init__(description)
        self._system_message = system_message
        self._model_client = model_client
        self._tools = dict([(tool.name, tool) for tool in tools])
        self._tool_schema = [tool.schema for tool in tools]
        self._delegate_tools = dict([(tool.name, tool) for tool in delegate_tools])
        self._delegate_tool_schema = [tool.schema for tool in delegate_tools]
        self._agent_topic_type = agent_topic_type
        self._user_topic_type = user_topic_type

    @message_handler
    async def handle_task(self, message: UserTask, ctx: MessageContext) -> None:
        # 将任务发送给大语言模型
        llm_result = await self._model_client.create(
            messages=[self._system_message] + message.context,
            tools=self._tool_schema + self._delegate_tool_schema,
            cancellation_token=ctx.cancellation_token,
        )
        print(f"{'-'*80}\n{self.id.type}:\n{llm_result.content}", flush=True)
        # 处理大语言模型的结果
        while isinstance(llm_result.content, list) and all(isinstance(m, FunctionCall) for m in llm_result.content):
            tool_call_results: List[FunctionExecutionResult] = []
            delegate_targets: List[Tuple[str, UserTask]] = []
            # 处理每个函数调用。
            for call in llm_result.content:
                arguments = json.loads(call.arguments)
                if call.name in self._tools:
                    # 直接执行工具。
                    result = await self._tools[call.name].run_json(arguments, ctx.cancellation_token)
                    result_as_str = self._tools[call.name].return_value_as_string(result)
                    tool_call_results.append(
                        FunctionExecutionResult(call_id=call.id, content=result_as_str, is_error=False, name=call.name)
                    )
                elif call.name in self._delegate_tools:
                    # 执行工具以获取委托代理的主题类型。
                    result = await self._delegate_tools[call.name].run_json(arguments, ctx.cancellation_token)
                    topic_type = self._delegate_tools[call.name].return_value_as_string(result)
                    # 为委托代理创建上下文,包括函数调用和结果。
                    delegate_messages = list(message.context) + [
                        AssistantMessage(content=[call], source=self.id.type),
                        FunctionExecutionResultMessage(
                            content=[
                                FunctionExecutionResult(
                                    call_id=call.id,
                                    content=f"Transferred to {topic_type}. Adopt persona immediately.",
                                    is_error=False,
                                    name=call.name,
                                )
                            ]
                        ),
                    ]
                    delegate_targets.append((topic_type, UserTask(context=delegate_messages)))
                else:
                    raise ValueError(f"Unknown tool: {call.name}")
            if len(delegate_targets) > 0:
                # 通过向相应主题发布消息,将任务委托给其他代理。
                for topic_type, task in delegate_targets:
                    print(f"{'-'*80}\n{self.id.type}:\nDelegating to {topic_type}", flush=True)
                    await self.publish_message(task, topic_id=TopicId(topic_type, source=self.id.key))
            if len(tool_call_results) > 0:
                print(f"{'-'*80}\n{self.id.type}:\n{tool_call_results}", flush=True)
                # 用结果再次调用LLM。
                message.context.extend(
                    [
                        AssistantMessage(content=llm_result.content, source=self.id.type),
                        FunctionExecutionResultMessage(content=tool_call_results),
                    ]
                )
                llm_result = await self._model_client.create(
                    messages=[self._system_message] + message.context,
                    tools=self._tool_schema + self._delegate_tool_schema,
                    cancellation_token=ctx.cancellation_token,
                )
                print(f"{'-'*80}\n{self.id.type}:\n{llm_result.content}", flush=True)
            else:
                # 任务已委派,我们已完成。
                return
        # 任务已完成,发布最终结果。
        assert isinstance(llm_result.content, str)
        message.context.append(AssistantMessage(content=llm_result.content, source=self.id.type))
        await self.publish_message(
            AgentResponse(context=message.context, reply_to_topic_type=self._agent_topic_type),
            topic_id=TopicId(self._user_topic_type, source=self.id.key),
        )

人工代理#

HumanAgent 类是聊天机器人中代表人类的代理。它用于处理AI代理无法处理的请求。HumanAgent 订阅主题类型 agent_topic_type 来接收消息,并发布到主题类型 user_topic_type 以向客户发送消息。

在本实现中,HumanAgent 仅使用控制台获取您的输入。在实际应用中,您可以按以下方式改进此设计:

  • handle_user_task 方法中,通过Teams或Slack等聊天应用发送通知

  • 聊天应用通过运行时将人工响应发布到 agent_topic_type 指定的主题

  • 创建另一个消息处理器来处理人工响应并将其发送回客户

class HumanAgent(RoutedAgent):
    def __init__(self, description: str, agent_topic_type: str, user_topic_type: str) -> None:
        super().__init__(description)
        self._agent_topic_type = agent_topic_type
        self._user_topic_type = user_topic_type

    @message_handler
    async def handle_user_task(self, message: UserTask, ctx: MessageContext) -> None:
        human_input = input("Human agent input: ")
        print(f"{'-'*80}\n{self.id.type}:\n{human_input}", flush=True)
        message.context.append(AssistantMessage(content=human_input, source=self.id.type))
        await self.publish_message(
            AgentResponse(context=message.context, reply_to_topic_type=self._agent_topic_type),
            topic_id=TopicId(self._user_topic_type, source=self.id.key),
        )

用户代理#

UserAgent 类是与聊天机器人对话的客户代理。它处理两种消息类型:UserLoginAgentResponse。 当 UserAgent 收到 UserLogin 消息时,它会与聊天机器人开始新会话,并向订阅 agent_topic_type 主题类型的AI代理发布 UserTask 消息。 当 UserAgent 收到 AgentResponse 消息时,它会向用户展示聊天机器人的响应。

在本实现中,UserAgent 使用控制台获取您的输入。在实际应用中,您可以使用上述 HumanAgent 部分描述的相同思路来改进人机交互。

class UserAgent(RoutedAgent):
    def __init__(self, description: str, user_topic_type: str, agent_topic_type: str) -> None:
        super().__init__(description)
        self._user_topic_type = user_topic_type
        self._agent_topic_type = agent_topic_type

    @message_handler
    async def handle_user_login(self, message: UserLogin, ctx: MessageContext) -> None:
        print(f"{'-'*80}\nUser login, session ID: {self.id.key}.", flush=True)
        # 获取用户登录后的初始输入。
        user_input = input("User: ")
        print(f"{'-'*80}\n{self.id.type}:\n{user_input}")
        await self.publish_message(
            UserTask(context=[UserMessage(content=user_input, source="User")]),
            topic_id=TopicId(self._agent_topic_type, source=self.id.key),
        )

    @message_handler
    async def handle_task_result(self, message: AgentResponse, ctx: MessageContext) -> None:
        # 获取用户在收到代理响应后的输入。
        user_input = input("User (type 'exit' to close the session): ")
        print(f"{'-'*80}\n{self.id.type}:\n{user_input}", flush=True)
        if user_input.strip().lower() == "exit":
            print(f"{'-'*80}\nUser session ended, session ID: {self.id.key}.")
            return
        message.context.append(UserMessage(content=user_input, source="User"))
        await self.publish_message(
            UserTask(context=message.context), topic_id=TopicId(message.reply_to_topic_type, source=self.id.key)
        )

AI代理的工具#

如果AI代理不需要将任务转交给其他代理,它们可以使用常规工具来完成任务。 我们通过简单函数定义工具,并使用 FunctionTool包装器创建这些工具。

def execute_order(product: str, price: int) -> str:
    print("\n\n=== Order Summary ===")
    print(f"Product: {product}")
    print(f"Price: ${price}")
    print("=================\n")
    confirm = input("Confirm order? y/n: ").strip().lower()
    if confirm == "y":
        print("Order execution successful!")
        return "Success"
    else:
        print("Order cancelled!")
        return "User cancelled order."


def look_up_item(search_query: str) -> str:
    item_id = "item_132612938"
    print("Found item:", item_id)
    return item_id


def execute_refund(item_id: str, reason: str = "not provided") -> str:
    print("\n\n=== Refund Summary ===")
    print(f"Item ID: {item_id}")
    print(f"Reason: {reason}")
    print("=================\n")
    print("Refund execution successful!")
    return "success"


execute_order_tool = FunctionTool(execute_order, description="Price should be in USD.")
look_up_item_tool = FunctionTool(
    look_up_item, description="Use to find item ID.\nSearch query can be a description or keywords."
)
execute_refund_tool = FunctionTool(execute_refund, description="")

代理的主题类型#

我们定义了每个代理将订阅的主题类型。 更多关于主题类型的信息,请参阅主题与订阅

sales_agent_topic_type = "SalesAgent"
issues_and_repairs_agent_topic_type = "IssuesAndRepairsAgent"
triage_agent_topic_type = "TriageAgent"
human_agent_topic_type = "HumanAgent"
user_topic_type = "User"

AI代理的委托工具#

除了常规工具外,AI代理还可以使用称为委托工具的特殊工具将任务委托给其他代理。 委托工具的概念仅在此设计模式中使用,这些委托工具同样被定义为简单函数。 在此设计模式中,我们将委托工具与常规工具区分开来, 因为当AI代理调用委托工具时,我们会将任务转移给另一个代理, 而不是继续使用同一代理中的模型生成响应。

def transfer_to_sales_agent() -> str:
    return sales_agent_topic_type


def transfer_to_issues_and_repairs() -> str:
    return issues_and_repairs_agent_topic_type


def transfer_back_to_triage() -> str:
    return triage_agent_topic_type


def escalate_to_human() -> str:
    return human_agent_topic_type


transfer_to_sales_agent_tool = FunctionTool(
    transfer_to_sales_agent, description="Use for anything sales or buying related."
)
transfer_to_issues_and_repairs_tool = FunctionTool(
    transfer_to_issues_and_repairs, description="Use for issues, repairs, or refunds."
)
transfer_back_to_triage_tool = FunctionTool(
    transfer_back_to_triage,
    description="Call this if the user brings up a topic outside of your purview,\nincluding escalating to human.",
)
escalate_to_human_tool = FunctionTool(escalate_to_human, description="Only call this if explicitly asked to.")

创建团队#

我们已经定义了AI智能体、人类代理、用户代理、工具和主题类型。 现在我们可以创建智能体团队了。

对于AI智能体,我们使用OpenAIChatCompletionClientgpt-4o-mini模型。

创建完智能体运行时后,我们通过提供 一个智能体类型和一个创建智能体实例的工厂方法来注册每个智能体。 运行时负责管理智能体生命周期,因此我们不需要 自己实例化智能体。 更多关于智能体运行时的内容请阅读智能体运行时环境 以及关于智能体生命周期的内容请阅读智能体身份与生命周期

在下面的代码中,你可以看到我们使用AIAgent类来定义分诊、销售、问题和修复智能体。 我们为每个智能体添加了常规工具和委托工具。 我们还为每个智能体添加了对主题类型的订阅。

runtime = SingleThreadedAgentRuntime()

model_client = OpenAIChatCompletionClient(
    model="gpt-4o-mini",
    # api_key="YOUR_API_KEY",
)

# 注册分诊智能体
triage_agent_type = await AIAgent.register(
    runtime,
    type=triage_agent_topic_type,  # 使用主题类型作为智能体类型
    factory=lambda: AIAgent(
        description="A triage agent.",
        system_message=SystemMessage(
            content="You are a customer service bot for ACME Inc. "
            "Introduce yourself. Always be very brief. "
            "Gather information to direct the customer to the right department. "
            "But make your questions subtle and natural."
        ),
        model_client=model_client,
        tools=[],
        delegate_tools=[
            transfer_to_issues_and_repairs_tool,
            transfer_to_sales_agent_tool,
            escalate_to_human_tool,
        ],
        agent_topic_type=triage_agent_topic_type,
        user_topic_type=user_topic_type,
    ),
)
# 为分诊智能体添加订阅:它只会接收发布到自身主题的消息
await runtime.add_subscription(TypeSubscription(topic_type=triage_agent_topic_type, agent_type=triage_agent_type.type))

# 注册销售代理。
sales_agent_type = await AIAgent.register(
    runtime,
    type=sales_agent_topic_type,  # 使用主题类型作为代理类型。
    factory=lambda: AIAgent(
        description="A sales agent.",
        system_message=SystemMessage(
            content="You are a sales agent for ACME Inc."
            "Always answer in a sentence or less."
            "Follow the following routine with the user:"
            "1. Ask them about any problems in their life related to catching roadrunners.\n"
            "2. Casually mention one of ACME's crazy made-up products can help.\n"
            " - Don't mention price.\n"
            "3. Once the user is bought in, drop a ridiculous price.\n"
            "4. Only after everything, and if the user says yes, "
            "tell them a crazy caveat and execute their order.\n"
            ""
        ),
        model_client=model_client,
        tools=[execute_order_tool],
        delegate_tools=[transfer_back_to_triage_tool],
        agent_topic_type=sales_agent_topic_type,
        user_topic_type=user_topic_type,
    ),
)
# 为销售代理添加订阅:它只会接收发布到自身主题的消息。
await runtime.add_subscription(TypeSubscription(topic_type=sales_agent_topic_type, agent_type=sales_agent_type.type))

# 注册问题和维修代理。
issues_and_repairs_agent_type = await AIAgent.register(
    runtime,
    type=issues_and_repairs_agent_topic_type,  # 使用主题类型作为代理类型。
    factory=lambda: AIAgent(
        description="An issues and repairs agent.",
        system_message=SystemMessage(
            content="You are a customer support agent for ACME Inc."
            "Always answer in a sentence or less."
            "Follow the following routine with the user:"
            "1. First, ask probing questions and understand the user's problem deeper.\n"
            " - unless the user has already provided a reason.\n"
            "2. Propose a fix (make one up).\n"
            "3. ONLY if not satisfied, offer a refund.\n"
            "4. If accepted, search for the ID and then execute refund."
        ),
        model_client=model_client,
        tools=[
            execute_refund_tool,
            look_up_item_tool,
        ],
        delegate_tools=[transfer_back_to_triage_tool],
        agent_topic_type=issues_and_repairs_agent_topic_type,
        user_topic_type=user_topic_type,
    ),
)
# 为问题和维修代理添加订阅:它只会接收发布到自身主题的消息。
await runtime.add_subscription(
    TypeSubscription(topic_type=issues_and_repairs_agent_topic_type, agent_type=issues_and_repairs_agent_type.type)
)

# 注册人工代理。
human_agent_type = await HumanAgent.register(
    runtime,
    type=human_agent_topic_type,  # 使用主题类型作为代理类型。
    factory=lambda: HumanAgent(
        description="A human agent.",
        agent_topic_type=human_agent_topic_type,
        user_topic_type=user_topic_type,
    ),
)
# 为人工代理添加订阅:它只会接收发布到自身主题的消息。
await runtime.add_subscription(TypeSubscription(topic_type=human_agent_topic_type, agent_type=human_agent_type.type))

# 注册用户代理。
user_agent_type = await UserAgent.register(
    runtime,
    type=user_topic_type,
    factory=lambda: UserAgent(
        description="A user agent.",
        user_topic_type=user_topic_type,
        agent_topic_type=triage_agent_topic_type,  # 从分流代理开始。
    ),
)
# 为用户代理添加订阅:它将只接收发布到其专属主题的消息。
await runtime.add_subscription(TypeSubscription(topic_type=user_topic_type, agent_type=user_agent_type.type))

运行团队#

最后,我们可以启动运行时并通过发布一个 UserLogin 消息来模拟用户会话。 该消息会被发布到主题ID,其类型设为 user_topic_type, 来源设为唯一的 session_id。 这个 session_id 将用于创建该用户会话中的所有主题ID,同时也用于创建 该用户会话中所有代理的代理ID。 要了解更多关于主题ID和代理ID的创建方式,请阅读 代理身份与生命周期主题与订阅

# 启动运行时。
runtime.start()

# 为用户创建新会话。
session_id = str(uuid.uuid4())
await runtime.publish_message(UserLogin(), topic_id=TopicId(user_topic_type, source=session_id))

# 运行直至完成。
await runtime.stop_when_idle()
await model_client.close()
--------------------------------------------------------------------------------
User login, session ID: 7a568cf5-13e7-4e81-8616-8265a01b3f2b.
--------------------------------------------------------------------------------
User:
I want a refund
--------------------------------------------------------------------------------
TriageAgent:
I can help with that! Could I ask what item you're seeking a refund for?
--------------------------------------------------------------------------------
User:
A pair of shoes I bought
--------------------------------------------------------------------------------
TriageAgent:
[FunctionCall(id='call_qPx1DXDL2NLcHs8QNo47egsJ', arguments='{}', name='transfer_to_issues_and_repairs')]
--------------------------------------------------------------------------------
TriageAgent:
Delegating to IssuesAndRepairsAgent
--------------------------------------------------------------------------------
IssuesAndRepairsAgent:
I see you're looking for a refund on a pair of shoes. Can you tell me what the issue is with the shoes?
--------------------------------------------------------------------------------
User:
The shoes are too small
--------------------------------------------------------------------------------
IssuesAndRepairsAgent:
I recommend trying a size up as a fix; would that work for you?
--------------------------------------------------------------------------------
User:
no I want a refund
--------------------------------------------------------------------------------
IssuesAndRepairsAgent:
[FunctionCall(id='call_Ytp8VUQRyKFNEU36mLE6Dkrp', arguments='{"search_query":"shoes"}', name='look_up_item')]
--------------------------------------------------------------------------------
IssuesAndRepairsAgent:
[FunctionExecutionResult(content='item_132612938', call_id='call_Ytp8VUQRyKFNEU36mLE6Dkrp')]
--------------------------------------------------------------------------------
IssuesAndRepairsAgent:
[FunctionCall(id='call_bPm6EKKBy5GJ65s9OKt9b1uE', arguments='{"item_id":"item_132612938","reason":"not provided"}', name='execute_refund')]
--------------------------------------------------------------------------------
IssuesAndRepairsAgent:
[FunctionExecutionResult(content='success', call_id='call_bPm6EKKBy5GJ65s9OKt9b1uE')]
--------------------------------------------------------------------------------
IssuesAndRepairsAgent:
Your refund has been successfully processed! If you have any other questions, feel free to ask.
--------------------------------------------------------------------------------
User:
I want to talk to your manager
--------------------------------------------------------------------------------
IssuesAndRepairsAgent:
I can help with that, let me transfer you to a supervisor.
--------------------------------------------------------------------------------
User:
Okay
--------------------------------------------------------------------------------
IssuesAndRepairsAgent:
[FunctionCall(id='call_PpmLZvwNoiDPUH8Tva3eAwHX', arguments='{}', name='transfer_back_to_triage')]
--------------------------------------------------------------------------------
IssuesAndRepairsAgent:
Delegating to TriageAgent
--------------------------------------------------------------------------------
TriageAgent:
[FunctionCall(id='call_jSL6IBm5537Dr74UbJSxaj6I', arguments='{}', name='escalate_to_human')]
--------------------------------------------------------------------------------
TriageAgent:
Delegating to HumanAgent
--------------------------------------------------------------------------------
HumanAgent:
Hello this is manager
--------------------------------------------------------------------------------
User:
Hi! Thanks for your service. I give you 5 stars!
--------------------------------------------------------------------------------
HumanAgent:
Thanks.
--------------------------------------------------------------------------------
User:
exit
--------------------------------------------------------------------------------
User session ended, session ID: 7a568cf5-13e7-4e81-8616-8265a01b3f2b.

后续步骤#

本笔记本演示了如何使用AutoGen Core实现交接模式。 您可以通过添加更多代理和工具来持续改进此设计, 或为用户代理和人类代理创建更好的用户界面。

欢迎您在我们的社区论坛上分享您的工作。