From 86b836f2b5fffcd809c2e80d58774b6256c25df0 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 8 May 2023 09:11:11 -0400 Subject: [PATCH] Properly invalidate caches over replication. --- synapse/storage/_base.py | 18 ++++++++-- synapse/storage/databases/main/cache.py | 36 +++++++++++++------ .../databases/main/events_bg_updates.py | 4 +-- 3 files changed, 44 insertions(+), 14 deletions(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 481fec72fe..8b6294806e 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -118,7 +118,10 @@ class SQLBaseStore(metaclass=ABCMeta): self._attempt_to_invalidate_cache("get_partial_current_state_ids", (room_id,)) def _attempt_to_invalidate_cache( - self, cache_name: str, key: Optional[Collection[Any]] + self, + cache_name: str, + key: Optional[Collection[Any]], + store_name: Optional[str] = None, ) -> bool: """Attempts to invalidate the cache of the given name, ignoring if the cache doesn't exist. Mainly used for invalidating caches on workers, @@ -132,10 +135,21 @@ class SQLBaseStore(metaclass=ABCMeta): cache_name key: Entry to invalidate. If None then invalidates the entire cache. + store_name: The name of the store, leave as None for stores which + have not yet been split out. """ + # First get the store. + store = self + if store_name is not None: + try: + store = getattr(self, store_name) + except AttributeError: + pass + + # Then attempt to find the cache on that store. try: - cache = getattr(self, cache_name) + cache = getattr(store, cache_name) except AttributeError: # Check if an externally defined module cache has been registered cache = self.external_cached_functions.get(cache_name) diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 46fa0a73f9..9802d9749c 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -248,10 +248,16 @@ class CacheInvalidationWorkerStore(SQLBaseStore): self._invalidate_local_get_event_cache(redacts) # type: ignore[attr-defined] # Caches which might leak edits must be invalidated for the event being # redacted. - self._attempt_to_invalidate_cache("get_relations_for_event", (redacts,)) - self._attempt_to_invalidate_cache("get_applicable_edit", (redacts,)) - self._attempt_to_invalidate_cache("get_thread_id", (redacts,)) - self._attempt_to_invalidate_cache("get_thread_id_for_receipts", (redacts,)) + self._attempt_to_invalidate_cache( + "get_relations_for_event", (redacts,), "relations" + ) + self._attempt_to_invalidate_cache( + "get_applicable_edit", (redacts,), "relations" + ) + self._attempt_to_invalidate_cache("get_thread_id", (redacts,), "relations") + self._attempt_to_invalidate_cache( + "get_thread_id_for_receipts", (redacts,), "relations" + ) if etype == EventTypes.Member: self._membership_stream_cache.entity_has_changed(state_key, stream_ordering) # type: ignore[attr-defined] @@ -264,12 +270,22 @@ class CacheInvalidationWorkerStore(SQLBaseStore): self._attempt_to_invalidate_cache("get_rooms_for_user", (state_key,)) if relates_to: - self._attempt_to_invalidate_cache("get_relations_for_event", (relates_to,)) - self._attempt_to_invalidate_cache("get_references_for_event", (relates_to,)) - self._attempt_to_invalidate_cache("get_applicable_edit", (relates_to,)) - self._attempt_to_invalidate_cache("get_thread_summary", (relates_to,)) - self._attempt_to_invalidate_cache("get_thread_participated", (relates_to,)) - self._attempt_to_invalidate_cache("get_threads", (room_id,)) + self._attempt_to_invalidate_cache( + "get_relations_for_event", (relates_to,), "relations" + ) + self._attempt_to_invalidate_cache( + "get_references_for_event", (relates_to,), "relations" + ) + self._attempt_to_invalidate_cache( + "get_applicable_edit", (relates_to,), "relations" + ) + self._attempt_to_invalidate_cache( + "get_thread_summary", (relates_to,), "relations" + ) + self._attempt_to_invalidate_cache( + "get_thread_participated", (relates_to,), "relations" + ) + self._attempt_to_invalidate_cache("get_threads", (room_id,), "relations") async def invalidate_cache_and_stream( self, cache_name: str, keys: Tuple[Any, ...] diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index daef3685b0..9cf63313ed 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -1217,10 +1217,10 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): for parent_id in {r[1] for r in relations_to_insert}: cache_tuple = (parent_id,) self._invalidate_cache_and_stream( # type: ignore[attr-defined] - txn, self.get_relations_for_event, cache_tuple # type: ignore[attr-defined] + txn, self.relations.get_relations_for_event, cache_tuple # type: ignore[attr-defined] ) self._invalidate_cache_and_stream( # type: ignore[attr-defined] - txn, self.get_thread_summary, cache_tuple # type: ignore[attr-defined] + txn, self.relations.get_thread_summary, cache_tuple # type: ignore[attr-defined] ) if results: