Add catch-up logic
This commit is contained in:
@@ -15,7 +15,7 @@
|
||||
# limitations under the License.
|
||||
import datetime
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Tuple
|
||||
from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple
|
||||
|
||||
from prometheus_client import Counter
|
||||
|
||||
@@ -92,6 +92,18 @@ class PerDestinationQueue(object):
|
||||
self._destination = destination
|
||||
self.transmission_loop_running = False
|
||||
|
||||
# True whilst we are sending events that the remote homeserver missed
|
||||
# because it was unreachable.
|
||||
# New events will only be sent once this is finished, at which point
|
||||
# _catching_up is flipped to False.
|
||||
self._catching_up = True
|
||||
# the maximum stream order to catch up to (PDUs after this are expected
|
||||
# to be in the main transmission queue), inclusive
|
||||
self._catch_up_max_stream_order = None # type: Optional[int]
|
||||
# Cache of the last successfully-transmitted stream ordering for this
|
||||
# destination (we are the only updater so this is safe)
|
||||
self._last_successful_stream_order = None # type: Optional[int]
|
||||
|
||||
# a list of tuples of (pending pdu, order)
|
||||
self._pending_pdus = [] # type: List[Tuple[EventBase, int]]
|
||||
|
||||
@@ -137,8 +149,15 @@ class PerDestinationQueue(object):
|
||||
|
||||
Args:
|
||||
pdu: pdu to send
|
||||
order
|
||||
order: an arbitrary order for the PDU — NOT the stream ordering
|
||||
"""
|
||||
if (
|
||||
self._catch_up_max_stream_order
|
||||
and pdu.internal_metadata.stream_ordering <= self._catch_up_max_stream_order
|
||||
):
|
||||
# we are in catch-up mode and this PDU is already scheduled to be
|
||||
# part of the catch-up
|
||||
return
|
||||
self._pending_pdus.append((pdu, order))
|
||||
self.attempt_new_transaction()
|
||||
|
||||
@@ -219,6 +238,17 @@ class PerDestinationQueue(object):
|
||||
# hence why we throw the result away.
|
||||
await get_retry_limiter(self._destination, self._clock, self._store)
|
||||
|
||||
if self._catching_up:
|
||||
# we're catching up, so we should send old events instead
|
||||
# in this case, we don't send anything from the new queue
|
||||
# this keeps the catching-up logic simple
|
||||
await self._catch_up_transmission_loop()
|
||||
if self._catching_up:
|
||||
# XXX if we aren't actually caught up still, shouldn't
|
||||
# carry on to the main loop
|
||||
# (but need to consider what we do in a failure...?)
|
||||
return
|
||||
|
||||
pending_pdus = []
|
||||
while True:
|
||||
# We have to keep 2 free slots for presence and rr_edus
|
||||
@@ -326,6 +356,15 @@ class PerDestinationQueue(object):
|
||||
|
||||
self._last_device_stream_id = device_stream_id
|
||||
self._last_device_list_stream_id = dev_list_id
|
||||
|
||||
if pending_pdus:
|
||||
final_pdu, _ = pending_pdus[-1]
|
||||
self._last_successful_stream_order = (
|
||||
final_pdu.internal_metadata.stream_ordering
|
||||
)
|
||||
await self._store.set_last_successful_stream_ordering(
|
||||
self._destination, self._last_successful_stream_order
|
||||
)
|
||||
else:
|
||||
break
|
||||
except NotRetryingDestination as e:
|
||||
@@ -337,6 +376,8 @@ class PerDestinationQueue(object):
|
||||
(e.retry_last_ts + e.retry_interval) / 1000.0
|
||||
),
|
||||
)
|
||||
|
||||
self._catching_up = True
|
||||
except FederationDeniedError as e:
|
||||
logger.info(e)
|
||||
except HttpResponseException as e:
|
||||
@@ -346,6 +387,8 @@ class PerDestinationQueue(object):
|
||||
e.code,
|
||||
e,
|
||||
)
|
||||
|
||||
# XXX REVIEW should we be catching up?
|
||||
except RequestSendFailed as e:
|
||||
logger.warning(
|
||||
"TX [%s] Failed to send transaction: %s", self._destination, e
|
||||
@@ -365,6 +408,99 @@ class PerDestinationQueue(object):
|
||||
# We want to be *very* sure we clear this after we stop processing
|
||||
self.transmission_loop_running = False
|
||||
|
||||
async def _catch_up_transmission_loop(self) -> None:
    """Send this destination the PDUs it missed, batch by batch.

    Works through the stream orderings in the half-open-then-closed window
    (``_last_successful_stream_order``, ``_catch_up_max_stream_order``],
    asking the store for event IDs in that window, loading the events and
    sending them in a transaction with no EDUs.

    State changes made here:
      * ``_last_successful_stream_order`` is loaded from the store on first
        use, then advanced (in memory and in the store) after every
        successfully sent batch.
      * ``_catch_up_max_stream_order`` is fixed on first use, either from the
        first PDU waiting in ``_pending_pdus`` or from the store.
      * ``_catching_up`` is set to False when the window is exhausted or when
        there is not enough information to catch up. It is left untouched
        when a transaction is reported as unsuccessful, so the caller can
        tell that catch-up is still outstanding.
    """
    if self._last_successful_stream_order is None:
        # First catch-up for this queue: fetch the persisted position.
        self._last_successful_stream_order = (
            await self._store.get_last_successful_stream_ordering(
                self._destination
            )
        )

    if self._last_successful_stream_order is None:
        # Still None, so the store has no recorded position for this
        # destination. Without a starting point there is nothing we can
        # catch up on, so leave catch-up mode.
        self._catching_up = False
        return

    if self._catch_up_max_stream_order is None:
        # First catch-up: decide how far we need to catch up to.
        if self._pending_pdus:
            # PDUs are already waiting in the main queue, so everything
            # strictly before the first of them is catch-up territory.
            first_non_catch_up_pdu, _ = self._pending_pdus[0]
            # -1 because the limit is inclusive and that PDU itself will be
            # sent from the main queue.
            self._catch_up_max_stream_order = (
                first_non_catch_up_pdu.internal_metadata.stream_ordering - 1
            )
        else:
            # Nothing in the main queue: ask the store for the largest
            # stream order recorded for this destination.
            self._catch_up_max_stream_order = (
                await self._store.get_largest_destination_rooms_stream_order(
                    self._destination
                )
            )
            if self._catch_up_max_stream_order is None:
                # Not enough info to catch up.
                self._catching_up = False
                return

    # Fetch and send one batch per iteration; the batch size is whatever the
    # store returns for the window.
    while self._last_successful_stream_order < self._catch_up_max_stream_order:
        event_ids = await self._store.get_catch_up_room_event_ids(
            self._destination,
            self._last_successful_stream_order,
            self._catch_up_max_stream_order,
        )

        if not event_ids:
            # Not expected unless the database has been modified by hand,
            # but handle it rather than looping forever.
            logger.warning(
                "No event IDs found for catch-up: "
                "last successful = %d, max catch up = %d",
                self._last_successful_stream_order,
                self._catch_up_max_stream_order,
            )
            break

        # NOTE(review): redaction behaviour and allow_rejected are left at
        # the store's defaults here - confirm that is what we want.
        events = await self._store.get_events_as_list(event_ids)

        # Pair each event with its stream ordering, as the transaction
        # manager is given (pdu, order) tuples.
        catch_up_pdus = [
            (event, event.internal_metadata.stream_ordering) for event in events
        ]

        if not catch_up_pdus:
            # We were given IDs but could not load any events for them.
            # Previously this ended catch-up silently; log it so the gap is
            # visible to operators.
            logger.warning(
                "No events could be loaded for catch-up to %s: "
                "last successful = %d, max catch up = %d",
                self._destination,
                self._last_successful_stream_order,
                self._catch_up_max_stream_order,
            )
            break

        success = await self._transaction_manager.send_new_transaction(
            self._destination, catch_up_pdus, []
        )
        if not success:
            # Leave _catching_up set so that catch-up is retried later.
            return

        sent_transactions_counter.inc()
        final_pdu, _ = catch_up_pdus[-1]
        self._last_successful_stream_order = (
            final_pdu.internal_metadata.stream_ordering
        )
        await self._store.set_last_successful_stream_ordering(
            self._destination, self._last_successful_stream_order
        )

    # Either the window is exhausted or we gave up on it: catch-up is over.
    self._catching_up = False
|
||||
|
||||
def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]:
|
||||
if not self._pending_rrs:
|
||||
return
|
||||
@@ -408,6 +544,21 @@ class PerDestinationQueue(object):
|
||||
|
||||
return (edus, now_stream_id)
|
||||
|
||||
def reset_catch_up_state(self) -> None:
    """Put this destination's queue back into a fresh catch-up state.

    This does the following:
      - empties the main PDU queue (``_pending_pdus``), as any PDUs in it
        will now be handled by catch-up (because the max stream order limit
        is unset below);
      - turns catch-up mode on (``_catching_up = True``);
      - unsets the catch-up limit (``_catch_up_max_stream_order``) so that
        it is recomputed the next time we have a chance to catch up.
    """
    self._pending_pdus = []
    self._catching_up = True
    self._catch_up_max_stream_order = None
|
||||
|
||||
async def _get_to_device_message_edus(self, limit: int) -> Tuple[List[Edu], int]:
|
||||
last_device_stream_id = self._last_device_stream_id
|
||||
to_device_stream_id = self._store.get_to_device_stream_token()
|
||||
|
||||
Reference in New Issue
Block a user