Convert Sliding Sync tests to use higher-level compute_interested_rooms (#18399)

Spawning from
https://github.com/element-hq/synapse/pull/18375#discussion_r2071768635.

This updates some sliding sync tests to use a higher-level function so that the
test coverage exercises both the fallback and new-table code paths. This becomes
important once https://github.com/element-hq/synapse/pull/18375 is merged.

In other words, adjust tests to target `compute_interested_rooms(...)`
(relevant to both the new and fallback paths) instead of the lower-level
`get_room_membership_for_user_at_to_token(...)`, which only applies to the
fallback path.
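
Roughly, the shift in the tests looks like the sketch below (a hedged, non-runnable illustration: the attribute paths and the `sync_config`/`previous_connection_state` arguments are assumptions abbreviated from the real test harness):

```
# Before: drives the lower-level helper, which only exercises the fallback path.
room_membership_for_user_map = self.get_success(
    self.sliding_sync_handler.room_lists.get_room_membership_for_user_at_to_token(
        user, to_token=after_room_token, from_token=None
    )
)

# After: drives compute_interested_rooms(...), which covers both the new
# sliding sync tables and the fallback path depending on configuration.
interested_rooms = self.get_success(
    self.sliding_sync_handler.room_lists.compute_interested_rooms(
        sync_config=sync_config,
        previous_connection_state=previous_connection_state,
        to_token=after_room_token,
        from_token=None,
    )
)
```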

### Dev notes

Run the new handler-level test case:

```
SYNAPSE_TEST_LOG_LEVEL=INFO poetry run trial tests.handlers.test_sliding_sync.ComputeInterestedRoomsTestCase_new
```

Run the client-facing sliding sync tests:

```
SYNAPSE_TEST_LOG_LEVEL=INFO poetry run trial tests.rest.client.sliding_sync
```

Run a single test against Postgres:

```
SYNAPSE_POSTGRES=1 SYNAPSE_POSTGRES_USER=postgres SYNAPSE_TEST_LOG_LEVEL=INFO poetry run trial tests.handlers.test_sliding_sync.ComputeInterestedRoomsTestCase_new.test_display_name_changes_leave_after_token_range
```

### Pull Request Checklist

* [x] Pull request is based on the develop branch
* [x] Pull request includes a [changelog file](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#changelog). The entry should:
  - Be a short description of your change which makes sense to users. "Fixed a bug that prevented receiving messages from other servers." instead of "Moved X method from `EventStore` to `EventWorkerStore`.".
  - Use markdown where necessary, mostly for `code blocks`.
  - End with either a period (.) or an exclamation mark (!).
  - Start with a capital letter.
  - Feel free to credit yourself, by adding a sentence "Contributed by @github_username." or "Contributed by [Your Name]." to the end of the entry.
* [x] [Code style](https://element-hq.github.io/synapse/latest/code_style.html) is correct (run the [linters](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#run-the-linters))

---------

Co-authored-by: Eric Eastwood <erice@element.io>
Committed by Devon Hudson via GitHub on 2025-05-07 15:07:58 +00:00
commit ae877aa101 (parent 740fc885cd)
7 changed files with 1240 additions and 439 deletions

changelog.d/18399.misc (new file)

@@ -0,0 +1 @@
Refactor [MSC4186](https://github.com/matrix-org/matrix-spec-proposals/pull/4186) Simplified Sliding Sync room list tests to cover both new and fallback logic paths.

synapse/handlers/sliding_sync/room_lists.py

@@ -244,14 +244,47 @@ class SlidingSyncRoomLists:
# Note: this won't include rooms the user has left themselves. We add back
# `newly_left` rooms below. This is more efficient than fetching all rooms and
# then filtering out the old left rooms.
room_membership_for_user_map = await self.store.get_sliding_sync_rooms_for_user(
user_id
room_membership_for_user_map = (
await self.store.get_sliding_sync_rooms_for_user_from_membership_snapshots(
user_id
)
)
# To play nice with the rewind logic below, we need to go fetch the rooms the
# user has left themselves but only if it changed after the `to_token`.
#
# If a leave happens *after* the token range, we may have still been joined (or
# any non-self-leave which is relevant to sync) to the room before so we need to
# include it in the list of potentially relevant rooms and apply our rewind
# logic (outside of this function) to see if it's actually relevant.
#
# We do this separately from
# `get_sliding_sync_rooms_for_user_from_membership_snapshots` as those results
# are cached and the `to_token` isn't very cache friendly (people are constantly
# requesting with new tokens) so we separate it out here.
self_leave_room_membership_for_user_map = (
await self.store.get_sliding_sync_self_leave_rooms_after_to_token(
user_id, to_token
)
)
if self_leave_room_membership_for_user_map:
# FIXME: It would be nice to avoid this copy but since
# `get_sliding_sync_rooms_for_user_from_membership_snapshots` is cached, it
# can't return a mutable value like a `dict`. We make the copy to get a
# mutable dict that we can change. We try to only make a copy when necessary
# (if we actually need to change something) as in most cases, the logic
# doesn't need to run.
room_membership_for_user_map = dict(room_membership_for_user_map)
room_membership_for_user_map.update(self_leave_room_membership_for_user_map)
# Remove invites from ignored users
ignored_users = await self.store.ignored_users(user_id)
if ignored_users:
# TODO: It would be nice to avoid these copies
# FIXME: It would be nice to avoid this copy but since
# `get_sliding_sync_rooms_for_user_from_membership_snapshots` is cached, it
# can't return a mutable value like a `dict`. We make the copy to get a
# mutable dict that we can change. We try to only make a copy when necessary
# (if we actually need to change something) as in most cases, the logic
# doesn't need to run.
room_membership_for_user_map = dict(room_membership_for_user_map)
# Make a copy so we don't run into an error: `dictionary changed size during
# iteration`, when we remove items
@@ -263,11 +296,23 @@ class SlidingSyncRoomLists:
):
room_membership_for_user_map.pop(room_id, None)
(
newly_joined_room_ids,
newly_left_room_map,
) = await self._get_newly_joined_and_left_rooms(
user_id, from_token=from_token, to_token=to_token
)
changes = await self._get_rewind_changes_to_current_membership_to_token(
sync_config.user, room_membership_for_user_map, to_token=to_token
)
if changes:
# TODO: It would be nice to avoid these copies
# FIXME: It would be nice to avoid this copy but since
# `get_sliding_sync_rooms_for_user_from_membership_snapshots` is cached, it
# can't return a mutable value like a `dict`. We make the copy to get a
# mutable dict that we can change. We try to only make a copy when necessary
# (if we actually need to change something) as in most cases, the logic
# doesn't need to run.
room_membership_for_user_map = dict(room_membership_for_user_map)
for room_id, change in changes.items():
if change is None:
@@ -278,7 +323,7 @@ class SlidingSyncRoomLists:
existing_room = room_membership_for_user_map.get(room_id)
if existing_room is not None:
# Update room membership events to the point in time of the `to_token`
room_membership_for_user_map[room_id] = RoomsForUserSlidingSync(
room_for_user = RoomsForUserSlidingSync(
room_id=room_id,
sender=change.sender,
membership=change.membership,
@@ -290,18 +335,18 @@ class SlidingSyncRoomLists:
room_type=existing_room.room_type,
is_encrypted=existing_room.is_encrypted,
)
(
newly_joined_room_ids,
newly_left_room_map,
) = await self._get_newly_joined_and_left_rooms(
user_id, from_token=from_token, to_token=to_token
)
dm_room_ids = await self._get_dm_rooms_for_user(user_id)
if filter_membership_for_sync(
user_id=user_id,
room_membership_for_user=room_for_user,
newly_left=room_id in newly_left_room_map,
):
room_membership_for_user_map[room_id] = room_for_user
else:
room_membership_for_user_map.pop(room_id, None)
# Add back `newly_left` rooms (rooms left in the from -> to token range).
#
# We do this because `get_sliding_sync_rooms_for_user(...)` doesn't include
# We do this because `get_sliding_sync_rooms_for_user_from_membership_snapshots(...)` doesn't include
# rooms that the user left themselves as it's more efficient to add them back
# here than to fetch all rooms and then filter out the old left rooms. The user
# only leaves a room once in a blue moon so this barely needs to run.
@@ -310,7 +355,12 @@ class SlidingSyncRoomLists:
newly_left_room_map.keys() - room_membership_for_user_map.keys()
)
if missing_newly_left_rooms:
# TODO: It would be nice to avoid these copies
# FIXME: It would be nice to avoid this copy but since
# `get_sliding_sync_rooms_for_user_from_membership_snapshots` is cached, it
# can't return a mutable value like a `dict`. We make the copy to get a
# mutable dict that we can change. We try to only make a copy when necessary
# (if we actually need to change something) as in most cases, the logic
# doesn't need to run.
room_membership_for_user_map = dict(room_membership_for_user_map)
for room_id in missing_newly_left_rooms:
newly_left_room_for_user = newly_left_room_map[room_id]
@@ -327,14 +377,21 @@ class SlidingSyncRoomLists:
# If the membership exists, it's just a normal user left the room on
# their own
if newly_left_room_for_user_sliding_sync is not None:
room_membership_for_user_map[room_id] = (
newly_left_room_for_user_sliding_sync
)
if filter_membership_for_sync(
user_id=user_id,
room_membership_for_user=newly_left_room_for_user_sliding_sync,
newly_left=room_id in newly_left_room_map,
):
room_membership_for_user_map[room_id] = (
newly_left_room_for_user_sliding_sync
)
else:
room_membership_for_user_map.pop(room_id, None)
change = changes.get(room_id)
if change is not None:
# Update room membership events to the point in time of the `to_token`
room_membership_for_user_map[room_id] = RoomsForUserSlidingSync(
room_for_user = RoomsForUserSlidingSync(
room_id=room_id,
sender=change.sender,
membership=change.membership,
@@ -346,6 +403,14 @@ class SlidingSyncRoomLists:
room_type=newly_left_room_for_user_sliding_sync.room_type,
is_encrypted=newly_left_room_for_user_sliding_sync.is_encrypted,
)
if filter_membership_for_sync(
user_id=user_id,
room_membership_for_user=room_for_user,
newly_left=room_id in newly_left_room_map,
):
room_membership_for_user_map[room_id] = room_for_user
else:
room_membership_for_user_map.pop(room_id, None)
# If we are `newly_left` from the room but can't find any membership,
# then we have been "state reset" out of the room
@@ -367,7 +432,7 @@ class SlidingSyncRoomLists:
newly_left_room_for_user.event_pos.to_room_stream_token(),
)
room_membership_for_user_map[room_id] = RoomsForUserSlidingSync(
room_for_user = RoomsForUserSlidingSync(
room_id=room_id,
sender=newly_left_room_for_user.sender,
membership=newly_left_room_for_user.membership,
@@ -378,6 +443,16 @@ class SlidingSyncRoomLists:
room_type=room_type,
is_encrypted=is_encrypted,
)
if filter_membership_for_sync(
user_id=user_id,
room_membership_for_user=room_for_user,
newly_left=room_id in newly_left_room_map,
):
room_membership_for_user_map[room_id] = room_for_user
else:
room_membership_for_user_map.pop(room_id, None)
dm_room_ids = await self._get_dm_rooms_for_user(user_id)
if sync_config.lists:
sync_room_map = room_membership_for_user_map
@@ -493,7 +568,12 @@ class SlidingSyncRoomLists:
if sync_config.room_subscriptions:
with start_active_span("assemble_room_subscriptions"):
# TODO: It would be nice to avoid these copies
# FIXME: It would be nice to avoid this copy but since
# `get_sliding_sync_rooms_for_user_from_membership_snapshots` is cached, it
# can't return a mutable value like a `dict`. We make the copy to get a
# mutable dict that we can change. We try to only make a copy when necessary
# (if we actually need to change something) as in most cases, the logic
# doesn't need to run.
room_membership_for_user_map = dict(room_membership_for_user_map)
# Find which rooms are partially stated and may need to be filtered out
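
For context, the repeated `FIXME` comments above describe a copy-on-write pattern: the cached store method hands every caller the same immutable mapping, so a caller only takes a mutable copy at the point where it actually needs to change something. A minimal stand-alone sketch of the idea (not Synapse code; `lru_cache` stands in for Synapse's `@cached` descriptor):

```
from functools import lru_cache
from types import MappingProxyType
from typing import Mapping


@lru_cache(maxsize=None)
def get_rooms_for_user(user_id: str) -> Mapping[str, str]:
    # Cached: the same object is shared between callers, so it must stay immutable.
    return MappingProxyType({"!room_a:test": "join", "!room_b:test": "invite"})


def rooms_relevant_for_sync(user_id: str, ignored_room_ids: set) -> Mapping[str, str]:
    room_map = get_rooms_for_user(user_id)
    if ignored_room_ids:
        # Only copy when we actually need to mutate; most requests never hit this.
        room_map = {
            room_id: membership
            for room_id, membership in room_map.items()
            if room_id not in ignored_room_ids
        }
    return room_map
```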

synapse/storage/_base.py

@@ -130,7 +130,7 @@ class SQLBaseStore(metaclass=ABCMeta):
"_get_rooms_for_local_user_where_membership_is_inner", (user_id,)
)
self._attempt_to_invalidate_cache(
"get_sliding_sync_rooms_for_user", (user_id,)
"get_sliding_sync_rooms_for_user_from_membership_snapshots", (user_id,)
)
# Purge other caches based on room state.
@@ -138,7 +138,9 @@ class SQLBaseStore(metaclass=ABCMeta):
self._attempt_to_invalidate_cache("get_partial_current_state_ids", (room_id,))
self._attempt_to_invalidate_cache("get_room_type", (room_id,))
self._attempt_to_invalidate_cache("get_room_encryption", (room_id,))
self._attempt_to_invalidate_cache("get_sliding_sync_rooms_for_user", None)
self._attempt_to_invalidate_cache(
"get_sliding_sync_rooms_for_user_from_membership_snapshots", None
)
def _invalidate_state_caches_all(self, room_id: str) -> None:
"""Invalidates caches that are based on the current state, but does
@@ -168,7 +170,9 @@ class SQLBaseStore(metaclass=ABCMeta):
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
self._attempt_to_invalidate_cache("get_room_type", (room_id,))
self._attempt_to_invalidate_cache("get_room_encryption", (room_id,))
self._attempt_to_invalidate_cache("get_sliding_sync_rooms_for_user", None)
self._attempt_to_invalidate_cache(
"get_sliding_sync_rooms_for_user_from_membership_snapshots", None
)
def _attempt_to_invalidate_cache(
self, cache_name: str, key: Optional[Collection[Any]]

synapse/storage/databases/main/cache.py

@@ -307,7 +307,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
"get_rooms_for_user", (data.state_key,)
)
self._attempt_to_invalidate_cache(
"get_sliding_sync_rooms_for_user", None
"get_sliding_sync_rooms_for_user_from_membership_snapshots", None
)
self._membership_stream_cache.entity_has_changed(data.state_key, token) # type: ignore[attr-defined]
elif data.type == EventTypes.RoomEncryption:
@@ -319,7 +319,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
if (data.type, data.state_key) in SLIDING_SYNC_RELEVANT_STATE_SET:
self._attempt_to_invalidate_cache(
"get_sliding_sync_rooms_for_user", None
"get_sliding_sync_rooms_for_user_from_membership_snapshots", None
)
elif row.type == EventsStreamAllStateRow.TypeId:
assert isinstance(data, EventsStreamAllStateRow)
@@ -330,7 +330,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
self._attempt_to_invalidate_cache("get_rooms_for_user", None)
self._attempt_to_invalidate_cache("get_room_type", (data.room_id,))
self._attempt_to_invalidate_cache("get_room_encryption", (data.room_id,))
self._attempt_to_invalidate_cache("get_sliding_sync_rooms_for_user", None)
self._attempt_to_invalidate_cache(
"get_sliding_sync_rooms_for_user_from_membership_snapshots", None
)
else:
raise Exception("Unknown events stream row type %s" % (row.type,))
@@ -394,7 +396,8 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
"_get_rooms_for_local_user_where_membership_is_inner", (state_key,)
)
self._attempt_to_invalidate_cache(
"get_sliding_sync_rooms_for_user", (state_key,)
"get_sliding_sync_rooms_for_user_from_membership_snapshots",
(state_key,),
)
self._attempt_to_invalidate_cache(
@@ -413,7 +416,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
self._attempt_to_invalidate_cache("get_room_encryption", (room_id,))
if (etype, state_key) in SLIDING_SYNC_RELEVANT_STATE_SET:
self._attempt_to_invalidate_cache("get_sliding_sync_rooms_for_user", None)
self._attempt_to_invalidate_cache(
"get_sliding_sync_rooms_for_user_from_membership_snapshots", None
)
if relates_to:
self._attempt_to_invalidate_cache(
@@ -470,7 +475,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
self._attempt_to_invalidate_cache(
"_get_rooms_for_local_user_where_membership_is_inner", None
)
self._attempt_to_invalidate_cache("get_sliding_sync_rooms_for_user", None)
self._attempt_to_invalidate_cache(
"get_sliding_sync_rooms_for_user_from_membership_snapshots", None
)
self._attempt_to_invalidate_cache("did_forget", None)
self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None)
self._attempt_to_invalidate_cache("get_references_for_event", None)
@@ -529,7 +536,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
self._attempt_to_invalidate_cache(
"get_current_hosts_in_room_ordered", (room_id,)
)
self._attempt_to_invalidate_cache("get_sliding_sync_rooms_for_user", None)
self._attempt_to_invalidate_cache(
"get_sliding_sync_rooms_for_user_from_membership_snapshots", None
)
self._attempt_to_invalidate_cache("did_forget", None)
self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None)
self._attempt_to_invalidate_cache("_get_membership_from_event_id", None)

synapse/storage/databases/main/roommember.py

@@ -53,6 +53,7 @@ from synapse.storage.database import (
)
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.databases.main.stream import _filter_results_by_stream
from synapse.storage.engines import Sqlite3Engine
from synapse.storage.roommember import (
MemberSummary,
@@ -65,6 +66,7 @@ from synapse.types import (
PersistedEventPosition,
StateMap,
StrCollection,
StreamToken,
get_domain_from_id,
)
from synapse.util.caches.descriptors import _CacheContext, cached, cachedList
@@ -1389,7 +1391,9 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
txn, self.get_forgotten_rooms_for_user, (user_id,)
)
self._invalidate_cache_and_stream(
txn, self.get_sliding_sync_rooms_for_user, (user_id,)
txn,
self.get_sliding_sync_rooms_for_user_from_membership_snapshots,
(user_id,),
)
await self.db_pool.runInteraction("forget_membership", f)
@@ -1421,25 +1425,30 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
)
@cached(iterable=True, max_entries=10000)
async def get_sliding_sync_rooms_for_user(
self,
user_id: str,
async def get_sliding_sync_rooms_for_user_from_membership_snapshots(
self, user_id: str
) -> Mapping[str, RoomsForUserSlidingSync]:
"""Get all the rooms for a user to handle a sliding sync request.
"""
Get all the rooms for a user to handle a sliding sync request from the
`sliding_sync_membership_snapshots` table. These will be current memberships and
need to be rewound to the token range.
Ignores forgotten rooms and rooms that the user has left themselves.
Args:
user_id: The user ID to get the rooms for.
Returns:
Map from room ID to membership info
"""
def get_sliding_sync_rooms_for_user_txn(
def _txn(
txn: LoggingTransaction,
) -> Dict[str, RoomsForUserSlidingSync]:
# XXX: If you use any new columns that can change (like from
# `sliding_sync_joined_rooms` or `forgotten`), make sure to bust the
# `get_sliding_sync_rooms_for_user` cache in the appropriate places (and add
# tests).
# `get_sliding_sync_rooms_for_user_from_membership_snapshots` cache in the
# appropriate places (and add tests).
sql = """
SELECT m.room_id, m.sender, m.membership, m.membership_event_id,
r.room_version,
@@ -1455,6 +1464,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
AND (m.membership != 'leave' OR m.user_id != m.sender)
"""
txn.execute(sql, (user_id,))
return {
row[0]: RoomsForUserSlidingSync(
room_id=row[0],
@@ -1475,8 +1485,113 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
}
return await self.db_pool.runInteraction(
"get_sliding_sync_rooms_for_user",
get_sliding_sync_rooms_for_user_txn,
"get_sliding_sync_rooms_for_user_from_membership_snapshots",
_txn,
)
async def get_sliding_sync_self_leave_rooms_after_to_token(
self,
user_id: str,
to_token: StreamToken,
) -> Dict[str, RoomsForUserSlidingSync]:
"""
Get all the self-leave rooms for a user after the `to_token` (outside the token
range) that are potentially relevant[1] and needed to handle a sliding sync
request. The results are from the `sliding_sync_membership_snapshots` table and
will be current memberships and need to be rewound to the token range.
[1] If a leave happens after the token range, we may have still been joined (or
any non-self-leave which is relevant to sync) to the room before so we need to
include it in the list of potentially relevant rooms and apply
our rewind logic (outside of this function) to see if it's actually relevant.
This is basically a sister-function to
`get_sliding_sync_rooms_for_user_from_membership_snapshots`. We could
alternatively incorporate this logic into
`get_sliding_sync_rooms_for_user_from_membership_snapshots` but those results
are cached and the `to_token` isn't very cache friendly (people are constantly
requesting with new tokens) so we separate it out here.
Args:
user_id: The user ID to get the rooms for.
to_token: Any self-leave memberships after this position will be returned.
Returns:
Map from room ID to membership info
"""
# TODO: Potential to check
# `self._membership_stream_cache.has_entity_changed(...)` as an early-return
# shortcut.
def _txn(
txn: LoggingTransaction,
) -> Dict[str, RoomsForUserSlidingSync]:
sql = """
SELECT m.room_id, m.sender, m.membership, m.membership_event_id,
r.room_version,
m.event_instance_name, m.event_stream_ordering,
m.has_known_state,
m.room_type,
m.is_encrypted
FROM sliding_sync_membership_snapshots AS m
INNER JOIN rooms AS r USING (room_id)
WHERE user_id = ?
AND m.forgotten = 0
AND m.membership = 'leave'
AND m.user_id = m.sender
AND (m.event_stream_ordering > ?)
"""
# If a leave happens after the token range, we may have still been joined
# (or any non-self-leave which is relevant to sync) to the room before so we
# need to include it in the list of potentially relevant rooms and apply our
# rewind logic (outside of this function).
#
# To handle tokens with a non-empty instance_map we fetch more
# results than necessary and then filter down
min_to_token_position = to_token.room_key.stream
txn.execute(sql, (user_id, min_to_token_position))
# Map from room_id to membership info
room_membership_for_user_map: Dict[str, RoomsForUserSlidingSync] = {}
for row in txn:
room_for_user = RoomsForUserSlidingSync(
room_id=row[0],
sender=row[1],
membership=row[2],
event_id=row[3],
room_version_id=row[4],
event_pos=PersistedEventPosition(row[5], row[6]),
has_known_state=bool(row[7]),
room_type=row[8],
is_encrypted=bool(row[9]),
)
# We filter out unknown room versions proactively. They shouldn't go
# down sync and their metadata may be in a broken state (causing
# errors).
if row[4] not in KNOWN_ROOM_VERSIONS:
continue
# We only want to include the self-leave membership if it happened after
# the token range.
#
# Since the database pulls out more than necessary, we need to filter it
# down here.
if _filter_results_by_stream(
lower_token=None,
upper_token=to_token.room_key,
instance_name=room_for_user.event_pos.instance_name,
stream_ordering=room_for_user.event_pos.stream,
):
continue
room_membership_for_user_map[room_for_user.room_id] = room_for_user
return room_membership_for_user_map
return await self.db_pool.runInteraction(
"get_sliding_sync_self_leave_rooms_after_to_token",
_txn,
)
async def get_sliding_sync_room_for_user(

synapse/storage/databases/main/stream.py

@@ -453,6 +453,8 @@ def _filter_results_by_stream(
stream_ordering falls between the two tokens (taking a None
token to mean unbounded).
The token range is defined by > `lower_token` and <= `upper_token`.
Used to filter results from fetching events in the DB against the given
tokens. This is necessary to handle the case where the tokens include
position maps, which we handle by fetching more than necessary from the DB
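
As a rough illustration of the range semantics described above (exclusive lower bound, inclusive upper bound, `None` meaning unbounded), ignoring instance maps entirely, a simplified stand-in for the check might look like this (hypothetical helper, not the real `_filter_results_by_stream`):

```
from typing import Optional


def within_token_range(
    stream_ordering: int,
    lower_token: Optional[int],
    upper_token: Optional[int],
) -> bool:
    """Return True if stream_ordering falls in the range
    (> lower_token, <= upper_token), treating None as unbounded."""
    if lower_token is not None and stream_ordering <= lower_token:
        return False
    if upper_token is not None and stream_ordering > upper_token:
        return False
    return True


# A self-leave at stream ordering 15 is outside a sync window ending at 10 ...
assert not within_token_range(15, lower_token=5, upper_token=10)
# ... while an event at 8 is inside it.
assert within_token_range(8, lower_token=5, upper_token=10)
```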

File diff suppressed because it is too large.