From 4b42e44ef9cb3318c574c716464ef5f5c23edfae Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 15 Aug 2024 00:21:35 -0500 Subject: [PATCH] Work on background update for `sliding_sync_membership_snapshots` --- synapse/storage/databases/main/events.py | 195 ++++++++------- .../databases/main/events_bg_updates.py | 235 ++++++++++++++---- 2 files changed, 288 insertions(+), 142 deletions(-) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 62a203e252..138afd324f 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1243,8 +1243,10 @@ class PersistEventsStore: str, Optional[Union[str, bool]] ] = {} if current_state_map: - sliding_sync_membership_snapshots_insert_map = self._get_sliding_sync_insert_values_according_to_current_state_map_txn( - txn, current_state_map + sliding_sync_membership_snapshots_insert_map = ( + self._get_sliding_sync_insert_values_from_current_state_map_txn( + txn, current_state_map + ) ) # We have current state to work from sliding_sync_membership_snapshots_insert_map["has_known_state"] = ( @@ -1279,19 +1281,19 @@ class PersistEventsStore: f""" INSERT INTO sliding_sync_membership_snapshots (room_id, user_id, membership_event_id, membership, event_stream_ordering - {"," + (", ".join(insert_keys)) if insert_keys else ""}) + {("," + ", ".join(insert_keys)) if insert_keys else ""}) VALUES ( ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?), (SELECT stream_ordering FROM events WHERE event_id = ?) - {"," + (", ".join("?" for _ in insert_values)) if insert_values else ""} + {("," + ", ".join("?" 
for _ in insert_values)) if insert_values else ""} ) ON CONFLICT (room_id, user_id) DO UPDATE SET membership_event_id = EXCLUDED.membership_event_id, membership = EXCLUDED.membership, event_stream_ordering = EXCLUDED.event_stream_ordering - {"," + (", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)) if insert_keys else ""} + {("," + ", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)) if insert_keys else ""} """, [ [ @@ -1636,7 +1638,7 @@ class PersistEventsStore: return current_state_map @classmethod - def _get_sliding_sync_insert_values_according_to_current_state_map_txn( + def _get_sliding_sync_insert_values_from_current_state_map_txn( cls, txn: LoggingTransaction, current_state_map: StateMap[str] ) -> Dict[str, Optional[Union[str, bool]]]: """ @@ -2448,91 +2450,27 @@ class PersistEventsStore: "membership": event.membership, "event_stream_ordering": event.internal_metadata.stream_ordering, } - if raw_stripped_state_events is not None: - stripped_state_map: MutableStateMap[StrippedStateEvent] = {} - if isinstance(raw_stripped_state_events, list): - for raw_stripped_event in raw_stripped_state_events: - stripped_state_event = parse_stripped_state_event( - raw_stripped_event - ) - if stripped_state_event is not None: - stripped_state_map[ - ( - stripped_state_event.type, - stripped_state_event.state_key, - ) - ] = stripped_state_event - - # If there is some stripped state, we assume the remote server passed *all* - # of the potential stripped state events for the room. - create_stripped_event = stripped_state_map.get( - (EventTypes.Create, "") + if event.membership == Membership.LEAVE: + # Inherit the meta data from the remote invite/knock. When using + # sliding sync filters, this will prevent the room from + # disappearing/appearing just because you left the room. 
+ pass + elif event.membership in (Membership.INVITE, Membership.KNOCK): + extra_insert_values = ( + self._get_sliding_sync_insert_values_from_stripped_state_txn( + txn, raw_stripped_state_events + ) ) - # Sanity check that we at-least have the create event - if create_stripped_event is not None: - insert_values["has_known_state"] = True - - # Find the room_type - insert_values["room_type"] = ( - create_stripped_event.content.get( - EventContentFields.ROOM_TYPE - ) - if create_stripped_event is not None - else None - ) - - # Find whether the room is_encrypted - encryption_stripped_event = stripped_state_map.get( - (EventTypes.RoomEncryption, "") - ) - encryption = ( - encryption_stripped_event.content.get( - EventContentFields.ENCRYPTION_ALGORITHM - ) - if encryption_stripped_event is not None - else None - ) - insert_values["is_encrypted"] = encryption is not None - - # Find the room_name - room_name_stripped_event = stripped_state_map.get( - (EventTypes.Name, "") - ) - insert_values["room_name"] = ( - room_name_stripped_event.content.get( - EventContentFields.ROOM_NAME - ) - if room_name_stripped_event is not None - else None - ) - - else: - # No strip state provided - insert_values["has_known_state"] = False - insert_values["room_type"] = None - insert_values["room_name"] = None - insert_values["is_encrypted"] = False + insert_values.update(extra_insert_values) else: - if event.membership == Membership.LEAVE: - # Inherit the meta data from the remote invite/knock. When using - # sliding sync filters, this will prevent the room from - # disappearing/appearing just because you left the room. 
- pass - elif event.membership in (Membership.INVITE, Membership.KNOCK): - # No strip state provided - insert_values["has_known_state"] = False - insert_values["room_type"] = None - insert_values["room_name"] = None - insert_values["is_encrypted"] = False - else: - # We don't know how to handle this type of membership yet - # - # FIXME: We should use `assert_never` here but for some reason - # the exhaustive matching doesn't recognize the `Never` here. - # assert_never(event.membership) - raise AssertionError( - f"Unexpected out-of-band membership {event.membership} ({event.event_id}) that we don't know how to handle yet" - ) + # We don't know how to handle this type of membership yet + # + # FIXME: We should use `assert_never` here but for some reason + # the exhaustive matching doesn't recognize the `Never` here. + # assert_never(event.membership) + raise AssertionError( + f"Unexpected out-of-band membership {event.membership} ({event.event_id}) that we don't know how to handle yet" + ) self.db_pool.simple_upsert_txn( txn, @@ -2544,6 +2482,85 @@ class PersistEventsStore: values=insert_values, ) + @classmethod + def _get_sliding_sync_insert_values_from_stripped_state_txn( + cls, txn: LoggingTransaction, unsigned_stripped_state_events: Any + ) -> Dict[str, Optional[Union[str, bool]]]: + """ + Derive snapshot column values from an invite/knock's stripped state events. + + Returns: + Map from column names (`has_known_state`, `room_type`, `is_encrypted`, + `room_name`) to values for the `sliding_sync_membership_snapshots` table. 
+ """ + # Map of values to insert/update in the `sliding_sync_membership_snapshots` table + sliding_sync_insert_map: Dict[str, Optional[Union[str, bool]]] = {} + + if unsigned_stripped_state_events is not None: + stripped_state_map: MutableStateMap[StrippedStateEvent] = {} + if isinstance(unsigned_stripped_state_events, list): + for raw_stripped_event in unsigned_stripped_state_events: + stripped_state_event = parse_stripped_state_event( + raw_stripped_event + ) + if stripped_state_event is not None: + stripped_state_map[ + ( + stripped_state_event.type, + stripped_state_event.state_key, + ) + ] = stripped_state_event + + # If there is some stripped state, we assume the remote server passed *all* + # of the potential stripped state events for the room. + create_stripped_event = stripped_state_map.get((EventTypes.Create, "")) + # Sanity check that we at-least have the create event + if create_stripped_event is not None: + sliding_sync_insert_map["has_known_state"] = True + + # Find the room_type + sliding_sync_insert_map["room_type"] = ( + create_stripped_event.content.get(EventContentFields.ROOM_TYPE) + if create_stripped_event is not None + else None + ) + + # Find whether the room is_encrypted + encryption_stripped_event = stripped_state_map.get( + (EventTypes.RoomEncryption, "") + ) + encryption = ( + encryption_stripped_event.content.get( + EventContentFields.ENCRYPTION_ALGORITHM + ) + if encryption_stripped_event is not None + else None + ) + sliding_sync_insert_map["is_encrypted"] = encryption is not None + + # Find the room_name + room_name_stripped_event = stripped_state_map.get((EventTypes.Name, "")) + sliding_sync_insert_map["room_name"] = ( + room_name_stripped_event.content.get(EventContentFields.ROOM_NAME) + if room_name_stripped_event is not None + else None + ) + + else: + # No strip state provided + sliding_sync_insert_map["has_known_state"] = False + sliding_sync_insert_map["room_type"] = None + sliding_sync_insert_map["room_name"] = None + 
sliding_sync_insert_map["is_encrypted"] = False + else: + # No strip state provided + sliding_sync_insert_map["has_known_state"] = False + sliding_sync_insert_map["room_type"] = None + sliding_sync_insert_map["room_name"] = None + sliding_sync_insert_map["is_encrypted"] = False + + return sliding_sync_insert_map + def _handle_event_relations( self, txn: LoggingTransaction, event: EventBase ) -> None: diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index b59bc2a561..cef354dd5b 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -20,11 +20,12 @@ # import logging -from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple, cast +from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple, Union, cast import attr +from typing_extensions import assert_never -from synapse.api.constants import EventContentFields, RelationTypes +from synapse.api.constants import EventContentFields, Membership, RelationTypes from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.events import make_event_from_dict from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause @@ -1598,7 +1599,6 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): return 0 for (room_id,) in rooms_to_update_rows: - logger.info("asdf Working on room %s", room_id) current_state_map = PersistEventsStore._get_relevant_sliding_sync_current_state_event_ids_txn( txn, room_id ) @@ -1606,7 +1606,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): # so we should have some current state for each room assert current_state_map - sliding_sync_joined_rooms_insert_map = PersistEventsStore._get_sliding_sync_insert_values_according_to_current_state_map_txn( + sliding_sync_joined_rooms_insert_map = PersistEventsStore._get_sliding_sync_insert_values_from_current_state_map_txn( txn, current_state_map ) # We should have some insert 
values for each room, even if they are `None` @@ -1679,62 +1679,191 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): """ Handles backfilling the `sliding_sync_membership_snapshots` table. """ - # last_event_stream_ordering = progress.get("last_event_stream_ordering", "") + last_event_stream_ordering = progress.get( + "last_event_stream_ordering", -(1 << 31) + ) def _txn(txn: LoggingTransaction) -> int: - # # Fetch the set of event IDs that we want to update - # txn.execute( - # """ - # SELECT room_id, user_id, event_id FROM local_current_membership - # WHERE event_stream_ordering > ? - # ORDER BY event_stream_ordering ASC - # LIMIT ? - # """, - # (last_event_stream_ordering, batch_size), - # ) + # Fetch the set of event IDs that we want to update + txn.execute( + """ + SELECT + c.room_id, + c.user_id, + c.event_id, + c.membership, + c.event_stream_ordering, + e.outlier + FROM local_current_membership as c + INNER JOIN events AS e USING (event_id) + WHERE event_stream_ordering > ? + ORDER BY event_stream_ordering ASC + LIMIT ? + """, + (last_event_stream_ordering, batch_size), + ) - # rows = txn.fetchall() - # if not rows: - # return 0 + memberships_to_update_rows = txn.fetchall() + if not memberships_to_update_rows: + return 0 - # # Update the redactions with the received_ts. - # # - # # Note: Not all events have an associated received_ts, so we - # # fallback to using origin_server_ts. If we for some reason don't - # # have an origin_server_ts, lets just use the current timestamp. - # # - # # We don't want to leave it null, as then we'll never try and - # # censor those redactions. - # txn.execute_batch( - # f""" - # INSERT INTO sliding_sync_membership_snapshots - # (room_id, user_id, membership_event_id, membership, event_stream_ordering - # {"," + (", ".join(insert_keys)) if insert_keys else ""}) - # VALUES ( - # ?, ?, ?, - # (SELECT membership FROM room_memberships WHERE event_id = ?), - # (SELECT stream_ordering FROM events WHERE event_id = ?) 
- # {"," + (", ".join("?" for _ in insert_values)) if insert_values else ""} - # ) - # ON CONFLICT (room_id, user_id) - # DO UPDATE SET - # membership_event_id = EXCLUDED.membership_event_id, - # membership = EXCLUDED.membership, - # event_stream_ordering = EXCLUDED.event_stream_ordering - # {"," + (", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)) if insert_keys else ""} - # """, - # (TODO,), - # ) + for ( + room_id, + user_id, + membership_event_id, + membership, + _membership_event_stream_ordering, + is_outlier, + ) in memberships_to_update_rows: + # We don't know how to handle `membership` values other than these. The + # code below would need to be updated. + assert membership in ( + Membership.JOIN, + Membership.INVITE, + Membership.KNOCK, + Membership.LEAVE, + Membership.BAN, + ) - # self.db_pool.updates._background_update_progress_txn( - # txn, "redactions_received_ts", {"last_event_id": upper_event_id} - # ) + # Map of values to insert/update in the `sliding_sync_membership_snapshots` table + sliding_sync_membership_snapshots_insert_map: Dict[ + str, Optional[Union[str, bool]] + ] = {} + if membership == Membership.JOIN: + # If we're still joined, we can pull from current state + current_state_map = PersistEventsStore._get_relevant_sliding_sync_current_state_event_ids_txn( + txn, room_id + ) + # We're iterating over rooms that we are joined to so they should + # have `current_state_events` and we should have some current state + # for each room + assert current_state_map - # return len(rows) + sliding_sync_membership_snapshots_insert_map = PersistEventsStore._get_sliding_sync_insert_values_from_current_state_map_txn( + txn, current_state_map + ) + # We should have some insert values for each room, even if they are `None` + assert sliding_sync_membership_snapshots_insert_map - # TODO - # return len(rows) - return 0 + # We have current state to work from + sliding_sync_membership_snapshots_insert_map["has_known_state"] = ( + True + ) + elif 
membership in (Membership.INVITE, Membership.KNOCK) or ( + membership == Membership.LEAVE and is_outlier + ): + invite_or_knock_event_id = membership_event_id + invite_or_knock_membership = membership + + # If the event is an `out_of_band_membership` (special case of + # `outlier`), we never had historical state so we have to pull from + # the stripped state on the previous invite/knock event. This gives + # us a consistent view of the room state regardless of your + # membership (i.e. the room shouldn't disappear if you're using the + # `is_encrypted` filter and you leave). + if membership == Membership.LEAVE and is_outlier: + # Find the previous invite/knock event before the leave event + txn.execute( + """ + SELECT event_id, membership + FROM room_memberships + WHERE + room_id = ? + AND user_id = ? + AND event_stream_ordering < ? + ORDER BY event_stream_ordering DESC + LIMIT 1 + """, + (room_id, user_id, _membership_event_stream_ordering), + ) + row = txn.fetchone() + # We should see a corresponding previous invite/knock event + assert row is not None + invite_or_knock_event_id, invite_or_knock_membership = row + + # Pull from the stripped state on the invite/knock event + txn.execute( + """ + SELECT json FROM event_json + WHERE event_id = ? 
+ """, + (invite_or_knock_event_id,), + ) + row = txn.fetchone() + # We should find a corresponding event + assert row is not None + json = row[0] + event_json = db_to_json(json) + + raw_stripped_state_events = None + if invite_or_knock_membership == Membership.INVITE: + invite_room_state = event_json.get("unsigned", {}).get( + "invite_room_state" + ) + raw_stripped_state_events = invite_room_state + elif invite_or_knock_membership == Membership.KNOCK: + knock_room_state = event_json.get("unsigned", {}).get( + "knock_room_state" + ) + raw_stripped_state_events = knock_room_state + + sliding_sync_membership_snapshots_insert_map = PersistEventsStore._get_sliding_sync_insert_values_from_stripped_state_txn( + txn, raw_stripped_state_events + ) + # We should have some insert values for each room, even if no + # stripped state is on the event because we still want to record + # that we have no known state + assert sliding_sync_membership_snapshots_insert_map + elif membership == Membership.BAN: + # Pull from historical state + # TODO + pass + else: + assert_never(membership) + + # Pulling keys/values separately is safe and produces congruent lists + insert_keys = sliding_sync_membership_snapshots_insert_map.keys() + insert_values = sliding_sync_membership_snapshots_insert_map.values() + # We don't need to do anything `ON CONFLICT` because we never partially + # insert/update the snapshots + txn.execute( + f""" + INSERT INTO sliding_sync_membership_snapshots + (room_id, user_id, membership_event_id, membership, event_stream_ordering + {("," + ", ".join(insert_keys)) if insert_keys else ""}) + VALUES ( + ?, ?, ?, ?, + (SELECT stream_ordering FROM events WHERE event_id = ?) + {("," + ", ".join("?" 
for _ in insert_values)) if insert_values else ""} + ) + ON CONFLICT (room_id, user_id) + DO NOTHING + """, + [ + room_id, + user_id, + membership_event_id, + membership, + membership_event_id, + ] + + list(insert_values), + ) + + ( + _room_id, + _user_id, + _membership_event_id, + _membership, + membership_event_stream_ordering, + _is_outlier, + ) = memberships_to_update_rows[-1] + self.db_pool.updates._background_update_progress_txn( + txn, + _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BACKFILL, + {"last_event_stream_ordering": membership_event_stream_ordering}, + ) + + return len(memberships_to_update_rows) count = await self.db_pool.runInteraction( "sliding_sync_membership_snapshots_backfill", _txn