autogen_agentchat.base._termination 源代码
import asyncio
from abc import ABC, abstractmethod
from typing import List, Sequence
from autogen_core import Component, ComponentBase, ComponentModel
from pydantic import BaseModel
from typing_extensions import Self
from ..messages import BaseAgentEvent, BaseChatMessage, StopMessage
[文档]
class TerminatedException(BaseException):
    """Signal that a termination condition was used after it had already been reached.

    This derives from BaseException rather than Exception, so a plain
    ``except Exception`` handler does not intercept it.
    """
[文档]
class TerminationCondition(ABC, ComponentBase[BaseModel]):
    """A stateful condition that decides when a conversation should be terminated.

    A termination condition is a callable: it is given the sequence of messages
    received since the last time it was called and returns a StopMessage if the
    conversation should be terminated, or None otherwise.
    Once a termination condition has been reached, it must be reset before it
    can be used again.

    Termination conditions can be combined with the AND (``&``) and OR (``|``)
    operators.

    Example:

        .. code-block:: python

            import asyncio
            from autogen_agentchat.conditions import MaxMessageTermination, TextMentionTermination


            async def main() -> None:
                # Terminate the conversation after 10 turns or when the text "TERMINATE" is mentioned.
                cond1 = MaxMessageTermination(10) | TextMentionTermination("TERMINATE")

                # Terminate the conversation after 10 turns and when the text "TERMINATE" is mentioned.
                cond2 = MaxMessageTermination(10) & TextMentionTermination("TERMINATE")

                # ...

                # Reset the termination conditions.
                await cond1.reset()
                await cond2.reset()


            asyncio.run(main())
    """

    component_type = "termination"

    @property
    @abstractmethod
    def terminated(self) -> bool:
        """Whether the termination condition has been reached."""
        ...

    @abstractmethod
    async def __call__(self, messages: Sequence[BaseAgentEvent | BaseChatMessage]) -> StopMessage | None:
        """Decide whether the conversation should stop, given the newly received messages.

        Args:
            messages: The messages received since the last time the condition was called.

        Returns:
            StopMessage | None: A StopMessage if the conversation should be
            terminated, otherwise None.

        Raises:
            TerminatedException: If the termination condition has already been reached.
        """
        ...

    @abstractmethod
    async def reset(self) -> None:
        """Reset the termination condition so that it can be used again."""
        ...

    def __and__(self, other: "TerminationCondition") -> "TerminationCondition":
        """Combine this condition with ``other`` using AND (``self & other``)."""
        both = AndTerminationCondition(self, other)
        return both

    def __or__(self, other: "TerminationCondition") -> "TerminationCondition":
        """Combine this condition with ``other`` using OR (``self | other``)."""
        either = OrTerminationCondition(self, other)
        return either
# Configuration model used as the component config schema of AndTerminationCondition.
class AndTerminationConditionConfig(BaseModel):
    # One ComponentModel per combined sub-condition; AndTerminationCondition
    # fills this from each condition's dump_component() and rebuilds the
    # conditions from it with load_component().
    conditions: List[ComponentModel]
[文档]
class AndTerminationCondition(TerminationCondition, Component[AndTerminationConditionConfig]):
    # Composite condition that is reached only when every wrapped condition has
    # been reached. Stop messages produced by the individual conditions are
    # remembered across calls and merged into one StopMessage at the end.
    # NOTE: documented with comments rather than a class docstring so that the
    # class's ``__doc__`` stays unset, exactly as before.

    component_config_schema = AndTerminationConditionConfig
    component_type = "termination"
    component_provider_override = "autogen_agentchat.base.AndTerminationCondition"

    def __init__(self, *conditions: TerminationCondition) -> None:
        """Store the conditions to combine and start with no collected stop messages."""
        self._conditions = conditions
        self._stop_messages: List[StopMessage] = []

    @property
    def terminated(self) -> bool:
        """True when every wrapped condition reports that it has terminated."""
        for sub_condition in self._conditions:
            if not sub_condition.terminated:
                return False
        return True

    async def __call__(self, messages: Sequence[BaseAgentEvent | BaseChatMessage]) -> StopMessage | None:
        """Feed ``messages`` to every condition that has not terminated yet.

        Args:
            messages: The messages received since the last time the condition was called.

        Returns:
            StopMessage | None: None while at least one of the conditions checked
            in this call has not produced a stop message; otherwise a StopMessage
            whose ``content`` and ``source`` are the ", "-joined contents and
            sources of all stop messages collected so far.

        Raises:
            TerminatedException: If all wrapped conditions have already terminated.
        """
        if self.terminated:
            raise TerminatedException("Termination condition has already been reached.")

        # Only conditions that are still open are consulted; they run concurrently.
        pending = [sub_condition for sub_condition in self._conditions if not sub_condition.terminated]
        results = await asyncio.gather(*[sub_condition(messages) for sub_condition in pending])

        # Remember every stop message produced in this round.
        fired = [result for result in results if result is not None]
        self._stop_messages.extend(fired)

        if len(fired) < len(results):
            # At least one open condition did not stop, so the AND is not satisfied yet.
            return None

        merged_content = ", ".join(message.content for message in self._stop_messages)
        merged_source = ", ".join(message.source for message in self._stop_messages)
        return StopMessage(content=merged_content, source=merged_source)

    async def reset(self) -> None:
        """Reset every wrapped condition, one after another, then forget the collected stop messages."""
        for sub_condition in self._conditions:
            await sub_condition.reset()
        self._stop_messages.clear()

    def _to_config(self) -> AndTerminationConditionConfig:
        """Convert the AND termination condition into its configuration."""
        dumped = [sub_condition.dump_component() for sub_condition in self._conditions]
        return AndTerminationConditionConfig(conditions=dumped)

    @classmethod
    def _from_config(cls, config: AndTerminationConditionConfig) -> Self:
        """Create an AND termination condition from its configuration."""
        return cls(*map(TerminationCondition.load_component, config.conditions))
# Configuration model used as the component config schema of OrTerminationCondition.
class OrTerminationConditionConfig(BaseModel):
    # The list of termination conditions; satisfying any one of them is enough.
    conditions: List[ComponentModel]
    """满足其中任意一个即可的终止条件列表。"""
[文档]
class _OrAlreadyTerminatedError(TerminatedException, RuntimeError):
    """Raised when a terminated OrTerminationCondition is called again without a reset.

    It derives from TerminatedException, the type that the ``__call__`` contract
    of TerminationCondition documents and that AndTerminationCondition raises in
    the same situation. It also derives from RuntimeError, the type that was
    raised here before, so existing ``except RuntimeError`` handlers keep working.
    """


class OrTerminationCondition(TerminationCondition, Component[OrTerminationConditionConfig]):
    # Composite condition that is reached as soon as any wrapped condition is
    # reached. All stop messages produced in the same call are merged into one.
    # NOTE: documented with comments rather than a class docstring so that the
    # class's ``__doc__`` stays unset, exactly as before.

    component_config_schema = OrTerminationConditionConfig
    component_type = "termination"
    component_provider_override = "autogen_agentchat.base.OrTerminationCondition"

    def __init__(self, *conditions: TerminationCondition) -> None:
        """Store the conditions to combine."""
        self._conditions = conditions

    @property
    def terminated(self) -> bool:
        """True when at least one wrapped condition reports that it has terminated."""
        return any(condition.terminated for condition in self._conditions)

    async def __call__(self, messages: Sequence[BaseAgentEvent | BaseChatMessage]) -> StopMessage | None:
        """Feed ``messages`` to every wrapped condition concurrently.

        Args:
            messages: The messages received since the last time the condition was called.

        Returns:
            StopMessage | None: None if no wrapped condition produced a stop
            message; otherwise a StopMessage whose ``content`` and ``source`` are
            the ", "-joined contents and sources of the stop messages produced
            in this call.

        Raises:
            TerminatedException: If any wrapped condition has already terminated.
                The raised exception is also a RuntimeError for backward
                compatibility.
        """
        if self.terminated:
            # Message text is kept unchanged; only the exception type is widened.
            raise _OrAlreadyTerminatedError("Termination condition has already been reached")

        results = await asyncio.gather(*[condition(messages) for condition in self._conditions])
        triggered = [result for result in results if result is not None]
        if not triggered:
            return None

        content = ", ".join(stop_message.content for stop_message in triggered)
        source = ", ".join(stop_message.source for stop_message in triggered)
        return StopMessage(content=content, source=source)

    async def reset(self) -> None:
        """Reset every wrapped condition, one after another."""
        for condition in self._conditions:
            await condition.reset()

    def _to_config(self) -> OrTerminationConditionConfig:
        """Convert the OR termination condition into its configuration."""
        return OrTerminationConditionConfig(conditions=[condition.dump_component() for condition in self._conditions])

    @classmethod
    def _from_config(cls, config: OrTerminationConditionConfig) -> Self:
        """Create an OR termination condition from its configuration."""
        conditions = [TerminationCondition.load_component(condition_model) for condition_model in config.conditions]
        return cls(*conditions)