Merge commit 'a7bdf98d0' into anoa/dinsic_release_1_21_x
* commit 'a7bdf98d0': Rename database classes to make some sense (#8033)
This commit is contained in:
@@ -35,32 +35,30 @@ from synapse.logging.context import (
|
||||
make_deferred_yieldable,
|
||||
run_in_background,
|
||||
)
|
||||
from synapse.storage.data_stores.main.client_ips import ClientIpBackgroundUpdateStore
|
||||
from synapse.storage.data_stores.main.deviceinbox import (
|
||||
DeviceInboxBackgroundUpdateStore,
|
||||
)
|
||||
from synapse.storage.data_stores.main.devices import DeviceBackgroundUpdateStore
|
||||
from synapse.storage.data_stores.main.events_bg_updates import (
|
||||
from synapse.storage.database import DatabasePool, make_conn
|
||||
from synapse.storage.databases.main.client_ips import ClientIpBackgroundUpdateStore
|
||||
from synapse.storage.databases.main.deviceinbox import DeviceInboxBackgroundUpdateStore
|
||||
from synapse.storage.databases.main.devices import DeviceBackgroundUpdateStore
|
||||
from synapse.storage.databases.main.events_bg_updates import (
|
||||
EventsBackgroundUpdatesStore,
|
||||
)
|
||||
from synapse.storage.data_stores.main.media_repository import (
|
||||
from synapse.storage.databases.main.media_repository import (
|
||||
MediaRepositoryBackgroundUpdateStore,
|
||||
)
|
||||
from synapse.storage.data_stores.main.profile import ProfileStore
|
||||
from synapse.storage.data_stores.main.registration import (
|
||||
from synapse.storage.databases.main.profile import ProfileStore
|
||||
from synapse.storage.databases.main.registration import (
|
||||
RegistrationBackgroundUpdateStore,
|
||||
find_max_generated_user_id_localpart,
|
||||
)
|
||||
from synapse.storage.data_stores.main.room import RoomBackgroundUpdateStore
|
||||
from synapse.storage.data_stores.main.roommember import RoomMemberBackgroundUpdateStore
|
||||
from synapse.storage.data_stores.main.search import SearchBackgroundUpdateStore
|
||||
from synapse.storage.data_stores.main.state import MainStateBackgroundUpdateStore
|
||||
from synapse.storage.data_stores.main.stats import StatsStore
|
||||
from synapse.storage.data_stores.main.user_directory import (
|
||||
from synapse.storage.databases.main.room import RoomBackgroundUpdateStore
|
||||
from synapse.storage.databases.main.roommember import RoomMemberBackgroundUpdateStore
|
||||
from synapse.storage.databases.main.search import SearchBackgroundUpdateStore
|
||||
from synapse.storage.databases.main.state import MainStateBackgroundUpdateStore
|
||||
from synapse.storage.databases.main.stats import StatsStore
|
||||
from synapse.storage.databases.main.user_directory import (
|
||||
UserDirectoryBackgroundUpdateStore,
|
||||
)
|
||||
from synapse.storage.data_stores.state.bg_updates import StateBackgroundUpdateStore
|
||||
from synapse.storage.database import Database, make_conn
|
||||
from synapse.storage.databases.state.bg_updates import StateBackgroundUpdateStore
|
||||
from synapse.storage.engines import create_engine
|
||||
from synapse.storage.prepare_database import prepare_database
|
||||
from synapse.util import Clock
|
||||
@@ -177,14 +175,14 @@ class Store(
|
||||
StatsStore,
|
||||
):
|
||||
def execute(self, f, *args, **kwargs):
|
||||
return self.db.runInteraction(f.__name__, f, *args, **kwargs)
|
||||
return self.db_pool.runInteraction(f.__name__, f, *args, **kwargs)
|
||||
|
||||
def execute_sql(self, sql, *args):
|
||||
def r(txn):
|
||||
txn.execute(sql, args)
|
||||
return txn.fetchall()
|
||||
|
||||
return self.db.runInteraction("execute_sql", r)
|
||||
return self.db_pool.runInteraction("execute_sql", r)
|
||||
|
||||
def insert_many_txn(self, txn, table, headers, rows):
|
||||
sql = "INSERT INTO %s (%s) VALUES (%s)" % (
|
||||
@@ -229,7 +227,7 @@ class Porter(object):
|
||||
async def setup_table(self, table):
|
||||
if table in APPEND_ONLY_TABLES:
|
||||
# It's safe to just carry on inserting.
|
||||
row = await self.postgres_store.db.simple_select_one(
|
||||
row = await self.postgres_store.db_pool.simple_select_one(
|
||||
table="port_from_sqlite3",
|
||||
keyvalues={"table_name": table},
|
||||
retcols=("forward_rowid", "backward_rowid"),
|
||||
@@ -246,7 +244,7 @@ class Porter(object):
|
||||
) = await self._setup_sent_transactions()
|
||||
backward_chunk = 0
|
||||
else:
|
||||
await self.postgres_store.db.simple_insert(
|
||||
await self.postgres_store.db_pool.simple_insert(
|
||||
table="port_from_sqlite3",
|
||||
values={
|
||||
"table_name": table,
|
||||
@@ -276,7 +274,7 @@ class Porter(object):
|
||||
|
||||
await self.postgres_store.execute(delete_all)
|
||||
|
||||
await self.postgres_store.db.simple_insert(
|
||||
await self.postgres_store.db_pool.simple_insert(
|
||||
table="port_from_sqlite3",
|
||||
values={"table_name": table, "forward_rowid": 1, "backward_rowid": 0},
|
||||
)
|
||||
@@ -320,7 +318,7 @@ class Porter(object):
|
||||
if table == "user_directory_stream_pos":
|
||||
# We need to make sure there is a single row, `(X, null), as that is
|
||||
# what synapse expects to be there.
|
||||
await self.postgres_store.db.simple_insert(
|
||||
await self.postgres_store.db_pool.simple_insert(
|
||||
table=table, values={"stream_id": None}
|
||||
)
|
||||
self.progress.update(table, table_size) # Mark table as done
|
||||
@@ -361,7 +359,7 @@ class Porter(object):
|
||||
|
||||
return headers, forward_rows, backward_rows
|
||||
|
||||
headers, frows, brows = await self.sqlite_store.db.runInteraction(
|
||||
headers, frows, brows = await self.sqlite_store.db_pool.runInteraction(
|
||||
"select", r
|
||||
)
|
||||
|
||||
@@ -377,7 +375,7 @@ class Porter(object):
|
||||
def insert(txn):
|
||||
self.postgres_store.insert_many_txn(txn, table, headers[1:], rows)
|
||||
|
||||
self.postgres_store.db.simple_update_one_txn(
|
||||
self.postgres_store.db_pool.simple_update_one_txn(
|
||||
txn,
|
||||
table="port_from_sqlite3",
|
||||
keyvalues={"table_name": table},
|
||||
@@ -415,7 +413,7 @@ class Porter(object):
|
||||
|
||||
return headers, rows
|
||||
|
||||
headers, rows = await self.sqlite_store.db.runInteraction("select", r)
|
||||
headers, rows = await self.sqlite_store.db_pool.runInteraction("select", r)
|
||||
|
||||
if rows:
|
||||
forward_chunk = rows[-1][0] + 1
|
||||
@@ -453,7 +451,7 @@ class Porter(object):
|
||||
],
|
||||
)
|
||||
|
||||
self.postgres_store.db.simple_update_one_txn(
|
||||
self.postgres_store.db_pool.simple_update_one_txn(
|
||||
txn,
|
||||
table="port_from_sqlite3",
|
||||
keyvalues={"table_name": "event_search"},
|
||||
@@ -496,7 +494,7 @@ class Porter(object):
|
||||
db_conn, allow_outdated_version=allow_outdated_version
|
||||
)
|
||||
prepare_database(db_conn, engine, config=self.hs_config)
|
||||
store = Store(Database(hs, db_config, engine), db_conn, hs)
|
||||
store = Store(DatabasePool(hs, db_config, engine), db_conn, hs)
|
||||
db_conn.commit()
|
||||
|
||||
return store
|
||||
@@ -504,7 +502,7 @@ class Porter(object):
|
||||
async def run_background_updates_on_postgres(self):
|
||||
# Manually apply all background updates on the PostgreSQL database.
|
||||
postgres_ready = (
|
||||
await self.postgres_store.db.updates.has_completed_background_updates()
|
||||
await self.postgres_store.db_pool.updates.has_completed_background_updates()
|
||||
)
|
||||
|
||||
if not postgres_ready:
|
||||
@@ -513,9 +511,9 @@ class Porter(object):
|
||||
self.progress.set_state("Running background updates on PostgreSQL")
|
||||
|
||||
while not postgres_ready:
|
||||
await self.postgres_store.db.updates.do_next_background_update(100)
|
||||
await self.postgres_store.db_pool.updates.do_next_background_update(100)
|
||||
postgres_ready = await (
|
||||
self.postgres_store.db.updates.has_completed_background_updates()
|
||||
self.postgres_store.db_pool.updates.has_completed_background_updates()
|
||||
)
|
||||
|
||||
async def run(self):
|
||||
@@ -536,7 +534,7 @@ class Porter(object):
|
||||
|
||||
# Check if all background updates are done, abort if not.
|
||||
updates_complete = (
|
||||
await self.sqlite_store.db.updates.has_completed_background_updates()
|
||||
await self.sqlite_store.db_pool.updates.has_completed_background_updates()
|
||||
)
|
||||
if not updates_complete:
|
||||
end_error = (
|
||||
@@ -578,22 +576,24 @@ class Porter(object):
|
||||
)
|
||||
|
||||
try:
|
||||
await self.postgres_store.db.runInteraction("alter_table", alter_table)
|
||||
await self.postgres_store.db_pool.runInteraction(
|
||||
"alter_table", alter_table
|
||||
)
|
||||
except Exception:
|
||||
# On Error Resume Next
|
||||
pass
|
||||
|
||||
await self.postgres_store.db.runInteraction(
|
||||
await self.postgres_store.db_pool.runInteraction(
|
||||
"create_port_table", create_port_table
|
||||
)
|
||||
|
||||
# Step 2. Get tables.
|
||||
self.progress.set_state("Fetching tables")
|
||||
sqlite_tables = await self.sqlite_store.db.simple_select_onecol(
|
||||
sqlite_tables = await self.sqlite_store.db_pool.simple_select_onecol(
|
||||
table="sqlite_master", keyvalues={"type": "table"}, retcol="name"
|
||||
)
|
||||
|
||||
postgres_tables = await self.postgres_store.db.simple_select_onecol(
|
||||
postgres_tables = await self.postgres_store.db_pool.simple_select_onecol(
|
||||
table="information_schema.tables",
|
||||
keyvalues={},
|
||||
retcol="distinct table_name",
|
||||
@@ -694,7 +694,7 @@ class Porter(object):
|
||||
|
||||
return headers, [r for r in rows if r[ts_ind] < yesterday]
|
||||
|
||||
headers, rows = await self.sqlite_store.db.runInteraction("select", r)
|
||||
headers, rows = await self.sqlite_store.db_pool.runInteraction("select", r)
|
||||
|
||||
rows = self._convert_rows("sent_transactions", headers, rows)
|
||||
|
||||
@@ -727,7 +727,7 @@ class Porter(object):
|
||||
next_chunk = await self.sqlite_store.execute(get_start_id)
|
||||
next_chunk = max(max_inserted_rowid + 1, next_chunk)
|
||||
|
||||
await self.postgres_store.db.simple_insert(
|
||||
await self.postgres_store.db_pool.simple_insert(
|
||||
table="port_from_sqlite3",
|
||||
values={
|
||||
"table_name": "sent_transactions",
|
||||
@@ -796,14 +796,14 @@ class Porter(object):
|
||||
next_id = curr_id + 1
|
||||
txn.execute("ALTER SEQUENCE state_group_id_seq RESTART WITH %s", (next_id,))
|
||||
|
||||
return self.postgres_store.db.runInteraction("setup_state_group_id_seq", r)
|
||||
return self.postgres_store.db_pool.runInteraction("setup_state_group_id_seq", r)
|
||||
|
||||
def _setup_user_id_seq(self):
|
||||
def r(txn):
|
||||
next_id = find_max_generated_user_id_localpart(txn) + 1
|
||||
txn.execute("ALTER SEQUENCE user_id_seq RESTART WITH %s", (next_id,))
|
||||
|
||||
return self.postgres_store.db.runInteraction("setup_user_id_seq", r)
|
||||
return self.postgres_store.db_pool.runInteraction("setup_user_id_seq", r)
|
||||
|
||||
|
||||
##############################################
|
||||
|
||||
Reference in New Issue
Block a user