diff --git a/changelog.d/19385.misc b/changelog.d/19385.misc new file mode 100644 index 0000000000..aa23ea51c1 --- /dev/null +++ b/changelog.d/19385.misc @@ -0,0 +1 @@ +Add support for reactor metrics with the `ProxiedReactor` used in worker Complement tests. diff --git a/synapse/app/complement_fork_proxied_reactor.py b/synapse/app/complement_fork_proxied_reactor.py new file mode 100644 index 0000000000..7b03fede68 --- /dev/null +++ b/synapse/app/complement_fork_proxied_reactor.py @@ -0,0 +1,60 @@ +# +# This file is licensed under the Affero General Public License (AGPL) version 3. +# +# Copyright (C) 2026 New Vector, Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# See the GNU Affero General Public License for more details: +# . +# +# [This file includes modifications made by New Vector Limited] +# +# + +from typing import Any + + +class ProxiedReactor: + """ + Twisted tracks the 'installed' reactor as a global variable. + (Actually, it does some module trickery, but the effect is similar.) + + The default EpollReactor is buggy if it's created before a process is + forked, then used in the child. + See https://twistedmatrix.com/trac/ticket/4759#comment:17. + + However, importing certain Twisted modules will automatically create and + install a reactor if one hasn't already been installed. + It's not normally possible to re-install a reactor. + + Given the goal of launching workers with fork() to only import the code once, + this presents a conflict. + Our work around is to 'install' this ProxiedReactor which prevents Twisted + from creating and installing one, but which lets us replace the actual reactor + in use later on. + """ + + def __init__(self) -> None: + self.___reactor_target: Any = None + + def _install_real_reactor(self, new_reactor: Any) -> None: + """ + Install a real reactor for this ProxiedReactor to forward lookups onto. + + This method is specific to our ProxiedReactor and should not clash with + any names used on an actual Twisted reactor. + """ + self.___reactor_target = new_reactor + + # Import here to avoid circular imports. + from synapse.metrics._reactor_metrics import install_reactor_metrics + + # Install reactor metrics now we've got a real reactor. + install_reactor_metrics(new_reactor) + + def __getattr__(self, attr_name: str) -> Any: + return getattr(self.___reactor_target, attr_name) diff --git a/synapse/app/complement_fork_starter.py b/synapse/app/complement_fork_starter.py index dcb45e234b..83d1d4f5ac 100644 --- a/synapse/app/complement_fork_starter.py +++ b/synapse/app/complement_fork_starter.py @@ -30,47 +30,13 @@ from typing import Any, Callable from twisted.internet.main import installReactor +from synapse.app.complement_fork_proxied_reactor import ProxiedReactor + # a list of the original signal handlers, before we installed our custom ones. # We restore these in our child processes. _original_signal_handlers: dict[int, Any] = {} -class ProxiedReactor: - """ - Twisted tracks the 'installed' reactor as a global variable. - (Actually, it does some module trickery, but the effect is similar.) - - The default EpollReactor is buggy if it's created before a process is - forked, then used in the child. - See https://twistedmatrix.com/trac/ticket/4759#comment:17. - - However, importing certain Twisted modules will automatically create and - install a reactor if one hasn't already been installed. - It's not normally possible to re-install a reactor. - - Given the goal of launching workers with fork() to only import the code once, - this presents a conflict. - Our work around is to 'install' this ProxiedReactor which prevents Twisted - from creating and installing one, but which lets us replace the actual reactor - in use later on. - """ - - def __init__(self) -> None: - self.___reactor_target: Any = None - - def _install_real_reactor(self, new_reactor: Any) -> None: - """ - Install a real reactor for this ProxiedReactor to forward lookups onto. - - This method is specific to our ProxiedReactor and should not clash with - any names used on an actual Twisted reactor. - """ - self.___reactor_target = new_reactor - - def __getattr__(self, attr_name: str) -> Any: - return getattr(self.___reactor_target, attr_name) - - def _worker_entrypoint( func: Callable[[], None], proxy_reactor: ProxiedReactor, args: list[str] ) -> None: diff --git a/synapse/metrics/_reactor_metrics.py b/synapse/metrics/_reactor_metrics.py index e73ade9890..d528b7c5e5 100644 --- a/synapse/metrics/_reactor_metrics.py +++ b/synapse/metrics/_reactor_metrics.py @@ -30,6 +30,7 @@ from prometheus_client.core import REGISTRY, GaugeMetricFamily from twisted.internet import reactor, selectreactor from twisted.internet.asyncioreactor import AsyncioSelectorReactor +from synapse.app.complement_fork_proxied_reactor import ProxiedReactor from synapse.metrics._types import Collector try: @@ -124,53 +125,65 @@ class ReactorLastSeenMetric(Collector): yield cm -# Twisted has already select a reasonable reactor for us, so assumptions can be -# made about the shape. -wrapper = None -try: - if isinstance(reactor, (PollReactor, EPollReactor)): - reactor._poller = ObjWrapper(reactor._poller, "poll") # type: ignore[attr-defined] - wrapper = reactor._poller._wrapped_method # type: ignore[attr-defined] +def install_reactor_metrics(target_reactor: Any) -> None: + # Twisted has already select a reasonable reactor for us, so assumptions can be + # made about the shape. + wrapper = None + try: + if isinstance(target_reactor, (PollReactor, EPollReactor)): + target_reactor._poller = ObjWrapper(reactor._poller, "poll") # type: ignore[attr-defined] + wrapper = target_reactor._poller._wrapped_method # type: ignore[attr-defined] - elif isinstance(reactor, selectreactor.SelectReactor): - # Twisted uses a module-level _select function. - wrapper = selectreactor._select = CallWrapper(selectreactor._select) + elif isinstance(target_reactor, selectreactor.SelectReactor): + # Twisted uses a module-level _select function. + wrapper = selectreactor._select = CallWrapper(selectreactor._select) - elif isinstance(reactor, AsyncioSelectorReactor): - # For asyncio look at the underlying asyncio event loop. - asyncio_loop = reactor._asyncioEventloop # A sub-class of BaseEventLoop, + elif isinstance(target_reactor, AsyncioSelectorReactor): + # For asyncio look at the underlying asyncio event loop. + asyncio_loop = ( + target_reactor._asyncioEventloop + ) # A sub-class of BaseEventLoop, - # A sub-class of BaseSelector. - selector = asyncio_loop._selector # type: ignore[attr-defined] + # A sub-class of BaseSelector. + selector = asyncio_loop._selector # type: ignore[attr-defined] - if isinstance(selector, SelectSelector): - wrapper = selector._select = CallWrapper(selector._select) # type: ignore[attr-defined] + if isinstance(selector, SelectSelector): + wrapper = selector._select = CallWrapper(selector._select) # type: ignore[attr-defined] - # poll, epoll, and /dev/poll. - elif isinstance(selector, _PollLikeSelector): - selector._selector = ObjWrapper(selector._selector, "poll") # type: ignore[attr-defined] - wrapper = selector._selector._wrapped_method # type: ignore[attr-defined] + # poll, epoll, and /dev/poll. + elif isinstance(selector, _PollLikeSelector): + selector._selector = ObjWrapper(selector._selector, "poll") # type: ignore[attr-defined] + wrapper = selector._selector._wrapped_method # type: ignore[attr-defined] - elif isinstance(selector, KqueueSelector): - selector._selector = ObjWrapper(selector._selector, "control") # type: ignore[attr-defined] - wrapper = selector._selector._wrapped_method # type: ignore[attr-defined] + elif isinstance(selector, KqueueSelector): + selector._selector = ObjWrapper(selector._selector, "control") # type: ignore[attr-defined] + wrapper = selector._selector._wrapped_method # type: ignore[attr-defined] + else: + # E.g. this does not support the (Windows-only) ProactorEventLoop. + logger.warning( + "Skipping configuring reactor metrics: unexpected asyncio loop selector: %r via %r", + selector, + asyncio_loop, + ) else: - # E.g. this does not support the (Windows-only) ProactorEventLoop. logger.warning( - "Skipping configuring reactor metrics: unexpected asyncio loop selector: %r via %r", - selector, - asyncio_loop, + "Skipping configuring reactor metrics: unexpected reactor type: %r", + target_reactor, ) - else: - logger.warning( - "Skipping configuring reactor metrics: unexpected reactor type: %r", - reactor, - ) -except Exception as e: - logger.warning("Configuring reactor metrics failed: %r", e) + except Exception as e: + logger.warning("Configuring reactor metrics failed: %r", e) + + if wrapper: + # This is a process-level metric, so it does not have the `SERVER_NAME_LABEL`. + REGISTRY.register(ReactorLastSeenMetric(wrapper)) # type: ignore[missing-server-name-label] -if wrapper: - # This is a process-level metric, so it does not have the `SERVER_NAME_LABEL`. - REGISTRY.register(ReactorLastSeenMetric(wrapper)) # type: ignore[missing-server-name-label] +# Install reactor metrics for the global reactor. +# +# Skip installation if using `ProxiedReactor` (used by `complement_fork_starter.py`). +# The `ProxiedReactor` will handle calling `install_reactor_metrics(...)` itself when +# ready. Skipping allows us to avoid seeing confusing `Skipping configuring reactor +# metrics: unexpected reactor type: ProxiedReactor` warnings. +if not isinstance(reactor, ProxiedReactor): + install_reactor_metrics(reactor)