autogen_agentchat.conditions._terminations 源代码

import asyncio
import time
from typing import Awaitable, Callable, List, Sequence

from autogen_core import Component
from pydantic import BaseModel
from typing_extensions import Self

from ..base import TerminatedException, TerminationCondition
from ..messages import (
    BaseAgentEvent,
    BaseChatMessage,
    HandoffMessage,
    StopMessage,
    TextMessage,
    ToolCallExecutionEvent,
)


class StopMessageTerminationConfig(BaseModel):
    pass


[文档] class StopMessageTermination(TerminationCondition, Component[StopMessageTerminationConfig]): """如果接收到 StopMessage 则终止对话。""" component_config_schema = StopMessageTerminationConfig component_provider_override = "autogen_agentchat.conditions.StopMessageTermination" def __init__(self) -> None: self._terminated = False @property def terminated(self) -> bool: return self._terminated async def __call__(self, messages: Sequence[BaseAgentEvent | BaseChatMessage]) -> StopMessage | None: if self._terminated: raise TerminatedException("Termination condition has already been reached") for message in messages: if isinstance(message, StopMessage): self._terminated = True return StopMessage(content="Stop message received", source="StopMessageTermination") return None
[文档] async def reset(self) -> None: self._terminated = False
[文档] def _to_config(self) -> StopMessageTerminationConfig: return StopMessageTerminationConfig()
[文档] @classmethod def _from_config(cls, config: StopMessageTerminationConfig) -> Self: return cls()
class MaxMessageTerminationConfig(BaseModel): max_messages: int include_agent_event: bool = False
[文档] class MaxMessageTermination(TerminationCondition, Component[MaxMessageTerminationConfig]): """在交换达到最大消息数量后终止对话。 Args: max_messages: 对话中允许的最大消息数量。 include_agent_event: 如果为 True,将 :class:`~autogen_agentchat.messages.BaseAgentEvent` 计入消息计数。 否则仅计算 :class:`~autogen_agentchat.messages.BaseChatMessage`。默认为 False。 """ component_config_schema = MaxMessageTerminationConfig component_provider_override = "autogen_agentchat.conditions.MaxMessageTermination" def __init__(self, max_messages: int, include_agent_event: bool = False) -> None: self._max_messages = max_messages self._message_count = 0 self._include_agent_event = include_agent_event @property def terminated(self) -> bool: return self._message_count >= self._max_messages async def __call__(self, messages: Sequence[BaseAgentEvent | BaseChatMessage]) -> StopMessage | None: if self.terminated: raise TerminatedException("Termination condition has already been reached") self._message_count += len([m for m in messages if self._include_agent_event or isinstance(m, BaseChatMessage)]) if self._message_count >= self._max_messages: return StopMessage( content=f"Maximum number of messages {self._max_messages} reached, current message count: {self._message_count}", source="MaxMessageTermination", ) return None
[文档] async def reset(self) -> None: self._message_count = 0
[文档] def _to_config(self) -> MaxMessageTerminationConfig: return MaxMessageTerminationConfig( max_messages=self._max_messages, include_agent_event=self._include_agent_event )
[文档] @classmethod def _from_config(cls, config: MaxMessageTerminationConfig) -> Self: return cls(max_messages=config.max_messages, include_agent_event=config.include_agent_event)
class TextMentionTerminationConfig(BaseModel): text: str
[文档] class TextMentionTermination(TerminationCondition, Component[TextMentionTerminationConfig]): """如果消息中提到特定文本则终止对话。 Args: text: 需要在消息中查找的文本。 sources: 仅在指定代理的消息中检查目标文本。 """ component_config_schema = TextMentionTerminationConfig component_provider_override = "autogen_agentchat.conditions.TextMentionTermination" def __init__(self, text: str, sources: Sequence[str] | None = None) -> None: self._termination_text = text self._terminated = False self._sources = sources @property def terminated(self) -> bool: return self._terminated async def __call__(self, messages: Sequence[BaseAgentEvent | BaseChatMessage]) -> StopMessage | None: if self._terminated: raise TerminatedException("Termination condition has already been reached") for message in messages: if self._sources is not None and message.source not in self._sources: continue content = message.to_text() if self._termination_text in content: self._terminated = True return StopMessage( content=f"Text '{self._termination_text}' mentioned", source="TextMentionTermination" ) return None
[文档] async def reset(self) -> None: self._terminated = False
[文档] def _to_config(self) -> TextMentionTerminationConfig: return TextMentionTerminationConfig(text=self._termination_text)
[文档] @classmethod def _from_config(cls, config: TextMentionTerminationConfig) -> Self: return cls(text=config.text)
[文档] class FunctionalTermination(TerminationCondition): """当满足函数表达式时终止对话。 Args: func (Callable[[Sequence[BaseAgentEvent | BaseChatMessage]], bool] | Callable[[Sequence[BaseAgentEvent | BaseChatMessage]], Awaitable[bool]]): 一个接收消息序列的函数 如果满足终止条件则返回True,否则返回False。 该函数可以是可调用对象或异步可调用对象。 Example: .. code-block:: python import asyncio from typing import Sequence from autogen_agentchat.conditions import FunctionalTermination from autogen_agentchat.messages import BaseAgentEvent, BaseChatMessage, StopMessage def expression(messages: Sequence[BaseAgentEvent | BaseChatMessage]) -> bool: # 检查最后一条消息是否为停止消息 return isinstance(messages[-1], StopMessage) termination = FunctionalTermination(expression) async def run() -> None: messages = [ StopMessage(source="agent1", content="Stop"), ] result = await termination(messages) print(result) asyncio.run(run()) .. code-block:: text StopMessage(source="FunctionalTermination", content="Functional termination condition met") """ def __init__( self, func: Callable[[Sequence[BaseAgentEvent | BaseChatMessage]], bool] | Callable[[Sequence[BaseAgentEvent | BaseChatMessage]], Awaitable[bool]], ) -> None: self._func = func self._terminated = False @property def terminated(self) -> bool: return self._terminated async def __call__(self, messages: Sequence[BaseAgentEvent | BaseChatMessage]) -> StopMessage | None: if self._terminated: raise TerminatedException("Termination condition has already been reached") if asyncio.iscoroutinefunction(self._func): result = await self._func(messages) else: result = self._func(messages) if result is True: self._terminated = True return StopMessage(content="Functional termination condition met", source="FunctionalTermination") return None
[文档] async def reset(self) -> None: self._terminated = False
class TokenUsageTerminationConfig(BaseModel): max_total_token: int | None max_prompt_token: int | None max_completion_token: int | None
[文档] class TokenUsageTermination(TerminationCondition, Component[TokenUsageTerminationConfig]): """当达到令牌使用限制时终止对话。 Args: max_total_token: 对话中允许的最大令牌总数。 max_prompt_token: 对话中允许的最大提示令牌数。 max_completion_token: 对话中允许的最大完成令牌数。 Raises: ValueError: 如果未提供max_total_token、max_prompt_token或max_completion_token中的任何一个。 """ component_config_schema = TokenUsageTerminationConfig component_provider_override = "autogen_agentchat.conditions.TokenUsageTermination" def __init__( self, max_total_token: int | None = None, max_prompt_token: int | None = None, max_completion_token: int | None = None, ) -> None: if max_total_token is None and max_prompt_token is None and max_completion_token is None: raise ValueError( "At least one of max_total_token, max_prompt_token, or max_completion_token must be provided" ) self._max_total_token = max_total_token self._max_prompt_token = max_prompt_token self._max_completion_token = max_completion_token self._total_token_count = 0 self._prompt_token_count = 0 self._completion_token_count = 0 @property def terminated(self) -> bool: return ( (self._max_total_token is not None and self._total_token_count >= self._max_total_token) or (self._max_prompt_token is not None and self._prompt_token_count >= self._max_prompt_token) or (self._max_completion_token is not None and self._completion_token_count >= self._max_completion_token) ) async def __call__(self, messages: Sequence[BaseAgentEvent | BaseChatMessage]) -> StopMessage | None: if self.terminated: raise TerminatedException("Termination condition has already been reached") for message in messages: if message.models_usage is not None: self._prompt_token_count += message.models_usage.prompt_tokens self._completion_token_count += message.models_usage.completion_tokens self._total_token_count += message.models_usage.prompt_tokens + message.models_usage.completion_tokens if self.terminated: content = f"Token usage limit reached, total token count: {self._total_token_count}, prompt token count: {self._prompt_token_count}, completion token count: {self._completion_token_count}." return StopMessage(content=content, source="TokenUsageTermination") return None
[文档] async def reset(self) -> None: self._total_token_count = 0 self._prompt_token_count = 0 self._completion_token_count = 0
[文档] def _to_config(self) -> TokenUsageTerminationConfig: return TokenUsageTerminationConfig( max_total_token=self._max_total_token, max_prompt_token=self._max_prompt_token, max_completion_token=self._max_completion_token, )
[文档] @classmethod def _from_config(cls, config: TokenUsageTerminationConfig) -> Self: return cls( max_total_token=config.max_total_token, max_prompt_token=config.max_prompt_token, max_completion_token=config.max_completion_token, )
class HandoffTerminationConfig(BaseModel): target: str
[文档] class HandoffTermination(TerminationCondition, Component[HandoffTerminationConfig]): """当接收到带有指定目标的:class:`~autogen_agentchat.messages.HandoffMessage`时终止对话。 Args: target (str): 交接消息的目标对象。 """ component_config_schema = HandoffTerminationConfig component_provider_override = "autogen_agentchat.conditions.HandoffTermination" def __init__(self, target: str) -> None: self._terminated = False self._target = target @property def terminated(self) -> bool: return self._terminated async def __call__(self, messages: Sequence[BaseAgentEvent | BaseChatMessage]) -> StopMessage | None: if self._terminated: raise TerminatedException("Termination condition has already been reached") for message in messages: if isinstance(message, HandoffMessage) and message.target == self._target: self._terminated = True return StopMessage( content=f"Handoff to {self._target} from {message.source} detected.", source="HandoffTermination" ) return None
[文档] async def reset(self) -> None: self._terminated = False
[文档] def _to_config(self) -> HandoffTerminationConfig: return HandoffTerminationConfig(target=self._target)
[文档] @classmethod def _from_config(cls, config: HandoffTerminationConfig) -> Self: return cls(target=config.target)
class TimeoutTerminationConfig(BaseModel): timeout_seconds: float
[文档] class TimeoutTermination(TerminationCondition, Component[TimeoutTerminationConfig]): """在指定持续时间过后终止对话。 Args: timeout_seconds: 终止对话前的最大持续时间(秒)。 """ component_config_schema = TimeoutTerminationConfig component_provider_override = "autogen_agentchat.conditions.TimeoutTermination" def __init__(self, timeout_seconds: float) -> None: self._timeout_seconds = timeout_seconds self._start_time = time.monotonic() self._terminated = False @property def terminated(self) -> bool: return self._terminated async def __call__(self, messages: Sequence[BaseAgentEvent | BaseChatMessage]) -> StopMessage | None: if self._terminated: raise TerminatedException("Termination condition has already been reached") if (time.monotonic() - self._start_time) >= self._timeout_seconds: self._terminated = True return StopMessage( content=f"Timeout of {self._timeout_seconds} seconds reached", source="TimeoutTermination" ) return None
[文档] async def reset(self) -> None: self._start_time = time.monotonic() self._terminated = False
[文档] def _to_config(self) -> TimeoutTerminationConfig: return TimeoutTerminationConfig(timeout_seconds=self._timeout_seconds)
[文档] @classmethod def _from_config(cls, config: TimeoutTerminationConfig) -> Self: return cls(timeout_seconds=config.timeout_seconds)
class ExternalTerminationConfig(BaseModel): pass
[文档] class ExternalTermination(TerminationCondition, Component[ExternalTerminationConfig]): """一种外部控制的终止条件 通过调用 :meth:`set` 方法来设置。 Example: .. code-block:: python from autogen_agentchat.conditions import ExternalTermination termination = ExternalTermination() # 在 asyncio 任务中运行团队 ... # 从外部设置终止条件 termination.set() """ component_config_schema = ExternalTerminationConfig component_provider_override = "autogen_agentchat.conditions.ExternalTermination" def __init__(self) -> None: self._terminated = False self._setted = False @property def terminated(self) -> bool: return self._terminated
[文档] def set(self) -> None: """将终止条件设置为已终止。""" self._setted = True
async def __call__(self, messages: Sequence[BaseAgentEvent | BaseChatMessage]) -> StopMessage | None: if self._terminated: raise TerminatedException("Termination condition has already been reached") if self._setted: self._terminated = True return StopMessage(content="External termination requested", source="ExternalTermination") return None
[文档] async def reset(self) -> None: self._terminated = False self._setted = False
[文档] def _to_config(self) -> ExternalTerminationConfig: return ExternalTerminationConfig()
[文档] @classmethod def _from_config(cls, config: ExternalTerminationConfig) -> Self: return cls()
class SourceMatchTerminationConfig(BaseModel): sources: List[str]
[文档] class SourceMatchTermination(TerminationCondition, Component[SourceMatchTerminationConfig]): """在特定来源响应后终止对话。 Args: sources (List[str]): 用于终止对话的来源名称列表。 Raises: TerminatedException: 如果终止条件已被触发。 """ component_config_schema = SourceMatchTerminationConfig component_provider_override = "autogen_agentchat.conditions.SourceMatchTermination" def __init__(self, sources: List[str]) -> None: self._sources = sources self._terminated = False @property def terminated(self) -> bool: return self._terminated async def __call__(self, messages: Sequence[BaseAgentEvent | BaseChatMessage]) -> StopMessage | None: if self._terminated: raise TerminatedException("Termination condition has already been reached") if not messages: return None for message in messages: if message.source in self._sources: self._terminated = True return StopMessage(content=f"'{message.source}' answered", source="SourceMatchTermination") return None
[文档] async def reset(self) -> None: self._terminated = False
[文档] def _to_config(self) -> SourceMatchTerminationConfig: return SourceMatchTerminationConfig(sources=self._sources)
[文档] @classmethod def _from_config(cls, config: SourceMatchTerminationConfig) -> Self: return cls(sources=config.sources)
class TextMessageTerminationConfig(BaseModel): """TextMessageTermination 终止条件的配置。""" source: str | None = None """用于终止对话的文本消息来源。"""
[文档] class TextMessageTermination(TerminationCondition, Component[TextMessageTerminationConfig]): """如果收到 :class:`~autogen_agentchat.messages.TextMessage` 则终止对话。 此终止条件会检查消息序列中的 TextMessage 实例。当发现 TextMessage 时, 在以下任一情况下会终止对话: - 未指定来源(任何 TextMessage 都会触发终止) - 消息来源与指定的来源匹配 Args: source (str | None, optional): 用于匹配传入消息的来源名称。若为 None 则匹配所有来源。 默认为 None。 """ component_config_schema = TextMessageTerminationConfig component_provider_override = "autogen_agentchat.conditions.TextMessageTermination" def __init__(self, source: str | None = None) -> None: self._terminated = False self._source = source @property def terminated(self) -> bool: return self._terminated async def __call__(self, messages: Sequence[BaseAgentEvent | BaseChatMessage]) -> StopMessage | None: if self._terminated: raise TerminatedException("Termination condition has already been reached") for message in messages: if isinstance(message, TextMessage) and (self._source is None or message.source == self._source): self._terminated = True return StopMessage( content=f"Text message received from '{message.source}'", source="TextMessageTermination" ) return None
[文档] async def reset(self) -> None: self._terminated = False
[文档] def _to_config(self) -> TextMessageTerminationConfig: return TextMessageTerminationConfig(source=self._source)
[文档] @classmethod def _from_config(cls, config: TextMessageTerminationConfig) -> Self: return cls(source=config.source)
class FunctionCallTerminationConfig(BaseModel): """FunctionCallTermination 终止条件的配置。""" function_name: str
[文档] class FunctionCallTermination(TerminationCondition, Component[FunctionCallTerminationConfig]): """如果收到具有特定名称的 :class:`~autogen_core.models.FunctionExecutionResult` 则终止对话。 Args: function_name (str): 要在消息中查找的函数名称。 Raises: TerminatedException: 如果终止条件已满足。 """ component_config_schema = FunctionCallTerminationConfig component_provider_override = "autogen_agentchat.conditions.FunctionCallTermination" """组件配置的架构。""" def __init__(self, function_name: str) -> None: self._terminated = False self._function_name = function_name @property def terminated(self) -> bool: return self._terminated async def __call__(self, messages: Sequence[BaseAgentEvent | BaseChatMessage]) -> StopMessage | None: if self._terminated: raise TerminatedException("Termination condition has already been reached") for message in messages: if isinstance(message, ToolCallExecutionEvent): for execution in message.content: if execution.name == self._function_name: self._terminated = True return StopMessage( content=f"Function '{self._function_name}' was executed.", source="FunctionCallTermination", ) return None
[文档] async def reset(self) -> None: self._terminated = False
[文档] def _to_config(self) -> FunctionCallTerminationConfig: return FunctionCallTerminationConfig( function_name=self._function_name, )
[文档] @classmethod def _from_config(cls, config: FunctionCallTerminationConfig) -> Self: return cls( function_name=config.function_name, )