Compare commits

...

1 Commits

Author SHA1 Message Date
Sean Quah
e237ff91bb WIP 2022-06-01 16:41:59 +01:00

View File

@@ -27,6 +27,7 @@ from typing import (
Iterable,
List,
Optional,
Set,
Tuple,
Union,
)
@@ -169,6 +170,9 @@ class FederationHandler:
self.third_party_event_rules = hs.get_third_party_event_rules()
# A set of room IDs with partial state which we are currently syncing.
self._active_partial_state_syncs: Set[str] = set()
# if this is the main process, fire off a background process to resume
# any partial-state-resync operations which were in flight when we
# were shut down.
@@ -1534,81 +1538,90 @@ class FederationHandler:
# `destination` is the current remote homeserver we're pulling from.
destination = next(destination_iter)
logger.info("Syncing state for room %s via %s", room_id, destination)
# we work through the queue in order of increasing stream ordering.
while True:
batch = await self.store.get_partial_state_events_batch(room_id)
if not batch:
# all the events are updated, so we can update current state and
# clear the lazy-loading flag.
logger.info("Updating current state for %s", room_id)
assert (
self._storage_controllers.persistence is not None
), "TODO(faster_joins): support for workers"
await self._storage_controllers.persistence.update_current_state(
room_id
)
if room_id in self._active_partial_state_syncs:
logger.info("Syncing state for room %s is already in progress", room_id)
return
try:
self._active_partial_state_syncs.add(room_id)
logger.info("Clearing partial-state flag for %s", room_id)
success = await self.store.clear_partial_state_room(room_id)
if success:
logger.info("State resync complete for %s", room_id)
self._storage_controllers.state.notify_room_un_partial_stated(
logger.info("Syncing state for room %s via %s", room_id, destination)
# we work through the queue in order of increasing stream ordering.
while True:
batch = await self.store.get_partial_state_events_batch(room_id)
if not batch:
# all the events are updated, so we can update current state and
# clear the lazy-loading flag.
logger.info("Updating current state for %s", room_id)
assert (
self._storage_controllers.persistence is not None
), "TODO(faster_joins): support for workers"
await self._storage_controllers.persistence.update_current_state(
room_id
)
# TODO(faster_joins) update room stats and user directory?
return
# we raced against more events arriving with partial state. Go round
# the loop again. We've already logged a warning, so no need for more.
# TODO(faster_joins): there is still a race here, whereby incoming events which raced
# with us will fail to be persisted after the call to `clear_partial_state_room` due to
# having partial state.
continue
events = await self.store.get_events_as_list(
batch,
redact_behaviour=EventRedactBehaviour.as_is,
allow_rejected=True,
)
for event in events:
for attempt in itertools.count():
try:
await self._federation_event_handler.update_state_for_partial_state_event(
destination, event
logger.info("Clearing partial-state flag for %s", room_id)
success = await self.store.clear_partial_state_room(room_id)
if success:
logger.info("State resync complete for %s", room_id)
self._storage_controllers.state.notify_room_un_partial_stated(
room_id
)
break
except FederationError as e:
if attempt == len(destinations) - 1:
# We have tried every remote server for this event. Give up.
# TODO(faster_joins) giving up isn't the right thing to do
# if there's a temporary network outage. retrying
# indefinitely is also not the right thing to do if we can
# reach all homeservers and they all claim they don't have
# the state we want.
logger.error(
"Failed to get state for %s at %s from %s because %s, "
"giving up!",
# TODO(faster_joins) update room stats and user directory?
return
# we raced against more events arriving with partial state. Go round
# the loop again. We've already logged a warning, so no need for more.
# TODO(faster_joins): there is still a race here, whereby incoming events which raced
# with us will fail to be persisted after the call to `clear_partial_state_room` due to
# having partial state.
continue
events = await self.store.get_events_as_list(
batch,
redact_behaviour=EventRedactBehaviour.as_is,
allow_rejected=True,
)
for event in events:
for attempt in itertools.count():
try:
await self._federation_event_handler.update_state_for_partial_state_event(
destination, event
)
break
except FederationError as e:
if attempt == len(destinations) - 1:
# We have tried every remote server for this event. Give up.
# TODO(faster_joins) giving up isn't the right thing to do
# if there's a temporary network outage. retrying
# indefinitely is also not the right thing to do if we can
# reach all homeservers and they all claim they don't have
# the state we want.
logger.error(
"Failed to get state for %s at %s from %s because %s, "
"giving up!",
room_id,
event,
destination,
e,
)
raise
# Try the next remote server.
logger.info(
"Failed to get state for %s at %s from %s because %s",
room_id,
event,
destination,
e,
)
raise
# Try the next remote server.
logger.info(
"Failed to get state for %s at %s from %s because %s",
room_id,
event,
destination,
e,
)
destination = next(destination_iter)
logger.info(
"Syncing state for room %s via %s instead",
room_id,
destination,
)
destination = next(destination_iter)
logger.info(
"Syncing state for room %s via %s instead",
room_id,
destination,
)
finally:
self._active_partial_state_syncs.remove(room_id)