自定义代理#
您可能需要一些行为不符合预设模式的代理。 在这种情况下,您可以构建自定义代理。
AgentChat中的所有代理都继承自BaseChatAgent
类,并实现以下抽象方法和属性:
on_messages()
:定义代理响应消息行为的抽象方法。当调用run()
要求代理提供响应时,会调用此方法。它返回一个Response
对象。on_reset()
:将代理重置到初始状态的抽象方法。当要求代理重置自身时会调用此方法。produced_message_types
:代理在响应中可能产生的BaseChatMessage
消息类型列表。
可选地,您可以实现on_messages_stream()
方法来流式传输代理生成的消息。
此方法由run_stream()
调用以流式传输消息。
如果未实现此方法,代理
将使用on_messages_stream()
的默认实现,
该实现会调用on_messages()
方法并
生成响应中的所有消息。
倒计时代理#
在这个示例中,我们创建一个简单的代理,它从给定数字倒数到零, 并生成包含当前计数的消息流。
from typing import AsyncGenerator, List, Sequence
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import BaseAgentEvent, BaseChatMessage, TextMessage
from autogen_core import CancellationToken
class CountDownAgent(BaseChatAgent):
def __init__(self, name: str, count: int = 3):
super().__init__(name, "A simple agent that counts down.")
self._count = count
@property
def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:
return (TextMessage,)
async def on_messages(self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken) -> Response:
# 调用on_messages_stream方法。
response: Response | None = None
async for message in self.on_messages_stream(messages, cancellation_token):
if isinstance(message, Response):
response = message
assert response is not None
return response
async def on_messages_stream(
self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken
) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | Response, None]:
inner_messages: List[BaseAgentEvent | BaseChatMessage] = []
for i in range(self._count, 0, -1):
msg = TextMessage(content=f"{i}...", source=self.name)
inner_messages.append(msg)
yield msg
# 响应在流结束时返回。它包含最终消息和所有内部消息
# 。
yield Response(chat_message=TextMessage(content="Done!", source=self.name), inner_messages=inner_messages)
async def on_reset(self, cancellation_token: CancellationToken) -> None:
pass
async def run_countdown_agent() -> None:
# 创建一个倒计时代理。
countdown_agent = CountDownAgent("countdown")
# 运行代理执行给定任务并流式传输响应。
async for message in countdown_agent.on_messages_stream([], CancellationToken()):
if isinstance(message, Response):
print(message.chat_message)
else:
print(message)
# 在脚本中运行时使用 asyncio.run(run_countdown_agent())。
await run_countdown_agent()
3...
2...
1...
Done!
算术代理#
在这个示例中,我们创建一个代理类,可以对给定整数执行简单算术运算。然后,我们将使用这个代理类的不同实例
在 SelectorGroupChat
中
通过应用一系列算术运算将给定整数转换为另一个整数。
ArithmeticAgent
类接收一个 operator_func
,该函数接收一个整数并返回一个整数,
在对该整数应用算术运算之后。
在其 on_messages
方法中,它将 operator_func
应用于输入消息中的整数,
并返回包含结果响应。
from typing import Callable, Sequence
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.messages import BaseChatMessage
from autogen_agentchat.teams import SelectorGroupChat
from autogen_agentchat.ui import Console
from autogen_core import CancellationToken
from autogen_ext.models.openai import OpenAIChatCompletionClient
class ArithmeticAgent(BaseChatAgent):
def __init__(self, name: str, description: str, operator_func: Callable[[int], int]) -> None:
super().__init__(name, description=description)
self._operator_func = operator_func
self._message_history: List[BaseChatMessage] = []
@property
def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:
return (TextMessage,)
async def on_messages(self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken) -> Response:
# 更新消息历史记录。注意:消息可能为空列表,
# 这意味着代理之前已被选中。
self._message_history.extend(messages)
# 解析最后一条消息中的数字。
assert isinstance(self._message_history[-1], TextMessage)
number = int(self._message_history[-1].content)
# 对数字应用运算符函数。
result = self._operator_func(number)
# 用结果创建新消息。
response_message = TextMessage(content=str(result), source=self.name)
# 更新消息历史记录。
self._message_history.append(response_message)
# 返回响应。
return Response(chat_message=response_message)
async def on_reset(self, cancellation_token: CancellationToken) -> None:
pass
备注
on_messages
方法可能会被传入空消息列表调用,这种情况意味着代理之前已被调用过,现在再次被调用时,
调用者没有发送任何新消息。因此保持代理接收到的先前消息历史记录非常重要,
并使用该历史记录来生成响应。
现在我们可以创建一个包含5个 ArithmeticAgent
实例的 SelectorGroupChat
:
一个将输入整数加1的代理,
一个将输入整数减1的代理,
一个将输入整数乘以2的代理,
一个将输入整数除以2并向下取整的代理,
一个保持输入整数不变的代理。
然后我们用这些代理创建一个 SelectorGroupChat
,
并设置适当的选择器配置:
允许同一个代理连续被选中以支持重复操作,
自定义选择器提示词来使模型响应更适合特定任务。
async def run_number_agents() -> None:
# 创建用于数字运算的代理。
add_agent = ArithmeticAgent("add_agent", "Adds 1 to the number.", lambda x: x + 1)
multiply_agent = ArithmeticAgent("multiply_agent", "Multiplies the number by 2.", lambda x: x * 2)
subtract_agent = ArithmeticAgent("subtract_agent", "Subtracts 1 from the number.", lambda x: x - 1)
divide_agent = ArithmeticAgent("divide_agent", "Divides the number by 2 and rounds down.", lambda x: x // 2)
identity_agent = ArithmeticAgent("identity_agent", "Returns the number as is.", lambda x: x)
# 终止条件是发送10条消息后停止。
termination_condition = MaxMessageTermination(10)
# 创建一个选择器群聊。
selector_group_chat = SelectorGroupChat(
[add_agent, multiply_agent, subtract_agent, divide_agent, identity_agent],
model_client=OpenAIChatCompletionClient(model="gpt-4o"),
termination_condition=termination_condition,
allow_repeated_speaker=True, # 允许同一个代理多次发言,这对本任务很有必要。
selector_prompt=(
"Available roles:\n{roles}\nTheir job descriptions:\n{participants}\n"
"Current conversation history:\n{history}\n"
"Please select the most appropriate role for the next message, and only return the role name."
),
)
# 运行带有给定任务的selector群聊并流式传输响应。
task: List[BaseChatMessage] = [
TextMessage(content="Apply the operations to turn the given number into 25.", source="user"),
TextMessage(content="10", source="user"),
]
stream = selector_group_chat.run_stream(task=task)
await Console(stream)
# 在脚本中运行时使用asyncio.run(run_number_agents())。
await run_number_agents()
---------- user ----------
Apply the operations to turn the given number into 25.
---------- user ----------
10
---------- multiply_agent ----------
20
---------- add_agent ----------
21
---------- multiply_agent ----------
42
---------- divide_agent ----------
21
---------- add_agent ----------
22
---------- add_agent ----------
23
---------- add_agent ----------
24
---------- add_agent ----------
25
---------- Summary ----------
Number of messages: 10
Finish reason: Maximum number of messages 10 reached, current message count: 10
Total prompt tokens: 0
Total completion tokens: 0
Duration: 2.40 seconds
从输出中可以看到,代理们通过依次选择应用算术运算的合适代理,成功地将输入整数从10转换为了25。
在自定义代理中使用自定义模型客户端#
AgentChat中AssistantAgent
预设的一个关键特性是它接受model_client
参数,并可以在响应消息时使用它。然而,在某些情况下,您可能希望您的代理使用当前不支持的自定义模型客户端(参见支持的模型客户端)或自定义模型行为。
您可以通过实现您的自定义模型客户端的自定义代理来实现这一点。
在下面的示例中,我们将演示一个直接使用Google Gemini SDK来响应消息的自定义代理。
注意: 您需要安装Google Gemini SDK才能运行此示例。可以使用以下命令安装:
pip install google-genai
# !pip install google-genai
import os
from typing import AsyncGenerator, Sequence
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import BaseAgentEvent, BaseChatMessage
from autogen_core import CancellationToken
from autogen_core.model_context import UnboundedChatCompletionContext
from autogen_core.models import AssistantMessage, RequestUsage, UserMessage
from google import genai
from google.genai import types
class GeminiAssistantAgent(BaseChatAgent):
def __init__(
self,
name: str,
description: str = "An agent that provides assistance with ability to use tools.",
model: str = "gemini-1.5-flash-002",
api_key: str = os.environ["GEMINI_API_KEY"],
system_message: str
| None = "You are a helpful assistant that can respond to messages. Reply with TERMINATE when the task has been completed.",
):
super().__init__(name=name, description=description)
self._model_context = UnboundedChatCompletionContext()
self._model_client = genai.Client(api_key=api_key)
self._system_message = system_message
self._model = model
@property
def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:
return (TextMessage,)
async def on_messages(self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken) -> Response:
final_response = None
async for message in self.on_messages_stream(messages, cancellation_token):
if isinstance(message, Response):
final_response = message
if final_response is None:
raise AssertionError("The stream should have returned the final result.")
return final_response
async def on_messages_stream(
self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken
) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | Response, None]:
# 将消息添加到模型上下文
for msg in messages:
await self._model_context.add_message(msg.to_model_message())
# 获取对话历史记录
history = [
(msg.source if hasattr(msg, "source") else "system")
+ ": "
+ (msg.content if isinstance(msg.content, str) else "")
+ "\n"
for msg in await self._model_context.get_messages()
]
# 使用Gemini生成响应
response = self._model_client.models.generate_content(
model=self._model,
contents=f"History: {history}\nGiven the history, please provide a response",
config=types.GenerateContentConfig(
system_instruction=self._system_message,
temperature=0.3,
),
)
# 创建使用元数据
usage = RequestUsage(
prompt_tokens=response.usage_metadata.prompt_token_count,
completion_tokens=response.usage_metadata.candidates_token_count,
)
# 将响应添加到模型上下文
await self._model_context.add_message(AssistantMessage(content=response.text, source=self.name))
# 生成最终响应
yield Response(
chat_message=TextMessage(content=response.text, source=self.name, models_usage=usage),
inner_messages=[],
)
async def on_reset(self, cancellation_token: CancellationToken) -> None:
"""Reset the assistant by clearing the model context."""
await self._model_context.clear()
gemini_assistant = GeminiAssistantAgent("gemini_assistant")
await Console(gemini_assistant.run_stream(task="What is the capital of New York?"))
---------- user ----------
What is the capital of New York?
---------- gemini_assistant ----------
Albany
TERMINATE
TaskResult(messages=[TextMessage(source='user', models_usage=None, content='What is the capital of New York?', type='TextMessage'), TextMessage(source='gemini_assistant', models_usage=RequestUsage(prompt_tokens=46, completion_tokens=5), content='Albany\nTERMINATE\n', type='TextMessage')], stop_reason=None)
在上面的示例中,我们选择提供 model
、api_key
和 system_message
作为参数 - 您可以选择提供模型客户端所需的任何其他参数,或符合您的应用程序设计。
现在,让我们探索如何将这个自定义智能体作为团队的一部分在 AgentChat 中使用。
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.ui import Console
model_client = OpenAIChatCompletionClient(model="gpt-4o-mini")
# 创建主智能体
primary_agent = AssistantAgent(
"primary",
model_client=model_client,
system_message="You are a helpful AI assistant.",
)
# 基于我们新的 GeminiAssistantAgent 创建评论智能体
gemini_critic_agent = GeminiAssistantAgent(
"gemini_critic",
system_message="Provide constructive feedback. Respond with 'APPROVE' to when your feedbacks are addressed.",
)
# 定义一个终止条件,当评论者批准或达到 10 条消息后停止任务
termination = TextMentionTermination("APPROVE") | MaxMessageTermination(10)
# 创建一个包含主要和评论代理的团队。
team = RoundRobinGroupChat([primary_agent, gemini_critic_agent], termination_condition=termination)
await Console(team.run_stream(task="Write a Haiku poem with 4 lines about the fall season."))
await model_client.close()
---------- user ----------
Write a Haiku poem with 4 lines about the fall season.
---------- primary ----------
Crimson leaves cascade,
Whispering winds sing of change,
Chill wraps the fading,
Nature's quilt, rich and warm.
---------- gemini_critic ----------
The poem is good, but it has four lines instead of three. A haiku must have three lines with a 5-7-5 syllable structure. The content is evocative of autumn, but the form is incorrect. Please revise to adhere to the haiku's syllable structure.
---------- primary ----------
Thank you for your feedback! Here’s a revised haiku that follows the 5-7-5 syllable structure:
Crimson leaves drift down,
Chill winds whisper through the gold,
Autumn’s breath is near.
---------- gemini_critic ----------
The revised haiku is much improved. It correctly follows the 5-7-5 syllable structure and maintains the evocative imagery of autumn. APPROVE
TaskResult(messages=[TextMessage(source='user', models_usage=None, content='Write a Haiku poem with 4 lines about the fall season.', type='TextMessage'), TextMessage(source='primary', models_usage=RequestUsage(prompt_tokens=33, completion_tokens=31), content="Crimson leaves cascade, \nWhispering winds sing of change, \nChill wraps the fading, \nNature's quilt, rich and warm.", type='TextMessage'), TextMessage(source='gemini_critic', models_usage=RequestUsage(prompt_tokens=86, completion_tokens=60), content="The poem is good, but it has four lines instead of three. A haiku must have three lines with a 5-7-5 syllable structure. The content is evocative of autumn, but the form is incorrect. Please revise to adhere to the haiku's syllable structure.\n", type='TextMessage'), TextMessage(source='primary', models_usage=RequestUsage(prompt_tokens=141, completion_tokens=49), content='Thank you for your feedback! Here’s a revised haiku that follows the 5-7-5 syllable structure:\n\nCrimson leaves drift down, \nChill winds whisper through the gold, \nAutumn’s breath is near.', type='TextMessage'), TextMessage(source='gemini_critic', models_usage=RequestUsage(prompt_tokens=211, completion_tokens=32), content='The revised haiku is much improved. It correctly follows the 5-7-5 syllable structure and maintains the evocative imagery of autumn. APPROVE\n', type='TextMessage')], stop_reason="Text 'APPROVE' mentioned")
在上面的部分中,我们展示了几个非常重要的概念:
我们开发了一个自定义代理,使用Google Gemini SDK来响应消息。
我们展示这个自定义代理可以作为更广泛的AgentChat生态系统的一部分使用 - 在本例中作为
RoundRobinGroupChat
的参与者,只要它继承自BaseChatAgent
。
使自定义代理具有声明性#
Autogen提供了一个组件接口,用于将组件配置序列化为声明性格式。这对于保存和加载配置,以及与他人共享配置非常有用。
我们通过继承Component
类并实现_from_config
和_to_config
方法来实现这一点。
声明性类可以使用dump_component
方法序列化为JSON格式,并使用load_component
方法从JSON格式反序列化。
import os
from typing import AsyncGenerator, Sequence
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import BaseAgentEvent, BaseChatMessage
from autogen_core import CancellationToken, Component
from pydantic import BaseModel
from typing_extensions import Self
class GeminiAssistantAgentConfig(BaseModel):
name: str
description: str = "An agent that provides assistance with ability to use tools."
model: str = "gemini-1.5-flash-002"
system_message: str | None = None
class GeminiAssistantAgent(BaseChatAgent, Component[GeminiAssistantAgentConfig]): # type: ignore[no-redef]
component_config_schema = GeminiAssistantAgentConfig
# component_provider_override = "mypackage.agents.GeminiAssistantAgent"
def __init__(
self,
name: str,
description: str = "An agent that provides assistance with ability to use tools.",
model: str = "gemini-1.5-flash-002",
api_key: str = os.environ["GEMINI_API_KEY"],
system_message: str
| None = "You are a helpful assistant that can respond to messages. Reply with TERMINATE when the task has been completed.",
):
super().__init__(name=name, description=description)
self._model_context = UnboundedChatCompletionContext()
self._model_client = genai.Client(api_key=api_key)
self._system_message = system_message
self._model = model
@property
def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:
return (TextMessage,)
async def on_messages(self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken) -> Response:
final_response = None
async for message in self.on_messages_stream(messages, cancellation_token):
if isinstance(message, Response):
final_response = message
if final_response is None:
raise AssertionError("The stream should have returned the final result.")
return final_response
async def on_messages_stream(
self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken
) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | Response, None]:
# 将消息添加到模型上下文
for msg in messages:
await self._model_context.add_message(msg.to_model_message())
# 获取对话历史记录
history = [
(msg.source if hasattr(msg, "source") else "system")
+ ": "
+ (msg.content if isinstance(msg.content, str) else "")
+ "\n"
for msg in await self._model_context.get_messages()
]
# 使用Gemini生成响应
response = self._model_client.models.generate_content(
model=self._model,
contents=f"History: {history}\nGiven the history, please provide a response",
config=types.GenerateContentConfig(
system_instruction=self._system_message,
temperature=0.3,
),
)
# 创建使用元数据
usage = RequestUsage(
prompt_tokens=response.usage_metadata.prompt_token_count,
completion_tokens=response.usage_metadata.candidates_token_count,
)
# 将响应添加到模型上下文
await self._model_context.add_message(AssistantMessage(content=response.text, source=self.name))
# 生成最终响应
yield Response(
chat_message=TextMessage(content=response.text, source=self.name, models_usage=usage),
inner_messages=[],
)
async def on_reset(self, cancellation_token: CancellationToken) -> None:
"""Reset the assistant by clearing the model context."""
await self._model_context.clear()
@classmethod
def _from_config(cls, config: GeminiAssistantAgentConfig) -> Self:
return cls(
name=config.name, description=config.description, model=config.model, system_message=config.system_message
)
def _to_config(self) -> GeminiAssistantAgentConfig:
return GeminiAssistantAgentConfig(
name=self.name,
description=self.description,
model=self._model,
system_message=self._system_message,
)
现在我们已经实现了所需的方法,可以加载自定义代理并将其转储为JSON格式,然后从JSON格式重新加载代理。
注意:您需要将
component_provider_override
类变量设置为包含自定义代理类的模块完整路径(例如mypackage.agents.GeminiAssistantAgent
)。这将被load_component
方法用来确定如何实例化该类。
gemini_assistant = GeminiAssistantAgent("gemini_assistant")
config = gemini_assistant.dump_component()
print(config.model_dump_json(indent=2))
loaded_agent = GeminiAssistantAgent.load_component(config)
print(loaded_agent)
{
"provider": "__main__.GeminiAssistantAgent",
"component_type": "agent",
"version": 1,
"component_version": 1,
"description": null,
"label": "GeminiAssistantAgent",
"config": {
"name": "gemini_assistant",
"description": "An agent that provides assistance with ability to use tools.",
"model": "gemini-1.5-flash-002",
"system_message": "You are a helpful assistant that can respond to messages. Reply with TERMINATE when the task has been completed."
}
}
<__main__.GeminiAssistantAgent object at 0x11a5c5a90>
后续步骤#
到目前为止,我们已经了解了如何创建自定义代理、向代理添加自定义模型客户端以及使自定义代理具有声明性。这个基础示例可以通过以下几种方式进行扩展:
扩展Gemini模型客户端以处理类似于
AssistantAgent
类的函数调用功能。参考https://ai.google.dev/gemini-api/docs/function-calling实现一个包含自定义代理的包,并尝试在AutoGen Studio等工具中使用其声明式格式进行实验。