diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 1e1a426a54..1cc44bd915 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -98,7 +98,7 @@ class PerDestinationQueue(object): # _catching_up is flipped to False. self._catching_up = True # the maximum stream order to catch up to (PDUs after this are expected - # to be in the main transmission queue), inclusive + # to be in the main transmission queue), inclusive self._catch_up_max_stream_order = None # type: Optional[int] # Cache of the last successfully-transmitted stream ordering for this # destination (we are the only updater so this is safe) @@ -478,7 +478,7 @@ class PerDestinationQueue(object): self._catching_up = False return - # get 50 catchup room/PDUs + # get at most 50 catchup room/PDUs while self._last_successful_stream_order < self._catch_up_max_stream_order: event_ids = await self._store.get_catch_up_room_event_ids( self._destination, @@ -491,7 +491,7 @@ class PerDestinationQueue(object): # tinkering with the database, but I also have limited foresight, # so let's handle this properly logger.warning( - "No event IDs found for catch-up: " + "Unexpectedly, no event IDs were found for catch-up: " "last successful = %d, max catch up = %d", self._last_successful_stream_order, self._catch_up_max_stream_order, @@ -500,9 +500,13 @@ class PerDestinationQueue(object): break # fetch the relevant events from the event store + # - redacted behaviour of REDACT is fine, since we only send metadata + # of redacted events to the destination. + # - don't need to worry about rejected events as we do not actively + # forward received events over federation. events = await self._store.get_events_as_list( event_ids - ) # XXX REVIEW: redact behaviour and allow_rejected ??? + ) # zip them together with their stream orderings catch_up_pdus = [ diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index 3d9010265c..0b8b04a9ab 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -14,7 +14,7 @@ # limitations under the License. import logging from collections import namedtuple -from typing import List, Optional +from typing import List, Optional, Iterable from canonicaljson import encode_canonical_json @@ -266,7 +266,7 @@ class TransactionStore(SQLBaseStore): "_cleanup_transactions", _cleanup_transactions_txn ) - def get_last_successful_stream_ordering(self, destination: str): + async def get_last_successful_stream_ordering(self, destination: str) -> Optional[int]: """ Gets the stream ordering of the PDU most-recently successfully sent to the specified destination. @@ -274,7 +274,7 @@ class TransactionStore(SQLBaseStore): Args: destination: the destination we have successfully sent to """ - return self.db_pool.simple_select_one_onecol( + return await self.db_pool.simple_select_one_onecol( "destinations", {"destination": destination}, "last_successful_stream_ordering", @@ -282,9 +282,9 @@ class TransactionStore(SQLBaseStore): desc="get_last_successful_stream_ordering", ) - def set_last_successful_stream_ordering( + async def set_last_successful_stream_ordering( self, destination: str, last_successful_stream_ordering: int - ): + ) -> None: """ Marks that we have successfully sent the PDUs up to and including the one specified. @@ -294,23 +294,23 @@ class TransactionStore(SQLBaseStore): last_successful_stream_ordering: the stream_ordering of the most recent successfully-sent PDU """ - return self.db_pool.simple_upsert( + return await self.db_pool.simple_upsert( "destinations", keyvalues={"destination": destination}, values={"last_successful_stream_ordering": last_successful_stream_ordering}, desc="set_last_successful_stream_ordering", ) - def get_catch_up_room_event_ids( + async def get_catch_up_room_event_ids( self, destination: str, last_successful_stream_ordering: int, max_stream_order: int, - ): + ) -> List[str]: """ - Returns 50 event IDs and their corresponding stream_orderings that - correspond to the least recent events that have not yet been sent to - a destination. + Returns at most 50 event IDs and their corresponding stream_orderings + that correspond to the oldest events that have not yet been sent to + the destination. Args: destination: the destination in question @@ -320,9 +320,9 @@ class TransactionStore(SQLBaseStore): to return events for. Returns: - event_ids + list of event_ids """ - return self.db_pool.runInteraction( + return await self.db_pool.runInteraction( "get_catch_up_room_event_ids", self._get_catch_up_room_event_ids_txn, destination, @@ -350,12 +350,12 @@ class TransactionStore(SQLBaseStore): event_ids = [row[0] for row in txn] return event_ids - def get_largest_destination_rooms_stream_order(self, destination: str): + async def get_largest_destination_rooms_stream_order(self, destination: str) -> Optional[int]: """ Returns the largest stream_ordering from the destination_rooms table that corresponds to this destination. """ - return self.db_pool.runInteraction( + return await self.db_pool.runInteraction( "get_largest_destination_rooms_stream_order", self._get_largest_destination_rooms_stream_order_txn, destination, @@ -378,30 +378,33 @@ class TransactionStore(SQLBaseStore): rows = [r[0] for r in txn] if rows: return rows[0] - else: - return None + return None - def store_destination_rooms_entries( + async def store_destination_rooms_entries( self, - destinations: Collection[str], + destinations: Iterable[str], room_id: str, event_id: str, stream_ordering: int, - ): + ) -> None: """ Updates or creates `destination_rooms` entries in batch for a single event. + Args: destinations: list of destinations room_id: the room_id of the event event_id: the ID of the event stream_ordering: the stream_ordering of the event """ - return self.db_pool.runInteraction( + + rows = [(destination, room_id) for destination in destinations] + + return await self.db_pool.runInteraction( "store_destination_rooms_entries", self.db_pool.simple_upsert_many_txn, "destination_rooms", ["destination", "room_id"], - [(d, room_id) for d in destinations], + rows, ["event_id", "stream_ordering"], - [(event_id, stream_ordering)] * len(destinations), + [(event_id, stream_ordering)] * len(rows), ) diff --git a/tests/federation/test_federation_catch_up.py b/tests/federation/test_federation_catch_up.py index a021658ac1..c70fc9cc2f 100644 --- a/tests/federation/test_federation_catch_up.py +++ b/tests/federation/test_federation_catch_up.py @@ -11,7 +11,7 @@ from synapse.federation.units import Edu from synapse.rest import admin from synapse.rest.client.v1 import login, room -from tests.test_utils import event_injection +from tests.test_utils import event_injection, make_awaitable from tests.unittest import FederatingHomeserverTestCase, override_config @@ -31,12 +31,8 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase): # stub out get_current_hosts_in_room state_handler = hs.get_state_handler() - # need to do these Future shenanigans because someone awaits on this - # result # This mock is crucial for destination_rooms to be populated. - fut = Future() - fut.set_result(["test", "host2"]) - state_handler.get_current_hosts_in_room = Mock(return_value=fut) + state_handler.get_current_hosts_in_room = Mock(return_value=make_awaitable(["test", "host2"])) # whenever send_transaction is called, record the pdu data self.pdus = [] @@ -160,8 +156,8 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase): # now we need to initiate a federation transaction somehow… # to do that, let's send another event (because it's simple to do) # (do it to another room otherwise the catch-up logic decides it doesn't - # need to catch up room_1 — something I overlooked when first writing - # this test) + # need to catch up room_1 — something I overlooked when first writing + # this test) self.helper.send(room_2, "wombats!", tok=u1_token) # we should now have received both PDUs