Calculate the stream_ordering from newest -> oldest (in the correct order) and persist in the oldest -> newest to get the least missing prev_event fetch thrashing
This commit is contained in:
@@ -31,7 +31,7 @@ federation_ip_range_blacklist: []
|
||||
# Disable server rate-limiting
|
||||
rc_federation:
|
||||
window_size: 1000
|
||||
sleep_limit: 10
|
||||
sleep_limit: 99999
|
||||
sleep_delay: 500
|
||||
reject_limit: 99999
|
||||
concurrent: 3
|
||||
|
||||
@@ -137,6 +137,7 @@ class FederationEventHandler:
|
||||
"""
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.hs = hs
|
||||
self._store = hs.get_datastores().main
|
||||
self._storage_controllers = hs.get_storage_controllers()
|
||||
self._state_storage_controller = self._storage_controllers.state
|
||||
@@ -644,9 +645,39 @@ class FederationEventHandler:
|
||||
f"room {ev.room_id}, when we were backfilling in {room_id}"
|
||||
)
|
||||
|
||||
# foo
|
||||
#
|
||||
# We expect the events from the `/backfill`response to start from
|
||||
# `?v` and include events that preceded it (so the list will be
|
||||
# newest -> oldest). This is at-most a convention between Synapse
|
||||
# servers as the order is not specced.
|
||||
#
|
||||
# Reverse the list of events
|
||||
reverse_chronological_events = events
|
||||
chronological_events = reverse_chronological_events[::-1]
|
||||
|
||||
from synapse.storage.util.id_generators import AbstractStreamIdGenerator
|
||||
|
||||
# This should only exist on instances that are configured to write
|
||||
assert (
|
||||
self._instance_name in self.hs.config.worker.writers.events
|
||||
), "Can only instantiate xxxfoobarbaz on master"
|
||||
|
||||
# Since we have been configured to write, we ought to have id generators,
|
||||
# rather than id trackers.
|
||||
assert isinstance(self._store._backfill_id_gen, AbstractStreamIdGenerator)
|
||||
stream_ordering_manager = self._store._backfill_id_gen.get_next_mult(
|
||||
len(reverse_chronological_events)
|
||||
)
|
||||
async with stream_ordering_manager as stream_orderings:
|
||||
for event, stream in zip(
|
||||
reverse_chronological_events, stream_orderings
|
||||
):
|
||||
event.internal_metadata.stream_ordering = stream
|
||||
|
||||
await self._process_pulled_events(
|
||||
dest,
|
||||
events,
|
||||
chronological_events,
|
||||
backfilled=True,
|
||||
)
|
||||
|
||||
|
||||
@@ -209,7 +209,9 @@ class PersistEventsStore:
|
||||
|
||||
async with stream_ordering_manager as stream_orderings:
|
||||
for (event, _), stream in zip(events_and_contexts, stream_orderings):
|
||||
event.internal_metadata.stream_ordering = stream
|
||||
# foo
|
||||
if event.internal_metadata.stream_ordering is None:
|
||||
event.internal_metadata.stream_ordering = stream
|
||||
|
||||
await self.db_pool.runInteraction(
|
||||
"persist_events",
|
||||
|
||||
Reference in New Issue
Block a user