Compare commits
11 commits: v1.71.0 ... rei/comple
| SHA1 |
|---|
| 31900e1a3c |
| e7b109b300 |
| 54f3c50826 |
| e643227d3d |
| 2102b5d4dc |
| 7c79d6c5e2 |
| 45eb81b194 |
| a2ae63e89c |
| a974ccbdb0 |
| 322d22f04e |
| be69bd292a |
.github/workflows/tests.yml (vendored, 12 changed lines):
```diff
@@ -310,6 +310,16 @@ jobs:
     needs: linting-done
     runs-on: ubuntu-latest
 
+    strategy:
+      fail-fast: false
+      matrix:
+        include:
+          # GHA requires all matrix configurations to have at least one value. We don't want to set one here though.
+          - _: monolith
+
+          # Test with workers
+          - workers: workers
+
     steps:
       # The path is set via a file given by $GITHUB_PATH. We need both Go 1.17 and GOPATH on the path to run Complement.
       # See https://docs.github.com/en/actions/using-workflows/workflow-commands-for-github-actions#adding-a-system-path
@@ -356,7 +366,7 @@ jobs:
 
       - run: |
           set -o pipefail
-          COMPLEMENT_DIR=`pwd`/complement synapse/scripts-dev/complement.sh -json 2>&1 | gotestfmt
+          WORKERS=${{ matrix.workers && 1 }} COMPLEMENT_DIR=`pwd`/complement synapse/scripts-dev/complement.sh -json ${{ matrix.regex && format('-run Test[{0}]', matrix.regex) || '' }} 2>&1 | gotestfmt
         shell: bash
         name: Run Complement Tests
```
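A note on the two expressions in the new command line (my reading of GitHub Actions expression semantics; the diff itself gives no rationale): `matrix.workers && 1` evaluates to `1` for the matrix entry that sets `workers` and to an empty value for the monolith entry, so `WORKERS` is only effectively set on the workers run. Likewise, `matrix.regex && format('-run Test[{0}]', matrix.regex) || ''` only passes a `-run` test filter when a matrix entry defines `regex` (none does yet; this appears to be future-proofing).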
changelog.d/12810.misc (new file, 1 line):

```
Test Synapse against Complement with workers.
```
changelog.d/12896.misc (new file, 1 line):

```
CI only, please ignore.
```
Dockerfile (presumably the workers-based Complement image, given the conf-workers paths):

```diff
@@ -26,6 +26,9 @@ COPY conf-workers/workers-shared.yaml /conf/workers/shared.yaml
 WORKDIR /data
 
 COPY conf-workers/postgres.supervisord.conf /etc/supervisor/conf.d/postgres.conf
+COPY conf-workers/synapse_forking.supervisord.conf.j2 /conf/
+
+COPY conf/log_config.yaml.j2 /conf/
 
 # Copy the entrypoint
 COPY conf-workers/start-complement-synapse-workers.sh /
```
conf-workers/synapse_forking.supervisord.conf.j2 (new file, 26 lines; the path matches the COPY line above):

```jinja
[program:synapse_forking]
# TODO prefix-log will be no good. We'll have to hack around ourselves.
command=/usr/local/bin/prefix-log /usr/local/bin/python -m synapse.app._complement_fork_starter /data/homeserver.yaml \
{%- for worker_config in worker_configs %}
    -- \
    {{ worker_config.app }} \
    --config-path="{{ worker_config.config_path }}" \
    --config-path=/conf/workers/shared.yaml \
    --config-path=/conf/workers/{{ worker_config.name }}.yaml \
{%- endfor %}
    -- \
    synapse.app.homeserver \
    --config-path="{{ main_config_path }}" \
    --config-path=/conf/workers/shared.yaml

autorestart=unexpected
priority=500
exitcodes=0
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0

# Required because the forking launcher creates subprocesses but doesn't
# handle signals for us.
stopasgroup=true
```
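Each `-- \` group rendered by the loop becomes one argument group for the fork starter (which splits its argv on `--`, as its module docstring below describes), and the final group launches the main `synapse.app.homeserver` process. As a minimal sketch (not part of the diff), rendering a simplified excerpt of the template shows the shape of the resulting command line; the worker name `worker1` and its paths are made-up example values:

```python
from jinja2 import Template

# Simplified excerpt of the template above, with hypothetical values.
excerpt = Template(
    "command=... -m synapse.app._complement_fork_starter /data/homeserver.yaml \\\n"
    "{%- for worker_config in worker_configs %}\n"
    "    -- \\\n"
    "    {{ worker_config.app }} \\\n"
    "    --config-path=/conf/workers/{{ worker_config.name }}.yaml \\\n"
    "{%- endfor %}"
)
print(excerpt.render(worker_configs=[
    {"app": "synapse.app.generic_worker", "name": "worker1"},
]))
```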
Log config template (conf/log_config.yaml.j2, per the COPY line above and the convert() change below):

```diff
@@ -2,7 +2,11 @@ version: 1
 
 formatters:
   precise:
-    format: '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s - %(message)s'
+{% if worker_name %}
+    format: '{{ worker_name }} | %(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s - %(message)s'
+{% else %}
+    format: '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s - %(message)s'
+{% endif %}
 
 filters:
   context:
```
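The effect: when rendered for a worker, every log line is prefixed with that worker's name, so the output supervisord interleaves on stdout stays attributable to a process; rendered without a `worker_name`, the format is unchanged.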
Supervisord template (/conf/supervisord.conf.j2, per the convert() call below). The `[program:synapse_main]` block is commented out, since the forking launcher now starts the main process:

```diff
@@ -28,17 +28,17 @@ stderr_logfile_maxbytes=0
 username=redis
 autorestart=true
 
-[program:synapse_main]
-command=/usr/local/bin/prefix-log /usr/local/bin/python -m synapse.app.homeserver --config-path="{{ main_config_path }}" --config-path=/conf/workers/shared.yaml
-priority=10
-# Log startup failures to supervisord's stdout/err
-# Regular synapse logs will still go in the configured data directory
-stdout_logfile=/dev/stdout
-stdout_logfile_maxbytes=0
-stderr_logfile=/dev/stderr
-stderr_logfile_maxbytes=0
-autorestart=unexpected
-exitcodes=0
+## [program:synapse_main]
+## command=/usr/local/bin/prefix-log /usr/local/bin/python -m synapse.app.homeserver --config-path="{{ main_config_path }}" --config-path=/conf/workers/shared.yaml
+## priority=10
+## # Log startup failures to supervisord's stdout/err
+## # Regular synapse logs will still go in the configured data directory
+## stdout_logfile=/dev/stdout
+## stdout_logfile_maxbytes=0
+## stderr_logfile=/dev/stderr
+## stderr_logfile_maxbytes=0
+## autorestart=unexpected
+## exitcodes=0
 
 # Additional process blocks
 {{ worker_config }}
```
Worker configuration generator (presumably docker/configure_workers_and_start.py, which defines generate_worker_files):

```diff
@@ -401,6 +401,8 @@ def generate_worker_files(
     # which exists even if no workers do.
     healthcheck_urls = ["http://localhost:8080/health"]
 
+    worker_configs: List[Dict[str, Any]] = []
+
     # For each worker type specified by the user, create config values
     for worker_type in worker_types:
         worker_type = worker_type.strip()
@@ -438,6 +440,8 @@ def generate_worker_files(
         # Enable the worker in supervisord
         supervisord_config += SUPERVISORD_PROCESS_CONFIG_BLOCK.format_map(worker_config)
 
+        worker_configs.append(worker_config)
+
         # Add nginx location blocks for this worker's endpoints (if any are defined)
         for pattern in worker_config["endpoint_patterns"]:
             # Determine whether we need to load-balance this worker
@@ -530,7 +534,15 @@ def generate_worker_files(
         "/conf/supervisord.conf.j2",
         "/etc/supervisor/supervisord.conf",
         main_config_path=config_path,
-        worker_config=supervisord_config,
+        # worker_config=supervisord_config,
+        worker_config="",
     )
 
+    convert(
+        "/conf/synapse_forking.supervisord.conf.j2",
+        "/etc/supervisor/conf.d/synapse_forking.supervisor.conf",
+        worker_configs=worker_configs,
+        main_config_path=config_path,
+    )
+
     # healthcheck config
@@ -562,7 +574,7 @@ def generate_worker_log_config(
     # Render and write the file
     log_config_filepath = "/conf/workers/{name}.log.config".format(name=worker_name)
     convert(
-        "/conf/log.config",
+        "/conf/log_config.yaml.j2",
         log_config_filepath,
         worker_name=worker_name,
         **extra_log_template_args,
```
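For orientation, convert() here is the script's small template-rendering helper; judging by the `*.j2` inputs it is roughly shaped like the following sketch (a paraphrase, not code from this diff):

```python
import jinja2

def convert(src: str, dst: str, **template_vars: object) -> None:
    """Render the jinja2 template at `src` and write the result to `dst`."""
    with open(src) as infile:
        template = jinja2.Template(infile.read())
    with open(dst, "w") as outfile:
        outfile.write(template.render(**template_vars))
```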
Reactor startup (presumably synapse/app/_base.py, home of start_worker_reactor and start_reactor):

```diff
@@ -106,7 +106,7 @@ def register_sighup(func: Callable[P, None], *args: P.args, **kwargs: P.kwargs)
 def start_worker_reactor(
     appname: str,
     config: HomeServerConfig,
-    run_command: Callable[[], None] = reactor.run,
+    run_command: Callable[[], None] = lambda: reactor.run(),
 ) -> None:
     """Run the reactor in the main process
 
@@ -141,7 +141,7 @@ def start_reactor(
     daemonize: bool,
     print_pidfile: bool,
     logger: logging.Logger,
-    run_command: Callable[[], None] = reactor.run,
+    run_command: Callable[[], None] = lambda: reactor.run(),
 ) -> None:
     """Run the reactor in the main process
```
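A plausible reading of why the default becomes a lambda (my inference; the diff carries no commentary): `reactor.run` as a default is resolved once, when the def statement is evaluated at import time, whereas the lambda defers the attribute lookup until the reactor is actually started. With the proxying reactor introduced below, the early lookup would hit the proxy before any real reactor has been installed. A toy sketch, not code from this diff:

```python
# Minimal stand-in for the ProxiedReactor pattern.
class Proxy:
    target = None

    def __getattr__(self, name):
        # Forward attribute lookups to the current target.
        return getattr(self.target, name)

reactor = Proxy()

# early = reactor.run          # AttributeError now: no real reactor installed yet
late = lambda: reactor.run()   # lookup deferred until the lambda is called

class Real:
    def run(self):
        print("running")

reactor.target = Real()
late()  # prints "running": resolved against the newly installed target
```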
synapse/app/_complement_fork_starter.py (new file, 140 lines):

```python
# Copyright 2022 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# This script is intended for test purposes only (within Complement).
# It spawns multiple workers, whilst only going through the code loading process
# once.
#
# TODO more docs
# Each worker is specified as an argument group (each argument group is
# separated by '--').
# ........
#
# Usage:
#   python -m synapse.app._complement_fork_starter \
#     synapse.app.homeserver [args..] -- \
#     synapse.app.generic_worker [args..] -- \
#     ...
#     synapse.app.generic_worker [args..]
import importlib
import itertools
import multiprocessing
import sys
from typing import Any, Callable, List

from twisted.internet.main import installReactor


class ProxiedReactor:
    """
    Global state is horrible. Use this proxy reactor so we can 'reinstall'
    the reactor by changing the target of the proxy.
    """

    def __init__(self) -> None:
        self.___reactor_target: Any = None

    def ___install(self, new_reactor: Any) -> None:
        self.___reactor_target = new_reactor

    def __getattr__(self, attr_name: str) -> Any:
        if attr_name == "___install":
            return self.___install
        return getattr(self.___reactor_target, attr_name)


def _worker_entrypoint(
    func: Callable[[], None], proxy_reactor: ProxiedReactor, args: List[str]
) -> None:
    sys.argv = args

    from twisted.internet.epollreactor import EPollReactor

    proxy_reactor.___install(EPollReactor())
    func()


def main() -> None:
    # Split up the arguments into each worker's arguments.
    # Strip out any newlines.
    # HACK
    db_config_path = sys.argv[1]
    args = [arg.replace("\n", "") for arg in sys.argv[2:]]
    args_by_worker: List[List[str]] = [
        list(args)
        for cond, args in itertools.groupby(args, lambda ele: ele != "--")
        if cond and args
    ]

    # Prevent Twisted from installing a shared reactor that all the workers will
    # pick up.
    proxy_reactor = ProxiedReactor()
    installReactor(proxy_reactor)

    # Import the entrypoints for all the workers.
    worker_functions = []
    for worker_args in args_by_worker:
        worker_module = importlib.import_module(worker_args[0])
        worker_functions.append(worker_module.main)

    # At this point, we've imported all the main entrypoints for all the workers.
    # Now we basically just fork() out to create the workers we need.
    # Because we're using fork(), all the workers get a clone of this launcher's
    # memory space and don't need to repeat the work of loading the code!
    # Instead of using fork() directly, we use the multiprocessing library,
    # which *can* use fork() on Unix platforms.
    # Now we fork our process!

    # TODO Can we do this better?
    # We need to prepare the database first as otherwise all the workers will
    # try to create a schema version table and some will crash out.
    # HACK
    from synapse._scripts import update_synapse_database

    update_proc = multiprocessing.Process(
        target=_worker_entrypoint,
        args=(
            update_synapse_database.main,
            proxy_reactor,
            [
                "update_synapse_database",
                "--database-config",
                db_config_path,
                "--run-background-updates",
            ],
        ),
    )
    print("===== PREPARING DATABASE =====", file=sys.stderr)
    update_proc.start()
    update_proc.join()
    print("===== PREPARED DATABASE =====", file=sys.stderr)

    processes = []
    for (func, worker_args) in zip(worker_functions, args_by_worker):
        process = multiprocessing.Process(
            target=_worker_entrypoint, args=(func, proxy_reactor, worker_args)
        )
        process.start()
        processes.append(process)

    # Be a good parent and wait for our children to die before exiting.
    for process in processes:
        process.join()


if __name__ == "__main__":
    main()
```
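To illustrate the argument-group splitting in main(), here is a standalone toy run (not from the diff; the module paths are example values) using the same `itertools.groupby` approach:

```python
import itertools

argv = [
    "synapse.app.homeserver", "--config-path=/conf/hs.yaml",
    "--",
    "synapse.app.generic_worker", "--config-path=/conf/workers/worker1.yaml",
]
# Group consecutive non-"--" arguments together; drop the "--" separators.
args_by_worker = [
    list(group)
    for is_arg, group in itertools.groupby(argv, lambda ele: ele != "--")
    if is_arg
]
print(args_by_worker)
# [['synapse.app.homeserver', '--config-path=/conf/hs.yaml'],
#  ['synapse.app.generic_worker', '--config-path=/conf/workers/worker1.yaml']]
```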
The same hunk is applied to ten thin worker application modules wrapping synapse.app.generic_worker (the file names were not captured in this view); shown once:

```diff
@@ -17,6 +17,11 @@ import sys
 from synapse.app.generic_worker import start
 from synapse.util.logcontext import LoggingContext
 
-if __name__ == "__main__":
+
+def main() -> None:
     with LoggingContext("main"):
         start(sys.argv[1:])
+
+
+if __name__ == "__main__":
+    main()
```
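Moving the body under a named main() is what lets the fork starter above do `importlib.import_module(worker_args[0])` followed by `worker_module.main`: the entry point now exists as an importable callable rather than living only under the `__main__` guard.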