From 73a4d298c8bdfee2dcc41d06091e2c0fe4299c72 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 8 Jan 2025 15:46:51 +0000 Subject: [PATCH] Periodically advance epoch --- synapse/storage/databases/main/state.py | 31 +++++++++++++++++++ .../main/delta/88/02_state_groups_epochs.sql | 2 ++ 2 files changed, 33 insertions(+) diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py index 788f7d1e32..101a82c509 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py @@ -48,6 +48,7 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion from synapse.events import EventBase from synapse.events.snapshot import EventContext from synapse.logging.opentracing import trace +from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.replication.tcp.streams import UnPartialStatedEventStream from synapse.replication.tcp.streams.partial_state import UnPartialStatedEventStreamRow from synapse.storage._base import SQLBaseStore @@ -118,6 +119,36 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): super().__init__(database, db_conn, hs) self._instance_name: str = hs.get_instance_name() + if hs.config.worker.run_background_tasks: + self._clock.looping_call_now(self._advance_state_epoch, 2 * 60 * 1000) + + @wrap_as_background_process("_advance_state_epoch") + async def _advance_state_epoch(self) -> None: + """Advances the state epoch, checking that we haven't advanced it too + recently. + """ + + now = self._clock.time_msec() + update_if_before_ts = now - 10 * 60 * 1000 + + def advance_state_epoch_txn(txn: LoggingTransaction) -> None: + sql = """ + UPDATE state_epoch + SET state_epoch = state_epoch + 1, updated_ts = ? + WHERE updated_ts <= ? + """ + txn.execute( + sql, + ( + now, + update_if_before_ts, + ), + ) + + await self.db_pool.runInteraction( + "_advance_state_epoch", advance_state_epoch_txn, db_autocommit=True + ) + def process_replication_rows( self, stream_name: str, diff --git a/synapse/storage/schema/main/delta/88/02_state_groups_epochs.sql b/synapse/storage/schema/main/delta/88/02_state_groups_epochs.sql index afc36a65c3..ea2a4c2f8a 100644 --- a/synapse/storage/schema/main/delta/88/02_state_groups_epochs.sql +++ b/synapse/storage/schema/main/delta/88/02_state_groups_epochs.sql @@ -19,6 +19,8 @@ CREATE TABLE IF NOT EXISTS state_epoch ( CHECK (Lock='X') ); +INSERT INTO state_epoch (state_epoch, updated_ts) VALUES (0, 0); + CREATE TABLE IF NOT EXISTS state_groups_pending_deletion ( state_group BIGINT NOT NULL, state_epoch BIGINT NOT NULL,