Compare commits
6 Commits
v1.140.0rc
...
mv/sync-to
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7af1d98df0 | ||
|
|
4cc2f3e990 | ||
|
|
ea8cf6edcf | ||
|
|
5de1f166f9 | ||
|
|
f14bb8768a | ||
|
|
dfca02b6de |
1
changelog.d/13412.feature
Normal file
1
changelog.d/13412.feature
Normal file
@@ -0,0 +1 @@
|
||||
Add `org.matrix.unstable.to_device_limit` field to sync filter. This is an experiment to see if it improves sync performance.
|
||||
@@ -134,6 +134,9 @@ USER_FILTER_SCHEMA = {
|
||||
"pattern": r"^((?!\\\\).)*$",
|
||||
},
|
||||
},
|
||||
# This is an experiment, a MSC will follow if it happens to be useful
|
||||
# for clients sync performance
|
||||
"org.matrix.unstable.to_device_limit": {"type": "number"},
|
||||
},
|
||||
"additionalProperties": False,
|
||||
}
|
||||
@@ -219,6 +222,15 @@ class FilterCollection:
|
||||
self.event_fields = filter_json.get("event_fields", [])
|
||||
self.event_format = filter_json.get("event_format", "client")
|
||||
|
||||
self.to_device_limit = 100
|
||||
if hs.config.experimental.to_device_limit_enabled:
|
||||
self.to_device_limit = filter_json.get(
|
||||
"org.matrix.unstable.to_device_limit", 100
|
||||
)
|
||||
# We don't want to overload the server so let's limit it to under a thousand
|
||||
if self.to_device_limit > 1000:
|
||||
self.to_device_limit = 1000
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return "<FilterCollection %s>" % (json.dumps(self._filter_json),)
|
||||
|
||||
|
||||
@@ -90,3 +90,9 @@ class ExperimentalConfig(Config):
|
||||
|
||||
# MSC3848: Introduce errcodes for specific event sending failures
|
||||
self.msc3848_enabled: bool = experimental.get("msc3848_enabled", False)
|
||||
|
||||
# Experimental feature to optimize client sync performance
|
||||
# Will become a proper MSC if it appears to be useful
|
||||
self.to_device_limit_enabled: bool = experimental.get(
|
||||
"to_device_limit_enabled", False
|
||||
)
|
||||
|
||||
@@ -1329,7 +1329,11 @@ class SyncHandler:
|
||||
|
||||
if device_id is not None and since_stream_id != int(now_token.to_device_key):
|
||||
messages, stream_id = await self.store.get_messages_for_device(
|
||||
user_id, device_id, since_stream_id, now_token.to_device_key
|
||||
user_id,
|
||||
device_id,
|
||||
since_stream_id,
|
||||
now_token.to_device_key,
|
||||
sync_result_builder.sync_config.filter_collection.to_device_limit,
|
||||
)
|
||||
|
||||
for message in messages:
|
||||
|
||||
@@ -218,6 +218,11 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
* The last-processed stream ID. Subsequent calls of this function with the
|
||||
same device should pass this value as 'from_stream_id'.
|
||||
"""
|
||||
if limit == 0:
|
||||
# We return the from token so that if a sync later on asks for
|
||||
# non-zero number of to-device messages we won't have dropped any.
|
||||
return [], from_stream_id
|
||||
|
||||
(
|
||||
user_id_device_id_to_messages,
|
||||
last_processed_stream_id,
|
||||
|
||||
@@ -28,7 +28,16 @@ from synapse.api.constants import (
|
||||
ReceiptTypes,
|
||||
RelationTypes,
|
||||
)
|
||||
from synapse.rest.client import devices, knock, login, read_marker, receipts, room, sync
|
||||
from synapse.rest.client import (
|
||||
devices,
|
||||
knock,
|
||||
login,
|
||||
read_marker,
|
||||
receipts,
|
||||
room,
|
||||
sendtodevice,
|
||||
sync,
|
||||
)
|
||||
from synapse.server import HomeServer
|
||||
from synapse.types import JsonDict
|
||||
from synapse.util import Clock
|
||||
@@ -948,3 +957,119 @@ class ExcludeRoomTestCase(unittest.HomeserverTestCase):
|
||||
|
||||
self.assertNotIn(self.excluded_room_id, channel.json_body["rooms"]["invite"])
|
||||
self.assertIn(self.included_room_id, channel.json_body["rooms"]["invite"])
|
||||
|
||||
|
||||
class ToDeviceLimitTestCase(unittest.HomeserverTestCase):
|
||||
servlets = [
|
||||
synapse.rest.admin.register_servlets,
|
||||
login.register_servlets,
|
||||
sendtodevice.register_servlets,
|
||||
sync.register_servlets,
|
||||
]
|
||||
|
||||
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
|
||||
self.next_batch = "s0"
|
||||
|
||||
# Register the first user (used to check the received to_device messages).
|
||||
self.user_id = self.register_user("kermit", "monkey")
|
||||
self.tok = self.login("kermit", "monkey")
|
||||
|
||||
# Register the second user (used to send to_device messages to user1device).
|
||||
self.user2 = self.register_user("kermit2", "monkey")
|
||||
self.tok2 = self.login("kermit2", "monkey")
|
||||
|
||||
self.tx_id = 0
|
||||
|
||||
# This will send one to_device message from kermit device to all kermit2 devices
|
||||
def _send_to_device(self) -> None:
|
||||
self.tx_id += 1
|
||||
chan = self.make_request(
|
||||
"PUT",
|
||||
f"/_matrix/client/v3/sendToDevice/m.test/{self.tx_id}",
|
||||
content={"messages": {self.user_id: {"*": {"tx_id": self.tx_id}}}},
|
||||
access_token=self.tok2,
|
||||
)
|
||||
self.assertEqual(chan.code, 200, chan.result)
|
||||
|
||||
# This does an incremental sync for user kermit with org.matrix.unstable.to_device_limit
|
||||
# setted and check the number of returned to_device msgs against
|
||||
# expected_to_device_msgs value
|
||||
def _limited_sync_and_check(
|
||||
self, to_device_limit: int, expected_to_device_msgs: int
|
||||
) -> None:
|
||||
channel = self.make_request(
|
||||
"GET",
|
||||
f'/sync?since={self.next_batch}&filter={{"org.matrix.unstable.to_device_limit": {to_device_limit}}}',
|
||||
access_token=self.tok,
|
||||
)
|
||||
self.assertEqual(channel.code, 200)
|
||||
self.next_batch = channel.json_body["next_batch"]
|
||||
|
||||
if expected_to_device_msgs > 0:
|
||||
self.assertIn("to_device", channel.json_body)
|
||||
self.assertIn("events", channel.json_body["to_device"])
|
||||
self.assertEqual(
|
||||
expected_to_device_msgs, len(channel.json_body["to_device"]["events"])
|
||||
)
|
||||
|
||||
def test_to_device(self) -> None:
|
||||
"""Tests that to_device messages are correctly flowing to sync,
|
||||
and that to_device_limit is ignored when the experimetal feature is not enabled.
|
||||
"""
|
||||
channel = self.make_request(
|
||||
"GET",
|
||||
"/sync",
|
||||
access_token=self.tok,
|
||||
)
|
||||
self.assertEqual(channel.code, 200)
|
||||
self.next_batch = channel.json_body["next_batch"]
|
||||
|
||||
for _ in range(4):
|
||||
self._send_to_device()
|
||||
|
||||
# 100 is the default limit, we should get back our 4 messages
|
||||
self._limited_sync_and_check(100, 4)
|
||||
|
||||
for _ in range(4):
|
||||
self._send_to_device()
|
||||
|
||||
# limit of 3 is used but the experimental feature is not enabled,
|
||||
# so we are still expecting 4 messages
|
||||
self._limited_sync_and_check(3, 4)
|
||||
|
||||
@override_config(
|
||||
{
|
||||
"experimental_features": {
|
||||
"to_device_limit_enabled": True,
|
||||
}
|
||||
}
|
||||
)
|
||||
def test_to_device_limit(self) -> None:
|
||||
"""Tests that to_device messages are correctly batched in incremental syncs
|
||||
according to the specified to_device_limit. The limit can change between sync calls.
|
||||
"""
|
||||
channel = self.make_request(
|
||||
"GET",
|
||||
"/sync",
|
||||
access_token=self.tok,
|
||||
)
|
||||
self.assertEqual(channel.code, 200)
|
||||
self.next_batch = channel.json_body["next_batch"]
|
||||
|
||||
for _ in range(8):
|
||||
self._send_to_device()
|
||||
|
||||
self._limited_sync_and_check(3, 3)
|
||||
self._limited_sync_and_check(4, 4)
|
||||
self._limited_sync_and_check(0, 0)
|
||||
self._limited_sync_and_check(3, 1)
|
||||
|
||||
self._limited_sync_and_check(3, 0)
|
||||
|
||||
for _ in range(1100):
|
||||
self._send_to_device()
|
||||
|
||||
# This tests the hardcoded 1000 limit used to avoid
|
||||
# overloading a server
|
||||
self._limited_sync_and_check(2000, 1000)
|
||||
self._limited_sync_and_check(2000, 100)
|
||||
|
||||
Reference in New Issue
Block a user