import asyncio
import base64
import json
import os
import tempfile
import uuid
from dataclasses import dataclass
from pathlib import Path
from types import TracebackType
from typing import List, Optional, Union
from autogen_core import CancellationToken, Component
from autogen_core.code_executor import CodeBlock, CodeExecutor, CodeResult
from autogen_ext.code_executors._common import silence_pip
from pydantic import BaseModel
from typing_extensions import Self
from ._jupyter_server import JupyterClient, JupyterConnectable, JupyterConnectionInfo, JupyterKernelClient
[文档]
@dataclass
class DockerJupyterCodeResult(CodeResult):
    """(Experimental) Result of running code through the Docker Jupyter code executor.

    Extends :class:`CodeResult` with the files written while the code ran.
    """

    # Paths of the files (e.g. saved images / HTML) produced by the execution.
    output_files: list[Path]
class DockerJupyterCodeExecutorConfig(BaseModel):
    """Declarative configuration for the Docker Jupyter code executor."""

    # Either an object exposing connection info, or the connection info itself.
    jupyter_server: Union[JupyterConnectable, JupyterConnectionInfo]
    # Name of the Jupyter kernel spec to launch.
    kernel_name: str = "python3"
    # Timeout applied to kernel readiness checks and to each code execution.
    timeout: int = 60
    # Directory where output files are written; None lets the executor choose.
    output_dir: Union[Path, str, None] = None

    class Config:
        # The jupyter_server field holds non-pydantic types.
        arbitrary_types_allowed = True
[文档]
class DockerJupyterCodeExecutor(CodeExecutor, Component[DockerJupyterCodeExecutorConfig]):
    """(Experimental) A code executor that runs code statefully on a Jupyter server.

    Each execution is stateful and can access variables created by previous
    executions in the same session.

    To use this, you need to install the following dependencies:

    .. code-block:: shell

        pip install "autogen-ext[docker-jupyter-executor]"

    Args:
        jupyter_server (Union[JupyterConnectable, JupyterConnectionInfo]): The Jupyter server to use.
        kernel_name (str): The kernel name to use. Make sure it is installed.
            Defaults to "python3".
        timeout (int): The timeout for code execution, in seconds. Defaults to 60.
        output_dir (Union[Path, str, None]): The directory in which output files are saved.
            When None (the default), the server's ``_bind_dir`` attribute is used if it
            is set; otherwise a fresh temporary directory is created. The directory is
            not removed by this class.

    Raises:
        ValueError: If ``timeout`` is less than 1, or ``jupyter_server`` is neither a
            ``JupyterConnectable`` nor a ``JupyterConnectionInfo``.

    Example of using it directly:

    .. code-block:: python

        import asyncio
        from autogen_core import CancellationToken
        from autogen_core.code_executor import CodeBlock
        from autogen_ext.code_executors.docker_jupyter import DockerJupyterCodeExecutor, DockerJupyterServer


        async def main() -> None:
            async with DockerJupyterServer() as jupyter_server:
                async with DockerJupyterCodeExecutor(jupyter_server=jupyter_server) as executor:
                    code_blocks = [CodeBlock(code="print('hello world!')", language="python")]
                    code_result = await executor.execute_code_blocks(code_blocks, cancellation_token=CancellationToken())
                    print(code_result)


        asyncio.run(main())

    Example of using it with a custom Jupyter image:

    .. code-block:: python

        import asyncio
        from autogen_core import CancellationToken
        from autogen_core.code_executor import CodeBlock
        from autogen_ext.code_executors.docker_jupyter import DockerJupyterCodeExecutor, DockerJupyterServer


        async def main() -> None:
            async with DockerJupyterServer(custom_image_name="your_custom_images_name", expose_port=8888) as jupyter_server:
                async with DockerJupyterCodeExecutor(jupyter_server=jupyter_server) as executor:
                    code_blocks = [CodeBlock(code="print('hello world!')", language="python")]
                    code_result = await executor.execute_code_blocks(code_blocks, cancellation_token=CancellationToken())
                    print(code_result)


        asyncio.run(main())

    Example of using it with :class:`~autogen_ext.tools.code_execution.PythonCodeExecutionTool`:

    .. code-block:: python

        import asyncio
        from autogen_agentchat.agents import AssistantAgent
        from autogen_ext.code_executors.docker_jupyter import DockerJupyterCodeExecutor, DockerJupyterServer
        from autogen_ext.models.openai import OpenAIChatCompletionClient
        from autogen_ext.tools.code_execution import PythonCodeExecutionTool


        async def main() -> None:
            async with DockerJupyterServer() as jupyter_server:
                async with DockerJupyterCodeExecutor(jupyter_server=jupyter_server) as executor:
                    tool = PythonCodeExecutionTool(executor)
                    model_client = OpenAIChatCompletionClient(model="gpt-4o")
                    agent = AssistantAgent("assistant", model_client=model_client, tools=[tool])
                    result = await agent.run(task="What is the 10th Fibonacci number? Use Python to calculate it.")
                    print(result)


        asyncio.run(main())

    Example of using it inside a :class:`~autogen_agentchat.agents._code_executor_agent.CodeExecutorAgent`:

    .. code-block:: python

        import asyncio
        from autogen_agentchat.agents import CodeExecutorAgent
        from autogen_agentchat.messages import TextMessage
        from autogen_ext.code_executors.docker_jupyter import DockerJupyterCodeExecutor, DockerJupyterServer
        from autogen_core import CancellationToken


        async def main() -> None:
            async with DockerJupyterServer() as jupyter_server:
                async with DockerJupyterCodeExecutor(jupyter_server=jupyter_server) as executor:
                    code_executor_agent = CodeExecutorAgent("code_executor", code_executor=executor)
                    task = TextMessage(
                        content='''Here is some code
                ```python
                print('Hello world')
                ```
                ''',
                        source="user",
                    )
                    response = await code_executor_agent.on_messages([task], CancellationToken())
                    print(response.chat_message)


        asyncio.run(main())
    """

    component_config_schema = DockerJupyterCodeExecutorConfig
    component_provider_override = "autogen_ext.code_executors.docker_jupyter.DockerJupyterCodeExecutor"

    def __init__(
        self,
        jupyter_server: Union[JupyterConnectable, JupyterConnectionInfo],
        kernel_name: str = "python3",
        timeout: int = 60,
        output_dir: Union[Path, str, None] = None,
    ):
        if timeout < 1:
            raise ValueError("Timeout must be greater than or equal to 1.")

        if isinstance(jupyter_server, JupyterConnectable):
            self._connection_info = jupyter_server.connection_info
        elif isinstance(jupyter_server, JupyterConnectionInfo):
            self._connection_info = jupyter_server
        else:
            raise ValueError("jupyter_server must be a JupyterConnectable or JupyterConnectionInfo.")

        # Preference order: explicit argument, then the server's bind directory
        # (if it exposes one), then a newly created temporary directory.
        configured_dir = output_dir or getattr(jupyter_server, "_bind_dir", None)
        if configured_dir:
            self._output_dir = Path(configured_dir)
        else:
            # mkdtemp() rather than a TemporaryDirectory context manager: the
            # directory has to outlive __init__ so that later executions can
            # write images / HTML into it.
            self._output_dir = Path(tempfile.mkdtemp())
        self._output_dir.mkdir(parents=True, exist_ok=True)

        self._jupyter_client = JupyterClient(self._connection_info)
        self._kernel_name = kernel_name
        self._timeout = timeout
        self._async_jupyter_kernel_client: Optional[JupyterKernelClient] = None
        self._kernel_id: Optional[str] = None

    async def _ensure_async_kernel_client(self) -> JupyterKernelClient:
        """Return the kernel client, starting a kernel and creating the client if needed."""
        if self._kernel_id is None:
            await self.start()
            # start() either sets the kernel id or raises; this narrows the type.
            assert self._kernel_id is not None
        if self._async_jupyter_kernel_client is None:
            self._async_jupyter_kernel_client = await self._jupyter_client.get_kernel_client(self._kernel_id)
        return self._async_jupyter_kernel_client

    async def execute_code_blocks(
        self, code_blocks: List[CodeBlock], cancellation_token: CancellationToken
    ) -> DockerJupyterCodeResult:
        """(Experimental) Execute a list of code blocks and return the result.

        Each code block is executed, in order, as a cell in the Jupyter kernel.
        Execution stops at the first block that fails.
        See https://jupyter-client.readthedocs.io/en/stable/messaging.html
        for the messaging protocol.

        Args:
            code_blocks (List[CodeBlock]): The code blocks to execute.
            cancellation_token (CancellationToken): Token linked to each
                execution task so that the execution can be cancelled.

        Returns:
            DockerJupyterCodeResult: ``exit_code`` 0 with the newline-joined
            outputs when every block succeeds; ``exit_code`` 1 when the kernel
            is not ready or a block fails, in which case the output holds what
            was collected so far followed by the error text.
        """
        kernel_client = await self._ensure_async_kernel_client()

        # Wait for the kernel to be ready before sending any code.
        is_ready = await kernel_client.wait_for_ready(timeout_seconds=self._timeout)
        if not is_ready:
            return DockerJupyterCodeResult(exit_code=1, output="ERROR: Kernel not ready", output_files=[])

        outputs: List[str] = []
        output_files: List[Path] = []
        for code_block in code_blocks:
            code = silence_pip(code_block.code, code_block.language)

            # Run as a task so the cancellation token can cancel it.
            exec_task = asyncio.create_task(kernel_client.execute(code, timeout_seconds=self._timeout))
            cancellation_token.link_future(exec_task)
            result = await exec_task

            if not result.is_ok:
                # Report everything gathered so far together with the error.
                existing_output = "\n".join(outputs)
                return DockerJupyterCodeResult(
                    exit_code=1, output=existing_output + "\nERROR: " + result.output, output_files=output_files
                )

            outputs.append(result.output)
            for data in result.data_items:
                if data.mime_type == "image/png":
                    path = self._save_image(data.data)
                    outputs.append(path)
                    output_files.append(Path(path))
                elif data.mime_type == "text/html":
                    path = self._save_html(data.data)
                    outputs.append(path)
                    output_files.append(Path(path))
                else:
                    # Any other MIME payload is reported inline as JSON.
                    outputs.append(json.dumps(data.data))

        return DockerJupyterCodeResult(exit_code=0, output="\n".join(outputs), output_files=output_files)

    async def restart(self) -> None:
        """(Experimental) Restart the kernel, giving a new session."""
        if self._kernel_id is not None:
            await self._jupyter_client.restart_kernel(self._kernel_id)
            # Drop the existing client so the next execution creates a new one.
            if self._async_jupyter_kernel_client is not None:
                await self._async_jupyter_kernel_client.stop()
                self._async_jupyter_kernel_client = None

    async def start(self) -> None:
        """(Experimental) Start a new session by launching a kernel.

        Raises:
            ValueError: If the configured kernel name is not among the kernel
                specs reported by the Jupyter server.
        """
        available_kernels = await self._jupyter_client.list_kernel_specs()
        if self._kernel_name not in available_kernels["kernelspecs"]:
            raise ValueError(f"Kernel {self._kernel_name} is not installed.")
        self._kernel_id = await self._jupyter_client.start_kernel(self._kernel_name)

    def _save_image(self, image_data_base64: str) -> str:
        """Decode base64 image data, write it to a new ``.png`` file in the
        output directory, and return the file's absolute path."""
        image_data = base64.b64decode(image_data_base64)
        filename = f"{uuid.uuid4().hex}.png"
        path = os.path.join(str(self._output_dir), filename)
        with open(path, "wb") as f:
            f.write(image_data)
        return os.path.abspath(path)

    def _save_html(self, html_data: str) -> str:
        """Write HTML text to a new ``.html`` file in the output directory and
        return the file's absolute path."""
        filename = f"{uuid.uuid4().hex}.html"
        path = os.path.join(str(self._output_dir), filename)
        # Explicit UTF-8 so the result does not depend on the platform's
        # default encoding (which may be unable to encode the content).
        with open(path, "w", encoding="utf-8") as f:
            f.write(html_data)
        return os.path.abspath(path)

    async def stop(self) -> None:
        """Delete the kernel, stop the kernel client and close the Jupyter client."""
        try:
            if self._kernel_id is not None:
                await self._jupyter_client.delete_kernel(self._kernel_id)
                # Forget the id so a deleted kernel is never reused.
                self._kernel_id = None
            if self._async_jupyter_kernel_client is not None:
                await self._async_jupyter_kernel_client.stop()
                self._async_jupyter_kernel_client = None
        finally:
            # Always close the Jupyter client, even if the steps above failed.
            await self._jupyter_client.close()

    async def __aenter__(self) -> Self:
        await self.start()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        await self.stop()