Refactor GaugeBucketCollector metrics to be homeserver-scoped (#18715)

Refactor `GaugeBucketCollector` metrics to be homeserver-scoped

Part of https://github.com/element-hq/synapse/issues/18592


### Testing strategy

 1. Add the `metrics` listener in your `homeserver.yaml`
    ```yaml
    listeners:
      # This is just showing how to configure metrics either way
      #
      # `http` `metrics` resource
      - port: 9322
        type: http
        bind_addresses: ['127.0.0.1']
        resources:
          - names: [metrics]
            compress: false
      # `metrics` listener
      - port: 9323
        type: metrics
        bind_addresses: ['127.0.0.1']
    ```
1. Start the homeserver: `poetry run synapse_homeserver --config-path
homeserver.yaml`
1. Fetch `http://localhost:9322/_synapse/metrics` and/or
`http://localhost:9323/metrics`
1. Adjust the number of [`msecs` in the `looping_call` so that
`_read_forward_extremities`](a82b8a966a/synapse/storage/databases/main/metrics.py (L79))
runs immediately instead of after an hour.
1. Observe that the response includes the `synapse_forward_extremities` and
`synapse_excess_extremity_events` metrics, each carrying the `server_name` label.
This commit is contained in:
Eric Eastwood
2025-07-29 11:46:21 -05:00
committed by GitHub
parent f13a136396
commit 5106818bd0
4 changed files with 98 additions and 30 deletions

1
changelog.d/18715.misc Normal file
View File

@@ -0,0 +1 @@
Refactor `GaugeBucketCollector` metrics to be homeserver-scoped.

View File

@@ -33,6 +33,7 @@ from typing import (
Iterable,
Mapping,
Optional,
Sequence,
Set,
Tuple,
Type,
@@ -343,6 +344,51 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
all_gauges[self.name] = self
class GaugeHistogramMetricFamilyWithLabels(GaugeHistogramMetricFamily):
    """
    Custom version of `GaugeHistogramMetricFamily` from `prometheus_client` that allows
    specifying labels and label values.

    A single gauge histogram and its samples.

    For use by custom collectors.

    Args:
        name: Name of the metric family.
        documentation: Help text for the metric family.
        gsum_value: The gauge-histogram sum reported alongside the buckets.
        buckets: Optional list of `(upper_bound_as_str, cumulative_count)` pairs.
            If `None`, no sample is added and the caller is expected to call
            `add_metric` themselves.
        labelnames: Names of the labels to apply to the metric.
        labelvalues: Values for each of `labelnames`; must be the same length.
        unit: Optional unit suffix for the metric (forwarded to the base class).

    Raises:
        ValueError: If `labelvalues` and `labelnames` differ in length.
    """

    def __init__(
        self,
        *,
        name: str,
        documentation: str,
        gsum_value: float,
        buckets: Optional[Sequence[Tuple[str, float]]] = None,
        labelnames: StrSequence = (),
        labelvalues: StrSequence = (),
        unit: str = "",
    ):
        # Sanity check the number of label values matches the number of label names.
        if len(labelvalues) != len(labelnames):
            raise ValueError(
                "The number of label values must match the number of label names"
            )

        # Call the super to validate and set the labelnames. We use this stable API
        # instead of setting the internal `_labelnames` field directly.
        super().__init__(
            name=name,
            documentation=documentation,
            labels=labelnames,
            # Since `GaugeHistogramMetricFamily` doesn't support supplying `labels` and
            # `buckets` at the same time (artificial limitation), we will just set these
            # as `None` and set up the buckets ourselves just below.
            buckets=None,
            gsum_value=None,
            # Fix: previously `unit` was accepted but silently discarded; forward it so
            # the base class can apply the unit suffix handling.
            unit=unit,
        )

        # Create a gauge for each bucket.
        if buckets is not None:
            self.add_metric(labels=labelvalues, buckets=buckets, gsum_value=gsum_value)
class GaugeBucketCollector(Collector):
"""Like a Histogram, but the buckets are Gauges which are updated atomically.
@@ -355,14 +401,17 @@ class GaugeBucketCollector(Collector):
__slots__ = (
"_name",
"_documentation",
"_labelnames",
"_bucket_bounds",
"_metric",
)
def __init__(
self,
*,
name: str,
documentation: str,
labelnames: Optional[StrSequence],
buckets: Iterable[float],
registry: CollectorRegistry = REGISTRY,
):
@@ -376,6 +425,7 @@ class GaugeBucketCollector(Collector):
"""
self._name = name
self._documentation = documentation
self._labelnames = labelnames if labelnames else ()
# the tops of the buckets
self._bucket_bounds = [float(b) for b in buckets]
@@ -387,7 +437,7 @@ class GaugeBucketCollector(Collector):
# We initially set this to None. We won't report metrics until
# this has been initialised after a successful data update
self._metric: Optional[GaugeHistogramMetricFamily] = None
self._metric: Optional[GaugeHistogramMetricFamilyWithLabels] = None
registry.register(self)
@@ -396,15 +446,26 @@ class GaugeBucketCollector(Collector):
if self._metric is not None:
yield self._metric
def update_data(self, values: Iterable[float]) -> None:
def update_data(self, values: Iterable[float], labels: StrSequence = ()) -> None:
"""Update the data to be reported by the metric
The existing data is cleared, and each measurement in the input is assigned
to the relevant bucket.
"""
self._metric = self._values_to_metric(values)
def _values_to_metric(self, values: Iterable[float]) -> GaugeHistogramMetricFamily:
Args:
values
labels
"""
self._metric = self._values_to_metric(values, labels)
def _values_to_metric(
self, values: Iterable[float], labels: StrSequence = ()
) -> GaugeHistogramMetricFamilyWithLabels:
"""
Args:
values
labels
"""
total = 0.0
bucket_values = [0 for _ in self._bucket_bounds]
@@ -422,9 +483,11 @@ class GaugeBucketCollector(Collector):
# that bucket or below.
accumulated_values = itertools.accumulate(bucket_values)
return GaugeHistogramMetricFamily(
self._name,
self._documentation,
return GaugeHistogramMetricFamilyWithLabels(
name=self._name,
documentation=self._documentation,
labelnames=self._labelnames,
labelvalues=labels,
buckets=list(
zip((str(b) for b in self._bucket_bounds), accumulated_values)
),

View File

@@ -23,7 +23,7 @@ import logging
import time
from typing import TYPE_CHECKING, Dict, List, Tuple, cast
from synapse.metrics import GaugeBucketCollector
from synapse.metrics import SERVER_NAME_LABEL, GaugeBucketCollector
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import (
@@ -42,9 +42,10 @@ logger = logging.getLogger(__name__)
# Collect metrics on the number of forward extremities that exist.
_extremities_collecter = GaugeBucketCollector(
"synapse_forward_extremities",
"Number of rooms on the server with the given number of forward extremities"
name="synapse_forward_extremities",
documentation="Number of rooms on the server with the given number of forward extremities"
" or fewer",
labelnames=[SERVER_NAME_LABEL],
buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500],
)
@@ -54,9 +55,10 @@ _extremities_collecter = GaugeBucketCollector(
# we could remove from state resolution by reducing the graph to a single
# forward extremity.
_excess_state_events_collecter = GaugeBucketCollector(
"synapse_excess_extremity_events",
"Number of rooms on the server with the given number of excess extremity "
name="synapse_excess_extremity_events",
documentation="Number of rooms on the server with the given number of excess extremity "
"events, or fewer",
labelnames=[SERVER_NAME_LABEL],
buckets=[0] + [1 << n for n in range(12)],
)
@@ -100,10 +102,12 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
res = await self.db_pool.runInteraction("read_forward_extremities", fetch)
_extremities_collecter.update_data(x[0] for x in res)
_extremities_collecter.update_data(
values=(x[0] for x in res), labels=(self.server_name,)
)
_excess_state_events_collecter.update_data(
(x[0] - 1) * x[1] for x in res if x[1]
values=((x[0] - 1) * x[1] for x in res if x[1]), labels=(self.server_name,)
)
async def count_daily_e2ee_messages(self) -> int:

View File

@@ -65,24 +65,24 @@ class ExtremStatisticsTestCase(HomeserverTestCase):
)
expected = [
b'synapse_forward_extremities_bucket{le="1.0"} 0.0',
b'synapse_forward_extremities_bucket{le="2.0"} 2.0',
b'synapse_forward_extremities_bucket{le="3.0"} 2.0',
b'synapse_forward_extremities_bucket{le="5.0"} 2.0',
b'synapse_forward_extremities_bucket{le="7.0"} 3.0',
b'synapse_forward_extremities_bucket{le="10.0"} 3.0',
b'synapse_forward_extremities_bucket{le="15.0"} 3.0',
b'synapse_forward_extremities_bucket{le="20.0"} 3.0',
b'synapse_forward_extremities_bucket{le="50.0"} 3.0',
b'synapse_forward_extremities_bucket{le="100.0"} 3.0',
b'synapse_forward_extremities_bucket{le="200.0"} 3.0',
b'synapse_forward_extremities_bucket{le="500.0"} 3.0',
b'synapse_forward_extremities_bucket{le="1.0",server_name="test"} 0.0',
b'synapse_forward_extremities_bucket{le="2.0",server_name="test"} 2.0',
b'synapse_forward_extremities_bucket{le="3.0",server_name="test"} 2.0',
b'synapse_forward_extremities_bucket{le="5.0",server_name="test"} 2.0',
b'synapse_forward_extremities_bucket{le="7.0",server_name="test"} 3.0',
b'synapse_forward_extremities_bucket{le="10.0",server_name="test"} 3.0',
b'synapse_forward_extremities_bucket{le="15.0",server_name="test"} 3.0',
b'synapse_forward_extremities_bucket{le="20.0",server_name="test"} 3.0',
b'synapse_forward_extremities_bucket{le="50.0",server_name="test"} 3.0',
b'synapse_forward_extremities_bucket{le="100.0",server_name="test"} 3.0',
b'synapse_forward_extremities_bucket{le="200.0",server_name="test"} 3.0',
b'synapse_forward_extremities_bucket{le="500.0",server_name="test"} 3.0',
# per https://docs.google.com/document/d/1KwV0mAXwwbvvifBvDKH_LU1YjyXE_wxCkHNoCGq1GX0/edit#heading=h.wghdjzzh72j9,
# "inf" is valid: "this includes variants such as inf"
b'synapse_forward_extremities_bucket{le="inf"} 3.0',
b'synapse_forward_extremities_bucket{le="inf",server_name="test"} 3.0',
b"# TYPE synapse_forward_extremities_gcount gauge",
b"synapse_forward_extremities_gcount 3.0",
b'synapse_forward_extremities_gcount{server_name="test"} 3.0',
b"# TYPE synapse_forward_extremities_gsum gauge",
b"synapse_forward_extremities_gsum 10.0",
b'synapse_forward_extremities_gsum{server_name="test"} 10.0',
]
self.assertEqual(items, expected)