Handle limited syncs
This commit is contained in:
@@ -62,12 +62,14 @@ from synapse.types.handlers.sliding_sync import (
|
||||
HaveSentRoomFlag,
|
||||
MutablePerConnectionState,
|
||||
PerConnectionState,
|
||||
RoomLazyMembershipChanges,
|
||||
RoomSyncConfig,
|
||||
SlidingSyncConfig,
|
||||
SlidingSyncResult,
|
||||
StateValues,
|
||||
)
|
||||
from synapse.types.state import StateFilter
|
||||
from synapse.util import MutableOverlayMapping
|
||||
from synapse.util.async_helpers import concurrently_execute
|
||||
from synapse.visibility import filter_events_for_client
|
||||
|
||||
@@ -1138,8 +1140,8 @@ class SlidingSyncHandler:
|
||||
# time we've sent the room down this connection.
|
||||
room_state: StateMap[EventBase] = {}
|
||||
|
||||
new_connection_state.room_lazy_membership[room_id] = dict.fromkeys(
|
||||
required_user_state
|
||||
new_connection_state.room_lazy_membership[room_id] = RoomLazyMembershipChanges(
|
||||
added=dict.fromkeys(required_user_state)
|
||||
)
|
||||
|
||||
if initial:
|
||||
@@ -1163,12 +1165,32 @@ class SlidingSyncHandler:
|
||||
user_ids=required_user_state,
|
||||
)
|
||||
)
|
||||
new_connection_state.room_lazy_membership[room_id].update(
|
||||
new_connection_state.room_lazy_membership[room_id].added.update(
|
||||
previously_returned_user_state
|
||||
)
|
||||
else:
|
||||
previously_returned_user_state = {}
|
||||
|
||||
if limited:
|
||||
# If the timeline is limited we need to remove any members
|
||||
# from list of lazy loaded members that have changed but not
|
||||
# been sent down the timeline.
|
||||
previously_returned_user_state = MutableOverlayMapping(
|
||||
previously_returned_user_state
|
||||
)
|
||||
for event_type, state_key in room_state_delta_id_map:
|
||||
if event_type != EventTypes.Member:
|
||||
continue
|
||||
|
||||
if state_key not in required_user_state:
|
||||
previously_returned_user_state.pop(state_key, None)
|
||||
new_connection_state.room_lazy_membership[
|
||||
room_id
|
||||
].added.pop(state_key, None)
|
||||
new_connection_state.room_lazy_membership[
|
||||
room_id
|
||||
].removed.add(state_key)
|
||||
|
||||
# Check if there are any changes to the required state config
|
||||
# that we need to handle.
|
||||
changed_required_state_map, added_state_filter = (
|
||||
|
||||
@@ -33,6 +33,7 @@ from synapse.types.handlers.sliding_sync import (
|
||||
HaveSentRoomFlag,
|
||||
MutablePerConnectionState,
|
||||
PerConnectionState,
|
||||
RoomLazyMembershipChanges,
|
||||
RoomStatusMap,
|
||||
RoomSyncConfig,
|
||||
)
|
||||
@@ -350,15 +351,12 @@ class SlidingSyncStore(SQLBaseStore):
|
||||
value_values=values,
|
||||
)
|
||||
|
||||
for room_id, user_ids in per_connection_state.room_lazy_membership.items():
|
||||
self._persist_sliding_sync_connection_lazy_members_txn(
|
||||
txn,
|
||||
connection_key,
|
||||
previous_connection_position,
|
||||
connection_position,
|
||||
room_id,
|
||||
user_ids,
|
||||
)
|
||||
self._persist_sliding_sync_connection_lazy_members_txn(
|
||||
txn,
|
||||
connection_key,
|
||||
connection_position,
|
||||
per_connection_state.room_lazy_membership,
|
||||
)
|
||||
|
||||
return connection_position
|
||||
|
||||
@@ -557,50 +555,62 @@ class SlidingSyncStore(SQLBaseStore):
|
||||
self,
|
||||
txn: LoggingTransaction,
|
||||
connection_key: int,
|
||||
previous_connection_position: int | None,
|
||||
new_connection_position: int,
|
||||
room_id: str,
|
||||
user_ids: Mapping[str, int | None],
|
||||
all_changes: dict[str, RoomLazyMembershipChanges],
|
||||
) -> None:
|
||||
"""Persist that we have sent lazy membership for the given user IDs."""
|
||||
|
||||
now = self.clock.time_msec()
|
||||
|
||||
to_update: list[str] = []
|
||||
for user_id, last_seen_ts in user_ids.items():
|
||||
if last_seen_ts is None:
|
||||
# We've never sent this user before, so we need to record that
|
||||
# we've sent it at the new connection position.
|
||||
to_update.append(user_id)
|
||||
elif last_seen_ts + 60 * 60 * 1000 < now:
|
||||
# We last saw this user over an hour ago, so we update the
|
||||
# timestamp.
|
||||
to_update.append(user_id)
|
||||
to_update: list[tuple[str, str]] = []
|
||||
for room_id, room_changes in all_changes.items():
|
||||
for user_id, last_seen_ts in room_changes.added.items():
|
||||
if last_seen_ts is None:
|
||||
# We've never sent this user before, so we need to record that
|
||||
# we've sent it at the new connection position.
|
||||
to_update.append((room_id, user_id))
|
||||
elif last_seen_ts + 60 * 60 * 1000 < now:
|
||||
# We last saw this user over an hour ago, so we update the
|
||||
# timestamp.
|
||||
to_update.append((room_id, user_id))
|
||||
|
||||
if not to_update:
|
||||
return
|
||||
if to_update:
|
||||
sql = """
|
||||
INSERT INTO sliding_sync_connection_lazy_members
|
||||
(connection_key, connection_position, room_id, user_id, last_seen_ts)
|
||||
VALUES {value_placeholder}
|
||||
ON CONFLICT (connection_key, room_id, user_id)
|
||||
DO UPDATE SET last_seen_ts = EXCLUDED.last_seen_ts
|
||||
WHERE sliding_sync_connection_lazy_members.connection_position IS NULL
|
||||
OR sliding_sync_connection_lazy_members.connection_position = EXCLUDED.connection_position
|
||||
"""
|
||||
|
||||
sql = """
|
||||
INSERT INTO sliding_sync_connection_lazy_members
|
||||
(connection_key, connection_position, room_id, user_id, last_seen_ts)
|
||||
VALUES {value_placeholder}
|
||||
ON CONFLICT (connection_key, room_id, user_id)
|
||||
DO UPDATE SET last_seen_ts = EXCLUDED.last_seen_ts
|
||||
WHERE sliding_sync_connection_lazy_members.connection_position IS NULL
|
||||
OR sliding_sync_connection_lazy_members.connection_position = EXCLUDED.connection_position
|
||||
"""
|
||||
args = [
|
||||
(connection_key, new_connection_position, room_id, user_id, now)
|
||||
for room_id, user_id in to_update
|
||||
]
|
||||
|
||||
args = [
|
||||
(connection_key, new_connection_position, room_id, user_id, now)
|
||||
for user_id in to_update
|
||||
]
|
||||
if isinstance(self.database_engine, PostgresEngine):
|
||||
sql = sql.format(value_placeholder="?")
|
||||
txn.execute_values(sql, args, fetch=False)
|
||||
else:
|
||||
sql = sql.format(value_placeholder="(?, ?, ?, ?, ?)")
|
||||
txn.execute_batch(sql, args)
|
||||
|
||||
if isinstance(self.database_engine, PostgresEngine):
|
||||
sql = sql.format(value_placeholder="?")
|
||||
txn.execute_values(sql, args, fetch=False)
|
||||
else:
|
||||
sql = sql.format(value_placeholder="(?, ?, ?, ?, ?)")
|
||||
txn.execute_batch(sql, args)
|
||||
to_remove: list[tuple[str, str]] = []
|
||||
for room_id, room_changes in all_changes.items():
|
||||
for user_id in room_changes.removed:
|
||||
to_remove.append((room_id, user_id))
|
||||
|
||||
if to_remove:
|
||||
self.db_pool.simple_delete_many_batch_txn(
|
||||
txn,
|
||||
table="sliding_sync_connection_lazy_members",
|
||||
keys=("connection_key", "room_id", "user_id"),
|
||||
values=[
|
||||
(connection_key, room_id, user_id) for room_id, user_id in to_remove
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
@attr.s(auto_attribs=True, frozen=True)
|
||||
@@ -622,7 +632,7 @@ class PerConnectionStateDB:
|
||||
|
||||
room_configs: Mapping[str, "RoomSyncConfig"]
|
||||
|
||||
room_lazy_membership: dict[str, dict[str, int | None]]
|
||||
room_lazy_membership: dict[str, RoomLazyMembershipChanges]
|
||||
|
||||
@staticmethod
|
||||
async def from_state(
|
||||
|
||||
@@ -885,6 +885,31 @@ class PerConnectionState:
|
||||
return len(self.rooms) + len(self.receipts) + len(self.room_configs)
|
||||
|
||||
|
||||
@attr.s(auto_attribs=True)
|
||||
class RoomLazyMembershipChanges:
|
||||
"""Changes to lazily-loaded room memberships for a given room.
|
||||
|
||||
Attributes:
|
||||
added: Map from user ID to timestamp for users whose membership we
|
||||
have lazily loaded.
|
||||
removed: Set of user IDs whose membership change we have *not* sent
|
||||
down
|
||||
"""
|
||||
|
||||
# A map from user ID -> timestamp. Indicates that those memberships have
|
||||
# been lazily loaded. I.e. that either a) we sent those memberships down, or
|
||||
# b) we did so previously. The timestamp indicates the time we previously
|
||||
# saw the membership.
|
||||
added: dict[str, int | None] = attr.Factory(dict)
|
||||
|
||||
# A set of user IDs whose membership change we have *not* sent
|
||||
# down
|
||||
removed: set[str] = attr.Factory(set)
|
||||
|
||||
def __bool__(self) -> bool:
|
||||
return bool(self.added or self.removed)
|
||||
|
||||
|
||||
@attr.s(auto_attribs=True)
|
||||
class MutablePerConnectionState(PerConnectionState):
|
||||
"""A mutable version of `PerConnectionState`"""
|
||||
@@ -899,7 +924,7 @@ class MutablePerConnectionState(PerConnectionState):
|
||||
# memberships have been lazily loaded. I.e. that either a) we sent those
|
||||
# memberships down, or b) we did so previously. The timestamp indicates the
|
||||
# time we previously saw the membership.
|
||||
room_lazy_membership: dict[str, dict[str, int | None]] = attr.Factory(dict)
|
||||
room_lazy_membership: dict[str, RoomLazyMembershipChanges] = attr.Factory(dict)
|
||||
|
||||
def has_updates(self) -> bool:
|
||||
return (
|
||||
|
||||
@@ -841,6 +841,101 @@ class SlidingSyncRoomsRequiredStateTestCase(SlidingSyncBase):
|
||||
exact=True,
|
||||
)
|
||||
|
||||
def test_lazy_members_limited_sync(self) -> None:
|
||||
"""Test that when using lazy loading for room members and a limited sync
|
||||
missing a membership change, we include the membership change next time
|
||||
said user says something.
|
||||
"""
|
||||
|
||||
user1_id = self.register_user("user1", "pass")
|
||||
user1_tok = self.login(user1_id, "pass")
|
||||
user2_id = self.register_user("user2", "pass")
|
||||
user2_tok = self.login(user2_id, "pass")
|
||||
|
||||
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
|
||||
self.helper.join(room_id1, user1_id, tok=user1_tok)
|
||||
|
||||
# Send a message from each user to the room so that both memberships are sent down.
|
||||
self.helper.send(room_id1, "1", tok=user1_tok)
|
||||
self.helper.send(room_id1, "2", tok=user2_tok)
|
||||
|
||||
# Make a first sync to establish a position
|
||||
sync_body = {
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [[0, 1]],
|
||||
"required_state": [
|
||||
[EventTypes.Member, StateValues.LAZY],
|
||||
],
|
||||
"timeline_limit": 2,
|
||||
}
|
||||
}
|
||||
}
|
||||
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
|
||||
|
||||
# We should see both membership events in required_state
|
||||
state_map = self.get_success(
|
||||
self.storage_controllers.state.get_current_state(room_id1)
|
||||
)
|
||||
self._assertRequiredStateIncludes(
|
||||
response_body["rooms"][room_id1]["required_state"],
|
||||
{
|
||||
state_map[(EventTypes.Member, user1_id)],
|
||||
state_map[(EventTypes.Member, user2_id)],
|
||||
},
|
||||
exact=True,
|
||||
)
|
||||
|
||||
# User2 changes their display name (causing a membership change)
|
||||
self.helper.send_state(
|
||||
room_id1,
|
||||
event_type=EventTypes.Member,
|
||||
state_key=user2_id,
|
||||
body={
|
||||
EventContentFields.MEMBERSHIP: Membership.JOIN,
|
||||
EventContentFields.MEMBERSHIP_DISPLAYNAME: "New Name",
|
||||
},
|
||||
tok=user2_tok,
|
||||
)
|
||||
|
||||
# Send a couple of messages to the room to push out the membership change
|
||||
self.helper.send(room_id1, "3", tok=user1_tok)
|
||||
self.helper.send(room_id1, "4", tok=user1_tok)
|
||||
|
||||
# Make an incremental Sliding Sync request
|
||||
response_body, from_token = self.do_sync(
|
||||
sync_body, since=from_token, tok=user1_tok
|
||||
)
|
||||
|
||||
# The membership change should *not* be included yet as user2 doesn't
|
||||
# have any events in the timeline.
|
||||
self._assertRequiredStateIncludes(
|
||||
response_body["rooms"][room_id1].get("required_state", []),
|
||||
set(),
|
||||
exact=True,
|
||||
)
|
||||
|
||||
# Now user2 sends a message to the room
|
||||
self.helper.send(room_id1, "5", tok=user2_tok)
|
||||
|
||||
# Make another incremental Sliding Sync request
|
||||
response_body, from_token = self.do_sync(
|
||||
sync_body, since=from_token, tok=user1_tok
|
||||
)
|
||||
|
||||
# The membership change should now be included as user2 has an event
|
||||
# in the timeline.
|
||||
state_map = self.get_success(
|
||||
self.storage_controllers.state.get_current_state(room_id1)
|
||||
)
|
||||
self._assertRequiredStateIncludes(
|
||||
response_body["rooms"][room_id1].get("required_state", []),
|
||||
{
|
||||
state_map[(EventTypes.Member, user2_id)],
|
||||
},
|
||||
exact=True,
|
||||
)
|
||||
|
||||
def test_rooms_required_state_me(self) -> None:
|
||||
"""
|
||||
Test `rooms.required_state` correctly handles $ME.
|
||||
|
||||
Reference in New Issue
Block a user