Clear out-of-date rows
This commit is contained in:
@@ -37,10 +37,15 @@ from typing import (
|
||||
import attr
|
||||
|
||||
from synapse.config.homeserver import HomeServerConfig
|
||||
from synapse.storage.database import LoggingDatabaseConnection, LoggingTransaction
|
||||
from synapse.storage.database import (
|
||||
DatabasePool,
|
||||
LoggingDatabaseConnection,
|
||||
LoggingTransaction,
|
||||
)
|
||||
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
|
||||
from synapse.storage.schema import SCHEMA_COMPAT_VERSION, SCHEMA_VERSION
|
||||
from synapse.storage.types import Cursor
|
||||
from synapse.util.iterutils import batch_iter
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -567,6 +572,161 @@ def _upgrade_existing_database(
|
||||
|
||||
logger.info("Schema now up to date")
|
||||
|
||||
# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the
|
||||
# foreground update for
|
||||
# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by
|
||||
# https://github.com/element-hq/synapse/issues/TODO)
|
||||
_clear_out_of_date_sliding_sync_tables(
|
||||
txn=cur,
|
||||
)
|
||||
|
||||
|
||||
def _clear_out_of_date_sliding_sync_tables(
|
||||
txn: LoggingTransaction,
|
||||
) -> None:
|
||||
"""
|
||||
Clears out-of-date entries from the
|
||||
`sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` tables.
|
||||
|
||||
This accounts for when someone downgrades their Synapse version and then upgrades it
|
||||
again. This will ensure that we don't have any out-of-date data in the
|
||||
`sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` tables.
|
||||
|
||||
FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the
|
||||
foreground update for
|
||||
`sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by
|
||||
https://github.com/element-hq/synapse/issues/TODO)
|
||||
"""
|
||||
|
||||
_clear_out_of_date_sliding_sync_joined_rooms_table(txn)
|
||||
_clear_out_of_date_sliding_sync_membership_snapshots_table(txn)
|
||||
|
||||
|
||||
def _clear_out_of_date_sliding_sync_joined_rooms_table(
|
||||
txn: LoggingTransaction,
|
||||
) -> None:
|
||||
"""
|
||||
Clears out-of-date entries from the `sliding_sync_joined_rooms` table.
|
||||
|
||||
This accounts for when someone downgrades their Synapse version and then upgrades it
|
||||
again. This will ensure that we don't have any out-of-date data in the
|
||||
`sliding_sync_joined_rooms` table.
|
||||
|
||||
FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the
|
||||
foreground update for
|
||||
`sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by
|
||||
https://github.com/element-hq/synapse/issues/TODO)
|
||||
"""
|
||||
|
||||
# Find the point when we stopped writing to the `sliding_sync_joined_rooms` table
|
||||
txn.execute(
|
||||
"""
|
||||
SELECT event_stream_ordering
|
||||
FROM sliding_sync_joined_rooms
|
||||
ORDER BY event_stream_ordering DESC
|
||||
LIMIT 1
|
||||
""",
|
||||
)
|
||||
|
||||
row = txn.fetchone()
|
||||
# We have nothing written to the `sliding_sync_joined_rooms` table so there is
|
||||
# nothing to clean up
|
||||
if row is None:
|
||||
return
|
||||
|
||||
max_stream_ordering_sliding_sync_joined_rooms_table = row[0]
|
||||
|
||||
txn.execute(
|
||||
"""
|
||||
SELECT DISTINCT(room_id)
|
||||
FROM events
|
||||
WHERE stream_ordering > ?
|
||||
ORDER BY stream_ordering DESC
|
||||
""",
|
||||
(max_stream_ordering_sliding_sync_joined_rooms_table,),
|
||||
)
|
||||
|
||||
room_rows = txn.fetchall()
|
||||
# No new events have been written to the `events` table since the last time we wrote
|
||||
# to the `sliding_sync_joined_rooms` table so there is nothing to clean up. This is
|
||||
# the expected normal scenario for people who have not downgraded their Synapse
|
||||
# version.
|
||||
if not room_rows:
|
||||
return
|
||||
|
||||
for chunk in batch_iter(room_rows, 1000):
|
||||
# Handle updating the `sliding_sync_joined_rooms` table
|
||||
#
|
||||
DatabasePool.simple_delete_many_batch_txn(
|
||||
txn,
|
||||
table="sliding_sync_joined_rooms",
|
||||
keys=("room_id",),
|
||||
values=chunk,
|
||||
)
|
||||
|
||||
|
||||
def _clear_out_of_date_sliding_sync_membership_snapshots_table(
|
||||
txn: LoggingTransaction,
|
||||
) -> None:
|
||||
"""
|
||||
Clears out-of-date entries from the `sliding_sync_membership_snapshots` table.
|
||||
|
||||
This accounts for when someone downgrades their Synapse version and then upgrades it
|
||||
again. This will ensure that we don't have any out-of-date data in the
|
||||
`sliding_sync_membership_snapshots` table.
|
||||
|
||||
FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the
|
||||
foreground update for
|
||||
`sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by
|
||||
https://github.com/element-hq/synapse/issues/TODO)
|
||||
"""
|
||||
|
||||
# Find the point when we stopped writing to the `sliding_sync_membership_snapshots` table
|
||||
txn.execute(
|
||||
"""
|
||||
SELECT event_stream_ordering
|
||||
FROM sliding_sync_membership_snapshots
|
||||
ORDER BY event_stream_ordering DESC
|
||||
LIMIT 1
|
||||
""",
|
||||
)
|
||||
|
||||
row = txn.fetchone()
|
||||
# We have nothing written to the `sliding_sync_membership_snapshots` table so there is
|
||||
# nothing to clean up
|
||||
if row is None:
|
||||
return
|
||||
|
||||
max_stream_ordering_sliding_sync_membership_snapshots_table = row[0]
|
||||
|
||||
txn.execute(
|
||||
"""
|
||||
SELECT DISTINCT(user_id, room_id)
|
||||
FROM room_memberships
|
||||
WHERE event_stream_ordering > ?
|
||||
ORDER BY event_stream_ordering DESC
|
||||
""",
|
||||
(max_stream_ordering_sliding_sync_membership_snapshots_table,),
|
||||
)
|
||||
|
||||
membership_rows = txn.fetchall()
|
||||
# No new events have been written to the `events` table since the last time we wrote
|
||||
# to the `sliding_sync_membership_snapshots` table so there is nothing to clean up.
|
||||
# This is the expected normal scenario for people who have not downgraded their
|
||||
# Synapse version.
|
||||
if not membership_rows:
|
||||
return
|
||||
|
||||
for chunk in batch_iter(membership_rows, 1000):
|
||||
# Handle updating the `sliding_sync_membership_snapshots` table
|
||||
#
|
||||
DatabasePool.simple_delete_many_batch_txn(
|
||||
txn,
|
||||
table="sliding_sync_membership_snapshots",
|
||||
keys=("user_id", "room_id"),
|
||||
values=chunk,
|
||||
)
|
||||
|
||||
|
||||
def _apply_module_schemas(
|
||||
txn: Cursor, database_engine: BaseDatabaseEngine, config: HomeServerConfig
|
||||
|
||||
Reference in New Issue
Block a user