1
0

Compare commits

..

26 Commits

Author SHA1 Message Date
Erik Johnston
ec885ffd33 1.117.0 2024-10-15 10:46:33 +01:00
Erik Johnston
6a0c21fabd Fixup changlog 2024-10-08 15:04:20 +01:00
Erik Johnston
b3e2d10f39 1.117.0rc1 2024-10-08 14:37:36 +01:00
Shay
a5986ac229 Improvements to admin redact api (#17792)
- better validation on user input
- fix an early task completion
- when checking membership in rooms, check for rooms user has been
banned from as well
2024-10-08 14:23:21 +01:00
Andrew Ferrazzutti
006251a5d0 Add missing license header (#17799)
Co-authored-by: Erik Johnston <erik@matrix.org>
2024-10-08 12:01:44 +01:00
Erik Johnston
422f3ecec1 Sliding sync: omit bump stamp when it is unchanged (#17788)
This saves some DB lookups in rooms
2024-10-08 11:17:23 +01:00
Erik Johnston
4e90221d87 Sliding sync minor performance speed up using new table (#17787)
Use the new tables to work out which rooms have changed.

---------

Co-authored-by: Eric Eastwood <eric.eastwood@beta.gouv.fr>
2024-10-08 11:06:31 +01:00
Erik Johnston
e2610de208 Speed up sliding sync when there are many active subscriptions (#17789)
Two changes: a) use a batch lookup function instead of a loop, b) check
existing data to see if we already have what we need and only fetch what
we don't.
2024-10-08 10:35:15 +01:00
Andrew Morgan
e8c8924b81 Clarify test_forget_when_not_left docstring (#17628) 2024-10-07 16:34:32 +01:00
V02460
e8e0f0fad7 Add config option redis.password_path (#17717)
Adds the option to load the Redis password from a file, instead of
giving it in the config directly. The code is similar to how it’s done
for `registration_shared_secret_path`. I changed the example in the
documentation to represent the best practice regarding the handling of
secrets.

Reading secrets from files has the security advantage of separating the
secrets from the config. It also simplifies secrets management in
Kubernetes.
2024-10-07 09:46:51 +01:00
Henrique
beb7a951f4 docs: add note about PYTHONMALLOC for accurate jemalloc memory tracking (#17709)
Added a note in the documentation suggesting that users may set
`PYTHONMALLOC=malloc` when using `jemalloc`. This allows jemalloc to
track memory usage more accurately by bypassing Python's internal
small-object allocator (`pymalloc`), helping to ensure that
`cache_autotuning` functions as expected.

This doc change aims to provide more clarity for users configuring
jemalloc with Synapse.


Based on:
4ac783549c/synapse/metrics/jemalloc.py (L198-L201)
2024-10-07 08:37:39 +00:00
dependabot[bot]
d34f827ed8 Bump python-multipart from 0.0.10 to 0.0.12 (#17772) 2024-10-07 09:14:30 +01:00
Andrew Ferrazzutti
9920417723 Don't say MSC4140 is supported when it's disabled (#17780) 2024-10-04 13:42:34 +01:00
Andrew Morgan
316d635906 Fix NAME attribute of ReplicationRemovePusherRestServlet (#17779) 2024-10-04 09:53:35 +01:00
Dirk Klimpel
8bbe66a9b9 explain load balancing for federation_sender_instances (#17776)
Adding information on how the load is distributed for
`federation_sender_instances`.

Thx to @devonh for the information.

causal source:
c2e5e9e67c/synapse/config/_base.py (L946-L989)

### Pull Request Checklist

<!-- Please read
https://element-hq.github.io/synapse/latest/development/contributing_guide.html
before submitting your pull request -->

* [x] Pull request is based on the develop branch
* [x] Pull request includes a [changelog
file](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#changelog).
The entry should:
- Be a short description of your change which makes sense to users.
"Fixed a bug that prevented receiving messages from other servers."
instead of "Moved X method from `EventStore` to `EventWorkerStore`.".
  - Use markdown where necessary, mostly for `code blocks`.
  - End with either a period (.) or an exclamation mark (!).
  - Start with a capital letter.
- Feel free to credit yourself, by adding a sentence "Contributed by
@github_username." or "Contributed by [Your Name]." to the end of the
entry.
* [x] [Code
style](https://element-hq.github.io/synapse/latest/code_style.html) is
correct
(run the
[linters](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#run-the-linters))

---------

Co-authored-by: Devon Hudson <devon.dmytro@gmail.com>
2024-10-03 22:01:33 +00:00
Andrew Morgan
d4e3ad04cd Merge branch 'master' into develop 2024-10-01 12:18:22 +01:00
Andrew Morgan
55c0391cc8 1.116.0 2024-10-01 11:14:13 +01:00
Erik Johnston
81e0f57800 Fix perf when streams don't change often (#17767)
There is a bug with the `StreamChangeCache` where it would incorrectly
return that all entities had changed if asked for entities changed
*since* the earliest stream position.

Note that for streams we use the inequalities: `$min_stream_id <
stream_id <= $max_stream_id`, i.e. when we ask the stream change cache
for all things that have changed since `$stream_id` we don't care for
events that happened *at* `$stream_id`.

Specifically: `_earliest_known_stream_pos` is the position at which we
know that we'll have entries for all changes since that point, we can
use the cache for any stream IDs that equal
`_earliest_known_stream_pos`.

`_earliest_known_stream_pos` is set in three places:
- On startup we set it either to:
  - the current maximum stream ID, with not prefilled values; or
  - the minimum of the latest N values we pulled from the DB
- When we evict items from the bottom, we set it to the stream ID of the
evicted items.

This was changed in https://github.com/matrix-org/synapse/pull/14435,
but I think we were overly conservative there.

---------

Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>
2024-09-30 13:52:33 +01:00
Erik Johnston
ae4862c38f Optimise notifier mk2 (#17766)
Based on #17765.

Basically the idea is to reduce the overhead of calling
`ObservableDeferred` in a loop. The two gains are: a) just using a list
of deferreds rather than the machinery of `ObservableDeferred`, and b)
only calling `PreseverLoggingContext` once.

`PreseverLoggingContext` in particular is expensive to call a lot as
each time it needs to call `get_thread_resource_usage` twice, so that it
an update the CPU metrics of the log context.
2024-09-30 13:32:31 +01:00
dependabot[bot]
602956ef64 Bump ruff from 0.6.7 to 0.6.8 (#17774) 2024-09-30 13:08:56 +01:00
dependabot[bot]
444b565c76 Bump phonenumbers from 8.13.45 to 8.13.46 (#17773) 2024-09-30 13:07:57 +01:00
dependabot[bot]
8068f31146 Bump regex from 1.10.6 to 1.11.0 (#17770) 2024-09-30 13:06:43 +01:00
Erik Johnston
5210565c12 Reduce overhead of sliding sync E2EE loops (#17771)
Mainly toning down logging and only calling
`get_membership_from_event_ids` if something has changed.
2024-09-30 13:00:14 +01:00
Erik Johnston
de955293cf Add fast path for sliding sync streams that only ask for extensions (#17768)
Principally useful for EX e2ee sliding sync connections.
2024-09-30 12:59:50 +01:00
Erik Johnston
93889eb2e7 Optimise notifier (#17765)
The notifier is quite inefficient when it has to wake up many user
streams all at once

From a silly benchmark this takes the time to notify 1M user streams
from ~30s to ~5s
2024-09-30 12:58:13 +01:00
Erik Johnston
ece66ba61c Minor perf speed up for large accounts on SSS (#17751)
This works as instead of passing *all* rooms to `record_sent_rooms` we
only need to pass rooms that were previously not in the LIVE state.

This came from a py-spy where we were spending ~10% CPU calling these
functions. Note that `record_sent_rooms` is a no-op for rooms that are
already in the `LIVE` state, so we only need to call them for
`PREVIOUSLY` or `INITIAL` rooms.
2024-09-30 12:58:02 +01:00
42 changed files with 848 additions and 290 deletions

View File

@@ -1,3 +1,57 @@
# Synapse 1.117.0 (2024-10-15)
No significant changes since 1.117.0rc1.
# Synapse 1.117.0rc1 (2024-10-08)
### Features
- Add config option `redis.password_path`. ([\#17717](https://github.com/element-hq/synapse/issues/17717))
### Bugfixes
- Fix a rare bug introduced in v1.29.0 where invalidating a user's access token from a worker could raise an error. ([\#17779](https://github.com/element-hq/synapse/issues/17779))
- In the response to `GET /_matrix/client/versions`, set the `unstable_features` flag for [MSC4140](https://github.com/matrix-org/matrix-spec-proposals/pull/4140) to `false` when server configuration disables support for delayed events. ([\#17780](https://github.com/element-hq/synapse/issues/17780))
- Improve input validation and room membership checks in admin redaction API. ([\#17792](https://github.com/element-hq/synapse/issues/17792))
### Improved Documentation
- Clarify the docstring of `test_forget_when_not_left`. ([\#17628](https://github.com/element-hq/synapse/issues/17628))
- Add documentation note about PYTHONMALLOC for accurate jemalloc memory tracking. Contributed by @hensg. ([\#17709](https://github.com/element-hq/synapse/issues/17709))
- Remove spurious "TODO UPDATE ALL THIS" note in the Debian installation docs. ([\#17749](https://github.com/element-hq/synapse/issues/17749))
- Explain how load balancing works for `federation_sender_instances`. ([\#17776](https://github.com/element-hq/synapse/issues/17776))
### Internal Changes
- Minor performance increase for large accounts using sliding sync. ([\#17751](https://github.com/element-hq/synapse/issues/17751))
- Increase performance of the notifier when there are many syncing users. ([\#17765](https://github.com/element-hq/synapse/issues/17765), [\#17766](https://github.com/element-hq/synapse/issues/17766))
- Fix performance of streams that don't change often. ([\#17767](https://github.com/element-hq/synapse/issues/17767))
- Improve performance of sliding sync connections that do not ask for any rooms. ([\#17768](https://github.com/element-hq/synapse/issues/17768))
- Reduce overhead of sliding sync E2EE loops. ([\#17771](https://github.com/element-hq/synapse/issues/17771))
- Sliding sync minor performance speed up using new table. ([\#17787](https://github.com/element-hq/synapse/issues/17787))
- Sliding sync minor performance improvement by omitting unchanged data from incremental responses. ([\#17788](https://github.com/element-hq/synapse/issues/17788))
- Speed up sliding sync when there are many active subscriptions. ([\#17789](https://github.com/element-hq/synapse/issues/17789))
- Add missing license headers on new source files. ([\#17799](https://github.com/element-hq/synapse/issues/17799))
### Updates to locked dependencies
* Bump phonenumbers from 8.13.45 to 8.13.46. ([\#17773](https://github.com/element-hq/synapse/issues/17773))
* Bump python-multipart from 0.0.10 to 0.0.12. ([\#17772](https://github.com/element-hq/synapse/issues/17772))
* Bump regex from 1.10.6 to 1.11.0. ([\#17770](https://github.com/element-hq/synapse/issues/17770))
* Bump ruff from 0.6.7 to 0.6.8. ([\#17774](https://github.com/element-hq/synapse/issues/17774))
# Synapse 1.116.0 (2024-10-01)
No significant changes since 1.116.0rc2.
# Synapse 1.116.0rc2 (2024-09-26)
### Features

12
Cargo.lock generated
View File

@@ -444,9 +444,9 @@ dependencies = [
[[package]]
name = "regex"
version = "1.10.6"
version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4219d74c6b67a3654a9fbebc4b419e22126d13d2f3c4a07ee0cb61ff79a79619"
checksum = "38200e5ee88914975b69f657f0801b6f6dccafd44fd9326302a4aaeecfacb1d8"
dependencies = [
"aho-corasick",
"memchr",
@@ -456,9 +456,9 @@ dependencies = [
[[package]]
name = "regex-automata"
version = "0.4.6"
version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86b83b8b9847f9bf95ef68afb0b8e6cdb80f498442f5179a29fad448fcc1eaea"
checksum = "368758f23274712b504848e9d5a6f010445cc8b87a7cdb4d7cbee666c1288da3"
dependencies = [
"aho-corasick",
"memchr",
@@ -467,9 +467,9 @@ dependencies = [
[[package]]
name = "regex-syntax"
version = "0.8.3"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adad44e29e4c806119491a7f06f03de4d1af22c3a680dd47f1e6e179439d1f56"
checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
[[package]]
name = "ryu"

View File

@@ -1 +0,0 @@
Remove spurious "TODO UPDATE ALL THIS" note in the Debian installation docs.

18
debian/changelog vendored
View File

@@ -1,3 +1,21 @@
matrix-synapse-py3 (1.117.0) stable; urgency=medium
* New Synapse release 1.117.0.
-- Synapse Packaging team <packages@matrix.org> Tue, 15 Oct 2024 10:46:30 +0100
matrix-synapse-py3 (1.117.0~rc1) stable; urgency=medium
* New Synapse release 1.117.0rc1.
-- Synapse Packaging team <packages@matrix.org> Tue, 08 Oct 2024 14:37:11 +0100
matrix-synapse-py3 (1.116.0) stable; urgency=medium
* New Synapse release 1.116.0.
-- Synapse Packaging team <packages@matrix.org> Tue, 01 Oct 2024 11:14:07 +0100
matrix-synapse-py3 (1.116.0~rc2) stable; urgency=medium
* New synapse release 1.116.0rc2.

View File

@@ -24,15 +24,6 @@
# nginx and supervisord configs depending on the workers requested.
#
# The environment variables it reads are:
# * SYNAPSE_CONFIG_PATH: The path where the generated `homeserver.yaml` will
# be stored.
# * SYNAPSE_CONFIG_DIR: The directory where generated config will be stored.
# If `SYNAPSE_CONFIG_PATH` is not set, it will default to
# SYNAPSE_CONFIG_DIR/homeserver.yaml.
# * SYNAPSE_DATA_DIR: Where the generated config will put persistent data
# such as the database and media store.
# * SYNAPSE_CONFIG_TEMPLATE_DIR: The directory containing jinja2 templates for
# configuration that this script will generate config from. Defaults to '/conf'.
# * SYNAPSE_SERVER_NAME: The desired server_name of the homeserver.
# * SYNAPSE_REPORT_STATS: Whether to report stats.
# * SYNAPSE_WORKER_TYPES: A comma separated list of worker names as specified in WORKERS_CONFIG
@@ -44,8 +35,6 @@
# SYNAPSE_WORKER_TYPES='event_persister, federation_sender, client_reader'
# SYNAPSE_WORKER_TYPES='event_persister:2, federation_sender:2, client_reader'
# SYNAPSE_WORKER_TYPES='stream_writers=account_data+presence+typing'
# * SYNAPSE_WORKERS_WRITE_LOGS_TO_DISK: Whether worker logs should be written to disk,
# in addition to stdout.
# * SYNAPSE_AS_REGISTRATION_DIR: If specified, a directory in which .yaml and .yml files
# will be treated as Application Service registration files.
# * SYNAPSE_TLS_CERT: Path to a TLS certificate in PEM format.
@@ -59,9 +48,7 @@
# * SYNAPSE_LOG_SENSITIVE: If unset, SQL and SQL values won't be logged,
# regardless of the SYNAPSE_LOG_LEVEL setting.
# * SYNAPSE_LOG_TESTING: if set, Synapse will log additional information useful
# for testing.
# * SYNAPSE_USE_UNIX_SOCKET: if set, workers will communicate via unix socket
# rather than TCP.
# for testing.
#
# NOTE: According to Complement's ENTRYPOINT expectations for a homeserver image (as defined
# in the project's README), this script may be run multiple times, and functionality should
@@ -617,9 +604,7 @@ def generate_base_homeserver_config() -> None:
# start.py already does this for us, so just call that.
# note that this script is copied in in the official, monolith dockerfile
os.environ["SYNAPSE_HTTP_PORT"] = str(MAIN_PROCESS_HTTP_LISTENER_PORT)
# This script makes use of the `SYNAPSE_CONFIG_DIR` environment variable to
# determine where to place the generated homeserver config.
subprocess.run(["/usr/local/bin/python", "/start.py", "migrate_config"], check=True)
def parse_worker_types(
@@ -748,10 +733,8 @@ def parse_worker_types(
def generate_worker_files(
environ: Mapping[str, str],
config_dir: str,
config_path: str,
data_dir: str,
template_dir: str,
requested_worker_types: Dict[str, Set[str]],
) -> None:
"""Read the desired workers(if any) that is passed in and generate shared
@@ -759,13 +742,9 @@ def generate_worker_files(
Args:
environ: os.environ instance.
config_dir: The location of the configuration directory, where generated
worker config files are written to.
config_path: The location of the base Synapse homeserver config file.
data_dir: The location of the synapse data directory. Where logs will be
stored (if `SYNAPSE_WORKERS_WRITE_LOGS_TO_DISK` is set).
template_dir: The location of the template directory. Where jinja2
templates for config files live.
config_path: The location of the generated Synapse main worker config file.
data_dir: The location of the synapse data directory. Where log and
user-facing config files live.
requested_worker_types: A Dict containing requested workers in the format of
{'worker_name1': {'worker_type', ...}}
"""
@@ -828,8 +807,7 @@ def generate_worker_files(
nginx_locations: Dict[str, str] = {}
# Create the worker configuration directory if it doesn't already exist
workers_config_dir = os.path.join(config_dir, "workers")
os.makedirs(workers_config_dir, exist_ok=True)
os.makedirs("/conf/workers", exist_ok=True)
# Start worker ports from this arbitrary port
worker_port = 18009
@@ -876,7 +854,7 @@ def generate_worker_files(
worker_config = insert_worker_name_for_worker_config(worker_config, worker_name)
worker_config.update(
{"name": worker_name, "port": str(worker_port)}
{"name": worker_name, "port": str(worker_port), "config_path": config_path}
)
# Update the shared config with any worker_type specific options. The first of a
@@ -899,14 +877,12 @@ def generate_worker_files(
worker_descriptors.append(worker_config)
# Write out the worker's logging config file
log_config_filepath = generate_worker_log_config(
environ, worker_name, template_dir, workers_config_dir, data_dir
)
log_config_filepath = generate_worker_log_config(environ, worker_name, data_dir)
# Then a worker config file
convert(
os.path.join(template_dir, "worker.yaml.j2"),
os.path.join(workers_config_dir, f"{worker_name}.yaml"),
"/conf/worker.yaml.j2",
f"/conf/workers/{worker_name}.yaml",
**worker_config,
worker_log_config_filepath=log_config_filepath,
using_unix_sockets=using_unix_sockets,
@@ -947,9 +923,7 @@ def generate_worker_files(
# Finally, we'll write out the config files.
# log config for the master process
master_log_config = generate_worker_log_config(
environ, "master", template_dir, workers_config_dir, data_dir
)
master_log_config = generate_worker_log_config(environ, "master", data_dir)
shared_config["log_config"] = master_log_config
# Find application service registrations
@@ -980,8 +954,8 @@ def generate_worker_files(
# Shared homeserver config
convert(
os.path.join(template_dir, "shared.yaml.j2"),
os.path.join(workers_config_dir, "shared.yaml"),
"/conf/shared.yaml.j2",
"/conf/workers/shared.yaml",
shared_worker_config=yaml.dump(shared_config),
appservice_registrations=appservice_registrations,
enable_redis=workers_in_use,
@@ -991,7 +965,7 @@ def generate_worker_files(
# Nginx config
convert(
os.path.join(template_dir, "nginx.conf.j2"),
"/conf/nginx.conf.j2",
"/etc/nginx/conf.d/matrix-synapse.conf",
worker_locations=nginx_location_config,
upstream_directives=nginx_upstream_config,
@@ -1003,7 +977,7 @@ def generate_worker_files(
# Supervisord config
os.makedirs("/etc/supervisor", exist_ok=True)
convert(
os.path.join(template_dir, "supervisord.conf.j2"),
"/conf/supervisord.conf.j2",
"/etc/supervisor/supervisord.conf",
main_config_path=config_path,
enable_redis=workers_in_use,
@@ -1011,7 +985,7 @@ def generate_worker_files(
)
convert(
os.path.join(template_dir, "synapse.supervisord.conf.j2"),
"/conf/synapse.supervisord.conf.j2",
"/etc/supervisor/conf.d/synapse.conf",
workers=worker_descriptors,
main_config_path=config_path,
@@ -1020,7 +994,7 @@ def generate_worker_files(
# healthcheck config
convert(
os.path.join(template_dir, "healthcheck.sh.j2"),
"/conf/healthcheck.sh.j2",
"/healthcheck.sh",
healthcheck_urls=healthcheck_urls,
)
@@ -1032,24 +1006,10 @@ def generate_worker_files(
def generate_worker_log_config(
environ: Mapping[str, str],
worker_name: str,
workers_config_dir: str,
template_dir: str,
data_dir: str,
environ: Mapping[str, str], worker_name: str, data_dir: str
) -> str:
"""Generate a log.config file for the given worker.
Args:
environ: A mapping representing the environment variables that this script
is running with.
worker_name: The name of the worker. Used in generated file paths.
workers_config_dir: The location of the worker configuration directory,
where the generated worker log config will be saved.
template_dir: The directory containing jinja2 template files.
data_dir: The directory where log files will be written (if
`SYNAPSE_WORKERS_WRITE_LOGS_TO_DISK` is set).
Returns: the path to the generated file
"""
# Check whether we should write worker logs to disk, in addition to the console
@@ -1064,9 +1024,9 @@ def generate_worker_log_config(
extra_log_template_args["SYNAPSE_LOG_TESTING"] = environ.get("SYNAPSE_LOG_TESTING")
# Render and write the file
log_config_filepath = os.path.join(workers_config_dir, f"{worker_name}.log.config")
log_config_filepath = f"/conf/workers/{worker_name}.log.config"
convert(
os.path.join(template_dir, "log.config"),
"/conf/log.config",
log_config_filepath,
worker_name=worker_name,
**extra_log_template_args,
@@ -1089,7 +1049,6 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None:
config_dir = environ.get("SYNAPSE_CONFIG_DIR", "/data")
config_path = environ.get("SYNAPSE_CONFIG_PATH", config_dir + "/homeserver.yaml")
data_dir = environ.get("SYNAPSE_DATA_DIR", "/data")
template_dir = environ.get("SYNAPSE_CONFIG_TEMPLATE_DIR", "/conf")
# override SYNAPSE_NO_TLS, we don't support TLS in worker mode,
# this needs to be handled by a frontend proxy
@@ -1101,10 +1060,9 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None:
generate_base_homeserver_config()
else:
log("Base homeserver config exists—not regenerating")
# This script may be run multiple times (mostly by Complement, see note at top of
# file). Don't re-configure workers in this instance.
mark_filepath = os.path.join(config_dir, "workers_have_been_configured")
mark_filepath = "/conf/workers_have_been_configured"
if not os.path.exists(mark_filepath):
# Collect and validate worker_type requests
# Read the desired worker configuration from the environment
@@ -1121,9 +1079,7 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None:
# Always regenerate all other config files
log("Generating worker config files")
generate_worker_files(
environ, config_dir, config_path, data_dir, template_dir, requested_worker_types
)
generate_worker_files(environ, config_path, data_dir, requested_worker_types)
# Mark workers as being configured
with open(mark_filepath, "w") as f:

View File

@@ -42,8 +42,6 @@ def convert(src: str, dst: str, environ: Mapping[str, object]) -> None:
def generate_config_from_template(
data_dir: str,
template_dir: str,
config_dir: str,
config_path: str,
os_environ: Mapping[str, str],
@@ -52,9 +50,6 @@ def generate_config_from_template(
"""Generate a homeserver.yaml from environment variables
Args:
data_dir: where persistent data is stored
template_dir: The location of the template directory. Where jinja2
templates for config files live.
config_dir: where to put generated config files
config_path: where to put the main config file
os_environ: environment mapping
@@ -75,10 +70,9 @@ def generate_config_from_template(
"macaroon": "SYNAPSE_MACAROON_SECRET_KEY",
}
synapse_server_name = environ["SYNAPSE_SERVER_NAME"]
for name, secret in secrets.items():
if secret not in environ:
filename = os.path.join(data_dir, f"{synapse_server_name}.{name}.key")
filename = "/data/%s.%s.key" % (environ["SYNAPSE_SERVER_NAME"], name)
# if the file already exists, load in the existing value; otherwise,
# generate a new secret and write it to a file
@@ -94,7 +88,7 @@ def generate_config_from_template(
handle.write(value)
environ[secret] = value
environ["SYNAPSE_APPSERVICES"] = glob.glob(os.path.join(data_dir, "appservices", "*.yaml"))
environ["SYNAPSE_APPSERVICES"] = glob.glob("/data/appservices/*.yaml")
if not os.path.exists(config_dir):
os.mkdir(config_dir)
@@ -117,12 +111,12 @@ def generate_config_from_template(
environ["SYNAPSE_LOG_CONFIG"] = config_dir + "/log.config"
log("Generating synapse config file " + config_path)
convert(os.path.join(template_dir, "homeserver.yaml"), config_path, environ)
convert("/conf/homeserver.yaml", config_path, environ)
log_config_file = environ["SYNAPSE_LOG_CONFIG"]
log("Generating log config file " + log_config_file)
convert(
os.path.join(template_dir, "log.config"),
"/conf/log.config",
log_config_file,
{**environ, "include_worker_name_in_log_line": False},
)
@@ -134,15 +128,15 @@ def generate_config_from_template(
"synapse.app.homeserver",
"--config-path",
config_path,
# tell synapse to put generated keys in the data directory rather than /compiled
# tell synapse to put generated keys in /data rather than /compiled
"--keys-directory",
config_dir,
"--generate-keys",
]
if ownership is not None:
log(f"Setting ownership on the data dir to {ownership}")
subprocess.run(["chown", "-R", ownership, data_dir], check=True)
log(f"Setting ownership on /data to {ownership}")
subprocess.run(["chown", "-R", ownership, "/data"], check=True)
args = ["gosu", ownership] + args
subprocess.run(args, check=True)
@@ -165,13 +159,12 @@ def run_generate_config(environ: Mapping[str, str], ownership: Optional[str]) ->
config_dir = environ.get("SYNAPSE_CONFIG_DIR", "/data")
config_path = environ.get("SYNAPSE_CONFIG_PATH", config_dir + "/homeserver.yaml")
data_dir = environ.get("SYNAPSE_DATA_DIR", "/data")
template_dir = environ.get("SYNAPSE_CONFIG_TEMPLATE_DIR", "/conf")
# create a suitable log config from our template
log_config_file = "%s/%s.log.config" % (config_dir, server_name)
if not os.path.exists(log_config_file):
log("Creating log config %s" % (log_config_file,))
convert(os.path.join(template_dir, "log.config"), log_config_file, environ)
convert("/conf/log.config", log_config_file, environ)
# generate the main config file, and a signing key.
args = [
@@ -223,14 +216,12 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None:
if mode == "migrate_config":
# generate a config based on environment vars.
data_dir = environ.get("SYNAPSE_DATA_DIR", "/data")
config_dir = environ.get("SYNAPSE_CONFIG_DIR", "/data")
config_path = environ.get(
"SYNAPSE_CONFIG_PATH", config_dir + "/homeserver.yaml"
)
template_dir = environ.get("SYNAPSE_CONFIG_TEMPLATE_DIR", "/conf")
return generate_config_from_template(
data_dir, template_dir, config_dir, config_path, environ, ownership
config_dir, config_path, environ, ownership
)
if mode != "run":

View File

@@ -255,6 +255,8 @@ line to `/etc/default/matrix-synapse`:
LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.2
*Note*: You may need to set `PYTHONMALLOC=malloc` to ensure that `jemalloc` can accurately calculate memory usage. By default, Python uses its internal small-object allocator, which may interfere with jemalloc's ability to track memory consumption correctly. This could prevent the [cache_autotuning](../configuration/config_documentation.md#caches-and-associated-values) feature from functioning as expected, as the Python allocator may not reach the memory threshold set by `max_cache_memory_usage`, thus not triggering the cache eviction process.
This made a significant difference on Python 2.7 - it's unclear how
much of an improvement it provides on Python 3.x.

View File

@@ -4368,7 +4368,13 @@ It is possible to scale the processes that handle sending outbound federation re
by running a [`generic_worker`](../../workers.md#synapseappgeneric_worker) and adding it's [`worker_name`](#worker_name) to
a `federation_sender_instances` map. Doing so will remove handling of this function from
the main process. Multiple workers can be added to this map, in which case the work is
balanced across them.
balanced across them.
The way that the load balancing works is any outbound federation request will be assigned
to a federation sender worker based on the hash of the destination server name. This
means that all requests being sent to the same destination will be processed by the same
worker instance. Multiple `federation_sender_instances` are useful if there is a federation
with multiple servers.
This configuration setting must be shared between all workers handling federation
sending, and if changed all federation sender workers must be stopped at the same time
@@ -4518,6 +4524,9 @@ This setting has the following sub-options:
* `path`: The full path to a local Unix socket file. **If this is used, `host` and
`port` are ignored.** Defaults to `/tmp/redis.sock'
* `password`: Optional password if configured on the Redis instance.
* `password_path`: Alternative to `password`, reading the password from an
external file. The file should be a plain text file, containing only the
password. Synapse reads the password from the given file once at startup.
* `dbid`: Optional redis dbid if needs to connect to specific redis logical db.
* `use_tls`: Whether to use tls connection. Defaults to false.
* `certificate_file`: Optional path to the certificate file
@@ -4531,13 +4540,16 @@ This setting has the following sub-options:
_Changed in Synapse 1.85.0: Added path option to use a local Unix socket_
_Changed in Synapse 1.116.0: Added password\_path_
Example configuration:
```yaml
redis:
enabled: true
host: localhost
port: 6379
password: <secret_password>
password_path: <path_to_the_password_file>
# OR password: <secret_password>
dbid: <dbid>
#use_tls: True
#certificate_file: <path_to_the_certificate_file>

52
poetry.lock generated
View File

@@ -1447,13 +1447,13 @@ dev = ["jinja2"]
[[package]]
name = "phonenumbers"
version = "8.13.45"
version = "8.13.46"
description = "Python version of Google's common library for parsing, formatting, storing and validating international phone numbers."
optional = false
python-versions = "*"
files = [
{file = "phonenumbers-8.13.45-py2.py3-none-any.whl", hash = "sha256:bf05ec20fcd13f0d53e43a34ed7bd1c8be26a72b88fce4b8c64fca5b4641987a"},
{file = "phonenumbers-8.13.45.tar.gz", hash = "sha256:53679a95b6060fd5e15467759252c87933d8566d6a5be00995a579eb0e02435b"},
{file = "phonenumbers-8.13.46-py2.py3-none-any.whl", hash = "sha256:519422d407af066fdbf98e179ea2e214487060f26526d67871f817eefbbb2134"},
{file = "phonenumbers-8.13.46.tar.gz", hash = "sha256:94bf18ba9725bb6868d29473b13f78ef01e2585c5cb561ec0200be7676e77452"},
]
[[package]]
@@ -1974,13 +1974,13 @@ six = ">=1.5"
[[package]]
name = "python-multipart"
version = "0.0.10"
version = "0.0.12"
description = "A streaming multipart parser for Python"
optional = false
python-versions = ">=3.8"
files = [
{file = "python_multipart-0.0.10-py3-none-any.whl", hash = "sha256:2b06ad9e8d50c7a8db80e3b56dab590137b323410605af2be20d62a5f1ba1dc8"},
{file = "python_multipart-0.0.10.tar.gz", hash = "sha256:46eb3c6ce6fdda5fb1a03c7e11d490e407c6930a2703fe7aef4da71c374688fa"},
{file = "python_multipart-0.0.12-py3-none-any.whl", hash = "sha256:43dcf96cf65888a9cd3423544dd0d75ac10f7aa0c3c28a175bbcd00c9ce1aebf"},
{file = "python_multipart-0.0.12.tar.gz", hash = "sha256:045e1f98d719c1ce085ed7f7e1ef9d8ccc8c02ba02b5566d5f7521410ced58cb"},
]
[[package]]
@@ -2277,29 +2277,29 @@ files = [
[[package]]
name = "ruff"
version = "0.6.7"
version = "0.6.8"
description = "An extremely fast Python linter and code formatter, written in Rust."
optional = false
python-versions = ">=3.7"
files = [
{file = "ruff-0.6.7-py3-none-linux_armv6l.whl", hash = "sha256:08277b217534bfdcc2e1377f7f933e1c7957453e8a79764d004e44c40db923f2"},
{file = "ruff-0.6.7-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:c6707a32e03b791f4448dc0dce24b636cbcdee4dd5607adc24e5ee73fd86c00a"},
{file = "ruff-0.6.7-py3-none-macosx_11_0_arm64.whl", hash = "sha256:533d66b7774ef224e7cf91506a7dafcc9e8ec7c059263ec46629e54e7b1f90ab"},
{file = "ruff-0.6.7-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:17a86aac6f915932d259f7bec79173e356165518859f94649d8c50b81ff087e9"},
{file = "ruff-0.6.7-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:b3f8822defd260ae2460ea3832b24d37d203c3577f48b055590a426a722d50ef"},
{file = "ruff-0.6.7-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:9ba4efe5c6dbbb58be58dd83feedb83b5e95c00091bf09987b4baf510fee5c99"},
{file = "ruff-0.6.7-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:525201b77f94d2b54868f0cbe5edc018e64c22563da6c5c2e5c107a4e85c1c0d"},
{file = "ruff-0.6.7-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:8854450839f339e1049fdbe15d875384242b8e85d5c6947bb2faad33c651020b"},
{file = "ruff-0.6.7-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:2f0b62056246234d59cbf2ea66e84812dc9ec4540518e37553513392c171cb18"},
{file = "ruff-0.6.7-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6b1462fa56c832dc0cea5b4041cfc9c97813505d11cce74ebc6d1aae068de36b"},
{file = "ruff-0.6.7-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:02b083770e4cdb1495ed313f5694c62808e71764ec6ee5db84eedd82fd32d8f5"},
{file = "ruff-0.6.7-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:0c05fd37013de36dfa883a3854fae57b3113aaa8abf5dea79202675991d48624"},
{file = "ruff-0.6.7-py3-none-musllinux_1_2_i686.whl", hash = "sha256:f49c9caa28d9bbfac4a637ae10327b3db00f47d038f3fbb2195c4d682e925b14"},
{file = "ruff-0.6.7-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:a0e1655868164e114ba43a908fd2d64a271a23660195017c17691fb6355d59bb"},
{file = "ruff-0.6.7-py3-none-win32.whl", hash = "sha256:a939ca435b49f6966a7dd64b765c9df16f1faed0ca3b6f16acdf7731969deb35"},
{file = "ruff-0.6.7-py3-none-win_amd64.whl", hash = "sha256:590445eec5653f36248584579c06252ad2e110a5d1f32db5420de35fb0e1c977"},
{file = "ruff-0.6.7-py3-none-win_arm64.whl", hash = "sha256:b28f0d5e2f771c1fe3c7a45d3f53916fc74a480698c4b5731f0bea61e52137c8"},
{file = "ruff-0.6.7.tar.gz", hash = "sha256:44e52129d82266fa59b587e2cd74def5637b730a69c4542525dfdecfaae38bd5"},
{file = "ruff-0.6.8-py3-none-linux_armv6l.whl", hash = "sha256:77944bca110ff0a43b768f05a529fecd0706aac7bcce36d7f1eeb4cbfca5f0f2"},
{file = "ruff-0.6.8-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:27b87e1801e786cd6ede4ada3faa5e254ce774de835e6723fd94551464c56b8c"},
{file = "ruff-0.6.8-py3-none-macosx_11_0_arm64.whl", hash = "sha256:cd48f945da2a6334f1793d7f701725a76ba93bf3d73c36f6b21fb04d5338dcf5"},
{file = "ruff-0.6.8-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:677e03c00f37c66cea033274295a983c7c546edea5043d0c798833adf4cf4c6f"},
{file = "ruff-0.6.8-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:9f1476236b3eacfacfc0f66aa9e6cd39f2a624cb73ea99189556015f27c0bdeb"},
{file = "ruff-0.6.8-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:6f5a2f17c7d32991169195d52a04c95b256378bbf0de8cb98478351eb70d526f"},
{file = "ruff-0.6.8-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:5fd0d4b7b1457c49e435ee1e437900ced9b35cb8dc5178921dfb7d98d65a08d0"},
{file = "ruff-0.6.8-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:f8034b19b993e9601f2ddf2c517451e17a6ab5cdb1c13fdff50c1442a7171d87"},
{file = "ruff-0.6.8-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:6cfb227b932ba8ef6e56c9f875d987973cd5e35bc5d05f5abf045af78ad8e098"},
{file = "ruff-0.6.8-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6ef0411eccfc3909269fed47c61ffebdcb84a04504bafa6b6df9b85c27e813b0"},
{file = "ruff-0.6.8-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:007dee844738c3d2e6c24ab5bc7d43c99ba3e1943bd2d95d598582e9c1b27750"},
{file = "ruff-0.6.8-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:ce60058d3cdd8490e5e5471ef086b3f1e90ab872b548814e35930e21d848c9ce"},
{file = "ruff-0.6.8-py3-none-musllinux_1_2_i686.whl", hash = "sha256:1085c455d1b3fdb8021ad534379c60353b81ba079712bce7a900e834859182fa"},
{file = "ruff-0.6.8-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:70edf6a93b19481affd287d696d9e311388d808671bc209fb8907b46a8c3af44"},
{file = "ruff-0.6.8-py3-none-win32.whl", hash = "sha256:792213f7be25316f9b46b854df80a77e0da87ec66691e8f012f887b4a671ab5a"},
{file = "ruff-0.6.8-py3-none-win_amd64.whl", hash = "sha256:ec0517dc0f37cad14a5319ba7bba6e7e339d03fbf967a6d69b0907d61be7a263"},
{file = "ruff-0.6.8-py3-none-win_arm64.whl", hash = "sha256:8d3bb2e3fbb9875172119021a13eed38849e762499e3cfde9588e4b4d70968dc"},
{file = "ruff-0.6.8.tar.gz", hash = "sha256:a5bf44b1aa0adaf6d9d20f86162b34f7c593bfedabc51239953e446aefc8ce18"},
]
[[package]]
@@ -3114,4 +3114,4 @@ user-search = ["pyicu"]
[metadata]
lock-version = "2.0"
python-versions = "^3.8.0"
content-hash = "93c267fac3428b764f954e6faa17937b9c97b1ed2bdafc41dd8f6cb5d2ce085b"
content-hash = "304d03b74d2886def69ae44ce5afaed21318db9f09aae91281e0f182e1660ffd"

View File

@@ -97,7 +97,7 @@ module-name = "synapse.synapse_rust"
[tool.poetry]
name = "matrix-synapse"
version = "1.116.0rc2"
version = "1.117.0"
description = "Homeserver for the Matrix decentralised comms protocol"
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
license = "AGPL-3.0-or-later"
@@ -320,7 +320,7 @@ all = [
# failing on new releases. Keeping lower bounds loose here means that dependabot
# can bump versions without having to update the content-hash in the lockfile.
# This helps prevents merge conflicts when running a batch of dependabot updates.
ruff = "0.6.7"
ruff = "0.6.8"
# Type checking only works with the pydantic.v1 compat module from pydantic v2
pydantic = "^2"

View File

@@ -338,7 +338,7 @@ class MSC3861DelegatedAuth(BaseAuth):
logger.exception("Failed to introspect token")
raise SynapseError(503, "Unable to introspect the access token")
logger.info(f"Introspection result: {introspection_result!r}")
logger.debug("Introspection result: %r", introspection_result)
# TODO: introspection verification should be more extensive, especially:
# - verify the audience

View File

@@ -3,7 +3,7 @@
#
# Copyright 2020 The Matrix.org Foundation C.I.C.
# Copyright 2016 OpenMarket Ltd
# Copyright (C) 2023 New Vector, Ltd
# Copyright (C) 2023-2024 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as

View File

@@ -21,10 +21,15 @@
from typing import Any
from synapse.config._base import Config
from synapse.config._base import Config, ConfigError, read_file
from synapse.types import JsonDict
from synapse.util.check_dependencies import check_requirements
CONFLICTING_PASSWORD_OPTS_ERROR = """\
You have configured both `redis.password` and `redis.password_path`.
These are mutually incompatible.
"""
class RedisConfig(Config):
section = "redis"
@@ -43,6 +48,17 @@ class RedisConfig(Config):
self.redis_path = redis_config.get("path", None)
self.redis_dbid = redis_config.get("dbid", None)
self.redis_password = redis_config.get("password")
redis_password_path = redis_config.get("password_path")
if redis_password_path:
if self.redis_password:
raise ConfigError(CONFLICTING_PASSWORD_OPTS_ERROR)
self.redis_password = read_file(
redis_password_path,
(
"redis",
"password_path",
),
).strip()
self.redis_use_tls = redis_config.get("use_tls", False)
self.redis_certificate = redis_config.get("certificate_file", None)

View File

@@ -443,8 +443,8 @@ class AdminHandler:
["m.room.member", "m.room.message"],
)
if not event_ids:
# there's nothing to redact
return TaskStatus.COMPLETE, result, None
# nothing to redact in this room
continue
events = await self._store.get_events_as_list(event_ids)
for event in events:

View File

@@ -1,3 +1,17 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2024 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
import logging
from typing import TYPE_CHECKING, List, Optional, Set, Tuple

View File

@@ -48,6 +48,7 @@ from synapse.metrics.background_process_metrics import (
wrap_as_background_process,
)
from synapse.storage.databases.main.client_ips import DeviceLastConnectionInfo
from synapse.storage.databases.main.roommember import EventIdMembership
from synapse.storage.databases.main.state_deltas import StateDelta
from synapse.types import (
DeviceListUpdates,
@@ -222,7 +223,6 @@ class DeviceWorkerHandler:
return changed
@trace
@measure_func("device.get_user_ids_changed")
@cancellable
async def get_user_ids_changed(
self, user_id: str, from_token: StreamToken
@@ -290,9 +290,11 @@ class DeviceWorkerHandler:
memberships_to_fetch.add(delta.prev_event_id)
# Fetch all the memberships for the membership events
event_id_to_memberships = await self.store.get_membership_from_event_ids(
memberships_to_fetch
)
event_id_to_memberships: Mapping[str, Optional[EventIdMembership]] = {}
if memberships_to_fetch:
event_id_to_memberships = await self.store.get_membership_from_event_ids(
memberships_to_fetch
)
joined_invited_knocked = (
Membership.JOIN,
@@ -349,7 +351,6 @@ class DeviceWorkerHandler:
return device_list_updates
@measure_func("_generate_sync_entry_for_device_list")
async def generate_sync_entry_for_device_list(
self,
user_id: str,

View File

@@ -49,6 +49,7 @@ from synapse.types import (
Requester,
SlidingSyncStreamToken,
StateMap,
StrCollection,
StreamKeyType,
StreamToken,
)
@@ -293,7 +294,6 @@ class SlidingSyncHandler:
# to record rooms as having updates even if there might not actually
# be anything new for the user (e.g. due to event filters, events
# having happened after the user left, etc).
unsent_room_ids = []
if from_token:
# The set of rooms that the client (may) care about, but aren't
# in any list range (or subscribed to).
@@ -305,15 +305,24 @@ class SlidingSyncHandler:
# TODO: Replace this with something faster. When we land the
# sliding sync tables that record the most recent event
# positions we can use that.
missing_event_map_by_room = (
await self.store.get_room_events_stream_for_rooms(
room_ids=missing_rooms,
from_key=to_token.room_key,
to_key=from_token.stream_token.room_key,
limit=1,
unsent_room_ids: StrCollection
if await self.store.have_finished_sliding_sync_background_jobs():
unsent_room_ids = await (
self.store.get_rooms_that_have_updates_since_sliding_sync_table(
room_ids=missing_rooms,
from_key=from_token.stream_token.room_key,
)
)
)
unsent_room_ids = list(missing_event_map_by_room)
else:
missing_event_map_by_room = (
await self.store.get_room_events_stream_for_rooms(
room_ids=missing_rooms,
from_key=to_token.room_key,
to_key=from_token.stream_token.room_key,
limit=1,
)
)
unsent_room_ids = list(missing_event_map_by_room)
new_connection_state.rooms.record_unsent_rooms(
unsent_room_ids, from_token.stream_token.room_key
@@ -1048,22 +1057,42 @@ class SlidingSyncHandler:
)
)
# Figure out the last bump event in the room
#
# By default, just choose the membership event position for any non-join membership
bump_stamp = room_membership_for_user_at_to_token.event_pos.stream
# Figure out the last bump event in the room. If the bump stamp hasn't
# changed we omit it from the response.
bump_stamp = None
always_return_bump_stamp = (
# We use the membership event position for any non-join
room_membership_for_user_at_to_token.membership != Membership.JOIN
# We didn't fetch any timeline events but we should still check for
# a bump_stamp that might be somewhere
or limited is None
# There might be a bump event somewhere before the timeline events
# that we fetched, that we didn't previously send down
or limited is True
# Always give the client some frame of reference if this is the
# first time they are seeing the room down the connection
or initial
)
# If we're joined to the room, we need to find the last bump event before the
# `to_token`
if room_membership_for_user_at_to_token.membership == Membership.JOIN:
# Try and get a bump stamp, if not we just fall back to the
# membership token.
# Try and get a bump stamp
new_bump_stamp = await self._get_bump_stamp(
room_id, to_token, timeline_events
room_id,
to_token,
timeline_events,
check_outside_timeline=always_return_bump_stamp,
)
if new_bump_stamp is not None:
bump_stamp = new_bump_stamp
if bump_stamp < 0:
if bump_stamp is None and always_return_bump_stamp:
# By default, just choose the membership event position for any non-join membership
bump_stamp = room_membership_for_user_at_to_token.event_pos.stream
if bump_stamp is not None and bump_stamp < 0:
# We never want to send down negative stream orderings, as you can't
# sensibly compare positive and negative stream orderings (they have
# different meanings).
@@ -1156,14 +1185,23 @@ class SlidingSyncHandler:
@trace
async def _get_bump_stamp(
self, room_id: str, to_token: StreamToken, timeline: List[EventBase]
self,
room_id: str,
to_token: StreamToken,
timeline: List[EventBase],
check_outside_timeline: bool,
) -> Optional[int]:
"""Get a bump stamp for the room, if we have a bump event
"""Get a bump stamp for the room, if we have a bump event and it has
changed.
Args:
room_id
to_token: The upper bound of token to return
timeline: The list of events we have fetched.
limited: If the timeline was limited.
check_outside_timeline: Whether we need to check for bump stamp for
events before the timeline if we didn't find a bump stamp in
the timeline events.
"""
# First check the timeline events we're returning to see if one of
@@ -1183,6 +1221,11 @@ class SlidingSyncHandler:
if new_bump_stamp > 0:
return new_bump_stamp
if not check_outside_timeline:
# If we are not a limited sync, then we know the bump stamp can't
# have changed.
return None
# We can quickly query for the latest bump event in the room using the
# sliding sync tables.
latest_room_bump_stamp = await self.store.get_latest_bump_stamp_for_room(

View File

@@ -572,7 +572,8 @@ class SlidingSyncExtensionHandler:
# Now record which rooms are now up to data, and which rooms have
# pending updates to send.
new_connection_state.account_data.record_sent_rooms(relevant_room_ids)
new_connection_state.account_data.record_sent_rooms(previously_rooms.keys())
new_connection_state.account_data.record_sent_rooms(initial_rooms)
missing_updates = (
all_updates_since_the_from_token.keys() - relevant_room_ids
)
@@ -763,9 +764,10 @@ class SlidingSyncExtensionHandler:
room_id_to_receipt_map[room_id] = {"type": type, "content": content}
# Now we update the per-connection state to track which receipts we have
# and haven't sent down.
new_connection_state.receipts.record_sent_rooms(relevant_room_ids)
# Update the per-connection state to track which rooms we have sent
# all the receipts for.
new_connection_state.receipts.record_sent_rooms(previously_rooms.keys())
new_connection_state.receipts.record_sent_rooms(initial_rooms)
if from_token:
# Now find the set of rooms that may have receipts that we're not sending

View File

@@ -123,6 +123,19 @@ class SlidingSyncInterestedRooms:
newly_left_rooms: AbstractSet[str]
dm_room_ids: AbstractSet[str]
@staticmethod
def empty() -> "SlidingSyncInterestedRooms":
return SlidingSyncInterestedRooms(
lists={},
relevant_room_map={},
relevant_rooms_to_send_map={},
all_rooms=set(),
room_membership_for_user_map={},
newly_joined_rooms=set(),
newly_left_rooms=set(),
dm_room_ids=set(),
)
def filter_membership_for_sync(
*,
@@ -181,6 +194,14 @@ class SlidingSyncRoomLists:
from_token: Optional[StreamToken],
) -> SlidingSyncInterestedRooms:
"""Fetch the set of rooms that match the request"""
has_lists = sync_config.lists is not None and len(sync_config.lists) > 0
has_room_subscriptions = (
sync_config.room_subscriptions is not None
and len(sync_config.room_subscriptions) > 0
)
if not has_lists and not has_room_subscriptions:
return SlidingSyncInterestedRooms.empty()
if await self.store.have_finished_sliding_sync_background_jobs():
return await self._compute_interested_rooms_new_tables(
@@ -479,6 +500,16 @@ class SlidingSyncRoomLists:
# depending on the `required_state` requested (see below).
partial_state_rooms = await self.store.get_partial_rooms()
# Fetch any rooms that we have not already fetched from the database.
subscription_sliding_sync_rooms = (
await self.store.get_sliding_sync_room_for_user_batch(
user_id,
sync_config.room_subscriptions.keys()
- room_membership_for_user_map.keys(),
)
)
room_membership_for_user_map.update(subscription_sliding_sync_rooms)
for (
room_id,
room_subscription,
@@ -486,17 +517,11 @@ class SlidingSyncRoomLists:
# Check if we have a membership for the room, but didn't pull it out
# above. This could be e.g. a leave that we don't pull out by
# default.
current_room_entry = (
await self.store.get_sliding_sync_room_for_user(
user_id, room_id
)
)
current_room_entry = room_membership_for_user_map.get(room_id)
if not current_room_entry:
# TODO: Handle rooms the user isn't in.
continue
room_membership_for_user_map[room_id] = current_room_entry
all_rooms.add(room_id)
# Take the superset of the `RoomSyncConfig` for each room.

View File

@@ -1039,7 +1039,7 @@ class _MultipartParserProtocol(protocol.Protocol):
self.deferred = deferred
self.boundary = boundary
self.max_length = max_length
self.parser = None
self.parser: Optional[multipart.MultipartParser] = None
self.multipart_response = MultipartResponse()
self.has_redirect = False
self.in_json = False
@@ -1097,7 +1097,7 @@ class _MultipartParserProtocol(protocol.Protocol):
self.deferred.errback()
self.file_length += end - start
callbacks = {
callbacks: "multipart.multipart.MultipartCallbacks" = {
"on_header_field": on_header_field,
"on_header_value": on_header_value,
"on_part_data": on_part_data,
@@ -1113,7 +1113,7 @@ class _MultipartParserProtocol(protocol.Protocol):
self.transport.abortConnection()
try:
self.parser.write(incoming_data) # type: ignore[attr-defined]
self.parser.write(incoming_data)
except Exception as e:
logger.warning(f"Exception writing to multipart parser: {e}")
self.deferred.errback()

View File

@@ -41,6 +41,7 @@ import attr
from prometheus_client import Counter
from twisted.internet import defer
from twisted.internet.defer import Deferred
from synapse.api.constants import EduTypes, EventTypes, HistoryVisibility, Membership
from synapse.api.errors import AuthError
@@ -52,6 +53,7 @@ from synapse.logging.opentracing import log_kv, start_active_span
from synapse.metrics import LaterGauge
from synapse.streams.config import PaginationConfig
from synapse.types import (
ISynapseReactor,
JsonDict,
MultiWriterStreamToken,
PersistedEventPosition,
@@ -61,8 +63,11 @@ from synapse.types import (
StreamToken,
UserID,
)
from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
from synapse.util.async_helpers import (
timeout_deferred,
)
from synapse.util.metrics import Measure
from synapse.util.stringutils import shortstr
from synapse.visibility import filter_events_for_client
if TYPE_CHECKING:
@@ -89,18 +94,6 @@ def count(func: Callable[[T], bool], it: Iterable[T]) -> int:
return n
class _NotificationListener:
"""This represents a single client connection to the events stream.
The events stream handler will have yielded to the deferred, so to
notify the handler it is sufficient to resolve the deferred.
"""
__slots__ = ["deferred"]
def __init__(self, deferred: "defer.Deferred"):
self.deferred = deferred
class _NotifierUserStream:
"""This represents a user connected to the event stream.
It tracks the most recent stream token for that user.
@@ -113,59 +106,49 @@ class _NotifierUserStream:
def __init__(
self,
reactor: ISynapseReactor,
user_id: str,
rooms: StrCollection,
current_token: StreamToken,
time_now_ms: int,
):
self.reactor = reactor
self.user_id = user_id
self.rooms = set(rooms)
self.current_token = current_token
# The last token for which we should wake up any streams that have a
# token that comes before it. This gets updated every time we get poked.
# We start it at the current token since if we get any streams
# that have a token from before we have no idea whether they should be
# woken up or not, so lets just wake them up.
self.last_notified_token = current_token
self.current_token = current_token
self.last_notified_ms = time_now_ms
self.notify_deferred: ObservableDeferred[StreamToken] = ObservableDeferred(
defer.Deferred()
)
# Set of listeners that we need to wake up when there has been a change.
self.listeners: Set[Deferred[StreamToken]] = set()
def notify(
def update_and_fetch_deferreds(
self,
stream_key: StreamKeyType,
stream_id: Union[int, RoomStreamToken, MultiWriterStreamToken],
current_token: StreamToken,
time_now_ms: int,
) -> None:
"""Notify any listeners for this user of a new event from an
event source.
) -> Collection["Deferred[StreamToken]"]:
"""Update the stream for this user because of a new event from an
event source, and return the set of deferreds to wake up.
Args:
stream_key: The stream the event came from.
stream_id: The new id for the stream the event came from.
current_token: The new current token.
time_now_ms: The current time in milliseconds.
Returns:
The set of deferreds that need to be called.
"""
self.current_token = self.current_token.copy_and_advance(stream_key, stream_id)
self.last_notified_token = self.current_token
self.current_token = current_token
self.last_notified_ms = time_now_ms
notify_deferred = self.notify_deferred
log_kv(
{
"notify": self.user_id,
"stream": stream_key,
"stream_id": stream_id,
"listeners": self.count_listeners(),
}
)
listeners = self.listeners
self.listeners = set()
users_woken_by_stream_counter.labels(stream_key).inc()
with PreserveLoggingContext():
self.notify_deferred = ObservableDeferred(defer.Deferred())
notify_deferred.callback(self.current_token)
return listeners
def remove(self, notifier: "Notifier") -> None:
"""Remove this listener from all the indexes in the Notifier
@@ -179,9 +162,9 @@ class _NotifierUserStream:
notifier.user_to_user_stream.pop(self.user_id)
def count_listeners(self) -> int:
return len(self.notify_deferred.observers())
return len(self.listeners)
def new_listener(self, token: StreamToken) -> _NotificationListener:
def new_listener(self, token: StreamToken) -> "Deferred[StreamToken]":
"""Returns a deferred that is resolved when there is a new token
greater than the given token.
@@ -191,10 +174,17 @@ class _NotifierUserStream:
"""
# Immediately wake up stream if something has already since happened
# since their last token.
if self.last_notified_token != token:
return _NotificationListener(defer.succeed(self.current_token))
else:
return _NotificationListener(self.notify_deferred.observe())
if token != self.current_token:
return defer.succeed(self.current_token)
# Create a new deferred and add it to the set of listeners. We add a
# cancel handler to remove it from the set again, to handle timeouts.
deferred: "Deferred[StreamToken]" = Deferred(
canceller=lambda d: self.listeners.discard(d)
)
self.listeners.add(deferred)
return deferred
@attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -247,6 +237,7 @@ class Notifier:
# List of callbacks to be notified when a lock is released
self._lock_released_callback: List[Callable[[str, str, str], None]] = []
self.reactor = hs.get_reactor()
self.clock = hs.get_clock()
self.appservice_handler = hs.get_application_service_handler()
self._pusher_pool = hs.get_pusherpool()
@@ -342,14 +333,25 @@ class Notifier:
# Wake up all related user stream notifiers
user_streams = self.room_to_user_streams.get(room_id, set())
time_now_ms = self.clock.time_msec()
current_token = self.event_sources.get_current_token()
listeners: List["Deferred[StreamToken]"] = []
for user_stream in user_streams:
try:
user_stream.notify(
StreamKeyType.UN_PARTIAL_STATED_ROOMS, new_token, time_now_ms
listeners.extend(
user_stream.update_and_fetch_deferreds(current_token, time_now_ms)
)
except Exception:
logger.exception("Failed to notify listener")
with PreserveLoggingContext():
for listener in listeners:
listener.callback(current_token)
users_woken_by_stream_counter.labels(StreamKeyType.UN_PARTIAL_STATED_ROOMS).inc(
len(user_streams)
)
# Poke the replication so that other workers also see the write to
# the un-partial-stated rooms stream.
self.notify_replication()
@@ -519,12 +521,16 @@ class Notifier:
rooms = rooms or []
with Measure(self.clock, "on_new_event"):
user_streams = set()
user_streams: Set[_NotifierUserStream] = set()
log_kv(
{
"waking_up_explicit_users": len(users),
"waking_up_explicit_rooms": len(rooms),
"users": shortstr(users),
"rooms": shortstr(rooms),
"stream": stream_key,
"stream_id": new_token,
}
)
@@ -544,12 +550,27 @@ class Notifier:
)
time_now_ms = self.clock.time_msec()
current_token = self.event_sources.get_current_token()
listeners: List["Deferred[StreamToken]"] = []
for user_stream in user_streams:
try:
user_stream.notify(stream_key, new_token, time_now_ms)
listeners.extend(
user_stream.update_and_fetch_deferreds(
current_token, time_now_ms
)
)
except Exception:
logger.exception("Failed to notify listener")
# We resolve all these deferreds in one go so that we only need to
# call `PreserveLoggingContext` once, as it has a bunch of overhead
# (to calculate performance stats)
with PreserveLoggingContext():
for listener in listeners:
listener.callback(current_token)
users_woken_by_stream_counter.labels(stream_key).inc(len(user_streams))
self.notify_replication()
# Notify appservices.
@@ -586,6 +607,7 @@ class Notifier:
if room_ids is None:
room_ids = await self.store.get_rooms_for_user(user_id)
user_stream = _NotifierUserStream(
reactor=self.reactor,
user_id=user_id,
rooms=room_ids,
current_token=current_token,
@@ -608,8 +630,8 @@ class Notifier:
# Now we wait for the _NotifierUserStream to be told there
# is a new token.
listener = user_stream.new_listener(prev_token)
listener.deferred = timeout_deferred(
listener.deferred,
listener = timeout_deferred(
listener,
(end_time - now) / 1000.0,
self.hs.get_reactor(),
)
@@ -622,7 +644,7 @@ class Notifier:
)
with PreserveLoggingContext():
await listener.deferred
await listener
log_kv(
{

View File

@@ -1,7 +1,7 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2023 New Vector, Ltd
# Copyright (C) 2023-2024 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as

View File

@@ -1,3 +1,17 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2024 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
import logging
from typing import TYPE_CHECKING, Dict, Optional, Tuple

View File

@@ -48,7 +48,7 @@ class ReplicationRemovePusherRestServlet(ReplicationEndpoint):
"""
NAME = "add_user_account_data"
NAME = "remove_pusher"
PATH_ARGS = ("user_id",)
CACHE = False

View File

@@ -27,7 +27,7 @@ from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union
import attr
from synapse._pydantic_compat import StrictBool
from synapse._pydantic_compat import StrictBool, StrictInt, StrictStr
from synapse.api.constants import Direction, UserTypes
from synapse.api.errors import Codes, NotFoundError, SynapseError
from synapse.http.servlet import (
@@ -1421,40 +1421,39 @@ class RedactUser(RestServlet):
self._store = hs.get_datastores().main
self.admin_handler = hs.get_admin_handler()
class PostBody(RequestBodyModel):
rooms: List[StrictStr]
reason: Optional[StrictStr]
limit: Optional[StrictInt]
async def on_POST(
self, request: SynapseRequest, user_id: str
) -> Tuple[int, JsonDict]:
requester = await self._auth.get_user_by_req(request)
await assert_user_is_admin(self._auth, requester)
body = parse_json_object_from_request(request, allow_empty_body=True)
rooms = body.get("rooms")
if rooms is None:
# parse provided user id to check that it is valid
UserID.from_string(user_id)
body = parse_and_validate_json_object_from_request(request, self.PostBody)
limit = body.limit
if limit and limit <= 0:
raise SynapseError(
HTTPStatus.BAD_REQUEST, "Must provide a value for rooms."
HTTPStatus.BAD_REQUEST,
"If limit is provided it must be a non-negative integer greater than 0.",
)
reason = body.get("reason")
if reason:
if not isinstance(reason, str):
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"If a reason is provided it must be a string.",
)
limit = body.get("limit")
if limit:
if not isinstance(limit, int) or limit <= 0:
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"If limit is provided it must be a non-negative integer greater than 0.",
)
rooms = body.rooms
if not rooms:
rooms = await self._store.get_rooms_for_user(user_id)
current_rooms = list(await self._store.get_rooms_for_user(user_id))
banned_rooms = list(
await self._store.get_rooms_user_currently_banned_from(user_id)
)
rooms = current_rooms + banned_rooms
redact_id = await self.admin_handler.start_redact_events(
user_id, list(rooms), requester.serialize(), reason, limit
user_id, rooms, requester.serialize(), body.reason, limit
)
return HTTPStatus.OK, {"redact_id": redact_id}

View File

@@ -1,3 +1,17 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2024 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# This module contains REST servlets to do with delayed events: /delayed_events/<paths>
import logging

View File

@@ -1010,11 +1010,13 @@ class SlidingSyncRestServlet(RestServlet):
serialized_rooms: Dict[str, JsonDict] = {}
for room_id, room_result in rooms.items():
serialized_rooms[room_id] = {
"bump_stamp": room_result.bump_stamp,
"notification_count": room_result.notification_count,
"highlight_count": room_result.highlight_count,
}
if room_result.bump_stamp is not None:
serialized_rooms[room_id]["bump_stamp"] = room_result.bump_stamp
if room_result.joined_count is not None:
serialized_rooms[room_id]["joined_count"] = room_result.joined_count

View File

@@ -172,7 +172,7 @@ class VersionsRestServlet(RestServlet):
)
),
# MSC4140: Delayed events
"org.matrix.msc4140": True,
"org.matrix.msc4140": bool(self.config.server.max_event_delay_ms),
# MSC4151: Report room API (Client-Server API)
"org.matrix.msc4151": self.config.experimental.msc4151_enabled,
# Simplified sliding sync

View File

@@ -1,3 +1,17 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2024 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
import logging
from typing import List, NewType, Optional, Tuple

View File

@@ -711,6 +711,27 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
return {row[0] for row in txn}
async def get_rooms_user_currently_banned_from(
self, user_id: str
) -> FrozenSet[str]:
"""Returns a set of room_ids the user is currently banned from.
If a remote user only returns rooms this server is currently
participating in.
"""
room_ids = await self.db_pool.simple_select_onecol(
table="current_state_events",
keyvalues={
"type": EventTypes.Member,
"membership": Membership.BAN,
"state_key": user_id,
},
retcol="room_id",
desc="get_rooms_user_currently_banned_from",
)
return frozenset(room_ids)
@cached(max_entries=500000, iterable=True)
async def get_rooms_for_user(self, user_id: str) -> FrozenSet[str]:
"""Returns a set of room_ids the user is currently joined to.
@@ -1499,6 +1520,57 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
"get_sliding_sync_room_for_user", get_sliding_sync_room_for_user_txn
)
async def get_sliding_sync_room_for_user_batch(
self, user_id: str, room_ids: StrCollection
) -> Dict[str, RoomsForUserSlidingSync]:
"""Get the sliding sync room entry for the given user and rooms."""
if not room_ids:
return {}
def get_sliding_sync_room_for_user_batch_txn(
txn: LoggingTransaction,
) -> Dict[str, RoomsForUserSlidingSync]:
clause, args = make_in_list_sql_clause(
self.database_engine, "m.room_id", room_ids
)
sql = f"""
SELECT m.room_id, m.sender, m.membership, m.membership_event_id,
r.room_version,
m.event_instance_name, m.event_stream_ordering,
m.has_known_state,
COALESCE(j.room_type, m.room_type),
COALESCE(j.is_encrypted, m.is_encrypted)
FROM sliding_sync_membership_snapshots AS m
INNER JOIN rooms AS r USING (room_id)
LEFT JOIN sliding_sync_joined_rooms AS j ON (j.room_id = m.room_id AND m.membership = 'join')
WHERE m.forgotten = 0
AND {clause}
AND user_id = ?
"""
args.append(user_id)
txn.execute(sql, args)
return {
row[0]: RoomsForUserSlidingSync(
room_id=row[0],
sender=row[1],
membership=row[2],
event_id=row[3],
room_version_id=row[4],
event_pos=PersistedEventPosition(row[5], row[6]),
has_known_state=bool(row[7]),
room_type=row[8],
is_encrypted=row[9],
)
for row in txn
}
return await self.db_pool.runInteraction(
"get_sliding_sync_room_for_user_batch",
get_sliding_sync_room_for_user_batch_txn,
)
class RoomMemberBackgroundUpdateStore(SQLBaseStore):
def __init__(

View File

@@ -751,6 +751,48 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
if self._events_stream_cache.has_entity_changed(room_id, from_id)
}
async def get_rooms_that_have_updates_since_sliding_sync_table(
self,
room_ids: StrCollection,
from_key: RoomStreamToken,
) -> StrCollection:
"""Return the rooms that probably have had updates since the given
token (changes that are > `from_key`)."""
# If the stream change cache is valid for the stream token, we can just
# use the result of that.
if from_key.stream >= self._events_stream_cache.get_earliest_known_position():
return self._events_stream_cache.get_entities_changed(
room_ids, from_key.stream
)
def get_rooms_that_have_updates_since_sliding_sync_table_txn(
txn: LoggingTransaction,
) -> StrCollection:
sql = """
SELECT room_id
FROM sliding_sync_joined_rooms
WHERE {clause}
AND event_stream_ordering > ?
"""
results: Set[str] = set()
for batch in batch_iter(room_ids, 1000):
clause, args = make_in_list_sql_clause(
self.database_engine, "room_id", batch
)
args.append(from_key.stream)
txn.execute(sql.format(clause=clause), args)
results.update(row[0] for row in txn)
return results
return await self.db_pool.runInteraction(
"get_rooms_that_have_updates_since_sliding_sync_table",
get_rooms_that_have_updates_since_sliding_sync_table_txn,
)
async def paginate_room_events_by_stream_ordering(
self,
*,

View File

@@ -1,3 +1,16 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2024 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
CREATE TABLE delayed_events (
delay_id TEXT NOT NULL,
user_localpart TEXT NOT NULL,

View File

@@ -158,6 +158,7 @@ class SlidingSyncResult:
name changes to mark the room as unread and bump it to the top. For
encrypted rooms, we just have to consider any activity as a bump because we
can't see the content and the client has to figure it out for themselves.
This may not be included if there hasn't been a change.
joined_count: The number of users with membership of join, including the client's
own user ID. (same as sync `v2 m.joined_member_count`)
invited_count: The number of users with membership of invite. (same as sync v2
@@ -193,7 +194,7 @@ class SlidingSyncResult:
limited: Optional[bool]
# Only optional because it won't be included for invite/knock rooms with `stripped_state`
num_live: Optional[int]
bump_stamp: int
bump_stamp: Optional[int]
joined_count: Optional[int]
invited_count: Optional[int]
notification_count: int

View File

@@ -142,9 +142,9 @@ class StreamChangeCache:
"""
assert isinstance(stream_pos, int)
# _cache is not valid at or before the earliest known stream position, so
# _cache is not valid before the earliest known stream position, so
# return that the entity has changed.
if stream_pos <= self._earliest_known_stream_pos:
if stream_pos < self._earliest_known_stream_pos:
self.metrics.inc_misses()
return True
@@ -186,7 +186,7 @@ class StreamChangeCache:
This will be all entities if the given stream position is at or earlier
than the earliest known stream position.
"""
if not self._cache or stream_pos <= self._earliest_known_stream_pos:
if not self._cache or stream_pos < self._earliest_known_stream_pos:
self.metrics.inc_misses()
return set(entities)
@@ -238,9 +238,9 @@ class StreamChangeCache:
"""
assert isinstance(stream_pos, int)
# _cache is not valid at or before the earliest known stream position, so
# _cache is not valid before the earliest known stream position, so
# return that an entity has changed.
if stream_pos <= self._earliest_known_stream_pos:
if stream_pos < self._earliest_known_stream_pos:
self.metrics.inc_misses()
return True
@@ -270,9 +270,9 @@ class StreamChangeCache:
"""
assert isinstance(stream_pos, int)
# _cache is not valid at or before the earliest known stream position, so
# _cache is not valid before the earliest known stream position, so
# return None to mark that it is unknown if an entity has changed.
if stream_pos <= self._earliest_known_stream_pos:
if stream_pos < self._earliest_known_stream_pos:
return AllEntitiesChangedResult(None)
changed_entities: List[EntityType] = []

View File

@@ -19,13 +19,23 @@
# [This file includes modifications made by New Vector Limited]
#
#
import tempfile
from typing import Callable
import yaml
from parameterized import parameterized
from synapse.config import ConfigError
from synapse.config._base import RootConfig
from synapse.config.homeserver import HomeServerConfig
from tests.config.utils import ConfigFileTestCase
try:
import hiredis
except ImportError:
hiredis = None # type: ignore
class ConfigLoadingFileTestCase(ConfigFileTestCase):
def test_load_fails_if_server_name_missing(self) -> None:
@@ -116,3 +126,49 @@ class ConfigLoadingFileTestCase(ConfigFileTestCase):
self.add_lines_to_config(["trust_identity_server_for_password_resets: true"])
with self.assertRaises(ConfigError):
HomeServerConfig.load_config("", ["-c", self.config_file])
@parameterized.expand(
[
"turn_shared_secret_path: /does/not/exist",
"registration_shared_secret_path: /does/not/exist",
*["redis:\n enabled: true\n password_path: /does/not/exist"]
* (hiredis is not None),
]
)
def test_secret_files_missing(self, config_str: str) -> None:
self.generate_config()
self.add_lines_to_config(["", config_str])
with self.assertRaises(ConfigError):
HomeServerConfig.load_config("", ["-c", self.config_file])
@parameterized.expand(
[
(
"turn_shared_secret_path: {}",
lambda c: c.voip.turn_shared_secret,
),
(
"registration_shared_secret_path: {}",
lambda c: c.registration.registration_shared_secret,
),
*[
(
"redis:\n enabled: true\n password_path: {}",
lambda c: c.redis.redis_password,
)
]
* (hiredis is not None),
]
)
def test_secret_files_existing(
self, config_line: str, get_secret: Callable[[RootConfig], str]
) -> None:
self.generate_config_and_remove_lines_containing("registration_shared_secret")
with tempfile.NamedTemporaryFile(buffering=0) as secret_file:
secret_file.write(b"53C237")
self.add_lines_to_config(["", config_line.format(secret_file.name)])
config = HomeServerConfig.load_config("", ["-c", self.config_file])
self.assertEqual(get_secret(config), "53C237")

View File

@@ -380,7 +380,7 @@ class RoomMemberMasterHandlerTestCase(HomeserverTestCase):
)
def test_forget_when_not_left(self) -> None:
"""Tests that a user cannot not forgets a room that has not left."""
"""Tests that a user cannot forget a room that they are still in."""
self.get_failure(self.handler.forget(self.alice_ID, self.room_id), SynapseError)
def test_nonlocal_room_user_action(self) -> None:

View File

@@ -5288,19 +5288,26 @@ class UserRedactionTestCase(unittest.HomeserverTestCase):
self.assertEqual(len(matched), len(rm2_originals))
def test_admin_redact_works_if_user_kicked_or_banned(self) -> None:
originals = []
originals1 = []
originals2 = []
for rm in [self.rm1, self.rm2, self.rm3]:
join = self.helper.join(rm, self.bad_user, tok=self.bad_user_tok)
originals.append(join["event_id"])
if rm in [self.rm1, self.rm3]:
originals1.append(join["event_id"])
else:
originals2.append(join["event_id"])
for i in range(5):
event = {"body": f"hello{i}", "msgtype": "m.text"}
res = self.helper.send_event(
rm, "m.room.message", event, tok=self.bad_user_tok
)
originals.append(res["event_id"])
if rm in [self.rm1, self.rm3]:
originals1.append(res["event_id"])
else:
originals2.append(res["event_id"])
# kick user from rooms 1 and 3
for r in [self.rm1, self.rm2]:
for r in [self.rm1, self.rm3]:
channel = self.make_request(
"POST",
f"/_matrix/client/r0/rooms/{r}/kick",
@@ -5330,32 +5337,70 @@ class UserRedactionTestCase(unittest.HomeserverTestCase):
failed_redactions = channel2.json_body.get("failed_redactions")
self.assertEqual(failed_redactions, {})
# ban user
channel3 = self.make_request(
# double check
for rm in [self.rm1, self.rm3]:
filter = json.dumps({"types": [EventTypes.Redaction]})
channel3 = self.make_request(
"GET",
f"rooms/{rm}/messages?filter={filter}&limit=50",
access_token=self.admin_tok,
)
self.assertEqual(channel3.code, 200)
matches = []
for event in channel3.json_body["chunk"]:
for event_id in originals1:
if (
event["type"] == "m.room.redaction"
and event["redacts"] == event_id
):
matches.append((event_id, event))
# we redacted 6 messages
self.assertEqual(len(matches), 6)
# ban user from room 2
channel4 = self.make_request(
"POST",
f"/_matrix/client/r0/rooms/{self.rm2}/ban",
content={"reason": "being a bummer", "user_id": self.bad_user},
access_token=self.admin_tok,
)
self.assertEqual(channel3.code, HTTPStatus.OK, channel3.result)
self.assertEqual(channel4.code, HTTPStatus.OK, channel4.result)
# redact messages in room 2
channel4 = self.make_request(
# make a request to ban all user's messages
channel5 = self.make_request(
"POST",
f"/_synapse/admin/v1/user/{self.bad_user}/redact",
content={"rooms": [self.rm2]},
content={"rooms": []},
access_token=self.admin_tok,
)
self.assertEqual(channel4.code, 200)
id2 = channel1.json_body.get("redact_id")
self.assertEqual(channel5.code, 200)
id2 = channel5.json_body.get("redact_id")
# check that there were no failed redactions in room 2
channel5 = self.make_request(
channel6 = self.make_request(
"GET",
f"/_synapse/admin/v1/user/redact_status/{id2}",
access_token=self.admin_tok,
)
self.assertEqual(channel5.code, 200)
self.assertEqual(channel5.json_body.get("status"), "complete")
failed_redactions = channel5.json_body.get("failed_redactions")
self.assertEqual(channel6.code, 200)
self.assertEqual(channel6.json_body.get("status"), "complete")
failed_redactions = channel6.json_body.get("failed_redactions")
self.assertEqual(failed_redactions, {})
# double check messages in room 2 were redacted
filter = json.dumps({"types": [EventTypes.Redaction]})
channel7 = self.make_request(
"GET",
f"rooms/{self.rm2}/messages?filter={filter}&limit=50",
access_token=self.admin_tok,
)
self.assertEqual(channel7.code, 200)
matches = []
for event in channel7.json_body["chunk"]:
for event_id in originals2:
if event["type"] == "m.room.redaction" and event["redacts"] == event_id:
matches.append((event_id, event))
# we redacted 6 messages
self.assertEqual(len(matches), 6)

View File

@@ -1096,6 +1096,92 @@ class SlidingSyncRoomsMetaTestCase(SlidingSyncBase):
self.assertGreater(response_body["rooms"][room_id]["bump_stamp"], 0)
def test_rooms_bump_stamp_no_change_incremental(self) -> None:
"""Test that the bump stamp is omitted if there has been no change"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
room_id1 = self.helper.create_room_as(
user1_id,
tok=user1_tok,
)
# Make the Sliding Sync request
sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [],
"timeline_limit": 100,
}
}
}
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
# Initial sync so we expect to see a bump stamp
self.assertIn("bump_stamp", response_body["rooms"][room_id1])
# Send an event that is not in the bump events list
self.helper.send_event(
room_id1, type="org.matrix.test", content={}, tok=user1_tok
)
response_body, from_token = self.do_sync(
sync_body, since=from_token, tok=user1_tok
)
# There hasn't been a change to the bump stamps, so we ignore it
self.assertNotIn("bump_stamp", response_body["rooms"][room_id1])
def test_rooms_bump_stamp_change_incremental(self) -> None:
"""Test that the bump stamp is included if there has been a change, even
if its not in the timeline"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
room_id1 = self.helper.create_room_as(
user1_id,
tok=user1_tok,
)
# Make the Sliding Sync request
sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [],
"timeline_limit": 2,
}
}
}
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
# Initial sync so we expect to see a bump stamp
self.assertIn("bump_stamp", response_body["rooms"][room_id1])
first_bump_stamp = response_body["rooms"][room_id1]["bump_stamp"]
# Send a bump event at the start.
self.helper.send(room_id1, "test", tok=user1_tok)
# Send events that are not in the bump events list to fill the timeline
for _ in range(5):
self.helper.send_event(
room_id1, type="org.matrix.test", content={}, tok=user1_tok
)
response_body, from_token = self.do_sync(
sync_body, since=from_token, tok=user1_tok
)
# There was a bump event in the timeline gap, so we should see the bump
# stamp be updated.
self.assertIn("bump_stamp", response_body["rooms"][room_id1])
second_bump_stamp = response_body["rooms"][room_id1]["bump_stamp"]
self.assertGreater(second_bump_stamp, first_bump_stamp)
def test_rooms_bump_stamp_invites(self) -> None:
"""
Test that `bump_stamp` is present and points to the membership event,

View File

@@ -1,3 +1,17 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2024 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
"""Tests REST events for /delayed_events paths."""
from http import HTTPStatus
@@ -8,11 +22,12 @@ from parameterized import parameterized
from twisted.test.proto_helpers import MemoryReactor
from synapse.api.errors import Codes
from synapse.rest.client import delayed_events, room
from synapse.rest.client import delayed_events, room, versions
from synapse.server import HomeServer
from synapse.types import JsonDict
from synapse.util import Clock
from tests import unittest
from tests.unittest import HomeserverTestCase
PATH_PREFIX = "/_matrix/client/unstable/org.matrix.msc4140/delayed_events"
@@ -21,6 +36,21 @@ _HS_NAME = "red"
_EVENT_TYPE = "com.example.test"
class DelayedEventsUnstableSupportTestCase(HomeserverTestCase):
servlets = [versions.register_servlets]
def test_false_by_default(self) -> None:
channel = self.make_request("GET", "/_matrix/client/versions")
self.assertEqual(channel.code, 200, channel.result)
self.assertFalse(channel.json_body["unstable_features"]["org.matrix.msc4140"])
@unittest.override_config({"max_event_delay_duration": "24h"})
def test_true_if_enabled(self) -> None:
channel = self.make_request("GET", "/_matrix/client/versions")
self.assertEqual(channel.code, 200, channel.result)
self.assertTrue(channel.json_body["unstable_features"]["org.matrix.msc4140"])
class DelayedEventsTestCase(HomeserverTestCase):
"""Tests getting and managing delayed events."""

View File

@@ -4,7 +4,7 @@
# Copyright 2019 The Matrix.org Foundation C.I.C.
# Copyright 2017 Vector Creations Ltd
# Copyright 2014-2016 OpenMarket Ltd
# Copyright (C) 2023 New Vector, Ltd
# Copyright (C) 2023-2024 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as

View File

@@ -282,22 +282,33 @@ class SyncTypingTests(unittest.HomeserverTestCase):
self.assertEqual(200, channel.code)
next_batch = channel.json_body["next_batch"]
# This should time out! But it does not, because our stream token is
# ahead, and therefore it's saying the typing (that we've actually
# already seen) is new, since it's got a token above our new, now-reset
# stream token.
channel = self.make_request("GET", sync_url % (access_token, next_batch))
self.assertEqual(200, channel.code)
next_batch = channel.json_body["next_batch"]
# Clear the typing information, so that it doesn't think everything is
# in the future.
# in the future. This happens automatically when the typing stream
# resets.
typing._reset()
# Now it SHOULD fail as it never completes!
# Nothing new, so we time out.
with self.assertRaises(TimedOutException):
self.make_request("GET", sync_url % (access_token, next_batch))
# Sync and start typing again.
sync_channel = self.make_request(
"GET", sync_url % (access_token, next_batch), await_result=False
)
self.assertFalse(sync_channel.is_finished())
channel = self.make_request(
"PUT",
typing_url % (room, other_user_id, other_access_token),
b'{"typing": true, "timeout": 30000}',
)
self.assertEqual(200, channel.code)
# Sync should now return.
sync_channel.await_result()
self.assertEqual(200, sync_channel.code)
next_batch = sync_channel.json_body["next_batch"]
class SyncKnockTestCase(KnockingStrippedStateEventHelperMixin):
servlets = [

View File

@@ -53,8 +53,8 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
# return True, whether it's a known entity or not.
self.assertTrue(cache.has_entity_changed("user@foo.com", 0))
self.assertTrue(cache.has_entity_changed("not@here.website", 0))
self.assertTrue(cache.has_entity_changed("user@foo.com", 3))
self.assertTrue(cache.has_entity_changed("not@here.website", 3))
self.assertTrue(cache.has_entity_changed("user@foo.com", 2))
self.assertTrue(cache.has_entity_changed("not@here.website", 2))
def test_entity_has_changed_pops_off_start(self) -> None:
"""
@@ -76,9 +76,11 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
self.assertTrue("user@foo.com" not in cache._entity_to_key)
self.assertEqual(
cache.get_all_entities_changed(3).entities, ["user@elsewhere.org"]
cache.get_all_entities_changed(2).entities,
["bar@baz.net", "user@elsewhere.org"],
)
self.assertFalse(cache.get_all_entities_changed(2).hit)
self.assertFalse(cache.get_all_entities_changed(1).hit)
self.assertTrue(cache.get_all_entities_changed(2).hit)
# If we update an existing entity, it keeps the two existing entities
cache.entity_has_changed("bar@baz.net", 5)
@@ -89,7 +91,8 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
cache.get_all_entities_changed(3).entities,
["user@elsewhere.org", "bar@baz.net"],
)
self.assertFalse(cache.get_all_entities_changed(2).hit)
self.assertFalse(cache.get_all_entities_changed(1).hit)
self.assertTrue(cache.get_all_entities_changed(2).hit)
def test_get_all_entities_changed(self) -> None:
"""
@@ -114,7 +117,8 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
self.assertEqual(
cache.get_all_entities_changed(3).entities, ["user@elsewhere.org"]
)
self.assertFalse(cache.get_all_entities_changed(1).hit)
self.assertFalse(cache.get_all_entities_changed(0).hit)
self.assertTrue(cache.get_all_entities_changed(1).hit)
# ... later, things gest more updates
cache.entity_has_changed("user@foo.com", 5)
@@ -149,7 +153,7 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
# With no entities, it returns True for the past, present, and False for
# the future.
self.assertTrue(cache.has_any_entity_changed(0))
self.assertTrue(cache.has_any_entity_changed(1))
self.assertFalse(cache.has_any_entity_changed(1))
self.assertFalse(cache.has_any_entity_changed(2))
# We add an entity