主题与订阅示例场景#

简介#

在本操作指南中,我们将通过四种不同的广播场景来探索 AutoGen 中代理通信的广播机制。这些场景展示了在代理之间处理和分发消息的各种方式。我们将以税务管理公司处理客户请求的统一案例来演示每个场景。

场景概览#

设想一家提供多种服务的税务管理公司,包括税务规划、争议解决、合规咨询和税务申报等服务。公司拥有一支由不同领域专家组成的税务专员团队,以及一名负责监督运营的税务系统经理。

客户提交的请求需要由相应的专员处理。在这个系统中,客户、税务系统经理和税务专员之间的通信都是通过广播机制实现的。

我们将探讨不同广播场景如何影响消息在代理间的分发方式,以及如何根据特定需求定制通信流程。


广播场景概览#

我们将涵盖以下广播场景:

  1. 单租户,单一发布范围

  2. 多租户,单一发布范围

  3. 单租户,多重发布范围

  4. 多租户,多重发布范围

每个场景代表了系统中消息分发和代理交互的不同方法。通过理解这些场景,您可以设计最适合应用需求的代理通信策略。

import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import List

from autogen_core import (
    MessageContext,
    RoutedAgent,
    SingleThreadedAgentRuntime,
    TopicId,
    TypeSubscription,
    message_handler,
)
from autogen_core._default_subscription import DefaultSubscription
from autogen_core._default_topic import DefaultTopicId
from autogen_core.models import (
    SystemMessage,
)
class TaxSpecialty(str, Enum):
    PLANNING = "planning"
    DISPUTE_RESOLUTION = "dispute_resolution"
    COMPLIANCE = "compliance"
    PREPARATION = "preparation"


@dataclass
class ClientRequest:
    content: str


@dataclass
class RequestAssessment:
    content: str


class TaxSpecialist(RoutedAgent):
    def __init__(
        self,
        description: str,
        specialty: TaxSpecialty,
        system_messages: List[SystemMessage],
    ) -> None:
        super().__init__(description)
        self.specialty = specialty
        self._system_messages = system_messages
        self._memory: List[ClientRequest] = []

    @message_handler
    async def handle_message(self, message: ClientRequest, ctx: MessageContext) -> None:
        # 处理客户请求。
        print(f"\n{'='*50}\nTax specialist {self.id} with specialty {self.specialty}:\n{message.content}")
        # 向经理发送响应
        if ctx.topic_id is None:
            raise ValueError("Topic ID is required for broadcasting")
        await self.publish_message(
            message=RequestAssessment(content=f"I can handle this request in {self.specialty}."),
            topic_id=ctx.topic_id,
        )

1. 单租户,单一发布范围#

场景说明#

在单租户、单一发布范围的场景中:

  • 所有代理都在单个租户内运行(例如一个客户端或用户会话)

  • 消息发布到单一主题,所有代理都订阅该主题

  • 每个代理都会收到发布到该主题的每条消息

此场景适用于所有代理都需要知晓所有消息,且不需要隔离不同代理组或会话间通信的情况。

在税务专员公司的应用#

在我们的税务专员公司中,这个场景意味着:

  • 所有税务专员都会收到每个客户请求和内部消息

  • 所有代理紧密协作,完全透明地看到所有通信

  • 适用于需要所有代理知晓所有消息的任务或团队

场景运作方式#

  • 订阅:所有代理使用默认订阅(如"default")

  • 发布:消息发布到默认主题

  • 消息处理:每个代理根据消息内容和可用处理器决定是否处理

优势#

  • 简单性:易于设置和理解

  • 协作性:促进代理间的透明度和协作

  • 灵活性:代理可以动态决定处理哪些消息

注意事项#

  • 可扩展性:在代理或消息数量庞大时可能扩展性不佳

  • 效率:代理可能收到许多无关消息,导致不必要的处理

async def run_single_tenant_single_scope() -> None:
    # 创建运行时。
    runtime = SingleThreadedAgentRuntime()

    # 为每个专业注册税务专家代理
    specialist_agent_type_1 = "TaxSpecialist_1"
    specialist_agent_type_2 = "TaxSpecialist_2"
    await TaxSpecialist.register(
        runtime=runtime,
        type=specialist_agent_type_1,
        factory=lambda: TaxSpecialist(
            description="A tax specialist 1",
            specialty=TaxSpecialty.PLANNING,
            system_messages=[SystemMessage(content="You are a tax specialist.")],
        ),
    )

    await TaxSpecialist.register(
        runtime=runtime,
        type=specialist_agent_type_2,
        factory=lambda: TaxSpecialist(
            description="A tax specialist 2",
            specialty=TaxSpecialty.DISPUTE_RESOLUTION,
            system_messages=[SystemMessage(content="You are a tax specialist.")],
        ),
    )

    # 为每种代理类型添加默认订阅
    await runtime.add_subscription(DefaultSubscription(agent_type=specialist_agent_type_1))
    await runtime.add_subscription(DefaultSubscription(agent_type=specialist_agent_type_2))

    # 启动运行时并向代理发送默认主题的消息
    runtime.start()
    await runtime.publish_message(ClientRequest("I need to have my tax for 2024 prepared."), topic_id=DefaultTopicId())
    await runtime.stop_when_idle()


await run_single_tenant_single_scope()
==================================================
Tax specialist TaxSpecialist_1:default with specialty TaxSpecialty.PLANNING:
I need to have my tax for 2024 prepared.

==================================================
Tax specialist TaxSpecialist_2:default with specialty TaxSpecialty.DISPUTE_RESOLUTION:
I need to have my tax for 2024 prepared.

2. 多租户,单一发布范围#

场景说明#

在多租户、单一发布范围的场景中:

  • 存在多个租户(例如多个客户或用户会话)

  • 每个租户通过主题源拥有自己独立的话题

  • 租户内的所有代理都订阅该租户的话题。如果需要,会为每个租户创建新的代理实例

  • 消息仅对同一租户内的代理可见

此场景适用于需要在不同租户间隔离通信,但希望租户内所有代理都能感知所有消息的情况。

在税务专家公司的应用#

在此场景中:

  • 公司同时服务多个客户(租户)

  • 为每个客户创建专属的代理实例集

  • 每个客户的通信与其他客户隔离

  • 客户的所有代理都能收到发布到该客户主题的消息

场景运作方式#

  • 订阅:代理根据租户身份订阅相应主题

  • 发布:消息发布到租户专属主题

  • 消息处理:代理仅接收与其租户相关的消息

优势#

  • 租户隔离:确保客户间的数据隐私和分离

  • 租户内协作:代理可在其租户内自由协作

注意事项#

  • 复杂性:需要管理多组代理和主题

  • 资源使用:更多代理实例可能消耗额外资源

async def run_multi_tenant_single_scope() -> None:
    # 创建运行时
    runtime = SingleThreadedAgentRuntime()

    # 客户(租户)列表
    tenants = ["ClientABC", "ClientXYZ"]

    # 初始化会话并将主题类型映射到每个税务专家代理类型
    for specialty in TaxSpecialty:
        specialist_agent_type = f"TaxSpecialist_{specialty.value}"
        await TaxSpecialist.register(
            runtime=runtime,
            type=specialist_agent_type,
            factory=lambda specialty=specialty: TaxSpecialist(  # type: ignore
                description=f"A tax specialist in {specialty.value}.",
                specialty=specialty,
                system_messages=[SystemMessage(content=f"You are a tax specialist in {specialty.value}.")],
            ),
        )
        specialist_subscription = DefaultSubscription(agent_type=specialist_agent_type)
        await runtime.add_subscription(specialist_subscription)

    # 启动运行时
    runtime.start()

    # 将客户请求发布到各自的主题
    for tenant in tenants:
        topic_source = tenant  # 主题来源是客户端名称
        topic_id = DefaultTopicId(source=topic_source)
        await runtime.publish_message(
            ClientRequest(f"{tenant} requires tax services."),
            topic_id=topic_id,
        )

    # 预留消息处理时间
    await asyncio.sleep(1)

    # 空闲时停止运行时
    await runtime.stop_when_idle()


await run_multi_tenant_single_scope()
==================================================
Tax specialist TaxSpecialist_planning:ClientABC with specialty TaxSpecialty.PLANNING:
ClientABC requires tax services.

==================================================
Tax specialist TaxSpecialist_dispute_resolution:ClientABC with specialty TaxSpecialty.DISPUTE_RESOLUTION:
ClientABC requires tax services.

==================================================
Tax specialist TaxSpecialist_compliance:ClientABC with specialty TaxSpecialty.COMPLIANCE:
ClientABC requires tax services.

==================================================
Tax specialist TaxSpecialist_preparation:ClientABC with specialty TaxSpecialty.PREPARATION:
ClientABC requires tax services.

==================================================
Tax specialist TaxSpecialist_planning:ClientXYZ with specialty TaxSpecialty.PLANNING:
ClientXYZ requires tax services.

==================================================
Tax specialist TaxSpecialist_dispute_resolution:ClientXYZ with specialty TaxSpecialty.DISPUTE_RESOLUTION:
ClientXYZ requires tax services.

==================================================
Tax specialist TaxSpecialist_compliance:ClientXYZ with specialty TaxSpecialty.COMPLIANCE:
ClientXYZ requires tax services.

==================================================
Tax specialist TaxSpecialist_preparation:ClientXYZ with specialty TaxSpecialty.PREPARATION:
ClientXYZ requires tax services.

3. 单租户多发布范围场景#

场景说明#

在单租户多发布范围场景中:

  • 所有代理都在同一租户内运行

  • 消息发布到不同主题

  • 代理根据其角色或专业领域订阅特定主题

  • 消息基于主题定向发送给代理子集

该场景支持租户内的定向通信,实现对消息分发的更精细控制。

在税务管理公司的应用#

在此场景中:

  • 税务系统经理根据专业领域与特定专家通信

  • 不同主题代表不同专业领域(如"规划"、"合规")

  • 专家仅订阅与其专业匹配的主题

  • 经理向特定主题发布消息以触达目标专家

场景运作方式#

  • 订阅:代理订阅与其专业对应的主题

  • 发布:根据目标接收方将消息发布到特定主题

  • 消息处理:只有订阅了主题的代理才能接收其消息

优势#

  • 定向通信:消息仅触达相关代理

  • 高效性:减少代理处理不必要消息的负担

注意事项#

  • 设置复杂性:需要谨慎管理主题和订阅关系

  • 灵活性:通信场景变更可能需要更新订阅关系

async def run_single_tenant_multiple_scope() -> None:
    # 创建运行时
    runtime = SingleThreadedAgentRuntime()
    # 为每个专业领域注册税务专家代理并添加订阅
    for specialty in TaxSpecialty:
        specialist_agent_type = f"TaxSpecialist_{specialty.value}"
        await TaxSpecialist.register(
            runtime=runtime,
            type=specialist_agent_type,
            factory=lambda specialty=specialty: TaxSpecialist(  # type: ignore
                description=f"A tax specialist in {specialty.value}.",
                specialty=specialty,
                system_messages=[SystemMessage(content=f"You are a tax specialist in {specialty.value}.")],
            ),
        )
        specialist_subscription = TypeSubscription(topic_type=specialty.value, agent_type=specialist_agent_type)
        await runtime.add_subscription(specialist_subscription)

    # 启动运行时
    runtime.start()

    # 向每位专家的主题发布ClientRequest
    for specialty in TaxSpecialty:
        topic_id = TopicId(type=specialty.value, source="default")
        await runtime.publish_message(
            ClientRequest(f"I need assistance with {specialty.value} taxes."),
            topic_id=topic_id,
        )

    # 预留消息处理时间
    await asyncio.sleep(1)

    # 空闲时停止运行时
    await runtime.stop_when_idle()


await run_single_tenant_multiple_scope()
==================================================
Tax specialist TaxSpecialist_planning:default with specialty TaxSpecialty.PLANNING:
I need assistance with planning taxes.

==================================================
Tax specialist TaxSpecialist_dispute_resolution:default with specialty TaxSpecialty.DISPUTE_RESOLUTION:
I need assistance with dispute_resolution taxes.

==================================================
Tax specialist TaxSpecialist_compliance:default with specialty TaxSpecialty.COMPLIANCE:
I need assistance with compliance taxes.

==================================================
Tax specialist TaxSpecialist_preparation:default with specialty TaxSpecialty.PREPARATION:
I need assistance with preparation taxes.

4. 多租户、多发布范围场景#

场景说明#

在多租户、多发布范围场景中:

  • 存在多个租户,每个租户拥有自己的代理集合

  • 消息发布到每个租户内的多个主题

  • 代理根据其角色订阅租户专属的相关主题

  • 结合了租户隔离与定向通信的特性

此场景提供了最高级别的消息分发控制,适用于具有多个客户端和特定通信需求的复杂系统。

在税务管理公司中的应用#

在此场景中:

  • 公司服务多个客户,每个客户拥有专属代理实例

  • 每个客户内部,代理基于专业领域通过多个主题进行通信

  • 例如:客户A的规划专家订阅来源为"ClientA"的"planning"主题

  • 每个客户的税务系统管理员通过租户专属主题与其专家通信

场景运作方式#

  • 订阅:代理基于租户身份和专业领域订阅主题

  • 发布:消息发布到租户专属和领域专属的主题

  • 消息处理:只有匹配租户和主题的代理才能接收消息

优势#

  • 完全隔离:确保租户和通信的双重隔离

  • 精细控制:实现消息到目标代理的精准路由

注意事项#

  • 复杂性:需要谨慎管理主题、租户和订阅关系

  • 资源消耗:代理实例和主题数量增加可能影响资源

async def run_multi_tenant_multiple_scope() -> None:
    # 创建运行时
    runtime = SingleThreadedAgentRuntime()

    # 为每个专业领域和租户定义TypeSubscriptions
    tenants = ["ClientABC", "ClientXYZ"]

    # 初始化所有专业领域的代理并添加类型订阅
    for specialty in TaxSpecialty:
        specialist_agent_type = f"TaxSpecialist_{specialty.value}"
        await TaxSpecialist.register(
            runtime=runtime,
            type=specialist_agent_type,
            factory=lambda specialty=specialty: TaxSpecialist(  # type: ignore
                description=f"A tax specialist in {specialty.value}.",
                specialty=specialty,
                system_messages=[SystemMessage(content=f"You are a tax specialist in {specialty.value}.")],
            ),
        )
        for tenant in tenants:
            specialist_subscription = TypeSubscription(
                topic_type=f"{tenant}_{specialty.value}", agent_type=specialist_agent_type
            )
            await runtime.add_subscription(specialist_subscription)

    # 启动运行时
    runtime.start()

    # 为每个租户向每个专业发送消息
    for tenant in tenants:
        for specialty in TaxSpecialty:
            topic_id = TopicId(type=f"{tenant}_{specialty.value}", source=tenant)
            await runtime.publish_message(
                ClientRequest(f"{tenant} needs assistance with {specialty.value} taxes."),
                topic_id=topic_id,
            )

    # 留出消息处理时间
    await asyncio.sleep(1)

    # 空闲时停止运行时
    await runtime.stop_when_idle()


await run_multi_tenant_multiple_scope()
==================================================
Tax specialist TaxSpecialist_planning:ClientABC with specialty TaxSpecialty.PLANNING:
ClientABC needs assistance with planning taxes.

==================================================
Tax specialist TaxSpecialist_dispute_resolution:ClientABC with specialty TaxSpecialty.DISPUTE_RESOLUTION:
ClientABC needs assistance with dispute_resolution taxes.

==================================================
Tax specialist TaxSpecialist_compliance:ClientABC with specialty TaxSpecialty.COMPLIANCE:
ClientABC needs assistance with compliance taxes.

==================================================
Tax specialist TaxSpecialist_preparation:ClientABC with specialty TaxSpecialty.PREPARATION:
ClientABC needs assistance with preparation taxes.

==================================================
Tax specialist TaxSpecialist_planning:ClientXYZ with specialty TaxSpecialty.PLANNING:
ClientXYZ needs assistance with planning taxes.

==================================================
Tax specialist TaxSpecialist_dispute_resolution:ClientXYZ with specialty TaxSpecialty.DISPUTE_RESOLUTION:
ClientXYZ needs assistance with dispute_resolution taxes.

==================================================
Tax specialist TaxSpecialist_compliance:ClientXYZ with specialty TaxSpecialty.COMPLIANCE:
ClientXYZ needs assistance with compliance taxes.

==================================================
Tax specialist TaxSpecialist_preparation:ClientXYZ with specialty TaxSpecialty.PREPARATION:
ClientXYZ needs assistance with preparation taxes.