autogen_agentchat.teams#

本模块提供了多种预定义多智能体团队的实现。 每个团队都继承自 BaseGroupChat 类。

class BaseGroupChat(participants: List[ChatAgent], group_chat_manager_name: str, group_chat_manager_class: type[SequentialRoutedAgent], termination_condition: TerminationCondition | None = None, max_turns: int | None = None, runtime: AgentRuntime | None = None, custom_message_types: List[type[BaseAgentEvent | BaseChatMessage]] | None = None, emit_team_events: bool = False)[源代码]#

基类:Team, ABC, ComponentBase[BaseModel]

群组聊天团队的基础类。

要实现一个群组聊天团队,首先创建 BaseGroupChatManager 的子类,然后 创建使用该群组聊天管理器的 BaseGroupChat 的子类。

component_type: ClassVar[ComponentType] = 'team'#

组件的逻辑类型。

async load_state(state: Mapping[str, Any]) None[源代码]#

加载外部状态并覆盖当前群聊团队的状态。

通过调用每个参与者和群聊管理器的 agent_load_state() 方法(使用其内部代理ID)来加载状态。 关于状态的预期格式,请参阅 save_state() 方法。

async pause() None[源代码]#

在团队运行时通过直接RPC调用参与者的 on_pause() 方法来暂停它们。

注意

这是v0.4.9引入的实验性功能,未来可能会变更或移除。

团队必须初始化后才能暂停。

与终止不同,暂停团队不会导致 run()run_stream() 方法返回。它会调用每个参与者的 on_pause() 方法,如果参与者未实现该方法, 则不会有任何操作。

备注

代理类需要负责处理暂停操作,并确保代理后续可以恢复。 请确保在你的代理类中实现 on_pause() 方法来自定义暂停行为。 默认情况下,代理被调用时不会执行任何操作。

抛出:

RuntimeError -- 如果团队未初始化。来自参与者的异常当调用它们的 on_pause 实现时 会传播到该方法并抛出。

async reset() None[源代码]#

将团队及其参与者重置到初始状态。

团队必须停止后才能进行重置。

抛出:

RuntimeError -- 如果团队未初始化或当前正在运行。

使用 RoundRobinGroupChat 团队的示例:

import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent1 = AssistantAgent("Assistant1", model_client=model_client)
    agent2 = AssistantAgent("Assistant2", model_client=model_client)
    termination = MaxMessageTermination(3)
    team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)
    stream = team.run_stream(task="Count from 1 to 10, respond one at a time.")
    async for message in stream:
        print(message)

    # Reset the team.
    await team.reset()
    stream = team.run_stream(task="Count from 1 to 10, respond one at a time.")
    async for message in stream:
        print(message)


asyncio.run(main())
async resume() None[源代码]#

在团队运行且暂停时通过直接RPC调用参与者的 on_resume() 方法来恢复它们。

注意

这是v0.4.9引入的实验性功能,未来可能会变更或移除。

团队必须初始化后才能恢复。

与终止和用新任务重启不同,恢复团队不会导致 run()run_stream() 方法返回。它会调用每个参与者的 on_resume() 方法,如果参与者未实现该方法, 则不会有任何操作。

备注

代理类需要负责处理恢复操作,并确保代理从暂停处继续执行。 请确保在你的代理类中实现 on_resume() 方法来自定义恢复行为。

抛出:

RuntimeError -- 如果团队未初始化。来自参与者的异常当调用它们的 on_resume 方法实现时 会传播到该方法并抛出。

async run(*, task: str | BaseChatMessage | Sequence[BaseChatMessage] | None = None, cancellation_token: CancellationToken | None = None) TaskResult[源代码]#

运行团队并返回结果。基础实现使用 run_stream() 运行团队,然后返回最终结果。 团队停止后,终止条件会被重置。

参数:
  • task (str | BaseChatMessage | Sequence[BaseChatMessage] | None) -- 运行团队的任务。可以是字符串、单个 BaseChatMessageBaseChatMessage 列表。

  • cancellation_token (CancellationToken | None) -- 用于立即终止任务的取消令牌。 设置取消令牌可能导致团队处于不一致状态, 并且可能不会重置终止条件。 要优雅地停止团队,请改用 ExternalTermination

Returns:

result -- 任务结果,类型为 TaskResult。结果包含团队产生的消息和停止原因。

使用 RoundRobinGroupChat 团队的示例:

import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent1 = AssistantAgent("Assistant1", model_client=model_client)
    agent2 = AssistantAgent("Assistant2", model_client=model_client)
    termination = MaxMessageTermination(3)
    team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)

    result = await team.run(task="Count from 1 to 10, respond one at a time.")
    print(result)

    # 再次运行团队时不带任务以继续之前的任务。
    result = await team.run()
    print(result)


asyncio.run(main())

使用 CancellationToken 取消任务的示例:

import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_core import CancellationToken
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent1 = AssistantAgent("Assistant1", model_client=model_client)
    agent2 = AssistantAgent("Assistant2", model_client=model_client)
    termination = MaxMessageTermination(3)
    team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)

    cancellation_token = CancellationToken()

    # 创建后台运行团队的任务。
    run_task = asyncio.create_task(
        team.run(
            task="Count from 1 to 10, respond one at a time.",
            cancellation_token=cancellation_token,
        )
    )

    # 等待1秒后取消任务。
    await asyncio.sleep(1)
    cancellation_token.cancel()

    # 这将引发取消错误。
    await run_task


asyncio.run(main())
async run_stream(*, task: str | BaseChatMessage | Sequence[BaseChatMessage] | None = None, cancellation_token: CancellationToken | None = None) AsyncGenerator[BaseAgentEvent | BaseChatMessage | TaskResult, None][源代码]#

运行团队并生成消息流和最终结果, 最终结果的类型为 TaskResult,作为流的最后一项。 团队停止后,终止条件会被重置。

备注

如果代理产生 ModelClientStreamingChunkEvent, 该消息将在流中产生,但不会包含在 messages 中。

参数:
  • task (str | BaseChatMessage | Sequence[BaseChatMessage] | None) -- 运行团队的任务。可以是字符串、单个 BaseChatMessageBaseChatMessage 列表。

  • cancellation_token (CancellationToken | None) -- 用于立即终止任务的取消令牌。 设置取消令牌可能导致团队处于不一致状态, 并且可能不会重置终止条件。 要优雅地停止团队,请改用 ExternalTermination

Returns:

stream -- 一个 AsyncGenerator,产生 BaseAgentEventBaseChatMessage 以及最终的 TaskResult 结果作为流的最后一项。

使用 RoundRobinGroupChat 团队的示例:

import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent1 = AssistantAgent("Assistant1", model_client=model_client)
    agent2 = AssistantAgent("Assistant2", model_client=model_client)
    termination = MaxMessageTermination(3)
    team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)

    stream = team.run_stream(task="Count from 1 to 10, respond one at a time.")
    async for message in stream:
        print(message)

    # 再次运行团队时不带任务以继续之前的任务。
    stream = team.run_stream()
    async for message in stream:
        print(message)


asyncio.run(main())

使用 CancellationToken 取消任务的示例:

import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.ui import Console
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_core import CancellationToken
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent1 = AssistantAgent("Assistant1", model_client=model_client)
    agent2 = AssistantAgent("Assistant2", model_client=model_client)
    termination = MaxMessageTermination(3)
    team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)

    cancellation_token = CancellationToken()

    # 创建后台运行团队的任务。
    run_task = asyncio.create_task(
        Console(
            team.run_stream(
                task="Count from 1 to 10, respond one at a time.",
                cancellation_token=cancellation_token,
            )
        )
    )

    # 等待1秒后取消任务。
    await asyncio.sleep(1)
    cancellation_token.cancel()

    # 这将引发取消错误。
    await run_task


asyncio.run(main())
async save_state() Mapping[str, Any][源代码]#

保存群聊团队的状态。

通过调用每个参与者和群聊管理器的 agent_save_state() 方法(使用其内部代理ID)来保存状态。 状态以嵌套字典形式返回:一个包含 agent_states 键的字典,该键对应的值是另一个字典,其中代理名称作为键,状态作为值。

{
    "agent_states": {
        "agent1": ...,
        "agent2": ...,
        "RoundRobinGroupChatManager": ...
    }
}

备注

从 v0.4.9 版本开始,状态使用代理名称作为键而非代理ID,并且 team_id 字段已从状态中移除。 这使得状态可以在不同团队和运行时之间移植。以旧格式保存的状态未来可能与新格式不兼容。

小心

在团队运行时调用 save_state() 可能导致状态不一致并产生意外结果。 建议在团队未运行或停止后调用此方法。

pydantic model DiGraph[源代码]#

基类:BaseModel

定义了一个包含节点和边的有向图结构。 GraphFlow 使用此结构来确定执行顺序和条件。

警告

这是一个实验性功能,API 将在未来版本中发生变化。

Show JSON schema
{
   "title": "DiGraph",
   "description": "\u5b9a\u4e49\u4e86\u4e00\u4e2a\u5305\u542b\u8282\u70b9\u548c\u8fb9\u7684\u6709\u5411\u56fe\u7ed3\u6784\u3002\n:class:`GraphFlow` \u4f7f\u7528\u6b64\u7ed3\u6784\u6765\u786e\u5b9a\u6267\u884c\u987a\u5e8f\u548c\u6761\u4ef6\u3002\n\n.. warning::\n\n    \u8fd9\u662f\u4e00\u4e2a\u5b9e\u9a8c\u6027\u529f\u80fd\uff0cAPI \u5c06\u5728\u672a\u6765\u7248\u672c\u4e2d\u53d1\u751f\u53d8\u5316\u3002",
   "type": "object",
   "properties": {
      "nodes": {
         "additionalProperties": {
            "$ref": "#/$defs/DiGraphNode"
         },
         "title": "Nodes",
         "type": "object"
      },
      "default_start_node": {
         "anyOf": [
            {
               "type": "string"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "title": "Default Start Node"
      }
   },
   "$defs": {
      "DiGraphEdge": {
         "description": "\u8868\u793a :class:`DiGraph` \u4e2d\u7684\u6709\u5411\u8fb9\uff0c\u5e26\u6709\u53ef\u9009\u7684\u6267\u884c\u6761\u4ef6\u3002\n\n.. warning::\n\n    \u8fd9\u662f\u4e00\u4e2a\u5b9e\u9a8c\u6027\u529f\u80fd\uff0cAPI \u5c06\u5728\u672a\u6765\u7248\u672c\u4e2d\u53d8\u66f4\u3002",
         "properties": {
            "target": {
               "title": "Target",
               "type": "string"
            },
            "condition": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "title": "Condition"
            }
         },
         "required": [
            "target"
         ],
         "title": "DiGraphEdge",
         "type": "object"
      },
      "DiGraphNode": {
         "description": "\u8868\u793a :class:`DiGraph` \u4e2d\u7684\u8282\u70b9(\u4ee3\u7406)\uff0c\u5305\u542b\u5176\u51fa\u8fb9\u548c\u6fc0\u6d3b\u7c7b\u578b\u3002\n\n.. warning::\n\n    \u8fd9\u662f\u4e00\u4e2a\u5b9e\u9a8c\u6027\u529f\u80fd\uff0cAPI \u5c06\u5728\u672a\u6765\u7248\u672c\u4e2d\u53d8\u66f4\u3002",
         "properties": {
            "name": {
               "title": "Name",
               "type": "string"
            },
            "edges": {
               "default": [],
               "items": {
                  "$ref": "#/$defs/DiGraphEdge"
               },
               "title": "Edges",
               "type": "array"
            },
            "activation": {
               "default": "all",
               "enum": [
                  "all",
                  "any"
               ],
               "title": "Activation",
               "type": "string"
            }
         },
         "required": [
            "name"
         ],
         "title": "DiGraphNode",
         "type": "object"
      }
   },
   "required": [
      "nodes"
   ]
}

Fields:
  • default_start_node (str | None)

  • nodes (Dict[str, autogen_agentchat.teams._group_chat._graph._digraph_group_chat.DiGraphNode])

field default_start_node: str | None = None#
field nodes: Dict[str, DiGraphNode] [Required]#
get_has_cycles() bool[源代码]#

指示图中是否至少存在一个循环(具有有效的退出条件)。

get_leaf_nodes() Set[str][源代码]#

返回没有出边的节点(最终输出节点)。

get_parents() Dict[str, List[str]][源代码]#

计算每个节点到其父节点的映射关系。

get_start_nodes() Set[str][源代码]#

返回没有入边的节点(入口点)。

graph_validate() None[源代码]#

验证图结构和执行规则。

has_cycles_with_exit() bool[源代码]#

检查图中是否存在环,并验证每个环至少有一条条件边。

返回:
bool: 如果存在至少一个环且所有环都有退出条件,则返回True。

如果不存在环,则返回False。

抛出:

ValueError: 如果存在没有任何条件边的环。

model_post_init(context: Any, /) None#

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

参数:
  • self -- The BaseModel instance.

  • context -- The context.

class DiGraphBuilder[源代码]#

基类:object

用于构建 GraphFlow 中使用的 DiGraph 执行图的流式构建器。

警告

这是实验性功能,API 将在未来版本中变更。

该工具提供了一种便捷的编程方式来构建智能体交互图,支持包括以下复杂执行流程:

  • 顺序链式执行

  • 并行扇出

  • 条件分支

  • 带安全出口的循环

图中每个节点代表一个智能体。边定义了智能体间的执行路径,并可选择性地基于消息内容进行条件控制。

该构建器兼容 Graph 运行器,同时支持标准智能体和过滤型智能体。

方法说明:
  • add_node(agent, activation): 向图中添加智能体节点

  • add_edge(source, target, condition): 连接两个节点,可选添加条件

  • add_conditional_edges(source, condition_to_target): 从源节点添加多个条件边

  • set_entry_point(agent): 设置默认起始节点(可选)

  • build(): 生成验证过的 DiGraph

  • get_participants(): 返回已添加的智能体列表

示例 — 顺序流 A → B → C:
>>> builder = GraphBuilder()
>>> builder.add_node(agent_a).add_node(agent_b).add_node(agent_c)
>>> builder.add_edge(agent_a, agent_b).add_edge(agent_b, agent_c)
>>> team = Graph(
...     participants=builder.get_participants(),
...     graph=builder.build(),
...     termination_condition=MaxMessageTermination(5),
... )
示例 — 并行扇出 A → (B, C):
>>> builder = GraphBuilder()
>>> builder.add_node(agent_a).add_node(agent_b).add_node(agent_c)
>>> builder.add_edge(agent_a, agent_b).add_edge(agent_a, agent_c)
示例 — 条件分支 A → B ("yes"), A → C ("no"):
>>> builder = GraphBuilder()
>>> builder.add_node(agent_a).add_node(agent_b).add_node(agent_c)
>>> builder.add_conditional_edges(agent_a, {"yes": agent_b, "no": agent_c})
示例 — 循环: A → B → A ("loop"), B → C ("exit"):
>>> builder = GraphBuilder()
>>> builder.add_node(agent_a).add_node(agent_b).add_node(agent_c)
>>> builder.add_edge(agent_a, agent_b)
>>> builder.add_conditional_edges(agent_b, {"loop": agent_a, "exit": agent_c})
add_conditional_edges(source: str | ChatAgent, condition_to_target: Dict[str, str | ChatAgent]) DiGraphBuilder[源代码]#

根据条件字符串从源节点添加多条条件边。

add_edge(source: str | ChatAgent, target: str | ChatAgent, condition: str | None = None) DiGraphBuilder[源代码]#

添加从源节点到目标节点的有向边,可选择性地添加条件。

add_node(agent: ChatAgent, activation: Literal['all', 'any'] = 'all') DiGraphBuilder[源代码]#

向图中添加节点并注册其智能体。

build() DiGraph[源代码]#

构建并验证有向图。

get_participants() list[ChatAgent][源代码]#

返回构建器中按插入顺序排列的代理列表。

set_entry_point(name: str | ChatAgent) DiGraphBuilder[源代码]#

设置图的默认起始节点。

pydantic model DiGraphEdge[源代码]#

基类:BaseModel

表示 DiGraph 中的有向边,带有可选的执行条件。

警告

这是一个实验性功能,API 将在未来版本中变更。

Show JSON schema
{
   "title": "DiGraphEdge",
   "description": "\u8868\u793a :class:`DiGraph` \u4e2d\u7684\u6709\u5411\u8fb9\uff0c\u5e26\u6709\u53ef\u9009\u7684\u6267\u884c\u6761\u4ef6\u3002\n\n.. warning::\n\n    \u8fd9\u662f\u4e00\u4e2a\u5b9e\u9a8c\u6027\u529f\u80fd\uff0cAPI \u5c06\u5728\u672a\u6765\u7248\u672c\u4e2d\u53d8\u66f4\u3002",
   "type": "object",
   "properties": {
      "target": {
         "title": "Target",
         "type": "string"
      },
      "condition": {
         "anyOf": [
            {
               "type": "string"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "title": "Condition"
      }
   },
   "required": [
      "target"
   ]
}

Fields:
  • condition (str | None)

  • target (str)

field condition: str | None = None#

(实验性) 执行该边的条件。 如果为 None,则该边是无条件的。如果是字符串,则该边取决于最后一条代理聊天消息中是否包含该字符串。 注意:这是一个实验性功能,未来版本将会变更以允许更好地指定分支条件,类似于 TerminationCondition 类。

field target: str [Required]#
pydantic model DiGraphNode[源代码]#

基类:BaseModel

表示 DiGraph 中的节点(代理),包含其出边和激活类型。

警告

这是一个实验性功能,API 将在未来版本中变更。

Show JSON schema
{
   "title": "DiGraphNode",
   "description": "\u8868\u793a :class:`DiGraph` \u4e2d\u7684\u8282\u70b9(\u4ee3\u7406)\uff0c\u5305\u542b\u5176\u51fa\u8fb9\u548c\u6fc0\u6d3b\u7c7b\u578b\u3002\n\n.. warning::\n\n    \u8fd9\u662f\u4e00\u4e2a\u5b9e\u9a8c\u6027\u529f\u80fd\uff0cAPI \u5c06\u5728\u672a\u6765\u7248\u672c\u4e2d\u53d8\u66f4\u3002",
   "type": "object",
   "properties": {
      "name": {
         "title": "Name",
         "type": "string"
      },
      "edges": {
         "default": [],
         "items": {
            "$ref": "#/$defs/DiGraphEdge"
         },
         "title": "Edges",
         "type": "array"
      },
      "activation": {
         "default": "all",
         "enum": [
            "all",
            "any"
         ],
         "title": "Activation",
         "type": "string"
      }
   },
   "$defs": {
      "DiGraphEdge": {
         "description": "\u8868\u793a :class:`DiGraph` \u4e2d\u7684\u6709\u5411\u8fb9\uff0c\u5e26\u6709\u53ef\u9009\u7684\u6267\u884c\u6761\u4ef6\u3002\n\n.. warning::\n\n    \u8fd9\u662f\u4e00\u4e2a\u5b9e\u9a8c\u6027\u529f\u80fd\uff0cAPI \u5c06\u5728\u672a\u6765\u7248\u672c\u4e2d\u53d8\u66f4\u3002",
         "properties": {
            "target": {
               "title": "Target",
               "type": "string"
            },
            "condition": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "title": "Condition"
            }
         },
         "required": [
            "target"
         ],
         "title": "DiGraphEdge",
         "type": "object"
      }
   },
   "required": [
      "name"
   ]
}

Fields:
  • activation (Literal['all', 'any'])

  • edges (List[autogen_agentchat.teams._group_chat._graph._digraph_group_chat.DiGraphEdge])

  • name (str)

field activation: Literal['all', 'any'] = 'all'#
field edges: List[DiGraphEdge] = []#
field name: str [Required]#
class GraphFlow(participants: List[ChatAgent], graph: DiGraph, termination_condition: TerminationCondition | None = None, max_turns: int | None = None, runtime: AgentRuntime | None = None, custom_message_types: List[type[BaseAgentEvent | BaseChatMessage]] | None = None)[源代码]#

基类:BaseGroupChat, Component[GraphFlowConfig]

一个按照有向图执行模式运行的群聊团队。

警告

这是一个实验性功能,API 将在未来版本中变更。

该群聊基于有向图 (DiGraph) 结构执行代理,支持复杂工作流,包括顺序执行、并行分发、 条件分支、合并模式以及带有显式退出条件的循环。

执行顺序由 DiGraph 中定义的边决定。图中每个节点对应一个代理,边定义代理间的消息流向。 节点可配置为在以下情况下激活:

  • **所有**父节点完成时 (activation="all") → 默认

  • **任一**父节点完成时 (activation="any")

通过边条件支持条件分支,根据聊天历史内容选择下一个代理。只要存在最终退出循环的条件, 就允许循环结构。

备注

使用 DiGraphBuilder 类可轻松创建 DiGraph。它提供了流畅的 API 用于添加节点和边、设置入口点以及验证图结构。 详见 DiGraphBuilder 文档。 GraphFlow 类设计用于与 DiGraphBuilder 配合创建复杂工作流。

参数:
  • participants (List[ChatAgent]) -- 群聊中的参与者列表。

  • termination_condition (TerminationCondition, optional) -- 聊天终止条件。

  • max_turns (int, optional) -- 强制终止前的最大轮次。

  • graph (DiGraph) -- 定义节点流向和条件的有向执行图。

抛出:

ValueError -- 如果参与者名称不唯一,或图验证失败(如存在无退出条件的循环)。

示例

顺序流: A → B → C

import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main():
    # Initialize agents with OpenAI model clients.
    model_client = OpenAIChatCompletionClient(model="gpt-4.1-nano")
    agent_a = AssistantAgent("A", model_client=model_client, system_message="You are a helpful assistant.")
    agent_b = AssistantAgent("B", model_client=model_client, system_message="Translate input to Chinese.")
    agent_c = AssistantAgent("C", model_client=model_client, system_message="Translate input to English.")

    # Create a directed graph with sequential flow A -> B -> C.
    builder = DiGraphBuilder()
    builder.add_node(agent_a).add_node(agent_b).add_node(agent_c)
    builder.add_edge(agent_a, agent_b).add_edge(agent_b, agent_c)
    graph = builder.build()

    # Create a GraphFlow team with the directed graph.
    team = GraphFlow(
        participants=[agent_a, agent_b, agent_c],
        graph=graph,
        termination_condition=MaxMessageTermination(5),
    )

    # Run the team and print the events.
    async for event in team.run_stream(task="Write a short story about a cat."):
        print(event)


asyncio.run(main())

并行分发: A → (B, C)

import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main():
    # Initialize agents with OpenAI model clients.
    model_client = OpenAIChatCompletionClient(model="gpt-4.1-nano")
    agent_a = AssistantAgent("A", model_client=model_client, system_message="You are a helpful assistant.")
    agent_b = AssistantAgent("B", model_client=model_client, system_message="Translate input to Chinese.")
    agent_c = AssistantAgent("C", model_client=model_client, system_message="Translate input to Japanese.")

    # Create a directed graph with fan-out flow A -> (B, C).
    builder = DiGraphBuilder()
    builder.add_node(agent_a).add_node(agent_b).add_node(agent_c)
    builder.add_edge(agent_a, agent_b).add_edge(agent_a, agent_c)
    graph = builder.build()

    # Create a GraphFlow team with the directed graph.
    team = GraphFlow(
        participants=[agent_a, agent_b, agent_c],
        graph=graph,
        termination_condition=MaxMessageTermination(5),
    )

    # Run the team and print the events.
    async for event in team.run_stream(task="Write a short story about a cat."):
        print(event)


asyncio.run(main())

条件分支: A → B (若'yes') 或 C (若'no')

import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main():
    # Initialize agents with OpenAI model clients.
    model_client = OpenAIChatCompletionClient(model="gpt-4.1-nano")
    agent_a = AssistantAgent(
        "A",
        model_client=model_client,
        system_message="Detect if the input is in Chinese. If it is, say 'yes', else say 'no', and nothing else.",
    )
    agent_b = AssistantAgent("B", model_client=model_client, system_message="Translate input to English.")
    agent_c = AssistantAgent("C", model_client=model_client, system_message="Translate input to Chinese.")

    # Create a directed graph with conditional branching flow A -> B ("yes"), A -> C ("no").
    builder = DiGraphBuilder()
    builder.add_node(agent_a).add_node(agent_b).add_node(agent_c)
    builder.add_edge(agent_a, agent_b, condition="yes")
    builder.add_edge(agent_a, agent_c, condition="no")
    graph = builder.build()

    # Create a GraphFlow team with the directed graph.
    team = GraphFlow(
        participants=[agent_a, agent_b, agent_c],
        graph=graph,
        termination_condition=MaxMessageTermination(5),
    )

    # Run the team and print the events.
    async for event in team.run_stream(task="AutoGen is a framework for building AI agents."):
        print(event)


asyncio.run(main())

带退出条件的循环: A → B → C (若'APPROVE') 或 A (若'REJECT')

import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main():
    # Initialize agents with OpenAI model clients.
    model_client = OpenAIChatCompletionClient(model="gpt-4.1")
    agent_a = AssistantAgent(
        "A",
        model_client=model_client,
        system_message="You are a helpful assistant.",
    )
    agent_b = AssistantAgent(
        "B",
        model_client=model_client,
        system_message="Provide feedback on the input, if your feedback has been addressed, "
        "say 'APPROVE', else say 'REJECT' and provide a reason.",
    )
    agent_c = AssistantAgent(
        "C", model_client=model_client, system_message="Translate the final product to Korean."
    )

    # Create a loop graph with conditional exit: A -> B -> C ("APPROVE"), B -> A ("REJECT").
    builder = DiGraphBuilder()
    builder.add_node(agent_a).add_node(agent_b).add_node(agent_c)
    builder.add_edge(agent_a, agent_b)
    builder.add_conditional_edges(agent_b, {"APPROVE": agent_c, "REJECT": agent_a})
    builder.set_entry_point(agent_a)
    graph = builder.build()

    # Create a GraphFlow team with the directed graph.
    team = GraphFlow(
        participants=[agent_a, agent_b, agent_c],
        graph=graph,
        termination_condition=MaxMessageTermination(20),  # Max 20 messages to avoid infinite loop.
    )

    # Run the team and print the events.
    async for event in team.run_stream(task="Write a short poem about AI Agents."):
        print(event)


asyncio.run(main())
component_config_schema#

GraphFlowConfig 的别名

component_provider_override: ClassVar[str | None] = 'autogen_agentchat.teams.GraphFlow'#

覆盖组件的provider字符串。这应该用于防止内部模块名称成为模块名称的一部分。

class MagenticOneGroupChat(participants: List[ChatAgent], model_client: ChatCompletionClient, *, termination_condition: TerminationCondition | None = None, max_turns: int | None = 20, runtime: AgentRuntime | None = None, max_stalls: int = 3, final_answer_prompt: str = ORCHESTRATOR_FINAL_ANSWER_PROMPT, custom_message_types: List[type[BaseAgentEvent | BaseChatMessage]] | None = None, emit_team_events: bool = False)[源代码]#

基类:BaseGroupChat, Component[MagenticOneGroupChatConfig]

一个由 MagenticOneOrchestrator 管理参与者并运行群聊的团队。

该协调器负责处理对话流程,通过管理参与者间的交互确保任务高效完成。

协调器基于 Magentic-One 架构,这是一个用于解决复杂任务的通用多智能体系统(参见下方参考文献)。

参数:
  • participants (List[ChatAgent]) -- 群聊中的参与者列表。

  • model_client (ChatCompletionClient) -- 用于生成响应的模型客户端。

  • termination_condition (TerminationCondition, optional) -- 群聊的终止条件。默认为 None。 若无终止条件,群聊将根据协调器逻辑运行或直到达到最大轮数。

  • max_turns (int, optional) -- 群聊停止前的最大对话轮数。默认为 20。

  • max_stalls (int, optional) -- 重新规划前允许的最大停滞次数。默认为 3。

  • final_answer_prompt (str, optional) -- 用于从团队对话记录生成最终答案的 LLM 提示语。已提供默认值(适用于 GPT-4o 级别模型)。

  • custom_message_types (List[type[BaseAgentEvent | BaseChatMessage]], optional) -- 将在群聊中使用的自定义消息类型列表。 若使用自定义消息类型或您的智能体会产生自定义消息类型,需在此指定。 请确保自定义消息类型是 BaseAgentEventBaseChatMessage 的子类。

  • emit_team_events (bool, optional) -- 是否通过 BaseGroupChat.run_stream() 发出团队事件。默认为 False。

抛出:

ValueError -- 当协调逻辑中进度记录缺失必要字段或下一发言者无效时抛出。

Examples:

含单个助手智能体的 MagenticOneGroupChat:

import asyncio
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import MagenticOneGroupChat
from autogen_agentchat.ui import Console


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    assistant = AssistantAgent(
        "Assistant",
        model_client=model_client,
    )
    team = MagenticOneGroupChat([assistant], model_client=model_client)
    await Console(team.run_stream(task="Provide a different proof to Fermat last theorem"))


asyncio.run(main())

引用

若在研究中使用了 MagenticOneGroupChat,请引用以下论文:

@article{fourney2024magentic,
    title={Magentic-one: A generalist multi-agent system for solving complex tasks},
    author={Fourney, Adam and Bansal, Gagan and Mozannar, Hussein and Tan, Cheng and Salinas, Eduardo and Niedtner, Friederike and Proebsting, Grace and Bassman, Griffin and Gerrits, Jack and Alber, Jacob and others},
    journal={arXiv preprint arXiv:2411.04468},
    year={2024}
}
classmethod _from_config(config: MagenticOneGroupChatConfig) Self[源代码]#

从配置对象创建组件的新实例。

参数:

config (T) -- 配置对象。

Returns:

Self -- 组件的新实例。

_to_config() MagenticOneGroupChatConfig[源代码]#

导出当前组件实例的配置,该配置可用于创建具有相同配置的新组件实例。

Returns:

T -- 组件的配置。

component_config_schema#

MagenticOneGroupChatConfig 的别名

component_provider_override: ClassVar[str | None] = 'autogen_agentchat.teams.MagenticOneGroupChat'#

覆盖组件的provider字符串。这应该用于防止内部模块名称成为模块名称的一部分。

class RoundRobinGroupChat(participants: List[ChatAgent], termination_condition: TerminationCondition | None = None, max_turns: int | None = None, runtime: AgentRuntime | None = None, custom_message_types: List[type[BaseAgentEvent | BaseChatMessage]] | None = None, emit_team_events: bool = False)[源代码]#

基类:BaseGroupChat, Component[RoundRobinGroupChatConfig]

一个运行群聊的团队,参与者以轮询方式轮流向所有人发布消息。

如果团队中只有一个参与者,该参与者将是唯一的发言者。

参数:
  • participants (List[BaseChatAgent]) -- 群聊中的参与者列表。

  • termination_condition (TerminationCondition, optional) -- 群聊的终止条件。默认为 None。 如果没有终止条件,群聊将无限期运行。

  • max_turns (int, optional) -- 群聊停止前的最大轮次。默认为 None,表示无限制。

  • custom_message_types (List[type[BaseAgentEvent | BaseChatMessage]], optional) -- 群聊中使用的自定义消息类型列表。 如果使用自定义消息类型或您的代理生成了自定义消息类型,需要在此指定。 确保您的自定义消息类型是 BaseAgentEventBaseChatMessage 的子类。

  • emit_team_events (bool, optional) -- 是否通过 BaseGroupChat.run_stream() 方法发出团队事件。默认为 False。

抛出:

ValueError -- 如果未提供参与者或参与者名称不唯一。

Examples:

包含一个带工具参与者的团队:

import asyncio
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.ui import Console


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    async def get_weather(location: str) -> str:
        return f"The weather in {location} is sunny."

    assistant = AssistantAgent(
        "Assistant",
        model_client=model_client,
        tools=[get_weather],
    )
    termination = TextMentionTermination("TERMINATE")
    team = RoundRobinGroupChat([assistant], termination_condition=termination)
    await Console(team.run_stream(task="What's the weather in New York?"))


asyncio.run(main())

包含多个参与者的团队:

import asyncio
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.ui import Console


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent1 = AssistantAgent("Assistant1", model_client=model_client)
    agent2 = AssistantAgent("Assistant2", model_client=model_client)
    termination = TextMentionTermination("TERMINATE")
    team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)
    await Console(team.run_stream(task="Tell me some jokes."))


asyncio.run(main())
classmethod _from_config(config: RoundRobinGroupChatConfig) Self[源代码]#

从配置对象创建组件的新实例。

参数:

config (T) -- 配置对象。

Returns:

Self -- 组件的新实例。

_to_config() RoundRobinGroupChatConfig[源代码]#

导出当前组件实例的配置,该配置可用于创建具有相同配置的新组件实例。

Returns:

T -- 组件的配置。

component_config_schema#

RoundRobinGroupChatConfig 的别名

component_provider_override: ClassVar[str | None] = 'autogen_agentchat.teams.RoundRobinGroupChat'#

覆盖组件的provider字符串。这应该用于防止内部模块名称成为模块名称的一部分。

class SelectorGroupChat(participants: List[ChatAgent], model_client: ChatCompletionClient, *, termination_condition: TerminationCondition | None = None, max_turns: int | None = None, runtime: AgentRuntime | None = None, selector_prompt: str = 'You are in a role play game. The following roles are available:\n{roles}.\nRead the following conversation. Then select the next role from {participants} to play. Only return the role.\n\n{history}\n\nRead the above conversation. Then select the next role from {participants} to play. Only return the role.\n', allow_repeated_speaker: bool = False, max_selector_attempts: int = 3, selector_func: Callable[[Sequence[BaseAgentEvent | BaseChatMessage]], str | None] | Callable[[Sequence[BaseAgentEvent | BaseChatMessage]], Awaitable[str | None]] | None = None, candidate_func: Callable[[Sequence[BaseAgentEvent | BaseChatMessage]], List[str]] | Callable[[Sequence[BaseAgentEvent | BaseChatMessage]], Awaitable[List[str]]] | None = None, custom_message_types: List[type[BaseAgentEvent | BaseChatMessage]] | None = None, emit_team_events: bool = False, model_client_streaming: bool = False, model_context: ChatCompletionContext | None = None)[源代码]#

基类:BaseGroupChat, Component[SelectorGroupChatConfig]

一个拥有参与者的群聊团队,参与者轮流向所有人发布消息, 使用 ChatCompletion 模型在每条消息后选择下一位发言者。

参数:
  • participants (List[ChatAgent]) -- 群聊中的参与者, 必须具有唯一名称且至少两名参与者。

  • model_client (ChatCompletionClient) -- 用于选择下一位发言者的 ChatCompletion 模型客户端。

  • termination_condition (TerminationCondition, optional) -- 群聊的终止条件。默认为 None。 没有终止条件时,群聊将无限期运行。

  • max_turns (int, optional) -- 群聊停止前的最大轮数。默认为 None,表示无限制。

  • selector_prompt (str, optional) -- 用于选择下一位发言者的提示模板。 可用字段:'{roles}'、'{participants}' 和 '{history}'。 {participants} 是候选发言者名称列表,格式为 ["<name1>", "<name2>", ...]{roles} 是候选代理名称和描述的新行分隔列表,每行格式为:"<name> : <description>"{history} 是格式化为双新行分隔的名称和消息内容的对话历史,每条消息格式为:"<name> : <message content>"

  • allow_repeated_speaker (bool, optional) -- 是否将上一位发言者包含在下轮候选列表中。 默认为 False。模型仍可能选择上一位发言者——若发生此情况将记录警告。

  • max_selector_attempts (int, optional) -- 使用模型选择发言者的最大尝试次数。默认为 3。 若达到最大尝试次数后模型仍未能选择发言者,将使用上一位发言者(若可用), 否则使用第一位参与者。

  • selector_func (Callable[[Sequence[BaseAgentEvent | BaseChatMessage]], str | None], Callable[[Sequence[BaseAgentEvent | BaseChatMessage]], Awaitable[str | None]], optional) -- 自定义选择函数, 接收对话历史并返回下一位发言者名称。 若提供,此函数将覆盖模型选择逻辑。 若函数返回 None,则使用模型选择下一位发言者。

  • candidate_func (Callable[[Sequence[BaseAgentEvent | BaseChatMessage]], List[str]], Callable[[Sequence[BaseAgentEvent | BaseChatMessage]], Awaitable[List[str]]], optional) -- 自定义函数,接收对话历史并返回模型选择下位发言者的过滤候选列表。 若返回空列表或 NoneSelectorGroupChat 将抛出 ValueError。 仅当未设置 selector_func 时使用此函数。若设置此函数,allow_repeated_speaker 将被忽略。

  • custom_message_types (List[type[BaseAgentEvent | BaseChatMessage]], optional) -- 群聊中使用的自定义消息类型列表。 若使用自定义消息类型或代理产生自定义消息类型,需在此指定。 确保自定义消息类型是 BaseAgentEventBaseChatMessage 的子类。

  • emit_team_events (bool, optional) -- 是否通过 BaseGroupChat.run_stream() 发出团队事件。默认为 False。

  • model_client_streaming (bool, optional) -- 是否为模型客户端启用流式传输。(适用于 QwQ 等推理模型)。默认为 False。

  • model_context (ChatCompletionContext | None, optional) -- 存储和检索 LLMMessage 的模型上下文。 可预加载初始消息。模型上下文中的消息将用于发言者选择。团队重置时将清除初始消息。

抛出:

ValueError -- 若参与者少于两名或选择提示无效时抛出。

Examples:

含多个参与者的团队:

import asyncio
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import SelectorGroupChat
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.ui import Console


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    async def lookup_hotel(location: str) -> str:
        return f"Here are some hotels in {location}: hotel1, hotel2, hotel3."

    async def lookup_flight(origin: str, destination: str) -> str:
        return f"Here are some flights from {origin} to {destination}: flight1, flight2, flight3."

    async def book_trip() -> str:
        return "Your trip is booked!"

    travel_advisor = AssistantAgent(
        "Travel_Advisor",
        model_client,
        tools=[book_trip],
        description="Helps with travel planning.",
    )
    hotel_agent = AssistantAgent(
        "Hotel_Agent",
        model_client,
        tools=[lookup_hotel],
        description="Helps with hotel booking.",
    )
    flight_agent = AssistantAgent(
        "Flight_Agent",
        model_client,
        tools=[lookup_flight],
        description="Helps with flight booking.",
    )
    termination = TextMentionTermination("TERMINATE")
    team = SelectorGroupChat(
        [travel_advisor, hotel_agent, flight_agent],
        model_client=model_client,
        termination_condition=termination,
    )
    await Console(team.run_stream(task="Book a 3-day trip to new york."))


asyncio.run(main())

含自定义选择函数的团队:

import asyncio
from typing import Sequence
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import SelectorGroupChat
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.ui import Console
from autogen_agentchat.messages import BaseAgentEvent, BaseChatMessage


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    def check_calculation(x: int, y: int, answer: int) -> str:
        if x + y == answer:
            return "Correct!"
        else:
            return "Incorrect!"

    agent1 = AssistantAgent(
        "Agent1",
        model_client,
        description="For calculation",
        system_message="Calculate the sum of two numbers",
    )
    agent2 = AssistantAgent(
        "Agent2",
        model_client,
        tools=[check_calculation],
        description="For checking calculation",
        system_message="Check the answer and respond with 'Correct!' or 'Incorrect!'",
    )

    def selector_func(messages: Sequence[BaseAgentEvent | BaseChatMessage]) -> str | None:
        if len(messages) == 1 or messages[-1].to_text() == "Incorrect!":
            return "Agent1"
        if messages[-1].source == "Agent1":
            return "Agent2"
        return None

    termination = TextMentionTermination("Correct!")
    team = SelectorGroupChat(
        [agent1, agent2],
        model_client=model_client,
        selector_func=selector_func,
        termination_condition=termination,
    )

    await Console(team.run_stream(task="What is 1 + 1?"))


asyncio.run(main())

含自定义模型上下文的团队:

import asyncio

from autogen_core.model_context import BufferedChatCompletionContext
from autogen_ext.models.openai import OpenAIChatCompletionClient

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.teams import SelectorGroupChat
from autogen_agentchat.ui import Console


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    model_context = BufferedChatCompletionContext(buffer_size=5)

    async def lookup_hotel(location: str) -> str:
        return f"Here are some hotels in {location}: hotel1, hotel2, hotel3."

    async def lookup_flight(origin: str, destination: str) -> str:
        return f"Here are some flights from {origin} to {destination}: flight1, flight2, flight3."

    async def book_trip() -> str:
        return "Your trip is booked!"

    travel_advisor = AssistantAgent(
        "Travel_Advisor",
        model_client,
        tools=[book_trip],
        description="Helps with travel planning.",
    )
    hotel_agent = AssistantAgent(
        "Hotel_Agent",
        model_client,
        tools=[lookup_hotel],
        description="Helps with hotel booking.",
    )
    flight_agent = AssistantAgent(
        "Flight_Agent",
        model_client,
        tools=[lookup_flight],
        description="Helps with flight booking.",
    )
    termination = TextMentionTermination("TERMINATE")
    team = SelectorGroupChat(
        [travel_advisor, hotel_agent, flight_agent],
        model_client=model_client,
        termination_condition=termination,
        model_context=model_context,
    )
    await Console(team.run_stream(task="Book a 3-day trip to new york."))


asyncio.run(main())
classmethod _from_config(config: SelectorGroupChatConfig) Self[源代码]#

从配置对象创建组件的新实例。

参数:

config (T) -- 配置对象。

Returns:

Self -- 组件的新实例。

_to_config() SelectorGroupChatConfig[源代码]#

导出当前组件实例的配置,该配置可用于创建具有相同配置的新组件实例。

Returns:

T -- 组件的配置。

component_config_schema#

SelectorGroupChatConfig 的别名

component_provider_override: ClassVar[str | None] = 'autogen_agentchat.teams.SelectorGroupChat'#

覆盖组件的provider字符串。这应该用于防止内部模块名称成为模块名称的一部分。

class Swarm(participants: List[ChatAgent], termination_condition: TerminationCondition | None = None, max_turns: int | None = None, runtime: AgentRuntime | None = None, custom_message_types: List[type[BaseAgentEvent | BaseChatMessage]] | None = None, emit_team_events: bool = False)[源代码]#

基类:BaseGroupChat, Component[SwarmConfig]

一个仅基于交接消息选择下一位发言者的群聊团队。

参与者列表中的第一位成员是初始发言者。 下一位发言者根据当前发言者发送的 HandoffMessage 消息选择。 如果没有发送交接消息,则当前发言者继续发言。

参数:
  • participants (List[ChatAgent]) -- 参与群聊的代理列表。列表中的第一个代理是初始发言者。

  • termination_condition (TerminationCondition, optional) -- 群聊的终止条件。默认为 None。 若无终止条件,群聊将无限运行。

  • max_turns (int, optional) -- 群聊停止前的最大轮次。默认为 None,表示无限制。

  • custom_message_types (List[type[BaseAgentEvent | BaseChatMessage]], optional) -- 群聊中使用的自定义消息类型列表。 如果使用自定义消息类型或代理会产生自定义消息类型,需在此指定。 确保自定义消息类型是 BaseAgentEventBaseChatMessage 的子类。

  • emit_team_events (bool, optional) -- 是否通过 BaseGroupChat.run_stream() 发出团队事件。默认为 False。

基础示例:

import asyncio
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import Swarm
from autogen_agentchat.conditions import MaxMessageTermination


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent1 = AssistantAgent(
        "Alice",
        model_client=model_client,
        handoffs=["Bob"],
        system_message="You are Alice and you only answer questions about yourself.",
    )
    agent2 = AssistantAgent(
        "Bob", model_client=model_client, system_message="You are Bob and your birthday is on 1st January."
    )

    termination = MaxMessageTermination(3)
    team = Swarm([agent1, agent2], termination_condition=termination)

    stream = team.run_stream(task="What is bob's birthday?")
    async for message in stream:
        print(message)


asyncio.run(main())

使用 HandoffTermination 实现人机协作交接:

import asyncio
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import Swarm
from autogen_agentchat.conditions import HandoffTermination, MaxMessageTermination
from autogen_agentchat.ui import Console
from autogen_agentchat.messages import HandoffMessage


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent = AssistantAgent(
        "Alice",
        model_client=model_client,
        handoffs=["user"],
        system_message="You are Alice and you only answer questions about yourself, ask the user for help if needed.",
    )
    termination = HandoffTermination(target="user") | MaxMessageTermination(3)
    team = Swarm([agent], termination_condition=termination)

    # 开始对话。
    await Console(team.run_stream(task="What is bob's birthday?"))

    # 根据用户反馈继续。
    await Console(
        team.run_stream(
            task=HandoffMessage(source="user", target="Alice", content="Bob's birthday is on 1st January.")
        )
    )


asyncio.run(main())
classmethod _from_config(config: SwarmConfig) Swarm[源代码]#

从配置对象创建组件的新实例。

参数:

config (T) -- 配置对象。

Returns:

Self -- 组件的新实例。

_to_config() SwarmConfig[源代码]#

导出当前组件实例的配置,该配置可用于创建具有相同配置的新组件实例。

Returns:

T -- 组件的配置。

component_config_schema#

SwarmConfig 的别名

component_provider_override: ClassVar[str | None] = 'autogen_agentchat.teams.Swarm'#

覆盖组件的provider字符串。这应该用于防止内部模块名称成为模块名称的一部分。