Fix sliding sync performance slow down for long lived connections. (#19206)
Fixes https://github.com/element-hq/synapse/issues/19175 This PR moves tracking of what lazy loaded membership we've sent to each room out of the required state table. This avoids that table from continuously growing, which massively helps performance as we pull out all matching rows for the connection when we receive a request. The new table is only read when we have data in a room to send, so we end up reading a lot fewer rows from the DB. Though we now read from that table for every room we have events to return in, rather than once at the start of the request. For an explanation of how the new table works, see the [comment](https://github.com/element-hq/synapse/blob/erikj/sss_better_membership_storage2/synapse/storage/schema/main/delta/93/02_sliding_sync_members.sql#L15-L38) on the table schema. The table is designed so that we can later prune old entries if we wish, but that is not implemented in this PR. Reviewable commit-by-commit. --------- Co-authored-by: Eric Eastwood <erice@element.io>
This commit is contained in:
@@ -17,6 +17,7 @@ import logging
|
||||
from itertools import chain
|
||||
from typing import TYPE_CHECKING, AbstractSet, Mapping
|
||||
|
||||
import attr
|
||||
from prometheus_client import Histogram
|
||||
from typing_extensions import assert_never
|
||||
|
||||
@@ -62,6 +63,7 @@ from synapse.types.handlers.sliding_sync import (
|
||||
HaveSentRoomFlag,
|
||||
MutablePerConnectionState,
|
||||
PerConnectionState,
|
||||
RoomLazyMembershipChanges,
|
||||
RoomSyncConfig,
|
||||
SlidingSyncConfig,
|
||||
SlidingSyncResult,
|
||||
@@ -106,7 +108,7 @@ class SlidingSyncHandler:
|
||||
self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
|
||||
self.is_mine_id = hs.is_mine_id
|
||||
|
||||
self.connection_store = SlidingSyncConnectionStore(self.store)
|
||||
self.connection_store = SlidingSyncConnectionStore(self.clock, self.store)
|
||||
self.extensions = SlidingSyncExtensionHandler(hs)
|
||||
self.room_lists = SlidingSyncRoomLists(hs)
|
||||
|
||||
@@ -981,14 +983,15 @@ class SlidingSyncHandler:
|
||||
#
|
||||
# Calculate the `StateFilter` based on the `required_state` for the room
|
||||
required_state_filter = StateFilter.none()
|
||||
# The requested `required_state_map` with the lazy membership expanded and
|
||||
# `$ME` replaced with the user's ID. This allows us to see what membership we've
|
||||
# sent down to the client in the next request.
|
||||
#
|
||||
# Make a copy so we can modify it. Still need to be careful to make a copy of
|
||||
# the state key sets if we want to add/remove from them. We could make a deep
|
||||
# copy but this saves us some work.
|
||||
expanded_required_state_map = dict(room_sync_config.required_state_map)
|
||||
|
||||
# Keep track of which users' state we may need to fetch. We split this
|
||||
# into explicit users and lazy loaded users.
|
||||
explicit_user_state = set()
|
||||
lazy_load_user_ids = set()
|
||||
|
||||
# Whether lazy-loading of room members is enabled.
|
||||
lazy_load_room_members = False
|
||||
|
||||
if room_membership_for_user_at_to_token.membership not in (
|
||||
Membership.INVITE,
|
||||
Membership.KNOCK,
|
||||
@@ -1036,7 +1039,6 @@ class SlidingSyncHandler:
|
||||
else:
|
||||
required_state_types: list[tuple[str, str | None]] = []
|
||||
num_wild_state_keys = 0
|
||||
lazy_load_room_members = False
|
||||
num_others = 0
|
||||
for (
|
||||
state_type,
|
||||
@@ -1068,43 +1070,60 @@ class SlidingSyncHandler:
|
||||
timeline_event.state_key
|
||||
)
|
||||
|
||||
# The client needs to know the membership of everyone in
|
||||
# the timeline we're returning.
|
||||
lazy_load_user_ids.update(timeline_membership)
|
||||
|
||||
# Update the required state filter so we pick up the new
|
||||
# membership
|
||||
for user_id in timeline_membership:
|
||||
required_state_types.append(
|
||||
(EventTypes.Member, user_id)
|
||||
)
|
||||
if limited or initial:
|
||||
# If the timeline is limited, we only need to
|
||||
# return the membership changes for people in
|
||||
# the timeline.
|
||||
for user_id in timeline_membership:
|
||||
required_state_types.append(
|
||||
(EventTypes.Member, user_id)
|
||||
)
|
||||
else:
|
||||
# For non-limited timelines we always return all
|
||||
# membership changes. This is so that clients
|
||||
# who have fetched the full membership list
|
||||
# already can continue to maintain it for
|
||||
# non-limited syncs.
|
||||
#
|
||||
# This assumes that for non-limited syncs there
|
||||
# won't be many membership changes that wouldn't
|
||||
# have been included already (this can only
|
||||
# happen if membership state was rolled back due
|
||||
# to state resolution anyway).
|
||||
#
|
||||
# `None` is a wildcard in the `StateFilter`
|
||||
required_state_types.append((EventTypes.Member, None))
|
||||
|
||||
# Add an explicit entry for each user in the timeline
|
||||
#
|
||||
# Make a new set or copy of the state key set so we can
|
||||
# modify it without affecting the original
|
||||
# `required_state_map`
|
||||
expanded_required_state_map[EventTypes.Member] = (
|
||||
expanded_required_state_map.get(
|
||||
EventTypes.Member, set()
|
||||
# Record the extra members we're returning.
|
||||
lazy_load_user_ids.update(
|
||||
state_key
|
||||
for event_type, state_key in room_state_delta_id_map
|
||||
if event_type == EventTypes.Member
|
||||
)
|
||||
| timeline_membership
|
||||
)
|
||||
elif state_key == StateValues.ME:
|
||||
else:
|
||||
num_others += 1
|
||||
required_state_types.append((state_type, user.to_string()))
|
||||
|
||||
# Replace `$ME` with the user's ID so we can deduplicate
|
||||
# when someone requests the same state with `$ME` or with
|
||||
# their user ID.
|
||||
#
|
||||
# Make a new set or copy of the state key set so we can
|
||||
# modify it without affecting the original
|
||||
# `required_state_map`
|
||||
expanded_required_state_map[EventTypes.Member] = (
|
||||
expanded_required_state_map.get(
|
||||
EventTypes.Member, set()
|
||||
)
|
||||
| {user.to_string()}
|
||||
normalized_state_key = state_key
|
||||
if state_key == StateValues.ME:
|
||||
normalized_state_key = user.to_string()
|
||||
|
||||
if state_type == EventTypes.Member:
|
||||
# Also track explicitly requested member state for
|
||||
# lazy membership tracking.
|
||||
explicit_user_state.add(normalized_state_key)
|
||||
|
||||
required_state_types.append(
|
||||
(state_type, normalized_state_key)
|
||||
)
|
||||
else:
|
||||
num_others += 1
|
||||
required_state_types.append((state_type, state_key))
|
||||
|
||||
set_tag(
|
||||
SynapseTags.FUNC_ARG_PREFIX
|
||||
@@ -1122,6 +1141,10 @@ class SlidingSyncHandler:
|
||||
|
||||
required_state_filter = StateFilter.from_types(required_state_types)
|
||||
|
||||
# Remove any explicitly requested user state from the lazy-loaded set,
|
||||
# as we track them separately.
|
||||
lazy_load_user_ids -= explicit_user_state
|
||||
|
||||
# We need this base set of info for the response so let's just fetch it along
|
||||
# with the `required_state` for the room
|
||||
hero_room_state = [
|
||||
@@ -1149,6 +1172,22 @@ class SlidingSyncHandler:
|
||||
# We can return all of the state that was requested if this was the first
|
||||
# time we've sent the room down this connection.
|
||||
room_state: StateMap[EventBase] = {}
|
||||
|
||||
# Includes the state for the heroes if we need them (may contain other
|
||||
# state as well).
|
||||
hero_membership_state: StateMap[EventBase] = {}
|
||||
|
||||
# By default, we mark all `lazy_load_user_ids` as being sent down
|
||||
# for the first time in this sync. We later check if we sent any of them
|
||||
# down previously and update `returned_user_id_to_last_seen_ts_map` if
|
||||
# we have.
|
||||
returned_user_id_to_last_seen_ts_map = {}
|
||||
if lazy_load_room_members:
|
||||
returned_user_id_to_last_seen_ts_map = dict.fromkeys(lazy_load_user_ids)
|
||||
new_connection_state.room_lazy_membership[room_id] = RoomLazyMembershipChanges(
|
||||
returned_user_id_to_last_seen_ts_map=returned_user_id_to_last_seen_ts_map
|
||||
)
|
||||
|
||||
if initial:
|
||||
room_state = await self.get_current_state_at(
|
||||
room_id=room_id,
|
||||
@@ -1156,28 +1195,97 @@ class SlidingSyncHandler:
|
||||
state_filter=state_filter,
|
||||
to_token=to_token,
|
||||
)
|
||||
|
||||
# The `room_state` includes the hero membership state if needed.
|
||||
# We'll later filter this down so we don't need to do so here.
|
||||
hero_membership_state = room_state
|
||||
else:
|
||||
assert from_token is not None
|
||||
assert from_bound is not None
|
||||
|
||||
if prev_room_sync_config is not None:
|
||||
# Define `all_required_user_state` as all user state we want, which
|
||||
# is the explicitly requested members, any needed for lazy
|
||||
# loading, and users whose membership has changed.
|
||||
all_required_user_state = explicit_user_state | lazy_load_user_ids
|
||||
for state_type, state_key in room_state_delta_id_map:
|
||||
if state_type == EventTypes.Member:
|
||||
all_required_user_state.add(state_key)
|
||||
|
||||
# We need to know what user state we previously sent down the
|
||||
# connection so we can determine what has changed.
|
||||
#
|
||||
# We need to fetch all users whose memberships we may want
|
||||
# to send down this sync. This includes (and matches
|
||||
# `all_required_user_state`):
|
||||
# 1. Explicitly requested user state
|
||||
# 2. Lazy loaded members, i.e. users who appear in the
|
||||
# timeline.
|
||||
# 3. The users whose membership has changed in the room, i.e.
|
||||
# in the state deltas.
|
||||
#
|
||||
# This is to correctly handle the cases where a user was
|
||||
# previously sent down as a lazy loaded member:
|
||||
# - and is now explicitly requested (so shouldn't be sent down
|
||||
# again); or
|
||||
# - their membership has changed (so we need to invalidate
|
||||
# their entry in the lazy loaded table if we don't send the
|
||||
# change down).
|
||||
if all_required_user_state:
|
||||
previously_returned_user_to_last_seen = (
|
||||
await self.store.get_sliding_sync_connection_lazy_members(
|
||||
connection_position=from_token.connection_position,
|
||||
room_id=room_id,
|
||||
user_ids=all_required_user_state,
|
||||
)
|
||||
)
|
||||
|
||||
# Update the room lazy membership changes to track which
|
||||
# lazy loaded members were needed for this sync. This is so
|
||||
# that we can correctly track the last time we sent down
|
||||
# users' membership (and so can evict old membership state
|
||||
# from the DB tables).
|
||||
returned_user_id_to_last_seen_ts_map.update(
|
||||
(user_id, timestamp)
|
||||
for user_id, timestamp in previously_returned_user_to_last_seen.items()
|
||||
if user_id in lazy_load_user_ids
|
||||
)
|
||||
else:
|
||||
previously_returned_user_to_last_seen = {}
|
||||
|
||||
# Check if there are any changes to the required state config
|
||||
# that we need to handle.
|
||||
changed_required_state_map, added_state_filter = (
|
||||
_required_state_changes(
|
||||
user.to_string(),
|
||||
prev_required_state_map=prev_room_sync_config.required_state_map,
|
||||
request_required_state_map=expanded_required_state_map,
|
||||
state_deltas=room_state_delta_id_map,
|
||||
)
|
||||
changes_return = _required_state_changes(
|
||||
user.to_string(),
|
||||
prev_required_state_map=prev_room_sync_config.required_state_map,
|
||||
request_required_state_map=room_sync_config.required_state_map,
|
||||
previously_returned_lazy_user_ids=previously_returned_user_to_last_seen.keys(),
|
||||
request_lazy_load_user_ids=lazy_load_user_ids,
|
||||
state_deltas=room_state_delta_id_map,
|
||||
)
|
||||
changed_required_state_map = changes_return.changed_required_state_map
|
||||
|
||||
if added_state_filter:
|
||||
new_connection_state.room_lazy_membership[
|
||||
room_id
|
||||
].invalidated_user_ids = changes_return.lazy_members_invalidated
|
||||
|
||||
# Add any previously returned explicit memberships to the lazy
|
||||
# loaded table. This happens when a client requested explicit
|
||||
# members and then converted them to lazy loading.
|
||||
for user_id in changes_return.extra_users_to_add_to_lazy_cache:
|
||||
# We don't know the right timestamp to use here, as we don't
|
||||
# know the last time we would have sent the membership down.
|
||||
# So we don't overwrite it if we have a timestamp already,
|
||||
# and fallback to `None` (which means now) if we don't.
|
||||
returned_user_id_to_last_seen_ts_map.setdefault(user_id, None)
|
||||
|
||||
if changes_return.added_state_filter:
|
||||
# Some state entries got added, so we pull out the current
|
||||
# state for them. If we don't do this we'd only send down new deltas.
|
||||
state_ids = await self.get_current_state_ids_at(
|
||||
room_id=room_id,
|
||||
room_membership_for_user_at_to_token=room_membership_for_user_at_to_token,
|
||||
state_filter=added_state_filter,
|
||||
state_filter=changes_return.added_state_filter,
|
||||
to_token=to_token,
|
||||
)
|
||||
room_state_delta_id_map.update(state_ids)
|
||||
@@ -1189,6 +1297,7 @@ class SlidingSyncHandler:
|
||||
|
||||
# If the membership changed and we have to get heroes, get the remaining
|
||||
# heroes from the state
|
||||
hero_membership_state = {}
|
||||
if hero_user_ids:
|
||||
hero_membership_state = await self.get_current_state_at(
|
||||
room_id=room_id,
|
||||
@@ -1196,7 +1305,6 @@ class SlidingSyncHandler:
|
||||
state_filter=StateFilter.from_types(hero_room_state),
|
||||
to_token=to_token,
|
||||
)
|
||||
room_state.update(hero_membership_state)
|
||||
|
||||
required_room_state: StateMap[EventBase] = {}
|
||||
if required_state_filter != StateFilter.none():
|
||||
@@ -1219,7 +1327,7 @@ class SlidingSyncHandler:
|
||||
# Assemble heroes: extract the info from the state we just fetched
|
||||
heroes: list[SlidingSyncResult.RoomResult.StrippedHero] = []
|
||||
for hero_user_id in hero_user_ids:
|
||||
member_event = room_state.get((EventTypes.Member, hero_user_id))
|
||||
member_event = hero_membership_state.get((EventTypes.Member, hero_user_id))
|
||||
if member_event is not None:
|
||||
heroes.append(
|
||||
SlidingSyncResult.RoomResult.StrippedHero(
|
||||
@@ -1281,7 +1389,7 @@ class SlidingSyncHandler:
|
||||
bump_stamp = 0
|
||||
|
||||
room_sync_required_state_map_to_persist: Mapping[str, AbstractSet[str]] = (
|
||||
expanded_required_state_map
|
||||
room_sync_config.required_state_map
|
||||
)
|
||||
if changed_required_state_map:
|
||||
room_sync_required_state_map_to_persist = changed_required_state_map
|
||||
@@ -1471,13 +1579,37 @@ class SlidingSyncHandler:
|
||||
return None
|
||||
|
||||
|
||||
@attr.s(auto_attribs=True)
|
||||
class _RequiredStateChangesReturn:
|
||||
"""Return type for _required_state_changes."""
|
||||
|
||||
changed_required_state_map: Mapping[str, AbstractSet[str]] | None
|
||||
"""The updated required state map to store in the room config, or None if
|
||||
there is no change."""
|
||||
|
||||
added_state_filter: StateFilter
|
||||
"""The state filter to use to fetch any additional current state that needs
|
||||
to be returned to the client."""
|
||||
|
||||
extra_users_to_add_to_lazy_cache: AbstractSet[str] = frozenset()
|
||||
"""The set of user IDs we should add to the lazy members cache that we had
|
||||
previously returned. Handles the case where a user was previously sent down
|
||||
explicitly but is now being lazy loaded."""
|
||||
|
||||
lazy_members_invalidated: AbstractSet[str] = frozenset()
|
||||
"""The set of user IDs whose membership has changed but we didn't send down,
|
||||
so we need to invalidate them from the cache."""
|
||||
|
||||
|
||||
def _required_state_changes(
|
||||
user_id: str,
|
||||
*,
|
||||
prev_required_state_map: Mapping[str, AbstractSet[str]],
|
||||
request_required_state_map: Mapping[str, AbstractSet[str]],
|
||||
previously_returned_lazy_user_ids: AbstractSet[str],
|
||||
request_lazy_load_user_ids: AbstractSet[str],
|
||||
state_deltas: StateMap[str],
|
||||
) -> tuple[Mapping[str, AbstractSet[str]] | None, StateFilter]:
|
||||
) -> _RequiredStateChangesReturn:
|
||||
"""Calculates the changes between the required state room config from the
|
||||
previous requests compared with the current request.
|
||||
|
||||
@@ -1491,14 +1623,62 @@ def _required_state_changes(
|
||||
added, removed and then added again to the required state. In that case we
|
||||
only want to re-send that entry down sync if it has changed.
|
||||
|
||||
Returns:
|
||||
A 2-tuple of updated required state config (or None if there is no update)
|
||||
and the state filter to use to fetch extra current state that we need to
|
||||
return.
|
||||
Args:
|
||||
user_id: The user ID of the user making the request.
|
||||
prev_required_state_map: The required state map from the previous
|
||||
request.
|
||||
request_required_state_map: The required state map from the current
|
||||
request.
|
||||
previously_returned_lazy_user_ids: The set of user IDs whose membership
|
||||
we have previously returned to the client due to lazy loading. This
|
||||
is filtered to only include users who have either sent events in the
|
||||
`timeline`, `required_state` or whose membership changed.
|
||||
request_lazy_load_user_ids: The set of user IDs whose lazy-loaded
|
||||
membership is required for this request.
|
||||
state_deltas: The state deltas in the room in the request token range,
|
||||
considering user membership. See `get_current_state_deltas_for_room`
|
||||
for more details.
|
||||
"""
|
||||
|
||||
# First we find any lazy members that have been invalidated due to state
|
||||
# changes that we are not sending down.
|
||||
lazy_members_invalidated = set()
|
||||
for event_type, state_key in state_deltas:
|
||||
if event_type != EventTypes.Member:
|
||||
continue
|
||||
|
||||
if state_key in request_lazy_load_user_ids:
|
||||
# Because it's part of the `request_lazy_load_user_ids`, we're going to
|
||||
# send this member change down.
|
||||
continue
|
||||
|
||||
if state_key not in previously_returned_lazy_user_ids:
|
||||
# We've not previously returned this member so nothing to
|
||||
# invalidate.
|
||||
continue
|
||||
|
||||
lazy_members_invalidated.add(state_key)
|
||||
|
||||
if prev_required_state_map == request_required_state_map:
|
||||
# There has been no change. Return immediately.
|
||||
return None, StateFilter.none()
|
||||
# There has been no change in state, just need to check lazy members.
|
||||
newly_returned_lazy_members = (
|
||||
request_lazy_load_user_ids - previously_returned_lazy_user_ids
|
||||
)
|
||||
if newly_returned_lazy_members:
|
||||
# There are some new lazy members we need to fetch.
|
||||
added_types: list[tuple[str, str | None]] = []
|
||||
for new_user_id in newly_returned_lazy_members:
|
||||
added_types.append((EventTypes.Member, new_user_id))
|
||||
|
||||
added_state_filter = StateFilter.from_types(added_types)
|
||||
else:
|
||||
added_state_filter = StateFilter.none()
|
||||
|
||||
return _RequiredStateChangesReturn(
|
||||
changed_required_state_map=None,
|
||||
added_state_filter=added_state_filter,
|
||||
lazy_members_invalidated=lazy_members_invalidated,
|
||||
)
|
||||
|
||||
prev_wildcard = prev_required_state_map.get(StateValues.WILDCARD, set())
|
||||
request_wildcard = request_required_state_map.get(StateValues.WILDCARD, set())
|
||||
@@ -1508,17 +1688,29 @@ def _required_state_changes(
|
||||
# already fetching everything, we don't have to fetch anything now that they've
|
||||
# narrowed.
|
||||
if StateValues.WILDCARD in prev_wildcard:
|
||||
return request_required_state_map, StateFilter.none()
|
||||
return _RequiredStateChangesReturn(
|
||||
changed_required_state_map=request_required_state_map,
|
||||
added_state_filter=StateFilter.none(),
|
||||
lazy_members_invalidated=lazy_members_invalidated,
|
||||
)
|
||||
|
||||
# If a event type wildcard has been added or removed we don't try and do
|
||||
# anything fancy, and instead always update the effective room required
|
||||
# state config to match the request.
|
||||
if request_wildcard - prev_wildcard:
|
||||
# Some keys were added, so we need to fetch everything
|
||||
return request_required_state_map, StateFilter.all()
|
||||
return _RequiredStateChangesReturn(
|
||||
changed_required_state_map=request_required_state_map,
|
||||
added_state_filter=StateFilter.all(),
|
||||
lazy_members_invalidated=lazy_members_invalidated,
|
||||
)
|
||||
if prev_wildcard - request_wildcard:
|
||||
# Keys were only removed, so we don't have to fetch everything.
|
||||
return request_required_state_map, StateFilter.none()
|
||||
return _RequiredStateChangesReturn(
|
||||
changed_required_state_map=request_required_state_map,
|
||||
added_state_filter=StateFilter.none(),
|
||||
lazy_members_invalidated=lazy_members_invalidated,
|
||||
)
|
||||
|
||||
# Contains updates to the required state map compared with the previous room
|
||||
# config. This has the same format as `RoomSyncConfig.required_state`
|
||||
@@ -1550,6 +1742,17 @@ def _required_state_changes(
|
||||
# Nothing *added*, so we skip. Removals happen below.
|
||||
continue
|
||||
|
||||
# Handle the special case of adding `$LAZY` membership, where we want to
|
||||
# always record the change to be lazy loading, as we immediately start
|
||||
# using the lazy loading tables so there is no point *not* recording the
|
||||
# change to lazy load in the effective room config.
|
||||
if event_type == EventTypes.Member:
|
||||
old_state_key_lazy = StateValues.LAZY in old_state_keys
|
||||
request_state_key_lazy = StateValues.LAZY in request_state_keys
|
||||
if not old_state_key_lazy and request_state_key_lazy:
|
||||
changes[event_type] = request_state_keys
|
||||
continue
|
||||
|
||||
# We only remove state keys from the effective state if they've been
|
||||
# removed from the request *and* the state has changed. This ensures
|
||||
# that if a client removes and then re-adds a state key, we only send
|
||||
@@ -1620,9 +1823,31 @@ def _required_state_changes(
|
||||
# LAZY values should also be ignore for event types that are
|
||||
# not membership.
|
||||
pass
|
||||
elif event_type == EventTypes.Member:
|
||||
if state_key not in previously_returned_lazy_user_ids:
|
||||
# Only add *explicit* members we haven't previously sent
|
||||
# down.
|
||||
added.append((event_type, state_key))
|
||||
else:
|
||||
added.append((event_type, state_key))
|
||||
|
||||
previously_required_state_members = set(
|
||||
prev_required_state_map.get(EventTypes.Member, ())
|
||||
)
|
||||
if StateValues.ME in previously_required_state_members:
|
||||
previously_required_state_members.add(user_id)
|
||||
|
||||
# We also need to pull out any lazy members that are now required but
|
||||
# haven't previously been returned.
|
||||
for required_user_id in (
|
||||
request_lazy_load_user_ids
|
||||
# Remove previously returned users
|
||||
- previously_returned_lazy_user_ids
|
||||
# Exclude previously explicitly requested members.
|
||||
- previously_required_state_members
|
||||
):
|
||||
added.append((EventTypes.Member, required_user_id))
|
||||
|
||||
added_state_filter = StateFilter.from_types(added)
|
||||
|
||||
# Figure out what changes we need to apply to the effective required state
|
||||
@@ -1663,13 +1888,25 @@ def _required_state_changes(
|
||||
changes[event_type] = request_state_keys
|
||||
continue
|
||||
|
||||
# When handling $LAZY membership, we want to either a) not update the
|
||||
# state or b) update it to match the request. This is to avoid churn of
|
||||
# the effective required state for rooms (we deduplicate required state
|
||||
# between rooms), and because we can store the previously returned
|
||||
# explicit memberships with the lazy loaded memberships.
|
||||
if event_type == EventTypes.Member:
|
||||
old_state_key_lazy = StateValues.LAZY in old_state_keys
|
||||
request_state_key_lazy = StateValues.LAZY in request_state_keys
|
||||
has_lazy = old_state_key_lazy or request_state_key_lazy
|
||||
|
||||
# If a "$LAZY" has been added or removed we always update to match
|
||||
# the request.
|
||||
if old_state_key_lazy != request_state_key_lazy:
|
||||
# If a "$LAZY" has been added or removed we always update the effective room
|
||||
# required state config to match the request.
|
||||
changes[event_type] = request_state_keys
|
||||
continue
|
||||
|
||||
# Or if we have lazy membership and there are invalidated
|
||||
# explicit memberships.
|
||||
if has_lazy and invalidated_state_keys:
|
||||
changes[event_type] = request_state_keys
|
||||
continue
|
||||
|
||||
@@ -1684,6 +1921,28 @@ def _required_state_changes(
|
||||
if invalidated_state_keys:
|
||||
changes[event_type] = old_state_keys - invalidated_state_keys
|
||||
|
||||
# Check for any explicit membership changes that were removed that we can
|
||||
# add to the lazy members previously returned. This is so that we don't
|
||||
# return a user due to lazy loading if they were previously returned as an
|
||||
# explicit membership.
|
||||
users_to_add_to_lazy_cache: set[str] = set()
|
||||
|
||||
membership_changes = changes.get(EventTypes.Member, set())
|
||||
if membership_changes and StateValues.LAZY in request_state_keys:
|
||||
for state_key in prev_required_state_map.get(EventTypes.Member, set()):
|
||||
if state_key == StateValues.WILDCARD or state_key == StateValues.LAZY:
|
||||
# Ignore non-user IDs.
|
||||
continue
|
||||
|
||||
if state_key == StateValues.ME:
|
||||
# Normalize to proper user ID
|
||||
state_key = user_id
|
||||
|
||||
# We remember the user if they haven't been invalidated
|
||||
if (EventTypes.Member, state_key) not in state_deltas:
|
||||
users_to_add_to_lazy_cache.add(state_key)
|
||||
|
||||
new_required_state_map = None
|
||||
if changes:
|
||||
# Update the required state config based on the changes.
|
||||
new_required_state_map = dict(prev_required_state_map)
|
||||
@@ -1694,6 +1953,9 @@ def _required_state_changes(
|
||||
# Remove entries with empty state keys.
|
||||
new_required_state_map.pop(event_type, None)
|
||||
|
||||
return new_required_state_map, added_state_filter
|
||||
else:
|
||||
return None, added_state_filter
|
||||
return _RequiredStateChangesReturn(
|
||||
changed_required_state_map=new_required_state_map,
|
||||
added_state_filter=added_state_filter,
|
||||
lazy_members_invalidated=lazy_members_invalidated,
|
||||
extra_users_to_add_to_lazy_cache=users_to_add_to_lazy_cache,
|
||||
)
|
||||
|
||||
@@ -13,7 +13,6 @@
|
||||
#
|
||||
|
||||
import logging
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import attr
|
||||
|
||||
@@ -25,9 +24,7 @@ from synapse.types.handlers.sliding_sync import (
|
||||
PerConnectionState,
|
||||
SlidingSyncConfig,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
pass
|
||||
from synapse.util.clock import Clock
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -61,7 +58,8 @@ class SlidingSyncConnectionStore:
|
||||
to mapping of room ID to `HaveSentRoom`.
|
||||
"""
|
||||
|
||||
store: "DataStore"
|
||||
clock: Clock
|
||||
store: DataStore
|
||||
|
||||
async def get_and_clear_connection_positions(
|
||||
self,
|
||||
@@ -101,7 +99,7 @@ class SlidingSyncConnectionStore:
|
||||
If there are no changes to the state this may return the same token as
|
||||
the existing per-connection state.
|
||||
"""
|
||||
if not new_connection_state.has_updates():
|
||||
if not new_connection_state.has_updates(self.clock):
|
||||
if from_token is not None:
|
||||
return from_token.connection_position
|
||||
else:
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
|
||||
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Mapping, cast
|
||||
from typing import TYPE_CHECKING, AbstractSet, Mapping, cast
|
||||
|
||||
import attr
|
||||
|
||||
@@ -26,13 +26,16 @@ from synapse.storage.database import (
|
||||
DatabasePool,
|
||||
LoggingDatabaseConnection,
|
||||
LoggingTransaction,
|
||||
make_in_list_sql_clause,
|
||||
)
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
from synapse.types import MultiWriterStreamToken, RoomStreamToken
|
||||
from synapse.types.handlers.sliding_sync import (
|
||||
HaveSentRoom,
|
||||
HaveSentRoomFlag,
|
||||
MutablePerConnectionState,
|
||||
PerConnectionState,
|
||||
RoomLazyMembershipChanges,
|
||||
RoomStatusMap,
|
||||
RoomSyncConfig,
|
||||
)
|
||||
@@ -373,6 +376,13 @@ class SlidingSyncStore(SQLBaseStore):
|
||||
value_values=values,
|
||||
)
|
||||
|
||||
self._persist_sliding_sync_connection_lazy_members_txn(
|
||||
txn,
|
||||
connection_key,
|
||||
connection_position,
|
||||
per_connection_state.room_lazy_membership,
|
||||
)
|
||||
|
||||
return connection_position
|
||||
|
||||
@cached(iterable=True, max_entries=100000)
|
||||
@@ -446,6 +456,23 @@ class SlidingSyncStore(SQLBaseStore):
|
||||
"""
|
||||
txn.execute(sql, (connection_key, connection_position))
|
||||
|
||||
# Move any lazy membership entries for this connection position to have
|
||||
# `NULL` connection position, indicating that it applies to all future
|
||||
# positions on this connection. This is safe because we have deleted all
|
||||
# other (potentially forked) connection positions, and so all future
|
||||
# positions in this connection will be a continuation of the current
|
||||
# position. Thus any lazy membership entries we have sent down will still
|
||||
# be valid.
|
||||
self.db_pool.simple_update_txn(
|
||||
txn,
|
||||
table="sliding_sync_connection_lazy_members",
|
||||
keyvalues={
|
||||
"connection_key": connection_key,
|
||||
"connection_position": connection_position,
|
||||
},
|
||||
updatevalues={"connection_position": None},
|
||||
)
|
||||
|
||||
# Fetch and create a mapping from required state ID to the actual
|
||||
# required state for the connection.
|
||||
rows = self.db_pool.simple_select_list_txn(
|
||||
@@ -525,8 +552,153 @@ class SlidingSyncStore(SQLBaseStore):
|
||||
receipts=RoomStatusMap(receipts),
|
||||
account_data=RoomStatusMap(account_data),
|
||||
room_configs=room_configs,
|
||||
room_lazy_membership={},
|
||||
)
|
||||
|
||||
async def get_sliding_sync_connection_lazy_members(
|
||||
self,
|
||||
connection_position: int,
|
||||
room_id: str,
|
||||
user_ids: AbstractSet[str],
|
||||
) -> Mapping[str, int]:
|
||||
"""Get which user IDs in the room we have previously sent lazy
|
||||
membership for.
|
||||
|
||||
Args:
|
||||
connection_position: The sliding sync connection position.
|
||||
room_id: The room ID to get lazy members for.
|
||||
user_ids: The user IDs to check whether we've previously sent
|
||||
because of lazy membership.
|
||||
|
||||
Returns:
|
||||
The mapping of user IDs to the last seen timestamp for those user
|
||||
IDs. Only includes user IDs that we have previously sent lazy
|
||||
membership for, and so may be a subset of the `user_ids` passed in.
|
||||
"""
|
||||
|
||||
def get_sliding_sync_connection_lazy_members_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> Mapping[str, int]:
|
||||
user_clause, user_args = make_in_list_sql_clause(
|
||||
txn.database_engine, "user_id", user_ids
|
||||
)
|
||||
|
||||
# Fetch all the lazy membership entries for the given connection,
|
||||
# room and user IDs. We don't have the `connection_key` here, so we
|
||||
# join against `sliding_sync_connection_positions` to get it.
|
||||
#
|
||||
# Beware that there are two `connection_position` columns in the
|
||||
# query which are different, the one in
|
||||
# `sliding_sync_connection_positions` is the one we match to get the
|
||||
# connection_key, whereas the one in
|
||||
# `sliding_sync_connection_lazy_members` is what we filter against
|
||||
# (it may be null or the same as the one passed in).
|
||||
#
|
||||
# FIXME: We should pass in `connection_key` here to avoid the join.
|
||||
# We don't do this currently as the caller doesn't have it handy.
|
||||
sql = f"""
|
||||
SELECT user_id, members.connection_position, last_seen_ts
|
||||
FROM sliding_sync_connection_lazy_members AS members
|
||||
INNER JOIN sliding_sync_connection_positions AS pos USING (connection_key)
|
||||
WHERE pos.connection_position = ? AND room_id = ? AND {user_clause}
|
||||
"""
|
||||
|
||||
txn.execute(sql, (connection_position, room_id, *user_args))
|
||||
|
||||
# Filter out any cache entries that only apply to forked connection
|
||||
# positions. Entries with `NULL` `connection_position` apply to all
|
||||
# positions on the connection.
|
||||
return {
|
||||
user_id: last_seen_ts
|
||||
for user_id, db_connection_position, last_seen_ts in txn
|
||||
if db_connection_position == connection_position
|
||||
or db_connection_position is None
|
||||
}
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_sliding_sync_connection_lazy_members",
|
||||
get_sliding_sync_connection_lazy_members_txn,
|
||||
db_autocommit=True, # Avoid transaction for single read
|
||||
)
|
||||
|
||||
def _persist_sliding_sync_connection_lazy_members_txn(
|
||||
self,
|
||||
txn: LoggingTransaction,
|
||||
connection_key: int,
|
||||
new_connection_position: int,
|
||||
all_changes: dict[str, RoomLazyMembershipChanges],
|
||||
) -> None:
|
||||
"""Persist that we have sent lazy membership for the given user IDs."""
|
||||
|
||||
now = self.clock.time_msec()
|
||||
|
||||
# Figure out which cache entries to add or update.
|
||||
#
|
||||
# These are either a) new entries we've never sent before (i.e. with a
|
||||
# None last_seen_ts), or b) where the `last_seen_ts` is old enough that
|
||||
# we want to update it.
|
||||
#
|
||||
# We don't update the timestamp every time to avoid hammering the DB
|
||||
# with writes, and we don't need the timestamp to be precise. It is used
|
||||
# to evict old entries that haven't been used in a while.
|
||||
to_update: list[tuple[str, str]] = []
|
||||
for room_id, room_changes in all_changes.items():
|
||||
user_ids_to_update = room_changes.get_returned_user_ids_to_update(
|
||||
self.clock
|
||||
)
|
||||
to_update.extend((room_id, user_id) for user_id in user_ids_to_update)
|
||||
|
||||
if to_update:
|
||||
# Upsert the new/updated entries.
|
||||
#
|
||||
# Ignore conflicts where the existing entry has a different
|
||||
# connection position (i.e. from a forked connection position). This
|
||||
# may mean that we lose some updates, but that's acceptable as this
|
||||
# is a cache and its fine for it to *not* include rows. (Downstream
|
||||
# this will cause us to maybe send a few extra lazy members down
|
||||
# sync, but we're allowed to send extra members).
|
||||
sql = """
|
||||
INSERT INTO sliding_sync_connection_lazy_members
|
||||
(connection_key, connection_position, room_id, user_id, last_seen_ts)
|
||||
VALUES {value_placeholder}
|
||||
ON CONFLICT (connection_key, room_id, user_id)
|
||||
DO UPDATE SET last_seen_ts = EXCLUDED.last_seen_ts
|
||||
WHERE sliding_sync_connection_lazy_members.connection_position IS NULL
|
||||
OR sliding_sync_connection_lazy_members.connection_position = EXCLUDED.connection_position
|
||||
"""
|
||||
|
||||
args = [
|
||||
(connection_key, new_connection_position, room_id, user_id, now)
|
||||
for room_id, user_id in to_update
|
||||
]
|
||||
|
||||
if isinstance(self.database_engine, PostgresEngine):
|
||||
sql = sql.format(value_placeholder="?")
|
||||
txn.execute_values(sql, args, fetch=False)
|
||||
else:
|
||||
sql = sql.format(value_placeholder="(?, ?, ?, ?, ?)")
|
||||
txn.execute_batch(sql, args)
|
||||
|
||||
# Remove any invalidated entries.
|
||||
to_remove: list[tuple[str, str]] = []
|
||||
for room_id, room_changes in all_changes.items():
|
||||
for user_id in room_changes.invalidated_user_ids:
|
||||
to_remove.append((room_id, user_id))
|
||||
|
||||
if to_remove:
|
||||
# We don't try and match on connection position here: it's fine to
|
||||
# remove it from all forks. This is a cache so it's fine to expire
|
||||
# arbitrary entries, the worst that happens is we send a few extra
|
||||
# lazy members down sync.
|
||||
self.db_pool.simple_delete_many_batch_txn(
|
||||
txn,
|
||||
table="sliding_sync_connection_lazy_members",
|
||||
keys=("connection_key", "room_id", "user_id"),
|
||||
values=[
|
||||
(connection_key, room_id, user_id) for room_id, user_id in to_remove
|
||||
],
|
||||
)
|
||||
|
||||
@wrap_as_background_process("delete_old_sliding_sync_connections")
|
||||
async def delete_old_sliding_sync_connections(self) -> None:
|
||||
"""Delete sliding sync connections that have not been used for a long time."""
|
||||
@@ -564,6 +736,10 @@ class PerConnectionStateDB:
|
||||
|
||||
room_configs: Mapping[str, "RoomSyncConfig"]
|
||||
|
||||
room_lazy_membership: dict[str, RoomLazyMembershipChanges]
|
||||
"""Lazy membership changes to persist alongside this state. Only used
|
||||
when persisting."""
|
||||
|
||||
@staticmethod
|
||||
async def from_state(
|
||||
per_connection_state: "MutablePerConnectionState", store: "DataStore"
|
||||
@@ -618,6 +794,7 @@ class PerConnectionStateDB:
|
||||
receipts=RoomStatusMap(receipts),
|
||||
account_data=RoomStatusMap(account_data),
|
||||
room_configs=per_connection_state.room_configs.maps[0],
|
||||
room_lazy_membership=per_connection_state.room_lazy_membership,
|
||||
)
|
||||
|
||||
async def to_state(self, store: "DataStore") -> "PerConnectionState":
|
||||
|
||||
@@ -0,0 +1,60 @@
|
||||
--
|
||||
-- This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
--
|
||||
-- Copyright (C) 2025 Element Creations Ltd
|
||||
--
|
||||
-- This program is free software: you can redistribute it and/or modify
|
||||
-- it under the terms of the GNU Affero General Public License as
|
||||
-- published by the Free Software Foundation, either version 3 of the
|
||||
-- License, or (at your option) any later version.
|
||||
--
|
||||
-- See the GNU Affero General Public License for more details:
|
||||
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
|
||||
|
||||
-- Tracks which member states have been sent to the client for lazy-loaded
|
||||
-- members in sliding sync. This is a *cache* as it doesn't matter if we send
|
||||
-- down members we've previously sent down, i.e. it's safe to delete any rows.
|
||||
--
|
||||
-- We could have tracked these as part of the
|
||||
-- `sliding_sync_connection_required_state` table, but that would bloat that
|
||||
-- table significantly as most rooms will have lazy-loaded members. We want to
|
||||
-- keep that table small as we always pull out all rows for the connection for
|
||||
-- every request, so storing lots of data there would be bad for performance. To
|
||||
-- keep that table small we also deduplicate the requested state across
|
||||
-- different rooms, which if we stored lazy members there would prevent.
|
||||
--
|
||||
-- We track a *rough* `last_seen_ts` for each user in each room which indicates
|
||||
-- when we last would've sent their member state to the client. `last_seen_ts`
|
||||
-- is used so that we can remove members which haven't been seen for a while to
|
||||
-- save space. This is a *rough* timestamp as we don't want to update the
|
||||
-- timestamp every time to avoid hammering the DB with writes, and we don't need
|
||||
-- the timestamp to be precise (as it is used to evict old entries that haven't
|
||||
-- been used in a while).
|
||||
--
|
||||
-- Care must be taken when handling "forked" positions, i.e. we have responded
|
||||
-- to a request with a position and then get another different request using the
|
||||
-- previous position as a base. We track this by including a
|
||||
-- `connection_position` for newly inserted rows. When we advance the position
|
||||
-- we set this to NULL for all rows which were present at that position, and
|
||||
-- delete all other rows. When reading rows we can then filter out any rows
|
||||
-- which have a non-NULL `connection_position` which is not the current
|
||||
-- position.
|
||||
--
|
||||
-- I.e. `connection_position` is NULL for rows which are valid for *all*
|
||||
-- positions on the connection, and is non-NULL for rows which are only valid
|
||||
-- for a specific position.
|
||||
--
|
||||
-- When invalidating rows, we can just delete them. Technically this could
|
||||
-- invalidate for a forked position, but this is acceptable as equivalent to a
|
||||
-- cache eviction.
|
||||
CREATE TABLE sliding_sync_connection_lazy_members (
|
||||
connection_key BIGINT NOT NULL REFERENCES sliding_sync_connections(connection_key) ON DELETE CASCADE,
|
||||
connection_position BIGINT REFERENCES sliding_sync_connection_positions(connection_position) ON DELETE CASCADE,
|
||||
room_id TEXT NOT NULL,
|
||||
user_id TEXT NOT NULL,
|
||||
last_seen_ts BIGINT NOT NULL
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX sliding_sync_connection_lazy_members_idx ON sliding_sync_connection_lazy_members (connection_key, room_id, user_id);
|
||||
CREATE INDEX sliding_sync_connection_lazy_members_pos_idx ON sliding_sync_connection_lazy_members (connection_key, connection_position) WHERE connection_position IS NOT NULL;
|
||||
@@ -49,12 +49,21 @@ from synapse.types import (
|
||||
UserID,
|
||||
)
|
||||
from synapse.types.rest.client import SlidingSyncBody
|
||||
from synapse.util.clock import Clock
|
||||
from synapse.util.duration import Duration
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.handlers.relations import BundledAggregations
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# How often to update the last seen timestamp for lazy members.
|
||||
#
|
||||
# We don't update the timestamp every time to avoid hammering the DB with
|
||||
# writes, and we don't need the timestamp to be precise (as it is used to evict
|
||||
# old entries that haven't been used in a while).
|
||||
LAZY_MEMBERS_UPDATE_INTERVAL = Duration(hours=1)
|
||||
|
||||
|
||||
class SlidingSyncConfig(SlidingSyncBody):
|
||||
"""
|
||||
@@ -891,6 +900,69 @@ class PerConnectionState:
|
||||
return len(self.rooms) + len(self.receipts) + len(self.room_configs)
|
||||
|
||||
|
||||
@attr.s(auto_attribs=True)
|
||||
class RoomLazyMembershipChanges:
|
||||
"""Changes to lazily-loaded room memberships for a given room."""
|
||||
|
||||
returned_user_id_to_last_seen_ts_map: Mapping[str, int | None] = attr.Factory(dict)
|
||||
"""Map from user ID to timestamp for users whose membership we have lazily
|
||||
loaded in this room an request. The timestamp indicates the time we
|
||||
previously needed the membership, or None if we sent it down for the first
|
||||
time in this request.
|
||||
|
||||
We track a *rough* `last_seen_ts` for each user in each room which indicates
|
||||
when we last would've sent their member state to the client. This is used so
|
||||
that we can remove members which haven't been seen for a while to save
|
||||
space.
|
||||
|
||||
Note: this will include users whose membership we would have sent down but
|
||||
didn't due to us having previously sent them.
|
||||
"""
|
||||
|
||||
invalidated_user_ids: AbstractSet[str] = attr.Factory(set)
|
||||
"""Set of user IDs whose latest membership we have *not* sent down"""
|
||||
|
||||
def get_returned_user_ids_to_update(self, clock: Clock) -> StrCollection:
|
||||
"""Get the user IDs whose last seen timestamp we need to update in the
|
||||
database.
|
||||
|
||||
This is a subset of user IDs in `returned_user_id_to_last_seen_ts_map`,
|
||||
whose timestamp is either None (first time we've sent them) or older
|
||||
than `LAZY_MEMBERS_UPDATE_INTERVAL`.
|
||||
|
||||
We only update the timestamp in the database every so often to avoid
|
||||
hammering the DB with writes. We don't need the timestamp to be precise,
|
||||
as the timestamp is used to evict old entries that haven't been used in
|
||||
a while.
|
||||
"""
|
||||
|
||||
now_ms = clock.time_msec()
|
||||
return [
|
||||
user_id
|
||||
for user_id, last_seen_ts in self.returned_user_id_to_last_seen_ts_map.items()
|
||||
if last_seen_ts is None
|
||||
or now_ms - last_seen_ts >= LAZY_MEMBERS_UPDATE_INTERVAL.as_millis()
|
||||
]
|
||||
|
||||
def has_updates(self, clock: Clock) -> bool:
|
||||
"""Check if there are any updates to the lazy membership changes.
|
||||
|
||||
Called to check if we need to persist changes to the lazy membership
|
||||
state for the room. We want to avoid persisting the state if there are
|
||||
no changes, to avoid unnecessary writes (and cache misses due to new
|
||||
connection position).
|
||||
"""
|
||||
|
||||
# We consider there to be updates if there are any invalidated user
|
||||
# IDs...
|
||||
if self.invalidated_user_ids:
|
||||
return True
|
||||
|
||||
# ...or if any of the returned user IDs need their last seen timestamp
|
||||
# updating in the database.
|
||||
return bool(self.get_returned_user_ids_to_update(clock))
|
||||
|
||||
|
||||
@attr.s(auto_attribs=True)
|
||||
class MutablePerConnectionState(PerConnectionState):
|
||||
"""A mutable version of `PerConnectionState`"""
|
||||
@@ -903,12 +975,28 @@ class MutablePerConnectionState(PerConnectionState):
|
||||
|
||||
room_configs: typing.ChainMap[str, RoomSyncConfig]
|
||||
|
||||
def has_updates(self) -> bool:
|
||||
# A map from room ID to the lazily-loaded memberships needed for the
|
||||
# request in that room.
|
||||
room_lazy_membership: dict[str, RoomLazyMembershipChanges] = attr.Factory(dict)
|
||||
|
||||
def has_updates(self, clock: Clock) -> bool:
|
||||
"""Check if there are any updates to the per-connection state that need
|
||||
persisting.
|
||||
|
||||
It is important that we don't spuriously do persistence, as that will
|
||||
always generate a new connection position which will invalidate some of
|
||||
the caches. It doesn't need to be perfect, but we should avoid always
|
||||
generating new connection positions when doing lazy loading
|
||||
"""
|
||||
return (
|
||||
bool(self.rooms.get_updates())
|
||||
or bool(self.receipts.get_updates())
|
||||
or bool(self.account_data.get_updates())
|
||||
or bool(self.get_room_config_updates())
|
||||
or any(
|
||||
change.has_updates(clock)
|
||||
for change in self.room_lazy_membership.values()
|
||||
)
|
||||
)
|
||||
|
||||
def get_room_config_updates(self) -> Mapping[str, RoomSyncConfig]:
|
||||
|
||||
Reference in New Issue
Block a user