Compare commits

...

3 Commits

Author SHA1 Message Date
David Robertson
0929515049 Track the resync processes 2022-12-15 16:19:02 +00:00
David Robertson
922bab1182 Pull out a method to start a resync 2022-12-15 16:17:15 +00:00
David Robertson
b373c3655a Pull "tidy up after resync" logic out of the loop
I find it cleaner, but YMMV
2022-12-15 16:11:45 +00:00

View File

@@ -37,6 +37,8 @@ from signedjson.key import decode_verify_key_bytes
from signedjson.sign import verify_signed_json
from unpaddedbase64 import decode_base64
from twisted.internet.defer import Deferred
from synapse import event_auth
from synapse.api.constants import MAX_DEPTH, EventContentFields, EventTypes, Membership
from synapse.api.errors import (
@@ -179,6 +181,12 @@ class FederationHandler:
"resume_sync_partial_state_room", self._resume_sync_partial_state_room
)
# The resync processes currently in progress, keyed by room ID.
# Keep track of these so we can cancel them and cleanup if all local users leave
# that room.
# TODO(faster_joins): actually cancel them! https://github.com/matrix-org/synapse/issues/12802
self._partial_state_resyncs: Dict[str, "Deferred[None]"] = {}
@trace
async def maybe_backfill(
self, room_id: str, current_depth: int, limit: int
@@ -679,12 +687,8 @@ class FederationHandler:
if ret.partial_state:
# Kick off the process of asynchronously fetching the state for this
# room.
run_as_background_process(
desc="sync_partial_state_room",
func=self._sync_partial_state_room,
initial_destination=origin,
other_destinations=ret.servers_in_room,
room_id=room_id,
await self._start_partial_state_resync(
origin, ret.servers_in_room, room_id
)
# We wait here until this instance has seen the events come down
@@ -1627,14 +1631,25 @@ class FederationHandler:
partial_state_rooms = await self.store.get_partial_state_room_resync_info()
for room_id, resync_info in partial_state_rooms.items():
run_as_background_process(
desc="sync_partial_state_room",
func=self._sync_partial_state_room,
initial_destination=resync_info.joined_via,
other_destinations=resync_info.servers_in_room,
room_id=room_id,
await self._start_partial_state_resync(
resync_info.joined_via, resync_info.servers_in_room, room_id
)
async def _start_partial_state_resync(
self, origin: Optional[str], servers_in_room: List[str], room_id: str
) -> None:
# We shouldn't start a second resync if one is already in progress.
assert room_id not in self._partial_state_resyncs
d = run_as_background_process(
desc="sync_partial_state_room",
func=self._sync_partial_state_room,
initial_destination=origin,
other_destinations=servers_in_room,
room_id=room_id,
)
self._partial_state_resyncs[room_id] = d
async def _sync_partial_state_room(
self,
initial_destination: Optional[str],
@@ -1689,19 +1704,7 @@ class FederationHandler:
logger.info("Clearing partial-state flag for %s", room_id)
success = await self.store.clear_partial_state_room(room_id)
if success:
logger.info("State resync complete for %s", room_id)
self._storage_controllers.state.notify_room_un_partial_stated(
room_id
)
# Poke the notifier so that other workers see the write to
# the un-partial-stated rooms stream.
self._notifier.notify_replication()
# TODO(faster_joins) update room stats and user directory?
# https://github.com/matrix-org/synapse/issues/12814
# https://github.com/matrix-org/synapse/issues/12815
return
break
# we raced against more events arriving with partial state. Go round
# the loop again. We've already logged a warning, so no need for more.
continue
@@ -1767,6 +1770,19 @@ class FederationHandler:
destination,
)
logger.info("State resync complete for %s", room_id)
self._storage_controllers.state.notify_room_un_partial_stated(room_id)
# Poke the notifier so that other workers see the write to
# the un-partial-stated rooms stream.
self._notifier.notify_replication()
# TODO(faster_joins) update room stats and user directory?
# https://github.com/matrix-org/synapse/issues/12814
# https://github.com/matrix-org/synapse/issues/12815
# This resync has completed---stop tracking its progress.
self._partial_state_resyncs.pop(room_id, None)
def _prioritise_destinations_for_partial_state_resync(
initial_destination: Optional[str],