WIP
@@ -27,6 +27,7 @@ from typing import (
     Iterable,
     List,
     Optional,
+    Set,
     Tuple,
     Union,
 )
@@ -169,6 +170,9 @@ class FederationHandler:
 
         self.third_party_event_rules = hs.get_third_party_event_rules()
 
+        # A set of room IDs with partial state which we are currently syncing.
+        self._active_partial_state_syncs: Set[str] = set()
+
         # if this is the main process, fire off a background process to resume
         # any partial-state-resync operations which were in flight when we
         # were shut down.
@@ -1534,81 +1538,90 @@ class FederationHandler:
 
         # `destination` is the current remote homeserver we're pulling from.
         destination = next(destination_iter)
-        logger.info("Syncing state for room %s via %s", room_id, destination)
-
-        # we work through the queue in order of increasing stream ordering.
-        while True:
-            batch = await self.store.get_partial_state_events_batch(room_id)
-            if not batch:
-                # all the events are updated, so we can update current state and
-                # clear the lazy-loading flag.
-                logger.info("Updating current state for %s", room_id)
-                assert (
-                    self._storage_controllers.persistence is not None
-                ), "TODO(faster_joins): support for workers"
-                await self._storage_controllers.persistence.update_current_state(
-                    room_id
-                )
-
-                logger.info("Clearing partial-state flag for %s", room_id)
-                success = await self.store.clear_partial_state_room(room_id)
-                if success:
-                    logger.info("State resync complete for %s", room_id)
-                    self._storage_controllers.state.notify_room_un_partial_stated(
-                        room_id
-                    )
-
-                    # TODO(faster_joins) update room stats and user directory?
-                    return
-
-                # we raced against more events arriving with partial state. Go round
-                # the loop again. We've already logged a warning, so no need for more.
-                # TODO(faster_joins): there is still a race here, whereby incoming events which raced
-                # with us will fail to be persisted after the call to `clear_partial_state_room` due to
-                # having partial state.
-                continue
-
-            events = await self.store.get_events_as_list(
-                batch,
-                redact_behaviour=EventRedactBehaviour.as_is,
-                allow_rejected=True,
-            )
-            for event in events:
-                for attempt in itertools.count():
-                    try:
-                        await self._federation_event_handler.update_state_for_partial_state_event(
-                            destination, event
-                        )
-                        break
-                    except FederationError as e:
-                        if attempt == len(destinations) - 1:
-                            # We have tried every remote server for this event. Give up.
-                            # TODO(faster_joins) giving up isn't the right thing to do
-                            #   if there's a temporary network outage. retrying
-                            #   indefinitely is also not the right thing to do if we can
-                            #   reach all homeservers and they all claim they don't have
-                            #   the state we want.
-                            logger.error(
-                                "Failed to get state for %s at %s from %s because %s, "
-                                "giving up!",
-                                room_id,
-                                event,
-                                destination,
-                                e,
-                            )
-                            raise
-
-                        # Try the next remote server.
-                        logger.info(
-                            "Failed to get state for %s at %s from %s because %s",
-                            room_id,
-                            event,
-                            destination,
-                            e,
-                        )
-                        destination = next(destination_iter)
-                        logger.info(
-                            "Syncing state for room %s via %s instead",
-                            room_id,
-                            destination,
-                        )
+
+        if room_id in self._active_partial_state_syncs:
+            logger.info("Syncing state for room %s is already in progress", room_id)
+            return
+
+        try:
+            self._active_partial_state_syncs.add(room_id)
+            logger.info("Syncing state for room %s via %s", room_id, destination)
+
+            # we work through the queue in order of increasing stream ordering.
+            while True:
+                batch = await self.store.get_partial_state_events_batch(room_id)
+                if not batch:
+                    # all the events are updated, so we can update current state and
+                    # clear the lazy-loading flag.
+                    logger.info("Updating current state for %s", room_id)
+                    assert (
+                        self._storage_controllers.persistence is not None
+                    ), "TODO(faster_joins): support for workers"
+                    await self._storage_controllers.persistence.update_current_state(
+                        room_id
+                    )
+
+                    logger.info("Clearing partial-state flag for %s", room_id)
+                    success = await self.store.clear_partial_state_room(room_id)
+                    if success:
+                        logger.info("State resync complete for %s", room_id)
+                        self._storage_controllers.state.notify_room_un_partial_stated(
+                            room_id
+                        )
+
+                        # TODO(faster_joins) update room stats and user directory?
+                        return
+
+                    # we raced against more events arriving with partial state. Go round
+                    # the loop again. We've already logged a warning, so no need for more.
+                    # TODO(faster_joins): there is still a race here, whereby incoming events which raced
+                    # with us will fail to be persisted after the call to `clear_partial_state_room` due to
+                    # having partial state.
+                    continue
+
+                events = await self.store.get_events_as_list(
+                    batch,
+                    redact_behaviour=EventRedactBehaviour.as_is,
+                    allow_rejected=True,
+                )
+                for event in events:
+                    for attempt in itertools.count():
+                        try:
+                            await self._federation_event_handler.update_state_for_partial_state_event(
+                                destination, event
+                            )
+                            break
+                        except FederationError as e:
+                            if attempt == len(destinations) - 1:
+                                # We have tried every remote server for this event. Give up.
+                                # TODO(faster_joins) giving up isn't the right thing to do
+                                #   if there's a temporary network outage. retrying
+                                #   indefinitely is also not the right thing to do if we can
+                                #   reach all homeservers and they all claim they don't have
+                                #   the state we want.
+                                logger.error(
+                                    "Failed to get state for %s at %s from %s because %s, "
+                                    "giving up!",
+                                    room_id,
+                                    event,
+                                    destination,
+                                    e,
+                                )
+                                raise
+
+                            # Try the next remote server.
+                            logger.info(
+                                "Failed to get state for %s at %s from %s because %s",
+                                room_id,
+                                event,
+                                destination,
+                                e,
+                            )
+                            destination = next(destination_iter)
+                            logger.info(
+                                "Syncing state for room %s via %s instead",
+                                room_id,
+                                destination,
+                            )
+        finally:
+            self._active_partial_state_syncs.remove(room_id)
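For readers skimming the diff: the change guards `_sync_partial_state_room` against being started twice for the same room by tracking in-flight room IDs in a set. Below is a minimal, standalone sketch of that guard pattern, using hypothetical names rather than Synapse's actual classes, to show why the set entry is cleared in a `finally` block.

import asyncio
from typing import Set


class ResyncGuard:
    def __init__(self) -> None:
        # Room IDs whose resync is currently running (analogous to
        # _active_partial_state_syncs in the diff above).
        self._active: Set[str] = set()

    async def sync(self, room_id: str) -> None:
        if room_id in self._active:
            # A resync for this room is already in flight; do nothing.
            print(f"resync for {room_id} already in progress")
            return

        try:
            self._active.add(room_id)
            # Stand-in for the real per-room resync work.
            await asyncio.sleep(0.1)
            print(f"resync for {room_id} finished")
        finally:
            # Always clear the flag, even on failure, so a later resync can start.
            self._active.discard(room_id)


async def main() -> None:
    guard = ResyncGuard()
    # The second concurrent call returns early; the third, sequential call runs again.
    await asyncio.gather(
        guard.sync("!room:example.org"), guard.sync("!room:example.org")
    )
    await guard.sync("!room:example.org")


if __name__ == "__main__":
    asyncio.run(main())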