1
0

Compare commits

...

16 Commits

Author SHA1 Message Date
David Baker
53104b5e45 initial_rooms not initial_receipts 2025-04-08 15:38:54 +01:00
David Baker
1118b5c4b4 Filter for rooms that are in initial_rooms
otherwise we send too many
2025-04-08 12:54:44 +01:00
David Baker
1a4c085470 re-use user receipts in the receipts extension 2025-04-08 11:28:40 +01:00
David Baker
5bcd19babd Merge branch 'develop' into dbkr/sss_notif_counts 2025-04-07 15:46:55 +01:00
David Baker
73cd0d0aa4 Add comment 2025-04-07 15:14:22 +01:00
David Baker
c0749a8ac7 Return no notifications in case of no member event at all 2025-04-04 17:30:34 +01:00
David Baker
38fc56b2a1 Add type 2025-04-04 10:33:47 +01:00
David Baker
98a5eb9fd4 Import order 2025-04-04 10:18:08 +01:00
David Baker
002e8ccf41 Add test for notification counts in SSS 2025-04-03 18:20:47 +01:00
David Baker
44b487a1b0 Change test as rooms now appear when read receipts sent 2025-04-01 16:34:52 +01:00
David Baker
f72ba26e15 More iteration on types 2025-03-31 15:29:40 +01:00
David Baker
1676fa787f Iterate on types 2025-03-31 15:26:11 +01:00
David Baker
9ba2c7030b More types 2025-03-28 15:50:47 +00:00
David Baker
4ea8507bbd Fix types 2025-03-28 15:13:35 +00:00
David Baker
cb9d25ffed Here is the news at six o'clock 2025-03-28 12:36:05 +00:00
David Baker
d39dc3ef27 Add support for sending notification counts in simplified sliding sync 2025-03-28 12:22:05 +00:00
10 changed files with 267 additions and 65 deletions

View File

@@ -0,0 +1 @@
Add support for sending notification counts and thread notification counts in simplified sliding sync mode.

View File

@@ -15,7 +15,17 @@
import itertools
import logging
from itertools import chain
from typing import TYPE_CHECKING, AbstractSet, Dict, List, Mapping, Optional, Set, Tuple
from typing import (
TYPE_CHECKING,
AbstractSet,
Dict,
List,
Mapping,
Optional,
Sequence,
Set,
Tuple,
)
from prometheus_client import Histogram
from typing_extensions import assert_never
@@ -38,6 +48,7 @@ from synapse.logging.opentracing import (
tag_args,
trace,
)
from synapse.storage.databases.main.receipts import ReceiptInRoom
from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
from synapse.storage.databases.main.state_deltas import StateDelta
from synapse.storage.databases.main.stream import PaginateFunction
@@ -245,11 +256,31 @@ class SlidingSyncHandler:
to_token=to_token,
)
# fetch the user's receipts between the two points: these will be factor
# in deciding whether to send the room, since it may have changed their
# notification counts
receipts = await self.store.get_linearized_receipts_for_user_in_rooms(
user_id=user_id,
room_ids=interested_rooms.relevant_room_map.keys(),
from_key=from_token.stream_token.receipt_key if from_token else None,
to_key=to_token.receipt_key,
)
# Filtered subset of `relevant_room_map` for rooms that may have updates
# (in the event stream)
relevant_rooms_to_send_map = self.room_lists.filter_relevant_rooms_to_send(
sync_config.user,
previous_connection_state,
from_token.stream_token if from_token else None,
to_token,
interested_rooms.relevant_room_map,
receipts,
)
lists = interested_rooms.lists
relevant_room_map = interested_rooms.relevant_room_map
all_rooms = interested_rooms.all_rooms
room_membership_for_user_map = interested_rooms.room_membership_for_user_map
relevant_rooms_to_send_map = interested_rooms.relevant_rooms_to_send_map
# Fetch room data
rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
@@ -272,6 +303,7 @@ class SlidingSyncHandler:
to_token=to_token,
newly_joined=room_id in interested_rooms.newly_joined_rooms,
is_dm=room_id in interested_rooms.dm_room_ids,
room_receipts=receipts[room_id] if room_id in receipts else None,
)
# Filter out empty room results during incremental sync
@@ -296,6 +328,7 @@ class SlidingSyncHandler:
actual_room_response_map=rooms,
from_token=from_token,
to_token=to_token,
user_receipts=receipts,
)
if has_lists or has_room_subscriptions:
@@ -543,6 +576,7 @@ class SlidingSyncHandler:
to_token: StreamToken,
newly_joined: bool,
is_dm: bool,
room_receipts: Optional[Sequence[ReceiptInRoom]],
) -> SlidingSyncResult.RoomResult:
"""
Fetch room data for the sync response.
@@ -560,6 +594,8 @@ class SlidingSyncHandler:
to_token: The point in the stream to sync up to.
newly_joined: If the user has newly joined the room
is_dm: Whether the room is a DM room
room_receipts: Any read receipts from the in question in that room between
from_token and to_token
"""
user = sync_config.user
@@ -1312,6 +1348,11 @@ class SlidingSyncHandler:
set_tag(SynapseTags.RESULT_PREFIX + "initial", initial)
unread_notifs = await self.store.get_unread_event_push_actions_by_room_for_user(
room_id,
sync_config.user.to_string(),
)
return SlidingSyncResult.RoomResult(
name=room_name,
avatar=room_avatar,
@@ -1329,11 +1370,8 @@ class SlidingSyncHandler:
bump_stamp=bump_stamp,
joined_count=joined_count,
invited_count=invited_count,
# TODO: These are just dummy values. We could potentially just remove these
# since notifications can only really be done correctly on the client anyway
# (encrypted rooms).
notification_count=0,
highlight_count=0,
notif_counts=unread_notifs,
room_receipts=room_receipts,
)
@trace

View File

@@ -80,6 +80,7 @@ class SlidingSyncExtensionHandler:
actual_room_response_map: Mapping[str, SlidingSyncResult.RoomResult],
to_token: StreamToken,
from_token: Optional[SlidingSyncStreamToken],
user_receipts: Mapping[str, Sequence[ReceiptInRoom]],
) -> SlidingSyncResult.Extensions:
"""Handle extension requests.
@@ -95,6 +96,7 @@ class SlidingSyncExtensionHandler:
Sliding Sync response.
to_token: The point in the stream to sync up to.
from_token: The point in the stream to sync from.
user_receipts: Map of room ID to list of the syncing user's receipts in the room.
"""
if sync_config.extensions is None:
@@ -142,6 +144,7 @@ class SlidingSyncExtensionHandler:
receipts_request=sync_config.extensions.receipts,
to_token=to_token,
from_token=from_token,
user_receipts=user_receipts,
)
typing_coro = None
@@ -619,6 +622,7 @@ class SlidingSyncExtensionHandler:
receipts_request: SlidingSyncConfig.Extensions.ReceiptsExtension,
to_token: StreamToken,
from_token: Optional[SlidingSyncStreamToken],
user_receipts: Mapping[str, Sequence[ReceiptInRoom]],
) -> Optional[SlidingSyncResult.Extensions.ReceiptsExtension]:
"""Handle Receipts extension (MSC3960)
@@ -635,6 +639,7 @@ class SlidingSyncExtensionHandler:
account_data_request: The account_data extension from the request
to_token: The point in the stream to sync up to.
from_token: The point in the stream to sync from.
user_receipts: Map of room ID to list of the syncing user's receipts in the room.
"""
# Skip if the extension is not enabled
if not receipts_request.enabled:
@@ -726,15 +731,6 @@ class SlidingSyncExtensionHandler:
)
if initial_rooms:
# We also always send down receipts for the current user.
user_receipts = (
await self.store.get_linearized_receipts_for_user_in_rooms(
user_id=sync_config.user.to_string(),
room_ids=initial_rooms,
to_key=to_token.receipt_key,
)
)
# For rooms we haven't previously sent down, we could send all receipts
# from that room but we only want to include receipts for events
# in the timeline to avoid bloating and blowing up the sync response
@@ -752,22 +748,23 @@ class SlidingSyncExtensionHandler:
# Combine the receipts for a room and add them to
# `fetched_receipts`
for room_id in initial_receipts.keys() | user_receipts.keys():
receipt_content = ReceiptInRoom.merge_to_content(
list(
itertools.chain(
initial_receipts.get(room_id, []),
user_receipts.get(room_id, []),
if room_id in initial_rooms:
receipt_content = ReceiptInRoom.merge_to_content(
list(
itertools.chain(
initial_receipts.get(room_id, []),
user_receipts.get(room_id, []),
)
)
)
)
fetched_receipts.append(
{
"room_id": room_id,
"type": EduTypes.RECEIPT,
"content": receipt_content,
}
)
fetched_receipts.append(
{
"room_id": room_id,
"type": EduTypes.RECEIPT,
"content": receipt_content,
}
)
fetched_receipts = ReceiptEventSource.filter_out_private_receipts(
fetched_receipts, sync_config.user.to_string()

View File

@@ -24,6 +24,7 @@ from typing import (
Literal,
Mapping,
Optional,
Sequence,
Set,
Tuple,
Union,
@@ -44,6 +45,7 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import StrippedStateEvent
from synapse.events.utils import parse_stripped_state_event
from synapse.logging.opentracing import start_active_span, trace
from synapse.storage.databases.main.receipts import ReceiptInRoom
from synapse.storage.databases.main.state import (
ROOM_UNKNOWN_SENTINEL,
Sentinel as StateSentinel,
@@ -102,10 +104,6 @@ class SlidingSyncInterestedRooms:
lists: A mapping from list name to the list result for the response
relevant_room_map: A map from rooms that match the sync request to
their room sync config.
relevant_rooms_to_send_map: Subset of `relevant_room_map` that
includes the rooms that *may* have relevant updates. Rooms not
in this map will definitely not have room updates (though
extensions may have updates in these rooms).
newly_joined_rooms: The set of rooms that were joined in the token range
and the user is still joined to at the end of this range.
newly_left_rooms: The set of rooms that we left in the token range
@@ -115,7 +113,6 @@ class SlidingSyncInterestedRooms:
lists: Mapping[str, SlidingSyncResult.SlidingWindowList]
relevant_room_map: Mapping[str, RoomSyncConfig]
relevant_rooms_to_send_map: Mapping[str, RoomSyncConfig]
all_rooms: Set[str]
room_membership_for_user_map: Mapping[str, RoomsForUserType]
@@ -128,7 +125,6 @@ class SlidingSyncInterestedRooms:
return SlidingSyncInterestedRooms(
lists={},
relevant_room_map={},
relevant_rooms_to_send_map={},
all_rooms=set(),
room_membership_for_user_map={},
newly_joined_rooms=set(),
@@ -547,16 +543,9 @@ class SlidingSyncRoomLists:
relevant_room_map[room_id] = room_sync_config
# Filtered subset of `relevant_room_map` for rooms that may have updates
# (in the event stream)
relevant_rooms_to_send_map = await self._filter_relevant_rooms_to_send(
previous_connection_state, from_token, relevant_room_map
)
return SlidingSyncInterestedRooms(
lists=lists,
relevant_room_map=relevant_room_map,
relevant_rooms_to_send_map=relevant_rooms_to_send_map,
all_rooms=all_rooms,
room_membership_for_user_map=room_membership_for_user_map,
newly_joined_rooms=newly_joined_room_ids,
@@ -735,16 +724,9 @@ class SlidingSyncRoomLists:
relevant_room_map[room_id] = room_sync_config
# Filtered subset of `relevant_room_map` for rooms that may have updates
# (in the event stream)
relevant_rooms_to_send_map = await self._filter_relevant_rooms_to_send(
previous_connection_state, from_token, relevant_room_map
)
return SlidingSyncInterestedRooms(
lists=lists,
relevant_room_map=relevant_room_map,
relevant_rooms_to_send_map=relevant_rooms_to_send_map,
all_rooms=all_rooms,
room_membership_for_user_map=room_membership_for_user_map,
newly_joined_rooms=newly_joined_room_ids,
@@ -752,18 +734,21 @@ class SlidingSyncRoomLists:
dm_room_ids=dm_room_ids,
)
async def _filter_relevant_rooms_to_send(
def filter_relevant_rooms_to_send(
self,
user_id: UserID,
previous_connection_state: PerConnectionState,
from_token: Optional[StreamToken],
relevant_room_map: Dict[str, RoomSyncConfig],
) -> Dict[str, RoomSyncConfig]:
to_token: StreamToken,
relevant_room_map: Mapping[str, RoomSyncConfig],
receipts: Mapping[str, Sequence[ReceiptInRoom]],
) -> Mapping[str, RoomSyncConfig]:
"""Filters the `relevant_room_map` down to those rooms that may have
updates we need to fetch and return."""
# Filtered subset of `relevant_room_map` for rooms that may have updates
# (in the event stream)
relevant_rooms_to_send_map: Dict[str, RoomSyncConfig] = relevant_room_map
relevant_rooms_to_send_map: Mapping[str, RoomSyncConfig] = relevant_room_map
if relevant_room_map:
with start_active_span("filter_relevant_rooms_to_send"):
if from_token:
@@ -814,6 +799,11 @@ class SlidingSyncRoomLists:
)
)
rooms_should_send.update(rooms_that_have_updates)
# Any rooms with receipts should be considered for sending as their
# notification counts may have changed.
rooms_should_send.update(receipts.keys())
relevant_rooms_to_send_map = {
room_id: room_sync_config
for room_id, room_sync_config in relevant_room_map.items()

View File

@@ -1066,10 +1066,19 @@ class SlidingSyncRestServlet(RestServlet):
serialized_rooms: Dict[str, JsonDict] = {}
for room_id, room_result in rooms.items():
serialized_rooms[room_id] = {
"notification_count": room_result.notification_count,
"highlight_count": room_result.highlight_count,
"notification_count": room_result.notif_counts.main_timeline.notify_count,
"highlight_count": room_result.notif_counts.main_timeline.highlight_count,
}
if len(room_result.notif_counts.threads) > 0:
serialized_rooms[room_id]["unread_thread_notifications"] = {
thread_id: {
"notification_count": counts.notify_count,
"highlight_count": counts.highlight_count,
}
for thread_id, counts in room_result.notif_counts.threads.items()
}
if room_result.bump_stamp is not None:
serialized_rooms[room_id]["bump_stamp"] = room_result.bump_stamp

View File

@@ -547,13 +547,20 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
# If the user has no receipts in the room, retrieve the stream ordering for
# the latest membership event from this user in this room (which we assume is
# a join).
# Sometimes (usually state resets) there can be no membership event either,
# so we allow None and return no notifications which is probably about
# the best we can do short of failing outright.
event_id = self.db_pool.simple_select_one_onecol_txn(
txn=txn,
table="local_current_membership",
keyvalues={"room_id": room_id, "user_id": user_id},
retcol="event_id",
allow_none=True,
)
if event_id is None:
return _EMPTY_ROOM_NOTIF_COUNTS
stream_ordering = self.get_stream_id_for_event_txn(txn, event_id)
return self._get_unread_counts_by_pos_txn(

View File

@@ -666,7 +666,11 @@ class ReceiptsWorkerStore(SQLBaseStore):
return results
async def get_linearized_receipts_for_user_in_rooms(
self, user_id: str, room_ids: StrCollection, to_key: MultiWriterStreamToken
self,
user_id: str,
room_ids: StrCollection,
from_key: Optional[MultiWriterStreamToken] = None,
to_key: Optional[MultiWriterStreamToken] = None,
) -> Mapping[str, Sequence[ReceiptInRoom]]:
"""Fetch all receipts for the user in the given room.
@@ -685,11 +689,18 @@ class ReceiptsWorkerStore(SQLBaseStore):
sql = f"""
SELECT instance_name, stream_id, room_id, receipt_type, user_id, event_id, thread_id, data
FROM receipts_linearized
WHERE {clause} AND user_id = ? AND stream_id <= ?
WHERE {clause} AND user_id = ?
"""
args.append(user_id)
args.append(to_key.get_max_stream_pos())
if from_key is not None:
sql += " AND stream_id >= ?"
args.append(from_key.get_max_stream_pos())
if to_key is not None:
sql += " AND stream_id <= ?"
args.append(to_key.get_max_stream_pos())
txn.execute(sql, args)

View File

@@ -40,6 +40,8 @@ import attr
from synapse._pydantic_compat import Extra
from synapse.api.constants import EventTypes
from synapse.events import EventBase
from synapse.storage.databases.main.event_push_actions import RoomNotifCounts
from synapse.storage.databases.main.receipts import ReceiptInRoom
from synapse.types import (
DeviceListUpdates,
JsonDict,
@@ -163,10 +165,10 @@ class SlidingSyncResult:
own user ID. (same as sync `v2 m.joined_member_count`)
invited_count: The number of users with membership of invite. (same as sync v2
`m.invited_member_count`)
notification_count: The total number of unread notifications for this room. (same
as sync v2)
highlight_count: The number of unread notifications for this room with the highlight
flag set. (same as sync v2)
notif_counts: An object containing the number of unread notifications for both
the main thread and any other threads.
room_receipts: A sequence of any read receipts from the user in question in
the room, used to calculate whether the notif_counts could have changed
"""
@attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -197,8 +199,8 @@ class SlidingSyncResult:
bump_stamp: Optional[int]
joined_count: Optional[int]
invited_count: Optional[int]
notification_count: int
highlight_count: int
notif_counts: RoomNotifCounts
room_receipts: Optional[Sequence[ReceiptInRoom]]
def __bool__(self) -> bool:
return (
@@ -215,6 +217,7 @@ class SlidingSyncResult:
or bool(self.required_state)
or bool(self.timeline_events)
or bool(self.stripped_state)
or bool(self.room_receipts)
)
@attr.s(slots=True, frozen=True, auto_attribs=True)

View File

@@ -433,8 +433,11 @@ class SlidingSyncReceiptsExtensionTestCase(SlidingSyncBase):
set(),
exact=True,
)
# The room should be be in user1's sync because they sent a read receipt...
self.assertIn(room_id1, response_body["rooms"])
# but there should be no timeline events
# No events in the timeline since they were sent before the `from_token`
self.assertNotIn(room_id1, response_body["rooms"])
self.assertNotIn("timeline", response_body["rooms"][room_id1])
# Check room3:
#

View File

@@ -0,0 +1,143 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2024 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
from http import HTTPStatus
from twisted.test.proto_helpers import MemoryReactor
import synapse.rest.admin
from synapse.api.constants import EventTypes, ReceiptTypes, RelationTypes
from synapse.rest.client import login, receipts, room, sync
from synapse.server import HomeServer
from synapse.util import Clock
from tests.rest.client.sliding_sync.test_sliding_sync import SlidingSyncBase
class SlidingSyncNotificationCountsTestCase(SlidingSyncBase):
servlets = [
synapse.rest.admin.register_servlets,
login.register_servlets,
room.register_servlets,
sync.register_servlets,
receipts.register_servlets,
]
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
super().prepare(reactor, clock, hs)
def setUp(self) -> None:
super().setUp()
self.user1_id = self.register_user("user1", "pass")
self.user1_tok = self.login(self.user1_id, "pass")
self.user2_id = self.register_user("user2", "pass")
self.user2_tok = self.login(self.user2_id, "pass")
# Create room1
self.room_id1 = self.helper.create_room_as(self.user2_id, tok=self.user2_tok)
self.helper.join(self.room_id1, self.user1_id, tok=self.user1_tok)
self.helper.join(self.room_id1, self.user2_id, tok=self.user2_tok)
self.sync_req = {
"lists": {},
"room_subscriptions": {
self.room_id1: {
"required_state": [],
"timeline_limit": 1,
},
},
}
sync_resp, self.user1_start_token = self.do_sync(
self.sync_req, tok=self.user1_tok
)
# send a read receipt to make sure the counts are 0
channel = self.make_request(
"POST",
f"/rooms/{self.room_id1}/receipt/{ReceiptTypes.READ}/{sync_resp['rooms'][self.room_id1]['timeline'][0]['event_id']}",
{},
access_token=self.user1_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
def test_main_thread_notification_count(self) -> None:
# send an event from user 2
self.helper.send(self.room_id1, body="new event", tok=self.user2_tok)
# user 1 syncs
sync_resp, from_token = self.do_sync(
self.sync_req, tok=self.user1_tok, since=self.user1_start_token
)
# notification count should now be 1
self.assertEqual(sync_resp["rooms"][self.room_id1]["notification_count"], 1)
def test_main_thread_highlight_count(self) -> None:
# send an event that mentions user1
self.helper.send(self.room_id1, body="Hello user1", tok=self.user2_tok)
# user 1 syncs
sync_resp, from_token = self.do_sync(
self.sync_req, tok=self.user1_tok, since=self.user1_start_token
)
# notification and highlight count should be 1
self.assertEqual(sync_resp["rooms"][self.room_id1]["notification_count"], 1)
self.assertEqual(sync_resp["rooms"][self.room_id1]["highlight_count"], 1)
def test_thread_notification_count(self) -> None:
room1_event_response1 = self.helper.send(
self.room_id1, body="Thread root", tok=self.user2_tok
)
thread_id = room1_event_response1["event_id"]
_, from_token = self.do_sync(
self.sync_req, tok=self.user1_tok, since=self.user1_start_token
)
threaded_event_content = {
"msgtype": "m.text",
"body": "threaded response",
"m.relates_to": {
"event_id": thread_id,
"rel_type": RelationTypes.THREAD,
},
}
self.helper.send_event(
self.room_id1,
EventTypes.Message,
threaded_event_content,
None,
self.user2_tok,
HTTPStatus.OK,
custom_headers=None,
)
sync_resp, _ = self.do_sync(self.sync_req, tok=self.user1_tok, since=from_token)
self.assertEqual(
sync_resp["rooms"][self.room_id1]["unread_thread_notifications"][thread_id][
"notification_count"
],
1,
)
self.assertEqual(
sync_resp["rooms"][self.room_id1]["unread_thread_notifications"][thread_id][
"highlight_count"
],
0,
)