Remember previous timeline limit
This commit is contained in:
@@ -697,6 +697,7 @@ class SlidingSyncHandler:
|
||||
if has_lists or has_room_subscriptions:
|
||||
connection_position = await self.connection_store.record_rooms(
|
||||
sync_config=sync_config,
|
||||
room_configs=relevant_room_map,
|
||||
from_token=from_token,
|
||||
sent_room_ids=relevant_room_map.keys(),
|
||||
# TODO: We need to calculate which rooms have had updates since the `from_token` but were not included in the `sent_room_ids`
|
||||
@@ -2232,22 +2233,26 @@ class HaveSentRoom:
|
||||
contains the last stream token of the last updates we sent down
|
||||
the room, i.e. we still need to send everything since then to the
|
||||
client.
|
||||
timeline_limit: The timeline limit config for the room, if LIVE or
|
||||
PREVIOUSLY. This is used to track if the client has increased
|
||||
the timeline limit to request more events.
|
||||
"""
|
||||
|
||||
status: HaveSentRoomFlag
|
||||
last_token: Optional[RoomStreamToken]
|
||||
timeline_limit: Optional[int]
|
||||
|
||||
@staticmethod
|
||||
def live() -> "HaveSentRoom":
|
||||
return HaveSentRoom(HaveSentRoomFlag.LIVE, None)
|
||||
def live(timeline_limit: int) -> "HaveSentRoom":
|
||||
return HaveSentRoom(HaveSentRoomFlag.LIVE, None, timeline_limit)
|
||||
|
||||
@staticmethod
|
||||
def previously(last_token: RoomStreamToken) -> "HaveSentRoom":
|
||||
def previously(last_token: RoomStreamToken, timeline_limit: int) -> "HaveSentRoom":
|
||||
"""Constructor for `PREVIOUSLY` flag."""
|
||||
return HaveSentRoom(HaveSentRoomFlag.PREVIOUSLY, last_token)
|
||||
return HaveSentRoom(HaveSentRoomFlag.PREVIOUSLY, last_token, timeline_limit)
|
||||
|
||||
|
||||
HAVE_SENT_ROOM_NEVER = HaveSentRoom(HaveSentRoomFlag.NEVER, None)
|
||||
HAVE_SENT_ROOM_NEVER = HaveSentRoom(HaveSentRoomFlag.NEVER, None, None)
|
||||
|
||||
|
||||
@attr.s(auto_attribs=True)
|
||||
@@ -2302,6 +2307,7 @@ class SlidingSyncConnectionStore:
|
||||
async def record_rooms(
|
||||
self,
|
||||
sync_config: SlidingSyncConfig,
|
||||
room_configs: Dict[str, RoomSyncConfig],
|
||||
from_token: Optional[SlidingSyncStreamToken],
|
||||
*,
|
||||
sent_room_ids: StrCollection,
|
||||
@@ -2343,7 +2349,9 @@ class SlidingSyncConnectionStore:
|
||||
have_updated = False
|
||||
for room_id in sent_room_ids:
|
||||
prev_state = new_room_statuses.get(room_id)
|
||||
new_room_statuses[room_id] = HaveSentRoom.live()
|
||||
new_room_statuses[room_id] = HaveSentRoom.live(
|
||||
room_configs[room_id].timeline_limit
|
||||
)
|
||||
if prev_state != new_room_statuses[room_id]:
|
||||
have_updated = True
|
||||
|
||||
@@ -2361,15 +2369,16 @@ class SlidingSyncConnectionStore:
|
||||
# we know that there are no existing entires.
|
||||
|
||||
if from_token:
|
||||
new_unsent_state = HaveSentRoom.previously(from_token.stream_token.room_key)
|
||||
|
||||
for room_id in unsent_room_ids:
|
||||
prev_state = new_room_statuses.get(room_id)
|
||||
if (
|
||||
prev_state is not None
|
||||
and prev_state.status == HaveSentRoomFlag.LIVE
|
||||
):
|
||||
new_room_statuses[room_id] = new_unsent_state
|
||||
new_room_statuses[room_id] = HaveSentRoom.previously(
|
||||
from_token.stream_token.room_key,
|
||||
room_configs[room_id].timeline_limit,
|
||||
)
|
||||
have_updated = True
|
||||
|
||||
if not have_updated:
|
||||
|
||||
@@ -39,7 +39,8 @@ from synapse.api.constants import (
|
||||
)
|
||||
from synapse.api.room_versions import RoomVersions
|
||||
from synapse.events import EventBase
|
||||
from synapse.handlers.sliding_sync import StateValues
|
||||
from synapse.handlers.sliding_sync import RoomSyncConfig, StateValues
|
||||
from synapse.http.servlet import validate_json_object
|
||||
from synapse.rest.client import (
|
||||
devices,
|
||||
knock,
|
||||
@@ -53,6 +54,7 @@ from synapse.rest.client import (
|
||||
from synapse.server import HomeServer
|
||||
from synapse.types import (
|
||||
JsonDict,
|
||||
Requester,
|
||||
RoomStreamToken,
|
||||
SlidingSyncStreamToken,
|
||||
StreamKeyType,
|
||||
@@ -60,6 +62,7 @@ from synapse.types import (
|
||||
UserID,
|
||||
)
|
||||
from synapse.types.handlers import SlidingSyncConfig
|
||||
from synapse.types.rest.client import SlidingSyncBody
|
||||
from synapse.util import Clock
|
||||
from synapse.util.stringutils import random_string
|
||||
|
||||
@@ -1357,6 +1360,22 @@ class SlidingSyncBase(unittest.HomeserverTestCase):
|
||||
"Expected `notifier.wait_for_events(...)` to be triggered"
|
||||
)
|
||||
|
||||
def make_sync_config(
|
||||
self, user: UserID, requester: Requester, content: JsonDict
|
||||
) -> SlidingSyncConfig:
|
||||
"""Helper function to turn a dict sync body to a sync config"""
|
||||
body = validate_json_object(content, SlidingSyncBody)
|
||||
|
||||
sync_config = SlidingSyncConfig(
|
||||
user=user,
|
||||
requester=requester,
|
||||
conn_id=body.conn_id,
|
||||
lists=body.lists,
|
||||
room_subscriptions=body.room_subscriptions,
|
||||
extensions=body.extensions,
|
||||
)
|
||||
return sync_config
|
||||
|
||||
|
||||
class SlidingSyncTestCase(SlidingSyncBase):
|
||||
"""
|
||||
@@ -4538,7 +4557,6 @@ class SlidingSyncTestCase(SlidingSyncBase):
|
||||
self.helper.send(room_id1, "msg", tok=user1_tok)
|
||||
|
||||
timeline_limit = 5
|
||||
conn_id = "conn_id"
|
||||
sync_body = {
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
@@ -4584,19 +4602,22 @@ class SlidingSyncTestCase(SlidingSyncBase):
|
||||
requester = self.get_success(
|
||||
self.hs.get_auth().get_user_by_access_token(user1_tok)
|
||||
)
|
||||
sync_config = SlidingSyncConfig(
|
||||
user=requester.user,
|
||||
requester=requester,
|
||||
conn_id=conn_id,
|
||||
sync_config = self.make_sync_config(
|
||||
user=requester.user, requester=requester, content=sync_body
|
||||
)
|
||||
|
||||
parsed_initial_from_token = self.get_success(
|
||||
SlidingSyncStreamToken.from_string(self.store, initial_from_token)
|
||||
)
|
||||
assert sync_config.lists
|
||||
room_configs = {
|
||||
room_id1: RoomSyncConfig.from_room_config(sync_config.lists["foo-list"])
|
||||
}
|
||||
connection_position = self.get_success(
|
||||
sliding_sync_handler.connection_store.record_rooms(
|
||||
sync_config,
|
||||
parsed_initial_from_token,
|
||||
room_configs=room_configs,
|
||||
from_token=parsed_initial_from_token,
|
||||
sent_room_ids=[],
|
||||
unsent_room_ids=[room_id1],
|
||||
)
|
||||
@@ -4646,7 +4667,6 @@ class SlidingSyncTestCase(SlidingSyncBase):
|
||||
|
||||
self.helper.send(room_id1, "msg", tok=user1_tok)
|
||||
|
||||
conn_id = "conn_id"
|
||||
sync_body = {
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
@@ -4693,19 +4713,24 @@ class SlidingSyncTestCase(SlidingSyncBase):
|
||||
requester = self.get_success(
|
||||
self.hs.get_auth().get_user_by_access_token(user1_tok)
|
||||
)
|
||||
sync_config = SlidingSyncConfig(
|
||||
sync_config = self.make_sync_config(
|
||||
user=requester.user,
|
||||
requester=requester,
|
||||
conn_id=conn_id,
|
||||
content=sync_body,
|
||||
)
|
||||
|
||||
parsed_initial_from_token = self.get_success(
|
||||
SlidingSyncStreamToken.from_string(self.store, initial_from_token)
|
||||
)
|
||||
assert sync_config.lists
|
||||
room_configs = {
|
||||
room_id1: RoomSyncConfig.from_room_config(sync_config.lists["foo-list"])
|
||||
}
|
||||
connection_position = self.get_success(
|
||||
sliding_sync_handler.connection_store.record_rooms(
|
||||
sync_config,
|
||||
parsed_initial_from_token,
|
||||
room_configs=room_configs,
|
||||
from_token=parsed_initial_from_token,
|
||||
sent_room_ids=[],
|
||||
unsent_room_ids=[room_id1],
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user