1
0

Fix bumping when events are persisted out of order

This commit is contained in:
Eric Eastwood
2024-08-12 11:27:17 -05:00
parent 3367422fd3
commit ed47a7eff5

View File

@@ -1572,7 +1572,7 @@ class PersistEventsStore:
DO UPDATE SET
{", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)},
event_stream_ordering = CASE
WHEN event_stream_ordering < EXCLUDED.event_stream_ordering
WHEN event_stream_ordering IS NULL OR event_stream_ordering < EXCLUDED.event_stream_ordering
THEN EXCLUDED.event_stream_ordering
ELSE event_stream_ordering
END
@@ -2005,39 +2005,39 @@ class PersistEventsStore:
event.internal_metadata.stream_ordering
)
# `_store_event_txn` is run before `_update_current_state_txn` which handles
# deleting the rows if we are no longer in the room so we don't need to worry
# about inserting something that will be orphaned.
#
# FIXME: We need to handle cases where we are persisting events out of order and
# the stream_ordering didn't increase.
self.db_pool.simple_upsert_many_txn(
txn,
table="sliding_sync_joined_rooms",
key_names=("room_id",),
key_values=[
(room_id,) for room_id in room_id_to_stream_ordering_map.keys()
],
value_names=("event_stream_ordering",),
value_values=[
(room_id_to_stream_ordering_map[room_id],)
# This function (`_store_event_txn(...)`) is run before
# `_update_current_state_txn(...)` which handles deleting the rows if we are no
# longer in the room so we don't need to worry about inserting something that
# will be orphaned.
txn.execute_batch(
f"""
INSERT INTO sliding_sync_joined_rooms
(room_id, event_stream_ordering, bump_stamp)
VALUES (
?, ?, ?
)
ON CONFLICT (room_id)
DO UPDATE SET
event_stream_ordering = CASE
WHEN event_stream_ordering IS NULL OR event_stream_ordering < EXCLUDED.event_stream_ordering
THEN EXCLUDED.event_stream_ordering
ELSE event_stream_ordering
END,
bump_stamp = CASE
WHEN bump_stamp IS NULL OR bump_stamp < EXCLUDED.bump_stamp
THEN EXCLUDED.bump_stamp
ELSE bump_stamp
END
""",
[
[
room_id,
room_id_to_stream_ordering_map[room_id],
room_id_to_bump_stamp_map.get(room_id),
]
for room_id in room_id_to_stream_ordering_map.keys()
],
)
# This has to be separate from the upsert above because we won't have a
# `bump_stamp` for every event and we don't want to overwrite the existing value
# with `None`.
self.db_pool.simple_upsert_many_txn(
txn,
table="sliding_sync_joined_rooms",
key_names=("room_id",),
key_values=[(room_id,) for room_id in room_id_to_bump_stamp_map.keys()],
value_names=("bump_stamp",),
value_values=[
(room_id_to_bump_stamp_map[room_id],)
for room_id in room_id_to_bump_stamp_map.keys()
],
)
def _store_rejected_events_txn(
self,