Ratelimit presence updates (#18000)
This commit is contained in:
1
changelog.d/18000.bugfix
Normal file
1
changelog.d/18000.bugfix
Normal file
@@ -0,0 +1 @@
|
|||||||
|
Add rate limit `rc_presence.per_user`. This prevents load from excessive presence updates sent by clients via sync api. Also rate limit `/_matrix/client/v3/presence` as per the spec. Contributed by @rda0.
|
||||||
@@ -89,6 +89,11 @@ rc_invites:
|
|||||||
per_second: 1000
|
per_second: 1000
|
||||||
burst_count: 1000
|
burst_count: 1000
|
||||||
|
|
||||||
|
rc_presence:
|
||||||
|
per_user:
|
||||||
|
per_second: 9999
|
||||||
|
burst_count: 9999
|
||||||
|
|
||||||
federation_rr_transactions_per_room_per_second: 9999
|
federation_rr_transactions_per_room_per_second: 9999
|
||||||
|
|
||||||
allow_device_name_lookup_over_federation: true
|
allow_device_name_lookup_over_federation: true
|
||||||
|
|||||||
@@ -1868,6 +1868,27 @@ rc_federation:
|
|||||||
concurrent: 5
|
concurrent: 5
|
||||||
```
|
```
|
||||||
---
|
---
|
||||||
|
### `rc_presence`
|
||||||
|
|
||||||
|
This option sets ratelimiting for presence.
|
||||||
|
|
||||||
|
The `rc_presence.per_user` option sets rate limits on how often a specific
|
||||||
|
users' presence updates are evaluated. Ratelimited presence updates sent via sync are
|
||||||
|
ignored, and no error is returned to the client.
|
||||||
|
This option also sets the rate limit for the
|
||||||
|
[`PUT /_matrix/client/v3/presence/{userId}/status`](https://spec.matrix.org/latest/client-server-api/#put_matrixclientv3presenceuseridstatus)
|
||||||
|
endpoint.
|
||||||
|
|
||||||
|
`per_user` defaults to `per_second: 0.1`, `burst_count: 1`.
|
||||||
|
|
||||||
|
Example configuration:
|
||||||
|
```yaml
|
||||||
|
rc_presence:
|
||||||
|
per_user:
|
||||||
|
per_second: 0.05
|
||||||
|
burst_count: 0.5
|
||||||
|
```
|
||||||
|
---
|
||||||
### `federation_rr_transactions_per_room_per_second`
|
### `federation_rr_transactions_per_room_per_second`
|
||||||
|
|
||||||
Sets outgoing federation transaction frequency for sending read-receipts,
|
Sets outgoing federation transaction frequency for sending read-receipts,
|
||||||
|
|||||||
@@ -275,6 +275,7 @@ class Ratelimiter:
|
|||||||
update: bool = True,
|
update: bool = True,
|
||||||
n_actions: int = 1,
|
n_actions: int = 1,
|
||||||
_time_now_s: Optional[float] = None,
|
_time_now_s: Optional[float] = None,
|
||||||
|
pause: Optional[float] = 0.5,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Checks if an action can be performed. If not, raises a LimitExceededError
|
"""Checks if an action can be performed. If not, raises a LimitExceededError
|
||||||
|
|
||||||
@@ -298,6 +299,8 @@ class Ratelimiter:
|
|||||||
at all.
|
at all.
|
||||||
_time_now_s: The current time. Optional, defaults to the current time according
|
_time_now_s: The current time. Optional, defaults to the current time according
|
||||||
to self.clock. Only used by tests.
|
to self.clock. Only used by tests.
|
||||||
|
pause: Time in seconds to pause when an action is being limited. Defaults to 0.5
|
||||||
|
to stop clients from "tight-looping" on retrying their request.
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
LimitExceededError: If an action could not be performed, along with the time in
|
LimitExceededError: If an action could not be performed, along with the time in
|
||||||
@@ -316,9 +319,8 @@ class Ratelimiter:
|
|||||||
)
|
)
|
||||||
|
|
||||||
if not allowed:
|
if not allowed:
|
||||||
# We pause for a bit here to stop clients from "tight-looping" on
|
if pause:
|
||||||
# retrying their request.
|
await self.clock.sleep(pause)
|
||||||
await self.clock.sleep(0.5)
|
|
||||||
|
|
||||||
raise LimitExceededError(
|
raise LimitExceededError(
|
||||||
limiter_name=self._limiter_name,
|
limiter_name=self._limiter_name,
|
||||||
|
|||||||
@@ -228,3 +228,9 @@ class RatelimitConfig(Config):
|
|||||||
config.get("remote_media_download_burst_count", "500M")
|
config.get("remote_media_download_burst_count", "500M")
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
self.rc_presence_per_user = RatelimitSettings.parse(
|
||||||
|
config,
|
||||||
|
"rc_presence.per_user",
|
||||||
|
defaults={"per_second": 0.1, "burst_count": 1},
|
||||||
|
)
|
||||||
|
|||||||
@@ -24,7 +24,8 @@
|
|||||||
import logging
|
import logging
|
||||||
from typing import TYPE_CHECKING, Tuple
|
from typing import TYPE_CHECKING, Tuple
|
||||||
|
|
||||||
from synapse.api.errors import AuthError, SynapseError
|
from synapse.api.errors import AuthError, Codes, LimitExceededError, SynapseError
|
||||||
|
from synapse.api.ratelimiting import Ratelimiter
|
||||||
from synapse.handlers.presence import format_user_presence_state
|
from synapse.handlers.presence import format_user_presence_state
|
||||||
from synapse.http.server import HttpServer
|
from synapse.http.server import HttpServer
|
||||||
from synapse.http.servlet import RestServlet, parse_json_object_from_request
|
from synapse.http.servlet import RestServlet, parse_json_object_from_request
|
||||||
@@ -48,6 +49,14 @@ class PresenceStatusRestServlet(RestServlet):
|
|||||||
self.presence_handler = hs.get_presence_handler()
|
self.presence_handler = hs.get_presence_handler()
|
||||||
self.clock = hs.get_clock()
|
self.clock = hs.get_clock()
|
||||||
self.auth = hs.get_auth()
|
self.auth = hs.get_auth()
|
||||||
|
self.store = hs.get_datastores().main
|
||||||
|
|
||||||
|
# Ratelimiter for presence updates, keyed by requester.
|
||||||
|
self._presence_per_user_limiter = Ratelimiter(
|
||||||
|
store=self.store,
|
||||||
|
clock=self.clock,
|
||||||
|
cfg=hs.config.ratelimiting.rc_presence_per_user,
|
||||||
|
)
|
||||||
|
|
||||||
async def on_GET(
|
async def on_GET(
|
||||||
self, request: SynapseRequest, user_id: str
|
self, request: SynapseRequest, user_id: str
|
||||||
@@ -82,6 +91,17 @@ class PresenceStatusRestServlet(RestServlet):
|
|||||||
if requester.user != user:
|
if requester.user != user:
|
||||||
raise AuthError(403, "Can only set your own presence state")
|
raise AuthError(403, "Can only set your own presence state")
|
||||||
|
|
||||||
|
# ignore the presence update if the ratelimit is exceeded
|
||||||
|
try:
|
||||||
|
await self._presence_per_user_limiter.ratelimit(requester)
|
||||||
|
except LimitExceededError as e:
|
||||||
|
logger.debug("User presence ratelimit exceeded; ignoring it.")
|
||||||
|
return 429, {
|
||||||
|
"errcode": Codes.LIMIT_EXCEEDED,
|
||||||
|
"error": "Too many requests",
|
||||||
|
"retry_after_ms": e.retry_after_ms,
|
||||||
|
}
|
||||||
|
|
||||||
state = {}
|
state = {}
|
||||||
|
|
||||||
content = parse_json_object_from_request(request)
|
content = parse_json_object_from_request(request)
|
||||||
|
|||||||
@@ -24,9 +24,10 @@ from collections import defaultdict
|
|||||||
from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple, Union
|
from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple, Union
|
||||||
|
|
||||||
from synapse.api.constants import AccountDataTypes, EduTypes, Membership, PresenceState
|
from synapse.api.constants import AccountDataTypes, EduTypes, Membership, PresenceState
|
||||||
from synapse.api.errors import Codes, StoreError, SynapseError
|
from synapse.api.errors import Codes, LimitExceededError, StoreError, SynapseError
|
||||||
from synapse.api.filtering import FilterCollection
|
from synapse.api.filtering import FilterCollection
|
||||||
from synapse.api.presence import UserPresenceState
|
from synapse.api.presence import UserPresenceState
|
||||||
|
from synapse.api.ratelimiting import Ratelimiter
|
||||||
from synapse.events.utils import (
|
from synapse.events.utils import (
|
||||||
SerializeEventConfig,
|
SerializeEventConfig,
|
||||||
format_event_for_client_v2_without_room_id,
|
format_event_for_client_v2_without_room_id,
|
||||||
@@ -126,6 +127,13 @@ class SyncRestServlet(RestServlet):
|
|||||||
cache_name="sync_valid_filter",
|
cache_name="sync_valid_filter",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Ratelimiter for presence updates, keyed by requester.
|
||||||
|
self._presence_per_user_limiter = Ratelimiter(
|
||||||
|
store=self.store,
|
||||||
|
clock=self.clock,
|
||||||
|
cfg=hs.config.ratelimiting.rc_presence_per_user,
|
||||||
|
)
|
||||||
|
|
||||||
async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
|
async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
|
||||||
# This will always be set by the time Twisted calls us.
|
# This will always be set by the time Twisted calls us.
|
||||||
assert request.args is not None
|
assert request.args is not None
|
||||||
@@ -239,7 +247,14 @@ class SyncRestServlet(RestServlet):
|
|||||||
# send any outstanding server notices to the user.
|
# send any outstanding server notices to the user.
|
||||||
await self._server_notices_sender.on_user_syncing(user.to_string())
|
await self._server_notices_sender.on_user_syncing(user.to_string())
|
||||||
|
|
||||||
affect_presence = set_presence != PresenceState.OFFLINE
|
# ignore the presence update if the ratelimit is exceeded but do not pause the request
|
||||||
|
try:
|
||||||
|
await self._presence_per_user_limiter.ratelimit(requester, pause=0.0)
|
||||||
|
except LimitExceededError:
|
||||||
|
affect_presence = False
|
||||||
|
logger.debug("User set_presence ratelimit exceeded; ignoring it.")
|
||||||
|
else:
|
||||||
|
affect_presence = set_presence != PresenceState.OFFLINE
|
||||||
|
|
||||||
context = await self.presence_handler.user_syncing(
|
context = await self.presence_handler.user_syncing(
|
||||||
user.to_string(),
|
user.to_string(),
|
||||||
|
|||||||
@@ -45,7 +45,7 @@ from synapse.handlers.presence import (
|
|||||||
handle_update,
|
handle_update,
|
||||||
)
|
)
|
||||||
from synapse.rest import admin
|
from synapse.rest import admin
|
||||||
from synapse.rest.client import room
|
from synapse.rest.client import login, room, sync
|
||||||
from synapse.server import HomeServer
|
from synapse.server import HomeServer
|
||||||
from synapse.storage.database import LoggingDatabaseConnection
|
from synapse.storage.database import LoggingDatabaseConnection
|
||||||
from synapse.types import JsonDict, UserID, get_domain_from_id
|
from synapse.types import JsonDict, UserID, get_domain_from_id
|
||||||
@@ -53,10 +53,15 @@ from synapse.util import Clock
|
|||||||
|
|
||||||
from tests import unittest
|
from tests import unittest
|
||||||
from tests.replication._base import BaseMultiWorkerStreamTestCase
|
from tests.replication._base import BaseMultiWorkerStreamTestCase
|
||||||
|
from tests.unittest import override_config
|
||||||
|
|
||||||
|
|
||||||
class PresenceUpdateTestCase(unittest.HomeserverTestCase):
|
class PresenceUpdateTestCase(unittest.HomeserverTestCase):
|
||||||
servlets = [admin.register_servlets]
|
servlets = [
|
||||||
|
admin.register_servlets,
|
||||||
|
login.register_servlets,
|
||||||
|
sync.register_servlets,
|
||||||
|
]
|
||||||
|
|
||||||
def prepare(
|
def prepare(
|
||||||
self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
|
self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
|
||||||
@@ -425,6 +430,102 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase):
|
|||||||
|
|
||||||
wheel_timer.insert.assert_not_called()
|
wheel_timer.insert.assert_not_called()
|
||||||
|
|
||||||
|
# `rc_presence` is set very high during unit tests to avoid ratelimiting
|
||||||
|
# subtly impacting unrelated tests. We set the ratelimiting back to a
|
||||||
|
# reasonable value for the tests specific to presence ratelimiting.
|
||||||
|
@override_config(
|
||||||
|
{"rc_presence": {"per_user": {"per_second": 0.1, "burst_count": 1}}}
|
||||||
|
)
|
||||||
|
def test_over_ratelimit_offline_to_online_to_unavailable(self) -> None:
|
||||||
|
"""
|
||||||
|
Send a presence update, check that it went through, immediately send another one and
|
||||||
|
check that it was ignored.
|
||||||
|
"""
|
||||||
|
self._test_ratelimit_offline_to_online_to_unavailable(ratelimited=True)
|
||||||
|
|
||||||
|
@override_config(
|
||||||
|
{"rc_presence": {"per_user": {"per_second": 0.1, "burst_count": 1}}}
|
||||||
|
)
|
||||||
|
def test_within_ratelimit_offline_to_online_to_unavailable(self) -> None:
|
||||||
|
"""
|
||||||
|
Send a presence update, check that it went through, advancing time a sufficient amount,
|
||||||
|
send another presence update and check that it also worked.
|
||||||
|
"""
|
||||||
|
self._test_ratelimit_offline_to_online_to_unavailable(ratelimited=False)
|
||||||
|
|
||||||
|
@override_config(
|
||||||
|
{"rc_presence": {"per_user": {"per_second": 0.1, "burst_count": 1}}}
|
||||||
|
)
|
||||||
|
def _test_ratelimit_offline_to_online_to_unavailable(
|
||||||
|
self, ratelimited: bool
|
||||||
|
) -> None:
|
||||||
|
"""Test rate limit for presence updates sent with sync requests.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
ratelimited: Test rate limited case.
|
||||||
|
"""
|
||||||
|
wheel_timer = Mock()
|
||||||
|
user_id = "@user:pass"
|
||||||
|
now = 5000000
|
||||||
|
sync_url = "/sync?access_token=%s&set_presence=%s"
|
||||||
|
|
||||||
|
# Register the user who syncs presence
|
||||||
|
user_id = self.register_user("user", "pass")
|
||||||
|
access_token = self.login("user", "pass")
|
||||||
|
|
||||||
|
# Get the handler (which kicks off a bunch of timers).
|
||||||
|
presence_handler = self.hs.get_presence_handler()
|
||||||
|
|
||||||
|
# Ensure the user is initially offline.
|
||||||
|
prev_state = UserPresenceState.default(user_id)
|
||||||
|
new_state = prev_state.copy_and_replace(
|
||||||
|
state=PresenceState.OFFLINE, last_active_ts=now
|
||||||
|
)
|
||||||
|
|
||||||
|
state, persist_and_notify, federation_ping = handle_update(
|
||||||
|
prev_state,
|
||||||
|
new_state,
|
||||||
|
is_mine=True,
|
||||||
|
wheel_timer=wheel_timer,
|
||||||
|
now=now,
|
||||||
|
persist=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Check that the user is offline.
|
||||||
|
state = self.get_success(
|
||||||
|
presence_handler.get_state(UserID.from_string(user_id))
|
||||||
|
)
|
||||||
|
self.assertEqual(state.state, PresenceState.OFFLINE)
|
||||||
|
|
||||||
|
# Send sync request with set_presence=online.
|
||||||
|
channel = self.make_request("GET", sync_url % (access_token, "online"))
|
||||||
|
self.assertEqual(200, channel.code)
|
||||||
|
|
||||||
|
# Assert the user is now online.
|
||||||
|
state = self.get_success(
|
||||||
|
presence_handler.get_state(UserID.from_string(user_id))
|
||||||
|
)
|
||||||
|
self.assertEqual(state.state, PresenceState.ONLINE)
|
||||||
|
|
||||||
|
if not ratelimited:
|
||||||
|
# Advance time a sufficient amount to avoid rate limiting.
|
||||||
|
self.reactor.advance(30)
|
||||||
|
|
||||||
|
# Send another sync request with set_presence=unavailable.
|
||||||
|
channel = self.make_request("GET", sync_url % (access_token, "unavailable"))
|
||||||
|
self.assertEqual(200, channel.code)
|
||||||
|
|
||||||
|
state = self.get_success(
|
||||||
|
presence_handler.get_state(UserID.from_string(user_id))
|
||||||
|
)
|
||||||
|
|
||||||
|
if ratelimited:
|
||||||
|
# Assert the user is still online and presence update was ignored.
|
||||||
|
self.assertEqual(state.state, PresenceState.ONLINE)
|
||||||
|
else:
|
||||||
|
# Assert the user is now unavailable.
|
||||||
|
self.assertEqual(state.state, PresenceState.UNAVAILABLE)
|
||||||
|
|
||||||
|
|
||||||
class PresenceTimeoutTestCase(unittest.TestCase):
|
class PresenceTimeoutTestCase(unittest.TestCase):
|
||||||
"""Tests different timers and that the timer does not change `status_msg` of user."""
|
"""Tests different timers and that the timer does not change `status_msg` of user."""
|
||||||
|
|||||||
@@ -29,6 +29,7 @@ from synapse.types import UserID
|
|||||||
from synapse.util import Clock
|
from synapse.util import Clock
|
||||||
|
|
||||||
from tests import unittest
|
from tests import unittest
|
||||||
|
from tests.unittest import override_config
|
||||||
|
|
||||||
|
|
||||||
class PresenceTestCase(unittest.HomeserverTestCase):
|
class PresenceTestCase(unittest.HomeserverTestCase):
|
||||||
@@ -95,3 +96,54 @@ class PresenceTestCase(unittest.HomeserverTestCase):
|
|||||||
|
|
||||||
self.assertEqual(channel.code, HTTPStatus.OK)
|
self.assertEqual(channel.code, HTTPStatus.OK)
|
||||||
self.assertEqual(self.presence_handler.set_state.call_count, 0)
|
self.assertEqual(self.presence_handler.set_state.call_count, 0)
|
||||||
|
|
||||||
|
@override_config(
|
||||||
|
{"rc_presence": {"per_user": {"per_second": 0.1, "burst_count": 1}}}
|
||||||
|
)
|
||||||
|
def test_put_presence_over_ratelimit(self) -> None:
|
||||||
|
"""
|
||||||
|
Multiple PUTs to the status endpoint without sufficient delay will be rate limited.
|
||||||
|
"""
|
||||||
|
self.hs.config.server.presence_enabled = True
|
||||||
|
|
||||||
|
body = {"presence": "here", "status_msg": "beep boop"}
|
||||||
|
channel = self.make_request(
|
||||||
|
"PUT", "/presence/%s/status" % (self.user_id,), body
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertEqual(channel.code, HTTPStatus.OK)
|
||||||
|
|
||||||
|
body = {"presence": "here", "status_msg": "beep boop"}
|
||||||
|
channel = self.make_request(
|
||||||
|
"PUT", "/presence/%s/status" % (self.user_id,), body
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertEqual(channel.code, HTTPStatus.TOO_MANY_REQUESTS)
|
||||||
|
self.assertEqual(self.presence_handler.set_state.call_count, 1)
|
||||||
|
|
||||||
|
@override_config(
|
||||||
|
{"rc_presence": {"per_user": {"per_second": 0.1, "burst_count": 1}}}
|
||||||
|
)
|
||||||
|
def test_put_presence_within_ratelimit(self) -> None:
|
||||||
|
"""
|
||||||
|
Multiple PUTs to the status endpoint with sufficient delay should all call set_state.
|
||||||
|
"""
|
||||||
|
self.hs.config.server.presence_enabled = True
|
||||||
|
|
||||||
|
body = {"presence": "here", "status_msg": "beep boop"}
|
||||||
|
channel = self.make_request(
|
||||||
|
"PUT", "/presence/%s/status" % (self.user_id,), body
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertEqual(channel.code, HTTPStatus.OK)
|
||||||
|
|
||||||
|
# Advance time a sufficient amount to avoid rate limiting.
|
||||||
|
self.reactor.advance(30)
|
||||||
|
|
||||||
|
body = {"presence": "here", "status_msg": "beep boop"}
|
||||||
|
channel = self.make_request(
|
||||||
|
"PUT", "/presence/%s/status" % (self.user_id,), body
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertEqual(channel.code, HTTPStatus.OK)
|
||||||
|
self.assertEqual(self.presence_handler.set_state.call_count, 2)
|
||||||
|
|||||||
@@ -200,6 +200,7 @@ def default_config(
|
|||||||
"per_user": {"per_second": 10000, "burst_count": 10000},
|
"per_user": {"per_second": 10000, "burst_count": 10000},
|
||||||
},
|
},
|
||||||
"rc_3pid_validation": {"per_second": 10000, "burst_count": 10000},
|
"rc_3pid_validation": {"per_second": 10000, "burst_count": 10000},
|
||||||
|
"rc_presence": {"per_user": {"per_second": 10000, "burst_count": 10000}},
|
||||||
"saml2_enabled": False,
|
"saml2_enabled": False,
|
||||||
"public_baseurl": None,
|
"public_baseurl": None,
|
||||||
"default_identity_server": None,
|
"default_identity_server": None,
|
||||||
|
|||||||
Reference in New Issue
Block a user