Add user room filtering for threads extension
This commit is contained in:
@@ -37,7 +37,7 @@ from typing import (
|
||||
|
||||
import attr
|
||||
|
||||
from synapse.api.constants import MAIN_TIMELINE, Direction, RelationTypes
|
||||
from synapse.api.constants import MAIN_TIMELINE, Direction, Membership, RelationTypes
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.events import EventBase
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
@@ -1126,20 +1126,34 @@ class RelationsWorkerStore(SQLBaseStore):
|
||||
to_token: Optional[StreamToken] = None,
|
||||
limit: int = 5,
|
||||
) -> Tuple[Sequence[Tuple[str, str]], Optional[int]]:
|
||||
# TODO: comment
|
||||
"""Get a list of updates threads, ordered by stream ordering of their
|
||||
latest reply.
|
||||
"""Get a list of updated threads, ordered by stream ordering of their
|
||||
latest reply, filtered to only include threads in rooms where the user
|
||||
was joined at the time of the thread's latest update.
|
||||
|
||||
Note: This function has a known limitation due to the threads table only
|
||||
storing the latest update per thread. If a thread had multiple updates
|
||||
within the token range and the user left the room between updates, earlier
|
||||
updates that occurred while the user was joined will NOT be returned.
|
||||
|
||||
Example edge case:
|
||||
t1: User joins room, thread updated (user should see this)
|
||||
t2: User leaves room
|
||||
t3: Thread updated again (user should NOT see this)
|
||||
Result: Neither update is returned (because membership check at t3 fails)
|
||||
|
||||
This is an acceptable trade-off to avoid expensive queries through the
|
||||
event_relations table.
|
||||
|
||||
Args:
|
||||
user_id: Only fetch threads for rooms that the `user` is `join`ed to.
|
||||
user_id: Only fetch threads for rooms where the user was joined at
|
||||
the time of the thread's latest update.
|
||||
from_token: Fetch rows from a previous next_batch, or from the start if None.
|
||||
to_token: Fetch rows from a previous prev_batch, or from the stream end if None.
|
||||
limit: Only fetch the most recent `limit` threads.
|
||||
|
||||
Returns:
|
||||
A tuple of:
|
||||
A list of thread root event IDs.
|
||||
|
||||
A list of (thread_id, room_id) tuples.
|
||||
The next_batch, if one exists.
|
||||
"""
|
||||
# Ensure bad limits aren't being passed in.
|
||||
@@ -1160,38 +1174,47 @@ class RelationsWorkerStore(SQLBaseStore):
|
||||
pagination_clause += " AND stream_ordering <= ?"
|
||||
pagination_args.append(str(to_bound))
|
||||
|
||||
# TODO: get room_ids somehow...
|
||||
# seems inefficient as we have to basically query for every single joined room
|
||||
# id don't we?
|
||||
# How would a specific thread_updates table be any better?
|
||||
# There must be something somewhere that already does a query which has a
|
||||
# "filter by all rooms that a user is joined to" clause.
|
||||
# Filter threads to only those in rooms where the user was joined at the
|
||||
# time of the thread's latest update.
|
||||
#
|
||||
# We use room_memberships.event_stream_ordering to perform a point-in-time
|
||||
# membership check. This finds the most recent membership event for the user
|
||||
# in each room that occurred at or before the thread's latest update
|
||||
# (threads.stream_ordering), and verifies it was a 'join' membership.
|
||||
#
|
||||
# Note: Due to the threads table only storing the latest update per thread,
|
||||
# this approach has a known limitation: if a thread had multiple updates and
|
||||
# the user left the room between updates, earlier updates that occurred while
|
||||
# they were joined will not be returned. This is an acceptable trade-off to
|
||||
# avoid expensive queries through event_relations.
|
||||
sql = f"""
|
||||
SELECT thread_id, room_id, latest_event_id, stream_ordering
|
||||
FROM threads
|
||||
WHERE
|
||||
room_id LIKE ?
|
||||
{pagination_clause}
|
||||
WHERE EXISTS (
|
||||
SELECT 1
|
||||
FROM room_memberships AS rm
|
||||
WHERE rm.room_id = threads.room_id
|
||||
AND rm.user_id = ?
|
||||
AND rm.event_stream_ordering <= threads.stream_ordering
|
||||
AND rm.membership = ?
|
||||
AND NOT EXISTS (
|
||||
SELECT 1
|
||||
FROM room_memberships AS rm2
|
||||
WHERE rm2.room_id = rm.room_id
|
||||
AND rm2.user_id = rm.user_id
|
||||
AND rm2.event_stream_ordering > rm.event_stream_ordering
|
||||
AND rm2.event_stream_ordering <= threads.stream_ordering
|
||||
)
|
||||
)
|
||||
{pagination_clause}
|
||||
ORDER BY stream_ordering DESC
|
||||
LIMIT ?
|
||||
"""
|
||||
# sql = """
|
||||
# SELECT event_id, relation_type, sender, topological_ordering, stream_ordering
|
||||
# FROM event_relations
|
||||
# INNER JOIN events USING (event_id)
|
||||
# WHERE relates_to_id = ? AND %s
|
||||
# ORDER BY topological_ordering %s, stream_ordering %s
|
||||
# LIMIT ?
|
||||
# """ % (
|
||||
# " AND ".join(where_clause),
|
||||
# order,
|
||||
# order,
|
||||
# )
|
||||
|
||||
def _get_thread_updates_for_user_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> Tuple[List[Tuple[str, str]], Optional[int]]:
|
||||
txn.execute(sql, ("%", *pagination_args, limit + 1))
|
||||
txn.execute(sql, (user_id, Membership.JOIN, *pagination_args, limit + 1))
|
||||
|
||||
rows = cast(List[Tuple[str, str, str, int]], txn.fetchall())
|
||||
thread_ids = [(r[0], r[1]) for r in rows]
|
||||
|
||||
@@ -207,3 +207,153 @@ class SlidingSyncThreadsExtensionTestCase(SlidingSyncBase):
|
||||
response_body["extensions"][EXT_NAME],
|
||||
{"updates": {room_id: {thread_root_id: {}}}},
|
||||
)
|
||||
|
||||
def test_threads_only_from_joined_rooms(self) -> None:
|
||||
"""
|
||||
Test that thread updates are only returned for rooms the user is joined to
|
||||
at the time of the thread update.
|
||||
"""
|
||||
user1_id = self.register_user("user1", "pass")
|
||||
user1_tok = self.login(user1_id, "pass")
|
||||
user2_id = self.register_user("user2", "pass")
|
||||
user2_tok = self.login(user2_id, "pass")
|
||||
|
||||
# User1 creates two rooms
|
||||
room_a_id = self.helper.create_room_as(user1_id, tok=user1_tok)
|
||||
room_b_id = self.helper.create_room_as(user1_id, tok=user1_tok)
|
||||
|
||||
# User2 joins only Room A
|
||||
self.helper.join(room_a_id, user2_id, tok=user2_tok)
|
||||
|
||||
# Create threads in both rooms
|
||||
thread_a_root = self.helper.send(room_a_id, body="Thread A", tok=user1_tok)[
|
||||
"event_id"
|
||||
]
|
||||
thread_b_root = self.helper.send(room_b_id, body="Thread B", tok=user1_tok)[
|
||||
"event_id"
|
||||
]
|
||||
|
||||
# Add replies to both threads
|
||||
self.helper.send_event(
|
||||
room_a_id,
|
||||
type="m.room.message",
|
||||
content={
|
||||
"msgtype": "m.text",
|
||||
"body": "Reply to A",
|
||||
"m.relates_to": {
|
||||
"rel_type": RelationTypes.THREAD,
|
||||
"event_id": thread_a_root,
|
||||
},
|
||||
},
|
||||
tok=user1_tok,
|
||||
)
|
||||
self.helper.send_event(
|
||||
room_b_id,
|
||||
type="m.room.message",
|
||||
content={
|
||||
"msgtype": "m.text",
|
||||
"body": "Reply to B",
|
||||
"m.relates_to": {
|
||||
"rel_type": RelationTypes.THREAD,
|
||||
"event_id": thread_b_root,
|
||||
},
|
||||
},
|
||||
tok=user1_tok,
|
||||
)
|
||||
|
||||
# User2 syncs with threads extension enabled
|
||||
sync_body = {
|
||||
"lists": {},
|
||||
"extensions": {
|
||||
EXT_NAME: {
|
||||
"enabled": True,
|
||||
}
|
||||
},
|
||||
}
|
||||
response_body, _ = self.do_sync(sync_body, tok=user2_tok)
|
||||
|
||||
# Assert: User2 should only see thread from Room A (where they are joined)
|
||||
self.assertEqual(
|
||||
response_body["extensions"][EXT_NAME],
|
||||
{"updates": {room_a_id: {thread_a_root: {}}}},
|
||||
"User2 should only see threads from Room A where they are joined, not Room B",
|
||||
)
|
||||
|
||||
def test_threads_not_returned_after_leaving_room(self) -> None:
|
||||
"""
|
||||
Test that thread updates are not returned after a user leaves the room,
|
||||
even if the thread was updated while they were joined.
|
||||
|
||||
This tests the known limitation: if a thread has multiple updates and the
|
||||
user leaves between them, they won't see any updates (even earlier ones
|
||||
while joined).
|
||||
"""
|
||||
user1_id = self.register_user("user1", "pass")
|
||||
user1_tok = self.login(user1_id, "pass")
|
||||
user2_id = self.register_user("user2", "pass")
|
||||
user2_tok = self.login(user2_id, "pass")
|
||||
|
||||
# Create room and both users join
|
||||
room_id = self.helper.create_room_as(user1_id, tok=user1_tok)
|
||||
self.helper.join(room_id, user2_id, tok=user2_tok)
|
||||
|
||||
# Create thread
|
||||
thread_root = self.helper.send(room_id, body="Thread root", tok=user1_tok)[
|
||||
"event_id"
|
||||
]
|
||||
|
||||
# Initial sync for user2
|
||||
sync_body = {
|
||||
"lists": {},
|
||||
"extensions": {
|
||||
EXT_NAME: {
|
||||
"enabled": True,
|
||||
}
|
||||
},
|
||||
}
|
||||
_, sync_pos = self.do_sync(sync_body, tok=user2_tok)
|
||||
|
||||
# Reply in thread while user2 is joined, but after initial sync
|
||||
self.helper.send_event(
|
||||
room_id,
|
||||
type="m.room.message",
|
||||
content={
|
||||
"msgtype": "m.text",
|
||||
"body": "Reply 1 while user2 joined",
|
||||
"m.relates_to": {
|
||||
"rel_type": RelationTypes.THREAD,
|
||||
"event_id": thread_root,
|
||||
},
|
||||
},
|
||||
tok=user1_tok,
|
||||
)
|
||||
|
||||
# User2 leaves the room
|
||||
self.helper.leave(room_id, user2_id, tok=user2_tok)
|
||||
|
||||
# Another reply after user2 left
|
||||
self.helper.send_event(
|
||||
room_id,
|
||||
type="m.room.message",
|
||||
content={
|
||||
"msgtype": "m.text",
|
||||
"body": "Reply 2 after user2 left",
|
||||
"m.relates_to": {
|
||||
"rel_type": RelationTypes.THREAD,
|
||||
"event_id": thread_root,
|
||||
},
|
||||
},
|
||||
tok=user1_tok,
|
||||
)
|
||||
|
||||
# User2 incremental sync
|
||||
response_body, _ = self.do_sync(sync_body, tok=user2_tok, since=sync_pos)
|
||||
|
||||
# Assert: User2 should NOT see the thread update (they left before latest update)
|
||||
# Note: This demonstrates the known limitation - user2 won't see the thread
|
||||
# even though there was an update while they were joined (Reply 1)
|
||||
self.assertNotIn(
|
||||
EXT_NAME,
|
||||
response_body["extensions"],
|
||||
"User2 should not see thread updates after leaving the room",
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user