1
0

Compare commits

...

3 Commits

Author SHA1 Message Date
Erik Johnston
4cf2966797 WIP don't pull out left rooms 2024-09-10 15:38:44 +01:00
Erik Johnston
d99dd4c1e4 Fast-path for have_completed_background_updates 2024-09-10 14:07:50 +01:00
Erik Johnston
864cbcd86f Short-circuit have_finished_sliding_sync_background_jobs
We only need to check this if returned bump stamp is `None`, which is
rare.
2024-09-10 14:04:03 +01:00
8 changed files with 204 additions and 80 deletions

View File

@@ -1173,8 +1173,8 @@ class SlidingSyncHandler:
# `SCHEMA_COMPAT_VERSION` and run the foreground update for # `SCHEMA_COMPAT_VERSION` and run the foreground update for
# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` # `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots`
# (tracked by https://github.com/element-hq/synapse/issues/17623) # (tracked by https://github.com/element-hq/synapse/issues/17623)
await self.store.have_finished_sliding_sync_background_jobs() latest_room_bump_stamp is None
and latest_room_bump_stamp is None and await self.store.have_finished_sliding_sync_background_jobs()
): ):
return None return None

View File

@@ -54,7 +54,6 @@ from synapse.storage.roommember import (
) )
from synapse.types import ( from synapse.types import (
MutableStateMap, MutableStateMap,
PersistedEventPosition,
RoomStreamToken, RoomStreamToken,
StateMap, StateMap,
StrCollection, StrCollection,
@@ -219,6 +218,8 @@ class SlidingSyncRoomLists:
# include rooms that are outside the list ranges. # include rooms that are outside the list ranges.
all_rooms: Set[str] = set() all_rooms: Set[str] = set()
# Note: this won't include rooms the user have left themselves. We add
# back in rooms that the user left since `from_token` below.
room_membership_for_user_map = await self.store.get_sliding_sync_rooms_for_user( room_membership_for_user_map = await self.store.get_sliding_sync_rooms_for_user(
user_id user_id
) )
@@ -234,44 +235,26 @@ class SlidingSyncRoomLists:
room_membership_for_user_map.pop(room_id) room_membership_for_user_map.pop(room_id)
continue continue
existing_room = room_membership_for_user_map.get(room_id) current_room_entry = await self.store.get_sliding_sync_room_for_user(
if existing_room is not None: user_id, room_id
# Update room membership events to the point in time of the `to_token` )
room_membership_for_user_map[room_id] = RoomsForUserSlidingSync( if current_room_entry is None:
room_id=room_id, # We should always have an entry, even if we get state reset
sender=change.sender, # out of the room.
membership=change.membership, logger.error("Can't find room for user: %s / %s", user_id, room_id)
event_id=change.event_id, continue
event_pos=change.event_pos,
room_version_id=change.room_version_id,
# We keep the current state of the room though
room_type=existing_room.room_type,
is_encrypted=existing_room.is_encrypted,
)
else:
# This can happen if we get "state reset" out of the room
# after the `to_token`. In other words, there is no membership
# for the room after the `to_token` but we see membership in
# the token range.
# Get the state at the time. Note that room type never changes, room_membership_for_user_map[room_id] = RoomsForUserSlidingSync(
# so we can just get current room type room_id=room_id,
room_type = await self.store.get_room_type(room_id) sender=change.sender,
is_encrypted = await self.get_is_encrypted_for_room_at_token( membership=change.membership,
room_id, to_token.room_key event_id=change.event_id,
) event_pos=change.event_pos,
room_version_id=change.room_version_id,
# Add back rooms that the user was state-reset out of after `to_token` # We keep the current state of the room though
room_membership_for_user_map[room_id] = RoomsForUserSlidingSync( room_type=current_room_entry.room_type,
room_id=room_id, is_encrypted=current_room_entry.is_encrypted,
sender=change.sender, )
membership=change.membership,
event_id=change.event_id,
event_pos=change.event_pos,
room_version_id=change.room_version_id,
room_type=room_type,
is_encrypted=is_encrypted,
)
( (
newly_joined_room_ids, newly_joined_room_ids,
@@ -281,31 +264,33 @@ class SlidingSyncRoomLists:
) )
dm_room_ids = await self._get_dm_rooms_for_user(user_id) dm_room_ids = await self._get_dm_rooms_for_user(user_id)
# Handle state resets in the from -> to token range. # Handle leaves and state resets in the from -> to token range.
state_reset_rooms = ( left_rooms = newly_left_room_map.keys() - room_membership_for_user_map.keys()
newly_left_room_map.keys() - room_membership_for_user_map.keys() if left_rooms:
)
if state_reset_rooms:
room_membership_for_user_map = dict(room_membership_for_user_map) room_membership_for_user_map = dict(room_membership_for_user_map)
for room_id in ( for room_id in left_rooms:
newly_left_room_map.keys() - room_membership_for_user_map.keys() room_for_user = newly_left_room_map[room_id]
):
# Get the state at the time. Note that room type never changes, # We fetch the current room entry for the user, as that's the
# so we can just get current room type # easiest way of getting the room type etc.
room_type = await self.store.get_room_type(room_id) current_room_entry = await self.store.get_sliding_sync_room_for_user(
is_encrypted = await self.get_is_encrypted_for_room_at_token( user_id, room_id
room_id, newly_left_room_map[room_id].to_room_stream_token()
) )
if current_room_entry is None:
# We should always have an entry, even if we get state reset
# out of the room.
logger.error("Can't find room for user: %s / %s", user_id, room_id)
continue
room_membership_for_user_map[room_id] = RoomsForUserSlidingSync( room_membership_for_user_map[room_id] = RoomsForUserSlidingSync(
room_id=room_id, room_id=room_id,
sender=None, sender=room_for_user.sender,
membership=Membership.LEAVE, membership=room_for_user.membership,
event_id=None, event_id=room_for_user.event_id,
event_pos=newly_left_room_map[room_id], event_pos=room_for_user.event_pos,
room_version_id=await self.store.get_room_version_id(room_id), room_version_id=room_for_user.room_version_id,
room_type=room_type, room_type=current_room_entry.room_type,
is_encrypted=is_encrypted, is_encrypted=current_room_entry.is_encrypted,
) )
if sync_config.lists: if sync_config.lists:
@@ -417,8 +402,19 @@ class SlidingSyncRoomLists:
room_subscription, room_subscription,
) in sync_config.room_subscriptions.items(): ) in sync_config.room_subscriptions.items():
if room_id not in room_membership_for_user_map: if room_id not in room_membership_for_user_map:
# TODO: Handle rooms the user isn't in. # Check if we do have an entry for the room, but didn't
continue # pull it out above. This could be e.g. a leave that we
# don't pull out by default.
current_room_entry = (
await self.store.get_sliding_sync_room_for_user(
user_id, room_id
)
)
if not current_room_entry:
# TODO: Handle rooms the user isn't in.
continue
room_membership_for_user_map[room_id] = current_room_entry
all_rooms.add(room_id) all_rooms.add(room_id)
@@ -944,18 +940,11 @@ class SlidingSyncRoomLists:
# Ensure we have entries for rooms that the user has been "state reset" # Ensure we have entries for rooms that the user has been "state reset"
# out of. These are rooms appear in the `newly_left_rooms` map but # out of. These are rooms appear in the `newly_left_rooms` map but
# aren't in the `rooms_for_user` map. # aren't in the `rooms_for_user` map.
for room_id, left_event_pos in newly_left_room_ids.items(): for room_id, room_membership in newly_left_room_ids.items():
if room_id in rooms_for_user: if room_id in rooms_for_user:
continue continue
rooms_for_user[room_id] = RoomsForUserStateReset( rooms_for_user[room_id] = room_membership
room_id=room_id,
event_id=None,
event_pos=left_event_pos,
membership=Membership.LEAVE,
sender=None,
room_version_id=await self.store.get_room_version_id(room_id),
)
return rooms_for_user, newly_joined_room_ids, set(newly_left_room_ids) return rooms_for_user, newly_joined_room_ids, set(newly_left_room_ids)
@@ -965,7 +954,7 @@ class SlidingSyncRoomLists:
user_id: str, user_id: str,
to_token: StreamToken, to_token: StreamToken,
from_token: Optional[StreamToken], from_token: Optional[StreamToken],
) -> Tuple[AbstractSet[str], Mapping[str, PersistedEventPosition]]: ) -> Tuple[AbstractSet[str], Mapping[str, RoomsForUserStateReset]]:
"""Fetch the sets of rooms that the user newly joined or left in the """Fetch the sets of rooms that the user newly joined or left in the
given token range. given token range.
@@ -975,10 +964,10 @@ class SlidingSyncRoomLists:
Returns: Returns:
A 2-tuple of newly joined room IDs and a map of newly left room A 2-tuple of newly joined room IDs and a map of newly left room
IDs to the event position the leave happened at. IDs to the `RoomsForUserStateReset` entry.
""" """
newly_joined_room_ids: Set[str] = set() newly_joined_room_ids: Set[str] = set()
newly_left_room_map: Dict[str, PersistedEventPosition] = {} newly_left_room_map: Dict[str, RoomsForUserStateReset] = {}
# We need to figure out the # We need to figure out the
# #
@@ -1045,12 +1034,21 @@ class SlidingSyncRoomLists:
# 2) # 2)
if last_membership_change_in_from_to_range.membership == Membership.JOIN: if last_membership_change_in_from_to_range.membership == Membership.JOIN:
possibly_newly_joined_room_ids.add(room_id) possibly_newly_joined_room_ids.add(room_id)
break
# 1) Figure out newly_left rooms (> `from_token` and <= `to_token`). # 1) Figure out newly_left rooms (> `from_token` and <= `to_token`).
if last_membership_change_in_from_to_range.membership == Membership.LEAVE: if (
last_membership_change_in_from_to_range.prev_membership
== Membership.JOIN
):
# 1) Mark this room as `newly_left` # 1) Mark this room as `newly_left`
newly_left_room_map[room_id] = ( newly_left_room_map[room_id] = RoomsForUserStateReset(
last_membership_change_in_from_to_range.event_pos room_id=room_id,
sender=last_membership_change_in_from_to_range.sender,
membership=Membership.LEAVE,
event_id=last_membership_change_in_from_to_range.event_id,
event_pos=last_membership_change_in_from_to_range.event_pos,
room_version_id=await self.store.get_room_version_id(room_id),
) )
# 2) Figure out `newly_joined` # 2) Figure out `newly_joined`

View File

@@ -495,6 +495,12 @@ class BackgroundUpdater:
if self._all_done: if self._all_done:
return True return True
# We now check if we have completed all pending background updates. We
# do this as once this returns True then it will set `self._all_done`
# and we can skip checking the database in future.
if await self.has_completed_background_updates():
return True
rows = await self.db_pool.simple_select_many_batch( rows = await self.db_pool.simple_select_many_batch(
table="background_updates", table="background_updates",
column="update_name", column="update_name",

View File

@@ -413,6 +413,8 @@ class PersistEventsStore:
to_insert = delta_state.to_insert to_insert = delta_state.to_insert
to_delete = delta_state.to_delete to_delete = delta_state.to_delete
# TODO: Add entry to membership snapshots on state resets.
# If no state is changing, we don't need to do anything. This can happen when a # If no state is changing, we don't need to do anything. This can happen when a
# partial-stated room is re-syncing the current state. # partial-stated room is re-syncing the current state.
if not to_insert and not to_delete: if not to_insert and not to_delete:

View File

@@ -1384,7 +1384,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
) -> Mapping[str, RoomsForUserSlidingSync]: ) -> Mapping[str, RoomsForUserSlidingSync]:
"""Get all the rooms for a user to handle a sliding sync request. """Get all the rooms for a user to handle a sliding sync request.
Ignores forgotten rooms and rooms that the user has been kicked from. Ignores forgotten rooms and rooms that the user has left themselves.
Returns: Returns:
Map from room ID to membership info Map from room ID to membership info
@@ -1404,6 +1404,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
LEFT JOIN sliding_sync_joined_rooms AS j ON (j.room_id = m.room_id AND m.membership = 'join') LEFT JOIN sliding_sync_joined_rooms AS j ON (j.room_id = m.room_id AND m.membership = 'join')
WHERE user_id = ? WHERE user_id = ?
AND m.forgotten = 0 AND m.forgotten = 0
AND (m.membership != 'leave' OR m.user_id != m.sender)
""" """
txn.execute(sql, (user_id,)) txn.execute(sql, (user_id,))
return { return {
@@ -1425,6 +1426,47 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
get_sliding_sync_rooms_for_user_txn, get_sliding_sync_rooms_for_user_txn,
) )
async def get_sliding_sync_room_for_user(
self, user_id: str, room_id: str
) -> Optional[RoomsForUserSlidingSync]:
"""Get the sliding sync room entry for the given user and room."""
def get_sliding_sync_room_for_user_txn(
txn: LoggingTransaction,
) -> Optional[RoomsForUserSlidingSync]:
sql = """
SELECT m.room_id, m.sender, m.membership, m.membership_event_id,
r.room_version,
m.event_instance_name, m.event_stream_ordering,
COALESCE(j.room_type, m.room_type),
COALESCE(j.is_encrypted, m.is_encrypted)
FROM sliding_sync_membership_snapshots AS m
INNER JOIN rooms AS r USING (room_id)
LEFT JOIN sliding_sync_joined_rooms AS j ON (j.room_id = m.room_id AND m.membership = 'join')
WHERE user_id = ?
AND m.forgotten = 0
AND m.room_id = ?
"""
txn.execute(sql, (user_id, room_id))
row = txn.fetchone()
if not row:
return None
return RoomsForUserSlidingSync(
room_id=row[0],
sender=row[1],
membership=row[2],
event_id=row[3],
room_version_id=row[4],
event_pos=PersistedEventPosition(row[5], row[6]),
room_type=row[7],
is_encrypted=row[8],
)
return await self.db_pool.runInteraction(
"get_sliding_sync_room_for_user", get_sliding_sync_room_for_user_txn
)
async def have_finished_sliding_sync_background_jobs(self) -> bool: async def have_finished_sliding_sync_background_jobs(self) -> bool:
"""Return if it's safe to use the sliding sync membership tables.""" """Return if it's safe to use the sliding sync membership tables."""

View File

@@ -308,8 +308,24 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
return create_event return create_event
@cached(max_entries=10000) @cached(max_entries=10000)
async def get_room_type(self, room_id: str) -> Optional[str]: async def get_room_type(self, room_id: str) -> Union[Optional[str], Sentinel]:
raise NotImplementedError() """Fetch room type for given room.
Since this function is cached, any missing values would be cached as
`None`. In order to distinguish between an unencrypted room that has
`None` encryption and a room that is unknown to the server where we
might want to omit the value (which would make it cached as `None`),
instead we use the sentinel value `ROOM_UNKNOWN_SENTINEL`.
"""
try:
create_event = await self.get_create_event_for_room(room_id)
return create_event.content.get(EventContentFields.ROOM_TYPE)
except NotFoundError:
# We use the sentinel value to distinguish between `None` which is a
# valid room type and a room that is unknown to the server so the value
# is just unset.
return ROOM_UNKNOWN_SENTINEL
@cachedList(cached_method_name="get_room_type", list_name="room_ids") @cachedList(cached_method_name="get_room_type", list_name="room_ids")
async def bulk_get_room_type( async def bulk_get_room_type(

View File

@@ -947,6 +947,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
return [] return []
if from_key: if from_key:
# TODO: We also need to invalidate this on current state change
has_changed = self._membership_stream_cache.has_entity_changed( has_changed = self._membership_stream_cache.has_entity_changed(
user_id, int(from_key.stream) user_id, int(from_key.stream)
) )

View File

@@ -25,6 +25,7 @@ from synapse.api.constants import (
AccountDataTypes, AccountDataTypes,
EventContentFields, EventContentFields,
EventTypes, EventTypes,
JoinRules,
RoomTypes, RoomTypes,
) )
from synapse.events import EventBase from synapse.events import EventBase
@@ -42,6 +43,7 @@ from synapse.util.stringutils import random_string
from tests import unittest from tests import unittest
from tests.server import TimedOutException from tests.server import TimedOutException
from tests.test_utils.event_injection import create_event
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -1007,3 +1009,60 @@ class SlidingSyncTestCase(SlidingSyncBase):
# Make the Sliding Sync request # Make the Sliding Sync request
response_body, _ = self.do_sync(sync_body, tok=user1_tok) response_body, _ = self.do_sync(sync_body, tok=user1_tok)
self.assertEqual(response_body["rooms"][room_id1]["initial"], True) self.assertEqual(response_body["rooms"][room_id1]["initial"], True)
def test_state_reset_room_comes_down_incremental_sync(self) -> None:
"""Test that a room that we were state reset out of comes down
incremental sync"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
room_id1 = self.helper.create_room_as(user2_id, is_public=True, tok=user2_tok)
event_response = self.helper.send(room_id1, "test", tok=user2_tok)
event_id = event_response["event_id"]
self.helper.join(room_id1, user1_id, tok=user1_tok)
sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [],
"timeline_limit": 1,
}
}
}
# Make the Sliding Sync request
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
self.assertEqual(response_body["rooms"][room_id1]["initial"], True)
# Trigger a state reset
join_rule_event, join_rule_context = self.get_success(
create_event(
self.hs,
prev_event_ids=[event_id],
type=EventTypes.JoinRules,
state_key="",
content={"join_rule": JoinRules.INVITE},
sender=user2_id,
room_id=room_id1,
room_version=self.get_success(self.store.get_room_version_id(room_id1)),
)
)
self.get_success(
self.hs.get_storage_controllers().persistence.persist_event(
join_rule_event, join_rule_context
)
)
users_in_room = self.get_success(self.store.get_users_in_room(room_id1))
self.assertIncludes(set(users_in_room), {user2_id}, exact=True)
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
# TODO: What should we expect here? Probably at least *something*?
print(response_body["rooms"][room_id1])