Fix state reset when filtering
This commit is contained in:
@@ -363,6 +363,7 @@ class SlidingSyncRoomLists:
|
||||
newly_left=room_id in newly_left_room_map,
|
||||
)
|
||||
}
|
||||
logger.info("asdf sync_room_map: %s", sync_room_map)
|
||||
with start_active_span("assemble_sliding_window_lists"):
|
||||
for list_key, list_config in sync_config.lists.items():
|
||||
# Apply filters
|
||||
@@ -371,10 +372,14 @@ class SlidingSyncRoomLists:
|
||||
filtered_sync_room_map = await self.filter_rooms_using_tables(
|
||||
user_id,
|
||||
sync_room_map,
|
||||
previous_connection_state,
|
||||
list_config.filters,
|
||||
to_token,
|
||||
dm_room_ids,
|
||||
)
|
||||
logger.info(
|
||||
"asdf filtered_sync_room_map: %s", filtered_sync_room_map
|
||||
)
|
||||
|
||||
# Find which rooms are partially stated and may need to be filtered out
|
||||
# depending on the `required_state` requested (see below).
|
||||
@@ -574,6 +579,7 @@ class SlidingSyncRoomLists:
|
||||
filtered_sync_room_map = await self.filter_rooms(
|
||||
sync_config.user,
|
||||
sync_room_map,
|
||||
previous_connection_state,
|
||||
list_config.filters,
|
||||
to_token,
|
||||
dm_room_ids,
|
||||
@@ -1573,6 +1579,7 @@ class SlidingSyncRoomLists:
|
||||
self,
|
||||
user: UserID,
|
||||
sync_room_map: Dict[str, RoomsForUserType],
|
||||
previous_connection_state: PerConnectionState,
|
||||
filters: SlidingSyncConfig.SlidingSyncList.Filters,
|
||||
to_token: StreamToken,
|
||||
dm_room_ids: AbstractSet[str],
|
||||
@@ -1758,14 +1765,33 @@ class SlidingSyncRoomLists:
|
||||
)
|
||||
}
|
||||
|
||||
# Keep rooms if the user has been state reset out of it but we previously sent
|
||||
# down the connection before. We want to make sure that we send these down to
|
||||
# the client regardless of filters so they find out about the state reset.
|
||||
#
|
||||
# We don't always have access to the state in a room after being state reset if
|
||||
# no one else locally on the server is participating in the room so we patch
|
||||
# these back in manually.
|
||||
state_reset_out_of_room_id_set = {
|
||||
room_id
|
||||
for room_id in sync_room_map.keys()
|
||||
if sync_room_map[room_id].event_id is None
|
||||
and previous_connection_state.rooms.have_sent_room(room_id).status
|
||||
!= HaveSentRoomFlag.NEVER
|
||||
}
|
||||
|
||||
# Assemble a new sync room map but only with the `filtered_room_id_set`
|
||||
return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set}
|
||||
return {
|
||||
room_id: sync_room_map[room_id]
|
||||
for room_id in filtered_room_id_set | state_reset_out_of_room_id_set
|
||||
}
|
||||
|
||||
@trace
|
||||
async def filter_rooms_using_tables(
|
||||
self,
|
||||
user_id: str,
|
||||
sync_room_map: Mapping[str, RoomsForUserSlidingSync],
|
||||
previous_connection_state: PerConnectionState,
|
||||
filters: SlidingSyncConfig.SlidingSyncList.Filters,
|
||||
to_token: StreamToken,
|
||||
dm_room_ids: AbstractSet[str],
|
||||
@@ -1907,8 +1933,26 @@ class SlidingSyncRoomLists:
|
||||
)
|
||||
}
|
||||
|
||||
# Keep rooms if the user has been state reset out of it but we previously sent
|
||||
# down the connection before. We want to make sure that we send these down to
|
||||
# the client regardless of filters so they find out about the state reset.
|
||||
#
|
||||
# We don't always have access to the state in a room after being state reset if
|
||||
# no one else locally on the server is participating in the room so we patch
|
||||
# these back in manually.
|
||||
state_reset_out_of_room_id_set = {
|
||||
room_id
|
||||
for room_id in sync_room_map.keys()
|
||||
if sync_room_map[room_id].event_id is None
|
||||
and previous_connection_state.rooms.have_sent_room(room_id).status
|
||||
!= HaveSentRoomFlag.NEVER
|
||||
}
|
||||
|
||||
# Assemble a new sync room map but only with the `filtered_room_id_set`
|
||||
return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set}
|
||||
return {
|
||||
room_id: sync_room_map[room_id]
|
||||
for room_id in filtered_room_id_set | state_reset_out_of_room_id_set
|
||||
}
|
||||
|
||||
@trace
|
||||
async def sort_rooms(
|
||||
|
||||
@@ -424,6 +424,9 @@ class SlidingSyncTestCase(SlidingSyncBase):
|
||||
self.event_sources = hs.get_event_sources()
|
||||
self.storage_controllers = hs.get_storage_controllers()
|
||||
self.account_data_handler = hs.get_account_data_handler()
|
||||
persistence = self.hs.get_storage_controllers().persistence
|
||||
assert persistence is not None
|
||||
self.persistence = persistence
|
||||
|
||||
super().prepare(reactor, clock, hs)
|
||||
|
||||
@@ -902,9 +905,7 @@ class SlidingSyncTestCase(SlidingSyncBase):
|
||||
)
|
||||
)
|
||||
_, join_rule_event_pos, _ = self.get_success(
|
||||
self.hs.get_storage_controllers().persistence.persist_event(
|
||||
join_rule_event, join_rule_context
|
||||
)
|
||||
self.persistence.persist_event(join_rule_event, join_rule_context)
|
||||
)
|
||||
|
||||
# FIXME: We're manually busting the cache since
|
||||
@@ -1034,9 +1035,7 @@ class SlidingSyncTestCase(SlidingSyncBase):
|
||||
)
|
||||
)
|
||||
_, join_rule_event_pos, _ = self.get_success(
|
||||
self.hs.get_storage_controllers().persistence.persist_event(
|
||||
join_rule_event, join_rule_context
|
||||
)
|
||||
self.persistence.persist_event(join_rule_event, join_rule_context)
|
||||
)
|
||||
|
||||
# FIXME: We're manually busting the cache since
|
||||
@@ -1049,6 +1048,10 @@ class SlidingSyncTestCase(SlidingSyncBase):
|
||||
users_in_room = self.get_success(self.store.get_users_in_room(space_room_id))
|
||||
self.assertIncludes(set(users_in_room), {user2_id}, exact=True)
|
||||
|
||||
# User2 also leaves the room so the server is no longer participating in the room
|
||||
# and we don't have access to current state
|
||||
self.helper.leave(space_room_id, user2_id, tok=user2_tok)
|
||||
|
||||
# Make another Sliding Sync request (incremental)
|
||||
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
|
||||
|
||||
|
||||
@@ -30,8 +30,8 @@ from twisted.test.proto_helpers import MemoryReactor
|
||||
from synapse.api.constants import (
|
||||
Direction,
|
||||
EventTypes,
|
||||
Membership,
|
||||
JoinRules,
|
||||
Membership,
|
||||
RelationTypes,
|
||||
)
|
||||
from synapse.api.filtering import Filter
|
||||
@@ -1206,9 +1206,7 @@ class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
|
||||
)
|
||||
)
|
||||
_, join_rule_event_pos, _ = self.get_success(
|
||||
self.hs.get_storage_controllers().persistence.persist_event(
|
||||
join_rule_event, join_rule_context
|
||||
)
|
||||
self.persistence.persist_event(join_rule_event, join_rule_context)
|
||||
)
|
||||
|
||||
# FIXME: We're manually busting the cache since
|
||||
|
||||
Reference in New Issue
Block a user