diff --git a/synapse/handlers/sliding_sync/__init__.py b/synapse/handlers/sliding_sync/__init__.py index 9ea5d82951..aff64b615f 100644 --- a/synapse/handlers/sliding_sync/__init__.py +++ b/synapse/handlers/sliding_sync/__init__.py @@ -62,12 +62,14 @@ from synapse.types.handlers.sliding_sync import ( HaveSentRoomFlag, MutablePerConnectionState, PerConnectionState, + RoomLazyMembershipChanges, RoomSyncConfig, SlidingSyncConfig, SlidingSyncResult, StateValues, ) from synapse.types.state import StateFilter +from synapse.util import MutableOverlayMapping from synapse.util.async_helpers import concurrently_execute from synapse.visibility import filter_events_for_client @@ -1138,8 +1140,8 @@ class SlidingSyncHandler: # time we've sent the room down this connection. room_state: StateMap[EventBase] = {} - new_connection_state.room_lazy_membership[room_id] = dict.fromkeys( - required_user_state + new_connection_state.room_lazy_membership[room_id] = RoomLazyMembershipChanges( + added=dict.fromkeys(required_user_state) ) if initial: @@ -1163,12 +1165,32 @@ class SlidingSyncHandler: user_ids=required_user_state, ) ) - new_connection_state.room_lazy_membership[room_id].update( + new_connection_state.room_lazy_membership[room_id].added.update( previously_returned_user_state ) else: previously_returned_user_state = {} + if limited: + # If the timeline is limited we need to remove any members + # from list of lazy loaded members that have changed but not + # been sent down the timeline. + previously_returned_user_state = MutableOverlayMapping( + previously_returned_user_state + ) + for event_type, state_key in room_state_delta_id_map: + if event_type != EventTypes.Member: + continue + + if state_key not in required_user_state: + previously_returned_user_state.pop(state_key, None) + new_connection_state.room_lazy_membership[ + room_id + ].added.pop(state_key, None) + new_connection_state.room_lazy_membership[ + room_id + ].removed.add(state_key) + # Check if there are any changes to the required state config # that we need to handle. changed_required_state_map, added_state_filter = ( diff --git a/synapse/storage/databases/main/sliding_sync.py b/synapse/storage/databases/main/sliding_sync.py index 3d766efc41..489fcde16a 100644 --- a/synapse/storage/databases/main/sliding_sync.py +++ b/synapse/storage/databases/main/sliding_sync.py @@ -33,6 +33,7 @@ from synapse.types.handlers.sliding_sync import ( HaveSentRoomFlag, MutablePerConnectionState, PerConnectionState, + RoomLazyMembershipChanges, RoomStatusMap, RoomSyncConfig, ) @@ -350,15 +351,12 @@ class SlidingSyncStore(SQLBaseStore): value_values=values, ) - for room_id, user_ids in per_connection_state.room_lazy_membership.items(): - self._persist_sliding_sync_connection_lazy_members_txn( - txn, - connection_key, - previous_connection_position, - connection_position, - room_id, - user_ids, - ) + self._persist_sliding_sync_connection_lazy_members_txn( + txn, + connection_key, + connection_position, + per_connection_state.room_lazy_membership, + ) return connection_position @@ -557,50 +555,62 @@ class SlidingSyncStore(SQLBaseStore): self, txn: LoggingTransaction, connection_key: int, - previous_connection_position: int | None, new_connection_position: int, - room_id: str, - user_ids: Mapping[str, int | None], + all_changes: dict[str, RoomLazyMembershipChanges], ) -> None: """Persist that we have sent lazy membership for the given user IDs.""" now = self.clock.time_msec() - to_update: list[str] = [] - for user_id, last_seen_ts in user_ids.items(): - if last_seen_ts is None: - # We've never sent this user before, so we need to record that - # we've sent it at the new connection position. - to_update.append(user_id) - elif last_seen_ts + 60 * 60 * 1000 < now: - # We last saw this user over an hour ago, so we update the - # timestamp. - to_update.append(user_id) + to_update: list[tuple[str, str]] = [] + for room_id, room_changes in all_changes.items(): + for user_id, last_seen_ts in room_changes.added.items(): + if last_seen_ts is None: + # We've never sent this user before, so we need to record that + # we've sent it at the new connection position. + to_update.append((room_id, user_id)) + elif last_seen_ts + 60 * 60 * 1000 < now: + # We last saw this user over an hour ago, so we update the + # timestamp. + to_update.append((room_id, user_id)) - if not to_update: - return + if to_update: + sql = """ + INSERT INTO sliding_sync_connection_lazy_members + (connection_key, connection_position, room_id, user_id, last_seen_ts) + VALUES {value_placeholder} + ON CONFLICT (connection_key, room_id, user_id) + DO UPDATE SET last_seen_ts = EXCLUDED.last_seen_ts + WHERE sliding_sync_connection_lazy_members.connection_position IS NULL + OR sliding_sync_connection_lazy_members.connection_position = EXCLUDED.connection_position + """ - sql = """ - INSERT INTO sliding_sync_connection_lazy_members - (connection_key, connection_position, room_id, user_id, last_seen_ts) - VALUES {value_placeholder} - ON CONFLICT (connection_key, room_id, user_id) - DO UPDATE SET last_seen_ts = EXCLUDED.last_seen_ts - WHERE sliding_sync_connection_lazy_members.connection_position IS NULL - OR sliding_sync_connection_lazy_members.connection_position = EXCLUDED.connection_position - """ + args = [ + (connection_key, new_connection_position, room_id, user_id, now) + for room_id, user_id in to_update + ] - args = [ - (connection_key, new_connection_position, room_id, user_id, now) - for user_id in to_update - ] + if isinstance(self.database_engine, PostgresEngine): + sql = sql.format(value_placeholder="?") + txn.execute_values(sql, args, fetch=False) + else: + sql = sql.format(value_placeholder="(?, ?, ?, ?, ?)") + txn.execute_batch(sql, args) - if isinstance(self.database_engine, PostgresEngine): - sql = sql.format(value_placeholder="?") - txn.execute_values(sql, args, fetch=False) - else: - sql = sql.format(value_placeholder="(?, ?, ?, ?, ?)") - txn.execute_batch(sql, args) + to_remove: list[tuple[str, str]] = [] + for room_id, room_changes in all_changes.items(): + for user_id in room_changes.removed: + to_remove.append((room_id, user_id)) + + if to_remove: + self.db_pool.simple_delete_many_batch_txn( + txn, + table="sliding_sync_connection_lazy_members", + keys=("connection_key", "room_id", "user_id"), + values=[ + (connection_key, room_id, user_id) for room_id, user_id in to_remove + ], + ) @attr.s(auto_attribs=True, frozen=True) @@ -622,7 +632,7 @@ class PerConnectionStateDB: room_configs: Mapping[str, "RoomSyncConfig"] - room_lazy_membership: dict[str, dict[str, int | None]] + room_lazy_membership: dict[str, RoomLazyMembershipChanges] @staticmethod async def from_state( diff --git a/synapse/types/handlers/sliding_sync.py b/synapse/types/handlers/sliding_sync.py index 8f63680249..4c975deb91 100644 --- a/synapse/types/handlers/sliding_sync.py +++ b/synapse/types/handlers/sliding_sync.py @@ -885,6 +885,31 @@ class PerConnectionState: return len(self.rooms) + len(self.receipts) + len(self.room_configs) +@attr.s(auto_attribs=True) +class RoomLazyMembershipChanges: + """Changes to lazily-loaded room memberships for a given room. + + Attributes: + added: Map from user ID to timestamp for users whose membership we + have lazily loaded. + removed: Set of user IDs whose membership change we have *not* sent + down + """ + + # A map from user ID -> timestamp. Indicates that those memberships have + # been lazily loaded. I.e. that either a) we sent those memberships down, or + # b) we did so previously. The timestamp indicates the time we previously + # saw the membership. + added: dict[str, int | None] = attr.Factory(dict) + + # A set of user IDs whose membership change we have *not* sent + # down + removed: set[str] = attr.Factory(set) + + def __bool__(self) -> bool: + return bool(self.added or self.removed) + + @attr.s(auto_attribs=True) class MutablePerConnectionState(PerConnectionState): """A mutable version of `PerConnectionState`""" @@ -899,7 +924,7 @@ class MutablePerConnectionState(PerConnectionState): # memberships have been lazily loaded. I.e. that either a) we sent those # memberships down, or b) we did so previously. The timestamp indicates the # time we previously saw the membership. - room_lazy_membership: dict[str, dict[str, int | None]] = attr.Factory(dict) + room_lazy_membership: dict[str, RoomLazyMembershipChanges] = attr.Factory(dict) def has_updates(self) -> bool: return ( diff --git a/tests/rest/client/sliding_sync/test_rooms_required_state.py b/tests/rest/client/sliding_sync/test_rooms_required_state.py index 210280bc48..1c672ed6dc 100644 --- a/tests/rest/client/sliding_sync/test_rooms_required_state.py +++ b/tests/rest/client/sliding_sync/test_rooms_required_state.py @@ -841,6 +841,101 @@ class SlidingSyncRoomsRequiredStateTestCase(SlidingSyncBase): exact=True, ) + def test_lazy_members_limited_sync(self) -> None: + """Test that when using lazy loading for room members and a limited sync + missing a membership change, we include the membership change next time + said user says something. + """ + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + + # Send a message from each user to the room so that both memberships are sent down. + self.helper.send(room_id1, "1", tok=user1_tok) + self.helper.send(room_id1, "2", tok=user2_tok) + + # Make a first sync to establish a position + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Member, StateValues.LAZY], + ], + "timeline_limit": 2, + } + } + } + response_body, from_token = self.do_sync(sync_body, tok=user1_tok) + + # We should see both membership events in required_state + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id1) + ) + self._assertRequiredStateIncludes( + response_body["rooms"][room_id1]["required_state"], + { + state_map[(EventTypes.Member, user1_id)], + state_map[(EventTypes.Member, user2_id)], + }, + exact=True, + ) + + # User2 changes their display name (causing a membership change) + self.helper.send_state( + room_id1, + event_type=EventTypes.Member, + state_key=user2_id, + body={ + EventContentFields.MEMBERSHIP: Membership.JOIN, + EventContentFields.MEMBERSHIP_DISPLAYNAME: "New Name", + }, + tok=user2_tok, + ) + + # Send a couple of messages to the room to push out the membership change + self.helper.send(room_id1, "3", tok=user1_tok) + self.helper.send(room_id1, "4", tok=user1_tok) + + # Make an incremental Sliding Sync request + response_body, from_token = self.do_sync( + sync_body, since=from_token, tok=user1_tok + ) + + # The membership change should *not* be included yet as user2 doesn't + # have any events in the timeline. + self._assertRequiredStateIncludes( + response_body["rooms"][room_id1].get("required_state", []), + set(), + exact=True, + ) + + # Now user2 sends a message to the room + self.helper.send(room_id1, "5", tok=user2_tok) + + # Make another incremental Sliding Sync request + response_body, from_token = self.do_sync( + sync_body, since=from_token, tok=user1_tok + ) + + # The membership change should now be included as user2 has an event + # in the timeline. + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id1) + ) + self._assertRequiredStateIncludes( + response_body["rooms"][room_id1].get("required_state", []), + { + state_map[(EventTypes.Member, user2_id)], + }, + exact=True, + ) + def test_rooms_required_state_me(self) -> None: """ Test `rooms.required_state` correctly handles $ME.