diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 3e1518f1f6..41c2267925 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -117,6 +117,7 @@ class FederationServer(FederationBase): self._federation_event_handler = hs.get_federation_event_handler() self.state = hs.get_state_handler() self._event_auth_handler = hs.get_event_auth_handler() + self._room_member_handler = hs.get_room_member_handler() self._state_storage_controller = hs.get_storage_controllers().state @@ -620,6 +621,15 @@ class FederationServer(FederationBase): ) raise IncompatibleRoomVersionError(room_version=room_version) + # Refuse the request if that room has seen too many joins recently. + # This is in addition to the HS-level rate limiting applied by + # BaseFederationServlet. + # type-ignore: mypy doesn't seem able to deduce the type of the limiter(!?) + await self._room_member_handler._join_rate_per_room_limiter.ratelimit( # type: ignore[has-type] + requester=None, + key=room_id, + update=False, + ) pdu = await self.handler.on_make_join_request(origin, room_id, user_id) return {"event": pdu.get_templated_pdu_json(), "room_version": room_version} @@ -654,6 +664,12 @@ class FederationServer(FederationBase): room_id: str, caller_supports_partial_state: bool = False, ) -> Dict[str, Any]: + await self._room_member_handler._join_rate_per_room_limiter.ratelimit( # type: ignore[has-type] + requester=None, + key=room_id, + update=self.hs.persists_events_for_room(room_id), + ) + event, context = await self._on_send_membership_event( origin, content, Membership.JOIN, room_id ) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index de1f1c1d45..319c7a14e7 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -103,6 +103,19 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): rate_hz=hs.config.ratelimiting.rc_joins_remote.per_second, burst_count=hs.config.ratelimiting.rc_joins_remote.burst_count, ) + # TODO: find a better place to keep this Ratelimiter. + # It needs to be + # - written to by something which can snoop on replication streams + # - read by the RoomMemberHandler to rate limit joins from local users + # - read by the FederationServer to rate limit make_joins and send_joins from + # other homeservers + # I wonder if a homeserver-wide collection of rate limiters might be cleaner? + self._join_rate_per_room_limiter = Ratelimiter( + store=self.store, + clock=self.clock, + rate_hz=hs.config.ratelimiting.rc_joins_per_room.per_second, + burst_count=hs.config.ratelimiting.rc_joins_per_room.burst_count, + ) self._invites_per_room_limiter = Ratelimiter( store=self.store, @@ -125,6 +138,18 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): ) self.request_ratelimiter = hs.get_request_ratelimiter() + hs.get_notifier().add_new_join_in_room_callback(self._on_user_joined_room) + + def _on_user_joined_room(self, event_id: str, room_id: str) -> None: + """Notify the rate limiter that a room join has occurred. + + Use this to inform the RoomMemberHandler about joins that have either + - taken place on another homeserver, or + - on another worker in this homeserver. + Joins actioned by this worker should use the usual `ratelimit` method, which + checks the limit and increments the counter in one go. + """ + self._join_rate_per_room_limiter.record_action(requester=None, key=room_id) @abc.abstractmethod async def _remote_join( @@ -378,6 +403,11 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): # up blocking profile updates. if newly_joined and ratelimit: await self._join_rate_limiter_local.ratelimit(requester) + await self._join_rate_per_room_limiter.ratelimit( + requester, + key=room_id, + update=self.hs.persists_events_for_room(room_id), + ) result_event = await self.event_creation_handler.handle_new_client_event( requester, @@ -826,6 +856,11 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): await self._join_rate_limiter_remote.ratelimit( requester, ) + await self._join_rate_per_room_limiter.ratelimit( + requester, + key=room_id, + update=self.hs.persists_events_for_room(room_id), + ) inviter = await self._get_inviter(target.to_string(), room_id) if inviter and not self.hs.is_mine(inviter): diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index c2b2588ea5..6af013cd0a 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -17,6 +17,7 @@ from typing import TYPE_CHECKING, List, Tuple from twisted.web.server import Request +from synapse.api.constants import EventTypes, Membership from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.events import EventBase, make_event_from_dict from synapse.events.snapshot import EventContext @@ -72,6 +73,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint): self.store = hs.get_datastores().main self._storage_controllers = hs.get_storage_controllers() self.clock = hs.get_clock() + self._notifier = hs.get_notifier() @staticmethod async def _serialize_payload( # type: ignore[override] @@ -138,6 +140,16 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint): "Got event to send with ID: %s into room: %s", event.event_id, event.room_id ) + if event.type == EventTypes.Member and event.membership == Membership.JOIN: + ( + current_membership, + _, + ) = await self.store.get_local_current_membership_for_user_in_room( + event.state_key, event.room_id + ) + if current_membership != Membership.JOIN: + self._notifier.notify_user_joined_room(event.event_id, event.room_id) + event = await self.event_creation_handler.persist_and_notify_client_event( requester, event, context, ratelimit=ratelimit, extra_users=extra_users )