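Memory and RAG#

There are several use cases where it is valuable to maintain a _store_ of useful facts that can be intelligently added to the agent's context just before a specific step. The typical use case is the RAG pattern, in which a query retrieves relevant information from a database and that information is then added to the agent's context.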

AgentChat provides an extensible Memory protocol for this purpose. Its key methods are query, update_context, add, clear, and close.

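  • add: add new entries to the memory store

  • query: retrieve relevant information from the memory store

  • update_context: modify the agent's internal model_context by adding the retrieved information (used in the AssistantAgent class)

  • clear: remove all entries from the memory store

  • close: release any resources used by the memory store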
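To make the division of labor between these five methods concrete, here is a minimal, dependency-free sketch of a list-backed store. The names ToyListMemory and ToyContext are hypothetical stand-ins, not the actual autogen_core classes; the real protocol works with richer types such as MemoryContent and a model context object, so treat this only as an illustration of the lifecycle.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class ToyContext:
    """Hypothetical stand-in for an agent's model context (a list of messages)."""

    messages: list[str] = field(default_factory=list)


class ToyListMemory:
    """Illustrative list-backed store with the five methods described above."""

    def __init__(self) -> None:
        self._entries: list[str] = []

    async def add(self, content: str) -> None:
        # Append a new entry; chronological order is preserved.
        self._entries.append(content)

    async def query(self, query: str = "") -> list[str]:
        # A list-based store does no ranking: it returns every entry.
        return list(self._entries)

    async def update_context(self, context: ToyContext) -> None:
        # Inject the stored entries into the context as one extra message.
        entries = await self.query()
        if entries:
            context.messages.append("Relevant memory: " + "; ".join(entries))

    async def clear(self) -> None:
        self._entries.clear()

    async def close(self) -> None:
        # Nothing to release for an in-process list.
        pass


async def demo() -> list[str]:
    memory = ToyListMemory()
    await memory.add("The weather should be in metric units")
    context = ToyContext(messages=["What is the weather in New York?"])
    await memory.update_context(context)
    await memory.close()
    return context.messages


demo_messages = asyncio.run(demo())
```

This sketch is written as a standalone script. Inside a notebook, where an event loop is already running, replace the asyncio.run(demo()) call with await demo().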

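ListMemory Example#

autogen_core.memory.ListMemory is provided as an example implementation of the autogen_core.memory.Memory protocol. It is a simple list-based memory implementation that keeps memories in chronological order and appends the most recent ones to the model's context. The implementation is designed to be straightforward and predictable, which makes it easy to understand and debug. In the following example, we use ListMemory to maintain a memory bank of user preferences and show how it can provide consistent context for the agent's responses over time.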

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.ui import Console
from autogen_core.memory import ListMemory, MemoryContent, MemoryMimeType
from autogen_ext.models.openai import OpenAIChatCompletionClient
# Initialize user memory
user_memory = ListMemory()

# Add user preferences to memory
await user_memory.add(MemoryContent(content="The weather should be in metric units", mime_type=MemoryMimeType.TEXT))

await user_memory.add(MemoryContent(content="Meal recipe must be vegan", mime_type=MemoryMimeType.TEXT))


async def get_weather(city: str, units: str = "imperial") -> str:
    if units == "imperial":
        return f"The weather in {city} is 73 °F and Sunny."
    elif units == "metric":
        return f"The weather in {city} is 23 °C and Sunny."
    else:
        return f"Sorry, I don't know the weather in {city}."


assistant_agent = AssistantAgent(
    name="assistant_agent",
    model_client=OpenAIChatCompletionClient(
        model="gpt-4o-2024-08-06",
    ),
    tools=[get_weather],
    memory=[user_memory],
)
# Run the agent with a task
stream = assistant_agent.run_stream(task="What is the weather in New York?")
await Console(stream)

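We can inspect the assistant_agent's model_context to confirm that it has been updated with the retrieved memory entries. The transform method is used to format the retrieved memory entries into a string that the agent can use. In this case, we simply concatenate the content of each memory entry into a single string.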

await assistant_agent._model_context.get_messages()
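As a rough illustration of the formatting step described above, the sketch below joins memory entries into one numbered block of text. The helper name format_memories and the exact header wording are assumptions made for this example; the string that ListMemory actually adds to the context may differ.

```python
def format_memories(contents: list[str]) -> str:
    """Join memory entries into one numbered block, oldest first."""
    if not contents:
        return ""
    lines = [f"{i}. {content}" for i, content in enumerate(contents, start=1)]
    return "Relevant memory content (in chronological order):\n" + "\n".join(lines)


formatted = format_memories(
    ["The weather should be in metric units", "Meal recipe must be vegan"]
)
print(formatted)
```

A single string like this can be added to the context as one extra message, which keeps the prompt easy to read when inspecting the context while debugging.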

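In the agent's response to the weather task, the temperature is reported in Celsius, as the user preference stored in memory requires.

Similarly, if we ask a separate question about generating a meal plan, the agent can retrieve the relevant information from the memory store and give a personalized (vegan) response.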

stream = assistant_agent.run_stream(task="Write brief meal recipe with broth")
await Console(stream)

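Custom Memory Stores (Vector DBs, etc.)#

You can build on the Memory protocol to implement more complex memory stores. For example, you could implement a custom memory store that uses a vector database to store and retrieve information, or one that uses a machine learning model to generate personalized responses based on the user's preferences.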

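Specifically, you need to override the add, query, and update_context methods to implement the desired functionality, and then pass the memory store to your agent.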
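The following sketch shows what overriding those three methods might look like for a store that ranks entries by relevance. It is deliberately simplified: word overlap (Jaccard similarity) stands in for embedding similarity, the context is a plain list of strings, and the class name KeywordMemory is hypothetical. The k and score_threshold parameters borrow their names from the ChromaDB configuration used in this section, but their behavior here is only an assumption about how such settings typically work.

```python
import asyncio
import re


def _tokens(text: str) -> set[str]:
    """Lower-case word set; a crude stand-in for an embedding."""
    return set(re.findall(r"[a-z0-9]+", text.lower()))


def _jaccard(a: set[str], b: set[str]) -> float:
    """Jaccard similarity: |a & b| / |a | b|, or 0.0 when both are empty."""
    union = a | b
    return len(a & b) / len(union) if union else 0.0


class KeywordMemory:
    """Hypothetical custom store: ranks entries by word overlap with the query."""

    def __init__(self, k: int = 2, score_threshold: float = 0.1) -> None:
        self.k = k
        self.score_threshold = score_threshold
        self._entries: list[str] = []

    async def add(self, content: str) -> None:
        self._entries.append(content)

    async def query(self, query: str) -> list[tuple[float, str]]:
        # Score every entry, drop weak matches, keep the k best.
        query_tokens = _tokens(query)
        scored = [(_jaccard(query_tokens, _tokens(e)), e) for e in self._entries]
        kept = [pair for pair in scored if pair[0] >= self.score_threshold]
        kept.sort(key=lambda pair: pair[0], reverse=True)
        return kept[: self.k]

    async def update_context(self, messages: list[str]) -> None:
        # Use the latest message as the query and append what was retrieved.
        if not messages:
            return
        for _score, entry in await self.query(messages[-1]):
            messages.append(f"[memory] {entry}")


async def demo() -> list[str]:
    memory = KeywordMemory(k=1)
    await memory.add("The weather should be in metric units")
    await memory.add("Meal recipe must be vegan")
    messages = ["What is the weather in New York?"]
    await memory.update_context(messages)
    return messages


retrieved_messages = asyncio.run(demo())
```

Only the weather preference shares words with the question, so it is the only entry that clears the threshold and reaches the context. A real implementation would replace the word-overlap score with a vector similarity search.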

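Currently, the following example memory store is available as part of the autogen_ext extensions package.

  • autogen_ext.memory.chromadb.ChromaDBVectorMemory: a memory store that uses a vector database to store and retrieve information.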

import os
from pathlib import Path

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.ui import Console
from autogen_core.memory import MemoryContent, MemoryMimeType
from autogen_ext.memory.chromadb import ChromaDBVectorMemory, PersistentChromaDBVectorMemoryConfig
from autogen_ext.models.openai import OpenAIChatCompletionClient

# Initialize ChromaDB memory with a custom config
chroma_user_memory = ChromaDBVectorMemory(
    config=PersistentChromaDBVectorMemoryConfig(
        collection_name="preferences",
        persistence_path=os.path.join(str(Path.home()), ".chromadb_autogen"),
        k=2,  # Return the top k results
        score_threshold=0.4,  # Minimum similarity score
    )
)
# HttpChromaDBVectorMemoryConfig is also available for connecting to a remote ChromaDB server

# Add user preferences to memory
await chroma_user_memory.add(
    MemoryContent(
        content="The weather should be in metric units",
        mime_type=MemoryMimeType.TEXT,
        metadata={"category": "preferences", "type": "units"},
    )
)

await chroma_user_memory.add(
    MemoryContent(
        content="Meal recipe must be vegan",
        mime_type=MemoryMimeType.TEXT,
        metadata={"category": "preferences", "type": "dietary"},
    )
)

model_client = OpenAIChatCompletionClient(
    model="gpt-4o",
)

# Create an assistant agent with ChromaDB memory
assistant_agent = AssistantAgent(
    name="assistant_agent",
    model_client=model_client,
    tools=[get_weather],
    memory=[chroma_user_memory],
)

stream = assistant_agent.run_stream(task="What is the weather in New York?")
await Console(stream)

await model_client.close()
await chroma_user_memory.close()

Note that you can also serialize the ChromaDBVectorMemory and save it to disk.

chroma_user_memory.dump_component().model_dump_json()

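RAG Agent: Putting It All Together#

The RAG (Retrieval Augmented Generation) pattern, which is common in building AI systems, consists of two distinct phases:

  1. Indexing: loading documents, chunking them, and storing them in a vector database

  2. Retrieval: finding and using relevant chunks during conversation runtime

In the previous examples, we added items to memory manually and passed the memory to the agent. In practice, the indexing process is usually automated and draws on much larger document sources such as product documentation, internal files, or knowledge bases.

Note: The quality of a RAG system depends on the quality of the chunking and retrieval process (models, embeddings, etc.). You may need to experiment with more advanced chunking and retrieval models to get the best results.

Building a Simple RAG Agent#

To begin, let's create a simple document indexer that loads documents, chunks them, and stores them in a ChromaDBVectorMemory memory store.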

import re
from typing import List

import aiofiles
import aiohttp
from autogen_core.memory import Memory, MemoryContent, MemoryMimeType


class SimpleDocumentIndexer:
    """Basic document indexer for AutoGen Memory."""

    def __init__(self, memory: Memory, chunk_size: int = 1500) -> None:
        self.memory = memory
        self.chunk_size = chunk_size

    async def _fetch_content(self, source: str) -> str:
        """Fetch content from URL or file."""
        if source.startswith(("http://", "https://")):
            async with aiohttp.ClientSession() as session:
                async with session.get(source) as response:
                    return await response.text()
        else:
            async with aiofiles.open(source, "r", encoding="utf-8") as f:
                return await f.read()

    def _strip_html(self, text: str) -> str:
        """Remove HTML tags and normalize whitespace."""
        text = re.sub(r"<[^>]*>", " ", text)
        text = re.sub(r"\s+", " ", text)
        return text.strip()

    def _split_text(self, text: str) -> List[str]:
        """Split text into fixed-size chunks."""
        chunks: list[str] = []
        # Slice the text into consecutive chunks of chunk_size characters
        for i in range(0, len(text), self.chunk_size):
            chunk = text[i : i + self.chunk_size]
            chunks.append(chunk.strip())
        return chunks

    async def index_documents(self, sources: List[str]) -> int:
        """Index documents into memory."""
        total_chunks = 0

        for source in sources:
            try:
                content = await self._fetch_content(source)

                # Strip HTML tags if the content appears to be HTML
                if "<" in content and ">" in content:
                    content = self._strip_html(content)

                chunks = self._split_text(content)

                for i, chunk in enumerate(chunks):
                    await self.memory.add(
                        MemoryContent(
                            content=chunk, mime_type=MemoryMimeType.TEXT, metadata={"source": source, "chunk_index": i}
                        )
                    )

                total_chunks += len(chunks)

            except Exception as e:
                print(f"Error indexing {source}: {str(e)}")

        return total_chunks

Now let's use our indexer with ChromaDBVectorMemory to build a complete RAG agent:

import os
from pathlib import Path

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.ui import Console
from autogen_ext.memory.chromadb import ChromaDBVectorMemory, PersistentChromaDBVectorMemoryConfig
from autogen_ext.models.openai import OpenAIChatCompletionClient

# Initialize vector memory

rag_memory = ChromaDBVectorMemory(
    config=PersistentChromaDBVectorMemoryConfig(
        collection_name="autogen_docs",
        persistence_path=os.path.join(str(Path.home()), ".chromadb_autogen"),
        k=3,  # Return the top 3 results
        score_threshold=0.4,  # Minimum similarity score
    )
)

await rag_memory.clear()  # Clear existing memory


# Index AutoGen documentation
async def index_autogen_docs() -> None:
    indexer = SimpleDocumentIndexer(memory=rag_memory)
    sources = [
        "https://raw.githubusercontent.com/microsoft/autogen/main/README.md",
        "https://microsoft.github.io/autogen/dev/user-guide/agentchat-user-guide/tutorial/agents.html",
        "https://microsoft.github.io/autogen/dev/user-guide/agentchat-user-guide/tutorial/teams.html",
        "https://microsoft.github.io/autogen/dev/user-guide/agentchat-user-guide/tutorial/termination.html",
    ]
    chunks: int = await indexer.index_documents(sources)
    print(f"Indexed {chunks} chunks from {len(sources)} AutoGen documents")


await index_autogen_docs()

Indexed 72 chunks from 4 AutoGen documents

# Create our RAG assistant agent
rag_assistant = AssistantAgent(
    name="rag_assistant", model_client=OpenAIChatCompletionClient(model="gpt-4o"), memory=[rag_memory]
)

# Ask questions about AutoGen
stream = rag_assistant.run_stream(task="What is AgentChat?")
await Console(stream)

# Remember to close the memory when done
await rag_memory.close()

---------- user ----------
What is AgentChat?
Query results: results=[MemoryContent(content='ng OpenAI\'s GPT-4o model. See [other supported models](https://microsoft.github.io/autogen/stable/user-guide/agentchat-user-guide/tutorial/models.html). ```python import asyncio from autogen_agentchat.agents import AssistantAgent from autogen_ext.models.openai import OpenAIChatCompletionClient async def main() -> None: model_client = OpenAIChatCompletionClient(model="gpt-4o") agent = AssistantAgent("assistant", model_client=model_client) print(await agent.run(task="Say \'Hello World!\'")) await model_client.close() asyncio.run(main()) ``` ### Web Browsing Agent Team Create a group chat team with a web surfer agent and a user proxy agent for web browsing tasks. You need to install [playwright](https://playwright.dev/python/docs/library). ```python # pip install -U autogen-agentchat autogen-ext[openai,web-surfer] # playwright install import asyncio from autogen_agentchat.agents import UserProxyAgent from autogen_agentchat.conditions import TextMentionTermination from autogen_agentchat.teams import RoundRobinGroupChat from autogen_agentchat.ui import Console from autogen_ext.models.openai import OpenAIChatCompletionClient from autogen_ext.agents.web_surfer import MultimodalWebSurfer async def main() -> None: model_client = OpenAIChatCompletionClient(model="gpt-4o") # The web surfer will open a Chromium browser window to perform web browsing tasks. 
web_surfer = MultimodalWebSurfer("web_surfer", model_client, headless=False, animate_actions=True) # The user proxy agent is used to ge', mime_type='MemoryMimeType.TEXT', metadata={'chunk_index': 1, 'mime_type': 'MemoryMimeType.TEXT', 'source': 'https://raw.githubusercontent.com/microsoft/autogen/main/README.md', 'score': 0.48810458183288574, 'id': '16088e03-0153-4da3-9dec-643b39c549f5'}), MemoryContent(content='els_usage=None content=&#39;AutoGen is a programming framework for building multi-agent applications.&#39; type=&#39;ToolCallSummaryMessage&#39; The call to the on_messages() method returns a Response that contains the agent’s final response in the chat_message attribute, as well as a list of inner messages in the inner_messages attribute, which stores the agent’s “thought process” that led to the final response. Note It is important to note that on_messages() will update the internal state of the agent – it will add the messages to the agent’s history. So you should call this method with new messages. You should not repeatedly call this method with the same messages or the complete history. Note Unlike in v0.2 AgentChat, the tools are executed by the same agent directly within the same call to on_messages() . By default, the agent will return the result of the tool call as the final response. You can also call the run() method, which is a convenience method that calls on_messages() . It follows the same interface as Teams and returns a TaskResult object. Multi-Modal Input # The AssistantAgent can handle multi-modal input by providing the input as a MultiModalMessage . from io import BytesIO import PIL import requests from autogen_agentchat.messages import MultiModalMessage from autogen_core import Image # Create a multi-modal message with random image and text. pil_image = PIL . Image . open ( BytesIO ( requests . get ( &quot;https://picsum.photos/300/200&quot; ) . 
content )', mime_type='MemoryMimeType.TEXT', metadata={'chunk_index': 3, 'mime_type': 'MemoryMimeType.TEXT', 'source': 'https://microsoft.github.io/autogen/dev/user-guide/agentchat-user-guide/tutorial/agents.html', 'score': 0.4665141701698303, 'id': '3d603b62-7cab-4f74-b671-586fe36306f2'}), MemoryContent(content='AgentChat Termination Termination # In the previous section, we explored how to define agents, and organize them into teams that can solve tasks. However, a run can go on forever, and in many cases, we need to know when to stop them. This is the role of the termination condition. AgentChat supports several termination condition by providing a base TerminationCondition class and several implementations that inherit from it. A termination condition is a callable that takes a sequence of BaseAgentEvent or BaseChatMessage objects since the last time the condition was called , and returns a StopMessage if the conversation should be terminated, or None otherwise. Once a termination condition has been reached, it must be reset by calling reset() before it can be used again. Some important things to note about termination conditions: They are stateful but reset automatically after each run ( run() or run_stream() ) is finished. They can be combined using the AND and OR operators. Note For group chat teams (i.e., RoundRobinGroupChat , SelectorGroupChat , and Swarm ), the termination condition is called after each agent responds. While a response may contain multiple inner messages, the team calls its termination condition just once for all the messages from a single response. So the condition is called with the “delta sequence” of messages since the last time it was called. 
Built-In Termination Conditions: MaxMessageTermination : Stops after a specified number of messages have been produced,', mime_type='MemoryMimeType.TEXT', metadata={'chunk_index': 1, 'mime_type': 'MemoryMimeType.TEXT', 'source': 'https://microsoft.github.io/autogen/dev/user-guide/agentchat-user-guide/tutorial/termination.html', 'score': 0.461774212772051, 'id': '699ef490-d108-4cd3-b629-c1198d6b78ba'})]
---------- rag_assistant ----------
[MemoryContent(content='ng OpenAI\'s GPT-4o model. See [other supported models](https://microsoft.github.io/autogen/stable/user-guide/agentchat-user-guide/tutorial/models.html). ```python import asyncio from autogen_agentchat.agents import AssistantAgent from autogen_ext.models.openai import OpenAIChatCompletionClient async def main() -> None: model_client = OpenAIChatCompletionClient(model="gpt-4o") agent = AssistantAgent("assistant", model_client=model_client) print(await agent.run(task="Say \'Hello World!\'")) await model_client.close() asyncio.run(main()) ``` ### Web Browsing Agent Team Create a group chat team with a web surfer agent and a user proxy agent for web browsing tasks. You need to install [playwright](https://playwright.dev/python/docs/library). ```python # pip install -U autogen-agentchat autogen-ext[openai,web-surfer] # playwright install import asyncio from autogen_agentchat.agents import UserProxyAgent from autogen_agentchat.conditions import TextMentionTermination from autogen_agentchat.teams import RoundRobinGroupChat from autogen_agentchat.ui import Console from autogen_ext.models.openai import OpenAIChatCompletionClient from autogen_ext.agents.web_surfer import MultimodalWebSurfer async def main() -> None: model_client = OpenAIChatCompletionClient(model="gpt-4o") # The web surfer will open a Chromium browser window to perform web browsing tasks. 
web_surfer = MultimodalWebSurfer("web_surfer", model_client, headless=False, animate_actions=True) # The user proxy agent is used to ge', mime_type='MemoryMimeType.TEXT', metadata={'chunk_index': 1, 'mime_type': 'MemoryMimeType.TEXT', 'source': 'https://raw.githubusercontent.com/microsoft/autogen/main/README.md', 'score': 0.48810458183288574, 'id': '16088e03-0153-4da3-9dec-643b39c549f5'}), MemoryContent(content='els_usage=None content=&#39;AutoGen is a programming framework for building multi-agent applications.&#39; type=&#39;ToolCallSummaryMessage&#39; The call to the on_messages() method returns a Response that contains the agent’s final response in the chat_message attribute, as well as a list of inner messages in the inner_messages attribute, which stores the agent’s “thought process” that led to the final response. Note It is important to note that on_messages() will update the internal state of the agent – it will add the messages to the agent’s history. So you should call this method with new messages. You should not repeatedly call this method with the same messages or the complete history. Note Unlike in v0.2 AgentChat, the tools are executed by the same agent directly within the same call to on_messages() . By default, the agent will return the result of the tool call as the final response. You can also call the run() method, which is a convenience method that calls on_messages() . It follows the same interface as Teams and returns a TaskResult object. Multi-Modal Input # The AssistantAgent can handle multi-modal input by providing the input as a MultiModalMessage . from io import BytesIO import PIL import requests from autogen_agentchat.messages import MultiModalMessage from autogen_core import Image # Create a multi-modal message with random image and text. pil_image = PIL . Image . open ( BytesIO ( requests . get ( &quot;https://picsum.photos/300/200&quot; ) . 
content )', mime_type='MemoryMimeType.TEXT', metadata={'chunk_index': 3, 'mime_type': 'MemoryMimeType.TEXT', 'source': 'https://microsoft.github.io/autogen/dev/user-guide/agentchat-user-guide/tutorial/agents.html', 'score': 0.4665141701698303, 'id': '3d603b62-7cab-4f74-b671-586fe36306f2'}), MemoryContent(content='AgentChat Termination Termination # In the previous section, we explored how to define agents, and organize them into teams that can solve tasks. However, a run can go on forever, and in many cases, we need to know when to stop them. This is the role of the termination condition. AgentChat supports several termination condition by providing a base TerminationCondition class and several implementations that inherit from it. A termination condition is a callable that takes a sequence of BaseAgentEvent or BaseChatMessage objects since the last time the condition was called , and returns a StopMessage if the conversation should be terminated, or None otherwise. Once a termination condition has been reached, it must be reset by calling reset() before it can be used again. Some important things to note about termination conditions: They are stateful but reset automatically after each run ( run() or run_stream() ) is finished. They can be combined using the AND and OR operators. Note For group chat teams (i.e., RoundRobinGroupChat , SelectorGroupChat , and Swarm ), the termination condition is called after each agent responds. While a response may contain multiple inner messages, the team calls its termination condition just once for all the messages from a single response. So the condition is called with the “delta sequence” of messages since the last time it was called. 
Built-In Termination Conditions: MaxMessageTermination : Stops after a specified number of messages have been produced,', mime_type='MemoryMimeType.TEXT', metadata={'chunk_index': 1, 'mime_type': 'MemoryMimeType.TEXT', 'source': 'https://microsoft.github.io/autogen/dev/user-guide/agentchat-user-guide/tutorial/termination.html', 'score': 0.461774212772051, 'id': '699ef490-d108-4cd3-b629-c1198d6b78ba'})]
---------- rag_assistant ----------
AgentChat is part of the AutoGen framework, a programming environment for building multi-agent applications. In AgentChat, agents can interact with each other and with users to perform various tasks, including web browsing and engaging in dialogue. It utilizes models from OpenAI for chat completions and supports multi-modal input, which means agents can handle inputs that include both text and images. Additionally, AgentChat provides mechanisms to define termination conditions to control when a conversation or task should be concluded, ensuring that the agent interactions are efficient and goal-oriented. TERMINATE

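This implementation provides a RAG agent that can answer questions based on the AutoGen documentation. When a question is asked, the memory system retrieves relevant chunks and adds them to the context, enabling the assistant to generate informed responses.

For production systems, you might want to:

  1. Implement more sophisticated chunking strategies

  2. Add metadata filtering capabilities

  3. Customize the retrieval scoring

  4. Optimize the embedding models for your specific domain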
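
As one possible starting point for the first item in the list above, the sketch below splits text on word boundaries and lets consecutive chunks overlap, so that text near a chunk edge also appears with some surrounding context in the neighboring chunk. The function name split_with_overlap and its parameters are illustrative assumptions and are not part of AutoGen.

```python
def split_with_overlap(text: str, chunk_words: int = 200, overlap_words: int = 40) -> list[str]:
    """Split text into chunks of at most `chunk_words` words.

    Consecutive chunks share `overlap_words` words, and no word is cut in half.
    """
    if chunk_words <= 0:
        raise ValueError("chunk_words must be positive")
    if not 0 <= overlap_words < chunk_words:
        raise ValueError("overlap_words must be in [0, chunk_words)")

    words = text.split()
    step = chunk_words - overlap_words
    chunks: list[str] = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start : start + chunk_words]))
        if start + chunk_words >= len(words):
            break  # the last chunk already reaches the end of the text
    return chunks


sample = "one two three four five six seven eight nine ten"
overlap_chunks = split_with_overlap(sample, chunk_words=4, overlap_words=1)
print(overlap_chunks)
```

A function like this could replace the fixed-size character slicing in SimpleDocumentIndexer._split_text, which can cut a chunk in the middle of a word, as the chunk beginning with 'ng OpenAI' in the query results above shows. Sizes are counted in words here rather than characters, so the defaults would need tuning for a given embedding model.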