From a5d25bb623de3602b4d1d00b09e1d3e9ce60b4fc Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Mon, 19 Aug 2024 14:15:36 +0100
Subject: [PATCH 1/6] Test github token before running release script (#17562)

This stops people from getting half way through a step and it failing
due to the github token having expired (this happens to me every damn
time).
---
 changelog.d/17562.misc |  1 +
 scripts-dev/release.py | 25 +++++++++++++++++++++++++
 2 files changed, 26 insertions(+)
 create mode 100644 changelog.d/17562.misc

diff --git a/changelog.d/17562.misc b/changelog.d/17562.misc
new file mode 100644
index 0000000000..a267df8b83
--- /dev/null
+++ b/changelog.d/17562.misc
@@ -0,0 +1 @@
+Test github token before running release script steps.
diff --git a/scripts-dev/release.py b/scripts-dev/release.py
index 5e519bb758..1ace804682 100755
--- a/scripts-dev/release.py
+++ b/scripts-dev/release.py
@@ -324,6 +324,11 @@ def tag(gh_token: Optional[str]) -> None:
 def _tag(gh_token: Optional[str]) -> None:
     """Tags the release and generates a draft GitHub release"""
 
+    if gh_token:
+        # Test that the GH Token is valid before continuing.
+        gh = Github(gh_token)
+        gh.get_user()
+
     # Make sure we're in a git repo.
     repo = get_repo_and_check_clean_checkout()
 
@@ -418,6 +423,11 @@ def publish(gh_token: str) -> None:
 def _publish(gh_token: str) -> None:
     """Publish release on GitHub."""
 
+    if gh_token:
+        # Test that the GH Token is valid before continuing.
+        gh = Github(gh_token)
+        gh.get_user()
+
     # Make sure we're in a git repo.
     get_repo_and_check_clean_checkout()
 
@@ -460,6 +470,11 @@ def upload(gh_token: Optional[str]) -> None:
 def _upload(gh_token: Optional[str]) -> None:
     """Upload release to pypi."""
 
+    if gh_token:
+        # Test that the GH Token is valid before continuing.
+        gh = Github(gh_token)
+        gh.get_user()
+
     current_version = get_package_version()
     tag_name = f"v{current_version}"
 
@@ -555,6 +570,11 @@ def wait_for_actions(gh_token: Optional[str]) -> None:
 
 
 def _wait_for_actions(gh_token: Optional[str]) -> None:
+    if gh_token:
+        # Test that the GH Token is valid before continuing.
+        gh = Github(gh_token)
+        gh.get_user()
+
     # Find out the version and tag name.
     current_version = get_package_version()
     tag_name = f"v{current_version}"
@@ -711,6 +731,11 @@ Ask the designated people to do the blog and tweets."""
 @cli.command()
 @click.option("--gh-token", envvar=["GH_TOKEN", "GITHUB_TOKEN"], required=True)
 def full(gh_token: str) -> None:
+    if gh_token:
+        # Test that the GH Token is valid before continuing.
+        gh = Github(gh_token)
+        gh.get_user()
+
     click.echo("1. If this is a security release, read the security wiki page.")
     click.echo("2. Check for any release blockers before proceeding.")
     click.echo("   https://github.com/element-hq/synapse/labels/X-Release-Blocker")

From 993644ded0d9d80fc0ef87781b5b784ad8212903 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Mon, 19 Aug 2024 15:06:44 +0100
Subject: [PATCH 2/6] Fix zero length media handling (#17570)

Results in:

```
AssertionError: null
  File "synapse/http/server.py", line 332, in _async_render_wrapper
    callback_return = await self._async_render(request)
  File "synapse/http/server.py", line 544, in _async_render
    callback_return = await raw_callback_return
  File "synapse/federation/transport/server/_base.py", line 369, in new_func
    response = await func(
  File "synapse/federation/transport/server/federation.py", line 826, in on_GET
    await self.media_repo.get_local_media(
  File "synapse/media/media_repository.py", line 473, in get_local_media
    await respond_with_multipart_responder(
  File "synapse/media/_base.py", line 353, in respond_with_multipart_responder
    assert content_length is not None
```
---
 changelog.d/17570.bugfix       | 1 +
 synapse/media/media_storage.py | 2 +-
 2 files changed, 2 insertions(+), 1 deletion(-)
 create mode 100644 changelog.d/17570.bugfix

diff --git a/changelog.d/17570.bugfix b/changelog.d/17570.bugfix
new file mode 100644
index 0000000000..e2964168b1
--- /dev/null
+++ b/changelog.d/17570.bugfix
@@ -0,0 +1 @@
+Fix bug where we would respond with an error when a remote server asked for media that had a length of 0, using the new multipart federation media endpoint.
diff --git a/synapse/media/media_storage.py b/synapse/media/media_storage.py
index cf4208eb71..c25d1a9ba3 100644
--- a/synapse/media/media_storage.py
+++ b/synapse/media/media_storage.py
@@ -544,7 +544,7 @@ class MultipartFileConsumer:
         Calculate the content length of the multipart response
         in bytes.
         """
-        if not self.length:
+        if self.length is None:
             return None
         # calculate length of json field and content-type, disposition headers
         json_field = json.dumps(self.json_field)

From 261e7462814871b7a122fcd5518afad82530a44c Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Mon, 19 Aug 2024 20:09:41 +0100
Subject: [PATCH 3/6] Sliding sync: Add classes for per-connection state
 (#17574)

This is some prep work ahead of correctly tracking receipts, where we
will also want to track the room status in terms of last receipt we had
sent down.

Essentially, we add two classes `PerConnectionState` and a mutable
version, and then operate on those.

---------

Co-authored-by: Eric Eastwood
---
 changelog.d/17574.misc           |   1 +
 synapse/handlers/sliding_sync.py | 295 ++++++++++++++++++++-----------
 2 files changed, 196 insertions(+), 100 deletions(-)
 create mode 100644 changelog.d/17574.misc

diff --git a/changelog.d/17574.misc b/changelog.d/17574.misc
new file mode 100644
index 0000000000..71020abec4
--- /dev/null
+++ b/changelog.d/17574.misc
@@ -0,0 +1 @@
+Refactor per-connection state in experimental sliding sync handler.
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 99510254f3..c615cc7c32 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -19,6 +19,8 @@ # import enum import logging +import typing +from collections import ChainMap from enum import Enum from itertools import chain from typing import ( @@ -30,11 +32,13 @@ from typing import ( List, Literal, Mapping, + MutableMapping, Optional, Sequence, Set, Tuple, Union, + cast, ) import attr @@ -571,21 +575,21 @@ class SlidingSyncHandler: # See https://github.com/matrix-org/matrix-doc/issues/1144 raise NotImplementedError() - if from_token: - # Check that we recognize the connection position, if not tell the - # clients that they need to start again. - # - # If we don't do this and the client asks for the full range of - # rooms, we end up sending down all rooms and their state from - # scratch (which can be very slow). By expiring the connection we - # allow the client a chance to do an initial request with a smaller - # range of rooms to get them some results sooner but will end up - # taking the same amount of time (more with round-trips and - # re-processing) in the end to get everything again. - if not await self.connection_store.is_valid_token( - sync_config, from_token.connection_position - ): - raise SlidingSyncUnknownPosition() + # Get the per-connection state (if any). + # + # Raises an exception if there is a `connection_position` that we don't + # recognize. If we don't do this and the client asks for the full range + # of rooms, we end up sending down all rooms and their state from + # scratch (which can be very slow). By expiring the connection we allow + # the client a chance to do an initial request with a smaller range of + # rooms to get them some results sooner but will end up taking the same + # amount of time (more with round-trips and re-processing) in the end to + # get everything again. + previous_connection_state = ( + await self.connection_store.get_per_connection_state( + sync_config, from_token + ) + ) await self.connection_store.mark_token_seen( sync_config=sync_config, @@ -781,11 +785,7 @@ class SlidingSyncHandler: # we haven't sent the room down, or we have but there are missing # updates). for room_id in relevant_room_map: - status = await self.connection_store.have_sent_room( - sync_config, - from_token.connection_position, - room_id, - ) + status = previous_connection_state.rooms.have_sent_room(room_id) if ( # The room was never sent down before so the client needs to know # about it regardless of any updates. @@ -821,6 +821,7 @@ class SlidingSyncHandler: async def handle_room(room_id: str) -> None: room_sync_result = await self.get_room_sync_data( sync_config=sync_config, + per_connection_state=previous_connection_state, room_id=room_id, room_sync_config=relevant_rooms_to_send_map[room_id], room_membership_for_user_at_to_token=room_membership_for_user_map[ @@ -853,6 +854,8 @@ class SlidingSyncHandler: ) if has_lists or has_room_subscriptions: + new_connection_state = previous_connection_state.get_mutable() + # We now calculate if any rooms outside the range have had updates, # which we are not sending down. 
# @@ -882,11 +885,18 @@ class SlidingSyncHandler: ) unsent_room_ids = list(missing_event_map_by_room) - connection_position = await self.connection_store.record_rooms( + new_connection_state.rooms.record_unsent_rooms( + unsent_room_ids, from_token.stream_token + ) + + new_connection_state.rooms.record_sent_rooms( + relevant_rooms_to_send_map.keys() + ) + + connection_position = await self.connection_store.record_new_state( sync_config=sync_config, from_token=from_token, - sent_room_ids=relevant_rooms_to_send_map.keys(), - unsent_room_ids=unsent_room_ids, + per_connection_state=new_connection_state, ) elif from_token: connection_position = from_token.connection_position @@ -1939,6 +1949,7 @@ class SlidingSyncHandler: async def get_room_sync_data( self, sync_config: SlidingSyncConfig, + per_connection_state: "PerConnectionState", room_id: str, room_sync_config: RoomSyncConfig, room_membership_for_user_at_to_token: _RoomMembershipForUser, @@ -1986,11 +1997,7 @@ class SlidingSyncHandler: from_bound = None initial = True if from_token and not room_membership_for_user_at_to_token.newly_joined: - room_status = await self.connection_store.have_sent_room( - sync_config=sync_config, - connection_token=from_token.connection_position, - room_id=room_id, - ) + room_status = per_connection_state.rooms.have_sent_room(room_id) if room_status.status == HaveSentRoomFlag.LIVE: from_bound = from_token.stream_token.room_key initial = False @@ -3034,6 +3041,121 @@ HAVE_SENT_ROOM_NEVER = HaveSentRoom(HaveSentRoomFlag.NEVER, None) HAVE_SENT_ROOM_LIVE = HaveSentRoom(HaveSentRoomFlag.LIVE, None) +@attr.s(auto_attribs=True, slots=True, frozen=True) +class RoomStatusMap: + """For a given stream, e.g. events, records what we have or have not sent + down for that stream in a given room.""" + + # `room_id` -> `HaveSentRoom` + _statuses: Mapping[str, HaveSentRoom] = attr.Factory(dict) + + def have_sent_room(self, room_id: str) -> HaveSentRoom: + """Return whether we have previously sent the room down""" + return self._statuses.get(room_id, HAVE_SENT_ROOM_NEVER) + + def get_mutable(self) -> "MutableRoomStatusMap": + """Get a mutable copy of this state.""" + return MutableRoomStatusMap( + statuses=self._statuses, + ) + + def copy(self) -> "RoomStatusMap": + """Make a copy of the class. Useful for converting from a mutable to + immutable version.""" + + return RoomStatusMap(statuses=dict(self._statuses)) + + +class MutableRoomStatusMap(RoomStatusMap): + """A mutable version of `RoomStatusMap`""" + + # We use a ChainMap here so that we can easily track what has been updated + # and what hasn't. Note that when we persist the per connection state this + # will get flattened to a normal dict (via calling `.copy()`) + _statuses: typing.ChainMap[str, HaveSentRoom] + + def __init__( + self, + statuses: Mapping[str, HaveSentRoom], + ) -> None: + # ChainMap requires a mutable mapping, but we're not actually going to + # mutate it. 
+ statuses = cast(MutableMapping, statuses) + + super().__init__( + statuses=ChainMap({}, statuses), + ) + + def get_updates(self) -> Mapping[str, HaveSentRoom]: + """Return only the changes that were made""" + return self._statuses.maps[0] + + def record_sent_rooms(self, room_ids: StrCollection) -> None: + """Record that we have sent these rooms in the response""" + for room_id in room_ids: + current_status = self._statuses.get(room_id, HAVE_SENT_ROOM_NEVER) + if current_status.status == HaveSentRoomFlag.LIVE: + continue + + self._statuses[room_id] = HAVE_SENT_ROOM_LIVE + + def record_unsent_rooms( + self, room_ids: StrCollection, from_token: StreamToken + ) -> None: + """Record that we have not sent these rooms in the response, but there + have been updates. + """ + # Whether we add/update the entries for unsent rooms depends on the + # existing entry: + # - LIVE: We have previously sent down everything up to + # `last_room_token, so we update the entry to be `PREVIOUSLY` with + # `last_room_token`. + # - PREVIOUSLY: We have previously sent down everything up to *a* + # given token, so we don't need to update the entry. + # - NEVER: We have never previously sent down the room, and we haven't + # sent anything down this time either so we leave it as NEVER. + + for room_id in room_ids: + current_status = self._statuses.get(room_id, HAVE_SENT_ROOM_NEVER) + if current_status.status != HaveSentRoomFlag.LIVE: + continue + + self._statuses[room_id] = HaveSentRoom.previously(from_token.room_key) + + +@attr.s(auto_attribs=True) +class PerConnectionState: + """The per-connection state. A snapshot of what we've sent down the connection before. + + Currently, we track whether we've sent down various aspects of a given room before. + + We use the `rooms` field to store the position in the events stream for each room that we've previously sent to the client before. On the next request that includes the room, we can then send only what's changed since that recorded position. + + Same goes for the `receipts` field so we only need to send the new receipts since the last time you made a sync request. + + Attributes: + rooms: The status of each room for the events stream. + """ + + rooms: RoomStatusMap = attr.Factory(RoomStatusMap) + + def get_mutable(self) -> "MutablePerConnectionState": + """Get a mutable copy of this state.""" + return MutablePerConnectionState( + rooms=self.rooms.get_mutable(), + ) + + +@attr.s(auto_attribs=True) +class MutablePerConnectionState(PerConnectionState): + """A mutable version of `PerConnectionState`""" + + rooms: MutableRoomStatusMap + + def has_updates(self) -> bool: + return bool(self.rooms.get_updates()) + + @attr.s(auto_attribs=True) class SlidingSyncConnectionStore: """In-memory store of per-connection state, including what rooms we have @@ -3063,9 +3185,9 @@ class SlidingSyncConnectionStore: to mapping of room ID to `HaveSentRoom`. 
""" - # `(user_id, conn_id)` -> `token` -> `room_id` -> `HaveSentRoom` - _connections: Dict[Tuple[str, str], Dict[int, Dict[str, HaveSentRoom]]] = ( - attr.Factory(dict) + # `(user_id, conn_id)` -> `connection_position` -> `PerConnectionState` + _connections: Dict[Tuple[str, str], Dict[int, PerConnectionState]] = attr.Factory( + dict ) async def is_valid_token( @@ -3078,48 +3200,52 @@ class SlidingSyncConnectionStore: conn_key = self._get_connection_key(sync_config) return connection_token in self._connections.get(conn_key, {}) - async def have_sent_room( - self, sync_config: SlidingSyncConfig, connection_token: int, room_id: str - ) -> HaveSentRoom: - """For the given user_id/conn_id/token, return whether we have - previously sent the room down - """ - - conn_key = self._get_connection_key(sync_config) - sync_statuses = self._connections.setdefault(conn_key, {}) - room_status = sync_statuses.get(connection_token, {}).get( - room_id, HAVE_SENT_ROOM_NEVER - ) - - return room_status - - @trace - async def record_rooms( + async def get_per_connection_state( self, sync_config: SlidingSyncConfig, from_token: Optional[SlidingSyncStreamToken], - *, - sent_room_ids: StrCollection, - unsent_room_ids: StrCollection, - ) -> int: - """Record which rooms we have/haven't sent down in a new response + ) -> PerConnectionState: + """Fetch the per-connection state for the token. - Attributes: - sync_config - from_token: The since token from the request, if any - sent_room_ids: The set of room IDs that we have sent down as - part of this request (only needs to be ones we didn't - previously sent down). - unsent_room_ids: The set of room IDs that have had updates - since the `from_token`, but which were not included in - this request + Raises: + SlidingSyncUnknownPosition if the connection_token is unknown + """ + if from_token is None: + return PerConnectionState() + + connection_position = from_token.connection_position + if connection_position == 0: + # Initial sync (request without a `from_token`) starts at `0` so + # there is no existing per-connection state + return PerConnectionState() + + conn_key = self._get_connection_key(sync_config) + sync_statuses = self._connections.get(conn_key, {}) + connection_state = sync_statuses.get(connection_position) + + if connection_state is None: + raise SlidingSyncUnknownPosition() + + return connection_state + + @trace + async def record_new_state( + self, + sync_config: SlidingSyncConfig, + from_token: Optional[SlidingSyncStreamToken], + per_connection_state: MutablePerConnectionState, + ) -> int: + """Record updated per-connection state, returning the connection + position associated with the new state. + + If there are no changes to the state this may return the same token as + the existing per-connection state. """ prev_connection_token = 0 if from_token is not None: prev_connection_token = from_token.connection_position - # If there are no changes then this is a noop. - if not sent_room_ids and not unsent_room_ids: + if not per_connection_state.has_updates(): return prev_connection_token conn_key = self._get_connection_key(sync_config) @@ -3130,42 +3256,11 @@ class SlidingSyncConnectionStore: new_store_token = prev_connection_token + 1 sync_statuses.pop(new_store_token, None) - # Copy over and update the room mappings. - new_room_statuses = dict(sync_statuses.get(prev_connection_token, {})) - - # Whether we have updated the `new_room_statuses`, if we don't by the - # end we can treat this as a noop. 
- have_updated = False - for room_id in sent_room_ids: - new_room_statuses[room_id] = HAVE_SENT_ROOM_LIVE - have_updated = True - - # Whether we add/update the entries for unsent rooms depends on the - # existing entry: - # - LIVE: We have previously sent down everything up to - # `last_room_token, so we update the entry to be `PREVIOUSLY` with - # `last_room_token`. - # - PREVIOUSLY: We have previously sent down everything up to *a* - # given token, so we don't need to update the entry. - # - NEVER: We have never previously sent down the room, and we haven't - # sent anything down this time either so we leave it as NEVER. - - # Work out the new state for unsent rooms that were `LIVE`. - if from_token: - new_unsent_state = HaveSentRoom.previously(from_token.stream_token.room_key) - else: - new_unsent_state = HAVE_SENT_ROOM_NEVER - - for room_id in unsent_room_ids: - prev_state = new_room_statuses.get(room_id) - if prev_state is not None and prev_state.status == HaveSentRoomFlag.LIVE: - new_room_statuses[room_id] = new_unsent_state - have_updated = True - - if not have_updated: - return prev_connection_token - - sync_statuses[new_store_token] = new_room_statuses + # We copy the `MutablePerConnectionState` so that the inner `ChainMap`s + # don't grow forever. + sync_statuses[new_store_token] = PerConnectionState( + rooms=per_connection_state.rooms.copy(), + ) return new_store_token From 8b8d74d12f29a9b58c367715d7b8234e39a5eb1a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 19 Aug 2024 21:16:07 +0100 Subject: [PATCH 4/6] Sliding sync: Correctly track which read receipts we have or have not sent down. (#17575) Add connection tracking to the receipts extension. Based on #17574 --------- Co-authored-by: Eric Eastwood --- changelog.d/17575.misc | 1 + synapse/handlers/sliding_sync.py | 283 +++++++++++++----- synapse/storage/databases/main/receipts.py | 42 +++ .../sliding_sync/test_extension_receipts.py | 105 +++++++ .../client/sliding_sync/test_extensions.py | 13 +- tests/rest/client/utils.py | 14 +- 6 files changed, 378 insertions(+), 80 deletions(-) create mode 100644 changelog.d/17575.misc diff --git a/changelog.d/17575.misc b/changelog.d/17575.misc new file mode 100644 index 0000000000..1b4a53ee17 --- /dev/null +++ b/changelog.d/17575.misc @@ -0,0 +1 @@ +Correctly track read receipts that should be sent down in experimental sliding sync. 
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index c615cc7c32..64b5acbe98 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -29,6 +29,7 @@ from typing import ( Callable, Dict, Final, + Generic, List, Literal, Mapping, @@ -37,6 +38,7 @@ from typing import ( Sequence, Set, Tuple, + TypeVar, Union, cast, ) @@ -55,6 +57,7 @@ from synapse.api.constants import ( from synapse.api.errors import SlidingSyncUnknownPosition from synapse.events import EventBase, StrippedStateEvent from synapse.events.utils import parse_stripped_state_event, strip_event +from synapse.handlers.receipts import ReceiptEventSource from synapse.handlers.relations import BundledAggregations from synapse.logging.opentracing import ( SynapseTags, @@ -821,7 +824,7 @@ class SlidingSyncHandler: async def handle_room(room_id: str) -> None: room_sync_result = await self.get_room_sync_data( sync_config=sync_config, - per_connection_state=previous_connection_state, + previous_connection_state=previous_connection_state, room_id=room_id, room_sync_config=relevant_rooms_to_send_map[room_id], room_membership_for_user_at_to_token=room_membership_for_user_map[ @@ -839,9 +842,13 @@ class SlidingSyncHandler: with start_active_span("sliding_sync.generate_room_entries"): await concurrently_execute(handle_room, relevant_rooms_to_send_map, 10) + new_connection_state = previous_connection_state.get_mutable() + extensions = await self.get_extensions_response( sync_config=sync_config, actual_lists=lists, + previous_connection_state=previous_connection_state, + new_connection_state=new_connection_state, # We're purposely using `relevant_room_map` instead of # `relevant_rooms_to_send_map` here. This needs to be all room_ids we could # send regardless of whether they have an event update or not. The @@ -854,8 +861,6 @@ class SlidingSyncHandler: ) if has_lists or has_room_subscriptions: - new_connection_state = previous_connection_state.get_mutable() - # We now calculate if any rooms outside the range have had updates, # which we are not sending down. 
# @@ -886,7 +891,7 @@ class SlidingSyncHandler: unsent_room_ids = list(missing_event_map_by_room) new_connection_state.rooms.record_unsent_rooms( - unsent_room_ids, from_token.stream_token + unsent_room_ids, from_token.stream_token.room_key ) new_connection_state.rooms.record_sent_rooms( @@ -896,7 +901,7 @@ class SlidingSyncHandler: connection_position = await self.connection_store.record_new_state( sync_config=sync_config, from_token=from_token, - per_connection_state=new_connection_state, + new_connection_state=new_connection_state, ) elif from_token: connection_position = from_token.connection_position @@ -1949,7 +1954,7 @@ class SlidingSyncHandler: async def get_room_sync_data( self, sync_config: SlidingSyncConfig, - per_connection_state: "PerConnectionState", + previous_connection_state: "PerConnectionState", room_id: str, room_sync_config: RoomSyncConfig, room_membership_for_user_at_to_token: _RoomMembershipForUser, @@ -1997,7 +2002,7 @@ class SlidingSyncHandler: from_bound = None initial = True if from_token and not room_membership_for_user_at_to_token.newly_joined: - room_status = per_connection_state.rooms.have_sent_room(room_id) + room_status = previous_connection_state.rooms.have_sent_room(room_id) if room_status.status == HaveSentRoomFlag.LIVE: from_bound = from_token.stream_token.room_key initial = False @@ -2476,6 +2481,8 @@ class SlidingSyncHandler: async def get_extensions_response( self, sync_config: SlidingSyncConfig, + previous_connection_state: "PerConnectionState", + new_connection_state: "MutablePerConnectionState", actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList], actual_room_ids: Set[str], actual_room_response_map: Dict[str, SlidingSyncResult.RoomResult], @@ -2486,6 +2493,9 @@ class SlidingSyncHandler: Args: sync_config: Sync configuration + new_connection_state: Snapshot of the current per-connection state + new_per_connection_state: A mutable copy of the per-connection + state, used to record updates to the state during this request. actual_lists: Sliding window API. A map of list key to list results in the Sliding Sync response. actual_room_ids: The actual room IDs in the the Sliding Sync response. @@ -2530,6 +2540,8 @@ class SlidingSyncHandler: if sync_config.extensions.receipts is not None: receipts_response = await self.get_receipts_extension_response( sync_config=sync_config, + previous_connection_state=previous_connection_state, + new_connection_state=new_connection_state, actual_lists=actual_lists, actual_room_ids=actual_room_ids, actual_room_response_map=actual_room_response_map, @@ -2849,6 +2861,8 @@ class SlidingSyncHandler: async def get_receipts_extension_response( self, sync_config: SlidingSyncConfig, + previous_connection_state: "PerConnectionState", + new_connection_state: "MutablePerConnectionState", actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList], actual_room_ids: Set[str], actual_room_response_map: Dict[str, SlidingSyncResult.RoomResult], @@ -2860,6 +2874,9 @@ class SlidingSyncHandler: Args: sync_config: Sync configuration + previous_connection_state: The current per-connection state + new_connection_state: A mutable copy of the per-connection + state, used to record updates to the state. actual_lists: Sliding window API. A map of list key to list results in the Sliding Sync response. actual_room_ids: The actual room IDs in the the Sliding Sync response. 
@@ -2882,50 +2899,145 @@ class SlidingSyncHandler: room_id_to_receipt_map: Dict[str, JsonMapping] = {} if len(relevant_room_ids) > 0: - # TODO: Take connection tracking into account so that when a room comes back - # into range we can send the receipts that were missed. - receipt_source = self.event_sources.sources.receipt - receipts, _ = await receipt_source.get_new_events( - user=sync_config.user, - from_key=( - from_token.stream_token.receipt_key - if from_token - else MultiWriterStreamToken(stream=0) - ), - to_key=to_token.receipt_key, - # This is a dummy value and isn't used in the function - limit=0, - room_ids=relevant_room_ids, - is_guest=False, + # We need to handle the different cases depending on if we have sent + # down receipts previously or not, so we split the relevant rooms + # up into different collections based on status. + live_rooms = set() + previously_rooms: Dict[str, MultiWriterStreamToken] = {} + initial_rooms = set() + + for room_id in relevant_room_ids: + if not from_token: + initial_rooms.add(room_id) + continue + + # If we're sending down the room from scratch again for some reason, we + # should always resend the receipts as well (regardless of if + # we've sent them down before). This is to mimic the behaviour + # of what happens on initial sync, where you get a chunk of + # timeline with all of the corresponding receipts for the events in the timeline. + room_result = actual_room_response_map.get(room_id) + if room_result is not None and room_result.initial: + initial_rooms.add(room_id) + continue + + room_status = previous_connection_state.receipts.have_sent_room(room_id) + if room_status.status == HaveSentRoomFlag.LIVE: + live_rooms.add(room_id) + elif room_status.status == HaveSentRoomFlag.PREVIOUSLY: + assert room_status.last_token is not None + previously_rooms[room_id] = room_status.last_token + elif room_status.status == HaveSentRoomFlag.NEVER: + initial_rooms.add(room_id) + else: + assert_never(room_status.status) + + # The set of receipts that we fetched. Private receipts need to be + # filtered out before returning. + fetched_receipts = [] + + # For live rooms we just fetch all receipts in those rooms since the + # `since` token. + if live_rooms: + assert from_token is not None + receipts = await self.store.get_linearized_receipts_for_rooms( + room_ids=live_rooms, + from_key=from_token.stream_token.receipt_key, + to_key=to_token.receipt_key, + ) + fetched_receipts.extend(receipts) + + # For rooms we've previously sent down, but aren't up to date, we + # need to use the from token from the room status. + if previously_rooms: + for room_id, receipt_token in previously_rooms.items(): + # TODO: Limit the number of receipts we're about to send down + # for the room, if its too many we should TODO + previously_receipts = ( + await self.store.get_linearized_receipts_for_room( + room_id=room_id, + from_key=receipt_token, + to_key=to_token.receipt_key, + ) + ) + fetched_receipts.extend(previously_receipts) + + # For rooms we haven't previously sent down, we could send all receipts + # from that room but we only want to include receipts for events + # in the timeline to avoid bloating and blowing up the sync response + # as the number of users in the room increases. 
(this behavior is part of the spec) + for room_id in initial_rooms: + room_result = actual_room_response_map.get(room_id) + if room_result is None: + continue + + relevant_event_ids = [ + event.event_id for event in room_result.timeline_events + ] + + # TODO: In the future, it would be good to fetch less receipts + # out of the database in the first place but we would need to + # add a new `event_id` index to `receipts_linearized`. + initial_receipts = await self.store.get_linearized_receipts_for_room( + room_id=room_id, + to_key=to_token.receipt_key, + ) + + for receipt in initial_receipts: + content = { + event_id: content_value + for event_id, content_value in receipt["content"].items() + if event_id in relevant_event_ids + } + if content: + fetched_receipts.append( + { + "type": receipt["type"], + "room_id": receipt["room_id"], + "content": content, + } + ) + + fetched_receipts = ReceiptEventSource.filter_out_private_receipts( + fetched_receipts, sync_config.user.to_string() ) - for receipt in receipts: + for receipt in fetched_receipts: # These fields should exist for every receipt room_id = receipt["room_id"] type = receipt["type"] content = receipt["content"] - # For `inital: True` rooms, we only want to include receipts for events - # in the timeline. - room_result = actual_room_response_map.get(room_id) - if room_result is not None: - if room_result.initial: - # TODO: In the future, it would be good to fetch less receipts - # out of the database in the first place but we would need to - # add a new `event_id` index to `receipts_linearized`. - relevant_event_ids = [ - event.event_id for event in room_result.timeline_events - ] - - assert isinstance(content, dict) - content = { - event_id: content_value - for event_id, content_value in content.items() - if event_id in relevant_event_ids - } - room_id_to_receipt_map[room_id] = {"type": type, "content": content} + # Now we update the per-connection state to track which receipts we have + # and haven't sent down. + new_connection_state.receipts.record_sent_rooms(relevant_room_ids) + + if from_token: + # Now find the set of rooms that may have receipts that we're not sending + # down. We only need to check rooms that we have previously returned + # receipts for (in `previous_connection_state`) because we only care about + # updating `LIVE` rooms to `PREVIOUSLY`. The `PREVIOUSLY` rooms will just + # stay pointing at their previous position so we don't need to waste time + # checking those and since we default to `NEVER`, rooms that were `NEVER` + # sent before don't need to be recorded as we'll handle them correctly when + # they come into range for the first time. + rooms_no_receipts = [ + room_id + for room_id, room_status in previous_connection_state.receipts._statuses.items() + if room_status.status == HaveSentRoomFlag.LIVE + and room_id not in relevant_room_ids + ] + changed_rooms = await self.store.get_rooms_with_receipts_between( + rooms_no_receipts, + from_key=from_token.stream_token.receipt_key, + to_key=to_token.receipt_key, + ) + new_connection_state.receipts.record_unsent_rooms( + changed_rooms, from_token.stream_token.receipt_key + ) + return SlidingSyncResult.Extensions.ReceiptsExtension( room_id_to_receipt_map=room_id_to_receipt_map, ) @@ -3016,9 +3128,15 @@ class HaveSentRoomFlag(Enum): LIVE = 3 +T = TypeVar("T") + + @attr.s(auto_attribs=True, slots=True, frozen=True) -class HaveSentRoom: - """Whether we have sent the room down a sliding sync connection. 
+class HaveSentRoom(Generic[T]): + """Whether we have sent the room data down a sliding sync connection. + + We are generic over the type of token used, e.g. `RoomStreamToken` or + `MultiWriterStreamToken`. Attributes: status: Flag of if we have or haven't sent down the room @@ -3029,54 +3147,58 @@ class HaveSentRoom: """ status: HaveSentRoomFlag - last_token: Optional[RoomStreamToken] + last_token: Optional[T] @staticmethod - def previously(last_token: RoomStreamToken) -> "HaveSentRoom": + def live() -> "HaveSentRoom[T]": + return HaveSentRoom(HaveSentRoomFlag.LIVE, None) + + @staticmethod + def previously(last_token: T) -> "HaveSentRoom[T]": """Constructor for `PREVIOUSLY` flag.""" return HaveSentRoom(HaveSentRoomFlag.PREVIOUSLY, last_token) - -HAVE_SENT_ROOM_NEVER = HaveSentRoom(HaveSentRoomFlag.NEVER, None) -HAVE_SENT_ROOM_LIVE = HaveSentRoom(HaveSentRoomFlag.LIVE, None) + @staticmethod + def never() -> "HaveSentRoom[T]": + return HaveSentRoom(HaveSentRoomFlag.NEVER, None) @attr.s(auto_attribs=True, slots=True, frozen=True) -class RoomStatusMap: +class RoomStatusMap(Generic[T]): """For a given stream, e.g. events, records what we have or have not sent down for that stream in a given room.""" # `room_id` -> `HaveSentRoom` - _statuses: Mapping[str, HaveSentRoom] = attr.Factory(dict) + _statuses: Mapping[str, HaveSentRoom[T]] = attr.Factory(dict) - def have_sent_room(self, room_id: str) -> HaveSentRoom: + def have_sent_room(self, room_id: str) -> HaveSentRoom[T]: """Return whether we have previously sent the room down""" - return self._statuses.get(room_id, HAVE_SENT_ROOM_NEVER) + return self._statuses.get(room_id, HaveSentRoom.never()) - def get_mutable(self) -> "MutableRoomStatusMap": + def get_mutable(self) -> "MutableRoomStatusMap[T]": """Get a mutable copy of this state.""" return MutableRoomStatusMap( statuses=self._statuses, ) - def copy(self) -> "RoomStatusMap": + def copy(self) -> "RoomStatusMap[T]": """Make a copy of the class. Useful for converting from a mutable to immutable version.""" return RoomStatusMap(statuses=dict(self._statuses)) -class MutableRoomStatusMap(RoomStatusMap): +class MutableRoomStatusMap(RoomStatusMap[T]): """A mutable version of `RoomStatusMap`""" # We use a ChainMap here so that we can easily track what has been updated # and what hasn't. Note that when we persist the per connection state this # will get flattened to a normal dict (via calling `.copy()`) - _statuses: typing.ChainMap[str, HaveSentRoom] + _statuses: typing.ChainMap[str, HaveSentRoom[T]] def __init__( self, - statuses: Mapping[str, HaveSentRoom], + statuses: Mapping[str, HaveSentRoom[T]], ) -> None: # ChainMap requires a mutable mapping, but we're not actually going to # mutate it. 
@@ -3086,22 +3208,20 @@ class MutableRoomStatusMap(RoomStatusMap): statuses=ChainMap({}, statuses), ) - def get_updates(self) -> Mapping[str, HaveSentRoom]: + def get_updates(self) -> Mapping[str, HaveSentRoom[T]]: """Return only the changes that were made""" return self._statuses.maps[0] def record_sent_rooms(self, room_ids: StrCollection) -> None: """Record that we have sent these rooms in the response""" for room_id in room_ids: - current_status = self._statuses.get(room_id, HAVE_SENT_ROOM_NEVER) + current_status = self._statuses.get(room_id, HaveSentRoom.never()) if current_status.status == HaveSentRoomFlag.LIVE: continue - self._statuses[room_id] = HAVE_SENT_ROOM_LIVE + self._statuses[room_id] = HaveSentRoom.live() - def record_unsent_rooms( - self, room_ids: StrCollection, from_token: StreamToken - ) -> None: + def record_unsent_rooms(self, room_ids: StrCollection, from_token: T) -> None: """Record that we have not sent these rooms in the response, but there have been updates. """ @@ -3116,33 +3236,42 @@ class MutableRoomStatusMap(RoomStatusMap): # sent anything down this time either so we leave it as NEVER. for room_id in room_ids: - current_status = self._statuses.get(room_id, HAVE_SENT_ROOM_NEVER) + current_status = self._statuses.get(room_id, HaveSentRoom.never()) if current_status.status != HaveSentRoomFlag.LIVE: continue - self._statuses[room_id] = HaveSentRoom.previously(from_token.room_key) + self._statuses[room_id] = HaveSentRoom.previously(from_token) @attr.s(auto_attribs=True) class PerConnectionState: - """The per-connection state. A snapshot of what we've sent down the connection before. + """The per-connection state. A snapshot of what we've sent down the + connection before. - Currently, we track whether we've sent down various aspects of a given room before. + Currently, we track whether we've sent down various aspects of a given room + before. - We use the `rooms` field to store the position in the events stream for each room that we've previously sent to the client before. On the next request that includes the room, we can then send only what's changed since that recorded position. + We use the `rooms` field to store the position in the events stream for each + room that we've previously sent to the client before. On the next request + that includes the room, we can then send only what's changed since that + recorded position. - Same goes for the `receipts` field so we only need to send the new receipts since the last time you made a sync request. + Same goes for the `receipts` field so we only need to send the new receipts + since the last time you made a sync request. Attributes: rooms: The status of each room for the events stream. + receipts: The status of each room for the receipts stream. 
""" - rooms: RoomStatusMap = attr.Factory(RoomStatusMap) + rooms: RoomStatusMap[RoomStreamToken] = attr.Factory(RoomStatusMap) + receipts: RoomStatusMap[MultiWriterStreamToken] = attr.Factory(RoomStatusMap) def get_mutable(self) -> "MutablePerConnectionState": """Get a mutable copy of this state.""" return MutablePerConnectionState( rooms=self.rooms.get_mutable(), + receipts=self.receipts.get_mutable(), ) @@ -3150,10 +3279,11 @@ class PerConnectionState: class MutablePerConnectionState(PerConnectionState): """A mutable version of `PerConnectionState`""" - rooms: MutableRoomStatusMap + rooms: MutableRoomStatusMap[RoomStreamToken] + receipts: MutableRoomStatusMap[MultiWriterStreamToken] def has_updates(self) -> bool: - return bool(self.rooms.get_updates()) + return bool(self.rooms.get_updates()) or bool(self.receipts.get_updates()) @attr.s(auto_attribs=True) @@ -3233,7 +3363,7 @@ class SlidingSyncConnectionStore: self, sync_config: SlidingSyncConfig, from_token: Optional[SlidingSyncStreamToken], - per_connection_state: MutablePerConnectionState, + new_connection_state: MutablePerConnectionState, ) -> int: """Record updated per-connection state, returning the connection position associated with the new state. @@ -3245,7 +3375,7 @@ class SlidingSyncConnectionStore: if from_token is not None: prev_connection_token = from_token.connection_position - if not per_connection_state.has_updates(): + if not new_connection_state.has_updates(): return prev_connection_token conn_key = self._get_connection_key(sync_config) @@ -3259,7 +3389,8 @@ class SlidingSyncConnectionStore: # We copy the `MutablePerConnectionState` so that the inner `ChainMap`s # don't grow forever. sync_statuses[new_store_token] = PerConnectionState( - rooms=per_connection_state.rooms.copy(), + rooms=new_connection_state.rooms.copy(), + receipts=new_connection_state.receipts.copy(), ) return new_store_token diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 3bde0ae0d4..e266cc2a20 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -51,10 +51,12 @@ from synapse.types import ( JsonMapping, MultiWriterStreamToken, PersistedPosition, + StrCollection, ) from synapse.util import json_encoder from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.stream_change_cache import StreamChangeCache +from synapse.util.iterutils import batch_iter if TYPE_CHECKING: from synapse.server import HomeServer @@ -550,6 +552,46 @@ class ReceiptsWorkerStore(SQLBaseStore): return results + async def get_rooms_with_receipts_between( + self, + room_ids: StrCollection, + from_key: MultiWriterStreamToken, + to_key: MultiWriterStreamToken, + ) -> StrCollection: + """Given a set of room_ids, find out which ones (may) have receipts + between the two tokens (> `from_token` and <= `to_token`).""" + + room_ids = self._receipts_stream_cache.get_entities_changed( + room_ids, from_key.stream + ) + if not room_ids: + return [] + + def f(txn: LoggingTransaction, room_ids: StrCollection) -> StrCollection: + clause, args = make_in_list_sql_clause( + self.database_engine, "room_id", room_ids + ) + + sql = f""" + SELECT DISTINCT room_id FROM receipts_linearized + WHERE {clause} AND ? < stream_id AND stream_id <= ? 
+ """ + args.append(from_key.stream) + args.append(to_key.get_max_stream_pos()) + + txn.execute(sql, args) + + return [room_id for room_id, in txn] + + results: List[str] = [] + for batch in batch_iter(room_ids, 1000): + batch_result = await self.db_pool.runInteraction( + "get_rooms_with_receipts_between", f, batch + ) + results.extend(batch_result) + + return results + async def get_users_sent_receipts_between( self, last_id: int, current_id: int ) -> List[str]: diff --git a/tests/rest/client/sliding_sync/test_extension_receipts.py b/tests/rest/client/sliding_sync/test_extension_receipts.py index 65fbac260e..39c51b367c 100644 --- a/tests/rest/client/sliding_sync/test_extension_receipts.py +++ b/tests/rest/client/sliding_sync/test_extension_receipts.py @@ -677,3 +677,108 @@ class SlidingSyncReceiptsExtensionTestCase(SlidingSyncBase): set(), exact=True, ) + + def test_receipts_incremental_sync_out_of_range(self) -> None: + """Tests that we don't return read receipts for rooms that fall out of + range, but then do send all read receipts once they're back in range. + """ + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id2, user1_id, tok=user1_tok) + + # Send a message and read receipt into room2 + event_response = self.helper.send(room_id2, body="new event", tok=user2_tok) + room2_event_id = event_response["event_id"] + + self.helper.send_read_receipt(room_id2, room2_event_id, tok=user1_tok) + + # Now send a message into room1 so that it is at the top of the list + self.helper.send(room_id1, body="new event", tok=user2_tok) + + # Make a SS request for only the top room. + sync_body = { + "lists": { + "main": { + "ranges": [[0, 0]], + "required_state": [], + "timeline_limit": 5, + } + }, + "extensions": { + "receipts": { + "enabled": True, + } + }, + } + response_body, from_token = self.do_sync(sync_body, tok=user1_tok) + + # The receipt is in room2, but only room1 is returned, so we don't + # expect to get the receipt. + self.assertIncludes( + response_body["extensions"]["receipts"].get("rooms").keys(), + set(), + exact=True, + ) + + # Move room2 into range. + self.helper.send(room_id2, body="new event", tok=user2_tok) + + response_body, from_token = self.do_sync( + sync_body, since=from_token, tok=user1_tok + ) + + # We expect to see the read receipt of room2, as that has the most + # recent update. + self.assertIncludes( + response_body["extensions"]["receipts"].get("rooms").keys(), + {room_id2}, + exact=True, + ) + receipt = response_body["extensions"]["receipts"]["rooms"][room_id2] + self.assertIncludes( + receipt["content"][room2_event_id][ReceiptTypes.READ].keys(), + {user1_id}, + exact=True, + ) + + # Send a message into room1 to bump it to the top, but also send a + # receipt in room2 + self.helper.send(room_id1, body="new event", tok=user2_tok) + self.helper.send_read_receipt(room_id2, room2_event_id, tok=user2_tok) + + # We don't expect to see the new read receipt. 
+ response_body, from_token = self.do_sync( + sync_body, since=from_token, tok=user1_tok + ) + self.assertIncludes( + response_body["extensions"]["receipts"].get("rooms").keys(), + set(), + exact=True, + ) + + # But if we send a new message into room2, we expect to get the missing receipts + self.helper.send(room_id2, body="new event", tok=user2_tok) + + response_body, from_token = self.do_sync( + sync_body, since=from_token, tok=user1_tok + ) + self.assertIncludes( + response_body["extensions"]["receipts"].get("rooms").keys(), + {room_id2}, + exact=True, + ) + + # We should only see the new receipt + receipt = response_body["extensions"]["receipts"]["rooms"][room_id2] + self.assertIncludes( + receipt["content"][room2_event_id][ReceiptTypes.READ].keys(), + {user2_id}, + exact=True, + ) diff --git a/tests/rest/client/sliding_sync/test_extensions.py b/tests/rest/client/sliding_sync/test_extensions.py index 68f6661334..ae823d5415 100644 --- a/tests/rest/client/sliding_sync/test_extensions.py +++ b/tests/rest/client/sliding_sync/test_extensions.py @@ -120,19 +120,26 @@ class SlidingSyncExtensionsTestCase(SlidingSyncBase): "foo-list": { "ranges": [[0, 1]], "required_state": [], - "timeline_limit": 0, + # We set this to `1` because we're testing `receipts` which + # interact with the `timeline`. With receipts, when a room + # hasn't been sent down the connection before or it appears + # as `initial: true`, we only include receipts for events in + # the timeline to avoid bloating and blowing up the sync + # response as the number of users in the room increases. + # (this behavior is part of the spec) + "timeline_limit": 1, }, # We expect this list range to include room5, room4, room3 "bar-list": { "ranges": [[0, 2]], "required_state": [], - "timeline_limit": 0, + "timeline_limit": 1, }, }, "room_subscriptions": { room_id1: { "required_state": [], - "timeline_limit": 0, + "timeline_limit": 1, } }, } diff --git a/tests/rest/client/utils.py b/tests/rest/client/utils.py index e43140720d..9614cdd66a 100644 --- a/tests/rest/client/utils.py +++ b/tests/rest/client/utils.py @@ -45,7 +45,7 @@ from typing_extensions import Literal from twisted.test.proto_helpers import MemoryReactorClock from twisted.web.server import Site -from synapse.api.constants import Membership +from synapse.api.constants import Membership, ReceiptTypes from synapse.api.errors import Codes from synapse.server import HomeServer from synapse.types import JsonDict @@ -944,3 +944,15 @@ class RestHelper: assert len(p.links) == 1, "not exactly one link in confirmation page" oauth_uri = p.links[0] return oauth_uri + + def send_read_receipt(self, room_id: str, event_id: str, *, tok: str) -> None: + """Send a read receipt into the room at the given event""" + channel = make_request( + self.reactor, + self.site, + method="POST", + path=f"/rooms/{room_id}/receipt/{ReceiptTypes.READ}/{event_id}", + content={}, + access_token=tok, + ) + assert channel.code == HTTPStatus.OK, channel.text_body From 950ba844f7f0655bfea800d744aada690b9384ae Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 20 Aug 2024 10:13:26 +0100 Subject: [PATCH 5/6] Sliding Sync: Batch up fetching receipts (#17589) This is to make initial sliding sync a bit faster --- changelog.d/17589.misc | 1 + synapse/handlers/sliding_sync.py | 30 ++++++++++++++++-------------- 2 files changed, 17 insertions(+), 14 deletions(-) create mode 100644 changelog.d/17589.misc diff --git a/changelog.d/17589.misc b/changelog.d/17589.misc new file mode 100644 index 0000000000..1b4a53ee17 --- 
/dev/null +++ b/changelog.d/17589.misc @@ -0,0 +1 @@ +Correctly track read receipts that should be sent down in experimental sliding sync. diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 64b5acbe98..c6834a1036 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -2858,6 +2858,7 @@ class SlidingSyncHandler: account_data_by_room_map=account_data_by_room_map, ) + @trace async def get_receipts_extension_response( self, sync_config: SlidingSyncConfig, @@ -2966,24 +2967,25 @@ class SlidingSyncHandler: # from that room but we only want to include receipts for events # in the timeline to avoid bloating and blowing up the sync response # as the number of users in the room increases. (this behavior is part of the spec) - for room_id in initial_rooms: - room_result = actual_room_response_map.get(room_id) - if room_result is None: - continue - - relevant_event_ids = [ - event.event_id for event in room_result.timeline_events - ] - - # TODO: In the future, it would be good to fetch less receipts - # out of the database in the first place but we would need to - # add a new `event_id` index to `receipts_linearized`. - initial_receipts = await self.store.get_linearized_receipts_for_room( - room_id=room_id, + initial_rooms = { + room_id + for room_id in initial_rooms + if room_id in actual_room_response_map + } + if initial_rooms: + initial_receipts = await self.store.get_linearized_receipts_for_rooms( + room_ids=initial_rooms, to_key=to_token.receipt_key, ) for receipt in initial_receipts: + relevant_event_ids = { + event.event_id + for event in actual_room_response_map[ + receipt["room_id"] + ].timeline_events + } + content = { event_id: content_value for event_id, content_value in receipt["content"].items() From 6eb98a4f1cbc1707ebcb1b8c39ac464d31d47609 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 20 Aug 2024 10:31:25 +0100 Subject: [PATCH 6/6] Sliding Sync: Handle timeline limit changes (take 2) (#17579) This supersedes #17503, given the per-connection state is being heavily rewritten it felt easier to recreate the PR on top of that work. This correctly handles the case of timeline limits going up and down. This does not handle changes in `required_state`, but that can be done as a separate PR. Based on #17575. --------- Co-authored-by: Eric Eastwood --- changelog.d/17579.misc | 1 + synapse/handlers/sliding_sync.py | 152 ++++++++++++++++-- synapse/rest/client/sync.py | 5 + synapse/types/handlers/__init__.py | 4 + .../sliding_sync/test_rooms_timeline.py | 136 ++++++++++++++++ 5 files changed, 285 insertions(+), 13 deletions(-) create mode 100644 changelog.d/17579.misc diff --git a/changelog.d/17579.misc b/changelog.d/17579.misc new file mode 100644 index 0000000000..5eb3d5c7b4 --- /dev/null +++ b/changelog.d/17579.misc @@ -0,0 +1 @@ +Handle changes in `timeline_limit` in experimental sliding sync. diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index c6834a1036..c7c81b1554 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -787,7 +787,20 @@ class SlidingSyncHandler: # subscription and have updates we need to send (i.e. either because # we haven't sent the room down, or we have but there are missing # updates). 
- for room_id in relevant_room_map: + for room_id, room_config in relevant_room_map.items(): + prev_room_sync_config = previous_connection_state.room_configs.get( + room_id + ) + if prev_room_sync_config is not None: + # Always include rooms whose timeline limit has increased. + # (see the "XXX: Odd behavior" described below) + if ( + prev_room_sync_config.timeline_limit + < room_config.timeline_limit + ): + rooms_should_send.add(room_id) + continue + status = previous_connection_state.rooms.have_sent_room(room_id) if ( # The room was never sent down before so the client needs to know @@ -819,12 +832,15 @@ class SlidingSyncHandler: if room_id in rooms_should_send } + new_connection_state = previous_connection_state.get_mutable() + @trace @tag_args async def handle_room(room_id: str) -> None: room_sync_result = await self.get_room_sync_data( sync_config=sync_config, previous_connection_state=previous_connection_state, + new_connection_state=new_connection_state, room_id=room_id, room_sync_config=relevant_rooms_to_send_map[room_id], room_membership_for_user_at_to_token=room_membership_for_user_map[ @@ -842,8 +858,6 @@ class SlidingSyncHandler: with start_active_span("sliding_sync.generate_room_entries"): await concurrently_execute(handle_room, relevant_rooms_to_send_map, 10) - new_connection_state = previous_connection_state.get_mutable() - extensions = await self.get_extensions_response( sync_config=sync_config, actual_lists=lists, @@ -1955,6 +1969,7 @@ class SlidingSyncHandler: self, sync_config: SlidingSyncConfig, previous_connection_state: "PerConnectionState", + new_connection_state: "MutablePerConnectionState", room_id: str, room_sync_config: RoomSyncConfig, room_membership_for_user_at_to_token: _RoomMembershipForUser, @@ -1998,9 +2013,27 @@ class SlidingSyncHandler: # - For an incremental sync where we haven't sent it down this # connection before # - # Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917 + # Relevant spec issue: + # https://github.com/matrix-org/matrix-spec/issues/1917 + # + # XXX: Odd behavior - We also check if the `timeline_limit` has increased, if so + # we ignore the from bound for the timeline to send down a larger chunk of + # history and set `unstable_expanded_timeline` to true. This is only being added + # to match the behavior of the Sliding Sync proxy as we expect the ElementX + # client to feel a certain way and be able to trickle in a full page of timeline + # messages to fill up the screen. This is a bit different to the behavior of the + # Sliding Sync proxy (which sets initial=true, but then doesn't send down the + # full state again), but existing apps, e.g. ElementX, just need `limited` set. + # We don't explicitly set `limited` but this will be the case for any room that + # has more history than we're trying to pull out. Using + # `unstable_expanded_timeline` allows us to avoid contaminating what `initial` + # or `limited` mean for clients that interpret them correctly. In future this + # behavior is almost certainly going to change. 
+ # + # TODO: Also handle changes to `required_state` from_bound = None initial = True + ignore_timeline_bound = False if from_token and not room_membership_for_user_at_to_token.newly_joined: room_status = previous_connection_state.rooms.have_sent_room(room_id) if room_status.status == HaveSentRoomFlag.LIVE: @@ -2018,7 +2051,26 @@ class SlidingSyncHandler: log_kv({"sliding_sync.room_status": room_status}) - log_kv({"sliding_sync.from_bound": from_bound, "sliding_sync.initial": initial}) + prev_room_sync_config = previous_connection_state.room_configs.get(room_id) + if prev_room_sync_config is not None: + # Check if the timeline limit has increased, if so ignore the + # timeline bound and record the change (see "XXX: Odd behavior" + # above). + if ( + prev_room_sync_config.timeline_limit + < room_sync_config.timeline_limit + ): + ignore_timeline_bound = True + + # TODO: Check for changes in `required_state`` + + log_kv( + { + "sliding_sync.from_bound": from_bound, + "sliding_sync.initial": initial, + "sliding_sync.ignore_timeline_bound": ignore_timeline_bound, + } + ) # Assemble the list of timeline events # @@ -2055,6 +2107,10 @@ class SlidingSyncHandler: room_membership_for_user_at_to_token.event_pos.to_room_stream_token() ) + timeline_from_bound = from_bound + if ignore_timeline_bound: + timeline_from_bound = None + # For initial `/sync` (and other historical scenarios mentioned above), we # want to view a historical section of the timeline; to fetch events by # `topological_ordering` (best representation of the room DAG as others were @@ -2080,7 +2136,7 @@ class SlidingSyncHandler: pagination_method: PaginateFunction = ( # Use `topographical_ordering` for historical events paginate_room_events_by_topological_ordering - if from_bound is None + if timeline_from_bound is None # Use `stream_ordering` for updates else paginate_room_events_by_stream_ordering ) @@ -2090,7 +2146,7 @@ class SlidingSyncHandler: # (from newer to older events) starting at to_bound. # This ensures we fill the `limit` with the newest events first, from_key=to_bound, - to_key=from_bound, + to_key=timeline_from_bound, direction=Direction.BACKWARDS, # We add one so we can determine if there are enough events to saturate # the limit or not (see `limited`) @@ -2448,6 +2504,55 @@ class SlidingSyncHandler: if new_bump_event_pos.stream > 0: bump_stamp = new_bump_event_pos.stream + unstable_expanded_timeline = False + prev_room_sync_config = previous_connection_state.room_configs.get(room_id) + # Record the `room_sync_config` if we're `ignore_timeline_bound` (which means + # that the `timeline_limit` has increased) + if ignore_timeline_bound: + # FIXME: We signal the fact that we're sending down more events to + # the client by setting `unstable_expanded_timeline` to true (see + # "XXX: Odd behavior" above). + unstable_expanded_timeline = True + + new_connection_state.room_configs[room_id] = RoomSyncConfig( + timeline_limit=room_sync_config.timeline_limit, + required_state_map=room_sync_config.required_state_map, + ) + elif prev_room_sync_config is not None: + # If the result is `limited` then we need to record that the + # `timeline_limit` has been reduced, as when/if the client later requests + # more timeline then we have more data to send. + # + # Otherwise (when not `limited`) we don't need to record that the + # `timeline_limit` has been reduced, as the *effective* `timeline_limit` + # (i.e. the amount of timeline we have previously sent to the client) is at + # least the previous `timeline_limit`. 
+ # + # This is to handle the case where the `timeline_limit` e.g. goes from 10 to + # 5 to 10 again (without any timeline gaps), where there's no point sending + # down the initial historical chunk events when the `timeline_limit` is + # increased as the client already has the 10 previous events. However, if + # client has a gap in the timeline (i.e. `limited` is True), then we *do* + # need to record the reduced timeline. + # + # TODO: Handle timeline gaps (`get_timeline_gaps()`) - This is separate from + # the gaps we might see on the client because a response was `limited` we're + # talking about above. + if ( + limited + and prev_room_sync_config.timeline_limit + > room_sync_config.timeline_limit + ): + new_connection_state.room_configs[room_id] = RoomSyncConfig( + timeline_limit=room_sync_config.timeline_limit, + required_state_map=room_sync_config.required_state_map, + ) + + # TODO: Record changes in required_state. + + else: + new_connection_state.room_configs[room_id] = room_sync_config + set_tag(SynapseTags.RESULT_PREFIX + "initial", initial) return SlidingSyncResult.RoomResult( @@ -2462,6 +2567,7 @@ class SlidingSyncHandler: stripped_state=stripped_state, prev_batch=prev_batch_token, limited=limited, + unstable_expanded_timeline=unstable_expanded_timeline, num_live=num_live, bump_stamp=bump_stamp, joined_count=room_membership_summary.get( @@ -3264,16 +3370,30 @@ class PerConnectionState: Attributes: rooms: The status of each room for the events stream. receipts: The status of each room for the receipts stream. + room_configs: Map from room_id to the `RoomSyncConfig` of all + rooms that we have previously sent down. """ rooms: RoomStatusMap[RoomStreamToken] = attr.Factory(RoomStatusMap) receipts: RoomStatusMap[MultiWriterStreamToken] = attr.Factory(RoomStatusMap) + room_configs: Mapping[str, RoomSyncConfig] = attr.Factory(dict) + def get_mutable(self) -> "MutablePerConnectionState": """Get a mutable copy of this state.""" + room_configs = cast(MutableMapping[str, RoomSyncConfig], self.room_configs) + return MutablePerConnectionState( rooms=self.rooms.get_mutable(), receipts=self.receipts.get_mutable(), + room_configs=ChainMap({}, room_configs), + ) + + def copy(self) -> "PerConnectionState": + return PerConnectionState( + rooms=self.rooms.copy(), + receipts=self.receipts.copy(), + room_configs=dict(self.room_configs), ) @@ -3284,8 +3404,18 @@ class MutablePerConnectionState(PerConnectionState): rooms: MutableRoomStatusMap[RoomStreamToken] receipts: MutableRoomStatusMap[MultiWriterStreamToken] + room_configs: typing.ChainMap[str, RoomSyncConfig] + def has_updates(self) -> bool: - return bool(self.rooms.get_updates()) or bool(self.receipts.get_updates()) + return ( + bool(self.rooms.get_updates()) + or bool(self.receipts.get_updates()) + or bool(self.get_room_config_updates()) + ) + + def get_room_config_updates(self) -> Mapping[str, RoomSyncConfig]: + """Get updates to the room sync config""" + return self.room_configs.maps[0] @attr.s(auto_attribs=True) @@ -3369,7 +3499,6 @@ class SlidingSyncConnectionStore: ) -> int: """Record updated per-connection state, returning the connection position associated with the new state. - If there are no changes to the state this may return the same token as the existing per-connection state. """ @@ -3390,10 +3519,7 @@ class SlidingSyncConnectionStore: # We copy the `MutablePerConnectionState` so that the inner `ChainMap`s # don't grow forever. 
- sync_statuses[new_store_token] = PerConnectionState( - rooms=new_connection_state.rooms.copy(), - receipts=new_connection_state.receipts.copy(), - ) + sync_statuses[new_store_token] = new_connection_state.copy() return new_store_token diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index 8c5db2a513..21b90b0674 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -1044,6 +1044,11 @@ class SlidingSyncRestServlet(RestServlet): if room_result.initial: serialized_rooms[room_id]["initial"] = room_result.initial + if room_result.unstable_expanded_timeline: + serialized_rooms[room_id][ + "unstable_expanded_timeline" + ] = room_result.unstable_expanded_timeline + # This will be omitted for invite/knock rooms with `stripped_state` if ( room_result.required_state is not None diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py index 363f060bef..580342d98a 100644 --- a/synapse/types/handlers/__init__.py +++ b/synapse/types/handlers/__init__.py @@ -171,6 +171,9 @@ class SlidingSyncResult: their local state. When there is an update, servers MUST omit this flag entirely and NOT send "initial":false as this is wasteful on bandwidth. The absence of this flag means 'false'. + unstable_expanded_timeline: Flag which is set if we're returning more historic + events due to the timeline limit having increased. See "XXX: Odd behavior" + comment ing `synapse.handlers.sliding_sync`. required_state: The current state of the room timeline: Latest events in the room. The last event is the most recent. bundled_aggregations: A mapping of event ID to the bundled aggregations for @@ -219,6 +222,7 @@ class SlidingSyncResult: heroes: Optional[List[StrippedHero]] is_dm: bool initial: bool + unstable_expanded_timeline: bool # Should be empty for invite/knock rooms with `stripped_state` required_state: List[EventBase] # Should be empty for invite/knock rooms with `stripped_state` diff --git a/tests/rest/client/sliding_sync/test_rooms_timeline.py b/tests/rest/client/sliding_sync/test_rooms_timeline.py index 2e9586ca73..eeac0d6aa9 100644 --- a/tests/rest/client/sliding_sync/test_rooms_timeline.py +++ b/tests/rest/client/sliding_sync/test_rooms_timeline.py @@ -17,6 +17,7 @@ from typing import List, Optional from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin +from synapse.api.constants import EventTypes from synapse.rest.client import login, room, sync from synapse.server import HomeServer from synapse.types import StreamToken, StrSequence @@ -573,3 +574,138 @@ class SlidingSyncRoomsTimelineTestCase(SlidingSyncBase): # Nothing to see for this banned user in the room in the token range self.assertIsNone(response_body["rooms"].get(room_id1)) + + def test_increasing_timeline_range_sends_more_messages(self) -> None: + """ + Test that increasing the timeline limit via room subscriptions sends the + room down with more messages in a limited sync. 
+ """ + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [[EventTypes.Create, ""]], + "timeline_limit": 1, + } + } + } + + message_events = [] + for _ in range(10): + resp = self.helper.send(room_id1, "msg", tok=user1_tok) + message_events.append(resp["event_id"]) + + # Make the first Sliding Sync request + response_body, from_token = self.do_sync(sync_body, tok=user1_tok) + room_response = response_body["rooms"][room_id1] + + self.assertEqual(room_response["initial"], True) + self.assertNotIn("unstable_expanded_timeline", room_response) + self.assertEqual(room_response["limited"], True) + + # We only expect the last message at first + self._assertTimelineEqual( + room_id=room_id1, + actual_event_ids=[event["event_id"] for event in room_response["timeline"]], + expected_event_ids=message_events[-1:], + message=str(room_response["timeline"]), + ) + + # We also expect to get the create event state. + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id1) + ) + self._assertRequiredStateIncludes( + room_response["required_state"], + {state_map[(EventTypes.Create, "")]}, + exact=True, + ) + + # Now do another request with a room subscription with an increased timeline limit + sync_body["room_subscriptions"] = { + room_id1: { + "required_state": [], + "timeline_limit": 10, + } + } + + response_body, from_token = self.do_sync( + sync_body, since=from_token, tok=user1_tok + ) + room_response = response_body["rooms"][room_id1] + + self.assertNotIn("initial", room_response) + self.assertEqual(room_response["unstable_expanded_timeline"], True) + self.assertEqual(room_response["limited"], True) + + # Now we expect all the messages + self._assertTimelineEqual( + room_id=room_id1, + actual_event_ids=[event["event_id"] for event in room_response["timeline"]], + expected_event_ids=message_events, + message=str(room_response["timeline"]), + ) + + # We don't expect to get the room create down, as nothing has changed. 
+ self.assertNotIn("required_state", room_response) + + # Decreasing the timeline limit shouldn't resend any events + sync_body["room_subscriptions"] = { + room_id1: { + "required_state": [], + "timeline_limit": 5, + } + } + + event_response = self.helper.send(room_id1, "msg", tok=user1_tok) + latest_event_id = event_response["event_id"] + + response_body, from_token = self.do_sync( + sync_body, since=from_token, tok=user1_tok + ) + room_response = response_body["rooms"][room_id1] + + self.assertNotIn("initial", room_response) + self.assertNotIn("unstable_expanded_timeline", room_response) + self.assertEqual(room_response["limited"], False) + + self._assertTimelineEqual( + room_id=room_id1, + actual_event_ids=[event["event_id"] for event in room_response["timeline"]], + expected_event_ids=[latest_event_id], + message=str(room_response["timeline"]), + ) + + # Increasing the limit to what it was before also should not resend any + # events + sync_body["room_subscriptions"] = { + room_id1: { + "required_state": [], + "timeline_limit": 10, + } + } + + event_response = self.helper.send(room_id1, "msg", tok=user1_tok) + latest_event_id = event_response["event_id"] + + response_body, from_token = self.do_sync( + sync_body, since=from_token, tok=user1_tok + ) + room_response = response_body["rooms"][room_id1] + + self.assertNotIn("initial", room_response) + self.assertNotIn("unstable_expanded_timeline", room_response) + self.assertEqual(room_response["limited"], False) + + self._assertTimelineEqual( + room_id=room_id1, + actual_event_ids=[event["event_id"] for event in room_response["timeline"]], + expected_event_ids=[latest_event_id], + message=str(room_response["timeline"]), + )
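
Editor's note on the refactor above: the per-connection state change hinges on `ChainMap`. `PerConnectionState.get_mutable()` layers an empty dict over the previous `room_configs`, so any configs written while generating a response collect in the top layer (which is what `get_room_config_updates()` returns), and `copy()` flattens the layers before the connection store keeps a snapshot. The sketch below is a minimal, self-contained illustration of that pattern only; the class and field names are simplified stand-ins, not the actual Synapse types, and it assumes `attrs` is available as in the patched code.

```
from collections import ChainMap
from typing import Mapping, MutableMapping, cast

import attr


@attr.s(auto_attribs=True, frozen=True)
class RoomConfig:
    """Stand-in for RoomSyncConfig (illustrative only)."""

    timeline_limit: int


@attr.s(auto_attribs=True)
class ConnectionState:
    """Snapshot of per-connection state; treated as immutable."""

    room_configs: Mapping[str, RoomConfig] = attr.Factory(dict)

    def get_mutable(self) -> "MutableConnectionState":
        # Layer an empty dict over the existing configs: reads fall through to
        # the old state, writes stay in the new top layer.
        configs = cast(MutableMapping[str, RoomConfig], self.room_configs)
        return MutableConnectionState(room_configs=ChainMap({}, configs))

    def copy(self) -> "ConnectionState":
        # Flatten into a plain dict so stored snapshots don't accumulate
        # ChainMap layers forever.
        return ConnectionState(room_configs=dict(self.room_configs))


@attr.s(auto_attribs=True)
class MutableConnectionState(ConnectionState):
    room_configs: "ChainMap[str, RoomConfig]" = attr.Factory(lambda: ChainMap({}))

    def get_updates(self) -> Mapping[str, RoomConfig]:
        # Only entries written since get_mutable() live in the top layer.
        return self.room_configs.maps[0]

    def has_updates(self) -> bool:
        return bool(self.get_updates())


# Usage: mutate a copy, inspect only the delta, then flatten for storage.
prev = ConnectionState(room_configs={"!a:example.org": RoomConfig(timeline_limit=1)})
mutable = prev.get_mutable()
mutable.room_configs["!a:example.org"] = RoomConfig(timeline_limit=10)
assert mutable.has_updates()
assert prev.room_configs["!a:example.org"].timeline_limit == 1  # prior state untouched
snapshot = mutable.copy()
assert snapshot.room_configs["!a:example.org"].timeline_limit == 10
```

The design choice this illustrates is why the patch stores `new_connection_state.copy()` rather than the mutable object itself: persisting only flattened snapshots keeps the stored state from growing a new `ChainMap` layer on every sync, while the top layer still gives a cheap "what changed this sync" view for `has_updates()`.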