From 73603ef1dbd1b559abf4f5bc06fc9a4b31fae6b4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 17 Jan 2025 11:08:59 +0000 Subject: [PATCH] WIP --- synapse/state/__init__.py | 9 +++ synapse/storage/controllers/persist_events.py | 29 ++++++++ synapse/storage/databases/state/store.py | 66 ++++++++++++++++++- 3 files changed, 102 insertions(+), 2 deletions(-) diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 72b291889b..fe09d9fd4b 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -194,6 +194,7 @@ class StateHandler: self._storage_controllers = hs.get_storage_controllers() self._events_shard_config = hs.config.worker.events_shard_config self._instance_name = hs.get_instance_name() + self._state_store = hs.get_datastores().state self._update_current_state_client = ( ReplicationUpdateCurrentStateRestServlet.make_client(hs) @@ -311,6 +312,8 @@ class StateHandler: """ assert not event.internal_metadata.is_outlier() + state_epoch = await self._state_store.get_state_epoch() # TODO: Get state epoch + # # first of all, figure out the state before the event, unless we # already have it. @@ -396,6 +399,7 @@ class StateHandler: delta_ids_to_state_group_before_event=deltas_to_state_group_before_event, partial_state=partial_state, state_map_before_event=state_ids_before_event, + state_epoch=state_epoch, ) # @@ -426,6 +430,7 @@ class StateHandler: delta_ids_to_state_group_before_event=deltas_to_state_group_before_event, partial_state=partial_state, state_map_before_event=state_ids_before_event, + state_epoch=state_epoch, ) async def compute_event_context( @@ -511,6 +516,9 @@ class StateHandler: ) = await self._state_storage_controller.get_state_group_delta( state_group_id ) + + # TODO: Check for deleted state groups + return _StateCacheEntry( state=None, state_group=state_group_id, @@ -663,6 +671,7 @@ class StateResolutionHandler: async with self.resolve_linearizer.queue(group_names): cache = self._state_cache.get(group_names, None) if cache: + # TODO: Check for deleted state groups return cache logger.info( diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py index 879ee9039e..98600054d6 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py @@ -635,6 +635,35 @@ class EventsPersistenceStorageController: room_id, [e for e, _ in chunk] ) + # TODO: Clear pending deletion status of state groups that are + # referenced. Should we mark as to be deleted in the case we fail to + # persist below? No, as we don't know how long the below will take. + # + # TODO: Check that state epoch isn't too old. + # + # TODO: Add a table to track what state groups we're currently + # inserting? There's a race where this transaction takes so long + # that we delete the state groups we're inserting. + # + # NYARGH: We don't have access to the state store here? Do we do + # this a layer above? There's not *that* much happening here? + + min_state_epoch = min(ctx.state_epoch for _, ctx in chunk) + state_groups = { + ctx.state_group + for _, ctx in chunk + if ctx.state_group and not ctx.rejected + } + state_groups.update( + ctx.state_group_before_event + for _, ctx in chunk + if ctx.state_group_before_event is not None + ) + await self.state_store.mark_state_groups_as_used( + min_state_epoch, + state_groups, + ) + await self.persist_events_store._persist_events_and_state_updates( room_id, chunk, diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py index 34bfd8f5c4..f6bbae9246 100644 --- a/synapse/storage/databases/state/store.py +++ b/synapse/storage/databases/state/store.py @@ -36,7 +36,11 @@ import attr from synapse.api.constants import EventTypes from synapse.events import EventBase -from synapse.events.snapshot import UnpersistedEventContext, UnpersistedEventContextBase +from synapse.events.snapshot import ( + EventContext, + UnpersistedEventContext, + UnpersistedEventContextBase, +) from synapse.logging.opentracing import tag_args, trace from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage._base import SQLBaseStore @@ -171,6 +175,60 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): "_advance_state_epoch", advance_state_epoch_txn, db_autocommit=True ) + async def get_state_epoch(self) -> int: + return await self.db_pool.simple_select_one_onecol( + table="state_epoch", + retcol="state_epoch", + keyvalues={}, + desc="get_state_epoch", + ) + + async def mark_state_groups_as_used( + self, event_and_contexts: Collection[Tuple[EventBase, EventContext]] + ) -> None: + min_state_epoch = min(ctx.state_epoch for _, ctx in event_and_contexts) + state_groups = { + ctx.state_group + for _, ctx in event_and_contexts + if ctx.state_group and not ctx.rejected + } + state_groups.update( + ctx.state_group_before_event + for _, ctx in event_and_contexts + if ctx.state_group_before_event is not None + ) + + if not state_groups: + return + + await self.db_pool.runInteraction( + "mark_state_groups_as_used", + self._mark_state_groups_as_used_txn, + min_state_epoch, + state_groups, + ) + + def _mark_state_groups_as_used_txn( + self, txn: LoggingTransaction, state_epoch: int, state_groups: Collection[int] + ) -> None: + current_state_epoch = self.db_pool.simple_select_one_onecol_txn( + txn, + table="state_epoch", + retcol="state_epoch", + keyvalues={}, + ) + + # TODO: Move to constant. Is the equality correct? + if current_state_epoch - state_epoch >= 2: + raise Exception("FOO") + + self.db_pool.simple_delete_many_batch_txn( + txn, + table="state_groups_pending_deletion", + keys=("state_group",), + values=[(state_group,) for state_group in state_groups], + ) + @cached() async def is_state_group_pending_deletion_before( self, state_epoch: int, state_group: int @@ -535,7 +593,11 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): # # If prev_group is in state_deletions? => Mark these groups as for # deletion too in the same state epoch. Then clear them when we - # persist the events. + # persist the events. Maybe bump epoch to the latest? + # + # Deny if scheduled for deletion in an old state epoch that is too old? + # + # OR just copy over stuff from `prev_group`? is_in_db = self.db_pool.simple_select_one_onecol_txn( txn,