1
0

Get snapshots info concurrently

This commit is contained in:
Eric Eastwood
2024-09-09 22:53:52 -05:00
parent 1078163731
commit a4cc5eb02c

View File

@@ -2129,147 +2129,272 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
last_to_insert_join_membership_infos_by_room_id: Dict[
str, SlidingSyncMembershipInfoWithEventPos
] = {}
# TODO: `concurrently_execute` based on buckets of room_ids
for (
room_id,
room_id_from_rooms_table,
user_id,
sender,
membership_event_id,
membership,
membership_event_stream_ordering,
membership_event_instance_name,
is_outlier,
) in memberships_to_update_rows:
# We don't know how to handle `membership` values other than these. The
# code below would need to be updated.
assert membership in (
Membership.JOIN,
Membership.INVITE,
Membership.KNOCK,
Membership.LEAVE,
Membership.BAN,
)
# There are some old out-of-band memberships (before
# https://github.com/matrix-org/synapse/issues/6983) where we don't have the
# corresponding room stored in the `rooms` table`. We have a `FOREIGN KEY`
# constraint on the `sliding_sync_membership_snapshots` table so we have to
# fix-up these memberships by adding the room to the `rooms` table.
if room_id_from_rooms_table is None:
await self.db_pool.simple_insert(
table="rooms",
values={
"room_id": room_id,
# Only out-of-band memberships are missing from the `rooms`
# table so that is the only type of membership we're dealing
# with here. Since we don't calculate the "chain cover" for
# out-of-band memberships, we can just set this to `True` as if
# the user ever joins the room, we will end up calculating the
# "chain cover" anyway.
"has_auth_chain_index": True,
},
async def _handle_memberships_to_update_rows_for_room(
memberships_to_update_rows: List[
Tuple[str, Optional[str], str, str, str, str, int, Optional[str], bool]
],
) -> None:
if not memberships_to_update_rows:
return
room_id_for_all_rows: str = memberships_to_update_rows[0][0]
for (
room_id,
room_id_from_rooms_table,
user_id,
sender,
membership_event_id,
membership,
membership_event_stream_ordering,
membership_event_instance_name,
is_outlier,
) in memberships_to_update_rows:
# Sanity check that we're working on the same room so the snapshot
# re-use logic can work properly (we need to process memberships in a
# room sequentially chronologically)
assert room_id == room_id_for_all_rows
# We don't know how to handle `membership` values other than these. The
# code below would need to be updated.
assert membership in (
Membership.JOIN,
Membership.INVITE,
Membership.KNOCK,
Membership.LEAVE,
Membership.BAN,
)
# Check if we can potentially re-use the last snapshot we inserted for this
# room.
#
can_use_last_snapshot = False
# TODO: We could also look in the database to see if a snapshot is
# available. Is it worth the lookup time?
last_to_insert_join_membership_snapshot = (
last_to_insert_join_membership_snapshots_by_room_id.get(room_id)
)
last_to_insert_join_membership_info = (
last_to_insert_join_membership_infos_by_room_id.get(room_id)
)
# We can only re-use the last snapshot if their previous membership was in
# the room. Since that's not easy to determine without more extra lookups,
# we only apply this to join memberships for now.
#
# - JOIN: Re-use if no state changes
# - LEAVE: Could re-use if they were previously joined otherwise this could
# be a rescinded/rejected invite/knock and we don't want to leak anything.
# - INVITE/KNOCK: ❌ We can only rely on the stripped state
# - BAN: Could re-use if they were previously joined otherwise they might
# have just been banned after invite/knock and we don't want to leak
# anything.
if (
last_to_insert_join_membership_snapshot is not None
and membership == Membership.JOIN
):
can_use_last_snapshot = True
# There are some old out-of-band memberships (before
# https://github.com/matrix-org/synapse/issues/6983) where we don't have the
# corresponding room stored in the `rooms` table`. We have a `FOREIGN KEY`
# constraint on the `sliding_sync_membership_snapshots` table so we have to
# fix-up these memberships by adding the room to the `rooms` table.
if room_id_from_rooms_table is None:
await self.db_pool.simple_insert(
table="rooms",
values={
"room_id": room_id,
# Only out-of-band memberships are missing from the `rooms`
# table so that is the only type of membership we're dealing
# with here. Since we don't calculate the "chain cover" for
# out-of-band memberships, we can just set this to `True` as if
# the user ever joins the room, we will end up calculating the
# "chain cover" anyway.
"has_auth_chain_index": True,
},
)
# These should go hand-in-hand
assert last_to_insert_join_membership_info is not None
# Sanity check that our tokens are in order. We should be iterating in
# ascending order.
assert (
last_to_insert_join_membership_info.membership_event_stream_ordering
<= membership_event_stream_ordering
# Check if we can potentially re-use the last snapshot we inserted for this
# room.
#
can_use_last_snapshot = False
# TODO: We could also look in the database to see if a snapshot is
# available. Is it worth the lookup time?
last_to_insert_join_membership_snapshot = (
last_to_insert_join_membership_snapshots_by_room_id.get(room_id)
)
# Check if the current state has been updated since the last snapshot we inserted.
state_deltas_since_last_snapshot = await self.get_current_state_deltas_for_room(
room_id,
# From the last snapshot we inserted
from_token=RoomStreamToken(
stream=last_to_insert_join_membership_info.membership_event_stream_ordering,
),
# To our current membership position
to_token=RoomStreamToken(
stream=membership_event_stream_ordering,
),
last_to_insert_join_membership_info = (
last_to_insert_join_membership_infos_by_room_id.get(room_id)
)
for state_delta in state_deltas_since_last_snapshot:
# We only need to check for the state is relevant to the
# `sliding_sync_joined_rooms` table.
if (
state_delta.event_type,
state_delta.state_key,
) in SLIDING_SYNC_RELEVANT_STATE_SET:
can_use_last_snapshot = False
# Once we find one relevant state event that changed, no need to
# look any further
break
# We can only re-use the last snapshot if their previous membership was in
# the room. Since that's not easy to determine without more extra lookups,
# we only apply this to join memberships for now.
#
# - JOIN: Re-use if no state changes
# - LEAVE: Could re-use if they were previously joined otherwise this could
# be a rescinded/rejected invite/knock and we don't want to leak anything.
# - INVITE/KNOCK: ❌ We can only rely on the stripped state
# - BAN: Could re-use if they were previously joined otherwise they might
# have just been banned after invite/knock and we don't want to leak
# anything.
if (
last_to_insert_join_membership_snapshot is not None
and membership == Membership.JOIN
):
can_use_last_snapshot = True
# Map of values to insert/update in the `sliding_sync_membership_snapshots` table
sliding_sync_membership_snapshots_insert_map: SlidingSyncMembershipSnapshotSharedInsertValues = {}
# If none of the relevant state has changed since the last snapshot, we can
# just re-use the last snapshot we inserted.
if can_use_last_snapshot:
# Based on the logic above, we should have a last snapshot to work from
assert last_to_insert_join_membership_snapshot is not None
# These should go hand-in-hand
assert last_to_insert_join_membership_info is not None
sliding_sync_membership_snapshots_insert_map = (
last_to_insert_join_membership_snapshot
)
elif membership == Membership.JOIN:
# If we're still joined, we can pull from current state.
current_state_ids_map: StateMap[
str
] = await self.hs.get_storage_controllers().state.get_current_state_ids(
room_id,
state_filter=StateFilter.from_types(
SLIDING_SYNC_RELEVANT_STATE_SET
),
# Partially-stated rooms should have all state events except for
# remote membership events so we don't need to wait at all because
# we only want some non-membership state
await_full_state=False,
)
# Sanity check that our tokens are in order. We should be iterating in
# ascending order.
assert (
last_to_insert_join_membership_info.membership_event_stream_ordering
<= membership_event_stream_ordering
)
# TODO: Read the state from the `room_stats_state` table if we can
# Check if the current state has been updated since the last snapshot we inserted.
state_deltas_since_last_snapshot = await self.get_current_state_deltas_for_room(
room_id,
# From the last snapshot we inserted
from_token=RoomStreamToken(
stream=last_to_insert_join_membership_info.membership_event_stream_ordering,
),
# To our current membership position
to_token=RoomStreamToken(
stream=membership_event_stream_ordering,
),
)
for state_delta in state_deltas_since_last_snapshot:
# We only need to check for the state is relevant to the
# `sliding_sync_joined_rooms` table.
if (
state_delta.event_type,
state_delta.state_key,
) in SLIDING_SYNC_RELEVANT_STATE_SET:
can_use_last_snapshot = False
# Once we find one relevant state event that changed, no need to
# look any further
break
# We're iterating over rooms that we are joined to so they should
# have `current_state_events` and we should have some current state
# for each room
if current_state_ids_map:
try:
fetched_events = await self.get_events(
current_state_ids_map.values()
# Map of values to insert/update in the `sliding_sync_membership_snapshots` table
sliding_sync_membership_snapshots_insert_map: SlidingSyncMembershipSnapshotSharedInsertValues = {}
# If none of the relevant state has changed since the last snapshot, we can
# just re-use the last snapshot we inserted.
if can_use_last_snapshot:
# Based on the logic above, we should have a last snapshot to work from
assert last_to_insert_join_membership_snapshot is not None
sliding_sync_membership_snapshots_insert_map = (
last_to_insert_join_membership_snapshot
)
elif membership == Membership.JOIN:
# If we're still joined, we can pull from current state.
current_state_ids_map: StateMap[
str
] = await self.hs.get_storage_controllers().state.get_current_state_ids(
room_id,
state_filter=StateFilter.from_types(
SLIDING_SYNC_RELEVANT_STATE_SET
),
# Partially-stated rooms should have all state events except for
# remote membership events so we don't need to wait at all because
# we only want some non-membership state
await_full_state=False,
)
# TODO: Read the state from the `room_stats_state` table if we can
# We're iterating over rooms that we are joined to so they should
# have `current_state_events` and we should have some current state
# for each room
if current_state_ids_map:
try:
fetched_events = await self.get_events(
current_state_ids_map.values()
)
except (DatabaseCorruptionError, InvalidEventError) as e:
logger.warning(
"Failed to fetch state for room '%s' due to corrupted events. Ignoring. Error: %s",
room_id,
e,
)
continue
current_state_map: StateMap[EventBase] = {
state_key: fetched_events[event_id]
for state_key, event_id in current_state_ids_map.items()
# `get_events(...)` will filter out events for unknown room versions
if event_id in fetched_events
}
# Can happen for unknown room versions (old room versions that aren't known
# anymore) since `get_events(...)` will filter out events for unknown room
# versions
if not current_state_map:
continue
state_insert_values = PersistEventsStore._get_sliding_sync_insert_values_from_state_map(
current_state_map
)
sliding_sync_membership_snapshots_insert_map.update(
state_insert_values
)
# We should have some insert values for each room, even if they are `None`
assert sliding_sync_membership_snapshots_insert_map
# We have current state to work from
sliding_sync_membership_snapshots_insert_map[
"has_known_state"
] = True
else:
# Although we expect every room to have a create event (even
# past unknown room versions since we haven't supported one
# without it), there seem to be some corrupted rooms in
# practice that don't have the create event in the
# `current_state_events` table. The create event does exist
# in the events table though. We'll just say that we don't
# know the state for these rooms and continue on with our
# day.
sliding_sync_membership_snapshots_insert_map[
"has_known_state"
] = False
elif membership in (Membership.INVITE, Membership.KNOCK) or (
membership in (Membership.LEAVE, Membership.BAN) and is_outlier
):
invite_or_knock_event_id = membership_event_id
invite_or_knock_membership = membership
# If the event is an `out_of_band_membership` (special case of
# `outlier`), we never had historical state so we have to pull from
# the stripped state on the previous invite/knock event. This gives
# us a consistent view of the room state regardless of your
# membership (i.e. the room shouldn't disappear if your using the
# `is_encrypted` filter and you leave).
if membership in (Membership.LEAVE, Membership.BAN) and is_outlier:
(
invite_or_knock_event_id,
invite_or_knock_membership,
) = await self.db_pool.runInteraction(
"sliding_sync_membership_snapshots_bg_update._find_previous_invite_or_knock_membership_txn",
_find_previous_invite_or_knock_membership_txn,
room_id,
user_id,
membership_event_id,
)
# Pull from the stripped state on the invite/knock event
invite_or_knock_event = await self.get_event(
invite_or_knock_event_id
)
raw_stripped_state_events = None
if invite_or_knock_membership == Membership.INVITE:
invite_room_state = invite_or_knock_event.unsigned.get(
"invite_room_state"
)
raw_stripped_state_events = invite_room_state
elif invite_or_knock_membership == Membership.KNOCK:
knock_room_state = invite_or_knock_event.unsigned.get(
"knock_room_state"
)
raw_stripped_state_events = knock_room_state
sliding_sync_membership_snapshots_insert_map = PersistEventsStore._get_sliding_sync_insert_values_from_stripped_state(
raw_stripped_state_events
)
# We should have some insert values for each room, even if no
# stripped state is on the event because we still want to record
# that we have no known state
assert sliding_sync_membership_snapshots_insert_map
elif membership in (Membership.LEAVE, Membership.BAN):
# Pull from historical state
state_ids_map = await self.hs.get_storage_controllers().state.get_state_ids_for_event(
membership_event_id,
state_filter=StateFilter.from_types(
SLIDING_SYNC_RELEVANT_STATE_SET
),
# Partially-stated rooms should have all state events except for
# remote membership events so we don't need to wait at all because
# we only want some non-membership state
await_full_state=False,
)
try:
fetched_events = await self.get_events(state_ids_map.values())
except (DatabaseCorruptionError, InvalidEventError) as e:
logger.warning(
"Failed to fetch state for room '%s' due to corrupted events. Ignoring. Error: %s",
@@ -2278,9 +2403,9 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
)
continue
current_state_map: StateMap[EventBase] = {
state_map: StateMap[EventBase] = {
state_key: fetched_events[event_id]
for state_key, event_id in current_state_ids_map.items()
for state_key, event_id in state_ids_map.items()
# `get_events(...)` will filter out events for unknown room versions
if event_id in fetched_events
}
@@ -2288,11 +2413,11 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
# Can happen for unknown room versions (old room versions that aren't known
# anymore) since `get_events(...)` will filter out events for unknown room
# versions
if not current_state_map:
if not state_map:
continue
state_insert_values = PersistEventsStore._get_sliding_sync_insert_values_from_state_map(
current_state_map
state_map
)
sliding_sync_membership_snapshots_insert_map.update(
state_insert_values
@@ -2300,152 +2425,67 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
# We should have some insert values for each room, even if they are `None`
assert sliding_sync_membership_snapshots_insert_map
# We have current state to work from
# We have historical state to work from
sliding_sync_membership_snapshots_insert_map["has_known_state"] = (
True
)
else:
# Although we expect every room to have a create event (even
# past unknown room versions since we haven't supported one
# without it), there seem to be some corrupted rooms in
# practice that don't have the create event in the
# `current_state_events` table. The create event does exist
# in the events table though. We'll just say that we don't
# know the state for these rooms and continue on with our
# day.
sliding_sync_membership_snapshots_insert_map["has_known_state"] = (
False
)
elif membership in (Membership.INVITE, Membership.KNOCK) or (
membership in (Membership.LEAVE, Membership.BAN) and is_outlier
):
invite_or_knock_event_id = membership_event_id
invite_or_knock_membership = membership
# If the event is an `out_of_band_membership` (special case of
# `outlier`), we never had historical state so we have to pull from
# the stripped state on the previous invite/knock event. This gives
# us a consistent view of the room state regardless of your
# membership (i.e. the room shouldn't disappear if your using the
# `is_encrypted` filter and you leave).
if membership in (Membership.LEAVE, Membership.BAN) and is_outlier:
(
invite_or_knock_event_id,
invite_or_knock_membership,
) = await self.db_pool.runInteraction(
"sliding_sync_membership_snapshots_bg_update._find_previous_invite_or_knock_membership_txn",
_find_previous_invite_or_knock_membership_txn,
room_id,
user_id,
membership_event_id,
# We don't know how to handle this type of membership yet
#
# FIXME: We should use `assert_never` here but for some reason
# the exhaustive matching doesn't recognize the `Never` here.
# assert_never(membership)
raise AssertionError(
f"Unexpected membership {membership} ({membership_event_id}) that we don't know how to handle yet"
)
# Pull from the stripped state on the invite/knock event
invite_or_knock_event = await self.get_event(invite_or_knock_event_id)
raw_stripped_state_events = None
if invite_or_knock_membership == Membership.INVITE:
invite_room_state = invite_or_knock_event.unsigned.get(
"invite_room_state"
)
raw_stripped_state_events = invite_room_state
elif invite_or_knock_membership == Membership.KNOCK:
knock_room_state = invite_or_knock_event.unsigned.get(
"knock_room_state"
)
raw_stripped_state_events = knock_room_state
sliding_sync_membership_snapshots_insert_map = PersistEventsStore._get_sliding_sync_insert_values_from_stripped_state(
raw_stripped_state_events
to_insert_membership_snapshots[(room_id, user_id)] = (
sliding_sync_membership_snapshots_insert_map
)
# We should have some insert values for each room, even if no
# stripped state is on the event because we still want to record
# that we have no known state
assert sliding_sync_membership_snapshots_insert_map
elif membership in (Membership.LEAVE, Membership.BAN):
# Pull from historical state
state_ids_map = await self.hs.get_storage_controllers().state.get_state_ids_for_event(
membership_event_id,
state_filter=StateFilter.from_types(
SLIDING_SYNC_RELEVANT_STATE_SET
),
# Partially-stated rooms should have all state events except for
# remote membership events so we don't need to wait at all because
# we only want some non-membership state
await_full_state=False,
)
try:
fetched_events = await self.get_events(state_ids_map.values())
except (DatabaseCorruptionError, InvalidEventError) as e:
logger.warning(
"Failed to fetch state for room '%s' due to corrupted events. Ignoring. Error: %s",
room_id,
e,
)
continue
state_map: StateMap[EventBase] = {
state_key: fetched_events[event_id]
for state_key, event_id in state_ids_map.items()
# `get_events(...)` will filter out events for unknown room versions
if event_id in fetched_events
}
# Can happen for unknown room versions (old room versions that aren't known
# anymore) since `get_events(...)` will filter out events for unknown room
# versions
if not state_map:
continue
state_insert_values = (
PersistEventsStore._get_sliding_sync_insert_values_from_state_map(
state_map
to_insert_membership_infos[(room_id, user_id)] = (
SlidingSyncMembershipInfoWithEventPos(
user_id=user_id,
sender=sender,
membership_event_id=membership_event_id,
membership=membership,
membership_event_stream_ordering=membership_event_stream_ordering,
# If instance_name is null we default to "master"
membership_event_instance_name=membership_event_instance_name
or "master",
)
)
sliding_sync_membership_snapshots_insert_map.update(state_insert_values)
# We should have some insert values for each room, even if they are `None`
assert sliding_sync_membership_snapshots_insert_map
# Keep track of the last join membership snapshot as we may be able to
# re-use all the work we did.
if membership == Membership.JOIN:
last_to_insert_join_membership_snapshots_by_room_id[room_id] = (
to_insert_membership_snapshots[(room_id, user_id)]
)
last_to_insert_join_membership_infos_by_room_id[room_id] = (
to_insert_membership_infos[(room_id, user_id)]
)
# We have historical state to work from
sliding_sync_membership_snapshots_insert_map["has_known_state"] = True
else:
# We don't know how to handle this type of membership yet
#
# FIXME: We should use `assert_never` here but for some reason
# the exhaustive matching doesn't recognize the `Never` here.
# assert_never(membership)
raise AssertionError(
f"Unexpected membership {membership} ({membership_event_id}) that we don't know how to handle yet"
)
# Bucket the memberships by room_id so we can process them concurrently.
room_id_to_memberships_to_update_rows: Dict[
str,
List[
Tuple[str, Optional[str], str, str, str, str, int, Optional[str], bool]
],
] = {}
for row in memberships_to_update_rows:
room_id = row[0]
room_id_to_memberships_to_update_rows.setdefault(room_id, []).append(row)
to_insert_membership_snapshots[(room_id, user_id)] = (
sliding_sync_membership_snapshots_insert_map
)
to_insert_membership_infos[(room_id, user_id)] = (
SlidingSyncMembershipInfoWithEventPos(
user_id=user_id,
sender=sender,
membership_event_id=membership_event_id,
membership=membership,
membership_event_stream_ordering=membership_event_stream_ordering,
# If instance_name is null we default to "master"
membership_event_instance_name=membership_event_instance_name
or "master",
)
)
# Keep track of the last join membership snapshot as we may be able to
# re-use all the work we did.
if membership == Membership.JOIN:
last_to_insert_join_membership_snapshots_by_room_id[room_id] = (
to_insert_membership_snapshots[(room_id, user_id)]
)
last_to_insert_join_membership_infos_by_room_id[room_id] = (
to_insert_membership_infos[(room_id, user_id)]
)
# We can batch process each room concurrently.
#
# In order for the snapshot re-use logic to work correctly, we need to process
# memberships in a room sequentially chronologically.
await concurrently_execute(
_handle_memberships_to_update_rows_for_room,
room_id_to_memberships_to_update_rows.values(),
10,
)
# Assemble data so it's ready for the batch queries in the transaction
# Re-assemble data so it's ready for the batch queries in the transaction
key_names = ("room_id", "user_id")
key_values: List[Tuple[str, str]] = []
insertion_value_names = (