autogen_ext.code_executors.jupyter._jupyter_code_executor 源代码

import asyncio
import base64
import json
import re
import sys
import tempfile
import uuid
import warnings
from dataclasses import dataclass
from pathlib import Path

from autogen_core import Component
from pydantic import BaseModel

if sys.version_info >= (3, 11):
    from typing import Self
else:
    from typing_extensions import Self

from contextlib import AbstractAsyncContextManager
from typing import Optional, Union

from autogen_core import CancellationToken
from autogen_core.code_executor import CodeBlock, CodeExecutor, CodeResult
from nbclient import NotebookClient
from nbformat import NotebookNode
from nbformat import v4 as nbformat
from typing_extensions import Self

from .._common import silence_pip


@dataclass
class JupyterCodeResult(CodeResult):
    """Result of a code execution performed by the Jupyter code executor.

    Extends ``CodeResult`` with the files that were written while handling
    the cell outputs.
    """

    # Paths of the files saved for the executed code.
    output_files: list[Path]
class JupyterCodeExecutorConfig(BaseModel):
    """Serializable configuration for ``JupyterCodeExecutor``."""

    # Name of the Jupyter kernel used to run the code.
    kernel_name: str = "python3"
    # Execution timeout handed to the executor.
    timeout: int = 60
    # Directory for saved output files; ``None`` means no directory was configured.
    output_dir: Optional[str] = None
[文档] class JupyterCodeExecutor(CodeExecutor, Component[JupyterCodeExecutorConfig]): """一个使用[nbclient](https://github.com/jupyter/nbclient)有状态执行代码的代码执行器类。 .. danger:: 这将在本地机器上执行代码。如果用于执行LLM生成的代码,应谨慎使用。 直接使用示例: .. code-block:: python import asyncio from autogen_core import CancellationToken from autogen_core.code_executor import CodeBlock from autogen_ext.code_executors.jupyter import JupyterCodeExecutor async def main() -> None: async with JupyterCodeExecutor() as executor: cancel_token = CancellationToken() code_blocks = [CodeBlock(code="print('hello world!')", language="python")] code_result = await executor.execute_code_blocks(code_blocks, cancel_token) print(code_result) asyncio.run(main()) 与:class:`~autogen_ext.tools.code_execution.PythonCodeExecutionTool`一起使用的示例: .. code-block:: python import asyncio from autogen_agentchat.agents import AssistantAgent from autogen_ext.code_executors.jupyter import JupyterCodeExecutor from autogen_ext.models.openai import OpenAIChatCompletionClient from autogen_ext.tools.code_execution import PythonCodeExecutionTool async def main() -> None: async with JupyterCodeExecutor() as executor: tool = PythonCodeExecutionTool(executor) model_client = OpenAIChatCompletionClient(model="gpt-4o") agent = AssistantAgent("assistant", model_client=model_client, tools=[tool]) result = await agent.run(task="What is the 10th Fibonacci number? Use Python to calculate it.") print(result) asyncio.run(main()) 在:class:`~autogen_agentchat.agents._code_executor_agent.CodeExecutorAgent`内部使用的示例: .. 
code-block:: python import asyncio from autogen_agentchat.agents import CodeExecutorAgent from autogen_agentchat.messages import TextMessage from autogen_ext.code_executors.jupyter import JupyterCodeExecutor from autogen_core import CancellationToken async def main() -> None: async with JupyterCodeExecutor() as executor: code_executor_agent = CodeExecutorAgent("code_executor", code_executor=executor) task = TextMessage( content='''Here is some code ```python print('Hello world') ``` ''', source="user", ) response = await code_executor_agent.on_messages([task], CancellationToken()) print(response.chat_message) asyncio.run(main()) Args: kernel_name (str): 使用的内核名称,默认为"python3"。 timeout (int): 代码执行超时时间,默认为60。 output_dir (Path): 输出文件保存目录,默认为临时目录。 .. note:: 使用当前目录(".")作为输出目录已被弃用。使用它将引发弃用警告。 """ component_config_schema = JupyterCodeExecutorConfig component_provider_override = "autogen_ext.code_executors.jupyter.JupyterCodeExecutor" def __init__( self, kernel_name: str = "python3", timeout: int = 60, output_dir: Optional[Union[Path, str]] = None, ): if timeout < 1: raise ValueError("Timeout must be greater than or equal to 1.") self._output_dir: Path = Path(tempfile.mkdtemp()) if output_dir is None else Path(output_dir) self._output_dir.mkdir(exist_ok=True, parents=True) self._temp_dir: Optional[tempfile.TemporaryDirectory[str]] = None self._temp_dir_path: Optional[Path] = None self._started = False self._kernel_name = kernel_name self._timeout = timeout self._client: Optional[NotebookClient] = None self.kernel_context: Optional[AbstractAsyncContextManager[None]] = None
[文档] async def execute_code_blocks( self, code_blocks: list[CodeBlock], cancellation_token: CancellationToken ) -> JupyterCodeResult: """执行代码块并返回结果。 Args: code_blocks (list[CodeBlock]): 要执行的代码块列表。 Returns: JupyterCodeResult: 代码执行的结果。 """ outputs: list[str] = [] output_files: list[Path] = [] exit_code = 0 for code_block in code_blocks: result = await self._execute_code_block(code_block, cancellation_token) exit_code = result.exit_code outputs.append(result.output) output_files.extend(result.output_files) # Stop execution if one code block fails if exit_code != 0: break return JupyterCodeResult(exit_code=exit_code, output="\n".join(outputs), output_files=output_files)
async def _execute_code_block( self, code_block: CodeBlock, cancellation_token: CancellationToken ) -> JupyterCodeResult: """执行单个代码块并返回结果。 Args: code_block (CodeBlock): 要执行的代码块。 Returns: JupyterCodeResult: 代码执行的结果。 """ execute_task = asyncio.create_task( self._execute_cell( nbformat.new_code_cell(silence_pip(code_block.code, code_block.language)) # type: ignore ) ) cancellation_token.link_future(execute_task) output_cell = await asyncio.wait_for(asyncio.shield(execute_task), timeout=self._timeout) outputs: list[str] = [] output_files: list[Path] = [] exit_code = 0 for output in output_cell.get("outputs", []): match output.get("output_type"): case "stream": outputs.append(output.get("text", "")) case "error": traceback = re.sub(r"\x1b\[[0-9;]*[A-Za-z]", "", "\n".join(output["traceback"])) outputs.append(traceback) exit_code = 1 case "execute_result" | "display_data": data = output.get("data", {}) for mime, content in data.items(): match mime: case "text/plain": outputs.append(content) case "image/png": path = self._save_image(content) output_files.append(path) case "image/jpeg": # TODO: Should this also be encoded? Images are encoded as both png and jpg pass case "text/html": path = self._save_html(content) output_files.append(path) case _: outputs.append(json.dumps(content)) case _: pass return JupyterCodeResult(exit_code=exit_code, output="\n".join(outputs), output_files=output_files) async def _execute_cell(self, cell: NotebookNode) -> NotebookNode: # Temporary push cell to nb as async_execute_cell expects it. 
But then we want to remove it again as cells can take up significant amount of memory (especially with images) if not self._client: raise RuntimeError("Executor must be started before executing cells") self._client.nb.cells.append(cell) output = await self._client.async_execute_cell( cell, cell_index=0, ) self._client.nb.cells.pop() return output def _save_image(self, image_data_base64: str) -> Path: """将图像数据保存到文件。""" image_data = base64.b64decode(image_data_base64) path = self._output_dir / f"{uuid.uuid4().hex}.png" path.write_bytes(image_data) return path.absolute() def _save_html(self, html_data: str) -> Path: """将HTML数据保存到文件。""" path = self._output_dir / f"{uuid.uuid4().hex}.html" path.write_text(html_data) return path.absolute()
[文档] async def restart(self) -> None: """重启代码执行器。""" await self.stop() await self.start()
[文档] async def start(self) -> None: """(实验性) 启动代码执行器。 通过创建新的Notebook并配置指定的Jupyter内核来初始化Jupyter Notebook执行环境。 将执行器标记为已启动状态,允许执行代码。 在执行任何代码块之前应先调用此方法。 """ if self._started: return notebook: NotebookNode = nbformat.new_notebook() # type: ignore self._client = NotebookClient( nb=notebook, kernel_name=self._kernel_name, timeout=self._timeout, allow_errors=True, ) self.kernel_context = self._client.async_setup_kernel() await self.kernel_context.__aenter__() self._started = True
[文档] async def stop(self) -> None: """(实验性)停止代码执行器。 通过退出内核上下文并清理相关资源来终止 Jupyter Notebook 的执行。""" if not self._started: return if self.kernel_context is not None: await self.kernel_context.__aexit__(None, None, None) self.kernel_context = None self._client = None self._started = False
def _to_config(self) -> JupyterCodeExecutorConfig: """将当前实例转换为配置对象""" return JupyterCodeExecutorConfig( kernel_name=self._kernel_name, timeout=self._timeout, output_dir=str(self.output_dir) ) @property def output_dir(self) -> Path: # If a user specifies the current directory, warn them that this is deprecated if self._output_dir == Path("."): warnings.warn( "Using the current directory as output_dir is deprecated", DeprecationWarning, stacklevel=2, ) return self._output_dir @classmethod def _from_config(cls, config: JupyterCodeExecutorConfig) -> Self: """从配置对象创建实例""" return cls( kernel_name=config.kernel_name, timeout=config.timeout, output_dir=Path(config.output_dir) if config.output_dir else None, )