Compare commits
15 Commits
anoa/modul
...
rei/ci_deb
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c9f212ab44 | ||
|
|
85e3adba86 | ||
|
|
d3bdf8b091 | ||
|
|
d8ab5434d5 | ||
|
|
4333eff1d5 | ||
|
|
c9f04f3484 | ||
|
|
a387d6ecf8 | ||
|
|
9e473d9e38 | ||
|
|
d2ea7e32f5 | ||
|
|
2db0f1e49b | ||
|
|
a256423553 | ||
|
|
e91aa4fd2f | ||
|
|
499c1631de | ||
|
|
4ecd9aba95 | ||
|
|
4e947d05ab |
22
.github/workflows/tests.yml
vendored
22
.github/workflows/tests.yml
vendored
@@ -373,7 +373,7 @@ jobs:
|
|||||||
|
|
||||||
calculate-test-jobs:
|
calculate-test-jobs:
|
||||||
if: ${{ !cancelled() && !failure() }} # Allow previous steps to be skipped, but not fail
|
if: ${{ !cancelled() && !failure() }} # Allow previous steps to be skipped, but not fail
|
||||||
needs: linting-done
|
# needs: linting-done
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
|
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
|
||||||
@@ -393,6 +393,7 @@ jobs:
|
|||||||
- changes
|
- changes
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
strategy:
|
strategy:
|
||||||
|
fail-fast: false
|
||||||
matrix:
|
matrix:
|
||||||
job: ${{ fromJson(needs.calculate-test-jobs.outputs.trial_test_matrix) }}
|
job: ${{ fromJson(needs.calculate-test-jobs.outputs.trial_test_matrix) }}
|
||||||
|
|
||||||
@@ -426,7 +427,24 @@ jobs:
|
|||||||
if: ${{ matrix.job.postgres-version }}
|
if: ${{ matrix.job.postgres-version }}
|
||||||
timeout-minutes: 2
|
timeout-minutes: 2
|
||||||
run: until pg_isready -h localhost; do sleep 1; done
|
run: until pg_isready -h localhost; do sleep 1; done
|
||||||
- run: poetry run trial --jobs=6 tests
|
- run: |
|
||||||
|
(
|
||||||
|
while true; do
|
||||||
|
echo "......."
|
||||||
|
date
|
||||||
|
df -h | grep root
|
||||||
|
free -m
|
||||||
|
sleep 10
|
||||||
|
done
|
||||||
|
) &
|
||||||
|
MONITOR_PID=$!
|
||||||
|
|
||||||
|
poetry run trial --jobs=6 tests
|
||||||
|
STATUS=$?
|
||||||
|
|
||||||
|
kill $MONITOR_PID
|
||||||
|
exit $STATUS
|
||||||
|
|
||||||
env:
|
env:
|
||||||
SYNAPSE_POSTGRES: ${{ matrix.job.database == 'postgres' || '' }}
|
SYNAPSE_POSTGRES: ${{ matrix.job.database == 'postgres' || '' }}
|
||||||
SYNAPSE_POSTGRES_HOST: /var/run/postgresql
|
SYNAPSE_POSTGRES_HOST: /var/run/postgresql
|
||||||
|
|||||||
@@ -1 +0,0 @@
|
|||||||
Fix `LaterGauge` metrics to collect from all servers.
|
|
||||||
1
changelog.d/18787.misc
Normal file
1
changelog.d/18787.misc
Normal file
@@ -0,0 +1 @@
|
|||||||
|
CI debugging.
|
||||||
@@ -37,7 +37,6 @@ Events are replicated via a separate events stream.
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
from enum import Enum
|
|
||||||
from typing import (
|
from typing import (
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
Dict,
|
Dict,
|
||||||
@@ -68,25 +67,6 @@ if TYPE_CHECKING:
|
|||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class QueueNames(str, Enum):
|
|
||||||
PRESENCE_MAP = "presence_map"
|
|
||||||
KEYED_EDU = "keyed_edu"
|
|
||||||
KEYED_EDU_CHANGED = "keyed_edu_changed"
|
|
||||||
EDUS = "edus"
|
|
||||||
POS_TIME = "pos_time"
|
|
||||||
PRESENCE_DESTINATIONS = "presence_destinations"
|
|
||||||
|
|
||||||
|
|
||||||
queue_name_to_gauge_map: Dict[QueueNames, LaterGauge] = {}
|
|
||||||
|
|
||||||
for queue_name in QueueNames:
|
|
||||||
queue_name_to_gauge_map[queue_name] = LaterGauge(
|
|
||||||
name=f"synapse_federation_send_queue_{queue_name.value}_size",
|
|
||||||
desc="",
|
|
||||||
labelnames=[SERVER_NAME_LABEL],
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class FederationRemoteSendQueue(AbstractFederationSender):
|
class FederationRemoteSendQueue(AbstractFederationSender):
|
||||||
"""A drop in replacement for FederationSender"""
|
"""A drop in replacement for FederationSender"""
|
||||||
|
|
||||||
@@ -131,15 +111,23 @@ class FederationRemoteSendQueue(AbstractFederationSender):
|
|||||||
# we make a new function, so we need to make a new function so the inner
|
# we make a new function, so we need to make a new function so the inner
|
||||||
# lambda binds to the queue rather than to the name of the queue which
|
# lambda binds to the queue rather than to the name of the queue which
|
||||||
# changes. ARGH.
|
# changes. ARGH.
|
||||||
def register(queue_name: QueueNames, queue: Sized) -> None:
|
def register(name: str, queue: Sized) -> None:
|
||||||
queue_name_to_gauge_map[queue_name].register_hook(
|
LaterGauge(
|
||||||
lambda: {(self.server_name,): len(queue)}
|
name="synapse_federation_send_queue_%s_size" % (queue_name,),
|
||||||
|
desc="",
|
||||||
|
labelnames=[SERVER_NAME_LABEL],
|
||||||
|
caller=lambda: {(self.server_name,): len(queue)},
|
||||||
)
|
)
|
||||||
|
|
||||||
for queue_name in QueueNames:
|
for queue_name in [
|
||||||
queue = getattr(self, queue_name.value)
|
"presence_map",
|
||||||
assert isinstance(queue, Sized)
|
"keyed_edu",
|
||||||
register(queue_name, queue=queue)
|
"keyed_edu_changed",
|
||||||
|
"edus",
|
||||||
|
"pos_time",
|
||||||
|
"presence_destinations",
|
||||||
|
]:
|
||||||
|
register(queue_name, getattr(self, queue_name))
|
||||||
|
|
||||||
self.clock.looping_call(self._clear_queue, 30 * 1000)
|
self.clock.looping_call(self._clear_queue, 30 * 1000)
|
||||||
|
|
||||||
|
|||||||
@@ -199,24 +199,6 @@ sent_pdus_destination_dist_total = Counter(
|
|||||||
labelnames=[SERVER_NAME_LABEL],
|
labelnames=[SERVER_NAME_LABEL],
|
||||||
)
|
)
|
||||||
|
|
||||||
transaction_queue_pending_destinations_gauge = LaterGauge(
|
|
||||||
name="synapse_federation_transaction_queue_pending_destinations",
|
|
||||||
desc="",
|
|
||||||
labelnames=[SERVER_NAME_LABEL],
|
|
||||||
)
|
|
||||||
|
|
||||||
transaction_queue_pending_pdus_gauge = LaterGauge(
|
|
||||||
name="synapse_federation_transaction_queue_pending_pdus",
|
|
||||||
desc="",
|
|
||||||
labelnames=[SERVER_NAME_LABEL],
|
|
||||||
)
|
|
||||||
|
|
||||||
transaction_queue_pending_edus_gauge = LaterGauge(
|
|
||||||
name="synapse_federation_transaction_queue_pending_edus",
|
|
||||||
desc="",
|
|
||||||
labelnames=[SERVER_NAME_LABEL],
|
|
||||||
)
|
|
||||||
|
|
||||||
# Time (in s) to wait before trying to wake up destinations that have
|
# Time (in s) to wait before trying to wake up destinations that have
|
||||||
# catch-up outstanding.
|
# catch-up outstanding.
|
||||||
# Please note that rate limiting still applies, so while the loop is
|
# Please note that rate limiting still applies, so while the loop is
|
||||||
@@ -416,28 +398,38 @@ class FederationSender(AbstractFederationSender):
|
|||||||
# map from destination to PerDestinationQueue
|
# map from destination to PerDestinationQueue
|
||||||
self._per_destination_queues: Dict[str, PerDestinationQueue] = {}
|
self._per_destination_queues: Dict[str, PerDestinationQueue] = {}
|
||||||
|
|
||||||
transaction_queue_pending_destinations_gauge.register_hook(
|
LaterGauge(
|
||||||
lambda: {
|
name="synapse_federation_transaction_queue_pending_destinations",
|
||||||
|
desc="",
|
||||||
|
labelnames=[SERVER_NAME_LABEL],
|
||||||
|
caller=lambda: {
|
||||||
(self.server_name,): sum(
|
(self.server_name,): sum(
|
||||||
1
|
1
|
||||||
for d in self._per_destination_queues.values()
|
for d in self._per_destination_queues.values()
|
||||||
if d.transmission_loop_running
|
if d.transmission_loop_running
|
||||||
)
|
)
|
||||||
}
|
},
|
||||||
)
|
)
|
||||||
transaction_queue_pending_pdus_gauge.register_hook(
|
|
||||||
lambda: {
|
LaterGauge(
|
||||||
|
name="synapse_federation_transaction_queue_pending_pdus",
|
||||||
|
desc="",
|
||||||
|
labelnames=[SERVER_NAME_LABEL],
|
||||||
|
caller=lambda: {
|
||||||
(self.server_name,): sum(
|
(self.server_name,): sum(
|
||||||
d.pending_pdu_count() for d in self._per_destination_queues.values()
|
d.pending_pdu_count() for d in self._per_destination_queues.values()
|
||||||
)
|
)
|
||||||
}
|
},
|
||||||
)
|
)
|
||||||
transaction_queue_pending_edus_gauge.register_hook(
|
LaterGauge(
|
||||||
lambda: {
|
name="synapse_federation_transaction_queue_pending_edus",
|
||||||
|
desc="",
|
||||||
|
labelnames=[SERVER_NAME_LABEL],
|
||||||
|
caller=lambda: {
|
||||||
(self.server_name,): sum(
|
(self.server_name,): sum(
|
||||||
d.pending_edu_count() for d in self._per_destination_queues.values()
|
d.pending_edu_count() for d in self._per_destination_queues.values()
|
||||||
)
|
)
|
||||||
}
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
self._is_processing = False
|
self._is_processing = False
|
||||||
|
|||||||
@@ -173,18 +173,6 @@ state_transition_counter = Counter(
|
|||||||
labelnames=["locality", "from", "to", SERVER_NAME_LABEL],
|
labelnames=["locality", "from", "to", SERVER_NAME_LABEL],
|
||||||
)
|
)
|
||||||
|
|
||||||
presence_user_to_current_state_size_gauge = LaterGauge(
|
|
||||||
name="synapse_handlers_presence_user_to_current_state_size",
|
|
||||||
desc="",
|
|
||||||
labelnames=[SERVER_NAME_LABEL],
|
|
||||||
)
|
|
||||||
|
|
||||||
presence_wheel_timer_size_gauge = LaterGauge(
|
|
||||||
name="synapse_handlers_presence_wheel_timer_size",
|
|
||||||
desc="",
|
|
||||||
labelnames=[SERVER_NAME_LABEL],
|
|
||||||
)
|
|
||||||
|
|
||||||
# If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
|
# If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
|
||||||
# "currently_active"
|
# "currently_active"
|
||||||
LAST_ACTIVE_GRANULARITY = 60 * 1000
|
LAST_ACTIVE_GRANULARITY = 60 * 1000
|
||||||
@@ -791,8 +779,11 @@ class PresenceHandler(BasePresenceHandler):
|
|||||||
EduTypes.PRESENCE, self.incoming_presence
|
EduTypes.PRESENCE, self.incoming_presence
|
||||||
)
|
)
|
||||||
|
|
||||||
presence_user_to_current_state_size_gauge.register_hook(
|
LaterGauge(
|
||||||
lambda: {(self.server_name,): len(self.user_to_current_state)}
|
name="synapse_handlers_presence_user_to_current_state_size",
|
||||||
|
desc="",
|
||||||
|
labelnames=[SERVER_NAME_LABEL],
|
||||||
|
caller=lambda: {(self.server_name,): len(self.user_to_current_state)},
|
||||||
)
|
)
|
||||||
|
|
||||||
# The per-device presence state, maps user to devices to per-device presence state.
|
# The per-device presence state, maps user to devices to per-device presence state.
|
||||||
@@ -891,8 +882,11 @@ class PresenceHandler(BasePresenceHandler):
|
|||||||
60 * 1000,
|
60 * 1000,
|
||||||
)
|
)
|
||||||
|
|
||||||
presence_wheel_timer_size_gauge.register_hook(
|
LaterGauge(
|
||||||
lambda: {(self.server_name,): len(self.wheel_timer)}
|
name="synapse_handlers_presence_wheel_timer_size",
|
||||||
|
desc="",
|
||||||
|
labelnames=[SERVER_NAME_LABEL],
|
||||||
|
caller=lambda: {(self.server_name,): len(self.wheel_timer)},
|
||||||
)
|
)
|
||||||
|
|
||||||
# Used to handle sending of presence to newly joined users/servers
|
# Used to handle sending of presence to newly joined users/servers
|
||||||
|
|||||||
@@ -164,12 +164,12 @@ def _get_in_flight_counts() -> Mapping[Tuple[str, ...], int]:
|
|||||||
return counts
|
return counts
|
||||||
|
|
||||||
|
|
||||||
in_flight_requests = LaterGauge(
|
LaterGauge(
|
||||||
name="synapse_http_server_in_flight_requests_count",
|
name="synapse_http_server_in_flight_requests_count",
|
||||||
desc="",
|
desc="",
|
||||||
labelnames=["method", "servlet", SERVER_NAME_LABEL],
|
labelnames=["method", "servlet", SERVER_NAME_LABEL],
|
||||||
|
caller=_get_in_flight_counts,
|
||||||
)
|
)
|
||||||
in_flight_requests.register_hook(_get_in_flight_counts)
|
|
||||||
|
|
||||||
|
|
||||||
class RequestMetrics:
|
class RequestMetrics:
|
||||||
|
|||||||
@@ -31,7 +31,6 @@ from typing import (
|
|||||||
Dict,
|
Dict,
|
||||||
Generic,
|
Generic,
|
||||||
Iterable,
|
Iterable,
|
||||||
List,
|
|
||||||
Mapping,
|
Mapping,
|
||||||
Optional,
|
Optional,
|
||||||
Sequence,
|
Sequence,
|
||||||
@@ -74,6 +73,8 @@ logger = logging.getLogger(__name__)
|
|||||||
|
|
||||||
METRICS_PREFIX = "/_synapse/metrics"
|
METRICS_PREFIX = "/_synapse/metrics"
|
||||||
|
|
||||||
|
all_gauges: Dict[str, Collector] = {}
|
||||||
|
|
||||||
HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
|
HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
|
||||||
|
|
||||||
SERVER_NAME_LABEL = "server_name"
|
SERVER_NAME_LABEL = "server_name"
|
||||||
@@ -162,47 +163,42 @@ class LaterGauge(Collector):
|
|||||||
name: str
|
name: str
|
||||||
desc: str
|
desc: str
|
||||||
labelnames: Optional[StrSequence] = attr.ib(hash=False)
|
labelnames: Optional[StrSequence] = attr.ib(hash=False)
|
||||||
# List of callbacks: each callback should either return a value (if there are no
|
# callback: should either return a value (if there are no labels for this metric),
|
||||||
# labels for this metric), or dict mapping from a label tuple to a value
|
# or dict mapping from a label tuple to a value
|
||||||
_hooks: List[
|
caller: Callable[
|
||||||
Callable[
|
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
|
||||||
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
|
]
|
||||||
]
|
|
||||||
] = attr.ib(factory=list, hash=False)
|
|
||||||
|
|
||||||
def collect(self) -> Iterable[Metric]:
|
def collect(self) -> Iterable[Metric]:
|
||||||
# The decision to add `SERVER_NAME_LABEL` is from the `LaterGauge` usage itself
|
# The decision to add `SERVER_NAME_LABEL` is from the `LaterGauge` usage itself
|
||||||
# (we don't enforce it here, one level up).
|
# (we don't enforce it here, one level up).
|
||||||
g = GaugeMetricFamily(self.name, self.desc, labels=self.labelnames) # type: ignore[missing-server-name-label]
|
g = GaugeMetricFamily(self.name, self.desc, labels=self.labelnames) # type: ignore[missing-server-name-label]
|
||||||
|
|
||||||
for hook in self._hooks:
|
try:
|
||||||
try:
|
calls = self.caller()
|
||||||
hook_result = hook()
|
except Exception:
|
||||||
except Exception:
|
logger.exception("Exception running callback for LaterGauge(%s)", self.name)
|
||||||
logger.exception(
|
|
||||||
"Exception running callback for LaterGauge(%s)", self.name
|
|
||||||
)
|
|
||||||
yield g
|
|
||||||
return
|
|
||||||
|
|
||||||
if isinstance(hook_result, (int, float)):
|
|
||||||
g.add_metric([], hook_result)
|
|
||||||
else:
|
|
||||||
for k, v in hook_result.items():
|
|
||||||
g.add_metric(k, v)
|
|
||||||
|
|
||||||
yield g
|
yield g
|
||||||
|
return
|
||||||
|
|
||||||
def register_hook(
|
if isinstance(calls, (int, float)):
|
||||||
self,
|
g.add_metric([], calls)
|
||||||
hook: Callable[
|
else:
|
||||||
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
|
for k, v in calls.items():
|
||||||
],
|
g.add_metric(k, v)
|
||||||
) -> None:
|
|
||||||
self._hooks.append(hook)
|
yield g
|
||||||
|
|
||||||
def __attrs_post_init__(self) -> None:
|
def __attrs_post_init__(self) -> None:
|
||||||
|
self._register()
|
||||||
|
|
||||||
|
def _register(self) -> None:
|
||||||
|
if self.name in all_gauges.keys():
|
||||||
|
logger.warning("%s already registered, reregistering", self.name)
|
||||||
|
REGISTRY.unregister(all_gauges.pop(self.name))
|
||||||
|
|
||||||
REGISTRY.register(self)
|
REGISTRY.register(self)
|
||||||
|
all_gauges[self.name] = self
|
||||||
|
|
||||||
|
|
||||||
# `MetricsEntry` only makes sense when it is a `Protocol`,
|
# `MetricsEntry` only makes sense when it is a `Protocol`,
|
||||||
@@ -254,7 +250,7 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
|
|||||||
# Protects access to _registrations
|
# Protects access to _registrations
|
||||||
self._lock = threading.Lock()
|
self._lock = threading.Lock()
|
||||||
|
|
||||||
REGISTRY.register(self)
|
self._register_with_collector()
|
||||||
|
|
||||||
def register(
|
def register(
|
||||||
self,
|
self,
|
||||||
@@ -345,6 +341,14 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
|
|||||||
gauge.add_metric(labels=key, value=getattr(metrics, name))
|
gauge.add_metric(labels=key, value=getattr(metrics, name))
|
||||||
yield gauge
|
yield gauge
|
||||||
|
|
||||||
|
def _register_with_collector(self) -> None:
|
||||||
|
if self.name in all_gauges.keys():
|
||||||
|
logger.warning("%s already registered, reregistering", self.name)
|
||||||
|
REGISTRY.unregister(all_gauges.pop(self.name))
|
||||||
|
|
||||||
|
REGISTRY.register(self)
|
||||||
|
all_gauges[self.name] = self
|
||||||
|
|
||||||
|
|
||||||
class GaugeHistogramMetricFamilyWithLabels(GaugeHistogramMetricFamily):
|
class GaugeHistogramMetricFamilyWithLabels(GaugeHistogramMetricFamily):
|
||||||
"""
|
"""
|
||||||
|
|||||||
@@ -86,24 +86,6 @@ users_woken_by_stream_counter = Counter(
|
|||||||
labelnames=["stream", SERVER_NAME_LABEL],
|
labelnames=["stream", SERVER_NAME_LABEL],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
notifier_listeners_gauge = LaterGauge(
|
|
||||||
name="synapse_notifier_listeners",
|
|
||||||
desc="",
|
|
||||||
labelnames=[SERVER_NAME_LABEL],
|
|
||||||
)
|
|
||||||
|
|
||||||
notifier_rooms_gauge = LaterGauge(
|
|
||||||
name="synapse_notifier_rooms",
|
|
||||||
desc="",
|
|
||||||
labelnames=[SERVER_NAME_LABEL],
|
|
||||||
)
|
|
||||||
notifier_users_gauge = LaterGauge(
|
|
||||||
name="synapse_notifier_users",
|
|
||||||
desc="",
|
|
||||||
labelnames=[SERVER_NAME_LABEL],
|
|
||||||
)
|
|
||||||
|
|
||||||
T = TypeVar("T")
|
T = TypeVar("T")
|
||||||
|
|
||||||
|
|
||||||
@@ -299,16 +281,28 @@ class Notifier:
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
notifier_listeners_gauge.register_hook(count_listeners)
|
LaterGauge(
|
||||||
notifier_rooms_gauge.register_hook(
|
name="synapse_notifier_listeners",
|
||||||
lambda: {
|
desc="",
|
||||||
|
labelnames=[SERVER_NAME_LABEL],
|
||||||
|
caller=count_listeners,
|
||||||
|
)
|
||||||
|
|
||||||
|
LaterGauge(
|
||||||
|
name="synapse_notifier_rooms",
|
||||||
|
desc="",
|
||||||
|
labelnames=[SERVER_NAME_LABEL],
|
||||||
|
caller=lambda: {
|
||||||
(self.server_name,): count(
|
(self.server_name,): count(
|
||||||
bool, list(self.room_to_user_streams.values())
|
bool, list(self.room_to_user_streams.values())
|
||||||
)
|
)
|
||||||
}
|
},
|
||||||
)
|
)
|
||||||
notifier_users_gauge.register_hook(
|
LaterGauge(
|
||||||
lambda: {(self.server_name,): len(self.user_to_user_stream)}
|
name="synapse_notifier_users",
|
||||||
|
desc="",
|
||||||
|
labelnames=[SERVER_NAME_LABEL],
|
||||||
|
caller=lambda: {(self.server_name,): len(self.user_to_user_stream)},
|
||||||
)
|
)
|
||||||
|
|
||||||
def add_replication_callback(self, cb: Callable[[], None]) -> None:
|
def add_replication_callback(self, cb: Callable[[], None]) -> None:
|
||||||
|
|||||||
@@ -106,18 +106,6 @@ user_ip_cache_counter = Counter(
|
|||||||
"synapse_replication_tcp_resource_user_ip_cache", "", labelnames=[SERVER_NAME_LABEL]
|
"synapse_replication_tcp_resource_user_ip_cache", "", labelnames=[SERVER_NAME_LABEL]
|
||||||
)
|
)
|
||||||
|
|
||||||
tcp_resource_total_connections_gauge = LaterGauge(
|
|
||||||
name="synapse_replication_tcp_resource_total_connections",
|
|
||||||
desc="",
|
|
||||||
labelnames=[SERVER_NAME_LABEL],
|
|
||||||
)
|
|
||||||
|
|
||||||
tcp_command_queue_gauge = LaterGauge(
|
|
||||||
name="synapse_replication_tcp_command_queue",
|
|
||||||
desc="Number of inbound RDATA/POSITION commands queued for processing",
|
|
||||||
labelnames=["stream_name", SERVER_NAME_LABEL],
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# the type of the entries in _command_queues_by_stream
|
# the type of the entries in _command_queues_by_stream
|
||||||
_StreamCommandQueue = Deque[
|
_StreamCommandQueue = Deque[
|
||||||
@@ -255,8 +243,11 @@ class ReplicationCommandHandler:
|
|||||||
# outgoing replication commands to.)
|
# outgoing replication commands to.)
|
||||||
self._connections: List[IReplicationConnection] = []
|
self._connections: List[IReplicationConnection] = []
|
||||||
|
|
||||||
tcp_resource_total_connections_gauge.register_hook(
|
LaterGauge(
|
||||||
lambda: {(self.server_name,): len(self._connections)}
|
name="synapse_replication_tcp_resource_total_connections",
|
||||||
|
desc="",
|
||||||
|
labelnames=[SERVER_NAME_LABEL],
|
||||||
|
caller=lambda: {(self.server_name,): len(self._connections)},
|
||||||
)
|
)
|
||||||
|
|
||||||
# When POSITION or RDATA commands arrive, we stick them in a queue and process
|
# When POSITION or RDATA commands arrive, we stick them in a queue and process
|
||||||
@@ -275,11 +266,14 @@ class ReplicationCommandHandler:
|
|||||||
# from that connection.
|
# from that connection.
|
||||||
self._streams_by_connection: Dict[IReplicationConnection, Set[str]] = {}
|
self._streams_by_connection: Dict[IReplicationConnection, Set[str]] = {}
|
||||||
|
|
||||||
tcp_command_queue_gauge.register_hook(
|
LaterGauge(
|
||||||
lambda: {
|
name="synapse_replication_tcp_command_queue",
|
||||||
|
desc="Number of inbound RDATA/POSITION commands queued for processing",
|
||||||
|
labelnames=["stream_name", SERVER_NAME_LABEL],
|
||||||
|
caller=lambda: {
|
||||||
(stream_name, self.server_name): len(queue)
|
(stream_name, self.server_name): len(queue)
|
||||||
for stream_name, queue in self._command_queues_by_stream.items()
|
for stream_name, queue in self._command_queues_by_stream.items()
|
||||||
}
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
self._is_master = hs.config.worker.worker_app is None
|
self._is_master = hs.config.worker.worker_app is None
|
||||||
|
|||||||
@@ -527,11 +527,9 @@ pending_commands = LaterGauge(
|
|||||||
name="synapse_replication_tcp_protocol_pending_commands",
|
name="synapse_replication_tcp_protocol_pending_commands",
|
||||||
desc="",
|
desc="",
|
||||||
labelnames=["name", SERVER_NAME_LABEL],
|
labelnames=["name", SERVER_NAME_LABEL],
|
||||||
)
|
caller=lambda: {
|
||||||
pending_commands.register_hook(
|
|
||||||
lambda: {
|
|
||||||
(p.name, p.server_name): len(p.pending_commands) for p in connected_connections
|
(p.name, p.server_name): len(p.pending_commands) for p in connected_connections
|
||||||
}
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@@ -546,11 +544,9 @@ transport_send_buffer = LaterGauge(
|
|||||||
name="synapse_replication_tcp_protocol_transport_send_buffer",
|
name="synapse_replication_tcp_protocol_transport_send_buffer",
|
||||||
desc="",
|
desc="",
|
||||||
labelnames=["name", SERVER_NAME_LABEL],
|
labelnames=["name", SERVER_NAME_LABEL],
|
||||||
)
|
caller=lambda: {
|
||||||
transport_send_buffer.register_hook(
|
|
||||||
lambda: {
|
|
||||||
(p.name, p.server_name): transport_buffer_size(p) for p in connected_connections
|
(p.name, p.server_name): transport_buffer_size(p) for p in connected_connections
|
||||||
}
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@@ -575,12 +571,10 @@ tcp_transport_kernel_send_buffer = LaterGauge(
|
|||||||
name="synapse_replication_tcp_protocol_transport_kernel_send_buffer",
|
name="synapse_replication_tcp_protocol_transport_kernel_send_buffer",
|
||||||
desc="",
|
desc="",
|
||||||
labelnames=["name", SERVER_NAME_LABEL],
|
labelnames=["name", SERVER_NAME_LABEL],
|
||||||
)
|
caller=lambda: {
|
||||||
tcp_transport_kernel_send_buffer.register_hook(
|
|
||||||
lambda: {
|
|
||||||
(p.name, p.server_name): transport_kernel_read_buffer_size(p, False)
|
(p.name, p.server_name): transport_kernel_read_buffer_size(p, False)
|
||||||
for p in connected_connections
|
for p in connected_connections
|
||||||
}
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@@ -588,10 +582,8 @@ tcp_transport_kernel_read_buffer = LaterGauge(
|
|||||||
name="synapse_replication_tcp_protocol_transport_kernel_read_buffer",
|
name="synapse_replication_tcp_protocol_transport_kernel_read_buffer",
|
||||||
desc="",
|
desc="",
|
||||||
labelnames=["name", SERVER_NAME_LABEL],
|
labelnames=["name", SERVER_NAME_LABEL],
|
||||||
)
|
caller=lambda: {
|
||||||
tcp_transport_kernel_read_buffer.register_hook(
|
|
||||||
lambda: {
|
|
||||||
(p.name, p.server_name): transport_kernel_read_buffer_size(p, True)
|
(p.name, p.server_name): transport_kernel_read_buffer_size(p, True)
|
||||||
for p in connected_connections
|
for p in connected_connections
|
||||||
}
|
},
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -100,12 +100,6 @@ sql_txn_duration = Counter(
|
|||||||
labelnames=["desc", SERVER_NAME_LABEL],
|
labelnames=["desc", SERVER_NAME_LABEL],
|
||||||
)
|
)
|
||||||
|
|
||||||
background_update_status = LaterGauge(
|
|
||||||
name="synapse_background_update_status",
|
|
||||||
desc="Background update status",
|
|
||||||
labelnames=[SERVER_NAME_LABEL],
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# Unique indexes which have been added in background updates. Maps from table name
|
# Unique indexes which have been added in background updates. Maps from table name
|
||||||
# to the name of the background update which added the unique index to that table.
|
# to the name of the background update which added the unique index to that table.
|
||||||
@@ -617,8 +611,11 @@ class DatabasePool:
|
|||||||
)
|
)
|
||||||
|
|
||||||
self.updates = BackgroundUpdater(hs, self)
|
self.updates = BackgroundUpdater(hs, self)
|
||||||
background_update_status.register_hook(
|
LaterGauge(
|
||||||
lambda: {(self.server_name,): self.updates.get_status()},
|
name="synapse_background_update_status",
|
||||||
|
desc="Background update status",
|
||||||
|
labelnames=[SERVER_NAME_LABEL],
|
||||||
|
caller=lambda: {(self.server_name,): self.updates.get_status()},
|
||||||
)
|
)
|
||||||
|
|
||||||
self._previous_txn_total_time = 0.0
|
self._previous_txn_total_time = 0.0
|
||||||
|
|||||||
@@ -84,13 +84,6 @@ _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME = "current_state_events_membership"
|
|||||||
_POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE = 1000
|
_POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE = 1000
|
||||||
|
|
||||||
|
|
||||||
federation_known_servers_gauge = LaterGauge(
|
|
||||||
name="synapse_federation_known_servers",
|
|
||||||
desc="",
|
|
||||||
labelnames=[SERVER_NAME_LABEL],
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@attr.s(frozen=True, slots=True, auto_attribs=True)
|
@attr.s(frozen=True, slots=True, auto_attribs=True)
|
||||||
class EventIdMembership:
|
class EventIdMembership:
|
||||||
"""Returned by `get_membership_from_event_ids`"""
|
"""Returned by `get_membership_from_event_ids`"""
|
||||||
@@ -123,8 +116,11 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
|
|||||||
1,
|
1,
|
||||||
self._count_known_servers,
|
self._count_known_servers,
|
||||||
)
|
)
|
||||||
federation_known_servers_gauge.register_hook(
|
LaterGauge(
|
||||||
lambda: {(self.server_name,): self._known_servers_count}
|
name="synapse_federation_known_servers",
|
||||||
|
desc="",
|
||||||
|
labelnames=[SERVER_NAME_LABEL],
|
||||||
|
caller=lambda: {(self.server_name,): self._known_servers_count},
|
||||||
)
|
)
|
||||||
|
|
||||||
@wrap_as_background_process("_count_known_servers")
|
@wrap_as_background_process("_count_known_servers")
|
||||||
|
|||||||
@@ -131,31 +131,27 @@ def _get_counts_from_rate_limiter_instance(
|
|||||||
# We track the number of affected hosts per time-period so we can
|
# We track the number of affected hosts per time-period so we can
|
||||||
# differentiate one really noisy homeserver from a general
|
# differentiate one really noisy homeserver from a general
|
||||||
# ratelimit tuning problem across the federation.
|
# ratelimit tuning problem across the federation.
|
||||||
sleep_affected_hosts_gauge = LaterGauge(
|
LaterGauge(
|
||||||
name="synapse_rate_limit_sleep_affected_hosts",
|
name="synapse_rate_limit_sleep_affected_hosts",
|
||||||
desc="Number of hosts that had requests put to sleep",
|
desc="Number of hosts that had requests put to sleep",
|
||||||
labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
|
labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
|
||||||
)
|
caller=lambda: _get_counts_from_rate_limiter_instance(
|
||||||
sleep_affected_hosts_gauge.register_hook(
|
|
||||||
lambda: _get_counts_from_rate_limiter_instance(
|
|
||||||
lambda rate_limiter_instance: sum(
|
lambda rate_limiter_instance: sum(
|
||||||
ratelimiter.should_sleep()
|
ratelimiter.should_sleep()
|
||||||
for ratelimiter in rate_limiter_instance.ratelimiters.values()
|
for ratelimiter in rate_limiter_instance.ratelimiters.values()
|
||||||
)
|
)
|
||||||
)
|
),
|
||||||
)
|
)
|
||||||
reject_affected_hosts_gauge = LaterGauge(
|
LaterGauge(
|
||||||
name="synapse_rate_limit_reject_affected_hosts",
|
name="synapse_rate_limit_reject_affected_hosts",
|
||||||
desc="Number of hosts that had requests rejected",
|
desc="Number of hosts that had requests rejected",
|
||||||
labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
|
labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
|
||||||
)
|
caller=lambda: _get_counts_from_rate_limiter_instance(
|
||||||
reject_affected_hosts_gauge.register_hook(
|
|
||||||
lambda: _get_counts_from_rate_limiter_instance(
|
|
||||||
lambda rate_limiter_instance: sum(
|
lambda rate_limiter_instance: sum(
|
||||||
ratelimiter.should_reject()
|
ratelimiter.should_reject()
|
||||||
for ratelimiter in rate_limiter_instance.ratelimiters.values()
|
for ratelimiter in rate_limiter_instance.ratelimiters.values()
|
||||||
)
|
)
|
||||||
)
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -44,13 +44,6 @@ if TYPE_CHECKING:
|
|||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
running_tasks_gauge = LaterGauge(
|
|
||||||
name="synapse_scheduler_running_tasks",
|
|
||||||
desc="The number of concurrent running tasks handled by the TaskScheduler",
|
|
||||||
labelnames=[SERVER_NAME_LABEL],
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class TaskScheduler:
|
class TaskScheduler:
|
||||||
"""
|
"""
|
||||||
This is a simple task scheduler designed for resumable tasks. Normally,
|
This is a simple task scheduler designed for resumable tasks. Normally,
|
||||||
@@ -137,8 +130,11 @@ class TaskScheduler:
|
|||||||
TaskScheduler.SCHEDULE_INTERVAL_MS,
|
TaskScheduler.SCHEDULE_INTERVAL_MS,
|
||||||
)
|
)
|
||||||
|
|
||||||
running_tasks_gauge.register_hook(
|
LaterGauge(
|
||||||
lambda: {(self.server_name,): len(self._running_tasks)}
|
name="synapse_scheduler_running_tasks",
|
||||||
|
desc="The number of concurrent running tasks handled by the TaskScheduler",
|
||||||
|
labelnames=[SERVER_NAME_LABEL],
|
||||||
|
caller=lambda: {(self.server_name,): len(self._running_tasks)},
|
||||||
)
|
)
|
||||||
|
|
||||||
def register_action(
|
def register_action(
|
||||||
|
|||||||
@@ -22,7 +22,7 @@
|
|||||||
from synapse.api.constants import EduTypes
|
from synapse.api.constants import EduTypes
|
||||||
|
|
||||||
from tests import unittest
|
from tests import unittest
|
||||||
from tests.unittest import DEBUG, override_config
|
from tests.unittest import override_config
|
||||||
|
|
||||||
|
|
||||||
class RoomDirectoryFederationTests(unittest.FederatingHomeserverTestCase):
|
class RoomDirectoryFederationTests(unittest.FederatingHomeserverTestCase):
|
||||||
@@ -48,7 +48,6 @@ class RoomDirectoryFederationTests(unittest.FederatingHomeserverTestCase):
|
|||||||
)
|
)
|
||||||
self.assertEqual(200, channel.code)
|
self.assertEqual(200, channel.code)
|
||||||
|
|
||||||
@DEBUG
|
|
||||||
def test_edu_debugging_doesnt_explode(self) -> None:
|
def test_edu_debugging_doesnt_explode(self) -> None:
|
||||||
"""Sanity check incoming federation succeeds with `synapse.debug_8631` enabled.
|
"""Sanity check incoming federation succeeds with `synapse.debug_8631` enabled.
|
||||||
|
|
||||||
|
|||||||
@@ -22,13 +22,7 @@ from typing import Dict, Protocol, Tuple
|
|||||||
|
|
||||||
from prometheus_client.core import Sample
|
from prometheus_client.core import Sample
|
||||||
|
|
||||||
from synapse.metrics import (
|
from synapse.metrics import REGISTRY, InFlightGauge, generate_latest
|
||||||
REGISTRY,
|
|
||||||
SERVER_NAME_LABEL,
|
|
||||||
InFlightGauge,
|
|
||||||
LaterGauge,
|
|
||||||
generate_latest,
|
|
||||||
)
|
|
||||||
from synapse.util.caches.deferred_cache import DeferredCache
|
from synapse.util.caches.deferred_cache import DeferredCache
|
||||||
|
|
||||||
from tests import unittest
|
from tests import unittest
|
||||||
@@ -291,42 +285,6 @@ class CacheMetricsTests(unittest.HomeserverTestCase):
|
|||||||
self.assertEqual(hs2_cache_max_size_metric_value, "777.0")
|
self.assertEqual(hs2_cache_max_size_metric_value, "777.0")
|
||||||
|
|
||||||
|
|
||||||
class LaterGaugeTests(unittest.HomeserverTestCase):
|
|
||||||
def test_later_gauge_multiple_servers(self) -> None:
|
|
||||||
"""
|
|
||||||
Test that LaterGauge metrics are reported correctly across multiple servers. We
|
|
||||||
will have an metrics entry for each homeserver that is labeled with the
|
|
||||||
`server_name` label.
|
|
||||||
"""
|
|
||||||
later_gauge = LaterGauge(
|
|
||||||
name="foo",
|
|
||||||
desc="",
|
|
||||||
labelnames=[SERVER_NAME_LABEL],
|
|
||||||
)
|
|
||||||
later_gauge.register_hook(lambda: {("hs1",): 1})
|
|
||||||
later_gauge.register_hook(lambda: {("hs2",): 2})
|
|
||||||
|
|
||||||
metrics_map = get_latest_metrics()
|
|
||||||
|
|
||||||
# Find the metrics for the caches from both homeservers
|
|
||||||
hs1_metric = 'foo{server_name="hs1"}'
|
|
||||||
hs1_metric_value = metrics_map.get(hs1_metric)
|
|
||||||
self.assertIsNotNone(
|
|
||||||
hs1_metric_value,
|
|
||||||
f"Missing metric {hs1_metric} in cache metrics {metrics_map}",
|
|
||||||
)
|
|
||||||
hs2_metric = 'foo{server_name="hs2"}'
|
|
||||||
hs2_metric_value = metrics_map.get(hs2_metric)
|
|
||||||
self.assertIsNotNone(
|
|
||||||
hs2_metric_value,
|
|
||||||
f"Missing metric {hs2_metric} in cache metrics {metrics_map}",
|
|
||||||
)
|
|
||||||
|
|
||||||
# Sanity check the metric values
|
|
||||||
self.assertEqual(hs1_metric_value, "1.0")
|
|
||||||
self.assertEqual(hs2_metric_value, "2.0")
|
|
||||||
|
|
||||||
|
|
||||||
def get_latest_metrics() -> Dict[str, str]:
|
def get_latest_metrics() -> Dict[str, str]:
|
||||||
"""
|
"""
|
||||||
Collect the latest metrics from the registry and parse them into an easy to use map.
|
Collect the latest metrics from the registry and parse them into an easy to use map.
|
||||||
|
|||||||
Reference in New Issue
Block a user