{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## 主题与订阅示例场景\n\n### 简介\n\n在本操作指南中,我们将通过四种不同的广播场景来探索 AutoGen 中代理通信的广播机制。这些场景展示了在代理之间处理和分发消息的各种方式。我们将以税务管理公司处理客户请求的统一案例来演示每个场景。\n\n### 场景概览\n\n设想一家提供多种服务的税务管理公司,包括税务规划、争议解决、合规咨询和税务申报等服务。公司拥有一支由不同领域专家组成的税务专员团队,以及一名负责监督运营的税务系统经理。\n\n客户提交的请求需要由相应的专员处理。在这个系统中,客户、税务系统经理和税务专员之间的通信都是通过广播机制实现的。\n\n我们将探讨不同广播场景如何影响消息在代理间的分发方式,以及如何根据特定需求定制通信流程。\n\n---\n\n### 广播场景概览\n\n我们将涵盖以下广播场景:\n\n1. **单租户,单一发布范围**\n2. **多租户,单一发布范围**\n3. **单租户,多重发布范围**\n4. **多租户,多重发布范围**\n\n每个场景代表了系统中消息分发和代理交互的不同方法。通过理解这些场景,您可以设计最适合应用需求的代理通信策略。\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import asyncio\n", "from dataclasses import dataclass\n", "from enum import Enum\n", "from typing import List\n", "\n", "from autogen_core import (\n", " MessageContext,\n", " RoutedAgent,\n", " SingleThreadedAgentRuntime,\n", " TopicId,\n", " TypeSubscription,\n", " message_handler,\n", ")\n", "from autogen_core._default_subscription import DefaultSubscription\n", "from autogen_core._default_topic import DefaultTopicId\n", "from autogen_core.models import (\n", " SystemMessage,\n", ")" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "class TaxSpecialty(str, Enum):\n", " PLANNING = \"planning\"\n", " DISPUTE_RESOLUTION = \"dispute_resolution\"\n", " COMPLIANCE = \"compliance\"\n", " PREPARATION = \"preparation\"\n", "\n", "\n", "@dataclass\n", "class ClientRequest:\n", " content: str\n", "\n", "\n", "@dataclass\n", "class RequestAssessment:\n", " content: str\n", "\n", "\n", "class TaxSpecialist(RoutedAgent):\n", " def __init__(\n", " self,\n", " description: str,\n", " specialty: TaxSpecialty,\n", " system_messages: List[SystemMessage],\n", " ) -> None:\n", " super().__init__(description)\n", " self.specialty = specialty\n", " self._system_messages = system_messages\n", " self._memory: List[ClientRequest] = []\n", "\n", " @message_handler\n", " async def handle_message(self, message: ClientRequest, ctx: MessageContext) -> None:\n", " # 处理客户请求。\n", " print(f\"\\n{'='*50}\\nTax specialist {self.id} with specialty {self.specialty}:\\n{message.content}\")\n", " # 向经理发送响应\n", " if ctx.topic_id is None:\n", " raise ValueError(\"Topic ID is required for broadcasting\")\n", " await self.publish_message(\n", " message=RequestAssessment(content=f\"I can handle this request in {self.specialty}.\"),\n", " topic_id=ctx.topic_id,\n", " )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 1. 单租户,单一发布范围\n\n#### 场景说明\n在单租户、单一发布范围的场景中:\n\n- 所有代理都在单个租户内运行(例如一个客户端或用户会话)\n- 消息发布到单一主题,所有代理都订阅该主题\n- 每个代理都会收到发布到该主题的每条消息\n\n此场景适用于所有代理都需要知晓所有消息,且不需要隔离不同代理组或会话间通信的情况。\n\n#### 在税务专员公司的应用\n\n在我们的税务专员公司中,这个场景意味着:\n\n- 所有税务专员都会收到每个客户请求和内部消息\n- 所有代理紧密协作,完全透明地看到所有通信\n- 适用于需要所有代理知晓所有消息的任务或团队\n\n#### 场景运作方式\n\n- 订阅:所有代理使用默认订阅(如\"default\")\n- 发布:消息发布到默认主题\n- 消息处理:每个代理根据消息内容和可用处理器决定是否处理\n\n#### 优势\n- 简单性:易于设置和理解\n- 协作性:促进代理间的透明度和协作\n- 灵活性:代理可以动态决定处理哪些消息\n\n#### 注意事项\n- 可扩展性:在代理或消息数量庞大时可能扩展性不佳\n- 效率:代理可能收到许多无关消息,导致不必要的处理\n" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\n", "==================================================\n", "Tax specialist TaxSpecialist_1:default with specialty TaxSpecialty.PLANNING:\n", "I need to have my tax for 2024 prepared.\n", "\n", "==================================================\n", "Tax specialist TaxSpecialist_2:default with specialty TaxSpecialty.DISPUTE_RESOLUTION:\n", "I need to have my tax for 2024 prepared.\n" ] } ], "source": [ "async def run_single_tenant_single_scope() -> None:\n", " # 创建运行时。\n", " runtime = SingleThreadedAgentRuntime()\n", "\n", " # 为每个专业注册税务专家代理\n", " specialist_agent_type_1 = \"TaxSpecialist_1\"\n", " specialist_agent_type_2 = \"TaxSpecialist_2\"\n", " await TaxSpecialist.register(\n", " runtime=runtime,\n", " type=specialist_agent_type_1,\n", " factory=lambda: TaxSpecialist(\n", " description=\"A tax specialist 1\",\n", " specialty=TaxSpecialty.PLANNING,\n", " system_messages=[SystemMessage(content=\"You are a tax specialist.\")],\n", " ),\n", " )\n", "\n", " await TaxSpecialist.register(\n", " runtime=runtime,\n", " type=specialist_agent_type_2,\n", " factory=lambda: TaxSpecialist(\n", " description=\"A tax specialist 2\",\n", " specialty=TaxSpecialty.DISPUTE_RESOLUTION,\n", " system_messages=[SystemMessage(content=\"You are a tax specialist.\")],\n", " ),\n", " )\n", "\n", " # 为每种代理类型添加默认订阅\n", " await runtime.add_subscription(DefaultSubscription(agent_type=specialist_agent_type_1))\n", " await runtime.add_subscription(DefaultSubscription(agent_type=specialist_agent_type_2))\n", "\n", " # 启动运行时并向代理发送默认主题的消息\n", " runtime.start()\n", " await runtime.publish_message(ClientRequest(\"I need to have my tax for 2024 prepared.\"), topic_id=DefaultTopicId())\n", " await runtime.stop_when_idle()\n", "\n", "\n", "await run_single_tenant_single_scope()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 2. 多租户,单一发布范围\n\n#### 场景说明\n\n在多租户、单一发布范围的场景中:\n\n- 存在多个租户(例如多个客户或用户会话)\n- 每个租户通过主题源拥有自己独立的话题\n- 租户内的所有代理都订阅该租户的话题。如果需要,会为每个租户创建新的代理实例\n- 消息仅对同一租户内的代理可见\n\n此场景适用于需要在不同租户间隔离通信,但希望租户内所有代理都能感知所有消息的情况。\n\n#### 在税务专家公司的应用\n\n在此场景中:\n\n- 公司同时服务多个客户(租户)\n- 为每个客户创建专属的代理实例集\n- 每个客户的通信与其他客户隔离\n- 客户的所有代理都能收到发布到该客户主题的消息\n\n#### 场景运作方式\n\n- 订阅:代理根据租户身份订阅相应主题\n- 发布:消息发布到租户专属主题\n- 消息处理:代理仅接收与其租户相关的消息\n\n#### 优势\n- 租户隔离:确保客户间的数据隐私和分离\n- 租户内协作:代理可在其租户内自由协作\n\n#### 注意事项\n- 复杂性:需要管理多组代理和主题\n- 资源使用:更多代理实例可能消耗额外资源\n" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\n", "==================================================\n", "Tax specialist TaxSpecialist_planning:ClientABC with specialty TaxSpecialty.PLANNING:\n", "ClientABC requires tax services.\n", "\n", "==================================================\n", "Tax specialist TaxSpecialist_dispute_resolution:ClientABC with specialty TaxSpecialty.DISPUTE_RESOLUTION:\n", "ClientABC requires tax services.\n", "\n", "==================================================\n", "Tax specialist TaxSpecialist_compliance:ClientABC with specialty TaxSpecialty.COMPLIANCE:\n", "ClientABC requires tax services.\n", "\n", "==================================================\n", "Tax specialist TaxSpecialist_preparation:ClientABC with specialty TaxSpecialty.PREPARATION:\n", "ClientABC requires tax services.\n", "\n", "==================================================\n", "Tax specialist TaxSpecialist_planning:ClientXYZ with specialty TaxSpecialty.PLANNING:\n", "ClientXYZ requires tax services.\n", "\n", "==================================================\n", "Tax specialist TaxSpecialist_dispute_resolution:ClientXYZ with specialty TaxSpecialty.DISPUTE_RESOLUTION:\n", "ClientXYZ requires tax services.\n", "\n", "==================================================\n", "Tax specialist TaxSpecialist_compliance:ClientXYZ with specialty TaxSpecialty.COMPLIANCE:\n", "ClientXYZ requires tax services.\n", "\n", "==================================================\n", "Tax specialist TaxSpecialist_preparation:ClientXYZ with specialty TaxSpecialty.PREPARATION:\n", "ClientXYZ requires tax services.\n" ] } ], "source": [ "async def run_multi_tenant_single_scope() -> None:\n", " # 创建运行时\n", " runtime = SingleThreadedAgentRuntime()\n", "\n", " # 客户(租户)列表\n", " tenants = [\"ClientABC\", \"ClientXYZ\"]\n", "\n", " # 初始化会话并将主题类型映射到每个税务专家代理类型\n", " for specialty in TaxSpecialty:\n", " specialist_agent_type = f\"TaxSpecialist_{specialty.value}\"\n", " await TaxSpecialist.register(\n", " runtime=runtime,\n", " type=specialist_agent_type,\n", " factory=lambda specialty=specialty: TaxSpecialist( # type: ignore\n", " description=f\"A tax specialist in {specialty.value}.\",\n", " specialty=specialty,\n", " system_messages=[SystemMessage(content=f\"You are a tax specialist in {specialty.value}.\")],\n", " ),\n", " )\n", " specialist_subscription = DefaultSubscription(agent_type=specialist_agent_type)\n", " await runtime.add_subscription(specialist_subscription)\n", "\n", " # 启动运行时\n", " runtime.start()\n", "\n", " # 将客户请求发布到各自的主题\n", " for tenant in tenants:\n", " topic_source = tenant # 主题来源是客户端名称\n", " topic_id = DefaultTopicId(source=topic_source)\n", " await runtime.publish_message(\n", " ClientRequest(f\"{tenant} requires tax services.\"),\n", " topic_id=topic_id,\n", " )\n", "\n", " # 预留消息处理时间\n", " await asyncio.sleep(1)\n", "\n", " # 空闲时停止运行时\n", " await runtime.stop_when_idle()\n", "\n", "\n", "await run_multi_tenant_single_scope()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 3. 单租户多发布范围场景\n\n#### 场景说明\n\n在单租户多发布范围场景中:\n\n- 所有代理都在同一租户内运行\n- 消息发布到不同主题\n- 代理根据其角色或专业领域订阅特定主题\n- 消息基于主题定向发送给代理子集\n\n该场景支持租户内的定向通信,实现对消息分发的更精细控制。\n\n#### 在税务管理公司的应用\n\n在此场景中:\n\n- 税务系统经理根据专业领域与特定专家通信\n- 不同主题代表不同专业领域(如\"规划\"、\"合规\")\n- 专家仅订阅与其专业匹配的主题\n- 经理向特定主题发布消息以触达目标专家\n\n#### 场景运作方式\n\n- 订阅:代理订阅与其专业对应的主题\n- 发布:根据目标接收方将消息发布到特定主题\n- 消息处理:只有订阅了主题的代理才能接收其消息\n\n#### 优势\n\n- 定向通信:消息仅触达相关代理\n- 高效性:减少代理处理不必要消息的负担\n\n#### 注意事项\n\n- 设置复杂性:需要谨慎管理主题和订阅关系\n- 灵活性:通信场景变更可能需要更新订阅关系\n" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\n", "==================================================\n", "Tax specialist TaxSpecialist_planning:default with specialty TaxSpecialty.PLANNING:\n", "I need assistance with planning taxes.\n", "\n", "==================================================\n", "Tax specialist TaxSpecialist_dispute_resolution:default with specialty TaxSpecialty.DISPUTE_RESOLUTION:\n", "I need assistance with dispute_resolution taxes.\n", "\n", "==================================================\n", "Tax specialist TaxSpecialist_compliance:default with specialty TaxSpecialty.COMPLIANCE:\n", "I need assistance with compliance taxes.\n", "\n", "==================================================\n", "Tax specialist TaxSpecialist_preparation:default with specialty TaxSpecialty.PREPARATION:\n", "I need assistance with preparation taxes.\n" ] } ], "source": [ "async def run_single_tenant_multiple_scope() -> None:\n", " # 创建运行时\n", " runtime = SingleThreadedAgentRuntime()\n", " # 为每个专业领域注册税务专家代理并添加订阅\n", " for specialty in TaxSpecialty:\n", " specialist_agent_type = f\"TaxSpecialist_{specialty.value}\"\n", " await TaxSpecialist.register(\n", " runtime=runtime,\n", " type=specialist_agent_type,\n", " factory=lambda specialty=specialty: TaxSpecialist( # type: ignore\n", " description=f\"A tax specialist in {specialty.value}.\",\n", " specialty=specialty,\n", " system_messages=[SystemMessage(content=f\"You are a tax specialist in {specialty.value}.\")],\n", " ),\n", " )\n", " specialist_subscription = TypeSubscription(topic_type=specialty.value, agent_type=specialist_agent_type)\n", " await runtime.add_subscription(specialist_subscription)\n", "\n", " # 启动运行时\n", " runtime.start()\n", "\n", " # 向每位专家的主题发布ClientRequest\n", " for specialty in TaxSpecialty:\n", " topic_id = TopicId(type=specialty.value, source=\"default\")\n", " await runtime.publish_message(\n", " ClientRequest(f\"I need assistance with {specialty.value} taxes.\"),\n", " topic_id=topic_id,\n", " )\n", "\n", " # 预留消息处理时间\n", " await asyncio.sleep(1)\n", "\n", " # 空闲时停止运行时\n", " await runtime.stop_when_idle()\n", "\n", "\n", "await run_single_tenant_multiple_scope()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 4. 多租户、多发布范围场景\n\n#### 场景说明\n\n在多租户、多发布范围场景中:\n\n- 存在多个租户,每个租户拥有自己的代理集合\n- 消息发布到每个租户内的多个主题\n- 代理根据其角色订阅租户专属的相关主题\n- 结合了租户隔离与定向通信的特性\n\n此场景提供了最高级别的消息分发控制,适用于具有多个客户端和特定通信需求的复杂系统。\n\n#### 在税务管理公司中的应用\n\n在此场景中:\n\n- 公司服务多个客户,每个客户拥有专属代理实例\n- 每个客户内部,代理基于专业领域通过多个主题进行通信\n- 例如:客户A的规划专家订阅来源为\"ClientA\"的\"planning\"主题\n- 每个客户的税务系统管理员通过租户专属主题与其专家通信\n\n#### 场景运作方式\n\n- 订阅:代理基于租户身份和专业领域订阅主题\n- 发布:消息发布到租户专属和领域专属的主题\n- 消息处理:只有匹配租户和主题的代理才能接收消息\n\n#### 优势\n\n- 完全隔离:确保租户和通信的双重隔离\n- 精细控制:实现消息到目标代理的精准路由\n\n#### 注意事项\n\n- 复杂性:需要谨慎管理主题、租户和订阅关系\n- 资源消耗:代理实例和主题数量增加可能影响资源\n" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\n", "==================================================\n", "Tax specialist TaxSpecialist_planning:ClientABC with specialty TaxSpecialty.PLANNING:\n", "ClientABC needs assistance with planning taxes.\n", "\n", "==================================================\n", "Tax specialist TaxSpecialist_dispute_resolution:ClientABC with specialty TaxSpecialty.DISPUTE_RESOLUTION:\n", "ClientABC needs assistance with dispute_resolution taxes.\n", "\n", "==================================================\n", "Tax specialist TaxSpecialist_compliance:ClientABC with specialty TaxSpecialty.COMPLIANCE:\n", "ClientABC needs assistance with compliance taxes.\n", "\n", "==================================================\n", "Tax specialist TaxSpecialist_preparation:ClientABC with specialty TaxSpecialty.PREPARATION:\n", "ClientABC needs assistance with preparation taxes.\n", "\n", "==================================================\n", "Tax specialist TaxSpecialist_planning:ClientXYZ with specialty TaxSpecialty.PLANNING:\n", "ClientXYZ needs assistance with planning taxes.\n", "\n", "==================================================\n", "Tax specialist TaxSpecialist_dispute_resolution:ClientXYZ with specialty TaxSpecialty.DISPUTE_RESOLUTION:\n", "ClientXYZ needs assistance with dispute_resolution taxes.\n", "\n", "==================================================\n", "Tax specialist TaxSpecialist_compliance:ClientXYZ with specialty TaxSpecialty.COMPLIANCE:\n", "ClientXYZ needs assistance with compliance taxes.\n", "\n", "==================================================\n", "Tax specialist TaxSpecialist_preparation:ClientXYZ with specialty TaxSpecialty.PREPARATION:\n", "ClientXYZ needs assistance with preparation taxes.\n" ] } ], "source": [ "async def run_multi_tenant_multiple_scope() -> None:\n", " # 创建运行时\n", " runtime = SingleThreadedAgentRuntime()\n", "\n", " # 为每个专业领域和租户定义TypeSubscriptions\n", " tenants = [\"ClientABC\", \"ClientXYZ\"]\n", "\n", " # 初始化所有专业领域的代理并添加类型订阅\n", " for specialty in TaxSpecialty:\n", " specialist_agent_type = f\"TaxSpecialist_{specialty.value}\"\n", " await TaxSpecialist.register(\n", " runtime=runtime,\n", " type=specialist_agent_type,\n", " factory=lambda specialty=specialty: TaxSpecialist( # type: ignore\n", " description=f\"A tax specialist in {specialty.value}.\",\n", " specialty=specialty,\n", " system_messages=[SystemMessage(content=f\"You are a tax specialist in {specialty.value}.\")],\n", " ),\n", " )\n", " for tenant in tenants:\n", " specialist_subscription = TypeSubscription(\n", " topic_type=f\"{tenant}_{specialty.value}\", agent_type=specialist_agent_type\n", " )\n", " await runtime.add_subscription(specialist_subscription)\n", "\n", " # 启动运行时\n", " runtime.start()\n", "\n", " # 为每个租户向每个专业发送消息\n", " for tenant in tenants:\n", " for specialty in TaxSpecialty:\n", " topic_id = TopicId(type=f\"{tenant}_{specialty.value}\", source=tenant)\n", " await runtime.publish_message(\n", " ClientRequest(f\"{tenant} needs assistance with {specialty.value} taxes.\"),\n", " topic_id=topic_id,\n", " )\n", "\n", " # 留出消息处理时间\n", " await asyncio.sleep(1)\n", "\n", " # 空闲时停止运行时\n", " await runtime.stop_when_idle()\n", "\n", "\n", "await run_multi_tenant_multiple_scope()" ] } ], "metadata": { "kernelspec": { "display_name": ".venv", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.12.6" } }, "nbformat": 4, "nbformat_minor": 2 }