diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index 7d9d7ba78f..3dad01630a 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -19,10 +19,15 @@ # The environment variables it reads are: # * SYNAPSE_SERVER_NAME: The desired server_name of the homeserver. # * SYNAPSE_REPORT_STATS: Whether to report stats. -# * SYNAPSE_WORKER_TYPES: A comma separated list of worker names as specified in WORKER_CONFIG -# below. Leave empty for no workers. Append a ':' and a number to multiply that -# worker. E.g. 'event_persister:2' would be identical to 'event_persister, -# event_persister'. +# * SYNAPSE_WORKER_TYPES: A comma separated list of worker names as specified in WORKERS_CONFIG +# below. Leave empty for no workers. Add a ':' and a number at the end to +# multiply that worker. Append multiple worker types with '+' to merge the +# worker types into a single worker. Add a name and a '=' to the front of a +# worker type to give this instance a name in logs and nginx. +# Examples: +# SYNAPSE_WORKER_TYPES='event_persister, federation_sender, client_reader' +# SYNAPSE_WORKER_TYPES='event_persister:2, federation_sender:2, client_reader' +# SYNAPSE_WORKER_TYPES='stream_writers=account_data+presence+typing' # * SYNAPSE_AS_REGISTRATION_DIR: If specified, a directory in which .yaml and .yml files # will be treated as Application Service registration files. # * SYNAPSE_TLS_CERT: Path to a TLS certificate in PEM format. 
@@ -72,11 +77,11 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = { "endpoint_patterns": [ "^/_matrix/client/(api/v1|r0|v3|unstable)/user_directory/search$" ], - "shared_extra_conf": {"update_user_directory_from_worker": "user_dir1"}, + "shared_extra_conf": {"update_user_directory_from_worker": "placeholder_name"}, "worker_extra_conf": "", }, "media_repository": { - "app": "synapse.app.media_repository", + "app": "synapse.app.generic_worker", "listener_resources": ["media"], "endpoint_patterns": [ "^/_matrix/media/", @@ -89,7 +94,7 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = { # The first configured media worker will run the media background jobs "shared_extra_conf": { "enable_media_repo": False, - "media_instance_running_background_jobs": "media_repository1", + "media_instance_running_background_jobs": "placeholder_name", }, "worker_extra_conf": "enable_media_repo: true", }, @@ -97,7 +102,7 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = { "app": "synapse.app.generic_worker", "listener_resources": [], "endpoint_patterns": [], - "shared_extra_conf": {"notify_appservices_from_worker": "appservice1"}, + "shared_extra_conf": {"notify_appservices_from_worker": "placeholder_name"}, "worker_extra_conf": "", }, "federation_sender": { @@ -193,9 +198,9 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = { "app": "synapse.app.generic_worker", "listener_resources": [], "endpoint_patterns": [], - # This worker cannot be sharded. Therefore there should only ever be one background - # worker, and it should be named background_worker1 - "shared_extra_conf": {"run_background_tasks_on": "background_worker1"}, + # This worker cannot be sharded. Therefore, there should only ever be one + # background worker. This is enforced for the safety of your database. 
+ "shared_extra_conf": {"run_background_tasks_on": "placeholder_name"}, "worker_extra_conf": "", }, "event_creator": { @@ -326,7 +331,7 @@ def convert(src: str, dst: str, **template_vars: object) -> None: def add_worker_roles_to_shared_config( shared_config: dict, - worker_type: str, + worker_type_list: list, worker_name: str, worker_port: int, ) -> None: @@ -334,22 +339,35 @@ def add_worker_roles_to_shared_config( append appropriate worker information to it for the current worker_type instance. Args: - shared_config: The config dict that all worker instances share (after being converted to YAML) - worker_type: The type of worker (one of those defined in WORKERS_CONFIG). + shared_config: The config dict that all worker instances share (after being + converted to YAML) + worker_type_list: The type(s) of worker (each one of those defined in WORKERS_CONFIG). + This list can be a single worker type or multiple. worker_name: The name of the worker instance. worker_port: The HTTP replication port that the worker instance is listening on. """ # The instance_map config field marks the workers that write to various replication streams instance_map = shared_config.setdefault("instance_map", {}) - # Worker-type specific sharding config - if worker_type == "pusher": + # This is a list of the stream_writers that there can be only one of. Events can be + # sharded, and therefore don't belong here. + singular_stream_writers = [ + "account_data", + "presence", + "receipts", + "to_device", + "typing", + ] + + # Worker-type specific sharding config. Now that a single worker can fulfill multiple + # roles, check each. 
+ if "pusher" in worker_type_list: shared_config.setdefault("pusher_instances", []).append(worker_name) - elif worker_type == "federation_sender": + if "federation_sender" in worker_type_list: shared_config.setdefault("federation_sender_instances", []).append(worker_name) - elif worker_type == "event_persister": + if "event_persister" in worker_type_list: # Event persisters write to the events stream, so we need to update # the list of event stream writers shared_config.setdefault("stream_writers", {}).setdefault("events", []).append( @@ -362,19 +380,137 @@ def add_worker_roles_to_shared_config( "port": worker_port, } - elif worker_type in ["account_data", "presence", "receipts", "to_device", "typing"]: - # Update the list of stream writers - # It's convenient that the name of the worker type is the same as the stream to write - shared_config.setdefault("stream_writers", {}).setdefault( - worker_type, [] - ).append(worker_name) + # Update the list of stream writers. It's convenient that the name of the worker + # type is the same as the stream to write. Iterate over the whole list in case there + # is more than one. + for worker in worker_type_list: + if worker in singular_stream_writers: + shared_config.setdefault("stream_writers", {}).setdefault( + worker, [] + ).append(worker_name) - # Map of stream writer instance names to host/ports combos - # For now, all stream writers need http replication ports - instance_map[worker_name] = { - "host": "localhost", - "port": worker_port, - } + # Map of stream writer instance names to host/ports combos + # For now, all stream writers need http replication ports + instance_map[worker_name] = { + "host": "localhost", + "port": worker_port, + } + + +def merge_worker_template_configs( + existing_dict: Dict[str, Any], + to_be_merged_dict: Dict[str, Any], +) -> Dict[str, Any]: + """When given an existing dict of worker template configuration, merge new template + data from WORKERS_CONFIG and return new dict. 
+ + Args: + existing_dict: Either an existing worker template or a fresh blank one. + to_be_merged_dict: The template from WORKERS_CONFIG to be merged into + existing_dict. + Returns: The newly merged together dict values. + """ + new_dict: Dict[str, Any] = {} + for i in to_be_merged_dict.keys(): + if (i == "endpoint_patterns") or (i == "listener_resources"): + # merge the two lists, remove duplicates + new_dict[i] = list(set(existing_dict[i] + to_be_merged_dict[i])) + elif i == "shared_extra_conf": + # merge dictionaries, the worker name will be replaced below after counting + new_dict[i] = {**existing_dict[i], **to_be_merged_dict[i]} + elif i == "worker_extra_conf": + # There is only one worker type that has a 'worker_extra_conf' and it is + # the media_repo. Since duplicate worker types on the same worker don't + # work, this is fine. + new_dict[i] = existing_dict[i] + to_be_merged_dict[i] + else: + # Everything else should be identical, like "app", which only works because + # all apps are now generic_workers. + new_dict[i] = to_be_merged_dict[i] + return new_dict + + +def insert_worker_name_for_worker_config( + existing_dict: Dict[str, Any], worker_name: str +) -> Dict[str, Any]: + """Insert a given worker name into the worker's configuration dict. + + Args: + existing_dict: The worker_config dict that is imported into shared_config. + worker_name: The name of the worker to insert. + Returns: Copy of the dict with newly inserted worker name + """ + dict_to_edit = existing_dict.copy() + for k, v in dict_to_edit["shared_extra_conf"].items(): + # Only proceed if it's a string as some values are boolean + if isinstance(dict_to_edit["shared_extra_conf"][k], str): + # This will be ignored if the text isn't the placeholder. 
+ dict_to_edit["shared_extra_conf"][k] = v.replace( + "placeholder_name", worker_name + ) + return dict_to_edit + + +def is_sharding_allowed_for_worker_type(worker_type: str) -> bool: + """Helper to check to make sure worker types that cannot have multiples do not. + + Args: + worker_type: The type of worker to check against. + Returns: True if allowed, False if not + """ + if worker_type in [ + "background_worker", + "account_data", + "presence", + "receipts", + "typing", + "to_device", + ]: + return False + else: + return True + + +def increment_counter(worker_type_counter: Dict[str, int], worker_type: str) -> int: + """Given a dict of worker_type:int, increment int + + Args: + worker_type_counter: dict to increment + worker_type: str of worker_type + Returns: int of new value of counter + """ + new_worker_count = worker_type_counter.setdefault(worker_type, 0) + 1 + worker_type_counter[worker_type] = new_worker_count + return new_worker_count + + +def is_name_allowed_for_worker( + worker_type_counter: Dict[str, str], worker_base_name: str, worker_type: str +) -> bool: + """Given a dict of worker_base_name:worker_type, check if this worker_base_name has + been seen before. + + Args: + worker_type_counter: dict of worker_base_name:worker_type. + worker_base_name: str of the base worker name, no appended number. + worker_type: str of the worker_type, including combo workers. + Returns: True if allowed, False if not + """ + counter_to_check = worker_type_counter.get(worker_base_name) + if counter_to_check is not None: + if counter_to_check == worker_type: + # Key exists, and they match so it should be ok. + return True + else: + return False + else: + # Key doesn't exist, it's ok to use. + return True + + +def split_and_strip_string(given_string: str, split_char: str) -> List: + # Removes whitespace from ends of result strings before adding to list. 
+ return [x.strip() for x in given_string.split(split_char)] def generate_base_homeserver_config() -> None: @@ -471,9 +607,18 @@ def generate_worker_files( # A counter of worker_type -> int. Used for determining the name for a given # worker type when generating its config file, as each worker's name is just - # worker_type + instance # + # worker_type(s) + instance # worker_type_counter: Dict[str, int] = {} + # Similar to above, but more finely grained. This is used to determine we don't have + # more than a single worker for cases where multiples would be bad(e.g. presence). + worker_type_shard_counter: Dict[str, int] = {} + + # Similar to above, but for worker names. This is used to check that a worker name + # is not reused for different worker types or combinations of types, as it will error + # with 'Address in use' (e.g. "to_device, to_device=typing" would error). + worker_name_type_counter: Dict[str, str] = {} + # A list of internal endpoints to healthcheck, starting with the main process # which exists even if no workers do. healthcheck_urls = ["http://localhost:8080/health"] @@ -490,46 +635,171 @@ def generate_worker_files( new_worker_types = [] for worker_type in worker_types: if ":" in worker_type: - x = worker_type.split(":") - y = 0 - # should only be 2 components, a type of worker and an integer as a + worker_type_components = split_and_strip_string(worker_type, ":") + count = 0 + # Should only be 2 components, a type of worker(s) and an integer as a # string. Cast the number as an int then it can be used as a counter. - if len(x) == 2 and x[-1].isdigit(): - y = int(x[1]) + if ( + len(worker_type_components) == 2 + and worker_type_components[-1].isdigit() + ): + count = int(worker_type_components[1]) else: error( "Multiplier signal(:) for worker found, but incorrect components: " + worker_type + + ". Please fix." ) # As long as there are more than 0, we add one to the list to make below. 
- while y > 0: - new_worker_types.append(x[0]) - y -= 1 + while count > 0: + new_worker_types.append(worker_type_components[0]) + count -= 1 else: # If it's not a real worker, it will error out below new_worker_types.append(worker_type) # worker_types is now an expanded list of worker types. worker_types = new_worker_types + # For each worker type specified by the user, create config values for worker_type in worker_types: - worker_config = WORKERS_CONFIG.get(worker_type) - if worker_config: - worker_config = worker_config.copy() + # pre-template the worker_config so when updating we don't get a KeyError + worker_config: Dict[str, Any] = { + "app": "", + "listener_resources": [], + "endpoint_patterns": [], + "shared_extra_conf": {}, + "worker_extra_conf": "", + } + + # Peel off any name designated before a '=' to use later. + requested_worker_name = "" + if "=" in worker_type: + # Split on "=", remove extra whitespace from ends then make list + worker_type_split = split_and_strip_string(worker_type, "=") + if len(worker_type_split) > 2: + error( + "To many worker names requested for a single worker, or to many " + "'='. Please fix: " + worker_type + ) + # if there was no name given, this will still be an empty string + requested_worker_name = worker_type_split[0] + # Uncommon mistake that will cause problems. Name string containing spaces. + if len(requested_worker_name.split(" ")) > 1: + error( + "Requesting a worker name containing a space is not allowed, " + "as it would raise a FileNotFoundError. Please use an " + "underscore instead." + ) + # Reassign the worker_type string with no name on it. + worker_type = worker_type_split[1] + + # Split on "+", remove whitespace from ends then make a list. + workers_to_combo_list = split_and_strip_string(worker_type, "+") + + # Check for duplicates in the split worker type list. No advantage in having + # duplicated worker types on the same worker. Two would consolidate into one. + # (e.g. 
"pusher + pusher" would resolve to a single "pusher" which may not be + # what was intended.) + if len(workers_to_combo_list) != len(set(workers_to_combo_list)): + error("Duplicate worker type found in " + worker_type + "! Please fix.") + + # Merge all worker config templates for this worker into a single config + for worker in workers_to_combo_list: + # Verify this is a real defined worker type. If it's not, stop everything so + # it can be fixed. + copy_of_template_config = WORKERS_CONFIG.get(worker) + if copy_of_template_config: + # So it's not a reference pointer + copy_of_template_config = copy_of_template_config.copy() + else: + error( + worker + + " is an unknown worker type! Was found in " + + worker_type + + ". Please fix!" + ) + + # Merge worker type template configuration data. + worker_config = merge_worker_template_configs( + worker_config, copy_of_template_config + ) + + # Check if there is to many of a worker there can only be one of. + # Will error and stop if it is a problem, e.g. 'background_worker'. + if worker in worker_type_shard_counter: + if not is_sharding_allowed_for_worker_type(worker): + error( + "There can be only a single worker with " + + worker + + " type. Please recount and remove." + ) + # Not in shard counter, add it. Don't need return value + increment_counter(worker_type_shard_counter, worker) + + # Name workers by their type or requested name concatenated with an + # incrementing number. e.g. federation_reader1 or event_creator+event_persister1 + if requested_worker_name: + worker_base_name = requested_worker_name + # It'll be useful to have this in the log in case it's a complex of many + # workers merged together. Note for Complement: it would only be seen in the + # logs for blueprint construction(which are not collected). + log( + "Worker name request found: " + + requested_worker_name + + ", for: " + + str(workers_to_combo_list) + ) + else: - error(worker_type + " is an unknown worker type! 
Please fix!") + # The worker name will be the worker_type, however if spaces exist + # between concatenated worker_types and the "+" because of readability, + # it will error on startup. Recombine worker_types without spaces and log. + if len(workers_to_combo_list) > 1: + worker_base_name = "+".join(workers_to_combo_list) + if worker_base_name != worker_type: + log( + "Default worker name would have contained spaces, which is not " + "allowed(" + worker_type + "). Reformed name to not contain " + "spaces: " + worker_base_name + ) + else: + worker_base_name = worker_type + # This counter is used for names with broader worker type definitions. + new_worker_count = increment_counter(worker_type_counter, worker_type) + worker_name = worker_base_name + str(new_worker_count) - new_worker_count = worker_type_counter.setdefault(worker_type, 0) + 1 - worker_type_counter[worker_type] = new_worker_count + # Now that the worker name is settled, check this name isn't used for a + # different worker_type. If it's not allowed, will error and stop. If no issues, + # it will be added to the counter. + if is_name_allowed_for_worker( + worker_name_type_counter, + worker_base_name, + worker_type, + ): + worker_name_type_counter.setdefault(worker_base_name, worker_type) + + else: + error( + "Can not use " + + worker_name + + " for " + + worker_type + + ". It is already in use by " + # This is cast as a str because mypy thinks it could be None + + str(worker_name_type_counter.get(worker_base_name)) + ) + + # Replace placeholder names in the config template with the actual worker name. + worker_config = insert_worker_name_for_worker_config(worker_config, worker_name) - # Name workers by their type concatenated with an incrementing number - # e.g. 
federation_reader1 - worker_name = worker_type + str(new_worker_count) worker_config.update( {"name": worker_name, "port": str(worker_port), "config_path": config_path} ) - # Update the shared config with any worker-type specific options + # Update the shared config with any worker-type specific options. Do a dance so + # the first of a given worker type gets to stay assigned. + worker_config["shared_extra_conf"].update(shared_config) shared_config.update(worker_config["shared_extra_conf"]) healthcheck_urls.append("http://localhost:%d/health" % (worker_port,)) @@ -539,7 +809,7 @@ def generate_worker_files( # Update the shared config with sharding-related options if necessary add_worker_roles_to_shared_config( - shared_config, worker_type, worker_name, worker_port + shared_config, workers_to_combo_list, worker_name, worker_port ) # Enable the worker in supervisord