1
0
This commit is contained in:
Olivier Wilkinson (reivilibre)
2020-08-14 20:20:25 +01:00
parent 74a6f4f137
commit c1b32ae494
3 changed files with 43 additions and 17 deletions

View File

@@ -15,7 +15,7 @@
# limitations under the License.
import datetime
import logging
from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple
from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple, cast
from prometheus_client import Counter
@@ -494,7 +494,7 @@ class PerDestinationQueue(object):
"No event IDs found for catch-up: "
"last successful = %d, max catch up = %d",
self._last_successful_stream_order,
self._catch_up_max_stream_order
self._catch_up_max_stream_order,
)
self._catching_up = False
break
@@ -518,8 +518,8 @@ class PerDestinationQueue(object):
if success:
sent_transactions_counter.inc()
final_pdu, _ = catch_up_pdus[-1]
self._last_successful_stream_order = (
final_pdu.internal_metadata.stream_ordering
self._last_successful_stream_order = cast(
int, final_pdu.internal_metadata.stream_ordering
)
await self._store.set_last_successful_stream_ordering(
self._destination, self._last_successful_stream_order

View File

@@ -14,7 +14,7 @@
# limitations under the License.
import logging
from collections import namedtuple
from typing import List, Optional
from typing import Collection, List, Optional
from canonicaljson import encode_canonical_json
@@ -361,7 +361,9 @@ class TransactionStore(SQLBaseStore):
)
@staticmethod
def _get_largest_destination_rooms_stream_order_txn(txn, destination: str) -> Optional[int]:
def _get_largest_destination_rooms_stream_order_txn(
txn, destination: str
) -> Optional[int]:
txn.execute(
"""
SELECT stream_ordering
@@ -379,7 +381,11 @@ class TransactionStore(SQLBaseStore):
return None
def store_destination_rooms_entries(
self, destinations: List[str], room_id: str, event_id: str, stream_ordering: int
self,
destinations: Collection[str],
room_id: str,
event_id: str,
stream_ordering: int,
):
"""
Updates or creates `destination_rooms` entries in batch for a single event.

View File

@@ -114,8 +114,12 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
room_2 = self.helper.create_room_as("u1", tok=u1_token)
# also critical (2) to federate
self.get_success(event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join"))
self.get_success(event_injection.inject_member_event(self.hs, room_2, "@user:host2", "join"))
self.get_success(
event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join")
)
self.get_success(
event_injection.inject_member_event(self.hs, room_2, "@user:host2", "join")
)
self.helper.send_state(
room_1, event_type="m.room.topic", body={"topic": "wombat"}, tok=u1_token
@@ -174,7 +178,9 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
u1_token = self.login("u1", "you the one")
room = self.helper.create_room_as("u1", tok=u1_token)
self.get_success(event_injection.inject_member_event(self.hs, room, "@user:host2", "join"))
self.get_success(
event_injection.inject_member_event(self.hs, room, "@user:host2", "join")
)
event_id_1 = self.helper.send(room, "wombats!", tok=u1_token)["event_id"]
@@ -201,7 +207,9 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
# take the remote offline
self.is_online = False
self.get_success(event_injection.inject_member_event(self.hs, room, "@user:host2", "join"))
self.get_success(
event_injection.inject_member_event(self.hs, room, "@user:host2", "join")
)
self.helper.send(room, "wombats!", tok=u1_token)["event_id"]
@@ -257,9 +265,15 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
room_1 = self.helper.create_room_as("u1", tok=u1_token)
room_2 = self.helper.create_room_as("u1", tok=u1_token)
room_3 = self.helper.create_room_as("u1", tok=u1_token)
self.get_success(event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join"))
self.get_success(event_injection.inject_member_event(self.hs, room_2, "@user:host2", "join"))
self.get_success(event_injection.inject_member_event(self.hs, room_3, "@user:host2", "join"))
self.get_success(
event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join")
)
self.get_success(
event_injection.inject_member_event(self.hs, room_2, "@user:host2", "join")
)
self.get_success(
event_injection.inject_member_event(self.hs, room_3, "@user:host2", "join")
)
# create some events to play with
@@ -305,9 +319,15 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
room_1 = self.helper.create_room_as("u1", tok=u1_token)
room_2 = self.helper.create_room_as("u1", tok=u1_token)
room_3 = self.helper.create_room_as("u1", tok=u1_token)
self.get_success(event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join"))
self.get_success(event_injection.inject_member_event(self.hs, room_2, "@user:host2", "join"))
self.get_success(event_injection.inject_member_event(self.hs, room_3, "@user:host2", "join"))
self.get_success(
event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join")
)
self.get_success(
event_injection.inject_member_event(self.hs, room_2, "@user:host2", "join")
)
self.get_success(
event_injection.inject_member_event(self.hs, room_3, "@user:host2", "join")
)
# create some events to play with