From 20c896a1469c10f7e02b702edcd8824b9679e924 Mon Sep 17 00:00:00 2001
From: "Olivier Wilkinson (reivilibre)"
Date: Fri, 14 Aug 2020 19:56:53 +0100
Subject: [PATCH] Add catch-up logic

Track the last stream ordering that was successfully sent to each
destination. Whilst a destination is in catch-up mode, send the events it
missed (up to a fixed maximum stream ordering) before sending anything from
the main PDU queue.

---
 .../sender/per_destination_queue.py           | 166 +++++++++++++++++-
 1 file changed, 164 insertions(+), 2 deletions(-)

diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 3436741783..9fc2ad8dff 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 import datetime
 import logging
-from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Tuple
+from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple
 
 from prometheus_client import Counter
 
@@ -92,6 +92,18 @@ class PerDestinationQueue(object):
         self._destination = destination
         self.transmission_loop_running = False
 
+        # True whilst we are sending events that the remote homeserver missed
+        # because it was unreachable.
+        # New events will only be sent once this is finished, at which point
+        # _catching_up is flipped to False.
+        self._catching_up = True
+        # the maximum stream order to catch up to (PDUs after this are expected
+        # to be in the main transmission queue), inclusive
+        self._catch_up_max_stream_order = None  # type: Optional[int]
+        # Cache of the last successfully-transmitted stream ordering for this
+        # destination (we are the only updater so this is safe)
+        self._last_successful_stream_order = None  # type: Optional[int]
+
         # a list of tuples of (pending pdu, order)
         self._pending_pdus = []  # type: List[Tuple[EventBase, int]]
 
@@ -137,8 +149,15 @@ class PerDestinationQueue(object):
 
         Args:
             pdu: pdu to send
-            order
+            order: an arbitrary order for the PDU — NOT the stream ordering
         """
+        if (
+            self._catch_up_max_stream_order is not None
+            and pdu.internal_metadata.stream_ordering <= self._catch_up_max_stream_order
+        ):
+            # we are in catch-up mode and this PDU is already scheduled to be
+            # part of the catch-up
+            return
         self._pending_pdus.append((pdu, order))
         self.attempt_new_transaction()
 
@@ -219,6 +238,17 @@ class PerDestinationQueue(object):
             # hence why we throw the result away.
             await get_retry_limiter(self._destination, self._clock, self._store)
 
+            if self._catching_up:
+                # we're catching up, so we should send old events instead
+                # in this case, we don't send anything from the new queue
+                # this keeps the catching-up logic simple
+                await self._catch_up_transmission_loop()
+                if self._catching_up:
+                    # XXX if we aren't actually caught up still, shouldn't
+                    # carry on to the main loop
+                    # (but need to consider what we do in a failure...?)
+                    return
+
             pending_pdus = []
             while True:
                 # We have to keep 2 free slots for presence and rr_edus
@@ -326,6 +356,15 @@ class PerDestinationQueue(object):
 
                     self._last_device_stream_id = device_stream_id
                     self._last_device_list_stream_id = dev_list_id
+
+                    if pending_pdus:
+                        final_pdu, _ = pending_pdus[-1]
+                        self._last_successful_stream_order = (
+                            final_pdu.internal_metadata.stream_ordering
+                        )
+                        await self._store.set_last_successful_stream_ordering(
+                            self._destination, self._last_successful_stream_order
+                        )
                 else:
                     break
         except NotRetryingDestination as e:
@@ -337,6 +376,8 @@ class PerDestinationQueue(object):
                     (e.retry_last_ts + e.retry_interval) / 1000.0
                 ),
             )
+
+            self._catching_up = True
         except FederationDeniedError as e:
             logger.info(e)
         except HttpResponseException as e:
@@ -346,6 +387,8 @@ class PerDestinationQueue(object):
                 e.code,
                 e,
             )
+
+            # XXX REVIEW should we be catching up?
         except RequestSendFailed as e:
             logger.warning(
                 "TX [%s] Failed to send transaction: %s", self._destination, e
@@ -365,6 +408,110 @@ class PerDestinationQueue(object):
             # We want to be *very* sure we clear this after we stop processing
             self.transmission_loop_running = False
 
+    async def _catch_up_transmission_loop(self) -> None:
+        """Send the events that this destination missed whilst unreachable.
+
+        Asks the store for the IDs of events between
+        `_last_successful_stream_order` and `_catch_up_max_stream_order`,
+        sends them in transactions, and records the new last-successful stream
+        ordering in the store after each successful transaction.
+
+        Clears `_catching_up` once caught up, or when nothing can be caught up
+        (no stored position/limit, or no events found). Leaves it set if a
+        transaction is unsuccessful, so the caller skips the main queue.
+        """
+        if self._last_successful_stream_order is None:
+            # first catch-up, so get from database
+            self._last_successful_stream_order = await self._store.get_last_successful_stream_ordering(
+                self._destination
+            )
+
+        if self._last_successful_stream_order is None:
+            # if it's still None, then this means we don't have the information
+            # in our database (oh, the perils of being a new feature).
+            # So we can't actually do anything here, and in this case, we don't
+            # know what to catch up, sadly.
+            # Trying to catch up right now is futile, so let's stop.
+            self._catching_up = False
+            return
+
+        if self._catch_up_max_stream_order is None:
+            # this is our first catch-up so we need to determine how much we
+            # want to catch-up.
+            if self._pending_pdus:
+                # we have PDUs already in the main queue so no need to ask the
+                # database
+                first_non_catch_up_pdu, _ = self._pending_pdus[0]
+                # -1 because we wish to exclude that one — we don't need to catch
+                # it up as it's in our main queue
+                self._catch_up_max_stream_order = (
+                    first_non_catch_up_pdu.internal_metadata.stream_ordering - 1
+                )
+            else:
+                # we don't have any PDUs in the main queue so instead find out
+                # the largest stream order that we know of that has, once upon a
+                # time, been queued for this destination (i.e. this is what we
+                # *should* have sent if the remote server was reachable).
+                self._catch_up_max_stream_order = await self._store.get_largest_destination_rooms_stream_order(
+                    self._destination
+                )
+                if self._catch_up_max_stream_order is None:
+                    # not enough info to catch up
+                    self._catching_up = False
+                    return
+
+        # get 50 catchup room/PDUs
+        while self._last_successful_stream_order < self._catch_up_max_stream_order:
+            event_ids = await self._store.get_catch_up_room_event_ids(
+                self._destination,
+                self._last_successful_stream_order,
+                self._catch_up_max_stream_order,
+            )
+
+            if not event_ids:
+                # I don't believe this *should* happen unless someone has been
+                # tinkering with the database, but I also have limited foresight,
+                # so let's handle this properly
+                logger.warning(
+                    "No event IDs found for catch-up: "
+                    "last successful = %d, max catch up = %d",
+                    self._last_successful_stream_order,
+                    self._catch_up_max_stream_order,
+                )
+                self._catching_up = False
+                break
+
+            # fetch the relevant events from the event store
+            events = await self._store.get_events_as_list(
+                event_ids
+            )  # XXX REVIEW: redact behaviour and allow_rejected ???
+
+            # zip them together with their stream orderings
+            catch_up_pdus = [
+                (event, event.internal_metadata.stream_ordering) for event in events
+            ]
+
+            if not catch_up_pdus:
+                break
+
+            success = await self._transaction_manager.send_new_transaction(
+                self._destination, catch_up_pdus, []
+            )
+            if success:
+                sent_transactions_counter.inc()
+                final_pdu, _ = catch_up_pdus[-1]
+                self._last_successful_stream_order = (
+                    final_pdu.internal_metadata.stream_ordering
+                )
+                await self._store.set_last_successful_stream_ordering(
+                    self._destination, self._last_successful_stream_order
+                )
+            else:
+                return
+
+        # once we have reached this point, catch-up is done!
+        self._catching_up = False
+
     def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]:
         if not self._pending_rrs:
             return
@@ -408,6 +555,21 @@ class PerDestinationQueue(object):
 
         return (edus, now_stream_id)
 
+    def reset_catch_up_state(self) -> None:
+        """
+        Resets the catch-up state of this destination.
+        This does the following:
+        - marks catch-up mode
+        - unsets the catch-up limit (max. stream order)
+          so that it is reset to the highest catch-up next time we have a
+          chance to catch up
+        - empties the main PDU queue as any PDUs in it will now be handled
+          by catch-up (because the max stream order limit was unset).
+        """
+        self._pending_pdus = []
+        self._catching_up = True
+        self._catch_up_max_stream_order = None
+
     async def _get_to_device_message_edus(self, limit: int) -> Tuple[List[Edu], int]:
         last_device_stream_id = self._last_device_stream_id
         to_device_stream_id = self._store.get_to_device_stream_token()