From 537e14169e3cc2dc293e9efbdd0733a95192253d Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Tue, 2 Sep 2025 13:55:12 +0200 Subject: [PATCH 1/4] Support stable endpoint and scopes from the MSC3861 family (#18549) This adds stable APIs for both MSC2965 and MSC2967 --- changelog.d/18549.feature | 1 + synapse/api/auth/mas.py | 26 ++-- synapse/api/auth/msc3861_delegated.py | 34 +++-- synapse/rest/client/auth_metadata.py | 16 ++- tests/handlers/test_oauth_delegation.py | 168 ++++++++---------------- tests/rest/client/test_auth_metadata.py | 21 +-- 6 files changed, 113 insertions(+), 153 deletions(-) create mode 100644 changelog.d/18549.feature diff --git a/changelog.d/18549.feature b/changelog.d/18549.feature new file mode 100644 index 0000000000..4d78ae57ab --- /dev/null +++ b/changelog.d/18549.feature @@ -0,0 +1 @@ +Support for the stable endpoint and scopes of [MSC3861](https://github.com/matrix-org/matrix-spec-proposals/pull/3861) & co. diff --git a/synapse/api/auth/mas.py b/synapse/api/auth/mas.py index 00bad76856..40b4a5bd34 100644 --- a/synapse/api/auth/mas.py +++ b/synapse/api/auth/mas.py @@ -13,7 +13,7 @@ # # import logging -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, Optional, Set from urllib.parse import urlencode from synapse._pydantic_compat import ( @@ -57,8 +57,10 @@ logger = logging.getLogger(__name__) # Scope as defined by MSC2967 # https://github.com/matrix-org/matrix-spec-proposals/pull/2967 -SCOPE_MATRIX_API = "urn:matrix:org.matrix.msc2967.client:api:*" -SCOPE_MATRIX_DEVICE_PREFIX = "urn:matrix:org.matrix.msc2967.client:device:" +UNSTABLE_SCOPE_MATRIX_API = "urn:matrix:org.matrix.msc2967.client:api:*" +UNSTABLE_SCOPE_MATRIX_DEVICE_PREFIX = "urn:matrix:org.matrix.msc2967.client:device:" +STABLE_SCOPE_MATRIX_API = "urn:matrix:client:api:*" +STABLE_SCOPE_MATRIX_DEVICE_PREFIX = "urn:matrix:client:device:" class ServerMetadata(BaseModel): @@ -334,7 +336,10 @@ class MasDelegatedAuth(BaseAuth): scope = 
introspection_result.get_scope_set() # Determine type of user based on presence of particular scopes - if SCOPE_MATRIX_API not in scope: + if ( + UNSTABLE_SCOPE_MATRIX_API not in scope + and STABLE_SCOPE_MATRIX_API not in scope + ): raise InvalidClientTokenError( "Token doesn't grant access to the Matrix C-S API" ) @@ -366,11 +371,12 @@ class MasDelegatedAuth(BaseAuth): # We only allow a single device_id in the scope, so we find them all in the # scope list, and raise if there are more than one. The OIDC server should be # the one enforcing valid scopes, so we raise a 500 if we find an invalid scope. - device_ids = [ - tok[len(SCOPE_MATRIX_DEVICE_PREFIX) :] - for tok in scope - if tok.startswith(SCOPE_MATRIX_DEVICE_PREFIX) - ] + device_ids: Set[str] = set() + for tok in scope: + if tok.startswith(UNSTABLE_SCOPE_MATRIX_DEVICE_PREFIX): + device_ids.add(tok[len(UNSTABLE_SCOPE_MATRIX_DEVICE_PREFIX) :]) + elif tok.startswith(STABLE_SCOPE_MATRIX_DEVICE_PREFIX): + device_ids.add(tok[len(STABLE_SCOPE_MATRIX_DEVICE_PREFIX) :]) if len(device_ids) > 1: raise AuthError( @@ -378,7 +384,7 @@ class MasDelegatedAuth(BaseAuth): "Multiple device IDs in scope", ) - device_id = device_ids[0] if device_ids else None + device_id = next(iter(device_ids), None) if device_id is not None: # Sanity check the device_id diff --git a/synapse/api/auth/msc3861_delegated.py b/synapse/api/auth/msc3861_delegated.py index 928b2c8f8b..c406c683e7 100644 --- a/synapse/api/auth/msc3861_delegated.py +++ b/synapse/api/auth/msc3861_delegated.py @@ -20,7 +20,7 @@ # import logging from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Set from urllib.parse import urlencode from authlib.oauth2 import ClientAuth @@ -34,7 +34,6 @@ from synapse.api.errors import ( AuthError, HttpResponseException, InvalidClientTokenError, - OAuthInsufficientScopeError, SynapseError, UnrecognizedRequestError, ) 
@@ -63,9 +62,10 @@ logger = logging.getLogger(__name__) # Scope as defined by MSC2967 # https://github.com/matrix-org/matrix-spec-proposals/pull/2967 -SCOPE_MATRIX_API = "urn:matrix:org.matrix.msc2967.client:api:*" -SCOPE_MATRIX_GUEST = "urn:matrix:org.matrix.msc2967.client:api:guest" -SCOPE_MATRIX_DEVICE_PREFIX = "urn:matrix:org.matrix.msc2967.client:device:" +UNSTABLE_SCOPE_MATRIX_API = "urn:matrix:org.matrix.msc2967.client:api:*" +UNSTABLE_SCOPE_MATRIX_DEVICE_PREFIX = "urn:matrix:org.matrix.msc2967.client:device:" +STABLE_SCOPE_MATRIX_API = "urn:matrix:client:api:*" +STABLE_SCOPE_MATRIX_DEVICE_PREFIX = "urn:matrix:client:device:" # Scope which allows access to the Synapse admin API SCOPE_SYNAPSE_ADMIN = "urn:synapse:admin:*" @@ -444,9 +444,6 @@ class MSC3861DelegatedAuth(BaseAuth): if not self._is_access_token_the_admin_token(access_token): await self._record_request(request, requester) - if not allow_guest and requester.is_guest: - raise OAuthInsufficientScopeError([SCOPE_MATRIX_API]) - request.requester = requester return requester @@ -528,10 +525,11 @@ class MSC3861DelegatedAuth(BaseAuth): scope: List[str] = introspection_result.get_scope_list() # Determine type of user based on presence of particular scopes - has_user_scope = SCOPE_MATRIX_API in scope - has_guest_scope = SCOPE_MATRIX_GUEST in scope + has_user_scope = ( + UNSTABLE_SCOPE_MATRIX_API in scope or STABLE_SCOPE_MATRIX_API in scope + ) - if not has_user_scope and not has_guest_scope: + if not has_user_scope: raise InvalidClientTokenError("No scope in token granting user rights") # Match via the sub claim @@ -579,11 +577,12 @@ class MSC3861DelegatedAuth(BaseAuth): # We only allow a single device_id in the scope, so we find them all in the # scope list, and raise if there are more than one. The OIDC server should be # the one enforcing valid scopes, so we raise a 500 if we find an invalid scope. 
- device_ids = [ - tok[len(SCOPE_MATRIX_DEVICE_PREFIX) :] - for tok in scope - if tok.startswith(SCOPE_MATRIX_DEVICE_PREFIX) - ] + device_ids: Set[str] = set() + for tok in scope: + if tok.startswith(UNSTABLE_SCOPE_MATRIX_DEVICE_PREFIX): + device_ids.add(tok[len(UNSTABLE_SCOPE_MATRIX_DEVICE_PREFIX) :]) + elif tok.startswith(STABLE_SCOPE_MATRIX_DEVICE_PREFIX): + device_ids.add(tok[len(STABLE_SCOPE_MATRIX_DEVICE_PREFIX) :]) if len(device_ids) > 1: raise AuthError( @@ -591,7 +590,7 @@ class MSC3861DelegatedAuth(BaseAuth): "Multiple device IDs in scope", ) - device_id = device_ids[0] if device_ids else None + device_id = next(iter(device_ids), None) if device_id is not None: # Sanity check the device_id @@ -617,5 +616,4 @@ class MSC3861DelegatedAuth(BaseAuth): user_id=user_id, device_id=device_id, scope=scope, - is_guest=(has_guest_scope and not has_user_scope), ) diff --git a/synapse/rest/client/auth_metadata.py b/synapse/rest/client/auth_metadata.py index 25e01a6574..4b5d997478 100644 --- a/synapse/rest/client/auth_metadata.py +++ b/synapse/rest/client/auth_metadata.py @@ -76,11 +76,17 @@ class AuthMetadataServlet(RestServlet): Advertises the OAuth 2.0 server metadata for the homeserver. 
""" - PATTERNS = client_patterns( - "/org.matrix.msc2965/auth_metadata$", - unstable=True, - releases=(), - ) + PATTERNS = [ + *client_patterns( + "/auth_metadata$", + releases=("v1",), + ), + *client_patterns( + "/org.matrix.msc2965/auth_metadata$", + unstable=True, + releases=(), + ), + ] def __init__(self, hs: "HomeServer"): super().__init__() diff --git a/tests/handlers/test_oauth_delegation.py b/tests/handlers/test_oauth_delegation.py index 2b0638bc12..d24614f6a3 100644 --- a/tests/handlers/test_oauth_delegation.py +++ b/tests/handlers/test_oauth_delegation.py @@ -25,11 +25,11 @@ import time from http import HTTPStatus from http.server import BaseHTTPRequestHandler, HTTPServer from io import BytesIO -from typing import Any, Coroutine, Dict, Generator, Optional, TypeVar, Union +from typing import Any, ClassVar, Coroutine, Dict, Generator, Optional, TypeVar, Union from unittest.mock import ANY, AsyncMock, Mock from urllib.parse import parse_qs -from parameterized import parameterized_class +from parameterized.parameterized import parameterized_class from signedjson.key import ( encode_verify_key_base64, generate_signing_key, @@ -46,7 +46,6 @@ from synapse.api.errors import ( Codes, HttpResponseException, InvalidClientTokenError, - OAuthInsufficientScopeError, SynapseError, ) from synapse.appservice import ApplicationService @@ -78,11 +77,7 @@ JWKS_URI = ISSUER + ".well-known/jwks.json" INTROSPECTION_ENDPOINT = ISSUER + "introspect" SYNAPSE_ADMIN_SCOPE = "urn:synapse:admin:*" -MATRIX_USER_SCOPE = "urn:matrix:org.matrix.msc2967.client:api:*" -MATRIX_GUEST_SCOPE = "urn:matrix:org.matrix.msc2967.client:api:guest" -MATRIX_DEVICE_SCOPE_PREFIX = "urn:matrix:org.matrix.msc2967.client:device:" DEVICE = "AABBCCDD" -MATRIX_DEVICE_SCOPE = MATRIX_DEVICE_SCOPE_PREFIX + DEVICE SUBJECT = "abc-def-ghi" USERNAME = "test-user" USER_ID = "@" + USERNAME + ":" + SERVER_NAME @@ -112,7 +107,24 @@ async def get_json(url: str) -> JsonDict: @skip_unless(HAS_AUTHLIB, "requires authlib") 
+@parameterized_class( + ("device_scope_prefix", "api_scope"), + [ + ("urn:matrix:client:device:", "urn:matrix:client:api:*"), + ( + "urn:matrix:org.matrix.msc2967.client:device:", + "urn:matrix:org.matrix.msc2967.client:api:*", + ), + ], +) class MSC3861OAuthDelegation(HomeserverTestCase): + device_scope_prefix: ClassVar[str] + api_scope: ClassVar[str] + + @property + def device_scope(self) -> str: + return self.device_scope_prefix + DEVICE + servlets = [ account.register_servlets, keys.register_servlets, @@ -212,7 +224,7 @@ class MSC3861OAuthDelegation(HomeserverTestCase): """The handler should return a 500 when no subject is present.""" self._set_introspection_returnvalue( - {"active": True, "scope": " ".join([MATRIX_USER_SCOPE])} + {"active": True, "scope": " ".join([self.api_scope])}, ) request = Mock(args={}) @@ -235,7 +247,7 @@ class MSC3861OAuthDelegation(HomeserverTestCase): { "active": True, "sub": SUBJECT, - "scope": " ".join([MATRIX_DEVICE_SCOPE]), + "scope": " ".join([self.device_scope]), } ) request = Mock(args={}) @@ -282,7 +294,7 @@ class MSC3861OAuthDelegation(HomeserverTestCase): { "active": True, "sub": SUBJECT, - "scope": " ".join([SYNAPSE_ADMIN_SCOPE, MATRIX_USER_SCOPE]), + "scope": " ".join([SYNAPSE_ADMIN_SCOPE, self.api_scope]), "username": USERNAME, } ) @@ -312,9 +324,7 @@ class MSC3861OAuthDelegation(HomeserverTestCase): { "active": True, "sub": SUBJECT, - "scope": " ".join( - [SYNAPSE_ADMIN_SCOPE, MATRIX_USER_SCOPE, MATRIX_GUEST_SCOPE] - ), + "scope": " ".join([SYNAPSE_ADMIN_SCOPE, self.api_scope]), "username": USERNAME, } ) @@ -344,7 +354,7 @@ class MSC3861OAuthDelegation(HomeserverTestCase): { "active": True, "sub": SUBJECT, - "scope": " ".join([MATRIX_USER_SCOPE]), + "scope": " ".join([self.api_scope]), "username": USERNAME, } ) @@ -374,7 +384,7 @@ class MSC3861OAuthDelegation(HomeserverTestCase): { "active": True, "sub": SUBJECT, - "scope": " ".join([MATRIX_USER_SCOPE, MATRIX_DEVICE_SCOPE]), + "scope": " ".join([self.api_scope, 
self.device_scope]), "username": USERNAME, } ) @@ -404,7 +414,7 @@ class MSC3861OAuthDelegation(HomeserverTestCase): { "active": True, "sub": SUBJECT, - "scope": " ".join([MATRIX_USER_SCOPE]), + "scope": " ".join([self.api_scope]), "device_id": DEVICE, "username": USERNAME, } @@ -444,9 +454,9 @@ class MSC3861OAuthDelegation(HomeserverTestCase): "sub": SUBJECT, "scope": " ".join( [ - MATRIX_USER_SCOPE, - f"{MATRIX_DEVICE_SCOPE_PREFIX}AABBCC", - f"{MATRIX_DEVICE_SCOPE_PREFIX}DDEEFF", + self.api_scope, + f"{self.device_scope_prefix}AABBCC", + f"{self.device_scope_prefix}DDEEFF", ] ), "username": USERNAME, @@ -457,68 +467,6 @@ class MSC3861OAuthDelegation(HomeserverTestCase): request.requestHeaders.getRawHeaders = mock_getRawHeaders() self.get_failure(self.auth.get_user_by_req(request), AuthError) - def test_active_guest_not_allowed(self) -> None: - """The handler should return an insufficient scope error.""" - - self._set_introspection_returnvalue( - { - "active": True, - "sub": SUBJECT, - "scope": " ".join([MATRIX_GUEST_SCOPE, MATRIX_DEVICE_SCOPE]), - "username": USERNAME, - } - ) - request = Mock(args={}) - request.args[b"access_token"] = [b"mockAccessToken"] - request.requestHeaders.getRawHeaders = mock_getRawHeaders() - error = self.get_failure( - self.auth.get_user_by_req(request), OAuthInsufficientScopeError - ) - self.http_client.get_json.assert_called_once_with(WELL_KNOWN) - self._rust_client.post.assert_called_once_with( - url=INTROSPECTION_ENDPOINT, - response_limit=ANY, - request_body=ANY, - headers=ANY, - ) - self._assertParams() - self.assertEqual( - getattr(error.value, "headers", {})["WWW-Authenticate"], - 'Bearer error="insufficient_scope", scope="urn:matrix:org.matrix.msc2967.client:api:*"', - ) - - def test_active_guest_allowed(self) -> None: - """The handler should return a requester with guest user rights and a device ID.""" - - self._set_introspection_returnvalue( - { - "active": True, - "sub": SUBJECT, - "scope": " ".join([MATRIX_GUEST_SCOPE, 
MATRIX_DEVICE_SCOPE]), - "username": USERNAME, - } - ) - request = Mock(args={}) - request.args[b"access_token"] = [b"mockAccessToken"] - request.requestHeaders.getRawHeaders = mock_getRawHeaders() - requester = self.get_success( - self.auth.get_user_by_req(request, allow_guest=True) - ) - self.http_client.get_json.assert_called_once_with(WELL_KNOWN) - self._rust_client.post.assert_called_once_with( - url=INTROSPECTION_ENDPOINT, - response_limit=ANY, - request_body=ANY, - headers=ANY, - ) - self._assertParams() - self.assertEqual(requester.user.to_string(), "@%s:%s" % (USERNAME, SERVER_NAME)) - self.assertEqual(requester.is_guest, True) - self.assertEqual( - get_awaitable_result(self.auth.is_server_admin(requester)), False - ) - self.assertEqual(requester.device_id, DEVICE) - def test_unavailable_introspection_endpoint(self) -> None: """The handler should return an internal server error.""" request = Mock(args={}) @@ -562,8 +510,8 @@ class MSC3861OAuthDelegation(HomeserverTestCase): "sub": SUBJECT, "scope": " ".join( [ - MATRIX_USER_SCOPE, - f"{MATRIX_DEVICE_SCOPE_PREFIX}AABBCC", + self.api_scope, + f"{self.device_scope_prefix}AABBCC", ] ), "username": USERNAME, @@ -611,7 +559,7 @@ class MSC3861OAuthDelegation(HomeserverTestCase): { "active": True, "sub": SUBJECT, - "scope": " ".join([MATRIX_USER_SCOPE, MATRIX_DEVICE_SCOPE]), + "scope": " ".join([self.api_scope, self.device_scope]), "username": USERNAME, } ) @@ -676,7 +624,7 @@ class MSC3861OAuthDelegation(HomeserverTestCase): return json.dumps( { "active": True, - "scope": MATRIX_USER_SCOPE, + "scope": self.api_scope, "sub": SUBJECT, "username": USERNAME, }, @@ -842,8 +790,24 @@ class FakeMasServer(HTTPServer): T = TypeVar("T") +@parameterized_class( + ("device_scope_prefix", "api_scope"), + [ + ("urn:matrix:client:device:", "urn:matrix:client:api:*"), + ( + "urn:matrix:org.matrix.msc2967.client:device:", + "urn:matrix:org.matrix.msc2967.client:api:*", + ), + ], +) class MasAuthDelegation(HomeserverTestCase): 
server: FakeMasServer + device_scope_prefix: ClassVar[str] + api_scope: ClassVar[str] + + @property + def device_scope(self) -> str: + return self.device_scope_prefix + DEVICE def till_deferred_has_result( self, @@ -914,12 +878,7 @@ class MasAuthDelegation(HomeserverTestCase): self.server.introspection_response = { "active": True, "sub": SUBJECT, - "scope": " ".join( - [ - MATRIX_USER_SCOPE, - f"{MATRIX_DEVICE_SCOPE_PREFIX}{DEVICE}", - ] - ), + "scope": " ".join([self.api_scope, self.device_scope]), "username": USERNAME, "expires_in": 60, } @@ -943,12 +902,7 @@ class MasAuthDelegation(HomeserverTestCase): self.server.introspection_response = { "active": True, "sub": SUBJECT, - "scope": " ".join( - [ - MATRIX_USER_SCOPE, - f"{MATRIX_DEVICE_SCOPE_PREFIX}{DEVICE}", - ] - ), + "scope": " ".join([self.api_scope, self.device_scope]), "username": USERNAME, } @@ -971,12 +925,7 @@ class MasAuthDelegation(HomeserverTestCase): self.server.introspection_response = { "active": True, "sub": SUBJECT, - "scope": " ".join( - [ - MATRIX_USER_SCOPE, - f"{MATRIX_DEVICE_SCOPE_PREFIX}ABCDEF", - ] - ), + "scope": " ".join([self.api_scope, f"{self.device_scope_prefix}ABCDEF"]), "username": USERNAME, "expires_in": 60, } @@ -993,7 +942,7 @@ class MasAuthDelegation(HomeserverTestCase): self.server.introspection_response = { "active": True, "sub": SUBJECT, - "scope": " ".join([MATRIX_USER_SCOPE]), + "scope": " ".join([self.api_scope]), "username": "inexistent_user", "expires_in": 60, } @@ -1039,7 +988,7 @@ class MasAuthDelegation(HomeserverTestCase): self.server.introspection_response = { "active": True, "sub": SUBJECT, - "scope": MATRIX_USER_SCOPE, + "scope": self.api_scope, "username": USERNAME, "expires_in": 60, "device_id": DEVICE, @@ -1057,7 +1006,7 @@ class MasAuthDelegation(HomeserverTestCase): self.server.introspection_response = { "active": True, "sub": SUBJECT, - "scope": " ".join([SYNAPSE_ADMIN_SCOPE, MATRIX_USER_SCOPE]), + "scope": " ".join([SYNAPSE_ADMIN_SCOPE, self.api_scope]), 
"username": USERNAME, "expires_in": 60, } @@ -1079,12 +1028,7 @@ class MasAuthDelegation(HomeserverTestCase): self.server.introspection_response = { "active": True, "sub": SUBJECT, - "scope": " ".join( - [ - MATRIX_USER_SCOPE, - f"{MATRIX_DEVICE_SCOPE_PREFIX}{DEVICE}", - ] - ), + "scope": " ".join([self.api_scope, self.device_scope]), "username": USERNAME, "expires_in": 60, } diff --git a/tests/rest/client/test_auth_metadata.py b/tests/rest/client/test_auth_metadata.py index a935533b09..c13d410636 100644 --- a/tests/rest/client/test_auth_metadata.py +++ b/tests/rest/client/test_auth_metadata.py @@ -18,8 +18,11 @@ # [This file includes modifications made by New Vector Limited] # from http import HTTPStatus +from typing import ClassVar from unittest.mock import AsyncMock +from parameterized import parameterized_class + from synapse.rest.client import auth_metadata from tests.unittest import HomeserverTestCase, override_config, skip_unless @@ -85,17 +88,22 @@ class AuthIssuerTestCase(HomeserverTestCase): req_mock.assert_not_called() +@parameterized_class( + ("endpoint",), + [ + ("/_matrix/client/unstable/org.matrix.msc2965/auth_metadata",), + ("/_matrix/client/v1/auth_metadata",), + ], +) class AuthMetadataTestCase(HomeserverTestCase): + endpoint: ClassVar[str] servlets = [ auth_metadata.register_servlets, ] def test_returns_404_when_msc3861_disabled(self) -> None: # Make an unauthenticated request for the discovery info. 
- channel = self.make_request( - "GET", - "/_matrix/client/unstable/org.matrix.msc2965/auth_metadata", - ) + channel = self.make_request("GET", self.endpoint) self.assertEqual(channel.code, HTTPStatus.NOT_FOUND) @skip_unless(HAS_AUTHLIB, "requires authlib") @@ -124,10 +132,7 @@ class AuthMetadataTestCase(HomeserverTestCase): ) self.hs.get_proxied_http_client().get_json = req_mock # type: ignore[method-assign] - channel = self.make_request( - "GET", - "/_matrix/client/unstable/org.matrix.msc2965/auth_metadata", - ) + channel = self.make_request("GET", self.endpoint) self.assertEqual(channel.code, HTTPStatus.OK) self.assertEqual( From 09a489e1982a868baee501c199479bee3c6cd812 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Tue, 2 Sep 2025 14:16:55 +0200 Subject: [PATCH 2/4] 1.138.0rc1 --- CHANGES.md | 41 +++++++++++++++++++++++++++++++ changelog.d/18549.feature | 1 - changelog.d/18804.misc | 1 - changelog.d/18849.misc | 1 - changelog.d/18851.bugfix | 1 - changelog.d/18853.doc.md | 1 - changelog.d/18854.misc | 1 - changelog.d/18855.misc | 1 - changelog.d/18857.misc | 1 - changelog.d/18858.bugfix | 1 - debian/changelog | 6 +++++ pyproject.toml | 2 +- schema/synapse-config.schema.yaml | 2 +- 13 files changed, 49 insertions(+), 11 deletions(-) delete mode 100644 changelog.d/18549.feature delete mode 100644 changelog.d/18804.misc delete mode 100644 changelog.d/18849.misc delete mode 100644 changelog.d/18851.bugfix delete mode 100644 changelog.d/18853.doc.md delete mode 100644 changelog.d/18854.misc delete mode 100644 changelog.d/18855.misc delete mode 100644 changelog.d/18857.misc delete mode 100644 changelog.d/18858.bugfix diff --git a/CHANGES.md b/CHANGES.md index 4601027f7a..76f4645905 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,44 @@ +# Synapse 1.138.0rc1 (2025-09-02) + +### Features + +- Support for the stable endpoint and scopes of [MSC3861](https://github.com/matrix-org/matrix-spec-proposals/pull/3861) & co. 
([\#18549](https://github.com/element-hq/synapse/issues/18549)) + +### Bugfixes + +- Improve database performance of [MSC4293](https://github.com/matrix-org/matrix-spec-proposals/pull/4293) - Redact on Kick/Ban. ([\#18851](https://github.com/element-hq/synapse/issues/18851)) +- Do not throw an error when fetching a rejected delayed state event on startup. ([\#18858](https://github.com/element-hq/synapse/issues/18858)) + +### Improved Documentation + +- Fix worker documentation incorrectly indicating all room Admin API requests were capable of being handled by workers. ([\#18853](https://github.com/element-hq/synapse/issues/18853)) + +### Internal Changes + +- Instrument `_ByteProducer` with tracing to measure potential dead time while writing bytes to the request. ([\#18804](https://github.com/element-hq/synapse/issues/18804)) +- Switch to OpenTracing's `ContextVarsScopeManager` instead of our own custom `LogContextScopeManager`. ([\#18849](https://github.com/element-hq/synapse/issues/18849)) +- Trace how much work is being done while "recursively fetching redactions". ([\#18854](https://github.com/element-hq/synapse/issues/18854)) +- Link [upstream Twisted bug](https://github.com/twisted/twisted/issues/12498) tracking the problem that explains why we have to use a `Producer` to write bytes to the request. ([\#18855](https://github.com/element-hq/synapse/issues/18855)) +- Introduce `EventPersistencePair` type. ([\#18857](https://github.com/element-hq/synapse/issues/18857)) + + + +### Updates to locked dependencies + +* Bump actions/add-to-project from c0c5949b017d0d4a39f7ba888255881bdac2a823 to 4515659e2b458b27365e167605ac44f219494b66. ([\#18863](https://github.com/element-hq/synapse/issues/18863)) +* Bump actions/checkout from 4.3.0 to 5.0.0. ([\#18834](https://github.com/element-hq/synapse/issues/18834)) +* Bump anyhow from 1.0.98 to 1.0.99. ([\#18841](https://github.com/element-hq/synapse/issues/18841)) +* Bump docker/login-action from 3.4.0 to 3.5.0. 
([\#18835](https://github.com/element-hq/synapse/issues/18835)) +* Bump dtolnay/rust-toolchain from b3b07ba8b418998c39fb20f53e8b695cdcc8de1b to e97e2d8cc328f1b50210efc529dca0028893a2d9. ([\#18862](https://github.com/element-hq/synapse/issues/18862)) +* Bump phonenumbers from 9.0.11 to 9.0.12. ([\#18837](https://github.com/element-hq/synapse/issues/18837)) +* Bump regex from 1.11.1 to 1.11.2. ([\#18864](https://github.com/element-hq/synapse/issues/18864)) +* Bump reqwest from 0.12.22 to 0.12.23. ([\#18842](https://github.com/element-hq/synapse/issues/18842)) +* Bump ruff from 0.12.7 to 0.12.10. ([\#18865](https://github.com/element-hq/synapse/issues/18865)) +* Bump serde_json from 1.0.142 to 1.0.143. ([\#18866](https://github.com/element-hq/synapse/issues/18866)) +* Bump types-bleach from 6.2.0.20250514 to 6.2.0.20250809. ([\#18838](https://github.com/element-hq/synapse/issues/18838)) +* Bump types-jsonschema from 4.25.0.20250720 to 4.25.1.20250822. ([\#18867](https://github.com/element-hq/synapse/issues/18867)) +* Bump types-psycopg2 from 2.9.21.20250718 to 2.9.21.20250809. ([\#18836](https://github.com/element-hq/synapse/issues/18836)) + # Synapse 1.137.0 (2025-08-26) No significant changes since 1.137.0rc1. diff --git a/changelog.d/18549.feature b/changelog.d/18549.feature deleted file mode 100644 index 4d78ae57ab..0000000000 --- a/changelog.d/18549.feature +++ /dev/null @@ -1 +0,0 @@ -Support for the stable endpoint and scopes of [MSC3861](https://github.com/matrix-org/matrix-spec-proposals/pull/3861) & co. diff --git a/changelog.d/18804.misc b/changelog.d/18804.misc deleted file mode 100644 index 3814dd668d..0000000000 --- a/changelog.d/18804.misc +++ /dev/null @@ -1 +0,0 @@ -Instrument `_ByteProducer` with tracing to measure potential dead time while writing bytes to the request. 
diff --git a/changelog.d/18849.misc b/changelog.d/18849.misc deleted file mode 100644 index 2fa37692ef..0000000000 --- a/changelog.d/18849.misc +++ /dev/null @@ -1 +0,0 @@ -Switch to OpenTracing's `ContextVarsScopeManager` instead of our own custom `LogContextScopeManager`. diff --git a/changelog.d/18851.bugfix b/changelog.d/18851.bugfix deleted file mode 100644 index 4c27154569..0000000000 --- a/changelog.d/18851.bugfix +++ /dev/null @@ -1 +0,0 @@ -Improve database performance of [MSC4293](https://github.com/matrix-org/matrix-spec-proposals/pull/4293) - Redact on Kick/Ban. \ No newline at end of file diff --git a/changelog.d/18853.doc.md b/changelog.d/18853.doc.md deleted file mode 100644 index 4b0d1afed8..0000000000 --- a/changelog.d/18853.doc.md +++ /dev/null @@ -1 +0,0 @@ -Fix worker documentation incorrectly indicating all room Admin API requests were capable of being handled by workers. diff --git a/changelog.d/18854.misc b/changelog.d/18854.misc deleted file mode 100644 index 9c1211a482..0000000000 --- a/changelog.d/18854.misc +++ /dev/null @@ -1 +0,0 @@ -Trace how much work is being done while "recursively fetching redactions". diff --git a/changelog.d/18855.misc b/changelog.d/18855.misc deleted file mode 100644 index 6f87c4ef28..0000000000 --- a/changelog.d/18855.misc +++ /dev/null @@ -1 +0,0 @@ -Link [upstream Twisted bug](https://github.com/twisted/twisted/issues/12498) tracking the problem that explains why we have to use a `Producer` to write bytes to the request. diff --git a/changelog.d/18857.misc b/changelog.d/18857.misc deleted file mode 100644 index e679be8b95..0000000000 --- a/changelog.d/18857.misc +++ /dev/null @@ -1 +0,0 @@ -Introduce `EventPersistencePair` type. diff --git a/changelog.d/18858.bugfix b/changelog.d/18858.bugfix deleted file mode 100644 index e71e224770..0000000000 --- a/changelog.d/18858.bugfix +++ /dev/null @@ -1 +0,0 @@ -Do not throw an error when fetching a rejected delayed state event on startup. 
diff --git a/debian/changelog b/debian/changelog index a77ae7d3c8..c9b86aea2a 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +matrix-synapse-py3 (1.138.0~rc1) stable; urgency=medium + + * New synapse release 1.138.0rc1. + + -- Synapse Packaging team Tue, 02 Sep 2025 12:16:14 +0000 + matrix-synapse-py3 (1.137.0) stable; urgency=medium * New Synapse release 1.137.0. diff --git a/pyproject.toml b/pyproject.toml index 0e5c409d84..54fdd5e707 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -101,7 +101,7 @@ module-name = "synapse.synapse_rust" [tool.poetry] name = "matrix-synapse" -version = "1.137.0" +version = "1.138.0rc1" description = "Homeserver for the Matrix decentralised comms protocol" authors = ["Matrix.org Team and Contributors "] license = "AGPL-3.0-or-later" diff --git a/schema/synapse-config.schema.yaml b/schema/synapse-config.schema.yaml index e6d1088536..83e16de397 100644 --- a/schema/synapse-config.schema.yaml +++ b/schema/synapse-config.schema.yaml @@ -1,5 +1,5 @@ $schema: https://element-hq.github.io/synapse/latest/schema/v1/meta.schema.json -$id: https://element-hq.github.io/synapse/schema/synapse/v1.137/synapse-config.schema.json +$id: https://element-hq.github.io/synapse/schema/synapse/v1.138/synapse-config.schema.json type: object properties: modules: From bff4a11b3ff2ea8784dd309fe0a9ef8925ef4202 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 2 Sep 2025 12:14:27 -0500 Subject: [PATCH 3/4] Re-introduce: Fix `LaterGauge` metrics to collect from all servers (#18791) Re-introduce: https://github.com/element-hq/synapse/pull/18751 that was reverted in https://github.com/element-hq/synapse/pull/18789 (explains why the PR was reverted in the first place). - Adds a `cleanup` pattern that cleans up metrics from each homeserver in the tests. 
Previously, the list of hooks built up until our CI machines couldn't operate properly, see https://github.com/element-hq/synapse/pull/18789 - Fix long-standing issue with `synapse_background_update_status` metrics only tracking the last database listed in the config (see https://github.com/element-hq/synapse/pull/18791#discussion_r2261706749) --- changelog.d/18791.misc | 1 + synapse/_scripts/generate_workers_map.py | 6 +- synapse/_scripts/synapse_port_db.py | 15 ++- synapse/federation/send_queue.py | 43 ++++--- synapse/federation/sender/__init__.py | 43 ++++--- synapse/handlers/presence.py | 28 ++-- synapse/http/request_metrics.py | 6 +- synapse/metrics/__init__.py | 128 ++++++++++++++----- synapse/notifier.py | 42 +++--- synapse/replication/tcp/handler.py | 28 ++-- synapse/replication/tcp/protocol.py | 20 ++- synapse/server.py | 36 +++++- synapse/storage/database.py | 8 +- synapse/storage/databases/__init__.py | 17 +++ synapse/storage/databases/main/roommember.py | 15 ++- synapse/util/ratelimitutils.py | 14 +- synapse/util/task_scheduler.py | 15 ++- tests/metrics/test_metrics.py | 100 ++++++++++++++- tests/replication/_base.py | 3 +- tests/server.py | 3 + 20 files changed, 435 insertions(+), 136 deletions(-) create mode 100644 changelog.d/18791.misc diff --git a/changelog.d/18791.misc b/changelog.d/18791.misc new file mode 100644 index 0000000000..6ecd498286 --- /dev/null +++ b/changelog.d/18791.misc @@ -0,0 +1 @@ +Fix `LaterGauge` metrics to collect from all servers. 
diff --git a/synapse/_scripts/generate_workers_map.py b/synapse/_scripts/generate_workers_map.py index 09feb8cf30..8878e364e2 100755 --- a/synapse/_scripts/generate_workers_map.py +++ b/synapse/_scripts/generate_workers_map.py @@ -153,9 +153,13 @@ def get_registered_paths_for_default( """ hs = MockHomeserver(base_config, worker_app) + # TODO We only do this to avoid an error, but don't need the database etc hs.setup() - return get_registered_paths_for_hs(hs) + registered_paths = get_registered_paths_for_hs(hs) + hs.cleanup() + + return registered_paths def elide_http_methods_if_unconflicting( diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py index 0f54cfc64a..a81db3cfbf 100755 --- a/synapse/_scripts/synapse_port_db.py +++ b/synapse/_scripts/synapse_port_db.py @@ -99,6 +99,7 @@ from synapse.storage.engines import create_engine from synapse.storage.prepare_database import prepare_database from synapse.types import ISynapseReactor from synapse.util import SYNAPSE_VERSION, Clock +from synapse.util.stringutils import random_string # Cast safety: Twisted does some naughty magic which replaces the # twisted.internet.reactor module with a Reactor instance at runtime. @@ -323,6 +324,7 @@ class MockHomeserver: self.config = config self.hostname = config.server.server_name self.version_string = SYNAPSE_VERSION + self.instance_id = random_string(5) def get_clock(self) -> Clock: return self.clock @@ -330,6 +332,9 @@ class MockHomeserver: def get_reactor(self) -> ISynapseReactor: return reactor + def get_instance_id(self) -> str: + return self.instance_id + def get_instance_name(self) -> str: return "master" @@ -685,7 +690,15 @@ class Porter: ) prepare_database(db_conn, engine, config=self.hs_config) # Type safety: ignore that we're using Mock homeservers here. 
- store = Store(DatabasePool(hs, db_config, engine), db_conn, hs) # type: ignore[arg-type] + store = Store( + DatabasePool( + hs, # type: ignore[arg-type] + db_config, + engine, + ), + db_conn, + hs, # type: ignore[arg-type] + ) db_conn.commit() return store diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 7f511d570c..2fdee9ac54 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -37,6 +37,7 @@ Events are replicated via a separate events stream. """ import logging +from enum import Enum from typing import ( TYPE_CHECKING, Dict, @@ -67,6 +68,25 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +class QueueNames(str, Enum): + PRESENCE_MAP = "presence_map" + KEYED_EDU = "keyed_edu" + KEYED_EDU_CHANGED = "keyed_edu_changed" + EDUS = "edus" + POS_TIME = "pos_time" + PRESENCE_DESTINATIONS = "presence_destinations" + + +queue_name_to_gauge_map: Dict[QueueNames, LaterGauge] = {} + +for queue_name in QueueNames: + queue_name_to_gauge_map[queue_name] = LaterGauge( + name=f"synapse_federation_send_queue_{queue_name.value}_size", + desc="", + labelnames=[SERVER_NAME_LABEL], + ) + + class FederationRemoteSendQueue(AbstractFederationSender): """A drop in replacement for FederationSender""" @@ -111,23 +131,16 @@ class FederationRemoteSendQueue(AbstractFederationSender): # we make a new function, so we need to make a new function so the inner # lambda binds to the queue rather than to the name of the queue which # changes. ARGH. 
- def register(name: str, queue: Sized) -> None: - LaterGauge( - name="synapse_federation_send_queue_%s_size" % (queue_name,), - desc="", - labelnames=[SERVER_NAME_LABEL], - caller=lambda: {(self.server_name,): len(queue)}, + def register(queue_name: QueueNames, queue: Sized) -> None: + queue_name_to_gauge_map[queue_name].register_hook( + homeserver_instance_id=hs.get_instance_id(), + hook=lambda: {(self.server_name,): len(queue)}, ) - for queue_name in [ - "presence_map", - "keyed_edu", - "keyed_edu_changed", - "edus", - "pos_time", - "presence_destinations", - ]: - register(queue_name, getattr(self, queue_name)) + for queue_name in QueueNames: + queue = getattr(self, queue_name.value) + assert isinstance(queue, Sized) + register(queue_name, queue=queue) self.clock.looping_call(self._clear_queue, 30 * 1000) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 8befbe3722..278a957331 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -199,6 +199,24 @@ sent_pdus_destination_dist_total = Counter( labelnames=[SERVER_NAME_LABEL], ) +transaction_queue_pending_destinations_gauge = LaterGauge( + name="synapse_federation_transaction_queue_pending_destinations", + desc="", + labelnames=[SERVER_NAME_LABEL], +) + +transaction_queue_pending_pdus_gauge = LaterGauge( + name="synapse_federation_transaction_queue_pending_pdus", + desc="", + labelnames=[SERVER_NAME_LABEL], +) + +transaction_queue_pending_edus_gauge = LaterGauge( + name="synapse_federation_transaction_queue_pending_edus", + desc="", + labelnames=[SERVER_NAME_LABEL], +) + # Time (in s) to wait before trying to wake up destinations that have # catch-up outstanding. 
# Please note that rate limiting still applies, so while the loop is @@ -398,11 +416,9 @@ class FederationSender(AbstractFederationSender): # map from destination to PerDestinationQueue self._per_destination_queues: Dict[str, PerDestinationQueue] = {} - LaterGauge( - name="synapse_federation_transaction_queue_pending_destinations", - desc="", - labelnames=[SERVER_NAME_LABEL], - caller=lambda: { + transaction_queue_pending_destinations_gauge.register_hook( + homeserver_instance_id=hs.get_instance_id(), + hook=lambda: { (self.server_name,): sum( 1 for d in self._per_destination_queues.values() @@ -410,22 +426,17 @@ class FederationSender(AbstractFederationSender): ) }, ) - - LaterGauge( - name="synapse_federation_transaction_queue_pending_pdus", - desc="", - labelnames=[SERVER_NAME_LABEL], - caller=lambda: { + transaction_queue_pending_pdus_gauge.register_hook( + homeserver_instance_id=hs.get_instance_id(), + hook=lambda: { (self.server_name,): sum( d.pending_pdu_count() for d in self._per_destination_queues.values() ) }, ) - LaterGauge( - name="synapse_federation_transaction_queue_pending_edus", - desc="", - labelnames=[SERVER_NAME_LABEL], - caller=lambda: { + transaction_queue_pending_edus_gauge.register_hook( + homeserver_instance_id=hs.get_instance_id(), + hook=lambda: { (self.server_name,): sum( d.pending_edu_count() for d in self._per_destination_queues.values() ) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index b253117498..d7de20f884 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -173,6 +173,18 @@ state_transition_counter = Counter( labelnames=["locality", "from", "to", SERVER_NAME_LABEL], ) +presence_user_to_current_state_size_gauge = LaterGauge( + name="synapse_handlers_presence_user_to_current_state_size", + desc="", + labelnames=[SERVER_NAME_LABEL], +) + +presence_wheel_timer_size_gauge = LaterGauge( + name="synapse_handlers_presence_wheel_timer_size", + desc="", + 
labelnames=[SERVER_NAME_LABEL], +) + # If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them # "currently_active" LAST_ACTIVE_GRANULARITY = 60 * 1000 @@ -779,11 +791,9 @@ class PresenceHandler(BasePresenceHandler): EduTypes.PRESENCE, self.incoming_presence ) - LaterGauge( - name="synapse_handlers_presence_user_to_current_state_size", - desc="", - labelnames=[SERVER_NAME_LABEL], - caller=lambda: {(self.server_name,): len(self.user_to_current_state)}, + presence_user_to_current_state_size_gauge.register_hook( + homeserver_instance_id=hs.get_instance_id(), + hook=lambda: {(self.server_name,): len(self.user_to_current_state)}, ) # The per-device presence state, maps user to devices to per-device presence state. @@ -882,11 +892,9 @@ class PresenceHandler(BasePresenceHandler): 60 * 1000, ) - LaterGauge( - name="synapse_handlers_presence_wheel_timer_size", - desc="", - labelnames=[SERVER_NAME_LABEL], - caller=lambda: {(self.server_name,): len(self.wheel_timer)}, + presence_wheel_timer_size_gauge.register_hook( + homeserver_instance_id=hs.get_instance_id(), + hook=lambda: {(self.server_name,): len(self.wheel_timer)}, ) # Used to handle sending of presence to newly joined users/servers diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py index a9b049f904..83f52edb7c 100644 --- a/synapse/http/request_metrics.py +++ b/synapse/http/request_metrics.py @@ -164,11 +164,13 @@ def _get_in_flight_counts() -> Mapping[Tuple[str, ...], int]: return counts -LaterGauge( +in_flight_requests = LaterGauge( name="synapse_http_server_in_flight_requests_count", desc="", labelnames=["method", "servlet", SERVER_NAME_LABEL], - caller=_get_in_flight_counts, +) +in_flight_requests.register_hook( + homeserver_instance_id=None, hook=_get_in_flight_counts ) diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 11e2551a16..5b291aa893 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -73,8 +73,6 @@ logger 
= logging.getLogger(__name__) METRICS_PREFIX = "/_synapse/metrics" -all_gauges: Dict[str, Collector] = {} - HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat") SERVER_NAME_LABEL = "server_name" @@ -163,42 +161,110 @@ class LaterGauge(Collector): name: str desc: str labelnames: Optional[StrSequence] = attr.ib(hash=False) - # callback: should either return a value (if there are no labels for this metric), - # or dict mapping from a label tuple to a value - caller: Callable[ - [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]] - ] + _instance_id_to_hook_map: Dict[ + Optional[str], # instance_id + Callable[ + [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]] + ], + ] = attr.ib(factory=dict, hash=False) + """ + Map from homeserver instance_id to a callback. Each callback should either return a + value (if there are no labels for this metric), or dict mapping from a label tuple + to a value. + + We use `instance_id` instead of `server_name` because it's possible to have multiple + workers running in the same process with the same `server_name`. + """ def collect(self) -> Iterable[Metric]: # The decision to add `SERVER_NAME_LABEL` is from the `LaterGauge` usage itself # (we don't enforce it here, one level up). 
g = GaugeMetricFamily(self.name, self.desc, labels=self.labelnames) # type: ignore[missing-server-name-label] - try: - calls = self.caller() - except Exception: - logger.exception("Exception running callback for LaterGauge(%s)", self.name) - yield g - return + for homeserver_instance_id, hook in self._instance_id_to_hook_map.items(): + try: + hook_result = hook() + except Exception: + logger.exception( + "Exception running callback for LaterGauge(%s) for homeserver_instance_id=%s", + self.name, + homeserver_instance_id, + ) + # Continue to return the rest of the metrics that aren't broken + continue - if isinstance(calls, (int, float)): - g.add_metric([], calls) - else: - for k, v in calls.items(): - g.add_metric(k, v) + if isinstance(hook_result, (int, float)): + g.add_metric([], hook_result) + else: + for k, v in hook_result.items(): + g.add_metric(k, v) yield g + def register_hook( + self, + *, + homeserver_instance_id: Optional[str], + hook: Callable[ + [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]] + ], + ) -> None: + """ + Register a callback/hook that will be called to generate a metric samples for + the gauge. + + Args: + homeserver_instance_id: The unique ID for this Synapse process instance + (`hs.get_instance_id()`) that this hook is associated with. This can be used + later to lookup all hooks associated with a given server name in order to + unregister them. This should only be omitted for global hooks that work + across all homeservers. + hook: A callback that should either return a value (if there are no + labels for this metric), or dict mapping from a label tuple to a value + """ + # We shouldn't have multiple hooks registered for the same homeserver `instance_id`. + existing_hook = self._instance_id_to_hook_map.get(homeserver_instance_id) + assert existing_hook is None, ( + f"LaterGauge(name={self.name}) hook already registered for homeserver_instance_id={homeserver_instance_id}. 
" + "This is likely a Synapse bug and you forgot to unregister the previous hooks for " + "the server (especially in tests)." + ) + + self._instance_id_to_hook_map[homeserver_instance_id] = hook + + def unregister_hooks_for_homeserver_instance_id( + self, homeserver_instance_id: str + ) -> None: + """ + Unregister all hooks associated with the given homeserver `instance_id`. This should be + called when a homeserver is shutdown to avoid extra hooks sitting around. + + Args: + homeserver_instance_id: The unique ID for this Synapse process instance to + unregister hooks for (`hs.get_instance_id()`). + """ + self._instance_id_to_hook_map.pop(homeserver_instance_id, None) + def __attrs_post_init__(self) -> None: - self._register() - - def _register(self) -> None: - if self.name in all_gauges.keys(): - logger.warning("%s already registered, reregistering", self.name) - REGISTRY.unregister(all_gauges.pop(self.name)) - REGISTRY.register(self) - all_gauges[self.name] = self + + # We shouldn't have multiple metrics with the same name. Typically, metrics + # should be created globally so you shouldn't be running into this and this will + # catch any stupid mistakes. The `REGISTRY.register(self)` call above will also + # raise an error if the metric already exists but to make things explicit, we'll + # also check here. + existing_gauge = all_later_gauges_to_clean_up_on_shutdown.get(self.name) + assert existing_gauge is None, f"LaterGauge(name={self.name}) already exists. " + + # Keep track of the gauge so we can clean it up later. + all_later_gauges_to_clean_up_on_shutdown[self.name] = self + + +all_later_gauges_to_clean_up_on_shutdown: Dict[str, LaterGauge] = {} +""" +Track all `LaterGauge` instances so we can remove any associated hooks during homeserver +shutdown. 
+""" # `MetricsEntry` only makes sense when it is a `Protocol`, @@ -250,7 +316,7 @@ class InFlightGauge(Generic[MetricsEntry], Collector): # Protects access to _registrations self._lock = threading.Lock() - self._register_with_collector() + REGISTRY.register(self) def register( self, @@ -341,14 +407,6 @@ class InFlightGauge(Generic[MetricsEntry], Collector): gauge.add_metric(labels=key, value=getattr(metrics, name)) yield gauge - def _register_with_collector(self) -> None: - if self.name in all_gauges.keys(): - logger.warning("%s already registered, reregistering", self.name) - REGISTRY.unregister(all_gauges.pop(self.name)) - - REGISTRY.register(self) - all_gauges[self.name] = self - class GaugeHistogramMetricFamilyWithLabels(GaugeHistogramMetricFamily): """ diff --git a/synapse/notifier.py b/synapse/notifier.py index 448a715e2a..7782c9ca65 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -86,6 +86,24 @@ users_woken_by_stream_counter = Counter( labelnames=["stream", SERVER_NAME_LABEL], ) + +notifier_listeners_gauge = LaterGauge( + name="synapse_notifier_listeners", + desc="", + labelnames=[SERVER_NAME_LABEL], +) + +notifier_rooms_gauge = LaterGauge( + name="synapse_notifier_rooms", + desc="", + labelnames=[SERVER_NAME_LABEL], +) +notifier_users_gauge = LaterGauge( + name="synapse_notifier_users", + desc="", + labelnames=[SERVER_NAME_LABEL], +) + T = TypeVar("T") @@ -281,28 +299,20 @@ class Notifier: ) } - LaterGauge( - name="synapse_notifier_listeners", - desc="", - labelnames=[SERVER_NAME_LABEL], - caller=count_listeners, + notifier_listeners_gauge.register_hook( + homeserver_instance_id=hs.get_instance_id(), hook=count_listeners ) - - LaterGauge( - name="synapse_notifier_rooms", - desc="", - labelnames=[SERVER_NAME_LABEL], - caller=lambda: { + notifier_rooms_gauge.register_hook( + homeserver_instance_id=hs.get_instance_id(), + hook=lambda: { (self.server_name,): count( bool, list(self.room_to_user_streams.values()) ) }, ) - LaterGauge( - 
name="synapse_notifier_users", - desc="", - labelnames=[SERVER_NAME_LABEL], - caller=lambda: {(self.server_name,): len(self.user_to_user_stream)}, + notifier_users_gauge.register_hook( + homeserver_instance_id=hs.get_instance_id(), + hook=lambda: {(self.server_name,): len(self.user_to_user_stream)}, ) def add_replication_callback(self, cb: Callable[[], None]) -> None: diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 0f14c7e380..dd7e38dd78 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -106,6 +106,18 @@ user_ip_cache_counter = Counter( "synapse_replication_tcp_resource_user_ip_cache", "", labelnames=[SERVER_NAME_LABEL] ) +tcp_resource_total_connections_gauge = LaterGauge( + name="synapse_replication_tcp_resource_total_connections", + desc="", + labelnames=[SERVER_NAME_LABEL], +) + +tcp_command_queue_gauge = LaterGauge( + name="synapse_replication_tcp_command_queue", + desc="Number of inbound RDATA/POSITION commands queued for processing", + labelnames=["stream_name", SERVER_NAME_LABEL], +) + # the type of the entries in _command_queues_by_stream _StreamCommandQueue = Deque[ @@ -243,11 +255,9 @@ class ReplicationCommandHandler: # outgoing replication commands to.) self._connections: List[IReplicationConnection] = [] - LaterGauge( - name="synapse_replication_tcp_resource_total_connections", - desc="", - labelnames=[SERVER_NAME_LABEL], - caller=lambda: {(self.server_name,): len(self._connections)}, + tcp_resource_total_connections_gauge.register_hook( + homeserver_instance_id=hs.get_instance_id(), + hook=lambda: {(self.server_name,): len(self._connections)}, ) # When POSITION or RDATA commands arrive, we stick them in a queue and process @@ -266,11 +276,9 @@ class ReplicationCommandHandler: # from that connection. 
self._streams_by_connection: Dict[IReplicationConnection, Set[str]] = {} - LaterGauge( - name="synapse_replication_tcp_command_queue", - desc="Number of inbound RDATA/POSITION commands queued for processing", - labelnames=["stream_name", SERVER_NAME_LABEL], - caller=lambda: { + tcp_command_queue_gauge.register_hook( + homeserver_instance_id=hs.get_instance_id(), + hook=lambda: { (stream_name, self.server_name): len(queue) for stream_name, queue in self._command_queues_by_stream.items() }, diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 969f0303e0..2ec25bf43d 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -527,7 +527,10 @@ pending_commands = LaterGauge( name="synapse_replication_tcp_protocol_pending_commands", desc="", labelnames=["name", SERVER_NAME_LABEL], - caller=lambda: { +) +pending_commands.register_hook( + homeserver_instance_id=None, + hook=lambda: { (p.name, p.server_name): len(p.pending_commands) for p in connected_connections }, ) @@ -544,7 +547,10 @@ transport_send_buffer = LaterGauge( name="synapse_replication_tcp_protocol_transport_send_buffer", desc="", labelnames=["name", SERVER_NAME_LABEL], - caller=lambda: { +) +transport_send_buffer.register_hook( + homeserver_instance_id=None, + hook=lambda: { (p.name, p.server_name): transport_buffer_size(p) for p in connected_connections }, ) @@ -571,7 +577,10 @@ tcp_transport_kernel_send_buffer = LaterGauge( name="synapse_replication_tcp_protocol_transport_kernel_send_buffer", desc="", labelnames=["name", SERVER_NAME_LABEL], - caller=lambda: { +) +tcp_transport_kernel_send_buffer.register_hook( + homeserver_instance_id=None, + hook=lambda: { (p.name, p.server_name): transport_kernel_read_buffer_size(p, False) for p in connected_connections }, @@ -582,7 +591,10 @@ tcp_transport_kernel_read_buffer = LaterGauge( name="synapse_replication_tcp_protocol_transport_kernel_read_buffer", desc="", labelnames=["name", 
SERVER_NAME_LABEL], - caller=lambda: { +) +tcp_transport_kernel_read_buffer.register_hook( + homeserver_instance_id=None, + hook=lambda: { (p.name, p.server_name): transport_kernel_read_buffer_size(p, True) for p in connected_connections }, diff --git a/synapse/server.py b/synapse/server.py index bf82f79bec..3eac271c90 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -129,7 +129,10 @@ from synapse.http.client import ( ) from synapse.http.matrixfederationclient import MatrixFederationHttpClient from synapse.media.media_repository import MediaRepository -from synapse.metrics import register_threadpool +from synapse.metrics import ( + all_later_gauges_to_clean_up_on_shutdown, + register_threadpool, +) from synapse.metrics.common_usage_metrics import CommonUsageMetricsManager from synapse.module_api import ModuleApi from synapse.module_api.callbacks import ModuleApiCallbacks @@ -369,6 +372,37 @@ class HomeServer(metaclass=abc.ABCMeta): if self.config.worker.run_background_tasks: self.setup_background_tasks() + def __del__(self) -> None: + """ + Called when the homeserver is garbage collected. + + Make sure we actually do some clean-up, rather than leak data. + """ + self.cleanup() + + def cleanup(self) -> None: + """ + WIP: Clean-up any references to the homeserver and stop any running related + processes, timers, loops, replication stream, etc. + + This should be called wherever you care about the HomeServer being completely + garbage collected like in tests. It's not necessary to call if you plan to just + shut down the whole Python process anyway. + + Can be called multiple times. + """ + logger.info("Received cleanup request for %s.", self.hostname) + + # TODO: Stop background processes, timers, loops, replication stream, etc.
+ + # Cleanup metrics associated with the homeserver + for later_gauge in all_later_gauges_to_clean_up_on_shutdown.values(): + later_gauge.unregister_hooks_for_homeserver_instance_id( + self.get_instance_id() + ) + + logger.info("Cleanup complete for %s.", self.hostname) + def start_listening(self) -> None: # noqa: B027 (no-op by design) """Start the HTTP, manhole, metrics, etc listeners diff --git a/synapse/storage/database.py b/synapse/storage/database.py index f7aec16c96..cfec36e0fa 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -61,7 +61,7 @@ from synapse.logging.context import ( current_context, make_deferred_yieldable, ) -from synapse.metrics import SERVER_NAME_LABEL, LaterGauge, register_threadpool +from synapse.metrics import SERVER_NAME_LABEL, register_threadpool from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.background_updates import BackgroundUpdater from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine @@ -611,12 +611,6 @@ class DatabasePool: ) self.updates = BackgroundUpdater(hs, self) - LaterGauge( - name="synapse_background_update_status", - desc="Background update status", - labelnames=[SERVER_NAME_LABEL], - caller=lambda: {(self.server_name,): self.updates.get_status()}, - ) self._previous_txn_total_time = 0.0 self._current_txn_total_time = 0.0 diff --git a/synapse/storage/databases/__init__.py b/synapse/storage/databases/__init__.py index 6442ab6c7a..a4aba96686 100644 --- a/synapse/storage/databases/__init__.py +++ b/synapse/storage/databases/__init__.py @@ -22,6 +22,7 @@ import logging from typing import TYPE_CHECKING, Generic, List, Optional, Type, TypeVar +from synapse.metrics import SERVER_NAME_LABEL, LaterGauge from synapse.storage._base import SQLBaseStore from synapse.storage.database import DatabasePool, make_conn from synapse.storage.databases.main.events import PersistEventsStore @@ -40,6 +41,13 @@ logger = 
logging.getLogger(__name__) DataStoreT = TypeVar("DataStoreT", bound=SQLBaseStore, covariant=True) +background_update_status = LaterGauge( + name="synapse_background_update_status", + desc="Background update status", + labelnames=["database_name", SERVER_NAME_LABEL], +) + + class Databases(Generic[DataStoreT]): """The various databases. @@ -143,6 +151,15 @@ class Databases(Generic[DataStoreT]): db_conn.close() + # Track the background update status for each database + background_update_status.register_hook( + homeserver_instance_id=hs.get_instance_id(), + hook=lambda: { + (database.name(), server_name): database.updates.get_status() + for database in self.databases + }, + ) + # Sanity check that we have actually configured all the required stores. if not main: raise Exception("No 'main' database configured") diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 67e7e99baa..9db2e14a06 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -84,6 +84,13 @@ _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME = "current_state_events_membership" _POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE = 1000 +federation_known_servers_gauge = LaterGauge( + name="synapse_federation_known_servers", + desc="", + labelnames=[SERVER_NAME_LABEL], +) + + @attr.s(frozen=True, slots=True, auto_attribs=True) class EventIdMembership: """Returned by `get_membership_from_event_ids`""" @@ -116,11 +123,9 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): 1, self._count_known_servers, ) - LaterGauge( - name="synapse_federation_known_servers", - desc="", - labelnames=[SERVER_NAME_LABEL], - caller=lambda: {(self.server_name,): self._known_servers_count}, + federation_known_servers_gauge.register_hook( + homeserver_instance_id=hs.get_instance_id(), + hook=lambda: {(self.server_name,): self._known_servers_count}, ) @wrap_as_background_process("_count_known_servers") diff --git 
a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index f5e592d80e..88edc07161 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -131,22 +131,28 @@ def _get_counts_from_rate_limiter_instance( # We track the number of affected hosts per time-period so we can # differentiate one really noisy homeserver from a general # ratelimit tuning problem across the federation. -LaterGauge( +sleep_affected_hosts_gauge = LaterGauge( name="synapse_rate_limit_sleep_affected_hosts", desc="Number of hosts that had requests put to sleep", labelnames=["rate_limiter_name", SERVER_NAME_LABEL], - caller=lambda: _get_counts_from_rate_limiter_instance( +) +sleep_affected_hosts_gauge.register_hook( + homeserver_instance_id=None, + hook=lambda: _get_counts_from_rate_limiter_instance( lambda rate_limiter_instance: sum( ratelimiter.should_sleep() for ratelimiter in rate_limiter_instance.ratelimiters.values() ) ), ) -LaterGauge( +reject_affected_hosts_gauge = LaterGauge( name="synapse_rate_limit_reject_affected_hosts", desc="Number of hosts that had requests rejected", labelnames=["rate_limiter_name", SERVER_NAME_LABEL], - caller=lambda: _get_counts_from_rate_limiter_instance( +) +reject_affected_hosts_gauge.register_hook( + homeserver_instance_id=None, + hook=lambda: _get_counts_from_rate_limiter_instance( lambda rate_limiter_instance: sum( ratelimiter.should_reject() for ratelimiter in rate_limiter_instance.ratelimiters.values() diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index fdcacdf128..0539989320 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -44,6 +44,13 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +running_tasks_gauge = LaterGauge( + name="synapse_scheduler_running_tasks", + desc="The number of concurrent running tasks handled by the TaskScheduler", + labelnames=[SERVER_NAME_LABEL], +) + + class TaskScheduler: """ This is a simple task scheduler designed 
for resumable tasks. Normally, @@ -130,11 +137,9 @@ class TaskScheduler: TaskScheduler.SCHEDULE_INTERVAL_MS, ) - LaterGauge( - name="synapse_scheduler_running_tasks", - desc="The number of concurrent running tasks handled by the TaskScheduler", - labelnames=[SERVER_NAME_LABEL], - caller=lambda: {(self.server_name,): len(self._running_tasks)}, + running_tasks_gauge.register_hook( + homeserver_instance_id=hs.get_instance_id(), + hook=lambda: {(self.server_name,): len(self._running_tasks)}, ) def register_action( diff --git a/tests/metrics/test_metrics.py b/tests/metrics/test_metrics.py index 61874564a6..832e991730 100644 --- a/tests/metrics/test_metrics.py +++ b/tests/metrics/test_metrics.py @@ -18,11 +18,18 @@ # [This file includes modifications made by New Vector Limited] # # -from typing import Dict, Protocol, Tuple +from typing import Dict, NoReturn, Protocol, Tuple from prometheus_client.core import Sample -from synapse.metrics import REGISTRY, InFlightGauge, generate_latest +from synapse.metrics import ( + REGISTRY, + SERVER_NAME_LABEL, + InFlightGauge, + LaterGauge, + all_later_gauges_to_clean_up_on_shutdown, + generate_latest, +) from synapse.util.caches.deferred_cache import DeferredCache from tests import unittest @@ -285,6 +292,95 @@ class CacheMetricsTests(unittest.HomeserverTestCase): self.assertEqual(hs2_cache_max_size_metric_value, "777.0") +class LaterGaugeTests(unittest.HomeserverTestCase): + def setUp(self) -> None: + super().setUp() + self.later_gauge = LaterGauge( + name="foo", + desc="", + labelnames=[SERVER_NAME_LABEL], + ) + + def tearDown(self) -> None: + super().tearDown() + + REGISTRY.unregister(self.later_gauge) + all_later_gauges_to_clean_up_on_shutdown.pop(self.later_gauge.name, None) + + def test_later_gauge_multiple_servers(self) -> None: + """ + Test that LaterGauge metrics are reported correctly across multiple servers. We + will have a metrics entry for each homeserver that is labeled with the + `server_name` label.
+ """ + self.later_gauge.register_hook( + homeserver_instance_id="123", hook=lambda: {("hs1",): 1} + ) + self.later_gauge.register_hook( + homeserver_instance_id="456", hook=lambda: {("hs2",): 2} + ) + + metrics_map = get_latest_metrics() + + # Find the metrics from both homeservers + hs1_metric = 'foo{server_name="hs1"}' + hs1_metric_value = metrics_map.get(hs1_metric) + self.assertIsNotNone( + hs1_metric_value, + f"Missing metric {hs1_metric} in metrics {metrics_map}", + ) + self.assertEqual(hs1_metric_value, "1.0") + + hs2_metric = 'foo{server_name="hs2"}' + hs2_metric_value = metrics_map.get(hs2_metric) + self.assertIsNotNone( + hs2_metric_value, + f"Missing metric {hs2_metric} in metrics {metrics_map}", + ) + self.assertEqual(hs2_metric_value, "2.0") + + def test_later_gauge_hook_exception(self) -> None: + """ + Test that LaterGauge metrics are collected across multiple servers even if one + hooks is throwing an exception. + """ + + def raise_exception() -> NoReturn: + raise Exception("fake error generating data") + + # Make the hook for hs1 throw an exception + self.later_gauge.register_hook( + homeserver_instance_id="123", hook=raise_exception + ) + # Metrics from hs2 still work fine + self.later_gauge.register_hook( + homeserver_instance_id="456", hook=lambda: {("hs2",): 2} + ) + + metrics_map = get_latest_metrics() + + # Since we encountered an exception while trying to collect metrics from hs1, we + # don't expect to see it here. 
+ hs1_metric = 'foo{server_name="hs1"}' + hs1_metric_value = metrics_map.get(hs1_metric) + self.assertIsNone( + hs1_metric_value, + ( + "Since we encountered an exception while trying to collect metrics from hs1" + f"we don't expect to see it the metrics_map {metrics_map}" + ), + ) + + # We should still see metrics from hs2 though + hs2_metric = 'foo{server_name="hs2"}' + hs2_metric_value = metrics_map.get(hs2_metric) + self.assertIsNotNone( + hs2_metric_value, + f"Missing metric {hs2_metric} in cache metrics {metrics_map}", + ) + self.assertEqual(hs2_metric_value, "2.0") + + def get_latest_metrics() -> Dict[str, str]: """ Collect the latest metrics from the registry and parse them into an easy to use map. diff --git a/tests/replication/_base.py b/tests/replication/_base.py index 453eb7750b..e756021937 100644 --- a/tests/replication/_base.py +++ b/tests/replication/_base.py @@ -32,7 +32,6 @@ from synapse.config.workers import InstanceTcpLocationConfig, InstanceUnixLocati from synapse.http.site import SynapseRequest, SynapseSite from synapse.replication.http import ReplicationRestResource from synapse.replication.tcp.client import ReplicationDataHandler -from synapse.replication.tcp.handler import ReplicationCommandHandler from synapse.replication.tcp.protocol import ( ClientReplicationStreamProtocol, ServerReplicationStreamProtocol, @@ -97,7 +96,7 @@ class BaseStreamTestCase(unittest.HomeserverTestCase): self.test_handler = self._build_replication_data_handler() self.worker_hs._replication_data_handler = self.test_handler # type: ignore[attr-defined] - repl_handler = ReplicationCommandHandler(self.worker_hs) + repl_handler = self.worker_hs.get_replication_command_handler() self.client = ClientReplicationStreamProtocol( self.worker_hs, "client", diff --git a/tests/server.py b/tests/server.py index 3a81a4c6d9..ebff8b04b3 100644 --- a/tests/server.py +++ b/tests/server.py @@ -1145,6 +1145,9 @@ def setup_test_homeserver( reactor=reactor, ) + # Register the cleanup hook 
+ cleanup_func(hs.cleanup) + # Install @cache_in_self attributes for key, val in kwargs.items(): setattr(hs, "_" + key, val) From b2997a8f20d1999ec9f73c3d4a5fb210d4294176 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 2 Sep 2025 13:34:47 -0500 Subject: [PATCH 4/4] Suppress "Applying schema" log noise bulk when running Complement tests (#18878) If Synapse is under test (`SYNAPSE_LOG_TESTING` is set), we don't care about seeing the "Applying schema" log lines at the INFO level every time we run the tests (it's 100 lines of bulk for each homeserver). ``` synapse_main | 2025-08-29 22:34:03,453 - synapse.storage.prepare_database - 433 - INFO - main - Applying schema deltas for v73 synapse_main | 2025-08-29 22:34:03,454 - synapse.storage.prepare_database - 541 - INFO - main - Applying schema 73/01event_failed_pull_attempts.sql synapse_main | 2025-08-29 22:34:03,463 - synapse.storage.prepare_database - 541 - INFO - main - Applying schema 73/02add_pusher_enabled.sql synapse_main | 2025-08-29 22:34:03,473 - synapse.storage.prepare_database - 541 - INFO - main - Applying schema 73/02room_id_indexes_for_purging.sql synapse_main | 2025-08-29 22:34:03,482 - synapse.storage.prepare_database - 541 - INFO - main - Applying schema 73/03pusher_device_id.sql synapse_main | 2025-08-29 22:34:03,492 - synapse.storage.prepare_database - 541 - INFO - main - Applying schema 73/03users_approved_column.sql synapse_main | 2025-08-29 22:34:03,502 - synapse.storage.prepare_database - 541 - INFO - main - Applying schema 73/04partial_join_details.sql synapse_main | 2025-08-29 22:34:03,513 - synapse.storage.prepare_database - 541 - INFO - main - Applying schema 73/04pending_device_list_updates.sql ... ``` The Synapse logs are visible when a Complement test fails or you use `COMPLEMENT_ALWAYS_PRINT_SERVER_LOGS=1`. This is spawning from a Complement test with three homeservers and wanting less log noise to scroll through. 
--- changelog.d/18878.docker | 1 + docker/conf/log.config | 7 +++++++ 2 files changed, 8 insertions(+) create mode 100644 changelog.d/18878.docker diff --git a/changelog.d/18878.docker b/changelog.d/18878.docker new file mode 100644 index 0000000000..cf74f67cc8 --- /dev/null +++ b/changelog.d/18878.docker @@ -0,0 +1 @@ +Suppress the bulk of "Applying schema" log noise when `SYNAPSE_LOG_TESTING` is set. diff --git a/docker/conf/log.config b/docker/conf/log.config index 5772321202..6fe7db66da 100644 --- a/docker/conf/log.config +++ b/docker/conf/log.config @@ -77,6 +77,13 @@ loggers: #} synapse.visibility.filtered_event_debug: level: DEBUG + + {# + If Synapse is under test, we don't care about seeing the "Applying schema" log + lines at the INFO level every time we run the tests (it's 100 lines of bulk) + #} + synapse.storage.prepare_database: + level: WARN {% endif %} root: