Compare commits
9 Commits
v1.99.0
...
dmr/sync-t
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
824ae4e82c | ||
|
|
8bd35651f2 | ||
|
|
50fee8941a | ||
|
|
a5deabbaae | ||
|
|
0d0783caa3 | ||
|
|
6ff95ee66c | ||
|
|
7662ba4531 | ||
|
|
bca64f949e | ||
|
|
7aee344c24 |
1
changelog.d/11532.misc
Normal file
1
changelog.d/11532.misc
Normal file
@@ -0,0 +1 @@
|
||||
Further refactors of the `/sync` handler.
|
||||
@@ -62,7 +62,6 @@ logger = logging.getLogger(__name__)
|
||||
# Debug logger for https://github.com/matrix-org/synapse/issues/4422
|
||||
issue4422_logger = logging.getLogger("synapse.handler.sync.4422_debug")
|
||||
|
||||
|
||||
# Counts the number of times we returned a non-empty sync. `type` is one of
|
||||
# "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is
|
||||
# "true" or "false" depending on if the request asked for lazy loaded members or
|
||||
@@ -83,7 +82,6 @@ LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
|
||||
# avoiding redundantly sending the same lazy-loaded members to the client
|
||||
LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100
|
||||
|
||||
|
||||
SyncRequestKey = Tuple[Any, ...]
|
||||
|
||||
|
||||
@@ -206,9 +204,42 @@ class _RoomChanges:
|
||||
|
||||
room_entries: List["RoomSyncResultBuilder"]
|
||||
invited: List[InvitedSyncResult]
|
||||
"""Our outstanding invitations at the `now_token`."""
|
||||
|
||||
knocked: List[KnockedSyncResult]
|
||||
"""Rooms we have knocked on at the `now_token`."""
|
||||
|
||||
newly_joined_rooms: List[str]
|
||||
"""Rooms we joined at some point between `since` and `now`.
|
||||
|
||||
Note: we need not be joined to these rooms at the `since` or `now` tokens.
|
||||
Some examples:
|
||||
|
||||
Since Midway Now
|
||||
--------------------------
|
||||
<none> join
|
||||
invite join
|
||||
join leave join
|
||||
invite join leave
|
||||
"""
|
||||
|
||||
newly_left_rooms: List[str]
|
||||
"""Rooms we are not joined to at the `now_token` and left between `since` and `now`.
|
||||
|
||||
"Left" means "membership changed from 'join` to something else". It's not the same
|
||||
as moving to the membership `leave`.
|
||||
|
||||
Note: we need not have membership "leave" at the `since` or `now` tokens.
|
||||
Some examples:
|
||||
Since Midway Now
|
||||
--------------------------
|
||||
join leave
|
||||
join ban
|
||||
invite join leave
|
||||
leave join leave
|
||||
join leave invite
|
||||
join leave knock
|
||||
"""
|
||||
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
@@ -1072,7 +1103,7 @@ class SyncHandler:
|
||||
At the end, we transfer data from the `sync_result_builder` to a new `SyncResult`
|
||||
instance to signify that the sync calculation is complete.
|
||||
"""
|
||||
# NB: The now_token gets changed by some of the generate_sync_* methods,
|
||||
# NB: Parts of the now_token get changed by some of the generate_sync_* methods,
|
||||
# this is due to some of the underlying streams not supporting the ability
|
||||
# to query up to a given point.
|
||||
# Always use the `now_token` in `SyncResultBuilder`
|
||||
@@ -1093,6 +1124,8 @@ class SyncHandler:
|
||||
# See https://github.com/matrix-org/matrix-doc/issues/1144
|
||||
raise NotImplementedError()
|
||||
else:
|
||||
# The `room_key` part of the `now_token` is not changed by the sync
|
||||
# machinery. If it did, `joined_room_ids` could become out of date.
|
||||
joined_room_ids = await self.get_rooms_for_user_at(
|
||||
user_id, now_token.room_key
|
||||
)
|
||||
@@ -1684,7 +1717,7 @@ class SyncHandler:
|
||||
now_token = sync_result_builder.now_token
|
||||
sync_config = sync_result_builder.sync_config
|
||||
|
||||
assert since_token
|
||||
assert since_token is not None
|
||||
|
||||
# The spec
|
||||
# https://spec.matrix.org/v1.1/client-server-api/#get_matrixclientv3sync
|
||||
@@ -1703,15 +1736,99 @@ class SyncHandler:
|
||||
user_id, since_token.room_key, now_token.room_key
|
||||
)
|
||||
|
||||
mem_change_events_by_room_id: Dict[str, List[EventBase]] = {}
|
||||
for event in membership_change_events:
|
||||
mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
|
||||
room_changes = await self._classify_rooms_by_membership_changes(
|
||||
sync_result_builder, membership_change_events, ignored_users
|
||||
)
|
||||
|
||||
timeline_limit = sync_config.filter_collection.timeline_limit()
|
||||
|
||||
# Get all events since the `from_key` in rooms we're currently joined to.
|
||||
# If there are too many, we get the most recent events only. This leaves
|
||||
# a "gap" in the timeline, as described by the spec for /sync.
|
||||
room_to_events = await self.store.get_room_events_stream_for_rooms(
|
||||
room_ids=sync_result_builder.joined_room_ids,
|
||||
from_key=since_token.room_key,
|
||||
to_key=now_token.room_key,
|
||||
limit=timeline_limit + 1,
|
||||
)
|
||||
|
||||
# We loop through all room ids, even if there are no new events, in case
|
||||
# there are non room events that we need to notify about.
|
||||
for room_id in sync_result_builder.joined_room_ids:
|
||||
room_entry = room_to_events.get(room_id, None)
|
||||
|
||||
newly_joined = room_id in room_changes.newly_joined_rooms
|
||||
if room_entry:
|
||||
events, start_key = room_entry
|
||||
|
||||
prev_batch_token = now_token.copy_and_replace("room_key", start_key)
|
||||
|
||||
entry = RoomSyncResultBuilder(
|
||||
room_id=room_id,
|
||||
rtype="joined",
|
||||
events=events,
|
||||
newly_joined=newly_joined,
|
||||
full_state=False,
|
||||
since_token=None if newly_joined else since_token,
|
||||
upto_token=prev_batch_token,
|
||||
)
|
||||
else:
|
||||
entry = RoomSyncResultBuilder(
|
||||
room_id=room_id,
|
||||
rtype="joined",
|
||||
events=[],
|
||||
newly_joined=newly_joined,
|
||||
full_state=False,
|
||||
since_token=since_token,
|
||||
upto_token=since_token,
|
||||
)
|
||||
|
||||
if newly_joined:
|
||||
# debugging for https://github.com/matrix-org/synapse/issues/4422
|
||||
issue4422_logger.debug(
|
||||
"RoomSyncResultBuilder events for newly joined room %s: %r",
|
||||
room_id,
|
||||
entry.events,
|
||||
)
|
||||
room_changes.room_entries.append(entry)
|
||||
|
||||
return room_changes
|
||||
|
||||
async def _classify_rooms_by_membership_changes(
|
||||
self,
|
||||
sync_result_builder: "SyncResultBuilder",
|
||||
membership_change_events: List[EventBase],
|
||||
ignored_users: Collection[str],
|
||||
) -> _RoomChanges:
|
||||
"""Classify each room by the membership changes from `since` upto `now`.
|
||||
|
||||
Rooms are grouped by the user's membership at the `now_token`, either "invite",
|
||||
"join", "leave" or "knock".
|
||||
|
||||
Invite and knock are the simplest: to include these in the sync body, we need
|
||||
just the room ID and the invite/knock event.
|
||||
|
||||
See the _RoomChanges struct for the meaning of the five lists we build up and
|
||||
return.
|
||||
"""
|
||||
since_token = sync_result_builder.since_token
|
||||
# This assetion is also made in the caller, `_get_rooms_changed`. We repeat it
|
||||
# here for mypy's benefit.
|
||||
assert since_token is not None
|
||||
|
||||
user_id = sync_result_builder.sync_config.user.to_string()
|
||||
|
||||
newly_joined_rooms: List[str] = []
|
||||
newly_left_rooms: List[str] = []
|
||||
room_entries: List[RoomSyncResultBuilder] = []
|
||||
invited: List[InvitedSyncResult] = []
|
||||
knocked: List[KnockedSyncResult] = []
|
||||
|
||||
# 0. Do a first pass to group the events by room id.
|
||||
mem_change_events_by_room_id: Dict[str, List[EventBase]] = {}
|
||||
for event in membership_change_events:
|
||||
mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
|
||||
|
||||
for room_id, events in mem_change_events_by_room_id.items():
|
||||
# The body of this loop will add this room to at least one of the five lists
|
||||
# above. Things get messy if you've e.g. joined, left, joined then left the
|
||||
@@ -1725,11 +1842,10 @@ class SyncHandler:
|
||||
non_joins = [e for e in events if e.membership != Membership.JOIN]
|
||||
has_join = len(non_joins) != len(events)
|
||||
|
||||
# 1. Should we add this room to `newly_joined_rooms`?
|
||||
# We want to figure out if we joined the room at some point since
|
||||
# the last sync (even if we have since left). This is to make sure
|
||||
# we do send down the room, and with full state, where necessary
|
||||
|
||||
old_state_ids = None
|
||||
# we do send down the room, and with full state, where necessary.
|
||||
if room_id in sync_result_builder.joined_room_ids and non_joins:
|
||||
# Always include if the user (re)joined the room, especially
|
||||
# important so that device list changes are calculated correctly.
|
||||
@@ -1740,73 +1856,50 @@ class SyncHandler:
|
||||
# User is in the room so we don't need to do the invite/leave checks
|
||||
continue
|
||||
|
||||
old_mem_ev = await self._fetch_membership_event_at(
|
||||
room_id, user_id, since_token
|
||||
)
|
||||
if room_id in sync_result_builder.joined_room_ids or has_join:
|
||||
old_state_ids = await self.get_state_at(room_id, since_token)
|
||||
old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None)
|
||||
old_mem_ev = None
|
||||
if old_mem_ev_id:
|
||||
old_mem_ev = await self.store.get_event(
|
||||
old_mem_ev_id, allow_none=True
|
||||
)
|
||||
|
||||
# debug for #4422
|
||||
if has_join:
|
||||
prev_membership = None
|
||||
if old_mem_ev:
|
||||
prev_membership = old_mem_ev.membership
|
||||
if has_join and old_mem_ev is not None:
|
||||
issue4422_logger.debug(
|
||||
"Previous membership for room %s with join: %s (event %s)",
|
||||
room_id,
|
||||
prev_membership,
|
||||
old_mem_ev_id,
|
||||
old_mem_ev.membership,
|
||||
old_mem_ev.event_id,
|
||||
)
|
||||
|
||||
if not old_mem_ev or old_mem_ev.membership != Membership.JOIN:
|
||||
if old_mem_ev is None or old_mem_ev.membership != Membership.JOIN:
|
||||
newly_joined_rooms.append(room_id)
|
||||
|
||||
# If user is in the room then we don't need to do the invite/leave checks
|
||||
if room_id in sync_result_builder.joined_room_ids:
|
||||
continue
|
||||
|
||||
if not non_joins:
|
||||
continue
|
||||
last_non_join = non_joins[-1]
|
||||
|
||||
# 2. Should we add this to `newly_left_rooms`?
|
||||
# Check if we have left the room. This can either be because we were
|
||||
# joined before *or* that we since joined and then left.
|
||||
if events[-1].membership != Membership.JOIN:
|
||||
if has_join:
|
||||
if has_join:
|
||||
newly_left_rooms.append(room_id)
|
||||
else:
|
||||
if old_mem_ev is not None and old_mem_ev.membership == Membership.JOIN:
|
||||
newly_left_rooms.append(room_id)
|
||||
else:
|
||||
if not old_state_ids:
|
||||
old_state_ids = await self.get_state_at(room_id, since_token)
|
||||
old_mem_ev_id = old_state_ids.get(
|
||||
(EventTypes.Member, user_id), None
|
||||
)
|
||||
old_mem_ev = None
|
||||
if old_mem_ev_id:
|
||||
old_mem_ev = await self.store.get_event(
|
||||
old_mem_ev_id, allow_none=True
|
||||
)
|
||||
if old_mem_ev and old_mem_ev.membership == Membership.JOIN:
|
||||
newly_left_rooms.append(room_id)
|
||||
|
||||
# Only bother if we're still currently invited
|
||||
should_invite = last_non_join.membership == Membership.INVITE
|
||||
if should_invite:
|
||||
# 3. Should we add this room to `invited`?
|
||||
last_non_join = non_joins[-1]
|
||||
if last_non_join.membership == Membership.INVITE:
|
||||
if last_non_join.sender not in ignored_users:
|
||||
invite_room_sync = InvitedSyncResult(room_id, invite=last_non_join)
|
||||
if invite_room_sync:
|
||||
invited.append(invite_room_sync)
|
||||
|
||||
# Only bother if our latest membership in the room is knock (and we haven't
|
||||
# been accepted/rejected in the meantime).
|
||||
should_knock = last_non_join.membership == Membership.KNOCK
|
||||
if should_knock:
|
||||
# 4. Should we add this room to `knocked`?
|
||||
elif last_non_join.membership == Membership.KNOCK:
|
||||
knock_room_sync = KnockedSyncResult(room_id, knock=last_non_join)
|
||||
if knock_room_sync:
|
||||
knocked.append(knock_room_sync)
|
||||
|
||||
# 5. Do we need to add this to `room_entries`?
|
||||
# Always include leave/ban events. Just take the last one.
|
||||
# TODO: How do we handle ban -> leave in same batch?
|
||||
leave_events = [
|
||||
@@ -1859,58 +1952,6 @@ class SyncHandler:
|
||||
)
|
||||
)
|
||||
|
||||
timeline_limit = sync_config.filter_collection.timeline_limit()
|
||||
|
||||
# Get all events since the `from_key` in rooms we're currently joined to.
|
||||
# If there are too many, we get the most recent events only. This leaves
|
||||
# a "gap" in the timeline, as described by the spec for /sync.
|
||||
room_to_events = await self.store.get_room_events_stream_for_rooms(
|
||||
room_ids=sync_result_builder.joined_room_ids,
|
||||
from_key=since_token.room_key,
|
||||
to_key=now_token.room_key,
|
||||
limit=timeline_limit + 1,
|
||||
)
|
||||
|
||||
# We loop through all room ids, even if there are no new events, in case
|
||||
# there are non room events that we need to notify about.
|
||||
for room_id in sync_result_builder.joined_room_ids:
|
||||
room_entry = room_to_events.get(room_id, None)
|
||||
|
||||
newly_joined = room_id in newly_joined_rooms
|
||||
if room_entry:
|
||||
events, start_key = room_entry
|
||||
|
||||
prev_batch_token = now_token.copy_and_replace("room_key", start_key)
|
||||
|
||||
entry = RoomSyncResultBuilder(
|
||||
room_id=room_id,
|
||||
rtype="joined",
|
||||
events=events,
|
||||
newly_joined=newly_joined,
|
||||
full_state=False,
|
||||
since_token=None if newly_joined else since_token,
|
||||
upto_token=prev_batch_token,
|
||||
)
|
||||
else:
|
||||
entry = RoomSyncResultBuilder(
|
||||
room_id=room_id,
|
||||
rtype="joined",
|
||||
events=[],
|
||||
newly_joined=newly_joined,
|
||||
full_state=False,
|
||||
since_token=since_token,
|
||||
upto_token=since_token,
|
||||
)
|
||||
|
||||
if newly_joined:
|
||||
# debugging for https://github.com/matrix-org/synapse/issues/4422
|
||||
issue4422_logger.debug(
|
||||
"RoomSyncResultBuilder events for newly joined room %s: %r",
|
||||
room_id,
|
||||
entry.events,
|
||||
)
|
||||
room_entries.append(entry)
|
||||
|
||||
return _RoomChanges(
|
||||
room_entries,
|
||||
invited,
|
||||
@@ -1919,6 +1960,24 @@ class SyncHandler:
|
||||
newly_left_rooms,
|
||||
)
|
||||
|
||||
async def _fetch_membership_event_at(
|
||||
self, room_id: str, user_id: str, since_token: StreamToken
|
||||
) -> Optional[EventBase]:
|
||||
"""What was the user's membership in this room at the given stream_token?
|
||||
|
||||
Returns None if
|
||||
- there was no membership for the user at the given time
|
||||
- the user had a membership event, but we couldn't find it.
|
||||
|
||||
Otherwise, returns the membership event itself.
|
||||
"""
|
||||
|
||||
old_state_ids = await self.get_state_at(room_id, since_token)
|
||||
old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None)
|
||||
if old_mem_ev_id is not None:
|
||||
return await self.store.get_event(old_mem_ev_id, allow_none=True)
|
||||
return None
|
||||
|
||||
async def _get_all_rooms(
|
||||
self, sync_result_builder: "SyncResultBuilder", ignored_users: FrozenSet[str]
|
||||
) -> _RoomChanges:
|
||||
|
||||
Reference in New Issue
Block a user