From 87c844a71e3de2ee01efd9f03491f39dfa35b40c Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 17 Sep 2024 18:45:33 -0500 Subject: [PATCH] First pass of not fetching left rooms and adding back `newly_left` --- synapse/handlers/sliding_sync/room_lists.py | 187 +++++++++++++------ synapse/storage/databases/main/roommember.py | 46 ++++- synapse/storage/databases/main/state.py | 20 +- 3 files changed, 192 insertions(+), 61 deletions(-) diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py index 475bfbbbcb..9a05d4c33d 100644 --- a/synapse/handlers/sliding_sync/room_lists.py +++ b/synapse/handlers/sliding_sync/room_lists.py @@ -55,7 +55,6 @@ from synapse.storage.roommember import ( ) from synapse.types import ( MutableStateMap, - PersistedEventPosition, RoomStreamToken, StateMap, StrCollection, @@ -220,6 +219,9 @@ class SlidingSyncRoomLists: # include rooms that are outside the list ranges. all_rooms: Set[str] = set() + # Note: this won't include rooms the user have left themselves. We add back + # `newly_left` rooms below. This is more efficient than fetching all rooms and + # then filtering out the old left rooms. room_membership_for_user_map = await self.store.get_sliding_sync_rooms_for_user( user_id ) @@ -245,36 +247,11 @@ class SlidingSyncRoomLists: event_id=change.event_id, event_pos=change.event_pos, room_version_id=change.room_version_id, - # We keep the current state of the room though + # We keep the state of the room though has_known_state=existing_room.has_known_state, room_type=existing_room.room_type, is_encrypted=existing_room.is_encrypted, ) - else: - # This can happen if we get "state reset" out of the room - # after the `to_token`. In other words, there is no membership - # for the room after the `to_token` but we see membership in - # the token range. - - # Get the state at the time. Note that room type never changes, - # so we can just get current room type - room_type = await self.store.get_room_type(room_id) - is_encrypted = await self.get_is_encrypted_for_room_at_token( - room_id, to_token.room_key - ) - - # Add back rooms that the user was state-reset out of after `to_token` - room_membership_for_user_map[room_id] = RoomsForUserSlidingSync( - room_id=room_id, - sender=change.sender, - membership=change.membership, - event_id=change.event_id, - event_pos=change.event_pos, - room_version_id=change.room_version_id, - has_known_state=True, - room_type=room_type, - is_encrypted=is_encrypted, - ) ( newly_joined_room_ids, @@ -284,38 +261,106 @@ class SlidingSyncRoomLists: ) dm_room_ids = await self._get_dm_rooms_for_user(user_id) - # Handle state resets in the from -> to token range. - state_reset_rooms = ( + # Add back `newly_left` rooms (rooms left in the from -> to token range). We do + # this because `get_sliding_sync_rooms_for_user(...)` doesn't include rooms that + # the user left themselves and it's more efficient to add them back here than to + # fetch all rooms and then filter out the old left rooms. + # + missing_newly_left_rooms = ( newly_left_room_map.keys() - room_membership_for_user_map.keys() ) - if state_reset_rooms: + if missing_newly_left_rooms: room_membership_for_user_map = dict(room_membership_for_user_map) - for room_id in ( - newly_left_room_map.keys() - room_membership_for_user_map.keys() - ): - # Get the state at the time. Note that room type never changes, - # so we can just get current room type - room_type = await self.store.get_room_type(room_id) - is_encrypted = await self.get_is_encrypted_for_room_at_token( - room_id, newly_left_room_map[room_id].to_room_stream_token() - ) + for room_id in missing_newly_left_rooms: + newly_left_room_for_user = newly_left_room_map[room_id] + # This should be a given + assert newly_left_room_for_user.membership == Membership.LEAVE - room_membership_for_user_map[room_id] = RoomsForUserSlidingSync( - room_id=room_id, - sender=None, - membership=Membership.LEAVE, - event_id=None, - event_pos=newly_left_room_map[room_id], - room_version_id=await self.store.get_room_version_id(room_id), - has_known_state=True, - room_type=room_type, - is_encrypted=is_encrypted, - ) + room_type = None + has_known_state = False + is_encrypted = False + # Add back `newly_left` rooms + # + # Normal user left the room + if newly_left_room_for_user.event_id is not None: + # We fetch from the Sliding Sync tables as it's just another membership + newly_left_room_for_user_sliding_sync = ( + await self.store.get_sliding_sync_room_for_user( + user_id, room_id + ) + ) + # The only reason this would happen is if the user was state reset + # out of the room while we were working on the response (between + # when we checked for newly_left rooms and now). + # + # TODO: We should handle this as a state reset case like below + if newly_left_room_for_user_sliding_sync is None: + raise AssertionError( + f"Can't find newly left room for user in the Sliding Sync tables: {user_id} / {room_id}", + ) + + room_membership_for_user_map[room_id] = ( + newly_left_room_for_user_sliding_sync + ) + + change = changes.get(room_id) + if change is not None: + # Update room membership events to the point in time of the `to_token` + room_membership_for_user_map[room_id] = RoomsForUserSlidingSync( + room_id=room_id, + sender=change.sender, + membership=change.membership, + event_id=change.event_id, + event_pos=change.event_pos, + room_version_id=change.room_version_id, + # We keep the state of the room though + has_known_state=newly_left_room_for_user_sliding_sync.has_known_state, + room_type=newly_left_room_for_user_sliding_sync.room_type, + is_encrypted=newly_left_room_for_user_sliding_sync.is_encrypted, + ) + + # They should match at this point + assert ( + room_membership_for_user_map[room_id].event_id + == newly_left_room_for_user.event_id + ) + + # "state reset" out of the room + else: + # Get the state at the time. We can't read from the Sliding Sync + # tables because the user has no membership in the room according to + # the state (thanks to the state reset). + # + # Note: `room_type` never changes, so we can just get current room + # type + room_type = await self.store.get_room_type(room_id) + has_known_state = room_type is not ROOM_UNKNOWN_SENTINEL + if isinstance(room_type, StateSentinel): + room_type = None + + # Get the encryption status at the time of the token + is_encrypted = await self.get_is_encrypted_for_room_at_token( + room_id, + newly_left_room_for_user.event_pos.to_room_stream_token(), + ) + + room_membership_for_user_map[room_id] = RoomsForUserSlidingSync( + room_id=room_id, + sender=newly_left_room_for_user.sender, + membership=newly_left_room_for_user.membership, + event_id=newly_left_room_for_user.event_id, + event_pos=newly_left_room_for_user.event_pos, + room_version_id=newly_left_room_for_user.room_version_id, + has_known_state=has_known_state, + room_type=room_type, + is_encrypted=is_encrypted, + ) if sync_config.lists: sync_room_map = { room_id: room_membership_for_user for room_id, room_membership_for_user in room_membership_for_user_map.items() + # TODO: Do we need to do this anymore? if filter_membership_for_sync( user_id=user_id, room_membership_for_user=room_membership_for_user, @@ -955,17 +1000,23 @@ class SlidingSyncRoomLists: # Ensure we have entries for rooms that the user has been "state reset" # out of. These are rooms appear in the `newly_left_rooms` map but # aren't in the `rooms_for_user` map. - for room_id, left_event_pos in newly_left_room_ids.items(): + for room_id, newly_left_room_for_user in newly_left_room_ids.items(): + # If we already know about the room, it's not a state reset if room_id in rooms_for_user: continue + # This should be true if it's a state reset + assert newly_left_room_for_user.membership is Membership.LEAVE + assert newly_left_room_for_user.event_id is None + assert newly_left_room_for_user.sender is None + rooms_for_user[room_id] = RoomsForUserStateReset( room_id=room_id, event_id=None, - event_pos=left_event_pos, + event_pos=newly_left_room_for_user.event_pos, membership=Membership.LEAVE, sender=None, - room_version_id=await self.store.get_room_version_id(room_id), + room_version_id=newly_left_room_for_user.room_version_id, ) return rooms_for_user, newly_joined_room_ids, set(newly_left_room_ids) @@ -976,7 +1027,9 @@ class SlidingSyncRoomLists: user_id: str, to_token: StreamToken, from_token: Optional[StreamToken], - ) -> Tuple[AbstractSet[str], Mapping[str, PersistedEventPosition]]: + ) -> Tuple[ + AbstractSet[str], Mapping[str, Union[RoomsForUser, RoomsForUserStateReset]] + ]: """Fetch the sets of rooms that the user newly joined or left in the given token range. @@ -989,7 +1042,7 @@ class SlidingSyncRoomLists: IDs to the event position the leave happened at. """ newly_joined_room_ids: Set[str] = set() - newly_left_room_map: Dict[str, PersistedEventPosition] = {} + newly_left_room_map: Dict[str, Union[RoomsForUser, RoomsForUserStateReset]] = {} # We need to figure out the # @@ -1060,9 +1113,27 @@ class SlidingSyncRoomLists: # 1) Figure out newly_left rooms (> `from_token` and <= `to_token`). if last_membership_change_in_from_to_range.membership == Membership.LEAVE: # 1) Mark this room as `newly_left` - newly_left_room_map[room_id] = ( - last_membership_change_in_from_to_range.event_pos - ) + if last_membership_change_in_from_to_range.event_id is not None: + # This is a normal leave event so this should exist + assert last_membership_change_in_from_to_range.sender is not None + + newly_left_room_map[room_id] = RoomsForUser( + room_id=room_id, + sender=last_membership_change_in_from_to_range.sender, + membership=Membership.LEAVE, + event_id=last_membership_change_in_from_to_range.event_id, + event_pos=last_membership_change_in_from_to_range.event_pos, + room_version_id=await self.store.get_room_version_id(room_id), + ) + else: + newly_left_room_map[room_id] = RoomsForUserStateReset( + room_id=room_id, + sender=last_membership_change_in_from_to_range.sender, + membership=Membership.LEAVE, + event_id=last_membership_change_in_from_to_range.event_id, + event_pos=last_membership_change_in_from_to_range.event_pos, + room_version_id=await self.store.get_room_version_id(room_id), + ) # 2) Figure out `newly_joined` for room_id in possibly_newly_joined_room_ids: diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index e321a1add2..1746917b08 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -1403,7 +1403,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): ) -> Mapping[str, RoomsForUserSlidingSync]: """Get all the rooms for a user to handle a sliding sync request. - Ignores forgotten rooms and rooms that the user has been kicked from. + Ignores forgotten rooms and rooms that the user has left themselves. Returns: Map from room ID to membership info @@ -1428,6 +1428,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): LEFT JOIN sliding_sync_joined_rooms AS j ON (j.room_id = m.room_id AND m.membership = 'join') WHERE user_id = ? AND m.forgotten = 0 + AND (m.membership != 'leave' OR m.user_id != m.sender) """ txn.execute(sql, (user_id,)) return { @@ -1450,6 +1451,49 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): get_sliding_sync_rooms_for_user_txn, ) + async def get_sliding_sync_room_for_user( + self, user_id: str, room_id: str + ) -> Optional[RoomsForUserSlidingSync]: + """Get the sliding sync room entry for the given user and room.""" + + def get_sliding_sync_room_for_user_txn( + txn: LoggingTransaction, + ) -> Optional[RoomsForUserSlidingSync]: + sql = """ + SELECT m.room_id, m.sender, m.membership, m.membership_event_id, + r.room_version, + m.event_instance_name, m.event_stream_ordering, + m.has_known_state, + COALESCE(j.room_type, m.room_type), + COALESCE(j.is_encrypted, m.is_encrypted) + FROM sliding_sync_membership_snapshots AS m + INNER JOIN rooms AS r USING (room_id) + LEFT JOIN sliding_sync_joined_rooms AS j ON (j.room_id = m.room_id AND m.membership = 'join') + WHERE user_id = ? + AND m.forgotten = 0 + AND m.room_id = ? + """ + txn.execute(sql, (user_id, room_id)) + row = txn.fetchone() + if not row: + return None + + return RoomsForUserSlidingSync( + room_id=row[0], + sender=row[1], + membership=row[2], + event_id=row[3], + room_version_id=row[4], + event_pos=PersistedEventPosition(row[5], row[6]), + has_known_state=bool(row[7]), + room_type=row[8], + is_encrypted=row[9], + ) + + return await self.db_pool.runInteraction( + "get_sliding_sync_room_for_user", get_sliding_sync_room_for_user_txn + ) + class RoomMemberBackgroundUpdateStore(SQLBaseStore): def __init__( diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py index ca31122ad3..60312d770d 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py @@ -308,8 +308,24 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): return create_event @cached(max_entries=10000) - async def get_room_type(self, room_id: str) -> Optional[str]: - raise NotImplementedError() + async def get_room_type(self, room_id: str) -> Union[Optional[str], Sentinel]: + """Fetch room type for given room. + + Since this function is cached, any missing values would be cached as + `None`. In order to distinguish between an unencrypted room that has + `None` encryption and a room that is unknown to the server where we + might want to omit the value (which would make it cached as `None`), + instead we use the sentinel value `ROOM_UNKNOWN_SENTINEL`. + """ + + try: + create_event = await self.get_create_event_for_room(room_id) + return create_event.content.get(EventContentFields.ROOM_TYPE) + except NotFoundError: + # We use the sentinel value to distinguish between `None` which is a + # valid room type and a room that is unknown to the server so the value + # is just unset. + return ROOM_UNKNOWN_SENTINEL @cachedList(cached_method_name="get_room_type", list_name="room_ids") async def bulk_get_room_type(