From ed1b4eff452574771742c2a12751bac2ac9243c8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 22 Jan 2025 14:14:27 +0000 Subject: [PATCH] Check more stuff --- synapse/state/__init__.py | 25 +++++++++++++++-------- synapse/storage/databases/state/epochs.py | 12 +++++++++++ 2 files changed, 28 insertions(+), 9 deletions(-) diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index fdf07838d9..4df5eda490 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -696,17 +696,16 @@ class StateResolutionHandler: async with self.resolve_linearizer.queue(group_names): cache = self._state_cache.get(group_names, None) if cache: - pending_deletion = False + state_groups_to_check = [] + if cache.state_group is not None: + state_groups_to_check.append(cache.state_group) - if cache.state_group: - pending_deletion |= await state_res_store.state_epoch_store.is_state_group_pending_deletion( - cache.state_group - ) + if cache.prev_group is not None: + state_groups_to_check.append(cache.prev_group) - if cache.prev_group: - pending_deletion |= await state_res_store.state_epoch_store.is_state_group_pending_deletion( - cache.prev_group - ) + pending_deletion = await state_res_store.state_epoch_store.are_state_groups_pending_deletion( + state_groups_to_check + ) if not pending_deletion: return cache @@ -719,6 +718,14 @@ class StateResolutionHandler: list(group_names), ) + pending_deletion = await state_res_store.state_epoch_store.are_state_groups_pending_deletion( + group_names + ) + if pending_deletion: + raise Exception( + f"state groups are pending deletion: {shortstr(pending_deletion)}" + ) + state_groups_histogram.observe(len(state_groups_ids)) new_state = await self.resolve_events_with_store( diff --git a/synapse/storage/databases/state/epochs.py b/synapse/storage/databases/state/epochs.py index c5d108be78..71b8125db4 100644 --- a/synapse/storage/databases/state/epochs.py +++ b/synapse/storage/databases/state/epochs.py @@ -147,6 +147,18 @@ class StateEpochDataStore: is_state_group_pending_deletion_txn, ) + async def are_state_groups_pending_deletion( + self, state_groups: Collection[int] + ) -> Collection[int]: + rows = await self.db_pool.simple_select_many_batch( + table="state_groups_pending_deletion", + column="state_group", + iterable=state_groups, + retcols=("state_group",), + desc="are_state_groups_pending_deletion", + ) + return {row["state_group"] for row in rows} + async def mark_state_group_as_used(self, state_group: int) -> None: """Mark that a given state group is used"""