autogen_core._intervention 源代码

from typing import Any, Protocol, final

from ._agent_id import AgentId
from ._message_context import MessageContext

__all__ = [
    "DropMessage",
    "InterventionHandler",
    "DefaultInterventionHandler",
]


[文档] @final class DropMessage: """用于标记消息应被干预处理器丢弃的标记类型。该类型本身应从处理器返回。""" ...
[文档] class InterventionHandler(Protocol): """干预处理器是一个类,可用于修改、记录或丢弃正在被 :class:`autogen_core.base.AgentRuntime` 处理的消息。 当消息提交到运行时,处理器会被调用。 目前唯一支持此功能的运行时是 :class:`autogen_core.base.SingleThreadedAgentRuntime`。 注意:从任何干预处理器方法返回 None 都会导致发出警告并被当作"无更改"处理。如果确实要丢弃消息,应显式返回 :class:`DropMessage`。 示例: .. code-block:: python from autogen_core import DefaultInterventionHandler, MessageContext, AgentId, SingleThreadedAgentRuntime from dataclasses import dataclass from typing import Any @dataclass class MyMessage: content: str class MyInterventionHandler(DefaultInterventionHandler): async def on_send(self, message: Any, *, message_context: MessageContext, recipient: AgentId) -> MyMessage: if isinstance(message, MyMessage): message.content = message.content.upper() return message runtime = SingleThreadedAgentRuntime(intervention_handlers=[MyInterventionHandler()]) """
[文档] async def on_send( self, message: Any, *, message_context: MessageContext, recipient: AgentId ) -> Any | type[DropMessage]: """当通过 :meth:`autogen_core.base.AgentRuntime.send_message` 方法向 AgentRuntime 提交消息时调用。""" ...
[文档] async def on_publish(self, message: Any, *, message_context: MessageContext) -> Any | type[DropMessage]: """当通过 :meth:`autogen_core.base.AgentRuntime.publish_message` 方法向 AgentRuntime 发布消息时调用。""" ...
[文档] async def on_response(self, message: Any, *, sender: AgentId, recipient: AgentId | None) -> Any | type[DropMessage]: """当 AgentRuntime 从 Agent 的消息处理器接收到返回值响应时调用。""" ...
[文档] class DefaultInterventionHandler(InterventionHandler): """提供所有干预处理器方法默认实现的简单类,这些方法会原样返回消息。便于通过子类化仅覆盖所需方法。"""
[文档] async def on_send( self, message: Any, *, message_context: MessageContext, recipient: AgentId ) -> Any | type[DropMessage]: return message
[文档] async def on_publish(self, message: Any, *, message_context: MessageContext) -> Any | type[DropMessage]: return message
[文档] async def on_response(self, message: Any, *, sender: AgentId, recipient: AgentId | None) -> Any | type[DropMessage]: return message