diff --git a/changelog.d/18791.misc b/changelog.d/18791.misc
new file mode 100644
index 0000000000..6ecd498286
--- /dev/null
+++ b/changelog.d/18791.misc
@@ -0,0 +1 @@
+Fix `LaterGauge` metrics to collect from all servers.
diff --git a/synapse/_scripts/generate_workers_map.py b/synapse/_scripts/generate_workers_map.py
index 09feb8cf30..8878e364e2 100755
--- a/synapse/_scripts/generate_workers_map.py
+++ b/synapse/_scripts/generate_workers_map.py
@@ -153,9 +153,13 @@ def get_registered_paths_for_default(
     """
     hs = MockHomeserver(base_config, worker_app)
+    # TODO We only do this to avoid an error, but don't need the database etc
     hs.setup()
-    return get_registered_paths_for_hs(hs)
+    registered_paths = get_registered_paths_for_hs(hs)
+    hs.cleanup()
+
+    return registered_paths


 def elide_http_methods_if_unconflicting(
diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py
index 0f54cfc64a..a81db3cfbf 100755
--- a/synapse/_scripts/synapse_port_db.py
+++ b/synapse/_scripts/synapse_port_db.py
@@ -99,6 +99,7 @@ from synapse.storage.engines import create_engine
 from synapse.storage.prepare_database import prepare_database
 from synapse.types import ISynapseReactor
 from synapse.util import SYNAPSE_VERSION, Clock
+from synapse.util.stringutils import random_string

 # Cast safety: Twisted does some naughty magic which replaces the
 # twisted.internet.reactor module with a Reactor instance at runtime.
@@ -323,6 +324,7 @@ class MockHomeserver:
         self.config = config
         self.hostname = config.server.server_name
         self.version_string = SYNAPSE_VERSION
+        self.instance_id = random_string(5)

     def get_clock(self) -> Clock:
         return self.clock
@@ -330,6 +332,9 @@ class MockHomeserver:
     def get_reactor(self) -> ISynapseReactor:
         return reactor

+    def get_instance_id(self) -> str:
+        return self.instance_id
+
     def get_instance_name(self) -> str:
         return "master"

@@ -685,7 +690,15 @@ class Porter:
             )
             prepare_database(db_conn, engine, config=self.hs_config)
             # Type safety: ignore that we're using Mock homeservers here.
-            store = Store(DatabasePool(hs, db_config, engine), db_conn, hs)  # type: ignore[arg-type]
+            store = Store(
+                DatabasePool(
+                    hs,  # type: ignore[arg-type]
+                    db_config,
+                    engine,
+                ),
+                db_conn,
+                hs,  # type: ignore[arg-type]
+            )
             db_conn.commit()

             return store
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 7f511d570c..2fdee9ac54 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -37,6 +37,7 @@ Events are replicated via a separate events stream.
 """

 import logging
+from enum import Enum
 from typing import (
     TYPE_CHECKING,
     Dict,
@@ -67,6 +68,25 @@ if TYPE_CHECKING:

 logger = logging.getLogger(__name__)


+class QueueNames(str, Enum):
+    PRESENCE_MAP = "presence_map"
+    KEYED_EDU = "keyed_edu"
+    KEYED_EDU_CHANGED = "keyed_edu_changed"
+    EDUS = "edus"
+    POS_TIME = "pos_time"
+    PRESENCE_DESTINATIONS = "presence_destinations"
+
+
+queue_name_to_gauge_map: Dict[QueueNames, LaterGauge] = {}
+
+for queue_name in QueueNames:
+    queue_name_to_gauge_map[queue_name] = LaterGauge(
+        name=f"synapse_federation_send_queue_{queue_name.value}_size",
+        desc="",
+        labelnames=[SERVER_NAME_LABEL],
+    )
+
+
 class FederationRemoteSendQueue(AbstractFederationSender):
     """A drop in replacement for FederationSender"""

@@ -111,23 +131,16 @@ class FederationRemoteSendQueue(AbstractFederationSender):
         # we make a new function, so we need to make a new function so the inner
         # lambda binds to the queue rather than to the name of the queue which
         # changes. ARGH.
-        def register(name: str, queue: Sized) -> None:
-            LaterGauge(
-                name="synapse_federation_send_queue_%s_size" % (queue_name,),
-                desc="",
-                labelnames=[SERVER_NAME_LABEL],
-                caller=lambda: {(self.server_name,): len(queue)},
+        def register(queue_name: QueueNames, queue: Sized) -> None:
+            queue_name_to_gauge_map[queue_name].register_hook(
+                homeserver_instance_id=hs.get_instance_id(),
+                hook=lambda: {(self.server_name,): len(queue)},
             )

-        for queue_name in [
-            "presence_map",
-            "keyed_edu",
-            "keyed_edu_changed",
-            "edus",
-            "pos_time",
-            "presence_destinations",
-        ]:
-            register(queue_name, getattr(self, queue_name))
+        for queue_name in QueueNames:
+            queue = getattr(self, queue_name.value)
+            assert isinstance(queue, Sized)
+            register(queue_name, queue=queue)

         self.clock.looping_call(self._clear_queue, 30 * 1000)
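The `QueueNames` enum above does double duty: it names the per-queue gauges, and, because it is a `str` enum, `queue_name.value` is also the attribute name of the matching queue on the instance. Below is a minimal standalone sketch of that pattern, using a plain `prometheus_client` `Gauge` as a stand-in for `LaterGauge`; the `FakeSendQueue` class and its attributes are illustrative only.

from enum import Enum
from typing import Dict

from prometheus_client import Gauge


class QueueNames(str, Enum):
    PRESENCE_MAP = "presence_map"
    EDUS = "edus"


# One gauge per queue, created once at module level so building a second
# homeserver in the same process does not try to re-register the metric names.
queue_gauges: Dict[QueueNames, Gauge] = {
    queue_name: Gauge(f"example_send_queue_{queue_name.value}_size", "size of the queue")
    for queue_name in QueueNames
}


class FakeSendQueue:
    def __init__(self) -> None:
        self.presence_map = {"@alice:example.com": object()}
        self.edus = ["edu1", "edu2"]

    def report(self) -> None:
        # `queue_name.value` matches the attribute name, so getattr() finds the
        # right container for each gauge.
        for queue_name, gauge in queue_gauges.items():
            gauge.set(len(getattr(self, queue_name.value)))


FakeSendQueue().report()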
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 8befbe3722..278a957331 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -199,6 +199,24 @@ sent_pdus_destination_dist_total = Counter(
     labelnames=[SERVER_NAME_LABEL],
 )

+transaction_queue_pending_destinations_gauge = LaterGauge(
+    name="synapse_federation_transaction_queue_pending_destinations",
+    desc="",
+    labelnames=[SERVER_NAME_LABEL],
+)
+
+transaction_queue_pending_pdus_gauge = LaterGauge(
+    name="synapse_federation_transaction_queue_pending_pdus",
+    desc="",
+    labelnames=[SERVER_NAME_LABEL],
+)
+
+transaction_queue_pending_edus_gauge = LaterGauge(
+    name="synapse_federation_transaction_queue_pending_edus",
+    desc="",
+    labelnames=[SERVER_NAME_LABEL],
+)
+
 # Time (in s) to wait before trying to wake up destinations that have
 # catch-up outstanding.
 # Please note that rate limiting still applies, so while the loop is
@@ -398,11 +416,9 @@ class FederationSender(AbstractFederationSender):
         # map from destination to PerDestinationQueue
         self._per_destination_queues: Dict[str, PerDestinationQueue] = {}

-        LaterGauge(
-            name="synapse_federation_transaction_queue_pending_destinations",
-            desc="",
-            labelnames=[SERVER_NAME_LABEL],
-            caller=lambda: {
+        transaction_queue_pending_destinations_gauge.register_hook(
+            homeserver_instance_id=hs.get_instance_id(),
+            hook=lambda: {
                 (self.server_name,): sum(
                     1
                     for d in self._per_destination_queues.values()
@@ -410,22 +426,17 @@ class FederationSender(AbstractFederationSender):
                 )
             },
         )
-
-        LaterGauge(
-            name="synapse_federation_transaction_queue_pending_pdus",
-            desc="",
-            labelnames=[SERVER_NAME_LABEL],
-            caller=lambda: {
+        transaction_queue_pending_pdus_gauge.register_hook(
+            homeserver_instance_id=hs.get_instance_id(),
+            hook=lambda: {
                 (self.server_name,): sum(
                     d.pending_pdu_count() for d in self._per_destination_queues.values()
                 )
             },
         )
-        LaterGauge(
-            name="synapse_federation_transaction_queue_pending_edus",
-            desc="",
-            labelnames=[SERVER_NAME_LABEL],
-            caller=lambda: {
+        transaction_queue_pending_edus_gauge.register_hook(
+            homeserver_instance_id=hs.get_instance_id(),
+            hook=lambda: {
                 (self.server_name,): sum(
                     d.pending_edu_count() for d in self._per_destination_queues.values()
                 )
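Note that the hooks above are lambdas closing over `self._per_destination_queues` rather than values computed up front: the callback only runs at scrape time, so each scrape sees the queues as they are right now. A standalone sketch of that behaviour, with a hypothetical `PendingQueue` class and illustrative destination names:

from typing import Dict, Mapping, Tuple


class PendingQueue:
    def __init__(self) -> None:
        self.pdus = ["pdu"]

    def pending_pdu_count(self) -> int:
        return len(self.pdus)


per_destination_queues: Dict[str, PendingQueue] = {"remote1.example.com": PendingQueue()}
server_name = "hs.example.com"


def pending_pdus_hook() -> Mapping[Tuple[str, ...], int]:
    # Evaluated lazily at collection time, so it always reflects the current queues.
    return {
        (server_name,): sum(
            q.pending_pdu_count() for q in per_destination_queues.values()
        )
    }


print(pending_pdus_hook())  # {('hs.example.com',): 1}
per_destination_queues["remote2.example.com"] = PendingQueue()
print(pending_pdus_hook())  # {('hs.example.com',): 2}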
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index b253117498..d7de20f884 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -173,6 +173,18 @@ state_transition_counter = Counter(
     labelnames=["locality", "from", "to", SERVER_NAME_LABEL],
 )

+presence_user_to_current_state_size_gauge = LaterGauge(
+    name="synapse_handlers_presence_user_to_current_state_size",
+    desc="",
+    labelnames=[SERVER_NAME_LABEL],
+)
+
+presence_wheel_timer_size_gauge = LaterGauge(
+    name="synapse_handlers_presence_wheel_timer_size",
+    desc="",
+    labelnames=[SERVER_NAME_LABEL],
+)
+
 # If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
 # "currently_active"
 LAST_ACTIVE_GRANULARITY = 60 * 1000
@@ -779,11 +791,9 @@ class PresenceHandler(BasePresenceHandler):
             EduTypes.PRESENCE, self.incoming_presence
         )

-        LaterGauge(
-            name="synapse_handlers_presence_user_to_current_state_size",
-            desc="",
-            labelnames=[SERVER_NAME_LABEL],
-            caller=lambda: {(self.server_name,): len(self.user_to_current_state)},
+        presence_user_to_current_state_size_gauge.register_hook(
+            homeserver_instance_id=hs.get_instance_id(),
+            hook=lambda: {(self.server_name,): len(self.user_to_current_state)},
         )

         # The per-device presence state, maps user to devices to per-device presence state.
@@ -882,11 +892,9 @@ class PresenceHandler(BasePresenceHandler):
             60 * 1000,
         )

-        LaterGauge(
-            name="synapse_handlers_presence_wheel_timer_size",
-            desc="",
-            labelnames=[SERVER_NAME_LABEL],
-            caller=lambda: {(self.server_name,): len(self.wheel_timer)},
+        presence_wheel_timer_size_gauge.register_hook(
+            homeserver_instance_id=hs.get_instance_id(),
+            hook=lambda: {(self.server_name,): len(self.wheel_timer)},
         )

         # Used to handle sending of presence to newly joined users/servers
diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py
index a9b049f904..83f52edb7c 100644
--- a/synapse/http/request_metrics.py
+++ b/synapse/http/request_metrics.py
@@ -164,11 +164,13 @@ def _get_in_flight_counts() -> Mapping[Tuple[str, ...], int]:
     return counts


-LaterGauge(
+in_flight_requests = LaterGauge(
     name="synapse_http_server_in_flight_requests_count",
     desc="",
     labelnames=["method", "servlet", SERVER_NAME_LABEL],
-    caller=_get_in_flight_counts,
+)
+in_flight_requests.register_hook(
+    homeserver_instance_id=None, hook=_get_in_flight_counts
 )
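`_get_in_flight_counts` walks module-level state that already covers every homeserver in the process, which is why the hook is registered once with `homeserver_instance_id=None` and simply returns one sample per `(method, servlet, server_name)` combination, unlike the per-instance registrations in the handlers above. A standalone sketch of that shape (the request data is illustrative only):

from collections import Counter
from typing import List, Mapping, Tuple

# Stand-in for the module-level set of in-flight requests; each entry is
# (method, servlet, server_name).
_in_flight_requests: List[Tuple[str, ...]] = [
    ("GET", "SyncRestServlet", "hs1.test"),
    ("GET", "SyncRestServlet", "hs1.test"),
    ("PUT", "RoomSendEventRestServlet", "hs2.test"),
]


def get_in_flight_counts() -> Mapping[Tuple[str, ...], int]:
    # One sample per label tuple; both homeservers show up from a single hook,
    # which is why no instance id is needed when registering it.
    return Counter(_in_flight_requests)


print(get_in_flight_counts())
# Counter({('GET', 'SyncRestServlet', 'hs1.test'): 2,
#          ('PUT', 'RoomSendEventRestServlet', 'hs2.test'): 1})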
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 11e2551a16..5b291aa893 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -73,8 +73,6 @@ logger = logging.getLogger(__name__)

 METRICS_PREFIX = "/_synapse/metrics"

-all_gauges: Dict[str, Collector] = {}
-
 HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")

 SERVER_NAME_LABEL = "server_name"
@@ -163,42 +161,110 @@ class LaterGauge(Collector):
     name: str
     desc: str
     labelnames: Optional[StrSequence] = attr.ib(hash=False)
-    # callback: should either return a value (if there are no labels for this metric),
-    # or dict mapping from a label tuple to a value
-    caller: Callable[
-        [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
-    ]
+    _instance_id_to_hook_map: Dict[
+        Optional[str],  # instance_id
+        Callable[
+            [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
+        ],
+    ] = attr.ib(factory=dict, hash=False)
+    """
+    Map from homeserver instance_id to a callback. Each callback should either return a
+    value (if there are no labels for this metric), or a dict mapping from a label tuple
+    to a value.
+
+    We use `instance_id` instead of `server_name` because it's possible to have multiple
+    workers running in the same process with the same `server_name`.
+    """

     def collect(self) -> Iterable[Metric]:
         # The decision to add `SERVER_NAME_LABEL` is from the `LaterGauge` usage itself
         # (we don't enforce it here, one level up).
         g = GaugeMetricFamily(self.name, self.desc, labels=self.labelnames)  # type: ignore[missing-server-name-label]

-        try:
-            calls = self.caller()
-        except Exception:
-            logger.exception("Exception running callback for LaterGauge(%s)", self.name)
-            yield g
-            return
+        for homeserver_instance_id, hook in self._instance_id_to_hook_map.items():
+            try:
+                hook_result = hook()
+            except Exception:
+                logger.exception(
+                    "Exception running callback for LaterGauge(%s) for homeserver_instance_id=%s",
+                    self.name,
+                    homeserver_instance_id,
+                )
+                # Continue to return the rest of the metrics that aren't broken
+                continue

-        if isinstance(calls, (int, float)):
-            g.add_metric([], calls)
-        else:
-            for k, v in calls.items():
-                g.add_metric(k, v)
+            if isinstance(hook_result, (int, float)):
+                g.add_metric([], hook_result)
+            else:
+                for k, v in hook_result.items():
+                    g.add_metric(k, v)

         yield g

+    def register_hook(
+        self,
+        *,
+        homeserver_instance_id: Optional[str],
+        hook: Callable[
+            [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
+        ],
+    ) -> None:
+        """
+        Register a callback/hook that will be called to generate metric samples for
+        the gauge.
+
+        Args:
+            homeserver_instance_id: The unique ID for this Synapse process instance
+                (`hs.get_instance_id()`) that this hook is associated with. This can be used
+                later to look up all hooks associated with a given homeserver instance in
+                order to unregister them. This should only be omitted for global hooks that
+                work across all homeservers.
+            hook: A callback that should either return a value (if there are no
+                labels for this metric), or a dict mapping from a label tuple to a value.
+        """
+        # We shouldn't have multiple hooks registered for the same homeserver `instance_id`.
+        existing_hook = self._instance_id_to_hook_map.get(homeserver_instance_id)
+        assert existing_hook is None, (
+            f"LaterGauge(name={self.name}) hook already registered for homeserver_instance_id={homeserver_instance_id}. "
+            "This is likely a Synapse bug where the previous hooks for the server were not "
+            "unregistered (especially in tests)."
+        )
+
+        self._instance_id_to_hook_map[homeserver_instance_id] = hook
+
+    def unregister_hooks_for_homeserver_instance_id(
+        self, homeserver_instance_id: str
+    ) -> None:
+        """
+        Unregister all hooks associated with the given homeserver `instance_id`. This should
+        be called when a homeserver is shut down to avoid stale hooks sticking around.
+
+        Args:
+            homeserver_instance_id: The unique ID for this Synapse process instance to
+                unregister hooks for (`hs.get_instance_id()`).
+        """
+        self._instance_id_to_hook_map.pop(homeserver_instance_id, None)
+
     def __attrs_post_init__(self) -> None:
-        self._register()
-
-    def _register(self) -> None:
-        if self.name in all_gauges.keys():
-            logger.warning("%s already registered, reregistering", self.name)
-            REGISTRY.unregister(all_gauges.pop(self.name))
-        REGISTRY.register(self)
-        all_gauges[self.name] = self
+        REGISTRY.register(self)
+
+        # We shouldn't have multiple metrics with the same name. Typically, metrics
+        # should be created globally, so you shouldn't run into this; this check will
+        # catch any accidental duplicates. The `REGISTRY.register(self)` call above will
+        # also raise an error if the metric already exists, but to make things explicit
+        # we also check here.
+        existing_gauge = all_later_gauges_to_clean_up_on_shutdown.get(self.name)
+        assert existing_gauge is None, f"LaterGauge(name={self.name}) already exists."
+
+        # Keep track of the gauge so we can clean it up later.
+        all_later_gauges_to_clean_up_on_shutdown[self.name] = self
+
+
+all_later_gauges_to_clean_up_on_shutdown: Dict[str, LaterGauge] = {}
+"""
+Track all `LaterGauge` instances so we can remove any associated hooks during homeserver
+shutdown.
+"""


 # `MetricsEntry` only makes sense when it is a `Protocol`,
@@ -250,7 +316,7 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
         # Protects access to _registrations
         self._lock = threading.Lock()

-        self._register_with_collector()
+        REGISTRY.register(self)

     def register(
         self,
@@ -341,14 +407,6 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
                 gauge.add_metric(labels=key, value=getattr(metrics, name))
             yield gauge

-    def _register_with_collector(self) -> None:
-        if self.name in all_gauges.keys():
-            logger.warning("%s already registered, reregistering", self.name)
-            REGISTRY.unregister(all_gauges.pop(self.name))
-
-        REGISTRY.register(self)
-        all_gauges[self.name] = self
-

 class GaugeHistogramMetricFamilyWithLabels(GaugeHistogramMetricFamily):
     """
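The new hook lifecycle can be exercised end to end; the sketch below assumes the patched `synapse.metrics` module above is importable, and the gauge name, instance ids, server names and values are illustrative only.

from synapse.metrics import SERVER_NAME_LABEL, LaterGauge

# Created once at module import time, like the gauges elsewhere in this change.
example_items_gauge = LaterGauge(
    name="synapse_example_items",
    desc="Number of example items tracked per homeserver",
    labelnames=[SERVER_NAME_LABEL],
)

# Each homeserver registers its own hook, keyed by hs.get_instance_id().
example_items_gauge.register_hook(
    homeserver_instance_id="instance-1", hook=lambda: {("hs1.test",): 3}
)
example_items_gauge.register_hook(
    homeserver_instance_id="instance-2", hook=lambda: {("hs2.test",): 5}
)

# A scrape produces a single metric family with one sample per homeserver.
(family,) = example_items_gauge.collect()
print([(sample.labels, sample.value) for sample in family.samples])

# On shutdown each homeserver removes only its own hook; unregistering uses
# pop(..., None), so calling it again (e.g. from both cleanup() and __del__) is safe.
example_items_gauge.unregister_hooks_for_homeserver_instance_id("instance-1")
example_items_gauge.unregister_hooks_for_homeserver_instance_id("instance-1")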
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 448a715e2a..7782c9ca65 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -86,6 +86,24 @@ users_woken_by_stream_counter = Counter(
     labelnames=["stream", SERVER_NAME_LABEL],
 )

+
+notifier_listeners_gauge = LaterGauge(
+    name="synapse_notifier_listeners",
+    desc="",
+    labelnames=[SERVER_NAME_LABEL],
+)
+
+notifier_rooms_gauge = LaterGauge(
+    name="synapse_notifier_rooms",
+    desc="",
+    labelnames=[SERVER_NAME_LABEL],
+)
+notifier_users_gauge = LaterGauge(
+    name="synapse_notifier_users",
+    desc="",
+    labelnames=[SERVER_NAME_LABEL],
+)
+
 T = TypeVar("T")

@@ -281,28 +299,20 @@ class Notifier:
             )
         }

-        LaterGauge(
-            name="synapse_notifier_listeners",
-            desc="",
-            labelnames=[SERVER_NAME_LABEL],
-            caller=count_listeners,
+        notifier_listeners_gauge.register_hook(
+            homeserver_instance_id=hs.get_instance_id(), hook=count_listeners
         )
-
-        LaterGauge(
-            name="synapse_notifier_rooms",
-            desc="",
-            labelnames=[SERVER_NAME_LABEL],
-            caller=lambda: {
+        notifier_rooms_gauge.register_hook(
+            homeserver_instance_id=hs.get_instance_id(),
+            hook=lambda: {
                 (self.server_name,): count(
                     bool, list(self.room_to_user_streams.values())
                 )
             },
         )
-        LaterGauge(
-            name="synapse_notifier_users",
-            desc="",
-            labelnames=[SERVER_NAME_LABEL],
-            caller=lambda: {(self.server_name,): len(self.user_to_user_stream)},
+        notifier_users_gauge.register_hook(
+            homeserver_instance_id=hs.get_instance_id(),
+            hook=lambda: {(self.server_name,): len(self.user_to_user_stream)},
         )

     def add_replication_callback(self, cb: Callable[[], None]) -> None:
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 0f14c7e380..dd7e38dd78 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -106,6 +106,18 @@ user_ip_cache_counter = Counter(
     "synapse_replication_tcp_resource_user_ip_cache", "", labelnames=[SERVER_NAME_LABEL]
 )

+tcp_resource_total_connections_gauge = LaterGauge(
+    name="synapse_replication_tcp_resource_total_connections",
+    desc="",
+    labelnames=[SERVER_NAME_LABEL],
+)
+
+tcp_command_queue_gauge = LaterGauge(
+    name="synapse_replication_tcp_command_queue",
+    desc="Number of inbound RDATA/POSITION commands queued for processing",
+    labelnames=["stream_name", SERVER_NAME_LABEL],
+)
+
 # the type of the entries in _command_queues_by_stream
 _StreamCommandQueue = Deque[
@@ -243,11 +255,9 @@ class ReplicationCommandHandler:
         # outgoing replication commands to.)
         self._connections: List[IReplicationConnection] = []

-        LaterGauge(
-            name="synapse_replication_tcp_resource_total_connections",
-            desc="",
-            labelnames=[SERVER_NAME_LABEL],
-            caller=lambda: {(self.server_name,): len(self._connections)},
+        tcp_resource_total_connections_gauge.register_hook(
+            homeserver_instance_id=hs.get_instance_id(),
+            hook=lambda: {(self.server_name,): len(self._connections)},
         )

         # When POSITION or RDATA commands arrive, we stick them in a queue and process
@@ -266,11 +276,9 @@ class ReplicationCommandHandler:
         # from that connection.
         self._streams_by_connection: Dict[IReplicationConnection, Set[str]] = {}

-        LaterGauge(
-            name="synapse_replication_tcp_command_queue",
-            desc="Number of inbound RDATA/POSITION commands queued for processing",
-            labelnames=["stream_name", SERVER_NAME_LABEL],
-            caller=lambda: {
+        tcp_command_queue_gauge.register_hook(
+            homeserver_instance_id=hs.get_instance_id(),
+            hook=lambda: {
                 (stream_name, self.server_name): len(queue)
                 for stream_name, queue in self._command_queues_by_stream.items()
             },
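For a gauge with more than one label, such as `synapse_replication_tcp_command_queue` above, the hook returns a dict whose keys are label-value tuples in the same order as `labelnames` (here `stream_name` first, then the server name). A standalone sketch with illustrative queue contents:

from collections import deque
from typing import Deque, Dict, Mapping, Tuple

command_queues_by_stream: Dict[str, Deque[str]] = {
    "events": deque(["RDATA", "RDATA"]),
    "typing": deque(["POSITION"]),
}
server_name = "hs1.example.com"


def command_queue_hook() -> Mapping[Tuple[str, ...], int]:
    # One sample per stream; the tuple order must match the gauge's labelnames,
    # i.e. ("stream_name", SERVER_NAME_LABEL).
    return {
        (stream_name, server_name): len(queue)
        for stream_name, queue in command_queues_by_stream.items()
    }


print(command_queue_hook())
# {('events', 'hs1.example.com'): 2, ('typing', 'hs1.example.com'): 1}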
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 969f0303e0..2ec25bf43d 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -527,7 +527,10 @@ pending_commands = LaterGauge(
     name="synapse_replication_tcp_protocol_pending_commands",
     desc="",
     labelnames=["name", SERVER_NAME_LABEL],
-    caller=lambda: {
+)
+pending_commands.register_hook(
+    homeserver_instance_id=None,
+    hook=lambda: {
         (p.name, p.server_name): len(p.pending_commands) for p in connected_connections
     },
 )
@@ -544,7 +547,10 @@ transport_send_buffer = LaterGauge(
     name="synapse_replication_tcp_protocol_transport_send_buffer",
     desc="",
     labelnames=["name", SERVER_NAME_LABEL],
-    caller=lambda: {
+)
+transport_send_buffer.register_hook(
+    homeserver_instance_id=None,
+    hook=lambda: {
         (p.name, p.server_name): transport_buffer_size(p) for p in connected_connections
     },
 )
@@ -571,7 +577,10 @@ tcp_transport_kernel_send_buffer = LaterGauge(
     name="synapse_replication_tcp_protocol_transport_kernel_send_buffer",
     desc="",
     labelnames=["name", SERVER_NAME_LABEL],
-    caller=lambda: {
+)
+tcp_transport_kernel_send_buffer.register_hook(
+    homeserver_instance_id=None,
+    hook=lambda: {
         (p.name, p.server_name): transport_kernel_read_buffer_size(p, False)
         for p in connected_connections
     },
@@ -582,7 +591,10 @@ tcp_transport_kernel_read_buffer = LaterGauge(
     name="synapse_replication_tcp_protocol_transport_kernel_read_buffer",
     desc="",
     labelnames=["name", SERVER_NAME_LABEL],
-    caller=lambda: {
+)
+tcp_transport_kernel_read_buffer.register_hook(
+    homeserver_instance_id=None,
+    hook=lambda: {
         (p.name, p.server_name): transport_kernel_read_buffer_size(p, True)
         for p in connected_connections
     },
diff --git a/synapse/server.py b/synapse/server.py
index bf82f79bec..3eac271c90 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -129,7 +129,10 @@ from synapse.http.client import (
 )
 from synapse.http.matrixfederationclient import MatrixFederationHttpClient
 from synapse.media.media_repository import MediaRepository
-from synapse.metrics import register_threadpool
+from synapse.metrics import (
+    all_later_gauges_to_clean_up_on_shutdown,
+    register_threadpool,
+)
 from synapse.metrics.common_usage_metrics import CommonUsageMetricsManager
 from synapse.module_api import ModuleApi
 from synapse.module_api.callbacks import ModuleApiCallbacks
@@ -369,6 +372,37 @@ class HomeServer(metaclass=abc.ABCMeta):
         if self.config.worker.run_background_tasks:
             self.setup_background_tasks()

+    def __del__(self) -> None:
+        """
+        Called when the homeserver is garbage collected.
+
+        Make sure we actually do some clean-up, rather than leak data.
+        """
+        self.cleanup()
+
+    def cleanup(self) -> None:
+        """
+        WIP: Clean up any references to the homeserver and stop any related running
+        processes, timers, loops, replication streams, etc.
+
+        This should be called wherever you care about the HomeServer being completely
+        garbage collected, such as in tests. It's not necessary to call this if you plan
+        to just shut down the whole Python process anyway.
+
+        Can be called multiple times.
+        """
+        logger.info("Received cleanup request for %s.", self.hostname)
+
+        # TODO: Stop background processes, timers, loops, replication stream, etc.
+
+        # Clean up metrics associated with the homeserver
+        for later_gauge in all_later_gauges_to_clean_up_on_shutdown.values():
+            later_gauge.unregister_hooks_for_homeserver_instance_id(
+                self.get_instance_id()
+            )
+
+        logger.info("Cleanup complete for %s.", self.hostname)
+
     def start_listening(self) -> None:  # noqa: B027 (no-op by design)
         """Start the HTTP, manhole, metrics, etc listeners
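`cleanup()` is wired into `__del__` as a backstop, but garbage collection is not deterministic, so callers that care (tests in particular) should invoke it explicitly; because hook removal uses `pop(..., None)`, doing both is harmless. A standalone sketch of that contract, with an illustrative `FakeResource` standing in for the homeserver:

import gc


class FakeResource:
    cleanup_count = 0

    def __init__(self) -> None:
        self._cleaned_up = False

    def cleanup(self) -> None:
        # Idempotent, mirroring HomeServer.cleanup(): calling it twice is fine.
        if not self._cleaned_up:
            self._cleaned_up = True
            FakeResource.cleanup_count += 1

    def __del__(self) -> None:
        # Backstop only; tests should not rely on this running promptly.
        self.cleanup()


resource = FakeResource()
resource.cleanup()  # explicit and deterministic, as tests/server.py now does
del resource
gc.collect()  # __del__ may fire here too, but cleanup() already ran

assert FakeResource.cleanup_count == 1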
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index f7aec16c96..cfec36e0fa 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -61,7 +61,7 @@ from synapse.logging.context import (
     current_context,
     make_deferred_yieldable,
 )
-from synapse.metrics import SERVER_NAME_LABEL, LaterGauge, register_threadpool
+from synapse.metrics import SERVER_NAME_LABEL, register_threadpool
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.background_updates import BackgroundUpdater
 from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
@@ -611,12 +611,6 @@ class DatabasePool:
         )

         self.updates = BackgroundUpdater(hs, self)
-        LaterGauge(
-            name="synapse_background_update_status",
-            desc="Background update status",
-            labelnames=[SERVER_NAME_LABEL],
-            caller=lambda: {(self.server_name,): self.updates.get_status()},
-        )

         self._previous_txn_total_time = 0.0
         self._current_txn_total_time = 0.0
diff --git a/synapse/storage/databases/__init__.py b/synapse/storage/databases/__init__.py
index 6442ab6c7a..a4aba96686 100644
--- a/synapse/storage/databases/__init__.py
+++ b/synapse/storage/databases/__init__.py
@@ -22,6 +22,7 @@ import logging

 from typing import TYPE_CHECKING, Generic, List, Optional, Type, TypeVar

+from synapse.metrics import SERVER_NAME_LABEL, LaterGauge
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import DatabasePool, make_conn
 from synapse.storage.databases.main.events import PersistEventsStore
@@ -40,6 +41,13 @@ logger = logging.getLogger(__name__)

 DataStoreT = TypeVar("DataStoreT", bound=SQLBaseStore, covariant=True)

+background_update_status = LaterGauge(
+    name="synapse_background_update_status",
+    desc="Background update status",
+    labelnames=["database_name", SERVER_NAME_LABEL],
+)
+
+
 class Databases(Generic[DataStoreT]):
     """The various databases.

@@ -143,6 +151,15 @@ class Databases(Generic[DataStoreT]):

             db_conn.close()

+        # Track the background update status for each database
+        background_update_status.register_hook(
+            homeserver_instance_id=hs.get_instance_id(),
+            hook=lambda: {
+                (database.name(), server_name): database.updates.get_status()
+                for database in self.databases
+            },
+        )
+
         # Sanity check that we have actually configured all the required stores.
         if not main:
             raise Exception("No 'main' database configured")
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 67e7e99baa..9db2e14a06 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -84,6 +84,13 @@ _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME = "current_state_events_membership"

 _POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE = 1000

+federation_known_servers_gauge = LaterGauge(
+    name="synapse_federation_known_servers",
+    desc="",
+    labelnames=[SERVER_NAME_LABEL],
+)
+
+
 @attr.s(frozen=True, slots=True, auto_attribs=True)
 class EventIdMembership:
     """Returned by `get_membership_from_event_ids`"""
@@ -116,11 +123,9 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
             1,
             self._count_known_servers,
         )
-        LaterGauge(
-            name="synapse_federation_known_servers",
-            desc="",
-            labelnames=[SERVER_NAME_LABEL],
-            caller=lambda: {(self.server_name,): self._known_servers_count},
+        federation_known_servers_gauge.register_hook(
+            homeserver_instance_id=hs.get_instance_id(),
+            hook=lambda: {(self.server_name,): self._known_servers_count},
         )

     @wrap_as_background_process("_count_known_servers")
diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py
index f5e592d80e..88edc07161 100644
--- a/synapse/util/ratelimitutils.py
+++ b/synapse/util/ratelimitutils.py
@@ -131,22 +131,28 @@ def _get_counts_from_rate_limiter_instance(
 # We track the number of affected hosts per time-period so we can
 # differentiate one really noisy homeserver from a general
 # ratelimit tuning problem across the federation.
-LaterGauge(
+sleep_affected_hosts_gauge = LaterGauge(
     name="synapse_rate_limit_sleep_affected_hosts",
     desc="Number of hosts that had requests put to sleep",
     labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
-    caller=lambda: _get_counts_from_rate_limiter_instance(
+)
+sleep_affected_hosts_gauge.register_hook(
+    homeserver_instance_id=None,
+    hook=lambda: _get_counts_from_rate_limiter_instance(
         lambda rate_limiter_instance: sum(
             ratelimiter.should_sleep()
             for ratelimiter in rate_limiter_instance.ratelimiters.values()
         )
     ),
 )
-LaterGauge(
+reject_affected_hosts_gauge = LaterGauge(
     name="synapse_rate_limit_reject_affected_hosts",
     desc="Number of hosts that had requests rejected",
     labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
-    caller=lambda: _get_counts_from_rate_limiter_instance(
+)
+reject_affected_hosts_gauge.register_hook(
+    homeserver_instance_id=None,
+    hook=lambda: _get_counts_from_rate_limiter_instance(
         lambda rate_limiter_instance: sum(
             ratelimiter.should_reject()
             for ratelimiter in rate_limiter_instance.ratelimiters.values()
diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py
index fdcacdf128..0539989320 100644
--- a/synapse/util/task_scheduler.py
+++ b/synapse/util/task_scheduler.py
@@ -44,6 +44,13 @@ if TYPE_CHECKING:

 logger = logging.getLogger(__name__)

+running_tasks_gauge = LaterGauge(
+    name="synapse_scheduler_running_tasks",
+    desc="The number of concurrent running tasks handled by the TaskScheduler",
+    labelnames=[SERVER_NAME_LABEL],
+)
+
+
 class TaskScheduler:
     """
     This is a simple task scheduler designed for resumable tasks. Normally,
@@ -130,11 +137,9 @@ class TaskScheduler:
                 TaskScheduler.SCHEDULE_INTERVAL_MS,
             )

-        LaterGauge(
-            name="synapse_scheduler_running_tasks",
-            desc="The number of concurrent running tasks handled by the TaskScheduler",
-            labelnames=[SERVER_NAME_LABEL],
-            caller=lambda: {(self.server_name,): len(self._running_tasks)},
+        running_tasks_gauge.register_hook(
+            homeserver_instance_id=hs.get_instance_id(),
+            hook=lambda: {(self.server_name,): len(self._running_tasks)},
         )

     def register_action(
diff --git a/tests/metrics/test_metrics.py b/tests/metrics/test_metrics.py
index 61874564a6..832e991730 100644
--- a/tests/metrics/test_metrics.py
+++ b/tests/metrics/test_metrics.py
@@ -18,11 +18,18 @@
 # [This file includes modifications made by New Vector Limited]
 #
 #
-from typing import Dict, Protocol, Tuple
+from typing import Dict, NoReturn, Protocol, Tuple

 from prometheus_client.core import Sample

-from synapse.metrics import REGISTRY, InFlightGauge, generate_latest
+from synapse.metrics import (
+    REGISTRY,
+    SERVER_NAME_LABEL,
+    InFlightGauge,
+    LaterGauge,
+    all_later_gauges_to_clean_up_on_shutdown,
+    generate_latest,
+)
 from synapse.util.caches.deferred_cache import DeferredCache

 from tests import unittest
@@ -285,6 +292,95 @@ class CacheMetricsTests(unittest.HomeserverTestCase):
         self.assertEqual(hs2_cache_max_size_metric_value, "777.0")


+class LaterGaugeTests(unittest.HomeserverTestCase):
+    def setUp(self) -> None:
+        super().setUp()
+        self.later_gauge = LaterGauge(
+            name="foo",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+        )
+
+    def tearDown(self) -> None:
+        super().tearDown()
+
+        REGISTRY.unregister(self.later_gauge)
+        all_later_gauges_to_clean_up_on_shutdown.pop(self.later_gauge.name, None)
+
+    def test_later_gauge_multiple_servers(self) -> None:
+        """
+        Test that LaterGauge metrics are reported correctly across multiple servers. We
+        expect a metrics entry for each homeserver, labeled with the `server_name` label.
+        """
+        self.later_gauge.register_hook(
+            homeserver_instance_id="123", hook=lambda: {("hs1",): 1}
+        )
+        self.later_gauge.register_hook(
+            homeserver_instance_id="456", hook=lambda: {("hs2",): 2}
+        )
+
+        metrics_map = get_latest_metrics()
+
+        # Find the metrics from both homeservers
+        hs1_metric = 'foo{server_name="hs1"}'
+        hs1_metric_value = metrics_map.get(hs1_metric)
+        self.assertIsNotNone(
+            hs1_metric_value,
+            f"Missing metric {hs1_metric} in metrics {metrics_map}",
+        )
+        self.assertEqual(hs1_metric_value, "1.0")
+
+        hs2_metric = 'foo{server_name="hs2"}'
+        hs2_metric_value = metrics_map.get(hs2_metric)
+        self.assertIsNotNone(
+            hs2_metric_value,
+            f"Missing metric {hs2_metric} in metrics {metrics_map}",
+        )
+        self.assertEqual(hs2_metric_value, "2.0")
+
+    def test_later_gauge_hook_exception(self) -> None:
+        """
+        Test that LaterGauge metrics are collected across multiple servers even if one
+        hook throws an exception.
+        """
+
+        def raise_exception() -> NoReturn:
+            raise Exception("fake error generating data")
+
+        # Make the hook for hs1 throw an exception
+        self.later_gauge.register_hook(
+            homeserver_instance_id="123", hook=raise_exception
+        )
+        # Metrics from hs2 still work fine
+        self.later_gauge.register_hook(
+            homeserver_instance_id="456", hook=lambda: {("hs2",): 2}
+        )
+
+        metrics_map = get_latest_metrics()
+
+        # Since we encountered an exception while trying to collect metrics from hs1, we
+        # don't expect to see it here.
+        hs1_metric = 'foo{server_name="hs1"}'
+        hs1_metric_value = metrics_map.get(hs1_metric)
+        self.assertIsNone(
+            hs1_metric_value,
+            (
+                "Since we encountered an exception while trying to collect metrics from hs1, "
+                f"we don't expect to see it in the metrics_map {metrics_map}"
+            ),
+        )
+
+        # We should still see metrics from hs2 though
+        hs2_metric = 'foo{server_name="hs2"}'
+        hs2_metric_value = metrics_map.get(hs2_metric)
+        self.assertIsNotNone(
+            hs2_metric_value,
+            f"Missing metric {hs2_metric} in metrics {metrics_map}",
+        )
+        self.assertEqual(hs2_metric_value, "2.0")
+
+
 def get_latest_metrics() -> Dict[str, str]:
     """
     Collect the latest metrics from the registry and parse them into an easy to use map.
diff --git a/tests/replication/_base.py b/tests/replication/_base.py
index 453eb7750b..e756021937 100644
--- a/tests/replication/_base.py
+++ b/tests/replication/_base.py
@@ -32,7 +32,6 @@ from synapse.config.workers import InstanceTcpLocationConfig, InstanceUnixLocat
 from synapse.http.site import SynapseRequest, SynapseSite
 from synapse.replication.http import ReplicationRestResource
 from synapse.replication.tcp.client import ReplicationDataHandler
-from synapse.replication.tcp.handler import ReplicationCommandHandler
 from synapse.replication.tcp.protocol import (
     ClientReplicationStreamProtocol,
     ServerReplicationStreamProtocol,
@@ -97,7 +96,7 @@ class BaseStreamTestCase(unittest.HomeserverTestCase):
         self.test_handler = self._build_replication_data_handler()
         self.worker_hs._replication_data_handler = self.test_handler  # type: ignore[attr-defined]

-        repl_handler = ReplicationCommandHandler(self.worker_hs)
+        repl_handler = self.worker_hs.get_replication_command_handler()
         self.client = ClientReplicationStreamProtocol(
             self.worker_hs,
             "client",
diff --git a/tests/server.py b/tests/server.py
index 3a81a4c6d9..ebff8b04b3 100644
--- a/tests/server.py
+++ b/tests/server.py
@@ -1145,6 +1145,9 @@ def setup_test_homeserver(
         reactor=reactor,
     )

+    # Register the cleanup hook
+    cleanup_func(hs.cleanup)
+
     # Install @cache_in_self attributes
     for key, val in kwargs.items():
         setattr(hs, "_" + key, val)