1
0

Add support for reactor metrics with the ProxiedReactor used in worker Complement tests (#19385)

Follow-up to https://github.com/element-hq/synapse/pull/19383

The
[`ProxiedReactor`](079c52e16b/synapse/app/complement_fork_starter.py (L38-L71))
is a special custom reactor used in the
`synapse/app/complement_fork_starter.py`. It's used by default when
using `WORKERS=1 ./scripts-dev/complement.sh` (see
`SYNAPSE_USE_EXPERIMENTAL_FORKING_LAUNCHER`). The point of the forking
launcher is to improve start-up times and the point of the
[`ProxiedReactor`](079c52e16b/synapse/app/complement_fork_starter.py (L38-L71))
is explained in the [docstring](079c52e16b/synapse/app/complement_fork_starter.py (L38-L56))

(introduced in https://github.com/matrix-org/synapse/pull/13127)

### Reproduction instructions

1. Using the Complement scripts **with workers**: `WORKERS=1
COMPLEMENT_DIR=../complement ./scripts-dev/complement.sh ./tests/csapi`
 1. `docker logs complement_csapi_dirty_hs1 2>&1 | grep -i "reactor"`
1. With these changes, notice `Twisted reactor: ProxiedReactor` but no
warning about `Skipping configuring ReactorLastSeenMetric: unexpected
reactor type: <__main__.ProxiedReactor object at 0x7fc0adaaea50>`
 1. Cleanup:
- `docker stop $(docker ps --all --filter "label=complement_context"
--quiet)`
- `docker rm $(docker ps --all --filter "label=complement_context"
--quiet)`

To test that we're actually seeing reactor metrics, I've been using this
with the load-test runs in
https://github.com/element-hq/synapse-rust-apps/pull/397
This commit is contained in:
Eric Eastwood
2026-01-19 08:31:43 -06:00
committed by GitHub
parent 8b36740bad
commit f54fd64929
4 changed files with 114 additions and 74 deletions

1
changelog.d/19385.misc Normal file
View File

@@ -0,0 +1 @@
Add support for reactor metrics with the `ProxiedReactor` used in worker Complement tests.

View File

@@ -0,0 +1,60 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2026 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# [This file includes modifications made by New Vector Limited]
#
#
from typing import Any
class ProxiedReactor:
"""
Twisted tracks the 'installed' reactor as a global variable.
(Actually, it does some module trickery, but the effect is similar.)
The default EpollReactor is buggy if it's created before a process is
forked, then used in the child.
See https://twistedmatrix.com/trac/ticket/4759#comment:17.
However, importing certain Twisted modules will automatically create and
install a reactor if one hasn't already been installed.
It's not normally possible to re-install a reactor.
Given the goal of launching workers with fork() to only import the code once,
this presents a conflict.
Our work around is to 'install' this ProxiedReactor which prevents Twisted
from creating and installing one, but which lets us replace the actual reactor
in use later on.
"""
def __init__(self) -> None:
self.___reactor_target: Any = None
def _install_real_reactor(self, new_reactor: Any) -> None:
"""
Install a real reactor for this ProxiedReactor to forward lookups onto.
This method is specific to our ProxiedReactor and should not clash with
any names used on an actual Twisted reactor.
"""
self.___reactor_target = new_reactor
# Import here to avoid circular imports.
from synapse.metrics._reactor_metrics import install_reactor_metrics
# Install reactor metrics now we've got a real reactor.
install_reactor_metrics(new_reactor)
def __getattr__(self, attr_name: str) -> Any:
return getattr(self.___reactor_target, attr_name)

View File

@@ -30,47 +30,13 @@ from typing import Any, Callable
from twisted.internet.main import installReactor
from synapse.app.complement_fork_proxied_reactor import ProxiedReactor
# a list of the original signal handlers, before we installed our custom ones.
# We restore these in our child processes.
_original_signal_handlers: dict[int, Any] = {}
class ProxiedReactor:
"""
Twisted tracks the 'installed' reactor as a global variable.
(Actually, it does some module trickery, but the effect is similar.)
The default EpollReactor is buggy if it's created before a process is
forked, then used in the child.
See https://twistedmatrix.com/trac/ticket/4759#comment:17.
However, importing certain Twisted modules will automatically create and
install a reactor if one hasn't already been installed.
It's not normally possible to re-install a reactor.
Given the goal of launching workers with fork() to only import the code once,
this presents a conflict.
Our work around is to 'install' this ProxiedReactor which prevents Twisted
from creating and installing one, but which lets us replace the actual reactor
in use later on.
"""
def __init__(self) -> None:
self.___reactor_target: Any = None
def _install_real_reactor(self, new_reactor: Any) -> None:
"""
Install a real reactor for this ProxiedReactor to forward lookups onto.
This method is specific to our ProxiedReactor and should not clash with
any names used on an actual Twisted reactor.
"""
self.___reactor_target = new_reactor
def __getattr__(self, attr_name: str) -> Any:
return getattr(self.___reactor_target, attr_name)
def _worker_entrypoint(
func: Callable[[], None], proxy_reactor: ProxiedReactor, args: list[str]
) -> None:

View File

@@ -30,6 +30,7 @@ from prometheus_client.core import REGISTRY, GaugeMetricFamily
from twisted.internet import reactor, selectreactor
from twisted.internet.asyncioreactor import AsyncioSelectorReactor
from synapse.app.complement_fork_proxied_reactor import ProxiedReactor
from synapse.metrics._types import Collector
try:
@@ -124,53 +125,65 @@ class ReactorLastSeenMetric(Collector):
yield cm
# Twisted has already select a reasonable reactor for us, so assumptions can be
# made about the shape.
wrapper = None
try:
if isinstance(reactor, (PollReactor, EPollReactor)):
reactor._poller = ObjWrapper(reactor._poller, "poll") # type: ignore[attr-defined]
wrapper = reactor._poller._wrapped_method # type: ignore[attr-defined]
def install_reactor_metrics(target_reactor: Any) -> None:
# Twisted has already select a reasonable reactor for us, so assumptions can be
# made about the shape.
wrapper = None
try:
if isinstance(target_reactor, (PollReactor, EPollReactor)):
target_reactor._poller = ObjWrapper(reactor._poller, "poll") # type: ignore[attr-defined]
wrapper = target_reactor._poller._wrapped_method # type: ignore[attr-defined]
elif isinstance(reactor, selectreactor.SelectReactor):
# Twisted uses a module-level _select function.
wrapper = selectreactor._select = CallWrapper(selectreactor._select)
elif isinstance(target_reactor, selectreactor.SelectReactor):
# Twisted uses a module-level _select function.
wrapper = selectreactor._select = CallWrapper(selectreactor._select)
elif isinstance(reactor, AsyncioSelectorReactor):
# For asyncio look at the underlying asyncio event loop.
asyncio_loop = reactor._asyncioEventloop # A sub-class of BaseEventLoop,
elif isinstance(target_reactor, AsyncioSelectorReactor):
# For asyncio look at the underlying asyncio event loop.
asyncio_loop = (
target_reactor._asyncioEventloop
) # A sub-class of BaseEventLoop,
# A sub-class of BaseSelector.
selector = asyncio_loop._selector # type: ignore[attr-defined]
# A sub-class of BaseSelector.
selector = asyncio_loop._selector # type: ignore[attr-defined]
if isinstance(selector, SelectSelector):
wrapper = selector._select = CallWrapper(selector._select) # type: ignore[attr-defined]
if isinstance(selector, SelectSelector):
wrapper = selector._select = CallWrapper(selector._select) # type: ignore[attr-defined]
# poll, epoll, and /dev/poll.
elif isinstance(selector, _PollLikeSelector):
selector._selector = ObjWrapper(selector._selector, "poll") # type: ignore[attr-defined]
wrapper = selector._selector._wrapped_method # type: ignore[attr-defined]
# poll, epoll, and /dev/poll.
elif isinstance(selector, _PollLikeSelector):
selector._selector = ObjWrapper(selector._selector, "poll") # type: ignore[attr-defined]
wrapper = selector._selector._wrapped_method # type: ignore[attr-defined]
elif isinstance(selector, KqueueSelector):
selector._selector = ObjWrapper(selector._selector, "control") # type: ignore[attr-defined]
wrapper = selector._selector._wrapped_method # type: ignore[attr-defined]
elif isinstance(selector, KqueueSelector):
selector._selector = ObjWrapper(selector._selector, "control") # type: ignore[attr-defined]
wrapper = selector._selector._wrapped_method # type: ignore[attr-defined]
else:
# E.g. this does not support the (Windows-only) ProactorEventLoop.
logger.warning(
"Skipping configuring reactor metrics: unexpected asyncio loop selector: %r via %r",
selector,
asyncio_loop,
)
else:
# E.g. this does not support the (Windows-only) ProactorEventLoop.
logger.warning(
"Skipping configuring reactor metrics: unexpected asyncio loop selector: %r via %r",
selector,
asyncio_loop,
"Skipping configuring reactor metrics: unexpected reactor type: %r",
target_reactor,
)
else:
logger.warning(
"Skipping configuring reactor metrics: unexpected reactor type: %r",
reactor,
)
except Exception as e:
logger.warning("Configuring reactor metrics failed: %r", e)
except Exception as e:
logger.warning("Configuring reactor metrics failed: %r", e)
if wrapper:
# This is a process-level metric, so it does not have the `SERVER_NAME_LABEL`.
REGISTRY.register(ReactorLastSeenMetric(wrapper)) # type: ignore[missing-server-name-label]
if wrapper:
# This is a process-level metric, so it does not have the `SERVER_NAME_LABEL`.
REGISTRY.register(ReactorLastSeenMetric(wrapper)) # type: ignore[missing-server-name-label]
# Install reactor metrics for the global reactor.
#
# Skip installation if using `ProxiedReactor` (used by `complement_fork_starter.py`).
# The `ProxiedReactor` will handle calling `install_reactor_metrics(...)` itself when
# ready. Skipping allows us to avoid seeing confusing `Skipping configuring reactor
# metrics: unexpected reactor type: ProxiedReactor` warnings.
if not isinstance(reactor, ProxiedReactor):
install_reactor_metrics(reactor)