Fix sequence migration for autoincrement tables in synapse_port_db (#18677)
Closes https://github.com/element-hq/synapse/issues/18053 - the sliding sync tables will now migrate properly.
This commit is contained in:
1
changelog.d/18677.bugfix
Normal file
1
changelog.d/18677.bugfix
Normal file
@@ -0,0 +1 @@
|
||||
Fix sliding_sync_connections related errors when porting from SQLite to Postgres.
|
||||
@@ -190,13 +190,18 @@ APPEND_ONLY_TABLES = [
|
||||
"users",
|
||||
]
|
||||
|
||||
# These tables declare their id column with "PRIMARY KEY AUTOINCREMENT" on sqlite side
|
||||
# and with "PRIMARY KEY GENERATED ALWAYS AS IDENTITY" on postgres side. This creates an
|
||||
# implicit sequence that needs its value to be migrated separately. Additionally,
|
||||
# inserting on postgres side needs to use the "OVERRIDING SYSTEM VALUE" modifier.
|
||||
AUTOINCREMENT_TABLES = {
|
||||
"sliding_sync_connections",
|
||||
"sliding_sync_connection_positions",
|
||||
"sliding_sync_connection_required_state",
|
||||
"state_groups_pending_deletion",
|
||||
}
|
||||
|
||||
IGNORED_TABLES = {
|
||||
# Porting the auto generated sequence in this table is non-trivial.
|
||||
# None of the entries in this list are mandatory for Synapse to keep working.
|
||||
# If state group disk space is an issue after the port, the
|
||||
# `mark_unreferenced_state_groups_for_deletion_bg_update` background task can be run again.
|
||||
"state_groups_pending_deletion",
|
||||
# We don't port these tables, as they're a faff and we can regenerate
|
||||
# them anyway.
|
||||
"user_directory",
|
||||
@@ -284,11 +289,17 @@ class Store(
|
||||
return self.db_pool.runInteraction("execute_sql", r)
|
||||
|
||||
def insert_many_txn(
|
||||
self, txn: LoggingTransaction, table: str, headers: List[str], rows: List[Tuple]
|
||||
self,
|
||||
txn: LoggingTransaction,
|
||||
table: str,
|
||||
headers: List[str],
|
||||
rows: List[Tuple],
|
||||
override_system_value: bool = False,
|
||||
) -> None:
|
||||
sql = "INSERT INTO %s (%s) VALUES (%s)" % (
|
||||
sql = "INSERT INTO %s (%s) %s VALUES (%s)" % (
|
||||
table,
|
||||
", ".join(k for k in headers),
|
||||
"OVERRIDING SYSTEM VALUE" if override_system_value else "",
|
||||
", ".join("%s" for _ in headers),
|
||||
)
|
||||
|
||||
@@ -532,7 +543,13 @@ class Porter:
|
||||
|
||||
def insert(txn: LoggingTransaction) -> None:
|
||||
assert headers is not None
|
||||
self.postgres_store.insert_many_txn(txn, table, headers[1:], rows)
|
||||
self.postgres_store.insert_many_txn(
|
||||
txn,
|
||||
table,
|
||||
headers[1:],
|
||||
rows,
|
||||
override_system_value=table in AUTOINCREMENT_TABLES,
|
||||
)
|
||||
|
||||
self.postgres_store.db_pool.simple_update_one_txn(
|
||||
txn,
|
||||
@@ -884,6 +901,19 @@ class Porter:
|
||||
],
|
||||
)
|
||||
|
||||
await self._setup_autoincrement_sequence(
|
||||
"sliding_sync_connection_positions", "connection_position"
|
||||
)
|
||||
await self._setup_autoincrement_sequence(
|
||||
"sliding_sync_connection_required_state", "required_state_id"
|
||||
)
|
||||
await self._setup_autoincrement_sequence(
|
||||
"sliding_sync_connections", "connection_key"
|
||||
)
|
||||
await self._setup_autoincrement_sequence(
|
||||
"state_groups_pending_deletion", "sequence_number"
|
||||
)
|
||||
|
||||
# Step 3. Get tables.
|
||||
self.progress.set_state("Fetching tables")
|
||||
sqlite_tables = await self.sqlite_store.db_pool.simple_select_onecol(
|
||||
@@ -1216,6 +1246,49 @@ class Porter:
|
||||
"_setup_%s" % (sequence_name,), r
|
||||
)
|
||||
|
||||
async def _setup_autoincrement_sequence(
|
||||
self,
|
||||
sqlite_table_name: str,
|
||||
sqlite_id_column_name: str,
|
||||
) -> None:
|
||||
"""Set a sequence to the correct value. Use where id column was declared with PRIMARY KEY AUTOINCREMENT."""
|
||||
seq_name = await self._pg_get_serial_sequence(
|
||||
sqlite_table_name, sqlite_id_column_name
|
||||
)
|
||||
if seq_name is None:
|
||||
raise Exception(
|
||||
"implicit sequence not found for table " + sqlite_table_name
|
||||
)
|
||||
|
||||
seq_value = await self.sqlite_store.db_pool.simple_select_one_onecol(
|
||||
table="sqlite_sequence",
|
||||
keyvalues={"name": sqlite_table_name},
|
||||
retcol="seq",
|
||||
allow_none=True,
|
||||
)
|
||||
if seq_value is None:
|
||||
return
|
||||
|
||||
def r(txn: LoggingTransaction) -> None:
|
||||
sql = "ALTER SEQUENCE %s RESTART WITH" % (seq_name,)
|
||||
txn.execute(sql + " %s", (seq_value + 1,))
|
||||
|
||||
await self.postgres_store.db_pool.runInteraction("_setup_%s" % (seq_name,), r)
|
||||
|
||||
async def _pg_get_serial_sequence(self, table: str, column: str) -> Optional[str]:
|
||||
"""Returns the name of the postgres sequence associated with a column, or NULL."""
|
||||
|
||||
def r(txn: LoggingTransaction) -> Optional[str]:
|
||||
txn.execute("SELECT pg_get_serial_sequence('%s', '%s')" % (table, column))
|
||||
result = txn.fetchone()
|
||||
if not result:
|
||||
return None
|
||||
return result[0]
|
||||
|
||||
return await self.postgres_store.db_pool.runInteraction(
|
||||
"_pg_get_serial_sequence", r
|
||||
)
|
||||
|
||||
async def _setup_auth_chain_sequence(self) -> None:
|
||||
curr_chain_id: Optional[
|
||||
int
|
||||
|
||||
Reference in New Issue
Block a user