autogen_core._intervention 源代码
from typing import Any, Protocol, final
from ._agent_id import AgentId
from ._message_context import MessageContext
__all__ = [
"DropMessage",
"InterventionHandler",
"DefaultInterventionHandler",
]
[文档]
@final
class DropMessage:
"""用于标记消息应被干预处理器丢弃的标记类型。该类型本身应从处理器返回。"""
...
[文档]
class InterventionHandler(Protocol):
"""干预处理器是一个类,可用于修改、记录或丢弃正在被 :class:`autogen_core.base.AgentRuntime` 处理的消息。
当消息提交到运行时,处理器会被调用。
目前唯一支持此功能的运行时是 :class:`autogen_core.base.SingleThreadedAgentRuntime`。
注意:从任何干预处理器方法返回 None 都会导致发出警告并被当作"无更改"处理。如果确实要丢弃消息,应显式返回 :class:`DropMessage`。
示例:
.. code-block:: python
from autogen_core import DefaultInterventionHandler, MessageContext, AgentId, SingleThreadedAgentRuntime
from dataclasses import dataclass
from typing import Any
@dataclass
class MyMessage:
content: str
class MyInterventionHandler(DefaultInterventionHandler):
async def on_send(self, message: Any, *, message_context: MessageContext, recipient: AgentId) -> MyMessage:
if isinstance(message, MyMessage):
message.content = message.content.upper()
return message
runtime = SingleThreadedAgentRuntime(intervention_handlers=[MyInterventionHandler()])
"""
[文档]
async def on_send(
self, message: Any, *, message_context: MessageContext, recipient: AgentId
) -> Any | type[DropMessage]:
"""当通过 :meth:`autogen_core.base.AgentRuntime.send_message` 方法向 AgentRuntime 提交消息时调用。"""
...
[文档]
async def on_publish(self, message: Any, *, message_context: MessageContext) -> Any | type[DropMessage]:
"""当通过 :meth:`autogen_core.base.AgentRuntime.publish_message` 方法向 AgentRuntime 发布消息时调用。"""
...
[文档]
async def on_response(self, message: Any, *, sender: AgentId, recipient: AgentId | None) -> Any | type[DropMessage]:
"""当 AgentRuntime 从 Agent 的消息处理器接收到返回值响应时调用。"""
...
[文档]
class DefaultInterventionHandler(InterventionHandler):
"""提供所有干预处理器方法默认实现的简单类,这些方法会原样返回消息。便于通过子类化仅覆盖所需方法。"""
[文档]
async def on_send(
self, message: Any, *, message_context: MessageContext, recipient: AgentId
) -> Any | type[DropMessage]:
return message
[文档]
async def on_publish(self, message: Any, *, message_context: MessageContext) -> Any | type[DropMessage]:
return message
[文档]
async def on_response(self, message: Any, *, sender: AgentId, recipient: AgentId | None) -> Any | type[DropMessage]:
return message