autogen_agentchat.base._task 源代码
from typing import AsyncGenerator, Protocol, Sequence
from autogen_core import CancellationToken
from pydantic import BaseModel
from ..messages import BaseAgentEvent, BaseChatMessage
[文档]
class TaskResult(BaseModel):
"""运行任务的结果。"""
messages: Sequence[BaseAgentEvent | BaseChatMessage]
"""任务产生的消息。"""
stop_reason: str | None = None
"""任务停止的原因。"""
[文档]
class TaskRunner(Protocol):
"""一个任务运行器。"""
[文档]
async def run(
self,
*,
task: str | BaseChatMessage | Sequence[BaseChatMessage] | None = None,
cancellation_token: CancellationToken | None = None,
) -> TaskResult:
"""运行任务并返回结果。
任务可以是字符串、单条消息或消息序列。
运行器是有状态的,后续调用此方法将从上次调用的中断处继续执行。
如果未指定任务,运行器将继续执行当前任务。"""
...
[文档]
def run_stream(
self,
*,
task: str | BaseChatMessage | Sequence[BaseChatMessage] | None = None,
cancellation_token: CancellationToken | None = None,
) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | TaskResult, None]:
"""运行任务并生成消息流,最终结果以 :class:`TaskResult` 作为流中的最后一项。
任务可以是字符串、单条消息或消息序列。
运行器是有状态的,后续调用此方法将从上次调用的中断处继续执行。
如果未指定任务,运行器将继续执行当前任务。"""
...