1
0

Compare commits

..

1 Commits

Author SHA1 Message Date
Patrick Cloke
9622bda163 Abstract logic for setting the statement timeout. 2023-11-15 15:11:51 -05:00
24 changed files with 376 additions and 622 deletions

View File

@@ -1 +0,0 @@
Fix sending out of order `POSITION` over replication, causing additional database load.

View File

@@ -1 +0,0 @@
More efficiently handle no-op `POSITION` over replication.

View File

@@ -1 +0,0 @@
Add a Postgres `REPLICA IDENTITY` to tables that do not have an implicit one. This should allow use of Postgres logical replication.

View File

@@ -1 +0,0 @@
Speed up persisting large number of outliers.

View File

@@ -1 +0,0 @@
Refactor the `configure_workers_and_start.py` script used internally by Complement.

View File

@@ -6,7 +6,7 @@ command=/usr/local/bin/python -m synapse.app.complement_fork_starter
--config-path="{{ main_config_path }}"
--config-path=/conf/workers/shared.yaml
{%- for worker in workers %}
-- synapse.app.generic_worker
-- {{ worker.app }}
--config-path="{{ main_config_path }}"
--config-path=/conf/workers/shared.yaml
--config-path=/conf/workers/{{ worker.name }}.yaml
@@ -36,7 +36,7 @@ exitcodes=0
{% for worker in workers %}
[program:synapse_{{ worker.name }}]
command=/usr/local/bin/prefix-log /usr/local/bin/python -m synapse.app.generic_worker
command=/usr/local/bin/prefix-log /usr/local/bin/python -m {{ worker.app }}
--config-path="{{ main_config_path }}"
--config-path=/conf/workers/shared.yaml
--config-path=/conf/workers/{{ worker.name }}.yaml

View File

@@ -3,7 +3,7 @@
# Values will be changed depending on whichever workers are selected when
# running that image.
worker_app: "synapse.app.generic_worker"
worker_app: "{{ app }}"
worker_name: "{{ name }}"
worker_listeners:

View File

@@ -47,21 +47,16 @@
# in the project's README), this script may be run multiple times, and functionality should
# continue to work if so.
import dataclasses
import os
import platform
import re
import subprocess
import sys
from argparse import ArgumentParser
from collections import defaultdict
from copy import deepcopy
from dataclasses import dataclass, field
from itertools import chain
from pathlib import Path
from typing import (
Any,
Callable,
Dict,
List,
Mapping,
@@ -83,32 +78,9 @@ MAIN_PROCESS_REPLICATION_PORT = 9093
MAIN_PROCESS_UNIX_SOCKET_PUBLIC_PATH = "/run/main_public.sock"
MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH = "/run/main_private.sock"
# We place a file at this path to indicate that the script has already been
# run and should not be run again.
MARKER_FILE_PATH = "/conf/workers_have_been_configured"
@dataclass
class WorkerTemplate:
    """
    Settings describing one specific type of worker.

    A template is independent of any worker name: supplying a name to it is
    what produces a concrete config. Templates can be combined with
    `merge_worker_template_configs`, so that a single worker may be built
    from several templates.
    """

    listener_resources: Set[str] = field(default_factory=set)
    endpoint_patterns: Set[str] = field(default_factory=set)
    # Called with the worker name; returns a config dict. The default
    # returns an empty dict for every name.
    shared_extra_conf: Callable[[str], Dict[str, Any]] = lambda _name: {}
    worker_extra_conf: str = ""
    stream_writers: Set[str] = field(default_factory=set)
    # True if and only if more than one worker of this type is allowed.
    sharding_allowed: bool = True
# A simple name used as a placeholder in the WORKERS_CONFIG below. This will be replaced
# during processing with the name of the worker.
WORKER_PLACEHOLDER_NAME = "placeholder_name"
# Workers with exposed endpoints need either "client", "federation", or "media" listener_resources
# Watching /_matrix/client needs a "client" listener
@@ -116,60 +88,75 @@ class WorkerTemplate:
# Watching /_matrix/media and related needs a "media" listener
# Stream Writers require "client" and "replication" listeners because they
# have to attach by instance_map to the master process and have client endpoints.
WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
"pusher": WorkerTemplate(
shared_extra_conf=lambda worker_name: {
"pusher_instances": [worker_name],
}
),
"user_dir": WorkerTemplate(
listener_resources={"client"},
endpoint_patterns={
WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"pusher": {
"app": "synapse.app.generic_worker",
"listener_resources": [],
"endpoint_patterns": [],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"user_dir": {
"app": "synapse.app.generic_worker",
"listener_resources": ["client"],
"endpoint_patterns": [
"^/_matrix/client/(api/v1|r0|v3|unstable)/user_directory/search$"
],
"shared_extra_conf": {
"update_user_directory_from_worker": WORKER_PLACEHOLDER_NAME
},
shared_extra_conf=lambda worker_name: {
"update_user_directory_from_worker": worker_name
},
),
"media_repository": WorkerTemplate(
listener_resources={"media"},
endpoint_patterns={
"worker_extra_conf": "",
},
"media_repository": {
"app": "synapse.app.generic_worker",
"listener_resources": ["media"],
"endpoint_patterns": [
"^/_matrix/media/",
"^/_synapse/admin/v1/purge_media_cache$",
"^/_synapse/admin/v1/room/.*/media.*$",
"^/_synapse/admin/v1/user/.*/media.*$",
"^/_synapse/admin/v1/media/.*$",
"^/_synapse/admin/v1/quarantine_media/.*$",
},
],
# The first configured media worker will run the media background jobs
shared_extra_conf=lambda worker_name: {
"shared_extra_conf": {
"enable_media_repo": False,
"media_instance_running_background_jobs": worker_name,
"media_instance_running_background_jobs": WORKER_PLACEHOLDER_NAME,
},
worker_extra_conf="enable_media_repo: true",
),
"appservice": WorkerTemplate(
shared_extra_conf=lambda worker_name: {
"notify_appservices_from_worker": worker_name
"worker_extra_conf": "enable_media_repo: true",
},
"appservice": {
"app": "synapse.app.generic_worker",
"listener_resources": [],
"endpoint_patterns": [],
"shared_extra_conf": {
"notify_appservices_from_worker": WORKER_PLACEHOLDER_NAME
},
),
"federation_sender": WorkerTemplate(
shared_extra_conf=lambda worker_name: {
"federation_sender_instances": [worker_name],
}
),
"synchrotron": WorkerTemplate(
listener_resources={"client"},
endpoint_patterns={
"worker_extra_conf": "",
},
"federation_sender": {
"app": "synapse.app.generic_worker",
"listener_resources": [],
"endpoint_patterns": [],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"synchrotron": {
"app": "synapse.app.generic_worker",
"listener_resources": ["client"],
"endpoint_patterns": [
"^/_matrix/client/(v2_alpha|r0|v3)/sync$",
"^/_matrix/client/(api/v1|v2_alpha|r0|v3)/events$",
"^/_matrix/client/(api/v1|r0|v3)/initialSync$",
"^/_matrix/client/(api/v1|r0|v3)/rooms/[^/]+/initialSync$",
},
),
"client_reader": WorkerTemplate(
listener_resources={"client"},
endpoint_patterns={
],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"client_reader": {
"app": "synapse.app.generic_worker",
"listener_resources": ["client"],
"endpoint_patterns": [
"^/_matrix/client/(api/v1|r0|v3|unstable)/publicRooms$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/joined_members$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/context/.*$",
@@ -197,11 +184,14 @@ WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
"^/_matrix/client/(api/v1|r0|v3|unstable)/directory/room/.*$",
"^/_matrix/client/(r0|v3|unstable)/capabilities$",
"^/_matrix/client/(r0|v3|unstable)/notifications$",
},
),
"federation_reader": WorkerTemplate(
listener_resources={"federation"},
endpoint_patterns={
],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"federation_reader": {
"app": "synapse.app.generic_worker",
"listener_resources": ["federation"],
"endpoint_patterns": [
"^/_matrix/federation/(v1|v2)/event/",
"^/_matrix/federation/(v1|v2)/state/",
"^/_matrix/federation/(v1|v2)/state_ids/",
@@ -221,73 +211,97 @@ WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
"^/_matrix/federation/(v1|v2)/user/devices/",
"^/_matrix/federation/(v1|v2)/get_groups_publicised$",
"^/_matrix/key/v2/query",
},
),
"federation_inbound": WorkerTemplate(
listener_resources={"federation"},
endpoint_patterns={"/_matrix/federation/(v1|v2)/send/"},
),
"event_persister": WorkerTemplate(
listener_resources={"replication"},
stream_writers={"events"},
),
"background_worker": WorkerTemplate(
],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"federation_inbound": {
"app": "synapse.app.generic_worker",
"listener_resources": ["federation"],
"endpoint_patterns": ["/_matrix/federation/(v1|v2)/send/"],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"event_persister": {
"app": "synapse.app.generic_worker",
"listener_resources": ["replication"],
"endpoint_patterns": [],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"background_worker": {
"app": "synapse.app.generic_worker",
"listener_resources": [],
"endpoint_patterns": [],
# This worker cannot be sharded. Therefore, there should only ever be one
# background worker. This is enforced for the safety of your database.
shared_extra_conf=lambda worker_name: {"run_background_tasks_on": worker_name},
sharding_allowed=False,
),
"event_creator": WorkerTemplate(
listener_resources={"client"},
endpoint_patterns={
"shared_extra_conf": {"run_background_tasks_on": WORKER_PLACEHOLDER_NAME},
"worker_extra_conf": "",
},
"event_creator": {
"app": "synapse.app.generic_worker",
"listener_resources": ["client"],
"endpoint_patterns": [
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/redact",
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/send",
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/join/",
"^/_matrix/client/(api/v1|r0|v3|unstable)/knock/",
"^/_matrix/client/(api/v1|r0|v3|unstable)/profile/",
},
),
"frontend_proxy": WorkerTemplate(
listener_resources={"client", "replication"},
endpoint_patterns={"^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"},
),
"account_data": WorkerTemplate(
listener_resources={"client", "replication"},
endpoint_patterns={
],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"frontend_proxy": {
"app": "synapse.app.generic_worker",
"listener_resources": ["client", "replication"],
"endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"account_data": {
"app": "synapse.app.generic_worker",
"listener_resources": ["client", "replication"],
"endpoint_patterns": [
"^/_matrix/client/(r0|v3|unstable)/.*/tags",
"^/_matrix/client/(r0|v3|unstable)/.*/account_data",
},
stream_writers={"account_data"},
sharding_allowed=False,
),
"presence": WorkerTemplate(
listener_resources={"client", "replication"},
endpoint_patterns={"^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"},
stream_writers={"presence"},
sharding_allowed=False,
),
"receipts": WorkerTemplate(
listener_resources={"client", "replication"},
endpoint_patterns={
],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"presence": {
"app": "synapse.app.generic_worker",
"listener_resources": ["client", "replication"],
"endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"receipts": {
"app": "synapse.app.generic_worker",
"listener_resources": ["client", "replication"],
"endpoint_patterns": [
"^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt",
"^/_matrix/client/(r0|v3|unstable)/rooms/.*/read_markers",
},
stream_writers={"receipts"},
sharding_allowed=False,
),
"to_device": WorkerTemplate(
listener_resources={"client", "replication"},
endpoint_patterns={"^/_matrix/client/(r0|v3|unstable)/sendToDevice/"},
stream_writers={"to_device"},
sharding_allowed=False,
),
"typing": WorkerTemplate(
listener_resources={"client", "replication"},
endpoint_patterns={"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"},
stream_writers={"typing"},
sharding_allowed=False,
),
],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"to_device": {
"app": "synapse.app.generic_worker",
"listener_resources": ["client", "replication"],
"endpoint_patterns": ["^/_matrix/client/(r0|v3|unstable)/sendToDevice/"],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"typing": {
"app": "synapse.app.generic_worker",
"listener_resources": ["client", "replication"],
"endpoint_patterns": [
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"
],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
}
# Templates for sections that may be inserted multiple times in config files
@@ -322,45 +336,6 @@ def flush_buffers() -> None:
sys.stderr.flush()
def merged(a: Any, b: Any) -> Any:
    """
    Merges `a` and `b` together, returning the result.

    Neither `a` nor `b` is mutated: containers in the result are deep copies.

    The merge is performed with the following rules:

    - dicts: values with the same key will be merged recursively. The result
      lists the keys of `a` first (in `a`'s order), followed by the keys that
      only appear in `b` (in `b`'s order), so the output is deterministic.
    - lists: the items of `b` will be appended after the items of `a`
    - primitives: they will be checked for equality and inequality will result
      in a ValueError

    It is an error for `a` and `b` to be of different types.

    Args:
        a: The first value to merge.
        b: The second value to merge.

    Returns: The merged value.

    Raises:
        TypeError: if `a` and `b` have different types, or are equal values of
            a type other than str, int, float, bool or None.
        ValueError: if `a` and `b` are unequal values of the same type.
    """
    if isinstance(a, dict) and isinstance(b, dict):
        result = {}
        # Walk `a` and then `b` (rather than a set union of the keys) so that
        # the key order of the result does not depend on hash ordering.
        for key, a_value in a.items():
            if key in b:
                result[key] = merged(a_value, b[key])
            else:
                result[key] = deepcopy(a_value)
        for key, b_value in b.items():
            if key not in a:
                result[key] = deepcopy(b_value)
        return result
    elif isinstance(a, list) and isinstance(b, list):
        return deepcopy(a) + deepcopy(b)
    elif type(a) is not type(b):
        # Exact type comparison on purpose: e.g. `1` and `True` do not merge.
        raise TypeError(f"Cannot merge {type(a).__name__} and {type(b).__name__}")
    elif a != b:
        raise ValueError(f"Cannot merge primitive values: {a!r} != {b!r}")

    # `a` and `b` are equal and of the same type; only hand `a` back if it is
    # of a type that is safe to share without copying.
    if type(a) not in {str, int, float, bool, type(None)}:
        raise TypeError(
            f"Cannot use `merged` on type {a} as it may not be safe (must either be an immutable primitive or must have special copy/merge logic)"
        )

    return a
def convert(src: str, dst: str, **template_vars: object) -> None:
"""Generate a file from a template
@@ -389,84 +364,138 @@ def convert(src: str, dst: str, **template_vars: object) -> None:
outfile.write(rendered)
def add_worker_to_instance_map(
def add_worker_roles_to_shared_config(
shared_config: dict,
worker_types_set: Set[str],
worker_name: str,
worker_port: int,
) -> None:
"""
Update the shared config map to add the worker in the instance_map.
"""Given a dictionary representing a config file shared across all workers,
append appropriate worker information to it for the current worker_type instance.
Args:
shared_config: The config dict that all worker instances share (after being
converted to YAML)
worker_types_set: The type of worker (one of those defined in WORKERS_CONFIG).
This list can be a single worker type or multiple.
worker_name: The name of the worker instance.
worker_port: The HTTP replication port that the worker instance is listening on.
"""
# The instance_map config field marks the workers that write to various replication
# streams
instance_map = shared_config.setdefault("instance_map", {})
if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False):
instance_map[worker_name] = {
"path": f"/run/worker.{worker_port}",
}
else:
instance_map[worker_name] = {
"host": "localhost",
"port": worker_port,
}
# This is a list of the stream_writers that there can be only one of. Events can be
# sharded, and therefore doesn't belong here.
singular_stream_writers = [
"account_data",
"presence",
"receipts",
"to_device",
"typing",
]
# Worker-type specific sharding config. Now a single worker can fulfill multiple
# roles, check each.
if "pusher" in worker_types_set:
shared_config.setdefault("pusher_instances", []).append(worker_name)
if "federation_sender" in worker_types_set:
shared_config.setdefault("federation_sender_instances", []).append(worker_name)
if "event_persister" in worker_types_set:
# Event persisters write to the events stream, so we need to update
# the list of event stream writers
shared_config.setdefault("stream_writers", {}).setdefault("events", []).append(
worker_name
)
# Map of stream writer instance names to host/ports combos
if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False):
instance_map[worker_name] = {
"path": f"/run/worker.{worker_port}",
}
else:
instance_map[worker_name] = {
"host": "localhost",
"port": worker_port,
}
# Update the list of stream writers. It's convenient that the name of the worker
# type is the same as the stream to write. Iterate over the whole list in case there
# is more than one.
for worker in worker_types_set:
if worker in singular_stream_writers:
shared_config.setdefault("stream_writers", {}).setdefault(
worker, []
).append(worker_name)
# Map of stream writer instance names to host/ports combos
# For now, all stream writers need http replication ports
if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False):
instance_map[worker_name] = {
"path": f"/run/worker.{worker_port}",
}
else:
instance_map[worker_name] = {
"host": "localhost",
"port": worker_port,
}
def merge_worker_template_configs(
left: WorkerTemplate,
right: WorkerTemplate,
) -> WorkerTemplate:
"""Merges two templates together, returning a new template that includes
the listeners, endpoint patterns and configuration from both.
Does not mutate the input templates.
"""
return WorkerTemplate(
# include listener resources from both
listener_resources=left.listener_resources | right.listener_resources,
# include endpoint patterns from both
endpoint_patterns=left.endpoint_patterns | right.endpoint_patterns,
# merge shared config dictionaries; the worker name will be replaced later
shared_extra_conf=lambda worker_name: merged(
left.shared_extra_conf(worker_name),
right.shared_extra_conf(worker_name),
),
# There is only one worker type that has a 'worker_extra_conf' and it is
# the media_repo. Since duplicate worker types on the same worker don't
# work, this is fine.
worker_extra_conf=(left.worker_extra_conf + right.worker_extra_conf),
# (This is unused, but in principle sharding this hybrid worker type
# would be allowed if both constituent types are shardable)
sharding_allowed=left.sharding_allowed and right.sharding_allowed,
# include stream writers from both
stream_writers=left.stream_writers | right.stream_writers,
)
def instantiate_worker_template(
template: WorkerTemplate, worker_name: str
existing_dict: Optional[Dict[str, Any]],
to_be_merged_dict: Dict[str, Any],
) -> Dict[str, Any]:
"""Given a worker template, instantiate it into a worker configuration
(which is currently represented as a dictionary).
"""When given an existing dict of worker template configuration consisting with both
dicts and lists, merge new template data from WORKERS_CONFIG(or create) and
return new dict.
Args:
template: The WorkerTemplate to template
worker_name: The name of the worker to use.
Returns: worker configuration dictionary
existing_dict: Either an existing worker template or a fresh blank one.
to_be_merged_dict: The template from WORKERS_CONFIGS to be merged into
existing_dict.
Returns: The newly merged together dict values.
"""
worker_config_dict = dataclasses.asdict(template)
stream_writers_dict = {writer: worker_name for writer in template.stream_writers}
worker_config_dict["shared_extra_conf"] = merged(
template.shared_extra_conf(worker_name), stream_writers_dict
)
worker_config_dict["endpoint_patterns"] = sorted(template.endpoint_patterns)
worker_config_dict["listener_resources"] = sorted(template.listener_resources)
return worker_config_dict
new_dict: Dict[str, Any] = {}
if not existing_dict:
# It doesn't exist yet, just use the new dict(but take a copy not a reference)
new_dict = to_be_merged_dict.copy()
else:
for i in to_be_merged_dict.keys():
if (i == "endpoint_patterns") or (i == "listener_resources"):
# merge the two lists, remove duplicates
new_dict[i] = list(set(existing_dict[i] + to_be_merged_dict[i]))
elif i == "shared_extra_conf":
# merge dictionary's, the worker name will be replaced later
new_dict[i] = {**existing_dict[i], **to_be_merged_dict[i]}
elif i == "worker_extra_conf":
# There is only one worker type that has a 'worker_extra_conf' and it is
# the media_repo. Since duplicate worker types on the same worker don't
# work, this is fine.
new_dict[i] = existing_dict[i] + to_be_merged_dict[i]
else:
# Everything else should be identical, like "app", which only works
# because all apps are now generic_workers.
new_dict[i] = to_be_merged_dict[i]
return new_dict
def insert_worker_name_for_worker_config(
    existing_dict: Dict[str, Any], worker_name: str
) -> Dict[str, Any]:
    """Insert a given worker name into the worker's configuration dict.

    Every value in the "shared_extra_conf" entry that equals
    WORKER_PLACEHOLDER_NAME is replaced by `worker_name`. The input dict and
    its "shared_extra_conf" dict are left unmodified.

    Args:
        existing_dict: The worker_config dict that is imported into shared_config.
        worker_name: The name of the worker to insert.

    Returns: Copy of the dict with newly inserted worker name

    Raises:
        KeyError: if `existing_dict` has no "shared_extra_conf" entry.
    """
    dict_to_edit = existing_dict.copy()

    # `dict.copy()` is shallow, so the nested "shared_extra_conf" dict is still
    # the caller's object. Build a fresh dict for it instead of editing it in
    # place, otherwise the substitution would leak back into the input.
    dict_to_edit["shared_extra_conf"] = {
        key: worker_name if value == WORKER_PLACEHOLDER_NAME else value
        for key, value in existing_dict["shared_extra_conf"].items()
    }
    return dict_to_edit
def apply_requested_multiplier_for_worker(worker_types: List[str]) -> List[str]:
@@ -511,6 +540,23 @@ def apply_requested_multiplier_for_worker(worker_types: List[str]) -> List[str]:
return new_worker_types
def is_sharding_allowed_for_worker_type(worker_type: str) -> bool:
    """Report whether more than one worker of the given type may be configured.

    Args:
        worker_type: The worker type name to look up.

    Returns: False if the type is one of those listed below, True otherwise
    """
    unshardable_worker_types = (
        "background_worker",
        "account_data",
        "presence",
        "receipts",
        "typing",
        "to_device",
    )
    if worker_type in unshardable_worker_types:
        return False
    return True
def split_and_strip_string(
given_string: str, split_char: str, max_split: SupportsIndex = -1
) -> List[str]:
@@ -636,7 +682,7 @@ def parse_worker_types(
)
if worker_type in worker_type_shard_counter:
if not WORKERS_CONFIG[worker_type].sharding_allowed:
if not is_sharding_allowed_for_worker_type(worker_type):
error(
f"There can be only a single worker with {worker_type} "
"type. Please recount and remove."
@@ -765,35 +811,36 @@ def generate_worker_files(
# Map locations to upstreams (corresponding to worker types) in Nginx
# but only if we use the appropriate worker type
for worker_type in all_worker_types_in_use:
for endpoint_pattern in sorted(WORKERS_CONFIG[worker_type].endpoint_patterns):
for endpoint_pattern in WORKERS_CONFIG[worker_type]["endpoint_patterns"]:
nginx_locations[endpoint_pattern] = f"http://{worker_type}"
# For each worker type specified by the user, create config values and write its
# yaml config file
for worker_name, worker_types_set in requested_worker_types.items():
# The collected and processed data will live here.
worker_template: WorkerTemplate = WorkerTemplate()
worker_config: Dict[str, Any] = {}
# Merge all worker config templates for this worker into a single config
for worker_type in worker_types_set:
copy_of_template_config = WORKERS_CONFIG[worker_type].copy()
# Merge worker type template configuration data. It's a combination of lists
# and dicts, so use this helper.
worker_template = merge_worker_template_configs(
worker_template, WORKERS_CONFIG[worker_type]
worker_config = merge_worker_template_configs(
worker_config, copy_of_template_config
)
# Replace placeholder names in the config template with the actual worker name.
worker_config: Dict[str, Any] = instantiate_worker_template(
worker_template, worker_name
)
worker_config = insert_worker_name_for_worker_config(worker_config, worker_name)
worker_config.update(
{"name": worker_name, "port": str(worker_port), "config_path": config_path}
)
# Update the shared config with any options needed to enable this worker.
shared_config = merged(shared_config, worker_config["shared_extra_conf"])
# Update the shared config with any worker_type specific options. The first of a
# given worker_type needs to stay assigned and not be replaced.
worker_config["shared_extra_conf"].update(shared_config)
shared_config = worker_config["shared_extra_conf"]
if using_unix_sockets:
healthcheck_urls.append(
f"--unix-socket /run/worker.{worker_port} http://localhost/health"
@@ -801,10 +848,10 @@ def generate_worker_files(
else:
healthcheck_urls.append("http://localhost:%d/health" % (worker_port,))
# Add all workers to the `instance_map`
# Technically only certain types of workers, such as stream writers, are needed
# here but it is simpler just to be consistent.
add_worker_to_instance_map(shared_config, worker_name, worker_port)
# Update the shared config with sharding-related options if necessary
add_worker_roles_to_shared_config(
shared_config, worker_types_set, worker_name, worker_port
)
# Enable the worker in supervisord
worker_descriptors.append(worker_config)
@@ -971,14 +1018,6 @@ def generate_worker_log_config(
def main(args: List[str], environ: MutableMapping[str, str]) -> None:
parser = ArgumentParser()
parser.add_argument(
"--generate-only",
action="store_true",
help="Only generate configuration; don't run Synapse.",
)
opts = parser.parse_args(args)
config_dir = environ.get("SYNAPSE_CONFIG_DIR", "/data")
config_path = environ.get("SYNAPSE_CONFIG_PATH", config_dir + "/homeserver.yaml")
data_dir = environ.get("SYNAPSE_DATA_DIR", "/data")
@@ -995,8 +1034,8 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None:
log("Base homeserver config exists—not regenerating")
# This script may be run multiple times (mostly by Complement, see note at top of
# file). Don't re-configure workers in this instance.
if not os.path.exists(MARKER_FILE_PATH):
mark_filepath = "/conf/workers_have_been_configured"
if not os.path.exists(mark_filepath):
# Collect and validate worker_type requests
# Read the desired worker configuration from the environment
worker_types_env = environ.get("SYNAPSE_WORKER_TYPES", "").strip()
@@ -1015,15 +1054,11 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None:
generate_worker_files(environ, config_path, data_dir, requested_worker_types)
# Mark workers as being configured
with open(MARKER_FILE_PATH, "w") as f:
with open(mark_filepath, "w") as f:
f.write("")
else:
log("Worker config exists—not regenerating")
if opts.generate_only:
log("--generate-only: won't run Synapse")
return
# Lifted right out of start.py
jemallocpath = "/usr/lib/%s-linux-gnu/libjemalloc.so.2" % (platform.machine(),)
@@ -1046,4 +1081,4 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None:
if __name__ == "__main__":
main(sys.argv[1:], os.environ)
main(sys.argv, os.environ)

View File

@@ -88,7 +88,7 @@ from synapse.types import (
)
from synapse.types.state import StateFilter
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.iterutils import batch_iter, partition, sorted_topologically_batched
from synapse.util.iterutils import batch_iter, partition
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import shortstr
@@ -1669,13 +1669,14 @@ class FederationEventHandler:
# XXX: it might be possible to kick this process off in parallel with fetching
# the events.
while event_map:
# build a list of events whose auth events are not in the queue.
roots = tuple(
ev
for ev in event_map.values()
if not any(aid in event_map for aid in ev.auth_event_ids())
)
# We need to persist an event's auth events before the event.
auth_graph = {
ev: [event_map[e_id] for e_id in ev.auth_event_ids() if e_id in event_map]
for ev in event_map.values()
}
for roots in sorted_topologically_batched(event_map.values(), auth_graph):
if not roots:
# if *none* of the remaining events are ready, that means
# we have a loop. This either means a bug in our logic, or that
@@ -1697,6 +1698,9 @@ class FederationEventHandler:
await self._auth_and_persist_outliers_inner(room_id, roots)
for ev in roots:
del event_map[ev.event_id]
async def _auth_and_persist_outliers_inner(
self, room_id: str, fetched_events: Collection[EventBase]
) -> None:

View File

@@ -257,11 +257,6 @@ class ReplicationCommandHandler:
if hs.config.redis.redis_enabled:
self._notifier.add_lock_released_callback(self.on_lock_released)
# Marks if we should send POSITION commands for all streams ASAP. This
# is checked by the `ReplicationStreamer` which manages sending
# RDATA/POSITION commands
self._should_announce_positions = True
def subscribe_to_channel(self, channel_name: str) -> None:
"""
Indicates that we wish to subscribe to a Redis channel by name.
@@ -402,23 +397,29 @@ class ReplicationCommandHandler:
return self._streams_to_replicate
def on_REPLICATE(self, conn: IReplicationConnection, cmd: ReplicateCommand) -> None:
self.send_positions_to_connection()
self.send_positions_to_connection(conn)
def send_positions_to_connection(self) -> None:
def send_positions_to_connection(self, conn: IReplicationConnection) -> None:
"""Send current position of all streams this process is source of to
the connection.
"""
self._should_announce_positions = True
self._notifier.notify_replication()
def should_announce_positions(self) -> bool:
"""Check if we should send POSITION commands for all streams ASAP."""
return self._should_announce_positions
def will_announce_positions(self) -> None:
"""Mark that we're about to send POSITIONs out for all streams."""
self._should_announce_positions = False
# We respond with current position of all streams this instance
# replicates.
for stream in self.get_streams_to_replicate():
# Note that we use the current token as the prev token here (rather
# than stream.last_token), as we can't be sure that there have been
# no rows written between last token and the current token (since we
# might be racing with the replication sending bg process).
current_token = stream.current_token(self._instance_name)
self.send_command(
PositionCommand(
stream.NAME,
self._instance_name,
current_token,
current_token,
)
)
def on_USER_SYNC(
self, conn: IReplicationConnection, cmd: UserSyncCommand
@@ -587,21 +588,6 @@ class ReplicationCommandHandler:
logger.debug("Handling '%s %s'", cmd.NAME, cmd.to_line())
# Check if we can early discard this position. We can only do so for
# connected streams.
stream = self._streams[cmd.stream_name]
if stream.can_discard_position(
cmd.instance_name, cmd.prev_token, cmd.new_token
) and self.is_stream_connected(conn, cmd.stream_name):
logger.debug(
"Discarding redundant POSITION %s/%s %s %s",
cmd.instance_name,
cmd.stream_name,
cmd.prev_token,
cmd.new_token,
)
return
self._add_command_to_stream_queue(conn, cmd)
async def _process_position(
@@ -613,18 +599,6 @@ class ReplicationCommandHandler:
"""
stream = self._streams[stream_name]
if stream.can_discard_position(
cmd.instance_name, cmd.prev_token, cmd.new_token
) and self.is_stream_connected(conn, cmd.stream_name):
logger.debug(
"Discarding redundant POSITION %s/%s %s %s",
cmd.instance_name,
cmd.stream_name,
cmd.prev_token,
cmd.new_token,
)
return
# We're about to go and catch up with the stream, so remove from set
# of connected streams.
for streams in self._streams_by_connection.values():
@@ -652,9 +626,8 @@ class ReplicationCommandHandler:
# for why this can happen.
logger.info(
"Fetching replication rows for '%s' / %s between %i and %i",
"Fetching replication rows for '%s' between %i and %i",
stream_name,
cmd.instance_name,
current_token,
cmd.new_token,
)
@@ -684,13 +657,6 @@ class ReplicationCommandHandler:
self._streams_by_connection.setdefault(conn, set()).add(stream_name)
def is_stream_connected(
self, conn: IReplicationConnection, stream_name: str
) -> bool:
"""Return if stream has been successfully connected and is ready to
receive updates"""
return stream_name in self._streams_by_connection.get(conn, ())
def on_REMOTE_SERVER_UP(
self, conn: IReplicationConnection, cmd: RemoteServerUpCommand
) -> None:

View File

@@ -141,7 +141,7 @@ class RedisSubscriber(SubscriberProtocol):
# We send out our positions when there is a new connection in case the
# other side missed updates. We do this for Redis connections as the
# other side won't know we've connected and so won't issue a REPLICATE.
self.synapse_handler.send_positions_to_connection()
self.synapse_handler.send_positions_to_connection(self)
def messageReceived(self, pattern: str, channel: str, message: str) -> None:
"""Received a message from redis."""

View File

@@ -123,7 +123,7 @@ class ReplicationStreamer:
# We check up front to see if anything has actually changed, as we get
# poked because of changes that happened on other instances.
if not self.command_handler.should_announce_positions() and all(
if all(
stream.last_token == stream.current_token(self._instance_name)
for stream in self.streams
):
@@ -158,21 +158,6 @@ class ReplicationStreamer:
all_streams = list(all_streams)
random.shuffle(all_streams)
if self.command_handler.should_announce_positions():
# We need to send out POSITIONs for all streams, usually
# because a worker has reconnected.
self.command_handler.will_announce_positions()
for stream in all_streams:
self.command_handler.send_command(
PositionCommand(
stream.NAME,
self._instance_name,
stream.last_token,
stream.last_token,
)
)
for stream in all_streams:
if stream.last_token == stream.current_token(
self._instance_name

View File

@@ -144,16 +144,6 @@ class Stream:
"""
raise NotImplementedError()
def can_discard_position(
    self, instance_name: str, prev_token: int, new_token: int
) -> bool:
    """Decide whether a position command for this stream may be dropped.

    This implementation never discards: it returns False whatever the
    arguments are. Discarding is only useful for streams that can never go
    backwards and where we already know the stream ID for the instance has
    advanced.
    """
    discardable = False
    return discardable
def discard_updates_and_advance(self) -> None:
"""Called when the stream should advance but the updates would be discarded,
e.g. when there are no currently connected workers.
@@ -231,14 +221,6 @@ class _StreamFromIdGen(Stream):
def minimal_local_current_token(self) -> Token:
return self._stream_id_gen.get_minimal_local_current_token()
def can_discard_position(
    self, instance_name: str, prev_token: int, new_token: int
) -> bool:
    """Return True when `new_token` is not ahead of what we already have.

    These streams can't go backwards, so a position whose new token is at or
    before the current token for `instance_name` carries no new information
    and can be ignored. `prev_token` is not consulted.
    """
    known_token = self.current_token(instance_name)
    return new_token <= known_token
def current_token_without_instance(
current_token: Callable[[], int]

View File

@@ -768,8 +768,9 @@ class BackgroundUpdater:
# override the global statement timeout to avoid accidentally squashing
# a long-running index creation process
timeout_sql = "SET SESSION statement_timeout = 0"
c.execute(timeout_sql)
self.db_pool.engine.attempt_to_set_statement_timeout(
c, 0, for_transaction=True
)
sql = (
"CREATE %(unique)s INDEX CONCURRENTLY %(name)s"
@@ -791,12 +792,6 @@ class BackgroundUpdater:
logger.debug("[SQL] %s", sql)
c.execute(sql)
finally:
# mypy ignore - `statement_timeout` is defined on PostgresEngine
# reset the global timeout to the default
default_timeout = self.db_pool.engine.statement_timeout # type: ignore[attr-defined]
undo_timeout_sql = f"SET statement_timeout = {default_timeout}"
conn.cursor().execute(undo_timeout_sql)
conn.engine.attempt_to_set_autocommit(conn.conn, False)
def create_index_sqlite(conn: "LoggingDatabaseConnection") -> None:

View File

@@ -89,10 +89,11 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
# furthermore, we might already have the table from a previous (failed)
# purge attempt, so let's drop the table first.
if isinstance(self.database_engine, PostgresEngine):
# Disable statement timeouts for this transaction; purging rooms can
# take a while!
txn.execute("SET LOCAL statement_timeout = 0")
# Disable statement timeouts for this transaction; purging rooms can
# take a while!
self.database_engine.attempt_to_set_statement_timeout(
txn, 0, for_transaction=True
)
txn.execute("DROP TABLE IF EXISTS events_to_purge")

View File

@@ -36,6 +36,9 @@ CursorType = TypeVar("CursorType", bound=Cursor)
class BaseDatabaseEngine(Generic[ConnectionType, CursorType], metaclass=abc.ABCMeta):
# The default statement timeout to use for transactions.
statement_timeout: Optional[int] = None
def __init__(self, module: DBAPI2Module, config: Mapping[str, Any]):
self.module = module
@@ -132,6 +135,16 @@ class BaseDatabaseEngine(Generic[ConnectionType, CursorType], metaclass=abc.ABCM
"""
...
@abc.abstractmethod
def attempt_to_set_statement_timeout(
    self, cursor: CursorType, statement_timeout: int, for_transaction: bool
) -> None:
    """Try to apply a statement timeout via the given cursor.

    Engines that cannot do this may silently do nothing; in particular this
    has no effect on SQLite3.

    Args:
        cursor: The cursor to apply the setting through.
        statement_timeout: The timeout value to apply.
        for_transaction: Whether the setting should be scoped to the current
            transaction rather than applied more widely.
    """
    ...
@staticmethod
@abc.abstractmethod
def executescript(cursor: CursorType, script: str) -> None:

View File

@@ -52,7 +52,7 @@ class PostgresEngine(
# some degenerate query plan has been created and the client has probably
# timed out/walked off anyway.
# This is in milliseconds.
self.statement_timeout: Optional[int] = database_config.get(
self.statement_timeout = database_config.get(
"statement_timeout", 60 * 60 * 1000
)
self._version: Optional[int] = None # unknown as yet
@@ -169,7 +169,11 @@ class PostgresEngine(
# Abort really long-running statements and turn them into errors.
if self.statement_timeout is not None:
cursor.execute("SET statement_timeout TO ?", (self.statement_timeout,))
self.attempt_to_set_statement_timeout(
cast(psycopg2.extensions.cursor, cursor.txn),
self.statement_timeout,
for_transaction=False,
)
cursor.close()
db_conn.commit()
@@ -233,6 +237,18 @@ class PostgresEngine(
isolation_level = self.isolation_level_map[isolation_level]
return conn.set_isolation_level(isolation_level)
def attempt_to_set_statement_timeout(
    self,
    cursor: psycopg2.extensions.cursor,
    statement_timeout: int,
    for_transaction: bool,
) -> None:
    """Set the Postgres `statement_timeout` through `cursor`.

    Args:
        cursor: The cursor to run the `SET` statement on.
        statement_timeout: The value to set `statement_timeout` to.
        for_transaction: If True, `SET LOCAL` is used so that the setting
            only applies to the current transaction; otherwise a plain
            `SET` is issued.
    """
    # This may be handed a raw psycopg2 cursor rather than a wrapper that
    # rewrites `?` placeholders, so the driver's native `%s` parameter style
    # must be used here; a literal `?` is not understood by psycopg2.
    if for_transaction:
        sql = "SET LOCAL statement_timeout TO %s"
    else:
        sql = "SET statement_timeout TO %s"

    cursor.execute(sql, (statement_timeout,))
@staticmethod
def executescript(cursor: psycopg2.extensions.cursor, script: str) -> None:
"""Execute a chunk of SQL containing multiple semicolon-delimited statements.

View File

@@ -143,6 +143,12 @@ class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection, sqlite3.Cursor]):
# All transactions are SERIALIZABLE by default in sqlite
pass
def attempt_to_set_statement_timeout(
    self, cursor: sqlite3.Cursor, statement_timeout: int, for_transaction: bool
) -> None:
    """Do nothing: statement timeouts are not supported for SQLite.

    All arguments are accepted and ignored.
    """
    return None
@staticmethod
def executescript(cursor: sqlite3.Cursor, script: str) -> None:
"""Execute a chunk of SQL containing multiple semicolon-delimited statements.

View File

@@ -1,30 +0,0 @@
/* Copyright 2023 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Annotate some tables in Postgres with a REPLICA IDENTITY.
-- Any table that doesn't have a primary key should be annotated explicitly with
-- a REPLICA IDENTITY so that logical replication can be used.
-- If this is not done, then UPDATE and DELETE statements on those tables
-- will fail if logical replication is in use.
-- Re-use unique indices already defined on tables as a replica identity.
-- (Postgres only accepts an index here if it is unique, not partial, not
-- deferrable, and covers only columns marked NOT NULL.)
ALTER TABLE applied_module_schemas REPLICA IDENTITY USING INDEX applied_module_schemas_module_name_file_key;
ALTER TABLE applied_schema_deltas REPLICA IDENTITY USING INDEX applied_schema_deltas_version_file_key;
ALTER TABLE background_updates REPLICA IDENTITY USING INDEX background_updates_uniqueness;
ALTER TABLE schema_compat_version REPLICA IDENTITY USING INDEX schema_compat_version_lock_key;
ALTER TABLE schema_version REPLICA IDENTITY USING INDEX schema_version_lock_key;

View File

@@ -1,80 +0,0 @@
/* Copyright 2023 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Annotate some tables in Postgres with a REPLICA IDENTITY.
-- Any table that doesn't have a primary key should be annotated explicitly with
-- a REPLICA IDENTITY so that logical replication can be used.
-- If this is not done, then UPDATE and DELETE statements on those tables
-- will fail if logical replication is in use.
-- Where possible, re-use unique indices already defined on tables as a replica
-- identity.
-- (Postgres only accepts an index here if it is unique, not partial, not
-- deferrable, and covers only columns marked NOT NULL.)
ALTER TABLE account_data REPLICA IDENTITY USING INDEX account_data_uniqueness;
ALTER TABLE application_services_txns REPLICA IDENTITY USING INDEX application_services_txns_as_id_txn_id_key;
ALTER TABLE appservice_stream_position REPLICA IDENTITY USING INDEX appservice_stream_position_lock_key;
ALTER TABLE current_state_events REPLICA IDENTITY USING INDEX current_state_events_event_id_key;
ALTER TABLE device_lists_changes_converted_stream_position REPLICA IDENTITY USING INDEX device_lists_changes_converted_stream_position_lock_key;
ALTER TABLE devices REPLICA IDENTITY USING INDEX device_uniqueness;
ALTER TABLE e2e_device_keys_json REPLICA IDENTITY USING INDEX e2e_device_keys_json_uniqueness;
ALTER TABLE e2e_fallback_keys_json REPLICA IDENTITY USING INDEX e2e_fallback_keys_json_uniqueness;
ALTER TABLE e2e_one_time_keys_json REPLICA IDENTITY USING INDEX e2e_one_time_keys_json_uniqueness;
ALTER TABLE event_backward_extremities REPLICA IDENTITY USING INDEX event_backward_extremities_event_id_room_id_key;
ALTER TABLE event_edges REPLICA IDENTITY USING INDEX event_edges_event_id_prev_event_id_idx;
ALTER TABLE event_forward_extremities REPLICA IDENTITY USING INDEX event_forward_extremities_event_id_room_id_key;
ALTER TABLE event_json REPLICA IDENTITY USING INDEX event_json_event_id_key;
ALTER TABLE event_push_summary_last_receipt_stream_id REPLICA IDENTITY USING INDEX event_push_summary_last_receipt_stream_id_lock_key;
ALTER TABLE event_push_summary_stream_ordering REPLICA IDENTITY USING INDEX event_push_summary_stream_ordering_lock_key;
ALTER TABLE events REPLICA IDENTITY USING INDEX events_event_id_key;
ALTER TABLE event_to_state_groups REPLICA IDENTITY USING INDEX event_to_state_groups_event_id_key;
ALTER TABLE event_txn_id_device_id REPLICA IDENTITY USING INDEX event_txn_id_device_id_event_id;
ALTER TABLE event_txn_id REPLICA IDENTITY USING INDEX event_txn_id_event_id;
ALTER TABLE local_current_membership REPLICA IDENTITY USING INDEX local_current_membership_idx;
ALTER TABLE partial_state_events REPLICA IDENTITY USING INDEX partial_state_events_event_id_key;
ALTER TABLE partial_state_rooms_servers REPLICA IDENTITY USING INDEX partial_state_rooms_servers_room_id_server_name_key;
ALTER TABLE profiles REPLICA IDENTITY USING INDEX profiles_user_id_key;
ALTER TABLE redactions REPLICA IDENTITY USING INDEX redactions_event_id_key;
ALTER TABLE registration_tokens REPLICA IDENTITY USING INDEX registration_tokens_token_key;
ALTER TABLE rejections REPLICA IDENTITY USING INDEX rejections_event_id_key;
ALTER TABLE room_account_data REPLICA IDENTITY USING INDEX room_account_data_uniqueness;
ALTER TABLE room_aliases REPLICA IDENTITY USING INDEX room_aliases_room_alias_key;
ALTER TABLE room_depth REPLICA IDENTITY USING INDEX room_depth_room_id_key;
ALTER TABLE room_forgetter_stream_pos REPLICA IDENTITY USING INDEX room_forgetter_stream_pos_lock_key;
ALTER TABLE room_memberships REPLICA IDENTITY USING INDEX room_memberships_event_id_key;
ALTER TABLE room_tags REPLICA IDENTITY USING INDEX room_tag_uniqueness;
ALTER TABLE room_tags_revisions REPLICA IDENTITY USING INDEX room_tag_revisions_uniqueness;
ALTER TABLE server_keys_json REPLICA IDENTITY USING INDEX server_keys_json_uniqueness;
ALTER TABLE sessions REPLICA IDENTITY USING INDEX sessions_session_type_session_id_key;
ALTER TABLE state_events REPLICA IDENTITY USING INDEX state_events_event_id_key;
ALTER TABLE stats_incremental_position REPLICA IDENTITY USING INDEX stats_incremental_position_lock_key;
ALTER TABLE threads REPLICA IDENTITY USING INDEX threads_uniqueness;
ALTER TABLE ui_auth_sessions_credentials REPLICA IDENTITY USING INDEX ui_auth_sessions_credentials_session_id_stage_type_key;
ALTER TABLE ui_auth_sessions_ips REPLICA IDENTITY USING INDEX ui_auth_sessions_ips_session_id_ip_user_agent_key;
ALTER TABLE ui_auth_sessions REPLICA IDENTITY USING INDEX ui_auth_sessions_session_id_key;
ALTER TABLE user_directory_stream_pos REPLICA IDENTITY USING INDEX user_directory_stream_pos_lock_key;
ALTER TABLE user_external_ids REPLICA IDENTITY USING INDEX user_external_ids_auth_provider_external_id_key;
ALTER TABLE user_threepids REPLICA IDENTITY USING INDEX medium_address;
ALTER TABLE worker_read_write_locks_mode REPLICA IDENTITY USING INDEX worker_read_write_locks_mode_key;
ALTER TABLE worker_read_write_locks REPLICA IDENTITY USING INDEX worker_read_write_locks_key;
-- Special cases: unique indices on nullable columns can't be used as a replica
-- identity, so these tables fall back to REPLICA IDENTITY FULL, which records
-- the old values of all columns in the row.
ALTER TABLE event_push_actions REPLICA IDENTITY FULL;
ALTER TABLE local_media_repository REPLICA IDENTITY FULL;
ALTER TABLE receipts_graph REPLICA IDENTITY FULL;
ALTER TABLE receipts_linearized REPLICA IDENTITY FULL;
ALTER TABLE received_transactions REPLICA IDENTITY FULL;
ALTER TABLE remote_media_cache REPLICA IDENTITY FULL;
ALTER TABLE server_signature_keys REPLICA IDENTITY FULL;
ALTER TABLE users REPLICA IDENTITY FULL;

View File

@@ -135,54 +135,3 @@ def sorted_topologically(
degree_map[edge] -= 1
if degree_map[edge] == 0:
heapq.heappush(zero_degree, edge)
def sorted_topologically_batched(
    nodes: Iterable[T],
    graph: Mapping[T, Collection[T]],
) -> Generator[Collection[T], None, None]:
    r"""Walk the graph topologically, yielding batches of nodes such that
    everything a node references has been yielded in an earlier batch.

    For example, given the following graph:

         A
        / \
       B   C
        \ /
         D

    This function will return: `[[A], [B, C], [D]]`.

    This function is useful for e.g. batch persisting events in an auth chain,
    where we can only persist an event if all its auth events have already been
    persisted.

    Only nodes listed in `nodes` are yielded; edges to anything else are
    ignored when counting outstanding references.
    """
    # For each requested node, how many of its references are still pending.
    pending = dict.fromkeys(nodes, 0)

    # Maps a node to the set of nodes that reference it.
    referenced_by: Dict[T, Set[T]] = {}
    for source, targets in graph.items():
        if source not in pending:
            continue

        for target in set(targets):
            if target in pending:
                pending[source] += 1

            referenced_by.setdefault(target, set()).add(source)
        referenced_by.setdefault(source, set())

    batch = [candidate for candidate, count in pending.items() if count == 0]

    while batch:
        upcoming = []
        for released in batch:
            for dependant in referenced_by.get(released, []):
                if dependant not in pending:
                    continue
                pending[dependant] -= 1
                if pending[dependant] == 0:
                    upcoming.append(dependant)

        yield batch
        batch = upcoming

View File

@@ -35,10 +35,6 @@ class TypingStreamTestCase(BaseStreamTestCase):
typing = self.hs.get_typing_handler()
assert isinstance(typing, TypingWriterHandler)
# Create a typing update before we reconnect so that there is a missing
# update to fetch.
typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=True)
self.reconnect()
typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=True)
@@ -95,10 +91,6 @@ class TypingStreamTestCase(BaseStreamTestCase):
typing = self.hs.get_typing_handler()
assert isinstance(typing, TypingWriterHandler)
# Create a typing update before we reconnect so that there is a missing
# update to fetch.
typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=True)
self.reconnect()
typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=True)

View File

@@ -313,10 +313,9 @@ class PostgresReplicaIdentityTestCase(unittest.HomeserverTestCase):
AND table_schema not in ('pg_catalog', 'information_schema')
AND NOT EXISTS (
SELECT 1
FROM information_schema.table_constraints tc
WHERE tc.constraint_type = 'PRIMARY KEY'
AND tc.table_schema = tbl.table_schema
AND tc.table_name = tbl.table_name
FROM information_schema.key_column_usage kcu
WHERE kcu.table_name = tbl.table_name
AND kcu.table_schema = tbl.table_schema
)
)
SELECT pg_class.oid::regclass FROM tables_no_pkey INNER JOIN pg_class ON pg_class.oid::regclass = table_name::regclass

View File

@@ -13,11 +13,7 @@
# limitations under the License.
from typing import Dict, Iterable, List, Sequence
from synapse.util.iterutils import (
chunk_seq,
sorted_topologically,
sorted_topologically_batched,
)
from synapse.util.iterutils import chunk_seq, sorted_topologically
from tests.unittest import TestCase
@@ -111,73 +107,3 @@ class SortTopologically(TestCase):
graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3, 2, 1]}
self.assertEqual(list(sorted_topologically([4, 3, 2, 1], graph)), [1, 2, 3, 4])
class SortTopologicallyBatched(TestCase):
    """Test cases for `sorted_topologically_batched`"""

    def test_empty(self) -> None:
        """No nodes and no graph should produce no batches."""
        edges: Dict[int, List[int]] = {}
        batches = list(sorted_topologically_batched([], edges))
        self.assertEqual(batches, [])

    def test_handle_empty_graph(self) -> None:
        """Nodes with no entry in the graph are treated as having no edges."""
        edges: Dict[int, List[int]] = {}

        # Nothing depends on anything, so every node lands in one batch.
        batches = list(sorted_topologically_batched([1, 2], edges))
        self.assertEqual(batches, [[1, 2]])

    def test_disconnected(self) -> None:
        """A graph whose nodes have explicitly empty edge lists works."""
        edges: Dict[int, List[int]] = {1: [], 2: []}

        # Nothing depends on anything, so every node lands in one batch.
        batches = list(sorted_topologically_batched([1, 2], edges))
        self.assertEqual(batches, [[1, 2]])

    def test_linear(self) -> None:
        """A simple `4 -> 3 -> 2 -> 1` chain gives one node per batch."""
        edges: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3]}

        batches = list(sorted_topologically_batched([4, 3, 2, 1], edges))
        self.assertEqual(batches, [[1], [2], [3], [4]])

    def test_subset(self) -> None:
        """Sorting only a subset of the graph's nodes works."""
        edges: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3]}

        batches = list(sorted_topologically_batched([4, 3], edges))
        self.assertEqual(batches, [[3], [4]])

    def test_fork(self) -> None:
        """A forked graph puts both branches in the same batch."""
        edges: Dict[int, List[int]] = {1: [], 2: [1], 3: [1], 4: [2, 3]}

        # 2 and 3 both reference only 1, so they share the second batch and
        # 4 has to wait for both of them.
        batches = list(sorted_topologically_batched([4, 3, 2, 1], edges))
        self.assertEqual(batches, [[1], [2, 3], [4]])

    def test_duplicates(self) -> None:
        """Duplicate edges in the graph are only counted once."""
        edges: Dict[int, List[int]] = {1: [], 2: [1, 1], 3: [2, 2], 4: [3]}

        batches = list(sorted_topologically_batched([4, 3, 2, 1], edges))
        self.assertEqual(batches, [[1], [2], [3], [4]])

    def test_multiple_paths(self) -> None:
        """Multiple paths between two nodes do not change the batching."""
        edges: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3, 2, 1]}

        batches = list(sorted_topologically_batched([4, 3, 2, 1], edges))
        self.assertEqual(batches, [[1], [2], [3], [4]])