{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Termination Using an Intervention Handler\n",
    "\n",
    "```{note}\n",
    "This method is only valid when using {py:class}`~autogen_core.SingleThreadedAgentRuntime`.\n",
    "```\n",
    "\n",
    "There are several ways to handle termination in `autogen_core`. In every case the goal is the same: detect that the runtime no longer needs to run, so that you can move on to finalization tasks. One way to do this is to use an {py:class}`~autogen_core.InterventionHandler` to detect a termination message and then act on it.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "from dataclasses import dataclass\n",
    "from typing import Any\n",
    "\n",
    "from autogen_core import (\n",
    "    DefaultInterventionHandler,\n",
    "    DefaultTopicId,\n",
    "    MessageContext,\n",
    "    RoutedAgent,\n",
    "    SingleThreadedAgentRuntime,\n",
    "    default_subscription,\n",
    "    message_handler,\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "First, we define two dataclasses: one for regular messages and one for the message that signals termination.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "@dataclass\n",
    "class Message:\n",
    "    content: Any\n",
    "\n",
    "\n",
    "@dataclass\n",
    "class Termination:\n",
    "    reason: str"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We write the agent so that it publishes a termination message when it decides it is time to terminate.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "@default_subscription\n",
    "class AnAgent(RoutedAgent):\n",
    "    def __init__(self) -> None:\n",
    "        super().__init__(\"MyAgent\")\n",
    "        self.received = 0\n",
    "\n",
    "    @message_handler\n",
    "    async def on_new_message(self, message: Message, ctx: MessageContext) -> None:\n",
    "        self.received += 1\n",
    "        # Request termination once more than 3 messages have been received.\n",
    "        if self.received > 3:\n",
    "            await self.publish_message(Termination(reason=\"Reached maximum number of messages\"), DefaultTopicId())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Next, we create an intervention handler that detects the termination message and acts on it. This handler hooks into publish events: when it encounters a `Termination` message, it updates its internal state to record that termination has been requested.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "class TerminationHandler(DefaultInterventionHandler):\n",
    "    def __init__(self) -> None:\n",
    "        self._termination_value: Termination | None = None\n",
    "\n",
    "    async def on_publish(self, message: Any, *, message_context: MessageContext) -> Any:\n",
    "        # Record the termination request, then pass the message through unchanged.\n",
    "        if isinstance(message, Termination):\n",
    "            self._termination_value = message\n",
    "        return message\n",
    "\n",
    "    @property\n",
    "    def termination_value(self) -> Termination | None:\n",
    "        return self._termination_value\n",
    "\n",
    "    @property\n",
    "    def has_terminated(self) -> bool:\n",
    "        return self._termination_value is not None"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Finally, we add this handler to the runtime and use it to detect the termination message and stop the runtime once that message has been received.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Termination(reason='Reached maximum number of messages')\n"
     ]
    }
   ],
   "source": [
    "termination_handler = TerminationHandler()\n",
    "runtime = SingleThreadedAgentRuntime(intervention_handlers=[termination_handler])\n",
    "\n",
    "await AnAgent.register(runtime, \"my_agent\", AnAgent)\n",
    "\n",
    "runtime.start()\n",
    "\n",
    "# Publish more than 3 messages to trigger termination.\n",
    "await runtime.publish_message(Message(\"hello\"), DefaultTopicId())\n",
    "await runtime.publish_message(Message(\"hello\"), DefaultTopicId())\n",
    "await runtime.publish_message(Message(\"hello\"), DefaultTopicId())\n",
    "await runtime.publish_message(Message(\"hello\"), DefaultTopicId())\n",
    "\n",
    "# Wait for termination.\n",
    "await runtime.stop_when(lambda: termination_handler.has_terminated)\n",
    "\n",
    "print(termination_handler.termination_value)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": ".venv",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.12.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}