Handle suggestions from review
This commit is contained in:
@@ -98,7 +98,7 @@ class PerDestinationQueue(object):
|
||||
# _catching_up is flipped to False.
|
||||
self._catching_up = True
|
||||
# the maximum stream order to catch up to (PDUs after this are expected
|
||||
# to be in the main transmission queue), inclusive
|
||||
# to be in the main transmission queue), inclusive
|
||||
self._catch_up_max_stream_order = None # type: Optional[int]
|
||||
# Cache of the last successfully-transmitted stream ordering for this
|
||||
# destination (we are the only updater so this is safe)
|
||||
@@ -478,7 +478,7 @@ class PerDestinationQueue(object):
|
||||
self._catching_up = False
|
||||
return
|
||||
|
||||
# get 50 catchup room/PDUs
|
||||
# get at most 50 catchup room/PDUs
|
||||
while self._last_successful_stream_order < self._catch_up_max_stream_order:
|
||||
event_ids = await self._store.get_catch_up_room_event_ids(
|
||||
self._destination,
|
||||
@@ -491,7 +491,7 @@ class PerDestinationQueue(object):
|
||||
# tinkering with the database, but I also have limited foresight,
|
||||
# so let's handle this properly
|
||||
logger.warning(
|
||||
"No event IDs found for catch-up: "
|
||||
"Unexpectedly, no event IDs were found for catch-up: "
|
||||
"last successful = %d, max catch up = %d",
|
||||
self._last_successful_stream_order,
|
||||
self._catch_up_max_stream_order,
|
||||
@@ -500,9 +500,13 @@ class PerDestinationQueue(object):
|
||||
break
|
||||
|
||||
# fetch the relevant events from the event store
|
||||
# - redacted behaviour of REDACT is fine, since we only send metadata
|
||||
# of redacted events to the destination.
|
||||
# - don't need to worry about rejected events as we do not actively
|
||||
# forward received events over federation.
|
||||
events = await self._store.get_events_as_list(
|
||||
event_ids
|
||||
) # XXX REVIEW: redact behaviour and allow_rejected ???
|
||||
)
|
||||
|
||||
# zip them together with their stream orderings
|
||||
catch_up_pdus = [
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
# limitations under the License.
|
||||
import logging
|
||||
from collections import namedtuple
|
||||
from typing import List, Optional
|
||||
from typing import List, Optional, Iterable
|
||||
|
||||
from canonicaljson import encode_canonical_json
|
||||
|
||||
@@ -266,7 +266,7 @@ class TransactionStore(SQLBaseStore):
|
||||
"_cleanup_transactions", _cleanup_transactions_txn
|
||||
)
|
||||
|
||||
def get_last_successful_stream_ordering(self, destination: str):
|
||||
async def get_last_successful_stream_ordering(self, destination: str) -> Optional[int]:
|
||||
"""
|
||||
Gets the stream ordering of the PDU most-recently successfully sent
|
||||
to the specified destination.
|
||||
@@ -274,7 +274,7 @@ class TransactionStore(SQLBaseStore):
|
||||
Args:
|
||||
destination: the destination we have successfully sent to
|
||||
"""
|
||||
return self.db_pool.simple_select_one_onecol(
|
||||
return await self.db_pool.simple_select_one_onecol(
|
||||
"destinations",
|
||||
{"destination": destination},
|
||||
"last_successful_stream_ordering",
|
||||
@@ -282,9 +282,9 @@ class TransactionStore(SQLBaseStore):
|
||||
desc="get_last_successful_stream_ordering",
|
||||
)
|
||||
|
||||
def set_last_successful_stream_ordering(
|
||||
async def set_last_successful_stream_ordering(
|
||||
self, destination: str, last_successful_stream_ordering: int
|
||||
):
|
||||
) -> None:
|
||||
"""
|
||||
Marks that we have successfully sent the PDUs up to and including the
|
||||
one specified.
|
||||
@@ -294,23 +294,23 @@ class TransactionStore(SQLBaseStore):
|
||||
last_successful_stream_ordering: the stream_ordering of the most
|
||||
recent successfully-sent PDU
|
||||
"""
|
||||
return self.db_pool.simple_upsert(
|
||||
return await self.db_pool.simple_upsert(
|
||||
"destinations",
|
||||
keyvalues={"destination": destination},
|
||||
values={"last_successful_stream_ordering": last_successful_stream_ordering},
|
||||
desc="set_last_successful_stream_ordering",
|
||||
)
|
||||
|
||||
def get_catch_up_room_event_ids(
|
||||
async def get_catch_up_room_event_ids(
|
||||
self,
|
||||
destination: str,
|
||||
last_successful_stream_ordering: int,
|
||||
max_stream_order: int,
|
||||
):
|
||||
) -> List[str]:
|
||||
"""
|
||||
Returns 50 event IDs and their corresponding stream_orderings that
|
||||
correspond to the least recent events that have not yet been sent to
|
||||
a destination.
|
||||
Returns at most 50 event IDs and their corresponding stream_orderings
|
||||
that correspond to the oldest events that have not yet been sent to
|
||||
the destination.
|
||||
|
||||
Args:
|
||||
destination: the destination in question
|
||||
@@ -320,9 +320,9 @@ class TransactionStore(SQLBaseStore):
|
||||
to return events for.
|
||||
|
||||
Returns:
|
||||
event_ids
|
||||
list of event_ids
|
||||
"""
|
||||
return self.db_pool.runInteraction(
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_catch_up_room_event_ids",
|
||||
self._get_catch_up_room_event_ids_txn,
|
||||
destination,
|
||||
@@ -350,12 +350,12 @@ class TransactionStore(SQLBaseStore):
|
||||
event_ids = [row[0] for row in txn]
|
||||
return event_ids
|
||||
|
||||
def get_largest_destination_rooms_stream_order(self, destination: str):
|
||||
async def get_largest_destination_rooms_stream_order(self, destination: str) -> Optional[int]:
|
||||
"""
|
||||
Returns the largest stream_ordering from the destination_rooms table
|
||||
that corresponds to this destination.
|
||||
"""
|
||||
return self.db_pool.runInteraction(
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_largest_destination_rooms_stream_order",
|
||||
self._get_largest_destination_rooms_stream_order_txn,
|
||||
destination,
|
||||
@@ -378,30 +378,33 @@ class TransactionStore(SQLBaseStore):
|
||||
rows = [r[0] for r in txn]
|
||||
if rows:
|
||||
return rows[0]
|
||||
else:
|
||||
return None
|
||||
return None
|
||||
|
||||
def store_destination_rooms_entries(
|
||||
async def store_destination_rooms_entries(
|
||||
self,
|
||||
destinations: Collection[str],
|
||||
destinations: Iterable[str],
|
||||
room_id: str,
|
||||
event_id: str,
|
||||
stream_ordering: int,
|
||||
):
|
||||
) -> None:
|
||||
"""
|
||||
Updates or creates `destination_rooms` entries in batch for a single event.
|
||||
|
||||
Args:
|
||||
destinations: list of destinations
|
||||
room_id: the room_id of the event
|
||||
event_id: the ID of the event
|
||||
stream_ordering: the stream_ordering of the event
|
||||
"""
|
||||
return self.db_pool.runInteraction(
|
||||
|
||||
rows = [(destination, room_id) for destination in destinations]
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"store_destination_rooms_entries",
|
||||
self.db_pool.simple_upsert_many_txn,
|
||||
"destination_rooms",
|
||||
["destination", "room_id"],
|
||||
[(d, room_id) for d in destinations],
|
||||
rows,
|
||||
["event_id", "stream_ordering"],
|
||||
[(event_id, stream_ordering)] * len(destinations),
|
||||
[(event_id, stream_ordering)] * len(rows),
|
||||
)
|
||||
|
||||
@@ -11,7 +11,7 @@ from synapse.federation.units import Edu
|
||||
from synapse.rest import admin
|
||||
from synapse.rest.client.v1 import login, room
|
||||
|
||||
from tests.test_utils import event_injection
|
||||
from tests.test_utils import event_injection, make_awaitable
|
||||
from tests.unittest import FederatingHomeserverTestCase, override_config
|
||||
|
||||
|
||||
@@ -31,12 +31,8 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
|
||||
# stub out get_current_hosts_in_room
|
||||
state_handler = hs.get_state_handler()
|
||||
|
||||
# need to do these Future shenanigans because someone awaits on this
|
||||
# result
|
||||
# This mock is crucial for destination_rooms to be populated.
|
||||
fut = Future()
|
||||
fut.set_result(["test", "host2"])
|
||||
state_handler.get_current_hosts_in_room = Mock(return_value=fut)
|
||||
state_handler.get_current_hosts_in_room = Mock(return_value=make_awaitable(["test", "host2"]))
|
||||
|
||||
# whenever send_transaction is called, record the pdu data
|
||||
self.pdus = []
|
||||
@@ -160,8 +156,8 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
|
||||
# now we need to initiate a federation transaction somehow…
|
||||
# to do that, let's send another event (because it's simple to do)
|
||||
# (do it to another room otherwise the catch-up logic decides it doesn't
|
||||
# need to catch up room_1 — something I overlooked when first writing
|
||||
# this test)
|
||||
# need to catch up room_1 — something I overlooked when first writing
|
||||
# this test)
|
||||
self.helper.send(room_2, "wombats!", tok=u1_token)
|
||||
|
||||
# we should now have received both PDUs
|
||||
|
||||
Reference in New Issue
Block a user