Properly invalidate caches over replication.
This commit is contained in:
@@ -118,7 +118,10 @@ class SQLBaseStore(metaclass=ABCMeta):
|
||||
self._attempt_to_invalidate_cache("get_partial_current_state_ids", (room_id,))
|
||||
|
||||
def _attempt_to_invalidate_cache(
|
||||
self, cache_name: str, key: Optional[Collection[Any]]
|
||||
self,
|
||||
cache_name: str,
|
||||
key: Optional[Collection[Any]],
|
||||
store_name: Optional[str] = None,
|
||||
) -> bool:
|
||||
"""Attempts to invalidate the cache of the given name, ignoring if the
|
||||
cache doesn't exist. Mainly used for invalidating caches on workers,
|
||||
@@ -132,10 +135,21 @@ class SQLBaseStore(metaclass=ABCMeta):
|
||||
cache_name
|
||||
key: Entry to invalidate. If None then invalidates the entire
|
||||
cache.
|
||||
store_name: The name of the store, leave as None for stores which
|
||||
have not yet been split out.
|
||||
"""
|
||||
|
||||
# First get the store.
|
||||
store = self
|
||||
if store_name is not None:
|
||||
try:
|
||||
store = getattr(self, store_name)
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
# Then attempt to find the cache on that store.
|
||||
try:
|
||||
cache = getattr(self, cache_name)
|
||||
cache = getattr(store, cache_name)
|
||||
except AttributeError:
|
||||
# Check if an externally defined module cache has been registered
|
||||
cache = self.external_cached_functions.get(cache_name)
|
||||
|
||||
@@ -248,10 +248,16 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||
self._invalidate_local_get_event_cache(redacts) # type: ignore[attr-defined]
|
||||
# Caches which might leak edits must be invalidated for the event being
|
||||
# redacted.
|
||||
self._attempt_to_invalidate_cache("get_relations_for_event", (redacts,))
|
||||
self._attempt_to_invalidate_cache("get_applicable_edit", (redacts,))
|
||||
self._attempt_to_invalidate_cache("get_thread_id", (redacts,))
|
||||
self._attempt_to_invalidate_cache("get_thread_id_for_receipts", (redacts,))
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_relations_for_event", (redacts,), "relations"
|
||||
)
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_applicable_edit", (redacts,), "relations"
|
||||
)
|
||||
self._attempt_to_invalidate_cache("get_thread_id", (redacts,), "relations")
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_thread_id_for_receipts", (redacts,), "relations"
|
||||
)
|
||||
|
||||
if etype == EventTypes.Member:
|
||||
self._membership_stream_cache.entity_has_changed(state_key, stream_ordering) # type: ignore[attr-defined]
|
||||
@@ -264,12 +270,22 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||
self._attempt_to_invalidate_cache("get_rooms_for_user", (state_key,))
|
||||
|
||||
if relates_to:
|
||||
self._attempt_to_invalidate_cache("get_relations_for_event", (relates_to,))
|
||||
self._attempt_to_invalidate_cache("get_references_for_event", (relates_to,))
|
||||
self._attempt_to_invalidate_cache("get_applicable_edit", (relates_to,))
|
||||
self._attempt_to_invalidate_cache("get_thread_summary", (relates_to,))
|
||||
self._attempt_to_invalidate_cache("get_thread_participated", (relates_to,))
|
||||
self._attempt_to_invalidate_cache("get_threads", (room_id,))
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_relations_for_event", (relates_to,), "relations"
|
||||
)
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_references_for_event", (relates_to,), "relations"
|
||||
)
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_applicable_edit", (relates_to,), "relations"
|
||||
)
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_thread_summary", (relates_to,), "relations"
|
||||
)
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_thread_participated", (relates_to,), "relations"
|
||||
)
|
||||
self._attempt_to_invalidate_cache("get_threads", (room_id,), "relations")
|
||||
|
||||
async def invalidate_cache_and_stream(
|
||||
self, cache_name: str, keys: Tuple[Any, ...]
|
||||
|
||||
@@ -1217,10 +1217,10 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
|
||||
for parent_id in {r[1] for r in relations_to_insert}:
|
||||
cache_tuple = (parent_id,)
|
||||
self._invalidate_cache_and_stream( # type: ignore[attr-defined]
|
||||
txn, self.get_relations_for_event, cache_tuple # type: ignore[attr-defined]
|
||||
txn, self.relations.get_relations_for_event, cache_tuple # type: ignore[attr-defined]
|
||||
)
|
||||
self._invalidate_cache_and_stream( # type: ignore[attr-defined]
|
||||
txn, self.get_thread_summary, cache_tuple # type: ignore[attr-defined]
|
||||
txn, self.relations.get_thread_summary, cache_tuple # type: ignore[attr-defined]
|
||||
)
|
||||
|
||||
if results:
|
||||
|
||||
Reference in New Issue
Block a user