autogen_core._cancellation_token 源代码
import threading
from asyncio import Future
from typing import Any, Callable, List
[文档]
class CancellationToken:
"""用于取消待处理异步调用的令牌"""
def __init__(self) -> None:
self._cancelled: bool = False
self._lock: threading.Lock = threading.Lock()
self._callbacks: List[Callable[[], None]] = []
[文档]
def cancel(self) -> None:
"""取消与此取消令牌关联的待处理异步调用"""
with self._lock:
if not self._cancelled:
self._cancelled = True
for callback in self._callbacks:
callback()
[文档]
def is_cancelled(self) -> bool:
"""检查 CancellationToken 是否已被使用"""
with self._lock:
return self._cancelled
[文档]
def add_callback(self, callback: Callable[[], None]) -> None:
"""附加一个回调函数,当调用取消操作时将被触发"""
with self._lock:
if self._cancelled:
callback()
else:
self._callbacks.append(callback)
[文档]
def link_future(self, future: Future[Any]) -> Future[Any]:
"""将待处理的异步调用与令牌关联,以允许其被取消"""
with self._lock:
if self._cancelled:
future.cancel()
else:
def _cancel() -> None:
future.cancel()
self._callbacks.append(_cancel)
return future