Move LaterGauge further along
This commit is contained in:
@@ -117,6 +117,7 @@ class FederationRemoteSendQueue(AbstractFederationSender):
|
||||
"",
|
||||
[],
|
||||
lambda: len(queue),
|
||||
registry=hs.metrics_collector_registry,
|
||||
)
|
||||
|
||||
for queue_name in [
|
||||
|
||||
@@ -888,6 +888,7 @@ class PresenceHandler(BasePresenceHandler):
|
||||
"",
|
||||
[],
|
||||
lambda: len(self.wheel_timer),
|
||||
registry=hs.metrics_collector_registry,
|
||||
)
|
||||
|
||||
# Used to handle sending of presence to newly joined users/servers
|
||||
|
||||
@@ -61,8 +61,6 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
METRICS_PREFIX = "/_synapse/metrics"
|
||||
|
||||
all_gauges: Dict[str, Collector] = {}
|
||||
|
||||
HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
|
||||
|
||||
|
||||
@@ -99,17 +97,8 @@ class LaterGauge(Collector):
|
||||
yield g
|
||||
|
||||
def __attrs_post_init__(self) -> None:
|
||||
self._register()
|
||||
|
||||
def _register(self) -> None:
|
||||
if self.name in all_gauges.keys():
|
||||
logger.warning("%s already registered, reregistering" % (self.name,))
|
||||
if self.registry is not None:
|
||||
self.registry.unregister(all_gauges.pop(self.name))
|
||||
|
||||
if self.registry is not None:
|
||||
self.registry.register(self)
|
||||
all_gauges[self.name] = self
|
||||
|
||||
|
||||
# `MetricsEntry` only makes sense when it is a `Protocol`,
|
||||
@@ -234,14 +223,8 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
|
||||
yield gauge
|
||||
|
||||
def _register_with_collector(self) -> None:
|
||||
if self.name in all_gauges.keys():
|
||||
logger.warning("%s already registered, reregistering" % (self.name,))
|
||||
if self.registry is not None:
|
||||
self.registry.unregister(all_gauges.pop(self.name))
|
||||
|
||||
if self.registry is not None:
|
||||
self.registry.register(self)
|
||||
all_gauges[self.name] = self
|
||||
|
||||
|
||||
class GaugeBucketCollector(Collector):
|
||||
|
||||
+13
-2
@@ -267,16 +267,27 @@ class Notifier:
|
||||
|
||||
return sum(stream.count_listeners() for stream in all_user_streams)
|
||||
|
||||
LaterGauge("synapse_notifier_listeners", "", [], count_listeners)
|
||||
LaterGauge(
|
||||
"synapse_notifier_listeners",
|
||||
"",
|
||||
[],
|
||||
count_listeners,
|
||||
registry=hs.metrics_collector_registry,
|
||||
)
|
||||
|
||||
LaterGauge(
|
||||
"synapse_notifier_rooms",
|
||||
"",
|
||||
[],
|
||||
lambda: count(bool, list(self.room_to_user_streams.values())),
|
||||
registry=hs.metrics_collector_registry,
|
||||
)
|
||||
LaterGauge(
|
||||
"synapse_notifier_users", "", [], lambda: len(self.user_to_user_stream)
|
||||
"synapse_notifier_users",
|
||||
"",
|
||||
[],
|
||||
lambda: len(self.user_to_user_stream),
|
||||
registry=hs.metrics_collector_registry,
|
||||
)
|
||||
|
||||
def add_replication_callback(self, cb: Callable[[], None]) -> None:
|
||||
|
||||
@@ -215,6 +215,7 @@ class ReplicationCommandHandler:
|
||||
"",
|
||||
[],
|
||||
lambda: len(self._connections),
|
||||
registry=hs.metrics_collector_registry,
|
||||
)
|
||||
|
||||
# When POSITION or RDATA commands arrive, we stick them in a queue and process
|
||||
@@ -241,6 +242,7 @@ class ReplicationCommandHandler:
|
||||
(stream_name,): len(queue)
|
||||
for stream_name, queue in self._command_queues_by_stream.items()
|
||||
},
|
||||
registry=hs.metrics_collector_registry,
|
||||
)
|
||||
|
||||
self._is_master = hs.config.worker.worker_app is None
|
||||
|
||||
@@ -572,6 +572,7 @@ class DatabasePool:
|
||||
"Background update status",
|
||||
[],
|
||||
self.updates.get_status,
|
||||
registry=hs.metrics_collector_registry,
|
||||
)
|
||||
|
||||
self._previous_txn_total_time = 0.0
|
||||
|
||||
@@ -121,6 +121,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
|
||||
"",
|
||||
[],
|
||||
lambda: self._known_servers_count,
|
||||
registry=hs.metrics_collector_registry,
|
||||
)
|
||||
|
||||
@wrap_as_background_process("_count_known_servers")
|
||||
|
||||
@@ -132,6 +132,7 @@ class TaskScheduler:
|
||||
"The number of concurrent running tasks handled by the TaskScheduler",
|
||||
labels=None,
|
||||
caller=lambda: len(self._running_tasks),
|
||||
registry=hs.metrics_collector_registry,
|
||||
)
|
||||
|
||||
def register_action(
|
||||
|
||||
Reference in New Issue
Block a user