Last successful stream ordering is about destinations
This commit is contained in:
@@ -363,7 +363,7 @@ class PerDestinationQueue(object):
|
||||
self._last_successful_stream_order = (
|
||||
final_pdu.internal_metadata.stream_ordering
|
||||
)
|
||||
await self._store.set_last_successful_stream_ordering(
|
||||
await self._store.set_destination_last_successful_stream_ordering(
|
||||
self._destination, self._last_successful_stream_order
|
||||
)
|
||||
else:
|
||||
@@ -441,7 +441,7 @@ class PerDestinationQueue(object):
|
||||
async def _catch_up_transmission_loop(self) -> None:
|
||||
if self._last_successful_stream_order is None:
|
||||
# first catch-up, so get from database
|
||||
self._last_successful_stream_order = await self._store.get_last_successful_stream_ordering(
|
||||
self._last_successful_stream_order = await self._store.get_destination_last_successful_stream_ordering(
|
||||
self._destination
|
||||
)
|
||||
|
||||
@@ -529,7 +529,7 @@ class PerDestinationQueue(object):
|
||||
self._last_successful_stream_order = cast(
|
||||
int, final_pdu.internal_metadata.stream_ordering
|
||||
)
|
||||
await self._store.set_last_successful_stream_ordering(
|
||||
await self._store.set_destination_last_successful_stream_ordering(
|
||||
self._destination, self._last_successful_stream_order
|
||||
)
|
||||
|
||||
|
||||
@@ -265,7 +265,7 @@ class TransactionStore(SQLBaseStore):
|
||||
"_cleanup_transactions", _cleanup_transactions_txn
|
||||
)
|
||||
|
||||
async def get_last_successful_stream_ordering(
|
||||
async def get_destination_last_successful_stream_ordering(
|
||||
self, destination: str
|
||||
) -> Optional[int]:
|
||||
"""
|
||||
@@ -283,7 +283,7 @@ class TransactionStore(SQLBaseStore):
|
||||
desc="get_last_successful_stream_ordering",
|
||||
)
|
||||
|
||||
async def set_last_successful_stream_ordering(
|
||||
async def set_destination_last_successful_stream_ordering(
|
||||
self, destination: str, last_successful_stream_ordering: int
|
||||
) -> None:
|
||||
"""
|
||||
|
||||
@@ -213,7 +213,7 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
|
||||
self.pump()
|
||||
|
||||
lsso_1 = self.get_success(
|
||||
self.hs.get_datastore().get_last_successful_stream_ordering("host2")
|
||||
self.hs.get_datastore().get_destination_last_successful_stream_ordering("host2")
|
||||
)
|
||||
|
||||
self.assertIsNone(
|
||||
@@ -227,7 +227,7 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
|
||||
event_id_2 = self.helper.send(room, "rabbits!", tok=u1_token)["event_id"]
|
||||
|
||||
lsso_2 = self.get_success(
|
||||
self.hs.get_datastore().get_last_successful_stream_ordering("host2")
|
||||
self.hs.get_datastore().get_destination_last_successful_stream_ordering("host2")
|
||||
)
|
||||
row_2 = self.get_destination_room(room)
|
||||
|
||||
@@ -286,7 +286,7 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
|
||||
event_2 = self.get_success(self.hs.get_datastore().get_event(event_id_2))
|
||||
|
||||
self.get_success(
|
||||
self.hs.get_datastore().set_last_successful_stream_ordering(
|
||||
self.hs.get_datastore().set_destination_last_successful_stream_ordering(
|
||||
"host2", event_2.internal_metadata.stream_ordering
|
||||
)
|
||||
)
|
||||
@@ -349,7 +349,7 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
|
||||
event_2 = self.get_success(self.hs.get_datastore().get_event(event_id_2))
|
||||
|
||||
self.get_success(
|
||||
self.hs.get_datastore().set_last_successful_stream_ordering(
|
||||
self.hs.get_datastore().set_destination_last_successful_stream_ordering(
|
||||
"host2", event_2.internal_metadata.stream_ordering
|
||||
)
|
||||
)
|
||||
|
||||
@@ -161,7 +161,7 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
|
||||
None
|
||||
)
|
||||
|
||||
self.datastore.get_last_successful_stream_ordering = lambda *args, **kwargs: defer.succeed(
|
||||
self.datastore.get_destination_last_successful_stream_ordering = lambda *args, **kwargs: defer.succeed(
|
||||
None
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user