autogen_core._cache_store 源代码

from abc import ABC, abstractmethod
from typing import Dict, Generic, Optional, TypeVar

from pydantic import BaseModel
from typing_extensions import Self

from ._component_config import Component, ComponentBase

T = TypeVar("T")


[文档] class CacheStore(ABC, Generic[T], ComponentBase[BaseModel]): """ 该协议定义了存储/缓存操作的基本接口。 子类应处理底层存储的生命周期。 """ component_type = "cache_store"
[文档] @abstractmethod def get(self, key: str, default: Optional[T] = None) -> Optional[T]: """ 从存储中检索一个项目。 Args: key: 标识存储中项目的键。 default (optional): 如果未找到键时返回的默认值。 默认为 None。 Returns: 如果找到则返回与键关联的值,否则返回默认值。 """ ...
[文档] @abstractmethod def set(self, key: str, value: T) -> None: """ 在存储中设置一个项目。 Args: key: 用于存储项目的键。 value: 要存储在存储中的值。 """ ...
class InMemoryStoreConfig(BaseModel): pass
[文档] class InMemoryStore(CacheStore[T], Component[InMemoryStoreConfig]): component_provider_override = "autogen_core.InMemoryStore" component_config_schema = InMemoryStoreConfig def __init__(self) -> None: self.store: Dict[str, T] = {}
[文档] def get(self, key: str, default: Optional[T] = None) -> Optional[T]: return self.store.get(key, default)
[文档] def set(self, key: str, value: T) -> None: self.store[key] = value
[文档] def _to_config(self) -> InMemoryStoreConfig: return InMemoryStoreConfig()
[文档] @classmethod def _from_config(cls, config: InMemoryStoreConfig) -> Self: return cls()