From 815ef2fb9958068121c07a03a8c0f480b575e793 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 9 Sep 2024 23:10:25 -0500 Subject: [PATCH] Read from the `room_stats_state` table --- .../databases/main/events_bg_updates.py | 160 ++++++++++++++---- 1 file changed, 123 insertions(+), 37 deletions(-) diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index ca0a627cc6..6a05014756 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -1765,7 +1765,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS # `room_stats_state` table. In the case, where we can't use # `room_stats_state`, we just fetch all of the events manually. if current_state_ids_to_fetch: - fetched_events = await self.get_events(current_state_ids_map.values()) + fetched_events = await self.get_events(current_state_ids_to_fetch) # Even if we are joined to the room, `get_events(...)` will filter out # events for unknown room versions that we no longer support. We don't need @@ -2112,6 +2112,19 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS return event_id, membership + # As long as we get this value before we fetch the current state, we can use it + # to check if something has changed since that point. + # + # Same as `await self.get_stats_positions()` but get around the method + # resolution issues that crop up when we try to mixin `StatsStore` into this + # store. + stats_stream_position = await self.db_pool.simple_select_one_onecol( + table="stats_incremental_position", + keyvalues={}, + retcol="stream_id", + desc="stats_incremental_position", + ) + # Map from (room_id, user_id) to ... 
to_insert_membership_snapshots: Dict[ Tuple[str, str], SlidingSyncMembershipSnapshotSharedInsertValues @@ -2275,50 +2288,123 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS await_full_state=False, ) - # TODO: Read the state from the `room_stats_state` table if we can - # We're iterating over rooms that we are joined to so they should # have `current_state_events` and we should have some current state # for each room if current_state_ids_map: - try: - fetched_events = await self.get_events( + current_state_ids_to_fetch: List[str] = [] + + # Check if the current state has been updated since the stats were + # generated. + state_deltas_since_stats = ( + await self.get_current_state_deltas_for_room( + room_id, + from_token=RoomStreamToken( + stream=stats_stream_position + ), + to_token=None, + ) + ) + can_use_room_stats_state_table = True + for state_delta in state_deltas_since_stats: + # We only need to check for state that is relevant to the + # `sliding_sync_joined_rooms` table. + if ( + state_delta.event_type, + state_delta.state_key, + ) in SLIDING_SYNC_RELEVANT_STATE_SET: + can_use_room_stats_state_table = False + # Once we find one relevant state event that changed, no need to + # look any further + break + + # Avoid fetching full events out as much as possible. We can look-up all of + # the relevant state values except for the `tombstone_successor_room_id` + # from the `room_stats_state` table. 
+ if can_use_room_stats_state_table: + (name, room_type, encryption) = cast( + Tuple[Optional[str], Optional[str], Optional[str]], + await self.db_pool.simple_select_one( + table="room_stats_state", + keyvalues={"room_id": room_id}, + retcols=["name", "room_type", "encryption"], + ), + ) + + # We have current state to work from + sliding_sync_membership_snapshots_insert_map[ + "has_known_state" + ] = True + # And can copy some of the values from the `room_stats_state` table + sliding_sync_membership_snapshots_insert_map[ + "room_name" + ] = name + sliding_sync_membership_snapshots_insert_map[ + "room_type" + ] = room_type + sliding_sync_membership_snapshots_insert_map[ + "is_encrypted" + ] = encryption is not None + + # If the room has a tombstone, we still need to fetch it out manually + # because this information isn't available in the `room_stats_state` + # table. + tombstone_event_id = current_state_ids_map.get( + (EventTypes.Tombstone, "") + ) + if tombstone_event_id is not None: + current_state_ids_to_fetch.append(tombstone_event_id) + + else: + current_state_ids_to_fetch.extend( current_state_ids_map.values() ) - except (DatabaseCorruptionError, InvalidEventError) as e: - logger.warning( - "Failed to fetch state for room '%s' due to corrupted events. Ignoring. Error: %s", - room_id, - e, + + # Fetch any more events that we need to complete the `state_insert_values`. + # For the happy-path, this will just be the tombstone event if the room has + # one since we got the rest of the values already from the + # `room_stats_state` table. In the case, where we can't use + # `room_stats_state`, we just fetch all of the events manually. + if current_state_ids_to_fetch: + try: + fetched_events = await self.get_events( + current_state_ids_to_fetch + ) + except (DatabaseCorruptionError, InvalidEventError) as e: + logger.warning( + "Failed to fetch state for room '%s' due to corrupted events. Ignoring. 
Error: %s", + room_id, + e, + ) + continue + + current_state_map: StateMap[EventBase] = { + state_key: fetched_events[event_id] + for state_key, event_id in current_state_ids_map.items() + # `get_events(...)` will filter out events for unknown room versions + if event_id in fetched_events + } + + # Can happen for unknown room versions (old room versions that aren't known + # anymore) since `get_events(...)` will filter out events for unknown room + # versions + if not current_state_map: + continue + + more_state_insert_values = PersistEventsStore._get_sliding_sync_insert_values_from_state_map( + current_state_map ) - continue + # Update our main insert map + sliding_sync_membership_snapshots_insert_map.update( + more_state_insert_values + ) + # We should have some insert values for each room, even if they are `None` + assert sliding_sync_membership_snapshots_insert_map - current_state_map: StateMap[EventBase] = { - state_key: fetched_events[event_id] - for state_key, event_id in current_state_ids_map.items() - # `get_events(...)` will filter out events for unknown room versions - if event_id in fetched_events - } - - # Can happen for unknown room versions (old room versions that aren't known - # anymore) since `get_events(...)` will filter out events for unknown room - # versions - if not current_state_map: - continue - - state_insert_values = PersistEventsStore._get_sliding_sync_insert_values_from_state_map( - current_state_map - ) - sliding_sync_membership_snapshots_insert_map.update( - state_insert_values - ) - # We should have some insert values for each room, even if they are `None` - assert sliding_sync_membership_snapshots_insert_map - - # We have current state to work from - sliding_sync_membership_snapshots_insert_map[ - "has_known_state" - ] = True + # We have current state to work from + sliding_sync_membership_snapshots_insert_map[ + "has_known_state" + ] = True else: # Although we expect every room to have a create event (even # past unknown room 
versions since we haven't supported one