autogen_agentchat.agents._code_executor_agent 源代码

import logging
import re
from typing import (
    AsyncGenerator,
    List,
    Optional,
    Sequence,
    Union,
)

from autogen_core import CancellationToken, Component, ComponentModel
from autogen_core.code_executor import CodeBlock, CodeExecutor, CodeResult
from autogen_core.model_context import (
    ChatCompletionContext,
    UnboundedChatCompletionContext,
)
from autogen_core.models import (
    AssistantMessage,
    ChatCompletionClient,
    CreateResult,
    LLMMessage,
    SystemMessage,
    UserMessage,
)
from pydantic import BaseModel
from typing_extensions import Self

from .. import EVENT_LOGGER_NAME
from ..base import Response
from ..messages import (
    BaseAgentEvent,
    BaseChatMessage,
    CodeExecutionEvent,
    CodeGenerationEvent,
    HandoffMessage,
    ModelClientStreamingChunkEvent,
    TextMessage,
    ThoughtEvent,
)
from ..utils import remove_images
from ._base_chat_agent import BaseChatAgent

event_logger = logging.getLogger(EVENT_LOGGER_NAME)


class CodeExecutorAgentConfig(BaseModel):
    """CodeExecutorAgent 的配置"""

    name: str
    code_executor: ComponentModel
    model_client: ComponentModel | None = None
    description: str | None = None
    sources: List[str] | None = None
    system_message: str | None = None
    model_client_stream: bool = False
    model_context: ComponentModel | None = None


class RetryDecision(BaseModel):
    reason: str
    retry: bool


[文档] class CodeExecutorAgent(BaseChatAgent, Component[CodeExecutorAgentConfig]): """(实验性) 一个根据用户指令生成并执行代码片段的代理。 .. note:: 该代理是实验性的,可能在未来的版本中发生变化。 通常与另一个生成待执行代码片段的代理在团队中配合使用,或单独使用(需提供 `model_client`)以便能够根据用户查询生成代码、执行代码并反思代码结果。 当与 `model_client` 配合使用时,它将使用模型生成代码片段并通过提供的 `code_executor` 执行。模型还会对代码执行结果进行反思。代理会将模型的最终反思结果作为最终响应返回。 当不配合 `model_client` 使用时,它仅执行在 :class:`~autogen_agentchat.messages.TextMessage` 消息中找到的代码块,并返回代码执行输出。 .. note:: 使用 :class:`~autogen_agentchat.agents.AssistantAgent` 配合 :class:`~autogen_ext.tools.code_execution.PythonCodeExecutionTool` 可作为该代理的替代方案。但该代理的模型需要生成正确转义的代码字符串作为工具参数。 Args: name (str): 代理名称 code_executor (CodeExecutor): 负责执行消息中接收到的代码的代码执行器 (推荐使用 :py:class:`~autogen_ext.code_executors.docker.DockerCommandLineCodeExecutor`,参见下方示例) model_client (ChatCompletionClient, optional): 用于推理和生成代码的模型客户端。 如未提供,代理将仅执行输入消息中的代码块。 当前模型必须支持结构化输出模式,这是自动重试机制工作的必要条件。 model_client_stream (bool, optional): 若为 `True`,模型客户端将以流模式运行。 :meth:`on_messages_stream` 和 :meth:`BaseChatAgent.run_stream` 方法 也会在模型客户端生成响应块时产出 :class:`~autogen_agentchat.messages.ModelClientStreamingChunkEvent` 消息。默认为 `False`。 description (str, optional): 代理描述。如未提供, 将使用 :class:`~autogen_agentchat.agents.CodeExecutorAgent.DEFAULT_AGENT_DESCRIPTION`。 system_message (str, optional): 模型的系统消息。如提供,将在推理时预置到模型上下文的消息中。设为 `None` 可禁用。 默认为 :class:`~autogen_agentchat.agents.CodeExecutorAgent.DEFAULT_SYSTEM_MESSAGE`。仅在提供 `model_client` 时使用。 sources (Sequence[str], optional): 仅检查指定代理的消息以执行代码。 当代理是群聊的一部分且您希望将代码执行限制在特定代理的消息时很有用。 如未提供,将检查所有消息中的代码块。 仅在未提供 `model_client` 时使用。 max_retries_on_error (int, optional): 出错时的最大重试次数。如果代码执行失败,代理将重试至多该次数。 如果代码执行在重试该次数后仍失败,代理将产出反思结果。 .. note:: 建议 `CodeExecutorAgent` 代理使用 Docker 容器执行代码。这确保模型生成的代码在隔离环境中执行。要使用 Docker,您的环境必须安装并运行 Docker。 请遵循 `Docker <https://docs.docker.com/get-docker/>`_ 的安装说明。 .. note:: 代码执行器仅处理使用三重反引号正确格式化的 Markdown 代码块。 例如: .. code-block:: text ```python print("Hello World") ``` # 或 ```sh echo "Hello World" ``` 在此示例中,我们展示如何设置使用 :py:class:`~autogen_ext.code_executors.docker.DockerCommandLineCodeExecutor` 在 Docker 容器中执行代码片段的 `CodeExecutorAgent` 代理。`work_dir` 参数表示所有执行文件在被 Docker 容器执行前首先保存的本地位置。 .. code-block:: python import asyncio from autogen_agentchat.agents import CodeExecutorAgent from autogen_agentchat.messages import TextMessage from autogen_ext.code_executors.docker import DockerCommandLineCodeExecutor from autogen_core import CancellationToken async def run_code_executor_agent() -> None: # 创建使用 Docker 容器执行代码的代码执行器代理 code_executor = DockerCommandLineCodeExecutor(work_dir="coding") await code_executor.start() code_executor_agent = CodeExecutorAgent("code_executor", code_executor=code_executor) # 使用给定代码片段运行代理 task = TextMessage( content='''Here is some code ```python print('Hello world') ``` ''', source="user", ) response = await code_executor_agent.on_messages([task], CancellationToken()) print(response.chat_message) # 停止代码执行器 await code_executor.stop() asyncio.run(run_code_executor_agent()) 在此示例中,我们展示如何设置使用 :py:class:`~docker.types.DeviceRequest` 向容器暴露 GPU 以执行 CUDA 加速代码的 `CodeExecutorAgent` 代理。 .. code-block:: python import asyncio from autogen_agentchat.agents import CodeExecutorAgent from autogen_agentchat.messages import TextMessage from autogen_ext.code_executors.docker import DockerCommandLineCodeExecutor from autogen_core import CancellationToken from docker.types import DeviceRequest async def run_code_executor_agent() -> None: # 创建使用 Docker 容器执行代码的代码执行器代理 code_executor = DockerCommandLineCodeExecutor( work_dir="coding", device_requests=[DeviceRequest(count=-1, capabilities=[["gpu"]])] ) await code_executor.start() code_executor_agent = CodeExecutorAgent("code_executor", code_executor=code_executor) # 显示 GPU 信息 task = TextMessage( content='''Here is some code ```bash nvidia-smi ``` ''', source="user", ) response = await code_executor_agent.on_messages([task], CancellationToken()) print(response.chat_message) # 停止代码执行器 await code_executor.stop() asyncio.run(run_code_executor_agent()) 在以下示例中,我们展示如何设置不带 `model_client` 参数的 `CodeExecutorAgent`,用于使用 :py:class:`~autogen_ext.code_executors.docker.DockerCommandLineCodeExecutor` 执行群聊中其他代理生成的代码块 .. code-block:: python import asyncio from autogen_ext.code_executors.docker import DockerCommandLineCodeExecutor from autogen_ext.models.openai import OpenAIChatCompletionClient from autogen_agentchat.agents import AssistantAgent, CodeExecutorAgent from autogen_agentchat.conditions import MaxMessageTermination from autogen_agentchat.teams import RoundRobinGroupChat from autogen_agentchat.ui import Console termination_condition = MaxMessageTermination(3) async def main() -> None: model_client = OpenAIChatCompletionClient(model="gpt-4o") # 定义 Docker CLI 代码执行器 code_executor = DockerCommandLineCodeExecutor(work_dir="coding") # 启动执行容器 await code_executor.start() code_executor_agent = CodeExecutorAgent("code_executor_agent", code_executor=code_executor) coder_agent = AssistantAgent("coder_agent", model_client=model_client) groupchat = RoundRobinGroupChat( participants=[coder_agent, code_executor_agent], termination_condition=termination_condition ) task = "Write python code to print Hello World!" await Console(groupchat.run_stream(task=task)) # 停止执行容器 await code_executor.stop() asyncio.run(main()) .. code-block:: text ---------- user ---------- Write python code to print Hello World! ---------- coder_agent ---------- Certainly! Here's a simple Python code to print "Hello World!": ```python print("Hello World!") ``` You can run this code in any Python environment to display the message. ---------- code_executor_agent ---------- Hello World! 在以下示例中,我们展示如何设置带 `model_client` 的 `CodeExecutorAgent`,该代理无需其他代理帮助即可自行生成代码,并在 :py:class:`~autogen_ext.code_executors.docker.DockerCommandLineCodeExecutor` 中执行 .. code-block:: python import asyncio from autogen_ext.code_executors.docker import DockerCommandLineCodeExecutor from autogen_ext.models.openai import OpenAIChatCompletionClient from autogen_agentchat.agents import CodeExecutorAgent from autogen_agentchat.conditions import TextMessageTermination from autogen_agentchat.ui import Console termination_condition = TextMessageTermination("code_executor_agent") async def main() -> None: model_client = OpenAIChatCompletionClient(model="gpt-4o") # 定义 Docker CLI 代码执行器 code_executor = DockerCommandLineCodeExecutor(work_dir="coding") # 启动执行容器 await code_executor.start() code_executor_agent = CodeExecutorAgent( "code_executor_agent", code_executor=code_executor, model_client=model_client ) task = "Write python code to print Hello World!" await Console(code_executor_agent.run_stream(task=task)) # 停止执行容器 await code_executor.stop() asyncio.run(main()) .. code-block:: text ---------- user ---------- Write python code to print Hello World! ---------- code_executor_agent ---------- Certainly! Here is a simple Python code to print "Hello World!" to the console: ```python print("Hello World!") ``` Let's execute it to confirm the output. ---------- code_executor_agent ---------- Hello World! ---------- code_executor_agent ---------- The code has been executed successfully, and it printed "Hello World!" as expected. If you have any more requests or questions, feel free to ask! """ DEFAULT_TERMINAL_DESCRIPTION = "A computer terminal that performs no other action than running Python scripts (provided to it quoted in ```python code blocks), or sh shell scripts (provided to it quoted in ```sh code blocks)." DEFAULT_AGENT_DESCRIPTION = "A Code Execution Agent that generates and executes Python and shell scripts based on user instructions. It ensures correctness, efficiency, and minimal errors while gracefully handling edge cases." DEFAULT_SYSTEM_MESSAGE = "You are a Code Execution Agent. Your role is to generate and execute Python code and shell scripts based on user instructions, ensuring correctness, efficiency, and minimal errors. Handle edge cases gracefully. Python code should be provided in ```python code blocks, and sh shell scripts should be provided in ```sh code blocks for execution." NO_CODE_BLOCKS_FOUND_MESSAGE = "No code blocks found in the thread. Please provide at least one markdown-encoded code block to execute (i.e., quoting code in ```python or ```sh code blocks)." component_config_schema = CodeExecutorAgentConfig component_provider_override = "autogen_agentchat.agents.CodeExecutorAgent" def __init__( self, name: str, code_executor: CodeExecutor, *, model_client: ChatCompletionClient | None = None, model_context: ChatCompletionContext | None = None, model_client_stream: bool = False, max_retries_on_error: int = 0, description: str | None = None, system_message: str | None = DEFAULT_SYSTEM_MESSAGE, sources: Sequence[str] | None = None, ) -> None: if description is None: if model_client is None: description = CodeExecutorAgent.DEFAULT_TERMINAL_DESCRIPTION else: description = CodeExecutorAgent.DEFAULT_AGENT_DESCRIPTION super().__init__(name=name, description=description) self._code_executor = code_executor self._sources = sources self._model_client_stream = model_client_stream self._max_retries_on_error = max_retries_on_error self._model_client = None if model_client is not None: self._model_client = model_client if model_context is not None: self._model_context = model_context else: self._model_context = UnboundedChatCompletionContext() self._system_messaages: List[SystemMessage] = [] if system_message is None: self._system_messages = [] else: self._system_messages = [SystemMessage(content=system_message)] if self._max_retries_on_error > 0: if not self._model_client or not self._model_client.model_info: raise ValueError("model_client.model_info must be provided when max_retries_on_error > 0") if not self._model_client.model_info["structured_output"]: raise ValueError("Specified model_client doesn't support structured output mode.") @property def produced_message_types(self) -> Sequence[type[BaseChatMessage]]: """代码执行器代理产生的消息类型。""" return (TextMessage,) @property def model_context(self) -> ChatCompletionContext: """ 代理正在使用的模型上下文。 """ return self._model_context
[文档] async def on_messages(self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken) -> Response: async for message in self.on_messages_stream(messages, cancellation_token): if isinstance(message, Response): return message raise AssertionError("The stream should have returned the final result.")
[文档] async def on_messages_stream( self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken ) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | Response, None]: """ 使用助手代理处理传入消息,并在事件/响应发生时生成它们。 """ # Gather all relevant state here agent_name = self.name model_context = self._model_context system_messages = self._system_messages model_client = self._model_client model_client_stream = self._model_client_stream max_retries_on_error = self._max_retries_on_error execution_result: CodeResult | None = None if model_client is None: # default behaviour for backward compatibility # execute generated code if present code_blocks: List[CodeBlock] = await self.extract_code_blocks_from_messages(messages) if not code_blocks: yield Response( chat_message=TextMessage( content=self.NO_CODE_BLOCKS_FOUND_MESSAGE, source=agent_name, ) ) return execution_result = await self.execute_code_block(code_blocks, cancellation_token) yield Response(chat_message=TextMessage(content=execution_result.output, source=self.name)) return inner_messages: List[BaseAgentEvent | BaseChatMessage] = [] for nth_try in range(max_retries_on_error + 1): # Do one default generation, execution and inference loop # Step 1: Add new user/handoff messages to the model context await self._add_messages_to_context( model_context=model_context, messages=messages, ) # Step 2: Run inference with the model context model_result = None async for inference_output in self._call_llm( model_client=model_client, model_client_stream=model_client_stream, system_messages=system_messages, model_context=model_context, agent_name=agent_name, cancellation_token=cancellation_token, ): if isinstance(inference_output, CreateResult): model_result = inference_output else: # Streaming chunk event yield inference_output assert model_result is not None, "No model result was produced." # Step 3: [NEW] If the model produced a hidden "thought," yield it as an event if model_result.thought: thought_event = ThoughtEvent(content=model_result.thought, source=agent_name) yield thought_event inner_messages.append(thought_event) # Step 4: Add the assistant message to the model context (including thought if present) await model_context.add_message( AssistantMessage( content=model_result.content, source=agent_name, thought=getattr(model_result, "thought", None), ) ) # Step 5: Extract the code blocks from inferred text assert isinstance(model_result.content, str), "Expected inferred model_result.content to be of type str." code_blocks = self._extract_markdown_code_blocks(str(model_result.content)) # Step 6: Exit the loop if no code blocks found if not code_blocks: yield Response( chat_message=TextMessage( content=str(model_result.content), source=agent_name, ) ) return # Step 7: Yield a CodeGenerationEvent inferred_text_message: CodeGenerationEvent = CodeGenerationEvent( retry_attempt=nth_try, content=model_result.content, code_blocks=code_blocks, source=agent_name, ) yield inferred_text_message # Step 8: Execute the extracted code blocks execution_result = await self.execute_code_block(inferred_text_message.code_blocks, cancellation_token) # Step 9: Update model context with the code execution result await model_context.add_message( UserMessage( content=execution_result.output, source=agent_name, ) ) # Step 10: Yield a CodeExecutionEvent yield CodeExecutionEvent(retry_attempt=nth_try, result=execution_result, source=self.name) # If execution was successful or last retry, then exit if execution_result.exit_code == 0 or nth_try == max_retries_on_error: break # Step 11: If exit code is non-zero and retries are available then # make an inference asking if we should retry or not chat_context = await model_context.get_messages() retry_prompt = ( f"The most recent code execution resulted in an error:\n{execution_result.output}\n\n" "Should we attempt to resolve it? Please respond with:\n" "- A boolean value for 'retry' indicating whether it should be retried.\n" "- A detailed explanation in 'reason' that identifies the issue, justifies your decision to retry or not, and outlines how you would resolve the error if a retry is attempted." ) chat_context = chat_context + [ UserMessage( content=retry_prompt, source=agent_name, ) ] response = await model_client.create(messages=chat_context, json_output=RetryDecision) assert isinstance( response.content, str ), "Expected structured response for retry decision to be of type str." should_retry_generation = RetryDecision.model_validate_json(str(response.content)) # Exit if no-retry is needed if not should_retry_generation.retry: break yield CodeGenerationEvent( retry_attempt=nth_try, content=f"Attempt number: {nth_try + 1}\nProposed correction: {should_retry_generation.reason}", code_blocks=[], source=agent_name, ) # Always reflect on the execution result async for reflection_response in CodeExecutorAgent._reflect_on_code_block_results_flow( system_messages=system_messages, model_client=model_client, model_client_stream=model_client_stream, model_context=model_context, agent_name=agent_name, inner_messages=inner_messages, ): yield reflection_response # Last reflection_response is of type Response so it will finish the routine
[文档] async def extract_code_blocks_from_messages(self, messages: Sequence[BaseChatMessage]) -> List[CodeBlock]: # Extract code blocks from the messages. code_blocks: List[CodeBlock] = [] for msg in messages: if self._sources is None or msg.source in self._sources: if isinstance(msg, TextMessage): code_blocks.extend(self._extract_markdown_code_blocks(msg.content)) # TODO: handle other message types if needed return code_blocks
[文档] async def execute_code_block( self, code_blocks: List[CodeBlock], cancellation_token: CancellationToken ) -> CodeResult: # Execute the code blocks. result = await self._code_executor.execute_code_blocks(code_blocks, cancellation_token=cancellation_token) if result.output.strip() == "": # No output result.output = f"The script ran but produced no output to console. The POSIX exit code was: {result.exit_code}. If you were expecting output, consider revising the script to ensure content is printed to stdout." elif result.exit_code != 0: # Error result.output = f"The script ran, then exited with an error (POSIX exit code: {result.exit_code})\nIts output was:\n{result.output}" return result
[文档] async def on_reset(self, cancellation_token: CancellationToken) -> None: """这是一个空操作,因为代码执行代理没有可变状态。""" pass
def _extract_markdown_code_blocks(self, markdown_text: str) -> List[CodeBlock]: pattern = re.compile(r"```(?:\s*([\w\+\-]+))?\n([\s\S]*?)```") matches = pattern.findall(markdown_text) code_blocks: List[CodeBlock] = [] for match in matches: language = match[0].strip() if match[0] else "" code_content = match[1] code_blocks.append(CodeBlock(code=code_content, language=language)) return code_blocks
[文档] def _to_config(self) -> CodeExecutorAgentConfig: return CodeExecutorAgentConfig( name=self.name, model_client=(self._model_client.dump_component() if self._model_client is not None else None), code_executor=self._code_executor.dump_component(), description=self.description, sources=list(self._sources) if self._sources is not None else None, system_message=( self._system_messages[0].content if self._system_messages and isinstance(self._system_messages[0].content, str) else None ), model_client_stream=self._model_client_stream, model_context=self._model_context.dump_component(), )
[文档] @classmethod def _from_config(cls, config: CodeExecutorAgentConfig) -> Self: return cls( name=config.name, model_client=( ChatCompletionClient.load_component(config.model_client) if config.model_client is not None else None ), code_executor=CodeExecutor.load_component(config.code_executor), description=config.description, sources=config.sources, system_message=config.system_message, model_client_stream=config.model_client_stream, model_context=ChatCompletionContext.load_component(config.model_context) if config.model_context else None, )
@staticmethod def _get_compatible_context(model_client: ChatCompletionClient, messages: List[LLMMessage]) -> Sequence[LLMMessage]: """确保消息与底层客户端兼容,必要时移除图像。""" if model_client.model_info["vision"]: return messages else: return remove_images(messages) @classmethod async def _call_llm( cls, model_client: ChatCompletionClient, model_client_stream: bool, system_messages: List[SystemMessage], model_context: ChatCompletionContext, agent_name: str, cancellation_token: CancellationToken, ) -> AsyncGenerator[Union[CreateResult, ModelClientStreamingChunkEvent], None]: """ 执行模型推理并生成流式分块事件或最终的CreateResult。 """ all_messages = await model_context.get_messages() llm_messages = cls._get_compatible_context(model_client=model_client, messages=system_messages + all_messages) if model_client_stream: model_result: Optional[CreateResult] = None async for chunk in model_client.create_stream( llm_messages, tools=[], cancellation_token=cancellation_token ): if isinstance(chunk, CreateResult): model_result = chunk elif isinstance(chunk, str): yield ModelClientStreamingChunkEvent(content=chunk, source=agent_name) else: raise RuntimeError(f"Invalid chunk type: {type(chunk)}") if model_result is None: raise RuntimeError("No final model result in streaming mode.") yield model_result else: model_result = await model_client.create(llm_messages, tools=[], cancellation_token=cancellation_token) yield model_result @staticmethod async def _add_messages_to_context( model_context: ChatCompletionContext, messages: Sequence[BaseChatMessage], ) -> None: """ 将传入消息添加到模型上下文中。 """ for msg in messages: if isinstance(msg, HandoffMessage): for llm_msg in msg.context: await model_context.add_message(llm_msg) await model_context.add_message(msg.to_model_message()) @classmethod async def _reflect_on_code_block_results_flow( cls, system_messages: List[SystemMessage], model_client: ChatCompletionClient, model_client_stream: bool, model_context: ChatCompletionContext, agent_name: str, inner_messages: List[BaseAgentEvent | BaseChatMessage], ) -> AsyncGenerator[Response | ModelClientStreamingChunkEvent | ThoughtEvent, None]: """ 如果 reflect_on_code_block_results=True,我们会基于工具结果进行另一次推理 并生成最终的文本响应(或流式数据块)。 """ all_messages = system_messages + await model_context.get_messages() llm_messages = cls._get_compatible_context(model_client=model_client, messages=all_messages) reflection_result: Optional[CreateResult] = None if model_client_stream: async for chunk in model_client.create_stream(llm_messages): if isinstance(chunk, CreateResult): reflection_result = chunk elif isinstance(chunk, str): yield ModelClientStreamingChunkEvent(content=chunk, source=agent_name) else: raise RuntimeError(f"Invalid chunk type: {type(chunk)}") else: reflection_result = await model_client.create(llm_messages) if not reflection_result or not isinstance(reflection_result.content, str): raise RuntimeError("Reflect on tool use produced no valid text response.") # --- NEW: If the reflection produced a thought, yield it --- if reflection_result.thought: thought_event = ThoughtEvent(content=reflection_result.thought, source=agent_name) yield thought_event inner_messages.append(thought_event) # Add to context (including thought if present) await model_context.add_message( AssistantMessage( content=reflection_result.content, source=agent_name, thought=getattr(reflection_result, "thought", None), ) ) yield Response( chat_message=TextMessage( content=reflection_result.content, source=agent_name, models_usage=reflection_result.usage, ), inner_messages=inner_messages, )