1
0

Periodically advance epoch

This commit is contained in:
Erik Johnston
2025-01-08 15:46:51 +00:00
parent b63f5b6580
commit 73a4d298c8
2 changed files with 33 additions and 0 deletions

View File

@@ -48,6 +48,7 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.logging.opentracing import trace
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.replication.tcp.streams import UnPartialStatedEventStream
from synapse.replication.tcp.streams.partial_state import UnPartialStatedEventStreamRow
from synapse.storage._base import SQLBaseStore
@@ -118,6 +119,36 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
super().__init__(database, db_conn, hs)
self._instance_name: str = hs.get_instance_name()
if hs.config.worker.run_background_tasks:
self._clock.looping_call_now(self._advance_state_epoch, 2 * 60 * 1000)
@wrap_as_background_process("_advance_state_epoch")
async def _advance_state_epoch(self) -> None:
"""Advances the state epoch, checking that we haven't advanced it too
recently.
"""
now = self._clock.time_msec()
update_if_before_ts = now - 10 * 60 * 1000
def advance_state_epoch_txn(txn: LoggingTransaction) -> None:
sql = """
UPDATE state_epoch
SET state_epoch = state_epoch + 1, updated_ts = ?
WHERE updated_ts <= ?
"""
txn.execute(
sql,
(
now,
update_if_before_ts,
),
)
await self.db_pool.runInteraction(
"_advance_state_epoch", advance_state_epoch_txn, db_autocommit=True
)
def process_replication_rows(
self,
stream_name: str,

View File

@@ -19,6 +19,8 @@ CREATE TABLE IF NOT EXISTS state_epoch (
CHECK (Lock='X')
);
INSERT INTO state_epoch (state_epoch, updated_ts) VALUES (0, 0);
CREATE TABLE IF NOT EXISTS state_groups_pending_deletion (
state_group BIGINT NOT NULL,
state_epoch BIGINT NOT NULL,