From 192fd59943ee7e14c1dfca898a4d127f42126e93 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 22 Jan 2025 14:00:51 +0000 Subject: [PATCH] Check state epoch on state group persistence --- synapse/storage/databases/__init__.py | 2 +- synapse/storage/databases/state/epochs.py | 28 +++++++++++++++++++++++ synapse/storage/databases/state/store.py | 25 +++++++++++--------- 3 files changed, 43 insertions(+), 12 deletions(-) diff --git a/synapse/storage/databases/__init__.py b/synapse/storage/databases/__init__.py index d49437834d..98940f56c8 100644 --- a/synapse/storage/databases/__init__.py +++ b/synapse/storage/databases/__init__.py @@ -118,8 +118,8 @@ class Databases(Generic[DataStoreT]): if state: raise Exception("'state' data store already configured") - state = StateGroupDataStore(database, db_conn, hs) state_epochs = StateEpochDataStore(database, db_conn, hs) + state = StateGroupDataStore(database, db_conn, hs, state_epochs) db_conn.commit() diff --git a/synapse/storage/databases/state/epochs.py b/synapse/storage/databases/state/epochs.py index 3ac891732c..c5d108be78 100644 --- a/synapse/storage/databases/state/epochs.py +++ b/synapse/storage/databases/state/epochs.py @@ -157,3 +157,31 @@ class StateEpochDataStore: keyvalues={"state_group": state_group}, desc="mark_state_group_as_used", ) + + def check_prev_group_before_insertion_txn( + self, txn: LoggingTransaction, prev_group: int, new_groups: Collection[int] + ) -> None: + sql = """ + SELECT state_epoch, (SELECT state_epoch FROM state_epoch) + FROM state_groups_pending_deletion + WHERE state_group = ? + """ + txn.execute(sql, (prev_group,)) + row = txn.fetchone() + if row is not None: + pending_deletion_epoch, current_epoch = row + if current_epoch - pending_deletion_epoch >= 2: + raise Exception("") # TODO + + self.db_pool.simple_update_txn( + txn, + table="state_groups_pending_deletion", + keyvalues={"state_group": prev_group}, + updatevalues={"state_epoch": current_epoch}, + ) + self.db_pool.simple_insert_many_txn( + txn, + table="state_groups_pending_deletion", + keys=("state_group", "state_epoch"), + values=[(state_group, current_epoch) for state_group in new_groups], + ) diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py index fd3f951a81..f073ba5473 100644 --- a/synapse/storage/databases/state/store.py +++ b/synapse/storage/databases/state/store.py @@ -58,6 +58,7 @@ from synapse.util.cancellation import cancellable if TYPE_CHECKING: from synapse.server import HomeServer + from synapse.storage.databases.state.epochs import StateEpochDataStore logger = logging.getLogger(__name__) @@ -86,8 +87,10 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): database: DatabasePool, db_conn: LoggingDatabaseConnection, hs: "HomeServer", + epoch_store: "StateEpochDataStore", ): super().__init__(database, db_conn, hs) + self._epoch_store = epoch_store # Originally the state store used a single DictionaryCache to cache the # event IDs for the state types in a given state group to avoid hammering @@ -471,16 +474,6 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): A list of state groups """ - # TODO: Check state epochs - # - # If prev_group is in state_deletions? => Mark these groups as for - # deletion too in the same state epoch. Then clear them when we - # persist the events. Maybe bump epoch to the latest? - # - # Deny if scheduled for deletion in an old state epoch that is too old? - # - # OR just copy over stuff from `prev_group`? - is_in_db = self.db_pool.simple_select_one_onecol_txn( txn, table="state_groups", @@ -560,6 +553,12 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): for key, state_id in context.state_delta_due_to_event.items() ], ) + + # We need to check that the prev group isn't about to be deleted + self._epoch_store.check_prev_group_before_insertion_txn( + txn, prev_group, state_groups + ) + return events_and_context return await self.db_pool.runInteraction( @@ -616,7 +615,6 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): needs to be persisted as a full state. """ - # TODO: Check state epoch. is_in_db = self.db_pool.simple_select_one_onecol_txn( txn, table="state_groups", @@ -660,6 +658,11 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): ], ) + # We need to check that the prev group isn't about to be deleted + self._epoch_store.check_prev_group_before_insertion_txn( + txn, prev_group, [state_group] + ) + return state_group def insert_full_state_txn(