1
0

Make synapse_port_db correctly create indexes (#6102)

* commit 'c97ed64db':
  Make synapse_port_db correctly create indexes (#6102)
This commit is contained in:
Andrew Morgan
2020-03-16 15:26:35 +00:00
2 changed files with 133 additions and 54 deletions

1
changelog.d/6102.bugfix Normal file
View File

@@ -0,0 +1 @@
Make the `synapse_port_db` script create the right indexes on a new PostgreSQL database.

View File

@@ -2,6 +2,7 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -29,9 +30,23 @@ import yaml
from twisted.enterprise import adbapi
from twisted.internet import defer, reactor
from synapse.storage._base import LoggingTransaction, SQLBaseStore
from synapse.config.homeserver import HomeServerConfig
from synapse.logging.context import PreserveLoggingContext
from synapse.storage._base import LoggingTransaction
from synapse.storage.client_ips import ClientIpBackgroundUpdateStore
from synapse.storage.deviceinbox import DeviceInboxBackgroundUpdateStore
from synapse.storage.devices import DeviceBackgroundUpdateStore
from synapse.storage.engines import create_engine
from synapse.storage.events_bg_updates import EventsBackgroundUpdatesStore
from synapse.storage.media_repository import MediaRepositoryBackgroundUpdateStore
from synapse.storage.prepare_database import prepare_database
from synapse.storage.registration import RegistrationBackgroundUpdateStore
from synapse.storage.roommember import RoomMemberBackgroundUpdateStore
from synapse.storage.search import SearchBackgroundUpdateStore
from synapse.storage.state import StateBackgroundUpdateStore
from synapse.storage.stats import StatsStore
from synapse.storage.user_directory import UserDirectoryBackgroundUpdateStore
from synapse.util import Clock
logger = logging.getLogger("synapse_port_db")
@@ -98,33 +113,24 @@ APPEND_ONLY_TABLES = [
end_error_exec_info = None
class Store(object):
"""This object is used to pull out some of the convenience API from the
Storage layer.
*All* database interactions should go through this object.
"""
def __init__(self, db_pool, engine):
self.db_pool = db_pool
self.database_engine = engine
_simple_insert_txn = SQLBaseStore.__dict__["_simple_insert_txn"]
_simple_insert = SQLBaseStore.__dict__["_simple_insert"]
_simple_select_onecol_txn = SQLBaseStore.__dict__["_simple_select_onecol_txn"]
_simple_select_onecol = SQLBaseStore.__dict__["_simple_select_onecol"]
_simple_select_one = SQLBaseStore.__dict__["_simple_select_one"]
_simple_select_one_txn = SQLBaseStore.__dict__["_simple_select_one_txn"]
_simple_select_one_onecol = SQLBaseStore.__dict__["_simple_select_one_onecol"]
_simple_select_one_onecol_txn = SQLBaseStore.__dict__[
"_simple_select_one_onecol_txn"
]
_simple_update_one = SQLBaseStore.__dict__["_simple_update_one"]
_simple_update_one_txn = SQLBaseStore.__dict__["_simple_update_one_txn"]
_simple_update_txn = SQLBaseStore.__dict__["_simple_update_txn"]
class Store(
ClientIpBackgroundUpdateStore,
DeviceInboxBackgroundUpdateStore,
DeviceBackgroundUpdateStore,
EventsBackgroundUpdatesStore,
MediaRepositoryBackgroundUpdateStore,
RegistrationBackgroundUpdateStore,
RoomMemberBackgroundUpdateStore,
SearchBackgroundUpdateStore,
StateBackgroundUpdateStore,
UserDirectoryBackgroundUpdateStore,
StatsStore,
):
def __init__(self, db_conn, hs):
super().__init__(db_conn, hs)
self.db_pool = hs.get_db_pool()
@defer.inlineCallbacks
def runInteraction(self, desc, func, *args, **kwargs):
def r(conn):
try:
@@ -150,7 +156,8 @@ class Store(object):
logger.debug("[TXN FAIL] {%s} %s", desc, e)
raise
return self.db_pool.runWithConnection(r)
with PreserveLoggingContext():
return (yield self.db_pool.runWithConnection(r))
def execute(self, f, *args, **kwargs):
return self.runInteraction(f.__name__, f, *args, **kwargs)
@@ -176,6 +183,25 @@ class Store(object):
raise
class MockHomeserver:
def __init__(self, config, database_engine, db_conn, db_pool):
self.database_engine = database_engine
self.db_conn = db_conn
self.db_pool = db_pool
self.clock = Clock(reactor)
self.config = config
self.hostname = config.server_name
def get_db_conn(self):
return self.db_conn
def get_db_pool(self):
return self.db_pool
def get_clock(self):
return self.clock
class Porter(object):
def __init__(self, **kwargs):
self.__dict__.update(kwargs)
@@ -447,31 +473,75 @@ class Porter(object):
db_conn.commit()
return db_conn
@defer.inlineCallbacks
def build_db_store(self, config):
"""Builds and returns a database store using the provided configuration.
Args:
config: The database configuration, i.e. a dict following the structure of
the "database" section of Synapse's configuration file.
Returns:
The built Store object.
"""
engine = create_engine(config)
self.progress.set_state("Preparing %s" % config["name"])
conn = self.setup_db(config, engine)
db_pool = adbapi.ConnectionPool(
config["name"], **config["args"]
)
hs = MockHomeserver(self.hs_config, engine, conn, db_pool)
store = Store(conn, hs)
yield store.runInteraction(
"%s_engine.check_database" % config["name"],
engine.check_database,
)
return store
@defer.inlineCallbacks
def run_background_updates_on_postgres(self):
# Manually apply all background updates on the PostgreSQL database.
postgres_ready = yield self.postgres_store.has_completed_background_updates()
if not postgres_ready:
# Only say that we're running background updates when there are background
# updates to run.
self.progress.set_state("Running background updates on PostgreSQL")
while not postgres_ready:
yield self.postgres_store.do_next_background_update(100)
postgres_ready = yield (
self.postgres_store.has_completed_background_updates()
)
@defer.inlineCallbacks
def run(self):
try:
sqlite_db_pool = adbapi.ConnectionPool(
self.sqlite_config["name"], **self.sqlite_config["args"]
self.sqlite_store = yield self.build_db_store(self.sqlite_config)
# Check if all background updates are done, abort if not.
updates_complete = yield self.sqlite_store.has_completed_background_updates()
if not updates_complete:
sys.stderr.write(
"Pending background updates exist in the SQLite3 database."
" Please start Synapse again and wait until every update has finished"
" before running this script.\n"
)
defer.returnValue(None)
self.postgres_store = yield self.build_db_store(
self.hs_config.database_config
)
postgres_db_pool = adbapi.ConnectionPool(
self.postgres_config["name"], **self.postgres_config["args"]
)
sqlite_engine = create_engine(sqlite_config)
postgres_engine = create_engine(postgres_config)
self.sqlite_store = Store(sqlite_db_pool, sqlite_engine)
self.postgres_store = Store(postgres_db_pool, postgres_engine)
yield self.postgres_store.execute(postgres_engine.check_database)
# Step 1. Set up databases.
self.progress.set_state("Preparing SQLite3")
self.setup_db(sqlite_config, sqlite_engine)
self.progress.set_state("Preparing PostgreSQL")
self.setup_db(postgres_config, postgres_engine)
yield self.run_background_updates_on_postgres()
self.progress.set_state("Creating port tables")
@@ -563,6 +633,8 @@ class Porter(object):
def conv(j, col):
if j in bool_cols:
return bool(col)
if isinstance(col, bytes):
return bytearray(col)
elif isinstance(col, string_types) and "\0" in col:
logger.warn(
"DROPPING ROW: NUL value in table %s col %s: %r",
@@ -926,18 +998,24 @@ if __name__ == "__main__":
},
}
postgres_config = yaml.safe_load(args.postgres_config)
hs_config = yaml.safe_load(args.postgres_config)
if "database" in postgres_config:
postgres_config = postgres_config["database"]
if "database" not in hs_config:
sys.stderr.write("The configuration file must have a 'database' section.\n")
sys.exit(4)
postgres_config = hs_config["database"]
if "name" not in postgres_config:
sys.stderr.write("Malformed database config: no 'name'")
sys.stderr.write("Malformed database config: no 'name'\n")
sys.exit(2)
if postgres_config["name"] != "psycopg2":
sys.stderr.write("Database must use 'psycopg2' connector.")
sys.stderr.write("Database must use the 'psycopg2' connector.\n")
sys.exit(3)
config = HomeServerConfig()
config.parse_config_dict(hs_config, "", "")
def start(stdscr=None):
if stdscr:
progress = CursesProgress(stdscr)
@@ -946,9 +1024,9 @@ if __name__ == "__main__":
porter = Porter(
sqlite_config=sqlite_config,
postgres_config=postgres_config,
progress=progress,
batch_size=args.batch_size,
hs_config=config,
)
reactor.callWhenRunning(porter.run)