Only re-use for join memberships
This commit is contained in:
@@ -2123,10 +2123,10 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
|
||||
|
||||
# Map from room_id to ...
|
||||
# Just some convenience maps for easier lookup by `room_id`.
|
||||
last_to_insert_membership_snapshots_by_room_id: Dict[
|
||||
last_to_insert_join_membership_snapshots_by_room_id: Dict[
|
||||
str, SlidingSyncMembershipSnapshotSharedInsertValues
|
||||
] = {}
|
||||
last_to_insert_membership_infos_by_room_id: Dict[
|
||||
last_to_insert_join_membership_infos_by_room_id: Dict[
|
||||
str, SlidingSyncMembershipInfoWithEventPos
|
||||
] = {}
|
||||
# TODO: `concurrently_execute` based on buckets of room_ids
|
||||
@@ -2171,34 +2171,50 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
|
||||
},
|
||||
)
|
||||
|
||||
# Check if the current state has been updated since the last snapshot we inserted.
|
||||
# Check if we can potentially re-use the last snapshot we inserted for this
|
||||
# room.
|
||||
#
|
||||
can_use_last_snapshot = False
|
||||
# TODO: We could also look in the database to see if a snapshot is available
|
||||
last_to_insert_membership_snapshot = (
|
||||
last_to_insert_membership_snapshots_by_room_id.get(room_id)
|
||||
last_to_insert_join_membership_snapshot = (
|
||||
last_to_insert_join_membership_snapshots_by_room_id.get(room_id)
|
||||
)
|
||||
last_to_insert_membership_info = (
|
||||
last_to_insert_membership_infos_by_room_id.get(room_id)
|
||||
last_to_insert_join_membership_info = (
|
||||
last_to_insert_join_membership_infos_by_room_id.get(room_id)
|
||||
)
|
||||
if last_to_insert_membership_snapshot is not None:
|
||||
# We can only re-use the last snapshot if their previous membership was in
|
||||
# the room. Since that's not easy to determine without more extra lookups,
|
||||
# we only apply this to join memberships for now.
|
||||
#
|
||||
# - JOIN: Re-use if no state changes
|
||||
# - LEAVE: Could re-use if they were previously joined otherwise this could
|
||||
# be a rescinded/rejected invite/knock and we don't want to leak anything.
|
||||
# - INVITE/KNOCK: ❌ We can only rely on the stripped state
|
||||
# - BAN: Could re-use if they were previously joined otherwise they might
|
||||
# have just been banned after invite/knock and we don't want to leak
|
||||
# anything.
|
||||
if (
|
||||
last_to_insert_join_membership_snapshot is not None
|
||||
and membership == Membership.JOIN
|
||||
):
|
||||
can_use_last_snapshot = True
|
||||
|
||||
# These should go hand-in-hand
|
||||
assert last_to_insert_membership_info is not None
|
||||
assert last_to_insert_join_membership_info is not None
|
||||
|
||||
# Sanity check that our tokens are in order. We should be iterating in
|
||||
# ascending order.
|
||||
assert (
|
||||
last_to_insert_membership_info.membership_event_stream_ordering
|
||||
last_to_insert_join_membership_info.membership_event_stream_ordering
|
||||
<= membership_event_stream_ordering
|
||||
)
|
||||
|
||||
# TODO: This only works if someone is from the server is participating in the room.
|
||||
# Check if the current state has been updated since the last snapshot we inserted.
|
||||
state_deltas_since_last_snapshot = await self.get_current_state_deltas_for_room(
|
||||
room_id,
|
||||
# From the last snapshot we inserted
|
||||
from_token=RoomStreamToken(
|
||||
stream=last_to_insert_membership_info.membership_event_stream_ordering,
|
||||
stream=last_to_insert_join_membership_info.membership_event_stream_ordering,
|
||||
),
|
||||
# To our current membership position
|
||||
to_token=RoomStreamToken(
|
||||
@@ -2223,10 +2239,10 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
|
||||
# just re-use the last snapshot we inserted.
|
||||
if can_use_last_snapshot:
|
||||
# Based on the logic above, we should have a last snapshot to work from
|
||||
assert last_to_insert_membership_snapshot is not None
|
||||
assert last_to_insert_join_membership_snapshot is not None
|
||||
|
||||
sliding_sync_membership_snapshots_insert_map = (
|
||||
last_to_insert_membership_snapshot
|
||||
last_to_insert_join_membership_snapshot
|
||||
)
|
||||
elif membership == Membership.JOIN:
|
||||
# If we're still joined, we can pull from current state.
|
||||
@@ -2403,14 +2419,10 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
|
||||
f"Unexpected membership {membership} ({membership_event_id}) that we don't know how to handle yet"
|
||||
)
|
||||
|
||||
to_insert_membership_snapshots[
|
||||
(room_id, user_id)
|
||||
] = last_to_insert_membership_snapshots_by_room_id[room_id] = (
|
||||
to_insert_membership_snapshots[(room_id, user_id)] = (
|
||||
sliding_sync_membership_snapshots_insert_map
|
||||
)
|
||||
to_insert_membership_infos[
|
||||
(room_id, user_id)
|
||||
] = last_to_insert_membership_infos_by_room_id[room_id] = (
|
||||
to_insert_membership_infos[(room_id, user_id)] = (
|
||||
SlidingSyncMembershipInfoWithEventPos(
|
||||
user_id=user_id,
|
||||
sender=sender,
|
||||
@@ -2422,6 +2434,15 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
|
||||
or "master",
|
||||
)
|
||||
)
|
||||
# Keep track of the last join membership snapshot as we may be able to
|
||||
# re-use all the work we did.
|
||||
if membership == Membership.JOIN:
|
||||
last_to_insert_join_membership_snapshots_by_room_id[room_id] = (
|
||||
to_insert_membership_snapshots[(room_id, user_id)]
|
||||
)
|
||||
last_to_insert_join_membership_infos_by_room_id[room_id] = (
|
||||
to_insert_membership_infos[(room_id, user_id)]
|
||||
)
|
||||
|
||||
# Assemble data so it's ready for the batch queries in the transaction
|
||||
key_names = ("room_id", "user_id")
|
||||
|
||||
Reference in New Issue
Block a user