import logging
import re
from typing import (
AsyncGenerator,
List,
Optional,
Sequence,
Union,
)
from autogen_core import CancellationToken, Component, ComponentModel
from autogen_core.code_executor import CodeBlock, CodeExecutor, CodeResult
from autogen_core.model_context import (
ChatCompletionContext,
UnboundedChatCompletionContext,
)
from autogen_core.models import (
AssistantMessage,
ChatCompletionClient,
CreateResult,
LLMMessage,
SystemMessage,
UserMessage,
)
from pydantic import BaseModel
from typing_extensions import Self
from .. import EVENT_LOGGER_NAME
from ..base import Response
from ..messages import (
BaseAgentEvent,
BaseChatMessage,
CodeExecutionEvent,
CodeGenerationEvent,
HandoffMessage,
ModelClientStreamingChunkEvent,
TextMessage,
ThoughtEvent,
)
from ..utils import remove_images
from ._base_chat_agent import BaseChatAgent
event_logger = logging.getLogger(EVENT_LOGGER_NAME)
class CodeExecutorAgentConfig(BaseModel):
"""CodeExecutorAgent 的配置"""
name: str
code_executor: ComponentModel
model_client: ComponentModel | None = None
description: str | None = None
sources: List[str] | None = None
system_message: str | None = None
model_client_stream: bool = False
model_context: ComponentModel | None = None
class RetryDecision(BaseModel):
reason: str
retry: bool
class CodeExecutorAgent(BaseChatAgent, Component[CodeExecutorAgentConfig]):
"""(实验性) 一个根据用户指令生成并执行代码片段的代理。
.. note::
该代理是实验性的,可能在未来的版本中发生变化。
通常与另一个生成待执行代码片段的代理在团队中配合使用,或单独使用(需提供 `model_client`)以便能够根据用户查询生成代码、执行代码并反思代码结果。
当与 `model_client` 配合使用时,它将使用模型生成代码片段并通过提供的 `code_executor` 执行。模型还会对代码执行结果进行反思。代理会将模型的最终反思结果作为最终响应返回。
当不配合 `model_client` 使用时,它仅执行在 :class:`~autogen_agentchat.messages.TextMessage` 消息中找到的代码块,并返回代码执行输出。
.. note::
使用 :class:`~autogen_agentchat.agents.AssistantAgent` 配合
:class:`~autogen_ext.tools.code_execution.PythonCodeExecutionTool`
可作为该代理的替代方案。但该代理的模型需要生成正确转义的代码字符串作为工具参数。
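As a minimal sketch of that alternative (not part of this agent's API): it assumes that :class:`~autogen_ext.tools.code_execution.PythonCodeExecutionTool` wraps a code executor instance and that a local executor is available under `autogen_ext.code_executors.local`; adapt names and parameters to your setup.
.. code-block:: python

    import asyncio

    from autogen_agentchat.agents import AssistantAgent
    from autogen_ext.code_executors.local import LocalCommandLineCodeExecutor
    from autogen_ext.models.openai import OpenAIChatCompletionClient
    from autogen_ext.tools.code_execution import PythonCodeExecutionTool


    async def main() -> None:
        model_client = OpenAIChatCompletionClient(model="gpt-4o")
        # Assumed API: the tool wraps a code executor instance.
        tool = PythonCodeExecutionTool(LocalCommandLineCodeExecutor(work_dir="coding"))
        agent = AssistantAgent("assistant", model_client=model_client, tools=[tool])
        result = await agent.run(task="Write python code to print Hello World!")
        print(result.messages[-1])


    asyncio.run(main())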
Args:
name (str): The name of the agent.
code_executor (CodeExecutor): The code executor responsible for executing code received in messages
(:py:class:`~autogen_ext.code_executors.docker.DockerCommandLineCodeExecutor` is recommended; see the examples below).
model_client (ChatCompletionClient, optional): The model client to use for inference and generating code.
If not provided, the agent will only execute code blocks found in input messages.
Currently, the model must support structured output mode, which is required for the automatic retry mechanism to work.
model_client_stream (bool, optional): If `True`, the model client will be used in streaming mode.
The :meth:`on_messages_stream` and :meth:`BaseChatAgent.run_stream` methods will
also yield :class:`~autogen_agentchat.messages.ModelClientStreamingChunkEvent`
messages as the model client produces chunks of the response. Defaults to `False`.
description (str, optional): The description of the agent. If not provided,
:class:`~autogen_agentchat.agents.CodeExecutorAgent.DEFAULT_AGENT_DESCRIPTION` is used when a `model_client` is given, otherwise :class:`~autogen_agentchat.agents.CodeExecutorAgent.DEFAULT_TERMINAL_DESCRIPTION`.
system_message (str, optional): The system message for the model. If provided, it is prepended to the messages in the model context when making an inference. Set to `None` to disable.
Defaults to :class:`~autogen_agentchat.agents.CodeExecutorAgent.DEFAULT_SYSTEM_MESSAGE`. This is only used if a `model_client` is provided.
sources (Sequence[str], optional): Check only messages from the specified agents for code to execute.
This is useful when the agent is part of a group chat and you want to limit code execution to messages from specific agents.
If not provided, all messages will be checked for code blocks.
This is only used if a `model_client` is not provided.
max_retries_on_error (int, optional): The maximum number of retries on error. If code execution fails, the agent will retry up to this many times.
If the code execution still fails after that many retries, the agent will yield a reflection result. See the retry example at the end of this docstring.
.. note::
It is recommended that the `CodeExecutorAgent` uses a Docker container to execute code. This ensures that model-generated code runs in an isolated environment. To use Docker, your environment must have Docker installed and running.
Follow the installation instructions for `Docker <https://docs.docker.com/get-docker/>`_.
.. note::
The code executor only processes code that is properly formatted in markdown code blocks using triple backticks.
For example:
.. code-block:: text
```python
print("Hello World")
```
# or
```sh
echo "Hello World"
```
In this example, we show how to set up a `CodeExecutorAgent` that uses the :py:class:`~autogen_ext.code_executors.docker.DockerCommandLineCodeExecutor`
to execute code snippets in a Docker container. The `work_dir` parameter indicates where all executed files are first saved locally before being executed in the Docker container.
.. code-block:: python
import asyncio
from autogen_agentchat.agents import CodeExecutorAgent
from autogen_agentchat.messages import TextMessage
from autogen_ext.code_executors.docker import DockerCommandLineCodeExecutor
from autogen_core import CancellationToken
async def run_code_executor_agent() -> None:
# Create an agent that executes code in a Docker container.
code_executor = DockerCommandLineCodeExecutor(work_dir="coding")
await code_executor.start()
code_executor_agent = CodeExecutorAgent("code_executor", code_executor=code_executor)
# Run the agent with a given code snippet.
task = TextMessage(
content='''Here is some code
```python
print('Hello world')
```
''',
source="user",
)
response = await code_executor_agent.on_messages([task], CancellationToken())
print(response.chat_message)
# Stop the code executor.
await code_executor.stop()
asyncio.run(run_code_executor_agent())
In this example, we show how to set up a `CodeExecutorAgent` that uses :py:class:`~docker.types.DeviceRequest` to expose a GPU to the container for CUDA-accelerated code execution.
.. code-block:: python
import asyncio
from autogen_agentchat.agents import CodeExecutorAgent
from autogen_agentchat.messages import TextMessage
from autogen_ext.code_executors.docker import DockerCommandLineCodeExecutor
from autogen_core import CancellationToken
from docker.types import DeviceRequest
async def run_code_executor_agent() -> None:
# Create an agent that executes code in a Docker container.
code_executor = DockerCommandLineCodeExecutor(
work_dir="coding", device_requests=[DeviceRequest(count=-1, capabilities=[["gpu"]])]
)
await code_executor.start()
code_executor_agent = CodeExecutorAgent("code_executor", code_executor=code_executor)
# Display the GPU information.
task = TextMessage(
content='''Here is some code
```bash
nvidia-smi
```
''',
source="user",
)
response = await code_executor_agent.on_messages([task], CancellationToken())
print(response.chat_message)
# Stop the code executor.
await code_executor.stop()
asyncio.run(run_code_executor_agent())
In the following example, we show how to set up a `CodeExecutorAgent` without the `model_client` parameter, used to execute code blocks generated by other agents in a group chat using the :py:class:`~autogen_ext.code_executors.docker.DockerCommandLineCodeExecutor`.
.. code-block:: python
import asyncio
from autogen_ext.code_executors.docker import DockerCommandLineCodeExecutor
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.agents import AssistantAgent, CodeExecutorAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.ui import Console
termination_condition = MaxMessageTermination(3)
async def main() -> None:
model_client = OpenAIChatCompletionClient(model="gpt-4o")
# Define the Docker CLI code executor.
code_executor = DockerCommandLineCodeExecutor(work_dir="coding")
# Start the execution container.
await code_executor.start()
code_executor_agent = CodeExecutorAgent("code_executor_agent", code_executor=code_executor)
coder_agent = AssistantAgent("coder_agent", model_client=model_client)
groupchat = RoundRobinGroupChat(
participants=[coder_agent, code_executor_agent], termination_condition=termination_condition
)
task = "Write python code to print Hello World!"
await Console(groupchat.run_stream(task=task))
# Stop the execution container.
await code_executor.stop()
asyncio.run(main())
.. code-block:: text
---------- user ----------
Write python code to print Hello World!
---------- coder_agent ----------
Certainly! Here's a simple Python code to print "Hello World!":
```python
print("Hello World!")
```
You can run this code in any Python environment to display the message.
---------- code_executor_agent ----------
Hello World!
In the following example, we show how to set up a `CodeExecutorAgent` with a `model_client`, so the agent can generate its own code without the help of any other agent and execute it inside a :py:class:`~autogen_ext.code_executors.docker.DockerCommandLineCodeExecutor`.
.. code-block:: python
import asyncio
from autogen_ext.code_executors.docker import DockerCommandLineCodeExecutor
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.agents import CodeExecutorAgent
from autogen_agentchat.conditions import TextMessageTermination
from autogen_agentchat.ui import Console
termination_condition = TextMessageTermination("code_executor_agent")
async def main() -> None:
model_client = OpenAIChatCompletionClient(model="gpt-4o")
# Define the Docker CLI code executor.
code_executor = DockerCommandLineCodeExecutor(work_dir="coding")
# Start the execution container.
await code_executor.start()
code_executor_agent = CodeExecutorAgent(
"code_executor_agent", code_executor=code_executor, model_client=model_client
)
task = "Write python code to print Hello World!"
await Console(code_executor_agent.run_stream(task=task))
# Stop the execution container.
await code_executor.stop()
asyncio.run(main())
.. code-block:: text
---------- user ----------
Write python code to print Hello World!
---------- code_executor_agent ----------
Certainly! Here is a simple Python code to print "Hello World!" to the console:
```python
print("Hello World!")
```
Let's execute it to confirm the output.
---------- code_executor_agent ----------
Hello World!
---------- code_executor_agent ----------
The code has been executed successfully, and it printed "Hello World!" as expected. If you have any more requests or questions, feel free to ask!
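Finally, a minimal sketch of the retry mechanism referenced in the `max_retries_on_error` argument above: the agent below is allowed up to two automatic retries when the executed code exits with an error. This assumes a model client that supports structured output (the `gpt-4o` client is used here purely for illustration).
.. code-block:: python

    import asyncio

    from autogen_agentchat.agents import CodeExecutorAgent
    from autogen_agentchat.ui import Console
    from autogen_ext.code_executors.docker import DockerCommandLineCodeExecutor
    from autogen_ext.models.openai import OpenAIChatCompletionClient


    async def main() -> None:
        # A model client with structured output support is required for retries.
        model_client = OpenAIChatCompletionClient(model="gpt-4o")

        code_executor = DockerCommandLineCodeExecutor(work_dir="coding")
        await code_executor.start()

        # Allow up to two automatic retries when executed code exits with an error.
        code_executor_agent = CodeExecutorAgent(
            "code_executor_agent",
            code_executor=code_executor,
            model_client=model_client,
            max_retries_on_error=2,
        )

        await Console(code_executor_agent.run_stream(task="Write python code to print Hello World!"))

        await code_executor.stop()


    asyncio.run(main())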
"""
DEFAULT_TERMINAL_DESCRIPTION = "A computer terminal that performs no other action than running Python scripts (provided to it quoted in ```python code blocks), or sh shell scripts (provided to it quoted in ```sh code blocks)."
DEFAULT_AGENT_DESCRIPTION = "A Code Execution Agent that generates and executes Python and shell scripts based on user instructions. It ensures correctness, efficiency, and minimal errors while gracefully handling edge cases."
DEFAULT_SYSTEM_MESSAGE = "You are a Code Execution Agent. Your role is to generate and execute Python code and shell scripts based on user instructions, ensuring correctness, efficiency, and minimal errors. Handle edge cases gracefully. Python code should be provided in ```python code blocks, and sh shell scripts should be provided in ```sh code blocks for execution."
NO_CODE_BLOCKS_FOUND_MESSAGE = "No code blocks found in the thread. Please provide at least one markdown-encoded code block to execute (i.e., quoting code in ```python or ```sh code blocks)."
component_config_schema = CodeExecutorAgentConfig
component_provider_override = "autogen_agentchat.agents.CodeExecutorAgent"
def __init__(
self,
name: str,
code_executor: CodeExecutor,
*,
model_client: ChatCompletionClient | None = None,
model_context: ChatCompletionContext | None = None,
model_client_stream: bool = False,
max_retries_on_error: int = 0,
description: str | None = None,
system_message: str | None = DEFAULT_SYSTEM_MESSAGE,
sources: Sequence[str] | None = None,
) -> None:
if description is None:
if model_client is None:
description = CodeExecutorAgent.DEFAULT_TERMINAL_DESCRIPTION
else:
description = CodeExecutorAgent.DEFAULT_AGENT_DESCRIPTION
super().__init__(name=name, description=description)
self._code_executor = code_executor
self._sources = sources
self._model_client_stream = model_client_stream
self._max_retries_on_error = max_retries_on_error
self._model_client: ChatCompletionClient | None = model_client
if model_context is not None:
self._model_context = model_context
else:
self._model_context = UnboundedChatCompletionContext()
self._system_messages: List[SystemMessage] = []
if system_message is None:
self._system_messages = []
else:
self._system_messages = [SystemMessage(content=system_message)]
if self._max_retries_on_error > 0:
if not self._model_client or not self._model_client.model_info:
raise ValueError("model_client.model_info must be provided when max_retries_on_error > 0")
if not self._model_client.model_info["structured_output"]:
raise ValueError("Specified model_client doesn't support structured output mode.")
@property
def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:
"""代码执行器代理产生的消息类型。"""
return (TextMessage,)
@property
def model_context(self) -> ChatCompletionContext:
"""
The model context in use by the agent.
"""
return self._model_context
async def on_messages(self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken) -> Response:
async for message in self.on_messages_stream(messages, cancellation_token):
if isinstance(message, Response):
return message
raise AssertionError("The stream should have returned the final result.")
async def on_messages_stream(
self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken
) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | Response, None]:
"""
Process the incoming messages with the agent and yield events/responses as they occur.
"""
# Gather all relevant state here
agent_name = self.name
model_context = self._model_context
system_messages = self._system_messages
model_client = self._model_client
model_client_stream = self._model_client_stream
max_retries_on_error = self._max_retries_on_error
execution_result: CodeResult | None = None
if model_client is None: # default behaviour for backward compatibility
# execute generated code if present
code_blocks: List[CodeBlock] = await self.extract_code_blocks_from_messages(messages)
if not code_blocks:
yield Response(
chat_message=TextMessage(
content=self.NO_CODE_BLOCKS_FOUND_MESSAGE,
source=agent_name,
)
)
return
execution_result = await self.execute_code_block(code_blocks, cancellation_token)
yield Response(chat_message=TextMessage(content=execution_result.output, source=self.name))
return
inner_messages: List[BaseAgentEvent | BaseChatMessage] = []
for nth_try in range(max_retries_on_error + 1): # Do one default generation, execution and inference loop
# Step 1: Add new user/handoff messages to the model context
await self._add_messages_to_context(
model_context=model_context,
messages=messages,
)
# Step 2: Run inference with the model context
model_result = None
async for inference_output in self._call_llm(
model_client=model_client,
model_client_stream=model_client_stream,
system_messages=system_messages,
model_context=model_context,
agent_name=agent_name,
cancellation_token=cancellation_token,
):
if isinstance(inference_output, CreateResult):
model_result = inference_output
else:
# Streaming chunk event
yield inference_output
assert model_result is not None, "No model result was produced."
# Step 3: [NEW] If the model produced a hidden "thought," yield it as an event
if model_result.thought:
thought_event = ThoughtEvent(content=model_result.thought, source=agent_name)
yield thought_event
inner_messages.append(thought_event)
# Step 4: Add the assistant message to the model context (including thought if present)
await model_context.add_message(
AssistantMessage(
content=model_result.content,
source=agent_name,
thought=getattr(model_result, "thought", None),
)
)
# Step 5: Extract the code blocks from inferred text
assert isinstance(model_result.content, str), "Expected inferred model_result.content to be of type str."
code_blocks = self._extract_markdown_code_blocks(str(model_result.content))
# Step 6: Exit the loop if no code blocks found
if not code_blocks:
yield Response(
chat_message=TextMessage(
content=str(model_result.content),
source=agent_name,
)
)
return
# Step 7: Yield a CodeGenerationEvent
inferred_text_message: CodeGenerationEvent = CodeGenerationEvent(
retry_attempt=nth_try,
content=model_result.content,
code_blocks=code_blocks,
source=agent_name,
)
yield inferred_text_message
# Step 8: Execute the extracted code blocks
execution_result = await self.execute_code_block(inferred_text_message.code_blocks, cancellation_token)
# Step 9: Update model context with the code execution result
await model_context.add_message(
UserMessage(
content=execution_result.output,
source=agent_name,
)
)
# Step 10: Yield a CodeExecutionEvent
yield CodeExecutionEvent(retry_attempt=nth_try, result=execution_result, source=self.name)
# If execution was successful or last retry, then exit
if execution_result.exit_code == 0 or nth_try == max_retries_on_error:
break
# Step 11: If exit code is non-zero and retries are available then
# make an inference asking if we should retry or not
chat_context = await model_context.get_messages()
retry_prompt = (
f"The most recent code execution resulted in an error:\n{execution_result.output}\n\n"
"Should we attempt to resolve it? Please respond with:\n"
"- A boolean value for 'retry' indicating whether it should be retried.\n"
"- A detailed explanation in 'reason' that identifies the issue, justifies your decision to retry or not, and outlines how you would resolve the error if a retry is attempted."
)
chat_context = chat_context + [
UserMessage(
content=retry_prompt,
source=agent_name,
)
]
response = await model_client.create(messages=chat_context, json_output=RetryDecision)
assert isinstance(
response.content, str
), "Expected structured response for retry decision to be of type str."
should_retry_generation = RetryDecision.model_validate_json(str(response.content))
# Exit if no-retry is needed
if not should_retry_generation.retry:
break
yield CodeGenerationEvent(
retry_attempt=nth_try,
content=f"Attempt number: {nth_try + 1}\nProposed correction: {should_retry_generation.reason}",
code_blocks=[],
source=agent_name,
)
# Always reflect on the execution result
async for reflection_response in CodeExecutorAgent._reflect_on_code_block_results_flow(
system_messages=system_messages,
model_client=model_client,
model_client_stream=model_client_stream,
model_context=model_context,
agent_name=agent_name,
inner_messages=inner_messages,
):
yield reflection_response # Last reflection_response is of type Response so it will finish the routine
async def execute_code_block(
self, code_blocks: List[CodeBlock], cancellation_token: CancellationToken
) -> CodeResult:
# Execute the code blocks.
result = await self._code_executor.execute_code_blocks(code_blocks, cancellation_token=cancellation_token)
if result.output.strip() == "":
# No output
result.output = f"The script ran but produced no output to console. The POSIX exit code was: {result.exit_code}. If you were expecting output, consider revising the script to ensure content is printed to stdout."
elif result.exit_code != 0:
# Error
result.output = f"The script ran, then exited with an error (POSIX exit code: {result.exit_code})\nIts output was:\n{result.output}"
return result
async def on_reset(self, cancellation_token: CancellationToken) -> None:
"""这是一个空操作,因为代码执行代理没有可变状态。"""
pass
def _extract_markdown_code_blocks(self, markdown_text: str) -> List[CodeBlock]:
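# Match fenced code blocks: an optional language tag after the opening backticks, then the code body.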
pattern = re.compile(r"```(?:\s*([\w\+\-]+))?\n([\s\S]*?)```")
matches = pattern.findall(markdown_text)
code_blocks: List[CodeBlock] = []
for match in matches:
language = match[0].strip() if match[0] else ""
code_content = match[1]
code_blocks.append(CodeBlock(code=code_content, language=language))
return code_blocks
def _to_config(self) -> CodeExecutorAgentConfig:
return CodeExecutorAgentConfig(
name=self.name,
model_client=(self._model_client.dump_component() if self._model_client is not None else None),
code_executor=self._code_executor.dump_component(),
description=self.description,
sources=list(self._sources) if self._sources is not None else None,
system_message=(
self._system_messages[0].content
if self._system_messages and isinstance(self._system_messages[0].content, str)
else None
),
model_client_stream=self._model_client_stream,
model_context=self._model_context.dump_component(),
)
@classmethod
def _from_config(cls, config: CodeExecutorAgentConfig) -> Self:
return cls(
name=config.name,
model_client=(
ChatCompletionClient.load_component(config.model_client) if config.model_client is not None else None
),
code_executor=CodeExecutor.load_component(config.code_executor),
description=config.description,
sources=config.sources,
system_message=config.system_message,
model_client_stream=config.model_client_stream,
model_context=ChatCompletionContext.load_component(config.model_context) if config.model_context else None,
)
@staticmethod
def _get_compatible_context(model_client: ChatCompletionClient, messages: List[LLMMessage]) -> Sequence[LLMMessage]:
"""确保消息与底层客户端兼容,必要时移除图像。"""
if model_client.model_info["vision"]:
return messages
else:
return remove_images(messages)
@classmethod
async def _call_llm(
cls,
model_client: ChatCompletionClient,
model_client_stream: bool,
system_messages: List[SystemMessage],
model_context: ChatCompletionContext,
agent_name: str,
cancellation_token: CancellationToken,
) -> AsyncGenerator[Union[CreateResult, ModelClientStreamingChunkEvent], None]:
"""
Perform a model inference and yield either streaming chunk events or the final CreateResult.
"""
all_messages = await model_context.get_messages()
llm_messages = cls._get_compatible_context(model_client=model_client, messages=system_messages + all_messages)
if model_client_stream:
model_result: Optional[CreateResult] = None
async for chunk in model_client.create_stream(
llm_messages, tools=[], cancellation_token=cancellation_token
):
if isinstance(chunk, CreateResult):
model_result = chunk
elif isinstance(chunk, str):
yield ModelClientStreamingChunkEvent(content=chunk, source=agent_name)
else:
raise RuntimeError(f"Invalid chunk type: {type(chunk)}")
if model_result is None:
raise RuntimeError("No final model result in streaming mode.")
yield model_result
else:
model_result = await model_client.create(llm_messages, tools=[], cancellation_token=cancellation_token)
yield model_result
@staticmethod
async def _add_messages_to_context(
model_context: ChatCompletionContext,
messages: Sequence[BaseChatMessage],
) -> None:
"""
Add incoming messages to the model context.
"""
for msg in messages:
if isinstance(msg, HandoffMessage):
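# Add any LLM messages carried in the handoff context before the handoff message itself.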
for llm_msg in msg.context:
await model_context.add_message(llm_msg)
await model_context.add_message(msg.to_model_message())
@classmethod
async def _reflect_on_code_block_results_flow(
cls,
system_messages: List[SystemMessage],
model_client: ChatCompletionClient,
model_client_stream: bool,
model_context: ChatCompletionContext,
agent_name: str,
inner_messages: List[BaseAgentEvent | BaseChatMessage],
) -> AsyncGenerator[Response | ModelClientStreamingChunkEvent | ThoughtEvent, None]:
"""
Perform another inference based on the code execution results
and yield the final text response (or streaming chunks).
"""
all_messages = system_messages + await model_context.get_messages()
llm_messages = cls._get_compatible_context(model_client=model_client, messages=all_messages)
reflection_result: Optional[CreateResult] = None
if model_client_stream:
async for chunk in model_client.create_stream(llm_messages):
if isinstance(chunk, CreateResult):
reflection_result = chunk
elif isinstance(chunk, str):
yield ModelClientStreamingChunkEvent(content=chunk, source=agent_name)
else:
raise RuntimeError(f"Invalid chunk type: {type(chunk)}")
else:
reflection_result = await model_client.create(llm_messages)
if not reflection_result or not isinstance(reflection_result.content, str):
raise RuntimeError("Reflect on tool use produced no valid text response.")
# --- NEW: If the reflection produced a thought, yield it ---
if reflection_result.thought:
thought_event = ThoughtEvent(content=reflection_result.thought, source=agent_name)
yield thought_event
inner_messages.append(thought_event)
# Add to context (including thought if present)
await model_context.add_message(
AssistantMessage(
content=reflection_result.content,
source=agent_name,
thought=getattr(reflection_result, "thought", None),
)
)
yield Response(
chat_message=TextMessage(
content=reflection_result.content,
source=agent_name,
models_usage=reflection_result.usage,
),
inner_messages=inner_messages,
)