diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index e3c40f9744..ca0a627cc6 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -2129,147 +2129,272 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS last_to_insert_join_membership_infos_by_room_id: Dict[ str, SlidingSyncMembershipInfoWithEventPos ] = {} - # TODO: `concurrently_execute` based on buckets of room_ids - for ( - room_id, - room_id_from_rooms_table, - user_id, - sender, - membership_event_id, - membership, - membership_event_stream_ordering, - membership_event_instance_name, - is_outlier, - ) in memberships_to_update_rows: - # We don't know how to handle `membership` values other than these. The - # code below would need to be updated. - assert membership in ( - Membership.JOIN, - Membership.INVITE, - Membership.KNOCK, - Membership.LEAVE, - Membership.BAN, - ) - # There are some old out-of-band memberships (before - # https://github.com/matrix-org/synapse/issues/6983) where we don't have the - # corresponding room stored in the `rooms` table`. We have a `FOREIGN KEY` - # constraint on the `sliding_sync_membership_snapshots` table so we have to - # fix-up these memberships by adding the room to the `rooms` table. - if room_id_from_rooms_table is None: - await self.db_pool.simple_insert( - table="rooms", - values={ - "room_id": room_id, - # Only out-of-band memberships are missing from the `rooms` - # table so that is the only type of membership we're dealing - # with here. Since we don't calculate the "chain cover" for - # out-of-band memberships, we can just set this to `True` as if - # the user ever joins the room, we will end up calculating the - # "chain cover" anyway. - "has_auth_chain_index": True, - }, + async def _handle_memberships_to_update_rows_for_room( + memberships_to_update_rows: List[ + Tuple[str, Optional[str], str, str, str, str, int, Optional[str], bool] + ], + ) -> None: + if not memberships_to_update_rows: + return + + room_id_for_all_rows: str = memberships_to_update_rows[0][0] + + for ( + room_id, + room_id_from_rooms_table, + user_id, + sender, + membership_event_id, + membership, + membership_event_stream_ordering, + membership_event_instance_name, + is_outlier, + ) in memberships_to_update_rows: + # Sanity check that we're working on the same room so the snapshot + # re-use logic can work properly (we need to process memberships in a + # room sequentially chronologically) + assert room_id == room_id_for_all_rows + + # We don't know how to handle `membership` values other than these. The + # code below would need to be updated. + assert membership in ( + Membership.JOIN, + Membership.INVITE, + Membership.KNOCK, + Membership.LEAVE, + Membership.BAN, ) - # Check if we can potentially re-use the last snapshot we inserted for this - # room. - # - can_use_last_snapshot = False - # TODO: We could also look in the database to see if a snapshot is - # available. Is it worth the lookup time? - last_to_insert_join_membership_snapshot = ( - last_to_insert_join_membership_snapshots_by_room_id.get(room_id) - ) - last_to_insert_join_membership_info = ( - last_to_insert_join_membership_infos_by_room_id.get(room_id) - ) - # We can only re-use the last snapshot if their previous membership was in - # the room. Since that's not easy to determine without more extra lookups, - # we only apply this to join memberships for now. - # - # - JOIN: Re-use if no state changes - # - LEAVE: Could re-use if they were previously joined otherwise this could - # be a rescinded/rejected invite/knock and we don't want to leak anything. - # - INVITE/KNOCK: ❌ We can only rely on the stripped state - # - BAN: Could re-use if they were previously joined otherwise they might - # have just been banned after invite/knock and we don't want to leak - # anything. - if ( - last_to_insert_join_membership_snapshot is not None - and membership == Membership.JOIN - ): - can_use_last_snapshot = True + # There are some old out-of-band memberships (before + # https://github.com/matrix-org/synapse/issues/6983) where we don't have the + # corresponding room stored in the `rooms` table`. We have a `FOREIGN KEY` + # constraint on the `sliding_sync_membership_snapshots` table so we have to + # fix-up these memberships by adding the room to the `rooms` table. + if room_id_from_rooms_table is None: + await self.db_pool.simple_insert( + table="rooms", + values={ + "room_id": room_id, + # Only out-of-band memberships are missing from the `rooms` + # table so that is the only type of membership we're dealing + # with here. Since we don't calculate the "chain cover" for + # out-of-band memberships, we can just set this to `True` as if + # the user ever joins the room, we will end up calculating the + # "chain cover" anyway. + "has_auth_chain_index": True, + }, + ) - # These should go hand-in-hand - assert last_to_insert_join_membership_info is not None - - # Sanity check that our tokens are in order. We should be iterating in - # ascending order. - assert ( - last_to_insert_join_membership_info.membership_event_stream_ordering - <= membership_event_stream_ordering + # Check if we can potentially re-use the last snapshot we inserted for this + # room. + # + can_use_last_snapshot = False + # TODO: We could also look in the database to see if a snapshot is + # available. Is it worth the lookup time? + last_to_insert_join_membership_snapshot = ( + last_to_insert_join_membership_snapshots_by_room_id.get(room_id) ) - - # Check if the current state has been updated since the last snapshot we inserted. - state_deltas_since_last_snapshot = await self.get_current_state_deltas_for_room( - room_id, - # From the last snapshot we inserted - from_token=RoomStreamToken( - stream=last_to_insert_join_membership_info.membership_event_stream_ordering, - ), - # To our current membership position - to_token=RoomStreamToken( - stream=membership_event_stream_ordering, - ), + last_to_insert_join_membership_info = ( + last_to_insert_join_membership_infos_by_room_id.get(room_id) ) - for state_delta in state_deltas_since_last_snapshot: - # We only need to check for the state is relevant to the - # `sliding_sync_joined_rooms` table. - if ( - state_delta.event_type, - state_delta.state_key, - ) in SLIDING_SYNC_RELEVANT_STATE_SET: - can_use_last_snapshot = False - # Once we find one relevant state event that changed, no need to - # look any further - break + # We can only re-use the last snapshot if their previous membership was in + # the room. Since that's not easy to determine without more extra lookups, + # we only apply this to join memberships for now. + # + # - JOIN: Re-use if no state changes + # - LEAVE: Could re-use if they were previously joined otherwise this could + # be a rescinded/rejected invite/knock and we don't want to leak anything. + # - INVITE/KNOCK: ❌ We can only rely on the stripped state + # - BAN: Could re-use if they were previously joined otherwise they might + # have just been banned after invite/knock and we don't want to leak + # anything. + if ( + last_to_insert_join_membership_snapshot is not None + and membership == Membership.JOIN + ): + can_use_last_snapshot = True - # Map of values to insert/update in the `sliding_sync_membership_snapshots` table - sliding_sync_membership_snapshots_insert_map: SlidingSyncMembershipSnapshotSharedInsertValues = {} - # If none of the relevant state has changed since the last snapshot, we can - # just re-use the last snapshot we inserted. - if can_use_last_snapshot: - # Based on the logic above, we should have a last snapshot to work from - assert last_to_insert_join_membership_snapshot is not None + # These should go hand-in-hand + assert last_to_insert_join_membership_info is not None - sliding_sync_membership_snapshots_insert_map = ( - last_to_insert_join_membership_snapshot - ) - elif membership == Membership.JOIN: - # If we're still joined, we can pull from current state. - current_state_ids_map: StateMap[ - str - ] = await self.hs.get_storage_controllers().state.get_current_state_ids( - room_id, - state_filter=StateFilter.from_types( - SLIDING_SYNC_RELEVANT_STATE_SET - ), - # Partially-stated rooms should have all state events except for - # remote membership events so we don't need to wait at all because - # we only want some non-membership state - await_full_state=False, - ) + # Sanity check that our tokens are in order. We should be iterating in + # ascending order. + assert ( + last_to_insert_join_membership_info.membership_event_stream_ordering + <= membership_event_stream_ordering + ) - # TODO: Read the state from the `room_stats_state` table if we can + # Check if the current state has been updated since the last snapshot we inserted. + state_deltas_since_last_snapshot = await self.get_current_state_deltas_for_room( + room_id, + # From the last snapshot we inserted + from_token=RoomStreamToken( + stream=last_to_insert_join_membership_info.membership_event_stream_ordering, + ), + # To our current membership position + to_token=RoomStreamToken( + stream=membership_event_stream_ordering, + ), + ) + for state_delta in state_deltas_since_last_snapshot: + # We only need to check for the state is relevant to the + # `sliding_sync_joined_rooms` table. + if ( + state_delta.event_type, + state_delta.state_key, + ) in SLIDING_SYNC_RELEVANT_STATE_SET: + can_use_last_snapshot = False + # Once we find one relevant state event that changed, no need to + # look any further + break - # We're iterating over rooms that we are joined to so they should - # have `current_state_events` and we should have some current state - # for each room - if current_state_ids_map: - try: - fetched_events = await self.get_events( - current_state_ids_map.values() + # Map of values to insert/update in the `sliding_sync_membership_snapshots` table + sliding_sync_membership_snapshots_insert_map: SlidingSyncMembershipSnapshotSharedInsertValues = {} + # If none of the relevant state has changed since the last snapshot, we can + # just re-use the last snapshot we inserted. + if can_use_last_snapshot: + # Based on the logic above, we should have a last snapshot to work from + assert last_to_insert_join_membership_snapshot is not None + + sliding_sync_membership_snapshots_insert_map = ( + last_to_insert_join_membership_snapshot + ) + elif membership == Membership.JOIN: + # If we're still joined, we can pull from current state. + current_state_ids_map: StateMap[ + str + ] = await self.hs.get_storage_controllers().state.get_current_state_ids( + room_id, + state_filter=StateFilter.from_types( + SLIDING_SYNC_RELEVANT_STATE_SET + ), + # Partially-stated rooms should have all state events except for + # remote membership events so we don't need to wait at all because + # we only want some non-membership state + await_full_state=False, + ) + + # TODO: Read the state from the `room_stats_state` table if we can + + # We're iterating over rooms that we are joined to so they should + # have `current_state_events` and we should have some current state + # for each room + if current_state_ids_map: + try: + fetched_events = await self.get_events( + current_state_ids_map.values() + ) + except (DatabaseCorruptionError, InvalidEventError) as e: + logger.warning( + "Failed to fetch state for room '%s' due to corrupted events. Ignoring. Error: %s", + room_id, + e, + ) + continue + + current_state_map: StateMap[EventBase] = { + state_key: fetched_events[event_id] + for state_key, event_id in current_state_ids_map.items() + # `get_events(...)` will filter out events for unknown room versions + if event_id in fetched_events + } + + # Can happen for unknown room versions (old room versions that aren't known + # anymore) since `get_events(...)` will filter out events for unknown room + # versions + if not current_state_map: + continue + + state_insert_values = PersistEventsStore._get_sliding_sync_insert_values_from_state_map( + current_state_map ) + sliding_sync_membership_snapshots_insert_map.update( + state_insert_values + ) + # We should have some insert values for each room, even if they are `None` + assert sliding_sync_membership_snapshots_insert_map + + # We have current state to work from + sliding_sync_membership_snapshots_insert_map[ + "has_known_state" + ] = True + else: + # Although we expect every room to have a create event (even + # past unknown room versions since we haven't supported one + # without it), there seem to be some corrupted rooms in + # practice that don't have the create event in the + # `current_state_events` table. The create event does exist + # in the events table though. We'll just say that we don't + # know the state for these rooms and continue on with our + # day. + sliding_sync_membership_snapshots_insert_map[ + "has_known_state" + ] = False + elif membership in (Membership.INVITE, Membership.KNOCK) or ( + membership in (Membership.LEAVE, Membership.BAN) and is_outlier + ): + invite_or_knock_event_id = membership_event_id + invite_or_knock_membership = membership + + # If the event is an `out_of_band_membership` (special case of + # `outlier`), we never had historical state so we have to pull from + # the stripped state on the previous invite/knock event. This gives + # us a consistent view of the room state regardless of your + # membership (i.e. the room shouldn't disappear if your using the + # `is_encrypted` filter and you leave). + if membership in (Membership.LEAVE, Membership.BAN) and is_outlier: + ( + invite_or_knock_event_id, + invite_or_knock_membership, + ) = await self.db_pool.runInteraction( + "sliding_sync_membership_snapshots_bg_update._find_previous_invite_or_knock_membership_txn", + _find_previous_invite_or_knock_membership_txn, + room_id, + user_id, + membership_event_id, + ) + + # Pull from the stripped state on the invite/knock event + invite_or_knock_event = await self.get_event( + invite_or_knock_event_id + ) + + raw_stripped_state_events = None + if invite_or_knock_membership == Membership.INVITE: + invite_room_state = invite_or_knock_event.unsigned.get( + "invite_room_state" + ) + raw_stripped_state_events = invite_room_state + elif invite_or_knock_membership == Membership.KNOCK: + knock_room_state = invite_or_knock_event.unsigned.get( + "knock_room_state" + ) + raw_stripped_state_events = knock_room_state + + sliding_sync_membership_snapshots_insert_map = PersistEventsStore._get_sliding_sync_insert_values_from_stripped_state( + raw_stripped_state_events + ) + + # We should have some insert values for each room, even if no + # stripped state is on the event because we still want to record + # that we have no known state + assert sliding_sync_membership_snapshots_insert_map + elif membership in (Membership.LEAVE, Membership.BAN): + # Pull from historical state + state_ids_map = await self.hs.get_storage_controllers().state.get_state_ids_for_event( + membership_event_id, + state_filter=StateFilter.from_types( + SLIDING_SYNC_RELEVANT_STATE_SET + ), + # Partially-stated rooms should have all state events except for + # remote membership events so we don't need to wait at all because + # we only want some non-membership state + await_full_state=False, + ) + + try: + fetched_events = await self.get_events(state_ids_map.values()) except (DatabaseCorruptionError, InvalidEventError) as e: logger.warning( "Failed to fetch state for room '%s' due to corrupted events. Ignoring. Error: %s", @@ -2278,9 +2403,9 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS ) continue - current_state_map: StateMap[EventBase] = { + state_map: StateMap[EventBase] = { state_key: fetched_events[event_id] - for state_key, event_id in current_state_ids_map.items() + for state_key, event_id in state_ids_map.items() # `get_events(...)` will filter out events for unknown room versions if event_id in fetched_events } @@ -2288,11 +2413,11 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS # Can happen for unknown room versions (old room versions that aren't known # anymore) since `get_events(...)` will filter out events for unknown room # versions - if not current_state_map: + if not state_map: continue state_insert_values = PersistEventsStore._get_sliding_sync_insert_values_from_state_map( - current_state_map + state_map ) sliding_sync_membership_snapshots_insert_map.update( state_insert_values @@ -2300,152 +2425,67 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS # We should have some insert values for each room, even if they are `None` assert sliding_sync_membership_snapshots_insert_map - # We have current state to work from + # We have historical state to work from sliding_sync_membership_snapshots_insert_map["has_known_state"] = ( True ) else: - # Although we expect every room to have a create event (even - # past unknown room versions since we haven't supported one - # without it), there seem to be some corrupted rooms in - # practice that don't have the create event in the - # `current_state_events` table. The create event does exist - # in the events table though. We'll just say that we don't - # know the state for these rooms and continue on with our - # day. - sliding_sync_membership_snapshots_insert_map["has_known_state"] = ( - False - ) - elif membership in (Membership.INVITE, Membership.KNOCK) or ( - membership in (Membership.LEAVE, Membership.BAN) and is_outlier - ): - invite_or_knock_event_id = membership_event_id - invite_or_knock_membership = membership - - # If the event is an `out_of_band_membership` (special case of - # `outlier`), we never had historical state so we have to pull from - # the stripped state on the previous invite/knock event. This gives - # us a consistent view of the room state regardless of your - # membership (i.e. the room shouldn't disappear if your using the - # `is_encrypted` filter and you leave). - if membership in (Membership.LEAVE, Membership.BAN) and is_outlier: - ( - invite_or_knock_event_id, - invite_or_knock_membership, - ) = await self.db_pool.runInteraction( - "sliding_sync_membership_snapshots_bg_update._find_previous_invite_or_knock_membership_txn", - _find_previous_invite_or_knock_membership_txn, - room_id, - user_id, - membership_event_id, + # We don't know how to handle this type of membership yet + # + # FIXME: We should use `assert_never` here but for some reason + # the exhaustive matching doesn't recognize the `Never` here. + # assert_never(membership) + raise AssertionError( + f"Unexpected membership {membership} ({membership_event_id}) that we don't know how to handle yet" ) - # Pull from the stripped state on the invite/knock event - invite_or_knock_event = await self.get_event(invite_or_knock_event_id) - - raw_stripped_state_events = None - if invite_or_knock_membership == Membership.INVITE: - invite_room_state = invite_or_knock_event.unsigned.get( - "invite_room_state" - ) - raw_stripped_state_events = invite_room_state - elif invite_or_knock_membership == Membership.KNOCK: - knock_room_state = invite_or_knock_event.unsigned.get( - "knock_room_state" - ) - raw_stripped_state_events = knock_room_state - - sliding_sync_membership_snapshots_insert_map = PersistEventsStore._get_sliding_sync_insert_values_from_stripped_state( - raw_stripped_state_events + to_insert_membership_snapshots[(room_id, user_id)] = ( + sliding_sync_membership_snapshots_insert_map ) - - # We should have some insert values for each room, even if no - # stripped state is on the event because we still want to record - # that we have no known state - assert sliding_sync_membership_snapshots_insert_map - elif membership in (Membership.LEAVE, Membership.BAN): - # Pull from historical state - state_ids_map = await self.hs.get_storage_controllers().state.get_state_ids_for_event( - membership_event_id, - state_filter=StateFilter.from_types( - SLIDING_SYNC_RELEVANT_STATE_SET - ), - # Partially-stated rooms should have all state events except for - # remote membership events so we don't need to wait at all because - # we only want some non-membership state - await_full_state=False, - ) - - try: - fetched_events = await self.get_events(state_ids_map.values()) - except (DatabaseCorruptionError, InvalidEventError) as e: - logger.warning( - "Failed to fetch state for room '%s' due to corrupted events. Ignoring. Error: %s", - room_id, - e, - ) - continue - - state_map: StateMap[EventBase] = { - state_key: fetched_events[event_id] - for state_key, event_id in state_ids_map.items() - # `get_events(...)` will filter out events for unknown room versions - if event_id in fetched_events - } - - # Can happen for unknown room versions (old room versions that aren't known - # anymore) since `get_events(...)` will filter out events for unknown room - # versions - if not state_map: - continue - - state_insert_values = ( - PersistEventsStore._get_sliding_sync_insert_values_from_state_map( - state_map + to_insert_membership_infos[(room_id, user_id)] = ( + SlidingSyncMembershipInfoWithEventPos( + user_id=user_id, + sender=sender, + membership_event_id=membership_event_id, + membership=membership, + membership_event_stream_ordering=membership_event_stream_ordering, + # If instance_name is null we default to "master" + membership_event_instance_name=membership_event_instance_name + or "master", ) ) - sliding_sync_membership_snapshots_insert_map.update(state_insert_values) - # We should have some insert values for each room, even if they are `None` - assert sliding_sync_membership_snapshots_insert_map + # Keep track of the last join membership snapshot as we may be able to + # re-use all the work we did. + if membership == Membership.JOIN: + last_to_insert_join_membership_snapshots_by_room_id[room_id] = ( + to_insert_membership_snapshots[(room_id, user_id)] + ) + last_to_insert_join_membership_infos_by_room_id[room_id] = ( + to_insert_membership_infos[(room_id, user_id)] + ) - # We have historical state to work from - sliding_sync_membership_snapshots_insert_map["has_known_state"] = True - else: - # We don't know how to handle this type of membership yet - # - # FIXME: We should use `assert_never` here but for some reason - # the exhaustive matching doesn't recognize the `Never` here. - # assert_never(membership) - raise AssertionError( - f"Unexpected membership {membership} ({membership_event_id}) that we don't know how to handle yet" - ) + # Bucket the memberships by room_id so we can process them concurrently. + room_id_to_memberships_to_update_rows: Dict[ + str, + List[ + Tuple[str, Optional[str], str, str, str, str, int, Optional[str], bool] + ], + ] = {} + for row in memberships_to_update_rows: + room_id = row[0] + room_id_to_memberships_to_update_rows.setdefault(room_id, []).append(row) - to_insert_membership_snapshots[(room_id, user_id)] = ( - sliding_sync_membership_snapshots_insert_map - ) - to_insert_membership_infos[(room_id, user_id)] = ( - SlidingSyncMembershipInfoWithEventPos( - user_id=user_id, - sender=sender, - membership_event_id=membership_event_id, - membership=membership, - membership_event_stream_ordering=membership_event_stream_ordering, - # If instance_name is null we default to "master" - membership_event_instance_name=membership_event_instance_name - or "master", - ) - ) - # Keep track of the last join membership snapshot as we may be able to - # re-use all the work we did. - if membership == Membership.JOIN: - last_to_insert_join_membership_snapshots_by_room_id[room_id] = ( - to_insert_membership_snapshots[(room_id, user_id)] - ) - last_to_insert_join_membership_infos_by_room_id[room_id] = ( - to_insert_membership_infos[(room_id, user_id)] - ) + # We can batch process each room concurrently. + # + # In order for the snapshot re-use logic to work correctly, we need to process + # memberships in a room sequentially chronologically. + await concurrently_execute( + _handle_memberships_to_update_rows_for_room, + room_id_to_memberships_to_update_rows.values(), + 10, + ) - # Assemble data so it's ready for the batch queries in the transaction + # Re-assemble data so it's ready for the batch queries in the transaction key_names = ("room_id", "user_id") key_values: List[Tuple[str, str]] = [] insertion_value_names = (