diff --git a/changelog.d/18552.misc b/changelog.d/18552.misc new file mode 100644 index 0000000000..3882c8fed2 --- /dev/null +++ b/changelog.d/18552.misc @@ -0,0 +1 @@ +Allow user registrations to be done on workers. diff --git a/docs/upgrade.md b/docs/upgrade.md index ca9ca121f2..e79ca93c04 100644 --- a/docs/upgrade.md +++ b/docs/upgrade.md @@ -117,6 +117,14 @@ each upgrade are complete before moving on to the next upgrade, to avoid stacking them up. You can monitor the currently running background updates with [the Admin API](usage/administration/admin_api/background_updates.html#status). +# Upgrading to v1.135.0 + +## `on_user_registration` module API callback may now run on any worker + +Previously, the `on_user_registration` callback would only run on the main +process. Modules relying on this callback must assume that they may now be +called from any worker, not just the main process. + # Upgrading to v1.134.0 ## ICU bundled with Synapse diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 75c65ccc0d..d0924c413b 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -118,7 +118,6 @@ class GenericWorkerStore( # FIXME(https://github.com/matrix-org/synapse/issues/3714): We need to add # UserDirectoryStore as we write directly rather than going via the correct worker. UserDirectoryStore, - StatsStore, UIAuthWorkerStore, EndToEndRoomKeyStore, PresenceStore, @@ -154,6 +153,7 @@ class GenericWorkerStore( StreamWorkerStore, EventsWorkerStore, RegistrationWorkerStore, + StatsStore, SearchStore, TransactionWorkerStore, LockStore, diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 970013ef20..b90126f9c7 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -49,7 +49,6 @@ from synapse.http.servlet import assert_params_in_dict from synapse.replication.http.login import RegisterDeviceReplicationServlet from synapse.replication.http.register import ( ReplicationPostRegisterActionsServlet, - ReplicationRegisterServlet, ) from synapse.spam_checker_api import RegistrationBehaviour from synapse.types import GUEST_USER_ID_PATTERN, RoomAlias, UserID, create_requester @@ -120,7 +119,6 @@ class RegistrationHandler: self._spam_checker_module_callbacks = hs.get_module_api_callbacks().spam_checker if hs.config.worker.worker_app: - self._register_client = ReplicationRegisterServlet.make_client(hs) self._register_device_client = RegisterDeviceReplicationServlet.make_client( hs ) @@ -738,37 +736,20 @@ class RegistrationHandler: shadow_banned: Whether to shadow-ban the user approved: Whether to mark the user as approved by an administrator """ - if self.hs.config.worker.worker_app: - await self._register_client( - user_id=user_id, - password_hash=password_hash, - was_guest=was_guest, - make_guest=make_guest, - appservice_id=appservice_id, - create_profile_with_displayname=create_profile_with_displayname, - admin=admin, - user_type=user_type, - address=address, - shadow_banned=shadow_banned, - approved=approved, - ) - else: - await self.store.register_user( - user_id=user_id, - password_hash=password_hash, - was_guest=was_guest, - make_guest=make_guest, - appservice_id=appservice_id, - create_profile_with_displayname=create_profile_with_displayname, - admin=admin, - user_type=user_type, - shadow_banned=shadow_banned, - approved=approved, - ) + await self.store.register_user( + user_id=user_id, + password_hash=password_hash, + was_guest=was_guest, + make_guest=make_guest, + appservice_id=appservice_id, + create_profile_with_displayname=create_profile_with_displayname, + admin=admin, + user_type=user_type, + shadow_banned=shadow_banned, + approved=approved, + ) - # Only call the account validity module(s) on the main process, to avoid - # repeating e.g. database writes on all of the workers. - await self._account_validity_handler.on_user_registration(user_id) + await self._account_validity_handler.on_user_registration(user_id) async def register_device( self, diff --git a/synapse/replication/http/register.py b/synapse/replication/http/register.py index 42a58b2858..27d3504c3c 100644 --- a/synapse/replication/http/register.py +++ b/synapse/replication/http/register.py @@ -33,6 +33,8 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +# FIXME(2025-07-22): Remove this on the next release, this may only be used +# during rollout to Synapse 1.134 and can be removed after that release. class ReplicationRegisterServlet(ReplicationEndpoint): """Register a new user""" diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index 1e21996b12..38d1e650ad 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -175,7 +175,7 @@ class ThreepidValidationSession: """timestamp of when this session was validated if so""" -class RegistrationWorkerStore(CacheInvalidationWorkerStore): +class RegistrationWorkerStore(StatsStore, CacheInvalidationWorkerStore): def __init__( self, database: DatabasePool, @@ -217,12 +217,167 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): self._set_expiration_date_when_missing, ) + # If support for MSC3866 is enabled and configured to require approval for new + # account, we will create new users with an 'approved' flag set to false. + self._require_approval = ( + hs.config.experimental.msc3866.enabled + and hs.config.experimental.msc3866.require_approval_for_new_accounts + ) + # Create a background job for culling expired 3PID validity tokens if hs.config.worker.run_background_tasks: self._clock.looping_call( self.cull_expired_threepid_validation_tokens, THIRTY_MINUTES_IN_MS ) + async def register_user( + self, + user_id: str, + password_hash: Optional[str] = None, + was_guest: bool = False, + make_guest: bool = False, + appservice_id: Optional[str] = None, + create_profile_with_displayname: Optional[str] = None, + admin: bool = False, + user_type: Optional[str] = None, + shadow_banned: bool = False, + approved: bool = False, + ) -> None: + """Attempts to register an account. + + Args: + user_id: The desired user ID to register. + password_hash: Optional. The password hash for this user. + was_guest: Whether this is a guest account being upgraded to a + non-guest account. + make_guest: True if the the new user should be guest, false to add a + regular user account. + appservice_id: The ID of the appservice registering the user. + create_profile_with_displayname: Optionally create a profile for + the user, setting their displayname to the given value + admin: is an admin user? + user_type: type of user. One of the values from api.constants.UserTypes, + a custom value set in the configuration file, or None for a normal + user. + shadow_banned: Whether the user is shadow-banned, i.e. they may be + told their requests succeeded but we ignore them. + approved: Whether to consider the user has already been approved by an + administrator. + + Raises: + StoreError if the user_id could not be registered. + """ + await self.db_pool.runInteraction( + "register_user", + self._register_user, + user_id, + password_hash, + was_guest, + make_guest, + appservice_id, + create_profile_with_displayname, + admin, + user_type, + shadow_banned, + approved, + ) + + def _register_user( + self, + txn: LoggingTransaction, + user_id: str, + password_hash: Optional[str], + was_guest: bool, + make_guest: bool, + appservice_id: Optional[str], + create_profile_with_displayname: Optional[str], + admin: bool, + user_type: Optional[str], + shadow_banned: bool, + approved: bool, + ) -> None: + user_id_obj = UserID.from_string(user_id) + + now = int(self._clock.time()) + + user_approved = approved or not self._require_approval + + try: + if was_guest: + # Ensure that the guest user actually exists + # ``allow_none=False`` makes this raise an exception + # if the row isn't in the database. + self.db_pool.simple_select_one_txn( + txn, + "users", + keyvalues={"name": user_id, "is_guest": 1}, + retcols=("name",), + allow_none=False, + ) + + self.db_pool.simple_update_one_txn( + txn, + "users", + keyvalues={"name": user_id, "is_guest": 1}, + updatevalues={ + "password_hash": password_hash, + "upgrade_ts": now, + "is_guest": 1 if make_guest else 0, + "appservice_id": appservice_id, + "admin": 1 if admin else 0, + "user_type": user_type, + "shadow_banned": shadow_banned, + "approved": user_approved, + }, + ) + else: + self.db_pool.simple_insert_txn( + txn, + "users", + values={ + "name": user_id, + "password_hash": password_hash, + "creation_ts": now, + "is_guest": 1 if make_guest else 0, + "appservice_id": appservice_id, + "admin": 1 if admin else 0, + "user_type": user_type, + "shadow_banned": shadow_banned, + "approved": user_approved, + }, + ) + + except self.database_engine.module.IntegrityError: + raise StoreError(400, "User ID already taken.", errcode=Codes.USER_IN_USE) + + if self._account_validity_enabled: + self.set_expiration_date_for_user_txn(txn, user_id) + + if create_profile_with_displayname: + # set a default displayname serverside to avoid ugly race + # between auto-joins and clients trying to set displaynames + # + # *obviously* the 'profiles' table uses localpart for user_id + # while everything else uses the full mxid. + txn.execute( + "INSERT INTO profiles(full_user_id, user_id, displayname) VALUES (?,?,?)", + (user_id, user_id_obj.localpart, create_profile_with_displayname), + ) + + if self.hs.config.stats.stats_enabled: + # we create a new completed user statistics row + + # we don't strictly need current_token since this user really can't + # have any state deltas before now (as it is a new user), but still, + # we include it for completeness. + current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn) + + self._update_stats_delta_txn( + txn, now, "user", user_id, {}, complete_with_stream_id=current_token + ) + + self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,)) + @cached() async def get_user_by_id(self, user_id: str) -> Optional[UserInfo]: """Returns info about the user account, if it exists.""" @@ -2354,7 +2509,7 @@ class RegistrationBackgroundUpdateStore(RegistrationWorkerStore): return nb_processed -class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): +class RegistrationStore(RegistrationBackgroundUpdateStore): def __init__( self, database: DatabasePool, @@ -2370,13 +2525,6 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id") self._refresh_tokens_id_gen = IdGenerator(db_conn, "refresh_tokens", "id") - # If support for MSC3866 is enabled and configured to require approval for new - # account, we will create new users with an 'approved' flag set to false. - self._require_approval = ( - hs.config.experimental.msc3866.enabled - and hs.config.experimental.msc3866.require_approval_for_new_accounts - ) - # Create a background job for removing expired login tokens if hs.config.worker.run_background_tasks: self._clock.looping_call( @@ -2524,153 +2672,6 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): device_id, ) - async def register_user( - self, - user_id: str, - password_hash: Optional[str] = None, - was_guest: bool = False, - make_guest: bool = False, - appservice_id: Optional[str] = None, - create_profile_with_displayname: Optional[str] = None, - admin: bool = False, - user_type: Optional[str] = None, - shadow_banned: bool = False, - approved: bool = False, - ) -> None: - """Attempts to register an account. - - Args: - user_id: The desired user ID to register. - password_hash: Optional. The password hash for this user. - was_guest: Whether this is a guest account being upgraded to a - non-guest account. - make_guest: True if the the new user should be guest, false to add a - regular user account. - appservice_id: The ID of the appservice registering the user. - create_profile_with_displayname: Optionally create a profile for - the user, setting their displayname to the given value - admin: is an admin user? - user_type: type of user. One of the values from api.constants.UserTypes, - a custom value set in the configuration file, or None for a normal - user. - shadow_banned: Whether the user is shadow-banned, i.e. they may be - told their requests succeeded but we ignore them. - approved: Whether to consider the user has already been approved by an - administrator. - - Raises: - StoreError if the user_id could not be registered. - """ - await self.db_pool.runInteraction( - "register_user", - self._register_user, - user_id, - password_hash, - was_guest, - make_guest, - appservice_id, - create_profile_with_displayname, - admin, - user_type, - shadow_banned, - approved, - ) - - def _register_user( - self, - txn: LoggingTransaction, - user_id: str, - password_hash: Optional[str], - was_guest: bool, - make_guest: bool, - appservice_id: Optional[str], - create_profile_with_displayname: Optional[str], - admin: bool, - user_type: Optional[str], - shadow_banned: bool, - approved: bool, - ) -> None: - user_id_obj = UserID.from_string(user_id) - - now = int(self._clock.time()) - - user_approved = approved or not self._require_approval - - try: - if was_guest: - # Ensure that the guest user actually exists - # ``allow_none=False`` makes this raise an exception - # if the row isn't in the database. - self.db_pool.simple_select_one_txn( - txn, - "users", - keyvalues={"name": user_id, "is_guest": 1}, - retcols=("name",), - allow_none=False, - ) - - self.db_pool.simple_update_one_txn( - txn, - "users", - keyvalues={"name": user_id, "is_guest": 1}, - updatevalues={ - "password_hash": password_hash, - "upgrade_ts": now, - "is_guest": 1 if make_guest else 0, - "appservice_id": appservice_id, - "admin": 1 if admin else 0, - "user_type": user_type, - "shadow_banned": shadow_banned, - "approved": user_approved, - }, - ) - else: - self.db_pool.simple_insert_txn( - txn, - "users", - values={ - "name": user_id, - "password_hash": password_hash, - "creation_ts": now, - "is_guest": 1 if make_guest else 0, - "appservice_id": appservice_id, - "admin": 1 if admin else 0, - "user_type": user_type, - "shadow_banned": shadow_banned, - "approved": user_approved, - }, - ) - - except self.database_engine.module.IntegrityError: - raise StoreError(400, "User ID already taken.", errcode=Codes.USER_IN_USE) - - if self._account_validity_enabled: - self.set_expiration_date_for_user_txn(txn, user_id) - - if create_profile_with_displayname: - # set a default displayname serverside to avoid ugly race - # between auto-joins and clients trying to set displaynames - # - # *obviously* the 'profiles' table uses localpart for user_id - # while everything else uses the full mxid. - txn.execute( - "INSERT INTO profiles(full_user_id, user_id, displayname) VALUES (?,?,?)", - (user_id, user_id_obj.localpart, create_profile_with_displayname), - ) - - if self.hs.config.stats.stats_enabled: - # we create a new completed user statistics row - - # we don't strictly need current_token since this user really can't - # have any state deltas before now (as it is a new user), but still, - # we include it for completeness. - current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn) - self._update_stats_delta_txn( - txn, now, "user", user_id, {}, complete_with_stream_id=current_token - ) - - self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,)) - async def user_set_password_hash( self, user_id: str, password_hash: Optional[str] ) -> None: