{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# ACA 动态会话代码执行器\n\n本指南将解释 Azure 容器应用中的动态会话功能,并展示如何使用 Azure 容器代码执行器类。\n\n[Azure 容器应用动态会话](https://learn.microsoft.com/en-us/azure/container-apps/sessions)是 Azure 容器应用服务中的一个组件。该环境托管在远程 Azure 实例上,不会在本地执行任何代码。解释器能够在预装了常用包的 Jupyter 环境中执行 Python 代码。用户可以为自己的应用创建[自定义环境](https://learn.microsoft.com/en-us/azure/container-apps/sessions-custom-container)。此外,文件可以[上传到会话或从会话下载](https://learn.microsoft.com/en-us/azure/container-apps/sessions-code-interpreter#upload-a-file-to-a-session)。\n\n代码解释器可以运行多个代码会话,每个会话都由一个会话标识符字符串分隔。\n\n## 创建容器应用会话池\n\n在 Azure 门户中,创建一个新的`容器应用会话池`资源,将池类型设置为`Python 代码解释器`,并记下`池管理端点`。端点格式应类似于`https://{region}.dynamicsessions.io/subscriptions/{subscription_id}/resourceGroups/{resource_group_name}/sessionPools/{session_pool_name}`。\n\n或者,您也可以使用[Azure CLI 创建会话池](https://learn.microsoft.com/en-us/azure/container-apps/sessions-code-interpreter#create-a-session-pool-with-azure-cli)。\n\n## ACADynamicSessionsCodeExecutor\n\n{py:class}`~autogen_ext.code_executors.azure.ACADynamicSessionsCodeExecutor`类是一个 Python 代码执行器,可在默认的无服务器代码解释器会话上创建和执行任意 Python 代码。其接口如下\n\n### 初始化\n\n首先,您需要找到或创建一个实现了{py:class}`~autogen_ext.code_executors.azure.TokenProvider`接口的凭据对象。这是任何实现了以下函数的对象\n```python\ndef get_token(\n self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None, **kwargs: Any\n) -> azure.core.credentials.AccessToken\n```\n此类对象的一个示例是[azure.identity.DefaultAzureCredential](https://learn.microsoft.com/en-us/python/api/azure-identity/azure.identity.defaultazurecredential?view=azure-python)类。\n\n让我们从安装开始\n" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "# pip install azure.identity" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "接下来,导入我们代码所需的所有模块和类\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "import tempfile\n", "\n", "from anyio import open_file\n", "from autogen_core import CancellationToken\n", "from autogen_core.code_executor import CodeBlock\n", "from autogen_ext.code_executors.azure import ACADynamicSessionsCodeExecutor\n", "from azure.identity import DefaultAzureCredential" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "现在,我们将创建 Azure 代码执行器并运行一些测试代码,同时验证其是否正确运行。我们将使用临时工作目录创建执行器,以确保在展示每个功能使用时环境干净\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cancellation_token = CancellationToken()\n", "POOL_MANAGEMENT_ENDPOINT = \"...\"\n", "\n", "with tempfile.TemporaryDirectory() as temp_dir:\n", " executor = ACADynamicSessionsCodeExecutor(\n", " pool_management_endpoint=POOL_MANAGEMENT_ENDPOINT, credential=DefaultAzureCredential(), work_dir=temp_dir\n", " )\n", "\n", " code_blocks = [CodeBlock(code=\"import sys; print('hello world!')\", language=\"python\")]\n", " code_result = await executor.execute_code_blocks(code_blocks, cancellation_token)\n", " assert code_result.exit_code == 0 and \"hello world!\" in code_result.output" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "接下来,让我们尝试上传一些文件并验证其完整性。所有上传到无服务器代码解释器的文件都会被上传到`/mnt/data`目录。所有可下载文件也必须放在该目录中。默认情况下,代码执行器的当前工作目录设置为`/mnt/data`。\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "with tempfile.TemporaryDirectory() as temp_dir:\n", " test_file_1 = \"test_upload_1.txt\"\n", " test_file_1_contents = \"test1 contents\"\n", " test_file_2 = \"test_upload_2.txt\"\n", " test_file_2_contents = \"test2 contents\"\n", "\n", " async with await open_file(os.path.join(temp_dir, test_file_1), \"w\") as f: # type: ignore[syntax]\n", " await f.write(test_file_1_contents)\n", " async with await open_file(os.path.join(temp_dir, test_file_2), \"w\") as f: # type: ignore[syntax]\n", " await f.write(test_file_2_contents)\n", "\n", " assert os.path.isfile(os.path.join(temp_dir, test_file_1))\n", " assert os.path.isfile(os.path.join(temp_dir, test_file_2))\n", "\n", " executor = ACADynamicSessionsCodeExecutor(\n", " pool_management_endpoint=POOL_MANAGEMENT_ENDPOINT, credential=DefaultAzureCredential(), work_dir=temp_dir\n", " )\n", " await executor.upload_files([test_file_1, test_file_2], cancellation_token)\n", "\n", " file_list = await executor.get_file_list(cancellation_token)\n", " assert test_file_1 in file_list\n", " assert test_file_2 in file_list\n", "\n", " code_blocks = [\n", " CodeBlock(\n", " code=f\"\"\"\n", "with open(\"{test_file_1}\") as f:\n", " print(f.read())\n", "with open(\"{test_file_2}\") as f:\n", " print(f.read())\n", "\"\"\",\n", " language=\"python\",\n", " )\n", " ]\n", " code_result = await executor.execute_code_blocks(code_blocks, cancellation_token)\n", " assert code_result.exit_code == 0\n", " assert test_file_1_contents in code_result.output\n", " assert test_file_2_contents in code_result.output" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "下载文件的操作方式类似。\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "with tempfile.TemporaryDirectory() as temp_dir:\n", " test_file_1 = \"test_upload_1.txt\"\n", " test_file_1_contents = \"test1 contents\"\n", " test_file_2 = \"test_upload_2.txt\"\n", " test_file_2_contents = \"test2 contents\"\n", "\n", " assert not os.path.isfile(os.path.join(temp_dir, test_file_1))\n", " assert not os.path.isfile(os.path.join(temp_dir, test_file_2))\n", "\n", " executor = ACADynamicSessionsCodeExecutor(\n", " pool_management_endpoint=POOL_MANAGEMENT_ENDPOINT, credential=DefaultAzureCredential(), work_dir=temp_dir\n", " )\n", "\n", " code_blocks = [\n", " CodeBlock(\n", " code=f\"\"\"\n", "with open(\"{test_file_1}\", \"w\") as f:\n", " f.write(\"{test_file_1_contents}\")\n", "with open(\"{test_file_2}\", \"w\") as f:\n", " f.write(\"{test_file_2_contents}\")\n", "\"\"\",\n", " language=\"python\",\n", " ),\n", " ]\n", " code_result = await executor.execute_code_blocks(code_blocks, cancellation_token)\n", " assert code_result.exit_code == 0\n", "\n", " file_list = await executor.get_file_list(cancellation_token)\n", " assert test_file_1 in file_list\n", " assert test_file_2 in file_list\n", "\n", " await executor.download_files([test_file_1, test_file_2], cancellation_token)\n", "\n", " assert os.path.isfile(os.path.join(temp_dir, test_file_1))\n", " async with await open_file(os.path.join(temp_dir, test_file_1), \"r\") as f: # type: ignore[syntax]\n", " content = await f.read()\n", " assert test_file_1_contents in content\n", " assert os.path.isfile(os.path.join(temp_dir, test_file_2))\n", " async with await open_file(os.path.join(temp_dir, test_file_2), \"r\") as f: # type: ignore[syntax]\n", " content = await f.read()\n", " assert test_file_2_contents in content" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 新会话\n\n每个 {py:class}`~autogen_ext.code_executors.azure.ACADynamicSessionsCodeExecutor` 类的实例都会有一个唯一的会话ID。对特定代码执行器的每次调用都将在同一会话中执行,直到调用 {py:meth}`~autogen_ext.code_executors.azure.ACADynamicSessionsCodeExecutor.restart` 函数。之前的会话无法重复使用。\n\n这里我们将在代码会话上运行一些代码,重启它,然后验证是否已打开新会话。\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "executor = ACADynamicSessionsCodeExecutor(\n", " pool_management_endpoint=POOL_MANAGEMENT_ENDPOINT, credential=DefaultAzureCredential()\n", ")\n", "\n", "code_blocks = [CodeBlock(code=\"x = 'abcdefg'\", language=\"python\")]\n", "code_result = await executor.execute_code_blocks(code_blocks, cancellation_token)\n", "assert code_result.exit_code == 0\n", "\n", "code_blocks = [CodeBlock(code=\"print(x)\", language=\"python\")]\n", "code_result = await executor.execute_code_blocks(code_blocks, cancellation_token)\n", "assert code_result.exit_code == 0 and \"abcdefg\" in code_result.output\n", "\n", "await executor.restart()\n", "code_blocks = [CodeBlock(code=\"print(x)\", language=\"python\")]\n", "code_result = await executor.execute_code_blocks(code_blocks, cancellation_token)\n", "assert code_result.exit_code != 0 and \"NameError\" in code_result.output" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 可用包\n\n每个代码执行实例都预装了大多数常用包。但是,可用包及其版本的列表在执行环境之外不可见。可以通过调用代码执行器上的 {py:meth}`~autogen_ext.code_executors.azure.ACADynamicSessionsCodeExecutor.get_available_packages` 函数来获取环境中的包列表。\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(executor.get_available_packages(cancellation_token))" ] } ], "metadata": { "kernelspec": { "display_name": ".venv", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.12.5" } }, "nbformat": 4, "nbformat_minor": 2 }