自定义代理#

您可能需要一些行为不符合预设模式的代理。 在这种情况下,您可以构建自定义代理。

AgentChat中的所有代理都继承自BaseChatAgent类,并实现以下抽象方法和属性:

  • on_messages():定义代理响应消息行为的抽象方法。当调用run()要求代理提供响应时,会调用此方法。它返回一个Response对象。

  • on_reset():将代理重置到初始状态的抽象方法。当要求代理重置自身时会调用此方法。

  • produced_message_types:代理在响应中可能产生的BaseChatMessage消息类型列表。

可选地,您可以实现on_messages_stream()方法来流式传输代理生成的消息。 此方法由run_stream()调用以流式传输消息。 如果未实现此方法,代理 将使用on_messages_stream()的默认实现, 该实现会调用on_messages()方法并 生成响应中的所有消息。

倒计时代理#

在这个示例中,我们创建一个简单的代理,它从给定数字倒数到零, 并生成包含当前计数的消息流。

from typing import AsyncGenerator, List, Sequence

from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import BaseAgentEvent, BaseChatMessage, TextMessage
from autogen_core import CancellationToken


class CountDownAgent(BaseChatAgent):
    def __init__(self, name: str, count: int = 3):
        super().__init__(name, "A simple agent that counts down.")
        self._count = count

    @property
    def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:
        return (TextMessage,)

    async def on_messages(self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken) -> Response:
        # 调用on_messages_stream方法。
        response: Response | None = None
        async for message in self.on_messages_stream(messages, cancellation_token):
            if isinstance(message, Response):
                response = message
        assert response is not None
        return response

    async def on_messages_stream(
        self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken
    ) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | Response, None]:
        inner_messages: List[BaseAgentEvent | BaseChatMessage] = []
        for i in range(self._count, 0, -1):
            msg = TextMessage(content=f"{i}...", source=self.name)
            inner_messages.append(msg)
            yield msg
        # 响应在流结束时返回。它包含最终消息和所有内部消息
        # 。
        yield Response(chat_message=TextMessage(content="Done!", source=self.name), inner_messages=inner_messages)

    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        pass


async def run_countdown_agent() -> None:
    # 创建一个倒计时代理。
    countdown_agent = CountDownAgent("countdown")

    # 运行代理执行给定任务并流式传输响应。
    async for message in countdown_agent.on_messages_stream([], CancellationToken()):
        if isinstance(message, Response):
            print(message.chat_message)
        else:
            print(message)


# 在脚本中运行时使用 asyncio.run(run_countdown_agent())。
await run_countdown_agent()
3...
2...
1...
Done!

算术代理#

在这个示例中,我们创建一个代理类,可以对给定整数执行简单算术运算。然后,我们将使用这个代理类的不同实例 在 SelectorGroupChat 中 通过应用一系列算术运算将给定整数转换为另一个整数。

ArithmeticAgent 类接收一个 operator_func,该函数接收一个整数并返回一个整数, 在对该整数应用算术运算之后。 在其 on_messages 方法中,它将 operator_func 应用于输入消息中的整数, 并返回包含结果响应。

from typing import Callable, Sequence

from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.messages import BaseChatMessage
from autogen_agentchat.teams import SelectorGroupChat
from autogen_agentchat.ui import Console
from autogen_core import CancellationToken
from autogen_ext.models.openai import OpenAIChatCompletionClient


class ArithmeticAgent(BaseChatAgent):
    def __init__(self, name: str, description: str, operator_func: Callable[[int], int]) -> None:
        super().__init__(name, description=description)
        self._operator_func = operator_func
        self._message_history: List[BaseChatMessage] = []

    @property
    def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:
        return (TextMessage,)

    async def on_messages(self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken) -> Response:
        # 更新消息历史记录。注意:消息可能为空列表,
        # 这意味着代理之前已被选中。
        self._message_history.extend(messages)
        # 解析最后一条消息中的数字。
        assert isinstance(self._message_history[-1], TextMessage)
        number = int(self._message_history[-1].content)
        # 对数字应用运算符函数。
        result = self._operator_func(number)
        # 用结果创建新消息。
        response_message = TextMessage(content=str(result), source=self.name)
        # 更新消息历史记录。
        self._message_history.append(response_message)
        # 返回响应。
        return Response(chat_message=response_message)

    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        pass

备注

on_messages 方法可能会被传入空消息列表调用,这种情况意味着代理之前已被调用过,现在再次被调用时, 调用者没有发送任何新消息。因此保持代理接收到的先前消息历史记录非常重要, 并使用该历史记录来生成响应。

现在我们可以创建一个包含5个 ArithmeticAgent 实例的 SelectorGroupChat

  • 一个将输入整数加1的代理,

  • 一个将输入整数减1的代理,

  • 一个将输入整数乘以2的代理,

  • 一个将输入整数除以2并向下取整的代理,

  • 一个保持输入整数不变的代理。

然后我们用这些代理创建一个 SelectorGroupChat, 并设置适当的选择器配置:

  • 允许同一个代理连续被选中以支持重复操作,

  • 自定义选择器提示词来使模型响应更适合特定任务。

async def run_number_agents() -> None:
    # 创建用于数字运算的代理。
    add_agent = ArithmeticAgent("add_agent", "Adds 1 to the number.", lambda x: x + 1)
    multiply_agent = ArithmeticAgent("multiply_agent", "Multiplies the number by 2.", lambda x: x * 2)
    subtract_agent = ArithmeticAgent("subtract_agent", "Subtracts 1 from the number.", lambda x: x - 1)
    divide_agent = ArithmeticAgent("divide_agent", "Divides the number by 2 and rounds down.", lambda x: x // 2)
    identity_agent = ArithmeticAgent("identity_agent", "Returns the number as is.", lambda x: x)

    # 终止条件是发送10条消息后停止。
    termination_condition = MaxMessageTermination(10)

    # 创建一个选择器群聊。
    selector_group_chat = SelectorGroupChat(
        [add_agent, multiply_agent, subtract_agent, divide_agent, identity_agent],
        model_client=OpenAIChatCompletionClient(model="gpt-4o"),
        termination_condition=termination_condition,
        allow_repeated_speaker=True,  # 允许同一个代理多次发言,这对本任务很有必要。
        selector_prompt=(
            "Available roles:\n{roles}\nTheir job descriptions:\n{participants}\n"
            "Current conversation history:\n{history}\n"
            "Please select the most appropriate role for the next message, and only return the role name."
        ),
    )

    # 运行带有给定任务的selector群聊并流式传输响应。
    task: List[BaseChatMessage] = [
        TextMessage(content="Apply the operations to turn the given number into 25.", source="user"),
        TextMessage(content="10", source="user"),
    ]
    stream = selector_group_chat.run_stream(task=task)
    await Console(stream)


# 在脚本中运行时使用asyncio.run(run_number_agents())。
await run_number_agents()
---------- user ----------
Apply the operations to turn the given number into 25.
---------- user ----------
10
---------- multiply_agent ----------
20
---------- add_agent ----------
21
---------- multiply_agent ----------
42
---------- divide_agent ----------
21
---------- add_agent ----------
22
---------- add_agent ----------
23
---------- add_agent ----------
24
---------- add_agent ----------
25
---------- Summary ----------
Number of messages: 10
Finish reason: Maximum number of messages 10 reached, current message count: 10
Total prompt tokens: 0
Total completion tokens: 0
Duration: 2.40 seconds

从输出中可以看到,代理们通过依次选择应用算术运算的合适代理,成功地将输入整数从10转换为了25。

在自定义代理中使用自定义模型客户端#

AgentChat中AssistantAgent预设的一个关键特性是它接受model_client参数,并可以在响应消息时使用它。然而,在某些情况下,您可能希望您的代理使用当前不支持的自定义模型客户端(参见支持的模型客户端)或自定义模型行为。

您可以通过实现您的自定义模型客户端的自定义代理来实现这一点。

在下面的示例中,我们将演示一个直接使用Google Gemini SDK来响应消息的自定义代理。

注意: 您需要安装Google Gemini SDK才能运行此示例。可以使用以下命令安装:

pip install google-genai
# !pip install google-genai
import os
from typing import AsyncGenerator, Sequence

from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import BaseAgentEvent, BaseChatMessage
from autogen_core import CancellationToken
from autogen_core.model_context import UnboundedChatCompletionContext
from autogen_core.models import AssistantMessage, RequestUsage, UserMessage
from google import genai
from google.genai import types


class GeminiAssistantAgent(BaseChatAgent):
    def __init__(
        self,
        name: str,
        description: str = "An agent that provides assistance with ability to use tools.",
        model: str = "gemini-1.5-flash-002",
        api_key: str = os.environ["GEMINI_API_KEY"],
        system_message: str
        | None = "You are a helpful assistant that can respond to messages. Reply with TERMINATE when the task has been completed.",
    ):
        super().__init__(name=name, description=description)
        self._model_context = UnboundedChatCompletionContext()
        self._model_client = genai.Client(api_key=api_key)
        self._system_message = system_message
        self._model = model

    @property
    def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:
        return (TextMessage,)

    async def on_messages(self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken) -> Response:
        final_response = None
        async for message in self.on_messages_stream(messages, cancellation_token):
            if isinstance(message, Response):
                final_response = message

        if final_response is None:
            raise AssertionError("The stream should have returned the final result.")

        return final_response

    async def on_messages_stream(
        self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken
    ) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | Response, None]:
        # 将消息添加到模型上下文
        for msg in messages:
            await self._model_context.add_message(msg.to_model_message())

        # 获取对话历史记录
        history = [
            (msg.source if hasattr(msg, "source") else "system")
            + ": "
            + (msg.content if isinstance(msg.content, str) else "")
            + "\n"
            for msg in await self._model_context.get_messages()
        ]
        # 使用Gemini生成响应
        response = self._model_client.models.generate_content(
            model=self._model,
            contents=f"History: {history}\nGiven the history, please provide a response",
            config=types.GenerateContentConfig(
                system_instruction=self._system_message,
                temperature=0.3,
            ),
        )

        # 创建使用元数据
        usage = RequestUsage(
            prompt_tokens=response.usage_metadata.prompt_token_count,
            completion_tokens=response.usage_metadata.candidates_token_count,
        )

        # 将响应添加到模型上下文
        await self._model_context.add_message(AssistantMessage(content=response.text, source=self.name))

        # 生成最终响应
        yield Response(
            chat_message=TextMessage(content=response.text, source=self.name, models_usage=usage),
            inner_messages=[],
        )

    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        """Reset the assistant by clearing the model context."""
        await self._model_context.clear()
gemini_assistant = GeminiAssistantAgent("gemini_assistant")
await Console(gemini_assistant.run_stream(task="What is the capital of New York?"))
---------- user ----------
What is the capital of New York?
---------- gemini_assistant ----------
Albany
TERMINATE
TaskResult(messages=[TextMessage(source='user', models_usage=None, content='What is the capital of New York?', type='TextMessage'), TextMessage(source='gemini_assistant', models_usage=RequestUsage(prompt_tokens=46, completion_tokens=5), content='Albany\nTERMINATE\n', type='TextMessage')], stop_reason=None)

在上面的示例中,我们选择提供 modelapi_keysystem_message 作为参数 - 您可以选择提供模型客户端所需的任何其他参数,或符合您的应用程序设计。

现在,让我们探索如何将这个自定义智能体作为团队的一部分在 AgentChat 中使用。

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.ui import Console

model_client = OpenAIChatCompletionClient(model="gpt-4o-mini")

# 创建主智能体
primary_agent = AssistantAgent(
    "primary",
    model_client=model_client,
    system_message="You are a helpful AI assistant.",
)

# 基于我们新的 GeminiAssistantAgent 创建评论智能体
gemini_critic_agent = GeminiAssistantAgent(
    "gemini_critic",
    system_message="Provide constructive feedback. Respond with 'APPROVE' to when your feedbacks are addressed.",
)


# 定义一个终止条件,当评论者批准或达到 10 条消息后停止任务
termination = TextMentionTermination("APPROVE") | MaxMessageTermination(10)

# 创建一个包含主要和评论代理的团队。
team = RoundRobinGroupChat([primary_agent, gemini_critic_agent], termination_condition=termination)

await Console(team.run_stream(task="Write a Haiku poem with 4 lines about the fall season."))
await model_client.close()
---------- user ----------
Write a Haiku poem with 4 lines about the fall season.
---------- primary ----------
Crimson leaves cascade,  
Whispering winds sing of change,  
Chill wraps the fading,  
Nature's quilt, rich and warm.
---------- gemini_critic ----------
The poem is good, but it has four lines instead of three.  A haiku must have three lines with a 5-7-5 syllable structure.  The content is evocative of autumn, but the form is incorrect.  Please revise to adhere to the haiku's syllable structure.

---------- primary ----------
Thank you for your feedback! Here’s a revised haiku that follows the 5-7-5 syllable structure:

Crimson leaves drift down,  
Chill winds whisper through the gold,  
Autumn’s breath is near.
---------- gemini_critic ----------
The revised haiku is much improved.  It correctly follows the 5-7-5 syllable structure and maintains the evocative imagery of autumn.  APPROVE
TaskResult(messages=[TextMessage(source='user', models_usage=None, content='Write a Haiku poem with 4 lines about the fall season.', type='TextMessage'), TextMessage(source='primary', models_usage=RequestUsage(prompt_tokens=33, completion_tokens=31), content="Crimson leaves cascade,  \nWhispering winds sing of change,  \nChill wraps the fading,  \nNature's quilt, rich and warm.", type='TextMessage'), TextMessage(source='gemini_critic', models_usage=RequestUsage(prompt_tokens=86, completion_tokens=60), content="The poem is good, but it has four lines instead of three.  A haiku must have three lines with a 5-7-5 syllable structure.  The content is evocative of autumn, but the form is incorrect.  Please revise to adhere to the haiku's syllable structure.\n", type='TextMessage'), TextMessage(source='primary', models_usage=RequestUsage(prompt_tokens=141, completion_tokens=49), content='Thank you for your feedback! Here’s a revised haiku that follows the 5-7-5 syllable structure:\n\nCrimson leaves drift down,  \nChill winds whisper through the gold,  \nAutumn’s breath is near.', type='TextMessage'), TextMessage(source='gemini_critic', models_usage=RequestUsage(prompt_tokens=211, completion_tokens=32), content='The revised haiku is much improved.  It correctly follows the 5-7-5 syllable structure and maintains the evocative imagery of autumn.  APPROVE\n', type='TextMessage')], stop_reason="Text 'APPROVE' mentioned")

在上面的部分中,我们展示了几个非常重要的概念:

  • 我们开发了一个自定义代理,使用Google Gemini SDK来响应消息。

  • 我们展示这个自定义代理可以作为更广泛的AgentChat生态系统的一部分使用 - 在本例中作为RoundRobinGroupChat的参与者,只要它继承自BaseChatAgent

使自定义代理具有声明性#

Autogen提供了一个组件接口,用于将组件配置序列化为声明性格式。这对于保存和加载配置,以及与他人共享配置非常有用。

我们通过继承Component类并实现_from_config_to_config方法来实现这一点。 声明性类可以使用dump_component方法序列化为JSON格式,并使用load_component方法从JSON格式反序列化。

import os
from typing import AsyncGenerator, Sequence

from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import BaseAgentEvent, BaseChatMessage
from autogen_core import CancellationToken, Component
from pydantic import BaseModel
from typing_extensions import Self


class GeminiAssistantAgentConfig(BaseModel):
    name: str
    description: str = "An agent that provides assistance with ability to use tools."
    model: str = "gemini-1.5-flash-002"
    system_message: str | None = None


class GeminiAssistantAgent(BaseChatAgent, Component[GeminiAssistantAgentConfig]):  # type: ignore[no-redef]
    component_config_schema = GeminiAssistantAgentConfig
    # component_provider_override = "mypackage.agents.GeminiAssistantAgent"

    def __init__(
        self,
        name: str,
        description: str = "An agent that provides assistance with ability to use tools.",
        model: str = "gemini-1.5-flash-002",
        api_key: str = os.environ["GEMINI_API_KEY"],
        system_message: str
        | None = "You are a helpful assistant that can respond to messages. Reply with TERMINATE when the task has been completed.",
    ):
        super().__init__(name=name, description=description)
        self._model_context = UnboundedChatCompletionContext()
        self._model_client = genai.Client(api_key=api_key)
        self._system_message = system_message
        self._model = model

    @property
    def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:
        return (TextMessage,)

    async def on_messages(self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken) -> Response:
        final_response = None
        async for message in self.on_messages_stream(messages, cancellation_token):
            if isinstance(message, Response):
                final_response = message

        if final_response is None:
            raise AssertionError("The stream should have returned the final result.")

        return final_response

    async def on_messages_stream(
        self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken
    ) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | Response, None]:
        # 将消息添加到模型上下文
        for msg in messages:
            await self._model_context.add_message(msg.to_model_message())

        # 获取对话历史记录
        history = [
            (msg.source if hasattr(msg, "source") else "system")
            + ": "
            + (msg.content if isinstance(msg.content, str) else "")
            + "\n"
            for msg in await self._model_context.get_messages()
        ]

        # 使用Gemini生成响应
        response = self._model_client.models.generate_content(
            model=self._model,
            contents=f"History: {history}\nGiven the history, please provide a response",
            config=types.GenerateContentConfig(
                system_instruction=self._system_message,
                temperature=0.3,
            ),
        )

        # 创建使用元数据
        usage = RequestUsage(
            prompt_tokens=response.usage_metadata.prompt_token_count,
            completion_tokens=response.usage_metadata.candidates_token_count,
        )

        # 将响应添加到模型上下文
        await self._model_context.add_message(AssistantMessage(content=response.text, source=self.name))

        # 生成最终响应
        yield Response(
            chat_message=TextMessage(content=response.text, source=self.name, models_usage=usage),
            inner_messages=[],
        )

    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        """Reset the assistant by clearing the model context."""
        await self._model_context.clear()

    @classmethod
    def _from_config(cls, config: GeminiAssistantAgentConfig) -> Self:
        return cls(
            name=config.name, description=config.description, model=config.model, system_message=config.system_message
        )

    def _to_config(self) -> GeminiAssistantAgentConfig:
        return GeminiAssistantAgentConfig(
            name=self.name,
            description=self.description,
            model=self._model,
            system_message=self._system_message,
        )

现在我们已经实现了所需的方法,可以加载自定义代理并将其转储为JSON格式,然后从JSON格式重新加载代理。

注意:您需要将component_provider_override类变量设置为包含自定义代理类的模块完整路径(例如mypackage.agents.GeminiAssistantAgent)。这将被load_component方法用来确定如何实例化该类。

gemini_assistant = GeminiAssistantAgent("gemini_assistant")
config = gemini_assistant.dump_component()
print(config.model_dump_json(indent=2))
loaded_agent = GeminiAssistantAgent.load_component(config)
print(loaded_agent)
{
  "provider": "__main__.GeminiAssistantAgent",
  "component_type": "agent",
  "version": 1,
  "component_version": 1,
  "description": null,
  "label": "GeminiAssistantAgent",
  "config": {
    "name": "gemini_assistant",
    "description": "An agent that provides assistance with ability to use tools.",
    "model": "gemini-1.5-flash-002",
    "system_message": "You are a helpful assistant that can respond to messages. Reply with TERMINATE when the task has been completed."
  }
}
<__main__.GeminiAssistantAgent object at 0x11a5c5a90>

后续步骤#

到目前为止,我们已经了解了如何创建自定义代理、向代理添加自定义模型客户端以及使自定义代理具有声明性。这个基础示例可以通过以下几种方式进行扩展:

  • 扩展Gemini模型客户端以处理类似于AssistantAgent类的函数调用功能。参考https://ai.google.dev/gemini-api/docs/function-calling

  • 实现一个包含自定义代理的包,并尝试在AutoGen Studio等工具中使用其声明式格式进行实验。