OpenAI 助手代理#

Open AI 助手Azure OpenAI 助手 是用于构建代理的服务器端API。 它们可用于在AutoGen中构建代理。本示例演示如何 使用OpenAI助手创建一个可以运行代码和进行文档问答的代理。

消息协议#

首先,我们需要为基于OpenAI助手的代理指定消息协议。消息协议定义了 代理处理和发布的消息结构。 为便于说明,我们定义了一个简单的 包含4种消息类型的协议:MessageResetUploadForCodeInterpreterUploadForFileSearch

from dataclasses import dataclass


@dataclass
class TextMessage:
    content: str
    source: str


@dataclass
class Reset:
    pass


@dataclass
class UploadForCodeInterpreter:
    file_path: str


@dataclass
class UploadForFileSearch:
    file_path: str
    vector_store_id: str

TextMessage消息类型用于与代理通信。它有一个 包含消息内容的content字段,以及一个表示发送者的source字段。 Reset消息类型是重置代理内存的控制消息。它没有字段。 当我们需要与代理开始新对话时,这非常有用。

UploadForCodeInterpreter消息类型用于上传代码解释器的数据文件, 而UploadForFileSearch消息类型用于上传文件搜索的文档。 两种消息类型都有一个file_path字段,包含要上传文件的本地路径。

定义代理#

接下来,我们定义代理类。 代理类构造函数有以下参数:descriptionclientassistant_idthread_idassistant_event_handler_factoryclient参数是OpenAI异步客户端对象, assistant_event_handler_factory用于创建助手事件处理器 来处理OpenAI助手事件。 这可用于创建来自助手的流式输出。

代理类有以下消息处理器:

  • handle_message:处理TextMessage消息类型,并返回来自助手的响应。

  • handle_reset:处理Reset消息类型,并重置助手代理的内存。

  • handle_upload_for_code_interpreter:处理UploadForCodeInterpreter 消息类型,并将文件上传至代码解释器。

  • handle_upload_for_file_search:处理UploadForFileSearch 消息类型,并将文档上传至文件搜索。

助手的内存存储在一个线程中,该线程保存在服务器端。 线程由thread_id参数引用。

import asyncio
import os
from typing import Any, Callable, List

import aiofiles
from autogen_core import AgentId, MessageContext, RoutedAgent, message_handler
from openai import AsyncAssistantEventHandler, AsyncClient
from openai.types.beta.thread import ToolResources, ToolResourcesFileSearch


class OpenAIAssistantAgent(RoutedAgent):
    """An agent implementation that uses the OpenAI Assistant API to generate
    responses.

    Args:
        description (str): The description of the agent.
        client (openai.AsyncClient): The client to use for the OpenAI API.
        assistant_id (str): The assistant ID to use for the OpenAI API.
        thread_id (str): The thread ID to use for the OpenAI API.
        assistant_event_handler_factory (Callable[[], AsyncAssistantEventHandler], optional):
            A factory function to create an async assistant event handler. Defaults to None.
            If provided, the agent will use the streaming mode with the event handler.
            If not provided, the agent will use the blocking mode to generate responses.
    """

    def __init__(
        self,
        description: str,
        client: AsyncClient,
        assistant_id: str,
        thread_id: str,
        assistant_event_handler_factory: Callable[[], AsyncAssistantEventHandler],
    ) -> None:
        super().__init__(description)
        self._client = client
        self._assistant_id = assistant_id
        self._thread_id = thread_id
        self._assistant_event_handler_factory = assistant_event_handler_factory

    @message_handler
    async def handle_message(self, message: TextMessage, ctx: MessageContext) -> TextMessage:
        """Handle a message. This method adds the message to the thread and publishes a response."""
        # 将消息保存到线程中。
        await ctx.cancellation_token.link_future(
            asyncio.ensure_future(
                self._client.beta.threads.messages.create(
                    thread_id=self._thread_id,
                    content=message.content,
                    role="user",
                    metadata={"sender": message.source},
                )
            )
        )
        # 生成响应。
        async with self._client.beta.threads.runs.stream(
            thread_id=self._thread_id,
            assistant_id=self._assistant_id,
            event_handler=self._assistant_event_handler_factory(),
        ) as stream:
            await ctx.cancellation_token.link_future(asyncio.ensure_future(stream.until_done()))

        # 获取最后一条消息。
        messages = await ctx.cancellation_token.link_future(
            asyncio.ensure_future(self._client.beta.threads.messages.list(self._thread_id, order="desc", limit=1))
        )
        last_message_content = messages.data[0].content

        # 获取最后一条消息的文本内容
        text_content = [content for content in last_message_content if content.type == "text"]
        if not text_content:
            raise ValueError(f"Expected text content in the last message: {last_message_content}")

        return TextMessage(content=text_content[0].text.value, source=self.metadata["type"])

    @message_handler()
    async def on_reset(self, message: Reset, ctx: MessageContext) -> None:
        """Handle a reset message. This method deletes all messages in the thread."""
        # 获取此线程中的所有消息
        all_msgs: List[str] = []
        while True:
            if not all_msgs:
                msgs = await ctx.cancellation_token.link_future(
                    asyncio.ensure_future(self._client.beta.threads.messages.list(self._thread_id))
                )
            else:
                msgs = await ctx.cancellation_token.link_future(
                    asyncio.ensure_future(self._client.beta.threads.messages.list(self._thread_id, after=all_msgs[-1]))
                )
            for msg in msgs.data:
                all_msgs.append(msg.id)
            if not msgs.has_next_page():
                break
        # 删除所有消息
        for msg_id in all_msgs:
            status = await ctx.cancellation_token.link_future(
                asyncio.ensure_future(
                    self._client.beta.threads.messages.delete(message_id=msg_id, thread_id=self._thread_id)
                )
            )
            assert status.deleted is True

    @message_handler()
    async def on_upload_for_code_interpreter(self, message: UploadForCodeInterpreter, ctx: MessageContext) -> None:
        """Handle an upload for code interpreter. This method uploads a file and updates the thread with the file."""
        # 获取文件内容
        async with aiofiles.open(message.file_path, mode="rb") as f:
            file_content = await ctx.cancellation_token.link_future(asyncio.ensure_future(f.read()))
        file_name = os.path.basename(message.file_path)
        # 上传文件
        file = await ctx.cancellation_token.link_future(
            asyncio.ensure_future(self._client.files.create(file=(file_name, file_content), purpose="assistants"))
        )
        # 从工具资源中获取现有文件ID。
        thread = await ctx.cancellation_token.link_future(
            asyncio.ensure_future(self._client.beta.threads.retrieve(thread_id=self._thread_id))
        )
        tool_resources: ToolResources = thread.tool_resources if thread.tool_resources else ToolResources()
        assert tool_resources.code_interpreter is not None
        if tool_resources.code_interpreter.file_ids:
            file_ids = tool_resources.code_interpreter.file_ids
        else:
            file_ids = [file.id]
        # 使用新文件更新线程。
        await ctx.cancellation_token.link_future(
            asyncio.ensure_future(
                self._client.beta.threads.update(
                    thread_id=self._thread_id,
                    tool_resources={
                        "code_interpreter": {"file_ids": file_ids},
                    },
                )
            )
        )

    @message_handler()
    async def on_upload_for_file_search(self, message: UploadForFileSearch, ctx: MessageContext) -> None:
        """Handle an upload for file search. This method uploads a file and updates the vector store."""
        # 获取文件内容。
        async with aiofiles.open(message.file_path, mode="rb") as file:
            file_content = await ctx.cancellation_token.link_future(asyncio.ensure_future(file.read()))
        file_name = os.path.basename(message.file_path)
        # 上传文件。
        await ctx.cancellation_token.link_future(
            asyncio.ensure_future(
                self._client.vector_stores.file_batches.upload_and_poll(
                    vector_store_id=message.vector_store_id,
                    files=[(file_name, file_content)],
                )
            )
        )

Agent类是对OpenAI Assistant API的轻量级封装,用于实现 消息协议。更多功能,如多模态消息处理, 可以通过扩展消息协议来添加。

助手事件处理器#

助手事件处理器提供了处理 Assistant API 特定事件的回调函数。这对于处理助手的流式输出和进一步的用户界面集成非常有用。

from openai import AsyncAssistantEventHandler, AsyncClient
from openai.types.beta.threads import Message, Text, TextDelta
from openai.types.beta.threads.runs import RunStep, RunStepDelta
from typing_extensions import override


class EventHandler(AsyncAssistantEventHandler):
    @override
    async def on_text_delta(self, delta: TextDelta, snapshot: Text) -> None:
        print(delta.value, end="", flush=True)

    @override
    async def on_run_step_created(self, run_step: RunStep) -> None:
        details = run_step.step_details
        if details.type == "tool_calls":
            for tool in details.tool_calls:
                if tool.type == "code_interpreter":
                    print("\nGenerating code to interpret:\n\n```python")

    @override
    async def on_run_step_done(self, run_step: RunStep) -> None:
        details = run_step.step_details
        if details.type == "tool_calls":
            for tool in details.tool_calls:
                if tool.type == "code_interpreter":
                    print("\n```\nExecuting code...")

    @override
    async def on_run_step_delta(self, delta: RunStepDelta, snapshot: RunStep) -> None:
        details = delta.step_details
        if details is not None and details.type == "tool_calls":
            for tool in details.tool_calls or []:
                if tool.type == "code_interpreter" and tool.code_interpreter and tool.code_interpreter.input:
                    print(tool.code_interpreter.input, end="", flush=True)

    @override
    async def on_message_created(self, message: Message) -> None:
        print(f"{'-'*80}\nAssistant:\n")

    @override
    async def on_message_done(self, message: Message) -> None:
        # 打印所搜索文件的引用信息
        if not message.content:
            return
        content = message.content[0]
        if not content.type == "text":
            return
        text_content = content.text
        annotations = text_content.annotations
        citations: List[str] = []
        for index, annotation in enumerate(annotations):
            text_content.value = text_content.value.replace(annotation.text, f"[{index}]")
            if file_citation := getattr(annotation, "file_citation", None):
                client = AsyncClient()
                cited_file = await client.files.retrieve(file_citation.file_id)
                citations.append(f"[{index}] {cited_file.filename}")
        if citations:
            print("\n".join(citations))

使用代理#

首先我们需要使用 openai 客户端创建实际的助手、线程和向量存储。我们的 AutoGen 代理将使用这些资源。

import openai

# 创建一个带有代码解释器和文件搜索工具的助手。
oai_assistant = openai.beta.assistants.create(
    model="gpt-4o-mini",
    description="An AI assistant that helps with everyday tasks.",
    instructions="Help the user with their task.",
    tools=[{"type": "code_interpreter"}, {"type": "file_search"}],
)

# 创建一个用于文件搜索的向量存储。
vector_store = openai.vector_stores.create()

# 创建一个线程作为助手的记忆存储
thread = openai.beta.threads.create(
    tool_resources={"file_search": {"vector_store_ids": [vector_store.id]}},
)

然后,我们创建一个运行时环境,并为该智能体注册一个代理工厂函数

from autogen_core import SingleThreadedAgentRuntime

runtime = SingleThreadedAgentRuntime()
await OpenAIAssistantAgent.register(
    runtime,
    "assistant",
    lambda: OpenAIAssistantAgent(
        description="OpenAI Assistant Agent",
        client=openai.AsyncClient(),
        assistant_id=oai_assistant.id,
        thread_id=thread.id,
        assistant_event_handler_factory=lambda: EventHandler(),
    ),
)
agent = AgentId("assistant", "default")

开启日志记录功能,观察底层运行情况

import logging

logging.basicConfig(level=logging.WARNING)
logging.getLogger("autogen_core").setLevel(logging.DEBUG)

向智能体发送问候消息,查看流式返回的响应

runtime.start()
await runtime.send_message(TextMessage(content="Hello, how are you today!", source="user"), agent)
await runtime.stop_when_idle()
INFO:autogen_core:Sending message of type TextMessage to assistant: {'content': 'Hello, how are you today!', 'source': 'user'}
INFO:autogen_core:Calling message handler for assistant:default with message type TextMessage sent by Unknown
--------------------------------------------------------------------------------
Assistant:

Hello! I'm here and ready to assist you. How can I help you today?
INFO:autogen_core:Resolving response with message type TextMessage for recipient None from assistant: {'content': "Hello! I'm here and ready to assist you. How can I help you today?", 'source': 'assistant'}

带代码解释器的助手#

向智能体提出数学问题,观察它如何使用代码解释器来回答问题

runtime.start()
await runtime.send_message(TextMessage(content="What is 1332322 x 123212?", source="user"), agent)
await runtime.stop_when_idle()
INFO:autogen_core:Sending message of type TextMessage to assistant: {'content': 'What is 1332322 x 123212?', 'source': 'user'}
INFO:autogen_core:Calling message handler for assistant:default with message type TextMessage sent by Unknown
# Calculating the product of 1332322 and 123212
result = 1332322 * 123212
result
```
Executing code...
--------------------------------------------------------------------------------
Assistant:

The product of 1,332,322 and 123,212 is 164,158,058,264.
INFO:autogen_core:Resolving response with message type TextMessage for recipient None from assistant: {'content': 'The product of 1,332,322 and 123,212 is 164,158,058,264.', 'source': 'assistant'}

让我们从西雅图开放数据门户获取一些数据。我们将使用 西雅图城市工资数据。 首先下载它。

import requests

response = requests.get("https://data.seattle.gov/resource/2khk-5ukd.csv")
with open("seattle_city_wages.csv", "wb") as file:
    file.write(response.content)

让我们使用 UploadForCodeInterpreter 消息将文件发送给代理。

runtime.start()
await runtime.send_message(UploadForCodeInterpreter(file_path="seattle_city_wages.csv"), agent)
await runtime.stop_when_idle()
INFO:autogen_core:Sending message of type UploadForCodeInterpreter to assistant: {'file_path': 'seattle_city_wages.csv'}
INFO:autogen_core:Calling message handler for assistant:default with message type UploadForCodeInterpreter sent by Unknown
INFO:autogen_core:Resolving response with message type NoneType for recipient None from assistant: None

现在我们可以向代理询问一些关于数据的问题。

runtime.start()
await runtime.send_message(TextMessage(content="Take a look at the uploaded CSV file.", source="user"), agent)
await runtime.stop_when_idle()
INFO:autogen_core:Sending message of type TextMessage to assistant: {'content': 'Take a look at the uploaded CSV file.', 'source': 'user'}
INFO:autogen_core:Calling message handler for assistant:default with message type TextMessage sent by Unknown
import pandas as pd

# Load the uploaded CSV file to examine its contents
file_path = '/mnt/data/file-oEvRiyGyHc2jZViKyDqL8aoh'
csv_data = pd.read_csv(file_path)

# Display the first few rows of the dataframe to understand its structure
csv_data.head()
```
Executing code...
--------------------------------------------------------------------------------
Assistant:

The uploaded CSV file contains the following columns:

1. **department**: The department in which the individual works.
2. **last_name**: The last name of the employee.
3. **first_name**: The first name of the employee.
4. **job_title**: The job title of the employee.
5. **hourly_rate**: The hourly rate for the employee's position.

Here are the first few entries from the file:

| department                     | last_name | first_name | job_title                          | hourly_rate |
|--------------------------------|-----------|------------|------------------------------------|-------------|
| Police Department              | Aagard    | Lori       | Pol Capt-Precinct                 | 112.70      |
| Police Department              | Aakervik  | Dag        | Pol Ofcr-Detective                | 75.61       |
| Seattle City Light             | Aaltonen  | Evan       | Pwrline Clear Tree Trimmer        | 53.06       |
| Seattle Public Utilities       | Aar       | Abdimallik | Civil Engrng Spec,Sr               | 64.43       |
| Seattle Dept of Transportation | Abad      | Abigail    | Admin Spec II-BU                  | 37.40       |

If you need any specific analysis or information from this data, please let me know!
INFO:autogen_core:Resolving response with message type TextMessage for recipient None from assistant: {'content': "The uploaded CSV file contains the following columns:\n\n1. **department**: The department in which the individual works.\n2. **last_name**: The last name of the employee.\n3. **first_name**: The first name of the employee.\n4. **job_title**: The job title of the employee.\n5. **hourly_rate**: The hourly rate for the employee's position.\n\nHere are the first few entries from the file:\n\n| department                     | last_name | first_name | job_title                          | hourly_rate |\n|--------------------------------|-----------|------------|------------------------------------|-------------|\n| Police Department              | Aagard    | Lori       | Pol Capt-Precinct                 | 112.70      |\n| Police Department              | Aakervik  | Dag        | Pol Ofcr-Detective                | 75.61       |\n| Seattle City Light             | Aaltonen  | Evan       | Pwrline Clear Tree Trimmer        | 53.06       |\n| Seattle Public Utilities       | Aar       | Abdimallik | Civil Engrng Spec,Sr               | 64.43       |\n| Seattle Dept of Transportation | Abad      | Abigail    | Admin Spec II-BU                  | 37.40       |\n\nIf you need any specific analysis or information from this data, please let me know!", 'source': 'assistant'}
runtime.start()
await runtime.send_message(TextMessage(content="What are the top-10 salaries?", source="user"), agent)
await runtime.stop_when_idle()
INFO:autogen_core:Sending message of type TextMessage to assistant: {'content': 'What are the top-10 salaries?', 'source': 'user'}
INFO:autogen_core:Calling message handler for assistant:default with message type TextMessage sent by Unknown
# Sorting the data by hourly_rate in descending order and selecting the top 10 salaries
top_10_salaries = csv_data[['first_name', 'last_name', 'job_title', 'hourly_rate']].sort_values(by='hourly_rate', ascending=False).head(10)
top_10_salaries.reset_index(drop=True, inplace=True)
top_10_salaries
```
Executing code...
--------------------------------------------------------------------------------
Assistant:

Here are the top 10 salaries based on the hourly rates from the CSV file:

| First Name | Last Name | Job Title                          | Hourly Rate |
|------------|-----------|------------------------------------|-------------|
| Eric       | Barden    | Executive4                        | 139.61      |
| Idris      | Beauregard| Executive3                        | 115.90      |
| Lori       | Aagard    | Pol Capt-Precinct                 | 112.70      |
| Krista     | Bair      | Pol Capt-Precinct                 | 108.74      |
| Amy        | Bannister | Fire Chief, Dep Adm-80 Hrs        | 104.07      |
| Ginger     | Armbruster| Executive2                        | 102.42      |
| William    | Andersen  | Executive2                        | 102.42      |
| Valarie    | Anderson  | Executive2                        | 102.42      |
| Paige      | Alderete  | Executive2                        | 102.42      |
| Kathryn    | Aisenberg | Executive2                        | 100.65      |

If you need any further details or analysis, let me know!
INFO:autogen_core:Resolving response with message type TextMessage for recipient None from assistant: {'content': 'Here are the top 10 salaries based on the hourly rates from the CSV file:\n\n| First Name | Last Name | Job Title                          | Hourly Rate |\n|------------|-----------|------------------------------------|-------------|\n| Eric       | Barden    | Executive4                        | 139.61      |\n| Idris      | Beauregard| Executive3                        | 115.90      |\n| Lori       | Aagard    | Pol Capt-Precinct                 | 112.70      |\n| Krista     | Bair      | Pol Capt-Precinct                 | 108.74      |\n| Amy        | Bannister | Fire Chief, Dep Adm-80 Hrs        | 104.07      |\n| Ginger     | Armbruster| Executive2                        | 102.42      |\n| William    | Andersen  | Executive2                        | 102.42      |\n| Valarie    | Anderson  | Executive2                        | 102.42      |\n| Paige      | Alderete  | Executive2                        | 102.42      |\n| Kathryn    | Aisenberg | Executive2                        | 100.65      |\n\nIf you need any further details or analysis, let me know!', 'source': 'assistant'}

带文件搜索功能的助手#

让我们尝试文档问答功能。首先下载 关于第三次英阿战争的维基百科页面。

response = requests.get("https://en.wikipedia.org/wiki/Third_Anglo-Afghan_War")
with open("third_anglo_afghan_war.html", "wb") as file:
    file.write(response.content)

使用 UploadForFileSearch 消息将文件发送给代理。

runtime.start()
await runtime.send_message(
    UploadForFileSearch(file_path="third_anglo_afghan_war.html", vector_store_id=vector_store.id), agent
)
await runtime.stop_when_idle()
INFO:autogen_core:Sending message of type UploadForFileSearch to assistant: {'file_path': 'third_anglo_afghan_war.html', 'vector_store_id': 'vs_h3xxPbJFnd1iZ9WdjsQwNdrp'}
INFO:autogen_core:Calling message handler for assistant:default with message type UploadForFileSearch sent by Unknown
INFO:autogen_core:Resolving response with message type NoneType for recipient None from assistant: None

让我们向代理询问一些关于文档的问题。在提问之前, 我们重置了代理的记忆以开始新的对话。

runtime.start()
await runtime.send_message(Reset(), agent)
await runtime.send_message(
    TextMessage(
        content="When and where was the treaty of Rawalpindi signed? Answer using the document provided.", source="user"
    ),
    agent,
)
await runtime.stop_when_idle()
INFO:autogen_core:Sending message of type Reset to assistant: {}
INFO:autogen_core:Calling message handler for assistant:default with message type Reset sent by Unknown
INFO:autogen_core:Resolving response with message type NoneType for recipient None from assistant: None
INFO:autogen_core:Sending message of type TextMessage to assistant: {'content': 'When and where was the treaty of Rawalpindi signed? Answer using the document provided.', 'source': 'user'}
INFO:autogen_core:Calling message handler for assistant:default with message type TextMessage sent by Unknown
--------------------------------------------------------------------------------
Assistant:

The Treaty of Rawalpindi was signed on **8 August 1919**. The location of the signing was in **Rawalpindi**, which is in present-day Pakistan【6:0†source】.
INFO:autogen_core:Resolving response with message type TextMessage for recipient None from assistant: {'content': 'The Treaty of Rawalpindi was signed on **8 August 1919**. The location of the signing was in **Rawalpindi**, which is in present-day Pakistan【6:0†source】.', 'source': 'assistant'}
[0] third_anglo_afghan_war.html

就是这样!我们已成功构建了一个由OpenAI Assistant支持的代理。