diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index bac331893d..781484f9f4 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -73,6 +73,7 @@ class FederationRemoteSendQueue(AbstractFederationSender): def __init__(self, hs: "HomeServer"): self.server_name = hs.hostname self.clock = hs.get_clock() + self.metrics_collector_registry = hs.metrics_collector_registry self.notifier = hs.get_notifier() self.is_mine_id = hs.is_mine_id self.is_mine_server_name = hs.is_mine_server_name @@ -157,7 +158,7 @@ class FederationRemoteSendQueue(AbstractFederationSender): def _clear_queue_before_pos(self, position_to_delete: int) -> None: """Clear all the queues from before a given position""" - with Measure(self.clock, "send_queue._clear"): + with Measure(self.clock, self.metrics_collector_registry, "send_queue._clear"): # Delete things out of presence maps keys = self.presence_destinations.keys() i = self.presence_destinations.bisect_left(position_to_delete) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 27654b12f7..99799426de 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -367,6 +367,7 @@ class FederationSender(AbstractFederationSender): self._storage_controllers = hs.get_storage_controllers() self.clock = hs.get_clock() + self.metrics_collector_registry = hs.metrics_collector_registry self.is_mine_id = hs.is_mine_id self.is_mine_server_name = hs.is_mine_server_name @@ -650,7 +651,11 @@ class FederationSender(AbstractFederationSender): logger.debug( "Handling %i events in room %s", len(events), events[0].room_id ) - with Measure(self.clock, "handle_room_events"): + with Measure( + self.clock, + self.metrics_collector_registry, + "handle_room_events", + ): for event in events: await handle_event(event) diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py 
index d8a3eaa525..08438b9f46 100644 --- a/synapse/federation/sender/transaction_manager.py +++ b/synapse/federation/sender/transaction_manager.py @@ -60,6 +60,9 @@ class TransactionManager: def __init__(self, hs: "synapse.server.HomeServer"): self._server_name = hs.hostname self.clock = hs.get_clock() # nb must be called this for @measure_func + self.metrics_collector_registry = ( + hs.metrics_collector_registry + ) # nb must be called this for @measure_func self._store = hs.get_datastores().main self._transaction_actions = TransactionActions(self._store) self._transport_layer = hs.get_federation_transport_client() diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 602fd536f0..a61a46bb53 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -77,6 +77,7 @@ class ApplicationServicesHandler: self.scheduler = hs.get_application_service_scheduler() self.started_scheduler = False self.clock = hs.get_clock() + self.metrics_collector_registry = hs.metrics_collector_registry self.notify_appservices = hs.config.worker.should_notify_appservices self.event_sources = hs.get_event_sources() self._msc2409_to_device_messages_enabled = ( @@ -124,7 +125,9 @@ class ApplicationServicesHandler: @wrap_as_background_process("notify_interested_services") async def _notify_interested_services(self, max_token: RoomStreamToken) -> None: - with Measure(self.clock, "notify_interested_services"): + with Measure( + self.clock, self.metrics_collector_registry, "notify_interested_services" + ): self.is_processing = True try: upper_bound = -1 @@ -333,7 +336,11 @@ class ApplicationServicesHandler: users: Collection[Union[str, UserID]], ) -> None: logger.debug("Checking interested services for %s", stream_key) - with Measure(self.clock, "notify_interested_services_ephemeral"): + with Measure( + self.clock, + self.metrics_collector_registry, + "notify_interested_services_ephemeral", + ): for service in services: if stream_key == 
StreamKeyType.TYPING: # Note that we don't persist the token (via set_appservice_stream_type_pos) diff --git a/synapse/handlers/delayed_events.py b/synapse/handlers/delayed_events.py index 80cb1cec9b..46635d724a 100644 --- a/synapse/handlers/delayed_events.py +++ b/synapse/handlers/delayed_events.py @@ -58,6 +58,7 @@ class DelayedEventsHandler: self._storage_controllers = hs.get_storage_controllers() self._config = hs.config self._clock = hs.get_clock() + self._metrics_collector_registry = hs.metrics_collector_registry self._event_creation_handler = hs.get_event_creation_handler() self._room_member_handler = hs.get_room_member_handler() @@ -159,7 +160,9 @@ class DelayedEventsHandler: # Loop round handling deltas until we're up to date while True: - with Measure(self._clock, "delayed_events_delta"): + with Measure( + self._clock, self._metrics_collector_registry, "delayed_events_delta" + ): room_max_stream_ordering = self._store.get_room_max_stream_ordering() if self._event_pos == room_max_stream_ordering: return diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 51638251d7..d763c36827 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -87,7 +87,10 @@ class DeviceWorkerHandler: device_list_updater: "DeviceListWorkerUpdater" def __init__(self, hs: "HomeServer"): - self.clock = hs.get_clock() + self.clock = hs.get_clock() # nb must be called this for @measure_func + self.metrics_collector_registry = ( + hs.metrics_collector_registry + ) # nb must be called this for @measure_func self.hs = hs self.store = hs.get_datastores().main self.notifier = hs.get_notifier() diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index d4fdcd53f6..7fb459fdca 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -481,10 +481,13 @@ class EventCreationHandler: self.store = hs.get_datastores().main self._storage_controllers = hs.get_storage_controllers() self.state = hs.get_state_handler() - 
self.clock = hs.get_clock() self.validator = EventValidator() - self.profile_handler = hs.get_profile_handler() self.event_builder_factory = hs.get_event_builder_factory() + self.clock = hs.get_clock() # nb must be called this for @measure_func + self.metrics_collector_registry = ( + hs.metrics_collector_registry + ) # nb must be called this for @measure_func + self.profile_handler = hs.get_profile_handler() self.server_name = hs.hostname self.notifier = hs.get_notifier() self.config = hs.config diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 111a7915d7..0c045d5aae 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -174,6 +174,7 @@ class BasePresenceHandler(abc.ABC): def __init__(self, hs: "HomeServer"): self.hs = hs self.clock = hs.get_clock() + self.metrics_collector_registry = hs.metrics_collector_registry self.store = hs.get_datastores().main self._storage_controllers = hs.get_storage_controllers() self.presence_router = hs.get_presence_router() @@ -967,7 +968,9 @@ class PresenceHandler(BasePresenceHandler): now = self.clock.time_msec() - with Measure(self.clock, "presence_update_states"): + with Measure( + self.clock, self.metrics_collector_registry, "presence_update_states" + ): # NOTE: We purposefully don't await between now and when we've # calculated what we want to do with the new states, to avoid races. 
@@ -1524,7 +1527,7 @@ class PresenceHandler(BasePresenceHandler): async def _unsafe_process(self) -> None: # Loop round handling deltas until we're up to date while True: - with Measure(self.clock, "presence_delta"): + with Measure(self.clock, self.metrics_collector_registry, "presence_delta"): room_max_stream_ordering = self.store.get_room_max_stream_ordering() if self._event_pos == room_max_stream_ordering: return @@ -1798,6 +1801,7 @@ class PresenceEventSource(EventSource[int, UserPresenceState]): self.get_presence_handler = hs.get_presence_handler self.get_presence_router = hs.get_presence_router self.clock = hs.get_clock() + self.metrics_collector_registry = hs.metrics_collector_registry self.store = hs.get_datastores().main self.get_updates_counter = Counter( @@ -1835,7 +1839,9 @@ class PresenceEventSource(EventSource[int, UserPresenceState]): user_id = user.to_string() stream_change_cache = self.store.presence_stream_cache - with Measure(self.clock, "presence.get_new_events"): + with Measure( + self.clock, self.metrics_collector_registry, "presence.get_new_events" + ): if from_key is not None: from_key = int(from_key) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 5307847b68..3d235357e9 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -336,6 +336,7 @@ class SyncHandler: self._push_rules_handler = hs.get_push_rules_handler() self.event_sources = hs.get_event_sources() self.clock = hs.get_clock() + self.metrics_collector_registry = hs.metrics_collector_registry self.state = hs.get_state_handler() self.auth_blocking = hs.get_auth_blocking() self._storage_controllers = hs.get_storage_controllers() @@ -711,7 +712,7 @@ class SyncHandler: sync_config = sync_result_builder.sync_config - with Measure(self.clock, "ephemeral_by_room"): + with Measure(self.clock, self.metrics_collector_registry, "ephemeral_by_room"): typing_key = since_token.typing_key if since_token else 0 room_ids = sync_result_builder.joined_room_ids @@ 
-784,7 +785,9 @@ class SyncHandler: and current token to send down to clients. newly_joined_room """ - with Measure(self.clock, "load_filtered_recents"): + with Measure( + self.clock, self.metrics_collector_registry, "load_filtered_recents" + ): timeline_limit = sync_config.filter_collection.timeline_limit() block_all_timeline = ( sync_config.filter_collection.blocks_all_room_timeline() @@ -1175,7 +1178,9 @@ class SyncHandler: # updates even if they occurred logically before the previous event. # TODO(mjark) Check for new redactions in the state events. - with Measure(self.clock, "compute_state_delta"): + with Measure( + self.clock, self.metrics_collector_registry, "compute_state_delta" + ): # The memberships needed for events in the timeline. # Only calculated when `lazy_load_members` is on. members_to_fetch: Optional[Set[str]] = None @@ -1792,7 +1797,9 @@ class SyncHandler: # the DB. return RoomNotifCounts.empty() - with Measure(self.clock, "unread_notifs_for_room_id"): + with Measure( + self.clock, self.metrics_collector_registry, "unread_notifs_for_room_id" + ): return await self.store.get_unread_event_push_actions_by_room_for_user( room_id, sync_config.user.to_string(), diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index c083b105ff..8f15023a6c 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -507,6 +507,7 @@ class TypingNotificationEventSource(EventSource[int, JsonMapping]): def __init__(self, hs: "HomeServer"): self._main_store = hs.get_datastores().main self.clock = hs.get_clock() + self.metrics_collector_registry = hs.metrics_collector_registry # We can't call get_typing_handler here because there's a cycle: # # Typing -> Notifier -> TypingNotificationEventSource -> Typing @@ -537,7 +538,9 @@ class TypingNotificationEventSource(EventSource[int, JsonMapping]): appservice may be interested in. * The latest known room serial. 
""" - with Measure(self.clock, "typing.get_new_events_as"): + with Measure( + self.clock, self.metrics_collector_registry, "typing.get_new_events_as" + ): handler = self.get_typing_handler() events = [] @@ -573,7 +576,9 @@ class TypingNotificationEventSource(EventSource[int, JsonMapping]): Find typing notifications for given rooms (> `from_token` and <= `to_token`) """ - with Measure(self.clock, "typing.get_new_events"): + with Measure( + self.clock, self.metrics_collector_registry, "typing.get_new_events" + ): from_key = int(from_key) handler = self.get_typing_handler() diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 33edef5f14..e71f2a0fb1 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -104,6 +104,7 @@ class UserDirectoryHandler(StateDeltasHandler): self._storage_controllers = hs.get_storage_controllers() self.server_name = hs.hostname self.clock = hs.get_clock() + self.metrics_collector_registry = hs.metrics_collector_registry self.notifier = hs.get_notifier() self.is_mine_id = hs.is_mine_id self.update_user_directory = hs.config.worker.should_update_user_directory @@ -237,7 +238,7 @@ class UserDirectoryHandler(StateDeltasHandler): # Loop round handling deltas until we're up to date while True: - with Measure(self.clock, "user_dir_delta"): + with Measure(self.clock, self.metrics_collector_registry, "user_dir_delta"): room_max_stream_ordering = self.store.get_room_max_stream_ordering() if self.pos == room_max_stream_ordering: return diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py index 4ac9d3728a..04323dada5 100644 --- a/synapse/http/federation/matrix_federation_agent.py +++ b/synapse/http/federation/matrix_federation_agent.py @@ -26,6 +26,7 @@ from urllib.request import ( # type: ignore[attr-defined] ) from netaddr import AddrFormatError, IPAddress, IPSet +from prometheus_client import CollectorRegistry from 
zope.interface import implementer from twisted.internet import defer @@ -101,6 +102,7 @@ class MatrixFederationAgent: user_agent: bytes, ip_allowlist: Optional[IPSet], ip_blocklist: IPSet, + metrics_collector_registry: CollectorRegistry, cache_manager: CacheManager, _srv_resolver: Optional[SrvResolver] = None, _well_known_resolver: Optional[WellKnownResolver] = None, @@ -144,6 +146,7 @@ class MatrixFederationAgent: ip_blocklist=ip_blocklist, ), user_agent=self.user_agent, + metrics_collector_registry=metrics_collector_registry, cache_manager=cache_manager, ) diff --git a/synapse/http/federation/well_known_resolver.py b/synapse/http/federation/well_known_resolver.py index 18dfd2cf09..5ab673d305 100644 --- a/synapse/http/federation/well_known_resolver.py +++ b/synapse/http/federation/well_known_resolver.py @@ -25,6 +25,7 @@ from io import BytesIO from typing import Callable, Dict, Optional, Tuple import attr +from prometheus_client import CollectorRegistry from twisted.internet import defer from twisted.internet.interfaces import IReactorTime @@ -91,12 +92,14 @@ class WellKnownResolver: reactor: IReactorTime, agent: IAgent, user_agent: bytes, + metrics_collector_registry: CollectorRegistry, cache_manager: CacheManager, well_known_cache: Optional[TTLCache[bytes, Optional[bytes]]] = None, had_well_known_cache: Optional[TTLCache[bytes, bool]] = None, ): self._reactor = reactor self._clock = Clock(reactor) + self._metrics_collector_registry = metrics_collector_registry self._well_known_cache: TTLCache[bytes, Optional[bytes]] = TTLCache( "well-known", @@ -133,7 +136,9 @@ class WellKnownResolver: # TODO: should we linearise so that we don't end up doing two .well-known # requests for the same server in parallel? 
try: - with Measure(self._clock, "get_well_known"): + with Measure( + self._clock, self._metrics_collector_registry, "get_well_known" + ): result: Optional[bytes] cache_period: float diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index a272f37434..b11737c533 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -422,6 +422,7 @@ class MatrixFederationHttpClient: user_agent=user_agent.encode("ascii"), ip_allowlist=hs.config.server.federation_ip_range_allowlist, ip_blocklist=hs.config.server.federation_ip_range_blocklist, + metrics_collector_registry=hs.metrics_collector_registry, cache_manager=hs.get_cache_manager(), ) else: @@ -452,6 +453,7 @@ class MatrixFederationHttpClient: ) self.clock = hs.get_clock() + self.metrics_collector_registry = hs.metrics_collector_registry self._store = hs.get_datastores().main self.version_string_bytes = hs.version_string.encode("ascii") self.default_timeout_seconds = hs.config.federation.client_timeout_ms / 1000 @@ -698,7 +700,11 @@ class MatrixFederationHttpClient: outgoing_requests_counter.labels(request.method).inc() try: - with Measure(self.clock, "outbound_request"): + with Measure( + self.clock, + self.metrics_collector_registry, + "outbound_request", + ): # we don't want all the fancy cookie and redirect handling # that treq.request gives: just use the raw Agent. 
diff --git a/synapse/module_api/callbacks/spamchecker_callbacks.py b/synapse/module_api/callbacks/spamchecker_callbacks.py index a86b46ba54..0e4d908757 100644 --- a/synapse/module_api/callbacks/spamchecker_callbacks.py +++ b/synapse/module_api/callbacks/spamchecker_callbacks.py @@ -321,6 +321,7 @@ class SpamCheckerModuleApiCallbacks: def __init__(self, hs: "synapse.server.HomeServer") -> None: self.clock = hs.get_clock() + self.metrics_collector_registry = hs.metrics_collector_registry self._check_event_for_spam_callbacks: List[CHECK_EVENT_FOR_SPAM_CALLBACK] = [] self._should_drop_federated_event_callbacks: List[ @@ -436,7 +437,11 @@ class SpamCheckerModuleApiCallbacks: generally discouraged as it doesn't support internationalization. """ for callback in self._check_event_for_spam_callbacks: - with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): + with Measure( + self.clock, + self.metrics_collector_registry, + f"{callback.__module__}.{callback.__qualname__}", + ): res = await delay_cancellation(callback(event)) if res is False or res == self.NOT_SPAM: # This spam-checker accepts the event. @@ -489,7 +494,11 @@ class SpamCheckerModuleApiCallbacks: True if the event should be silently dropped """ for callback in self._should_drop_federated_event_callbacks: - with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): + with Measure( + self.clock, + self.metrics_collector_registry, + f"{callback.__module__}.{callback.__qualname__}", + ): res: Union[bool, str] = await delay_cancellation(callback(event)) if res: return res @@ -511,7 +520,11 @@ class SpamCheckerModuleApiCallbacks: NOT_SPAM if the operation is permitted, [Codes, Dict] otherwise. 
""" for callback in self._user_may_join_room_callbacks: - with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): + with Measure( + self.clock, + self.metrics_collector_registry, + f"{callback.__module__}.{callback.__qualname__}", + ): res = await delay_cancellation(callback(user_id, room_id, is_invited)) # Normalize return values to `Codes` or `"NOT_SPAM"`. if res is True or res is self.NOT_SPAM: @@ -550,7 +563,11 @@ class SpamCheckerModuleApiCallbacks: NOT_SPAM if the operation is permitted, Codes otherwise. """ for callback in self._user_may_invite_callbacks: - with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): + with Measure( + self.clock, + self.metrics_collector_registry, + f"{callback.__module__}.{callback.__qualname__}", + ): res = await delay_cancellation( callback(inviter_userid, invitee_userid, room_id) ) @@ -595,7 +612,11 @@ class SpamCheckerModuleApiCallbacks: NOT_SPAM if the operation is permitted, Codes otherwise. """ for callback in self._user_may_send_3pid_invite_callbacks: - with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): + with Measure( + self.clock, + self.metrics_collector_registry, + f"{callback.__module__}.{callback.__qualname__}", + ): res = await delay_cancellation( callback(inviter_userid, medium, address, room_id) ) @@ -630,7 +651,11 @@ class SpamCheckerModuleApiCallbacks: userid: The ID of the user attempting to create a room """ for callback in self._user_may_create_room_callbacks: - with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): + with Measure( + self.clock, + self.metrics_collector_registry, + f"{callback.__module__}.{callback.__qualname__}", + ): res = await delay_cancellation(callback(userid)) if res is True or res is self.NOT_SPAM: continue @@ -664,7 +689,11 @@ class SpamCheckerModuleApiCallbacks: """ for callback in self._user_may_create_room_alias_callbacks: - with Measure(self.clock, 
f"{callback.__module__}.{callback.__qualname__}"): + with Measure( + self.clock, + self.metrics_collector_registry, + f"{callback.__module__}.{callback.__qualname__}", + ): res = await delay_cancellation(callback(userid, room_alias)) if res is True or res is self.NOT_SPAM: continue @@ -697,7 +726,11 @@ class SpamCheckerModuleApiCallbacks: room_id: The ID of the room that would be published """ for callback in self._user_may_publish_room_callbacks: - with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): + with Measure( + self.clock, + self.metrics_collector_registry, + f"{callback.__module__}.{callback.__qualname__}", + ): res = await delay_cancellation(callback(userid, room_id)) if res is True or res is self.NOT_SPAM: continue @@ -739,7 +772,11 @@ class SpamCheckerModuleApiCallbacks: True if the user is spammy. """ for callback in self._check_username_for_spam_callbacks: - with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): + with Measure( + self.clock, + self.metrics_collector_registry, + f"{callback.__module__}.{callback.__qualname__}", + ): checker_args = inspect.signature(callback) # Make a copy of the user profile object to ensure the spam checker cannot # modify it. 
@@ -788,7 +825,11 @@ class SpamCheckerModuleApiCallbacks: """ for callback in self._check_registration_for_spam_callbacks: - with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): + with Measure( + self.clock, + self.metrics_collector_registry, + f"{callback.__module__}.{callback.__qualname__}", + ): behaviour = await delay_cancellation( callback(email_threepid, username, request_info, auth_provider_id) ) @@ -830,7 +871,11 @@ class SpamCheckerModuleApiCallbacks: """ for callback in self._check_media_file_for_spam_callbacks: - with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): + with Measure( + self.clock, + self.metrics_collector_registry, + f"{callback.__module__}.{callback.__qualname__}", + ): res = await delay_cancellation(callback(file_wrapper, file_info)) # Normalize return values to `Codes` or `"NOT_SPAM"`. if res is False or res is self.NOT_SPAM: @@ -877,7 +922,11 @@ class SpamCheckerModuleApiCallbacks: """ for callback in self._check_login_for_spam_callbacks: - with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"): + with Measure( + self.clock, + self.metrics_collector_registry, + f"{callback.__module__}.{callback.__qualname__}", + ): res = await delay_cancellation( callback( user_id, diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 54d7f7f7c4..5e3ccd4173 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -127,7 +127,10 @@ class BulkPushRuleEvaluator: def __init__(self, hs: "HomeServer"): self.hs = hs self.store = hs.get_datastores().main - self.clock = hs.get_clock() + self.clock = hs.get_clock() # nb must be called this for @measure_func + self.metrics_collector_registry = ( + hs.metrics_collector_registry + ) # nb must be called this for @measure_func self._event_auth_handler = hs.get_event_auth_handler() self.should_calculate_push_rules = self.hs.config.push.enable_push diff --git 
a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py index 940f418396..b76cd58cc8 100644 --- a/synapse/replication/http/federation.py +++ b/synapse/replication/http/federation.py @@ -79,6 +79,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): self.store = hs.get_datastores().main self._storage_controllers = hs.get_storage_controllers() self.clock = hs.get_clock() + self.metrics_collector_registry = hs.metrics_collector_registry self.federation_event_handler = hs.get_federation_event_handler() @staticmethod @@ -122,7 +123,9 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): async def _handle_request( # type: ignore[override] self, request: Request, content: JsonDict ) -> Tuple[int, JsonDict]: - with Measure(self.clock, "repl_fed_send_events_parse"): + with Measure( + self.clock, self.metrics_collector_registry, "repl_fed_send_events_parse" + ): room_id = content["room_id"] backfilled = content["backfilled"] diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index 01952a8d59..24e19e5b01 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -80,6 +80,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint): self.store = hs.get_datastores().main self._storage_controllers = hs.get_storage_controllers() self.clock = hs.get_clock() + self.metrics_collector_registry = hs.metrics_collector_registry @staticmethod async def _serialize_payload( # type: ignore[override] @@ -121,7 +122,9 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint): async def _handle_request( # type: ignore[override] self, request: Request, content: JsonDict, event_id: str ) -> Tuple[int, JsonDict]: - with Measure(self.clock, "repl_send_event_parse"): + with Measure( + self.clock, self.metrics_collector_registry, "repl_send_event_parse" + ): event_dict = content["event"] room_ver = KNOWN_ROOM_VERSIONS[content["room_version"]] 
internal_metadata = content["internal_metadata"] diff --git a/synapse/replication/http/send_events.py b/synapse/replication/http/send_events.py index d965ce5492..cc3a4c8076 100644 --- a/synapse/replication/http/send_events.py +++ b/synapse/replication/http/send_events.py @@ -81,6 +81,7 @@ class ReplicationSendEventsRestServlet(ReplicationEndpoint): self.store = hs.get_datastores().main self._storage_controllers = hs.get_storage_controllers() self.clock = hs.get_clock() + self.metrics_collector_registry = hs.metrics_collector_registry @staticmethod async def _serialize_payload( # type: ignore[override] @@ -122,7 +123,9 @@ class ReplicationSendEventsRestServlet(ReplicationEndpoint): async def _handle_request( # type: ignore[override] self, request: Request, payload: JsonDict ) -> Tuple[int, JsonDict]: - with Measure(self.clock, "repl_send_events_parse"): + with Measure( + self.clock, self.metrics_collector_registry, "repl_send_events_parse" + ): events_and_context = [] events = payload["events"] rooms = set() diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 0bd5478cd3..4c69031b26 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -79,6 +79,7 @@ class ReplicationDataHandler: self.notifier = hs.get_notifier() self._reactor = hs.get_reactor() self._clock = hs.get_clock() + self._metrics_collector_registry = hs.metrics_collector_registry self._streams = hs.get_replication_streams() self._instance_name = hs.get_instance_name() self._typing_handler = hs.get_typing_handler() @@ -342,7 +343,11 @@ class ReplicationDataHandler: waiting_list.add((position, deferred)) # We measure here to get in flight counts and average waiting time. 
- with Measure(self._clock, "repl.wait_for_stream_position"): + with Measure( + self._clock, + self._metrics_collector_registry, + "repl.wait_for_stream_position", + ): logger.info( "Waiting for repl stream %r to reach %s (%s); currently at: %s", stream_name, diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index d647a2b332..b6d46ccf03 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -80,6 +80,7 @@ class ReplicationStreamer: def __init__(self, hs: "HomeServer"): self.store = hs.get_datastores().main self.clock = hs.get_clock() + self.metrics_collector_registry = hs.metrics_collector_registry self.notifier = hs.get_notifier() self._instance_name = hs.get_instance_name() @@ -155,7 +156,11 @@ class ReplicationStreamer: while self.pending_updates: self.pending_updates = False - with Measure(self.clock, "repl.stream.get_updates"): + with Measure( + self.clock, + self.metrics_collector_registry, + "repl.stream.get_updates", + ): all_streams = self.streams if self._replication_torture_level is not None: diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 45b7e64345..9b4189163d 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -188,7 +188,10 @@ class StateHandler: """ def __init__(self, hs: "HomeServer"): - self.clock = hs.get_clock() + self.clock = hs.get_clock() # nb must be called this for @measure_func + self.metrics_collector_registry = ( + hs.metrics_collector_registry + ) # nb must be called this for @measure_func self.store = hs.get_datastores().main self._state_storage_controller = hs.get_storage_controllers().state self.hs = hs @@ -631,6 +634,7 @@ class StateResolutionHandler: def __init__(self, hs: "HomeServer"): self.clock = hs.get_clock() + self.metrics_collector_registry = hs.metrics_collector_registry self.resolve_linearizer = Linearizer(name="state_resolve_lock") @@ -747,7 +751,9 @@ class StateResolutionHandler: # which will be 
used as a cache key for future resolutions, but # not get persisted. - with Measure(self.clock, "state.create_group_ids"): + with Measure( + self.clock, self.metrics_collector_registry, "state.create_group_ids" + ): cache = _make_state_cache_entry(new_state, state_groups_ids) self._state_cache[group_names] = cache @@ -785,7 +791,9 @@ a map from (type, state_key) to event_id. """ try: - with Measure(self.clock, "state._resolve_events") as m: + with Measure( + self.clock, self.metrics_collector_registry, "state._resolve_events" + ) as m: room_version_obj = KNOWN_ROOM_VERSIONS[room_version] if room_version_obj.state_res == StateResolutionVersions.V1: return await v1.resolve_events_with_store( diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index b5fe7dd858..799a9e7369 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -56,6 +56,7 @@ class SQLBaseStore(metaclass=ABCMeta): ): self.hs = hs self._clock = hs.get_clock() + self._metrics_collector_registry = hs.metrics_collector_registry self.database_engine = database.engine self.db_pool = database diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py index f5131fe291..48606b280c 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py @@ -338,6 +338,7 @@ class EventsPersistenceStorageController: self.persist_events_store = stores.persist_events self._clock = hs.get_clock() + self._metrics_collector_registry = hs.metrics_collector_registry self._instance_name = hs.get_instance_name() self.is_mine_id = hs.is_mine_id self._event_persist_queue = _EventPeristenceQueue( @@ -616,7 +617,11 @@ class EventsPersistenceStorageController: state_delta_for_room = None if not backfilled: - with Measure(self._clock, "_calculate_state_and_extrem"): + with Measure( + self._clock,
self._metrics_collector_registry, + "_calculate_state_and_extrem", + ): # Work out the new "current state" for the room. # We do this by working out what the new extremities are and then # calculating the state from that. @@ -627,7 +632,11 @@ class EventsPersistenceStorageController: room_id, chunk ) - with Measure(self._clock, "calculate_chain_cover_index_for_events"): + with Measure( + self._clock, + self._metrics_collector_registry, + "calculate_chain_cover_index_for_events", + ): # We now calculate chain ID/sequence numbers for any state events we're # persisting. We ignore out of band memberships as we're not in the room # and won't have their auth chain (we'll fix it up later if we join the @@ -719,7 +728,11 @@ class EventsPersistenceStorageController: break logger.debug("Calculating state delta for room %s", room_id) - with Measure(self._clock, "persist_events.get_new_state_after_events"): + with Measure( + self._clock, + self._metrics_collector_registry, + "persist_events.get_new_state_after_events", + ): res = await self._get_new_state_after_events( room_id, ev_ctx_rm, @@ -746,7 +759,11 @@ class EventsPersistenceStorageController: # removed keys entirely. 
delta = DeltaState([], delta_ids) elif current_state is not None: - with Measure(self._clock, "persist_events.calculate_state_delta"): + with Measure( + self._clock, + self._metrics_collector_registry, + "persist_events.calculate_state_delta", + ): delta = await self._calculate_state_delta(room_id, current_state) if delta: diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py index f28f5d7e03..ee429e5ec1 100644 --- a/synapse/storage/controllers/state.py +++ b/synapse/storage/controllers/state.py @@ -70,6 +70,7 @@ class StateStorageController: def __init__(self, hs: "HomeServer", stores: "Databases"): self._is_mine_id = hs.is_mine_id self._clock = hs.get_clock() + self._metrics_collector_registry = hs.metrics_collector_registry self.stores = stores self._partial_state_events_tracker = PartialStateEventsTracker(stores.main) self._partial_state_room_tracker = PartialCurrentStateTracker(stores.main) @@ -812,7 +813,7 @@ class StateStorageController: state_group = object() assert state_group is not None - with Measure(self._clock, "get_joined_hosts"): + with Measure(self._clock, self._metrics_collector_registry, "get_joined_hosts"): return await self._get_joined_hosts( room_id, state_group, state_entry=state_entry ) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index f713f09d68..fac16af6c3 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1236,7 +1236,9 @@ class EventsWorkerStore(SQLBaseStore): to event row. Note that it may well contain additional events that were not part of this request. 
""" - with Measure(self._clock, "_fetch_event_list"): + with Measure( + self._clock, self._metrics_collector_registry, "_fetch_event_list" + ): try: events_to_fetch = { event_id for events, _ in event_list for event_id in events diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index fb08cb5bf8..bab59b4f08 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -917,7 +917,11 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): `_get_user_ids_from_membership_event_ids` for any uncached events. """ - with Measure(self._clock, "get_joined_user_ids_from_state"): + with Measure( + self._clock, + self._metrics_collector_registry, + "get_joined_user_ids_from_state", + ): users_in_room = set() member_event_ids = [ e_id for key, e_id in state.items() if key[0] == EventTypes.Member diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index 6a389f7a7e..5be8a9d996 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -79,21 +79,20 @@ class _InFlightMetric(Protocol): real_time_sum: float -# Tracks the number of blocks currently active -in_flight: InFlightGauge[_InFlightMetric] = InFlightGauge( - "synapse_util_metrics_block_in_flight", - "", - labels=["block_name"], - sub_metrics=["real_time_max", "real_time_sum"], -) +metrics_collector_registry_to_in_flight_gauge: Dict[ + CollectorRegistry, InFlightGauge +] = {} P = ParamSpec("P") R = TypeVar("R") -class HasClock(Protocol): +class HasClockAndMetricsCollectorRegistry(Protocol): + # Used to measure functions clock: Clock + # Used to namespace the metrics to the given homeserver + metrics_collector_registry: CollectorRegistry def measure_func( @@ -119,13 +118,17 @@ def measure_func( """ def wrapper( - func: Callable[Concatenate[HasClock, P], Awaitable[R]], + func: Callable[ + Concatenate[HasClockAndMetricsCollectorRegistry, P], Awaitable[R] + ], ) -> Callable[P, 
Awaitable[R]]: block_name = func.__name__ if name is None else name @wraps(func) - async def measured_func(self: HasClock, *args: P.args, **kwargs: P.kwargs) -> R: - with Measure(self.clock, block_name): + async def measured_func( + self: HasClockAndMetricsCollectorRegistry, *args: P.args, **kwargs: P.kwargs + ) -> R: + with Measure(self.clock, self.metrics_collector_registry, block_name): r = await func(self, *args, **kwargs) return r @@ -141,12 +144,15 @@ def measure_func( class Measure: __slots__ = [ "clock", + "metrics_collector_registry", "name", "_logging_context", "start", ] - def __init__(self, clock: Clock, name: str) -> None: + def __init__( + self, clock: Clock, metrics_collector_registry: CollectorRegistry, name: str + ) -> None: """ Args: clock: An object with a "time()" method, which returns the current @@ -154,6 +160,7 @@ class Measure: name: The name of the metric to report. """ self.clock = clock + self.metrics_collector_registry = metrics_collector_registry self.name = name curr_context = current_context() if not curr_context: @@ -168,13 +175,33 @@ class Measure: self._logging_context = LoggingContext(str(curr_context), parent_context) self.start: Optional[float] = None + def get_in_flight_gauge(self) -> InFlightGauge[_InFlightMetric]: + existing_in_flight_gauge = metrics_collector_registry_to_in_flight_gauge.get( + self.metrics_collector_registry, None + ) + if existing_in_flight_gauge is not None: + return existing_in_flight_gauge + + # Tracks the number of blocks currently active + in_flight_gauge: InFlightGauge[_InFlightMetric] = InFlightGauge( + "synapse_util_metrics_block_in_flight", + "", + labels=["block_name"], + sub_metrics=["real_time_max", "real_time_sum"], + ) + metrics_collector_registry_to_in_flight_gauge[ + self.metrics_collector_registry + ] = in_flight_gauge + + return in_flight_gauge + def __enter__(self) -> "Measure": if self.start is not None: raise RuntimeError("Measure() objects cannot be re-used") self.start = self.clock.time() self._logging_context.__enter__() - in_flight.register((self.name,), self._update_in_flight) +
self.get_in_flight_gauge().register((self.name,), self._update_in_flight) logger.debug("Entering block %s", self.name) @@ -194,7 +218,7 @@ class Measure: duration = self.clock.time() - self.start usage = self.get_resource_usage() - in_flight.unregister((self.name,), self._update_in_flight) + self.get_in_flight_gauge().unregister((self.name,), self._update_in_flight) self._logging_context.__exit__(exc_type, exc_val, exc_tb) try: diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 877db33890..896c176029 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -105,6 +105,7 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): ip_allowlist=None, ip_blocklist=IPSet(), # After we get access to the `hs` homeserver instance, we can replace the federation agent + metrics_collector_registry=hs.metrics_collector_registry, cache_manager=hs.get_cache_manager(), ) diff --git a/tests/http/federation/test_matrix_federation_agent.py b/tests/http/federation/test_matrix_federation_agent.py index d9c568801e..cf1a669aa5 100644 --- a/tests/http/federation/test_matrix_federation_agent.py +++ b/tests/http/federation/test_matrix_federation_agent.py @@ -25,6 +25,7 @@ from unittest.mock import AsyncMock, Mock, call, patch import treq from netaddr import IPSet +from prometheus_client import CollectorRegistry from service_identity import VerificationError from zope.interface import implementer @@ -101,6 +102,7 @@ class MatrixFederationAgentTests(unittest.TestCase): self.reactor, Agent(self.reactor, contextFactory=self.tls_factory), b"test-agent", + metrics_collector_registry=CollectorRegistry(auto_describe=True), cache_manager=cache_manager, well_known_cache=self.well_known_cache, had_well_known_cache=self.had_well_known_cache, @@ -282,6 +284,7 @@ class MatrixFederationAgentTests(unittest.TestCase): user_agent=b"test-agent", # Note that this is unused since _well_known_resolver is provided. 
ip_allowlist=IPSet(), ip_blocklist=IPSet(), + metrics_collector_registry=CollectorRegistry(auto_describe=True), cache_manager=Mock(spec=CacheManager), _srv_resolver=self.mock_resolver, _well_known_resolver=self.well_known_resolver, @@ -1025,12 +1028,14 @@ class MatrixFederationAgentTests(unittest.TestCase): user_agent=b"test-agent", # This is unused since _well_known_resolver is passed below. ip_allowlist=IPSet(), ip_blocklist=IPSet(), + metrics_collector_registry=CollectorRegistry(auto_describe=True), cache_manager=Mock(spec=CacheManager), _srv_resolver=self.mock_resolver, _well_known_resolver=WellKnownResolver( cast(ISynapseReactor, self.reactor), Agent(self.reactor, contextFactory=tls_factory), b"test-agent", + metrics_collector_registry=CollectorRegistry(auto_describe=True), cache_manager=Mock(spec=CacheManager), well_known_cache=self.well_known_cache, had_well_known_cache=self.had_well_known_cache, diff --git a/tests/replication/test_federation_sender_shard.py b/tests/replication/test_federation_sender_shard.py index 37417ad855..b617bbc0d8 100644 --- a/tests/replication/test_federation_sender_shard.py +++ b/tests/replication/test_federation_sender_shard.py @@ -22,6 +22,7 @@ import logging from unittest.mock import AsyncMock, Mock from netaddr import IPSet +from prometheus_client import CollectorRegistry from signedjson.key import ( encode_verify_key_base64, generate_signing_key, @@ -74,6 +75,7 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase): user_agent=b"SynapseInTrialTest/0.0.0", ip_allowlist=None, ip_blocklist=IPSet(), + metrics_collector_registry=CollectorRegistry(auto_describe=True), cache_manager=Mock(spec=CacheManager), )