autogen_ext.tools.code_execution._code_execution 源代码
from autogen_core import CancellationToken, Component, ComponentModel
from autogen_core.code_executor import CodeBlock, CodeExecutor
from autogen_core.tools import BaseTool
from pydantic import BaseModel, Field, model_serializer
from typing_extensions import Self
[文档]
# Argument model for the code-execution tool: a single block of Python source.
# Documented with comments rather than a class docstring so the pydantic-generated
# schema of this model is left exactly as it was.
class CodeExecutionInput(BaseModel):
    # Source text of the Python code block to run.
    code: str = Field(
        description="The contents of the Python code block that should be executed",
    )
class PythonCodeExecutionToolConfig(BaseModel):
    """Configuration for PythonCodeExecutionTool"""

    # Serialized form of the code executor the tool delegates to.
    executor: ComponentModel

    # Human-readable description of the tool.
    description: str = "Execute Python code blocks."
[文档]
class PythonCodeExecutionTool(
    BaseTool[CodeExecutionInput, CodeExecutionResult],
    Component[PythonCodeExecutionToolConfig],
):
    """A tool that runs Python code in a code executor and returns the output.

    Example executors:

    * :class:`autogen_ext.code_executors.local.LocalCommandLineCodeExecutor`
    * :class:`autogen_ext.code_executors.docker.DockerCommandLineCodeExecutor`
    * :class:`autogen_ext.code_executors.azure.ACADynamicSessionsCodeExecutor`

    Example usage:

    .. code-block:: bash

        pip install -U "autogen-agentchat" "autogen-ext[openai]" "yfinance" "matplotlib"

    .. code-block:: python

        import asyncio
        from autogen_agentchat.agents import AssistantAgent
        from autogen_agentchat.ui import Console
        from autogen_ext.models.openai import OpenAIChatCompletionClient
        from autogen_ext.code_executors.local import LocalCommandLineCodeExecutor
        from autogen_ext.tools.code_execution import PythonCodeExecutionTool


        async def main() -> None:
            tool = PythonCodeExecutionTool(LocalCommandLineCodeExecutor(work_dir="coding"))
            agent = AssistantAgent(
                "assistant", OpenAIChatCompletionClient(model="gpt-4o"), tools=[tool], reflect_on_tool_use=True
            )
            await Console(
                agent.run_stream(
                    task="Create a plot of MSFT stock prices in 2024 and save it to a file. Use yfinance and matplotlib."
                )
            )


        asyncio.run(main())

    Args:
        executor (CodeExecutor): The code executor used to execute the code blocks.
    """

    component_config_schema = PythonCodeExecutionToolConfig
    component_provider_override = "autogen_ext.tools.code_execution.PythonCodeExecutionTool"

    def __init__(self, executor: CodeExecutor):
        """Register the tool's argument/result types, name and description, and keep the executor.

        Args:
            executor: The code executor that submitted code blocks are handed to.
        """
        super().__init__(
            CodeExecutionInput,
            CodeExecutionResult,
            "CodeExecutor",
            "Execute Python code blocks.",
        )
        self._executor = executor

    async def run(self, args: CodeExecutionInput, cancellation_token: CancellationToken) -> CodeExecutionResult:
        """Execute ``args.code`` as a single Python code block on the wrapped executor.

        Args:
            args: Tool input carrying the Python source to run.
            cancellation_token: Token passed through to the executor call.

        Returns:
            A ``CodeExecutionResult`` whose ``success`` is true when the executor
            reports exit code 0 and whose ``output`` is the executor's output.
        """
        python_block = CodeBlock(code=args.code, language="python")
        outcome = await self._executor.execute_code_blocks(
            code_blocks=[python_block],
            cancellation_token=cancellation_token,
        )
        succeeded = outcome.exit_code == 0
        return CodeExecutionResult(success=succeeded, output=outcome.output)

    def _to_config(self) -> PythonCodeExecutionToolConfig:
        """Convert the current instance to a config object."""
        # Only the executor is written to the config; ``description`` is left
        # at the config model's default.
        executor_model = self._executor.dump_component()
        return PythonCodeExecutionToolConfig(executor=executor_model)

    @classmethod
    def _from_config(cls, config: PythonCodeExecutionToolConfig) -> Self:
        """Create an instance from a config object."""
        # Only ``config.executor`` is consulted when rebuilding the tool.
        return cls(executor=CodeExecutor.load_component(config.executor))