1
0

Compare commits

...

7 Commits

Author SHA1 Message Date
Erik Johnston d10361dc7d Newsfile 2024-07-30 18:40:55 +01:00
Erik Johnston 9c2354b2b1 Force limited=true 2024-07-30 18:40:02 +01:00
Erik Johnston f17ff7cf0b Handle increases in timeline limit 2024-07-30 16:01:13 +01:00
Erik Johnston 394c25a7fe Remember previous timeline limit 2024-07-30 13:34:44 +01:00
Erik Johnston ce09ef058b Only mark as updated if entry has changed 2024-07-30 12:50:37 +01:00
Erik Johnston 9284cc0110 Add fast path if from_token is None 2024-07-30 12:49:25 +01:00
Erik Johnston 11f34920e7 Refactor to make LIVE non-static value 2024-07-30 12:46:21 +01:00
3 changed files with 176 additions and 29 deletions
+1
View File
@@ -0,0 +1 @@
Handle requests for more events in a room when using experimental sliding sync.
+69 -18
View File
@@ -653,6 +653,13 @@ class SlidingSyncHandler:
else:
assert_never(status.status)
if status.timeline_limit is not None and (
status.timeline_limit < relevant_room_map[room_id].timeline_limit
):
# If the timeline limit has increased we want to send down
# more historic events (even if nothing has since changed).
rooms_should_send.add(room_id)
# We only need to check for new events since any state changes
# will also come down as new events.
rooms_that_have_updates = self.store.get_rooms_that_might_have_updates(
@@ -697,6 +704,7 @@ class SlidingSyncHandler:
if has_lists or has_room_subscriptions:
connection_position = await self.connection_store.record_rooms(
sync_config=sync_config,
room_configs=relevant_room_map,
from_token=from_token,
sent_room_ids=relevant_room_map.keys(),
# TODO: We need to calculate which rooms have had updates since the `from_token` but were not included in the `sent_room_ids`
@@ -1475,7 +1483,12 @@ class SlidingSyncHandler:
# - When users `newly_joined`
# - For an incremental sync where we haven't sent it down this
# connection before
#
# We also decide if we should ignore the timeline bound or not. This is
# to handle the case where the client has requested more historical
# messages in the room by increasing the timeline limit.
from_bound = None
ignore_timeline_bound = False
initial = True
if from_token and not room_membership_for_user_at_to_token.newly_joined:
room_status = await self.connection_store.have_sent_room(
@@ -1483,6 +1496,7 @@ class SlidingSyncHandler:
connection_token=from_token.connection_position,
room_id=room_id,
)
if room_status.status == HaveSentRoomFlag.LIVE:
from_bound = from_token.stream_token.room_key
initial = False
@@ -1496,9 +1510,24 @@ class SlidingSyncHandler:
else:
assert_never(room_status.status)
if room_status.timeline_limit is not None and (
room_status.timeline_limit < room_sync_config.timeline_limit
):
# If the timeline limit has been increased since previous
# requests then we treat it as request for more events. We do
# this by sending down a `limited` sync, ignoring the from
# bound.
ignore_timeline_bound = True
log_kv({"sliding_sync.room_status": room_status})
log_kv({"sliding_sync.from_bound": from_bound, "sliding_sync.initial": initial})
log_kv(
{
"sliding_sync.from_bound": from_bound,
"sliding_sync.ignore_timeline_bound": ignore_timeline_bound,
"sliding_sync.initial": initial,
}
)
# Assemble the list of timeline events
#
@@ -1541,7 +1570,7 @@ class SlidingSyncHandler:
# (from newer to older events) starting at to_bound.
# This ensures we fill the `limit` with the newest events first,
from_key=to_bound,
to_key=from_bound,
to_key=from_bound if not ignore_timeline_bound else None,
direction=Direction.BACKWARDS,
# We add one so we can determine if there are enough events to saturate
# the limit or not (see `limited`)
@@ -1566,6 +1595,12 @@ class SlidingSyncHandler:
stream=timeline_events[0].internal_metadata.stream_ordering - 1
)
if ignore_timeline_bound:
# If we're ignoring the timeline bound we *must* set limited to
# true, as otherwise the client will append the received events
# to the timeline, rather than replacing it.
limited = True
# Make sure we don't expose any events that the client shouldn't see
timeline_events = await filter_events_for_client(
self.storage_controllers,
@@ -2232,19 +2267,26 @@ class HaveSentRoom:
contains the last stream token of the last updates we sent down
the room, i.e. we still need to send everything since then to the
client.
timeline_limit: The timeline limit config for the room, if LIVE or
PREVIOUSLY. This is used to track if the client has increased
the timeline limit to request more events.
"""
status: HaveSentRoomFlag
last_token: Optional[RoomStreamToken]
timeline_limit: Optional[int]
@staticmethod
def previously(last_token: RoomStreamToken) -> "HaveSentRoom":
def live(timeline_limit: int) -> "HaveSentRoom":
return HaveSentRoom(HaveSentRoomFlag.LIVE, None, timeline_limit)
@staticmethod
def previously(last_token: RoomStreamToken, timeline_limit: int) -> "HaveSentRoom":
"""Constructor for `PREVIOUSLY` flag."""
return HaveSentRoom(HaveSentRoomFlag.PREVIOUSLY, last_token)
return HaveSentRoom(HaveSentRoomFlag.PREVIOUSLY, last_token, timeline_limit)
HAVE_SENT_ROOM_NEVER = HaveSentRoom(HaveSentRoomFlag.NEVER, None)
HAVE_SENT_ROOM_LIVE = HaveSentRoom(HaveSentRoomFlag.LIVE, None)
HAVE_SENT_ROOM_NEVER = HaveSentRoom(HaveSentRoomFlag.NEVER, None, None)
@attr.s(auto_attribs=True)
@@ -2299,6 +2341,7 @@ class SlidingSyncConnectionStore:
async def record_rooms(
self,
sync_config: SlidingSyncConfig,
room_configs: Dict[str, RoomSyncConfig],
from_token: Optional[SlidingSyncStreamToken],
*,
sent_room_ids: StrCollection,
@@ -2339,8 +2382,12 @@ class SlidingSyncConnectionStore:
# end we can treat this as a noop.
have_updated = False
for room_id in sent_room_ids:
new_room_statuses[room_id] = HAVE_SENT_ROOM_LIVE
have_updated = True
prev_state = new_room_statuses.get(room_id)
new_room_statuses[room_id] = HaveSentRoom.live(
room_configs[room_id].timeline_limit
)
if prev_state != new_room_statuses[room_id]:
have_updated = True
# Whether we add/update the entries for unsent rooms depends on the
# existing entry:
@@ -2351,18 +2398,22 @@ class SlidingSyncConnectionStore:
# given token, so we don't need to update the entry.
# - NEVER: We have never previously sent down the room, and we haven't
# sent anything down this time either so we leave it as NEVER.
#
# We only need to do this if `from_token` is not None, as if it is then
# we know that there are no existing entires.
# Work out the new state for unsent rooms that were `LIVE`.
if from_token:
new_unsent_state = HaveSentRoom.previously(from_token.stream_token.room_key)
else:
new_unsent_state = HAVE_SENT_ROOM_NEVER
for room_id in unsent_room_ids:
prev_state = new_room_statuses.get(room_id)
if prev_state is not None and prev_state.status == HaveSentRoomFlag.LIVE:
new_room_statuses[room_id] = new_unsent_state
have_updated = True
for room_id in unsent_room_ids:
prev_state = new_room_statuses.get(room_id)
if (
prev_state is not None
and prev_state.status == HaveSentRoomFlag.LIVE
):
new_room_statuses[room_id] = HaveSentRoom.previously(
from_token.stream_token.room_key,
room_configs[room_id].timeline_limit,
)
have_updated = True
if not have_updated:
return prev_connection_token
+106 -11
View File
@@ -39,7 +39,8 @@ from synapse.api.constants import (
)
from synapse.api.room_versions import RoomVersions
from synapse.events import EventBase
from synapse.handlers.sliding_sync import StateValues
from synapse.handlers.sliding_sync import RoomSyncConfig, StateValues
from synapse.http.servlet import validate_json_object
from synapse.rest.client import (
devices,
knock,
@@ -53,6 +54,7 @@ from synapse.rest.client import (
from synapse.server import HomeServer
from synapse.types import (
JsonDict,
Requester,
RoomStreamToken,
SlidingSyncStreamToken,
StreamKeyType,
@@ -60,6 +62,7 @@ from synapse.types import (
UserID,
)
from synapse.types.handlers import SlidingSyncConfig
from synapse.types.rest.client import SlidingSyncBody
from synapse.util import Clock
from synapse.util.stringutils import random_string
@@ -1357,6 +1360,22 @@ class SlidingSyncBase(unittest.HomeserverTestCase):
"Expected `notifier.wait_for_events(...)` to be triggered"
)
def make_sync_config(
self, user: UserID, requester: Requester, content: JsonDict
) -> SlidingSyncConfig:
"""Helper function to turn a dict sync body to a sync config"""
body = validate_json_object(content, SlidingSyncBody)
sync_config = SlidingSyncConfig(
user=user,
requester=requester,
conn_id=body.conn_id,
lists=body.lists,
room_subscriptions=body.room_subscriptions,
extensions=body.extensions,
)
return sync_config
class SlidingSyncTestCase(SlidingSyncBase):
"""
@@ -4538,7 +4557,6 @@ class SlidingSyncTestCase(SlidingSyncBase):
self.helper.send(room_id1, "msg", tok=user1_tok)
timeline_limit = 5
conn_id = "conn_id"
sync_body = {
"lists": {
"foo-list": {
@@ -4584,19 +4602,22 @@ class SlidingSyncTestCase(SlidingSyncBase):
requester = self.get_success(
self.hs.get_auth().get_user_by_access_token(user1_tok)
)
sync_config = SlidingSyncConfig(
user=requester.user,
requester=requester,
conn_id=conn_id,
sync_config = self.make_sync_config(
user=requester.user, requester=requester, content=sync_body
)
parsed_initial_from_token = self.get_success(
SlidingSyncStreamToken.from_string(self.store, initial_from_token)
)
assert sync_config.lists
room_configs = {
room_id1: RoomSyncConfig.from_room_config(sync_config.lists["foo-list"])
}
connection_position = self.get_success(
sliding_sync_handler.connection_store.record_rooms(
sync_config,
parsed_initial_from_token,
room_configs=room_configs,
from_token=parsed_initial_from_token,
sent_room_ids=[],
unsent_room_ids=[room_id1],
)
@@ -4646,7 +4667,6 @@ class SlidingSyncTestCase(SlidingSyncBase):
self.helper.send(room_id1, "msg", tok=user1_tok)
conn_id = "conn_id"
sync_body = {
"lists": {
"foo-list": {
@@ -4693,19 +4713,24 @@ class SlidingSyncTestCase(SlidingSyncBase):
requester = self.get_success(
self.hs.get_auth().get_user_by_access_token(user1_tok)
)
sync_config = SlidingSyncConfig(
sync_config = self.make_sync_config(
user=requester.user,
requester=requester,
conn_id=conn_id,
content=sync_body,
)
parsed_initial_from_token = self.get_success(
SlidingSyncStreamToken.from_string(self.store, initial_from_token)
)
assert sync_config.lists
room_configs = {
room_id1: RoomSyncConfig.from_room_config(sync_config.lists["foo-list"])
}
connection_position = self.get_success(
sliding_sync_handler.connection_store.record_rooms(
sync_config,
parsed_initial_from_token,
room_configs=room_configs,
from_token=parsed_initial_from_token,
sent_room_ids=[],
unsent_room_ids=[room_id1],
)
@@ -4915,6 +4940,76 @@ class SlidingSyncTestCase(SlidingSyncBase):
response_body, _ = self.do_sync(sync_body, tok=user1_tok)
self.assertEqual(response_body["rooms"][room_id1]["initial"], True)
def test_increasing_timeline_range_sends_more_messages(self) -> None:
"""
Test that increasing the timeline limit via room subscriptions sends the
room down with more messages in a limited sync.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [[EventTypes.Create, ""]],
"timeline_limit": 1,
}
}
}
message_events = []
for _ in range(10):
resp = self.helper.send(room_id1, "msg", tok=user1_tok)
message_events.append(resp["event_id"])
# Make the first Sliding Sync request
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
room_response = response_body["rooms"][room_id1]
self.assertEqual(room_response["initial"], True)
self.assertEqual(room_response["limited"], True)
# We only expect the last message at first
self.assertEqual(
[event["event_id"] for event in room_response["timeline"]],
message_events[-1:],
room_response["timeline"],
)
# We also expect to get the create event state.
self.assertEqual(
[event["type"] for event in room_response["required_state"]],
[EventTypes.Create],
)
# Now do another request with a room subscription with an increased timeline limit
sync_body["room_subscriptions"] = {
room_id1: {
"required_state": [],
"timeline_limit": 10,
}
}
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
room_response = response_body["rooms"][room_id1]
self.assertNotIn("initial", room_response)
self.assertEqual(room_response["limited"], True)
# Now we expect all the messages
self.assertEqual(
[event["event_id"] for event in room_response["timeline"]],
message_events,
room_response["timeline"],
)
# We don't expect to get the room create down, as nothing has changed.
self.assertNotIn("required_state", room_response)
class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase):
"""Tests for the to-device sliding sync extension"""