# Source of autogen_agentchat.agents._society_of_mind_agent

from typing import Any, AsyncGenerator, List, Mapping, Sequence

from autogen_core import CancellationToken, Component, ComponentModel
from autogen_core.model_context import (
    ChatCompletionContext,
    UnboundedChatCompletionContext,
)
from autogen_core.models import ChatCompletionClient, LLMMessage, SystemMessage, UserMessage
from pydantic import BaseModel
from typing_extensions import Self

from autogen_agentchat.base import Response
from autogen_agentchat.state import SocietyOfMindAgentState

from ..base import TaskResult, Team
from ..messages import (
    BaseAgentEvent,
    BaseChatMessage,
    HandoffMessage,
    ModelClientStreamingChunkEvent,
    TextMessage,
)
from ._base_chat_agent import BaseChatAgent


class SocietyOfMindAgentConfig(BaseModel):
    """Declarative configuration for :class:`SocietyOfMindAgent`.

    Optional fields left as ``None`` fall back to the class defaults
    (``DEFAULT_DESCRIPTION``, ``DEFAULT_INSTRUCTION``,
    ``DEFAULT_RESPONSE_PROMPT``) when the agent is loaded via
    ``_from_config``.
    """

    name: str
    # Serialized inner team (Team component).
    team: ComponentModel
    # Serialized model client (ChatCompletionClient component).
    model_client: ComponentModel
    description: str | None = None
    instruction: str | None = None
    response_prompt: str | None = None
    # Serialized ChatCompletionContext component, if a custom one was set.
    model_context: ComponentModel | None = None


class SocietyOfMindAgent(BaseChatAgent, Component[SocietyOfMindAgentConfig]):
    """An agent that uses an inner team of agents to generate responses.

    Each time the agent's :meth:`on_messages` or :meth:`on_messages_stream`
    method is called, it runs the inner team of agents and then uses the
    model client to generate a response based on the inner team's messages.
    Once a response is generated, the agent resets the inner team by calling
    :meth:`Team.reset`.

    Limiting the context size sent to the model:

    You can limit the number of messages sent to the model by setting the
    ``model_context`` parameter to a
    :class:`~autogen_core.model_context.BufferedChatCompletionContext`.
    This restricts how many recent messages are sent to the model and is
    useful when the model has a token limit. You can also create your own
    model context by subclassing
    :class:`~autogen_core.model_context.ChatCompletionContext`.

    Args:
        name (str): The name of the agent.
        team (Team): The team of agents to use.
        model_client (ChatCompletionClient): The model client to use for preparing responses.
        description (str, optional): The description of the agent.
        instruction (str, optional): The instruction to use when generating a response using
            the inner team's messages. Defaults to :attr:`DEFAULT_INSTRUCTION`. It assumes
            the role of 'system'.
        response_prompt (str, optional): The response prompt to use when generating a response
            using the inner team's messages. Defaults to :attr:`DEFAULT_RESPONSE_PROMPT`. It
            assumes the role of 'system'.
        model_context (ChatCompletionContext | None, optional): The model context for storing
            and retrieving :class:`~autogen_core.models.LLMMessage`. It can be preloaded with
            initial messages. Initial messages are cleared when the agent is reset.

    Example:

    .. code-block:: python

        import asyncio
        from autogen_agentchat.ui import Console
        from autogen_agentchat.agents import AssistantAgent, SocietyOfMindAgent
        from autogen_ext.models.openai import OpenAIChatCompletionClient
        from autogen_agentchat.teams import RoundRobinGroupChat
        from autogen_agentchat.conditions import TextMentionTermination


        async def main() -> None:
            model_client = OpenAIChatCompletionClient(model="gpt-4o")

            agent1 = AssistantAgent("assistant1", model_client=model_client, system_message="You are a writer, write well.")
            agent2 = AssistantAgent(
                "assistant2",
                model_client=model_client,
                system_message="You are an editor, provide critical feedback. Respond with 'APPROVE' if the text addresses all feedbacks.",
            )
            inner_termination = TextMentionTermination("APPROVE")
            inner_team = RoundRobinGroupChat([agent1, agent2], termination_condition=inner_termination)

            society_of_mind_agent = SocietyOfMindAgent("society_of_mind", team=inner_team, model_client=model_client)

            agent3 = AssistantAgent(
                "assistant3", model_client=model_client, system_message="Translate the text to Spanish."
            )
            team = RoundRobinGroupChat([society_of_mind_agent, agent3], max_turns=2)

            stream = team.run_stream(task="Write a short story with a surprising ending.")
            await Console(stream)


        asyncio.run(main())
    """

    component_config_schema = SocietyOfMindAgentConfig
    component_provider_override = "autogen_agentchat.agents.SocietyOfMindAgent"

    DEFAULT_INSTRUCTION = "Earlier you were asked to fulfill a request. You and your team worked diligently to address that request. Here is a transcript of that conversation:"
    """str: The default instruction used when generating a response from the
    inner team's messages. It is prepended to the inner team's messages when
    generating a response with the model, and assumes the 'system' role."""

    DEFAULT_RESPONSE_PROMPT = (
        "Output a standalone response to the original request, without mentioning any of the intermediate discussion."
    )
    """str: The default response prompt used when generating a response from
    the inner team's messages. It assumes the 'system' role."""

    DEFAULT_DESCRIPTION = "An agent that uses an inner team of agents to generate responses."
    """str: The default description for a SocietyOfMindAgent."""

    def __init__(
        self,
        name: str,
        team: Team,
        model_client: ChatCompletionClient,
        *,
        description: str = DEFAULT_DESCRIPTION,
        instruction: str = DEFAULT_INSTRUCTION,
        response_prompt: str = DEFAULT_RESPONSE_PROMPT,
        model_context: ChatCompletionContext | None = None,
    ) -> None:
        super().__init__(name=name, description=description)
        self._team = team
        self._model_client = model_client
        self._instruction = instruction
        self._response_prompt = response_prompt
        # Fall back to an unbounded context when the caller supplies none.
        self._model_context = model_context if model_context is not None else UnboundedChatCompletionContext()

    @property
    def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:
        # The final answer is always a plain text message.
        return (TextMessage,)

    @property
    def model_context(self) -> ChatCompletionContext:
        """The model context in use by the agent."""
        return self._model_context
[文档] async def on_messages(self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken) -> Response: # Call the stream method and collect the messages. response: Response | None = None async for msg in self.on_messages_stream(messages, cancellation_token): if isinstance(msg, Response): response = msg assert response is not None return response
    async def on_messages_stream(
        self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken
    ) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | Response, None]:
        """Run the inner team on the incoming messages, streaming its events,
        then yield a single :class:`Response` summarizing the run.

        After the response is yielded, the incoming messages are added to the
        model context and the inner team is reset.
        """
        # Prepare the task for the team of agents.
        task_messages = list(messages)

        # Run the team of agents.
        result: TaskResult | None = None
        inner_messages: List[BaseAgentEvent | BaseChatMessage] = []
        model_context = self._model_context

        # Counts non-TaskResult items so the echoed task messages can be skipped.
        count = 0

        prev_content = await model_context.get_messages()
        if len(prev_content) > 0:
            # Carry previously stored context into the team run as a handoff
            # message so the inner agents can see earlier conversation turns.
            prev_message = HandoffMessage(
                content="relevant previous messages",
                source=self.name,
                target="",
                context=prev_content,
            )
            task_messages = [prev_message] + task_messages

        if len(task_messages) == 0:
            task = None
        else:
            task = task_messages

        async for inner_msg in self._team.run_stream(task=task, cancellation_token=cancellation_token):
            if isinstance(inner_msg, TaskResult):
                result = inner_msg
            else:
                count += 1
                if count <= len(task_messages):
                    # Skip the task messages (echoed back at the head of the stream).
                    continue
                yield inner_msg
                if isinstance(inner_msg, ModelClientStreamingChunkEvent):
                    # Streaming chunk events are forwarded but not kept for summarization.
                    continue
                inner_messages.append(inner_msg)
        assert result is not None

        if len(inner_messages) == 0:
            yield Response(
                chat_message=TextMessage(source=self.name, content="No response."),
                inner_messages=[],
                # Response's inner_messages is left empty: inner events were
                # already yielded above; the Response is for the outer world.
            )
        else:
            llm_messages: List[LLMMessage] = []

            if self._model_client.model_info.get("multiple_system_messages", False):
                # The model client supports multiple system messages, so the
                # instruction can be sent with the 'system' role.
                llm_messages.append(SystemMessage(content=self._instruction))
            else:
                # No multi-system-message support: send the instruction as a
                # 'user' message instead.
                llm_messages.append(UserMessage(content=self._instruction, source="user"))

            # Generate a response using the model client.
            for message in inner_messages:
                if isinstance(message, BaseChatMessage):
                    llm_messages.append(message.to_model_message())

            if self._model_client.model_info.get("multiple_system_messages", False):
                # The model client supports multiple system messages, so the
                # response prompt can be sent with the 'system' role.
                llm_messages.append(SystemMessage(content=self._response_prompt))
            else:
                # No multi-system-message support: send the response prompt as
                # a 'user' message instead.
                llm_messages.append(UserMessage(content=self._response_prompt, source="user"))

            completion = await self._model_client.create(messages=llm_messages, cancellation_token=cancellation_token)
            assert isinstance(completion.content, str)
            yield Response(
                chat_message=TextMessage(source=self.name, content=completion.content, models_usage=completion.usage),
                inner_messages=[],
                # Response's inner_messages is left empty: inner events were
                # already yielded above; the Response is for the outer world.
            )

        # Add new user/handoff messages to the model context.
        await self._add_messages_to_context(
            model_context=model_context,
            messages=messages,
        )

        # Reset the team.
        await self._team.reset()
@staticmethod async def _add_messages_to_context( model_context: ChatCompletionContext, messages: Sequence[BaseChatMessage], ) -> None: """ 将传入消息添加到模型上下文中。 """ for msg in messages: if isinstance(msg, HandoffMessage): for llm_msg in msg.context: await model_context.add_message(llm_msg) await model_context.add_message(msg.to_model_message())
    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        """Reset the inner team and clear this agent's model context."""
        await self._team.reset()
        await self._model_context.clear()
[文档] async def save_state(self) -> Mapping[str, Any]: team_state = await self._team.save_state() state = SocietyOfMindAgentState(inner_team_state=team_state) return state.model_dump()
[文档] async def load_state(self, state: Mapping[str, Any]) -> None: society_of_mind_state = SocietyOfMindAgentState.model_validate(state) await self._team.load_state(society_of_mind_state.inner_team_state)
[文档] def _to_config(self) -> SocietyOfMindAgentConfig: return SocietyOfMindAgentConfig( name=self.name, team=self._team.dump_component(), model_client=self._model_client.dump_component(), description=self.description, instruction=self._instruction, response_prompt=self._response_prompt, model_context=self._model_context.dump_component(), )
[文档] @classmethod def _from_config(cls, config: SocietyOfMindAgentConfig) -> Self: model_client = ChatCompletionClient.load_component(config.model_client) team = Team.load_component(config.team) return cls( name=config.name, team=team, model_client=model_client, description=config.description or cls.DEFAULT_DESCRIPTION, instruction=config.instruction or cls.DEFAULT_INSTRUCTION, response_prompt=config.response_prompt or cls.DEFAULT_RESPONSE_PROMPT, model_context=ChatCompletionContext.load_component(config.model_context) if config.model_context else None, )