Re-introduce: Fix LaterGauge metrics to collect from all servers (#18791)

Re-introduces https://github.com/element-hq/synapse/pull/18751, which was
reverted in https://github.com/element-hq/synapse/pull/18789 (that PR explains
why it was reverted in the first place).

- Adds a `cleanup` pattern that removes each homeserver's metric hooks in the
  tests (see the sketch after this list). Previously, the list of hooks built
  up until our CI machines couldn't operate properly, see
  https://github.com/element-hq/synapse/pull/18789
- Fixes a long-standing issue where the `synapse_background_update_status`
  metric only tracked the last database listed in the config (see
  https://github.com/element-hq/synapse/pull/18791#discussion_r2261706749)
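
For readers skimming the diff, here is a minimal, self-contained sketch of the hook pattern this change introduces. `LaterGaugeSketch` is a hypothetical toy class, not the real `synapse.metrics.LaterGauge`; it only illustrates the idea that a gauge is created once at module load, each homeserver registers its own callback keyed by `instance_id`, and a shutdown/cleanup step unregisters those callbacks so they don't accumulate across test runs.

```python
from typing import Callable, Dict, Mapping, Optional, Tuple, Union

MetricValue = Union[Mapping[Tuple[str, ...], Union[int, float]], int, float]


class LaterGaugeSketch:
    """Toy stand-in for synapse.metrics.LaterGauge: one hook per homeserver instance."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._hooks: Dict[Optional[str], Callable[[], MetricValue]] = {}

    def register_hook(
        self, *, homeserver_instance_id: Optional[str], hook: Callable[[], MetricValue]
    ) -> None:
        # Only one hook per homeserver instance; callers must unregister on shutdown,
        # otherwise hooks pile up (which is what broke CI in #18789).
        assert homeserver_instance_id not in self._hooks
        self._hooks[homeserver_instance_id] = hook

    def unregister_hooks_for_homeserver_instance_id(self, instance_id: str) -> None:
        self._hooks.pop(instance_id, None)

    def collect(self) -> Dict[Tuple[str, ...], Union[int, float]]:
        # Collect from every registered homeserver; a failing hook only skips itself.
        samples: Dict[Tuple[str, ...], Union[int, float]] = {}
        for instance_id, hook in self._hooks.items():
            try:
                result = hook()
            except Exception:
                continue
            if isinstance(result, (int, float)):
                samples[()] = result
            else:
                samples.update(result)
        return samples


# Usage: two "homeservers" feed the same gauge; cleanup removes one of them.
gauge = LaterGaugeSketch("synapse_notifier_listeners")
gauge.register_hook(homeserver_instance_id="hs1-abcde", hook=lambda: {("hs1.example",): 3})
gauge.register_hook(homeserver_instance_id="hs2-fghij", hook=lambda: {("hs2.example",): 7})
print(gauge.collect())  # {('hs1.example',): 3, ('hs2.example',): 7}

# On homeserver shutdown (hs.cleanup() in the real code), drop that server's hooks:
gauge.unregister_hooks_for_homeserver_instance_id("hs1-abcde")
print(gauge.collect())  # {('hs2.example',): 7}
```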
Author: Eric Eastwood
Committed: 2025-09-02 12:14:27 -05:00 (via GitHub)
Commit: bff4a11b3f (parent: 09a489e198)
20 changed files with 435 additions and 136 deletions

changelog.d/18791.misc (new file)
View File

@@ -0,0 +1 @@
+Fix `LaterGauge` metrics to collect from all servers.

View File

@@ -153,9 +153,13 @@ def get_registered_paths_for_default(
""" """
hs = MockHomeserver(base_config, worker_app) hs = MockHomeserver(base_config, worker_app)
# TODO We only do this to avoid an error, but don't need the database etc # TODO We only do this to avoid an error, but don't need the database etc
hs.setup() hs.setup()
return get_registered_paths_for_hs(hs) registered_paths = get_registered_paths_for_hs(hs)
hs.cleanup()
return registered_paths
def elide_http_methods_if_unconflicting( def elide_http_methods_if_unconflicting(

View File

@@ -99,6 +99,7 @@ from synapse.storage.engines import create_engine
 from synapse.storage.prepare_database import prepare_database
 from synapse.types import ISynapseReactor
 from synapse.util import SYNAPSE_VERSION, Clock
+from synapse.util.stringutils import random_string

 # Cast safety: Twisted does some naughty magic which replaces the
 # twisted.internet.reactor module with a Reactor instance at runtime.
@@ -323,6 +324,7 @@ class MockHomeserver:
         self.config = config
         self.hostname = config.server.server_name
         self.version_string = SYNAPSE_VERSION
+        self.instance_id = random_string(5)

     def get_clock(self) -> Clock:
         return self.clock
@@ -330,6 +332,9 @@ class MockHomeserver:
     def get_reactor(self) -> ISynapseReactor:
         return reactor

+    def get_instance_id(self) -> str:
+        return self.instance_id
+
     def get_instance_name(self) -> str:
         return "master"
@@ -685,7 +690,15 @@ class Porter:
         )

         prepare_database(db_conn, engine, config=self.hs_config)

         # Type safety: ignore that we're using Mock homeservers here.
-        store = Store(DatabasePool(hs, db_config, engine), db_conn, hs)  # type: ignore[arg-type]
+        store = Store(
+            DatabasePool(
+                hs,  # type: ignore[arg-type]
+                db_config,
+                engine,
+            ),
+            db_conn,
+            hs,  # type: ignore[arg-type]
+        )

         db_conn.commit()

         return store

View File

@@ -37,6 +37,7 @@ Events are replicated via a separate events stream.
""" """
import logging import logging
from enum import Enum
from typing import ( from typing import (
TYPE_CHECKING, TYPE_CHECKING,
Dict, Dict,
@@ -67,6 +68,25 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)


+class QueueNames(str, Enum):
+    PRESENCE_MAP = "presence_map"
+    KEYED_EDU = "keyed_edu"
+    KEYED_EDU_CHANGED = "keyed_edu_changed"
+    EDUS = "edus"
+    POS_TIME = "pos_time"
+    PRESENCE_DESTINATIONS = "presence_destinations"
+
+
+queue_name_to_gauge_map: Dict[QueueNames, LaterGauge] = {}
+for queue_name in QueueNames:
+    queue_name_to_gauge_map[queue_name] = LaterGauge(
+        name=f"synapse_federation_send_queue_{queue_name.value}_size",
+        desc="",
+        labelnames=[SERVER_NAME_LABEL],
+    )
+
+
 class FederationRemoteSendQueue(AbstractFederationSender):
     """A drop in replacement for FederationSender"""
@@ -111,23 +131,16 @@ class FederationRemoteSendQueue(AbstractFederationSender):
         # we make a new function, so we need to make a new function so the inner
         # lambda binds to the queue rather than to the name of the queue which
         # changes. ARGH.
-        def register(name: str, queue: Sized) -> None:
-            LaterGauge(
-                name="synapse_federation_send_queue_%s_size" % (queue_name,),
-                desc="",
-                labelnames=[SERVER_NAME_LABEL],
-                caller=lambda: {(self.server_name,): len(queue)},
+        def register(queue_name: QueueNames, queue: Sized) -> None:
+            queue_name_to_gauge_map[queue_name].register_hook(
+                homeserver_instance_id=hs.get_instance_id(),
+                hook=lambda: {(self.server_name,): len(queue)},
             )

-        for queue_name in [
-            "presence_map",
-            "keyed_edu",
-            "keyed_edu_changed",
-            "edus",
-            "pos_time",
-            "presence_destinations",
-        ]:
-            register(queue_name, getattr(self, queue_name))
+        for queue_name in QueueNames:
+            queue = getattr(self, queue_name.value)
+            assert isinstance(queue, Sized)
+            register(queue_name, queue=queue)

         self.clock.looping_call(self._clear_queue, 30 * 1000)

View File

@@ -199,6 +199,24 @@ sent_pdus_destination_dist_total = Counter(
     labelnames=[SERVER_NAME_LABEL],
 )

+transaction_queue_pending_destinations_gauge = LaterGauge(
+    name="synapse_federation_transaction_queue_pending_destinations",
+    desc="",
+    labelnames=[SERVER_NAME_LABEL],
+)
+
+transaction_queue_pending_pdus_gauge = LaterGauge(
+    name="synapse_federation_transaction_queue_pending_pdus",
+    desc="",
+    labelnames=[SERVER_NAME_LABEL],
+)
+
+transaction_queue_pending_edus_gauge = LaterGauge(
+    name="synapse_federation_transaction_queue_pending_edus",
+    desc="",
+    labelnames=[SERVER_NAME_LABEL],
+)
+
 # Time (in s) to wait before trying to wake up destinations that have
 # catch-up outstanding.
 # Please note that rate limiting still applies, so while the loop is
@@ -398,11 +416,9 @@ class FederationSender(AbstractFederationSender):
         # map from destination to PerDestinationQueue
         self._per_destination_queues: Dict[str, PerDestinationQueue] = {}

-        LaterGauge(
-            name="synapse_federation_transaction_queue_pending_destinations",
-            desc="",
-            labelnames=[SERVER_NAME_LABEL],
-            caller=lambda: {
+        transaction_queue_pending_destinations_gauge.register_hook(
+            homeserver_instance_id=hs.get_instance_id(),
+            hook=lambda: {
                 (self.server_name,): sum(
                     1
                     for d in self._per_destination_queues.values()
@@ -410,22 +426,17 @@ class FederationSender(AbstractFederationSender):
                 )
             },
         )
-
-        LaterGauge(
-            name="synapse_federation_transaction_queue_pending_pdus",
-            desc="",
-            labelnames=[SERVER_NAME_LABEL],
-            caller=lambda: {
+        transaction_queue_pending_pdus_gauge.register_hook(
+            homeserver_instance_id=hs.get_instance_id(),
+            hook=lambda: {
                 (self.server_name,): sum(
                     d.pending_pdu_count() for d in self._per_destination_queues.values()
                 )
             },
         )
-        LaterGauge(
-            name="synapse_federation_transaction_queue_pending_edus",
-            desc="",
-            labelnames=[SERVER_NAME_LABEL],
-            caller=lambda: {
+        transaction_queue_pending_edus_gauge.register_hook(
+            homeserver_instance_id=hs.get_instance_id(),
+            hook=lambda: {
                 (self.server_name,): sum(
                     d.pending_edu_count() for d in self._per_destination_queues.values()
                 )

View File

@@ -173,6 +173,18 @@ state_transition_counter = Counter(
labelnames=["locality", "from", "to", SERVER_NAME_LABEL], labelnames=["locality", "from", "to", SERVER_NAME_LABEL],
) )
presence_user_to_current_state_size_gauge = LaterGauge(
name="synapse_handlers_presence_user_to_current_state_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
presence_wheel_timer_size_gauge = LaterGauge(
name="synapse_handlers_presence_wheel_timer_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
# If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them # If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
# "currently_active" # "currently_active"
LAST_ACTIVE_GRANULARITY = 60 * 1000 LAST_ACTIVE_GRANULARITY = 60 * 1000
@@ -779,11 +791,9 @@ class PresenceHandler(BasePresenceHandler):
             EduTypes.PRESENCE, self.incoming_presence
         )

-        LaterGauge(
-            name="synapse_handlers_presence_user_to_current_state_size",
-            desc="",
-            labelnames=[SERVER_NAME_LABEL],
-            caller=lambda: {(self.server_name,): len(self.user_to_current_state)},
+        presence_user_to_current_state_size_gauge.register_hook(
+            homeserver_instance_id=hs.get_instance_id(),
+            hook=lambda: {(self.server_name,): len(self.user_to_current_state)},
         )

         # The per-device presence state, maps user to devices to per-device presence state.
@@ -882,11 +892,9 @@ class PresenceHandler(BasePresenceHandler):
             60 * 1000,
         )

-        LaterGauge(
-            name="synapse_handlers_presence_wheel_timer_size",
-            desc="",
-            labelnames=[SERVER_NAME_LABEL],
-            caller=lambda: {(self.server_name,): len(self.wheel_timer)},
+        presence_wheel_timer_size_gauge.register_hook(
+            homeserver_instance_id=hs.get_instance_id(),
+            hook=lambda: {(self.server_name,): len(self.wheel_timer)},
         )

         # Used to handle sending of presence to newly joined users/servers

View File

@@ -164,11 +164,13 @@ def _get_in_flight_counts() -> Mapping[Tuple[str, ...], int]:
     return counts


-LaterGauge(
+in_flight_requests = LaterGauge(
     name="synapse_http_server_in_flight_requests_count",
     desc="",
     labelnames=["method", "servlet", SERVER_NAME_LABEL],
-    caller=_get_in_flight_counts,
+)
+in_flight_requests.register_hook(
+    homeserver_instance_id=None, hook=_get_in_flight_counts
 )

View File

@@ -73,8 +73,6 @@ logger = logging.getLogger(__name__)
 METRICS_PREFIX = "/_synapse/metrics"

-all_gauges: Dict[str, Collector] = {}
-
 HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")

 SERVER_NAME_LABEL = "server_name"
@@ -163,42 +161,110 @@ class LaterGauge(Collector):
     name: str
     desc: str
     labelnames: Optional[StrSequence] = attr.ib(hash=False)

-    # callback: should either return a value (if there are no labels for this metric),
-    # or dict mapping from a label tuple to a value
-    caller: Callable[
-        [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
-    ]
+    _instance_id_to_hook_map: Dict[
+        Optional[str],  # instance_id
+        Callable[
+            [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
+        ],
+    ] = attr.ib(factory=dict, hash=False)
+    """
+    Map from homeserver instance_id to a callback. Each callback should either return a
+    value (if there are no labels for this metric), or dict mapping from a label tuple
+    to a value.
+
+    We use `instance_id` instead of `server_name` because it's possible to have multiple
+    workers running in the same process with the same `server_name`.
+    """

     def collect(self) -> Iterable[Metric]:
         # The decision to add `SERVER_NAME_LABEL` is from the `LaterGauge` usage itself
         # (we don't enforce it here, one level up).
         g = GaugeMetricFamily(self.name, self.desc, labels=self.labelnames)  # type: ignore[missing-server-name-label]

-        try:
-            calls = self.caller()
-        except Exception:
-            logger.exception("Exception running callback for LaterGauge(%s)", self.name)
-            yield g
-            return
+        for homeserver_instance_id, hook in self._instance_id_to_hook_map.items():
+            try:
+                hook_result = hook()
+            except Exception:
+                logger.exception(
+                    "Exception running callback for LaterGauge(%s) for homeserver_instance_id=%s",
+                    self.name,
+                    homeserver_instance_id,
+                )
+                # Continue to return the rest of the metrics that aren't broken
+                continue

-        if isinstance(calls, (int, float)):
-            g.add_metric([], calls)
-        else:
-            for k, v in calls.items():
-                g.add_metric(k, v)
+            if isinstance(hook_result, (int, float)):
+                g.add_metric([], hook_result)
+            else:
+                for k, v in hook_result.items():
+                    g.add_metric(k, v)

         yield g

+    def register_hook(
+        self,
+        *,
+        homeserver_instance_id: Optional[str],
+        hook: Callable[
+            [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
+        ],
+    ) -> None:
+        """
+        Register a callback/hook that will be called to generate metric samples for
+        the gauge.
+
+        Args:
+            homeserver_instance_id: The unique ID for this Synapse process instance
+                (`hs.get_instance_id()`) that this hook is associated with. This can be
+                used later to look up all hooks associated with a given server name in
+                order to unregister them. This should only be omitted for global hooks
+                that work across all homeservers.
+            hook: A callback that should either return a value (if there are no
+                labels for this metric), or dict mapping from a label tuple to a value
+        """
+        # We shouldn't have multiple hooks registered for the same homeserver `instance_id`.
+        existing_hook = self._instance_id_to_hook_map.get(homeserver_instance_id)
+        assert existing_hook is None, (
+            f"LaterGauge(name={self.name}) hook already registered for homeserver_instance_id={homeserver_instance_id}. "
+            "This is likely a Synapse bug and you forgot to unregister the previous hooks for "
+            "the server (especially in tests)."
+        )
+
+        self._instance_id_to_hook_map[homeserver_instance_id] = hook
+
+    def unregister_hooks_for_homeserver_instance_id(
+        self, homeserver_instance_id: str
+    ) -> None:
+        """
+        Unregister all hooks associated with the given homeserver `instance_id`. This
+        should be called when a homeserver is shut down to avoid extra hooks sitting
+        around.
+
+        Args:
+            homeserver_instance_id: The unique ID for this Synapse process instance to
+                unregister hooks for (`hs.get_instance_id()`).
+        """
+        self._instance_id_to_hook_map.pop(homeserver_instance_id, None)
+
     def __attrs_post_init__(self) -> None:
-        self._register()
-
-    def _register(self) -> None:
-        if self.name in all_gauges.keys():
-            logger.warning("%s already registered, reregistering", self.name)
-            REGISTRY.unregister(all_gauges.pop(self.name))
-
         REGISTRY.register(self)
-        all_gauges[self.name] = self
+
+        # We shouldn't have multiple metrics with the same name. Typically, metrics
+        # should be created globally so you shouldn't be running into this and this will
+        # catch any stupid mistakes. The `REGISTRY.register(self)` call above will also
+        # raise an error if the metric already exists but to make things explicit, we'll
+        # also check here.
+        existing_gauge = all_later_gauges_to_clean_up_on_shutdown.get(self.name)
+        assert existing_gauge is None, f"LaterGauge(name={self.name}) already exists. "
+
+        # Keep track of the gauge so we can clean it up later.
+        all_later_gauges_to_clean_up_on_shutdown[self.name] = self
+
+
+all_later_gauges_to_clean_up_on_shutdown: Dict[str, LaterGauge] = {}
+"""
+Track all `LaterGauge` instances so we can remove any associated hooks during homeserver
+shutdown.
+"""


 # `MetricsEntry` only makes sense when it is a `Protocol`,
@@ -250,7 +316,7 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
         # Protects access to _registrations
         self._lock = threading.Lock()

-        self._register_with_collector()
+        REGISTRY.register(self)

     def register(
         self,
@@ -341,14 +407,6 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
             gauge.add_metric(labels=key, value=getattr(metrics, name))
             yield gauge

-    def _register_with_collector(self) -> None:
-        if self.name in all_gauges.keys():
-            logger.warning("%s already registered, reregistering", self.name)
-            REGISTRY.unregister(all_gauges.pop(self.name))
-
-        REGISTRY.register(self)
-        all_gauges[self.name] = self
-

 class GaugeHistogramMetricFamilyWithLabels(GaugeHistogramMetricFamily):
     """

View File

@@ -86,6 +86,24 @@ users_woken_by_stream_counter = Counter(
labelnames=["stream", SERVER_NAME_LABEL], labelnames=["stream", SERVER_NAME_LABEL],
) )
notifier_listeners_gauge = LaterGauge(
name="synapse_notifier_listeners",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
notifier_rooms_gauge = LaterGauge(
name="synapse_notifier_rooms",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
notifier_users_gauge = LaterGauge(
name="synapse_notifier_users",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
T = TypeVar("T") T = TypeVar("T")
@@ -281,28 +299,20 @@ class Notifier:
                 )
             }

-        LaterGauge(
-            name="synapse_notifier_listeners",
-            desc="",
-            labelnames=[SERVER_NAME_LABEL],
-            caller=count_listeners,
+        notifier_listeners_gauge.register_hook(
+            homeserver_instance_id=hs.get_instance_id(), hook=count_listeners
         )
-
-        LaterGauge(
-            name="synapse_notifier_rooms",
-            desc="",
-            labelnames=[SERVER_NAME_LABEL],
-            caller=lambda: {
+        notifier_rooms_gauge.register_hook(
+            homeserver_instance_id=hs.get_instance_id(),
+            hook=lambda: {
                 (self.server_name,): count(
                     bool, list(self.room_to_user_streams.values())
                 )
             },
         )
-        LaterGauge(
-            name="synapse_notifier_users",
-            desc="",
-            labelnames=[SERVER_NAME_LABEL],
-            caller=lambda: {(self.server_name,): len(self.user_to_user_stream)},
+        notifier_users_gauge.register_hook(
+            homeserver_instance_id=hs.get_instance_id(),
+            hook=lambda: {(self.server_name,): len(self.user_to_user_stream)},
         )

     def add_replication_callback(self, cb: Callable[[], None]) -> None:

View File

@@ -106,6 +106,18 @@ user_ip_cache_counter = Counter(
"synapse_replication_tcp_resource_user_ip_cache", "", labelnames=[SERVER_NAME_LABEL] "synapse_replication_tcp_resource_user_ip_cache", "", labelnames=[SERVER_NAME_LABEL]
) )
tcp_resource_total_connections_gauge = LaterGauge(
name="synapse_replication_tcp_resource_total_connections",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
tcp_command_queue_gauge = LaterGauge(
name="synapse_replication_tcp_command_queue",
desc="Number of inbound RDATA/POSITION commands queued for processing",
labelnames=["stream_name", SERVER_NAME_LABEL],
)
# the type of the entries in _command_queues_by_stream # the type of the entries in _command_queues_by_stream
_StreamCommandQueue = Deque[ _StreamCommandQueue = Deque[
@@ -243,11 +255,9 @@ class ReplicationCommandHandler:
         # outgoing replication commands to.)
         self._connections: List[IReplicationConnection] = []

-        LaterGauge(
-            name="synapse_replication_tcp_resource_total_connections",
-            desc="",
-            labelnames=[SERVER_NAME_LABEL],
-            caller=lambda: {(self.server_name,): len(self._connections)},
+        tcp_resource_total_connections_gauge.register_hook(
+            homeserver_instance_id=hs.get_instance_id(),
+            hook=lambda: {(self.server_name,): len(self._connections)},
         )

         # When POSITION or RDATA commands arrive, we stick them in a queue and process
@@ -266,11 +276,9 @@ class ReplicationCommandHandler:
         # from that connection.
         self._streams_by_connection: Dict[IReplicationConnection, Set[str]] = {}

-        LaterGauge(
-            name="synapse_replication_tcp_command_queue",
-            desc="Number of inbound RDATA/POSITION commands queued for processing",
-            labelnames=["stream_name", SERVER_NAME_LABEL],
-            caller=lambda: {
+        tcp_command_queue_gauge.register_hook(
+            homeserver_instance_id=hs.get_instance_id(),
+            hook=lambda: {
                 (stream_name, self.server_name): len(queue)
                 for stream_name, queue in self._command_queues_by_stream.items()
             },

View File

@@ -527,7 +527,10 @@ pending_commands = LaterGauge(
name="synapse_replication_tcp_protocol_pending_commands", name="synapse_replication_tcp_protocol_pending_commands",
desc="", desc="",
labelnames=["name", SERVER_NAME_LABEL], labelnames=["name", SERVER_NAME_LABEL],
caller=lambda: { )
pending_commands.register_hook(
homeserver_instance_id=None,
hook=lambda: {
(p.name, p.server_name): len(p.pending_commands) for p in connected_connections (p.name, p.server_name): len(p.pending_commands) for p in connected_connections
}, },
) )
@@ -544,7 +547,10 @@ transport_send_buffer = LaterGauge(
name="synapse_replication_tcp_protocol_transport_send_buffer", name="synapse_replication_tcp_protocol_transport_send_buffer",
desc="", desc="",
labelnames=["name", SERVER_NAME_LABEL], labelnames=["name", SERVER_NAME_LABEL],
caller=lambda: { )
transport_send_buffer.register_hook(
homeserver_instance_id=None,
hook=lambda: {
(p.name, p.server_name): transport_buffer_size(p) for p in connected_connections (p.name, p.server_name): transport_buffer_size(p) for p in connected_connections
}, },
) )
@@ -571,7 +577,10 @@ tcp_transport_kernel_send_buffer = LaterGauge(
name="synapse_replication_tcp_protocol_transport_kernel_send_buffer", name="synapse_replication_tcp_protocol_transport_kernel_send_buffer",
desc="", desc="",
labelnames=["name", SERVER_NAME_LABEL], labelnames=["name", SERVER_NAME_LABEL],
caller=lambda: { )
tcp_transport_kernel_send_buffer.register_hook(
homeserver_instance_id=None,
hook=lambda: {
(p.name, p.server_name): transport_kernel_read_buffer_size(p, False) (p.name, p.server_name): transport_kernel_read_buffer_size(p, False)
for p in connected_connections for p in connected_connections
}, },
@@ -582,7 +591,10 @@ tcp_transport_kernel_read_buffer = LaterGauge(
name="synapse_replication_tcp_protocol_transport_kernel_read_buffer", name="synapse_replication_tcp_protocol_transport_kernel_read_buffer",
desc="", desc="",
labelnames=["name", SERVER_NAME_LABEL], labelnames=["name", SERVER_NAME_LABEL],
caller=lambda: { )
tcp_transport_kernel_read_buffer.register_hook(
homeserver_instance_id=None,
hook=lambda: {
(p.name, p.server_name): transport_kernel_read_buffer_size(p, True) (p.name, p.server_name): transport_kernel_read_buffer_size(p, True)
for p in connected_connections for p in connected_connections
}, },

View File

@@ -129,7 +129,10 @@ from synapse.http.client import (
 )
 from synapse.http.matrixfederationclient import MatrixFederationHttpClient
 from synapse.media.media_repository import MediaRepository
-from synapse.metrics import register_threadpool
+from synapse.metrics import (
+    all_later_gauges_to_clean_up_on_shutdown,
+    register_threadpool,
+)
 from synapse.metrics.common_usage_metrics import CommonUsageMetricsManager
 from synapse.module_api import ModuleApi
 from synapse.module_api.callbacks import ModuleApiCallbacks
@@ -369,6 +372,37 @@ class HomeServer(metaclass=abc.ABCMeta):
         if self.config.worker.run_background_tasks:
             self.setup_background_tasks()

+    def __del__(self) -> None:
+        """
+        Called when the homeserver is garbage collected.
+
+        Make sure we actually do some clean-up, rather than leak data.
+        """
+        self.cleanup()
+
+    def cleanup(self) -> None:
+        """
+        WIP: Clean up any references to the homeserver and stop any running related
+        processes, timers, loops, replication stream, etc.
+
+        This should be called wherever you care about the HomeServer being completely
+        garbage collected, like in tests. It's not necessary to call if you plan to just
+        shut down the whole Python process anyway.
+
+        Can be called multiple times.
+        """
+        logger.info("Received cleanup request for %s.", self.hostname)
+
+        # TODO: Stop background processes, timers, loops, replication stream, etc.
+
+        # Clean up metrics associated with the homeserver
+        for later_gauge in all_later_gauges_to_clean_up_on_shutdown.values():
+            later_gauge.unregister_hooks_for_homeserver_instance_id(
+                self.get_instance_id()
+            )
+
+        logger.info("Cleanup complete for %s.", self.hostname)
+
     def start_listening(self) -> None:  # noqa: B027 (no-op by design)
         """Start the HTTP, manhole, metrics, etc listeners

View File

@@ -61,7 +61,7 @@ from synapse.logging.context import (
     current_context,
     make_deferred_yieldable,
 )
-from synapse.metrics import SERVER_NAME_LABEL, LaterGauge, register_threadpool
+from synapse.metrics import SERVER_NAME_LABEL, register_threadpool
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.background_updates import BackgroundUpdater
 from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
@@ -611,12 +611,6 @@ class DatabasePool:
         )
         self.updates = BackgroundUpdater(hs, self)

-        LaterGauge(
-            name="synapse_background_update_status",
-            desc="Background update status",
-            labelnames=[SERVER_NAME_LABEL],
-            caller=lambda: {(self.server_name,): self.updates.get_status()},
-        )
-
         self._previous_txn_total_time = 0.0
         self._current_txn_total_time = 0.0

View File

@@ -22,6 +22,7 @@
 import logging
 from typing import TYPE_CHECKING, Generic, List, Optional, Type, TypeVar

+from synapse.metrics import SERVER_NAME_LABEL, LaterGauge
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import DatabasePool, make_conn
 from synapse.storage.databases.main.events import PersistEventsStore
@@ -40,6 +41,13 @@ logger = logging.getLogger(__name__)
 DataStoreT = TypeVar("DataStoreT", bound=SQLBaseStore, covariant=True)

+background_update_status = LaterGauge(
+    name="synapse_background_update_status",
+    desc="Background update status",
+    labelnames=["database_name", SERVER_NAME_LABEL],
+)
+
+
 class Databases(Generic[DataStoreT]):
     """The various databases.
@@ -143,6 +151,15 @@ class Databases(Generic[DataStoreT]):
                 db_conn.close()

+        # Track the background update status for each database
+        background_update_status.register_hook(
+            homeserver_instance_id=hs.get_instance_id(),
+            hook=lambda: {
+                (database.name(), server_name): database.updates.get_status()
+                for database in self.databases
+            },
+        )
+
         # Sanity check that we have actually configured all the required stores.
         if not main:
             raise Exception("No 'main' database configured")

View File

@@ -84,6 +84,13 @@ _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME = "current_state_events_membership"
 _POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE = 1000

+federation_known_servers_gauge = LaterGauge(
+    name="synapse_federation_known_servers",
+    desc="",
+    labelnames=[SERVER_NAME_LABEL],
+)
+
+
 @attr.s(frozen=True, slots=True, auto_attribs=True)
 class EventIdMembership:
     """Returned by `get_membership_from_event_ids`"""
@@ -116,11 +123,9 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
                 1,
                 self._count_known_servers,
             )

-        LaterGauge(
-            name="synapse_federation_known_servers",
-            desc="",
-            labelnames=[SERVER_NAME_LABEL],
-            caller=lambda: {(self.server_name,): self._known_servers_count},
+        federation_known_servers_gauge.register_hook(
+            homeserver_instance_id=hs.get_instance_id(),
+            hook=lambda: {(self.server_name,): self._known_servers_count},
         )

     @wrap_as_background_process("_count_known_servers")

View File

@@ -131,22 +131,28 @@ def _get_counts_from_rate_limiter_instance(
 # We track the number of affected hosts per time-period so we can
 # differentiate one really noisy homeserver from a general
 # ratelimit tuning problem across the federation.
-LaterGauge(
+sleep_affected_hosts_gauge = LaterGauge(
     name="synapse_rate_limit_sleep_affected_hosts",
     desc="Number of hosts that had requests put to sleep",
     labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
-    caller=lambda: _get_counts_from_rate_limiter_instance(
+)
+sleep_affected_hosts_gauge.register_hook(
+    homeserver_instance_id=None,
+    hook=lambda: _get_counts_from_rate_limiter_instance(
         lambda rate_limiter_instance: sum(
             ratelimiter.should_sleep()
             for ratelimiter in rate_limiter_instance.ratelimiters.values()
         )
     ),
 )
-LaterGauge(
+reject_affected_hosts_gauge = LaterGauge(
     name="synapse_rate_limit_reject_affected_hosts",
     desc="Number of hosts that had requests rejected",
     labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
-    caller=lambda: _get_counts_from_rate_limiter_instance(
+)
+reject_affected_hosts_gauge.register_hook(
+    homeserver_instance_id=None,
+    hook=lambda: _get_counts_from_rate_limiter_instance(
         lambda rate_limiter_instance: sum(
             ratelimiter.should_reject()
             for ratelimiter in rate_limiter_instance.ratelimiters.values()

View File

@@ -44,6 +44,13 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)

+running_tasks_gauge = LaterGauge(
+    name="synapse_scheduler_running_tasks",
+    desc="The number of concurrent running tasks handled by the TaskScheduler",
+    labelnames=[SERVER_NAME_LABEL],
+)
+
+
 class TaskScheduler:
     """
     This is a simple task scheduler designed for resumable tasks. Normally,
@@ -130,11 +137,9 @@ class TaskScheduler:
                 TaskScheduler.SCHEDULE_INTERVAL_MS,
             )

-        LaterGauge(
-            name="synapse_scheduler_running_tasks",
-            desc="The number of concurrent running tasks handled by the TaskScheduler",
-            labelnames=[SERVER_NAME_LABEL],
-            caller=lambda: {(self.server_name,): len(self._running_tasks)},
+        running_tasks_gauge.register_hook(
+            homeserver_instance_id=hs.get_instance_id(),
+            hook=lambda: {(self.server_name,): len(self._running_tasks)},
         )

     def register_action(

View File

@@ -18,11 +18,18 @@
 # [This file includes modifications made by New Vector Limited]
 #
 #
-from typing import Dict, Protocol, Tuple
+from typing import Dict, NoReturn, Protocol, Tuple

 from prometheus_client.core import Sample

-from synapse.metrics import REGISTRY, InFlightGauge, generate_latest
+from synapse.metrics import (
+    REGISTRY,
+    SERVER_NAME_LABEL,
+    InFlightGauge,
+    LaterGauge,
+    all_later_gauges_to_clean_up_on_shutdown,
+    generate_latest,
+)
 from synapse.util.caches.deferred_cache import DeferredCache

 from tests import unittest
@@ -285,6 +292,95 @@ class CacheMetricsTests(unittest.HomeserverTestCase):
         self.assertEqual(hs2_cache_max_size_metric_value, "777.0")


+class LaterGaugeTests(unittest.HomeserverTestCase):
+    def setUp(self) -> None:
+        super().setUp()
+
+        self.later_gauge = LaterGauge(
+            name="foo",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+        )
+
+    def tearDown(self) -> None:
+        super().tearDown()
+
+        REGISTRY.unregister(self.later_gauge)
+        all_later_gauges_to_clean_up_on_shutdown.pop(self.later_gauge.name, None)
+
+    def test_later_gauge_multiple_servers(self) -> None:
+        """
+        Test that LaterGauge metrics are reported correctly across multiple servers. We
+        will have a metrics entry for each homeserver, labeled with the
+        `server_name` label.
+        """
+        self.later_gauge.register_hook(
+            homeserver_instance_id="123", hook=lambda: {("hs1",): 1}
+        )
+        self.later_gauge.register_hook(
+            homeserver_instance_id="456", hook=lambda: {("hs2",): 2}
+        )
+
+        metrics_map = get_latest_metrics()
+
+        # Find the metrics from both homeservers
+        hs1_metric = 'foo{server_name="hs1"}'
+        hs1_metric_value = metrics_map.get(hs1_metric)
+        self.assertIsNotNone(
+            hs1_metric_value,
+            f"Missing metric {hs1_metric} in metrics {metrics_map}",
+        )
+        self.assertEqual(hs1_metric_value, "1.0")
+
+        hs2_metric = 'foo{server_name="hs2"}'
+        hs2_metric_value = metrics_map.get(hs2_metric)
+        self.assertIsNotNone(
+            hs2_metric_value,
+            f"Missing metric {hs2_metric} in metrics {metrics_map}",
+        )
+        self.assertEqual(hs2_metric_value, "2.0")
+
+    def test_later_gauge_hook_exception(self) -> None:
+        """
+        Test that LaterGauge metrics are collected across multiple servers even if one
+        hook is throwing an exception.
+        """
+
+        def raise_exception() -> NoReturn:
+            raise Exception("fake error generating data")
+
+        # Make the hook for hs1 throw an exception
+        self.later_gauge.register_hook(
+            homeserver_instance_id="123", hook=raise_exception
+        )
+        # Metrics from hs2 still work fine
+        self.later_gauge.register_hook(
+            homeserver_instance_id="456", hook=lambda: {("hs2",): 2}
+        )
+
+        metrics_map = get_latest_metrics()
+
+        # Since we encountered an exception while trying to collect metrics from hs1, we
+        # don't expect to see it here.
+        hs1_metric = 'foo{server_name="hs1"}'
+        hs1_metric_value = metrics_map.get(hs1_metric)
+        self.assertIsNone(
+            hs1_metric_value,
+            (
+                "Since we encountered an exception while trying to collect metrics from hs1, "
+                f"we don't expect to see it in the metrics_map {metrics_map}"
+            ),
+        )
+
+        # We should still see metrics from hs2 though
+        hs2_metric = 'foo{server_name="hs2"}'
+        hs2_metric_value = metrics_map.get(hs2_metric)
+        self.assertIsNotNone(
+            hs2_metric_value,
+            f"Missing metric {hs2_metric} in cache metrics {metrics_map}",
+        )
+        self.assertEqual(hs2_metric_value, "2.0")
+
+
 def get_latest_metrics() -> Dict[str, str]:
     """
     Collect the latest metrics from the registry and parse them into an easy to use map.

View File

@@ -32,7 +32,6 @@ from synapse.config.workers import InstanceTcpLocationConfig, InstanceUnixLocati
 from synapse.http.site import SynapseRequest, SynapseSite
 from synapse.replication.http import ReplicationRestResource
 from synapse.replication.tcp.client import ReplicationDataHandler
-from synapse.replication.tcp.handler import ReplicationCommandHandler
 from synapse.replication.tcp.protocol import (
     ClientReplicationStreamProtocol,
     ServerReplicationStreamProtocol,
@@ -97,7 +96,7 @@ class BaseStreamTestCase(unittest.HomeserverTestCase):
         self.test_handler = self._build_replication_data_handler()
         self.worker_hs._replication_data_handler = self.test_handler  # type: ignore[attr-defined]

-        repl_handler = ReplicationCommandHandler(self.worker_hs)
+        repl_handler = self.worker_hs.get_replication_command_handler()
         self.client = ClientReplicationStreamProtocol(
             self.worker_hs,
             "client",

View File

@@ -1145,6 +1145,9 @@ def setup_test_homeserver(
         reactor=reactor,
     )

+    # Register the cleanup hook
+    cleanup_func(hs.cleanup)
+
     # Install @cache_in_self attributes
     for key, val in kwargs.items():
         setattr(hs, "_" + key, val)