From 064237a9a41510020a175b64985fe03a06d9ffd0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 22 Jan 2026 12:37:52 +0000 Subject: [PATCH] Prune `sliding_sync_connection_required_state` table (#19306) When we change the `required_state` config for a room in sliding sync, we insert a new entry into the `sliding_sync_connection_required_state` table. As the sliding sync connection advances we can accrue a lot of stale entries, so let's clear those out. This is a sort of follow on from #19211 --------- Co-authored-by: Eric Eastwood --- changelog.d/19306.misc | 1 + .../storage/databases/main/sliding_sync.py | 47 +++- tests/storage/test_sliding_sync_tables.py | 242 ++++++++++++++++++ 3 files changed, 287 insertions(+), 3 deletions(-) create mode 100644 changelog.d/19306.misc diff --git a/changelog.d/19306.misc b/changelog.d/19306.misc new file mode 100644 index 0000000000..463f87eac3 --- /dev/null +++ b/changelog.d/19306.misc @@ -0,0 +1 @@ +Prune stale entries from `sliding_sync_connection_required_state` table. diff --git a/synapse/storage/databases/main/sliding_sync.py b/synapse/storage/databases/main/sliding_sync.py index c66002dae4..9a09c0f9b5 100644 --- a/synapse/storage/databases/main/sliding_sync.py +++ b/synapse/storage/databases/main/sliding_sync.py @@ -450,6 +450,9 @@ class SlidingSyncStore(SQLBaseStore): # Now that we have seen the client has received and used the connection # position, we can delete all the other connection positions. + # + # Note: the rest of the code here assumes this is the only remaining + # connection position. sql = """ DELETE FROM sliding_sync_connection_positions WHERE connection_key = ? AND connection_position != ? @@ -485,9 +488,10 @@ class SlidingSyncStore(SQLBaseStore): ), ) - required_state_map: dict[int, dict[str, set[str]]] = {} + # Map from required_state_id -> event type -> set of state keys. + stored_required_state_id_maps: dict[int, dict[str, set[str]]] = {} for row in rows: - state = required_state_map[row[0]] = {} + state = stored_required_state_id_maps[row[0]] = {} for event_type, state_key in db_to_json(row[1]): state.setdefault(event_type, set()).add(state_key) @@ -512,7 +516,44 @@ class SlidingSyncStore(SQLBaseStore): ) in room_config_rows: room_configs[room_id] = RoomSyncConfig( timeline_limit=timeline_limit, - required_state_map=required_state_map[required_state_id], + required_state_map=stored_required_state_id_maps[required_state_id], + ) + + # Clean up any `required_state_id`s that are no longer used by any + # connection position on this connection. + # + # We store the required state config per-connection per-room. Since this + # can be a lot of data, we deduplicate the required state JSON and store + # it separately, with multiple rooms referencing the same `required_state_id`. + # Over time as the required state configs change, some `required_state_id`s + # may no longer be referenced by any room config, so we need + # to clean them up. + # + # We do this by noting that we have pulled out *all* rows from + # `sliding_sync_connection_required_state` for this connection above. We + # have also pulled out all referenced `required_state_id`s for *this* + # connection position, which is the only connection position that + # remains (we deleted the others above). + # + # Thus we can compute the unused `required_state_id`s by looking for any + # `required_state_id`s that are not referenced by the remaining connection + # position. + used_required_state_ids = { + required_state_id for _, _, required_state_id in room_config_rows + } + + unused_required_state_ids = ( + stored_required_state_id_maps.keys() - used_required_state_ids + ) + if unused_required_state_ids: + self.db_pool.simple_delete_many_batch_txn( + txn, + table="sliding_sync_connection_required_state", + keys=("connection_key", "required_state_id"), + values=[ + (connection_key, required_state_id) + for required_state_id in unused_required_state_ids + ], ) # Now look up the per-room stream data. diff --git a/tests/storage/test_sliding_sync_tables.py b/tests/storage/test_sliding_sync_tables.py index cb9be29c5d..f5bbd49663 100644 --- a/tests/storage/test_sliding_sync_tables.py +++ b/tests/storage/test_sliding_sync_tables.py @@ -3120,6 +3120,248 @@ class SlidingSyncTablesTestCase(SlidingSyncTablesTestCaseBase): # The timestamp for user1 should be updated. self.assertGreater(lazy_member_entries[user1_id], prev_timestamp) + def test_pruning_sliding_sync_connection_required_state(self) -> None: + """Test that we prune old entries from + `sliding_sync_connection_required_state`. + """ + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + self.helper.send_state( + room_id, EventTypes.Name, {"name": "A room"}, tok=user1_tok + ) + + # Do an initial sync, this will pull down the above room and thus cause + # us to store a single required state entry for the room. + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Member, StateValues.LAZY], + ], + "timeline_limit": 1, + } + } + } + _, from_token = self.do_sync(sync_body, tok=user1_tok) + + # Check that we have an entry in sliding_sync_connection_required_state + connection_pos1 = self.get_success( + SlidingSyncStreamToken.from_string(self.store, from_token) + ).connection_position + + connection_key = self.get_success( + self.store.db_pool.simple_select_one_onecol( + table="sliding_sync_connection_positions", + keyvalues={"connection_position": connection_pos1}, + retcol="connection_key", + ) + ) + + required_state_entries = self.get_success( + self.store.db_pool.simple_select_list( + table="sliding_sync_connection_required_state", + keyvalues={"connection_key": connection_key}, + retcols=("required_state_id", "required_state"), + ) + ) + + # We expect a single entry here for the one room ID. + self.assertEqual(len(required_state_entries), 1) + first_required_state_id = required_state_entries[0][0] + + # Update the sync body to request more required state, so that we get + # another entry in the table. + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Name, ""], + [EventTypes.Member, StateValues.LAZY], + ], + "timeline_limit": 1, + } + } + } + + # We need to send a message to cause the room to come down the next + # sync. This shouldn't be necessary, but we don't currently implement + # immediately sending down the room when required_state is updated, + # see https://github.com/element-hq/synapse/issues/18844 + self.helper.send(room_id, "msg1", tok=user1_tok) + + _, from_token = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + required_state_entries = self.get_success( + self.store.db_pool.simple_select_list( + table="sliding_sync_connection_required_state", + keyvalues={"connection_key": connection_key}, + retcols=("required_state_id", "required_state"), + ) + ) + + # We expect two entries here, one for old state and one for new state. + # The old entry doesn't get pruned yet as the previous from_token could + # still be used. + self.assertEqual(len(required_state_entries), 2) + + # Sync again with the latest token. This time we expect the old + # entry to be pruned. + self.do_sync(sync_body, since=from_token, tok=user1_tok) + + required_state_entries = self.get_success( + self.store.db_pool.simple_select_list( + table="sliding_sync_connection_required_state", + keyvalues={"connection_key": connection_key}, + retcols=("required_state_id", "required_state"), + ) + ) + + self.assertEqual(len(required_state_entries), 1) + + # Double check that we have pruned the old entry. + self.assertNotEqual(required_state_entries[0][0], first_required_state_id) + + def test_pruning_sliding_sync_connection_required_state_forks(self) -> None: + """Test that we prune entries in + `sliding_sync_connection_required_state` for forked positions. + """ + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + self.helper.send_state( + room_id, EventTypes.Name, {"name": "A room"}, tok=user1_tok + ) + + # Do an initial sync, this will pull down the above room and thus cause + # us to store a single required state entry for the room. + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Member, StateValues.LAZY], + ], + "timeline_limit": 1, + } + } + } + _, from_token = self.do_sync(sync_body, tok=user1_tok) + + # Check that we have an entry in sliding_sync_connection_required_state + connection_pos1 = self.get_success( + SlidingSyncStreamToken.from_string(self.store, from_token) + ).connection_position + + connection_key = self.get_success( + self.store.db_pool.simple_select_one_onecol( + table="sliding_sync_connection_positions", + keyvalues={"connection_position": connection_pos1}, + retcol="connection_key", + ) + ) + + required_state_entries = self.get_success( + self.store.db_pool.simple_select_list( + table="sliding_sync_connection_required_state", + keyvalues={"connection_key": connection_key}, + retcols=("required_state_id", "required_state"), + ) + ) + + # We expect a single entry here for the one room ID. + self.assertEqual(len(required_state_entries), 1) + first_required_state_id = required_state_entries[0][0] + + # Update the sync body to request more required state, so that we get + # another entry in the table. + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Name, ""], + [EventTypes.Member, StateValues.LAZY], + ], + "timeline_limit": 1, + } + } + } + + # We need to send a message to cause the room to come down the next + # sync. This shouldn't be necessary, but we don't currently implement + # immediately sending down the room when required_state is updated, + # see https://github.com/element-hq/synapse/issues/18844 + self.helper.send(room_id, "msg1", tok=user1_tok) + + _, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + required_state_entries = self.get_success( + self.store.db_pool.simple_select_list( + table="sliding_sync_connection_required_state", + keyvalues={"connection_key": connection_key}, + retcols=("required_state_id", "required_state"), + ) + ) + + # We expect two entries here, one for old state and one for new state. + # The old entry doesn't get pruned yet as the previous from_token could + # still be used. + self.assertEqual(len(required_state_entries), 2) + second_required_state_id = sorted(required_state_entries)[1][0] + + # We sync again, but with the old token, creating a fork in the + # connection positions. We change the sync body again so that the + # `required_state` doesn't get deduplicated. + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Topic, ""], + [EventTypes.Member, StateValues.LAZY], + ], + "timeline_limit": 1, + } + } + } + _, from_token = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + # There should now be three entries, one for each of the required_state. + required_state_entries = self.get_success( + self.store.db_pool.simple_select_list( + table="sliding_sync_connection_required_state", + keyvalues={"connection_key": connection_key}, + retcols=("required_state_id", "required_state"), + ) + ) + + self.assertEqual(len(required_state_entries), 3) + + # Sync again with the latest token. This should prune all except the + # latest entry in `sliding_sync_connection_required_state`. + _, from_token = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + required_state_entries = self.get_success( + self.store.db_pool.simple_select_list( + table="sliding_sync_connection_required_state", + keyvalues={"connection_key": connection_key}, + retcols=("required_state_id", "required_state"), + ) + ) + + self.assertEqual(len(required_state_entries), 1) + + # Double check that we have pruned the old entry. + self.assertNotEqual(required_state_entries[0][0], first_required_state_id) + self.assertNotEqual(required_state_entries[0][0], second_required_state_id) + class SlidingSyncTablesBackgroundUpdatesTestCase(SlidingSyncTablesTestCaseBase): """