Compare commits

...

6 Commits

Author SHA1 Message Date
Mathieu Velten
7af1d98df0 Update synapse/storage/databases/main/deviceinbox.py
Co-authored-by: Erik Johnston <erik@matrix.org>
2022-08-03 14:47:26 +02:00
Mathieu Velten
4cc2f3e990 Use org.matrix.unstable prefix 2022-08-03 14:46:46 +02:00
Mathieu Velten
ea8cf6edcf Type mistake 2022-08-02 15:23:52 +02:00
Mathieu Velten
5de1f166f9 Address comment 2022-08-02 15:18:57 +02:00
Mathieu Velten
f14bb8768a Apply suggestions from code review
Co-authored-by: Erik Johnston <erik@matrix.org>
2022-08-02 15:08:12 +02:00
Mathieu Velten
dfca02b6de Add do_not_use_to_device_limit to sync filter 2022-08-02 15:08:12 +02:00
6 changed files with 155 additions and 2 deletions

View File

@@ -0,0 +1 @@
Add `org.matrix.unstable.to_device_limit` field to sync filter. This is an experiment to see if it improves sync performance.

View File

@@ -134,6 +134,9 @@ USER_FILTER_SCHEMA = {
"pattern": r"^((?!\\\\).)*$",
},
},
# This is an experiment, a MSC will follow if it happens to be useful
# for clients sync performance
"org.matrix.unstable.to_device_limit": {"type": "number"},
},
"additionalProperties": False,
}
@@ -219,6 +222,15 @@ class FilterCollection:
self.event_fields = filter_json.get("event_fields", [])
self.event_format = filter_json.get("event_format", "client")
self.to_device_limit = 100
if hs.config.experimental.to_device_limit_enabled:
self.to_device_limit = filter_json.get(
"org.matrix.unstable.to_device_limit", 100
)
# We don't want to overload the server so let's limit it to under a thousand
if self.to_device_limit > 1000:
self.to_device_limit = 1000
def __repr__(self) -> str:
return "<FilterCollection %s>" % (json.dumps(self._filter_json),)

View File

@@ -90,3 +90,9 @@ class ExperimentalConfig(Config):
# MSC3848: Introduce errcodes for specific event sending failures
self.msc3848_enabled: bool = experimental.get("msc3848_enabled", False)
# Experimental feature to optimize client sync performance
# Will become a proper MSC if it appears to be useful
self.to_device_limit_enabled: bool = experimental.get(
"to_device_limit_enabled", False
)

View File

@@ -1329,7 +1329,11 @@ class SyncHandler:
if device_id is not None and since_stream_id != int(now_token.to_device_key):
messages, stream_id = await self.store.get_messages_for_device(
user_id, device_id, since_stream_id, now_token.to_device_key
user_id,
device_id,
since_stream_id,
now_token.to_device_key,
sync_result_builder.sync_config.filter_collection.to_device_limit,
)
for message in messages:

View File

@@ -218,6 +218,11 @@ class DeviceInboxWorkerStore(SQLBaseStore):
* The last-processed stream ID. Subsequent calls of this function with the
same device should pass this value as 'from_stream_id'.
"""
if limit == 0:
# We return the from token so that if a sync later on asks for
# non-zero number of to-device messages we won't have dropped any.
return [], from_stream_id
(
user_id_device_id_to_messages,
last_processed_stream_id,

View File

@@ -28,7 +28,16 @@ from synapse.api.constants import (
ReceiptTypes,
RelationTypes,
)
from synapse.rest.client import devices, knock, login, read_marker, receipts, room, sync
from synapse.rest.client import (
devices,
knock,
login,
read_marker,
receipts,
room,
sendtodevice,
sync,
)
from synapse.server import HomeServer
from synapse.types import JsonDict
from synapse.util import Clock
@@ -948,3 +957,119 @@ class ExcludeRoomTestCase(unittest.HomeserverTestCase):
self.assertNotIn(self.excluded_room_id, channel.json_body["rooms"]["invite"])
self.assertIn(self.included_room_id, channel.json_body["rooms"]["invite"])
class ToDeviceLimitTestCase(unittest.HomeserverTestCase):
servlets = [
synapse.rest.admin.register_servlets,
login.register_servlets,
sendtodevice.register_servlets,
sync.register_servlets,
]
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.next_batch = "s0"
# Register the first user (used to check the received to_device messages).
self.user_id = self.register_user("kermit", "monkey")
self.tok = self.login("kermit", "monkey")
# Register the second user (used to send to_device messages to user1device).
self.user2 = self.register_user("kermit2", "monkey")
self.tok2 = self.login("kermit2", "monkey")
self.tx_id = 0
# This will send one to_device message from kermit device to all kermit2 devices
def _send_to_device(self) -> None:
self.tx_id += 1
chan = self.make_request(
"PUT",
f"/_matrix/client/v3/sendToDevice/m.test/{self.tx_id}",
content={"messages": {self.user_id: {"*": {"tx_id": self.tx_id}}}},
access_token=self.tok2,
)
self.assertEqual(chan.code, 200, chan.result)
# This does an incremental sync for user kermit with org.matrix.unstable.to_device_limit
# setted and check the number of returned to_device msgs against
# expected_to_device_msgs value
def _limited_sync_and_check(
self, to_device_limit: int, expected_to_device_msgs: int
) -> None:
channel = self.make_request(
"GET",
f'/sync?since={self.next_batch}&filter={{"org.matrix.unstable.to_device_limit": {to_device_limit}}}',
access_token=self.tok,
)
self.assertEqual(channel.code, 200)
self.next_batch = channel.json_body["next_batch"]
if expected_to_device_msgs > 0:
self.assertIn("to_device", channel.json_body)
self.assertIn("events", channel.json_body["to_device"])
self.assertEqual(
expected_to_device_msgs, len(channel.json_body["to_device"]["events"])
)
def test_to_device(self) -> None:
"""Tests that to_device messages are correctly flowing to sync,
and that to_device_limit is ignored when the experimetal feature is not enabled.
"""
channel = self.make_request(
"GET",
"/sync",
access_token=self.tok,
)
self.assertEqual(channel.code, 200)
self.next_batch = channel.json_body["next_batch"]
for _ in range(4):
self._send_to_device()
# 100 is the default limit, we should get back our 4 messages
self._limited_sync_and_check(100, 4)
for _ in range(4):
self._send_to_device()
# limit of 3 is used but the experimental feature is not enabled,
# so we are still expecting 4 messages
self._limited_sync_and_check(3, 4)
@override_config(
{
"experimental_features": {
"to_device_limit_enabled": True,
}
}
)
def test_to_device_limit(self) -> None:
"""Tests that to_device messages are correctly batched in incremental syncs
according to the specified to_device_limit. The limit can change between sync calls.
"""
channel = self.make_request(
"GET",
"/sync",
access_token=self.tok,
)
self.assertEqual(channel.code, 200)
self.next_batch = channel.json_body["next_batch"]
for _ in range(8):
self._send_to_device()
self._limited_sync_and_check(3, 3)
self._limited_sync_and_check(4, 4)
self._limited_sync_and_check(0, 0)
self._limited_sync_and_check(3, 1)
self._limited_sync_and_check(3, 0)
for _ in range(1100):
self._send_to_device()
# This tests the hardcoded 1000 limit used to avoid
# overloading a server
self._limited_sync_and_check(2000, 1000)
self._limited_sync_and_check(2000, 100)