1
0

Add data store functions and delta

This commit is contained in:
Olivier Wilkinson (reivilibre)
2020-08-14 19:55:59 +01:00
parent 07a415ca94
commit 5bc321ae1a
2 changed files with 170 additions and 1 deletions

View File

@@ -0,0 +1,35 @@
/* Copyright 2020 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- This schema delta alters the schema to enable 'catching up' remote homeservers
-- after there has been a connectivity problem for any reason.
-- This stores, for each (destination, room) pair, the event_id and stream_ordering
-- of the latest event to be enqueued for transmission to that destination.
CREATE TABLE IF NOT EXISTS destination_rooms (
-- the destination in question
destination TEXT NOT NULL,
-- the ID of the room in question
room_id TEXT NOT NULL,
-- the stream_ordering of the event
stream_ordering INTEGER,
-- the event_id of the event
event_id TEXT NOT NULL,
PRIMARY KEY (destination, room_id)
);
-- this column tracks the stream_ordering of the event that was most recently
-- successfully transmitted to the destination.
ALTER TABLE destinations
ADD COLUMN last_successful_stream_ordering INTEGER;

View File

@@ -12,9 +12,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from collections import namedtuple
from typing import List, Optional
from canonicaljson import encode_canonical_json
@@ -267,3 +267,137 @@ class TransactionStore(SQLBaseStore):
return self.db.runInteraction(
"_cleanup_transactions", _cleanup_transactions_txn
)
def get_last_successful_stream_ordering(self, destination: str):
"""
Gets the stream ordering of the PDU most-recently successfully sent
to the specified destination.
Args:
destination: the destination we have successfully sent to
"""
return self.db.simple_select_one_onecol(
"destinations",
{"destination": destination},
"last_successful_stream_ordering",
allow_none=True,
desc="get_last_successful_stream_ordering",
)
def set_last_successful_stream_ordering(
self, destination: str, last_successful_stream_ordering: int
):
"""
Marks that we have successfully sent the PDUs up to and including the
one specified.
Args:
destination: the destination we have successfully sent to
last_successful_stream_ordering: the stream_ordering of the most
recent successfully-sent PDU
"""
return self.db.simple_upsert(
"destinations",
keyvalues={"destination": destination},
values={"last_successful_stream_ordering": last_successful_stream_ordering},
desc="set_last_successful_stream_ordering",
)
def get_catch_up_room_event_ids(
self,
destination: str,
last_successful_stream_ordering: int,
max_stream_order: int,
):
"""
Returns 50 event IDs and their corresponding stream_orderings that
correspond to the least recent events that have not yet been sent to
a destination.
Args:
destination: the destination in question
last_successful_stream_ordering: the stream_ordering of the
most-recently successfully-transmitted event to the destination
max_stream_order: an upper bound, inclusive, of the stream ordering
to return events for.
Returns:
event_ids
"""
return self.db.runInteraction(
"get_catch_up_room_event_ids",
self._get_catch_up_room_event_ids_txn,
destination,
last_successful_stream_ordering,
max_stream_order,
)
@staticmethod
def _get_catch_up_room_event_ids_txn(
txn,
destination: str,
last_successful_stream_ordering: int,
max_stream_order: int,
) -> List[str]:
q = """
SELECT event_id FROM destination_rooms
WHERE destination = ?
AND stream_ordering > ? AND stream_ordering <= ?
ORDER BY stream_ordering
LIMIT 50
"""
txn.execute(
q, (destination, last_successful_stream_ordering, max_stream_order),
)
event_ids = [row[0] for row in txn]
return event_ids
def get_largest_destination_rooms_stream_order(self, destination: str):
"""
Returns the largest stream_ordering from the destination_rooms table
that corresponds to this destination.
"""
return self.db.runInteraction(
"get_largest_destination_rooms_stream_order",
self._get_largest_destination_rooms_stream_order_txn,
destination,
)
@staticmethod
def _get_largest_destination_rooms_stream_order_txn(txn, destination: str) -> Optional[int]:
txn.execute(
"""
SELECT stream_ordering
FROM destination_rooms
WHERE destination = ?
ORDER BY stream_ordering DESC
LIMIT 1
""",
(destination,),
)
rows = [r[0] for r in txn]
if rows:
return rows[0]
else:
return None
def store_destination_rooms_entries(
self, destinations: List[str], room_id: str, event_id: str, stream_ordering: int
):
"""
Updates or creates `destination_rooms` entries in batch for a single event.
Args:
destinations: list of destinations
room_id: the room_id of the event
event_id: the ID of the event
stream_ordering: the stream_ordering of the event
"""
return self.db.runInteraction(
"store_destination_rooms_entries",
self.db.simple_upsert_many_txn,
"destination_rooms",
["destination", "room_id"],
[(d, room_id) for d in destinations],
["event_id", "stream_ordering"],
[(event_id, stream_ordering)] * len(destinations),
)