Compare commits
1 Commits
rei/cwas_e
...
clokep/sta
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9622bda163 |
@@ -1 +0,0 @@
|
||||
Fix sending out of order `POSITION` over replication, causing additional database load.
|
||||
@@ -1 +0,0 @@
|
||||
More efficiently handle no-op `POSITION` over replication.
|
||||
@@ -1 +0,0 @@
|
||||
Add a Postgres `REPLICA IDENTITY` to tables that do not have an implicit one. This should allow use of Postgres logical replication.
|
||||
@@ -1 +0,0 @@
|
||||
Speed up persisting large number of outliers.
|
||||
@@ -1 +0,0 @@
|
||||
Refactor the `configure_workers_and_start.py` script used internally by Complement.
|
||||
@@ -6,7 +6,7 @@ command=/usr/local/bin/python -m synapse.app.complement_fork_starter
|
||||
--config-path="{{ main_config_path }}"
|
||||
--config-path=/conf/workers/shared.yaml
|
||||
{%- for worker in workers %}
|
||||
-- synapse.app.generic_worker
|
||||
-- {{ worker.app }}
|
||||
--config-path="{{ main_config_path }}"
|
||||
--config-path=/conf/workers/shared.yaml
|
||||
--config-path=/conf/workers/{{ worker.name }}.yaml
|
||||
@@ -36,7 +36,7 @@ exitcodes=0
|
||||
|
||||
{% for worker in workers %}
|
||||
[program:synapse_{{ worker.name }}]
|
||||
command=/usr/local/bin/prefix-log /usr/local/bin/python -m synapse.app.generic_worker
|
||||
command=/usr/local/bin/prefix-log /usr/local/bin/python -m {{ worker.app }}
|
||||
--config-path="{{ main_config_path }}"
|
||||
--config-path=/conf/workers/shared.yaml
|
||||
--config-path=/conf/workers/{{ worker.name }}.yaml
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
# Values will be change depending on whichever workers are selected when
|
||||
# running that image.
|
||||
|
||||
worker_app: "synapse.app.generic_worker"
|
||||
worker_app: "{{ app }}"
|
||||
worker_name: "{{ name }}"
|
||||
|
||||
worker_listeners:
|
||||
|
||||
@@ -47,21 +47,16 @@
|
||||
# in the project's README), this script may be run multiple times, and functionality should
|
||||
# continue to work if so.
|
||||
|
||||
import dataclasses
|
||||
import os
|
||||
import platform
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
from argparse import ArgumentParser
|
||||
from collections import defaultdict
|
||||
from copy import deepcopy
|
||||
from dataclasses import dataclass, field
|
||||
from itertools import chain
|
||||
from pathlib import Path
|
||||
from typing import (
|
||||
Any,
|
||||
Callable,
|
||||
Dict,
|
||||
List,
|
||||
Mapping,
|
||||
@@ -83,32 +78,9 @@ MAIN_PROCESS_REPLICATION_PORT = 9093
|
||||
MAIN_PROCESS_UNIX_SOCKET_PUBLIC_PATH = "/run/main_public.sock"
|
||||
MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH = "/run/main_private.sock"
|
||||
|
||||
# We place a file at this path to indicate that the script has already been
|
||||
# run and should not be run again.
|
||||
MARKER_FILE_PATH = "/conf/workers_have_been_configured"
|
||||
|
||||
|
||||
@dataclass
|
||||
class WorkerTemplate:
|
||||
"""
|
||||
A definition of individual settings for a specific worker type.
|
||||
A worker name can be fed into the template in order to generate a config.
|
||||
|
||||
These worker templates can be merged with `merge_worker_template_configs`
|
||||
in order for a single worker to be made from multiple templates.
|
||||
"""
|
||||
|
||||
listener_resources: Set[str] = field(default_factory=set)
|
||||
endpoint_patterns: Set[str] = field(default_factory=set)
|
||||
# (worker_name) -> {config}
|
||||
shared_extra_conf: Callable[[str], Dict[str, Any]] = lambda _worker_name: {}
|
||||
worker_extra_conf: str = ""
|
||||
|
||||
stream_writers: Set[str] = field(default_factory=set)
|
||||
|
||||
# True if and only if multiple of this worker type are allowed.
|
||||
sharding_allowed: bool = True
|
||||
|
||||
# A simple name used as a placeholder in the WORKERS_CONFIG below. This will be replaced
|
||||
# during processing with the name of the worker.
|
||||
WORKER_PLACEHOLDER_NAME = "placeholder_name"
|
||||
|
||||
# Workers with exposed endpoints needs either "client", "federation", or "media" listener_resources
|
||||
# Watching /_matrix/client needs a "client" listener
|
||||
@@ -116,60 +88,75 @@ class WorkerTemplate:
|
||||
# Watching /_matrix/media and related needs a "media" listener
|
||||
# Stream Writers require "client" and "replication" listeners because they
|
||||
# have to attach by instance_map to the master process and have client endpoints.
|
||||
WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
|
||||
"pusher": WorkerTemplate(
|
||||
shared_extra_conf=lambda worker_name: {
|
||||
"pusher_instances": [worker_name],
|
||||
}
|
||||
),
|
||||
"user_dir": WorkerTemplate(
|
||||
listener_resources={"client"},
|
||||
endpoint_patterns={
|
||||
WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
|
||||
"pusher": {
|
||||
"app": "synapse.app.generic_worker",
|
||||
"listener_resources": [],
|
||||
"endpoint_patterns": [],
|
||||
"shared_extra_conf": {},
|
||||
"worker_extra_conf": "",
|
||||
},
|
||||
"user_dir": {
|
||||
"app": "synapse.app.generic_worker",
|
||||
"listener_resources": ["client"],
|
||||
"endpoint_patterns": [
|
||||
"^/_matrix/client/(api/v1|r0|v3|unstable)/user_directory/search$"
|
||||
],
|
||||
"shared_extra_conf": {
|
||||
"update_user_directory_from_worker": WORKER_PLACEHOLDER_NAME
|
||||
},
|
||||
shared_extra_conf=lambda worker_name: {
|
||||
"update_user_directory_from_worker": worker_name
|
||||
},
|
||||
),
|
||||
"media_repository": WorkerTemplate(
|
||||
listener_resources={"media"},
|
||||
endpoint_patterns={
|
||||
"worker_extra_conf": "",
|
||||
},
|
||||
"media_repository": {
|
||||
"app": "synapse.app.generic_worker",
|
||||
"listener_resources": ["media"],
|
||||
"endpoint_patterns": [
|
||||
"^/_matrix/media/",
|
||||
"^/_synapse/admin/v1/purge_media_cache$",
|
||||
"^/_synapse/admin/v1/room/.*/media.*$",
|
||||
"^/_synapse/admin/v1/user/.*/media.*$",
|
||||
"^/_synapse/admin/v1/media/.*$",
|
||||
"^/_synapse/admin/v1/quarantine_media/.*$",
|
||||
},
|
||||
],
|
||||
# The first configured media worker will run the media background jobs
|
||||
shared_extra_conf=lambda worker_name: {
|
||||
"shared_extra_conf": {
|
||||
"enable_media_repo": False,
|
||||
"media_instance_running_background_jobs": worker_name,
|
||||
"media_instance_running_background_jobs": WORKER_PLACEHOLDER_NAME,
|
||||
},
|
||||
worker_extra_conf="enable_media_repo: true",
|
||||
),
|
||||
"appservice": WorkerTemplate(
|
||||
shared_extra_conf=lambda worker_name: {
|
||||
"notify_appservices_from_worker": worker_name
|
||||
"worker_extra_conf": "enable_media_repo: true",
|
||||
},
|
||||
"appservice": {
|
||||
"app": "synapse.app.generic_worker",
|
||||
"listener_resources": [],
|
||||
"endpoint_patterns": [],
|
||||
"shared_extra_conf": {
|
||||
"notify_appservices_from_worker": WORKER_PLACEHOLDER_NAME
|
||||
},
|
||||
),
|
||||
"federation_sender": WorkerTemplate(
|
||||
shared_extra_conf=lambda worker_name: {
|
||||
"federation_sender_instances": [worker_name],
|
||||
}
|
||||
),
|
||||
"synchrotron": WorkerTemplate(
|
||||
listener_resources={"client"},
|
||||
endpoint_patterns={
|
||||
"worker_extra_conf": "",
|
||||
},
|
||||
"federation_sender": {
|
||||
"app": "synapse.app.generic_worker",
|
||||
"listener_resources": [],
|
||||
"endpoint_patterns": [],
|
||||
"shared_extra_conf": {},
|
||||
"worker_extra_conf": "",
|
||||
},
|
||||
"synchrotron": {
|
||||
"app": "synapse.app.generic_worker",
|
||||
"listener_resources": ["client"],
|
||||
"endpoint_patterns": [
|
||||
"^/_matrix/client/(v2_alpha|r0|v3)/sync$",
|
||||
"^/_matrix/client/(api/v1|v2_alpha|r0|v3)/events$",
|
||||
"^/_matrix/client/(api/v1|r0|v3)/initialSync$",
|
||||
"^/_matrix/client/(api/v1|r0|v3)/rooms/[^/]+/initialSync$",
|
||||
},
|
||||
),
|
||||
"client_reader": WorkerTemplate(
|
||||
listener_resources={"client"},
|
||||
endpoint_patterns={
|
||||
],
|
||||
"shared_extra_conf": {},
|
||||
"worker_extra_conf": "",
|
||||
},
|
||||
"client_reader": {
|
||||
"app": "synapse.app.generic_worker",
|
||||
"listener_resources": ["client"],
|
||||
"endpoint_patterns": [
|
||||
"^/_matrix/client/(api/v1|r0|v3|unstable)/publicRooms$",
|
||||
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/joined_members$",
|
||||
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/context/.*$",
|
||||
@@ -197,11 +184,14 @@ WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
|
||||
"^/_matrix/client/(api/v1|r0|v3|unstable)/directory/room/.*$",
|
||||
"^/_matrix/client/(r0|v3|unstable)/capabilities$",
|
||||
"^/_matrix/client/(r0|v3|unstable)/notifications$",
|
||||
},
|
||||
),
|
||||
"federation_reader": WorkerTemplate(
|
||||
listener_resources={"federation"},
|
||||
endpoint_patterns={
|
||||
],
|
||||
"shared_extra_conf": {},
|
||||
"worker_extra_conf": "",
|
||||
},
|
||||
"federation_reader": {
|
||||
"app": "synapse.app.generic_worker",
|
||||
"listener_resources": ["federation"],
|
||||
"endpoint_patterns": [
|
||||
"^/_matrix/federation/(v1|v2)/event/",
|
||||
"^/_matrix/federation/(v1|v2)/state/",
|
||||
"^/_matrix/federation/(v1|v2)/state_ids/",
|
||||
@@ -221,73 +211,97 @@ WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
|
||||
"^/_matrix/federation/(v1|v2)/user/devices/",
|
||||
"^/_matrix/federation/(v1|v2)/get_groups_publicised$",
|
||||
"^/_matrix/key/v2/query",
|
||||
},
|
||||
),
|
||||
"federation_inbound": WorkerTemplate(
|
||||
listener_resources={"federation"},
|
||||
endpoint_patterns={"/_matrix/federation/(v1|v2)/send/"},
|
||||
),
|
||||
"event_persister": WorkerTemplate(
|
||||
listener_resources={"replication"},
|
||||
stream_writers={"events"},
|
||||
),
|
||||
"background_worker": WorkerTemplate(
|
||||
],
|
||||
"shared_extra_conf": {},
|
||||
"worker_extra_conf": "",
|
||||
},
|
||||
"federation_inbound": {
|
||||
"app": "synapse.app.generic_worker",
|
||||
"listener_resources": ["federation"],
|
||||
"endpoint_patterns": ["/_matrix/federation/(v1|v2)/send/"],
|
||||
"shared_extra_conf": {},
|
||||
"worker_extra_conf": "",
|
||||
},
|
||||
"event_persister": {
|
||||
"app": "synapse.app.generic_worker",
|
||||
"listener_resources": ["replication"],
|
||||
"endpoint_patterns": [],
|
||||
"shared_extra_conf": {},
|
||||
"worker_extra_conf": "",
|
||||
},
|
||||
"background_worker": {
|
||||
"app": "synapse.app.generic_worker",
|
||||
"listener_resources": [],
|
||||
"endpoint_patterns": [],
|
||||
# This worker cannot be sharded. Therefore, there should only ever be one
|
||||
# background worker. This is enforced for the safety of your database.
|
||||
shared_extra_conf=lambda worker_name: {"run_background_tasks_on": worker_name},
|
||||
sharding_allowed=False,
|
||||
),
|
||||
"event_creator": WorkerTemplate(
|
||||
listener_resources={"client"},
|
||||
endpoint_patterns={
|
||||
"shared_extra_conf": {"run_background_tasks_on": WORKER_PLACEHOLDER_NAME},
|
||||
"worker_extra_conf": "",
|
||||
},
|
||||
"event_creator": {
|
||||
"app": "synapse.app.generic_worker",
|
||||
"listener_resources": ["client"],
|
||||
"endpoint_patterns": [
|
||||
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/redact",
|
||||
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/send",
|
||||
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$",
|
||||
"^/_matrix/client/(api/v1|r0|v3|unstable)/join/",
|
||||
"^/_matrix/client/(api/v1|r0|v3|unstable)/knock/",
|
||||
"^/_matrix/client/(api/v1|r0|v3|unstable)/profile/",
|
||||
},
|
||||
),
|
||||
"frontend_proxy": WorkerTemplate(
|
||||
listener_resources={"client", "replication"},
|
||||
endpoint_patterns={"^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"},
|
||||
),
|
||||
"account_data": WorkerTemplate(
|
||||
listener_resources={"client", "replication"},
|
||||
endpoint_patterns={
|
||||
],
|
||||
"shared_extra_conf": {},
|
||||
"worker_extra_conf": "",
|
||||
},
|
||||
"frontend_proxy": {
|
||||
"app": "synapse.app.generic_worker",
|
||||
"listener_resources": ["client", "replication"],
|
||||
"endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"],
|
||||
"shared_extra_conf": {},
|
||||
"worker_extra_conf": "",
|
||||
},
|
||||
"account_data": {
|
||||
"app": "synapse.app.generic_worker",
|
||||
"listener_resources": ["client", "replication"],
|
||||
"endpoint_patterns": [
|
||||
"^/_matrix/client/(r0|v3|unstable)/.*/tags",
|
||||
"^/_matrix/client/(r0|v3|unstable)/.*/account_data",
|
||||
},
|
||||
stream_writers={"account_data"},
|
||||
sharding_allowed=False,
|
||||
),
|
||||
"presence": WorkerTemplate(
|
||||
listener_resources={"client", "replication"},
|
||||
endpoint_patterns={"^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"},
|
||||
stream_writers={"presence"},
|
||||
sharding_allowed=False,
|
||||
),
|
||||
"receipts": WorkerTemplate(
|
||||
listener_resources={"client", "replication"},
|
||||
endpoint_patterns={
|
||||
],
|
||||
"shared_extra_conf": {},
|
||||
"worker_extra_conf": "",
|
||||
},
|
||||
"presence": {
|
||||
"app": "synapse.app.generic_worker",
|
||||
"listener_resources": ["client", "replication"],
|
||||
"endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"],
|
||||
"shared_extra_conf": {},
|
||||
"worker_extra_conf": "",
|
||||
},
|
||||
"receipts": {
|
||||
"app": "synapse.app.generic_worker",
|
||||
"listener_resources": ["client", "replication"],
|
||||
"endpoint_patterns": [
|
||||
"^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt",
|
||||
"^/_matrix/client/(r0|v3|unstable)/rooms/.*/read_markers",
|
||||
},
|
||||
stream_writers={"receipts"},
|
||||
sharding_allowed=False,
|
||||
),
|
||||
"to_device": WorkerTemplate(
|
||||
listener_resources={"client", "replication"},
|
||||
endpoint_patterns={"^/_matrix/client/(r0|v3|unstable)/sendToDevice/"},
|
||||
stream_writers={"to_device"},
|
||||
sharding_allowed=False,
|
||||
),
|
||||
"typing": WorkerTemplate(
|
||||
listener_resources={"client", "replication"},
|
||||
endpoint_patterns={"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"},
|
||||
stream_writers={"typing"},
|
||||
sharding_allowed=False,
|
||||
),
|
||||
],
|
||||
"shared_extra_conf": {},
|
||||
"worker_extra_conf": "",
|
||||
},
|
||||
"to_device": {
|
||||
"app": "synapse.app.generic_worker",
|
||||
"listener_resources": ["client", "replication"],
|
||||
"endpoint_patterns": ["^/_matrix/client/(r0|v3|unstable)/sendToDevice/"],
|
||||
"shared_extra_conf": {},
|
||||
"worker_extra_conf": "",
|
||||
},
|
||||
"typing": {
|
||||
"app": "synapse.app.generic_worker",
|
||||
"listener_resources": ["client", "replication"],
|
||||
"endpoint_patterns": [
|
||||
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"
|
||||
],
|
||||
"shared_extra_conf": {},
|
||||
"worker_extra_conf": "",
|
||||
},
|
||||
}
|
||||
|
||||
# Templates for sections that may be inserted multiple times in config files
|
||||
@@ -322,45 +336,6 @@ def flush_buffers() -> None:
|
||||
sys.stderr.flush()
|
||||
|
||||
|
||||
def merged(a: Any, b: Any) -> Any:
|
||||
"""
|
||||
Merges `a` and `b` together, returning the result.
|
||||
|
||||
The merge is performed with the following rules:
|
||||
|
||||
- dicts: values with the same key will be merged recursively
|
||||
- lists: `new` will be appended to `dest`
|
||||
- primitives: they will be checked for equality and inequality will result
|
||||
in a ValueError
|
||||
|
||||
|
||||
It is an error for `a` and `b` to be of different types.
|
||||
"""
|
||||
if isinstance(a, dict) and isinstance(b, dict):
|
||||
result = {}
|
||||
for key in set(a.keys()) | set(b.keys()):
|
||||
if key in a and key in b:
|
||||
result[key] = merged(a[key], b[key])
|
||||
elif key in a:
|
||||
result[key] = deepcopy(a[key])
|
||||
else:
|
||||
result[key] = deepcopy(b[key])
|
||||
|
||||
return result
|
||||
elif isinstance(a, list) and isinstance(b, list):
|
||||
return deepcopy(a) + deepcopy(b)
|
||||
elif type(a) != type(b):
|
||||
raise TypeError(f"Cannot merge {type(a).__name__} and {type(b).__name__}")
|
||||
elif a != b:
|
||||
raise ValueError(f"Cannot merge primitive values: {a!r} != {b!r}")
|
||||
|
||||
if type(a) not in {str, int, float, bool, None.__class__}:
|
||||
raise TypeError(
|
||||
f"Cannot use `merged` on type {a} as it may not be safe (must either be an immutable primitive or must have special copy/merge logic)"
|
||||
)
|
||||
return a
|
||||
|
||||
|
||||
def convert(src: str, dst: str, **template_vars: object) -> None:
|
||||
"""Generate a file from a template
|
||||
|
||||
@@ -389,84 +364,138 @@ def convert(src: str, dst: str, **template_vars: object) -> None:
|
||||
outfile.write(rendered)
|
||||
|
||||
|
||||
def add_worker_to_instance_map(
|
||||
def add_worker_roles_to_shared_config(
|
||||
shared_config: dict,
|
||||
worker_types_set: Set[str],
|
||||
worker_name: str,
|
||||
worker_port: int,
|
||||
) -> None:
|
||||
"""
|
||||
Update the shared config map to add the worker in the instance_map.
|
||||
"""Given a dictionary representing a config file shared across all workers,
|
||||
append appropriate worker information to it for the current worker_type instance.
|
||||
|
||||
Args:
|
||||
shared_config: The config dict that all worker instances share (after being
|
||||
converted to YAML)
|
||||
worker_types_set: The type of worker (one of those defined in WORKERS_CONFIG).
|
||||
This list can be a single worker type or multiple.
|
||||
worker_name: The name of the worker instance.
|
||||
worker_port: The HTTP replication port that the worker instance is listening on.
|
||||
"""
|
||||
# The instance_map config field marks the workers that write to various replication
|
||||
# streams
|
||||
instance_map = shared_config.setdefault("instance_map", {})
|
||||
|
||||
if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False):
|
||||
instance_map[worker_name] = {
|
||||
"path": f"/run/worker.{worker_port}",
|
||||
}
|
||||
else:
|
||||
instance_map[worker_name] = {
|
||||
"host": "localhost",
|
||||
"port": worker_port,
|
||||
}
|
||||
# This is a list of the stream_writers that there can be only one of. Events can be
|
||||
# sharded, and therefore doesn't belong here.
|
||||
singular_stream_writers = [
|
||||
"account_data",
|
||||
"presence",
|
||||
"receipts",
|
||||
"to_device",
|
||||
"typing",
|
||||
]
|
||||
|
||||
# Worker-type specific sharding config. Now a single worker can fulfill multiple
|
||||
# roles, check each.
|
||||
if "pusher" in worker_types_set:
|
||||
shared_config.setdefault("pusher_instances", []).append(worker_name)
|
||||
|
||||
if "federation_sender" in worker_types_set:
|
||||
shared_config.setdefault("federation_sender_instances", []).append(worker_name)
|
||||
|
||||
if "event_persister" in worker_types_set:
|
||||
# Event persisters write to the events stream, so we need to update
|
||||
# the list of event stream writers
|
||||
shared_config.setdefault("stream_writers", {}).setdefault("events", []).append(
|
||||
worker_name
|
||||
)
|
||||
|
||||
# Map of stream writer instance names to host/ports combos
|
||||
if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False):
|
||||
instance_map[worker_name] = {
|
||||
"path": f"/run/worker.{worker_port}",
|
||||
}
|
||||
else:
|
||||
instance_map[worker_name] = {
|
||||
"host": "localhost",
|
||||
"port": worker_port,
|
||||
}
|
||||
# Update the list of stream writers. It's convenient that the name of the worker
|
||||
# type is the same as the stream to write. Iterate over the whole list in case there
|
||||
# is more than one.
|
||||
for worker in worker_types_set:
|
||||
if worker in singular_stream_writers:
|
||||
shared_config.setdefault("stream_writers", {}).setdefault(
|
||||
worker, []
|
||||
).append(worker_name)
|
||||
|
||||
# Map of stream writer instance names to host/ports combos
|
||||
# For now, all stream writers need http replication ports
|
||||
if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False):
|
||||
instance_map[worker_name] = {
|
||||
"path": f"/run/worker.{worker_port}",
|
||||
}
|
||||
else:
|
||||
instance_map[worker_name] = {
|
||||
"host": "localhost",
|
||||
"port": worker_port,
|
||||
}
|
||||
|
||||
|
||||
def merge_worker_template_configs(
|
||||
left: WorkerTemplate,
|
||||
right: WorkerTemplate,
|
||||
) -> WorkerTemplate:
|
||||
"""Merges two templates together, returning a new template that includes
|
||||
the listeners, endpoint patterns and configuration from both.
|
||||
|
||||
Does not mutate the input templates.
|
||||
"""
|
||||
|
||||
return WorkerTemplate(
|
||||
# include listener resources from both
|
||||
listener_resources=left.listener_resources | right.listener_resources,
|
||||
# include endpoint patterns from both
|
||||
endpoint_patterns=left.endpoint_patterns | right.endpoint_patterns,
|
||||
# merge shared config dictionaries; the worker name will be replaced later
|
||||
shared_extra_conf=lambda worker_name: merged(
|
||||
left.shared_extra_conf(worker_name),
|
||||
right.shared_extra_conf(worker_name),
|
||||
),
|
||||
# There is only one worker type that has a 'worker_extra_conf' and it is
|
||||
# the media_repo. Since duplicate worker types on the same worker don't
|
||||
# work, this is fine.
|
||||
worker_extra_conf=(left.worker_extra_conf + right.worker_extra_conf),
|
||||
# (This is unused, but in principle sharding this hybrid worker type
|
||||
# would be allowed if both constituent types are shardable)
|
||||
sharding_allowed=left.sharding_allowed and right.sharding_allowed,
|
||||
# include stream writers from both
|
||||
stream_writers=left.stream_writers | right.stream_writers,
|
||||
)
|
||||
|
||||
|
||||
def instantiate_worker_template(
|
||||
template: WorkerTemplate, worker_name: str
|
||||
existing_dict: Optional[Dict[str, Any]],
|
||||
to_be_merged_dict: Dict[str, Any],
|
||||
) -> Dict[str, Any]:
|
||||
"""Given a worker template, instantiate it into a worker configuration
|
||||
(which is currently represented as a dictionary).
|
||||
"""When given an existing dict of worker template configuration consisting with both
|
||||
dicts and lists, merge new template data from WORKERS_CONFIG(or create) and
|
||||
return new dict.
|
||||
|
||||
Args:
|
||||
template: The WorkerTemplate to template
|
||||
worker_name: The name of the worker to use.
|
||||
Returns: worker configuration dictionary
|
||||
existing_dict: Either an existing worker template or a fresh blank one.
|
||||
to_be_merged_dict: The template from WORKERS_CONFIGS to be merged into
|
||||
existing_dict.
|
||||
Returns: The newly merged together dict values.
|
||||
"""
|
||||
worker_config_dict = dataclasses.asdict(template)
|
||||
stream_writers_dict = {writer: worker_name for writer in template.stream_writers}
|
||||
worker_config_dict["shared_extra_conf"] = merged(
|
||||
template.shared_extra_conf(worker_name), stream_writers_dict
|
||||
)
|
||||
worker_config_dict["endpoint_patterns"] = sorted(template.endpoint_patterns)
|
||||
worker_config_dict["listener_resources"] = sorted(template.listener_resources)
|
||||
return worker_config_dict
|
||||
new_dict: Dict[str, Any] = {}
|
||||
if not existing_dict:
|
||||
# It doesn't exist yet, just use the new dict(but take a copy not a reference)
|
||||
new_dict = to_be_merged_dict.copy()
|
||||
else:
|
||||
for i in to_be_merged_dict.keys():
|
||||
if (i == "endpoint_patterns") or (i == "listener_resources"):
|
||||
# merge the two lists, remove duplicates
|
||||
new_dict[i] = list(set(existing_dict[i] + to_be_merged_dict[i]))
|
||||
elif i == "shared_extra_conf":
|
||||
# merge dictionary's, the worker name will be replaced later
|
||||
new_dict[i] = {**existing_dict[i], **to_be_merged_dict[i]}
|
||||
elif i == "worker_extra_conf":
|
||||
# There is only one worker type that has a 'worker_extra_conf' and it is
|
||||
# the media_repo. Since duplicate worker types on the same worker don't
|
||||
# work, this is fine.
|
||||
new_dict[i] = existing_dict[i] + to_be_merged_dict[i]
|
||||
else:
|
||||
# Everything else should be identical, like "app", which only works
|
||||
# because all apps are now generic_workers.
|
||||
new_dict[i] = to_be_merged_dict[i]
|
||||
return new_dict
|
||||
|
||||
|
||||
def insert_worker_name_for_worker_config(
|
||||
existing_dict: Dict[str, Any], worker_name: str
|
||||
) -> Dict[str, Any]:
|
||||
"""Insert a given worker name into the worker's configuration dict.
|
||||
|
||||
Args:
|
||||
existing_dict: The worker_config dict that is imported into shared_config.
|
||||
worker_name: The name of the worker to insert.
|
||||
Returns: Copy of the dict with newly inserted worker name
|
||||
"""
|
||||
dict_to_edit = existing_dict.copy()
|
||||
for k, v in dict_to_edit["shared_extra_conf"].items():
|
||||
# Only proceed if it's the placeholder name string
|
||||
if v == WORKER_PLACEHOLDER_NAME:
|
||||
dict_to_edit["shared_extra_conf"][k] = worker_name
|
||||
return dict_to_edit
|
||||
|
||||
|
||||
def apply_requested_multiplier_for_worker(worker_types: List[str]) -> List[str]:
|
||||
@@ -511,6 +540,23 @@ def apply_requested_multiplier_for_worker(worker_types: List[str]) -> List[str]:
|
||||
return new_worker_types
|
||||
|
||||
|
||||
def is_sharding_allowed_for_worker_type(worker_type: str) -> bool:
|
||||
"""Helper to check to make sure worker types that cannot have multiples do not.
|
||||
|
||||
Args:
|
||||
worker_type: The type of worker to check against.
|
||||
Returns: True if allowed, False if not
|
||||
"""
|
||||
return worker_type not in [
|
||||
"background_worker",
|
||||
"account_data",
|
||||
"presence",
|
||||
"receipts",
|
||||
"typing",
|
||||
"to_device",
|
||||
]
|
||||
|
||||
|
||||
def split_and_strip_string(
|
||||
given_string: str, split_char: str, max_split: SupportsIndex = -1
|
||||
) -> List[str]:
|
||||
@@ -636,7 +682,7 @@ def parse_worker_types(
|
||||
)
|
||||
|
||||
if worker_type in worker_type_shard_counter:
|
||||
if not WORKERS_CONFIG[worker_type].sharding_allowed:
|
||||
if not is_sharding_allowed_for_worker_type(worker_type):
|
||||
error(
|
||||
f"There can be only a single worker with {worker_type} "
|
||||
"type. Please recount and remove."
|
||||
@@ -765,35 +811,36 @@ def generate_worker_files(
|
||||
# Map locations to upstreams (corresponding to worker types) in Nginx
|
||||
# but only if we use the appropriate worker type
|
||||
for worker_type in all_worker_types_in_use:
|
||||
for endpoint_pattern in sorted(WORKERS_CONFIG[worker_type].endpoint_patterns):
|
||||
for endpoint_pattern in WORKERS_CONFIG[worker_type]["endpoint_patterns"]:
|
||||
nginx_locations[endpoint_pattern] = f"http://{worker_type}"
|
||||
|
||||
# For each worker type specified by the user, create config values and write it's
|
||||
# yaml config file
|
||||
for worker_name, worker_types_set in requested_worker_types.items():
|
||||
# The collected and processed data will live here.
|
||||
worker_template: WorkerTemplate = WorkerTemplate()
|
||||
worker_config: Dict[str, Any] = {}
|
||||
|
||||
# Merge all worker config templates for this worker into a single config
|
||||
for worker_type in worker_types_set:
|
||||
copy_of_template_config = WORKERS_CONFIG[worker_type].copy()
|
||||
|
||||
# Merge worker type template configuration data. It's a combination of lists
|
||||
# and dicts, so use this helper.
|
||||
worker_template = merge_worker_template_configs(
|
||||
worker_template, WORKERS_CONFIG[worker_type]
|
||||
worker_config = merge_worker_template_configs(
|
||||
worker_config, copy_of_template_config
|
||||
)
|
||||
|
||||
# Replace placeholder names in the config template with the actual worker name.
|
||||
worker_config: Dict[str, Any] = instantiate_worker_template(
|
||||
worker_template, worker_name
|
||||
)
|
||||
worker_config = insert_worker_name_for_worker_config(worker_config, worker_name)
|
||||
|
||||
worker_config.update(
|
||||
{"name": worker_name, "port": str(worker_port), "config_path": config_path}
|
||||
)
|
||||
|
||||
# Update the shared config with any options needed to enable this worker.
|
||||
shared_config = merged(shared_config, worker_config["shared_extra_conf"])
|
||||
|
||||
# Update the shared config with any worker_type specific options. The first of a
|
||||
# given worker_type needs to stay assigned and not be replaced.
|
||||
worker_config["shared_extra_conf"].update(shared_config)
|
||||
shared_config = worker_config["shared_extra_conf"]
|
||||
if using_unix_sockets:
|
||||
healthcheck_urls.append(
|
||||
f"--unix-socket /run/worker.{worker_port} http://localhost/health"
|
||||
@@ -801,10 +848,10 @@ def generate_worker_files(
|
||||
else:
|
||||
healthcheck_urls.append("http://localhost:%d/health" % (worker_port,))
|
||||
|
||||
# Add all workers to the `instance_map`
|
||||
# Technically only certain types of workers, such as stream writers, are needed
|
||||
# here but it is simpler just to be consistent.
|
||||
add_worker_to_instance_map(shared_config, worker_name, worker_port)
|
||||
# Update the shared config with sharding-related options if necessary
|
||||
add_worker_roles_to_shared_config(
|
||||
shared_config, worker_types_set, worker_name, worker_port
|
||||
)
|
||||
|
||||
# Enable the worker in supervisord
|
||||
worker_descriptors.append(worker_config)
|
||||
@@ -971,14 +1018,6 @@ def generate_worker_log_config(
|
||||
|
||||
|
||||
def main(args: List[str], environ: MutableMapping[str, str]) -> None:
|
||||
parser = ArgumentParser()
|
||||
parser.add_argument(
|
||||
"--generate-only",
|
||||
action="store_true",
|
||||
help="Only generate configuration; don't run Synapse.",
|
||||
)
|
||||
opts = parser.parse_args(args)
|
||||
|
||||
config_dir = environ.get("SYNAPSE_CONFIG_DIR", "/data")
|
||||
config_path = environ.get("SYNAPSE_CONFIG_PATH", config_dir + "/homeserver.yaml")
|
||||
data_dir = environ.get("SYNAPSE_DATA_DIR", "/data")
|
||||
@@ -995,8 +1034,8 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None:
|
||||
log("Base homeserver config exists—not regenerating")
|
||||
# This script may be run multiple times (mostly by Complement, see note at top of
|
||||
# file). Don't re-configure workers in this instance.
|
||||
|
||||
if not os.path.exists(MARKER_FILE_PATH):
|
||||
mark_filepath = "/conf/workers_have_been_configured"
|
||||
if not os.path.exists(mark_filepath):
|
||||
# Collect and validate worker_type requests
|
||||
# Read the desired worker configuration from the environment
|
||||
worker_types_env = environ.get("SYNAPSE_WORKER_TYPES", "").strip()
|
||||
@@ -1015,15 +1054,11 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None:
|
||||
generate_worker_files(environ, config_path, data_dir, requested_worker_types)
|
||||
|
||||
# Mark workers as being configured
|
||||
with open(MARKER_FILE_PATH, "w") as f:
|
||||
with open(mark_filepath, "w") as f:
|
||||
f.write("")
|
||||
else:
|
||||
log("Worker config exists—not regenerating")
|
||||
|
||||
if opts.generate_only:
|
||||
log("--generate-only: won't run Synapse")
|
||||
return
|
||||
|
||||
# Lifted right out of start.py
|
||||
jemallocpath = "/usr/lib/%s-linux-gnu/libjemalloc.so.2" % (platform.machine(),)
|
||||
|
||||
@@ -1046,4 +1081,4 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None:
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main(sys.argv[1:], os.environ)
|
||||
main(sys.argv, os.environ)
|
||||
|
||||
@@ -88,7 +88,7 @@ from synapse.types import (
|
||||
)
|
||||
from synapse.types.state import StateFilter
|
||||
from synapse.util.async_helpers import Linearizer, concurrently_execute
|
||||
from synapse.util.iterutils import batch_iter, partition, sorted_topologically_batched
|
||||
from synapse.util.iterutils import batch_iter, partition
|
||||
from synapse.util.retryutils import NotRetryingDestination
|
||||
from synapse.util.stringutils import shortstr
|
||||
|
||||
@@ -1669,13 +1669,14 @@ class FederationEventHandler:
|
||||
|
||||
# XXX: it might be possible to kick this process off in parallel with fetching
|
||||
# the events.
|
||||
while event_map:
|
||||
# build a list of events whose auth events are not in the queue.
|
||||
roots = tuple(
|
||||
ev
|
||||
for ev in event_map.values()
|
||||
if not any(aid in event_map for aid in ev.auth_event_ids())
|
||||
)
|
||||
|
||||
# We need to persist an event's auth events before the event.
|
||||
auth_graph = {
|
||||
ev: [event_map[e_id] for e_id in ev.auth_event_ids() if e_id in event_map]
|
||||
for ev in event_map.values()
|
||||
}
|
||||
for roots in sorted_topologically_batched(event_map.values(), auth_graph):
|
||||
if not roots:
|
||||
# if *none* of the remaining events are ready, that means
|
||||
# we have a loop. This either means a bug in our logic, or that
|
||||
@@ -1697,6 +1698,9 @@ class FederationEventHandler:
|
||||
|
||||
await self._auth_and_persist_outliers_inner(room_id, roots)
|
||||
|
||||
for ev in roots:
|
||||
del event_map[ev.event_id]
|
||||
|
||||
async def _auth_and_persist_outliers_inner(
|
||||
self, room_id: str, fetched_events: Collection[EventBase]
|
||||
) -> None:
|
||||
|
||||
@@ -257,11 +257,6 @@ class ReplicationCommandHandler:
|
||||
if hs.config.redis.redis_enabled:
|
||||
self._notifier.add_lock_released_callback(self.on_lock_released)
|
||||
|
||||
# Marks if we should send POSITION commands for all streams ASAP. This
|
||||
# is checked by the `ReplicationStreamer` which manages sending
|
||||
# RDATA/POSITION commands
|
||||
self._should_announce_positions = True
|
||||
|
||||
def subscribe_to_channel(self, channel_name: str) -> None:
|
||||
"""
|
||||
Indicates that we wish to subscribe to a Redis channel by name.
|
||||
@@ -402,23 +397,29 @@ class ReplicationCommandHandler:
|
||||
return self._streams_to_replicate
|
||||
|
||||
def on_REPLICATE(self, conn: IReplicationConnection, cmd: ReplicateCommand) -> None:
|
||||
self.send_positions_to_connection()
|
||||
self.send_positions_to_connection(conn)
|
||||
|
||||
def send_positions_to_connection(self) -> None:
|
||||
def send_positions_to_connection(self, conn: IReplicationConnection) -> None:
|
||||
"""Send current position of all streams this process is source of to
|
||||
the connection.
|
||||
"""
|
||||
|
||||
self._should_announce_positions = True
|
||||
self._notifier.notify_replication()
|
||||
|
||||
def should_announce_positions(self) -> bool:
|
||||
"""Check if we should send POSITION commands for all streams ASAP."""
|
||||
return self._should_announce_positions
|
||||
|
||||
def will_announce_positions(self) -> None:
|
||||
"""Mark that we're about to send POSITIONs out for all streams."""
|
||||
self._should_announce_positions = False
|
||||
# We respond with current position of all streams this instance
|
||||
# replicates.
|
||||
for stream in self.get_streams_to_replicate():
|
||||
# Note that we use the current token as the prev token here (rather
|
||||
# than stream.last_token), as we can't be sure that there have been
|
||||
# no rows written between last token and the current token (since we
|
||||
# might be racing with the replication sending bg process).
|
||||
current_token = stream.current_token(self._instance_name)
|
||||
self.send_command(
|
||||
PositionCommand(
|
||||
stream.NAME,
|
||||
self._instance_name,
|
||||
current_token,
|
||||
current_token,
|
||||
)
|
||||
)
|
||||
|
||||
def on_USER_SYNC(
|
||||
self, conn: IReplicationConnection, cmd: UserSyncCommand
|
||||
@@ -587,21 +588,6 @@ class ReplicationCommandHandler:
|
||||
|
||||
logger.debug("Handling '%s %s'", cmd.NAME, cmd.to_line())
|
||||
|
||||
# Check if we can early discard this position. We can only do so for
|
||||
# connected streams.
|
||||
stream = self._streams[cmd.stream_name]
|
||||
if stream.can_discard_position(
|
||||
cmd.instance_name, cmd.prev_token, cmd.new_token
|
||||
) and self.is_stream_connected(conn, cmd.stream_name):
|
||||
logger.debug(
|
||||
"Discarding redundant POSITION %s/%s %s %s",
|
||||
cmd.instance_name,
|
||||
cmd.stream_name,
|
||||
cmd.prev_token,
|
||||
cmd.new_token,
|
||||
)
|
||||
return
|
||||
|
||||
self._add_command_to_stream_queue(conn, cmd)
|
||||
|
||||
async def _process_position(
|
||||
@@ -613,18 +599,6 @@ class ReplicationCommandHandler:
|
||||
"""
|
||||
stream = self._streams[stream_name]
|
||||
|
||||
if stream.can_discard_position(
|
||||
cmd.instance_name, cmd.prev_token, cmd.new_token
|
||||
) and self.is_stream_connected(conn, cmd.stream_name):
|
||||
logger.debug(
|
||||
"Discarding redundant POSITION %s/%s %s %s",
|
||||
cmd.instance_name,
|
||||
cmd.stream_name,
|
||||
cmd.prev_token,
|
||||
cmd.new_token,
|
||||
)
|
||||
return
|
||||
|
||||
# We're about to go and catch up with the stream, so remove from set
|
||||
# of connected streams.
|
||||
for streams in self._streams_by_connection.values():
|
||||
@@ -652,9 +626,8 @@ class ReplicationCommandHandler:
|
||||
# for why this can happen.
|
||||
|
||||
logger.info(
|
||||
"Fetching replication rows for '%s' / %s between %i and %i",
|
||||
"Fetching replication rows for '%s' between %i and %i",
|
||||
stream_name,
|
||||
cmd.instance_name,
|
||||
current_token,
|
||||
cmd.new_token,
|
||||
)
|
||||
@@ -684,13 +657,6 @@ class ReplicationCommandHandler:
|
||||
|
||||
self._streams_by_connection.setdefault(conn, set()).add(stream_name)
|
||||
|
||||
def is_stream_connected(
|
||||
self, conn: IReplicationConnection, stream_name: str
|
||||
) -> bool:
|
||||
"""Return if stream has been successfully connected and is ready to
|
||||
receive updates"""
|
||||
return stream_name in self._streams_by_connection.get(conn, ())
|
||||
|
||||
def on_REMOTE_SERVER_UP(
|
||||
self, conn: IReplicationConnection, cmd: RemoteServerUpCommand
|
||||
) -> None:
|
||||
|
||||
@@ -141,7 +141,7 @@ class RedisSubscriber(SubscriberProtocol):
|
||||
# We send out our positions when there is a new connection in case the
|
||||
# other side missed updates. We do this for Redis connections as the
|
||||
# otherside won't know we've connected and so won't issue a REPLICATE.
|
||||
self.synapse_handler.send_positions_to_connection()
|
||||
self.synapse_handler.send_positions_to_connection(self)
|
||||
|
||||
def messageReceived(self, pattern: str, channel: str, message: str) -> None:
|
||||
"""Received a message from redis."""
|
||||
|
||||
@@ -123,7 +123,7 @@ class ReplicationStreamer:
|
||||
|
||||
# We check up front to see if anything has actually changed, as we get
|
||||
# poked because of changes that happened on other instances.
|
||||
if not self.command_handler.should_announce_positions() and all(
|
||||
if all(
|
||||
stream.last_token == stream.current_token(self._instance_name)
|
||||
for stream in self.streams
|
||||
):
|
||||
@@ -158,21 +158,6 @@ class ReplicationStreamer:
|
||||
all_streams = list(all_streams)
|
||||
random.shuffle(all_streams)
|
||||
|
||||
if self.command_handler.should_announce_positions():
|
||||
# We need to send out POSITIONs for all streams, usually
|
||||
# because a worker has reconnected.
|
||||
self.command_handler.will_announce_positions()
|
||||
|
||||
for stream in all_streams:
|
||||
self.command_handler.send_command(
|
||||
PositionCommand(
|
||||
stream.NAME,
|
||||
self._instance_name,
|
||||
stream.last_token,
|
||||
stream.last_token,
|
||||
)
|
||||
)
|
||||
|
||||
for stream in all_streams:
|
||||
if stream.last_token == stream.current_token(
|
||||
self._instance_name
|
||||
|
||||
@@ -144,16 +144,6 @@ class Stream:
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def can_discard_position(
|
||||
self, instance_name: str, prev_token: int, new_token: int
|
||||
) -> bool:
|
||||
"""Whether or not a position command for this stream can be discarded.
|
||||
|
||||
Useful for streams that can never go backwards and where we already know
|
||||
the stream ID for the instance has advanced.
|
||||
"""
|
||||
return False
|
||||
|
||||
def discard_updates_and_advance(self) -> None:
|
||||
"""Called when the stream should advance but the updates would be discarded,
|
||||
e.g. when there are no currently connected workers.
|
||||
@@ -231,14 +221,6 @@ class _StreamFromIdGen(Stream):
|
||||
def minimal_local_current_token(self) -> Token:
|
||||
return self._stream_id_gen.get_minimal_local_current_token()
|
||||
|
||||
def can_discard_position(
|
||||
self, instance_name: str, prev_token: int, new_token: int
|
||||
) -> bool:
|
||||
# These streams can't go backwards, so we know we can ignore any
|
||||
# positions where the tokens are from before the current token.
|
||||
|
||||
return new_token <= self.current_token(instance_name)
|
||||
|
||||
|
||||
def current_token_without_instance(
|
||||
current_token: Callable[[], int]
|
||||
|
||||
@@ -768,8 +768,9 @@ class BackgroundUpdater:
|
||||
|
||||
# override the global statement timeout to avoid accidentally squashing
|
||||
# a long-running index creation process
|
||||
timeout_sql = "SET SESSION statement_timeout = 0"
|
||||
c.execute(timeout_sql)
|
||||
self.db_pool.engine.attempt_to_set_statement_timeout(
|
||||
c, 0, for_transaction=True
|
||||
)
|
||||
|
||||
sql = (
|
||||
"CREATE %(unique)s INDEX CONCURRENTLY %(name)s"
|
||||
@@ -791,12 +792,6 @@ class BackgroundUpdater:
|
||||
logger.debug("[SQL] %s", sql)
|
||||
c.execute(sql)
|
||||
finally:
|
||||
# mypy ignore - `statement_timeout` is defined on PostgresEngine
|
||||
# reset the global timeout to the default
|
||||
default_timeout = self.db_pool.engine.statement_timeout # type: ignore[attr-defined]
|
||||
undo_timeout_sql = f"SET statement_timeout = {default_timeout}"
|
||||
conn.cursor().execute(undo_timeout_sql)
|
||||
|
||||
conn.engine.attempt_to_set_autocommit(conn.conn, False)
|
||||
|
||||
def create_index_sqlite(conn: "LoggingDatabaseConnection") -> None:
|
||||
|
||||
@@ -89,10 +89,11 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
|
||||
# furthermore, we might already have the table from a previous (failed)
|
||||
# purge attempt, so let's drop the table first.
|
||||
|
||||
if isinstance(self.database_engine, PostgresEngine):
|
||||
# Disable statement timeouts for this transaction; purging rooms can
|
||||
# take a while!
|
||||
txn.execute("SET LOCAL statement_timeout = 0")
|
||||
# Disable statement timeouts for this transaction; purging rooms can
|
||||
# take a while!
|
||||
self.database_engine.attempt_to_set_statement_timeout(
|
||||
txn, 0, for_transaction=True
|
||||
)
|
||||
|
||||
txn.execute("DROP TABLE IF EXISTS events_to_purge")
|
||||
|
||||
|
||||
@@ -36,6 +36,9 @@ CursorType = TypeVar("CursorType", bound=Cursor)
|
||||
|
||||
|
||||
class BaseDatabaseEngine(Generic[ConnectionType, CursorType], metaclass=abc.ABCMeta):
|
||||
# The default statement timeout to use for transactions.
|
||||
statement_timeout: Optional[int] = None
|
||||
|
||||
def __init__(self, module: DBAPI2Module, config: Mapping[str, Any]):
|
||||
self.module = module
|
||||
|
||||
@@ -132,6 +135,16 @@ class BaseDatabaseEngine(Generic[ConnectionType, CursorType], metaclass=abc.ABCM
|
||||
"""
|
||||
...
|
||||
|
||||
@abc.abstractmethod
|
||||
def attempt_to_set_statement_timeout(
|
||||
self, cursor: CursorType, statement_timeout: int, for_transaction: bool
|
||||
) -> None:
|
||||
"""Attempt to set the cursor's statement timeout.
|
||||
|
||||
Note this has no effect on SQLite3.
|
||||
"""
|
||||
...
|
||||
|
||||
@staticmethod
|
||||
@abc.abstractmethod
|
||||
def executescript(cursor: CursorType, script: str) -> None:
|
||||
|
||||
@@ -52,7 +52,7 @@ class PostgresEngine(
|
||||
# some degenerate query plan has been created and the client has probably
|
||||
# timed out/walked off anyway.
|
||||
# This is in milliseconds.
|
||||
self.statement_timeout: Optional[int] = database_config.get(
|
||||
self.statement_timeout = database_config.get(
|
||||
"statement_timeout", 60 * 60 * 1000
|
||||
)
|
||||
self._version: Optional[int] = None # unknown as yet
|
||||
@@ -169,7 +169,11 @@ class PostgresEngine(
|
||||
|
||||
# Abort really long-running statements and turn them into errors.
|
||||
if self.statement_timeout is not None:
|
||||
cursor.execute("SET statement_timeout TO ?", (self.statement_timeout,))
|
||||
self.attempt_to_set_statement_timeout(
|
||||
cast(psycopg2.extensions.cursor, cursor.txn),
|
||||
self.statement_timeout,
|
||||
for_transaction=False,
|
||||
)
|
||||
|
||||
cursor.close()
|
||||
db_conn.commit()
|
||||
@@ -233,6 +237,18 @@ class PostgresEngine(
|
||||
isolation_level = self.isolation_level_map[isolation_level]
|
||||
return conn.set_isolation_level(isolation_level)
|
||||
|
||||
def attempt_to_set_statement_timeout(
|
||||
self,
|
||||
cursor: psycopg2.extensions.cursor,
|
||||
statement_timeout: int,
|
||||
for_transaction: bool,
|
||||
) -> None:
|
||||
if for_transaction:
|
||||
sql = "SET LOCAL statement_timeout TO ?"
|
||||
else:
|
||||
sql = "SET statement_timeout TO ?"
|
||||
cursor.execute(sql, (statement_timeout,))
|
||||
|
||||
@staticmethod
|
||||
def executescript(cursor: psycopg2.extensions.cursor, script: str) -> None:
|
||||
"""Execute a chunk of SQL containing multiple semicolon-delimited statements.
|
||||
|
||||
@@ -143,6 +143,12 @@ class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection, sqlite3.Cursor]):
|
||||
# All transactions are SERIALIZABLE by default in sqlite
|
||||
pass
|
||||
|
||||
def attempt_to_set_statement_timeout(
|
||||
self, cursor: sqlite3.Cursor, statement_timeout: int, for_transaction: bool
|
||||
) -> None:
|
||||
# Not supported.
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def executescript(cursor: sqlite3.Cursor, script: str) -> None:
|
||||
"""Execute a chunk of SQL containing multiple semicolon-delimited statements.
|
||||
|
||||
@@ -1,30 +0,0 @@
|
||||
/* Copyright 2023 The Matrix.org Foundation C.I.C
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- Annotate some tables in Postgres with a REPLICA IDENTITY.
|
||||
-- Any table that doesn't have a primary key should be annotated explicitly with
|
||||
-- a REPLICA IDENTITY so that logical replication can be used.
|
||||
-- If this is not done, then UPDATE and DELETE statements on those tables
|
||||
-- will fail if logical replication is in use.
|
||||
|
||||
|
||||
-- Re-use unique indices already defined on tables as a replica identity.
|
||||
ALTER TABLE applied_module_schemas REPLICA IDENTITY USING INDEX applied_module_schemas_module_name_file_key;
|
||||
ALTER TABLE applied_schema_deltas REPLICA IDENTITY USING INDEX applied_schema_deltas_version_file_key;
|
||||
ALTER TABLE background_updates REPLICA IDENTITY USING INDEX background_updates_uniqueness;
|
||||
ALTER TABLE schema_compat_version REPLICA IDENTITY USING INDEX schema_compat_version_lock_key;
|
||||
ALTER TABLE schema_version REPLICA IDENTITY USING INDEX schema_version_lock_key;
|
||||
|
||||
|
||||
@@ -1,80 +0,0 @@
|
||||
/* Copyright 2023 The Matrix.org Foundation C.I.C
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- Annotate some tables in Postgres with a REPLICA IDENTITY.
|
||||
-- Any table that doesn't have a primary key should be annotated explicitly with
|
||||
-- a REPLICA IDENTITY so that logical replication can be used.
|
||||
-- If this is not done, then UPDATE and DELETE statements on those tables
|
||||
-- will fail if logical replication is in use.
|
||||
|
||||
|
||||
-- Where possible, re-use unique indices already defined on tables as a replica
|
||||
-- identity.
|
||||
ALTER TABLE account_data REPLICA IDENTITY USING INDEX account_data_uniqueness;
|
||||
ALTER TABLE application_services_txns REPLICA IDENTITY USING INDEX application_services_txns_as_id_txn_id_key;
|
||||
ALTER TABLE appservice_stream_position REPLICA IDENTITY USING INDEX appservice_stream_position_lock_key;
|
||||
ALTER TABLE current_state_events REPLICA IDENTITY USING INDEX current_state_events_event_id_key;
|
||||
ALTER TABLE device_lists_changes_converted_stream_position REPLICA IDENTITY USING INDEX device_lists_changes_converted_stream_position_lock_key;
|
||||
ALTER TABLE devices REPLICA IDENTITY USING INDEX device_uniqueness;
|
||||
ALTER TABLE e2e_device_keys_json REPLICA IDENTITY USING INDEX e2e_device_keys_json_uniqueness;
|
||||
ALTER TABLE e2e_fallback_keys_json REPLICA IDENTITY USING INDEX e2e_fallback_keys_json_uniqueness;
|
||||
ALTER TABLE e2e_one_time_keys_json REPLICA IDENTITY USING INDEX e2e_one_time_keys_json_uniqueness;
|
||||
ALTER TABLE event_backward_extremities REPLICA IDENTITY USING INDEX event_backward_extremities_event_id_room_id_key;
|
||||
ALTER TABLE event_edges REPLICA IDENTITY USING INDEX event_edges_event_id_prev_event_id_idx;
|
||||
ALTER TABLE event_forward_extremities REPLICA IDENTITY USING INDEX event_forward_extremities_event_id_room_id_key;
|
||||
ALTER TABLE event_json REPLICA IDENTITY USING INDEX event_json_event_id_key;
|
||||
ALTER TABLE event_push_summary_last_receipt_stream_id REPLICA IDENTITY USING INDEX event_push_summary_last_receipt_stream_id_lock_key;
|
||||
ALTER TABLE event_push_summary_stream_ordering REPLICA IDENTITY USING INDEX event_push_summary_stream_ordering_lock_key;
|
||||
ALTER TABLE events REPLICA IDENTITY USING INDEX events_event_id_key;
|
||||
ALTER TABLE event_to_state_groups REPLICA IDENTITY USING INDEX event_to_state_groups_event_id_key;
|
||||
ALTER TABLE event_txn_id_device_id REPLICA IDENTITY USING INDEX event_txn_id_device_id_event_id;
|
||||
ALTER TABLE event_txn_id REPLICA IDENTITY USING INDEX event_txn_id_event_id;
|
||||
ALTER TABLE local_current_membership REPLICA IDENTITY USING INDEX local_current_membership_idx;
|
||||
ALTER TABLE partial_state_events REPLICA IDENTITY USING INDEX partial_state_events_event_id_key;
|
||||
ALTER TABLE partial_state_rooms_servers REPLICA IDENTITY USING INDEX partial_state_rooms_servers_room_id_server_name_key;
|
||||
ALTER TABLE profiles REPLICA IDENTITY USING INDEX profiles_user_id_key;
|
||||
ALTER TABLE redactions REPLICA IDENTITY USING INDEX redactions_event_id_key;
|
||||
ALTER TABLE registration_tokens REPLICA IDENTITY USING INDEX registration_tokens_token_key;
|
||||
ALTER TABLE rejections REPLICA IDENTITY USING INDEX rejections_event_id_key;
|
||||
ALTER TABLE room_account_data REPLICA IDENTITY USING INDEX room_account_data_uniqueness;
|
||||
ALTER TABLE room_aliases REPLICA IDENTITY USING INDEX room_aliases_room_alias_key;
|
||||
ALTER TABLE room_depth REPLICA IDENTITY USING INDEX room_depth_room_id_key;
|
||||
ALTER TABLE room_forgetter_stream_pos REPLICA IDENTITY USING INDEX room_forgetter_stream_pos_lock_key;
|
||||
ALTER TABLE room_memberships REPLICA IDENTITY USING INDEX room_memberships_event_id_key;
|
||||
ALTER TABLE room_tags REPLICA IDENTITY USING INDEX room_tag_uniqueness;
|
||||
ALTER TABLE room_tags_revisions REPLICA IDENTITY USING INDEX room_tag_revisions_uniqueness;
|
||||
ALTER TABLE server_keys_json REPLICA IDENTITY USING INDEX server_keys_json_uniqueness;
|
||||
ALTER TABLE sessions REPLICA IDENTITY USING INDEX sessions_session_type_session_id_key;
|
||||
ALTER TABLE state_events REPLICA IDENTITY USING INDEX state_events_event_id_key;
|
||||
ALTER TABLE stats_incremental_position REPLICA IDENTITY USING INDEX stats_incremental_position_lock_key;
|
||||
ALTER TABLE threads REPLICA IDENTITY USING INDEX threads_uniqueness;
|
||||
ALTER TABLE ui_auth_sessions_credentials REPLICA IDENTITY USING INDEX ui_auth_sessions_credentials_session_id_stage_type_key;
|
||||
ALTER TABLE ui_auth_sessions_ips REPLICA IDENTITY USING INDEX ui_auth_sessions_ips_session_id_ip_user_agent_key;
|
||||
ALTER TABLE ui_auth_sessions REPLICA IDENTITY USING INDEX ui_auth_sessions_session_id_key;
|
||||
ALTER TABLE user_directory_stream_pos REPLICA IDENTITY USING INDEX user_directory_stream_pos_lock_key;
|
||||
ALTER TABLE user_external_ids REPLICA IDENTITY USING INDEX user_external_ids_auth_provider_external_id_key;
|
||||
ALTER TABLE user_threepids REPLICA IDENTITY USING INDEX medium_address;
|
||||
ALTER TABLE worker_read_write_locks_mode REPLICA IDENTITY USING INDEX worker_read_write_locks_mode_key;
|
||||
ALTER TABLE worker_read_write_locks REPLICA IDENTITY USING INDEX worker_read_write_locks_key;
|
||||
|
||||
-- special cases: unique indices on nullable columns can't be used
|
||||
ALTER TABLE event_push_actions REPLICA IDENTITY FULL;
|
||||
ALTER TABLE local_media_repository REPLICA IDENTITY FULL;
|
||||
ALTER TABLE receipts_graph REPLICA IDENTITY FULL;
|
||||
ALTER TABLE receipts_linearized REPLICA IDENTITY FULL;
|
||||
ALTER TABLE received_transactions REPLICA IDENTITY FULL;
|
||||
ALTER TABLE remote_media_cache REPLICA IDENTITY FULL;
|
||||
ALTER TABLE server_signature_keys REPLICA IDENTITY FULL;
|
||||
ALTER TABLE users REPLICA IDENTITY FULL;
|
||||
@@ -135,54 +135,3 @@ def sorted_topologically(
|
||||
degree_map[edge] -= 1
|
||||
if degree_map[edge] == 0:
|
||||
heapq.heappush(zero_degree, edge)
|
||||
|
||||
|
||||
def sorted_topologically_batched(
|
||||
nodes: Iterable[T],
|
||||
graph: Mapping[T, Collection[T]],
|
||||
) -> Generator[Collection[T], None, None]:
|
||||
r"""Walk the graph topologically, returning batches of nodes where all nodes
|
||||
that references it have been previously returned.
|
||||
|
||||
For example, given the following graph:
|
||||
|
||||
A
|
||||
/ \
|
||||
B C
|
||||
\ /
|
||||
D
|
||||
|
||||
This function will return: `[[A], [B, C], [D]]`.
|
||||
|
||||
This function is useful for e.g. batch persisting events in an auth chain,
|
||||
where we can only persist an event if all its auth events have already been
|
||||
persisted.
|
||||
"""
|
||||
|
||||
degree_map = {node: 0 for node in nodes}
|
||||
reverse_graph: Dict[T, Set[T]] = {}
|
||||
|
||||
for node, edges in graph.items():
|
||||
if node not in degree_map:
|
||||
continue
|
||||
|
||||
for edge in set(edges):
|
||||
if edge in degree_map:
|
||||
degree_map[node] += 1
|
||||
|
||||
reverse_graph.setdefault(edge, set()).add(node)
|
||||
reverse_graph.setdefault(node, set())
|
||||
|
||||
zero_degree = [node for node, degree in degree_map.items() if degree == 0]
|
||||
|
||||
while zero_degree:
|
||||
new_zero_degree = []
|
||||
for node in zero_degree:
|
||||
for edge in reverse_graph.get(node, []):
|
||||
if edge in degree_map:
|
||||
degree_map[edge] -= 1
|
||||
if degree_map[edge] == 0:
|
||||
new_zero_degree.append(edge)
|
||||
|
||||
yield zero_degree
|
||||
zero_degree = new_zero_degree
|
||||
|
||||
@@ -35,10 +35,6 @@ class TypingStreamTestCase(BaseStreamTestCase):
|
||||
typing = self.hs.get_typing_handler()
|
||||
assert isinstance(typing, TypingWriterHandler)
|
||||
|
||||
# Create a typing update before we reconnect so that there is a missing
|
||||
# update to fetch.
|
||||
typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=True)
|
||||
|
||||
self.reconnect()
|
||||
|
||||
typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=True)
|
||||
@@ -95,10 +91,6 @@ class TypingStreamTestCase(BaseStreamTestCase):
|
||||
typing = self.hs.get_typing_handler()
|
||||
assert isinstance(typing, TypingWriterHandler)
|
||||
|
||||
# Create a typing update before we reconnect so that there is a missing
|
||||
# update to fetch.
|
||||
typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=True)
|
||||
|
||||
self.reconnect()
|
||||
|
||||
typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=True)
|
||||
|
||||
@@ -313,10 +313,9 @@ class PostgresReplicaIdentityTestCase(unittest.HomeserverTestCase):
|
||||
AND table_schema not in ('pg_catalog', 'information_schema')
|
||||
AND NOT EXISTS (
|
||||
SELECT 1
|
||||
FROM information_schema.table_constraints tc
|
||||
WHERE tc.constraint_type = 'PRIMARY KEY'
|
||||
AND tc.table_schema = tbl.table_schema
|
||||
AND tc.table_name = tbl.table_name
|
||||
FROM information_schema.key_column_usage kcu
|
||||
WHERE kcu.table_name = tbl.table_name
|
||||
AND kcu.table_schema = tbl.table_schema
|
||||
)
|
||||
)
|
||||
SELECT pg_class.oid::regclass FROM tables_no_pkey INNER JOIN pg_class ON pg_class.oid::regclass = table_name::regclass
|
||||
|
||||
@@ -13,11 +13,7 @@
|
||||
# limitations under the License.
|
||||
from typing import Dict, Iterable, List, Sequence
|
||||
|
||||
from synapse.util.iterutils import (
|
||||
chunk_seq,
|
||||
sorted_topologically,
|
||||
sorted_topologically_batched,
|
||||
)
|
||||
from synapse.util.iterutils import chunk_seq, sorted_topologically
|
||||
|
||||
from tests.unittest import TestCase
|
||||
|
||||
@@ -111,73 +107,3 @@ class SortTopologically(TestCase):
|
||||
graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3, 2, 1]}
|
||||
|
||||
self.assertEqual(list(sorted_topologically([4, 3, 2, 1], graph)), [1, 2, 3, 4])
|
||||
|
||||
|
||||
class SortTopologicallyBatched(TestCase):
|
||||
"Test cases for `sorted_topologically_batched`"
|
||||
|
||||
def test_empty(self) -> None:
|
||||
"Test that an empty graph works correctly"
|
||||
|
||||
graph: Dict[int, List[int]] = {}
|
||||
self.assertEqual(list(sorted_topologically_batched([], graph)), [])
|
||||
|
||||
def test_handle_empty_graph(self) -> None:
|
||||
"Test that a graph where a node doesn't have an entry is treated as empty"
|
||||
|
||||
graph: Dict[int, List[int]] = {}
|
||||
|
||||
# For disconnected nodes the output is simply sorted.
|
||||
self.assertEqual(list(sorted_topologically_batched([1, 2], graph)), [[1, 2]])
|
||||
|
||||
def test_disconnected(self) -> None:
|
||||
"Test that a graph with no edges work"
|
||||
|
||||
graph: Dict[int, List[int]] = {1: [], 2: []}
|
||||
|
||||
# For disconnected nodes the output is simply sorted.
|
||||
self.assertEqual(list(sorted_topologically_batched([1, 2], graph)), [[1, 2]])
|
||||
|
||||
def test_linear(self) -> None:
|
||||
"Test that a simple `4 -> 3 -> 2 -> 1` graph works"
|
||||
|
||||
graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3]}
|
||||
|
||||
self.assertEqual(
|
||||
list(sorted_topologically_batched([4, 3, 2, 1], graph)),
|
||||
[[1], [2], [3], [4]],
|
||||
)
|
||||
|
||||
def test_subset(self) -> None:
|
||||
"Test that only sorting a subset of the graph works"
|
||||
graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3]}
|
||||
|
||||
self.assertEqual(list(sorted_topologically_batched([4, 3], graph)), [[3], [4]])
|
||||
|
||||
def test_fork(self) -> None:
|
||||
"Test that a forked graph works"
|
||||
graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [1], 4: [2, 3]}
|
||||
|
||||
# Valid orderings are `[1, 3, 2, 4]` or `[1, 2, 3, 4]`, but we should
|
||||
# always get the same one.
|
||||
self.assertEqual(
|
||||
list(sorted_topologically_batched([4, 3, 2, 1], graph)), [[1], [2, 3], [4]]
|
||||
)
|
||||
|
||||
def test_duplicates(self) -> None:
|
||||
"Test that a graph with duplicate edges work"
|
||||
graph: Dict[int, List[int]] = {1: [], 2: [1, 1], 3: [2, 2], 4: [3]}
|
||||
|
||||
self.assertEqual(
|
||||
list(sorted_topologically_batched([4, 3, 2, 1], graph)),
|
||||
[[1], [2], [3], [4]],
|
||||
)
|
||||
|
||||
def test_multiple_paths(self) -> None:
|
||||
"Test that a graph with multiple paths between two nodes work"
|
||||
graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3, 2, 1]}
|
||||
|
||||
self.assertEqual(
|
||||
list(sorted_topologically_batched([4, 3, 2, 1], graph)),
|
||||
[[1], [2], [3], [4]],
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user