Compare commits

...

5 Commits

Author SHA1 Message Date
Olivier Wilkinson (reivilibre)
3933bb17de Fix running background tasks on BG worker 2022-03-18 13:53:55 +00:00
Olivier Wilkinson (reivilibre)
70ee28f386 WORKING workers setup template 2022-03-17 10:42:28 +00:00
Olivier Wilkinson (reivilibre)
f5bb3887c8 STASH 2022-03-16 14:02:58 +00:00
Olivier Wilkinson (reivilibre)
85774e17be STASH 2022-03-11 11:36:47 +00:00
Olivier Wilkinson (reivilibre)
a0e699b301 STASH 2022-02-24 17:10:23 +00:00
8 changed files with 559 additions and 1 deletions

View File

@@ -0,0 +1,61 @@
# workers_setup
This gives you a **development-grade** installation of workerised Synapse.
DO NOT USE ME IN PRODUCTION.
## Known defects
* Non-generic workers aren't set up properly with their worker type.
* I haven't checked the routes that well; they are probably wrong.
## Requirements from you:
* Redis on default port (unauthenticated)
```
# You need Redis. On Ubuntu, this gets you what you need running on the right port:
apt install redis-server redis-tools
```
* Postgres on default port, using UNIX sockets for authentication.
This means you want your normal user account to have a corresponding Postgres account,
and let Postgres authenticate you automatically.
On Ubuntu, this just means you need to `createuser <your Linux account name>`.
You need a database with the same name as your server_name (I used `syn7`).
It should be owned by your user; see `createdb` to do that properly (and don't
forget to follow the Synapse instructions to use a C locale!)
Typing `psql syn7` should just work once your database is ready.
(If your Postgres server is not listening on the default port 5432, you might
have to add `port: 5433` (or whatever your port is) to the config. Somehow I
messed up my Postgres installation ages ago such that it chose port 5433
rather than the default 5432...)
* Virtualenv with Synapse (don't forget: `[postgres,redis]`)
* You'll need a bog standard Caddy binary (as the reverse proxy / router).
The website offers pre-built static binaries.
* (Optional): If you want to federate, you can set up TLS yourself afterwards.
I haven't bothered so far.
## Run the script
```
# python scripts-dev/workers_setup.py (path to server dir) (server name)
python scripts-dev/workers_setup.py ../servers/syn7_auto syn7
```
## Launching the homeserver
```
cd syn7_auto
/path/to/synapse/.venv/bin/synctl start homeserver.yaml -a workers
/path/to/caddy run
```
## Stopping the homeserver
```
# ^C to stop Caddy
/path/to/synapse/.venv/bin/synctl stop homeserver.yaml -a workers
```

305
scripts-dev/workers_setup.py Executable file
View File

@@ -0,0 +1,305 @@
#!/usr/bin/env python
# Copyright 2022 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import dataclasses
import sys
from os.path import dirname
from pathlib import Path
from typing import Collection, Dict, Iterable, List, Sequence, Tuple
from jinja2 import Environment, FileSystemLoader
from signedjson.key import generate_signing_key, write_signing_keys
from synapse.util.stringutils import random_string
# The worker topology to generate: (worker kind, instance count) pairs.
# "main" is the main Synapse process (count 1 here); every kind must be a
# key of WORKER_ROUTES below.
DESIRED_WORKERS: Tuple[Tuple[str, int], ...] = (
    ("main", 1),
    ("synchrotron", 2),
    ("federation_inbound", 2),
    ("federation_reader", 2),
    ("federation_sender", 2),
    ("typing", 1),
    ("appservice", 1),
    ("client_reader", 2),
    ("event_creator", 2),
    ("event_persister", 2),
    ("media_repository", 1),
    ("pusher", 2),
    ("user_dir", 1),
    ("background_worker", 1),
    # TODO ("encryption", 1), # ??
    ("receipts_account_data", 1)
    # TODO frontend_proxy?
)
# TODO These are probably all wrong
# ^/_matrix/client/(api/v1|r0|v3|unstable)/sendToDevice/ ?
# ^/_matrix/client/(api/v1|r0|v3|unstable)/.*/tags
# ^/_matrix/client/(api/v1|r0|v3|unstable)/.*/account_data ?
# ^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/receipt
# ^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/read_markers ?
# ^/_matrix/client/(api/v1|r0|v3|unstable)/presence/ ?
# Maps each worker kind to the URL regexes that the reverse proxy should
# route to workers of that kind. Kinds with an empty tuple get no HTTP
# traffic routed to them by the proxy (see build_routes_template_var).
WORKER_ROUTES: Dict[str, Tuple[str, ...]] = {
    "main": (),
    "synchrotron": (
        "^/_matrix/client/(v2_alpha|r0|v3)/sync$",
        "^/_matrix/client/(api/v1|v2_alpha|r0|v3)/events$",
        "^/_matrix/client/(api/v1|r0|v3)/initialSync$",
        "^/_matrix/client/(api/v1|r0|v3)/rooms/[^/]+/initialSync$",
    ),
    "federation_inbound": ("^/_matrix/federation/v1/send/",),
    "federation_reader": (
        "^/_matrix/federation/v1/event/",
        "^/_matrix/federation/v1/state/",
        "^/_matrix/federation/v1/state_ids/",
        "^/_matrix/federation/v1/backfill/",
        "^/_matrix/federation/v1/get_missing_events/",
        "^/_matrix/federation/v1/publicRooms",
        "^/_matrix/federation/v1/query/",
        "^/_matrix/federation/v1/make_join/",
        "^/_matrix/federation/v1/make_leave/",
        "^/_matrix/federation/v1/send_join/",
        "^/_matrix/federation/v2/send_join/",
        "^/_matrix/federation/v1/send_leave/",
        "^/_matrix/federation/v2/send_leave/",
        "^/_matrix/federation/v1/invite/",
        "^/_matrix/federation/v2/invite/",
        "^/_matrix/federation/v1/query_auth/",
        "^/_matrix/federation/v1/event_auth/",
        "^/_matrix/federation/v1/exchange_third_party_invite/",
        "^/_matrix/federation/v1/user/devices/",
        "^/_matrix/federation/v1/get_groups_publicised$",
        "^/_matrix/key/v2/query",
        "^/_matrix/federation/(v1|unstable/org.matrix.msc2946)/hierarchy/",
    ),
    "federation_sender": (),
    "typing": ("^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing",),
    "appservice": (),
    "client_reader": (
        "^/_matrix/client/(api/v1|r0|v3|unstable)/createRoom$",
        "^/_matrix/client/(api/v1|r0|v3|unstable)/publicRooms$",
        "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/joined_members$",
        "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/context/.*$",
        "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/members$",
        "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/state$",
        "^/_matrix/client/(v1|unstable/org.matrix.msc2946)/rooms/.*/hierarchy$",
        "^/_matrix/client/unstable/im.nheko.summary/rooms/.*/summary$",
        "^/_matrix/client/(r0|v3|unstable)/account/3pid$",
        "^/_matrix/client/(r0|v3|unstable)/devices$",
        "^/_matrix/client/versions$",
        "^/_matrix/client/(api/v1|r0|v3|unstable)/voip/turnServer$",
        "^/_matrix/client/(r0|v3|unstable)/joined_groups$",
        "^/_matrix/client/(r0|v3|unstable)/publicised_groups$",
        "^/_matrix/client/(r0|v3|unstable)/publicised_groups/",
        "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/event/",
        "^/_matrix/client/(api/v1|r0|v3|unstable)/joined_rooms$",
        "^/_matrix/client/(api/v1|r0|v3|unstable)/search$",
        "^/_matrix/client/(r0|v3|unstable)/keys/query$",
        "^/_matrix/client/(r0|v3|unstable)/keys/changes$",
        "^/_matrix/client/(r0|v3|unstable)/keys/claim$",
        "^/_matrix/client/(r0|v3|unstable)/room_keys/",
        # NOTE(review): everything below this point is lumped onto
        # client_reader in this dev setup rather than dedicated workers.
        # (The duplicate tags/account_data entries that used to follow the
        # receipt/read_markers routes have been removed.)
        "^/_matrix/client/(api/v1|r0|v3|unstable)/login$",
        "^/_matrix/client/(r0|v3|unstable)/register$",
        "^/_matrix/client/v1/register/m.login.registration_token/validity$",
        "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/redact",
        "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/send",
        "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/state/",
        "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$",
        "^/_matrix/client/(api/v1|r0|v3|unstable)/join/",
        "^/_matrix/client/(api/v1|r0|v3|unstable)/profile/",
        "^/_matrix/client/(r0|v3|unstable)/sendToDevice/",
        "^/_matrix/client/(r0|v3|unstable)/.*/tags",
        "^/_matrix/client/(r0|v3|unstable)/.*/account_data",
        "^/_matrix/client/(api/v1|r0|v3|unstable)/presence/",
        "^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt",
        "^/_matrix/client/(r0|v3|unstable)/rooms/.*/read_markers",
        "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/messages$",
        "^/_matrix/client/(api/v1|r0|v3|unstable)/login/sso/redirect",
        "^/_synapse/client/pick_idp$",
        "^/_synapse/client/pick_username",
        "^/_synapse/client/new_user_consent$",
        "^/_synapse/client/sso_register$",
        "^/_synapse/client/oidc/callback$",
        "^/_synapse/client/saml2/authn_response$",
        "^/_matrix/client/(api/v1|r0|v3|unstable)/login/cas/ticket$",
    ),
    "event_creator": (),
    "event_persister": (),
    "media_repository": (
        "^/_synapse/admin/v1/purge_media_cache$",
        "^/_synapse/admin/v1/room/.*/media.*$",
        "^/_synapse/admin/v1/user/.*/media.*$",
        "^/_synapse/admin/v1/media/.*$",
        "^/_synapse/admin/v1/quarantine_media/.*$",
        "^/_synapse/admin/v1/users/.*/media$",
    ),
    "pusher": (),
    "user_dir": ("^/_matrix/client/(api/v1|r0|v3|unstable)/user_directory/search$",),
    "background_worker": (),
    "receipts_account_data": (),
}
@dataclasses.dataclass
class Worker:
    """A single Synapse process in the generated setup."""

    # Unique name, e.g. "synchrotron1"; the main process is just "main".
    name: str
    # Worker type — a key of WORKER_ROUTES (e.g. "synchrotron").
    kind: str
    # 1-based index among workers of the same kind.
    index: int
    # Dedicated loopback address this worker binds its listeners to.
    ip: str
def worker_num_to_ip(num: int) -> str:
    """Map a 1-based overall worker number to its own loopback address."""
    return "127.0.57.%d" % num
def make_workers(workers: Iterable[Tuple[str, int]]) -> List[Worker]:
    """Expand (kind, count) pairs into a flat list of Worker records.

    Each worker receives a unique loopback IP based on its overall position
    across all kinds. The "main" worker keeps the plain name "main"; every
    other worker is named "<kind><index>" with a 1-based per-kind index.
    """
    made: List[Worker] = []
    overall_num = 0
    for kind, count in workers:
        for idx in range(1, count + 1):
            overall_num += 1
            name = "main" if kind == "main" else f"{kind}{idx}"
            made.append(Worker(name, kind, idx, worker_num_to_ip(overall_num)))
    return made
def generate(
    worker_counts: Tuple[Tuple[str, int], ...], target_path: Path, server_name: str
) -> None:
    """Render a complete workerised-Synapse setup under *target_path*.

    Creates the directory (refusing to touch one that already exists) and
    writes: a signing key, homeserver.yaml, one worker yaml plus one logging
    yaml per worker, and a Caddyfile that routes requests to the workers.
    """
    if target_path.exists():
        print("Target path already exists. Won't overwrite.")
        return
    target_path.mkdir()

    # Generate a signing key for the new homeserver.
    key_id = "a_" + random_string(4)
    signing_keys = (generate_signing_key(key_id),)
    with open(target_path.joinpath("signing.key"), "w") as key_file:
        write_signing_keys(key_file, signing_keys)

    macaroon_secret_key = random_string(32)

    # Templates live in a `workers_setup` directory next to this script.
    env = Environment(loader=FileSystemLoader(dirname(__file__) + "/workers_setup"))
    hs_template = env.get_template("homeserver.yaml.j2")
    worker_template = env.get_template("worker.yaml.j2")
    logging_template = env.get_template("logging.yaml.j2")
    rp_template = env.get_template("Caddyfile.j2")

    worker_dir = target_path.joinpath("workers")
    worker_dir.mkdir()
    worker_dir = worker_dir.resolve()

    worker_logging_dir = target_path.joinpath("workers.logging")
    worker_logging_dir.mkdir()

    logs_dir = target_path.joinpath("logs")
    logs_dir.mkdir()
    logs_dir = logs_dir.resolve()

    all_workers = make_workers(worker_counts)
    workers_by_name = {worker.name: worker for worker in all_workers}

    # Context shared by the homeserver, worker and logging templates.
    common_context = dict(
        worker_dir=worker_dir,
        logs_dir=logs_dir,
        all_workers=all_workers,
        workers_by_name=workers_by_name,
    )

    for worker in all_workers:
        # One logging config per worker...
        worker_logging_dir.joinpath(f"{worker.name}.logging.yaml").write_text(
            logging_template.render(worker=worker, **common_context)
        )
        # ...and one worker config per worker (including "main").
        worker_dir.joinpath(f"{worker.name}.yaml").write_text(
            worker_template.render(worker=worker, **common_context)
        )

    target_path.joinpath("homeserver.yaml").write_text(
        hs_template.render(
            server_name=server_name,
            macaroon_secret_key=macaroon_secret_key,
            **common_context,
        )
    )

    # Caddy reverse proxy: sends matched endpoints to the worker pools and
    # falls back to the main process for everything else.
    target_path.joinpath("Caddyfile").write_text(
        rp_template.render(
            server_name=server_name,
            port=8447,
            http_ip=worker_num_to_ip(1),
            routing=build_routes_template_var(all_workers),
            main_server=f"{worker_num_to_ip(1)}:8080",
        )
    )
def build_routes_template_var(
    all_workers: List[Worker],
) -> Sequence[Tuple[str, Collection[str], List[str]]]:
    """Group workers into (group name, routes, endpoints) triples for Caddy.

    One triple per worker kind that has at least one route in WORKER_ROUTES;
    kinds with no routes are omitted, since the proxy has nothing to send
    them. Endpoints are "ip:8080" strings, one per worker of that kind.
    """
    groups: Dict[str, Tuple[str, Tuple[str, ...], List[str]]] = {}
    for worker in all_workers:
        routes = WORKER_ROUTES[worker.kind]
        if not routes:
            # Pure background/stream worker: not routable.
            continue
        group = groups.setdefault(worker.kind, (worker.kind, routes, []))
        group[2].append(f"{worker.ip}:8080")
    return tuple(groups.values())
def main(target_path: Path, server_name: str) -> None:
    """Entry point: generate the standard worker setup at *target_path*."""
    generate(DESIRED_WORKERS, target_path=target_path, server_name=server_name)
if __name__ == "__main__":
    # Validate usage up front rather than dying with an IndexError.
    if len(sys.argv) != 3:
        print(
            "Usage: workers_setup.py <target path> <server name>",
            file=sys.stderr,
        )
        sys.exit(1)
    main(Path(sys.argv[1]), sys.argv[2])

View File

@@ -0,0 +1,24 @@
{
    # Prevents Caddy from asking for sudo password to install a root cert that
    # we don't even want to use here.
    skip_install_trust
}

# If you want TLS, you can add https:// schemes and configure the TLS cert... somehow.
http://{{ server_name }}:{{ port }}, http://{{ http_ip }}:{{ port }} {
    {%- for route_group_name, routes, route_servers in routing %}
    # Named matcher covering every route handled by this worker pool.
    @{{ route_group_name }} {
        {%- for route in routes %}
        path_regexp {{ route }}
        {%- endfor %}
    }
    # Load-balance matched requests across the pool's workers.
    route @{{ route_group_name }} {
        reverse_proxy {% for server in route_servers %} {{ server }} {% endfor %}
    }
    {%- endfor %}
    # fallback to main
    route {
        reverse_proxy {{ main_server }}
    }
}

View File

@@ -0,0 +1,96 @@
server_name: {{ server_name }}
report_stats: false

signing_key_path: "{{ worker_dir }}/../signing.key"
macaroon_secret_key: "{{ macaroon_secret_key }}"

enable_registration: true

redis:
  enabled: true
  #host: localhost
  #port: 6379

trusted_key_servers: []

listeners:
  - port: 8080
    bind_address: {{ workers_by_name.main.ip }}
    type: http
    resources:
      - names: [client, federation]

  # The HTTP replication port
  - port: 9090
    bind_address: {{ workers_by_name.main.ip }}
    type: http
    resources:
      - names: [replication]

database:
  name: psycopg2
  args:
    # Comment out user, password and host to use UNIX socket auth.
    # For testing, create a database owned by your Postgres user that is logged
    # in with your UNIX user
    #user: "synapse"
    #password:
    database: "{{ server_name }}"
    #host: "localhost"
    cp_min: 5
    cp_max: 10

instance_map:
  {%- for worker in all_workers %}
  {{ worker.name }}:
    host: {{ worker.ip }}
    port: 9090
  {%- endfor %}

stream_writers:
  events:
    {%- for worker in all_workers %}
    {%- if worker.kind == "event_persister" %}
    - {{ worker.name }}
    {%- endif %}
    {%- endfor %}
  typing:
    {%- for worker in all_workers %}
    {%- if worker.kind == "typing" %}
    - {{ worker.name }}
    {%- endif %}
    {%- endfor %}

start_pushers: false
pusher_instances:
  {%- for worker in all_workers %}
  {%- if worker.kind == "pusher" %}
  - {{ worker.name }}
  {%- endif %}
  {%- endfor %}

notify_appservices: False

federation_sender_instances:
  {%- for worker in all_workers %}
  {%- if worker.kind == "federation_sender" %}
  - {{ worker.name }}
  {%- endif %}
  {%- endfor %}

enable_media_repo: False
{#- FIX: the generator names workers "<kind><index>", so the single
    media_repository worker is "media_repository1", not "media1". #}
media_instance_running_background_jobs: "media_repository1"

update_user_directory: False

pid_file: "{{ logs_dir }}/main.pid"
log_config: '{{ worker_dir }}.logging/main.logging.yaml'

run_background_tasks_on: background_worker1

View File

@@ -0,0 +1,32 @@
# Per-worker Python logging config, rendered once for each worker.
version: 1

formatters:
  precise:
    format: '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s - %(message)s'

filters:
  context:
    (): synapse.util.logcontext.LoggingContextFilter
    request: ""

handlers:
  # NOTE: despite the name, this handler writes to a per-worker log file,
  # not to the console.
  console:
    class: logging.FileHandler
    formatter: precise
    filters: [context]
    encoding: 'UTF-8'
    filename: '{{ logs_dir }}/{{ worker.name }}.log'

loggers:
  synapse:
    level: DEBUG

  synapse.storage.SQL:
    # beware: increasing this to DEBUG will make synapse log sensitive
    # information such as access tokens.
    level: INFO

root:
  level: WARNING
  handlers: [console]

View File

@@ -0,0 +1,36 @@
{% set main_worker = workers_by_name.main %}
{% if worker.kind == "main" %}
worker_app: synapse.app.homeserver
{% else %}
worker_app: synapse.app.generic_worker
worker_name: {{ worker.name }}

# The replication listener on the main synapse process.
worker_replication_host: {{ main_worker.ip }}
worker_replication_http_port: 9090

worker_listeners:
  - type: http
    bind_address: {{ worker.ip }}
    port: 8080
    resources:
      - names:
          - client
          - federation
          {#- FIX: the generator produces kind "media_repository", never
              "media", so the old `worker.kind == "media"` check could not
              match and media workers did not serve the media resource. #}
          {%- if worker.kind == "media_repository" %}
          - media
          {%- endif %}
  - type: http
    bind_address: {{ worker.ip }}
    port: 9090
    resources:
      - names: [replication]

worker_log_config: '{{ worker_dir }}.logging/{{ worker.name }}.logging.yaml'
worker_pid_file: '{{ logs_dir }}/{{ worker.name }}.pid'
worker_main_http_uri: http://{{ main_worker.ip }}:8080
{% endif %}

View File

@@ -227,6 +227,8 @@ class ReplicationCommandHandler:
self._is_master = hs.config.worker.worker_app is None
self._is_user_ip_handler = self._is_master # TODO
self._federation_sender = None
if self._is_master and not hs.config.worker.send_federation:
self._federation_sender = hs.get_federation_sender()
@@ -403,7 +405,7 @@ class ReplicationCommandHandler:
) -> Optional[Awaitable[None]]:
user_ip_cache_counter.inc()
if self._is_master:
if self._is_user_ip_handler:
return self._handle_user_ip(cmd)
else:
return None

View File

@@ -54,6 +54,8 @@ class ServerNoticesSender(WorkerServerNoticesSender):
Args:
user_id: mxid
"""
# TODO this will need to run on the user IP handler, at least in a
# stubbed capacity!
# The synchrotrons use a stubbed version of ServerNoticesSender, so
# we check for notices to send to the user in on_user_ip as well as
# in on_user_syncing