Compare commits
develop...erikj/tree (13 commits)
| Author | SHA1 | Date |
|---|---|---|
|  | 5de571987e |  |
|  | 129691f190 |  |
|  | 462db2a171 |  |
|  | 7f7b36d56d |  |
|  | 057ae8b61c |  |
|  | 7aceec3ed9 |  |
|  | cad555f07c |  |
|  | 23c2f394a5 |  |
|  | 602a81f5a2 |  |
|  | f046366d2a |  |
|  | a22716c5c5 |  |
|  | 40a8fba5f6 |  |
|  | 326a175987 |  |
changelog.d/13292.misc (new file, 1 line)

```diff
@@ -0,0 +1 @@
+Make `DictionaryCache` expire full entries if they haven't been queried in a while, even if specific keys have been queried recently.
```
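To make the changelog entry concrete, here is a minimal, illustrative sketch of the behaviour it describes, using the `DictionaryCache` API as it appears in the diffs below (`get` accepting an optional `dict_keys` argument); the cache instance and key names are invented for the example:

```python
# Illustrative sketch only; instance and key names are invented.
from synapse.util.caches.dictionary_cache import DictionaryCache

cache: DictionaryCache[str, str, str] = DictionaryCache("example", max_entries=100)

# Repeated lookups of specific keys only touch the per-key cache entries...
entry = cache.get("some_key", dict_keys=["a", "b"])

# ...so the separate full-dict entry's recency is not refreshed (the new
# code even reads it with update_last_access=False), and the LruCache can
# expire the full dict while "a" and "b" stay cached.
```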
```diff
@@ -202,7 +202,14 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
         requests state from the cache, if False we need to query the DB for the
         missing state.
         """
-        cache_entry = cache.get(group)
+        # If we are asked explicitly for a subset of keys, we only ask for those
+        # from the cache. This ensures that the `DictionaryCache` can make
+        # better decisions about what to cache and what to expire.
+        dict_keys = None
+        if not state_filter.has_wildcards():
+            dict_keys = state_filter.concrete_types()
+
+        cache_entry = cache.get(group, dict_keys=dict_keys)
         state_dict_ids = cache_entry.value

         if cache_entry.full or state_filter.is_full():
```
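For readers unfamiliar with `StateFilter`, a hedged sketch of what `has_wildcards()` and `concrete_types()` return, assuming the semantics implied by this hunk; the import path and example types are illustrative and may vary between Synapse versions:

```python
# Illustrative only; the import location differs across Synapse versions.
from synapse.storage.state import StateFilter

f = StateFilter.from_types([("m.room.member", "@user:test")])
f.has_wildcards()   # False: every (type, state_key) pair is fully specified
f.concrete_types()  # [("m.room.member", "@user:test")] -> used as dict_keys

# A filter with wildcards (e.g. "all members of the room") leaves dict_keys
# as None, so the full dict is requested from the cache instead.
```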
```diff
@@ -14,11 +14,13 @@
 import enum
 import logging
 import threading
-from typing import Any, Dict, Generic, Iterable, Optional, Set, TypeVar
+from typing import Any, Dict, Generic, Iterable, Optional, Set, Tuple, TypeVar, Union

 import attr
+from typing_extensions import Literal

 from synapse.util.caches.lrucache import LruCache
+from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_items

 logger = logging.getLogger(__name__)

```
```diff
@@ -53,20 +55,67 @@ class DictionaryEntry: # should be: Generic[DKT, DV].
         return len(self.value)


+class _FullCacheKey(enum.Enum):
+    """The key we use to cache the full dict."""
+
+    KEY = object()
+
+
 class _Sentinel(enum.Enum):
     # defining a sentinel in this way allows mypy to correctly handle the
     # type of a dictionary lookup.
     sentinel = object()


+class _PerKeyValue(Generic[DV]):
+    """The cached value of a dictionary key. If `value` is the sentinel,
+    indicates that the requested key is known to *not* be in the full dict.
+    """
+
+    __slots__ = ["value"]
+
+    def __init__(self, value: Union[DV, Literal[_Sentinel.sentinel]]) -> None:
+        self.value = value
+
+    def __len__(self) -> int:
+        # We add a `__len__` implementation as we use this class in a cache
+        # where the values are variable length.
+        return 1
+
+
 class DictionaryCache(Generic[KT, DKT, DV]):
     """Caches key -> dictionary lookups, supporting caching partial dicts, i.e.
     fetching a subset of dictionary keys for a particular key.
     """

     def __init__(self, name: str, max_entries: int = 1000):
-        self.cache: LruCache[KT, DictionaryEntry] = LruCache(
-            max_size=max_entries, cache_name=name, size_callback=len
+        # We use a single cache to cache two different types of entries:
+        #   1. Map from (key, dict_key) -> dict value (or sentinel, indicating
+        #      the key doesn't exist in the dict); and
+        #   2. Map from (key, _FullCacheKey.KEY) -> full dict.
+        #
+        # The former is used when explicit keys of the dictionary are looked up,
+        # and the latter when the full dictionary is requested.
+        #
+        # If when explicit keys are requested and not in the cache, we then look
+        # to see if we have the full dict and use that if we do. If found in the
+        # full dict each key is added into the cache.
+        #
+        # This set up allows the `LruCache` to prune the full dict entries if
+        # they haven't been used in a while, even when there have been recent
+        # queries for subsets of the dict.
+        #
+        # Typing:
+        #   * A key of `(KT, DKT)` has a value of `_PerKeyValue`
+        #   * A key of `(KT, _FullCacheKey.KEY)` has a value of `Dict[DKT, DV]`
+        self.cache: LruCache[
+            Tuple[KT, Union[DKT, Literal[_FullCacheKey.KEY]]],
+            Union[_PerKeyValue, Dict[DKT, DV]],
+        ] = LruCache(
+            max_size=max_entries,
+            cache_name=name,
+            cache_type=TreeCache,
+            size_callback=len,
         )

         self.name = name
```
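As a reading aid, a sketch (not part of the change) of what the comment block above implies the underlying `LruCache` holds for one cache key `K` once a few lookups have happened; the concrete values are invented:

```python
# Hypothetical cache contents for a single DictionaryCache key K:
#
#   (K, "a")               -> _PerKeyValue("value_a")           # key known present
#   (K, "b")               -> _PerKeyValue(_Sentinel.sentinel)  # key known absent
#   (K, _FullCacheKey.KEY) -> {"a": "value_a", ...}             # the full dict
#
# Each line is an independent LruCache entry backed by a TreeCache, so the
# full-dict entry can be evicted on its own, and del_multi((K,)) can drop
# the whole subtree for K in one operation.
```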
```diff
@@ -96,28 +145,107 @@ class DictionaryCache(Generic[KT, DKT, DV]):
         Returns:
             DictionaryEntry
         """
-        entry = self.cache.get(key, _Sentinel.sentinel)
-        if entry is not _Sentinel.sentinel:
-            if dict_keys is None:
-                return DictionaryEntry(
-                    entry.full, entry.known_absent, dict(entry.value)
-                )
-            else:
-                return DictionaryEntry(
-                    entry.full,
-                    entry.known_absent,
-                    {k: entry.value[k] for k in dict_keys if k in entry.value},
-                )

+        if dict_keys is None:
+            # First we check if we have cached the full dict.
+            entry = self.cache.get((key, _FullCacheKey.KEY), _Sentinel.sentinel)
+            if entry is not _Sentinel.sentinel:
+                assert isinstance(entry, dict)
+                return DictionaryEntry(True, set(), entry)
+
+            # If not, check if we have cached any of dict keys.
+            all_entries = self.cache.get_multi(
+                (key,),
+                _Sentinel.sentinel,
+            )
+            if all_entries is _Sentinel.sentinel:
                 return DictionaryEntry(False, set(), {})
+
+            # If there are entries we need to unwrap the returned cache nodes
+            # and `_PerKeyValue` into the `DictionaryEntry`.
+            values = {}
+            known_absent = set()
+            for dict_key, dict_value in iterate_tree_cache_items((), all_entries):
+                dict_key = dict_key[0]
+                dict_value = dict_value.value
+
+                # We have explicitly looked for a full cache key, so we
+                # shouldn't see one.
+                assert dict_key != _FullCacheKey.KEY
+
+                # ... therefore the values must be `_PerKeyValue`
+                assert isinstance(dict_value, _PerKeyValue)
+
+                if dict_value.value is _Sentinel.sentinel:
+                    known_absent.add(dict_key)
+                else:
+                    values[dict_key] = dict_value.value
+
+            return DictionaryEntry(False, known_absent, values)
+
+        # We are being asked for a subset of keys.
+
+        # First got and check for each requested dict key in the cache, tracking
+        # which we couldn't find.
+        values = {}
+        known_absent = set()
+        missing = set()
+        for dict_key in dict_keys:
+            entry = self.cache.get((key, dict_key), _Sentinel.sentinel)
+            if entry is _Sentinel.sentinel:
+                missing.add(dict_key)
+                continue
+
+            assert isinstance(entry, _PerKeyValue)
+
+            if entry.value is _Sentinel.sentinel:
+                known_absent.add(dict_key)
+            else:
+                values[dict_key] = entry.value
+
+        # If we found everything we can return immediately.
+        if not missing:
+            return DictionaryEntry(False, known_absent, values)
+
+        # If we are missing any keys check if we happen to have the full dict in
+        # the cache.
+        #
+        # We don't update the last access time for this cache fetch, as we
+        # aren't explicitly interested in the full dict and so we don't want
+        # requests for explicit dict keys to keep the full dict in the cache.
+        entry = self.cache.get(
+            (key, _FullCacheKey.KEY),
+            _Sentinel.sentinel,
+            update_last_access=False,
+        )
+        if entry is _Sentinel.sentinel:
+            # Not in the cache, return the subset of keys we found.
+            return DictionaryEntry(False, known_absent, values)
+
+        # We have the full dict!
+        assert isinstance(entry, dict)
+
+        values = {}
+        for dict_key in dict_keys:
+            # We explicitly add each dict key to the cache, so that cache hit
+            # rates for each key can be tracked separately.
+            value = entry.get(dict_key, _Sentinel.sentinel)  # type: ignore[arg-type]
+            self.cache[(key, dict_key)] = _PerKeyValue(value)
+
+            if value is not _Sentinel.sentinel:
+                values[dict_key] = value
+
+        return DictionaryEntry(True, set(), values)

     def invalidate(self, key: KT) -> None:
         self.check_thread()

         # Increment the sequence number so that any SELECT statements that
         # raced with the INSERT don't update the cache (SYN-369)
         self.sequence += 1
-        self.cache.pop(key, None)
+        # Del-multi accepts truncated tuples.
+        self.cache.del_multi((key,))  # type: ignore[arg-type]

     def invalidate_all(self) -> None:
         self.check_thread()
```
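A small worked example of the outcomes the new `get` produces, as a hedged sketch: names are invented, and it assumes key `"K"` was previously populated with `{"a": 1}` out of a fetched key set `{"a", "b"}` (so `(K, "a")` holds a value and `(K, "b")` holds the known-absent sentinel):

```python
# Illustrative sketch, using the update() path shown in the next hunk.
from synapse.util.caches.dictionary_cache import DictionaryCache

cache: DictionaryCache[str, str, int] = DictionaryCache("example")
cache.update(cache.sequence, "K", {"a": 1}, fetched_keys={"a", "b"})

entry = cache.get("K", dict_keys=["a", "b"])
entry.full          # False: the full dict was never consulted
entry.value         # {"a": 1}
entry.known_absent  # {"b"}: fetched earlier but known not to exist
```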
```diff
@@ -149,20 +277,27 @@ class DictionaryCache(Generic[KT, DKT, DV]):
             # Only update the cache if the caches sequence number matches the
             # number that the cache had before the SELECT was started (SYN-369)
             if fetched_keys is None:
-                self._insert(key, value, set())
+                self.cache[(key, _FullCacheKey.KEY)] = value
             else:
-                self._update_or_insert(key, value, fetched_keys)
+                self._update_subset(key, value, fetched_keys)

-    def _update_or_insert(
-        self, key: KT, value: Dict[DKT, DV], known_absent: Iterable[DKT]
+    def _update_subset(
+        self, key: KT, value: Dict[DKT, DV], fetched_keys: Iterable[DKT]
     ) -> None:
-        # We pop and reinsert as we need to tell the cache the size may have
-        # changed
-        entry: DictionaryEntry = self.cache.pop(key, DictionaryEntry(False, set(), {}))
-        entry.value.update(value)
-        entry.known_absent.update(known_absent)
-        self.cache[key] = entry
+        """Add the given dictionary values as explicit keys in the cache.
+
+        Args:
+            key
+            value: The dictionary with all the values that we should cache
+            fetched_keys: The full set of keys that were looked up, any keys
+                here not in `value` should be marked as "known absent".
+        """

-    def _insert(self, key: KT, value: Dict[DKT, DV], known_absent: Set[DKT]) -> None:
-        self.cache[key] = DictionaryEntry(True, known_absent, value)
+        for dict_key, dict_value in value.items():
+            self.cache[(key, dict_key)] = _PerKeyValue(dict_value)
+
+        for dict_key in fetched_keys:
+            if (key, dict_key) in self.cache:
+                continue
+
+            self.cache[(key, dict_key)] = _PerKeyValue(_Sentinel.sentinel)
```
```diff
@@ -44,7 +44,11 @@ from synapse.metrics.background_process_metrics import wrap_as_background_proces
 from synapse.metrics.jemalloc import get_jemalloc_stats
 from synapse.util import Clock, caches
 from synapse.util.caches import CacheMetric, EvictionReason, register_cache
-from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
+from synapse.util.caches.treecache import (
+    TreeCache,
+    TreeCacheNode,
+    iterate_tree_cache_entry,
+)
 from synapse.util.linked_list import ListNode

 if TYPE_CHECKING:
```
```diff
@@ -413,7 +417,7 @@ class LruCache(Generic[KT, VT]):
         else:
             real_clock = clock

-        cache: Union[Dict[KT, _Node[KT, VT]], TreeCache] = cache_type()
+        cache: Union[Dict[KT, _Node[KT, VT]], TreeCache[_Node[KT, VT]]] = cache_type()
         self.cache = cache  # Used for introspection.
         self.apply_cache_factor_from_config = apply_cache_factor_from_config

```
```diff
@@ -537,6 +541,7 @@ class LruCache(Generic[KT, VT]):
             default: Literal[None] = None,
             callbacks: Collection[Callable[[], None]] = ...,
             update_metrics: bool = ...,
+            update_last_access: bool = ...,
         ) -> Optional[VT]:
             ...

```
```diff
@@ -546,6 +551,7 @@ class LruCache(Generic[KT, VT]):
             default: T,
             callbacks: Collection[Callable[[], None]] = ...,
             update_metrics: bool = ...,
+            update_last_access: bool = ...,
         ) -> Union[T, VT]:
             ...

```
```diff
@@ -555,9 +561,26 @@ class LruCache(Generic[KT, VT]):
             default: Optional[T] = None,
             callbacks: Collection[Callable[[], None]] = (),
             update_metrics: bool = True,
+            update_last_access: bool = True,
         ) -> Union[None, T, VT]:
+            """Lookup a key in the cache
+
+            Args:
+                key
+                default
+                callbacks: A collection of callbacks that will fire when the
+                    node is removed from the cache (either due to invalidation
+                    or expiry).
+                update_metrics: Whether to update the hit rate metrics
+                update_last_access: Whether to update the last access metrics
+                    on a node if successfully fetched. These metrics are used
+                    to determine when to remove the node from the cache. Set
+                    to False if this fetch should *not* prevent a node from
+                    being expired.
+                """
             node = cache.get(key, None)
             if node is not None:
+                if update_last_access:
                     move_node_to_front(node)
                 node.add_callbacks(callbacks)
                 if update_metrics and metrics:
```
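A one-line usage sketch of the new flag; the cache instance and key are invented, but the `get` signature is the one defined in this hunk:

```python
# Illustrative: read a value without refreshing its LRU recency, so this
# fetch alone will not keep the entry alive.
from synapse.util.caches.lrucache import LruCache

lru: LruCache[str, int] = LruCache(max_size=10)
lru["key"] = 1
value = lru.get("key", None, update_last_access=False)
```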
```diff
@@ -568,6 +591,42 @@ class LruCache(Generic[KT, VT]):
                 metrics.inc_misses()
             return default

+        @overload
+        def cache_get_multi(
+            key: tuple,
+            default: Literal[None] = None,
+            update_metrics: bool = True,
+        ) -> Union[None, TreeCacheNode]:
+            ...
+
+        @overload
+        def cache_get_multi(
+            key: tuple,
+            default: T,
+            update_metrics: bool = True,
+        ) -> Union[T, TreeCacheNode]:
+            ...
+
+        @synchronized
+        def cache_get_multi(
+            key: tuple,
+            default: Optional[T] = None,
+            update_metrics: bool = True,
+        ) -> Union[None, T, TreeCacheNode]:
+            """Used only for `TreeCache` to fetch a subtree."""
+
+            assert isinstance(cache, TreeCache)
+
+            node = cache.get(key, None)
+            if node is not None:
+                if update_metrics and metrics:
+                    metrics.inc_hits()
+                return node
+            else:
+                if update_metrics and metrics:
+                    metrics.inc_misses()
+                return default
+
         @synchronized
         def cache_set(
             key: KT, value: VT, callbacks: Collection[Callable[[], None]] = ()
```
```diff
@@ -674,6 +733,8 @@ class LruCache(Generic[KT, VT]):
         self.setdefault = cache_set_default
         self.pop = cache_pop
         self.del_multi = cache_del_multi
+        if cache_type is TreeCache:
+            self.get_multi = cache_get_multi
         # `invalidate` is exposed for consistency with DeferredCache, so that it can be
         # invalidated by the cache invalidation replication stream.
         self.invalidate = cache_del_multi
```
```diff
@@ -12,18 +12,59 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-SENTINEL = object()
+from enum import Enum
+from typing import (
+    Any,
+    Dict,
+    Generator,
+    Generic,
+    List,
+    Literal,
+    Optional,
+    Tuple,
+    TypeVar,
+    Union,
+    overload,
+)


-class TreeCacheNode(dict):
+class Sentinel(Enum):
+    sentinel = object()
+
+
+V = TypeVar("V")
+T = TypeVar("T")
+
+
+class TreeCacheNode(Generic[V]):
     """The type of nodes in our tree.

-    Has its own type so we can distinguish it from real dicts that are stored at the
-    leaves.
+    Either a leaf node or a branch node.
     """

+    __slots__ = ["leaf_value", "sub_tree"]

-class TreeCache:
+    def __init__(
+        self,
+        leaf_value: Union[V, Literal[Sentinel.sentinel]] = Sentinel.sentinel,
+        sub_tree: Optional[Dict[Any, "TreeCacheNode[V]"]] = None,
+    ) -> None:
+        if leaf_value is Sentinel.sentinel and sub_tree is None:
+            raise Exception("One of leaf or sub tree must be set")
+
+        self.leaf_value: Union[V, Literal[Sentinel.sentinel]] = leaf_value
+        self.sub_tree: Optional[Dict[Any, "TreeCacheNode[V]"]] = sub_tree
+
+    @staticmethod
+    def leaf(value: V) -> "TreeCacheNode[V]":
+        return TreeCacheNode(leaf_value=value)
+
+    @staticmethod
+    def empty_branch() -> "TreeCacheNode[V]":
+        return TreeCacheNode(sub_tree={})
+
+
+class TreeCache(Generic[V]):
     """
     Tree-based backing store for LruCache. Allows subtrees of data to be deleted
     efficiently.
```
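An illustrative sketch of the two node shapes the rewritten `TreeCacheNode` allows, with constructor behaviour exactly as defined in the hunk above (values are invented):

```python
from synapse.util.caches.treecache import TreeCacheNode

# A node is either a leaf (leaf_value set, sub_tree None) or a branch
# (sub_tree set, leaf_value left as the sentinel).
leaf = TreeCacheNode.leaf("some value")
branch = TreeCacheNode.empty_branch()

# Constructing a node with neither raises, per __init__ above:
# TreeCacheNode()  # -> Exception("One of leaf or sub tree must be set")
```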
```diff
@@ -35,15 +76,15 @@ class TreeCache:

     def __init__(self) -> None:
         self.size: int = 0
-        self.root = TreeCacheNode()
+        self.root: TreeCacheNode[V] = TreeCacheNode.empty_branch()

-    def __setitem__(self, key, value) -> None:
+    def __setitem__(self, key: tuple, value: V) -> None:
         self.set(key, value)

-    def __contains__(self, key) -> bool:
-        return self.get(key, SENTINEL) is not SENTINEL
+    def __contains__(self, key: tuple) -> bool:
+        return self.get(key, None) is not None

-    def set(self, key, value) -> None:
+    def set(self, key: tuple, value: V) -> None:
         if isinstance(value, TreeCacheNode):
             # this would mean we couldn't tell where our tree ended and the value
             # started.
```
```diff
@@ -51,31 +92,56 @@ class TreeCache:

         node = self.root
         for k in key[:-1]:
-            next_node = node.get(k, SENTINEL)
-            if next_node is SENTINEL:
-                next_node = node[k] = TreeCacheNode()
-            elif not isinstance(next_node, TreeCacheNode):
-                # this suggests that the caller is not being consistent with its key
-                # length.
+            sub_tree = node.sub_tree
+            if sub_tree is None:
                 raise ValueError("value conflicts with an existing subtree")
+
+            next_node = sub_tree.get(k, None)
+            if next_node is None:
+                node = TreeCacheNode.empty_branch()
+                sub_tree[k] = node
+            else:
                 node = next_node

-        node[key[-1]] = value
+        if node.sub_tree is None:
+            raise ValueError("value conflicts with an existing subtree")
+
+        node.sub_tree[key[-1]] = TreeCacheNode.leaf(value)
         self.size += 1

-    def get(self, key, default=None):
+    @overload
+    def get(self, key: tuple, default: Literal[None] = None) -> Union[None, V]:
+        ...
+
+    @overload
+    def get(self, key: tuple, default: T) -> Union[T, V]:
+        ...
+
+    def get(self, key: tuple, default: Optional[T] = None) -> Union[None, T, V]:
         node = self.root
-        for k in key[:-1]:
-            node = node.get(k, None)
-            if node is None:
+        for k in key:
+            sub_tree = node.sub_tree
+            if sub_tree is None:
+                raise ValueError("get() key too long")
+
+            next_node = sub_tree.get(k, None)
+            if next_node is None:
                 return default
-        return node.get(key[-1], default)
+
+            node = next_node
+
+        if node.leaf_value is Sentinel.sentinel:
+            raise ValueError("key points to a branch")
+
+        return node.leaf_value

     def clear(self) -> None:
         self.size = 0
         self.root = TreeCacheNode()

-    def pop(self, key, default=None):
+    def pop(
+        self, key: tuple, default: Optional[T] = None
+    ) -> Union[None, T, V, TreeCacheNode[V]]:
         """Remove the given key, or subkey, from the cache

         Args:
```
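A short usage sketch of the rewritten `set`/`get` pair; keys and values are invented, and the behaviour shown is exactly what the code in this hunk defines:

```python
from synapse.util.caches.treecache import TreeCache

tree: TreeCache[str] = TreeCache()
tree[("a", "b")] = "leaf value"  # creates a branch for "a", then a leaf for ("a", "b")

tree.get(("a", "b"))       # -> "leaf value"
tree.get(("a", "x"))       # -> None (the default)
tree.get(("a",))           # raises ValueError: key points to a branch
tree.get(("a", "b", "c"))  # raises ValueError: get() key too long
```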
```diff
@@ -91,21 +157,26 @@
             raise TypeError("The cache key must be a tuple not %r" % (type(key),))

         # a list of the nodes we have touched on the way down the tree
-        nodes = []
+        nodes: List[TreeCacheNode[V]] = []

         node = self.root
         for k in key[:-1]:
-            node = node.get(k, None)
-            if node is None:
-                return default
-            if not isinstance(node, TreeCacheNode):
-                # we've gone off the end of the tree
+            sub_tree = node.sub_tree
+            if sub_tree is None:
                 raise ValueError("pop() key too long")
-            nodes.append(node)  # don't add the root node

-        popped = node.pop(key[-1], SENTINEL)
-        if popped is SENTINEL:
+            next_node = sub_tree.get(k, None)
+            if next_node is None:
                 return default

+            node = next_node
+            nodes.append(node)
+
+        if node.sub_tree is None:
+            raise ValueError("pop() key too long")
+
+        popped = node.sub_tree.pop(key[-1])
+
         # working back up the tree, clear out any nodes that are now empty
         node_and_keys = list(zip(nodes, key))
         node_and_keys.reverse()
```
```diff
@@ -116,8 +187,13 @@

             if n:
                 break

             # found an empty node: remove it from its parent, and loop.
-            node_and_keys[i + 1][0].pop(k)
+            node = node_and_keys[i + 1][0]
+
+            # We added it to the list so already know its a branch node.
+            assert node.sub_tree is not None
+            node.sub_tree.pop(k)
+
         cnt = sum(1 for _ in iterate_tree_cache_entry(popped))
         self.size -= cnt
```
```diff
@@ -130,12 +206,31 @@
         return self.size


-def iterate_tree_cache_entry(d):
+def iterate_tree_cache_entry(d: TreeCacheNode[V]) -> Generator[V, None, None]:
     """Helper function to iterate over the leaves of a tree, i.e. a dict of that
     can contain dicts.
     """
-    if isinstance(d, TreeCacheNode):
-        for value_d in d.values():
+    if d.sub_tree is not None:
+        for value_d in d.sub_tree.values():
             yield from iterate_tree_cache_entry(value_d)
     else:
-        yield d
+        assert d.leaf_value is not Sentinel.sentinel
+        yield d.leaf_value
+
+
+def iterate_tree_cache_items(
+    key: tuple, value: TreeCacheNode[V]
+) -> Generator[Tuple[tuple, V], None, None]:
+    """Helper function to iterate over the leaves of a tree, i.e. a dict of that
+    can contain dicts.
+
+    Returns:
+        A generator yielding key/value pairs.
+    """
+    if value.sub_tree is not None:
+        for sub_key, sub_value in value.sub_tree.items():
+            yield from iterate_tree_cache_items((*key, sub_key), sub_value)
+    else:
+        assert value.leaf_value is not Sentinel.sentinel
+        yield key, value.leaf_value
```
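An illustrative sketch of the two iteration helpers on a small tree (values invented; `iterate_tree_cache_items` is the new helper that `DictionaryCache.get` uses to unwrap a fetched subtree):

```python
from synapse.util.caches.treecache import (
    TreeCache,
    iterate_tree_cache_entry,
    iterate_tree_cache_items,
)

tree: TreeCache[int] = TreeCache()
tree[("a", "x")] = 1
tree[("a", "y")] = 2

list(iterate_tree_cache_entry(tree.root))      # -> [1, 2]
list(iterate_tree_cache_items((), tree.root))  # -> [(("a", "x"), 1), (("a", "y"), 2)]
```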
```diff
@@ -369,7 +369,7 @@ class StateStoreTestCase(HomeserverTestCase):
         state_dict_ids = cache_entry.value

         self.assertEqual(cache_entry.full, False)
-        self.assertEqual(cache_entry.known_absent, {(e1.type, e1.state_key)})
+        self.assertEqual(cache_entry.known_absent, set())
         self.assertDictEqual(state_dict_ids, {(e1.type, e1.state_key): e1.event_id})

         ############################################
```