Compare commits
42 Commits
| SHA1 |
|---|
| 1c51b74e2d |
| ec16224828 |
| 00140fcc99 |
| 6e49db78ce |
| d4d59a49b3 |
| 5f5b3496f9 |
| e433a680a1 |
| c5bbff9584 |
| 6a11964228 |
| 962d5cecbf |
| e36c6f7803 |
| 827b78554f |
| 57546bffad |
| 9f82f66c76 |
| 3b12305376 |
| b4ed25e4fb |
| 356792fe6d |
| f75ea5cfcd |
| f68211bd0c |
| a707a25389 |
| 627a77bc5e |
| 6314f6cba0 |
| 4a5c2f89c7 |
| fc04be1bed |
| 2521a17ef4 |
| e72d782189 |
| efa6018519 |
| 65080f8464 |
| 1400e155b2 |
| ccd0a71395 |
| 4018aa4d98 |
| f49789bfd0 |
| 9943cd39ad |
| d3d228ea2d |
| d3c23f8235 |
| 3de2c0970e |
| 3215897de6 |
| 86f39b04a6 |
| 72b3becb89 |
| a6dd8a96bb |
| 9c6863e1a7 |
| c07d3108cc |
```diff
@@ -0,0 +1 @@
+Refactor metrics to collect in homeserver-specific registry.
```
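The hunks that follow all apply the pattern this changelog entry names: metrics that used to be module-level globals on `prometheus_client`'s process-wide default registry become instance attributes registered against a per-homeserver `CollectorRegistry`. A minimal sketch of the pattern in plain `prometheus_client` (the `HomeServer` and handler classes here are illustrative stand-ins, not Synapse's real ones):

```python
from prometheus_client import CollectorRegistry, Counter, generate_latest


class HomeServer:
    """Stand-in for Synapse's HomeServer: each instance owns its own registry."""

    def __init__(self) -> None:
        self.metrics_collector_registry = CollectorRegistry()


class ExampleHandler:
    def __init__(self, hs: HomeServer) -> None:
        # Previously a module-level Counter on the global REGISTRY; scoping it
        # to the instance's registry means two homeservers in one process
        # cannot collide on metric names.
        self.events_processed_counter = Counter(
            "example_events_processed",
            "Number of events processed by this handler",
            registry=hs.metrics_collector_registry,
        )

    def process_event(self) -> None:
        self.events_processed_counter.inc()


hs = HomeServer()
ExampleHandler(hs).process_event()
# Expose only this homeserver's metrics, in Prometheus text format:
print(generate_latest(hs.metrics_collector_registry).decode())
```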
````diff
@@ -53,15 +53,17 @@
         type: http
         x_forwarded: true
         bind_addresses: ['::1', '127.0.0.1']

         resources:
           - names: [client, federation]
             compress: false

       # beginning of the new metrics listener
       - port: 9000
-        type: metrics
+        type: http
         bind_addresses: ['::1', '127.0.0.1']
+        resources:
+          - names: [metrics]
+            compress: false
     ```

 1. Restart Synapse.
````
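With the metrics listener declared as an ordinary `http` listener carrying the `metrics` resource, the endpoint can be smoke-tested after the restart. A quick check, assuming the resource is mounted at `/_synapse/metrics` (the `METRICS_PREFIX` used with `MetricsResource` in the hunks below):

```python
# Fetch the Prometheus text exposition from the new listener and show the
# metric descriptions; adjust host/port to match your `bind_addresses`.
from urllib.request import urlopen

with urlopen("http://127.0.0.1:9000/_synapse/metrics") as resp:
    body = resp.read().decode()

for line in body.splitlines():
    if line.startswith("# HELP"):
        print(line)
```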
```diff
@@ -451,8 +451,7 @@ properties:
         type: string
         description: >-
           The type of listener. Normally `http`, but other valid options are
-          [`manhole`](../../manhole.md) and
-          [`metrics`](../../metrics-howto.md).
+          [`manhole`](../../manhole.md).
         enum:
           - http
           - manhole
```
```diff
@@ -207,6 +207,7 @@ class MSC3861DelegatedAuth(BaseAuth):
         # the old access token to continue working for a short time.
         self._introspection_cache: ResponseCache[str] = ResponseCache(
             self._clock,
+            hs.get_cache_manager(),
             "token_introspection",
             timeout_ms=120_000,
             # don't log because the keys are access tokens
```
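Alongside the registry, caches now receive a cache manager (`hs.get_cache_manager()`) at construction, so per-cache bookkeeping metrics also land in the homeserver's own registry rather than a global one. A hypothetical sketch of what such a manager could look like — the real `CacheManager` API is not shown in this diff, so the names here are assumptions:

```python
from typing import Dict

from prometheus_client import CollectorRegistry, Gauge


class CacheManager:
    """Hypothetical per-homeserver cache manager owning the instance registry."""

    def __init__(self, registry: CollectorRegistry) -> None:
        self._size_gauge = Gauge(
            "example_cache_size",
            "Number of entries in each named cache",
            ["cache_name"],
            registry=registry,
        )

    def register_cache(self, cache_name: str, cache: Dict) -> None:
        # Evaluate the size lazily at scrape time instead of on every write.
        self._size_gauge.labels(cache_name).set_function(lambda: len(cache))


registry = CollectorRegistry()
manager = CacheManager(registry)
introspection_cache: Dict[str, object] = {}
manager.register_cache("token_introspection", introspection_cache)
```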
```diff
@@ -74,9 +74,13 @@ from synapse.handlers.auth import load_legacy_password_auth_providers
 from synapse.http.site import SynapseSite
 from synapse.logging.context import PreserveLoggingContext
 from synapse.logging.opentracing import init_tracer
-from synapse.metrics import install_gc_manager, register_threadpool
-from synapse.metrics.background_process_metrics import wrap_as_background_process
-from synapse.metrics.jemalloc import setup_jemalloc_stats
+from synapse.metrics import CPUMetrics, install_gc_manager, register_threadpool
+from synapse.metrics._gc import GCCounts, PyPyGCStats, running_on_pypy
+from synapse.metrics._reactor_metrics import setup_reactor_metrics
+from synapse.metrics.background_process_metrics import (
+    BackgroundProcessCollector,
+    wrap_as_background_process,
+)
 from synapse.module_api.callbacks.spamchecker_callbacks import load_legacy_spam_checkers
 from synapse.module_api.callbacks.third_party_event_rules_callbacks import (
     load_legacy_third_party_event_rules,
@@ -178,7 +182,6 @@ def start_reactor(

     def run() -> None:
         logger.info("Running")
-        setup_jemalloc_stats()
         change_resource_limit(soft_file_limit)
         if gc_thresholds:
             gc.set_threshold(*gc_thresholds)
@@ -283,20 +286,6 @@ def register_start(
     reactor.callWhenRunning(lambda: defer.ensureDeferred(wrapper()))


-def listen_metrics(bind_addresses: StrCollection, port: int) -> None:
-    """
-    Start Prometheus metrics server.
-    """
-    from prometheus_client import start_http_server as start_http_server_prometheus
-
-    from synapse.metrics import RegistryProxy
-
-    for host in bind_addresses:
-        logger.info("Starting metrics listener on %s:%d", host, port)
-        _set_prometheus_client_use_created_metrics(False)
-        start_http_server_prometheus(port, addr=host, registry=RegistryProxy)
-
-
 def _set_prometheus_client_use_created_metrics(new_value: bool) -> None:
     """
     Sets whether prometheus_client should expose `_created`-suffixed metrics for
@@ -607,6 +596,7 @@ async def start(hs: "HomeServer") -> None:
     )

     setup_sentry(hs)
+    setup_global_metrics(hs)
     setup_sdnotify(hs)

     # If background tasks are running on the main process or this is the worker in
@@ -694,6 +684,20 @@ def setup_sentry(hs: "HomeServer") -> None:
         global_scope.set_tag("worker_name", name)


+def setup_global_metrics(hs: "HomeServer") -> None:
+    """Set up global metrics for this homeserver.
+
+    This is called after the homeserver has been set up, but before any
+    listeners are started.
+    """
+    CPUMetrics(registry=hs.metrics_collector_registry)
+    if running_on_pypy:
+        PyPyGCStats(registry=hs.metrics_collector_registry)
+    GCCounts(registry=hs.metrics_collector_registry)
+    BackgroundProcessCollector(registry=hs.metrics_collector_registry)
+    setup_reactor_metrics(registry=hs.metrics_collector_registry)
+
+
 def setup_sdnotify(hs: "HomeServer") -> None:
     """Adds process state hooks to tell systemd what we are up to."""
```
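`setup_global_metrics` replaces what used to be import-time registration on the process-global registry: process-level collectors (`CPUMetrics`, GC stats, background processes, reactor metrics) are now instantiated explicitly against `hs.metrics_collector_registry`. The self-registering collector shape can be sketched with plain `prometheus_client` — a registry only needs an object exposing `collect()`, so no Synapse-specific base class is assumed here:

```python
import gc

from prometheus_client import CollectorRegistry, generate_latest
from prometheus_client.core import CounterMetricFamily


class GCCountsSketch:
    """Illustrative analogue of a GC-stats collector that binds itself to a
    specific registry instead of the global default."""

    def __init__(self, registry: CollectorRegistry) -> None:
        registry.register(self)

    def collect(self):
        fam = CounterMetricFamily(
            "example_gc_collections",
            "GC collections per generation (CPython)",
            labels=["gen"],
        )
        for gen, stats in enumerate(gc.get_stats()):
            fam.add_metric([str(gen)], stats["collections"])
        yield fam


registry = CollectorRegistry()
GCCountsSketch(registry)
print(generate_latest(registry).decode())
```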
```diff
@@ -49,7 +49,7 @@ from synapse.config.server import ListenerConfig, TCPListenerConfig
 from synapse.federation.transport.server import TransportLayerServer
 from synapse.http.server import JsonResource, OptionsResource
 from synapse.logging.context import LoggingContext
-from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
+from synapse.metrics import METRICS_PREFIX, MetricsResource
 from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
 from synapse.rest import ClientRestResource, admin
 from synapse.rest.health import HealthResource
@@ -186,7 +186,9 @@ class GenericWorkerServer(HomeServer):
         for res in listener_config.http_options.resources:
             for name in res.names:
                 if name == "metrics":
-                    resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
+                    resources[METRICS_PREFIX] = MetricsResource(
+                        self.metrics_collector_registry
+                    )
                 elif name == "client":
                     resource: Resource = ClientRestResource(self)

@@ -283,23 +285,6 @@ class GenericWorkerServer(HomeServer):
                     raise ConfigError(
                         "Can not using a unix socket for manhole at this time."
                     )
-
-            elif listener.type == "metrics":
-                if not self.config.metrics.enable_metrics:
-                    logger.warning(
-                        "Metrics listener configured, but enable_metrics is not True!"
-                    )
-                else:
-                    if isinstance(listener, TCPListenerConfig):
-                        _base.listen_metrics(
-                            listener.bind_addresses,
-                            listener.port,
-                        )
-                    else:
-                        raise ConfigError(
-                            "Can not use a unix socket for metrics at this time."
-                        )
-
             else:
                 logger.warning("Unsupported listener type: %s", listener.type)
```
```diff
@@ -60,7 +60,7 @@ from synapse.http.server import (
     StaticResource,
 )
 from synapse.logging.context import LoggingContext
-from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
+from synapse.metrics import METRICS_PREFIX, MetricsResource
 from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
 from synapse.rest import ClientRestResource, admin
 from synapse.rest.health import HealthResource
@@ -252,7 +252,9 @@ class SynapseHomeServer(HomeServer):
             resources[SERVER_KEY_PREFIX] = KeyResource(self)

         if name == "metrics" and self.config.metrics.enable_metrics:
-            metrics_resource: Resource = MetricsResource(RegistryProxy)
+            metrics_resource: Resource = MetricsResource(
+                self.metrics_collector_registry
+            )
             if compress:
                 metrics_resource = gz_wrap(metrics_resource)
             resources[METRICS_PREFIX] = metrics_resource
@@ -286,22 +288,6 @@ class SynapseHomeServer(HomeServer):
                     raise ConfigError(
                         "Can not use a unix socket for manhole at this time."
                     )
-            elif listener.type == "metrics":
-                if not self.config.metrics.enable_metrics:
-                    logger.warning(
-                        "Metrics listener configured, but enable_metrics is not True!"
-                    )
-                else:
-                    if isinstance(listener, TCPListenerConfig):
-                        _base.listen_metrics(
-                            listener.bind_addresses,
-                            listener.port,
-                        )
-                    else:
-                        raise ConfigError(
-                            "Can not use a unix socket for metrics at this time."
-                        )
-
             else:
                 # this shouldn't happen, as the listener type should have been checked
                 # during parsing
```
```diff
@@ -56,33 +56,6 @@ if TYPE_CHECKING:

 logger = logging.getLogger(__name__)

-sent_transactions_counter = Counter(
-    "synapse_appservice_api_sent_transactions",
-    "Number of /transactions/ requests sent",
-    ["service"],
-)
-
-failed_transactions_counter = Counter(
-    "synapse_appservice_api_failed_transactions",
-    "Number of /transactions/ requests that failed to send",
-    ["service"],
-)
-
-sent_events_counter = Counter(
-    "synapse_appservice_api_sent_events", "Number of events sent to the AS", ["service"]
-)
-
-sent_ephemeral_counter = Counter(
-    "synapse_appservice_api_sent_ephemeral",
-    "Number of ephemeral events sent to the AS",
-    ["service"],
-)
-
-sent_todevice_counter = Counter(
-    "synapse_appservice_api_sent_todevice",
-    "Number of todevice messages sent to the AS",
-    ["service"],
-)
-
 HOUR_IN_MS = 60 * 60 * 1000
@@ -130,7 +103,45 @@ class ApplicationServiceApi(SimpleHttpClient):
         self.config = hs.config.appservice

         self.protocol_meta_cache: ResponseCache[Tuple[str, str]] = ResponseCache(
-            hs.get_clock(), "as_protocol_meta", timeout_ms=HOUR_IN_MS
+            hs.get_clock(),
+            hs.get_cache_manager(),
+            "as_protocol_meta",
+            timeout_ms=HOUR_IN_MS,
         )

+        self.sent_transactions_counter = Counter(
+            "synapse_appservice_api_sent_transactions",
+            "Number of /transactions/ requests sent",
+            ["service"],
+            registry=hs.metrics_collector_registry,
+        )
+
+        self.failed_transactions_counter = Counter(
+            "synapse_appservice_api_failed_transactions",
+            "Number of /transactions/ requests that failed to send",
+            ["service"],
+            registry=hs.metrics_collector_registry,
+        )
+
+        self.sent_events_counter = Counter(
+            "synapse_appservice_api_sent_events",
+            "Number of events sent to the AS",
+            ["service"],
+            registry=hs.metrics_collector_registry,
+        )
+
+        self.sent_ephemeral_counter = Counter(
+            "synapse_appservice_api_sent_ephemeral",
+            "Number of ephemeral events sent to the AS",
+            ["service"],
+            registry=hs.metrics_collector_registry,
+        )
+
+        self.sent_todevice_counter = Counter(
+            "synapse_appservice_api_sent_todevice",
+            "Number of todevice messages sent to the AS",
+            ["service"],
+            registry=hs.metrics_collector_registry,
+        )
+
     def _get_headers(self, service: "ApplicationService") -> Dict[bytes, List[bytes]]:
@@ -395,10 +406,10 @@ class ApplicationServiceApi(SimpleHttpClient):
                 service.url,
                 [event.get("event_id") for event in events],
             )
-            sent_transactions_counter.labels(service.id).inc()
-            sent_events_counter.labels(service.id).inc(len(serialized_events))
-            sent_ephemeral_counter.labels(service.id).inc(len(ephemeral))
-            sent_todevice_counter.labels(service.id).inc(len(to_device_messages))
+            self.sent_transactions_counter.labels(service.id).inc()
+            self.sent_events_counter.labels(service.id).inc(len(serialized_events))
+            self.sent_ephemeral_counter.labels(service.id).inc(len(ephemeral))
+            self.sent_todevice_counter.labels(service.id).inc(len(to_device_messages))
             return True
         except CodeMessageException as e:
             logger.warning(
@@ -417,7 +428,7 @@ class ApplicationServiceApi(SimpleHttpClient):
                 ex.args,
                 exc_info=logger.isEnabledFor(logging.DEBUG),
             )
-            failed_transactions_counter.labels(service.id).inc()
+            self.failed_transactions_counter.labels(service.id).inc()
             return False

     async def claim_client_keys(
```
```diff
@@ -85,8 +85,6 @@ if TYPE_CHECKING:

 logger = logging.getLogger(__name__)

-sent_queries_counter = Counter("synapse_federation_client_sent_queries", "", ["type"])
-

 PDU_RETRY_TIME_MS = 1 * 60 * 1000

@@ -145,6 +143,7 @@ class FederationClient(FederationBase):
         self._get_pdu_cache: ExpiringCache[str, Tuple[EventBase, str]] = ExpiringCache(
             cache_name="get_pdu_cache",
             clock=self._clock,
+            cache_manager=hs.get_cache_manager(),
             max_len=1000,
             expiry_ms=120 * 1000,
             reset_expiry_on_get=False,
@@ -163,11 +162,19 @@ class FederationClient(FederationBase):
         ] = ExpiringCache(
             cache_name="get_room_hierarchy_cache",
             clock=self._clock,
+            cache_manager=hs.get_cache_manager(),
             max_len=1000,
             expiry_ms=5 * 60 * 1000,
             reset_expiry_on_get=False,
         )

+        self.sent_queries_counter = Counter(
+            "synapse_federation_client_sent_queries",
+            "",
+            ["type"],
+            registry=hs.metrics_collector_registry,
+        )
+
     def _clear_tried_cache(self) -> None:
         """Clear pdu_destination_tried cache"""
         now = self._clock.time_msec()
@@ -207,7 +214,7 @@ class FederationClient(FederationBase):
         Returns:
             The JSON object from the response
         """
-        sent_queries_counter.labels(query_type).inc()
+        self.sent_queries_counter.labels(query_type).inc()

         return await self.transport_layer.make_query(
             destination,
@@ -229,7 +236,7 @@ class FederationClient(FederationBase):
         Returns:
             The JSON object from the response
         """
-        sent_queries_counter.labels("client_device_keys").inc()
+        self.sent_queries_counter.labels("client_device_keys").inc()
         return await self.transport_layer.query_client_keys(
             destination, content, timeout
         )
@@ -240,7 +247,7 @@ class FederationClient(FederationBase):
         """Query the device keys for a list of user ids hosted on a remote
         server.
         """
-        sent_queries_counter.labels("user_devices").inc()
+        self.sent_queries_counter.labels("user_devices").inc()
         return await self.transport_layer.query_user_devices(
             destination, user_id, timeout
         )
@@ -262,7 +269,7 @@ class FederationClient(FederationBase):
         Returns:
             The JSON object from the response
         """
-        sent_queries_counter.labels("client_one_time_keys").inc()
+        self.sent_queries_counter.labels("client_one_time_keys").inc()

         # Convert the query with counts into a stable and unstable query and check
         # if attempting to claim more than 1 OTK.
```
```diff
@@ -105,25 +105,6 @@ TRANSACTION_CONCURRENCY_LIMIT = 10

 logger = logging.getLogger(__name__)

-received_pdus_counter = Counter("synapse_federation_server_received_pdus", "")
-
-received_edus_counter = Counter("synapse_federation_server_received_edus", "")
-
-received_queries_counter = Counter(
-    "synapse_federation_server_received_queries", "", ["type"]
-)
-
-pdu_process_time = Histogram(
-    "synapse_federation_server_pdu_process_time",
-    "Time taken to process an event",
-)
-
-last_pdu_ts_metric = Gauge(
-    "synapse_federation_last_received_pdu_time",
-    "The timestamp of the last PDU which was successfully received from the given domain",
-    labelnames=("server_name",),
-)
-

 # The name of the lock to use when process events in a room received over
 # federation.
@@ -160,7 +141,10 @@ class FederationServer(FederationBase):

         # We cache results for transaction with the same ID
         self._transaction_resp_cache: ResponseCache[Tuple[str, str]] = ResponseCache(
-            hs.get_clock(), "fed_txn_handler", timeout_ms=30000
+            hs.get_clock(),
+            hs.get_cache_manager(),
+            "fed_txn_handler",
+            timeout_ms=30000,
         )

         self.transaction_actions = TransactionActions(self.store)
@@ -170,10 +154,18 @@ class FederationServer(FederationBase):
         # We cache responses to state queries, as they take a while and often
         # come in waves.
         self._state_resp_cache: ResponseCache[Tuple[str, Optional[str]]] = (
-            ResponseCache(hs.get_clock(), "state_resp", timeout_ms=30000)
+            ResponseCache(
+                hs.get_clock(),
+                hs.get_cache_manager(),
+                "state_resp",
+                timeout_ms=30000,
+            )
         )
         self._state_ids_resp_cache: ResponseCache[Tuple[str, str]] = ResponseCache(
-            hs.get_clock(), "state_ids_resp", timeout_ms=30000
+            hs.get_clock(),
+            hs.get_cache_manager(),
+            "state_ids_resp",
+            timeout_ms=30000,
         )

         self._federation_metrics_domains = (
@@ -185,6 +177,38 @@ class FederationServer(FederationBase):
         # Whether we have started handling old events in the staging area.
         self._started_handling_of_staged_events = False

+        self.received_pdus_counter = Counter(
+            "synapse_federation_server_received_pdus",
+            "",
+            registry=hs.metrics_collector_registry,
+        )
+
+        self.received_edus_counter = Counter(
+            "synapse_federation_server_received_edus",
+            "",
+            registry=hs.metrics_collector_registry,
+        )
+
+        self.received_queries_counter = Counter(
+            "synapse_federation_server_received_queries",
+            "",
+            ["type"],
+            registry=hs.metrics_collector_registry,
+        )
+
+        self.pdu_process_time = Histogram(
+            "synapse_federation_server_pdu_process_time",
+            "Time taken to process an event",
+            registry=hs.metrics_collector_registry,
+        )
+
+        self.last_pdu_ts_metric = Gauge(
+            "synapse_federation_last_received_pdu_time",
+            "The timestamp of the last PDU which was successfully received from the given domain",
+            labelnames=("server_name",),
+            registry=hs.metrics_collector_registry,
+        )
+
     @wrap_as_background_process("_handle_old_staged_events")
     async def _handle_old_staged_events(self) -> None:
         """Handle old staged events by fetching all rooms that have staged
@@ -424,7 +448,7 @@ class FederationServer(FederationBase):
             report back to the sending server.
         """

-        received_pdus_counter.inc(len(transaction.pdus))
+        self.received_pdus_counter.inc(len(transaction.pdus))

         origin_host, _ = parse_server_name(origin)

@@ -535,7 +559,7 @@ class FederationServer(FederationBase):
             )

         if newest_pdu_ts and origin in self._federation_metrics_domains:
-            last_pdu_ts_metric.labels(server_name=origin).set(newest_pdu_ts / 1000)
+            self.last_pdu_ts_metric.labels(server_name=origin).set(newest_pdu_ts / 1000)

         return pdu_results

@@ -543,7 +567,7 @@ class FederationServer(FederationBase):
         """Process the EDUs in a received transaction."""

         async def _process_edu(edu_dict: JsonDict) -> None:
-            received_edus_counter.inc()
+            self.received_edus_counter.inc()

             edu = Edu(
                 origin=origin,
@@ -658,7 +682,7 @@ class FederationServer(FederationBase):
     async def on_query_request(
         self, query_type: str, args: Dict[str, str]
     ) -> Tuple[int, Dict[str, Any]]:
-        received_queries_counter.labels(query_type).inc()
+        self.received_queries_counter.labels(query_type).inc()
         resp = await self.registry.on_query(query_type, args)
         return 200, resp

@@ -1300,7 +1324,7 @@ class FederationServer(FederationBase):
                     origin, event.event_id
                 )
                 if received_ts is not None:
-                    pdu_process_time.observe(
+                    self.pdu_process_time.observe(
                         (self._clock.time_msec() - received_ts) / 1000
                     )
```
```diff
@@ -73,6 +73,7 @@ class FederationRemoteSendQueue(AbstractFederationSender):
     def __init__(self, hs: "HomeServer"):
         self.server_name = hs.hostname
         self.clock = hs.get_clock()
+        self.metrics_collector_registry = hs.metrics_collector_registry
         self.notifier = hs.get_notifier()
         self.is_mine_id = hs.is_mine_id
         self.is_mine_server_name = hs.is_mine_server_name
@@ -117,6 +118,7 @@ class FederationRemoteSendQueue(AbstractFederationSender):
                 "",
                 [],
                 lambda: len(queue),
+                registry=hs.metrics_collector_registry,
             )

         for queue_name in [
@@ -156,7 +158,7 @@ class FederationRemoteSendQueue(AbstractFederationSender):

     def _clear_queue_before_pos(self, position_to_delete: int) -> None:
         """Clear all the queues from before a given position"""
-        with Measure(self.clock, "send_queue._clear"):
+        with Measure(self.clock, self.metrics_collector_registry, "send_queue._clear"):
             # Delete things out of presence maps
             keys = self.presence_destinations.keys()
             i = self.presence_destinations.bisect_left(position_to_delete)
```
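`Measure` — Synapse's block-timing context manager — now takes the homeserver's registry as a second argument, which is why every `with Measure(self.clock, name)` call site in the remaining hunks grows a `self.metrics_collector_registry` parameter. A rough analogue under that assumption, built on a plain `prometheus_client` histogram:

```python
import time
from contextlib import contextmanager

from prometheus_client import CollectorRegistry, Histogram

registry = CollectorRegistry()  # stands in for hs.metrics_collector_registry
block_timer = Histogram(
    "example_block_seconds",
    "Wall-clock time spent in named code blocks",
    ["block"],
    registry=registry,
)


@contextmanager
def measure(name: str):
    """Rough analogue of Measure(clock, registry, name)."""
    start = time.monotonic()
    try:
        yield
    finally:
        block_timer.labels(name).observe(time.monotonic() - start)


with measure("send_queue._clear"):
    sum(range(1000))  # stand-in for the measured work
```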
```diff
@@ -186,15 +186,6 @@ if TYPE_CHECKING:

 logger = logging.getLogger(__name__)

-sent_pdus_destination_dist_count = Counter(
-    "synapse_federation_client_sent_pdu_destinations_count",
-    "Number of PDUs queued for sending to one or more destinations",
-)
-
-sent_pdus_destination_dist_total = Counter(
-    "synapse_federation_client_sent_pdu_destinations",
-    "Total number of PDUs queued for sending across all destinations",
-)
-
 # Time (in s) to wait before trying to wake up destinations that have
 # catch-up outstanding.
@@ -378,6 +369,7 @@ class FederationSender(AbstractFederationSender):
         self._storage_controllers = hs.get_storage_controllers()

         self.clock = hs.get_clock()
+        self.metrics_collector_registry = hs.metrics_collector_registry
         self.is_mine_id = hs.is_mine_id
         self.is_mine_server_name = hs.is_mine_server_name

@@ -399,6 +391,7 @@ class FederationSender(AbstractFederationSender):
                 for d in self._per_destination_queues.values()
                 if d.transmission_loop_running
             ),
+            registry=hs.metrics_collector_registry,
         )

         LaterGauge(
@@ -408,6 +401,7 @@ class FederationSender(AbstractFederationSender):
             lambda: sum(
                 d.pending_pdu_count() for d in self._per_destination_queues.values()
             ),
+            registry=hs.metrics_collector_registry,
         )
         LaterGauge(
             "synapse_federation_transaction_queue_pending_edus",
@@ -416,6 +410,19 @@ class FederationSender(AbstractFederationSender):
             lambda: sum(
                 d.pending_edu_count() for d in self._per_destination_queues.values()
             ),
+            registry=hs.metrics_collector_registry,
         )

+        self.sent_pdus_destination_dist_count = Counter(
+            "synapse_federation_client_sent_pdu_destinations_count",
+            "Number of PDUs queued for sending to one or more destinations",
+            registry=hs.metrics_collector_registry,
+        )
+
+        self.sent_pdus_destination_dist_total = Counter(
+            "synapse_federation_client_sent_pdu_destinations",
+            "Total number of PDUs queued for sending across all destinations",
+            registry=hs.metrics_collector_registry,
+        )
+
         self._is_processing = False
@@ -657,7 +664,11 @@ class FederationSender(AbstractFederationSender):
                 logger.debug(
                     "Handling %i events in room %s", len(events), events[0].room_id
                 )
-                with Measure(self.clock, "handle_room_events"):
+                with Measure(
+                    self.clock,
+                    self.metrics_collector_registry,
+                    "handle_room_events",
+                ):
                     for event in events:
                         await handle_event(event)

@@ -723,8 +734,8 @@ class FederationSender(AbstractFederationSender):
         if not destinations:
             return

-        sent_pdus_destination_dist_total.inc(len(destinations))
-        sent_pdus_destination_dist_count.inc()
+        self.sent_pdus_destination_dist_total.inc(len(destinations))
+        self.sent_pdus_destination_dist_count.inc()

         assert pdu.internal_metadata.stream_ordering
```
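`LaterGauge` is Synapse's callback-evaluated gauge: the value is computed from the lambda at scrape time, and it too now accepts `registry=`. Plain `prometheus_client` offers the same behaviour through `Gauge.set_function`, which is enough to sketch the idea:

```python
from prometheus_client import CollectorRegistry, Gauge

registry = CollectorRegistry()
pending_pdus: list = []  # stand-in for the per-destination queues

# The gauge's value is pulled from the callback at scrape time rather than
# pushed on every queue mutation.
pending_gauge = Gauge(
    "example_transaction_queue_pending_pdus",
    "PDUs waiting to be sent",
    registry=registry,
)
pending_gauge.set_function(lambda: len(pending_pdus))
```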
```diff
@@ -55,17 +55,6 @@ MAX_EDUS_PER_TRANSACTION = 100
 logger = logging.getLogger(__name__)


-sent_edus_counter = Counter(
-    "synapse_federation_client_sent_edus", "Total number of EDUs successfully sent"
-)
-
-sent_edus_by_type = Counter(
-    "synapse_federation_client_sent_edus_by_type",
-    "Number of sent EDUs successfully sent, by event type",
-    ["type"],
-)
-

 # If the retry interval is larger than this then we enter "catchup" mode
 CATCHUP_RETRY_INTERVAL = 60 * 60 * 1000

@@ -164,6 +153,19 @@ class PerDestinationQueue:
         # stream_id of last successfully sent device list update.
         self._last_device_list_stream_id = 0

+        self.sent_edus_counter = Counter(
+            "synapse_federation_client_sent_edus",
+            "Total number of EDUs successfully sent",
+            registry=hs.metrics_collector_registry,
+        )
+
+        self.sent_edus_by_type = Counter(
+            "synapse_federation_client_sent_edus_by_type",
+            "Number of sent EDUs successfully sent, by event type",
+            ["type"],
+            registry=hs.metrics_collector_registry,
+        )
+
     def __str__(self) -> str:
         return "PerDestinationQueue[%s]" % self._destination

@@ -361,9 +363,9 @@ class PerDestinationQueue:
                 )

                 sent_transactions_counter.inc()
-                sent_edus_counter.inc(len(pending_edus))
+                self.sent_edus_counter.inc(len(pending_edus))
                 for edu in pending_edus:
-                    sent_edus_by_type.labels(edu.edu_type).inc()
+                    self.sent_edus_by_type.labels(edu.edu_type).inc()

             except NotRetryingDestination as e:
                 logger.debug(
```
```diff
@@ -60,6 +60,9 @@ class TransactionManager:
     def __init__(self, hs: "synapse.server.HomeServer"):
         self._server_name = hs.hostname
         self.clock = hs.get_clock()  # nb must be called this for @measure_func
+        self.metrics_collector_registry = (
+            hs.metrics_collector_registry
+        )  # nb must be called this for @measure_func
         self._store = hs.get_datastores().main
         self._transaction_actions = TransactionActions(self._store)
         self._transport_layer = hs.get_federation_transport_client()
```
```diff
@@ -68,8 +68,6 @@ if TYPE_CHECKING:

 logger = logging.getLogger(__name__)

-events_processed_counter = Counter("synapse_handlers_appservice_events_processed", "")
-

 class ApplicationServicesHandler:
     def __init__(self, hs: "HomeServer"):
@@ -79,6 +77,7 @@ class ApplicationServicesHandler:
         self.scheduler = hs.get_application_service_scheduler()
         self.started_scheduler = False
         self.clock = hs.get_clock()
+        self.metrics_collector_registry = hs.metrics_collector_registry
         self.notify_appservices = hs.config.worker.should_notify_appservices
         self.event_sources = hs.get_event_sources()
         self._msc2409_to_device_messages_enabled = (
@@ -95,6 +94,12 @@ class ApplicationServicesHandler:
             name="appservice_ephemeral_events"
         )

+        self.events_processed_counter = Counter(
+            "synapse_handlers_appservice_events_processed",
+            "",
+            registry=hs.metrics_collector_registry,
+        )
+
     def notify_interested_services(self, max_token: RoomStreamToken) -> None:
         """Notifies (pushes) all application services interested in this event.

@@ -120,7 +125,9 @@ class ApplicationServicesHandler:

     @wrap_as_background_process("notify_interested_services")
     async def _notify_interested_services(self, max_token: RoomStreamToken) -> None:
-        with Measure(self.clock, "notify_interested_services"):
+        with Measure(
+            self.clock, self.metrics_collector_registry, "notify_interested_services"
+        ):
             self.is_processing = True
             try:
                 upper_bound = -1
@@ -200,7 +207,7 @@ class ApplicationServicesHandler:
                     "appservice_sender"
                 ).set(upper_bound)

-                events_processed_counter.inc(len(events))
+                self.events_processed_counter.inc(len(events))

                 event_processing_loop_room_count.labels("appservice_sender").inc(
                     len(events_by_room)
@@ -329,7 +336,11 @@ class ApplicationServicesHandler:
         users: Collection[Union[str, UserID]],
     ) -> None:
         logger.debug("Checking interested services for %s", stream_key)
-        with Measure(self.clock, "notify_interested_services_ephemeral"):
+        with Measure(
+            self.clock,
+            self.metrics_collector_registry,
+            "notify_interested_services_ephemeral",
+        ):
             for service in services:
                 if stream_key == StreamKeyType.TYPING:
                     # Note that we don't persist the token (via set_appservice_stream_type_pos)
```
```diff
@@ -92,12 +92,6 @@ logger = logging.getLogger(__name__)

 INVALID_USERNAME_OR_PASSWORD = "Invalid username or password"

-invalid_login_token_counter = Counter(
-    "synapse_user_login_invalid_login_tokens",
-    "Counts the number of rejected m.login.token on /login",
-    ["reason"],
-)
-

 def convert_client_dict_legacy_fields_to_identifier(
     submission: JsonDict,
@@ -281,6 +275,13 @@ class AuthHandler:

         self.msc3861_oauth_delegation_enabled = hs.config.experimental.msc3861.enabled

+        self.invalid_login_token_counter = Counter(
+            "synapse_user_login_invalid_login_tokens",
+            "Counts the number of rejected m.login.token on /login",
+            ["reason"],
+            registry=hs.metrics_collector_registry,
+        )
+
     async def validate_user_via_ui_auth(
         self,
         requester: Requester,
@@ -1477,11 +1478,11 @@ class AuthHandler:
         try:
             return await self.store.consume_login_token(login_token)
         except LoginTokenExpired:
-            invalid_login_token_counter.labels("expired").inc()
+            self.invalid_login_token_counter.labels("expired").inc()
         except LoginTokenReused:
-            invalid_login_token_counter.labels("reused").inc()
+            self.invalid_login_token_counter.labels("reused").inc()
        except NotFoundError:
-            invalid_login_token_counter.labels("not found").inc()
+            self.invalid_login_token_counter.labels("not found").inc()

         raise AuthError(403, "Invalid login token", errcode=Codes.FORBIDDEN)
```
```diff
@@ -58,6 +58,7 @@ class DelayedEventsHandler:
         self._storage_controllers = hs.get_storage_controllers()
         self._config = hs.config
         self._clock = hs.get_clock()
+        self._metrics_collector_registry = hs.metrics_collector_registry
         self._event_creation_handler = hs.get_event_creation_handler()
         self._room_member_handler = hs.get_room_member_handler()

@@ -159,7 +160,9 @@ class DelayedEventsHandler:

         # Loop round handling deltas until we're up to date
         while True:
-            with Measure(self._clock, "delayed_events_delta"):
+            with Measure(
+                self._clock, self._metrics_collector_registry, "delayed_events_delta"
+            ):
                 room_max_stream_ordering = self._store.get_room_max_stream_ordering()
                 if self._event_pos == room_max_stream_ordering:
                     return
```
```diff
@@ -88,7 +88,10 @@ class DeviceWorkerHandler:
     device_list_updater: "DeviceListWorkerUpdater"

     def __init__(self, hs: "HomeServer"):
-        self.clock = hs.get_clock()
+        self.clock = hs.get_clock()  # nb must be called this for @measure_func
+        self.metrics_collector_registry = (
+            hs.metrics_collector_registry
+        )  # nb must be called this for @measure_func
         self.hs = hs
         self.store = hs.get_datastores().main
         self.notifier = hs.get_notifier()
@@ -1232,6 +1235,7 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
         self._seen_updates: ExpiringCache[str, Set[str]] = ExpiringCache(
             cache_name="device_update_edu",
             clock=self.clock,
+            cache_manager=hs.get_cache_manager(),
             max_len=10000,
             expiry_ms=30 * 60 * 1000,
             iterable=True,
```
```diff
@@ -105,41 +105,6 @@ if TYPE_CHECKING:

 logger = logging.getLogger(__name__)

-soft_failed_event_counter = Counter(
-    "synapse_federation_soft_failed_events_total",
-    "Events received over federation that we marked as soft_failed",
-)
-
-# Added to debug performance and track progress on optimizations
-backfill_processing_after_timer = Histogram(
-    "synapse_federation_backfill_processing_after_time_seconds",
-    "sec",
-    [],
-    buckets=(
-        0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 25.0,
-        30.0, 40.0, 50.0, 60.0, 80.0, 100.0, 120.0, 150.0, 180.0, "+Inf",
-    ),
-)
-

 class FederationEventHandler:
     """Handles events that originated from federation.
@@ -195,6 +160,43 @@ class FederationEventHandler:

         self._room_pdu_linearizer = Linearizer("fed_room_pdu")

+        self.soft_failed_event_counter = Counter(
+            "synapse_federation_soft_failed_events_total",
+            "Events received over federation that we marked as soft_failed",
+            registry=hs.metrics_collector_registry,
+        )
+
+        # Added to debug performance and track progress on optimizations
+        self.backfill_processing_after_timer = Histogram(
+            "synapse_federation_backfill_processing_after_time_seconds",
+            "sec",
+            [],
+            buckets=(
+                0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 25.0,
+                30.0, 40.0, 50.0, 60.0, 80.0, 100.0, 120.0, 150.0, 180.0, "+Inf",
+            ),
+            registry=hs.metrics_collector_registry,
+        )
+
     async def on_receive_pdu(self, origin: str, pdu: EventBase) -> None:
         """Process a PDU received via a federation /send/ transaction

@@ -698,7 +700,7 @@ class FederationEventHandler:
         if not events:
             return

-        with backfill_processing_after_timer.time():
+        with self.backfill_processing_after_timer.time():
             # if there are any events in the wrong room, the remote server is buggy and
             # should not be trusted.
             for ev in events:
@@ -2062,7 +2064,7 @@ class FederationEventHandler:
                     "hs": origin,
                 },
             )
-            soft_failed_event_counter.inc()
+            self.soft_failed_event_counter.inc()
             event.internal_metadata.soft_failed = True

     async def _load_or_fetch_auth_events_for_event(
```
```diff
@@ -77,7 +77,7 @@ class InitialSyncHandler:
                 bool,
                 bool,
             ]
-        ] = ResponseCache(hs.get_clock(), "initial_sync_cache")
+        ] = ResponseCache(hs.get_clock(), hs.get_cache_manager(), "initial_sync_cache")
         self._event_serializer = hs.get_event_client_serializer()
         self._storage_controllers = hs.get_storage_controllers()
         self._state_storage_controller = self._storage_controllers.state
```
```diff
@@ -481,10 +481,13 @@ class EventCreationHandler:
         self.store = hs.get_datastores().main
         self._storage_controllers = hs.get_storage_controllers()
         self.state = hs.get_state_handler()
-        self.clock = hs.get_clock()
         self.validator = EventValidator()
-        self.profile_handler = hs.get_profile_handler()
         self.event_builder_factory = hs.get_event_builder_factory()
+        self.clock = hs.get_clock()  # nb must be called this for @measure_func
+        self.metrics_collector_registry = (
+            hs.metrics_collector_registry
+        )  # nb must be called this for @measure_func
+        self.profile_handler = hs.get_profile_handler()
         self.server_name = hs.hostname
         self.notifier = hs.get_notifier()
         self.config = hs.config
@@ -560,6 +563,7 @@ class EventCreationHandler:
         self._external_cache_joined_hosts_updates = ExpiringCache(
             "_external_cache_joined_hosts_updates",
             self.clock,
+            cache_manager=hs.get_cache_manager(),
             expiry_ms=30 * 60 * 1000,
         )
```
```diff
@@ -137,26 +137,6 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)


-notified_presence_counter = Counter("synapse_handler_presence_notified_presence", "")
-federation_presence_out_counter = Counter(
-    "synapse_handler_presence_federation_presence_out", ""
-)
-presence_updates_counter = Counter("synapse_handler_presence_presence_updates", "")
-timers_fired_counter = Counter("synapse_handler_presence_timers_fired", "")
-federation_presence_counter = Counter(
-    "synapse_handler_presence_federation_presence", ""
-)
-bump_active_time_counter = Counter("synapse_handler_presence_bump_active_time", "")
-
-get_updates_counter = Counter("synapse_handler_presence_get_updates", "", ["type"])
-
-notify_reason_counter = Counter(
-    "synapse_handler_presence_notify_reason", "", ["locality", "reason"]
-)
-state_transition_counter = Counter(
-    "synapse_handler_presence_state_transition", "", ["locality", "from", "to"]
-)
-
 # If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
 # "currently_active"
 LAST_ACTIVE_GRANULARITY = 60 * 1000
@@ -194,6 +174,7 @@ class BasePresenceHandler(abc.ABC):
     def __init__(self, hs: "HomeServer"):
         self.hs = hs
         self.clock = hs.get_clock()
+        self.metrics_collector_registry = hs.metrics_collector_registry
         self.store = hs.get_datastores().main
         self._storage_controllers = hs.get_storage_controllers()
         self.presence_router = hs.get_presence_router()
@@ -222,6 +203,19 @@ class BasePresenceHandler(abc.ABC):
         # The combined status across all user devices.
         self.user_to_current_state = {state.user_id: state for state in active_presence}

+        self.notify_reason_counter = Counter(
+            "synapse_handler_presence_notify_reason",
+            "",
+            ["locality", "reason"],
+            registry=hs.metrics_collector_registry,
+        )
+        self.state_transition_counter = Counter(
+            "synapse_handler_presence_state_transition",
+            "",
+            ["locality", "from", "to"],
+            registry=hs.metrics_collector_registry,
+        )
+
     @abc.abstractmethod
     async def user_syncing(
         self,
@@ -666,7 +660,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
             old_state = self.user_to_current_state.get(new_state.user_id)
             self.user_to_current_state[new_state.user_id] = new_state
             is_mine = self.is_mine_id(new_state.user_id)
-            if not old_state or should_notify(old_state, new_state, is_mine):
+            if not old_state or should_notify(self, old_state, new_state, is_mine):
                 state_to_notify.append(new_state)

         stream_id = token
@@ -761,6 +755,38 @@ class PresenceHandler(BasePresenceHandler):
             "",
             [],
             lambda: len(self.user_to_current_state),
+            registry=hs.metrics_collector_registry,
         )

+        self.notified_presence_counter = Counter(
+            "synapse_handler_presence_notified_presence",
+            "",
+            registry=hs.metrics_collector_registry,
+        )
+        self.federation_presence_out_counter = Counter(
+            "synapse_handler_presence_federation_presence_out",
+            "",
+            registry=hs.metrics_collector_registry,
+        )
+        self.presence_updates_counter = Counter(
+            "synapse_handler_presence_presence_updates",
+            "",
+            registry=hs.metrics_collector_registry,
+        )
+        self.timers_fired_counter = Counter(
+            "synapse_handler_presence_timers_fired",
+            "",
+            registry=hs.metrics_collector_registry,
+        )
+        self.federation_presence_counter = Counter(
+            "synapse_handler_presence_federation_presence",
+            "",
+            registry=hs.metrics_collector_registry,
+        )
+        self.bump_active_time_counter = Counter(
+            "synapse_handler_presence_bump_active_time",
+            "",
+            registry=hs.metrics_collector_registry,
+        )

         # The per-device presence state, maps user to devices to per-device presence state.
@@ -863,6 +889,7 @@ class PresenceHandler(BasePresenceHandler):
             "",
             [],
             lambda: len(self.wheel_timer),
+            registry=hs.metrics_collector_registry,
         )

         # Used to handle sending of presence to newly joined users/servers
@@ -941,7 +968,9 @@ class PresenceHandler(BasePresenceHandler):

         now = self.clock.time_msec()

-        with Measure(self.clock, "presence_update_states"):
+        with Measure(
+            self.clock, self.metrics_collector_registry, "presence_update_states"
+        ):
             # NOTE: We purposefully don't await between now and when we've
             # calculated what we want to do with the new states, to avoid races.

@@ -966,6 +995,7 @@ class PresenceHandler(BasePresenceHandler):
                 )

                 new_state, should_notify, should_ping = handle_update(
+                    self,
                     prev_state,
                     new_state,
                     is_mine=self.is_mine_id(user_id),
@@ -988,10 +1018,10 @@ class PresenceHandler(BasePresenceHandler):

         # TODO: We should probably ensure there are no races hereafter

-        presence_updates_counter.inc(len(new_states))
+        self.presence_updates_counter.inc(len(new_states))

         if to_notify:
-            notified_presence_counter.inc(len(to_notify))
+            self.notified_presence_counter.inc(len(to_notify))
             await self._persist_and_notify(list(to_notify.values()))

         self.unpersisted_users_changes |= {s.user_id for s in new_states}
@@ -1010,7 +1040,7 @@ class PresenceHandler(BasePresenceHandler):
             if user_id not in to_notify
         }
         if to_federation_ping:
-            federation_presence_out_counter.inc(len(to_federation_ping))
+            self.federation_presence_out_counter.inc(len(to_federation_ping))

             hosts_to_states = await get_interested_remotes(
                 self.store,
@@ -1060,7 +1090,7 @@ class PresenceHandler(BasePresenceHandler):
                 for user_id in users_to_check
             ]

-            timers_fired_counter.inc(len(states))
+            self.timers_fired_counter.inc(len(states))

             # Set of user ID & device IDs which are currently syncing.
             syncing_user_devices = {
@@ -1094,7 +1124,7 @@ class PresenceHandler(BasePresenceHandler):

         user_id = user.to_string()

-        bump_active_time_counter.inc()
+        self.bump_active_time_counter.inc()

         now = self.clock.time_msec()

@@ -1346,7 +1376,7 @@ class PresenceHandler(BasePresenceHandler):
                 updates.append(prev_state.copy_and_replace(**new_fields))

         if updates:
-            federation_presence_counter.inc(len(updates))
+            self.federation_presence_counter.inc(len(updates))
             await self._update_states(updates)

     async def set_state(
@@ -1497,7 +1527,7 @@ class PresenceHandler(BasePresenceHandler):
     async def _unsafe_process(self) -> None:
         # Loop round handling deltas until we're up to date
         while True:
-            with Measure(self.clock, "presence_delta"):
+            with Measure(self.clock, self.metrics_collector_registry, "presence_delta"):
                 room_max_stream_ordering = self.store.get_room_max_stream_ordering()
                 if self._event_pos == room_max_stream_ordering:
                     return
@@ -1655,7 +1685,10 @@ class PresenceHandler(BasePresenceHandler):


 def should_notify(
-    old_state: UserPresenceState, new_state: UserPresenceState, is_mine: bool
+    base_presence_handler: BasePresenceHandler,
+    old_state: UserPresenceState,
+    new_state: UserPresenceState,
+    is_mine: bool,
 ) -> bool:
     """Decides if a presence state change should be sent to interested parties."""
     user_location = "remote"
@@ -1666,19 +1699,25 @@ def should_notify(
         return False

     if old_state.status_msg != new_state.status_msg:
-        notify_reason_counter.labels(user_location, "status_msg_change").inc()
+        base_presence_handler.notify_reason_counter.labels(
+            user_location, "status_msg_change"
+        ).inc()
         return True

     if old_state.state != new_state.state:
-        notify_reason_counter.labels(user_location, "state_change").inc()
-        state_transition_counter.labels(
+        base_presence_handler.notify_reason_counter.labels(
+            user_location, "state_change"
+        ).inc()
+        base_presence_handler.state_transition_counter.labels(
             user_location, old_state.state, new_state.state
         ).inc()
         return True

     if old_state.state == PresenceState.ONLINE:
         if new_state.currently_active != old_state.currently_active:
-            notify_reason_counter.labels(user_location, "current_active_change").inc()
+            base_presence_handler.notify_reason_counter.labels(
+                user_location, "current_active_change"
+            ).inc()
             return True

         if (
@@ -1687,14 +1726,14 @@ def should_notify(
         ):
             # Only notify about last active bumps if we're not currently active
             if not new_state.currently_active:
-                notify_reason_counter.labels(
+                base_presence_handler.notify_reason_counter.labels(
                     user_location, "last_active_change_online"
                 ).inc()
                 return True

     elif new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY:
         # Always notify for a transition where last active gets bumped.
-        notify_reason_counter.labels(
+        base_presence_handler.notify_reason_counter.labels(
             user_location, "last_active_change_not_online"
         ).inc()
         return True
@@ -1762,8 +1801,16 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
         self.get_presence_handler = hs.get_presence_handler
         self.get_presence_router = hs.get_presence_router
         self.clock = hs.get_clock()
+        self.metrics_collector_registry = hs.metrics_collector_registry
         self.store = hs.get_datastores().main

+        self.get_updates_counter = Counter(
+            "synapse_handler_presence_get_updates",
+            "",
+            ["type"],
+            registry=hs.metrics_collector_registry,
+        )
+
     async def get_new_events(
         self,
         user: UserID,
@@ -1792,7 +1839,9 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
         user_id = user.to_string()
         stream_change_cache = self.store.presence_stream_cache

-        with Measure(self.clock, "presence.get_new_events"):
+        with Measure(
+            self.clock, self.metrics_collector_registry, "presence.get_new_events"
+        ):
             if from_key is not None:
                 from_key = int(from_key)

@@ -1870,7 +1919,7 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):

                 # If we have the full list of changes for presence we can
                 # simply check which ones share a room with the user.
-                get_updates_counter.labels("stream").inc()
+                self.get_updates_counter.labels("stream").inc()

                 sharing_users = await self.store.do_users_share_a_room(
                     user_id, updated_users
@@ -1883,7 +1932,7 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
             else:
                 # Too many possible updates. Find all users we can see and check
                 # if any of them have changed.
-                get_updates_counter.labels("full").inc()
+                self.get_updates_counter.labels("full").inc()

                 users_interested_in = (
                     await self.store.get_users_who_share_room_with_user(user_id)
@@ -2130,6 +2179,7 @@ def handle_timeout(


 def handle_update(
+    base_presence_handler: BasePresenceHandler,
     prev_state: UserPresenceState,
     new_state: UserPresenceState,
     is_mine: bool,
@@ -2213,7 +2263,7 @@ def handle_update(
         )

     # Check whether the change was something worth notifying about
-    if should_notify(prev_state, new_state, is_mine):
+    if should_notify(base_presence_handler, prev_state, new_state, is_mine):
         new_state = new_state.copy_and_replace(last_federation_update_ts=now)
         persist_and_notify = True
```
```diff
@@ -174,7 +174,10 @@ class RoomCreationHandler:
         # succession, only process the first attempt and return its result to
         # subsequent requests
         self._upgrade_response_cache: ResponseCache[Tuple[str, str]] = ResponseCache(
-            hs.get_clock(), "room_upgrade", timeout_ms=FIVE_MINUTES_IN_MS
+            hs.get_clock(),
+            hs.get_cache_manager(),
+            "room_upgrade",
+            timeout_ms=FIVE_MINUTES_IN_MS,
         )
         self._server_notices_mxid = hs.config.servernotices.server_notices_mxid
```
```diff
@@ -67,10 +67,15 @@ class RoomListHandler:
         self.enable_room_list_search = hs.config.roomdirectory.enable_room_list_search
         self.response_cache: ResponseCache[
             Tuple[Optional[int], Optional[str], Optional[ThirdPartyInstanceID]]
-        ] = ResponseCache(hs.get_clock(), "room_list")
+        ] = ResponseCache(hs.get_clock(), hs.get_cache_manager(), "room_list")
         self.remote_response_cache: ResponseCache[
             Tuple[str, Optional[int], Optional[str], bool, Optional[str]]
-        ] = ResponseCache(hs.get_clock(), "remote_room_list", timeout_ms=30 * 1000)
+        ] = ResponseCache(
+            hs.get_clock(),
+            hs.get_cache_manager(),
+            "remote_room_list",
+            timeout_ms=30 * 1000,
+        )

     async def get_local_public_room_list(
         self,
```
```diff
@@ -114,6 +114,7 @@ class RoomSummaryHandler:
             Tuple[str, str, bool, Optional[int], Optional[int], Optional[str]]
         ] = ResponseCache(
             hs.get_clock(),
+            hs.get_cache_manager(),
             "get_room_hierarchy",
         )
         self._msc3266_enabled = hs.config.experimental.msc3266_enabled
```
```diff
@@ -337,6 +337,7 @@ class SyncHandler:
         self._push_rules_handler = hs.get_push_rules_handler()
         self.event_sources = hs.get_event_sources()
         self.clock = hs.get_clock()
+        self.metrics_collector_registry = hs.metrics_collector_registry
         self.state = hs.get_state_handler()
         self.auth_blocking = hs.get_auth_blocking()
         self._storage_controllers = hs.get_storage_controllers()
@@ -353,6 +354,7 @@ class SyncHandler:
         # memory.
         self.response_cache: ResponseCache[SyncRequestKey] = ResponseCache(
             hs.get_clock(),
+            hs.get_cache_manager(),
             "sync",
             timeout_ms=hs.config.caches.sync_response_cache_duration,
         )
@@ -363,6 +365,7 @@ class SyncHandler:
         ] = ExpiringCache(
             "lazy_loaded_members_cache",
             self.clock,
+            cache_manager=hs.get_cache_manager(),
             max_len=0,
             expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
         )
@@ -710,7 +713,7 @@ class SyncHandler:

         sync_config = sync_result_builder.sync_config

-        with Measure(self.clock, "ephemeral_by_room"):
+        with Measure(self.clock, self.metrics_collector_registry, "ephemeral_by_room"):
             typing_key = since_token.typing_key if since_token else 0

             room_ids = sync_result_builder.joined_room_ids
@@ -783,7 +786,9 @@ class SyncHandler:
             and current token to send down to clients.
             newly_joined_room
         """
-        with Measure(self.clock, "load_filtered_recents"):
+        with Measure(
+            self.clock, self.metrics_collector_registry, "load_filtered_recents"
+        ):
             timeline_limit = sync_config.filter_collection.timeline_limit()
             block_all_timeline = (
                 sync_config.filter_collection.blocks_all_room_timeline()
@@ -1174,7 +1179,9 @@ class SyncHandler:
         # updates even if they occurred logically before the previous event.
         # TODO(mjark) Check for new redactions in the state events.

-        with Measure(self.clock, "compute_state_delta"):
+        with Measure(
+            self.clock, self.metrics_collector_registry, "compute_state_delta"
+        ):
             # The memberships needed for events in the timeline.
             # Only calculated when `lazy_load_members` is on.
             members_to_fetch: Optional[Set[str]] = None
@@ -1791,7 +1798,9 @@ class SyncHandler:
             # the DB.
             return RoomNotifCounts.empty()

-        with Measure(self.clock, "unread_notifs_for_room_id"):
+        with Measure(
+            self.clock, self.metrics_collector_registry, "unread_notifs_for_room_id"
+        ):
             return await self.store.get_unread_event_push_actions_by_room_for_user(
                 room_id,
                 sync_config.user.to_string(),
```
```diff
@@ -280,7 +280,9 @@ class TypingWriterHandler(FollowerTypingHandler):

         # caches which room_ids changed at which serials
         self._typing_stream_change_cache = StreamChangeCache(
-            "TypingStreamChangeCache", self._latest_room_serial
+            name="TypingStreamChangeCache",
+            cache_manager=hs.get_cache_manager(),
+            current_stream_pos=self._latest_room_serial,
         )

     def _handle_timeout_for_member(self, now: int, member: RoomMember) -> None:
@@ -505,6 +507,7 @@ class TypingNotificationEventSource(EventSource[int, JsonMapping]):
     def __init__(self, hs: "HomeServer"):
         self._main_store = hs.get_datastores().main
         self.clock = hs.get_clock()
+        self.metrics_collector_registry = hs.metrics_collector_registry
         # We can't call get_typing_handler here because there's a cycle:
         #
         #   Typing -> Notifier -> TypingNotificationEventSource -> Typing
@@ -535,7 +538,9 @@ class TypingNotificationEventSource(EventSource[int, JsonMapping]):
             appservice may be interested in.
             * The latest known room serial.
         """
-        with Measure(self.clock, "typing.get_new_events_as"):
+        with Measure(
+            self.clock, self.metrics_collector_registry, "typing.get_new_events_as"
+        ):
             handler = self.get_typing_handler()

             events = []
@@ -571,7 +576,9 @@ class TypingNotificationEventSource(EventSource[int, JsonMapping]):
         Find typing notifications for given rooms (> `from_token` and <= `to_token`)
         """

-        with Measure(self.clock, "typing.get_new_events"):
+        with Measure(
+            self.clock, self.metrics_collector_registry, "typing.get_new_events"
+        ):
             from_key = int(from_key)
             handler = self.get_typing_handler()
```
```diff
@@ -104,6 +104,7 @@ class UserDirectoryHandler(StateDeltasHandler):
         self._storage_controllers = hs.get_storage_controllers()
         self.server_name = hs.hostname
         self.clock = hs.get_clock()
+        self.metrics_collector_registry = hs.metrics_collector_registry
         self.notifier = hs.get_notifier()
         self.is_mine_id = hs.is_mine_id
         self.update_user_directory = hs.config.worker.should_update_user_directory
@@ -237,7 +238,7 @@ class UserDirectoryHandler(StateDeltasHandler):

         # Loop round handling deltas until we're up to date
         while True:
-            with Measure(self.clock, "user_dir_delta"):
+            with Measure(self.clock, self.metrics_collector_registry, "user_dir_delta"):
                 room_max_stream_ordering = self.store.get_room_max_stream_ordering()
                 if self.pos == room_max_stream_ordering:
                     return
```
@@ -26,6 +26,7 @@ from urllib.request import ( # type: ignore[attr-defined]
)

from netaddr import AddrFormatError, IPAddress, IPSet
from prometheus_client import CollectorRegistry
from zope.interface import implementer

from twisted.internet import defer
@@ -50,6 +51,7 @@ from synapse.http.proxyagent import ProxyAgent
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.types import ISynapseReactor
from synapse.util import Clock
from synapse.util.caches import CacheManager

logger = logging.getLogger(__name__)

@@ -81,6 +83,8 @@ class MatrixFederationAgent:
reactor might have some blocking applied (i.e. for DNS queries),
but we need unblocked access to the proxy.

cache_manager: The cache manager to handle metrics

_srv_resolver:
SrvResolver implementation to use for looking up SRV records. None
to use a default implementation.
@@ -92,11 +96,14 @@ class MatrixFederationAgent:

def __init__(
self,
*,
reactor: ISynapseReactor,
tls_client_options_factory: Optional[FederationPolicyForHTTPS],
user_agent: bytes,
ip_allowlist: Optional[IPSet],
ip_blocklist: IPSet,
metrics_collector_registry: CollectorRegistry,
cache_manager: CacheManager,
_srv_resolver: Optional[SrvResolver] = None,
_well_known_resolver: Optional[WellKnownResolver] = None,
):
@@ -139,6 +146,8 @@ class MatrixFederationAgent:
ip_blocklist=ip_blocklist,
),
user_agent=self.user_agent,
metrics_collector_registry=metrics_collector_registry,
cache_manager=cache_manager,
)

self._well_known_resolver = _well_known_resolver

@@ -25,6 +25,7 @@ from io import BytesIO
from typing import Callable, Dict, Optional, Tuple

import attr
from prometheus_client import CollectorRegistry

from twisted.internet import defer
from twisted.internet.interfaces import IReactorTime
@@ -36,6 +37,7 @@ from twisted.web.iweb import IAgent, IResponse
from synapse.http.client import BodyExceededMaxSize, read_body_with_max_size
from synapse.logging.context import make_deferred_yieldable
from synapse.util import Clock, json_decoder
from synapse.util.caches import CacheManager
from synapse.util.caches.ttlcache import TTLCache
from synapse.util.metrics import Measure

@@ -77,10 +79,6 @@ WELL_KNOWN_RETRY_ATTEMPTS = 3
logger = logging.getLogger(__name__)


_well_known_cache: TTLCache[bytes, Optional[bytes]] = TTLCache("well-known")
_had_valid_well_known_cache: TTLCache[bytes, bool] = TTLCache("had-valid-well-known")


@attr.s(slots=True, frozen=True, auto_attribs=True)
class WellKnownLookupResult:
delegated_server: Optional[bytes]
@@ -94,20 +92,24 @@ class WellKnownResolver:
reactor: IReactorTime,
agent: IAgent,
user_agent: bytes,
metrics_collector_registry: CollectorRegistry,
cache_manager: CacheManager,
well_known_cache: Optional[TTLCache[bytes, Optional[bytes]]] = None,
had_well_known_cache: Optional[TTLCache[bytes, bool]] = None,
):
self._reactor = reactor
self._clock = Clock(reactor)
self._metrics_collector_registry = metrics_collector_registry

if well_known_cache is None:
well_known_cache = _well_known_cache
self._well_known_cache: TTLCache[bytes, Optional[bytes]] = TTLCache(
"well-known",
cache_manager=cache_manager,
)
self._had_valid_well_known_cache: TTLCache[bytes, bool] = TTLCache(
"had-valid-well-known",
cache_manager=cache_manager,
)

if had_well_known_cache is None:
had_well_known_cache = _had_valid_well_known_cache

self._well_known_cache = well_known_cache
self._had_valid_well_known_cache = had_well_known_cache
self._well_known_agent = RedirectAgent(agent)
self.user_agent = user_agent

@@ -134,7 +136,9 @@ class WellKnownResolver:
# TODO: should we linearise so that we don't end up doing two .well-known
# requests for the same server in parallel?
try:
with Measure(self._clock, "get_well_known"):
with Measure(
self._clock, self._metrics_collector_registry, "get_well_known"
):
result: Optional[bytes]
cache_period: float


@@ -417,11 +417,13 @@ class MatrixFederationHttpClient:
if hs.get_instance_name() in outbound_federation_restricted_to:
# Talk to federation directly
federation_agent: IAgent = MatrixFederationAgent(
self.reactor,
tls_client_options_factory,
user_agent.encode("ascii"),
hs.config.server.federation_ip_range_allowlist,
hs.config.server.federation_ip_range_blocklist,
reactor=self.reactor,
tls_client_options_factory=tls_client_options_factory,
user_agent=user_agent.encode("ascii"),
ip_allowlist=hs.config.server.federation_ip_range_allowlist,
ip_blocklist=hs.config.server.federation_ip_range_blocklist,
metrics_collector_registry=hs.metrics_collector_registry,
cache_manager=hs.get_cache_manager(),
)
else:
proxy_authorization_secret = hs.config.worker.worker_replication_secret
@@ -451,6 +453,7 @@ class MatrixFederationHttpClient:
)

self.clock = hs.get_clock()
self.metrics_collector_registry = hs.metrics_collector_registry
self._store = hs.get_datastores().main
self.version_string_bytes = hs.version_string.encode("ascii")
self.default_timeout_seconds = hs.config.federation.client_timeout_ms / 1000
@@ -697,7 +700,11 @@ class MatrixFederationHttpClient:
outgoing_requests_counter.labels(request.method).inc()

try:
with Measure(self.clock, "outbound_request"):
with Measure(
self.clock,
self.metrics_collector_registry,
"outbound_request",
):
# we don't want all the fancy cookie and redirect handling
# that treq.request gives: just use the raw Agent.


@@ -201,6 +201,7 @@ class UrlPreviewer:
self._cache: ExpiringCache[str, ObservableDeferred] = ExpiringCache(
cache_name="url_previews",
clock=self.clock,
cache_manager=hs.get_cache_manager(),
# don't spider URLs more often than once an hour
expiry_ms=ONE_HOUR,
)

+18
-42
@@ -37,7 +37,6 @@ from typing import (
Type,
TypeVar,
Union,
cast,
)

import attr
@@ -62,26 +61,9 @@ logger = logging.getLogger(__name__)

METRICS_PREFIX = "/_synapse/metrics"

all_gauges: Dict[str, Collector] = {}

HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")


class _RegistryProxy:
@staticmethod
def collect() -> Iterable[Metric]:
for metric in REGISTRY.collect():
if not metric.name.startswith("__"):
yield metric


# A little bit nasty, but collect() above is static so a Protocol doesn't work.
# _RegistryProxy matches the signature of a CollectorRegistry instance enough
# for it to be usable in the contexts in which we use it.
# TODO Do something nicer about this.
RegistryProxy = cast(CollectorRegistry, _RegistryProxy)


@attr.s(slots=True, hash=True, auto_attribs=True)
class LaterGauge(Collector):
"""A Gauge which periodically calls a user-provided callback to produce metrics."""
@@ -94,6 +76,7 @@ class LaterGauge(Collector):
caller: Callable[
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
]
registry: Optional[CollectorRegistry] = REGISTRY

def collect(self) -> Iterable[Metric]:
g = GaugeMetricFamily(self.name, self.desc, labels=self.labels)
@@ -114,15 +97,8 @@
yield g

def __attrs_post_init__(self) -> None:
self._register()

def _register(self) -> None:
if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering", self.name)
REGISTRY.unregister(all_gauges.pop(self.name))

REGISTRY.register(self)
all_gauges[self.name] = self
if self.registry is not None:
self.registry.register(self)


# `MetricsEntry` only makes sense when it is a `Protocol`,
@@ -144,6 +120,8 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
desc
labels
sub_metrics: A list of sub metrics that the callbacks will update.
registry: The Prometheus metrics `CollectorRegistry` to register the metric
with. If not provided, the default `REGISTRY` will be used.
"""

def __init__(
@@ -152,11 +130,13 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
desc: str,
labels: StrSequence,
sub_metrics: StrSequence,
registry: Optional[CollectorRegistry] = REGISTRY,
):
self.name = name
self.desc = desc
self.labels = labels
self.sub_metrics = sub_metrics
self.registry = registry

# Create a class which have the sub_metrics values as attributes, which
# default to 0 on initialization. Used to pass to registered callbacks.
@@ -243,12 +223,8 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
yield gauge

def _register_with_collector(self) -> None:
if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering", self.name)
REGISTRY.unregister(all_gauges.pop(self.name))

REGISTRY.register(self)
all_gauges[self.name] = self
if self.registry is not None:
self.registry.register(self)


class GaugeBucketCollector(Collector):
@@ -272,7 +248,7 @@ class GaugeBucketCollector(Collector):
name: str,
documentation: str,
buckets: Iterable[float],
registry: CollectorRegistry = REGISTRY,
registry: Optional[CollectorRegistry] = REGISTRY,
):
"""
Args:
@@ -280,7 +256,8 @@ class GaugeBucketCollector(Collector):
will be added.)
documentation: help text for the metric
buckets: The top bounds of the buckets to report
registry: metric registry to register with
registry: The Prometheus metrics `CollectorRegistry` to register the metric
with. If not provided, the default `REGISTRY` will be used.
"""
self._name = name
self._documentation = documentation
@@ -297,7 +274,8 @@ class GaugeBucketCollector(Collector):
# this has been initialised after a successful data update
self._metric: Optional[GaugeHistogramMetricFamily] = None

registry.register(self)
if registry is not None:
registry.register(self)

def collect(self) -> Iterable[Metric]:
# Don't report metrics unless we've already collected some data
@@ -343,10 +321,8 @@
#
# Detailed CPU metrics
#


class CPUMetrics(Collector):
def __init__(self) -> None:
def __init__(self, registry: Optional[CollectorRegistry] = REGISTRY) -> None:
ticks_per_sec = 100
try:
# Try and get the system config
@@ -356,6 +332,9 @@ class CPUMetrics(Collector):

self.ticks_per_sec = ticks_per_sec

if registry is not None:
registry.register(self)

def collect(self) -> Iterable[Metric]:
if not HAVE_PROC_SELF_STAT:
return
@@ -373,9 +352,6 @@ class CPUMetrics(Collector):
yield sys


REGISTRY.register(CPUMetrics())


#
# Federation Metrics
#

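With the `registry` attribute added to `LaterGauge` above, callers can point a gauge at a homeserver-scoped registry instead of the implicit process-global `REGISTRY`. A hedged usage sketch — the gauge name and callback are made up for illustration:

```python
from prometheus_client import CollectorRegistry, generate_latest

from synapse.metrics import LaterGauge

per_hs_registry = CollectorRegistry(auto_describe=True)

# Registers itself against per_hs_registry via __attrs_post_init__.
LaterGauge(
    "synapse_example_active_widgets",
    "Number of active widgets (illustrative)",
    [],
    lambda: 42,
    registry=per_hs_registry,
)

print(generate_latest(per_hs_registry).decode("utf-8"))
```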
+10
-7
@@ -24,10 +24,11 @@ import gc
import logging
import platform
import time
from typing import Iterable
from typing import Iterable, Optional

from prometheus_client.core import (
REGISTRY,
CollectorRegistry,
CounterMetricFamily,
Gauge,
GaugeMetricFamily,
@@ -81,6 +82,10 @@ gc_time = Histogram(


class GCCounts(Collector):
def __init__(self, registry: Optional[CollectorRegistry] = REGISTRY) -> None:
if registry is not None:
registry.register(self)

def collect(self) -> Iterable[Metric]:
cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"])
for n, m in enumerate(gc.get_count()):
@@ -101,8 +106,6 @@ def install_gc_manager() -> None:
if running_on_pypy:
return

REGISTRY.register(GCCounts())

gc.disable()

# The time (in seconds since the epoch) of the last time we did a GC for each generation.
@@ -145,6 +148,10 @@ def install_gc_manager() -> None:


class PyPyGCStats(Collector):
def __init__(self, registry: Optional[CollectorRegistry] = REGISTRY) -> None:
if registry is not None:
registry.register(self)

def collect(self) -> Iterable[Metric]:
# @stats is a pretty-printer object with __str__() returning a nice table,
# plus some fields that contain data from that table.
@@ -205,7 +212,3 @@ class PyPyGCStats(Collector):
pypy_mem.add_metric(["used", "totals", "gc_peak"], s.peak_memory)
pypy_mem.add_metric(["allocated", "totals", "gc_peak"], s.peak_allocated_memory)
yield pypy_mem


if running_on_pypy:
REGISTRY.register(PyPyGCStats())

@@ -22,10 +22,10 @@
import logging
import time
from selectors import SelectSelector, _PollLikeSelector # type: ignore[attr-defined]
from typing import Any, Callable, Iterable
from typing import Any, Callable, Iterable, Optional

from prometheus_client import Histogram, Metric
from prometheus_client.core import REGISTRY, GaugeMetricFamily
from prometheus_client.core import REGISTRY, CollectorRegistry, GaugeMetricFamily

from twisted.internet import reactor, selectreactor
from twisted.internet.asyncioreactor import AsyncioSelectorReactor
@@ -110,9 +110,16 @@ class ObjWrapper:


class ReactorLastSeenMetric(Collector):
def __init__(self, call_wrapper: CallWrapper):
def __init__(
self,
call_wrapper: CallWrapper,
registry: Optional[CollectorRegistry] = REGISTRY,
):
self._call_wrapper = call_wrapper

if registry is not None:
registry.register(self)

def collect(self) -> Iterable[Metric]:
cm = GaugeMetricFamily(
"python_twisted_reactor_last_seen",
@@ -164,5 +171,6 @@ except Exception as e:
logger.warning("Configuring ReactorLastSeenMetric failed: %r", e)


if wrapper:
REGISTRY.register(ReactorLastSeenMetric(wrapper))
def setup_reactor_metrics(registry: Optional[CollectorRegistry] = REGISTRY) -> None:
if wrapper:
ReactorLastSeenMetric(wrapper, registry=registry)

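The pattern in this hunk — and in `GCCounts`, `PyPyGCStats`, and `CPUMetrics` above — is the same: custom collectors take an optional registry, self-register in `__init__` when one is given, and skip registration when passed `None` (handy in tests). A self-contained sketch of that shape, with illustrative names:

```python
from typing import Iterable, Optional

from prometheus_client import CollectorRegistry
from prometheus_client.core import REGISTRY, GaugeMetricFamily, Metric
from prometheus_client.registry import Collector


class LastSeenCollector(Collector):
    def __init__(self, registry: Optional[CollectorRegistry] = REGISTRY) -> None:
        self.last_seen = 0.0
        if registry is not None:
            # Passing None leaves registration entirely to the caller.
            registry.register(self)

    def collect(self) -> Iterable[Metric]:
        yield GaugeMetricFamily(
            "example_last_seen", "Illustrative gauge", value=self.last_seen
        )
```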
@@ -20,7 +20,7 @@
#
#

from prometheus_client import REGISTRY, CollectorRegistry, generate_latest
from prometheus_client import CollectorRegistry, generate_latest

from twisted.web.resource import Resource
from twisted.web.server import Request
@@ -35,7 +35,7 @@ class MetricsResource(Resource):

isLeaf = True

def __init__(self, registry: CollectorRegistry = REGISTRY):
def __init__(self, registry: CollectorRegistry):
self.registry = registry

def render_GET(self, request: Request) -> bytes:

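Dropping the `= REGISTRY` default forces every scrape endpoint to name the registry it serves. Wiring this up might look like the following sketch (the import location of `MetricsResource`, the stand-in registry, and the surrounding server setup are assumptions for illustration):

```python
from prometheus_client import CollectorRegistry
from twisted.web.resource import Resource

from synapse.metrics import MetricsResource

# Stands in for hs.metrics_collector_registry in a real deployment.
hs_registry = CollectorRegistry(auto_describe=True)

root = Resource()
# No fallback to the global REGISTRY: the listener serves exactly the
# metrics collected in the registry it was handed.
root.putChild(b"metrics", MetricsResource(hs_registry))
```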
@@ -38,7 +38,7 @@ from typing import (
)

from prometheus_client import Metric
from prometheus_client.core import REGISTRY, Counter, Gauge
from prometheus_client.core import REGISTRY, CollectorRegistry, Counter, Gauge
from typing_extensions import ParamSpec

from twisted.internet import defer
@@ -134,13 +134,17 @@ _background_processes_active_since_last_scrape: "Set[_BackgroundProcess]" = set(
_bg_metrics_lock = threading.Lock()


class _Collector(Collector):
class BackgroundProcessCollector(Collector):
"""A custom metrics collector for the background process metrics.

Ensures that all of the metrics are up-to-date with any in-flight processes
before they are returned.
"""

def __init__(self, registry: Optional[CollectorRegistry] = REGISTRY) -> None:
if registry is not None:
registry.register(self)

def collect(self) -> Iterable[Metric]:
global _background_processes_active_since_last_scrape

@@ -165,9 +169,6 @@ class _Collector(Collector):
yield from m.collect()


REGISTRY.register(_Collector())


class _BackgroundProcess:
def __init__(self, desc: str, ctx: LoggingContext):
self.desc = desc

+18
-23
@@ -23,14 +23,17 @@ import ctypes
import logging
import os
import re
from typing import Iterable, Literal, Optional, overload
from typing import TYPE_CHECKING, Iterable, Literal, Optional, overload

import attr
from prometheus_client import REGISTRY, Metric
from prometheus_client import REGISTRY, CollectorRegistry, Metric

from synapse.metrics import GaugeMetricFamily
from synapse.metrics._types import Collector

if TYPE_CHECKING:
from synapse.server import HomeServer

logger = logging.getLogger(__name__)


@@ -131,25 +134,11 @@ class JemallocStats:
return self._mallctl(f"stats.{name}")


_JEMALLOC_STATS: Optional[JemallocStats] = None


def get_jemalloc_stats() -> Optional[JemallocStats]:
"""Returns an interface to jemalloc, if it is being used.

Note that this will always return None until `setup_jemalloc_stats` has been
called.
"""
return _JEMALLOC_STATS


def _setup_jemalloc_stats() -> None:
def _setup_jemalloc_stats(hs: "HomeServer") -> Optional[JemallocStats]:
"""Checks to see if jemalloc is loaded, and hooks up a collector to record
statistics exposed by jemalloc.
"""

global _JEMALLOC_STATS

# Try to find the loaded jemalloc shared library, if any. We need to
# introspect into what is loaded, rather than loading whatever is on the
# path, as if we load a *different* jemalloc version things will seg fault.
@@ -157,7 +146,7 @@ def _setup_jemalloc_stats() -> None:
# We look in `/proc/self/maps`, which only exists on linux.
if not os.path.exists("/proc/self/maps"):
logger.debug("Not looking for jemalloc as no /proc/self/maps exist")
return
return None

# We're looking for a path at the end of the line that includes
# "libjemalloc".
@@ -173,18 +162,21 @@ def _setup_jemalloc_stats() -> None:
if not jemalloc_path:
# No loaded jemalloc was found.
logger.debug("jemalloc not found")
return
return None

logger.debug("Found jemalloc at %s", jemalloc_path)

jemalloc_dll = ctypes.CDLL(jemalloc_path)

stats = JemallocStats(jemalloc_dll)
_JEMALLOC_STATS = stats

class JemallocCollector(Collector):
"""Metrics for internal jemalloc stats."""

def __init__(self, registry: Optional[CollectorRegistry] = REGISTRY) -> None:
if registry is not None:
registry.register(self)

def collect(self) -> Iterable[Metric]:
stats.refresh_stats()

@@ -230,17 +222,20 @@

yield g

REGISTRY.register(JemallocCollector())
JemallocCollector(registry=hs.metrics_collector_registry)

logger.debug("Added jemalloc stats")
return stats


def setup_jemalloc_stats() -> None:
def setup_jemalloc_stats(hs: "HomeServer") -> Optional[JemallocStats]:
"""Try to setup jemalloc stats, if jemalloc is loaded."""

try:
_setup_jemalloc_stats()
return _setup_jemalloc_stats(hs)
except Exception as e:
# This should only happen if we find the loaded jemalloc library, but
# fail to load it somehow (e.g. we somehow picked the wrong version).
logger.info("Failed to setup collector to record jemalloc stats: %s", e)

return None

@@ -340,6 +340,7 @@ class SpamCheckerModuleApiCallbacks:

def __init__(self, hs: "synapse.server.HomeServer") -> None:
self.clock = hs.get_clock()
self.metrics_collector_registry = hs.metrics_collector_registry

self._check_event_for_spam_callbacks: List[CHECK_EVENT_FOR_SPAM_CALLBACK] = []
self._should_drop_federated_event_callbacks: List[
@@ -464,7 +465,11 @@ class SpamCheckerModuleApiCallbacks:
generally discouraged as it doesn't support internationalization.
"""
for callback in self._check_event_for_spam_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
with Measure(
self.clock,
self.metrics_collector_registry,
f"{callback.__module__}.{callback.__qualname__}",
):
res = await delay_cancellation(callback(event))
if res is False or res == self.NOT_SPAM:
# This spam-checker accepts the event.
@@ -517,7 +522,11 @@ class SpamCheckerModuleApiCallbacks:
True if the event should be silently dropped
"""
for callback in self._should_drop_federated_event_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
with Measure(
self.clock,
self.metrics_collector_registry,
f"{callback.__module__}.{callback.__qualname__}",
):
res: Union[bool, str] = await delay_cancellation(callback(event))
if res:
return res
@@ -539,7 +548,11 @@
NOT_SPAM if the operation is permitted, [Codes, Dict] otherwise.
"""
for callback in self._user_may_join_room_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
with Measure(
self.clock,
self.metrics_collector_registry,
f"{callback.__module__}.{callback.__qualname__}",
):
res = await delay_cancellation(callback(user_id, room_id, is_invited))
# Normalize return values to `Codes` or `"NOT_SPAM"`.
if res is True or res is self.NOT_SPAM:
@@ -578,7 +591,11 @@
NOT_SPAM if the operation is permitted, Codes otherwise.
"""
for callback in self._user_may_invite_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
with Measure(
self.clock,
self.metrics_collector_registry,
f"{callback.__module__}.{callback.__qualname__}",
):
res = await delay_cancellation(
callback(inviter_userid, invitee_userid, room_id)
)
@@ -623,7 +640,11 @@
NOT_SPAM if the operation is permitted, Codes otherwise.
"""
for callback in self._user_may_send_3pid_invite_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
with Measure(
self.clock,
self.metrics_collector_registry,
f"{callback.__module__}.{callback.__qualname__}",
):
res = await delay_cancellation(
callback(inviter_userid, medium, address, room_id)
)
@@ -659,7 +680,11 @@
room_config: The room creation configuration which is the body of the /createRoom request
"""
for callback in self._user_may_create_room_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
with Measure(
self.clock,
self.metrics_collector_registry,
f"{callback.__module__}.{callback.__qualname__}",
):
checker_args = inspect.signature(callback)
# Also ensure backwards compatibility with spam checker callbacks
# that don't expect the room_config argument.
@@ -751,7 +776,11 @@

"""
for callback in self._user_may_create_room_alias_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
with Measure(
self.clock,
self.metrics_collector_registry,
f"{callback.__module__}.{callback.__qualname__}",
):
res = await delay_cancellation(callback(userid, room_alias))
if res is True or res is self.NOT_SPAM:
continue
@@ -784,7 +813,11 @@
room_id: The ID of the room that would be published
"""
for callback in self._user_may_publish_room_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
with Measure(
self.clock,
self.metrics_collector_registry,
f"{callback.__module__}.{callback.__qualname__}",
):
res = await delay_cancellation(callback(userid, room_id))
if res is True or res is self.NOT_SPAM:
continue
@@ -826,7 +859,11 @@
True if the user is spammy.
"""
for callback in self._check_username_for_spam_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
with Measure(
self.clock,
self.metrics_collector_registry,
f"{callback.__module__}.{callback.__qualname__}",
):
checker_args = inspect.signature(callback)
# Make a copy of the user profile object to ensure the spam checker cannot
# modify it.
@@ -875,7 +912,11 @@
"""

for callback in self._check_registration_for_spam_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
with Measure(
self.clock,
self.metrics_collector_registry,
f"{callback.__module__}.{callback.__qualname__}",
):
behaviour = await delay_cancellation(
callback(email_threepid, username, request_info, auth_provider_id)
)
@@ -917,7 +958,11 @@
"""

for callback in self._check_media_file_for_spam_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
with Measure(
self.clock,
self.metrics_collector_registry,
f"{callback.__module__}.{callback.__qualname__}",
):
res = await delay_cancellation(callback(file_wrapper, file_info))
# Normalize return values to `Codes` or `"NOT_SPAM"`.
if res is False or res is self.NOT_SPAM:
@@ -964,7 +1009,11 @@
"""

for callback in self._check_login_for_spam_callbacks:
with Measure(self.clock, f"{callback.__module__}.{callback.__qualname__}"):
with Measure(
self.clock,
self.metrics_collector_registry,
f"{callback.__module__}.{callback.__qualname__}",
):
res = await delay_cancellation(
callback(
user_id,

+13
-2
@@ -267,16 +267,27 @@ class Notifier:

return sum(stream.count_listeners() for stream in all_user_streams)

LaterGauge("synapse_notifier_listeners", "", [], count_listeners)
LaterGauge(
"synapse_notifier_listeners",
"",
[],
count_listeners,
registry=hs.metrics_collector_registry,
)

LaterGauge(
"synapse_notifier_rooms",
"",
[],
lambda: count(bool, list(self.room_to_user_streams.values())),
registry=hs.metrics_collector_registry,
)
LaterGauge(
"synapse_notifier_users", "", [], lambda: len(self.user_to_user_stream)
"synapse_notifier_users",
"",
[],
lambda: len(self.user_to_user_stream),
registry=hs.metrics_collector_registry,
)

def add_replication_callback(self, cb: Callable[[], None]) -> None:

@@ -59,7 +59,6 @@ from synapse.types import JsonValue
from synapse.types.state import StateFilter
from synapse.util import unwrapFirstError
from synapse.util.async_helpers import gather_results
from synapse.util.caches import register_cache
from synapse.util.metrics import measure_func
from synapse.visibility import filter_event_for_clients_with_state

@@ -129,13 +128,16 @@ class BulkPushRuleEvaluator:
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.store = hs.get_datastores().main
self.clock = hs.get_clock()
self.clock = hs.get_clock() # nb must be called this for @measure_func
self.metrics_collector_registry = (
hs.metrics_collector_registry
) # nb must be called this for @measure_func
self._event_auth_handler = hs.get_event_auth_handler()
self.should_calculate_push_rules = self.hs.config.push.enable_push

self._related_event_match_enabled = self.hs.config.experimental.msc3664_enabled

self.room_push_rule_cache_metrics = hs.get_cache_manager().register_cache(
self.room_push_rule_cache_metrics = register_cache(
"cache",
"room_push_rule_cache",
cache=[], # Meaningless size, as this isn't a cache that stores values,

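The `# nb must be called this for @measure_func` comments exist because a `measure_func`-style decorator resolves `self.clock` and `self.metrics_collector_registry` by attribute name at call time. A hedged sketch of that mechanism — not Synapse's actual decorator, and the duration bookkeeping here is a stand-in for real metric recording:

```python
import functools
from typing import Any, Awaitable, Callable, Dict, TypeVar

R = TypeVar("R")

_durations: Dict[str, float] = {}


def measure_func(name: str) -> Callable[..., Any]:
    def wrapper(func: Callable[..., Awaitable[R]]) -> Callable[..., Awaitable[R]]:
        @functools.wraps(func)
        async def measured(self: Any, *args: Any, **kwargs: Any) -> R:
            # Looked up by name on the instance, hence the naming constraint.
            registry = self.metrics_collector_registry  # noqa: F841
            start = self.clock.time()
            try:
                return await func(self, *args, **kwargs)
            finally:
                _durations[name] = _durations.get(name, 0.0) + (
                    self.clock.time() - start
                )

        return measured

    return wrapper
```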
@@ -123,7 +123,10 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
def __init__(self, hs: "HomeServer"):
if self.CACHE:
self.response_cache: ResponseCache[str] = ResponseCache(
hs.get_clock(), "repl." + self.NAME, timeout_ms=30 * 60 * 1000
hs.get_clock(),
hs.get_cache_manager(),
"repl." + self.NAME,
timeout_ms=30 * 60 * 1000,
)

# We reserve `instance_name` as a parameter to sending requests, so we

@@ -79,6 +79,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
self.store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
self.clock = hs.get_clock()
self.metrics_collector_registry = hs.metrics_collector_registry
self.federation_event_handler = hs.get_federation_event_handler()

@staticmethod
@@ -122,7 +123,9 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
async def _handle_request( # type: ignore[override]
self, request: Request, content: JsonDict
) -> Tuple[int, JsonDict]:
with Measure(self.clock, "repl_fed_send_events_parse"):
with Measure(
self.clock, self.metrics_collector_registry, "repl_fed_send_events_parse"
):
room_id = content["room_id"]
backfilled = content["backfilled"]


@@ -80,6 +80,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
self.store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
self.clock = hs.get_clock()
self.metrics_collector_registry = hs.metrics_collector_registry

@staticmethod
async def _serialize_payload( # type: ignore[override]
@@ -121,7 +122,9 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
async def _handle_request( # type: ignore[override]
self, request: Request, content: JsonDict, event_id: str
) -> Tuple[int, JsonDict]:
with Measure(self.clock, "repl_send_event_parse"):
with Measure(
self.clock, self.metrics_collector_registry, "repl_send_event_parse"
):
event_dict = content["event"]
room_ver = KNOWN_ROOM_VERSIONS[content["room_version"]]
internal_metadata = content["internal_metadata"]

@@ -81,6 +81,7 @@ class ReplicationSendEventsRestServlet(ReplicationEndpoint):
self.store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
self.clock = hs.get_clock()
self.metrics_collector_registry = hs.metrics_collector_registry

@staticmethod
async def _serialize_payload( # type: ignore[override]
@@ -122,7 +123,9 @@ class ReplicationSendEventsRestServlet(ReplicationEndpoint):
async def _handle_request( # type: ignore[override]
self, request: Request, payload: JsonDict
) -> Tuple[int, JsonDict]:
with Measure(self.clock, "repl_send_events_parse"):
with Measure(
self.clock, self.metrics_collector_registry, "repl_send_events_parse"
):
events_and_context = []
events = payload["events"]
rooms = set()

@@ -79,6 +79,7 @@ class ReplicationDataHandler:
self.notifier = hs.get_notifier()
self._reactor = hs.get_reactor()
self._clock = hs.get_clock()
self._metrics_collector_registry = hs.metrics_collector_registry
self._streams = hs.get_replication_streams()
self._instance_name = hs.get_instance_name()
self._typing_handler = hs.get_typing_handler()
@@ -342,7 +343,11 @@ class ReplicationDataHandler:
waiting_list.add((position, deferred))

# We measure here to get in flight counts and average waiting time.
with Measure(self._clock, "repl.wait_for_stream_position"):
with Measure(
self._clock,
self._metrics_collector_registry,
"repl.wait_for_stream_position",
):
logger.info(
"Waiting for repl stream %r to reach %s (%s); currently at: %s",
stream_name,

@@ -215,6 +215,7 @@ class ReplicationCommandHandler:
"",
[],
lambda: len(self._connections),
registry=hs.metrics_collector_registry,
)

# When POSITION or RDATA commands arrive, we stick them in a queue and process
@@ -241,6 +242,7 @@ class ReplicationCommandHandler:
(stream_name,): len(queue)
for stream_name, queue in self._command_queues_by_stream.items()
},
registry=hs.metrics_collector_registry,
)

self._is_master = hs.config.worker.worker_app is None

@@ -80,6 +80,7 @@ class ReplicationStreamer:
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastores().main
self.clock = hs.get_clock()
self.metrics_collector_registry = hs.metrics_collector_registry
self.notifier = hs.get_notifier()
self._instance_name = hs.get_instance_name()

@@ -155,7 +156,11 @@ class ReplicationStreamer:
while self.pending_updates:
self.pending_updates = False

with Measure(self.clock, "repl.stream.get_updates"):
with Measure(
self.clock,
self.metrics_collector_registry,
"repl.stream.get_updates",
):
all_streams = self.streams

if self._replication_torture_level is not None:

@@ -125,6 +125,8 @@ class SyncRestServlet(RestServlet):
self._json_filter_cache: LruCache[str, bool] = LruCache(
max_size=1000,
cache_name="sync_valid_filter",
# TODO
# cache_manager=hs.get_cache_manager(),
)

# Ratelimiter for presence updates, keyed by requester.

+12
-1
@@ -30,6 +30,7 @@ import functools
import logging
from typing import TYPE_CHECKING, Callable, Dict, List, Optional, Type, TypeVar, cast

from prometheus_client import CollectorRegistry
from typing_extensions import TypeAlias

from twisted.internet.interfaces import IOpenSSLContextFactory
@@ -129,6 +130,7 @@ from synapse.http.matrixfederationclient import MatrixFederationHttpClient
from synapse.media.media_repository import MediaRepository
from synapse.metrics import register_threadpool
from synapse.metrics.common_usage_metrics import CommonUsageMetricsManager
from synapse.metrics.jemalloc import setup_jemalloc_stats
from synapse.module_api import ModuleApi
from synapse.module_api.callbacks import ModuleApiCallbacks
from synapse.notifier import Notifier, ReplicationNotifier
@@ -152,6 +154,7 @@ from synapse.streams.events import EventSources
from synapse.synapse_rust.rendezvous import RendezvousHandler
from synapse.types import DomainSpecificString, ISynapseReactor
from synapse.util import Clock
from synapse.util.caches import CacheManager
from synapse.util.distributor import Distributor
from synapse.util.macaroons import MacaroonGenerator
from synapse.util.ratelimitutils import FederationRateLimiter
@@ -310,6 +313,11 @@ class HomeServer(metaclass=abc.ABCMeta):
# This attribute is set by the free function `refresh_certificate`.
self.tls_server_context_factory: Optional[IOpenSSLContextFactory] = None

self.metrics_collector_registry = CollectorRegistry(auto_describe=True)
# Also see `setup_global_metrics` for other metric setup
self.jemalloc_stats = setup_jemalloc_stats(self)
self.cache_manager = CacheManager(self)

def register_module_web_resource(self, path: str, resource: Resource) -> None:
"""Allows a module to register a web resource to be served at the given path.

@@ -368,7 +376,7 @@ class HomeServer(metaclass=abc.ABCMeta):
self.setup_background_tasks()

def start_listening(self) -> None: # noqa: B027 (no-op by design)
"""Start the HTTP, manhole, metrics, etc listeners
"""Start the HTTP, manhole, etc listeners

Does nothing in this base class; overridden in derived classes to start the
appropriate listeners.
@@ -420,6 +428,9 @@ class HomeServer(metaclass=abc.ABCMeta):

return self.datastores

def get_cache_manager(self) -> CacheManager:
return self.cache_manager

@cache_in_self
def get_distributor(self) -> Distributor:
return Distributor()

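The `CollectorRegistry(auto_describe=True)` created per `HomeServer` is what the rest of this diff builds on: two homeservers in one process (as in the test suite) can register identically named collectors without colliding. A small, self-contained illustration:

```python
from prometheus_client import CollectorRegistry, Counter

hs1_registry = CollectorRegistry(auto_describe=True)
hs2_registry = CollectorRegistry(auto_describe=True)

# Against a single shared registry the second registration would raise
# "Duplicated timeseries in CollectorRegistry"; per-instance registries
# keep the namespaces separate.
Counter("synapse_example_events", "Illustrative counter", registry=hs1_registry)
Counter("synapse_example_events", "Illustrative counter", registry=hs2_registry)
```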
@@ -189,7 +189,10 @@ class StateHandler:
"""

def __init__(self, hs: "HomeServer"):
self.clock = hs.get_clock()
self.clock = hs.get_clock() # nb must be called this for @measure_func
self.metrics_collector_registry = (
hs.metrics_collector_registry
) # nb must be called this for @measure_func
self.store = hs.get_datastores().main
self._state_storage_controller = hs.get_storage_controllers().state
self.hs = hs
@@ -632,6 +635,7 @@ class StateResolutionHandler:

def __init__(self, hs: "HomeServer"):
self.clock = hs.get_clock()
self.metrics_collector_registry = hs.metrics_collector_registry

self.resolve_linearizer = Linearizer(name="state_resolve_lock")

@@ -640,6 +644,7 @@ class StateResolutionHandler:
ExpiringCache(
cache_name="state_cache",
clock=self.clock,
cache_manager=hs.get_cache_manager(),
max_len=100000,
expiry_ms=EVICTION_TIMEOUT_SECONDS * 1000,
iterable=True,
@@ -747,7 +752,9 @@ class StateResolutionHandler:
# which will be used as a cache key for future resolutions, but
# not get persisted.

with Measure(self.clock, "state.create_group_ids"):
with Measure(
self.clock, self.metrics_collector_registry, "state.create_group_ids"
):
cache = _make_state_cache_entry(new_state, state_groups_ids)

self._state_cache[group_names] = cache
@@ -785,7 +792,9 @@ class StateResolutionHandler:
a map from (type, state_key) to event_id.
"""
try:
with Measure(self.clock, "state._resolve_events") as m:
with Measure(
self.clock, self.metrics_collector_registry, "state._resolve_events"
) as m:
room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
if room_version_obj.state_res == StateResolutionVersions.V1:
return await v1.resolve_events_with_store(

@@ -56,6 +56,8 @@ class SQLBaseStore(metaclass=ABCMeta):
):
self.hs = hs
self._clock = hs.get_clock()
self._metrics_collector_registry = hs.metrics_collector_registry
self.database_engine = database.engine
self.db_pool = database


@@ -338,6 +338,7 @@ class EventsPersistenceStorageController:
self.persist_events_store = stores.persist_events

self._clock = hs.get_clock()
self._metrics_collector_registry = hs.metrics_collector_registry
self._instance_name = hs.get_instance_name()
self.is_mine_id = hs.is_mine_id
self._event_persist_queue = _EventPeristenceQueue(
@@ -616,7 +617,11 @@ class EventsPersistenceStorageController:
state_delta_for_room = None

if not backfilled:
with Measure(self._clock, "_calculate_state_and_extrem"):
with Measure(
self._clock,
self._metrics_collector_registry,
"_calculate_state_and_extrem",
):
# Work out the new "current state" for the room.
# We do this by working out what the new extremities are and then
# calculating the state from that.
@@ -627,7 +632,11 @@ class EventsPersistenceStorageController:
room_id, chunk
)

with Measure(self._clock, "calculate_chain_cover_index_for_events"):
with Measure(
self._clock,
self._metrics_collector_registry,
"calculate_chain_cover_index_for_events",
):
# We now calculate chain ID/sequence numbers for any state events we're
# persisting. We ignore out of band memberships as we're not in the room
# and won't have their auth chain (we'll fix it up later if we join the
@@ -719,7 +728,11 @@ class EventsPersistenceStorageController:
break

logger.debug("Calculating state delta for room %s", room_id)
with Measure(self._clock, "persist_events.get_new_state_after_events"):
with Measure(
self._clock,
self._metrics_collector_registry,
"persist_events.get_new_state_after_events",
):
res = await self._get_new_state_after_events(
room_id,
ev_ctx_rm,
@@ -746,7 +759,11 @@ class EventsPersistenceStorageController:
# removed keys entirely.
delta = DeltaState([], delta_ids)
elif current_state is not None:
with Measure(self._clock, "persist_events.calculate_state_delta"):
with Measure(
self._clock,
self._metrics_collector_registry,
"persist_events.calculate_state_delta",
):
delta = await self._calculate_state_delta(room_id, current_state)

if delta:

@@ -70,6 +70,7 @@ class StateStorageController:
def __init__(self, hs: "HomeServer", stores: "Databases"):
self._is_mine_id = hs.is_mine_id
self._clock = hs.get_clock()
self._metrics_collector_registry = hs.metrics_collector_registry
self.stores = stores
self._partial_state_events_tracker = PartialStateEventsTracker(stores.main)
self._partial_state_room_tracker = PartialCurrentStateTracker(stores.main)
@@ -812,7 +813,7 @@ class StateStorageController:
state_group = object()

assert state_group is not None
with Measure(self._clock, "get_joined_hosts"):
with Measure(self._clock, self._metrics_collector_registry, "get_joined_hosts"):
return await self._get_joined_hosts(
room_id, state_group, state_entry=state_entry
)

@@ -572,6 +572,7 @@ class DatabasePool:
"Background update status",
[],
self.updates.get_status,
registry=hs.metrics_collector_registry,
)

self._previous_txn_total_time = 0.0

@@ -65,6 +65,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)
self.cache_manager = hs.get_cache_manager()

self._can_write_to_account_data = (
self._instance_name in hs.config.worker.writers.account_data
@@ -89,7 +90,9 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)

account_max = self.get_max_account_data_stream_id()
self._account_data_stream_cache = StreamChangeCache(
"AccountDataAndTagsChangeCache", account_max
name="AccountDataAndTagsChangeCache",
cache_manager=hs.get_cache_manager(),
current_stream_pos=account_max,
)

self.db_pool.updates.register_background_index_update(

@@ -434,7 +434,10 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore, MonthlyActiveUsersWorke

# (user_id, access_token, ip,) -> last_seen
self.client_ip_last_seen = LruCache[Tuple[str, str, str], int](
cache_name="client_ip_last_seen", max_size=50000
max_size=50000,
cache_name="client_ip_last_seen",
# TODO
# cache_manager=hs.get_cache_manager(),
)

if hs.config.worker.run_background_tasks and self.user_ips_max_age:

@@ -94,6 +94,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
] = ExpiringCache(
cache_name="last_device_delete_cache",
clock=self._clock,
cache_manager=hs.get_cache_manager(),
max_len=10000,
expiry_ms=30 * 60 * 1000,
)
@@ -126,8 +127,9 @@ class DeviceInboxWorkerStore(SQLBaseStore):
limit=1000,
)
self._device_inbox_stream_cache = StreamChangeCache(
"DeviceInboxStreamChangeCache",
min_device_inbox_id,
name="DeviceInboxStreamChangeCache",
cache_manager=hs.get_cache_manager(),
current_stream_pos=min_device_inbox_id,
prefilled_cache=device_inbox_prefill,
)

@@ -142,8 +144,9 @@ class DeviceInboxWorkerStore(SQLBaseStore):
limit=1000,
)
self._device_federation_outbox_stream_cache = StreamChangeCache(
"DeviceFederationOutboxStreamChangeCache",
min_device_outbox_id,
name="DeviceFederationOutboxStreamChangeCache",
cache_manager=hs.get_cache_manager(),
current_stream_pos=min_device_outbox_id,
prefilled_cache=device_outbox_prefill,
)


@@ -128,8 +128,9 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
limit=10000,
)
self._device_list_stream_cache = StreamChangeCache(
"DeviceListStreamChangeCache",
min_device_list_id,
name="DeviceListStreamChangeCache",
cache_manager=hs.get_cache_manager(),
current_stream_pos=min_device_list_id,
prefilled_cache=device_list_prefill,
)

@@ -142,8 +143,9 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
limit=10000,
)
self._device_list_room_stream_cache = StreamChangeCache(
"DeviceListRoomStreamChangeCache",
min_device_list_room_id,
name="DeviceListRoomStreamChangeCache",
cache_manager=hs.get_cache_manager(),
current_stream_pos=min_device_list_room_id,
prefilled_cache=device_list_room_prefill,
)

@@ -159,8 +161,9 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
limit=1000,
)
self._user_signature_stream_cache = StreamChangeCache(
"UserSignatureStreamChangeCache",
user_signature_stream_list_id,
name="UserSignatureStreamChangeCache",
cache_manager=hs.get_cache_manager(),
current_stream_pos=user_signature_stream_list_id,
prefilled_cache=user_signature_stream_prefill,
)

@@ -178,8 +181,9 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
limit=10000,
)
self._device_list_federation_stream_cache = StreamChangeCache(
"DeviceListFederationStreamChangeCache",
device_list_federation_list_id,
name="DeviceListFederationStreamChangeCache",
cache_manager=hs.get_cache_manager(),
current_stream_pos=device_list_federation_list_id,
prefilled_cache=device_list_federation_prefill,
)

@@ -1773,7 +1777,12 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
# Map of (user_id, device_id) -> bool. If there is an entry that implies
# the device exists.
self.device_id_exists_cache: LruCache[Tuple[str, str], Literal[True]] = (
LruCache(cache_name="device_id_exists", max_size=10000)
LruCache(
max_size=10000,
cache_name="device_id_exists",
# TODO
# cache_manager=hs.get_cache_manager(),
)
)

async def store_device(

@@ -145,7 +145,11 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas

# Cache of event ID to list of auth event IDs and their depths.
self._event_auth_cache: LruCache[str, List[Tuple[str, int]]] = LruCache(
500000, "_event_auth_cache", size_callback=len
500000,
cache_name="_event_auth_cache",
# TODO
# cache_manager=hs.get_cache_manager(),
size_callback=len,
)

# Flag used by unit tests to disable fallback when there is no chain cover

@@ -88,12 +88,6 @@ if TYPE_CHECKING:

logger = logging.getLogger(__name__)

persist_event_counter = Counter("synapse_storage_events_persisted_events", "")
event_counter = Counter(
"synapse_storage_events_persisted_events_sep",
"",
["type", "origin_type", "origin_entity"],
)

# State event type/key pairs that we need to gather to fill in the
# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` tables.
@@ -260,6 +254,18 @@ class PersistEventsStore:
self._backfill_id_gen: AbstractStreamIdGenerator = self.store._backfill_id_gen
self._stream_id_gen: AbstractStreamIdGenerator = self.store._stream_id_gen

self.persist_event_counter = Counter(
"synapse_storage_events_persisted_events",
"",
registry=hs.metrics_collector_registry,
)
self.event_counter = Counter(
"synapse_storage_events_persisted_events_sep",
"",
["type", "origin_type", "origin_entity"],
registry=hs.metrics_collector_registry,
)

@trace
async def _persist_events_and_state_updates(
self,
@@ -356,7 +362,7 @@ class PersistEventsStore:
new_event_links=new_event_links,
sliding_sync_table_changes=sliding_sync_table_changes,
)
persist_event_counter.inc(len(events_and_contexts))
self.persist_event_counter.inc(len(events_and_contexts))

if not use_negative_stream_ordering:
# we don't want to set the event_persisted_position to a negative
@@ -374,7 +380,7 @@ class PersistEventsStore:
origin_type = "remote"
origin_entity = get_domain_from_id(event.sender)

event_counter.labels(event.type, origin_type, origin_entity).inc()
self.event_counter.labels(event.type, origin_type, origin_entity).inc()

if new_forward_extremities:
self.store.get_latest_event_ids_in_room.prefill(

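This hunk shows the general migration applied to module-level metrics throughout the commit: a `Counter` created at import time against the implicit global registry becomes an instance attribute bound to the homeserver's registry. A hedged sketch of the resulting shape, with an illustrative class name:

```python
from typing import List

from prometheus_client import CollectorRegistry, Counter


class PersistStoreSketch:
    """Illustrative stand-in for PersistEventsStore."""

    def __init__(self, registry: CollectorRegistry) -> None:
        # Previously a module-level Counter registered against REGISTRY;
        # now scoped to whichever registry the homeserver provides.
        self.persist_event_counter = Counter(
            "synapse_storage_events_persisted_events", "", registry=registry
        )

    def persist(self, events: List[object]) -> None:
        self.persist_event_counter.inc(len(events))
```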
@@ -269,8 +269,9 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
limit=1000,
|
||||
)
|
||||
self._curr_state_delta_stream_cache: StreamChangeCache = StreamChangeCache(
|
||||
"_curr_state_delta_stream_cache",
|
||||
min_curr_state_delta_id,
|
||||
name="_curr_state_delta_stream_cache",
|
||||
cache_manager=hs.get_cache_manager(),
|
||||
current_stream_pos=min_curr_state_delta_id,
|
||||
prefilled_cache=curr_state_delta_prefill,
|
||||
)
|
||||
|
||||
@@ -283,8 +284,10 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
|
||||
self._get_event_cache: AsyncLruCache[Tuple[str], EventCacheEntry] = (
|
||||
AsyncLruCache(
|
||||
cache_name="*getEvent*",
|
||||
max_size=hs.config.caches.event_cache_size,
|
||||
cache_name="*getEvent*",
|
||||
# TODO
|
||||
# cache_manager=hs.get_cache_manager(),
|
||||
# `extra_index_cb` Returns a tuple as that is the key type
|
||||
extra_index_cb=lambda _, v: (v.event.room_id,),
|
||||
)
|
||||
@@ -1233,7 +1236,9 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
to event row. Note that it may well contain additional events that
|
||||
were not part of this request.
|
||||
"""
|
||||
with Measure(self._clock, "_fetch_event_list"):
|
||||
with Measure(
|
||||
self._clock, self._metrics_collector_registry, "_fetch_event_list"
|
||||
):
|
||||
try:
|
||||
events_to_fetch = {
|
||||
event_id for events, _ in event_list for event_id in events
|
||||
|
||||
@@ -108,8 +108,9 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore)
|
||||
max_value=self._presence_id_gen.get_current_token(),
|
||||
)
|
||||
self.presence_stream_cache = StreamChangeCache(
|
||||
"PresenceStreamChangeCache",
|
||||
min_presence_val,
|
||||
name="PresenceStreamChangeCache",
|
||||
cache_manager=hs.get_cache_manager(),
|
||||
current_stream_pos=min_presence_val,
|
||||
prefilled_cache=presence_cache_prefill,
|
||||
)
|
||||
|
||||
|
||||
@@ -163,8 +163,9 @@ class PushRulesWorkerStore(
|
||||
)
|
||||
|
||||
self.push_rules_stream_cache = StreamChangeCache(
|
||||
"PushRulesStreamChangeCache",
|
||||
push_rules_id,
|
||||
name="PushRulesStreamChangeCache",
|
||||
cache_manager=hs.get_cache_manager(),
|
||||
current_stream_pos=push_rules_id,
|
||||
prefilled_cache=push_rules_prefill,
|
||||
)
|
||||
|
||||
|
||||
@@ -158,8 +158,9 @@ class ReceiptsWorkerStore(SQLBaseStore):
|
||||
limit=10000,
|
||||
)
|
||||
self._receipts_stream_cache = StreamChangeCache(
|
||||
"ReceiptsRoomChangeCache",
|
||||
min_receipts_stream_id,
|
||||
name="ReceiptsRoomChangeCache",
|
||||
cache_manager=hs.get_cache_manager(),
|
||||
current_stream_pos=min_receipts_stream_id,
|
||||
prefilled_cache=receipts_stream_prefill,
|
||||
)
|
||||
|
||||
|
||||
@@ -121,6 +121,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
|
||||
"",
|
||||
[],
|
||||
lambda: self._known_servers_count,
|
||||
registry=hs.metrics_collector_registry,
|
||||
)
|
||||
|
||||
@wrap_as_background_process("_count_known_servers")
|
||||
@@ -983,7 +984,11 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
|
||||
`_get_user_ids_from_membership_event_ids` for any uncached events.
|
||||
"""
|
||||
|
||||
with Measure(self._clock, "get_joined_user_ids_from_state"):
|
||||
with Measure(
|
||||
self._clock,
|
||||
self._metrics_collector_registry,
|
||||
"get_joined_user_ids_from_state",
|
||||
):
|
||||
users_in_room = set()
|
||||
member_event_ids = [
|
||||
e_id for key, e_id in state.items() if key[0] == EventTypes.Member
|
||||
|
||||
@@ -617,12 +617,15 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
            max_value=events_max,
        )
        self._events_stream_cache = StreamChangeCache(
            "EventsRoomStreamChangeCache",
            min_event_val,
            name="EventsRoomStreamChangeCache",
            cache_manager=hs.get_cache_manager(),
            current_stream_pos=min_event_val,
            prefilled_cache=event_cache_prefill,
        )
        self._membership_stream_cache = StreamChangeCache(
            "MembershipStreamChangeCache", events_max
            name="MembershipStreamChangeCache",
            cache_manager=hs.get_cache_manager(),
            current_stream_pos=events_max,
        )

        self._stream_order_on_start = self.get_room_max_stream_ordering()
@@ -123,14 +123,16 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
        # vast majority of state in Matrix (today) is member events.

        self._state_group_cache: DictionaryCache[int, StateKey, str] = DictionaryCache(
            "*stateGroupCache*",
            # TODO: this hasn't been tuned yet
            50000,
            max_entries=50000,
            name="*stateGroupCache*",
            cache_manager=hs.get_cache_manager(),
        )
        self._state_group_members_cache: DictionaryCache[int, StateKey, str] = (
            DictionaryCache(
                "*stateGroupMembersCache*",
                500000,
                max_entries=500000,
                name="*stateGroupMembersCache*",
                cache_manager=hs.get_cache_manager(),
            )
        )
+146
-100
@@ -24,77 +24,102 @@ import logging
import typing
from enum import Enum, auto
from sys import intern
from typing import Any, Callable, Dict, List, Optional, Sized, TypeVar
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Sized, TypeVar

import attr
from prometheus_client import REGISTRY
from prometheus_client import REGISTRY, CollectorRegistry
from prometheus_client.core import Gauge

from synapse.config.cache import add_resizable_cache
from synapse.util.metrics import DynamicCollectorRegistry

if TYPE_CHECKING:
    from synapse.server import HomeServer

logger = logging.getLogger(__name__)

# Whether to track estimated memory usage of the LruCaches.
TRACK_MEMORY_USAGE = False

# We track cache metrics in a special registry that lets us update the metrics
# just before they are returned from the scrape endpoint.
CACHE_METRIC_REGISTRY = DynamicCollectorRegistry()

caches_by_name: Dict[str, Sized] = {}
class CacheMetrics:
    def __init__(self, registry: Optional[CollectorRegistry] = REGISTRY) -> None:
        # We track cache metrics in a special registry that lets us update the metrics
        # just before they are returned from the scrape endpoint.
        self.CACHE_METRIC_REGISTRY = DynamicCollectorRegistry()

cache_size = Gauge(
    "synapse_util_caches_cache_size", "", ["name"], registry=CACHE_METRIC_REGISTRY
)
cache_hits = Gauge(
    "synapse_util_caches_cache_hits", "", ["name"], registry=CACHE_METRIC_REGISTRY
)
cache_evicted = Gauge(
    "synapse_util_caches_cache_evicted_size",
    "",
    ["name", "reason"],
    registry=CACHE_METRIC_REGISTRY,
)
cache_total = Gauge(
    "synapse_util_caches_cache", "", ["name"], registry=CACHE_METRIC_REGISTRY
)
cache_max_size = Gauge(
    "synapse_util_caches_cache_max_size", "", ["name"], registry=CACHE_METRIC_REGISTRY
)
cache_memory_usage = Gauge(
    "synapse_util_caches_cache_size_bytes",
    "Estimated memory usage of the caches",
    ["name"],
    registry=CACHE_METRIC_REGISTRY,
)
        self.cache_size = Gauge(
            "synapse_util_caches_cache_size",
            "",
            ["name"],
            registry=self.CACHE_METRIC_REGISTRY,
        )
        self.cache_hits = Gauge(
            "synapse_util_caches_cache_hits",
            "",
            ["name"],
            registry=self.CACHE_METRIC_REGISTRY,
        )
        self.cache_evicted = Gauge(
            "synapse_util_caches_cache_evicted_size",
            "",
            ["name", "reason"],
            registry=self.CACHE_METRIC_REGISTRY,
        )
        self.cache_total = Gauge(
            "synapse_util_caches_cache",
            "",
            ["name"],
            registry=self.CACHE_METRIC_REGISTRY,
        )
        self.cache_max_size = Gauge(
            "synapse_util_caches_cache_max_size",
            "",
            ["name"],
            registry=self.CACHE_METRIC_REGISTRY,
        )
        self.cache_memory_usage = Gauge(
            "synapse_util_caches_cache_size_bytes",
            "Estimated memory usage of the caches",
            ["name"],
            registry=self.CACHE_METRIC_REGISTRY,
        )

response_cache_size = Gauge(
    "synapse_util_caches_response_cache_size",
    "",
    ["name"],
    registry=CACHE_METRIC_REGISTRY,
)
response_cache_hits = Gauge(
    "synapse_util_caches_response_cache_hits",
    "",
    ["name"],
    registry=CACHE_METRIC_REGISTRY,
)
response_cache_evicted = Gauge(
    "synapse_util_caches_response_cache_evicted_size",
    "",
    ["name", "reason"],
    registry=CACHE_METRIC_REGISTRY,
)
response_cache_total = Gauge(
    "synapse_util_caches_response_cache", "", ["name"], registry=CACHE_METRIC_REGISTRY
)
        self.response_cache_size = Gauge(
            "synapse_util_caches_response_cache_size",
            "",
            ["name"],
            registry=self.CACHE_METRIC_REGISTRY,
        )
        self.response_cache_hits = Gauge(
            "synapse_util_caches_response_cache_hits",
            "",
            ["name"],
            registry=self.CACHE_METRIC_REGISTRY,
        )
        self.response_cache_evicted = Gauge(
            "synapse_util_caches_response_cache_evicted_size",
            "",
            ["name", "reason"],
            registry=self.CACHE_METRIC_REGISTRY,
        )
        self.response_cache_total = Gauge(
            "synapse_util_caches_response_cache",
            "",
            ["name"],
            registry=self.CACHE_METRIC_REGISTRY,
        )

        # Register our custom cache metrics registry with the global registry
        if registry is not None:
            registry.register(self.CACHE_METRIC_REGISTRY)

# Register our custom cache metrics registry with the global registry
REGISTRY.register(CACHE_METRIC_REGISTRY)
    def register_hook(self, metric_name: str, hook: Callable[[], None]) -> None:
        """
        Registers a hook that is called before metric collection.
        """
        self.CACHE_METRIC_REGISTRY.register_hook(metric_name, hook)
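To make the refactor concrete: each `CacheMetrics` instance now owns its own `DynamicCollectorRegistry` and registers it with whatever parent registry it is handed, so two homeservers in one process no longer collide on metric names. A minimal sketch of that isolation using only `prometheus_client` (names and values are illustrative):

```python
from prometheus_client import CollectorRegistry, Gauge, generate_latest

# Two isolated registries in one process, mirroring per-homeserver CacheMetrics.
hs_a_registry = CollectorRegistry(auto_describe=True)
hs_b_registry = CollectorRegistry(auto_describe=True)

for registry, size in ((hs_a_registry, 10), (hs_b_registry, 99)):
    Gauge(
        "synapse_util_caches_cache_size", "", ["name"], registry=registry
    ).labels("getEvent").set(size)

# Each scrape only sees its own homeserver's series.
assert b"10.0" in generate_latest(hs_a_registry)
assert b"99.0" in generate_latest(hs_b_registry)
```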
class EvictionReason(Enum):
@@ -105,6 +130,7 @@ class EvictionReason(Enum):

@attr.s(slots=True, auto_attribs=True)
class CacheMetric:
    _cache_metrics: CacheMetrics
    _cache: Sized
    _cache_type: str
    _cache_name: str
@@ -146,31 +172,41 @@ class CacheMetric:
    def collect(self) -> None:
        try:
            if self._cache_type == "response_cache":
                response_cache_size.labels(self._cache_name).set(len(self._cache))
                response_cache_hits.labels(self._cache_name).set(self.hits)
                self._cache_metrics.response_cache_size.labels(self._cache_name).set(
                    len(self._cache)
                )
                self._cache_metrics.response_cache_hits.labels(self._cache_name).set(
                    self.hits
                )
                for reason in EvictionReason:
                    response_cache_evicted.labels(self._cache_name, reason.name).set(
                        self.eviction_size_by_reason[reason]
                    )
                response_cache_total.labels(self._cache_name).set(
                    self._cache_metrics.response_cache_evicted.labels(
                        self._cache_name, reason.name
                    ).set(self.eviction_size_by_reason[reason])
                self._cache_metrics.response_cache_total.labels(self._cache_name).set(
                    self.hits + self.misses
                )
            else:
                cache_size.labels(self._cache_name).set(len(self._cache))
                cache_hits.labels(self._cache_name).set(self.hits)
                self._cache_metrics.cache_size.labels(self._cache_name).set(
                    len(self._cache)
                )
                self._cache_metrics.cache_hits.labels(self._cache_name).set(self.hits)
                for reason in EvictionReason:
                    cache_evicted.labels(self._cache_name, reason.name).set(
                        self.eviction_size_by_reason[reason]
                    )
                cache_total.labels(self._cache_name).set(self.hits + self.misses)
                    self._cache_metrics.cache_evicted.labels(
                        self._cache_name, reason.name
                    ).set(self.eviction_size_by_reason[reason])
                self._cache_metrics.cache_total.labels(self._cache_name).set(
                    self.hits + self.misses
                )
            max_size = getattr(self._cache, "max_size", None)
            if max_size:
                cache_max_size.labels(self._cache_name).set(max_size)
                self._cache_metrics.cache_max_size.labels(self._cache_name).set(
                    max_size
                )

            if TRACK_MEMORY_USAGE:
                # self.memory_usage can be None if nothing has been inserted
                # into the cache yet.
                cache_memory_usage.labels(self._cache_name).set(
                self._cache_metrics.cache_memory_usage.labels(self._cache_name).set(
                    self.memory_usage or 0
                )
            if self._collect_callback:
@@ -180,41 +216,51 @@ class CacheMetric:
            raise

def register_cache(
    cache_type: str,
    cache_name: str,
    cache: Sized,
    collect_callback: Optional[Callable] = None,
    resizable: bool = True,
    resize_callback: Optional[Callable] = None,
) -> CacheMetric:
    """Register a cache object for metric collection and resizing.

    Args:
        cache_type: a string indicating the "type" of the cache. This is used
            only for deduplication so isn't too important provided it's constant.
        cache_name: name of the cache
        cache: cache itself, which must implement __len__(), and may optionally implement
            a max_size property
        collect_callback: If given, a function which is called during metric
            collection to update additional metrics.
        resizable: Whether this cache supports being resized, in which case either
            resize_callback must be provided, or the cache must support set_max_size().
        resize_callback: A function which can be called to resize the cache.

    Returns:
        an object which provides inc_{hits,misses,evictions} methods
class CacheManager:
    """
    Used as a central point to register caches and collect metrics on them.
    """
    if resizable:
        if not resize_callback:
            resize_callback = cache.set_cache_factor  # type: ignore
        add_resizable_cache(cache_name, resize_callback)

    metric = CacheMetric(cache, cache_type, cache_name, collect_callback)
    metric_name = "cache_%s_%s" % (cache_type, cache_name)
    caches_by_name[cache_name] = cache
    CACHE_METRIC_REGISTRY.register_hook(metric_name, metric.collect)
    return metric
    def __init__(self, hs: "HomeServer") -> None:
        self._cache_metrics = CacheMetrics(hs.metrics_collector_registry)

    def register_cache(
        self,
        cache_type: str,
        cache_name: str,
        cache: Sized,
        collect_callback: Optional[Callable] = None,
        resizable: bool = True,
        resize_callback: Optional[Callable] = None,
    ) -> CacheMetric:
        """Register a cache object for metric collection and resizing.

        Args:
            cache_type: a string indicating the "type" of the cache. This is used
                only for deduplication so isn't too important provided it's constant.
            cache_name: name of the cache
            cache: cache itself, which must implement __len__(), and may optionally implement
                a max_size property
            collect_callback: If given, a function which is called during metric
                collection to update additional metrics.
            resizable: Whether this cache supports being resized, in which case either
                resize_callback must be provided, or the cache must support set_max_size().
            resize_callback: A function which can be called to resize the cache.

        Returns:
            an object which provides inc_{hits,misses,evictions} methods
        """
        if resizable:
            if not resize_callback:
                resize_callback = cache.set_cache_factor  # type: ignore
            add_resizable_cache(cache_name, resize_callback)

        metric = CacheMetric(
            self._cache_metrics, cache, cache_type, cache_name, collect_callback
        )
        metric_name = "cache_%s_%s" % (cache_type, cache_name)
        self._cache_metrics.register_hook(metric_name, metric.collect)
        return metric


KNOWN_KEYS = {
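The module-level `register_cache` helper thus becomes `CacheManager.register_cache`. A hedged sketch of registering a cache through the manager (the `FakeCache` class is invented for illustration; `hs` is a HomeServer as used throughout the diff):

```python
class FakeCache(dict):
    """Toy Sized cache; anything implementing __len__() qualifies."""

    def set_cache_factor(self, factor: float) -> None:
        pass  # resizing is a no-op in this sketch


cache_manager = hs.get_cache_manager()
metrics = cache_manager.register_cache("lru_cache", "my_cache", FakeCache())
metrics.inc_hits()    # recorded against this homeserver's registry only
metrics.inc_misses()
```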
@@ -35,6 +35,7 @@ from typing import (

import attr

from synapse.util.caches import CacheManager
from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.treecache import TreeCache

@@ -127,7 +128,7 @@ class DictionaryCache(Generic[KT, DKT, DV]):
        for the '2' dict key.
    """

    def __init__(self, name: str, max_entries: int = 1000):
    def __init__(self, *, max_entries: int, name: str, cache_manager: CacheManager):
        # We use a single LruCache to store two different types of entries:
        # 1. Map from (key, dict_key) -> dict value (or sentinel, indicating
        #    the key doesn't exist in the dict); and
@@ -153,6 +154,8 @@ class DictionaryCache(Generic[KT, DKT, DV]):
        ] = LruCache(
            max_size=max_entries,
            cache_name=name,
            # TODO
            # cache_manager=cache_manager,
            cache_type=TreeCache,
            size_callback=len,
        )
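`DictionaryCache` construction is now keyword-only. A minimal sketch of the new shape, with the manager mocked since the underlying `LruCache` wiring is still marked TODO in this diff (this mirrors the test changes further down):

```python
from unittest.mock import Mock

from synapse.util.caches import CacheManager
from synapse.util.caches.dictionary_cache import DictionaryCache

# Keyword-only construction, as introduced above.
cache: DictionaryCache[str, str, str] = DictionaryCache(
    max_entries=1000,
    name="example",
    cache_manager=Mock(spec=CacheManager),  # real code passes hs.get_cache_manager()
)
```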
@@ -30,7 +30,7 @@ from twisted.internet import defer
from synapse.config import cache as cache_config
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util import Clock
from synapse.util.caches import EvictionReason, register_cache
from synapse.util.caches import CacheManager, EvictionReason

logger = logging.getLogger(__name__)

@@ -48,6 +48,7 @@ class ExpiringCache(Generic[KT, VT]):
        self,
        cache_name: str,
        clock: Clock,
        cache_manager: CacheManager,
        max_len: int = 0,
        expiry_ms: int = 0,
        reset_expiry_on_get: bool = False,
@@ -83,7 +84,7 @@ class ExpiringCache(Generic[KT, VT]):

        self.iterable = iterable

        self.metrics = register_cache("expiring", cache_name, self)
        self.metrics = cache_manager.register_cache("expiring", cache_name, self)

        if not self._expiry_ms:
            # Don't bother starting the loop if things never expire
@@ -50,9 +50,8 @@ from twisted.internet.interfaces import IReactorTime

from synapse.config import cache as cache_config
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.metrics.jemalloc import get_jemalloc_stats
from synapse.util import Clock, caches
from synapse.util.caches import CacheMetric, EvictionReason, register_cache
from synapse.util.caches import CacheMetric, EvictionReason
from synapse.util.caches.treecache import (
    TreeCache,
    iterate_tree_cache_entry,
@@ -120,7 +119,10 @@ GLOBAL_ROOT = ListNode["_Node"].create_root_node()

@wrap_as_background_process("LruCache._expire_old_entries")
async def _expire_old_entries(
    clock: Clock, expiry_seconds: float, autotune_config: Optional[dict]
    hs: "HomeServer",
    clock: Clock,
    expiry_seconds: float,
    autotune_config: Optional[dict],
) -> None:
    """Walks the global cache list to find cache entries that haven't been
    accessed in the given number of seconds, or if a given memory threshold has been breached.
@@ -141,7 +143,7 @@ async def _expire_old_entries(
    evicting_due_to_memory = False

    # determine if we're evicting due to memory
    jemalloc_interface = get_jemalloc_stats()
    jemalloc_interface = hs.jemalloc_stats
    if jemalloc_interface and autotune_config:
        try:
            jemalloc_interface.refresh_stats()
@@ -238,6 +240,7 @@ def setup_expire_lru_cache_entries(hs: "HomeServer") -> None:
    clock.looping_call(
        _expire_old_entries,
        30 * 1000,
        hs,
        clock,
        expiry_time,
        hs.config.caches.cache_autotuning,
@@ -379,10 +382,11 @@ class LruCache(Generic[KT, VT]):
    def __init__(
        self,
        max_size: int,
        *,
        cache_name: Optional[str] = None,
        metrics_collection_callback: Optional[Callable[[], None]] = None,
        cache_type: Type[Union[dict, TreeCache]] = dict,
        size_callback: Optional[Callable[[VT], int]] = None,
        metrics_collection_callback: Optional[Callable[[], None]] = None,
        apply_cache_factor_from_config: bool = True,
        clock: Optional[Clock] = None,
        prune_unread_entries: bool = True,
@@ -395,11 +399,7 @@ class LruCache(Generic[KT, VT]):
            cache_name: The name of this cache, for the prometheus metrics. If unset,
                no metrics will be reported on this cache.

            cache_type:
                type of underlying cache to be used. Typically one of dict
                or TreeCache.

            size_callback:
                Ignored if `cache_name` is `None`.

            metrics_collection_callback:
                metrics collection callback. This is called early in the metrics
@@ -407,7 +407,13 @@ class LruCache(Generic[KT, VT]):
                prometheus Registry are collected, so can be used to update any dynamic
                metrics.

                Ignored if cache_name is None.
                Ignored if `cache_name` is `None`.

            cache_type:
                type of underlying cache to be used. Typically one of dict
                or TreeCache.

            size_callback:

            apply_cache_factor_from_config: If true, `max_size` will be
                multiplied by a cache factor derived from the homeserver config
@@ -457,15 +463,17 @@ class LruCache(Generic[KT, VT]):
        # do yet when we get resized.
        self._on_resize: Optional[Callable[[], None]] = None

        if cache_name is not None:
            metrics: Optional[CacheMetric] = register_cache(
                "lru_cache",
                cache_name,
                self,
                collect_callback=metrics_collection_callback,
            )
        else:
            metrics = None
        # TODO
        # if cache_name is not None:
        #     metrics: Optional[CacheMetric] = cache_manager.register_cache(
        #         "lru_cache",
        #         cache_name,
        #         self,
        #         collect_callback=metrics_collection_callback,
        #     )
        # else:
        #     metrics = None
        metrics: Optional[CacheMetric] = None

        # this is exposed for access from outside this class
        self.metrics = metrics
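The expiry loop now receives the homeserver so it can read jemalloc stats from `hs.jemalloc_stats` instead of the module-level `get_jemalloc_stats()`, which lets tests inject a fake. A hedged sketch of that injection, mirroring the test change further down (`hs` is assumed to be a HomeServer or test fixture):

```python
from unittest.mock import Mock

from synapse.metrics.jemalloc import JemallocStats

# Stub out jemalloc on the homeserver so the LRU expiry loop believes
# memory pressure is high (the byte count is illustrative).
fake_stats = Mock(spec=JemallocStats)
fake_stats.get_stat.return_value = 924_288_000
hs.jemalloc_stats = fake_stats
```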
@@ -43,7 +43,7 @@ from synapse.logging.opentracing import (
)
from synapse.util import Clock
from synapse.util.async_helpers import AbstractObservableDeferred, ObservableDeferred
from synapse.util.caches import EvictionReason, register_cache
from synapse.util.caches import CacheManager, EvictionReason

logger = logging.getLogger(__name__)

@@ -104,6 +104,7 @@ class ResponseCache(Generic[KV]):
    def __init__(
        self,
        clock: Clock,
        cache_manager: CacheManager,
        name: str,
        timeout_ms: float = 0,
        enable_logging: bool = True,
@@ -114,7 +115,9 @@ class ResponseCache(Generic[KV]):
        self.timeout_sec = timeout_ms / 1000.0

        self._name = name
        self._metrics = register_cache("response_cache", name, self, resizable=False)
        self._metrics = cache_manager.register_cache(
            "response_cache", name, self, resizable=False
        )
        self._enable_logging = enable_logging

    def size(self) -> int:
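The cache manager becomes the second positional argument of `ResponseCache`. A minimal sketch of the new call shape (`clock` is assumed to be a `synapse.util.Clock` from the surrounding code; the mock stands in for `hs.get_cache_manager()`, as in the test changes below):

```python
from unittest.mock import Mock

from synapse.util.caches import CacheManager
from synapse.util.caches.response_cache import ResponseCache

cache: ResponseCache = ResponseCache(
    clock,
    Mock(spec=CacheManager),  # real code passes hs.get_cache_manager()
    "example_cache",
    timeout_ms=120_000,
)
```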
@@ -26,7 +26,7 @@ from typing import Collection, Dict, FrozenSet, List, Mapping, Optional, Set, Un
import attr
from sortedcontainers import SortedDict

from synapse.util import caches
from synapse.util.caches import CacheManager

logger = logging.getLogger(__name__)

@@ -73,7 +73,9 @@ class StreamChangeCache:

    def __init__(
        self,
        *,
        name: str,
        cache_manager: CacheManager,
        current_stream_pos: int,
        max_size: int = 10000,
        prefilled_cache: Optional[Mapping[EntityType, int]] = None,
@@ -95,7 +97,7 @@ class StreamChangeCache:
        self._earliest_known_stream_pos = current_stream_pos

        self.name = name
        self.metrics = caches.register_cache(
        self.metrics = cache_manager.register_cache(
            "cache", self.name, self._cache, resize_callback=self.set_cache_factor
        )
@@ -26,7 +26,7 @@ from typing import Any, Callable, Dict, Generic, Tuple, TypeVar, Union
import attr
from sortedcontainers import SortedList

from synapse.util.caches import register_cache
from synapse.util.caches import CacheManager

logger = logging.getLogger(__name__)

@@ -40,7 +40,12 @@ VT = TypeVar("VT")
class TTLCache(Generic[KT, VT]):
    """A key/value cache implementation where each entry has its own TTL"""

    def __init__(self, cache_name: str, timer: Callable[[], float] = time.time):
    def __init__(
        self,
        cache_name: str,
        cache_manager: CacheManager,
        timer: Callable[[], float] = time.time,
    ):
        # map from key to _CacheEntry
        self._data: Dict[KT, _CacheEntry[KT, VT]] = {}

@@ -49,7 +54,9 @@ class TTLCache(Generic[KT, VT]):

        self._timer = timer

        self._metrics = register_cache("ttl", cache_name, self, resizable=False)
        self._metrics = cache_manager.register_cache(
            "ttl", cache_name, self, resizable=False
        )

    def set(self, key: KT, value: VT, ttl: float) -> None:
        """Add/update an entry in the cache
+45
-14
@@ -79,21 +79,23 @@ class _InFlightMetric(Protocol):
    real_time_sum: float


# Tracks the number of blocks currently active
in_flight: InFlightGauge[_InFlightMetric] = InFlightGauge(
    "synapse_util_metrics_block_in_flight",
    "",
    labels=["block_name"],
    sub_metrics=["real_time_max", "real_time_sum"],
)
# Each homeserver will have its own `CollectorRegistry`, which is used to namespace the
# metrics to the given homeserver. Because of how `@measure_func` is used as a
# decorator, we have to keep track of this out of band.
metrics_collector_registry_to_in_flight_gauge: Dict[
    CollectorRegistry, InFlightGauge
] = {}


P = ParamSpec("P")
R = TypeVar("R")


class HasClock(Protocol):
class HasClockAndMetricsCollectorRegistry(Protocol):
    # Used to measure functions
    clock: Clock
    # Used to namespace the metrics to the given homeserver
    metrics_collector_registry: CollectorRegistry


def measure_func(
@@ -119,13 +121,17 @@ def measure_func(
    """

    def wrapper(
        func: Callable[Concatenate[HasClock, P], Awaitable[R]],
        func: Callable[
            Concatenate[HasClockAndMetricsCollectorRegistry, P], Awaitable[R]
        ],
    ) -> Callable[P, Awaitable[R]]:
        block_name = func.__name__ if name is None else name

        @wraps(func)
        async def measured_func(self: HasClock, *args: P.args, **kwargs: P.kwargs) -> R:
            with Measure(self.clock, block_name):
        async def measured_func(
            self: HasClockAndMetricsCollectorRegistry, *args: P.args, **kwargs: P.kwargs
        ) -> R:
            with Measure(self.clock, self.metrics_collector_registry, block_name):
                r = await func(self, *args, **kwargs)
            return r

@@ -141,12 +147,15 @@ def measure_func(
class Measure:
    __slots__ = [
        "clock",
        "metrics_collector_registry",
        "name",
        "_logging_context",
        "start",
    ]

    def __init__(self, clock: Clock, name: str) -> None:
    def __init__(
        self, clock: Clock, metrics_collector_registry: CollectorRegistry, name: str
    ) -> None:
        """
        Args:
            clock: An object with a "time()" method, which returns the current
@@ -154,6 +163,7 @@ class Measure:
            name: The name of the metric to report.
        """
        self.clock = clock
        self.metrics_collector_registry = metrics_collector_registry
        self.name = name
        curr_context = current_context()
        if not curr_context:
@@ -168,13 +178,34 @@ class Measure:
        self._logging_context = LoggingContext(str(curr_context), parent_context)
        self.start: Optional[float] = None

    def get_in_flight_gauge(self) -> InFlightGauge[_InFlightMetric]:
        existing_in_flight_gauge = metrics_collector_registry_to_in_flight_gauge.get(
            self.metrics_collector_registry, None
        )
        if existing_in_flight_gauge is not None:
            return existing_in_flight_gauge

        # Tracks the number of blocks currently active
        in_flight_gauge: InFlightGauge[_InFlightMetric] = InFlightGauge(
            "synapse_util_metrics_block_in_flight",
            "",
            labels=["block_name"],
            sub_metrics=["real_time_max", "real_time_sum"],
            registry=self.metrics_collector_registry,
        )
        metrics_collector_registry_to_in_flight_gauge[
            self.metrics_collector_registry
        ] = in_flight_gauge

        return in_flight_gauge

    def __enter__(self) -> "Measure":
        if self.start is not None:
            raise RuntimeError("Measure() objects cannot be re-used")

        self.start = self.clock.time()
        self._logging_context.__enter__()
        in_flight.register((self.name,), self._update_in_flight)
        self.get_in_flight_gauge().register((self.name,), self._update_in_flight)

        logger.debug("Entering block %s", self.name)

@@ -194,7 +225,7 @@ class Measure:
        duration = self.clock.time() - self.start
        usage = self.get_resource_usage()

        in_flight.unregister((self.name,), self._update_in_flight)
        self.get_in_flight_gauge().unregister((self.name,), self._update_in_flight)
        self._logging_context.__exit__(exc_type, exc_val, exc_tb)

        try:
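`Measure` now takes the per-homeserver registry as a second positional argument and lazily memoizes one `InFlightGauge` per registry. A minimal sketch of the new call shape, using the identifiers the store code above carries (the workload function is hypothetical):

```python
from synapse.util.metrics import Measure

# Inside a store/handler that has both a clock and the homeserver's registry,
# as the HasClockAndMetricsCollectorRegistry protocol requires.
with Measure(self._clock, self._metrics_collector_registry, "example_block"):
    do_work()  # hypothetical workload being timed
```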
@@ -132,6 +132,7 @@ class TaskScheduler:
            "The number of concurrent running tasks handled by the TaskScheduler",
            labels=None,
            caller=lambda: len(self._running_tasks),
            registry=hs.metrics_collector_registry,
        )

    def register_action(
@@ -75,6 +75,7 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase):
        self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
    ) -> None:
        self.store = homeserver.get_datastores().main
        self.presence_handler = homeserver.get_presence_handler()

    def test_offline_to_online(self) -> None:
        wheel_timer = Mock()
@@ -87,6 +88,7 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase):
        )

        state, persist_and_notify, federation_ping = handle_update(
            self.presence_handler,
            prev_state,
            new_state,
            is_mine=True,
@@ -134,6 +136,7 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase):
        )

        state, persist_and_notify, federation_ping = handle_update(
            self.presence_handler,
            prev_state,
            new_state,
            is_mine=True,
@@ -184,6 +187,7 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase):
        )

        state, persist_and_notify, federation_ping = handle_update(
            self.presence_handler,
            prev_state,
            new_state,
            is_mine=True,
@@ -232,6 +236,7 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase):
        new_state = prev_state.copy_and_replace(state=PresenceState.ONLINE)

        state, persist_and_notify, federation_ping = handle_update(
            self.presence_handler,
            prev_state,
            new_state,
            is_mine=True,
@@ -272,6 +277,7 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase):
        new_state = prev_state.copy_and_replace(state=PresenceState.ONLINE)

        state, persist_and_notify, federation_ping = handle_update(
            self.presence_handler,
            prev_state,
            new_state,
            is_mine=False,
@@ -311,6 +317,7 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase):
        new_state = prev_state.copy_and_replace(state=PresenceState.OFFLINE)

        state, persist_and_notify, federation_ping = handle_update(
            self.presence_handler,
            prev_state,
            new_state,
            is_mine=True,
@@ -338,6 +345,7 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase):
        new_state = prev_state.copy_and_replace(state=PresenceState.UNAVAILABLE)

        state, persist_and_notify, federation_ping = handle_update(
            self.presence_handler,
            prev_state,
            new_state,
            is_mine=True,
@@ -428,6 +436,7 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase):
        new_state = prev_state.copy_and_replace(state=final_state, last_active_ts=now)

        handle_update(
            self.presence_handler,
            prev_state,
            new_state,
            is_mine=True,
@@ -491,6 +500,7 @@ class PresenceUpdateTestCase(unittest.HomeserverTestCase):
        )

        state, persist_and_notify, federation_ping = handle_update(
            self.presence_handler,
            prev_state,
            new_state,
            is_mine=True,
@@ -84,14 +84,6 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):

        # we mock out the federation client too
        self.mock_federation_client = AsyncMock(spec=["put_json"])
        self.mock_federation_client.put_json.return_value = (200, "OK")
        self.mock_federation_client.agent = MatrixFederationAgent(
            reactor,
            tls_client_options_factory=None,
            user_agent=b"SynapseInTrialTest/0.0.0",
            ip_allowlist=None,
            ip_blocklist=IPSet(),
        )

        # the tests assume that we are starting at unix time 1000
        reactor.pump((1000,))
@@ -104,6 +96,19 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
            replication_streams={},
        )

        # Finish off mocking the federation client
        self.mock_federation_client.put_json.return_value = (200, "OK")
        self.mock_federation_client.agent = MatrixFederationAgent(
            reactor=reactor,
            tls_client_options_factory=None,
            user_agent=b"SynapseInTrialTest/0.0.0",
            ip_allowlist=None,
            ip_blocklist=IPSet(),
            # After we get access to the `hs` homeserver instance, we can replace the federation agent
            metrics_collector_registry=hs.metrics_collector_registry,
            cache_manager=hs.get_cache_manager(),
        )

        return hs

    def create_resource_dict(self) -> Dict[str, Resource]:
@@ -21,10 +21,11 @@ import base64
import logging
import os
from typing import Generator, List, Optional, cast
from unittest.mock import AsyncMock, call, patch
from unittest.mock import AsyncMock, Mock, call, patch

import treq
from netaddr import IPSet
from prometheus_client import CollectorRegistry
from service_identity import VerificationError
from zope.interface import implementer

@@ -60,6 +61,7 @@ from synapse.logging.context import (
    current_context,
)
from synapse.types import ISynapseReactor
from synapse.util.caches import CacheManager
from synapse.util.caches.ttlcache import TTLCache

from tests import unittest
@@ -84,16 +86,24 @@ class MatrixFederationAgentTests(unittest.TestCase):

        self.tls_factory = FederationPolicyForHTTPS(config)

        cache_manager = Mock(spec=CacheManager)

        self.well_known_cache: TTLCache[bytes, Optional[bytes]] = TTLCache(
            "test_cache", timer=self.reactor.seconds
            "test_cache",
            timer=self.reactor.seconds,
            cache_manager=cache_manager,
        )
        self.had_well_known_cache: TTLCache[bytes, bool] = TTLCache(
            "test_cache", timer=self.reactor.seconds
            "test_cache",
            timer=self.reactor.seconds,
            cache_manager=cache_manager,
        )
        self.well_known_resolver = WellKnownResolver(
            self.reactor,
            Agent(self.reactor, contextFactory=self.tls_factory),
            b"test-agent",
            metrics_collector_registry=CollectorRegistry(auto_describe=True),
            cache_manager=cache_manager,
            well_known_cache=self.well_known_cache,
            had_well_known_cache=self.had_well_known_cache,
        )
@@ -274,6 +284,8 @@ class MatrixFederationAgentTests(unittest.TestCase):
            user_agent=b"test-agent",  # Note that this is unused since _well_known_resolver is provided.
            ip_allowlist=IPSet(),
            ip_blocklist=IPSet(),
            metrics_collector_registry=CollectorRegistry(auto_describe=True),
            cache_manager=Mock(spec=CacheManager),
            _srv_resolver=self.mock_resolver,
            _well_known_resolver=self.well_known_resolver,
        )
@@ -1016,11 +1028,15 @@ class MatrixFederationAgentTests(unittest.TestCase):
            user_agent=b"test-agent",  # This is unused since _well_known_resolver is passed below.
            ip_allowlist=IPSet(),
            ip_blocklist=IPSet(),
            metrics_collector_registry=CollectorRegistry(auto_describe=True),
            cache_manager=Mock(spec=CacheManager),
            _srv_resolver=self.mock_resolver,
            _well_known_resolver=WellKnownResolver(
                cast(ISynapseReactor, self.reactor),
                Agent(self.reactor, contextFactory=tls_factory),
                b"test-agent",
                metrics_collector_registry=CollectorRegistry(auto_describe=True),
                cache_manager=Mock(spec=CacheManager),
                well_known_cache=self.well_known_cache,
                had_well_known_cache=self.had_well_known_cache,
            ),
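A recurring idiom across these tests: wherever production code would pass `hs.metrics_collector_registry` and `hs.get_cache_manager()`, the tests substitute a throwaway registry and a spec'd mock, which keeps metric names from colliding between tests. A condensed sketch of that setup:

```python
from unittest.mock import Mock

from prometheus_client import CollectorRegistry

from synapse.util.caches import CacheManager

# Fresh per-test objects: no shared global state between test cases.
registry = CollectorRegistry(auto_describe=True)
cache_manager = Mock(spec=CacheManager)
```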
@@ -139,7 +139,9 @@ class TypingStreamTestCase(BaseStreamTestCase):
        self.hs.get_replication_command_handler()._streams["typing"].last_token = 0
        typing._latest_room_serial = 0
        typing._typing_stream_change_cache = StreamChangeCache(
            "TypingStreamChangeCache", typing._latest_room_serial
            name="TypingStreamChangeCache",
            cache_manager=self.hs.get_cache_manager(),
            current_stream_pos=typing._latest_room_serial,
        )
        typing._reset()
@@ -22,6 +22,7 @@ import logging
from unittest.mock import AsyncMock, Mock

from netaddr import IPSet
from prometheus_client import CollectorRegistry
from signedjson.key import (
    encode_verify_key_base64,
    generate_signing_key,
@@ -42,6 +43,7 @@ from synapse.server import HomeServer
from synapse.storage.keys import FetchKeyResult
from synapse.types import JsonDict, UserID, create_requester
from synapse.util import Clock
from synapse.util.caches import CacheManager

from tests.replication._base import BaseMultiWorkerStreamTestCase
from tests.server import get_clock
@@ -68,11 +70,13 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):

        reactor, _ = get_clock()
        self.matrix_federation_agent = MatrixFederationAgent(
            reactor,
            reactor=reactor,
            tls_client_options_factory=None,
            user_agent=b"SynapseInTrialTest/0.0.0",
            ip_allowlist=None,
            ip_blocklist=IPSet(),
            metrics_collector_registry=CollectorRegistry(auto_describe=True),
            cache_manager=Mock(spec=CacheManager),
        )

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
@@ -20,7 +20,6 @@
#
from prometheus_client import generate_latest

from synapse.metrics import REGISTRY
from synapse.types import UserID, create_requester

from tests.unittest import HomeserverTestCase
@@ -61,7 +60,7 @@ class ExtremStatisticsTestCase(HomeserverTestCase):
        items = list(
            filter(
                lambda x: b"synapse_forward_extremities_" in x and b"# HELP" not in x,
                generate_latest(REGISTRY).split(b"\n"),
                generate_latest(self.hs.metrics_collector_registry).split(b"\n"),
            )
        )
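Since `generate_latest` accepts any `CollectorRegistry`, scraping the per-homeserver registry is a drop-in change. A minimal sketch, assuming `hs` exposes `metrics_collector_registry` as above:

```python
from prometheus_client import generate_latest

# Render only this homeserver's metrics, exactly as the scrape endpoint would.
for line in generate_latest(hs.metrics_collector_registry).split(b"\n"):
    if line.startswith(b"synapse_forward_extremities_"):
        print(line.decode("ascii"))
```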
@@ -25,6 +25,7 @@ from parameterized import parameterized

from twisted.internet import defer

from synapse.util.caches import CacheManager
from synapse.util.caches.response_cache import ResponseCache, ResponseCacheContext

from tests.server import get_clock
@@ -44,9 +45,10 @@ class ResponseCacheTestCase(TestCase):

    def setUp(self) -> None:
        self.reactor, self.clock = get_clock()
        self.cache_manager = Mock(spec=CacheManager)

    def with_cache(self, name: str, ms: int = 0) -> ResponseCache:
        return ResponseCache(self.clock, name, timeout_ms=ms)
        return ResponseCache(self.clock, self.cache_manager, name, timeout_ms=ms)

    @staticmethod
    async def instant_return(o: str) -> str:
@@ -20,6 +20,7 @@

from unittest.mock import Mock

from synapse.util.caches import CacheManager
from synapse.util.caches.ttlcache import TTLCache

from tests import unittest
@@ -28,7 +29,11 @@ from tests import unittest
class CacheTestCase(unittest.TestCase):
    def setUp(self) -> None:
        self.mock_timer = Mock(side_effect=lambda: 100.0)
        self.cache: TTLCache[str, str] = TTLCache("test_cache", self.mock_timer)
        self.cache: TTLCache[str, str] = TTLCache(
            "test_cache",
            cache_manager=Mock(spec=CacheManager),
            timer=self.mock_timer,
        )

    def test_get(self) -> None:
        """simple set/get tests"""
@@ -20,6 +20,9 @@
#


from unittest.mock import Mock

from synapse.util.caches import CacheManager
from synapse.util.caches.dictionary_cache import DictionaryCache

from tests import unittest
@@ -28,7 +31,9 @@ from tests import unittest
class DictCacheTestCase(unittest.TestCase):
    def setUp(self) -> None:
        self.cache: DictionaryCache[str, str, str] = DictionaryCache(
            "foobar", max_entries=10
            max_entries=10,
            name="foobar",
            cache_manager=Mock(spec=CacheManager),
        )

    def test_simple_cache_hit_full(self) -> None:
@@ -20,8 +20,10 @@
#

from typing import List, cast
from unittest.mock import Mock

from synapse.util import Clock
from synapse.util.caches import CacheManager
from synapse.util.caches.expiringcache import ExpiringCache

from tests.utils import MockClock
@@ -33,7 +35,7 @@ class ExpiringCacheTestCase(unittest.HomeserverTestCase):
    def test_get_set(self) -> None:
        clock = MockClock()
        cache: ExpiringCache[str, str] = ExpiringCache(
            "test", cast(Clock, clock), max_len=1
            "test", cast(Clock, clock), cache_manager=Mock(spec=CacheManager), max_len=1
        )

        cache["key"] = "value"
@@ -43,7 +45,7 @@ class ExpiringCacheTestCase(unittest.HomeserverTestCase):
    def test_eviction(self) -> None:
        clock = MockClock()
        cache: ExpiringCache[str, str] = ExpiringCache(
            "test", cast(Clock, clock), max_len=2
            "test", cast(Clock, clock), cache_manager=Mock(spec=CacheManager), max_len=2
        )

        cache["key"] = "value"
@@ -59,7 +61,11 @@ class ExpiringCacheTestCase(unittest.HomeserverTestCase):
    def test_iterable_eviction(self) -> None:
        clock = MockClock()
        cache: ExpiringCache[str, List[int]] = ExpiringCache(
            "test", cast(Clock, clock), max_len=5, iterable=True
            "test",
            cast(Clock, clock),
            cache_manager=Mock(spec=CacheManager),
            max_len=5,
            iterable=True,
        )

        cache["key"] = [1]
@@ -79,7 +85,10 @@ class ExpiringCacheTestCase(unittest.HomeserverTestCase):
    def test_time_eviction(self) -> None:
        clock = MockClock()
        cache: ExpiringCache[str, int] = ExpiringCache(
            "test", cast(Clock, clock), expiry_ms=1000
            "test",
            cast(Clock, clock),
            cache_manager=Mock(spec=CacheManager),
            expiry_ms=1000,
        )

        cache["key"] = 1
@@ -21,7 +21,7 @@


from typing import List, Tuple
from unittest.mock import Mock, patch
from unittest.mock import Mock

from synapse.metrics.jemalloc import JemallocStats
from synapse.types import JsonDict
@@ -96,7 +96,10 @@ class LruCacheTestCase(unittest.HomeserverTestCase):

    @override_config({"caches": {"per_cache_factors": {"mycache": 10}}})
    def test_special_size(self) -> None:
        cache: LruCache = LruCache(10, "mycache")
        cache: LruCache = LruCache(
            10,
            cache_name="mycache",
        )
        self.assertEqual(cache.max_size, 100)


@@ -342,10 +345,9 @@ class MemoryEvictionTestCase(unittest.HomeserverTestCase):
            }
        }
    )
    @patch("synapse.util.caches.lrucache.get_jemalloc_stats")
    def test_evict_memory(self, jemalloc_interface: Mock) -> None:
    def test_evict_memory(self) -> None:
        mock_jemalloc_class = Mock(spec=JemallocStats)
        jemalloc_interface.return_value = mock_jemalloc_class
        self.hs.jemalloc_stats = mock_jemalloc_class()

        # set the return value of get_stat() to be greater than max_cache_memory_usage
        mock_jemalloc_class.get_stat.return_value = 924288000
@@ -15,7 +15,12 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
        Providing a prefilled cache to StreamChangeCache will result in a cache
        with the prefilled-cache entered in.
        """
        cache = StreamChangeCache("#test", 1, prefilled_cache={"user@foo.com": 2})
        cache = StreamChangeCache(
            name="#test",
            cache_manager=self.hs.get_cache_manager(),
            current_stream_pos=1,
            prefilled_cache={"user@foo.com": 2},
        )
        self.assertTrue(cache.has_entity_changed("user@foo.com", 1))

    def test_has_entity_changed(self) -> None:
@@ -23,7 +28,11 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
        StreamChangeCache.entity_has_changed will mark entities as changed, and
        has_entity_changed will observe the changed entities.
        """
        cache = StreamChangeCache("#test", 3)
        cache = StreamChangeCache(
            name="#test",
            cache_manager=self.hs.get_cache_manager(),
            current_stream_pos=3,
        )

        cache.entity_has_changed("user@foo.com", 6)
        cache.entity_has_changed("bar@baz.net", 7)
@@ -61,7 +70,12 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
        StreamChangeCache.entity_has_changed will respect the max size and
        purge the oldest items upon reaching that max size.
        """
        cache = StreamChangeCache("#test", 1, max_size=2)
        cache = StreamChangeCache(
            name="#test",
            cache_manager=self.hs.get_cache_manager(),
            current_stream_pos=1,
            max_size=2,
        )

        cache.entity_has_changed("user@foo.com", 2)
        cache.entity_has_changed("bar@baz.net", 3)
@@ -100,7 +114,11 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
        entities since the given position. If the position is before the start
        of the known stream, it returns None instead.
        """
        cache = StreamChangeCache("#test", 1)
        cache = StreamChangeCache(
            name="#test",
            cache_manager=self.hs.get_cache_manager(),
            current_stream_pos=1,
        )

        cache.entity_has_changed("user@foo.com", 2)
        cache.entity_has_changed("bar@baz.net", 3)
@@ -148,7 +166,11 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
        stream position is before it, it will return True, otherwise False if
        the cache has no entries.
        """
        cache = StreamChangeCache("#test", 1)
        cache = StreamChangeCache(
            name="#test",
            cache_manager=self.hs.get_cache_manager(),
            current_stream_pos=1,
        )

        # With no entities, it returns True for the past, present, and False for
        # the future.
@@ -175,7 +197,11 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
        stream position is earlier than the earliest known position, it will
        return all of the entities queried for.
        """
        cache = StreamChangeCache("#test", 1)
        cache = StreamChangeCache(
            name="#test",
            cache_manager=self.hs.get_cache_manager(),
            current_stream_pos=1,
        )

        cache.entity_has_changed("user@foo.com", 2)
        cache.entity_has_changed("bar@baz.net", 3)
@@ -242,7 +268,11 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
        recent point where the entity could have changed. If the entity is not
        known, the stream start is provided instead.
        """
        cache = StreamChangeCache("#test", 1)
        cache = StreamChangeCache(
            name="#test",
            cache_manager=self.hs.get_cache_manager(),
            current_stream_pos=1,
        )

        cache.entity_has_changed("user@foo.com", 2)
        cache.entity_has_changed("bar@baz.net", 3)
@@ -260,7 +290,12 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
        """
        `StreamChangeCache.all_entities_changed(...)` will mark all entites as changed.
        """
        cache = StreamChangeCache("#test", 1, max_size=10)
        cache = StreamChangeCache(
            name="#test",
            cache_manager=self.hs.get_cache_manager(),
            current_stream_pos=1,
            max_size=10,
        )

        cache.entity_has_changed("user@foo.com", 2)
        cache.entity_has_changed("bar@baz.net", 3)