diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 276188e21b..267b66272a 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1572,7 +1572,7 @@ class PersistEventsStore: DO UPDATE SET {", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)}, event_stream_ordering = CASE - WHEN event_stream_ordering < EXCLUDED.event_stream_ordering + WHEN event_stream_ordering IS NULL OR event_stream_ordering < EXCLUDED.event_stream_ordering THEN EXCLUDED.event_stream_ordering ELSE event_stream_ordering END @@ -2005,39 +2005,39 @@ class PersistEventsStore: event.internal_metadata.stream_ordering ) - # `_store_event_txn` is run before `_update_current_state_txn` which handles - # deleting the rows if we are no longer in the room so we don't need to worry - # about inserting something that will be orphaned. - # - # FIXME: We need to handle cases where we are persisting events out of order and - # the stream_ordering didn't increase. - self.db_pool.simple_upsert_many_txn( - txn, - table="sliding_sync_joined_rooms", - key_names=("room_id",), - key_values=[ - (room_id,) for room_id in room_id_to_stream_ordering_map.keys() - ], - value_names=("event_stream_ordering",), - value_values=[ - (room_id_to_stream_ordering_map[room_id],) + # This function (`_store_event_txn(...)`) is run before + # `_update_current_state_txn(...)` which handles deleting the rows if we are no + # longer in the room so we don't need to worry about inserting something that + # will be orphaned. + txn.execute_batch( + f""" + INSERT INTO sliding_sync_joined_rooms + (room_id, event_stream_ordering, bump_stamp) + VALUES ( + ?, ?, ? + ) + ON CONFLICT (room_id) + DO UPDATE SET + event_stream_ordering = CASE + WHEN event_stream_ordering IS NULL OR event_stream_ordering < EXCLUDED.event_stream_ordering + THEN EXCLUDED.event_stream_ordering + ELSE event_stream_ordering + END, + bump_stamp = CASE + WHEN bump_stamp IS NULL OR bump_stamp < EXCLUDED.bump_stamp + THEN EXCLUDED.bump_stamp + ELSE bump_stamp + END + """, + [ + [ + room_id, + room_id_to_stream_ordering_map[room_id], + room_id_to_bump_stamp_map.get(room_id), + ] for room_id in room_id_to_stream_ordering_map.keys() ], ) - # This has to be separate from the upsert above because we won't have a - # `bump_stamp` for every event and we don't want to overwrite the existing value - # with `None`. - self.db_pool.simple_upsert_many_txn( - txn, - table="sliding_sync_joined_rooms", - key_names=("room_id",), - key_values=[(room_id,) for room_id in room_id_to_bump_stamp_map.keys()], - value_names=("bump_stamp",), - value_values=[ - (room_id_to_bump_stamp_map[room_id],) - for room_id in room_id_to_bump_stamp_map.keys() - ], - ) def _store_rejected_events_txn( self,