1
0

Allow for worker types to be merged and given a name.

In the Synapse-worker docker image, multiple worker types are defined but only allowed to be that one single definition.
This makes it so that any two(or more) worker types can be merged into a single worker. For example:
1. If you wish to have one worker with both client_reader and federation_sender functions enabled, you can set
SYNAPSE_WORKER_TYPES='client_reader+federation_sender'
2. If you wish to have all stream_writers(excepting event_persister) you can set
SYNAPSE_WORKER_TYPES='account_data+presence+receipts+to_device+typing'

Multiple types can be combined, but some error checking to dis-allow multiples of worker types that shouldn't be
enabled more than once has been added. For example:
SYNAPSE_WORKER_TYPES='background_worker+event_persister, background_worker+event_persister' will not work as
background_worker is only allowed to have a single worker for the entire deployment.

Giving a worker or a combination of workers a custom name is as simple as adding the name then an equal sign in front
of the given worker type(s). For example:
SYNAPSE_WORKER_TYPES='alice=federation_reader'
or
SYNAPSE_WORKER_TYPES='bob=federation_inbound+federation_sender'
or
SYNAPSE_WORKER_TYPES='charlie=event_persister:2, derek=media_repository + pusher + user_dir + appservice + event_creator'
This commit is contained in:
Jason Little
2023-01-25 04:56:23 -06:00
parent 4647d592cd
commit e29ef6f215
+320 -50
View File
@@ -19,10 +19,15 @@
# The environment variables it reads are:
# * SYNAPSE_SERVER_NAME: The desired server_name of the homeserver.
# * SYNAPSE_REPORT_STATS: Whether to report stats.
# * SYNAPSE_WORKER_TYPES: A comma separated list of worker names as specified in WORKER_CONFIG
# below. Leave empty for no workers. Append a ':' and a number to multiply that
# worker. E.g. 'event_persister:2' would be identical to 'event_persister,
# event_persister'.
# * SYNAPSE_WORKER_TYPES: A comma separated list of worker names as specified in WORKERS_CONFIG
# below. Leave empty for no workers. Add a ':' and a number at the end to
# multiply that worker. Append multiple worker types with '+' to merge the
# worker types into a single worker. Add a name and a '=' to the front of a
# worker type to give this instance a name in logs and nginx.
# Examples:
# SYNAPSE_WORKER_TYPES='event_persister, federation_sender, client_reader'
# SYNAPSE_WORKER_TYPES='event_persister:2, federation_sender:2, client_reader'
# SYNAPSE_WORKER_TYPES='stream_writers=account_data+presence+typing'
# * SYNAPSE_AS_REGISTRATION_DIR: If specified, a directory in which .yaml and .yml files
# will be treated as Application Service registration files.
# * SYNAPSE_TLS_CERT: Path to a TLS certificate in PEM format.
@@ -72,11 +77,11 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"endpoint_patterns": [
"^/_matrix/client/(api/v1|r0|v3|unstable)/user_directory/search$"
],
"shared_extra_conf": {"update_user_directory_from_worker": "user_dir1"},
"shared_extra_conf": {"update_user_directory_from_worker": "placeholder_name"},
"worker_extra_conf": "",
},
"media_repository": {
"app": "synapse.app.media_repository",
"app": "synapse.app.generic_worker",
"listener_resources": ["media"],
"endpoint_patterns": [
"^/_matrix/media/",
@@ -89,7 +94,7 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
# The first configured media worker will run the media background jobs
"shared_extra_conf": {
"enable_media_repo": False,
"media_instance_running_background_jobs": "media_repository1",
"media_instance_running_background_jobs": "placeholder_name",
},
"worker_extra_conf": "enable_media_repo: true",
},
@@ -97,7 +102,7 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"app": "synapse.app.generic_worker",
"listener_resources": [],
"endpoint_patterns": [],
"shared_extra_conf": {"notify_appservices_from_worker": "appservice1"},
"shared_extra_conf": {"notify_appservices_from_worker": "placeholder_name"},
"worker_extra_conf": "",
},
"federation_sender": {
@@ -193,9 +198,9 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"app": "synapse.app.generic_worker",
"listener_resources": [],
"endpoint_patterns": [],
# This worker cannot be sharded. Therefore there should only ever be one background
# worker, and it should be named background_worker1
"shared_extra_conf": {"run_background_tasks_on": "background_worker1"},
# This worker cannot be sharded. Therefore, there should only ever be one
# background worker. This is enforced for the safety of your database.
"shared_extra_conf": {"run_background_tasks_on": "placeholder_name"},
"worker_extra_conf": "",
},
"event_creator": {
@@ -326,7 +331,7 @@ def convert(src: str, dst: str, **template_vars: object) -> None:
def add_worker_roles_to_shared_config(
shared_config: dict,
worker_type: str,
worker_type_list: list,
worker_name: str,
worker_port: int,
) -> None:
@@ -334,22 +339,35 @@ def add_worker_roles_to_shared_config(
append appropriate worker information to it for the current worker_type instance.
Args:
shared_config: The config dict that all worker instances share (after being converted to YAML)
worker_type: The type of worker (one of those defined in WORKERS_CONFIG).
shared_config: The config dict that all worker instances share (after being
converted to YAML)
worker_type_list: The type of worker (one of those defined in WORKERS_CONFIG).
This list can be a single worker type or multiple.
worker_name: The name of the worker instance.
worker_port: The HTTP replication port that the worker instance is listening on.
"""
# The instance_map config field marks the workers that write to various replication streams
instance_map = shared_config.setdefault("instance_map", {})
# Worker-type specific sharding config
if worker_type == "pusher":
# This is a list of the stream_writers that there can be only one of. Events can be
# sharded, and therefore doesn't belong here.
singular_stream_writers = [
"account_data",
"presence",
"receipts",
"to_device",
"typing",
]
# Worker-type specific sharding config. Now a single worker can fulfill multiple
# roles, check each.
if "pusher" in worker_type_list:
shared_config.setdefault("pusher_instances", []).append(worker_name)
elif worker_type == "federation_sender":
if "federation_sender" in worker_type_list:
shared_config.setdefault("federation_sender_instances", []).append(worker_name)
elif worker_type == "event_persister":
if "event_persister" in worker_type_list:
# Event persisters write to the events stream, so we need to update
# the list of event stream writers
shared_config.setdefault("stream_writers", {}).setdefault("events", []).append(
@@ -362,19 +380,137 @@ def add_worker_roles_to_shared_config(
"port": worker_port,
}
elif worker_type in ["account_data", "presence", "receipts", "to_device", "typing"]:
# Update the list of stream writers
# It's convenient that the name of the worker type is the same as the stream to write
shared_config.setdefault("stream_writers", {}).setdefault(
worker_type, []
).append(worker_name)
# Update the list of stream writers. It's convenient that the name of the worker
# type is the same as the stream to write. Iterate over the whole list in case there
# is more than one.
for worker in worker_type_list:
if worker in singular_stream_writers:
shared_config.setdefault("stream_writers", {}).setdefault(
worker, []
).append(worker_name)
# Map of stream writer instance names to host/ports combos
# For now, all stream writers need http replication ports
instance_map[worker_name] = {
"host": "localhost",
"port": worker_port,
}
# Map of stream writer instance names to host/ports combos
# For now, all stream writers need http replication ports
instance_map[worker_name] = {
"host": "localhost",
"port": worker_port,
}
def merge_worker_template_configs(
existing_dict: Dict[str, Any],
to_be_merged_dict: Dict[str, Any],
) -> Dict[str, Any]:
"""When given an existing dict of worker template configuration, merge new template
data from WORKERS_CONFIG and return new dict.
Args:
existing_dict: Either an existing worker template or a fresh blank one.
to_be_merged_dict: The template from WORKERS_CONFIGS to be merged into
existing_dict.
Returns: The newly merged together dict values.
"""
new_dict: Dict[str, Any] = {}
for i in to_be_merged_dict.keys():
if (i == "endpoint_patterns") or (i == "listener_resources"):
# merge the two lists, remove duplicates
new_dict[i] = list(set(existing_dict[i] + to_be_merged_dict[i]))
elif i == "shared_extra_conf":
# merge dictionary's, the worker name will be replaced below after counting
new_dict[i] = {**existing_dict[i], **to_be_merged_dict[i]}
elif i == "worker_extra_conf":
# There is only one worker type that has a 'worker_extra_conf' and it is
# the media_repo. Since duplicate worker types on the same worker don't
# work, this is fine.
new_dict[i] = existing_dict[i] + to_be_merged_dict[i]
else:
# Everything else should be identical, like "app", which only works because
# all apps are now generic_workers.
new_dict[i] = to_be_merged_dict[i]
return new_dict
def insert_worker_name_for_worker_config(
existing_dict: Dict[str, Any], worker_name: str
) -> Dict[str, Any]:
"""Insert a given worker name into the worker's configuration dict.
Args:
existing_dict: The worker_config dict that is imported into shared_config.
worker_name: The name of the worker to insert.
Returns: Copy of the dict with newly inserted worker name
"""
dict_to_edit = existing_dict.copy()
for k, v in dict_to_edit["shared_extra_conf"].items():
# Only proceed if it's a string as some values are boolean
if isinstance(dict_to_edit["shared_extra_conf"][k], str):
# This will be ignored if the text isn't the placeholder.
dict_to_edit["shared_extra_conf"][k] = v.replace(
"placeholder_name", worker_name
)
return dict_to_edit
def is_sharding_allowed_for_worker_type(worker_type: str) -> bool:
"""Helper to check to make sure worker types that cannot have multiples do not.
Args:
worker_type: The type of worker to check against.
Returns: True if allowed, False if not
"""
if worker_type in [
"background_worker",
"account_data",
"presence",
"receipts",
"typing",
"to_device",
]:
return False
else:
return True
def increment_counter(worker_type_counter: Dict[str, int], worker_type: str) -> int:
"""Given a dict of worker_type:int, increment int
Args:
worker_type_counter: dict to increment
worker_type: str of worker_type
Returns: int of new value of counter
"""
new_worker_count = worker_type_counter.setdefault(worker_type, 0) + 1
worker_type_counter[worker_type] = new_worker_count
return new_worker_count
def is_name_allowed_for_worker(
worker_type_counter: Dict[str, str], worker_base_name: str, worker_type: str
) -> bool:
"""Given a dict of worker_base_name:worker_type, check if this worker_base_name has
been seen before.
Args:
worker_type_counter: dict of worker_base_name:worker_type.
worker_base_name: str of the base worker name, no appended number.
worker_type: str of the worker_type, including combo workers.
Returns: True if allowed, False if not
"""
counter_to_check = worker_type_counter.get(worker_base_name)
if counter_to_check is not None:
if counter_to_check == worker_type:
# Key exists, and they match so it should be ok.
return True
else:
return False
else:
# Key doesn't exist, it's ok to use.
return True
def split_and_strip_string(given_string: str, split_char: str) -> List:
# Removes whitespace from ends of result strings before adding to list.
return [x.strip() for x in given_string.split(split_char)]
def generate_base_homeserver_config() -> None:
@@ -471,9 +607,18 @@ def generate_worker_files(
# A counter of worker_type -> int. Used for determining the name for a given
# worker type when generating its config file, as each worker's name is just
# worker_type + instance #
# worker_type(s) + instance #
worker_type_counter: Dict[str, int] = {}
# Similar to above, but more finely grained. This is used to determine we don't have
# more than a single worker for cases where multiples would be bad(e.g. presence).
worker_type_shard_counter: Dict[str, int] = {}
# Similar to above, but for worker name's. This is used to check that worker names
# for different worker types or combinations of types is not used, as it will error
# with 'Address in use'(e.g. "to_device, to_device=typing" would error.
worker_name_type_counter: Dict[str, str] = {}
# A list of internal endpoints to healthcheck, starting with the main process
# which exists even if no workers do.
healthcheck_urls = ["http://localhost:8080/health"]
@@ -490,46 +635,171 @@ def generate_worker_files(
new_worker_types = []
for worker_type in worker_types:
if ":" in worker_type:
x = worker_type.split(":")
y = 0
# should only be 2 components, a type of worker and an integer as a
worker_type_components = split_and_strip_string(worker_type, ":")
count = 0
# Should only be 2 components, a type of worker(s) and an integer as a
# string. Cast the number as an int then it can be used as a counter.
if len(x) == 2 and x[-1].isdigit():
y = int(x[1])
if (
len(worker_type_components) == 2
and worker_type_components[-1].isdigit()
):
count = int(worker_type_components[1])
else:
error(
"Multiplier signal(:) for worker found, but incorrect components: "
+ worker_type
+ ". Please fix."
)
# As long as there are more than 0, we add one to the list to make below.
while y > 0:
new_worker_types.append(x[0])
y -= 1
while count > 0:
new_worker_types.append(worker_type_components[0])
count -= 1
else:
# If it's not a real worker, it will error out below
new_worker_types.append(worker_type)
# worker_types is now an expanded list of worker types.
worker_types = new_worker_types
# For each worker type specified by the user, create config values
for worker_type in worker_types:
worker_config = WORKERS_CONFIG.get(worker_type)
if worker_config:
worker_config = worker_config.copy()
# pre-template the worker_config so when updating we don't get a KeyError
worker_config: Dict[str, Any] = {
"app": "",
"listener_resources": [],
"endpoint_patterns": [],
"shared_extra_conf": {},
"worker_extra_conf": "",
}
# Peel off any name designated before a '=' to use later.
requested_worker_name = ""
if "=" in worker_type:
# Split on "=", remove extra whitespace from ends then make list
worker_type_split = split_and_strip_string(worker_type, "=")
if len(worker_type_split) > 2:
error(
"To many worker names requested for a single worker, or to many "
"'='. Please fix: " + worker_type
)
# if there was no name given, this will still be an empty string
requested_worker_name = worker_type_split[0]
# Uncommon mistake that will cause problems. Name string containing spaces.
if len(requested_worker_name.split(" ")) > 1:
error(
"Requesting a worker name containing a space is not allowed, "
"as it would raise a FileNotFoundError. Please use an "
"underscore instead."
)
# Reassign the worker_type string with no name on it.
worker_type = worker_type_split[1]
# Split on "+", remove whitespace from ends then make a list.
workers_to_combo_list = split_and_strip_string(worker_type, "+")
# Check for duplicates in the split worker type list. No advantage in having
# duplicated worker types on the same worker. Two would consolidate into one.
# (e.g. "pusher + pusher" would resolve to a single "pusher" which may not be
# what was intended.)
if len(workers_to_combo_list) != len(set(workers_to_combo_list)):
error("Duplicate worker type found in " + worker_type + "! Please fix.")
# Merge all worker config templates for this worker into a single config
for worker in workers_to_combo_list:
# Verify this is a real defined worker type. If it's not, stop everything so
# it can be fixed.
copy_of_template_config = WORKERS_CONFIG.get(worker)
if copy_of_template_config:
# So it's not a reference pointer
copy_of_template_config = copy_of_template_config.copy()
else:
error(
worker
+ " is an unknown worker type! Was found in "
+ worker_type
+ ". Please fix!"
)
# Merge worker type template configuration data.
worker_config = merge_worker_template_configs(
worker_config, copy_of_template_config
)
# Check if there is to many of a worker there can only be one of.
# Will error and stop if it is a problem, e.g. 'background_worker'.
if worker in worker_type_shard_counter:
if not is_sharding_allowed_for_worker_type(worker):
error(
"There can be only a single worker with "
+ worker
+ " type. Please recount and remove."
)
# Not in shard counter, add it. Don't need return value
increment_counter(worker_type_shard_counter, worker)
# Name workers by their type or requested name concatenated with an
# incrementing number. e.g. federation_reader1 or event_creator+event_persister1
if requested_worker_name:
worker_base_name = requested_worker_name
# It'll be useful to have this in the log in case it's a complex of many
# workers merged together. Note for Complement: it would only be seen in the
# logs for blueprint construction(which are not collected).
log(
"Worker name request found: "
+ requested_worker_name
+ ", for: "
+ str(workers_to_combo_list)
)
else:
error(worker_type + " is an unknown worker type! Please fix!")
# The worker name will be the worker_type, however if spaces exist
# between concatenated worker_types and the "+" because of readability,
# it will error on startup. Recombine worker_types without spaces and log.
if len(workers_to_combo_list) > 1:
worker_base_name = "+".join(workers_to_combo_list)
if worker_base_name != worker_type:
log(
"Default worker name would have contained spaces, which is not "
"allowed(" + worker_type + "). Reformed name to not contain "
"spaces: " + worker_base_name
)
else:
worker_base_name = worker_type
# This counter is used for names with broader worker type definitions.
new_worker_count = increment_counter(worker_type_counter, worker_type)
worker_name = worker_base_name + str(new_worker_count)
new_worker_count = worker_type_counter.setdefault(worker_type, 0) + 1
worker_type_counter[worker_type] = new_worker_count
# Now that the worker name is settled, check this name isn't used for a
# different worker_type. If it's not allowed, will error and stop. If no issues,
# it will be added to the counter.
if is_name_allowed_for_worker(
worker_name_type_counter,
worker_base_name,
worker_type,
):
worker_name_type_counter.setdefault(worker_base_name, worker_type)
else:
error(
"Can not use "
+ worker_name
+ " for "
+ worker_type
+ ". It is already in use by "
# This is cast as a str because mypy thinks it could be None
+ str(worker_name_type_counter.get(worker_base_name))
)
# Replace placeholder names in the config template with the actual worker name.
worker_config = insert_worker_name_for_worker_config(worker_config, worker_name)
# Name workers by their type concatenated with an incrementing number
# e.g. federation_reader1
worker_name = worker_type + str(new_worker_count)
worker_config.update(
{"name": worker_name, "port": str(worker_port), "config_path": config_path}
)
# Update the shared config with any worker-type specific options
# Update the shared config with any worker-type specific options. Do a dance so
# the first of a given worker type gets to stay assigned.
worker_config["shared_extra_conf"].update(shared_config)
shared_config.update(worker_config["shared_extra_conf"])
healthcheck_urls.append("http://localhost:%d/health" % (worker_port,))
@@ -539,7 +809,7 @@ def generate_worker_files(
# Update the shared config with sharding-related options if necessary
add_worker_roles_to_shared_config(
shared_config, worker_type, worker_name, worker_port
shared_config, workers_to_combo_list, worker_name, worker_port
)
# Enable the worker in supervisord