主题与订阅示例场景#
简介#
在本操作指南中,我们将通过四种不同的广播场景来探索 AutoGen 中代理通信的广播机制。这些场景展示了在代理之间处理和分发消息的各种方式。我们将以税务管理公司处理客户请求的统一案例来演示每个场景。
场景概览#
设想一家提供多种服务的税务管理公司,包括税务规划、争议解决、合规咨询和税务申报等服务。公司拥有一支由不同领域专家组成的税务专员团队,以及一名负责监督运营的税务系统经理。
客户提交的请求需要由相应的专员处理。在这个系统中,客户、税务系统经理和税务专员之间的通信都是通过广播机制实现的。
我们将探讨不同广播场景如何影响消息在代理间的分发方式,以及如何根据特定需求定制通信流程。
广播场景概览#
我们将涵盖以下广播场景:
单租户,单一发布范围
多租户,单一发布范围
单租户,多重发布范围
多租户,多重发布范围
每个场景代表了系统中消息分发和代理交互的不同方法。通过理解这些场景,您可以设计最适合应用需求的代理通信策略。
import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import List
from autogen_core import (
MessageContext,
RoutedAgent,
SingleThreadedAgentRuntime,
TopicId,
TypeSubscription,
message_handler,
)
from autogen_core._default_subscription import DefaultSubscription
from autogen_core._default_topic import DefaultTopicId
from autogen_core.models import (
SystemMessage,
)
class TaxSpecialty(str, Enum):
PLANNING = "planning"
DISPUTE_RESOLUTION = "dispute_resolution"
COMPLIANCE = "compliance"
PREPARATION = "preparation"
@dataclass
class ClientRequest:
content: str
@dataclass
class RequestAssessment:
content: str
class TaxSpecialist(RoutedAgent):
def __init__(
self,
description: str,
specialty: TaxSpecialty,
system_messages: List[SystemMessage],
) -> None:
super().__init__(description)
self.specialty = specialty
self._system_messages = system_messages
self._memory: List[ClientRequest] = []
@message_handler
async def handle_message(self, message: ClientRequest, ctx: MessageContext) -> None:
# 处理客户请求。
print(f"\n{'='*50}\nTax specialist {self.id} with specialty {self.specialty}:\n{message.content}")
# 向经理发送响应
if ctx.topic_id is None:
raise ValueError("Topic ID is required for broadcasting")
await self.publish_message(
message=RequestAssessment(content=f"I can handle this request in {self.specialty}."),
topic_id=ctx.topic_id,
)
1. 单租户,单一发布范围#
场景说明#
在单租户、单一发布范围的场景中:
所有代理都在单个租户内运行(例如一个客户端或用户会话)
消息发布到单一主题,所有代理都订阅该主题
每个代理都会收到发布到该主题的每条消息
此场景适用于所有代理都需要知晓所有消息,且不需要隔离不同代理组或会话间通信的情况。
在税务专员公司的应用#
在我们的税务专员公司中,这个场景意味着:
所有税务专员都会收到每个客户请求和内部消息
所有代理紧密协作,完全透明地看到所有通信
适用于需要所有代理知晓所有消息的任务或团队
场景运作方式#
订阅:所有代理使用默认订阅(如"default")
发布:消息发布到默认主题
消息处理:每个代理根据消息内容和可用处理器决定是否处理
优势#
简单性:易于设置和理解
协作性:促进代理间的透明度和协作
灵活性:代理可以动态决定处理哪些消息
注意事项#
可扩展性:在代理或消息数量庞大时可能扩展性不佳
效率:代理可能收到许多无关消息,导致不必要的处理
async def run_single_tenant_single_scope() -> None:
# 创建运行时。
runtime = SingleThreadedAgentRuntime()
# 为每个专业注册税务专家代理
specialist_agent_type_1 = "TaxSpecialist_1"
specialist_agent_type_2 = "TaxSpecialist_2"
await TaxSpecialist.register(
runtime=runtime,
type=specialist_agent_type_1,
factory=lambda: TaxSpecialist(
description="A tax specialist 1",
specialty=TaxSpecialty.PLANNING,
system_messages=[SystemMessage(content="You are a tax specialist.")],
),
)
await TaxSpecialist.register(
runtime=runtime,
type=specialist_agent_type_2,
factory=lambda: TaxSpecialist(
description="A tax specialist 2",
specialty=TaxSpecialty.DISPUTE_RESOLUTION,
system_messages=[SystemMessage(content="You are a tax specialist.")],
),
)
# 为每种代理类型添加默认订阅
await runtime.add_subscription(DefaultSubscription(agent_type=specialist_agent_type_1))
await runtime.add_subscription(DefaultSubscription(agent_type=specialist_agent_type_2))
# 启动运行时并向代理发送默认主题的消息
runtime.start()
await runtime.publish_message(ClientRequest("I need to have my tax for 2024 prepared."), topic_id=DefaultTopicId())
await runtime.stop_when_idle()
await run_single_tenant_single_scope()
==================================================
Tax specialist TaxSpecialist_1:default with specialty TaxSpecialty.PLANNING:
I need to have my tax for 2024 prepared.
==================================================
Tax specialist TaxSpecialist_2:default with specialty TaxSpecialty.DISPUTE_RESOLUTION:
I need to have my tax for 2024 prepared.
2. 多租户,单一发布范围#
场景说明#
在多租户、单一发布范围的场景中:
存在多个租户(例如多个客户或用户会话)
每个租户通过主题源拥有自己独立的话题
租户内的所有代理都订阅该租户的话题。如果需要,会为每个租户创建新的代理实例
消息仅对同一租户内的代理可见
此场景适用于需要在不同租户间隔离通信,但希望租户内所有代理都能感知所有消息的情况。
在税务专家公司的应用#
在此场景中:
公司同时服务多个客户(租户)
为每个客户创建专属的代理实例集
每个客户的通信与其他客户隔离
客户的所有代理都能收到发布到该客户主题的消息
场景运作方式#
订阅:代理根据租户身份订阅相应主题
发布:消息发布到租户专属主题
消息处理:代理仅接收与其租户相关的消息
优势#
租户隔离:确保客户间的数据隐私和分离
租户内协作:代理可在其租户内自由协作
注意事项#
复杂性:需要管理多组代理和主题
资源使用:更多代理实例可能消耗额外资源
async def run_multi_tenant_single_scope() -> None:
# 创建运行时
runtime = SingleThreadedAgentRuntime()
# 客户(租户)列表
tenants = ["ClientABC", "ClientXYZ"]
# 初始化会话并将主题类型映射到每个税务专家代理类型
for specialty in TaxSpecialty:
specialist_agent_type = f"TaxSpecialist_{specialty.value}"
await TaxSpecialist.register(
runtime=runtime,
type=specialist_agent_type,
factory=lambda specialty=specialty: TaxSpecialist( # type: ignore
description=f"A tax specialist in {specialty.value}.",
specialty=specialty,
system_messages=[SystemMessage(content=f"You are a tax specialist in {specialty.value}.")],
),
)
specialist_subscription = DefaultSubscription(agent_type=specialist_agent_type)
await runtime.add_subscription(specialist_subscription)
# 启动运行时
runtime.start()
# 将客户请求发布到各自的主题
for tenant in tenants:
topic_source = tenant # 主题来源是客户端名称
topic_id = DefaultTopicId(source=topic_source)
await runtime.publish_message(
ClientRequest(f"{tenant} requires tax services."),
topic_id=topic_id,
)
# 预留消息处理时间
await asyncio.sleep(1)
# 空闲时停止运行时
await runtime.stop_when_idle()
await run_multi_tenant_single_scope()
==================================================
Tax specialist TaxSpecialist_planning:ClientABC with specialty TaxSpecialty.PLANNING:
ClientABC requires tax services.
==================================================
Tax specialist TaxSpecialist_dispute_resolution:ClientABC with specialty TaxSpecialty.DISPUTE_RESOLUTION:
ClientABC requires tax services.
==================================================
Tax specialist TaxSpecialist_compliance:ClientABC with specialty TaxSpecialty.COMPLIANCE:
ClientABC requires tax services.
==================================================
Tax specialist TaxSpecialist_preparation:ClientABC with specialty TaxSpecialty.PREPARATION:
ClientABC requires tax services.
==================================================
Tax specialist TaxSpecialist_planning:ClientXYZ with specialty TaxSpecialty.PLANNING:
ClientXYZ requires tax services.
==================================================
Tax specialist TaxSpecialist_dispute_resolution:ClientXYZ with specialty TaxSpecialty.DISPUTE_RESOLUTION:
ClientXYZ requires tax services.
==================================================
Tax specialist TaxSpecialist_compliance:ClientXYZ with specialty TaxSpecialty.COMPLIANCE:
ClientXYZ requires tax services.
==================================================
Tax specialist TaxSpecialist_preparation:ClientXYZ with specialty TaxSpecialty.PREPARATION:
ClientXYZ requires tax services.
3. 单租户多发布范围场景#
场景说明#
在单租户多发布范围场景中:
所有代理都在同一租户内运行
消息发布到不同主题
代理根据其角色或专业领域订阅特定主题
消息基于主题定向发送给代理子集
该场景支持租户内的定向通信,实现对消息分发的更精细控制。
在税务管理公司的应用#
在此场景中:
税务系统经理根据专业领域与特定专家通信
不同主题代表不同专业领域(如"规划"、"合规")
专家仅订阅与其专业匹配的主题
经理向特定主题发布消息以触达目标专家
场景运作方式#
订阅:代理订阅与其专业对应的主题
发布:根据目标接收方将消息发布到特定主题
消息处理:只有订阅了主题的代理才能接收其消息
优势#
定向通信:消息仅触达相关代理
高效性:减少代理处理不必要消息的负担
注意事项#
设置复杂性:需要谨慎管理主题和订阅关系
灵活性:通信场景变更可能需要更新订阅关系
async def run_single_tenant_multiple_scope() -> None:
# 创建运行时
runtime = SingleThreadedAgentRuntime()
# 为每个专业领域注册税务专家代理并添加订阅
for specialty in TaxSpecialty:
specialist_agent_type = f"TaxSpecialist_{specialty.value}"
await TaxSpecialist.register(
runtime=runtime,
type=specialist_agent_type,
factory=lambda specialty=specialty: TaxSpecialist( # type: ignore
description=f"A tax specialist in {specialty.value}.",
specialty=specialty,
system_messages=[SystemMessage(content=f"You are a tax specialist in {specialty.value}.")],
),
)
specialist_subscription = TypeSubscription(topic_type=specialty.value, agent_type=specialist_agent_type)
await runtime.add_subscription(specialist_subscription)
# 启动运行时
runtime.start()
# 向每位专家的主题发布ClientRequest
for specialty in TaxSpecialty:
topic_id = TopicId(type=specialty.value, source="default")
await runtime.publish_message(
ClientRequest(f"I need assistance with {specialty.value} taxes."),
topic_id=topic_id,
)
# 预留消息处理时间
await asyncio.sleep(1)
# 空闲时停止运行时
await runtime.stop_when_idle()
await run_single_tenant_multiple_scope()
==================================================
Tax specialist TaxSpecialist_planning:default with specialty TaxSpecialty.PLANNING:
I need assistance with planning taxes.
==================================================
Tax specialist TaxSpecialist_dispute_resolution:default with specialty TaxSpecialty.DISPUTE_RESOLUTION:
I need assistance with dispute_resolution taxes.
==================================================
Tax specialist TaxSpecialist_compliance:default with specialty TaxSpecialty.COMPLIANCE:
I need assistance with compliance taxes.
==================================================
Tax specialist TaxSpecialist_preparation:default with specialty TaxSpecialty.PREPARATION:
I need assistance with preparation taxes.
4. 多租户、多发布范围场景#
场景说明#
在多租户、多发布范围场景中:
存在多个租户,每个租户拥有自己的代理集合
消息发布到每个租户内的多个主题
代理根据其角色订阅租户专属的相关主题
结合了租户隔离与定向通信的特性
此场景提供了最高级别的消息分发控制,适用于具有多个客户端和特定通信需求的复杂系统。
在税务管理公司中的应用#
在此场景中:
公司服务多个客户,每个客户拥有专属代理实例
每个客户内部,代理基于专业领域通过多个主题进行通信
例如:客户A的规划专家订阅来源为"ClientA"的"planning"主题
每个客户的税务系统管理员通过租户专属主题与其专家通信
场景运作方式#
订阅:代理基于租户身份和专业领域订阅主题
发布:消息发布到租户专属和领域专属的主题
消息处理:只有匹配租户和主题的代理才能接收消息
优势#
完全隔离:确保租户和通信的双重隔离
精细控制:实现消息到目标代理的精准路由
注意事项#
复杂性:需要谨慎管理主题、租户和订阅关系
资源消耗:代理实例和主题数量增加可能影响资源
async def run_multi_tenant_multiple_scope() -> None:
# 创建运行时
runtime = SingleThreadedAgentRuntime()
# 为每个专业领域和租户定义TypeSubscriptions
tenants = ["ClientABC", "ClientXYZ"]
# 初始化所有专业领域的代理并添加类型订阅
for specialty in TaxSpecialty:
specialist_agent_type = f"TaxSpecialist_{specialty.value}"
await TaxSpecialist.register(
runtime=runtime,
type=specialist_agent_type,
factory=lambda specialty=specialty: TaxSpecialist( # type: ignore
description=f"A tax specialist in {specialty.value}.",
specialty=specialty,
system_messages=[SystemMessage(content=f"You are a tax specialist in {specialty.value}.")],
),
)
for tenant in tenants:
specialist_subscription = TypeSubscription(
topic_type=f"{tenant}_{specialty.value}", agent_type=specialist_agent_type
)
await runtime.add_subscription(specialist_subscription)
# 启动运行时
runtime.start()
# 为每个租户向每个专业发送消息
for tenant in tenants:
for specialty in TaxSpecialty:
topic_id = TopicId(type=f"{tenant}_{specialty.value}", source=tenant)
await runtime.publish_message(
ClientRequest(f"{tenant} needs assistance with {specialty.value} taxes."),
topic_id=topic_id,
)
# 留出消息处理时间
await asyncio.sleep(1)
# 空闲时停止运行时
await runtime.stop_when_idle()
await run_multi_tenant_multiple_scope()
==================================================
Tax specialist TaxSpecialist_planning:ClientABC with specialty TaxSpecialty.PLANNING:
ClientABC needs assistance with planning taxes.
==================================================
Tax specialist TaxSpecialist_dispute_resolution:ClientABC with specialty TaxSpecialty.DISPUTE_RESOLUTION:
ClientABC needs assistance with dispute_resolution taxes.
==================================================
Tax specialist TaxSpecialist_compliance:ClientABC with specialty TaxSpecialty.COMPLIANCE:
ClientABC needs assistance with compliance taxes.
==================================================
Tax specialist TaxSpecialist_preparation:ClientABC with specialty TaxSpecialty.PREPARATION:
ClientABC needs assistance with preparation taxes.
==================================================
Tax specialist TaxSpecialist_planning:ClientXYZ with specialty TaxSpecialty.PLANNING:
ClientXYZ needs assistance with planning taxes.
==================================================
Tax specialist TaxSpecialist_dispute_resolution:ClientXYZ with specialty TaxSpecialty.DISPUTE_RESOLUTION:
ClientXYZ needs assistance with dispute_resolution taxes.
==================================================
Tax specialist TaxSpecialist_compliance:ClientXYZ with specialty TaxSpecialty.COMPLIANCE:
ClientXYZ needs assistance with compliance taxes.
==================================================
Tax specialist TaxSpecialist_preparation:ClientXYZ with specialty TaxSpecialty.PREPARATION:
ClientXYZ needs assistance with preparation taxes.