autogen_core._cancellation_token 源代码

import threading
from asyncio import Future
from typing import Any, Callable, List


[文档] class CancellationToken: """用于取消待处理异步调用的令牌""" def __init__(self) -> None: self._cancelled: bool = False self._lock: threading.Lock = threading.Lock() self._callbacks: List[Callable[[], None]] = []
[文档] def cancel(self) -> None: """取消与此取消令牌关联的待处理异步调用""" with self._lock: if not self._cancelled: self._cancelled = True for callback in self._callbacks: callback()
[文档] def is_cancelled(self) -> bool: """检查 CancellationToken 是否已被使用""" with self._lock: return self._cancelled
[文档] def add_callback(self, callback: Callable[[], None]) -> None: """附加一个回调函数,当调用取消操作时将被触发""" with self._lock: if self._cancelled: callback() else: self._callbacks.append(callback)