diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index c4188b087d..fa41d33920 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1730,12 +1730,11 @@ class PersistEventsStore: keyvalues={"room_id": room_id}, values=sliding_sync_table_changes.joined_room_updates, insertion_values={ - # The reason we're only *inserting* `event_stream_ordering` here - # is because the column has a `NON NULL` constraint and we need - # *some* answer. If the row already exists, we are trying to - # avoid doing an `UPDATE` and accidentally overwriting the value - # with some stale data since this is just a "best effort" value. - # It's better to just rely on + # The reason we're only *inserting* (not *updating*) + # `event_stream_ordering` here is because the column has a `NON + # NULL` constraint and we need *some* answer. And if the row + # already exists, it already has the correct value and it's + # better to just rely on # `_update_sliding_sync_tables_with_new_persisted_events_txn()` # to do the right thing (same for `bump_stamp`). "event_stream_ordering": sliding_sync_table_changes.joined_room_best_effort_most_recent_stream_ordering diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 8311583125..5e6768d2ce 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -1691,33 +1691,24 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS + "Raising exception so we can just try again." ) - # Pulling keys/values separately is safe and will produce congruent - # lists - insert_keys = insert_map.keys() - insert_values = insert_map.values() # Since we partially update the `sliding_sync_joined_rooms` as new state # is sent, we need to update the state fields `ON CONFLICT`. We just # have to be careful we're not overwriting it with stale data (see # `last_current_state_delta_stream_id` check above). # - # We don't need to update `event_stream_ordering` and `bump_stamp` `ON - # CONFLICT` because if they are present, that means they are already - # up-to-date. - sql = f""" - INSERT INTO sliding_sync_joined_rooms - (room_id, event_stream_ordering, bump_stamp, {", ".join(insert_keys)}) - VALUES ( - ?, ?, ?, - {", ".join("?" for _ in insert_values)} - ) - ON CONFLICT (room_id) - DO UPDATE SET - {", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)} - """ - args = [room_id, event_stream_ordering, bump_stamp] + list( - insert_values + self.db_pool.simple_upsert_txn( + txn, + table="sliding_sync_joined_rooms", + keyvalues={"room_id": room_id}, + values=insert_map, + insertion_values={ + # The reason we're only *inserting* (not *updating*) `event_stream_ordering` + # and `bump_stamp` is because if they are present, that means they are already + # up-to-date. + "event_stream_ordering": event_stream_ordering, + "bump_stamp": bump_stamp, + }, ) - txn.execute(sql, args) # Keep track of the last successful room_id last_successful_room_id = room_id