First pass of not fetching left rooms and adding back newly_left
This commit is contained in:
@@ -55,7 +55,6 @@ from synapse.storage.roommember import (
|
||||
)
|
||||
from synapse.types import (
|
||||
MutableStateMap,
|
||||
PersistedEventPosition,
|
||||
RoomStreamToken,
|
||||
StateMap,
|
||||
StrCollection,
|
||||
@@ -220,6 +219,9 @@ class SlidingSyncRoomLists:
|
||||
# include rooms that are outside the list ranges.
|
||||
all_rooms: Set[str] = set()
|
||||
|
||||
# Note: this won't include rooms the user have left themselves. We add back
|
||||
# `newly_left` rooms below. This is more efficient than fetching all rooms and
|
||||
# then filtering out the old left rooms.
|
||||
room_membership_for_user_map = await self.store.get_sliding_sync_rooms_for_user(
|
||||
user_id
|
||||
)
|
||||
@@ -245,36 +247,11 @@ class SlidingSyncRoomLists:
|
||||
event_id=change.event_id,
|
||||
event_pos=change.event_pos,
|
||||
room_version_id=change.room_version_id,
|
||||
# We keep the current state of the room though
|
||||
# We keep the state of the room though
|
||||
has_known_state=existing_room.has_known_state,
|
||||
room_type=existing_room.room_type,
|
||||
is_encrypted=existing_room.is_encrypted,
|
||||
)
|
||||
else:
|
||||
# This can happen if we get "state reset" out of the room
|
||||
# after the `to_token`. In other words, there is no membership
|
||||
# for the room after the `to_token` but we see membership in
|
||||
# the token range.
|
||||
|
||||
# Get the state at the time. Note that room type never changes,
|
||||
# so we can just get current room type
|
||||
room_type = await self.store.get_room_type(room_id)
|
||||
is_encrypted = await self.get_is_encrypted_for_room_at_token(
|
||||
room_id, to_token.room_key
|
||||
)
|
||||
|
||||
# Add back rooms that the user was state-reset out of after `to_token`
|
||||
room_membership_for_user_map[room_id] = RoomsForUserSlidingSync(
|
||||
room_id=room_id,
|
||||
sender=change.sender,
|
||||
membership=change.membership,
|
||||
event_id=change.event_id,
|
||||
event_pos=change.event_pos,
|
||||
room_version_id=change.room_version_id,
|
||||
has_known_state=True,
|
||||
room_type=room_type,
|
||||
is_encrypted=is_encrypted,
|
||||
)
|
||||
|
||||
(
|
||||
newly_joined_room_ids,
|
||||
@@ -284,38 +261,106 @@ class SlidingSyncRoomLists:
|
||||
)
|
||||
dm_room_ids = await self._get_dm_rooms_for_user(user_id)
|
||||
|
||||
# Handle state resets in the from -> to token range.
|
||||
state_reset_rooms = (
|
||||
# Add back `newly_left` rooms (rooms left in the from -> to token range). We do
|
||||
# this because `get_sliding_sync_rooms_for_user(...)` doesn't include rooms that
|
||||
# the user left themselves and it's more efficient to add them back here than to
|
||||
# fetch all rooms and then filter out the old left rooms.
|
||||
#
|
||||
missing_newly_left_rooms = (
|
||||
newly_left_room_map.keys() - room_membership_for_user_map.keys()
|
||||
)
|
||||
if state_reset_rooms:
|
||||
if missing_newly_left_rooms:
|
||||
room_membership_for_user_map = dict(room_membership_for_user_map)
|
||||
for room_id in (
|
||||
newly_left_room_map.keys() - room_membership_for_user_map.keys()
|
||||
):
|
||||
# Get the state at the time. Note that room type never changes,
|
||||
# so we can just get current room type
|
||||
room_type = await self.store.get_room_type(room_id)
|
||||
is_encrypted = await self.get_is_encrypted_for_room_at_token(
|
||||
room_id, newly_left_room_map[room_id].to_room_stream_token()
|
||||
)
|
||||
for room_id in missing_newly_left_rooms:
|
||||
newly_left_room_for_user = newly_left_room_map[room_id]
|
||||
# This should be a given
|
||||
assert newly_left_room_for_user.membership == Membership.LEAVE
|
||||
|
||||
room_membership_for_user_map[room_id] = RoomsForUserSlidingSync(
|
||||
room_id=room_id,
|
||||
sender=None,
|
||||
membership=Membership.LEAVE,
|
||||
event_id=None,
|
||||
event_pos=newly_left_room_map[room_id],
|
||||
room_version_id=await self.store.get_room_version_id(room_id),
|
||||
has_known_state=True,
|
||||
room_type=room_type,
|
||||
is_encrypted=is_encrypted,
|
||||
)
|
||||
room_type = None
|
||||
has_known_state = False
|
||||
is_encrypted = False
|
||||
# Add back `newly_left` rooms
|
||||
#
|
||||
# Normal user left the room
|
||||
if newly_left_room_for_user.event_id is not None:
|
||||
# We fetch from the Sliding Sync tables as it's just another membership
|
||||
newly_left_room_for_user_sliding_sync = (
|
||||
await self.store.get_sliding_sync_room_for_user(
|
||||
user_id, room_id
|
||||
)
|
||||
)
|
||||
# The only reason this would happen is if the user was state reset
|
||||
# out of the room while we were working on the response (between
|
||||
# when we checked for newly_left rooms and now).
|
||||
#
|
||||
# TODO: We should handle this as a state reset case like below
|
||||
if newly_left_room_for_user_sliding_sync is None:
|
||||
raise AssertionError(
|
||||
f"Can't find newly left room for user in the Sliding Sync tables: {user_id} / {room_id}",
|
||||
)
|
||||
|
||||
room_membership_for_user_map[room_id] = (
|
||||
newly_left_room_for_user_sliding_sync
|
||||
)
|
||||
|
||||
change = changes.get(room_id)
|
||||
if change is not None:
|
||||
# Update room membership events to the point in time of the `to_token`
|
||||
room_membership_for_user_map[room_id] = RoomsForUserSlidingSync(
|
||||
room_id=room_id,
|
||||
sender=change.sender,
|
||||
membership=change.membership,
|
||||
event_id=change.event_id,
|
||||
event_pos=change.event_pos,
|
||||
room_version_id=change.room_version_id,
|
||||
# We keep the state of the room though
|
||||
has_known_state=newly_left_room_for_user_sliding_sync.has_known_state,
|
||||
room_type=newly_left_room_for_user_sliding_sync.room_type,
|
||||
is_encrypted=newly_left_room_for_user_sliding_sync.is_encrypted,
|
||||
)
|
||||
|
||||
# They should match at this point
|
||||
assert (
|
||||
room_membership_for_user_map[room_id].event_id
|
||||
== newly_left_room_for_user.event_id
|
||||
)
|
||||
|
||||
# "state reset" out of the room
|
||||
else:
|
||||
# Get the state at the time. We can't read from the Sliding Sync
|
||||
# tables because the user has no membership in the room according to
|
||||
# the state (thanks to the state reset).
|
||||
#
|
||||
# Note: `room_type` never changes, so we can just get current room
|
||||
# type
|
||||
room_type = await self.store.get_room_type(room_id)
|
||||
has_known_state = room_type is not ROOM_UNKNOWN_SENTINEL
|
||||
if isinstance(room_type, StateSentinel):
|
||||
room_type = None
|
||||
|
||||
# Get the encryption status at the time of the token
|
||||
is_encrypted = await self.get_is_encrypted_for_room_at_token(
|
||||
room_id,
|
||||
newly_left_room_for_user.event_pos.to_room_stream_token(),
|
||||
)
|
||||
|
||||
room_membership_for_user_map[room_id] = RoomsForUserSlidingSync(
|
||||
room_id=room_id,
|
||||
sender=newly_left_room_for_user.sender,
|
||||
membership=newly_left_room_for_user.membership,
|
||||
event_id=newly_left_room_for_user.event_id,
|
||||
event_pos=newly_left_room_for_user.event_pos,
|
||||
room_version_id=newly_left_room_for_user.room_version_id,
|
||||
has_known_state=has_known_state,
|
||||
room_type=room_type,
|
||||
is_encrypted=is_encrypted,
|
||||
)
|
||||
|
||||
if sync_config.lists:
|
||||
sync_room_map = {
|
||||
room_id: room_membership_for_user
|
||||
for room_id, room_membership_for_user in room_membership_for_user_map.items()
|
||||
# TODO: Do we need to do this anymore?
|
||||
if filter_membership_for_sync(
|
||||
user_id=user_id,
|
||||
room_membership_for_user=room_membership_for_user,
|
||||
@@ -955,17 +1000,23 @@ class SlidingSyncRoomLists:
|
||||
# Ensure we have entries for rooms that the user has been "state reset"
|
||||
# out of. These are rooms appear in the `newly_left_rooms` map but
|
||||
# aren't in the `rooms_for_user` map.
|
||||
for room_id, left_event_pos in newly_left_room_ids.items():
|
||||
for room_id, newly_left_room_for_user in newly_left_room_ids.items():
|
||||
# If we already know about the room, it's not a state reset
|
||||
if room_id in rooms_for_user:
|
||||
continue
|
||||
|
||||
# This should be true if it's a state reset
|
||||
assert newly_left_room_for_user.membership is Membership.LEAVE
|
||||
assert newly_left_room_for_user.event_id is None
|
||||
assert newly_left_room_for_user.sender is None
|
||||
|
||||
rooms_for_user[room_id] = RoomsForUserStateReset(
|
||||
room_id=room_id,
|
||||
event_id=None,
|
||||
event_pos=left_event_pos,
|
||||
event_pos=newly_left_room_for_user.event_pos,
|
||||
membership=Membership.LEAVE,
|
||||
sender=None,
|
||||
room_version_id=await self.store.get_room_version_id(room_id),
|
||||
room_version_id=newly_left_room_for_user.room_version_id,
|
||||
)
|
||||
|
||||
return rooms_for_user, newly_joined_room_ids, set(newly_left_room_ids)
|
||||
@@ -976,7 +1027,9 @@ class SlidingSyncRoomLists:
|
||||
user_id: str,
|
||||
to_token: StreamToken,
|
||||
from_token: Optional[StreamToken],
|
||||
) -> Tuple[AbstractSet[str], Mapping[str, PersistedEventPosition]]:
|
||||
) -> Tuple[
|
||||
AbstractSet[str], Mapping[str, Union[RoomsForUser, RoomsForUserStateReset]]
|
||||
]:
|
||||
"""Fetch the sets of rooms that the user newly joined or left in the
|
||||
given token range.
|
||||
|
||||
@@ -989,7 +1042,7 @@ class SlidingSyncRoomLists:
|
||||
IDs to the event position the leave happened at.
|
||||
"""
|
||||
newly_joined_room_ids: Set[str] = set()
|
||||
newly_left_room_map: Dict[str, PersistedEventPosition] = {}
|
||||
newly_left_room_map: Dict[str, Union[RoomsForUser, RoomsForUserStateReset]] = {}
|
||||
|
||||
# We need to figure out the
|
||||
#
|
||||
@@ -1060,9 +1113,27 @@ class SlidingSyncRoomLists:
|
||||
# 1) Figure out newly_left rooms (> `from_token` and <= `to_token`).
|
||||
if last_membership_change_in_from_to_range.membership == Membership.LEAVE:
|
||||
# 1) Mark this room as `newly_left`
|
||||
newly_left_room_map[room_id] = (
|
||||
last_membership_change_in_from_to_range.event_pos
|
||||
)
|
||||
if last_membership_change_in_from_to_range.event_id is not None:
|
||||
# This is a normal leave event so this should exist
|
||||
assert last_membership_change_in_from_to_range.sender is not None
|
||||
|
||||
newly_left_room_map[room_id] = RoomsForUser(
|
||||
room_id=room_id,
|
||||
sender=last_membership_change_in_from_to_range.sender,
|
||||
membership=Membership.LEAVE,
|
||||
event_id=last_membership_change_in_from_to_range.event_id,
|
||||
event_pos=last_membership_change_in_from_to_range.event_pos,
|
||||
room_version_id=await self.store.get_room_version_id(room_id),
|
||||
)
|
||||
else:
|
||||
newly_left_room_map[room_id] = RoomsForUserStateReset(
|
||||
room_id=room_id,
|
||||
sender=last_membership_change_in_from_to_range.sender,
|
||||
membership=Membership.LEAVE,
|
||||
event_id=last_membership_change_in_from_to_range.event_id,
|
||||
event_pos=last_membership_change_in_from_to_range.event_pos,
|
||||
room_version_id=await self.store.get_room_version_id(room_id),
|
||||
)
|
||||
|
||||
# 2) Figure out `newly_joined`
|
||||
for room_id in possibly_newly_joined_room_ids:
|
||||
|
||||
@@ -1403,7 +1403,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
|
||||
) -> Mapping[str, RoomsForUserSlidingSync]:
|
||||
"""Get all the rooms for a user to handle a sliding sync request.
|
||||
|
||||
Ignores forgotten rooms and rooms that the user has been kicked from.
|
||||
Ignores forgotten rooms and rooms that the user has left themselves.
|
||||
|
||||
Returns:
|
||||
Map from room ID to membership info
|
||||
@@ -1428,6 +1428,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
|
||||
LEFT JOIN sliding_sync_joined_rooms AS j ON (j.room_id = m.room_id AND m.membership = 'join')
|
||||
WHERE user_id = ?
|
||||
AND m.forgotten = 0
|
||||
AND (m.membership != 'leave' OR m.user_id != m.sender)
|
||||
"""
|
||||
txn.execute(sql, (user_id,))
|
||||
return {
|
||||
@@ -1450,6 +1451,49 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
|
||||
get_sliding_sync_rooms_for_user_txn,
|
||||
)
|
||||
|
||||
async def get_sliding_sync_room_for_user(
|
||||
self, user_id: str, room_id: str
|
||||
) -> Optional[RoomsForUserSlidingSync]:
|
||||
"""Get the sliding sync room entry for the given user and room."""
|
||||
|
||||
def get_sliding_sync_room_for_user_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> Optional[RoomsForUserSlidingSync]:
|
||||
sql = """
|
||||
SELECT m.room_id, m.sender, m.membership, m.membership_event_id,
|
||||
r.room_version,
|
||||
m.event_instance_name, m.event_stream_ordering,
|
||||
m.has_known_state,
|
||||
COALESCE(j.room_type, m.room_type),
|
||||
COALESCE(j.is_encrypted, m.is_encrypted)
|
||||
FROM sliding_sync_membership_snapshots AS m
|
||||
INNER JOIN rooms AS r USING (room_id)
|
||||
LEFT JOIN sliding_sync_joined_rooms AS j ON (j.room_id = m.room_id AND m.membership = 'join')
|
||||
WHERE user_id = ?
|
||||
AND m.forgotten = 0
|
||||
AND m.room_id = ?
|
||||
"""
|
||||
txn.execute(sql, (user_id, room_id))
|
||||
row = txn.fetchone()
|
||||
if not row:
|
||||
return None
|
||||
|
||||
return RoomsForUserSlidingSync(
|
||||
room_id=row[0],
|
||||
sender=row[1],
|
||||
membership=row[2],
|
||||
event_id=row[3],
|
||||
room_version_id=row[4],
|
||||
event_pos=PersistedEventPosition(row[5], row[6]),
|
||||
has_known_state=bool(row[7]),
|
||||
room_type=row[8],
|
||||
is_encrypted=row[9],
|
||||
)
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_sliding_sync_room_for_user", get_sliding_sync_room_for_user_txn
|
||||
)
|
||||
|
||||
|
||||
class RoomMemberBackgroundUpdateStore(SQLBaseStore):
|
||||
def __init__(
|
||||
|
||||
@@ -308,8 +308,24 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
return create_event
|
||||
|
||||
@cached(max_entries=10000)
|
||||
async def get_room_type(self, room_id: str) -> Optional[str]:
|
||||
raise NotImplementedError()
|
||||
async def get_room_type(self, room_id: str) -> Union[Optional[str], Sentinel]:
|
||||
"""Fetch room type for given room.
|
||||
|
||||
Since this function is cached, any missing values would be cached as
|
||||
`None`. In order to distinguish between an unencrypted room that has
|
||||
`None` encryption and a room that is unknown to the server where we
|
||||
might want to omit the value (which would make it cached as `None`),
|
||||
instead we use the sentinel value `ROOM_UNKNOWN_SENTINEL`.
|
||||
"""
|
||||
|
||||
try:
|
||||
create_event = await self.get_create_event_for_room(room_id)
|
||||
return create_event.content.get(EventContentFields.ROOM_TYPE)
|
||||
except NotFoundError:
|
||||
# We use the sentinel value to distinguish between `None` which is a
|
||||
# valid room type and a room that is unknown to the server so the value
|
||||
# is just unset.
|
||||
return ROOM_UNKNOWN_SENTINEL
|
||||
|
||||
@cachedList(cached_method_name="get_room_type", list_name="room_ids")
|
||||
async def bulk_get_room_type(
|
||||
|
||||
Reference in New Issue
Block a user