import asyncio
import time
from typing import Awaitable, Callable, List, Sequence
from autogen_core import Component
from pydantic import BaseModel
from typing_extensions import Self
from ..base import TerminatedException, TerminationCondition
from ..messages import (
BaseAgentEvent,
BaseChatMessage,
HandoffMessage,
StopMessage,
TextMessage,
ToolCallExecutionEvent,
)
class StopMessageTerminationConfig(BaseModel):
pass
[文档]
class StopMessageTermination(TerminationCondition, Component[StopMessageTerminationConfig]):
"""如果接收到 StopMessage 则终止对话。"""
component_config_schema = StopMessageTerminationConfig
component_provider_override = "autogen_agentchat.conditions.StopMessageTermination"
def __init__(self) -> None:
self._terminated = False
@property
def terminated(self) -> bool:
return self._terminated
async def __call__(self, messages: Sequence[BaseAgentEvent | BaseChatMessage]) -> StopMessage | None:
if self._terminated:
raise TerminatedException("Termination condition has already been reached")
for message in messages:
if isinstance(message, StopMessage):
self._terminated = True
return StopMessage(content="Stop message received", source="StopMessageTermination")
return None
[文档]
async def reset(self) -> None:
self._terminated = False
[文档]
def _to_config(self) -> StopMessageTerminationConfig:
return StopMessageTerminationConfig()
[文档]
@classmethod
def _from_config(cls, config: StopMessageTerminationConfig) -> Self:
return cls()
class MaxMessageTerminationConfig(BaseModel):
max_messages: int
include_agent_event: bool = False
[文档]
class MaxMessageTermination(TerminationCondition, Component[MaxMessageTerminationConfig]):
"""在交换达到最大消息数量后终止对话。
Args:
max_messages: 对话中允许的最大消息数量。
include_agent_event: 如果为 True,将 :class:`~autogen_agentchat.messages.BaseAgentEvent` 计入消息计数。
否则仅计算 :class:`~autogen_agentchat.messages.BaseChatMessage`。默认为 False。
"""
component_config_schema = MaxMessageTerminationConfig
component_provider_override = "autogen_agentchat.conditions.MaxMessageTermination"
def __init__(self, max_messages: int, include_agent_event: bool = False) -> None:
self._max_messages = max_messages
self._message_count = 0
self._include_agent_event = include_agent_event
@property
def terminated(self) -> bool:
return self._message_count >= self._max_messages
async def __call__(self, messages: Sequence[BaseAgentEvent | BaseChatMessage]) -> StopMessage | None:
if self.terminated:
raise TerminatedException("Termination condition has already been reached")
self._message_count += len([m for m in messages if self._include_agent_event or isinstance(m, BaseChatMessage)])
if self._message_count >= self._max_messages:
return StopMessage(
content=f"Maximum number of messages {self._max_messages} reached, current message count: {self._message_count}",
source="MaxMessageTermination",
)
return None
[文档]
async def reset(self) -> None:
self._message_count = 0
[文档]
def _to_config(self) -> MaxMessageTerminationConfig:
return MaxMessageTerminationConfig(
max_messages=self._max_messages, include_agent_event=self._include_agent_event
)
[文档]
@classmethod
def _from_config(cls, config: MaxMessageTerminationConfig) -> Self:
return cls(max_messages=config.max_messages, include_agent_event=config.include_agent_event)
class TextMentionTerminationConfig(BaseModel):
text: str
[文档]
class TextMentionTermination(TerminationCondition, Component[TextMentionTerminationConfig]):
"""如果消息中提到特定文本则终止对话。
Args:
text: 需要在消息中查找的文本。
sources: 仅在指定代理的消息中检查目标文本。
"""
component_config_schema = TextMentionTerminationConfig
component_provider_override = "autogen_agentchat.conditions.TextMentionTermination"
def __init__(self, text: str, sources: Sequence[str] | None = None) -> None:
self._termination_text = text
self._terminated = False
self._sources = sources
@property
def terminated(self) -> bool:
return self._terminated
async def __call__(self, messages: Sequence[BaseAgentEvent | BaseChatMessage]) -> StopMessage | None:
if self._terminated:
raise TerminatedException("Termination condition has already been reached")
for message in messages:
if self._sources is not None and message.source not in self._sources:
continue
content = message.to_text()
if self._termination_text in content:
self._terminated = True
return StopMessage(
content=f"Text '{self._termination_text}' mentioned", source="TextMentionTermination"
)
return None
[文档]
async def reset(self) -> None:
self._terminated = False
[文档]
def _to_config(self) -> TextMentionTerminationConfig:
return TextMentionTerminationConfig(text=self._termination_text)
[文档]
@classmethod
def _from_config(cls, config: TextMentionTerminationConfig) -> Self:
return cls(text=config.text)
[文档]
class FunctionalTermination(TerminationCondition):
"""当满足函数表达式时终止对话。
Args:
func (Callable[[Sequence[BaseAgentEvent | BaseChatMessage]], bool] | Callable[[Sequence[BaseAgentEvent | BaseChatMessage]], Awaitable[bool]]): 一个接收消息序列的函数
如果满足终止条件则返回True,否则返回False。
该函数可以是可调用对象或异步可调用对象。
Example:
.. code-block:: python
import asyncio
from typing import Sequence
from autogen_agentchat.conditions import FunctionalTermination
from autogen_agentchat.messages import BaseAgentEvent, BaseChatMessage, StopMessage
def expression(messages: Sequence[BaseAgentEvent | BaseChatMessage]) -> bool:
# 检查最后一条消息是否为停止消息
return isinstance(messages[-1], StopMessage)
termination = FunctionalTermination(expression)
async def run() -> None:
messages = [
StopMessage(source="agent1", content="Stop"),
]
result = await termination(messages)
print(result)
asyncio.run(run())
.. code-block:: text
StopMessage(source="FunctionalTermination", content="Functional termination condition met")
"""
def __init__(
self,
func: Callable[[Sequence[BaseAgentEvent | BaseChatMessage]], bool]
| Callable[[Sequence[BaseAgentEvent | BaseChatMessage]], Awaitable[bool]],
) -> None:
self._func = func
self._terminated = False
@property
def terminated(self) -> bool:
return self._terminated
async def __call__(self, messages: Sequence[BaseAgentEvent | BaseChatMessage]) -> StopMessage | None:
if self._terminated:
raise TerminatedException("Termination condition has already been reached")
if asyncio.iscoroutinefunction(self._func):
result = await self._func(messages)
else:
result = self._func(messages)
if result is True:
self._terminated = True
return StopMessage(content="Functional termination condition met", source="FunctionalTermination")
return None
[文档]
async def reset(self) -> None:
self._terminated = False
class TokenUsageTerminationConfig(BaseModel):
max_total_token: int | None
max_prompt_token: int | None
max_completion_token: int | None
[文档]
class TokenUsageTermination(TerminationCondition, Component[TokenUsageTerminationConfig]):
"""当达到令牌使用限制时终止对话。
Args:
max_total_token: 对话中允许的最大令牌总数。
max_prompt_token: 对话中允许的最大提示令牌数。
max_completion_token: 对话中允许的最大完成令牌数。
Raises:
ValueError: 如果未提供max_total_token、max_prompt_token或max_completion_token中的任何一个。
"""
component_config_schema = TokenUsageTerminationConfig
component_provider_override = "autogen_agentchat.conditions.TokenUsageTermination"
def __init__(
self,
max_total_token: int | None = None,
max_prompt_token: int | None = None,
max_completion_token: int | None = None,
) -> None:
if max_total_token is None and max_prompt_token is None and max_completion_token is None:
raise ValueError(
"At least one of max_total_token, max_prompt_token, or max_completion_token must be provided"
)
self._max_total_token = max_total_token
self._max_prompt_token = max_prompt_token
self._max_completion_token = max_completion_token
self._total_token_count = 0
self._prompt_token_count = 0
self._completion_token_count = 0
@property
def terminated(self) -> bool:
return (
(self._max_total_token is not None and self._total_token_count >= self._max_total_token)
or (self._max_prompt_token is not None and self._prompt_token_count >= self._max_prompt_token)
or (self._max_completion_token is not None and self._completion_token_count >= self._max_completion_token)
)
async def __call__(self, messages: Sequence[BaseAgentEvent | BaseChatMessage]) -> StopMessage | None:
if self.terminated:
raise TerminatedException("Termination condition has already been reached")
for message in messages:
if message.models_usage is not None:
self._prompt_token_count += message.models_usage.prompt_tokens
self._completion_token_count += message.models_usage.completion_tokens
self._total_token_count += message.models_usage.prompt_tokens + message.models_usage.completion_tokens
if self.terminated:
content = f"Token usage limit reached, total token count: {self._total_token_count}, prompt token count: {self._prompt_token_count}, completion token count: {self._completion_token_count}."
return StopMessage(content=content, source="TokenUsageTermination")
return None
[文档]
async def reset(self) -> None:
self._total_token_count = 0
self._prompt_token_count = 0
self._completion_token_count = 0
[文档]
def _to_config(self) -> TokenUsageTerminationConfig:
return TokenUsageTerminationConfig(
max_total_token=self._max_total_token,
max_prompt_token=self._max_prompt_token,
max_completion_token=self._max_completion_token,
)
[文档]
@classmethod
def _from_config(cls, config: TokenUsageTerminationConfig) -> Self:
return cls(
max_total_token=config.max_total_token,
max_prompt_token=config.max_prompt_token,
max_completion_token=config.max_completion_token,
)
class HandoffTerminationConfig(BaseModel):
target: str
[文档]
class HandoffTermination(TerminationCondition, Component[HandoffTerminationConfig]):
"""当接收到带有指定目标的:class:`~autogen_agentchat.messages.HandoffMessage`时终止对话。
Args:
target (str): 交接消息的目标对象。
"""
component_config_schema = HandoffTerminationConfig
component_provider_override = "autogen_agentchat.conditions.HandoffTermination"
def __init__(self, target: str) -> None:
self._terminated = False
self._target = target
@property
def terminated(self) -> bool:
return self._terminated
async def __call__(self, messages: Sequence[BaseAgentEvent | BaseChatMessage]) -> StopMessage | None:
if self._terminated:
raise TerminatedException("Termination condition has already been reached")
for message in messages:
if isinstance(message, HandoffMessage) and message.target == self._target:
self._terminated = True
return StopMessage(
content=f"Handoff to {self._target} from {message.source} detected.", source="HandoffTermination"
)
return None
[文档]
async def reset(self) -> None:
self._terminated = False
[文档]
def _to_config(self) -> HandoffTerminationConfig:
return HandoffTerminationConfig(target=self._target)
[文档]
@classmethod
def _from_config(cls, config: HandoffTerminationConfig) -> Self:
return cls(target=config.target)
class TimeoutTerminationConfig(BaseModel):
timeout_seconds: float
[文档]
class TimeoutTermination(TerminationCondition, Component[TimeoutTerminationConfig]):
"""在指定持续时间过后终止对话。
Args:
timeout_seconds: 终止对话前的最大持续时间(秒)。
"""
component_config_schema = TimeoutTerminationConfig
component_provider_override = "autogen_agentchat.conditions.TimeoutTermination"
def __init__(self, timeout_seconds: float) -> None:
self._timeout_seconds = timeout_seconds
self._start_time = time.monotonic()
self._terminated = False
@property
def terminated(self) -> bool:
return self._terminated
async def __call__(self, messages: Sequence[BaseAgentEvent | BaseChatMessage]) -> StopMessage | None:
if self._terminated:
raise TerminatedException("Termination condition has already been reached")
if (time.monotonic() - self._start_time) >= self._timeout_seconds:
self._terminated = True
return StopMessage(
content=f"Timeout of {self._timeout_seconds} seconds reached", source="TimeoutTermination"
)
return None
[文档]
async def reset(self) -> None:
self._start_time = time.monotonic()
self._terminated = False
[文档]
def _to_config(self) -> TimeoutTerminationConfig:
return TimeoutTerminationConfig(timeout_seconds=self._timeout_seconds)
[文档]
@classmethod
def _from_config(cls, config: TimeoutTerminationConfig) -> Self:
return cls(timeout_seconds=config.timeout_seconds)
class ExternalTerminationConfig(BaseModel):
pass
[文档]
class ExternalTermination(TerminationCondition, Component[ExternalTerminationConfig]):
"""一种外部控制的终止条件
通过调用 :meth:`set` 方法来设置。
Example:
.. code-block:: python
from autogen_agentchat.conditions import ExternalTermination
termination = ExternalTermination()
# 在 asyncio 任务中运行团队
...
# 从外部设置终止条件
termination.set()
"""
component_config_schema = ExternalTerminationConfig
component_provider_override = "autogen_agentchat.conditions.ExternalTermination"
def __init__(self) -> None:
self._terminated = False
self._setted = False
@property
def terminated(self) -> bool:
return self._terminated
[文档]
def set(self) -> None:
"""将终止条件设置为已终止。"""
self._setted = True
async def __call__(self, messages: Sequence[BaseAgentEvent | BaseChatMessage]) -> StopMessage | None:
if self._terminated:
raise TerminatedException("Termination condition has already been reached")
if self._setted:
self._terminated = True
return StopMessage(content="External termination requested", source="ExternalTermination")
return None
[文档]
async def reset(self) -> None:
self._terminated = False
self._setted = False
[文档]
def _to_config(self) -> ExternalTerminationConfig:
return ExternalTerminationConfig()
[文档]
@classmethod
def _from_config(cls, config: ExternalTerminationConfig) -> Self:
return cls()
class SourceMatchTerminationConfig(BaseModel):
sources: List[str]
[文档]
class SourceMatchTermination(TerminationCondition, Component[SourceMatchTerminationConfig]):
"""在特定来源响应后终止对话。
Args:
sources (List[str]): 用于终止对话的来源名称列表。
Raises:
TerminatedException: 如果终止条件已被触发。
"""
component_config_schema = SourceMatchTerminationConfig
component_provider_override = "autogen_agentchat.conditions.SourceMatchTermination"
def __init__(self, sources: List[str]) -> None:
self._sources = sources
self._terminated = False
@property
def terminated(self) -> bool:
return self._terminated
async def __call__(self, messages: Sequence[BaseAgentEvent | BaseChatMessage]) -> StopMessage | None:
if self._terminated:
raise TerminatedException("Termination condition has already been reached")
if not messages:
return None
for message in messages:
if message.source in self._sources:
self._terminated = True
return StopMessage(content=f"'{message.source}' answered", source="SourceMatchTermination")
return None
[文档]
async def reset(self) -> None:
self._terminated = False
[文档]
def _to_config(self) -> SourceMatchTerminationConfig:
return SourceMatchTerminationConfig(sources=self._sources)
[文档]
@classmethod
def _from_config(cls, config: SourceMatchTerminationConfig) -> Self:
return cls(sources=config.sources)
class TextMessageTerminationConfig(BaseModel):
"""TextMessageTermination 终止条件的配置。"""
source: str | None = None
"""用于终止对话的文本消息来源。"""
[文档]
class TextMessageTermination(TerminationCondition, Component[TextMessageTerminationConfig]):
"""如果收到 :class:`~autogen_agentchat.messages.TextMessage` 则终止对话。
此终止条件会检查消息序列中的 TextMessage 实例。当发现 TextMessage 时,
在以下任一情况下会终止对话:
- 未指定来源(任何 TextMessage 都会触发终止)
- 消息来源与指定的来源匹配
Args:
source (str | None, optional): 用于匹配传入消息的来源名称。若为 None 则匹配所有来源。
默认为 None。
"""
component_config_schema = TextMessageTerminationConfig
component_provider_override = "autogen_agentchat.conditions.TextMessageTermination"
def __init__(self, source: str | None = None) -> None:
self._terminated = False
self._source = source
@property
def terminated(self) -> bool:
return self._terminated
async def __call__(self, messages: Sequence[BaseAgentEvent | BaseChatMessage]) -> StopMessage | None:
if self._terminated:
raise TerminatedException("Termination condition has already been reached")
for message in messages:
if isinstance(message, TextMessage) and (self._source is None or message.source == self._source):
self._terminated = True
return StopMessage(
content=f"Text message received from '{message.source}'", source="TextMessageTermination"
)
return None
[文档]
async def reset(self) -> None:
self._terminated = False
[文档]
def _to_config(self) -> TextMessageTerminationConfig:
return TextMessageTerminationConfig(source=self._source)
[文档]
@classmethod
def _from_config(cls, config: TextMessageTerminationConfig) -> Self:
return cls(source=config.source)
class FunctionCallTerminationConfig(BaseModel):
"""FunctionCallTermination 终止条件的配置。"""
function_name: str
[文档]
class FunctionCallTermination(TerminationCondition, Component[FunctionCallTerminationConfig]):
"""如果收到具有特定名称的 :class:`~autogen_core.models.FunctionExecutionResult`
则终止对话。
Args:
function_name (str): 要在消息中查找的函数名称。
Raises:
TerminatedException: 如果终止条件已满足。
"""
component_config_schema = FunctionCallTerminationConfig
component_provider_override = "autogen_agentchat.conditions.FunctionCallTermination"
"""组件配置的架构。"""
def __init__(self, function_name: str) -> None:
self._terminated = False
self._function_name = function_name
@property
def terminated(self) -> bool:
return self._terminated
async def __call__(self, messages: Sequence[BaseAgentEvent | BaseChatMessage]) -> StopMessage | None:
if self._terminated:
raise TerminatedException("Termination condition has already been reached")
for message in messages:
if isinstance(message, ToolCallExecutionEvent):
for execution in message.content:
if execution.name == self._function_name:
self._terminated = True
return StopMessage(
content=f"Function '{self._function_name}' was executed.",
source="FunctionCallTermination",
)
return None
[文档]
async def reset(self) -> None:
self._terminated = False
[文档]
def _to_config(self) -> FunctionCallTerminationConfig:
return FunctionCallTerminationConfig(
function_name=self._function_name,
)
[文档]
@classmethod
def _from_config(cls, config: FunctionCallTerminationConfig) -> Self:
return cls(
function_name=config.function_name,
)