Read from the room_stats_state table
This commit is contained in:
@@ -1765,7 +1765,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
|
||||
# `room_stats_state` table. In the case, where we can't use
|
||||
# `room_stats_state`, we just fetch all of the events manually.
|
||||
if current_state_ids_to_fetch:
|
||||
fetched_events = await self.get_events(current_state_ids_map.values())
|
||||
fetched_events = await self.get_events(current_state_ids_to_fetch)
|
||||
|
||||
# Even if we are joined to the room, `get_events(...)` will filter out
|
||||
# events for unknown room versions that we no longer support. We don't need
|
||||
@@ -2112,6 +2112,19 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
|
||||
|
||||
return event_id, membership
|
||||
|
||||
# As long as we get this value before we fetch the current state, we can use it
|
||||
# to check if something has changed since that point.
|
||||
#
|
||||
# Same as `await self.get_stats_positions()` but get around the method
|
||||
# resolution issues that crop up when we try to mixin `StatsStore` into this
|
||||
# store.
|
||||
stats_stream_position = await self.db_pool.simple_select_one_onecol(
|
||||
table="stats_incremental_position",
|
||||
keyvalues={},
|
||||
retcol="stream_id",
|
||||
desc="stats_incremental_position",
|
||||
)
|
||||
|
||||
# Map from (room_id, user_id) to ...
|
||||
to_insert_membership_snapshots: Dict[
|
||||
Tuple[str, str], SlidingSyncMembershipSnapshotSharedInsertValues
|
||||
@@ -2275,50 +2288,123 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
|
||||
await_full_state=False,
|
||||
)
|
||||
|
||||
# TODO: Read the state from the `room_stats_state` table if we can
|
||||
|
||||
# We're iterating over rooms that we are joined to so they should
|
||||
# have `current_state_events` and we should have some current state
|
||||
# for each room
|
||||
if current_state_ids_map:
|
||||
try:
|
||||
fetched_events = await self.get_events(
|
||||
current_state_ids_to_fetch: List[str] = []
|
||||
|
||||
# Check if the current state has been updated since the stats were
|
||||
# generated.
|
||||
state_deltas_since_stats = (
|
||||
await self.get_current_state_deltas_for_room(
|
||||
room_id,
|
||||
from_token=RoomStreamToken(
|
||||
stream=stats_stream_position
|
||||
),
|
||||
to_token=None,
|
||||
)
|
||||
)
|
||||
can_use_room_stats_state_table = True
|
||||
for state_delta in state_deltas_since_stats:
|
||||
# We only need to check for the state is relevant to the
|
||||
# `sliding_sync_joined_rooms` table.
|
||||
if (
|
||||
state_delta.event_type,
|
||||
state_delta.state_key,
|
||||
) in SLIDING_SYNC_RELEVANT_STATE_SET:
|
||||
can_use_room_stats_state_table = False
|
||||
# Once we find one relevant state event that changed, no need to
|
||||
# look any further
|
||||
break
|
||||
|
||||
# Avoid fetching full events out as much as possible. We can look-up all of
|
||||
# the relevant state values except for the `tombstone_successor_room_id`
|
||||
# from the `room_stats_state` table.
|
||||
if can_use_room_stats_state_table:
|
||||
(name, room_type, encryption) = cast(
|
||||
Tuple[Optional[str], Optional[str], Optional[str]],
|
||||
await self.db_pool.simple_select_one(
|
||||
table="room_stats_state",
|
||||
keyvalues={"room_id": room_id},
|
||||
retcols=["name", "room_type", "encryption"],
|
||||
),
|
||||
)
|
||||
|
||||
# We have current state to work from
|
||||
sliding_sync_membership_snapshots_insert_map[
|
||||
"has_known_state"
|
||||
] = True
|
||||
# And can copy some of the values from the `room_stats_state` table
|
||||
sliding_sync_membership_snapshots_insert_map[
|
||||
"room_name"
|
||||
] = name
|
||||
sliding_sync_membership_snapshots_insert_map[
|
||||
"room_type"
|
||||
] = room_type
|
||||
sliding_sync_membership_snapshots_insert_map[
|
||||
"is_encrypted"
|
||||
] = encryption is not None
|
||||
|
||||
# If the room has a tombstone, we still need to fetch it out manually
|
||||
# because this information isn't available in the `room_stats_state`
|
||||
# table.
|
||||
tombstone_event_id = current_state_ids_map.get(
|
||||
(EventTypes.Tombstone, "")
|
||||
)
|
||||
if tombstone_event_id is not None:
|
||||
current_state_ids_to_fetch.append(tombstone_event_id)
|
||||
|
||||
else:
|
||||
current_state_ids_to_fetch.extend(
|
||||
current_state_ids_map.values()
|
||||
)
|
||||
except (DatabaseCorruptionError, InvalidEventError) as e:
|
||||
logger.warning(
|
||||
"Failed to fetch state for room '%s' due to corrupted events. Ignoring. Error: %s",
|
||||
room_id,
|
||||
e,
|
||||
|
||||
# Fetch any more events that we need to complete the `state_insert_values`.
|
||||
# For the happy-path, this will just be the tombstone event if the room has
|
||||
# one since we got the rest of the values already from the
|
||||
# `room_stats_state` table. In the case, where we can't use
|
||||
# `room_stats_state`, we just fetch all of the events manually.
|
||||
if current_state_ids_to_fetch:
|
||||
try:
|
||||
fetched_events = await self.get_events(
|
||||
current_state_ids_to_fetch
|
||||
)
|
||||
except (DatabaseCorruptionError, InvalidEventError) as e:
|
||||
logger.warning(
|
||||
"Failed to fetch state for room '%s' due to corrupted events. Ignoring. Error: %s",
|
||||
room_id,
|
||||
e,
|
||||
)
|
||||
continue
|
||||
|
||||
current_state_map: StateMap[EventBase] = {
|
||||
state_key: fetched_events[event_id]
|
||||
for state_key, event_id in current_state_ids_map.items()
|
||||
# `get_events(...)` will filter out events for unknown room versions
|
||||
if event_id in fetched_events
|
||||
}
|
||||
|
||||
# Can happen for unknown room versions (old room versions that aren't known
|
||||
# anymore) since `get_events(...)` will filter out events for unknown room
|
||||
# versions
|
||||
if not current_state_map:
|
||||
continue
|
||||
|
||||
more_state_insert_values = PersistEventsStore._get_sliding_sync_insert_values_from_state_map(
|
||||
current_state_map
|
||||
)
|
||||
continue
|
||||
# Update our main insert map
|
||||
sliding_sync_membership_snapshots_insert_map.update(
|
||||
more_state_insert_values
|
||||
)
|
||||
# We should have some insert values for each room, even if they are `None`
|
||||
assert sliding_sync_membership_snapshots_insert_map
|
||||
|
||||
current_state_map: StateMap[EventBase] = {
|
||||
state_key: fetched_events[event_id]
|
||||
for state_key, event_id in current_state_ids_map.items()
|
||||
# `get_events(...)` will filter out events for unknown room versions
|
||||
if event_id in fetched_events
|
||||
}
|
||||
|
||||
# Can happen for unknown room versions (old room versions that aren't known
|
||||
# anymore) since `get_events(...)` will filter out events for unknown room
|
||||
# versions
|
||||
if not current_state_map:
|
||||
continue
|
||||
|
||||
state_insert_values = PersistEventsStore._get_sliding_sync_insert_values_from_state_map(
|
||||
current_state_map
|
||||
)
|
||||
sliding_sync_membership_snapshots_insert_map.update(
|
||||
state_insert_values
|
||||
)
|
||||
# We should have some insert values for each room, even if they are `None`
|
||||
assert sliding_sync_membership_snapshots_insert_map
|
||||
|
||||
# We have current state to work from
|
||||
sliding_sync_membership_snapshots_insert_map[
|
||||
"has_known_state"
|
||||
] = True
|
||||
# We have current state to work from
|
||||
sliding_sync_membership_snapshots_insert_map[
|
||||
"has_known_state"
|
||||
] = True
|
||||
else:
|
||||
# Although we expect every room to have a create event (even
|
||||
# past unknown room versions since we haven't supported one
|
||||
|
||||
Reference in New Issue
Block a user