Merge commit '4a54b821b' into anoa/dinsic_release_1_23_1
This commit is contained in:
@@ -22,7 +22,7 @@ import logging
|
||||
import sys
|
||||
import time
|
||||
import traceback
|
||||
from typing import Optional
|
||||
from typing import Dict, Optional, Set
|
||||
|
||||
import yaml
|
||||
|
||||
@@ -294,6 +294,34 @@ class Porter(object):
|
||||
|
||||
return table, already_ported, total_to_port, forward_chunk, backward_chunk
|
||||
|
||||
async def get_table_constraints(self) -> Dict[str, Set[str]]:
|
||||
"""Returns a map of tables that have foreign key constraints to tables they depend on.
|
||||
"""
|
||||
|
||||
def _get_constraints(txn):
|
||||
# We can pull the information about foreign key constraints out from
|
||||
# the postgres schema tables.
|
||||
sql = """
|
||||
SELECT DISTINCT
|
||||
tc.table_name,
|
||||
ccu.table_name AS foreign_table_name
|
||||
FROM
|
||||
information_schema.table_constraints AS tc
|
||||
INNER JOIN information_schema.constraint_column_usage AS ccu
|
||||
USING (table_schema, constraint_name)
|
||||
WHERE tc.constraint_type = 'FOREIGN KEY';
|
||||
"""
|
||||
txn.execute(sql)
|
||||
|
||||
results = {}
|
||||
for table, foreign_table in txn:
|
||||
results.setdefault(table, set()).add(foreign_table)
|
||||
return results
|
||||
|
||||
return await self.postgres_store.db_pool.runInteraction(
|
||||
"get_table_constraints", _get_constraints
|
||||
)
|
||||
|
||||
async def handle_table(
|
||||
self, table, postgres_size, table_size, forward_chunk, backward_chunk
|
||||
):
|
||||
@@ -593,7 +621,18 @@ class Porter(object):
|
||||
"create_port_table", create_port_table
|
||||
)
|
||||
|
||||
# Step 2. Get tables.
|
||||
# Step 2. Set up sequences
|
||||
#
|
||||
# We do this before porting the tables so that event if we fail half
|
||||
# way through the postgres DB always have sequences that are greater
|
||||
# than their respective tables. If we don't then creating the
|
||||
# `DataStore` object will fail due to the inconsistency.
|
||||
self.progress.set_state("Setting up sequence generators")
|
||||
await self._setup_state_group_id_seq()
|
||||
await self._setup_user_id_seq()
|
||||
await self._setup_events_stream_seqs()
|
||||
|
||||
# Step 3. Get tables.
|
||||
self.progress.set_state("Fetching tables")
|
||||
sqlite_tables = await self.sqlite_store.db_pool.simple_select_onecol(
|
||||
table="sqlite_master", keyvalues={"type": "table"}, retcol="name"
|
||||
@@ -608,7 +647,7 @@ class Porter(object):
|
||||
tables = set(sqlite_tables) & set(postgres_tables)
|
||||
logger.info("Found %d tables", len(tables))
|
||||
|
||||
# Step 3. Figure out what still needs copying
|
||||
# Step 4. Figure out what still needs copying
|
||||
self.progress.set_state("Checking on port progress")
|
||||
setup_res = await make_deferred_yieldable(
|
||||
defer.gatherResults(
|
||||
@@ -621,21 +660,43 @@ class Porter(object):
|
||||
consumeErrors=True,
|
||||
)
|
||||
)
|
||||
# Map from table name to args passed to `handle_table`, i.e. a tuple
|
||||
# of: `postgres_size`, `table_size`, `forward_chunk`, `backward_chunk`.
|
||||
tables_to_port_info_map = {r[0]: r[1:] for r in setup_res}
|
||||
|
||||
# Step 4. Do the copying.
|
||||
# Step 5. Do the copying.
|
||||
#
|
||||
# This is slightly convoluted as we need to ensure tables are ported
|
||||
# in the correct order due to foreign key constraints.
|
||||
self.progress.set_state("Copying to postgres")
|
||||
await make_deferred_yieldable(
|
||||
defer.gatherResults(
|
||||
[run_in_background(self.handle_table, *res) for res in setup_res],
|
||||
consumeErrors=True,
|
||||
)
|
||||
)
|
||||
|
||||
# Step 5. Set up sequences
|
||||
self.progress.set_state("Setting up sequence generators")
|
||||
await self._setup_state_group_id_seq()
|
||||
await self._setup_user_id_seq()
|
||||
await self._setup_events_stream_seqs()
|
||||
constraints = await self.get_table_constraints()
|
||||
tables_ported = set() # type: Set[str]
|
||||
|
||||
while tables_to_port_info_map:
|
||||
# Pulls out all tables that are still to be ported and which
|
||||
# only depend on tables that are already ported (if any).
|
||||
tables_to_port = [
|
||||
table
|
||||
for table in tables_to_port_info_map
|
||||
if not constraints.get(table, set()) - tables_ported
|
||||
]
|
||||
|
||||
await make_deferred_yieldable(
|
||||
defer.gatherResults(
|
||||
[
|
||||
run_in_background(
|
||||
self.handle_table,
|
||||
table,
|
||||
*tables_to_port_info_map.pop(table),
|
||||
)
|
||||
for table in tables_to_port
|
||||
],
|
||||
consumeErrors=True,
|
||||
)
|
||||
)
|
||||
|
||||
tables_ported.update(tables_to_port)
|
||||
|
||||
self.progress.done()
|
||||
except Exception as e:
|
||||
@@ -794,45 +855,62 @@ class Porter(object):
|
||||
|
||||
return done, remaining + done
|
||||
|
||||
def _setup_state_group_id_seq(self):
|
||||
async def _setup_state_group_id_seq(self):
|
||||
curr_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
|
||||
table="state_groups", keyvalues={}, retcol="MAX(id)", allow_none=True
|
||||
)
|
||||
|
||||
if not curr_id:
|
||||
return
|
||||
|
||||
def r(txn):
|
||||
txn.execute("SELECT MAX(id) FROM state_groups")
|
||||
curr_id = txn.fetchone()[0]
|
||||
if not curr_id:
|
||||
return
|
||||
next_id = curr_id + 1
|
||||
txn.execute("ALTER SEQUENCE state_group_id_seq RESTART WITH %s", (next_id,))
|
||||
|
||||
return self.postgres_store.db_pool.runInteraction("setup_state_group_id_seq", r)
|
||||
await self.postgres_store.db_pool.runInteraction("setup_state_group_id_seq", r)
|
||||
|
||||
async def _setup_user_id_seq(self):
|
||||
curr_id = await self.sqlite_store.db_pool.runInteraction(
|
||||
"setup_user_id_seq", find_max_generated_user_id_localpart
|
||||
)
|
||||
|
||||
def _setup_user_id_seq(self):
|
||||
def r(txn):
|
||||
next_id = find_max_generated_user_id_localpart(txn) + 1
|
||||
next_id = curr_id + 1
|
||||
txn.execute("ALTER SEQUENCE user_id_seq RESTART WITH %s", (next_id,))
|
||||
|
||||
return self.postgres_store.db_pool.runInteraction("setup_user_id_seq", r)
|
||||
|
||||
def _setup_events_stream_seqs(self):
|
||||
def r(txn):
|
||||
txn.execute("SELECT MAX(stream_ordering) FROM events")
|
||||
curr_id = txn.fetchone()[0]
|
||||
if curr_id:
|
||||
next_id = curr_id + 1
|
||||
async def _setup_events_stream_seqs(self):
|
||||
"""Set the event stream sequences to the correct values.
|
||||
"""
|
||||
|
||||
# We get called before we've ported the events table, so we need to
|
||||
# fetch the current positions from the SQLite store.
|
||||
curr_forward_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
|
||||
table="events", keyvalues={}, retcol="MAX(stream_ordering)", allow_none=True
|
||||
)
|
||||
|
||||
curr_backward_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
|
||||
table="events",
|
||||
keyvalues={},
|
||||
retcol="MAX(-MIN(stream_ordering), 1)",
|
||||
allow_none=True,
|
||||
)
|
||||
|
||||
def _setup_events_stream_seqs_set_pos(txn):
|
||||
if curr_forward_id:
|
||||
txn.execute(
|
||||
"ALTER SEQUENCE events_stream_seq RESTART WITH %s", (next_id,)
|
||||
"ALTER SEQUENCE events_stream_seq RESTART WITH %s",
|
||||
(curr_forward_id + 1,),
|
||||
)
|
||||
|
||||
txn.execute("SELECT -MIN(stream_ordering) FROM events")
|
||||
curr_id = txn.fetchone()[0]
|
||||
if curr_id:
|
||||
next_id = curr_id + 1
|
||||
txn.execute(
|
||||
"ALTER SEQUENCE events_backfill_stream_seq RESTART WITH %s",
|
||||
(next_id,),
|
||||
)
|
||||
txn.execute(
|
||||
"ALTER SEQUENCE events_backfill_stream_seq RESTART WITH %s",
|
||||
(curr_backward_id + 1,),
|
||||
)
|
||||
|
||||
return self.postgres_store.db_pool.runInteraction(
|
||||
"_setup_events_stream_seqs", r
|
||||
return await self.postgres_store.db_pool.runInteraction(
|
||||
"_setup_events_stream_seqs", _setup_events_stream_seqs_set_pos,
|
||||
)
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user