Fix LaterGauge metrics to collect from all servers (#18751)

Fix `LaterGauge` metrics to collect from all servers

Follow-up to https://github.com/element-hq/synapse/pull/18714

Previously, our `LaterGauge` metrics included the `server_name` label as
expected, but in some cases only the last server was being reported. Any
`LaterGauge` that was created multiple times (once per homeserver) only
reported the last instance, because registering a gauge with a name that
was already in use unregistered and replaced the previous one.

This PR updates all `LaterGauge` to be created once and then we use
`LaterGauge.register_hook(...)` to add in the metric callback as before.
This works now because we store a list of callbacks instead of just one.

I noticed this problem thanks to some [tests in the Synapse Pro for
Small Hosts](https://github.com/element-hq/synapse-small-hosts/pull/173)
repo that sanity-check all metrics to ensure that each metric includes
data from multiple servers.


### Testing strategy

1. This is only noticeable when you run multiple Synapse instances in
the same process.
2. See the `LaterGaugeTests.test_later_gauge_multiple_servers` test added
in this PR: it registers hooks for two servers (`hs1`, `hs2`) on a single
`LaterGauge` and asserts that the collected metrics include an entry for
each `server_name`.

### Dev notes

Previous non-global `LaterGauge`:

```
synapse_federation_send_queue_xxx
synapse_federation_transaction_queue_pending_destinations
synapse_federation_transaction_queue_pending_pdus
synapse_federation_transaction_queue_pending_edus
synapse_handlers_presence_user_to_current_state_size
synapse_handlers_presence_wheel_timer_size
synapse_notifier_listeners
synapse_notifier_rooms
synapse_notifier_users
synapse_replication_tcp_resource_total_connections
synapse_replication_tcp_command_queue
synapse_background_update_status
synapse_federation_known_servers
synapse_scheduler_running_tasks
```



### Pull Request Checklist

<!-- Please read
https://element-hq.github.io/synapse/latest/development/contributing_guide.html
before submitting your pull request -->

* [x] Pull request is based on the develop branch
* [x] Pull request includes a [changelog
file](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#changelog).
The entry should:
- Be a short description of your change which makes sense to users.
"Fixed a bug that prevented receiving messages from other servers."
instead of "Moved X method from `EventStore` to `EventWorkerStore`.".
  - Use markdown where necessary, mostly for `code blocks`.
  - End with either a period (.) or an exclamation mark (!).
  - Start with a capital letter.
- Feel free to credit yourself, by adding a sentence "Contributed by
@github_username." or "Contributed by [Your Name]." to the end of the
entry.
* [x] [Code
style](https://element-hq.github.io/synapse/latest/code_style.html) is
correct (run the
[linters](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#run-the-linters))
This commit is contained in:
Eric Eastwood
2025-08-05 10:28:55 -05:00
committed by GitHub
parent c7762cd55e
commit 076db0ab49
14 changed files with 241 additions and 141 deletions

1
changelog.d/18751.misc Normal file
View File

@@ -0,0 +1 @@
Fix `LaterGauge` metrics to collect from all servers.

View File

@@ -37,6 +37,7 @@ Events are replicated via a separate events stream.
""" """
import logging import logging
from enum import Enum
from typing import ( from typing import (
TYPE_CHECKING, TYPE_CHECKING,
Dict, Dict,
@@ -67,6 +68,25 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class QueueNames(str, Enum):
PRESENCE_MAP = "presence_map"
KEYED_EDU = "keyed_edu"
KEYED_EDU_CHANGED = "keyed_edu_changed"
EDUS = "edus"
POS_TIME = "pos_time"
PRESENCE_DESTINATIONS = "presence_destinations"
queue_name_to_gauge_map: Dict[QueueNames, LaterGauge] = {}
for queue_name in QueueNames:
queue_name_to_gauge_map[queue_name] = LaterGauge(
name=f"synapse_federation_send_queue_{queue_name.value}_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
class FederationRemoteSendQueue(AbstractFederationSender): class FederationRemoteSendQueue(AbstractFederationSender):
"""A drop in replacement for FederationSender""" """A drop in replacement for FederationSender"""
@@ -111,23 +131,15 @@ class FederationRemoteSendQueue(AbstractFederationSender):
# we make a new function, so we need to make a new function so the inner # we make a new function, so we need to make a new function so the inner
# lambda binds to the queue rather than to the name of the queue which # lambda binds to the queue rather than to the name of the queue which
# changes. ARGH. # changes. ARGH.
def register(name: str, queue: Sized) -> None: def register(queue_name: QueueNames, queue: Sized) -> None:
LaterGauge( queue_name_to_gauge_map[queue_name].register_hook(
name="synapse_federation_send_queue_%s_size" % (queue_name,), lambda: {(self.server_name,): len(queue)}
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(queue)},
) )
for queue_name in [ for queue_name in QueueNames:
"presence_map", queue = getattr(self, queue_name.value)
"keyed_edu", assert isinstance(queue, Sized)
"keyed_edu_changed", register(queue_name, queue=queue)
"edus",
"pos_time",
"presence_destinations",
]:
register(queue_name, getattr(self, queue_name))
self.clock.looping_call(self._clear_queue, 30 * 1000) self.clock.looping_call(self._clear_queue, 30 * 1000)

View File

@@ -199,6 +199,24 @@ sent_pdus_destination_dist_total = Counter(
labelnames=[SERVER_NAME_LABEL], labelnames=[SERVER_NAME_LABEL],
) )
transaction_queue_pending_destinations_gauge = LaterGauge(
name="synapse_federation_transaction_queue_pending_destinations",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
transaction_queue_pending_pdus_gauge = LaterGauge(
name="synapse_federation_transaction_queue_pending_pdus",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
transaction_queue_pending_edus_gauge = LaterGauge(
name="synapse_federation_transaction_queue_pending_edus",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
# Time (in s) to wait before trying to wake up destinations that have # Time (in s) to wait before trying to wake up destinations that have
# catch-up outstanding. # catch-up outstanding.
# Please note that rate limiting still applies, so while the loop is # Please note that rate limiting still applies, so while the loop is
@@ -398,38 +416,28 @@ class FederationSender(AbstractFederationSender):
# map from destination to PerDestinationQueue # map from destination to PerDestinationQueue
self._per_destination_queues: Dict[str, PerDestinationQueue] = {} self._per_destination_queues: Dict[str, PerDestinationQueue] = {}
LaterGauge( transaction_queue_pending_destinations_gauge.register_hook(
name="synapse_federation_transaction_queue_pending_destinations", lambda: {
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
(self.server_name,): sum( (self.server_name,): sum(
1 1
for d in self._per_destination_queues.values() for d in self._per_destination_queues.values()
if d.transmission_loop_running if d.transmission_loop_running
) )
}, }
) )
transaction_queue_pending_pdus_gauge.register_hook(
LaterGauge( lambda: {
name="synapse_federation_transaction_queue_pending_pdus",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
(self.server_name,): sum( (self.server_name,): sum(
d.pending_pdu_count() for d in self._per_destination_queues.values() d.pending_pdu_count() for d in self._per_destination_queues.values()
) )
}, }
) )
LaterGauge( transaction_queue_pending_edus_gauge.register_hook(
name="synapse_federation_transaction_queue_pending_edus", lambda: {
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
(self.server_name,): sum( (self.server_name,): sum(
d.pending_edu_count() for d in self._per_destination_queues.values() d.pending_edu_count() for d in self._per_destination_queues.values()
) )
}, }
) )
self._is_processing = False self._is_processing = False

View File

@@ -173,6 +173,18 @@ state_transition_counter = Counter(
labelnames=["locality", "from", "to", SERVER_NAME_LABEL], labelnames=["locality", "from", "to", SERVER_NAME_LABEL],
) )
presence_user_to_current_state_size_gauge = LaterGauge(
name="synapse_handlers_presence_user_to_current_state_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
presence_wheel_timer_size_gauge = LaterGauge(
name="synapse_handlers_presence_wheel_timer_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
# If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them # If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
# "currently_active" # "currently_active"
LAST_ACTIVE_GRANULARITY = 60 * 1000 LAST_ACTIVE_GRANULARITY = 60 * 1000
@@ -779,11 +791,8 @@ class PresenceHandler(BasePresenceHandler):
EduTypes.PRESENCE, self.incoming_presence EduTypes.PRESENCE, self.incoming_presence
) )
LaterGauge( presence_user_to_current_state_size_gauge.register_hook(
name="synapse_handlers_presence_user_to_current_state_size", lambda: {(self.server_name,): len(self.user_to_current_state)}
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self.user_to_current_state)},
) )
# The per-device presence state, maps user to devices to per-device presence state. # The per-device presence state, maps user to devices to per-device presence state.
@@ -882,11 +891,8 @@ class PresenceHandler(BasePresenceHandler):
60 * 1000, 60 * 1000,
) )
LaterGauge( presence_wheel_timer_size_gauge.register_hook(
name="synapse_handlers_presence_wheel_timer_size", lambda: {(self.server_name,): len(self.wheel_timer)}
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self.wheel_timer)},
) )
# Used to handle sending of presence to newly joined users/servers # Used to handle sending of presence to newly joined users/servers

View File

@@ -164,12 +164,12 @@ def _get_in_flight_counts() -> Mapping[Tuple[str, ...], int]:
return counts return counts
LaterGauge( in_flight_requests = LaterGauge(
name="synapse_http_server_in_flight_requests_count", name="synapse_http_server_in_flight_requests_count",
desc="", desc="",
labelnames=["method", "servlet", SERVER_NAME_LABEL], labelnames=["method", "servlet", SERVER_NAME_LABEL],
caller=_get_in_flight_counts,
) )
in_flight_requests.register_hook(_get_in_flight_counts)
class RequestMetrics: class RequestMetrics:

View File

@@ -31,6 +31,7 @@ from typing import (
Dict, Dict,
Generic, Generic,
Iterable, Iterable,
List,
Mapping, Mapping,
Optional, Optional,
Sequence, Sequence,
@@ -73,8 +74,6 @@ logger = logging.getLogger(__name__)
METRICS_PREFIX = "/_synapse/metrics" METRICS_PREFIX = "/_synapse/metrics"
all_gauges: Dict[str, Collector] = {}
HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat") HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
SERVER_NAME_LABEL = "server_name" SERVER_NAME_LABEL = "server_name"
@@ -163,42 +162,47 @@ class LaterGauge(Collector):
name: str name: str
desc: str desc: str
labelnames: Optional[StrSequence] = attr.ib(hash=False) labelnames: Optional[StrSequence] = attr.ib(hash=False)
# callback: should either return a value (if there are no labels for this metric), # List of callbacks: each callback should either return a value (if there are no
# or dict mapping from a label tuple to a value # labels for this metric), or dict mapping from a label tuple to a value
caller: Callable[ _hooks: List[
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]] Callable[
] [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
]
] = attr.ib(factory=list, hash=False)
def collect(self) -> Iterable[Metric]: def collect(self) -> Iterable[Metric]:
# The decision to add `SERVER_NAME_LABEL` is from the `LaterGauge` usage itself # The decision to add `SERVER_NAME_LABEL` is from the `LaterGauge` usage itself
# (we don't enforce it here, one level up). # (we don't enforce it here, one level up).
g = GaugeMetricFamily(self.name, self.desc, labels=self.labelnames) # type: ignore[missing-server-name-label] g = GaugeMetricFamily(self.name, self.desc, labels=self.labelnames) # type: ignore[missing-server-name-label]
try: for hook in self._hooks:
calls = self.caller() try:
except Exception: hook_result = hook()
logger.exception("Exception running callback for LaterGauge(%s)", self.name) except Exception:
logger.exception(
"Exception running callback for LaterGauge(%s)", self.name
)
yield g
return
if isinstance(hook_result, (int, float)):
g.add_metric([], hook_result)
else:
for k, v in hook_result.items():
g.add_metric(k, v)
yield g yield g
return
if isinstance(calls, (int, float)): def register_hook(
g.add_metric([], calls) self,
else: hook: Callable[
for k, v in calls.items(): [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
g.add_metric(k, v) ],
) -> None:
yield g self._hooks.append(hook)
def __attrs_post_init__(self) -> None: def __attrs_post_init__(self) -> None:
self._register()
def _register(self) -> None:
if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering", self.name)
REGISTRY.unregister(all_gauges.pop(self.name))
REGISTRY.register(self) REGISTRY.register(self)
all_gauges[self.name] = self
# `MetricsEntry` only makes sense when it is a `Protocol`, # `MetricsEntry` only makes sense when it is a `Protocol`,
@@ -250,7 +254,7 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
# Protects access to _registrations # Protects access to _registrations
self._lock = threading.Lock() self._lock = threading.Lock()
self._register_with_collector() REGISTRY.register(self)
def register( def register(
self, self,
@@ -341,14 +345,6 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
gauge.add_metric(labels=key, value=getattr(metrics, name)) gauge.add_metric(labels=key, value=getattr(metrics, name))
yield gauge yield gauge
def _register_with_collector(self) -> None:
if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering", self.name)
REGISTRY.unregister(all_gauges.pop(self.name))
REGISTRY.register(self)
all_gauges[self.name] = self
class GaugeHistogramMetricFamilyWithLabels(GaugeHistogramMetricFamily): class GaugeHistogramMetricFamilyWithLabels(GaugeHistogramMetricFamily):
""" """

View File

@@ -86,6 +86,24 @@ users_woken_by_stream_counter = Counter(
labelnames=["stream", SERVER_NAME_LABEL], labelnames=["stream", SERVER_NAME_LABEL],
) )
notifier_listeners_gauge = LaterGauge(
name="synapse_notifier_listeners",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
notifier_rooms_gauge = LaterGauge(
name="synapse_notifier_rooms",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
notifier_users_gauge = LaterGauge(
name="synapse_notifier_users",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
T = TypeVar("T") T = TypeVar("T")
@@ -281,28 +299,16 @@ class Notifier:
) )
} }
LaterGauge( notifier_listeners_gauge.register_hook(count_listeners)
name="synapse_notifier_listeners", notifier_rooms_gauge.register_hook(
desc="", lambda: {
labelnames=[SERVER_NAME_LABEL],
caller=count_listeners,
)
LaterGauge(
name="synapse_notifier_rooms",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
(self.server_name,): count( (self.server_name,): count(
bool, list(self.room_to_user_streams.values()) bool, list(self.room_to_user_streams.values())
) )
}, }
) )
LaterGauge( notifier_users_gauge.register_hook(
name="synapse_notifier_users", lambda: {(self.server_name,): len(self.user_to_user_stream)}
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self.user_to_user_stream)},
) )
def add_replication_callback(self, cb: Callable[[], None]) -> None: def add_replication_callback(self, cb: Callable[[], None]) -> None:

View File

@@ -106,6 +106,18 @@ user_ip_cache_counter = Counter(
"synapse_replication_tcp_resource_user_ip_cache", "", labelnames=[SERVER_NAME_LABEL] "synapse_replication_tcp_resource_user_ip_cache", "", labelnames=[SERVER_NAME_LABEL]
) )
tcp_resource_total_connections_gauge = LaterGauge(
name="synapse_replication_tcp_resource_total_connections",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
tcp_command_queue_gauge = LaterGauge(
name="synapse_replication_tcp_command_queue",
desc="Number of inbound RDATA/POSITION commands queued for processing",
labelnames=["stream_name", SERVER_NAME_LABEL],
)
# the type of the entries in _command_queues_by_stream # the type of the entries in _command_queues_by_stream
_StreamCommandQueue = Deque[ _StreamCommandQueue = Deque[
@@ -243,11 +255,8 @@ class ReplicationCommandHandler:
# outgoing replication commands to.) # outgoing replication commands to.)
self._connections: List[IReplicationConnection] = [] self._connections: List[IReplicationConnection] = []
LaterGauge( tcp_resource_total_connections_gauge.register_hook(
name="synapse_replication_tcp_resource_total_connections", lambda: {(self.server_name,): len(self._connections)}
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self._connections)},
) )
# When POSITION or RDATA commands arrive, we stick them in a queue and process # When POSITION or RDATA commands arrive, we stick them in a queue and process
@@ -266,14 +275,11 @@ class ReplicationCommandHandler:
# from that connection. # from that connection.
self._streams_by_connection: Dict[IReplicationConnection, Set[str]] = {} self._streams_by_connection: Dict[IReplicationConnection, Set[str]] = {}
LaterGauge( tcp_command_queue_gauge.register_hook(
name="synapse_replication_tcp_command_queue", lambda: {
desc="Number of inbound RDATA/POSITION commands queued for processing",
labelnames=["stream_name", SERVER_NAME_LABEL],
caller=lambda: {
(stream_name, self.server_name): len(queue) (stream_name, self.server_name): len(queue)
for stream_name, queue in self._command_queues_by_stream.items() for stream_name, queue in self._command_queues_by_stream.items()
}, }
) )
self._is_master = hs.config.worker.worker_app is None self._is_master = hs.config.worker.worker_app is None

View File

@@ -527,9 +527,11 @@ pending_commands = LaterGauge(
name="synapse_replication_tcp_protocol_pending_commands", name="synapse_replication_tcp_protocol_pending_commands",
desc="", desc="",
labelnames=["name", SERVER_NAME_LABEL], labelnames=["name", SERVER_NAME_LABEL],
caller=lambda: { )
pending_commands.register_hook(
lambda: {
(p.name, p.server_name): len(p.pending_commands) for p in connected_connections (p.name, p.server_name): len(p.pending_commands) for p in connected_connections
}, }
) )
@@ -544,9 +546,11 @@ transport_send_buffer = LaterGauge(
name="synapse_replication_tcp_protocol_transport_send_buffer", name="synapse_replication_tcp_protocol_transport_send_buffer",
desc="", desc="",
labelnames=["name", SERVER_NAME_LABEL], labelnames=["name", SERVER_NAME_LABEL],
caller=lambda: { )
transport_send_buffer.register_hook(
lambda: {
(p.name, p.server_name): transport_buffer_size(p) for p in connected_connections (p.name, p.server_name): transport_buffer_size(p) for p in connected_connections
}, }
) )
@@ -571,10 +575,12 @@ tcp_transport_kernel_send_buffer = LaterGauge(
name="synapse_replication_tcp_protocol_transport_kernel_send_buffer", name="synapse_replication_tcp_protocol_transport_kernel_send_buffer",
desc="", desc="",
labelnames=["name", SERVER_NAME_LABEL], labelnames=["name", SERVER_NAME_LABEL],
caller=lambda: { )
tcp_transport_kernel_send_buffer.register_hook(
lambda: {
(p.name, p.server_name): transport_kernel_read_buffer_size(p, False) (p.name, p.server_name): transport_kernel_read_buffer_size(p, False)
for p in connected_connections for p in connected_connections
}, }
) )
@@ -582,8 +588,10 @@ tcp_transport_kernel_read_buffer = LaterGauge(
name="synapse_replication_tcp_protocol_transport_kernel_read_buffer", name="synapse_replication_tcp_protocol_transport_kernel_read_buffer",
desc="", desc="",
labelnames=["name", SERVER_NAME_LABEL], labelnames=["name", SERVER_NAME_LABEL],
caller=lambda: { )
tcp_transport_kernel_read_buffer.register_hook(
lambda: {
(p.name, p.server_name): transport_kernel_read_buffer_size(p, True) (p.name, p.server_name): transport_kernel_read_buffer_size(p, True)
for p in connected_connections for p in connected_connections
}, }
) )

View File

@@ -100,6 +100,12 @@ sql_txn_duration = Counter(
labelnames=["desc", SERVER_NAME_LABEL], labelnames=["desc", SERVER_NAME_LABEL],
) )
background_update_status = LaterGauge(
name="synapse_background_update_status",
desc="Background update status",
labelnames=[SERVER_NAME_LABEL],
)
# Unique indexes which have been added in background updates. Maps from table name # Unique indexes which have been added in background updates. Maps from table name
# to the name of the background update which added the unique index to that table. # to the name of the background update which added the unique index to that table.
@@ -611,11 +617,8 @@ class DatabasePool:
) )
self.updates = BackgroundUpdater(hs, self) self.updates = BackgroundUpdater(hs, self)
LaterGauge( background_update_status.register_hook(
name="synapse_background_update_status", lambda: {(self.server_name,): self.updates.get_status()},
desc="Background update status",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): self.updates.get_status()},
) )
self._previous_txn_total_time = 0.0 self._previous_txn_total_time = 0.0

View File

@@ -84,6 +84,13 @@ _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME = "current_state_events_membership"
_POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE = 1000 _POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE = 1000
federation_known_servers_gauge = LaterGauge(
name="synapse_federation_known_servers",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
@attr.s(frozen=True, slots=True, auto_attribs=True) @attr.s(frozen=True, slots=True, auto_attribs=True)
class EventIdMembership: class EventIdMembership:
"""Returned by `get_membership_from_event_ids`""" """Returned by `get_membership_from_event_ids`"""
@@ -116,11 +123,8 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
1, 1,
self._count_known_servers, self._count_known_servers,
) )
LaterGauge( federation_known_servers_gauge.register_hook(
name="synapse_federation_known_servers", lambda: {(self.server_name,): self._known_servers_count}
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): self._known_servers_count},
) )
@wrap_as_background_process("_count_known_servers") @wrap_as_background_process("_count_known_servers")

View File

@@ -131,27 +131,31 @@ def _get_counts_from_rate_limiter_instance(
# We track the number of affected hosts per time-period so we can # We track the number of affected hosts per time-period so we can
# differentiate one really noisy homeserver from a general # differentiate one really noisy homeserver from a general
# ratelimit tuning problem across the federation. # ratelimit tuning problem across the federation.
LaterGauge( sleep_affected_hosts_gauge = LaterGauge(
name="synapse_rate_limit_sleep_affected_hosts", name="synapse_rate_limit_sleep_affected_hosts",
desc="Number of hosts that had requests put to sleep", desc="Number of hosts that had requests put to sleep",
labelnames=["rate_limiter_name", SERVER_NAME_LABEL], labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
caller=lambda: _get_counts_from_rate_limiter_instance( )
sleep_affected_hosts_gauge.register_hook(
lambda: _get_counts_from_rate_limiter_instance(
lambda rate_limiter_instance: sum( lambda rate_limiter_instance: sum(
ratelimiter.should_sleep() ratelimiter.should_sleep()
for ratelimiter in rate_limiter_instance.ratelimiters.values() for ratelimiter in rate_limiter_instance.ratelimiters.values()
) )
), )
) )
LaterGauge( reject_affected_hosts_gauge = LaterGauge(
name="synapse_rate_limit_reject_affected_hosts", name="synapse_rate_limit_reject_affected_hosts",
desc="Number of hosts that had requests rejected", desc="Number of hosts that had requests rejected",
labelnames=["rate_limiter_name", SERVER_NAME_LABEL], labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
caller=lambda: _get_counts_from_rate_limiter_instance( )
reject_affected_hosts_gauge.register_hook(
lambda: _get_counts_from_rate_limiter_instance(
lambda rate_limiter_instance: sum( lambda rate_limiter_instance: sum(
ratelimiter.should_reject() ratelimiter.should_reject()
for ratelimiter in rate_limiter_instance.ratelimiters.values() for ratelimiter in rate_limiter_instance.ratelimiters.values()
) )
), )
) )

View File

@@ -44,6 +44,13 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
running_tasks_gauge = LaterGauge(
name="synapse_scheduler_running_tasks",
desc="The number of concurrent running tasks handled by the TaskScheduler",
labelnames=[SERVER_NAME_LABEL],
)
class TaskScheduler: class TaskScheduler:
""" """
This is a simple task scheduler designed for resumable tasks. Normally, This is a simple task scheduler designed for resumable tasks. Normally,
@@ -130,11 +137,8 @@ class TaskScheduler:
TaskScheduler.SCHEDULE_INTERVAL_MS, TaskScheduler.SCHEDULE_INTERVAL_MS,
) )
LaterGauge( running_tasks_gauge.register_hook(
name="synapse_scheduler_running_tasks", lambda: {(self.server_name,): len(self._running_tasks)}
desc="The number of concurrent running tasks handled by the TaskScheduler",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self._running_tasks)},
) )
def register_action( def register_action(

View File

@@ -22,7 +22,13 @@ from typing import Dict, Protocol, Tuple
from prometheus_client.core import Sample from prometheus_client.core import Sample
from synapse.metrics import REGISTRY, InFlightGauge, generate_latest from synapse.metrics import (
REGISTRY,
SERVER_NAME_LABEL,
InFlightGauge,
LaterGauge,
generate_latest,
)
from synapse.util.caches.deferred_cache import DeferredCache from synapse.util.caches.deferred_cache import DeferredCache
from tests import unittest from tests import unittest
@@ -285,6 +291,42 @@ class CacheMetricsTests(unittest.HomeserverTestCase):
self.assertEqual(hs2_cache_max_size_metric_value, "777.0") self.assertEqual(hs2_cache_max_size_metric_value, "777.0")
class LaterGaugeTests(unittest.HomeserverTestCase):
def test_later_gauge_multiple_servers(self) -> None:
"""
Test that LaterGauge metrics are reported correctly across multiple servers. We
will have an metrics entry for each homeserver that is labeled with the
`server_name` label.
"""
later_gauge = LaterGauge(
name="foo",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
later_gauge.register_hook(lambda: {("hs1",): 1})
later_gauge.register_hook(lambda: {("hs2",): 2})
metrics_map = get_latest_metrics()
# Find the metrics for the caches from both homeservers
hs1_metric = 'foo{server_name="hs1"}'
hs1_metric_value = metrics_map.get(hs1_metric)
self.assertIsNotNone(
hs1_metric_value,
f"Missing metric {hs1_metric} in cache metrics {metrics_map}",
)
hs2_metric = 'foo{server_name="hs2"}'
hs2_metric_value = metrics_map.get(hs2_metric)
self.assertIsNotNone(
hs2_metric_value,
f"Missing metric {hs2_metric} in cache metrics {metrics_map}",
)
# Sanity check the metric values
self.assertEqual(hs1_metric_value, "1.0")
self.assertEqual(hs2_metric_value, "2.0")
def get_latest_metrics() -> Dict[str, str]: def get_latest_metrics() -> Dict[str, str]:
""" """
Collect the latest metrics from the registry and parse them into an easy to use map. Collect the latest metrics from the registry and parse them into an easy to use map.