
SCARY HACKS

Olivier Wilkinson (reivilibre)
2022-05-27 13:20:39 +01:00
parent 7c79d6c5e2
commit 2102b5d4dc
7 changed files with 115 additions and 23 deletions

View File

@@ -26,6 +26,9 @@ COPY conf-workers/workers-shared.yaml /conf/workers/shared.yaml
 WORKDIR /data
 COPY conf-workers/postgres.supervisord.conf /etc/supervisor/conf.d/postgres.conf
+COPY conf-workers/synapse_forking.supervisord.conf.j2 /conf/
+COPY conf/log_config.yaml.j2 /conf/
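+# (Both templates are rendered at container start by configure_workers_and_start.py;
+# see the changes to that script below.)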
 # Copy the entrypoint
 COPY conf-workers/start-complement-synapse-workers.sh /

View File

@@ -0,0 +1,26 @@
+[program:synapse_forking]
+# TODO prefix-log will be no good here; we'll have to hack around it ourselves.
+command=/usr/local/bin/prefix-log /usr/local/bin/python -m synapse.app._complement_fork_starter /data/homeserver.yaml \
+{%- for worker_config in worker_configs %}
+    -- \
+    {{ worker_config.app }} \
+    --config-path="{{ worker_config.config_path }}" \
+    --config-path=/conf/workers/shared.yaml \
+    --config-path=/conf/workers/{{ worker_config.name }}.yaml \
+{%- endfor %}
+    -- \
+    synapse.app.homeserver \
+    --config-path="{{ main_config_path }}" \
+    --config-path=/conf/workers/shared.yaml
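+# autorestart=unexpected restarts the program only if it exits with a code not
+# listed in exitcodes; with exitcodes=0, any non-zero exit triggers a restart.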
+autorestart=unexpected
+priority=500
+exitcodes=0
+stdout_logfile=/dev/stdout
+stdout_logfile_maxbytes=0
+stderr_logfile=/dev/stderr
+stderr_logfile_maxbytes=0
+# Required because the forking launcher creates subprocesses but doesn't
+# handle signals for us.
+stopasgroup=true

View File

@@ -2,7 +2,11 @@ version: 1
 formatters:
   precise:
-    format: '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s - %(message)s'
+{% if worker_name %}
+    format: '{{ worker_name }} | %(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s - %(message)s'
+{% else %}
+    format: '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s - %(message)s'
+{% endif %}
 filters:
   context:
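With this template, a worker named (for example) federation_sender1 emits log lines of the form:

federation_sender1 | 2022-05-27 13:20:39,000 - synapse.http.server - 100 - INFO - GET-0 - ...

(The worker name and field values here are illustrative.)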

View File

@@ -28,17 +28,17 @@ stderr_logfile_maxbytes=0
 username=redis
 autorestart=true

-[program:synapse_main]
-command=/usr/local/bin/prefix-log /usr/local/bin/python -m synapse.app.homeserver --config-path="{{ main_config_path }}" --config-path=/conf/workers/shared.yaml
-priority=10
-# Log startup failures to supervisord's stdout/err
-# Regular synapse logs will still go in the configured data directory
-stdout_logfile=/dev/stdout
-stdout_logfile_maxbytes=0
-stderr_logfile=/dev/stderr
-stderr_logfile_maxbytes=0
-autorestart=unexpected
-exitcodes=0
+## [program:synapse_main]
+## command=/usr/local/bin/prefix-log /usr/local/bin/python -m synapse.app.homeserver --config-path="{{ main_config_path }}" --config-path=/conf/workers/shared.yaml
+## priority=10
+## # Log startup failures to supervisord's stdout/err
+## # Regular synapse logs will still go in the configured data directory
+## stdout_logfile=/dev/stdout
+## stdout_logfile_maxbytes=0
+## stderr_logfile=/dev/stderr
+## stderr_logfile_maxbytes=0
+## autorestart=unexpected
+## exitcodes=0
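+# (The main homeserver process is now launched by the forking launcher
+# configured in synapse_forking.supervisord.conf.j2, whose command ends with
+# synapse.app.homeserver.)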
 # Additional process blocks
 {{ worker_config }}

View File

@@ -401,8 +401,11 @@ def generate_worker_files(
     # which exists even if no workers do.
     healthcheck_urls = ["http://localhost:8080/health"]
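+    # Collect each worker's config values so they can be handed to the forking
+    # launcher's supervisord template below.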
+    worker_configs: List[Dict[str, Any]] = []

     # For each worker type specified by the user, create config values
     for worker_type in worker_types:
+        startup_config: Dict[str, Any] = {}
         worker_type = worker_type.strip()

         worker_config = WORKERS_CONFIG.get(worker_type)
@@ -438,6 +441,8 @@ def generate_worker_files(
         # Enable the worker in supervisord
         supervisord_config += SUPERVISORD_PROCESS_CONFIG_BLOCK.format_map(worker_config)

+        worker_configs.append(worker_config)

         # Add nginx location blocks for this worker's endpoints (if any are defined)
         for pattern in worker_config["endpoint_patterns"]:
             # Determine whether we need to load-balance this worker
@@ -530,7 +535,15 @@ def generate_worker_files(
"/conf/supervisord.conf.j2",
"/etc/supervisor/supervisord.conf",
main_config_path=config_path,
worker_config=supervisord_config,
#worker_config=supervisord_config,
worker_config="",
)
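+    # Render the forking launcher's supervisord program block, passing it the
+    # per-worker config values collected above plus the main config path.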
+    convert(
+        "/conf/synapse_forking.supervisord.conf.j2",
+        "/etc/supervisor/conf.d/synapse_forking.supervisor.conf",
+        worker_configs=worker_configs,
+        main_config_path=config_path,
+    )

     # healthcheck config
@@ -562,7 +575,7 @@ def generate_worker_log_config(
     # Render and write the file
     log_config_filepath = "/conf/workers/{name}.log.config".format(name=worker_name)
     convert(
-        "/conf/log.config",
+        "/conf/log_config.yaml.j2",
         log_config_filepath,
         worker_name=worker_name,
         **extra_log_template_args,

View File

@@ -106,7 +106,7 @@ def register_sighup(func: Callable[P, None], *args: P.args, **kwargs: P.kwargs)
 def start_worker_reactor(
     appname: str,
     config: HomeServerConfig,
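+    # The lambda defers the attribute lookup: with the ProxiedReactor from
+    # synapse.app._complement_fork_starter installed, `reactor.run` must be
+    # resolved at call time, after a real reactor has been installed, rather
+    # than once at import time.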
-    run_command: Callable[[], None] = reactor.run,
+    run_command: Callable[[], None] = lambda: reactor.run(),
 ) -> None:
     """Run the reactor in the main process
@@ -141,7 +141,7 @@ def start_reactor(
     daemonize: bool,
     print_pidfile: bool,
     logger: logging.Logger,
-    run_command: Callable[[], None] = reactor.run,
+    run_command: Callable[[], None] = lambda: reactor.run(),
 ) -> None:
     """Run the reactor in the main process

View File

@@ -31,22 +31,54 @@ import sys
 # synapse.app.generic_worker [args..] -- \
 # ...
 # synapse.app.generic_worker [args..]
-from typing import Callable, List
+from typing import Callable, List, Any

+from twisted.internet.main import installReactor


-def _worker_entrypoint(func: Callable[[], None], args: List[str]) -> None:
+class ProxiedReactor:
+    """
+    Global state is horrible. Use this proxy reactor so we can 'reinstall'
+    the reactor by changing the target of the proxy.
+    """
+
+    def __init__(self):
+        self.___reactor_target = None
+
+    def ___install(self, new_reactor):
+        self.___reactor_target = new_reactor
+
+    def __getattr__(self, attr_name: str) -> Any:
+        if attr_name == "___install":
+            return self.___install
+        return getattr(self.___reactor_target, attr_name)
+
+
+def _worker_entrypoint(func: Callable[[], None], proxy_reactor: ProxiedReactor, args: List[str]) -> None:
     sys.argv = args
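+
+    # Give this child its own fresh reactor after the fork, presumably because
+    # an epoll-based reactor created in the parent must not be shared across
+    # processes.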
+    from twisted.internet.epollreactor import EPollReactor
+    proxy_reactor.___install(EPollReactor())

     func()


 def main() -> None:
     # Split up the arguments into each worker's arguments.
+    # Strip out any newlines.
+    # HACK
+    db_config_path = sys.argv[1]
+    args = [arg.replace("\n", "") for arg in sys.argv[2:]]
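+    # e.g. ["worker1", "--config-path=a.yaml", "--", "worker2"]
+    #   -> [["worker1", "--config-path=a.yaml"], ["worker2"]]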
     args_by_worker: List[List[str]] = [
         list(args)
-        for cond, args in itertools.groupby(sys.argv[1:], lambda ele: ele != "--")
-        if cond
+        for cond, args in itertools.groupby(args, lambda ele: ele != "--")
+        if cond and args
     ]
+    print(args_by_worker)

+    # Prevent Twisted from installing a shared reactor that all the workers will
+    # pick up.
+    proxy_reactor = ProxiedReactor()
+    installReactor(proxy_reactor)

     # Import the entrypoints for all the workers
     worker_functions = []
     for worker_args in args_by_worker:
@@ -61,10 +93,24 @@ def main() -> None:
     # which *can* use fork() on Unix platforms.

     # Now we fork our process!
     # TODO Can we do this better?

+    # We need to prepare the database first as otherwise all the workers will
+    # try to create a schema version table and some will crash out.
+    # HACK
+    from synapse._scripts import update_synapse_database
+
+    update_proc = multiprocessing.Process(
+        target=_worker_entrypoint,
+        args=(update_synapse_database.main, proxy_reactor, ["update_synapse_database", "--database-config", db_config_path, "--run-background-updates"]),
+    )
+    print("===== PREPARING DATABASE =====", file=sys.stderr)
+    update_proc.start()
+    print("JNG UPROC", file=sys.stderr)
+    update_proc.join()
+    print("===== PREPARED DATABASE =====", file=sys.stderr)

     processes = []
     for (func, worker_args) in zip(worker_functions, args_by_worker):
         process = multiprocessing.Process(
-            target=_worker_entrypoint, args=(func, worker_args)
+            target=_worker_entrypoint, args=(func, proxy_reactor, worker_args)
         )
         process.start()
         processes.append(process)
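
For reference, the delegation trick that ProxiedReactor relies on, shown in isolation. This is a minimal sketch, not Synapse code; the names Proxy and _install are illustrative. Python only consults __getattr__ when normal attribute lookup fails, so every unknown attribute is forwarded to whichever target is currently installed, and the target can be swapped at any time.

from typing import Any


class Proxy:
    """Forwards attribute access to a target that can be swapped at any time."""

    def __init__(self) -> None:
        # Mangled to _Proxy__target inside the class body, like
        # ___reactor_target above.
        self.__target: Any = None

    def _install(self, target: Any) -> None:
        self.__target = target

    def __getattr__(self, name: str) -> Any:
        # Only reached for names that normal attribute lookup cannot find.
        return getattr(self.__target, name)


proxy = Proxy()
proxy._install("hello")
print(proxy.upper())   # prints "HELLO": forwarded to the str target
proxy._install([1, 2, 3])
print(proxy.count(2))  # prints "1": same proxy object, new target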