diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index aaffe5ecc9..2b323c0272 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -37,10 +37,15 @@ from typing import ( import attr from synapse.config.homeserver import HomeServerConfig -from synapse.storage.database import LoggingDatabaseConnection, LoggingTransaction +from synapse.storage.database import ( + DatabasePool, + LoggingDatabaseConnection, + LoggingTransaction, +) from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine from synapse.storage.schema import SCHEMA_COMPAT_VERSION, SCHEMA_VERSION from synapse.storage.types import Cursor +from synapse.util.iterutils import batch_iter logger = logging.getLogger(__name__) @@ -567,6 +572,161 @@ def _upgrade_existing_database( logger.info("Schema now up to date") + # FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the + # foreground update for + # `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by + # https://github.com/element-hq/synapse/issues/TODO) + _clear_out_of_date_sliding_sync_tables( + txn=cur, + ) + + +def _clear_out_of_date_sliding_sync_tables( + txn: LoggingTransaction, +) -> None: + """ + Clears out-of-date entries from the + `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` tables. + + This accounts for when someone downgrades their Synapse version and then upgrades it + again. This will ensure that we don't have any out-of-date data in the + `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` tables. + + FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the + foreground update for + `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by + https://github.com/element-hq/synapse/issues/TODO) + """ + + _clear_out_of_date_sliding_sync_joined_rooms_table(txn) + _clear_out_of_date_sliding_sync_membership_snapshots_table(txn) + + +def _clear_out_of_date_sliding_sync_joined_rooms_table( + txn: LoggingTransaction, +) -> None: + """ + Clears out-of-date entries from the `sliding_sync_joined_rooms` table. + + This accounts for when someone downgrades their Synapse version and then upgrades it + again. This will ensure that we don't have any out-of-date data in the + `sliding_sync_joined_rooms` table. + + FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the + foreground update for + `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by + https://github.com/element-hq/synapse/issues/TODO) + """ + + # Find the point when we stopped writing to the `sliding_sync_joined_rooms` table + txn.execute( + """ + SELECT event_stream_ordering + FROM sliding_sync_joined_rooms + ORDER BY event_stream_ordering DESC + LIMIT 1 + """, + ) + + row = txn.fetchone() + # We have nothing written to the `sliding_sync_joined_rooms` table so there is + # nothing to clean up + if row is None: + return + + max_stream_ordering_sliding_sync_joined_rooms_table = row[0] + + txn.execute( + """ + SELECT DISTINCT(room_id) + FROM events + WHERE stream_ordering > ? + ORDER BY stream_ordering DESC + """, + (max_stream_ordering_sliding_sync_joined_rooms_table,), + ) + + room_rows = txn.fetchall() + # No new events have been written to the `events` table since the last time we wrote + # to the `sliding_sync_joined_rooms` table so there is nothing to clean up. This is + # the expected normal scenario for people who have not downgraded their Synapse + # version. + if not room_rows: + return + + for chunk in batch_iter(room_rows, 1000): + # Handle updating the `sliding_sync_joined_rooms` table + # + DatabasePool.simple_delete_many_batch_txn( + txn, + table="sliding_sync_joined_rooms", + keys=("room_id",), + values=chunk, + ) + + +def _clear_out_of_date_sliding_sync_membership_snapshots_table( + txn: LoggingTransaction, +) -> None: + """ + Clears out-of-date entries from the `sliding_sync_membership_snapshots` table. + + This accounts for when someone downgrades their Synapse version and then upgrades it + again. This will ensure that we don't have any out-of-date data in the + `sliding_sync_membership_snapshots` table. + + FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the + foreground update for + `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by + https://github.com/element-hq/synapse/issues/TODO) + """ + + # Find the point when we stopped writing to the `sliding_sync_membership_snapshots` table + txn.execute( + """ + SELECT event_stream_ordering + FROM sliding_sync_membership_snapshots + ORDER BY event_stream_ordering DESC + LIMIT 1 + """, + ) + + row = txn.fetchone() + # We have nothing written to the `sliding_sync_membership_snapshots` table so there is + # nothing to clean up + if row is None: + return + + max_stream_ordering_sliding_sync_membership_snapshots_table = row[0] + + txn.execute( + """ + SELECT DISTINCT(user_id, room_id) + FROM room_memberships + WHERE event_stream_ordering > ? + ORDER BY event_stream_ordering DESC + """, + (max_stream_ordering_sliding_sync_membership_snapshots_table,), + ) + + membership_rows = txn.fetchall() + # No new events have been written to the `events` table since the last time we wrote + # to the `sliding_sync_membership_snapshots` table so there is nothing to clean up. + # This is the expected normal scenario for people who have not downgraded their + # Synapse version. + if not membership_rows: + return + + for chunk in batch_iter(membership_rows, 1000): + # Handle updating the `sliding_sync_membership_snapshots` table + # + DatabasePool.simple_delete_many_batch_txn( + txn, + table="sliding_sync_membership_snapshots", + keys=("user_id", "room_id"), + values=chunk, + ) + def _apply_module_schemas( txn: Cursor, database_engine: BaseDatabaseEngine, config: HomeServerConfig