{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# 并发代理\n\n本节探讨多个代理并发工作的使用场景,主要涵盖三种模式:\n\n1. **单消息多处理器** \n 展示单个消息如何被订阅同一主题的多个代理同时处理。\n\n2. **多消息多处理器** \n 演示如何根据主题将特定消息类型路由到专用代理。\n\n3. **直接消息传递** \n 专注于代理之间以及运行时与代理之间的消息发送。\n" ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [], "source": [ "import asyncio\n", "from dataclasses import dataclass\n", "\n", "from autogen_core import (\n", " AgentId,\n", " ClosureAgent,\n", " ClosureContext,\n", " DefaultTopicId,\n", " MessageContext,\n", " RoutedAgent,\n", " SingleThreadedAgentRuntime,\n", " TopicId,\n", " TypeSubscription,\n", " default_subscription,\n", " message_handler,\n", " type_subscription,\n", ")" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "@dataclass\n", "class Task:\n", " task_id: str\n", "\n", "\n", "@dataclass\n", "class TaskResponse:\n", " task_id: str\n", " result: str" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 单消息多处理器\n第一种模式展示单个消息如何被多个代理同时处理:\n\n- 每个`Processor`代理使用{py:meth}`~autogen_core.components.default_subscription`装饰器订阅默认主题\n- 当向默认主题发布消息时,所有注册代理将独立处理该消息\n```{note}\n下方我们使用{py:meth}`~autogen_core.components.default_subscription`装饰器订阅`Processor`,还有另一种完全不使用装饰器的订阅方式,如[主题订阅与发布](../framework/message-and-communication.ipynb#subscribe-and-publish-to-topics)所示,这种方式可以让同一个代理类订阅不同主题。\n```\n" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "@default_subscription\n", "class Processor(RoutedAgent):\n", " @message_handler\n", " async def on_task(self, message: Task, ctx: MessageContext) -> None:\n", " print(f\"{self._description} starting task {message.task_id}\")\n", " await asyncio.sleep(2) # 模拟工作\n", " print(f\"{self._description} finished task {message.task_id}\")" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Agent 1 starting task task-1\n", "Agent 2 starting task task-1\n", "Agent 1 finished task task-1\n", "Agent 2 finished task task-1\n" ] } ], "source": [ "runtime = SingleThreadedAgentRuntime()\n", "\n", "await Processor.register(runtime, \"agent_1\", lambda: Processor(\"Agent 1\"))\n", "await Processor.register(runtime, \"agent_2\", lambda: Processor(\"Agent 2\"))\n", "\n", "runtime.start()\n", "\n", "await runtime.publish_message(Task(task_id=\"task-1\"), topic_id=DefaultTopicId())\n", "\n", "await runtime.stop_when_idle()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 多消息多处理器\n第二种模式演示将不同类型消息路由到特定处理器:\n- `UrgentProcessor`订阅\"urgent\"紧急主题\n- `NormalProcessor`订阅\"normal\"普通主题\n\n我们使用{py:meth}`~autogen_core.components.type_subscription`装饰器让代理订阅特定主题类型。\n" ] }, { "cell_type": "code", "execution_count": 50, "metadata": {}, "outputs": [], "source": [ "TASK_RESULTS_TOPIC_TYPE = \"task-results\"\n", "task_results_topic_id = TopicId(type=TASK_RESULTS_TOPIC_TYPE, source=\"default\")\n", "\n", "\n", "@type_subscription(topic_type=\"urgent\")\n", "class UrgentProcessor(RoutedAgent):\n", " @message_handler\n", " async def on_task(self, message: Task, ctx: MessageContext) -> None:\n", " print(f\"Urgent processor starting task {message.task_id}\")\n", " await asyncio.sleep(1) # 模拟工作\n", " print(f\"Urgent processor finished task {message.task_id}\")\n", "\n", " task_response = TaskResponse(task_id=message.task_id, result=\"Results by Urgent Processor\")\n", " await self.publish_message(task_response, topic_id=task_results_topic_id)\n", "\n", "\n", "@type_subscription(topic_type=\"normal\")\n", "class NormalProcessor(RoutedAgent):\n", " @message_handler\n", " async def on_task(self, message: Task, ctx: MessageContext) -> None:\n", " print(f\"Normal processor starting task {message.task_id}\")\n", " await asyncio.sleep(3) # 模拟工作\n", " print(f\"Normal processor finished task {message.task_id}\")\n", "\n", " task_response = TaskResponse(task_id=message.task_id, result=\"Results by Normal Processor\")\n", " await self.publish_message(task_response, topic_id=task_results_topic_id)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "在注册代理后,我们可以向\"urgent\"和\"normal\"主题发布消息:\n" ] }, { "cell_type": "code", "execution_count": 51, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Normal processor starting task normal-1\n", "Urgent processor starting task urgent-1\n", "Urgent processor finished task urgent-1\n", "Normal processor finished task normal-1\n" ] } ], "source": [ "runtime = SingleThreadedAgentRuntime()\n", "\n", "await UrgentProcessor.register(runtime, \"urgent_processor\", lambda: UrgentProcessor(\"Urgent Processor\"))\n", "await NormalProcessor.register(runtime, \"normal_processor\", lambda: NormalProcessor(\"Normal Processor\"))\n", "\n", "runtime.start()\n", "\n", "await runtime.publish_message(Task(task_id=\"normal-1\"), topic_id=TopicId(type=\"normal\", source=\"default\"))\n", "await runtime.publish_message(Task(task_id=\"urgent-1\"), topic_id=TopicId(type=\"urgent\", source=\"default\"))\n", "\n", "await runtime.stop_when_idle()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### 收集结果\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "在前面的示例中,我们依赖控制台输出来验证任务完成情况。但在实际应用中,通常需要以编程方式收集和处理结果。\n\n为了收集这些消息,我们将使用{py:class}`~autogen_core.components.ClosureAgent`。我们定义了一个专用主题`TASK_RESULTS_TOPIC_TYPE`,`UrgentProcessor`和`NormalProcessor`都会将结果发布到这个主题。然后ClosureAgent会处理来自该主题的消息。\n" ] }, { "cell_type": "code", "execution_count": 52, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Normal processor starting task normal-1\n", "Urgent processor starting task urgent-1\n", "Urgent processor finished task urgent-1\n", "Normal processor finished task normal-1\n" ] } ], "source": [ "queue = asyncio.Queue[TaskResponse]()\n", "\n", "\n", "async def collect_result(_agent: ClosureContext, message: TaskResponse, ctx: MessageContext) -> None:\n", " await queue.put(message)\n", "\n", "\n", "runtime.start()\n", "\n", "CLOSURE_AGENT_TYPE = \"collect_result_agent\"\n", "await ClosureAgent.register_closure(\n", " runtime,\n", " CLOSURE_AGENT_TYPE,\n", " collect_result,\n", " subscriptions=lambda: [TypeSubscription(topic_type=TASK_RESULTS_TOPIC_TYPE, agent_type=CLOSURE_AGENT_TYPE)],\n", ")\n", "\n", "await runtime.publish_message(Task(task_id=\"normal-1\"), topic_id=TopicId(type=\"normal\", source=\"default\"))\n", "await runtime.publish_message(Task(task_id=\"urgent-1\"), topic_id=TopicId(type=\"urgent\", source=\"default\"))\n", "\n", "await runtime.stop_when_idle()" ] }, { "cell_type": "code", "execution_count": 53, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "TaskResponse(task_id='urgent-1', result='Results by Urgent Processor')\n", "TaskResponse(task_id='normal-1', result='Results by Normal Processor')\n" ] } ], "source": [ "while not queue.empty():\n", " print(await queue.get())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 直接消息\n\n与之前的模式不同,这个模式专注于直接消息。这里我们演示两种发送方式:\n\n- 代理之间的直接消息传递 \n- 从运行时向特定代理发送消息 \n\n需要注意以下几点:\n\n- 消息使用{py:class}`~autogen_core.components.AgentId`进行寻址 \n- 发送方可以期待收到目标代理的响应 \n- 我们只注册一次`WorkerAgent`类,但向两个不同的工作线程发送任务\n - 如何实现?如[代理生命周期](../core-concepts/agent-identity-and-lifecycle.md#agent-lifecycle)所述,当使用{py:class}`~autogen_core.components.AgentId`传递消息时,运行时将获取实例(如果不存在则创建一个)。在本例中,运行时在发送这两条消息时创建了两个工作线程实例。\n" ] }, { "cell_type": "code", "execution_count": 36, "metadata": {}, "outputs": [], "source": [ "class WorkerAgent(RoutedAgent):\n", " @message_handler\n", " async def on_task(self, message: Task, ctx: MessageContext) -> TaskResponse:\n", " print(f\"{self.id} starting task {message.task_id}\")\n", " await asyncio.sleep(2) # 模拟工作\n", " print(f\"{self.id} finished task {message.task_id}\")\n", " return TaskResponse(task_id=message.task_id, result=f\"Results by {self.id}\")\n", "\n", "\n", "class DelegatorAgent(RoutedAgent):\n", " def __init__(self, description: str, worker_type: str):\n", " super().__init__(description)\n", " self.worker_instances = [AgentId(worker_type, f\"{worker_type}-1\"), AgentId(worker_type, f\"{worker_type}-2\")]\n", "\n", " @message_handler\n", " async def on_task(self, message: Task, ctx: MessageContext) -> TaskResponse:\n", " print(f\"Delegator received task {message.task_id}.\")\n", "\n", " subtask1 = Task(task_id=\"task-part-1\")\n", " subtask2 = Task(task_id=\"task-part-2\")\n", "\n", " worker1_result, worker2_result = await asyncio.gather(\n", " self.send_message(subtask1, self.worker_instances[0]), self.send_message(subtask2, self.worker_instances[1])\n", " )\n", "\n", " combined_result = f\"Part 1: {worker1_result.result}, \" f\"Part 2: {worker2_result.result}\"\n", " task_response = TaskResponse(task_id=message.task_id, result=combined_result)\n", " return task_response" ] }, { "cell_type": "code", "execution_count": 37, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Delegator received task main-task.\n", "worker/worker-1 starting task task-part-1\n", "worker/worker-2 starting task task-part-2\n", "worker/worker-1 finished task task-part-1\n", "worker/worker-2 finished task task-part-2\n", "Final result: Part 1: Results by worker/worker-1, Part 2: Results by worker/worker-2\n" ] } ], "source": [ "runtime = SingleThreadedAgentRuntime()\n", "\n", "await WorkerAgent.register(runtime, \"worker\", lambda: WorkerAgent(\"Worker Agent\"))\n", "await DelegatorAgent.register(runtime, \"delegator\", lambda: DelegatorAgent(\"Delegator Agent\", \"worker\"))\n", "\n", "runtime.start()\n", "\n", "delegator = AgentId(\"delegator\", \"default\")\n", "response = await runtime.send_message(Task(task_id=\"main-task\"), recipient=delegator)\n", "\n", "print(f\"Final result: {response.result}\")\n", "await runtime.stop_when_idle()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 额外资源\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "如果您对并发处理更感兴趣,可以查看[智能体混合](./mixture-of-agents.ipynb)模式,该模式大量依赖并发智能体。\n" ] } ], "metadata": { "kernelspec": { "display_name": "autogen", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.12.7" } }, "nbformat": 4, "nbformat_minor": 2 }