From 4cf2966797f1cc5c2ed716ef884dc1518dcff471 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 10 Sep 2024 15:38:44 +0100 Subject: [PATCH] WIP don't pull out left rooms --- synapse/handlers/sliding_sync/room_lists.py | 148 +++++++++--------- synapse/storage/databases/main/events.py | 2 + synapse/storage/databases/main/roommember.py | 44 +++++- synapse/storage/databases/main/state.py | 20 ++- synapse/storage/databases/main/stream.py | 1 + .../client/sliding_sync/test_sliding_sync.py | 59 +++++++ 6 files changed, 196 insertions(+), 78 deletions(-) diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py index 165b15c60f..53f8edce06 100644 --- a/synapse/handlers/sliding_sync/room_lists.py +++ b/synapse/handlers/sliding_sync/room_lists.py @@ -54,7 +54,6 @@ from synapse.storage.roommember import ( ) from synapse.types import ( MutableStateMap, - PersistedEventPosition, RoomStreamToken, StateMap, StrCollection, @@ -219,6 +218,8 @@ class SlidingSyncRoomLists: # include rooms that are outside the list ranges. all_rooms: Set[str] = set() + # Note: this won't include rooms the user have left themselves. We add + # back in rooms that the user left since `from_token` below. room_membership_for_user_map = await self.store.get_sliding_sync_rooms_for_user( user_id ) @@ -234,44 +235,26 @@ class SlidingSyncRoomLists: room_membership_for_user_map.pop(room_id) continue - existing_room = room_membership_for_user_map.get(room_id) - if existing_room is not None: - # Update room membership events to the point in time of the `to_token` - room_membership_for_user_map[room_id] = RoomsForUserSlidingSync( - room_id=room_id, - sender=change.sender, - membership=change.membership, - event_id=change.event_id, - event_pos=change.event_pos, - room_version_id=change.room_version_id, - # We keep the current state of the room though - room_type=existing_room.room_type, - is_encrypted=existing_room.is_encrypted, - ) - else: - # This can happen if we get "state reset" out of the room - # after the `to_token`. In other words, there is no membership - # for the room after the `to_token` but we see membership in - # the token range. + current_room_entry = await self.store.get_sliding_sync_room_for_user( + user_id, room_id + ) + if current_room_entry is None: + # We should always have an entry, even if we get state reset + # out of the room. + logger.error("Can't find room for user: %s / %s", user_id, room_id) + continue - # Get the state at the time. Note that room type never changes, - # so we can just get current room type - room_type = await self.store.get_room_type(room_id) - is_encrypted = await self.get_is_encrypted_for_room_at_token( - room_id, to_token.room_key - ) - - # Add back rooms that the user was state-reset out of after `to_token` - room_membership_for_user_map[room_id] = RoomsForUserSlidingSync( - room_id=room_id, - sender=change.sender, - membership=change.membership, - event_id=change.event_id, - event_pos=change.event_pos, - room_version_id=change.room_version_id, - room_type=room_type, - is_encrypted=is_encrypted, - ) + room_membership_for_user_map[room_id] = RoomsForUserSlidingSync( + room_id=room_id, + sender=change.sender, + membership=change.membership, + event_id=change.event_id, + event_pos=change.event_pos, + room_version_id=change.room_version_id, + # We keep the current state of the room though + room_type=current_room_entry.room_type, + is_encrypted=current_room_entry.is_encrypted, + ) ( newly_joined_room_ids, @@ -281,31 +264,33 @@ class SlidingSyncRoomLists: ) dm_room_ids = await self._get_dm_rooms_for_user(user_id) - # Handle state resets in the from -> to token range. - state_reset_rooms = ( - newly_left_room_map.keys() - room_membership_for_user_map.keys() - ) - if state_reset_rooms: + # Handle leaves and state resets in the from -> to token range. + left_rooms = newly_left_room_map.keys() - room_membership_for_user_map.keys() + if left_rooms: room_membership_for_user_map = dict(room_membership_for_user_map) - for room_id in ( - newly_left_room_map.keys() - room_membership_for_user_map.keys() - ): - # Get the state at the time. Note that room type never changes, - # so we can just get current room type - room_type = await self.store.get_room_type(room_id) - is_encrypted = await self.get_is_encrypted_for_room_at_token( - room_id, newly_left_room_map[room_id].to_room_stream_token() + for room_id in left_rooms: + room_for_user = newly_left_room_map[room_id] + + # We fetch the current room entry for the user, as that's the + # easiest way of getting the room type etc. + current_room_entry = await self.store.get_sliding_sync_room_for_user( + user_id, room_id ) + if current_room_entry is None: + # We should always have an entry, even if we get state reset + # out of the room. + logger.error("Can't find room for user: %s / %s", user_id, room_id) + continue room_membership_for_user_map[room_id] = RoomsForUserSlidingSync( room_id=room_id, - sender=None, - membership=Membership.LEAVE, - event_id=None, - event_pos=newly_left_room_map[room_id], - room_version_id=await self.store.get_room_version_id(room_id), - room_type=room_type, - is_encrypted=is_encrypted, + sender=room_for_user.sender, + membership=room_for_user.membership, + event_id=room_for_user.event_id, + event_pos=room_for_user.event_pos, + room_version_id=room_for_user.room_version_id, + room_type=current_room_entry.room_type, + is_encrypted=current_room_entry.is_encrypted, ) if sync_config.lists: @@ -417,8 +402,19 @@ class SlidingSyncRoomLists: room_subscription, ) in sync_config.room_subscriptions.items(): if room_id not in room_membership_for_user_map: - # TODO: Handle rooms the user isn't in. - continue + # Check if we do have an entry for the room, but didn't + # pull it out above. This could be e.g. a leave that we + # don't pull out by default. + current_room_entry = ( + await self.store.get_sliding_sync_room_for_user( + user_id, room_id + ) + ) + if not current_room_entry: + # TODO: Handle rooms the user isn't in. + continue + + room_membership_for_user_map[room_id] = current_room_entry all_rooms.add(room_id) @@ -944,18 +940,11 @@ class SlidingSyncRoomLists: # Ensure we have entries for rooms that the user has been "state reset" # out of. These are rooms appear in the `newly_left_rooms` map but # aren't in the `rooms_for_user` map. - for room_id, left_event_pos in newly_left_room_ids.items(): + for room_id, room_membership in newly_left_room_ids.items(): if room_id in rooms_for_user: continue - rooms_for_user[room_id] = RoomsForUserStateReset( - room_id=room_id, - event_id=None, - event_pos=left_event_pos, - membership=Membership.LEAVE, - sender=None, - room_version_id=await self.store.get_room_version_id(room_id), - ) + rooms_for_user[room_id] = room_membership return rooms_for_user, newly_joined_room_ids, set(newly_left_room_ids) @@ -965,7 +954,7 @@ class SlidingSyncRoomLists: user_id: str, to_token: StreamToken, from_token: Optional[StreamToken], - ) -> Tuple[AbstractSet[str], Mapping[str, PersistedEventPosition]]: + ) -> Tuple[AbstractSet[str], Mapping[str, RoomsForUserStateReset]]: """Fetch the sets of rooms that the user newly joined or left in the given token range. @@ -975,10 +964,10 @@ class SlidingSyncRoomLists: Returns: A 2-tuple of newly joined room IDs and a map of newly left room - IDs to the event position the leave happened at. + IDs to the `RoomsForUserStateReset` entry. """ newly_joined_room_ids: Set[str] = set() - newly_left_room_map: Dict[str, PersistedEventPosition] = {} + newly_left_room_map: Dict[str, RoomsForUserStateReset] = {} # We need to figure out the # @@ -1045,12 +1034,21 @@ class SlidingSyncRoomLists: # 2) if last_membership_change_in_from_to_range.membership == Membership.JOIN: possibly_newly_joined_room_ids.add(room_id) + break # 1) Figure out newly_left rooms (> `from_token` and <= `to_token`). - if last_membership_change_in_from_to_range.membership == Membership.LEAVE: + if ( + last_membership_change_in_from_to_range.prev_membership + == Membership.JOIN + ): # 1) Mark this room as `newly_left` - newly_left_room_map[room_id] = ( - last_membership_change_in_from_to_range.event_pos + newly_left_room_map[room_id] = RoomsForUserStateReset( + room_id=room_id, + sender=last_membership_change_in_from_to_range.sender, + membership=Membership.LEAVE, + event_id=last_membership_change_in_from_to_range.event_id, + event_pos=last_membership_change_in_from_to_range.event_pos, + room_version_id=await self.store.get_room_version_id(room_id), ) # 2) Figure out `newly_joined` diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index e5f63019fd..1a03a05900 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -413,6 +413,8 @@ class PersistEventsStore: to_insert = delta_state.to_insert to_delete = delta_state.to_delete + # TODO: Add entry to membership snapshots on state resets. + # If no state is changing, we don't need to do anything. This can happen when a # partial-stated room is re-syncing the current state. if not to_insert and not to_delete: diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 8df760e8a6..916b4cd745 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -1384,7 +1384,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): ) -> Mapping[str, RoomsForUserSlidingSync]: """Get all the rooms for a user to handle a sliding sync request. - Ignores forgotten rooms and rooms that the user has been kicked from. + Ignores forgotten rooms and rooms that the user has left themselves. Returns: Map from room ID to membership info @@ -1404,6 +1404,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): LEFT JOIN sliding_sync_joined_rooms AS j ON (j.room_id = m.room_id AND m.membership = 'join') WHERE user_id = ? AND m.forgotten = 0 + AND (m.membership != 'leave' OR m.user_id != m.sender) """ txn.execute(sql, (user_id,)) return { @@ -1425,6 +1426,47 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): get_sliding_sync_rooms_for_user_txn, ) + async def get_sliding_sync_room_for_user( + self, user_id: str, room_id: str + ) -> Optional[RoomsForUserSlidingSync]: + """Get the sliding sync room entry for the given user and room.""" + + def get_sliding_sync_room_for_user_txn( + txn: LoggingTransaction, + ) -> Optional[RoomsForUserSlidingSync]: + sql = """ + SELECT m.room_id, m.sender, m.membership, m.membership_event_id, + r.room_version, + m.event_instance_name, m.event_stream_ordering, + COALESCE(j.room_type, m.room_type), + COALESCE(j.is_encrypted, m.is_encrypted) + FROM sliding_sync_membership_snapshots AS m + INNER JOIN rooms AS r USING (room_id) + LEFT JOIN sliding_sync_joined_rooms AS j ON (j.room_id = m.room_id AND m.membership = 'join') + WHERE user_id = ? + AND m.forgotten = 0 + AND m.room_id = ? + """ + txn.execute(sql, (user_id, room_id)) + row = txn.fetchone() + if not row: + return None + + return RoomsForUserSlidingSync( + room_id=row[0], + sender=row[1], + membership=row[2], + event_id=row[3], + room_version_id=row[4], + event_pos=PersistedEventPosition(row[5], row[6]), + room_type=row[7], + is_encrypted=row[8], + ) + + return await self.db_pool.runInteraction( + "get_sliding_sync_room_for_user", get_sliding_sync_room_for_user_txn + ) + async def have_finished_sliding_sync_background_jobs(self) -> bool: """Return if it's safe to use the sliding sync membership tables.""" diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py index c5caaf56b0..b0200fbd41 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py @@ -308,8 +308,24 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): return create_event @cached(max_entries=10000) - async def get_room_type(self, room_id: str) -> Optional[str]: - raise NotImplementedError() + async def get_room_type(self, room_id: str) -> Union[Optional[str], Sentinel]: + """Fetch room type for given room. + + Since this function is cached, any missing values would be cached as + `None`. In order to distinguish between an unencrypted room that has + `None` encryption and a room that is unknown to the server where we + might want to omit the value (which would make it cached as `None`), + instead we use the sentinel value `ROOM_UNKNOWN_SENTINEL`. + """ + + try: + create_event = await self.get_create_event_for_room(room_id) + return create_event.content.get(EventContentFields.ROOM_TYPE) + except NotFoundError: + # We use the sentinel value to distinguish between `None` which is a + # valid room type and a room that is unknown to the server so the value + # is just unset. + return ROOM_UNKNOWN_SENTINEL @cachedList(cached_method_name="get_room_type", list_name="room_ids") async def bulk_get_room_type( diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 459436e304..87642293ef 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -947,6 +947,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return [] if from_key: + # TODO: We also need to invalidate this on current state change has_changed = self._membership_stream_cache.has_entity_changed( user_id, int(from_key.stream) ) diff --git a/tests/rest/client/sliding_sync/test_sliding_sync.py b/tests/rest/client/sliding_sync/test_sliding_sync.py index 930cb5ef45..0b29ac0878 100644 --- a/tests/rest/client/sliding_sync/test_sliding_sync.py +++ b/tests/rest/client/sliding_sync/test_sliding_sync.py @@ -25,6 +25,7 @@ from synapse.api.constants import ( AccountDataTypes, EventContentFields, EventTypes, + JoinRules, RoomTypes, ) from synapse.events import EventBase @@ -42,6 +43,7 @@ from synapse.util.stringutils import random_string from tests import unittest from tests.server import TimedOutException +from tests.test_utils.event_injection import create_event logger = logging.getLogger(__name__) @@ -1007,3 +1009,60 @@ class SlidingSyncTestCase(SlidingSyncBase): # Make the Sliding Sync request response_body, _ = self.do_sync(sync_body, tok=user1_tok) self.assertEqual(response_body["rooms"][room_id1]["initial"], True) + + def test_state_reset_room_comes_down_incremental_sync(self) -> None: + """Test that a room that we were state reset out of comes down + incremental sync""" + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, is_public=True, tok=user2_tok) + + event_response = self.helper.send(room_id1, "test", tok=user2_tok) + event_id = event_response["event_id"] + + self.helper.join(room_id1, user1_id, tok=user1_tok) + + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 1, + } + } + } + + # Make the Sliding Sync request + response_body, from_token = self.do_sync(sync_body, tok=user1_tok) + self.assertEqual(response_body["rooms"][room_id1]["initial"], True) + + # Trigger a state reset + join_rule_event, join_rule_context = self.get_success( + create_event( + self.hs, + prev_event_ids=[event_id], + type=EventTypes.JoinRules, + state_key="", + content={"join_rule": JoinRules.INVITE}, + sender=user2_id, + room_id=room_id1, + room_version=self.get_success(self.store.get_room_version_id(room_id1)), + ) + ) + self.get_success( + self.hs.get_storage_controllers().persistence.persist_event( + join_rule_event, join_rule_context + ) + ) + + users_in_room = self.get_success(self.store.get_users_in_room(room_id1)) + self.assertIncludes(set(users_in_room), {user2_id}, exact=True) + + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + # TODO: What should we expect here? Probably at least *something*? + print(response_body["rooms"][room_id1])