
Merge branch 'develop' into madlittlemods/log-context-using-contextvars3

Eric Eastwood
2025-09-02 13:39:44 -05:00
39 changed files with 604 additions and 299 deletions
+41
@@ -1,3 +1,44 @@
# Synapse 1.138.0rc1 (2025-09-02)
### Features
- Support for the stable endpoint and scopes of [MSC3861](https://github.com/matrix-org/matrix-spec-proposals/pull/3861) & co. ([\#18549](https://github.com/element-hq/synapse/issues/18549))
### Bugfixes
- Improve database performance of [MSC4293](https://github.com/matrix-org/matrix-spec-proposals/pull/4293) - Redact on Kick/Ban. ([\#18851](https://github.com/element-hq/synapse/issues/18851))
- Do not throw an error when fetching a rejected delayed state event on startup. ([\#18858](https://github.com/element-hq/synapse/issues/18858))
### Improved Documentation
- Fix worker documentation incorrectly indicating all room Admin API requests were capable of being handled by workers. ([\#18853](https://github.com/element-hq/synapse/issues/18853))
### Internal Changes
- Instrument `_ByteProducer` with tracing to measure potential dead time while writing bytes to the request. ([\#18804](https://github.com/element-hq/synapse/issues/18804))
- Switch to OpenTracing's `ContextVarsScopeManager` instead of our own custom `LogContextScopeManager` (a minimal construction sketch follows this list). ([\#18849](https://github.com/element-hq/synapse/issues/18849))
- Trace how much work is being done while "recursively fetching redactions". ([\#18854](https://github.com/element-hq/synapse/issues/18854))
- Link [upstream Twisted bug](https://github.com/twisted/twisted/issues/12498) tracking the problem that explains why we have to use a `Producer` to write bytes to the request. ([\#18855](https://github.com/element-hq/synapse/issues/18855))
- Introduce `EventPersistencePair` type. ([\#18857](https://github.com/element-hq/synapse/issues/18857))
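For illustration, a minimal, hedged sketch of constructing a Jaeger tracer with OpenTracing's `ContextVarsScopeManager` (the service name and sampler config here are assumptions, not Synapse's actual settings):

```python
# Hedged sketch: build a Jaeger tracer whose active span is tracked via
# contextvars, so it follows coroutines across `await` points.
from jaeger_client import Config
from opentracing.scope_managers.contextvars import ContextVarsScopeManager

config = Config(
    config={"sampler": {"type": "const", "param": 1}},  # assumed sampler config
    service_name="synapse",
    scope_manager=ContextVarsScopeManager(),  # replaces a custom scope manager
)
tracer = config.initialize_tracer()
```

With a contextvars-backed scope manager, the active span follows native async context rather than Synapse's custom log contexts.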
### Updates to locked dependencies
* Bump actions/add-to-project from c0c5949b017d0d4a39f7ba888255881bdac2a823 to 4515659e2b458b27365e167605ac44f219494b66. ([\#18863](https://github.com/element-hq/synapse/issues/18863))
* Bump actions/checkout from 4.3.0 to 5.0.0. ([\#18834](https://github.com/element-hq/synapse/issues/18834))
* Bump anyhow from 1.0.98 to 1.0.99. ([\#18841](https://github.com/element-hq/synapse/issues/18841))
* Bump docker/login-action from 3.4.0 to 3.5.0. ([\#18835](https://github.com/element-hq/synapse/issues/18835))
* Bump dtolnay/rust-toolchain from b3b07ba8b418998c39fb20f53e8b695cdcc8de1b to e97e2d8cc328f1b50210efc529dca0028893a2d9. ([\#18862](https://github.com/element-hq/synapse/issues/18862))
* Bump phonenumbers from 9.0.11 to 9.0.12. ([\#18837](https://github.com/element-hq/synapse/issues/18837))
* Bump regex from 1.11.1 to 1.11.2. ([\#18864](https://github.com/element-hq/synapse/issues/18864))
* Bump reqwest from 0.12.22 to 0.12.23. ([\#18842](https://github.com/element-hq/synapse/issues/18842))
* Bump ruff from 0.12.7 to 0.12.10. ([\#18865](https://github.com/element-hq/synapse/issues/18865))
* Bump serde_json from 1.0.142 to 1.0.143. ([\#18866](https://github.com/element-hq/synapse/issues/18866))
* Bump types-bleach from 6.2.0.20250514 to 6.2.0.20250809. ([\#18838](https://github.com/element-hq/synapse/issues/18838))
* Bump types-jsonschema from 4.25.0.20250720 to 4.25.1.20250822. ([\#18867](https://github.com/element-hq/synapse/issues/18867))
* Bump types-psycopg2 from 2.9.21.20250718 to 2.9.21.20250809. ([\#18836](https://github.com/element-hq/synapse/issues/18836))
# Synapse 1.137.0 (2025-08-26)
No significant changes since 1.137.0rc1.
+1
@@ -0,0 +1 @@
Fix `LaterGauge` metrics to collect from all servers.
-1
@@ -1 +0,0 @@
Instrument `_ByteProducer` with tracing to measure potential dead time while writing bytes to the request.
-1
@@ -1 +0,0 @@
Switch to OpenTracing's `ContextVarsScopeManager` instead of our own custom `LogContextScopeManager`.
-1
@@ -1 +0,0 @@
Improve database performance of [MSC4293](https://github.com/matrix-org/matrix-spec-proposals/pull/4293) - Redact on Kick/Ban.
-1
@@ -1 +0,0 @@
Fix worker documentation incorrectly indicating all room Admin API requests were capable of being handled by workers.
-1
@@ -1 +0,0 @@
Trace how much work is being done while "recursively fetching redactions".
-1
@@ -1 +0,0 @@
Link [upstream Twisted bug](https://github.com/twisted/twisted/issues/12498) tracking the problem that explains why we have to use a `Producer` to write bytes to the request.
-1
@@ -1 +0,0 @@
Introduce `EventPersistencePair` type.
-1
@@ -1 +0,0 @@
Do not throw an error when fetching a rejected delayed state event on startup.
+1
@@ -0,0 +1 @@
Suppress "Applying schema" log noise bulk when `SYNAPSE_LOG_TESTING` is set.
+6
@@ -1,3 +1,9 @@
matrix-synapse-py3 (1.138.0~rc1) stable; urgency=medium
* New synapse release 1.138.0rc1.
-- Synapse Packaging team <packages@matrix.org> Tue, 02 Sep 2025 12:16:14 +0000
matrix-synapse-py3 (1.137.0) stable; urgency=medium
* New Synapse release 1.137.0.
+7
@@ -77,6 +77,13 @@ loggers:
#}
synapse.visibility.filtered_event_debug:
level: DEBUG
{#
If Synapse is under test, we don't care about seeing the "Applying schema" log
lines at the INFO level every time we run the tests (it's roughly 100 lines of noise)
#}
synapse.storage.prepare_database:
level: WARN
{% endif %}
root:
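For reference, a rough plain-Python equivalent of what this template change does (an illustrative sketch, not part of the diff):

```python
import logging

# Only emit WARNING and above from the schema-preparation logger, suppressing
# the ~100 INFO-level "Applying schema" lines while under test.
logging.getLogger("synapse.storage.prepare_database").setLevel(logging.WARN)
```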
+1 -1
@@ -101,7 +101,7 @@ module-name = "synapse.synapse_rust"
[tool.poetry]
name = "matrix-synapse"
version = "1.137.0"
version = "1.138.0rc1"
description = "Homeserver for the Matrix decentralised comms protocol"
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
license = "AGPL-3.0-or-later"
+1 -1
@@ -1,5 +1,5 @@
$schema: https://element-hq.github.io/synapse/latest/schema/v1/meta.schema.json
$id: https://element-hq.github.io/synapse/schema/synapse/v1.137/synapse-config.schema.json
$id: https://element-hq.github.io/synapse/schema/synapse/v1.138/synapse-config.schema.json
type: object
properties:
modules:
+5 -1
@@ -153,9 +153,13 @@ def get_registered_paths_for_default(
"""
hs = MockHomeserver(base_config, worker_app)
# TODO We only do this to avoid an error, but don't need the database etc
hs.setup()
return get_registered_paths_for_hs(hs)
registered_paths = get_registered_paths_for_hs(hs)
hs.cleanup()
return registered_paths
def elide_http_methods_if_unconflicting(
+14 -1
@@ -99,6 +99,7 @@ from synapse.storage.engines import create_engine
from synapse.storage.prepare_database import prepare_database
from synapse.types import ISynapseReactor
from synapse.util import SYNAPSE_VERSION, Clock
from synapse.util.stringutils import random_string
# Cast safety: Twisted does some naughty magic which replaces the
# twisted.internet.reactor module with a Reactor instance at runtime.
@@ -323,6 +324,7 @@ class MockHomeserver:
self.config = config
self.hostname = config.server.server_name
self.version_string = SYNAPSE_VERSION
self.instance_id = random_string(5)
def get_clock(self) -> Clock:
return self.clock
@@ -330,6 +332,9 @@ class MockHomeserver:
def get_reactor(self) -> ISynapseReactor:
return reactor
def get_instance_id(self) -> str:
return self.instance_id
def get_instance_name(self) -> str:
return "master"
@@ -685,7 +690,15 @@ class Porter:
)
prepare_database(db_conn, engine, config=self.hs_config)
# Type safety: ignore that we're using Mock homeservers here.
store = Store(DatabasePool(hs, db_config, engine), db_conn, hs) # type: ignore[arg-type]
store = Store(
DatabasePool(
hs, # type: ignore[arg-type]
db_config,
engine,
),
db_conn,
hs, # type: ignore[arg-type]
)
db_conn.commit()
return store
+16 -10
@@ -13,7 +13,7 @@
#
#
import logging
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING, Optional, Set
from urllib.parse import urlencode
from synapse._pydantic_compat import (
@@ -57,8 +57,10 @@ logger = logging.getLogger(__name__)
# Scope as defined by MSC2967
# https://github.com/matrix-org/matrix-spec-proposals/pull/2967
SCOPE_MATRIX_API = "urn:matrix:org.matrix.msc2967.client:api:*"
SCOPE_MATRIX_DEVICE_PREFIX = "urn:matrix:org.matrix.msc2967.client:device:"
UNSTABLE_SCOPE_MATRIX_API = "urn:matrix:org.matrix.msc2967.client:api:*"
UNSTABLE_SCOPE_MATRIX_DEVICE_PREFIX = "urn:matrix:org.matrix.msc2967.client:device:"
STABLE_SCOPE_MATRIX_API = "urn:matrix:client:api:*"
STABLE_SCOPE_MATRIX_DEVICE_PREFIX = "urn:matrix:client:device:"
class ServerMetadata(BaseModel):
@@ -334,7 +336,10 @@ class MasDelegatedAuth(BaseAuth):
scope = introspection_result.get_scope_set()
# Determine type of user based on presence of particular scopes
if SCOPE_MATRIX_API not in scope:
if (
UNSTABLE_SCOPE_MATRIX_API not in scope
and STABLE_SCOPE_MATRIX_API not in scope
):
raise InvalidClientTokenError(
"Token doesn't grant access to the Matrix C-S API"
)
@@ -366,11 +371,12 @@ class MasDelegatedAuth(BaseAuth):
# We only allow a single device_id in the scope, so we find them all in the
# scope list, and raise if there are more than one. The OIDC server should be
# the one enforcing valid scopes, so we raise a 500 if we find an invalid scope.
device_ids = [
tok[len(SCOPE_MATRIX_DEVICE_PREFIX) :]
for tok in scope
if tok.startswith(SCOPE_MATRIX_DEVICE_PREFIX)
]
device_ids: Set[str] = set()
for tok in scope:
if tok.startswith(UNSTABLE_SCOPE_MATRIX_DEVICE_PREFIX):
device_ids.add(tok[len(UNSTABLE_SCOPE_MATRIX_DEVICE_PREFIX) :])
elif tok.startswith(STABLE_SCOPE_MATRIX_DEVICE_PREFIX):
device_ids.add(tok[len(STABLE_SCOPE_MATRIX_DEVICE_PREFIX) :])
if len(device_ids) > 1:
raise AuthError(
@@ -378,7 +384,7 @@ class MasDelegatedAuth(BaseAuth):
"Multiple device IDs in scope",
)
device_id = device_ids[0] if device_ids else None
device_id = next(iter(device_ids), None)
if device_id is not None:
# Sanity check the device_id
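The hunk above accepts both the unstable MSC2967 scope prefix and the new stable one. A self-contained sketch (hypothetical helper name, not the actual Synapse code) of that device-ID extraction logic:

```python
# Illustrative only: both prefixes are accepted, and more than one distinct
# device ID in the scope set is treated as an error.
from typing import Optional, Set

UNSTABLE_DEVICE_PREFIX = "urn:matrix:org.matrix.msc2967.client:device:"
STABLE_DEVICE_PREFIX = "urn:matrix:client:device:"

def extract_device_id(scope: Set[str]) -> Optional[str]:
    device_ids: Set[str] = set()
    for tok in scope:
        for prefix in (UNSTABLE_DEVICE_PREFIX, STABLE_DEVICE_PREFIX):
            if tok.startswith(prefix):
                device_ids.add(tok[len(prefix):])
                break
    if len(device_ids) > 1:
        raise ValueError("Multiple device IDs in scope")
    return next(iter(device_ids), None)

# The same device ID resolves identically under either prefix.
assert extract_device_id({STABLE_DEVICE_PREFIX + "AABBCCDD"}) == "AABBCCDD"
assert extract_device_id({UNSTABLE_DEVICE_PREFIX + "AABBCCDD"}) == "AABBCCDD"
assert extract_device_id(set()) is None
```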
+16 -18
@@ -20,7 +20,7 @@
#
import logging
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Set
from urllib.parse import urlencode
from authlib.oauth2 import ClientAuth
@@ -34,7 +34,6 @@ from synapse.api.errors import (
AuthError,
HttpResponseException,
InvalidClientTokenError,
OAuthInsufficientScopeError,
SynapseError,
UnrecognizedRequestError,
)
@@ -63,9 +62,10 @@ logger = logging.getLogger(__name__)
# Scope as defined by MSC2967
# https://github.com/matrix-org/matrix-spec-proposals/pull/2967
SCOPE_MATRIX_API = "urn:matrix:org.matrix.msc2967.client:api:*"
SCOPE_MATRIX_GUEST = "urn:matrix:org.matrix.msc2967.client:api:guest"
SCOPE_MATRIX_DEVICE_PREFIX = "urn:matrix:org.matrix.msc2967.client:device:"
UNSTABLE_SCOPE_MATRIX_API = "urn:matrix:org.matrix.msc2967.client:api:*"
UNSTABLE_SCOPE_MATRIX_DEVICE_PREFIX = "urn:matrix:org.matrix.msc2967.client:device:"
STABLE_SCOPE_MATRIX_API = "urn:matrix:client:api:*"
STABLE_SCOPE_MATRIX_DEVICE_PREFIX = "urn:matrix:client:device:"
# Scope which allows access to the Synapse admin API
SCOPE_SYNAPSE_ADMIN = "urn:synapse:admin:*"
@@ -444,9 +444,6 @@ class MSC3861DelegatedAuth(BaseAuth):
if not self._is_access_token_the_admin_token(access_token):
await self._record_request(request, requester)
if not allow_guest and requester.is_guest:
raise OAuthInsufficientScopeError([SCOPE_MATRIX_API])
request.requester = requester
return requester
@@ -528,10 +525,11 @@ class MSC3861DelegatedAuth(BaseAuth):
scope: List[str] = introspection_result.get_scope_list()
# Determine type of user based on presence of particular scopes
has_user_scope = SCOPE_MATRIX_API in scope
has_guest_scope = SCOPE_MATRIX_GUEST in scope
has_user_scope = (
UNSTABLE_SCOPE_MATRIX_API in scope or STABLE_SCOPE_MATRIX_API in scope
)
if not has_user_scope and not has_guest_scope:
if not has_user_scope:
raise InvalidClientTokenError("No scope in token granting user rights")
# Match via the sub claim
@@ -579,11 +577,12 @@ class MSC3861DelegatedAuth(BaseAuth):
# We only allow a single device_id in the scope, so we find them all in the
# scope list, and raise if there are more than one. The OIDC server should be
# the one enforcing valid scopes, so we raise a 500 if we find an invalid scope.
device_ids = [
tok[len(SCOPE_MATRIX_DEVICE_PREFIX) :]
for tok in scope
if tok.startswith(SCOPE_MATRIX_DEVICE_PREFIX)
]
device_ids: Set[str] = set()
for tok in scope:
if tok.startswith(UNSTABLE_SCOPE_MATRIX_DEVICE_PREFIX):
device_ids.add(tok[len(UNSTABLE_SCOPE_MATRIX_DEVICE_PREFIX) :])
elif tok.startswith(STABLE_SCOPE_MATRIX_DEVICE_PREFIX):
device_ids.add(tok[len(STABLE_SCOPE_MATRIX_DEVICE_PREFIX) :])
if len(device_ids) > 1:
raise AuthError(
@@ -591,7 +590,7 @@ class MSC3861DelegatedAuth(BaseAuth):
"Multiple device IDs in scope",
)
device_id = device_ids[0] if device_ids else None
device_id = next(iter(device_ids), None)
if device_id is not None:
# Sanity check the device_id
@@ -617,5 +616,4 @@ class MSC3861DelegatedAuth(BaseAuth):
user_id=user_id,
device_id=device_id,
scope=scope,
is_guest=(has_guest_scope and not has_user_scope),
)
+28 -15
@@ -37,6 +37,7 @@ Events are replicated via a separate events stream.
"""
import logging
from enum import Enum
from typing import (
TYPE_CHECKING,
Dict,
@@ -67,6 +68,25 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
class QueueNames(str, Enum):
PRESENCE_MAP = "presence_map"
KEYED_EDU = "keyed_edu"
KEYED_EDU_CHANGED = "keyed_edu_changed"
EDUS = "edus"
POS_TIME = "pos_time"
PRESENCE_DESTINATIONS = "presence_destinations"
queue_name_to_gauge_map: Dict[QueueNames, LaterGauge] = {}
for queue_name in QueueNames:
queue_name_to_gauge_map[queue_name] = LaterGauge(
name=f"synapse_federation_send_queue_{queue_name.value}_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
class FederationRemoteSendQueue(AbstractFederationSender):
"""A drop in replacement for FederationSender"""
@@ -111,23 +131,16 @@ class FederationRemoteSendQueue(AbstractFederationSender):
we need to make a new function so the inner lambda binds to the queue itself,
rather than to the name of the queue, which changes on each loop iteration.
ARGH.
def register(name: str, queue: Sized) -> None:
LaterGauge(
name="synapse_federation_send_queue_%s_size" % (queue_name,),
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(queue)},
def register(queue_name: QueueNames, queue: Sized) -> None:
queue_name_to_gauge_map[queue_name].register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {(self.server_name,): len(queue)},
)
for queue_name in [
"presence_map",
"keyed_edu",
"keyed_edu_changed",
"edus",
"pos_time",
"presence_destinations",
]:
register(queue_name, getattr(self, queue_name))
for queue_name in QueueNames:
queue = getattr(self, queue_name.value)
assert isinstance(queue, Sized)
register(queue_name, queue=queue)
self.clock.looping_call(self._clear_queue, 30 * 1000)
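The "ARGH" comment above refers to Python's late-binding closures. A tiny standalone demonstration (generic Python, not Synapse code) of why the `register` helper is needed rather than creating the lambda directly in the loop:

```python
queues = {"a": [1], "b": [1, 2]}

# Broken: lambdas close over the loop variable, not its value, so every
# lambda sees the final queue once the loop has finished.
broken = [lambda: len(queue) for name, queue in queues.items()]
print([f() for f in broken])  # [2, 2]

def make_hook(q: list):
    # The function call creates a new scope, capturing the current queue.
    return lambda: len(q)

fixed = [make_hook(queue) for name, queue in queues.items()]
print([f() for f in fixed])  # [1, 2]
```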
+27 -16
@@ -199,6 +199,24 @@ sent_pdus_destination_dist_total = Counter(
labelnames=[SERVER_NAME_LABEL],
)
transaction_queue_pending_destinations_gauge = LaterGauge(
name="synapse_federation_transaction_queue_pending_destinations",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
transaction_queue_pending_pdus_gauge = LaterGauge(
name="synapse_federation_transaction_queue_pending_pdus",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
transaction_queue_pending_edus_gauge = LaterGauge(
name="synapse_federation_transaction_queue_pending_edus",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
# Time (in s) to wait before trying to wake up destinations that have
# catch-up outstanding.
# Please note that rate limiting still applies, so while the loop is
@@ -398,11 +416,9 @@ class FederationSender(AbstractFederationSender):
# map from destination to PerDestinationQueue
self._per_destination_queues: Dict[str, PerDestinationQueue] = {}
LaterGauge(
name="synapse_federation_transaction_queue_pending_destinations",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
transaction_queue_pending_destinations_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {
(self.server_name,): sum(
1
for d in self._per_destination_queues.values()
@@ -410,22 +426,17 @@ class FederationSender(AbstractFederationSender):
)
},
)
LaterGauge(
name="synapse_federation_transaction_queue_pending_pdus",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
transaction_queue_pending_pdus_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {
(self.server_name,): sum(
d.pending_pdu_count() for d in self._per_destination_queues.values()
)
},
)
LaterGauge(
name="synapse_federation_transaction_queue_pending_edus",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
transaction_queue_pending_edus_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {
(self.server_name,): sum(
d.pending_edu_count() for d in self._per_destination_queues.values()
)
+18 -10
@@ -173,6 +173,18 @@ state_transition_counter = Counter(
labelnames=["locality", "from", "to", SERVER_NAME_LABEL],
)
presence_user_to_current_state_size_gauge = LaterGauge(
name="synapse_handlers_presence_user_to_current_state_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
presence_wheel_timer_size_gauge = LaterGauge(
name="synapse_handlers_presence_wheel_timer_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
# If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
# "currently_active"
LAST_ACTIVE_GRANULARITY = 60 * 1000
@@ -779,11 +791,9 @@ class PresenceHandler(BasePresenceHandler):
EduTypes.PRESENCE, self.incoming_presence
)
LaterGauge(
name="synapse_handlers_presence_user_to_current_state_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self.user_to_current_state)},
presence_user_to_current_state_size_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {(self.server_name,): len(self.user_to_current_state)},
)
# The per-device presence state, maps user to devices to per-device presence state.
@@ -882,11 +892,9 @@ class PresenceHandler(BasePresenceHandler):
60 * 1000,
)
LaterGauge(
name="synapse_handlers_presence_wheel_timer_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self.wheel_timer)},
presence_wheel_timer_size_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {(self.server_name,): len(self.wheel_timer)},
)
# Used to handle sending of presence to newly joined users/servers
+4 -2
@@ -164,11 +164,13 @@ def _get_in_flight_counts() -> Mapping[Tuple[str, ...], int]:
return counts
LaterGauge(
in_flight_requests = LaterGauge(
name="synapse_http_server_in_flight_requests_count",
desc="",
labelnames=["method", "servlet", SERVER_NAME_LABEL],
caller=_get_in_flight_counts,
)
in_flight_requests.register_hook(
homeserver_instance_id=None, hook=_get_in_flight_counts
)
+93 -35
@@ -73,8 +73,6 @@ logger = logging.getLogger(__name__)
METRICS_PREFIX = "/_synapse/metrics"
all_gauges: Dict[str, Collector] = {}
HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
SERVER_NAME_LABEL = "server_name"
@@ -163,42 +161,110 @@ class LaterGauge(Collector):
name: str
desc: str
labelnames: Optional[StrSequence] = attr.ib(hash=False)
# callback: should either return a value (if there are no labels for this metric),
# or dict mapping from a label tuple to a value
caller: Callable[
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
]
_instance_id_to_hook_map: Dict[
Optional[str], # instance_id
Callable[
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
],
] = attr.ib(factory=dict, hash=False)
"""
Map from homeserver instance_id to a callback. Each callback should either return a
value (if there are no labels for this metric), or a dict mapping from a label tuple
to a value.
We use `instance_id` instead of `server_name` because it's possible to have multiple
workers running in the same process with the same `server_name`.
"""
def collect(self) -> Iterable[Metric]:
# The decision to add `SERVER_NAME_LABEL` is from the `LaterGauge` usage itself
# (we don't enforce it here, one level up).
g = GaugeMetricFamily(self.name, self.desc, labels=self.labelnames) # type: ignore[missing-server-name-label]
try:
calls = self.caller()
except Exception:
logger.exception("Exception running callback for LaterGauge(%s)", self.name)
yield g
return
for homeserver_instance_id, hook in self._instance_id_to_hook_map.items():
try:
hook_result = hook()
except Exception:
logger.exception(
"Exception running callback for LaterGauge(%s) for homeserver_instance_id=%s",
self.name,
homeserver_instance_id,
)
# Continue to return the rest of the metrics that aren't broken
continue
if isinstance(calls, (int, float)):
g.add_metric([], calls)
else:
for k, v in calls.items():
g.add_metric(k, v)
if isinstance(hook_result, (int, float)):
g.add_metric([], hook_result)
else:
for k, v in hook_result.items():
g.add_metric(k, v)
yield g
def register_hook(
self,
*,
homeserver_instance_id: Optional[str],
hook: Callable[
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
],
) -> None:
"""
Register a callback/hook that will be called to generate metric samples for
the gauge.
Args:
homeserver_instance_id: The unique ID for this Synapse process instance
(`hs.get_instance_id()`) that this hook is associated with. This can be used
later to look up all hooks associated with a given homeserver instance in order to
unregister them. This should only be omitted for global hooks that work
across all homeservers.
hook: A callback that should either return a value (if there are no
labels for this metric), or a dict mapping from a label tuple to a value
"""
# We shouldn't have multiple hooks registered for the same homeserver `instance_id`.
existing_hook = self._instance_id_to_hook_map.get(homeserver_instance_id)
assert existing_hook is None, (
f"LaterGauge(name={self.name}) hook already registered for homeserver_instance_id={homeserver_instance_id}. "
"This is likely a Synapse bug and you forgot to unregister the previous hooks for "
"the server (especially in tests)."
)
self._instance_id_to_hook_map[homeserver_instance_id] = hook
def unregister_hooks_for_homeserver_instance_id(
self, homeserver_instance_id: str
) -> None:
"""
Unregister all hooks associated with the given homeserver `instance_id`. This should be
called when a homeserver is shut down to avoid stale hooks sitting around.
Args:
homeserver_instance_id: The unique ID for this Synapse process instance to
unregister hooks for (`hs.get_instance_id()`).
"""
self._instance_id_to_hook_map.pop(homeserver_instance_id, None)
def __attrs_post_init__(self) -> None:
self._register()
def _register(self) -> None:
if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering", self.name)
REGISTRY.unregister(all_gauges.pop(self.name))
REGISTRY.register(self)
all_gauges[self.name] = self
# We shouldn't have multiple metrics with the same name. Typically, metrics
# should be created globally, so you shouldn't run into this; this check catches
# any silly mistakes. The `REGISTRY.register(self)` call above will also raise an
# error if the metric already exists, but to make things explicit, we check here
# as well.
existing_gauge = all_later_gauges_to_clean_up_on_shutdown.get(self.name)
assert existing_gauge is None, f"LaterGauge(name={self.name}) already exists. "
# Keep track of the gauge so we can clean it up later.
all_later_gauges_to_clean_up_on_shutdown[self.name] = self
all_later_gauges_to_clean_up_on_shutdown: Dict[str, LaterGauge] = {}
"""
Track all `LaterGauge` instances so we can remove any associated hooks during homeserver
shutdown.
"""
# `MetricsEntry` only makes sense when it is a `Protocol`,
@@ -250,7 +316,7 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
# Protects access to _registrations
self._lock = threading.Lock()
self._register_with_collector()
REGISTRY.register(self)
def register(
self,
@@ -341,14 +407,6 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
gauge.add_metric(labels=key, value=getattr(metrics, name))
yield gauge
def _register_with_collector(self) -> None:
if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering", self.name)
REGISTRY.unregister(all_gauges.pop(self.name))
REGISTRY.register(self)
all_gauges[self.name] = self
class GaugeHistogramMetricFamilyWithLabels(GaugeHistogramMetricFamily):
"""
+26 -16
@@ -86,6 +86,24 @@ users_woken_by_stream_counter = Counter(
labelnames=["stream", SERVER_NAME_LABEL],
)
notifier_listeners_gauge = LaterGauge(
name="synapse_notifier_listeners",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
notifier_rooms_gauge = LaterGauge(
name="synapse_notifier_rooms",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
notifier_users_gauge = LaterGauge(
name="synapse_notifier_users",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
T = TypeVar("T")
@@ -281,28 +299,20 @@ class Notifier:
)
}
LaterGauge(
name="synapse_notifier_listeners",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=count_listeners,
notifier_listeners_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(), hook=count_listeners
)
LaterGauge(
name="synapse_notifier_rooms",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
notifier_rooms_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {
(self.server_name,): count(
bool, list(self.room_to_user_streams.values())
)
},
)
LaterGauge(
name="synapse_notifier_users",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self.user_to_user_stream)},
notifier_users_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {(self.server_name,): len(self.user_to_user_stream)},
)
def add_replication_callback(self, cb: Callable[[], None]) -> None:
+18 -10
@@ -106,6 +106,18 @@ user_ip_cache_counter = Counter(
"synapse_replication_tcp_resource_user_ip_cache", "", labelnames=[SERVER_NAME_LABEL]
)
tcp_resource_total_connections_gauge = LaterGauge(
name="synapse_replication_tcp_resource_total_connections",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
tcp_command_queue_gauge = LaterGauge(
name="synapse_replication_tcp_command_queue",
desc="Number of inbound RDATA/POSITION commands queued for processing",
labelnames=["stream_name", SERVER_NAME_LABEL],
)
# the type of the entries in _command_queues_by_stream
_StreamCommandQueue = Deque[
@@ -243,11 +255,9 @@ class ReplicationCommandHandler:
# outgoing replication commands to.)
self._connections: List[IReplicationConnection] = []
LaterGauge(
name="synapse_replication_tcp_resource_total_connections",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self._connections)},
tcp_resource_total_connections_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {(self.server_name,): len(self._connections)},
)
# When POSITION or RDATA commands arrive, we stick them in a queue and process
@@ -266,11 +276,9 @@ class ReplicationCommandHandler:
# from that connection.
self._streams_by_connection: Dict[IReplicationConnection, Set[str]] = {}
LaterGauge(
name="synapse_replication_tcp_command_queue",
desc="Number of inbound RDATA/POSITION commands queued for processing",
labelnames=["stream_name", SERVER_NAME_LABEL],
caller=lambda: {
tcp_command_queue_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {
(stream_name, self.server_name): len(queue)
for stream_name, queue in self._command_queues_by_stream.items()
},
+16 -4
@@ -527,7 +527,10 @@ pending_commands = LaterGauge(
name="synapse_replication_tcp_protocol_pending_commands",
desc="",
labelnames=["name", SERVER_NAME_LABEL],
caller=lambda: {
)
pending_commands.register_hook(
homeserver_instance_id=None,
hook=lambda: {
(p.name, p.server_name): len(p.pending_commands) for p in connected_connections
},
)
@@ -544,7 +547,10 @@ transport_send_buffer = LaterGauge(
name="synapse_replication_tcp_protocol_transport_send_buffer",
desc="",
labelnames=["name", SERVER_NAME_LABEL],
caller=lambda: {
)
transport_send_buffer.register_hook(
homeserver_instance_id=None,
hook=lambda: {
(p.name, p.server_name): transport_buffer_size(p) for p in connected_connections
},
)
@@ -571,7 +577,10 @@ tcp_transport_kernel_send_buffer = LaterGauge(
name="synapse_replication_tcp_protocol_transport_kernel_send_buffer",
desc="",
labelnames=["name", SERVER_NAME_LABEL],
caller=lambda: {
)
tcp_transport_kernel_send_buffer.register_hook(
homeserver_instance_id=None,
hook=lambda: {
(p.name, p.server_name): transport_kernel_read_buffer_size(p, False)
for p in connected_connections
},
@@ -582,7 +591,10 @@ tcp_transport_kernel_read_buffer = LaterGauge(
name="synapse_replication_tcp_protocol_transport_kernel_read_buffer",
desc="",
labelnames=["name", SERVER_NAME_LABEL],
caller=lambda: {
)
tcp_transport_kernel_read_buffer.register_hook(
homeserver_instance_id=None,
hook=lambda: {
(p.name, p.server_name): transport_kernel_read_buffer_size(p, True)
for p in connected_connections
},
+11 -5
@@ -76,11 +76,17 @@ class AuthMetadataServlet(RestServlet):
Advertises the OAuth 2.0 server metadata for the homeserver.
"""
PATTERNS = client_patterns(
"/org.matrix.msc2965/auth_metadata$",
unstable=True,
releases=(),
)
PATTERNS = [
*client_patterns(
"/auth_metadata$",
releases=("v1",),
),
*client_patterns(
"/org.matrix.msc2965/auth_metadata$",
unstable=True,
releases=(),
),
]
def __init__(self, hs: "HomeServer"):
super().__init__()
+35 -1
@@ -129,7 +129,10 @@ from synapse.http.client import (
)
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
from synapse.media.media_repository import MediaRepository
from synapse.metrics import register_threadpool
from synapse.metrics import (
all_later_gauges_to_clean_up_on_shutdown,
register_threadpool,
)
from synapse.metrics.common_usage_metrics import CommonUsageMetricsManager
from synapse.module_api import ModuleApi
from synapse.module_api.callbacks import ModuleApiCallbacks
@@ -369,6 +372,37 @@ class HomeServer(metaclass=abc.ABCMeta):
if self.config.worker.run_background_tasks:
self.setup_background_tasks()
def __del__(self) -> None:
"""
Called when the homeserver is garbage collected.
Make sure we actually do some clean-up, rather than leak data.
"""
self.cleanup()
def cleanup(self) -> None:
"""
WIP: Clean up any references to the homeserver and stop any related running
processes, timers, loops, replication stream, etc.
This should be called wherever you care about the HomeServer being completely
garbage collected, such as in tests. It's not necessary to call this if you plan
to shut down the whole Python process anyway.
Can be called multiple times.
"""
logger.info("Received cleanup request for %s.", self.hostname)
# TODO: Stop background processes, timers, loops, replication stream, etc.
# Cleanup metrics associated with the homeserver
for later_gauge in all_later_gauges_to_clean_up_on_shutdown.values():
later_gauge.unregister_hooks_for_homeserver_instance_id(
self.get_instance_id()
)
logger.info("Cleanup complete for %s.", self.hostname)
def start_listening(self) -> None: # noqa: B027 (no-op by design)
"""Start the HTTP, manhole, metrics, etc listeners
+1 -7
@@ -61,7 +61,7 @@ from synapse.logging.context import (
current_context,
make_deferred_yieldable,
)
from synapse.metrics import SERVER_NAME_LABEL, LaterGauge, register_threadpool
from synapse.metrics import SERVER_NAME_LABEL, register_threadpool
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.background_updates import BackgroundUpdater
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
@@ -611,12 +611,6 @@ class DatabasePool:
)
self.updates = BackgroundUpdater(hs, self)
LaterGauge(
name="synapse_background_update_status",
desc="Background update status",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): self.updates.get_status()},
)
self._previous_txn_total_time = 0.0
self._current_txn_total_time = 0.0
+17
@@ -22,6 +22,7 @@
import logging
from typing import TYPE_CHECKING, Generic, List, Optional, Type, TypeVar
from synapse.metrics import SERVER_NAME_LABEL, LaterGauge
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import DatabasePool, make_conn
from synapse.storage.databases.main.events import PersistEventsStore
@@ -40,6 +41,13 @@ logger = logging.getLogger(__name__)
DataStoreT = TypeVar("DataStoreT", bound=SQLBaseStore, covariant=True)
background_update_status = LaterGauge(
name="synapse_background_update_status",
desc="Background update status",
labelnames=["database_name", SERVER_NAME_LABEL],
)
class Databases(Generic[DataStoreT]):
"""The various databases.
@@ -143,6 +151,15 @@ class Databases(Generic[DataStoreT]):
db_conn.close()
# Track the background update status for each database
background_update_status.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {
(database.name(), server_name): database.updates.get_status()
for database in self.databases
},
)
# Sanity check that we have actually configured all the required stores.
if not main:
raise Exception("No 'main' database configured")
+10 -5
@@ -84,6 +84,13 @@ _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME = "current_state_events_membership"
_POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE = 1000
federation_known_servers_gauge = LaterGauge(
name="synapse_federation_known_servers",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
@attr.s(frozen=True, slots=True, auto_attribs=True)
class EventIdMembership:
"""Returned by `get_membership_from_event_ids`"""
@@ -116,11 +123,9 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
1,
self._count_known_servers,
)
LaterGauge(
name="synapse_federation_known_servers",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): self._known_servers_count},
federation_known_servers_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {(self.server_name,): self._known_servers_count},
)
@wrap_as_background_process("_count_known_servers")
+10 -4
@@ -131,22 +131,28 @@ def _get_counts_from_rate_limiter_instance(
# We track the number of affected hosts per time-period so we can
# differentiate one really noisy homeserver from a general
# ratelimit tuning problem across the federation.
LaterGauge(
sleep_affected_hosts_gauge = LaterGauge(
name="synapse_rate_limit_sleep_affected_hosts",
desc="Number of hosts that had requests put to sleep",
labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
caller=lambda: _get_counts_from_rate_limiter_instance(
)
sleep_affected_hosts_gauge.register_hook(
homeserver_instance_id=None,
hook=lambda: _get_counts_from_rate_limiter_instance(
lambda rate_limiter_instance: sum(
ratelimiter.should_sleep()
for ratelimiter in rate_limiter_instance.ratelimiters.values()
)
),
)
LaterGauge(
reject_affected_hosts_gauge = LaterGauge(
name="synapse_rate_limit_reject_affected_hosts",
desc="Number of hosts that had requests rejected",
labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
caller=lambda: _get_counts_from_rate_limiter_instance(
)
reject_affected_hosts_gauge.register_hook(
homeserver_instance_id=None,
hook=lambda: _get_counts_from_rate_limiter_instance(
lambda rate_limiter_instance: sum(
ratelimiter.should_reject()
for ratelimiter in rate_limiter_instance.ratelimiters.values()
+10 -5
@@ -44,6 +44,13 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
running_tasks_gauge = LaterGauge(
name="synapse_scheduler_running_tasks",
desc="The number of concurrent running tasks handled by the TaskScheduler",
labelnames=[SERVER_NAME_LABEL],
)
class TaskScheduler:
"""
This is a simple task scheduler designed for resumable tasks. Normally,
@@ -130,11 +137,9 @@ class TaskScheduler:
TaskScheduler.SCHEDULE_INTERVAL_MS,
)
LaterGauge(
name="synapse_scheduler_running_tasks",
desc="The number of concurrent running tasks handled by the TaskScheduler",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self._running_tasks)},
running_tasks_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {(self.server_name,): len(self._running_tasks)},
)
def register_action(
+56 -112
@@ -25,11 +25,11 @@ import time
from http import HTTPStatus
from http.server import BaseHTTPRequestHandler, HTTPServer
from io import BytesIO
from typing import Any, Coroutine, Dict, Generator, Optional, TypeVar, Union
from typing import Any, ClassVar, Coroutine, Dict, Generator, Optional, TypeVar, Union
from unittest.mock import ANY, AsyncMock, Mock
from urllib.parse import parse_qs
from parameterized import parameterized_class
from parameterized.parameterized import parameterized_class
from signedjson.key import (
encode_verify_key_base64,
generate_signing_key,
@@ -46,7 +46,6 @@ from synapse.api.errors import (
Codes,
HttpResponseException,
InvalidClientTokenError,
OAuthInsufficientScopeError,
SynapseError,
)
from synapse.appservice import ApplicationService
@@ -78,11 +77,7 @@ JWKS_URI = ISSUER + ".well-known/jwks.json"
INTROSPECTION_ENDPOINT = ISSUER + "introspect"
SYNAPSE_ADMIN_SCOPE = "urn:synapse:admin:*"
MATRIX_USER_SCOPE = "urn:matrix:org.matrix.msc2967.client:api:*"
MATRIX_GUEST_SCOPE = "urn:matrix:org.matrix.msc2967.client:api:guest"
MATRIX_DEVICE_SCOPE_PREFIX = "urn:matrix:org.matrix.msc2967.client:device:"
DEVICE = "AABBCCDD"
MATRIX_DEVICE_SCOPE = MATRIX_DEVICE_SCOPE_PREFIX + DEVICE
SUBJECT = "abc-def-ghi"
USERNAME = "test-user"
USER_ID = "@" + USERNAME + ":" + SERVER_NAME
@@ -112,7 +107,24 @@ async def get_json(url: str) -> JsonDict:
@skip_unless(HAS_AUTHLIB, "requires authlib")
@parameterized_class(
("device_scope_prefix", "api_scope"),
[
("urn:matrix:client:device:", "urn:matrix:client:api:*"),
(
"urn:matrix:org.matrix.msc2967.client:device:",
"urn:matrix:org.matrix.msc2967.client:api:*",
),
],
)
class MSC3861OAuthDelegation(HomeserverTestCase):
device_scope_prefix: ClassVar[str]
api_scope: ClassVar[str]
@property
def device_scope(self) -> str:
return self.device_scope_prefix + DEVICE
servlets = [
account.register_servlets,
keys.register_servlets,
@@ -212,7 +224,7 @@ class MSC3861OAuthDelegation(HomeserverTestCase):
"""The handler should return a 500 when no subject is present."""
self._set_introspection_returnvalue(
{"active": True, "scope": " ".join([MATRIX_USER_SCOPE])}
{"active": True, "scope": " ".join([self.api_scope])},
)
request = Mock(args={})
@@ -235,7 +247,7 @@ class MSC3861OAuthDelegation(HomeserverTestCase):
{
"active": True,
"sub": SUBJECT,
"scope": " ".join([MATRIX_DEVICE_SCOPE]),
"scope": " ".join([self.device_scope]),
}
)
request = Mock(args={})
@@ -282,7 +294,7 @@ class MSC3861OAuthDelegation(HomeserverTestCase):
{
"active": True,
"sub": SUBJECT,
"scope": " ".join([SYNAPSE_ADMIN_SCOPE, MATRIX_USER_SCOPE]),
"scope": " ".join([SYNAPSE_ADMIN_SCOPE, self.api_scope]),
"username": USERNAME,
}
)
@@ -312,9 +324,7 @@ class MSC3861OAuthDelegation(HomeserverTestCase):
{
"active": True,
"sub": SUBJECT,
"scope": " ".join(
[SYNAPSE_ADMIN_SCOPE, MATRIX_USER_SCOPE, MATRIX_GUEST_SCOPE]
),
"scope": " ".join([SYNAPSE_ADMIN_SCOPE, self.api_scope]),
"username": USERNAME,
}
)
@@ -344,7 +354,7 @@ class MSC3861OAuthDelegation(HomeserverTestCase):
{
"active": True,
"sub": SUBJECT,
"scope": " ".join([MATRIX_USER_SCOPE]),
"scope": " ".join([self.api_scope]),
"username": USERNAME,
}
)
@@ -374,7 +384,7 @@ class MSC3861OAuthDelegation(HomeserverTestCase):
{
"active": True,
"sub": SUBJECT,
"scope": " ".join([MATRIX_USER_SCOPE, MATRIX_DEVICE_SCOPE]),
"scope": " ".join([self.api_scope, self.device_scope]),
"username": USERNAME,
}
)
@@ -404,7 +414,7 @@ class MSC3861OAuthDelegation(HomeserverTestCase):
{
"active": True,
"sub": SUBJECT,
"scope": " ".join([MATRIX_USER_SCOPE]),
"scope": " ".join([self.api_scope]),
"device_id": DEVICE,
"username": USERNAME,
}
@@ -444,9 +454,9 @@ class MSC3861OAuthDelegation(HomeserverTestCase):
"sub": SUBJECT,
"scope": " ".join(
[
MATRIX_USER_SCOPE,
f"{MATRIX_DEVICE_SCOPE_PREFIX}AABBCC",
f"{MATRIX_DEVICE_SCOPE_PREFIX}DDEEFF",
self.api_scope,
f"{self.device_scope_prefix}AABBCC",
f"{self.device_scope_prefix}DDEEFF",
]
),
"username": USERNAME,
@@ -457,68 +467,6 @@ class MSC3861OAuthDelegation(HomeserverTestCase):
request.requestHeaders.getRawHeaders = mock_getRawHeaders()
self.get_failure(self.auth.get_user_by_req(request), AuthError)
def test_active_guest_not_allowed(self) -> None:
"""The handler should return an insufficient scope error."""
self._set_introspection_returnvalue(
{
"active": True,
"sub": SUBJECT,
"scope": " ".join([MATRIX_GUEST_SCOPE, MATRIX_DEVICE_SCOPE]),
"username": USERNAME,
}
)
request = Mock(args={})
request.args[b"access_token"] = [b"mockAccessToken"]
request.requestHeaders.getRawHeaders = mock_getRawHeaders()
error = self.get_failure(
self.auth.get_user_by_req(request), OAuthInsufficientScopeError
)
self.http_client.get_json.assert_called_once_with(WELL_KNOWN)
self._rust_client.post.assert_called_once_with(
url=INTROSPECTION_ENDPOINT,
response_limit=ANY,
request_body=ANY,
headers=ANY,
)
self._assertParams()
self.assertEqual(
getattr(error.value, "headers", {})["WWW-Authenticate"],
'Bearer error="insufficient_scope", scope="urn:matrix:org.matrix.msc2967.client:api:*"',
)
def test_active_guest_allowed(self) -> None:
"""The handler should return a requester with guest user rights and a device ID."""
self._set_introspection_returnvalue(
{
"active": True,
"sub": SUBJECT,
"scope": " ".join([MATRIX_GUEST_SCOPE, MATRIX_DEVICE_SCOPE]),
"username": USERNAME,
}
)
request = Mock(args={})
request.args[b"access_token"] = [b"mockAccessToken"]
request.requestHeaders.getRawHeaders = mock_getRawHeaders()
requester = self.get_success(
self.auth.get_user_by_req(request, allow_guest=True)
)
self.http_client.get_json.assert_called_once_with(WELL_KNOWN)
self._rust_client.post.assert_called_once_with(
url=INTROSPECTION_ENDPOINT,
response_limit=ANY,
request_body=ANY,
headers=ANY,
)
self._assertParams()
self.assertEqual(requester.user.to_string(), "@%s:%s" % (USERNAME, SERVER_NAME))
self.assertEqual(requester.is_guest, True)
self.assertEqual(
get_awaitable_result(self.auth.is_server_admin(requester)), False
)
self.assertEqual(requester.device_id, DEVICE)
def test_unavailable_introspection_endpoint(self) -> None:
"""The handler should return an internal server error."""
request = Mock(args={})
@@ -562,8 +510,8 @@ class MSC3861OAuthDelegation(HomeserverTestCase):
"sub": SUBJECT,
"scope": " ".join(
[
MATRIX_USER_SCOPE,
f"{MATRIX_DEVICE_SCOPE_PREFIX}AABBCC",
self.api_scope,
f"{self.device_scope_prefix}AABBCC",
]
),
"username": USERNAME,
@@ -611,7 +559,7 @@ class MSC3861OAuthDelegation(HomeserverTestCase):
{
"active": True,
"sub": SUBJECT,
"scope": " ".join([MATRIX_USER_SCOPE, MATRIX_DEVICE_SCOPE]),
"scope": " ".join([self.api_scope, self.device_scope]),
"username": USERNAME,
}
)
@@ -676,7 +624,7 @@ class MSC3861OAuthDelegation(HomeserverTestCase):
return json.dumps(
{
"active": True,
"scope": MATRIX_USER_SCOPE,
"scope": self.api_scope,
"sub": SUBJECT,
"username": USERNAME,
},
@@ -842,8 +790,24 @@ class FakeMasServer(HTTPServer):
T = TypeVar("T")
@parameterized_class(
("device_scope_prefix", "api_scope"),
[
("urn:matrix:client:device:", "urn:matrix:client:api:*"),
(
"urn:matrix:org.matrix.msc2967.client:device:",
"urn:matrix:org.matrix.msc2967.client:api:*",
),
],
)
class MasAuthDelegation(HomeserverTestCase):
server: FakeMasServer
device_scope_prefix: ClassVar[str]
api_scope: ClassVar[str]
@property
def device_scope(self) -> str:
return self.device_scope_prefix + DEVICE
def till_deferred_has_result(
self,
@@ -914,12 +878,7 @@ class MasAuthDelegation(HomeserverTestCase):
self.server.introspection_response = {
"active": True,
"sub": SUBJECT,
"scope": " ".join(
[
MATRIX_USER_SCOPE,
f"{MATRIX_DEVICE_SCOPE_PREFIX}{DEVICE}",
]
),
"scope": " ".join([self.api_scope, self.device_scope]),
"username": USERNAME,
"expires_in": 60,
}
@@ -943,12 +902,7 @@ class MasAuthDelegation(HomeserverTestCase):
self.server.introspection_response = {
"active": True,
"sub": SUBJECT,
"scope": " ".join(
[
MATRIX_USER_SCOPE,
f"{MATRIX_DEVICE_SCOPE_PREFIX}{DEVICE}",
]
),
"scope": " ".join([self.api_scope, self.device_scope]),
"username": USERNAME,
}
@@ -971,12 +925,7 @@ class MasAuthDelegation(HomeserverTestCase):
self.server.introspection_response = {
"active": True,
"sub": SUBJECT,
"scope": " ".join(
[
MATRIX_USER_SCOPE,
f"{MATRIX_DEVICE_SCOPE_PREFIX}ABCDEF",
]
),
"scope": " ".join([self.api_scope, f"{self.device_scope_prefix}ABCDEF"]),
"username": USERNAME,
"expires_in": 60,
}
@@ -993,7 +942,7 @@ class MasAuthDelegation(HomeserverTestCase):
self.server.introspection_response = {
"active": True,
"sub": SUBJECT,
"scope": " ".join([MATRIX_USER_SCOPE]),
"scope": " ".join([self.api_scope]),
"username": "inexistent_user",
"expires_in": 60,
}
@@ -1039,7 +988,7 @@ class MasAuthDelegation(HomeserverTestCase):
self.server.introspection_response = {
"active": True,
"sub": SUBJECT,
"scope": MATRIX_USER_SCOPE,
"scope": self.api_scope,
"username": USERNAME,
"expires_in": 60,
"device_id": DEVICE,
@@ -1057,7 +1006,7 @@ class MasAuthDelegation(HomeserverTestCase):
self.server.introspection_response = {
"active": True,
"sub": SUBJECT,
"scope": " ".join([SYNAPSE_ADMIN_SCOPE, MATRIX_USER_SCOPE]),
"scope": " ".join([SYNAPSE_ADMIN_SCOPE, self.api_scope]),
"username": USERNAME,
"expires_in": 60,
}
@@ -1079,12 +1028,7 @@ class MasAuthDelegation(HomeserverTestCase):
self.server.introspection_response = {
"active": True,
"sub": SUBJECT,
"scope": " ".join(
[
MATRIX_USER_SCOPE,
f"{MATRIX_DEVICE_SCOPE_PREFIX}{DEVICE}",
]
),
"scope": " ".join([self.api_scope, self.device_scope]),
"username": USERNAME,
"expires_in": 60,
}
+98 -2
@@ -18,11 +18,18 @@
# [This file includes modifications made by New Vector Limited]
#
#
from typing import Dict, Protocol, Tuple
from typing import Dict, NoReturn, Protocol, Tuple
from prometheus_client.core import Sample
from synapse.metrics import REGISTRY, InFlightGauge, generate_latest
from synapse.metrics import (
REGISTRY,
SERVER_NAME_LABEL,
InFlightGauge,
LaterGauge,
all_later_gauges_to_clean_up_on_shutdown,
generate_latest,
)
from synapse.util.caches.deferred_cache import DeferredCache
from tests import unittest
@@ -285,6 +292,95 @@ class CacheMetricsTests(unittest.HomeserverTestCase):
self.assertEqual(hs2_cache_max_size_metric_value, "777.0")
class LaterGaugeTests(unittest.HomeserverTestCase):
def setUp(self) -> None:
super().setUp()
self.later_gauge = LaterGauge(
name="foo",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
def tearDown(self) -> None:
super().tearDown()
REGISTRY.unregister(self.later_gauge)
all_later_gauges_to_clean_up_on_shutdown.pop(self.later_gauge.name, None)
def test_later_gauge_multiple_servers(self) -> None:
"""
Test that LaterGauge metrics are reported correctly across multiple servers. We
will have a metrics entry for each homeserver, labeled with the
`server_name` label.
"""
self.later_gauge.register_hook(
homeserver_instance_id="123", hook=lambda: {("hs1",): 1}
)
self.later_gauge.register_hook(
homeserver_instance_id="456", hook=lambda: {("hs2",): 2}
)
metrics_map = get_latest_metrics()
# Find the metrics from both homeservers
hs1_metric = 'foo{server_name="hs1"}'
hs1_metric_value = metrics_map.get(hs1_metric)
self.assertIsNotNone(
hs1_metric_value,
f"Missing metric {hs1_metric} in metrics {metrics_map}",
)
self.assertEqual(hs1_metric_value, "1.0")
hs2_metric = 'foo{server_name="hs2"}'
hs2_metric_value = metrics_map.get(hs2_metric)
self.assertIsNotNone(
hs2_metric_value,
f"Missing metric {hs2_metric} in metrics {metrics_map}",
)
self.assertEqual(hs2_metric_value, "2.0")
def test_later_gauge_hook_exception(self) -> None:
"""
Test that LaterGauge metrics are collected across multiple servers even if one
of the hooks throws an exception.
"""
def raise_exception() -> NoReturn:
raise Exception("fake error generating data")
# Make the hook for hs1 throw an exception
self.later_gauge.register_hook(
homeserver_instance_id="123", hook=raise_exception
)
# Metrics from hs2 still work fine
self.later_gauge.register_hook(
homeserver_instance_id="456", hook=lambda: {("hs2",): 2}
)
metrics_map = get_latest_metrics()
# Since we encountered an exception while trying to collect metrics from hs1, we
# don't expect to see it here.
hs1_metric = 'foo{server_name="hs1"}'
hs1_metric_value = metrics_map.get(hs1_metric)
self.assertIsNone(
hs1_metric_value,
(
"Since we encountered an exception while trying to collect metrics from hs1"
f"we don't expect to see it the metrics_map {metrics_map}"
),
)
# We should still see metrics from hs2 though
hs2_metric = 'foo{server_name="hs2"}'
hs2_metric_value = metrics_map.get(hs2_metric)
self.assertIsNotNone(
hs2_metric_value,
f"Missing metric {hs2_metric} in cache metrics {metrics_map}",
)
self.assertEqual(hs2_metric_value, "2.0")
def get_latest_metrics() -> Dict[str, str]:
"""
Collect the latest metrics from the registry and parse them into an easy to use map.
+1 -2
@@ -32,7 +32,6 @@ from synapse.config.workers import InstanceTcpLocationConfig, InstanceUnixLocati
from synapse.http.site import SynapseRequest, SynapseSite
from synapse.replication.http import ReplicationRestResource
from synapse.replication.tcp.client import ReplicationDataHandler
from synapse.replication.tcp.handler import ReplicationCommandHandler
from synapse.replication.tcp.protocol import (
ClientReplicationStreamProtocol,
ServerReplicationStreamProtocol,
@@ -97,7 +96,7 @@ class BaseStreamTestCase(unittest.HomeserverTestCase):
self.test_handler = self._build_replication_data_handler()
self.worker_hs._replication_data_handler = self.test_handler # type: ignore[attr-defined]
repl_handler = ReplicationCommandHandler(self.worker_hs)
repl_handler = self.worker_hs.get_replication_command_handler()
self.client = ClientReplicationStreamProtocol(
self.worker_hs,
"client",
+13 -8
@@ -18,8 +18,11 @@
# [This file includes modifications made by New Vector Limited]
#
from http import HTTPStatus
from typing import ClassVar
from unittest.mock import AsyncMock
from parameterized import parameterized_class
from synapse.rest.client import auth_metadata
from tests.unittest import HomeserverTestCase, override_config, skip_unless
@@ -85,17 +88,22 @@ class AuthIssuerTestCase(HomeserverTestCase):
req_mock.assert_not_called()
@parameterized_class(
("endpoint",),
[
("/_matrix/client/unstable/org.matrix.msc2965/auth_metadata",),
("/_matrix/client/v1/auth_metadata",),
],
)
class AuthMetadataTestCase(HomeserverTestCase):
endpoint: ClassVar[str]
servlets = [
auth_metadata.register_servlets,
]
def test_returns_404_when_msc3861_disabled(self) -> None:
# Make an unauthenticated request for the discovery info.
channel = self.make_request(
"GET",
"/_matrix/client/unstable/org.matrix.msc2965/auth_metadata",
)
channel = self.make_request("GET", self.endpoint)
self.assertEqual(channel.code, HTTPStatus.NOT_FOUND)
@skip_unless(HAS_AUTHLIB, "requires authlib")
@@ -124,10 +132,7 @@ class AuthMetadataTestCase(HomeserverTestCase):
)
self.hs.get_proxied_http_client().get_json = req_mock # type: ignore[method-assign]
channel = self.make_request(
"GET",
"/_matrix/client/unstable/org.matrix.msc2965/auth_metadata",
)
channel = self.make_request("GET", self.endpoint)
self.assertEqual(channel.code, HTTPStatus.OK)
self.assertEqual(
+3
@@ -1145,6 +1145,9 @@ def setup_test_homeserver(
reactor=reactor,
)
# Register the cleanup hook
cleanup_func(hs.cleanup)
# Install @cache_in_self attributes
for key, val in kwargs.items():
setattr(hs, "_" + key, val)