diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 22662a389f..b05d863bec 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -2123,10 +2123,10 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS # Map from room_id to ... # Just some convenience maps for easier lookup by `room_id`. - last_to_insert_membership_snapshots_by_room_id: Dict[ + last_to_insert_join_membership_snapshots_by_room_id: Dict[ str, SlidingSyncMembershipSnapshotSharedInsertValues ] = {} - last_to_insert_membership_infos_by_room_id: Dict[ + last_to_insert_join_membership_infos_by_room_id: Dict[ str, SlidingSyncMembershipInfoWithEventPos ] = {} # TODO: `concurrently_execute` based on buckets of room_ids @@ -2171,34 +2171,50 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS }, ) - # Check if the current state has been updated since the last snapshot we inserted. + # Check if we can potentially re-use the last snapshot we inserted for this + # room. + # can_use_last_snapshot = False # TODO: We could also look in the database to see if a snapshot is available - last_to_insert_membership_snapshot = ( - last_to_insert_membership_snapshots_by_room_id.get(room_id) + last_to_insert_join_membership_snapshot = ( + last_to_insert_join_membership_snapshots_by_room_id.get(room_id) ) - last_to_insert_membership_info = ( - last_to_insert_membership_infos_by_room_id.get(room_id) + last_to_insert_join_membership_info = ( + last_to_insert_join_membership_infos_by_room_id.get(room_id) ) - if last_to_insert_membership_snapshot is not None: + # We can only re-use the last snapshot if their previous membership was in + # the room. Since that's not easy to determine without more extra lookups, + # we only apply this to join memberships for now. + # + # - JOIN: Re-use if no state changes + # - LEAVE: Could re-use if they were previously joined otherwise this could + # be a rescinded/rejected invite/knock and we don't want to leak anything. + # - INVITE/KNOCK: ❌ We can only rely on the stripped state + # - BAN: Could re-use if they were previously joined otherwise they might + # have just been banned after invite/knock and we don't want to leak + # anything. + if ( + last_to_insert_join_membership_snapshot is not None + and membership == Membership.JOIN + ): can_use_last_snapshot = True # These should go hand-in-hand - assert last_to_insert_membership_info is not None + assert last_to_insert_join_membership_info is not None # Sanity check that our tokens are in order. We should be iterating in # ascending order. assert ( - last_to_insert_membership_info.membership_event_stream_ordering + last_to_insert_join_membership_info.membership_event_stream_ordering <= membership_event_stream_ordering ) - # TODO: This only works if someone is from the server is participating in the room. + # Check if the current state has been updated since the last snapshot we inserted. state_deltas_since_last_snapshot = await self.get_current_state_deltas_for_room( room_id, # From the last snapshot we inserted from_token=RoomStreamToken( - stream=last_to_insert_membership_info.membership_event_stream_ordering, + stream=last_to_insert_join_membership_info.membership_event_stream_ordering, ), # To our current membership position to_token=RoomStreamToken( @@ -2223,10 +2239,10 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS # just re-use the last snapshot we inserted. if can_use_last_snapshot: # Based on the logic above, we should have a last snapshot to work from - assert last_to_insert_membership_snapshot is not None + assert last_to_insert_join_membership_snapshot is not None sliding_sync_membership_snapshots_insert_map = ( - last_to_insert_membership_snapshot + last_to_insert_join_membership_snapshot ) elif membership == Membership.JOIN: # If we're still joined, we can pull from current state. @@ -2403,14 +2419,10 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS f"Unexpected membership {membership} ({membership_event_id}) that we don't know how to handle yet" ) - to_insert_membership_snapshots[ - (room_id, user_id) - ] = last_to_insert_membership_snapshots_by_room_id[room_id] = ( + to_insert_membership_snapshots[(room_id, user_id)] = ( sliding_sync_membership_snapshots_insert_map ) - to_insert_membership_infos[ - (room_id, user_id) - ] = last_to_insert_membership_infos_by_room_id[room_id] = ( + to_insert_membership_infos[(room_id, user_id)] = ( SlidingSyncMembershipInfoWithEventPos( user_id=user_id, sender=sender, @@ -2422,6 +2434,15 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS or "master", ) ) + # Keep track of the last join membership snapshot as we may be able to + # re-use all the work we did. + if membership == Membership.JOIN: + last_to_insert_join_membership_snapshots_by_room_id[room_id] = ( + to_insert_membership_snapshots[(room_id, user_id)] + ) + last_to_insert_join_membership_infos_by_room_id[room_id] = ( + to_insert_membership_infos[(room_id, user_id)] + ) # Assemble data so it's ready for the batch queries in the transaction key_names = ("room_id", "user_id")