Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes

Erik Johnston
2024-08-20 10:35:53 +01:00
16 changed files with 846 additions and 150 deletions

changelog.d/17562.misc Normal file

@@ -0,0 +1 @@
Test GitHub token before running release script steps.

changelog.d/17570.bugfix Normal file

@@ -0,0 +1 @@
Fix a bug where we would respond with an error when a remote server asked for media with a length of 0 via the new multipart federation media endpoint.

changelog.d/17574.misc Normal file

@@ -0,0 +1 @@
Refactor per-connection state in experimental sliding sync handler.

changelog.d/17575.misc Normal file

@@ -0,0 +1 @@
Correctly track read receipts that should be sent down in experimental sliding sync.

changelog.d/17579.misc Normal file

@@ -0,0 +1 @@
Handle changes in `timeline_limit` in experimental sliding sync.

changelog.d/17589.misc Normal file

@@ -0,0 +1 @@
Correctly track read receipts that should be sent down in experimental sliding sync.


@@ -324,6 +324,11 @@ def tag(gh_token: Optional[str]) -> None:
def _tag(gh_token: Optional[str]) -> None:
"""Tags the release and generates a draft GitHub release"""
if gh_token:
# Test that the GH Token is valid before continuing.
gh = Github(gh_token)
gh.get_user()
# Make sure we're in a git repo.
repo = get_repo_and_check_clean_checkout()
@@ -418,6 +423,11 @@ def publish(gh_token: str) -> None:
def _publish(gh_token: str) -> None:
"""Publish release on GitHub."""
if gh_token:
# Test that the GH Token is valid before continuing.
gh = Github(gh_token)
gh.get_user()
# Make sure we're in a git repo.
get_repo_and_check_clean_checkout()
@@ -460,6 +470,11 @@ def upload(gh_token: Optional[str]) -> None:
def _upload(gh_token: Optional[str]) -> None:
"""Upload release to pypi."""
if gh_token:
# Test that the GH Token is valid before continuing.
gh = Github(gh_token)
gh.get_user()
current_version = get_package_version()
tag_name = f"v{current_version}"
@@ -555,6 +570,11 @@ def wait_for_actions(gh_token: Optional[str]) -> None:
def _wait_for_actions(gh_token: Optional[str]) -> None:
if gh_token:
# Test that the GH Token is valid before continuing.
gh = Github(gh_token)
gh.get_user()
# Find out the version and tag name.
current_version = get_package_version()
tag_name = f"v{current_version}"
@@ -711,6 +731,11 @@ Ask the designated people to do the blog and tweets."""
@cli.command()
@click.option("--gh-token", envvar=["GH_TOKEN", "GITHUB_TOKEN"], required=True)
def full(gh_token: str) -> None:
if gh_token:
# Test that the GH Token is valid before continuing.
gh = Github(gh_token)
gh.get_user()
click.echo("1. If this is a security release, read the security wiki page.")
click.echo("2. Check for any release blockers before proceeding.")
click.echo(" https://github.com/element-hq/synapse/labels/X-Release-Blocker")
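As an aside, the early token check added above can be sketched as a standalone helper. This is illustrative only and not part of the diff; `check_gh_token` is a hypothetical name, and it assumes PyGithub's `Github` client and `BadCredentialsException`:

from github import BadCredentialsException, Github  # PyGithub

def check_gh_token(gh_token: str) -> None:
    """Fail fast if the supplied GitHub token is rejected (illustrative sketch)."""
    try:
        # `get_user()` is lazy in PyGithub; reading an attribute such as `.login`
        # ensures the API is actually hit and the token actually validated.
        Github(gh_token).get_user().login
    except BadCredentialsException:
        raise SystemExit("GH_TOKEN/GITHUB_TOKEN appears to be invalid.")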


@@ -19,6 +19,8 @@
#
import enum
import logging
import typing
from collections import ChainMap
from enum import Enum
from itertools import chain
from typing import (
@@ -27,14 +29,18 @@ from typing import (
Callable,
Dict,
Final,
Generic,
List,
Literal,
Mapping,
MutableMapping,
Optional,
Sequence,
Set,
Tuple,
TypeVar,
Union,
cast,
)
import attr
@@ -51,6 +57,7 @@ from synapse.api.constants import (
from synapse.api.errors import SlidingSyncUnknownPosition
from synapse.events import EventBase, StrippedStateEvent
from synapse.events.utils import parse_stripped_state_event, strip_event
from synapse.handlers.receipts import ReceiptEventSource
from synapse.handlers.relations import BundledAggregations
from synapse.logging.opentracing import (
SynapseTags,
@@ -571,21 +578,21 @@ class SlidingSyncHandler:
# See https://github.com/matrix-org/matrix-doc/issues/1144
raise NotImplementedError()
if from_token:
# Check that we recognize the connection position, if not tell the
# clients that they need to start again.
#
# If we don't do this and the client asks for the full range of
# rooms, we end up sending down all rooms and their state from
# scratch (which can be very slow). By expiring the connection we
# allow the client a chance to do an initial request with a smaller
# range of rooms to get them some results sooner but will end up
# taking the same amount of time (more with round-trips and
# re-processing) in the end to get everything again.
if not await self.connection_store.is_valid_token(
sync_config, from_token.connection_position
):
raise SlidingSyncUnknownPosition()
# Get the per-connection state (if any).
#
# Raises an exception if there is a `connection_position` that we don't
# recognize. If we don't do this and the client asks for the full range
# of rooms, we end up sending down all rooms and their state from
# scratch (which can be very slow). By expiring the connection we allow
# the client a chance to do an initial request with a smaller range of
# rooms to get them some results sooner but will end up taking the same
# amount of time (more with round-trips and re-processing) in the end to
# get everything again.
previous_connection_state = (
await self.connection_store.get_per_connection_state(
sync_config, from_token
)
)
await self.connection_store.mark_token_seen(
sync_config=sync_config,
@@ -780,12 +787,21 @@ class SlidingSyncHandler:
# subscription and have updates we need to send (i.e. either because
# we haven't sent the room down, or we have but there are missing
# updates).
for room_id in relevant_room_map:
status = await self.connection_store.have_sent_room(
sync_config,
from_token.connection_position,
room_id,
for room_id, room_config in relevant_room_map.items():
prev_room_sync_config = previous_connection_state.room_configs.get(
room_id
)
if prev_room_sync_config is not None:
# Always include rooms whose timeline limit has increased.
# (see the "XXX: Odd behavior" described below)
if (
prev_room_sync_config.timeline_limit
< room_config.timeline_limit
):
rooms_should_send.add(room_id)
continue
status = previous_connection_state.rooms.have_sent_room(room_id)
if (
# The room was never sent down before so the client needs to know
# about it regardless of any updates.
@@ -816,11 +832,15 @@ class SlidingSyncHandler:
if room_id in rooms_should_send
}
new_connection_state = previous_connection_state.get_mutable()
@trace
@tag_args
async def handle_room(room_id: str) -> None:
room_sync_result = await self.get_room_sync_data(
sync_config=sync_config,
previous_connection_state=previous_connection_state,
new_connection_state=new_connection_state,
room_id=room_id,
room_sync_config=relevant_rooms_to_send_map[room_id],
room_membership_for_user_at_to_token=room_membership_for_user_map[
@@ -841,6 +861,8 @@ class SlidingSyncHandler:
extensions = await self.get_extensions_response(
sync_config=sync_config,
actual_lists=lists,
previous_connection_state=previous_connection_state,
new_connection_state=new_connection_state,
# We're purposely using `relevant_room_map` instead of
# `relevant_rooms_to_send_map` here. This needs to be all room_ids we could
# send regardless of whether they have an event update or not. The
@@ -882,11 +904,18 @@ class SlidingSyncHandler:
)
unsent_room_ids = list(missing_event_map_by_room)
connection_position = await self.connection_store.record_rooms(
new_connection_state.rooms.record_unsent_rooms(
unsent_room_ids, from_token.stream_token.room_key
)
new_connection_state.rooms.record_sent_rooms(
relevant_rooms_to_send_map.keys()
)
connection_position = await self.connection_store.record_new_state(
sync_config=sync_config,
from_token=from_token,
sent_room_ids=relevant_rooms_to_send_map.keys(),
unsent_room_ids=unsent_room_ids,
new_connection_state=new_connection_state,
)
elif from_token:
connection_position = from_token.connection_position
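Taken together, the hunks above replace the old per-room bookkeeping (`have_sent_room` / `record_rooms`) with a snapshot-plus-mutable-copy flow. A condensed, illustrative sketch of that flow using only names that appear in this diff (surrounding logic elided):

# Load the snapshot for the client's connection position (empty state on initial sync).
previous_connection_state = await self.connection_store.get_per_connection_state(
    sync_config, from_token
)

# Take a mutable copy and record what we do (and don't) send as the response is built.
new_connection_state = previous_connection_state.get_mutable()
new_connection_state.rooms.record_unsent_rooms(
    unsent_room_ids, from_token.stream_token.room_key
)
new_connection_state.rooms.record_sent_rooms(relevant_rooms_to_send_map.keys())

# Persist the copy; the returned position is handed back to the client in the
# next connection token.
connection_position = await self.connection_store.record_new_state(
    sync_config=sync_config,
    from_token=from_token,
    new_connection_state=new_connection_state,
)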
@@ -1939,6 +1968,8 @@ class SlidingSyncHandler:
async def get_room_sync_data(
self,
sync_config: SlidingSyncConfig,
previous_connection_state: "PerConnectionState",
new_connection_state: "MutablePerConnectionState",
room_id: str,
room_sync_config: RoomSyncConfig,
room_membership_for_user_at_to_token: _RoomMembershipForUser,
@@ -1982,15 +2013,29 @@ class SlidingSyncHandler:
# - For an incremental sync where we haven't sent it down this
# connection before
#
# Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917
# Relevant spec issue:
# https://github.com/matrix-org/matrix-spec/issues/1917
#
# XXX: Odd behavior - We also check if the `timeline_limit` has increased, if so
# we ignore the from bound for the timeline to send down a larger chunk of
# history and set `unstable_expanded_timeline` to true. This is only being added
# to match the behavior of the Sliding Sync proxy as we expect the ElementX
# client to feel a certain way and be able to trickle in a full page of timeline
# messages to fill up the screen. This is a bit different to the behavior of the
# Sliding Sync proxy (which sets initial=true, but then doesn't send down the
# full state again), but existing apps, e.g. ElementX, just need `limited` set.
# We don't explicitly set `limited` but this will be the case for any room that
# has more history than we're trying to pull out. Using
# `unstable_expanded_timeline` allows us to avoid contaminating what `initial`
# or `limited` mean for clients that interpret them correctly. In future this
# behavior is almost certainly going to change.
#
# TODO: Also handle changes to `required_state`
from_bound = None
initial = True
ignore_timeline_bound = False
if from_token and not room_membership_for_user_at_to_token.newly_joined:
room_status = await self.connection_store.have_sent_room(
sync_config=sync_config,
connection_token=from_token.connection_position,
room_id=room_id,
)
room_status = previous_connection_state.rooms.have_sent_room(room_id)
if room_status.status == HaveSentRoomFlag.LIVE:
from_bound = from_token.stream_token.room_key
initial = False
@@ -2006,7 +2051,26 @@ class SlidingSyncHandler:
log_kv({"sliding_sync.room_status": room_status})
log_kv({"sliding_sync.from_bound": from_bound, "sliding_sync.initial": initial})
prev_room_sync_config = previous_connection_state.room_configs.get(room_id)
if prev_room_sync_config is not None:
# Check if the timeline limit has increased, if so ignore the
# timeline bound and record the change (see "XXX: Odd behavior"
# above).
if (
prev_room_sync_config.timeline_limit
< room_sync_config.timeline_limit
):
ignore_timeline_bound = True
# TODO: Check for changes in `required_state`
log_kv(
{
"sliding_sync.from_bound": from_bound,
"sliding_sync.initial": initial,
"sliding_sync.ignore_timeline_bound": ignore_timeline_bound,
}
)
# Assemble the list of timeline events
#
@@ -2043,6 +2107,10 @@ class SlidingSyncHandler:
room_membership_for_user_at_to_token.event_pos.to_room_stream_token()
)
timeline_from_bound = from_bound
if ignore_timeline_bound:
timeline_from_bound = None
# For initial `/sync` (and other historical scenarios mentioned above), we
# want to view a historical section of the timeline; to fetch events by
# `topological_ordering` (best representation of the room DAG as others were
@@ -2068,7 +2136,7 @@ class SlidingSyncHandler:
pagination_method: PaginateFunction = (
# Use `topological_ordering` for historical events
paginate_room_events_by_topological_ordering
if from_bound is None
if timeline_from_bound is None
# Use `stream_ordering` for updates
else paginate_room_events_by_stream_ordering
)
@@ -2078,7 +2146,7 @@ class SlidingSyncHandler:
# (from newer to older events) starting at to_bound.
# This ensures we fill the `limit` with the newest events first,
from_key=to_bound,
to_key=from_bound,
to_key=timeline_from_bound,
direction=Direction.BACKWARDS,
# We add one so we can determine if there are enough events to saturate
# the limit or not (see `limited`)
@@ -2436,6 +2504,55 @@ class SlidingSyncHandler:
if new_bump_event_pos.stream > 0:
bump_stamp = new_bump_event_pos.stream
unstable_expanded_timeline = False
prev_room_sync_config = previous_connection_state.room_configs.get(room_id)
# Record the `room_sync_config` if `ignore_timeline_bound` is set (which means
# that the `timeline_limit` has increased)
if ignore_timeline_bound:
# FIXME: We signal the fact that we're sending down more events to
# the client by setting `unstable_expanded_timeline` to true (see
# "XXX: Odd behavior" above).
unstable_expanded_timeline = True
new_connection_state.room_configs[room_id] = RoomSyncConfig(
timeline_limit=room_sync_config.timeline_limit,
required_state_map=room_sync_config.required_state_map,
)
elif prev_room_sync_config is not None:
# If the result is `limited` then we need to record that the
# `timeline_limit` has been reduced, as when/if the client later requests
# more timeline then we have more data to send.
#
# Otherwise (when not `limited`) we don't need to record that the
# `timeline_limit` has been reduced, as the *effective* `timeline_limit`
# (i.e. the amount of timeline we have previously sent to the client) is at
# least the previous `timeline_limit`.
#
# This is to handle the case where the `timeline_limit` e.g. goes from 10 to
# 5 to 10 again (without any timeline gaps), where there's no point sending
# down the initial historical chunk events when the `timeline_limit` is
# increased as the client already has the 10 previous events. However, if
# client has a gap in the timeline (i.e. `limited` is True), then we *do*
# need to record the reduced timeline.
#
# TODO: Handle timeline gaps (`get_timeline_gaps()`) - This is separate from
# the gaps we might see on the client because a response was `limited`, which
# we're talking about above.
if (
limited
and prev_room_sync_config.timeline_limit
> room_sync_config.timeline_limit
):
new_connection_state.room_configs[room_id] = RoomSyncConfig(
timeline_limit=room_sync_config.timeline_limit,
required_state_map=room_sync_config.required_state_map,
)
# TODO: Record changes in required_state.
else:
new_connection_state.room_configs[room_id] = room_sync_config
set_tag(SynapseTags.RESULT_PREFIX + "initial", initial)
return SlidingSyncResult.RoomResult(
@@ -2450,6 +2567,7 @@ class SlidingSyncHandler:
stripped_state=stripped_state,
prev_batch=prev_batch_token,
limited=limited,
unstable_expanded_timeline=unstable_expanded_timeline,
num_live=num_live,
bump_stamp=bump_stamp,
joined_count=room_membership_summary.get(
@@ -2469,6 +2587,8 @@ class SlidingSyncHandler:
async def get_extensions_response(
self,
sync_config: SlidingSyncConfig,
previous_connection_state: "PerConnectionState",
new_connection_state: "MutablePerConnectionState",
actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList],
actual_room_ids: Set[str],
actual_room_response_map: Dict[str, SlidingSyncResult.RoomResult],
@@ -2479,6 +2599,9 @@ class SlidingSyncHandler:
Args:
sync_config: Sync configuration
previous_connection_state: The current per-connection state
new_connection_state: A mutable copy of the per-connection
state, used to record updates to the state during this request.
actual_lists: Sliding window API. A map of list key to list results in the
Sliding Sync response.
actual_room_ids: The actual room IDs in the Sliding Sync response.
@@ -2523,6 +2646,8 @@ class SlidingSyncHandler:
if sync_config.extensions.receipts is not None:
receipts_response = await self.get_receipts_extension_response(
sync_config=sync_config,
previous_connection_state=previous_connection_state,
new_connection_state=new_connection_state,
actual_lists=actual_lists,
actual_room_ids=actual_room_ids,
actual_room_response_map=actual_room_response_map,
@@ -2839,9 +2964,12 @@ class SlidingSyncHandler:
account_data_by_room_map=account_data_by_room_map,
)
@trace
async def get_receipts_extension_response(
self,
sync_config: SlidingSyncConfig,
previous_connection_state: "PerConnectionState",
new_connection_state: "MutablePerConnectionState",
actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList],
actual_room_ids: Set[str],
actual_room_response_map: Dict[str, SlidingSyncResult.RoomResult],
@@ -2853,6 +2981,9 @@ class SlidingSyncHandler:
Args:
sync_config: Sync configuration
previous_connection_state: The current per-connection state
new_connection_state: A mutable copy of the per-connection
state, used to record updates to the state.
actual_lists: Sliding window API. A map of list key to list results in the
Sliding Sync response.
actual_room_ids: The actual room IDs in the Sliding Sync response.
@@ -2875,50 +3006,146 @@ class SlidingSyncHandler:
room_id_to_receipt_map: Dict[str, JsonMapping] = {}
if len(relevant_room_ids) > 0:
# TODO: Take connection tracking into account so that when a room comes back
# into range we can send the receipts that were missed.
receipt_source = self.event_sources.sources.receipt
receipts, _ = await receipt_source.get_new_events(
user=sync_config.user,
from_key=(
from_token.stream_token.receipt_key
if from_token
else MultiWriterStreamToken(stream=0)
),
to_key=to_token.receipt_key,
# This is a dummy value and isn't used in the function
limit=0,
room_ids=relevant_room_ids,
is_guest=False,
# We need to handle the different cases depending on if we have sent
# down receipts previously or not, so we split the relevant rooms
# up into different collections based on status.
live_rooms = set()
previously_rooms: Dict[str, MultiWriterStreamToken] = {}
initial_rooms = set()
for room_id in relevant_room_ids:
if not from_token:
initial_rooms.add(room_id)
continue
# If we're sending down the room from scratch again for some reason, we
# should always resend the receipts as well (regardless of if
# we've sent them down before). This is to mimic the behaviour
# of what happens on initial sync, where you get a chunk of
# timeline with all of the corresponding receipts for the events in the timeline.
room_result = actual_room_response_map.get(room_id)
if room_result is not None and room_result.initial:
initial_rooms.add(room_id)
continue
room_status = previous_connection_state.receipts.have_sent_room(room_id)
if room_status.status == HaveSentRoomFlag.LIVE:
live_rooms.add(room_id)
elif room_status.status == HaveSentRoomFlag.PREVIOUSLY:
assert room_status.last_token is not None
previously_rooms[room_id] = room_status.last_token
elif room_status.status == HaveSentRoomFlag.NEVER:
initial_rooms.add(room_id)
else:
assert_never(room_status.status)
# The set of receipts that we fetched. Private receipts need to be
# filtered out before returning.
fetched_receipts = []
# For live rooms we just fetch all receipts in those rooms since the
# `since` token.
if live_rooms:
assert from_token is not None
receipts = await self.store.get_linearized_receipts_for_rooms(
room_ids=live_rooms,
from_key=from_token.stream_token.receipt_key,
to_key=to_token.receipt_key,
)
fetched_receipts.extend(receipts)
# For rooms we've previously sent down, but aren't up to date, we
# need to use the from token from the room status.
if previously_rooms:
for room_id, receipt_token in previously_rooms.items():
# TODO: Limit the number of receipts we're about to send down
# for the room; if it's too many we should TODO
previously_receipts = (
await self.store.get_linearized_receipts_for_room(
room_id=room_id,
from_key=receipt_token,
to_key=to_token.receipt_key,
)
)
fetched_receipts.extend(previously_receipts)
# For rooms we haven't previously sent down, we could send all receipts
# from that room but we only want to include receipts for events
# in the timeline to avoid bloating and blowing up the sync response
# as the number of users in the room increases. (this behavior is part of the spec)
initial_rooms = {
room_id
for room_id in initial_rooms
if room_id in actual_room_response_map
}
if initial_rooms:
initial_receipts = await self.store.get_linearized_receipts_for_rooms(
room_ids=initial_rooms,
to_key=to_token.receipt_key,
)
for receipt in initial_receipts:
relevant_event_ids = {
event.event_id
for event in actual_room_response_map[
receipt["room_id"]
].timeline_events
}
content = {
event_id: content_value
for event_id, content_value in receipt["content"].items()
if event_id in relevant_event_ids
}
if content:
fetched_receipts.append(
{
"type": receipt["type"],
"room_id": receipt["room_id"],
"content": content,
}
)
fetched_receipts = ReceiptEventSource.filter_out_private_receipts(
fetched_receipts, sync_config.user.to_string()
)
for receipt in receipts:
for receipt in fetched_receipts:
# These fields should exist for every receipt
room_id = receipt["room_id"]
type = receipt["type"]
content = receipt["content"]
# For `initial: True` rooms, we only want to include receipts for events
# in the timeline.
room_result = actual_room_response_map.get(room_id)
if room_result is not None:
if room_result.initial:
# TODO: In the future, it would be good to fetch less receipts
# out of the database in the first place but we would need to
# add a new `event_id` index to `receipts_linearized`.
relevant_event_ids = [
event.event_id for event in room_result.timeline_events
]
assert isinstance(content, dict)
content = {
event_id: content_value
for event_id, content_value in content.items()
if event_id in relevant_event_ids
}
room_id_to_receipt_map[room_id] = {"type": type, "content": content}
# Now we update the per-connection state to track which receipts we have
# and haven't sent down.
new_connection_state.receipts.record_sent_rooms(relevant_room_ids)
if from_token:
# Now find the set of rooms that may have receipts that we're not sending
# down. We only need to check rooms that we have previously returned
# receipts for (in `previous_connection_state`) because we only care about
# updating `LIVE` rooms to `PREVIOUSLY`. The `PREVIOUSLY` rooms will just
# stay pointing at their previous position so we don't need to waste time
# checking those. Since we default to `NEVER`, rooms that were `NEVER`
# sent before don't need to be recorded, as we'll handle them correctly when
# they come into range for the first time.
rooms_no_receipts = [
room_id
for room_id, room_status in previous_connection_state.receipts._statuses.items()
if room_status.status == HaveSentRoomFlag.LIVE
and room_id not in relevant_room_ids
]
changed_rooms = await self.store.get_rooms_with_receipts_between(
rooms_no_receipts,
from_key=from_token.stream_token.receipt_key,
to_key=to_token.receipt_key,
)
new_connection_state.receipts.record_unsent_rooms(
changed_rooms, from_token.stream_token.receipt_key
)
return SlidingSyncResult.Extensions.ReceiptsExtension(
room_id_to_receipt_map=room_id_to_receipt_map,
)
@@ -3009,9 +3236,15 @@ class HaveSentRoomFlag(Enum):
LIVE = 3
T = TypeVar("T")
@attr.s(auto_attribs=True, slots=True, frozen=True)
class HaveSentRoom:
"""Whether we have sent the room down a sliding sync connection.
class HaveSentRoom(Generic[T]):
"""Whether we have sent the room data down a sliding sync connection.
We are generic over the type of token used, e.g. `RoomStreamToken` or
`MultiWriterStreamToken`.
Attributes:
status: Flag for whether we have or haven't sent down the room
@@ -3022,16 +3255,167 @@ class HaveSentRoom:
"""
status: HaveSentRoomFlag
last_token: Optional[RoomStreamToken]
last_token: Optional[T]
@staticmethod
def previously(last_token: RoomStreamToken) -> "HaveSentRoom":
def live() -> "HaveSentRoom[T]":
return HaveSentRoom(HaveSentRoomFlag.LIVE, None)
@staticmethod
def previously(last_token: T) -> "HaveSentRoom[T]":
"""Constructor for `PREVIOUSLY` flag."""
return HaveSentRoom(HaveSentRoomFlag.PREVIOUSLY, last_token)
@staticmethod
def never() -> "HaveSentRoom[T]":
return HaveSentRoom(HaveSentRoomFlag.NEVER, None)
HAVE_SENT_ROOM_NEVER = HaveSentRoom(HaveSentRoomFlag.NEVER, None)
HAVE_SENT_ROOM_LIVE = HaveSentRoom(HaveSentRoomFlag.LIVE, None)
@attr.s(auto_attribs=True, slots=True, frozen=True)
class RoomStatusMap(Generic[T]):
"""For a given stream, e.g. events, records what we have or have not sent
down for that stream in a given room."""
# `room_id` -> `HaveSentRoom`
_statuses: Mapping[str, HaveSentRoom[T]] = attr.Factory(dict)
def have_sent_room(self, room_id: str) -> HaveSentRoom[T]:
"""Return whether we have previously sent the room down"""
return self._statuses.get(room_id, HaveSentRoom.never())
def get_mutable(self) -> "MutableRoomStatusMap[T]":
"""Get a mutable copy of this state."""
return MutableRoomStatusMap(
statuses=self._statuses,
)
def copy(self) -> "RoomStatusMap[T]":
"""Make a copy of the class. Useful for converting from a mutable to
immutable version."""
return RoomStatusMap(statuses=dict(self._statuses))
class MutableRoomStatusMap(RoomStatusMap[T]):
"""A mutable version of `RoomStatusMap`"""
# We use a ChainMap here so that we can easily track what has been updated
# and what hasn't. Note that when we persist the per-connection state this
# will get flattened to a normal dict (via calling `.copy()`)
_statuses: typing.ChainMap[str, HaveSentRoom[T]]
def __init__(
self,
statuses: Mapping[str, HaveSentRoom[T]],
) -> None:
# ChainMap requires a mutable mapping, but we're not actually going to
# mutate it.
statuses = cast(MutableMapping, statuses)
super().__init__(
statuses=ChainMap({}, statuses),
)
def get_updates(self) -> Mapping[str, HaveSentRoom[T]]:
"""Return only the changes that were made"""
return self._statuses.maps[0]
def record_sent_rooms(self, room_ids: StrCollection) -> None:
"""Record that we have sent these rooms in the response"""
for room_id in room_ids:
current_status = self._statuses.get(room_id, HaveSentRoom.never())
if current_status.status == HaveSentRoomFlag.LIVE:
continue
self._statuses[room_id] = HaveSentRoom.live()
def record_unsent_rooms(self, room_ids: StrCollection, from_token: T) -> None:
"""Record that we have not sent these rooms in the response, but there
have been updates.
"""
# Whether we add/update the entries for unsent rooms depends on the
# existing entry:
# - LIVE: We have previously sent down everything up to
# `last_room_token`, so we update the entry to be `PREVIOUSLY` with
# `last_room_token`.
# - PREVIOUSLY: We have previously sent down everything up to *a*
# given token, so we don't need to update the entry.
# - NEVER: We have never previously sent down the room, and we haven't
# sent anything down this time either so we leave it as NEVER.
for room_id in room_ids:
current_status = self._statuses.get(room_id, HaveSentRoom.never())
if current_status.status != HaveSentRoomFlag.LIVE:
continue
self._statuses[room_id] = HaveSentRoom.previously(from_token)
@attr.s(auto_attribs=True)
class PerConnectionState:
"""The per-connection state. A snapshot of what we've sent down the
connection before.
Currently, we track whether we've sent down various aspects of a given room
before.
We use the `rooms` field to store the position in the events stream for each
room that we've previously sent to the client. On the next request
that includes the room, we can then send only what's changed since that
recorded position.
Same goes for the `receipts` field so we only need to send the new receipts
since the last time you made a sync request.
Attributes:
rooms: The status of each room for the events stream.
receipts: The status of each room for the receipts stream.
room_configs: Map from room_id to the `RoomSyncConfig` of all
rooms that we have previously sent down.
"""
rooms: RoomStatusMap[RoomStreamToken] = attr.Factory(RoomStatusMap)
receipts: RoomStatusMap[MultiWriterStreamToken] = attr.Factory(RoomStatusMap)
room_configs: Mapping[str, RoomSyncConfig] = attr.Factory(dict)
def get_mutable(self) -> "MutablePerConnectionState":
"""Get a mutable copy of this state."""
room_configs = cast(MutableMapping[str, RoomSyncConfig], self.room_configs)
return MutablePerConnectionState(
rooms=self.rooms.get_mutable(),
receipts=self.receipts.get_mutable(),
room_configs=ChainMap({}, room_configs),
)
def copy(self) -> "PerConnectionState":
return PerConnectionState(
rooms=self.rooms.copy(),
receipts=self.receipts.copy(),
room_configs=dict(self.room_configs),
)
@attr.s(auto_attribs=True)
class MutablePerConnectionState(PerConnectionState):
"""A mutable version of `PerConnectionState`"""
rooms: MutableRoomStatusMap[RoomStreamToken]
receipts: MutableRoomStatusMap[MultiWriterStreamToken]
room_configs: typing.ChainMap[str, RoomSyncConfig]
def has_updates(self) -> bool:
return (
bool(self.rooms.get_updates())
or bool(self.receipts.get_updates())
or bool(self.get_room_config_updates())
)
def get_room_config_updates(self) -> Mapping[str, RoomSyncConfig]:
"""Get updates to the room sync config"""
return self.room_configs.maps[0]
@attr.s(auto_attribs=True)
@@ -3063,9 +3447,9 @@ class SlidingSyncConnectionStore:
to mapping of room ID to `HaveSentRoom`.
"""
# `(user_id, conn_id)` -> `token` -> `room_id` -> `HaveSentRoom`
_connections: Dict[Tuple[str, str], Dict[int, Dict[str, HaveSentRoom]]] = (
attr.Factory(dict)
# `(user_id, conn_id)` -> `connection_position` -> `PerConnectionState`
_connections: Dict[Tuple[str, str], Dict[int, PerConnectionState]] = attr.Factory(
dict
)
async def is_valid_token(
@@ -3078,48 +3462,51 @@ class SlidingSyncConnectionStore:
conn_key = self._get_connection_key(sync_config)
return connection_token in self._connections.get(conn_key, {})
async def have_sent_room(
self, sync_config: SlidingSyncConfig, connection_token: int, room_id: str
) -> HaveSentRoom:
"""For the given user_id/conn_id/token, return whether we have
previously sent the room down
"""
conn_key = self._get_connection_key(sync_config)
sync_statuses = self._connections.setdefault(conn_key, {})
room_status = sync_statuses.get(connection_token, {}).get(
room_id, HAVE_SENT_ROOM_NEVER
)
return room_status
@trace
async def record_rooms(
async def get_per_connection_state(
self,
sync_config: SlidingSyncConfig,
from_token: Optional[SlidingSyncStreamToken],
*,
sent_room_ids: StrCollection,
unsent_room_ids: StrCollection,
) -> int:
"""Record which rooms we have/haven't sent down in a new response
) -> PerConnectionState:
"""Fetch the per-connection state for the token.
Attributes:
sync_config
from_token: The since token from the request, if any
sent_room_ids: The set of room IDs that we have sent down as
part of this request (only needs to be ones we didn't
previously sent down).
unsent_room_ids: The set of room IDs that have had updates
since the `from_token`, but which were not included in
this request
Raises:
SlidingSyncUnknownPosition if the connection_token is unknown
"""
if from_token is None:
return PerConnectionState()
connection_position = from_token.connection_position
if connection_position == 0:
# Initial sync (request without a `from_token`) starts at `0` so
# there is no existing per-connection state
return PerConnectionState()
conn_key = self._get_connection_key(sync_config)
sync_statuses = self._connections.get(conn_key, {})
connection_state = sync_statuses.get(connection_position)
if connection_state is None:
raise SlidingSyncUnknownPosition()
return connection_state
@trace
async def record_new_state(
self,
sync_config: SlidingSyncConfig,
from_token: Optional[SlidingSyncStreamToken],
new_connection_state: MutablePerConnectionState,
) -> int:
"""Record updated per-connection state, returning the connection
position associated with the new state.
If there are no changes to the state this may return the same token as
the existing per-connection state.
"""
prev_connection_token = 0
if from_token is not None:
prev_connection_token = from_token.connection_position
# If there are no changes then this is a noop.
if not sent_room_ids and not unsent_room_ids:
if not new_connection_state.has_updates():
return prev_connection_token
conn_key = self._get_connection_key(sync_config)
@@ -3130,42 +3517,9 @@ class SlidingSyncConnectionStore:
new_store_token = prev_connection_token + 1
sync_statuses.pop(new_store_token, None)
# Copy over and update the room mappings.
new_room_statuses = dict(sync_statuses.get(prev_connection_token, {}))
# Whether we have updated the `new_room_statuses`, if we don't by the
# end we can treat this as a noop.
have_updated = False
for room_id in sent_room_ids:
new_room_statuses[room_id] = HAVE_SENT_ROOM_LIVE
have_updated = True
# Whether we add/update the entries for unsent rooms depends on the
# existing entry:
# - LIVE: We have previously sent down everything up to
# `last_room_token`, so we update the entry to be `PREVIOUSLY` with
# `last_room_token`.
# - PREVIOUSLY: We have previously sent down everything up to *a*
# given token, so we don't need to update the entry.
# - NEVER: We have never previously sent down the room, and we haven't
# sent anything down this time either so we leave it as NEVER.
# Work out the new state for unsent rooms that were `LIVE`.
if from_token:
new_unsent_state = HaveSentRoom.previously(from_token.stream_token.room_key)
else:
new_unsent_state = HAVE_SENT_ROOM_NEVER
for room_id in unsent_room_ids:
prev_state = new_room_statuses.get(room_id)
if prev_state is not None and prev_state.status == HaveSentRoomFlag.LIVE:
new_room_statuses[room_id] = new_unsent_state
have_updated = True
if not have_updated:
return prev_connection_token
sync_statuses[new_store_token] = new_room_statuses
# We copy the `MutablePerConnectionState` so that the inner `ChainMap`s
# don't grow forever.
sync_statuses[new_store_token] = new_connection_state.copy()
return new_store_token
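The mutable classes above (`MutableRoomStatusMap`, `MutablePerConnectionState`) rely on `collections.ChainMap` to track updates: writes land in a fresh first map while the snapshot underneath is left untouched, so `get_updates()` is simply `maps[0]`. A small self-contained illustration of that idea, with plain strings standing in for the Synapse types:

from collections import ChainMap

snapshot = {"!room_a": "LIVE", "!room_b": "PREVIOUSLY"}
mutable = ChainMap({}, snapshot)   # empty dict layered over the snapshot

mutable["!room_c"] = "LIVE"        # new entry -> goes into the first map
mutable["!room_b"] = "LIVE"        # changed entry -> also recorded in the first map

print(mutable.maps[0])             # {'!room_c': 'LIVE', '!room_b': 'LIVE'} (the delta)
print(snapshot)                    # unchanged snapshot
print(dict(mutable))               # flattened view, as produced by `.copy()` above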


@@ -544,7 +544,7 @@ class MultipartFileConsumer:
Calculate the content length of the multipart response
in bytes.
"""
if not self.length:
if self.length is None:
return None
# calculate length of json field and content-type, disposition headers
json_field = json.dumps(self.json_field)
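This small guard change is the fix described in changelog.d/17570.bugfix: `0` is falsy in Python, so the old `if not self.length` check treated a known length of zero as "unknown" and bailed out. A minimal standalone illustration of the difference (not Synapse code; the header-overhead number is made up):

def content_length_buggy(length):
    if not length:        # wrongly triggers for length == 0
        return None
    return length + 100   # pretend this is the multipart header overhead

def content_length_fixed(length):
    if length is None:    # only an *unknown* length short-circuits
        return None
    return length + 100

print(content_length_buggy(0))  # None -- zero-length media looks "unknown"
print(content_length_fixed(0))  # 100  -- header overhead for an empty file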


@@ -1044,6 +1044,11 @@ class SlidingSyncRestServlet(RestServlet):
if room_result.initial:
serialized_rooms[room_id]["initial"] = room_result.initial
if room_result.unstable_expanded_timeline:
serialized_rooms[room_id][
"unstable_expanded_timeline"
] = room_result.unstable_expanded_timeline
# This will be omitted for invite/knock rooms with `stripped_state`
if (
room_result.required_state is not None


@@ -51,10 +51,12 @@ from synapse.types import (
JsonMapping,
MultiWriterStreamToken,
PersistedPosition,
StrCollection,
)
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.iterutils import batch_iter
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -550,6 +552,46 @@ class ReceiptsWorkerStore(SQLBaseStore):
return results
async def get_rooms_with_receipts_between(
self,
room_ids: StrCollection,
from_key: MultiWriterStreamToken,
to_key: MultiWriterStreamToken,
) -> StrCollection:
"""Given a set of room_ids, find out which ones (may) have receipts
between the two tokens (> `from_token` and <= `to_token`)."""
room_ids = self._receipts_stream_cache.get_entities_changed(
room_ids, from_key.stream
)
if not room_ids:
return []
def f(txn: LoggingTransaction, room_ids: StrCollection) -> StrCollection:
clause, args = make_in_list_sql_clause(
self.database_engine, "room_id", room_ids
)
sql = f"""
SELECT DISTINCT room_id FROM receipts_linearized
WHERE {clause} AND ? < stream_id AND stream_id <= ?
"""
args.append(from_key.stream)
args.append(to_key.get_max_stream_pos())
txn.execute(sql, args)
return [room_id for room_id, in txn]
results: List[str] = []
for batch in batch_iter(room_ids, 1000):
batch_result = await self.db_pool.runInteraction(
"get_rooms_with_receipts_between", f, batch
)
results.extend(batch_result)
return results
async def get_users_sent_receipts_between(
self, last_id: int, current_id: int
) -> List[str]:


@@ -171,6 +171,9 @@ class SlidingSyncResult:
their local state. When there is an update, servers MUST omit this flag
entirely and NOT send "initial":false as this is wasteful on bandwidth. The
absence of this flag means 'false'.
unstable_expanded_timeline: Flag which is set if we're returning more historic
events due to the timeline limit having increased. See "XXX: Odd behavior"
comment in `synapse.handlers.sliding_sync`.
required_state: The current state of the room
timeline: Latest events in the room. The last event is the most recent.
bundled_aggregations: A mapping of event ID to the bundled aggregations for
@@ -219,6 +222,7 @@ class SlidingSyncResult:
heroes: Optional[List[StrippedHero]]
is_dm: bool
initial: bool
unstable_expanded_timeline: bool
# Should be empty for invite/knock rooms with `stripped_state`
required_state: List[EventBase]
# Should be empty for invite/knock rooms with `stripped_state`
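For orientation, once the handler sets this flag, a room entry in the sliding sync response looks roughly like the sketch below when an already-synced room's `timeline_limit` is raised (see the `SlidingSyncRestServlet` hunk above). Illustrative shape only; the event bodies are made up and unrelated fields are omitted:

room_entry = {
    # "initial" is omitted: the room was already sent down this connection.
    "limited": True,
    # Present (and True) only because extra history is being sent after the
    # requested timeline_limit increased -- see "XXX: Odd behavior" above.
    "unstable_expanded_timeline": True,
    "timeline": [
        {"type": "m.room.message", "content": {"body": "older event, now included"}},
        {"type": "m.room.message", "content": {"body": "most recent event"}},
    ],
}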


@@ -677,3 +677,108 @@ class SlidingSyncReceiptsExtensionTestCase(SlidingSyncBase):
set(),
exact=True,
)
def test_receipts_incremental_sync_out_of_range(self) -> None:
"""Tests that we don't return read receipts for rooms that fall out of
range, but then do send all read receipts once they're back in range.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
self.helper.join(room_id1, user1_id, tok=user1_tok)
room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok)
self.helper.join(room_id2, user1_id, tok=user1_tok)
# Send a message and read receipt into room2
event_response = self.helper.send(room_id2, body="new event", tok=user2_tok)
room2_event_id = event_response["event_id"]
self.helper.send_read_receipt(room_id2, room2_event_id, tok=user1_tok)
# Now send a message into room1 so that it is at the top of the list
self.helper.send(room_id1, body="new event", tok=user2_tok)
# Make a SS request for only the top room.
sync_body = {
"lists": {
"main": {
"ranges": [[0, 0]],
"required_state": [],
"timeline_limit": 5,
}
},
"extensions": {
"receipts": {
"enabled": True,
}
},
}
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
# The receipt is in room2, but only room1 is returned, so we don't
# expect to get the receipt.
self.assertIncludes(
response_body["extensions"]["receipts"].get("rooms").keys(),
set(),
exact=True,
)
# Move room2 into range.
self.helper.send(room_id2, body="new event", tok=user2_tok)
response_body, from_token = self.do_sync(
sync_body, since=from_token, tok=user1_tok
)
# We expect to see the read receipt of room2, as that has the most
# recent update.
self.assertIncludes(
response_body["extensions"]["receipts"].get("rooms").keys(),
{room_id2},
exact=True,
)
receipt = response_body["extensions"]["receipts"]["rooms"][room_id2]
self.assertIncludes(
receipt["content"][room2_event_id][ReceiptTypes.READ].keys(),
{user1_id},
exact=True,
)
# Send a message into room1 to bump it to the top, but also send a
# receipt in room2
self.helper.send(room_id1, body="new event", tok=user2_tok)
self.helper.send_read_receipt(room_id2, room2_event_id, tok=user2_tok)
# We don't expect to see the new read receipt.
response_body, from_token = self.do_sync(
sync_body, since=from_token, tok=user1_tok
)
self.assertIncludes(
response_body["extensions"]["receipts"].get("rooms").keys(),
set(),
exact=True,
)
# But if we send a new message into room2, we expect to get the missing receipts
self.helper.send(room_id2, body="new event", tok=user2_tok)
response_body, from_token = self.do_sync(
sync_body, since=from_token, tok=user1_tok
)
self.assertIncludes(
response_body["extensions"]["receipts"].get("rooms").keys(),
{room_id2},
exact=True,
)
# We should only see the new receipt
receipt = response_body["extensions"]["receipts"]["rooms"][room_id2]
self.assertIncludes(
receipt["content"][room2_event_id][ReceiptTypes.READ].keys(),
{user2_id},
exact=True,
)


@@ -120,19 +120,26 @@ class SlidingSyncExtensionsTestCase(SlidingSyncBase):
"foo-list": {
"ranges": [[0, 1]],
"required_state": [],
"timeline_limit": 0,
# We set this to `1` because we're testing `receipts`, which
# interacts with the `timeline`. With receipts, when a room
# hasn't been sent down the connection before or it appears
# as `initial: true`, we only include receipts for events in
# the timeline to avoid bloating and blowing up the sync
# response as the number of users in the room increases.
# (this behavior is part of the spec)
"timeline_limit": 1,
},
# We expect this list range to include room5, room4, room3
"bar-list": {
"ranges": [[0, 2]],
"required_state": [],
"timeline_limit": 0,
"timeline_limit": 1,
},
},
"room_subscriptions": {
room_id1: {
"required_state": [],
"timeline_limit": 0,
"timeline_limit": 1,
}
},
}


@@ -17,6 +17,7 @@ from typing import List, Optional
from twisted.test.proto_helpers import MemoryReactor
import synapse.rest.admin
from synapse.api.constants import EventTypes
from synapse.rest.client import login, room, sync
from synapse.server import HomeServer
from synapse.types import StreamToken, StrSequence
@@ -573,3 +574,138 @@ class SlidingSyncRoomsTimelineTestCase(SlidingSyncBase):
# Nothing to see for this banned user in the room in the token range
self.assertIsNone(response_body["rooms"].get(room_id1))
def test_increasing_timeline_range_sends_more_messages(self) -> None:
"""
Test that increasing the timeline limit via room subscriptions sends the
room down with more messages in a limited sync.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [[EventTypes.Create, ""]],
"timeline_limit": 1,
}
}
}
message_events = []
for _ in range(10):
resp = self.helper.send(room_id1, "msg", tok=user1_tok)
message_events.append(resp["event_id"])
# Make the first Sliding Sync request
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
room_response = response_body["rooms"][room_id1]
self.assertEqual(room_response["initial"], True)
self.assertNotIn("unstable_expanded_timeline", room_response)
self.assertEqual(room_response["limited"], True)
# We only expect the last message at first
self._assertTimelineEqual(
room_id=room_id1,
actual_event_ids=[event["event_id"] for event in room_response["timeline"]],
expected_event_ids=message_events[-1:],
message=str(room_response["timeline"]),
)
# We also expect to get the create event state.
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
)
self._assertRequiredStateIncludes(
room_response["required_state"],
{state_map[(EventTypes.Create, "")]},
exact=True,
)
# Now do another request with a room subscription with an increased timeline limit
sync_body["room_subscriptions"] = {
room_id1: {
"required_state": [],
"timeline_limit": 10,
}
}
response_body, from_token = self.do_sync(
sync_body, since=from_token, tok=user1_tok
)
room_response = response_body["rooms"][room_id1]
self.assertNotIn("initial", room_response)
self.assertEqual(room_response["unstable_expanded_timeline"], True)
self.assertEqual(room_response["limited"], True)
# Now we expect all the messages
self._assertTimelineEqual(
room_id=room_id1,
actual_event_ids=[event["event_id"] for event in room_response["timeline"]],
expected_event_ids=message_events,
message=str(room_response["timeline"]),
)
# We don't expect to get the room create down, as nothing has changed.
self.assertNotIn("required_state", room_response)
# Decreasing the timeline limit shouldn't resend any events
sync_body["room_subscriptions"] = {
room_id1: {
"required_state": [],
"timeline_limit": 5,
}
}
event_response = self.helper.send(room_id1, "msg", tok=user1_tok)
latest_event_id = event_response["event_id"]
response_body, from_token = self.do_sync(
sync_body, since=from_token, tok=user1_tok
)
room_response = response_body["rooms"][room_id1]
self.assertNotIn("initial", room_response)
self.assertNotIn("unstable_expanded_timeline", room_response)
self.assertEqual(room_response["limited"], False)
self._assertTimelineEqual(
room_id=room_id1,
actual_event_ids=[event["event_id"] for event in room_response["timeline"]],
expected_event_ids=[latest_event_id],
message=str(room_response["timeline"]),
)
# Increasing the limit to what it was before also should not resend any
# events
sync_body["room_subscriptions"] = {
room_id1: {
"required_state": [],
"timeline_limit": 10,
}
}
event_response = self.helper.send(room_id1, "msg", tok=user1_tok)
latest_event_id = event_response["event_id"]
response_body, from_token = self.do_sync(
sync_body, since=from_token, tok=user1_tok
)
room_response = response_body["rooms"][room_id1]
self.assertNotIn("initial", room_response)
self.assertNotIn("unstable_expanded_timeline", room_response)
self.assertEqual(room_response["limited"], False)
self._assertTimelineEqual(
room_id=room_id1,
actual_event_ids=[event["event_id"] for event in room_response["timeline"]],
expected_event_ids=[latest_event_id],
message=str(room_response["timeline"]),
)


@@ -45,7 +45,7 @@ from typing_extensions import Literal
from twisted.test.proto_helpers import MemoryReactorClock
from twisted.web.server import Site
from synapse.api.constants import Membership
from synapse.api.constants import Membership, ReceiptTypes
from synapse.api.errors import Codes
from synapse.server import HomeServer
from synapse.types import JsonDict
@@ -944,3 +944,15 @@ class RestHelper:
assert len(p.links) == 1, "not exactly one link in confirmation page"
oauth_uri = p.links[0]
return oauth_uri
def send_read_receipt(self, room_id: str, event_id: str, *, tok: str) -> None:
"""Send a read receipt into the room at the given event"""
channel = make_request(
self.reactor,
self.site,
method="POST",
path=f"/rooms/{room_id}/receipt/{ReceiptTypes.READ}/{event_id}",
content={},
access_token=tok,
)
assert channel.code == HTTPStatus.OK, channel.text_body