1
0

Compare commits

...

34 Commits

Author SHA1 Message Date
Mathieu Velten 5d0352c207 Remove validate 2023-09-08 13:39:12 +02:00
Mathieu Velten 2da21f6204 Merge remote-tracking branch 'origin/develop' into mv/add-mxid-validation-log 2023-09-08 13:37:43 +02:00
Mathieu Velten 01c582ff36 Change to is_valid 2023-09-08 13:37:00 +02:00
Erik Johnston 1cd410a783 Recheck if remote device is cached before requesting it (#16252)
This fixes a bug where we could get stuck re-requesting the device over
replication again and again.
2023-09-07 12:45:43 +00:00
Mathieu Velten 42a392f4e2 Merge branch 'develop' into mv/add-mxid-validation-log 2023-09-07 12:37:16 +02:00
Erik Johnston 8940d1b28e Add /notifications endpoint to workers (#16265) 2023-09-07 09:26:07 +00:00
dependabot[bot] a83f75a37d Bump gitpython from 3.1.32 to 3.1.34 (#16267)
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-09-06 15:19:56 -04:00
Marcel 13e9cad537 Send the opentracing span information to appservices (#16227) 2023-09-06 15:19:17 -04:00
Aurélien Grimpard fe69e7f617 Handle "registration_enabled" parameter for CAS (#16262)
Similar to OIDC, CAS providers can now disable registration such
that only existing users are able to login via SSO.
2023-09-06 14:32:24 -04:00
Patrick Cloke 32fb264120 Merge remote-tracking branch 'origin/release-v1.92' into develop 2023-09-06 13:08:22 -04:00
reivilibre 51303035f2 Apply missed suggestions from the review of #16090. (#16263)
* Suggestions from PR

* Newsfile

Signed-off-by: Olivier Wilkinson (reivilibre) <oliverw@matrix.org>

---------

Signed-off-by: Olivier Wilkinson (reivilibre) <oliverw@matrix.org>
2023-09-06 16:15:56 +01:00
reivilibre 35934b02a9 Add GCC and GNU Make to the Nix flake development environment so that ruff can be compiled. (#16090)
* Add gcc and GNU make to the Nix flake

* Newsfile

Signed-off-by: Olivier Wilkinson (reivilibre) <oliverw@matrix.org>

* unset LD_LIBRARY_PATH

---------

Signed-off-by: Olivier Wilkinson (reivilibre) <oliverw@matrix.org>
2023-09-06 14:35:02 +01:00
Andrew Morgan ffe4ea1302 Update rust in flake.nix: 1.70.0 -> 1.71.1 to address CVE-2023-38497 (#16260) 2023-09-06 14:34:01 +01:00
reivilibre e937e2111a Add the ability to use G (GiB) and T (TiB) suffixes in configuration options that refer to numbers of bytes. (#16219)
* Add more suffixes to `parse_size`

* Newsfile

Signed-off-by: Olivier Wilkinson (reivilibre) <oliverw@matrix.org>

---------

Signed-off-by: Olivier Wilkinson (reivilibre) <oliverw@matrix.org>
2023-09-06 14:01:10 +01:00
reivilibre 698f6fa250 Allow modules to delete rooms. (#15997)
* Allow user_id to be optional for room deletion

* Add module API method to delete a room

* Newsfile

Signed-off-by: Olivier Wilkinson (reivilibre) <oliverw@matrix.org>

* Don't worry about the case block=True && requester_user_id is None

---------

Signed-off-by: Olivier Wilkinson (reivilibre) <oliverw@matrix.org>
2023-09-06 11:50:07 +01:00
Mathieu Velten 4f1840a88a Delete device messages asynchronously and in staged batches (#16240) 2023-09-06 09:30:53 +02:00
Will Hunt 1e571cd664 Fix appservices being unable to handle to_device messages for multiple users (#16251) 2023-09-05 15:46:57 -04:00
Travis Ralston b1d71c687a Add MSC4040 matrix-fed service lookups (#16137) 2023-09-05 15:45:39 -04:00
Erik Johnston c9cec2daed Fix bug where we kept re-requesting a remote server's key repeatedly. (#16257)
* Correctly handle multiple rows per server/key

* Newsfile
2023-09-05 20:27:41 +01:00
David Robertson 02bc5906ec Merge tag 'v1.92.0rc1' into develop
- Add configuration setting for CAS protocol version. Contributed by Aurélien Grimpard. ([\#15816](https://github.com/matrix-org/synapse/issues/15816))
- Suppress notifications from message edits per [MSC3958](https://github.com/matrix-org/matrix-spec-proposals/pull/3958). ([\#16113](https://github.com/matrix-org/synapse/issues/16113))
- Return a `Retry-After` with `M_LIMIT_EXCEEDED` error responses. ([\#16136](https://github.com/matrix-org/synapse/issues/16136))
- Add `last_seen_ts` to the [admin users API](https://matrix-org.github.io/synapse/latest/admin_api/user_admin_api.html). ([\#16218](https://github.com/matrix-org/synapse/issues/16218))
- Improve resource usage when sending data to a large number of remote hosts that are marked as "down". ([\#16223](https://github.com/matrix-org/synapse/issues/16223))

- Fix IPv6-related bugs on SMTP settings, adding groundwork to fix similar issues. Contributed by @evilham and @telmich (ungleich.ch). ([\#16155](https://github.com/matrix-org/synapse/issues/16155))
- Fix a spec compliance issue where requests to the `/publicRooms` federation API would specify `include_all_networks` as a string. ([\#16185](https://github.com/matrix-org/synapse/issues/16185))
- Fix inaccurate error message while attempting to ban or unban a user with the same or higher PL by spliting the conditional statements. Contributed by @leviosacz. ([\#16205](https://github.com/matrix-org/synapse/issues/16205))
- Fix a rare bug that broke looping calls, which could lead to e.g. linearly increasing memory usage. Introduced in v1.90.0. ([\#16210](https://github.com/matrix-org/synapse/issues/16210))
- Fix a long-standing bug where uploading images would fail if we could not generate thumbnails for them. ([\#16211](https://github.com/matrix-org/synapse/issues/16211))
- Fix a long-standing bug where we did not correctly back off from servers that had "gone" if they returned 4xx series error codes. ([\#16221](https://github.com/matrix-org/synapse/issues/16221))

- Update links to the [matrix.org blog](https://matrix.org/blog/). ([\#16008](https://github.com/matrix-org/synapse/issues/16008))
- Document which [admin APIs](https://matrix-org.github.io/synapse/latest/usage/administration/admin_api/index.html) are disabled when experimental [MSC3861](https://github.com/matrix-org/matrix-spec-proposals/pull/3861) support is enabled. ([\#16168](https://github.com/matrix-org/synapse/issues/16168))
- Document [`exclude_rooms_from_sync`](https://matrix-org.github.io/synapse/v1.92/usage/configuration/config_documentation.html#exclude_rooms_from_sync) configuration option. ([\#16178](https://github.com/matrix-org/synapse/issues/16178))

- Prepare unit tests for Python 3.12. ([\#16099](https://github.com/matrix-org/synapse/issues/16099))
- Fix nightly CI jobs. ([\#16121](https://github.com/matrix-org/synapse/issues/16121), [\#16213](https://github.com/matrix-org/synapse/issues/16213))
- Describe which rate limiter was hit in logs. ([\#16135](https://github.com/matrix-org/synapse/issues/16135))
- Simplify presence code when using workers. ([\#16170](https://github.com/matrix-org/synapse/issues/16170))
- Track per-device information in the presence code. ([\#16171](https://github.com/matrix-org/synapse/issues/16171), [\#16172](https://github.com/matrix-org/synapse/issues/16172))
- Stop using the `event_txn_id` table. ([\#16175](https://github.com/matrix-org/synapse/issues/16175))
- Use `AsyncMock` instead of custom code. ([\#16179](https://github.com/matrix-org/synapse/issues/16179), [\#16180](https://github.com/matrix-org/synapse/issues/16180))
- Improve error reporting of invalid data passed to `/_matrix/key/v2/query`. ([\#16183](https://github.com/matrix-org/synapse/issues/16183))
- Task scheduler: add replication notify for new task to launch ASAP. ([\#16184](https://github.com/matrix-org/synapse/issues/16184))
- Improve type hints. ([\#16186](https://github.com/matrix-org/synapse/issues/16186), [\#16188](https://github.com/matrix-org/synapse/issues/16188), [\#16201](https://github.com/matrix-org/synapse/issues/16201))
- Bump black version to 23.7.0. ([\#16187](https://github.com/matrix-org/synapse/issues/16187))
- Log the details of background update failures. ([\#16212](https://github.com/matrix-org/synapse/issues/16212))
- Cache device resync requests over replication. ([\#16241](https://github.com/matrix-org/synapse/issues/16241))

* Bump anyhow from 1.0.72 to 1.0.75. ([\#16141](https://github.com/matrix-org/synapse/issues/16141))
* Bump furo from 2023.7.26 to 2023.8.19. ([\#16238](https://github.com/matrix-org/synapse/issues/16238))
* Bump phonenumbers from 8.13.18 to 8.13.19. ([\#16237](https://github.com/matrix-org/synapse/issues/16237))
* Bump psycopg2 from 2.9.6 to 2.9.7. ([\#16196](https://github.com/matrix-org/synapse/issues/16196))
* Bump regex from 1.9.3 to 1.9.4. ([\#16195](https://github.com/matrix-org/synapse/issues/16195))
* Bump ruff from 0.0.277 to 0.0.286. ([\#16198](https://github.com/matrix-org/synapse/issues/16198))
* Bump sentry-sdk from 1.29.2 to 1.30.0. ([\#16236](https://github.com/matrix-org/synapse/issues/16236))
* Bump serde from 1.0.184 to 1.0.188. ([\#16194](https://github.com/matrix-org/synapse/issues/16194))
* Bump serde_json from 1.0.104 to 1.0.105. ([\#16140](https://github.com/matrix-org/synapse/issues/16140))
* Bump types-psycopg2 from 2.9.21.10 to 2.9.21.11. ([\#16200](https://github.com/matrix-org/synapse/issues/16200))
* Bump types-pyyaml from 6.0.12.10 to 6.0.12.11. ([\#16199](https://github.com/matrix-org/synapse/issues/16199))
2023-09-05 16:56:43 +01:00
Patrick Cloke 8b5013dcbc Time out busy presence status & test multi-device busy (#16174)
Add a (long) timeout to when a "busy" device is considered not online.
This does *not* match MSC3026, but is a reasonable thing for an
implementation to do.

Expands tests for the (unstable) busy presence with multiple devices.
2023-09-05 10:39:38 -04:00
Patrick Cloke ea75346f6a Track presence state per-device and combine to a user state. (#16066)
Tracks presence on an individual per-device basis and combine
the per-device state into a per-user state. This should help in
situations where a user has multiple devices with conflicting status
(e.g. one is syncing with unavailable and one is syncing with online).

The tie-breaking is done by priority:

    BUSY > ONLINE > UNAVAILABLE > OFFLINE
2023-09-05 09:58:51 -04:00
dependabot[bot] 36ae8611fe Bump regex from 1.9.4 to 1.9.5 (#16233)
Bumps [regex](https://github.com/rust-lang/regex) from 1.9.4 to 1.9.5.
- [Release notes](https://github.com/rust-lang/regex/releases)
- [Changelog](https://github.com/rust-lang/regex/blob/master/CHANGELOG.md)
- [Commits](https://github.com/rust-lang/regex/compare/1.9.4...1.9.5)

---
updated-dependencies:
- dependency-name: regex
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-09-05 13:14:00 +00:00
Erik Johnston dfcfa9f0ed Bump minimum supported Rust version to 1.61.0 (#16248) 2023-09-05 13:12:50 +01:00
dependabot[bot] 757010905e Bump twisted from 22.10.0 to 23.8.0 (#16235)
* Bump twisted from 22.10.0 to 23.8.0

Bumps [twisted](https://github.com/twisted/twisted) from 22.10.0 to 23.8.0.
- [Release notes](https://github.com/twisted/twisted/releases)
- [Changelog](https://github.com/twisted/twisted/blob/trunk/NEWS.rst)
- [Commits](https://github.com/twisted/twisted/compare/twisted-22.10.0...twisted-23.8.0)

---
updated-dependencies:
- dependency-name: twisted
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>

* Fix types

* Fix lint

* Newsfile

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Erik Johnston <erik@matrix.org>
2023-09-05 11:14:14 +00:00
Mathieu Velten 830a29482a Merge branch 'develop' into mv/add-mxid-validation-log 2023-08-07 14:10:58 +02:00
Mathieu Velten 44f7df09bb Unused import 2023-08-04 15:35:15 +02:00
Mathieu Velten 25597e97f3 Change changelog 2023-08-04 15:32:24 +02:00
Mathieu Velten 7ad75e6d20 Add check to EDUs and move PDUs check in the event storage controller 2023-08-04 15:30:18 +02:00
Mathieu Velten 7c224e149b fix comments 2023-08-04 15:09:43 +02:00
Mathieu Velten 1fe7be0be1 Merge branch 'develop' into mv/add-mxid-validation-log 2023-08-04 12:46:08 +02:00
Mathieu Velten dc9957dbba Remove : from the historical localpart authorized chars 2023-08-04 12:21:27 +02:00
Mathieu Velten 09a7adf85d Add changelog 2023-08-03 17:20:50 +02:00
Mathieu Velten aa9e47e144 Add logging of invalid mxids when persisting events 2023-08-03 17:18:48 +02:00
73 changed files with 1853 additions and 274 deletions
+9 -9
View File
@@ -35,7 +35,7 @@ jobs:
steps:
- uses: actions/checkout@v3
- name: Install Rust
uses: dtolnay/rust-toolchain@1.60.0
uses: dtolnay/rust-toolchain@1.61.0
- uses: Swatinem/rust-cache@v2
- uses: matrix-org/setup-python-poetry@v1
with:
@@ -93,7 +93,7 @@ jobs:
uses: actions/checkout@v3
- name: Install Rust
uses: dtolnay/rust-toolchain@1.60.0
uses: dtolnay/rust-toolchain@1.61.0
- uses: Swatinem/rust-cache@v2
- name: Setup Poetry
@@ -150,7 +150,7 @@ jobs:
with:
ref: ${{ github.event.pull_request.head.sha }}
- name: Install Rust
uses: dtolnay/rust-toolchain@1.60.0
uses: dtolnay/rust-toolchain@1.61.0
- uses: Swatinem/rust-cache@v2
- uses: matrix-org/setup-python-poetry@v1
with:
@@ -167,7 +167,7 @@ jobs:
- uses: actions/checkout@v3
- name: Install Rust
uses: dtolnay/rust-toolchain@1.60.0
uses: dtolnay/rust-toolchain@1.61.0
with:
components: clippy
- uses: Swatinem/rust-cache@v2
@@ -268,7 +268,7 @@ jobs:
postgres:${{ matrix.job.postgres-version }}
- name: Install Rust
uses: dtolnay/rust-toolchain@1.60.0
uses: dtolnay/rust-toolchain@1.61.0
- uses: Swatinem/rust-cache@v2
- uses: matrix-org/setup-python-poetry@v1
@@ -308,7 +308,7 @@ jobs:
- uses: actions/checkout@v3
- name: Install Rust
uses: dtolnay/rust-toolchain@1.60.0
uses: dtolnay/rust-toolchain@1.61.0
- uses: Swatinem/rust-cache@v2
# There aren't wheels for some of the older deps, so we need to install
@@ -416,7 +416,7 @@ jobs:
run: cat sytest-blacklist .ci/worker-blacklist > synapse-blacklist-with-workers
- name: Install Rust
uses: dtolnay/rust-toolchain@1.60.0
uses: dtolnay/rust-toolchain@1.61.0
- uses: Swatinem/rust-cache@v2
- name: Run SyTest
@@ -556,7 +556,7 @@ jobs:
path: synapse
- name: Install Rust
uses: dtolnay/rust-toolchain@1.60.0
uses: dtolnay/rust-toolchain@1.61.0
- uses: Swatinem/rust-cache@v2
- uses: actions/setup-go@v4
@@ -584,7 +584,7 @@ jobs:
- uses: actions/checkout@v3
- name: Install Rust
uses: dtolnay/rust-toolchain@1.60.0
uses: dtolnay/rust-toolchain@1.61.0
- uses: Swatinem/rust-cache@v2
- run: cargo test
Generated
+6 -6
View File
@@ -138,9 +138,9 @@ checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
[[package]]
name = "memchr"
version = "2.5.0"
version = "2.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
checksum = "8f232d6ef707e1956a43342693d2a31e72989554d58299d7a88738cc95b0d35c"
[[package]]
name = "memoffset"
@@ -291,9 +291,9 @@ dependencies = [
[[package]]
name = "regex"
version = "1.9.4"
version = "1.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12de2eff854e5fa4b1295edd650e227e9d8fb0c9e90b12e7f36d6a6811791a29"
checksum = "697061221ea1b4a94a624f67d0ae2bfe4e22b8a17b6a192afb11046542cc8c47"
dependencies = [
"aho-corasick",
"memchr",
@@ -303,9 +303,9 @@ dependencies = [
[[package]]
name = "regex-automata"
version = "0.3.7"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49530408a136e16e5b486e883fbb6ba058e8e4e8ae6621a77b048b314336e629"
checksum = "c2f401f4955220693b56f8ec66ee9c78abffd8d1c4f23dc41a23839eb88f0795"
dependencies = [
"aho-corasick",
"memchr",
+1
View File
@@ -0,0 +1 @@
Allow modules to delete rooms.
+1
View File
@@ -0,0 +1 @@
Add logging of sender invalid mxids when persisting events and receiving EDUs.
+1
View File
@@ -0,0 +1 @@
Fix a long-standing bug where multi-device accounts could cause high load due to presence.
+1
View File
@@ -0,0 +1 @@
Add GCC and GNU Make to the Nix flake development environment so that `ruff` can be compiled.
+1
View File
@@ -0,0 +1 @@
Support resolving homeservers using `matrix-fed` DNS SRV records from [MSC4040](https://github.com/matrix-org/matrix-spec-proposals/pull/4040).
+1
View File
@@ -0,0 +1 @@
Fix a long-standing bug where multi-device accounts could cause high load due to presence.
+1
View File
@@ -0,0 +1 @@
Fix a long-standing bug where multi-device accounts could cause high load due to presence.
+1
View File
@@ -0,0 +1 @@
Fix a long-standing bug where multi-device accounts could cause high load due to presence.
+1
View File
@@ -0,0 +1 @@
Fix a long-standing bug where multi-device accounts could cause high load due to presence.
+1
View File
@@ -0,0 +1 @@
Add the ability to use `G` (GiB) and `T` (TiB) suffixes in configuration options that refer to numbers of bytes.
+1
View File
@@ -0,0 +1 @@
Add span information to requests sent to appservices. Contributed by MTRNord.
+1
View File
@@ -0,0 +1 @@
Fix type checking when using the new version of Twisted.
+1
View File
@@ -0,0 +1 @@
Delete device messages asynchronously and in staged batches using the task scheduler.
+1
View File
@@ -0,0 +1 @@
Bump minimum supported Rust version to 1.61.0.
+1
View File
@@ -0,0 +1 @@
Fix a long-standing bug where appservices using MSC2409 to receive to_device messages, would only get messages for one user.
+1
View File
@@ -0,0 +1 @@
Fix bug when using workers where Synapse could end up re-requesting the same remote device repeatedly.
+1
View File
@@ -0,0 +1 @@
Fix long-standing bug where we kept re-requesting a remote server's key repeatedly, potentially causing delays in receiving events over federation.
+1
View File
@@ -0,0 +1 @@
Update rust to version 1.71.1 in the nix development environment.
+1
View File
@@ -0,0 +1 @@
Add the ability to enable/disable registrations when in the CAS flow. Contributed by Aurélien Grimpard.
+1
View File
@@ -0,0 +1 @@
Add GCC and GNU Make to the Nix flake development environment so that `ruff` can be compiled.
+1
View File
@@ -0,0 +1 @@
Allow `/notifications` endpoint to be routed to workers.
+1
View File
@@ -183,6 +183,7 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"^/_matrix/client/(r0|v3|unstable)/password_policy$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/directory/room/.*$",
"^/_matrix/client/(r0|v3|unstable)/capabilities$",
"^/_matrix/client/(r0|v3|unstable)/notifications$",
],
"shared_extra_conf": {},
"worker_extra_conf": "",
+8
View File
@@ -88,6 +88,14 @@ process, for example:
dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
```
# Upgrading to v1.93.0
## Minimum supported Rust version
The minimum supported Rust version has been increased from v1.60.0 to v1.61.0.
Users building from source will need to ensure their `rustc` version is up to
date.
# Upgrading to v1.90.0
## App service query parameter authorization is now a configuration option
@@ -25,8 +25,10 @@ messages from the database after 5 minutes, rather than 5 months.
In addition, configuration options referring to size use the following suffixes:
* `M` = MiB, or 1,048,576 bytes
* `K` = KiB, or 1024 bytes
* `M` = MiB, or 1,048,576 bytes
* `G` = GiB, or 1,073,741,824 bytes
* `T` = TiB, or 1,099,511,627,776 bytes
For example, setting `max_avatar_size: 10M` means that Synapse will not accept files larger than 10,485,760 bytes
for a user avatar.
@@ -3428,6 +3430,12 @@ Has the following sub-options:
and the values must match the given value. Alternately if the given value
is `None` then any value is allowed (the attribute just must exist).
All of the listed attributes must match for the login to be permitted.
* `enable_registration`: set to 'false' to disable automatic registration of new
users. This allows the CAS SSO flow to be limited to sign in only, rather than
automatically registering users that have a valid SSO login but do not have
a pre-registered account. Defaults to true.
*Added in Synapse 1.93.0.*
Example configuration:
```yaml
@@ -3439,6 +3447,7 @@ cas_config:
required_attributes:
userGroup: "staff"
department: None
enable_registration: true
```
---
### `sso`
+1
View File
@@ -246,6 +246,7 @@ information.
^/_matrix/client/(r0|v3|unstable)/user/.*/filter(/|$)
^/_matrix/client/(api/v1|r0|v3|unstable)/directory/room/.*$
^/_matrix/client/(r0|v3|unstable)/capabilities$
^/_matrix/client/(r0|v3|unstable)/notifications$
# Encryption requests
^/_matrix/client/(r0|v3|unstable)/keys/query$
Generated
+3 -3
View File
@@ -258,11 +258,11 @@
"nixpkgs": "nixpkgs_3"
},
"locked": {
"lastModified": 1690510705,
"narHash": "sha256-6mjs3Gl9/xrseFh9iNcNq1u5yJ/MIoAmjoaG7SXZDIE=",
"lastModified": 1693966243,
"narHash": "sha256-a2CA1aMIPE67JWSVIGoGtD3EGlFdK9+OlJQs0FOWCKY=",
"owner": "oxalica",
"repo": "rust-overlay",
"rev": "851ae4c128905a62834d53ce7704ebc1ba481bea",
"rev": "a8b4bb4cbb744baaabc3e69099f352f99164e2c1",
"type": "github"
},
"original": {
+19 -1
View File
@@ -82,7 +82,7 @@
#
# NOTE: We currently need to set the Rust version unnecessarily high
# in order to work around https://github.com/matrix-org/synapse/issues/15939
(rust-bin.stable."1.70.0".default.override {
(rust-bin.stable."1.71.1".default.override {
# Additionally install the "rust-src" extension to allow diving into the
# Rust source code in an IDE (rust-analyzer will also make use of it).
extensions = [ "rust-src" ];
@@ -90,6 +90,11 @@
# The rust-analyzer language server implementation.
rust-analyzer
# GCC includes a linker; needed for building `ruff`
gcc
# Needed for building `ruff`
gnumake
# Native dependencies for running Synapse.
icu
libffi
@@ -236,6 +241,19 @@
URI
YAMLLibYAML
]}";
# Clear the LD_LIBRARY_PATH environment variable on shell init.
#
# By default, devenv will set LD_LIBRARY_PATH to point to .devenv/profile/lib. This causes
# issues when we include `gcc` as a dependency to build C libraries, as the version of glibc
# that the development environment's cc compiler uses may differ from that of the system.
#
# When LD_LIBRARY_PATH is set, system tools will attempt to use the development environment's
# libraries. Which, when built against a different glibc version lead, to "version 'GLIBC_X.YY'
# not found" errors.
enterShell = ''
unset LD_LIBRARY_PATH
'';
}
];
};
Generated
+21 -22
View File
@@ -586,13 +586,13 @@ smmap = ">=3.0.1,<6"
[[package]]
name = "gitpython"
version = "3.1.32"
version = "3.1.34"
description = "GitPython is a Python library used to interact with Git repositories"
optional = false
python-versions = ">=3.7"
files = [
{file = "GitPython-3.1.32-py3-none-any.whl", hash = "sha256:e3d59b1c2c6ebb9dfa7a184daf3b6dd4914237e7488a1730a6d8f6f5d0b4187f"},
{file = "GitPython-3.1.32.tar.gz", hash = "sha256:8d9b8cb1e80b9735e8717c9362079d3ce4c6e5ddeebedd0361b228c3a67a62f6"},
{file = "GitPython-3.1.34-py3-none-any.whl", hash = "sha256:5d3802b98a3bae1c2b8ae0e1ff2e4aa16bcdf02c145da34d092324f599f01395"},
{file = "GitPython-3.1.34.tar.gz", hash = "sha256:85f7d365d1f6bf677ae51039c1ef67ca59091c7ebd5a3509aa399d4eda02d6dd"},
]
[package.dependencies]
@@ -2866,44 +2866,43 @@ urllib3 = ">=1.26.0"
[[package]]
name = "twisted"
version = "22.10.0"
version = "23.8.0"
description = "An asynchronous networking framework written in Python"
optional = false
python-versions = ">=3.7.1"
files = [
{file = "Twisted-22.10.0-py3-none-any.whl", hash = "sha256:86c55f712cc5ab6f6d64e02503352464f0400f66d4f079096d744080afcccbd0"},
{file = "Twisted-22.10.0.tar.gz", hash = "sha256:32acbd40a94f5f46e7b42c109bfae2b302250945561783a8b7a059048f2d4d31"},
{file = "twisted-23.8.0-py3-none-any.whl", hash = "sha256:b8bdba145de120ffb36c20e6e071cce984e89fba798611ed0704216fb7f884cd"},
{file = "twisted-23.8.0.tar.gz", hash = "sha256:3c73360add17336a622c0d811c2a2ce29866b6e59b1125fd6509b17252098a24"},
]
[package.dependencies]
attrs = ">=19.2.0"
Automat = ">=0.8.0"
attrs = ">=21.3.0"
automat = ">=0.8.0"
constantly = ">=15.1"
hyperlink = ">=17.1.1"
idna = {version = ">=2.4", optional = true, markers = "extra == \"tls\""}
incremental = ">=21.3.0"
incremental = ">=22.10.0"
pyopenssl = {version = ">=21.0.0", optional = true, markers = "extra == \"tls\""}
service-identity = {version = ">=18.1.0", optional = true, markers = "extra == \"tls\""}
twisted-iocpsupport = {version = ">=1.0.2,<2", markers = "platform_system == \"Windows\""}
typing-extensions = ">=3.6.5"
"zope.interface" = ">=4.4.2"
typing-extensions = ">=3.10.0"
zope-interface = ">=5"
[package.extras]
all-non-platform = ["PyHamcrest (>=1.9.0)", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "contextvars (>=2.4,<3)", "cryptography (>=2.6)", "cython-test-exception-raiser (>=1.0.2,<2)", "h2 (>=3.0,<5.0)", "hypothesis (>=6.0,<7.0)", "idna (>=2.4)", "priority (>=1.1.0,<2.0)", "pyasn1", "pyopenssl (>=21.0.0)", "pyserial (>=3.0)", "pywin32 (!=226)", "service-identity (>=18.1.0)"]
conch = ["appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "cryptography (>=2.6)", "pyasn1"]
conch-nacl = ["PyNaCl", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "cryptography (>=2.6)", "pyasn1"]
all-non-platform = ["twisted[conch,contextvars,http2,serial,test,tls]", "twisted[conch,contextvars,http2,serial,test,tls]"]
conch = ["appdirs (>=1.4.0)", "bcrypt (>=3.1.3)", "cryptography (>=3.3)"]
contextvars = ["contextvars (>=2.4,<3)"]
dev = ["coverage (>=6b1,<7)", "pydoctor (>=22.9.0,<22.10.0)", "pyflakes (>=2.2,<3.0)", "python-subunit (>=1.4,<2.0)", "readthedocs-sphinx-ext (>=2.1,<3.0)", "sphinx (>=5.0,<6)", "sphinx-rtd-theme (>=1.0,<2.0)", "towncrier (>=22.8,<23.0)", "twistedchecker (>=0.7,<1.0)"]
dev-release = ["pydoctor (>=22.9.0,<22.10.0)", "readthedocs-sphinx-ext (>=2.1,<3.0)", "sphinx (>=5.0,<6)", "sphinx-rtd-theme (>=1.0,<2.0)", "towncrier (>=22.8,<23.0)"]
gtk-platform = ["PyHamcrest (>=1.9.0)", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "contextvars (>=2.4,<3)", "cryptography (>=2.6)", "cython-test-exception-raiser (>=1.0.2,<2)", "h2 (>=3.0,<5.0)", "hypothesis (>=6.0,<7.0)", "idna (>=2.4)", "priority (>=1.1.0,<2.0)", "pyasn1", "pygobject", "pyopenssl (>=21.0.0)", "pyserial (>=3.0)", "pywin32 (!=226)", "service-identity (>=18.1.0)"]
dev = ["coverage (>=6b1,<7)", "pyflakes (>=2.2,<3.0)", "python-subunit (>=1.4,<2.0)", "twisted[dev-release]", "twistedchecker (>=0.7,<1.0)"]
dev-release = ["pydoctor (>=23.4.0,<23.5.0)", "pydoctor (>=23.4.0,<23.5.0)", "readthedocs-sphinx-ext (>=2.2,<3.0)", "readthedocs-sphinx-ext (>=2.2,<3.0)", "sphinx (>=5,<7)", "sphinx (>=5,<7)", "sphinx-rtd-theme (>=1.2,<2.0)", "sphinx-rtd-theme (>=1.2,<2.0)", "towncrier (>=22.12,<23.0)", "towncrier (>=22.12,<23.0)", "urllib3 (<2)", "urllib3 (<2)"]
gtk-platform = ["pygobject", "pygobject", "twisted[all-non-platform]", "twisted[all-non-platform]"]
http2 = ["h2 (>=3.0,<5.0)", "priority (>=1.1.0,<2.0)"]
macos-platform = ["PyHamcrest (>=1.9.0)", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "contextvars (>=2.4,<3)", "cryptography (>=2.6)", "cython-test-exception-raiser (>=1.0.2,<2)", "h2 (>=3.0,<5.0)", "hypothesis (>=6.0,<7.0)", "idna (>=2.4)", "priority (>=1.1.0,<2.0)", "pyasn1", "pyobjc-core", "pyobjc-framework-CFNetwork", "pyobjc-framework-Cocoa", "pyopenssl (>=21.0.0)", "pyserial (>=3.0)", "pywin32 (!=226)", "service-identity (>=18.1.0)"]
mypy = ["PyHamcrest (>=1.9.0)", "PyNaCl", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "contextvars (>=2.4,<3)", "coverage (>=6b1,<7)", "cryptography (>=2.6)", "cython-test-exception-raiser (>=1.0.2,<2)", "h2 (>=3.0,<5.0)", "hypothesis (>=6.0,<7.0)", "idna (>=2.4)", "mypy (==0.930)", "mypy-zope (==0.3.4)", "priority (>=1.1.0,<2.0)", "pyasn1", "pydoctor (>=22.9.0,<22.10.0)", "pyflakes (>=2.2,<3.0)", "pyopenssl (>=21.0.0)", "pyserial (>=3.0)", "python-subunit (>=1.4,<2.0)", "pywin32 (!=226)", "readthedocs-sphinx-ext (>=2.1,<3.0)", "service-identity (>=18.1.0)", "sphinx (>=5.0,<6)", "sphinx-rtd-theme (>=1.0,<2.0)", "towncrier (>=22.8,<23.0)", "twistedchecker (>=0.7,<1.0)", "types-pyOpenSSL", "types-setuptools"]
osx-platform = ["PyHamcrest (>=1.9.0)", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "contextvars (>=2.4,<3)", "cryptography (>=2.6)", "cython-test-exception-raiser (>=1.0.2,<2)", "h2 (>=3.0,<5.0)", "hypothesis (>=6.0,<7.0)", "idna (>=2.4)", "priority (>=1.1.0,<2.0)", "pyasn1", "pyobjc-core", "pyobjc-framework-CFNetwork", "pyobjc-framework-Cocoa", "pyopenssl (>=21.0.0)", "pyserial (>=3.0)", "pywin32 (!=226)", "service-identity (>=18.1.0)"]
macos-platform = ["pyobjc-core", "pyobjc-core", "pyobjc-framework-cfnetwork", "pyobjc-framework-cfnetwork", "pyobjc-framework-cocoa", "pyobjc-framework-cocoa", "twisted[all-non-platform]", "twisted[all-non-platform]"]
mypy = ["mypy (==0.981)", "mypy-extensions (==0.4.3)", "mypy-zope (==0.3.11)", "twisted[all-non-platform,dev]", "types-pyopenssl", "types-setuptools"]
osx-platform = ["twisted[macos-platform]", "twisted[macos-platform]"]
serial = ["pyserial (>=3.0)", "pywin32 (!=226)"]
test = ["PyHamcrest (>=1.9.0)", "cython-test-exception-raiser (>=1.0.2,<2)", "hypothesis (>=6.0,<7.0)"]
test = ["cython-test-exception-raiser (>=1.0.2,<2)", "hypothesis (>=6.56)", "pyhamcrest (>=2)"]
tls = ["idna (>=2.4)", "pyopenssl (>=21.0.0)", "service-identity (>=18.1.0)"]
windows-platform = ["PyHamcrest (>=1.9.0)", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "contextvars (>=2.4,<3)", "cryptography (>=2.6)", "cython-test-exception-raiser (>=1.0.2,<2)", "h2 (>=3.0,<5.0)", "hypothesis (>=6.0,<7.0)", "idna (>=2.4)", "priority (>=1.1.0,<2.0)", "pyasn1", "pyopenssl (>=21.0.0)", "pyserial (>=3.0)", "pywin32 (!=226)", "pywin32 (!=226)", "service-identity (>=18.1.0)"]
windows-platform = ["pywin32 (!=226)", "pywin32 (!=226)", "twisted[all-non-platform]", "twisted[all-non-platform]"]
[[package]]
name = "twisted-iocpsupport"
+7 -2
View File
@@ -7,7 +7,7 @@ name = "synapse"
version = "0.1.0"
edition = "2021"
rust-version = "1.60.0"
rust-version = "1.61.0"
[lib]
name = "synapse"
@@ -23,7 +23,12 @@ name = "synapse.synapse_rust"
anyhow = "1.0.63"
lazy_static = "1.4.0"
log = "0.4.17"
pyo3 = { version = "0.17.1", features = ["macros", "anyhow", "abi3", "abi3-py37"] }
pyo3 = { version = "0.17.1", features = [
"macros",
"anyhow",
"abi3",
"abi3-py37",
] }
pyo3-log = "0.8.1"
pythonize = "0.17.0"
regex = "1.6.0"
+12
View File
@@ -329,6 +329,17 @@ class MatrixConnectionAdapter(HTTPAdapter):
raise ValueError("Invalid host:port '%s'" % (server_name,))
return out[0], port, out[0]
# Look up SRV for Matrix 1.8 `matrix-fed` service first
try:
srv = srvlookup.lookup("matrix-fed", "tcp", server_name)[0]
print(
f"SRV lookup on _matrix-fed._tcp.{server_name} gave {srv}",
file=sys.stderr,
)
return srv.host, srv.port, server_name
except Exception:
pass
# Fall back to deprecated `matrix` service
try:
srv = srvlookup.lookup("matrix", "tcp", server_name)[0]
print(
@@ -337,6 +348,7 @@ class MatrixConnectionAdapter(HTTPAdapter):
)
return srv.host, srv.port, server_name
except Exception:
# Fall even further back to just port 8448
return server_name, 8448, server_name
@staticmethod
+39 -4
View File
@@ -20,18 +20,53 @@ from synapse.api.constants import PresenceState
from synapse.types import JsonDict
@attr.s(slots=True, auto_attribs=True)
class UserDevicePresenceState:
"""
Represents the current presence state of a user's device.
user_id: The user ID.
device_id: The user's device ID.
state: The presence state, see PresenceState.
last_active_ts: Time in msec that the device last interacted with server.
last_sync_ts: Time in msec that the device last *completed* a sync
(or event stream).
"""
user_id: str
device_id: Optional[str]
state: str
last_active_ts: int
last_sync_ts: int
@classmethod
def default(
cls, user_id: str, device_id: Optional[str]
) -> "UserDevicePresenceState":
"""Returns a default presence state."""
return cls(
user_id=user_id,
device_id=device_id,
state=PresenceState.OFFLINE,
last_active_ts=0,
last_sync_ts=0,
)
@attr.s(slots=True, frozen=True, auto_attribs=True)
class UserPresenceState:
"""Represents the current presence state of the user.
user_id
last_active: Time in msec that the user last interacted with server.
last_federation_update: Time in msec since either a) we sent a presence
user_id: The user ID.
state: The presence state, see PresenceState.
last_active_ts: Time in msec that the user last interacted with server.
last_federation_update_ts: Time in msec since either a) we sent a presence
update to other servers or b) we received a presence update, depending
on if is a local user or not.
last_user_sync: Time in msec that the user last *completed* a sync
last_user_sync_ts: Time in msec that the user last *completed* a sync
(or event stream).
status_msg: User set status message.
currently_active: True if the user is currently syncing.
"""
user_id: str
+24 -8
View File
@@ -40,6 +40,7 @@ from synapse.appservice import (
from synapse.events import EventBase
from synapse.events.utils import SerializeEventConfig, serialize_event
from synapse.http.client import SimpleHttpClient, is_unknown_endpoint
from synapse.logging import opentracing
from synapse.types import DeviceListUpdates, JsonDict, ThirdPartyInstanceID
from synapse.util.caches.response_cache import ResponseCache
@@ -125,6 +126,17 @@ class ApplicationServiceApi(SimpleHttpClient):
hs.get_clock(), "as_protocol_meta", timeout_ms=HOUR_IN_MS
)
def _get_headers(self, service: "ApplicationService") -> Dict[bytes, List[bytes]]:
"""This makes sure we have always the auth header and opentracing headers set."""
# This is also ensured before in the functions. However this is needed to please
# the typechecks.
assert service.hs_token is not None
headers = {b"Authorization": [b"Bearer " + service.hs_token.encode("ascii")]}
opentracing.inject_header_dict(headers, check_destination=False)
return headers
async def query_user(self, service: "ApplicationService", user_id: str) -> bool:
if service.url is None:
return False
@@ -136,10 +148,11 @@ class ApplicationServiceApi(SimpleHttpClient):
args = None
if self.config.use_appservice_legacy_authorization:
args = {"access_token": service.hs_token}
response = await self.get_json(
f"{service.url}{APP_SERVICE_PREFIX}/users/{urllib.parse.quote(user_id)}",
args,
headers={"Authorization": [f"Bearer {service.hs_token}"]},
headers=self._get_headers(service),
)
if response is not None: # just an empty json object
return True
@@ -162,10 +175,11 @@ class ApplicationServiceApi(SimpleHttpClient):
args = None
if self.config.use_appservice_legacy_authorization:
args = {"access_token": service.hs_token}
response = await self.get_json(
f"{service.url}{APP_SERVICE_PREFIX}/rooms/{urllib.parse.quote(alias)}",
args,
headers={"Authorization": [f"Bearer {service.hs_token}"]},
headers=self._get_headers(service),
)
if response is not None: # just an empty json object
return True
@@ -203,10 +217,11 @@ class ApplicationServiceApi(SimpleHttpClient):
**fields,
b"access_token": service.hs_token,
}
response = await self.get_json(
f"{service.url}{APP_SERVICE_PREFIX}/thirdparty/{kind}/{urllib.parse.quote(protocol)}",
args=args,
headers={"Authorization": [f"Bearer {service.hs_token}"]},
headers=self._get_headers(service),
)
if not isinstance(response, list):
logger.warning(
@@ -243,10 +258,11 @@ class ApplicationServiceApi(SimpleHttpClient):
args = None
if self.config.use_appservice_legacy_authorization:
args = {"access_token": service.hs_token}
info = await self.get_json(
f"{service.url}{APP_SERVICE_PREFIX}/thirdparty/protocol/{urllib.parse.quote(protocol)}",
args,
headers={"Authorization": [f"Bearer {service.hs_token}"]},
headers=self._get_headers(service),
)
if not _is_valid_3pe_metadata(info):
@@ -283,7 +299,7 @@ class ApplicationServiceApi(SimpleHttpClient):
await self.post_json_get_json(
uri=f"{service.url}{APP_SERVICE_PREFIX}/ping",
post_json={"transaction_id": txn_id},
headers={"Authorization": [f"Bearer {service.hs_token}"]},
headers=self._get_headers(service),
)
async def push_bulk(
@@ -364,7 +380,7 @@ class ApplicationServiceApi(SimpleHttpClient):
f"{service.url}{APP_SERVICE_PREFIX}/transactions/{urllib.parse.quote(str(txn_id))}",
json_body=body,
args=args,
headers={"Authorization": [f"Bearer {service.hs_token}"]},
headers=self._get_headers(service),
)
if logger.isEnabledFor(logging.DEBUG):
logger.debug(
@@ -437,7 +453,7 @@ class ApplicationServiceApi(SimpleHttpClient):
response = await self.post_json_get_json(
uri,
body,
headers={"Authorization": [f"Bearer {service.hs_token}"]},
headers=self._get_headers(service),
)
except HttpResponseException as e:
# The appservice doesn't support this endpoint.
@@ -498,7 +514,7 @@ class ApplicationServiceApi(SimpleHttpClient):
response = await self.post_json_get_json(
uri,
query,
headers={"Authorization": [f"Bearer {service.hs_token}"]},
headers=self._get_headers(service),
)
except HttpResponseException as e:
# The appservice doesn't support this endpoint.
+4 -3
View File
@@ -179,8 +179,9 @@ class Config:
If an integer is provided it is treated as bytes and is unchanged.
String byte sizes can have a suffix of 'K' or `M`, representing kibibytes and
mebibytes respectively. No suffix is understood as a plain byte count.
String byte sizes can have a suffix of 'K', `M`, `G` or `T`,
representing kibibytes, mebibytes, gibibytes and tebibytes respectively.
No suffix is understood as a plain byte count.
Raises:
TypeError, if given something other than an integer or a string
@@ -189,7 +190,7 @@ class Config:
if type(value) is int: # noqa: E721
return value
elif isinstance(value, str):
sizes = {"K": 1024, "M": 1024 * 1024}
sizes = {"K": 1024, "M": 1024 * 1024, "G": 1024**3, "T": 1024**4}
size = 1
suffix = value[-1]
if suffix in sizes:
+3
View File
@@ -57,6 +57,8 @@ class CasConfig(Config):
required_attributes
)
self.cas_enable_registration = cas_config.get("enable_registration", True)
self.idp_name = cas_config.get("idp_name", "CAS")
self.idp_icon = cas_config.get("idp_icon")
self.idp_brand = cas_config.get("idp_brand")
@@ -67,6 +69,7 @@ class CasConfig(Config):
self.cas_protocol_version = None
self.cas_displayname_attribute = None
self.cas_required_attributes = []
self.cas_enable_registration = False
# CAS uses a legacy required attributes mapping, not the one provided by
+2
View File
@@ -70,6 +70,7 @@ class CasHandler:
self._cas_protocol_version = hs.config.cas.cas_protocol_version
self._cas_displayname_attribute = hs.config.cas.cas_displayname_attribute
self._cas_required_attributes = hs.config.cas.cas_required_attributes
self._cas_enable_registration = hs.config.cas.cas_enable_registration
self._http_client = hs.get_proxied_http_client()
@@ -395,4 +396,5 @@ class CasHandler:
client_redirect_url,
cas_response_to_user_attributes,
grandfather_existing_users,
registration_enabled=self._cas_enable_registration,
)
+67 -6
View File
@@ -43,9 +43,12 @@ from synapse.metrics.background_process_metrics import (
)
from synapse.types import (
JsonDict,
JsonMapping,
ScheduledTask,
StrCollection,
StreamKeyType,
StreamToken,
TaskStatus,
UserID,
get_domain_from_id,
get_verify_key_from_cross_signing_key,
@@ -62,6 +65,7 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
DELETE_DEVICE_MSGS_TASK_NAME = "delete_device_messages"
MAX_DEVICE_DISPLAY_NAME_LEN = 100
DELETE_STALE_DEVICES_INTERVAL_MS = 24 * 60 * 60 * 1000
@@ -78,6 +82,7 @@ class DeviceWorkerHandler:
self._appservice_handler = hs.get_application_service_handler()
self._state_storage = hs.get_storage_controllers().state
self._auth_handler = hs.get_auth_handler()
self._event_sources = hs.get_event_sources()
self.server_name = hs.hostname
self._msc3852_enabled = hs.config.experimental.msc3852_enabled
self._query_appservices_for_keys = (
@@ -386,6 +391,7 @@ class DeviceHandler(DeviceWorkerHandler):
self._account_data_handler = hs.get_account_data_handler()
self._storage_controllers = hs.get_storage_controllers()
self.db_pool = hs.get_datastores().main.db_pool
self._task_scheduler = hs.get_task_scheduler()
self.device_list_updater = DeviceListUpdater(hs, self)
@@ -419,6 +425,10 @@ class DeviceHandler(DeviceWorkerHandler):
self._delete_stale_devices,
)
self._task_scheduler.register_action(
self._delete_device_messages, DELETE_DEVICE_MSGS_TASK_NAME
)
def _check_device_name_length(self, name: Optional[str]) -> None:
"""
Checks whether a device name is longer than the maximum allowed length.
@@ -530,6 +540,7 @@ class DeviceHandler(DeviceWorkerHandler):
user_id: The user to delete devices from.
device_ids: The list of device IDs to delete
"""
to_device_stream_id = self._event_sources.get_current_token().to_device_key
try:
await self.store.delete_devices(user_id, device_ids)
@@ -559,12 +570,49 @@ class DeviceHandler(DeviceWorkerHandler):
f"org.matrix.msc3890.local_notification_settings.{device_id}",
)
# Delete device messages asynchronously and in batches using the task scheduler
await self._task_scheduler.schedule_task(
DELETE_DEVICE_MSGS_TASK_NAME,
resource_id=device_id,
params={
"user_id": user_id,
"device_id": device_id,
"up_to_stream_id": to_device_stream_id,
},
)
# Pushers are deleted after `delete_access_tokens_for_user` is called so that
# modules using `on_logged_out` hook can use them if needed.
await self.hs.get_pusherpool().remove_pushers_by_devices(user_id, device_ids)
await self.notify_device_update(user_id, device_ids)
DEVICE_MSGS_DELETE_BATCH_LIMIT = 100
async def _delete_device_messages(
self,
task: ScheduledTask,
) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
"""Scheduler task to delete device messages in batch of `DEVICE_MSGS_DELETE_BATCH_LIMIT`."""
assert task.params is not None
user_id = task.params["user_id"]
device_id = task.params["device_id"]
up_to_stream_id = task.params["up_to_stream_id"]
res = await self.store.delete_messages_for_device(
user_id=user_id,
device_id=device_id,
up_to_stream_id=up_to_stream_id,
limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT,
)
if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT:
return TaskStatus.COMPLETE, None, None
else:
# There is probably still device messages to be deleted, let's keep the task active and it will be run
# again in a subsequent scheduler loop run (probably the next one, if not too many tasks are running).
return TaskStatus.ACTIVE, None, None
async def update_device(self, user_id: str, device_id: str, content: dict) -> None:
"""Update the given device
@@ -982,7 +1030,7 @@ class DeviceListWorkerUpdater:
async def multi_user_device_resync(
self, user_ids: List[str], mark_failed_as_stale: bool = True
) -> Dict[str, Optional[JsonDict]]:
) -> Dict[str, Optional[JsonMapping]]:
"""
Like `user_device_resync` but operates on multiple users **from the same origin**
at once.
@@ -1011,6 +1059,7 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
self._notifier = hs.get_notifier()
self._remote_edu_linearizer = Linearizer(name="remote_device_list")
self._resync_linearizer = Linearizer(name="remote_device_resync")
# user_id -> list of updates waiting to be handled.
self._pending_updates: Dict[
@@ -1057,6 +1106,10 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
)
prev_ids = [str(p) for p in prev_ids] # They may come as ints
# The result of `is_valid` is not used yet because for now we only want to
# log invalid mxids in the wild.
UserID.is_valid(user_id, allow_historical_mxids=True)
if get_domain_from_id(user_id) != origin:
# TODO: Raise?
logger.warning(
@@ -1253,7 +1306,7 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
async def multi_user_device_resync(
self, user_ids: List[str], mark_failed_as_stale: bool = True
) -> Dict[str, Optional[JsonDict]]:
) -> Dict[str, Optional[JsonMapping]]:
"""
Like `user_device_resync` but operates on multiple users **from the same origin**
at once.
@@ -1273,9 +1326,11 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
failed = set()
# TODO(Perf): Actually batch these up
for user_id in user_ids:
user_result, user_failed = await self._user_device_resync_returning_failed(
user_id
)
async with self._resync_linearizer.queue(user_id):
(
user_result,
user_failed,
) = await self._user_device_resync_returning_failed(user_id)
result[user_id] = user_result
if user_failed:
failed.add(user_id)
@@ -1287,7 +1342,7 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
async def _user_device_resync_returning_failed(
self, user_id: str
) -> Tuple[Optional[JsonDict], bool]:
) -> Tuple[Optional[JsonMapping], bool]:
"""Fetches all devices for a user and updates the device cache with them.
Args:
@@ -1300,6 +1355,12 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
e.g. due to a connection problem.
- True iff the resync failed and the device list should be marked as stale.
"""
# Check that we haven't gone and fetched the devices since we last
# checked if we needed to resync these device lists.
if await self.store.get_users_whose_devices_are_cached([user_id]):
cached = await self.store.get_cached_devices_for_user(user_id)
return cached, False
logger.debug("Attempting to resync the device list for %s", user_id)
log_kv({"message": "Doing resync to update device list."})
# Fetch all devices for the user.
+4
View File
@@ -109,6 +109,10 @@ class DeviceMessageHandler:
origin,
sender_user_id,
)
# The result of `is_valid` is not used yet because for now we only want to
# log invalid mxids in the wild.
UserID.is_valid(sender_user_id, allow_historical_mxids=True)
message_type = content["type"]
message_id = content["message_id"]
for user_id, by_device in content["messages"].items():
+4
View File
@@ -1593,6 +1593,10 @@ class SigningKeyEduUpdater:
logger.warning("Got signing key update edu for %r from %r", user_id, origin)
return
# The result of `is_valid` is not used yet because for now we only want to
# log invalid mxids in the wild.
UserID.is_valid(user_id, allow_historical_mxids=True)
room_ids = await self.store.get_rooms_for_user(user_id)
if not room_ids:
# We don't share any rooms with this user. Ignore update, as we
+2 -6
View File
@@ -13,7 +13,7 @@
# limitations under the License.
import logging
from typing import TYPE_CHECKING, List, Optional, Tuple, cast
from typing import TYPE_CHECKING, List, Optional, Tuple
from synapse.api.constants import (
AccountDataTypes,
@@ -23,7 +23,6 @@ from synapse.api.constants import (
Membership,
)
from synapse.api.errors import SynapseError
from synapse.events import EventBase
from synapse.events.utils import SerializeEventConfig
from synapse.events.validator import EventValidator
from synapse.handlers.presence import format_user_presence_state
@@ -35,7 +34,6 @@ from synapse.types import (
JsonDict,
Requester,
RoomStreamToken,
StateMap,
StreamKeyType,
StreamToken,
UserID,
@@ -199,9 +197,7 @@ class InitialSyncHandler:
deferred_room_state = run_in_background(
self._state_storage_controller.get_state_for_events,
[event.event_id],
).addCallback(
lambda states: cast(StateMap[EventBase], states[event.event_id])
)
).addCallback(lambda states: states[event.event_id])
(messages, token), current_state = await make_deferred_yieldable(
gather_results(
+10 -2
View File
@@ -713,7 +713,7 @@ class PaginationHandler:
self,
delete_id: str,
room_id: str,
requester_user_id: str,
requester_user_id: Optional[str],
new_room_user_id: Optional[str] = None,
new_room_name: Optional[str] = None,
message: Optional[str] = None,
@@ -732,6 +732,10 @@ class PaginationHandler:
requester_user_id:
User who requested the action. Will be recorded as putting the room on the
blocking list.
If None, the action was not manually requested but instead
triggered automatically, e.g. through a Synapse module
or some other policy.
MUST NOT be None if block=True.
new_room_user_id:
If set, a new room will be created with this user ID
as the creator and admin, and all users in the old room will be
@@ -818,7 +822,7 @@ class PaginationHandler:
def start_shutdown_and_purge_room(
self,
room_id: str,
requester_user_id: str,
requester_user_id: Optional[str],
new_room_user_id: Optional[str] = None,
new_room_name: Optional[str] = None,
message: Optional[str] = None,
@@ -833,6 +837,10 @@ class PaginationHandler:
requester_user_id:
User who requested the action and put the room on the
blocking list.
If None, the action was not manually requested but instead
triggered automatically, e.g. through a Synapse module
or some other policy.
MUST NOT be None if block=True.
new_room_user_id:
If set, a new room will be created with this user ID
as the creator and admin, and all users in the old room will be
+249 -49
View File
@@ -13,13 +13,56 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""This module is responsible for keeping track of presence status of local
"""
This module is responsible for keeping track of presence status of local
and remote users.
The methods that define policy are:
- PresenceHandler._update_states
- PresenceHandler._handle_timeouts
- should_notify
# Tracking local presence
For local users, presence is tracked on a per-device basis. When a user has multiple
devices the user presence state is derived by coalescing the presence from each
device:
BUSY > ONLINE > UNAVAILABLE > OFFLINE
The time that each device was last active and last synced is tracked in order to
automatically downgrade a device's presence state:
A device may move from ONLINE -> UNAVAILABLE, if it has not been active for
a period of time.
A device may go from any state -> OFFLINE, if it is not active and has not
synced for a period of time.
The timeouts are handled using a wheel timer, which has coarse buckets. Timings
do not need to be exact.
Generally a device's presence state is updated whenever a user syncs (via the
set_presence parameter), when the presence API is called, or if "pro-active"
events occur, including:
* Sending an event, receipt, read marker.
* Updating typing status.
The busy state has special status that it cannot is not downgraded by a call to
sync with a lower priority state *and* it takes a long period of time to transition
to offline.
# Persisting (and restoring) presence
For all users, presence is persisted on a per-user basis. Data is kept in-memory
and persisted periodically. When Synapse starts each worker loads the current
presence state and then tracks the presence stream to keep itself up-to-date.
When restoring presence for local users a pseudo-device is created to match the
user state; this device follows the normal timeout logic (see above) and will
automatically be replaced with any information from currently available devices.
"""
import abc
import contextlib
@@ -30,6 +73,7 @@ from contextlib import contextmanager
from types import TracebackType
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Callable,
Collection,
@@ -49,7 +93,7 @@ from prometheus_client import Counter
import synapse.metrics
from synapse.api.constants import EduTypes, EventTypes, Membership, PresenceState
from synapse.api.errors import SynapseError
from synapse.api.presence import UserPresenceState
from synapse.api.presence import UserDevicePresenceState, UserPresenceState
from synapse.appservice import ApplicationService
from synapse.events.presence_router import PresenceRouter
from synapse.logging.context import run_in_background
@@ -111,6 +155,8 @@ LAST_ACTIVE_GRANULARITY = 60 * 1000
# How long to wait until a new /events or /sync request before assuming
# the client has gone.
SYNC_ONLINE_TIMEOUT = 30 * 1000
# Busy status waits longer, but does eventually go offline.
BUSY_ONLINE_TIMEOUT = 60 * 60 * 1000
# How long to wait before marking the user as idle. Compared against last active
IDLE_TIMER = 5 * 60 * 1000
@@ -137,6 +183,7 @@ class BasePresenceHandler(abc.ABC):
writer"""
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.clock = hs.get_clock()
self.store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
@@ -162,6 +209,7 @@ class BasePresenceHandler(abc.ABC):
self.VALID_PRESENCE += (PresenceState.BUSY,)
active_presence = self.store.take_presence_startup_info()
# The combined status across all user devices.
self.user_to_current_state = {state.user_id: state for state in active_presence}
@abc.abstractmethod
@@ -426,8 +474,6 @@ class _NullContextManager(ContextManager[None]):
class WorkerPresenceHandler(BasePresenceHandler):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.hs = hs
self._presence_writer_instance = hs.config.worker.writers.presence[0]
# Route presence EDUs to the right worker
@@ -691,7 +737,6 @@ class WorkerPresenceHandler(BasePresenceHandler):
class PresenceHandler(BasePresenceHandler):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.hs = hs
self.wheel_timer: WheelTimer[str] = WheelTimer()
self.notifier = hs.get_notifier()
@@ -708,9 +753,27 @@ class PresenceHandler(BasePresenceHandler):
lambda: len(self.user_to_current_state),
)
# The per-device presence state, maps user to devices to per-device presence state.
self._user_to_device_to_current_state: Dict[
str, Dict[Optional[str], UserDevicePresenceState]
] = {}
now = self.clock.time_msec()
if self._presence_enabled:
for state in self.user_to_current_state.values():
# Create a psuedo-device to properly handle time outs. This will
# be overridden by any "real" devices within SYNC_ONLINE_TIMEOUT.
pseudo_device_id = None
self._user_to_device_to_current_state[state.user_id] = {
pseudo_device_id: UserDevicePresenceState(
user_id=state.user_id,
device_id=pseudo_device_id,
state=state.state,
last_active_ts=state.last_active_ts,
last_sync_ts=state.last_user_sync_ts,
)
}
self.wheel_timer.insert(
now=now, obj=state.user_id, then=state.last_active_ts + IDLE_TIMER
)
@@ -752,7 +815,7 @@ class PresenceHandler(BasePresenceHandler):
# Keeps track of the number of *ongoing* syncs on other processes.
#
# While any sync is ongoing on another process the user will never
# While any sync is ongoing on another process the user's device will never
# go offline.
#
# Each process has a unique identifier and an update frequency. If
@@ -981,22 +1044,21 @@ class PresenceHandler(BasePresenceHandler):
timers_fired_counter.inc(len(states))
syncing_user_ids = {
user_id
for (user_id, _), count in self._user_device_to_num_current_syncs.items()
# Set of user ID & device IDs which are currently syncing.
syncing_user_devices = {
user_id_device_id
for user_id_device_id, count in self._user_device_to_num_current_syncs.items()
if count
}
syncing_user_ids.update(
user_id
for user_id, _ in itertools.chain(
*self.external_process_to_current_syncs.values()
)
syncing_user_devices.update(
itertools.chain(*self.external_process_to_current_syncs.values())
)
changes = handle_timeouts(
states,
is_mine_fn=self.is_mine_id,
syncing_user_ids=syncing_user_ids,
syncing_user_devices=syncing_user_devices,
user_to_devices=self._user_to_device_to_current_state,
now=now,
)
@@ -1016,11 +1078,26 @@ class PresenceHandler(BasePresenceHandler):
bump_active_time_counter.inc()
prev_state = await self.current_state_for_user(user_id)
now = self.clock.time_msec()
new_fields: Dict[str, Any] = {"last_active_ts": self.clock.time_msec()}
if prev_state.state == PresenceState.UNAVAILABLE:
new_fields["state"] = PresenceState.ONLINE
# Update the device information & mark the device as online if it was
# unavailable.
devices = self._user_to_device_to_current_state.setdefault(user_id, {})
device_state = devices.setdefault(
device_id,
UserDevicePresenceState.default(user_id, device_id),
)
device_state.last_active_ts = now
if device_state.state == PresenceState.UNAVAILABLE:
device_state.state = PresenceState.ONLINE
# Update the user state, this will always update last_active_ts and
# might update the presence state.
prev_state = await self.current_state_for_user(user_id)
new_fields: Dict[str, Any] = {
"last_active_ts": now,
"state": _combine_device_states(devices.values()),
}
await self._update_states([prev_state.copy_and_replace(**new_fields)])
@@ -1132,6 +1209,12 @@ class PresenceHandler(BasePresenceHandler):
if is_syncing and (user_id, device_id) not in process_presence:
process_presence.add((user_id, device_id))
elif not is_syncing and (user_id, device_id) in process_presence:
devices = self._user_to_device_to_current_state.setdefault(user_id, {})
device_state = devices.setdefault(
device_id, UserDevicePresenceState.default(user_id, device_id)
)
device_state.last_sync_ts = sync_time_msec
new_state = prev_state.copy_and_replace(
last_user_sync_ts=sync_time_msec
)
@@ -1151,11 +1234,24 @@ class PresenceHandler(BasePresenceHandler):
process_presence = self.external_process_to_current_syncs.pop(
process_id, set()
)
prev_states = await self.current_state_for_users(
{user_id for user_id, device_id in process_presence}
)
time_now_ms = self.clock.time_msec()
# Mark each device as having a last sync time.
updated_users = set()
for user_id, device_id in process_presence:
device_state = self._user_to_device_to_current_state.setdefault(
user_id, {}
).setdefault(
device_id, UserDevicePresenceState.default(user_id, device_id)
)
device_state.last_sync_ts = time_now_ms
updated_users.add(user_id)
# Update each user (and insert into the appropriate timers to check if
# they've gone offline).
prev_states = await self.current_state_for_users(updated_users)
await self._update_states(
[
prev_state.copy_and_replace(last_user_sync_ts=time_now_ms)
@@ -1277,6 +1373,20 @@ class PresenceHandler(BasePresenceHandler):
if prev_state.state == PresenceState.BUSY and is_sync:
presence = PresenceState.BUSY
# Update the device specific information.
devices = self._user_to_device_to_current_state.setdefault(user_id, {})
device_state = devices.setdefault(
device_id,
UserDevicePresenceState.default(user_id, device_id),
)
device_state.state = presence
device_state.last_active_ts = now
if is_sync:
device_state.last_sync_ts = now
# Based on the state of each user's device calculate the new presence state.
presence = _combine_device_states(devices.values())
new_fields = {"state": presence}
if presence == PresenceState.ONLINE or presence == PresenceState.BUSY:
@@ -1873,7 +1983,8 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
def handle_timeouts(
user_states: List[UserPresenceState],
is_mine_fn: Callable[[str], bool],
syncing_user_ids: Set[str],
syncing_user_devices: AbstractSet[Tuple[str, Optional[str]]],
user_to_devices: Dict[str, Dict[Optional[str], UserDevicePresenceState]],
now: int,
) -> List[UserPresenceState]:
"""Checks the presence of users that have timed out and updates as
@@ -1882,7 +1993,8 @@ def handle_timeouts(
Args:
user_states: List of UserPresenceState's to check.
is_mine_fn: Function that returns if a user_id is ours
syncing_user_ids: Set of user_ids with active syncs.
syncing_user_devices: A set of (user ID, device ID) tuples with active syncs..
user_to_devices: A map of user ID to device ID to UserDevicePresenceState.
now: Current time in ms.
Returns:
@@ -1891,9 +2003,16 @@ def handle_timeouts(
changes = {} # Actual changes we need to notify people about
for state in user_states:
is_mine = is_mine_fn(state.user_id)
user_id = state.user_id
is_mine = is_mine_fn(user_id)
new_state = handle_timeout(state, is_mine, syncing_user_ids, now)
new_state = handle_timeout(
state,
is_mine,
syncing_user_devices,
user_to_devices.get(user_id, {}),
now,
)
if new_state:
changes[state.user_id] = new_state
@@ -1901,14 +2020,19 @@ def handle_timeouts(
def handle_timeout(
state: UserPresenceState, is_mine: bool, syncing_user_ids: Set[str], now: int
state: UserPresenceState,
is_mine: bool,
syncing_device_ids: AbstractSet[Tuple[str, Optional[str]]],
user_devices: Dict[Optional[str], UserDevicePresenceState],
now: int,
) -> Optional[UserPresenceState]:
"""Checks the presence of the user to see if any of the timers have elapsed
Args:
state
state: UserPresenceState to check.
is_mine: Whether the user is ours
syncing_user_ids: Set of user_ids with active syncs.
syncing_user_devices: A set of (user ID, device ID) tuples with active syncs..
user_devices: A map of device ID to UserDevicePresenceState.
now: Current time in ms.
Returns:
@@ -1919,34 +2043,63 @@ def handle_timeout(
return None
changed = False
user_id = state.user_id
if is_mine:
if state.state == PresenceState.ONLINE:
if now - state.last_active_ts > IDLE_TIMER:
# Currently online, but last activity ages ago so auto
# idle
state = state.copy_and_replace(state=PresenceState.UNAVAILABLE)
changed = True
elif now - state.last_active_ts > LAST_ACTIVE_GRANULARITY:
# So that we send down a notification that we've
# stopped updating.
# Check per-device whether the device should be considered idle or offline
# due to timeouts.
device_changed = False
offline_devices = []
for device_id, device_state in user_devices.items():
if device_state.state == PresenceState.ONLINE:
if now - device_state.last_active_ts > IDLE_TIMER:
# Currently online, but last activity ages ago so auto
# idle
device_state.state = PresenceState.UNAVAILABLE
device_changed = True
# If there are have been no sync for a while (and none ongoing),
# set presence to offline.
if (state.user_id, device_id) not in syncing_device_ids:
# If the user has done something recently but hasn't synced,
# don't set them as offline.
sync_or_active = max(
device_state.last_sync_ts, device_state.last_active_ts
)
# Implementations aren't meant to timeout a device with a busy
# state, but it needs to timeout *eventually* or else the user
# will be stuck in that state.
online_timeout = (
BUSY_ONLINE_TIMEOUT
if device_state.state == PresenceState.BUSY
else SYNC_ONLINE_TIMEOUT
)
if now - sync_or_active > online_timeout:
# Mark the device as going offline.
offline_devices.append(device_id)
device_changed = True
# Offline devices are not needed and do not add information.
for device_id in offline_devices:
user_devices.pop(device_id)
# If the presence state of the devices changed, then (maybe) update
# the user's overall presence state.
if device_changed:
new_presence = _combine_device_states(user_devices.values())
if new_presence != state.state:
state = state.copy_and_replace(state=new_presence)
changed = True
if now - state.last_active_ts > LAST_ACTIVE_GRANULARITY:
# So that we send down a notification that we've
# stopped updating.
changed = True
if now - state.last_federation_update_ts > FEDERATION_PING_INTERVAL:
# Need to send ping to other servers to ensure they don't
# timeout and set us to offline
changed = True
# If there are have been no sync for a while (and none ongoing),
# set presence to offline
if user_id not in syncing_user_ids:
# If the user has done something recently but hasn't synced,
# don't set them as offline.
sync_or_active = max(state.last_user_sync_ts, state.last_active_ts)
if now - sync_or_active > SYNC_ONLINE_TIMEOUT:
state = state.copy_and_replace(state=PresenceState.OFFLINE)
changed = True
else:
# We expect to be poked occasionally by the other side.
# This is to protect against forgetful/buggy servers, so that
@@ -2021,6 +2174,13 @@ def handle_update(
new_state = new_state.copy_and_replace(last_federation_update_ts=now)
federation_ping = True
if new_state.state == PresenceState.BUSY:
wheel_timer.insert(
now=now,
obj=user_id,
then=new_state.last_user_sync_ts + BUSY_ONLINE_TIMEOUT,
)
else:
wheel_timer.insert(
now=now,
@@ -2036,6 +2196,46 @@ def handle_update(
return new_state, persist_and_notify, federation_ping
PRESENCE_BY_PRIORITY = {
PresenceState.BUSY: 4,
PresenceState.ONLINE: 3,
PresenceState.UNAVAILABLE: 2,
PresenceState.OFFLINE: 1,
}
def _combine_device_states(
device_states: Iterable[UserDevicePresenceState],
) -> str:
"""
Find the device to use presence information from.
Orders devices by priority, then last_active_ts.
Args:
device_states: An iterable of device presence states
Return:
The combined presence state.
"""
# Based on (all) the user's devices calculate the new presence state.
presence = PresenceState.OFFLINE
last_active_ts = -1
# Find the device to use the presence state of based on the presence priority,
# but tie-break with how recently the device has been seen.
for device_state in device_states:
if (PRESENCE_BY_PRIORITY[device_state.state], device_state.last_active_ts) > (
PRESENCE_BY_PRIORITY[presence],
last_active_ts,
):
presence = device_state.state
last_active_ts = device_state.last_active_ts
return presence
async def get_interested_parties(
store: DataStore, presence_router: PresenceRouter, states: List[UserPresenceState]
) -> Tuple[Dict[str, List[UserPresenceState]], Dict[str, List[UserPresenceState]]]:
+4
View File
@@ -117,6 +117,10 @@ class ReceiptsHandler:
max_batch_id: Optional[int] = None
for receipt in receipts:
# The result of `is_valid` is not used yet because for now we only want to
# log invalid mxids in the wild.
UserID.is_valid(receipt.user_id, allow_historical_mxids=True)
res = await self.store.insert_receipt(
receipt.room_id,
receipt.receipt_type,
+9 -1
View File
@@ -1787,7 +1787,7 @@ class RoomShutdownHandler:
async def shutdown_room(
self,
room_id: str,
requester_user_id: str,
requester_user_id: Optional[str],
new_room_user_id: Optional[str] = None,
new_room_name: Optional[str] = None,
message: Optional[str] = None,
@@ -1811,6 +1811,10 @@ class RoomShutdownHandler:
requester_user_id:
User who requested the action and put the room on the
blocking list.
If None, the action was not manually requested but instead
triggered automatically, e.g. through a Synapse module
or some other policy.
MUST NOT be None if block=True.
new_room_user_id:
If set, a new room will be created with this user ID
as the creator and admin, and all users in the old room will be
@@ -1863,6 +1867,10 @@ class RoomShutdownHandler:
# Action the block first (even if the room doesn't exist yet)
if block:
if requester_user_id is None:
raise ValueError(
"shutdown_room: block=True not allowed when requester_user_id is None."
)
# This will work even if the room is already blocked, but that is
# desirable in case the first attempt at blocking the room failed below.
await self.store.block_room(room_id, requester_user_id)
+13 -3
View File
@@ -40,6 +40,7 @@ from synapse.api.filtering import FilterCollection
from synapse.api.presence import UserPresenceState
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import EventBase
from synapse.handlers.device import DELETE_DEVICE_MSGS_TASK_NAME
from synapse.handlers.relations import BundledAggregations
from synapse.logging import issue9533_logger
from synapse.logging.context import current_context
@@ -268,6 +269,7 @@ class SyncHandler:
self._storage_controllers = hs.get_storage_controllers()
self._state_storage_controller = self._storage_controllers.state
self._device_handler = hs.get_device_handler()
self._task_scheduler = hs.get_task_scheduler()
self.should_calculate_push_rules = hs.config.push.enable_push
@@ -360,11 +362,19 @@ class SyncHandler:
# (since we now know that the device has received them)
if since_token is not None:
since_stream_id = since_token.to_device_key
deleted = await self.store.delete_messages_for_device(
sync_config.user.to_string(), sync_config.device_id, since_stream_id
# Delete device messages asynchronously and in batches using the task scheduler
await self._task_scheduler.schedule_task(
DELETE_DEVICE_MSGS_TASK_NAME,
resource_id=sync_config.device_id,
params={
"user_id": sync_config.user.to_string(),
"device_id": sync_config.device_id,
"up_to_stream_id": since_stream_id,
},
)
logger.debug(
"Deleted %d to-device messages up to %d", deleted, since_stream_id
"Deletion of to-device messages up to %d scheduled",
since_stream_id,
)
if timeout == 0 or since_token is None or full_state:
+4
View File
@@ -370,6 +370,10 @@ class TypingWriterHandler(FollowerTypingHandler):
room_id = content["room_id"]
user_id = content["user_id"]
# The result of `is_valid` is not used yet because for now we only want to
# log invalid mxids in the wild.
UserID.is_valid(user_id, allow_historical_mxids=True)
# If we're not in the room just ditch the event entirely. This is
# probably an old server that has come back and thinks we're still in
# the room (or we've been rejoined to the room by a state reset).
@@ -399,15 +399,34 @@ class MatrixHostnameEndpoint:
if port or _is_ip_literal(host):
return [Server(host, port or 8448)]
# Check _matrix-fed._tcp SRV record.
logger.debug("Looking up SRV record for %s", host.decode(errors="replace"))
server_list = await self._srv_resolver.resolve_service(
b"_matrix-fed._tcp." + host
)
if server_list:
if logger.isEnabledFor(logging.DEBUG):
logger.debug(
"Got %s from SRV lookup for %s",
", ".join(map(str, server_list)),
host.decode(errors="replace"),
)
return server_list
# No _matrix-fed._tcp SRV record, fallback to legacy _matrix._tcp SRV record.
logger.debug(
"Looking up deprecated SRV record for %s", host.decode(errors="replace")
)
server_list = await self._srv_resolver.resolve_service(b"_matrix._tcp." + host)
if server_list:
logger.debug(
"Got %s from SRV lookup for %s",
", ".join(map(str, server_list)),
host.decode(errors="replace"),
)
if logger.isEnabledFor(logging.DEBUG):
logger.debug(
"Got %s from deprecated SRV lookup for %s",
", ".join(map(str, server_list)),
host.decode(errors="replace"),
)
return server_list
# No SRV records, so we fallback to host and 8448
+2 -2
View File
@@ -728,7 +728,7 @@ async def _unwrap_awaitable(awaitable: Awaitable[R]) -> R:
@overload
def preserve_fn( # type: ignore[misc]
def preserve_fn(
f: Callable[P, Awaitable[R]],
) -> Callable[P, "defer.Deferred[R]"]:
# The `type: ignore[misc]` above suppresses
@@ -756,7 +756,7 @@ def preserve_fn(
@overload
def run_in_background( # type: ignore[misc]
def run_in_background(
f: Callable[P, Awaitable[R]], *args: P.args, **kwargs: P.kwargs
) -> "defer.Deferred[R]":
# The `type: ignore[misc]` above suppresses
+13
View File
@@ -1730,6 +1730,19 @@ class ModuleApi:
room_alias_str = room_alias.to_string() if room_alias else None
return room_id, room_alias_str
async def delete_room(self, room_id: str) -> None:
"""
Schedules the deletion of a room from Synapse's database.
If the room is already being deleted, this method does nothing.
This method does not wait for the room to be deleted.
Added in Synapse v1.89.0.
"""
# Future extensions to this method might want to e.g. allow use of `force_purge`.
# TODO In the future we should make sure this is persistent.
self._hs.get_pagination_handler().start_shutdown_and_purge_room(room_id, None)
async def set_displayname(
self,
user_id: UserID,
@@ -40,7 +40,7 @@ CHECK_VISIBILITY_CAN_BE_MODIFIED_CALLBACK = Callable[
[str, StateMap[EventBase], str], Awaitable[bool]
]
ON_NEW_EVENT_CALLBACK = Callable[[EventBase, StateMap[EventBase]], Awaitable]
CHECK_CAN_SHUTDOWN_ROOM_CALLBACK = Callable[[str, str], Awaitable[bool]]
CHECK_CAN_SHUTDOWN_ROOM_CALLBACK = Callable[[Optional[str], str], Awaitable[bool]]
CHECK_CAN_DEACTIVATE_USER_CALLBACK = Callable[[str, bool], Awaitable[bool]]
ON_PROFILE_UPDATE_CALLBACK = Callable[[str, ProfileInfo, bool, bool], Awaitable]
ON_USER_DEACTIVATION_STATUS_CHANGED_CALLBACK = Callable[[str, bool, bool], Awaitable]
@@ -429,12 +429,17 @@ class ThirdPartyEventRulesModuleApiCallbacks:
"Failed to run module API callback %s: %s", callback, e
)
async def check_can_shutdown_room(self, user_id: str, room_id: str) -> bool:
async def check_can_shutdown_room(
self, user_id: Optional[str], room_id: str
) -> bool:
"""Intercept requests to shutdown a room. If `False` is returned, the
room must not be shut down.
Args:
requester: The ID of the user requesting the shutdown.
user_id: The ID of the user requesting the shutdown.
If no user ID is supplied, then the room is being shut down through
some mechanism other than a user's request, e.g. through a module's
request.
room_id: The ID of the room.
"""
for callback in self._check_can_shutdown_room_callbacks:
+2 -2
View File
@@ -20,7 +20,7 @@ from twisted.web.server import Request
from synapse.http.server import HttpServer
from synapse.logging.opentracing import active_span
from synapse.replication.http._base import ReplicationEndpoint
from synapse.types import JsonDict
from synapse.types import JsonDict, JsonMapping
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -82,7 +82,7 @@ class ReplicationMultiUserDevicesResyncRestServlet(ReplicationEndpoint):
async def _handle_request( # type: ignore[override]
self, request: Request, content: JsonDict
) -> Tuple[int, Dict[str, Optional[JsonDict]]]:
) -> Tuple[int, Dict[str, Optional[JsonMapping]]]:
user_ids: List[str] = content["user_ids"]
logger.info("Resync for %r", user_ids)
+1 -1
View File
@@ -123,7 +123,7 @@ class ClientRestResource(JsonResource):
if is_main_process:
report_event.register_servlets(hs, client_resource)
openid.register_servlets(hs, client_resource)
notifications.register_servlets(hs, client_resource)
notifications.register_servlets(hs, client_resource)
devices.register_servlets(hs, client_resource)
if is_main_process:
thirdparty.register_servlets(hs, client_resource)
+2
View File
@@ -36,6 +36,8 @@ logger = logging.getLogger(__name__)
class NotificationsServlet(RestServlet):
PATTERNS = client_patterns("/notifications$")
CATEGORY = "Client API requests"
def __init__(self, hs: "HomeServer"):
super().__init__()
self.store = hs.get_datastores().main
@@ -63,6 +63,7 @@ from synapse.types import (
PersistedEventPosition,
RoomStreamToken,
StateMap,
UserID,
get_domain_from_id,
)
from synapse.types.state import StateFilter
@@ -397,6 +398,10 @@ class EventsPersistenceStorageController:
event_ids: List[str] = []
partitioned: Dict[str, List[Tuple[EventBase, EventContext]]] = {}
for event, ctx in events_and_contexts:
# The result of `is_valid` is not used yet because for now we only want to
# log invalid mxids in the wild.
UserID.is_valid(event.user_id, allow_historical_mxids=True)
partitioned.setdefault(event.room_id, []).append((event, ctx))
event_ids.append(event.event_id)
+21 -7
View File
@@ -349,7 +349,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
table="devices",
column="user_id",
iterable=user_ids_to_query,
keyvalues={"user_id": user_id, "hidden": False},
keyvalues={"hidden": False},
retcols=("device_id",),
)
@@ -445,13 +445,18 @@ class DeviceInboxWorkerStore(SQLBaseStore):
@trace
async def delete_messages_for_device(
self, user_id: str, device_id: Optional[str], up_to_stream_id: int
self,
user_id: str,
device_id: Optional[str],
up_to_stream_id: int,
limit: int,
) -> int:
"""
Args:
user_id: The recipient user_id.
device_id: The recipient device_id.
up_to_stream_id: Where to delete messages up to.
limit: maximum number of messages to delete
Returns:
The number of messages deleted.
@@ -472,12 +477,16 @@ class DeviceInboxWorkerStore(SQLBaseStore):
log_kv({"message": "No changes in cache since last check"})
return 0
ROW_ID_NAME = self.database_engine.row_id_name
def delete_messages_for_device_txn(txn: LoggingTransaction) -> int:
sql = (
"DELETE FROM device_inbox"
" WHERE user_id = ? AND device_id = ?"
" AND stream_id <= ?"
)
sql = f"""
DELETE FROM device_inbox WHERE {ROW_ID_NAME} IN (
SELECT {ROW_ID_NAME} FROM device_inbox
WHERE user_id = ? AND device_id = ? AND stream_id <= ?
LIMIT {limit}
)
"""
txn.execute(sql, (user_id, device_id, up_to_stream_id))
return txn.rowcount
@@ -487,6 +496,11 @@ class DeviceInboxWorkerStore(SQLBaseStore):
log_kv({"message": f"deleted {count} messages for device", "count": count})
# In this case we don't know if we hit the limit or the delete is complete
# so let's not update the cache.
if count == limit:
return count
# Update the cache, ensuring that we only ever increase the value
updated_last_deleted_stream_id = self._last_device_delete_cache.get(
(user_id, device_id), 0
+17 -17
View File
@@ -759,18 +759,10 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
mapping of user_id -> device_id -> device_info.
"""
unique_user_ids = user_ids | {user_id for user_id, _ in user_and_device_ids}
user_map = await self.get_device_list_last_stream_id_for_remotes(
list(unique_user_ids)
)
# We go and check if any of the users need to have their device lists
# resynced. If they do then we remove them from the cached list.
users_needing_resync = await self.get_user_ids_requiring_device_list_resync(
user_ids_in_cache = await self.get_users_whose_devices_are_cached(
unique_user_ids
)
user_ids_in_cache = {
user_id for user_id, stream_id in user_map.items() if stream_id
} - users_needing_resync
user_ids_not_in_cache = unique_user_ids - user_ids_in_cache
# First fetch all the users which all devices are to be returned.
@@ -792,6 +784,22 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
return user_ids_not_in_cache, results
async def get_users_whose_devices_are_cached(
self, user_ids: StrCollection
) -> Set[str]:
"""Checks which of the given users we have cached the devices for."""
user_map = await self.get_device_list_last_stream_id_for_remotes(user_ids)
# We go and check if any of the users need to have their device lists
# resynced. If they do then we remove them from the cached list.
users_needing_resync = await self.get_user_ids_requiring_device_list_resync(
user_ids
)
user_ids_in_cache = {
user_id for user_id, stream_id in user_map.items() if stream_id
} - users_needing_resync
return user_ids_in_cache
@cached(num_args=2, tree=True)
async def _get_cached_user_device(self, user_id: str, device_id: str) -> JsonDict:
content = await self.db_pool.simple_select_one_onecol(
@@ -1764,14 +1772,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
keyvalues={"user_id": user_id, "hidden": False},
)
self.db_pool.simple_delete_many_txn(
txn,
table="device_inbox",
column="device_id",
values=device_ids,
keyvalues={"user_id": user_id},
)
self.db_pool.simple_delete_many_txn(
txn,
table="device_auth_providers",
@@ -1740,42 +1740,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
# We sleep to ensure that we don't overwhelm the DB.
await self._clock.sleep(1.0)
class EventPushActionsStore(EventPushActionsWorkerStore):
EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
def __init__(
self,
database: DatabasePool,
db_conn: LoggingDatabaseConnection,
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)
self.db_pool.updates.register_background_index_update(
self.EPA_HIGHLIGHT_INDEX,
index_name="event_push_actions_u_highlight",
table="event_push_actions",
columns=["user_id", "stream_ordering"],
)
self.db_pool.updates.register_background_index_update(
"event_push_actions_highlights_index",
index_name="event_push_actions_highlights_index",
table="event_push_actions",
columns=["user_id", "room_id", "topological_ordering", "stream_ordering"],
where_clause="highlight=1",
)
# Add index to make deleting old push actions faster.
self.db_pool.updates.register_background_index_update(
"event_push_actions_stream_highlight_index",
index_name="event_push_actions_stream_highlight_index",
table="event_push_actions",
columns=["highlight", "stream_ordering"],
where_clause="highlight=0",
)
async def get_push_actions_for_user(
self,
user_id: str,
@@ -1834,6 +1798,42 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
]
class EventPushActionsStore(EventPushActionsWorkerStore):
EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
def __init__(
self,
database: DatabasePool,
db_conn: LoggingDatabaseConnection,
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)
self.db_pool.updates.register_background_index_update(
self.EPA_HIGHLIGHT_INDEX,
index_name="event_push_actions_u_highlight",
table="event_push_actions",
columns=["user_id", "stream_ordering"],
)
self.db_pool.updates.register_background_index_update(
"event_push_actions_highlights_index",
index_name="event_push_actions_highlights_index",
table="event_push_actions",
columns=["user_id", "room_id", "topological_ordering", "stream_ordering"],
where_clause="highlight=1",
)
# Add index to make deleting old push actions faster.
self.db_pool.updates.register_background_index_update(
"event_push_actions_stream_highlight_index",
index_name="event_push_actions_stream_highlight_index",
table="event_push_actions",
columns=["highlight", "stream_ordering"],
where_clause="highlight=0",
)
def _action_has_highlight(actions: Collection[Union[Mapping, str]]) -> bool:
for action in actions:
if not isinstance(action, dict):
+11 -6
View File
@@ -221,12 +221,17 @@ class KeyStore(CacheInvalidationWorkerStore):
"""Processes a batch of keys to fetch, and adds the result to `keys`."""
# batch_iter always returns tuples so it's safe to do len(batch)
sql = """
SELECT server_name, key_id, key_json, ts_valid_until_ms
FROM server_keys_json WHERE 1=0
""" + " OR (server_name=? AND key_id=?)" * len(
batch
)
where_clause = " OR (server_name=? AND key_id=?)" * len(batch)
# `server_keys_json` can have multiple entries per server (one per
# remote server we fetched from, if using perspectives). Order by
# `ts_added_ms` so the most recently fetched one always wins.
sql = f"""
SELECT server_name, key_id, key_json, ts_valid_until_ms
FROM server_keys_json WHERE 1=0
{where_clause}
ORDER BY ts_added_ms
"""
txn.execute(sql, tuple(itertools.chain.from_iterable(batch)))
+1 -5
View File
@@ -939,11 +939,7 @@ class ReceiptsBackgroundUpdateStore(SQLBaseStore):
receipts."""
def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None:
if isinstance(self.database_engine, PostgresEngine):
ROW_ID_NAME = "ctid"
else:
ROW_ID_NAME = "rowid"
ROW_ID_NAME = self.database_engine.row_id_name
# Identify any duplicate receipts arising from
# https://github.com/matrix-org/synapse/issues/14406.
# The following query takes less than a minute on matrix.org.
+6
View File
@@ -100,6 +100,12 @@ class BaseDatabaseEngine(Generic[ConnectionType, CursorType], metaclass=abc.ABCM
"""Gets a string giving the server version. For example: '3.22.0'"""
...
@property
@abc.abstractmethod
def row_id_name(self) -> str:
"""Gets the literal name representing a row id for this engine."""
...
@abc.abstractmethod
def in_transaction(self, conn: ConnectionType) -> bool:
"""Whether the connection is currently in a transaction."""
+4
View File
@@ -211,6 +211,10 @@ class PostgresEngine(
else:
return "%i.%i.%i" % (numver / 10000, (numver % 10000) / 100, numver % 100)
@property
def row_id_name(self) -> str:
return "ctid"
def in_transaction(self, conn: psycopg2.extensions.connection) -> bool:
return conn.status != psycopg2.extensions.STATUS_READY
+4
View File
@@ -123,6 +123,10 @@ class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection, sqlite3.Cursor]):
"""Gets a string giving the server version. For example: '3.22.0'."""
return "%i.%i.%i" % sqlite3.sqlite_version_info
@property
def row_id_name(self) -> str:
return "rowid"
def in_transaction(self, conn: sqlite3.Connection) -> bool:
return conn.in_transaction
@@ -14,7 +14,7 @@
from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
from synapse.storage.engines import BaseDatabaseEngine
from synapse.storage.prepare_database import get_statements
FIX_INDEXES = """
@@ -37,7 +37,7 @@ CREATE INDEX group_rooms_r_idx ON group_rooms(room_id);
def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
rowid = "ctid" if isinstance(database_engine, PostgresEngine) else "rowid"
rowid = database_engine.row_id_name
# remove duplicates from group_users & group_invites tables
cur.execute(
+47 -6
View File
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import logging
import re
import string
from enum import Enum
@@ -64,6 +65,9 @@ if TYPE_CHECKING:
from synapse.storage.databases.main import DataStore, PurgeEventsStore
from synapse.storage.databases.main.appservice import ApplicationServiceWorkerStore
logger = logging.getLogger(__name__)
# Define a state map type from type/state_key to T (usually an event ID or
# event)
T = TypeVar("T")
@@ -306,7 +310,7 @@ class DomainSpecificString(metaclass=abc.ABCMeta):
return "%s%s:%s" % (self.SIGIL, self.localpart, self.domain)
@classmethod
def is_valid(cls: Type[DS], s: str) -> bool:
def is_valid(cls: Type[DS], s: str, **kwargs: Any) -> bool:
"""Parses the input string and attempts to ensure it is valid."""
# TODO: this does not reject an empty localpart or an overly-long string.
# See https://spec.matrix.org/v1.2/appendices/#identifier-grammar
@@ -329,6 +333,35 @@ class UserID(DomainSpecificString):
SIGIL = "@"
@classmethod
def is_valid(cls: Type[DS], s: str, **kwargs: Any) -> bool:
""""""
"""Parses the user id str and attempts to ensure it is valid per the spec.
Args:
allow_historical_mxids: True to allow historical mxids, which can
include all printable ASCII chars minus `:`
Returns:
False if the user ID is invalid per the spec
"""
allow_historical_mxids = kwargs.get("allow_historical_mxids", False)
is_valid = DomainSpecificString.is_valid(s)
if len(s.encode("utf-8")) > 255:
logger.warn(
f"User ID {s} has more than 255 bytes and is invalid per the spec"
)
is_valid = False
obj = UserID.from_string(s)
if contains_invalid_mxid_characters(obj.localpart, allow_historical_mxids):
logger.warn(
f"localpart of User ID {s} contains invalid characters per the spec"
)
is_valid = False
return is_valid
@attr.s(slots=True, frozen=True, repr=False)
class RoomAlias(DomainSpecificString):
@@ -355,22 +388,30 @@ MXID_LOCALPART_ALLOWED_CHARACTERS = set(
"_-./=+" + string.ascii_lowercase + string.digits
)
MXID_HISTORICAL_LOCALPART_ALLOWED_CHARACTERS = set(string.printable.replace(":", ""))
# Guest user IDs are purely numeric.
GUEST_USER_ID_PATTERN = re.compile(r"^\d+$")
def contains_invalid_mxid_characters(localpart: str) -> bool:
def contains_invalid_mxid_characters(
localpart: str, allow_historical_mxids: Optional[bool] = False
) -> bool:
"""Check for characters not allowed in an mxid or groupid localpart
Args:
localpart: the localpart to be checked
use_extended_character_set: True to use the extended allowed characters
from MSC4009.
allow_historical_mxids: True to allow historical mxids, which can
include all printable ASCII chars minus `:`
Returns:
True if there are any naughty characters
"""
return any(c not in MXID_LOCALPART_ALLOWED_CHARACTERS for c in localpart)
if allow_historical_mxids:
allowed_characters = MXID_HISTORICAL_LOCALPART_ALLOWED_CHARACTERS
else:
allowed_characters = MXID_LOCALPART_ALLOWED_CHARACTERS
return any(c not in allowed_characters for c in localpart)
UPPER_CASE_PATTERN = re.compile(b"[A-Z_]")
+1 -1
View File
@@ -136,7 +136,7 @@ class GAIResolver:
# The types on IHostnameResolver is incorrect in Twisted, see
# https://twistedmatrix.com/trac/ticket/10276
def resolveHostName( # type: ignore[override]
def resolveHostName(
self,
resolutionReceiver: IResolutionReceiver,
hostName: str,
+7 -10
View File
@@ -77,6 +77,7 @@ class TaskScheduler:
LAST_UPDATE_BEFORE_WARNING_MS = 24 * 60 * 60 * 1000 # 24hrs
def __init__(self, hs: "HomeServer"):
self._hs = hs
self._store = hs.get_datastores().main
self._clock = hs.get_clock()
self._running_tasks: Set[str] = set()
@@ -97,8 +98,6 @@ class TaskScheduler:
"handle_scheduled_tasks",
self._handle_scheduled_tasks,
)
else:
self.replication_client = hs.get_replication_command_handler()
def register_action(
self,
@@ -133,7 +132,7 @@ class TaskScheduler:
params: Optional[JsonMapping] = None,
) -> str:
"""Schedule a new potentially resumable task. A function matching the specified
`action` should have been previously registered with `register_action`.
`action` should have be registered with `register_action` before the task is run.
Args:
action: the name of a previously registered action
@@ -149,11 +148,6 @@ class TaskScheduler:
Returns:
The id of the scheduled task
"""
if action not in self._actions:
raise Exception(
f"No function associated with action {action} of the scheduled task"
)
status = TaskStatus.SCHEDULED
if timestamp is None or timestamp < self._clock.time_msec():
timestamp = self._clock.time_msec()
@@ -175,7 +169,7 @@ class TaskScheduler:
if self._run_background_tasks:
await self._launch_task(task)
else:
self.replication_client.send_new_active_task(task.id)
self._hs.get_replication_command_handler().send_new_active_task(task.id)
return task.id
@@ -315,7 +309,10 @@ class TaskScheduler:
"""
assert self._run_background_tasks
assert task.action in self._actions
if task.action not in self._actions:
raise Exception(
f"No function associated with action {task.action} of the scheduled task {task.id}"
)
function = self._actions[task.action]
async def wrapper() -> None:
+12 -6
View File
@@ -76,7 +76,7 @@ class ApplicationServiceApiTestCase(unittest.HomeserverTestCase):
headers: Mapping[Union[str, bytes], Sequence[Union[str, bytes]]],
) -> List[JsonDict]:
# Ensure the access token is passed as a header.
if not headers or not headers.get("Authorization"):
if not headers or not headers.get(b"Authorization"):
raise RuntimeError("Access token not provided")
# ... and not as a query param
if b"access_token" in args:
@@ -84,7 +84,9 @@ class ApplicationServiceApiTestCase(unittest.HomeserverTestCase):
"Access token should not be passed as a query param."
)
self.assertEqual(headers.get("Authorization"), [f"Bearer {TOKEN}"])
self.assertEqual(
headers.get(b"Authorization"), [f"Bearer {TOKEN}".encode()]
)
self.request_url = url
if url == URL_USER:
return SUCCESS_RESULT_USER
@@ -152,11 +154,13 @@ class ApplicationServiceApiTestCase(unittest.HomeserverTestCase):
# Ensure the access token is passed as a both a query param and in the headers.
if not args.get(b"access_token"):
raise RuntimeError("Access token should be provided in query params.")
if not headers or not headers.get("Authorization"):
if not headers or not headers.get(b"Authorization"):
raise RuntimeError("Access token should be provided in auth headers.")
self.assertEqual(args.get(b"access_token"), TOKEN)
self.assertEqual(headers.get("Authorization"), [f"Bearer {TOKEN}"])
self.assertEqual(
headers.get(b"Authorization"), [f"Bearer {TOKEN}".encode()]
)
self.request_url = url
if url == URL_USER:
return SUCCESS_RESULT_USER
@@ -208,10 +212,12 @@ class ApplicationServiceApiTestCase(unittest.HomeserverTestCase):
headers: Mapping[Union[str, bytes], Sequence[Union[str, bytes]]],
) -> JsonDict:
# Ensure the access token is passed as both a header and query arg.
if not headers.get("Authorization"):
if not headers.get(b"Authorization"):
raise RuntimeError("Access token not provided")
self.assertEqual(headers.get("Authorization"), [f"Bearer {TOKEN}"])
self.assertEqual(
headers.get(b"Authorization"), [f"Bearer {TOKEN}".encode()]
)
return RESPONSE
# We assign to a method, which mypy doesn't like.
+125
View File
@@ -422,6 +422,18 @@ class ApplicationServicesHandlerSendEventsTestCase(unittest.HomeserverTestCase):
"exclusive_as_user", "password", self.exclusive_as_user_device_id
)
self.exclusive_as_user_2_device_id = "exclusive_as_device_2"
self.exclusive_as_user_2 = self.register_user("exclusive_as_user_2", "password")
self.exclusive_as_user_2_token = self.login(
"exclusive_as_user_2", "password", self.exclusive_as_user_2_device_id
)
self.exclusive_as_user_3_device_id = "exclusive_as_device_3"
self.exclusive_as_user_3 = self.register_user("exclusive_as_user_3", "password")
self.exclusive_as_user_3_token = self.login(
"exclusive_as_user_3", "password", self.exclusive_as_user_3_device_id
)
def _notify_interested_services(self) -> None:
# This is normally set in `notify_interested_services` but we need to call the
# internal async version so the reactor gets pushed to completion.
@@ -849,6 +861,119 @@ class ApplicationServicesHandlerSendEventsTestCase(unittest.HomeserverTestCase):
for count in service_id_to_message_count.values():
self.assertEqual(count, number_of_messages)
@unittest.override_config(
{"experimental_features": {"msc2409_to_device_messages_enabled": True}}
)
def test_application_services_receive_local_to_device_for_many_users(self) -> None:
"""
Test that when a user sends a to-device message to many users
in an application service's user namespace, the
application service will receive all of them.
"""
interested_appservice = self._register_application_service(
namespaces={
ApplicationService.NS_USERS: [
{
"regex": "@exclusive_as_user:.+",
"exclusive": True,
},
{
"regex": "@exclusive_as_user_2:.+",
"exclusive": True,
},
{
"regex": "@exclusive_as_user_3:.+",
"exclusive": True,
},
],
},
)
# Have local_user send a to-device message to exclusive_as_users
message_content = {"some_key": "some really interesting value"}
chan = self.make_request(
"PUT",
"/_matrix/client/r0/sendToDevice/m.room_key_request/3",
content={
"messages": {
self.exclusive_as_user: {
self.exclusive_as_user_device_id: message_content
},
self.exclusive_as_user_2: {
self.exclusive_as_user_2_device_id: message_content
},
self.exclusive_as_user_3: {
self.exclusive_as_user_3_device_id: message_content
},
}
},
access_token=self.local_user_token,
)
self.assertEqual(chan.code, 200, chan.result)
# Have exclusive_as_user send a to-device message to local_user
for user_token in [
self.exclusive_as_user_token,
self.exclusive_as_user_2_token,
self.exclusive_as_user_3_token,
]:
chan = self.make_request(
"PUT",
"/_matrix/client/r0/sendToDevice/m.room_key_request/4",
content={
"messages": {
self.local_user: {self.local_user_device_id: message_content}
}
},
access_token=user_token,
)
self.assertEqual(chan.code, 200, chan.result)
# Check if our application service - that is interested in exclusive_as_user - received
# the to-device message as part of an AS transaction.
# Only the local_user -> exclusive_as_user to-device message should have been forwarded to the AS.
#
# The uninterested application service should not have been notified at all.
self.send_mock.assert_called_once()
(
service,
_events,
_ephemeral,
to_device_messages,
_otks,
_fbks,
_device_list_summary,
) = self.send_mock.call_args[0]
# Assert that this was the same to-device message that local_user sent
self.assertEqual(service, interested_appservice)
# Assert expected number of messages
self.assertEqual(len(to_device_messages), 3)
for device_msg in to_device_messages:
self.assertEqual(device_msg["type"], "m.room_key_request")
self.assertEqual(device_msg["sender"], self.local_user)
self.assertEqual(device_msg["content"], message_content)
self.assertEqual(to_device_messages[0]["to_user_id"], self.exclusive_as_user)
self.assertEqual(
to_device_messages[0]["to_device_id"],
self.exclusive_as_user_device_id,
)
self.assertEqual(to_device_messages[1]["to_user_id"], self.exclusive_as_user_2)
self.assertEqual(
to_device_messages[1]["to_device_id"],
self.exclusive_as_user_2_device_id,
)
self.assertEqual(to_device_messages[2]["to_user_id"], self.exclusive_as_user_3)
self.assertEqual(
to_device_messages[2]["to_device_id"],
self.exclusive_as_user_3_device_id,
)
def _register_application_service(
self,
namespaces: Optional[Dict[str, Iterable[Dict]]] = None,
+17
View File
@@ -197,6 +197,23 @@ class CasHandlerTestCase(HomeserverTestCase):
auth_provider_session_id=None,
)
@override_config({"cas_config": {"enable_registration": False}})
def test_map_cas_user_does_not_register_new_user(self) -> None:
"""Ensures new users are not registered if the enabled registration flag is disabled."""
# stub out the auth handler
auth_handler = self.hs.get_auth_handler()
auth_handler.complete_sso_login = AsyncMock() # type: ignore[method-assign]
cas_response = CasResponse("test_user", {})
request = _mock_request()
self.get_success(
self.handler._handle_cas_response(request, cas_response, "redirect_uri", "")
)
# check that the auth handler was not called as expected
auth_handler.complete_sso_login.assert_not_called()
def _mock_request() -> Mock:
"""Returns a mock which will stand in as a SynapseRequest"""
+47
View File
@@ -30,6 +30,7 @@ from synapse.server import HomeServer
from synapse.storage.databases.main.appservice import _make_exclusive_regex
from synapse.types import JsonDict, create_requester
from synapse.util import Clock
from synapse.util.task_scheduler import TaskScheduler
from tests import unittest
from tests.unittest import override_config
@@ -49,6 +50,7 @@ class DeviceTestCase(unittest.HomeserverTestCase):
assert isinstance(handler, DeviceHandler)
self.handler = handler
self.store = hs.get_datastores().main
self.device_message_handler = hs.get_device_message_handler()
return hs
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
@@ -211,6 +213,51 @@ class DeviceTestCase(unittest.HomeserverTestCase):
)
self.assertIsNone(res)
def test_delete_device_and_big_device_inbox(self) -> None:
"""Check that deleting a big device inbox is staged and batched asynchronously."""
DEVICE_ID = "abc"
sender = "@sender:" + self.hs.hostname
receiver = "@receiver:" + self.hs.hostname
self._record_user(sender, DEVICE_ID, DEVICE_ID)
self._record_user(receiver, DEVICE_ID, DEVICE_ID)
# queue a bunch of messages in the inbox
requester = create_requester(sender, device_id=DEVICE_ID)
for i in range(0, DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT + 10):
self.get_success(
self.device_message_handler.send_device_message(
requester, "message_type", {receiver: {"*": {"val": i}}}
)
)
# delete the device
self.get_success(self.handler.delete_devices(receiver, [DEVICE_ID]))
# messages should be deleted up to DEVICE_MSGS_DELETE_BATCH_LIMIT straight away
res = self.get_success(
self.store.db_pool.simple_select_list(
table="device_inbox",
keyvalues={"user_id": receiver},
retcols=("user_id", "device_id", "stream_id"),
desc="get_device_id_from_device_inbox",
)
)
self.assertEqual(10, len(res))
# wait for the task scheduler to do a second delete pass
self.reactor.advance(TaskScheduler.SCHEDULE_INTERVAL_MS / 1000)
# remaining messages should now be deleted
res = self.get_success(
self.store.db_pool.simple_select_list(
table="device_inbox",
keyvalues={"user_id": receiver},
retcols=("user_id", "device_id", "stream_id"),
desc="get_device_id_from_device_inbox",
)
)
self.assertEqual(0, len(res))
def test_update_device(self) -> None:
self._record_users()
+588 -10
View File
@@ -21,11 +21,12 @@ from signedjson.key import generate_signing_key
from twisted.test.proto_helpers import MemoryReactor
from synapse.api.constants import EventTypes, Membership, PresenceState
from synapse.api.presence import UserPresenceState
from synapse.api.presence import UserDevicePresenceState, UserPresenceState
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events.builder import EventBuilder
from synapse.federation.sender import FederationSender
from synapse.handlers.presence import (
BUSY_ONLINE_TIMEOUT,
EXTERNAL_PROCESS_EXPIRY,
FEDERATION_PING_INTERVAL,
FEDERATION_TIMEOUT,
@@ -352,6 +353,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
def test_idle_timer(self) -> None:
user_id = "@foo:bar"
device_id = "dev-1"
status_msg = "I'm here!"
now = 5000000
@@ -362,8 +364,21 @@ class PresenceTimeoutTestCase(unittest.TestCase):
last_user_sync_ts=now,
status_msg=status_msg,
)
device_state = UserDevicePresenceState(
user_id=user_id,
device_id=device_id,
state=state.state,
last_active_ts=state.last_active_ts,
last_sync_ts=state.last_user_sync_ts,
)
new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
new_state = handle_timeout(
state,
is_mine=True,
syncing_device_ids=set(),
user_devices={device_id: device_state},
now=now,
)
self.assertIsNotNone(new_state)
assert new_state is not None
@@ -376,6 +391,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
presence state into unavailable.
"""
user_id = "@foo:bar"
device_id = "dev-1"
status_msg = "I'm here!"
now = 5000000
@@ -386,8 +402,21 @@ class PresenceTimeoutTestCase(unittest.TestCase):
last_user_sync_ts=now,
status_msg=status_msg,
)
device_state = UserDevicePresenceState(
user_id=user_id,
device_id=device_id,
state=state.state,
last_active_ts=state.last_active_ts,
last_sync_ts=state.last_user_sync_ts,
)
new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
new_state = handle_timeout(
state,
is_mine=True,
syncing_device_ids=set(),
user_devices={device_id: device_state},
now=now,
)
self.assertIsNotNone(new_state)
assert new_state is not None
@@ -396,6 +425,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
def test_sync_timeout(self) -> None:
user_id = "@foo:bar"
device_id = "dev-1"
status_msg = "I'm here!"
now = 5000000
@@ -406,8 +436,21 @@ class PresenceTimeoutTestCase(unittest.TestCase):
last_user_sync_ts=now - SYNC_ONLINE_TIMEOUT - 1,
status_msg=status_msg,
)
device_state = UserDevicePresenceState(
user_id=user_id,
device_id=device_id,
state=state.state,
last_active_ts=state.last_active_ts,
last_sync_ts=state.last_user_sync_ts,
)
new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
new_state = handle_timeout(
state,
is_mine=True,
syncing_device_ids=set(),
user_devices={device_id: device_state},
now=now,
)
self.assertIsNotNone(new_state)
assert new_state is not None
@@ -416,6 +459,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
def test_sync_online(self) -> None:
user_id = "@foo:bar"
device_id = "dev-1"
status_msg = "I'm here!"
now = 5000000
@@ -426,9 +470,20 @@ class PresenceTimeoutTestCase(unittest.TestCase):
last_user_sync_ts=now - SYNC_ONLINE_TIMEOUT - 1,
status_msg=status_msg,
)
device_state = UserDevicePresenceState(
user_id=user_id,
device_id=device_id,
state=state.state,
last_active_ts=state.last_active_ts,
last_sync_ts=state.last_user_sync_ts,
)
new_state = handle_timeout(
state, is_mine=True, syncing_user_ids={user_id}, now=now
state,
is_mine=True,
syncing_device_ids={(user_id, device_id)},
user_devices={device_id: device_state},
now=now,
)
self.assertIsNotNone(new_state)
@@ -438,6 +493,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
def test_federation_ping(self) -> None:
user_id = "@foo:bar"
device_id = "dev-1"
status_msg = "I'm here!"
now = 5000000
@@ -449,14 +505,28 @@ class PresenceTimeoutTestCase(unittest.TestCase):
last_federation_update_ts=now - FEDERATION_PING_INTERVAL - 1,
status_msg=status_msg,
)
device_state = UserDevicePresenceState(
user_id=user_id,
device_id=device_id,
state=state.state,
last_active_ts=state.last_active_ts,
last_sync_ts=state.last_user_sync_ts,
)
new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
new_state = handle_timeout(
state,
is_mine=True,
syncing_device_ids=set(),
user_devices={device_id: device_state},
now=now,
)
self.assertIsNotNone(new_state)
self.assertEqual(state, new_state)
def test_no_timeout(self) -> None:
user_id = "@foo:bar"
device_id = "dev-1"
now = 5000000
state = UserPresenceState.default(user_id)
@@ -466,8 +536,21 @@ class PresenceTimeoutTestCase(unittest.TestCase):
last_user_sync_ts=now,
last_federation_update_ts=now,
)
device_state = UserDevicePresenceState(
user_id=user_id,
device_id=device_id,
state=state.state,
last_active_ts=state.last_active_ts,
last_sync_ts=state.last_user_sync_ts,
)
new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
new_state = handle_timeout(
state,
is_mine=True,
syncing_device_ids=set(),
user_devices={device_id: device_state},
now=now,
)
self.assertIsNone(new_state)
@@ -485,8 +568,9 @@ class PresenceTimeoutTestCase(unittest.TestCase):
status_msg=status_msg,
)
# Note that this is a remote user so we do not have their device information.
new_state = handle_timeout(
state, is_mine=False, syncing_user_ids=set(), now=now
state, is_mine=False, syncing_device_ids=set(), user_devices={}, now=now
)
self.assertIsNotNone(new_state)
@@ -496,6 +580,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
def test_last_active(self) -> None:
user_id = "@foo:bar"
device_id = "dev-1"
status_msg = "I'm here!"
now = 5000000
@@ -507,8 +592,21 @@ class PresenceTimeoutTestCase(unittest.TestCase):
last_federation_update_ts=now,
status_msg=status_msg,
)
device_state = UserDevicePresenceState(
user_id=user_id,
device_id=device_id,
state=state.state,
last_active_ts=state.last_active_ts,
last_sync_ts=state.last_user_sync_ts,
)
new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
new_state = handle_timeout(
state,
is_mine=True,
syncing_device_ids=set(),
user_devices={device_id: device_state},
now=now,
)
self.assertIsNotNone(new_state)
self.assertEqual(state, new_state)
@@ -579,7 +677,7 @@ class PresenceHandlerInitTestCase(unittest.HomeserverTestCase):
[
(PresenceState.BUSY, PresenceState.BUSY),
(PresenceState.ONLINE, PresenceState.ONLINE),
(PresenceState.UNAVAILABLE, PresenceState.UNAVAILABLE),
(PresenceState.UNAVAILABLE, PresenceState.ONLINE),
# Offline syncs don't update the state.
(PresenceState.OFFLINE, PresenceState.ONLINE),
]
@@ -800,6 +898,486 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase):
# we should now be online
self.assertEqual(state.state, PresenceState.ONLINE)
@parameterized.expand(
# A list of tuples of 4 strings:
#
# * The presence state of device 1.
# * The presence state of device 2.
# * The expected user presence state after both devices have synced.
# * The expected user presence state after device 1 has idled.
# * The expected user presence state after device 2 has idled.
# * True to use workers, False a monolith.
[
(*cases, workers)
for workers in (False, True)
for cases in [
# If both devices have the same state, online should eventually idle.
# Otherwise, the state doesn't change.
(
PresenceState.BUSY,
PresenceState.BUSY,
PresenceState.BUSY,
PresenceState.BUSY,
PresenceState.BUSY,
),
(
PresenceState.ONLINE,
PresenceState.ONLINE,
PresenceState.ONLINE,
PresenceState.ONLINE,
PresenceState.UNAVAILABLE,
),
(
PresenceState.UNAVAILABLE,
PresenceState.UNAVAILABLE,
PresenceState.UNAVAILABLE,
PresenceState.UNAVAILABLE,
PresenceState.UNAVAILABLE,
),
(
PresenceState.OFFLINE,
PresenceState.OFFLINE,
PresenceState.OFFLINE,
PresenceState.OFFLINE,
PresenceState.OFFLINE,
),
# If the second device has a "lower" state it should fallback to it,
# except for "busy" which overrides.
(
PresenceState.BUSY,
PresenceState.ONLINE,
PresenceState.BUSY,
PresenceState.BUSY,
PresenceState.BUSY,
),
(
PresenceState.BUSY,
PresenceState.UNAVAILABLE,
PresenceState.BUSY,
PresenceState.BUSY,
PresenceState.BUSY,
),
(
PresenceState.BUSY,
PresenceState.OFFLINE,
PresenceState.BUSY,
PresenceState.BUSY,
PresenceState.BUSY,
),
(
PresenceState.ONLINE,
PresenceState.UNAVAILABLE,
PresenceState.ONLINE,
PresenceState.UNAVAILABLE,
PresenceState.UNAVAILABLE,
),
(
PresenceState.ONLINE,
PresenceState.OFFLINE,
PresenceState.ONLINE,
PresenceState.UNAVAILABLE,
PresenceState.UNAVAILABLE,
),
(
PresenceState.UNAVAILABLE,
PresenceState.OFFLINE,
PresenceState.UNAVAILABLE,
PresenceState.UNAVAILABLE,
PresenceState.UNAVAILABLE,
),
# If the second device has a "higher" state it should override.
(
PresenceState.ONLINE,
PresenceState.BUSY,
PresenceState.BUSY,
PresenceState.BUSY,
PresenceState.BUSY,
),
(
PresenceState.UNAVAILABLE,
PresenceState.BUSY,
PresenceState.BUSY,
PresenceState.BUSY,
PresenceState.BUSY,
),
(
PresenceState.OFFLINE,
PresenceState.BUSY,
PresenceState.BUSY,
PresenceState.BUSY,
PresenceState.BUSY,
),
(
PresenceState.UNAVAILABLE,
PresenceState.ONLINE,
PresenceState.ONLINE,
PresenceState.ONLINE,
PresenceState.UNAVAILABLE,
),
(
PresenceState.OFFLINE,
PresenceState.ONLINE,
PresenceState.ONLINE,
PresenceState.ONLINE,
PresenceState.UNAVAILABLE,
),
(
PresenceState.OFFLINE,
PresenceState.UNAVAILABLE,
PresenceState.UNAVAILABLE,
PresenceState.UNAVAILABLE,
PresenceState.UNAVAILABLE,
),
]
],
name_func=lambda testcase_func, param_num, params: f"{testcase_func.__name__}_{param_num}_{'workers' if params.args[5] else 'monolith'}",
)
@unittest.override_config({"experimental_features": {"msc3026_enabled": True}})
def test_set_presence_from_syncing_multi_device(
self,
dev_1_state: str,
dev_2_state: str,
expected_state_1: str,
expected_state_2: str,
expected_state_3: str,
test_with_workers: bool,
) -> None:
"""
Test the behaviour of multiple devices syncing at the same time.
Roughly the user's presence state should be set to the "highest" priority
of all the devices. When a device then goes offline its state should be
discarded and the next highest should win.
Note that these tests use the idle timer (and don't close the syncs), it
is unlikely that a *single* sync would last this long, but is close enough
to continually syncing with that current state.
"""
user_id = f"@test:{self.hs.config.server.server_name}"
# By default, we call /sync against the main process.
worker_presence_handler = self.presence_handler
if test_with_workers:
# Create a worker and use it to handle /sync traffic instead.
# This is used to test that presence changes get replicated from workers
# to the main process correctly.
worker_to_sync_against = self.make_worker_hs(
"synapse.app.generic_worker", {"worker_name": "synchrotron"}
)
worker_presence_handler = worker_to_sync_against.get_presence_handler()
# 1. Sync with the first device.
self.get_success(
worker_presence_handler.user_syncing(
user_id,
"dev-1",
affect_presence=dev_1_state != PresenceState.OFFLINE,
presence_state=dev_1_state,
),
by=0.01,
)
# 2. Wait half the idle timer.
self.reactor.advance(IDLE_TIMER / 1000 / 2)
self.reactor.pump([0.1])
# 3. Sync with the second device.
self.get_success(
worker_presence_handler.user_syncing(
user_id,
"dev-2",
affect_presence=dev_2_state != PresenceState.OFFLINE,
presence_state=dev_2_state,
),
by=0.01,
)
# 4. Assert the expected presence state.
state = self.get_success(
self.presence_handler.get_state(UserID.from_string(user_id))
)
self.assertEqual(state.state, expected_state_1)
if test_with_workers:
state = self.get_success(
worker_presence_handler.get_state(UserID.from_string(user_id))
)
self.assertEqual(state.state, expected_state_1)
# When testing with workers, make another random sync (with any *different*
# user) to keep the process information from expiring.
#
# This is due to EXTERNAL_PROCESS_EXPIRY being equivalent to IDLE_TIMER.
if test_with_workers:
with self.get_success(
worker_presence_handler.user_syncing(
f"@other-user:{self.hs.config.server.server_name}",
"dev-3",
affect_presence=True,
presence_state=PresenceState.ONLINE,
),
by=0.01,
):
pass
# 5. Advance such that the first device should be discarded (the idle timer),
# then pump so _handle_timeouts function to called.
self.reactor.advance(IDLE_TIMER / 1000 / 2)
self.reactor.pump([0.01])
# 6. Assert the expected presence state.
state = self.get_success(
self.presence_handler.get_state(UserID.from_string(user_id))
)
self.assertEqual(state.state, expected_state_2)
if test_with_workers:
state = self.get_success(
worker_presence_handler.get_state(UserID.from_string(user_id))
)
self.assertEqual(state.state, expected_state_2)
# 7. Advance such that the second device should be discarded (half the idle timer),
# then pump so _handle_timeouts function to called.
self.reactor.advance(IDLE_TIMER / 1000 / 2)
self.reactor.pump([0.1])
# 8. The devices are still "syncing" (the sync context managers were never
# closed), so might idle.
state = self.get_success(
self.presence_handler.get_state(UserID.from_string(user_id))
)
self.assertEqual(state.state, expected_state_3)
if test_with_workers:
state = self.get_success(
worker_presence_handler.get_state(UserID.from_string(user_id))
)
self.assertEqual(state.state, expected_state_3)
@parameterized.expand(
# A list of tuples of 4 strings:
#
# * The presence state of device 1.
# * The presence state of device 2.
# * The expected user presence state after both devices have synced.
# * The expected user presence state after device 1 has stopped syncing.
# * True to use workers, False a monolith.
[
(*cases, workers)
for workers in (False, True)
for cases in [
# If both devices have the same state, nothing exciting should happen.
(
PresenceState.BUSY,
PresenceState.BUSY,
PresenceState.BUSY,
PresenceState.BUSY,
),
(
PresenceState.ONLINE,
PresenceState.ONLINE,
PresenceState.ONLINE,
PresenceState.ONLINE,
),
(
PresenceState.UNAVAILABLE,
PresenceState.UNAVAILABLE,
PresenceState.UNAVAILABLE,
PresenceState.UNAVAILABLE,
),
(
PresenceState.OFFLINE,
PresenceState.OFFLINE,
PresenceState.OFFLINE,
PresenceState.OFFLINE,
),
# If the second device has a "lower" state it should fallback to it,
# except for "busy" which overrides.
(
PresenceState.BUSY,
PresenceState.ONLINE,
PresenceState.BUSY,
PresenceState.BUSY,
),
(
PresenceState.BUSY,
PresenceState.UNAVAILABLE,
PresenceState.BUSY,
PresenceState.BUSY,
),
(
PresenceState.BUSY,
PresenceState.OFFLINE,
PresenceState.BUSY,
PresenceState.BUSY,
),
(
PresenceState.ONLINE,
PresenceState.UNAVAILABLE,
PresenceState.ONLINE,
PresenceState.UNAVAILABLE,
),
(
PresenceState.ONLINE,
PresenceState.OFFLINE,
PresenceState.ONLINE,
PresenceState.OFFLINE,
),
(
PresenceState.UNAVAILABLE,
PresenceState.OFFLINE,
PresenceState.UNAVAILABLE,
PresenceState.OFFLINE,
),
# If the second device has a "higher" state it should override.
(
PresenceState.ONLINE,
PresenceState.BUSY,
PresenceState.BUSY,
PresenceState.BUSY,
),
(
PresenceState.UNAVAILABLE,
PresenceState.BUSY,
PresenceState.BUSY,
PresenceState.BUSY,
),
(
PresenceState.OFFLINE,
PresenceState.BUSY,
PresenceState.BUSY,
PresenceState.BUSY,
),
(
PresenceState.UNAVAILABLE,
PresenceState.ONLINE,
PresenceState.ONLINE,
PresenceState.ONLINE,
),
(
PresenceState.OFFLINE,
PresenceState.ONLINE,
PresenceState.ONLINE,
PresenceState.ONLINE,
),
(
PresenceState.OFFLINE,
PresenceState.UNAVAILABLE,
PresenceState.UNAVAILABLE,
PresenceState.UNAVAILABLE,
),
]
],
name_func=lambda testcase_func, param_num, params: f"{testcase_func.__name__}_{param_num}_{'workers' if params.args[4] else 'monolith'}",
)
@unittest.override_config({"experimental_features": {"msc3026_enabled": True}})
def test_set_presence_from_non_syncing_multi_device(
self,
dev_1_state: str,
dev_2_state: str,
expected_state_1: str,
expected_state_2: str,
test_with_workers: bool,
) -> None:
"""
Test the behaviour of multiple devices syncing at the same time.
Roughly the user's presence state should be set to the "highest" priority
of all the devices. When a device then goes offline its state should be
discarded and the next highest should win.
Note that these tests use the idle timer (and don't close the syncs), it
is unlikely that a *single* sync would last this long, but is close enough
to continually syncing with that current state.
"""
user_id = f"@test:{self.hs.config.server.server_name}"
# By default, we call /sync against the main process.
worker_presence_handler = self.presence_handler
if test_with_workers:
# Create a worker and use it to handle /sync traffic instead.
# This is used to test that presence changes get replicated from workers
# to the main process correctly.
worker_to_sync_against = self.make_worker_hs(
"synapse.app.generic_worker", {"worker_name": "synchrotron"}
)
worker_presence_handler = worker_to_sync_against.get_presence_handler()
# 1. Sync with the first device.
sync_1 = self.get_success(
worker_presence_handler.user_syncing(
user_id,
"dev-1",
affect_presence=dev_1_state != PresenceState.OFFLINE,
presence_state=dev_1_state,
),
by=0.1,
)
# 2. Sync with the second device.
sync_2 = self.get_success(
worker_presence_handler.user_syncing(
user_id,
"dev-2",
affect_presence=dev_2_state != PresenceState.OFFLINE,
presence_state=dev_2_state,
),
by=0.1,
)
# 3. Assert the expected presence state.
state = self.get_success(
self.presence_handler.get_state(UserID.from_string(user_id))
)
self.assertEqual(state.state, expected_state_1)
if test_with_workers:
state = self.get_success(
worker_presence_handler.get_state(UserID.from_string(user_id))
)
self.assertEqual(state.state, expected_state_1)
# 4. Disconnect the first device.
with sync_1:
pass
# 5. Advance such that the first device should be discarded (the sync timeout),
# then pump so _handle_timeouts function to called.
self.reactor.advance(SYNC_ONLINE_TIMEOUT / 1000)
self.reactor.pump([5])
# 6. Assert the expected presence state.
state = self.get_success(
self.presence_handler.get_state(UserID.from_string(user_id))
)
self.assertEqual(state.state, expected_state_2)
if test_with_workers:
state = self.get_success(
worker_presence_handler.get_state(UserID.from_string(user_id))
)
self.assertEqual(state.state, expected_state_2)
# 7. Disconnect the second device.
with sync_2:
pass
# 8. Advance such that the second device should be discarded (the sync timeout),
# then pump so _handle_timeouts function to called.
if dev_1_state == PresenceState.BUSY or dev_2_state == PresenceState.BUSY:
timeout = BUSY_ONLINE_TIMEOUT
else:
timeout = SYNC_ONLINE_TIMEOUT
self.reactor.advance(timeout / 1000)
self.reactor.pump([5])
# 9. There are no more devices, should be offline.
state = self.get_success(
self.presence_handler.get_state(UserID.from_string(user_id))
)
self.assertEqual(state.state, PresenceState.OFFLINE)
if test_with_workers:
state = self.get_success(
worker_presence_handler.get_state(UserID.from_string(user_id))
)
self.assertEqual(state.state, PresenceState.OFFLINE)
def test_set_presence_from_syncing_keeps_status(self) -> None:
"""Test that presence set by syncing retains status message"""
status_msg = "I'm here!"
@@ -15,7 +15,7 @@ import base64
import logging
import os
from typing import Generator, List, Optional, cast
from unittest.mock import AsyncMock, patch
from unittest.mock import AsyncMock, call, patch
import treq
from netaddr import IPSet
@@ -651,9 +651,9 @@ class MatrixFederationAgentTests(unittest.TestCase):
# .well-known request fails.
self.reactor.pump((0.4,))
# now there should be a SRV lookup
self.mock_resolver.resolve_service.assert_called_once_with(
b"_matrix._tcp.testserv1"
# now there should be two SRV lookups
self.mock_resolver.resolve_service.assert_has_calls(
[call(b"_matrix-fed._tcp.testserv1"), call(b"_matrix._tcp.testserv1")]
)
# we should fall back to a direct connection
@@ -737,9 +737,9 @@ class MatrixFederationAgentTests(unittest.TestCase):
# .well-known request fails.
self.reactor.pump((0.4,))
# now there should be a SRV lookup
self.mock_resolver.resolve_service.assert_called_once_with(
b"_matrix._tcp.testserv"
# now there should be two SRV lookups
self.mock_resolver.resolve_service.assert_has_calls(
[call(b"_matrix-fed._tcp.testserv"), call(b"_matrix._tcp.testserv")]
)
# we should fall back to a direct connection
@@ -788,9 +788,12 @@ class MatrixFederationAgentTests(unittest.TestCase):
content=b'{ "m.server": "target-server" }',
)
# there should be a SRV lookup
self.mock_resolver.resolve_service.assert_called_once_with(
b"_matrix._tcp.target-server"
# there should be two SRV lookups
self.mock_resolver.resolve_service.assert_has_calls(
[
call(b"_matrix-fed._tcp.target-server"),
call(b"_matrix._tcp.target-server"),
]
)
# now we should get a connection to the target server
@@ -878,9 +881,12 @@ class MatrixFederationAgentTests(unittest.TestCase):
self.reactor.pump((0.1,))
# there should be a SRV lookup
self.mock_resolver.resolve_service.assert_called_once_with(
b"_matrix._tcp.target-server"
# there should be two SRV lookups
self.mock_resolver.resolve_service.assert_has_calls(
[
call(b"_matrix-fed._tcp.target-server"),
call(b"_matrix._tcp.target-server"),
]
)
# now we should get a connection to the target server
@@ -942,9 +948,9 @@ class MatrixFederationAgentTests(unittest.TestCase):
client_factory, expected_sni=b"testserv", content=b"NOT JSON"
)
# now there should be a SRV lookup
self.mock_resolver.resolve_service.assert_called_once_with(
b"_matrix._tcp.testserv"
# now there should be two SRV lookups
self.mock_resolver.resolve_service.assert_has_calls(
[call(b"_matrix-fed._tcp.testserv"), call(b"_matrix._tcp.testserv")]
)
# we should fall back to a direct connection
@@ -1016,14 +1022,14 @@ class MatrixFederationAgentTests(unittest.TestCase):
# there should be no requests
self.assertEqual(len(http_proto.requests), 0)
# and there should be a SRV lookup instead
self.mock_resolver.resolve_service.assert_called_once_with(
b"_matrix._tcp.testserv"
# and there should be two SRV lookups instead
self.mock_resolver.resolve_service.assert_has_calls(
[call(b"_matrix-fed._tcp.testserv"), call(b"_matrix._tcp.testserv")]
)
def test_get_hostname_srv(self) -> None:
"""
Test the behaviour when there is a single SRV record
Test the behaviour when there is a single SRV record for _matrix-fed.
"""
self.agent = self._make_agent()
@@ -1039,7 +1045,51 @@ class MatrixFederationAgentTests(unittest.TestCase):
# the request for a .well-known will have failed with a DNS lookup error.
self.mock_resolver.resolve_service.assert_called_once_with(
b"_matrix._tcp.testserv"
b"_matrix-fed._tcp.testserv"
)
# Make sure treq is trying to connect
clients = self.reactor.tcpClients
self.assertEqual(len(clients), 1)
(host, port, client_factory, _timeout, _bindAddress) = clients[0]
self.assertEqual(host, "1.2.3.4")
self.assertEqual(port, 8443)
# make a test server, and wire up the client
http_server = self._make_connection(client_factory, expected_sni=b"testserv")
self.assertEqual(len(http_server.requests), 1)
request = http_server.requests[0]
self.assertEqual(request.method, b"GET")
self.assertEqual(request.path, b"/foo/bar")
self.assertEqual(request.requestHeaders.getRawHeaders(b"host"), [b"testserv"])
# finish the request
request.finish()
self.reactor.pump((0.1,))
self.successResultOf(test_d)
def test_get_hostname_srv_legacy(self) -> None:
"""
Test the behaviour when there is a single SRV record for _matrix.
"""
self.agent = self._make_agent()
# Return no entries for the _matrix-fed lookup, and a response for _matrix.
self.mock_resolver.resolve_service.side_effect = [
[],
[Server(host=b"srvtarget", port=8443)],
]
self.reactor.lookups["srvtarget"] = "1.2.3.4"
test_d = self._make_get_request(b"matrix-federation://testserv/foo/bar")
# Nothing happened yet
self.assertNoResult(test_d)
# the request for a .well-known will have failed with a DNS lookup error.
self.mock_resolver.resolve_service.assert_has_calls(
[call(b"_matrix-fed._tcp.testserv"), call(b"_matrix._tcp.testserv")]
)
# Make sure treq is trying to connect
@@ -1065,7 +1115,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
def test_get_well_known_srv(self) -> None:
"""Test the behaviour when the .well-known redirects to a place where there
is a SRV.
is a _matrix-fed SRV record.
"""
self.agent = self._make_agent()
@@ -1096,7 +1146,72 @@ class MatrixFederationAgentTests(unittest.TestCase):
# there should be a SRV lookup
self.mock_resolver.resolve_service.assert_called_once_with(
b"_matrix._tcp.target-server"
b"_matrix-fed._tcp.target-server"
)
# now we should get a connection to the target of the SRV record
self.assertEqual(len(clients), 2)
(host, port, client_factory, _timeout, _bindAddress) = clients[1]
self.assertEqual(host, "5.6.7.8")
self.assertEqual(port, 8443)
# make a test server, and wire up the client
http_server = self._make_connection(
client_factory, expected_sni=b"target-server"
)
self.assertEqual(len(http_server.requests), 1)
request = http_server.requests[0]
self.assertEqual(request.method, b"GET")
self.assertEqual(request.path, b"/foo/bar")
self.assertEqual(
request.requestHeaders.getRawHeaders(b"host"), [b"target-server"]
)
# finish the request
request.finish()
self.reactor.pump((0.1,))
self.successResultOf(test_d)
def test_get_well_known_srv_legacy(self) -> None:
"""Test the behaviour when the .well-known redirects to a place where there
is a _matrix SRV record.
"""
self.agent = self._make_agent()
self.reactor.lookups["testserv"] = "1.2.3.4"
self.reactor.lookups["srvtarget"] = "5.6.7.8"
test_d = self._make_get_request(b"matrix-federation://testserv/foo/bar")
# Nothing happened yet
self.assertNoResult(test_d)
# there should be an attempt to connect on port 443 for the .well-known
clients = self.reactor.tcpClients
self.assertEqual(len(clients), 1)
(host, port, client_factory, _timeout, _bindAddress) = clients[0]
self.assertEqual(host, "1.2.3.4")
self.assertEqual(port, 443)
# Return no entries for the _matrix-fed lookup, and a response for _matrix.
self.mock_resolver.resolve_service.side_effect = [
[],
[Server(host=b"srvtarget", port=8443)],
]
self._handle_well_known_connection(
client_factory,
expected_sni=b"testserv",
content=b'{ "m.server": "target-server" }',
)
# there should be two SRV lookups
self.mock_resolver.resolve_service.assert_has_calls(
[
call(b"_matrix-fed._tcp.target-server"),
call(b"_matrix._tcp.target-server"),
]
)
# now we should get a connection to the target of the SRV record
@@ -1158,8 +1273,11 @@ class MatrixFederationAgentTests(unittest.TestCase):
self.reactor.pump((0.4,))
# now there should have been a SRV lookup
self.mock_resolver.resolve_service.assert_called_once_with(
b"_matrix._tcp.xn--bcher-kva.com"
self.mock_resolver.resolve_service.assert_has_calls(
[
call(b"_matrix-fed._tcp.xn--bcher-kva.com"),
call(b"_matrix._tcp.xn--bcher-kva.com"),
]
)
# We should fall back to port 8448
@@ -1188,7 +1306,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
self.successResultOf(test_d)
def test_idna_srv_target(self) -> None:
"""test the behaviour when the target of a SRV record has idna chars"""
"""test the behaviour when the target of a _matrix-fed SRV record has idna chars"""
self.agent = self._make_agent()
self.mock_resolver.resolve_service.return_value = [
@@ -1204,7 +1322,57 @@ class MatrixFederationAgentTests(unittest.TestCase):
self.assertNoResult(test_d)
self.mock_resolver.resolve_service.assert_called_once_with(
b"_matrix._tcp.xn--bcher-kva.com"
b"_matrix-fed._tcp.xn--bcher-kva.com"
)
# Make sure treq is trying to connect
clients = self.reactor.tcpClients
self.assertEqual(len(clients), 1)
(host, port, client_factory, _timeout, _bindAddress) = clients[0]
self.assertEqual(host, "1.2.3.4")
self.assertEqual(port, 8443)
# make a test server, and wire up the client
http_server = self._make_connection(
client_factory, expected_sni=b"xn--bcher-kva.com"
)
self.assertEqual(len(http_server.requests), 1)
request = http_server.requests[0]
self.assertEqual(request.method, b"GET")
self.assertEqual(request.path, b"/foo/bar")
self.assertEqual(
request.requestHeaders.getRawHeaders(b"host"), [b"xn--bcher-kva.com"]
)
# finish the request
request.finish()
self.reactor.pump((0.1,))
self.successResultOf(test_d)
def test_idna_srv_target_legacy(self) -> None:
"""test the behaviour when the target of a _matrix SRV record has idna chars"""
self.agent = self._make_agent()
# Return no entries for the _matrix-fed lookup, and a response for _matrix.
self.mock_resolver.resolve_service.side_effect = [
[],
[Server(host=b"xn--trget-3qa.com", port=8443)],
] # târget.com
self.reactor.lookups["xn--trget-3qa.com"] = "1.2.3.4"
test_d = self._make_get_request(
b"matrix-federation://xn--bcher-kva.com/foo/bar"
)
# Nothing happened yet
self.assertNoResult(test_d)
self.mock_resolver.resolve_service.assert_has_calls(
[
call(b"_matrix-fed._tcp.xn--bcher-kva.com"),
call(b"_matrix._tcp.xn--bcher-kva.com"),
]
)
# Make sure treq is trying to connect
@@ -1394,7 +1562,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
self.assertIsNone(r.delegated_server)
def test_srv_fallbacks(self) -> None:
"""Test that other SRV results are tried if the first one fails."""
"""Test that other SRV results are tried if the first one fails for _matrix-fed SRV."""
self.agent = self._make_agent()
self.mock_resolver.resolve_service.return_value = [
@@ -1409,7 +1577,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
self.assertNoResult(test_d)
self.mock_resolver.resolve_service.assert_called_once_with(
b"_matrix._tcp.testserv"
b"_matrix-fed._tcp.testserv"
)
# We should see an attempt to connect to the first server
@@ -1449,6 +1617,103 @@ class MatrixFederationAgentTests(unittest.TestCase):
self.reactor.pump((0.1,))
self.successResultOf(test_d)
def test_srv_fallbacks_legacy(self) -> None:
"""Test that other SRV results are tried if the first one fails for _matrix SRV."""
self.agent = self._make_agent()
# Return no entries for the _matrix-fed lookup, and a response for _matrix.
self.mock_resolver.resolve_service.side_effect = [
[],
[
Server(host=b"target.com", port=8443),
Server(host=b"target.com", port=8444),
],
]
self.reactor.lookups["target.com"] = "1.2.3.4"
test_d = self._make_get_request(b"matrix-federation://testserv/foo/bar")
# Nothing happened yet
self.assertNoResult(test_d)
self.mock_resolver.resolve_service.assert_has_calls(
[call(b"_matrix-fed._tcp.testserv"), call(b"_matrix._tcp.testserv")]
)
# We should see an attempt to connect to the first server
clients = self.reactor.tcpClients
self.assertEqual(len(clients), 1)
(host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
self.assertEqual(host, "1.2.3.4")
self.assertEqual(port, 8443)
# Fonx the connection
client_factory.clientConnectionFailed(None, Exception("nope"))
# There's a 300ms delay in HostnameEndpoint
self.reactor.pump((0.4,))
# Hasn't failed yet
self.assertNoResult(test_d)
# We shouldnow see an attempt to connect to the second server
clients = self.reactor.tcpClients
self.assertEqual(len(clients), 1)
(host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
self.assertEqual(host, "1.2.3.4")
self.assertEqual(port, 8444)
# make a test server, and wire up the client
http_server = self._make_connection(client_factory, expected_sni=b"testserv")
self.assertEqual(len(http_server.requests), 1)
request = http_server.requests[0]
self.assertEqual(request.method, b"GET")
self.assertEqual(request.path, b"/foo/bar")
self.assertEqual(request.requestHeaders.getRawHeaders(b"host"), [b"testserv"])
# finish the request
request.finish()
self.reactor.pump((0.1,))
self.successResultOf(test_d)
def test_srv_no_fallback_to_legacy(self) -> None:
"""Test that _matrix SRV results are not tried if the _matrix-fed one fails."""
self.agent = self._make_agent()
# Return a failing entry for _matrix-fed.
self.mock_resolver.resolve_service.side_effect = [
[Server(host=b"target.com", port=8443)],
[],
]
self.reactor.lookups["target.com"] = "1.2.3.4"
test_d = self._make_get_request(b"matrix-federation://testserv/foo/bar")
# Nothing happened yet
self.assertNoResult(test_d)
# Only the _matrix-fed is checked, _matrix is ignored.
self.mock_resolver.resolve_service.assert_called_once_with(
b"_matrix-fed._tcp.testserv"
)
# We should see an attempt to connect to the first server
clients = self.reactor.tcpClients
self.assertEqual(len(clients), 1)
(host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
self.assertEqual(host, "1.2.3.4")
self.assertEqual(port, 8443)
# Fonx the connection
client_factory.clientConnectionFailed(None, Exception("nope"))
# There's a 300ms delay in HostnameEndpoint
self.reactor.pump((0.4,))
# Failed to resolve a server.
self.assertFailure(test_d, Exception)
class TestCachePeriodFromHeaders(unittest.TestCase):
def test_cache_control(self) -> None: