From f078df7eee045c8a4d3c1ed78522c9841fe80bc4 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 17 Sep 2024 23:38:06 -0500 Subject: [PATCH] Fix state reset when filtering --- synapse/handlers/sliding_sync/room_lists.py | 48 ++++++++++++++++++- .../client/sliding_sync/test_sliding_sync.py | 15 +++--- tests/storage/test_stream.py | 6 +-- 3 files changed, 57 insertions(+), 12 deletions(-) diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py index ef8141ed11..5040374838 100644 --- a/synapse/handlers/sliding_sync/room_lists.py +++ b/synapse/handlers/sliding_sync/room_lists.py @@ -363,6 +363,7 @@ class SlidingSyncRoomLists: newly_left=room_id in newly_left_room_map, ) } + logger.info("asdf sync_room_map: %s", sync_room_map) with start_active_span("assemble_sliding_window_lists"): for list_key, list_config in sync_config.lists.items(): # Apply filters @@ -371,10 +372,14 @@ class SlidingSyncRoomLists: filtered_sync_room_map = await self.filter_rooms_using_tables( user_id, sync_room_map, + previous_connection_state, list_config.filters, to_token, dm_room_ids, ) + logger.info( + "asdf filtered_sync_room_map: %s", filtered_sync_room_map + ) # Find which rooms are partially stated and may need to be filtered out # depending on the `required_state` requested (see below). @@ -574,6 +579,7 @@ class SlidingSyncRoomLists: filtered_sync_room_map = await self.filter_rooms( sync_config.user, sync_room_map, + previous_connection_state, list_config.filters, to_token, dm_room_ids, @@ -1573,6 +1579,7 @@ class SlidingSyncRoomLists: self, user: UserID, sync_room_map: Dict[str, RoomsForUserType], + previous_connection_state: PerConnectionState, filters: SlidingSyncConfig.SlidingSyncList.Filters, to_token: StreamToken, dm_room_ids: AbstractSet[str], @@ -1758,14 +1765,33 @@ class SlidingSyncRoomLists: ) } + # Keep rooms if the user has been state reset out of it but we previously sent + # down the connection before. We want to make sure that we send these down to + # the client regardless of filters so they find out about the state reset. + # + # We don't always have access to the state in a room after being state reset if + # no one else locally on the server is participating in the room so we patch + # these back in manually. + state_reset_out_of_room_id_set = { + room_id + for room_id in sync_room_map.keys() + if sync_room_map[room_id].event_id is None + and previous_connection_state.rooms.have_sent_room(room_id).status + != HaveSentRoomFlag.NEVER + } + # Assemble a new sync room map but only with the `filtered_room_id_set` - return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set} + return { + room_id: sync_room_map[room_id] + for room_id in filtered_room_id_set | state_reset_out_of_room_id_set + } @trace async def filter_rooms_using_tables( self, user_id: str, sync_room_map: Mapping[str, RoomsForUserSlidingSync], + previous_connection_state: PerConnectionState, filters: SlidingSyncConfig.SlidingSyncList.Filters, to_token: StreamToken, dm_room_ids: AbstractSet[str], @@ -1907,8 +1933,26 @@ class SlidingSyncRoomLists: ) } + # Keep rooms if the user has been state reset out of it but we previously sent + # down the connection before. We want to make sure that we send these down to + # the client regardless of filters so they find out about the state reset. + # + # We don't always have access to the state in a room after being state reset if + # no one else locally on the server is participating in the room so we patch + # these back in manually. + state_reset_out_of_room_id_set = { + room_id + for room_id in sync_room_map.keys() + if sync_room_map[room_id].event_id is None + and previous_connection_state.rooms.have_sent_room(room_id).status + != HaveSentRoomFlag.NEVER + } + # Assemble a new sync room map but only with the `filtered_room_id_set` - return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set} + return { + room_id: sync_room_map[room_id] + for room_id in filtered_room_id_set | state_reset_out_of_room_id_set + } @trace async def sort_rooms( diff --git a/tests/rest/client/sliding_sync/test_sliding_sync.py b/tests/rest/client/sliding_sync/test_sliding_sync.py index 0e503e9bda..cf8b25308f 100644 --- a/tests/rest/client/sliding_sync/test_sliding_sync.py +++ b/tests/rest/client/sliding_sync/test_sliding_sync.py @@ -424,6 +424,9 @@ class SlidingSyncTestCase(SlidingSyncBase): self.event_sources = hs.get_event_sources() self.storage_controllers = hs.get_storage_controllers() self.account_data_handler = hs.get_account_data_handler() + persistence = self.hs.get_storage_controllers().persistence + assert persistence is not None + self.persistence = persistence super().prepare(reactor, clock, hs) @@ -902,9 +905,7 @@ class SlidingSyncTestCase(SlidingSyncBase): ) ) _, join_rule_event_pos, _ = self.get_success( - self.hs.get_storage_controllers().persistence.persist_event( - join_rule_event, join_rule_context - ) + self.persistence.persist_event(join_rule_event, join_rule_context) ) # FIXME: We're manually busting the cache since @@ -1034,9 +1035,7 @@ class SlidingSyncTestCase(SlidingSyncBase): ) ) _, join_rule_event_pos, _ = self.get_success( - self.hs.get_storage_controllers().persistence.persist_event( - join_rule_event, join_rule_context - ) + self.persistence.persist_event(join_rule_event, join_rule_context) ) # FIXME: We're manually busting the cache since @@ -1049,6 +1048,10 @@ class SlidingSyncTestCase(SlidingSyncBase): users_in_room = self.get_success(self.store.get_users_in_room(space_room_id)) self.assertIncludes(set(users_in_room), {user2_id}, exact=True) + # User2 also leaves the room so the server is no longer participating in the room + # and we don't have access to current state + self.helper.leave(space_room_id, user2_id, tok=user2_tok) + # Make another Sliding Sync request (incremental) response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py index e6fa9dfad6..ed5f286243 100644 --- a/tests/storage/test_stream.py +++ b/tests/storage/test_stream.py @@ -30,8 +30,8 @@ from twisted.test.proto_helpers import MemoryReactor from synapse.api.constants import ( Direction, EventTypes, - Membership, JoinRules, + Membership, RelationTypes, ) from synapse.api.filtering import Filter @@ -1206,9 +1206,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase): ) ) _, join_rule_event_pos, _ = self.get_success( - self.hs.get_storage_controllers().persistence.persist_event( - join_rule_event, join_rule_context - ) + self.persistence.persist_event(join_rule_event, join_rule_context) ) # FIXME: We're manually busting the cache since