diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index dcce2cf22e..a8543040f3 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -37,7 +37,7 @@ from typing import ( import attr -from synapse.api.constants import MAIN_TIMELINE, Direction, RelationTypes +from synapse.api.constants import MAIN_TIMELINE, Direction, Membership, RelationTypes from synapse.api.errors import SynapseError from synapse.events import EventBase from synapse.storage._base import SQLBaseStore @@ -1126,20 +1126,34 @@ class RelationsWorkerStore(SQLBaseStore): to_token: Optional[StreamToken] = None, limit: int = 5, ) -> Tuple[Sequence[Tuple[str, str]], Optional[int]]: - # TODO: comment - """Get a list of updates threads, ordered by stream ordering of their - latest reply. + """Get a list of updated threads, ordered by stream ordering of their + latest reply, filtered to only include threads in rooms where the user + was joined at the time of the thread's latest update. + + Note: This function has a known limitation due to the threads table only + storing the latest update per thread. If a thread had multiple updates + within the token range and the user left the room between updates, earlier + updates that occurred while the user was joined will NOT be returned. + + Example edge case: + t1: User joins room, thread updated (user should see this) + t2: User leaves room + t3: Thread updated again (user should NOT see this) + Result: Neither update is returned (because membership check at t3 fails) + + This is an acceptable trade-off to avoid expensive queries through the + event_relations table. Args: - user_id: Only fetch threads for rooms that the `user` is `join`ed to. + user_id: Only fetch threads for rooms where the user was joined at + the time of the thread's latest update. from_token: Fetch rows from a previous next_batch, or from the start if None. to_token: Fetch rows from a previous prev_batch, or from the stream end if None. limit: Only fetch the most recent `limit` threads. Returns: A tuple of: - A list of thread root event IDs. - + A list of (thread_id, room_id) tuples. The next_batch, if one exists. """ # Ensure bad limits aren't being passed in. @@ -1160,38 +1174,47 @@ class RelationsWorkerStore(SQLBaseStore): pagination_clause += " AND stream_ordering <= ?" pagination_args.append(str(to_bound)) - # TODO: get room_ids somehow... - # seems inefficient as we have to basically query for every single joined room - # id don't we? - # How would a specific thread_updates table be any better? - # There must be something somewhere that already does a query which has a - # "filter by all rooms that a user is joined to" clause. + # Filter threads to only those in rooms where the user was joined at the + # time of the thread's latest update. + # + # We use room_memberships.event_stream_ordering to perform a point-in-time + # membership check. This finds the most recent membership event for the user + # in each room that occurred at or before the thread's latest update + # (threads.stream_ordering), and verifies it was a 'join' membership. + # + # Note: Due to the threads table only storing the latest update per thread, + # this approach has a known limitation: if a thread had multiple updates and + # the user left the room between updates, earlier updates that occurred while + # they were joined will not be returned. This is an acceptable trade-off to + # avoid expensive queries through event_relations. sql = f""" SELECT thread_id, room_id, latest_event_id, stream_ordering FROM threads - WHERE - room_id LIKE ? - {pagination_clause} + WHERE EXISTS ( + SELECT 1 + FROM room_memberships AS rm + WHERE rm.room_id = threads.room_id + AND rm.user_id = ? + AND rm.event_stream_ordering <= threads.stream_ordering + AND rm.membership = ? + AND NOT EXISTS ( + SELECT 1 + FROM room_memberships AS rm2 + WHERE rm2.room_id = rm.room_id + AND rm2.user_id = rm.user_id + AND rm2.event_stream_ordering > rm.event_stream_ordering + AND rm2.event_stream_ordering <= threads.stream_ordering + ) + ) + {pagination_clause} ORDER BY stream_ordering DESC LIMIT ? """ - # sql = """ - # SELECT event_id, relation_type, sender, topological_ordering, stream_ordering - # FROM event_relations - # INNER JOIN events USING (event_id) - # WHERE relates_to_id = ? AND %s - # ORDER BY topological_ordering %s, stream_ordering %s - # LIMIT ? - # """ % ( - # " AND ".join(where_clause), - # order, - # order, - # ) def _get_thread_updates_for_user_txn( txn: LoggingTransaction, ) -> Tuple[List[Tuple[str, str]], Optional[int]]: - txn.execute(sql, ("%", *pagination_args, limit + 1)) + txn.execute(sql, (user_id, Membership.JOIN, *pagination_args, limit + 1)) rows = cast(List[Tuple[str, str, str, int]], txn.fetchall()) thread_ids = [(r[0], r[1]) for r in rows] diff --git a/tests/rest/client/sliding_sync/test_extension_threads.py b/tests/rest/client/sliding_sync/test_extension_threads.py index 4e350ceac5..9e7d000167 100644 --- a/tests/rest/client/sliding_sync/test_extension_threads.py +++ b/tests/rest/client/sliding_sync/test_extension_threads.py @@ -207,3 +207,153 @@ class SlidingSyncThreadsExtensionTestCase(SlidingSyncBase): response_body["extensions"][EXT_NAME], {"updates": {room_id: {thread_root_id: {}}}}, ) + + def test_threads_only_from_joined_rooms(self) -> None: + """ + Test that thread updates are only returned for rooms the user is joined to + at the time of the thread update. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + # User1 creates two rooms + room_a_id = self.helper.create_room_as(user1_id, tok=user1_tok) + room_b_id = self.helper.create_room_as(user1_id, tok=user1_tok) + + # User2 joins only Room A + self.helper.join(room_a_id, user2_id, tok=user2_tok) + + # Create threads in both rooms + thread_a_root = self.helper.send(room_a_id, body="Thread A", tok=user1_tok)[ + "event_id" + ] + thread_b_root = self.helper.send(room_b_id, body="Thread B", tok=user1_tok)[ + "event_id" + ] + + # Add replies to both threads + self.helper.send_event( + room_a_id, + type="m.room.message", + content={ + "msgtype": "m.text", + "body": "Reply to A", + "m.relates_to": { + "rel_type": RelationTypes.THREAD, + "event_id": thread_a_root, + }, + }, + tok=user1_tok, + ) + self.helper.send_event( + room_b_id, + type="m.room.message", + content={ + "msgtype": "m.text", + "body": "Reply to B", + "m.relates_to": { + "rel_type": RelationTypes.THREAD, + "event_id": thread_b_root, + }, + }, + tok=user1_tok, + ) + + # User2 syncs with threads extension enabled + sync_body = { + "lists": {}, + "extensions": { + EXT_NAME: { + "enabled": True, + } + }, + } + response_body, _ = self.do_sync(sync_body, tok=user2_tok) + + # Assert: User2 should only see thread from Room A (where they are joined) + self.assertEqual( + response_body["extensions"][EXT_NAME], + {"updates": {room_a_id: {thread_a_root: {}}}}, + "User2 should only see threads from Room A where they are joined, not Room B", + ) + + def test_threads_not_returned_after_leaving_room(self) -> None: + """ + Test that thread updates are not returned after a user leaves the room, + even if the thread was updated while they were joined. + + This tests the known limitation: if a thread has multiple updates and the + user leaves between them, they won't see any updates (even earlier ones + while joined). + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + # Create room and both users join + room_id = self.helper.create_room_as(user1_id, tok=user1_tok) + self.helper.join(room_id, user2_id, tok=user2_tok) + + # Create thread + thread_root = self.helper.send(room_id, body="Thread root", tok=user1_tok)[ + "event_id" + ] + + # Initial sync for user2 + sync_body = { + "lists": {}, + "extensions": { + EXT_NAME: { + "enabled": True, + } + }, + } + _, sync_pos = self.do_sync(sync_body, tok=user2_tok) + + # Reply in thread while user2 is joined, but after initial sync + self.helper.send_event( + room_id, + type="m.room.message", + content={ + "msgtype": "m.text", + "body": "Reply 1 while user2 joined", + "m.relates_to": { + "rel_type": RelationTypes.THREAD, + "event_id": thread_root, + }, + }, + tok=user1_tok, + ) + + # User2 leaves the room + self.helper.leave(room_id, user2_id, tok=user2_tok) + + # Another reply after user2 left + self.helper.send_event( + room_id, + type="m.room.message", + content={ + "msgtype": "m.text", + "body": "Reply 2 after user2 left", + "m.relates_to": { + "rel_type": RelationTypes.THREAD, + "event_id": thread_root, + }, + }, + tok=user1_tok, + ) + + # User2 incremental sync + response_body, _ = self.do_sync(sync_body, tok=user2_tok, since=sync_pos) + + # Assert: User2 should NOT see the thread update (they left before latest update) + # Note: This demonstrates the known limitation - user2 won't see the thread + # even though there was an update while they were joined (Reply 1) + self.assertNotIn( + EXT_NAME, + response_body["extensions"], + "User2 should not see thread updates after leaving the room", + )