Compare commits: patch-1...anoa/new_c

3 commits

| Author | SHA1 | Date |
|---|---|---|
| | 8acd2c01bc | |
| | f1d98d3b70 | |
| | 6ff8ba5fc6 | |

@@ -194,7 +194,7 @@ class CensorEventsStore(EventsWorkerStore, CacheInvalidationWorkerStore, SQLBase
                 # changed its content in the database. We can't call
                 # self._invalidate_cache_and_stream because self.get_event_cache isn't of the
                 # right type.
-                self.invalidate_get_event_cache_after_txn(txn, event.event_id)
+                self.invalidate_get_event_cache_by_event_id_after_txn(txn, event.event_id)
                 # Send that invalidation to replication so that other workers also invalidate
                 # the event cache.
                 self._send_invalidation_to_replication(
@@ -1294,8 +1294,10 @@ class PersistEventsStore:
         """
         depth_updates: Dict[str, int] = {}
         for event, context in events_and_contexts:
-            # Remove the any existing cache entries for the event_ids
-            self.store.invalidate_get_event_cache_after_txn(txn, event.event_id)
+            # Remove any existing cache entries for the event_ids
+            self.store.invalidate_get_event_cache_by_event_id_after_txn(
+                txn, event.event_id
+            )
             # Then update the `stream_ordering` position to mark the latest
             # event as the front of the room. This should not be done for
             # backfilled events because backfilled events have negative
@@ -1703,7 +1705,7 @@ class PersistEventsStore:
         _invalidate_caches_for_event.
         """
         assert event.redacts is not None
-        self.store.invalidate_get_event_cache_after_txn(txn, event.redacts)
+        self.store.invalidate_get_event_cache_by_event_id_after_txn(txn, event.redacts)
         txn.call_after(self.store.get_relations_for_event.invalidate, (event.redacts,))
         txn.call_after(self.store.get_applicable_edit.invalidate, (event.redacts,))

@@ -80,6 +80,7 @@ from synapse.types import JsonDict, get_domain_from_id
 from synapse.util import unwrapFirstError
 from synapse.util.async_helpers import ObservableDeferred, delay_cancellation
 from synapse.util.caches.descriptors import cached, cachedList
+from synapse.util.caches.dual_lookup_cache import DualLookupCache
 from synapse.util.caches.lrucache import AsyncLruCache
 from synapse.util.cancellation import cancellable
 from synapse.util.iterutils import batch_iter
@@ -245,6 +246,8 @@ class EventsWorkerStore(SQLBaseStore):
         ] = AsyncLruCache(
             cache_name="*getEvent*",
             max_size=hs.config.caches.event_cache_size,
+            cache_type=DualLookupCache,
+            dual_lookup_secondary_key_function=lambda v: (v.event.room_id,),
         )

         # Map from event ID to a deferred that will result in a map from event
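For context, the sketch below is not part of the diff; it illustrates the key shapes that the `dual_lookup_secondary_key_function` above assumes: the `*getEvent*` cache is keyed on `(event_id,)` tuples, and the lambda derives a `(room_id,)` secondary key from the cached value, so one room ID can map onto many cached events. `FakeEvent` and `FakeEntry` are illustrative stand-ins for `EventBase` and `EventCacheEntry`.

```python
from dataclasses import dataclass


@dataclass
class FakeEvent:
    event_id: str
    room_id: str


@dataclass
class FakeEntry:
    # Mirrors EventCacheEntry.event from the diff; purely illustrative.
    event: FakeEvent


entry = FakeEntry(FakeEvent("$abc123", "!room:example.org"))

# Primary key used by the event cache, and the secondary key the lambda derives.
primary_key = (entry.event.event_id,)
secondary_key = (entry.event.room_id,)
assert primary_key == ("$abc123",)
assert secondary_key == ("!room:example.org",)
```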
@@ -733,7 +736,7 @@ class EventsWorkerStore(SQLBaseStore):

         return event_entry_map

-    def invalidate_get_event_cache_after_txn(
+    def invalidate_get_event_cache_by_event_id_after_txn(
         self, txn: LoggingTransaction, event_id: str
     ) -> None:
         """
@@ -747,10 +750,31 @@ class EventsWorkerStore(SQLBaseStore):
             event_id: the event ID to be invalidated from caches
         """

-        txn.async_call_after(self._invalidate_async_get_event_cache, event_id)
-        txn.call_after(self._invalidate_local_get_event_cache, event_id)
+        txn.async_call_after(
+            self._invalidate_async_get_event_cache_by_event_id, event_id
+        )
+        txn.call_after(self._invalidate_local_get_event_cache_by_event_id, event_id)

-    async def _invalidate_async_get_event_cache(self, event_id: str) -> None:
+    def invalidate_get_event_cache_by_room_id_after_txn(
+        self, txn: LoggingTransaction, room_id: str
+    ) -> None:
+        """
+        Prepares a database transaction to invalidate the get event cache for a given
+        room ID when executed successfully. This is achieved by attaching two callbacks
+        to the transaction, one to invalidate the async cache and one for the in memory
+        sync cache (importantly called in that order).
+
+        Arguments:
+            txn: the database transaction to attach the callbacks to.
+            room_id: the room ID to invalidate all associated event caches for.
+        """
+
+        txn.async_call_after(self._invalidate_async_get_event_cache_by_room_id, room_id)
+        txn.call_after(self._invalidate_local_get_event_cache_by_room_id, room_id)
+
+    async def _invalidate_async_get_event_cache_by_event_id(
+        self, event_id: str
+    ) -> None:
         """
         Invalidates an event in the asyncronous get event cache, which may be remote.

@@ -760,7 +784,18 @@ class EventsWorkerStore(SQLBaseStore):

         await self._get_event_cache.invalidate((event_id,))

-    def _invalidate_local_get_event_cache(self, event_id: str) -> None:
+    async def _invalidate_async_get_event_cache_by_room_id(self, room_id: str) -> None:
+        """
+        Invalidates all events associated with a given room in the asyncronous get event
+        cache, which may be remote.
+
+        Arguments:
+            room_id: the room ID to invalidate associated events of.
+        """
+
+        await self._get_event_cache.invalidate((room_id,))
+
+    def _invalidate_local_get_event_cache_by_event_id(self, event_id: str) -> None:
         """
         Invalidates an event in local in-memory get event caches.

@@ -772,6 +807,18 @@ class EventsWorkerStore(SQLBaseStore):
         self._event_ref.pop(event_id, None)
         self._current_event_fetches.pop(event_id, None)

+    def _invalidate_local_get_event_cache_by_room_id(self, room_id: str) -> None:
+        """
+        Invalidates all events associated with a given room ID in local in-memory
+        get event caches.
+
+        Arguments:
+            room_id: the room ID to invalidate events of.
+        """
+        self._get_event_cache.invalidate_local((room_id,))
+
+        # TODO: invalidate _event_ref and _current_event_fetches. How?
+
     async def _get_events_from_cache(
         self, events: Iterable[str], update_metrics: bool = True
     ) -> Dict[str, EventCacheEntry]:
@@ -2284,7 +2331,7 @@ class EventsWorkerStore(SQLBaseStore):
             updatevalues={"rejection_reason": rejection_reason},
         )

-        self.invalidate_get_event_cache_after_txn(txn, event_id)
+        self.invalidate_get_event_cache_by_event_id_after_txn(txn, event_id)

         # TODO(faster_joins): invalidate the cache on workers. Ideally we'd just
         # call '_send_invalidation_to_replication', but we actually need the other
@@ -304,7 +304,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
             self._invalidate_cache_and_stream(
                 txn, self.have_seen_event, (room_id, event_id)
             )
-            self.invalidate_get_event_cache_after_txn(txn, event_id)
+            self.invalidate_get_event_cache_by_event_id_after_txn(txn, event_id)

         logger.info("[purge] done")

@@ -478,6 +478,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
         # XXX: as with purge_history, this is racy, but no worse than other races
         # that already exist.
         self._invalidate_cache_and_stream(txn, self.have_seen_event, (room_id,))
+        self._invalidate_local_get_event_cache_by_room_id(room_id)

         logger.info("[purge] done")

synapse/util/caches/dual_lookup_cache.py (new file, 238 lines)

@@ -0,0 +1,238 @@
+# Copyright 2022 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+from typing import (
+    Callable,
+    Dict,
+    Generic,
+    ItemsView,
+    List,
+    Optional,
+    TypeVar,
+    Union,
+    ValuesView,
+)
+
+# Used to discern between a value not existing in a map, or the value being 'None'.
+SENTINEL = object()
+
+# The type of the primary dict's keys.
+PKT = TypeVar("PKT")
+# The type of the primary dict's values.
+PVT = TypeVar("PVT")
+# The type of the secondary dict's keys.
+SKT = TypeVar("SKT")
+
+logger = logging.getLogger(__name__)
+
+
+class SecondarySet(set):
+    """
+    Used to differentiate between an entry in the secondary_dict, and a set stored
+    in the primary_dict. This is necessary as pop() can return either.
+    """
+
+
+class DualLookupCache(Generic[PKT, PVT, SKT]):
+    """
+    A backing store for LruCache that supports multiple entry points.
+    Allows subsets of data to be deleted efficiently without requiring extra
+    information to query.
+
+    The data structure is two dictionaries:
+        * primary_dict containing a mapping of primary_key -> value.
+        * secondary_dict containing a mapping of secondary_key -> set of primary_key.
+
+    On insert, a mapping in the primary_dict must be created. A mapping in the
+    secondary_dict from a secondary_key to (a set containing) the same
+    primary_key will be made. The secondary_key
+    must be derived from the inserted value via a lambda function provided at cache
+    initialisation. This is so invalidated entries in the primary_dict may automatically
+    invalidate those in the secondary_dict. The secondary_key may be associated with one
+    or more primary_key's.
+
+    This creates an interface which allows for efficient lookups of a value given
+    a primary_key, as well as efficient invalidation of a subset of mapping in the
+    primary_dict given a secondary_key. A primary_key may not be associated with more
+    than one secondary_key.
+
+    As a worked example, consider storing a cache of room events. We could configure
+    the cache to store mappings between EventIDs and EventBase in the primary_dict,
+    while storing a mapping between room IDs and event IDs as the secondary_dict:
+
+        primary_dict: EventID -> EventBase
+        secondary_dict: RoomID -> {EventID, EventID, ...}
+
+    This would be efficient for the following operations:
+        * Given an EventID, look up the associated EventBase, and thus the roomID.
+        * Given a RoomID, invalidate all primary_dict entries for events in that room.
+
+    Since this is intended as a backing store for LRUCache, when it came time to evict
+    an entry from the primary_dict (EventID -> EventBase), the secondary_key could be
+    derived from a provided lambda function:
+        secondary_key = lambda event_base: event_base.room_id
+
+    The EventID set under room_id would then have the appropriate EventID entry evicted.
+    """
+
+    def __init__(self, secondary_key_function: Callable[[PVT], SKT]) -> None:
+        self._primary_dict: Dict[PKT, PVT] = {}
+        self._secondary_dict: Dict[SKT, SecondarySet] = {}
+        self._secondary_key_function = secondary_key_function
+
+    def __setitem__(self, key: PKT, value: PVT) -> None:
+        self.set(key, value)
+
+    def __contains__(self, key: PKT) -> bool:
+        return key in self._primary_dict
+
+    def set(self, key: PKT, value: PVT) -> None:
+        """Add an entry to the cache.
+
+        Will add an entry to the primary_dict consisting of key->value, as well as append
+        to the set referred to by secondary_key_function(value) in the secondary_dict.
+
+        Args:
+            key: The key for a new mapping in primary_dict.
+            value: The value for a new mapping in primary_dict.
+        """
+        # Create an entry in the primary_dict.
+        self._primary_dict[key] = value
+
+        # Derive the secondary_key to use from the given primary_value.
+        secondary_key = self._secondary_key_function(value)
+
+        # TODO: If the lambda function resolves to None, don't insert an entry?
+
+        # And create a mapping in the secondary_dict to a set containing the
+        # primary_key, creating the set if necessary.
+        secondary_key_set = self._secondary_dict.setdefault(
+            secondary_key, SecondarySet()
+        )
+        secondary_key_set.add(key)
+
+        logger.info("*** Insert into primary_dict: %s: %s", key, value)
+        logger.info("*** Insert into secondary_dict: %s: %s", secondary_key, key)
+
+    def get(self, key: PKT, default: Optional[PVT] = None) -> Optional[PVT]:
+        """Retrieve a value from the cache if it exists. If not, return the default
+        value.
+
+        This method simply pulls entries from the primary_dict.
+
+        # TODO: Any use cases for externally getting entries from the secondary_dict?
+
+        Args:
+            key: The key to search the cache for.
+            default: The default value to return if the given key is not found.
+
+        Returns:
+            The value referenced by the given key, if it exists in the cache. If not,
+            the value of `default` will be returned.
+        """
+        logger.info("*** Retrieving key from primary_dict: %s", key)
+        return self._primary_dict.get(key, default)
+
+    def clear(self) -> None:
+        """Evicts all entries from the cache."""
+        self._primary_dict.clear()
+        self._secondary_dict.clear()
+
+    def pop(
+        self, key: Union[PKT, SKT], default: Optional[Union[Dict[PKT, PVT], PVT]] = None
+    ) -> Optional[Union[Dict[PKT, PVT], PVT]]:
+        """Remove an entry from either the primary_dict or secondary_dict.
+
+        The primary_dict is checked first for the key. If an entry is found, it is
+        removed from the primary_dict and returned.
+
+        If no entry in the primary_dict exists, then the secondary_dict is checked.
+        If an entry exists, all associated entries in the primary_dict will be
+        deleted, and all primary_dict keys returned from this function in a SecondarySet.
+
+        Args:
+            key: A key to drop from either the primary_dict or secondary_dict.
+            default: The default value if the key does not exist in either dict.
+
+        Returns:
+            Either a matched value from the primary_dict or the secondary_dict. If no
+            value is found for the key, then None.
+        """
+        # Attempt to remove from the primary_dict first.
+        primary_value = self._primary_dict.pop(key, SENTINEL)
+        if primary_value is not SENTINEL:
+            # We found a value in the primary_dict. Remove it from the corresponding
+            # entry in the secondary_dict, and then return it.
+            logger.info(
+                "*** Popped entry from primary_dict: %s: %s", key, primary_value
+            )
+
+            # Derive the secondary_key from the primary_value
+            secondary_key = self._secondary_key_function(primary_value)
+
+            # Pop the entry from the secondary_dict
+            secondary_key_set = self._secondary_dict[secondary_key]
+            if len(secondary_key_set) > 1:
+                # Delete just the set entry for the given key.
+                secondary_key_set.remove(key)
+                logger.info(
+                    "*** Popping from secondary_dict: %s: %s", secondary_key, key
+                )
+            else:
+                # Delete the entire set referenced by the secondary_key, as it only
+                # has one entry.
+                del self._secondary_dict[secondary_key]
+                logger.info("*** Popping from secondary_dict: %s", secondary_key)
+
+            return primary_value
+
+        # There was no matching value in the primary_dict. Attempt the secondary_dict.
+        primary_key_set = self._secondary_dict.pop(key, SENTINEL)
+        if primary_key_set is not SENTINEL:
+            # We found a set in the secondary_dict.
+            logger.info(
+                "*** Found '%s' in secondary_dict: %s: ",
+                key,
+                primary_key_set,
+            )
+
+            popped_primary_dict_values: List[PVT] = []
+
+            # We found an entry in the secondary_dict. Delete all related entries in the
+            # primary_dict.
+            logger.info(
+                "*** Found key in secondary_dict to pop: %s. "
+                "Popping primary_dict entries",
+                key,
+            )
+            for primary_key in primary_key_set:
+                primary_value = self._primary_dict.pop(primary_key)
+                logger.info("*** Popping entry from primary_dict: %s - %s", primary_key, primary_value)
+                logger.info("*** primary_dict: %s", self._primary_dict)
+                popped_primary_dict_values.append(primary_value)
+
+            # Now return the unmodified copy of the set.
+            return popped_primary_dict_values
+
+        # No match in either dict.
+        return default
+
+    def values(self) -> ValuesView:
+        return self._primary_dict.values()
+
+    def items(self) -> ItemsView:
+        return self._primary_dict.items()
+
+    def __len__(self) -> int:
+        return len(self._primary_dict)
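A minimal usage sketch of the `DualLookupCache` defined above (not part of the PR), assuming the new module is importable as added in this diff; `FakeEvent` is an illustrative stand-in for a cached value.

```python
from collections import namedtuple

from synapse.util.caches.dual_lookup_cache import DualLookupCache

FakeEvent = namedtuple("FakeEvent", ["event_id", "room_id"])

# The secondary key is derived from the stored value at insert time.
cache: DualLookupCache = DualLookupCache(
    secondary_key_function=lambda event: event.room_id
)

ev1 = FakeEvent("$event1", "!room:example.org")
ev2 = FakeEvent("$event2", "!room:example.org")
cache["$event1"] = ev1
cache["$event2"] = ev2

# Primary lookup: event ID -> value.
assert cache.get("$event1") is ev1

# Secondary lookup via pop(): dropping the room ID removes every associated
# entry from the primary dict and returns the popped values.
popped = cache.pop("!room:example.org")
assert set(popped) == {ev1, ev2}
assert cache.get("$event2") is None
```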
@@ -46,8 +46,10 @@ from synapse.metrics.background_process_metrics import wrap_as_background_proces
 from synapse.metrics.jemalloc import get_jemalloc_stats
 from synapse.util import Clock, caches
 from synapse.util.caches import CacheMetric, EvictionReason, register_cache
+from synapse.util.caches.dual_lookup_cache import DualLookupCache, SecondarySet
 from synapse.util.caches.treecache import (
     TreeCache,
+    TreeCacheNode,
     iterate_tree_cache_entry,
     iterate_tree_cache_items,
 )
@@ -375,12 +377,13 @@ class LruCache(Generic[KT, VT]):
         self,
         max_size: int,
         cache_name: Optional[str] = None,
-        cache_type: Type[Union[dict, TreeCache]] = dict,
+        cache_type: Type[Union[dict, TreeCache, DualLookupCache]] = dict,
         size_callback: Optional[Callable[[VT], int]] = None,
         metrics_collection_callback: Optional[Callable[[], None]] = None,
         apply_cache_factor_from_config: bool = True,
         clock: Optional[Clock] = None,
         prune_unread_entries: bool = True,
+        dual_lookup_secondary_key_function: Optional[Callable[[Any], Any]] = None,
     ):
         """
         Args:
@@ -411,6 +414,10 @@ class LruCache(Generic[KT, VT]):
             prune_unread_entries: If True, cache entries that haven't been read recently
                 will be evicted from the cache in the background. Set to False to
                 opt-out of this behaviour.
+
+            # TODO: At this point we should probably just pass an initialised cache type
+            # to LruCache, no?
+            dual_lookup_secondary_key_function:
         """
         # Default `clock` to something sensible. Note that we rename it to
         # `real_clock` so that mypy doesn't think its still `Optional`.
@@ -419,7 +426,30 @@ class LruCache(Generic[KT, VT]):
         else:
             real_clock = clock

-        cache: Union[Dict[KT, _Node[KT, VT]], TreeCache] = cache_type()
+        # TODO: I've had to make this ugly to appease mypy :(
+        # Perhaps initialise the backing cache and then pass to LruCache?
+        cache: Union[Dict[KT, _Node[KT, VT]], TreeCache, DualLookupCache]
+        if cache_type is DualLookupCache:
+            # The dual_lookup_secondary_key_function is a function that's intended to
+            # extract a key from the value in the cache. Since we wrap values given to
+            # us in a _Node object, this function will actually operate on a _Node,
+            # instead of directly on the object type callers are expecting.
+            #
+            # Thus, we wrap the function given by the caller in another one that
+            # extracts the value from the _Node, before then handing it off to the
+            # given function for processing.
+            def key_function_wrapper(node: Any) -> Any:
+                assert dual_lookup_secondary_key_function is not None
+                return dual_lookup_secondary_key_function(node.value)
+
+            cache = DualLookupCache(
+                secondary_key_function=key_function_wrapper,
+            )
+        elif cache_type is TreeCache:
+            cache = TreeCache()
+        else:
+            cache = {}
+
         self.cache = cache  # Used for introspection.
         self.apply_cache_factor_from_config = apply_cache_factor_from_config

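Tying the two changes together, here is a hedged sketch (not part of the diff) of constructing an `LruCache` backed by `DualLookupCache`, mirroring the `EventsWorkerStore` change earlier in this compare; the value type and key shapes are assumptions made for illustration.

```python
from dataclasses import dataclass

from synapse.util.caches.dual_lookup_cache import DualLookupCache
from synapse.util.caches.lrucache import LruCache


@dataclass
class FakeEntry:
    # Illustrative stand-in for the values the event cache stores.
    event_id: str
    room_id: str


cache: LruCache = LruCache(
    max_size=1000,
    cache_type=DualLookupCache,
    # Receives the cached value (LruCache unwraps its internal _Node first, via
    # key_function_wrapper above) and returns the secondary key.
    dual_lookup_secondary_key_function=lambda entry: (entry.room_id,),
)

entry = FakeEntry("$abc123", "!room:example.org")
cache.set(("$abc123",), entry)
assert cache.get(("$abc123",)) is entry
```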
@@ -722,13 +752,25 @@ class LruCache(Generic[KT, VT]):
         may be of lower cardinality than the TreeCache - in which case the whole
         subtree is deleted.
         """
-        popped = cache.pop(key, None)
-        if popped is None:
+        # Remove an entry from the cache.
+        # In the case of a 'dict' cache type, we're just removing an entry from the
+        # dict. For a TreeCache, we're removing a subtree which has children.
+        popped_entry: _Node[KT, VT] = cache.pop(key, None)
+        if popped_entry is None:
             return
-        # for each deleted node, we now need to remove it from the linked list
-        # and run its callbacks.
-        for leaf in iterate_tree_cache_entry(popped):
-            delete_node(leaf)
+
+        if isinstance(popped_entry, TreeCacheNode):
+            # We've popped a subtree from a TreeCache - now we need to clean up
+            # each child node.
+            for leaf in iterate_tree_cache_entry(popped_entry):
+                # For each deleted child node, we remove it from the linked list and
+                # run its callbacks.
+                delete_node(leaf)
+        elif isinstance(popped_entry, SecondarySet):
+            for leaf in popped_entry:
+                delete_node(leaf)
+        else:
+            delete_node(popped_entry)

         @synchronized
         def cache_clear() -> None:
@@ -115,6 +115,5 @@ class PurgeTests(HomeserverTestCase):
         )

         # The events aren't found.
-        self.store._invalidate_local_get_event_cache(create_event.event_id)
         self.get_failure(self.store.get_event(create_event.event_id), NotFoundError)
         self.get_failure(self.store.get_event(first["event_id"]), NotFoundError)