Clean up manual UPDATE many
This commit is contained in:
@@ -37,6 +37,7 @@ from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
|
||||
from synapse.storage.databases.main.events import DeltaState
|
||||
from synapse.storage.databases.main.state import StateGroupWorkerStore
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
from synapse.storage.engines.sqlite import Sqlite3Engine
|
||||
from synapse.storage.util.id_generators import MultiWriterIdGenerator
|
||||
from synapse.types.state import StateFilter
|
||||
from synapse.util.duration import Duration
|
||||
@@ -533,69 +534,48 @@ class StickyEventsWorkerStore(StateGroupWorkerStore, CacheInvalidationWorkerStor
|
||||
(event_id, self._sticky_events_id_gen.get_next_txn(txn))
|
||||
for event_id in passing_event_ids
|
||||
]
|
||||
# [event_id, stream_pos, event_id, stream_pos, ...]
|
||||
params = [p for pair in new_stream_ids for p in pair]
|
||||
|
||||
self.db_pool.simple_update_many_txn(
|
||||
txn,
|
||||
table="sticky_events",
|
||||
key_names=("event_id",),
|
||||
key_values=[(event_id,) for event_id, _stream_id in new_stream_ids],
|
||||
value_names=(
|
||||
"stream_id",
|
||||
"soft_failed",
|
||||
),
|
||||
value_values=[
|
||||
(stream_id, False) for _event_id, stream_id in new_stream_ids
|
||||
],
|
||||
)
|
||||
|
||||
# Also update the internal metadata on the event itself, so when we filter_events_for_client
|
||||
# we don't filter them out. It's a bit sad internal_metadata is TEXT and not JSONB...
|
||||
event_id_in_list_clause, event_id_in_list_args = make_in_list_sql_clause(
|
||||
txn.database_engine,
|
||||
"event_id",
|
||||
passing_event_ids,
|
||||
)
|
||||
if isinstance(txn.database_engine, PostgresEngine):
|
||||
values_placeholders = ", ".join(["(?, ?)"] * len(new_stream_ids))
|
||||
txn.execute(
|
||||
f"""
|
||||
UPDATE sticky_events AS se
|
||||
SET
|
||||
soft_failed = FALSE,
|
||||
stream_id = v.stream_id
|
||||
FROM (VALUES
|
||||
{values_placeholders}
|
||||
) AS v(event_id, stream_id)
|
||||
WHERE se.event_id = v.event_id;
|
||||
""",
|
||||
params,
|
||||
)
|
||||
# Also update the internal metadata on the event itself, so when we filter_events_for_client
|
||||
# we don't filter them out. It's a bit sad internal_metadata is TEXT and not JSONB...
|
||||
clause, args = make_in_list_sql_clause(
|
||||
txn.database_engine,
|
||||
"event_id",
|
||||
passing_event_ids,
|
||||
)
|
||||
txn.execute(
|
||||
"""
|
||||
UPDATE event_json
|
||||
SET internal_metadata = (
|
||||
jsonb_set(internal_metadata::jsonb, '{soft_failed}', 'false'::jsonb)
|
||||
jsonb_set(internal_metadata::jsonb, '{{soft_failed}}', 'false'::jsonb)
|
||||
)::text
|
||||
WHERE %s
|
||||
"""
|
||||
% clause,
|
||||
args,
|
||||
WHERE {event_id_in_list_clause}
|
||||
""",
|
||||
event_id_in_list_args,
|
||||
)
|
||||
else:
|
||||
# Use a CASE expression to update in bulk for sqlite
|
||||
case_expr = " ".join(["WHEN ? THEN ? " for _ in new_stream_ids])
|
||||
txn.execute(
|
||||
f"""
|
||||
UPDATE sticky_events
|
||||
SET
|
||||
soft_failed = FALSE,
|
||||
stream_id = CASE event_id
|
||||
{case_expr}
|
||||
ELSE stream_id
|
||||
END
|
||||
WHERE event_id IN ({",".join("?" * len(new_stream_ids))});
|
||||
""",
|
||||
params + [eid for eid, _ in new_stream_ids],
|
||||
)
|
||||
clause, args = make_in_list_sql_clause(
|
||||
txn.database_engine,
|
||||
"event_id",
|
||||
passing_event_ids,
|
||||
)
|
||||
assert isinstance(txn.database_engine, Sqlite3Engine)
|
||||
txn.execute(
|
||||
f"""
|
||||
UPDATE event_json
|
||||
SET internal_metadata = json_set(internal_metadata, '$.soft_failed', json('false'))
|
||||
WHERE {clause}
|
||||
WHERE {event_id_in_list_clause}
|
||||
""",
|
||||
args,
|
||||
event_id_in_list_args,
|
||||
)
|
||||
# finally, invalidate caches
|
||||
for event_id in passing_event_ids:
|
||||
|
||||
Reference in New Issue
Block a user