autogen_core._cache_store 源代码
from abc import ABC, abstractmethod
from typing import Dict, Generic, Optional, TypeVar
from pydantic import BaseModel
from typing_extensions import Self
from ._component_config import Component, ComponentBase
T = TypeVar("T")
[文档]
class CacheStore(ABC, Generic[T], ComponentBase[BaseModel]):
"""
该协议定义了存储/缓存操作的基本接口。
子类应处理底层存储的生命周期。
"""
component_type = "cache_store"
[文档]
@abstractmethod
def get(self, key: str, default: Optional[T] = None) -> Optional[T]:
"""
从存储中检索一个项目。
Args:
key: 标识存储中项目的键。
default (optional): 如果未找到键时返回的默认值。
默认为 None。
Returns:
如果找到则返回与键关联的值,否则返回默认值。
"""
...
[文档]
@abstractmethod
def set(self, key: str, value: T) -> None:
"""
在存储中设置一个项目。
Args:
key: 用于存储项目的键。
value: 要存储在存储中的值。
"""
...
class InMemoryStoreConfig(BaseModel):
pass
[文档]
class InMemoryStore(CacheStore[T], Component[InMemoryStoreConfig]):
component_provider_override = "autogen_core.InMemoryStore"
component_config_schema = InMemoryStoreConfig
def __init__(self) -> None:
self.store: Dict[str, T] = {}
[文档]
def get(self, key: str, default: Optional[T] = None) -> Optional[T]:
return self.store.get(key, default)
[文档]
def set(self, key: str, value: T) -> None:
self.store[key] = value
[文档]
def _to_config(self) -> InMemoryStoreConfig:
return InMemoryStoreConfig()
[文档]
@classmethod
def _from_config(cls, config: InMemoryStoreConfig) -> Self:
return cls()