Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d10361dc7d | |||
| 9c2354b2b1 | |||
| f17ff7cf0b | |||
| 394c25a7fe | |||
| ce09ef058b | |||
| 9284cc0110 | |||
| 11f34920e7 |
@@ -0,0 +1 @@
|
||||
Handle requests for more events in a room when using experimental sliding sync.
|
||||
@@ -653,6 +653,13 @@ class SlidingSyncHandler:
|
||||
else:
|
||||
assert_never(status.status)
|
||||
|
||||
if status.timeline_limit is not None and (
|
||||
status.timeline_limit < relevant_room_map[room_id].timeline_limit
|
||||
):
|
||||
# If the timeline limit has increased we want to send down
|
||||
# more historic events (even if nothing has since changed).
|
||||
rooms_should_send.add(room_id)
|
||||
|
||||
# We only need to check for new events since any state changes
|
||||
# will also come down as new events.
|
||||
rooms_that_have_updates = self.store.get_rooms_that_might_have_updates(
|
||||
@@ -697,6 +704,7 @@ class SlidingSyncHandler:
|
||||
if has_lists or has_room_subscriptions:
|
||||
connection_position = await self.connection_store.record_rooms(
|
||||
sync_config=sync_config,
|
||||
room_configs=relevant_room_map,
|
||||
from_token=from_token,
|
||||
sent_room_ids=relevant_room_map.keys(),
|
||||
# TODO: We need to calculate which rooms have had updates since the `from_token` but were not included in the `sent_room_ids`
|
||||
@@ -1475,7 +1483,12 @@ class SlidingSyncHandler:
|
||||
# - When users `newly_joined`
|
||||
# - For an incremental sync where we haven't sent it down this
|
||||
# connection before
|
||||
#
|
||||
# We also decide if we should ignore the timeline bound or not. This is
|
||||
# to handle the case where the client has requested more historical
|
||||
# messages in the room by increasing the timeline limit.
|
||||
from_bound = None
|
||||
ignore_timeline_bound = False
|
||||
initial = True
|
||||
if from_token and not room_membership_for_user_at_to_token.newly_joined:
|
||||
room_status = await self.connection_store.have_sent_room(
|
||||
@@ -1483,6 +1496,7 @@ class SlidingSyncHandler:
|
||||
connection_token=from_token.connection_position,
|
||||
room_id=room_id,
|
||||
)
|
||||
|
||||
if room_status.status == HaveSentRoomFlag.LIVE:
|
||||
from_bound = from_token.stream_token.room_key
|
||||
initial = False
|
||||
@@ -1496,9 +1510,24 @@ class SlidingSyncHandler:
|
||||
else:
|
||||
assert_never(room_status.status)
|
||||
|
||||
if room_status.timeline_limit is not None and (
|
||||
room_status.timeline_limit < room_sync_config.timeline_limit
|
||||
):
|
||||
# If the timeline limit has been increased since previous
|
||||
# requests then we treat it as request for more events. We do
|
||||
# this by sending down a `limited` sync, ignoring the from
|
||||
# bound.
|
||||
ignore_timeline_bound = True
|
||||
|
||||
log_kv({"sliding_sync.room_status": room_status})
|
||||
|
||||
log_kv({"sliding_sync.from_bound": from_bound, "sliding_sync.initial": initial})
|
||||
log_kv(
|
||||
{
|
||||
"sliding_sync.from_bound": from_bound,
|
||||
"sliding_sync.ignore_timeline_bound": ignore_timeline_bound,
|
||||
"sliding_sync.initial": initial,
|
||||
}
|
||||
)
|
||||
|
||||
# Assemble the list of timeline events
|
||||
#
|
||||
@@ -1541,7 +1570,7 @@ class SlidingSyncHandler:
|
||||
# (from newer to older events) starting at to_bound.
|
||||
# This ensures we fill the `limit` with the newest events first,
|
||||
from_key=to_bound,
|
||||
to_key=from_bound,
|
||||
to_key=from_bound if not ignore_timeline_bound else None,
|
||||
direction=Direction.BACKWARDS,
|
||||
# We add one so we can determine if there are enough events to saturate
|
||||
# the limit or not (see `limited`)
|
||||
@@ -1566,6 +1595,12 @@ class SlidingSyncHandler:
|
||||
stream=timeline_events[0].internal_metadata.stream_ordering - 1
|
||||
)
|
||||
|
||||
if ignore_timeline_bound:
|
||||
# If we're ignoring the timeline bound we *must* set limited to
|
||||
# true, as otherwise the client will append the received events
|
||||
# to the timeline, rather than replacing it.
|
||||
limited = True
|
||||
|
||||
# Make sure we don't expose any events that the client shouldn't see
|
||||
timeline_events = await filter_events_for_client(
|
||||
self.storage_controllers,
|
||||
@@ -2232,19 +2267,26 @@ class HaveSentRoom:
|
||||
contains the last stream token of the last updates we sent down
|
||||
the room, i.e. we still need to send everything since then to the
|
||||
client.
|
||||
timeline_limit: The timeline limit config for the room, if LIVE or
|
||||
PREVIOUSLY. This is used to track if the client has increased
|
||||
the timeline limit to request more events.
|
||||
"""
|
||||
|
||||
status: HaveSentRoomFlag
|
||||
last_token: Optional[RoomStreamToken]
|
||||
timeline_limit: Optional[int]
|
||||
|
||||
@staticmethod
|
||||
def previously(last_token: RoomStreamToken) -> "HaveSentRoom":
|
||||
def live(timeline_limit: int) -> "HaveSentRoom":
|
||||
return HaveSentRoom(HaveSentRoomFlag.LIVE, None, timeline_limit)
|
||||
|
||||
@staticmethod
|
||||
def previously(last_token: RoomStreamToken, timeline_limit: int) -> "HaveSentRoom":
|
||||
"""Constructor for `PREVIOUSLY` flag."""
|
||||
return HaveSentRoom(HaveSentRoomFlag.PREVIOUSLY, last_token)
|
||||
return HaveSentRoom(HaveSentRoomFlag.PREVIOUSLY, last_token, timeline_limit)
|
||||
|
||||
|
||||
HAVE_SENT_ROOM_NEVER = HaveSentRoom(HaveSentRoomFlag.NEVER, None)
|
||||
HAVE_SENT_ROOM_LIVE = HaveSentRoom(HaveSentRoomFlag.LIVE, None)
|
||||
HAVE_SENT_ROOM_NEVER = HaveSentRoom(HaveSentRoomFlag.NEVER, None, None)
|
||||
|
||||
|
||||
@attr.s(auto_attribs=True)
|
||||
@@ -2299,6 +2341,7 @@ class SlidingSyncConnectionStore:
|
||||
async def record_rooms(
|
||||
self,
|
||||
sync_config: SlidingSyncConfig,
|
||||
room_configs: Dict[str, RoomSyncConfig],
|
||||
from_token: Optional[SlidingSyncStreamToken],
|
||||
*,
|
||||
sent_room_ids: StrCollection,
|
||||
@@ -2339,8 +2382,12 @@ class SlidingSyncConnectionStore:
|
||||
# end we can treat this as a noop.
|
||||
have_updated = False
|
||||
for room_id in sent_room_ids:
|
||||
new_room_statuses[room_id] = HAVE_SENT_ROOM_LIVE
|
||||
have_updated = True
|
||||
prev_state = new_room_statuses.get(room_id)
|
||||
new_room_statuses[room_id] = HaveSentRoom.live(
|
||||
room_configs[room_id].timeline_limit
|
||||
)
|
||||
if prev_state != new_room_statuses[room_id]:
|
||||
have_updated = True
|
||||
|
||||
# Whether we add/update the entries for unsent rooms depends on the
|
||||
# existing entry:
|
||||
@@ -2351,18 +2398,22 @@ class SlidingSyncConnectionStore:
|
||||
# given token, so we don't need to update the entry.
|
||||
# - NEVER: We have never previously sent down the room, and we haven't
|
||||
# sent anything down this time either so we leave it as NEVER.
|
||||
#
|
||||
# We only need to do this if `from_token` is not None, as if it is then
|
||||
# we know that there are no existing entires.
|
||||
|
||||
# Work out the new state for unsent rooms that were `LIVE`.
|
||||
if from_token:
|
||||
new_unsent_state = HaveSentRoom.previously(from_token.stream_token.room_key)
|
||||
else:
|
||||
new_unsent_state = HAVE_SENT_ROOM_NEVER
|
||||
|
||||
for room_id in unsent_room_ids:
|
||||
prev_state = new_room_statuses.get(room_id)
|
||||
if prev_state is not None and prev_state.status == HaveSentRoomFlag.LIVE:
|
||||
new_room_statuses[room_id] = new_unsent_state
|
||||
have_updated = True
|
||||
for room_id in unsent_room_ids:
|
||||
prev_state = new_room_statuses.get(room_id)
|
||||
if (
|
||||
prev_state is not None
|
||||
and prev_state.status == HaveSentRoomFlag.LIVE
|
||||
):
|
||||
new_room_statuses[room_id] = HaveSentRoom.previously(
|
||||
from_token.stream_token.room_key,
|
||||
room_configs[room_id].timeline_limit,
|
||||
)
|
||||
have_updated = True
|
||||
|
||||
if not have_updated:
|
||||
return prev_connection_token
|
||||
|
||||
+106
-11
@@ -39,7 +39,8 @@ from synapse.api.constants import (
|
||||
)
|
||||
from synapse.api.room_versions import RoomVersions
|
||||
from synapse.events import EventBase
|
||||
from synapse.handlers.sliding_sync import StateValues
|
||||
from synapse.handlers.sliding_sync import RoomSyncConfig, StateValues
|
||||
from synapse.http.servlet import validate_json_object
|
||||
from synapse.rest.client import (
|
||||
devices,
|
||||
knock,
|
||||
@@ -53,6 +54,7 @@ from synapse.rest.client import (
|
||||
from synapse.server import HomeServer
|
||||
from synapse.types import (
|
||||
JsonDict,
|
||||
Requester,
|
||||
RoomStreamToken,
|
||||
SlidingSyncStreamToken,
|
||||
StreamKeyType,
|
||||
@@ -60,6 +62,7 @@ from synapse.types import (
|
||||
UserID,
|
||||
)
|
||||
from synapse.types.handlers import SlidingSyncConfig
|
||||
from synapse.types.rest.client import SlidingSyncBody
|
||||
from synapse.util import Clock
|
||||
from synapse.util.stringutils import random_string
|
||||
|
||||
@@ -1357,6 +1360,22 @@ class SlidingSyncBase(unittest.HomeserverTestCase):
|
||||
"Expected `notifier.wait_for_events(...)` to be triggered"
|
||||
)
|
||||
|
||||
def make_sync_config(
|
||||
self, user: UserID, requester: Requester, content: JsonDict
|
||||
) -> SlidingSyncConfig:
|
||||
"""Helper function to turn a dict sync body to a sync config"""
|
||||
body = validate_json_object(content, SlidingSyncBody)
|
||||
|
||||
sync_config = SlidingSyncConfig(
|
||||
user=user,
|
||||
requester=requester,
|
||||
conn_id=body.conn_id,
|
||||
lists=body.lists,
|
||||
room_subscriptions=body.room_subscriptions,
|
||||
extensions=body.extensions,
|
||||
)
|
||||
return sync_config
|
||||
|
||||
|
||||
class SlidingSyncTestCase(SlidingSyncBase):
|
||||
"""
|
||||
@@ -4538,7 +4557,6 @@ class SlidingSyncTestCase(SlidingSyncBase):
|
||||
self.helper.send(room_id1, "msg", tok=user1_tok)
|
||||
|
||||
timeline_limit = 5
|
||||
conn_id = "conn_id"
|
||||
sync_body = {
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
@@ -4584,19 +4602,22 @@ class SlidingSyncTestCase(SlidingSyncBase):
|
||||
requester = self.get_success(
|
||||
self.hs.get_auth().get_user_by_access_token(user1_tok)
|
||||
)
|
||||
sync_config = SlidingSyncConfig(
|
||||
user=requester.user,
|
||||
requester=requester,
|
||||
conn_id=conn_id,
|
||||
sync_config = self.make_sync_config(
|
||||
user=requester.user, requester=requester, content=sync_body
|
||||
)
|
||||
|
||||
parsed_initial_from_token = self.get_success(
|
||||
SlidingSyncStreamToken.from_string(self.store, initial_from_token)
|
||||
)
|
||||
assert sync_config.lists
|
||||
room_configs = {
|
||||
room_id1: RoomSyncConfig.from_room_config(sync_config.lists["foo-list"])
|
||||
}
|
||||
connection_position = self.get_success(
|
||||
sliding_sync_handler.connection_store.record_rooms(
|
||||
sync_config,
|
||||
parsed_initial_from_token,
|
||||
room_configs=room_configs,
|
||||
from_token=parsed_initial_from_token,
|
||||
sent_room_ids=[],
|
||||
unsent_room_ids=[room_id1],
|
||||
)
|
||||
@@ -4646,7 +4667,6 @@ class SlidingSyncTestCase(SlidingSyncBase):
|
||||
|
||||
self.helper.send(room_id1, "msg", tok=user1_tok)
|
||||
|
||||
conn_id = "conn_id"
|
||||
sync_body = {
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
@@ -4693,19 +4713,24 @@ class SlidingSyncTestCase(SlidingSyncBase):
|
||||
requester = self.get_success(
|
||||
self.hs.get_auth().get_user_by_access_token(user1_tok)
|
||||
)
|
||||
sync_config = SlidingSyncConfig(
|
||||
sync_config = self.make_sync_config(
|
||||
user=requester.user,
|
||||
requester=requester,
|
||||
conn_id=conn_id,
|
||||
content=sync_body,
|
||||
)
|
||||
|
||||
parsed_initial_from_token = self.get_success(
|
||||
SlidingSyncStreamToken.from_string(self.store, initial_from_token)
|
||||
)
|
||||
assert sync_config.lists
|
||||
room_configs = {
|
||||
room_id1: RoomSyncConfig.from_room_config(sync_config.lists["foo-list"])
|
||||
}
|
||||
connection_position = self.get_success(
|
||||
sliding_sync_handler.connection_store.record_rooms(
|
||||
sync_config,
|
||||
parsed_initial_from_token,
|
||||
room_configs=room_configs,
|
||||
from_token=parsed_initial_from_token,
|
||||
sent_room_ids=[],
|
||||
unsent_room_ids=[room_id1],
|
||||
)
|
||||
@@ -4915,6 +4940,76 @@ class SlidingSyncTestCase(SlidingSyncBase):
|
||||
response_body, _ = self.do_sync(sync_body, tok=user1_tok)
|
||||
self.assertEqual(response_body["rooms"][room_id1]["initial"], True)
|
||||
|
||||
def test_increasing_timeline_range_sends_more_messages(self) -> None:
|
||||
"""
|
||||
Test that increasing the timeline limit via room subscriptions sends the
|
||||
room down with more messages in a limited sync.
|
||||
"""
|
||||
|
||||
user1_id = self.register_user("user1", "pass")
|
||||
user1_tok = self.login(user1_id, "pass")
|
||||
|
||||
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
|
||||
|
||||
sync_body = {
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [[0, 1]],
|
||||
"required_state": [[EventTypes.Create, ""]],
|
||||
"timeline_limit": 1,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
message_events = []
|
||||
for _ in range(10):
|
||||
resp = self.helper.send(room_id1, "msg", tok=user1_tok)
|
||||
message_events.append(resp["event_id"])
|
||||
|
||||
# Make the first Sliding Sync request
|
||||
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
|
||||
room_response = response_body["rooms"][room_id1]
|
||||
|
||||
self.assertEqual(room_response["initial"], True)
|
||||
self.assertEqual(room_response["limited"], True)
|
||||
|
||||
# We only expect the last message at first
|
||||
self.assertEqual(
|
||||
[event["event_id"] for event in room_response["timeline"]],
|
||||
message_events[-1:],
|
||||
room_response["timeline"],
|
||||
)
|
||||
|
||||
# We also expect to get the create event state.
|
||||
self.assertEqual(
|
||||
[event["type"] for event in room_response["required_state"]],
|
||||
[EventTypes.Create],
|
||||
)
|
||||
|
||||
# Now do another request with a room subscription with an increased timeline limit
|
||||
sync_body["room_subscriptions"] = {
|
||||
room_id1: {
|
||||
"required_state": [],
|
||||
"timeline_limit": 10,
|
||||
}
|
||||
}
|
||||
|
||||
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
|
||||
room_response = response_body["rooms"][room_id1]
|
||||
|
||||
self.assertNotIn("initial", room_response)
|
||||
self.assertEqual(room_response["limited"], True)
|
||||
|
||||
# Now we expect all the messages
|
||||
self.assertEqual(
|
||||
[event["event_id"] for event in room_response["timeline"]],
|
||||
message_events,
|
||||
room_response["timeline"],
|
||||
)
|
||||
|
||||
# We don't expect to get the room create down, as nothing has changed.
|
||||
self.assertNotIn("required_state", room_response)
|
||||
|
||||
|
||||
class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase):
|
||||
"""Tests for the to-device sliding sync extension"""
|
||||
|
||||
Reference in New Issue
Block a user