Merge remote-tracking branch 'origin/madlittlemods/sliding-sync-pre-populate-room-meta-data' into erikj/ss_hacks
This commit is contained in:
@@ -0,0 +1 @@
|
||||
Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting.
|
||||
@@ -129,6 +129,11 @@ BOOLEAN_COLUMNS = {
|
||||
"remote_media_cache": ["authenticated"],
|
||||
"room_stats_state": ["is_federatable"],
|
||||
"rooms": ["is_public", "has_auth_chain_index"],
|
||||
"sliding_sync_joined_rooms": ["is_encrypted"],
|
||||
"sliding_sync_membership_snapshots": [
|
||||
"has_known_state",
|
||||
"is_encrypted",
|
||||
],
|
||||
"users": ["shadow_banned", "approved", "locked", "suspended"],
|
||||
"un_partial_stated_event_stream": ["rejection_status_changed"],
|
||||
"users_who_share_rooms": ["share_private"],
|
||||
|
||||
@@ -245,6 +245,8 @@ class EventContentFields:
|
||||
# `m.room.encryption`` algorithm field
|
||||
ENCRYPTION_ALGORITHM: Final = "algorithm"
|
||||
|
||||
TOMBSTONE_SUCCESSOR_ROOM: Final = "replacement_room"
|
||||
|
||||
|
||||
class EventUnsignedContentFields:
|
||||
"""Fields found inside the 'unsigned' data on events"""
|
||||
|
||||
@@ -77,6 +77,7 @@ from synapse.types import (
|
||||
UserID,
|
||||
)
|
||||
from synapse.types.handlers.sliding_sync import (
|
||||
SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES,
|
||||
HaveSentRoomFlag,
|
||||
MutablePerConnectionState,
|
||||
OperationType,
|
||||
@@ -109,18 +110,6 @@ class Sentinel(enum.Enum):
|
||||
UNSET_SENTINEL = object()
|
||||
|
||||
|
||||
# The event types that clients should consider as new activity.
|
||||
DEFAULT_BUMP_EVENT_TYPES = {
|
||||
EventTypes.Create,
|
||||
EventTypes.Message,
|
||||
EventTypes.Encrypted,
|
||||
EventTypes.Sticker,
|
||||
EventTypes.CallInvite,
|
||||
EventTypes.PollStart,
|
||||
EventTypes.LiveLocationShareStart,
|
||||
}
|
||||
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class _RoomMembershipForUser:
|
||||
"""
|
||||
@@ -2224,7 +2213,9 @@ class SlidingSyncHandler:
|
||||
# Figure out the last bump event in the room
|
||||
last_bump_event_result = (
|
||||
await self.store.get_last_event_pos_in_room_before_stream_ordering(
|
||||
room_id, to_token.room_key, event_types=DEFAULT_BUMP_EVENT_TYPES
|
||||
room_id,
|
||||
to_token.room_key,
|
||||
event_types=SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@@ -502,8 +502,15 @@ class EventsPersistenceStorageController:
|
||||
"""
|
||||
state = await self._calculate_current_state(room_id)
|
||||
delta = await self._calculate_state_delta(room_id, state)
|
||||
sliding_sync_table_changes = (
|
||||
await self.persist_events_store._calculate_sliding_sync_table_changes(
|
||||
room_id, [], delta
|
||||
)
|
||||
)
|
||||
|
||||
await self.persist_events_store.update_current_state(room_id, delta)
|
||||
await self.persist_events_store.update_current_state(
|
||||
room_id, delta, sliding_sync_table_changes
|
||||
)
|
||||
|
||||
async def _calculate_current_state(self, room_id: str) -> StateMap[str]:
|
||||
"""Calculate the current state of a room, based on the forward extremities
|
||||
|
||||
+12
-11
@@ -35,6 +35,7 @@ from typing import (
|
||||
Iterable,
|
||||
Iterator,
|
||||
List,
|
||||
Mapping,
|
||||
Optional,
|
||||
Sequence,
|
||||
Tuple,
|
||||
@@ -1299,9 +1300,9 @@ class DatabasePool:
|
||||
self,
|
||||
txn: LoggingTransaction,
|
||||
table: str,
|
||||
keyvalues: Dict[str, Any],
|
||||
values: Dict[str, Any],
|
||||
insertion_values: Optional[Dict[str, Any]] = None,
|
||||
keyvalues: Mapping[str, Any],
|
||||
values: Mapping[str, Any],
|
||||
insertion_values: Optional[Mapping[str, Any]] = None,
|
||||
where_clause: Optional[str] = None,
|
||||
) -> bool:
|
||||
"""
|
||||
@@ -1344,9 +1345,9 @@ class DatabasePool:
|
||||
self,
|
||||
txn: LoggingTransaction,
|
||||
table: str,
|
||||
keyvalues: Dict[str, Any],
|
||||
values: Dict[str, Any],
|
||||
insertion_values: Optional[Dict[str, Any]] = None,
|
||||
keyvalues: Mapping[str, Any],
|
||||
values: Mapping[str, Any],
|
||||
insertion_values: Optional[Mapping[str, Any]] = None,
|
||||
where_clause: Optional[str] = None,
|
||||
lock: bool = True,
|
||||
) -> bool:
|
||||
@@ -1425,9 +1426,9 @@ class DatabasePool:
|
||||
self,
|
||||
txn: LoggingTransaction,
|
||||
table: str,
|
||||
keyvalues: Dict[str, Any],
|
||||
values: Dict[str, Any],
|
||||
insertion_values: Optional[Dict[str, Any]] = None,
|
||||
keyvalues: Mapping[str, Any],
|
||||
values: Mapping[str, Any],
|
||||
insertion_values: Optional[Mapping[str, Any]] = None,
|
||||
where_clause: Optional[str] = None,
|
||||
) -> bool:
|
||||
"""
|
||||
@@ -2011,8 +2012,8 @@ class DatabasePool:
|
||||
def simple_update_txn(
|
||||
txn: LoggingTransaction,
|
||||
table: str,
|
||||
keyvalues: Dict[str, Any],
|
||||
updatevalues: Dict[str, Any],
|
||||
keyvalues: Mapping[str, Any],
|
||||
updatevalues: Mapping[str, Any],
|
||||
) -> int:
|
||||
"""
|
||||
Update rows in the given database table.
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -24,9 +24,9 @@ from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple, cast
|
||||
|
||||
import attr
|
||||
|
||||
from synapse.api.constants import EventContentFields, RelationTypes
|
||||
from synapse.api.constants import EventContentFields, Membership, RelationTypes
|
||||
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
|
||||
from synapse.events import make_event_from_dict
|
||||
from synapse.events import EventBase, make_event_from_dict
|
||||
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
|
||||
from synapse.storage.database import (
|
||||
DatabasePool,
|
||||
@@ -34,9 +34,19 @@ from synapse.storage.database import (
|
||||
LoggingTransaction,
|
||||
make_tuple_comparison_clause,
|
||||
)
|
||||
from synapse.storage.databases.main.events import PersistEventsStore
|
||||
from synapse.storage.databases.main.events import (
|
||||
SLIDING_SYNC_RELEVANT_STATE_SET,
|
||||
PersistEventsStore,
|
||||
SlidingSyncMembershipInfo,
|
||||
SlidingSyncMembershipSnapshotSharedInsertValues,
|
||||
SlidingSyncStateInsertValues,
|
||||
)
|
||||
from synapse.storage.databases.main.state_deltas import StateDeltasStore
|
||||
from synapse.storage.databases.main.stream import StreamWorkerStore
|
||||
from synapse.storage.types import Cursor
|
||||
from synapse.types import JsonDict, StrCollection
|
||||
from synapse.types import JsonDict, RoomStreamToken, StateMap, StrCollection
|
||||
from synapse.types.handlers.sliding_sync import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES
|
||||
from synapse.types.state import StateFilter
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
@@ -78,6 +88,11 @@ class _BackgroundUpdates:
|
||||
|
||||
EVENTS_JUMP_TO_DATE_INDEX = "events_jump_to_date_index"
|
||||
|
||||
SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE = "sliding_sync_joined_rooms_bg_update"
|
||||
SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE = (
|
||||
"sliding_sync_membership_snapshots_bg_update"
|
||||
)
|
||||
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class _CalculateChainCover:
|
||||
@@ -97,7 +112,7 @@ class _CalculateChainCover:
|
||||
finished_room_map: Dict[str, Tuple[int, int]]
|
||||
|
||||
|
||||
class EventsBackgroundUpdatesStore(SQLBaseStore):
|
||||
class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseStore):
|
||||
def __init__(
|
||||
self,
|
||||
database: DatabasePool,
|
||||
@@ -279,6 +294,16 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
|
||||
where_clause="NOT outlier",
|
||||
)
|
||||
|
||||
# Add some background updates to populate the sliding sync tables
|
||||
self.db_pool.updates.register_background_update_handler(
|
||||
_BackgroundUpdates.SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE,
|
||||
self._sliding_sync_joined_rooms_bg_update,
|
||||
)
|
||||
self.db_pool.updates.register_background_update_handler(
|
||||
_BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE,
|
||||
self._sliding_sync_membership_snapshots_bg_update,
|
||||
)
|
||||
|
||||
async def _background_reindex_fields_sender(
|
||||
self, progress: JsonDict, batch_size: int
|
||||
) -> int:
|
||||
@@ -1073,7 +1098,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
|
||||
PersistEventsStore._add_chain_cover_index(
|
||||
txn,
|
||||
self.db_pool,
|
||||
self.event_chain_id_gen, # type: ignore[attr-defined]
|
||||
self.event_chain_id_gen,
|
||||
event_to_room_id,
|
||||
event_to_types,
|
||||
cast(Dict[str, StrCollection], event_to_auth_chain),
|
||||
@@ -1516,3 +1541,501 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
|
||||
)
|
||||
|
||||
return batch_size
|
||||
|
||||
async def _sliding_sync_joined_rooms_bg_update(
|
||||
self, progress: JsonDict, batch_size: int
|
||||
) -> int:
|
||||
"""
|
||||
Background update to populate the `sliding_sync_joined_rooms` table.
|
||||
"""
|
||||
last_room_id = progress.get("last_room_id", "")
|
||||
|
||||
def _get_rooms_to_update_txn(txn: LoggingTransaction) -> List[str]:
|
||||
# Fetch the set of room IDs that we want to update
|
||||
txn.execute(
|
||||
"""
|
||||
SELECT DISTINCT room_id FROM current_state_events
|
||||
WHERE room_id > ?
|
||||
ORDER BY room_id ASC
|
||||
LIMIT ?
|
||||
""",
|
||||
(last_room_id, batch_size),
|
||||
)
|
||||
|
||||
rooms_to_update_rows = cast(List[Tuple[str]], txn.fetchall())
|
||||
|
||||
return [row[0] for row in rooms_to_update_rows]
|
||||
|
||||
rooms_to_update = await self.db_pool.runInteraction(
|
||||
"_sliding_sync_joined_rooms_bg_update._get_rooms_to_update_txn",
|
||||
_get_rooms_to_update_txn,
|
||||
)
|
||||
|
||||
if not rooms_to_update:
|
||||
await self.db_pool.updates._end_background_update(
|
||||
_BackgroundUpdates.SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE
|
||||
)
|
||||
return 0
|
||||
|
||||
# Map from room_id to insert/update state values in the `sliding_sync_joined_rooms` table
|
||||
joined_room_updates: Dict[str, SlidingSyncStateInsertValues] = {}
|
||||
# Map from room_id to stream_ordering/bump_stamp/last_current_state_delta_stream_id values
|
||||
joined_room_stream_ordering_updates: Dict[
|
||||
str, Tuple[int, Optional[int], int]
|
||||
] = {}
|
||||
for room_id in rooms_to_update:
|
||||
current_state_ids_map, last_current_state_delta_stream_id = (
|
||||
await self.db_pool.runInteraction(
|
||||
"_sliding_sync_joined_rooms_bg_update._get_relevant_sliding_sync_current_state_event_ids_txn",
|
||||
PersistEventsStore._get_relevant_sliding_sync_current_state_event_ids_txn,
|
||||
room_id,
|
||||
)
|
||||
)
|
||||
# We're iterating over rooms pulled from the current_state_events table
|
||||
# so we should have some current state for each room
|
||||
assert current_state_ids_map
|
||||
|
||||
fetched_events = await self.get_events(current_state_ids_map.values())
|
||||
|
||||
current_state_map: StateMap[EventBase] = {
|
||||
state_key: fetched_events[event_id]
|
||||
for state_key, event_id in current_state_ids_map.items()
|
||||
}
|
||||
|
||||
state_insert_values = (
|
||||
PersistEventsStore._get_sliding_sync_insert_values_from_state_map(
|
||||
current_state_map
|
||||
)
|
||||
)
|
||||
# We should have some insert values for each room, even if they are `None`
|
||||
assert state_insert_values
|
||||
joined_room_updates[room_id] = state_insert_values
|
||||
|
||||
# Figure out the stream_ordering of the latest event in the room
|
||||
most_recent_event_pos_results = await self.get_last_event_pos_in_room(
|
||||
room_id, event_types=None
|
||||
)
|
||||
assert most_recent_event_pos_results is not None, (
|
||||
f"We should not be seeing `None` here because the room ({room_id}) should at-least have a create event "
|
||||
+ "given we pulled the room out of `current_state_events`"
|
||||
)
|
||||
most_recent_event_stream_ordering = most_recent_event_pos_results[1].stream
|
||||
assert most_recent_event_stream_ordering > 0, (
|
||||
"We should have at-least one event in the room (our own join membership event for example) "
|
||||
+ "that isn't backfilled (negative `stream_ordering`) if we are joined to the room."
|
||||
)
|
||||
# Figure out the latest bump_stamp in the room. This could be `None` for a
|
||||
# federated room you just joined where all of events are still `outliers` or
|
||||
# backfilled history. In the Sliding Sync API, we default to the user's
|
||||
# membership event `stream_ordering` if we don't have a `bump_stamp`.
|
||||
bump_stamp_event_pos_results = await self.get_last_event_pos_in_room(
|
||||
room_id, event_types=SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES
|
||||
)
|
||||
most_recent_bump_stamp = None
|
||||
if (
|
||||
bump_stamp_event_pos_results is not None
|
||||
and bump_stamp_event_pos_results[1].stream > 0
|
||||
):
|
||||
most_recent_bump_stamp = bump_stamp_event_pos_results[1].stream
|
||||
|
||||
joined_room_stream_ordering_updates[room_id] = (
|
||||
most_recent_event_stream_ordering,
|
||||
most_recent_bump_stamp,
|
||||
last_current_state_delta_stream_id,
|
||||
)
|
||||
|
||||
def _fill_table_txn(txn: LoggingTransaction) -> None:
|
||||
# Handle updating the `sliding_sync_joined_rooms` table
|
||||
#
|
||||
last_successful_room_id: Optional[str] = None
|
||||
for room_id, insert_map in joined_room_updates.items():
|
||||
(
|
||||
event_stream_ordering,
|
||||
bump_stamp,
|
||||
last_current_state_delta_stream_id,
|
||||
) = joined_room_stream_ordering_updates[room_id]
|
||||
|
||||
# Check if the current state has been updated since we gathered it
|
||||
state_deltas_since_we_gathered_current_state = (
|
||||
self.get_current_state_deltas_for_room_txn(
|
||||
txn,
|
||||
room_id,
|
||||
from_token=RoomStreamToken(
|
||||
stream=last_current_state_delta_stream_id
|
||||
),
|
||||
to_token=None,
|
||||
)
|
||||
)
|
||||
for state_delta in state_deltas_since_we_gathered_current_state:
|
||||
# We only need to check if the state is relevant to the
|
||||
# `sliding_sync_joined_rooms` table.
|
||||
if (
|
||||
state_delta.event_type,
|
||||
state_delta.state_key,
|
||||
) in SLIDING_SYNC_RELEVANT_STATE_SET:
|
||||
# Save our progress before we exit early
|
||||
if last_successful_room_id is not None:
|
||||
self.db_pool.updates._background_update_progress_txn(
|
||||
txn,
|
||||
_BackgroundUpdates.SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE,
|
||||
{"last_room_id": room_id},
|
||||
)
|
||||
# Raising exception so we can just exit and try again. It would
|
||||
# be hard to resolve this within the transaction because we need
|
||||
# to get full events out that take redactions into account. We
|
||||
# could add some retry logic here, but it's easier to just let
|
||||
# the background update try again.
|
||||
raise Exception(
|
||||
"Current state was updated after we gathered it to update "
|
||||
+ "`sliding_sync_joined_rooms` in the background update. "
|
||||
+ "Raising exception so we can just try again."
|
||||
)
|
||||
|
||||
# Since we partially update the `sliding_sync_joined_rooms` as new state
|
||||
# is sent, we need to update the state fields `ON CONFLICT`. We just
|
||||
# have to be careful we're not overwriting it with stale data (see
|
||||
# `last_current_state_delta_stream_id` check above).
|
||||
#
|
||||
self.db_pool.simple_upsert_txn(
|
||||
txn,
|
||||
table="sliding_sync_joined_rooms",
|
||||
keyvalues={"room_id": room_id},
|
||||
values=insert_map,
|
||||
insertion_values={
|
||||
# The reason we're only *inserting* (not *updating*) `event_stream_ordering`
|
||||
# and `bump_stamp` is because if they are present, that means they are already
|
||||
# up-to-date.
|
||||
"event_stream_ordering": event_stream_ordering,
|
||||
"bump_stamp": bump_stamp,
|
||||
},
|
||||
)
|
||||
|
||||
# Keep track of the last successful room_id
|
||||
last_successful_room_id = room_id
|
||||
|
||||
await self.db_pool.runInteraction(
|
||||
"sliding_sync_joined_rooms_bg_update", _fill_table_txn
|
||||
)
|
||||
|
||||
# Update the progress
|
||||
await self.db_pool.updates._background_update_progress(
|
||||
_BackgroundUpdates.SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE,
|
||||
{"last_room_id": rooms_to_update[-1]},
|
||||
)
|
||||
|
||||
return len(rooms_to_update)
|
||||
|
||||
async def _sliding_sync_membership_snapshots_bg_update(
|
||||
self, progress: JsonDict, batch_size: int
|
||||
) -> int:
|
||||
"""
|
||||
Background update to populate the `sliding_sync_membership_snapshots` table.
|
||||
"""
|
||||
last_event_stream_ordering = progress.get(
|
||||
"last_event_stream_ordering", -(1 << 31)
|
||||
)
|
||||
|
||||
def _find_memberships_to_update_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> List[Tuple[str, str, str, str, str, int, bool]]:
|
||||
# Fetch the set of event IDs that we want to update
|
||||
txn.execute(
|
||||
"""
|
||||
SELECT
|
||||
c.room_id,
|
||||
c.user_id,
|
||||
e.sender,
|
||||
c.event_id,
|
||||
c.membership,
|
||||
c.event_stream_ordering,
|
||||
e.outlier
|
||||
FROM local_current_membership as c
|
||||
INNER JOIN events AS e USING (event_id)
|
||||
WHERE event_stream_ordering > ?
|
||||
ORDER BY event_stream_ordering ASC
|
||||
LIMIT ?
|
||||
""",
|
||||
(last_event_stream_ordering, batch_size),
|
||||
)
|
||||
|
||||
memberships_to_update_rows = cast(
|
||||
List[Tuple[str, str, str, str, str, int, bool]], txn.fetchall()
|
||||
)
|
||||
|
||||
return memberships_to_update_rows
|
||||
|
||||
memberships_to_update_rows = await self.db_pool.runInteraction(
|
||||
"sliding_sync_membership_snapshots_bg_update._find_memberships_to_update_txn",
|
||||
_find_memberships_to_update_txn,
|
||||
)
|
||||
|
||||
if not memberships_to_update_rows:
|
||||
await self.db_pool.updates._end_background_update(
|
||||
_BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE
|
||||
)
|
||||
return 0
|
||||
|
||||
def _find_previous_membership_txn(
|
||||
txn: LoggingTransaction, room_id: str, user_id: str, stream_ordering: int
|
||||
) -> Tuple[str, str]:
|
||||
# Find the previous invite/knock event before the leave event
|
||||
txn.execute(
|
||||
"""
|
||||
SELECT event_id, membership
|
||||
FROM room_memberships
|
||||
WHERE
|
||||
room_id = ?
|
||||
AND user_id = ?
|
||||
AND event_stream_ordering < ?
|
||||
ORDER BY event_stream_ordering DESC
|
||||
LIMIT 1
|
||||
""",
|
||||
(
|
||||
room_id,
|
||||
user_id,
|
||||
stream_ordering,
|
||||
),
|
||||
)
|
||||
row = txn.fetchone()
|
||||
|
||||
# We should see a corresponding previous invite/knock event
|
||||
assert row is not None
|
||||
event_id, membership = row
|
||||
|
||||
return event_id, membership
|
||||
|
||||
# Map from (room_id, user_id) to ...
|
||||
to_insert_membership_snapshots: Dict[
|
||||
Tuple[str, str], SlidingSyncMembershipSnapshotSharedInsertValues
|
||||
] = {}
|
||||
to_insert_membership_infos: Dict[Tuple[str, str], SlidingSyncMembershipInfo] = (
|
||||
{}
|
||||
)
|
||||
for (
|
||||
room_id,
|
||||
user_id,
|
||||
sender,
|
||||
membership_event_id,
|
||||
membership,
|
||||
membership_event_stream_ordering,
|
||||
is_outlier,
|
||||
) in memberships_to_update_rows:
|
||||
# We don't know how to handle `membership` values other than these. The
|
||||
# code below would need to be updated.
|
||||
assert membership in (
|
||||
Membership.JOIN,
|
||||
Membership.INVITE,
|
||||
Membership.KNOCK,
|
||||
Membership.LEAVE,
|
||||
Membership.BAN,
|
||||
)
|
||||
|
||||
# Map of values to insert/update in the `sliding_sync_membership_snapshots` table
|
||||
sliding_sync_membership_snapshots_insert_map: (
|
||||
SlidingSyncMembershipSnapshotSharedInsertValues
|
||||
) = {}
|
||||
if membership == Membership.JOIN:
|
||||
# If we're still joined, we can pull from current state.
|
||||
current_state_ids_map: StateMap[
|
||||
str
|
||||
] = await self.hs.get_storage_controllers().state.get_current_state_ids(
|
||||
room_id,
|
||||
state_filter=StateFilter.from_types(
|
||||
SLIDING_SYNC_RELEVANT_STATE_SET
|
||||
),
|
||||
# Partially-stated rooms should have all state events except for
|
||||
# remote membership events so we don't need to wait at all because
|
||||
# we only want some non-membership state
|
||||
await_full_state=False,
|
||||
)
|
||||
# We're iterating over rooms that we are joined to so they should
|
||||
# have `current_state_events` and we should have some current state
|
||||
# for each room
|
||||
assert current_state_ids_map
|
||||
|
||||
fetched_events = await self.get_events(current_state_ids_map.values())
|
||||
|
||||
current_state_map: StateMap[EventBase] = {
|
||||
state_key: fetched_events[event_id]
|
||||
for state_key, event_id in current_state_ids_map.items()
|
||||
}
|
||||
|
||||
state_insert_values = (
|
||||
PersistEventsStore._get_sliding_sync_insert_values_from_state_map(
|
||||
current_state_map
|
||||
)
|
||||
)
|
||||
sliding_sync_membership_snapshots_insert_map.update(state_insert_values)
|
||||
# We should have some insert values for each room, even if they are `None`
|
||||
assert sliding_sync_membership_snapshots_insert_map
|
||||
|
||||
# We have current state to work from
|
||||
sliding_sync_membership_snapshots_insert_map["has_known_state"] = True
|
||||
elif membership in (Membership.INVITE, Membership.KNOCK) or (
|
||||
membership == Membership.LEAVE and is_outlier
|
||||
):
|
||||
invite_or_knock_event_id = membership_event_id
|
||||
invite_or_knock_membership = membership
|
||||
|
||||
# If the event is an `out_of_band_membership` (special case of
|
||||
# `outlier`), we never had historical state so we have to pull from
|
||||
# the stripped state on the previous invite/knock event. This gives
|
||||
# us a consistent view of the room state regardless of your
|
||||
# membership (i.e. the room shouldn't disappear if your using the
|
||||
# `is_encrypted` filter and you leave).
|
||||
if membership == Membership.LEAVE and is_outlier:
|
||||
invite_or_knock_event_id, invite_or_knock_membership = (
|
||||
await self.db_pool.runInteraction(
|
||||
"sliding_sync_membership_snapshots_bg_update._find_previous_membership",
|
||||
_find_previous_membership_txn,
|
||||
room_id,
|
||||
user_id,
|
||||
membership_event_stream_ordering,
|
||||
)
|
||||
)
|
||||
|
||||
# Pull from the stripped state on the invite/knock event
|
||||
invite_or_knock_event = await self.get_event(invite_or_knock_event_id)
|
||||
|
||||
raw_stripped_state_events = None
|
||||
if invite_or_knock_membership == Membership.INVITE:
|
||||
invite_room_state = invite_or_knock_event.unsigned.get(
|
||||
"invite_room_state"
|
||||
)
|
||||
raw_stripped_state_events = invite_room_state
|
||||
elif invite_or_knock_membership == Membership.KNOCK:
|
||||
knock_room_state = invite_or_knock_event.unsigned.get(
|
||||
"knock_room_state"
|
||||
)
|
||||
raw_stripped_state_events = knock_room_state
|
||||
|
||||
sliding_sync_membership_snapshots_insert_map = PersistEventsStore._get_sliding_sync_insert_values_from_stripped_state(
|
||||
raw_stripped_state_events
|
||||
)
|
||||
|
||||
# We should have some insert values for each room, even if no
|
||||
# stripped state is on the event because we still want to record
|
||||
# that we have no known state
|
||||
assert sliding_sync_membership_snapshots_insert_map
|
||||
elif membership in (Membership.LEAVE, Membership.BAN):
|
||||
# Pull from historical state
|
||||
state_ids_map = await self.hs.get_storage_controllers().state.get_state_ids_for_event(
|
||||
membership_event_id,
|
||||
state_filter=StateFilter.from_types(
|
||||
SLIDING_SYNC_RELEVANT_STATE_SET
|
||||
),
|
||||
# Partially-stated rooms should have all state events except for
|
||||
# remote membership events so we don't need to wait at all because
|
||||
# we only want some non-membership state
|
||||
await_full_state=False,
|
||||
)
|
||||
|
||||
fetched_events = await self.get_events(state_ids_map.values())
|
||||
|
||||
state_map: StateMap[EventBase] = {
|
||||
state_key: fetched_events[event_id]
|
||||
for state_key, event_id in state_ids_map.items()
|
||||
}
|
||||
|
||||
state_insert_values = (
|
||||
PersistEventsStore._get_sliding_sync_insert_values_from_state_map(
|
||||
state_map
|
||||
)
|
||||
)
|
||||
sliding_sync_membership_snapshots_insert_map.update(state_insert_values)
|
||||
# We should have some insert values for each room, even if they are `None`
|
||||
assert sliding_sync_membership_snapshots_insert_map
|
||||
|
||||
# We have historical state to work from
|
||||
sliding_sync_membership_snapshots_insert_map["has_known_state"] = True
|
||||
else:
|
||||
# We don't know how to handle this type of membership yet
|
||||
#
|
||||
# FIXME: We should use `assert_never` here but for some reason
|
||||
# the exhaustive matching doesn't recognize the `Never` here.
|
||||
# assert_never(membership)
|
||||
raise AssertionError(
|
||||
f"Unexpected membership {membership} ({membership_event_id}) that we don't know how to handle yet"
|
||||
)
|
||||
|
||||
to_insert_membership_snapshots[(room_id, user_id)] = (
|
||||
sliding_sync_membership_snapshots_insert_map
|
||||
)
|
||||
to_insert_membership_infos[(room_id, user_id)] = SlidingSyncMembershipInfo(
|
||||
user_id=user_id,
|
||||
sender=sender,
|
||||
membership_event_id=membership_event_id,
|
||||
membership=membership,
|
||||
membership_event_stream_ordering=membership_event_stream_ordering,
|
||||
)
|
||||
|
||||
def _fill_table_txn(txn: LoggingTransaction) -> None:
|
||||
# Handle updating the `sliding_sync_membership_snapshots` table
|
||||
#
|
||||
for key, insert_map in to_insert_membership_snapshots.items():
|
||||
room_id, user_id = key
|
||||
membership_info = to_insert_membership_infos[key]
|
||||
sender = membership_info.sender
|
||||
membership_event_id = membership_info.membership_event_id
|
||||
membership = membership_info.membership
|
||||
membership_event_stream_ordering = (
|
||||
membership_info.membership_event_stream_ordering
|
||||
)
|
||||
|
||||
# We don't need to upsert the state because we never partially
|
||||
# insert/update the snapshots and anything already there is up-to-date
|
||||
# EXCEPT for the `forgotten` field since that is updated out-of-band
|
||||
# from the membership changes.
|
||||
#
|
||||
# Even though we're only doing insertions, we're using
|
||||
# `simple_upsert_txn()` here to avoid unique violation errors that would
|
||||
# happen from `simple_insert_txn()`
|
||||
self.db_pool.simple_upsert_txn(
|
||||
txn,
|
||||
table="sliding_sync_membership_snapshots",
|
||||
keyvalues={"room_id": room_id, "user_id": user_id},
|
||||
values={},
|
||||
insertion_values={
|
||||
**insert_map,
|
||||
"sender": sender,
|
||||
"membership_event_id": membership_event_id,
|
||||
"membership": membership,
|
||||
"event_stream_ordering": membership_event_stream_ordering,
|
||||
},
|
||||
)
|
||||
# We need to find the `forgotten` value during the transaction because
|
||||
# we can't risk inserting stale data.
|
||||
txn.execute(
|
||||
"""
|
||||
UPDATE sliding_sync_membership_snapshots
|
||||
SET
|
||||
forgotten = (SELECT forgotten FROM room_memberships WHERE event_id = ?)
|
||||
WHERE room_id = ? and user_id = ?
|
||||
""",
|
||||
(
|
||||
membership_event_id,
|
||||
room_id,
|
||||
user_id,
|
||||
),
|
||||
)
|
||||
|
||||
await self.db_pool.runInteraction(
|
||||
"sliding_sync_membership_snapshots_bg_update", _fill_table_txn
|
||||
)
|
||||
|
||||
# Update the progress
|
||||
(
|
||||
_room_id,
|
||||
_user_id,
|
||||
_sender,
|
||||
_membership_event_id,
|
||||
_membership,
|
||||
membership_event_stream_ordering,
|
||||
_is_outlier,
|
||||
) = memberships_to_update_rows[-1]
|
||||
await self.db_pool.updates._background_update_progress(
|
||||
_BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE,
|
||||
{"last_event_stream_ordering": membership_event_stream_ordering},
|
||||
)
|
||||
|
||||
return len(memberships_to_update_rows)
|
||||
|
||||
@@ -511,6 +511,8 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
) -> Dict[str, EventBase]:
|
||||
"""Get events from the database
|
||||
|
||||
Unknown events will be omitted from the response.
|
||||
|
||||
Args:
|
||||
event_ids: The event_ids of the events to fetch
|
||||
|
||||
|
||||
@@ -454,6 +454,10 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
|
||||
# so must be deleted first.
|
||||
"local_current_membership",
|
||||
"room_memberships",
|
||||
# Note: the sliding_sync_ tables have foreign keys to the `events` table
|
||||
# so must be deleted first.
|
||||
"sliding_sync_joined_rooms",
|
||||
"sliding_sync_membership_snapshots",
|
||||
"events",
|
||||
"federation_inbound_events_staging",
|
||||
"receipts_graph",
|
||||
|
||||
@@ -1337,6 +1337,12 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
|
||||
keyvalues={"user_id": user_id, "room_id": room_id},
|
||||
updatevalues={"forgotten": 1},
|
||||
)
|
||||
self.db_pool.simple_update_txn(
|
||||
txn,
|
||||
table="sliding_sync_membership_snapshots",
|
||||
keyvalues={"user_id": user_id, "room_id": room_id},
|
||||
updatevalues={"forgotten": 1},
|
||||
)
|
||||
|
||||
self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id))
|
||||
self._invalidate_cache_and_stream(
|
||||
|
||||
@@ -161,45 +161,80 @@ class StateDeltasStore(SQLBaseStore):
|
||||
self._get_max_stream_id_in_current_state_deltas_txn,
|
||||
)
|
||||
|
||||
def get_current_state_deltas_for_room_txn(
|
||||
self,
|
||||
txn: LoggingTransaction,
|
||||
room_id: str,
|
||||
*,
|
||||
from_token: Optional[RoomStreamToken],
|
||||
to_token: Optional[RoomStreamToken],
|
||||
) -> List[StateDelta]:
|
||||
"""
|
||||
Get the state deltas between two tokens.
|
||||
|
||||
(> `from_token` and <= `to_token`)
|
||||
"""
|
||||
from_clause = ""
|
||||
from_args = []
|
||||
if from_token is not None:
|
||||
from_clause = "AND ? < stream_id"
|
||||
from_args = [from_token.stream]
|
||||
|
||||
to_clause = ""
|
||||
to_args = []
|
||||
if to_token is not None:
|
||||
to_clause = "AND stream_id <= ?"
|
||||
to_args = [to_token.get_max_stream_pos()]
|
||||
|
||||
sql = f"""
|
||||
SELECT instance_name, stream_id, type, state_key, event_id, prev_event_id
|
||||
FROM current_state_delta_stream
|
||||
WHERE room_id = ? {from_clause} {to_clause}
|
||||
ORDER BY stream_id ASC
|
||||
"""
|
||||
txn.execute(sql, [room_id] + from_args + to_args)
|
||||
|
||||
return [
|
||||
StateDelta(
|
||||
stream_id=row[1],
|
||||
room_id=room_id,
|
||||
event_type=row[2],
|
||||
state_key=row[3],
|
||||
event_id=row[4],
|
||||
prev_event_id=row[5],
|
||||
)
|
||||
for row in txn
|
||||
if _filter_results_by_stream(from_token, to_token, row[0], row[1])
|
||||
]
|
||||
|
||||
@trace
|
||||
async def get_current_state_deltas_for_room(
|
||||
self, room_id: str, from_token: RoomStreamToken, to_token: RoomStreamToken
|
||||
self,
|
||||
room_id: str,
|
||||
*,
|
||||
from_token: Optional[RoomStreamToken],
|
||||
to_token: Optional[RoomStreamToken],
|
||||
) -> List[StateDelta]:
|
||||
"""Get the state deltas between two tokens."""
|
||||
"""
|
||||
Get the state deltas between two tokens.
|
||||
|
||||
if not self._curr_state_delta_stream_cache.has_entity_changed(
|
||||
room_id, from_token.stream
|
||||
(> `from_token` and <= `to_token`)
|
||||
"""
|
||||
|
||||
if (
|
||||
from_token is not None
|
||||
and not self._curr_state_delta_stream_cache.has_entity_changed(
|
||||
room_id, from_token.stream
|
||||
)
|
||||
):
|
||||
return []
|
||||
|
||||
def get_current_state_deltas_for_room_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> List[StateDelta]:
|
||||
sql = """
|
||||
SELECT instance_name, stream_id, type, state_key, event_id, prev_event_id
|
||||
FROM current_state_delta_stream
|
||||
WHERE room_id = ? AND ? < stream_id AND stream_id <= ?
|
||||
ORDER BY stream_id ASC
|
||||
"""
|
||||
txn.execute(
|
||||
sql, (room_id, from_token.stream, to_token.get_max_stream_pos())
|
||||
)
|
||||
|
||||
return [
|
||||
StateDelta(
|
||||
stream_id=row[1],
|
||||
room_id=room_id,
|
||||
event_type=row[2],
|
||||
state_key=row[3],
|
||||
event_id=row[4],
|
||||
prev_event_id=row[5],
|
||||
)
|
||||
for row in txn
|
||||
if _filter_results_by_stream(from_token, to_token, row[0], row[1])
|
||||
]
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_current_state_deltas_for_room", get_current_state_deltas_for_room_txn
|
||||
"get_current_state_deltas_for_room",
|
||||
self.get_current_state_deltas_for_room_txn,
|
||||
room_id,
|
||||
from_token=from_token,
|
||||
to_token=to_token,
|
||||
)
|
||||
|
||||
@trace
|
||||
|
||||
@@ -1263,12 +1263,76 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
|
||||
return None
|
||||
|
||||
async def get_last_event_pos_in_room(
|
||||
self,
|
||||
room_id: str,
|
||||
event_types: Optional[StrCollection] = None,
|
||||
) -> Optional[Tuple[str, PersistedEventPosition]]:
|
||||
"""
|
||||
Returns the ID and event position of the last event in a room.
|
||||
|
||||
Based on `get_last_event_pos_in_room_before_stream_ordering(...)`
|
||||
|
||||
Args:
|
||||
room_id
|
||||
event_types: Optional allowlist of event types to filter by
|
||||
|
||||
Returns:
|
||||
The ID of the most recent event and it's position, or None if there are no
|
||||
events in the room that match the given event types.
|
||||
"""
|
||||
|
||||
def _get_last_event_pos_in_room_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> Optional[Tuple[str, PersistedEventPosition]]:
|
||||
event_type_clause = ""
|
||||
event_type_args: List[str] = []
|
||||
if event_types is not None and len(event_types) > 0:
|
||||
event_type_clause, event_type_args = make_in_list_sql_clause(
|
||||
txn.database_engine, "type", event_types
|
||||
)
|
||||
event_type_clause = f"AND {event_type_clause}"
|
||||
|
||||
sql = f"""
|
||||
SELECT event_id, stream_ordering, instance_name
|
||||
FROM events
|
||||
LEFT JOIN rejections USING (event_id)
|
||||
WHERE room_id = ?
|
||||
{event_type_clause}
|
||||
AND NOT outlier
|
||||
AND rejections.event_id IS NULL
|
||||
ORDER BY stream_ordering DESC
|
||||
LIMIT 1
|
||||
"""
|
||||
|
||||
txn.execute(
|
||||
sql,
|
||||
[room_id] + event_type_args,
|
||||
)
|
||||
|
||||
row = cast(Optional[Tuple[str, int, str]], txn.fetchone())
|
||||
if row is not None:
|
||||
event_id, stream_ordering, instance_name = row
|
||||
|
||||
return event_id, PersistedEventPosition(
|
||||
# If instance_name is null we default to "master"
|
||||
instance_name or "master",
|
||||
stream_ordering,
|
||||
)
|
||||
|
||||
return None
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_last_event_pos_in_room",
|
||||
_get_last_event_pos_in_room_txn,
|
||||
)
|
||||
|
||||
@trace
|
||||
async def get_last_event_pos_in_room_before_stream_ordering(
|
||||
self,
|
||||
room_id: str,
|
||||
end_token: RoomStreamToken,
|
||||
event_types: Optional[Collection[str]] = None,
|
||||
event_types: Optional[StrCollection] = None,
|
||||
) -> Optional[Tuple[str, PersistedEventPosition]]:
|
||||
"""
|
||||
Returns the ID and event position of the last event in a room at or before a
|
||||
|
||||
@@ -142,6 +142,10 @@ Changes in SCHEMA_VERSION = 85
|
||||
|
||||
Changes in SCHEMA_VERSION = 86
|
||||
- Add a column `authenticated` to the tables `local_media_repository` and `remote_media_cache`
|
||||
|
||||
Changes in SCHEMA_VERSION = 87
|
||||
- Add tables to store Sliding Sync data for quick filtering/sorting
|
||||
(`sliding_sync_joined_rooms`, `sliding_sync_membership_snapshots`)
|
||||
"""
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,134 @@
|
||||
--
|
||||
-- This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
--
|
||||
-- Copyright (C) 2024 New Vector, Ltd
|
||||
--
|
||||
-- This program is free software: you can redistribute it and/or modify
|
||||
-- it under the terms of the GNU Affero General Public License as
|
||||
-- published by the Free Software Foundation, either version 3 of the
|
||||
-- License, or (at your option) any later version.
|
||||
--
|
||||
-- See the GNU Affero General Public License for more details:
|
||||
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
|
||||
-- A table for storing room meta data (current state relevant to sliding sync) that the
|
||||
-- local server is still participating in (someone local is joined to the room).
|
||||
--
|
||||
-- We store the joined rooms in separate table from `sliding_sync_membership_snapshots`
|
||||
-- because we need up-to-date information for joined rooms and it can be shared across
|
||||
-- everyone who is joined.
|
||||
--
|
||||
-- This table is kept in sync with `current_state_events` which means if the server is
|
||||
-- no longer participating in a room, the row will be deleted.
|
||||
CREATE TABLE IF NOT EXISTS sliding_sync_joined_rooms(
|
||||
room_id TEXT NOT NULL REFERENCES rooms(room_id),
|
||||
-- The `stream_ordering` of the most-recent/latest event in the room
|
||||
event_stream_ordering BIGINT NOT NULL REFERENCES events(stream_ordering),
|
||||
-- The `stream_ordering` of the last event according to the `bump_event_types`
|
||||
bump_stamp BIGINT,
|
||||
-- `m.room.create` -> `content.type` (current state)
|
||||
--
|
||||
-- Useful for the `spaces`/`not_spaces` filter in the Sliding Sync API
|
||||
room_type TEXT,
|
||||
-- `m.room.name` -> `content.name` (current state)
|
||||
--
|
||||
-- Useful for the room meta data and `room_name_like` filter in the Sliding Sync API
|
||||
room_name TEXT,
|
||||
-- `m.room.encryption` -> `content.algorithm` (current state)
|
||||
--
|
||||
-- Useful for the `is_encrypted` filter in the Sliding Sync API
|
||||
is_encrypted BOOLEAN DEFAULT FALSE NOT NULL,
|
||||
-- `m.room.tombstone` -> `content.replacement_room` (according to the current state at the
|
||||
-- time of the membership).
|
||||
--
|
||||
-- Useful for the `include_old_rooms` functionality in the Sliding Sync API
|
||||
tombstone_successor_room_id TEXT,
|
||||
PRIMARY KEY (room_id)
|
||||
);
|
||||
|
||||
-- So we can purge rooms easily.
|
||||
--
|
||||
-- The primary key is already `room_id`
|
||||
|
||||
-- So we can sort by `stream_ordering
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS sliding_sync_joined_rooms_event_stream_ordering ON sliding_sync_joined_rooms(event_stream_ordering);
|
||||
|
||||
-- A table for storing a snapshot of room meta data (historical current state relevant
|
||||
-- for sliding sync) at the time of a local user's membership. Only has rows for the
|
||||
-- latest membership event for a given local user in a room which matches
|
||||
-- `local_current_membership` .
|
||||
--
|
||||
-- We store all memberships including joins. This makes it easy to reference this table
|
||||
-- to find all membership for a given user and shares the same semantics as
|
||||
-- `local_current_membership`. And we get to avoid some table maintenance; if we only
|
||||
-- stored non-joins, we would have to delete the row for the user when the user joins
|
||||
-- the room. Stripped state doesn't include the `m.room.tombstone` event, so we just
|
||||
-- assume that the room doesn't have a tombstone.
|
||||
--
|
||||
-- For remote invite/knocks where the server is not participating in the room, we will
|
||||
-- use stripped state events to populate this table. We assume that if any stripped
|
||||
-- state is given, it will include all possible stripped state events types. For
|
||||
-- example, if stripped state is given but `m.room.encryption` isn't included, we will
|
||||
-- assume that the room is not encrypted.
|
||||
--
|
||||
-- We don't include `bump_stamp` here because we can just use the `stream_ordering` from
|
||||
-- the membership event itself as the `bump_stamp`.
|
||||
CREATE TABLE IF NOT EXISTS sliding_sync_membership_snapshots(
|
||||
room_id TEXT NOT NULL REFERENCES rooms(room_id),
|
||||
user_id TEXT NOT NULL,
|
||||
-- Useful to be able to tell leaves from kicks (where the `user_id` is different from the `sender`)
|
||||
sender TEXT NOT NULL,
|
||||
membership_event_id TEXT NOT NULL REFERENCES events(event_id),
|
||||
membership TEXT NOT NULL,
|
||||
-- This is an integer just to match `room_memberships` and also means we don't need
|
||||
-- to do any casting.
|
||||
forgotten INTEGER DEFAULT 0 NOT NULL,
|
||||
-- `stream_ordering` of the `membership_event_id`
|
||||
event_stream_ordering BIGINT NOT NULL REFERENCES events(stream_ordering),
|
||||
-- For remote invites/knocks that don't include any stripped state, we want to be
|
||||
-- able to distinguish between a room with `None` as valid value for some state and
|
||||
-- room where the state is completely unknown. Basically, this should be True unless
|
||||
-- no stripped state was provided for a remote invite/knock (False).
|
||||
has_known_state BOOLEAN DEFAULT FALSE NOT NULL,
|
||||
-- `m.room.create` -> `content.type` (according to the current state at the time of
|
||||
-- the membership).
|
||||
--
|
||||
-- Useful for the `spaces`/`not_spaces` filter in the Sliding Sync API
|
||||
room_type TEXT,
|
||||
-- `m.room.name` -> `content.name` (according to the current state at the time of
|
||||
-- the membership).
|
||||
--
|
||||
-- Useful for the room meta data and `room_name_like` filter in the Sliding Sync API
|
||||
room_name TEXT,
|
||||
-- `m.room.encryption` -> `content.algorithm` (according to the current state at the
|
||||
-- time of the membership).
|
||||
--
|
||||
-- Useful for the `is_encrypted` filter in the Sliding Sync API
|
||||
is_encrypted BOOLEAN DEFAULT FALSE NOT NULL,
|
||||
-- `m.room.tombstone` -> `content.replacement_room` (according to the current state at the
|
||||
-- time of the membership).
|
||||
--
|
||||
-- Useful for the `include_old_rooms` functionality in the Sliding Sync API
|
||||
tombstone_successor_room_id TEXT,
|
||||
PRIMARY KEY (room_id, user_id)
|
||||
);
|
||||
|
||||
-- So we can purge rooms easily.
|
||||
--
|
||||
-- Since we're using a multi-column index as the primary key (room_id, user_id), the
|
||||
-- first index column (room_id) is always usable for searching so we don't need to
|
||||
-- create a separate index for it.
|
||||
--
|
||||
-- CREATE INDEX IF NOT EXISTS sliding_sync_membership_snapshots_room_id ON sliding_sync_membership_snapshots(room_id);
|
||||
|
||||
-- So we can fetch all rooms for a given user
|
||||
CREATE INDEX IF NOT EXISTS sliding_sync_membership_snapshots_user_id ON sliding_sync_membership_snapshots(user_id);
|
||||
-- So we can sort by `stream_ordering
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS sliding_sync_membership_snapshots_event_stream_ordering ON sliding_sync_membership_snapshots(event_stream_ordering);
|
||||
|
||||
|
||||
-- Add some background updates to populate the new tables
|
||||
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
|
||||
(8701, 'sliding_sync_joined_rooms_bg_update', '{}');
|
||||
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
|
||||
(8701, 'sliding_sync_membership_snapshots_bg_update', '{}');
|
||||
@@ -61,6 +61,18 @@ if TYPE_CHECKING:
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Sliding Sync: The event types that clients should consider as new activity and affect
|
||||
# the `bump_stamp`
|
||||
SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES = {
|
||||
EventTypes.Create,
|
||||
EventTypes.Message,
|
||||
EventTypes.Encrypted,
|
||||
EventTypes.Sticker,
|
||||
EventTypes.CallInvite,
|
||||
EventTypes.PollStart,
|
||||
EventTypes.LiveLocationShareStart,
|
||||
}
|
||||
|
||||
|
||||
class SlidingSyncConfig(SlidingSyncBody):
|
||||
"""
|
||||
|
||||
@@ -16,7 +16,7 @@ import logging
|
||||
from twisted.test.proto_helpers import MemoryReactor
|
||||
|
||||
import synapse.rest.admin
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.api.constants import EventContentFields, EventTypes, Membership
|
||||
from synapse.api.room_versions import RoomVersions
|
||||
from synapse.rest.client import login, room, sync
|
||||
from synapse.server import HomeServer
|
||||
@@ -44,6 +44,10 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
|
||||
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
|
||||
self.store = hs.get_datastores().main
|
||||
self.storage_controllers = hs.get_storage_controllers()
|
||||
self.state_handler = self.hs.get_state_handler()
|
||||
persistence = self.hs.get_storage_controllers().persistence
|
||||
assert persistence is not None
|
||||
self.persistence = persistence
|
||||
|
||||
def test_rooms_meta_when_joined(self) -> None:
|
||||
"""
|
||||
@@ -600,16 +604,16 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
|
||||
Test that `bump_stamp` ignores backfilled events, i.e. events with a
|
||||
negative stream ordering.
|
||||
"""
|
||||
|
||||
user1_id = self.register_user("user1", "pass")
|
||||
user1_tok = self.login(user1_id, "pass")
|
||||
|
||||
# Create a remote room
|
||||
creator = "@user:other"
|
||||
room_id = "!foo:other"
|
||||
room_version = RoomVersions.V10
|
||||
shared_kwargs = {
|
||||
"room_id": room_id,
|
||||
"room_version": "10",
|
||||
"room_version": room_version.identifier,
|
||||
}
|
||||
|
||||
create_tuple = self.get_success(
|
||||
@@ -618,6 +622,12 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
|
||||
prev_event_ids=[],
|
||||
type=EventTypes.Create,
|
||||
state_key="",
|
||||
content={
|
||||
# The `ROOM_CREATOR` field could be removed if we used a room
|
||||
# version > 10 (in favor of relying on `sender`)
|
||||
EventContentFields.ROOM_CREATOR: creator,
|
||||
EventContentFields.ROOM_VERSION: room_version.identifier,
|
||||
},
|
||||
sender=creator,
|
||||
**shared_kwargs,
|
||||
)
|
||||
@@ -667,22 +677,29 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
|
||||
]
|
||||
|
||||
# Ensure the local HS knows the room version
|
||||
self.get_success(
|
||||
self.store.store_room(room_id, creator, False, RoomVersions.V10)
|
||||
)
|
||||
self.get_success(self.store.store_room(room_id, creator, False, room_version))
|
||||
|
||||
# Persist these events as backfilled events.
|
||||
persistence = self.hs.get_storage_controllers().persistence
|
||||
assert persistence is not None
|
||||
|
||||
for event, context in remote_events_and_contexts:
|
||||
self.get_success(persistence.persist_event(event, context, backfilled=True))
|
||||
self.get_success(
|
||||
self.persistence.persist_event(event, context, backfilled=True)
|
||||
)
|
||||
|
||||
# Now we join the local user to the room
|
||||
join_tuple = self.get_success(
|
||||
# Now we join the local user to the room. We want to make this feel as close to
|
||||
# the real `process_remote_join()` as possible but we'd like to avoid some of
|
||||
# the auth checks that would be done in the real code.
|
||||
#
|
||||
# FIXME: The test was originally written using this less-real
|
||||
# `persist_event(...)` shortcut but it would be nice to use the real remote join
|
||||
# process in a `FederatingHomeserverTestCase`.
|
||||
flawed_join_tuple = self.get_success(
|
||||
create_event(
|
||||
self.hs,
|
||||
prev_event_ids=[invite_tuple[0].event_id],
|
||||
# This doesn't work correctly to create an `EventContext` that includes
|
||||
# both of these state events. I assume it's because we're working on our
|
||||
# local homeserver which has the remote state set as `outlier`. We have
|
||||
# to create our own EventContext below to get this right.
|
||||
auth_event_ids=[create_tuple[0].event_id, invite_tuple[0].event_id],
|
||||
type=EventTypes.Member,
|
||||
state_key=user1_id,
|
||||
@@ -691,7 +708,22 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
|
||||
**shared_kwargs,
|
||||
)
|
||||
)
|
||||
self.get_success(persistence.persist_event(*join_tuple))
|
||||
# We have to create our own context to get the state set correctly. If we use
|
||||
# the `EventContext` from the `flawed_join_tuple`, the `current_state_events`
|
||||
# table will only have the join event in it which should never happen in our
|
||||
# real server.
|
||||
join_event = flawed_join_tuple[0]
|
||||
join_context = self.get_success(
|
||||
self.state_handler.compute_event_context(
|
||||
join_event,
|
||||
state_ids_before_event={
|
||||
(e.type, e.state_key): e.event_id
|
||||
for e in [create_tuple[0], invite_tuple[0]]
|
||||
},
|
||||
partial_state=False,
|
||||
)
|
||||
)
|
||||
self.get_success(self.persistence.persist_event(join_event, join_context))
|
||||
|
||||
# Doing an SS request should return a positive `bump_stamp`, even though
|
||||
# the only event that matches the bump types has as negative stream
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
#
|
||||
#
|
||||
|
||||
import logging
|
||||
from typing import List, Optional
|
||||
|
||||
from twisted.test.proto_helpers import MemoryReactor
|
||||
@@ -35,6 +36,8 @@ from synapse.util import Clock
|
||||
|
||||
from tests.unittest import HomeserverTestCase
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ExtremPruneTestCase(HomeserverTestCase):
|
||||
servlets = [
|
||||
|
||||
@@ -24,7 +24,7 @@ from typing import List, Optional, Tuple, cast
|
||||
|
||||
from twisted.test.proto_helpers import MemoryReactor
|
||||
|
||||
from synapse.api.constants import EventTypes, JoinRules, Membership
|
||||
from synapse.api.constants import EventContentFields, EventTypes, JoinRules, Membership
|
||||
from synapse.api.room_versions import RoomVersions
|
||||
from synapse.rest import admin
|
||||
from synapse.rest.admin import register_servlets_for_client_rest_resource
|
||||
@@ -38,6 +38,7 @@ from synapse.util import Clock
|
||||
from tests import unittest
|
||||
from tests.server import TestHomeServer
|
||||
from tests.test_utils import event_injection
|
||||
from tests.test_utils.event_injection import create_event
|
||||
from tests.unittest import skip_unless
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -54,6 +55,10 @@ class RoomMemberStoreTestCase(unittest.HomeserverTestCase):
|
||||
# We can't test the RoomMemberStore on its own without the other event
|
||||
# storage logic
|
||||
self.store = hs.get_datastores().main
|
||||
self.state_handler = self.hs.get_state_handler()
|
||||
persistence = self.hs.get_storage_controllers().persistence
|
||||
assert persistence is not None
|
||||
self.persistence = persistence
|
||||
|
||||
self.u_alice = self.register_user("alice", "pass")
|
||||
self.t_alice = self.login("alice", "pass")
|
||||
@@ -220,31 +225,166 @@ class RoomMemberStoreTestCase(unittest.HomeserverTestCase):
|
||||
)
|
||||
|
||||
def test_join_locally_forgotten_room(self) -> None:
|
||||
"""Tests if a user joins a forgotten room the room is not forgotten anymore."""
|
||||
self.room = self.helper.create_room_as(self.u_alice, tok=self.t_alice)
|
||||
self.assertFalse(
|
||||
self.get_success(self.store.is_locally_forgotten_room(self.room))
|
||||
)
|
||||
"""
|
||||
Tests if a user joins a forgotten room, the room is not forgotten anymore.
|
||||
|
||||
# after leaving and forget the room, it is forgotten
|
||||
self.get_success(
|
||||
event_injection.inject_member_event(
|
||||
self.hs, self.room, self.u_alice, "leave"
|
||||
Since a room can't be re-joined if everyone has left. This can only happen with
|
||||
a room with remote users in it.
|
||||
"""
|
||||
user1_id = self.register_user("user1", "pass")
|
||||
user1_tok = self.login(user1_id, "pass")
|
||||
|
||||
# Create a remote room
|
||||
creator = "@user:other"
|
||||
room_id = "!foo:other"
|
||||
room_version = RoomVersions.V10
|
||||
shared_kwargs = {
|
||||
"room_id": room_id,
|
||||
"room_version": room_version.identifier,
|
||||
}
|
||||
|
||||
create_tuple = self.get_success(
|
||||
create_event(
|
||||
self.hs,
|
||||
prev_event_ids=[],
|
||||
type=EventTypes.Create,
|
||||
state_key="",
|
||||
content={
|
||||
# The `ROOM_CREATOR` field could be removed if we used a room
|
||||
# version > 10 (in favor of relying on `sender`)
|
||||
EventContentFields.ROOM_CREATOR: creator,
|
||||
EventContentFields.ROOM_VERSION: room_version.identifier,
|
||||
},
|
||||
sender=creator,
|
||||
**shared_kwargs,
|
||||
)
|
||||
)
|
||||
self.get_success(self.store.forget(self.u_alice, self.room))
|
||||
self.assertTrue(
|
||||
self.get_success(self.store.is_locally_forgotten_room(self.room))
|
||||
)
|
||||
|
||||
# after rejoin the room is not forgotten anymore
|
||||
self.get_success(
|
||||
event_injection.inject_member_event(
|
||||
self.hs, self.room, self.u_alice, "join"
|
||||
creator_tuple = self.get_success(
|
||||
create_event(
|
||||
self.hs,
|
||||
prev_event_ids=[create_tuple[0].event_id],
|
||||
auth_event_ids=[create_tuple[0].event_id],
|
||||
type=EventTypes.Member,
|
||||
state_key=creator,
|
||||
content={"membership": Membership.JOIN},
|
||||
sender=creator,
|
||||
**shared_kwargs,
|
||||
)
|
||||
)
|
||||
|
||||
remote_events_and_contexts = [
|
||||
create_tuple,
|
||||
creator_tuple,
|
||||
]
|
||||
|
||||
# Ensure the local HS knows the room version
|
||||
self.get_success(self.store.store_room(room_id, creator, False, room_version))
|
||||
|
||||
# Persist these events as backfilled events.
|
||||
for event, context in remote_events_and_contexts:
|
||||
self.get_success(
|
||||
self.persistence.persist_event(event, context, backfilled=True)
|
||||
)
|
||||
|
||||
# Now we join the local user to the room. We want to make this feel as close to
|
||||
# the real `process_remote_join()` as possible but we'd like to avoid some of
|
||||
# the auth checks that would be done in the real code.
|
||||
#
|
||||
# FIXME: The test was originally written using this less-real
|
||||
# `persist_event(...)` shortcut but it would be nice to use the real remote join
|
||||
# process in a `FederatingHomeserverTestCase`.
|
||||
flawed_join_tuple = self.get_success(
|
||||
create_event(
|
||||
self.hs,
|
||||
prev_event_ids=[creator_tuple[0].event_id],
|
||||
# This doesn't work correctly to create an `EventContext` that includes
|
||||
# both of these state events. I assume it's because we're working on our
|
||||
# local homeserver which has the remote state set as `outlier`. We have
|
||||
# to create our own EventContext below to get this right.
|
||||
auth_event_ids=[create_tuple[0].event_id],
|
||||
type=EventTypes.Member,
|
||||
state_key=user1_id,
|
||||
content={"membership": Membership.JOIN},
|
||||
sender=user1_id,
|
||||
**shared_kwargs,
|
||||
)
|
||||
)
|
||||
# We have to create our own context to get the state set correctly. If we use
|
||||
# the `EventContext` from the `flawed_join_tuple`, the `current_state_events`
|
||||
# table will only have the join event in it which should never happen in our
|
||||
# real server.
|
||||
join_event = flawed_join_tuple[0]
|
||||
join_context = self.get_success(
|
||||
self.state_handler.compute_event_context(
|
||||
join_event,
|
||||
state_ids_before_event={
|
||||
(e.type, e.state_key): e.event_id for e in [create_tuple[0]]
|
||||
},
|
||||
partial_state=False,
|
||||
)
|
||||
)
|
||||
self.get_success(self.persistence.persist_event(join_event, join_context))
|
||||
|
||||
# The room shouldn't be forgotten because the local user just joined
|
||||
self.assertFalse(
|
||||
self.get_success(self.store.is_locally_forgotten_room(self.room))
|
||||
self.get_success(self.store.is_locally_forgotten_room(room_id))
|
||||
)
|
||||
|
||||
# After all of the local users (there is only user1) leave and forgetting the
|
||||
# room, it is forgotten
|
||||
user1_leave_response = self.helper.leave(room_id, user1_id, tok=user1_tok)
|
||||
user1_leave_event = self.get_success(
|
||||
self.store.get_event(user1_leave_response["event_id"])
|
||||
)
|
||||
self.get_success(self.store.forget(user1_id, room_id))
|
||||
self.assertTrue(self.get_success(self.store.is_locally_forgotten_room(room_id)))
|
||||
|
||||
# Join the local user to the room (again). We want to make this feel as close to
|
||||
# the real `process_remote_join()` as possible but we'd like to avoid some of
|
||||
# the auth checks that would be done in the real code.
|
||||
#
|
||||
# FIXME: The test was originally written using this less-real
|
||||
# `event_injection.inject_member_event(...)` shortcut but it would be nice to
|
||||
# use the real remote join process in a `FederatingHomeserverTestCase`.
|
||||
flawed_join_tuple = self.get_success(
|
||||
create_event(
|
||||
self.hs,
|
||||
prev_event_ids=[user1_leave_response["event_id"]],
|
||||
# This doesn't work correctly to create an `EventContext` that includes
|
||||
# both of these state events. I assume it's because we're working on our
|
||||
# local homeserver which has the remote state set as `outlier`. We have
|
||||
# to create our own EventContext below to get this right.
|
||||
auth_event_ids=[
|
||||
create_tuple[0].event_id,
|
||||
user1_leave_response["event_id"],
|
||||
],
|
||||
type=EventTypes.Member,
|
||||
state_key=user1_id,
|
||||
content={"membership": Membership.JOIN},
|
||||
sender=user1_id,
|
||||
**shared_kwargs,
|
||||
)
|
||||
)
|
||||
# We have to create our own context to get the state set correctly. If we use
|
||||
# the `EventContext` from the `flawed_join_tuple`, the `current_state_events`
|
||||
# table will only have the join event in it which should never happen in our
|
||||
# real server.
|
||||
join_event = flawed_join_tuple[0]
|
||||
join_context = self.get_success(
|
||||
self.state_handler.compute_event_context(
|
||||
join_event,
|
||||
state_ids_before_event={
|
||||
(e.type, e.state_key): e.event_id
|
||||
for e in [create_tuple[0], user1_leave_event]
|
||||
},
|
||||
partial_state=False,
|
||||
)
|
||||
)
|
||||
self.get_success(self.persistence.persist_event(join_event, join_context))
|
||||
|
||||
# After the local user rejoins the remote room, it isn't forgotten anymore
|
||||
self.assertFalse(
|
||||
self.get_success(self.store.is_locally_forgotten_room(room_id))
|
||||
)
|
||||
|
||||
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
+2
-2
@@ -272,8 +272,8 @@ class TestCase(unittest.TestCase):
|
||||
|
||||
def assertIncludes(
|
||||
self,
|
||||
actual_items: AbstractSet[str],
|
||||
expected_items: AbstractSet[str],
|
||||
actual_items: AbstractSet[TV],
|
||||
expected_items: AbstractSet[TV],
|
||||
exact: bool = False,
|
||||
message: Optional[str] = None,
|
||||
) -> None:
|
||||
|
||||
Reference in New Issue
Block a user