Update rate limiter in the event persister logic
Simpler, cleaner, faster, stronger.
This commit is contained in:
@@ -667,7 +667,7 @@ class FederationServer(FederationBase):
|
||||
await self._room_member_handler._join_rate_per_room_limiter.ratelimit( # type: ignore[has-type]
|
||||
requester=None,
|
||||
key=room_id,
|
||||
update=self.hs.persists_events_for_room(room_id),
|
||||
update=False,
|
||||
)
|
||||
|
||||
event, context = await self._on_send_membership_event(
|
||||
|
||||
@@ -461,6 +461,7 @@ class EventCreationHandler:
|
||||
)
|
||||
self._events_shard_config = self.config.worker.events_shard_config
|
||||
self._instance_name = hs.get_instance_name()
|
||||
self._notifier = hs.get_notifier()
|
||||
|
||||
self.room_prejoin_state_types = self.hs.config.api.room_prejoin_state
|
||||
|
||||
@@ -1511,6 +1512,18 @@ class EventCreationHandler:
|
||||
original_event and event.sender != original_event.sender
|
||||
)
|
||||
|
||||
if event.type == EventTypes.Member and event.membership == Membership.JOIN:
|
||||
(
|
||||
current_membership,
|
||||
_,
|
||||
) = await self.store.get_local_current_membership_for_user_in_room(
|
||||
event.state_key, event.room_id
|
||||
)
|
||||
if current_membership != Membership.JOIN:
|
||||
self._notifier.notify_user_joined_room(
|
||||
event.event_id, event.room_id
|
||||
)
|
||||
|
||||
await self.request_ratelimiter.ratelimit(
|
||||
requester, is_admin_redaction=is_admin_redaction
|
||||
)
|
||||
|
||||
@@ -105,6 +105,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
)
|
||||
# TODO: find a better place to keep this Ratelimiter.
|
||||
# It needs to be
|
||||
# - written to by event persistence code
|
||||
# - written to by something which can snoop on replication streams
|
||||
# - read by the RoomMemberHandler to rate limit joins from local users
|
||||
# - read by the FederationServer to rate limit make_joins and send_joins from
|
||||
@@ -404,9 +405,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
if newly_joined and ratelimit:
|
||||
await self._join_rate_limiter_local.ratelimit(requester)
|
||||
await self._join_rate_per_room_limiter.ratelimit(
|
||||
requester,
|
||||
key=room_id,
|
||||
update=self.hs.persists_events_for_room(room_id),
|
||||
requester, key=room_id, update=False
|
||||
)
|
||||
|
||||
result_event = await self.event_creation_handler.handle_new_client_event(
|
||||
@@ -859,7 +858,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
await self._join_rate_per_room_limiter.ratelimit(
|
||||
requester,
|
||||
key=room_id,
|
||||
update=self.hs.persists_events_for_room(room_id),
|
||||
update=False,
|
||||
)
|
||||
|
||||
inviter = await self._get_inviter(target.to_string(), room_id)
|
||||
|
||||
@@ -17,7 +17,6 @@ from typing import TYPE_CHECKING, List, Tuple
|
||||
|
||||
from twisted.web.server import Request
|
||||
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
|
||||
from synapse.events import EventBase, make_event_from_dict
|
||||
from synapse.events.snapshot import EventContext
|
||||
@@ -73,7 +72,6 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
|
||||
self.store = hs.get_datastores().main
|
||||
self._storage_controllers = hs.get_storage_controllers()
|
||||
self.clock = hs.get_clock()
|
||||
self._notifier = hs.get_notifier()
|
||||
|
||||
@staticmethod
|
||||
async def _serialize_payload( # type: ignore[override]
|
||||
@@ -140,16 +138,6 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
|
||||
"Got event to send with ID: %s into room: %s", event.event_id, event.room_id
|
||||
)
|
||||
|
||||
if event.type == EventTypes.Member and event.membership == Membership.JOIN:
|
||||
(
|
||||
current_membership,
|
||||
_,
|
||||
) = await self.store.get_local_current_membership_for_user_in_room(
|
||||
event.state_key, event.room_id
|
||||
)
|
||||
if current_membership != Membership.JOIN:
|
||||
self._notifier.notify_user_joined_room(event.event_id, event.room_id)
|
||||
|
||||
event = await self.event_creation_handler.persist_and_notify_client_event(
|
||||
requester, event, context, ratelimit=ratelimit, extra_users=extra_users
|
||||
)
|
||||
|
||||
@@ -827,12 +827,3 @@ class HomeServer(metaclass=abc.ABCMeta):
|
||||
self.config.ratelimiting.rc_message,
|
||||
self.config.ratelimiting.rc_admin_redaction,
|
||||
)
|
||||
|
||||
def persists_events_for_room(self, room_id: str) -> bool:
|
||||
"""Is this worker responsible for persisting events in the given room?
|
||||
|
||||
Or does it ask another worker to do that for us?"""
|
||||
return (
|
||||
self.get_instance_name()
|
||||
== self.config.worker.events_shard_config.get_instance(room_id)
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user