import asyncio
import uuid
from abc import ABC, abstractmethod
from typing import Any, AsyncGenerator, Callable, Dict, List, Mapping, Sequence
from autogen_core import (
AgentId,
AgentRuntime,
AgentType,
CancellationToken,
ComponentBase,
SingleThreadedAgentRuntime,
TypeSubscription,
)
from pydantic import BaseModel, ValidationError
from ...base import ChatAgent, TaskResult, Team, TerminationCondition
from ...messages import (
BaseAgentEvent,
BaseChatMessage,
MessageFactory,
ModelClientStreamingChunkEvent,
StopMessage,
StructuredMessage,
TextMessage,
)
from ...state import TeamState
from ._chat_agent_container import ChatAgentContainer
from ._events import (
GroupChatPause,
GroupChatReset,
GroupChatResume,
GroupChatStart,
GroupChatTermination,
SerializableException,
)
from ._sequential_routed_agent import SequentialRoutedAgent
# [docs] -- link marker left over from the rendered HTML documentation page; not part of the code.
class BaseGroupChat(Team, ABC, ComponentBase[BaseModel]):
"""The base class for group chat teams.
To implement a group chat team, first create a subclass of :class:`BaseGroupChatManager`, and then
create a subclass of :class:`BaseGroupChat` that uses that group chat manager.
"""
# Class-level component type label for this class ("team").
component_type = "team"
def __init__(
    self,
    participants: List[ChatAgent],
    group_chat_manager_name: str,
    group_chat_manager_class: type[SequentialRoutedAgent],
    termination_condition: TerminationCondition | None = None,
    max_turns: int | None = None,
    runtime: AgentRuntime | None = None,
    custom_message_types: List[type[BaseAgentEvent | BaseChatMessage]] | None = None,
    emit_team_events: bool = False,
):
    """Store the team configuration and derive the per-team topic names.

    Nothing is registered with the runtime here; that happens in :meth:`_init`.

    Args:
        participants: The agents taking part in the group chat. Must be non-empty and
            every participant must have a distinct ``name``.
        group_chat_manager_name: The name used for the group chat manager inside this team.
        group_chat_manager_class: The agent class registered as the group chat manager.
        termination_condition: Optional termination condition handed to the manager factory.
        max_turns: Optional turn limit handed to the manager factory.
        runtime: The agent runtime to use. When ``None``, an embedded
            :class:`SingleThreadedAgentRuntime` is created and owned by this team.
        custom_message_types: Extra message types to register with the message factory.
        emit_team_events: Stored as the flag that tracks whether team events should be emitted.

    Raises:
        ValueError: If ``participants`` is empty or participant names are not unique.
    """
    if len(participants) == 0:
        raise ValueError("At least one participant is required.")
    distinct_names = {member.name for member in participants}
    if len(distinct_names) != len(participants):
        raise ValueError("The participant names must be unique.")

    self._participants = participants
    self._base_group_chat_manager_class = group_chat_manager_class
    self._termination_condition = termination_condition
    self._max_turns = max_turns

    # Build the message factory: explicit custom types first, then any structured
    # message types the participants declare that are not registered yet.
    self._message_factory = MessageFactory()
    for custom_type in custom_message_types or []:
        self._message_factory.register(custom_type)
    for member in participants:
        for produced_type in member.produced_message_types:
            try:
                already_known = self._message_factory.is_registered(produced_type)  # type: ignore[reportUnknownArgumentType]
                if issubclass(produced_type, StructuredMessage):
                    if not already_known:
                        self._message_factory.register(produced_type)  # type: ignore[reportUnknownArgumentType]
            except TypeError:
                # Not a class, or not something that can be subclass-checked: skip it.
                continue

    # The team ID is a UUID identifying this team and its participants in the agent
    # runtime; it makes every topic type below unique to this object instance.
    # Two group chat instances therefore always form two teams with different IDs.
    self._team_id = str(uuid.uuid4())
    team_suffix = self._team_id

    # Names identify agents within the team; they need not be unique across teams.
    self._group_chat_manager_name = group_chat_manager_name
    self._participant_names: List[str] = [member.name for member in participants]
    self._participant_descriptions: List[str] = [member.description for member in participants]

    # Broadcast topic shared by all participants and the group chat manager.
    self._group_topic_type = f"group_topic_{team_suffix}"
    # Direct topic of the group chat manager.
    self._group_chat_manager_topic_type = f"{group_chat_manager_name}_{team_suffix}"
    # Direct topic of each participant, in participant order.
    self._participant_topic_types: List[str] = [
        f"{participant_name}_{team_suffix}" for participant_name in self._participant_names
    ]
    # Topic for streaming output; the group chat manager relays it to the output queue.
    self._output_topic_type = f"output_topic_{team_suffix}"
    # Queue collecting the output messages.
    self._output_message_queue: asyncio.Queue[BaseAgentEvent | BaseChatMessage | GroupChatTermination] = (
        asyncio.Queue()
    )

    # Use the caller's runtime when given; otherwise embed a single-threaded runtime.
    # Background exceptions must not be ignored in the embedded runtime, because that
    # leads to non-surfaced exceptions and early team termination.
    self._embedded_runtime = runtime is None
    if runtime is None:
        self._runtime = SingleThreadedAgentRuntime(ignore_unhandled_exceptions=False)
    else:
        self._runtime = runtime

    # Lifecycle flags: whether _init() has completed, and whether the team is running.
    self._initialized = False
    self._is_running = False
    # Flag to track if the team events should be emitted.
    self._emit_team_events = emit_team_events
# Abstract hook: subclasses must return a zero-argument callable that builds the
# group chat manager agent (a SequentialRoutedAgent) for this team.
# _init() calls this with the team's manager name, topic types, participant
# names/descriptions, output queue, termination condition, turn limit and message
# factory, and passes the returned callable as the `factory` argument when
# registering the group chat manager class with the runtime.
@abstractmethod
def _create_group_chat_manager_factory(
self,
name: str,
group_topic_type: str,
output_topic_type: str,
participant_topic_types: List[str],
participant_names: List[str],
participant_descriptions: List[str],
output_message_queue: asyncio.Queue[BaseAgentEvent | BaseChatMessage | GroupChatTermination],
termination_condition: TerminationCondition | None,
max_turns: int | None,
message_factory: MessageFactory,
) -> Callable[[], SequentialRoutedAgent]: ...
def _create_participant_factory(
    self,
    parent_topic_type: str,
    output_topic_type: str,
    agent: ChatAgent,
    message_factory: MessageFactory,
) -> Callable[[], ChatAgentContainer]:
    """Return a zero-argument factory that wraps ``agent`` in a :class:`ChatAgentContainer`.

    The container is not built here; it is created each time the returned callable is invoked.

    Args:
        parent_topic_type: First positional argument passed to the container.
        output_topic_type: Second positional argument passed to the container.
        agent: The chat agent to wrap.
        message_factory: The message factory passed to the container.

    Returns:
        A callable that constructs a new :class:`ChatAgentContainer` from the arguments above.
    """

    def build_container() -> ChatAgentContainer:
        return ChatAgentContainer(parent_topic_type, output_topic_type, agent, message_factory)

    return build_container
async def _init(self, runtime: AgentRuntime) -> None:
    """Register the participants and the group chat manager with ``runtime``.

    Each participant is registered under its own topic type (used as the agent type) and
    subscribed to its own topic and the group topic. The group chat manager is registered
    under the manager topic type and subscribed to its own topic, the group topic and the
    output topic. Sets ``self._initialized`` to ``True`` when everything succeeded.

    Args:
        runtime: The agent runtime to register agents and subscriptions with.
    """
    # Agent type of the group chat manager.
    group_chat_manager_agent_type = AgentType(self._group_chat_manager_topic_type)

    # Participants: the participant topic type doubles as the agent type.
    for container_type, member in zip(self._participant_topic_types, self._participants, strict=True):
        member_factory = self._create_participant_factory(
            self._group_topic_type, self._output_topic_type, member, self._message_factory
        )
        await ChatAgentContainer.register(runtime, type=container_type, factory=member_factory)
        # A participant listens on its own topic first, then on the group topic.
        for topic in (container_type, self._group_topic_type):
            await runtime.add_subscription(TypeSubscription(topic_type=topic, agent_type=container_type))

    # Group chat manager.
    manager_factory = self._create_group_chat_manager_factory(
        name=self._group_chat_manager_name,
        group_topic_type=self._group_topic_type,
        output_topic_type=self._output_topic_type,
        participant_names=self._participant_names,
        participant_topic_types=self._participant_topic_types,
        participant_descriptions=self._participant_descriptions,
        output_message_queue=self._output_message_queue,
        termination_condition=self._termination_condition,
        max_turns=self._max_turns,
        message_factory=self._message_factory,
    )
    await self._base_group_chat_manager_class.register(
        runtime,
        type=group_chat_manager_agent_type.type,
        factory=manager_factory,
    )

    # The manager listens on, in this order: its own topic, the group topic, and the
    # output topic (whose messages it relays to the output message queue).
    manager_topics = (
        self._group_chat_manager_topic_type,
        self._group_topic_type,
        self._output_topic_type,
    )
    for topic in manager_topics:
        await runtime.add_subscription(
            TypeSubscription(topic_type=topic, agent_type=group_chat_manager_agent_type.type)
        )

    self._initialized = True
# [docs] -- link marker left over from the rendered HTML documentation page; not part of the code.
async def run(
    self,
    *,
    task: str | BaseChatMessage | Sequence[BaseChatMessage] | None = None,
    cancellation_token: CancellationToken | None = None,
) -> TaskResult:
    """Run the team and return the result.

    The base implementation drives :meth:`run_stream` to the end and returns the final
    result. Once the team is stopped, the termination condition is reset.

    Args:
        task (str | BaseChatMessage | Sequence[BaseChatMessage] | None): The task to run the team with.
            Can be a string, a single :class:`BaseChatMessage`, or a list of :class:`BaseChatMessage`.
        cancellation_token (CancellationToken | None): The cancellation token to kill the task immediately.
            Setting the cancellation token may leave the team in an inconsistent state,
            and it may not reset the termination condition.
            To gracefully stop the team, use :class:`~autogen_agentchat.conditions.ExternalTermination` instead.

    Returns:
        result: The result of the task as :class:`~autogen_agentchat.base.TaskResult`.
        The result contains the messages produced by the team and the stop reason.

    Raises:
        AssertionError: If the stream ended without producing a :class:`TaskResult`.

    Example using the :class:`~autogen_agentchat.teams.RoundRobinGroupChat` team:

    .. code-block:: python

        import asyncio
        from autogen_agentchat.agents import AssistantAgent
        from autogen_agentchat.conditions import MaxMessageTermination
        from autogen_agentchat.teams import RoundRobinGroupChat
        from autogen_ext.models.openai import OpenAIChatCompletionClient


        async def main() -> None:
            model_client = OpenAIChatCompletionClient(model="gpt-4o")

            agent1 = AssistantAgent("Assistant1", model_client=model_client)
            agent2 = AssistantAgent("Assistant2", model_client=model_client)
            termination = MaxMessageTermination(3)
            team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)

            result = await team.run(task="Count from 1 to 10, respond one at a time.")
            print(result)

            # Run the team again without a task to continue the previous task.
            result = await team.run()
            print(result)


        asyncio.run(main())

    Example using the :class:`~autogen_core.CancellationToken` to cancel the task:

    .. code-block:: python

        import asyncio
        from autogen_agentchat.agents import AssistantAgent
        from autogen_agentchat.conditions import MaxMessageTermination
        from autogen_agentchat.teams import RoundRobinGroupChat
        from autogen_core import CancellationToken
        from autogen_ext.models.openai import OpenAIChatCompletionClient


        async def main() -> None:
            model_client = OpenAIChatCompletionClient(model="gpt-4o")

            agent1 = AssistantAgent("Assistant1", model_client=model_client)
            agent2 = AssistantAgent("Assistant2", model_client=model_client)
            termination = MaxMessageTermination(3)
            team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)

            cancellation_token = CancellationToken()

            # Create a task to run the team in the background.
            run_task = asyncio.create_task(
                team.run(
                    task="Count from 1 to 10, respond one at a time.",
                    cancellation_token=cancellation_token,
                )
            )

            # Wait for 1 second and then cancel the task.
            await asyncio.sleep(1)
            cancellation_token.cancel()

            # This will raise a cancellation error.
            await run_task


        asyncio.run(main())
    """
    final_result: TaskResult | None = None
    stream = self.run_stream(task=task, cancellation_token=cancellation_token)
    async for item in stream:
        # Only the TaskResult matters here; intermediate events and messages are dropped.
        if isinstance(item, TaskResult):
            final_result = item
    if final_result is None:
        raise AssertionError("The stream should have returned the final result.")
    return final_result
# [docs] -- link marker left over from the rendered HTML documentation page; not part of the code.
async def run_stream(
    self,
    *,
    task: str | BaseChatMessage | Sequence[BaseChatMessage] | None = None,
    cancellation_token: CancellationToken | None = None,
) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | TaskResult, None]:
    """Run the team and produce a stream of messages followed by the final result.

    The final result is a :class:`~autogen_agentchat.base.TaskResult` and is the last item
    in the stream. Once the team is stopped, the termination condition is reset.

    .. note::

        If an agent produces :class:`~autogen_agentchat.messages.ModelClientStreamingChunkEvent`,
        the message is yielded in the stream but is not included in
        :attr:`~autogen_agentchat.base.TaskResult.messages`.

    Args:
        task (str | BaseChatMessage | Sequence[BaseChatMessage] | None): The task to run the team with.
            Can be a string, a single :class:`BaseChatMessage`, or a non-empty sequence
            (for example a list or a tuple) of :class:`BaseChatMessage`.
        cancellation_token (CancellationToken | None): The cancellation token to kill the task immediately.
            Setting the cancellation token may leave the team in an inconsistent state,
            and it may not reset the termination condition.
            To gracefully stop the team, use :class:`~autogen_agentchat.conditions.ExternalTermination` instead.

    Returns:
        stream: An :class:`~collections.abc.AsyncGenerator` that yields
        :class:`~autogen_agentchat.messages.BaseAgentEvent`,
        :class:`~autogen_agentchat.messages.BaseChatMessage`, and the final
        :class:`~autogen_agentchat.base.TaskResult` as the last item in the stream.

    Raises:
        ValueError: If ``task`` has an unsupported type, is an empty sequence, contains
            something that is not a :class:`BaseChatMessage`, contains a message type
            that is not registered with the message factory, or if the team is already running.
        RuntimeError: If the group chat terminated with an error.

    Example using the :class:`~autogen_agentchat.teams.RoundRobinGroupChat` team:

    .. code-block:: python

        import asyncio
        from autogen_agentchat.agents import AssistantAgent
        from autogen_agentchat.conditions import MaxMessageTermination
        from autogen_agentchat.teams import RoundRobinGroupChat
        from autogen_ext.models.openai import OpenAIChatCompletionClient


        async def main() -> None:
            model_client = OpenAIChatCompletionClient(model="gpt-4o")

            agent1 = AssistantAgent("Assistant1", model_client=model_client)
            agent2 = AssistantAgent("Assistant2", model_client=model_client)
            termination = MaxMessageTermination(3)
            team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)

            stream = team.run_stream(task="Count from 1 to 10, respond one at a time.")
            async for message in stream:
                print(message)

            # Run the team again without a task to continue the previous task.
            stream = team.run_stream()
            async for message in stream:
                print(message)


        asyncio.run(main())

    Example using the :class:`~autogen_core.CancellationToken` to cancel the task:

    .. code-block:: python

        import asyncio
        from autogen_agentchat.agents import AssistantAgent
        from autogen_agentchat.conditions import MaxMessageTermination
        from autogen_agentchat.ui import Console
        from autogen_agentchat.teams import RoundRobinGroupChat
        from autogen_core import CancellationToken
        from autogen_ext.models.openai import OpenAIChatCompletionClient


        async def main() -> None:
            model_client = OpenAIChatCompletionClient(model="gpt-4o")

            agent1 = AssistantAgent("Assistant1", model_client=model_client)
            agent2 = AssistantAgent("Assistant2", model_client=model_client)
            termination = MaxMessageTermination(3)
            team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)

            cancellation_token = CancellationToken()

            # Create a task to run the team in the background.
            run_task = asyncio.create_task(
                Console(
                    team.run_stream(
                        task="Count from 1 to 10, respond one at a time.",
                        cancellation_token=cancellation_token,
                    )
                )
            )

            # Wait for 1 second and then cancel the task.
            await asyncio.sleep(1)
            cancellation_token.cancel()

            # This will raise a cancellation error.
            await run_task


        asyncio.run(main())
    """
    # Create the messages list if the task is a string or a chat message.
    messages: List[BaseChatMessage] | None = None
    if task is None:
        pass
    elif isinstance(task, str):
        messages = [TextMessage(content=task, source="user")]
    elif isinstance(task, BaseChatMessage):
        messages = [task]
    elif isinstance(task, Sequence):
        # The signature promises Sequence[BaseChatMessage], so accept any non-string
        # sequence (list, tuple, ...) rather than only list. Strings were handled above.
        if not task:
            raise ValueError("Task list cannot be empty.")
        messages = []
        for msg in task:
            if not isinstance(msg, BaseChatMessage):
                raise ValueError("All messages in task list must be valid BaseChatMessage types")
            messages.append(msg)
    else:
        raise ValueError("Task must be a string, a BaseChatMessage, or a list of BaseChatMessage.")

    # Check if the messages types are registered with the message factory.
    if messages is not None:
        for msg in messages:
            if not self._message_factory.is_registered(msg.__class__):
                raise ValueError(
                    f"Message type {msg.__class__} is not registered with the message factory. "
                    "Please register it with the message factory by adding it to the "
                    "custom_message_types list when creating the team."
                )

    if self._is_running:
        raise ValueError("The team is already running, it cannot run again until it is stopped.")
    self._is_running = True

    # Setup happens before the main try/finally below, so it needs its own rollback:
    # without it, a failure in start() or _init() would leave _is_running stuck at True
    # (and the embedded runtime started), and every later call would be rejected as
    # "already running".
    runtime_started = False
    try:
        if self._embedded_runtime:
            # Start the embedded runtime.
            assert isinstance(self._runtime, SingleThreadedAgentRuntime)
            self._runtime.start()
            runtime_started = True

        if not self._initialized:
            await self._init(self._runtime)
    except BaseException:
        try:
            if runtime_started:
                assert isinstance(self._runtime, SingleThreadedAgentRuntime)
                # No message has been sent yet, so there is nothing to wait for.
                await self._runtime.stop_when_idle()
        finally:
            self._is_running = False
        raise

    shutdown_task: asyncio.Task[None] | None = None
    if self._embedded_runtime:

        async def stop_runtime() -> None:
            assert isinstance(self._runtime, SingleThreadedAgentRuntime)
            try:
                # This will propagate any exceptions raised.
                await self._runtime.stop_when_idle()
                # Put a termination message in the queue to indicate that the group chat is stopped
                # for whatever reason but not due to an exception.
                await self._output_message_queue.put(
                    GroupChatTermination(
                        message=StopMessage(
                            content="The group chat is stopped.", source=self._group_chat_manager_name
                        )
                    )
                )
            except Exception as e:
                # Stop the consumption of messages and end the stream.
                # NOTE: we also need to put a GroupChatTermination event here because when the runtime
                # has an exception, the group chat manager may not be able to put a GroupChatTermination
                # event in the queue. This may not be necessary if the group chat manager is able to
                # handle the exception and put the event in the queue.
                await self._output_message_queue.put(
                    GroupChatTermination(
                        message=StopMessage(
                            content="An exception occurred in the runtime.", source=self._group_chat_manager_name
                        ),
                        error=SerializableException.from_exception(e),
                    )
                )

        # Create a background task to stop the runtime when the group chat
        # is stopped or has an exception.
        # NOTE: this must stay immediately before the send_message() call below, with no
        # other await in between, so the start message is queued before the shutdown task
        # first gets to run.
        shutdown_task = asyncio.create_task(stop_runtime())

    try:
        # Run the team by sending the start message to the group chat manager.
        # The group chat manager will start the group chat by relaying the message to the participants
        # and the group chat manager.
        await self._runtime.send_message(
            GroupChatStart(messages=messages),
            recipient=AgentId(type=self._group_chat_manager_topic_type, key=self._team_id),
            cancellation_token=cancellation_token,
        )
        # Collect the output messages in order.
        output_messages: List[BaseAgentEvent | BaseChatMessage] = []
        stop_reason: str | None = None
        # Yield the messages until a termination event arrives.
        while True:
            message_future = asyncio.ensure_future(self._output_message_queue.get())
            if cancellation_token is not None:
                cancellation_token.link_future(message_future)
            # Wait for the next message, this will raise an exception if the task is cancelled.
            message = await message_future
            if isinstance(message, GroupChatTermination):
                # If the message contains an error, we need to raise it here.
                # This will stop the team and propagate the error.
                if message.error is not None:
                    raise RuntimeError(str(message.error))
                stop_reason = message.message.content
                break
            yield message
            if isinstance(message, ModelClientStreamingChunkEvent):
                # Skip the model client streaming chunk events.
                continue
            output_messages.append(message)

        # Yield the final result.
        yield TaskResult(messages=output_messages, stop_reason=stop_reason)

    finally:
        try:
            if shutdown_task is not None:
                # Wait for the shutdown task to finish.
                # This will propagate any exceptions raised.
                await shutdown_task
        finally:
            # Clear the output message queue.
            while not self._output_message_queue.empty():
                self._output_message_queue.get_nowait()

            # Indicate that the team is no longer running.
            self._is_running = False
# [docs] -- link marker left over from the rendered HTML documentation page; not part of the code.
async def reset(self) -> None:
    """Reset the team and its participants to their initial state.

    The team must be stopped before it can be reset. If the team has not been
    initialized yet, it is initialized first.

    Raises:
        RuntimeError: If the team is currently running.

    Example using the :class:`~autogen_agentchat.teams.RoundRobinGroupChat` team:

    .. code-block:: python

        import asyncio
        from autogen_agentchat.agents import AssistantAgent
        from autogen_agentchat.conditions import MaxMessageTermination
        from autogen_agentchat.teams import RoundRobinGroupChat
        from autogen_ext.models.openai import OpenAIChatCompletionClient


        async def main() -> None:
            model_client = OpenAIChatCompletionClient(model="gpt-4o")

            agent1 = AssistantAgent("Assistant1", model_client=model_client)
            agent2 = AssistantAgent("Assistant2", model_client=model_client)
            termination = MaxMessageTermination(3)
            team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)
            stream = team.run_stream(task="Count from 1 to 10, respond one at a time.")
            async for message in stream:
                print(message)

            # Reset the team.
            await team.reset()
            stream = team.run_stream(task="Count from 1 to 10, respond one at a time.")
            async for message in stream:
                print(message)


        asyncio.run(main())
    """
    if not self._initialized:
        await self._init(self._runtime)

    if self._is_running:
        raise RuntimeError("The group chat is currently running. It must be stopped before it can be reset.")
    self._is_running = True

    if self._embedded_runtime:
        # The embedded runtime only runs while the team is doing work; start it for the reset.
        assert isinstance(self._runtime, SingleThreadedAgentRuntime)
        self._runtime.start()

    try:
        # Reset every participant first, then the group chat manager.
        reset_targets = [*self._participant_topic_types, self._group_chat_manager_topic_type]
        for topic_type in reset_targets:
            await self._runtime.send_message(
                GroupChatReset(),
                recipient=AgentId(type=topic_type, key=self._team_id),
            )
    finally:
        if self._embedded_runtime:
            # Stop the embedded runtime again once it is idle.
            assert isinstance(self._runtime, SingleThreadedAgentRuntime)
            await self._runtime.stop_when_idle()

        # Drop anything left in the output message queue.
        while not self._output_message_queue.empty():
            self._output_message_queue.get_nowait()

        # The team is no longer busy.
        self._is_running = False
# [docs] -- link marker left over from the rendered HTML documentation page; not part of the code.
async def pause(self) -> None:
    """Pause the participants while the team is running by calling their
    :meth:`~autogen_agentchat.base.ChatAgent.on_pause` method via direct RPC calls.

    .. attention::

        This is an experimental feature introduced in v0.4.9 and may be changed or removed
        in the future.

    The team must be initialized before it can be paused.

    Unlike termination, pausing the team does not cause :meth:`run` or :meth:`run_stream`
    to return. It calls :meth:`~autogen_agentchat.base.ChatAgent.on_pause` on each
    participant; if a participant does not implement it, nothing happens.

    .. note::

        It is the responsibility of the agent class to handle the pause and to make sure
        the agent can be resumed later. Implement
        :meth:`~autogen_agentchat.agents.BaseChatAgent.on_pause` in your agent class for
        custom pause behavior. By default, the agent does nothing when called.

    Raises:
        RuntimeError: If the team has not been initialized. Exceptions raised by the
            participants' :class:`~autogen_agentchat.base.ChatAgent.on_pause`
            implementations are propagated to this method and raised.
    """
    if not self._initialized:
        raise RuntimeError("The group chat has not been initialized. It must be run before it can be paused.")

    # Pause every participant first, then the group chat manager.
    pause_targets = [*self._participant_topic_types, self._group_chat_manager_topic_type]
    for topic_type in pause_targets:
        await self._runtime.send_message(
            GroupChatPause(),
            recipient=AgentId(type=topic_type, key=self._team_id),
        )
# [docs] -- link marker left over from the rendered HTML documentation page; not part of the code.
async def resume(self) -> None:
    """Resume the participants while the team is running and paused by calling their
    :meth:`~autogen_agentchat.base.ChatAgent.on_resume` method via direct RPC calls.

    .. attention::

        This is an experimental feature introduced in v0.4.9 and may be changed or removed
        in the future.

    The team must be initialized before it can be resumed.

    Unlike termination and restarting with a new task, resuming the team does not cause
    :meth:`run` or :meth:`run_stream` to return. It calls
    :meth:`~autogen_agentchat.base.ChatAgent.on_resume` on each participant; if a
    participant does not implement it, nothing happens.

    .. note::

        It is the responsibility of the agent class to handle the resume and to make sure
        the agent continues from where it was paused. Implement
        :meth:`~autogen_agentchat.agents.BaseChatAgent.on_resume` in your agent class for
        custom resume behavior.

    Raises:
        RuntimeError: If the team has not been initialized. Exceptions raised by the
            participants' :class:`~autogen_agentchat.base.ChatAgent.on_resume`
            implementations are propagated to this method and raised.
    """
    if not self._initialized:
        raise RuntimeError("The group chat has not been initialized. It must be run before it can be resumed.")

    # Resume every participant first, then the group chat manager.
    resume_targets = [*self._participant_topic_types, self._group_chat_manager_topic_type]
    for topic_type in resume_targets:
        await self._runtime.send_message(
            GroupChatResume(),
            recipient=AgentId(type=topic_type, key=self._team_id),
        )
# [docs] -- link marker left over from the rendered HTML documentation page; not part of the code.
async def save_state(self) -> Mapping[str, Any]:
    """Save the state of the group chat team.

    The state is saved by calling :meth:`~autogen_core.AgentRuntime.agent_save_state` for
    each participant and for the group chat manager, using their internal agent IDs.
    If the team has not been initialized yet, it is initialized first.

    The state is returned as a nested dictionary: a dictionary with the key ``agent_states``,
    whose value is a dictionary with the agent names as keys and the states as values.

    .. code-block:: text

        {
            "agent_states": {
                "agent1": ...,
                "agent2": ...,
                "RoundRobinGroupChatManager": ...
            }
        }

    .. note::

        Starting with v0.4.9, the state uses the agent name as the key instead of the agent ID,
        and the ``team_id`` field is removed from the state. This makes the state portable
        across different teams and runtimes. States saved in the old format may not be
        compatible with the new format in the future.

    .. caution::

        Calling :func:`~autogen_agentchat.teams.BaseGroupChat.save_state` while the team is
        running may produce an inconsistent state and unexpected results.
        It is recommended to call this method when the team is not running or after it is stopped.

    Returns:
        The dumped :class:`TeamState` containing every agent's state keyed by agent name.
    """
    if not self._initialized:
        await self._init(self._runtime)

    # States are keyed by agent name rather than agent ID so that the saved state is
    # decoupled from the agents' identities in the agent runtime.
    snapshots: Dict[str, Mapping[str, Any]] = {}

    # Participants. The runtime's save method is used instead of the agent instance's
    # own save_state so that remote agents are supported as well.
    for participant_name, topic_type in zip(self._participant_names, self._participant_topic_types, strict=True):
        participant_id = AgentId(type=topic_type, key=self._team_id)
        snapshots[participant_name] = await self._runtime.agent_save_state(participant_id)

    # Group chat manager.
    manager_id = AgentId(type=self._group_chat_manager_topic_type, key=self._team_id)
    snapshots[self._group_chat_manager_name] = await self._runtime.agent_save_state(manager_id)

    return TeamState(agent_states=snapshots).model_dump()
# [docs] -- link marker left over from the rendered HTML documentation page; not part of the code.
async def load_state(self, state: Mapping[str, Any]) -> None:
    """Load an external state and overwrite the current state of the group chat team.

    The state is loaded by calling :meth:`~autogen_core.AgentRuntime.agent_load_state` for
    each participant and for the group chat manager, using their internal agent IDs.
    If the team has not been initialized yet, it is initialized first.
    See :meth:`~autogen_agentchat.teams.BaseGroupChat.save_state` for the expected
    format of the state.

    Args:
        state: The saved team state, in the format produced by :meth:`save_state`.

    Raises:
        RuntimeError: If the team is currently running.
        ValueError: If ``state`` does not validate as a :class:`TeamState`, or if the state
            of a participant or of the group chat manager is missing from it.
    """
    if not self._initialized:
        await self._init(self._runtime)

    if self._is_running:
        raise RuntimeError("The team cannot be loaded while it is running.")
    # Mark the team busy for the duration of the load; always cleared in the finally below.
    self._is_running = True

    try:
        saved_states = TeamState.model_validate(state).agent_states

        # Participants, in participant order. Each one is checked and loaded in turn.
        for participant_name, topic_type in zip(
            self._participant_names, self._participant_topic_types, strict=True
        ):
            participant_id = AgentId(type=topic_type, key=self._team_id)
            if participant_name not in saved_states:
                raise ValueError(f"Agent state for {participant_name} not found in the saved state.")
            await self._runtime.agent_load_state(participant_id, saved_states[participant_name])

        # Group chat manager.
        manager_id = AgentId(type=self._group_chat_manager_topic_type, key=self._team_id)
        if self._group_chat_manager_name not in saved_states:
            raise ValueError(f"Agent state for {self._group_chat_manager_name} not found in the saved state.")
        await self._runtime.agent_load_state(manager_id, saved_states[self._group_chat_manager_name])

    except ValidationError as e:
        raise ValueError(
            "Invalid state format. The expected state format has changed since v0.4.9. "
            "Please read the release note on GitHub."
        ) from e

    finally:
        # Indicate that the team is no longer running.
        self._is_running = False