From 0c395fd8b9ef511ec309f56324bc3d0e8bda21ac Mon Sep 17 00:00:00 2001
From: Patrick Cloke
Date: Thu, 2 Jun 2022 08:31:36 -0400
Subject: [PATCH] Start splitting receipts on new events.

---
 rrr_test.py                                |  61 ++++----
 synapse/storage/databases/main/events.py   | 172 +++++++++++++++++++++
 synapse/storage/databases/main/receipts.py |  32 ++++
 3 files changed, 233 insertions(+), 32 deletions(-)

diff --git a/rrr_test.py b/rrr_test.py
index becbb5df5f..e37a84c602 100644
--- a/rrr_test.py
+++ b/rrr_test.py
@@ -98,7 +98,7 @@ def main():
     # Create a new room as user 2, add a bunch of messages.
     result = requests.post(
         f"{HOMESERVER}/_matrix/client/v3/createRoom",
-        json={"visibility": "public", "name": f"Road to Nowhere ({monotonic()})"},
+        json={"visibility": "public", "name": f"Ranged Read Receipts ({monotonic()})"},
         headers=USER_2_HEADERS,
     )
     _check_for_status(result)
@@ -115,29 +115,16 @@ def main():

     # User 2 sends some messages.
     event_ids = []
-    with open("road_to_no_where.txt", "r") as f:
-        count = 0
-        forks = 1
-        for line in f.readlines():
-            line = line.strip()
-            if not line:
-                if forks < 3:
-                    last_event_id = first_event_id
-                    forks += 1
-                else:
-                    # Let the server figure it out.
-                    last_event_id = None
-                continue
-
-            # Send a msg to the room.
-            last_event_id = _send_event(room_id, line, last_event_id)
-            event_ids.append(last_event_id)
-            sleep(1)
+    def _send_and_append(body, prev_message_id=None):
+        event_id = _send_event(room_id, body, prev_message_id)
+        event_ids.append(event_id)
+        return event_id

-            count += 1
-
-            if count == 20:  # End of second verse
-                break
+    prev_message_id = first_message_id = _send_and_append("Root")
+    for msg in range(3):
+        prev_message_id = _send_and_append(f"Fork 1 Message {msg}", prev_message_id)
+        sleep(1)

     # User 2 sends a read receipt.
     print("@second reads to end")
@@ -151,9 +138,9 @@ def main():
     _sync_and_show(room_id)

     # User 1 sends a read receipt.
-    print("@test reads from 3 -> 8")
+    print("@test reads from fork 1")
     result = requests.post(
-        f"{HOMESERVER}/_matrix/client/v3/rooms/{room_id}/receipt/m.read/{event_ids[8]}/{event_ids[3]}",
+        f"{HOMESERVER}/_matrix/client/v3/rooms/{room_id}/receipt/m.read/{event_ids[3]}/{event_ids[1]}",
         headers=USER_1_HEADERS,
         json={},
     )
@@ -161,16 +148,26 @@ def main():
     _check_for_status(result)

     _sync_and_show(room_id)

+    # Create a fork in the DAG.
+    prev_message_id = first_message_id
+    for msg in range(1):
+        prev_message_id = _send_and_append(f"Fork 2 Message {msg}", prev_message_id)
+        sleep(1)
+
+    # Join the forks.
+    _send_and_append("Tail")
+
+    _sync_and_show(room_id)
+
     # User 1 sends another read receipt.
-    print("@test reads from 13 -> 14")
-    result = requests.post(
-        f"{HOMESERVER}/_matrix/client/v3/rooms/{room_id}/receipt/m.read/{event_ids[14]}/{event_ids[13]}",
-        headers=USER_1_HEADERS,
-        json={},
-    )
-    _check_for_status(result)
+    # print("@test reads everything")
+    # result = requests.post(
+    #     f"{HOMESERVER}/_matrix/client/v3/rooms/{room_id}/receipt/m.read/{event_ids[-1]}/{event_ids[0]}",
+    #     headers=USER_1_HEADERS,
+    #     json={},
+    # )
+    # _check_for_status(result)

-    _sync_and_show(room_id)
+    # _sync_and_show(room_id)


 if __name__ == "__main__":
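For reference, the test script above now builds a small forked DAG rather than replaying lyrics from a file. A rough sketch of how Synapse's (topological ordering, stream ordering) sort linearises it; the ordering values below are assumed for illustration, since the real ones are assigned by the server:

# Illustrative only, not part of the patch. The orderings are assumed values
# for the events the script creates.
events = [
    # (body, topological_ordering, stream_ordering)
    ("Root", 1, 1),
    ("Fork 1 Message 0", 2, 2),
    ("Fork 1 Message 1", 3, 3),
    ("Fork 1 Message 2", 4, 4),
    ("Fork 2 Message 0", 2, 5),  # forked off "Root", so it shares a depth
    ("Tail", 5, 6),              # the server picks both forks as prev_events
]

# Sort by (topological_ordering, stream_ordering), as Synapse does.
linearised = sorted(events, key=lambda e: (e[1], e[2]))
print([body for body, _, _ in linearised])
# ['Root', 'Fork 1 Message 0', 'Fork 2 Message 0', 'Fork 1 Message 1',
#  'Fork 1 Message 2', 'Tail']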
- print("@test reads from 13 -> 14") - result = requests.post( - f"{HOMESERVER}/_matrix/client/v3/rooms/{room_id}/receipt/m.read/{event_ids[14]}/{event_ids[13]}", - headers=USER_1_HEADERS, - json={}, - ) - _check_for_status(result) + # print("@test reads everything") + # result = requests.post( + # f"{HOMESERVER}/_matrix/client/v3/rooms/{room_id}/receipt/m.read/{event_ids[-1]}/{event_ids[0]}", + # headers=USER_1_HEADERS, + # json={}, + # ) + # _check_for_status(result) - _sync_and_show(room_id) + # _sync_and_show(room_id) if __name__ == "__main__": diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index a8773374be..5b86ac55e9 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -28,6 +28,7 @@ from typing import ( Sequence, Set, Tuple, + cast, ) import attr @@ -195,6 +196,9 @@ class PersistEventsStore: ) persist_event_counter.inc(len(events_and_contexts)) + # Update any receipts for users in the rooms. + await self._update_receipts(events_and_contexts) + if not use_negative_stream_ordering: # we don't want to set the event_persisted_position to a negative # stream_ordering. @@ -2099,6 +2103,174 @@ class PersistEventsStore: ), ) + def _get_receipts_to_update( + self, txn: LoggingTransaction, event: EventBase + ) -> List[tuple]: + # Find any receipt ranges that would be "broken" by this event. + sql = """ + SELECT + stream_id, + receipts_ranged.room_id, + receipt_type, + user_id, + start_event_id, + end_event_id, + data, + start_event.topological_ordering, + end_event.topological_ordering + FROM receipts_ranged + LEFT JOIN events AS end_event ON (end_event.event_id = end_event_id) + LEFT JOIN events AS start_event ON (start_event.event_id = start_event_id) + WHERE + receipts_ranged.room_id = ? AND + (start_event.topological_ordering <= ? OR start_event_id IS NULL) AND + ? <= end_event.topological_ordering; + """ + + txn.execute( + sql, + (event.room_id, event.depth, event.depth), + ) + return list(txn.fetchall()) + + def _split_receipt( + self, + txn: LoggingTransaction, + event: EventBase, + stream_id: int, + room_id: str, + receipt_type: str, + user_id: str, + start_event_id: str, + end_event_id: str, + data: JsonDict, + start_topological_ordering: int, + end_topological_ordering: int, + stream_orderings: Tuple[int, ...], + ) -> None: + # Upsert the current receipt to give it a new endpoint as the + # latest event in the range before the new event. + sql = """ + SELECT event_id FROM events + WHERE room_id = ? AND topological_ordering <= ? AND stream_ordering < ? + ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT 1; + """ + txn.execute( + sql, + ( + event.room_id, + event.depth, + event.internal_metadata.stream_ordering, + ), + ) + new_end_event_id = cast(Tuple[str], txn.fetchone())[0] # XXX Can this be None? + # TODO Upsert? + self.db_pool.simple_delete_one_txn( + txn, table="receipts_ranged", keyvalues={"stream_id": stream_id} + ) + self.db_pool.simple_insert_txn( + txn, + table="receipts_ranged", + values={ + "room_id": room_id, + "user_id": user_id, + "receipt_type": receipt_type, + "start_event_id": start_event_id, + "end_event_id": new_end_event_id, + "stream_id": stream_orderings[0], + "data": data, # XXX Does it make sense to duplicate this? + }, + ) + + # Insert a new receipt with a start point as the first event after + # the new event and re-using the old endpoint. + sql = """ + SELECT event_id FROM events + WHERE room_id = ? AND topological_ordering > ? 
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index abf1dfaecd..269a7521c6 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -13,6 +13,38 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

+"""
+Receipts are stored as per-user ranges from a starting event to an ending event.
+If the starting event is missing then the range is considered to cover all events
+earlier in the room than the ending event.
+
+Since the events in a room form a DAG, we need to linearise it before applying receipts.
+Synapse linearises the room by sorting events by (topological ordering, stream ordering).
+To ensure that receipts are non-overlapping and correct, the following operations
+need to occur:
+
+* When a new receipt is received from a client, we coalesce it with other receipts.
+* When new events are received, any receipt range which includes the event's
+  topological ordering must be split into two receipts.
+
+Given a simple linear room:
+
+    A--B--C--D
+
+This is covered by a single receipt: [A, D].
+
+If a fork occurs in the DAG:
+
+    A--B--C--D      which linearises to: A--B--E--C--F--D
+        \    /
+        E---F
+
+The receipt from above must be split into component parts:
+    [A, B]
+    [C, C]
+    [D, D]
+"""
+
 import logging
 from typing import (
     TYPE_CHECKING,
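To make the docstring's fork example concrete, here is an end-to-end sketch (illustrative names only, not patch code) of the receipt [A, D] being split once per newly arrived event, yielding [A, B], [C, C], [D, D]:

# Illustrative only, not part of the patch.
def split_at(linearised, ranges, new_event):
    # Split any range whose linearised span contains the new event; this
    # approximates the topological-ordering check in _get_receipts_to_update.
    i = linearised.index(new_event)
    out = []
    for start, end in ranges:
        if linearised.index(start) <= i <= linearised.index(end):
            out.append((start, linearised[i - 1]))
            out.append((linearised[i + 1] if i + 1 < len(linearised) else end, end))
        else:
            out.append((start, end))
    return out

linearised = ["A", "B", "C", "D"]
ranges = [("A", "D")]
for new_event, position in [("E", 2), ("F", 4)]:  # assumed linearised positions
    linearised.insert(position, new_event)
    ranges = split_at(linearised, ranges, new_event)
print(ranges)  # [('A', 'B'), ('C', 'C'), ('D', 'D')]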