From e67cd89e7b16ee8a4a3268294123c0646cc1aae3 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Sat, 1 Oct 2022 22:20:14 +0100 Subject: [PATCH] Reorder args so `*args, **kwargs` comes at the end --- synapse/storage/database.py | 11 +++-------- synapse/storage/databases/main/end_to_end_keys.py | 3 ++- synapse/storage/databases/main/event_federation.py | 3 ++- synapse/storage/databases/main/lock.py | 5 ++--- synapse/storage/databases/main/purge_events.py | 5 +++-- synapse/storage/databases/main/receipts.py | 6 ++---- synapse/storage/databases/main/transactions.py | 3 ++- synapse/storage/util/id_generators.py | 3 ++- 8 files changed, 18 insertions(+), 21 deletions(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 4ae30a4b4e..d63428bd33 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -806,21 +806,16 @@ class DatabasePool: **kwargs: Any, ) -> R: return await self.runInteraction_advanced( - desc, - func, - *args, - db_autocommit=False, - isolation_level=None, - **kwargs, + desc, False, None, func, *args, **kwargs ) async def runInteraction_advanced( self, desc: str, + db_autocommit: bool, + isolation_level: Optional[int], func: Callable[..., R], *args: Any, - db_autocommit: bool = False, - isolation_level: Optional[int] = None, **kwargs: Any, ) -> R: """Starts a transaction on the database and runs a given function diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py index 91829ee1e4..711c047119 100644 --- a/synapse/storage/databases/main/end_to_end_keys.py +++ b/synapse/storage/databases/main/end_to_end_keys.py @@ -1084,11 +1084,12 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker claim_row = await self.db_pool.runInteraction_advanced( "claim_e2e_one_time_keys", + db_autocommit, + None, _claim_e2e_one_time_key, user_id, device_id, algorithm, - db_autocommit=db_autocommit, ) if claim_row: device_results = results.setdefault(user_id, {}).setdefault( diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index f9ead4439a..547df77424 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1691,8 +1691,9 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas return await self.db_pool.runInteraction_advanced( "remove_received_event_from_staging", - _remove_received_event_from_staging_txn, db_autocommit=True, + isolation_level=None, + func=_remove_received_event_from_staging_txn, ) else: diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py index c65bac3efb..cfca9b1aa4 100644 --- a/synapse/storage/databases/main/lock.py +++ b/synapse/storage/databases/main/lock.py @@ -164,10 +164,9 @@ class LockStore(SQLBaseStore): did_lock = await self.db_pool.runInteraction_advanced( "try_acquire_lock", - _try_acquire_lock_txn, - # We can autocommit here as we're executing a single query, this - # will avoid serialization errors. db_autocommit=True, + isolation_level=None, + func=_try_acquire_lock_txn, ) if not did_lock: return None diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index 96f95595ad..6469f4ca42 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -327,9 +327,10 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore): state_groups_to_delete = await self.db_pool.runInteraction_advanced( "purge_room", - self._purge_room_txn, - room_id=room_id, + False, isolation_level=IsolationLevel.READ_COMMITTED, + func=self._purge_room_txn, + room_id=room_id, ) state_groups_to_delete.extend( diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 55179ef3bb..198fc2a8e2 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -779,6 +779,8 @@ class ReceiptsWorkerStore(SQLBaseStore): async with self._receipts_id_gen.get_next() as stream_id: # type: ignore[attr-defined] event_ts = await self.db_pool.runInteraction_advanced( "insert_linearized_receipt", + False, + IsolationLevel.READ_COMMITTED, self._insert_linearized_receipt_txn, room_id, receipt_type, @@ -787,10 +789,6 @@ class ReceiptsWorkerStore(SQLBaseStore): thread_id, data, stream_id=stream_id, - # Read committed is actually beneficial here because we check for a receipt with - # greater stream order, and checking the very latest data at select time is better - # than the data at transaction start time. - isolation_level=IsolationLevel.READ_COMMITTED, ) # If the receipt was older than the currently persisted one, nothing to do. diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index 098ec5a1e3..3ae34013c3 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -223,12 +223,13 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): await self.db_pool.runInteraction_advanced( "set_destination_retry_timings", + True, + None, self._set_destination_retry_timings_native, destination, failure_ts, retry_last_ts, retry_interval, - db_autocommit=True, # Safe as it's a single upsert ) def _set_destination_retry_timings_native( diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 677c0f8a1f..18dece3399 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -820,8 +820,9 @@ class _MultiWriterCtxManager: if self.id_gen._writers: await self.id_gen._db.runInteraction_advanced( "MultiWriterIdGenerator._update_table", - self.id_gen._update_stream_positions_table_txn, db_autocommit=True, + isolation_level=None, + func=self.id_gen._update_stream_positions_table_txn, ) return False