Compare commits


42 Commits

Author SHA1 Message Date
Eric Eastwood 1c51b74e2d Merge branch 'develop' into madlittlemods/hs-specific-metrics
Conflicts:
	synapse/metrics/__init__.py
2025-06-20 11:24:57 -05:00
Eric Eastwood ec16224828 Remove unrelated comment from reviewing another PR
From my review of https://github.com/element-hq/synapse/pull/18416
2025-06-13 16:37:02 -05:00
Eric Eastwood 00140fcc99 Merge branch 'develop' into madlittlemods/hs-specific-metrics
Conflicts:
	docs/usage/configuration/config_documentation.md
	synapse/module_api/callbacks/spamchecker_callbacks.py
2025-06-11 16:37:39 -05:00
Eric Eastwood 6e49db78ce Revert "Reapply "WIP: @cached progress""
This reverts commit 827b78554f.
2025-05-21 13:13:10 -05:00
Eric Eastwood d4d59a49b3 Reapply "Reapply "Revert LruCache CacheManager changes for now""
This reverts commit e36c6f7803.
2025-05-21 13:13:03 -05:00
Eric Eastwood 5f5b3496f9 Reapply "Fixup after LruCache changes revert"
This reverts commit 962d5cecbf.
2025-05-21 13:12:57 -05:00
Eric Eastwood e433a680a1 Revert "WIP: Idea on managing caches"
This reverts commit c5bbff9584.
2025-05-21 13:12:32 -05:00
Eric Eastwood c5bbff9584 WIP: Idea on managing caches 2025-05-21 13:11:47 -05:00
Eric Eastwood 6a11964228 Interim comment 2025-05-21 09:29:39 -05:00
Eric Eastwood 962d5cecbf Revert "Fixup after LruCache changes revert"
This reverts commit 9f82f66c76.
2025-05-21 09:28:46 -05:00
Eric Eastwood e36c6f7803 Revert "Reapply "Revert LruCache CacheManager changes for now""
This reverts commit 3b12305376.
2025-05-21 09:28:36 -05:00
Eric Eastwood 827b78554f Reapply "WIP: @cached progress"
This reverts commit b4ed25e4fb.
2025-05-21 09:28:27 -05:00
Eric Eastwood 57546bffad Fix in-flight gauges 2025-05-20 16:15:51 -05:00
Eric Eastwood 9f82f66c76 Fixup after LruCache changes revert 2025-05-20 16:11:44 -05:00
Eric Eastwood 3b12305376 Reapply "Revert LruCache CacheManager changes for now"
This reverts commit a707a25389.
2025-05-20 16:10:36 -05:00
Eric Eastwood b4ed25e4fb Revert "WIP: @cached progress"
This reverts commit f68211bd0c.
2025-05-20 16:10:23 -05:00
Eric Eastwood 356792fe6d Add to correct registry 2025-05-20 16:09:33 -05:00
Eric Eastwood f75ea5cfcd Fix DictionaryCache 2025-05-20 16:01:05 -05:00
Eric Eastwood f68211bd0c WIP: @cached progress 2025-05-20 15:27:50 -05:00
Eric Eastwood a707a25389 Revert "Revert LruCache CacheManager changes for now"
This reverts commit ccd0a71395.
2025-05-20 15:06:38 -05:00
Eric Eastwood 627a77bc5e Update Measure and @measure_func 2025-05-20 15:04:34 -05:00
Eric Eastwood 6314f6cba0 Revert "Fix lints"
This reverts commit 4a5c2f89c7.
2025-05-20 14:22:22 -05:00
Eric Eastwood 4a5c2f89c7 Fix lints 2025-05-20 14:21:57 -05:00
Eric Eastwood fc04be1bed Merge branch 'develop' into madlittlemods/hs-specific-metrics
Conflicts:
	synapse/app/generic_worker.py
	synapse/app/homeserver.py
2025-05-20 14:21:19 -05:00
Eric Eastwood 2521a17ef4 Add changelog 2025-05-20 14:16:24 -05:00
Eric Eastwood e72d782189 Fix current lints 2025-05-20 14:15:33 -05:00
Eric Eastwood efa6018519 Move LaterGauge further along 2025-05-20 13:58:02 -05:00
Eric Eastwood 65080f8464 More cleanup 2025-05-20 13:35:00 -05:00
Eric Eastwood 1400e155b2 Fix most lints 2025-05-20 13:29:12 -05:00
Eric Eastwood ccd0a71395 Revert LruCache CacheManager changes for now 2025-05-20 13:24:49 -05:00
Eric Eastwood 4018aa4d98 Fix incorrect @cache_in_self underscore prefix in tests 2025-05-20 13:17:07 -05:00
Eric Eastwood f49789bfd0 Fix LruCache @overload 2025-05-20 13:03:49 -05:00
Eric Eastwood 9943cd39ad Fix return lints 2025-05-20 12:53:51 -05:00
Eric Eastwood d3d228ea2d More cache updates (LruCache not finished) 2025-05-19 16:59:17 -05:00
Eric Eastwood d3c23f8235 Partial changes for CacheManager 2025-05-19 16:09:16 -05:00
Eric Eastwood 3de2c0970e Move jemalloc metrics further along 2025-05-19 14:26:27 -05:00
Eric Eastwood 3215897de6 Try organize some of the global metrics 2025-05-16 17:54:39 -05:00
Eric Eastwood 86f39b04a6 Partial Counter update 2025-05-16 17:10:22 -05:00
Eric Eastwood 72b3becb89 Refactor more custom Collector to be stubbed to use hs.metrics_collector_registry 2025-05-15 16:49:39 -05:00
Eric Eastwood a6dd8a96bb Refactor our custom Collector to accept a registry 2025-05-15 16:47:36 -05:00
Eric Eastwood 9c6863e1a7 Remove metrics listener type in favor of http listener with metrics resource 2025-05-15 16:20:41 -05:00
Eric Eastwood c07d3108cc Move a couple metrics to homeserver specific CollectorRegistry 2025-05-15 15:45:26 -05:00
89 changed files with 1127 additions and 613 deletions
+1
@@ -0,0 +1 @@
Refactor metrics to collect in homeserver-specific registry.
+4 -2
@@ -53,15 +53,17 @@
     type: http
     x_forwarded: true
     bind_addresses: ['::1', '127.0.0.1']
     resources:
       - names: [client, federation]
         compress: false
   # beginning of the new metrics listener
   - port: 9000
-    type: metrics
+    type: http
     bind_addresses: ['::1', '127.0.0.1']
+    resources:
+      - names: [metrics]
+        compress: false
```
1. Restart Synapse.
+1 -2
@@ -451,8 +451,7 @@ properties:
type: string
description: >-
The type of listener. Normally `http`, but other valid options are
[`manhole`](../../manhole.md) and
[`metrics`](../../metrics-howto.md).
[`manhole`](../../manhole.md).
enum:
- http
- manhole
+1
@@ -207,6 +207,7 @@ class MSC3861DelegatedAuth(BaseAuth):
# the old access token to continue working for a short time.
self._introspection_cache: ResponseCache[str] = ResponseCache(
self._clock,
hs.get_cache_manager(),
"token_introspection",
timeout_ms=120_000,
# don't log because the keys are access tokens
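This hunk shows the recurring `ResponseCache` change: the homeserver's `CacheManager` is now threaded in as the second positional argument so the cache's metrics register against the per-homeserver registry. A minimal sketch of the new call shape, under that assumption (`make_introspection_cache` is a hypothetical helper, not part of the diff):

```python
from typing import TYPE_CHECKING

from synapse.util.caches.response_cache import ResponseCache

if TYPE_CHECKING:
    from synapse.server import HomeServer


def make_introspection_cache(hs: "HomeServer") -> "ResponseCache[str]":
    # New second argument: the per-homeserver CacheManager, so metrics for
    # this cache land in hs.metrics_collector_registry rather than a global.
    return ResponseCache(
        hs.get_clock(),
        hs.get_cache_manager(),
        "token_introspection",
        timeout_ms=120_000,
    )
```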
+22 -18
@@ -74,9 +74,13 @@ from synapse.handlers.auth import load_legacy_password_auth_providers
from synapse.http.site import SynapseSite
from synapse.logging.context import PreserveLoggingContext
from synapse.logging.opentracing import init_tracer
from synapse.metrics import install_gc_manager, register_threadpool
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.metrics.jemalloc import setup_jemalloc_stats
from synapse.metrics import CPUMetrics, install_gc_manager, register_threadpool
from synapse.metrics._gc import GCCounts, PyPyGCStats, running_on_pypy
from synapse.metrics._reactor_metrics import setup_reactor_metrics
from synapse.metrics.background_process_metrics import (
BackgroundProcessCollector,
wrap_as_background_process,
)
from synapse.module_api.callbacks.spamchecker_callbacks import load_legacy_spam_checkers
from synapse.module_api.callbacks.third_party_event_rules_callbacks import (
load_legacy_third_party_event_rules,
@@ -178,7 +182,6 @@ def start_reactor(
def run() -> None:
logger.info("Running")
setup_jemalloc_stats()
change_resource_limit(soft_file_limit)
if gc_thresholds:
gc.set_threshold(*gc_thresholds)
@@ -283,20 +286,6 @@ def register_start(
reactor.callWhenRunning(lambda: defer.ensureDeferred(wrapper()))
def listen_metrics(bind_addresses: StrCollection, port: int) -> None:
"""
Start Prometheus metrics server.
"""
from prometheus_client import start_http_server as start_http_server_prometheus
from synapse.metrics import RegistryProxy
for host in bind_addresses:
logger.info("Starting metrics listener on %s:%d", host, port)
_set_prometheus_client_use_created_metrics(False)
start_http_server_prometheus(port, addr=host, registry=RegistryProxy)
def _set_prometheus_client_use_created_metrics(new_value: bool) -> None:
"""
Sets whether prometheus_client should expose `_created`-suffixed metrics for
@@ -607,6 +596,7 @@ async def start(hs: "HomeServer") -> None:
)
setup_sentry(hs)
setup_global_metrics(hs)
setup_sdnotify(hs)
# If background tasks are running on the main process or this is the worker in
@@ -694,6 +684,20 @@ def setup_sentry(hs: "HomeServer") -> None:
global_scope.set_tag("worker_name", name)
def setup_global_metrics(hs: "HomeServer") -> None:
"""Set up global metrics for this homeserver.
This is called after the homeserver has been set up, but before any
listeners are started.
"""
CPUMetrics(registry=hs.metrics_collector_registry)
if running_on_pypy:
PyPyGCStats(registry=hs.metrics_collector_registry)
GCCounts(registry=hs.metrics_collector_registry)
BackgroundProcessCollector(registry=hs.metrics_collector_registry)
setup_reactor_metrics(registry=hs.metrics_collector_registry)
def setup_sdnotify(hs: "HomeServer") -> None:
"""Adds process state hooks to tell systemd what we are up to."""
+4 -19
@@ -49,7 +49,7 @@ from synapse.config.server import ListenerConfig, TCPListenerConfig
from synapse.federation.transport.server import TransportLayerServer
from synapse.http.server import JsonResource, OptionsResource
from synapse.logging.context import LoggingContext
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.metrics import METRICS_PREFIX, MetricsResource
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
from synapse.rest import ClientRestResource, admin
from synapse.rest.health import HealthResource
@@ -186,7 +186,9 @@ class GenericWorkerServer(HomeServer):
for res in listener_config.http_options.resources:
for name in res.names:
if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
resources[METRICS_PREFIX] = MetricsResource(
self.metrics_collector_registry
)
elif name == "client":
resource: Resource = ClientRestResource(self)
@@ -283,23 +285,6 @@ class GenericWorkerServer(HomeServer):
raise ConfigError(
"Can not using a unix socket for manhole at this time."
)
elif listener.type == "metrics":
if not self.config.metrics.enable_metrics:
logger.warning(
"Metrics listener configured, but enable_metrics is not True!"
)
else:
if isinstance(listener, TCPListenerConfig):
_base.listen_metrics(
listener.bind_addresses,
listener.port,
)
else:
raise ConfigError(
"Can not use a unix socket for metrics at this time."
)
else:
logger.warning("Unsupported listener type: %s", listener.type)
+4 -18
@@ -60,7 +60,7 @@ from synapse.http.server import (
StaticResource,
)
from synapse.logging.context import LoggingContext
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.metrics import METRICS_PREFIX, MetricsResource
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
from synapse.rest import ClientRestResource, admin
from synapse.rest.health import HealthResource
@@ -252,7 +252,9 @@ class SynapseHomeServer(HomeServer):
resources[SERVER_KEY_PREFIX] = KeyResource(self)
if name == "metrics" and self.config.metrics.enable_metrics:
metrics_resource: Resource = MetricsResource(RegistryProxy)
metrics_resource: Resource = MetricsResource(
self.metrics_collector_registry
)
if compress:
metrics_resource = gz_wrap(metrics_resource)
resources[METRICS_PREFIX] = metrics_resource
@@ -286,22 +288,6 @@ class SynapseHomeServer(HomeServer):
raise ConfigError(
"Can not use a unix socket for manhole at this time."
)
elif listener.type == "metrics":
if not self.config.metrics.enable_metrics:
logger.warning(
"Metrics listener configured, but enable_metrics is not True!"
)
else:
if isinstance(listener, TCPListenerConfig):
_base.listen_metrics(
listener.bind_addresses,
listener.port,
)
else:
raise ConfigError(
"Can not use a unix socket for metrics at this time."
)
else:
# this shouldn't happen, as the listener type should have been checked
# during parsing
+44 -33
@@ -56,33 +56,6 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
sent_transactions_counter = Counter(
"synapse_appservice_api_sent_transactions",
"Number of /transactions/ requests sent",
["service"],
)
failed_transactions_counter = Counter(
"synapse_appservice_api_failed_transactions",
"Number of /transactions/ requests that failed to send",
["service"],
)
sent_events_counter = Counter(
"synapse_appservice_api_sent_events", "Number of events sent to the AS", ["service"]
)
sent_ephemeral_counter = Counter(
"synapse_appservice_api_sent_ephemeral",
"Number of ephemeral events sent to the AS",
["service"],
)
sent_todevice_counter = Counter(
"synapse_appservice_api_sent_todevice",
"Number of todevice messages sent to the AS",
["service"],
)
HOUR_IN_MS = 60 * 60 * 1000
@@ -130,7 +103,45 @@ class ApplicationServiceApi(SimpleHttpClient):
self.config = hs.config.appservice
self.protocol_meta_cache: ResponseCache[Tuple[str, str]] = ResponseCache(
hs.get_clock(), "as_protocol_meta", timeout_ms=HOUR_IN_MS
hs.get_clock(),
hs.get_cache_manager(),
"as_protocol_meta",
timeout_ms=HOUR_IN_MS,
)
self.sent_transactions_counter = Counter(
"synapse_appservice_api_sent_transactions",
"Number of /transactions/ requests sent",
["service"],
registry=hs.metrics_collector_registry,
)
self.failed_transactions_counter = Counter(
"synapse_appservice_api_failed_transactions",
"Number of /transactions/ requests that failed to send",
["service"],
registry=hs.metrics_collector_registry,
)
self.sent_events_counter = Counter(
"synapse_appservice_api_sent_events",
"Number of events sent to the AS",
["service"],
registry=hs.metrics_collector_registry,
)
self.sent_ephemeral_counter = Counter(
"synapse_appservice_api_sent_ephemeral",
"Number of ephemeral events sent to the AS",
["service"],
registry=hs.metrics_collector_registry,
)
self.sent_todevice_counter = Counter(
"synapse_appservice_api_sent_todevice",
"Number of todevice messages sent to the AS",
["service"],
registry=hs.metrics_collector_registry,
)
def _get_headers(self, service: "ApplicationService") -> Dict[bytes, List[bytes]]:
@@ -395,10 +406,10 @@ class ApplicationServiceApi(SimpleHttpClient):
service.url,
[event.get("event_id") for event in events],
)
sent_transactions_counter.labels(service.id).inc()
sent_events_counter.labels(service.id).inc(len(serialized_events))
sent_ephemeral_counter.labels(service.id).inc(len(ephemeral))
sent_todevice_counter.labels(service.id).inc(len(to_device_messages))
self.sent_transactions_counter.labels(service.id).inc()
self.sent_events_counter.labels(service.id).inc(len(serialized_events))
self.sent_ephemeral_counter.labels(service.id).inc(len(ephemeral))
self.sent_todevice_counter.labels(service.id).inc(len(to_device_messages))
return True
except CodeMessageException as e:
logger.warning(
@@ -417,7 +428,7 @@ class ApplicationServiceApi(SimpleHttpClient):
ex.args,
exc_info=logger.isEnabledFor(logging.DEBUG),
)
failed_transactions_counter.labels(service.id).inc()
self.failed_transactions_counter.labels(service.id).inc()
return False
async def claim_client_keys(
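The hunks above are the mechanical template for most files in this branch: module-level counters move into `__init__` with `registry=hs.metrics_collector_registry`, and every call site gains a `self.` prefix. A condensed before/after sketch (class and method names invented):

```python
from prometheus_client import CollectorRegistry, Counter


class ApiSketch:
    def __init__(self, registry: CollectorRegistry) -> None:
        # Previously a module-level Counter bound implicitly to the global
        # REGISTRY; now per-instance and bound to the homeserver's registry.
        self.sent_transactions_counter = Counter(
            "synapse_appservice_api_sent_transactions",
            "Number of /transactions/ requests sent",
            ["service"],
            registry=registry,
        )

    def record_transaction(self, service_id: str) -> None:
        self.sent_transactions_counter.labels(service_id).inc()


ApiSketch(CollectorRegistry()).record_transaction("example_bridge")
```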
+13 -6
@@ -85,8 +85,6 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
sent_queries_counter = Counter("synapse_federation_client_sent_queries", "", ["type"])
PDU_RETRY_TIME_MS = 1 * 60 * 1000
@@ -145,6 +143,7 @@ class FederationClient(FederationBase):
self._get_pdu_cache: ExpiringCache[str, Tuple[EventBase, str]] = ExpiringCache(
cache_name="get_pdu_cache",
clock=self._clock,
cache_manager=hs.get_cache_manager(),
max_len=1000,
expiry_ms=120 * 1000,
reset_expiry_on_get=False,
@@ -163,11 +162,19 @@ class FederationClient(FederationBase):
] = ExpiringCache(
cache_name="get_room_hierarchy_cache",
clock=self._clock,
cache_manager=hs.get_cache_manager(),
max_len=1000,
expiry_ms=5 * 60 * 1000,
reset_expiry_on_get=False,
)
self.sent_queries_counter = Counter(
"synapse_federation_client_sent_queries",
"",
["type"],
registry=hs.metrics_collector_registry,
)
def _clear_tried_cache(self) -> None:
"""Clear pdu_destination_tried cache"""
now = self._clock.time_msec()
@@ -207,7 +214,7 @@ class FederationClient(FederationBase):
Returns:
The JSON object from the response
"""
sent_queries_counter.labels(query_type).inc()
self.sent_queries_counter.labels(query_type).inc()
return await self.transport_layer.make_query(
destination,
@@ -229,7 +236,7 @@ class FederationClient(FederationBase):
Returns:
The JSON object from the response
"""
sent_queries_counter.labels("client_device_keys").inc()
self.sent_queries_counter.labels("client_device_keys").inc()
return await self.transport_layer.query_client_keys(
destination, content, timeout
)
@@ -240,7 +247,7 @@ class FederationClient(FederationBase):
"""Query the device keys for a list of user ids hosted on a remote
server.
"""
sent_queries_counter.labels("user_devices").inc()
self.sent_queries_counter.labels("user_devices").inc()
return await self.transport_layer.query_user_devices(
destination, user_id, timeout
)
@@ -262,7 +269,7 @@ class FederationClient(FederationBase):
Returns:
The JSON object from the response
"""
sent_queries_counter.labels("client_one_time_keys").inc()
self.sent_queries_counter.labels("client_one_time_keys").inc()
# Convert the query with counts into a stable and unstable query and check
# if attempting to claim more than 1 OTK.
+51 -27
@@ -105,25 +105,6 @@ TRANSACTION_CONCURRENCY_LIMIT = 10
logger = logging.getLogger(__name__)
received_pdus_counter = Counter("synapse_federation_server_received_pdus", "")
received_edus_counter = Counter("synapse_federation_server_received_edus", "")
received_queries_counter = Counter(
"synapse_federation_server_received_queries", "", ["type"]
)
pdu_process_time = Histogram(
"synapse_federation_server_pdu_process_time",
"Time taken to process an event",
)
last_pdu_ts_metric = Gauge(
"synapse_federation_last_received_pdu_time",
"The timestamp of the last PDU which was successfully received from the given domain",
labelnames=("server_name",),
)
# The name of the lock to use when process events in a room received over
# federation.
@@ -160,7 +141,10 @@ class FederationServer(FederationBase):
# We cache results for transaction with the same ID
self._transaction_resp_cache: ResponseCache[Tuple[str, str]] = ResponseCache(
hs.get_clock(), "fed_txn_handler", timeout_ms=30000
hs.get_clock(),
hs.get_cache_manager(),
"fed_txn_handler",
timeout_ms=30000,
)
self.transaction_actions = TransactionActions(self.store)
@@ -170,10 +154,18 @@ class FederationServer(FederationBase):
# We cache responses to state queries, as they take a while and often
# come in waves.
self._state_resp_cache: ResponseCache[Tuple[str, Optional[str]]] = (
ResponseCache(hs.get_clock(), "state_resp", timeout_ms=30000)
ResponseCache(
hs.get_clock(),
hs.get_cache_manager(),
"state_resp",
timeout_ms=30000,
)
)
self._state_ids_resp_cache: ResponseCache[Tuple[str, str]] = ResponseCache(
hs.get_clock(), "state_ids_resp", timeout_ms=30000
hs.get_clock(),
hs.get_cache_manager(),
"state_ids_resp",
timeout_ms=30000,
)
self._federation_metrics_domains = (
@@ -185,6 +177,38 @@ class FederationServer(FederationBase):
# Whether we have started handling old events in the staging area.
self._started_handling_of_staged_events = False
self.received_pdus_counter = Counter(
"synapse_federation_server_received_pdus",
"",
registry=hs.metrics_collector_registry,
)
self.received_edus_counter = Counter(
"synapse_federation_server_received_edus",
"",
registry=hs.metrics_collector_registry,
)
self.received_queries_counter = Counter(
"synapse_federation_server_received_queries",
"",
["type"],
registry=hs.metrics_collector_registry,
)
self.pdu_process_time = Histogram(
"synapse_federation_server_pdu_process_time",
"Time taken to process an event",
registry=hs.metrics_collector_registry,
)
self.last_pdu_ts_metric = Gauge(
"synapse_federation_last_received_pdu_time",
"The timestamp of the last PDU which was successfully received from the given domain",
labelnames=("server_name",),
registry=hs.metrics_collector_registry,
)
@wrap_as_background_process("_handle_old_staged_events")
async def _handle_old_staged_events(self) -> None:
"""Handle old staged events by fetching all rooms that have staged
@@ -424,7 +448,7 @@ class FederationServer(FederationBase):
report back to the sending server.
"""
received_pdus_counter.inc(len(transaction.pdus))
self.received_pdus_counter.inc(len(transaction.pdus))
origin_host, _ = parse_server_name(origin)
@@ -535,7 +559,7 @@ class FederationServer(FederationBase):
)
if newest_pdu_ts and origin in self._federation_metrics_domains:
last_pdu_ts_metric.labels(server_name=origin).set(newest_pdu_ts / 1000)
self.last_pdu_ts_metric.labels(server_name=origin).set(newest_pdu_ts / 1000)
return pdu_results
@@ -543,7 +567,7 @@ class FederationServer(FederationBase):
"""Process the EDUs in a received transaction."""
async def _process_edu(edu_dict: JsonDict) -> None:
received_edus_counter.inc()
self.received_edus_counter.inc()
edu = Edu(
origin=origin,
@@ -658,7 +682,7 @@ class FederationServer(FederationBase):
async def on_query_request(
self, query_type: str, args: Dict[str, str]
) -> Tuple[int, Dict[str, Any]]:
received_queries_counter.labels(query_type).inc()
self.received_queries_counter.labels(query_type).inc()
resp = await self.registry.on_query(query_type, args)
return 200, resp
@@ -1300,7 +1324,7 @@ class FederationServer(FederationBase):
origin, event.event_id
)
if received_ts is not None:
pdu_process_time.observe(
self.pdu_process_time.observe(
(self._clock.time_msec() - received_ts) / 1000
)
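The same move applies to `Histogram` and `Gauge`: `registry=` only changes where the metric registers, not how it is used, so the `.observe()`, `.labels()` and `.set()` calls are untouched apart from the `self.` prefix. A small runnable demonstration (metric names invented):

```python
import time

from prometheus_client import CollectorRegistry, Gauge, Histogram

registry = CollectorRegistry()

pdu_process_time = Histogram(
    "example_pdu_process_time",
    "Time taken to process an event",
    registry=registry,
)
last_pdu_ts = Gauge(
    "example_last_received_pdu_time",
    "Timestamp of the last PDU received from the given domain",
    labelnames=("server_name",),
    registry=registry,
)

pdu_process_time.observe(0.042)  # seconds, matching the hunk above
last_pdu_ts.labels(server_name="example.org").set(time.time())
```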
+3 -1
@@ -73,6 +73,7 @@ class FederationRemoteSendQueue(AbstractFederationSender):
def __init__(self, hs: "HomeServer"):
self.server_name = hs.hostname
self.clock = hs.get_clock()
self.metrics_collector_registry = hs.metrics_collector_registry
self.notifier = hs.get_notifier()
self.is_mine_id = hs.is_mine_id
self.is_mine_server_name = hs.is_mine_server_name
@@ -117,6 +118,7 @@ class FederationRemoteSendQueue(AbstractFederationSender):
"",
[],
lambda: len(queue),
registry=hs.metrics_collector_registry,
)
for queue_name in [
@@ -156,7 +158,7 @@ class FederationRemoteSendQueue(AbstractFederationSender):
def _clear_queue_before_pos(self, position_to_delete: int) -> None:
"""Clear all the queues from before a given position"""
with Measure(self.clock, "send_queue._clear"):
with Measure(self.clock, self.metrics_collector_registry, "send_queue._clear"):
# Delete things out of presence maps
keys = self.presence_destinations.keys()
i = self.presence_destinations.bisect_left(position_to_delete)
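`Measure` now takes the registry between the clock and the block name, which is why this and many later hunks stash `hs.metrics_collector_registry` on the instance. A sketch of the updated call-site shape, assuming only the three-argument signature visible in these diffs (`QueueSketch` is hypothetical):

```python
from typing import TYPE_CHECKING

from synapse.util.metrics import Measure

if TYPE_CHECKING:
    from synapse.server import HomeServer


class QueueSketch:
    def __init__(self, hs: "HomeServer") -> None:
        self.clock = hs.get_clock()
        self.metrics_collector_registry = hs.metrics_collector_registry

    def clear(self) -> None:
        # The registry sits between the clock and the measured block's name.
        with Measure(
            self.clock, self.metrics_collector_registry, "send_queue._clear"
        ):
            pass  # timed work would go here
```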
+23 -12
@@ -186,15 +186,6 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
sent_pdus_destination_dist_count = Counter(
"synapse_federation_client_sent_pdu_destinations_count",
"Number of PDUs queued for sending to one or more destinations",
)
sent_pdus_destination_dist_total = Counter(
"synapse_federation_client_sent_pdu_destinations",
"Total number of PDUs queued for sending across all destinations",
)
# Time (in s) to wait before trying to wake up destinations that have
# catch-up outstanding.
@@ -378,6 +369,7 @@ class FederationSender(AbstractFederationSender):
self._storage_controllers = hs.get_storage_controllers()
self.clock = hs.get_clock()
self.metrics_collector_registry = hs.metrics_collector_registry
self.is_mine_id = hs.is_mine_id
self.is_mine_server_name = hs.is_mine_server_name
@@ -399,6 +391,7 @@ class FederationSender(AbstractFederationSender):
for d in self._per_destination_queues.values()
if d.transmission_loop_running
),
registry=hs.metrics_collector_registry,
)
LaterGauge(
@@ -408,6 +401,7 @@ class FederationSender(AbstractFederationSender):
lambda: sum(
d.pending_pdu_count() for d in self._per_destination_queues.values()
),
registry=hs.metrics_collector_registry,
)
LaterGauge(
"synapse_federation_transaction_queue_pending_edus",
@@ -416,6 +410,19 @@ class FederationSender(AbstractFederationSender):
lambda: sum(
d.pending_edu_count() for d in self._per_destination_queues.values()
),
registry=hs.metrics_collector_registry,
)
self.sent_pdus_destination_dist_count = Counter(
"synapse_federation_client_sent_pdu_destinations_count",
"Number of PDUs queued for sending to one or more destinations",
registry=hs.metrics_collector_registry,
)
self.sent_pdus_destination_dist_total = Counter(
"synapse_federation_client_sent_pdu_destinations",
"Total number of PDUs queued for sending across all destinations",
registry=hs.metrics_collector_registry,
)
self._is_processing = False
@@ -657,7 +664,11 @@ class FederationSender(AbstractFederationSender):
logger.debug(
"Handling %i events in room %s", len(events), events[0].room_id
)
with Measure(self.clock, "handle_room_events"):
with Measure(
self.clock,
self.metrics_collector_registry,
"handle_room_events",
):
for event in events:
await handle_event(event)
@@ -723,8 +734,8 @@ class FederationSender(AbstractFederationSender):
if not destinations:
return
sent_pdus_destination_dist_total.inc(len(destinations))
sent_pdus_destination_dist_count.inc()
self.sent_pdus_destination_dist_total.inc(len(destinations))
self.sent_pdus_destination_dist_count.inc()
assert pdu.internal_metadata.stream_ordering
@@ -55,17 +55,6 @@ MAX_EDUS_PER_TRANSACTION = 100
logger = logging.getLogger(__name__)
sent_edus_counter = Counter(
"synapse_federation_client_sent_edus", "Total number of EDUs successfully sent"
)
sent_edus_by_type = Counter(
"synapse_federation_client_sent_edus_by_type",
"Number of sent EDUs successfully sent, by event type",
["type"],
)
# If the retry interval is larger than this then we enter "catchup" mode
CATCHUP_RETRY_INTERVAL = 60 * 60 * 1000
@@ -164,6 +153,19 @@ class PerDestinationQueue:
# stream_id of last successfully sent device list update.
self._last_device_list_stream_id = 0
self.sent_edus_counter = Counter(
"synapse_federation_client_sent_edus",
"Total number of EDUs successfully sent",
registry=hs.metrics_collector_registry,
)
self.sent_edus_by_type = Counter(
"synapse_federation_client_sent_edus_by_type",
"Number of sent EDUs successfully sent, by event type",
["type"],
registry=hs.metrics_collector_registry,
)
def __str__(self) -> str:
return "PerDestinationQueue[%s]" % self._destination
@@ -361,9 +363,9 @@ class PerDestinationQueue:
)
sent_transactions_counter.inc()
sent_edus_counter.inc(len(pending_edus))
self.sent_edus_counter.inc(len(pending_edus))
for edu in pending_edus:
sent_edus_by_type.labels(edu.edu_type).inc()
self.sent_edus_by_type.labels(edu.edu_type).inc()
except NotRetryingDestination as e:
logger.debug(
@@ -60,6 +60,9 @@ class TransactionManager:
def __init__(self, hs: "synapse.server.HomeServer"):
self._server_name = hs.hostname
self.clock = hs.get_clock() # nb must be called this for @measure_func
self.metrics_collector_registry = (
hs.metrics_collector_registry
) # nb must be called this for @measure_func
self._store = hs.get_datastores().main
self._transaction_actions = TransactionActions(self._store)
self._transport_layer = hs.get_federation_transport_client()
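`LaterGauge`, used repeatedly in the sender hunks above, follows the same scheme: it registers itself on construction, so passing `registry=` is all a call site needs. A sketch consistent with the constructions shown in this diff:

```python
from prometheus_client import CollectorRegistry

from synapse.metrics import LaterGauge

registry = CollectorRegistry()
pending_items: list = []

# Registers with the given registry instead of the global REGISTRY;
# the callback is invoked at scrape time to produce the gauge value.
LaterGauge(
    "example_pending_items",
    "Toy callback-backed gauge",
    [],
    lambda: len(pending_items),
    registry=registry,
)
```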
+16 -5
@@ -68,8 +68,6 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
events_processed_counter = Counter("synapse_handlers_appservice_events_processed", "")
class ApplicationServicesHandler:
def __init__(self, hs: "HomeServer"):
@@ -79,6 +77,7 @@ class ApplicationServicesHandler:
self.scheduler = hs.get_application_service_scheduler()
self.started_scheduler = False
self.clock = hs.get_clock()
self.metrics_collector_registry = hs.metrics_collector_registry
self.notify_appservices = hs.config.worker.should_notify_appservices
self.event_sources = hs.get_event_sources()
self._msc2409_to_device_messages_enabled = (
@@ -95,6 +94,12 @@ class ApplicationServicesHandler:
name="appservice_ephemeral_events"
)
self.events_processed_counter = Counter(
"synapse_handlers_appservice_events_processed",
"",
registry=hs.metrics_collector_registry,
)
def notify_interested_services(self, max_token: RoomStreamToken) -> None:
"""Notifies (pushes) all application services interested in this event.
@@ -120,7 +125,9 @@ class ApplicationServicesHandler:
@wrap_as_background_process("notify_interested_services")
async def _notify_interested_services(self, max_token: RoomStreamToken) -> None:
with Measure(self.clock, "notify_interested_services"):
with Measure(
self.clock, self.metrics_collector_registry, "notify_interested_services"
):
self.is_processing = True
try:
upper_bound = -1
@@ -200,7 +207,7 @@ class ApplicationServicesHandler:
"appservice_sender"
).set(upper_bound)
events_processed_counter.inc(len(events))
self.events_processed_counter.inc(len(events))
event_processing_loop_room_count.labels("appservice_sender").inc(
len(events_by_room)
@@ -329,7 +336,11 @@ class ApplicationServicesHandler:
users: Collection[Union[str, UserID]],
) -> None:
logger.debug("Checking interested services for %s", stream_key)
with Measure(self.clock, "notify_interested_services_ephemeral"):
with Measure(
self.clock,
self.metrics_collector_registry,
"notify_interested_services_ephemeral",
):
for service in services:
if stream_key == StreamKeyType.TYPING:
# Note that we don't persist the token (via set_appservice_stream_type_pos)
+10 -9
@@ -92,12 +92,6 @@ logger = logging.getLogger(__name__)
INVALID_USERNAME_OR_PASSWORD = "Invalid username or password"
invalid_login_token_counter = Counter(
"synapse_user_login_invalid_login_tokens",
"Counts the number of rejected m.login.token on /login",
["reason"],
)
def convert_client_dict_legacy_fields_to_identifier(
submission: JsonDict,
@@ -281,6 +275,13 @@ class AuthHandler:
self.msc3861_oauth_delegation_enabled = hs.config.experimental.msc3861.enabled
self.invalid_login_token_counter = Counter(
"synapse_user_login_invalid_login_tokens",
"Counts the number of rejected m.login.token on /login",
["reason"],
registry=hs.metrics_collector_registry,
)
async def validate_user_via_ui_auth(
self,
requester: Requester,
@@ -1477,11 +1478,11 @@ class AuthHandler:
try:
return await self.store.consume_login_token(login_token)
except LoginTokenExpired:
invalid_login_token_counter.labels("expired").inc()
self.invalid_login_token_counter.labels("expired").inc()
except LoginTokenReused:
invalid_login_token_counter.labels("reused").inc()
self.invalid_login_token_counter.labels("reused").inc()
except NotFoundError:
invalid_login_token_counter.labels("not found").inc()
self.invalid_login_token_counter.labels("not found").inc()
raise AuthError(403, "Invalid login token", errcode=Codes.FORBIDDEN)
+4 -1
@@ -58,6 +58,7 @@ class DelayedEventsHandler:
self._storage_controllers = hs.get_storage_controllers()
self._config = hs.config
self._clock = hs.get_clock()
self._metrics_collector_registry = hs.metrics_collector_registry
self._event_creation_handler = hs.get_event_creation_handler()
self._room_member_handler = hs.get_room_member_handler()
@@ -159,7 +160,9 @@ class DelayedEventsHandler:
# Loop round handling deltas until we're up to date
while True:
with Measure(self._clock, "delayed_events_delta"):
with Measure(
self._clock, self._metrics_collector_registry, "delayed_events_delta"
):
room_max_stream_ordering = self._store.get_room_max_stream_ordering()
if self._event_pos == room_max_stream_ordering:
return
+5 -1
@@ -88,7 +88,10 @@ class DeviceWorkerHandler:
device_list_updater: "DeviceListWorkerUpdater"
def __init__(self, hs: "HomeServer"):
self.clock = hs.get_clock()
self.clock = hs.get_clock() # nb must be called this for @measure_func
self.metrics_collector_registry = (
hs.metrics_collector_registry
) # nb must be called this for @measure_func
self.hs = hs
self.store = hs.get_datastores().main
self.notifier = hs.get_notifier()
@@ -1232,6 +1235,7 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
self._seen_updates: ExpiringCache[str, Set[str]] = ExpiringCache(
cache_name="device_update_edu",
clock=self.clock,
cache_manager=hs.get_cache_manager(),
max_len=10000,
expiry_ms=30 * 60 * 1000,
iterable=True,
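The repeated "nb must be called this for @measure_func" comments point at a hidden contract: the decorator resolves the clock and registry by fixed attribute names on `self`. A sketch of a conforming class, assuming `@measure_func` reads `self.clock` and `self.metrics_collector_registry` as those comments imply:

```python
from typing import TYPE_CHECKING

from synapse.util.metrics import measure_func

if TYPE_CHECKING:
    from synapse.server import HomeServer


class HandlerSketch:
    def __init__(self, hs: "HomeServer") -> None:
        # @measure_func looks these up by attribute name, so they must be
        # called exactly `clock` and `metrics_collector_registry`.
        self.clock = hs.get_clock()
        self.metrics_collector_registry = hs.metrics_collector_registry

    @measure_func("handler_sketch_work")
    async def do_work(self) -> None:
        ...
```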
+39 -37
@@ -105,41 +105,6 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
soft_failed_event_counter = Counter(
"synapse_federation_soft_failed_events_total",
"Events received over federation that we marked as soft_failed",
)
# Added to debug performance and track progress on optimizations
backfill_processing_after_timer = Histogram(
"synapse_federation_backfill_processing_after_time_seconds",
"sec",
[],
buckets=(
0.1,
0.25,
0.5,
1.0,
2.5,
5.0,
7.5,
10.0,
15.0,
20.0,
25.0,
30.0,
40.0,
50.0,
60.0,
80.0,
100.0,
120.0,
150.0,
180.0,
"+Inf",
),
)
class FederationEventHandler:
"""Handles events that originated from federation.
@@ -195,6 +160,43 @@ class FederationEventHandler:
self._room_pdu_linearizer = Linearizer("fed_room_pdu")
self.soft_failed_event_counter = Counter(
"synapse_federation_soft_failed_events_total",
"Events received over federation that we marked as soft_failed",
registry=hs.metrics_collector_registry,
)
# Added to debug performance and track progress on optimizations
self.backfill_processing_after_timer = Histogram(
"synapse_federation_backfill_processing_after_time_seconds",
"sec",
[],
buckets=(
0.1,
0.25,
0.5,
1.0,
2.5,
5.0,
7.5,
10.0,
15.0,
20.0,
25.0,
30.0,
40.0,
50.0,
60.0,
80.0,
100.0,
120.0,
150.0,
180.0,
"+Inf",
),
registry=hs.metrics_collector_registry,
)
async def on_receive_pdu(self, origin: str, pdu: EventBase) -> None:
"""Process a PDU received via a federation /send/ transaction
@@ -698,7 +700,7 @@ class FederationEventHandler:
if not events:
return
with backfill_processing_after_timer.time():
with self.backfill_processing_after_timer.time():
# if there are any events in the wrong room, the remote server is buggy and
# should not be trusted.
for ev in events:
@@ -2062,7 +2064,7 @@ class FederationEventHandler:
"hs": origin,
},
)
soft_failed_event_counter.inc()
self.soft_failed_event_counter.inc()
event.internal_metadata.soft_failed = True
async def _load_or_fetch_auth_events_for_event(
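`backfill_processing_after_timer` is consumed as a context manager, and that usage survives the move onto the instance unchanged; only the lookup becomes `self.`-qualified. A small demonstration of the `Histogram.time()` pattern with explicit buckets and a registry (names invented):

```python
from prometheus_client import CollectorRegistry, Histogram

registry = CollectorRegistry()
backfill_timer = Histogram(
    "example_backfill_processing_after_time_seconds",
    "sec",
    buckets=(0.1, 0.5, 1.0, 5.0, float("inf")),
    registry=registry,
)

with backfill_timer.time():  # records the block's duration on exit
    sum(range(100_000))
```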
+1 -1
@@ -77,7 +77,7 @@ class InitialSyncHandler:
bool,
bool,
]
] = ResponseCache(hs.get_clock(), "initial_sync_cache")
] = ResponseCache(hs.get_clock(), hs.get_cache_manager(), "initial_sync_cache")
self._event_serializer = hs.get_event_client_serializer()
self._storage_controllers = hs.get_storage_controllers()
self._state_storage_controller = self._storage_controllers.state
+6 -2
@@ -481,10 +481,13 @@ class EventCreationHandler:
self.store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
self.state = hs.get_state_handler()
self.clock = hs.get_clock()
self.validator = EventValidator()
self.profile_handler = hs.get_profile_handler()
self.event_builder_factory = hs.get_event_builder_factory()
self.clock = hs.get_clock() # nb must be called this for @measure_func
self.metrics_collector_registry = (
hs.metrics_collector_registry
) # nb must be called this for @measure_func
self.profile_handler = hs.get_profile_handler()
self.server_name = hs.hostname
self.notifier = hs.get_notifier()
self.config = hs.config
@@ -560,6 +563,7 @@ class EventCreationHandler:
self._external_cache_joined_hosts_updates = ExpiringCache(
"_external_cache_joined_hosts_updates",
self.clock,
cache_manager=hs.get_cache_manager(),
expiry_ms=30 * 60 * 1000,
)
+90 -40
@@ -137,26 +137,6 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
notified_presence_counter = Counter("synapse_handler_presence_notified_presence", "")
federation_presence_out_counter = Counter(
"synapse_handler_presence_federation_presence_out", ""
)
presence_updates_counter = Counter("synapse_handler_presence_presence_updates", "")
timers_fired_counter = Counter("synapse_handler_presence_timers_fired", "")
federation_presence_counter = Counter(
"synapse_handler_presence_federation_presence", ""
)
bump_active_time_counter = Counter("synapse_handler_presence_bump_active_time", "")
get_updates_counter = Counter("synapse_handler_presence_get_updates", "", ["type"])
notify_reason_counter = Counter(
"synapse_handler_presence_notify_reason", "", ["locality", "reason"]
)
state_transition_counter = Counter(
"synapse_handler_presence_state_transition", "", ["locality", "from", "to"]
)
# If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
# "currently_active"
LAST_ACTIVE_GRANULARITY = 60 * 1000
@@ -194,6 +174,7 @@ class BasePresenceHandler(abc.ABC):
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.clock = hs.get_clock()
self.metrics_collector_registry = hs.metrics_collector_registry
self.store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
self.presence_router = hs.get_presence_router()
@@ -222,6 +203,19 @@ class BasePresenceHandler(abc.ABC):
# The combined status across all user devices.
self.user_to_current_state = {state.user_id: state for state in active_presence}
self.notify_reason_counter = Counter(
"synapse_handler_presence_notify_reason",
"",
["locality", "reason"],
registry=hs.metrics_collector_registry,
)
self.state_transition_counter = Counter(
"synapse_handler_presence_state_transition",
"",
["locality", "from", "to"],
registry=hs.metrics_collector_registry,
)
@abc.abstractmethod
async def user_syncing(
self,
@@ -666,7 +660,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
old_state = self.user_to_current_state.get(new_state.user_id)
self.user_to_current_state[new_state.user_id] = new_state
is_mine = self.is_mine_id(new_state.user_id)
if not old_state or should_notify(old_state, new_state, is_mine):
if not old_state or should_notify(self, old_state, new_state, is_mine):
state_to_notify.append(new_state)
stream_id = token
@@ -761,6 +755,38 @@ class PresenceHandler(BasePresenceHandler):
"",
[],
lambda: len(self.user_to_current_state),
registry=hs.metrics_collector_registry,
)
self.notified_presence_counter = Counter(
"synapse_handler_presence_notified_presence",
"",
registry=hs.metrics_collector_registry,
)
self.federation_presence_out_counter = Counter(
"synapse_handler_presence_federation_presence_out",
"",
registry=hs.metrics_collector_registry,
)
self.presence_updates_counter = Counter(
"synapse_handler_presence_presence_updates",
"",
registry=hs.metrics_collector_registry,
)
self.timers_fired_counter = Counter(
"synapse_handler_presence_timers_fired",
"",
registry=hs.metrics_collector_registry,
)
self.federation_presence_counter = Counter(
"synapse_handler_presence_federation_presence",
"",
registry=hs.metrics_collector_registry,
)
self.bump_active_time_counter = Counter(
"synapse_handler_presence_bump_active_time",
"",
registry=hs.metrics_collector_registry,
)
# The per-device presence state, maps user to devices to per-device presence state.
@@ -863,6 +889,7 @@ class PresenceHandler(BasePresenceHandler):
"",
[],
lambda: len(self.wheel_timer),
registry=hs.metrics_collector_registry,
)
# Used to handle sending of presence to newly joined users/servers
@@ -941,7 +968,9 @@ class PresenceHandler(BasePresenceHandler):
now = self.clock.time_msec()
with Measure(self.clock, "presence_update_states"):
with Measure(
self.clock, self.metrics_collector_registry, "presence_update_states"
):
# NOTE: We purposefully don't await between now and when we've
# calculated what we want to do with the new states, to avoid races.
@@ -966,6 +995,7 @@ class PresenceHandler(BasePresenceHandler):
)
new_state, should_notify, should_ping = handle_update(
self,
prev_state,
new_state,
is_mine=self.is_mine_id(user_id),
@@ -988,10 +1018,10 @@ class PresenceHandler(BasePresenceHandler):
# TODO: We should probably ensure there are no races hereafter
presence_updates_counter.inc(len(new_states))
self.presence_updates_counter.inc(len(new_states))
if to_notify:
notified_presence_counter.inc(len(to_notify))
self.notified_presence_counter.inc(len(to_notify))
await self._persist_and_notify(list(to_notify.values()))
self.unpersisted_users_changes |= {s.user_id for s in new_states}
@@ -1010,7 +1040,7 @@ class PresenceHandler(BasePresenceHandler):
if user_id not in to_notify
}
if to_federation_ping:
federation_presence_out_counter.inc(len(to_federation_ping))
self.federation_presence_out_counter.inc(len(to_federation_ping))
hosts_to_states = await get_interested_remotes(
self.store,
@@ -1060,7 +1090,7 @@ class PresenceHandler(BasePresenceHandler):
for user_id in users_to_check
]
timers_fired_counter.inc(len(states))
self.timers_fired_counter.inc(len(states))
# Set of user ID & device IDs which are currently syncing.
syncing_user_devices = {
@@ -1094,7 +1124,7 @@ class PresenceHandler(BasePresenceHandler):
user_id = user.to_string()
bump_active_time_counter.inc()
self.bump_active_time_counter.inc()
now = self.clock.time_msec()
@@ -1346,7 +1376,7 @@ class PresenceHandler(BasePresenceHandler):
updates.append(prev_state.copy_and_replace(**new_fields))
if updates:
federation_presence_counter.inc(len(updates))
self.federation_presence_counter.inc(len(updates))
await self._update_states(updates)
async def set_state(
@@ -1497,7 +1527,7 @@ class PresenceHandler(BasePresenceHandler):
async def _unsafe_process(self) -> None:
# Loop round handling deltas until we're up to date
while True:
with Measure(self.clock, "presence_delta"):
with Measure(self.clock, self.metrics_collector_registry, "presence_delta"):
room_max_stream_ordering = self.store.get_room_max_stream_ordering()
if self._event_pos == room_max_stream_ordering:
return
@@ -1655,7 +1685,10 @@ class PresenceHandler(BasePresenceHandler):
def should_notify(
old_state: UserPresenceState, new_state: UserPresenceState, is_mine: bool
base_presence_handler: BasePresenceHandler,
old_state: UserPresenceState,
new_state: UserPresenceState,
is_mine: bool,
) -> bool:
"""Decides if a presence state change should be sent to interested parties."""
user_location = "remote"
@@ -1666,19 +1699,25 @@ def should_notify(
return False
if old_state.status_msg != new_state.status_msg:
notify_reason_counter.labels(user_location, "status_msg_change").inc()
base_presence_handler.notify_reason_counter.labels(
user_location, "status_msg_change"
).inc()
return True
if old_state.state != new_state.state:
notify_reason_counter.labels(user_location, "state_change").inc()
state_transition_counter.labels(
base_presence_handler.notify_reason_counter.labels(
user_location, "state_change"
).inc()
base_presence_handler.state_transition_counter.labels(
user_location, old_state.state, new_state.state
).inc()
return True
if old_state.state == PresenceState.ONLINE:
if new_state.currently_active != old_state.currently_active:
notify_reason_counter.labels(user_location, "current_active_change").inc()
base_presence_handler.notify_reason_counter.labels(
user_location, "current_active_change"
).inc()
return True
if (
@@ -1687,14 +1726,14 @@ def should_notify(
):
# Only notify about last active bumps if we're not currently active
if not new_state.currently_active:
notify_reason_counter.labels(
base_presence_handler.notify_reason_counter.labels(
user_location, "last_active_change_online"
).inc()
return True
elif new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY:
# Always notify for a transition where last active gets bumped.
notify_reason_counter.labels(
base_presence_handler.notify_reason_counter.labels(
user_location, "last_active_change_not_online"
).inc()
return True
@@ -1762,8 +1801,16 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
self.get_presence_handler = hs.get_presence_handler
self.get_presence_router = hs.get_presence_router
self.clock = hs.get_clock()
self.metrics_collector_registry = hs.metrics_collector_registry
self.store = hs.get_datastores().main
self.get_updates_counter = Counter(
"synapse_handler_presence_get_updates",
"",
["type"],
registry=hs.metrics_collector_registry,
)
async def get_new_events(
self,
user: UserID,
@@ -1792,7 +1839,9 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
user_id = user.to_string()
stream_change_cache = self.store.presence_stream_cache
with Measure(self.clock, "presence.get_new_events"):
with Measure(
self.clock, self.metrics_collector_registry, "presence.get_new_events"
):
if from_key is not None:
from_key = int(from_key)
@@ -1870,7 +1919,7 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
# If we have the full list of changes for presence we can
# simply check which ones share a room with the user.
get_updates_counter.labels("stream").inc()
self.get_updates_counter.labels("stream").inc()
sharing_users = await self.store.do_users_share_a_room(
user_id, updated_users
@@ -1883,7 +1932,7 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
else:
# Too many possible updates. Find all users we can see and check
# if any of them have changed.
get_updates_counter.labels("full").inc()
self.get_updates_counter.labels("full").inc()
users_interested_in = (
await self.store.get_users_who_share_room_with_user(user_id)
@@ -2130,6 +2179,7 @@ def handle_timeout(
def handle_update(
base_presence_handler: BasePresenceHandler,
prev_state: UserPresenceState,
new_state: UserPresenceState,
is_mine: bool,
@@ -2213,7 +2263,7 @@ def handle_update(
)
# Check whether the change was something worth notifying about
if should_notify(prev_state, new_state, is_mine):
if should_notify(base_presence_handler, prev_state, new_state, is_mine):
new_state = new_state.copy_and_replace(last_federation_update_ts=now)
persist_and_notify = True
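Since `should_notify` and `handle_update` are free functions, they can no longer reach module-level counters once those become per-homeserver; instead the handler instance is threaded through as a new first parameter. The refactor in miniature (all names invented):

```python
from prometheus_client import CollectorRegistry, Counter


class PresenceSketch:
    def __init__(self, registry: CollectorRegistry) -> None:
        self.notify_reason_counter = Counter(
            "example_presence_notify_reason",
            "Toy notify-reason counter",
            ["locality", "reason"],
            registry=registry,
        )


def should_notify_sketch(handler: PresenceSketch, state_changed: bool) -> bool:
    # The free function reaches the per-homeserver counter through the
    # handler argument rather than through a module-level global.
    if state_changed:
        handler.notify_reason_counter.labels("local", "state_change").inc()
        return True
    return False


assert should_notify_sketch(PresenceSketch(CollectorRegistry()), True)
```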
+4 -1
@@ -174,7 +174,10 @@ class RoomCreationHandler:
# succession, only process the first attempt and return its result to
# subsequent requests
self._upgrade_response_cache: ResponseCache[Tuple[str, str]] = ResponseCache(
hs.get_clock(), "room_upgrade", timeout_ms=FIVE_MINUTES_IN_MS
hs.get_clock(),
hs.get_cache_manager(),
"room_upgrade",
timeout_ms=FIVE_MINUTES_IN_MS,
)
self._server_notices_mxid = hs.config.servernotices.server_notices_mxid
+7 -2
@@ -67,10 +67,15 @@ class RoomListHandler:
self.enable_room_list_search = hs.config.roomdirectory.enable_room_list_search
self.response_cache: ResponseCache[
Tuple[Optional[int], Optional[str], Optional[ThirdPartyInstanceID]]
] = ResponseCache(hs.get_clock(), "room_list")
] = ResponseCache(hs.get_clock(), hs.get_cache_manager(), "room_list")
self.remote_response_cache: ResponseCache[
Tuple[str, Optional[int], Optional[str], bool, Optional[str]]
] = ResponseCache(hs.get_clock(), "remote_room_list", timeout_ms=30 * 1000)
] = ResponseCache(
hs.get_clock(),
hs.get_cache_manager(),
"remote_room_list",
timeout_ms=30 * 1000,
)
async def get_local_public_room_list(
self,
+1
@@ -114,6 +114,7 @@ class RoomSummaryHandler:
Tuple[str, str, bool, Optional[int], Optional[int], Optional[str]]
] = ResponseCache(
hs.get_clock(),
hs.get_cache_manager(),
"get_room_hierarchy",
)
self._msc3266_enabled = hs.config.experimental.msc3266_enabled
+13 -4
@@ -337,6 +337,7 @@ class SyncHandler:
self._push_rules_handler = hs.get_push_rules_handler()
self.event_sources = hs.get_event_sources()
self.clock = hs.get_clock()
self.metrics_collector_registry = hs.metrics_collector_registry
self.state = hs.get_state_handler()
self.auth_blocking = hs.get_auth_blocking()
self._storage_controllers = hs.get_storage_controllers()
@@ -353,6 +354,7 @@ class SyncHandler:
# memory.
self.response_cache: ResponseCache[SyncRequestKey] = ResponseCache(
hs.get_clock(),
hs.get_cache_manager(),
"sync",
timeout_ms=hs.config.caches.sync_response_cache_duration,
)
@@ -363,6 +365,7 @@ class SyncHandler:
] = ExpiringCache(
"lazy_loaded_members_cache",
self.clock,
cache_manager=hs.get_cache_manager(),
max_len=0,
expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
)
@@ -710,7 +713,7 @@ class SyncHandler:
sync_config = sync_result_builder.sync_config
with Measure(self.clock, "ephemeral_by_room"):
with Measure(self.clock, self.metrics_collector_registry, "ephemeral_by_room"):
typing_key = since_token.typing_key if since_token else 0
room_ids = sync_result_builder.joined_room_ids
@@ -783,7 +786,9 @@ class SyncHandler:
and current token to send down to clients.
newly_joined_room
"""
with Measure(self.clock, "load_filtered_recents"):
with Measure(
self.clock, self.metrics_collector_registry, "load_filtered_recents"
):
timeline_limit = sync_config.filter_collection.timeline_limit()
block_all_timeline = (
sync_config.filter_collection.blocks_all_room_timeline()
@@ -1174,7 +1179,9 @@ class SyncHandler:
# updates even if they occurred logically before the previous event.
# TODO(mjark) Check for new redactions in the state events.
with Measure(self.clock, "compute_state_delta"):
with Measure(
self.clock, self.metrics_collector_registry, "compute_state_delta"
):
# The memberships needed for events in the timeline.
# Only calculated when `lazy_load_members` is on.
members_to_fetch: Optional[Set[str]] = None
@@ -1791,7 +1798,9 @@ class SyncHandler:
# the DB.
return RoomNotifCounts.empty()
with Measure(self.clock, "unread_notifs_for_room_id"):
with Measure(
self.clock, self.metrics_collector_registry, "unread_notifs_for_room_id"
):
return await self.store.get_unread_event_push_actions_by_room_for_user(
room_id,
sync_config.user.to_string(),
+10 -3
@@ -280,7 +280,9 @@ class TypingWriterHandler(FollowerTypingHandler):
# caches which room_ids changed at which serials
self._typing_stream_change_cache = StreamChangeCache(
"TypingStreamChangeCache", self._latest_room_serial
name="TypingStreamChangeCache",
cache_manager=hs.get_cache_manager(),
current_stream_pos=self._latest_room_serial,
)
def _handle_timeout_for_member(self, now: int, member: RoomMember) -> None:
@@ -505,6 +507,7 @@ class TypingNotificationEventSource(EventSource[int, JsonMapping]):
def __init__(self, hs: "HomeServer"):
self._main_store = hs.get_datastores().main
self.clock = hs.get_clock()
self.metrics_collector_registry = hs.metrics_collector_registry
# We can't call get_typing_handler here because there's a cycle:
#
# Typing -> Notifier -> TypingNotificationEventSource -> Typing
@@ -535,7 +538,9 @@ class TypingNotificationEventSource(EventSource[int, JsonMapping]):
appservice may be interested in.
* The latest known room serial.
"""
with Measure(self.clock, "typing.get_new_events_as"):
with Measure(
self.clock, self.metrics_collector_registry, "typing.get_new_events_as"
):
handler = self.get_typing_handler()
events = []
@@ -571,7 +576,9 @@ class TypingNotificationEventSource(EventSource[int, JsonMapping]):
Find typing notifications for given rooms (> `from_token` and <= `to_token`)
"""
with Measure(self.clock, "typing.get_new_events"):
with Measure(
self.clock, self.metrics_collector_registry, "typing.get_new_events"
):
from_key = int(from_key)
handler = self.get_typing_handler()
+2 -1
@@ -104,6 +104,7 @@ class UserDirectoryHandler(StateDeltasHandler):
self._storage_controllers = hs.get_storage_controllers()
self.server_name = hs.hostname
self.clock = hs.get_clock()
self.metrics_collector_registry = hs.metrics_collector_registry
self.notifier = hs.get_notifier()
self.is_mine_id = hs.is_mine_id
self.update_user_directory = hs.config.worker.should_update_user_directory
@@ -237,7 +238,7 @@ class UserDirectoryHandler(StateDeltasHandler):
# Loop round handling deltas until we're up to date
while True:
with Measure(self.clock, "user_dir_delta"):
with Measure(self.clock, self.metrics_collector_registry, "user_dir_delta"):
room_max_stream_ordering = self.store.get_room_max_stream_ordering()
if self.pos == room_max_stream_ordering:
return
@@ -26,6 +26,7 @@ from urllib.request import ( # type: ignore[attr-defined]
)
from netaddr import AddrFormatError, IPAddress, IPSet
from prometheus_client import CollectorRegistry
from zope.interface import implementer
from twisted.internet import defer
@@ -50,6 +51,7 @@ from synapse.http.proxyagent import ProxyAgent
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.types import ISynapseReactor
from synapse.util import Clock
from synapse.util.caches import CacheManager
logger = logging.getLogger(__name__)
@@ -81,6 +83,8 @@ class MatrixFederationAgent:
reactor might have some blocking applied (i.e. for DNS queries),
but we need unblocked access to the proxy.
cache_manager: The cache manager to handle metrics
_srv_resolver:
SrvResolver implementation to use for looking up SRV records. None
to use a default implementation.
@@ -92,11 +96,14 @@ class MatrixFederationAgent:
def __init__(
self,
*,
reactor: ISynapseReactor,
tls_client_options_factory: Optional[FederationPolicyForHTTPS],
user_agent: bytes,
ip_allowlist: Optional[IPSet],
ip_blocklist: IPSet,
metrics_collector_registry: CollectorRegistry,
cache_manager: CacheManager,
_srv_resolver: Optional[SrvResolver] = None,
_well_known_resolver: Optional[WellKnownResolver] = None,
):
@@ -139,6 +146,8 @@ class MatrixFederationAgent:
ip_blocklist=ip_blocklist,
),
user_agent=self.user_agent,
metrics_collector_registry=metrics_collector_registry,
cache_manager=cache_manager,
)
self._well_known_resolver = _well_known_resolver
+16 -12
@@ -25,6 +25,7 @@ from io import BytesIO
from typing import Callable, Dict, Optional, Tuple
import attr
from prometheus_client import CollectorRegistry
from twisted.internet import defer
from twisted.internet.interfaces import IReactorTime
@@ -36,6 +37,7 @@ from twisted.web.iweb import IAgent, IResponse
from synapse.http.client import BodyExceededMaxSize, read_body_with_max_size
from synapse.logging.context import make_deferred_yieldable
from synapse.util import Clock, json_decoder
from synapse.util.caches import CacheManager
from synapse.util.caches.ttlcache import TTLCache
from synapse.util.metrics import Measure
@@ -77,10 +79,6 @@ WELL_KNOWN_RETRY_ATTEMPTS = 3
logger = logging.getLogger(__name__)
_well_known_cache: TTLCache[bytes, Optional[bytes]] = TTLCache("well-known")
_had_valid_well_known_cache: TTLCache[bytes, bool] = TTLCache("had-valid-well-known")
@attr.s(slots=True, frozen=True, auto_attribs=True)
class WellKnownLookupResult:
delegated_server: Optional[bytes]
@@ -94,20 +92,24 @@ class WellKnownResolver:
reactor: IReactorTime,
agent: IAgent,
user_agent: bytes,
metrics_collector_registry: CollectorRegistry,
cache_manager: CacheManager,
well_known_cache: Optional[TTLCache[bytes, Optional[bytes]]] = None,
had_well_known_cache: Optional[TTLCache[bytes, bool]] = None,
):
 self._reactor = reactor
 self._clock = Clock(reactor)
+self._metrics_collector_registry = metrics_collector_registry
-if well_known_cache is None:
-    well_known_cache = _well_known_cache
+self._well_known_cache: TTLCache[bytes, Optional[bytes]] = TTLCache(
+    "well-known",
+    cache_manager=cache_manager,
+)
+self._had_valid_well_known_cache: TTLCache[bytes, bool] = TTLCache(
+    "had-valid-well-known",
+    cache_manager=cache_manager,
+)
-if had_well_known_cache is None:
-    had_well_known_cache = _had_valid_well_known_cache
-self._well_known_cache = well_known_cache
-self._had_valid_well_known_cache = had_well_known_cache
self._well_known_agent = RedirectAgent(agent)
self.user_agent = user_agent
@@ -134,7 +136,9 @@ class WellKnownResolver:
# TODO: should we linearise so that we don't end up doing two .well-known
# requests for the same server in parallel?
try:
with Measure(self._clock, "get_well_known"):
with Measure(
self._clock, self._metrics_collector_registry, "get_well_known"
):
result: Optional[bytes]
cache_period: float
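The module-level `TTLCache`s become per-resolver instances built with the `cache_manager`, mirroring the `ResponseCache` and `ExpiringCache` changes elsewhere in the branch. A sketch assuming the `TTLCache` signature visible in this hunk (`make_well_known_cache` is a hypothetical helper):

```python
from typing import TYPE_CHECKING, Optional

from synapse.util.caches.ttlcache import TTLCache

if TYPE_CHECKING:
    from synapse.util.caches import CacheManager


def make_well_known_cache(
    cache_manager: "CacheManager",
) -> "TTLCache[bytes, Optional[bytes]]":
    # Per-instance cache registered through the homeserver's cache manager,
    # replacing the old module-level _well_known_cache.
    return TTLCache("well-known", cache_manager=cache_manager)
```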
+13 -6
@@ -417,11 +417,13 @@ class MatrixFederationHttpClient:
if hs.get_instance_name() in outbound_federation_restricted_to:
# Talk to federation directly
federation_agent: IAgent = MatrixFederationAgent(
self.reactor,
tls_client_options_factory,
user_agent.encode("ascii"),
hs.config.server.federation_ip_range_allowlist,
hs.config.server.federation_ip_range_blocklist,
reactor=self.reactor,
tls_client_options_factory=tls_client_options_factory,
user_agent=user_agent.encode("ascii"),
ip_allowlist=hs.config.server.federation_ip_range_allowlist,
ip_blocklist=hs.config.server.federation_ip_range_blocklist,
metrics_collector_registry=hs.metrics_collector_registry,
cache_manager=hs.get_cache_manager(),
)
else:
proxy_authorization_secret = hs.config.worker.worker_replication_secret
@@ -451,6 +453,7 @@ class MatrixFederationHttpClient:
)
self.clock = hs.get_clock()
self.metrics_collector_registry = hs.metrics_collector_registry
self._store = hs.get_datastores().main
self.version_string_bytes = hs.version_string.encode("ascii")
self.default_timeout_seconds = hs.config.federation.client_timeout_ms / 1000
@@ -697,7 +700,11 @@ class MatrixFederationHttpClient:
outgoing_requests_counter.labels(request.method).inc()
try:
with Measure(self.clock, "outbound_request"):
with Measure(
self.clock,
self.metrics_collector_registry,
"outbound_request",
):
# we don't want all the fancy cookie and redirect handling
# that treq.request gives: just use the raw Agent.
+1
@@ -201,6 +201,7 @@ class UrlPreviewer:
self._cache: ExpiringCache[str, ObservableDeferred] = ExpiringCache(
cache_name="url_previews",
clock=self.clock,
cache_manager=hs.get_cache_manager(),
# don't spider URLs more often than once an hour
expiry_ms=ONE_HOUR,
)
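`ExpiringCache` gains the same `cache_manager` keyword everywhere it is constructed in this branch, with its other arguments unchanged. A sketch of the url-preview cache construction under that assumption (`make_url_preview_cache` is a hypothetical helper):

```python
from typing import TYPE_CHECKING

from synapse.util.caches.expiringcache import ExpiringCache

if TYPE_CHECKING:
    from synapse.server import HomeServer

ONE_HOUR = 60 * 60 * 1000


def make_url_preview_cache(hs: "HomeServer") -> ExpiringCache:
    # cache_manager is the new keyword threaded into every ExpiringCache;
    # expiry_ms keeps URLs from being re-spidered more than once an hour.
    return ExpiringCache(
        cache_name="url_previews",
        clock=hs.get_clock(),
        cache_manager=hs.get_cache_manager(),
        expiry_ms=ONE_HOUR,
    )
```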
+18 -42
@@ -37,7 +37,6 @@ from typing import (
Type,
TypeVar,
Union,
cast,
)
import attr
@@ -62,26 +61,9 @@ logger = logging.getLogger(__name__)
METRICS_PREFIX = "/_synapse/metrics"
all_gauges: Dict[str, Collector] = {}
HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
class _RegistryProxy:
@staticmethod
def collect() -> Iterable[Metric]:
for metric in REGISTRY.collect():
if not metric.name.startswith("__"):
yield metric
# A little bit nasty, but collect() above is static so a Protocol doesn't work.
# _RegistryProxy matches the signature of a CollectorRegistry instance enough
# for it to be usable in the contexts in which we use it.
# TODO Do something nicer about this.
RegistryProxy = cast(CollectorRegistry, _RegistryProxy)
@attr.s(slots=True, hash=True, auto_attribs=True)
class LaterGauge(Collector):
"""A Gauge which periodically calls a user-provided callback to produce metrics."""
@@ -94,6 +76,7 @@ class LaterGauge(Collector):
caller: Callable[
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
]
registry: Optional[CollectorRegistry] = REGISTRY
def collect(self) -> Iterable[Metric]:
g = GaugeMetricFamily(self.name, self.desc, labels=self.labels)
@@ -114,15 +97,8 @@ class LaterGauge(Collector):
yield g
def __attrs_post_init__(self) -> None:
self._register()
def _register(self) -> None:
if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering", self.name)
REGISTRY.unregister(all_gauges.pop(self.name))
REGISTRY.register(self)
all_gauges[self.name] = self
if self.registry is not None:
self.registry.register(self)
# `MetricsEntry` only makes sense when it is a `Protocol`,
@@ -144,6 +120,8 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
desc
labels
sub_metrics: A list of sub metrics that the callbacks will update.
registry: The Prometheus metrics `CollectorRegistry` to register the metric
with. If not provided, the default `REGISTRY` will be used.
"""
def __init__(
@@ -152,11 +130,13 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
desc: str,
labels: StrSequence,
sub_metrics: StrSequence,
registry: Optional[CollectorRegistry] = REGISTRY,
):
self.name = name
self.desc = desc
self.labels = labels
self.sub_metrics = sub_metrics
self.registry = registry
# Create a class which has the sub_metrics values as attributes, which
# default to 0 on initialization. Used to pass to registered callbacks.
@@ -243,12 +223,8 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
yield gauge
def _register_with_collector(self) -> None:
if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering", self.name)
REGISTRY.unregister(all_gauges.pop(self.name))
REGISTRY.register(self)
all_gauges[self.name] = self
if self.registry is not None:
self.registry.register(self)
class GaugeBucketCollector(Collector):
@@ -272,7 +248,7 @@ class GaugeBucketCollector(Collector):
name: str,
documentation: str,
buckets: Iterable[float],
registry: CollectorRegistry = REGISTRY,
registry: Optional[CollectorRegistry] = REGISTRY,
):
"""
Args:
@@ -280,7 +256,8 @@ class GaugeBucketCollector(Collector):
will be added.)
documentation: help text for the metric
buckets: The top bounds of the buckets to report
registry: metric registry to register with
registry: The Prometheus metrics `CollectorRegistry` to register the metric
with. If not provided, the default `REGISTRY` will be used.
"""
self._name = name
self._documentation = documentation
@@ -297,7 +274,8 @@ class GaugeBucketCollector(Collector):
# this has been initialised after a successful data update
self._metric: Optional[GaugeHistogramMetricFamily] = None
registry.register(self)
if registry is not None:
registry.register(self)
def collect(self) -> Iterable[Metric]:
# Don't report metrics unless we've already collected some data
@@ -343,10 +321,8 @@ class GaugeBucketCollector(Collector):
#
# Detailed CPU metrics
#
class CPUMetrics(Collector):
def __init__(self) -> None:
def __init__(self, registry: Optional[CollectorRegistry] = REGISTRY) -> None:
ticks_per_sec = 100
try:
# Try and get the system config
@@ -356,6 +332,9 @@ class CPUMetrics(Collector):
self.ticks_per_sec = ticks_per_sec
if registry is not None:
registry.register(self)
def collect(self) -> Iterable[Metric]:
if not HAVE_PROC_SELF_STAT:
return
@@ -373,9 +352,6 @@ class CPUMetrics(Collector):
yield sys
REGISTRY.register(CPUMetrics())
#
# Federation Metrics
#
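`LaterGauge`, `InFlightGauge`, `GaugeBucketCollector` and `CPUMetrics` all converge on one pattern in this file: accept an optional registry defaulting to the global `REGISTRY`, and self-register only when one is supplied. A standalone sketch of that pattern (the class and metric names are invented for illustration):

from typing import Callable, Iterable, Optional

from prometheus_client.core import REGISTRY, CollectorRegistry, GaugeMetricFamily, Metric


class CallbackGauge:
    """Calls a user-provided callback at scrape time, like LaterGauge."""

    def __init__(
        self,
        name: str,
        desc: str,
        caller: Callable[[], float],
        registry: Optional[CollectorRegistry] = REGISTRY,
    ) -> None:
        self.name = name
        self.desc = desc
        self.caller = caller
        # `registry=None` lets tests build the collector without touching
        # any registry (the `if registry is not None` checks in the diff).
        if registry is not None:
            registry.register(self)

    def collect(self) -> Iterable[Metric]:
        g = GaugeMetricFamily(self.name, self.desc)
        g.add_metric([], self.caller())
        yield g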
+10 -7
@@ -24,10 +24,11 @@ import gc
import logging
import platform
import time
from typing import Iterable
from typing import Iterable, Optional
from prometheus_client.core import (
REGISTRY,
CollectorRegistry,
CounterMetricFamily,
Gauge,
GaugeMetricFamily,
@@ -81,6 +82,10 @@ gc_time = Histogram(
class GCCounts(Collector):
def __init__(self, registry: Optional[CollectorRegistry] = REGISTRY) -> None:
if registry is not None:
registry.register(self)
def collect(self) -> Iterable[Metric]:
cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"])
for n, m in enumerate(gc.get_count()):
@@ -101,8 +106,6 @@ def install_gc_manager() -> None:
if running_on_pypy:
return
REGISTRY.register(GCCounts())
gc.disable()
# The time (in seconds since the epoch) of the last time we did a GC for each generation.
@@ -145,6 +148,10 @@ def install_gc_manager() -> None:
class PyPyGCStats(Collector):
def __init__(self, registry: Optional[CollectorRegistry] = REGISTRY) -> None:
if registry is not None:
registry.register(self)
def collect(self) -> Iterable[Metric]:
# @stats is a pretty-printer object with __str__() returning a nice table,
# plus some fields that contain data from that table.
@@ -205,7 +212,3 @@ class PyPyGCStats(Collector):
pypy_mem.add_metric(["used", "totals", "gc_peak"], s.peak_memory)
pypy_mem.add_metric(["allocated", "totals", "gc_peak"], s.peak_allocated_memory)
yield pypy_mem
if running_on_pypy:
REGISTRY.register(PyPyGCStats())
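With `GCCounts` and `PyPyGCStats` taking `registry` as a parameter, passing `registry=None` yields an unregistered collector whose output a test can inspect directly, with no global state to clean up afterwards. A hypothetical sketch of such a test (the assertions are illustrative):

counts = GCCounts(registry=None)

for metric in counts.collect():
    assert metric.name == "python_gc_counts"
    for sample in metric.samples:
        # One sample per GC generation, labelled by "gen".
        assert sample.labels["gen"] in ("0", "1", "2")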
+13 -5
@@ -22,10 +22,10 @@
import logging
import time
from selectors import SelectSelector, _PollLikeSelector # type: ignore[attr-defined]
from typing import Any, Callable, Iterable
from typing import Any, Callable, Iterable, Optional
from prometheus_client import Histogram, Metric
from prometheus_client.core import REGISTRY, GaugeMetricFamily
from prometheus_client.core import REGISTRY, CollectorRegistry, GaugeMetricFamily
from twisted.internet import reactor, selectreactor
from twisted.internet.asyncioreactor import AsyncioSelectorReactor
@@ -110,9 +110,16 @@ class ObjWrapper:
class ReactorLastSeenMetric(Collector):
def __init__(self, call_wrapper: CallWrapper):
def __init__(
self,
call_wrapper: CallWrapper,
registry: Optional[CollectorRegistry] = REGISTRY,
):
self._call_wrapper = call_wrapper
if registry is not None:
registry.register(self)
def collect(self) -> Iterable[Metric]:
cm = GaugeMetricFamily(
"python_twisted_reactor_last_seen",
@@ -164,5 +171,6 @@ except Exception as e:
logger.warning("Configuring ReactorLastSeenMetric failed: %r", e)
if wrapper:
REGISTRY.register(ReactorLastSeenMetric(wrapper))
def setup_reactor_metrics(registry: Optional[CollectorRegistry] = REGISTRY) -> None:
if wrapper:
ReactorLastSeenMetric(wrapper, registry=registry)
+2 -2
@@ -20,7 +20,7 @@
#
#
from prometheus_client import REGISTRY, CollectorRegistry, generate_latest
from prometheus_client import CollectorRegistry, generate_latest
from twisted.web.resource import Resource
from twisted.web.server import Request
@@ -35,7 +35,7 @@ class MetricsResource(Resource):
isLeaf = True
def __init__(self, registry: CollectorRegistry = REGISTRY):
def __init__(self, registry: CollectorRegistry):
self.registry = registry
def render_GET(self, request: Request) -> bytes:
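`MetricsResource` dropping its `REGISTRY` default forces callers to say which homeserver's registry they are exposing. A sketch of serving one registry under Twisted (the port, path and import location are assumptions, not taken from the diff):

from prometheus_client import CollectorRegistry
from twisted.internet import reactor
from twisted.web.resource import Resource
from twisted.web.server import Site

from synapse.metrics import MetricsResource  # import path is an assumption

registry = CollectorRegistry(auto_describe=True)

root = Resource()
root.putChild(b"metrics", MetricsResource(registry))
reactor.listenTCP(9100, Site(root))
reactor.run()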
@@ -38,7 +38,7 @@ from typing import (
)
from prometheus_client import Metric
from prometheus_client.core import REGISTRY, Counter, Gauge
from prometheus_client.core import REGISTRY, CollectorRegistry, Counter, Gauge
from typing_extensions import ParamSpec
from twisted.internet import defer
@@ -134,13 +134,17 @@ _background_processes_active_since_last_scrape: "Set[_BackgroundProcess]" = set(
_bg_metrics_lock = threading.Lock()
class _Collector(Collector):
class BackgroundProcessCollector(Collector):
"""A custom metrics collector for the background process metrics.
Ensures that all of the metrics are up-to-date with any in-flight processes
before they are returned.
"""
def __init__(self, registry: Optional[CollectorRegistry] = REGISTRY) -> None:
if registry is not None:
registry.register(self)
def collect(self) -> Iterable[Metric]:
global _background_processes_active_since_last_scrape
@@ -165,9 +169,6 @@ class _Collector(Collector):
yield from m.collect()
REGISTRY.register(_Collector())
class _BackgroundProcess:
def __init__(self, desc: str, ctx: LoggingContext):
self.desc = desc
+18 -23
@@ -23,14 +23,17 @@ import ctypes
import logging
import os
import re
from typing import Iterable, Literal, Optional, overload
from typing import TYPE_CHECKING, Iterable, Literal, Optional, overload
import attr
from prometheus_client import REGISTRY, Metric
from prometheus_client import REGISTRY, CollectorRegistry, Metric
from synapse.metrics import GaugeMetricFamily
from synapse.metrics._types import Collector
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
@@ -131,25 +134,11 @@ class JemallocStats:
return self._mallctl(f"stats.{name}")
_JEMALLOC_STATS: Optional[JemallocStats] = None
def get_jemalloc_stats() -> Optional[JemallocStats]:
"""Returns an interface to jemalloc, if it is being used.
Note that this will always return None until `setup_jemalloc_stats` has been
called.
"""
return _JEMALLOC_STATS
def _setup_jemalloc_stats() -> None:
def _setup_jemalloc_stats(hs: "HomeServer") -> Optional[JemallocStats]:
"""Checks to see if jemalloc is loaded, and hooks up a collector to record
statistics exposed by jemalloc.
"""
global _JEMALLOC_STATS
# Try to find the loaded jemalloc shared library, if any. We need to
# introspect into what is loaded, rather than loading whatever is on the
# path, as if we load a *different* jemalloc version things will seg fault.
@@ -157,7 +146,7 @@ def _setup_jemalloc_stats() -> None:
# We look in `/proc/self/maps`, which only exists on linux.
if not os.path.exists("/proc/self/maps"):
logger.debug("Not looking for jemalloc as no /proc/self/maps exist")
return
return None
# We're looking for a path at the end of the line that includes
# "libjemalloc".
@@ -173,18 +162,21 @@ def _setup_jemalloc_stats() -> None:
if not jemalloc_path:
# No loaded jemalloc was found.
logger.debug("jemalloc not found")
return
return None
logger.debug("Found jemalloc at %s", jemalloc_path)
jemalloc_dll = ctypes.CDLL(jemalloc_path)
stats = JemallocStats(jemalloc_dll)
_JEMALLOC_STATS = stats
class JemallocCollector(Collector):
"""Metrics for internal jemalloc stats."""
def __init__(self, registry: Optional[CollectorRegistry] = REGISTRY) -> None:
if registry is not None:
registry.register(self)
def collect(self) -> Iterable[Metric]:
stats.refresh_stats()
@@ -230,17 +222,20 @@ def _setup_jemalloc_stats() -> None:
yield g
REGISTRY.register(JemallocCollector())
JemallocCollector(registry=hs.metrics_collector_registry)
logger.debug("Added jemalloc stats")
return stats
def setup_jemalloc_stats() -> None:
def setup_jemalloc_stats(hs: "HomeServer") -> Optional[JemallocStats]:
"""Try to setup jemalloc stats, if jemalloc is loaded."""
try:
_setup_jemalloc_stats()
return _setup_jemalloc_stats(hs)
except Exception as e:
# This should only happen if we find the loaded jemalloc library, but
# fail to load it somehow (e.g. we somehow picked the wrong version).
logger.info("Failed to setup collector to record jemalloc stats: %s", e)
return None
@@ -340,6 +340,7 @@ class SpamCheckerModuleApiCallbacks:
def __init__(self, hs: "synapse.server.HomeServer") -> None:
self.clock = hs.get_clock()
self.metrics_collector_registry = hs.metrics_collector_registry
self._check_event_for_spam_callbacks: List[CHECK_EVENT_FOR_SPAM_CALLBACK] = []
self._should_drop_federated_event_callbacks: List[
@@ -464,7 +465,11 @@ class SpamCheckerModuleApiCallbacks:
generally discouraged as it doesn't support internationalization.
"""
for callback in self._check_event_for_spam_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
with Measure(
self.clock,
self.metrics_collector_registry,
f"{callback.__module__}.{callback.__qualname__}",
):
res = await delay_cancellation(callback(event))
if res is False or res == self.NOT_SPAM:
# This spam-checker accepts the event.
@@ -517,7 +522,11 @@ class SpamCheckerModuleApiCallbacks:
True if the event should be silently dropped
"""
for callback in self._should_drop_federated_event_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
with Measure(
self.clock,
self.metrics_collector_registry,
f"{callback.__module__}.{callback.__qualname__}",
):
res: Union[bool, str] = await delay_cancellation(callback(event))
if res:
return res
@@ -539,7 +548,11 @@ class SpamCheckerModuleApiCallbacks:
NOT_SPAM if the operation is permitted, [Codes, Dict] otherwise.
"""
for callback in self._user_may_join_room_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
with Measure(
self.clock,
self.metrics_collector_registry,
f"{callback.__module__}.{callback.__qualname__}",
):
res = await delay_cancellation(callback(user_id, room_id, is_invited))
# Normalize return values to `Codes` or `"NOT_SPAM"`.
if res is True or res is self.NOT_SPAM:
@@ -578,7 +591,11 @@ class SpamCheckerModuleApiCallbacks:
NOT_SPAM if the operation is permitted, Codes otherwise.
"""
for callback in self._user_may_invite_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
with Measure(
self.clock,
self.metrics_collector_registry,
f"{callback.__module__}.{callback.__qualname__}",
):
res = await delay_cancellation(
callback(inviter_userid, invitee_userid, room_id)
)
@@ -623,7 +640,11 @@ class SpamCheckerModuleApiCallbacks:
NOT_SPAM if the operation is permitted, Codes otherwise.
"""
for callback in self._user_may_send_3pid_invite_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
with Measure(
self.clock,
self.metrics_collector_registry,
f"{callback.__module__}.{callback.__qualname__}",
):
res = await delay_cancellation(
callback(inviter_userid, medium, address, room_id)
)
@@ -659,7 +680,11 @@ class SpamCheckerModuleApiCallbacks:
room_config: The room creation configuration which is the body of the /createRoom request
"""
for callback in self._user_may_create_room_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
with Measure(
self.clock,
self.metrics_collector_registry,
f"{callback.__module__}.{callback.__qualname__}",
):
checker_args = inspect.signature(callback)
# Also ensure backwards compatibility with spam checker callbacks
# that don't expect the room_config argument.
@@ -751,7 +776,11 @@ class SpamCheckerModuleApiCallbacks:
"""
for callback in self._user_may_create_room_alias_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
with Measure(
self.clock,
self.metrics_collector_registry,
f"{callback.__module__}.{callback.__qualname__}",
):
res = await delay_cancellation(callback(userid, room_alias))
if res is True or res is self.NOT_SPAM:
continue
@@ -784,7 +813,11 @@ class SpamCheckerModuleApiCallbacks:
room_id: The ID of the room that would be published
"""
for callback in self._user_may_publish_room_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
with Measure(
self.clock,
self.metrics_collector_registry,
f"{callback.__module__}.{callback.__qualname__}",
):
res = await delay_cancellation(callback(userid, room_id))
if res is True or res is self.NOT_SPAM:
continue
@@ -826,7 +859,11 @@ class SpamCheckerModuleApiCallbacks:
True if the user is spammy.
"""
for callback in self._check_username_for_spam_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
with Measure(
self.clock,
self.metrics_collector_registry,
f"{callback.__module__}.{callback.__qualname__}",
):
checker_args = inspect.signature(callback)
# Make a copy of the user profile object to ensure the spam checker cannot
# modify it.
@@ -875,7 +912,11 @@ class SpamCheckerModuleApiCallbacks:
"""
for callback in self._check_registration_for_spam_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
with Measure(
self.clock,
self.metrics_collector_registry,
f"{callback.__module__}.{callback.__qualname__}",
):
behaviour = await delay_cancellation(
callback(email_threepid, username, request_info, auth_provider_id)
)
@@ -917,7 +958,11 @@ class SpamCheckerModuleApiCallbacks:
"""
for callback in self._check_media_file_for_spam_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
with Measure(
self.clock,
self.metrics_collector_registry,
f"{callback.__module__}.{callback.__qualname__}",
):
res = await delay_cancellation(callback(file_wrapper, file_info))
# Normalize return values to `Codes` or `"NOT_SPAM"`.
if res is False or res is self.NOT_SPAM:
@@ -964,7 +1009,11 @@ class SpamCheckerModuleApiCallbacks:
"""
for callback in self._check_login_for_spam_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
with Measure(
self.clock,
self.metrics_collector_registry,
f"{callback.__module__}.{callback.__qualname__}",
):
res = await delay_cancellation(
callback(
user_id,
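Every spam-checker loop above now repeats the same four-line `Measure(...)` construction. If the repetition grows bothersome, one hypothetical refactor is a small helper on the class; nothing like this exists in the diff, and `_measure_callback` is an invented name:

from typing import Callable

from synapse.util.metrics import Measure  # the class being constructed above


class SpamCheckerCallbacksSketch:
    def __init__(self, clock, metrics_collector_registry) -> None:
        self.clock = clock
        self.metrics_collector_registry = metrics_collector_registry

    def _measure_callback(self, callback: Callable) -> Measure:
        """Build the Measure block wrapped around each spam-checker callback."""
        return Measure(
            self.clock,
            self.metrics_collector_registry,
            f"{callback.__module__}.{callback.__qualname__}",
        )

Each loop body would then shrink to `with self._measure_callback(callback): ...`.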
+13 -2
@@ -267,16 +267,27 @@ class Notifier:
return sum(stream.count_listeners() for stream in all_user_streams)
LaterGauge("synapse_notifier_listeners", "", [], count_listeners)
LaterGauge(
"synapse_notifier_listeners",
"",
[],
count_listeners,
registry=hs.metrics_collector_registry,
)
LaterGauge(
"synapse_notifier_rooms",
"",
[],
lambda: count(bool, list(self.room_to_user_streams.values())),
registry=hs.metrics_collector_registry,
)
LaterGauge(
"synapse_notifier_users", "", [], lambda: len(self.user_to_user_stream)
"synapse_notifier_users",
"",
[],
lambda: len(self.user_to_user_stream),
registry=hs.metrics_collector_registry,
)
def add_replication_callback(self, cb: Callable[[], None]) -> None:
+5 -3
@@ -59,7 +59,6 @@ from synapse.types import JsonValue
from synapse.types.state import StateFilter
from synapse.util import unwrapFirstError
from synapse.util.async_helpers import gather_results
from synapse.util.caches import register_cache
from synapse.util.metrics import measure_func
from synapse.visibility import filter_event_for_clients_with_state
@@ -129,13 +128,16 @@ class BulkPushRuleEvaluator:
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.store = hs.get_datastores().main
self.clock = hs.get_clock()
self.clock = hs.get_clock() # nb must be called this for @measure_func
self.metrics_collector_registry = (
hs.metrics_collector_registry
) # nb must be called this for @measure_func
self._event_auth_handler = hs.get_event_auth_handler()
self.should_calculate_push_rules = self.hs.config.push.enable_push
self._related_event_match_enabled = self.hs.config.experimental.msc3664_enabled
self.room_push_rule_cache_metrics = register_cache(
self.room_push_rule_cache_metrics = hs.get_cache_manager().register_cache(
"cache",
"room_push_rule_cache",
cache=[], # Meaningless size, as this isn't a cache that stores values,
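The `# nb must be called this for @measure_func` comments encode a structural-typing contract: the decorated method's `self` must expose `clock` and `metrics_collector_registry` under exactly those names (the `HasClockAndMetricsCollectorRegistry` protocol near the end of this diff spells it out). A sketch of a conforming class, with the method body invented for illustration:

from synapse.util import Clock
from synapse.util.metrics import measure_func


class PushEvaluatorSketch:
    def __init__(self, hs) -> None:
        # Both attribute names are load-bearing: measure_func resolves them
        # on `self` at call time.
        self.clock: Clock = hs.get_clock()
        self.metrics_collector_registry = hs.metrics_collector_registry

    @measure_func("evaluate_push_rules")
    async def evaluate(self) -> None:
        ...  # illustrative body; timing and in-flight metrics wrap it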
+4 -1
@@ -123,7 +123,10 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
def __init__(self, hs: "HomeServer"):
if self.CACHE:
self.response_cache: ResponseCache[str] = ResponseCache(
hs.get_clock(), "repl." + self.NAME, timeout_ms=30 * 60 * 1000
hs.get_clock(),
hs.get_cache_manager(),
"repl." + self.NAME,
timeout_ms=30 * 60 * 1000,
)
# We reserve `instance_name` as a parameter to sending requests, so we
+4 -1
View File
@@ -79,6 +79,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
self.store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
self.clock = hs.get_clock()
self.metrics_collector_registry = hs.metrics_collector_registry
self.federation_event_handler = hs.get_federation_event_handler()
@staticmethod
@@ -122,7 +123,9 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
async def _handle_request( # type: ignore[override]
self, request: Request, content: JsonDict
) -> Tuple[int, JsonDict]:
with Measure(self.clock, "repl_fed_send_events_parse"):
with Measure(
self.clock, self.metrics_collector_registry, "repl_fed_send_events_parse"
):
room_id = content["room_id"]
backfilled = content["backfilled"]
+4 -1
@@ -80,6 +80,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
self.store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
self.clock = hs.get_clock()
self.metrics_collector_registry = hs.metrics_collector_registry
@staticmethod
async def _serialize_payload( # type: ignore[override]
@@ -121,7 +122,9 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
async def _handle_request( # type: ignore[override]
self, request: Request, content: JsonDict, event_id: str
) -> Tuple[int, JsonDict]:
with Measure(self.clock, "repl_send_event_parse"):
with Measure(
self.clock, self.metrics_collector_registry, "repl_send_event_parse"
):
event_dict = content["event"]
room_ver = KNOWN_ROOM_VERSIONS[content["room_version"]]
internal_metadata = content["internal_metadata"]
+4 -1
@@ -81,6 +81,7 @@ class ReplicationSendEventsRestServlet(ReplicationEndpoint):
self.store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
self.clock = hs.get_clock()
self.metrics_collector_registry = hs.metrics_collector_registry
@staticmethod
async def _serialize_payload( # type: ignore[override]
@@ -122,7 +123,9 @@ class ReplicationSendEventsRestServlet(ReplicationEndpoint):
async def _handle_request( # type: ignore[override]
self, request: Request, payload: JsonDict
) -> Tuple[int, JsonDict]:
with Measure(self.clock, "repl_send_events_parse"):
with Measure(
self.clock, self.metrics_collector_registry, "repl_send_events_parse"
):
events_and_context = []
events = payload["events"]
rooms = set()
+6 -1
@@ -79,6 +79,7 @@ class ReplicationDataHandler:
self.notifier = hs.get_notifier()
self._reactor = hs.get_reactor()
self._clock = hs.get_clock()
self._metrics_collector_registry = hs.metrics_collector_registry
self._streams = hs.get_replication_streams()
self._instance_name = hs.get_instance_name()
self._typing_handler = hs.get_typing_handler()
@@ -342,7 +343,11 @@ class ReplicationDataHandler:
waiting_list.add((position, deferred))
# We measure here to get in flight counts and average waiting time.
with Measure(self._clock, "repl.wait_for_stream_position"):
with Measure(
self._clock,
self._metrics_collector_registry,
"repl.wait_for_stream_position",
):
logger.info(
"Waiting for repl stream %r to reach %s (%s); currently at: %s",
stream_name,
+2
@@ -215,6 +215,7 @@ class ReplicationCommandHandler:
"",
[],
lambda: len(self._connections),
registry=hs.metrics_collector_registry,
)
# When POSITION or RDATA commands arrive, we stick them in a queue and process
@@ -241,6 +242,7 @@ class ReplicationCommandHandler:
(stream_name,): len(queue)
for stream_name, queue in self._command_queues_by_stream.items()
},
registry=hs.metrics_collector_registry,
)
self._is_master = hs.config.worker.worker_app is None
+6 -1
@@ -80,6 +80,7 @@ class ReplicationStreamer:
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastores().main
self.clock = hs.get_clock()
self.metrics_collector_registry = hs.metrics_collector_registry
self.notifier = hs.get_notifier()
self._instance_name = hs.get_instance_name()
@@ -155,7 +156,11 @@ class ReplicationStreamer:
while self.pending_updates:
self.pending_updates = False
with Measure(self.clock, "repl.stream.get_updates"):
with Measure(
self.clock,
self.metrics_collector_registry,
"repl.stream.get_updates",
):
all_streams = self.streams
if self._replication_torture_level is not None:
+2
@@ -125,6 +125,8 @@ class SyncRestServlet(RestServlet):
self._json_filter_cache: LruCache[str, bool] = LruCache(
max_size=1000,
cache_name="sync_valid_filter",
# TODO
# cache_manager=hs.get_cache_manager(),
)
# Ratelimiter for presence updates, keyed by requester.
+12 -1
@@ -30,6 +30,7 @@ import functools
import logging
from typing import TYPE_CHECKING, Callable, Dict, List, Optional, Type, TypeVar, cast
from prometheus_client import CollectorRegistry
from typing_extensions import TypeAlias
from twisted.internet.interfaces import IOpenSSLContextFactory
@@ -129,6 +130,7 @@ from synapse.http.matrixfederationclient import MatrixFederationHttpClient
from synapse.media.media_repository import MediaRepository
from synapse.metrics import register_threadpool
from synapse.metrics.common_usage_metrics import CommonUsageMetricsManager
from synapse.metrics.jemalloc import setup_jemalloc_stats
from synapse.module_api import ModuleApi
from synapse.module_api.callbacks import ModuleApiCallbacks
from synapse.notifier import Notifier, ReplicationNotifier
@@ -152,6 +154,7 @@ from synapse.streams.events import EventSources
from synapse.synapse_rust.rendezvous import RendezvousHandler
from synapse.types import DomainSpecificString, ISynapseReactor
from synapse.util import Clock
from synapse.util.caches import CacheManager
from synapse.util.distributor import Distributor
from synapse.util.macaroons import MacaroonGenerator
from synapse.util.ratelimitutils import FederationRateLimiter
@@ -310,6 +313,11 @@ class HomeServer(metaclass=abc.ABCMeta):
# This attribute is set by the free function `refresh_certificate`.
self.tls_server_context_factory: Optional[IOpenSSLContextFactory] = None
self.metrics_collector_registry = CollectorRegistry(auto_describe=True)
# Also see `setup_global_metrics` for other metric setup
self.jemalloc_stats = setup_jemalloc_stats(self)
self.cache_manager = CacheManager(self)
def register_module_web_resource(self, path: str, resource: Resource) -> None:
"""Allows a module to register a web resource to be served at the given path.
@@ -368,7 +376,7 @@ class HomeServer(metaclass=abc.ABCMeta):
self.setup_background_tasks()
def start_listening(self) -> None: # noqa: B027 (no-op by design)
"""Start the HTTP, manhole, metrics, etc listeners
"""Start the HTTP, manhole, etc listeners
Does nothing in this base class; overridden in derived classes to start the
appropriate listeners.
@@ -420,6 +428,9 @@ class HomeServer(metaclass=abc.ABCMeta):
return self.datastores
def get_cache_manager(self) -> CacheManager:
return self.cache_manager
@cache_in_self
def get_distributor(self) -> Distributor:
return Distributor()
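Note the split in accessor styles this hunk introduces: `metrics_collector_registry`, `jemalloc_stats` and `cache_manager` are built eagerly in `__init__` (other `__init__`-time code already needs them), so `get_cache_manager` is a plain getter rather than a lazy `@cache_in_self` one. A stripped-down sketch of that ordering, with everything unrelated replaced by stand-ins:

from prometheus_client import CollectorRegistry


class HomeServerSketch:
    def __init__(self) -> None:
        # Eager: jemalloc setup and the cache manager below both need the
        # registry to exist already, so it cannot be a lazy accessor.
        self.metrics_collector_registry = CollectorRegistry(auto_describe=True)
        self.jemalloc_stats = None  # stand-in for setup_jemalloc_stats(self)
        self.cache_manager = object()  # stand-in for CacheManager(self)

    def get_cache_manager(self) -> object:
        # Plain accessor, not @cache_in_self: the instance already exists.
        return self.cache_manager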
+12 -3
@@ -189,7 +189,10 @@ class StateHandler:
"""
def __init__(self, hs: "HomeServer"):
self.clock = hs.get_clock()
self.clock = hs.get_clock() # nb must be called this for @measure_func
self.metrics_collector_registry = (
hs.metrics_collector_registry
) # nb must be called this for @measure_func
self.store = hs.get_datastores().main
self._state_storage_controller = hs.get_storage_controllers().state
self.hs = hs
@@ -632,6 +635,7 @@ class StateResolutionHandler:
def __init__(self, hs: "HomeServer"):
self.clock = hs.get_clock()
self.metrics_collector_registry = hs.metrics_collector_registry
self.resolve_linearizer = Linearizer(name="state_resolve_lock")
@@ -640,6 +644,7 @@ class StateResolutionHandler:
ExpiringCache(
cache_name="state_cache",
clock=self.clock,
cache_manager=hs.get_cache_manager(),
max_len=100000,
expiry_ms=EVICTION_TIMEOUT_SECONDS * 1000,
iterable=True,
@@ -747,7 +752,9 @@ class StateResolutionHandler:
# which will be used as a cache key for future resolutions, but
# not get persisted.
with Measure(self.clock, "state.create_group_ids"):
with Measure(
self.clock, self.metrics_collector_registry, "state.create_group_ids"
):
cache = _make_state_cache_entry(new_state, state_groups_ids)
self._state_cache[group_names] = cache
@@ -785,7 +792,9 @@ class StateResolutionHandler:
a map from (type, state_key) to event_id.
"""
try:
with Measure(self.clock, "state._resolve_events") as m:
with Measure(
self.clock, self.metrics_collector_registry, "state._resolve_events"
) as m:
room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
if room_version_obj.state_res == StateResolutionVersions.V1:
return await v1.resolve_events_with_store(
+2
@@ -56,6 +56,8 @@ class SQLBaseStore(metaclass=ABCMeta):
):
self.hs = hs
self._clock = hs.get_clock()
self._metrics_collector_registry = hs.metrics_collector_registry
self.database_engine = database.engine
self.db_pool = database
+21 -4
@@ -338,6 +338,7 @@ class EventsPersistenceStorageController:
self.persist_events_store = stores.persist_events
self._clock = hs.get_clock()
self._metrics_collector_registry = hs.metrics_collector_registry
self._instance_name = hs.get_instance_name()
self.is_mine_id = hs.is_mine_id
self._event_persist_queue = _EventPeristenceQueue(
@@ -616,7 +617,11 @@ class EventsPersistenceStorageController:
state_delta_for_room = None
if not backfilled:
with Measure(self._clock, "_calculate_state_and_extrem"):
with Measure(
self._clock,
self._metrics_collector_registry,
"_calculate_state_and_extrem",
):
# Work out the new "current state" for the room.
# We do this by working out what the new extremities are and then
# calculating the state from that.
@@ -627,7 +632,11 @@ class EventsPersistenceStorageController:
room_id, chunk
)
with Measure(self._clock, "calculate_chain_cover_index_for_events"):
with Measure(
self._clock,
self._metrics_collector_registry,
"calculate_chain_cover_index_for_events",
):
# We now calculate chain ID/sequence numbers for any state events we're
# persisting. We ignore out of band memberships as we're not in the room
# and won't have their auth chain (we'll fix it up later if we join the
@@ -719,7 +728,11 @@ class EventsPersistenceStorageController:
break
logger.debug("Calculating state delta for room %s", room_id)
with Measure(self._clock, "persist_events.get_new_state_after_events"):
with Measure(
self._clock,
self._metrics_collector_registry,
"persist_events.get_new_state_after_events",
):
res = await self._get_new_state_after_events(
room_id,
ev_ctx_rm,
@@ -746,7 +759,11 @@ class EventsPersistenceStorageController:
# removed keys entirely.
delta = DeltaState([], delta_ids)
elif current_state is not None:
with Measure(self._clock, "persist_events.calculate_state_delta"):
with Measure(
self._clock,
self._metrics_collector_registry,
"persist_events.calculate_state_delta",
):
delta = await self._calculate_state_delta(room_id, current_state)
if delta:
+2 -1
@@ -70,6 +70,7 @@ class StateStorageController:
def __init__(self, hs: "HomeServer", stores: "Databases"):
self._is_mine_id = hs.is_mine_id
self._clock = hs.get_clock()
self._metrics_collector_registry = hs.metrics_collector_registry
self.stores = stores
self._partial_state_events_tracker = PartialStateEventsTracker(stores.main)
self._partial_state_room_tracker = PartialCurrentStateTracker(stores.main)
@@ -812,7 +813,7 @@ class StateStorageController:
state_group = object()
assert state_group is not None
with Measure(self._clock, "get_joined_hosts"):
with Measure(self._clock, self._metrics_collector_registry, "get_joined_hosts"):
return await self._get_joined_hosts(
room_id, state_group, state_entry=state_entry
)
+1
@@ -572,6 +572,7 @@ class DatabasePool:
"Background update status",
[],
self.updates.get_status,
registry=hs.metrics_collector_registry,
)
self._previous_txn_total_time = 0.0
@@ -65,6 +65,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)
self.cache_manager = hs.get_cache_manager()
self._can_write_to_account_data = (
self._instance_name in hs.config.worker.writers.account_data
@@ -89,7 +90,9 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
account_max = self.get_max_account_data_stream_id()
self._account_data_stream_cache = StreamChangeCache(
"AccountDataAndTagsChangeCache", account_max
name="AccountDataAndTagsChangeCache",
cache_manager=hs.get_cache_manager(),
current_stream_pos=account_max,
)
self.db_pool.updates.register_background_index_update(
+4 -1
@@ -434,7 +434,10 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore, MonthlyActiveUsersWorke
# (user_id, access_token, ip,) -> last_seen
self.client_ip_last_seen = LruCache[Tuple[str, str, str], int](
cache_name="client_ip_last_seen", max_size=50000
max_size=50000,
cache_name="client_ip_last_seen",
# TODO
# cache_manager=hs.get_cache_manager(),
)
if hs.config.worker.run_background_tasks and self.user_ips_max_age:
@@ -94,6 +94,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
] = ExpiringCache(
cache_name="last_device_delete_cache",
clock=self._clock,
cache_manager=hs.get_cache_manager(),
max_len=10000,
expiry_ms=30 * 60 * 1000,
)
@@ -126,8 +127,9 @@ class DeviceInboxWorkerStore(SQLBaseStore):
limit=1000,
)
self._device_inbox_stream_cache = StreamChangeCache(
"DeviceInboxStreamChangeCache",
min_device_inbox_id,
name="DeviceInboxStreamChangeCache",
cache_manager=hs.get_cache_manager(),
current_stream_pos=min_device_inbox_id,
prefilled_cache=device_inbox_prefill,
)
@@ -142,8 +144,9 @@ class DeviceInboxWorkerStore(SQLBaseStore):
limit=1000,
)
self._device_federation_outbox_stream_cache = StreamChangeCache(
"DeviceFederationOutboxStreamChangeCache",
min_device_outbox_id,
name="DeviceFederationOutboxStreamChangeCache",
cache_manager=hs.get_cache_manager(),
current_stream_pos=min_device_outbox_id,
prefilled_cache=device_outbox_prefill,
)
+18 -9
@@ -128,8 +128,9 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
limit=10000,
)
self._device_list_stream_cache = StreamChangeCache(
"DeviceListStreamChangeCache",
min_device_list_id,
name="DeviceListStreamChangeCache",
cache_manager=hs.get_cache_manager(),
current_stream_pos=min_device_list_id,
prefilled_cache=device_list_prefill,
)
@@ -142,8 +143,9 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
limit=10000,
)
self._device_list_room_stream_cache = StreamChangeCache(
"DeviceListRoomStreamChangeCache",
min_device_list_room_id,
name="DeviceListRoomStreamChangeCache",
cache_manager=hs.get_cache_manager(),
current_stream_pos=min_device_list_room_id,
prefilled_cache=device_list_room_prefill,
)
@@ -159,8 +161,9 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
limit=1000,
)
self._user_signature_stream_cache = StreamChangeCache(
"UserSignatureStreamChangeCache",
user_signature_stream_list_id,
name="UserSignatureStreamChangeCache",
cache_manager=hs.get_cache_manager(),
current_stream_pos=user_signature_stream_list_id,
prefilled_cache=user_signature_stream_prefill,
)
@@ -178,8 +181,9 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
limit=10000,
)
self._device_list_federation_stream_cache = StreamChangeCache(
"DeviceListFederationStreamChangeCache",
device_list_federation_list_id,
name="DeviceListFederationStreamChangeCache",
cache_manager=hs.get_cache_manager(),
current_stream_pos=device_list_federation_list_id,
prefilled_cache=device_list_federation_prefill,
)
@@ -1773,7 +1777,12 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
# Map of (user_id, device_id) -> bool. If there is an entry, that implies
# the device exists.
self.device_id_exists_cache: LruCache[Tuple[str, str], Literal[True]] = (
LruCache(cache_name="device_id_exists", max_size=10000)
LruCache(
max_size=10000,
cache_name="device_id_exists",
# TODO
# cache_manager=hs.get_cache_manager(),
)
)
async def store_device(
@@ -145,7 +145,11 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
# Cache of event ID to list of auth event IDs and their depths.
self._event_auth_cache: LruCache[str, List[Tuple[str, int]]] = LruCache(
500000, "_event_auth_cache", size_callback=len
500000,
cache_name="_event_auth_cache",
# TODO
# cache_manager=hs.get_cache_manager(),
size_callback=len,
)
# Flag used by unit tests to disable fallback when there is no chain cover
+14 -8
@@ -88,12 +88,6 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
persist_event_counter = Counter("synapse_storage_events_persisted_events", "")
event_counter = Counter(
"synapse_storage_events_persisted_events_sep",
"",
["type", "origin_type", "origin_entity"],
)
# State event type/key pairs that we need to gather to fill in the
# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` tables.
@@ -260,6 +254,18 @@ class PersistEventsStore:
self._backfill_id_gen: AbstractStreamIdGenerator = self.store._backfill_id_gen
self._stream_id_gen: AbstractStreamIdGenerator = self.store._stream_id_gen
self.persist_event_counter = Counter(
"synapse_storage_events_persisted_events",
"",
registry=hs.metrics_collector_registry,
)
self.event_counter = Counter(
"synapse_storage_events_persisted_events_sep",
"",
["type", "origin_type", "origin_entity"],
registry=hs.metrics_collector_registry,
)
@trace
async def _persist_events_and_state_updates(
self,
@@ -356,7 +362,7 @@ class PersistEventsStore:
new_event_links=new_event_links,
sliding_sync_table_changes=sliding_sync_table_changes,
)
persist_event_counter.inc(len(events_and_contexts))
self.persist_event_counter.inc(len(events_and_contexts))
if not use_negative_stream_ordering:
# we don't want to set the event_persisted_position to a negative
@@ -374,7 +380,7 @@ class PersistEventsStore:
origin_type = "remote"
origin_entity = get_domain_from_id(event.sender)
event_counter.labels(event.type, origin_type, origin_entity).inc()
self.event_counter.labels(event.type, origin_type, origin_entity).inc()
if new_forward_extremities:
self.store.get_latest_event_ids_in_room.prefill(
@@ -269,8 +269,9 @@ class EventsWorkerStore(SQLBaseStore):
limit=1000,
)
self._curr_state_delta_stream_cache: StreamChangeCache = StreamChangeCache(
"_curr_state_delta_stream_cache",
min_curr_state_delta_id,
name="_curr_state_delta_stream_cache",
cache_manager=hs.get_cache_manager(),
current_stream_pos=min_curr_state_delta_id,
prefilled_cache=curr_state_delta_prefill,
)
@@ -283,8 +284,10 @@ class EventsWorkerStore(SQLBaseStore):
self._get_event_cache: AsyncLruCache[Tuple[str], EventCacheEntry] = (
AsyncLruCache(
cache_name="*getEvent*",
max_size=hs.config.caches.event_cache_size,
cache_name="*getEvent*",
# TODO
# cache_manager=hs.get_cache_manager(),
# `extra_index_cb` returns a tuple, as that is the key type
extra_index_cb=lambda _, v: (v.event.room_id,),
)
@@ -1233,7 +1236,9 @@ class EventsWorkerStore(SQLBaseStore):
to event row. Note that it may well contain additional events that
were not part of this request.
"""
with Measure(self._clock, "_fetch_event_list"):
with Measure(
self._clock, self._metrics_collector_registry, "_fetch_event_list"
):
try:
events_to_fetch = {
event_id for events, _ in event_list for event_id in events
+3 -2
@@ -108,8 +108,9 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore)
max_value=self._presence_id_gen.get_current_token(),
)
self.presence_stream_cache = StreamChangeCache(
"PresenceStreamChangeCache",
min_presence_val,
name="PresenceStreamChangeCache",
cache_manager=hs.get_cache_manager(),
current_stream_pos=min_presence_val,
prefilled_cache=presence_cache_prefill,
)
+3 -2
@@ -163,8 +163,9 @@ class PushRulesWorkerStore(
)
self.push_rules_stream_cache = StreamChangeCache(
"PushRulesStreamChangeCache",
push_rules_id,
name="PushRulesStreamChangeCache",
cache_manager=hs.get_cache_manager(),
current_stream_pos=push_rules_id,
prefilled_cache=push_rules_prefill,
)
+3 -2
@@ -158,8 +158,9 @@ class ReceiptsWorkerStore(SQLBaseStore):
limit=10000,
)
self._receipts_stream_cache = StreamChangeCache(
"ReceiptsRoomChangeCache",
min_receipts_stream_id,
name="ReceiptsRoomChangeCache",
cache_manager=hs.get_cache_manager(),
current_stream_pos=min_receipts_stream_id,
prefilled_cache=receipts_stream_prefill,
)
+6 -1
@@ -121,6 +121,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
"",
[],
lambda: self._known_servers_count,
registry=hs.metrics_collector_registry,
)
@wrap_as_background_process("_count_known_servers")
@@ -983,7 +984,11 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
`_get_user_ids_from_membership_event_ids` for any uncached events.
"""
with Measure(self._clock, "get_joined_user_ids_from_state"):
with Measure(
self._clock,
self._metrics_collector_registry,
"get_joined_user_ids_from_state",
):
users_in_room = set()
member_event_ids = [
e_id for key, e_id in state.items() if key[0] == EventTypes.Member
+6 -3
@@ -617,12 +617,15 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
max_value=events_max,
)
self._events_stream_cache = StreamChangeCache(
"EventsRoomStreamChangeCache",
min_event_val,
name="EventsRoomStreamChangeCache",
cache_manager=hs.get_cache_manager(),
current_stream_pos=min_event_val,
prefilled_cache=event_cache_prefill,
)
self._membership_stream_cache = StreamChangeCache(
"MembershipStreamChangeCache", events_max
name="MembershipStreamChangeCache",
cache_manager=hs.get_cache_manager(),
current_stream_pos=events_max,
)
self._stream_order_on_start = self.get_room_max_stream_ordering()
+6 -4
@@ -123,14 +123,16 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
# vast majority of state in Matrix (today) is member events.
self._state_group_cache: DictionaryCache[int, StateKey, str] = DictionaryCache(
"*stateGroupCache*",
# TODO: this hasn't been tuned yet
50000,
max_entries=50000,
name="*stateGroupCache*",
cache_manager=hs.get_cache_manager(),
)
self._state_group_members_cache: DictionaryCache[int, StateKey, str] = (
DictionaryCache(
"*stateGroupMembersCache*",
500000,
max_entries=500000,
name="*stateGroupMembersCache*",
cache_manager=hs.get_cache_manager(),
)
)
+146 -100
@@ -24,77 +24,102 @@ import logging
import typing
from enum import Enum, auto
from sys import intern
from typing import Any, Callable, Dict, List, Optional, Sized, TypeVar
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Sized, TypeVar
import attr
from prometheus_client import REGISTRY
from prometheus_client import REGISTRY, CollectorRegistry
from prometheus_client.core import Gauge
from synapse.config.cache import add_resizable_cache
from synapse.util.metrics import DynamicCollectorRegistry
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
# Whether to track estimated memory usage of the LruCaches.
TRACK_MEMORY_USAGE = False
# We track cache metrics in a special registry that lets us update the metrics
# just before they are returned from the scrape endpoint.
CACHE_METRIC_REGISTRY = DynamicCollectorRegistry()
caches_by_name: Dict[str, Sized] = {}
class CacheMetrics:
def __init__(self, registry: Optional[CollectorRegistry] = REGISTRY) -> None:
# We track cache metrics in a special registry that lets us update the metrics
# just before they are returned from the scrape endpoint.
self.CACHE_METRIC_REGISTRY = DynamicCollectorRegistry()
cache_size = Gauge(
"synapse_util_caches_cache_size", "", ["name"], registry=CACHE_METRIC_REGISTRY
)
cache_hits = Gauge(
"synapse_util_caches_cache_hits", "", ["name"], registry=CACHE_METRIC_REGISTRY
)
cache_evicted = Gauge(
"synapse_util_caches_cache_evicted_size",
"",
["name", "reason"],
registry=CACHE_METRIC_REGISTRY,
)
cache_total = Gauge(
"synapse_util_caches_cache", "", ["name"], registry=CACHE_METRIC_REGISTRY
)
cache_max_size = Gauge(
"synapse_util_caches_cache_max_size", "", ["name"], registry=CACHE_METRIC_REGISTRY
)
cache_memory_usage = Gauge(
"synapse_util_caches_cache_size_bytes",
"Estimated memory usage of the caches",
["name"],
registry=CACHE_METRIC_REGISTRY,
)
self.cache_size = Gauge(
"synapse_util_caches_cache_size",
"",
["name"],
registry=self.CACHE_METRIC_REGISTRY,
)
self.cache_hits = Gauge(
"synapse_util_caches_cache_hits",
"",
["name"],
registry=self.CACHE_METRIC_REGISTRY,
)
self.cache_evicted = Gauge(
"synapse_util_caches_cache_evicted_size",
"",
["name", "reason"],
registry=self.CACHE_METRIC_REGISTRY,
)
self.cache_total = Gauge(
"synapse_util_caches_cache",
"",
["name"],
registry=self.CACHE_METRIC_REGISTRY,
)
self.cache_max_size = Gauge(
"synapse_util_caches_cache_max_size",
"",
["name"],
registry=self.CACHE_METRIC_REGISTRY,
)
self.cache_memory_usage = Gauge(
"synapse_util_caches_cache_size_bytes",
"Estimated memory usage of the caches",
["name"],
registry=self.CACHE_METRIC_REGISTRY,
)
response_cache_size = Gauge(
"synapse_util_caches_response_cache_size",
"",
["name"],
registry=CACHE_METRIC_REGISTRY,
)
response_cache_hits = Gauge(
"synapse_util_caches_response_cache_hits",
"",
["name"],
registry=CACHE_METRIC_REGISTRY,
)
response_cache_evicted = Gauge(
"synapse_util_caches_response_cache_evicted_size",
"",
["name", "reason"],
registry=CACHE_METRIC_REGISTRY,
)
response_cache_total = Gauge(
"synapse_util_caches_response_cache", "", ["name"], registry=CACHE_METRIC_REGISTRY
)
self.response_cache_size = Gauge(
"synapse_util_caches_response_cache_size",
"",
["name"],
registry=self.CACHE_METRIC_REGISTRY,
)
self.response_cache_hits = Gauge(
"synapse_util_caches_response_cache_hits",
"",
["name"],
registry=self.CACHE_METRIC_REGISTRY,
)
self.response_cache_evicted = Gauge(
"synapse_util_caches_response_cache_evicted_size",
"",
["name", "reason"],
registry=self.CACHE_METRIC_REGISTRY,
)
self.response_cache_total = Gauge(
"synapse_util_caches_response_cache",
"",
["name"],
registry=self.CACHE_METRIC_REGISTRY,
)
# Register our custom cache metrics registry with the global registry
if registry is not None:
registry.register(self.CACHE_METRIC_REGISTRY)
# Register our custom cache metrics registry with the global registry
REGISTRY.register(CACHE_METRIC_REGISTRY)
def register_hook(self, metric_name: str, hook: Callable[[], None]) -> None:
"""
Registers a hook that is called before metric collection.
"""
self.CACHE_METRIC_REGISTRY.register_hook(metric_name, hook)
class EvictionReason(Enum):
@@ -105,6 +130,7 @@ class EvictionReason(Enum):
@attr.s(slots=True, auto_attribs=True)
class CacheMetric:
_cache_metrics: CacheMetrics
_cache: Sized
_cache_type: str
_cache_name: str
@@ -146,31 +172,41 @@ class CacheMetric:
def collect(self) -> None:
try:
if self._cache_type == "response_cache":
response_cache_size.labels(self._cache_name).set(len(self._cache))
response_cache_hits.labels(self._cache_name).set(self.hits)
self._cache_metrics.response_cache_size.labels(self._cache_name).set(
len(self._cache)
)
self._cache_metrics.response_cache_hits.labels(self._cache_name).set(
self.hits
)
for reason in EvictionReason:
response_cache_evicted.labels(self._cache_name, reason.name).set(
self.eviction_size_by_reason[reason]
)
response_cache_total.labels(self._cache_name).set(
self._cache_metrics.response_cache_evicted.labels(
self._cache_name, reason.name
).set(self.eviction_size_by_reason[reason])
self._cache_metrics.response_cache_total.labels(self._cache_name).set(
self.hits + self.misses
)
else:
cache_size.labels(self._cache_name).set(len(self._cache))
cache_hits.labels(self._cache_name).set(self.hits)
self._cache_metrics.cache_size.labels(self._cache_name).set(
len(self._cache)
)
self._cache_metrics.cache_hits.labels(self._cache_name).set(self.hits)
for reason in EvictionReason:
cache_evicted.labels(self._cache_name, reason.name).set(
self.eviction_size_by_reason[reason]
)
cache_total.labels(self._cache_name).set(self.hits + self.misses)
self._cache_metrics.cache_evicted.labels(
self._cache_name, reason.name
).set(self.eviction_size_by_reason[reason])
self._cache_metrics.cache_total.labels(self._cache_name).set(
self.hits + self.misses
)
max_size = getattr(self._cache, "max_size", None)
if max_size:
cache_max_size.labels(self._cache_name).set(max_size)
self._cache_metrics.cache_max_size.labels(self._cache_name).set(
max_size
)
if TRACK_MEMORY_USAGE:
# self.memory_usage can be None if nothing has been inserted
# into the cache yet.
cache_memory_usage.labels(self._cache_name).set(
self._cache_metrics.cache_memory_usage.labels(self._cache_name).set(
self.memory_usage or 0
)
if self._collect_callback:
@@ -180,41 +216,51 @@ class CacheMetric:
raise
def register_cache(
cache_type: str,
cache_name: str,
cache: Sized,
collect_callback: Optional[Callable] = None,
resizable: bool = True,
resize_callback: Optional[Callable] = None,
) -> CacheMetric:
"""Register a cache object for metric collection and resizing.
Args:
cache_type: a string indicating the "type" of the cache. This is used
only for deduplication so isn't too important provided it's constant.
cache_name: name of the cache
cache: cache itself, which must implement __len__(), and may optionally implement
a max_size property
collect_callback: If given, a function which is called during metric
collection to update additional metrics.
resizable: Whether this cache supports being resized, in which case either
resize_callback must be provided, or the cache must support set_max_size().
resize_callback: A function which can be called to resize the cache.
Returns:
an object which provides inc_{hits,misses,evictions} methods
class CacheManager:
"""
Used as a central point to register caches and collect metrics on them.
"""
if resizable:
if not resize_callback:
resize_callback = cache.set_cache_factor # type: ignore
add_resizable_cache(cache_name, resize_callback)
metric = CacheMetric(cache, cache_type, cache_name, collect_callback)
metric_name = "cache_%s_%s" % (cache_type, cache_name)
caches_by_name[cache_name] = cache
CACHE_METRIC_REGISTRY.register_hook(metric_name, metric.collect)
return metric
def __init__(self, hs: "HomeServer") -> None:
self._cache_metrics = CacheMetrics(hs.metrics_collector_registry)
def register_cache(
self,
cache_type: str,
cache_name: str,
cache: Sized,
collect_callback: Optional[Callable] = None,
resizable: bool = True,
resize_callback: Optional[Callable] = None,
) -> CacheMetric:
"""Register a cache object for metric collection and resizing.
Args:
cache_type: a string indicating the "type" of the cache. This is used
only for deduplication so isn't too important provided it's constant.
cache_name: name of the cache
cache: cache itself, which must implement __len__(), and may optionally implement
a max_size property
collect_callback: If given, a function which is called during metric
collection to update additional metrics.
resizable: Whether this cache supports being resized, in which case either
resize_callback must be provided, or the cache must support set_max_size().
resize_callback: A function which can be called to resize the cache.
Returns:
an object which provides inc_{hits,misses,evictions} methods
"""
if resizable:
if not resize_callback:
resize_callback = cache.set_cache_factor # type: ignore
add_resizable_cache(cache_name, resize_callback)
metric = CacheMetric(
self._cache_metrics, cache, cache_type, cache_name, collect_callback
)
metric_name = "cache_%s_%s" % (cache_type, cache_name)
self._cache_metrics.register_hook(metric_name, metric.collect)
return metric
KNOWN_KEYS = {
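With the module-level `register_cache` folded into `CacheManager`, call sites now go through the homeserver. A sketch of registering an ad-hoc sized container, following the docstring above (`hs` is assumed to be an in-scope HomeServer; the cache object only needs `__len__`, and `resizable=False` sidesteps the resize callback):

cache_manager = hs.get_cache_manager()

my_cache: dict = {}
metric = cache_manager.register_cache(
    "dict",            # cache_type: only used for deduplication
    "my_adhoc_cache",  # cache_name
    my_cache,          # anything implementing __len__()
    resizable=False,   # a plain dict has no set_cache_factor
)

# Per the docstring, the returned CacheMetric exposes inc_{hits,misses,evictions}.
metric.inc_hits()
metric.inc_misses()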
+4 -1
@@ -35,6 +35,7 @@ from typing import (
import attr
from synapse.util.caches import CacheManager
from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.treecache import TreeCache
@@ -127,7 +128,7 @@ class DictionaryCache(Generic[KT, DKT, DV]):
for the '2' dict key.
"""
def __init__(self, name: str, max_entries: int = 1000):
def __init__(self, *, max_entries: int, name: str, cache_manager: CacheManager):
# We use a single LruCache to store two different types of entries:
# 1. Map from (key, dict_key) -> dict value (or sentinel, indicating
# the key doesn't exist in the dict); and
@@ -153,6 +154,8 @@ class DictionaryCache(Generic[KT, DKT, DV]):
] = LruCache(
max_size=max_entries,
cache_name=name,
# TODO
# cache_manager=cache_manager,
cache_type=TreeCache,
size_callback=len,
)
+3 -2
@@ -30,7 +30,7 @@ from twisted.internet import defer
from synapse.config import cache as cache_config
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util import Clock
from synapse.util.caches import EvictionReason, register_cache
from synapse.util.caches import CacheManager, EvictionReason
logger = logging.getLogger(__name__)
@@ -48,6 +48,7 @@ class ExpiringCache(Generic[KT, VT]):
self,
cache_name: str,
clock: Clock,
cache_manager: CacheManager,
max_len: int = 0,
expiry_ms: int = 0,
reset_expiry_on_get: bool = False,
@@ -83,7 +84,7 @@ class ExpiringCache(Generic[KT, VT]):
self.iterable = iterable
self.metrics = register_cache("expiring", cache_name, self)
self.metrics = cache_manager.register_cache("expiring", cache_name, self)
if not self._expiry_ms:
# Don't bother starting the loop if things never expire
+28 -20
@@ -50,9 +50,8 @@ from twisted.internet.interfaces import IReactorTime
from synapse.config import cache as cache_config
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.metrics.jemalloc import get_jemalloc_stats
from synapse.util import Clock, caches
from synapse.util.caches import CacheMetric, EvictionReason, register_cache
from synapse.util.caches import CacheMetric, EvictionReason
from synapse.util.caches.treecache import (
TreeCache,
iterate_tree_cache_entry,
@@ -120,7 +119,10 @@ GLOBAL_ROOT = ListNode["_Node"].create_root_node()
@wrap_as_background_process("LruCache._expire_old_entries")
async def _expire_old_entries(
clock: Clock, expiry_seconds: float, autotune_config: Optional[dict]
hs: "HomeServer",
clock: Clock,
expiry_seconds: float,
autotune_config: Optional[dict],
) -> None:
"""Walks the global cache list to find cache entries that haven't been
accessed in the given number of seconds, or if a given memory threshold has been breached.
@@ -141,7 +143,7 @@ async def _expire_old_entries(
evicting_due_to_memory = False
# determine if we're evicting due to memory
jemalloc_interface = get_jemalloc_stats()
jemalloc_interface = hs.jemalloc_stats
if jemalloc_interface and autotune_config:
try:
jemalloc_interface.refresh_stats()
@@ -238,6 +240,7 @@ def setup_expire_lru_cache_entries(hs: "HomeServer") -> None:
clock.looping_call(
_expire_old_entries,
30 * 1000,
hs,
clock,
expiry_time,
hs.config.caches.cache_autotuning,
@@ -379,10 +382,11 @@ class LruCache(Generic[KT, VT]):
def __init__(
self,
max_size: int,
*,
cache_name: Optional[str] = None,
metrics_collection_callback: Optional[Callable[[], None]] = None,
cache_type: Type[Union[dict, TreeCache]] = dict,
size_callback: Optional[Callable[[VT], int]] = None,
metrics_collection_callback: Optional[Callable[[], None]] = None,
apply_cache_factor_from_config: bool = True,
clock: Optional[Clock] = None,
prune_unread_entries: bool = True,
@@ -395,11 +399,7 @@ class LruCache(Generic[KT, VT]):
cache_name: The name of this cache, for the prometheus metrics. If unset,
no metrics will be reported on this cache.
cache_type:
type of underlying cache to be used. Typically one of dict
or TreeCache.
size_callback:
Ignored if `cache_name` is `None`.
metrics_collection_callback:
metrics collection callback. This is called early in the metrics
@@ -407,7 +407,13 @@ class LruCache(Generic[KT, VT]):
prometheus Registry are collected, so can be used to update any dynamic
metrics.
Ignored if cache_name is None.
Ignored if `cache_name` is `None`.
cache_type:
type of underlying cache to be used. Typically one of dict
or TreeCache.
size_callback:
apply_cache_factor_from_config: If true, `max_size` will be
multiplied by a cache factor derived from the homeserver config
@@ -457,15 +463,17 @@ class LruCache(Generic[KT, VT]):
# do yet when we get resized.
self._on_resize: Optional[Callable[[], None]] = None
if cache_name is not None:
metrics: Optional[CacheMetric] = register_cache(
"lru_cache",
cache_name,
self,
collect_callback=metrics_collection_callback,
)
else:
metrics = None
# TODO
# if cache_name is not None:
# metrics: Optional[CacheMetric] = cache_manager.register_cache(
# "lru_cache",
# cache_name,
# self,
# collect_callback=metrics_collection_callback,
# )
# else:
# metrics = None
metrics: Optional[CacheMetric] = None
# this is exposed for access from outside this class
self.metrics = metrics
+5 -2
@@ -43,7 +43,7 @@ from synapse.logging.opentracing import (
)
from synapse.util import Clock
from synapse.util.async_helpers import AbstractObservableDeferred, ObservableDeferred
from synapse.util.caches import EvictionReason, register_cache
from synapse.util.caches import CacheManager, EvictionReason
logger = logging.getLogger(__name__)
@@ -104,6 +104,7 @@ class ResponseCache(Generic[KV]):
def __init__(
self,
clock: Clock,
cache_manager: CacheManager,
name: str,
timeout_ms: float = 0,
enable_logging: bool = True,
@@ -114,7 +115,9 @@ class ResponseCache(Generic[KV]):
self.timeout_sec = timeout_ms / 1000.0
self._name = name
self._metrics = register_cache("response_cache", name, self, resizable=False)
self._metrics = cache_manager.register_cache(
"response_cache", name, self, resizable=False
)
self._enable_logging = enable_logging
def size(self) -> int:
+4 -2
@@ -26,7 +26,7 @@ from typing import Collection, Dict, FrozenSet, List, Mapping, Optional, Set, Un
import attr
from sortedcontainers import SortedDict
-from synapse.util import caches
+from synapse.util.caches import CacheManager
logger = logging.getLogger(__name__)
@@ -73,7 +73,9 @@ class StreamChangeCache:
def __init__(
self,
+*,
name: str,
+cache_manager: CacheManager,
current_stream_pos: int,
max_size: int = 10000,
prefilled_cache: Optional[Mapping[EntityType, int]] = None,
@@ -95,7 +97,7 @@ class StreamChangeCache:
self._earliest_known_stream_pos = current_stream_pos
self.name = name
-self.metrics = caches.register_cache(
+self.metrics = cache_manager.register_cache(
"cache", self.name, self._cache, resize_callback=self.set_cache_factor
)
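The same change in miniature, assuming an `hs` with `get_cache_manager()` (mirroring the test updates later in this diff):

from synapse.util.caches.stream_change_cache import StreamChangeCache

cache = StreamChangeCache(
    name="ExampleChangeCache",
    cache_manager=hs.get_cache_manager(),
    current_stream_pos=0,
)
cache.entity_has_changed("@user:example.com", 1)
# Changed at stream position 1, so it has changed since position 0.
assert cache.has_entity_changed("@user:example.com", 0)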
+10 -3
@@ -26,7 +26,7 @@ from typing import Any, Callable, Dict, Generic, Tuple, TypeVar, Union
import attr
from sortedcontainers import SortedList
-from synapse.util.caches import register_cache
+from synapse.util.caches import CacheManager
logger = logging.getLogger(__name__)
@@ -40,7 +40,12 @@ VT = TypeVar("VT")
class TTLCache(Generic[KT, VT]):
"""A key/value cache implementation where each entry has its own TTL"""
-def __init__(self, cache_name: str, timer: Callable[[], float] = time.time):
+def __init__(
+self,
+cache_name: str,
+cache_manager: CacheManager,
+timer: Callable[[], float] = time.time,
+):
# map from key to _CacheEntry
self._data: Dict[KT, _CacheEntry[KT, VT]] = {}
@@ -49,7 +54,9 @@ class TTLCache(Generic[KT, VT]):
self._timer = timer
-self._metrics = register_cache("ttl", cache_name, self, resizable=False)
+self._metrics = cache_manager.register_cache(
+"ttl", cache_name, self, resizable=False
+)
def set(self, key: KT, value: VT, ttl: float) -> None:
"""Add/update an entry in the cache
+45 -14
@@ -79,21 +79,23 @@ class _InFlightMetric(Protocol):
real_time_sum: float
-# Tracks the number of blocks currently active
-in_flight: InFlightGauge[_InFlightMetric] = InFlightGauge(
-"synapse_util_metrics_block_in_flight",
-"",
-labels=["block_name"],
-sub_metrics=["real_time_max", "real_time_sum"],
-)
+# Each homeserver will have its own `CollectorRegistry`, which is used to namespace the
+# metrics to the given homeserver. Because of how `@measure_func` is used as a
+# decorator, we have to keep track of this out of band.
+metrics_collector_registry_to_in_flight_gauge: Dict[
+CollectorRegistry, InFlightGauge
+] = {}
P = ParamSpec("P")
R = TypeVar("R")
-class HasClock(Protocol):
+class HasClockAndMetricsCollectorRegistry(Protocol):
+# Used to measure functions
clock: Clock
+# Used to namespace the metrics to the given homeserver
+metrics_collector_registry: CollectorRegistry
def measure_func(
@@ -119,13 +121,17 @@ def measure_func(
"""
def wrapper(
-func: Callable[Concatenate[HasClock, P], Awaitable[R]],
+func: Callable[
+Concatenate[HasClockAndMetricsCollectorRegistry, P], Awaitable[R]
+],
) -> Callable[P, Awaitable[R]]:
block_name = func.__name__ if name is None else name
@wraps(func)
-async def measured_func(self: HasClock, *args: P.args, **kwargs: P.kwargs) -> R:
-with Measure(self.clock, block_name):
+async def measured_func(
+self: HasClockAndMetricsCollectorRegistry, *args: P.args, **kwargs: P.kwargs
+) -> R:
+with Measure(self.clock, self.metrics_collector_registry, block_name):
r = await func(self, *args, **kwargs)
return r
@@ -141,12 +147,15 @@ def measure_func(
class Measure:
__slots__ = [
"clock",
"metrics_collector_registry",
"name",
"_logging_context",
"start",
]
-def __init__(self, clock: Clock, name: str) -> None:
+def __init__(
+self, clock: Clock, metrics_collector_registry: CollectorRegistry, name: str
+) -> None:
"""
Args:
clock: An object with a "time()" method, which returns the current
@@ -154,6 +163,7 @@ class Measure:
name: The name of the metric to report.
"""
self.clock = clock
+self.metrics_collector_registry = metrics_collector_registry
self.name = name
curr_context = current_context()
if not curr_context:
@@ -168,13 +178,34 @@ class Measure:
self._logging_context = LoggingContext(str(curr_context), parent_context)
self.start: Optional[float] = None
+def get_in_flight_gauge(self) -> InFlightGauge[_InFlightMetric]:
+existing_in_flight_gauge = metrics_collector_registry_to_in_flight_gauge.get(
+self.metrics_collector_registry, None
+)
+if existing_in_flight_gauge is not None:
+return existing_in_flight_gauge
+# Tracks the number of blocks currently active
+in_flight_gauge: InFlightGauge[_InFlightMetric] = InFlightGauge(
+"synapse_util_metrics_block_in_flight",
+"",
+labels=["block_name"],
+sub_metrics=["real_time_max", "real_time_sum"],
+registry=self.metrics_collector_registry,
+)
+metrics_collector_registry_to_in_flight_gauge[
+self.metrics_collector_registry
+] = in_flight_gauge
+return in_flight_gauge
def __enter__(self) -> "Measure":
if self.start is not None:
raise RuntimeError("Measure() objects cannot be re-used")
self.start = self.clock.time()
self._logging_context.__enter__()
-in_flight.register((self.name,), self._update_in_flight)
+self.get_in_flight_gauge().register((self.name,), self._update_in_flight)
logger.debug("Entering block %s", self.name)
@@ -194,7 +225,7 @@ class Measure:
duration = self.clock.time() - self.start
usage = self.get_resource_usage()
-in_flight.unregister((self.name,), self._update_in_flight)
+self.get_in_flight_gauge().unregister((self.name,), self._update_in_flight)
self._logging_context.__exit__(exc_type, exc_val, exc_tb)
try:
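The memoisation pattern above in isolation: one gauge per CollectorRegistry, created lazily and cached in a module-level dict so that decorator-wrapped code (which runs at import time, before any homeserver exists) can attach its metric to the right per-server registry at call time. A generic sketch, not Synapse's exact code:

from typing import Dict
from prometheus_client import CollectorRegistry, Gauge

_gauge_by_registry: Dict[CollectorRegistry, Gauge] = {}

def get_gauge(registry: CollectorRegistry) -> Gauge:
    gauge = _gauge_by_registry.get(registry)
    if gauge is None:
        # Registering the same metric name twice on one registry raises,
        # so each registry gets exactly one gauge instance.
        gauge = Gauge("example_in_flight", "blocks in flight", registry=registry)
        _gauge_by_registry[registry] = gauge
    return gauge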
+1
@@ -132,6 +132,7 @@ class TaskScheduler:
"The number of concurrent running tasks handled by the TaskScheduler",
labels=None,
caller=lambda: len(self._running_tasks),
+registry=hs.metrics_collector_registry,
)
def register_action(
+10
@@ -75,6 +75,7 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase):
self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
) -> None:
self.store = homeserver.get_datastores().main
+self.presence_handler = homeserver.get_presence_handler()
def test_offline_to_online(self) -> None:
wheel_timer = Mock()
@@ -87,6 +88,7 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase):
)
state, persist_and_notify, federation_ping = handle_update(
+self.presence_handler,
prev_state,
new_state,
is_mine=True,
@@ -134,6 +136,7 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase):
)
state, persist_and_notify, federation_ping = handle_update(
+self.presence_handler,
prev_state,
new_state,
is_mine=True,
@@ -184,6 +187,7 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase):
)
state, persist_and_notify, federation_ping = handle_update(
+self.presence_handler,
prev_state,
new_state,
is_mine=True,
@@ -232,6 +236,7 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase):
new_state = prev_state.copy_and_replace(state=PresenceState.ONLINE)
state, persist_and_notify, federation_ping = handle_update(
+self.presence_handler,
prev_state,
new_state,
is_mine=True,
@@ -272,6 +277,7 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase):
new_state = prev_state.copy_and_replace(state=PresenceState.ONLINE)
state, persist_and_notify, federation_ping = handle_update(
+self.presence_handler,
prev_state,
new_state,
is_mine=False,
@@ -311,6 +317,7 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase):
new_state = prev_state.copy_and_replace(state=PresenceState.OFFLINE)
state, persist_and_notify, federation_ping = handle_update(
+self.presence_handler,
prev_state,
new_state,
is_mine=True,
@@ -338,6 +345,7 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase):
new_state = prev_state.copy_and_replace(state=PresenceState.UNAVAILABLE)
state, persist_and_notify, federation_ping = handle_update(
+self.presence_handler,
prev_state,
new_state,
is_mine=True,
@@ -428,6 +436,7 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase):
new_state = prev_state.copy_and_replace(state=final_state, last_active_ts=now)
handle_update(
+self.presence_handler,
prev_state,
new_state,
is_mine=True,
@@ -491,6 +500,7 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase):
)
state, persist_and_notify, federation_ping = handle_update(
+self.presence_handler,
prev_state,
new_state,
is_mine=True,
+13 -8
@@ -84,14 +84,6 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
# we mock out the federation client too
self.mock_federation_client = AsyncMock(spec=["put_json"])
-self.mock_federation_client.put_json.return_value = (200, "OK")
-self.mock_federation_client.agent = MatrixFederationAgent(
-reactor,
-tls_client_options_factory=None,
-user_agent=b"SynapseInTrialTest/0.0.0",
-ip_allowlist=None,
-ip_blocklist=IPSet(),
-)
# the tests assume that we are starting at unix time 1000
reactor.pump((1000,))
@@ -104,6 +96,19 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
replication_streams={},
)
+# Finish off mocking the federation client
+self.mock_federation_client.put_json.return_value = (200, "OK")
+self.mock_federation_client.agent = MatrixFederationAgent(
+reactor=reactor,
+tls_client_options_factory=None,
+user_agent=b"SynapseInTrialTest/0.0.0",
+ip_allowlist=None,
+ip_blocklist=IPSet(),
+# After we get access to the `hs` homeserver instance, we can replace the federation agent
+metrics_collector_registry=hs.metrics_collector_registry,
+cache_manager=hs.get_cache_manager(),
+)
return hs
def create_resource_dict(self) -> Dict[str, Resource]:
@@ -21,10 +21,11 @@ import base64
import logging
import os
from typing import Generator, List, Optional, cast
-from unittest.mock import AsyncMock, call, patch
+from unittest.mock import AsyncMock, Mock, call, patch
import treq
from netaddr import IPSet
+from prometheus_client import CollectorRegistry
from service_identity import VerificationError
from zope.interface import implementer
@@ -60,6 +61,7 @@ from synapse.logging.context import (
current_context,
)
from synapse.types import ISynapseReactor
+from synapse.util.caches import CacheManager
from synapse.util.caches.ttlcache import TTLCache
from tests import unittest
@@ -84,16 +86,24 @@ class MatrixFederationAgentTests(unittest.TestCase):
self.tls_factory = FederationPolicyForHTTPS(config)
+cache_manager = Mock(spec=CacheManager)
self.well_known_cache: TTLCache[bytes, Optional[bytes]] = TTLCache(
"test_cache", timer=self.reactor.seconds
"test_cache",
timer=self.reactor.seconds,
cache_manager=cache_manager,
)
self.had_well_known_cache: TTLCache[bytes, bool] = TTLCache(
"test_cache", timer=self.reactor.seconds
"test_cache",
timer=self.reactor.seconds,
cache_manager=cache_manager,
)
self.well_known_resolver = WellKnownResolver(
self.reactor,
Agent(self.reactor, contextFactory=self.tls_factory),
b"test-agent",
+metrics_collector_registry=CollectorRegistry(auto_describe=True),
+cache_manager=cache_manager,
well_known_cache=self.well_known_cache,
had_well_known_cache=self.had_well_known_cache,
)
@@ -274,6 +284,8 @@ class MatrixFederationAgentTests(unittest.TestCase):
user_agent=b"test-agent", # Note that this is unused since _well_known_resolver is provided.
ip_allowlist=IPSet(),
ip_blocklist=IPSet(),
+metrics_collector_registry=CollectorRegistry(auto_describe=True),
+cache_manager=Mock(spec=CacheManager),
_srv_resolver=self.mock_resolver,
_well_known_resolver=self.well_known_resolver,
)
@@ -1016,11 +1028,15 @@ class MatrixFederationAgentTests(unittest.TestCase):
user_agent=b"test-agent", # This is unused since _well_known_resolver is passed below.
ip_allowlist=IPSet(),
ip_blocklist=IPSet(),
+metrics_collector_registry=CollectorRegistry(auto_describe=True),
+cache_manager=Mock(spec=CacheManager),
_srv_resolver=self.mock_resolver,
_well_known_resolver=WellKnownResolver(
cast(ISynapseReactor, self.reactor),
Agent(self.reactor, contextFactory=tls_factory),
b"test-agent",
+metrics_collector_registry=CollectorRegistry(auto_describe=True),
+cache_manager=Mock(spec=CacheManager),
well_known_cache=self.well_known_cache,
had_well_known_cache=self.had_well_known_cache,
),
+3 -1
@@ -139,7 +139,9 @@ class TypingStreamTestCase(BaseStreamTestCase):
self.hs.get_replication_command_handler()._streams["typing"].last_token = 0
typing._latest_room_serial = 0
typing._typing_stream_change_cache = StreamChangeCache(
"TypingStreamChangeCache", typing._latest_room_serial
name="TypingStreamChangeCache",
cache_manager=self.hs.get_cache_manager(),
current_stream_pos=typing._latest_room_serial,
)
typing._reset()
@@ -22,6 +22,7 @@ import logging
from unittest.mock import AsyncMock, Mock
from netaddr import IPSet
+from prometheus_client import CollectorRegistry
from signedjson.key import (
encode_verify_key_base64,
generate_signing_key,
@@ -42,6 +43,7 @@ from synapse.server import HomeServer
from synapse.storage.keys import FetchKeyResult
from synapse.types import JsonDict, UserID, create_requester
from synapse.util import Clock
+from synapse.util.caches import CacheManager
from tests.replication._base import BaseMultiWorkerStreamTestCase
from tests.server import get_clock
@@ -68,11 +70,13 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
reactor, _ = get_clock()
self.matrix_federation_agent = MatrixFederationAgent(
-reactor,
+reactor=reactor,
tls_client_options_factory=None,
user_agent=b"SynapseInTrialTest/0.0.0",
ip_allowlist=None,
ip_blocklist=IPSet(),
+metrics_collector_registry=CollectorRegistry(auto_describe=True),
+cache_manager=Mock(spec=CacheManager),
)
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+1 -2
@@ -20,7 +20,6 @@
#
from prometheus_client import generate_latest
-from synapse.metrics import REGISTRY
from synapse.types import UserID, create_requester
from tests.unittest import HomeserverTestCase
@@ -61,7 +60,7 @@ class ExtremStatisticsTestCase(HomeserverTestCase):
items = list(
filter(
lambda x: b"synapse_forward_extremities_" in x and b"# HELP" not in x,
-generate_latest(REGISTRY).split(b"\n"),
+generate_latest(self.hs.metrics_collector_registry).split(b"\n"),
)
)
+3 -1
@@ -25,6 +25,7 @@ from parameterized import parameterized
from twisted.internet import defer
+from synapse.util.caches import CacheManager
from synapse.util.caches.response_cache import ResponseCache, ResponseCacheContext
from tests.server import get_clock
@@ -44,9 +45,10 @@ class ResponseCacheTestCase(TestCase):
def setUp(self) -> None:
self.reactor, self.clock = get_clock()
+self.cache_manager = Mock(spec=CacheManager)
def with_cache(self, name: str, ms: int = 0) -> ResponseCache:
-return ResponseCache(self.clock, name, timeout_ms=ms)
+return ResponseCache(self.clock, self.cache_manager, name, timeout_ms=ms)
@staticmethod
async def instant_return(o: str) -> str:
+6 -1
@@ -20,6 +20,7 @@
from unittest.mock import Mock
+from synapse.util.caches import CacheManager
from synapse.util.caches.ttlcache import TTLCache
from tests import unittest
@@ -28,7 +29,11 @@ from tests import unittest
class CacheTestCase(unittest.TestCase):
def setUp(self) -> None:
self.mock_timer = Mock(side_effect=lambda: 100.0)
-self.cache: TTLCache[str, str] = TTLCache("test_cache", self.mock_timer)
+self.cache: TTLCache[str, str] = TTLCache(
+"test_cache",
+cache_manager=Mock(spec=CacheManager),
+timer=self.mock_timer,
+)
def test_get(self) -> None:
"""simple set/get tests"""
+6 -1
@@ -20,6 +20,9 @@
#
from unittest.mock import Mock
+from synapse.util.caches import CacheManager
from synapse.util.caches.dictionary_cache import DictionaryCache
from tests import unittest
@@ -28,7 +31,9 @@ from tests import unittest
class DictCacheTestCase(unittest.TestCase):
def setUp(self) -> None:
self.cache: DictionaryCache[str, str, str] = DictionaryCache(
"foobar", max_entries=10
max_entries=10,
name="foobar",
cache_manager=Mock(spec=CacheManager),
)
def test_simple_cache_hit_full(self) -> None:
+13 -4
@@ -20,8 +20,10 @@
#
from typing import List, cast
+from unittest.mock import Mock
from synapse.util import Clock
+from synapse.util.caches import CacheManager
from synapse.util.caches.expiringcache import ExpiringCache
from tests.utils import MockClock
@@ -33,7 +35,7 @@ class ExpiringCacheTestCase(unittest.HomeserverTestCase):
def test_get_set(self) -> None:
clock = MockClock()
cache: ExpiringCache[str, str] = ExpiringCache(
"test", cast(Clock, clock), max_len=1
"test", cast(Clock, clock), cache_manager=Mock(spec=CacheManager), max_len=1
)
cache["key"] = "value"
@@ -43,7 +45,7 @@ class ExpiringCacheTestCase(unittest.HomeserverTestCase):
def test_eviction(self) -> None:
clock = MockClock()
cache: ExpiringCache[str, str] = ExpiringCache(
"test", cast(Clock, clock), max_len=2
"test", cast(Clock, clock), cache_manager=Mock(spec=CacheManager), max_len=2
)
cache["key"] = "value"
@@ -59,7 +61,11 @@ class ExpiringCacheTestCase(unittest.HomeserverTestCase):
def test_iterable_eviction(self) -> None:
clock = MockClock()
cache: ExpiringCache[str, List[int]] = ExpiringCache(
"test", cast(Clock, clock), max_len=5, iterable=True
"test",
cast(Clock, clock),
cache_manager=Mock(spec=CacheManager),
max_len=5,
iterable=True,
)
cache["key"] = [1]
@@ -79,7 +85,10 @@ class ExpiringCacheTestCase(unittest.HomeserverTestCase):
def test_time_eviction(self) -> None:
clock = MockClock()
cache: ExpiringCache[str, int] = ExpiringCache(
"test", cast(Clock, clock), expiry_ms=1000
"test",
cast(Clock, clock),
cache_manager=Mock(spec=CacheManager),
expiry_ms=1000,
)
cache["key"] = 1
+7 -5
@@ -21,7 +21,7 @@
from typing import List, Tuple
-from unittest.mock import Mock, patch
+from unittest.mock import Mock
from synapse.metrics.jemalloc import JemallocStats
from synapse.types import JsonDict
@@ -96,7 +96,10 @@ class LruCacheTestCase(unittest.HomeserverTestCase):
@override_config({"caches": {"per_cache_factors": {"mycache": 10}}})
def test_special_size(self) -> None:
cache: LruCache = LruCache(10, "mycache")
cache: LruCache = LruCache(
10,
cache_name="mycache",
)
self.assertEqual(cache.max_size, 100)
@@ -342,10 +345,9 @@ class MemoryEvictionTestCase(unittest.HomeserverTestCase):
}
}
)
-@patch("synapse.util.caches.lrucache.get_jemalloc_stats")
-def test_evict_memory(self, jemalloc_interface: Mock) -> None:
+def test_evict_memory(self) -> None:
mock_jemalloc_class = Mock(spec=JemallocStats)
-jemalloc_interface.return_value = mock_jemalloc_class
+self.hs.jemalloc_stats = mock_jemalloc_class
# set the return value of get_stat() to be greater than max_cache_memory_usage
mock_jemalloc_class.get_stat.return_value = 924288000
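The dependency-injection change in isolation: the eviction loop now reads `hs.jemalloc_stats` instead of calling the module-level `get_jemalloc_stats()`, so a test can inject a stub without `patch()`. A minimal sketch, assuming a test `hs`:

from unittest.mock import Mock
from synapse.metrics.jemalloc import JemallocStats

fake_stats = Mock(spec=JemallocStats)
fake_stats.get_stat.return_value = 924288000  # bytes currently "allocated"
hs.jemalloc_stats = fake_stats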
+43 -8
@@ -15,7 +15,12 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
Providing a prefilled cache to StreamChangeCache will result in a cache
with the prefilled-cache entered in.
"""
cache = StreamChangeCache("#test", 1, prefilled_cache={"user@foo.com": 2})
cache = StreamChangeCache(
name="#test",
cache_manager=self.hs.get_cache_manager(),
current_stream_pos=1,
prefilled_cache={"user@foo.com": 2},
)
self.assertTrue(cache.has_entity_changed("user@foo.com", 1))
def test_has_entity_changed(self) -> None:
@@ -23,7 +28,11 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
StreamChangeCache.entity_has_changed will mark entities as changed, and
has_entity_changed will observe the changed entities.
"""
cache = StreamChangeCache("#test", 3)
cache = StreamChangeCache(
name="#test",
cache_manager=self.hs.get_cache_manager(),
current_stream_pos=3,
)
cache.entity_has_changed("user@foo.com", 6)
cache.entity_has_changed("bar@baz.net", 7)
@@ -61,7 +70,12 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
StreamChangeCache.entity_has_changed will respect the max size and
purge the oldest items upon reaching that max size.
"""
cache = StreamChangeCache("#test", 1, max_size=2)
cache = StreamChangeCache(
name="#test",
cache_manager=self.hs.get_cache_manager(),
current_stream_pos=1,
max_size=2,
)
cache.entity_has_changed("user@foo.com", 2)
cache.entity_has_changed("bar@baz.net", 3)
@@ -100,7 +114,11 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
entities since the given position. If the position is before the start
of the known stream, it returns None instead.
"""
cache = StreamChangeCache("#test", 1)
cache = StreamChangeCache(
name="#test",
cache_manager=self.hs.get_cache_manager(),
current_stream_pos=1,
)
cache.entity_has_changed("user@foo.com", 2)
cache.entity_has_changed("bar@baz.net", 3)
@@ -148,7 +166,11 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
stream position is before it, it will return True, otherwise False if
the cache has no entries.
"""
cache = StreamChangeCache("#test", 1)
cache = StreamChangeCache(
name="#test",
cache_manager=self.hs.get_cache_manager(),
current_stream_pos=1,
)
# With no entities, it returns True for the past, present, and False for
# the future.
@@ -175,7 +197,11 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
stream position is earlier than the earliest known position, it will
return all of the entities queried for.
"""
cache = StreamChangeCache("#test", 1)
cache = StreamChangeCache(
name="#test",
cache_manager=self.hs.get_cache_manager(),
current_stream_pos=1,
)
cache.entity_has_changed("user@foo.com", 2)
cache.entity_has_changed("bar@baz.net", 3)
@@ -242,7 +268,11 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
recent point where the entity could have changed. If the entity is not
known, the stream start is provided instead.
"""
cache = StreamChangeCache("#test", 1)
cache = StreamChangeCache(
name="#test",
cache_manager=self.hs.get_cache_manager(),
current_stream_pos=1,
)
cache.entity_has_changed("user@foo.com", 2)
cache.entity_has_changed("bar@baz.net", 3)
@@ -260,7 +290,12 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
"""
`StreamChangeCache.all_entities_changed(...)` will mark all entities as changed.
"""
cache = StreamChangeCache("#test", 1, max_size=10)
cache = StreamChangeCache(
name="#test",
cache_manager=self.hs.get_cache_manager(),
current_stream_pos=1,
max_size=10,
)
cache.entity_has_changed("user@foo.com", 2)
cache.entity_has_changed("bar@baz.net", 3)