From 55727c7212ad085eb88aba2cec9ad4ebdec1efe9 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Thu, 18 Dec 2025 12:00:20 +0000 Subject: [PATCH] Clean up manual UPDATE many --- .../storage/databases/main/sticky_events.py | 80 +++++++------------ 1 file changed, 30 insertions(+), 50 deletions(-) diff --git a/synapse/storage/databases/main/sticky_events.py b/synapse/storage/databases/main/sticky_events.py index da7534f3d8..e3b8f54ebe 100644 --- a/synapse/storage/databases/main/sticky_events.py +++ b/synapse/storage/databases/main/sticky_events.py @@ -37,6 +37,7 @@ from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore from synapse.storage.databases.main.events import DeltaState from synapse.storage.databases.main.state import StateGroupWorkerStore from synapse.storage.engines import PostgresEngine +from synapse.storage.engines.sqlite import Sqlite3Engine from synapse.storage.util.id_generators import MultiWriterIdGenerator from synapse.types.state import StateFilter from synapse.util.duration import Duration @@ -533,69 +534,48 @@ class StickyEventsWorkerStore(StateGroupWorkerStore, CacheInvalidationWorkerStor (event_id, self._sticky_events_id_gen.get_next_txn(txn)) for event_id in passing_event_ids ] - # [event_id, stream_pos, event_id, stream_pos, ...] - params = [p for pair in new_stream_ids for p in pair] + + self.db_pool.simple_update_many_txn( + txn, + table="sticky_events", + key_names=("event_id",), + key_values=[(event_id,) for event_id, _stream_id in new_stream_ids], + value_names=( + "stream_id", + "soft_failed", + ), + value_values=[ + (stream_id, False) for _event_id, stream_id in new_stream_ids + ], + ) + + # Also update the internal metadata on the event itself, so when we filter_events_for_client + # we don't filter them out. It's a bit sad internal_metadata is TEXT and not JSONB... + event_id_in_list_clause, event_id_in_list_args = make_in_list_sql_clause( + txn.database_engine, + "event_id", + passing_event_ids, + ) if isinstance(txn.database_engine, PostgresEngine): - values_placeholders = ", ".join(["(?, ?)"] * len(new_stream_ids)) txn.execute( f""" - UPDATE sticky_events AS se - SET - soft_failed = FALSE, - stream_id = v.stream_id - FROM (VALUES - {values_placeholders} - ) AS v(event_id, stream_id) - WHERE se.event_id = v.event_id; - """, - params, - ) - # Also update the internal metadata on the event itself, so when we filter_events_for_client - # we don't filter them out. It's a bit sad internal_metadata is TEXT and not JSONB... - clause, args = make_in_list_sql_clause( - txn.database_engine, - "event_id", - passing_event_ids, - ) - txn.execute( - """ UPDATE event_json SET internal_metadata = ( - jsonb_set(internal_metadata::jsonb, '{soft_failed}', 'false'::jsonb) + jsonb_set(internal_metadata::jsonb, '{{soft_failed}}', 'false'::jsonb) )::text - WHERE %s - """ - % clause, - args, + WHERE {event_id_in_list_clause} + """, + event_id_in_list_args, ) else: - # Use a CASE expression to update in bulk for sqlite - case_expr = " ".join(["WHEN ? THEN ? " for _ in new_stream_ids]) - txn.execute( - f""" - UPDATE sticky_events - SET - soft_failed = FALSE, - stream_id = CASE event_id - {case_expr} - ELSE stream_id - END - WHERE event_id IN ({",".join("?" * len(new_stream_ids))}); - """, - params + [eid for eid, _ in new_stream_ids], - ) - clause, args = make_in_list_sql_clause( - txn.database_engine, - "event_id", - passing_event_ids, - ) + assert isinstance(txn.database_engine, Sqlite3Engine) txn.execute( f""" UPDATE event_json SET internal_metadata = json_set(internal_metadata, '$.soft_failed', json('false')) - WHERE {clause} + WHERE {event_id_in_list_clause} """, - args, + event_id_in_list_args, ) # finally, invalidate caches for event_id in passing_event_ids: