{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# 消息与通信\n\nAutoGen核心中的代理能够响应、发送和发布消息,\n消息是代理之间相互通信的唯一方式。\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 消息\n\n消息是可序列化对象,可以通过以下方式定义:\n\n- 继承Pydantic的{py:class}`pydantic.BaseModel`子类,或\n- 使用数据类\n\n例如:\n" ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [], "source": [ "from dataclasses import dataclass\n", "\n", "\n", "@dataclass\n", "class TextMessage:\n", " content: str\n", " source: str\n", "\n", "\n", "@dataclass\n", "class ImageMessage:\n", " url: str\n", " source: str" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```{note}\n消息是纯数据载体,不应包含任何逻辑。\n```\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 消息处理器\n\n当代理收到消息时,运行时将调用代理的消息处理器\n({py:meth}`~autogen_core.Agent.on_message`),该处理器应实现代理的消息处理逻辑。\n如果代理无法处理该消息,则应抛出\n{py:class}`~autogen_core.exceptions.CantHandleException`异常。\n\n基类{py:class}`~autogen_core.BaseAgent`不提供消息处理逻辑,\n除非是高级用例,否则不建议直接实现{py:meth}`~autogen_core.Agent.on_message`方法。\n\n开发者应从实现{py:class}`~autogen_core.RoutedAgent`基类开始,\n它提供了内置的消息路由功能。\n\n### 按类型路由消息\n\n{py:class}`~autogen_core.RoutedAgent`基类提供了一种机制,\n通过{py:meth}`~autogen_core.components.message_handler`装饰器\n将消息类型与消息处理器关联,\n因此开发者无需实现{py:meth}`~autogen_core.Agent.on_message`方法。\n\n例如,以下类型路由代理使用不同的消息处理器\n来响应`TextMessage`和`ImageMessage`:\n" ] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [], "source": [ "from autogen_core import AgentId, MessageContext, RoutedAgent, SingleThreadedAgentRuntime, message_handler\n", "\n", "\n", "class MyAgent(RoutedAgent):\n", " @message_handler\n", " async def on_text_message(self, message: TextMessage, ctx: MessageContext) -> None:\n", " print(f\"Hello, {message.source}, you said {message.content}!\")\n", "\n", " @message_handler\n", " async def on_image_message(self, message: ImageMessage, ctx: MessageContext) -> None:\n", " print(f\"Hello, {message.source}, you sent me {message.url}!\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "创建代理运行时并注册代理类型(参见[代理与代理运行时](agent-and-agent-runtime.ipynb)):\n" ] }, { "cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "AgentType(type='my_agent')" ] }, "execution_count": 26, "metadata": {}, "output_type": "execute_result" } ], "source": [ "runtime = SingleThreadedAgentRuntime()\n", "await MyAgent.register(runtime, \"my_agent\", lambda: MyAgent(\"My Agent\"))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "使用 `TextMessage` 和 `ImageMessage` 测试此代理。\n" ] }, { "cell_type": "code", "execution_count": 27, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Hello, User, you said Hello, World!!\n", "Hello, User, you sent me https://example.com/image.jpg!\n" ] } ], "source": [ "runtime.start()\n", "agent_id = AgentId(\"my_agent\", \"default\")\n", "await runtime.send_message(TextMessage(content=\"Hello, World!\", source=\"User\"), agent_id)\n", "await runtime.send_message(ImageMessage(url=\"https://example.com/image.jpg\", source=\"User\"), agent_id)\n", "await runtime.stop_when_idle()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "运行时在传递第一条消息时,会自动创建一个 `MyAgent` 实例,\n代理ID为 `AgentId(\"my_agent\", \"default\")`。\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 同类型消息的路由\n\n在某些场景下,将相同类型的消息路由到不同的处理器很有用。\n例如,来自不同发送方代理的消息需要区别处理。\n您可以使用 {py:meth}`~autogen_core.components.message_handler` 装饰器的 `match` 参数。\n\n`match` 参数将相同消息类型的处理器关联到特定消息——\n它是消息类型路由的次级路由。\n它接受一个可调用对象,该对象接收消息和\n{py:class}`~autogen_core.MessageContext` 作为参数,\n并返回布尔值指示该消息是否应由装饰的处理器处理。\n这些可调用对象会按照处理器名称的字母顺序进行检查。\n\n以下是一个基于发送方代理路由消息的代理示例,\n使用了 `match` 参数:\n" ] }, { "cell_type": "code", "execution_count": 28, "metadata": {}, "outputs": [], "source": [ "class RoutedBySenderAgent(RoutedAgent):\n", " @message_handler(match=lambda msg, ctx: msg.source.startswith(\"user1\")) # type: ignore\n", " async def on_user1_message(self, message: TextMessage, ctx: MessageContext) -> None:\n", " print(f\"Hello from user 1 handler, {message.source}, you said {message.content}!\")\n", "\n", " @message_handler(match=lambda msg, ctx: msg.source.startswith(\"user2\")) # type: ignore\n", " async def on_user2_message(self, message: TextMessage, ctx: MessageContext) -> None:\n", " print(f\"Hello from user 2 handler, {message.source}, you said {message.content}!\")\n", "\n", " @message_handler(match=lambda msg, ctx: msg.source.startswith(\"user2\")) # type: ignore\n", " async def on_image_message(self, message: ImageMessage, ctx: MessageContext) -> None:\n", " print(f\"Hello, {message.source}, you sent me {message.url}!\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "上述代理使用消息的 `source` 字段来确定发送方代理。\n你也可以使用 {py:class}`~autogen_core.MessageContext` 的 `sender` 字段,\n通过代理ID(如果可用)来确定发送方代理。\n\n让我们用不同 `source` 值的消息来测试这个代理:\n" ] }, { "cell_type": "code", "execution_count": 29, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Hello from user 1 handler, user1-test, you said Hello, World!!\n", "Hello from user 2 handler, user2-test, you said Hello, World!!\n", "Hello, user2-test, you sent me https://example.com/image.jpg!\n" ] } ], "source": [ "runtime = SingleThreadedAgentRuntime()\n", "await RoutedBySenderAgent.register(runtime, \"my_agent\", lambda: RoutedBySenderAgent(\"Routed by sender agent\"))\n", "runtime.start()\n", "agent_id = AgentId(\"my_agent\", \"default\")\n", "await runtime.send_message(TextMessage(content=\"Hello, World!\", source=\"user1-test\"), agent_id)\n", "await runtime.send_message(TextMessage(content=\"Hello, World!\", source=\"user2-test\"), agent_id)\n", "await runtime.send_message(ImageMessage(url=\"https://example.com/image.jpg\", source=\"user1-test\"), agent_id)\n", "await runtime.send_message(ImageMessage(url=\"https://example.com/image.jpg\", source=\"user2-test\"), agent_id)\n", "await runtime.stop_when_idle()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "在上面的例子中,第一个 `ImageMessage` 未被处理,因为消息的 `source` 字段\n与处理器的 `match` 条件不匹配。\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 直接消息\n\nAutoGen核心中有两种通信类型:\n\n- **直接消息**:向另一个代理发送直接消息\n- **广播**:向主题发布消息\n\n我们先来看直接消息。\n要在消息处理器中向另一个代理发送直接消息,使用\n{py:meth}`autogen_core.BaseAgent.send_message` 方法;\n在运行时使用 {py:meth}`autogen_core.AgentRuntime.send_message` 方法。\n等待这些方法的调用将返回接收代理消息处理器的返回值。\n当接收代理的处理器返回 `None` 时,将返回 `None`。\n\n```{note}\n如果被调用的代理在发送方等待时抛出异常,\n该异常将传播回发送方。\n```\n\n### 请求/响应\n\n直接消息可用于请求/响应场景,\n其中发送方期望从接收方获得响应。\n接收方可以通过从其消息处理器返回一个值来响应消息。\n你可以将其视为代理之间的函数调用。\n\n例如,考虑以下代理:\n" ] }, { "cell_type": "code", "execution_count": 30, "metadata": {}, "outputs": [], "source": [ "from dataclasses import dataclass\n", "\n", "from autogen_core import MessageContext, RoutedAgent, SingleThreadedAgentRuntime, message_handler\n", "\n", "\n", "@dataclass\n", "class Message:\n", " content: str\n", "\n", "\n", "class InnerAgent(RoutedAgent):\n", " @message_handler\n", " async def on_my_message(self, message: Message, ctx: MessageContext) -> Message:\n", " return Message(content=f\"Hello from inner, {message.content}\")\n", "\n", "\n", "class OuterAgent(RoutedAgent):\n", " def __init__(self, description: str, inner_agent_type: str):\n", " super().__init__(description)\n", " self.inner_agent_id = AgentId(inner_agent_type, self.id.key)\n", "\n", " @message_handler\n", " async def on_my_message(self, message: Message, ctx: MessageContext) -> None:\n", " print(f\"Received message: {message.content}\")\n", " # 向内部代理发送直接消息并接收响应。\n", " response = await self.send_message(Message(f\"Hello from outer, {message.content}\"), self.inner_agent_id)\n", " print(f\"Received inner response: {response.content}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "当接收到消息时,`OuterAgent`会直接向`InnerAgent`发送消息并接收响应消息。\n\n我们可以通过向`OuterAgent`发送`Message`来测试这些代理。\n" ] }, { "cell_type": "code", "execution_count": 31, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Received message: Hello, World!\n", "Received inner response: Hello from inner, Hello from outer, Hello, World!\n" ] } ], "source": [ "runtime = SingleThreadedAgentRuntime()\n", "await InnerAgent.register(runtime, \"inner_agent\", lambda: InnerAgent(\"InnerAgent\"))\n", "await OuterAgent.register(runtime, \"outer_agent\", lambda: OuterAgent(\"OuterAgent\", \"inner_agent\"))\n", "runtime.start()\n", "outer_agent_id = AgentId(\"outer_agent\", \"default\")\n", "await runtime.send_message(Message(content=\"Hello, World!\"), outer_agent_id)\n", "await runtime.stop_when_idle()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "两个输出都是由`OuterAgent`的消息处理器产生的,但第二个输出是基于`InnerAgent`的响应。\n\n一般来说,直接消息传递适用于发送方和接收方紧密耦合的场景——它们被一起创建,且发送方链接到接收方的特定实例。\n例如,代理通过向`~autogen_core.tool_agent.ToolAgent`的实例发送直接消息来执行工具调用,\n并利用响应形成动作-观察循环。\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 广播\n\n广播实质上是带有主题和订阅的发布/订阅模型。\n阅读[主题与订阅](../core-concepts/topic-and-subscription.md)\n了解核心概念。\n\n直接消息传递与广播的关键区别在于广播不能用于请求/响应场景。\n当代理发布消息时,这是单向的,它无法从任何其他代理接收响应,\n即使接收代理的处理器返回了值。\n\n```{note}\n如果对发布的消息给出响应,该响应将被丢弃。\n```\n\n```{note}\n如果代理发布了它已订阅的消息类型,它将不会收到自己发布的消息。这是为了防止无限循环。\n```\n\n### 订阅和发布主题\n\n[基于类型的订阅](../core-concepts/topic-and-subscription.md#type-based-subscription)\n将发布到给定主题类型的消息映射到给定代理类型的代理。\n要使继承自`~autogen_core.RoutedAgent`的代理订阅给定主题类型的主题,\n可以使用`~autogen_core.components.type_subscription`类装饰器。\n\n以下示例展示了一个`ReceiverAgent`类,它使用`~autogen_core.components.type_subscription`装饰器\n订阅`\"default\"`主题类型的主题,并打印接收到的消息。\n" ] }, { "cell_type": "code", "execution_count": 32, "metadata": {}, "outputs": [], "source": [ "from autogen_core import RoutedAgent, message_handler, type_subscription\n", "\n", "\n", "@type_subscription(topic_type=\"default\")\n", "class ReceivingAgent(RoutedAgent):\n", " @message_handler\n", " async def on_my_message(self, message: Message, ctx: MessageContext) -> None:\n", " print(f\"Received a message: {message.content}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "要从代理的处理器发布消息,\n使用`~autogen_core.BaseAgent.publish_message`方法并指定\n一个`~autogen_core.TopicId`。\n此调用仍需等待,以允许运行时安排向所有订阅者传递消息,但它始终返回`None`。\n如果代理在处理发布的消息时引发异常,\n这将被记录但不会传播回发布代理。\n\n以下示例展示了一个`BroadcastingAgent`,它\n在收到消息时向主题发布消息。\n" ] }, { "cell_type": "code", "execution_count": 33, "metadata": {}, "outputs": [], "source": [ "from autogen_core import TopicId\n", "\n", "\n", "class BroadcastingAgent(RoutedAgent):\n", " @message_handler\n", " async def on_my_message(self, message: Message, ctx: MessageContext) -> None:\n", " await self.publish_message(\n", " Message(\"Publishing a message from broadcasting agent!\"),\n", " topic_id=TopicId(type=\"default\", source=self.id.key),\n", " )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`BroadcastingAgent`向类型为`\"default\"`且源分配了代理实例的代理键的主题发布消息。\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "订阅通过代理运行时注册,既可以作为代理类型注册的一部分,也可以通过单独的API方法注册。\n以下展示了如何为接收代理使用{py:meth}`~autogen_core.components.type_subscription`装饰器注册{py:class}`~autogen_core.components.TypeSubscription`,以及为广播代理不使用装饰器的注册方式。\n" ] }, { "cell_type": "code", "execution_count": 34, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Received a message: Hello, World! From the runtime!\n", "Received a message: Publishing a message from broadcasting agent!\n" ] } ], "source": [ "from autogen_core import TypeSubscription\n", "\n", "runtime = SingleThreadedAgentRuntime()\n", "\n", "# 选项1:使用type_subscription装饰器\ntype_subscription类装饰器会在代理注册时自动向运行时添加TypeSubscription\n", "# 。\n", "# \n", "await ReceivingAgent.register(runtime, \"receiving_agent\", lambda: ReceivingAgent(\"Receiving Agent\"))\n", "\n", "# 选项2:使用TypeSubscription\n", "await BroadcastingAgent.register(runtime, \"broadcasting_agent\", lambda: BroadcastingAgent(\"Broadcasting Agent\"))\n", "await runtime.add_subscription(TypeSubscription(topic_type=\"default\", agent_type=\"broadcasting_agent\"))\n", "\n", "# 启动运行时并发布消息。\n", "runtime.start()\n", "await runtime.publish_message(\n", " Message(\"Hello, World! From the runtime!\"), topic_id=TopicId(type=\"default\", source=\"default\")\n", ")\n", "await runtime.stop_when_idle()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "如上例所示,您还可以直接通过运行时的{py:meth}`~autogen_core.AgentRuntime.publish_message`方法向主题发布消息,\n而无需创建代理实例。\n\n从输出中可以看到接收代理收到了两条消息:\n一条是通过运行时发布的,另一条是由广播代理发布的。\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 默认主题与订阅\n\n在上述示例中,我们分别使用\n{py:class}`~autogen_core.TopicId`和{py:class}`~autogen_core.components.TypeSubscription`\n来指定主题和订阅。\n这在许多场景下是恰当的做法。\n但当存在单一的发布范围时,即所有代理都会发布并订阅所有广播消息时,\n我们可以使用便捷类{py:class}`~autogen_core.components.DefaultTopicId`\n和{py:meth}`~autogen_core.components.default_subscription`来简化代码。\n\n{py:class}`~autogen_core.components.DefaultTopicId`用于\n创建使用`\"default\"`作为主题类型默认值、\n并以发布代理的键作为主题源默认值的主题。\n{py:meth}`~autogen_core.components.default_subscription`用于\n创建订阅默认主题的类型订阅。\n通过使用{py:class}`~autogen_core.components.DefaultTopicId`和{py:meth}`~autogen_core.components.default_subscription`,\n我们可以简化`BroadcastingAgent`的实现。\n" ] }, { "cell_type": "code", "execution_count": 35, "metadata": {}, "outputs": [], "source": [ "from autogen_core import DefaultTopicId, default_subscription\n", "\n", "\n", "@default_subscription\n", "class BroadcastingAgentDefaultTopic(RoutedAgent):\n", " @message_handler\n", " async def on_my_message(self, message: Message, ctx: MessageContext) -> None:\n", " # 向同一命名空间内的所有代理发布消息。\n", " await self.publish_message(\n", " Message(\"Publishing a message from broadcasting agent!\"),\n", " topic_id=DefaultTopicId(),\n", " )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "当运行时调用{py:meth}`~autogen_core.BaseAgent.register`注册代理类型时,\n它会创建一个{py:class}`~autogen_core.components.TypeSubscription`,\n其主题类型使用`\"default\"`作为默认值,\n且代理类型使用同一上下文中正在注册的相同代理类型。\n" ] }, { "cell_type": "code", "execution_count": 37, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Received a message: Hello, World! From the runtime!\n", "Received a message: Publishing a message from broadcasting agent!\n" ] } ], "source": [ "runtime = SingleThreadedAgentRuntime()\n", "await BroadcastingAgentDefaultTopic.register(\n", " runtime, \"broadcasting_agent\", lambda: BroadcastingAgentDefaultTopic(\"Broadcasting Agent\")\n", ")\n", "await ReceivingAgent.register(runtime, \"receiving_agent\", lambda: ReceivingAgent(\"Receiving Agent\"))\n", "runtime.start()\n", "await runtime.publish_message(Message(\"Hello, World! From the runtime!\"), topic_id=DefaultTopicId())\n", "await runtime.stop_when_idle()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```{note}\n如果您的场景允许所有代理发布并订阅\n所有广播消息,请使用{py:class}`~autogen_core.components.DefaultTopicId`\n和{py:meth}`~autogen_core.components.default_subscription`来装饰您的\n代理类。\n```\n" ] } ], "metadata": { "kernelspec": { "display_name": "agnext", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.9" } }, "nbformat": 4, "nbformat_minor": 2 }