diff --git a/Cargo.lock b/Cargo.lock index 46c930ebd7..b7084165ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -35,12 +35,6 @@ version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" -[[package]] -name = "bitflags" -version = "2.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf4b9d6a944f767f8e5e0db018570623c85f3d925ac718db4e06d0187adb21c1" - [[package]] name = "blake2" version = "0.10.6" @@ -162,9 +156,9 @@ dependencies = [ [[package]] name = "heck" -version = "0.4.1" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" [[package]] name = "hex" @@ -222,16 +216,6 @@ version = "0.2.154" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae743338b92ff9146ce83992f766a31066a91a8c84a45e0e9f21e7cf6de6d346" -[[package]] -name = "lock_api" -version = "0.4.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17" -dependencies = [ - "autocfg", - "scopeguard", -] - [[package]] name = "log" version = "0.4.22" @@ -265,29 +249,6 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" -[[package]] -name = "parking_lot" -version = "0.12.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e4af0ca4f6caed20e900d564c242b8e5d4903fdacf31d3daf527b66fe6f42fb" -dependencies = [ - "lock_api", - "parking_lot_core", -] - -[[package]] -name = "parking_lot_core" -version = "0.9.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" -dependencies = [ - "cfg-if", - "libc", - "redox_syscall", - "smallvec", - "windows-targets", -] - [[package]] name = "portable-atomic" version = "1.6.0" @@ -311,16 +272,16 @@ dependencies = [ [[package]] name = "pyo3" -version = "0.21.2" +version = "0.23.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5e00b96a521718e08e03b1a622f01c8a8deb50719335de3f60b3b3950f069d8" +checksum = "f54b3d09cbdd1f8c20650b28e7b09e338881482f4aa908a5f61a00c98fba2690" dependencies = [ "anyhow", "cfg-if", "indoc", "libc", "memoffset", - "parking_lot", + "once_cell", "portable-atomic", "pyo3-build-config", "pyo3-ffi", @@ -330,9 +291,9 @@ dependencies = [ [[package]] name = "pyo3-build-config" -version = "0.21.2" +version = "0.23.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7883df5835fafdad87c0d888b266c8ec0f4c9ca48a5bed6bbb592e8dedee1b50" +checksum = "3015cf985888fe66cfb63ce0e321c603706cd541b7aec7ddd35c281390af45d8" dependencies = [ "once_cell", "target-lexicon", @@ -340,9 +301,9 @@ dependencies = [ [[package]] name = "pyo3-ffi" -version = "0.21.2" +version = "0.23.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01be5843dc60b916ab4dad1dca6d20b9b4e6ddc8e15f50c47fe6d85f1fb97403" +checksum = "6fca7cd8fd809b5ac4eefb89c1f98f7a7651d3739dfb341ca6980090f554c270" dependencies = [ "libc", "pyo3-build-config", @@ -350,9 +311,9 @@ dependencies = [ [[package]] name = "pyo3-log" -version = "0.10.0" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2af49834b8d2ecd555177e63b273b708dea75150abc6f5341d0a6e1a9623976c" +checksum = "3eb421dc86d38d08e04b927b02424db480be71b777fa3a56f32e2f2a3a1a3b08" dependencies = [ "arc-swap", "log", @@ -361,9 +322,9 @@ dependencies = [ [[package]] name = "pyo3-macros" -version = "0.21.2" +version = "0.23.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77b34069fc0682e11b31dbd10321cbf94808394c56fd996796ce45217dfac53c" +checksum = "34e657fa5379a79151b6ff5328d9216a84f55dc93b17b08e7c3609a969b73aa0" dependencies = [ "proc-macro2", "pyo3-macros-backend", @@ -373,9 +334,9 @@ dependencies = [ [[package]] name = "pyo3-macros-backend" -version = "0.21.2" +version = "0.23.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08260721f32db5e1a5beae69a55553f56b99bd0e1c3e6e0a5e8851a9d0f5a85c" +checksum = "295548d5ffd95fd1981d2d3cf4458831b21d60af046b729b6fd143b0ba7aee2f" dependencies = [ "heck", "proc-macro2", @@ -386,9 +347,9 @@ dependencies = [ [[package]] name = "pythonize" -version = "0.21.1" +version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d0664248812c38cc55a4ed07f88e4df516ce82604b93b1ffdc041aa77a6cb3c" +checksum = "91a6ee7a084f913f98d70cdc3ebec07e852b735ae3059a1500db2661265da9ff" dependencies = [ "pyo3", "serde", @@ -433,15 +394,6 @@ dependencies = [ "getrandom", ] -[[package]] -name = "redox_syscall" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "469052894dcb553421e483e4209ee581a45100d31b4018de03e5a7ad86374a7e" -dependencies = [ - "bitflags", -] - [[package]] name = "regex" version = "1.11.1" @@ -477,12 +429,6 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" -[[package]] -name = "scopeguard" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" - [[package]] name = "serde" version = "1.0.215" @@ -505,9 +451,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.132" +version = "1.0.133" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d726bfaff4b320266d395898905d0eba0345aae23b54aee3a737e260fd46db03" +checksum = "c7fceb2473b9166b2294ef05efcb65a3db80803f0b03ef86a5fc88a2b85ee377" dependencies = [ "itoa", "memchr", @@ -537,12 +483,6 @@ dependencies = [ "digest", ] -[[package]] -name = "smallvec" -version = "1.13.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" - [[package]] name = "subtle" version = "2.5.0" @@ -694,67 +634,3 @@ dependencies = [ "js-sys", "wasm-bindgen", ] - -[[package]] -name = "windows-targets" -version = "0.52.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f0713a46559409d202e70e28227288446bf7841d3211583a4b53e3f6d96e7eb" -dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_gnullvm", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", -] - -[[package]] -name = "windows_aarch64_gnullvm" -version = "0.52.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7088eed71e8b8dda258ecc8bac5fb1153c5cffaf2578fc8ff5d61e23578d3263" - -[[package]] -name = "windows_aarch64_msvc" -version = "0.52.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9985fd1504e250c615ca5f281c3f7a6da76213ebd5ccc9561496568a2752afb6" - -[[package]] -name = "windows_i686_gnu" -version = "0.52.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "88ba073cf16d5372720ec942a8ccbf61626074c6d4dd2e745299726ce8b89670" - -[[package]] -name = "windows_i686_gnullvm" -version = "0.52.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87f4261229030a858f36b459e748ae97545d6f1ec60e5e0d6a3d32e0dc232ee9" - -[[package]] -name = "windows_i686_msvc" -version = "0.52.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db3c2bf3d13d5b658be73463284eaf12830ac9a26a90c717b7f771dfe97487bf" - -[[package]] -name = "windows_x86_64_gnu" -version = "0.52.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e4246f76bdeff09eb48875a0fd3e2af6aada79d409d33011886d3e1581517d9" - -[[package]] -name = "windows_x86_64_gnullvm" -version = "0.52.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "852298e482cd67c356ddd9570386e2862b5673c85bd5f88df9ab6802b334c596" - -[[package]] -name = "windows_x86_64_msvc" -version = "0.52.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0" diff --git a/build_rust.py b/build_rust.py index 662474dcb4..d2726cee26 100644 --- a/build_rust.py +++ b/build_rust.py @@ -1,8 +1,10 @@ # A build script for poetry that adds the rust extension. +import itertools import os from typing import Any, Dict +from packaging.specifiers import SpecifierSet from setuptools_rust import Binding, RustExtension @@ -14,6 +16,8 @@ def build(setup_kwargs: Dict[str, Any]) -> None: target="synapse.synapse_rust", path=cargo_toml_path, binding=Binding.PyO3, + # This flag is a no-op in the latest versions. Instead, we need to + # specify this in the `bdist_wheel` config below. py_limited_api=True, # We force always building in release mode, as we can't tell the # difference between using `poetry` in development vs production. @@ -21,3 +25,18 @@ def build(setup_kwargs: Dict[str, Any]) -> None: ) setup_kwargs.setdefault("rust_extensions", []).append(extension) setup_kwargs["zip_safe"] = False + + # We lookup the minimum supported python version by looking at + # `python_requires` (e.g. ">=3.9.0,<4.0.0") and finding the first python + # version that matches. We then convert that into the `py_limited_api` form, + # e.g. cp39 for python 3.9. + py_limited_api: str + python_bounds = SpecifierSet(setup_kwargs["python_requires"]) + for minor_version in itertools.count(start=8): + if f"3.{minor_version}.0" in python_bounds: + py_limited_api = f"cp3{minor_version}" + break + + setup_kwargs.setdefault("options", {}).setdefault("bdist_wheel", {})[ + "py_limited_api" + ] = py_limited_api diff --git a/changelog.d/17253.misc b/changelog.d/17253.misc new file mode 100644 index 0000000000..868691624d --- /dev/null +++ b/changelog.d/17253.misc @@ -0,0 +1 @@ +[MSC4108](https://github.com/matrix-org/matrix-spec-proposals/pull/4108): Add a `Content-Type` header on the `PUT` response to work around a faulty behavior in some caching reverse proxies. diff --git a/changelog.d/17872.doc b/changelog.d/17872.doc new file mode 100644 index 0000000000..7f8b2d3495 --- /dev/null +++ b/changelog.d/17872.doc @@ -0,0 +1 @@ +Add OIDC example configuration for Forgejo (fork of Gitea). diff --git a/changelog.d/17933.bugfix b/changelog.d/17933.bugfix new file mode 100644 index 0000000000..8d30ac587e --- /dev/null +++ b/changelog.d/17933.bugfix @@ -0,0 +1 @@ +Fix long-standing bug where read receipts could get overly delayed being sent over federation. diff --git a/changelog.d/17936.misc b/changelog.d/17936.misc new file mode 100644 index 0000000000..91d976fbd9 --- /dev/null +++ b/changelog.d/17936.misc @@ -0,0 +1 @@ +Fix incorrect comment in new schema delta. diff --git a/changelog.d/17944.misc b/changelog.d/17944.misc new file mode 100644 index 0000000000..a8a645103f --- /dev/null +++ b/changelog.d/17944.misc @@ -0,0 +1 @@ +Raise setuptools_rust version cap to 1.10.2. \ No newline at end of file diff --git a/changelog.d/17945.misc b/changelog.d/17945.misc new file mode 100644 index 0000000000..eeebb92169 --- /dev/null +++ b/changelog.d/17945.misc @@ -0,0 +1 @@ +Enable encrypted appservice related experimental features in the complement docker image. diff --git a/changelog.d/17952.misc b/changelog.d/17952.misc new file mode 100644 index 0000000000..84fc8bfc29 --- /dev/null +++ b/changelog.d/17952.misc @@ -0,0 +1 @@ +Return whether the user is suspended when querying the user account in the Admin API. \ No newline at end of file diff --git a/changelog.d/17953.doc b/changelog.d/17953.doc new file mode 100644 index 0000000000..10f5a27ba9 --- /dev/null +++ b/changelog.d/17953.doc @@ -0,0 +1 @@ +Link to element-docker-demo from contrib/docker*. diff --git a/changelog.d/17966.misc b/changelog.d/17966.misc new file mode 100644 index 0000000000..c6d6e55fbf --- /dev/null +++ b/changelog.d/17966.misc @@ -0,0 +1 @@ +Bump pyo3 and dependencies to v0.23.2. \ No newline at end of file diff --git a/changelog.d/17969.misc b/changelog.d/17969.misc new file mode 100644 index 0000000000..05506daaa0 --- /dev/null +++ b/changelog.d/17969.misc @@ -0,0 +1 @@ +Update setuptools-rust and fix building abi3 wheels in latest version. diff --git a/contrib/docker/README.md b/contrib/docker/README.md index 89c1518bd0..fdfa96795a 100644 --- a/contrib/docker/README.md +++ b/contrib/docker/README.md @@ -30,3 +30,6 @@ docker-compose up -d ### More information For more information on required environment variables and mounts, see the main docker documentation at [/docker/README.md](../../docker/README.md) + +**For a more comprehensive Docker Compose example showcasing a full Matrix 2.0 stack, please see +https://github.com/element-hq/element-docker-demo** \ No newline at end of file diff --git a/contrib/docker_compose_workers/README.md b/contrib/docker_compose_workers/README.md index 81518f6ba1..16c8c26795 100644 --- a/contrib/docker_compose_workers/README.md +++ b/contrib/docker_compose_workers/README.md @@ -8,6 +8,9 @@ All examples and snippets assume that your Synapse service is called `synapse` i An example Docker Compose file can be found [here](docker-compose.yaml). +**For a more comprehensive Docker Compose example, showcasing a full Matrix 2.0 stack (originally based on this +docker-compose.yaml), please see https://github.com/element-hq/element-docker-demo** + ## Worker Service Examples in Docker Compose In order to start the Synapse container as a worker, you must specify an `entrypoint` that loads both the `homeserver.yaml` and the configuration for the worker (`synapse-generic-worker-1.yaml` in the example below). You must also include the worker type in the environment variable `SYNAPSE_WORKER` or alternatively pass `-m synapse.app.generic_worker` as part of the `entrypoint` after `"/start.py", "run"`). diff --git a/docker/complement/conf/workers-shared-extra.yaml.j2 b/docker/complement/conf/workers-shared-extra.yaml.j2 index b9334cc53b..9a74c617bc 100644 --- a/docker/complement/conf/workers-shared-extra.yaml.j2 +++ b/docker/complement/conf/workers-shared-extra.yaml.j2 @@ -104,6 +104,16 @@ experimental_features: msc3967_enabled: true # Expose a room summary for public rooms msc3266_enabled: true + # Send to-device messages to application services + msc2409_to_device_messages_enabled: true + # Allow application services to masquerade devices + msc3202_device_masquerading: true + # Sending device list changes, one-time key counts and fallback key usage to application services + msc3202_transaction_extensions: true + # Proxy OTK claim requests to exclusive ASes + msc3983_appservice_otk_claims: true + # Proxy key queries to exclusive ASes + msc3984_appservice_key_query: true server_notices: system_mxid_localpart: _server diff --git a/docs/admin_api/user_admin_api.md b/docs/admin_api/user_admin_api.md index 96a2994b7b..a6e2e0a153 100644 --- a/docs/admin_api/user_admin_api.md +++ b/docs/admin_api/user_admin_api.md @@ -55,7 +55,8 @@ It returns a JSON body like the following: } ], "user_type": null, - "locked": false + "locked": false, + "suspended": false } ``` diff --git a/docs/openid.md b/docs/openid.md index 7a10b1615b..5a3d7e9fba 100644 --- a/docs/openid.md +++ b/docs/openid.md @@ -336,6 +336,36 @@ but it has a `response_types_supported` which excludes "code" (which we rely on, is even mentioned in their [documentation](https://developers.facebook.com/docs/facebook-login/manually-build-a-login-flow#login)), so we have to disable discovery and configure the URIs manually. +### Forgejo + +Forgejo is a fork of Gitea that can act as an OAuth2 provider. + +The implementation of OAuth2 is improved compared to Gitea, as it provides a correctly defined `subject_claim` and `scopes`. + +Synapse config: + +```yaml +oidc_providers: + - idp_id: forgejo + idp_name: Forgejo + discover: false + issuer: "https://your-forgejo.com/" + client_id: "your-client-id" # TO BE FILLED + client_secret: "your-client-secret" # TO BE FILLED + client_auth_method: client_secret_post + scopes: ["openid", "profile", "email", "groups"] + authorization_endpoint: "https://your-forgejo.com/login/oauth/authorize" + token_endpoint: "https://your-forgejo.com/login/oauth/access_token" + userinfo_endpoint: "https://your-forgejo.com/api/v1/user" + user_mapping_provider: + config: + subject_claim: "sub" + picture_claim: "picture" + localpart_template: "{{ user.preferred_username }}" + display_name_template: "{{ user.name }}" + email_template: "{{ user.email }}" +``` + ### GitHub [GitHub][github-idp] is a bit special as it is not an OpenID Connect compliant provider, but diff --git a/poetry.lock b/poetry.lock index eece221095..f43fe2489a 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.4 and should not be changed by hand. [[package]] name = "annotated-types" @@ -2405,19 +2405,18 @@ test = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "importlib-metadata [[package]] name = "setuptools-rust" -version = "1.8.1" +version = "1.10.2" description = "Setuptools Rust extension plugin" optional = false python-versions = ">=3.8" files = [ - {file = "setuptools-rust-1.8.1.tar.gz", hash = "sha256:94b1dd5d5308b3138d5b933c3a2b55e6d6927d1a22632e509fcea9ddd0f7e486"}, - {file = "setuptools_rust-1.8.1-py3-none-any.whl", hash = "sha256:b5324493949ccd6aa0c03890c5f6b5f02de4512e3ac1697d02e9a6c02b18aa8e"}, + {file = "setuptools_rust-1.10.2-py3-none-any.whl", hash = "sha256:4b39c435ae9670315d522ed08fa0e8cb29f2a6048033966b6be2571a90ce4f1c"}, + {file = "setuptools_rust-1.10.2.tar.gz", hash = "sha256:5d73e7eee5f87a6417285b617c97088a7c20d1a70fcea60e3bdc94ff567c29dc"}, ] [package.dependencies] semantic-version = ">=2.8.2,<3" setuptools = ">=62.4" -tomli = {version = ">=1.2.1", markers = "python_version < \"3.11\""} [[package]] name = "signedjson" @@ -2515,33 +2514,33 @@ twisted = ["twisted"] [[package]] name = "tomli" -version = "2.0.2" +version = "2.1.0" description = "A lil' TOML parser" optional = false python-versions = ">=3.8" files = [ - {file = "tomli-2.0.2-py3-none-any.whl", hash = "sha256:2ebe24485c53d303f690b0ec092806a085f07af5a5aa1464f3931eec36caaa38"}, - {file = "tomli-2.0.2.tar.gz", hash = "sha256:d46d457a85337051c36524bc5349dd91b1877838e2979ac5ced3e710ed8a60ed"}, + {file = "tomli-2.1.0-py3-none-any.whl", hash = "sha256:a5c57c3d1c56f5ccdf89f6523458f60ef716e210fc47c4cfb188c5ba473e0391"}, + {file = "tomli-2.1.0.tar.gz", hash = "sha256:3f646cae2aec94e17d04973e4249548320197cfabdf130015d023de4b74d8ab8"}, ] [[package]] name = "tornado" -version = "6.4.1" +version = "6.4.2" description = "Tornado is a Python web framework and asynchronous networking library, originally developed at FriendFeed." optional = true python-versions = ">=3.8" files = [ - {file = "tornado-6.4.1-cp38-abi3-macosx_10_9_universal2.whl", hash = "sha256:163b0aafc8e23d8cdc3c9dfb24c5368af84a81e3364745ccb4427669bf84aec8"}, - {file = "tornado-6.4.1-cp38-abi3-macosx_10_9_x86_64.whl", hash = "sha256:6d5ce3437e18a2b66fbadb183c1d3364fb03f2be71299e7d10dbeeb69f4b2a14"}, - {file = "tornado-6.4.1-cp38-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e2e20b9113cd7293f164dc46fffb13535266e713cdb87bd2d15ddb336e96cfc4"}, - {file = "tornado-6.4.1-cp38-abi3-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:8ae50a504a740365267b2a8d1a90c9fbc86b780a39170feca9bcc1787ff80842"}, - {file = "tornado-6.4.1-cp38-abi3-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:613bf4ddf5c7a95509218b149b555621497a6cc0d46ac341b30bd9ec19eac7f3"}, - {file = "tornado-6.4.1-cp38-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:25486eb223babe3eed4b8aecbac33b37e3dd6d776bc730ca14e1bf93888b979f"}, - {file = "tornado-6.4.1-cp38-abi3-musllinux_1_2_i686.whl", hash = "sha256:454db8a7ecfcf2ff6042dde58404164d969b6f5d58b926da15e6b23817950fc4"}, - {file = "tornado-6.4.1-cp38-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:a02a08cc7a9314b006f653ce40483b9b3c12cda222d6a46d4ac63bb6c9057698"}, - {file = "tornado-6.4.1-cp38-abi3-win32.whl", hash = "sha256:d9a566c40b89757c9aa8e6f032bcdb8ca8795d7c1a9762910c722b1635c9de4d"}, - {file = "tornado-6.4.1-cp38-abi3-win_amd64.whl", hash = "sha256:b24b8982ed444378d7f21d563f4180a2de31ced9d8d84443907a0a64da2072e7"}, - {file = "tornado-6.4.1.tar.gz", hash = "sha256:92d3ab53183d8c50f8204a51e6f91d18a15d5ef261e84d452800d4ff6fc504e9"}, + {file = "tornado-6.4.2-cp38-abi3-macosx_10_9_universal2.whl", hash = "sha256:e828cce1123e9e44ae2a50a9de3055497ab1d0aeb440c5ac23064d9e44880da1"}, + {file = "tornado-6.4.2-cp38-abi3-macosx_10_9_x86_64.whl", hash = "sha256:072ce12ada169c5b00b7d92a99ba089447ccc993ea2143c9ede887e0937aa803"}, + {file = "tornado-6.4.2-cp38-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1a017d239bd1bb0919f72af256a970624241f070496635784d9bf0db640d3fec"}, + {file = "tornado-6.4.2-cp38-abi3-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c36e62ce8f63409301537222faffcef7dfc5284f27eec227389f2ad11b09d946"}, + {file = "tornado-6.4.2-cp38-abi3-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bca9eb02196e789c9cb5c3c7c0f04fb447dc2adffd95265b2c7223a8a615ccbf"}, + {file = "tornado-6.4.2-cp38-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:304463bd0772442ff4d0f5149c6f1c2135a1fae045adf070821c6cdc76980634"}, + {file = "tornado-6.4.2-cp38-abi3-musllinux_1_2_i686.whl", hash = "sha256:c82c46813ba483a385ab2a99caeaedf92585a1f90defb5693351fa7e4ea0bf73"}, + {file = "tornado-6.4.2-cp38-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:932d195ca9015956fa502c6b56af9eb06106140d844a335590c1ec7f5277d10c"}, + {file = "tornado-6.4.2-cp38-abi3-win32.whl", hash = "sha256:2876cef82e6c5978fde1e0d5b1f919d756968d5b4282418f3146b79b58556482"}, + {file = "tornado-6.4.2-cp38-abi3-win_amd64.whl", hash = "sha256:908b71bf3ff37d81073356a5fadcc660eb10c1476ee6e2725588626ce7e5ca38"}, + {file = "tornado-6.4.2.tar.gz", hash = "sha256:92bad5b4746e9879fd7bf1eb21dce4e3fc5128d71601f80005afa39237ad620b"}, ] [[package]] diff --git a/pyproject.toml b/pyproject.toml index 5fd1d7c198..eccce10455 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -370,7 +370,7 @@ tomli = ">=1.2.3" # runtime errors caused by build system changes. # We are happy to raise these upper bounds upon request, # provided we check that it's safe to do so (i.e. that CI passes). -requires = ["poetry-core>=1.1.0,<=1.9.1", "setuptools_rust>=1.3,<=1.8.1"] +requires = ["poetry-core>=1.1.0,<=1.9.1", "setuptools_rust>=1.3,<=1.10.2"] build-backend = "poetry.core.masonry.api" diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 026487275c..7eebeb3b55 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -30,14 +30,14 @@ http = "1.1.0" lazy_static = "1.4.0" log = "0.4.17" mime = "0.3.17" -pyo3 = { version = "0.21.0", features = [ +pyo3 = { version = "0.23.2", features = [ "macros", "anyhow", "abi3", "abi3-py38", ] } -pyo3-log = "0.10.0" -pythonize = "0.21.0" +pyo3-log = "0.12.0" +pythonize = "0.23.0" regex = "1.6.0" sha2 = "0.10.8" serde = { version = "1.0.144", features = ["derive"] } diff --git a/rust/src/acl/mod.rs b/rust/src/acl/mod.rs index 982720ba90..57b45475fd 100644 --- a/rust/src/acl/mod.rs +++ b/rust/src/acl/mod.rs @@ -32,14 +32,14 @@ use crate::push::utils::{glob_to_regex, GlobMatchType}; /// Called when registering modules with python. pub fn register_module(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> { - let child_module = PyModule::new_bound(py, "acl")?; + let child_module = PyModule::new(py, "acl")?; child_module.add_class::()?; m.add_submodule(&child_module)?; // We need to manually add the module to sys.modules to make `from // synapse.synapse_rust import acl` work. - py.import_bound("sys")? + py.import("sys")? .getattr("modules")? .set_item("synapse.synapse_rust.acl", child_module)?; diff --git a/rust/src/events/internal_metadata.rs b/rust/src/events/internal_metadata.rs index ad87825f16..eeb6074c10 100644 --- a/rust/src/events/internal_metadata.rs +++ b/rust/src/events/internal_metadata.rs @@ -41,9 +41,11 @@ use pyo3::{ pybacked::PyBackedStr, pyclass, pymethods, types::{PyAnyMethods, PyDict, PyDictMethods, PyString}, - Bound, IntoPy, PyAny, PyObject, PyResult, Python, + Bound, IntoPyObject, PyAny, PyObject, PyResult, Python, }; +use crate::UnwrapInfallible; + /// Definitions of the various fields of the internal metadata. #[derive(Clone)] enum EventInternalMetadataData { @@ -60,31 +62,59 @@ enum EventInternalMetadataData { impl EventInternalMetadataData { /// Convert the field to its name and python object. - fn to_python_pair<'a>(&self, py: Python<'a>) -> (&'a Bound<'a, PyString>, PyObject) { + fn to_python_pair<'a>(&self, py: Python<'a>) -> (&'a Bound<'a, PyString>, Bound<'a, PyAny>) { match self { - EventInternalMetadataData::OutOfBandMembership(o) => { - (pyo3::intern!(py, "out_of_band_membership"), o.into_py(py)) - } - EventInternalMetadataData::SendOnBehalfOf(o) => { - (pyo3::intern!(py, "send_on_behalf_of"), o.into_py(py)) - } - EventInternalMetadataData::RecheckRedaction(o) => { - (pyo3::intern!(py, "recheck_redaction"), o.into_py(py)) - } - EventInternalMetadataData::SoftFailed(o) => { - (pyo3::intern!(py, "soft_failed"), o.into_py(py)) - } - EventInternalMetadataData::ProactivelySend(o) => { - (pyo3::intern!(py, "proactively_send"), o.into_py(py)) - } - EventInternalMetadataData::Redacted(o) => { - (pyo3::intern!(py, "redacted"), o.into_py(py)) - } - EventInternalMetadataData::TxnId(o) => (pyo3::intern!(py, "txn_id"), o.into_py(py)), - EventInternalMetadataData::TokenId(o) => (pyo3::intern!(py, "token_id"), o.into_py(py)), - EventInternalMetadataData::DeviceId(o) => { - (pyo3::intern!(py, "device_id"), o.into_py(py)) - } + EventInternalMetadataData::OutOfBandMembership(o) => ( + pyo3::intern!(py, "out_of_band_membership"), + o.into_pyobject(py) + .unwrap_infallible() + .to_owned() + .into_any(), + ), + EventInternalMetadataData::SendOnBehalfOf(o) => ( + pyo3::intern!(py, "send_on_behalf_of"), + o.into_pyobject(py).unwrap_infallible().into_any(), + ), + EventInternalMetadataData::RecheckRedaction(o) => ( + pyo3::intern!(py, "recheck_redaction"), + o.into_pyobject(py) + .unwrap_infallible() + .to_owned() + .into_any(), + ), + EventInternalMetadataData::SoftFailed(o) => ( + pyo3::intern!(py, "soft_failed"), + o.into_pyobject(py) + .unwrap_infallible() + .to_owned() + .into_any(), + ), + EventInternalMetadataData::ProactivelySend(o) => ( + pyo3::intern!(py, "proactively_send"), + o.into_pyobject(py) + .unwrap_infallible() + .to_owned() + .into_any(), + ), + EventInternalMetadataData::Redacted(o) => ( + pyo3::intern!(py, "redacted"), + o.into_pyobject(py) + .unwrap_infallible() + .to_owned() + .into_any(), + ), + EventInternalMetadataData::TxnId(o) => ( + pyo3::intern!(py, "txn_id"), + o.into_pyobject(py).unwrap_infallible().into_any(), + ), + EventInternalMetadataData::TokenId(o) => ( + pyo3::intern!(py, "token_id"), + o.into_pyobject(py).unwrap_infallible().into_any(), + ), + EventInternalMetadataData::DeviceId(o) => ( + pyo3::intern!(py, "device_id"), + o.into_pyobject(py).unwrap_infallible().into_any(), + ), } } @@ -247,7 +277,7 @@ impl EventInternalMetadata { /// /// Note that `outlier` and `stream_ordering` are stored in separate columns so are not returned here. fn get_dict(&self, py: Python<'_>) -> PyResult { - let dict = PyDict::new_bound(py); + let dict = PyDict::new(py); for entry in &self.data { let (key, value) = entry.to_python_pair(py); diff --git a/rust/src/events/mod.rs b/rust/src/events/mod.rs index 0bb6cdb181..209efb917b 100644 --- a/rust/src/events/mod.rs +++ b/rust/src/events/mod.rs @@ -30,7 +30,7 @@ mod internal_metadata; /// Called when registering modules with python. pub fn register_module(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> { - let child_module = PyModule::new_bound(py, "events")?; + let child_module = PyModule::new(py, "events")?; child_module.add_class::()?; child_module.add_function(wrap_pyfunction!(filter::event_visible_to_server_py, m)?)?; @@ -38,7 +38,7 @@ pub fn register_module(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> // We need to manually add the module to sys.modules to make `from // synapse.synapse_rust import events` work. - py.import_bound("sys")? + py.import("sys")? .getattr("modules")? .set_item("synapse.synapse_rust.events", child_module)?; diff --git a/rust/src/http.rs b/rust/src/http.rs index af052ab721..63ed05be54 100644 --- a/rust/src/http.rs +++ b/rust/src/http.rs @@ -70,7 +70,7 @@ pub fn http_request_from_twisted(request: &Bound<'_, PyAny>) -> PyResult, m: &Bound<'_, PyModule>) -> PyResult<()> { Ok(()) } + +pub trait UnwrapInfallible { + fn unwrap_infallible(self) -> T; +} + +impl UnwrapInfallible for Result { + fn unwrap_infallible(self) -> T { + match self { + Ok(val) => val, + Err(never) => match never {}, + } + } +} diff --git a/rust/src/push/evaluator.rs b/rust/src/push/evaluator.rs index 0d436a1d7b..db406acb88 100644 --- a/rust/src/push/evaluator.rs +++ b/rust/src/push/evaluator.rs @@ -167,6 +167,7 @@ impl PushRuleEvaluator { /// /// Returns the set of actions, if any, that match (filtering out any /// `dont_notify` and `coalesce` actions). + #[pyo3(signature = (push_rules, user_id=None, display_name=None))] pub fn run( &self, push_rules: &FilteredPushRules, @@ -236,6 +237,7 @@ impl PushRuleEvaluator { } /// Check if the given condition matches. + #[pyo3(signature = (condition, user_id=None, display_name=None))] fn matches( &self, condition: Condition, diff --git a/rust/src/push/mod.rs b/rust/src/push/mod.rs index ef8ed150d4..bd0e853ac3 100644 --- a/rust/src/push/mod.rs +++ b/rust/src/push/mod.rs @@ -65,8 +65,8 @@ use anyhow::{Context, Error}; use log::warn; use pyo3::exceptions::PyTypeError; use pyo3::prelude::*; -use pyo3::types::{PyBool, PyList, PyLong, PyString}; -use pythonize::{depythonize_bound, pythonize}; +use pyo3::types::{PyBool, PyInt, PyList, PyString}; +use pythonize::{depythonize, pythonize, PythonizeError}; use serde::de::Error as _; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -79,7 +79,7 @@ pub mod utils; /// Called when registering modules with python. pub fn register_module(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> { - let child_module = PyModule::new_bound(py, "push")?; + let child_module = PyModule::new(py, "push")?; child_module.add_class::()?; child_module.add_class::()?; child_module.add_class::()?; @@ -90,7 +90,7 @@ pub fn register_module(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> // We need to manually add the module to sys.modules to make `from // synapse.synapse_rust import push` work. - py.import_bound("sys")? + py.import("sys")? .getattr("modules")? .set_item("synapse.synapse_rust.push", child_module)?; @@ -182,12 +182,16 @@ pub enum Action { Unknown(Value), } -impl IntoPy for Action { - fn into_py(self, py: Python<'_>) -> PyObject { +impl<'py> IntoPyObject<'py> for Action { + type Target = PyAny; + type Output = Bound<'py, Self::Target>; + type Error = PythonizeError; + + fn into_pyobject(self, py: Python<'py>) -> Result { // When we pass the `Action` struct to Python we want it to be converted // to a dict. We use `pythonize`, which converts the struct using the // `serde` serialization. - pythonize(py, &self).expect("valid action") + pythonize(py, &self) } } @@ -270,13 +274,13 @@ pub enum SimpleJsonValue { } impl<'source> FromPyObject<'source> for SimpleJsonValue { - fn extract(ob: &'source PyAny) -> PyResult { + fn extract_bound(ob: &Bound<'source, PyAny>) -> PyResult { if let Ok(s) = ob.downcast::() { Ok(SimpleJsonValue::Str(Cow::Owned(s.to_string()))) // A bool *is* an int, ensure we try bool first. } else if let Ok(b) = ob.downcast::() { Ok(SimpleJsonValue::Bool(b.extract()?)) - } else if let Ok(i) = ob.downcast::() { + } else if let Ok(i) = ob.downcast::() { Ok(SimpleJsonValue::Int(i.extract()?)) } else if ob.is_none() { Ok(SimpleJsonValue::Null) @@ -298,15 +302,19 @@ pub enum JsonValue { } impl<'source> FromPyObject<'source> for JsonValue { - fn extract(ob: &'source PyAny) -> PyResult { + fn extract_bound(ob: &Bound<'source, PyAny>) -> PyResult { if let Ok(l) = ob.downcast::() { - match l.iter().map(SimpleJsonValue::extract).collect() { + match l + .iter() + .map(|it| SimpleJsonValue::extract_bound(&it)) + .collect() + { Ok(a) => Ok(JsonValue::Array(a)), Err(e) => Err(PyTypeError::new_err(format!( "Can't convert to JsonValue::Array: {e}" ))), } - } else if let Ok(v) = SimpleJsonValue::extract(ob) { + } else if let Ok(v) = SimpleJsonValue::extract_bound(ob) { Ok(JsonValue::Value(v)) } else { Err(PyTypeError::new_err(format!( @@ -363,15 +371,19 @@ pub enum KnownCondition { }, } -impl IntoPy for Condition { - fn into_py(self, py: Python<'_>) -> PyObject { - pythonize(py, &self).expect("valid condition") +impl<'source> IntoPyObject<'source> for Condition { + type Target = PyAny; + type Output = Bound<'source, Self::Target>; + type Error = PythonizeError; + + fn into_pyobject(self, py: Python<'source>) -> Result { + pythonize(py, &self) } } impl<'source> FromPyObject<'source> for Condition { fn extract_bound(ob: &Bound<'source, PyAny>) -> PyResult { - Ok(depythonize_bound(ob.clone())?) + Ok(depythonize(ob)?) } } diff --git a/rust/src/rendezvous/mod.rs b/rust/src/rendezvous/mod.rs index f69f45490f..23de668102 100644 --- a/rust/src/rendezvous/mod.rs +++ b/rust/src/rendezvous/mod.rs @@ -29,7 +29,7 @@ use pyo3::{ exceptions::PyValueError, pyclass, pymethods, types::{PyAnyMethods, PyModule, PyModuleMethods}, - Bound, Py, PyAny, PyObject, PyResult, Python, ToPyObject, + Bound, IntoPyObject, Py, PyAny, PyObject, PyResult, Python, }; use ulid::Ulid; @@ -37,6 +37,7 @@ use self::session::Session; use crate::{ errors::{NotFoundError, SynapseError}, http::{http_request_from_twisted, http_response_to_twisted, HeaderMapPyExt}, + UnwrapInfallible, }; mod session; @@ -125,7 +126,11 @@ impl RendezvousHandler { let base = Uri::try_from(format!("{base}_synapse/client/rendezvous")) .map_err(|_| PyValueError::new_err("Invalid base URI"))?; - let clock = homeserver.call_method0("get_clock")?.to_object(py); + let clock = homeserver + .call_method0("get_clock")? + .into_pyobject(py) + .unwrap_infallible() + .unbind(); // Construct a Python object so that we can get a reference to the // evict method and schedule it to run. @@ -288,6 +293,13 @@ impl RendezvousHandler { let mut response = Response::new(Bytes::new()); *response.status_mut() = StatusCode::ACCEPTED; prepare_headers(response.headers_mut(), session); + + // Even though this isn't mandated by the MSC, we set a Content-Type on the response. It + // doesn't do any harm as the body is empty, but this helps escape a bug in some reverse + // proxy/cache setup which strips the ETag header if there is no Content-Type set. + // Specifically, we noticed this behaviour when placing Synapse behind Cloudflare. + response.headers_mut().typed_insert(ContentType::text()); + http_response_to_twisted(twisted_request, response)?; Ok(()) @@ -311,7 +323,7 @@ impl RendezvousHandler { } pub fn register_module(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> { - let child_module = PyModule::new_bound(py, "rendezvous")?; + let child_module = PyModule::new(py, "rendezvous")?; child_module.add_class::()?; @@ -319,7 +331,7 @@ pub fn register_module(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> // We need to manually add the module to sys.modules to make `from // synapse.synapse_rust import rendezvous` work. - py.import_bound("sys")? + py.import("sys")? .getattr("modules")? .set_item("synapse.synapse_rust.rendezvous", child_module)?; diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 1888480881..17cddf18a3 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -140,7 +140,6 @@ from typing import ( Iterable, List, Optional, - Set, Tuple, ) @@ -170,7 +169,13 @@ from synapse.metrics.background_process_metrics import ( run_as_background_process, wrap_as_background_process, ) -from synapse.types import JsonDict, ReadReceipt, RoomStreamToken, StrCollection +from synapse.types import ( + JsonDict, + ReadReceipt, + RoomStreamToken, + StrCollection, + get_domain_from_id, +) from synapse.util import Clock from synapse.util.metrics import Measure from synapse.util.retryutils import filter_destinations_by_retry_limiter @@ -297,12 +302,10 @@ class _DestinationWakeupQueue: # being woken up. _MAX_TIME_IN_QUEUE = 30.0 - # The maximum duration in seconds between waking up consecutive destination - # queues. - _MAX_DELAY = 0.1 - sender: "FederationSender" = attr.ib() clock: Clock = attr.ib() + max_delay_s: int = attr.ib() + queue: "OrderedDict[str, Literal[None]]" = attr.ib(factory=OrderedDict) processing: bool = attr.ib(default=False) @@ -332,7 +335,7 @@ class _DestinationWakeupQueue: # We also add an upper bound to the delay, to gracefully handle the # case where the queue only has a few entries in it. current_sleep_seconds = min( - self._MAX_DELAY, self._MAX_TIME_IN_QUEUE / len(self.queue) + self.max_delay_s, self._MAX_TIME_IN_QUEUE / len(self.queue) ) while self.queue: @@ -416,19 +419,14 @@ class FederationSender(AbstractFederationSender): self._is_processing = False self._last_poked_id = -1 - # map from room_id to a set of PerDestinationQueues which we believe are - # awaiting a call to flush_read_receipts_for_room. The presence of an entry - # here for a given room means that we are rate-limiting RR flushes to that room, - # and that there is a pending call to _flush_rrs_for_room in the system. - self._queues_awaiting_rr_flush_by_room: Dict[str, Set[PerDestinationQueue]] = {} - - self._rr_txn_interval_per_room_ms = ( - 1000.0 - / hs.config.ratelimiting.federation_rr_transactions_per_room_per_second - ) - self._external_cache = hs.get_external_cache() - self._destination_wakeup_queue = _DestinationWakeupQueue(self, self.clock) + + rr_txn_interval_per_room_s = ( + 1.0 / hs.config.ratelimiting.federation_rr_transactions_per_room_per_second + ) + self._destination_wakeup_queue = _DestinationWakeupQueue( + self, self.clock, max_delay_s=rr_txn_interval_per_room_s + ) # Regularly wake up destinations that have outstanding PDUs to be caught up self.clock.looping_call_now( @@ -745,37 +743,48 @@ class FederationSender(AbstractFederationSender): # Some background on the rate-limiting going on here. # - # It turns out that if we attempt to send out RRs as soon as we get them from - # a client, then we end up trying to do several hundred Hz of federation - # transactions. (The number of transactions scales as O(N^2) on the size of a - # room, since in a large room we have both more RRs coming in, and more servers - # to send them to.) + # It turns out that if we attempt to send out RRs as soon as we get them + # from a client, then we end up trying to do several hundred Hz of + # federation transactions. (The number of transactions scales as O(N^2) + # on the size of a room, since in a large room we have both more RRs + # coming in, and more servers to send them to.) # - # This leads to a lot of CPU load, and we end up getting behind. The solution - # currently adopted is as follows: + # This leads to a lot of CPU load, and we end up getting behind. The + # solution currently adopted is to differentiate between receipts and + # destinations we should immediately send to, and those we can trickle + # the receipts to. # - # The first receipt in a given room is sent out immediately, at time T0. Any - # further receipts are, in theory, batched up for N seconds, where N is calculated - # based on the number of servers in the room to achieve a transaction frequency - # of around 50Hz. So, for example, if there were 100 servers in the room, then - # N would be 100 / 50Hz = 2 seconds. + # The current logic is to send receipts out immediately if: + # - the room is "small", i.e. there's only N servers to send receipts + # to, and so sending out the receipts immediately doesn't cause too + # much load; or + # - the receipt is for an event that happened recently, as users + # notice if receipts are delayed when they know other users are + # currently reading the room; or + # - the receipt is being sent to the server that sent the event, so + # that users see receipts for their own receipts quickly. # - # Then, after T+N, we flush out any receipts that have accumulated, and restart - # the timer to flush out more receipts at T+2N, etc. If no receipts accumulate, - # we stop the cycle and go back to the start. + # For destinations that we should delay sending the receipt to, we queue + # the receipts up to be sent in the next transaction, but don't trigger + # a new transaction to be sent. We then add the destination to the + # `DestinationWakeupQueue`, which will slowly iterate over each + # destination and trigger a new transaction to be sent. # - # However, in practice, it is often possible to flush out receipts earlier: in - # particular, if we are sending a transaction to a given server anyway (for - # example, because we have a PDU or a RR in another room to send), then we may - # as well send out all of the pending RRs for that server. So it may be that - # by the time we get to T+N, we don't actually have any RRs left to send out. - # Nevertheless we continue to buffer up RRs for the room in question until we - # reach the point that no RRs arrive between timer ticks. + # However, in practice, it is often possible to send out delayed + # receipts earlier: in particular, if we are sending a transaction to a + # given server anyway (for example, because we have a PDU or a RR in + # another room to send), then we may as well send out all of the pending + # RRs for that server. So it may be that by the time we get to waking up + # the destination, we don't actually have any RRs left to send out. # - # For even more background, see https://github.com/matrix-org/synapse/issues/4730. + # For even more background, see + # https://github.com/matrix-org/synapse/issues/4730. room_id = receipt.room_id + # Local read receipts always have 1 event ID. + event_id = receipt.event_ids[0] + # Work out which remote servers should be poked and poke them. domains_set = await self._storage_controllers.state.get_current_hosts_in_room_or_partial_state_approximation( room_id @@ -797,49 +806,51 @@ class FederationSender(AbstractFederationSender): if not domains: return - queues_pending_flush = self._queues_awaiting_rr_flush_by_room.get(room_id) + # We now split which domains we want to wake up immediately vs which we + # want to delay waking up. + immediate_domains: StrCollection + delay_domains: StrCollection - # if there is no flush yet scheduled, we will send out these receipts with - # immediate flushes, and schedule the next flush for this room. - if queues_pending_flush is not None: - logger.debug("Queuing receipt for: %r", domains) + if len(domains) < 10: + # For "small" rooms send to all domains immediately + immediate_domains = domains + delay_domains = () else: - logger.debug("Sending receipt to: %r", domains) - self._schedule_rr_flush_for_room(room_id, len(domains)) + metadata = await self.store.get_metadata_for_event( + receipt.room_id, event_id + ) + assert metadata is not None - for domain in domains: + sender_domain = get_domain_from_id(metadata.sender) + + if self.clock.time_msec() - metadata.received_ts < 60_000: + # We always send receipts for recent messages immediately + immediate_domains = domains + delay_domains = () + else: + # Otherwise, we delay waking up all destinations except for the + # sender's domain. + immediate_domains = [] + delay_domains = [] + for domain in domains: + if domain == sender_domain: + immediate_domains.append(domain) + else: + delay_domains.append(domain) + + for domain in immediate_domains: + # Add to destination queue and wake the destination up + queue = self._get_per_destination_queue(domain) + queue.queue_read_receipt(receipt) + queue.attempt_new_transaction() + + for domain in delay_domains: + # Add to destination queue... queue = self._get_per_destination_queue(domain) queue.queue_read_receipt(receipt) - # if there is already a RR flush pending for this room, then make sure this - # destination is registered for the flush - if queues_pending_flush is not None: - queues_pending_flush.add(queue) - else: - queue.flush_read_receipts_for_room(room_id) - - def _schedule_rr_flush_for_room(self, room_id: str, n_domains: int) -> None: - # that is going to cause approximately len(domains) transactions, so now back - # off for that multiplied by RR_TXN_INTERVAL_PER_ROOM - backoff_ms = self._rr_txn_interval_per_room_ms * n_domains - - logger.debug("Scheduling RR flush in %s in %d ms", room_id, backoff_ms) - self.clock.call_later(backoff_ms, self._flush_rrs_for_room, room_id) - self._queues_awaiting_rr_flush_by_room[room_id] = set() - - def _flush_rrs_for_room(self, room_id: str) -> None: - queues = self._queues_awaiting_rr_flush_by_room.pop(room_id) - logger.debug("Flushing RRs in %s to %s", room_id, queues) - - if not queues: - # no more RRs arrived for this room; we are done. - return - - # schedule the next flush - self._schedule_rr_flush_for_room(room_id, len(queues)) - - for queue in queues: - queue.flush_read_receipts_for_room(room_id) + # ... and schedule the destination to be woken up. + self._destination_wakeup_queue.add_to_queue(domain) async def send_presence_to_destinations( self, states: Iterable[UserPresenceState], destinations: Iterable[str] diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index d097e65ea7..b3f65e8237 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -156,7 +156,6 @@ class PerDestinationQueue: # Each receipt can only have a single receipt per # (room ID, receipt type, user ID, thread ID) tuple. self._pending_receipt_edus: List[Dict[str, Dict[str, Dict[str, dict]]]] = [] - self._rrs_pending_flush = False # stream_id of last successfully sent to-device message. # NB: may be a long or an int. @@ -258,15 +257,7 @@ class PerDestinationQueue: } ) - def flush_read_receipts_for_room(self, room_id: str) -> None: - # If there are any pending receipts for this room then force-flush them - # in a new transaction. - for edu in self._pending_receipt_edus: - if room_id in edu: - self._rrs_pending_flush = True - self.attempt_new_transaction() - # No use in checking remaining EDUs if the room was found. - break + self.mark_new_data() def send_keyed_edu(self, edu: Edu, key: Hashable) -> None: self._pending_edus_keyed[(edu.edu_type, key)] = edu @@ -603,12 +594,9 @@ class PerDestinationQueue: self._destination, last_successful_stream_ordering ) - def _get_receipt_edus(self, force_flush: bool, limit: int) -> Iterable[Edu]: + def _get_receipt_edus(self, limit: int) -> Iterable[Edu]: if not self._pending_receipt_edus: return - if not force_flush and not self._rrs_pending_flush: - # not yet time for this lot - return # Send at most limit EDUs for receipts. for content in self._pending_receipt_edus[:limit]: @@ -747,7 +735,7 @@ class _TransactionQueueManager: ) # Add read receipt EDUs. - pending_edus.extend(self.queue._get_receipt_edus(force_flush=False, limit=5)) + pending_edus.extend(self.queue._get_receipt_edus(limit=5)) edu_limit = MAX_EDUS_PER_TRANSACTION - len(pending_edus) # Next, prioritize to-device messages so that existing encryption channels @@ -795,13 +783,6 @@ class _TransactionQueueManager: if not self._pdus and not pending_edus: return [], [] - # if we've decided to send a transaction anyway, and we have room, we - # may as well send any pending RRs - if edu_limit: - pending_edus.extend( - self.queue._get_receipt_edus(force_flush=True, limit=edu_limit) - ) - if self._pdus: self._last_stream_ordering = self._pdus[ -1 diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py index d1989e9d2c..d1194545ae 100644 --- a/synapse/handlers/admin.py +++ b/synapse/handlers/admin.py @@ -124,6 +124,7 @@ class AdminHandler: "consent_ts": user_info.consent_ts, "user_type": user_info.user_type, "is_guest": user_info.is_guest, + "suspended": user_info.suspended, } if self._msc3866_enabled: diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 32c3472e58..707d18de78 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -322,6 +322,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore): self._attempt_to_invalidate_cache( "get_unread_event_push_actions_by_room_for_user", (room_id,) ) + self._attempt_to_invalidate_cache("get_metadata_for_event", (room_id, event_id)) self._attempt_to_invalidate_cache("_get_max_event_pos", (room_id,)) @@ -446,6 +447,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore): self._attempt_to_invalidate_cache("_get_state_group_for_event", None) self._attempt_to_invalidate_cache("get_event_ordering", None) + self._attempt_to_invalidate_cache("get_metadata_for_event", (room_id,)) self._attempt_to_invalidate_cache("is_partial_state_event", None) self._attempt_to_invalidate_cache("_get_joined_profile_from_event_id", None) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 17ae02d0a2..3dfc7210d6 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -193,6 +193,14 @@ class _EventRow: outlier: bool +@attr.s(slots=True, frozen=True, auto_attribs=True) +class EventMetadata: + """Event metadata returned by `get_metadata_for_event(..)`""" + + sender: str + received_ts: int + + class EventRedactBehaviour(Enum): """ What to do when retrieving a redacted event from the database. @@ -2583,3 +2591,22 @@ class EventsWorkerStore(SQLBaseStore): _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE, ) ) + + @cached(tree=True) + async def get_metadata_for_event( + self, room_id: str, event_id: str + ) -> Optional[EventMetadata]: + row = await self.db_pool.simple_select_one( + table="events", + keyvalues={"room_id": room_id, "event_id": event_id}, + retcols=("sender", "received_ts"), + allow_none=True, + desc="get_metadata_for_event", + ) + if row is None: + return None + + return EventMetadata( + sender=row[0], + received_ts=row[1], + ) diff --git a/synapse/storage/schema/main/delta/88/04_current_state_delta_index.sql b/synapse/storage/schema/main/delta/88/04_current_state_delta_index.sql index ad54302a8f..0ee78df1a0 100644 --- a/synapse/storage/schema/main/delta/88/04_current_state_delta_index.sql +++ b/synapse/storage/schema/main/delta/88/04_current_state_delta_index.sql @@ -12,7 +12,7 @@ -- . --- Add an index on (user_id, device_id, algorithm, ts_added_ms) on e2e_one_time_keys_json, so that OTKs can --- efficiently be issued in the same order they were uploaded. +-- Add an index on `current_state_delta_stream(room_id, stream_id)` to allow +-- efficient per-room lookups. INSERT INTO background_updates (ordering, update_name, progress_json) VALUES (8804, 'current_state_delta_stream_room_index', '{}'); diff --git a/tests/federation/test_federation_sender.py b/tests/federation/test_federation_sender.py index 6a8887fe74..cd906bbbc7 100644 --- a/tests/federation/test_federation_sender.py +++ b/tests/federation/test_federation_sender.py @@ -34,6 +34,7 @@ from synapse.handlers.device import DeviceHandler from synapse.rest import admin from synapse.rest.client import login from synapse.server import HomeServer +from synapse.storage.databases.main.events_worker import EventMetadata from synapse.types import JsonDict, ReadReceipt from synapse.util import Clock @@ -55,12 +56,15 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase): federation_transport_client=self.federation_transport_client, ) - hs.get_storage_controllers().state.get_current_hosts_in_room = AsyncMock( # type: ignore[method-assign] + self.main_store = hs.get_datastores().main + self.state_controller = hs.get_storage_controllers().state + + self.state_controller.get_current_hosts_in_room = AsyncMock( # type: ignore[method-assign] return_value={"test", "host2"} ) - hs.get_storage_controllers().state.get_current_hosts_in_room_or_partial_state_approximation = ( # type: ignore[method-assign] - hs.get_storage_controllers().state.get_current_hosts_in_room + self.state_controller.get_current_hosts_in_room_or_partial_state_approximation = ( # type: ignore[method-assign] + self.state_controller.get_current_hosts_in_room ) return hs @@ -185,12 +189,15 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase): ], ) - def test_send_receipts_with_backoff(self) -> None: - """Send two receipts in quick succession; the second should be flushed, but - only after 20ms""" + def test_send_receipts_with_backoff_small_room(self) -> None: + """Read receipt in small rooms should not be delayed""" mock_send_transaction = self.federation_transport_client.send_transaction mock_send_transaction.return_value = {} + self.state_controller.get_current_hosts_in_room_or_partial_state_approximation = AsyncMock( # type: ignore[method-assign] + return_value={"test", "host2"} + ) + sender = self.hs.get_federation_sender() receipt = ReadReceipt( "room_id", @@ -206,7 +213,104 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase): # expect a call to send_transaction mock_send_transaction.assert_called_once() - json_cb = mock_send_transaction.call_args[0][1] + self._assert_edu_in_call(mock_send_transaction.call_args[0][1]) + + def test_send_receipts_with_backoff_recent_event(self) -> None: + """Read receipt for a recent message should not be delayed""" + mock_send_transaction = self.federation_transport_client.send_transaction + mock_send_transaction.return_value = {} + + # Pretend this is a big room + self.state_controller.get_current_hosts_in_room_or_partial_state_approximation = AsyncMock( # type: ignore[method-assign] + return_value={"test"} | {f"host{i}" for i in range(20)} + ) + + self.main_store.get_metadata_for_event = AsyncMock( + return_value=EventMetadata( + received_ts=self.clock.time_msec(), + sender="@test:test", + ) + ) + + sender = self.hs.get_federation_sender() + receipt = ReadReceipt( + "room_id", + "m.read", + "user_id", + ["event_id"], + thread_id=None, + data={"ts": 1234}, + ) + self.get_success(sender.send_read_receipt(receipt)) + + self.pump() + + # expect a call to send_transaction for each host + self.assertEqual(mock_send_transaction.call_count, 20) + self._assert_edu_in_call(mock_send_transaction.call_args.args[1]) + + mock_send_transaction.reset_mock() + + def test_send_receipts_with_backoff_sender(self) -> None: + """Read receipt for a message should not be delayed to the sender, but + is delayed to everyone else""" + mock_send_transaction = self.federation_transport_client.send_transaction + mock_send_transaction.return_value = {} + + # Pretend this is a big room + self.state_controller.get_current_hosts_in_room_or_partial_state_approximation = AsyncMock( # type: ignore[method-assign] + return_value={"test"} | {f"host{i}" for i in range(20)} + ) + + self.main_store.get_metadata_for_event = AsyncMock( + return_value=EventMetadata( + received_ts=self.clock.time_msec() - 5 * 60_000, + sender="@test:host1", + ) + ) + + sender = self.hs.get_federation_sender() + receipt = ReadReceipt( + "room_id", + "m.read", + "user_id", + ["event_id"], + thread_id=None, + data={"ts": 1234}, + ) + self.get_success(sender.send_read_receipt(receipt)) + + self.pump() + + # First, expect a call to send_transaction for the sending host + mock_send_transaction.assert_called() + + transaction = mock_send_transaction.call_args_list[0].args[0] + self.assertEqual(transaction.destination, "host1") + self._assert_edu_in_call(mock_send_transaction.call_args_list[0].args[1]) + + # We also expect a call to one of the other hosts, as the first + # destination to wake up. + self.assertEqual(mock_send_transaction.call_count, 2) + self._assert_edu_in_call(mock_send_transaction.call_args.args[1]) + + mock_send_transaction.reset_mock() + + # We now expect to see 18 more transactions to the remaining hosts + # periodically. + for _ in range(18): + self.reactor.advance( + 1.0 + / self.hs.config.ratelimiting.federation_rr_transactions_per_room_per_second + ) + + mock_send_transaction.assert_called_once() + self._assert_edu_in_call(mock_send_transaction.call_args.args[1]) + mock_send_transaction.reset_mock() + + def _assert_edu_in_call(self, json_cb: Callable[[], JsonDict]) -> None: + """Assert that the given `json_cb` from a `send_transaction` has a + receipt in it.""" data = json_cb() self.assertEqual( data["edus"], @@ -226,46 +330,6 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase): } ], ) - mock_send_transaction.reset_mock() - - # send the second RR - receipt = ReadReceipt( - "room_id", - "m.read", - "user_id", - ["other_id"], - thread_id=None, - data={"ts": 1234}, - ) - self.successResultOf(defer.ensureDeferred(sender.send_read_receipt(receipt))) - self.pump() - mock_send_transaction.assert_not_called() - - self.reactor.advance(19) - mock_send_transaction.assert_not_called() - - self.reactor.advance(10) - mock_send_transaction.assert_called_once() - json_cb = mock_send_transaction.call_args[0][1] - data = json_cb() - self.assertEqual( - data["edus"], - [ - { - "edu_type": EduTypes.RECEIPT, - "content": { - "room_id": { - "m.read": { - "user_id": { - "event_ids": ["other_id"], - "data": {"ts": 1234}, - } - } - } - }, - } - ], - ) class FederationSenderPresenceTestCases(HomeserverTestCase): diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py index 6d050e7784..fdb8fafa0e 100644 --- a/tests/rest/admin/test_user.py +++ b/tests/rest/admin/test_user.py @@ -3222,6 +3222,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): self.assertIn("consent_ts", content) self.assertIn("external_ids", content) self.assertIn("last_seen_ts", content) + self.assertIn("suspended", content) # This key was removed intentionally. Ensure it is not accidentally re-included. self.assertNotIn("password_hash", content)