Be mindful of other SIGHUP handlers in 3rd-party code (#19095)
Be mindful that Synapse can be run alongside other code in the same Python process. We shouldn't clobber other `SIGHUP` handlers as only one can be set at time. (no clobber) ### Background As part of Element's plan to support a light form of vhosting (virtual host) (multiple instances of Synapse in the same Python process), we're currently diving into the details and implications of running multiple instances of Synapse in the same Python process. "Per-tenant logging" tracked internally by https://github.com/element-hq/synapse-small-hosts/issues/48 Relevant to logging as we use a `SIGHUP` to reload log config in Synapse.
This commit is contained in:
1
changelog.d/19095.misc
Normal file
1
changelog.d/19095.misc
Normal file
@@ -0,0 +1 @@
|
|||||||
|
Avoid clobbering other `SIGHUP` handlers in 3rd-party code.
|
||||||
@@ -29,6 +29,7 @@ import traceback
|
|||||||
import warnings
|
import warnings
|
||||||
from textwrap import indent
|
from textwrap import indent
|
||||||
from threading import Thread
|
from threading import Thread
|
||||||
|
from types import FrameType
|
||||||
from typing import (
|
from typing import (
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
Any,
|
Any,
|
||||||
@@ -36,6 +37,7 @@ from typing import (
|
|||||||
Callable,
|
Callable,
|
||||||
NoReturn,
|
NoReturn,
|
||||||
Optional,
|
Optional,
|
||||||
|
Union,
|
||||||
cast,
|
cast,
|
||||||
)
|
)
|
||||||
from wsgiref.simple_server import WSGIServer
|
from wsgiref.simple_server import WSGIServer
|
||||||
@@ -72,7 +74,6 @@ from synapse.handlers.auth import load_legacy_password_auth_providers
|
|||||||
from synapse.http.site import SynapseSite
|
from synapse.http.site import SynapseSite
|
||||||
from synapse.logging.context import LoggingContext, PreserveLoggingContext
|
from synapse.logging.context import LoggingContext, PreserveLoggingContext
|
||||||
from synapse.metrics import install_gc_manager, register_threadpool
|
from synapse.metrics import install_gc_manager, register_threadpool
|
||||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
|
||||||
from synapse.metrics.jemalloc import setup_jemalloc_stats
|
from synapse.metrics.jemalloc import setup_jemalloc_stats
|
||||||
from synapse.module_api.callbacks.spamchecker_callbacks import load_legacy_spam_checkers
|
from synapse.module_api.callbacks.spamchecker_callbacks import load_legacy_spam_checkers
|
||||||
from synapse.module_api.callbacks.third_party_event_rules_callbacks import (
|
from synapse.module_api.callbacks.third_party_event_rules_callbacks import (
|
||||||
@@ -108,7 +109,7 @@ P = ParamSpec("P")
|
|||||||
|
|
||||||
|
|
||||||
def register_sighup(
|
def register_sighup(
|
||||||
homeserver_instance_id: str,
|
hs: "HomeServer",
|
||||||
func: Callable[P, None],
|
func: Callable[P, None],
|
||||||
*args: P.args,
|
*args: P.args,
|
||||||
**kwargs: P.kwargs,
|
**kwargs: P.kwargs,
|
||||||
@@ -123,19 +124,25 @@ def register_sighup(
|
|||||||
*args, **kwargs: args and kwargs to be passed to the target function.
|
*args, **kwargs: args and kwargs to be passed to the target function.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
_instance_id_to_sighup_callbacks_map.setdefault(homeserver_instance_id, []).append(
|
# Wrap the function so we can run it within a logcontext
|
||||||
(func, args, kwargs)
|
def _callback_wrapper(*args: P.args, **kwargs: P.kwargs) -> None:
|
||||||
|
with LoggingContext(name="sighup", server_name=hs.hostname):
|
||||||
|
func(*args, **kwargs)
|
||||||
|
|
||||||
|
_instance_id_to_sighup_callbacks_map.setdefault(hs.get_instance_id(), []).append(
|
||||||
|
(_callback_wrapper, args, kwargs)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def unregister_sighups(instance_id: str) -> None:
|
def unregister_sighups(homeserver_instance_id: str) -> None:
|
||||||
"""
|
"""
|
||||||
Unregister all sighup functions associated with this Synapse instance.
|
Unregister all sighup functions associated with this Synapse instance.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
instance_id: Unique ID for this Synapse process instance.
|
homeserver_instance_id: The unique ID for this Synapse process instance to
|
||||||
|
unregister hooks for (`hs.get_instance_id()`).
|
||||||
"""
|
"""
|
||||||
_instance_id_to_sighup_callbacks_map.pop(instance_id, [])
|
_instance_id_to_sighup_callbacks_map.pop(homeserver_instance_id, [])
|
||||||
|
|
||||||
|
|
||||||
def start_worker_reactor(
|
def start_worker_reactor(
|
||||||
@@ -540,6 +547,61 @@ def refresh_certificate(hs: "HomeServer") -> None:
|
|||||||
logger.info("Context factories updated.")
|
logger.info("Context factories updated.")
|
||||||
|
|
||||||
|
|
||||||
|
_already_setup_sighup_handling = False
|
||||||
|
"""
|
||||||
|
Marks whether we've already successfully ran `setup_sighup_handling()`.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
def setup_sighup_handling() -> None:
|
||||||
|
"""
|
||||||
|
Set up SIGHUP handling to call registered callbacks.
|
||||||
|
|
||||||
|
This can be called multiple times safely.
|
||||||
|
"""
|
||||||
|
global _already_setup_sighup_handling
|
||||||
|
# We only need to set things up once per process.
|
||||||
|
if _already_setup_sighup_handling:
|
||||||
|
return
|
||||||
|
|
||||||
|
previous_sighup_handler: Union[
|
||||||
|
Callable[[int, Optional[FrameType]], Any], int, None
|
||||||
|
] = None
|
||||||
|
|
||||||
|
# Set up the SIGHUP machinery.
|
||||||
|
if hasattr(signal, "SIGHUP"):
|
||||||
|
|
||||||
|
def handle_sighup(*args: Any, **kwargs: Any) -> None:
|
||||||
|
# Tell systemd our state, if we're using it. This will silently fail if
|
||||||
|
# we're not using systemd.
|
||||||
|
sdnotify(b"RELOADING=1")
|
||||||
|
|
||||||
|
if callable(previous_sighup_handler):
|
||||||
|
previous_sighup_handler(*args, **kwargs)
|
||||||
|
|
||||||
|
for sighup_callbacks in _instance_id_to_sighup_callbacks_map.values():
|
||||||
|
for func, args, kwargs in sighup_callbacks:
|
||||||
|
func(*args, **kwargs)
|
||||||
|
|
||||||
|
sdnotify(b"READY=1")
|
||||||
|
|
||||||
|
# We defer running the sighup handlers until next reactor tick. This
|
||||||
|
# is so that we're in a sane state, e.g. flushing the logs may fail
|
||||||
|
# if the sighup happens in the middle of writing a log entry.
|
||||||
|
def run_sighup(*args: Any, **kwargs: Any) -> None:
|
||||||
|
# `callFromThread` should be "signal safe" as well as thread
|
||||||
|
# safe.
|
||||||
|
reactor.callFromThread(handle_sighup, *args, **kwargs)
|
||||||
|
|
||||||
|
# Register for the SIGHUP signal, chaining any existing handler as there can
|
||||||
|
# only be one handler per signal and we don't want to clobber any existing
|
||||||
|
# handlers (like the `multi_synapse` shard process in the context of Synapse Pro
|
||||||
|
# for small hosts)
|
||||||
|
previous_sighup_handler = signal.signal(signal.SIGHUP, run_sighup)
|
||||||
|
|
||||||
|
_already_setup_sighup_handling = True
|
||||||
|
|
||||||
|
|
||||||
async def start(hs: "HomeServer", freeze: bool = True) -> None:
|
async def start(hs: "HomeServer", freeze: bool = True) -> None:
|
||||||
"""
|
"""
|
||||||
Start a Synapse server or worker.
|
Start a Synapse server or worker.
|
||||||
@@ -579,45 +641,9 @@ async def start(hs: "HomeServer", freeze: bool = True) -> None:
|
|||||||
name="gai_resolver", server_name=server_name, threadpool=resolver_threadpool
|
name="gai_resolver", server_name=server_name, threadpool=resolver_threadpool
|
||||||
)
|
)
|
||||||
|
|
||||||
# Set up the SIGHUP machinery.
|
setup_sighup_handling()
|
||||||
if hasattr(signal, "SIGHUP"):
|
register_sighup(hs, refresh_certificate, hs)
|
||||||
|
register_sighup(hs, reload_cache_config, hs.config)
|
||||||
def handle_sighup(*args: Any, **kwargs: Any) -> "defer.Deferred[None]":
|
|
||||||
async def _handle_sighup(*args: Any, **kwargs: Any) -> None:
|
|
||||||
# Tell systemd our state, if we're using it. This will silently fail if
|
|
||||||
# we're not using systemd.
|
|
||||||
sdnotify(b"RELOADING=1")
|
|
||||||
|
|
||||||
for sighup_callbacks in _instance_id_to_sighup_callbacks_map.values():
|
|
||||||
for func, args, kwargs in sighup_callbacks:
|
|
||||||
func(*args, **kwargs)
|
|
||||||
|
|
||||||
sdnotify(b"READY=1")
|
|
||||||
|
|
||||||
# It's okay to ignore the linter error here and call
|
|
||||||
# `run_as_background_process` directly because `_handle_sighup` operates
|
|
||||||
# outside of the scope of a specific `HomeServer` instance and holds no
|
|
||||||
# references to it which would prevent a clean shutdown.
|
|
||||||
return run_as_background_process( # type: ignore[untracked-background-process]
|
|
||||||
"sighup",
|
|
||||||
server_name,
|
|
||||||
_handle_sighup,
|
|
||||||
*args,
|
|
||||||
**kwargs,
|
|
||||||
)
|
|
||||||
|
|
||||||
# We defer running the sighup handlers until next reactor tick. This
|
|
||||||
# is so that we're in a sane state, e.g. flushing the logs may fail
|
|
||||||
# if the sighup happens in the middle of writing a log entry.
|
|
||||||
def run_sighup(*args: Any, **kwargs: Any) -> None:
|
|
||||||
# `callFromThread` should be "signal safe" as well as thread
|
|
||||||
# safe.
|
|
||||||
reactor.callFromThread(handle_sighup, *args, **kwargs)
|
|
||||||
|
|
||||||
signal.signal(signal.SIGHUP, run_sighup)
|
|
||||||
|
|
||||||
register_sighup(hs.get_instance_id(), refresh_certificate, hs)
|
|
||||||
register_sighup(hs.get_instance_id(), reload_cache_config, hs.config)
|
|
||||||
|
|
||||||
# Apply the cache config.
|
# Apply the cache config.
|
||||||
hs.config.caches.resize_all_caches()
|
hs.config.caches.resize_all_caches()
|
||||||
|
|||||||
@@ -345,9 +345,7 @@ def setup_logging(
|
|||||||
# Add a SIGHUP handler to reload the logging configuration, if one is available.
|
# Add a SIGHUP handler to reload the logging configuration, if one is available.
|
||||||
from synapse.app import _base as appbase
|
from synapse.app import _base as appbase
|
||||||
|
|
||||||
appbase.register_sighup(
|
appbase.register_sighup(hs, _reload_logging_config, log_config_path)
|
||||||
hs.get_instance_id(), _reload_logging_config, log_config_path
|
|
||||||
)
|
|
||||||
|
|
||||||
# Log immediately so we can grep backwards.
|
# Log immediately so we can grep backwards.
|
||||||
logger.warning("***** STARTING SERVER *****")
|
logger.warning("***** STARTING SERVER *****")
|
||||||
|
|||||||
Reference in New Issue
Block a user