
Compare commits


15 Commits

Author SHA1 Message Date
Devon Hudson
c9f212ab44 Revert "Fix LaterGauge metrics to collect from all servers (#18751)"
This reverts commit 076db0ab49.
2025-08-06 15:49:26 -06:00
Devon Hudson
85e3adba86 Revert "Temporarily disable problem test"
This reverts commit 4333eff1d5.
2025-08-06 15:49:03 -06:00
Devon Hudson
d3bdf8b091 Revert "Temporarily disable all tests that call generate_latest"
This reverts commit d8ab5434d5.
2025-08-06 15:48:36 -06:00
Devon Hudson
d8ab5434d5 Temporarily disable all tests that call generate_latest 2025-08-06 15:21:42 -06:00
Devon Hudson
4333eff1d5 Temporarily disable problem test 2025-08-06 14:39:35 -06:00
Devon Hudson
c9f04f3484 Re-enable parallel 2025-08-06 14:39:18 -06:00
Olivier 'reivilibre'
a387d6ecf8 better monitor 2025-08-06 18:03:31 +01:00
Olivier 'reivilibre'
9e473d9e38 fail slow 2025-08-06 18:02:30 +01:00
Olivier 'reivilibre'
d2ea7e32f5 Revert "choose a test"
This reverts commit a256423553.
2025-08-06 18:02:11 +01:00
Olivier 'reivilibre'
2db0f1e49b don't wait for lint 2025-08-06 17:58:14 +01:00
Olivier 'reivilibre'
a256423553 choose a test 2025-08-06 17:56:54 +01:00
Olivier 'reivilibre'
e91aa4fd2f debug diskspace 2025-08-06 17:12:49 +01:00
Olivier 'reivilibre'
499c1631de no parallel? 2025-08-06 16:29:03 +01:00
Olivier 'reivilibre'
4ecd9aba95 Newsfile
Signed-off-by: Olivier 'reivilibre' <oliverw@matrix.org>
2025-08-06 16:07:32 +01:00
Olivier 'reivilibre'
4e947d05ab Remove stray @DEBUG 2025-08-06 16:07:07 +01:00
17 changed files with 163 additions and 245 deletions

View File

@@ -373,7 +373,7 @@ jobs:
   calculate-test-jobs:
     if: ${{ !cancelled() && !failure() }} # Allow previous steps to be skipped, but not fail
-    needs: linting-done
+    # needs: linting-done
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
@@ -393,6 +393,7 @@ jobs:
       - changes
     runs-on: ubuntu-latest
     strategy:
+      fail-fast: false
       matrix:
         job: ${{ fromJson(needs.calculate-test-jobs.outputs.trial_test_matrix) }}
@@ -426,7 +427,24 @@ jobs:
         if: ${{ matrix.job.postgres-version }}
         timeout-minutes: 2
         run: until pg_isready -h localhost; do sleep 1; done
-      - run: poetry run trial --jobs=6 tests
+      - run: |
+          (
+            while true; do
+              echo "......."
+              date
+              df -h | grep root
+              free -m
+              sleep 10
+            done
+          ) &
+          MONITOR_PID=$!
+          poetry run trial --jobs=6 tests
+          STATUS=$?
+          kill $MONITOR_PID
+          exit $STATUS
        env:
          SYNAPSE_POSTGRES: ${{ matrix.job.database == 'postgres' || '' }}
          SYNAPSE_POSTGRES_HOST: /var/run/postgresql
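The notable part of this workflow change is the resource monitor that the "debug diskspace" / "better monitor" commits wrap around the test run: a backgrounded shell loop prints a timestamp, root-disk usage, and free memory every ten seconds while trial runs, and the step then exits with trial's own status. A rough Python equivalent, for illustration only (the helper names here are ours, not part of the change):

import shutil
import subprocess
import sys
import threading


def monitor_resources(stop: threading.Event, interval: float = 10.0) -> None:
    # Log free disk space and memory until `stop` is set (like the shell loop).
    while not stop.wait(interval):
        usage = shutil.disk_usage("/")
        print(f"disk free: {usage.free // 2**20} MiB", flush=True)
        with open("/proc/meminfo") as f:  # Linux-only, like `free -m`
            for line in f.readlines()[:3]:  # MemTotal / MemFree / MemAvailable
                print(line.strip(), flush=True)


def run_with_monitor(cmd: list[str]) -> int:
    stop = threading.Event()
    threading.Thread(target=monitor_resources, args=(stop,), daemon=True).start()
    try:
        return subprocess.call(cmd)  # e.g. ["poetry", "run", "trial", "tests"]
    finally:
        stop.set()  # the `kill $MONITOR_PID` equivalent


if __name__ == "__main__":
    sys.exit(run_with_monitor(sys.argv[1:]))

As in the shell version, the two important details are that the monitor must not outlive the test command, and that the command's exit status, not the monitor's, decides whether the step fails.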

View File

@@ -1 +0,0 @@
-Fix `LaterGauge` metrics to collect from all servers.

changelog.d/18787.misc Normal file
View File

@@ -0,0 +1 @@
+CI debugging.

View File

@@ -37,7 +37,6 @@ Events are replicated via a separate events stream.
 """
 import logging
-from enum import Enum
 from typing import (
     TYPE_CHECKING,
     Dict,
@@ -68,25 +67,6 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)

-class QueueNames(str, Enum):
-    PRESENCE_MAP = "presence_map"
-    KEYED_EDU = "keyed_edu"
-    KEYED_EDU_CHANGED = "keyed_edu_changed"
-    EDUS = "edus"
-    POS_TIME = "pos_time"
-    PRESENCE_DESTINATIONS = "presence_destinations"
-
-
-queue_name_to_gauge_map: Dict[QueueNames, LaterGauge] = {}
-for queue_name in QueueNames:
-    queue_name_to_gauge_map[queue_name] = LaterGauge(
-        name=f"synapse_federation_send_queue_{queue_name.value}_size",
-        desc="",
-        labelnames=[SERVER_NAME_LABEL],
-    )
-
 class FederationRemoteSendQueue(AbstractFederationSender):
     """A drop in replacement for FederationSender"""
@@ -131,15 +111,23 @@ class FederationRemoteSendQueue(AbstractFederationSender):
         # we make a new function, so we need to make a new function so the inner
         # lambda binds to the queue rather than to the name of the queue which
         # changes. ARGH.
-        def register(queue_name: QueueNames, queue: Sized) -> None:
-            queue_name_to_gauge_map[queue_name].register_hook(
-                lambda: {(self.server_name,): len(queue)}
+        def register(name: str, queue: Sized) -> None:
+            LaterGauge(
+                name="synapse_federation_send_queue_%s_size" % (queue_name,),
+                desc="",
+                labelnames=[SERVER_NAME_LABEL],
+                caller=lambda: {(self.server_name,): len(queue)},
             )

-        for queue_name in QueueNames:
-            queue = getattr(self, queue_name.value)
-            assert isinstance(queue, Sized)
-            register(queue_name, queue=queue)
+        for queue_name in [
+            "presence_map",
+            "keyed_edu",
+            "keyed_edu_changed",
+            "edus",
+            "pos_time",
+            "presence_destinations",
+        ]:
+            register(queue_name, getattr(self, queue_name))

         self.clock.looping_call(self._clear_queue, 30 * 1000)
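The restored "ARGH" comment refers to Python's late-binding closures: a lambda defined in a loop captures the loop variable itself, not its value at that iteration, which is why the code routes each gauge callback through the `register(...)` helper. A minimal demonstration of the pitfall (illustrative only, not Synapse code):

# A lambda created in a loop captures the *variable*, not its current value.
queues = {"edus": [1, 2, 3], "pos_time": [1]}

# Broken: every callback sees the final value of the loop variable `q`.
broken = [lambda: len(q) for q in queues.values()]
assert [cb() for cb in broken] == [1, 1]

# Fixed: routing through a helper function binds the current value, which is
# what the inner `register(...)` above achieves for `queue`.
def make_callback(queue):
    return lambda: len(queue)

fixed = [make_callback(q) for q in queues.values()]
assert [cb() for cb in fixed] == [3, 1]

Passing the queue as a function argument (or as a default argument) binds the value at call time, so each callback measures its own queue.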

View File

@@ -199,24 +199,6 @@ sent_pdus_destination_dist_total = Counter(
     labelnames=[SERVER_NAME_LABEL],
 )

-transaction_queue_pending_destinations_gauge = LaterGauge(
-    name="synapse_federation_transaction_queue_pending_destinations",
-    desc="",
-    labelnames=[SERVER_NAME_LABEL],
-)
-
-transaction_queue_pending_pdus_gauge = LaterGauge(
-    name="synapse_federation_transaction_queue_pending_pdus",
-    desc="",
-    labelnames=[SERVER_NAME_LABEL],
-)
-
-transaction_queue_pending_edus_gauge = LaterGauge(
-    name="synapse_federation_transaction_queue_pending_edus",
-    desc="",
-    labelnames=[SERVER_NAME_LABEL],
-)
-
 # Time (in s) to wait before trying to wake up destinations that have
 # catch-up outstanding.
 # Please note that rate limiting still applies, so while the loop is
@@ -416,28 +398,38 @@ class FederationSender(AbstractFederationSender):
         # map from destination to PerDestinationQueue
         self._per_destination_queues: Dict[str, PerDestinationQueue] = {}

-        transaction_queue_pending_destinations_gauge.register_hook(
-            lambda: {
+        LaterGauge(
+            name="synapse_federation_transaction_queue_pending_destinations",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {
                 (self.server_name,): sum(
                     1
                     for d in self._per_destination_queues.values()
                     if d.transmission_loop_running
                 )
-            }
+            },
         )
-        transaction_queue_pending_pdus_gauge.register_hook(
-            lambda: {
+
+        LaterGauge(
+            name="synapse_federation_transaction_queue_pending_pdus",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {
                 (self.server_name,): sum(
                     d.pending_pdu_count() for d in self._per_destination_queues.values()
                 )
-            }
+            },
         )
-        transaction_queue_pending_edus_gauge.register_hook(
-            lambda: {
+
+        LaterGauge(
+            name="synapse_federation_transaction_queue_pending_edus",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {
                 (self.server_name,): sum(
                     d.pending_edu_count() for d in self._per_destination_queues.values()
                 )
-            }
+            },
         )

         self._is_processing = False

View File

@@ -173,18 +173,6 @@ state_transition_counter = Counter(
     labelnames=["locality", "from", "to", SERVER_NAME_LABEL],
 )

-presence_user_to_current_state_size_gauge = LaterGauge(
-    name="synapse_handlers_presence_user_to_current_state_size",
-    desc="",
-    labelnames=[SERVER_NAME_LABEL],
-)
-
-presence_wheel_timer_size_gauge = LaterGauge(
-    name="synapse_handlers_presence_wheel_timer_size",
-    desc="",
-    labelnames=[SERVER_NAME_LABEL],
-)
-
 # If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
 # "currently_active"
 LAST_ACTIVE_GRANULARITY = 60 * 1000
@@ -791,8 +779,11 @@ class PresenceHandler(BasePresenceHandler):
             EduTypes.PRESENCE, self.incoming_presence
         )

-        presence_user_to_current_state_size_gauge.register_hook(
-            lambda: {(self.server_name,): len(self.user_to_current_state)}
+        LaterGauge(
+            name="synapse_handlers_presence_user_to_current_state_size",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {(self.server_name,): len(self.user_to_current_state)},
         )

         # The per-device presence state, maps user to devices to per-device presence state.
@@ -891,8 +882,11 @@ class PresenceHandler(BasePresenceHandler):
             60 * 1000,
         )

-        presence_wheel_timer_size_gauge.register_hook(
-            lambda: {(self.server_name,): len(self.wheel_timer)}
+        LaterGauge(
+            name="synapse_handlers_presence_wheel_timer_size",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {(self.server_name,): len(self.wheel_timer)},
         )

         # Used to handle sending of presence to newly joined users/servers

View File

@@ -164,12 +164,12 @@ def _get_in_flight_counts() -> Mapping[Tuple[str, ...], int]:
     return counts

-in_flight_requests = LaterGauge(
+LaterGauge(
     name="synapse_http_server_in_flight_requests_count",
     desc="",
     labelnames=["method", "servlet", SERVER_NAME_LABEL],
+    caller=_get_in_flight_counts,
 )
-in_flight_requests.register_hook(_get_in_flight_counts)

 class RequestMetrics:

View File

@@ -31,7 +31,6 @@ from typing import (
     Dict,
     Generic,
     Iterable,
-    List,
     Mapping,
     Optional,
     Sequence,
@@ -74,6 +73,8 @@ logger = logging.getLogger(__name__)

 METRICS_PREFIX = "/_synapse/metrics"

+all_gauges: Dict[str, Collector] = {}
+
 HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")

 SERVER_NAME_LABEL = "server_name"
@@ -162,47 +163,42 @@ class LaterGauge(Collector):
     name: str
     desc: str
     labelnames: Optional[StrSequence] = attr.ib(hash=False)
-    # List of callbacks: each callback should either return a value (if there are no
-    # labels for this metric), or dict mapping from a label tuple to a value
-    _hooks: List[
-        Callable[
-            [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
-        ]
-    ] = attr.ib(factory=list, hash=False)
+    # callback: should either return a value (if there are no labels for this metric),
+    # or dict mapping from a label tuple to a value
+    caller: Callable[
+        [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
+    ]

     def collect(self) -> Iterable[Metric]:
         # The decision to add `SERVER_NAME_LABEL` is from the `LaterGauge` usage itself
         # (we don't enforce it here, one level up).
         g = GaugeMetricFamily(self.name, self.desc, labels=self.labelnames)  # type: ignore[missing-server-name-label]

-        for hook in self._hooks:
-            try:
-                hook_result = hook()
-            except Exception:
-                logger.exception(
-                    "Exception running callback for LaterGauge(%s)", self.name
-                )
-                yield g
-                return
+        try:
+            calls = self.caller()
+        except Exception:
+            logger.exception("Exception running callback for LaterGauge(%s)", self.name)
+            yield g
+            return

-            if isinstance(hook_result, (int, float)):
-                g.add_metric([], hook_result)
-            else:
-                for k, v in hook_result.items():
-                    g.add_metric(k, v)
+        if isinstance(calls, (int, float)):
+            g.add_metric([], calls)
+        else:
+            for k, v in calls.items():
+                g.add_metric(k, v)

         yield g

-    def register_hook(
-        self,
-        hook: Callable[
-            [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
-        ],
-    ) -> None:
-        self._hooks.append(hook)
-
     def __attrs_post_init__(self) -> None:
+        self._register()
+
+    def _register(self) -> None:
+        if self.name in all_gauges.keys():
+            logger.warning("%s already registered, reregistering", self.name)
+            REGISTRY.unregister(all_gauges.pop(self.name))
+
         REGISTRY.register(self)
+        all_gauges[self.name] = self

 # `MetricsEntry` only makes sense when it is a `Protocol`,
@@ -254,7 +250,7 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
         # Protects access to _registrations
         self._lock = threading.Lock()

-        REGISTRY.register(self)
+        self._register_with_collector()

     def register(
         self,
@@ -345,6 +341,14 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
                 gauge.add_metric(labels=key, value=getattr(metrics, name))
             yield gauge

+    def _register_with_collector(self) -> None:
+        if self.name in all_gauges.keys():
+            logger.warning("%s already registered, reregistering", self.name)
+            REGISTRY.unregister(all_gauges.pop(self.name))
+
+        REGISTRY.register(self)
+        all_gauges[self.name] = self
+

 class GaugeHistogramMetricFamilyWithLabels(GaugeHistogramMetricFamily):
     """

View File

@@ -86,24 +86,6 @@ users_woken_by_stream_counter = Counter(
     labelnames=["stream", SERVER_NAME_LABEL],
 )

-notifier_listeners_gauge = LaterGauge(
-    name="synapse_notifier_listeners",
-    desc="",
-    labelnames=[SERVER_NAME_LABEL],
-)
-
-notifier_rooms_gauge = LaterGauge(
-    name="synapse_notifier_rooms",
-    desc="",
-    labelnames=[SERVER_NAME_LABEL],
-)
-
-notifier_users_gauge = LaterGauge(
-    name="synapse_notifier_users",
-    desc="",
-    labelnames=[SERVER_NAME_LABEL],
-)
-
 T = TypeVar("T")
@@ -299,16 +281,28 @@ class Notifier:
                 )
             }

-        notifier_listeners_gauge.register_hook(count_listeners)
-        notifier_rooms_gauge.register_hook(
-            lambda: {
+        LaterGauge(
+            name="synapse_notifier_listeners",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=count_listeners,
+        )
+
+        LaterGauge(
+            name="synapse_notifier_rooms",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {
                 (self.server_name,): count(
                     bool, list(self.room_to_user_streams.values())
                 )
-            }
+            },
         )
-        notifier_users_gauge.register_hook(
-            lambda: {(self.server_name,): len(self.user_to_user_stream)}
+
+        LaterGauge(
+            name="synapse_notifier_users",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {(self.server_name,): len(self.user_to_user_stream)},
         )

     def add_replication_callback(self, cb: Callable[[], None]) -> None:

View File

@@ -106,18 +106,6 @@ user_ip_cache_counter = Counter(
     "synapse_replication_tcp_resource_user_ip_cache", "", labelnames=[SERVER_NAME_LABEL]
 )

-tcp_resource_total_connections_gauge = LaterGauge(
-    name="synapse_replication_tcp_resource_total_connections",
-    desc="",
-    labelnames=[SERVER_NAME_LABEL],
-)
-
-tcp_command_queue_gauge = LaterGauge(
-    name="synapse_replication_tcp_command_queue",
-    desc="Number of inbound RDATA/POSITION commands queued for processing",
-    labelnames=["stream_name", SERVER_NAME_LABEL],
-)
-
 # the type of the entries in _command_queues_by_stream
 _StreamCommandQueue = Deque[
@@ -255,8 +243,11 @@ class ReplicationCommandHandler:
         # outgoing replication commands to.)
         self._connections: List[IReplicationConnection] = []

-        tcp_resource_total_connections_gauge.register_hook(
-            lambda: {(self.server_name,): len(self._connections)}
+        LaterGauge(
+            name="synapse_replication_tcp_resource_total_connections",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {(self.server_name,): len(self._connections)},
         )

         # When POSITION or RDATA commands arrive, we stick them in a queue and process
@@ -275,11 +266,14 @@ class ReplicationCommandHandler:
         # from that connection.
         self._streams_by_connection: Dict[IReplicationConnection, Set[str]] = {}

-        tcp_command_queue_gauge.register_hook(
-            lambda: {
+        LaterGauge(
+            name="synapse_replication_tcp_command_queue",
+            desc="Number of inbound RDATA/POSITION commands queued for processing",
+            labelnames=["stream_name", SERVER_NAME_LABEL],
+            caller=lambda: {
                 (stream_name, self.server_name): len(queue)
                 for stream_name, queue in self._command_queues_by_stream.items()
-            }
+            },
         )

         self._is_master = hs.config.worker.worker_app is None
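One detail worth calling out in the restored `synapse_replication_tcp_command_queue` gauge: when a `LaterGauge` declares several label names, its `caller` must return a mapping keyed by tuples in the same order as `labelnames`, and each distinct key becomes its own labelled sample. A toy example of such a caller (ours, not Synapse code):

# A gauge declared with labelnames=["stream_name", "server_name"] needs keys
# shaped (stream_name, server_name); each tuple yields a separate series.
command_queues = {"events": ["RDATA", "RDATA"], "typing": []}
server_name = "hs1"

def queue_sizes():
    return {
        (stream_name, server_name): len(queue)
        for stream_name, queue in command_queues.items()
    }

assert queue_sizes() == {("events", "hs1"): 2, ("typing", "hs1"): 0}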

View File

@@ -527,11 +527,9 @@ pending_commands = LaterGauge(
     name="synapse_replication_tcp_protocol_pending_commands",
     desc="",
     labelnames=["name", SERVER_NAME_LABEL],
-)
-pending_commands.register_hook(
-    lambda: {
+    caller=lambda: {
         (p.name, p.server_name): len(p.pending_commands) for p in connected_connections
-    }
+    },
 )
@@ -546,11 +544,9 @@ transport_send_buffer = LaterGauge(
     name="synapse_replication_tcp_protocol_transport_send_buffer",
     desc="",
     labelnames=["name", SERVER_NAME_LABEL],
-)
-transport_send_buffer.register_hook(
-    lambda: {
+    caller=lambda: {
         (p.name, p.server_name): transport_buffer_size(p) for p in connected_connections
-    }
+    },
 )
@@ -575,12 +571,10 @@ tcp_transport_kernel_send_buffer = LaterGauge(
     name="synapse_replication_tcp_protocol_transport_kernel_send_buffer",
     desc="",
     labelnames=["name", SERVER_NAME_LABEL],
-)
-tcp_transport_kernel_send_buffer.register_hook(
-    lambda: {
+    caller=lambda: {
         (p.name, p.server_name): transport_kernel_read_buffer_size(p, False)
         for p in connected_connections
-    }
+    },
 )
@@ -588,10 +582,8 @@ tcp_transport_kernel_read_buffer = LaterGauge(
     name="synapse_replication_tcp_protocol_transport_kernel_read_buffer",
     desc="",
     labelnames=["name", SERVER_NAME_LABEL],
-)
-tcp_transport_kernel_read_buffer.register_hook(
-    lambda: {
+    caller=lambda: {
         (p.name, p.server_name): transport_kernel_read_buffer_size(p, True)
         for p in connected_connections
-    }
+    },
 )

View File

@@ -100,12 +100,6 @@ sql_txn_duration = Counter(
     labelnames=["desc", SERVER_NAME_LABEL],
 )

-background_update_status = LaterGauge(
-    name="synapse_background_update_status",
-    desc="Background update status",
-    labelnames=[SERVER_NAME_LABEL],
-)
-
 # Unique indexes which have been added in background updates. Maps from table name
 # to the name of the background update which added the unique index to that table.
@@ -617,8 +611,11 @@ class DatabasePool:
         )

         self.updates = BackgroundUpdater(hs, self)
-        background_update_status.register_hook(
-            lambda: {(self.server_name,): self.updates.get_status()},
+        LaterGauge(
+            name="synapse_background_update_status",
+            desc="Background update status",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {(self.server_name,): self.updates.get_status()},
         )

         self._previous_txn_total_time = 0.0

View File

@@ -84,13 +84,6 @@ _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME = "current_state_events_membership"
 _POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE = 1000

-federation_known_servers_gauge = LaterGauge(
-    name="synapse_federation_known_servers",
-    desc="",
-    labelnames=[SERVER_NAME_LABEL],
-)
-
 @attr.s(frozen=True, slots=True, auto_attribs=True)
 class EventIdMembership:
     """Returned by `get_membership_from_event_ids`"""
@@ -123,8 +116,11 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
             1,
             self._count_known_servers,
         )
-        federation_known_servers_gauge.register_hook(
-            lambda: {(self.server_name,): self._known_servers_count}
+        LaterGauge(
+            name="synapse_federation_known_servers",
+            desc="",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {(self.server_name,): self._known_servers_count},
         )

     @wrap_as_background_process("_count_known_servers")

View File

@@ -131,31 +131,27 @@ def _get_counts_from_rate_limiter_instance(
 # We track the number of affected hosts per time-period so we can
 # differentiate one really noisy homeserver from a general
 # ratelimit tuning problem across the federation.
-sleep_affected_hosts_gauge = LaterGauge(
+LaterGauge(
     name="synapse_rate_limit_sleep_affected_hosts",
     desc="Number of hosts that had requests put to sleep",
     labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
-)
-sleep_affected_hosts_gauge.register_hook(
-    lambda: _get_counts_from_rate_limiter_instance(
+    caller=lambda: _get_counts_from_rate_limiter_instance(
         lambda rate_limiter_instance: sum(
             ratelimiter.should_sleep()
             for ratelimiter in rate_limiter_instance.ratelimiters.values()
         )
-    )
+    ),
 )

-reject_affected_hosts_gauge = LaterGauge(
+LaterGauge(
     name="synapse_rate_limit_reject_affected_hosts",
     desc="Number of hosts that had requests rejected",
     labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
-)
-reject_affected_hosts_gauge.register_hook(
-    lambda: _get_counts_from_rate_limiter_instance(
+    caller=lambda: _get_counts_from_rate_limiter_instance(
         lambda rate_limiter_instance: sum(
             ratelimiter.should_reject()
             for ratelimiter in rate_limiter_instance.ratelimiters.values()
         )
-    )
+    ),
 )

View File

@@ -44,13 +44,6 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)

-running_tasks_gauge = LaterGauge(
-    name="synapse_scheduler_running_tasks",
-    desc="The number of concurrent running tasks handled by the TaskScheduler",
-    labelnames=[SERVER_NAME_LABEL],
-)
-
 class TaskScheduler:
     """
     This is a simple task scheduler designed for resumable tasks. Normally,
@@ -137,8 +130,11 @@ class TaskScheduler:
             TaskScheduler.SCHEDULE_INTERVAL_MS,
         )

-        running_tasks_gauge.register_hook(
-            lambda: {(self.server_name,): len(self._running_tasks)}
+        LaterGauge(
+            name="synapse_scheduler_running_tasks",
+            desc="The number of concurrent running tasks handled by the TaskScheduler",
+            labelnames=[SERVER_NAME_LABEL],
+            caller=lambda: {(self.server_name,): len(self._running_tasks)},
         )

     def register_action(

View File

@@ -22,7 +22,7 @@
 from synapse.api.constants import EduTypes

 from tests import unittest
-from tests.unittest import DEBUG, override_config
+from tests.unittest import override_config

 class RoomDirectoryFederationTests(unittest.FederatingHomeserverTestCase):
@@ -48,7 +48,6 @@ class RoomDirectoryFederationTests(unittest.FederatingHomeserverTestCase):
         )
         self.assertEqual(200, channel.code)

-    @DEBUG
     def test_edu_debugging_doesnt_explode(self) -> None:
         """Sanity check incoming federation succeeds with `synapse.debug_8631` enabled.

View File

@@ -22,13 +22,7 @@ from typing import Dict, Protocol, Tuple
 from prometheus_client.core import Sample

-from synapse.metrics import (
-    REGISTRY,
-    SERVER_NAME_LABEL,
-    InFlightGauge,
-    LaterGauge,
-    generate_latest,
-)
+from synapse.metrics import REGISTRY, InFlightGauge, generate_latest
 from synapse.util.caches.deferred_cache import DeferredCache

 from tests import unittest
@@ -291,42 +285,6 @@ class CacheMetricsTests(unittest.HomeserverTestCase):
         self.assertEqual(hs2_cache_max_size_metric_value, "777.0")

-class LaterGaugeTests(unittest.HomeserverTestCase):
-    def test_later_gauge_multiple_servers(self) -> None:
-        """
-        Test that LaterGauge metrics are reported correctly across multiple servers. We
-        will have an metrics entry for each homeserver that is labeled with the
-        `server_name` label.
-        """
-        later_gauge = LaterGauge(
-            name="foo",
-            desc="",
-            labelnames=[SERVER_NAME_LABEL],
-        )
-        later_gauge.register_hook(lambda: {("hs1",): 1})
-        later_gauge.register_hook(lambda: {("hs2",): 2})
-
-        metrics_map = get_latest_metrics()
-
-        # Find the metrics for the caches from both homeservers
-        hs1_metric = 'foo{server_name="hs1"}'
-        hs1_metric_value = metrics_map.get(hs1_metric)
-        self.assertIsNotNone(
-            hs1_metric_value,
-            f"Missing metric {hs1_metric} in cache metrics {metrics_map}",
-        )
-        hs2_metric = 'foo{server_name="hs2"}'
-        hs2_metric_value = metrics_map.get(hs2_metric)
-        self.assertIsNotNone(
-            hs2_metric_value,
-            f"Missing metric {hs2_metric} in cache metrics {metrics_map}",
-        )
-
-        # Sanity check the metric values
-        self.assertEqual(hs1_metric_value, "1.0")
-        self.assertEqual(hs2_metric_value, "2.0")
-
 def get_latest_metrics() -> Dict[str, str]:
     """
     Collect the latest metrics from the registry and parse them into an easy to use map.