
Compare commits

..

5 Commits

| Author | SHA1 | Message | Date |
|---|---|---|---|
| Erik Johnston | dcd574b89c | Stuff | 2023-01-03 09:42:37 +00:00 |
| Erik Johnston | c03785e121 | Implement {get,pop}_node | 2022-12-24 13:44:17 +00:00 |
| Erik Johnston | b9cdf3d85e | String cache | 2022-12-24 12:36:54 +00:00 |
| Erik Johnston | 18ac015ecd | bindings | 2022-12-05 14:03:28 +00:00 |
| Erik Johnston | 4874d6320a | Add tree cache | 2022-12-05 09:57:38 +00:00 |
158 changed files with 2945 additions and 3883 deletions
@@ -21,7 +21,7 @@ endblock
block Install Complement Dependencies
sudo apt-get -qq update && sudo apt-get install -qqy libolm3 libolm-dev
go install -v github.com/gotesttools/gotestfmt/v2/cmd/gotestfmt@latest
go get -v github.com/gotesttools/gotestfmt/v2/cmd/gotestfmt@latest
endblock
block Install custom gotestfmt template
+3 -7
@@ -197,12 +197,8 @@ jobs:
- run: sudo apt-get -qq install xmlsec1
- name: Set up PostgreSQL ${{ matrix.job.postgres-version }}
if: ${{ matrix.job.postgres-version }}
# 1. Mount postgres data files onto a tmpfs in-memory filesystem to reduce overhead of docker's overlayfs layer.
# 2. Expose the unix socket for postgres. This removes latency of using docker-proxy for connections.
run: |
docker run -d -p 5432:5432 \
--tmpfs /var/lib/postgres:rw,size=6144m \
--mount 'type=bind,src=/var/run/postgresql,dst=/var/run/postgresql' \
-e POSTGRES_PASSWORD=postgres \
-e POSTGRES_INITDB_ARGS="--lc-collate C --lc-ctype C --encoding UTF8" \
postgres:${{ matrix.job.postgres-version }}
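For context on the unix-socket comment above: libpq treats a `host` value beginning with `/` as a socket directory, which is what makes the bind mount useful. A minimal sketch, assuming `psycopg2` as the client library:

```python
import psycopg2

# TCP via docker-proxy (the -p 5432:5432 mapping above): every connection
# hops through docker's userspace proxy.
tcp_conn = psycopg2.connect(host="localhost", user="postgres", password="postgres")

# Unix socket via the bind-mounted /var/run/postgresql directory: a host
# path starting with "/" is interpreted as a socket directory by libpq.
sock_conn = psycopg2.connect(host="/var/run/postgresql", user="postgres", password="postgres")
```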
@@ -224,10 +220,10 @@ jobs:
if: ${{ matrix.job.postgres-version }}
timeout-minutes: 2
run: until pg_isready -h localhost; do sleep 1; done
- run: poetry run trial --jobs=6 tests
- run: poetry run trial --jobs=2 tests
env:
SYNAPSE_POSTGRES: ${{ matrix.job.database == 'postgres' || '' }}
SYNAPSE_POSTGRES_HOST: /var/run/postgresql
SYNAPSE_POSTGRES_HOST: localhost
SYNAPSE_POSTGRES_USER: postgres
SYNAPSE_POSTGRES_PASSWORD: postgres
- name: Dump logs
@@ -296,7 +292,7 @@ jobs:
python-version: '3.7'
extras: "all test"
- run: poetry run trial -j6 tests
- run: poetry run trial -j2 tests
- name: Dump logs
# Logs are most useful when the command fails, always include them.
if: ${{ always() }}
+3 -80
@@ -1,85 +1,8 @@
Synapse 1.74.0rc1 (2022-12-13)
==============================
Features
--------
- Improve user search for international display names. ([\#14464](https://github.com/matrix-org/synapse/issues/14464))
- Stop using deprecated `keyIds` parameter when calling `/_matrix/key/v2/server`. ([\#14490](https://github.com/matrix-org/synapse/issues/14490), [\#14525](https://github.com/matrix-org/synapse/issues/14525))
- Add new `push.enabled` config option to allow opting out of push notification calculation. ([\#14551](https://github.com/matrix-org/synapse/issues/14551), [\#14619](https://github.com/matrix-org/synapse/issues/14619))
- Advertise support for Matrix 1.5 on `/_matrix/client/versions`. ([\#14576](https://github.com/matrix-org/synapse/issues/14576))
- Improve opentracing and logging for to-device message handling. ([\#14598](https://github.com/matrix-org/synapse/issues/14598))
- Allow selecting "prejoin" events by state keys in addition to event types. ([\#14642](https://github.com/matrix-org/synapse/issues/14642))
Bugfixes
--------
- Fix a long-standing bug where a device list update might not be sent to clients in certain circumstances. ([\#14435](https://github.com/matrix-org/synapse/issues/14435), [\#14592](https://github.com/matrix-org/synapse/issues/14592), [\#14604](https://github.com/matrix-org/synapse/issues/14604))
- Suppress a spurious warning when `POST /rooms/<room_id>/<membership>/`, `POST /join/<room_id_or_alias>`, or the unspecced `PUT /join/<room_id_or_alias>/<txn_id>` receive an empty HTTP request body. ([\#14600](https://github.com/matrix-org/synapse/issues/14600))
- Return spec-compliant JSON errors when unknown endpoints are requested. ([\#14620](https://github.com/matrix-org/synapse/issues/14620), [\#14621](https://github.com/matrix-org/synapse/issues/14621))
- Update html templates to load images over HTTPS. Contributed by @ashfame. ([\#14625](https://github.com/matrix-org/synapse/issues/14625))
- Fix a long-standing bug where the user directory would return 1 more row than requested. ([\#14631](https://github.com/matrix-org/synapse/issues/14631))
- Reject invalid read receipt requests with empty room or event IDs. Contributed by Nick @ Beeper (@fizzadar). ([\#14632](https://github.com/matrix-org/synapse/issues/14632))
- Fix a bug introduced in Synapse 1.67.0 where not specifying a config file or a server URL would lead to the `register_new_matrix_user` script failing. ([\#14637](https://github.com/matrix-org/synapse/issues/14637))
- Fix a long-standing bug where the user directory and room/user stats might be out of sync. ([\#14639](https://github.com/matrix-org/synapse/issues/14639), [\#14643](https://github.com/matrix-org/synapse/issues/14643))
- Fix a bug introduced in Synapse 1.72.0 where the background updates to add non-thread unique indexes on receipts would fail if they were previously interrupted. ([\#14650](https://github.com/matrix-org/synapse/issues/14650))
- Improve validation of field size limits in events. ([\#14664](https://github.com/matrix-org/synapse/issues/14664))
- Fix bugs introduced in Synapse 1.55.0 and 1.69.0 where application services would not be notified of events in the correct rooms, due to stale caches. ([\#14670](https://github.com/matrix-org/synapse/issues/14670))
Improved Documentation
----------------------
- Update worker settings for `pusher` and `federation_sender` functionality. ([\#14493](https://github.com/matrix-org/synapse/issues/14493))
- Add links to third party package repositories, and point to the bug which highlights Ubuntu's out-of-date packages. ([\#14517](https://github.com/matrix-org/synapse/issues/14517))
- Remove old, incorrect minimum postgres version note and replace with a link to the [Dependency Deprecation Policy](https://matrix-org.github.io/synapse/v1.73/deprecation_policy.html). ([\#14590](https://github.com/matrix-org/synapse/issues/14590))
- Add Single-Sign On setup instructions for Mastodon-based instances. ([\#14594](https://github.com/matrix-org/synapse/issues/14594))
- Change `turn_allow_guests` example value to lowercase `true`. ([\#14634](https://github.com/matrix-org/synapse/issues/14634))
Internal Changes
----------------
- Optimise push badge count calculations. Contributed by Nick @ Beeper (@fizzadar). ([\#14255](https://github.com/matrix-org/synapse/issues/14255))
- Faster remote room joins: stream the un-partial-stating of rooms over replication. ([\#14473](https://github.com/matrix-org/synapse/issues/14473), [\#14474](https://github.com/matrix-org/synapse/issues/14474))
- Share the `ClientRestResource` for both workers and the main process. ([\#14528](https://github.com/matrix-org/synapse/issues/14528))
- Add `--editable` flag to `complement.sh` which uses an editable install of Synapse for faster turn-around times whilst developing iteratively. ([\#14548](https://github.com/matrix-org/synapse/issues/14548))
- Faster joins: use servers list approximation to send read receipts when in partial state instead of waiting for the full state of the room. ([\#14549](https://github.com/matrix-org/synapse/issues/14549))
- Modernize unit tests configuration related to workers. ([\#14568](https://github.com/matrix-org/synapse/issues/14568))
- Bump jsonschema from 4.17.0 to 4.17.3. ([\#14591](https://github.com/matrix-org/synapse/issues/14591))
- Fix Rust lint CI. ([\#14602](https://github.com/matrix-org/synapse/issues/14602))
- Bump JasonEtco/create-an-issue from 2.5.0 to 2.8.1. ([\#14607](https://github.com/matrix-org/synapse/issues/14607))
- Alter some unit test environment parameters to decrease time spent running tests. ([\#14610](https://github.com/matrix-org/synapse/issues/14610))
- Switch to Go recommended installation method for `gotestfmt` template in CI. ([\#14611](https://github.com/matrix-org/synapse/issues/14611))
- Bump phonenumbers from 8.13.0 to 8.13.1. ([\#14612](https://github.com/matrix-org/synapse/issues/14612))
- Bump types-setuptools from 65.5.0.3 to 65.6.0.1. ([\#14613](https://github.com/matrix-org/synapse/issues/14613))
- Bump twine from 4.0.1 to 4.0.2. ([\#14614](https://github.com/matrix-org/synapse/issues/14614))
- Bump types-requests from 2.28.11.2 to 2.28.11.5. ([\#14615](https://github.com/matrix-org/synapse/issues/14615))
- Bump cryptography from 38.0.3 to 38.0.4. ([\#14616](https://github.com/matrix-org/synapse/issues/14616))
- Remove useless cargo install with apt from Dockerfile. ([\#14636](https://github.com/matrix-org/synapse/issues/14636))
- Bump certifi from 2021.10.8 to 2022.12.7. ([\#14645](https://github.com/matrix-org/synapse/issues/14645))
- Bump flake8-bugbear from 22.10.27 to 22.12.6. ([\#14656](https://github.com/matrix-org/synapse/issues/14656))
- Bump packaging from 21.3 to 22.0. ([\#14657](https://github.com/matrix-org/synapse/issues/14657))
- Bump types-pillow from 9.3.0.1 to 9.3.0.4. ([\#14658](https://github.com/matrix-org/synapse/issues/14658))
- Bump serde from 1.0.148 to 1.0.150. ([\#14659](https://github.com/matrix-org/synapse/issues/14659))
- Bump phonenumbers from 8.13.1 to 8.13.2. ([\#14660](https://github.com/matrix-org/synapse/issues/14660))
- Bump authlib from 1.1.0 to 1.2.0. ([\#14661](https://github.com/matrix-org/synapse/issues/14661))
- Move `StateFilter` to `synapse.types`. ([\#14668](https://github.com/matrix-org/synapse/issues/14668))
- Improve type hints. ([\#14597](https://github.com/matrix-org/synapse/issues/14597), [\#14646](https://github.com/matrix-org/synapse/issues/14646), [\#14671](https://github.com/matrix-org/synapse/issues/14671))
Synapse 1.73.0 (2022-12-06)
===========================
Please note that legacy Prometheus metric names have been removed in this release; see [the upgrade notes](https://github.com/matrix-org/synapse/blob/release-v1.73/docs/upgrade.md#legacy-prometheus-metric-names-have-now-been-removed) for more details.
No significant changes since 1.73.0rc2.
Synapse 1.73.0rc2 (2022-12-01)
==============================
Please note that legacy Prometheus metric names have been removed in this release; see [the upgrade notes](https://github.com/matrix-org/synapse/blob/release-v1.73/docs/upgrade.md#legacy-prometheus-metric-names-have-now-been-removed) for more details.
Bugfixes
--------
@@ -94,7 +17,7 @@ Features
- Speed-up `/messages` with `filter_events_for_client` optimizations. ([\#14527](https://github.com/matrix-org/synapse/issues/14527))
- Improve DB performance by reducing amount of data that gets read in `device_lists_changes_in_room`. ([\#14534](https://github.com/matrix-org/synapse/issues/14534))
- Add support for handling avatar in SSO OIDC login. Contributed by @ashfame. ([\#13917](https://github.com/matrix-org/synapse/issues/13917))
- Adds support for handling avatar in SSO login. Contributed by @ashfame. ([\#13917](https://github.com/matrix-org/synapse/issues/13917))
- Move MSC3030 `/timestamp_to_event` endpoints to stable `v1` location (`/_matrix/client/v1/rooms/<roomID>/timestamp_to_event?ts=<timestamp>&dir=<direction>`, `/_matrix/federation/v1/timestamp_to_event/<roomID>?ts=<timestamp>&dir=<direction>`). ([\#14471](https://github.com/matrix-org/synapse/issues/14471))
- Reduce database load of [Client-Server endpoints](https://spec.matrix.org/v1.5/client-server-api/#aggregations) which return bundled aggregations. ([\#14491](https://github.com/matrix-org/synapse/issues/14491), [\#14508](https://github.com/matrix-org/synapse/issues/14508), [\#14510](https://github.com/matrix-org/synapse/issues/14510))
- Add unstable support for an Extensible Events room version (`org.matrix.msc1767.10`) via [MSC1767](https://github.com/matrix-org/matrix-spec-proposals/pull/1767), [MSC3931](https://github.com/matrix-org/matrix-spec-proposals/pull/3931), [MSC3932](https://github.com/matrix-org/matrix-spec-proposals/pull/3932), and [MSC3933](https://github.com/matrix-org/matrix-spec-proposals/pull/3933). ([\#14520](https://github.com/matrix-org/synapse/issues/14520), [\#14521](https://github.com/matrix-org/synapse/issues/14521), [\#14524](https://github.com/matrix-org/synapse/issues/14524))
Generated
+4 -4
@@ -323,18 +323,18 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "serde"
version = "1.0.150"
version = "1.0.148"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e326c9ec8042f1b5da33252c8a37e9ffbd2c9bef0155215b6e6c80c790e05f91"
checksum = "e53f64bb4ba0191d6d0676e1b141ca55047d83b74f5607e6d8eb88126c52c2dc"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.150"
version = "1.0.148"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42a3df25b0713732468deadad63ab9da1f1fd75a48a15024b50363f128db627e"
checksum = "a55492425aa53521babf6137309e7d34c20bbfbbfcfe2c7f3a047fd1f6b92c0c"
dependencies = [
"proc-macro2",
"quote",
+4
@@ -3,3 +3,7 @@
[workspace]
members = ["rust"]
[profile.dbgrelease]
inherits = "release"
debug = true
+1
@@ -0,0 +1 @@
Optimise push badge count calculations. Contributed by Nick @ Beeper (@fizzadar).
+1
@@ -0,0 +1 @@
Stop using deprecated `keyIds` parameter when calling `/_matrix/key/v2/server`.
+1
@@ -0,0 +1 @@
Update worker settings for `pusher` and `federation_sender` functionality.
+1
@@ -0,0 +1 @@
Add links to third party package repositories, and point to the bug which highlights Ubuntu's out-of-date packages.
+1
@@ -0,0 +1 @@
Stop using deprecated `keyIds` parameter when calling `/_matrix/key/v2/server`.
+1
@@ -0,0 +1 @@
Share the `ClientRestResource` for both workers and the main process.
+1
@@ -0,0 +1 @@
Faster joins: use servers list approximation to send read receipts when in partial state instead of waiting for the full state of the room.
+1
@@ -0,0 +1 @@
Add new `push.enabled` config option to allow opting out of push notification calculation.
+1
@@ -0,0 +1 @@
Modernize unit tests configuration related to workers.
+1
@@ -0,0 +1 @@
Advertise support for Matrix 1.5 on `/_matrix/client/versions`.
+1
@@ -0,0 +1 @@
Bump jsonschema from 4.17.0 to 4.17.3.
+1
@@ -0,0 +1 @@
Fix a long-standing bug where a device list update might not be sent to clients in certain circumstances.
+1
@@ -0,0 +1 @@
Add missing type hints.
+1
@@ -0,0 +1 @@
Fix Rust lint CI.
+1
@@ -0,0 +1 @@
Bump JasonEtco/create-an-issue from 2.5.0 to 2.8.1.
-14
@@ -1,17 +1,3 @@
matrix-synapse-py3 (1.74.0~rc1) stable; urgency=medium
* New dependency on libicu-dev to provide improved results for user
search.
* New Synapse release 1.74.0rc1.
-- Synapse Packaging team <packages@matrix.org> Tue, 13 Dec 2022 13:30:01 +0000
matrix-synapse-py3 (1.73.0) stable; urgency=medium
* New Synapse release 1.73.0.
-- Synapse Packaging team <packages@matrix.org> Tue, 06 Dec 2022 11:48:56 +0000
matrix-synapse-py3 (1.73.0~rc2) stable; urgency=medium
* New Synapse release 1.73.0rc2.
-2
@@ -8,8 +8,6 @@ Build-Depends:
dh-virtualenv (>= 1.1),
libsystemd-dev,
libpq-dev,
libicu-dev,
pkg-config,
lsb-release,
python3-dev,
python3,
+1 -3
@@ -43,7 +43,7 @@ RUN \
--mount=type=cache,target=/var/cache/apt,sharing=locked \
--mount=type=cache,target=/var/lib/apt,sharing=locked \
apt-get update -qq && apt-get install -yqq \
build-essential git libffi-dev libssl-dev \
build-essential cargo git libffi-dev libssl-dev \
&& rm -rf /var/lib/apt/lists/*
# We install poetry in its own build stage to avoid its dependencies conflicting with
@@ -97,8 +97,6 @@ RUN \
zlib1g-dev \
git \
curl \
libicu-dev \
pkg-config \
&& rm -rf /var/lib/apt/lists/*
-2
@@ -84,8 +84,6 @@ RUN apt-get update -qq -o Acquire::Languages=none \
python3-venv \
sqlite3 \
libpq-dev \
libicu-dev \
pkg-config \
xmlsec1
# Install rust and ensure it's in the PATH
+1 -2
@@ -1,7 +1,6 @@
# syntax=docker/dockerfile:1
ARG SYNAPSE_VERSION=latest
ARG FROM=matrixdotorg/synapse:$SYNAPSE_VERSION
# first of all, we create a base image with an nginx which we can copy into the
# target image. For repeated rebuilds, this is much faster than apt installing
@@ -24,7 +23,7 @@ FROM debian:bullseye-slim AS deps_base
FROM redis:6-bullseye AS redis_base
# now build the final image, based on the regular Synapse docker image
FROM $FROM
FROM matrixdotorg/synapse:$SYNAPSE_VERSION
# Install supervisord with pip instead of apt, to avoid installing a second
# copy of python.
+1 -2
@@ -7,9 +7,8 @@
# https://github.com/matrix-org/synapse/blob/develop/docker/README-testing.md#testing-with-postgresql-and-single-or-multi-process-synapse
ARG SYNAPSE_VERSION=latest
ARG FROM=matrixdotorg/synapse-workers:$SYNAPSE_VERSION
FROM $FROM
FROM matrixdotorg/synapse-workers:$SYNAPSE_VERSION
# First of all, we copy postgres server from the official postgres image,
# since for repeated rebuilds, this is much faster than apt installing
# postgres each time.
-75
@@ -1,75 +0,0 @@
# syntax=docker/dockerfile:1
# This dockerfile builds an editable install of Synapse.
#
# Used by `complement.sh`. Not suitable for production use.
ARG PYTHON_VERSION=3.9
###
### Stage 0: generate requirements.txt
###
# We hardcode the use of Debian bullseye here because this could change upstream
# and other Dockerfiles used for testing are expecting bullseye.
FROM docker.io/python:${PYTHON_VERSION}-slim-bullseye
# Install Rust and other dependencies (stolen from normal Dockerfile)
# install the OS build deps
RUN \
--mount=type=cache,target=/var/cache/apt,sharing=locked \
--mount=type=cache,target=/var/lib/apt,sharing=locked \
apt-get update -qq && apt-get install -yqq \
build-essential \
libffi-dev \
libjpeg-dev \
libpq-dev \
libssl-dev \
libwebp-dev \
libxml++2.6-dev \
libxslt1-dev \
openssl \
zlib1g-dev \
git \
curl \
gosu \
libjpeg62-turbo \
libpq5 \
libwebp6 \
xmlsec1 \
libjemalloc2 \
&& rm -rf /var/lib/apt/lists/*
ENV RUSTUP_HOME=/rust
ENV CARGO_HOME=/cargo
ENV PATH=/cargo/bin:/rust/bin:$PATH
RUN mkdir /rust /cargo
RUN curl -sSf https://sh.rustup.rs | sh -s -- -y --no-modify-path --default-toolchain stable --profile minimal
# Make a base copy of the editable source tree, so that we have something to
# install and build now — even though it's going to be covered up by a mount
# at runtime.
COPY synapse /editable-src/synapse/
COPY rust /editable-src/rust/
# ... and what we need to `pip install`.
COPY pyproject.toml poetry.lock README.rst build_rust.py Cargo.toml Cargo.lock /editable-src/
RUN pip install poetry
RUN poetry config virtualenvs.create false
RUN cd /editable-src && poetry install --extras all
# Make copies of useful things for inspection:
# - the Rust module (must be copied to the editable source tree before startup)
# - poetry.lock is useful for checking if dependencies have changed.
RUN cp /editable-src/synapse/synapse_rust.abi3.so /synapse_rust.abi3.so.bak
RUN cp /editable-src/poetry.lock /poetry.lock.bak
### Extra setup from original Dockerfile
COPY ./docker/start.py /start.py
COPY ./docker/conf /conf
EXPOSE 8008/tcp 8009/tcp 8448/tcp
ENTRYPOINT ["/start.py"]
HEALTHCHECK --start-period=5s --interval=15s --timeout=5s \
CMD curl -fSs http://localhost:8008/health || exit 1
-41
@@ -590,44 +590,3 @@ oidc_providers:
display_name_template: "{{ user.first_name }} {{ user.last_name }}"
email_template: "{{ user.email }}"
```
### Mastodon
[Mastodon](https://docs.joinmastodon.org/) instances provide an [OAuth API](https://docs.joinmastodon.org/spec/oauth/), allowing those instances to be used as a single sign-on provider for Synapse.
The first step is to register Synapse as an application with your Mastodon instance, using the [Create an application API](https://docs.joinmastodon.org/methods/apps/#create) (see also [here](https://docs.joinmastodon.org/client/token/)). There are several ways to do this, but in the example below we are using curl.
This example assumes that:
* the Mastodon instance website URL is `https://your.mastodon.instance.url`, and
* Synapse will be registered as an app named `my_synapse_app`.
Send the following request, substituting the value of `synapse_public_baseurl` from your Synapse installation.
```sh
curl -d "client_name=my_synapse_app&redirect_uris=https://[synapse_public_baseurl]/_synapse/client/oidc/callback" -X POST https://your.mastodon.instance.url/api/v1/apps
```
You should receive a response similar to the following. Make sure to save it.
```json
{"client_id":"someclientid_123","client_secret":"someclientsecret_123","id":"12345","name":"my_synapse_app","redirect_uri":"https://[synapse_public_baseurl]/_synapse/client/oidc/callback","website":null,"vapid_key":"somerandomvapidkey_123"}
```
As the Synapse login mechanism needs an attribute to uniquely identify users, and Mastodon's endpoint does not return a `sub` property, an alternative `subject_claim` has to be set. Your Synapse configuration should include the following:
```yaml
oidc_providers:
- idp_id: my_mastodon
idp_name: "Mastodon Instance Example"
discover: false
issuer: "https://your.mastodon.instance.url/@admin"
client_id: "someclientid_123"
client_secret: "someclientsecret_123"
authorization_endpoint: "https://your.mastodon.instance.url/oauth/authorize"
token_endpoint: "https://your.mastodon.instance.url/oauth/token"
userinfo_endpoint: "https://your.mastodon.instance.url/api/v1/accounts/verify_credentials"
scopes: ["read"]
user_mapping_provider:
config:
subject_claim: "id"
```
Note that the fields `client_id` and `client_secret` are taken from the curl response above.
+1 -2
@@ -1,7 +1,6 @@
# Using Postgres
The minimum supported version of PostgreSQL is determined by the [Dependency
Deprecation Policy](deprecation_policy.md).
Synapse supports PostgreSQL versions 10 or later.
## Install postgres client libraries
+1 -1
@@ -38,7 +38,7 @@ As an example, here is the relevant section of the config file for `matrix.org`.
turn_uris: [ "turn:turn.matrix.org?transport=udp", "turn:turn.matrix.org?transport=tcp" ]
turn_shared_secret: "n0t4ctuAllymatr1Xd0TorgSshar3d5ecret4obvIousreAsons"
turn_user_lifetime: 86400000
turn_allow_guests: true
turn_allow_guests: True
After updating the homeserver configuration, you must restart synapse:
+1 -1
@@ -79,7 +79,7 @@ Here we can see that the request has been tagged with `GET-37`. (The tag depends
grep 'GET-37' homeserver.log
```
If you want to paste that output into a github issue or matrix room, please remember to surround it with triple-backticks (```) to make it legible (see [quoting code](https://help.github.com/en/articles/basic-writing-and-formatting-syntax#quoting-code)).
If you want to paste that output into a github issue or matrix room, please remember to surround it with triple-backticks (```) to make it legible (see https://help.github.com/en/articles/basic-writing-and-formatting-syntax#quoting-code).
What do all those fields in the 'Processed' line mean?
@@ -2501,53 +2501,32 @@ Config settings related to the client/server API
---
### `room_prejoin_state`
This setting controls the state that is shared with users upon receiving an
invite to a room, or in reply to a knock on a room. By default, the following
state events are shared with users:
- `m.room.join_rules`
- `m.room.canonical_alias`
- `m.room.avatar`
- `m.room.encryption`
- `m.room.name`
- `m.room.create`
- `m.room.topic`
Controls for the state that is shared with users who receive an invite
to a room. By default, the following state event types are shared with users who
receive invites to the room:
- m.room.join_rules
- m.room.canonical_alias
- m.room.avatar
- m.room.encryption
- m.room.name
- m.room.create
- m.room.topic
To change the default behavior, use the following sub-options:
* `disable_default_event_types`: boolean. Set to `true` to disable the above
defaults. If this is enabled, only the event types listed in
`additional_event_types` are shared. Defaults to `false`.
* `additional_event_types`: A list of additional state events to include in the
events to be shared. By default, this list is empty (so only the default event
types are shared).
Each entry in this list should be either a single string or a list of two
strings.
* A standalone string `t` represents all events with type `t` (i.e.
with no restrictions on state keys).
* A pair of strings `[t, s]` represents a single event with type `t` and
state key `s`. The same type can appear in two entries with different state
keys: in this situation, both state keys are included in prejoin state.
* `disable_default_event_types`: set to true to disable the above defaults. If this
is enabled, only the event types listed in `additional_event_types` are shared.
Defaults to false.
* `additional_event_types`: Additional state event types to share with users when they are invited
to a room. By default, this list is empty (so only the default event types are shared).
Example configuration:
```yaml
room_prejoin_state:
disable_default_event_types: false
disable_default_event_types: true
additional_event_types:
# Share all events of type `org.example.custom.event.typeA`
- org.example.custom.event.typeA
# Share only events of type `org.example.custom.event.typeB` whose
# state_key is "foo"
- ["org.example.custom.event.typeB", "foo"]
# Share only events of type `org.example.custom.event.typeC` whose
# state_key is "bar" or "baz"
- ["org.example.custom.event.typeC", "bar"]
- ["org.example.custom.event.typeC", "baz"]
- org.example.custom.event.type
- m.room.join_rules
```
*Changed in Synapse 1.74:* admins can filter the events in prejoin state based
on their state key.
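For intuition, here is a hedged sketch of the matching rule described above; it is illustrative only (Synapse's actual implementation builds a `StateFilter` from these entries):

```python
from typing import List, Tuple, Union

# An entry is either a bare event type (matches any state key) or a
# (type, state_key) pair (matches exactly that state key).
Entry = Union[str, Tuple[str, str]]

def prejoin_state_matches(entries: List[Entry], ev_type: str, state_key: str) -> bool:
    for entry in entries:
        if isinstance(entry, str):
            if entry == ev_type:          # standalone string `t`
                return True
        elif tuple(entry) == (ev_type, state_key):  # pair `[t, s]`
            return True
    return False

# Mirrors the example configuration above:
entries: List[Entry] = [
    "org.example.custom.event.typeA",
    ("org.example.custom.event.typeB", "foo"),
    ("org.example.custom.event.typeC", "bar"),
    ("org.example.custom.event.typeC", "baz"),
]
assert prejoin_state_matches(entries, "org.example.custom.event.typeA", "anything")
assert prejoin_state_matches(entries, "org.example.custom.event.typeC", "baz")
assert not prejoin_state_matches(entries, "org.example.custom.event.typeB", "bar")
```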
---
### `track_puppeted_user_ips`
@@ -3376,7 +3355,7 @@ Configuration settings related to push notifications
This setting defines options for push notifications.
This option has a number of sub-options. They are as follows:
* `enabled`: Enables or disables push notification calculation. Note, disabling this will also
* `enable_push`: Enables or disables push notification calculation. Note, disabling this will also
stop unread counts being calculated for rooms. This mode of operation is intended
for homeservers which may only have bots or appservice users connected, or are otherwise
not interested in push/unread counters. This is enabled by default.
@@ -3400,7 +3379,7 @@ This option has a number of sub-options. They are as follows:
Example configuration:
```yaml
push:
enabled: true
enable_push: true
include_content: false
group_unread_count_by_room: false
```
+13 -14
@@ -12,7 +12,6 @@ local_partial_types = True
no_implicit_optional = True
disallow_untyped_defs = True
strict_equality = True
warn_redundant_casts = True
files =
docker/,
@@ -89,15 +88,6 @@ disallow_untyped_defs = False
[mypy-tests.*]
disallow_untyped_defs = False
[mypy-tests.config.test_api]
disallow_untyped_defs = True
[mypy-tests.federation.transport.test_client]
disallow_untyped_defs = True
[mypy-tests.handlers.test_sso]
disallow_untyped_defs = True
[mypy-tests.handlers.test_user_directory]
disallow_untyped_defs = True
@@ -107,19 +97,28 @@ disallow_untyped_defs = True
[mypy-tests.push.test_bulk_push_rule_evaluator]
disallow_untyped_defs = True
[mypy-tests.rest.*]
[mypy-tests.test_server]
disallow_untyped_defs = True
[mypy-tests.state.test_profile]
disallow_untyped_defs = True
[mypy-tests.storage.*]
[mypy-tests.storage.test_id_generators]
disallow_untyped_defs = True
[mypy-tests.test_server]
[mypy-tests.storage.test_profile]
disallow_untyped_defs = True
[mypy-tests.types.*]
[mypy-tests.handlers.test_sso]
disallow_untyped_defs = True
[mypy-tests.storage.test_user_directory]
disallow_untyped_defs = True
[mypy-tests.rest.*]
disallow_untyped_defs = True
[mypy-tests.federation.transport.test_client]
disallow_untyped_defs = True
[mypy-tests.util.caches.*]
Generated
+718 -805
File diff suppressed because it is too large
+2 -14
@@ -57,7 +57,7 @@ manifest-path = "rust/Cargo.toml"
[tool.poetry]
name = "matrix-synapse"
version = "1.74.0rc1"
version = "1.73.0rc2"
description = "Homeserver for the Matrix decentralised comms protocol"
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
license = "Apache-2.0"
@@ -141,8 +141,7 @@ pyasn1 = ">=0.1.9"
pyasn1-modules = ">=0.0.7"
bcrypt = ">=3.1.7"
Pillow = ">=5.4.0"
# We use SortedDict.peekitem(), which was added in sortedcontainers 1.5.2.
sortedcontainers = ">=1.5.2"
sortedcontainers = ">=1.4.4"
pymacaroons = ">=0.13.0"
msgpack = ">=0.5.2"
phonenumbers = ">=8.2.0"
@@ -208,8 +207,6 @@ hiredis = { version = "*", optional = true }
Pympler = { version = "*", optional = true }
parameterized = { version = ">=0.7.4", optional = true }
idna = { version = ">=2.5", optional = true }
pyicu = { version = ">=2.10.2", optional = true }
uvloop = { version = ">=0.17.0", optional = true }
[tool.poetry.extras]
# NB: Packages that should be part of `pip install matrix-synapse[all]` need to be specified
@@ -232,11 +229,6 @@ redis = ["txredisapi", "hiredis"]
# Required to use experimental `caches.track_memory_usage` config option.
cache-memory = ["pympler"]
test = ["parameterized", "idna"]
# Allows for better search for international characters in the user directory. This
# requires libicu's development headers installed on the system (e.g. libicu-dev on
# Debian-based distributions).
user-search = ["pyicu"]
uvloop = ["uvloop"]
# The duplication here is awful. I hate hate hate hate hate it. However, for now I want
# to ensure you can still `pip install matrix-synapse[all]` like today. Two motivations:
@@ -268,10 +260,6 @@ all = [
"txredisapi", "hiredis",
# cache-memory
"pympler",
# improved user search
"pyicu",
# uvloop
"uvloop",
# omitted:
# - test: it's useful to have this separate from dev deps in the olddeps job
# - systemd: this is a system-based requirement
+77
@@ -0,0 +1,77 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![feature(test)]
use synapse::tree_cache::TreeCache;
use test::Bencher;
extern crate test;
#[bench]
fn bench_tree_cache_get_non_empty(b: &mut Bencher) {
let mut cache: TreeCache<&str, &str> = TreeCache::new();
cache.set(["a", "b", "c", "d"], "f").unwrap();
b.iter(|| cache.get(&["a", "b", "c", "d"]));
}
#[bench]
fn bench_tree_cache_get_empty(b: &mut Bencher) {
let cache: TreeCache<&str, &str> = TreeCache::new();
b.iter(|| cache.get(&["a", "b", "c", "d"]));
}
#[bench]
fn bench_tree_cache_set(b: &mut Bencher) {
let mut cache: TreeCache<&str, &str> = TreeCache::new();
b.iter(|| cache.set(["a", "b", "c", "d"], "f").unwrap());
}
#[bench]
fn bench_tree_cache_length(b: &mut Bencher) {
let mut cache: TreeCache<u32, u32> = TreeCache::new();
for c1 in 0..=10 {
for c2 in 0..=10 {
for c3 in 0..=10 {
for c4 in 0..=10 {
cache.set([c1, c2, c3, c4], 1).unwrap()
}
}
}
}
b.iter(|| cache.len());
}
#[bench]
fn tree_cache_iterate(b: &mut Bencher) {
let mut cache: TreeCache<u32, u32> = TreeCache::new();
for c1 in 0..=10 {
for c2 in 0..=10 {
for c3 in 0..=10 {
for c4 in 0..=10 {
cache.set([c1, c2, c3, c4], 1).unwrap()
}
}
}
}
b.iter(|| cache.items().count());
}
+2
@@ -1,6 +1,7 @@
use pyo3::prelude::*;
pub mod push;
pub mod tree_cache;
/// Returns the hash of all the rust source files at the time it was compiled.
///
@@ -26,6 +27,7 @@ fn synapse_rust(py: Python<'_>, m: &PyModule) -> PyResult<()> {
m.add_function(wrap_pyfunction!(get_rust_file_digest, m)?)?;
push::register_module(py, m)?;
tree_cache::binding::register_module(py, m)?;
Ok(())
}
+247
@@ -0,0 +1,247 @@
use std::hash::Hash;
use anyhow::Error;
use pyo3::{
pyclass, pymethods,
types::{PyModule, PyTuple},
IntoPy, PyAny, PyObject, PyResult, Python, ToPyObject,
};
use super::TreeCache;
pub fn register_module(py: Python<'_>, m: &PyModule) -> PyResult<()> {
let child_module = PyModule::new(py, "tree_cache")?;
child_module.add_class::<PythonTreeCache>()?;
child_module.add_class::<StringTreeCache>()?;
m.add_submodule(child_module)?;
// We need to manually add the module to sys.modules to make `from
// synapse.synapse_rust import tree_cache` work.
py.import("sys")?
.getattr("modules")?
.set_item("synapse.synapse_rust.tree_cache", child_module)?;
Ok(())
}
#[derive(Clone)]
struct HashablePyObject {
obj: PyObject,
hash: isize,
}
impl HashablePyObject {
pub fn new(obj: &PyAny) -> Result<Self, Error> {
let hash = obj.hash()?;
Ok(HashablePyObject {
obj: obj.to_object(obj.py()),
hash,
})
}
}
impl IntoPy<PyObject> for HashablePyObject {
fn into_py(self, _: Python<'_>) -> PyObject {
self.obj.clone()
}
}
impl IntoPy<PyObject> for &HashablePyObject {
fn into_py(self, _: Python<'_>) -> PyObject {
self.obj.clone()
}
}
impl ToPyObject for HashablePyObject {
fn to_object(&self, _py: Python<'_>) -> PyObject {
self.obj.clone()
}
}
impl Hash for HashablePyObject {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.hash.hash(state);
}
}
impl PartialEq for HashablePyObject {
fn eq(&self, other: &Self) -> bool {
let equal = Python::with_gil(|py| {
let result = self.obj.as_ref(py).eq(other.obj.as_ref(py));
result.unwrap_or(false)
});
equal
}
}
impl Eq for HashablePyObject {}
#[pyclass]
struct PythonTreeCache(TreeCache<HashablePyObject, PyObject>);
#[pymethods]
impl PythonTreeCache {
#[new]
fn new() -> Self {
PythonTreeCache(Default::default())
}
pub fn set(&mut self, key: &PyAny, value: PyObject) -> Result<(), Error> {
let v: Vec<HashablePyObject> = key
.iter()?
.map(|obj| HashablePyObject::new(obj?))
.collect::<Result<_, _>>()?;
self.0.set(v, value)?;
Ok(())
}
pub fn get_node<'a>(
&'a self,
py: Python<'a>,
key: &'a PyAny,
) -> Result<Option<Vec<(&'a PyTuple, &'a PyObject)>>, Error> {
let v: Vec<HashablePyObject> = key
.iter()?
.map(|obj| HashablePyObject::new(obj?))
.collect::<Result<_, _>>()?;
let Some(node) = self.0.get_node(v.clone())? else {
return Ok(None)
};
let items = node
.items()
.map(|(k, value)| {
let vec = v.iter().chain(k.iter().map(|a| *a)).collect::<Vec<_>>();
let nk = PyTuple::new(py, vec);
(nk, value)
})
.collect::<Vec<_>>();
Ok(Some(items))
}
pub fn get(&self, key: &PyAny) -> Result<Option<&PyObject>, Error> {
let v: Vec<HashablePyObject> = key
.iter()?
.map(|obj| HashablePyObject::new(obj?))
.collect::<Result<_, _>>()?;
Ok(self.0.get(&v)?)
}
pub fn pop_node<'a>(
&'a mut self,
py: Python<'a>,
key: &'a PyAny,
) -> Result<Option<Vec<(&'a PyTuple, PyObject)>>, Error> {
let v: Vec<HashablePyObject> = key
.iter()?
.map(|obj| HashablePyObject::new(obj?))
.collect::<Result<_, _>>()?;
let Some(node) = self.0.pop_node(v.clone())? else {
return Ok(None)
};
let items = node
.into_items()
.map(|(k, value)| {
let vec = v.iter().chain(k.iter()).collect::<Vec<_>>();
let nk = PyTuple::new(py, vec);
(nk, value)
})
.collect::<Vec<_>>();
Ok(Some(items))
}
pub fn pop(&mut self, key: &PyAny) -> Result<Option<PyObject>, Error> {
let v: Vec<HashablePyObject> = key
.iter()?
.map(|obj| HashablePyObject::new(obj?))
.collect::<Result<_, _>>()?;
Ok(self.0.pop(&v)?)
}
pub fn clear(&mut self) {
self.0.clear()
}
pub fn len(&self) -> usize {
self.0.len()
}
pub fn values(&self) -> Vec<&PyObject> {
self.0.values().collect()
}
pub fn items(&self) -> Vec<(Vec<&HashablePyObject>, &PyObject)> {
todo!()
}
}
#[pyclass]
struct StringTreeCache(TreeCache<String, String>);
#[pymethods]
impl StringTreeCache {
#[new]
fn new() -> Self {
StringTreeCache(Default::default())
}
pub fn set(&mut self, key: &PyAny, value: String) -> Result<(), Error> {
let key = key
.iter()?
.map(|o| o.expect("iter failed").extract().expect("not a string"));
self.0.set(key, value)?;
Ok(())
}
// pub fn get_node(&self, key: &PyAny) -> Result<Option<&TreeCacheNode<K, PyObject>>, Error> {
// todo!()
// }
pub fn get(&self, key: &PyAny) -> Result<Option<&String>, Error> {
let key = key.iter()?.map(|o| {
o.expect("iter failed")
.extract::<String>()
.expect("not a string")
});
Ok(self.0.get(key)?)
}
// pub fn pop_node(&mut self, key: &PyAny) -> Result<Option<TreeCacheNode<K, PyObject>>, Error> {
// todo!()
// }
pub fn pop(&mut self, key: Vec<String>) -> Result<Option<String>, Error> {
Ok(self.0.pop(&key)?)
}
pub fn clear(&mut self) {
self.0.clear()
}
pub fn len(&self) -> usize {
self.0.len()
}
pub fn values(&self) -> Vec<&String> {
self.0.values().collect()
}
pub fn items(&self) -> Vec<(Vec<&HashablePyObject>, &PyObject)> {
todo!()
}
}
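Taken together with `register_module` above, the binding could be driven from Python roughly as follows. This is a hedged sketch based on the `#[pymethods]` signatures, not a tested API:

```python
# Assumes the submodule is importable under the path registered in
# register_module() above.
from synapse.synapse_rust.tree_cache import PythonTreeCache

cache = PythonTreeCache()

# Keys are iterables of hashable Python objects; values are arbitrary.
cache.set(("room1", "user1"), "value")
assert cache.get(("room1", "user1")) == "value"
assert cache.len() == 1

# get_node() returns None for a miss, or a list of (full_key_tuple, value)
# pairs for everything stored under the given key prefix.
for full_key, value in cache.get_node(("room1",)) or []:
    print(full_key, value)

cache.pop(("room1", "user1"))  # remove a single leaf
cache.clear()
```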
+421
@@ -0,0 +1,421 @@
use std::{borrow::Borrow, collections::HashMap, hash::Hash};
use anyhow::{bail, Error};
pub mod binding;
pub enum TreeCacheNode<K, V> {
Leaf(V),
Branch(usize, HashMap<K, TreeCacheNode<K, V>>),
}
impl<K, V> TreeCacheNode<K, V> {
pub fn new_branch() -> Self {
TreeCacheNode::Branch(0, Default::default())
}
fn len(&self) -> usize {
match self {
TreeCacheNode::Leaf(_) => 1,
TreeCacheNode::Branch(size, _) => *size,
}
}
}
impl<'a, K: Eq + Hash + 'a, V> TreeCacheNode<K, V> {
pub fn set(
&mut self,
mut key: impl Iterator<Item = K>,
value: V,
) -> Result<(usize, usize), Error> {
if let Some(k) = key.next() {
match self {
TreeCacheNode::Leaf(_) => bail!("Given key is too long"),
TreeCacheNode::Branch(size, map) => {
let node = map.entry(k).or_insert_with(TreeCacheNode::new_branch);
let (added, removed) = node.set(key, value)?;
*size += added;
*size -= removed;
Ok((added, removed))
}
}
} else {
let added = if let TreeCacheNode::Branch(_, map) = self {
(1, map.len())
} else {
(0, 0)
};
*self = TreeCacheNode::Leaf(value);
Ok(added)
}
}
pub fn pop<Q>(
&mut self,
current_key: Q,
mut next_keys: impl Iterator<Item = Q>,
) -> Result<Option<TreeCacheNode<K, V>>, Error>
where
Q: Borrow<K>,
Q: Hash + Eq + 'a,
{
if let Some(next_key) = next_keys.next() {
match self {
TreeCacheNode::Leaf(_) => bail!("Given key is too long"),
TreeCacheNode::Branch(size, map) => {
let node = if let Some(node) = map.get_mut(current_key.borrow()) {
node
} else {
return Ok(None);
};
if let Some(popped) = node.pop(next_key, next_keys)? {
*size -= node.len();
Ok(Some(popped))
} else {
Ok(None)
}
}
}
} else {
match self {
TreeCacheNode::Leaf(_) => bail!("Given key is too long"),
TreeCacheNode::Branch(size, map) => {
if let Some(node) = map.remove(current_key.borrow()) {
*size -= node.len();
Ok(Some(node))
} else {
Ok(None)
}
}
}
}
}
pub fn items(&'a self) -> impl Iterator<Item = (Vec<&K>, &V)> {
// To avoid a lot of mallocs we guess the length of the key. Ideally
// we'd know this.
let capacity_guesstimate = 10;
let mut stack = vec![(Vec::with_capacity(capacity_guesstimate), self)];
std::iter::from_fn(move || {
while let Some((prefix, node)) = stack.pop() {
match node {
TreeCacheNode::Leaf(value) => return Some((prefix, value)),
TreeCacheNode::Branch(_, map) => {
stack.extend(map.iter().map(|(k, v)| {
let mut new_prefix = Vec::with_capacity(capacity_guesstimate);
new_prefix.extend_from_slice(&prefix);
new_prefix.push(k);
(new_prefix, v)
}));
}
}
}
None
})
}
pub fn values(&'a self) -> impl Iterator<Item = &V> {
let mut stack = vec![self];
std::iter::from_fn(move || {
while let Some(node) = stack.pop() {
match node {
TreeCacheNode::Leaf(value) => return Some(value),
TreeCacheNode::Branch(_, map) => {
stack.extend(map.iter().map(|(_k, v)| v));
}
}
}
None
})
}
}
impl<'a, K: Clone + Eq + Hash + 'a, V> TreeCacheNode<K, V> {
pub fn into_items(self) -> impl Iterator<Item = (Vec<K>, V)> {
let mut stack = vec![(Vec::new(), self)];
std::iter::from_fn(move || {
while let Some((prefix, node)) = stack.pop() {
match node {
TreeCacheNode::Leaf(value) => return Some((prefix, value)),
TreeCacheNode::Branch(_, map) => {
stack.extend(map.into_iter().map(|(k, v)| {
let mut prefix = prefix.clone();
prefix.push(k);
(prefix, v)
}));
}
}
}
None
})
}
}
impl<K, V> Default for TreeCacheNode<K, V> {
fn default() -> Self {
TreeCacheNode::new_branch()
}
}
pub struct TreeCache<K, V> {
root: TreeCacheNode<K, V>,
}
impl<K, V> TreeCache<K, V> {
pub fn new() -> Self {
TreeCache {
root: TreeCacheNode::new_branch(),
}
}
}
impl<'a, K: Eq + Hash + 'a, V> TreeCache<K, V> {
pub fn set(&mut self, key: impl IntoIterator<Item = K>, value: V) -> Result<(), Error> {
self.root.set(key.into_iter(), value)?;
Ok(())
}
pub fn get_node<Q>(
&self,
key: impl IntoIterator<Item = Q>,
) -> Result<Option<&TreeCacheNode<K, V>>, Error>
where
Q: Borrow<K>,
Q: Hash + Eq + 'a,
{
let mut node = &self.root;
for k in key {
match node {
TreeCacheNode::Leaf(_) => bail!("Given key is too long"),
TreeCacheNode::Branch(_, map) => {
node = if let Some(node) = map.get(k.borrow()) {
node
} else {
return Ok(None);
};
}
}
}
Ok(Some(node))
}
pub fn get<Q>(&self, key: impl IntoIterator<Item = Q>) -> Result<Option<&V>, Error>
where
Q: Borrow<K>,
Q: Hash + Eq + 'a,
{
if let Some(node) = self.get_node(key)? {
match node {
TreeCacheNode::Leaf(value) => Ok(Some(value)),
TreeCacheNode::Branch(_, _) => bail!("Given key is too short"),
}
} else {
Ok(None)
}
}
pub fn pop_node<Q>(
&mut self,
key: impl IntoIterator<Item = Q>,
) -> Result<Option<TreeCacheNode<K, V>>, Error>
where
Q: Borrow<K>,
Q: Hash + Eq + 'a,
{
let mut key_iter = key.into_iter();
let k = if let Some(k) = key_iter.next() {
k
} else {
let node = std::mem::replace(&mut self.root, TreeCacheNode::new_branch());
return Ok(Some(node));
};
self.root.pop(k, key_iter)
}
pub fn pop(&mut self, key: &[K]) -> Result<Option<V>, Error> {
if let Some(node) = self.pop_node(key)? {
match node {
TreeCacheNode::Leaf(value) => Ok(Some(value)),
TreeCacheNode::Branch(_, _) => bail!("Given key is too short"),
}
} else {
Ok(None)
}
}
pub fn clear(&mut self) {
self.root = TreeCacheNode::new_branch();
}
pub fn len(&self) -> usize {
match self.root {
TreeCacheNode::Leaf(_) => 1,
TreeCacheNode::Branch(size, _) => size,
}
}
pub fn values(&self) -> impl Iterator<Item = &V> {
let mut stack = vec![&self.root];
std::iter::from_fn(move || {
while let Some(node) = stack.pop() {
match node {
TreeCacheNode::Leaf(value) => return Some(value),
TreeCacheNode::Branch(_, map) => {
stack.extend(map.values());
}
}
}
None
})
}
pub fn items(&self) -> impl Iterator<Item = (Vec<&K>, &V)> {
self.root.items()
}
}
impl<K, V> Default for TreeCache<K, V> {
fn default() -> Self {
TreeCache::new()
}
}
#[cfg(test)]
mod test {
use std::collections::BTreeSet;
use super::*;
#[test]
fn get_set() -> Result<(), Error> {
let mut cache = TreeCache::new();
cache.set(vec!["a", "b"], "c")?;
assert_eq!(cache.get(&["a", "b"])?, Some(&"c"));
let node = cache.get_node(&["a"])?.unwrap();
match node {
TreeCacheNode::Leaf(_) => bail!("expected branch"),
TreeCacheNode::Branch(_, map) => {
assert_eq!(map.len(), 1);
assert!(map.contains_key("b"));
}
}
Ok(())
}
#[test]
fn length() -> Result<(), Error> {
let mut cache = TreeCache::new();
cache.set(vec!["a", "b"], "c")?;
assert_eq!(cache.len(), 1);
cache.set(vec!["a", "b"], "d")?;
assert_eq!(cache.len(), 1);
cache.set(vec!["e", "f"], "g")?;
assert_eq!(cache.len(), 2);
cache.set(vec!["e", "h"], "i")?;
assert_eq!(cache.len(), 3);
cache.set(vec!["e"], "i")?;
assert_eq!(cache.len(), 2);
cache.pop_node(&["a"])?;
assert_eq!(cache.len(), 1);
Ok(())
}
#[test]
fn clear() -> Result<(), Error> {
let mut cache = TreeCache::new();
cache.set(vec!["a", "b"], "c")?;
assert_eq!(cache.len(), 1);
cache.clear();
assert_eq!(cache.len(), 0);
assert_eq!(cache.get(&["a", "b"])?, None);
Ok(())
}
#[test]
fn pop() -> Result<(), Error> {
let mut cache = TreeCache::new();
cache.set(vec!["a", "b"], "c")?;
assert_eq!(cache.pop(&["a", "b"])?, Some("c"));
assert_eq!(cache.pop(&["a", "b"])?, None);
Ok(())
}
#[test]
fn values() -> Result<(), Error> {
let mut cache = TreeCache::new();
cache.set(vec!["a", "b"], "c")?;
let expected = ["c"].iter().collect();
assert_eq!(cache.values().collect::<BTreeSet<_>>(), expected);
cache.set(vec!["d", "e"], "f")?;
let expected = ["c", "f"].iter().collect();
assert_eq!(cache.values().collect::<BTreeSet<_>>(), expected);
Ok(())
}
#[test]
fn items() -> Result<(), Error> {
let mut cache = TreeCache::new();
cache.set(vec!["a", "b"], "c")?;
cache.set(vec!["d", "e"], "f")?;
let expected = [(vec![&"a", &"b"], &"c"), (vec![&"d", &"e"], &"f")]
.into_iter()
.collect();
assert_eq!(cache.items().collect::<BTreeSet<_>>(), expected);
Ok(())
}
}
+16 -80
@@ -53,12 +53,6 @@ Run the complement test suite on Synapse.
Only build the Docker images. Don't actually run Complement.
Conflicts with -f/--fast.
-e, --editable
Use an editable build of Synapse, rebuilding the image if necessary.
This is suitable for use in development where a fast turn-around time
is important.
Not suitable for use in CI in case the editable environment is impure.
For help on arguments to 'go test', run 'go help testflag'.
EOF
}
@@ -79,9 +73,6 @@ while [ $# -ge 1 ]; do
"--build-only")
skip_complement_run=1
;;
"-e"|"--editable")
use_editable_synapse=1
;;
*)
# unknown arg: presumably an argument to gotest. break the loop.
break
@@ -105,76 +96,25 @@ if [[ -z "$COMPLEMENT_DIR" ]]; then
echo "Checkout available at 'complement-${COMPLEMENT_REF}'"
fi
if [ -n "$use_editable_synapse" ]; then
if [[ -e synapse/synapse_rust.abi3.so ]]; then
# In an editable install, back up the host's compiled Rust module to prevent
# inconvenience; the container will overwrite the module with its own copy.
mv -n synapse/synapse_rust.abi3.so synapse/synapse_rust.abi3.so~host
# And restore it on exit:
synapse_pkg=`realpath synapse`
trap "mv -f '$synapse_pkg/synapse_rust.abi3.so~host' '$synapse_pkg/synapse_rust.abi3.so'" EXIT
fi
editable_mount="$(realpath .):/editable-src:z"
if docker inspect complement-synapse-editable &>/dev/null; then
# complement-synapse-editable already exists: see if we can still use it:
# - The Rust module must still be importable; it will fail to import if the Rust source has changed.
# - The Poetry lock file must be the same (otherwise we assume dependencies have changed)
# First set up the module in the right place for an editable installation.
docker run --rm -v $editable_mount --entrypoint 'cp' complement-synapse-editable -- /synapse_rust.abi3.so.bak /editable-src/synapse/synapse_rust.abi3.so
if (docker run --rm -v $editable_mount --entrypoint 'python' complement-synapse-editable -c 'import synapse.synapse_rust' \
&& docker run --rm -v $editable_mount --entrypoint 'diff' complement-synapse-editable --brief /editable-src/poetry.lock /poetry.lock.bak); then
skip_docker_build=1
else
echo "Editable Synapse image is stale. Will rebuild."
unset skip_docker_build
fi
fi
fi
if [ -z "$skip_docker_build" ]; then
if [ -n "$use_editable_synapse" ]; then
# Build the base Synapse image from the local checkout
echo_if_github "::group::Build Docker image: matrixdotorg/synapse"
docker build -t matrixdotorg/synapse \
--build-arg TEST_ONLY_SKIP_DEP_HASH_VERIFICATION \
--build-arg TEST_ONLY_IGNORE_POETRY_LOCKFILE \
-f "docker/Dockerfile" .
echo_if_github "::endgroup::"
# Build a special image designed for use in development with editable
# installs.
docker build -t synapse-editable \
-f "docker/editable.Dockerfile" .
# Build the workers docker image (from the base Synapse image we just built).
echo_if_github "::group::Build Docker image: matrixdotorg/synapse-workers"
docker build -t matrixdotorg/synapse-workers -f "docker/Dockerfile-workers" .
echo_if_github "::endgroup::"
docker build -t synapse-workers-editable \
--build-arg FROM=synapse-editable \
-f "docker/Dockerfile-workers" .
docker build -t complement-synapse-editable \
--build-arg FROM=synapse-workers-editable \
-f "docker/complement/Dockerfile" "docker/complement"
# Prepare the Rust module
docker run --rm -v $editable_mount --entrypoint 'cp' complement-synapse-editable -- /synapse_rust.abi3.so.bak /editable-src/synapse/synapse_rust.abi3.so
else
# Build the base Synapse image from the local checkout
echo_if_github "::group::Build Docker image: matrixdotorg/synapse"
docker build -t matrixdotorg/synapse \
--build-arg TEST_ONLY_SKIP_DEP_HASH_VERIFICATION \
--build-arg TEST_ONLY_IGNORE_POETRY_LOCKFILE \
-f "docker/Dockerfile" .
echo_if_github "::endgroup::"
# Build the workers docker image (from the base Synapse image we just built).
echo_if_github "::group::Build Docker image: matrixdotorg/synapse-workers"
docker build -t matrixdotorg/synapse-workers -f "docker/Dockerfile-workers" .
echo_if_github "::endgroup::"
# Build the unified Complement image (from the worker Synapse image we just built).
echo_if_github "::group::Build Docker image: complement/Dockerfile"
docker build -t complement-synapse \
-f "docker/complement/Dockerfile" "docker/complement"
echo_if_github "::endgroup::"
fi
# Build the unified Complement image (from the worker Synapse image we just built).
echo_if_github "::group::Build Docker image: complement/Dockerfile"
docker build -t complement-synapse \
-f "docker/complement/Dockerfile" "docker/complement"
echo_if_github "::endgroup::"
fi
if [ -n "$skip_complement_run" ]; then
@@ -183,10 +123,6 @@ if [ -n "$skip_complement_run" ]; then
fi
export COMPLEMENT_BASE_IMAGE=complement-synapse
if [ -n "$use_editable_synapse" ]; then
export COMPLEMENT_BASE_IMAGE=complement-synapse-editable
export COMPLEMENT_HOST_MOUNTS="$editable_mount"
fi
extra_test_args=()
+4 -2
@@ -27,7 +27,7 @@ import time
import urllib.request
from os import path
from tempfile import TemporaryDirectory
from typing import Any, List, Optional
from typing import Any, List, Optional, cast
import attr
import click
@@ -174,7 +174,9 @@ def _prepare() -> None:
click.get_current_context().abort()
# Switch to the release branch.
parsed_new_version = version.parse(new_version)
# Cast safety: parse() won't return a version.LegacyVersion from our
# version string format.
parsed_new_version = cast(version.Version, version.parse(new_version))
# We assume for debian changelogs that we only do RCs or full releases.
assert not parsed_new_version.is_devrelease
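The cast comment can be sanity-checked against `packaging`'s API; a small sketch of what the release script relies on:

```python
from packaging import version

v = version.parse("1.74.0rc1")
assert isinstance(v, version.Version)  # not a LegacyVersion for PEP 440 strings
assert v.pre == ("rc", 1)              # an RC...
assert not v.is_devrelease             # ...but not a dev release, as asserted above
```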
-25
@@ -1,25 +0,0 @@
# Copyright 2022 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Stub for PyICU.
class Locale:
@staticmethod
def getDefault() -> Locale: ...
class BreakIterator:
@staticmethod
def createWordInstance(locale: Locale) -> BreakIterator: ...
def setText(self, text: str) -> None: ...
def nextBoundary(self) -> int: ...
+1 -1
@@ -45,7 +45,7 @@ class PushRuleEvaluator:
notification_power_levels: Mapping[str, int],
related_events_flattened: Mapping[str, Mapping[str, str]],
related_event_match_enabled: bool,
room_version_feature_flags: Tuple[str, ...],
room_version_feature_flags: list[str],
msc3931_enabled: bool,
): ...
def run(
-7
@@ -44,15 +44,8 @@ if strtobool(os.environ.get("SYNAPSE_ASYNC_IO_REACTOR", "0")):
from twisted.internet import asyncioreactor
if bool(os.environ.get("SYNAPSE_UVLOOP", False)):
import uvloop
uvloop.install()
print("Using uvloop")
asyncioreactor.install(asyncio.get_event_loop())
# Twisted and canonicaljson will fail to import when this file is executed to
# get the __version__ during a fresh install. That's OK and subsequent calls to
# actually start Synapse will import these libraries fine.
+2 -3
@@ -222,7 +222,6 @@ def main() -> None:
args = parser.parse_args()
config: Optional[Dict[str, Any]] = None
if "config" in args and args.config:
config = yaml.safe_load(args.config)
@@ -230,7 +229,7 @@ def main() -> None:
secret = args.shared_secret
else:
# argparse should check that we have either config or shared secret
assert config is not None
assert config
secret = config.get("registration_shared_secret")
secret_file = config.get("registration_shared_secret_path")
@@ -245,7 +244,7 @@ def main() -> None:
if args.server_url:
server_url = args.server_url
elif config is not None:
elif config:
server_url = _find_client_listener(config)
if not server_url:
server_url = _DEFAULT_SERVER_URL
-4
@@ -152,7 +152,6 @@ class EduTypes:
class RejectedReason:
AUTH_ERROR: Final = "auth_error"
OVERSIZED_EVENT: Final = "oversized_event"
class RoomCreationPreset:
@@ -231,9 +230,6 @@ class EventContentFields:
# The authorising user for joining a restricted room.
AUTHORISING_USER: Final = "join_authorised_via_users_server"
# an unspecced field added to to-device messages to identify them uniquely-ish
TO_DEVICE_MSGID: Final = "org.matrix.msgid"
class RoomTypes:
"""Understood values of the room_type field of m.room.create events."""
+5 -12
@@ -300,8 +300,10 @@ class InteractiveAuthIncompleteError(Exception):
class UnrecognizedRequestError(SynapseError):
"""An error indicating we don't understand the request you're trying to make"""
def __init__(self, msg: str = "Unrecognized request", code: int = 400):
super().__init__(code, msg, Codes.UNRECOGNIZED)
def __init__(
self, msg: str = "Unrecognized request", errcode: str = Codes.UNRECOGNIZED
):
super().__init__(400, msg, errcode)
class NotFoundError(SynapseError):
@@ -424,17 +426,8 @@ class ResourceLimitError(SynapseError):
class EventSizeError(SynapseError):
"""An error raised when an event is too big."""
def __init__(self, msg: str, unpersistable: bool):
"""
unpersistable:
if True, the PDU must not be persisted, not even as a rejected PDU
when received over federation.
This is notably true when the entire PDU exceeds the size limit for a PDU,
(as opposed to an individual key's size limit being exceeded).
"""
def __init__(self, msg: str):
super().__init__(413, msg, Codes.TOO_LARGE)
self.unpersistable = unpersistable
class LoginError(SynapseError):
+16 -16
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Callable, Dict, Optional, Tuple
from typing import Callable, Dict, List, Optional
import attr
@@ -103,7 +103,7 @@ class RoomVersion:
# is not enough to mark it "supported": the push rule evaluator also needs to
# support the flag. Unknown flags are ignored by the evaluator, making conditions
# fail if used.
msc3931_push_features: Tuple[str, ...] # values from PushRuleRoomFlag
msc3931_push_features: List[str] # values from PushRuleRoomFlag
class RoomVersions:
@@ -124,7 +124,7 @@ class RoomVersions:
msc2716_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
msc3931_push_features=(),
msc3931_push_features=[],
)
V2 = RoomVersion(
"2",
@@ -143,7 +143,7 @@ class RoomVersions:
msc2716_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
msc3931_push_features=(),
msc3931_push_features=[],
)
V3 = RoomVersion(
"3",
@@ -162,7 +162,7 @@ class RoomVersions:
msc2716_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
msc3931_push_features=(),
msc3931_push_features=[],
)
V4 = RoomVersion(
"4",
@@ -181,7 +181,7 @@ class RoomVersions:
msc2716_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
msc3931_push_features=(),
msc3931_push_features=[],
)
V5 = RoomVersion(
"5",
@@ -200,7 +200,7 @@ class RoomVersions:
msc2716_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
msc3931_push_features=(),
msc3931_push_features=[],
)
V6 = RoomVersion(
"6",
@@ -219,7 +219,7 @@ class RoomVersions:
msc2716_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
msc3931_push_features=(),
msc3931_push_features=[],
)
MSC2176 = RoomVersion(
"org.matrix.msc2176",
@@ -238,7 +238,7 @@ class RoomVersions:
msc2716_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
msc3931_push_features=(),
msc3931_push_features=[],
)
V7 = RoomVersion(
"7",
@@ -257,7 +257,7 @@ class RoomVersions:
msc2716_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
msc3931_push_features=(),
msc3931_push_features=[],
)
V8 = RoomVersion(
"8",
@@ -276,7 +276,7 @@ class RoomVersions:
msc2716_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
msc3931_push_features=(),
msc3931_push_features=[],
)
V9 = RoomVersion(
"9",
@@ -295,7 +295,7 @@ class RoomVersions:
msc2716_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
msc3931_push_features=(),
msc3931_push_features=[],
)
MSC3787 = RoomVersion(
"org.matrix.msc3787",
@@ -314,7 +314,7 @@ class RoomVersions:
msc2716_redactions=False,
msc3787_knock_restricted_join_rule=True,
msc3667_int_only_power_levels=False,
msc3931_push_features=(),
msc3931_push_features=[],
)
V10 = RoomVersion(
"10",
@@ -333,7 +333,7 @@ class RoomVersions:
msc2716_redactions=False,
msc3787_knock_restricted_join_rule=True,
msc3667_int_only_power_levels=True,
msc3931_push_features=(),
msc3931_push_features=[],
)
MSC2716v4 = RoomVersion(
"org.matrix.msc2716v4",
@@ -352,7 +352,7 @@ class RoomVersions:
msc2716_redactions=True,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
msc3931_push_features=(),
msc3931_push_features=[],
)
MSC1767v10 = RoomVersion(
# MSC1767 (Extensible Events) based on room version "10"
@@ -372,7 +372,7 @@ class RoomVersions:
msc2716_redactions=False,
msc3787_knock_restricted_join_rule=True,
msc3667_int_only_power_levels=True,
msc3931_push_features=(PushRuleRoomFlag.EXTENSIBLE_EVENTS,),
msc3931_push_features=[PushRuleRoomFlag.EXTENSIBLE_EVENTS],
)
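The list-vs-tuple churn above is not cosmetic: RoomVersion is a frozen attrs class, and frozen attrs classes generate a __hash__ that hashes every field value. A sketch of why the tuple form matters (RoomVersionSketch is a stand-in, not the real class):

import attr
from typing import Tuple

@attr.s(slots=True, frozen=True, auto_attribs=True)
class RoomVersionSketch:
    identifier: str
    msc3931_push_features: Tuple[str, ...] = ()

# Hashing touches every field, so a tuple keeps instances usable as set/dict
# members (e.g. the LENIENT_EVENT_BYTE_LIMITS_ROOM_VERSIONS set further down);
# a list field would raise TypeError the first time an instance is hashed.
versions = {RoomVersionSketch("10"), RoomVersionSketch("9")}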
+2 -6
View File
@@ -245,9 +245,7 @@ class ApplicationService:
return True
# likewise with the room's aliases (if it has any)
alias_list = await store.get_aliases_for_room(
room_id, on_invalidate=cache_context.invalidate
)
alias_list = await store.get_aliases_for_room(room_id)
for alias in alias_list:
if self.is_room_alias_in_namespace(alias):
return True
@@ -313,9 +311,7 @@ class ApplicationService:
# Find all the rooms the sender is in
if self.is_interested_in_user(user_id.to_string()):
return True
room_ids = await store.get_rooms_for_user(
user_id.to_string(), on_invalidate=cache_context.invalidate
)
room_ids = await store.get_rooms_for_user(user_id.to_string())
# Then find out if the appservice is interested in any of those rooms
for room_id in room_ids:
-3
View File
@@ -33,9 +33,6 @@ def validate_config(
config: the configuration value to be validated
config_path: the path within the config file. This will be used as a basis
for the error message.
Raises:
ConfigError, if validation fails.
"""
try:
jsonschema.validate(config, json_schema)
+21 -42
View File
@@ -13,13 +13,12 @@
# limitations under the License.
import logging
from typing import Any, Iterable, Optional, Tuple
from typing import Any, Iterable
from synapse.api.constants import EventTypes
from synapse.config._base import Config, ConfigError
from synapse.config._util import validate_config
from synapse.types import JsonDict
from synapse.types.state import StateFilter
logger = logging.getLogger(__name__)
@@ -27,20 +26,16 @@ logger = logging.getLogger(__name__)
class ApiConfig(Config):
section = "api"
room_prejoin_state: StateFilter
track_puppeted_user_ips: bool
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
validate_config(_MAIN_SCHEMA, config, ())
self.room_prejoin_state = StateFilter.from_types(
self._get_prejoin_state_entries(config)
)
self.room_prejoin_state = list(self._get_prejoin_state_types(config))
self.track_puppeted_user_ips = config.get("track_puppeted_user_ips", False)
def _get_prejoin_state_entries(
self, config: JsonDict
) -> Iterable[Tuple[str, Optional[str]]]:
"""Get the event types and state keys to include in the prejoin state."""
def _get_prejoin_state_types(self, config: JsonDict) -> Iterable[str]:
"""Get the event types to include in the prejoin state
Parses the config and returns an iterable of the event types to be included.
"""
room_prejoin_state_config = config.get("room_prejoin_state") or {}
# backwards-compatibility support for room_invite_state_types
@@ -55,39 +50,33 @@ class ApiConfig(Config):
logger.warning(_ROOM_INVITE_STATE_TYPES_WARNING)
for event_type in config["room_invite_state_types"]:
yield event_type, None
yield from config["room_invite_state_types"]
return
if not room_prejoin_state_config.get("disable_default_event_types"):
yield from _DEFAULT_PREJOIN_STATE_TYPES_AND_STATE_KEYS
yield from _DEFAULT_PREJOIN_STATE_TYPES
for entry in room_prejoin_state_config.get("additional_event_types", []):
if isinstance(entry, str):
yield entry, None
else:
yield entry
yield from room_prejoin_state_config.get("additional_event_types", [])
_ROOM_INVITE_STATE_TYPES_WARNING = """\
WARNING: The 'room_invite_state_types' configuration setting is now deprecated,
and replaced with 'room_prejoin_state'. New features may not work correctly
unless 'room_invite_state_types' is removed. See the config documentation at
https://matrix-org.github.io/synapse/latest/usage/configuration/config_documentation.html#room_prejoin_state
for details of 'room_prejoin_state'.
unless 'room_invite_state_types' is removed. See the sample configuration file for
details of 'room_prejoin_state'.
--------------------------------------------------------------------------------
"""
_DEFAULT_PREJOIN_STATE_TYPES_AND_STATE_KEYS = [
(EventTypes.JoinRules, ""),
(EventTypes.CanonicalAlias, ""),
(EventTypes.RoomAvatar, ""),
(EventTypes.RoomEncryption, ""),
(EventTypes.Name, ""),
_DEFAULT_PREJOIN_STATE_TYPES = [
EventTypes.JoinRules,
EventTypes.CanonicalAlias,
EventTypes.RoomAvatar,
EventTypes.RoomEncryption,
EventTypes.Name,
# Per MSC1772.
(EventTypes.Create, ""),
EventTypes.Create,
# Per MSC3173.
(EventTypes.Topic, ""),
EventTypes.Topic,
]
@@ -101,17 +90,7 @@ _ROOM_PREJOIN_STATE_CONFIG_SCHEMA = {
"disable_default_event_types": {"type": "boolean"},
"additional_event_types": {
"type": "array",
"items": {
"oneOf": [
{"type": "string"},
{
"type": "array",
"items": {"type": "string"},
"minItems": 2,
"maxItems": 2,
},
],
},
"items": {"type": "string"},
},
},
},
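A short sketch of the entry normalisation this config change introduces, under the assumption that StateFilter.from_types accepts (event_type, state_key) pairs with None meaning "any state key":

from typing import Iterable, List, Optional, Tuple, Union

def normalise_prejoin_entries(
    entries: List[Union[str, List[str]]],
) -> Iterable[Tuple[str, Optional[str]]]:
    for entry in entries:
        if isinstance(entry, str):
            yield entry, None            # bare type: match any state key
        else:
            yield entry[0], entry[1]     # ["type", "state_key"] pair from config

# e.g. list(normalise_prejoin_entries(
#     ["m.room.topic", ["m.room.member", "@alice:example.org"]]))
# -> [("m.room.topic", None), ("m.room.member", "@alice:example.org")]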
+7 -69
View File
@@ -52,7 +52,6 @@ from synapse.api.room_versions import (
KNOWN_ROOM_VERSIONS,
EventFormatVersions,
RoomVersion,
RoomVersions,
)
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.types import MutableStateMap, StateMap, UserID, get_domain_from_id
@@ -342,80 +341,19 @@ def check_state_dependent_auth_rules(
logger.debug("Allowing! %s", event)
# Set of room versions where Synapse did not apply event key size limits
# in bytes, but rather in codepoints.
# In these room versions, we are more lenient with event size validation.
LENIENT_EVENT_BYTE_LIMITS_ROOM_VERSIONS = {
RoomVersions.V1,
RoomVersions.V2,
RoomVersions.V3,
RoomVersions.V4,
RoomVersions.V5,
RoomVersions.V6,
RoomVersions.MSC2176,
RoomVersions.V7,
RoomVersions.V8,
RoomVersions.V9,
RoomVersions.MSC3787,
RoomVersions.V10,
RoomVersions.MSC2716v4,
RoomVersions.MSC1767v10,
}
def _check_size_limits(event: "EventBase") -> None:
"""
Checks the size limits in a PDU.
The size limit of the PDU as a whole is checked first.
Then the size of each field is checked, first in codepoints and then in bytes.
The codepoint limits exist only for compatibility with earlier Synapse versions.
Raises:
EventSizeError:
when a size limit has been violated.
unpersistable=True if Synapse never would have accepted the event and
the PDU must NOT be persisted.
unpersistable=False if a prior version of Synapse would have accepted the
event and so the PDU must be persisted as rejected to avoid
breaking the room.
"""
# Whole PDU check
if len(encode_canonical_json(event.get_pdu_json())) > MAX_PDU_SIZE:
raise EventSizeError("event too large", unpersistable=True)
# Codepoint size check: Synapse always enforced these limits, so apply
# them strictly.
if len(event.user_id) > 255:
raise EventSizeError("'user_id' too large", unpersistable=True)
raise EventSizeError("'user_id' too large")
if len(event.room_id) > 255:
raise EventSizeError("'room_id' too large", unpersistable=True)
raise EventSizeError("'room_id' too large")
if event.is_state() and len(event.state_key) > 255:
raise EventSizeError("'state_key' too large", unpersistable=True)
raise EventSizeError("'state_key' too large")
if len(event.type) > 255:
raise EventSizeError("'type' too large", unpersistable=True)
raise EventSizeError("'type' too large")
if len(event.event_id) > 255:
raise EventSizeError("'event_id' too large", unpersistable=True)
strict_byte_limits = (
event.room_version not in LENIENT_EVENT_BYTE_LIMITS_ROOM_VERSIONS
)
# Byte size check: if these fail, then be lenient to avoid breaking rooms.
if len(event.user_id.encode("utf-8")) > 255:
raise EventSizeError("'user_id' too large", unpersistable=strict_byte_limits)
if len(event.room_id.encode("utf-8")) > 255:
raise EventSizeError("'room_id' too large", unpersistable=strict_byte_limits)
if event.is_state() and len(event.state_key.encode("utf-8")) > 255:
raise EventSizeError("'state_key' too large", unpersistable=strict_byte_limits)
if len(event.type.encode("utf-8")) > 255:
raise EventSizeError("'type' too large", unpersistable=strict_byte_limits)
if len(event.event_id.encode("utf-8")) > 255:
raise EventSizeError("'event_id' too large", unpersistable=strict_byte_limits)
raise EventSizeError("'event_id' too large")
if len(encode_canonical_json(event.get_pdu_json())) > MAX_PDU_SIZE:
raise EventSizeError("event too large")
def _check_create(event: "EventBase") -> None:
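A compact sketch of the two-tier limit policy in the hunk above: codepoint overruns were always rejected by Synapse, so they stay hard failures, while byte overruns are hard failures only in room versions outside the lenient set (names with a `Sketch` suffix are stand-ins):

class EventSizeErrorSketch(Exception):
    # Stand-in for the EventSizeError shown earlier in this diff.
    def __init__(self, msg: str, unpersistable: bool):
        super().__init__(msg)
        self.unpersistable = unpersistable

def check_field_size(name: str, value: str, strict_bytes: bool) -> None:
    # Codepoint limit: always enforced, so violations are never persistable.
    if len(value) > 255:
        raise EventSizeErrorSketch(f"'{name}' too large", unpersistable=True)
    # Byte limit: prior Synapses accepted these, so in lenient room versions
    # the event is persisted as rejected instead (unpersistable=False).
    if len(value.encode("utf-8")) > 255:
        raise EventSizeErrorSketch(f"'{name}' too large", unpersistable=strict_bytes)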
+1 -1
View File
@@ -28,8 +28,8 @@ from synapse.event_auth import auth_types_for_event
from synapse.events import EventBase, _EventInternalMetadata, make_event_from_dict
from synapse.state import StateHandler
from synapse.storage.databases.main import DataStore
from synapse.storage.state import StateFilter
from synapse.types import EventID, JsonDict
from synapse.types.state import StateFilter
from synapse.util import Clock
from synapse.util.stringutils import random_string
+1 -1
View File
@@ -23,7 +23,7 @@ from synapse.types import JsonDict, StateMap
if TYPE_CHECKING:
from synapse.storage.controllers import StorageControllers
from synapse.storage.databases.main import DataStore
from synapse.types.state import StateFilter
from synapse.storage.state import StateFilter
@attr.s(slots=True, auto_attribs=True)
+1 -31
View File
@@ -28,14 +28,8 @@ from typing import (
)
import attr
from canonicaljson import encode_canonical_json
from synapse.api.constants import (
MAX_PDU_SIZE,
EventContentFields,
EventTypes,
RelationTypes,
)
from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
from synapse.api.errors import Codes, SynapseError
from synapse.api.room_versions import RoomVersion
from synapse.types import JsonDict
@@ -680,27 +674,3 @@ def validate_canonicaljson(value: Any) -> None:
elif not isinstance(value, (bool, str)) and value is not None:
# Other potential JSON values (bool, None, str) are safe.
raise SynapseError(400, "Unknown JSON value", Codes.BAD_JSON)
def maybe_upsert_event_field(
event: EventBase, container: JsonDict, key: str, value: object
) -> bool:
"""Upsert an event field, but only if this doesn't make the event too large.
Returns true iff the upsert took place.
"""
if key in container:
old_value: object = container[key]
container[key] = value
# NB: here and below, we assume that passing a non-None `time_now` argument to
# get_pdu_json doesn't increase the size of the encoded result.
upsert_okay = len(encode_canonical_json(event.get_pdu_json())) <= MAX_PDU_SIZE
if not upsert_okay:
container[key] = old_value
else:
container[key] = value
upsert_okay = len(encode_canonical_json(event.get_pdu_json())) <= MAX_PDU_SIZE
if not upsert_okay:
del container[key]
return upsert_okay
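A self-contained sketch of maybe_upsert_event_field's rollback contract, using plain json.dumps in place of canonical JSON encoding; `container` is assumed to be a sub-dict of `pdu` (as event.unsigned is), so writing to it changes the encoded size.

import json

MAX_PDU_SIZE_SKETCH = 65536  # stand-in for synapse.api.constants.MAX_PDU_SIZE

def maybe_upsert_sketch(pdu: dict, container: dict, key: str, value: object) -> bool:
    # Set container[key] = value, undoing the write if the encoded PDU would
    # become too large. Returns True iff the upsert sticks.
    had_key = key in container
    old_value = container.get(key)
    container[key] = value
    encoded = json.dumps(pdu, separators=(",", ":")).encode("utf-8")
    if len(encoded) > MAX_PDU_SIZE_SKETCH:
        if had_key:
            container[key] = old_value
        else:
            del container[key]
        return False
    return True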
+9 -20
View File
@@ -771,28 +771,17 @@ class FederationClient(FederationBase):
"""
if synapse_error is None:
synapse_error = e.to_synapse_error()
# MSC3743 specifies that servers should return a 404 or 405 with an errcode
# of M_UNRECOGNIZED when they receive a request to an unknown endpoint or
# to an unknown method, respectively.
# There is no good way to detect an "unknown" endpoint.
#
# Older versions of servers don't properly handle this. This needs to be
# rather specific as some endpoints truly do return 404 errors.
# Dendrite returns a 404 (with a body of "404 page not found");
# Conduit returns a 404 (with no body); and Synapse returns a 400
# with M_UNRECOGNIZED.
#
# This needs to be rather specific as some endpoints truly do return 404
# errors.
return (
# 404 is an unknown endpoint, 405 is a known endpoint, but unknown method.
(e.code == 404 or e.code == 405)
and (
# Older Dendrites returned a text or empty body.
# Older Conduit returned an empty body.
not e.response
or e.response == b"404 page not found"
# The proper response JSON with M_UNRECOGNIZED errcode.
or synapse_error.errcode == Codes.UNRECOGNIZED
)
) or (
# Older Synapses returned a 400 error.
e.code == 400
and synapse_error.errcode == Codes.UNRECOGNIZED
)
e.code == 404 and (not e.response or e.response == b"404 page not found")
) or (e.code == 400 and synapse_error.errcode == Codes.UNRECOGNIZED)
async def _try_destination_list(
self,
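Boiled down, the broadened detection in the hunk above reads as the predicate below (a sketch; the errcode string is inlined for clarity):

def is_unknown_endpoint_sketch(code: int, body: bytes, errcode: str) -> bool:
    # 404 = unknown endpoint, 405 = known endpoint but unknown method (MSC3743);
    # older Dendrite/Conduit sent a text or empty body, and older Synapse sent
    # a 400 with M_UNRECOGNIZED.
    return (
        code in (404, 405)
        and (not body or body == b"404 page not found" or errcode == "M_UNRECOGNIZED")
    ) or (code == 400 and errcode == "M_UNRECOGNIZED")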
@@ -641,7 +641,7 @@ class PerDestinationQueue:
if not message_id:
continue
set_tag(SynapseTags.TO_DEVICE_EDU_ID, message_id)
set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
edus = [
Edu(
+5 -2
View File
@@ -578,6 +578,9 @@ class ApplicationServicesHandler:
device_id,
), messages in recipient_device_to_messages.items():
for message_json in messages:
# Remove 'message_id' from the to-device message, as it's an internal ID
message_json.pop("message_id", None)
message_payload.append(
{
"to_user_id": user_id,
@@ -612,8 +615,8 @@ class ApplicationServicesHandler:
)
# Fetch the users who have modified their device list since then.
users_with_changed_device_lists = await self.store.get_all_devices_changed(
from_key, to_key=new_key
users_with_changed_device_lists = (
await self.store.get_users_whose_devices_changed(from_key, to_key=new_key)
)
# Filter out any users the application service is not interested in
+1 -1
View File
@@ -996,7 +996,7 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
# Check if we are partially joining any rooms. If so we need to store
# all device list updates so that we can handle them correctly once we
# know who is in the room.
# TODO(faster_joins): this fetches and processes a bunch of data that we don't
# TODO(faster joins): this fetches and processes a bunch of data that we don't
# use. Could be replaced by a tighter query e.g.
# SELECT EXISTS(SELECT 1 FROM partial_state_rooms)
partial_rooms = await self.store.get_partial_state_room_resync_info()
+14 -22
View File
@@ -15,7 +15,7 @@
import logging
from typing import TYPE_CHECKING, Any, Dict
from synapse.api.constants import EduTypes, EventContentFields, ToDeviceEventTypes
from synapse.api.constants import EduTypes, ToDeviceEventTypes
from synapse.api.errors import SynapseError
from synapse.api.ratelimiting import Ratelimiter
from synapse.logging.context import run_in_background
@@ -216,24 +216,14 @@ class DeviceMessageHandler:
"""
sender_user_id = requester.user.to_string()
set_tag(SynapseTags.TO_DEVICE_TYPE, message_type)
set_tag(SynapseTags.TO_DEVICE_SENDER, sender_user_id)
message_id = random_string(16)
set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
log_kv({"number_of_to_device_messages": len(messages)})
set_tag("sender", sender_user_id)
local_messages = {}
remote_messages: Dict[str, Dict[str, Dict[str, JsonDict]]] = {}
for user_id, by_device in messages.items():
# add an opentracing log entry for each message
for device_id, message_content in by_device.items():
log_kv(
{
"event": "send_to_device_message",
"user_id": user_id,
"device_id": device_id,
EventContentFields.TO_DEVICE_MSGID: message_content.get(
EventContentFields.TO_DEVICE_MSGID
),
}
)
# Ratelimit local cross-user key requests by the sending device.
if (
message_type == ToDeviceEventTypes.RoomKeyRequest
@@ -243,7 +233,6 @@ class DeviceMessageHandler:
requester, (sender_user_id, requester.device_id)
)
if not allowed:
log_kv({"message": f"dropping key requests to {user_id}"})
logger.info(
"Dropping room_key_request from %s to %s due to rate limit",
sender_user_id,
@@ -258,11 +247,18 @@ class DeviceMessageHandler:
"content": message_content,
"type": message_type,
"sender": sender_user_id,
"message_id": message_id,
}
for device_id, message_content in by_device.items()
}
if messages_by_device:
local_messages[user_id] = messages_by_device
log_kv(
{
"user_id": user_id,
"device_id": list(messages_by_device),
}
)
else:
destination = get_domain_from_id(user_id)
remote_messages.setdefault(destination, {})[user_id] = by_device
@@ -271,11 +267,7 @@ class DeviceMessageHandler:
remote_edu_contents = {}
for destination, messages in remote_messages.items():
# The EDU contains a "message_id" property which is used for
# idempotence. Make up a random one.
message_id = random_string(16)
log_kv({"destination": destination, "message_id": message_id})
log_kv({"destination": destination})
remote_edu_contents[destination] = {
"messages": messages,
"sender": sender_user_id,
+1 -5
View File
@@ -70,8 +70,8 @@ from synapse.replication.http.federation import (
)
from synapse.storage.databases.main.events import PartialStateConflictError
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.state import StateFilter
from synapse.types import JsonDict, get_domain_from_id
from synapse.types.state import StateFilter
from synapse.util.async_helpers import Linearizer
from synapse.util.retryutils import NotRetryingDestination
from synapse.visibility import filter_events_for_server
@@ -152,7 +152,6 @@ class FederationHandler:
self._federation_event_handler = hs.get_federation_event_handler()
self._device_handler = hs.get_device_handler()
self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator()
self._notifier = hs.get_notifier()
self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client(
hs
@@ -1693,9 +1692,6 @@ class FederationHandler:
self._storage_controllers.state.notify_room_un_partial_stated(
room_id
)
# Poke the notifier so that other workers see the write to
# the un-partial-stated rooms stream.
self._notifier.notify_replication()
# TODO(faster_joins) update room stats and user directory?
# https://github.com/matrix-org/synapse/issues/12814
+1 -21
View File
@@ -43,7 +43,6 @@ from synapse.api.constants import (
from synapse.api.errors import (
AuthError,
Codes,
EventSizeError,
FederationError,
FederationPullAttemptBackoffError,
HttpResponseException,
@@ -76,6 +75,7 @@ from synapse.replication.http.federation import (
from synapse.state import StateResolutionStore
from synapse.storage.databases.main.events import PartialStateConflictError
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.state import StateFilter
from synapse.types import (
PersistedEventPosition,
RoomStreamToken,
@@ -83,7 +83,6 @@ from synapse.types import (
UserID,
get_domain_from_id,
)
from synapse.types.state import StateFilter
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.iterutils import batch_iter
from synapse.util.retryutils import NotRetryingDestination
@@ -1737,15 +1736,6 @@ class FederationEventHandler:
except AuthError as e:
logger.warning("Rejecting %r because %s", event, e)
context.rejected = RejectedReason.AUTH_ERROR
except EventSizeError as e:
if e.unpersistable:
# This event is completely unpersistable.
raise e
# Otherwise, we are somewhat lenient and just persist the event
# as rejected, for moderate compatibility with older Synapse
# versions.
logger.warning("While validating received event %r: %s", event, e)
context.rejected = RejectedReason.OVERSIZED_EVENT
events_and_contexts_to_persist.append((event, context))
@@ -1791,16 +1781,6 @@ class FederationEventHandler:
# TODO: use a different rejected reason here?
context.rejected = RejectedReason.AUTH_ERROR
return
except EventSizeError as e:
if e.unpersistable:
# This event is completely unpersistable.
raise e
# Otherwise, we are somewhat lenient and just persist the event
# as rejected, for moderate compatibility with older Synapse
# versions.
logger.warning("While validating received event %r: %s", event, e)
context.rejected = RejectedReason.OVERSIZED_EVENT
return
# next, check that we have all of the event's auth events.
#
+12 -19
View File
@@ -50,7 +50,6 @@ from synapse.event_auth import validate_event_for_room_version
from synapse.events import EventBase, relation_from_event
from synapse.events.builder import EventBuilder
from synapse.events.snapshot import EventContext
from synapse.events.utils import maybe_upsert_event_field
from synapse.events.validator import EventValidator
from synapse.handlers.directory import DirectoryHandler
from synapse.logging import opentracing
@@ -60,6 +59,7 @@ from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.replication.http.send_events import ReplicationSendEventsRestServlet
from synapse.storage.databases.main.events import PartialStateConflictError
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.state import StateFilter
from synapse.types import (
MutableStateMap,
PersistedEventPosition,
@@ -70,7 +70,6 @@ from synapse.types import (
UserID,
create_requester,
)
from synapse.types.state import StateFilter
from synapse.util import json_decoder, json_encoder, log_failure, unwrapFirstError
from synapse.util.async_helpers import Linearizer, gather_results
from synapse.util.caches.expiringcache import ExpiringCache
@@ -1740,15 +1739,12 @@ class EventCreationHandler:
if event.type == EventTypes.Member:
if event.content["membership"] == Membership.INVITE:
maybe_upsert_event_field(
event,
event.unsigned,
"invite_room_state",
await self.store.get_stripped_room_state_from_event_context(
context,
self.room_prejoin_state_types,
membership_user_id=event.sender,
),
event.unsigned[
"invite_room_state"
] = await self.store.get_stripped_room_state_from_event_context(
context,
self.room_prejoin_state_types,
membership_user_id=event.sender,
)
invitee = UserID.from_string(event.state_key)
@@ -1766,14 +1762,11 @@ class EventCreationHandler:
event.signatures.update(returned_invite.signatures)
if event.content["membership"] == Membership.KNOCK:
maybe_upsert_event_field(
event,
event.unsigned,
"knock_room_state",
await self.store.get_stripped_room_state_from_event_context(
context,
self.room_prejoin_state_types,
),
event.unsigned[
"knock_room_state"
] = await self.store.get_stripped_room_state_from_event_context(
context,
self.room_prejoin_state_types,
)
if event.type == EventTypes.Redaction:
+1 -1
View File
@@ -27,9 +27,9 @@ from synapse.handlers.room import ShutdownRoomResponse
from synapse.logging.opentracing import trace
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.rest.admin._base import assert_user_is_admin
from synapse.storage.state import StateFilter
from synapse.streams.config import PaginationConfig
from synapse.types import JsonDict, Requester, StreamKeyType
from synapse.types.state import StateFilter
from synapse.util.async_helpers import ReadWriteLock
from synapse.util.stringutils import random_string
from synapse.visibility import filter_events_for_client
+5 -7
View File
@@ -1692,12 +1692,10 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
if from_key is not None:
# First get all users that have had a presence update
result = stream_change_cache.get_all_entities_changed(from_key)
updated_users = stream_change_cache.get_all_entities_changed(from_key)
# Cross-reference users we're interested in with those that have had updates.
if result.hit:
updated_users = result.entities
if updated_users is not None:
# If we have the full list of changes for presence we can
# simply check which ones share a room with the user.
get_updates_counter.labels("stream").inc()
@@ -1769,9 +1767,9 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
updated_users = None
if from_key:
# Only return updates since the last sync
result = self.store.presence_stream_cache.get_all_entities_changed(from_key)
if result.hit:
updated_users = result.entities
updated_users = self.store.presence_stream_cache.get_all_entities_changed(
from_key
)
if updated_users is not None:
# Get the actual presence update for each change
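Both presence call sites above follow the same pattern around the changed return type; a sketch of the result object and the fallback, assuming the shape shown in the storage hunks further down:

import attr
from typing import List, Optional

@attr.s(slots=True, frozen=True, auto_attribs=True)
class AllEntitiesChangedResultSketch:
    hit: bool            # False: the cache didn't cover from_key
    entities: List[str]  # only meaningful when hit is True

def updated_or_none(result: AllEntitiesChangedResultSketch) -> Optional[List[str]]:
    # None tells the caller to fall back to scanning all candidates.
    return result.entities if result.hit else None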
+1 -1
View File
@@ -46,8 +46,8 @@ from synapse.replication.http.register import (
ReplicationRegisterServlet,
)
from synapse.spam_checker_api import RegistrationBehaviour
from synapse.storage.state import StateFilter
from synapse.types import RoomAlias, UserID, create_requester
from synapse.types.state import StateFilter
if TYPE_CHECKING:
from synapse.server import HomeServer
+1 -1
View File
@@ -62,6 +62,7 @@ from synapse.events.utils import copy_and_fixup_power_levels_contents
from synapse.handlers.relations import BundledAggregations
from synapse.module_api import NOT_SPAM
from synapse.rest.admin._base import assert_user_is_admin
from synapse.storage.state import StateFilter
from synapse.streams import EventSource
from synapse.types import (
JsonDict,
@@ -76,7 +77,6 @@ from synapse.types import (
UserID,
create_requester,
)
from synapse.types.state import StateFilter
from synapse.util import stringutils
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.stringutils import parse_and_validate_server_name
+1 -1
View File
@@ -34,6 +34,7 @@ from synapse.events.snapshot import EventContext
from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN
from synapse.logging import opentracing
from synapse.module_api import NOT_SPAM
from synapse.storage.state import StateFilter
from synapse.types import (
JsonDict,
Requester,
@@ -44,7 +45,6 @@ from synapse.types import (
create_requester,
get_domain_from_id,
)
from synapse.types.state import StateFilter
from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_left_room
+1 -1
View File
@@ -23,8 +23,8 @@ from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import NotFoundError, SynapseError
from synapse.api.filtering import Filter
from synapse.events import EventBase
from synapse.storage.state import StateFilter
from synapse.types import JsonDict, StreamKeyType, UserID
from synapse.types.state import StateFilter
from synapse.visibility import filter_events_for_client
if TYPE_CHECKING:
+10 -24
View File
@@ -31,24 +31,19 @@ from typing import (
import attr
from prometheus_client import Counter
from synapse.api.constants import EventContentFields, EventTypes, Membership
from synapse.api.constants import EventTypes, Membership
from synapse.api.filtering import FilterCollection
from synapse.api.presence import UserPresenceState
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import EventBase
from synapse.handlers.relations import BundledAggregations
from synapse.logging.context import current_context
from synapse.logging.opentracing import (
SynapseTags,
log_kv,
set_tag,
start_active_span,
trace,
)
from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span
from synapse.push.clientformat import format_push_rules_for_user
from synapse.storage.databases.main.event_push_actions import RoomNotifCounts
from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
from synapse.storage.roommember import MemberSummary
from synapse.storage.state import StateFilter
from synapse.types import (
DeviceListUpdates,
JsonDict,
@@ -60,7 +55,6 @@ from synapse.types import (
StreamToken,
UserID,
)
from synapse.types.state import StateFilter
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.caches.lrucache import LruCache
@@ -1534,12 +1528,10 @@ class SyncHandler:
#
# If we don't have that info cached then we get all the users that
# share a room with our user and check if those users have changed.
cache_result = self.store.get_cached_device_list_changes(
changed_users = self.store.get_cached_device_list_changes(
since_token.device_list_key
)
if cache_result.hit:
changed_users = cache_result.entities
if changed_users is not None:
result = await self.store.get_rooms_for_users(changed_users)
for changed_user_id, entries in result.items():
@@ -1592,7 +1584,6 @@ class SyncHandler:
else:
return DeviceListUpdates()
@trace
async def _generate_sync_entry_for_to_device(
self, sync_result_builder: "SyncResultBuilder"
) -> None:
@@ -1612,16 +1603,11 @@ class SyncHandler:
)
for message in messages:
log_kv(
{
"event": "to_device_message",
"sender": message["sender"],
"type": message["type"],
EventContentFields.TO_DEVICE_MSGID: message["content"].get(
EventContentFields.TO_DEVICE_MSGID
),
}
)
# We pop here as we shouldn't be sending the message ID down
# `/sync`
message_id = message.pop("message_id", None)
if message_id:
set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
logger.debug(
"Returning %d to-device messages between %d and %d (current token: %d)",
+4 -4
View File
@@ -420,11 +420,11 @@ class TypingWriterHandler(FollowerTypingHandler):
if last_id == current_id:
return [], current_id, False
result = self._typing_stream_change_cache.get_all_entities_changed(last_id)
changed_rooms: Optional[
Iterable[str]
] = self._typing_stream_change_cache.get_all_entities_changed(last_id)
if result.hit:
changed_rooms: Iterable[str] = result.entities
else:
if changed_rooms is None:
changed_rooms = self._room_serials
rows = []
+1 -18
View File
@@ -577,24 +577,7 @@ def _unrecognised_request_handler(request: Request) -> NoReturn:
Args:
request: Unused, but passed in to match the signature of ServletCallback.
"""
raise UnrecognizedRequestError(code=404)
class UnrecognizedRequestResource(resource.Resource):
"""
Similar to twisted.web.resource.NoResource, but returns a JSON 404 with an
errcode of M_UNRECOGNIZED.
"""
def render(self, request: SynapseRequest) -> int:
f = failure.Failure(UnrecognizedRequestError(code=404))
return_json_error(f, request, None)
# A response has already been sent but Twisted requires either NOT_DONE_YET
# or the response bytes as a return value.
return NOT_DONE_YET
def getChild(self, name: str, request: Request) -> resource.Resource:
return self
raise UnrecognizedRequestError()
class RootRedirect(resource.Resource):
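For reference, the JSON body the UnrecognizedRequestResource above serves for unknown paths; this is a sketch of return_json_error's assumed output shape, not a verbatim dump:

import json

body = json.dumps({
    "errcode": "M_UNRECOGNIZED",
    "error": "Unrecognized request",
})
# Served with HTTP 404, instead of twisted.web's default HTML NoResource page.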
+2 -9
View File
@@ -292,15 +292,8 @@ logger = logging.getLogger(__name__)
class SynapseTags:
# The message ID of any to_device EDU processed
TO_DEVICE_EDU_ID = "to_device.edu_id"
# Details about to-device messages
TO_DEVICE_TYPE = "to_device.type"
TO_DEVICE_SENDER = "to_device.sender"
TO_DEVICE_RECIPIENT = "to_device.recipient"
TO_DEVICE_RECIPIENT_DEVICE = "to_device.recipient_device"
TO_DEVICE_MSGID = "to_device.msgid" # client-generated ID
# The message ID of any to_device message processed
TO_DEVICE_MESSAGE_ID = "to_device.message_id"
# Whether the sync response has new data to be returned to the client.
SYNC_RESULT = "sync.new_data"
+1 -1
View File
@@ -111,6 +111,7 @@ from synapse.storage.background_updates import (
)
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.databases.main.roommember import ProfileInfo
from synapse.storage.state import StateFilter
from synapse.types import (
DomainSpecificString,
JsonDict,
@@ -123,7 +124,6 @@ from synapse.types import (
UserProfile,
create_requester,
)
from synapse.types.state import StateFilter
from synapse.util import Clock
from synapse.util.async_helpers import maybe_awaitable
from synapse.util.caches.descriptors import CachedFunction, cached
+6 -2
View File
@@ -35,8 +35,8 @@ from synapse.events import EventBase, relation_from_event
from synapse.events.snapshot import EventContext
from synapse.state import POWER_KEY
from synapse.storage.databases.main.roommember import EventIdMembership
from synapse.storage.state import StateFilter
from synapse.synapse_rust.push import FilteredPushRules, PushRuleEvaluator
from synapse.types.state import StateFilter
from synapse.util.caches import register_cache
from synapse.util.metrics import measure_func
from synapse.visibility import filter_event_for_clients_with_state
@@ -342,6 +342,10 @@ class BulkPushRuleEvaluator:
for user_id, level in notification_levels.items():
notification_levels[user_id] = int(level)
room_version_features = event.room_version.msc3931_push_features
if not room_version_features:
room_version_features = []
evaluator = PushRuleEvaluator(
_flatten_dict(event, room_version=event.room_version),
room_member_count,
@@ -349,7 +353,7 @@ class BulkPushRuleEvaluator:
notification_levels,
related_events,
self._related_event_match_enabled,
event.room_version.msc3931_push_features,
room_version_features,
self.hs.config.experimental.msc1767_enabled, # MSC3931 flag
)
+1 -1
View File
@@ -37,8 +37,8 @@ from synapse.push.push_types import (
TemplateVars,
)
from synapse.storage.databases.main.event_push_actions import EmailPushAction
from synapse.storage.state import StateFilter
from synapse.types import StateMap, UserID
from synapse.types.state import StateFilter
from synapse.util.async_helpers import concurrently_execute
from synapse.visibility import filter_events_for_client
-11
View File
@@ -36,14 +36,12 @@ from synapse.replication.tcp.streams import (
TagAccountDataStream,
ToDeviceStream,
TypingStream,
UnPartialStatedRoomStream,
)
from synapse.replication.tcp.streams.events import (
EventsStream,
EventsStreamEventRow,
EventsStreamRow,
)
from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStreamRow
from synapse.types import PersistedEventPosition, ReadReceipt, StreamKeyType, UserID
from synapse.util.async_helpers import Linearizer, timeout_deferred
from synapse.util.metrics import Measure
@@ -119,7 +117,6 @@ class ReplicationDataHandler:
self._streams = hs.get_replication_streams()
self._instance_name = hs.get_instance_name()
self._typing_handler = hs.get_typing_handler()
self._state_storage_controller = hs.get_storage_controllers().state
self._notify_pushers = hs.config.worker.start_pushers
self._pusher_pool = hs.get_pusherpool()
@@ -239,14 +236,6 @@ class ReplicationDataHandler:
self.notifier.notify_user_joined_room(
row.data.event_id, row.data.room_id
)
elif stream_name == UnPartialStatedRoomStream.NAME:
for row in rows:
assert isinstance(row, UnPartialStatedRoomStreamRow)
# Wake up any tasks waiting for the room to be un-partial-stated.
self._state_storage_controller.notify_room_un_partial_stated(
row.room_id
)
await self._presence_handler.process_replication_rows(
stream_name, instance_name, token, rows
@@ -42,7 +42,6 @@ from synapse.replication.tcp.streams._base import (
)
from synapse.replication.tcp.streams.events import EventsStream
from synapse.replication.tcp.streams.federation import FederationStream
from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStream
STREAMS_MAP = {
stream.NAME: stream
@@ -62,7 +61,6 @@ STREAMS_MAP = {
TagAccountDataStream,
AccountDataStream,
UserSignatureStream,
UnPartialStatedRoomStream,
)
}
@@ -82,5 +80,4 @@ __all__ = [
"TagAccountDataStream",
"AccountDataStream",
"UserSignatureStream",
"UnPartialStatedRoomStream",
]
@@ -1,48 +0,0 @@
# Copyright 2022 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import TYPE_CHECKING
import attr
from synapse.replication.tcp.streams import Stream
from synapse.replication.tcp.streams._base import current_token_without_instance
if TYPE_CHECKING:
from synapse.server import HomeServer
@attr.s(slots=True, frozen=True, auto_attribs=True)
class UnPartialStatedRoomStreamRow:
# ID of the room that has been un-partial-stated.
room_id: str
class UnPartialStatedRoomStream(Stream):
"""
Stream to notify about rooms becoming un-partial-stated;
that is, when the background sync finishes such that we now have full state for
the room.
"""
NAME = "un_partial_stated_room"
ROW_TYPE = UnPartialStatedRoomStreamRow
def __init__(self, hs: "HomeServer"):
store = hs.get_datastores().main
super().__init__(
hs.get_instance_name(),
# TODO(faster_joins, multiple writers): we need to account for instance names
current_token_without_instance(store.get_un_partial_stated_rooms_token),
store.get_un_partial_stated_rooms_from_stream,
)
+3 -3
View File
@@ -13,13 +13,13 @@
<body>
<header class="mx_Header">
{% if app_name == "Riot" %}
<img src="https://riot.im/img/external/riot-logo-email.png" width="83" height="83" alt="[Riot]"/>
<img src="http://riot.im/img/external/riot-logo-email.png" width="83" height="83" alt="[Riot]"/>
{% elif app_name == "Vector" %}
<img src="https://matrix.org/img/vector-logo-email.png" width="64" height="83" alt="[Vector]"/>
<img src="http://matrix.org/img/vector-logo-email.png" width="64" height="83" alt="[Vector]"/>
{% elif app_name == "Element" %}
<img src="https://static.element.io/images/email-logo.png" width="83" height="83" alt="[Element]"/>
{% else %}
<img src="https://matrix.org/img/matrix-120x51.png" width="120" height="51" alt="[matrix]"/>
<img src="http://matrix.org/img/matrix-120x51.png" width="120" height="51" alt="[matrix]"/>
{% endif %}
</header>
+3 -3
View File
@@ -21,13 +21,13 @@
</td>
<td class="logo">
{% if app_name == "Riot" %}
<img src="https://riot.im/img/external/riot-logo-email.png" width="83" height="83" alt="[Riot]"/>
<img src="http://riot.im/img/external/riot-logo-email.png" width="83" height="83" alt="[Riot]"/>
{% elif app_name == "Vector" %}
<img src="https://matrix.org/img/vector-logo-email.png" width="64" height="83" alt="[Vector]"/>
<img src="http://matrix.org/img/vector-logo-email.png" width="64" height="83" alt="[Vector]"/>
{% elif app_name == "Element" %}
<img src="https://static.element.io/images/email-logo.png" width="83" height="83" alt="[Element]"/>
{% else %}
<img src="https://matrix.org/img/matrix-120x51.png" width="120" height="51" alt="[matrix]"/>
<img src="http://matrix.org/img/matrix-120x51.png" width="120" height="51" alt="[matrix]"/>
{% endif %}
</td>
</tr>
+3 -3
View File
@@ -22,13 +22,13 @@
</td>
<td class="logo">
{%- if app_name == "Riot" %}
<img src="https://riot.im/img/external/riot-logo-email.png" width="83" height="83" alt="[Riot]"/>
<img src="http://riot.im/img/external/riot-logo-email.png" width="83" height="83" alt="[Riot]"/>
{%- elif app_name == "Vector" %}
<img src="https://matrix.org/img/vector-logo-email.png" width="64" height="83" alt="[Vector]"/>
<img src="http://matrix.org/img/vector-logo-email.png" width="64" height="83" alt="[Vector]"/>
{%- elif app_name == "Element" %}
<img src="https://static.element.io/images/email-logo.png" width="83" height="83" alt="[Element]"/>
{%- else %}
<img src="https://matrix.org/img/matrix-120x51.png" width="120" height="51" alt="[matrix]"/>
<img src="http://matrix.org/img/matrix-120x51.png" width="120" height="51" alt="[matrix]"/>
{%- endif %}
</td>
</tr>
+1 -1
View File
@@ -34,9 +34,9 @@ from synapse.rest.admin._base import (
assert_user_is_admin,
)
from synapse.storage.databases.main.room import RoomSortOrder
from synapse.storage.state import StateFilter
from synapse.streams.config import PaginationConfig
from synapse.types import JsonDict, RoomID, UserID, create_requester
from synapse.types.state import StateFilter
from synapse.util import json_decoder
if TYPE_CHECKING:
+1 -4
View File
@@ -20,7 +20,7 @@ from synapse.api.errors import Codes, SynapseError
from synapse.http.server import HttpServer
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.http.site import SynapseRequest
from synapse.types import EventID, JsonDict, RoomID
from synapse.types import JsonDict
from ._base import client_patterns
@@ -56,9 +56,6 @@ class ReceiptRestServlet(RestServlet):
) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
if not RoomID.is_valid(room_id) or not event_id.startswith(EventID.SIGIL):
raise SynapseError(400, "A valid room ID and event ID must be specified")
if receipt_type not in self._known_receipt_types:
raise SynapseError(
400,
+13 -3
View File
@@ -55,9 +55,9 @@ from synapse.logging.opentracing import set_tag
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.rest.client._base import client_patterns
from synapse.rest.client.transactions import HttpTransactionCache
from synapse.storage.state import StateFilter
from synapse.streams.config import PaginationConfig
from synapse.types import JsonDict, StreamToken, ThirdPartyInstanceID, UserID
from synapse.types.state import StateFilter
from synapse.util import json_decoder
from synapse.util.cancellation import cancellable
from synapse.util.stringutils import parse_and_validate_server_name, random_string
@@ -396,7 +396,12 @@ class JoinRoomAliasServlet(ResolveRoomIdMixin, TransactionRestServlet):
) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request, allow_guest=True)
content = parse_json_object_from_request(request, allow_empty_body=True)
try:
content = parse_json_object_from_request(request)
except Exception:
# Turns out we used to ignore the body entirely, and some clients
# cheekily send invalid bodies.
content = {}
# twisted.web.server.Request.args is incorrectly defined as Optional[Any]
args: Dict[bytes, List[bytes]] = request.args # type: ignore
@@ -947,7 +952,12 @@ class RoomMembershipRestServlet(TransactionRestServlet):
}:
raise AuthError(403, "Guest access not allowed")
content = parse_json_object_from_request(request, allow_empty_body=True)
try:
content = parse_json_object_from_request(request)
except Exception:
# Turns out we used to ignore the body entirely, and some clients
# cheekily send invalid bodies.
content = {}
if membership_action == "invite" and all(
key in content for key in ("medium", "address")
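Stripped of the servlet context, the lenient parse both call sites share looks like this sketch:

import json

def parse_json_object_lenient(raw: bytes) -> dict:
    # Accept a malformed or empty body as {} for endpoints that historically
    # ignored the body entirely.
    try:
        parsed = json.loads(raw)
    except Exception:
        return {}
    return parsed if isinstance(parsed, dict) else {}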
+1
View File
@@ -46,6 +46,7 @@ class SendToDeviceRestServlet(servlet.RestServlet):
def on_PUT(
self, request: SynapseRequest, message_type: str, txn_id: str
) -> Awaitable[Tuple[int, JsonDict]]:
set_tag("message_type", message_type)
set_tag("txn_id", txn_id)
return self.txns.fetch_or_execute_request(
request, self._put, request, message_type, txn_id
+2 -2
View File
@@ -63,8 +63,8 @@ class UserDirectorySearchRestServlet(RestServlet):
body = parse_json_object_from_request(request)
limit = int(body.get("limit", 10))
limit = max(min(limit, 50), 0)
limit = body.get("limit", 10)
limit = min(limit, 50)
try:
search_term = body["search_term"]
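The clamped variant above, as a standalone helper with the edge cases spelled out:

def clamp_limit(raw_limit, default: int = 10, cap: int = 50) -> int:
    # int(...) rejects non-numeric junk with a ValueError (surfacing as a 400
    # in the servlet); max(..., 0) stops negative limits sneaking through.
    limit = int(raw_limit) if raw_limit is not None else default
    return max(min(limit, cap), 0)

assert clamp_limit(None) == 10
assert clamp_limit(999) == 50
assert clamp_limit(-5) == 0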
+2 -2
View File
@@ -24,6 +24,7 @@ from matrix_common.types.mxc_uri import MXCUri
import twisted.internet.error
import twisted.web.http
from twisted.internet.defer import Deferred
from twisted.web.resource import Resource
from synapse.api.errors import (
FederationDeniedError,
@@ -34,7 +35,6 @@ from synapse.api.errors import (
)
from synapse.config._base import ConfigError
from synapse.config.repository import ThumbnailRequirement
from synapse.http.server import UnrecognizedRequestResource
from synapse.http.site import SynapseRequest
from synapse.logging.context import defer_to_thread
from synapse.metrics.background_process_metrics import run_as_background_process
@@ -1046,7 +1046,7 @@ class MediaRepository:
return removed_media, len(removed_media)
class MediaRepositoryResource(UnrecognizedRequestResource):
class MediaRepositoryResource(Resource):
"""File uploading and downloading.
Uploads are POSTed to a resource which returns a token which is used to GET
+1 -1
View File
@@ -44,8 +44,8 @@ from synapse.logging.context import ContextResourceUsage
from synapse.replication.http.state import ReplicationUpdateCurrentStateRestServlet
from synapse.state import v1, v2
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.state import StateFilter
from synapse.types import StateMap
from synapse.types.state import StateFilter
from synapse.util.async_helpers import Linearizer
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.metrics import Measure, measure_func
+9 -46
View File
@@ -544,48 +544,6 @@ class BackgroundUpdater:
The named index will be dropped upon completion of the new index.
"""
async def updater(progress: JsonDict, batch_size: int) -> int:
await self.create_index_in_background(
index_name=index_name,
table=table,
columns=columns,
where_clause=where_clause,
unique=unique,
psql_only=psql_only,
replaces_index=replaces_index,
)
await self._end_background_update(update_name)
return 1
self._background_update_handlers[update_name] = _BackgroundUpdateHandler(
updater, oneshot=True
)
async def create_index_in_background(
self,
index_name: str,
table: str,
columns: Iterable[str],
where_clause: Optional[str] = None,
unique: bool = False,
psql_only: bool = False,
replaces_index: Optional[str] = None,
) -> None:
"""Add an index in the background.
Args:
index_name: name of index to add
table: table to add index to
columns: columns/expressions to include in index
where_clause: A WHERE clause to specify a partial unique index.
unique: true to make a UNIQUE index
psql_only: true to only create this index on psql databases (useful
for virtual sqlite tables)
replaces_index: The name of an index that this index replaces.
The named index will be dropped upon completion of the new index.
"""
def create_index_psql(conn: Connection) -> None:
conn.rollback()
# postgres insists on autocommit for the index
@@ -660,11 +618,16 @@ class BackgroundUpdater:
else:
runner = create_index_sqlite
if runner is None:
return
async def updater(progress: JsonDict, batch_size: int) -> int:
if runner is not None:
logger.info("Adding index %s to %s", index_name, table)
await self.db_pool.runWithConnection(runner)
await self._end_background_update(update_name)
return 1
logger.info("Adding index %s to %s", index_name, table)
await self.db_pool.runWithConnection(runner)
self._background_update_handlers[update_name] = _BackgroundUpdateHandler(
updater, oneshot=True
)
async def _end_background_update(self, update_name: str) -> None:
"""Removes a completed background update task from the queue.
@@ -58,13 +58,13 @@ from synapse.storage.controllers.state import StateStorageController
from synapse.storage.databases import Databases
from synapse.storage.databases.main.events import DeltaState
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.state import StateFilter
from synapse.types import (
PersistedEventPosition,
RoomStreamToken,
StateMap,
get_domain_from_id,
)
from synapse.types.state import StateFilter
from synapse.util.async_helpers import ObservableDeferred, yieldable_gather_results
from synapse.util.metrics import Measure
+1 -1
View File
@@ -31,12 +31,12 @@ from synapse.api.constants import EventTypes
from synapse.events import EventBase
from synapse.logging.opentracing import tag_args, trace
from synapse.storage.roommember import ProfileInfo
from synapse.storage.state import StateFilter
from synapse.storage.util.partial_state_events_tracker import (
PartialCurrentStateTracker,
PartialStateEventsTracker,
)
from synapse.types import MutableStateMap, StateMap
from synapse.types.state import StateFilter
from synapse.util.cancellation import cancellable
if TYPE_CHECKING:
+1 -2
View File
@@ -667,8 +667,7 @@ class DatabasePool:
)
# also check variables referenced in func's closure
if inspect.isfunction(func):
# Keep the cast for now---it helps PyCharm to understand what `func` is.
f = cast(types.FunctionType, func) # type: ignore[redundant-cast]
f = cast(types.FunctionType, func)
if f.__closure__:
for i, cell in enumerate(f.__closure__):
if inspect.isgenerator(cell.cell_contents):
+17 -75
View File
@@ -26,15 +26,8 @@ from typing import (
cast,
)
from synapse.api.constants import EventContentFields
from synapse.logging import issue9533_logger
from synapse.logging.opentracing import (
SynapseTags,
log_kv,
set_tag,
start_active_span,
trace,
)
from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.replication.tcp.streams import ToDeviceStream
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import (
@@ -404,17 +397,6 @@ class DeviceInboxWorkerStore(SQLBaseStore):
(recipient_user_id, recipient_device_id), []
).append(message_dict)
# start a new span for each message, so that we can tag each separately
with start_active_span("get_to_device_message"):
set_tag(SynapseTags.TO_DEVICE_TYPE, message_dict["type"])
set_tag(SynapseTags.TO_DEVICE_SENDER, message_dict["sender"])
set_tag(SynapseTags.TO_DEVICE_RECIPIENT, recipient_user_id)
set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, recipient_device_id)
set_tag(
SynapseTags.TO_DEVICE_MSGID,
message_dict["content"].get(EventContentFields.TO_DEVICE_MSGID),
)
if limit is not None and rowcount == limit:
# We ended up bumping up against the message limit. There may be more messages
# to retrieve. Return what we have, as well as the last stream position that
@@ -696,35 +678,12 @@ class DeviceInboxWorkerStore(SQLBaseStore):
],
)
for destination, edu in remote_messages_by_destination.items():
if issue9533_logger.isEnabledFor(logging.DEBUG):
issue9533_logger.debug(
"Queued outgoing to-device messages with "
"stream_id %i, EDU message_id %s, type %s for %s: %s",
stream_id,
edu["message_id"],
edu["type"],
destination,
[
f"{user_id}/{device_id} (msgid "
f"{msg.get(EventContentFields.TO_DEVICE_MSGID)})"
for (user_id, messages_by_device) in edu["messages"].items()
for (device_id, msg) in messages_by_device.items()
],
)
for (user_id, messages_by_device) in edu["messages"].items():
for (device_id, msg) in messages_by_device.items():
with start_active_span("store_outgoing_to_device_message"):
set_tag(SynapseTags.TO_DEVICE_EDU_ID, edu["sender"])
set_tag(SynapseTags.TO_DEVICE_EDU_ID, edu["message_id"])
set_tag(SynapseTags.TO_DEVICE_TYPE, edu["type"])
set_tag(SynapseTags.TO_DEVICE_RECIPIENT, user_id)
set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, device_id)
set_tag(
SynapseTags.TO_DEVICE_MSGID,
msg.get(EventContentFields.TO_DEVICE_MSGID),
)
if remote_messages_by_destination:
issue9533_logger.debug(
"Queued outgoing to-device messages with stream_id %i for %s",
stream_id,
list(remote_messages_by_destination.keys()),
)
async with self._device_inbox_id_gen.get_next() as stream_id:
now_ms = self._clock.time_msec()
@@ -842,19 +801,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
# Only insert into the local inbox if the device exists on
# this server
device_id = row["device_id"]
with start_active_span("serialise_to_device_message"):
msg = messages_by_device[device_id]
set_tag(SynapseTags.TO_DEVICE_TYPE, msg["type"])
set_tag(SynapseTags.TO_DEVICE_SENDER, msg["sender"])
set_tag(SynapseTags.TO_DEVICE_RECIPIENT, user_id)
set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, device_id)
set_tag(
SynapseTags.TO_DEVICE_MSGID,
msg["content"].get(EventContentFields.TO_DEVICE_MSGID),
)
message_json = json_encoder.encode(msg)
message_json = json_encoder.encode(messages_by_device[device_id])
messages_json_for_user[device_id] = message_json
if messages_json_for_user:
@@ -874,20 +821,15 @@ class DeviceInboxWorkerStore(SQLBaseStore):
],
)
if issue9533_logger.isEnabledFor(logging.DEBUG):
issue9533_logger.debug(
"Stored to-device messages with stream_id %i: %s",
stream_id,
[
f"{user_id}/{device_id} (msgid "
f"{msg['content'].get(EventContentFields.TO_DEVICE_MSGID)})"
for (
user_id,
messages_by_device,
) in messages_by_user_then_device.items()
for (device_id, msg) in messages_by_device.items()
],
)
issue9533_logger.debug(
"Stored to-device messages with stream_id %i for %s",
stream_id,
[
(user_id, device_id)
for (user_id, messages_by_device) in local_by_user_then_device.items()
for device_id in messages_by_device.keys()
],
)
class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
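One detail worth noting in both sides of the logging hunks above: the isEnabledFor guard, which avoids building a potentially large argument list when DEBUG is off. A sketch (the logger name is assumed):

import logging

issue9533_logger = logging.getLogger("synapse.9533_debug")  # name assumed

def log_stored(stream_id: int, recipients: list) -> None:
    if issue9533_logger.isEnabledFor(logging.DEBUG):
        # The per-recipient summary is only built when DEBUG is actually on.
        issue9533_logger.debug(
            "Stored to-device messages with stream_id %i for %s",
            stream_id,
            [(user_id, device_id) for (user_id, device_id) in recipients],
        )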
+40 -71
View File
@@ -58,10 +58,7 @@ from synapse.types import JsonDict, get_verify_key_from_cross_signing_key
from synapse.util import json_decoder, json_encoder
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.stream_change_cache import (
AllEntitiesChangedResult,
StreamChangeCache,
)
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.cancellation import cancellable
from synapse.util.iterutils import batch_iter
from synapse.util.stringutils import shortstr
@@ -802,66 +799,18 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
def get_cached_device_list_changes(
self,
from_key: int,
) -> AllEntitiesChangedResult:
) -> Optional[List[str]]:
"""Get set of users whose devices have changed since `from_key`, or None
if that information is not in our cache.
"""
return self._device_list_stream_cache.get_all_entities_changed(from_key)
@cancellable
async def get_all_devices_changed(
self,
from_key: int,
to_key: int,
) -> Set[str]:
"""Get all users whose devices have changed in the given range.
Args:
from_key: The minimum device lists stream token to query device list
changes for, exclusive.
to_key: The maximum device lists stream token to query device list
changes for, inclusive.
Returns:
The set of user_ids whose devices have changed since `from_key`
(exclusive) until `to_key` (inclusive).
"""
result = self._device_list_stream_cache.get_all_entities_changed(from_key)
if result.hit:
# We know which users might have changed devices.
if not result.entities:
# If no users then we can return early.
return set()
# Otherwise we need to filter down the list
return await self.get_users_whose_devices_changed(
from_key, result.entities, to_key
)
# If the cache didn't tell us anything, we just need to query the full
# range.
sql = """
SELECT DISTINCT user_id FROM device_lists_stream
WHERE ? < stream_id AND stream_id <= ?
"""
rows = await self.db_pool.execute(
"get_all_devices_changed",
None,
sql,
from_key,
to_key,
)
return {u for u, in rows}
@cancellable
async def get_users_whose_devices_changed(
self,
from_key: int,
user_ids: Collection[str],
user_ids: Optional[Collection[str]] = None,
to_key: Optional[int] = None,
) -> Set[str]:
"""Get set of users whose devices have changed since `from_key` that
@@ -881,32 +830,52 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
"""
# Get set of users who *may* have changed. Users not in the returned
# list have definitely not changed.
user_ids_to_check = self._device_list_stream_cache.get_entities_changed(
user_ids, from_key
)
user_ids_to_check: Optional[Collection[str]]
if user_ids is None:
# Get set of all users that have had device list changes since 'from_key'
user_ids_to_check = self._device_list_stream_cache.get_all_entities_changed(
from_key
)
else:
# The same as above, but filter results to only those users in 'user_ids'
user_ids_to_check = self._device_list_stream_cache.get_entities_changed(
user_ids, from_key
)
# If an empty set was returned, there's nothing to do.
if not user_ids_to_check:
if user_ids_to_check is not None and not user_ids_to_check:
return set()
if to_key is None:
to_key = self._device_list_id_gen.get_current_token()
def _get_users_whose_devices_changed_txn(txn: LoggingTransaction) -> Set[str]:
sql = """
stream_id_where_clause = "stream_id > ?"
sql_args = [from_key]
if to_key:
stream_id_where_clause += " AND stream_id <= ?"
sql_args.append(to_key)
sql = f"""
SELECT DISTINCT user_id FROM device_lists_stream
WHERE ? < stream_id AND stream_id <= ? AND %s
WHERE {stream_id_where_clause}
"""
changes: Set[str] = set()
# If the stream change cache gave us no information, fetch *all*
# users between the stream IDs.
if user_ids_to_check is None:
txn.execute(sql, sql_args)
return {user_id for user_id, in txn}
# Query device changes with a batch of users at a time
for chunk in batch_iter(user_ids_to_check, 100):
clause, args = make_in_list_sql_clause(
txn.database_engine, "user_id", chunk
)
txn.execute(sql % (clause,), [from_key, to_key] + args)
changes.update(user_id for user_id, in txn)
# Otherwise, fetch changes for the given users.
else:
changes: Set[str] = set()
# Query device changes with a batch of users at a time
for chunk in batch_iter(user_ids_to_check, 100):
clause, args = make_in_list_sql_clause(
txn.database_engine, "user_id", chunk
)
txn.execute(sql + " AND " + clause, sql_args + args)
changes.update(user_id for user_id, in txn)
return changes
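A self-contained sketch of the SQL assembly in the new branch structure above: the stream-ID clause is built once, and the user filter is appended per 100-user chunk (batch_iter mirrors synapse.util.iterutils.batch_iter):

from typing import Iterator, List, Optional, Sequence, Tuple

def batch_iter(items: Sequence[str], size: int) -> Iterator[Sequence[str]]:
    for i in range(0, len(items), size):
        yield items[i : i + size]

def build_queries(
    from_key: int, to_key: Optional[int], users: Optional[List[str]]
) -> Iterator[Tuple[str, list]]:
    where = "stream_id > ?"
    args: list = [from_key]
    if to_key:
        where += " AND stream_id <= ?"
        args.append(to_key)
    sql = f"SELECT DISTINCT user_id FROM device_lists_stream WHERE {where}"
    if users is None:
        yield sql, args                       # no cache info: scan the range
    else:
        for chunk in batch_iter(users, 100):  # filter to the candidate users
            marks = ", ".join("?" for _ in chunk)
            yield sql + f" AND user_id IN ({marks})", args + list(chunk)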
@@ -140,7 +140,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
@cancellable
async def get_e2e_device_keys_for_cs_api(
self,
query_list: Collection[Tuple[str, Optional[str]]],
query_list: List[Tuple[str, Optional[str]]],
include_displaynames: bool = True,
) -> Dict[str, Dict[str, JsonDict]]:
"""Fetch a list of device keys, formatted suitably for the C/S API.
+13 -18
@@ -16,11 +16,11 @@ import logging
import threading
import weakref
from enum import Enum, auto
from itertools import chain
from typing import (
TYPE_CHECKING,
Any,
Collection,
Container,
Dict,
Iterable,
List,
@@ -76,7 +76,6 @@ from synapse.storage.util.id_generators import (
)
from synapse.storage.util.sequence import build_sequence_generator
from synapse.types import JsonDict, get_domain_from_id
from synapse.types.state import StateFilter
from synapse.util import unwrapFirstError
from synapse.util.async_helpers import ObservableDeferred, delay_cancellation
from synapse.util.caches.descriptors import cached, cachedList
@@ -880,7 +879,7 @@ class EventsWorkerStore(SQLBaseStore):
async def get_stripped_room_state_from_event_context(
self,
context: EventContext,
state_keys_to_include: StateFilter,
state_types_to_include: Container[str],
membership_user_id: Optional[str] = None,
) -> List[JsonDict]:
"""
@@ -893,7 +892,7 @@ class EventsWorkerStore(SQLBaseStore):
Args:
context: The event context to retrieve state of the room from.
state_keys_to_include: The state events to include, for each event type.
state_types_to_include: The type of state events to include.
membership_user_id: An optional user ID to include the stripped membership state
events of. This is useful when generating the stripped state of a room for
invites. We want to send membership events of the inviter, so that the
@@ -902,25 +901,21 @@ class EventsWorkerStore(SQLBaseStore):
Returns:
A list of dictionaries, each representing a stripped state event from the room.
"""
if membership_user_id:
types = chain(
state_keys_to_include.to_types(),
[(EventTypes.Member, membership_user_id)],
)
filter = StateFilter.from_types(types)
else:
filter = state_keys_to_include
selected_state_ids = await context.get_current_state_ids(filter)
current_state_ids = await context.get_current_state_ids()
# We know this event is not an outlier, so this must be
# non-None.
assert selected_state_ids is not None
assert current_state_ids is not None
# Confusingly, get_current_state_ids may return events that are discarded by
# the filter, if they're in context._state_delta_due_to_event. Strip these away.
selected_state_ids = filter.filter_state(selected_state_ids)
# The state to include
state_to_include_ids = [
e_id
for k, e_id in current_state_ids.items()
if k[0] in state_types_to_include
or (membership_user_id and k == (EventTypes.Member, membership_user_id))
]
state_to_include = await self.get_events(selected_state_ids.values())
state_to_include = await self.get_events(state_to_include_ids)
return [
{
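The replacement selects state by event type and then re-admits the inviter's membership event even when `m.room.member` is not among the included types. A toy illustration of that selection rule, with made-up event IDs:

```python
from typing import Dict, List, Optional, Tuple

StateKey = Tuple[str, str]  # (event type, state key)

def select_stripped_state_ids(
    current_state_ids: Dict[StateKey, str],
    state_types_to_include: List[str],
    membership_user_id: Optional[str] = None,
) -> List[str]:
    # Keep any event whose type is wanted, plus the inviter's own membership
    # event regardless of whether m.room.member is in the type list.
    return [
        e_id
        for k, e_id in current_state_ids.items()
        if k[0] in state_types_to_include
        or (membership_user_id and k == ("m.room.member", membership_user_id))
    ]

state = {
    ("m.room.create", ""): "$create",
    ("m.room.name", ""): "$name",
    ("m.room.member", "@inviter:example.org"): "$invite",
}
# Selects $create and $name via the type list, and $invite via the carve-out.
print(select_stripped_state_ids(
    state, ["m.room.create", "m.room.name"], "@inviter:example.org"
))
```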
+39 -12
@@ -924,6 +924,39 @@ class ReceiptsBackgroundUpdateStore(SQLBaseStore):
return batch_size
async def _create_receipts_index(self, index_name: str, table: str) -> None:
"""Adds a unique index on `(room_id, receipt_type, user_id)` to the given
receipts table, for non-thread receipts."""
def _create_index(conn: LoggingDatabaseConnection) -> None:
conn.rollback()
# we have to set autocommit, because postgres refuses to
# CREATE INDEX CONCURRENTLY without it.
if isinstance(self.database_engine, PostgresEngine):
conn.set_session(autocommit=True)
try:
c = conn.cursor()
# Now that the duplicates are gone, we can create the index.
concurrently = (
"CONCURRENTLY"
if isinstance(self.database_engine, PostgresEngine)
else ""
)
sql = f"""
CREATE UNIQUE INDEX {concurrently} {index_name}
ON {table}(room_id, receipt_type, user_id)
WHERE thread_id IS NULL
"""
c.execute(sql)
finally:
if isinstance(self.database_engine, PostgresEngine):
conn.set_session(autocommit=False)
await self.db_pool.runWithConnection(_create_index)
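Postgres refuses to run `CREATE INDEX CONCURRENTLY` inside a transaction block, hence the `rollback()` followed by the temporary switch to autocommit (SQLite has no `CONCURRENTLY`, so the keyword is dropped there). A bare psycopg2 sketch of the same dance, with placeholder connection details:

```python
import psycopg2

def create_index_concurrently(dsn: str, index_name: str, table: str) -> None:
    conn = psycopg2.connect(dsn)
    try:
        # CONCURRENTLY cannot run inside a transaction block, so put the
        # connection into autocommit before issuing the statement.
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute(
                f"CREATE UNIQUE INDEX CONCURRENTLY {index_name} "
                f"ON {table}(room_id, receipt_type, user_id) "
                "WHERE thread_id IS NULL"
            )
    finally:
        conn.close()
```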
async def _background_receipts_linearized_unique_index(
self, progress: dict, batch_size: int
) -> int:
@@ -966,12 +999,9 @@ class ReceiptsBackgroundUpdateStore(SQLBaseStore):
_remote_duplicate_receipts_txn,
)
await self.db_pool.updates.create_index_in_background(
index_name="receipts_linearized_unique_index",
table="receipts_linearized",
columns=["room_id", "receipt_type", "user_id"],
where_clause="thread_id IS NULL",
unique=True,
await self._create_receipts_index(
"receipts_linearized_unique_index",
"receipts_linearized",
)
await self.db_pool.updates._end_background_update(
@@ -1020,12 +1050,9 @@ class ReceiptsBackgroundUpdateStore(SQLBaseStore):
_remote_duplicate_receipts_txn,
)
await self.db_pool.updates.create_index_in_background(
index_name="receipts_graph_unique_index",
table="receipts_graph",
columns=["room_id", "receipt_type", "user_id"],
where_clause="thread_id IS NULL",
unique=True,
await self._create_receipts_index(
"receipts_graph_unique_index",
"receipts_graph",
)
await self.db_pool.updates._end_background_update(
+66 -171
@@ -1,5 +1,5 @@
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2019, 2022 The Matrix.org Foundation C.I.C.
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -50,14 +50,8 @@ from synapse.storage.database import (
LoggingTransaction,
)
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.engines import PostgresEngine
from synapse.storage.types import Cursor
from synapse.storage.util.id_generators import (
AbstractStreamIdGenerator,
IdGenerator,
MultiWriterIdGenerator,
StreamIdGenerator,
)
from synapse.storage.util.id_generators import IdGenerator
from synapse.types import JsonDict, RetentionPolicy, ThirdPartyInstanceID
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
@@ -120,26 +114,6 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
self.config: HomeServerConfig = hs.config
self._un_partial_stated_rooms_stream_id_gen: AbstractStreamIdGenerator
if isinstance(database.engine, PostgresEngine):
self._un_partial_stated_rooms_stream_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
stream_name="un_partial_stated_room_stream",
instance_name=self._instance_name,
tables=[
("un_partial_stated_room_stream", "instance_name", "stream_id")
],
sequence_name="un_partial_stated_room_stream_sequence",
# TODO(faster_joins, multiple writers) Support multiple writers.
writers=["master"],
)
else:
self._un_partial_stated_rooms_stream_id_gen = StreamIdGenerator(
db_conn, "un_partial_stated_room_stream", "stream_id"
)
async def store_room(
self,
room_id: str,
@@ -1242,6 +1216,70 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
return room_servers
async def clear_partial_state_room(self, room_id: str) -> bool:
"""Clears the partial state flag for a room.
Args:
room_id: The room whose partial state flag is to be cleared.
Returns:
`True` if the partial state flag has been cleared successfully.
`False` if the partial state flag could not be cleared because the room
still contains events with partial state.
"""
try:
await self.db_pool.runInteraction(
"clear_partial_state_room", self._clear_partial_state_room_txn, room_id
)
return True
except self.db_pool.engine.module.IntegrityError as e:
# Assume that any `IntegrityError`s are due to partial state events.
logger.info(
"Exception while clearing lazy partial-state-room %s, retrying: %s",
room_id,
e,
)
return False
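Since the flag can only be cleared once no partial-state events remain in the room, a `False` return means "try again later" rather than a hard failure. A hypothetical caller-side retry loop (the poll interval is made up):

```python
import asyncio

async def clear_partial_state_when_ready(store, room_id: str) -> None:
    # Each False return means the IntegrityError path fired and events with
    # partial state still exist, so back off and try again.
    while not await store.clear_partial_state_room(room_id):
        await asyncio.sleep(5)
```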
def _clear_partial_state_room_txn(
self, txn: LoggingTransaction, room_id: str
) -> None:
DatabasePool.simple_delete_txn(
txn,
table="partial_state_rooms_servers",
keyvalues={"room_id": room_id},
)
DatabasePool.simple_delete_one_txn(
txn,
table="partial_state_rooms",
keyvalues={"room_id": room_id},
)
self._invalidate_cache_and_stream(txn, self.is_partial_state_room, (room_id,))
self._invalidate_cache_and_stream(
txn, self.get_partial_state_servers_at_join, (room_id,)
)
# We now delete anything from `device_lists_remote_pending` with a
# stream ID less than or equal to the minimum
# `partial_state_rooms.device_lists_stream_id`, as we no longer need them.
device_lists_stream_id = DatabasePool.simple_select_one_onecol_txn(
txn,
table="partial_state_rooms",
keyvalues={},
retcol="MIN(device_lists_stream_id)",
allow_none=True,
)
if device_lists_stream_id is None:
# There are no rooms currently being partially joined, so we delete everything.
txn.execute("DELETE FROM device_lists_remote_pending")
else:
sql = """
DELETE FROM device_lists_remote_pending
WHERE stream_id <= ?
"""
txn.execute(sql, (device_lists_stream_id,))
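The cleanup pivots on the smallest `device_lists_stream_id` still referenced by any partially-joined room: rows at or below it cannot be needed again, and with no partial-state rooms left nothing pending is. A condensed sketch of the two branches over a DB-API cursor:

```python
import sqlite3

def prune_pending_device_list_updates(txn: sqlite3.Cursor) -> None:
    # The smallest stream ID still referenced by a partially-joined room,
    # or None when no partial-state rooms remain at all.
    txn.execute("SELECT MIN(device_lists_stream_id) FROM partial_state_rooms")
    (min_needed,) = txn.fetchone()
    if min_needed is None:
        txn.execute("DELETE FROM device_lists_remote_pending")
    else:
        txn.execute(
            "DELETE FROM device_lists_remote_pending WHERE stream_id <= ?",
            (min_needed,),
        )
```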
@cached()
async def is_partial_state_room(self, room_id: str) -> bool:
"""Checks if this room has partial state.
@@ -1277,66 +1315,6 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
)
return result["join_event_id"], result["device_lists_stream_id"]
def get_un_partial_stated_rooms_token(self) -> int:
# TODO(faster_joins, multiple writers): This is inappropriate if there
# are multiple writers because workers that don't write often will
# hold all readers up.
# (See `MultiWriterIdGenerator.get_persisted_upto_position` for an
# explanation.)
return self._un_partial_stated_rooms_stream_id_gen.get_current_token()
async def get_un_partial_stated_rooms_from_stream(
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]:
"""Get updates for caches replication stream.
Args:
instance_name: The writer we want to fetch updates from. Unused
here since there is only ever one writer.
last_id: The token to fetch updates from. Exclusive.
current_id: The token to fetch updates up to. Inclusive.
limit: The requested limit for the number of rows to return. The
function may return more or fewer rows.
Returns:
A tuple consisting of: the updates, a token to use to fetch
subsequent updates, and whether we returned fewer rows than exist
between the requested tokens due to the limit.
The token returned can be used in a subsequent call to this
function to get further updates.
The updates are a list of 2-tuples of stream ID and the row data.
"""
if last_id == current_id:
return [], current_id, False
def get_un_partial_stated_rooms_from_stream_txn(
txn: LoggingTransaction,
) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]:
sql = """
SELECT stream_id, room_id
FROM un_partial_stated_room_stream
WHERE ? < stream_id AND stream_id <= ? AND instance_name = ?
ORDER BY stream_id ASC
LIMIT ?
"""
txn.execute(sql, (last_id, current_id, instance_name, limit))
updates = [(row[0], (row[1],)) for row in txn]
limited = False
upto_token = current_id
if len(updates) >= limit:
upto_token = updates[-1][0]
limited = True
return updates, upto_token, limited
return await self.db_pool.runInteraction(
"get_un_partial_stated_rooms_from_stream",
get_un_partial_stated_rooms_from_stream_txn,
)
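Consumers are expected to page through the stream by feeding the returned token back in until `limited` comes back `False`. A hedged sketch of such a consumer; the method names match the store above, everything else is illustrative:

```python
from typing import List, Tuple

async def drain_un_partial_stated_rooms(
    store, instance_name: str
) -> List[Tuple[int, Tuple[str]]]:
    rows: List[Tuple[int, Tuple[str]]] = []
    last_id = 0
    current_id = store.get_un_partial_stated_rooms_token()
    while True:
        updates, upto, limited = await store.get_un_partial_stated_rooms_from_stream(
            instance_name, last_id, current_id, limit=100
        )
        rows.extend(updates)
        if not limited:
            return rows
        # More rows remain between the tokens: resume from the returned token.
        last_id = upto
```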
class _BackgroundUpdates:
REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory"
@@ -1828,8 +1806,6 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")
self._instance_name = hs.get_instance_name()
async def upsert_room_on_join(
self, room_id: str, room_version: RoomVersion, state_events: List[EventBase]
) -> None:
@@ -2294,84 +2270,3 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
self.is_room_blocked,
(room_id,),
)
async def clear_partial_state_room(self, room_id: str) -> bool:
"""Clears the partial state flag for a room.
Args:
room_id: The room whose partial state flag is to be cleared.
Returns:
`True` if the partial state flag has been cleared successfully.
`False` if the partial state flag could not be cleared because the room
still contains events with partial state.
"""
try:
async with self._un_partial_stated_rooms_stream_id_gen.get_next() as un_partial_state_room_stream_id:
await self.db_pool.runInteraction(
"clear_partial_state_room",
self._clear_partial_state_room_txn,
room_id,
un_partial_state_room_stream_id,
)
return True
except self.db_pool.engine.module.IntegrityError as e:
# Assume that any `IntegrityError`s are due to partial state events.
logger.info(
"Exception while clearing lazy partial-state-room %s, retrying: %s",
room_id,
e,
)
return False
def _clear_partial_state_room_txn(
self,
txn: LoggingTransaction,
room_id: str,
un_partial_state_room_stream_id: int,
) -> None:
DatabasePool.simple_delete_txn(
txn,
table="partial_state_rooms_servers",
keyvalues={"room_id": room_id},
)
DatabasePool.simple_delete_one_txn(
txn,
table="partial_state_rooms",
keyvalues={"room_id": room_id},
)
self._invalidate_cache_and_stream(txn, self.is_partial_state_room, (room_id,))
self._invalidate_cache_and_stream(
txn, self.get_partial_state_servers_at_join, (room_id,)
)
DatabasePool.simple_insert_txn(
txn,
"un_partial_stated_room_stream",
{
"stream_id": un_partial_state_room_stream_id,
"instance_name": self._instance_name,
"room_id": room_id,
},
)
# We now delete anything from `device_lists_remote_pending` with a
# stream ID less than or equal to the minimum
# `partial_state_rooms.device_lists_stream_id`, as we no longer need them.
device_lists_stream_id = DatabasePool.simple_select_one_onecol_txn(
txn,
table="partial_state_rooms",
keyvalues={},
retcol="MIN(device_lists_stream_id)",
allow_none=True,
)
if device_lists_stream_id is None:
# There are no rooms currently being partially joined, so we delete everything.
txn.execute("DELETE FROM device_lists_remote_pending")
else:
sql = """
DELETE FROM device_lists_remote_pending
WHERE stream_id <= ?
"""
txn.execute(sql, (device_lists_stream_id,))
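The removed writer path allocates its stream ID through an async context manager, so the ID is handed out just before the row is persisted under it. A toy sketch of that allocate-then-persist shape; `ToyStreamIdGenerator` is illustrative, not Synapse's `StreamIdGenerator`:

```python
import contextlib
from typing import AsyncIterator, Awaitable, Callable

class ToyStreamIdGenerator:
    def __init__(self, start: int = 0) -> None:
        self._current = start

    @contextlib.asynccontextmanager
    async def get_next(self) -> AsyncIterator[int]:
        # Hand out the next ID; a real generator also tracks in-flight IDs so
        # readers never see a current token ahead of the persisted rows.
        self._current += 1
        yield self._current

async def store_row(
    id_gen: ToyStreamIdGenerator, persist: Callable[[int], Awaitable[None]]
) -> None:
    async with id_gen.get_next() as stream_id:
        # The row is written while the ID is still "in flight".
        await persist(stream_id)
```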
+1 -1
@@ -33,8 +33,8 @@ from synapse.storage.database import (
)
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
from synapse.storage.state import StateFilter
from synapse.types import JsonDict, JsonMapping, StateMap
from synapse.types.state import StateFilter
from synapse.util.caches import intern_string
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.cancellation import cancellable

Some files were not shown because too many files have changed in this diff.