Compare commits

...

9 Commits

Author SHA1 Message Date
Neil Johnson
aa147edfd9 fix failures to account deactivation to allow for blocking updates to profiles and avatars 2025-08-28 22:18:58 +01:00
Neil Johnson
d32cd70abd ensure that the rooms are updated according to most recent read receipt, so that the lag in updating all rooms is less noticable 2025-08-28 17:12:27 +01:00
Neil Johnson
0e6f652065 fix type checking and lint 2025-08-28 17:12:27 +01:00
Neil Johnson
7bc6c1c038 got to finish with a . 2025-08-28 17:12:27 +01:00
Neil Johnson
6ba90dfbe2 changelog 2025-08-28 17:12:27 +01:00
Neil Johnson
ca3806b95b prevent reactor moving forwards and processing background tasks unexpectedly 2025-08-28 17:12:27 +01:00
Neil Johnson
1a89eb5e64 fix missing , 2025-08-28 17:12:27 +01:00
Neil Johnson
76b4bf33f1 add failing tests to verify behaviour 2025-08-28 17:12:27 +01:00
Neil Johnson
29d4d5cd92 move _update_join_states to be processed by a ScheduledTask 2025-08-28 17:12:27 +01:00
5 changed files with 387 additions and 20 deletions

1
changelog.d/17074.bugfix Normal file
View File

@@ -0,0 +1 @@
Process room membership changes like profile updates asynchronously so the request does not time out. Fixes https://github.com/element-hq/synapse/issues/1297 .

View File

@@ -20,7 +20,7 @@
#
import logging
import random
from typing import TYPE_CHECKING, List, Optional, Union
from typing import TYPE_CHECKING, List, Optional, Tuple, Union
from synapse.api.constants import ProfileFields
from synapse.api.errors import (
@@ -32,7 +32,16 @@ from synapse.api.errors import (
SynapseError,
)
from synapse.storage.databases.main.media_repository import LocalMedia, RemoteMedia
from synapse.types import JsonDict, JsonValue, Requester, UserID, create_requester
from synapse.types import (
JsonDict,
JsonMapping,
JsonValue,
Requester,
ScheduledTask,
TaskStatus,
UserID,
create_requester,
)
from synapse.util.caches.descriptors import cached
from synapse.util.stringutils import parse_and_validate_mxc_uri
@@ -46,6 +55,8 @@ MAX_AVATAR_URL_LEN = 1000
# Field name length is specced at 255 bytes.
MAX_CUSTOM_FIELD_LEN = 255
UPDATE_JOIN_STATES_TASK_NAME = "update_join_states"
class ProfileHandler:
"""Handles fetching and updating user profile information.
@@ -77,6 +88,11 @@ class ProfileHandler:
self._third_party_rules = hs.get_module_api_callbacks().third_party_event_rules
self._task_scheduler = hs.get_task_scheduler()
self._task_scheduler.register_action(
self._update_join_states, UPDATE_JOIN_STATES_TASK_NAME
)
async def get_profile(self, user_id: str, ignore_backoff: bool = True) -> JsonDict:
"""
Get a user's profile as a JSON dictionary.
@@ -236,7 +252,18 @@ class ProfileHandler:
)
if propagate:
await self._update_join_states(requester, target_user)
if deactivation:
# During deactivation, run profile updates synchronously to ensure
# they complete before room forgetting logic runs
await self._update_join_states_direct(requester, target_user)
else:
await self._task_scheduler.schedule_task(
UPDATE_JOIN_STATES_TASK_NAME,
params={
"requester": requester.serialize(),
"target_user": target_user.to_string(),
},
)
async def get_avatar_url(self, target_user: UserID) -> Optional[str]:
"""
@@ -338,7 +365,18 @@ class ProfileHandler:
)
if propagate:
await self._update_join_states(requester, target_user)
if deactivation:
# During deactivation, run profile updates synchronously to ensure
# they complete before room forgetting logic runs
await self._update_join_states_direct(requester, target_user)
else:
await self._task_scheduler.schedule_task(
UPDATE_JOIN_STATES_TASK_NAME,
params={
"requester": requester.serialize(),
"target_user": target_user.to_string(),
},
)
@cached()
async def check_avatar_size_and_mime_type(self, mxc: str) -> bool:
@@ -566,14 +604,31 @@ class ProfileHandler:
return response
async def _update_join_states(
async def _perform_join_state_updates(
self, requester: Requester, target_user: UserID
) -> None:
"""
Update the membership events of each room the user is joined to with the
new profile information.
"""Perform join state updates for a user's profile change across all their rooms.
Note that this stomps over any custom display name or avatar URL in member events.
This method handles the core logic for updating membership events when a user's
profile (displayname, avatar_url, etc.) changes. It includes validation, rate
limiting, and special handling for shadow-banned users.
Args:
requester: The user requesting the profile update. Used for rate limiting
and authentication checks.
target_user: The user whose profile is being updated. Must be a local user.
Returns:
None. The method completes silently on success.
Raises:
Does not raise exceptions directly, but underlying room membership updates
may fail and are logged as warnings.
Note:
- Returns early if target_user is not local to this homeserver
- Shadow-banned users get a random delay but no actual updates
- Rate limiting is applied per requester
"""
if not self.hs.is_mine(target_user):
return
@@ -586,14 +641,44 @@ class ProfileHandler:
await self.clock.sleep(random.randint(1, 10))
return
room_ids = await self.store.get_rooms_for_user(target_user.to_string())
await self._update_rooms_for_profile_change(requester, target_user)
async def _update_rooms_for_profile_change(
self, requester: Requester, target_user: UserID
) -> None:
"""Update membership events in all rooms where user is joined.
Iterates through all rooms where the target user is currently joined and
updates their membership event to reflect the new profile information.
Uses read receipt ordering to prioritize recently viewed rooms first.
Args:
requester: The user requesting the update, used for membership operations.
target_user: The user whose membership events should be updated.
Returns:
None. Individual room update failures are logged but don't stop processing.
Implementation Details:
- Rooms are processed in read receipt order (most recently viewed first)
- Each membership update is treated as a "join" event with new profile data
- Rate limiting is disabled for these updates to hide that they're not atomic
- Failures in individual rooms are logged but don't affect other rooms
- Assumes target_user is not a guest (guests can't set profile data)
Note:
This stomps over any custom display name or avatar URL in member events.
"""
room_ids = await self.store.get_rooms_for_user_by_read_receipts(
target_user.to_string()
)
room_member_handler = self.hs.get_room_member_handler()
for room_id in room_ids:
handler = self.hs.get_room_member_handler()
try:
# Assume the target_user isn't a guest,
# because we don't let guests set profile or avatar data.
await handler.update_membership(
await room_member_handler.update_membership(
requester,
target_user,
room_id,
@@ -605,6 +690,36 @@ class ProfileHandler:
"Failed to update join event for room %s - %s", room_id, str(e)
)
async def _update_join_states(
    self, task: ScheduledTask
) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
    """Scheduled-task entry point for propagating a profile change to rooms.

    Rebuilds the requester and the target user from the task parameters and
    hands them to `_perform_join_state_updates`, which rewrites the user's
    membership events with the new profile information.

    Note that this stomps over any custom display name or avatar URL in
    member events.

    Args:
        task: The scheduled task. Its `params` must carry a serialized
            requester under "requester" and the target user's ID string
            under "target_user".

    Returns:
        A `(status, result, error)` tuple; always
        `(TaskStatus.COMPLETE, None, None)` once the update call returns.
    """
    params = task.params
    assert params is not None

    # Deserialize in the same order the values are needed below.
    requester = Requester.deserialize(self.store, params["requester"])
    target_user = UserID.from_string(params["target_user"])

    await self._perform_join_state_updates(requester, target_user)

    # No result payload and no error message to report.
    return (TaskStatus.COMPLETE, None, None)
async def _update_join_states_direct(
    self, requester: Requester, target_user: UserID
) -> None:
    """Propagate a profile change to rooms inline, without the task scheduler.

    Awaits `_perform_join_state_updates` directly so the caller only resumes
    once the update call has returned (used, e.g., during deactivation).

    Args:
        requester: The user requesting the profile update.
        target_user: The user whose profile is being updated.
    """
    pending_update = self._perform_join_state_updates(requester, target_user)
    await pending_update
async def check_profile_query_allowed(
self, target_user: UserID, requester: Optional[UserID] = None
) -> None:

View File

@@ -769,6 +769,40 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
return frozenset(room_ids)
async def get_rooms_for_user_by_read_receipts(self, user_id: str) -> List[str]:
    """Return the rooms a user is joined to, most recently read first.

    Rooms in which the user has an `m.read` receipt come first, ordered by
    the highest `event_stream_ordering` among those receipts (descending).
    Rooms without such a receipt follow, ordered by room ID so the result
    is deterministic.

    Each room appears exactly once, even if the user has several `m.read`
    receipt rows in it.

    Args:
        user_id: The ID of the user.

    Returns:
        List of room_ids ordered by read receipt activity.
    """

    def _get_rooms_txn(txn: LoggingTransaction) -> List[str]:
        # The LEFT JOIN produces one row per matching receipt, so a room
        # with more than one `m.read` row for this user (presumably one per
        # thread - confirm against the receipts_linearized schema) would be
        # returned repeatedly. Collapse to one row per room and rank it by
        # its newest receipt.
        sql = """
            SELECT cse.room_id
            FROM current_state_events cse
            LEFT JOIN receipts_linearized rl ON (
                rl.room_id = cse.room_id
                AND rl.user_id = ?
                AND rl.receipt_type = 'm.read'
            )
            WHERE cse.type = 'm.room.member'
                AND cse.membership = 'join'
                AND cse.state_key = ?
            GROUP BY cse.room_id
            ORDER BY MAX(rl.event_stream_ordering) DESC NULLS LAST, cse.room_id
        """
        txn.execute(sql, (user_id, user_id))
        return [row[0] for row in txn.fetchall()]

    return await self.db_pool.runInteraction(
        "get_rooms_for_user_by_read_receipts", _get_rooms_txn
    )
@cachedList(
cached_method_name="get_rooms_for_user",
list_name="user_ids",

View File

@@ -26,10 +26,11 @@ from parameterized import parameterized
from twisted.internet.testing import MemoryReactor
import synapse.types
from synapse.api.constants import EventTypes
from synapse.api.errors import AuthError, SynapseError
from synapse.rest import admin
from synapse.rest import admin, login, room
from synapse.server import HomeServer
from synapse.types import JsonDict, UserID
from synapse.types import JsonDict, UserID, get_localpart_from_id
from synapse.util import Clock
from tests import unittest
@@ -38,7 +39,11 @@ from tests import unittest
class ProfileTestCase(unittest.HomeserverTestCase):
"""Tests profile management."""
servlets = [admin.register_servlets]
servlets = [
admin.register_servlets,
login.register_servlets,
room.register_servlets,
]
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
self.mock_federation = AsyncMock()
@@ -67,10 +72,69 @@ class ProfileTestCase(unittest.HomeserverTestCase):
self.bob = UserID.from_string("@4567:test")
self.alice = UserID.from_string("@alice:remote")
self.register_user(self.frank.localpart, "frankpassword")
frank_password = "frankpassword"
self.frank_registered = self.register_user(self.frank.localpart, frank_password)
self.frank_token = self.login(self.frank_registered, frank_password)
self.handler = hs.get_profile_handler()
def test_set_displayname_scales_to_many_rooms(self) -> None:
    """Check a displayname change reaches the member event of the last of many rooms.

    Frank creates several rooms, then changes his displayname. The last
    room's member event is expected to still carry a different displayname
    while the `set_displayname` deferred is outstanding, and the new one
    once that deferred has been driven to completion.
    """
    from twisted.internet.defer import ensureDeferred

    room_count = 10
    room_ids = [
        self.helper.create_room_as(
            self.frank_registered, tok=self.frank_token, is_public=True
        )
        for _ in range(room_count)
    ]

    def frank_member_content(room_id: str) -> JsonDict:
        # Content of Frank's own m.room.member state event in `room_id`.
        return self.helper.get_state(
            room_id=room_id,
            event_type=EventTypes.Member,
            tok=self.frank_token,
            state_key=str(self.frank),
        )

    # Before any profile change the displayname is the localpart.
    initial_content = frank_member_content(room_ids[0])
    self.assertEqual(
        get_localpart_from_id(self.frank_registered),
        initial_content["displayname"],
    )

    user1_displayname = "user1 displayname"
    last_room_id = room_ids[-1]

    # Start the update without waiting on it via get_success, so the room
    # state can be inspected while the deferred is still outstanding.
    displayname_deferred = ensureDeferred(
        self.handler.set_displayname(
            self.frank,
            synapse.types.create_requester(self.frank_registered),
            user1_displayname,
        )
    )

    # The last room must not show the new displayname yet; the test treats
    # this as evidence that set_displayname does not block on room updates.
    content_before = frank_member_content(last_room_id)
    self.assertNotEqual(user1_displayname, content_before.get("displayname"))

    # Drive the deferred to completion; the test relies on this also letting
    # the scheduled room updates run.
    self.get_success(displayname_deferred)

    # The last room's member event now carries the new displayname.
    content_after = frank_member_content(last_room_id)
    self.assertEqual(user1_displayname, content_after["displayname"])
def test_get_my_name(self) -> None:
self.get_success(self.store.set_profile_displayname(self.frank, "Frank"))
@@ -79,28 +143,58 @@ class ProfileTestCase(unittest.HomeserverTestCase):
self.assertEqual("Frank", displayname)
def test_set_my_name(self) -> None:
room_id = self.helper.create_room_as(
self.frank_registered, tok=self.frank_token, is_public=True
)
event_content = self.helper.get_state(
room_id=room_id,
event_type=EventTypes.Member,
tok=self.frank_token,
state_key=str(self.frank),
)
self.assertEqual(
get_localpart_from_id(self.frank_registered), event_content["displayname"]
)
displayname1 = "Frank Jr."
self.get_success(
self.handler.set_displayname(
self.frank, synapse.types.create_requester(self.frank), "Frank Jr."
self.frank, synapse.types.create_requester(self.frank), displayname1
)
)
self.assertEqual(
(self.get_success(self.store.get_profile_displayname(self.frank))),
"Frank Jr.",
displayname1,
)
event_content = self.helper.get_state(
room_id=room_id,
event_type=EventTypes.Member,
tok=self.frank_token,
state_key=str(self.frank),
)
self.assertEqual(displayname1, event_content["displayname"])
# Set displayname again
displayname2 = "Frank"
self.get_success(
self.handler.set_displayname(
self.frank, synapse.types.create_requester(self.frank), "Frank"
self.frank, synapse.types.create_requester(self.frank), displayname2
)
)
self.assertEqual(
(self.get_success(self.store.get_profile_displayname(self.frank))),
"Frank",
displayname2,
)
event_content = self.helper.get_state(
room_id=room_id,
event_type=EventTypes.Member,
tok=self.frank_token,
state_key=str(self.frank),
)
self.assertEqual(displayname2, event_content["displayname"])
# Set displayname to an empty string
self.get_success(
@@ -112,6 +206,13 @@ class ProfileTestCase(unittest.HomeserverTestCase):
self.assertIsNone(
self.get_success(self.store.get_profile_displayname(self.frank))
)
event_content = self.helper.get_state(
room_id=room_id,
event_type=EventTypes.Member,
tok=self.frank_token,
state_key=str(self.frank),
)
self.assertEqual(None, event_content.get("displayname"))
def test_set_my_name_if_disabled(self) -> None:
self.hs.config.registration.enable_set_displayname = False

View File

@@ -810,3 +810,119 @@ class CurrentStateMembershipUpdateTestCase(unittest.HomeserverTestCase):
# Now let's actually drive the updates to completion
self.wait_for_background_updates()
class RoomMemberReadReceiptOrderingTestCase(unittest.HomeserverTestCase):
    """Tests for ordering rooms by read receipt activity in get_rooms_for_user_by_read_receipts"""

    def prepare(
        self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
    ) -> None:
        """Set up the store handles, the user alice and three rooms she created."""
        super().prepare(reactor, clock, homeserver)

        self.store = homeserver.get_datastores().main
        self.room_creator = homeserver.get_room_creation_handler()

        persistence = self.hs.get_storage_controllers().persistence
        assert persistence is not None
        self.persist_event_storage_controller = persistence

        # The user every test in this class operates on.
        self.alice = UserID("alice", "test")
        self.alice_id = self.alice.to_string()
        self.alice_requester = create_requester(self.alice)

        # Three rooms created by alice, one after another.
        created_rooms = []
        for _attempt in range(3):
            new_room_id, _, _ = self.get_success(
                self.room_creator.create_room(self.alice_requester, {}), by=1.0
            )
            created_rooms.append(new_room_id)
        self.room1, self.room2, self.room3 = created_rooms

    def _persist_text_message(self, room_id: str, body: str) -> str:
        """Create and persist a text message from alice; return its event ID."""
        created = self.get_success(
            create_event(
                self.hs,
                room_id=room_id,
                type="m.room.message",
                sender=self.alice_id,
                content={"msgtype": "m.text", "body": body},
            )
        )
        self.get_success(
            self.persist_event_storage_controller.persist_event(
                created[0], created[1]
            )
        )
        return created[0].event_id

    def test_get_rooms_for_user_by_read_receipts_ordering(self) -> None:
        """Test that rooms are ordered by read receipt recency."""
        # One message per room, so that there is something to point a read
        # receipt at. They are persisted in room1, room2, room3 order.
        event1_id = self._persist_text_message(self.room1, "Message 1")
        event2_id = self._persist_text_message(self.room2, "Message 2")
        event3_id = self._persist_text_message(self.room3, "Message 3")
        # room3's message deliberately never gets a receipt.
        del event3_id

        # Receipt for room2 goes in first ...
        self.get_success(
            self.store.insert_receipt(
                self.room2, "m.read", self.alice_id, [event2_id], None, {}
            )
        )

        # ... then, after moving the clock on, the receipt for room1.
        self.reactor.advance(1000)
        self.get_success(
            self.store.insert_receipt(
                self.room1, "m.read", self.alice_id, [event1_id], None, {}
            )
        )

        room_ids = self.get_success(
            self.store.get_rooms_for_user_by_read_receipts(self.alice_id)
        )

        # Expected: room2, whose receipt points at the later-persisted
        # message (presumably the higher stream ordering), then room1, then
        # room3 which has no receipt at all.
        self.assertEqual(len(room_ids), 3)
        expected = (self.room2, self.room1, self.room3)
        for position, expected_room in enumerate(expected):
            self.assertEqual(room_ids[position], expected_room)

    def test_get_rooms_for_user_by_read_receipts_no_receipts(self) -> None:
        """Test that rooms without read receipts are returned in deterministic order."""
        # alice's rooms from prepare(), with no read receipts inserted.
        room_ids = self.get_success(
            self.store.get_rooms_for_user_by_read_receipts(self.alice_id)
        )

        # With no receipts to rank by, the rooms come back sorted by room ID.
        self.assertEqual(len(room_ids), 3)
        self.assertEqual(room_ids, sorted((self.room1, self.room2, self.room3)))