Compare commits

...

1 Commits

Author SHA1 Message Date
Richard van der Hoff
209cfab7b2 Limit the number of batches of incoming events we process at once 2025-04-08 16:52:02 +01:00

View File

@@ -82,7 +82,10 @@ from synapse.logging.opentracing import (
tag_args,
trace,
)
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
)
from synapse.replication.http.federation import (
ReplicationFederationSendEduRestServlet,
ReplicationGetQueryRestServlet,
@@ -129,6 +132,7 @@ last_pdu_ts_metric = Gauge(
# federation.
_INBOUND_EVENT_HANDLING_LOCK_NAME = "federation_inbound_pdu"
_MAX_STAGED_EVENT_HANDLERS = 2
class FederationServer(FederationBase):
def __init__(self, hs: "HomeServer"):
@@ -185,6 +189,8 @@ class FederationServer(FederationBase):
# Whether we have started handling old events in the staging area.
self._started_handling_of_staged_events = False
self._staged_event_handler_count = 0
@wrap_as_background_process("_handle_old_staged_events")
async def _handle_old_staged_events(self) -> None:
"""Handle old staged events by fetching all rooms that have staged
@@ -201,18 +207,10 @@ class FederationServer(FederationBase):
for room_id in room_ids:
room_version = await self.store.get_room_version(room_id)
# Try and acquire the processing lock for the room, if we get it start a
# background process for handling the events in the room.
lock = await self.store.try_acquire_lock(
_INBOUND_EVENT_HANDLING_LOCK_NAME, room_id
self._process_inbound_pdus_in_room(
room_id,
room_version,
)
if lock:
logger.info("Handling old staged inbound events in %s", room_id)
self._process_incoming_pdus_in_room_inner(
room_id,
room_version,
lock,
)
# We pause a bit so that we don't start handling all rooms at once.
await self._clock.sleep(random.uniform(0, 0.1))
@@ -1164,15 +1162,10 @@ class FederationServer(FederationBase):
# Add the event to our staging area
await self.store.insert_received_event_to_staging(origin, pdu)
# Try and acquire the processing lock for the room, if we get it start a
# background process for handling the events in the room.
lock = await self.store.try_acquire_lock(
_INBOUND_EVENT_HANDLING_LOCK_NAME, pdu.room_id
# Consider starting a background process for handling the events in the room.
self._process_inbound_pdus_in_room(
pdu.room_id, room_version, origin, pdu
)
if lock:
self._process_incoming_pdus_in_room_inner(
pdu.room_id, room_version, lock, origin, pdu
)
async def _get_next_nonspam_staged_event_for_room(
self, room_id: str, room_version: RoomVersion
@@ -1211,20 +1204,54 @@ class FederationServer(FederationBase):
return next
@wrap_as_background_process("_process_incoming_pdus_in_room_inner")
async def _process_incoming_pdus_in_room_inner(
def _process_inbound_pdus_in_room(
self,
room_id: str,
room_version: RoomVersion,
lock: Lock,
latest_origin: Optional[str] = None,
latest_event: Optional[EventBase] = None,
) -> None:
"""Process events in the staging area for the given room.
"""Start off a background process to take the inter-process lock and process
events in the staging area for the given room.
The latest_origin and latest_event args are the latest origin and event
received (or None to simply pull the next event from the database).
"""
if self._staged_event_handler_count >= _MAX_STAGED_EVENT_HANDLERS:
logger.info("Not yet processing staged inbound events in room %s: we already have %i active processors",
room_id, self._staged_event_handler_count)
# We'll pick up this event next time _handle_old_staged_events runs
return
async def inner() -> None:
self._staged_event_handler_count += 1
try:
# Try to acquire the processing lock for the room; if we get it, process the
# staged events for the room (we are already running in a background process).
lock = await self.store.try_acquire_lock(
_INBOUND_EVENT_HANDLING_LOCK_NAME, room_id
)
if not lock:
# someone else has the lock on this room and will process these events
return
await self._process_inbound_pdus_in_room_inner(
room_id, room_version, lock, latest_origin, latest_event
)
finally:
self._staged_event_handler_count -= 1
run_as_background_process("process_inbound_pdus_in_room", inner)
async def _process_inbound_pdus_in_room_inner(
self,
room_id: str,
room_version: RoomVersion,
lock: Lock,
latest_origin: Optional[str],
latest_event: Optional[EventBase],
) -> None:
logger.info("Handling staged inbound events in %s", room_id)
# The common path is for the event we just received to be the only event in
# the room, so instead of pulling the event out of the DB and parsing