diff --git a/changelog.d/18714.misc b/changelog.d/18714.misc
new file mode 100644
index 0000000000..0421264655
--- /dev/null
+++ b/changelog.d/18714.misc
@@ -0,0 +1 @@
+Refactor `LaterGauge` metrics to be homeserver-scoped.
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index e309836a52..7f511d570c 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -54,7 +54,7 @@ from sortedcontainers import SortedDict
 from synapse.api.presence import UserPresenceState
 from synapse.federation.sender import AbstractFederationSender, FederationSender
-from synapse.metrics import LaterGauge
+from synapse.metrics import SERVER_NAME_LABEL, LaterGauge
 from synapse.replication.tcp.streams.federation import FederationStream
 from synapse.types import JsonDict, ReadReceipt, RoomStreamToken, StrCollection
 from synapse.util.metrics import Measure
@@ -113,10 +113,10 @@ class FederationRemoteSendQueue(AbstractFederationSender):
         # changes. ARGH.
         def register(name: str, queue: Sized) -> None:
             LaterGauge(
-                "synapse_federation_send_queue_%s_size" % (queue_name,),
-                "",
-                [],
-                lambda: len(queue),
+                name="synapse_federation_send_queue_%s_size" % (queue_name,),
+                desc="",
+                labelnames=[SERVER_NAME_LABEL],
+                caller=lambda: {(self.server_name,): len(queue)},
             )
 
         for queue_name in [
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 9b52848792..1d8bea9943 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -399,31 +399,37 @@ class FederationSender(AbstractFederationSender):
         self._per_destination_queues: Dict[str, PerDestinationQueue] = {}
 
         LaterGauge(
-            "synapse_federation_transaction_queue_pending_destinations",
-            "",
-            [],
-            lambda: sum(
-                1
-                for d in self._per_destination_queues.values()
-                if d.transmission_loop_running
-            ),
+            name="synapse_federation_transaction_queue_pending_destinations",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {
+                (self.server_name,): sum(
+                    1
+                    for d in self._per_destination_queues.values()
+                    if d.transmission_loop_running
+                )
+            },
         )
 
         LaterGauge(
-            "synapse_federation_transaction_queue_pending_pdus",
-            "",
-            [],
-            lambda: sum(
-                d.pending_pdu_count() for d in self._per_destination_queues.values()
-            ),
+            name="synapse_federation_transaction_queue_pending_pdus",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {
+                (self.server_name,): sum(
+                    d.pending_pdu_count() for d in self._per_destination_queues.values()
+                )
+            },
         )
 
         LaterGauge(
-            "synapse_federation_transaction_queue_pending_edus",
-            "",
-            [],
-            lambda: sum(
-                d.pending_edu_count() for d in self._per_destination_queues.values()
-            ),
+            name="synapse_federation_transaction_queue_pending_edus",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {
+                (self.server_name,): sum(
+                    d.pending_edu_count() for d in self._per_destination_queues.values()
+                )
+            },
         )
 
         self._is_processing = False
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index a85178d5aa..b253117498 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -780,10 +780,10 @@ class PresenceHandler(BasePresenceHandler):
         )
 
         LaterGauge(
-            "synapse_handlers_presence_user_to_current_state_size",
-            "",
-            [],
-            lambda: len(self.user_to_current_state),
+            name="synapse_handlers_presence_user_to_current_state_size",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {(self.server_name,): len(self.user_to_current_state)},
         )
 
         # The per-device presence state, maps user to devices to per-device presence state.
@@ -883,10 +883,10 @@ class PresenceHandler(BasePresenceHandler):
         )
 
         LaterGauge(
-            "synapse_handlers_presence_wheel_timer_size",
-            "",
-            [],
-            lambda: len(self.wheel_timer),
+            name="synapse_handlers_presence_wheel_timer_size",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {(self.server_name,): len(self.wheel_timer)},
         )
 
         # Used to handle sending of presence to newly joined users/servers
diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py
index 799b0cb4c7..21f520e772 100644
--- a/synapse/http/request_metrics.py
+++ b/synapse/http/request_metrics.py
@@ -144,27 +144,31 @@ def _get_in_flight_counts() -> Mapping[Tuple[str, ...], int]:
     # Cast to a list to prevent it changing while the Prometheus
     # thread is collecting metrics
     with _in_flight_requests_lock:
-        reqs = list(_in_flight_requests)
+        request_metrics = list(_in_flight_requests)
 
-    for rm in reqs:
-        rm.update_metrics()
+    for request_metric in request_metrics:
+        request_metric.update_metrics()
 
     # Map from (method, name) -> int, the number of in flight requests of that
     # type. The key type is Tuple[str, str], but we leave the length unspecified
     # for compatability with LaterGauge's annotations.
     counts: Dict[Tuple[str, ...], int] = {}
-    for rm in reqs:
-        key = (rm.method, rm.name)
+    for request_metric in request_metrics:
+        key = (
+            request_metric.method,
+            request_metric.name,
+            request_metric.our_server_name,
+        )
         counts[key] = counts.get(key, 0) + 1
 
     return counts
 
 
 LaterGauge(
-    "synapse_http_server_in_flight_requests_count",
-    "",
-    ["method", "servlet"],
-    _get_in_flight_counts,
+    name="synapse_http_server_in_flight_requests_count",
+    desc="",
+    labelnames=["method", "servlet", SERVER_NAME_LABEL],
+    caller=_get_in_flight_counts,
 )
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index a041a0d617..c042dd27d2 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -156,13 +156,13 @@ class _RegistryProxy:
 RegistryProxy = cast(CollectorRegistry, _RegistryProxy)
 
 
-@attr.s(slots=True, hash=True, auto_attribs=True)
+@attr.s(slots=True, hash=True, auto_attribs=True, kw_only=True)
 class LaterGauge(Collector):
     """A Gauge which periodically calls a user-provided callback to produce metrics."""
 
     name: str
     desc: str
-    labels: Optional[StrSequence] = attr.ib(hash=False)
+    labelnames: Optional[StrSequence] = attr.ib(hash=False)
     # callback: should either return a value (if there are no labels for this metric),
     # or dict mapping from a label tuple to a value
     caller: Callable[
@@ -170,7 +170,7 @@
     ]
 
     def collect(self) -> Iterable[Metric]:
-        g = GaugeMetricFamily(self.name, self.desc, labels=self.labels)
+        g = GaugeMetricFamily(self.name, self.desc, labels=self.labelnames)
 
         try:
             calls = self.caller()
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 45204842ce..448a715e2a 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -29,6 +29,7 @@ from typing import (
     Iterable,
     List,
     Literal,
+    Mapping,
     Optional,
     Set,
     Tuple,
@@ -263,7 +264,10 @@ class Notifier:
         # This is not a very cheap test to perform, but it's only executed
         # when rendering the metrics page, which is likely once per minute at
         # most when scraping it.
-        def count_listeners() -> int:
+        #
+        # Ideally, we'd use `Mapping[Tuple[str], int]` here but mypy doesn't like it.
+        # This is close enough and better than a type ignore.
+        def count_listeners() -> Mapping[Tuple[str, ...], int]:
             all_user_streams: Set[_NotifierUserStream] = set()
 
             for streams in list(self.room_to_user_streams.values()):
@@ -271,18 +275,34 @@
             for stream in list(self.user_to_user_stream.values()):
                 all_user_streams.add(stream)
 
-            return sum(stream.count_listeners() for stream in all_user_streams)
-
-        LaterGauge("synapse_notifier_listeners", "", [], count_listeners)
+            return {
+                (self.server_name,): sum(
+                    stream.count_listeners() for stream in all_user_streams
+                )
+            }
 
         LaterGauge(
-            "synapse_notifier_rooms",
-            "",
-            [],
-            lambda: count(bool, list(self.room_to_user_streams.values())),
+            name="synapse_notifier_listeners",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=count_listeners,
+        )
+
+        LaterGauge(
+            name="synapse_notifier_rooms",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {
+                (self.server_name,): count(
+                    bool, list(self.room_to_user_streams.values())
+                )
+            },
         )
 
         LaterGauge(
-            "synapse_notifier_users", "", [], lambda: len(self.user_to_user_stream)
+            name="synapse_notifier_users",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {(self.server_name,): len(self.user_to_user_stream)},
         )
 
     def add_replication_callback(self, cb: Callable[[], None]) -> None:
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 007563168d..0f14c7e380 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -244,10 +244,10 @@ class ReplicationCommandHandler:
         self._connections: List[IReplicationConnection] = []
 
         LaterGauge(
-            "synapse_replication_tcp_resource_total_connections",
-            "",
-            [],
-            lambda: len(self._connections),
+            name="synapse_replication_tcp_resource_total_connections",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {(self.server_name,): len(self._connections)},
         )
 
         # When POSITION or RDATA commands arrive, we stick them in a queue and process
@@ -267,11 +267,11 @@
         self._streams_by_connection: Dict[IReplicationConnection, Set[str]] = {}
 
         LaterGauge(
-            "synapse_replication_tcp_command_queue",
-            "Number of inbound RDATA/POSITION commands queued for processing",
-            ["stream_name"],
-            lambda: {
-                (stream_name,): len(queue)
+            name="synapse_replication_tcp_command_queue",
+            desc="Number of inbound RDATA/POSITION commands queued for processing",
+            labelnames=["stream_name", SERVER_NAME_LABEL],
+            caller=lambda: {
+                (stream_name, self.server_name): len(queue)
                 for stream_name, queue in self._command_queues_by_stream.items()
             },
         )
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index c726baf142..969f0303e0 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -524,10 +524,12 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
 # The following simply registers metrics for the replication connections
 pending_commands = LaterGauge(
-    "synapse_replication_tcp_protocol_pending_commands",
-    "",
-    ["name"],
-    lambda: {(p.name,): len(p.pending_commands) for p in connected_connections},
+    name="synapse_replication_tcp_protocol_pending_commands",
+    desc="",
+    labelnames=["name", SERVER_NAME_LABEL],
+    caller=lambda: {
+        (p.name, p.server_name): len(p.pending_commands) for p in connected_connections
+    },
 )
@@ -539,10 +541,12 @@ def transport_buffer_size(protocol: BaseReplicationStreamProtocol) -> int:
 transport_send_buffer = LaterGauge(
-    "synapse_replication_tcp_protocol_transport_send_buffer",
-    "",
-    ["name"],
-    lambda: {(p.name,): transport_buffer_size(p) for p in connected_connections},
+    name="synapse_replication_tcp_protocol_transport_send_buffer",
+    desc="",
+    labelnames=["name", SERVER_NAME_LABEL],
+    caller=lambda: {
+        (p.name, p.server_name): transport_buffer_size(p) for p in connected_connections
+    },
 )
@@ -564,22 +568,22 @@ def transport_kernel_read_buffer_size(
 tcp_transport_kernel_send_buffer = LaterGauge(
-    "synapse_replication_tcp_protocol_transport_kernel_send_buffer",
-    "",
-    ["name"],
-    lambda: {
-        (p.name,): transport_kernel_read_buffer_size(p, False)
+    name="synapse_replication_tcp_protocol_transport_kernel_send_buffer",
+    desc="",
+    labelnames=["name", SERVER_NAME_LABEL],
+    caller=lambda: {
+        (p.name, p.server_name): transport_kernel_read_buffer_size(p, False)
         for p in connected_connections
     },
 )
 
 
 tcp_transport_kernel_read_buffer = LaterGauge(
-    "synapse_replication_tcp_protocol_transport_kernel_read_buffer",
-    "",
-    ["name"],
-    lambda: {
-        (p.name,): transport_kernel_read_buffer_size(p, True)
+    name="synapse_replication_tcp_protocol_transport_kernel_read_buffer",
+    desc="",
+    labelnames=["name", SERVER_NAME_LABEL],
+    caller=lambda: {
+        (p.name, p.server_name): transport_kernel_read_buffer_size(p, True)
         for p in connected_connections
     },
 )
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 5ef9139c29..6946abd021 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -588,10 +588,10 @@ class DatabasePool:
         self.updates = BackgroundUpdater(hs, self)
 
         LaterGauge(
-            "synapse_background_update_status",
-            "Background update status",
-            [],
-            self.updates.get_status,
+            name="synapse_background_update_status",
+            desc="Background update status",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {(self.server_name,): self.updates.get_status()},
         )
 
         self._previous_txn_total_time = 0.0
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index ce77e0b0d6..9c35a7837d 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -43,7 +43,7 @@ from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import Codes, SynapseError
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
 from synapse.logging.opentracing import trace
-from synapse.metrics import LaterGauge
+from synapse.metrics import SERVER_NAME_LABEL, LaterGauge
 from synapse.metrics.background_process_metrics import wrap_as_background_process
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import (
@@ -117,10 +117,10 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
             self._count_known_servers,
         )
         LaterGauge(
-            "synapse_federation_known_servers",
-            "",
-            [],
-            lambda: self._known_servers_count,
+            name="synapse_federation_known_servers",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {(self.server_name,): self._known_servers_count},
         )
 
     @wrap_as_background_process("_count_known_servers")
diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py
index 343ee4f454..f22e5710f8 100644
--- a/synapse/util/ratelimitutils.py
+++ b/synapse/util/ratelimitutils.py
@@ -119,7 +119,10 @@ def _get_counts_from_rate_limiter_instance(
         # Only track metrics if they provided a `metrics_name` to
         # differentiate this instance of the rate limiter.
         if rate_limiter_instance.metrics_name:
-            key = (rate_limiter_instance.metrics_name,)
+            key = (
+                rate_limiter_instance.metrics_name,
+                rate_limiter_instance.our_server_name,
+            )
             counts[key] = count_func(rate_limiter_instance)
 
     return counts
@@ -129,10 +132,10 @@
 # differentiate one really noisy homeserver from a general
 # ratelimit tuning problem across the federation.
 LaterGauge(
-    "synapse_rate_limit_sleep_affected_hosts",
-    "Number of hosts that had requests put to sleep",
-    ["rate_limiter_name"],
-    lambda: _get_counts_from_rate_limiter_instance(
+    name="synapse_rate_limit_sleep_affected_hosts",
+    desc="Number of hosts that had requests put to sleep",
+    labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
+    caller=lambda: _get_counts_from_rate_limiter_instance(
         lambda rate_limiter_instance: sum(
             ratelimiter.should_sleep()
             for ratelimiter in rate_limiter_instance.ratelimiters.values()
@@ -140,10 +143,10 @@
     ),
 )
 LaterGauge(
-    "synapse_rate_limit_reject_affected_hosts",
-    "Number of hosts that had requests rejected",
-    ["rate_limiter_name"],
-    lambda: _get_counts_from_rate_limiter_instance(
+    name="synapse_rate_limit_reject_affected_hosts",
+    desc="Number of hosts that had requests rejected",
+    labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
+    caller=lambda: _get_counts_from_rate_limiter_instance(
         lambda rate_limiter_instance: sum(
             ratelimiter.should_reject()
             for ratelimiter in rate_limiter_instance.ratelimiters.values()
@@ -171,6 +174,7 @@ class FederationRateLimiter:
                 for this rate limiter.
         """
 
+        self.our_server_name = our_server_name
         self.metrics_name = metrics_name
 
         def new_limiter() -> "_PerHostRatelimiter":
diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py
index b141d0ee88..fdcacdf128 100644
--- a/synapse/util/task_scheduler.py
+++ b/synapse/util/task_scheduler.py
@@ -30,7 +30,7 @@ from synapse.logging.context import (
     nested_logging_context,
     set_current_context,
 )
-from synapse.metrics import LaterGauge
+from synapse.metrics import SERVER_NAME_LABEL, LaterGauge
 from synapse.metrics.background_process_metrics import (
     run_as_background_process,
     wrap_as_background_process,
@@ -131,10 +131,10 @@ class TaskScheduler:
         )
 
         LaterGauge(
-            "synapse_scheduler_running_tasks",
-            "The number of concurrent running tasks handled by the TaskScheduler",
-            labels=None,
-            caller=lambda: len(self._running_tasks),
+            name="synapse_scheduler_running_tasks",
+            desc="The number of concurrent running tasks handled by the TaskScheduler",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {(self.server_name,): len(self._running_tasks)},
         )
 
     def register_action(
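
Below, for orientation only, is a minimal sketch (not part of the diff) of the call pattern this refactor converges on, following the `synapse/metrics/__init__.py` changes above: `LaterGauge` is now constructed with keyword-only arguments, each gauge carries the owning homeserver's name via the `SERVER_NAME_LABEL` label, and `caller` returns a mapping from label-value tuples to values rather than a bare number. The metric name, the `cache` dict, and the `server_name` value are hypothetical stand-ins.

# Hypothetical usage sketch of the refactored, homeserver-scoped LaterGauge.
from synapse.metrics import SERVER_NAME_LABEL, LaterGauge

server_name = "example.com"  # stand-in for the homeserver's server_name
cache: dict = {}  # stand-in for whatever collection is being measured

LaterGauge(
    # Arguments are keyword-only now that the attrs class uses kw_only=True.
    name="synapse_example_cache_size",  # hypothetical metric name
    desc="Number of entries in the example cache",
    labelnames=[SERVER_NAME_LABEL],
    # The callback returns one sample per label-value tuple; the only label here
    # is the server name, so each homeserver exports its own time series.
    caller=lambda: {(server_name,): len(cache)},
)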