# Source code for autogen_ext.code_executors.docker_jupyter._docker_jupyter

import asyncio
import base64
import json
import os
import tempfile
import uuid
from dataclasses import dataclass
from pathlib import Path
from types import TracebackType
from typing import List, Optional, Union

from autogen_core import CancellationToken, Component
from autogen_core.code_executor import CodeBlock, CodeExecutor, CodeResult
from autogen_ext.code_executors._common import silence_pip
from pydantic import BaseModel
from typing_extensions import Self

from ._jupyter_server import JupyterClient, JupyterConnectable, JupyterConnectionInfo, JupyterKernelClient


@dataclass
class DockerJupyterCodeResult(CodeResult):
    """(Experimental) A code result class for the IPython code executor."""

    # Paths of the files (e.g. saved images / HTML) produced by the execution.
    output_files: List[Path]
class DockerJupyterCodeExecutorConfig(BaseModel):
    """Configuration for JupyterCodeExecutor."""

    # Jupyter server (or its connection details) the executor talks to.
    jupyter_server: Union[JupyterConnectable, JupyterConnectionInfo]
    # Name of the kernel spec to launch.
    kernel_name: str = "python3"
    # Timeout used for code execution.
    timeout: int = 60
    # Directory in which output files are saved; None when not specified.
    output_dir: Union[Path, str, None] = None

    class Config:
        # The Jupyter server types are not pydantic models.
        arbitrary_types_allowed = True
class DockerJupyterCodeExecutor(CodeExecutor, Component[DockerJupyterCodeExecutorConfig]):
    """(Experimental) A code executor class that executes code statefully using a Jupyter server.

    Each execution is stateful and can access variables created by previous
    executions in the same session.

    To use this, you need to install the following dependencies:

    .. code-block:: shell

        pip install "autogen-ext[docker-jupyter-executor]"

    Args:
        jupyter_server (Union[JupyterConnectable, JupyterConnectionInfo]): The Jupyter server to use.
        kernel_name (str): The kernel name to use. Make sure it is installed.
            By default, it is "python3".
        timeout (int): The timeout for code execution, by default 60.
        output_dir (Union[Path, str, None]): The directory to save output files, by default None.
            When it is None and the server exposes no ``_bind_dir``, a temporary
            directory is created; that directory is removed again by :meth:`stop`.

    Example of using it directly:

    .. code-block:: python

        import asyncio
        from autogen_core import CancellationToken
        from autogen_core.code_executor import CodeBlock
        from autogen_ext.code_executors.docker_jupyter import DockerJupyterCodeExecutor, DockerJupyterServer


        async def main() -> None:
            async with DockerJupyterServer() as jupyter_server:
                async with DockerJupyterCodeExecutor(jupyter_server=jupyter_server) as executor:
                    code_blocks = [CodeBlock(code="print('hello world!')", language="python")]
                    code_result = await executor.execute_code_blocks(code_blocks, cancellation_token=CancellationToken())
                    print(code_result)


        asyncio.run(main())

    Example of using it with your own jupyter image:

    .. code-block:: python

        import asyncio
        from autogen_core import CancellationToken
        from autogen_core.code_executor import CodeBlock
        from autogen_ext.code_executors.docker_jupyter import DockerJupyterCodeExecutor, DockerJupyterServer


        async def main() -> None:
            async with DockerJupyterServer(custom_image_name="your_custom_images_name", expose_port=8888) as jupyter_server:
                async with DockerJupyterCodeExecutor(jupyter_server=jupyter_server) as executor:
                    code_blocks = [CodeBlock(code="print('hello world!')", language="python")]
                    code_result = await executor.execute_code_blocks(code_blocks, cancellation_token=CancellationToken())
                    print(code_result)


        asyncio.run(main())

    Example of using it with :class:`~autogen_ext.tools.code_execution.PythonCodeExecutionTool`:

    .. code-block:: python

        import asyncio
        from autogen_agentchat.agents import AssistantAgent
        from autogen_ext.code_executors.docker_jupyter import DockerJupyterCodeExecutor, DockerJupyterServer
        from autogen_ext.models.openai import OpenAIChatCompletionClient
        from autogen_ext.tools.code_execution import PythonCodeExecutionTool


        async def main() -> None:
            async with DockerJupyterServer() as jupyter_server:
                async with DockerJupyterCodeExecutor(jupyter_server=jupyter_server) as executor:
                    tool = PythonCodeExecutionTool(executor)
                    model_client = OpenAIChatCompletionClient(model="gpt-4o")
                    agent = AssistantAgent("assistant", model_client=model_client, tools=[tool])
                    result = await agent.run(task="What is the 10th Fibonacci number? Use Python to calculate it.")
                    print(result)


        asyncio.run(main())

    Example of using it inside a :class:`~autogen_agentchat.agents._code_executor_agent.CodeExecutorAgent`:

    .. code-block:: python

        import asyncio
        from autogen_agentchat.agents import CodeExecutorAgent
        from autogen_agentchat.messages import TextMessage
        from autogen_ext.code_executors.docker_jupyter import DockerJupyterCodeExecutor, DockerJupyterServer
        from autogen_core import CancellationToken


        async def main() -> None:
            async with DockerJupyterServer() as jupyter_server:
                async with DockerJupyterCodeExecutor(jupyter_server=jupyter_server) as executor:
                    code_executor_agent = CodeExecutorAgent("code_executor", code_executor=executor)
                    task = TextMessage(
                        content='''Here is some code
                ```python
                print('Hello world')
                ```
                ''',
                        source="user",
                    )
                    response = await code_executor_agent.on_messages([task], CancellationToken())
                    print(response.chat_message)


        asyncio.run(main())

    """

    component_config_schema = DockerJupyterCodeExecutorConfig
    component_provider_override = "autogen_ext.code_executors.docker_jupyter.DockerJupyterCodeExecutor"

    def __init__(
        self,
        jupyter_server: Union[JupyterConnectable, JupyterConnectionInfo],
        kernel_name: str = "python3",
        timeout: int = 60,
        output_dir: Optional[Union[Path, str]] = None,
    ):
        """Validate the arguments and prepare the Jupyter client and output directory.

        Raises:
            ValueError: If ``timeout`` is smaller than 1, or ``jupyter_server`` is
                neither a ``JupyterConnectable`` nor a ``JupyterConnectionInfo``.
        """
        if timeout < 1:
            raise ValueError("Timeout must be greater than or equal to 1.")

        if isinstance(jupyter_server, JupyterConnectable):
            self._connection_info = jupyter_server.connection_info
        elif isinstance(jupyter_server, JupyterConnectionInfo):
            self._connection_info = jupyter_server
        else:
            raise ValueError("jupyter_server must be a JupyterConnectable or JupyterConnectionInfo.")

        # Only set when this executor owns a temporary output directory.
        self._temp_dir: Optional["tempfile.TemporaryDirectory[str]"] = None

        # Preference order: explicit output_dir, then the server's bind
        # directory (if it has one), then a temporary directory of our own.
        chosen_dir = output_dir or getattr(jupyter_server, "_bind_dir", None)
        if chosen_dir:
            self._output_dir = Path(chosen_dir)
            self._output_dir.mkdir(parents=True, exist_ok=True)
        else:
            # Keep the TemporaryDirectory object alive on the instance: using it
            # as a ``with`` block here would delete the directory as soon as the
            # block exits, long before any output file is written into it.
            self._temp_dir = tempfile.TemporaryDirectory()
            self._output_dir = Path(self._temp_dir.name)

        self._jupyter_client = JupyterClient(self._connection_info)
        self._kernel_name = kernel_name
        self._timeout = timeout
        self._async_jupyter_kernel_client: Optional[JupyterKernelClient] = None
        self._kernel_id: Optional[str] = None

    async def _ensure_async_kernel_client(self) -> JupyterKernelClient:
        """Ensure that an async kernel client exists and return it.

        Starts a kernel first when none has been started yet.
        """
        if self._kernel_id is None:
            await self.start()
            assert self._kernel_id is not None
        if self._async_jupyter_kernel_client is None:
            self._async_jupyter_kernel_client = await self._jupyter_client.get_kernel_client(self._kernel_id)
        return self._async_jupyter_kernel_client

    async def execute_code_blocks(
        self, code_blocks: List[CodeBlock], cancellation_token: CancellationToken
    ) -> DockerJupyterCodeResult:
        """(Experimental) Execute a list of code blocks and return the result.

        This method executes a list of code blocks as cells in the Jupyter kernel.
        See: https://jupyter-client.readthedocs.io/en/stable/messaging.html
        for the message protocol.

        Args:
            code_blocks (List[CodeBlock]): A list of code blocks to execute.
            cancellation_token (CancellationToken): Token linked to each
                execution task so that the execution can be cancelled.

        Returns:
            DockerJupyterCodeResult: The result of the code execution. Execution
            stops at the first failing block; the output gathered so far is
            returned together with the error text and ``exit_code=1``.
        """
        kernel_client = await self._ensure_async_kernel_client()

        # Wait for the kernel to be ready using the async client.
        is_ready = await kernel_client.wait_for_ready(timeout_seconds=self._timeout)
        if not is_ready:
            return DockerJupyterCodeResult(exit_code=1, output="ERROR: Kernel not ready", output_files=[])

        outputs: List[str] = []
        output_files: List[Path] = []
        for code_block in code_blocks:
            code = silence_pip(code_block.code, code_block.language)

            # Run the cell as a task so the cancellation token can cancel it.
            exec_task = asyncio.create_task(kernel_client.execute(code, timeout_seconds=self._timeout))
            cancellation_token.link_future(exec_task)
            result = await exec_task

            if not result.is_ok:
                existing_output = "\n".join(str(output) for output in outputs)
                return DockerJupyterCodeResult(
                    exit_code=1, output=existing_output + "\nERROR: " + result.output, output_files=output_files
                )

            outputs.append(result.output)
            for data in result.data_items:
                if data.mime_type == "image/png":
                    path = self._save_image(data.data)
                    outputs.append(path)
                    output_files.append(Path(path))
                elif data.mime_type == "text/html":
                    path = self._save_html(data.data)
                    outputs.append(path)
                    output_files.append(Path(path))
                else:
                    # Any other payload is reported inline as JSON text.
                    outputs.append(json.dumps(data.data))

        return DockerJupyterCodeResult(
            exit_code=0, output="\n".join(str(output) for output in outputs), output_files=output_files
        )

    async def restart(self) -> None:
        """(Experimental) Restart a new session."""
        # Use the async client to restart the kernel.
        if self._kernel_id is not None:
            await self._jupyter_client.restart_kernel(self._kernel_id)
        # Drop the kernel client so that it is recreated on next use.
        if self._async_jupyter_kernel_client is not None:
            await self._async_jupyter_kernel_client.stop()
            self._async_jupyter_kernel_client = None

    async def start(self) -> None:
        """(Experimental) Start a new session.

        Raises:
            ValueError: If the configured kernel is not among the kernel specs
                reported by the Jupyter server.
        """
        available_kernels = await self._jupyter_client.list_kernel_specs()
        if self._kernel_name not in available_kernels["kernelspecs"]:
            raise ValueError(f"Kernel {self._kernel_name} is not installed.")
        self._kernel_id = await self._jupyter_client.start_kernel(self._kernel_name)

    def _save_image(self, image_data_base64: str) -> str:
        """Decode base64 image data, save it as a PNG file and return its absolute path."""
        image_data = base64.b64decode(image_data_base64)
        filename = f"{uuid.uuid4().hex}.png"
        path = os.path.join(str(self._output_dir), filename)
        with open(path, "wb") as f:
            f.write(image_data)
        return os.path.abspath(path)

    def _save_html(self, html_data: str) -> str:
        """Save HTML data to a file and return its absolute path."""
        filename = f"{uuid.uuid4().hex}.html"
        path = os.path.join(str(self._output_dir), filename)
        # Explicit UTF-8: the platform default encoding may be unable to
        # represent non-ASCII characters contained in the HTML.
        with open(path, "w", encoding="utf-8") as f:
            f.write(html_data)
        return os.path.abspath(path)

    async def stop(self) -> None:
        """Stop the kernel and release the resources held by this executor.

        Deletes the kernel, stops the kernel client, closes the Jupyter client
        and removes the temporary output directory if this executor created one
        (files saved there are deleted with it).
        """
        # Detach the state before awaiting so that a repeated stop() does not
        # try to delete the same kernel or stop the same client twice.
        kernel_id, self._kernel_id = self._kernel_id, None
        kernel_client, self._async_jupyter_kernel_client = self._async_jupyter_kernel_client, None
        try:
            if kernel_id is not None:
                await self._jupyter_client.delete_kernel(kernel_id)
            if kernel_client is not None:
                await kernel_client.stop()
        finally:
            # Always close the client and clean up, even if the calls above failed.
            try:
                await self._jupyter_client.close()
            finally:
                if self._temp_dir is not None:
                    self._temp_dir.cleanup()
                    self._temp_dir = None

    async def __aenter__(self) -> Self:
        """Start a session on entering the ``async with`` block."""
        await self.start()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Stop the executor on leaving the ``async with`` block."""
        await self.stop()