From 35d797a9c4feb71c4480730bf4244571b14dbac4 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Sun, 15 Sep 2024 10:13:00 +0100
Subject: [PATCH] Add history visibility index table

---
 synapse/storage/controllers/persist_events.py | 56 +++++++++++++++++-
 synapse/storage/databases/main/events.py      | 58 +++++++++++++++++++
 .../schema/main/delta/87/04_visibility.sql    | 26 +++++++++
 .../delta/87/05_visibility_index.sql.postgres | 18 ++++++
 4 files changed, 157 insertions(+), 1 deletion(-)
 create mode 100644 synapse/storage/schema/main/delta/87/04_visibility.sql
 create mode 100644 synapse/storage/schema/main/delta/87/05_visibility_index.sql.postgres

diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py
index 879ee9039e..7019c2d69a 100644
--- a/synapse/storage/controllers/persist_events.py
+++ b/synapse/storage/controllers/persist_events.py
@@ -49,7 +49,7 @@ from prometheus_client import Counter, Histogram
 
 from twisted.internet import defer
 
-from synapse.api.constants import EventTypes, Membership
+from synapse.api.constants import EventTypes, HistoryVisibility, Membership
 from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
 from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME
@@ -635,6 +635,59 @@ class EventsPersistenceStorageController:
                     room_id, [e for e, _ in chunk]
                 )
 
+            # Work out the history visibility in force at each event we're about
+            # to persist, so the store can maintain `history_visibility_ranges`.
+            visibilities: Dict[str, str] = {}
+            with Measure(self._clock, "calculate_history_vis"):
+                # TODO: We only need to do this on changes, rather than looking
+                # up the state for every event
+                #
+                # Only the events in this chunk are handed to the store below, so
+                # there is no need to recompute this for the whole batch per chunk.
+                for event, context in chunk:
+                    if (
+                        backfilled
+                        or event.internal_metadata.is_outlier()
+                        or context.rejected
+                    ):
+                        continue
+
+                    state = await context.get_current_state_ids(
+                        StateFilter.from_types([(EventTypes.RoomHistoryVisibility, "")])
+                    )
+                    # We're not an outlier
+                    assert state is not None
+
+                    history_visibility = HistoryVisibility.SHARED
+                    history_visibility_event_id = state.get(
+                        (EventTypes.RoomHistoryVisibility, "")
+                    )
+                    if history_visibility_event_id:
+                        # Prefer the in-memory copy if the visibility event is part of
+                        # the batch being persisted; otherwise fetch it from the store.
+                        #
+                        # NB: use a distinct loop variable so that we don't clobber
+                        # `event`, which keys the `visibilities` entry below.
+                        for batch_event, _ in events_and_contexts:
+                            if batch_event.event_id == history_visibility_event_id:
+                                history_visibility_event = batch_event
+                                break
+                        else:
+                            history_visibility_event = await self.main_store.get_event(
+                                history_visibility_event_id,
+                                get_prev_content=False,
+                            )
+
+                        # The content value isn't guaranteed to be a string: ignore
+                        # anything else rather than writing it to a `TEXT NOT NULL` column.
+                        content_visibility = history_visibility_event.content.get(
+                            "history_visibility"
+                        )
+                        if isinstance(content_visibility, str):
+                            history_visibility = content_visibility
+
+                    visibilities[event.event_id] = history_visibility
+
             await self.persist_events_store._persist_events_and_state_updates(
                 room_id,
                 chunk,
@@ -643,6 +696,7 @@ class EventsPersistenceStorageController:
                 use_negative_stream_ordering=backfilled,
                 inhibit_local_membership_updates=backfilled,
                 new_event_links=new_event_links,
+                visibilities=visibilities,
             )
 
         return replaced_events
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index e5eae9cee9..3b7922ce5f 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -31,6 +31,7 @@ from typing import (
     Generator,
     Iterable,
     List,
+    Mapping,
     Optional,
     Sequence,
     Set,
@@ -271,6 +272,7 @@ class PersistEventsStore:
         new_event_links: Dict[str, NewEventChainLinks],
         use_negative_stream_ordering: bool = False,
         inhibit_local_membership_updates: bool = False,
+        visibilities: Mapping[str, str] = {},
     ) -> None:
         """Persist a set of events alongside updates to the current state and
         forward extremities tables.
@@ -355,6 +357,7 @@ class PersistEventsStore:
                 new_forward_extremities=new_forward_extremities,
                 new_event_links=new_event_links,
                 sliding_sync_table_changes=sliding_sync_table_changes,
+                visibilities=visibilities,
             )
             persist_event_counter.inc(len(events_and_contexts))
 
@@ -874,6 +877,7 @@ class PersistEventsStore:
         new_forward_extremities: Optional[Set[str]],
         new_event_links: Dict[str, NewEventChainLinks],
         sliding_sync_table_changes: Optional[SlidingSyncTableChanges],
+        visibilities: Mapping[str, str] = {},
     ) -> None:
         """Insert some number of room events into the necessary database tables.
 
@@ -1027,6 +1031,60 @@ class PersistEventsStore:
                 txn, room_id, events_and_contexts
             )
 
+        # Record changes in history visibility as contiguous stream-ordering
+        # ranges in `history_visibility_ranges`.
+        changes = [
+            (visibilities[event.event_id], event.internal_metadata.stream_ordering)
+            for event, _ in events_and_contexts
+            if event.event_id in visibilities
+        ]
+
+        if changes:
+            # Fetch the range with the highest `start_range` for the room, i.e. the
+            # one to close off if the visibility has changed.
+            sql = """
+                SELECT visibility, start_range FROM history_visibility_ranges
+                WHERE room_id = ?
+                ORDER BY start_range DESC
+                LIMIT 1
+            """
+            txn.execute(sql, (room_id,))
+            row = txn.fetchone()
+            prev_visibility = None
+            start_range = None
+            if row:
+                (
+                    prev_visibility,
+                    start_range,
+                ) = row
+
+            # NOTE(review): this assumes every new stream ordering is greater than
+            # the latest existing `start_range` -- confirm with multiple writers.
+            for new_visibility, stream_ordering in changes:
+                assert stream_ordering is not None
+                if new_visibility != prev_visibility:
+                    if start_range is not None:
+                        # Close the previous range at the point the new one starts.
+                        self.db_pool.simple_update_one_txn(
+                            txn,
+                            table="history_visibility_ranges",
+                            keyvalues={"room_id": room_id, "start_range": start_range},
+                            updatevalues={"end_range": stream_ordering},
+                        )
+                    # Open a new range; `end_range` stays NULL until it is superseded.
+                    self.db_pool.simple_insert_txn(
+                        txn,
+                        table="history_visibility_ranges",
+                        values={
+                            "room_id": room_id,
+                            "visibility": new_visibility,
+                            "start_range": stream_ordering,
+                            "end_range": None,
+                        },
+                    )
+                    prev_visibility = new_visibility
+                    start_range = stream_ordering
+
     def _persist_event_auth_chain_txn(
         self,
         txn: LoggingTransaction,
diff --git a/synapse/storage/schema/main/delta/87/04_visibility.sql b/synapse/storage/schema/main/delta/87/04_visibility.sql
new file mode 100644
index 0000000000..9b09d2bbc5
--- /dev/null
+++ b/synapse/storage/schema/main/delta/87/04_visibility.sql
@@ -0,0 +1,26 @@
+--
+-- This file is licensed under the Affero General Public License (AGPL) version 3.
+--
+-- Copyright (C) 2024 New Vector, Ltd
+--
+-- This program is free software: you can redistribute it and/or modify
+-- it under the terms of the GNU Affero General Public License as
+-- published by the Free Software Foundation, either version 3 of the
+-- License, or (at your option) any later version.
+--
+-- See the GNU Affero General Public License for more details:
+-- <https://www.gnu.org/licenses/agpl-3.0.html>.
+
+-- Tracks which history visibility applied to a room over which ranges of
+-- stream orderings. A NULL `end_range` marks a range that has not been closed.
+CREATE TABLE IF NOT EXISTS history_visibility_ranges (
+    room_id TEXT NOT NULL,
+    visibility TEXT NOT NULL,
+    start_range BIGINT NOT NULL,
+    end_range BIGINT
+);
+
+CREATE INDEX history_visibility_ranges_idx ON history_visibility_ranges(room_id, start_range, end_range DESC);
+CREATE UNIQUE INDEX history_visibility_ranges_uniq_idx ON history_visibility_ranges(room_id, start_range);
+
+-- The Postgres-only GiST range index is created in 05_visibility_index.sql.postgres.
diff --git a/synapse/storage/schema/main/delta/87/05_visibility_index.sql.postgres b/synapse/storage/schema/main/delta/87/05_visibility_index.sql.postgres
new file mode 100644
index 0000000000..3f993c9d9f
--- /dev/null
+++ b/synapse/storage/schema/main/delta/87/05_visibility_index.sql.postgres
@@ -0,0 +1,18 @@
+--
+-- This file is licensed under the Affero General Public License (AGPL) version 3.
+--
+-- Copyright (C) 2024 New Vector, Ltd
+--
+-- This program is free software: you can redistribute it and/or modify
+-- it under the terms of the GNU Affero General Public License as
+-- published by the Free Software Foundation, either version 3 of the
+-- License, or (at your option) any later version.
+--
+-- See the GNU Affero General Public License for more details:
+-- <https://www.gnu.org/licenses/agpl-3.0.html>.
+
+-- `btree_gist` lets the GiST index below include the scalar `room_id` column
+-- alongside the range expression.
+-- Ranges are half-open: `start_range` inclusive, `end_range` exclusive.
+CREATE EXTENSION IF NOT EXISTS btree_gist;
+CREATE INDEX history_visibility_ranges_idx_gist ON history_visibility_ranges USING gist(room_id, int8range(start_range, end_range, '[)'));