消息与通信#

AutoGen核心中的代理能够响应、发送和发布消息, 消息是代理之间相互通信的唯一方式。

消息#

消息是可序列化对象,可以通过以下方式定义:

  • 继承Pydantic的pydantic.BaseModel子类,或

  • 使用数据类

例如:

from dataclasses import dataclass


@dataclass
class TextMessage:
    content: str
    source: str


@dataclass
class ImageMessage:
    url: str
    source: str

备注

消息是纯数据载体,不应包含任何逻辑。

消息处理器#

当代理收到消息时,运行时将调用代理的消息处理器 (on_message()),该处理器应实现代理的消息处理逻辑。 如果代理无法处理该消息,则应抛出 CantHandleException异常。

基类BaseAgent不提供消息处理逻辑, 除非是高级用例,否则不建议直接实现on_message()方法。

开发者应从实现RoutedAgent基类开始, 它提供了内置的消息路由功能。

按类型路由消息#

RoutedAgent基类提供了一种机制, 通过message_handler()装饰器 将消息类型与消息处理器关联, 因此开发者无需实现on_message()方法。

例如,以下类型路由代理使用不同的消息处理器 来响应TextMessageImageMessage

from autogen_core import AgentId, MessageContext, RoutedAgent, SingleThreadedAgentRuntime, message_handler


class MyAgent(RoutedAgent):
    @message_handler
    async def on_text_message(self, message: TextMessage, ctx: MessageContext) -> None:
        print(f"Hello, {message.source}, you said {message.content}!")

    @message_handler
    async def on_image_message(self, message: ImageMessage, ctx: MessageContext) -> None:
        print(f"Hello, {message.source}, you sent me {message.url}!")

创建代理运行时并注册代理类型(参见代理与代理运行时):

runtime = SingleThreadedAgentRuntime()
await MyAgent.register(runtime, "my_agent", lambda: MyAgent("My Agent"))
AgentType(type='my_agent')

使用 TextMessageImageMessage 测试此代理。

runtime.start()
agent_id = AgentId("my_agent", "default")
await runtime.send_message(TextMessage(content="Hello, World!", source="User"), agent_id)
await runtime.send_message(ImageMessage(url="https://example.com/image.jpg", source="User"), agent_id)
await runtime.stop_when_idle()
Hello, User, you said Hello, World!!
Hello, User, you sent me https://example.com/image.jpg!

运行时在传递第一条消息时,会自动创建一个 MyAgent 实例, 代理ID为 AgentId("my_agent", "default")

同类型消息的路由#

在某些场景下,将相同类型的消息路由到不同的处理器很有用。 例如,来自不同发送方代理的消息需要区别处理。 您可以使用 message_handler() 装饰器的 match 参数。

match 参数将相同消息类型的处理器关联到特定消息—— 它是消息类型路由的次级路由。 它接受一个可调用对象,该对象接收消息和 MessageContext 作为参数, 并返回布尔值指示该消息是否应由装饰的处理器处理。 这些可调用对象会按照处理器名称的字母顺序进行检查。

以下是一个基于发送方代理路由消息的代理示例, 使用了 match 参数:

class RoutedBySenderAgent(RoutedAgent):
    @message_handler(match=lambda msg, ctx: msg.source.startswith("user1"))  # type: ignore
    async def on_user1_message(self, message: TextMessage, ctx: MessageContext) -> None:
        print(f"Hello from user 1 handler, {message.source}, you said {message.content}!")

    @message_handler(match=lambda msg, ctx: msg.source.startswith("user2"))  # type: ignore
    async def on_user2_message(self, message: TextMessage, ctx: MessageContext) -> None:
        print(f"Hello from user 2 handler, {message.source}, you said {message.content}!")

    @message_handler(match=lambda msg, ctx: msg.source.startswith("user2"))  # type: ignore
    async def on_image_message(self, message: ImageMessage, ctx: MessageContext) -> None:
        print(f"Hello, {message.source}, you sent me {message.url}!")

上述代理使用消息的 source 字段来确定发送方代理。 你也可以使用 MessageContextsender 字段, 通过代理ID(如果可用)来确定发送方代理。

让我们用不同 source 值的消息来测试这个代理:

runtime = SingleThreadedAgentRuntime()
await RoutedBySenderAgent.register(runtime, "my_agent", lambda: RoutedBySenderAgent("Routed by sender agent"))
runtime.start()
agent_id = AgentId("my_agent", "default")
await runtime.send_message(TextMessage(content="Hello, World!", source="user1-test"), agent_id)
await runtime.send_message(TextMessage(content="Hello, World!", source="user2-test"), agent_id)
await runtime.send_message(ImageMessage(url="https://example.com/image.jpg", source="user1-test"), agent_id)
await runtime.send_message(ImageMessage(url="https://example.com/image.jpg", source="user2-test"), agent_id)
await runtime.stop_when_idle()
Hello from user 1 handler, user1-test, you said Hello, World!!
Hello from user 2 handler, user2-test, you said Hello, World!!
Hello, user2-test, you sent me https://example.com/image.jpg!

在上面的例子中,第一个 ImageMessage 未被处理,因为消息的 source 字段 与处理器的 match 条件不匹配。

直接消息#

AutoGen核心中有两种通信类型:

  • 直接消息:向另一个代理发送直接消息

  • 广播:向主题发布消息

我们先来看直接消息。 要在消息处理器中向另一个代理发送直接消息,使用 autogen_core.BaseAgent.send_message() 方法; 在运行时使用 autogen_core.AgentRuntime.send_message() 方法。 等待这些方法的调用将返回接收代理消息处理器的返回值。 当接收代理的处理器返回 None 时,将返回 None

备注

如果被调用的代理在发送方等待时抛出异常, 该异常将传播回发送方。

请求/响应#

直接消息可用于请求/响应场景, 其中发送方期望从接收方获得响应。 接收方可以通过从其消息处理器返回一个值来响应消息。 你可以将其视为代理之间的函数调用。

例如,考虑以下代理:

from dataclasses import dataclass

from autogen_core import MessageContext, RoutedAgent, SingleThreadedAgentRuntime, message_handler


@dataclass
class Message:
    content: str


class InnerAgent(RoutedAgent):
    @message_handler
    async def on_my_message(self, message: Message, ctx: MessageContext) -> Message:
        return Message(content=f"Hello from inner, {message.content}")


class OuterAgent(RoutedAgent):
    def __init__(self, description: str, inner_agent_type: str):
        super().__init__(description)
        self.inner_agent_id = AgentId(inner_agent_type, self.id.key)

    @message_handler
    async def on_my_message(self, message: Message, ctx: MessageContext) -> None:
        print(f"Received message: {message.content}")
        # 向内部代理发送直接消息并接收响应。
        response = await self.send_message(Message(f"Hello from outer, {message.content}"), self.inner_agent_id)
        print(f"Received inner response: {response.content}")

当接收到消息时,OuterAgent会直接向InnerAgent发送消息并接收响应消息。

我们可以通过向OuterAgent发送Message来测试这些代理。

runtime = SingleThreadedAgentRuntime()
await InnerAgent.register(runtime, "inner_agent", lambda: InnerAgent("InnerAgent"))
await OuterAgent.register(runtime, "outer_agent", lambda: OuterAgent("OuterAgent", "inner_agent"))
runtime.start()
outer_agent_id = AgentId("outer_agent", "default")
await runtime.send_message(Message(content="Hello, World!"), outer_agent_id)
await runtime.stop_when_idle()
Received message: Hello, World!
Received inner response: Hello from inner, Hello from outer, Hello, World!

两个输出都是由OuterAgent的消息处理器产生的,但第二个输出是基于InnerAgent的响应。

一般来说,直接消息传递适用于发送方和接收方紧密耦合的场景——它们被一起创建,且发送方链接到接收方的特定实例。 例如,代理通过向~autogen_core.tool_agent.ToolAgent的实例发送直接消息来执行工具调用, 并利用响应形成动作-观察循环。

广播#

广播实质上是带有主题和订阅的发布/订阅模型。 阅读主题与订阅 了解核心概念。

直接消息传递与广播的关键区别在于广播不能用于请求/响应场景。 当代理发布消息时,这是单向的,它无法从任何其他代理接收响应, 即使接收代理的处理器返回了值。

备注

如果对发布的消息给出响应,该响应将被丢弃。

备注

如果代理发布了它已订阅的消息类型,它将不会收到自己发布的消息。这是为了防止无限循环。

订阅和发布主题#

基于类型的订阅 将发布到给定主题类型的消息映射到给定代理类型的代理。 要使继承自~autogen_core.RoutedAgent的代理订阅给定主题类型的主题, 可以使用~autogen_core.components.type_subscription类装饰器。

以下示例展示了一个ReceiverAgent类,它使用~autogen_core.components.type_subscription装饰器 订阅"default"主题类型的主题,并打印接收到的消息。

from autogen_core import RoutedAgent, message_handler, type_subscription


@type_subscription(topic_type="default")
class ReceivingAgent(RoutedAgent):
    @message_handler
    async def on_my_message(self, message: Message, ctx: MessageContext) -> None:
        print(f"Received a message: {message.content}")

要从代理的处理器发布消息, 使用~autogen_core.BaseAgent.publish_message方法并指定 一个~autogen_core.TopicId。 此调用仍需等待,以允许运行时安排向所有订阅者传递消息,但它始终返回None。 如果代理在处理发布的消息时引发异常, 这将被记录但不会传播回发布代理。

以下示例展示了一个BroadcastingAgent,它 在收到消息时向主题发布消息。

from autogen_core import TopicId


class BroadcastingAgent(RoutedAgent):
    @message_handler
    async def on_my_message(self, message: Message, ctx: MessageContext) -> None:
        await self.publish_message(
            Message("Publishing a message from broadcasting agent!"),
            topic_id=TopicId(type="default", source=self.id.key),
        )

BroadcastingAgent向类型为"default"且源分配了代理实例的代理键的主题发布消息。

订阅通过代理运行时注册,既可以作为代理类型注册的一部分,也可以通过单独的API方法注册。 以下展示了如何为接收代理使用type_subscription()装饰器注册TypeSubscription,以及为广播代理不使用装饰器的注册方式。

from autogen_core import TypeSubscription

runtime = SingleThreadedAgentRuntime()

# 选项1:使用type_subscription装饰器
type_subscription类装饰器会在代理注册时自动向运行时添加TypeSubscription
# 。
# 
await ReceivingAgent.register(runtime, "receiving_agent", lambda: ReceivingAgent("Receiving Agent"))

# 选项2:使用TypeSubscription
await BroadcastingAgent.register(runtime, "broadcasting_agent", lambda: BroadcastingAgent("Broadcasting Agent"))
await runtime.add_subscription(TypeSubscription(topic_type="default", agent_type="broadcasting_agent"))

# 启动运行时并发布消息。
runtime.start()
await runtime.publish_message(
    Message("Hello, World! From the runtime!"), topic_id=TopicId(type="default", source="default")
)
await runtime.stop_when_idle()
Received a message: Hello, World! From the runtime!
Received a message: Publishing a message from broadcasting agent!

如上例所示,您还可以直接通过运行时的publish_message()方法向主题发布消息, 而无需创建代理实例。

从输出中可以看到接收代理收到了两条消息: 一条是通过运行时发布的,另一条是由广播代理发布的。

默认主题与订阅#

在上述示例中,我们分别使用 TopicIdTypeSubscription 来指定主题和订阅。 这在许多场景下是恰当的做法。 但当存在单一的发布范围时,即所有代理都会发布并订阅所有广播消息时, 我们可以使用便捷类DefaultTopicIddefault_subscription()来简化代码。

DefaultTopicId用于 创建使用"default"作为主题类型默认值、 并以发布代理的键作为主题源默认值的主题。 default_subscription()用于 创建订阅默认主题的类型订阅。 通过使用DefaultTopicIddefault_subscription(), 我们可以简化BroadcastingAgent的实现。

from autogen_core import DefaultTopicId, default_subscription


@default_subscription
class BroadcastingAgentDefaultTopic(RoutedAgent):
    @message_handler
    async def on_my_message(self, message: Message, ctx: MessageContext) -> None:
        # 向同一命名空间内的所有代理发布消息。
        await self.publish_message(
            Message("Publishing a message from broadcasting agent!"),
            topic_id=DefaultTopicId(),
        )

当运行时调用register()注册代理类型时, 它会创建一个TypeSubscription, 其主题类型使用"default"作为默认值, 且代理类型使用同一上下文中正在注册的相同代理类型。

runtime = SingleThreadedAgentRuntime()
await BroadcastingAgentDefaultTopic.register(
    runtime, "broadcasting_agent", lambda: BroadcastingAgentDefaultTopic("Broadcasting Agent")
)
await ReceivingAgent.register(runtime, "receiving_agent", lambda: ReceivingAgent("Receiving Agent"))
runtime.start()
await runtime.publish_message(Message("Hello, World! From the runtime!"), topic_id=DefaultTopicId())
await runtime.stop_when_idle()
Received a message: Hello, World! From the runtime!
Received a message: Publishing a message from broadcasting agent!

备注

如果您的场景允许所有代理发布并订阅 所有广播消息,请使用DefaultTopicIddefault_subscription()来装饰您的 代理类。