Use simple_upsert_txn for sliding_sync_membership_snapshots in background update
This commit is contained in:
@@ -1982,42 +1982,41 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
|
||||
membership_info.membership_event_stream_ordering
|
||||
)
|
||||
|
||||
# Pulling keys/values separately is safe and will produce congruent
|
||||
# lists
|
||||
insert_keys = insert_map.keys()
|
||||
insert_values = insert_map.values()
|
||||
# We don't need to update the state `ON CONFLICT` because we never
|
||||
# partially insert/update the snapshots and anything already there is
|
||||
# up-to-date EXCEPT for the `forgotten` field since that is updated out
|
||||
# of band from the membership changes.
|
||||
# We don't need to upsert the state because we never partially
|
||||
# insert/update the snapshots and anything already there is up-to-date
|
||||
# EXCEPT for the `forgotten` field since that is updated out-of-band
|
||||
# from the membership changes.
|
||||
#
|
||||
# Even though we're only doing insertions, we're using
|
||||
# `simple_upsert_txn()` here to avoid unique violation errors that would
|
||||
# happen from `simple_insert_txn()`
|
||||
self.db_pool.simple_upsert_txn(
|
||||
txn,
|
||||
table="sliding_sync_membership_snapshots",
|
||||
keyvalues={"room_id": room_id, "user_id": user_id},
|
||||
values={},
|
||||
insertion_values={
|
||||
**insert_map,
|
||||
"sender": sender,
|
||||
"membership_event_id": membership_event_id,
|
||||
"membership": membership,
|
||||
"event_stream_ordering": membership_event_stream_ordering,
|
||||
},
|
||||
)
|
||||
# We need to find the `forgotten` value during the transaction because
|
||||
# we can't risk inserting stale data.
|
||||
txn.execute(
|
||||
f"""
|
||||
INSERT INTO sliding_sync_membership_snapshots
|
||||
(room_id, user_id, sender, membership_event_id, membership, forgotten, event_stream_ordering
|
||||
{("," + ", ".join(insert_keys)) if insert_keys else ""})
|
||||
VALUES (
|
||||
?, ?, ?, ?, ?,
|
||||
(SELECT forgotten FROM room_memberships WHERE event_id = ?),
|
||||
?
|
||||
{("," + ", ".join("?" for _ in insert_values)) if insert_values else ""}
|
||||
)
|
||||
ON CONFLICT (room_id, user_id)
|
||||
DO UPDATE SET
|
||||
forgotten = EXCLUDED.forgotten
|
||||
"""
|
||||
UPDATE sliding_sync_membership_snapshots
|
||||
SET
|
||||
forgotten = (SELECT forgotten FROM room_memberships WHERE event_id = ?)
|
||||
WHERE room_id = ? and user_id = ?
|
||||
""",
|
||||
[
|
||||
(
|
||||
membership_event_id,
|
||||
room_id,
|
||||
user_id,
|
||||
sender,
|
||||
membership_event_id,
|
||||
membership,
|
||||
membership_event_id,
|
||||
membership_event_stream_ordering,
|
||||
]
|
||||
+ list(insert_values),
|
||||
),
|
||||
)
|
||||
|
||||
await self.db_pool.runInteraction(
|
||||
|
||||
Reference in New Issue
Block a user