Update Measure and @measure_func
This commit is contained in:
@@ -73,6 +73,7 @@ class FederationRemoteSendQueue(AbstractFederationSender):
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.server_name = hs.hostname
|
||||
self.clock = hs.get_clock()
|
||||
self.metrics_collector_registry = hs.metrics_collector_registry
|
||||
self.notifier = hs.get_notifier()
|
||||
self.is_mine_id = hs.is_mine_id
|
||||
self.is_mine_server_name = hs.is_mine_server_name
|
||||
@@ -157,7 +158,7 @@ class FederationRemoteSendQueue(AbstractFederationSender):
|
||||
|
||||
def _clear_queue_before_pos(self, position_to_delete: int) -> None:
|
||||
"""Clear all the queues from before a given position"""
|
||||
with Measure(self.clock, "send_queue._clear"):
|
||||
with Measure(self.clock, self.metrics_collector_registry, "send_queue._clear"):
|
||||
# Delete things out of presence maps
|
||||
keys = self.presence_destinations.keys()
|
||||
i = self.presence_destinations.bisect_left(position_to_delete)
|
||||
|
||||
@@ -367,6 +367,7 @@ class FederationSender(AbstractFederationSender):
|
||||
self._storage_controllers = hs.get_storage_controllers()
|
||||
|
||||
self.clock = hs.get_clock()
|
||||
self.metrics_collector_registry = hs.metrics_collector_registry
|
||||
self.is_mine_id = hs.is_mine_id
|
||||
self.is_mine_server_name = hs.is_mine_server_name
|
||||
|
||||
@@ -650,7 +651,11 @@ class FederationSender(AbstractFederationSender):
|
||||
logger.debug(
|
||||
"Handling %i events in room %s", len(events), events[0].room_id
|
||||
)
|
||||
with Measure(self.clock, "handle_room_events"):
|
||||
with Measure(
|
||||
self.clock,
|
||||
self.metrics_collector_registry,
|
||||
"handle_room_events",
|
||||
):
|
||||
for event in events:
|
||||
await handle_event(event)
|
||||
|
||||
|
||||
@@ -60,6 +60,9 @@ class TransactionManager:
|
||||
def __init__(self, hs: "synapse.server.HomeServer"):
|
||||
self._server_name = hs.hostname
|
||||
self.clock = hs.get_clock() # nb must be called this for @measure_func
|
||||
self.metrics_collector_registry = (
|
||||
hs.metrics_collector_registry
|
||||
) # nb must be called this for @measure_func
|
||||
self._store = hs.get_datastores().main
|
||||
self._transaction_actions = TransactionActions(self._store)
|
||||
self._transport_layer = hs.get_federation_transport_client()
|
||||
|
||||
@@ -77,6 +77,7 @@ class ApplicationServicesHandler:
|
||||
self.scheduler = hs.get_application_service_scheduler()
|
||||
self.started_scheduler = False
|
||||
self.clock = hs.get_clock()
|
||||
self.metrics_collector_registry = hs.metrics_collector_registry
|
||||
self.notify_appservices = hs.config.worker.should_notify_appservices
|
||||
self.event_sources = hs.get_event_sources()
|
||||
self._msc2409_to_device_messages_enabled = (
|
||||
@@ -124,7 +125,9 @@ class ApplicationServicesHandler:
|
||||
|
||||
@wrap_as_background_process("notify_interested_services")
|
||||
async def _notify_interested_services(self, max_token: RoomStreamToken) -> None:
|
||||
with Measure(self.clock, "notify_interested_services"):
|
||||
with Measure(
|
||||
self.clock, self.metrics_collector_registry, "notify_interested_services"
|
||||
):
|
||||
self.is_processing = True
|
||||
try:
|
||||
upper_bound = -1
|
||||
@@ -333,7 +336,11 @@ class ApplicationServicesHandler:
|
||||
users: Collection[Union[str, UserID]],
|
||||
) -> None:
|
||||
logger.debug("Checking interested services for %s", stream_key)
|
||||
with Measure(self.clock, "notify_interested_services_ephemeral"):
|
||||
with Measure(
|
||||
self.clock,
|
||||
self.metrics_collector_registry,
|
||||
"notify_interested_services_ephemeral",
|
||||
):
|
||||
for service in services:
|
||||
if stream_key == StreamKeyType.TYPING:
|
||||
# Note that we don't persist the token (via set_appservice_stream_type_pos)
|
||||
|
||||
@@ -58,6 +58,7 @@ class DelayedEventsHandler:
|
||||
self._storage_controllers = hs.get_storage_controllers()
|
||||
self._config = hs.config
|
||||
self._clock = hs.get_clock()
|
||||
self._metrics_collector_registry = hs.metrics_collector_registry
|
||||
self._event_creation_handler = hs.get_event_creation_handler()
|
||||
self._room_member_handler = hs.get_room_member_handler()
|
||||
|
||||
@@ -159,7 +160,9 @@ class DelayedEventsHandler:
|
||||
|
||||
# Loop round handling deltas until we're up to date
|
||||
while True:
|
||||
with Measure(self._clock, "delayed_events_delta"):
|
||||
with Measure(
|
||||
self._clock, self._metrics_collector_registry, "delayed_events_delta"
|
||||
):
|
||||
room_max_stream_ordering = self._store.get_room_max_stream_ordering()
|
||||
if self._event_pos == room_max_stream_ordering:
|
||||
return
|
||||
|
||||
@@ -87,7 +87,10 @@ class DeviceWorkerHandler:
|
||||
device_list_updater: "DeviceListWorkerUpdater"
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.clock = hs.get_clock()
|
||||
self.clock = hs.get_clock() # nb must be called this for @measure_func
|
||||
self.metrics_collector_registry = (
|
||||
hs.metrics_collector_registry
|
||||
) # nb must be called this for @measure_func
|
||||
self.hs = hs
|
||||
self.store = hs.get_datastores().main
|
||||
self.notifier = hs.get_notifier()
|
||||
|
||||
@@ -481,10 +481,13 @@ class EventCreationHandler:
|
||||
self.store = hs.get_datastores().main
|
||||
self._storage_controllers = hs.get_storage_controllers()
|
||||
self.state = hs.get_state_handler()
|
||||
self.clock = hs.get_clock()
|
||||
self.validator = EventValidator()
|
||||
self.profile_handler = hs.get_profile_handler()
|
||||
self.event_builder_factory = hs.get_event_builder_factory()
|
||||
self.clock = hs.get_clock() # nb must be called this for @measure_func
|
||||
self.metrics_collector_registry = (
|
||||
hs.metrics_collector_registry
|
||||
) # nb must be called this for @measure_func
|
||||
self.profile_handler = hs.get_profile_handler()
|
||||
self.server_name = hs.hostname
|
||||
self.notifier = hs.get_notifier()
|
||||
self.config = hs.config
|
||||
|
||||
@@ -174,6 +174,7 @@ class BasePresenceHandler(abc.ABC):
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.hs = hs
|
||||
self.clock = hs.get_clock()
|
||||
self.metrics_collector_registry = hs.metrics_collector_registry
|
||||
self.store = hs.get_datastores().main
|
||||
self._storage_controllers = hs.get_storage_controllers()
|
||||
self.presence_router = hs.get_presence_router()
|
||||
@@ -967,7 +968,9 @@ class PresenceHandler(BasePresenceHandler):
|
||||
|
||||
now = self.clock.time_msec()
|
||||
|
||||
with Measure(self.clock, "presence_update_states"):
|
||||
with Measure(
|
||||
self.clock, self.metrics_collector_registry, "presence_update_states"
|
||||
):
|
||||
# NOTE: We purposefully don't await between now and when we've
|
||||
# calculated what we want to do with the new states, to avoid races.
|
||||
|
||||
@@ -1524,7 +1527,7 @@ class PresenceHandler(BasePresenceHandler):
|
||||
async def _unsafe_process(self) -> None:
|
||||
# Loop round handling deltas until we're up to date
|
||||
while True:
|
||||
with Measure(self.clock, "presence_delta"):
|
||||
with Measure(self.clock, self.metrics_collector_registry, "presence_delta"):
|
||||
room_max_stream_ordering = self.store.get_room_max_stream_ordering()
|
||||
if self._event_pos == room_max_stream_ordering:
|
||||
return
|
||||
@@ -1798,6 +1801,7 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
|
||||
self.get_presence_handler = hs.get_presence_handler
|
||||
self.get_presence_router = hs.get_presence_router
|
||||
self.clock = hs.get_clock()
|
||||
self.metrics_collector_registry = hs.metrics_collector_registry
|
||||
self.store = hs.get_datastores().main
|
||||
|
||||
self.get_updates_counter = Counter(
|
||||
@@ -1835,7 +1839,9 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
|
||||
user_id = user.to_string()
|
||||
stream_change_cache = self.store.presence_stream_cache
|
||||
|
||||
with Measure(self.clock, "presence.get_new_events"):
|
||||
with Measure(
|
||||
self.clock, self.metrics_collector_registry, "presence.get_new_events"
|
||||
):
|
||||
if from_key is not None:
|
||||
from_key = int(from_key)
|
||||
|
||||
|
||||
@@ -336,6 +336,7 @@ class SyncHandler:
|
||||
self._push_rules_handler = hs.get_push_rules_handler()
|
||||
self.event_sources = hs.get_event_sources()
|
||||
self.clock = hs.get_clock()
|
||||
self.metrics_collector_registry = hs.metrics_collector_registry
|
||||
self.state = hs.get_state_handler()
|
||||
self.auth_blocking = hs.get_auth_blocking()
|
||||
self._storage_controllers = hs.get_storage_controllers()
|
||||
@@ -711,7 +712,7 @@ class SyncHandler:
|
||||
|
||||
sync_config = sync_result_builder.sync_config
|
||||
|
||||
with Measure(self.clock, "ephemeral_by_room"):
|
||||
with Measure(self.clock, self.metrics_collector_registry, "ephemeral_by_room"):
|
||||
typing_key = since_token.typing_key if since_token else 0
|
||||
|
||||
room_ids = sync_result_builder.joined_room_ids
|
||||
@@ -784,7 +785,9 @@ class SyncHandler:
|
||||
and current token to send down to clients.
|
||||
newly_joined_room
|
||||
"""
|
||||
with Measure(self.clock, "load_filtered_recents"):
|
||||
with Measure(
|
||||
self.clock, self.metrics_collector_registry, "load_filtered_recents"
|
||||
):
|
||||
timeline_limit = sync_config.filter_collection.timeline_limit()
|
||||
block_all_timeline = (
|
||||
sync_config.filter_collection.blocks_all_room_timeline()
|
||||
@@ -1175,7 +1178,9 @@ class SyncHandler:
|
||||
# updates even if they occurred logically before the previous event.
|
||||
# TODO(mjark) Check for new redactions in the state events.
|
||||
|
||||
with Measure(self.clock, "compute_state_delta"):
|
||||
with Measure(
|
||||
self.clock, self.metrics_collector_registry, "compute_state_delta"
|
||||
):
|
||||
# The memberships needed for events in the timeline.
|
||||
# Only calculated when `lazy_load_members` is on.
|
||||
members_to_fetch: Optional[Set[str]] = None
|
||||
@@ -1792,7 +1797,9 @@ class SyncHandler:
|
||||
# the DB.
|
||||
return RoomNotifCounts.empty()
|
||||
|
||||
with Measure(self.clock, "unread_notifs_for_room_id"):
|
||||
with Measure(
|
||||
self.clock, self.metrics_collector_registry, "unread_notifs_for_room_id"
|
||||
):
|
||||
return await self.store.get_unread_event_push_actions_by_room_for_user(
|
||||
room_id,
|
||||
sync_config.user.to_string(),
|
||||
|
||||
@@ -507,6 +507,7 @@ class TypingNotificationEventSource(EventSource[int, JsonMapping]):
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self._main_store = hs.get_datastores().main
|
||||
self.clock = hs.get_clock()
|
||||
self.metrics_collector_registry = hs.metrics_collector_registry
|
||||
# We can't call get_typing_handler here because there's a cycle:
|
||||
#
|
||||
# Typing -> Notifier -> TypingNotificationEventSource -> Typing
|
||||
@@ -537,7 +538,9 @@ class TypingNotificationEventSource(EventSource[int, JsonMapping]):
|
||||
appservice may be interested in.
|
||||
* The latest known room serial.
|
||||
"""
|
||||
with Measure(self.clock, "typing.get_new_events_as"):
|
||||
with Measure(
|
||||
self.clock, self.metrics_collector_registry, "typing.get_new_events_as"
|
||||
):
|
||||
handler = self.get_typing_handler()
|
||||
|
||||
events = []
|
||||
@@ -573,7 +576,9 @@ class TypingNotificationEventSource(EventSource[int, JsonMapping]):
|
||||
Find typing notifications for given rooms (> `from_token` and <= `to_token`)
|
||||
"""
|
||||
|
||||
with Measure(self.clock, "typing.get_new_events"):
|
||||
with Measure(
|
||||
self.clock, self.metrics_collector_registry, "typing.get_new_events"
|
||||
):
|
||||
from_key = int(from_key)
|
||||
handler = self.get_typing_handler()
|
||||
|
||||
|
||||
@@ -104,6 +104,7 @@ class UserDirectoryHandler(StateDeltasHandler):
|
||||
self._storage_controllers = hs.get_storage_controllers()
|
||||
self.server_name = hs.hostname
|
||||
self.clock = hs.get_clock()
|
||||
self.metrics_collector_registry = hs.metrics_collector_registry
|
||||
self.notifier = hs.get_notifier()
|
||||
self.is_mine_id = hs.is_mine_id
|
||||
self.update_user_directory = hs.config.worker.should_update_user_directory
|
||||
@@ -237,7 +238,7 @@ class UserDirectoryHandler(StateDeltasHandler):
|
||||
|
||||
# Loop round handling deltas until we're up to date
|
||||
while True:
|
||||
with Measure(self.clock, "user_dir_delta"):
|
||||
with Measure(self.clock, self.metrics_collector_registry, "user_dir_delta"):
|
||||
room_max_stream_ordering = self.store.get_room_max_stream_ordering()
|
||||
if self.pos == room_max_stream_ordering:
|
||||
return
|
||||
|
||||
@@ -26,6 +26,7 @@ from urllib.request import ( # type: ignore[attr-defined]
|
||||
)
|
||||
|
||||
from netaddr import AddrFormatError, IPAddress, IPSet
|
||||
from prometheus_client import CollectorRegistry
|
||||
from zope.interface import implementer
|
||||
|
||||
from twisted.internet import defer
|
||||
@@ -101,6 +102,7 @@ class MatrixFederationAgent:
|
||||
user_agent: bytes,
|
||||
ip_allowlist: Optional[IPSet],
|
||||
ip_blocklist: IPSet,
|
||||
metrics_collector_registry: CollectorRegistry,
|
||||
cache_manager: CacheManager,
|
||||
_srv_resolver: Optional[SrvResolver] = None,
|
||||
_well_known_resolver: Optional[WellKnownResolver] = None,
|
||||
@@ -144,6 +146,7 @@ class MatrixFederationAgent:
|
||||
ip_blocklist=ip_blocklist,
|
||||
),
|
||||
user_agent=self.user_agent,
|
||||
metrics_collector_registry=metrics_collector_registry,
|
||||
cache_manager=cache_manager,
|
||||
)
|
||||
|
||||
|
||||
@@ -25,6 +25,7 @@ from io import BytesIO
|
||||
from typing import Callable, Dict, Optional, Tuple
|
||||
|
||||
import attr
|
||||
from prometheus_client import CollectorRegistry
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.internet.interfaces import IReactorTime
|
||||
@@ -91,12 +92,14 @@ class WellKnownResolver:
|
||||
reactor: IReactorTime,
|
||||
agent: IAgent,
|
||||
user_agent: bytes,
|
||||
metrics_collector_registry: CollectorRegistry,
|
||||
cache_manager: CacheManager,
|
||||
well_known_cache: Optional[TTLCache[bytes, Optional[bytes]]] = None,
|
||||
had_well_known_cache: Optional[TTLCache[bytes, bool]] = None,
|
||||
):
|
||||
self._reactor = reactor
|
||||
self._clock = Clock(reactor)
|
||||
self._metrics_collector_registry = metrics_collector_registry
|
||||
|
||||
self._well_known_cache: TTLCache[bytes, Optional[bytes]] = TTLCache(
|
||||
"well-known",
|
||||
@@ -133,7 +136,9 @@ class WellKnownResolver:
|
||||
# TODO: should we linearise so that we don't end up doing two .well-known
|
||||
# requests for the same server in parallel?
|
||||
try:
|
||||
with Measure(self._clock, "get_well_known"):
|
||||
with Measure(
|
||||
self._clock, self._metrics_collector_registry, "get_well_known"
|
||||
):
|
||||
result: Optional[bytes]
|
||||
cache_period: float
|
||||
|
||||
|
||||
@@ -422,6 +422,7 @@ class MatrixFederationHttpClient:
|
||||
user_agent=user_agent.encode("ascii"),
|
||||
ip_allowlist=hs.config.server.federation_ip_range_allowlist,
|
||||
ip_blocklist=hs.config.server.federation_ip_range_blocklist,
|
||||
metrics_collector_registry=hs.metrics_collector_registry,
|
||||
cache_manager=hs.get_cache_manager(),
|
||||
)
|
||||
else:
|
||||
@@ -452,6 +453,7 @@ class MatrixFederationHttpClient:
|
||||
)
|
||||
|
||||
self.clock = hs.get_clock()
|
||||
self.metrics_collector_registry = hs.metrics_collector_registry
|
||||
self._store = hs.get_datastores().main
|
||||
self.version_string_bytes = hs.version_string.encode("ascii")
|
||||
self.default_timeout_seconds = hs.config.federation.client_timeout_ms / 1000
|
||||
@@ -698,7 +700,11 @@ class MatrixFederationHttpClient:
|
||||
outgoing_requests_counter.labels(request.method).inc()
|
||||
|
||||
try:
|
||||
with Measure(self.clock, "outbound_request"):
|
||||
with Measure(
|
||||
self.clock,
|
||||
self.metrics_collector_registry,
|
||||
"outbound_request",
|
||||
):
|
||||
# we don't want all the fancy cookie and redirect handling
|
||||
# that treq.request gives: just use the raw Agent.
|
||||
|
||||
|
||||
@@ -321,6 +321,7 @@ class SpamCheckerModuleApiCallbacks:
|
||||
|
||||
def __init__(self, hs: "synapse.server.HomeServer") -> None:
|
||||
self.clock = hs.get_clock()
|
||||
self.metrics_collector_registry = hs.metrics_collector_registry
|
||||
|
||||
self._check_event_for_spam_callbacks: List[CHECK_EVENT_FOR_SPAM_CALLBACK] = []
|
||||
self._should_drop_federated_event_callbacks: List[
|
||||
@@ -436,7 +437,11 @@ class SpamCheckerModuleApiCallbacks:
|
||||
generally discouraged as it doesn't support internationalization.
|
||||
"""
|
||||
for callback in self._check_event_for_spam_callbacks:
|
||||
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
|
||||
with Measure(
|
||||
self.clock,
|
||||
self.metrics_collector_registry,
|
||||
f"{callback.__module__}.{callback.__qualname__}",
|
||||
):
|
||||
res = await delay_cancellation(callback(event))
|
||||
if res is False or res == self.NOT_SPAM:
|
||||
# This spam-checker accepts the event.
|
||||
@@ -489,7 +494,11 @@ class SpamCheckerModuleApiCallbacks:
|
||||
True if the event should be silently dropped
|
||||
"""
|
||||
for callback in self._should_drop_federated_event_callbacks:
|
||||
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
|
||||
with Measure(
|
||||
self.clock,
|
||||
self.metrics_collector_registry,
|
||||
f"{callback.__module__}.{callback.__qualname__}",
|
||||
):
|
||||
res: Union[bool, str] = await delay_cancellation(callback(event))
|
||||
if res:
|
||||
return res
|
||||
@@ -511,7 +520,11 @@ class SpamCheckerModuleApiCallbacks:
|
||||
NOT_SPAM if the operation is permitted, [Codes, Dict] otherwise.
|
||||
"""
|
||||
for callback in self._user_may_join_room_callbacks:
|
||||
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
|
||||
with Measure(
|
||||
self.clock,
|
||||
self.metrics_collector_registry,
|
||||
f"{callback.__module__}.{callback.__qualname__}",
|
||||
):
|
||||
res = await delay_cancellation(callback(user_id, room_id, is_invited))
|
||||
# Normalize return values to `Codes` or `"NOT_SPAM"`.
|
||||
if res is True or res is self.NOT_SPAM:
|
||||
@@ -550,7 +563,11 @@ class SpamCheckerModuleApiCallbacks:
|
||||
NOT_SPAM if the operation is permitted, Codes otherwise.
|
||||
"""
|
||||
for callback in self._user_may_invite_callbacks:
|
||||
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
|
||||
with Measure(
|
||||
self.clock,
|
||||
self.metrics_collector_registry,
|
||||
f"{callback.__module__}.{callback.__qualname__}",
|
||||
):
|
||||
res = await delay_cancellation(
|
||||
callback(inviter_userid, invitee_userid, room_id)
|
||||
)
|
||||
@@ -595,7 +612,11 @@ class SpamCheckerModuleApiCallbacks:
|
||||
NOT_SPAM if the operation is permitted, Codes otherwise.
|
||||
"""
|
||||
for callback in self._user_may_send_3pid_invite_callbacks:
|
||||
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
|
||||
with Measure(
|
||||
self.clock,
|
||||
self.metrics_collector_registry,
|
||||
f"{callback.__module__}.{callback.__qualname__}",
|
||||
):
|
||||
res = await delay_cancellation(
|
||||
callback(inviter_userid, medium, address, room_id)
|
||||
)
|
||||
@@ -630,7 +651,11 @@ class SpamCheckerModuleApiCallbacks:
|
||||
userid: The ID of the user attempting to create a room
|
||||
"""
|
||||
for callback in self._user_may_create_room_callbacks:
|
||||
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
|
||||
with Measure(
|
||||
self.clock,
|
||||
self.metrics_collector_registry,
|
||||
f"{callback.__module__}.{callback.__qualname__}",
|
||||
):
|
||||
res = await delay_cancellation(callback(userid))
|
||||
if res is True or res is self.NOT_SPAM:
|
||||
continue
|
||||
@@ -664,7 +689,11 @@ class SpamCheckerModuleApiCallbacks:
|
||||
|
||||
"""
|
||||
for callback in self._user_may_create_room_alias_callbacks:
|
||||
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
|
||||
with Measure(
|
||||
self.clock,
|
||||
self.metrics_collector_registry,
|
||||
f"{callback.__module__}.{callback.__qualname__}",
|
||||
):
|
||||
res = await delay_cancellation(callback(userid, room_alias))
|
||||
if res is True or res is self.NOT_SPAM:
|
||||
continue
|
||||
@@ -697,7 +726,11 @@ class SpamCheckerModuleApiCallbacks:
|
||||
room_id: The ID of the room that would be published
|
||||
"""
|
||||
for callback in self._user_may_publish_room_callbacks:
|
||||
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
|
||||
with Measure(
|
||||
self.clock,
|
||||
self.metrics_collector_registry,
|
||||
f"{callback.__module__}.{callback.__qualname__}",
|
||||
):
|
||||
res = await delay_cancellation(callback(userid, room_id))
|
||||
if res is True or res is self.NOT_SPAM:
|
||||
continue
|
||||
@@ -739,7 +772,11 @@ class SpamCheckerModuleApiCallbacks:
|
||||
True if the user is spammy.
|
||||
"""
|
||||
for callback in self._check_username_for_spam_callbacks:
|
||||
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
|
||||
with Measure(
|
||||
self.clock,
|
||||
self.metrics_collector_registry,
|
||||
f"{callback.__module__}.{callback.__qualname__}",
|
||||
):
|
||||
checker_args = inspect.signature(callback)
|
||||
# Make a copy of the user profile object to ensure the spam checker cannot
|
||||
# modify it.
|
||||
@@ -788,7 +825,11 @@ class SpamCheckerModuleApiCallbacks:
|
||||
"""
|
||||
|
||||
for callback in self._check_registration_for_spam_callbacks:
|
||||
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
|
||||
with Measure(
|
||||
self.clock,
|
||||
self.metrics_collector_registry,
|
||||
f"{callback.__module__}.{callback.__qualname__}",
|
||||
):
|
||||
behaviour = await delay_cancellation(
|
||||
callback(email_threepid, username, request_info, auth_provider_id)
|
||||
)
|
||||
@@ -830,7 +871,11 @@ class SpamCheckerModuleApiCallbacks:
|
||||
"""
|
||||
|
||||
for callback in self._check_media_file_for_spam_callbacks:
|
||||
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
|
||||
with Measure(
|
||||
self.clock,
|
||||
self.metrics_collector_registry,
|
||||
f"{callback.__module__}.{callback.__qualname__}",
|
||||
):
|
||||
res = await delay_cancellation(callback(file_wrapper, file_info))
|
||||
# Normalize return values to `Codes` or `"NOT_SPAM"`.
|
||||
if res is False or res is self.NOT_SPAM:
|
||||
@@ -877,7 +922,11 @@ class SpamCheckerModuleApiCallbacks:
|
||||
"""
|
||||
|
||||
for callback in self._check_login_for_spam_callbacks:
|
||||
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
|
||||
with Measure(
|
||||
self.clock,
|
||||
self.metrics_collector_registry,
|
||||
f"{callback.__module__}.{callback.__qualname__}",
|
||||
):
|
||||
res = await delay_cancellation(
|
||||
callback(
|
||||
user_id,
|
||||
|
||||
@@ -127,7 +127,10 @@ class BulkPushRuleEvaluator:
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.hs = hs
|
||||
self.store = hs.get_datastores().main
|
||||
self.clock = hs.get_clock()
|
||||
self.clock = hs.get_clock() # nb must be called this for @measure_func
|
||||
self.metrics_collector_registry = (
|
||||
hs.metrics_collector_registry
|
||||
) # nb must be called this for @measure_func
|
||||
self._event_auth_handler = hs.get_event_auth_handler()
|
||||
self.should_calculate_push_rules = self.hs.config.push.enable_push
|
||||
|
||||
|
||||
@@ -79,6 +79,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
|
||||
self.store = hs.get_datastores().main
|
||||
self._storage_controllers = hs.get_storage_controllers()
|
||||
self.clock = hs.get_clock()
|
||||
self.metrics_collector_registry = hs.metrics_collector_registry
|
||||
self.federation_event_handler = hs.get_federation_event_handler()
|
||||
|
||||
@staticmethod
|
||||
@@ -122,7 +123,9 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
|
||||
async def _handle_request( # type: ignore[override]
|
||||
self, request: Request, content: JsonDict
|
||||
) -> Tuple[int, JsonDict]:
|
||||
with Measure(self.clock, "repl_fed_send_events_parse"):
|
||||
with Measure(
|
||||
self.clock, self.metrics_collector_registry, "repl_fed_send_events_parse"
|
||||
):
|
||||
room_id = content["room_id"]
|
||||
backfilled = content["backfilled"]
|
||||
|
||||
|
||||
@@ -80,6 +80,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
|
||||
self.store = hs.get_datastores().main
|
||||
self._storage_controllers = hs.get_storage_controllers()
|
||||
self.clock = hs.get_clock()
|
||||
self.metrics_collector_registry = hs.metrics_collector_registry
|
||||
|
||||
@staticmethod
|
||||
async def _serialize_payload( # type: ignore[override]
|
||||
@@ -121,7 +122,9 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
|
||||
async def _handle_request( # type: ignore[override]
|
||||
self, request: Request, content: JsonDict, event_id: str
|
||||
) -> Tuple[int, JsonDict]:
|
||||
with Measure(self.clock, "repl_send_event_parse"):
|
||||
with Measure(
|
||||
self.clock, self.metrics_collector_registry, "repl_send_event_parse"
|
||||
):
|
||||
event_dict = content["event"]
|
||||
room_ver = KNOWN_ROOM_VERSIONS[content["room_version"]]
|
||||
internal_metadata = content["internal_metadata"]
|
||||
|
||||
@@ -81,6 +81,7 @@ class ReplicationSendEventsRestServlet(ReplicationEndpoint):
|
||||
self.store = hs.get_datastores().main
|
||||
self._storage_controllers = hs.get_storage_controllers()
|
||||
self.clock = hs.get_clock()
|
||||
self.metrics_collector_registry = hs.metrics_collector_registry
|
||||
|
||||
@staticmethod
|
||||
async def _serialize_payload( # type: ignore[override]
|
||||
@@ -122,7 +123,9 @@ class ReplicationSendEventsRestServlet(ReplicationEndpoint):
|
||||
async def _handle_request( # type: ignore[override]
|
||||
self, request: Request, payload: JsonDict
|
||||
) -> Tuple[int, JsonDict]:
|
||||
with Measure(self.clock, "repl_send_events_parse"):
|
||||
with Measure(
|
||||
self.clock, self.metrics_collector_registry, "repl_send_events_parse"
|
||||
):
|
||||
events_and_context = []
|
||||
events = payload["events"]
|
||||
rooms = set()
|
||||
|
||||
@@ -79,6 +79,7 @@ class ReplicationDataHandler:
|
||||
self.notifier = hs.get_notifier()
|
||||
self._reactor = hs.get_reactor()
|
||||
self._clock = hs.get_clock()
|
||||
self._metrics_collector_registry = hs.metrics_collector_registry
|
||||
self._streams = hs.get_replication_streams()
|
||||
self._instance_name = hs.get_instance_name()
|
||||
self._typing_handler = hs.get_typing_handler()
|
||||
@@ -342,7 +343,11 @@ class ReplicationDataHandler:
|
||||
waiting_list.add((position, deferred))
|
||||
|
||||
# We measure here to get in flight counts and average waiting time.
|
||||
with Measure(self._clock, "repl.wait_for_stream_position"):
|
||||
with Measure(
|
||||
self._clock,
|
||||
self._metrics_collector_registry,
|
||||
"repl.wait_for_stream_position",
|
||||
):
|
||||
logger.info(
|
||||
"Waiting for repl stream %r to reach %s (%s); currently at: %s",
|
||||
stream_name,
|
||||
|
||||
@@ -80,6 +80,7 @@ class ReplicationStreamer:
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.store = hs.get_datastores().main
|
||||
self.clock = hs.get_clock()
|
||||
self.metrics_collector_registry = hs.metrics_collector_registry
|
||||
self.notifier = hs.get_notifier()
|
||||
self._instance_name = hs.get_instance_name()
|
||||
|
||||
@@ -155,7 +156,11 @@ class ReplicationStreamer:
|
||||
while self.pending_updates:
|
||||
self.pending_updates = False
|
||||
|
||||
with Measure(self.clock, "repl.stream.get_updates"):
|
||||
with Measure(
|
||||
self.clock,
|
||||
self.metrics_collector_registry,
|
||||
"repl.stream.get_updates",
|
||||
):
|
||||
all_streams = self.streams
|
||||
|
||||
if self._replication_torture_level is not None:
|
||||
|
||||
@@ -188,7 +188,10 @@ class StateHandler:
|
||||
"""
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.clock = hs.get_clock()
|
||||
self.clock = hs.get_clock() # nb must be called this for @measure_func
|
||||
self.metrics_collector_registry = (
|
||||
hs.metrics_collector_registry
|
||||
) # nb must be called this for @measure_func
|
||||
self.store = hs.get_datastores().main
|
||||
self._state_storage_controller = hs.get_storage_controllers().state
|
||||
self.hs = hs
|
||||
@@ -631,6 +634,7 @@ class StateResolutionHandler:
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.clock = hs.get_clock()
|
||||
self.metrics_collector_registry = hs.metrics_collector_registry
|
||||
|
||||
self.resolve_linearizer = Linearizer(name="state_resolve_lock")
|
||||
|
||||
@@ -747,7 +751,9 @@ class StateResolutionHandler:
|
||||
# which will be used as a cache key for future resolutions, but
|
||||
# not get persisted.
|
||||
|
||||
with Measure(self.clock, "state.create_group_ids"):
|
||||
with Measure(
|
||||
self.clock, self.metrics_collector_registry, "state.create_group_ids"
|
||||
):
|
||||
cache = _make_state_cache_entry(new_state, state_groups_ids)
|
||||
|
||||
self._state_cache[group_names] = cache
|
||||
@@ -785,7 +791,9 @@ class StateResolutionHandler:
|
||||
a map from (type, state_key) to event_id.
|
||||
"""
|
||||
try:
|
||||
with Measure(self.clock, "state._resolve_events") as m:
|
||||
with Measure(
|
||||
self.clock, self.metrics_collector_registry, "state._resolve_events"
|
||||
) as m:
|
||||
room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
|
||||
if room_version_obj.state_res == StateResolutionVersions.V1:
|
||||
return await v1.resolve_events_with_store(
|
||||
|
||||
@@ -56,6 +56,8 @@ class SQLBaseStore(metaclass=ABCMeta):
|
||||
):
|
||||
self.hs = hs
|
||||
self._clock = hs.get_clock()
|
||||
self._metrics_collector_registry = hs.metrics_collector_registry
|
||||
self._metrics_collector_registry = hs.metrics_collector_registry
|
||||
self.database_engine = database.engine
|
||||
self.db_pool = database
|
||||
|
||||
|
||||
@@ -338,6 +338,7 @@ class EventsPersistenceStorageController:
|
||||
self.persist_events_store = stores.persist_events
|
||||
|
||||
self._clock = hs.get_clock()
|
||||
self._metrics_collector_registry = hs.metrics_collector_registry
|
||||
self._instance_name = hs.get_instance_name()
|
||||
self.is_mine_id = hs.is_mine_id
|
||||
self._event_persist_queue = _EventPeristenceQueue(
|
||||
@@ -616,7 +617,11 @@ class EventsPersistenceStorageController:
|
||||
state_delta_for_room = None
|
||||
|
||||
if not backfilled:
|
||||
with Measure(self._clock, "_calculate_state_and_extrem"):
|
||||
with Measure(
|
||||
self._clock,
|
||||
self._metrics_collector_registry,
|
||||
"_calculate_state_and_extrem",
|
||||
):
|
||||
# Work out the new "current state" for the room.
|
||||
# We do this by working out what the new extremities are and then
|
||||
# calculating the state from that.
|
||||
@@ -627,7 +632,11 @@ class EventsPersistenceStorageController:
|
||||
room_id, chunk
|
||||
)
|
||||
|
||||
with Measure(self._clock, "calculate_chain_cover_index_for_events"):
|
||||
with Measure(
|
||||
self._clock,
|
||||
self._metrics_collector_registry,
|
||||
"calculate_chain_cover_index_for_events",
|
||||
):
|
||||
# We now calculate chain ID/sequence numbers for any state events we're
|
||||
# persisting. We ignore out of band memberships as we're not in the room
|
||||
# and won't have their auth chain (we'll fix it up later if we join the
|
||||
@@ -719,7 +728,11 @@ class EventsPersistenceStorageController:
|
||||
break
|
||||
|
||||
logger.debug("Calculating state delta for room %s", room_id)
|
||||
with Measure(self._clock, "persist_events.get_new_state_after_events"):
|
||||
with Measure(
|
||||
self._clock,
|
||||
self._metrics_collector_registry,
|
||||
"persist_events.get_new_state_after_events",
|
||||
):
|
||||
res = await self._get_new_state_after_events(
|
||||
room_id,
|
||||
ev_ctx_rm,
|
||||
@@ -746,7 +759,11 @@ class EventsPersistenceStorageController:
|
||||
# removed keys entirely.
|
||||
delta = DeltaState([], delta_ids)
|
||||
elif current_state is not None:
|
||||
with Measure(self._clock, "persist_events.calculate_state_delta"):
|
||||
with Measure(
|
||||
self._clock,
|
||||
self._metrics_collector_registry,
|
||||
"persist_events.calculate_state_delta",
|
||||
):
|
||||
delta = await self._calculate_state_delta(room_id, current_state)
|
||||
|
||||
if delta:
|
||||
|
||||
@@ -70,6 +70,7 @@ class StateStorageController:
|
||||
def __init__(self, hs: "HomeServer", stores: "Databases"):
|
||||
self._is_mine_id = hs.is_mine_id
|
||||
self._clock = hs.get_clock()
|
||||
self._metrics_collector_registry = hs.metrics_collector_registry
|
||||
self.stores = stores
|
||||
self._partial_state_events_tracker = PartialStateEventsTracker(stores.main)
|
||||
self._partial_state_room_tracker = PartialCurrentStateTracker(stores.main)
|
||||
@@ -812,7 +813,7 @@ class StateStorageController:
|
||||
state_group = object()
|
||||
|
||||
assert state_group is not None
|
||||
with Measure(self._clock, "get_joined_hosts"):
|
||||
with Measure(self._clock, self._metrics_collector_registry, "get_joined_hosts"):
|
||||
return await self._get_joined_hosts(
|
||||
room_id, state_group, state_entry=state_entry
|
||||
)
|
||||
|
||||
@@ -1236,7 +1236,9 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
to event row. Note that it may well contain additional events that
|
||||
were not part of this request.
|
||||
"""
|
||||
with Measure(self._clock, "_fetch_event_list"):
|
||||
with Measure(
|
||||
self._clock, self._metrics_collector_registry, "_fetch_event_list"
|
||||
):
|
||||
try:
|
||||
events_to_fetch = {
|
||||
event_id for events, _ in event_list for event_id in events
|
||||
|
||||
@@ -917,7 +917,11 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
|
||||
`_get_user_ids_from_membership_event_ids` for any uncached events.
|
||||
"""
|
||||
|
||||
with Measure(self._clock, "get_joined_user_ids_from_state"):
|
||||
with Measure(
|
||||
self._clock,
|
||||
self._metrics_collector_registry,
|
||||
"get_joined_user_ids_from_state",
|
||||
):
|
||||
users_in_room = set()
|
||||
member_event_ids = [
|
||||
e_id for key, e_id in state.items() if key[0] == EventTypes.Member
|
||||
|
||||
+38
-14
@@ -79,21 +79,20 @@ class _InFlightMetric(Protocol):
|
||||
real_time_sum: float
|
||||
|
||||
|
||||
# Tracks the number of blocks currently active
|
||||
in_flight: InFlightGauge[_InFlightMetric] = InFlightGauge(
|
||||
"synapse_util_metrics_block_in_flight",
|
||||
"",
|
||||
labels=["block_name"],
|
||||
sub_metrics=["real_time_max", "real_time_sum"],
|
||||
)
|
||||
metrics_collector_registry_to_in_flight_gauge: Dict[
|
||||
CollectorRegistry, InFlightGauge
|
||||
] = {}
|
||||
|
||||
|
||||
P = ParamSpec("P")
|
||||
R = TypeVar("R")
|
||||
|
||||
|
||||
class HasClock(Protocol):
|
||||
class HasClockAndMetricsCollectorRegistry(Protocol):
|
||||
# Used to measure functions
|
||||
clock: Clock
|
||||
# Used to namespace the metrics to the given homeserver
|
||||
metrics_collector_registry: CollectorRegistry
|
||||
|
||||
|
||||
def measure_func(
|
||||
@@ -119,13 +118,17 @@ def measure_func(
|
||||
"""
|
||||
|
||||
def wrapper(
|
||||
func: Callable[Concatenate[HasClock, P], Awaitable[R]],
|
||||
func: Callable[
|
||||
Concatenate[HasClockAndMetricsCollectorRegistry, P], Awaitable[R]
|
||||
],
|
||||
) -> Callable[P, Awaitable[R]]:
|
||||
block_name = func.__name__ if name is None else name
|
||||
|
||||
@wraps(func)
|
||||
async def measured_func(self: HasClock, *args: P.args, **kwargs: P.kwargs) -> R:
|
||||
with Measure(self.clock, block_name):
|
||||
async def measured_func(
|
||||
self: HasClockAndMetricsCollectorRegistry, *args: P.args, **kwargs: P.kwargs
|
||||
) -> R:
|
||||
with Measure(self.clock, self.metrics_collector_registry, block_name):
|
||||
r = await func(self, *args, **kwargs)
|
||||
return r
|
||||
|
||||
@@ -141,12 +144,15 @@ def measure_func(
|
||||
class Measure:
|
||||
__slots__ = [
|
||||
"clock",
|
||||
"metrics_collector_registry",
|
||||
"name",
|
||||
"_logging_context",
|
||||
"start",
|
||||
]
|
||||
|
||||
def __init__(self, clock: Clock, name: str) -> None:
|
||||
def __init__(
|
||||
self, clock: Clock, metrics_collector_registry: CollectorRegistry, name: str
|
||||
) -> None:
|
||||
"""
|
||||
Args:
|
||||
clock: An object with a "time()" method, which returns the current
|
||||
@@ -154,6 +160,7 @@ class Measure:
|
||||
name: The name of the metric to report.
|
||||
"""
|
||||
self.clock = clock
|
||||
self.metrics_collector_registry = metrics_collector_registry
|
||||
self.name = name
|
||||
curr_context = current_context()
|
||||
if not curr_context:
|
||||
@@ -168,13 +175,30 @@ class Measure:
|
||||
self._logging_context = LoggingContext(str(curr_context), parent_context)
|
||||
self.start: Optional[float] = None
|
||||
|
||||
def get_in_flight_gauge(self) -> InFlightGauge[_InFlightMetric]:
|
||||
existing_in_flight_gauge = metrics_collector_registry_to_in_flight_gauge.get(
|
||||
self.metrics_collector_registry, None
|
||||
)
|
||||
if existing_in_flight_gauge is not None:
|
||||
return existing_in_flight_gauge
|
||||
|
||||
# Tracks the number of blocks currently active
|
||||
in_flight_gauge: InFlightGauge[_InFlightMetric] = InFlightGauge(
|
||||
"synapse_util_metrics_block_in_flight",
|
||||
"",
|
||||
labels=["block_name"],
|
||||
sub_metrics=["real_time_max", "real_time_sum"],
|
||||
)
|
||||
|
||||
return in_flight_gauge
|
||||
|
||||
def __enter__(self) -> "Measure":
|
||||
if self.start is not None:
|
||||
raise RuntimeError("Measure() objects cannot be re-used")
|
||||
|
||||
self.start = self.clock.time()
|
||||
self._logging_context.__enter__()
|
||||
in_flight.register((self.name,), self._update_in_flight)
|
||||
self.get_in_flight_gauge().register((self.name,), self._update_in_flight)
|
||||
|
||||
logger.debug("Entering block %s", self.name)
|
||||
|
||||
@@ -194,7 +218,7 @@ class Measure:
|
||||
duration = self.clock.time() - self.start
|
||||
usage = self.get_resource_usage()
|
||||
|
||||
in_flight.unregister((self.name,), self._update_in_flight)
|
||||
self.get_in_flight_gauge().unregister((self.name,), self._update_in_flight)
|
||||
self._logging_context.__exit__(exc_type, exc_val, exc_tb)
|
||||
|
||||
try:
|
||||
|
||||
@@ -105,6 +105,7 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
|
||||
ip_allowlist=None,
|
||||
ip_blocklist=IPSet(),
|
||||
# After we get access to the `hs` homeserver instance, we can replace the federation agent
|
||||
metrics_collector_registry=hs.metrics_collector_registry,
|
||||
cache_manager=hs.get_cache_manager(),
|
||||
)
|
||||
|
||||
|
||||
@@ -25,6 +25,7 @@ from unittest.mock import AsyncMock, Mock, call, patch
|
||||
|
||||
import treq
|
||||
from netaddr import IPSet
|
||||
from prometheus_client import CollectorRegistry
|
||||
from service_identity import VerificationError
|
||||
from zope.interface import implementer
|
||||
|
||||
@@ -101,6 +102,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
|
||||
self.reactor,
|
||||
Agent(self.reactor, contextFactory=self.tls_factory),
|
||||
b"test-agent",
|
||||
metrics_collector_registry=CollectorRegistry(auto_describe=True),
|
||||
cache_manager=cache_manager,
|
||||
well_known_cache=self.well_known_cache,
|
||||
had_well_known_cache=self.had_well_known_cache,
|
||||
@@ -282,6 +284,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
|
||||
user_agent=b"test-agent", # Note that this is unused since _well_known_resolver is provided.
|
||||
ip_allowlist=IPSet(),
|
||||
ip_blocklist=IPSet(),
|
||||
metrics_collector_registry=CollectorRegistry(auto_describe=True),
|
||||
cache_manager=Mock(spec=CacheManager),
|
||||
_srv_resolver=self.mock_resolver,
|
||||
_well_known_resolver=self.well_known_resolver,
|
||||
@@ -1025,12 +1028,14 @@ class MatrixFederationAgentTests(unittest.TestCase):
|
||||
user_agent=b"test-agent", # This is unused since _well_known_resolver is passed below.
|
||||
ip_allowlist=IPSet(),
|
||||
ip_blocklist=IPSet(),
|
||||
metrics_collector_registry=CollectorRegistry(auto_describe=True),
|
||||
cache_manager=Mock(spec=CacheManager),
|
||||
_srv_resolver=self.mock_resolver,
|
||||
_well_known_resolver=WellKnownResolver(
|
||||
cast(ISynapseReactor, self.reactor),
|
||||
Agent(self.reactor, contextFactory=tls_factory),
|
||||
b"test-agent",
|
||||
metrics_collector_registry=CollectorRegistry(auto_describe=True),
|
||||
cache_manager=Mock(spec=CacheManager),
|
||||
well_known_cache=self.well_known_cache,
|
||||
had_well_known_cache=self.had_well_known_cache,
|
||||
|
||||
@@ -22,6 +22,7 @@ import logging
|
||||
from unittest.mock import AsyncMock, Mock
|
||||
|
||||
from netaddr import IPSet
|
||||
from prometheus_client import CollectorRegistry
|
||||
from signedjson.key import (
|
||||
encode_verify_key_base64,
|
||||
generate_signing_key,
|
||||
@@ -74,6 +75,7 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
|
||||
user_agent=b"SynapseInTrialTest/0.0.0",
|
||||
ip_allowlist=None,
|
||||
ip_blocklist=IPSet(),
|
||||
metrics_collector_registry=CollectorRegistry(auto_describe=True),
|
||||
cache_manager=Mock(spec=CacheManager),
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user