Compare commits
55 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| bbd59237fc | |||
| e70f398f4a | |||
| 822646b636 | |||
| b8cf480fa9 | |||
| 62ed877433 | |||
| e2a1adbf5d | |||
| 3d87847ecc | |||
| 7982891794 | |||
| b5b5f66084 | |||
| 74b89c2761 | |||
| 527366f962 | |||
| b087964875 | |||
| 2a3cd59dd0 | |||
| a5d8fee097 | |||
| ceb7be56a6 | |||
| eb32bc5056 | |||
| 4ea8745724 | |||
| 373c485d8c | |||
| 3ac412b4e2 | |||
| 94bc21e69f | |||
| c2de2ca630 | |||
| a58b550eac | |||
| c369e95691 | |||
| 9d8a3234ba | |||
| da77720752 | |||
| f3ad68c343 | |||
| dfe8febe47 | |||
| 60c3fea327 | |||
| 2506dd7641 | |||
| be3a8a85e3 | |||
| 22e91b8019 | |||
| 96251af50d | |||
| d69bf3b24c | |||
| 9a9568168a | |||
| cf1059d045 | |||
| 9e82caac45 | |||
| 66d47b44cd | |||
| bb9f156978 | |||
| 9b6224577e | |||
| a16931f30d | |||
| 5d7c35b4d9 | |||
| dc6b60f68d | |||
| cb59e08062 | |||
| cee9445884 | |||
| 6a8310f3df | |||
| 501f62d1a6 | |||
| e1779bc69f | |||
| 93ac3c197e | |||
| 05eb55f57d | |||
| 057cc7850a | |||
| de6bb61062 | |||
| 7558d294ae | |||
| 680a8d4e9e | |||
| 802539159e | |||
| 8de5c59fd3 |
@@ -21,7 +21,7 @@ endblock
|
||||
|
||||
block Install Complement Dependencies
|
||||
sudo apt-get -qq update && sudo apt-get install -qqy libolm3 libolm-dev
|
||||
go get -v github.com/gotesttools/gotestfmt/v2/cmd/gotestfmt@latest
|
||||
go install -v github.com/gotesttools/gotestfmt/v2/cmd/gotestfmt@latest
|
||||
endblock
|
||||
|
||||
block Install custom gotestfmt template
|
||||
|
||||
@@ -197,8 +197,12 @@ jobs:
|
||||
- run: sudo apt-get -qq install xmlsec1
|
||||
- name: Set up PostgreSQL ${{ matrix.job.postgres-version }}
|
||||
if: ${{ matrix.job.postgres-version }}
|
||||
# 1. Mount postgres data files onto a tmpfs in-memory filesystem to reduce overhead of docker's overlayfs layer.
|
||||
# 2. Expose the unix socket for postgres. This removes latency of using docker-proxy for connections.
|
||||
run: |
|
||||
docker run -d -p 5432:5432 \
|
||||
--tmpfs /var/lib/postgres:rw,size=6144m \
|
||||
--mount 'type=bind,src=/var/run/postgresql,dst=/var/run/postgresql' \
|
||||
-e POSTGRES_PASSWORD=postgres \
|
||||
-e POSTGRES_INITDB_ARGS="--lc-collate C --lc-ctype C --encoding UTF8" \
|
||||
postgres:${{ matrix.job.postgres-version }}
|
||||
@@ -220,10 +224,10 @@ jobs:
|
||||
if: ${{ matrix.job.postgres-version }}
|
||||
timeout-minutes: 2
|
||||
run: until pg_isready -h localhost; do sleep 1; done
|
||||
- run: poetry run trial --jobs=2 tests
|
||||
- run: poetry run trial --jobs=6 tests
|
||||
env:
|
||||
SYNAPSE_POSTGRES: ${{ matrix.job.database == 'postgres' || '' }}
|
||||
SYNAPSE_POSTGRES_HOST: localhost
|
||||
SYNAPSE_POSTGRES_HOST: /var/run/postgresql
|
||||
SYNAPSE_POSTGRES_USER: postgres
|
||||
SYNAPSE_POSTGRES_PASSWORD: postgres
|
||||
- name: Dump logs
|
||||
@@ -292,7 +296,7 @@ jobs:
|
||||
python-version: '3.7'
|
||||
extras: "all test"
|
||||
|
||||
- run: poetry run trial -j2 tests
|
||||
- run: poetry run trial -j6 tests
|
||||
- name: Dump logs
|
||||
# Logs are most useful when the command fails, always include them.
|
||||
if: ${{ always() }}
|
||||
|
||||
+79
-2
@@ -1,8 +1,85 @@
|
||||
Synapse 1.73.0rc2 (2022-12-01)
|
||||
Synapse 1.74.0rc1 (2022-12-13)
|
||||
==============================
|
||||
|
||||
Features
|
||||
--------
|
||||
|
||||
- Improve user search for international display names. ([\#14464](https://github.com/matrix-org/synapse/issues/14464))
|
||||
- Stop using deprecated `keyIds` parameter when calling `/_matrix/key/v2/server`. ([\#14490](https://github.com/matrix-org/synapse/issues/14490), [\#14525](https://github.com/matrix-org/synapse/issues/14525))
|
||||
- Add new `push.enabled` config option to allow opting out of push notification calculation. ([\#14551](https://github.com/matrix-org/synapse/issues/14551), [\#14619](https://github.com/matrix-org/synapse/issues/14619))
|
||||
- Advertise support for Matrix 1.5 on `/_matrix/client/versions`. ([\#14576](https://github.com/matrix-org/synapse/issues/14576))
|
||||
- Improve opentracing and logging for to-device message handling. ([\#14598](https://github.com/matrix-org/synapse/issues/14598))
|
||||
- Allow selecting "prejoin" events by state keys in addition to event types. ([\#14642](https://github.com/matrix-org/synapse/issues/14642))
|
||||
|
||||
|
||||
Bugfixes
|
||||
--------
|
||||
|
||||
- Fix a long-standing bug where a device list update might not be sent to clients in certain circumstances. ([\#14435](https://github.com/matrix-org/synapse/issues/14435), [\#14592](https://github.com/matrix-org/synapse/issues/14592), [\#14604](https://github.com/matrix-org/synapse/issues/14604))
|
||||
- Suppress a spurious warning when `POST /rooms/<room_id>/<membership>/`, `POST /join/<room_id_or_alias`, or the unspecced `PUT /join/<room_id_or_alias>/<txn_id>` receive an empty HTTP request body. ([\#14600](https://github.com/matrix-org/synapse/issues/14600))
|
||||
- Return spec-compliant JSON errors when unknown endpoints are requested. ([\#14620](https://github.com/matrix-org/synapse/issues/14620), [\#14621](https://github.com/matrix-org/synapse/issues/14621))
|
||||
- Update html templates to load images over HTTPS. Contributed by @ashfame. ([\#14625](https://github.com/matrix-org/synapse/issues/14625))
|
||||
- Fix a long-standing bug where the user directory would return 1 more row than requested. ([\#14631](https://github.com/matrix-org/synapse/issues/14631))
|
||||
- Reject invalid read receipt requests with empty room or event IDs. Contributed by Nick @ Beeper (@fizzadar). ([\#14632](https://github.com/matrix-org/synapse/issues/14632))
|
||||
- Fix a bug introduced in Synapse 1.67.0 where not specifying a config file or a server URL would lead to the `register_new_matrix_user` script failing. ([\#14637](https://github.com/matrix-org/synapse/issues/14637))
|
||||
- Fix a long-standing bug where the user directory and room/user stats might be out of sync. ([\#14639](https://github.com/matrix-org/synapse/issues/14639), [\#14643](https://github.com/matrix-org/synapse/issues/14643))
|
||||
- Fix a bug introduced in Synapse 1.72.0 where the background updates to add non-thread unique indexes on receipts would fail if they were previously interrupted. ([\#14650](https://github.com/matrix-org/synapse/issues/14650))
|
||||
- Improve validation of field size limits in events. ([\#14664](https://github.com/matrix-org/synapse/issues/14664))
|
||||
- Fix bugs introduced in Synapse 1.55.0 and 1.69.0 where application services would not be notified of events in the correct rooms, due to stale caches. ([\#14670](https://github.com/matrix-org/synapse/issues/14670))
|
||||
|
||||
|
||||
Improved Documentation
|
||||
----------------------
|
||||
|
||||
- Update worker settings for `pusher` and `federation_sender` functionality. ([\#14493](https://github.com/matrix-org/synapse/issues/14493))
|
||||
- Add links to third party package repositories, and point to the bug which highlights Ubuntu's out-of-date packages. ([\#14517](https://github.com/matrix-org/synapse/issues/14517))
|
||||
- Remove old, incorrect minimum postgres version note and replace with a link to the [Dependency Deprecation Policy](https://matrix-org.github.io/synapse/v1.73/deprecation_policy.html). ([\#14590](https://github.com/matrix-org/synapse/issues/14590))
|
||||
- Add Single-Sign On setup instructions for Mastodon-based instances. ([\#14594](https://github.com/matrix-org/synapse/issues/14594))
|
||||
- Change `turn_allow_guests` example value to lowercase `true`. ([\#14634](https://github.com/matrix-org/synapse/issues/14634))
|
||||
|
||||
|
||||
Internal Changes
|
||||
----------------
|
||||
|
||||
- Optimise push badge count calculations. Contributed by Nick @ Beeper (@fizzadar). ([\#14255](https://github.com/matrix-org/synapse/issues/14255))
|
||||
- Faster remote room joins: stream the un-partial-stating of rooms over replication. ([\#14473](https://github.com/matrix-org/synapse/issues/14473), [\#14474](https://github.com/matrix-org/synapse/issues/14474))
|
||||
- Share the `ClientRestResource` for both workers and the main process. ([\#14528](https://github.com/matrix-org/synapse/issues/14528))
|
||||
- Add `--editable` flag to `complement.sh` which uses an editable install of Synapse for faster turn-around times whilst developing iteratively. ([\#14548](https://github.com/matrix-org/synapse/issues/14548))
|
||||
- Faster joins: use servers list approximation to send read receipts when in partial state instead of waiting for the full state of the room. ([\#14549](https://github.com/matrix-org/synapse/issues/14549))
|
||||
- Modernize unit tests configuration related to workers. ([\#14568](https://github.com/matrix-org/synapse/issues/14568))
|
||||
- Bump jsonschema from 4.17.0 to 4.17.3. ([\#14591](https://github.com/matrix-org/synapse/issues/14591))
|
||||
- Fix Rust lint CI. ([\#14602](https://github.com/matrix-org/synapse/issues/14602))
|
||||
- Bump JasonEtco/create-an-issue from 2.5.0 to 2.8.1. ([\#14607](https://github.com/matrix-org/synapse/issues/14607))
|
||||
- Alter some unit test environment parameters to decrease time spent running tests. ([\#14610](https://github.com/matrix-org/synapse/issues/14610))
|
||||
- Switch to Go recommended installation method for `gotestfmt` template in CI. ([\#14611](https://github.com/matrix-org/synapse/issues/14611))
|
||||
- Bump phonenumbers from 8.13.0 to 8.13.1. ([\#14612](https://github.com/matrix-org/synapse/issues/14612))
|
||||
- Bump types-setuptools from 65.5.0.3 to 65.6.0.1. ([\#14613](https://github.com/matrix-org/synapse/issues/14613))
|
||||
- Bump twine from 4.0.1 to 4.0.2. ([\#14614](https://github.com/matrix-org/synapse/issues/14614))
|
||||
- Bump types-requests from 2.28.11.2 to 2.28.11.5. ([\#14615](https://github.com/matrix-org/synapse/issues/14615))
|
||||
- Bump cryptography from 38.0.3 to 38.0.4. ([\#14616](https://github.com/matrix-org/synapse/issues/14616))
|
||||
- Remove useless cargo install with apt from Dockerfile. ([\#14636](https://github.com/matrix-org/synapse/issues/14636))
|
||||
- Bump certifi from 2021.10.8 to 2022.12.7. ([\#14645](https://github.com/matrix-org/synapse/issues/14645))
|
||||
- Bump flake8-bugbear from 22.10.27 to 22.12.6. ([\#14656](https://github.com/matrix-org/synapse/issues/14656))
|
||||
- Bump packaging from 21.3 to 22.0. ([\#14657](https://github.com/matrix-org/synapse/issues/14657))
|
||||
- Bump types-pillow from 9.3.0.1 to 9.3.0.4. ([\#14658](https://github.com/matrix-org/synapse/issues/14658))
|
||||
- Bump serde from 1.0.148 to 1.0.150. ([\#14659](https://github.com/matrix-org/synapse/issues/14659))
|
||||
- Bump phonenumbers from 8.13.1 to 8.13.2. ([\#14660](https://github.com/matrix-org/synapse/issues/14660))
|
||||
- Bump authlib from 1.1.0 to 1.2.0. ([\#14661](https://github.com/matrix-org/synapse/issues/14661))
|
||||
- Move `StateFilter` to `synapse.types`. ([\#14668](https://github.com/matrix-org/synapse/issues/14668))
|
||||
- Improve type hints. ([\#14597](https://github.com/matrix-org/synapse/issues/14597), [\#14646](https://github.com/matrix-org/synapse/issues/14646), [\#14671](https://github.com/matrix-org/synapse/issues/14671))
|
||||
|
||||
|
||||
Synapse 1.73.0 (2022-12-06)
|
||||
===========================
|
||||
|
||||
Please note that legacy Prometheus metric names have been removed in this release; see [the upgrade notes](https://github.com/matrix-org/synapse/blob/release-v1.73/docs/upgrade.md#legacy-prometheus-metric-names-have-now-been-removed) for more details.
|
||||
|
||||
No significant changes since 1.73.0rc2.
|
||||
|
||||
|
||||
Synapse 1.73.0rc2 (2022-12-01)
|
||||
==============================
|
||||
|
||||
Bugfixes
|
||||
--------
|
||||
|
||||
@@ -17,7 +94,7 @@ Features
|
||||
|
||||
- Speed-up `/messages` with `filter_events_for_client` optimizations. ([\#14527](https://github.com/matrix-org/synapse/issues/14527))
|
||||
- Improve DB performance by reducing amount of data that gets read in `device_lists_changes_in_room`. ([\#14534](https://github.com/matrix-org/synapse/issues/14534))
|
||||
- Adds support for handling avatar in SSO login. Contributed by @ashfame. ([\#13917](https://github.com/matrix-org/synapse/issues/13917))
|
||||
- Add support for handling avatar in SSO OIDC login. Contributed by @ashfame. ([\#13917](https://github.com/matrix-org/synapse/issues/13917))
|
||||
- Move MSC3030 `/timestamp_to_event` endpoints to stable `v1` location (`/_matrix/client/v1/rooms/<roomID>/timestamp_to_event?ts=<timestamp>&dir=<direction>`, `/_matrix/federation/v1/timestamp_to_event/<roomID>?ts=<timestamp>&dir=<direction>`). ([\#14471](https://github.com/matrix-org/synapse/issues/14471))
|
||||
- Reduce database load of [Client-Server endpoints](https://spec.matrix.org/v1.5/client-server-api/#aggregations) which return bundled aggregations. ([\#14491](https://github.com/matrix-org/synapse/issues/14491), [\#14508](https://github.com/matrix-org/synapse/issues/14508), [\#14510](https://github.com/matrix-org/synapse/issues/14510))
|
||||
- Add unstable support for an Extensible Events room version (`org.matrix.msc1767.10`) via [MSC1767](https://github.com/matrix-org/matrix-spec-proposals/pull/1767), [MSC3931](https://github.com/matrix-org/matrix-spec-proposals/pull/3931), [MSC3932](https://github.com/matrix-org/matrix-spec-proposals/pull/3932), and [MSC3933](https://github.com/matrix-org/matrix-spec-proposals/pull/3933). ([\#14520](https://github.com/matrix-org/synapse/issues/14520), [\#14521](https://github.com/matrix-org/synapse/issues/14521), [\#14524](https://github.com/matrix-org/synapse/issues/14524))
|
||||
|
||||
Generated
+4
-4
@@ -323,18 +323,18 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
|
||||
|
||||
[[package]]
|
||||
name = "serde"
|
||||
version = "1.0.148"
|
||||
version = "1.0.150"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e53f64bb4ba0191d6d0676e1b141ca55047d83b74f5607e6d8eb88126c52c2dc"
|
||||
checksum = "e326c9ec8042f1b5da33252c8a37e9ffbd2c9bef0155215b6e6c80c790e05f91"
|
||||
dependencies = [
|
||||
"serde_derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_derive"
|
||||
version = "1.0.148"
|
||||
version = "1.0.150"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a55492425aa53521babf6137309e7d34c20bbfbbfcfe2c7f3a047fd1f6b92c0c"
|
||||
checksum = "42a3df25b0713732468deadad63ab9da1f1fd75a48a15024b50363f128db627e"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
Optimise push badge count calculations. Contributed by Nick @ Beeper (@fizzadar).
|
||||
@@ -1 +0,0 @@
|
||||
Stop using deprecated `keyIds` parameter when calling `/_matrix/key/v2/server`.
|
||||
@@ -1 +0,0 @@
|
||||
Update worker settings for `pusher` and `federation_sender` functionality.
|
||||
@@ -1 +0,0 @@
|
||||
Add links to third party package repositories, and point to the bug which highlights Ubuntu's out-of-date packages.
|
||||
@@ -1 +0,0 @@
|
||||
Stop using deprecated `keyIds` parameter when calling `/_matrix/key/v2/server`.
|
||||
@@ -1 +0,0 @@
|
||||
Share the `ClientRestResource` for both workers and the main process.
|
||||
@@ -1 +0,0 @@
|
||||
Faster joins: use servers list approximation to send read receipts when in partial state instead of waiting for the full state of the room.
|
||||
@@ -1 +0,0 @@
|
||||
Add new `push.enabled` config option to allow opting out of push notification calculation.
|
||||
@@ -1 +0,0 @@
|
||||
Modernize unit tests configuration related to workers.
|
||||
@@ -1 +0,0 @@
|
||||
Advertise support for Matrix 1.5 on `/_matrix/client/versions`.
|
||||
@@ -1 +0,0 @@
|
||||
Bump jsonschema from 4.17.0 to 4.17.3.
|
||||
@@ -1 +0,0 @@
|
||||
Fix a long-standing bug where a device list update might not be sent to clients in certain circumstances.
|
||||
@@ -1 +0,0 @@
|
||||
Add missing type hints.
|
||||
@@ -1 +0,0 @@
|
||||
Fix Rust lint CI.
|
||||
@@ -1 +0,0 @@
|
||||
Bump JasonEtco/create-an-issue from 2.5.0 to 2.8.1.
|
||||
Vendored
+14
@@ -1,3 +1,17 @@
|
||||
matrix-synapse-py3 (1.74.0~rc1) stable; urgency=medium
|
||||
|
||||
* New dependency on libicu-dev to provide improved results for user
|
||||
search.
|
||||
* New Synapse release 1.74.0rc1.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Tue, 13 Dec 2022 13:30:01 +0000
|
||||
|
||||
matrix-synapse-py3 (1.73.0) stable; urgency=medium
|
||||
|
||||
* New Synapse release 1.73.0.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Tue, 06 Dec 2022 11:48:56 +0000
|
||||
|
||||
matrix-synapse-py3 (1.73.0~rc2) stable; urgency=medium
|
||||
|
||||
* New Synapse release 1.73.0rc2.
|
||||
|
||||
Vendored
+2
@@ -8,6 +8,8 @@ Build-Depends:
|
||||
dh-virtualenv (>= 1.1),
|
||||
libsystemd-dev,
|
||||
libpq-dev,
|
||||
libicu-dev,
|
||||
pkg-config,
|
||||
lsb-release,
|
||||
python3-dev,
|
||||
python3,
|
||||
|
||||
+3
-1
@@ -43,7 +43,7 @@ RUN \
|
||||
--mount=type=cache,target=/var/cache/apt,sharing=locked \
|
||||
--mount=type=cache,target=/var/lib/apt,sharing=locked \
|
||||
apt-get update -qq && apt-get install -yqq \
|
||||
build-essential cargo git libffi-dev libssl-dev \
|
||||
build-essential git libffi-dev libssl-dev \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# We install poetry in its own build stage to avoid its dependencies conflicting with
|
||||
@@ -97,6 +97,8 @@ RUN \
|
||||
zlib1g-dev \
|
||||
git \
|
||||
curl \
|
||||
libicu-dev \
|
||||
pkg-config \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
|
||||
|
||||
@@ -84,6 +84,8 @@ RUN apt-get update -qq -o Acquire::Languages=none \
|
||||
python3-venv \
|
||||
sqlite3 \
|
||||
libpq-dev \
|
||||
libicu-dev \
|
||||
pkg-config \
|
||||
xmlsec1
|
||||
|
||||
# Install rust and ensure it's in the PATH
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
# syntax=docker/dockerfile:1
|
||||
|
||||
ARG SYNAPSE_VERSION=latest
|
||||
ARG FROM=matrixdotorg/synapse:$SYNAPSE_VERSION
|
||||
|
||||
# first of all, we create a base image with an nginx which we can copy into the
|
||||
# target image. For repeated rebuilds, this is much faster than apt installing
|
||||
@@ -23,7 +24,7 @@ FROM debian:bullseye-slim AS deps_base
|
||||
FROM redis:6-bullseye AS redis_base
|
||||
|
||||
# now build the final image, based on the the regular Synapse docker image
|
||||
FROM matrixdotorg/synapse:$SYNAPSE_VERSION
|
||||
FROM $FROM
|
||||
|
||||
# Install supervisord with pip instead of apt, to avoid installing a second
|
||||
# copy of python.
|
||||
|
||||
@@ -7,8 +7,9 @@
|
||||
# https://github.com/matrix-org/synapse/blob/develop/docker/README-testing.md#testing-with-postgresql-and-single-or-multi-process-synapse
|
||||
|
||||
ARG SYNAPSE_VERSION=latest
|
||||
ARG FROM=matrixdotorg/synapse-workers:$SYNAPSE_VERSION
|
||||
|
||||
FROM matrixdotorg/synapse-workers:$SYNAPSE_VERSION
|
||||
FROM $FROM
|
||||
# First of all, we copy postgres server from the official postgres image,
|
||||
# since for repeated rebuilds, this is much faster than apt installing
|
||||
# postgres each time.
|
||||
|
||||
@@ -0,0 +1,75 @@
|
||||
# syntax=docker/dockerfile:1
|
||||
# This dockerfile builds an editable install of Synapse.
|
||||
#
|
||||
# Used by `complement.sh`. Not suitable for production use.
|
||||
|
||||
ARG PYTHON_VERSION=3.9
|
||||
|
||||
###
|
||||
### Stage 0: generate requirements.txt
|
||||
###
|
||||
# We hardcode the use of Debian bullseye here because this could change upstream
|
||||
# and other Dockerfiles used for testing are expecting bullseye.
|
||||
FROM docker.io/python:${PYTHON_VERSION}-slim-bullseye
|
||||
|
||||
# Install Rust and other dependencies (stolen from normal Dockerfile)
|
||||
# install the OS build deps
|
||||
RUN \
|
||||
--mount=type=cache,target=/var/cache/apt,sharing=locked \
|
||||
--mount=type=cache,target=/var/lib/apt,sharing=locked \
|
||||
apt-get update -qq && apt-get install -yqq \
|
||||
build-essential \
|
||||
libffi-dev \
|
||||
libjpeg-dev \
|
||||
libpq-dev \
|
||||
libssl-dev \
|
||||
libwebp-dev \
|
||||
libxml++2.6-dev \
|
||||
libxslt1-dev \
|
||||
openssl \
|
||||
zlib1g-dev \
|
||||
git \
|
||||
curl \
|
||||
gosu \
|
||||
libjpeg62-turbo \
|
||||
libpq5 \
|
||||
libwebp6 \
|
||||
xmlsec1 \
|
||||
libjemalloc2 \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
ENV RUSTUP_HOME=/rust
|
||||
ENV CARGO_HOME=/cargo
|
||||
ENV PATH=/cargo/bin:/rust/bin:$PATH
|
||||
RUN mkdir /rust /cargo
|
||||
RUN curl -sSf https://sh.rustup.rs | sh -s -- -y --no-modify-path --default-toolchain stable --profile minimal
|
||||
|
||||
|
||||
# Make a base copy of the editable source tree, so that we have something to
|
||||
# install and build now — even though it's going to be covered up by a mount
|
||||
# at runtime.
|
||||
COPY synapse /editable-src/synapse/
|
||||
COPY rust /editable-src/rust/
|
||||
# ... and what we need to `pip install`.
|
||||
COPY pyproject.toml poetry.lock README.rst build_rust.py Cargo.toml Cargo.lock /editable-src/
|
||||
|
||||
RUN pip install poetry
|
||||
RUN poetry config virtualenvs.create false
|
||||
RUN cd /editable-src && poetry install --extras all
|
||||
|
||||
# Make copies of useful things for inspection:
|
||||
# - the Rust module (must be copied to the editable source tree before startup)
|
||||
# - poetry.lock is useful for checking if dependencies have changed.
|
||||
RUN cp /editable-src/synapse/synapse_rust.abi3.so /synapse_rust.abi3.so.bak
|
||||
RUN cp /editable-src/poetry.lock /poetry.lock.bak
|
||||
|
||||
|
||||
### Extra setup from original Dockerfile
|
||||
COPY ./docker/start.py /start.py
|
||||
COPY ./docker/conf /conf
|
||||
|
||||
EXPOSE 8008/tcp 8009/tcp 8448/tcp
|
||||
|
||||
ENTRYPOINT ["/start.py"]
|
||||
|
||||
HEALTHCHECK --start-period=5s --interval=15s --timeout=5s \
|
||||
CMD curl -fSs http://localhost:8008/health || exit 1
|
||||
@@ -590,3 +590,44 @@ oidc_providers:
|
||||
display_name_template: "{{ user.first_name }} {{ user.last_name }}"
|
||||
email_template: "{{ user.email }}"
|
||||
```
|
||||
|
||||
### Mastodon
|
||||
|
||||
[Mastodon](https://docs.joinmastodon.org/) instances provide an [OAuth API](https://docs.joinmastodon.org/spec/oauth/), allowing those instances to be used as a single sign-on provider for Synapse.
|
||||
|
||||
The first step is to register Synapse as an application with your Mastodon instance, using the [Create an application API](https://docs.joinmastodon.org/methods/apps/#create) (see also [here](https://docs.joinmastodon.org/client/token/)). There are several ways to do this, but in the example below we are using CURL.
|
||||
|
||||
This example assumes that:
|
||||
* the Mastodon instance website URL is `https://your.mastodon.instance.url`, and
|
||||
* Synapse will be registered as an app named `my_synapse_app`.
|
||||
|
||||
Send the following request, substituting the value of `synapse_public_baseurl` from your Synapse installation.
|
||||
```sh
|
||||
curl -d "client_name=my_synapse_app&redirect_uris=https://[synapse_public_baseurl]/_synapse/client/oidc/callback" -X POST https://your.mastodon.instance.url/api/v1/apps
|
||||
```
|
||||
|
||||
You should receive a response similar to the following. Make sure to save it.
|
||||
```json
|
||||
{"client_id":"someclientid_123","client_secret":"someclientsecret_123","id":"12345","name":"my_synapse_app","redirect_uri":"https://[synapse_public_baseurl]/_synapse/client/oidc/callback","website":null,"vapid_key":"somerandomvapidkey_123"}
|
||||
```
|
||||
|
||||
As the Synapse login mechanism needs an attribute to uniquely identify users, and Mastodon's endpoint does not return a `sub` property, an alternative `subject_claim` has to be set. Your Synapse configuration should include the following:
|
||||
|
||||
```yaml
|
||||
oidc_providers:
|
||||
- idp_id: my_mastodon
|
||||
idp_name: "Mastodon Instance Example"
|
||||
discover: false
|
||||
issuer: "https://your.mastodon.instance.url/@admin"
|
||||
client_id: "someclientid_123"
|
||||
client_secret: "someclientsecret_123"
|
||||
authorization_endpoint: "https://your.mastodon.instance.url/oauth/authorize"
|
||||
token_endpoint: "https://your.mastodon.instance.url/oauth/token"
|
||||
userinfo_endpoint: "https://your.mastodon.instance.url/api/v1/accounts/verify_credentials"
|
||||
scopes: ["read"]
|
||||
user_mapping_provider:
|
||||
config:
|
||||
subject_claim: "id"
|
||||
```
|
||||
|
||||
Note that the fields `client_id` and `client_secret` are taken from the CURL response above.
|
||||
|
||||
+2
-1
@@ -1,6 +1,7 @@
|
||||
# Using Postgres
|
||||
|
||||
Synapse supports PostgreSQL versions 10 or later.
|
||||
The minimum supported version of PostgreSQL is determined by the [Dependency
|
||||
Deprecation Policy](deprecation_policy.md).
|
||||
|
||||
## Install postgres client libraries
|
||||
|
||||
|
||||
+1
-1
@@ -38,7 +38,7 @@ As an example, here is the relevant section of the config file for `matrix.org`.
|
||||
turn_uris: [ "turn:turn.matrix.org?transport=udp", "turn:turn.matrix.org?transport=tcp" ]
|
||||
turn_shared_secret: "n0t4ctuAllymatr1Xd0TorgSshar3d5ecret4obvIousreAsons"
|
||||
turn_user_lifetime: 86400000
|
||||
turn_allow_guests: True
|
||||
turn_allow_guests: true
|
||||
|
||||
After updating the homeserver configuration, you must restart synapse:
|
||||
|
||||
|
||||
@@ -79,7 +79,7 @@ Here we can see that the request has been tagged with `GET-37`. (The tag depends
|
||||
grep 'GET-37' homeserver.log
|
||||
```
|
||||
|
||||
If you want to paste that output into a github issue or matrix room, please remember to surround it with triple-backticks (```) to make it legible (see https://help.github.com/en/articles/basic-writing-and-formatting-syntax#quoting-code).
|
||||
If you want to paste that output into a github issue or matrix room, please remember to surround it with triple-backticks (```) to make it legible (see [quoting code](https://help.github.com/en/articles/basic-writing-and-formatting-syntax#quoting-code)).
|
||||
|
||||
|
||||
What do all those fields in the 'Processed' line mean?
|
||||
|
||||
@@ -2501,32 +2501,53 @@ Config settings related to the client/server API
|
||||
---
|
||||
### `room_prejoin_state`
|
||||
|
||||
Controls for the state that is shared with users who receive an invite
|
||||
to a room. By default, the following state event types are shared with users who
|
||||
receive invites to the room:
|
||||
- m.room.join_rules
|
||||
- m.room.canonical_alias
|
||||
- m.room.avatar
|
||||
- m.room.encryption
|
||||
- m.room.name
|
||||
- m.room.create
|
||||
- m.room.topic
|
||||
This setting controls the state that is shared with users upon receiving an
|
||||
invite to a room, or in reply to a knock on a room. By default, the following
|
||||
state events are shared with users:
|
||||
|
||||
- `m.room.join_rules`
|
||||
- `m.room.canonical_alias`
|
||||
- `m.room.avatar`
|
||||
- `m.room.encryption`
|
||||
- `m.room.name`
|
||||
- `m.room.create`
|
||||
- `m.room.topic`
|
||||
|
||||
To change the default behavior, use the following sub-options:
|
||||
* `disable_default_event_types`: set to true to disable the above defaults. If this
|
||||
is enabled, only the event types listed in `additional_event_types` are shared.
|
||||
Defaults to false.
|
||||
* `additional_event_types`: Additional state event types to share with users when they are invited
|
||||
to a room. By default, this list is empty (so only the default event types are shared).
|
||||
* `disable_default_event_types`: boolean. Set to `true` to disable the above
|
||||
defaults. If this is enabled, only the event types listed in
|
||||
`additional_event_types` are shared. Defaults to `false`.
|
||||
* `additional_event_types`: A list of additional state events to include in the
|
||||
events to be shared. By default, this list is empty (so only the default event
|
||||
types are shared).
|
||||
|
||||
Each entry in this list should be either a single string or a list of two
|
||||
strings.
|
||||
* A standalone string `t` represents all events with type `t` (i.e.
|
||||
with no restrictions on state keys).
|
||||
* A pair of strings `[t, s]` represents a single event with type `t` and
|
||||
state key `s`. The same type can appear in two entries with different state
|
||||
keys: in this situation, both state keys are included in prejoin state.
|
||||
|
||||
Example configuration:
|
||||
```yaml
|
||||
room_prejoin_state:
|
||||
disable_default_event_types: true
|
||||
disable_default_event_types: false
|
||||
additional_event_types:
|
||||
- org.example.custom.event.type
|
||||
- m.room.join_rules
|
||||
# Share all events of type `org.example.custom.event.typeA`
|
||||
- org.example.custom.event.typeA
|
||||
# Share only events of type `org.example.custom.event.typeB` whose
|
||||
# state_key is "foo"
|
||||
- ["org.example.custom.event.typeB", "foo"]
|
||||
# Share only events of type `org.example.custom.event.typeC` whose
|
||||
# state_key is "bar" or "baz"
|
||||
- ["org.example.custom.event.typeC", "bar"]
|
||||
- ["org.example.custom.event.typeC", "baz"]
|
||||
```
|
||||
|
||||
*Changed in Synapse 1.74:* admins can filter the events in prejoin state based
|
||||
on their state key.
|
||||
|
||||
---
|
||||
### `track_puppeted_user_ips`
|
||||
|
||||
@@ -3355,7 +3376,7 @@ Configuration settings related to push notifications
|
||||
This setting defines options for push notifications.
|
||||
|
||||
This option has a number of sub-options. They are as follows:
|
||||
* `enable_push`: Enables or disables push notification calculation. Note, disabling this will also
|
||||
* `enabled`: Enables or disables push notification calculation. Note, disabling this will also
|
||||
stop unread counts being calculated for rooms. This mode of operation is intended
|
||||
for homeservers which may only have bots or appservice users connected, or are otherwise
|
||||
not interested in push/unread counters. This is enabled by default.
|
||||
@@ -3379,7 +3400,7 @@ This option has a number of sub-options. They are as follows:
|
||||
Example configuration:
|
||||
```yaml
|
||||
push:
|
||||
enable_push: true
|
||||
enabled: true
|
||||
include_content: false
|
||||
group_unread_count_by_room: false
|
||||
```
|
||||
|
||||
@@ -12,6 +12,7 @@ local_partial_types = True
|
||||
no_implicit_optional = True
|
||||
disallow_untyped_defs = True
|
||||
strict_equality = True
|
||||
warn_redundant_casts = True
|
||||
|
||||
files =
|
||||
docker/,
|
||||
@@ -88,6 +89,15 @@ disallow_untyped_defs = False
|
||||
[mypy-tests.*]
|
||||
disallow_untyped_defs = False
|
||||
|
||||
[mypy-tests.config.test_api]
|
||||
disallow_untyped_defs = True
|
||||
|
||||
[mypy-tests.federation.transport.test_client]
|
||||
disallow_untyped_defs = True
|
||||
|
||||
[mypy-tests.handlers.test_sso]
|
||||
disallow_untyped_defs = True
|
||||
|
||||
[mypy-tests.handlers.test_user_directory]
|
||||
disallow_untyped_defs = True
|
||||
|
||||
@@ -97,28 +107,19 @@ disallow_untyped_defs = True
|
||||
[mypy-tests.push.test_bulk_push_rule_evaluator]
|
||||
disallow_untyped_defs = True
|
||||
|
||||
[mypy-tests.test_server]
|
||||
[mypy-tests.rest.*]
|
||||
disallow_untyped_defs = True
|
||||
|
||||
[mypy-tests.state.test_profile]
|
||||
disallow_untyped_defs = True
|
||||
|
||||
[mypy-tests.storage.test_id_generators]
|
||||
[mypy-tests.storage.*]
|
||||
disallow_untyped_defs = True
|
||||
|
||||
[mypy-tests.storage.test_profile]
|
||||
[mypy-tests.test_server]
|
||||
disallow_untyped_defs = True
|
||||
|
||||
[mypy-tests.handlers.test_sso]
|
||||
disallow_untyped_defs = True
|
||||
|
||||
[mypy-tests.storage.test_user_directory]
|
||||
disallow_untyped_defs = True
|
||||
|
||||
[mypy-tests.rest.*]
|
||||
disallow_untyped_defs = True
|
||||
|
||||
[mypy-tests.federation.transport.test_client]
|
||||
[mypy-tests.types.*]
|
||||
disallow_untyped_defs = True
|
||||
|
||||
[mypy-tests.util.caches.*]
|
||||
|
||||
Generated
+806
-719
File diff suppressed because it is too large
Load Diff
+14
-2
@@ -57,7 +57,7 @@ manifest-path = "rust/Cargo.toml"
|
||||
|
||||
[tool.poetry]
|
||||
name = "matrix-synapse"
|
||||
version = "1.73.0rc2"
|
||||
version = "1.74.0rc1"
|
||||
description = "Homeserver for the Matrix decentralised comms protocol"
|
||||
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
|
||||
license = "Apache-2.0"
|
||||
@@ -141,7 +141,8 @@ pyasn1 = ">=0.1.9"
|
||||
pyasn1-modules = ">=0.0.7"
|
||||
bcrypt = ">=3.1.7"
|
||||
Pillow = ">=5.4.0"
|
||||
sortedcontainers = ">=1.4.4"
|
||||
# We use SortedDict.peekitem(), which was added in sortedcontainers 1.5.2.
|
||||
sortedcontainers = ">=1.5.2"
|
||||
pymacaroons = ">=0.13.0"
|
||||
msgpack = ">=0.5.2"
|
||||
phonenumbers = ">=8.2.0"
|
||||
@@ -207,6 +208,8 @@ hiredis = { version = "*", optional = true }
|
||||
Pympler = { version = "*", optional = true }
|
||||
parameterized = { version = ">=0.7.4", optional = true }
|
||||
idna = { version = ">=2.5", optional = true }
|
||||
pyicu = { version = ">=2.10.2", optional = true }
|
||||
uvloop = { version = ">=0.17.0", optional = true }
|
||||
|
||||
[tool.poetry.extras]
|
||||
# NB: Packages that should be part of `pip install matrix-synapse[all]` need to be specified
|
||||
@@ -229,6 +232,11 @@ redis = ["txredisapi", "hiredis"]
|
||||
# Required to use experimental `caches.track_memory_usage` config option.
|
||||
cache-memory = ["pympler"]
|
||||
test = ["parameterized", "idna"]
|
||||
# Allows for better search for international characters in the user directory. This
|
||||
# requires libicu's development headers installed on the system (e.g. libicu-dev on
|
||||
# Debian-based distributions).
|
||||
user-search = ["pyicu"]
|
||||
uvloop = ["uvloop"]
|
||||
|
||||
# The duplication here is awful. I hate hate hate hate hate it. However, for now I want
|
||||
# to ensure you can still `pip install matrix-synapse[all]` like today. Two motivations:
|
||||
@@ -260,6 +268,10 @@ all = [
|
||||
"txredisapi", "hiredis",
|
||||
# cache-memory
|
||||
"pympler",
|
||||
# improved user search
|
||||
"pyicu",
|
||||
# uvloop
|
||||
"uvloop",
|
||||
# omitted:
|
||||
# - test: it's useful to have this separate from dev deps in the olddeps job
|
||||
# - systemd: this is a system-based requirement
|
||||
|
||||
+80
-16
@@ -53,6 +53,12 @@ Run the complement test suite on Synapse.
|
||||
Only build the Docker images. Don't actually run Complement.
|
||||
Conflicts with -f/--fast.
|
||||
|
||||
-e, --editable
|
||||
Use an editable build of Synapse, rebuilding the image if necessary.
|
||||
This is suitable for use in development where a fast turn-around time
|
||||
is important.
|
||||
Not suitable for use in CI in case the editable environment is impure.
|
||||
|
||||
For help on arguments to 'go test', run 'go help testflag'.
|
||||
EOF
|
||||
}
|
||||
@@ -73,6 +79,9 @@ while [ $# -ge 1 ]; do
|
||||
"--build-only")
|
||||
skip_complement_run=1
|
||||
;;
|
||||
"-e"|"--editable")
|
||||
use_editable_synapse=1
|
||||
;;
|
||||
*)
|
||||
# unknown arg: presumably an argument to gotest. break the loop.
|
||||
break
|
||||
@@ -96,25 +105,76 @@ if [[ -z "$COMPLEMENT_DIR" ]]; then
|
||||
echo "Checkout available at 'complement-${COMPLEMENT_REF}'"
|
||||
fi
|
||||
|
||||
if [ -n "$use_editable_synapse" ]; then
|
||||
if [[ -e synapse/synapse_rust.abi3.so ]]; then
|
||||
# In an editable install, back up the host's compiled Rust module to prevent
|
||||
# inconvenience; the container will overwrite the module with its own copy.
|
||||
mv -n synapse/synapse_rust.abi3.so synapse/synapse_rust.abi3.so~host
|
||||
# And restore it on exit:
|
||||
synapse_pkg=`realpath synapse`
|
||||
trap "mv -f '$synapse_pkg/synapse_rust.abi3.so~host' '$synapse_pkg/synapse_rust.abi3.so'" EXIT
|
||||
fi
|
||||
|
||||
editable_mount="$(realpath .):/editable-src:z"
|
||||
if docker inspect complement-synapse-editable &>/dev/null; then
|
||||
# complement-synapse-editable already exists: see if we can still use it:
|
||||
# - The Rust module must still be importable; it will fail to import if the Rust source has changed.
|
||||
# - The Poetry lock file must be the same (otherwise we assume dependencies have changed)
|
||||
|
||||
# First set up the module in the right place for an editable installation.
|
||||
docker run --rm -v $editable_mount --entrypoint 'cp' complement-synapse-editable -- /synapse_rust.abi3.so.bak /editable-src/synapse/synapse_rust.abi3.so
|
||||
|
||||
if (docker run --rm -v $editable_mount --entrypoint 'python' complement-synapse-editable -c 'import synapse.synapse_rust' \
|
||||
&& docker run --rm -v $editable_mount --entrypoint 'diff' complement-synapse-editable --brief /editable-src/poetry.lock /poetry.lock.bak); then
|
||||
skip_docker_build=1
|
||||
else
|
||||
echo "Editable Synapse image is stale. Will rebuild."
|
||||
unset skip_docker_build
|
||||
fi
|
||||
fi
|
||||
fi
|
||||
|
||||
if [ -z "$skip_docker_build" ]; then
|
||||
# Build the base Synapse image from the local checkout
|
||||
echo_if_github "::group::Build Docker image: matrixdotorg/synapse"
|
||||
docker build -t matrixdotorg/synapse \
|
||||
--build-arg TEST_ONLY_SKIP_DEP_HASH_VERIFICATION \
|
||||
--build-arg TEST_ONLY_IGNORE_POETRY_LOCKFILE \
|
||||
-f "docker/Dockerfile" .
|
||||
echo_if_github "::endgroup::"
|
||||
if [ -n "$use_editable_synapse" ]; then
|
||||
|
||||
# Build the workers docker image (from the base Synapse image we just built).
|
||||
echo_if_github "::group::Build Docker image: matrixdotorg/synapse-workers"
|
||||
docker build -t matrixdotorg/synapse-workers -f "docker/Dockerfile-workers" .
|
||||
echo_if_github "::endgroup::"
|
||||
# Build a special image designed for use in development with editable
|
||||
# installs.
|
||||
docker build -t synapse-editable \
|
||||
-f "docker/editable.Dockerfile" .
|
||||
|
||||
# Build the unified Complement image (from the worker Synapse image we just built).
|
||||
echo_if_github "::group::Build Docker image: complement/Dockerfile"
|
||||
docker build -t complement-synapse \
|
||||
-f "docker/complement/Dockerfile" "docker/complement"
|
||||
echo_if_github "::endgroup::"
|
||||
docker build -t synapse-workers-editable \
|
||||
--build-arg FROM=synapse-editable \
|
||||
-f "docker/Dockerfile-workers" .
|
||||
|
||||
docker build -t complement-synapse-editable \
|
||||
--build-arg FROM=synapse-workers-editable \
|
||||
-f "docker/complement/Dockerfile" "docker/complement"
|
||||
|
||||
# Prepare the Rust module
|
||||
docker run --rm -v $editable_mount --entrypoint 'cp' complement-synapse-editable -- /synapse_rust.abi3.so.bak /editable-src/synapse/synapse_rust.abi3.so
|
||||
|
||||
else
|
||||
|
||||
# Build the base Synapse image from the local checkout
|
||||
echo_if_github "::group::Build Docker image: matrixdotorg/synapse"
|
||||
docker build -t matrixdotorg/synapse \
|
||||
--build-arg TEST_ONLY_SKIP_DEP_HASH_VERIFICATION \
|
||||
--build-arg TEST_ONLY_IGNORE_POETRY_LOCKFILE \
|
||||
-f "docker/Dockerfile" .
|
||||
echo_if_github "::endgroup::"
|
||||
|
||||
# Build the workers docker image (from the base Synapse image we just built).
|
||||
echo_if_github "::group::Build Docker image: matrixdotorg/synapse-workers"
|
||||
docker build -t matrixdotorg/synapse-workers -f "docker/Dockerfile-workers" .
|
||||
echo_if_github "::endgroup::"
|
||||
|
||||
# Build the unified Complement image (from the worker Synapse image we just built).
|
||||
echo_if_github "::group::Build Docker image: complement/Dockerfile"
|
||||
docker build -t complement-synapse \
|
||||
-f "docker/complement/Dockerfile" "docker/complement"
|
||||
echo_if_github "::endgroup::"
|
||||
|
||||
fi
|
||||
fi
|
||||
|
||||
if [ -n "$skip_complement_run" ]; then
|
||||
@@ -123,6 +183,10 @@ if [ -n "$skip_complement_run" ]; then
|
||||
fi
|
||||
|
||||
export COMPLEMENT_BASE_IMAGE=complement-synapse
|
||||
if [ -n "$use_editable_synapse" ]; then
|
||||
export COMPLEMENT_BASE_IMAGE=complement-synapse-editable
|
||||
export COMPLEMENT_HOST_MOUNTS="$editable_mount"
|
||||
fi
|
||||
|
||||
extra_test_args=()
|
||||
|
||||
|
||||
@@ -27,7 +27,7 @@ import time
|
||||
import urllib.request
|
||||
from os import path
|
||||
from tempfile import TemporaryDirectory
|
||||
from typing import Any, List, Optional, cast
|
||||
from typing import Any, List, Optional
|
||||
|
||||
import attr
|
||||
import click
|
||||
@@ -174,9 +174,7 @@ def _prepare() -> None:
|
||||
click.get_current_context().abort()
|
||||
|
||||
# Switch to the release branch.
|
||||
# Cast safety: parse() won't return a version.LegacyVersion from our
|
||||
# version string format.
|
||||
parsed_new_version = cast(version.Version, version.parse(new_version))
|
||||
parsed_new_version = version.parse(new_version)
|
||||
|
||||
# We assume for debian changelogs that we only do RCs or full releases.
|
||||
assert not parsed_new_version.is_devrelease
|
||||
|
||||
@@ -0,0 +1,25 @@
|
||||
# Copyright 2022 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# Stub for PyICU.
|
||||
|
||||
class Locale:
|
||||
@staticmethod
|
||||
def getDefault() -> Locale: ...
|
||||
|
||||
class BreakIterator:
|
||||
@staticmethod
|
||||
def createWordInstance(locale: Locale) -> BreakIterator: ...
|
||||
def setText(self, text: str) -> None: ...
|
||||
def nextBoundary(self) -> int: ...
|
||||
@@ -45,7 +45,7 @@ class PushRuleEvaluator:
|
||||
notification_power_levels: Mapping[str, int],
|
||||
related_events_flattened: Mapping[str, Mapping[str, str]],
|
||||
related_event_match_enabled: bool,
|
||||
room_version_feature_flags: list[str],
|
||||
room_version_feature_flags: Tuple[str, ...],
|
||||
msc3931_enabled: bool,
|
||||
): ...
|
||||
def run(
|
||||
|
||||
@@ -44,8 +44,15 @@ if strtobool(os.environ.get("SYNAPSE_ASYNC_IO_REACTOR", "0")):
|
||||
|
||||
from twisted.internet import asyncioreactor
|
||||
|
||||
if bool(os.environ.get("SYNAPSE_UVLOOP", False)):
|
||||
import uvloop
|
||||
|
||||
uvloop.install()
|
||||
print("Using uvloop")
|
||||
|
||||
asyncioreactor.install(asyncio.get_event_loop())
|
||||
|
||||
|
||||
# Twisted and canonicaljson will fail to import when this file is executed to
|
||||
# get the __version__ during a fresh install. That's OK and subsequent calls to
|
||||
# actually start Synapse will import these libraries fine.
|
||||
|
||||
@@ -222,6 +222,7 @@ def main() -> None:
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
config: Optional[Dict[str, Any]] = None
|
||||
if "config" in args and args.config:
|
||||
config = yaml.safe_load(args.config)
|
||||
|
||||
@@ -229,7 +230,7 @@ def main() -> None:
|
||||
secret = args.shared_secret
|
||||
else:
|
||||
# argparse should check that we have either config or shared secret
|
||||
assert config
|
||||
assert config is not None
|
||||
|
||||
secret = config.get("registration_shared_secret")
|
||||
secret_file = config.get("registration_shared_secret_path")
|
||||
@@ -244,7 +245,7 @@ def main() -> None:
|
||||
|
||||
if args.server_url:
|
||||
server_url = args.server_url
|
||||
elif config:
|
||||
elif config is not None:
|
||||
server_url = _find_client_listener(config)
|
||||
if not server_url:
|
||||
server_url = _DEFAULT_SERVER_URL
|
||||
|
||||
@@ -152,6 +152,7 @@ class EduTypes:
|
||||
|
||||
class RejectedReason:
|
||||
AUTH_ERROR: Final = "auth_error"
|
||||
OVERSIZED_EVENT: Final = "oversized_event"
|
||||
|
||||
|
||||
class RoomCreationPreset:
|
||||
@@ -230,6 +231,9 @@ class EventContentFields:
|
||||
# The authorising user for joining a restricted room.
|
||||
AUTHORISING_USER: Final = "join_authorised_via_users_server"
|
||||
|
||||
# an unspecced field added to to-device messages to identify them uniquely-ish
|
||||
TO_DEVICE_MSGID: Final = "org.matrix.msgid"
|
||||
|
||||
|
||||
class RoomTypes:
|
||||
"""Understood values of the room_type field of m.room.create events."""
|
||||
|
||||
+12
-5
@@ -300,10 +300,8 @@ class InteractiveAuthIncompleteError(Exception):
|
||||
class UnrecognizedRequestError(SynapseError):
|
||||
"""An error indicating we don't understand the request you're trying to make"""
|
||||
|
||||
def __init__(
|
||||
self, msg: str = "Unrecognized request", errcode: str = Codes.UNRECOGNIZED
|
||||
):
|
||||
super().__init__(400, msg, errcode)
|
||||
def __init__(self, msg: str = "Unrecognized request", code: int = 400):
|
||||
super().__init__(code, msg, Codes.UNRECOGNIZED)
|
||||
|
||||
|
||||
class NotFoundError(SynapseError):
|
||||
@@ -426,8 +424,17 @@ class ResourceLimitError(SynapseError):
|
||||
class EventSizeError(SynapseError):
|
||||
"""An error raised when an event is too big."""
|
||||
|
||||
def __init__(self, msg: str):
|
||||
def __init__(self, msg: str, unpersistable: bool):
|
||||
"""
|
||||
unpersistable:
|
||||
if True, the PDU must not be persisted, not even as a rejected PDU
|
||||
when received over federation.
|
||||
This is notably true when the entire PDU exceeds the size limit for a PDU,
|
||||
(as opposed to an individual key's size limit being exceeded).
|
||||
"""
|
||||
|
||||
super().__init__(413, msg, Codes.TOO_LARGE)
|
||||
self.unpersistable = unpersistable
|
||||
|
||||
|
||||
class LoginError(SynapseError):
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from typing import Callable, Dict, List, Optional
|
||||
from typing import Callable, Dict, Optional, Tuple
|
||||
|
||||
import attr
|
||||
|
||||
@@ -103,7 +103,7 @@ class RoomVersion:
|
||||
# is not enough to mark it "supported": the push rule evaluator also needs to
|
||||
# support the flag. Unknown flags are ignored by the evaluator, making conditions
|
||||
# fail if used.
|
||||
msc3931_push_features: List[str] # values from PushRuleRoomFlag
|
||||
msc3931_push_features: Tuple[str, ...] # values from PushRuleRoomFlag
|
||||
|
||||
|
||||
class RoomVersions:
|
||||
@@ -124,7 +124,7 @@ class RoomVersions:
|
||||
msc2716_redactions=False,
|
||||
msc3787_knock_restricted_join_rule=False,
|
||||
msc3667_int_only_power_levels=False,
|
||||
msc3931_push_features=[],
|
||||
msc3931_push_features=(),
|
||||
)
|
||||
V2 = RoomVersion(
|
||||
"2",
|
||||
@@ -143,7 +143,7 @@ class RoomVersions:
|
||||
msc2716_redactions=False,
|
||||
msc3787_knock_restricted_join_rule=False,
|
||||
msc3667_int_only_power_levels=False,
|
||||
msc3931_push_features=[],
|
||||
msc3931_push_features=(),
|
||||
)
|
||||
V3 = RoomVersion(
|
||||
"3",
|
||||
@@ -162,7 +162,7 @@ class RoomVersions:
|
||||
msc2716_redactions=False,
|
||||
msc3787_knock_restricted_join_rule=False,
|
||||
msc3667_int_only_power_levels=False,
|
||||
msc3931_push_features=[],
|
||||
msc3931_push_features=(),
|
||||
)
|
||||
V4 = RoomVersion(
|
||||
"4",
|
||||
@@ -181,7 +181,7 @@ class RoomVersions:
|
||||
msc2716_redactions=False,
|
||||
msc3787_knock_restricted_join_rule=False,
|
||||
msc3667_int_only_power_levels=False,
|
||||
msc3931_push_features=[],
|
||||
msc3931_push_features=(),
|
||||
)
|
||||
V5 = RoomVersion(
|
||||
"5",
|
||||
@@ -200,7 +200,7 @@ class RoomVersions:
|
||||
msc2716_redactions=False,
|
||||
msc3787_knock_restricted_join_rule=False,
|
||||
msc3667_int_only_power_levels=False,
|
||||
msc3931_push_features=[],
|
||||
msc3931_push_features=(),
|
||||
)
|
||||
V6 = RoomVersion(
|
||||
"6",
|
||||
@@ -219,7 +219,7 @@ class RoomVersions:
|
||||
msc2716_redactions=False,
|
||||
msc3787_knock_restricted_join_rule=False,
|
||||
msc3667_int_only_power_levels=False,
|
||||
msc3931_push_features=[],
|
||||
msc3931_push_features=(),
|
||||
)
|
||||
MSC2176 = RoomVersion(
|
||||
"org.matrix.msc2176",
|
||||
@@ -238,7 +238,7 @@ class RoomVersions:
|
||||
msc2716_redactions=False,
|
||||
msc3787_knock_restricted_join_rule=False,
|
||||
msc3667_int_only_power_levels=False,
|
||||
msc3931_push_features=[],
|
||||
msc3931_push_features=(),
|
||||
)
|
||||
V7 = RoomVersion(
|
||||
"7",
|
||||
@@ -257,7 +257,7 @@ class RoomVersions:
|
||||
msc2716_redactions=False,
|
||||
msc3787_knock_restricted_join_rule=False,
|
||||
msc3667_int_only_power_levels=False,
|
||||
msc3931_push_features=[],
|
||||
msc3931_push_features=(),
|
||||
)
|
||||
V8 = RoomVersion(
|
||||
"8",
|
||||
@@ -276,7 +276,7 @@ class RoomVersions:
|
||||
msc2716_redactions=False,
|
||||
msc3787_knock_restricted_join_rule=False,
|
||||
msc3667_int_only_power_levels=False,
|
||||
msc3931_push_features=[],
|
||||
msc3931_push_features=(),
|
||||
)
|
||||
V9 = RoomVersion(
|
||||
"9",
|
||||
@@ -295,7 +295,7 @@ class RoomVersions:
|
||||
msc2716_redactions=False,
|
||||
msc3787_knock_restricted_join_rule=False,
|
||||
msc3667_int_only_power_levels=False,
|
||||
msc3931_push_features=[],
|
||||
msc3931_push_features=(),
|
||||
)
|
||||
MSC3787 = RoomVersion(
|
||||
"org.matrix.msc3787",
|
||||
@@ -314,7 +314,7 @@ class RoomVersions:
|
||||
msc2716_redactions=False,
|
||||
msc3787_knock_restricted_join_rule=True,
|
||||
msc3667_int_only_power_levels=False,
|
||||
msc3931_push_features=[],
|
||||
msc3931_push_features=(),
|
||||
)
|
||||
V10 = RoomVersion(
|
||||
"10",
|
||||
@@ -333,7 +333,7 @@ class RoomVersions:
|
||||
msc2716_redactions=False,
|
||||
msc3787_knock_restricted_join_rule=True,
|
||||
msc3667_int_only_power_levels=True,
|
||||
msc3931_push_features=[],
|
||||
msc3931_push_features=(),
|
||||
)
|
||||
MSC2716v4 = RoomVersion(
|
||||
"org.matrix.msc2716v4",
|
||||
@@ -352,7 +352,7 @@ class RoomVersions:
|
||||
msc2716_redactions=True,
|
||||
msc3787_knock_restricted_join_rule=False,
|
||||
msc3667_int_only_power_levels=False,
|
||||
msc3931_push_features=[],
|
||||
msc3931_push_features=(),
|
||||
)
|
||||
MSC1767v10 = RoomVersion(
|
||||
# MSC1767 (Extensible Events) based on room version "10"
|
||||
@@ -372,7 +372,7 @@ class RoomVersions:
|
||||
msc2716_redactions=False,
|
||||
msc3787_knock_restricted_join_rule=True,
|
||||
msc3667_int_only_power_levels=True,
|
||||
msc3931_push_features=[PushRuleRoomFlag.EXTENSIBLE_EVENTS],
|
||||
msc3931_push_features=(PushRuleRoomFlag.EXTENSIBLE_EVENTS,),
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -245,7 +245,9 @@ class ApplicationService:
|
||||
return True
|
||||
|
||||
# likewise with the room's aliases (if it has any)
|
||||
alias_list = await store.get_aliases_for_room(room_id)
|
||||
alias_list = await store.get_aliases_for_room(
|
||||
room_id, on_invalidate=cache_context.invalidate
|
||||
)
|
||||
for alias in alias_list:
|
||||
if self.is_room_alias_in_namespace(alias):
|
||||
return True
|
||||
@@ -311,7 +313,9 @@ class ApplicationService:
|
||||
# Find all the rooms the sender is in
|
||||
if self.is_interested_in_user(user_id.to_string()):
|
||||
return True
|
||||
room_ids = await store.get_rooms_for_user(user_id.to_string())
|
||||
room_ids = await store.get_rooms_for_user(
|
||||
user_id.to_string(), on_invalidate=cache_context.invalidate
|
||||
)
|
||||
|
||||
# Then find out if the appservice is interested in any of those rooms
|
||||
for room_id in room_ids:
|
||||
|
||||
@@ -33,6 +33,9 @@ def validate_config(
|
||||
config: the configuration value to be validated
|
||||
config_path: the path within the config file. This will be used as a basis
|
||||
for the error message.
|
||||
|
||||
Raises:
|
||||
ConfigError, if validation fails.
|
||||
"""
|
||||
try:
|
||||
jsonschema.validate(config, json_schema)
|
||||
|
||||
+42
-21
@@ -13,12 +13,13 @@
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
from typing import Any, Iterable
|
||||
from typing import Any, Iterable, Optional, Tuple
|
||||
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.config._base import Config, ConfigError
|
||||
from synapse.config._util import validate_config
|
||||
from synapse.types import JsonDict
|
||||
from synapse.types.state import StateFilter
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -26,16 +27,20 @@ logger = logging.getLogger(__name__)
|
||||
class ApiConfig(Config):
|
||||
section = "api"
|
||||
|
||||
room_prejoin_state: StateFilter
|
||||
track_puppetted_users_ips: bool
|
||||
|
||||
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
|
||||
validate_config(_MAIN_SCHEMA, config, ())
|
||||
self.room_prejoin_state = list(self._get_prejoin_state_types(config))
|
||||
self.room_prejoin_state = StateFilter.from_types(
|
||||
self._get_prejoin_state_entries(config)
|
||||
)
|
||||
self.track_puppeted_user_ips = config.get("track_puppeted_user_ips", False)
|
||||
|
||||
def _get_prejoin_state_types(self, config: JsonDict) -> Iterable[str]:
|
||||
"""Get the event types to include in the prejoin state
|
||||
|
||||
Parses the config and returns an iterable of the event types to be included.
|
||||
"""
|
||||
def _get_prejoin_state_entries(
|
||||
self, config: JsonDict
|
||||
) -> Iterable[Tuple[str, Optional[str]]]:
|
||||
"""Get the event types and state keys to include in the prejoin state."""
|
||||
room_prejoin_state_config = config.get("room_prejoin_state") or {}
|
||||
|
||||
# backwards-compatibility support for room_invite_state_types
|
||||
@@ -50,33 +55,39 @@ class ApiConfig(Config):
|
||||
|
||||
logger.warning(_ROOM_INVITE_STATE_TYPES_WARNING)
|
||||
|
||||
yield from config["room_invite_state_types"]
|
||||
for event_type in config["room_invite_state_types"]:
|
||||
yield event_type, None
|
||||
return
|
||||
|
||||
if not room_prejoin_state_config.get("disable_default_event_types"):
|
||||
yield from _DEFAULT_PREJOIN_STATE_TYPES
|
||||
yield from _DEFAULT_PREJOIN_STATE_TYPES_AND_STATE_KEYS
|
||||
|
||||
yield from room_prejoin_state_config.get("additional_event_types", [])
|
||||
for entry in room_prejoin_state_config.get("additional_event_types", []):
|
||||
if isinstance(entry, str):
|
||||
yield entry, None
|
||||
else:
|
||||
yield entry
|
||||
|
||||
|
||||
_ROOM_INVITE_STATE_TYPES_WARNING = """\
|
||||
WARNING: The 'room_invite_state_types' configuration setting is now deprecated,
|
||||
and replaced with 'room_prejoin_state'. New features may not work correctly
|
||||
unless 'room_invite_state_types' is removed. See the sample configuration file for
|
||||
details of 'room_prejoin_state'.
|
||||
unless 'room_invite_state_types' is removed. See the config documentation at
|
||||
https://matrix-org.github.io/synapse/latest/usage/configuration/config_documentation.html#room_prejoin_state
|
||||
for details of 'room_prejoin_state'.
|
||||
--------------------------------------------------------------------------------
|
||||
"""
|
||||
|
||||
_DEFAULT_PREJOIN_STATE_TYPES = [
|
||||
EventTypes.JoinRules,
|
||||
EventTypes.CanonicalAlias,
|
||||
EventTypes.RoomAvatar,
|
||||
EventTypes.RoomEncryption,
|
||||
EventTypes.Name,
|
||||
_DEFAULT_PREJOIN_STATE_TYPES_AND_STATE_KEYS = [
|
||||
(EventTypes.JoinRules, ""),
|
||||
(EventTypes.CanonicalAlias, ""),
|
||||
(EventTypes.RoomAvatar, ""),
|
||||
(EventTypes.RoomEncryption, ""),
|
||||
(EventTypes.Name, ""),
|
||||
# Per MSC1772.
|
||||
EventTypes.Create,
|
||||
(EventTypes.Create, ""),
|
||||
# Per MSC3173.
|
||||
EventTypes.Topic,
|
||||
(EventTypes.Topic, ""),
|
||||
]
|
||||
|
||||
|
||||
@@ -90,7 +101,17 @@ _ROOM_PREJOIN_STATE_CONFIG_SCHEMA = {
|
||||
"disable_default_event_types": {"type": "boolean"},
|
||||
"additional_event_types": {
|
||||
"type": "array",
|
||||
"items": {"type": "string"},
|
||||
"items": {
|
||||
"oneOf": [
|
||||
{"type": "string"},
|
||||
{
|
||||
"type": "array",
|
||||
"items": {"type": "string"},
|
||||
"minItems": 2,
|
||||
"maxItems": 2,
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
+73
-11
@@ -52,6 +52,7 @@ from synapse.api.room_versions import (
|
||||
KNOWN_ROOM_VERSIONS,
|
||||
EventFormatVersions,
|
||||
RoomVersion,
|
||||
RoomVersions,
|
||||
)
|
||||
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
|
||||
from synapse.types import MutableStateMap, StateMap, UserID, get_domain_from_id
|
||||
@@ -341,19 +342,80 @@ def check_state_dependent_auth_rules(
|
||||
logger.debug("Allowing! %s", event)
|
||||
|
||||
|
||||
# Set of room versions where Synapse did not apply event key size limits
|
||||
# in bytes, but rather in codepoints.
|
||||
# In these room versions, we are more lenient with event size validation.
|
||||
LENIENT_EVENT_BYTE_LIMITS_ROOM_VERSIONS = {
|
||||
RoomVersions.V1,
|
||||
RoomVersions.V2,
|
||||
RoomVersions.V3,
|
||||
RoomVersions.V4,
|
||||
RoomVersions.V5,
|
||||
RoomVersions.V6,
|
||||
RoomVersions.MSC2176,
|
||||
RoomVersions.V7,
|
||||
RoomVersions.V8,
|
||||
RoomVersions.V9,
|
||||
RoomVersions.MSC3787,
|
||||
RoomVersions.V10,
|
||||
RoomVersions.MSC2716v4,
|
||||
RoomVersions.MSC1767v10,
|
||||
}
|
||||
|
||||
|
||||
def _check_size_limits(event: "EventBase") -> None:
|
||||
if len(event.user_id) > 255:
|
||||
raise EventSizeError("'user_id' too large")
|
||||
if len(event.room_id) > 255:
|
||||
raise EventSizeError("'room_id' too large")
|
||||
if event.is_state() and len(event.state_key) > 255:
|
||||
raise EventSizeError("'state_key' too large")
|
||||
if len(event.type) > 255:
|
||||
raise EventSizeError("'type' too large")
|
||||
if len(event.event_id) > 255:
|
||||
raise EventSizeError("'event_id' too large")
|
||||
"""
|
||||
Checks the size limits in a PDU.
|
||||
|
||||
The entire size limit of the PDU is checked first.
|
||||
Then the size of fields is checked, first in codepoints and then in bytes.
|
||||
|
||||
The codepoint size limits are only for Synapse compatibility.
|
||||
|
||||
Raises:
|
||||
EventSizeError:
|
||||
when a size limit has been violated.
|
||||
|
||||
unpersistable=True if Synapse never would have accepted the event and
|
||||
the PDU must NOT be persisted.
|
||||
|
||||
unpersistable=False if a prior version of Synapse would have accepted the
|
||||
event and so the PDU must be persisted as rejected to avoid
|
||||
breaking the room.
|
||||
"""
|
||||
|
||||
# Whole PDU check
|
||||
if len(encode_canonical_json(event.get_pdu_json())) > MAX_PDU_SIZE:
|
||||
raise EventSizeError("event too large")
|
||||
raise EventSizeError("event too large", unpersistable=True)
|
||||
|
||||
# Codepoint size check: Synapse always enforced these limits, so apply
|
||||
# them strictly.
|
||||
if len(event.user_id) > 255:
|
||||
raise EventSizeError("'user_id' too large", unpersistable=True)
|
||||
if len(event.room_id) > 255:
|
||||
raise EventSizeError("'room_id' too large", unpersistable=True)
|
||||
if event.is_state() and len(event.state_key) > 255:
|
||||
raise EventSizeError("'state_key' too large", unpersistable=True)
|
||||
if len(event.type) > 255:
|
||||
raise EventSizeError("'type' too large", unpersistable=True)
|
||||
if len(event.event_id) > 255:
|
||||
raise EventSizeError("'event_id' too large", unpersistable=True)
|
||||
|
||||
strict_byte_limits = (
|
||||
event.room_version not in LENIENT_EVENT_BYTE_LIMITS_ROOM_VERSIONS
|
||||
)
|
||||
|
||||
# Byte size check: if these fail, then be lenient to avoid breaking rooms.
|
||||
if len(event.user_id.encode("utf-8")) > 255:
|
||||
raise EventSizeError("'user_id' too large", unpersistable=strict_byte_limits)
|
||||
if len(event.room_id.encode("utf-8")) > 255:
|
||||
raise EventSizeError("'room_id' too large", unpersistable=strict_byte_limits)
|
||||
if event.is_state() and len(event.state_key.encode("utf-8")) > 255:
|
||||
raise EventSizeError("'state_key' too large", unpersistable=strict_byte_limits)
|
||||
if len(event.type.encode("utf-8")) > 255:
|
||||
raise EventSizeError("'type' too large", unpersistable=strict_byte_limits)
|
||||
if len(event.event_id.encode("utf-8")) > 255:
|
||||
raise EventSizeError("'event_id' too large", unpersistable=strict_byte_limits)
|
||||
|
||||
|
||||
def _check_create(event: "EventBase") -> None:
|
||||
|
||||
@@ -28,8 +28,8 @@ from synapse.event_auth import auth_types_for_event
|
||||
from synapse.events import EventBase, _EventInternalMetadata, make_event_from_dict
|
||||
from synapse.state import StateHandler
|
||||
from synapse.storage.databases.main import DataStore
|
||||
from synapse.storage.state import StateFilter
|
||||
from synapse.types import EventID, JsonDict
|
||||
from synapse.types.state import StateFilter
|
||||
from synapse.util import Clock
|
||||
from synapse.util.stringutils import random_string
|
||||
|
||||
|
||||
@@ -23,7 +23,7 @@ from synapse.types import JsonDict, StateMap
|
||||
if TYPE_CHECKING:
|
||||
from synapse.storage.controllers import StorageControllers
|
||||
from synapse.storage.databases.main import DataStore
|
||||
from synapse.storage.state import StateFilter
|
||||
from synapse.types.state import StateFilter
|
||||
|
||||
|
||||
@attr.s(slots=True, auto_attribs=True)
|
||||
|
||||
+31
-1
@@ -28,8 +28,14 @@ from typing import (
|
||||
)
|
||||
|
||||
import attr
|
||||
from canonicaljson import encode_canonical_json
|
||||
|
||||
from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
|
||||
from synapse.api.constants import (
|
||||
MAX_PDU_SIZE,
|
||||
EventContentFields,
|
||||
EventTypes,
|
||||
RelationTypes,
|
||||
)
|
||||
from synapse.api.errors import Codes, SynapseError
|
||||
from synapse.api.room_versions import RoomVersion
|
||||
from synapse.types import JsonDict
|
||||
@@ -674,3 +680,27 @@ def validate_canonicaljson(value: Any) -> None:
|
||||
elif not isinstance(value, (bool, str)) and value is not None:
|
||||
# Other potential JSON values (bool, None, str) are safe.
|
||||
raise SynapseError(400, "Unknown JSON value", Codes.BAD_JSON)
|
||||
|
||||
|
||||
def maybe_upsert_event_field(
|
||||
event: EventBase, container: JsonDict, key: str, value: object
|
||||
) -> bool:
|
||||
"""Upsert an event field, but only if this doesn't make the event too large.
|
||||
|
||||
Returns true iff the upsert took place.
|
||||
"""
|
||||
if key in container:
|
||||
old_value: object = container[key]
|
||||
container[key] = value
|
||||
# NB: here and below, we assume that passing a non-None `time_now` argument to
|
||||
# get_pdu_json doesn't increase the size of the encoded result.
|
||||
upsert_okay = len(encode_canonical_json(event.get_pdu_json())) <= MAX_PDU_SIZE
|
||||
if not upsert_okay:
|
||||
container[key] = old_value
|
||||
else:
|
||||
container[key] = value
|
||||
upsert_okay = len(encode_canonical_json(event.get_pdu_json())) <= MAX_PDU_SIZE
|
||||
if not upsert_okay:
|
||||
del container[key]
|
||||
|
||||
return upsert_okay
|
||||
|
||||
@@ -771,17 +771,28 @@ class FederationClient(FederationBase):
|
||||
"""
|
||||
if synapse_error is None:
|
||||
synapse_error = e.to_synapse_error()
|
||||
# There is no good way to detect an "unknown" endpoint.
|
||||
# MSC3743 specifies that servers should return a 404 or 405 with an errcode
|
||||
# of M_UNRECOGNIZED when they receive a request to an unknown endpoint or
|
||||
# to an unknown method, respectively.
|
||||
#
|
||||
# Dendrite returns a 404 (with a body of "404 page not found");
|
||||
# Conduit returns a 404 (with no body); and Synapse returns a 400
|
||||
# with M_UNRECOGNIZED.
|
||||
#
|
||||
# This needs to be rather specific as some endpoints truly do return 404
|
||||
# errors.
|
||||
# Older versions of servers don't properly handle this. This needs to be
|
||||
# rather specific as some endpoints truly do return 404 errors.
|
||||
return (
|
||||
e.code == 404 and (not e.response or e.response == b"404 page not found")
|
||||
) or (e.code == 400 and synapse_error.errcode == Codes.UNRECOGNIZED)
|
||||
# 404 is an unknown endpoint, 405 is a known endpoint, but unknown method.
|
||||
(e.code == 404 or e.code == 405)
|
||||
and (
|
||||
# Older Dendrites returned a text or empty body.
|
||||
# Older Conduit returned an empty body.
|
||||
not e.response
|
||||
or e.response == b"404 page not found"
|
||||
# The proper response JSON with M_UNRECOGNIZED errcode.
|
||||
or synapse_error.errcode == Codes.UNRECOGNIZED
|
||||
)
|
||||
) or (
|
||||
# Older Synapses returned a 400 error.
|
||||
e.code == 400
|
||||
and synapse_error.errcode == Codes.UNRECOGNIZED
|
||||
)
|
||||
|
||||
async def _try_destination_list(
|
||||
self,
|
||||
|
||||
@@ -641,7 +641,7 @@ class PerDestinationQueue:
|
||||
if not message_id:
|
||||
continue
|
||||
|
||||
set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
|
||||
set_tag(SynapseTags.TO_DEVICE_EDU_ID, message_id)
|
||||
|
||||
edus = [
|
||||
Edu(
|
||||
|
||||
@@ -578,9 +578,6 @@ class ApplicationServicesHandler:
|
||||
device_id,
|
||||
), messages in recipient_device_to_messages.items():
|
||||
for message_json in messages:
|
||||
# Remove 'message_id' from the to-device message, as it's an internal ID
|
||||
message_json.pop("message_id", None)
|
||||
|
||||
message_payload.append(
|
||||
{
|
||||
"to_user_id": user_id,
|
||||
@@ -615,8 +612,8 @@ class ApplicationServicesHandler:
|
||||
)
|
||||
|
||||
# Fetch the users who have modified their device list since then.
|
||||
users_with_changed_device_lists = (
|
||||
await self.store.get_users_whose_devices_changed(from_key, to_key=new_key)
|
||||
users_with_changed_device_lists = await self.store.get_all_devices_changed(
|
||||
from_key, to_key=new_key
|
||||
)
|
||||
|
||||
# Filter out any users the application service is not interested in
|
||||
|
||||
@@ -996,7 +996,7 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
|
||||
# Check if we are partially joining any rooms. If so we need to store
|
||||
# all device list updates so that we can handle them correctly once we
|
||||
# know who is in the room.
|
||||
# TODO(faster joins): this fetches and processes a bunch of data that we don't
|
||||
# TODO(faster_joins): this fetches and processes a bunch of data that we don't
|
||||
# use. Could be replaced by a tighter query e.g.
|
||||
# SELECT EXISTS(SELECT 1 FROM partial_state_rooms)
|
||||
partial_rooms = await self.store.get_partial_state_room_resync_info()
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Any, Dict
|
||||
|
||||
from synapse.api.constants import EduTypes, ToDeviceEventTypes
|
||||
from synapse.api.constants import EduTypes, EventContentFields, ToDeviceEventTypes
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.api.ratelimiting import Ratelimiter
|
||||
from synapse.logging.context import run_in_background
|
||||
@@ -216,14 +216,24 @@ class DeviceMessageHandler:
|
||||
"""
|
||||
sender_user_id = requester.user.to_string()
|
||||
|
||||
message_id = random_string(16)
|
||||
set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
|
||||
|
||||
log_kv({"number_of_to_device_messages": len(messages)})
|
||||
set_tag("sender", sender_user_id)
|
||||
set_tag(SynapseTags.TO_DEVICE_TYPE, message_type)
|
||||
set_tag(SynapseTags.TO_DEVICE_SENDER, sender_user_id)
|
||||
local_messages = {}
|
||||
remote_messages: Dict[str, Dict[str, Dict[str, JsonDict]]] = {}
|
||||
for user_id, by_device in messages.items():
|
||||
# add an opentracing log entry for each message
|
||||
for device_id, message_content in by_device.items():
|
||||
log_kv(
|
||||
{
|
||||
"event": "send_to_device_message",
|
||||
"user_id": user_id,
|
||||
"device_id": device_id,
|
||||
EventContentFields.TO_DEVICE_MSGID: message_content.get(
|
||||
EventContentFields.TO_DEVICE_MSGID
|
||||
),
|
||||
}
|
||||
)
|
||||
|
||||
# Ratelimit local cross-user key requests by the sending device.
|
||||
if (
|
||||
message_type == ToDeviceEventTypes.RoomKeyRequest
|
||||
@@ -233,6 +243,7 @@ class DeviceMessageHandler:
|
||||
requester, (sender_user_id, requester.device_id)
|
||||
)
|
||||
if not allowed:
|
||||
log_kv({"message": f"dropping key requests to {user_id}"})
|
||||
logger.info(
|
||||
"Dropping room_key_request from %s to %s due to rate limit",
|
||||
sender_user_id,
|
||||
@@ -247,18 +258,11 @@ class DeviceMessageHandler:
|
||||
"content": message_content,
|
||||
"type": message_type,
|
||||
"sender": sender_user_id,
|
||||
"message_id": message_id,
|
||||
}
|
||||
for device_id, message_content in by_device.items()
|
||||
}
|
||||
if messages_by_device:
|
||||
local_messages[user_id] = messages_by_device
|
||||
log_kv(
|
||||
{
|
||||
"user_id": user_id,
|
||||
"device_id": list(messages_by_device),
|
||||
}
|
||||
)
|
||||
else:
|
||||
destination = get_domain_from_id(user_id)
|
||||
remote_messages.setdefault(destination, {})[user_id] = by_device
|
||||
@@ -267,7 +271,11 @@ class DeviceMessageHandler:
|
||||
|
||||
remote_edu_contents = {}
|
||||
for destination, messages in remote_messages.items():
|
||||
log_kv({"destination": destination})
|
||||
# The EDU contains a "message_id" property which is used for
|
||||
# idempotence. Make up a random one.
|
||||
message_id = random_string(16)
|
||||
log_kv({"destination": destination, "message_id": message_id})
|
||||
|
||||
remote_edu_contents[destination] = {
|
||||
"messages": messages,
|
||||
"sender": sender_user_id,
|
||||
|
||||
@@ -70,8 +70,8 @@ from synapse.replication.http.federation import (
|
||||
)
|
||||
from synapse.storage.databases.main.events import PartialStateConflictError
|
||||
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
|
||||
from synapse.storage.state import StateFilter
|
||||
from synapse.types import JsonDict, get_domain_from_id
|
||||
from synapse.types.state import StateFilter
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.retryutils import NotRetryingDestination
|
||||
from synapse.visibility import filter_events_for_server
|
||||
@@ -152,6 +152,7 @@ class FederationHandler:
|
||||
self._federation_event_handler = hs.get_federation_event_handler()
|
||||
self._device_handler = hs.get_device_handler()
|
||||
self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator()
|
||||
self._notifier = hs.get_notifier()
|
||||
|
||||
self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client(
|
||||
hs
|
||||
@@ -1692,6 +1693,9 @@ class FederationHandler:
|
||||
self._storage_controllers.state.notify_room_un_partial_stated(
|
||||
room_id
|
||||
)
|
||||
# Poke the notifier so that other workers see the write to
|
||||
# the un-partial-stated rooms stream.
|
||||
self._notifier.notify_replication()
|
||||
|
||||
# TODO(faster_joins) update room stats and user directory?
|
||||
# https://github.com/matrix-org/synapse/issues/12814
|
||||
|
||||
@@ -43,6 +43,7 @@ from synapse.api.constants import (
|
||||
from synapse.api.errors import (
|
||||
AuthError,
|
||||
Codes,
|
||||
EventSizeError,
|
||||
FederationError,
|
||||
FederationPullAttemptBackoffError,
|
||||
HttpResponseException,
|
||||
@@ -75,7 +76,6 @@ from synapse.replication.http.federation import (
|
||||
from synapse.state import StateResolutionStore
|
||||
from synapse.storage.databases.main.events import PartialStateConflictError
|
||||
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
|
||||
from synapse.storage.state import StateFilter
|
||||
from synapse.types import (
|
||||
PersistedEventPosition,
|
||||
RoomStreamToken,
|
||||
@@ -83,6 +83,7 @@ from synapse.types import (
|
||||
UserID,
|
||||
get_domain_from_id,
|
||||
)
|
||||
from synapse.types.state import StateFilter
|
||||
from synapse.util.async_helpers import Linearizer, concurrently_execute
|
||||
from synapse.util.iterutils import batch_iter
|
||||
from synapse.util.retryutils import NotRetryingDestination
|
||||
@@ -1736,6 +1737,15 @@ class FederationEventHandler:
|
||||
except AuthError as e:
|
||||
logger.warning("Rejecting %r because %s", event, e)
|
||||
context.rejected = RejectedReason.AUTH_ERROR
|
||||
except EventSizeError as e:
|
||||
if e.unpersistable:
|
||||
# This event is completely unpersistable.
|
||||
raise e
|
||||
# Otherwise, we are somewhat lenient and just persist the event
|
||||
# as rejected, for moderate compatibility with older Synapse
|
||||
# versions.
|
||||
logger.warning("While validating received event %r: %s", event, e)
|
||||
context.rejected = RejectedReason.OVERSIZED_EVENT
|
||||
|
||||
events_and_contexts_to_persist.append((event, context))
|
||||
|
||||
@@ -1781,6 +1791,16 @@ class FederationEventHandler:
|
||||
# TODO: use a different rejected reason here?
|
||||
context.rejected = RejectedReason.AUTH_ERROR
|
||||
return
|
||||
except EventSizeError as e:
|
||||
if e.unpersistable:
|
||||
# This event is completely unpersistable.
|
||||
raise e
|
||||
# Otherwise, we are somewhat lenient and just persist the event
|
||||
# as rejected, for moderate compatibility with older Synapse
|
||||
# versions.
|
||||
logger.warning("While validating received event %r: %s", event, e)
|
||||
context.rejected = RejectedReason.OVERSIZED_EVENT
|
||||
return
|
||||
|
||||
# next, check that we have all of the event's auth events.
|
||||
#
|
||||
|
||||
+19
-12
@@ -50,6 +50,7 @@ from synapse.event_auth import validate_event_for_room_version
|
||||
from synapse.events import EventBase, relation_from_event
|
||||
from synapse.events.builder import EventBuilder
|
||||
from synapse.events.snapshot import EventContext
|
||||
from synapse.events.utils import maybe_upsert_event_field
|
||||
from synapse.events.validator import EventValidator
|
||||
from synapse.handlers.directory import DirectoryHandler
|
||||
from synapse.logging import opentracing
|
||||
@@ -59,7 +60,6 @@ from synapse.replication.http.send_event import ReplicationSendEventRestServlet
|
||||
from synapse.replication.http.send_events import ReplicationSendEventsRestServlet
|
||||
from synapse.storage.databases.main.events import PartialStateConflictError
|
||||
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
|
||||
from synapse.storage.state import StateFilter
|
||||
from synapse.types import (
|
||||
MutableStateMap,
|
||||
PersistedEventPosition,
|
||||
@@ -70,6 +70,7 @@ from synapse.types import (
|
||||
UserID,
|
||||
create_requester,
|
||||
)
|
||||
from synapse.types.state import StateFilter
|
||||
from synapse.util import json_decoder, json_encoder, log_failure, unwrapFirstError
|
||||
from synapse.util.async_helpers import Linearizer, gather_results
|
||||
from synapse.util.caches.expiringcache import ExpiringCache
|
||||
@@ -1739,12 +1740,15 @@ class EventCreationHandler:
|
||||
|
||||
if event.type == EventTypes.Member:
|
||||
if event.content["membership"] == Membership.INVITE:
|
||||
event.unsigned[
|
||||
"invite_room_state"
|
||||
] = await self.store.get_stripped_room_state_from_event_context(
|
||||
context,
|
||||
self.room_prejoin_state_types,
|
||||
membership_user_id=event.sender,
|
||||
maybe_upsert_event_field(
|
||||
event,
|
||||
event.unsigned,
|
||||
"invite_room_state",
|
||||
await self.store.get_stripped_room_state_from_event_context(
|
||||
context,
|
||||
self.room_prejoin_state_types,
|
||||
membership_user_id=event.sender,
|
||||
),
|
||||
)
|
||||
|
||||
invitee = UserID.from_string(event.state_key)
|
||||
@@ -1762,11 +1766,14 @@ class EventCreationHandler:
|
||||
event.signatures.update(returned_invite.signatures)
|
||||
|
||||
if event.content["membership"] == Membership.KNOCK:
|
||||
event.unsigned[
|
||||
"knock_room_state"
|
||||
] = await self.store.get_stripped_room_state_from_event_context(
|
||||
context,
|
||||
self.room_prejoin_state_types,
|
||||
maybe_upsert_event_field(
|
||||
event,
|
||||
event.unsigned,
|
||||
"knock_room_state",
|
||||
await self.store.get_stripped_room_state_from_event_context(
|
||||
context,
|
||||
self.room_prejoin_state_types,
|
||||
),
|
||||
)
|
||||
|
||||
if event.type == EventTypes.Redaction:
|
||||
|
||||
@@ -27,9 +27,9 @@ from synapse.handlers.room import ShutdownRoomResponse
|
||||
from synapse.logging.opentracing import trace
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.rest.admin._base import assert_user_is_admin
|
||||
from synapse.storage.state import StateFilter
|
||||
from synapse.streams.config import PaginationConfig
|
||||
from synapse.types import JsonDict, Requester, StreamKeyType
|
||||
from synapse.types.state import StateFilter
|
||||
from synapse.util.async_helpers import ReadWriteLock
|
||||
from synapse.util.stringutils import random_string
|
||||
from synapse.visibility import filter_events_for_client
|
||||
|
||||
@@ -1692,10 +1692,12 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
|
||||
|
||||
if from_key is not None:
|
||||
# First get all users that have had a presence update
|
||||
updated_users = stream_change_cache.get_all_entities_changed(from_key)
|
||||
result = stream_change_cache.get_all_entities_changed(from_key)
|
||||
|
||||
# Cross-reference users we're interested in with those that have had updates.
|
||||
if updated_users is not None:
|
||||
if result.hit:
|
||||
updated_users = result.entities
|
||||
|
||||
# If we have the full list of changes for presence we can
|
||||
# simply check which ones share a room with the user.
|
||||
get_updates_counter.labels("stream").inc()
|
||||
@@ -1767,9 +1769,9 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
|
||||
updated_users = None
|
||||
if from_key:
|
||||
# Only return updates since the last sync
|
||||
updated_users = self.store.presence_stream_cache.get_all_entities_changed(
|
||||
from_key
|
||||
)
|
||||
result = self.store.presence_stream_cache.get_all_entities_changed(from_key)
|
||||
if result.hit:
|
||||
updated_users = result.entities
|
||||
|
||||
if updated_users is not None:
|
||||
# Get the actual presence update for each change
|
||||
|
||||
@@ -46,8 +46,8 @@ from synapse.replication.http.register import (
|
||||
ReplicationRegisterServlet,
|
||||
)
|
||||
from synapse.spam_checker_api import RegistrationBehaviour
|
||||
from synapse.storage.state import StateFilter
|
||||
from synapse.types import RoomAlias, UserID, create_requester
|
||||
from synapse.types.state import StateFilter
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
@@ -62,7 +62,6 @@ from synapse.events.utils import copy_and_fixup_power_levels_contents
|
||||
from synapse.handlers.relations import BundledAggregations
|
||||
from synapse.module_api import NOT_SPAM
|
||||
from synapse.rest.admin._base import assert_user_is_admin
|
||||
from synapse.storage.state import StateFilter
|
||||
from synapse.streams import EventSource
|
||||
from synapse.types import (
|
||||
JsonDict,
|
||||
@@ -77,6 +76,7 @@ from synapse.types import (
|
||||
UserID,
|
||||
create_requester,
|
||||
)
|
||||
from synapse.types.state import StateFilter
|
||||
from synapse.util import stringutils
|
||||
from synapse.util.caches.response_cache import ResponseCache
|
||||
from synapse.util.stringutils import parse_and_validate_server_name
|
||||
|
||||
@@ -34,7 +34,6 @@ from synapse.events.snapshot import EventContext
|
||||
from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN
|
||||
from synapse.logging import opentracing
|
||||
from synapse.module_api import NOT_SPAM
|
||||
from synapse.storage.state import StateFilter
|
||||
from synapse.types import (
|
||||
JsonDict,
|
||||
Requester,
|
||||
@@ -45,6 +44,7 @@ from synapse.types import (
|
||||
create_requester,
|
||||
get_domain_from_id,
|
||||
)
|
||||
from synapse.types.state import StateFilter
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.distributor import user_left_room
|
||||
|
||||
|
||||
@@ -23,8 +23,8 @@ from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.api.errors import NotFoundError, SynapseError
|
||||
from synapse.api.filtering import Filter
|
||||
from synapse.events import EventBase
|
||||
from synapse.storage.state import StateFilter
|
||||
from synapse.types import JsonDict, StreamKeyType, UserID
|
||||
from synapse.types.state import StateFilter
|
||||
from synapse.visibility import filter_events_for_client
|
||||
|
||||
if TYPE_CHECKING:
|
||||
|
||||
+24
-10
@@ -31,19 +31,24 @@ from typing import (
|
||||
import attr
|
||||
from prometheus_client import Counter
|
||||
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.api.constants import EventContentFields, EventTypes, Membership
|
||||
from synapse.api.filtering import FilterCollection
|
||||
from synapse.api.presence import UserPresenceState
|
||||
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
|
||||
from synapse.events import EventBase
|
||||
from synapse.handlers.relations import BundledAggregations
|
||||
from synapse.logging.context import current_context
|
||||
from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span
|
||||
from synapse.logging.opentracing import (
|
||||
SynapseTags,
|
||||
log_kv,
|
||||
set_tag,
|
||||
start_active_span,
|
||||
trace,
|
||||
)
|
||||
from synapse.push.clientformat import format_push_rules_for_user
|
||||
from synapse.storage.databases.main.event_push_actions import RoomNotifCounts
|
||||
from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
|
||||
from synapse.storage.roommember import MemberSummary
|
||||
from synapse.storage.state import StateFilter
|
||||
from synapse.types import (
|
||||
DeviceListUpdates,
|
||||
JsonDict,
|
||||
@@ -55,6 +60,7 @@ from synapse.types import (
|
||||
StreamToken,
|
||||
UserID,
|
||||
)
|
||||
from synapse.types.state import StateFilter
|
||||
from synapse.util.async_helpers import concurrently_execute
|
||||
from synapse.util.caches.expiringcache import ExpiringCache
|
||||
from synapse.util.caches.lrucache import LruCache
|
||||
@@ -1528,10 +1534,12 @@ class SyncHandler:
|
||||
#
|
||||
# If we don't have that info cached then we get all the users that
|
||||
# share a room with our user and check if those users have changed.
|
||||
changed_users = self.store.get_cached_device_list_changes(
|
||||
cache_result = self.store.get_cached_device_list_changes(
|
||||
since_token.device_list_key
|
||||
)
|
||||
if changed_users is not None:
|
||||
if cache_result.hit:
|
||||
changed_users = cache_result.entities
|
||||
|
||||
result = await self.store.get_rooms_for_users(changed_users)
|
||||
|
||||
for changed_user_id, entries in result.items():
|
||||
@@ -1584,6 +1592,7 @@ class SyncHandler:
|
||||
else:
|
||||
return DeviceListUpdates()
|
||||
|
||||
@trace
|
||||
async def _generate_sync_entry_for_to_device(
|
||||
self, sync_result_builder: "SyncResultBuilder"
|
||||
) -> None:
|
||||
@@ -1603,11 +1612,16 @@ class SyncHandler:
|
||||
)
|
||||
|
||||
for message in messages:
|
||||
# We pop here as we shouldn't be sending the message ID down
|
||||
# `/sync`
|
||||
message_id = message.pop("message_id", None)
|
||||
if message_id:
|
||||
set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
|
||||
log_kv(
|
||||
{
|
||||
"event": "to_device_message",
|
||||
"sender": message["sender"],
|
||||
"type": message["type"],
|
||||
EventContentFields.TO_DEVICE_MSGID: message["content"].get(
|
||||
EventContentFields.TO_DEVICE_MSGID
|
||||
),
|
||||
}
|
||||
)
|
||||
|
||||
logger.debug(
|
||||
"Returning %d to-device messages between %d and %d (current token: %d)",
|
||||
|
||||
@@ -420,11 +420,11 @@ class TypingWriterHandler(FollowerTypingHandler):
|
||||
if last_id == current_id:
|
||||
return [], current_id, False
|
||||
|
||||
changed_rooms: Optional[
|
||||
Iterable[str]
|
||||
] = self._typing_stream_change_cache.get_all_entities_changed(last_id)
|
||||
result = self._typing_stream_change_cache.get_all_entities_changed(last_id)
|
||||
|
||||
if changed_rooms is None:
|
||||
if result.hit:
|
||||
changed_rooms: Iterable[str] = result.entities
|
||||
else:
|
||||
changed_rooms = self._room_serials
|
||||
|
||||
rows = []
|
||||
|
||||
+18
-1
@@ -577,7 +577,24 @@ def _unrecognised_request_handler(request: Request) -> NoReturn:
|
||||
Args:
|
||||
request: Unused, but passed in to match the signature of ServletCallback.
|
||||
"""
|
||||
raise UnrecognizedRequestError()
|
||||
raise UnrecognizedRequestError(code=404)
|
||||
|
||||
|
||||
class UnrecognizedRequestResource(resource.Resource):
|
||||
"""
|
||||
Similar to twisted.web.resource.NoResource, but returns a JSON 404 with an
|
||||
errcode of M_UNRECOGNIZED.
|
||||
"""
|
||||
|
||||
def render(self, request: SynapseRequest) -> int:
|
||||
f = failure.Failure(UnrecognizedRequestError(code=404))
|
||||
return_json_error(f, request, None)
|
||||
# A response has already been sent but Twisted requires either NOT_DONE_YET
|
||||
# or the response bytes as a return value.
|
||||
return NOT_DONE_YET
|
||||
|
||||
def getChild(self, name: str, request: Request) -> resource.Resource:
|
||||
return self
|
||||
|
||||
|
||||
class RootRedirect(resource.Resource):
|
||||
|
||||
@@ -292,8 +292,15 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SynapseTags:
|
||||
# The message ID of any to_device message processed
|
||||
TO_DEVICE_MESSAGE_ID = "to_device.message_id"
|
||||
# The message ID of any to_device EDU processed
|
||||
TO_DEVICE_EDU_ID = "to_device.edu_id"
|
||||
|
||||
# Details about to-device messages
|
||||
TO_DEVICE_TYPE = "to_device.type"
|
||||
TO_DEVICE_SENDER = "to_device.sender"
|
||||
TO_DEVICE_RECIPIENT = "to_device.recipient"
|
||||
TO_DEVICE_RECIPIENT_DEVICE = "to_device.recipient_device"
|
||||
TO_DEVICE_MSGID = "to_device.msgid" # client-generated ID
|
||||
|
||||
# Whether the sync response has new data to be returned to the client.
|
||||
SYNC_RESULT = "sync.new_data"
|
||||
|
||||
@@ -111,7 +111,6 @@ from synapse.storage.background_updates import (
|
||||
)
|
||||
from synapse.storage.database import DatabasePool, LoggingTransaction
|
||||
from synapse.storage.databases.main.roommember import ProfileInfo
|
||||
from synapse.storage.state import StateFilter
|
||||
from synapse.types import (
|
||||
DomainSpecificString,
|
||||
JsonDict,
|
||||
@@ -124,6 +123,7 @@ from synapse.types import (
|
||||
UserProfile,
|
||||
create_requester,
|
||||
)
|
||||
from synapse.types.state import StateFilter
|
||||
from synapse.util import Clock
|
||||
from synapse.util.async_helpers import maybe_awaitable
|
||||
from synapse.util.caches.descriptors import CachedFunction, cached
|
||||
|
||||
@@ -35,8 +35,8 @@ from synapse.events import EventBase, relation_from_event
|
||||
from synapse.events.snapshot import EventContext
|
||||
from synapse.state import POWER_KEY
|
||||
from synapse.storage.databases.main.roommember import EventIdMembership
|
||||
from synapse.storage.state import StateFilter
|
||||
from synapse.synapse_rust.push import FilteredPushRules, PushRuleEvaluator
|
||||
from synapse.types.state import StateFilter
|
||||
from synapse.util.caches import register_cache
|
||||
from synapse.util.metrics import measure_func
|
||||
from synapse.visibility import filter_event_for_clients_with_state
|
||||
@@ -342,10 +342,6 @@ class BulkPushRuleEvaluator:
|
||||
for user_id, level in notification_levels.items():
|
||||
notification_levels[user_id] = int(level)
|
||||
|
||||
room_version_features = event.room_version.msc3931_push_features
|
||||
if not room_version_features:
|
||||
room_version_features = []
|
||||
|
||||
evaluator = PushRuleEvaluator(
|
||||
_flatten_dict(event, room_version=event.room_version),
|
||||
room_member_count,
|
||||
@@ -353,7 +349,7 @@ class BulkPushRuleEvaluator:
|
||||
notification_levels,
|
||||
related_events,
|
||||
self._related_event_match_enabled,
|
||||
room_version_features,
|
||||
event.room_version.msc3931_push_features,
|
||||
self.hs.config.experimental.msc1767_enabled, # MSC3931 flag
|
||||
)
|
||||
|
||||
|
||||
@@ -37,8 +37,8 @@ from synapse.push.push_types import (
|
||||
TemplateVars,
|
||||
)
|
||||
from synapse.storage.databases.main.event_push_actions import EmailPushAction
|
||||
from synapse.storage.state import StateFilter
|
||||
from synapse.types import StateMap, UserID
|
||||
from synapse.types.state import StateFilter
|
||||
from synapse.util.async_helpers import concurrently_execute
|
||||
from synapse.visibility import filter_events_for_client
|
||||
|
||||
|
||||
@@ -36,12 +36,14 @@ from synapse.replication.tcp.streams import (
|
||||
TagAccountDataStream,
|
||||
ToDeviceStream,
|
||||
TypingStream,
|
||||
UnPartialStatedRoomStream,
|
||||
)
|
||||
from synapse.replication.tcp.streams.events import (
|
||||
EventsStream,
|
||||
EventsStreamEventRow,
|
||||
EventsStreamRow,
|
||||
)
|
||||
from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStreamRow
|
||||
from synapse.types import PersistedEventPosition, ReadReceipt, StreamKeyType, UserID
|
||||
from synapse.util.async_helpers import Linearizer, timeout_deferred
|
||||
from synapse.util.metrics import Measure
|
||||
@@ -117,6 +119,7 @@ class ReplicationDataHandler:
|
||||
self._streams = hs.get_replication_streams()
|
||||
self._instance_name = hs.get_instance_name()
|
||||
self._typing_handler = hs.get_typing_handler()
|
||||
self._state_storage_controller = hs.get_storage_controllers().state
|
||||
|
||||
self._notify_pushers = hs.config.worker.start_pushers
|
||||
self._pusher_pool = hs.get_pusherpool()
|
||||
@@ -236,6 +239,14 @@ class ReplicationDataHandler:
|
||||
self.notifier.notify_user_joined_room(
|
||||
row.data.event_id, row.data.room_id
|
||||
)
|
||||
elif stream_name == UnPartialStatedRoomStream.NAME:
|
||||
for row in rows:
|
||||
assert isinstance(row, UnPartialStatedRoomStreamRow)
|
||||
|
||||
# Wake up any tasks waiting for the room to be un-partial-stated.
|
||||
self._state_storage_controller.notify_room_un_partial_stated(
|
||||
row.room_id
|
||||
)
|
||||
|
||||
await self._presence_handler.process_replication_rows(
|
||||
stream_name, instance_name, token, rows
|
||||
|
||||
@@ -42,6 +42,7 @@ from synapse.replication.tcp.streams._base import (
|
||||
)
|
||||
from synapse.replication.tcp.streams.events import EventsStream
|
||||
from synapse.replication.tcp.streams.federation import FederationStream
|
||||
from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStream
|
||||
|
||||
STREAMS_MAP = {
|
||||
stream.NAME: stream
|
||||
@@ -61,6 +62,7 @@ STREAMS_MAP = {
|
||||
TagAccountDataStream,
|
||||
AccountDataStream,
|
||||
UserSignatureStream,
|
||||
UnPartialStatedRoomStream,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -80,4 +82,5 @@ __all__ = [
|
||||
"TagAccountDataStream",
|
||||
"AccountDataStream",
|
||||
"UserSignatureStream",
|
||||
"UnPartialStatedRoomStream",
|
||||
]
|
||||
|
||||
@@ -0,0 +1,48 @@
|
||||
# Copyright 2022 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import attr
|
||||
|
||||
from synapse.replication.tcp.streams import Stream
|
||||
from synapse.replication.tcp.streams._base import current_token_without_instance
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class UnPartialStatedRoomStreamRow:
|
||||
# ID of the room that has been un-partial-stated.
|
||||
room_id: str
|
||||
|
||||
|
||||
class UnPartialStatedRoomStream(Stream):
|
||||
"""
|
||||
Stream to notify about rooms becoming un-partial-stated;
|
||||
that is, when the background sync finishes such that we now have full state for
|
||||
the room.
|
||||
"""
|
||||
|
||||
NAME = "un_partial_stated_room"
|
||||
ROW_TYPE = UnPartialStatedRoomStreamRow
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
store = hs.get_datastores().main
|
||||
super().__init__(
|
||||
hs.get_instance_name(),
|
||||
# TODO(faster_joins, multiple writers): we need to account for instance names
|
||||
current_token_without_instance(store.get_un_partial_stated_rooms_token),
|
||||
store.get_un_partial_stated_rooms_from_stream,
|
||||
)
|
||||
@@ -13,13 +13,13 @@
|
||||
<body>
|
||||
<header class="mx_Header">
|
||||
{% if app_name == "Riot" %}
|
||||
<img src="http://riot.im/img/external/riot-logo-email.png" width="83" height="83" alt="[Riot]"/>
|
||||
<img src="https://riot.im/img/external/riot-logo-email.png" width="83" height="83" alt="[Riot]"/>
|
||||
{% elif app_name == "Vector" %}
|
||||
<img src="http://matrix.org/img/vector-logo-email.png" width="64" height="83" alt="[Vector]"/>
|
||||
<img src="https://matrix.org/img/vector-logo-email.png" width="64" height="83" alt="[Vector]"/>
|
||||
{% elif app_name == "Element" %}
|
||||
<img src="https://static.element.io/images/email-logo.png" width="83" height="83" alt="[Element]"/>
|
||||
{% else %}
|
||||
<img src="http://matrix.org/img/matrix-120x51.png" width="120" height="51" alt="[matrix]"/>
|
||||
<img src="https://matrix.org/img/matrix-120x51.png" width="120" height="51" alt="[matrix]"/>
|
||||
{% endif %}
|
||||
</header>
|
||||
|
||||
|
||||
@@ -21,13 +21,13 @@
|
||||
</td>
|
||||
<td class="logo">
|
||||
{% if app_name == "Riot" %}
|
||||
<img src="http://riot.im/img/external/riot-logo-email.png" width="83" height="83" alt="[Riot]"/>
|
||||
<img src="https://riot.im/img/external/riot-logo-email.png" width="83" height="83" alt="[Riot]"/>
|
||||
{% elif app_name == "Vector" %}
|
||||
<img src="http://matrix.org/img/vector-logo-email.png" width="64" height="83" alt="[Vector]"/>
|
||||
<img src="https://matrix.org/img/vector-logo-email.png" width="64" height="83" alt="[Vector]"/>
|
||||
{% elif app_name == "Element" %}
|
||||
<img src="https://static.element.io/images/email-logo.png" width="83" height="83" alt="[Element]"/>
|
||||
{% else %}
|
||||
<img src="http://matrix.org/img/matrix-120x51.png" width="120" height="51" alt="[matrix]"/>
|
||||
<img src="https://matrix.org/img/matrix-120x51.png" width="120" height="51" alt="[matrix]"/>
|
||||
{% endif %}
|
||||
</td>
|
||||
</tr>
|
||||
|
||||
@@ -22,13 +22,13 @@
|
||||
</td>
|
||||
<td class="logo">
|
||||
{%- if app_name == "Riot" %}
|
||||
<img src="http://riot.im/img/external/riot-logo-email.png" width="83" height="83" alt="[Riot]"/>
|
||||
<img src="https://riot.im/img/external/riot-logo-email.png" width="83" height="83" alt="[Riot]"/>
|
||||
{%- elif app_name == "Vector" %}
|
||||
<img src="http://matrix.org/img/vector-logo-email.png" width="64" height="83" alt="[Vector]"/>
|
||||
<img src="https://matrix.org/img/vector-logo-email.png" width="64" height="83" alt="[Vector]"/>
|
||||
{%- elif app_name == "Element" %}
|
||||
<img src="https://static.element.io/images/email-logo.png" width="83" height="83" alt="[Element]"/>
|
||||
{%- else %}
|
||||
<img src="http://matrix.org/img/matrix-120x51.png" width="120" height="51" alt="[matrix]"/>
|
||||
<img src="https://matrix.org/img/matrix-120x51.png" width="120" height="51" alt="[matrix]"/>
|
||||
{%- endif %}
|
||||
</td>
|
||||
</tr>
|
||||
|
||||
@@ -34,9 +34,9 @@ from synapse.rest.admin._base import (
|
||||
assert_user_is_admin,
|
||||
)
|
||||
from synapse.storage.databases.main.room import RoomSortOrder
|
||||
from synapse.storage.state import StateFilter
|
||||
from synapse.streams.config import PaginationConfig
|
||||
from synapse.types import JsonDict, RoomID, UserID, create_requester
|
||||
from synapse.types.state import StateFilter
|
||||
from synapse.util import json_decoder
|
||||
|
||||
if TYPE_CHECKING:
|
||||
|
||||
@@ -20,7 +20,7 @@ from synapse.api.errors import Codes, SynapseError
|
||||
from synapse.http.server import HttpServer
|
||||
from synapse.http.servlet import RestServlet, parse_json_object_from_request
|
||||
from synapse.http.site import SynapseRequest
|
||||
from synapse.types import JsonDict
|
||||
from synapse.types import EventID, JsonDict, RoomID
|
||||
|
||||
from ._base import client_patterns
|
||||
|
||||
@@ -56,6 +56,9 @@ class ReceiptRestServlet(RestServlet):
|
||||
) -> Tuple[int, JsonDict]:
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
|
||||
if not RoomID.is_valid(room_id) or not event_id.startswith(EventID.SIGIL):
|
||||
raise SynapseError(400, "A valid room ID and event ID must be specified")
|
||||
|
||||
if receipt_type not in self._known_receipt_types:
|
||||
raise SynapseError(
|
||||
400,
|
||||
|
||||
@@ -55,9 +55,9 @@ from synapse.logging.opentracing import set_tag
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.rest.client._base import client_patterns
|
||||
from synapse.rest.client.transactions import HttpTransactionCache
|
||||
from synapse.storage.state import StateFilter
|
||||
from synapse.streams.config import PaginationConfig
|
||||
from synapse.types import JsonDict, StreamToken, ThirdPartyInstanceID, UserID
|
||||
from synapse.types.state import StateFilter
|
||||
from synapse.util import json_decoder
|
||||
from synapse.util.cancellation import cancellable
|
||||
from synapse.util.stringutils import parse_and_validate_server_name, random_string
|
||||
@@ -396,12 +396,7 @@ class JoinRoomAliasServlet(ResolveRoomIdMixin, TransactionRestServlet):
|
||||
) -> Tuple[int, JsonDict]:
|
||||
requester = await self.auth.get_user_by_req(request, allow_guest=True)
|
||||
|
||||
try:
|
||||
content = parse_json_object_from_request(request)
|
||||
except Exception:
|
||||
# Turns out we used to ignore the body entirely, and some clients
|
||||
# cheekily send invalid bodies.
|
||||
content = {}
|
||||
content = parse_json_object_from_request(request, allow_empty_body=True)
|
||||
|
||||
# twisted.web.server.Request.args is incorrectly defined as Optional[Any]
|
||||
args: Dict[bytes, List[bytes]] = request.args # type: ignore
|
||||
@@ -952,12 +947,7 @@ class RoomMembershipRestServlet(TransactionRestServlet):
|
||||
}:
|
||||
raise AuthError(403, "Guest access not allowed")
|
||||
|
||||
try:
|
||||
content = parse_json_object_from_request(request)
|
||||
except Exception:
|
||||
# Turns out we used to ignore the body entirely, and some clients
|
||||
# cheekily send invalid bodies.
|
||||
content = {}
|
||||
content = parse_json_object_from_request(request, allow_empty_body=True)
|
||||
|
||||
if membership_action == "invite" and all(
|
||||
key in content for key in ("medium", "address")
|
||||
|
||||
@@ -46,7 +46,6 @@ class SendToDeviceRestServlet(servlet.RestServlet):
|
||||
def on_PUT(
|
||||
self, request: SynapseRequest, message_type: str, txn_id: str
|
||||
) -> Awaitable[Tuple[int, JsonDict]]:
|
||||
set_tag("message_type", message_type)
|
||||
set_tag("txn_id", txn_id)
|
||||
return self.txns.fetch_or_execute_request(
|
||||
request, self._put, request, message_type, txn_id
|
||||
|
||||
@@ -63,8 +63,8 @@ class UserDirectorySearchRestServlet(RestServlet):
|
||||
|
||||
body = parse_json_object_from_request(request)
|
||||
|
||||
limit = body.get("limit", 10)
|
||||
limit = min(limit, 50)
|
||||
limit = int(body.get("limit", 10))
|
||||
limit = max(min(limit, 50), 0)
|
||||
|
||||
try:
|
||||
search_term = body["search_term"]
|
||||
|
||||
@@ -24,7 +24,6 @@ from matrix_common.types.mxc_uri import MXCUri
|
||||
import twisted.internet.error
|
||||
import twisted.web.http
|
||||
from twisted.internet.defer import Deferred
|
||||
from twisted.web.resource import Resource
|
||||
|
||||
from synapse.api.errors import (
|
||||
FederationDeniedError,
|
||||
@@ -35,6 +34,7 @@ from synapse.api.errors import (
|
||||
)
|
||||
from synapse.config._base import ConfigError
|
||||
from synapse.config.repository import ThumbnailRequirement
|
||||
from synapse.http.server import UnrecognizedRequestResource
|
||||
from synapse.http.site import SynapseRequest
|
||||
from synapse.logging.context import defer_to_thread
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
@@ -1046,7 +1046,7 @@ class MediaRepository:
|
||||
return removed_media, len(removed_media)
|
||||
|
||||
|
||||
class MediaRepositoryResource(Resource):
|
||||
class MediaRepositoryResource(UnrecognizedRequestResource):
|
||||
"""File uploading and downloading.
|
||||
|
||||
Uploads are POSTed to a resource which returns a token which is used to GET
|
||||
|
||||
@@ -44,8 +44,8 @@ from synapse.logging.context import ContextResourceUsage
|
||||
from synapse.replication.http.state import ReplicationUpdateCurrentStateRestServlet
|
||||
from synapse.state import v1, v2
|
||||
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
|
||||
from synapse.storage.state import StateFilter
|
||||
from synapse.types import StateMap
|
||||
from synapse.types.state import StateFilter
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.caches.expiringcache import ExpiringCache
|
||||
from synapse.util.metrics import Measure, measure_func
|
||||
|
||||
@@ -544,6 +544,48 @@ class BackgroundUpdater:
|
||||
The named index will be dropped upon completion of the new index.
|
||||
"""
|
||||
|
||||
async def updater(progress: JsonDict, batch_size: int) -> int:
|
||||
await self.create_index_in_background(
|
||||
index_name=index_name,
|
||||
table=table,
|
||||
columns=columns,
|
||||
where_clause=where_clause,
|
||||
unique=unique,
|
||||
psql_only=psql_only,
|
||||
replaces_index=replaces_index,
|
||||
)
|
||||
await self._end_background_update(update_name)
|
||||
return 1
|
||||
|
||||
self._background_update_handlers[update_name] = _BackgroundUpdateHandler(
|
||||
updater, oneshot=True
|
||||
)
|
||||
|
||||
async def create_index_in_background(
|
||||
self,
|
||||
index_name: str,
|
||||
table: str,
|
||||
columns: Iterable[str],
|
||||
where_clause: Optional[str] = None,
|
||||
unique: bool = False,
|
||||
psql_only: bool = False,
|
||||
replaces_index: Optional[str] = None,
|
||||
) -> None:
|
||||
"""Add an index in the background.
|
||||
|
||||
Args:
|
||||
update_name: update_name to register for
|
||||
index_name: name of index to add
|
||||
table: table to add index to
|
||||
columns: columns/expressions to include in index
|
||||
where_clause: A WHERE clause to specify a partial unique index.
|
||||
unique: true to make a UNIQUE index
|
||||
psql_only: true to only create this index on psql databases (useful
|
||||
for virtual sqlite tables)
|
||||
replaces_index: The name of an index that this index replaces.
|
||||
The named index will be dropped upon completion of the new index.
|
||||
"""
|
||||
|
||||
def create_index_psql(conn: Connection) -> None:
|
||||
conn.rollback()
|
||||
# postgres insists on autocommit for the index
|
||||
@@ -618,16 +660,11 @@ class BackgroundUpdater:
|
||||
else:
|
||||
runner = create_index_sqlite
|
||||
|
||||
async def updater(progress: JsonDict, batch_size: int) -> int:
|
||||
if runner is not None:
|
||||
logger.info("Adding index %s to %s", index_name, table)
|
||||
await self.db_pool.runWithConnection(runner)
|
||||
await self._end_background_update(update_name)
|
||||
return 1
|
||||
if runner is None:
|
||||
return
|
||||
|
||||
self._background_update_handlers[update_name] = _BackgroundUpdateHandler(
|
||||
updater, oneshot=True
|
||||
)
|
||||
logger.info("Adding index %s to %s", index_name, table)
|
||||
await self.db_pool.runWithConnection(runner)
|
||||
|
||||
async def _end_background_update(self, update_name: str) -> None:
|
||||
"""Removes a completed background update task from the queue.
|
||||
|
||||
@@ -58,13 +58,13 @@ from synapse.storage.controllers.state import StateStorageController
|
||||
from synapse.storage.databases import Databases
|
||||
from synapse.storage.databases.main.events import DeltaState
|
||||
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
|
||||
from synapse.storage.state import StateFilter
|
||||
from synapse.types import (
|
||||
PersistedEventPosition,
|
||||
RoomStreamToken,
|
||||
StateMap,
|
||||
get_domain_from_id,
|
||||
)
|
||||
from synapse.types.state import StateFilter
|
||||
from synapse.util.async_helpers import ObservableDeferred, yieldable_gather_results
|
||||
from synapse.util.metrics import Measure
|
||||
|
||||
|
||||
@@ -31,12 +31,12 @@ from synapse.api.constants import EventTypes
|
||||
from synapse.events import EventBase
|
||||
from synapse.logging.opentracing import tag_args, trace
|
||||
from synapse.storage.roommember import ProfileInfo
|
||||
from synapse.storage.state import StateFilter
|
||||
from synapse.storage.util.partial_state_events_tracker import (
|
||||
PartialCurrentStateTracker,
|
||||
PartialStateEventsTracker,
|
||||
)
|
||||
from synapse.types import MutableStateMap, StateMap
|
||||
from synapse.types.state import StateFilter
|
||||
from synapse.util.cancellation import cancellable
|
||||
|
||||
if TYPE_CHECKING:
|
||||
|
||||
@@ -667,7 +667,8 @@ class DatabasePool:
|
||||
)
|
||||
# also check variables referenced in func's closure
|
||||
if inspect.isfunction(func):
|
||||
f = cast(types.FunctionType, func)
|
||||
# Keep the cast for now---it helps PyCharm to understand what `func` is.
|
||||
f = cast(types.FunctionType, func) # type: ignore[redundant-cast]
|
||||
if f.__closure__:
|
||||
for i, cell in enumerate(f.__closure__):
|
||||
if inspect.isgenerator(cell.cell_contents):
|
||||
|
||||
@@ -26,8 +26,15 @@ from typing import (
|
||||
cast,
|
||||
)
|
||||
|
||||
from synapse.api.constants import EventContentFields
|
||||
from synapse.logging import issue9533_logger
|
||||
from synapse.logging.opentracing import log_kv, set_tag, trace
|
||||
from synapse.logging.opentracing import (
|
||||
SynapseTags,
|
||||
log_kv,
|
||||
set_tag,
|
||||
start_active_span,
|
||||
trace,
|
||||
)
|
||||
from synapse.replication.tcp.streams import ToDeviceStream
|
||||
from synapse.storage._base import SQLBaseStore, db_to_json
|
||||
from synapse.storage.database import (
|
||||
@@ -397,6 +404,17 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
(recipient_user_id, recipient_device_id), []
|
||||
).append(message_dict)
|
||||
|
||||
# start a new span for each message, so that we can tag each separately
|
||||
with start_active_span("get_to_device_message"):
|
||||
set_tag(SynapseTags.TO_DEVICE_TYPE, message_dict["type"])
|
||||
set_tag(SynapseTags.TO_DEVICE_SENDER, message_dict["sender"])
|
||||
set_tag(SynapseTags.TO_DEVICE_RECIPIENT, recipient_user_id)
|
||||
set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, recipient_device_id)
|
||||
set_tag(
|
||||
SynapseTags.TO_DEVICE_MSGID,
|
||||
message_dict["content"].get(EventContentFields.TO_DEVICE_MSGID),
|
||||
)
|
||||
|
||||
if limit is not None and rowcount == limit:
|
||||
# We ended up bumping up against the message limit. There may be more messages
|
||||
# to retrieve. Return what we have, as well as the last stream position that
|
||||
@@ -678,12 +696,35 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
],
|
||||
)
|
||||
|
||||
if remote_messages_by_destination:
|
||||
issue9533_logger.debug(
|
||||
"Queued outgoing to-device messages with stream_id %i for %s",
|
||||
stream_id,
|
||||
list(remote_messages_by_destination.keys()),
|
||||
)
|
||||
for destination, edu in remote_messages_by_destination.items():
|
||||
if issue9533_logger.isEnabledFor(logging.DEBUG):
|
||||
issue9533_logger.debug(
|
||||
"Queued outgoing to-device messages with "
|
||||
"stream_id %i, EDU message_id %s, type %s for %s: %s",
|
||||
stream_id,
|
||||
edu["message_id"],
|
||||
edu["type"],
|
||||
destination,
|
||||
[
|
||||
f"{user_id}/{device_id} (msgid "
|
||||
f"{msg.get(EventContentFields.TO_DEVICE_MSGID)})"
|
||||
for (user_id, messages_by_device) in edu["messages"].items()
|
||||
for (device_id, msg) in messages_by_device.items()
|
||||
],
|
||||
)
|
||||
|
||||
for (user_id, messages_by_device) in edu["messages"].items():
|
||||
for (device_id, msg) in messages_by_device.items():
|
||||
with start_active_span("store_outgoing_to_device_message"):
|
||||
set_tag(SynapseTags.TO_DEVICE_EDU_ID, edu["sender"])
|
||||
set_tag(SynapseTags.TO_DEVICE_EDU_ID, edu["message_id"])
|
||||
set_tag(SynapseTags.TO_DEVICE_TYPE, edu["type"])
|
||||
set_tag(SynapseTags.TO_DEVICE_RECIPIENT, user_id)
|
||||
set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, device_id)
|
||||
set_tag(
|
||||
SynapseTags.TO_DEVICE_MSGID,
|
||||
msg.get(EventContentFields.TO_DEVICE_MSGID),
|
||||
)
|
||||
|
||||
async with self._device_inbox_id_gen.get_next() as stream_id:
|
||||
now_ms = self._clock.time_msec()
|
||||
@@ -801,7 +842,19 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
# Only insert into the local inbox if the device exists on
|
||||
# this server
|
||||
device_id = row["device_id"]
|
||||
message_json = json_encoder.encode(messages_by_device[device_id])
|
||||
|
||||
with start_active_span("serialise_to_device_message"):
|
||||
msg = messages_by_device[device_id]
|
||||
set_tag(SynapseTags.TO_DEVICE_TYPE, msg["type"])
|
||||
set_tag(SynapseTags.TO_DEVICE_SENDER, msg["sender"])
|
||||
set_tag(SynapseTags.TO_DEVICE_RECIPIENT, user_id)
|
||||
set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, device_id)
|
||||
set_tag(
|
||||
SynapseTags.TO_DEVICE_MSGID,
|
||||
msg["content"].get(EventContentFields.TO_DEVICE_MSGID),
|
||||
)
|
||||
message_json = json_encoder.encode(msg)
|
||||
|
||||
messages_json_for_user[device_id] = message_json
|
||||
|
||||
if messages_json_for_user:
|
||||
@@ -821,15 +874,20 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
],
|
||||
)
|
||||
|
||||
issue9533_logger.debug(
|
||||
"Stored to-device messages with stream_id %i for %s",
|
||||
stream_id,
|
||||
[
|
||||
(user_id, device_id)
|
||||
for (user_id, messages_by_device) in local_by_user_then_device.items()
|
||||
for device_id in messages_by_device.keys()
|
||||
],
|
||||
)
|
||||
if issue9533_logger.isEnabledFor(logging.DEBUG):
|
||||
issue9533_logger.debug(
|
||||
"Stored to-device messages with stream_id %i: %s",
|
||||
stream_id,
|
||||
[
|
||||
f"{user_id}/{device_id} (msgid "
|
||||
f"{msg['content'].get(EventContentFields.TO_DEVICE_MSGID)})"
|
||||
for (
|
||||
user_id,
|
||||
messages_by_device,
|
||||
) in messages_by_user_then_device.items()
|
||||
for (device_id, msg) in messages_by_device.items()
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
|
||||
|
||||
@@ -58,7 +58,10 @@ from synapse.types import JsonDict, get_verify_key_from_cross_signing_key
|
||||
from synapse.util import json_decoder, json_encoder
|
||||
from synapse.util.caches.descriptors import cached, cachedList
|
||||
from synapse.util.caches.lrucache import LruCache
|
||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||
from synapse.util.caches.stream_change_cache import (
|
||||
AllEntitiesChangedResult,
|
||||
StreamChangeCache,
|
||||
)
|
||||
from synapse.util.cancellation import cancellable
|
||||
from synapse.util.iterutils import batch_iter
|
||||
from synapse.util.stringutils import shortstr
|
||||
@@ -799,18 +802,66 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
|
||||
def get_cached_device_list_changes(
|
||||
self,
|
||||
from_key: int,
|
||||
) -> Optional[List[str]]:
|
||||
) -> AllEntitiesChangedResult:
|
||||
"""Get set of users whose devices have changed since `from_key`, or None
|
||||
if that information is not in our cache.
|
||||
"""
|
||||
|
||||
return self._device_list_stream_cache.get_all_entities_changed(from_key)
|
||||
|
||||
@cancellable
|
||||
async def get_all_devices_changed(
|
||||
self,
|
||||
from_key: int,
|
||||
to_key: int,
|
||||
) -> Set[str]:
|
||||
"""Get all users whose devices have changed in the given range.
|
||||
|
||||
Args:
|
||||
from_key: The minimum device lists stream token to query device list
|
||||
changes for, exclusive.
|
||||
to_key: The maximum device lists stream token to query device list
|
||||
changes for, inclusive.
|
||||
|
||||
Returns:
|
||||
The set of user_ids whose devices have changed since `from_key`
|
||||
(exclusive) until `to_key` (inclusive).
|
||||
"""
|
||||
|
||||
result = self._device_list_stream_cache.get_all_entities_changed(from_key)
|
||||
|
||||
if result.hit:
|
||||
# We know which users might have changed devices.
|
||||
if not result.entities:
|
||||
# If no users then we can return early.
|
||||
return set()
|
||||
|
||||
# Otherwise we need to filter down the list
|
||||
return await self.get_users_whose_devices_changed(
|
||||
from_key, result.entities, to_key
|
||||
)
|
||||
|
||||
# If the cache didn't tell us anything, we just need to query the full
|
||||
# range.
|
||||
sql = """
|
||||
SELECT DISTINCT user_id FROM device_lists_stream
|
||||
WHERE ? < stream_id AND stream_id <= ?
|
||||
"""
|
||||
|
||||
rows = await self.db_pool.execute(
|
||||
"get_all_devices_changed",
|
||||
None,
|
||||
sql,
|
||||
from_key,
|
||||
to_key,
|
||||
)
|
||||
return {u for u, in rows}
|
||||
|
||||
@cancellable
|
||||
async def get_users_whose_devices_changed(
|
||||
self,
|
||||
from_key: int,
|
||||
user_ids: Optional[Collection[str]] = None,
|
||||
user_ids: Collection[str],
|
||||
to_key: Optional[int] = None,
|
||||
) -> Set[str]:
|
||||
"""Get set of users whose devices have changed since `from_key` that
|
||||
@@ -830,52 +881,32 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
|
||||
"""
|
||||
# Get set of users who *may* have changed. Users not in the returned
|
||||
# list have definitely not changed.
|
||||
user_ids_to_check: Optional[Collection[str]]
|
||||
if user_ids is None:
|
||||
# Get set of all users that have had device list changes since 'from_key'
|
||||
user_ids_to_check = self._device_list_stream_cache.get_all_entities_changed(
|
||||
from_key
|
||||
)
|
||||
else:
|
||||
# The same as above, but filter results to only those users in 'user_ids'
|
||||
user_ids_to_check = self._device_list_stream_cache.get_entities_changed(
|
||||
user_ids, from_key
|
||||
)
|
||||
user_ids_to_check = self._device_list_stream_cache.get_entities_changed(
|
||||
user_ids, from_key
|
||||
)
|
||||
|
||||
# If an empty set was returned, there's nothing to do.
|
||||
if user_ids_to_check is not None and not user_ids_to_check:
|
||||
if not user_ids_to_check:
|
||||
return set()
|
||||
|
||||
if to_key is None:
|
||||
to_key = self._device_list_id_gen.get_current_token()
|
||||
|
||||
def _get_users_whose_devices_changed_txn(txn: LoggingTransaction) -> Set[str]:
|
||||
stream_id_where_clause = "stream_id > ?"
|
||||
sql_args = [from_key]
|
||||
|
||||
if to_key:
|
||||
stream_id_where_clause += " AND stream_id <= ?"
|
||||
sql_args.append(to_key)
|
||||
|
||||
sql = f"""
|
||||
sql = """
|
||||
SELECT DISTINCT user_id FROM device_lists_stream
|
||||
WHERE {stream_id_where_clause}
|
||||
WHERE ? < stream_id AND stream_id <= ? AND %s
|
||||
"""
|
||||
|
||||
# If the stream change cache gave us no information, fetch *all*
|
||||
# users between the stream IDs.
|
||||
if user_ids_to_check is None:
|
||||
txn.execute(sql, sql_args)
|
||||
return {user_id for user_id, in txn}
|
||||
changes: Set[str] = set()
|
||||
|
||||
# Otherwise, fetch changes for the given users.
|
||||
else:
|
||||
changes: Set[str] = set()
|
||||
|
||||
# Query device changes with a batch of users at a time
|
||||
for chunk in batch_iter(user_ids_to_check, 100):
|
||||
clause, args = make_in_list_sql_clause(
|
||||
txn.database_engine, "user_id", chunk
|
||||
)
|
||||
txn.execute(sql + " AND " + clause, sql_args + args)
|
||||
changes.update(user_id for user_id, in txn)
|
||||
# Query device changes with a batch of users at a time
|
||||
for chunk in batch_iter(user_ids_to_check, 100):
|
||||
clause, args = make_in_list_sql_clause(
|
||||
txn.database_engine, "user_id", chunk
|
||||
)
|
||||
txn.execute(sql % (clause,), [from_key, to_key] + args)
|
||||
changes.update(user_id for user_id, in txn)
|
||||
|
||||
return changes
|
||||
|
||||
|
||||
@@ -140,7 +140,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
|
||||
@cancellable
|
||||
async def get_e2e_device_keys_for_cs_api(
|
||||
self,
|
||||
query_list: List[Tuple[str, Optional[str]]],
|
||||
query_list: Collection[Tuple[str, Optional[str]]],
|
||||
include_displaynames: bool = True,
|
||||
) -> Dict[str, Dict[str, JsonDict]]:
|
||||
"""Fetch a list of device keys, formatted suitably for the C/S API.
|
||||
|
||||
@@ -16,11 +16,11 @@ import logging
|
||||
import threading
|
||||
import weakref
|
||||
from enum import Enum, auto
|
||||
from itertools import chain
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Any,
|
||||
Collection,
|
||||
Container,
|
||||
Dict,
|
||||
Iterable,
|
||||
List,
|
||||
@@ -76,6 +76,7 @@ from synapse.storage.util.id_generators import (
|
||||
)
|
||||
from synapse.storage.util.sequence import build_sequence_generator
|
||||
from synapse.types import JsonDict, get_domain_from_id
|
||||
from synapse.types.state import StateFilter
|
||||
from synapse.util import unwrapFirstError
|
||||
from synapse.util.async_helpers import ObservableDeferred, delay_cancellation
|
||||
from synapse.util.caches.descriptors import cached, cachedList
|
||||
@@ -879,7 +880,7 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
async def get_stripped_room_state_from_event_context(
|
||||
self,
|
||||
context: EventContext,
|
||||
state_types_to_include: Container[str],
|
||||
state_keys_to_include: StateFilter,
|
||||
membership_user_id: Optional[str] = None,
|
||||
) -> List[JsonDict]:
|
||||
"""
|
||||
@@ -892,7 +893,7 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
|
||||
Args:
|
||||
context: The event context to retrieve state of the room from.
|
||||
state_types_to_include: The type of state events to include.
|
||||
state_keys_to_include: The state events to include, for each event type.
|
||||
membership_user_id: An optional user ID to include the stripped membership state
|
||||
events of. This is useful when generating the stripped state of a room for
|
||||
invites. We want to send membership events of the inviter, so that the
|
||||
@@ -901,21 +902,25 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
Returns:
|
||||
A list of dictionaries, each representing a stripped state event from the room.
|
||||
"""
|
||||
current_state_ids = await context.get_current_state_ids()
|
||||
if membership_user_id:
|
||||
types = chain(
|
||||
state_keys_to_include.to_types(),
|
||||
[(EventTypes.Member, membership_user_id)],
|
||||
)
|
||||
filter = StateFilter.from_types(types)
|
||||
else:
|
||||
filter = state_keys_to_include
|
||||
selected_state_ids = await context.get_current_state_ids(filter)
|
||||
|
||||
# We know this event is not an outlier, so this must be
|
||||
# non-None.
|
||||
assert current_state_ids is not None
|
||||
assert selected_state_ids is not None
|
||||
|
||||
# The state to include
|
||||
state_to_include_ids = [
|
||||
e_id
|
||||
for k, e_id in current_state_ids.items()
|
||||
if k[0] in state_types_to_include
|
||||
or (membership_user_id and k == (EventTypes.Member, membership_user_id))
|
||||
]
|
||||
# Confusingly, get_current_state_events may return events that are discarded by
|
||||
# the filter, if they're in context._state_delta_due_to_event. Strip these away.
|
||||
selected_state_ids = filter.filter_state(selected_state_ids)
|
||||
|
||||
state_to_include = await self.get_events(state_to_include_ids)
|
||||
state_to_include = await self.get_events(selected_state_ids.values())
|
||||
|
||||
return [
|
||||
{
|
||||
|
||||
@@ -924,39 +924,6 @@ class ReceiptsBackgroundUpdateStore(SQLBaseStore):
|
||||
|
||||
return batch_size
|
||||
|
||||
async def _create_receipts_index(self, index_name: str, table: str) -> None:
|
||||
"""Adds a unique index on `(room_id, receipt_type, user_id)` to the given
|
||||
receipts table, for non-thread receipts."""
|
||||
|
||||
def _create_index(conn: LoggingDatabaseConnection) -> None:
|
||||
conn.rollback()
|
||||
|
||||
# we have to set autocommit, because postgres refuses to
|
||||
# CREATE INDEX CONCURRENTLY without it.
|
||||
if isinstance(self.database_engine, PostgresEngine):
|
||||
conn.set_session(autocommit=True)
|
||||
|
||||
try:
|
||||
c = conn.cursor()
|
||||
|
||||
# Now that the duplicates are gone, we can create the index.
|
||||
concurrently = (
|
||||
"CONCURRENTLY"
|
||||
if isinstance(self.database_engine, PostgresEngine)
|
||||
else ""
|
||||
)
|
||||
sql = f"""
|
||||
CREATE UNIQUE INDEX {concurrently} {index_name}
|
||||
ON {table}(room_id, receipt_type, user_id)
|
||||
WHERE thread_id IS NULL
|
||||
"""
|
||||
c.execute(sql)
|
||||
finally:
|
||||
if isinstance(self.database_engine, PostgresEngine):
|
||||
conn.set_session(autocommit=False)
|
||||
|
||||
await self.db_pool.runWithConnection(_create_index)
|
||||
|
||||
async def _background_receipts_linearized_unique_index(
|
||||
self, progress: dict, batch_size: int
|
||||
) -> int:
|
||||
@@ -999,9 +966,12 @@ class ReceiptsBackgroundUpdateStore(SQLBaseStore):
|
||||
_remote_duplicate_receipts_txn,
|
||||
)
|
||||
|
||||
await self._create_receipts_index(
|
||||
"receipts_linearized_unique_index",
|
||||
"receipts_linearized",
|
||||
await self.db_pool.updates.create_index_in_background(
|
||||
index_name="receipts_linearized_unique_index",
|
||||
table="receipts_linearized",
|
||||
columns=["room_id", "receipt_type", "user_id"],
|
||||
where_clause="thread_id IS NULL",
|
||||
unique=True,
|
||||
)
|
||||
|
||||
await self.db_pool.updates._end_background_update(
|
||||
@@ -1050,9 +1020,12 @@ class ReceiptsBackgroundUpdateStore(SQLBaseStore):
|
||||
_remote_duplicate_receipts_txn,
|
||||
)
|
||||
|
||||
await self._create_receipts_index(
|
||||
"receipts_graph_unique_index",
|
||||
"receipts_graph",
|
||||
await self.db_pool.updates.create_index_in_background(
|
||||
index_name="receipts_graph_unique_index",
|
||||
table="receipts_graph",
|
||||
columns=["room_id", "receipt_type", "user_id"],
|
||||
where_clause="thread_id IS NULL",
|
||||
unique=True,
|
||||
)
|
||||
|
||||
await self.db_pool.updates._end_background_update(
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
# Copyright 2014-2016 OpenMarket Ltd
|
||||
# Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
# Copyright 2019, 2022 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@@ -50,8 +50,14 @@ from synapse.storage.database import (
|
||||
LoggingTransaction,
|
||||
)
|
||||
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
from synapse.storage.types import Cursor
|
||||
from synapse.storage.util.id_generators import IdGenerator
|
||||
from synapse.storage.util.id_generators import (
|
||||
AbstractStreamIdGenerator,
|
||||
IdGenerator,
|
||||
MultiWriterIdGenerator,
|
||||
StreamIdGenerator,
|
||||
)
|
||||
from synapse.types import JsonDict, RetentionPolicy, ThirdPartyInstanceID
|
||||
from synapse.util import json_encoder
|
||||
from synapse.util.caches.descriptors import cached
|
||||
@@ -114,6 +120,26 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
|
||||
|
||||
self.config: HomeServerConfig = hs.config
|
||||
|
||||
self._un_partial_stated_rooms_stream_id_gen: AbstractStreamIdGenerator
|
||||
|
||||
if isinstance(database.engine, PostgresEngine):
|
||||
self._un_partial_stated_rooms_stream_id_gen = MultiWriterIdGenerator(
|
||||
db_conn=db_conn,
|
||||
db=database,
|
||||
stream_name="un_partial_stated_room_stream",
|
||||
instance_name=self._instance_name,
|
||||
tables=[
|
||||
("un_partial_stated_room_stream", "instance_name", "stream_id")
|
||||
],
|
||||
sequence_name="un_partial_stated_room_stream_sequence",
|
||||
# TODO(faster_joins, multiple writers) Support multiple writers.
|
||||
writers=["master"],
|
||||
)
|
||||
else:
|
||||
self._un_partial_stated_rooms_stream_id_gen = StreamIdGenerator(
|
||||
db_conn, "un_partial_stated_room_stream", "stream_id"
|
||||
)
|
||||
|
||||
async def store_room(
|
||||
self,
|
||||
room_id: str,
|
||||
@@ -1216,70 +1242,6 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
|
||||
|
||||
return room_servers
|
||||
|
||||
async def clear_partial_state_room(self, room_id: str) -> bool:
|
||||
"""Clears the partial state flag for a room.
|
||||
|
||||
Args:
|
||||
room_id: The room whose partial state flag is to be cleared.
|
||||
|
||||
Returns:
|
||||
`True` if the partial state flag has been cleared successfully.
|
||||
|
||||
`False` if the partial state flag could not be cleared because the room
|
||||
still contains events with partial state.
|
||||
"""
|
||||
try:
|
||||
await self.db_pool.runInteraction(
|
||||
"clear_partial_state_room", self._clear_partial_state_room_txn, room_id
|
||||
)
|
||||
return True
|
||||
except self.db_pool.engine.module.IntegrityError as e:
|
||||
# Assume that any `IntegrityError`s are due to partial state events.
|
||||
logger.info(
|
||||
"Exception while clearing lazy partial-state-room %s, retrying: %s",
|
||||
room_id,
|
||||
e,
|
||||
)
|
||||
return False
|
||||
|
||||
def _clear_partial_state_room_txn(
|
||||
self, txn: LoggingTransaction, room_id: str
|
||||
) -> None:
|
||||
DatabasePool.simple_delete_txn(
|
||||
txn,
|
||||
table="partial_state_rooms_servers",
|
||||
keyvalues={"room_id": room_id},
|
||||
)
|
||||
DatabasePool.simple_delete_one_txn(
|
||||
txn,
|
||||
table="partial_state_rooms",
|
||||
keyvalues={"room_id": room_id},
|
||||
)
|
||||
self._invalidate_cache_and_stream(txn, self.is_partial_state_room, (room_id,))
|
||||
self._invalidate_cache_and_stream(
|
||||
txn, self.get_partial_state_servers_at_join, (room_id,)
|
||||
)
|
||||
|
||||
# We now delete anything from `device_lists_remote_pending` with a
|
||||
# stream ID less than the minimum
|
||||
# `partial_state_rooms.device_lists_stream_id`, as we no longer need them.
|
||||
device_lists_stream_id = DatabasePool.simple_select_one_onecol_txn(
|
||||
txn,
|
||||
table="partial_state_rooms",
|
||||
keyvalues={},
|
||||
retcol="MIN(device_lists_stream_id)",
|
||||
allow_none=True,
|
||||
)
|
||||
if device_lists_stream_id is None:
|
||||
# There are no rooms being currently partially joined, so we delete everything.
|
||||
txn.execute("DELETE FROM device_lists_remote_pending")
|
||||
else:
|
||||
sql = """
|
||||
DELETE FROM device_lists_remote_pending
|
||||
WHERE stream_id <= ?
|
||||
"""
|
||||
txn.execute(sql, (device_lists_stream_id,))
|
||||
|
||||
@cached()
|
||||
async def is_partial_state_room(self, room_id: str) -> bool:
|
||||
"""Checks if this room has partial state.
|
||||
@@ -1315,6 +1277,66 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
|
||||
)
|
||||
return result["join_event_id"], result["device_lists_stream_id"]
|
||||
|
||||
def get_un_partial_stated_rooms_token(self) -> int:
|
||||
# TODO(faster_joins, multiple writers): This is inappropriate if there
|
||||
# are multiple writers because workers that don't write often will
|
||||
# hold all readers up.
|
||||
# (See `MultiWriterIdGenerator.get_persisted_upto_position` for an
|
||||
# explanation.)
|
||||
return self._un_partial_stated_rooms_stream_id_gen.get_current_token()
|
||||
|
||||
async def get_un_partial_stated_rooms_from_stream(
|
||||
self, instance_name: str, last_id: int, current_id: int, limit: int
|
||||
) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]:
|
||||
"""Get updates for caches replication stream.
|
||||
|
||||
Args:
|
||||
instance_name: The writer we want to fetch updates from. Unused
|
||||
here since there is only ever one writer.
|
||||
last_id: The token to fetch updates from. Exclusive.
|
||||
current_id: The token to fetch updates up to. Inclusive.
|
||||
limit: The requested limit for the number of rows to return. The
|
||||
function may return more or fewer rows.
|
||||
|
||||
Returns:
|
||||
A tuple consisting of: the updates, a token to use to fetch
|
||||
subsequent updates, and whether we returned fewer rows than exists
|
||||
between the requested tokens due to the limit.
|
||||
|
||||
The token returned can be used in a subsequent call to this
|
||||
function to get further updatees.
|
||||
|
||||
The updates are a list of 2-tuples of stream ID and the row data
|
||||
"""
|
||||
|
||||
if last_id == current_id:
|
||||
return [], current_id, False
|
||||
|
||||
def get_un_partial_stated_rooms_from_stream_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]:
|
||||
sql = """
|
||||
SELECT stream_id, room_id
|
||||
FROM un_partial_stated_room_stream
|
||||
WHERE ? < stream_id AND stream_id <= ? AND instance_name = ?
|
||||
ORDER BY stream_id ASC
|
||||
LIMIT ?
|
||||
"""
|
||||
txn.execute(sql, (last_id, current_id, instance_name, limit))
|
||||
updates = [(row[0], (row[1],)) for row in txn]
|
||||
limited = False
|
||||
upto_token = current_id
|
||||
if len(updates) >= limit:
|
||||
upto_token = updates[-1][0]
|
||||
limited = True
|
||||
|
||||
return updates, upto_token, limited
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_un_partial_stated_rooms_from_stream",
|
||||
get_un_partial_stated_rooms_from_stream_txn,
|
||||
)
|
||||
|
||||
|
||||
class _BackgroundUpdates:
|
||||
REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory"
|
||||
@@ -1806,6 +1828,8 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
|
||||
|
||||
self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")
|
||||
|
||||
self._instance_name = hs.get_instance_name()
|
||||
|
||||
async def upsert_room_on_join(
|
||||
self, room_id: str, room_version: RoomVersion, state_events: List[EventBase]
|
||||
) -> None:
|
||||
@@ -2270,3 +2294,84 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
|
||||
self.is_room_blocked,
|
||||
(room_id,),
|
||||
)
|
||||
|
||||
async def clear_partial_state_room(self, room_id: str) -> bool:
|
||||
"""Clears the partial state flag for a room.
|
||||
|
||||
Args:
|
||||
room_id: The room whose partial state flag is to be cleared.
|
||||
|
||||
Returns:
|
||||
`True` if the partial state flag has been cleared successfully.
|
||||
|
||||
`False` if the partial state flag could not be cleared because the room
|
||||
still contains events with partial state.
|
||||
"""
|
||||
try:
|
||||
async with self._un_partial_stated_rooms_stream_id_gen.get_next() as un_partial_state_room_stream_id:
|
||||
await self.db_pool.runInteraction(
|
||||
"clear_partial_state_room",
|
||||
self._clear_partial_state_room_txn,
|
||||
room_id,
|
||||
un_partial_state_room_stream_id,
|
||||
)
|
||||
return True
|
||||
except self.db_pool.engine.module.IntegrityError as e:
|
||||
# Assume that any `IntegrityError`s are due to partial state events.
|
||||
logger.info(
|
||||
"Exception while clearing lazy partial-state-room %s, retrying: %s",
|
||||
room_id,
|
||||
e,
|
||||
)
|
||||
return False
|
||||
|
||||
def _clear_partial_state_room_txn(
|
||||
self,
|
||||
txn: LoggingTransaction,
|
||||
room_id: str,
|
||||
un_partial_state_room_stream_id: int,
|
||||
) -> None:
|
||||
DatabasePool.simple_delete_txn(
|
||||
txn,
|
||||
table="partial_state_rooms_servers",
|
||||
keyvalues={"room_id": room_id},
|
||||
)
|
||||
DatabasePool.simple_delete_one_txn(
|
||||
txn,
|
||||
table="partial_state_rooms",
|
||||
keyvalues={"room_id": room_id},
|
||||
)
|
||||
self._invalidate_cache_and_stream(txn, self.is_partial_state_room, (room_id,))
|
||||
self._invalidate_cache_and_stream(
|
||||
txn, self.get_partial_state_servers_at_join, (room_id,)
|
||||
)
|
||||
|
||||
DatabasePool.simple_insert_txn(
|
||||
txn,
|
||||
"un_partial_stated_room_stream",
|
||||
{
|
||||
"stream_id": un_partial_state_room_stream_id,
|
||||
"instance_name": self._instance_name,
|
||||
"room_id": room_id,
|
||||
},
|
||||
)
|
||||
|
||||
# We now delete anything from `device_lists_remote_pending` with a
|
||||
# stream ID less than the minimum
|
||||
# `partial_state_rooms.device_lists_stream_id`, as we no longer need them.
|
||||
device_lists_stream_id = DatabasePool.simple_select_one_onecol_txn(
|
||||
txn,
|
||||
table="partial_state_rooms",
|
||||
keyvalues={},
|
||||
retcol="MIN(device_lists_stream_id)",
|
||||
allow_none=True,
|
||||
)
|
||||
if device_lists_stream_id is None:
|
||||
# There are no rooms being currently partially joined, so we delete everything.
|
||||
txn.execute("DELETE FROM device_lists_remote_pending")
|
||||
else:
|
||||
sql = """
|
||||
DELETE FROM device_lists_remote_pending
|
||||
WHERE stream_id <= ?
|
||||
"""
|
||||
txn.execute(sql, (device_lists_stream_id,))
|
||||
|
||||
@@ -33,8 +33,8 @@ from synapse.storage.database import (
|
||||
)
|
||||
from synapse.storage.databases.main.events_worker import EventsWorkerStore
|
||||
from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
|
||||
from synapse.storage.state import StateFilter
|
||||
from synapse.types import JsonDict, JsonMapping, StateMap
|
||||
from synapse.types.state import StateFilter
|
||||
from synapse.util.caches import intern_string
|
||||
from synapse.util.caches.descriptors import cached, cachedList
|
||||
from synapse.util.cancellation import cancellable
|
||||
|
||||
@@ -26,6 +26,14 @@ from typing import (
|
||||
cast,
|
||||
)
|
||||
|
||||
try:
|
||||
# Figure out if ICU support is available for searching users.
|
||||
import icu
|
||||
|
||||
USE_ICU = True
|
||||
except ModuleNotFoundError:
|
||||
USE_ICU = False
|
||||
|
||||
from typing_extensions import TypedDict
|
||||
|
||||
from synapse.api.errors import StoreError
|
||||
@@ -886,7 +894,7 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
|
||||
|
||||
limited = len(results) > limit
|
||||
|
||||
return {"limited": limited, "results": results}
|
||||
return {"limited": limited, "results": results[0:limit]}
|
||||
|
||||
|
||||
def _parse_query_sqlite(search_term: str) -> str:
|
||||
@@ -900,7 +908,7 @@ def _parse_query_sqlite(search_term: str) -> str:
|
||||
"""
|
||||
|
||||
# Pull out the individual words, discarding any non-word characters.
|
||||
results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
|
||||
results = _parse_words(search_term)
|
||||
return " & ".join("(%s* OR %s)" % (result, result) for result in results)
|
||||
|
||||
|
||||
@@ -910,12 +918,63 @@ def _parse_query_postgres(search_term: str) -> Tuple[str, str, str]:
|
||||
We use this so that we can add prefix matching, which isn't something
|
||||
that is supported by default.
|
||||
"""
|
||||
|
||||
# Pull out the individual words, discarding any non-word characters.
|
||||
results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
|
||||
results = _parse_words(search_term)
|
||||
|
||||
both = " & ".join("(%s:* | %s)" % (result, result) for result in results)
|
||||
exact = " & ".join("%s" % (result,) for result in results)
|
||||
prefix = " & ".join("%s:*" % (result,) for result in results)
|
||||
|
||||
return both, exact, prefix
|
||||
|
||||
|
||||
def _parse_words(search_term: str) -> List[str]:
|
||||
"""Split the provided search string into a list of its words.
|
||||
|
||||
If support for ICU (International Components for Unicode) is available, use it.
|
||||
Otherwise, fall back to using a regex to detect word boundaries. This latter
|
||||
solution works well enough for most latin-based languages, but doesn't work as well
|
||||
with other languages.
|
||||
|
||||
Args:
|
||||
search_term: The search string.
|
||||
|
||||
Returns:
|
||||
A list of the words in the search string.
|
||||
"""
|
||||
if USE_ICU:
|
||||
return _parse_words_with_icu(search_term)
|
||||
|
||||
return re.findall(r"([\w\-]+)", search_term, re.UNICODE)
|
||||
|
||||
|
||||
def _parse_words_with_icu(search_term: str) -> List[str]:
|
||||
"""Break down the provided search string into its individual words using ICU
|
||||
(International Components for Unicode).
|
||||
|
||||
Args:
|
||||
search_term: The search string.
|
||||
|
||||
Returns:
|
||||
A list of the words in the search string.
|
||||
"""
|
||||
results = []
|
||||
breaker = icu.BreakIterator.createWordInstance(icu.Locale.getDefault())
|
||||
breaker.setText(search_term)
|
||||
i = 0
|
||||
while True:
|
||||
j = breaker.nextBoundary()
|
||||
if j < 0:
|
||||
break
|
||||
|
||||
result = search_term[i:j]
|
||||
|
||||
# libicu considers spaces and punctuation between words as words, but we don't
|
||||
# want to include those in results as they would result in syntax errors in SQL
|
||||
# queries (e.g. "foo bar" would result in the search query including "foo & &
|
||||
# bar").
|
||||
if len(re.findall(r"([\w\-]+)", result, re.UNICODE)):
|
||||
results.append(result)
|
||||
|
||||
i = j
|
||||
|
||||
return results
|
||||
|
||||
@@ -22,8 +22,8 @@ from synapse.storage.database import (
|
||||
LoggingTransaction,
|
||||
)
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
from synapse.storage.state import StateFilter
|
||||
from synapse.types import MutableStateMap, StateMap
|
||||
from synapse.types.state import StateFilter
|
||||
from synapse.util.caches import intern_string
|
||||
|
||||
if TYPE_CHECKING:
|
||||
|
||||
@@ -25,10 +25,10 @@ from synapse.storage.database import (
|
||||
LoggingTransaction,
|
||||
)
|
||||
from synapse.storage.databases.state.bg_updates import StateBackgroundUpdateStore
|
||||
from synapse.storage.state import StateFilter
|
||||
from synapse.storage.types import Cursor
|
||||
from synapse.storage.util.sequence import build_sequence_generator
|
||||
from synapse.types import MutableStateMap, StateKey, StateMap
|
||||
from synapse.types.state import StateFilter
|
||||
from synapse.util.caches.descriptors import cached
|
||||
from synapse.util.caches.dictionary_cache import DictionaryCache
|
||||
from synapse.util.cancellation import cancellable
|
||||
|
||||
@@ -77,7 +77,7 @@ class PostgresEngine(
|
||||
# docs: The number is formed by converting the major, minor, and
|
||||
# revision numbers into two-decimal-digit numbers and appending them
|
||||
# together. For example, version 8.1.5 will be returned as 80105
|
||||
self._version = cast(int, db_conn.server_version)
|
||||
self._version = db_conn.server_version
|
||||
allow_unsafe_locale = self.config.get("allow_unsafe_locale", False)
|
||||
|
||||
# Are we on a supported PostgreSQL version?
|
||||
|
||||
@@ -0,0 +1,32 @@
|
||||
/* Copyright 2022 The Matrix.org Foundation C.I.C
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- Stream for notifying that a room has become un-partial-stated.
|
||||
CREATE TABLE un_partial_stated_room_stream(
|
||||
-- Position in the stream
|
||||
stream_id BIGINT PRIMARY KEY NOT NULL,
|
||||
|
||||
-- Which instance wrote this entry.
|
||||
instance_name TEXT NOT NULL,
|
||||
|
||||
-- Which room has been un-partial-stated.
|
||||
room_id TEXT NOT NULL REFERENCES rooms(room_id) ON DELETE CASCADE
|
||||
);
|
||||
|
||||
-- We want an index here because of the foreign key constraint:
|
||||
-- upon deleting a room, the database needs to be able to check here.
|
||||
-- This index is not unique because we can join a room multiple times in a server's lifetime,
|
||||
-- so the same room could be un-partial-stated multiple times!
|
||||
CREATE INDEX un_partial_stated_room_stream_room_id ON un_partial_stated_room_stream (room_id);
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user