Compare commits

...

9 Commits

Author SHA1 Message Date
David Robertson
824ae4e82c docstring formatting
Co-authored-by: Dan Callahan <danc@element.io>
2021-12-08 13:35:21 +00:00
David Robertson
8bd35651f2 Changelog 2021-12-07 18:33:13 +00:00
David Robertson
50fee8941a Small style rewrite to emphasise exclusive cases
I prefer `if .. elif` where possible, because I know for sure that at
most one branch will execute.
2021-12-07 18:26:40 +00:00
David Robertson
a5deabbaae Remove check on events[-1].membership
I claim this check is redundant, because said membership cannot be join.
If not, we'd have `continue`d above because `room_id` would be in
`sync_result_builder.joined_room_ids`.
2021-12-07 18:26:35 +00:00
David Robertson
0d0783caa3 Always call old_mem_ev---exactly once.
I claim this is identical to the existing behaviour. Proof: consider the
boolean `room_id in sync_result_builder.joined_room_ids or has_join`.
If this is true, we make the first call to `_fetch_membership_event_at`.
Otherwise:

- `room_id not in sync_result_builder.joined_room_ids` and `not has_join`.
- The former means we continue on to inspect `events[-1].membership`.
- This is not `"join"`, or else `room_id in
  sync_result_builder.joined_room_ids` would be true.
- `has_join` is False, so we hit the `else` branch and make the second
  call to `_fetch_membership_event_at`.

So, assuming we continue beyond the first `continue`, we always call
fetch the old membership event exactly once. Do it up front to make the
reader's life easier.
2021-12-07 18:23:49 +00:00
David Robertson
6ff95ee66c Pull out _fetch_membership_event_at
Doing so makes it possible we'll repeat a DB query. In the next commit,
I'll demonstrate that we always call this exactly once.
2021-12-07 18:23:23 +00:00
David Robertson
7662ba4531 Signposting in _classify_rooms_by_membership 2021-12-07 18:22:15 +00:00
David Robertson
bca64f949e Remove unused branch
I claim that it is impossible to hit the `continue` which is removed in
this commit. Setup:

- `events` is a list of all membership events in the range
  `since < time <= now`.
- `non_joins` is the list of `e in events` with `e.membership` not equal
  to`"join"`.
- `events` is a nonempty list by construction of
  `mem_change_events_by_room_id`.

Rationale:

- We hit the deleted code only if `non_joins` is empty.
- If so, `events` consists only of `join` membership events.
- `events` is non_empty, so there was at least one join during the
  sync period.
- Therefore the room_id will belong to
  `sync_result_builder.joined_room_ids`. But this means we will have
 `continue`d in the branch above.
  - I'm assuming here that `joined_room_ids` and `events` are both using
   the same `now_token.room_key`.
2021-12-07 18:21:59 +00:00
David Robertson
7aee344c24 Pull out _classify_rooms_by_membership_changes 2021-12-07 18:21:55 +00:00
2 changed files with 163 additions and 103 deletions

1
changelog.d/11532.misc Normal file
View File

@@ -0,0 +1 @@
Further refactors of the `/sync` handler.

View File

@@ -62,7 +62,6 @@ logger = logging.getLogger(__name__)
# Debug logger for https://github.com/matrix-org/synapse/issues/4422
issue4422_logger = logging.getLogger("synapse.handler.sync.4422_debug")
# Counts the number of times we returned a non-empty sync. `type` is one of
# "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is
# "true" or "false" depending on if the request asked for lazy loaded members or
@@ -83,7 +82,6 @@ LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
# avoiding redundantly sending the same lazy-loaded members to the client
LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100
SyncRequestKey = Tuple[Any, ...]
@@ -206,9 +204,42 @@ class _RoomChanges:
room_entries: List["RoomSyncResultBuilder"]
invited: List[InvitedSyncResult]
"""Our outstanding invitations at the `now_token`."""
knocked: List[KnockedSyncResult]
"""Rooms we have knocked on at the `now_token`."""
newly_joined_rooms: List[str]
"""Rooms we joined at some point between `since` and `now`.
Note: we need not be joined to these rooms at the `since` or `now` tokens.
Some examples:
Since Midway Now
--------------------------
<none> join
invite join
join leave join
invite join leave
"""
newly_left_rooms: List[str]
"""Rooms we are not joined to at the `now_token` and left between `since` and `now`.
"Left" means "membership changed from 'join` to something else". It's not the same
as moving to the membership `leave`.
Note: we need not have membership "leave" at the `since` or `now` tokens.
Some examples:
Since Midway Now
--------------------------
join leave
join ban
invite join leave
leave join leave
join leave invite
join leave knock
"""
@attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -1072,7 +1103,7 @@ class SyncHandler:
At the end, we transfer data from the `sync_result_builder` to a new `SyncResult`
instance to signify that the sync calculation is complete.
"""
# NB: The now_token gets changed by some of the generate_sync_* methods,
# NB: Parts of the now_token get changed by some of the generate_sync_* methods,
# this is due to some of the underlying streams not supporting the ability
# to query up to a given point.
# Always use the `now_token` in `SyncResultBuilder`
@@ -1093,6 +1124,8 @@ class SyncHandler:
# See https://github.com/matrix-org/matrix-doc/issues/1144
raise NotImplementedError()
else:
# The `room_key` part of the `now_token` is not changed by the sync
# machinery. If it did, `joined_room_ids` could become out of date.
joined_room_ids = await self.get_rooms_for_user_at(
user_id, now_token.room_key
)
@@ -1684,7 +1717,7 @@ class SyncHandler:
now_token = sync_result_builder.now_token
sync_config = sync_result_builder.sync_config
assert since_token
assert since_token is not None
# The spec
# https://spec.matrix.org/v1.1/client-server-api/#get_matrixclientv3sync
@@ -1703,15 +1736,99 @@ class SyncHandler:
user_id, since_token.room_key, now_token.room_key
)
mem_change_events_by_room_id: Dict[str, List[EventBase]] = {}
for event in membership_change_events:
mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
room_changes = await self._classify_rooms_by_membership_changes(
sync_result_builder, membership_change_events, ignored_users
)
timeline_limit = sync_config.filter_collection.timeline_limit()
# Get all events since the `from_key` in rooms we're currently joined to.
# If there are too many, we get the most recent events only. This leaves
# a "gap" in the timeline, as described by the spec for /sync.
room_to_events = await self.store.get_room_events_stream_for_rooms(
room_ids=sync_result_builder.joined_room_ids,
from_key=since_token.room_key,
to_key=now_token.room_key,
limit=timeline_limit + 1,
)
# We loop through all room ids, even if there are no new events, in case
# there are non room events that we need to notify about.
for room_id in sync_result_builder.joined_room_ids:
room_entry = room_to_events.get(room_id, None)
newly_joined = room_id in room_changes.newly_joined_rooms
if room_entry:
events, start_key = room_entry
prev_batch_token = now_token.copy_and_replace("room_key", start_key)
entry = RoomSyncResultBuilder(
room_id=room_id,
rtype="joined",
events=events,
newly_joined=newly_joined,
full_state=False,
since_token=None if newly_joined else since_token,
upto_token=prev_batch_token,
)
else:
entry = RoomSyncResultBuilder(
room_id=room_id,
rtype="joined",
events=[],
newly_joined=newly_joined,
full_state=False,
since_token=since_token,
upto_token=since_token,
)
if newly_joined:
# debugging for https://github.com/matrix-org/synapse/issues/4422
issue4422_logger.debug(
"RoomSyncResultBuilder events for newly joined room %s: %r",
room_id,
entry.events,
)
room_changes.room_entries.append(entry)
return room_changes
async def _classify_rooms_by_membership_changes(
self,
sync_result_builder: "SyncResultBuilder",
membership_change_events: List[EventBase],
ignored_users: Collection[str],
) -> _RoomChanges:
"""Classify each room by the membership changes from `since` upto `now`.
Rooms are grouped by the user's membership at the `now_token`, either "invite",
"join", "leave" or "knock".
Invite and knock are the simplest: to include these in the sync body, we need
just the room ID and the invite/knock event.
See the _RoomChanges struct for the meaning of the five lists we build up and
return.
"""
since_token = sync_result_builder.since_token
# This assetion is also made in the caller, `_get_rooms_changed`. We repeat it
# here for mypy's benefit.
assert since_token is not None
user_id = sync_result_builder.sync_config.user.to_string()
newly_joined_rooms: List[str] = []
newly_left_rooms: List[str] = []
room_entries: List[RoomSyncResultBuilder] = []
invited: List[InvitedSyncResult] = []
knocked: List[KnockedSyncResult] = []
# 0. Do a first pass to group the events by room id.
mem_change_events_by_room_id: Dict[str, List[EventBase]] = {}
for event in membership_change_events:
mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
for room_id, events in mem_change_events_by_room_id.items():
# The body of this loop will add this room to at least one of the five lists
# above. Things get messy if you've e.g. joined, left, joined then left the
@@ -1725,11 +1842,10 @@ class SyncHandler:
non_joins = [e for e in events if e.membership != Membership.JOIN]
has_join = len(non_joins) != len(events)
# 1. Should we add this room to `newly_joined_rooms`?
# We want to figure out if we joined the room at some point since
# the last sync (even if we have since left). This is to make sure
# we do send down the room, and with full state, where necessary
old_state_ids = None
# we do send down the room, and with full state, where necessary.
if room_id in sync_result_builder.joined_room_ids and non_joins:
# Always include if the user (re)joined the room, especially
# important so that device list changes are calculated correctly.
@@ -1740,73 +1856,50 @@ class SyncHandler:
# User is in the room so we don't need to do the invite/leave checks
continue
old_mem_ev = await self._fetch_membership_event_at(
room_id, user_id, since_token
)
if room_id in sync_result_builder.joined_room_ids or has_join:
old_state_ids = await self.get_state_at(room_id, since_token)
old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None)
old_mem_ev = None
if old_mem_ev_id:
old_mem_ev = await self.store.get_event(
old_mem_ev_id, allow_none=True
)
# debug for #4422
if has_join:
prev_membership = None
if old_mem_ev:
prev_membership = old_mem_ev.membership
if has_join and old_mem_ev is not None:
issue4422_logger.debug(
"Previous membership for room %s with join: %s (event %s)",
room_id,
prev_membership,
old_mem_ev_id,
old_mem_ev.membership,
old_mem_ev.event_id,
)
if not old_mem_ev or old_mem_ev.membership != Membership.JOIN:
if old_mem_ev is None or old_mem_ev.membership != Membership.JOIN:
newly_joined_rooms.append(room_id)
# If user is in the room then we don't need to do the invite/leave checks
if room_id in sync_result_builder.joined_room_ids:
continue
if not non_joins:
continue
last_non_join = non_joins[-1]
# 2. Should we add this to `newly_left_rooms`?
# Check if we have left the room. This can either be because we were
# joined before *or* that we since joined and then left.
if events[-1].membership != Membership.JOIN:
if has_join:
if has_join:
newly_left_rooms.append(room_id)
else:
if old_mem_ev is not None and old_mem_ev.membership == Membership.JOIN:
newly_left_rooms.append(room_id)
else:
if not old_state_ids:
old_state_ids = await self.get_state_at(room_id, since_token)
old_mem_ev_id = old_state_ids.get(
(EventTypes.Member, user_id), None
)
old_mem_ev = None
if old_mem_ev_id:
old_mem_ev = await self.store.get_event(
old_mem_ev_id, allow_none=True
)
if old_mem_ev and old_mem_ev.membership == Membership.JOIN:
newly_left_rooms.append(room_id)
# Only bother if we're still currently invited
should_invite = last_non_join.membership == Membership.INVITE
if should_invite:
# 3. Should we add this room to `invited`?
last_non_join = non_joins[-1]
if last_non_join.membership == Membership.INVITE:
if last_non_join.sender not in ignored_users:
invite_room_sync = InvitedSyncResult(room_id, invite=last_non_join)
if invite_room_sync:
invited.append(invite_room_sync)
# Only bother if our latest membership in the room is knock (and we haven't
# been accepted/rejected in the meantime).
should_knock = last_non_join.membership == Membership.KNOCK
if should_knock:
# 4. Should we add this room to `knocked`?
elif last_non_join.membership == Membership.KNOCK:
knock_room_sync = KnockedSyncResult(room_id, knock=last_non_join)
if knock_room_sync:
knocked.append(knock_room_sync)
# 5. Do we need to add this to `room_entries`?
# Always include leave/ban events. Just take the last one.
# TODO: How do we handle ban -> leave in same batch?
leave_events = [
@@ -1859,58 +1952,6 @@ class SyncHandler:
)
)
timeline_limit = sync_config.filter_collection.timeline_limit()
# Get all events since the `from_key` in rooms we're currently joined to.
# If there are too many, we get the most recent events only. This leaves
# a "gap" in the timeline, as described by the spec for /sync.
room_to_events = await self.store.get_room_events_stream_for_rooms(
room_ids=sync_result_builder.joined_room_ids,
from_key=since_token.room_key,
to_key=now_token.room_key,
limit=timeline_limit + 1,
)
# We loop through all room ids, even if there are no new events, in case
# there are non room events that we need to notify about.
for room_id in sync_result_builder.joined_room_ids:
room_entry = room_to_events.get(room_id, None)
newly_joined = room_id in newly_joined_rooms
if room_entry:
events, start_key = room_entry
prev_batch_token = now_token.copy_and_replace("room_key", start_key)
entry = RoomSyncResultBuilder(
room_id=room_id,
rtype="joined",
events=events,
newly_joined=newly_joined,
full_state=False,
since_token=None if newly_joined else since_token,
upto_token=prev_batch_token,
)
else:
entry = RoomSyncResultBuilder(
room_id=room_id,
rtype="joined",
events=[],
newly_joined=newly_joined,
full_state=False,
since_token=since_token,
upto_token=since_token,
)
if newly_joined:
# debugging for https://github.com/matrix-org/synapse/issues/4422
issue4422_logger.debug(
"RoomSyncResultBuilder events for newly joined room %s: %r",
room_id,
entry.events,
)
room_entries.append(entry)
return _RoomChanges(
room_entries,
invited,
@@ -1919,6 +1960,24 @@ class SyncHandler:
newly_left_rooms,
)
async def _fetch_membership_event_at(
self, room_id: str, user_id: str, since_token: StreamToken
) -> Optional[EventBase]:
"""What was the user's membership in this room at the given stream_token?
Returns None if
- there was no membership for the user at the given time
- the user had a membership event, but we couldn't find it.
Otherwise, returns the membership event itself.
"""
old_state_ids = await self.get_state_at(room_id, since_token)
old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None)
if old_mem_ev_id is not None:
return await self.store.get_event(old_mem_ev_id, allow_none=True)
return None
async def _get_all_rooms(
self, sync_result_builder: "SyncResultBuilder", ignored_users: FrozenSet[str]
) -> _RoomChanges: