Compare commits
v1.140.0rc...neilj/spee
9 Commits
| Author | SHA1 | Date |
|---|---|---|
|  | aa147edfd9 |  |
|  | d32cd70abd |  |
|  | 0e6f652065 |  |
|  | 7bc6c1c038 |  |
|  | 6ba90dfbe2 |  |
|  | ca3806b95b |  |
|  | 1a89eb5e64 |  |
|  | 76b4bf33f1 |  |
|  | 29d4d5cd92 |  |
changelog.d/17074.bugfix (Normal file, 1 addition)
@@ -0,0 +1 @@
+Process room membership changes like profile updates asynchronously so the request does not time out. Fixes https://github.com/element-hq/synapse/issues/1297.
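To make the changelog concrete: before this change the profile request awaited the per-room membership fan-out, so users joined to many rooms could hit the request timeout; now the handler persists the profile change, queues the fan-out, and returns. A minimal, self-contained sketch of that shape using plain asyncio (Synapse itself uses its task scheduler, as the hunks below show; all names here are illustrative):

```python
import asyncio

async def fan_out_membership_updates(room_ids: list[str]) -> None:
    # Stand-in for re-sending the m.room.member event in every joined room.
    for room_id in room_ids:
        await asyncio.sleep(0.01)  # pretend each per-room update takes time
        print("updated membership in", room_id)

async def handle_set_displayname(room_ids: list[str]) -> tuple[str, asyncio.Task]:
    # Old behaviour: await the fan-out inline, so large accounts could time out.
    # New behaviour: schedule it and respond immediately.
    background = asyncio.create_task(fan_out_membership_updates(room_ids))
    return "200 OK", background

async def main() -> None:
    response, background = await handle_set_displayname(
        [f"!room{i}:example.org" for i in range(3)]
    )
    print(response)   # returned before any room was touched
    await background  # the demo waits so the per-room prints are visible

asyncio.run(main())
```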
@@ -20,7 +20,7 @@
 #
 import logging
 import random
-from typing import TYPE_CHECKING, List, Optional, Union
+from typing import TYPE_CHECKING, List, Optional, Tuple, Union

 from synapse.api.constants import ProfileFields
 from synapse.api.errors import (
@@ -32,7 +32,16 @@ from synapse.api.errors import (
     SynapseError,
 )
 from synapse.storage.databases.main.media_repository import LocalMedia, RemoteMedia
-from synapse.types import JsonDict, JsonValue, Requester, UserID, create_requester
+from synapse.types import (
+    JsonDict,
+    JsonMapping,
+    JsonValue,
+    Requester,
+    ScheduledTask,
+    TaskStatus,
+    UserID,
+    create_requester,
+)
 from synapse.util.caches.descriptors import cached
 from synapse.util.stringutils import parse_and_validate_mxc_uri

@@ -46,6 +55,8 @@ MAX_AVATAR_URL_LEN = 1000
 # Field name length is specced at 255 bytes.
 MAX_CUSTOM_FIELD_LEN = 255

+UPDATE_JOIN_STATES_TASK_NAME = "update_join_states"
+

 class ProfileHandler:
     """Handles fetching and updating user profile information.
@@ -77,6 +88,11 @@ class ProfileHandler:

         self._third_party_rules = hs.get_module_api_callbacks().third_party_event_rules

+        self._task_scheduler = hs.get_task_scheduler()
+        self._task_scheduler.register_action(
+            self._update_join_states, UPDATE_JOIN_STATES_TASK_NAME
+        )
+
     async def get_profile(self, user_id: str, ignore_backoff: bool = True) -> JsonDict:
         """
         Get a user's profile as a JSON dictionary.
@@ -236,7 +252,18 @@
         )

         if propagate:
-            await self._update_join_states(requester, target_user)
+            if deactivation:
+                # During deactivation, run profile updates synchronously to ensure
+                # they complete before room forgetting logic runs
+                await self._update_join_states_direct(requester, target_user)
+            else:
+                await self._task_scheduler.schedule_task(
+                    UPDATE_JOIN_STATES_TASK_NAME,
+                    params={
+                        "requester": requester.serialize(),
+                        "target_user": target_user.to_string(),
+                    },
+                )

     async def get_avatar_url(self, target_user: UserID) -> Optional[str]:
         """
@@ -338,7 +365,18 @@
         )

         if propagate:
-            await self._update_join_states(requester, target_user)
+            if deactivation:
+                # During deactivation, run profile updates synchronously to ensure
+                # they complete before room forgetting logic runs
+                await self._update_join_states_direct(requester, target_user)
+            else:
+                await self._task_scheduler.schedule_task(
+                    UPDATE_JOIN_STATES_TASK_NAME,
+                    params={
+                        "requester": requester.serialize(),
+                        "target_user": target_user.to_string(),
+                    },
+                )

     @cached()
     async def check_avatar_size_and_mime_type(self, mxc: str) -> bool:
@@ -566,14 +604,31 @@

         return response

-    async def _update_join_states(
+    async def _perform_join_state_updates(
         self, requester: Requester, target_user: UserID
     ) -> None:
-        """
-        Update the membership events of each room the user is joined to with the
-        new profile information.
+        """Perform join state updates for a user's profile change across all their rooms.

-        Note that this stomps over any custom display name or avatar URL in member events.
+        This method handles the core logic for updating membership events when a user's
+        profile (displayname, avatar_url, etc.) changes. It includes validation, rate
+        limiting, and special handling for shadow-banned users.
+
+        Args:
+            requester: The user requesting the profile update. Used for rate limiting
+                and authentication checks.
+            target_user: The user whose profile is being updated. Must be a local user.
+
+        Returns:
+            None. The method completes silently on success.
+
+        Raises:
+            Does not raise exceptions directly, but underlying room membership updates
+            may fail and are logged as warnings.
+
+        Note:
+            - Returns early if target_user is not local to this homeserver
+            - Shadow-banned users get a random delay but no actual updates
+            - Rate limiting is applied per requester
         """
         if not self.hs.is_mine(target_user):
             return
@@ -586,14 +641,44 @@
             await self.clock.sleep(random.randint(1, 10))
             return

-        room_ids = await self.store.get_rooms_for_user(target_user.to_string())
+        await self._update_rooms_for_profile_change(requester, target_user)
+
+    async def _update_rooms_for_profile_change(
+        self, requester: Requester, target_user: UserID
+    ) -> None:
+        """Update membership events in all rooms where user is joined.
+
+        Iterates through all rooms where the target user is currently joined and
+        updates their membership event to reflect the new profile information.
+        Uses read receipt ordering to prioritize recently viewed rooms first.
+
+        Args:
+            requester: The user requesting the update, used for membership operations.
+            target_user: The user whose membership events should be updated.
+
+        Returns:
+            None. Individual room update failures are logged but don't stop processing.
+
+        Implementation Details:
+            - Rooms are processed in read receipt order (most recently viewed first)
+            - Each membership update is treated as a "join" event with new profile data
+            - Rate limiting is disabled for these updates to hide that they're not atomic
+            - Failures in individual rooms are logged but don't affect other rooms
+            - Assumes target_user is not a guest (guests can't set profile data)
+
+        Note:
+            This stomps over any custom display name or avatar URL in member events.
+        """
+        room_ids = await self.store.get_rooms_for_user_by_read_receipts(
+            target_user.to_string()
+        )

+        room_member_handler = self.hs.get_room_member_handler()
         for room_id in room_ids:
-            handler = self.hs.get_room_member_handler()
             try:
                 # Assume the target_user isn't a guest,
                 # because we don't let guests set profile or avatar data.
-                await handler.update_membership(
+                await room_member_handler.update_membership(
                     requester,
                     target_user,
                     room_id,
@@ -605,6 +690,36 @@
                     "Failed to update join event for room %s - %s", room_id, str(e)
                 )

+    async def _update_join_states(
+        self, task: ScheduledTask
+    ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
+        """
+        Task scheduler wrapper for join state updates.
+
+        Update the membership events of each room the user is joined to with the
+        new profile information.
+
+        Note that this stomps over any custom display name or avatar URL in member events.
+
+        Args:
+            task: Scheduled task containing requester and target_user parameters
+        """
+        assert task.params is not None
+        requester = Requester.deserialize(self.store, task.params["requester"])
+        target_user = UserID.from_string(task.params["target_user"])
+
+        await self._perform_join_state_updates(requester, target_user)
+        return TaskStatus.COMPLETE, None, None
+
+    async def _update_join_states_direct(
+        self, requester: Requester, target_user: UserID
+    ) -> None:
+        """
+        Direct version for synchronous execution (e.g., during deactivation).
+        Runs the same logic as _update_join_states without task scheduler wrapper.
+        """
+        await self._perform_join_state_updates(requester, target_user)
+
     async def check_profile_query_allowed(
         self, target_user: UserID, requester: Optional[UserID] = None
     ) -> None:
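The ProfileHandler hunks above wire three pieces together: register_action binds a coroutine to UPDATE_JOIN_STATES_TASK_NAME at startup, schedule_task enqueues the serialized requester and target user from the request path, and the _update_join_states wrapper later deserializes them and reports a TaskStatus. A toy, self-contained sketch of that contract (this is not Synapse's TaskScheduler, just the shape the handler relies on):

```python
import asyncio
from typing import Awaitable, Callable, Dict, Optional, Tuple

# (status, result, error) mirrors the tuple the registered action returns in the diff.
TaskResult = Tuple[str, Optional[dict], Optional[str]]
Action = Callable[[dict], Awaitable[TaskResult]]

class ToyTaskScheduler:
    """Barebones stand-in for a persistent task scheduler."""

    def __init__(self) -> None:
        self._actions: Dict[str, Action] = {}
        self._queue: "asyncio.Queue[Tuple[str, dict]]" = asyncio.Queue()

    def register_action(self, action: Action, name: str) -> None:
        self._actions[name] = action

    async def schedule_task(self, name: str, params: dict) -> None:
        # Returns as soon as the task is queued; the caller does not wait for it.
        await self._queue.put((name, params))

    async def run_pending(self) -> None:
        while not self._queue.empty():
            name, params = await self._queue.get()
            status, _result, _error = await self._actions[name](params)
            print(f"task {name!r} finished: {status}")

async def update_join_states(params: dict) -> TaskResult:
    # Stand-in for the real action: deserialize params, fan out per-room updates.
    print("fanning out profile change for", params["target_user"])
    return "complete", None, None

async def main() -> None:
    scheduler = ToyTaskScheduler()
    scheduler.register_action(update_join_states, "update_join_states")
    # Request path: enqueue and return without waiting for the per-room updates.
    await scheduler.schedule_task("update_join_states", {"target_user": "@frank:test"})
    await scheduler.run_pending()  # in Synapse this runs in the background

asyncio.run(main())
```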
@@ -769,6 +769,40 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):

         return frozenset(room_ids)

+    async def get_rooms_for_user_by_read_receipts(self, user_id: str) -> List[str]:
+        """Returns room_ids ordered by most recent read receipt activity.
+
+        Rooms with recent read receipts appear first (most recent first),
+        rooms without read receipts appear after in arbitrary order.
+
+        Args:
+            user_id: The ID of the user.
+
+        Returns:
+            List of room_ids ordered by read receipt activity.
+        """
+
+        def _get_rooms_txn(txn: LoggingTransaction) -> List[str]:
+            sql = """
+                SELECT cse.room_id
+                FROM current_state_events cse
+                LEFT JOIN receipts_linearized rl ON (
+                    rl.room_id = cse.room_id
+                    AND rl.user_id = ?
+                    AND rl.receipt_type = 'm.read'
+                )
+                WHERE cse.type = 'm.room.member'
+                    AND cse.membership = 'join'
+                    AND cse.state_key = ?
+                ORDER BY rl.event_stream_ordering DESC NULLS LAST, cse.room_id
+            """
+            txn.execute(sql, (user_id, user_id))
+            return [row[0] for row in txn.fetchall()]
+
+        return await self.db_pool.runInteraction(
+            "get_rooms_for_user_by_read_receipts", _get_rooms_txn
+        )
+
     @cachedList(
         cached_method_name="get_rooms_for_user",
         list_name="user_ids",
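The ordering in the new RoomMemberWorkerStore query can be reasoned about in isolation: the LEFT JOIN leaves event_stream_ordering NULL for joined rooms with no read receipt, DESC NULLS LAST pushes those rooms to the end, and the trailing room_id keeps ties deterministic. A hedged, standalone illustration with Python's sqlite3 (simplified stand-in tables; NULLS LAST needs SQLite 3.30 or newer):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE joined_rooms (room_id TEXT);  -- stand-in for current_state_events rows
    CREATE TABLE read_receipts (room_id TEXT, event_stream_ordering INTEGER);
    INSERT INTO joined_rooms VALUES ('!room1'), ('!room2'), ('!room3');
    INSERT INTO read_receipts VALUES ('!room1', 100), ('!room2', 300);
    -- !room3 has no receipt at all.
    """
)

rows = conn.execute(
    """
    SELECT jr.room_id
    FROM joined_rooms jr
    LEFT JOIN read_receipts rr ON rr.room_id = jr.room_id
    ORDER BY rr.event_stream_ordering DESC NULLS LAST, jr.room_id
    """
).fetchall()

# Most recent receipt first, receipt-less rooms last:
# [('!room2',), ('!room1',), ('!room3',)]
print(rows)
```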
@@ -26,10 +26,11 @@ from parameterized import parameterized
 from twisted.internet.testing import MemoryReactor

 import synapse.types
+from synapse.api.constants import EventTypes
 from synapse.api.errors import AuthError, SynapseError
-from synapse.rest import admin
+from synapse.rest import admin, login, room
 from synapse.server import HomeServer
-from synapse.types import JsonDict, UserID
+from synapse.types import JsonDict, UserID, get_localpart_from_id
 from synapse.util import Clock

 from tests import unittest
@@ -38,7 +39,11 @@ from tests import unittest
 class ProfileTestCase(unittest.HomeserverTestCase):
     """Tests profile management."""

-    servlets = [admin.register_servlets]
+    servlets = [
+        admin.register_servlets,
+        login.register_servlets,
+        room.register_servlets,
+    ]

     def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
         self.mock_federation = AsyncMock()
@@ -67,10 +72,69 @@ class ProfileTestCase(unittest.HomeserverTestCase):
         self.bob = UserID.from_string("@4567:test")
         self.alice = UserID.from_string("@alice:remote")

-        self.register_user(self.frank.localpart, "frankpassword")
+        frank_password = "frankpassword"
+        self.frank_registered = self.register_user(self.frank.localpart, frank_password)
+        self.frank_token = self.login(self.frank_registered, frank_password)

         self.handler = hs.get_profile_handler()

+    def test_set_displayname_scales_to_many_rooms(self) -> None:
+        room_count = 10
+        room_ids = []
+        for _ in range(room_count):
+            room_ids.append(
+                self.helper.create_room_as(
+                    self.frank_registered, tok=self.frank_token, is_public=True
+                )
+            )
+
+        event_content = self.helper.get_state(
+            room_id=room_ids[0],
+            event_type=EventTypes.Member,
+            tok=self.frank_token,
+            state_key=str(self.frank),
+        )
+        self.assertEqual(
+            get_localpart_from_id(self.frank_registered), event_content["displayname"]
+        )
+
+        user1_displayname = "user1 displayname"
+        # Create the deferred but don't pump the reactor yet
+        # This allows us to verify that set_displayname doesn't block on room updates
+        from twisted.internet.defer import ensureDeferred
+
+        displayname_deferred = ensureDeferred(
+            self.handler.set_displayname(
+                self.frank,
+                synapse.types.create_requester(self.frank_registered),
+                user1_displayname,
+            )
+        )
+        # Check room state BEFORE resolving the deferred
+        # If set_displayname is truly async, room states shouldn't be updated yet
+        event_content = self.helper.get_state(
+            room_id=room_ids[room_count - 1],
+            event_type=EventTypes.Member,
+            tok=self.frank_token,
+            state_key=str(self.frank),
+        )
+        original_displayname = event_content.get("displayname")
+        # This proves set_displayname doesn't block - room states are unchanged
+        # even though the deferred exists
+        self.assertNotEqual(user1_displayname, original_displayname)
+        # Now resolve the deferred - this will pump the reactor and run background tasks
+        self.get_success(displayname_deferred)
+
+        # After pumping, the background task should have completed
+        event_content = self.helper.get_state(
+            room_id=room_ids[room_count - 1],
+            event_type=EventTypes.Member,
+            tok=self.frank_token,
+            state_key=str(self.frank),
+        )
+        # Final verification that the background task updated the room state
+        self.assertEqual(user1_displayname, event_content["displayname"])
+
     def test_get_my_name(self) -> None:
         self.get_success(self.store.set_profile_displayname(self.frank, "Frank"))

@@ -79,28 +143,58 @@
         self.assertEqual("Frank", displayname)

     def test_set_my_name(self) -> None:
+        room_id = self.helper.create_room_as(
+            self.frank_registered, tok=self.frank_token, is_public=True
+        )
+        event_content = self.helper.get_state(
+            room_id=room_id,
+            event_type=EventTypes.Member,
+            tok=self.frank_token,
+            state_key=str(self.frank),
+        )
+        self.assertEqual(
+            get_localpart_from_id(self.frank_registered), event_content["displayname"]
+        )
+
+        displayname1 = "Frank Jr."
         self.get_success(
             self.handler.set_displayname(
-                self.frank, synapse.types.create_requester(self.frank), "Frank Jr."
+                self.frank, synapse.types.create_requester(self.frank), displayname1
             )
         )

         self.assertEqual(
             (self.get_success(self.store.get_profile_displayname(self.frank))),
-            "Frank Jr.",
+            displayname1,
         )

+        event_content = self.helper.get_state(
+            room_id=room_id,
+            event_type=EventTypes.Member,
+            tok=self.frank_token,
+            state_key=str(self.frank),
+        )
+        self.assertEqual(displayname1, event_content["displayname"])
+
         # Set displayname again
+        displayname2 = "Frank"
         self.get_success(
             self.handler.set_displayname(
-                self.frank, synapse.types.create_requester(self.frank), "Frank"
+                self.frank, synapse.types.create_requester(self.frank), displayname2
             )
         )

         self.assertEqual(
             (self.get_success(self.store.get_profile_displayname(self.frank))),
-            "Frank",
+            displayname2,
         )
+        event_content = self.helper.get_state(
+            room_id=room_id,
+            event_type=EventTypes.Member,
+            tok=self.frank_token,
+            state_key=str(self.frank),
+        )
+        self.assertEqual(displayname2, event_content["displayname"])

         # Set displayname to an empty string
         self.get_success(
@@ -112,6 +206,13 @@
         self.assertIsNone(
             self.get_success(self.store.get_profile_displayname(self.frank))
         )
+        event_content = self.helper.get_state(
+            room_id=room_id,
+            event_type=EventTypes.Member,
+            tok=self.frank_token,
+            state_key=str(self.frank),
+        )
+        self.assertEqual(None, event_content.get("displayname"))

     def test_set_my_name_if_disabled(self) -> None:
         self.hs.config.registration.enable_set_displayname = False
@@ -810,3 +810,119 @@ class CurrentStateMembershipUpdateTestCase(unittest.HomeserverTestCase):

         # Now let's actually drive the updates to completion
         self.wait_for_background_updates()
+
+
+class RoomMemberReadReceiptOrderingTestCase(unittest.HomeserverTestCase):
+    """Tests for ordering rooms by read receipt activity in get_rooms_for_user_by_read_receipts"""
+
+    def prepare(
+        self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
+    ) -> None:
+        super().prepare(reactor, clock, homeserver)
+        self.store = homeserver.get_datastores().main
+        self.room_creator = homeserver.get_room_creation_handler()
+
+        persist_event_storage_controller = self.hs.get_storage_controllers().persistence
+        assert persist_event_storage_controller is not None
+        self.persist_event_storage_controller = persist_event_storage_controller
+        # Create test users and rooms in prepare()
+        self.alice = UserID("alice", "test")
+        self.alice_id = self.alice.to_string()
+        self.alice_requester = create_requester(self.alice)
+
+        # Create test rooms
+        self.room1, _, _ = self.get_success(
+            self.room_creator.create_room(self.alice_requester, {}), by=1.0
+        )
+        self.room2, _, _ = self.get_success(
+            self.room_creator.create_room(self.alice_requester, {}), by=1.0
+        )
+        self.room3, _, _ = self.get_success(
+            self.room_creator.create_room(self.alice_requester, {}), by=1.0
+        )
+
+    def test_get_rooms_for_user_by_read_receipts_ordering(self) -> None:
+        """Test that rooms are ordered by read receipt recency."""
+        # Create events in each room to have something to point read receipts at
+        event1 = self.get_success(
+            create_event(
+                self.hs,
+                room_id=self.room1,
+                type="m.room.message",
+                sender=self.alice_id,
+                content={"msgtype": "m.text", "body": "Message 1"},
+            )
+        )
+        self.get_success(
+            self.persist_event_storage_controller.persist_event(event1[0], event1[1])
+        )
+
+        event2 = self.get_success(
+            create_event(
+                self.hs,
+                room_id=self.room2,
+                type="m.room.message",
+                sender=self.alice_id,
+                content={"msgtype": "m.text", "body": "Message 2"},
+            )
+        )
+        self.get_success(
+            self.persist_event_storage_controller.persist_event(event2[0], event2[1])
+        )
+
+        event3 = self.get_success(
+            create_event(
+                self.hs,
+                room_id=self.room3,
+                type="m.room.message",
+                sender=self.alice_id,
+                content={"msgtype": "m.text", "body": "Message 3"},
+            )
+        )
+        self.get_success(
+            self.persist_event_storage_controller.persist_event(event3[0], event3[1])
+        )
+
+        # Insert read receipts with different timestamps
+        # room2 should be first (most recent: stream_ordering ~300)
+        self.get_success(
+            self.store.insert_receipt(
+                self.room2, "m.read", self.alice_id, [event2[0].event_id], None, {}
+            )
+        )
+
+        # room1 should be second (older: stream_ordering ~100)
+        # Advance clock to ensure different timestamp
+        self.reactor.advance(1000)
+        self.get_success(
+            self.store.insert_receipt(
+                self.room1, "m.read", self.alice_id, [event1[0].event_id], None, {}
+            )
+        )
+
+        # room3 has no read receipt, should be last
+
+        # Test the ordering
+        room_ids = self.get_success(
+            self.store.get_rooms_for_user_by_read_receipts(self.alice_id)
+        )
+
+        # room2 should be first (first receipt, higher stream_ordering)
+        # room1 should be second (second receipt, lower stream_ordering)
+        # room3 should be last (no receipt)
+        self.assertEqual(len(room_ids), 3)
+        self.assertEqual(room_ids[0], self.room2)  # Most recent receipt
+        self.assertEqual(room_ids[1], self.room1)  # Older receipt
+        self.assertEqual(room_ids[2], self.room3)  # No receipt
+
+    def test_get_rooms_for_user_by_read_receipts_no_receipts(self) -> None:
+        """Test that rooms without read receipts are returned in deterministic order."""
+        # Use alice's existing rooms but don't add any read receipts
+        room_ids = self.get_success(
+            self.store.get_rooms_for_user_by_read_receipts(self.alice_id)
+        )
+
+        # Should return all rooms in deterministic order (by room_id since no receipts)
+        self.assertEqual(len(room_ids), 3)
+        expected_order = sorted([self.room1, self.room2, self.room3])
+        self.assertEqual(room_ids, expected_order)