Source code for autogen_agentchat.teams._group_chat._base_group_chat

import asyncio
import uuid
from abc import ABC, abstractmethod
from typing import Any, AsyncGenerator, Callable, Dict, List, Mapping, Sequence

from autogen_core import (
    AgentId,
    AgentRuntime,
    AgentType,
    CancellationToken,
    ComponentBase,
    SingleThreadedAgentRuntime,
    TypeSubscription,
)
from pydantic import BaseModel, ValidationError

from ...base import ChatAgent, TaskResult, Team, TerminationCondition
from ...messages import (
    BaseAgentEvent,
    BaseChatMessage,
    MessageFactory,
    ModelClientStreamingChunkEvent,
    StopMessage,
    StructuredMessage,
    TextMessage,
)
from ...state import TeamState
from ._chat_agent_container import ChatAgentContainer
from ._events import (
    GroupChatPause,
    GroupChatReset,
    GroupChatResume,
    GroupChatStart,
    GroupChatTermination,
    SerializableException,
)
from ._sequential_routed_agent import SequentialRoutedAgent


class BaseGroupChat(Team, ABC, ComponentBase[BaseModel]):
    """The base class for group chat teams.

    To implement a group chat team, first create a subclass of :class:`BaseGroupChatManager` and then
    create a subclass of :class:`BaseGroupChat` that uses the group chat manager.
    (A commented outline of such a subclass appears after the ``_init`` method below.)
    """

    component_type = "team"

    def __init__(
        self,
        participants: List[ChatAgent],
        group_chat_manager_name: str,
        group_chat_manager_class: type[SequentialRoutedAgent],
        termination_condition: TerminationCondition | None = None,
        max_turns: int | None = None,
        runtime: AgentRuntime | None = None,
        custom_message_types: List[type[BaseAgentEvent | BaseChatMessage]] | None = None,
        emit_team_events: bool = False,
    ):
        if len(participants) == 0:
            raise ValueError("At least one participant is required.")
        if len(participants) != len(set(participant.name for participant in participants)):
            raise ValueError("The participant names must be unique.")
        self._participants = participants
        self._base_group_chat_manager_class = group_chat_manager_class
        self._termination_condition = termination_condition
        self._max_turns = max_turns
        self._message_factory = MessageFactory()
        if custom_message_types is not None:
            for message_type in custom_message_types:
                self._message_factory.register(message_type)

        for agent in participants:
            for message_type in agent.produced_message_types:
                try:
                    is_registered = self._message_factory.is_registered(message_type)  # type: ignore[reportUnknownArgumentType]
                    if issubclass(message_type, StructuredMessage) and not is_registered:
                        self._message_factory.register(message_type)  # type: ignore[reportUnknownArgumentType]
                except TypeError:
                    # Not a class or not a valid subclassable type (skip).
                    pass

        # The team ID is a UUID that is used to identify the team and its participants
        # in the agent runtime. It is used to create unique topic types for each participant.
        # Currently, the team ID is bound to an object instance of the group chat class,
        # so if you create two instances of group chat, there will be two teams with different IDs.
        self._team_id = str(uuid.uuid4())

        # Constants for the group chat team.
        # The names are used to identify the agents within the team.
        # The names may not be unique across different teams.
        self._group_chat_manager_name = group_chat_manager_name
        self._participant_names: List[str] = [participant.name for participant in participants]
        self._participant_descriptions: List[str] = [participant.description for participant in participants]
        # The group chat topic type is used for broadcast communication among all participants and the group chat manager.
        self._group_topic_type = f"group_topic_{self._team_id}"
        # The group chat manager topic type is used for direct communication with the group chat manager.
        self._group_chat_manager_topic_type = f"{self._group_chat_manager_name}_{self._team_id}"
        # The participant topic types are used for direct communication with each participant.
        self._participant_topic_types: List[str] = [
            f"{participant.name}_{self._team_id}" for participant in participants
        ]
        # The output topic type is used for emitting streaming messages from the group chat.
        # The group chat manager will relay the messages to the output message queue.
        self._output_topic_type = f"output_topic_{self._team_id}"

        # The queue for collecting the output messages.
        self._output_message_queue: asyncio.Queue[BaseAgentEvent | BaseChatMessage | GroupChatTermination] = (
            asyncio.Queue()
        )

        # Create a runtime for the team.
        if runtime is not None:
            self._runtime = runtime
            self._embedded_runtime = False
        else:
            # Use an embedded single-threaded runtime for the group chat.
            # Background exceptions must not be ignored as it results in non-surfaced exceptions and early team termination.
            self._runtime = SingleThreadedAgentRuntime(ignore_unhandled_exceptions=False)
            self._embedded_runtime = True

        # Flag to track if the group chat has been initialized.
        self._initialized = False

        # Flag to track if the group chat is running.
        self._is_running = False

        # Flag to track if the team events should be emitted.
        self._emit_team_events = emit_team_events

    @abstractmethod
    def _create_group_chat_manager_factory(
        self,
        name: str,
        group_topic_type: str,
        output_topic_type: str,
        participant_topic_types: List[str],
        participant_names: List[str],
        participant_descriptions: List[str],
        output_message_queue: asyncio.Queue[BaseAgentEvent | BaseChatMessage | GroupChatTermination],
        termination_condition: TerminationCondition | None,
        max_turns: int | None,
        message_factory: MessageFactory,
    ) -> Callable[[], SequentialRoutedAgent]: ...

    def _create_participant_factory(
        self,
        parent_topic_type: str,
        output_topic_type: str,
        agent: ChatAgent,
        message_factory: MessageFactory,
    ) -> Callable[[], ChatAgentContainer]:
        def _factory() -> ChatAgentContainer:
            container = ChatAgentContainer(parent_topic_type, output_topic_type, agent, message_factory)
            return container

        return _factory

    async def _init(self, runtime: AgentRuntime) -> None:
        # Constants for the group chat manager.
        group_chat_manager_agent_type = AgentType(self._group_chat_manager_topic_type)

        # Register participants.
        # Use the participant topic type as the agent type.
        for participant, agent_type in zip(self._participants, self._participant_topic_types, strict=True):
            # Register the participant factory.
            await ChatAgentContainer.register(
                runtime,
                type=agent_type,
                factory=self._create_participant_factory(
                    self._group_topic_type, self._output_topic_type, participant, self._message_factory
                ),
            )
            # Add subscriptions for the participant.
            # The participant should be able to receive messages from its own topic.
            await runtime.add_subscription(TypeSubscription(topic_type=agent_type, agent_type=agent_type))
            # The participant should be able to receive messages from the group topic.
            await runtime.add_subscription(TypeSubscription(topic_type=self._group_topic_type, agent_type=agent_type))

        # Register the group chat manager.
        await self._base_group_chat_manager_class.register(
            runtime,
            type=group_chat_manager_agent_type.type,
            factory=self._create_group_chat_manager_factory(
                name=self._group_chat_manager_name,
                group_topic_type=self._group_topic_type,
                output_topic_type=self._output_topic_type,
                participant_names=self._participant_names,
                participant_topic_types=self._participant_topic_types,
                participant_descriptions=self._participant_descriptions,
                output_message_queue=self._output_message_queue,
                termination_condition=self._termination_condition,
                max_turns=self._max_turns,
                message_factory=self._message_factory,
            ),
        )
        # Add subscriptions for the group chat manager.
        # The group chat manager should be able to receive messages from its own topic.
        await runtime.add_subscription(
            TypeSubscription(
                topic_type=self._group_chat_manager_topic_type, agent_type=group_chat_manager_agent_type.type
            )
        )
        # The group chat manager should be able to receive messages from the group topic.
        await runtime.add_subscription(
            TypeSubscription(topic_type=self._group_topic_type, agent_type=group_chat_manager_agent_type.type)
        )
        # The group chat manager will relay the messages from the output topic to the output message queue.
        await runtime.add_subscription(
            TypeSubscription(topic_type=self._output_topic_type, agent_type=group_chat_manager_agent_type.type)
        )

        self._initialized = True
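    # Illustrative sketch (not part of the class): the outline of a concrete team, as described
    # in the class docstring. ``MyGroupChat`` and ``MyGroupChatManager`` are hypothetical names;
    # ``MyGroupChatManager`` is assumed to be a :class:`BaseGroupChatManager` subclass whose
    # constructor accepts the same arguments that ``_create_group_chat_manager_factory`` receives.
    #
    #   class MyGroupChat(BaseGroupChat):
    #       def __init__(self, participants, termination_condition=None, max_turns=None):
    #           super().__init__(
    #               participants,
    #               group_chat_manager_name="MyGroupChatManager",
    #               group_chat_manager_class=MyGroupChatManager,
    #               termination_condition=termination_condition,
    #               max_turns=max_turns,
    #           )
    #
    #       def _create_group_chat_manager_factory(self, name, group_topic_type, output_topic_type,
    #                                              participant_topic_types, participant_names,
    #                                              participant_descriptions, output_message_queue,
    #                                              termination_condition, max_turns, message_factory):
    #           # Return a closure that constructs the custom group chat manager.
    #           return lambda: MyGroupChatManager(
    #               name, group_topic_type, output_topic_type, participant_topic_types,
    #               participant_names, participant_descriptions, output_message_queue,
    #               termination_condition, max_turns, message_factory,
    #           )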
    async def run(
        self,
        *,
        task: str | BaseChatMessage | Sequence[BaseChatMessage] | None = None,
        cancellation_token: CancellationToken | None = None,
    ) -> TaskResult:
        """Run the team and return the result. The base implementation uses
        :meth:`run_stream` to run the team and then returns the final result.
        Once the team is stopped, the termination condition is reset.

        Args:
            task (str | BaseChatMessage | Sequence[BaseChatMessage] | None): The task to run the team with.
                Can be a string, a single :class:`BaseChatMessage`, or a list of :class:`BaseChatMessage`.
            cancellation_token (CancellationToken | None): The cancellation token to kill the task immediately.
                Setting the cancellation token may leave the team in an inconsistent state,
                and it may not reset the termination condition.
                To gracefully stop the team, use :class:`~autogen_agentchat.conditions.ExternalTermination` instead.

        Returns:
            result: The result of the task as :class:`~autogen_agentchat.base.TaskResult`. The result contains
                the messages produced by the team and the stop reason.

        Example using the :class:`~autogen_agentchat.teams.RoundRobinGroupChat` team:

        .. code-block:: python

            import asyncio
            from autogen_agentchat.agents import AssistantAgent
            from autogen_agentchat.conditions import MaxMessageTermination
            from autogen_agentchat.teams import RoundRobinGroupChat
            from autogen_ext.models.openai import OpenAIChatCompletionClient


            async def main() -> None:
                model_client = OpenAIChatCompletionClient(model="gpt-4o")

                agent1 = AssistantAgent("Assistant1", model_client=model_client)
                agent2 = AssistantAgent("Assistant2", model_client=model_client)
                termination = MaxMessageTermination(3)
                team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)

                result = await team.run(task="Count from 1 to 10, respond one at a time.")
                print(result)

                # Run the team again without a task to continue the previous task.
                result = await team.run()
                print(result)


            asyncio.run(main())


        Example using the :class:`~autogen_core.CancellationToken` to cancel the task:

        .. code-block:: python

            import asyncio
            from autogen_agentchat.agents import AssistantAgent
            from autogen_agentchat.conditions import MaxMessageTermination
            from autogen_agentchat.teams import RoundRobinGroupChat
            from autogen_core import CancellationToken
            from autogen_ext.models.openai import OpenAIChatCompletionClient


            async def main() -> None:
                model_client = OpenAIChatCompletionClient(model="gpt-4o")

                agent1 = AssistantAgent("Assistant1", model_client=model_client)
                agent2 = AssistantAgent("Assistant2", model_client=model_client)
                termination = MaxMessageTermination(3)
                team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)

                cancellation_token = CancellationToken()

                # Create a task to run the team in the background.
                run_task = asyncio.create_task(
                    team.run(
                        task="Count from 1 to 10, respond one at a time.",
                        cancellation_token=cancellation_token,
                    )
                )

                # Wait for 1 second and then cancel the task.
                await asyncio.sleep(1)
                cancellation_token.cancel()

                # This will raise a cancellation error.
                await run_task


            asyncio.run(main())
        """
        result: TaskResult | None = None
        async for message in self.run_stream(
            task=task,
            cancellation_token=cancellation_token,
        ):
            if isinstance(message, TaskResult):
                result = message
        if result is not None:
            return result
        raise AssertionError("The stream should have returned the final result.")
    async def run_stream(
        self,
        *,
        task: str | BaseChatMessage | Sequence[BaseChatMessage] | None = None,
        cancellation_token: CancellationToken | None = None,
    ) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | TaskResult, None]:
        """Run the team and produce a stream of messages and the final result
        of the type :class:`~autogen_agentchat.base.TaskResult` as the last item in the stream.
        Once the team is stopped, the termination condition is reset.

        .. note::

            If an agent produces :class:`~autogen_agentchat.messages.ModelClientStreamingChunkEvent`,
            the message will be yielded in the stream but it will not be included in the
            :attr:`~autogen_agentchat.base.TaskResult.messages`.

        Args:
            task (str | BaseChatMessage | Sequence[BaseChatMessage] | None): The task to run the team with.
                Can be a string, a single :class:`BaseChatMessage`, or a list of :class:`BaseChatMessage`.
            cancellation_token (CancellationToken | None): The cancellation token to kill the task immediately.
                Setting the cancellation token may leave the team in an inconsistent state,
                and it may not reset the termination condition.
                To gracefully stop the team, use :class:`~autogen_agentchat.conditions.ExternalTermination` instead.

        Returns:
            stream: an :class:`~collections.abc.AsyncGenerator` that yields
                :class:`~autogen_agentchat.messages.BaseAgentEvent`,
                :class:`~autogen_agentchat.messages.BaseChatMessage`, and the final result
                :class:`~autogen_agentchat.base.TaskResult` as the last item in the stream.

        Example using the :class:`~autogen_agentchat.teams.RoundRobinGroupChat` team:

        .. code-block:: python

            import asyncio
            from autogen_agentchat.agents import AssistantAgent
            from autogen_agentchat.conditions import MaxMessageTermination
            from autogen_agentchat.teams import RoundRobinGroupChat
            from autogen_ext.models.openai import OpenAIChatCompletionClient


            async def main() -> None:
                model_client = OpenAIChatCompletionClient(model="gpt-4o")

                agent1 = AssistantAgent("Assistant1", model_client=model_client)
                agent2 = AssistantAgent("Assistant2", model_client=model_client)
                termination = MaxMessageTermination(3)
                team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)

                stream = team.run_stream(task="Count from 1 to 10, respond one at a time.")
                async for message in stream:
                    print(message)

                # Run the team again without a task to continue the previous task.
                stream = team.run_stream()
                async for message in stream:
                    print(message)


            asyncio.run(main())


        Example using the :class:`~autogen_core.CancellationToken` to cancel the task:

        .. code-block:: python

            import asyncio
            from autogen_agentchat.agents import AssistantAgent
            from autogen_agentchat.conditions import MaxMessageTermination
            from autogen_agentchat.ui import Console
            from autogen_agentchat.teams import RoundRobinGroupChat
            from autogen_core import CancellationToken
            from autogen_ext.models.openai import OpenAIChatCompletionClient


            async def main() -> None:
                model_client = OpenAIChatCompletionClient(model="gpt-4o")

                agent1 = AssistantAgent("Assistant1", model_client=model_client)
                agent2 = AssistantAgent("Assistant2", model_client=model_client)
                termination = MaxMessageTermination(3)
                team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)

                cancellation_token = CancellationToken()

                # Create a task to run the team in the background.
                run_task = asyncio.create_task(
                    Console(
                        team.run_stream(
                            task="Count from 1 to 10, respond one at a time.",
                            cancellation_token=cancellation_token,
                        )
                    )
                )

                # Wait for 1 second and then cancel the task.
                await asyncio.sleep(1)
                cancellation_token.cancel()

                # This will raise a cancellation error.
                await run_task


            asyncio.run(main())
        """
        # Create the messages list if the task is a string or a chat message.
        messages: List[BaseChatMessage] | None = None
        if task is None:
            pass
        elif isinstance(task, str):
            messages = [TextMessage(content=task, source="user")]
        elif isinstance(task, BaseChatMessage):
            messages = [task]
        elif isinstance(task, list):
            if not task:
                raise ValueError("Task list cannot be empty.")
            messages = []
            for msg in task:
                if not isinstance(msg, BaseChatMessage):
                    raise ValueError("All messages in task list must be valid BaseChatMessage types")
                messages.append(msg)
        else:
            raise ValueError("Task must be a string, a BaseChatMessage, or a list of BaseChatMessage.")
        # Check if the message types are registered with the message factory.
        if messages is not None:
            for msg in messages:
                if not self._message_factory.is_registered(msg.__class__):
                    raise ValueError(
                        f"Message type {msg.__class__} is not registered with the message factory. "
                        "Please register it with the message factory by adding it to the "
                        "custom_message_types list when creating the team."
                    )

        if self._is_running:
            raise ValueError("The team is already running, it cannot run again until it is stopped.")
        self._is_running = True

        if self._embedded_runtime:
            # Start the embedded runtime.
            assert isinstance(self._runtime, SingleThreadedAgentRuntime)
            self._runtime.start()

        if not self._initialized:
            await self._init(self._runtime)

        shutdown_task: asyncio.Task[None] | None = None
        if self._embedded_runtime:

            async def stop_runtime() -> None:
                assert isinstance(self._runtime, SingleThreadedAgentRuntime)
                try:
                    # This will propagate any exceptions raised.
                    await self._runtime.stop_when_idle()
                    # Put a termination message in the queue to indicate that the group chat is stopped
                    # for whatever reason but not due to an exception.
                    await self._output_message_queue.put(
                        GroupChatTermination(
                            message=StopMessage(
                                content="The group chat is stopped.", source=self._group_chat_manager_name
                            )
                        )
                    )
                except Exception as e:
                    # Stop the consumption of messages and end the stream.
                    # NOTE: we also need to put a GroupChatTermination event here because when the runtime
                    # has an exception, the group chat manager may not be able to put a GroupChatTermination event in the queue.
                    # This may not be necessary if the group chat manager is able to handle the exception and put the event in the queue.
                    await self._output_message_queue.put(
                        GroupChatTermination(
                            message=StopMessage(
                                content="An exception occurred in the runtime.", source=self._group_chat_manager_name
                            ),
                            error=SerializableException.from_exception(e),
                        )
                    )

            # Create a background task to stop the runtime when the group chat
            # is stopped or has an exception.
            shutdown_task = asyncio.create_task(stop_runtime())

        try:
            # Run the team by sending the start message to the group chat manager.
            # The group chat manager will start the group chat by relaying the message to the participants
            # and the group chat manager.
            await self._runtime.send_message(
                GroupChatStart(messages=messages),
                recipient=AgentId(type=self._group_chat_manager_topic_type, key=self._team_id),
                cancellation_token=cancellation_token,
            )
            # Collect the output messages in order.
            output_messages: List[BaseAgentEvent | BaseChatMessage] = []
            stop_reason: str | None = None
            # Yield the messages until the queue is empty.
            while True:
                message_future = asyncio.ensure_future(self._output_message_queue.get())
                if cancellation_token is not None:
                    cancellation_token.link_future(message_future)
                # Wait for the next message, this will raise an exception if the task is cancelled.
                message = await message_future
                if isinstance(message, GroupChatTermination):
                    # If the message contains an error, we need to raise it here.
                    # This will stop the team and propagate the error.
                    if message.error is not None:
                        raise RuntimeError(str(message.error))
                    stop_reason = message.message.content
                    break
                yield message
                if isinstance(message, ModelClientStreamingChunkEvent):
                    # Skip the model client streaming chunk events.
                    continue
                output_messages.append(message)

            # Yield the final result.
            yield TaskResult(messages=output_messages, stop_reason=stop_reason)

        finally:
            try:
                if shutdown_task is not None:
                    # Wait for the shutdown task to finish.
                    # This will propagate any exceptions raised.
                    await shutdown_task
            finally:
                # Clear the output message queue.
                while not self._output_message_queue.empty():
                    self._output_message_queue.get_nowait()

                # Indicate that the team is no longer running.
                self._is_running = False
    async def reset(self) -> None:
        """Reset the team and its participants to their initial state.

        The team must be stopped before it can be reset.

        Raises:
            RuntimeError: If the team has not been initialized or is currently running.

        Example using the :class:`~autogen_agentchat.teams.RoundRobinGroupChat` team:

        .. code-block:: python

            import asyncio
            from autogen_agentchat.agents import AssistantAgent
            from autogen_agentchat.conditions import MaxMessageTermination
            from autogen_agentchat.teams import RoundRobinGroupChat
            from autogen_ext.models.openai import OpenAIChatCompletionClient


            async def main() -> None:
                model_client = OpenAIChatCompletionClient(model="gpt-4o")

                agent1 = AssistantAgent("Assistant1", model_client=model_client)
                agent2 = AssistantAgent("Assistant2", model_client=model_client)
                termination = MaxMessageTermination(3)
                team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)

                stream = team.run_stream(task="Count from 1 to 10, respond one at a time.")
                async for message in stream:
                    print(message)

                # Reset the team.
                await team.reset()

                stream = team.run_stream(task="Count from 1 to 10, respond one at a time.")
                async for message in stream:
                    print(message)


            asyncio.run(main())
        """
        if not self._initialized:
            await self._init(self._runtime)

        if self._is_running:
            raise RuntimeError("The group chat is currently running. It must be stopped before it can be reset.")
        self._is_running = True

        if self._embedded_runtime:
            # Start the runtime.
            assert isinstance(self._runtime, SingleThreadedAgentRuntime)
            self._runtime.start()

        try:
            # Send a reset message to all participants.
            for participant_topic_type in self._participant_topic_types:
                await self._runtime.send_message(
                    GroupChatReset(),
                    recipient=AgentId(type=participant_topic_type, key=self._team_id),
                )
            # Send a reset message to the group chat manager.
            await self._runtime.send_message(
                GroupChatReset(),
                recipient=AgentId(type=self._group_chat_manager_topic_type, key=self._team_id),
            )
        finally:
            if self._embedded_runtime:
                # Stop the runtime.
                assert isinstance(self._runtime, SingleThreadedAgentRuntime)
                await self._runtime.stop_when_idle()

            # Reset the output message queue.
            while not self._output_message_queue.empty():
                self._output_message_queue.get_nowait()

            # Indicate that the team is no longer running.
            self._is_running = False
    async def pause(self) -> None:
        """Pause its participants when the team is running by calling their
        :meth:`~autogen_agentchat.base.ChatAgent.on_pause` method via direct RPC.

        .. attention::

            This is an experimental feature introduced in v0.4.9 and may be changed or removed in the future.

        The team must be initialized before it can be paused.

        Unlike termination, pausing the team does not cause the :meth:`run` or :meth:`run_stream`
        method to return. It calls the :meth:`~autogen_agentchat.base.ChatAgent.on_pause` method on
        each participant, and if a participant does not implement the method, it is a no-op.

        .. note::

            It is the responsibility of the agent class to handle the pause and ensure that
            the agent can be resumed later.
            Make sure to implement the :meth:`~autogen_agentchat.agents.BaseChatAgent.on_pause` method
            in your agent class for custom pause behavior.
            By default, the agent does nothing when called.

        Raises:
            RuntimeError: If the team has not been initialized. Exceptions raised by participants
                when their :meth:`~autogen_agentchat.base.ChatAgent.on_pause` implementations are
                called are propagated to this method and raised.
        """
        if not self._initialized:
            raise RuntimeError("The group chat has not been initialized. It must be run before it can be paused.")

        # Send a pause message to all participants.
        for participant_topic_type in self._participant_topic_types:
            await self._runtime.send_message(
                GroupChatPause(),
                recipient=AgentId(type=participant_topic_type, key=self._team_id),
            )
        # Send a pause message to the group chat manager.
        await self._runtime.send_message(
            GroupChatPause(),
            recipient=AgentId(type=self._group_chat_manager_topic_type, key=self._team_id),
        )
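    # Usage sketch (illustrative, not part of the class): pausing a running team from a
    # separate coroutine. ``team`` is assumed to be built as in the ``run_stream`` examples above.
    #
    #   run_task = asyncio.create_task(team.run(task="Count from 1 to 10, respond one at a time."))
    #   await asyncio.sleep(1)
    #   await team.pause()  # each participant's on_pause() is invoked via direct RPC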
    async def resume(self) -> None:
        """Resume its participants when the team is running and paused by calling their
        :meth:`~autogen_agentchat.base.ChatAgent.on_resume` method via direct RPC.

        .. attention::

            This is an experimental feature introduced in v0.4.9 and may be changed or removed in the future.

        The team must be initialized before it can be resumed.

        Unlike termination and restarting with a new task, resuming the team does not cause the
        :meth:`run` or :meth:`run_stream` method to return. It calls the
        :meth:`~autogen_agentchat.base.ChatAgent.on_resume` method on each participant, and if a
        participant does not implement the method, it is a no-op.

        .. note::

            It is the responsibility of the agent class to handle the resume and ensure that
            the agent continues from where it was paused.
            Make sure to implement the :meth:`~autogen_agentchat.agents.BaseChatAgent.on_resume` method
            in your agent class for custom resume behavior.

        Raises:
            RuntimeError: If the team has not been initialized. Exceptions raised by participants
                when their :meth:`~autogen_agentchat.base.ChatAgent.on_resume` implementations are
                called are propagated to this method and raised.
        """
        if not self._initialized:
            raise RuntimeError("The group chat has not been initialized. It must be run before it can be resumed.")

        # Send a resume message to all participants.
        for participant_topic_type in self._participant_topic_types:
            await self._runtime.send_message(
                GroupChatResume(),
                recipient=AgentId(type=participant_topic_type, key=self._team_id),
            )
        # Send a resume message to the group chat manager.
        await self._runtime.send_message(
            GroupChatResume(),
            recipient=AgentId(type=self._group_chat_manager_topic_type, key=self._team_id),
        )
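    # Usage sketch (illustrative, continuing the pause example above): resuming the paused
    # team and waiting for the original background run to finish.
    #
    #   await team.resume()  # each participant's on_resume() is invoked via direct RPC
    #   result = await run_task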
    async def save_state(self) -> Mapping[str, Any]:
        """Save the state of the group chat team.

        The state is saved by calling the :meth:`~autogen_core.AgentRuntime.agent_save_state` method
        on each participant and the group chat manager with their internal agent IDs.
        The state is returned as a nested dictionary: a dictionary with the key `agent_states`,
        whose value is another dictionary mapping agent names to their states.

        .. code-block:: text

            {
                "agent_states": {
                    "agent1": ...,
                    "agent2": ...,
                    "RoundRobinGroupChatManager": ...
                }
            }

        .. note::

            Starting with v0.4.9, the state uses the agent name as the key instead of the agent ID,
            and the `team_id` field has been removed from the state. This makes the state portable
            across different teams and runtimes. States saved in the old format may not be compatible
            with the new format in the future.

        .. caution::

            Calling :func:`~autogen_agentchat.teams.BaseGroupChat.save_state` while the team is running
            may result in an inconsistent state and unexpected results. It is recommended to call this
            method when the team is not running or after it has stopped.
        """
        if not self._initialized:
            await self._init(self._runtime)

        # Store state of each agent by their name.
        # NOTE: we don't use the agent ID as the key here because we need to be able to decouple
        # the state of the agents from their identities in the agent runtime.
        agent_states: Dict[str, Mapping[str, Any]] = {}
        # Save the state of all participants.
        for name, agent_type in zip(self._participant_names, self._participant_topic_types, strict=True):
            agent_id = AgentId(type=agent_type, key=self._team_id)
            # NOTE: We are using the runtime's save state method rather than the agent instance's
            # save_state method because we want to support saving state of remote agents.
            agent_states[name] = await self._runtime.agent_save_state(agent_id)
        # Save the state of the group chat manager.
        agent_id = AgentId(type=self._group_chat_manager_topic_type, key=self._team_id)
        agent_states[self._group_chat_manager_name] = await self._runtime.agent_save_state(agent_id)
        return TeamState(agent_states=agent_states).model_dump()
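    # Usage sketch (illustrative, not part of the class): persisting the team state to disk
    # after a run. The file name is an arbitrary choice for the example.
    #
    #   import json
    #   result = await team.run(task="Count from 1 to 10, respond one at a time.")
    #   state = await team.save_state()
    #   with open("team_state.json", "w") as f:
    #       json.dump(state, f)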
    async def load_state(self, state: Mapping[str, Any]) -> None:
        """Load an external state and overwrite the current state of the group chat team.

        The state is loaded by calling the :meth:`~autogen_core.AgentRuntime.agent_load_state` method
        on each participant and the group chat manager with their internal agent IDs.
        See :meth:`~autogen_agentchat.teams.BaseGroupChat.save_state` for the expected format of the state.
        """
        if not self._initialized:
            await self._init(self._runtime)

        if self._is_running:
            raise RuntimeError("The team cannot be loaded while it is running.")
        self._is_running = True

        try:
            team_state = TeamState.model_validate(state)
            # Load the state of all participants.
            for name, agent_type in zip(self._participant_names, self._participant_topic_types, strict=True):
                agent_id = AgentId(type=agent_type, key=self._team_id)
                if name not in team_state.agent_states:
                    raise ValueError(f"Agent state for {name} not found in the saved state.")
                await self._runtime.agent_load_state(agent_id, team_state.agent_states[name])
            # Load the state of the group chat manager.
            agent_id = AgentId(type=self._group_chat_manager_topic_type, key=self._team_id)
            if self._group_chat_manager_name not in team_state.agent_states:
                raise ValueError(f"Agent state for {self._group_chat_manager_name} not found in the saved state.")
            await self._runtime.agent_load_state(agent_id, team_state.agent_states[self._group_chat_manager_name])

        except ValidationError as e:
            raise ValueError(
                "Invalid state format. The expected state format has changed since v0.4.9. "
                "Please read the release note on GitHub."
            ) from e

        finally:
            # Indicate that the team is no longer running.
            self._is_running = False
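    # Usage sketch (illustrative, not part of the class): restoring a previously saved state
    # into a newly constructed team with the same participant names, then continuing the run.
    # Assumes a "team_state.json" file written as in the save_state sketch above.
    #
    #   import json
    #   with open("team_state.json") as f:
    #       state = json.load(f)
    #   await team.load_state(state)
    #   result = await team.run(task="Continue counting where you left off.")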