1
0

Merge remote-tracking branch 'origin/release-v1.83' into matrix-org-hotfixes

This commit is contained in:
Sean Quah
2023-05-02 16:33:00 +01:00
107 changed files with 2198 additions and 503 deletions
+5 -1
View File
@@ -15,9 +15,10 @@ _trial_temp*/
.DS_Store
__pycache__/
# We do want the poetry and cargo lockfile.
# We do want poetry, cargo and flake lockfiles.
!poetry.lock
!Cargo.lock
!flake.lock
# stuff that is likely to exist when you run a server locally
/*.db
@@ -38,6 +39,9 @@ __pycache__/
/.envrc
.direnv/
# For nix/devenv users
.devenv/
# IDEs
/.idea/
/.ropeproject/
+54
View File
@@ -1,3 +1,57 @@
Synapse 1.83.0rc1 (2023-05-02)
==============================
Features
--------
- Experimental support to recursively provide relations per [MSC3981](https://github.com/matrix-org/matrix-spec-proposals/pull/3981). ([\#15315](https://github.com/matrix-org/synapse/issues/15315))
- Experimental support for [MSC3970](https://github.com/matrix-org/matrix-spec-proposals/pull/3970): Scope transaction IDs to devices. ([\#15318](https://github.com/matrix-org/synapse/issues/15318))
- Add an [admin API endpoint](https://matrix-org.github.io/synapse/v1.83/admin_api/experimental_features.html) to support per-user feature flags. ([\#15344](https://github.com/matrix-org/synapse/issues/15344))
- Add a module API to send an HTTP push notification. ([\#15387](https://github.com/matrix-org/synapse/issues/15387))
- Add an [admin API endpoint](https://matrix-org.github.io/synapse/v1.83/admin_api/statistics.html#get-largest-rooms-by-size-in-database) to query the largest rooms by disk space used in the database. ([\#15482](https://github.com/matrix-org/synapse/issues/15482))
Bugfixes
--------
- Disable push rule evaluation for rooms excluded from sync. ([\#15361](https://github.com/matrix-org/synapse/issues/15361))
- Fix a long-standing bug where cached server key results which were directly fetched would not be properly re-used. ([\#15417](https://github.com/matrix-org/synapse/issues/15417))
- Fix a bug introduced in Synapse 1.73.0 where some experimental push rules were returned by default. ([\#15494](https://github.com/matrix-org/synapse/issues/15494))
Improved Documentation
----------------------
- Add Nginx loadbalancing example with sticky mxid for workers. ([\#15411](https://github.com/matrix-org/synapse/issues/15411))
- Update outdated development docs that mention restrictions in versions of SQLite that we no longer support. ([\#15498](https://github.com/matrix-org/synapse/issues/15498))
Internal Changes
----------------
- Speedup tests by caching HomeServerConfig instances. ([\#15284](https://github.com/matrix-org/synapse/issues/15284))
- Add denormalised event stream ordering column to membership state tables for future use. Contributed by Nick @ Beeper (@fizzadar). ([\#15356](https://github.com/matrix-org/synapse/issues/15356))
- Always use multi-user device resync replication endpoints. ([\#15418](https://github.com/matrix-org/synapse/issues/15418))
- Add column `full_user_id` to tables `profiles` and `user_filters`. ([\#15458](https://github.com/matrix-org/synapse/issues/15458))
- Update support for [MSC3983](https://github.com/matrix-org/matrix-spec-proposals/pull/3983) to allow always returning fallback-keys in a `/keys/claim` request. ([\#15462](https://github.com/matrix-org/synapse/issues/15462))
- Improve type hints. ([\#15465](https://github.com/matrix-org/synapse/issues/15465), [\#15496](https://github.com/matrix-org/synapse/issues/15496), [\#15497](https://github.com/matrix-org/synapse/issues/15497))
- Support claiming more than one OTK at a time. ([\#15468](https://github.com/matrix-org/synapse/issues/15468))
- Bump types-pyyaml from 6.0.12.8 to 6.0.12.9. ([\#15471](https://github.com/matrix-org/synapse/issues/15471))
- Bump pyasn1-modules from 0.2.8 to 0.3.0. ([\#15473](https://github.com/matrix-org/synapse/issues/15473))
- Bump cryptography from 40.0.1 to 40.0.2. ([\#15474](https://github.com/matrix-org/synapse/issues/15474))
- Bump types-netaddr from 0.8.0.7 to 0.8.0.8. ([\#15475](https://github.com/matrix-org/synapse/issues/15475))
- Bump types-jsonschema from 4.17.0.6 to 4.17.0.7. ([\#15476](https://github.com/matrix-org/synapse/issues/15476))
- Ask bug reporters to provide logs as text. ([\#15479](https://github.com/matrix-org/synapse/issues/15479))
- Add a Nix flake for use as a development environment. ([\#15495](https://github.com/matrix-org/synapse/issues/15495))
- Bump anyhow from 1.0.70 to 1.0.71. ([\#15507](https://github.com/matrix-org/synapse/issues/15507))
- Bump types-pillow from 9.4.0.19 to 9.5.0.2. ([\#15508](https://github.com/matrix-org/synapse/issues/15508))
- Bump packaging from 23.0 to 23.1. ([\#15510](https://github.com/matrix-org/synapse/issues/15510))
- Bump types-requests from 2.28.11.16 to 2.29.0.0. ([\#15511](https://github.com/matrix-org/synapse/issues/15511))
- Bump setuptools-rust from 1.5.2 to 1.6.0. ([\#15512](https://github.com/matrix-org/synapse/issues/15512))
- Reduce the size of the HTTP connection pool for non-pushers. ([\#15514](https://github.com/matrix-org/synapse/issues/15514))
- Update the check_schema_delta script to account for when the schema version has been bumped locally. ([\#15466](https://github.com/matrix-org/synapse/issues/15466))
Synapse 1.82.0 (2023-04-25)
===========================
Generated
+2 -2
View File
@@ -13,9 +13,9 @@ dependencies = [
[[package]]
name = "anyhow"
version = "1.0.70"
version = "1.0.71"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7de8ce5e0f9f8d88245311066a578d72b7af3e7088f32783804676302df237e4"
checksum = "9c7d0618f0e0b7e8ff11427422b64564d5fb0be1940354bfe2e0529b18a9d9b8"
[[package]]
name = "arc-swap"
-1
View File
@@ -1 +0,0 @@
Speedup tests by caching HomeServerConfig instances.
-1
View File
@@ -1 +0,0 @@
Experimental support for MSC3970: Scope transaction IDs to devices.
-1
View File
@@ -1 +0,0 @@
Add denormalised event stream ordering column to membership state tables for future use. Contributed by Nick @ Beeper (@fizzadar).
-1
View File
@@ -1 +0,0 @@
Fix a long-standing bug where cached key results which were directly fetched would not be properly re-used.
-1
View File
@@ -1 +0,0 @@
Always use multi-user device resync replication endpoints.
-1
View File
@@ -1 +0,0 @@
Update support for [MSC3983](https://github.com/matrix-org/matrix-spec-proposals/pull/3983) to allow always returning fallback-keys in a `/keys/claim` request.
-1
View File
@@ -1 +0,0 @@
Improve type hints.
-1
View File
@@ -1 +0,0 @@
Update the check_schema_delta script to account for when the schema version has been bumped locally.
-1
View File
@@ -1 +0,0 @@
Bump types-pyyaml from 6.0.12.8 to 6.0.12.9.
-1
View File
@@ -1 +0,0 @@
Bump pyasn1-modules from 0.2.8 to 0.3.0.
-1
View File
@@ -1 +0,0 @@
Bump cryptography from 40.0.1 to 40.0.2.
-1
View File
@@ -1 +0,0 @@
Bump types-netaddr from 0.8.0.7 to 0.8.0.8.
-1
View File
@@ -1 +0,0 @@
Bump types-jsonschema from 4.17.0.6 to 4.17.0.7.
-1
View File
@@ -1 +0,0 @@
Ask bug reporters to provide logs as text.
+6
View File
@@ -1,3 +1,9 @@
matrix-synapse-py3 (1.83.0~rc1) stable; urgency=medium
* New Synapse release 1.83.0rc1.
-- Synapse Packaging team <packages@matrix.org> Tue, 02 May 2023 15:56:38 +0100
matrix-synapse-py3 (1.82.0) stable; urgency=medium
* New Synapse release 1.82.0.
+1
View File
@@ -57,6 +57,7 @@
- [Account Validity](admin_api/account_validity.md)
- [Background Updates](usage/administration/admin_api/background_updates.md)
- [Event Reports](admin_api/event_reports.md)
- [Experimental Features](admin_api/experimental_features.md)
- [Media](admin_api/media_admin_api.md)
- [Purge History](admin_api/purge_history_api.md)
- [Register Users](admin_api/register_api.md)
+54
View File
@@ -0,0 +1,54 @@
# Experimental Features API
This API allows a server administrator to enable or disable some experimental features on a per-user
basis. Currently supported features are [msc3026](https://github.com/matrix-org/matrix-spec-proposals/pull/3026): busy
presence state enabled, [msc2654](https://github.com/matrix-org/matrix-spec-proposals/pull/2654): enable unread counts,
[msc3881](https://github.com/matrix-org/matrix-spec-proposals/pull/3881): enable remotely toggling push notifications
for another client, and [msc3967](https://github.com/matrix-org/matrix-spec-proposals/pull/3967): do not require
UIA when first uploading cross-signing keys.
To use it, you will need to authenticate by providing an `access_token`
for a server admin: see [Admin API](../usage/administration/admin_api/).
## Enabling/Disabling Features
This API allows a server administrator to enable experimental features for a given user. The request must
provide a body containing the user id and listing the features to enable/disable in the following format:
```json
{
"features": {
"msc3026":true,
"msc2654":true
}
}
```
where true is used to enable the feature, and false is used to disable the feature.
The API is:
```
PUT /_synapse/admin/v1/experimental_features/<user_id>
```
## Listing Enabled Features
To list which features are enabled/disabled for a given user send a request to the following API:
```
GET /_synapse/admin/v1/experimental_features/<user_id>
```
It will return a list of possible features and indicate whether they are enabled or disabled for the
user like so:
```json
{
"features": {
"msc3026": true,
"msc2654": true,
"msc3881": false,
"msc3967": false
}
}
```
+49
View File
@@ -81,3 +81,52 @@ The following fields are returned in the JSON response body:
- `user_id` - string - Fully-qualified user ID (ex. `@user:server.com`).
* `next_token` - integer - Opaque value used for pagination. See above.
* `total` - integer - Total number of users after filtering.
# Get largest rooms by size in database
Returns the 10 largest rooms and an estimate of how much space in the database
they are taking.
This does not include the size of any associated media associated with the room.
Returns an error on SQLite.
*Note:* This uses the planner statistics from PostgreSQL to do the estimates,
which means that the returned information can vary widely from reality. However,
it should be enough to get a rough idea of where database disk space is going.
The API is:
```
GET /_synapse/admin/v1/statistics/statistics/database/rooms
```
A response body like the following is returned:
```json
{
"rooms": [
{
"room_id": "!OGEhHVWSdvArJzumhm:matrix.org",
"estimated_size": 47325417353
}
],
}
```
**Response**
The following fields are returned in the JSON response body:
* `rooms` - An array of objects, sorted by largest room first. Objects contain
the following fields:
- `room_id` - string - The room ID.
- `estimated_size` - integer - Estimated disk space used in bytes by the room
in the database.
*Added in Synapse 1.83.0*
+1 -33
View File
@@ -155,43 +155,11 @@ def run_upgrade(
Boolean columns require special treatment, since SQLite treats booleans the
same as integers.
There are three separate aspects to this:
* Any new boolean column must be added to the `BOOLEAN_COLUMNS` list in
Any new boolean column must be added to the `BOOLEAN_COLUMNS` list in
`synapse/_scripts/synapse_port_db.py`. This tells the port script to cast
the integer value from SQLite to a boolean before writing the value to the
postgres database.
* Before SQLite 3.23, `TRUE` and `FALSE` were not recognised as constants by
SQLite, and the `IS [NOT] TRUE`/`IS [NOT] FALSE` operators were not
supported. This makes it necessary to avoid using `TRUE` and `FALSE`
constants in SQL commands.
For example, to insert a `TRUE` value into the database, write:
```python
txn.execute("INSERT INTO tbl(col) VALUES (?)", (True, ))
```
* Default values for new boolean columns present a particular
difficulty. Generally it is best to create separate schema files for
Postgres and SQLite. For example:
```sql
# in 00delta.sql.postgres:
ALTER TABLE tbl ADD COLUMN col BOOLEAN DEFAULT FALSE;
```
```sql
# in 00delta.sql.sqlite:
ALTER TABLE tbl ADD COLUMN col BOOLEAN DEFAULT 0;
```
Note that there is a particularly insidious failure mode here: the Postgres
flavour will be accepted by SQLite 3.22, but will give a column whose
default value is the **string** `"FALSE"` - which, when cast back to a boolean
in Python, evaluates to `True`.
## `event_id` global uniqueness
+64 -2
View File
@@ -325,8 +325,7 @@ load balancing can be done in different ways.
For `/sync` and `/initialSync` requests it will be more efficient if all
requests from a particular user are routed to a single instance. This can
be done e.g. in nginx via IP `hash $http_x_forwarded_for;` or via
`hash $http_authorization consistent;` which contains the users access token.
be done in reverse proxy by extracting username part from the users access token.
Admins may additionally wish to separate out `/sync`
requests that have a `since` query parameter from those that don't (and
@@ -335,6 +334,69 @@ when a user logs in on a new device and can be *very* resource intensive, so
isolating these requests will stop them from interfering with other users ongoing
syncs.
Example `nginx` configuration snippet that handles the cases above. This is just an
example and probably requires some changes according to your particular setup:
```nginx
# Choose sync worker based on the existence of "since" query parameter
map $arg_since $sync {
default synapse_sync;
'' synapse_initial_sync;
}
# Extract username from access token passed as URL parameter
map $arg_access_token $accesstoken_from_urlparam {
# Defaults to just passing back the whole accesstoken
default $arg_access_token;
# Try to extract username part from accesstoken URL parameter
"~syt_(?<username>.*?)_.*" $username;
}
# Extract username from access token passed as authorization header
map $http_authorization $mxid_localpart {
# Defaults to just passing back the whole accesstoken
default $http_authorization;
# Try to extract username part from accesstoken header
"~Bearer syt_(?<username>.*?)_.*" $username;
# if no authorization-header exist, try mapper for URL parameter "access_token"
"" $accesstoken_from_urlparam;
}
upstream synapse_initial_sync {
# Use the username mapper result for hash key
hash $mxid_localpart consistent;
server 127.0.0.1:8016;
server 127.0.0.1:8036;
}
upstream synapse_sync {
# Use the username mapper result for hash key
hash $mxid_localpart consistent;
server 127.0.0.1:8013;
server 127.0.0.1:8037;
server 127.0.0.1:8038;
server 127.0.0.1:8039;
}
# Sync initial/normal
location ~ ^/_matrix/client/(r0|v3)/sync$ {
proxy_pass http://$sync;
}
# Normal sync
location ~ ^/_matrix/client/(api/v1|r0|v3)/events$ {
proxy_pass http://synapse_sync;
}
# Initial_sync
location ~ ^/_matrix/client/(api/v1|r0|v3)/initialSync$ {
proxy_pass http://synapse_initial_sync;
}
location ~ ^/_matrix/client/(api/v1|r0|v3)/rooms/[^/]+/initialSync$ {
proxy_pass http://synapse_initial_sync;
}
```
Federation and client requests can be balanced via simple round robin.
The inbound federation transaction request `^/_matrix/federation/v1/send/`
Generated
+274
View File
@@ -0,0 +1,274 @@
{
"nodes": {
"devenv": {
"inputs": {
"flake-compat": "flake-compat",
"nix": "nix",
"nixpkgs": "nixpkgs",
"pre-commit-hooks": "pre-commit-hooks"
},
"locked": {
"lastModified": 1682534083,
"narHash": "sha256-lBgFaLNHRQtD3InZbBXzIS8HgZUgcPJ6jiqGa4FJPrk=",
"owner": "anoadragon453",
"repo": "devenv",
"rev": "9694bd0a845dd184d4468cc3d3461089aace787a",
"type": "github"
},
"original": {
"owner": "anoadragon453",
"ref": "anoa/fix_languages_python",
"repo": "devenv",
"type": "github"
}
},
"fenix": {
"inputs": {
"nixpkgs": [
"nixpkgs"
],
"rust-analyzer-src": "rust-analyzer-src"
},
"locked": {
"lastModified": 1682490133,
"narHash": "sha256-tR2Qx0uuk97WySpSSk4rGS/oH7xb5LykbjATcw1vw1I=",
"owner": "nix-community",
"repo": "fenix",
"rev": "4e9412753ab75ef0e038a5fe54a062fb44c27c6a",
"type": "github"
},
"original": {
"owner": "nix-community",
"repo": "fenix",
"type": "github"
}
},
"flake-compat": {
"flake": false,
"locked": {
"lastModified": 1673956053,
"narHash": "sha256-4gtG9iQuiKITOjNQQeQIpoIB6b16fm+504Ch3sNKLd8=",
"owner": "edolstra",
"repo": "flake-compat",
"rev": "35bb57c0c8d8b62bbfd284272c928ceb64ddbde9",
"type": "github"
},
"original": {
"owner": "edolstra",
"repo": "flake-compat",
"type": "github"
}
},
"flake-utils": {
"locked": {
"lastModified": 1667395993,
"narHash": "sha256-nuEHfE/LcWyuSWnS8t12N1wc105Qtau+/OdUAjtQ0rA=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "5aed5285a952e0b949eb3ba02c12fa4fcfef535f",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
},
"gitignore": {
"inputs": {
"nixpkgs": [
"devenv",
"pre-commit-hooks",
"nixpkgs"
]
},
"locked": {
"lastModified": 1660459072,
"narHash": "sha256-8DFJjXG8zqoONA1vXtgeKXy68KdJL5UaXR8NtVMUbx8=",
"owner": "hercules-ci",
"repo": "gitignore.nix",
"rev": "a20de23b925fd8264fd7fad6454652e142fd7f73",
"type": "github"
},
"original": {
"owner": "hercules-ci",
"repo": "gitignore.nix",
"type": "github"
}
},
"lowdown-src": {
"flake": false,
"locked": {
"lastModified": 1633514407,
"narHash": "sha256-Dw32tiMjdK9t3ETl5fzGrutQTzh2rufgZV4A/BbxuD4=",
"owner": "kristapsdz",
"repo": "lowdown",
"rev": "d2c2b44ff6c27b936ec27358a2653caaef8f73b8",
"type": "github"
},
"original": {
"owner": "kristapsdz",
"repo": "lowdown",
"type": "github"
}
},
"nix": {
"inputs": {
"lowdown-src": "lowdown-src",
"nixpkgs": [
"devenv",
"nixpkgs"
],
"nixpkgs-regression": "nixpkgs-regression"
},
"locked": {
"lastModified": 1676545802,
"narHash": "sha256-EK4rZ+Hd5hsvXnzSzk2ikhStJnD63odF7SzsQ8CuSPU=",
"owner": "domenkozar",
"repo": "nix",
"rev": "7c91803598ffbcfe4a55c44ac6d49b2cf07a527f",
"type": "github"
},
"original": {
"owner": "domenkozar",
"ref": "relaxed-flakes",
"repo": "nix",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1678875422,
"narHash": "sha256-T3o6NcQPwXjxJMn2shz86Chch4ljXgZn746c2caGxd8=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "126f49a01de5b7e35a43fd43f891ecf6d3a51459",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixpkgs-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"nixpkgs-regression": {
"locked": {
"lastModified": 1643052045,
"narHash": "sha256-uGJ0VXIhWKGXxkeNnq4TvV3CIOkUJ3PAoLZ3HMzNVMw=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "215d4d0fd80ca5163643b03a33fde804a29cc1e2",
"type": "github"
},
"original": {
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "215d4d0fd80ca5163643b03a33fde804a29cc1e2",
"type": "github"
}
},
"nixpkgs-stable": {
"locked": {
"lastModified": 1673800717,
"narHash": "sha256-SFHraUqLSu5cC6IxTprex/nTsI81ZQAtDvlBvGDWfnA=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "2f9fd351ec37f5d479556cd48be4ca340da59b8f",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-22.11",
"repo": "nixpkgs",
"type": "github"
}
},
"nixpkgs_2": {
"locked": {
"lastModified": 1682519441,
"narHash": "sha256-Vsq/8NOtvW1AoC6shCBxRxZyMQ+LhvPuJT6ltbzuv+Y=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "7a32a141db568abde9bc389845949dc2a454dfd3",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "master",
"repo": "nixpkgs",
"type": "github"
}
},
"pre-commit-hooks": {
"inputs": {
"flake-compat": [
"devenv",
"flake-compat"
],
"flake-utils": "flake-utils",
"gitignore": "gitignore",
"nixpkgs": [
"devenv",
"nixpkgs"
],
"nixpkgs-stable": "nixpkgs-stable"
},
"locked": {
"lastModified": 1678376203,
"narHash": "sha256-3tyYGyC8h7fBwncLZy5nCUjTJPrHbmNwp47LlNLOHSM=",
"owner": "cachix",
"repo": "pre-commit-hooks.nix",
"rev": "1a20b9708962096ec2481eeb2ddca29ed747770a",
"type": "github"
},
"original": {
"owner": "cachix",
"repo": "pre-commit-hooks.nix",
"type": "github"
}
},
"root": {
"inputs": {
"devenv": "devenv",
"fenix": "fenix",
"nixpkgs": "nixpkgs_2",
"systems": "systems"
}
},
"rust-analyzer-src": {
"flake": false,
"locked": {
"lastModified": 1682426789,
"narHash": "sha256-UqnLmJESRZE0tTEaGbRAw05Hm19TWIPA+R3meqi5I4w=",
"owner": "rust-lang",
"repo": "rust-analyzer",
"rev": "943d2a8a1ca15e8b28a1f51f5a5c135e3728da04",
"type": "github"
},
"original": {
"owner": "rust-lang",
"ref": "nightly",
"repo": "rust-analyzer",
"type": "github"
}
},
"systems": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default",
"type": "github"
}
}
},
"root": "root",
"version": 7
}
+204
View File
@@ -0,0 +1,204 @@
# A nix flake that sets up a complete Synapse development environment. Dependencies
# for the SyTest (https://github.com/matrix-org/sytest) and Complement
# (https://github.com/matrix-org/complement) Matrix homeserver test suites are also
# installed automatically.
#
# You must have already installed nix (https://nixos.org) on your system to use this.
# nix can be installed on Linux or MacOS; NixOS is not required. Windows is not
# directly supported, but nix can be installed inside of WSL2 or even Docker
# containers. Please refer to https://nixos.org/download for details.
#
# You must also enable support for flakes in Nix. See the following for how to
# do so permanently: https://nixos.wiki/wiki/Flakes#Enable_flakes
#
# Usage:
#
# With nix installed, navigate to the directory containing this flake and run
# `nix develop --impure`. The `--impure` is necessary in order to store state
# locally from "services", such as PostgreSQL and Redis.
#
# You should now be dropped into a new shell with all programs and dependencies
# availabile to you!
#
# You can start up pre-configured, local PostgreSQL and Redis instances by
# running: `devenv up`. To stop them, use Ctrl-C.
#
# A PostgreSQL database called 'synapse' will be set up for you, along with
# a PostgreSQL user named 'synapse_user'.
# The 'host' can be found by running `echo $PGHOST` with the development
# shell activated. Use these values to configure your Synapse to connect
# to the local PostgreSQL database. You do not need to specify a password.
# https://matrix-org.github.io/synapse/latest/postgres
#
# All state (the venv, postgres and redis data and config) are stored in
# .devenv/state. Deleting a file from here and then re-entering the shell
# will recreate these files from scratch.
#
# You can exit the development shell by typing `exit`, or using Ctrl-D.
#
# If you would like this development environment to activate automatically
# upon entering this directory in your terminal, first install `direnv`
# (https://direnv.net/). Then run `echo 'use flake . --impure' >> .envrc` at
# the root of the Synapse repo. Finally, run `direnv allow .` to allow the
# contents of '.envrc' to run every time you enter this directory. Voilà!
{
inputs = {
# Use the master/unstable branch of nixpkgs. The latest stable, 22.11,
# does not contain 'perl536Packages.NetAsyncHTTP', needed by Sytest.
nixpkgs.url = "github:NixOS/nixpkgs/master";
# Output a development shell for x86_64/aarch64 Linux/Darwin (MacOS).
systems.url = "github:nix-systems/default";
# A development environment manager built on Nix. See https://devenv.sh.
# This is temporarily overridden to a fork that fixes a quirk between
# devenv's service and python language features. This can be removed
# when https://github.com/cachix/devenv/pull/559 is merged upstream.
devenv.url = "github:anoadragon453/devenv/anoa/fix_languages_python";
#devenv.url = "github:cachix/devenv/main";
# Rust toolchains and rust-analyzer nightly.
fenix = {
url = "github:nix-community/fenix";
inputs.nixpkgs.follows = "nixpkgs";
};
};
outputs = { self, nixpkgs, devenv, systems, ... } @ inputs:
let
forEachSystem = nixpkgs.lib.genAttrs (import systems);
in {
devShells = forEachSystem (system:
let
pkgs = nixpkgs.legacyPackages.${system};
in {
# Everything is configured via devenv - a nix module for creating declarative
# developer environments. See https://devenv.sh/reference/options/ for a list
# of all possible options.
default = devenv.lib.mkShell {
inherit inputs pkgs;
modules = [
{
# Make use of the Starship command prompt when this development environment
# is manually activated (via `nix develop --impure`).
# See https://starship.rs/ for details on the prompt itself.
starship.enable = true;
# Configure packages to install.
# Search for package names at https://search.nixos.org/packages?channel=unstable
packages = with pkgs; [
# Native dependencies for running Synapse.
icu
libffi
libjpeg
libpqxx
libwebp
libxml2
libxslt
sqlite
# Native dependencies for unit tests (SyTest also requires OpenSSL).
openssl
# Native dependencies for running Complement.
olm
];
# Install Python and manage a virtualenv with Poetry.
languages.python.enable = true;
languages.python.poetry.enable = true;
# Automatically activate the poetry virtualenv upon entering the shell.
languages.python.poetry.activate.enable = true;
# Install all extra Python dependencies; this is needed to run the unit
# tests and utilitise all Synapse features.
languages.python.poetry.install.arguments = ["--extras all"];
# Install the 'matrix-synapse' package from the local checkout.
languages.python.poetry.install.installRootPackage = true;
# This is a work-around for NixOS systems. NixOS is special in
# that you can have multiple versions of packages installed at
# once, including your libc linker!
#
# Some binaries built for Linux expect those to be in a certain
# filepath, but that is not the case on NixOS. In that case, we
# force compiling those binaries locally instead.
env.POETRY_INSTALLER_NO_BINARY = "ruff";
# Install dependencies for the additional programming languages
# involved with Synapse development.
#
# * Rust is used for developing and running Synapse.
# * Golang is needed to run the Complement test suite.
# * Perl is needed to run the SyTest test suite.
languages.go.enable = true;
languages.rust.enable = true;
languages.rust.version = "stable";
languages.perl.enable = true;
# Postgres is needed to run Synapse with postgres support and
# to run certain unit tests that require postgres.
services.postgres.enable = true;
# On the first invocation of `devenv up`, create a database for
# Synapse to store data in.
services.postgres.initdbArgs = ["--locale=C" "--encoding=UTF8"];
services.postgres.initialDatabases = [
{ name = "synapse"; }
];
# Create a postgres user called 'synapse_user' which has ownership
# over the 'synapse' database.
services.postgres.initialScript = ''
CREATE USER synapse_user;
ALTER DATABASE synapse OWNER TO synapse_user;
'';
# Redis is needed in order to run Synapse in worker mode.
services.redis.enable = true;
# Define the perl modules we require to run SyTest.
#
# This list was compiled by cross-referencing https://metacpan.org/
# with the modules defined in './cpanfile' and then finding the
# corresponding nix packages on https://search.nixos.org/packages.
#
# This was done until `./install-deps.pl --dryrun` produced no output.
env.PERL5LIB = "${with pkgs.perl536Packages; makePerlPath [
DBI
ClassMethodModifiers
CryptEd25519
DataDump
DBDPg
DigestHMAC
DigestSHA1
EmailAddressXS
EmailMIME
EmailSimple # required by Email::Mime
EmailMessageID # required by Email::Mime
EmailMIMEContentType # required by Email::Mime
TextUnidecode # required by Email::Mime
ModuleRuntime # required by Email::Mime
EmailMIMEEncodings # required by Email::Mime
FilePath
FileSlurper
Future
GetoptLong
HTTPMessage
IOAsync
IOAsyncSSL
IOSocketSSL
NetSSLeay
JSON
ListUtilsBy
ScalarListUtils
ModulePluggable
NetAsyncHTTP
MetricsAny # required by Net::Async::HTTP
NetAsyncHTTPServer
StructDumb
URI
YAMLLibYAML
]}";
}
];
};
});
};
}
-13
View File
@@ -21,20 +21,7 @@ files =
tests/,
build_rust.py
# Note: Better exclusion syntax coming in mypy > 0.910
# https://github.com/python/mypy/pull/11329
#
# For now, set the (?x) flag enable "verbose" regexes
# https://docs.python.org/3/library/re.html#re.X
exclude = (?x)
^(
|synapse/storage/databases/__init__.py
|synapse/storage/databases/main/cache.py
|synapse/storage/schema/
)$
[mypy-synapse.metrics._reactor_metrics]
disallow_untyped_defs = False
# This module imports select.epoll. That exists on Linux, but doesn't on macOS.
# See https://github.com/matrix-org/synapse/pull/11771.
warn_unused_ignores = False
Generated
+12 -12
View File
@@ -1593,14 +1593,14 @@ tests = ["Sphinx", "doubles", "flake8", "flake8-quotes", "gevent", "mock", "pyte
[[package]]
name = "packaging"
version = "23.0"
version = "23.1"
description = "Core utilities for Python packages"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
{file = "packaging-23.0-py3-none-any.whl", hash = "sha256:714ac14496c3e68c99c29b00845f7a2b85f3bb6f1078fd9f72fd20f0570002b2"},
{file = "packaging-23.0.tar.gz", hash = "sha256:b6ad297f8907de0fa2fe1ccbd26fdaf387f5f47c7275fedf8cce89f99446cf97"},
{file = "packaging-23.1-py3-none-any.whl", hash = "sha256:994793af429502c4ea2ebf6bf664629d07c1a9fe974af92966e4b8d2df7edc61"},
{file = "packaging-23.1.tar.gz", hash = "sha256:a392980d2b6cffa644431898be54b0045151319d1e7ec34f0cfed48767dd334f"},
]
[[package]]
@@ -2466,14 +2466,14 @@ testing-integration = ["build[virtualenv]", "filelock (>=3.4.0)", "jaraco.envs (
[[package]]
name = "setuptools-rust"
version = "1.5.2"
version = "1.6.0"
description = "Setuptools Rust extension plugin"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
{file = "setuptools-rust-1.5.2.tar.gz", hash = "sha256:d8daccb14dc0eae1b6b6eb3ecef79675bd37b4065369f79c35393dd5c55652c7"},
{file = "setuptools_rust-1.5.2-py3-none-any.whl", hash = "sha256:8eb45851e34288f2296cd5ab9e924535ac1757318b730a13fe6836867843f206"},
{file = "setuptools-rust-1.6.0.tar.gz", hash = "sha256:c86e734deac330597998bfbc08da45187e6b27837e23bd91eadb320732392262"},
{file = "setuptools_rust-1.6.0-py3-none-any.whl", hash = "sha256:e28ae09fb7167c44ab34434eb49279307d611547cb56cb9789955cdb54a1aed9"},
]
[package.dependencies]
@@ -3058,14 +3058,14 @@ files = [
[[package]]
name = "types-pillow"
version = "9.4.0.19"
version = "9.5.0.2"
description = "Typing stubs for Pillow"
category = "dev"
optional = false
python-versions = "*"
files = [
{file = "types-Pillow-9.4.0.19.tar.gz", hash = "sha256:a04401181979049977e318dae4523ab5ae8246314fc68fcf50b043ac885a5468"},
{file = "types_Pillow-9.4.0.19-py3-none-any.whl", hash = "sha256:b55f2508be21e68a39f0a41830f1f1725aba0888e727e2eccd253c78cd5357a5"},
{file = "types-Pillow-9.5.0.2.tar.gz", hash = "sha256:b3f9f621f259566c19c1deca21901017c8b1e3e200ed2e49e0a2d83c0a5175db"},
{file = "types_Pillow-9.5.0.2-py3-none-any.whl", hash = "sha256:58fdebd0ffa2353ecccdd622adde23bce89da5c0c8b96c34f2d1eca7b7e42d0e"},
]
[[package]]
@@ -3109,14 +3109,14 @@ files = [
[[package]]
name = "types-requests"
version = "2.28.11.16"
version = "2.29.0.0"
description = "Typing stubs for requests"
category = "dev"
optional = false
python-versions = "*"
files = [
{file = "types-requests-2.28.11.16.tar.gz", hash = "sha256:9d4002056df7ebc4ec1f28fd701fba82c5c22549c4477116cb2656aa30ace6db"},
{file = "types_requests-2.28.11.16-py3-none-any.whl", hash = "sha256:a86921028335fdcc3aaf676c9d3463f867db6af2303fc65aa309b13ae1e6dd53"},
{file = "types-requests-2.29.0.0.tar.gz", hash = "sha256:c86f4a955d943d2457120dbe719df24ef0924e11177164d10a0373cf311d7b4d"},
{file = "types_requests-2.29.0.0-py3-none-any.whl", hash = "sha256:4cf6e323e856c779fbe8815bb977a5bf5d6c5034713e4c17ff2a9a20610f5b27"},
]
[package.dependencies]
+1 -1
View File
@@ -89,7 +89,7 @@ manifest-path = "rust/Cargo.toml"
[tool.poetry]
name = "matrix-synapse"
version = "1.82.0"
version = "1.83.0rc1"
description = "Homeserver for the Matrix decentralised comms protocol"
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
license = "Apache-2.0"
+4 -1
View File
@@ -568,7 +568,10 @@ impl FilteredPushRules {
.filter(|rule| {
// Ignore disabled experimental push rules
if !self.msc1767_enabled && rule.rule_id.contains("org.matrix.msc1767") {
if !self.msc1767_enabled
&& (rule.rule_id.contains("org.matrix.msc1767")
|| rule.rule_id.contains("org.matrix.msc3933"))
{
return false;
}
+5 -1
View File
@@ -54,7 +54,7 @@ from synapse.logging.context import (
)
from synapse.notifier import ReplicationNotifier
from synapse.storage.database import DatabasePool, LoggingTransaction, make_conn
from synapse.storage.databases.main import PushRuleStore
from synapse.storage.databases.main import FilteringWorkerStore, PushRuleStore
from synapse.storage.databases.main.account_data import AccountDataWorkerStore
from synapse.storage.databases.main.client_ips import ClientIpBackgroundUpdateStore
from synapse.storage.databases.main.deviceinbox import DeviceInboxBackgroundUpdateStore
@@ -69,6 +69,7 @@ from synapse.storage.databases.main.media_repository import (
MediaRepositoryBackgroundUpdateStore,
)
from synapse.storage.databases.main.presence import PresenceBackgroundUpdateStore
from synapse.storage.databases.main.profile import ProfileWorkerStore
from synapse.storage.databases.main.pusher import (
PusherBackgroundUpdatesStore,
PusherWorkerStore,
@@ -124,6 +125,7 @@ BOOLEAN_COLUMNS = {
"users": ["shadow_banned", "approved"],
"un_partial_stated_event_stream": ["rejection_status_changed"],
"users_who_share_rooms": ["share_private"],
"per_user_experimental_features": ["enabled"],
}
@@ -229,6 +231,8 @@ class Store(
EndToEndRoomKeyBackgroundStore,
StatsStore,
AccountDataWorkerStore,
FilteringWorkerStore,
ProfileWorkerStore,
PushRuleStore,
PusherWorkerStore,
PusherBackgroundUpdatesStore,
+2 -4
View File
@@ -170,11 +170,9 @@ class Filtering:
result = await self.store.get_user_filter(user_localpart, filter_id)
return FilterCollection(self._hs, result)
def add_user_filter(
self, user_localpart: str, user_filter: JsonDict
) -> Awaitable[int]:
def add_user_filter(self, user_id: UserID, user_filter: JsonDict) -> Awaitable[int]:
self.check_valid_filter(user_filter)
return self.store.add_user_filter(user_localpart, user_filter)
return self.store.add_user_filter(user_id, user_filter)
# TODO(paul): surely we should probably add a delete_user_filter or
# replace_user_filter at some point? There's no REST API specified for
+22 -9
View File
@@ -442,8 +442,10 @@ class ApplicationServiceApi(SimpleHttpClient):
return False
async def claim_client_keys(
self, service: "ApplicationService", query: List[Tuple[str, str, str]]
) -> Tuple[Dict[str, Dict[str, Dict[str, JsonDict]]], List[Tuple[str, str, str]]]:
self, service: "ApplicationService", query: List[Tuple[str, str, str, int]]
) -> Tuple[
Dict[str, Dict[str, Dict[str, JsonDict]]], List[Tuple[str, str, str, int]]
]:
"""Claim one time keys from an application service.
Note that any error (including a timeout) is treated as the application
@@ -469,8 +471,10 @@ class ApplicationServiceApi(SimpleHttpClient):
# Create the expected payload shape.
body: Dict[str, Dict[str, List[str]]] = {}
for user_id, device, algorithm in query:
body.setdefault(user_id, {}).setdefault(device, []).append(algorithm)
for user_id, device, algorithm, count in query:
body.setdefault(user_id, {}).setdefault(device, []).extend(
[algorithm] * count
)
uri = f"{service.url}/_matrix/app/unstable/org.matrix.msc3983/keys/claim"
try:
@@ -493,11 +497,20 @@ class ApplicationServiceApi(SimpleHttpClient):
# or if some are still missing.
#
# TODO This places a lot of faith in the response shape being correct.
missing = [
(user_id, device, algorithm)
for user_id, device, algorithm in query
if algorithm not in response.get(user_id, {}).get(device, [])
]
missing = []
for user_id, device, algorithm, count in query:
# Count the number of keys in the response for this algorithm by
# checking which key IDs start with the algorithm. This uses that
# True == 1 in Python to generate a count.
response_count = sum(
key_id.startswith(f"{algorithm}:")
for key_id in response.get(user_id, {}).get(device, {})
)
count -= response_count
# If the appservice responds with fewer keys than requested, then
# consider the request unfulfilled.
if count > 0:
missing.append((user_id, device, algorithm, count))
return response, missing
+5
View File
@@ -192,5 +192,10 @@ class ExperimentalConfig(Config):
# MSC2659: Application service ping endpoint
self.msc2659_enabled = experimental.get("msc2659_enabled", False)
# MSC3981: Recurse relations
self.msc3981_recurse_relations = experimental.get(
"msc3981_recurse_relations", False
)
# MSC3970: Scope transaction IDs to devices
self.msc3970_enabled = experimental.get("msc3970_enabled", False)
+48 -1
View File
@@ -235,7 +235,10 @@ class FederationClient(FederationBase):
)
async def claim_client_keys(
self, destination: str, content: JsonDict, timeout: Optional[int]
self,
destination: str,
query: Dict[str, Dict[str, Dict[str, int]]],
timeout: Optional[int],
) -> JsonDict:
"""Claims one-time keys for a device hosted on a remote server.
@@ -247,6 +250,50 @@ class FederationClient(FederationBase):
The JSON object from the response
"""
sent_queries_counter.labels("client_one_time_keys").inc()
# Convert the query with counts into a stable and unstable query and check
# if attempting to claim more than 1 OTK.
content: Dict[str, Dict[str, str]] = {}
unstable_content: Dict[str, Dict[str, List[str]]] = {}
use_unstable = False
for user_id, one_time_keys in query.items():
for device_id, algorithms in one_time_keys.items():
if any(count > 1 for count in algorithms.values()):
use_unstable = True
if algorithms:
# For the stable query, choose only the first algorithm.
content.setdefault(user_id, {})[device_id] = next(iter(algorithms))
# For the unstable query, repeat each algorithm by count, then
# splat those into chain to get a flattened list of all algorithms.
#
# Converts from {"algo1": 2, "algo2": 2} to ["algo1", "algo1", "algo2"].
unstable_content.setdefault(user_id, {})[device_id] = list(
itertools.chain(
*(
itertools.repeat(algorithm, count)
for algorithm, count in algorithms.items()
)
)
)
if use_unstable:
try:
return await self.transport_layer.claim_client_keys_unstable(
destination, unstable_content, timeout
)
except HttpResponseException as e:
# If an error is received that is due to an unrecognised endpoint,
# fallback to the v1 endpoint. Otherwise, consider it a legitimate error
# and raise.
if not is_unknown_endpoint(e):
raise
logger.debug(
"Couldn't claim client keys with the unstable API, falling back to the v1 API"
)
else:
logger.debug("Skipping unstable claim client keys API")
return await self.transport_layer.claim_client_keys(
destination, content, timeout
)
+1 -6
View File
@@ -1005,13 +1005,8 @@ class FederationServer(FederationBase):
@trace
async def on_claim_client_keys(
self, origin: str, content: JsonDict, always_include_fallback_keys: bool
self, query: List[Tuple[str, str, str, int]], always_include_fallback_keys: bool
) -> Dict[str, Any]:
query = []
for user_id, device_keys in content.get("one_time_keys", {}).items():
for device_id, algorithm in device_keys.items():
query.append((user_id, device_id, algorithm))
log_kv({"message": "Claiming one time keys.", "user, device pairs": query})
results = await self._e2e_keys_handler.claim_local_one_time_keys(
query, always_include_fallback_keys=always_include_fallback_keys
+46 -3
View File
@@ -650,10 +650,10 @@ class TransportLayerClient:
Response:
{
"device_keys": {
"one_time_keys": {
"<user_id>": {
"<device_id>": {
"<algorithm>:<key_id>": "<key_base64>"
"<algorithm>:<key_id>": <OTK JSON>
}
}
}
@@ -669,7 +669,50 @@ class TransportLayerClient:
path = _create_v1_path("/user/keys/claim")
return await self.client.post_json(
destination=destination, path=path, data=query_content, timeout=timeout
destination=destination,
path=path,
data={"one_time_keys": query_content},
timeout=timeout,
)
async def claim_client_keys_unstable(
self, destination: str, query_content: JsonDict, timeout: Optional[int]
) -> JsonDict:
"""Claim one-time keys for a list of devices hosted on a remote server.
Request:
{
"one_time_keys": {
"<user_id>": {
"<device_id>": {"<algorithm>": <count>}
}
}
}
Response:
{
"one_time_keys": {
"<user_id>": {
"<device_id>": {
"<algorithm>:<key_id>": <OTK JSON>
}
}
}
}
Args:
destination: The server to query.
query_content: The user ids to query.
Returns:
A dict containing the one-time keys.
"""
path = _create_path(FEDERATION_UNSTABLE_PREFIX, "/user/keys/claim")
return await self.client.post_json(
destination=destination,
path=path,
data={"one_time_keys": query_content},
timeout=timeout,
)
async def get_missing_events(
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from collections import Counter
from typing import (
TYPE_CHECKING,
Dict,
@@ -577,16 +578,23 @@ class FederationClientKeysClaimServlet(BaseFederationServerServlet):
async def on_POST(
self, origin: str, content: JsonDict, query: Dict[bytes, List[bytes]]
) -> Tuple[int, JsonDict]:
# Generate a count for each algorithm, which is hard-coded to 1.
key_query: List[Tuple[str, str, str, int]] = []
for user_id, device_keys in content.get("one_time_keys", {}).items():
for device_id, algorithm in device_keys.items():
key_query.append((user_id, device_id, algorithm, 1))
response = await self.handler.on_claim_client_keys(
origin, content, always_include_fallback_keys=False
key_query, always_include_fallback_keys=False
)
return 200, response
class FederationUnstableClientKeysClaimServlet(BaseFederationServerServlet):
"""
Identical to the stable endpoint (FederationClientKeysClaimServlet) except it
always includes fallback keys in the response.
Identical to the stable endpoint (FederationClientKeysClaimServlet) except
it allows for querying for multiple OTKs at once and always includes fallback
keys in the response.
"""
PREFIX = FEDERATION_UNSTABLE_PREFIX
@@ -596,8 +604,16 @@ class FederationUnstableClientKeysClaimServlet(BaseFederationServerServlet):
async def on_POST(
self, origin: str, content: JsonDict, query: Dict[bytes, List[bytes]]
) -> Tuple[int, JsonDict]:
# Generate a count for each algorithm.
key_query: List[Tuple[str, str, str, int]] = []
for user_id, device_keys in content.get("one_time_keys", {}).items():
for device_id, algorithms in device_keys.items():
counts = Counter(algorithms)
for algorithm, count in counts.items():
key_query.append((user_id, device_id, algorithm, count))
response = await self.handler.on_claim_client_keys(
origin, content, always_include_fallback_keys=True
key_query, always_include_fallback_keys=True
)
return 200, response
@@ -805,6 +821,7 @@ FEDERATION_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = (
FederationClientKeysQueryServlet,
FederationUserDevicesQueryServlet,
FederationClientKeysClaimServlet,
FederationUnstableClientKeysClaimServlet,
FederationThirdPartyInviteExchangeServlet,
On3pidBindServlet,
FederationVersionServlet,
+8 -6
View File
@@ -841,8 +841,10 @@ class ApplicationServicesHandler:
return True
async def claim_e2e_one_time_keys(
self, query: Iterable[Tuple[str, str, str]]
) -> Tuple[Dict[str, Dict[str, Dict[str, JsonDict]]], List[Tuple[str, str, str]]]:
self, query: Iterable[Tuple[str, str, str, int]]
) -> Tuple[
Dict[str, Dict[str, Dict[str, JsonDict]]], List[Tuple[str, str, str, int]]
]:
"""Claim one time keys from application services.
Users which are exclusively owned by an application service are sent a
@@ -863,18 +865,18 @@ class ApplicationServicesHandler:
services = self.store.get_app_services()
# Partition the users by appservice.
query_by_appservice: Dict[str, List[Tuple[str, str, str]]] = {}
query_by_appservice: Dict[str, List[Tuple[str, str, str, int]]] = {}
missing = []
for user_id, device, algorithm in query:
for user_id, device, algorithm, count in query:
if not self.store.get_if_app_services_interested_in_user(user_id):
missing.append((user_id, device, algorithm))
missing.append((user_id, device, algorithm, count))
continue
# Find the associated appservice.
for service in services:
if service.is_exclusive_user(user_id):
query_by_appservice.setdefault(service.id, []).append(
(user_id, device, algorithm)
(user_id, device, algorithm, count)
)
continue
+21 -10
View File
@@ -564,7 +564,7 @@ class E2eKeysHandler:
async def claim_local_one_time_keys(
self,
local_query: List[Tuple[str, str, str]],
local_query: List[Tuple[str, str, str, int]],
always_include_fallback_keys: bool,
) -> Iterable[Dict[str, Dict[str, Dict[str, JsonDict]]]]:
"""Claim one time keys for local users.
@@ -581,6 +581,12 @@ class E2eKeysHandler:
An iterable of maps of user ID -> a map device ID -> a map of key ID -> JSON bytes.
"""
# Cap the number of OTKs that can be claimed at once to avoid abuse.
local_query = [
(user_id, device_id, algorithm, min(count, 5))
for user_id, device_id, algorithm, count in local_query
]
otk_results, not_found = await self.store.claim_e2e_one_time_keys(local_query)
# If the application services have not provided any keys via the C-S
@@ -607,7 +613,7 @@ class E2eKeysHandler:
# from the appservice for that user ID / device ID. If it is found,
# check if any of the keys match the requested algorithm & are a
# fallback key.
for user_id, device_id, algorithm in local_query:
for user_id, device_id, algorithm, _count in local_query:
# Check if the appservice responded for this query.
as_result = appservice_results.get(user_id, {}).get(device_id, {})
found_otk = False
@@ -630,13 +636,17 @@ class E2eKeysHandler:
.get(device_id, {})
.keys()
)
# Note that it doesn't make sense to request more than 1 fallback key
# per (user_id, device_id, algorithm).
fallback_query.append((user_id, device_id, algorithm, mark_as_used))
else:
# All fallback keys get marked as used.
fallback_query = [
# Note that it doesn't make sense to request more than 1 fallback key
# per (user_id, device_id, algorithm).
(user_id, device_id, algorithm, True)
for user_id, device_id, algorithm in not_found
for user_id, device_id, algorithm, count in not_found
]
# For each user that does not have a one-time keys available, see if
@@ -650,18 +660,19 @@ class E2eKeysHandler:
@trace
async def claim_one_time_keys(
self,
query: Dict[str, Dict[str, Dict[str, str]]],
query: Dict[str, Dict[str, Dict[str, int]]],
timeout: Optional[int],
always_include_fallback_keys: bool,
) -> JsonDict:
local_query: List[Tuple[str, str, str]] = []
remote_queries: Dict[str, Dict[str, Dict[str, str]]] = {}
local_query: List[Tuple[str, str, str, int]] = []
remote_queries: Dict[str, Dict[str, Dict[str, Dict[str, int]]]] = {}
for user_id, one_time_keys in query.get("one_time_keys", {}).items():
for user_id, one_time_keys in query.items():
# we use UserID.from_string to catch invalid user ids
if self.is_mine(UserID.from_string(user_id)):
for device_id, algorithm in one_time_keys.items():
local_query.append((user_id, device_id, algorithm))
for device_id, algorithms in one_time_keys.items():
for algorithm, count in algorithms.items():
local_query.append((user_id, device_id, algorithm, count))
else:
domain = get_domain_from_id(user_id)
remote_queries.setdefault(domain, {})[user_id] = one_time_keys
@@ -692,7 +703,7 @@ class E2eKeysHandler:
device_keys = remote_queries[destination]
try:
remote_result = await self.federation.claim_client_keys(
destination, {"one_time_keys": device_keys}, timeout=timeout
destination, device_keys, timeout=timeout
)
for user_id, keys in remote_result["one_time_keys"].items():
if user_id in device_keys:
+2 -6
View File
@@ -178,9 +178,7 @@ class ProfileHandler:
authenticated_entity=requester.authenticated_entity,
)
await self.store.set_profile_displayname(
target_user.localpart, displayname_to_set
)
await self.store.set_profile_displayname(target_user, displayname_to_set)
profile = await self.store.get_profileinfo(target_user.localpart)
await self.user_directory_handler.handle_local_profile_change(
@@ -272,9 +270,7 @@ class ProfileHandler:
target_user, authenticated_entity=requester.authenticated_entity
)
await self.store.set_profile_avatar_url(
target_user.localpart, avatar_url_to_set
)
await self.store.set_profile_avatar_url(target_user, avatar_url_to_set)
profile = await self.store.get_profileinfo(target_user.localpart)
await self.user_directory_handler.handle_local_profile_change(
+3
View File
@@ -85,6 +85,7 @@ class RelationsHandler:
event_id: str,
room_id: str,
pagin_config: PaginationConfig,
recurse: bool,
include_original_event: bool,
relation_type: Optional[str] = None,
event_type: Optional[str] = None,
@@ -98,6 +99,7 @@ class RelationsHandler:
event_id: Fetch events that relate to this event ID.
room_id: The room the event belongs to.
pagin_config: The pagination config rules to apply, if any.
recurse: Whether to recursively find relations.
include_original_event: Whether to include the parent event.
relation_type: Only fetch events with this relation type, if given.
event_type: Only fetch events with this event type, if given.
@@ -132,6 +134,7 @@ class RelationsHandler:
direction=pagin_config.direction,
from_token=pagin_config.from_token,
to_token=pagin_config.to_token,
recurse=recurse,
)
events = await self._main_store.get_events_as_list(
+3 -11
View File
@@ -768,6 +768,7 @@ class SimpleHttpClient(BaseHttpClient):
request if it were otherwise caught in a blacklist.
use_proxy: Whether proxy settings should be discovered and used
from conventional environment variables.
connection_pool: The connection pool to use for this client's agent.
"""
def __init__(
@@ -777,6 +778,7 @@ class SimpleHttpClient(BaseHttpClient):
ip_whitelist: Optional[IPSet] = None,
ip_blacklist: Optional[IPSet] = None,
use_proxy: bool = False,
connection_pool: Optional[HTTPConnectionPool] = None,
):
super().__init__(hs, treq_args=treq_args)
self._ip_whitelist = ip_whitelist
@@ -789,22 +791,12 @@ class SimpleHttpClient(BaseHttpClient):
self.reactor, self._ip_whitelist, self._ip_blacklist
)
# the pusher makes lots of concurrent SSL connections to Sygnal, and tends to
# do so in batches, so we need to allow the pool to keep lots of idle
# connections around.
pool = HTTPConnectionPool(self.reactor)
# XXX: The justification for using the cache factor here is that larger
# instances will need both more cache and more connections.
# Still, this should probably be a separate dial
pool.maxPersistentPerHost = max(int(100 * hs.config.caches.global_factor), 5)
pool.cachedConnectionTimeout = 2 * 60
self.agent: IAgent = ProxyAgent(
self.reactor,
hs.get_reactor(),
connectTimeout=15,
contextFactory=self.hs.get_http_client_context_factory(),
pool=pool,
pool=connection_pool,
use_proxy=use_proxy,
)
+46
View File
@@ -105,6 +105,7 @@ from synapse.module_api.callbacks.spamchecker_callbacks import (
USER_MAY_SEND_3PID_INVITE_CALLBACK,
SpamCheckerModuleApiCallbacks,
)
from synapse.push.httppusher import HttpPusher
from synapse.rest.client.login import LoginResponse
from synapse.storage import DataStore
from synapse.storage.background_updates import (
@@ -248,6 +249,7 @@ class ModuleApi:
self._registration_handler = hs.get_registration_handler()
self._send_email_handler = hs.get_send_email_handler()
self._push_rules_handler = hs.get_push_rules_handler()
self._pusherpool = hs.get_pusherpool()
self._device_handler = hs.get_device_handler()
self.custom_template_dir = hs.config.server.custom_template_directory
self._callbacks = hs.get_module_api_callbacks()
@@ -1225,6 +1227,50 @@ class ModuleApi:
await self._clock.sleep(seconds)
async def send_http_push_notification(
self,
user_id: str,
device_id: Optional[str],
content: JsonDict,
tweaks: Optional[JsonMapping] = None,
default_payload: Optional[JsonMapping] = None,
) -> Dict[str, bool]:
"""Send an HTTP push notification that is forwarded to the registered push gateway
for the specified user/device.
Added in Synapse v1.82.0.
Args:
user_id: The user ID to send the push notification to.
device_id: The device ID of the device where to send the push notification. If `None`,
the notification will be sent to all registered HTTP pushers of the user.
content: A dict of values that will be put in the `notification` field of the push
(cf Push Gateway spec). `devices` field will be overrided if included.
tweaks: A dict of `tweaks` that will be inserted in the `devices` section, cf spec.
default_payload: default payload to add in `devices[0].data.default_payload`.
This will be merged (and override if some matching values already exist there)
with existing `default_payload`.
Returns:
a dict reprensenting the status of the push per device ID
"""
status = {}
if user_id in self._pusherpool.pushers:
for p in self._pusherpool.pushers[user_id].values():
if isinstance(p, HttpPusher) and (
not device_id or p.device_id == device_id
):
res = await p.dispatch_push(content, tweaks, default_payload)
# Check if the push was successful and no pushers were rejected.
sent = res is not False and not res
# This is mainly to accomodate mypy
# device_id should never be empty after the `set_device_id_for_pushers`
# background job has been properly run.
if p.device_id:
status[p.device_id] = sent
return status
async def send_mail(
self,
recipient: str,
+1
View File
@@ -326,6 +326,7 @@ class BulkPushRuleEvaluator:
if (
not event.internal_metadata.is_notifiable()
or event.internal_metadata.is_historical()
or event.room_id in self.hs.config.server.rooms_to_exclude_from_sync
):
# Push rules for events that aren't notifiable can't be processed by this and
# we want to skip push notification actions for historical messages
+112 -75
View File
@@ -14,7 +14,7 @@
# limitations under the License.
import logging
import urllib.parse
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Union
from typing import TYPE_CHECKING, Dict, List, Optional, Union
from prometheus_client import Counter
@@ -27,6 +27,7 @@ from synapse.logging import opentracing
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import Pusher, PusherConfig, PusherConfigException
from synapse.storage.databases.main.event_push_actions import HttpPushAction
from synapse.types import JsonDict, JsonMapping
from . import push_tools
@@ -56,7 +57,7 @@ http_badges_failed_counter = Counter(
)
def tweaks_for_actions(actions: List[Union[str, Dict]]) -> Dict[str, Any]:
def tweaks_for_actions(actions: List[Union[str, Dict]]) -> JsonMapping:
"""
Converts a list of actions into a `tweaks` dict (which can then be passed to
the push gateway).
@@ -101,6 +102,7 @@ class HttpPusher(Pusher):
self._storage_controllers = self.hs.get_storage_controllers()
self.app_display_name = pusher_config.app_display_name
self.device_display_name = pusher_config.device_display_name
self.device_id = pusher_config.device_id
self.pushkey_ts = pusher_config.ts
self.data = pusher_config.data
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
@@ -143,7 +145,8 @@ class HttpPusher(Pusher):
)
self.url = url
self.http_client = hs.get_proxied_blacklisted_http_client()
self.http_client = hs.get_pusher_http_client()
self.data_minus_url = {}
self.data_minus_url.update(self.data)
del self.data_minus_url["url"]
@@ -329,7 +332,7 @@ class HttpPusher(Pusher):
event = await self.store.get_event(push_action.event_id, allow_none=True)
if event is None:
return True # It's been redacted
rejected = await self.dispatch_push(event, tweaks, badge)
rejected = await self.dispatch_push_event(event, tweaks, badge)
if rejected is False:
return False
@@ -347,9 +350,83 @@ class HttpPusher(Pusher):
await self._pusherpool.remove_pusher(self.app_id, pk, self.user_id)
return True
async def _build_notification_dict(
self, event: EventBase, tweaks: Dict[str, bool], badge: int
) -> Dict[str, Any]:
async def dispatch_push(
self,
content: JsonDict,
tweaks: Optional[JsonMapping] = None,
default_payload: Optional[JsonMapping] = None,
) -> Union[bool, List[str]]:
"""Send a notification to the registered push gateway, with `content` being
the content of the `notification` top property specified in the spec.
Note that the `devices` property will be added with device-specific
information for this pusher.
Args:
content: the content
tweaks: tweaks to add into the `devices` section
default_payload: default payload to add in `devices[0].data.default_payload`.
This will be merged (and override if some matching values already exist there)
with existing `default_payload`.
Returns:
False if an error occured when calling the push gateway, or an array of
rejected push keys otherwise. If this array is empty, the push fully
succeeded.
"""
content = content.copy()
data = self.data_minus_url.copy()
if default_payload:
data.setdefault("default_payload", {}).update(default_payload)
device = {
"app_id": self.app_id,
"pushkey": self.pushkey,
"pushkey_ts": int(self.pushkey_ts / 1000),
"data": data,
}
if tweaks:
device["tweaks"] = tweaks
content["devices"] = [device]
try:
resp = await self.http_client.post_json_get_json(
self.url, {"notification": content}
)
except Exception as e:
logger.warning(
"Failed to push data to %s: %s %s",
self.name,
type(e),
e,
)
return False
rejected = []
if "rejected" in resp:
rejected = resp["rejected"]
return rejected
async def dispatch_push_event(
self,
event: EventBase,
tweaks: JsonMapping,
badge: int,
) -> Union[bool, List[str]]:
"""Send a notification to the registered push gateway by building it
from an event.
Args:
event: the event
tweaks: tweaks to add into the `devices` section, used to decide the
push priority
badge: unread count to send with the push notification
Returns:
False if an error occured when calling the push gateway, or an array of
rejected push keys otherwise. If this array is empty, the push fully
succeeded.
"""
priority = "low"
if (
event.type == EventTypes.Encrypted
@@ -363,30 +440,20 @@ class HttpPusher(Pusher):
# This was checked in the __init__, but mypy doesn't seem to know that.
assert self.data is not None
if self.data.get("format") == "event_id_only":
d: Dict[str, Any] = {
"notification": {
"event_id": event.event_id,
"room_id": event.room_id,
"counts": {"unread": badge},
"prio": priority,
"devices": [
{
"app_id": self.app_id,
"pushkey": self.pushkey,
"pushkey_ts": int(self.pushkey_ts / 1000),
"data": self.data_minus_url,
}
],
}
content: JsonDict = {
"event_id": event.event_id,
"room_id": event.room_id,
"counts": {"unread": badge},
"prio": priority,
}
return d
# event_id_only doesn't include the tweaks, so override them.
tweaks = {}
else:
ctx = await push_tools.get_context_for_event(
self._storage_controllers, event, self.user_id
)
ctx = await push_tools.get_context_for_event(
self._storage_controllers, event, self.user_id
)
d = {
"notification": {
content = {
"id": event.event_id, # deprecated: remove soon
"event_id": event.event_id,
"room_id": event.room_id,
@@ -397,57 +464,27 @@ class HttpPusher(Pusher):
"unread": badge,
# 'missed_calls': 2
},
"devices": [
{
"app_id": self.app_id,
"pushkey": self.pushkey,
"pushkey_ts": int(self.pushkey_ts / 1000),
"data": self.data_minus_url,
"tweaks": tweaks,
}
],
}
}
if event.type == "m.room.member" and event.is_state():
d["notification"]["membership"] = event.content["membership"]
d["notification"]["user_is_target"] = event.state_key == self.user_id
if self.hs.config.push.push_include_content and event.content:
d["notification"]["content"] = event.content
if event.type == "m.room.member" and event.is_state():
content["membership"] = event.content["membership"]
content["user_is_target"] = event.state_key == self.user_id
if self.hs.config.push.push_include_content and event.content:
content["content"] = event.content
# We no longer send aliases separately, instead, we send the human
# readable name of the room, which may be an alias.
if "sender_display_name" in ctx and len(ctx["sender_display_name"]) > 0:
d["notification"]["sender_display_name"] = ctx["sender_display_name"]
if "name" in ctx and len(ctx["name"]) > 0:
d["notification"]["room_name"] = ctx["name"]
# We no longer send aliases separately, instead, we send the human
# readable name of the room, which may be an alias.
if "sender_display_name" in ctx and len(ctx["sender_display_name"]) > 0:
content["sender_display_name"] = ctx["sender_display_name"]
if "name" in ctx and len(ctx["name"]) > 0:
content["room_name"] = ctx["name"]
return d
res = await self.dispatch_push(content, tweaks)
async def dispatch_push(
self, event: EventBase, tweaks: Dict[str, bool], badge: int
) -> Union[bool, Iterable[str]]:
notification_dict = await self._build_notification_dict(event, tweaks, badge)
if not notification_dict:
return []
try:
resp = await self.http_client.post_json_get_json(
self.url, notification_dict
)
except Exception as e:
logger.warning(
"Failed to push event %s to %s: %s %s",
event.event_id,
self.name,
type(e),
e,
)
return False
rejected = []
if "rejected" in resp:
rejected = resp["rejected"]
if not rejected:
# If the push is successful and none are rejected, update the badge count.
if res is not False and not res:
self.badge_count_last_call = badge
return rejected
return res
async def _send_badge(self, badge: int) -> None:
"""
+7 -1
View File
@@ -39,6 +39,7 @@ from synapse.rest.admin.event_reports import (
EventReportDetailRestServlet,
EventReportsRestServlet,
)
from synapse.rest.admin.experimental_features import ExperimentalFeaturesRestServlet
from synapse.rest.admin.federation import (
DestinationMembershipRestServlet,
DestinationResetConnectionRestServlet,
@@ -68,7 +69,10 @@ from synapse.rest.admin.rooms import (
RoomTimestampToEventRestServlet,
)
from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet
from synapse.rest.admin.statistics import UserMediaStatisticsRestServlet
from synapse.rest.admin.statistics import (
LargestRoomsStatistics,
UserMediaStatisticsRestServlet,
)
from synapse.rest.admin.username_available import UsernameAvailableRestServlet
from synapse.rest.admin.users import (
AccountDataRestServlet,
@@ -259,6 +263,7 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
UserRestServletV2(hs).register(http_server)
UsersRestServletV2(hs).register(http_server)
UserMediaStatisticsRestServlet(hs).register(http_server)
LargestRoomsStatistics(hs).register(http_server)
EventReportDetailRestServlet(hs).register(http_server)
EventReportsRestServlet(hs).register(http_server)
AccountDataRestServlet(hs).register(http_server)
@@ -288,6 +293,7 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
BackgroundUpdateEnabledRestServlet(hs).register(http_server)
BackgroundUpdateRestServlet(hs).register(http_server)
BackgroundUpdateStartJobRestServlet(hs).register(http_server)
ExperimentalFeaturesRestServlet(hs).register(http_server)
def register_servlets_for_client_rest_resource(
+119
View File
@@ -0,0 +1,119 @@
# Copyright 2023 The Matrix.org Foundation C.I.C
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from enum import Enum
from http import HTTPStatus
from typing import TYPE_CHECKING, Dict, Tuple
from synapse.api.errors import SynapseError
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.http.site import SynapseRequest
from synapse.rest.admin import admin_patterns, assert_requester_is_admin
from synapse.types import JsonDict, UserID
if TYPE_CHECKING:
from synapse.server import HomeServer
class ExperimentalFeature(str, Enum):
"""
Currently supported per-user features
"""
MSC3026 = "msc3026"
MSC2654 = "msc2654"
MSC3881 = "msc3881"
MSC3967 = "msc3967"
class ExperimentalFeaturesRestServlet(RestServlet):
"""
Enable or disable experimental features for a user or determine which features are enabled
for a given user
"""
PATTERNS = admin_patterns("/experimental_features/(?P<user_id>[^/]*)")
def __init__(self, hs: "HomeServer"):
super().__init__()
self.auth = hs.get_auth()
self.store = hs.get_datastores().main
self.is_mine = hs.is_mine
async def on_GET(
self,
request: SynapseRequest,
user_id: str,
) -> Tuple[int, JsonDict]:
"""
List which features are enabled for a given user
"""
await assert_requester_is_admin(self.auth, request)
target_user = UserID.from_string(user_id)
if not self.is_mine(target_user):
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"User must be local to check what experimental features are enabled.",
)
enabled_features = await self.store.list_enabled_features(user_id)
user_features = {}
for feature in ExperimentalFeature:
if feature in enabled_features:
user_features[feature] = True
else:
user_features[feature] = False
return HTTPStatus.OK, {"features": user_features}
async def on_PUT(
self, request: SynapseRequest, user_id: str
) -> Tuple[HTTPStatus, Dict]:
"""
Enable or disable the provided features for the requester
"""
await assert_requester_is_admin(self.auth, request)
body = parse_json_object_from_request(request)
target_user = UserID.from_string(user_id)
if not self.is_mine(target_user):
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"User must be local to enable experimental features.",
)
features = body.get("features")
if not features:
raise SynapseError(
HTTPStatus.BAD_REQUEST, "You must provide features to set."
)
# validate the provided features
validated_features = {}
for feature, enabled in features.items():
try:
validated_feature = ExperimentalFeature(feature)
validated_features[validated_feature] = enabled
except ValueError:
raise SynapseError(
HTTPStatus.BAD_REQUEST,
f"{feature!r} is not recognised as a valid experimental feature.",
)
await self.store.set_features_for_user(user_id, validated_features)
return HTTPStatus.OK, {}
+25
View File
@@ -113,3 +113,28 @@ class UserMediaStatisticsRestServlet(RestServlet):
ret["next_token"] = start + len(users_media)
return HTTPStatus.OK, ret
class LargestRoomsStatistics(RestServlet):
"""Get the largest rooms by database size.
Only works when using PostgreSQL.
"""
PATTERNS = admin_patterns("/statistics/database/rooms$")
def __init__(self, hs: "HomeServer"):
self.auth = hs.get_auth()
self.stats_controller = hs.get_storage_controllers().stats
async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
await assert_requester_is_admin(self.auth, request)
room_sizes = await self.stats_controller.get_room_db_size_estimate()
return HTTPStatus.OK, {
"rooms": [
{"room_id": room_id, "estimated_size": size}
for room_id, size in room_sizes
]
}
+1 -1
View File
@@ -94,7 +94,7 @@ class CreateFilterRestServlet(RestServlet):
set_timeline_upper_limit(content, self.hs.config.server.filter_timeline_limit)
filter_id = await self.filtering.add_user_filter(
user_localpart=target_user.localpart, user_filter=content
user_id=target_user, user_filter=content
)
return 200, {"filter_id": str(filter_id)}
+37 -5
View File
@@ -16,7 +16,8 @@
import logging
import re
from typing import TYPE_CHECKING, Any, Optional, Tuple
from collections import Counter
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple
from synapse.api.errors import InvalidAPICallError, SynapseError
from synapse.http.server import HttpServer
@@ -289,16 +290,40 @@ class OneTimeKeyServlet(RestServlet):
await self.auth.get_user_by_req(request, allow_guest=True)
timeout = parse_integer(request, "timeout", 10 * 1000)
body = parse_json_object_from_request(request)
# Generate a count for each algorithm, which is hard-coded to 1.
query: Dict[str, Dict[str, Dict[str, int]]] = {}
for user_id, one_time_keys in body.get("one_time_keys", {}).items():
for device_id, algorithm in one_time_keys.items():
query.setdefault(user_id, {})[device_id] = {algorithm: 1}
result = await self.e2e_keys_handler.claim_one_time_keys(
body, timeout, always_include_fallback_keys=False
query, timeout, always_include_fallback_keys=False
)
return 200, result
class UnstableOneTimeKeyServlet(RestServlet):
"""
Identical to the stable endpoint (OneTimeKeyServlet) except it always includes
fallback keys in the response.
Identical to the stable endpoint (OneTimeKeyServlet) except it allows for
querying for multiple OTKs at once and always includes fallback keys in the
response.
POST /keys/claim HTTP/1.1
{
"one_time_keys": {
"<user_id>": {
"<device_id>": ["<algorithm>", ...]
} } }
HTTP/1.1 200 OK
{
"one_time_keys": {
"<user_id>": {
"<device_id>": {
"<algorithm>:<key_id>": "<key_base64>"
} } } }
"""
PATTERNS = [re.compile(r"^/_matrix/client/unstable/org.matrix.msc3983/keys/claim$")]
@@ -313,8 +338,15 @@ class UnstableOneTimeKeyServlet(RestServlet):
await self.auth.get_user_by_req(request, allow_guest=True)
timeout = parse_integer(request, "timeout", 10 * 1000)
body = parse_json_object_from_request(request)
# Generate a count for each algorithm.
query: Dict[str, Dict[str, Dict[str, int]]] = {}
for user_id, one_time_keys in body.get("one_time_keys", {}).items():
for device_id, algorithms in one_time_keys.items():
query.setdefault(user_id, {})[device_id] = Counter(algorithms)
result = await self.e2e_keys_handler.claim_one_time_keys(
body, timeout, always_include_fallback_keys=True
query, timeout, always_include_fallback_keys=True
)
return 200, result
+9 -1
View File
@@ -19,7 +19,7 @@ from typing import TYPE_CHECKING, Optional, Tuple
from synapse.api.constants import Direction
from synapse.handlers.relations import ThreadsListInclude
from synapse.http.server import HttpServer
from synapse.http.servlet import RestServlet, parse_integer, parse_string
from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string
from synapse.http.site import SynapseRequest
from synapse.rest.client._base import client_patterns
from synapse.storage.databases.main.relations import ThreadsNextBatch
@@ -49,6 +49,7 @@ class RelationPaginationServlet(RestServlet):
self.auth = hs.get_auth()
self._store = hs.get_datastores().main
self._relations_handler = hs.get_relations_handler()
self._support_recurse = hs.config.experimental.msc3981_recurse_relations
async def on_GET(
self,
@@ -63,6 +64,12 @@ class RelationPaginationServlet(RestServlet):
pagination_config = await PaginationConfig.from_request(
self._store, request, default_limit=5, default_dir=Direction.BACKWARDS
)
if self._support_recurse:
recurse = parse_boolean(
request, "org.matrix.msc3981.recurse", default=False
)
else:
recurse = False
# The unstable version of this API returns an extra field for client
# compatibility, see https://github.com/matrix-org/synapse/issues/12930.
@@ -75,6 +82,7 @@ class RelationPaginationServlet(RestServlet):
event_id=parent_id,
room_id=room_id,
pagin_config=pagination_config,
recurse=recurse,
include_original_event=include_original_event,
relation_type=relation_type,
event_type=event_type,
+21
View File
@@ -27,6 +27,7 @@ from typing_extensions import TypeAlias
from twisted.internet.interfaces import IOpenSSLContextFactory
from twisted.internet.tcp import Port
from twisted.web.client import HTTPConnectionPool
from twisted.web.iweb import IPolicyForHTTPS
from twisted.web.resource import Resource
@@ -453,6 +454,26 @@ class HomeServer(metaclass=abc.ABCMeta):
use_proxy=True,
)
@cache_in_self
def get_pusher_http_client(self) -> SimpleHttpClient:
# the pusher makes lots of concurrent SSL connections to Sygnal, and tends to
# do so in batches, so we need to allow the pool to keep lots of idle
# connections around.
pool = HTTPConnectionPool(self.get_reactor())
# XXX: The justification for using the cache factor here is that larger
# instances will need both more cache and more connections.
# Still, this should probably be a separate dial
pool.maxPersistentPerHost = max(int(100 * self.config.caches.global_factor), 5)
pool.cachedConnectionTimeout = 2 * 60
return SimpleHttpClient(
self,
ip_whitelist=self.config.server.ip_range_whitelist,
ip_blacklist=self.config.server.ip_range_blacklist,
use_proxy=True,
connection_pool=pool,
)
@cache_in_self
def get_federation_http_client(self) -> MatrixFederationHttpClient:
"""
+2
View File
@@ -19,6 +19,7 @@ from synapse.storage.controllers.persist_events import (
)
from synapse.storage.controllers.purge_events import PurgeEventsStorageController
from synapse.storage.controllers.state import StateStorageController
from synapse.storage.controllers.stats import StatsController
from synapse.storage.databases import Databases
from synapse.storage.databases.main import DataStore
@@ -40,6 +41,7 @@ class StorageControllers:
self.purge_events = PurgeEventsStorageController(hs, stores)
self.state = StateStorageController(hs, stores)
self.stats = StatsController(hs, stores)
self.persistence = None
if stores.persist_events:
+113
View File
@@ -0,0 +1,113 @@
# Copyright 2023 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from collections import Counter
from typing import TYPE_CHECKING, Collection, List, Tuple
from synapse.api.errors import SynapseError
from synapse.storage.database import LoggingTransaction
from synapse.storage.databases import Databases
from synapse.storage.engines import PostgresEngine
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
class StatsController:
"""High level interface for getting statistics."""
def __init__(self, hs: "HomeServer", stores: Databases):
self.stores = stores
async def get_room_db_size_estimate(self) -> List[Tuple[str, int]]:
"""Get an estimate of the largest rooms and how much database space they
use, in bytes.
Only works against PostgreSQL.
Note: this uses the postgres statistics so is a very rough estimate.
"""
# Note: We look at both tables on the main and state databases.
if not isinstance(self.stores.main.database_engine, PostgresEngine):
raise SynapseError(400, "Endpoint requires using PostgreSQL")
if not isinstance(self.stores.state.database_engine, PostgresEngine):
raise SynapseError(400, "Endpoint requires using PostgreSQL")
# For each "large" table, we go through and get the largest rooms
# and an estimate of how much space they take. We can then sum the
# results and return the top 10.
#
# This isn't the most accurate, but given all of these are estimates
# anyway its good enough.
room_estimates: Counter[str] = Counter()
# Return size of the table on disk, including indexes and TOAST.
table_sql = """
SELECT pg_total_relation_size(?)
"""
# Get an estimate for the largest rooms and their frequency.
#
# Note: the cast here is a hack to cast from `anyarray` to an actual
# type. This ensures that psycopg2 passes us a back a a Python list.
column_sql = """
SELECT
most_common_vals::TEXT::TEXT[], most_common_freqs::TEXT::NUMERIC[]
FROM pg_stats
WHERE tablename = ? and attname = 'room_id'
"""
def get_room_db_size_estimate_txn(
txn: LoggingTransaction,
tables: Collection[str],
) -> None:
for table in tables:
txn.execute(table_sql, (table,))
row = txn.fetchone()
assert row is not None
(table_size,) = row
txn.execute(column_sql, (table,))
row = txn.fetchone()
assert row is not None
vals, freqs = row
for room_id, freq in zip(vals, freqs):
room_estimates[room_id] += int(freq * table_size)
await self.stores.main.db_pool.runInteraction(
"get_room_db_size_estimate_main",
get_room_db_size_estimate_txn,
(
"event_json",
"events",
"event_search",
"event_edges",
"event_push_actions",
"stream_ordering_to_exterm",
),
)
await self.stores.state.db_pool.runInteraction(
"get_room_db_size_estimate_state",
get_room_db_size_estimate_txn,
("state_groups_state",),
)
return room_estimates.most_common(10)
+2 -2
View File
@@ -95,7 +95,7 @@ class Databases(Generic[DataStoreT]):
# If we're on a process that can persist events also
# instantiate a `PersistEventsStore`
if hs.get_instance_name() in hs.config.worker.writers.events:
persist_events = PersistEventsStore(hs, database, main, db_conn)
persist_events = PersistEventsStore(hs, database, main, db_conn) # type: ignore[arg-type]
if "state" in database_config.databases:
logger.info(
@@ -133,6 +133,6 @@ class Databases(Generic[DataStoreT]):
# We use local variables here to ensure that the databases do not have
# optional types.
self.main = main
self.main = main # type: ignore[assignment]
self.state = state
self.persist_events = persist_events
@@ -43,6 +43,7 @@ from .event_federation import EventFederationStore
from .event_push_actions import EventPushActionsStore
from .events_bg_updates import EventsBackgroundUpdatesStore
from .events_forward_extremities import EventForwardExtremitiesStore
from .experimental_features import ExperimentalFeaturesStore
from .filtering import FilteringWorkerStore
from .keys import KeyStore
from .lock import LockStore
@@ -82,6 +83,7 @@ logger = logging.getLogger(__name__)
class DataStore(
EventsBackgroundUpdatesStore,
ExperimentalFeaturesStore,
DeviceStore,
RoomMemberStore,
RoomStore,
+9 -7
View File
@@ -205,13 +205,13 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
)
elif row.type == EventsStreamCurrentStateRow.TypeId:
assert isinstance(data, EventsStreamCurrentStateRow)
self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token)
self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined]
if data.type == EventTypes.Member:
self.get_rooms_for_user_with_stream_ordering.invalidate(
self.get_rooms_for_user_with_stream_ordering.invalidate( # type: ignore[attr-defined]
(data.state_key,)
)
self.get_rooms_for_user.invalidate((data.state_key,))
self.get_rooms_for_user.invalidate((data.state_key,)) # type: ignore[attr-defined]
else:
raise Exception("Unknown events stream row type %s" % (row.type,))
@@ -229,7 +229,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
# This invalidates any local in-memory cached event objects, the original
# process triggering the invalidation is responsible for clearing any external
# cached objects.
self._invalidate_local_get_event_cache(event_id)
self._invalidate_local_get_event_cache(event_id) # type: ignore[attr-defined]
self._attempt_to_invalidate_cache("have_seen_event", (room_id, event_id))
self._attempt_to_invalidate_cache("get_latest_event_ids_in_room", (room_id,))
@@ -242,10 +242,10 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
self._attempt_to_invalidate_cache("_get_membership_from_event_id", (event_id,))
if not backfilled:
self._events_stream_cache.entity_has_changed(room_id, stream_ordering)
self._events_stream_cache.entity_has_changed(room_id, stream_ordering) # type: ignore[attr-defined]
if redacts:
self._invalidate_local_get_event_cache(redacts)
self._invalidate_local_get_event_cache(redacts) # type: ignore[attr-defined]
# Caches which might leak edits must be invalidated for the event being
# redacted.
self._attempt_to_invalidate_cache("get_relations_for_event", (redacts,))
@@ -254,7 +254,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
self._attempt_to_invalidate_cache("get_thread_id_for_receipts", (redacts,))
if etype == EventTypes.Member:
self._membership_stream_cache.entity_has_changed(state_key, stream_ordering)
self._membership_stream_cache.entity_has_changed(state_key, stream_ordering) # type: ignore[attr-defined]
self._attempt_to_invalidate_cache(
"get_invited_rooms_for_local_user", (state_key,)
)
@@ -378,6 +378,8 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
)
if isinstance(self.database_engine, PostgresEngine):
assert self._cache_id_gen is not None
# get_next() returns a context manager which is designed to wrap
# the transaction. However, we want to only get an ID when we want
# to use it, here, so we need to call __enter__ manually, and have
@@ -1027,8 +1027,10 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
...
async def claim_e2e_one_time_keys(
self, query_list: Iterable[Tuple[str, str, str]]
) -> Tuple[Dict[str, Dict[str, Dict[str, JsonDict]]], List[Tuple[str, str, str]]]:
self, query_list: Iterable[Tuple[str, str, str, int]]
) -> Tuple[
Dict[str, Dict[str, Dict[str, JsonDict]]], List[Tuple[str, str, str, int]]
]:
"""Take a list of one time keys out of the database.
Args:
@@ -1043,8 +1045,12 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
@trace
def _claim_e2e_one_time_key_simple(
txn: LoggingTransaction, user_id: str, device_id: str, algorithm: str
) -> Optional[Tuple[str, str]]:
txn: LoggingTransaction,
user_id: str,
device_id: str,
algorithm: str,
count: int,
) -> List[Tuple[str, str]]:
"""Claim OTK for device for DBs that don't support RETURNING.
Returns:
@@ -1055,36 +1061,41 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
sql = """
SELECT key_id, key_json FROM e2e_one_time_keys_json
WHERE user_id = ? AND device_id = ? AND algorithm = ?
LIMIT 1
LIMIT ?
"""
txn.execute(sql, (user_id, device_id, algorithm))
otk_row = txn.fetchone()
if otk_row is None:
return None
txn.execute(sql, (user_id, device_id, algorithm, count))
otk_rows = list(txn)
if not otk_rows:
return []
key_id, key_json = otk_row
self.db_pool.simple_delete_one_txn(
self.db_pool.simple_delete_many_txn(
txn,
table="e2e_one_time_keys_json",
column="key_id",
values=[otk_row[0] for otk_row in otk_rows],
keyvalues={
"user_id": user_id,
"device_id": device_id,
"algorithm": algorithm,
"key_id": key_id,
},
)
self._invalidate_cache_and_stream(
txn, self.count_e2e_one_time_keys, (user_id, device_id)
)
return f"{algorithm}:{key_id}", key_json
return [
(f"{algorithm}:{key_id}", key_json) for key_id, key_json in otk_rows
]
@trace
def _claim_e2e_one_time_key_returning(
txn: LoggingTransaction, user_id: str, device_id: str, algorithm: str
) -> Optional[Tuple[str, str]]:
txn: LoggingTransaction,
user_id: str,
device_id: str,
algorithm: str,
count: int,
) -> List[Tuple[str, str]]:
"""Claim OTK for device for DBs that support RETURNING.
Returns:
@@ -1099,28 +1110,30 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
AND key_id IN (
SELECT key_id FROM e2e_one_time_keys_json
WHERE user_id = ? AND device_id = ? AND algorithm = ?
LIMIT 1
LIMIT ?
)
RETURNING key_id, key_json
"""
txn.execute(
sql, (user_id, device_id, algorithm, user_id, device_id, algorithm)
sql,
(user_id, device_id, algorithm, user_id, device_id, algorithm, count),
)
otk_row = txn.fetchone()
if otk_row is None:
return None
otk_rows = list(txn)
if not otk_rows:
return []
self._invalidate_cache_and_stream(
txn, self.count_e2e_one_time_keys, (user_id, device_id)
)
key_id, key_json = otk_row
return f"{algorithm}:{key_id}", key_json
return [
(f"{algorithm}:{key_id}", key_json) for key_id, key_json in otk_rows
]
results: Dict[str, Dict[str, Dict[str, JsonDict]]] = {}
missing: List[Tuple[str, str, str]] = []
for user_id, device_id, algorithm in query_list:
missing: List[Tuple[str, str, str, int]] = []
for user_id, device_id, algorithm, count in query_list:
if self.database_engine.supports_returning:
# If we support RETURNING clause we can use a single query that
# allows us to use autocommit mode.
@@ -1130,21 +1143,25 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
_claim_e2e_one_time_key = _claim_e2e_one_time_key_simple
db_autocommit = False
claim_row = await self.db_pool.runInteraction(
claim_rows = await self.db_pool.runInteraction(
"claim_e2e_one_time_keys",
_claim_e2e_one_time_key,
user_id,
device_id,
algorithm,
count,
db_autocommit=db_autocommit,
)
if claim_row:
if claim_rows:
device_results = results.setdefault(user_id, {}).setdefault(
device_id, {}
)
device_results[claim_row[0]] = json_decoder.decode(claim_row[1])
else:
missing.append((user_id, device_id, algorithm))
for claim_row in claim_rows:
device_results[claim_row[0]] = json_decoder.decode(claim_row[1])
# Did we get enough OTKs?
count -= len(claim_rows)
if count:
missing.append((user_id, device_id, algorithm, count))
return results, missing
@@ -0,0 +1,75 @@
# Copyright 2023 The Matrix.org Foundation C.I.C
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import TYPE_CHECKING, Dict
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
from synapse.storage.databases.main import CacheInvalidationWorkerStore
from synapse.types import StrCollection
from synapse.util.caches.descriptors import cached
if TYPE_CHECKING:
from synapse.rest.admin.experimental_features import ExperimentalFeature
from synapse.server import HomeServer
class ExperimentalFeaturesStore(CacheInvalidationWorkerStore):
def __init__(
self,
database: DatabasePool,
db_conn: LoggingDatabaseConnection,
hs: "HomeServer",
) -> None:
super().__init__(database, db_conn, hs)
@cached()
async def list_enabled_features(self, user_id: str) -> StrCollection:
"""
Checks to see what features are enabled for a given user
Args:
user:
the user to be queried on
Returns:
the features currently enabled for the user
"""
enabled = await self.db_pool.simple_select_list(
"per_user_experimental_features",
{"user_id": user_id, "enabled": True},
["feature"],
)
return [feature["feature"] for feature in enabled]
async def set_features_for_user(
self,
user: str,
features: Dict["ExperimentalFeature", bool],
) -> None:
"""
Enables or disables features for a given user
Args:
user:
the user for whom to enable/disable the features
features:
pairs of features and True/False for whether the feature should be enabled
"""
for feature, enabled in features.items():
await self.db_pool.simple_upsert(
table="per_user_experimental_features",
keyvalues={"feature": feature, "user_id": user},
values={"enabled": enabled},
insertion_values={"user_id": user, "feature": feature},
)
await self.invalidate_cache_and_stream("list_enabled_features", (user,))
+39 -8
View File
@@ -16,15 +16,38 @@
from typing import Optional, Tuple, Union, cast
from canonicaljson import encode_canonical_json
from typing_extensions import TYPE_CHECKING
from synapse.api.errors import Codes, StoreError, SynapseError
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import LoggingTransaction
from synapse.types import JsonDict
from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
LoggingTransaction,
)
from synapse.types import JsonDict, UserID
from synapse.util.caches.descriptors import cached
if TYPE_CHECKING:
from synapse.server import HomeServer
class FilteringWorkerStore(SQLBaseStore):
def __init__(
self,
database: DatabasePool,
db_conn: LoggingDatabaseConnection,
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)
self.db_pool.updates.register_background_index_update(
"full_users_filters_unique_idx",
index_name="full_users_unique_idx",
table="user_filters",
columns=["full_user_id, filter_id"],
unique=True,
)
@cached(num_args=2)
async def get_user_filter(
self, user_localpart: str, filter_id: Union[int, str]
@@ -46,7 +69,7 @@ class FilteringWorkerStore(SQLBaseStore):
return db_to_json(def_json)
async def add_user_filter(self, user_localpart: str, user_filter: JsonDict) -> int:
async def add_user_filter(self, user_id: UserID, user_filter: JsonDict) -> int:
def_json = encode_canonical_json(user_filter)
# Need an atomic transaction to SELECT the maximal ID so far then
@@ -56,13 +79,13 @@ class FilteringWorkerStore(SQLBaseStore):
"SELECT filter_id FROM user_filters "
"WHERE user_id = ? AND filter_json = ?"
)
txn.execute(sql, (user_localpart, bytearray(def_json)))
txn.execute(sql, (user_id.localpart, bytearray(def_json)))
filter_id_response = txn.fetchone()
if filter_id_response is not None:
return filter_id_response[0]
sql = "SELECT MAX(filter_id) FROM user_filters WHERE user_id = ?"
txn.execute(sql, (user_localpart,))
txn.execute(sql, (user_id.localpart,))
max_id = cast(Tuple[Optional[int]], txn.fetchone())[0]
if max_id is None:
filter_id = 0
@@ -70,10 +93,18 @@ class FilteringWorkerStore(SQLBaseStore):
filter_id = max_id + 1
sql = (
"INSERT INTO user_filters (user_id, filter_id, filter_json)"
"VALUES(?, ?, ?)"
"INSERT INTO user_filters (full_user_id, user_id, filter_id, filter_json)"
"VALUES(?, ?, ?, ?)"
)
txn.execute(
sql,
(
user_id.to_string(),
user_id.localpart,
filter_id,
bytearray(def_json),
),
)
txn.execute(sql, (user_localpart, filter_id, bytearray(def_json)))
return filter_id
+35 -7
View File
@@ -11,14 +11,34 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional
from typing import TYPE_CHECKING, Optional
from synapse.api.errors import StoreError
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
from synapse.storage.databases.main.roommember import ProfileInfo
from synapse.types import UserID
if TYPE_CHECKING:
from synapse.server import HomeServer
class ProfileWorkerStore(SQLBaseStore):
def __init__(
self,
database: DatabasePool,
db_conn: LoggingDatabaseConnection,
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)
self.db_pool.updates.register_background_index_update(
"profiles_full_user_id_key_idx",
index_name="profiles_full_user_id_key",
table="profiles",
columns=["full_user_id"],
unique=True,
)
async def get_profileinfo(self, user_localpart: str) -> ProfileInfo:
try:
profile = await self.db_pool.simple_select_one(
@@ -54,28 +74,36 @@ class ProfileWorkerStore(SQLBaseStore):
desc="get_profile_avatar_url",
)
async def create_profile(self, user_localpart: str) -> None:
async def create_profile(self, user_id: UserID) -> None:
user_localpart = user_id.localpart
await self.db_pool.simple_insert(
table="profiles", values={"user_id": user_localpart}, desc="create_profile"
table="profiles",
values={"user_id": user_localpart, "full_user_id": user_id.to_string()},
desc="create_profile",
)
async def set_profile_displayname(
self, user_localpart: str, new_displayname: Optional[str]
self, user_id: UserID, new_displayname: Optional[str]
) -> None:
user_localpart = user_id.localpart
await self.db_pool.simple_upsert(
table="profiles",
keyvalues={"user_id": user_localpart},
values={"displayname": new_displayname},
values={
"displayname": new_displayname,
"full_user_id": user_id.to_string(),
},
desc="set_profile_displayname",
)
async def set_profile_avatar_url(
self, user_localpart: str, new_avatar_url: Optional[str]
self, user_id: UserID, new_avatar_url: Optional[str]
) -> None:
user_localpart = user_id.localpart
await self.db_pool.simple_upsert(
table="profiles",
keyvalues={"user_id": user_localpart},
values={"avatar_url": new_avatar_url},
values={"avatar_url": new_avatar_url, "full_user_id": user_id.to_string()},
desc="set_profile_avatar_url",
)
@@ -2414,8 +2414,8 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
# *obviously* the 'profiles' table uses localpart for user_id
# while everything else uses the full mxid.
txn.execute(
"INSERT INTO profiles(user_id, displayname) VALUES (?,?)",
(user_id_obj.localpart, create_profile_with_displayname),
"INSERT INTO profiles(full_user_id, user_id, displayname) VALUES (?,?,?)",
(user_id, user_id_obj.localpart, create_profile_with_displayname),
)
if self.hs.config.stats.stats_enabled:
+48 -17
View File
@@ -172,6 +172,7 @@ class RelationsWorkerStore(SQLBaseStore):
direction: Direction = Direction.BACKWARDS,
from_token: Optional[StreamToken] = None,
to_token: Optional[StreamToken] = None,
recurse: bool = False,
) -> Tuple[Sequence[_RelatedEvent], Optional[StreamToken]]:
"""Get a list of relations for an event, ordered by topological ordering.
@@ -186,6 +187,7 @@ class RelationsWorkerStore(SQLBaseStore):
oldest first (forwards).
from_token: Fetch rows from the given token, or from the start if None.
to_token: Fetch rows up to the given token, or up to the end if None.
recurse: Whether to recursively find relations.
Returns:
A tuple of:
@@ -200,8 +202,8 @@ class RelationsWorkerStore(SQLBaseStore):
# Ensure bad limits aren't being passed in.
assert limit >= 0
where_clause = ["relates_to_id = ?", "room_id = ?"]
where_args: List[Union[str, int]] = [event.event_id, room_id]
where_clause = ["room_id = ?"]
where_args: List[Union[str, int]] = [room_id]
is_redacted = event.internal_metadata.is_redacted()
if relation_type is not None:
@@ -229,23 +231,52 @@ class RelationsWorkerStore(SQLBaseStore):
if pagination_clause:
where_clause.append(pagination_clause)
sql = """
SELECT event_id, relation_type, sender, topological_ordering, stream_ordering
FROM event_relations
INNER JOIN events USING (event_id)
WHERE %s
ORDER BY topological_ordering %s, stream_ordering %s
LIMIT ?
""" % (
" AND ".join(where_clause),
order,
order,
)
# If a recursive query is requested then the filters are applied after
# recursively following relationships from the requested event to children
# up to 3-relations deep.
#
# If no recursion is needed then the event_relations table is queried
# for direct children of the requested event.
if recurse:
sql = """
WITH RECURSIVE related_events AS (
SELECT event_id, relation_type, relates_to_id, 0 AS depth
FROM event_relations
WHERE relates_to_id = ?
UNION SELECT e.event_id, e.relation_type, e.relates_to_id, depth + 1
FROM event_relations e
INNER JOIN related_events r ON r.event_id = e.relates_to_id
WHERE depth <= 3
)
SELECT event_id, relation_type, sender, topological_ordering, stream_ordering
FROM related_events
INNER JOIN events USING (event_id)
WHERE %s
ORDER BY topological_ordering %s, stream_ordering %s
LIMIT ?;
""" % (
" AND ".join(where_clause),
order,
order,
)
else:
sql = """
SELECT event_id, relation_type, sender, topological_ordering, stream_ordering
FROM event_relations
INNER JOIN events USING (event_id)
WHERE relates_to_id = ? AND %s
ORDER BY topological_ordering %s, stream_ordering %s
LIMIT ?
""" % (
" AND ".join(where_clause),
order,
order,
)
def _get_recent_references_for_event_txn(
txn: LoggingTransaction,
) -> Tuple[List[_RelatedEvent], Optional[StreamToken]]:
txn.execute(sql, where_args + [limit + 1])
txn.execute(sql, [event.event_id] + where_args + [limit + 1])
events = []
topo_orderings: List[int] = []
@@ -965,7 +996,7 @@ class RelationsWorkerStore(SQLBaseStore):
# relation.
sql = """
WITH RECURSIVE related_events AS (
SELECT event_id, relates_to_id, relation_type, 0 depth
SELECT event_id, relates_to_id, relation_type, 0 AS depth
FROM event_relations
WHERE event_id = ?
UNION SELECT e.event_id, e.relates_to_id, e.relation_type, depth + 1
@@ -1025,7 +1056,7 @@ class RelationsWorkerStore(SQLBaseStore):
sql = """
SELECT relates_to_id FROM event_relations WHERE relates_to_id = COALESCE((
WITH RECURSIVE related_events AS (
SELECT event_id, relates_to_id, relation_type, 0 depth
SELECT event_id, relates_to_id, relation_type, 0 AS depth
FROM event_relations
WHERE event_id = ?
UNION SELECT e.event_id, e.relates_to_id, e.relation_type, depth + 1
+5 -3
View File
@@ -22,7 +22,7 @@ import attr
from typing_extensions import Counter as CounterType
from synapse.config.homeserver import HomeServerConfig
from synapse.storage.database import LoggingDatabaseConnection
from synapse.storage.database import LoggingDatabaseConnection, LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
from synapse.storage.schema import SCHEMA_COMPAT_VERSION, SCHEMA_VERSION
from synapse.storage.types import Cursor
@@ -168,7 +168,9 @@ def prepare_database(
def _setup_new_database(
cur: Cursor, database_engine: BaseDatabaseEngine, databases: Collection[str]
cur: LoggingTransaction,
database_engine: BaseDatabaseEngine,
databases: Collection[str],
) -> None:
"""Sets up the physical database by finding a base set of "full schemas" and
then applying any necessary deltas, including schemas from the given data
@@ -289,7 +291,7 @@ def _setup_new_database(
def _upgrade_existing_database(
cur: Cursor,
cur: LoggingTransaction,
current_schema_state: _SchemaState,
database_engine: BaseDatabaseEngine,
config: Optional[HomeServerConfig],
+4 -1
View File
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
SCHEMA_VERSION = 75 # remember to update the list below when updating
SCHEMA_VERSION = 76 # remember to update the list below when updating
"""Represents the expectations made by the codebase about the database schema
This should be incremented whenever the codebase changes its requirements on the
@@ -97,6 +97,9 @@ Changes in SCHEMA_VERSION = 75:
`local_current_membership` & `room_memberships`) is now being populated for new
rows. When the background job to populate historical rows lands this will
become the compat schema version.
Changes in SCHEMA_VERSION = 76:
- Adds a full_user_id column to tables profiles and user_filters.
"""
@@ -24,10 +24,13 @@ UTF-8 bytes, so we have to do it in Python.
import logging
from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine
logger = logging.getLogger(__name__)
def run_create(cur, database_engine, *args, **kwargs):
def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
logger.info("Porting pushers table...")
cur.execute(
"""
@@ -61,8 +64,8 @@ def run_create(cur, database_engine, *args, **kwargs):
"""
)
count = 0
for row in cur.fetchall():
row = list(row)
for tuple_row in cur.fetchall():
row = list(tuple_row)
row[8] = bytes(row[8]).decode("utf-8")
row[11] = bytes(row[11]).decode("utf-8")
cur.execute(
@@ -81,7 +84,3 @@ def run_create(cur, database_engine, *args, **kwargs):
cur.execute("DROP TABLE pushers")
cur.execute("ALTER TABLE pushers2 RENAME TO pushers")
logger.info("Moved %d pushers to new table", count)
def run_upgrade(*args, **kwargs):
pass
+3 -6
View File
@@ -14,7 +14,8 @@
import json
import logging
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
from synapse.storage.prepare_database import get_statements
logger = logging.getLogger(__name__)
@@ -41,7 +42,7 @@ SQLITE_TABLE = (
)
def run_create(cur, database_engine, *args, **kwargs):
def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
if isinstance(database_engine, PostgresEngine):
for statement in get_statements(POSTGRES_TABLE.splitlines()):
cur.execute(statement)
@@ -72,7 +73,3 @@ def run_create(cur, database_engine, *args, **kwargs):
)
cur.execute(sql, ("event_search", progress_json))
def run_upgrade(*args, **kwargs):
pass
+3 -5
View File
@@ -14,6 +14,8 @@
import json
import logging
from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine
from synapse.storage.prepare_database import get_statements
logger = logging.getLogger(__name__)
@@ -25,7 +27,7 @@ ALTER_TABLE = (
)
def run_create(cur, database_engine, *args, **kwargs):
def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
for statement in get_statements(ALTER_TABLE.splitlines()):
cur.execute(statement)
@@ -51,7 +53,3 @@ def run_create(cur, database_engine, *args, **kwargs):
)
cur.execute(sql, ("event_origin_server_ts", progress_json))
def run_upgrade(*args, **kwargs):
pass
@@ -12,13 +12,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Dict, Iterable, List, Tuple, cast
from synapse.config.appservice import load_appservices
from synapse.config.homeserver import HomeServerConfig
from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine
logger = logging.getLogger(__name__)
def run_create(cur, database_engine, *args, **kwargs):
def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
# NULL indicates user was not registered by an appservice.
try:
cur.execute("ALTER TABLE users ADD COLUMN appservice_id TEXT")
@@ -27,9 +31,13 @@ def run_create(cur, database_engine, *args, **kwargs):
pass
def run_upgrade(cur, database_engine, config, *args, **kwargs):
def run_upgrade(
cur: LoggingTransaction,
database_engine: BaseDatabaseEngine,
config: HomeServerConfig,
) -> None:
cur.execute("SELECT name FROM users")
rows = cur.fetchall()
rows = cast(Iterable[Tuple[str]], cur.fetchall())
config_files = []
try:
@@ -39,7 +47,7 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs):
appservices = load_appservices(config.server.server_name, config_files)
owned = {}
owned: Dict[str, List[str]] = {}
for row in rows:
user_id = row[0]
@@ -20,14 +20,17 @@
import logging
from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine
logger = logging.getLogger(__name__)
def token_to_stream_ordering(token):
def token_to_stream_ordering(token: str) -> int:
return int(token[1:].split("_")[0])
def run_create(cur, database_engine, *args, **kwargs):
def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
logger.info("Porting pushers table, delta 31...")
cur.execute(
"""
@@ -61,8 +64,8 @@ def run_create(cur, database_engine, *args, **kwargs):
"""
)
count = 0
for row in cur.fetchall():
row = list(row)
for tuple_row in cur.fetchall():
row = list(tuple_row)
row[12] = token_to_stream_ordering(row[12])
cur.execute(
"""
@@ -80,7 +83,3 @@ def run_create(cur, database_engine, *args, **kwargs):
cur.execute("DROP TABLE pushers")
cur.execute("ALTER TABLE pushers2 RENAME TO pushers")
logger.info("Moved %d pushers to new table", count)
def run_upgrade(cur, database_engine, *args, **kwargs):
pass
@@ -14,7 +14,8 @@
import json
import logging
from synapse.storage.engines import PostgresEngine
from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
from synapse.storage.prepare_database import get_statements
logger = logging.getLogger(__name__)
@@ -26,7 +27,7 @@ ALTER TABLE event_search ADD COLUMN stream_ordering BIGINT;
"""
def run_create(cur, database_engine, *args, **kwargs):
def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
if not isinstance(database_engine, PostgresEngine):
return
@@ -56,7 +57,3 @@ def run_create(cur, database_engine, *args, **kwargs):
)
cur.execute(sql, ("event_search_order", progress_json))
def run_upgrade(cur, database_engine, *args, **kwargs):
pass
@@ -14,6 +14,8 @@
import json
import logging
from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine
from synapse.storage.prepare_database import get_statements
logger = logging.getLogger(__name__)
@@ -25,7 +27,7 @@ ALTER TABLE events ADD COLUMN contains_url BOOLEAN;
"""
def run_create(cur, database_engine, *args, **kwargs):
def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
for statement in get_statements(ALTER_TABLE.splitlines()):
cur.execute(statement)
@@ -51,7 +53,3 @@ def run_create(cur, database_engine, *args, **kwargs):
)
cur.execute(sql, ("event_fields_sender_url", progress_json))
def run_upgrade(cur, database_engine, *args, **kwargs):
pass
@@ -14,14 +14,22 @@
import time
from synapse.config.homeserver import HomeServerConfig
from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine
ALTER_TABLE = "ALTER TABLE remote_media_cache ADD COLUMN last_access_ts BIGINT"
def run_create(cur, database_engine, *args, **kwargs):
def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
cur.execute(ALTER_TABLE)
def run_upgrade(cur, database_engine, *args, **kwargs):
def run_upgrade(
cur: LoggingTransaction,
database_engine: BaseDatabaseEngine,
config: HomeServerConfig,
) -> None:
cur.execute(
"UPDATE remote_media_cache SET last_access_ts = ?",
(int(time.time() * 1000),),
@@ -14,7 +14,8 @@
import logging
from synapse.storage.engines import PostgresEngine
from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
from synapse.storage.prepare_database import get_statements
logger = logging.getLogger(__name__)
@@ -34,13 +35,9 @@ CREATE INDEX cache_invalidation_stream_id ON cache_invalidation_stream(stream_id
"""
def run_create(cur, database_engine, *args, **kwargs):
def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
if not isinstance(database_engine, PostgresEngine):
return
for statement in get_statements(CREATE_TABLE.splitlines()):
cur.execute(statement)
def run_upgrade(cur, database_engine, *args, **kwargs):
pass
@@ -14,19 +14,16 @@
import logging
from synapse.storage.engines import PostgresEngine
from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
logger = logging.getLogger(__name__)
def run_create(cur, database_engine, *args, **kwargs):
def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
if isinstance(database_engine, PostgresEngine):
cur.execute("TRUNCATE received_transactions")
else:
cur.execute("DELETE FROM received_transactions")
cur.execute("CREATE INDEX received_transactions_ts ON received_transactions(ts)")
def run_upgrade(cur, database_engine, *args, **kwargs):
pass
@@ -14,7 +14,8 @@
import logging
from synapse.storage.engines import PostgresEngine
from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
from synapse.storage.prepare_database import get_statements
logger = logging.getLogger(__name__)
@@ -68,7 +69,7 @@ CREATE INDEX evauth_edges_id ON event_auth(event_id);
"""
def run_create(cur, database_engine, *args, **kwargs):
def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
for statement in get_statements(DROP_INDICES.splitlines()):
cur.execute(statement)
@@ -79,7 +80,3 @@ def run_create(cur, database_engine, *args, **kwargs):
for statement in get_statements(drop_constraint.splitlines()):
cur.execute(statement)
def run_upgrade(cur, database_engine, *args, **kwargs):
pass
@@ -14,7 +14,8 @@
import logging
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
from synapse.storage.prepare_database import get_statements
logger = logging.getLogger(__name__)
@@ -66,7 +67,7 @@ CREATE VIRTUAL TABLE user_directory_search
"""
def run_create(cur, database_engine, *args, **kwargs):
def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
for statement in get_statements(BOTH_TABLES.splitlines()):
cur.execute(statement)
@@ -78,7 +79,3 @@ def run_create(cur, database_engine, *args, **kwargs):
cur.execute(statement)
else:
raise Exception("Unrecognized database engine")
def run_upgrade(*args, **kwargs):
pass
@@ -12,7 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.storage.engines import PostgresEngine
from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
from synapse.storage.prepare_database import get_statements
FIX_INDEXES = """
@@ -34,7 +36,7 @@ CREATE INDEX group_rooms_r_idx ON group_rooms(room_id);
"""
def run_create(cur, database_engine, *args, **kwargs):
def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
rowid = "ctid" if isinstance(database_engine, PostgresEngine) else "rowid"
# remove duplicates from group_users & group_invites tables
@@ -57,7 +59,3 @@ def run_create(cur, database_engine, *args, **kwargs):
for statement in get_statements(FIX_INDEXES.splitlines()):
cur.execute(statement)
def run_upgrade(*args, **kwargs):
pass
@@ -53,16 +53,13 @@ SQLite:
import logging
from synapse.storage.engines import PostgresEngine
from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
logger = logging.getLogger(__name__)
def run_create(cur, database_engine, *args, **kwargs):
pass
def run_upgrade(cur, database_engine, *args, **kwargs):
def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
if isinstance(database_engine, PostgresEngine):
cur.execute(
"""
@@ -76,7 +73,9 @@ def run_upgrade(cur, database_engine, *args, **kwargs):
cur.execute(
"SELECT sql FROM sqlite_master WHERE tbl_name='events' AND type='table'"
)
(oldsql,) = cur.fetchone()
row = cur.fetchone()
assert row is not None
(oldsql,) = row
sql = oldsql.replace("content TEXT NOT NULL", "content TEXT")
if sql == oldsql:
@@ -85,7 +84,9 @@ def run_upgrade(cur, database_engine, *args, **kwargs):
logger.info("Replacing definition of 'events' with: %s", sql)
cur.execute("PRAGMA schema_version")
(oldver,) = cur.fetchone()
row = cur.fetchone()
assert row is not None
(oldver,) = row
cur.execute("PRAGMA writable_schema=ON")
cur.execute(
"UPDATE sqlite_master SET sql=? WHERE tbl_name='events' AND type='table'",
@@ -1,7 +1,8 @@
import logging
from io import StringIO
from synapse.storage.engines import PostgresEngine
from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
from synapse.storage.prepare_database import execute_statements_from_stream
logger = logging.getLogger(__name__)
@@ -16,11 +17,7 @@ This migration updates the user_filters table as follows:
"""
def run_upgrade(cur, database_engine, *args, **kwargs):
pass
def run_create(cur, database_engine, *args, **kwargs):
def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
if isinstance(database_engine, PostgresEngine):
select_clause = """
SELECT DISTINCT ON (user_id, filter_id) user_id, filter_id, filter_json
@@ -27,7 +27,16 @@
# equivalent behaviour as if the server had remained in the room).
def run_upgrade(cur, database_engine, config, *args, **kwargs):
from synapse.config.homeserver import HomeServerConfig
from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine
def run_upgrade(
cur: LoggingTransaction,
database_engine: BaseDatabaseEngine,
config: HomeServerConfig,
) -> None:
# We need to do the insert in `run_upgrade` section as we don't have access
# to `config` in `run_create`.
@@ -77,7 +86,7 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs):
)
def run_create(cur, database_engine, *args, **kwargs):
def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
cur.execute(
"""
CREATE TABLE local_current_membership (
@@ -20,18 +20,14 @@ entries, and with a UNIQUE index.
import logging
from io import StringIO
from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
from synapse.storage.prepare_database import execute_statements_from_stream
from synapse.storage.types import Cursor
logger = logging.getLogger(__name__)
def run_upgrade(*args, **kwargs):
pass
def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
# some instances might already have this index, in which case we can skip this
if isinstance(database_engine, PostgresEngine):
cur.execute(
@@ -16,19 +16,16 @@
Adds a postgres SEQUENCE for generating guest user IDs.
"""
from synapse.storage.database import LoggingTransaction
from synapse.storage.databases.main.registration import (
find_max_generated_user_id_localpart,
)
from synapse.storage.engines import PostgresEngine
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
def run_create(cur, database_engine, *args, **kwargs):
def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
if not isinstance(database_engine, PostgresEngine):
return
next_id = find_max_generated_user_id_localpart(cur) + 1
cur.execute("CREATE SEQUENCE user_id_seq START WITH %s", (next_id,))
def run_upgrade(*args, **kwargs):
pass
@@ -20,18 +20,14 @@ import logging
from io import StringIO
from synapse.storage._base import db_to_json
from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine
from synapse.storage.prepare_database import execute_statements_from_stream
from synapse.storage.types import Cursor
logger = logging.getLogger(__name__)
def run_upgrade(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
pass
def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
logger.info("Creating ignored_users table")
execute_statements_from_stream(cur, StringIO(_create_commands))
@@ -16,11 +16,11 @@
This migration handles the process of changing the type of `room_depth.min_depth` to
a BIGINT.
"""
from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
from synapse.storage.types import Cursor
def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
if not isinstance(database_engine, PostgresEngine):
# this only applies to postgres - sqlite does not distinguish between big and
# little ints.
@@ -64,7 +64,3 @@ def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs
(6103, 'replace_room_depth_min_depth', '{}', 'populate_room_depth2')
"""
)
def run_upgrade(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
pass
@@ -18,11 +18,11 @@ This migration adds triggers to the partial_state_events tables to enforce uniqu
Triggers cannot be expressed in .sql files, so we have to use a separate file.
"""
from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
from synapse.storage.types import Cursor
def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
# complain if the room_id in partial_state_events doesn't match
# that in `events`. We already have a fk constraint which ensures that the event
# exists in `events`, so all we have to do is raise if there is a row with a
@@ -17,10 +17,11 @@
Adds a postgres SEQUENCE for generating application service transaction IDs.
"""
from synapse.storage.engines import PostgresEngine
from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
def run_create(cur, database_engine, *args, **kwargs):
def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
if isinstance(database_engine, PostgresEngine):
# If we already have some AS TXNs we want to start from the current
# maximum value. There are two potential places this is stored - the
@@ -30,10 +31,12 @@ def run_create(cur, database_engine, *args, **kwargs):
cur.execute("SELECT COALESCE(max(txn_id), 0) FROM application_services_txns")
row = cur.fetchone()
assert row is not None
txn_max = row[0]
cur.execute("SELECT COALESCE(max(last_txn), 0) FROM application_services_state")
row = cur.fetchone()
assert row is not None
last_txn_max = row[0]
start_val = max(last_txn_max, txn_max) + 1
@@ -14,10 +14,11 @@
import json
from synapse.storage.types import Cursor
from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine
def run_create(cur: Cursor, database_engine, *args, **kwargs):
def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
"""Add a bg update to populate the `state_key` and `rejection_reason` columns of `events`"""
# we know that any new events will have the columns populated (and that has been
@@ -27,7 +28,9 @@ def run_create(cur: Cursor, database_engine, *args, **kwargs):
# current min and max stream orderings, since that is guaranteed to include all
# the events that were stored before the new columns were added.
cur.execute("SELECT MIN(stream_ordering), MAX(stream_ordering) FROM events")
(min_stream_ordering, max_stream_ordering) = cur.fetchone()
row = cur.fetchone()
assert row is not None
(min_stream_ordering, max_stream_ordering) = row
if min_stream_ordering is None:
# no rows, nothing to do.
@@ -19,9 +19,16 @@ for its completion can be removed.
Note the background job must still remain defined in the database class.
"""
from synapse.config.homeserver import HomeServerConfig
from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine
def run_upgrade(cur, database_engine, *args, **kwargs):
def run_upgrade(
cur: LoggingTransaction,
database_engine: BaseDatabaseEngine,
config: HomeServerConfig,
) -> None:
cur.execute("SELECT update_name FROM background_updates")
rows = cur.fetchall()
for row in rows:
@@ -13,11 +13,11 @@
# limitations under the License.
import json
from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine, Sqlite3Engine
from synapse.storage.types import Cursor
def run_create(cur: Cursor, database_engine: BaseDatabaseEngine) -> None:
def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
"""
Upgrade the event_search table to use the porter tokenizer if it isn't already
@@ -38,6 +38,7 @@ def run_create(cur: Cursor, database_engine: BaseDatabaseEngine) -> None:
# Re-run the background job to re-populate the event_search table.
cur.execute("SELECT MIN(stream_ordering) FROM events")
row = cur.fetchone()
assert row is not None
min_stream_id = row[0]
# If there are not any events, nothing to do.
@@ -46,6 +47,7 @@ def run_create(cur: Cursor, database_engine: BaseDatabaseEngine) -> None:
cur.execute("SELECT MAX(stream_ordering) FROM events")
row = cur.fetchone()
assert row is not None
max_stream_id = row[0]
progress = {
@@ -17,11 +17,11 @@
This migration adds triggers to the room membership tables to enforce consistency.
Triggers cannot be expressed in .sql files, so we have to use a separate file.
"""
from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
from synapse.storage.types import Cursor
def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
# Complain if the `event_stream_ordering` in membership tables doesn't match
# the `stream_ordering` row with the same `event_id` in `events`.
if isinstance(database_engine, Sqlite3Engine):
@@ -0,0 +1,20 @@
/* Copyright 2023 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ALTER TABLE profiles ADD COLUMN full_user_id TEXT;
-- Make sure the column has a unique constraint, mirroring the `profiles_user_id_key`
-- constraint.
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES (7501, 'profiles_full_user_id_key_idx', '{}');
@@ -0,0 +1,20 @@
/* Copyright 2023 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ALTER TABLE user_filters ADD COLUMN full_user_id TEXT;
-- Add a unique index on the new column, mirroring the `user_filters_unique` unique
-- index.
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES (7502, 'full_users_filters_unique_idx', '{}');
@@ -0,0 +1,27 @@
/* Copyright 2023 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Table containing experimental features and whether they are enabled for a given user
CREATE TABLE per_user_experimental_features (
-- The User ID to check/set the feature for
user_id TEXT NOT NULL,
-- Contains features to be enabled/disabled
feature TEXT NOT NULL,
-- whether the feature is enabled/disabled for a given user, defaults to disabled
enabled BOOLEAN DEFAULT FALSE,
FOREIGN KEY (user_id) REFERENCES users(name),
PRIMARY KEY (user_id, feature)
);
@@ -12,15 +12,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.storage.engines import PostgresEngine
from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
def run_create(cur, database_engine, *args, **kwargs):
def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
if isinstance(database_engine, PostgresEngine):
# if we already have some state groups, we want to start making new
# ones with a higher id.
cur.execute("SELECT max(id) FROM state_groups")
row = cur.fetchone()
assert row is not None
if row[0] is None:
start_val = 1
@@ -28,7 +30,3 @@ def run_create(cur, database_engine, *args, **kwargs):
start_val = row[0] + 1
cur.execute("CREATE SEQUENCE state_group_id_seq START WITH %s", (start_val,))
def run_upgrade(*args, **kwargs):
pass
+9 -7
View File
@@ -26,13 +26,15 @@ from synapse.api.errors import SynapseError
from synapse.api.filtering import Filter
from synapse.api.presence import UserPresenceState
from synapse.server import HomeServer
from synapse.types import JsonDict
from synapse.types import JsonDict, UserID
from synapse.util import Clock
from synapse.util.frozenutils import freeze
from tests import unittest
from tests.events.test_utils import MockEvent
user_id = UserID.from_string("@test_user:test")
user2_id = UserID.from_string("@test_user2:test")
user_localpart = "test_user"
@@ -437,7 +439,7 @@ class FilteringTestCase(unittest.HomeserverTestCase):
user_filter_json = {"presence": {"senders": ["@foo:bar"]}}
filter_id = self.get_success(
self.datastore.add_user_filter(
user_localpart=user_localpart, user_filter=user_filter_json
user_id=user_id, user_filter=user_filter_json
)
)
presence_states = [
@@ -467,7 +469,7 @@ class FilteringTestCase(unittest.HomeserverTestCase):
filter_id = self.get_success(
self.datastore.add_user_filter(
user_localpart=user_localpart + "2", user_filter=user_filter_json
user_id=user2_id, user_filter=user_filter_json
)
)
presence_states = [
@@ -495,7 +497,7 @@ class FilteringTestCase(unittest.HomeserverTestCase):
user_filter_json = {"room": {"state": {"types": ["m.*"]}}}
filter_id = self.get_success(
self.datastore.add_user_filter(
user_localpart=user_localpart, user_filter=user_filter_json
user_id=user_id, user_filter=user_filter_json
)
)
event = MockEvent(sender="@foo:bar", type="m.room.topic", room_id="!foo:bar")
@@ -514,7 +516,7 @@ class FilteringTestCase(unittest.HomeserverTestCase):
user_filter_json = {"room": {"state": {"types": ["m.*"]}}}
filter_id = self.get_success(
self.datastore.add_user_filter(
user_localpart=user_localpart, user_filter=user_filter_json
user_id=user_id, user_filter=user_filter_json
)
)
event = MockEvent(
@@ -598,7 +600,7 @@ class FilteringTestCase(unittest.HomeserverTestCase):
filter_id = self.get_success(
self.filtering.add_user_filter(
user_localpart=user_localpart, user_filter=user_filter_json
user_id=user_id, user_filter=user_filter_json
)
)
@@ -619,7 +621,7 @@ class FilteringTestCase(unittest.HomeserverTestCase):
filter_id = self.get_success(
self.datastore.add_user_filter(
user_localpart=user_localpart, user_filter=user_filter_json
user_id=user_id, user_filter=user_filter_json
)
)
+5 -6
View File
@@ -195,11 +195,11 @@ class ApplicationServiceApiTestCase(unittest.HomeserverTestCase):
MISSING_KEYS = [
# Known user, known device, missing algorithm.
("@alice:example.org", "DEVICE_1", "signed_curve25519:DDDDHg"),
("@alice:example.org", "DEVICE_2", "xyz", 1),
# Known user, missing device.
("@alice:example.org", "DEVICE_3", "signed_curve25519:EEEEHg"),
("@alice:example.org", "DEVICE_3", "signed_curve25519", 1),
# Unknown user.
("@bob:example.org", "DEVICE_4", "signed_curve25519:FFFFHg"),
("@bob:example.org", "DEVICE_4", "signed_curve25519", 1),
]
claimed_keys, missing = self.get_success(
@@ -207,9 +207,8 @@ class ApplicationServiceApiTestCase(unittest.HomeserverTestCase):
self.service,
[
# Found devices
("@alice:example.org", "DEVICE_1", "signed_curve25519:AAAAHg"),
("@alice:example.org", "DEVICE_1", "signed_curve25519:BBBBHg"),
("@alice:example.org", "DEVICE_2", "signed_curve25519:CCCCHg"),
("@alice:example.org", "DEVICE_1", "signed_curve25519", 1),
("@alice:example.org", "DEVICE_2", "signed_curve25519", 1),
]
+ MISSING_KEYS,
)
+14 -18
View File
@@ -160,7 +160,7 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
res2 = self.get_success(
self.handler.claim_one_time_keys(
{"one_time_keys": {local_user: {device_id: "alg1"}}},
{local_user: {device_id: {"alg1": 1}}},
timeout=None,
always_include_fallback_keys=False,
)
@@ -205,7 +205,7 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
# key
claim_res = self.get_success(
self.handler.claim_one_time_keys(
{"one_time_keys": {local_user: {device_id: "alg1"}}},
{local_user: {device_id: {"alg1": 1}}},
timeout=None,
always_include_fallback_keys=False,
)
@@ -224,7 +224,7 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
# claiming an OTK again should return the same fallback key
claim_res = self.get_success(
self.handler.claim_one_time_keys(
{"one_time_keys": {local_user: {device_id: "alg1"}}},
{local_user: {device_id: {"alg1": 1}}},
timeout=None,
always_include_fallback_keys=False,
)
@@ -273,7 +273,7 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
claim_res = self.get_success(
self.handler.claim_one_time_keys(
{"one_time_keys": {local_user: {device_id: "alg1"}}},
{local_user: {device_id: {"alg1": 1}}},
timeout=None,
always_include_fallback_keys=False,
)
@@ -285,7 +285,7 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
claim_res = self.get_success(
self.handler.claim_one_time_keys(
{"one_time_keys": {local_user: {device_id: "alg1"}}},
{local_user: {device_id: {"alg1": 1}}},
timeout=None,
always_include_fallback_keys=False,
)
@@ -306,7 +306,7 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
claim_res = self.get_success(
self.handler.claim_one_time_keys(
{"one_time_keys": {local_user: {device_id: "alg1"}}},
{local_user: {device_id: {"alg1": 1}}},
timeout=None,
always_include_fallback_keys=False,
)
@@ -347,7 +347,7 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
# return both.
claim_res = self.get_success(
self.handler.claim_one_time_keys(
{"one_time_keys": {local_user: {device_id: "alg1"}}},
{local_user: {device_id: {"alg1": 1}}},
timeout=None,
always_include_fallback_keys=True,
)
@@ -369,7 +369,7 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
# Claiming an OTK again should return only the fallback key.
claim_res = self.get_success(
self.handler.claim_one_time_keys(
{"one_time_keys": {local_user: {device_id: "alg1"}}},
{local_user: {device_id: {"alg1": 1}}},
timeout=None,
always_include_fallback_keys=True,
)
@@ -1052,7 +1052,7 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
# Setup a response, but only for device 2.
self.appservice_api.claim_client_keys.return_value = make_awaitable(
({local_user: {device_id_2: otk}}, [(local_user, device_id_1, "alg1")])
({local_user: {device_id_2: otk}}, [(local_user, device_id_1, "alg1", 1)])
)
# we shouldn't have any unused fallback keys yet
@@ -1079,11 +1079,7 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
# query the fallback keys.
claim_res = self.get_success(
self.handler.claim_one_time_keys(
{
"one_time_keys": {
local_user: {device_id_1: "alg1", device_id_2: "alg1"}
}
},
{local_user: {device_id_1: {"alg1": 1}, device_id_2: {"alg1": 1}}},
timeout=None,
always_include_fallback_keys=False,
)
@@ -1128,7 +1124,7 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
# Claim OTKs, which will ask the appservice and do nothing else.
claim_res = self.get_success(
self.handler.claim_one_time_keys(
{"one_time_keys": {local_user: {device_id_1: "alg1"}}},
{local_user: {device_id_1: {"alg1": 1}}},
timeout=None,
always_include_fallback_keys=True,
)
@@ -1172,7 +1168,7 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
# uploaded fallback key.
claim_res = self.get_success(
self.handler.claim_one_time_keys(
{"one_time_keys": {local_user: {device_id_1: "alg1"}}},
{local_user: {device_id_1: {"alg1": 1}}},
timeout=None,
always_include_fallback_keys=True,
)
@@ -1205,7 +1201,7 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
# Claim OTKs, which will return information only from the database.
claim_res = self.get_success(
self.handler.claim_one_time_keys(
{"one_time_keys": {local_user: {device_id_1: "alg1"}}},
{local_user: {device_id_1: {"alg1": 1}}},
timeout=None,
always_include_fallback_keys=True,
)
@@ -1232,7 +1228,7 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
# Claim OTKs, which will return only the fallback key from the database.
claim_res = self.get_success(
self.handler.claim_one_time_keys(
{"one_time_keys": {local_user: {device_id_1: "alg1"}}},
{local_user: {device_id_1: {"alg1": 1}}},
timeout=None,
always_include_fallback_keys=True,
)
+12 -14
View File
@@ -66,9 +66,7 @@ class ProfileTestCase(unittest.HomeserverTestCase):
self.handler = hs.get_profile_handler()
def test_get_my_name(self) -> None:
self.get_success(
self.store.set_profile_displayname(self.frank.localpart, "Frank")
)
self.get_success(self.store.set_profile_displayname(self.frank, "Frank"))
displayname = self.get_success(self.handler.get_displayname(self.frank))
@@ -121,9 +119,7 @@ class ProfileTestCase(unittest.HomeserverTestCase):
self.hs.config.registration.enable_set_displayname = False
# Setting displayname for the first time is allowed
self.get_success(
self.store.set_profile_displayname(self.frank.localpart, "Frank")
)
self.get_success(self.store.set_profile_displayname(self.frank, "Frank"))
self.assertEqual(
(
@@ -166,8 +162,14 @@ class ProfileTestCase(unittest.HomeserverTestCase):
)
def test_incoming_fed_query(self) -> None:
self.get_success(self.store.create_profile("caroline"))
self.get_success(self.store.set_profile_displayname("caroline", "Caroline"))
self.get_success(
self.store.create_profile(UserID.from_string("@caroline:test"))
)
self.get_success(
self.store.set_profile_displayname(
UserID.from_string("@caroline:test"), "Caroline"
)
)
response = self.get_success(
self.query_handlers["profile"](
@@ -183,9 +185,7 @@ class ProfileTestCase(unittest.HomeserverTestCase):
def test_get_my_avatar(self) -> None:
self.get_success(
self.store.set_profile_avatar_url(
self.frank.localpart, "http://my.server/me.png"
)
self.store.set_profile_avatar_url(self.frank, "http://my.server/me.png")
)
avatar_url = self.get_success(self.handler.get_avatar_url(self.frank))
@@ -237,9 +237,7 @@ class ProfileTestCase(unittest.HomeserverTestCase):
# Setting displayname for the first time is allowed
self.get_success(
self.store.set_profile_avatar_url(
self.frank.localpart, "http://my.server/me.png"
)
self.store.set_profile_avatar_url(self.frank, "http://my.server/me.png")
)
self.assertEqual(
+1 -1
View File
@@ -52,7 +52,7 @@ class HTTPPusherTests(HomeserverTestCase):
m.post_json_get_json = post_json_get_json
hs = self.setup_test_homeserver(proxied_blacklisted_http_client=m)
hs = self.setup_test_homeserver(pusher_http_client=m)
return hs

Some files were not shown because too many files have changed in this diff Show More