diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index ff99a7b0b6..3603f46678 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -47,6 +47,8 @@ from synapse.storage.types import Cursor from synapse.types import JsonDict, RoomStreamToken, StateMap, StrCollection from synapse.types.handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES from synapse.types.state import StateFilter +from synapse.util import json_encoder +from synapse.util.iterutils import batch_iter if TYPE_CHECKING: from synapse.server import HomeServer @@ -325,6 +327,15 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS self._sliding_sync_membership_snapshots_bg_update, ) + # FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the + # foreground update for + # `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by + # https://github.com/element-hq/synapse/issues/TODO) + with db_conn.cursor(txn_name="resolve_sliding_sync") as txn: + _resolve_stale_data_in_sliding_sync_tables( + txn=txn, + ) + async def _background_reindex_fields_sender( self, progress: JsonDict, batch_size: int ) -> int: @@ -2147,3 +2158,245 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS ) return len(memberships_to_update_rows) + + +def _resolve_stale_data_in_sliding_sync_tables( + txn: LoggingTransaction, +) -> None: + """ + Clears stale/out-of-date entries from the + `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` tables. + + This accounts for when someone downgrades their Synapse version and then upgrades it + again. This will ensure that we don't have any stale/out-of-date data in the + `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` tables since any new + events sent in rooms would have also needed to be written to the sliding sync + tables. 
For example a new event needs to bump `event_stream_ordering` in + `sliding_sync_joined_rooms` table or some state in the room changing (like the room + name). Or another example of someone's membership changing in a room affecting + `sliding_sync_membership_snapshots`. + + This way, if a row exists in the sliding sync tables, we are able to rely on it + (accurate data). And if a row doesn't exist, we use a fallback to get the same info + until the background updates fill in the rows or a new event comes in triggering it + to be fully inserted. + + FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the + foreground update for + `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by + https://github.com/element-hq/synapse/issues/TODO) + """ + + _resolve_stale_data_in_sliding_sync_joined_rooms_table(txn) + _resolve_stale_data_in_sliding_sync_membership_snapshots_table(txn) + + +def _resolve_stale_data_in_sliding_sync_joined_rooms_table( + txn: LoggingTransaction, +) -> None: + """ + Clears stale/out-of-date entries from the `sliding_sync_joined_rooms` table and + kicks-off the background update to catch-up with what we missed while Synapse was + downgraded. + + See `_resolve_stale_data_in_sliding_sync_tables()` description above for more + context. + """ + + # Find the point when we stopped writing to the `sliding_sync_joined_rooms` table + txn.execute( + """ + SELECT event_stream_ordering + FROM sliding_sync_joined_rooms + ORDER BY event_stream_ordering DESC + LIMIT 1 + """, + ) + + # If we have nothing written to the `sliding_sync_joined_rooms` table, there is + # nothing to clean up + row = cast(Optional[Tuple[int]], txn.fetchone()) + max_stream_ordering_sliding_sync_joined_rooms_table = None + depends_on = None + if row is not None: + (max_stream_ordering_sliding_sync_joined_rooms_table,) = row + + txn.execute( + """ + SELECT room_id + FROM events + WHERE stream_ordering > ? 
+ GROUP BY room_id + ORDER BY MAX(stream_ordering) ASC + """, + (max_stream_ordering_sliding_sync_joined_rooms_table,), + ) + + room_rows = txn.fetchall() + # No new events have been written to the `events` table since the last time we wrote + # to the `sliding_sync_joined_rooms` table so there is nothing to clean up. This is + # the expected normal scenario for people who have not downgraded their Synapse + # version. + if not room_rows: + return + + # 1000 is an arbitrary batch size with no testing + for chunk in batch_iter(room_rows, 1000): + # Handle updating the `sliding_sync_joined_rooms` table + # + # Clear out the stale data + DatabasePool.simple_delete_many_batch_txn( + txn, + table="sliding_sync_joined_rooms", + keys=("room_id",), + values=chunk, + ) + + # Update the `sliding_sync_joined_rooms_to_recalculate` table with the rooms + # that went stale and now need to be recalculated. + DatabasePool.simple_upsert_many_txn_native_upsert( + txn, + table="sliding_sync_joined_rooms_to_recalculate", + key_names=("room_id",), + key_values=chunk, + value_names=(), + # No value columns, therefore make a blank list so that the following + # zip() works correctly. + value_values=[() for x in range(len(chunk))], + ) + else: + # Re-run the `sliding_sync_joined_rooms_to_recalculate` prefill if there is + # nothing in the `sliding_sync_joined_rooms` table + DatabasePool.simple_upsert_txn_native_upsert( + txn, + table="background_updates", + keyvalues={ + "update_name": _BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE + }, + values={}, + # Only insert the row if it doesn't already exist. If it already exists, + # we're already working on it + insertion_values={ + "progress_json": "{}", + }, + ) + depends_on = ( + _BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE + ) + + # Now kick-off the background update to catch-up with what we missed while Synapse + # was downgraded. 
+ # + # We may need to catch-up on everything if we have nothing written to the + # `sliding_sync_joined_rooms` table yet. This could happen if someone had zero rooms + # on their server (so the normal background update completes), downgraded Synapse + # versions, joined and created some new rooms, and upgraded again. + DatabasePool.simple_upsert_txn_native_upsert( + txn, + table="background_updates", + keyvalues={ + "update_name": _BackgroundUpdates.SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE + }, + values={}, + # Only insert the row if it doesn't already exist. If it already exists, we will + # eventually fill in the rows we're trying to populate. + insertion_values={ + # Empty progress is expected since it's not used for this background update. + "progress_json": "{}", + # Wait for the prefill to finish + "depends_on": depends_on, + }, + ) + + +def _resolve_stale_data_in_sliding_sync_membership_snapshots_table( + txn: LoggingTransaction, +) -> None: + """ + Clears stale/out-of-date entries from the `sliding_sync_membership_snapshots` table + and kicks-off the background update to catch-up with what we missed while Synapse + was downgraded. + + See `_resolve_stale_data_in_sliding_sync_tables()` description above for more + context. 
+ """ + + # Find the point when we stopped writing to the `sliding_sync_membership_snapshots` table + txn.execute( + """ + SELECT event_stream_ordering + FROM sliding_sync_membership_snapshots + ORDER BY event_stream_ordering DESC + LIMIT 1 + """, + ) + + # If we have nothing written to the `sliding_sync_membership_snapshots` table, + # there is nothing to clean up + row = cast(Optional[Tuple[int]], txn.fetchone()) + max_stream_ordering_sliding_sync_membership_snapshots_table = None + if row is not None: + (max_stream_ordering_sliding_sync_membership_snapshots_table,) = row + + # XXX: Since `forgotten` is simply a flag on the `room_memberships` table that is + # set out-of-band, there is no way to tell whether it was set while Synapse was + # downgraded. The only thing the user can do is `/forget` again if they run into + # this. + # + # This only picks up changes to memberships. + txn.execute( + """ + SELECT user_id, room_id + FROM local_current_membership + WHERE event_stream_ordering > ? + ORDER BY event_stream_ordering ASC + """, + (max_stream_ordering_sliding_sync_membership_snapshots_table,), + ) + + membership_rows = txn.fetchall() + # No memberships have changed in `local_current_membership` since the last time we wrote + # to the `sliding_sync_membership_snapshots` table so there is nothing to clean up. + # This is the expected normal scenario for people who have not downgraded their + # Synapse version. + if not membership_rows: + return + + # 1000 is an arbitrary batch size with no testing + for chunk in batch_iter(membership_rows, 1000): + # Handle updating the `sliding_sync_membership_snapshots` table + # + DatabasePool.simple_delete_many_batch_txn( + txn, + table="sliding_sync_membership_snapshots", + keys=("user_id", "room_id"), + values=chunk, + ) + + # Now kick-off the background update to catch-up with what we missed while Synapse + # was downgraded. 
+ # + # We may need to catch-up on everything if we have nothing written to the + # `sliding_sync_membership_snapshots` table yet. This could happen if someone had + # zero rooms on their server (so the normal background update completes), downgraded + # Synapse versions, joined and created some new rooms, and upgraded again. + # + progress_json: JsonDict = {} + if max_stream_ordering_sliding_sync_membership_snapshots_table is not None: + progress_json["last_event_stream_ordering"] = ( + max_stream_ordering_sliding_sync_membership_snapshots_table + ) + + DatabasePool.simple_upsert_txn_native_upsert( + txn, + table="background_updates", + keyvalues={ + "update_name": _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE + }, + values={}, + # Only insert the row if it doesn't already exist. If it already exists, we will + # eventually fill in the rows we're trying to populate. + insertion_values={ + "progress_json": json_encoder.encode(progress_json), + }, + ) diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 0c171b380b..aaffe5ecc9 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -32,24 +32,15 @@ from typing import ( Optional, TextIO, Tuple, - cast, ) import attr from synapse.config.homeserver import HomeServerConfig -from synapse.storage.database import ( - DatabasePool, - LoggingDatabaseConnection, - LoggingTransaction, -) -from synapse.storage.databases.main.events_bg_updates import _BackgroundUpdates +from synapse.storage.database import LoggingDatabaseConnection, LoggingTransaction from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine from synapse.storage.schema import SCHEMA_COMPAT_VERSION, SCHEMA_VERSION from synapse.storage.types import Cursor -from synapse.types import JsonDict -from synapse.util import json_encoder -from synapse.util.iterutils import batch_iter logger = logging.getLogger(__name__) @@ -576,256 +567,6 @@ def
_upgrade_existing_database( logger.info("Schema now up to date") - # FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the - # foreground update for - # `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by - # https://github.com/element-hq/synapse/issues/TODO) - _resolve_stale_data_in_sliding_sync_tables( - txn=cur, - ) - - -def _resolve_stale_data_in_sliding_sync_tables( - txn: LoggingTransaction, -) -> None: - """ - Clears stale/out-of-date entries from the - `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` tables. - - This accounts for when someone downgrades their Synapse version and then upgrades it - again. This will ensure that we don't have any stale/out-of-date data in the - `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` tables since any new - events sent in rooms would have also needed to be written to the sliding sync - tables. For example a new event needs to bump `event_stream_ordering` in - `sliding_sync_joined_rooms` table or some state in the room changing (like the room - name). Or another example of someone's membership changing in a room affecting - `sliding_sync_membership_snapshots`. - - This way, if a row exists in the sliding sync tables, we are able to rely on it - (accurate data). And if a row doesn't exist, we use a fallback to get the same info - until the background updates fill in the rows or a new event comes in triggering it - to be fully inserted. 
- - FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the - foreground update for - `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by - https://github.com/element-hq/synapse/issues/TODO) - """ - - _resolve_stale_data_in_sliding_sync_joined_rooms_table(txn) - _resolve_stale_data_in_sliding_sync_membership_snapshots_table(txn) - - -def _resolve_stale_data_in_sliding_sync_joined_rooms_table( - txn: LoggingTransaction, -) -> None: - """ - Clears stale/out-of-date entries from the `sliding_sync_joined_rooms` table and - kicks-off the background update to catch-up with what we missed while Synapse was - downgraded. - - See `_resolve_stale_data_in_sliding_sync_tables()` description above for more - context. - """ - - # Find the point when we stopped writing to the `sliding_sync_joined_rooms` table - txn.execute( - """ - SELECT event_stream_ordering - FROM sliding_sync_joined_rooms - ORDER BY event_stream_ordering DESC - LIMIT 1 - """, - ) - - # If we have nothing written to the `sliding_sync_joined_rooms` table, there is - # nothing to clean up - row = cast(Optional[Tuple[int]], txn.fetchone()) - max_stream_ordering_sliding_sync_joined_rooms_table = None - depends_on = None - if row is not None: - (max_stream_ordering_sliding_sync_joined_rooms_table,) = row - - txn.execute( - """ - SELECT room_id - FROM events - WHERE stream_ordering > ? - GROUP BY room_id - ORDER BY MAX(stream_ordering) ASC - """, - (max_stream_ordering_sliding_sync_joined_rooms_table,), - ) - - room_rows = txn.fetchall() - # No new events have been written to the `events` table since the last time we wrote - # to the `sliding_sync_joined_rooms` table so there is nothing to clean up. This is - # the expected normal scenario for people who have not downgraded their Synapse - # version. 
- if not room_rows: - return - - # 1000 is an arbitrary batch size with no testing - for chunk in batch_iter(room_rows, 1000): - # Handle updating the `sliding_sync_joined_rooms` table - # - # Clear out the stale data - DatabasePool.simple_delete_many_batch_txn( - txn, - table="sliding_sync_joined_rooms", - keys=("room_id",), - values=chunk, - ) - - # Update the `sliding_sync_joined_rooms_to_recalculate` table with the rooms - # that went stale and now need to be recalculated. - DatabasePool.simple_upsert_many_txn_native_upsert( - txn, - table="sliding_sync_joined_rooms_to_recalculate", - key_names=("room_id",), - key_values=chunk, - value_names=(), - # No value columns, therefore make a blank list so that the following - # zip() works correctly. - value_values=[() for x in range(len(chunk))], - ) - else: - # Re-run the `sliding_sync_joined_rooms_to_recalculate` prefill if there is - # nothing in the `sliding_sync_joined_rooms` table - DatabasePool.simple_upsert_txn_native_upsert( - txn, - table="background_updates", - keyvalues={ - "update_name": _BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE - }, - values={}, - # Only insert the row if it doesn't already exist. If it already exists, - # we're already working on it - insertion_values={ - "progress_json": "{}", - }, - ) - depends_on = ( - _BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE - ) - - # Now kick-off the background update to catch-up with what we missed while Synapse - # was downgraded. - # - # We may need to catch-up on everything if we have nothing written to the - # `sliding_sync_joined_rooms` table yet. This could happen if someone had zero rooms - # on their server (so the normal background update completes), downgrade Synapse - # versions, join and create some new rooms, and upgrade again. 
- DatabasePool.simple_upsert_txn_native_upsert( - txn, - table="background_updates", - keyvalues={ - "update_name": _BackgroundUpdates.SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE - }, - values={}, - # Only insert the row if it doesn't already exist. If it already exists, we will - # eventually fill in the rows we're trying to populate. - insertion_values={ - # Empty progress is expected since it's not used for this background update. - "progress_json": "{}", - # Wait for the prefill to finish - "depends_on": depends_on, - }, - ) - - -def _resolve_stale_data_in_sliding_sync_membership_snapshots_table( - txn: LoggingTransaction, -) -> None: - """ - Clears stale/out-of-date entries from the `sliding_sync_membership_snapshots` table - and kicks-off the background update to catch-up with what we missed while Synapse - was downgraded. - - See `_resolve_stale_data_in_sliding_sync_tables()` description above for more - context. - """ - - # Find the point when we stopped writing to the `sliding_sync_membership_snapshots` table - txn.execute( - """ - SELECT event_stream_ordering - FROM sliding_sync_membership_snapshots - ORDER BY event_stream_ordering DESC - LIMIT 1 - """, - ) - - # If we have nothing written to the `sliding_sync_membership_snapshots` table, - # there is nothing to clean up - row = cast(Optional[Tuple[int]], txn.fetchone()) - max_stream_ordering_sliding_sync_membership_snapshots_table = None - if row is not None: - (max_stream_ordering_sliding_sync_membership_snapshots_table,) = row - - # XXX: Since `forgotten` is simply a flag on the `room_memberships` table that is - # set out-of-band, there is no way to tell whether it was set while Synapse was - # downgraded. The only thing the user can do is `/forget` again if they run into - # this. - # - # This only picks up changes to memberships. - txn.execute( - """ - SELECT user_id, room_id - FROM local_current_membership - WHERE event_stream_ordering > ? 
- ORDER BY event_stream_ordering ASC - """, - (max_stream_ordering_sliding_sync_membership_snapshots_table,), - ) - - membership_rows = txn.fetchall() - # No new events have been written to the `events` table since the last time we wrote - # to the `sliding_sync_membership_snapshots` table so there is nothing to clean up. - # This is the expected normal scenario for people who have not downgraded their - # Synapse version. - if not membership_rows: - return - - # 1000 is an arbitrary batch size with no testing - for chunk in batch_iter(membership_rows, 1000): - # Handle updating the `sliding_sync_membership_snapshots` table - # - DatabasePool.simple_delete_many_batch_txn( - txn, - table="sliding_sync_membership_snapshots", - keys=("user_id", "room_id"), - values=chunk, - ) - - # Now kick-off the background update to catch-up with what we missed while Synapse - # was downgraded. - # - # We may need to catch-up on everything if we have nothing written to the - # `sliding_sync_membership_snapshots` table yet. This could happen if someone had - # zero rooms on their server (so the normal background update completes), downgrade - # Synapse versions, join and create some new rooms, and upgrade again. - # - progress_json: JsonDict = {} - if max_stream_ordering_sliding_sync_membership_snapshots_table is not None: - progress_json["last_event_stream_ordering"] = ( - max_stream_ordering_sliding_sync_membership_snapshots_table - ) - - DatabasePool.simple_upsert_txn_native_upsert( - txn, - table="background_updates", - keyvalues={ - "update_name": _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE - }, - values={}, - # Only insert the row if it doesn't already exist. If it already exists, we will - # eventually fill in the rows we're trying to populate. 
- insertion_values={ - "progress_json": json_encoder.encode(progress_json), - }, - ) - def _apply_module_schemas( txn: Cursor, database_engine: BaseDatabaseEngine, config: HomeServerConfig diff --git a/tests/storage/test_sliding_sync_tables.py b/tests/storage/test_sliding_sync_tables.py index 0770ea5e33..f6a6796e7b 100644 --- a/tests/storage/test_sliding_sync_tables.py +++ b/tests/storage/test_sliding_sync_tables.py @@ -33,8 +33,8 @@ from synapse.rest import admin from synapse.rest.client import login, room from synapse.server import HomeServer from synapse.storage.databases.main.events import DeltaState -from synapse.storage.databases.main.events_bg_updates import _BackgroundUpdates -from synapse.storage.prepare_database import ( +from synapse.storage.databases.main.events_bg_updates import ( + _BackgroundUpdates, _resolve_stale_data_in_sliding_sync_joined_rooms_table, _resolve_stale_data_in_sliding_sync_membership_snapshots_table, )