diff --git a/synapse/storage/data_stores/main/schema/delta/58/11recovery_after_outage.sql b/synapse/storage/data_stores/main/schema/delta/58/11recovery_after_outage.sql new file mode 100644 index 0000000000..c903419eb4 --- /dev/null +++ b/synapse/storage/data_stores/main/schema/delta/58/11recovery_after_outage.sql @@ -0,0 +1,35 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +-- This schema delta alters the schema to enable 'catching up' remote homeservers +-- after there has been a connectivity problem for any reason. + +-- This stores, for each (destination, room) pair, the event_id and stream_ordering +-- of the latest event to be enqueued for transmission to that destination. +CREATE TABLE IF NOT EXISTS destination_rooms ( + -- the destination in question + destination TEXT NOT NULL, + -- the ID of the room in question + room_id TEXT NOT NULL, + -- the stream_ordering of the event + stream_ordering INTEGER, + -- the event_id of the event + event_id TEXT NOT NULL, + PRIMARY KEY (destination, room_id) +); + +-- this column tracks the stream_ordering of the event that was most recently +-- successfully transmitted to the destination. +ALTER TABLE destinations + ADD COLUMN last_successful_stream_ordering INTEGER; diff --git a/synapse/storage/data_stores/main/transactions.py b/synapse/storage/data_stores/main/transactions.py index a9bf457939..dcce59fdf7 100644 --- a/synapse/storage/data_stores/main/transactions.py +++ b/synapse/storage/data_stores/main/transactions.py @@ -12,9 +12,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging from collections import namedtuple +from typing import List, Optional from canonicaljson import encode_canonical_json @@ -267,3 +267,137 @@ class TransactionStore(SQLBaseStore): return self.db.runInteraction( "_cleanup_transactions", _cleanup_transactions_txn ) + + def get_last_successful_stream_ordering(self, destination: str): + """ + Gets the stream ordering of the PDU most-recently successfully sent + to the specified destination. + + Args: + destination: the destination we have successfully sent to + """ + return self.db.simple_select_one_onecol( + "destinations", + {"destination": destination}, + "last_successful_stream_ordering", + allow_none=True, + desc="get_last_successful_stream_ordering", + ) + + def set_last_successful_stream_ordering( + self, destination: str, last_successful_stream_ordering: int + ): + """ + Marks that we have successfully sent the PDUs up to and including the + one specified. + + Args: + destination: the destination we have successfully sent to + last_successful_stream_ordering: the stream_ordering of the most + recent successfully-sent PDU + """ + return self.db.simple_upsert( + "destinations", + keyvalues={"destination": destination}, + values={"last_successful_stream_ordering": last_successful_stream_ordering}, + desc="set_last_successful_stream_ordering", + ) + + def get_catch_up_room_event_ids( + self, + destination: str, + last_successful_stream_ordering: int, + max_stream_order: int, + ): + """ + Returns 50 event IDs and their corresponding stream_orderings that + correspond to the least recent events that have not yet been sent to + a destination. + + Args: + destination: the destination in question + last_successful_stream_ordering: the stream_ordering of the + most-recently successfully-transmitted event to the destination + max_stream_order: an upper bound, inclusive, of the stream ordering + to return events for. + + Returns: + event_ids + """ + return self.db.runInteraction( + "get_catch_up_room_event_ids", + self._get_catch_up_room_event_ids_txn, + destination, + last_successful_stream_ordering, + max_stream_order, + ) + + @staticmethod + def _get_catch_up_room_event_ids_txn( + txn, + destination: str, + last_successful_stream_ordering: int, + max_stream_order: int, + ) -> List[str]: + q = """ + SELECT event_id FROM destination_rooms + WHERE destination = ? + AND stream_ordering > ? AND stream_ordering <= ? + ORDER BY stream_ordering + LIMIT 50 + """ + txn.execute( + q, (destination, last_successful_stream_ordering, max_stream_order), + ) + event_ids = [row[0] for row in txn] + return event_ids + + def get_largest_destination_rooms_stream_order(self, destination: str): + """ + Returns the largest stream_ordering from the destination_rooms table + that corresponds to this destination. + """ + return self.db.runInteraction( + "get_largest_destination_rooms_stream_order", + self._get_largest_destination_rooms_stream_order_txn, + destination, + ) + + @staticmethod + def _get_largest_destination_rooms_stream_order_txn(txn, destination: str) -> Optional[int]: + txn.execute( + """ + SELECT stream_ordering + FROM destination_rooms + WHERE destination = ? + ORDER BY stream_ordering DESC + LIMIT 1 + """, + (destination,), + ) + rows = [r[0] for r in txn] + if rows: + return rows[0] + else: + return None + + def store_destination_rooms_entries( + self, destinations: List[str], room_id: str, event_id: str, stream_ordering: int + ): + """ + Updates or creates `destination_rooms` entries in batch for a single event. + Args: + destinations: list of destinations + room_id: the room_id of the event + event_id: the ID of the event + stream_ordering: the stream_ordering of the event + """ + return self.db.runInteraction( + "store_destination_rooms_entries", + self.db.simple_upsert_many_txn, + "destination_rooms", + ["destination", "room_id"], + [(d, room_id) for d in destinations], + ["event_id", "stream_ordering"], + [(event_id, stream_ordering)] * len(destinations), + )