diff --git a/synapse/storage/databases/main/sticky_events.py b/synapse/storage/databases/main/sticky_events.py index 925a640f56..f8ca147431 100644 --- a/synapse/storage/databases/main/sticky_events.py +++ b/synapse/storage/databases/main/sticky_events.py @@ -25,9 +25,12 @@ from typing import ( cast, ) +from twisted.internet.defer import Deferred + from synapse.api.constants import EventTypes, StickyEvent from synapse.events import EventBase from synapse.events.snapshot import EventPersistencePair +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.tcp.streams._base import StickyEventsStream from synapse.storage.database import ( DatabasePool, @@ -44,6 +47,12 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +# Remove entries from the sticky_events table at this frequency. +# Note: this does NOT mean we don't honour shorter expiration timeouts. +# Consumers call 'get_sticky_events_in_rooms' which has `WHERE expires_at > ?` +# to filter out expired sticky events that have yet to be deleted. +DELETE_EXPIRED_STICKY_EVENTS_MS = 60 * 1000 * 60 # 1 hour + class StickyEventsWorkerStore(CacheInvalidationWorkerStore): def __init__( @@ -58,6 +67,12 @@ class StickyEventsWorkerStore(CacheInvalidationWorkerStore): self._instance_name in hs.config.worker.writers.events ) + # Technically this means we will cleanup N times, once per event persister, maybe put on master? + if self._can_write_to_sticky_events: + self._clock.looping_call( + self._run_background_cleanup, DELETE_EXPIRED_STICKY_EVENTS_MS + ) + self._sticky_events_id_gen: MultiWriterIdGenerator = MultiWriterIdGenerator( db_conn=db_conn, db=database, @@ -426,5 +441,30 @@ class StickyEventsWorkerStore(CacheInvalidationWorkerStore): # We know the events are otherwise authorised, so we only need to load the current state # and check if the events pass auth at the current state. + async def _delete_expired_sticky_events(self) -> None: + logger.info("delete_expired_sticky_events") + await self.db_pool.runInteraction( + "_delete_expired_sticky_events", + self._delete_expired_sticky_events_txn, + self._now(), + ) + + def _delete_expired_sticky_events_txn( + self, txn: LoggingTransaction, now: int + ) -> None: + txn.execute( + """ + DELETE FROM sticky_events WHERE expires_at < ? + """, + (now,), + ) + def _now(self) -> int: return round(time.time() * 1000) + + def _run_background_cleanup(self) -> Deferred: + return run_as_background_process( + "delete_expired_sticky_events", + self.server_name, + self._delete_expired_sticky_events, + )