1
0

Track per-room join rates actioned by this worker

and consult it when actioning joins.
Only bump rate limit if we will persist the event; otherwise we'll see
it over replication
This commit is contained in:
David Robertson
2022-06-29 12:28:53 +01:00
parent 4230112526
commit 6b47e82ca2
3 changed files with 63 additions and 0 deletions

View File

@@ -117,6 +117,7 @@ class FederationServer(FederationBase):
self._federation_event_handler = hs.get_federation_event_handler()
self.state = hs.get_state_handler()
self._event_auth_handler = hs.get_event_auth_handler()
self._room_member_handler = hs.get_room_member_handler()
self._state_storage_controller = hs.get_storage_controllers().state
@@ -620,6 +621,15 @@ class FederationServer(FederationBase):
)
raise IncompatibleRoomVersionError(room_version=room_version)
# Refuse the request if that room has seen too many joins recently.
# This is in addition to the HS-level rate limiting applied by
# BaseFederationServlet.
# type-ignore: mypy doesn't seem able to deduce the type of the limiter(!?)
await self._room_member_handler._join_rate_per_room_limiter.ratelimit( # type: ignore[has-type]
requester=None,
key=room_id,
update=False,
)
pdu = await self.handler.on_make_join_request(origin, room_id, user_id)
return {"event": pdu.get_templated_pdu_json(), "room_version": room_version}
@@ -654,6 +664,12 @@ class FederationServer(FederationBase):
room_id: str,
caller_supports_partial_state: bool = False,
) -> Dict[str, Any]:
await self._room_member_handler._join_rate_per_room_limiter.ratelimit( # type: ignore[has-type]
requester=None,
key=room_id,
update=self.hs.persists_events_for_room(room_id),
)
event, context = await self._on_send_membership_event(
origin, content, Membership.JOIN, room_id
)

View File

@@ -103,6 +103,19 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
rate_hz=hs.config.ratelimiting.rc_joins_remote.per_second,
burst_count=hs.config.ratelimiting.rc_joins_remote.burst_count,
)
# TODO: find a better place to keep this Ratelimiter.
# It needs to be
# - written to by something which can snoop on replication streams
# - read by the RoomMemberHandler to rate limit joins from local users
# - read by the FederationServer to rate limit make_joins and send_joins from
# other homeservers
# I wonder if a homeserver-wide collection of rate limiters might be cleaner?
self._join_rate_per_room_limiter = Ratelimiter(
store=self.store,
clock=self.clock,
rate_hz=hs.config.ratelimiting.rc_joins_per_room.per_second,
burst_count=hs.config.ratelimiting.rc_joins_per_room.burst_count,
)
self._invites_per_room_limiter = Ratelimiter(
store=self.store,
@@ -125,6 +138,18 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
)
self.request_ratelimiter = hs.get_request_ratelimiter()
hs.get_notifier().add_new_join_in_room_callback(self._on_user_joined_room)
def _on_user_joined_room(self, event_id: str, room_id: str) -> None:
"""Notify the rate limiter that a room join has occurred.
Use this to inform the RoomMemberHandler about joins that have either
- taken place on another homeserver, or
- on another worker in this homeserver.
Joins actioned by this worker should use the usual `ratelimit` method, which
checks the limit and increments the counter in one go.
"""
self._join_rate_per_room_limiter.record_action(requester=None, key=room_id)
@abc.abstractmethod
async def _remote_join(
@@ -378,6 +403,11 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
# up blocking profile updates.
if newly_joined and ratelimit:
await self._join_rate_limiter_local.ratelimit(requester)
await self._join_rate_per_room_limiter.ratelimit(
requester,
key=room_id,
update=self.hs.persists_events_for_room(room_id),
)
result_event = await self.event_creation_handler.handle_new_client_event(
requester,
@@ -826,6 +856,11 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
await self._join_rate_limiter_remote.ratelimit(
requester,
)
await self._join_rate_per_room_limiter.ratelimit(
requester,
key=room_id,
update=self.hs.persists_events_for_room(room_id),
)
inviter = await self._get_inviter(target.to_string(), room_id)
if inviter and not self.hs.is_mine(inviter):

View File

@@ -17,6 +17,7 @@ from typing import TYPE_CHECKING, List, Tuple
from twisted.web.server import Request
from synapse.api.constants import EventTypes, Membership
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import EventBase, make_event_from_dict
from synapse.events.snapshot import EventContext
@@ -72,6 +73,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
self.store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
self.clock = hs.get_clock()
self._notifier = hs.get_notifier()
@staticmethod
async def _serialize_payload( # type: ignore[override]
@@ -138,6 +140,16 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
"Got event to send with ID: %s into room: %s", event.event_id, event.room_id
)
if event.type == EventTypes.Member and event.membership == Membership.JOIN:
(
current_membership,
_,
) = await self.store.get_local_current_membership_for_user_in_room(
event.state_key, event.room_id
)
if current_membership != Membership.JOIN:
self._notifier.notify_user_joined_room(event.event_id, event.room_id)
event = await self.event_creation_handler.persist_and_notify_client_event(
requester, event, context, ratelimit=ratelimit, extra_users=extra_users
)