Compare commits
46 Commits
| SHA1 |
|---|
| 9796706a96 |
| 86b836f2b5 |
| edb91e78b2 |
| 0e67a3f703 |
| 1630199e4d |
| a3e154910a |
| b3e0354c98 |
| e55ad9b6cf |
| 4b180db298 |
| c25ec34d73 |
| 41b9def9f2 |
| 4ee82c0576 |
| 375b0a8a11 |
| 7148c2a0d6 |
| 9f6ff6a0eb |
| 77cda342be |
| c51d2e6199 |
| 0ccfb9318c |
| 3ec9f3b0cc |
| c97198ee14 |
| 55b08534a4 |
| ba572647b2 |
| f2905d827f |
| eb3c1823d8 |
| ba6b21c81e |
| 8583346335 |
| b3ada9bfb4 |
| aa5c0592e7 |
| 3690d5bd89 |
| 7b6c9f4c04 |
| 2e8a2bda52 |
| 3fd8eb81de |
| 1b4782a37d |
| 34ab801379 |
| bcd2495469 |
| def480442d |
| 808105bd31 |
| c96a1d2a27 |
| 08297f2f18 |
| 7c76514f1e |
| d19d1edbcf |
| 5a7742a833 |
| 2611433b70 |
| 5bf9ec9e3e |
| e4f545c452 |
| 722ccc30b5 |
@@ -31,35 +31,6 @@ sed -i \
  -e '/systemd/d' \
  pyproject.toml

# Use poetry to do the installation. This ensures that the versions are all mutually
# compatible (as far as the package metadata declares, anyway); pip's package resolver
# is more lax.
#
# Rather than `poetry install --no-dev`, we drop all dev dependencies and the dev-docs
# group from the toml file. This means we don't have to ensure compatibility between
# old deps and dev tools.

pip install toml wheel

REMOVE_DEV_DEPENDENCIES="
import toml
with open('pyproject.toml', 'r') as f:
    data = toml.loads(f.read())

del data['tool']['poetry']['dev-dependencies']
del data['tool']['poetry']['group']['dev-docs']

with open('pyproject.toml', 'w') as f:
    toml.dump(data, f)
"
python3 -c "$REMOVE_DEV_DEPENDENCIES"

pip install poetry==1.3.2
poetry lock

echo "::group::Patched pyproject.toml"
cat pyproject.toml
echo "::endgroup::"
echo "::group::Lockfile after patch"
cat poetry.lock
echo "::endgroup::"
@@ -22,7 +22,7 @@ jobs:
          path: book

      - name: 📤 Deploy to Netlify
-       uses: matrix-org/netlify-pr-preview@v1
+       uses: matrix-org/netlify-pr-preview@v2
        with:
          path: book
          owner: ${{ github.event.workflow_run.head_repository.owner.login }}
+22
-30
@@ -107,14 +107,15 @@ jobs:
        uses: dtolnay/rust-toolchain@1.58.1
      - uses: Swatinem/rust-cache@v2

      # NB: I have two concerns with this action:
      #  1. We occasionally see odd mypy problems that aren't reproducible
      #     locally with clean caches. I suspect some dodgy caching behaviour.
      #  2. The action uses GHA machinery that's deprecated
      #     (https://github.com/AustinScola/mypy-cache-github-action/issues/277)
      # It may be simpler to use actions/cache ourselves to restore .mypy_cache.
      # Cribbed from
      # https://github.com/AustinScola/mypy-cache-github-action/blob/85ea4f2972abed39b33bd02c36e341b28ca59213/src/restore.ts#L10-L17
      - name: Restore/persist mypy's cache
-       uses: AustinScola/mypy-cache-github-action@df56268388422ee282636ee2c7a9cc55ec644a41
+       uses: actions/cache@v3
        with:
          path: |
            .mypy_cache
          key: mypy-cache-${{ github.context.sha }}
          restore-keys: mypy-cache-

      - name: Run mypy
        run: poetry run mypy
@@ -320,34 +321,25 @@ jobs:
      with:
        python-version: '3.7'

      # Calculating the old-deps actually takes a bunch of time, so we cache the
      # pyproject.toml / poetry.lock. We need to cache pyproject.toml as
      # otherwise the `poetry install` step will error due to the poetry.lock
      # file being outdated.
      #
      # This caches the output of `Prepare old deps`, which should generate the
      # same `pyproject.toml` and `poetry.lock` for a given `pyproject.toml` input.
    - uses: actions/cache@v3
      id: cache-poetry-old-deps
      name: Cache poetry.lock
      with:
        path: |
          poetry.lock
          pyproject.toml
        key: poetry-old-deps2-${{ hashFiles('pyproject.toml') }}
    - name: Prepare old deps
      if: steps.cache-poetry-old-deps.outputs.cache-hit != 'true'
      run: .ci/scripts/prepare_old_deps.sh

      # We only now install poetry so that `setup-python-poetry` caches the
      # right poetry.lock's dependencies.
    - uses: matrix-org/setup-python-poetry@v1
      with:
        python-version: '3.7'
        poetry-version: "1.3.2"
        extras: "all test"
      # Note: we install using `pip` here, not poetry. `poetry install` ignores the
      # build-system section (https://github.com/python-poetry/poetry/issues/6154), but
      # we explicitly want to test that you can `pip install` using the oldest version
      # of poetry-core and setuptools-rust.
    - run: pip install .[all,test]

    - run: poetry run trial -j6 tests
      # We nuke the local copy, as we've installed synapse into the virtualenv
      # (rather than use an editable install, which we no longer support). If we
      # don't do this then python can't find the native lib.
    - run: rm -rf synapse/

      # Sanity check we can import/run Synapse
    - run: python -m synapse.app.homeserver --help

    - run: python -m twisted.trial -j6 tests
    - name: Dump logs
      # Logs are most useful when the command fails, always include them.
      if: ${{ always() }}
+77
@@ -1,3 +1,80 @@
Synapse 1.84.0rc1 (2023-05-16)
==============================

Features
--------

- Add an option to prevent media downloads from configured domains. ([\#15197](https://github.com/matrix-org/synapse/issues/15197))
- Add `forget_rooms_on_leave` config option to automatically forget rooms when users leave them or are removed from them. ([\#15224](https://github.com/matrix-org/synapse/issues/15224))
- Add redis TLS configuration options. ([\#15312](https://github.com/matrix-org/synapse/issues/15312))
- Add a config option to delay push notifications by a random amount, to discourage time-based profiling. ([\#15516](https://github.com/matrix-org/synapse/issues/15516))
- Stabilize support for [MSC2659](https://github.com/matrix-org/matrix-spec-proposals/pull/2659): application service ping endpoint. Contributed by Tulir @ Beeper. ([\#15528](https://github.com/matrix-org/synapse/issues/15528))
- Implement [MSC4009](https://github.com/matrix-org/matrix-spec-proposals/pull/4009) to expand the supported characters in Matrix IDs. ([\#15536](https://github.com/matrix-org/synapse/issues/15536))
- Advertise support for Matrix 1.6 on `/_matrix/client/versions`. ([\#15559](https://github.com/matrix-org/synapse/issues/15559))
- Print full error and stack-trace of any exception that occurs during startup/initialization. ([\#15569](https://github.com/matrix-org/synapse/issues/15569))


Bugfixes
--------

- Don't fail on federation over TOR where SRV queries are not supported. Contributed by Zdzichu. ([\#15523](https://github.com/matrix-org/synapse/issues/15523))
- Experimental support for [MSC4010](https://github.com/matrix-org/matrix-spec-proposals/pull/4010) which rejects setting the `"m.push_rules"` via account data. ([\#15554](https://github.com/matrix-org/synapse/issues/15554), [\#15555](https://github.com/matrix-org/synapse/issues/15555))
- Fix a long-standing bug where an invalid membership event could cause an internal server error. ([\#15564](https://github.com/matrix-org/synapse/issues/15564))
- Require at least poetry-core v1.1.0. ([\#15566](https://github.com/matrix-org/synapse/issues/15566), [\#15571](https://github.com/matrix-org/synapse/issues/15571))


Updates to the Docker image
---------------------------

- Add pkg-config package to Stage 0 to be able to build Dockerfile on ppc64le architecture. ([\#15567](https://github.com/matrix-org/synapse/issues/15567))


Improved Documentation
----------------------

- Clarify documentation of the "Create or modify account" Admin API. ([\#15544](https://github.com/matrix-org/synapse/issues/15544))
- Fix path to the `statistics/database/rooms` admin API in documentation. ([\#15560](https://github.com/matrix-org/synapse/issues/15560))
- Update and improve Mastodon Single Sign-On documentation. ([\#15587](https://github.com/matrix-org/synapse/issues/15587))


Internal Changes
----------------

- Use oEmbed to generate URL previews for YouTube Shorts. ([\#15025](https://github.com/matrix-org/synapse/issues/15025))
- Create new `Client` for use with HTTP Replication between workers. Contributed by Jason Little. ([\#15470](https://github.com/matrix-org/synapse/issues/15470))
- Remove need for `worker_replication_*` based settings in worker configuration yaml by placing this data directly on the `instance_map` instead. ([\#15491](https://github.com/matrix-org/synapse/issues/15491))
- Bump pyicu from 2.10.2 to 2.11. ([\#15509](https://github.com/matrix-org/synapse/issues/15509))
- Remove references to supporting per-user flag for [MSC2654](https://github.com/matrix-org/matrix-spec-proposals/pull/2654). ([\#15522](https://github.com/matrix-org/synapse/issues/15522))
- Don't use a trusted key server when running the demo scripts. ([\#15527](https://github.com/matrix-org/synapse/issues/15527))
- Speed up rebuilding of the user directory for local users. ([\#15529](https://github.com/matrix-org/synapse/issues/15529))
- Speed up deleting of old rows in `event_push_actions`. ([\#15531](https://github.com/matrix-org/synapse/issues/15531))
- Install the `xmlsec` and `mdbook` packages and switch back to the upstream [cachix/devenv](https://github.com/cachix/devenv) repo in the nix development environment. ([\#15532](https://github.com/matrix-org/synapse/issues/15532), [\#15533](https://github.com/matrix-org/synapse/issues/15533), [\#15545](https://github.com/matrix-org/synapse/issues/15545))
- Implement [MSC3987](https://github.com/matrix-org/matrix-spec-proposals/pull/3987) by removing `"dont_notify"` from the list of actions in default push rules. ([\#15534](https://github.com/matrix-org/synapse/issues/15534))
- Move various module API callback registration methods to a dedicated class. ([\#15535](https://github.com/matrix-org/synapse/issues/15535))
- Proxy `/user/devices` federation queries to application services for [MSC3984](https://github.com/matrix-org/matrix-spec-proposals/pull/3984). ([\#15539](https://github.com/matrix-org/synapse/issues/15539))
- Factor out an `is_mine_server_name` method. ([\#15542](https://github.com/matrix-org/synapse/issues/15542))
- Allow running Complement tests using [podman](https://podman.io/) by adding a `PODMAN` environment variable to `scripts-dev/complement.sh`. ([\#15543](https://github.com/matrix-org/synapse/issues/15543))
- Bump serde from 1.0.160 to 1.0.162. ([\#15548](https://github.com/matrix-org/synapse/issues/15548))
- Bump types-setuptools from 67.6.0.5 to 67.7.0.1. ([\#15549](https://github.com/matrix-org/synapse/issues/15549))
- Bump sentry-sdk from 1.19.1 to 1.22.1. ([\#15550](https://github.com/matrix-org/synapse/issues/15550))
- Bump ruff from 0.0.259 to 0.0.265. ([\#15551](https://github.com/matrix-org/synapse/issues/15551))
- Bump hiredis from 2.2.2 to 2.2.3. ([\#15552](https://github.com/matrix-org/synapse/issues/15552))
- Bump types-requests from 2.29.0.0 to 2.30.0.0. ([\#15553](https://github.com/matrix-org/synapse/issues/15553))
- Add `org.matrix.msc3981` info to `/_matrix/client/versions`. ([\#15558](https://github.com/matrix-org/synapse/issues/15558))
- Declare unstable support for [MSC3391](https://github.com/matrix-org/matrix-spec-proposals/pull/3391) under `/_matrix/client/versions` if the experimental implementation is enabled. ([\#15562](https://github.com/matrix-org/synapse/issues/15562))
- Implement [MSC3821](https://github.com/matrix-org/matrix-spec-proposals/pull/3821) to update the redaction rules. ([\#15563](https://github.com/matrix-org/synapse/issues/15563))
- Implement updated redaction rules from [MSC3389](https://github.com/matrix-org/matrix-spec-proposals/pull/3389). ([\#15565](https://github.com/matrix-org/synapse/issues/15565))
- Allow `pip install` to use setuptools_rust 1.6.0 when building Synapse. ([\#15570](https://github.com/matrix-org/synapse/issues/15570))
- Deal with upcoming Github Actions deprecations. ([\#15576](https://github.com/matrix-org/synapse/issues/15576))
- Export `run_as_background_process` from the module API. ([\#15577](https://github.com/matrix-org/synapse/issues/15577))
- Update build system requirements to allow building with poetry-core==1.6.0. ([\#15588](https://github.com/matrix-org/synapse/issues/15588))
- Bump serde from 1.0.162 to 1.0.163. ([\#15589](https://github.com/matrix-org/synapse/issues/15589))
- Bump phonenumbers from 8.13.7 to 8.13.11. ([\#15590](https://github.com/matrix-org/synapse/issues/15590))
- Bump types-psycopg2 from 2.9.21.9 to 2.9.21.10. ([\#15591](https://github.com/matrix-org/synapse/issues/15591))
- Bump types-commonmark from 0.9.2.2 to 0.9.2.3. ([\#15592](https://github.com/matrix-org/synapse/issues/15592))
- Bump types-setuptools from 67.7.0.1 to 67.7.0.2. ([\#15594](https://github.com/matrix-org/synapse/issues/15594))


Synapse 1.83.0 (2023-05-09)
===========================
Generated
+4
-4
@@ -323,18 +323,18 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
|
||||
|
||||
[[package]]
|
||||
name = "serde"
|
||||
version = "1.0.162"
|
||||
version = "1.0.163"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "71b2f6e1ab5c2b98c05f0f35b236b22e8df7ead6ffbf51d7808da7f8817e7ab6"
|
||||
checksum = "2113ab51b87a539ae008b5c6c02dc020ffa39afd2d83cffcb3f4eb2722cebec2"
|
||||
dependencies = [
|
||||
"serde_derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_derive"
|
||||
version = "1.0.162"
|
||||
version = "1.0.163"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a2a0814352fd64b58489904a44ea8d90cb1a91dcb6b4f5ebabc32c8318e93cb6"
|
||||
checksum = "8c805777e3930c8883389c602315a24224bcc738b63905ef87cd1420353ea93e"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
Improve type hints in datastores.
|
||||
@@ -1 +0,0 @@
|
||||
Use oEmbed to generate URL previews for YouTube Shorts.
|
||||
@@ -1 +0,0 @@
|
||||
Add an option to prevent media downloads from configured domains.
|
||||
@@ -1 +0,0 @@
|
||||
Add `forget_rooms_on_leave` config option to automatically forget rooms when users leave them or are removed from them.
|
||||
@@ -1 +0,0 @@
|
||||
Make the `thread_id` column on `event_push_actions`, `event_push_actions_staging`, and `event_push_summary` non-null.
|
||||
@@ -1 +0,0 @@
|
||||
Create new `Client` for use with HTTP Replication between workers. Contributed by Jason Little.
|
||||
@@ -1 +0,0 @@
|
||||
Bump pyicu from 2.10.2 to 2.11.
|
||||
@@ -1 +0,0 @@
|
||||
Add a config option to delay push notifications by a random amount, to discourage time-based profiling.
|
||||
@@ -1 +0,0 @@
|
||||
Remove references to supporting per-user flag for [MSC2654](https://github.com/matrix-org/matrix-spec-proposals/pull/2654) (#15522).
|
||||
@@ -1 +0,0 @@
|
||||
Don't fail on federation over TOR where SRV queries are not supported. Contributed by Zdzichu.
|
||||
@@ -1 +0,0 @@
|
||||
Don't use a trusted key server when running the demo scripts.
|
||||
@@ -1 +0,0 @@
|
||||
Stabilize support for [MSC2659](https://github.com/matrix-org/matrix-spec-proposals/pull/2659): application service ping endpoint. Contributed by Tulir @ Beeper.
|
||||
@@ -1 +0,0 @@
|
||||
Speed up rebuilding of the user directory for local users.
|
||||
@@ -1 +0,0 @@
|
||||
Speed up deleting of old rows in `event_push_actions`.
|
||||
@@ -1 +0,0 @@
|
||||
Install the `xmlsec` and `mdbook` packages and switch back to the upstream [cachix/devenv](https://github.com/cachix/devenv) repo in the nix development environment.
|
||||
@@ -1 +0,0 @@
|
||||
Install the `xmlsec` and `mdbook` packages and switch back to the upstream [cachix/devenv](https://github.com/cachix/devenv) repo in the nix development environment.
|
||||
@@ -1 +0,0 @@
|
||||
Implement [MSC3987](https://github.com/matrix-org/matrix-spec-proposals/pull/3987) by removing `"dont_notify"` from the list of actions in default push rules.
|
||||
@@ -1 +0,0 @@
|
||||
Move various module API callback registration methods to a dedicated class.
|
||||
@@ -1 +0,0 @@
|
||||
Implement [MSC4009](https://github.com/matrix-org/matrix-spec-proposals/pull/4009) to expand the supported characters in Matrix IDs.
|
||||
@@ -0,0 +1 @@
|
||||
Add not null constraint to column full_user_id of tables profiles and user_filters.
|
||||
@@ -1 +0,0 @@
|
||||
Proxy `/user/devices` federation queries to application services for [MSC3984](https://github.com/matrix-org/matrix-spec-proposals/pull/3984).
|
||||
@@ -1 +0,0 @@
|
||||
Factor out an `is_mine_server_name` method.
|
||||
@@ -1 +0,0 @@
|
||||
Allow running Complement tests using [podman](https://podman.io/) by adding a `PODMAN` environment variable to `scripts-dev/complement.sh`.
|
||||
@@ -1 +0,0 @@
|
||||
Clarify documentation of the "Create or modify account" Admin API.
|
||||
@@ -1 +0,0 @@
|
||||
Install the `xmlsec` and `mdbook` packages and switch back to the upstream [cachix/devenv](https://github.com/cachix/devenv) repo in the nix development environment.
|
||||
@@ -1 +0,0 @@
|
||||
Bump serde from 1.0.160 to 1.0.162.
|
||||
@@ -1 +0,0 @@
|
||||
Bump types-setuptools from 67.6.0.5 to 67.7.0.1.
|
||||
@@ -1 +0,0 @@
|
||||
Bump sentry-sdk from 1.19.1 to 1.22.1.
|
||||
@@ -1 +0,0 @@
|
||||
Bump ruff from 0.0.259 to 0.0.265.
|
||||
@@ -1 +0,0 @@
|
||||
Bump hiredis from 2.2.2 to 2.2.3.
|
||||
@@ -1 +0,0 @@
|
||||
Bump types-requests from 2.29.0.0 to 2.30.0.0.
|
||||
@@ -1 +0,0 @@
|
||||
Experimental support for [MSC4010](https://github.com/matrix-org/matrix-spec-proposals/pull/4010) which rejects setting the `"m.push_rules"` via account data.
|
||||
@@ -1 +0,0 @@
|
||||
Experimental support for [MSC4010](https://github.com/matrix-org/matrix-spec-proposals/pull/4010) which rejects setting the `"m.push_rules"` via account data.
|
||||
@@ -1 +0,0 @@
|
||||
Fix path to the `statistics/database/rooms` admin API in documentation.
|
||||
@@ -0,0 +1 @@
|
||||
Print full error and stack-trace of any exception that occurs during startup/initialization.
|
||||
@@ -0,0 +1 @@
|
||||
Fix a long-standing bug where the `url_preview_url_blacklist` configuration setting was not applied to oEmbed or image URLs found while previewing a URL.
|
||||
@@ -0,0 +1 @@
|
||||
Run mypy type checking with the minimum supported Python version to catch new usage that isn't backwards-compatible.
|
||||
@@ -0,0 +1 @@
|
||||
Fix subscriptable type usage in Python <3.9.
|
||||
@@ -0,0 +1 @@
|
||||
Update internal terminology for workers.
|
||||
@@ -0,0 +1 @@
|
||||
Add a new admin API to create a new device for a user.
|
||||
@@ -70,6 +70,10 @@ redis:
    port: 6379
    # dbid: <redis_logical_db_id>
    # password: <secret_password>
+   # use_tls: True
+   # certificate_file: <path_to_certificate>
+   # private_key_file: <path_to_private_key>
+   # ca_file: <path_to_ca_certificate>
```

This assumes that your Redis service is called `redis` in your Docker Compose file.
Vendored
+6
@@ -1,3 +1,9 @@
+ matrix-synapse-py3 (1.84.0~rc1) stable; urgency=medium
+
+   * New Synapse release 1.84.0rc1.
+
+  -- Synapse Packaging team <packages@matrix.org>  Tue, 16 May 2023 11:12:02 +0100
+
matrix-synapse-py3 (1.83.0) stable; urgency=medium

  * New Synapse release 1.83.0.
+1
-1
@@ -37,7 +37,7 @@ RUN \
  --mount=type=cache,target=/var/cache/apt,sharing=locked \
  --mount=type=cache,target=/var/lib/apt,sharing=locked \
  apt-get update -qq && apt-get install -yqq \
-   build-essential curl git libffi-dev libssl-dev \
+   build-essential curl git libffi-dev libssl-dev pkg-config \
  && rm -rf /var/lib/apt/lists/*

# Install rust and ensure its in the PATH.
@@ -6,10 +6,6 @@
worker_app: "{{ app }}"
worker_name: "{{ name }}"

- # The replication listener on the main synapse process.
- worker_replication_host: 127.0.0.1
- worker_replication_http_port: 9093
-
worker_listeners:
  - type: http
    port: {{ port }}
@@ -69,6 +69,9 @@ import yaml
from jinja2 import Environment, FileSystemLoader

MAIN_PROCESS_HTTP_LISTENER_PORT = 8080
+ MAIN_PROCESS_INSTANCE_NAME = "main"
+ MAIN_PROCESS_LOCALHOST_ADDRESS = "127.0.0.1"
+ MAIN_PROCESS_REPLICATION_PORT = 9093

# A simple name used as a placeholder in the WORKERS_CONFIG below. This will be replaced
# during processing with the name of the worker.
@@ -719,8 +722,8 @@ def generate_worker_files(
    # shared config file.
    listeners = [
        {
-           "port": 9093,
-           "bind_address": "127.0.0.1",
+           "port": MAIN_PROCESS_REPLICATION_PORT,
+           "bind_address": MAIN_PROCESS_LOCALHOST_ADDRESS,
            "type": "http",
            "resources": [{"names": ["replication"]}],
        }
@@ -870,6 +873,14 @@ def generate_worker_files(

    workers_in_use = len(requested_worker_types) > 0

+   # If there are workers, add the main process to the instance_map too.
+   if workers_in_use:
+       instance_map = shared_config.setdefault("instance_map", {})
+       instance_map[MAIN_PROCESS_INSTANCE_NAME] = {
+           "host": MAIN_PROCESS_LOCALHOST_ADDRESS,
+           "port": MAIN_PROCESS_REPLICATION_PORT,
+       }

    # Shared homeserver config
    convert(
        "/conf/shared.yaml.j2",
@@ -813,6 +813,33 @@ The following fields are returned in the JSON response body:

- `total` - Total number of user's devices.

### Create a device

Creates a new device for a specific `user_id` and `device_id`. Does nothing if the `device_id`
exists already.

The API is:

```
POST /_synapse/admin/v2/users/<user_id>/devices

{
  "device_id": "QBUAZIFURK"
}
```

An empty JSON dict is returned.

**Parameters**

The following parameters should be set in the URL:

- `user_id` - fully qualified: for example, `@user:server.com`.

The following fields are required in the JSON request body:

- `device_id` - The device ID to create.
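
For illustration only, here is a minimal sketch of calling this endpoint from Python with `requests`. The base URL and admin access token are placeholders (assumptions, not part of the API description above); admin APIs are authenticated with an admin user's access token.

```python
import requests

BASE_URL = "https://synapse.example.com"   # placeholder homeserver URL
ADMIN_TOKEN = "<admin_access_token>"       # placeholder admin access token

def create_device(user_id: str, device_id: str) -> None:
    """Create a device for a user via the admin API (no-op if it already exists)."""
    resp = requests.post(
        f"{BASE_URL}/_synapse/admin/v2/users/{user_id}/devices",
        headers={"Authorization": f"Bearer {ADMIN_TOKEN}"},
        json={"device_id": device_id},
        timeout=10,
    )
    resp.raise_for_status()  # an empty JSON dict is returned on success

create_device("@user:server.com", "QBUAZIFURK")
```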
### Delete multiple devices
Deletes the given devices for a specific `user_id`, and invalidates
any access token associated with them.
+4
-2
@@ -569,7 +569,7 @@ You should receive a response similar to the following. Make sure to save it.
{"client_id":"someclientid_123","client_secret":"someclientsecret_123","id":"12345","name":"my_synapse_app","redirect_uri":"https://[synapse_public_baseurl]/_synapse/client/oidc/callback","website":null,"vapid_key":"somerandomvapidkey_123"}
```

- As the Synapse login mechanism needs an attribute to uniquely identify users, and Mastodon's endpoint does not return a `sub` property, an alternative `subject_claim` has to be set. Your Synapse configuration should include the following:
+ As the Synapse login mechanism needs an attribute to uniquely identify users, and Mastodon's endpoint does not return a `sub` property, an alternative `subject_template` has to be set. Your Synapse configuration should include the following:

```yaml
oidc_providers:
@@ -585,7 +585,9 @@ oidc_providers:
      scopes: ["read"]
      user_mapping_provider:
        config:
-         subject_claim: "id"
+         subject_template: "{{ user.id }}"
          localpart_template: "{{ user.username }}"
          display_name_template: "{{ user.display_name }}"
```

Note that the fields `client_id` and `client_secret` are taken from the curl response above.
@@ -30,12 +30,6 @@ minimal.

See [the TCP replication documentation](tcp_replication.md).

- ### The Slaved DataStore
-
- There are read-only version of the synapse storage layer in
- `synapse/replication/slave/storage` that use the response of the
- replication API to invalidate their caches.
-
### The TCP Replication Module
Information about how the tcp replication module is structured, including how
the classes interact, can be found in
@@ -1,10 +1,6 @@
worker_app: synapse.app.generic_worker
worker_name: generic_worker1

- # The replication listener on the main synapse process.
- worker_replication_host: 127.0.0.1
- worker_replication_http_port: 9093
-
worker_listeners:
  - type: http
    port: 8083
@@ -88,6 +88,84 @@ process, for example:
dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
```

# Upgrading to v1.84.0

## Deprecation of `worker_replication_*` configuration settings

When using workers,
* `worker_replication_host`
* `worker_replication_http_port`
* `worker_replication_http_tls`

can now be removed from individual worker YAML configuration ***if*** you add the main process to the `instance_map` in the shared YAML configuration,
using the name `main`.

### Before:
Shared YAML
```yaml
instance_map:
  generic_worker1:
    host: localhost
    port: 5678
    tls: false
```
Worker YAML
```yaml
worker_app: synapse.app.generic_worker
worker_name: generic_worker1

worker_replication_host: localhost
worker_replication_http_port: 3456
worker_replication_http_tls: false

worker_listeners:
  - type: http
    port: 1234
    resources:
      - names: [client, federation]
  - type: http
    port: 5678
    resources:
      - names: [replication]

worker_log_config: /etc/matrix-synapse/generic-worker-log.yaml
```
### After:
Shared YAML
```yaml
instance_map:
  main:
    host: localhost
    port: 3456
    tls: false
  generic_worker1:
    host: localhost
    port: 5678
    tls: false
```
Worker YAML
```yaml
worker_app: synapse.app.generic_worker
worker_name: generic_worker1

worker_listeners:
  - type: http
    port: 1234
    resources:
      - names: [client, federation]
  - type: http
    port: 5678
    resources:
      - names: [replication]

worker_log_config: /etc/matrix-synapse/generic-worker-log.yaml
```
Notes:
* `tls` is optional but mirrors the functionality of `worker_replication_http_tls`
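
As a side note, here is a small hedged sketch (illustrative, not part of Synapse) of checking a shared configuration for the new `main` entry before restarting; the file path is an assumption:

```python
import sys

import yaml

SHARED_CONFIG_PATH = "/etc/matrix-synapse/homeserver.yaml"  # assumed path; adjust for your setup

with open(SHARED_CONFIG_PATH) as f:
    config = yaml.safe_load(f)

main = (config.get("instance_map") or {}).get("main")
if main is None:
    sys.exit("instance_map has no 'main' entry; workers still need worker_replication_* settings")

# `tls` is optional and defaults to false, mirroring worker_replication_http_tls.
print(f"main replication endpoint: {main['host']}:{main['port']} (tls={main.get('tls', False)})")
```
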
# Upgrading to v1.81.0

## Application service path & authentication deprecations
@@ -3884,15 +3884,20 @@ federation_sender_instances:
### `instance_map`

When using workers this should be a map from [`worker_name`](#worker_name) to the
- HTTP replication listener of the worker, if configured.
+ HTTP replication listener of the worker, if configured, and to the main process.
Each worker declared under [`stream_writers`](../../workers.md#stream-writers) needs
a HTTP replication listener, and that listener should be included in the `instance_map`.
- (The main process also needs an HTTP replication listener, but it should not be
- listed in the `instance_map`.)
+ The main process also needs an entry on the `instance_map`, and it should be listed under
+ `main` **if even one other worker exists**. Ensure the port matches with what is declared
+ inside the `listener` block for a `replication` listener.

Example configuration:
```yaml
instance_map:
+   main:
+     host: localhost
+     port: 8030
  worker1:
    host: localhost
    port: 8034
|
||||
localhost and 6379
|
||||
* `password`: Optional password if configured on the Redis instance.
|
||||
* `dbid`: Optional redis dbid if needs to connect to specific redis logical db.
|
||||
* `use_tls`: Whether to use tls connection. Defaults to false.
|
||||
* `certificate_file`: Optional path to the certificate file
|
||||
* `private_key_file`: Optional path to the private key file
|
||||
* `ca_file`: Optional path to the CA certificate file. Use this one or:
|
||||
* `ca_path`: Optional path to the folder containing the CA certificate file
|
||||
|
||||
_Added in Synapse 1.78.0._
|
||||
|
||||
_Changed in Synapse 1.84.0: Added use\_tls, certificate\_file, private\_key\_file, ca\_file and ca\_path attributes_
|
||||
|
||||
Example configuration:
|
||||
```yaml
|
||||
redis:
|
||||
@@ -3987,6 +3999,10 @@ redis:
|
||||
port: 6379
|
||||
password: <secret_password>
|
||||
dbid: <dbid>
|
||||
#use_tls: True
|
||||
#certificate_file: <path_to_the_certificate_file>
|
||||
#private_key_file: <path_to_the_private_key_file>
|
||||
#ca_file: <path_to_the_ca_certificate_file>
|
||||
```
|
||||
---
|
||||
## Individual worker configuration
|
||||
@@ -4024,6 +4040,7 @@ worker_name: generic_worker1
```
---
### `worker_replication_host`
+ *Deprecated as of version 1.84.0. Place `host` under `main` entry on the [`instance_map`](#instance_map) in your shared yaml configuration instead.*

The HTTP replication endpoint that it should talk to on the main Synapse process.
The main Synapse process defines this with a `replication` resource in
@@ -4035,6 +4052,7 @@ worker_replication_host: 127.0.0.1
```
---
### `worker_replication_http_port`
+ *Deprecated as of version 1.84.0. Place `port` under `main` entry on the [`instance_map`](#instance_map) in your shared yaml configuration instead.*

The HTTP replication port that it should talk to on the main Synapse process.
The main Synapse process defines this with a `replication` resource in
@@ -4046,6 +4064,7 @@ worker_replication_http_port: 9093
```
---
### `worker_replication_http_tls`
+ *Deprecated as of version 1.84.0. Place `tls` under `main` entry on the [`instance_map`](#instance_map) in your shared yaml configuration instead.*

Whether TLS should be used for talking to the HTTP replication port on the main
Synapse process.
@@ -4071,9 +4090,9 @@ A worker can handle HTTP requests. To do so, a `worker_listeners` option
must be declared, in the same way as the [`listeners` option](#listeners)
in the shared config.

- Workers declared in [`stream_writers`](#stream_writers) will need to include a
- `replication` listener here, in order to accept internal HTTP requests from
- other workers.
+ Workers declared in [`stream_writers`](#stream_writers) and [`instance_map`](#instance_map)
+ will need to include a `replication` listener here, in order to accept internal HTTP
+ requests from other workers.

Example configuration:
```yaml
+29
-12
@@ -87,12 +87,18 @@ shared configuration file.

### Shared configuration

- Normally, only a couple of changes are needed to make an existing configuration
- file suitable for use with workers. First, you need to enable an
+ Normally, only a few changes are needed to make an existing configuration
+ file suitable for use with workers:
+ * First, you need to enable an
  ["HTTP replication listener"](usage/configuration/config_documentation.md#listeners)
- for the main process; and secondly, you need to enable
- [redis-based replication](usage/configuration/config_documentation.md#redis).
- Optionally, a [shared secret](usage/configuration/config_documentation.md#worker_replication_secret)
+ for the main process
+ * Secondly, you need to enable
+ [redis-based replication](usage/configuration/config_documentation.md#redis)
+ * You will need to add an [`instance_map`](usage/configuration/config_documentation.md#instance_map)
+ with the `main` process defined, as well as the relevant connection information from
+ its HTTP `replication` listener (defined in step 1 above). Note that the `host` defined
+ is the address the worker needs to look for the `main` process at, not necessarily the same address that is bound to.
+ * Optionally, a [shared secret](usage/configuration/config_documentation.md#worker_replication_secret)
  can be used to authenticate HTTP traffic between workers. For example:

```yaml
@@ -111,6 +117,11 @@ worker_replication_secret: ""

redis:
  enabled: true

+ instance_map:
+   main:
+     host: 'localhost'
+     port: 9093
```

See the [configuration manual](usage/configuration/config_documentation.md)
@@ -130,13 +141,13 @@ In the config file for each worker, you must specify:
* The type of worker ([`worker_app`](usage/configuration/config_documentation.md#worker_app)).
  The currently available worker applications are listed [below](#available-worker-applications).
* A unique name for the worker ([`worker_name`](usage/configuration/config_documentation.md#worker_name)).
- * The HTTP replication endpoint that it should talk to on the main synapse process
-   ([`worker_replication_host`](usage/configuration/config_documentation.md#worker_replication_host) and
-   [`worker_replication_http_port`](usage/configuration/config_documentation.md#worker_replication_http_port)).
* If handling HTTP requests, a [`worker_listeners`](usage/configuration/config_documentation.md#worker_listeners) option
  with an `http` listener.
* **Synapse 1.72 and older:** if handling the `^/_matrix/client/v3/keys/upload` endpoint, the HTTP URI for
  the main process (`worker_main_http_uri`). This config option is no longer required and is ignored when running Synapse 1.73 and newer.
+ * **Synapse 1.83 and older:** The HTTP replication endpoint that the worker should talk to on the main synapse process
+   ([`worker_replication_host`](usage/configuration/config_documentation.md#worker_replication_host) and
+   [`worker_replication_http_port`](usage/configuration/config_documentation.md#worker_replication_http_port)). If using Synapse 1.84 and newer, these are not needed if `main` is defined on the [shared configuration](#shared-configuration) `instance_map`

For example:
@@ -417,11 +428,14 @@ effects of bursts of events from that bridge on events sent by normal users.
Additionally, the writing of specific streams (such as events) can be moved off
of the main process to a particular worker.

- To enable this, the worker must have a
- [HTTP `replication` listener](usage/configuration/config_documentation.md#listeners) configured,
- have a [`worker_name`](usage/configuration/config_documentation.md#worker_name)
+ To enable this, the worker must have:
+ * An [HTTP `replication` listener](usage/configuration/config_documentation.md#listeners) configured,
+ * Have a [`worker_name`](usage/configuration/config_documentation.md#worker_name)
  and be listed in the [`instance_map`](usage/configuration/config_documentation.md#instance_map)
- config. The same worker can handle multiple streams, but unless otherwise documented,
+ config.
+ * Have the main process declared on the [`instance_map`](usage/configuration/config_documentation.md#instance_map) as well.
+
+ Note: The same worker can handle multiple streams, but unless otherwise documented,
  each stream can only have a single writer.

For example, to move event persistence off to a dedicated worker, the shared
@@ -429,6 +443,9 @@ configuration would include:

```yaml
instance_map:
+   main:
+     host: localhost
+     port: 8030
  event_persister1:
    host: localhost
    port: 8034
@@ -13,6 +13,9 @@ no_implicit_optional = True
disallow_untyped_defs = True
strict_equality = True
warn_redundant_casts = True
+ # Run mypy type checking with the minimum supported Python version to catch new usage
+ # that isn't backwards-compatible (types, overloads, etc).
+ python_version = 3.8

files =
  docker/,
Generated
+12
-12
@@ -1632,14 +1632,14 @@ files = [
|
||||
|
||||
[[package]]
|
||||
name = "phonenumbers"
|
||||
version = "8.13.7"
|
||||
version = "8.13.11"
|
||||
description = "Python version of Google's common library for parsing, formatting, storing and validating international phone numbers."
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = "*"
|
||||
files = [
|
||||
{file = "phonenumbers-8.13.7-py2.py3-none-any.whl", hash = "sha256:d3e3555b38c89b121f5b2e917847003bdd07027569d758d5f40156c01aeac089"},
|
||||
{file = "phonenumbers-8.13.7.tar.gz", hash = "sha256:253bb0e01250d21a11f2b42b3e6e161b7f6cb2ac440e2e2a95c1da71d221ee1a"},
|
||||
{file = "phonenumbers-8.13.11-py2.py3-none-any.whl", hash = "sha256:107469114fd297258a485bdf8238d0522cb392db1257faf2bf23384ecbdb0e8a"},
|
||||
{file = "phonenumbers-8.13.11.tar.gz", hash = "sha256:3e3274d88cab3609b55ff5b93417075dbca2d13064f103fbf562e0ea1dda0f9a"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -3010,14 +3010,14 @@ files = [
|
||||
|
||||
[[package]]
|
||||
name = "types-commonmark"
|
||||
version = "0.9.2.2"
|
||||
version = "0.9.2.3"
|
||||
description = "Typing stubs for commonmark"
|
||||
category = "dev"
|
||||
optional = false
|
||||
python-versions = "*"
|
||||
files = [
|
||||
{file = "types-commonmark-0.9.2.2.tar.gz", hash = "sha256:f3259350634c2ce68ae503398430482f7cf44e5cae3d344995e916fbf453b4be"},
|
||||
{file = "types_commonmark-0.9.2.2-py3-none-any.whl", hash = "sha256:d3d878692615e7fbe47bf19ba67497837b135812d665012a3d42219c1f2c3a61"},
|
||||
{file = "types-commonmark-0.9.2.3.tar.gz", hash = "sha256:42769a2c194fd5b49fd9eedfd4a83cd1d2514c6d0a36f00f5c5ffe0b6a2d2fcf"},
|
||||
{file = "types_commonmark-0.9.2.3-py3-none-any.whl", hash = "sha256:b575156e1b8a292d43acb36f861110b85c4bc7aa53bbfb5ac64addec15d18cfa"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -3070,14 +3070,14 @@ files = [
|
||||
|
||||
[[package]]
|
||||
name = "types-psycopg2"
|
||||
version = "2.9.21.9"
|
||||
version = "2.9.21.10"
|
||||
description = "Typing stubs for psycopg2"
|
||||
category = "dev"
|
||||
optional = false
|
||||
python-versions = "*"
|
||||
files = [
|
||||
{file = "types-psycopg2-2.9.21.9.tar.gz", hash = "sha256:388dc36a04551632289c4aaf1fc5b91e147654b165db896d094844e216f22bf5"},
|
||||
{file = "types_psycopg2-2.9.21.9-py3-none-any.whl", hash = "sha256:0332525fb9d3031d3da46f091e7d40b2c4d4958e9c00d2b4c1eaaa9f8ef9de4e"},
|
||||
{file = "types-psycopg2-2.9.21.10.tar.gz", hash = "sha256:c2600892312ae1c34e12f145749795d93dc4eac3ef7dbf8a9c1bfd45385e80d7"},
|
||||
{file = "types_psycopg2-2.9.21.10-py3-none-any.whl", hash = "sha256:918224a0731a3650832e46633e720703b5beef7693a064e777d9748654fcf5e5"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -3124,14 +3124,14 @@ types-urllib3 = "*"
|
||||
|
||||
[[package]]
|
||||
name = "types-setuptools"
|
||||
version = "67.7.0.1"
|
||||
version = "67.7.0.2"
|
||||
description = "Typing stubs for setuptools"
|
||||
category = "dev"
|
||||
optional = false
|
||||
python-versions = "*"
|
||||
files = [
|
||||
{file = "types-setuptools-67.7.0.1.tar.gz", hash = "sha256:980a2651b2b019809817e1585071596b87fbafcb54433ff3b12445461db23790"},
|
||||
{file = "types_setuptools-67.7.0.1-py3-none-any.whl", hash = "sha256:471a4ecf6984ffada63ffcfa884bfcb62718bd2d1a1acf8ee5513ec99789ed5e"},
|
||||
{file = "types-setuptools-67.7.0.2.tar.gz", hash = "sha256:155789e85e79d5682b0d341919d4beb6140408ae52bac922af25b54e36ab25c0"},
|
||||
{file = "types_setuptools-67.7.0.2-py3-none-any.whl", hash = "sha256:bd30f6dbe9b83f0a7e6e3eab6d2df748aa4f55700d54e9f077d3aa30cc019445"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
+2
-2
@@ -89,7 +89,7 @@ manifest-path = "rust/Cargo.toml"

[tool.poetry]
name = "matrix-synapse"
- version = "1.83.0"
+ version = "1.84.0rc1"
description = "Homeserver for the Matrix decentralised comms protocol"
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
license = "Apache-2.0"
@@ -368,7 +368,7 @@ furo = ">=2022.12.7,<2024.0.0"
# system changes.
# We are happy to raise these upper bounds upon request,
# provided we check that it's safe to do so (i.e. that CI passes).
- requires = ["poetry-core>=1.0.0,<=1.5.0", "setuptools_rust>=1.3,<=1.5.2"]
+ requires = ["poetry-core>=1.1.0,<=1.6.0", "setuptools_rust>=1.3,<=1.6.0"]
build-backend = "poetry.core.masonry.api"
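
As an illustrative aside (not part of the packaging changes), one way to check whether a local environment already satisfies the new lower bounds, using only the standard library; the naive version comparison is an assumption and a real check would use `packaging.version`:

```python
from importlib.metadata import PackageNotFoundError, version

# New lower bounds taken from the build-system requirements above.
MINIMUMS = {"poetry-core": "1.1.0", "setuptools-rust": "1.3"}

for package, minimum in MINIMUMS.items():
    try:
        installed = version(package)
    except PackageNotFoundError:
        print(f"{package}: not installed")
        continue
    # Naive numeric comparison; pre-release suffixes would need packaging.version.
    ok = tuple(int(p) for p in installed.split(".")[:3]) >= tuple(int(p) for p in minimum.split("."))
    print(f"{package}: {installed} ({'ok' if ok else 'too old'})")
```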
@@ -24,6 +24,7 @@ import time
|
||||
import traceback
|
||||
from types import TracebackType
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Any,
|
||||
Awaitable,
|
||||
Callable,
|
||||
@@ -53,7 +54,12 @@ from synapse.logging.context import (
|
||||
run_in_background,
|
||||
)
|
||||
from synapse.notifier import ReplicationNotifier
|
||||
from synapse.storage.database import DatabasePool, LoggingTransaction, make_conn
|
||||
from synapse.storage.database import (
|
||||
DatabasePool,
|
||||
LoggingDatabaseConnection,
|
||||
LoggingTransaction,
|
||||
make_conn,
|
||||
)
|
||||
from synapse.storage.databases.main import FilteringWorkerStore, PushRuleStore
|
||||
from synapse.storage.databases.main.account_data import AccountDataWorkerStore
|
||||
from synapse.storage.databases.main.client_ips import ClientIpBackgroundUpdateStore
|
||||
@@ -94,6 +100,9 @@ from synapse.storage.prepare_database import prepare_database
|
||||
from synapse.types import ISynapseReactor
|
||||
from synapse.util import SYNAPSE_VERSION, Clock
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
# Cast safety: Twisted does some naughty magic which replaces the
|
||||
# twisted.internet.reactor module with a Reactor instance at runtime.
|
||||
reactor = cast(ISynapseReactor, reactor_)
|
||||
@@ -238,8 +247,18 @@ class Store(
|
||||
PusherBackgroundUpdatesStore,
|
||||
PresenceBackgroundUpdateStore,
|
||||
ReceiptsBackgroundUpdateStore,
|
||||
RelationsWorkerStore,
|
||||
):
|
||||
def __init__(
|
||||
self,
|
||||
database: DatabasePool,
|
||||
db_conn: LoggingDatabaseConnection,
|
||||
hs: "HomeServer",
|
||||
):
|
||||
super().__init__(database, db_conn, hs)
|
||||
|
||||
# This is a bit repetitive, but avoids dynamically setting attributes.
|
||||
self.relations = RelationsWorkerStore(database, db_conn, hs)
|
||||
|
||||
def execute(self, f: Callable[..., R], *args: Any, **kwargs: Any) -> Awaitable[R]:
|
||||
return self.db_pool.runInteraction(f.__name__, f, *args, **kwargs)
|
||||
|
||||
|
||||
@@ -507,7 +507,7 @@ class Filter:
|
||||
# The event IDs to check, mypy doesn't understand the isinstance check.
|
||||
event_ids = [event.event_id for event in events if isinstance(event, EventBase)] # type: ignore[attr-defined]
|
||||
event_ids_to_keep = set(
|
||||
await self._store.events_have_relations(
|
||||
await self._store.relations.events_have_relations(
|
||||
event_ids, self.related_by_senders, self.related_by_rel_types
|
||||
)
|
||||
)
|
||||
|
||||
@@ -96,11 +96,15 @@ class RoomVersion:
|
||||
msc2716_historical: bool
|
||||
# MSC2716: Adds support for redacting "insertion", "chunk", and "marker" events
|
||||
msc2716_redactions: bool
|
||||
# MSC3389: Protect relation information from redaction.
|
||||
msc3389_relation_redactions: bool
|
||||
# MSC3787: Adds support for a `knock_restricted` join rule, mixing concepts of
|
||||
# knocks and restricted join rules into the same join condition.
|
||||
msc3787_knock_restricted_join_rule: bool
|
||||
# MSC3667: Enforce integer power levels
|
||||
msc3667_int_only_power_levels: bool
|
||||
# MSC3821: Do not redact the third_party_invite content field for membership events.
|
||||
msc3821_redaction_rules: bool
|
||||
# MSC3931: Adds a push rule condition for "room version feature flags", making
|
||||
# some push rules room version dependent. Note that adding a flag to this list
|
||||
# is not enough to mark it "supported": the push rule evaluator also needs to
|
||||
@@ -128,8 +132,10 @@ class RoomVersions:
|
||||
msc2403_knocking=False,
|
||||
msc2716_historical=False,
|
||||
msc2716_redactions=False,
|
||||
msc3389_relation_redactions=False,
|
||||
msc3787_knock_restricted_join_rule=False,
|
||||
msc3667_int_only_power_levels=False,
|
||||
msc3821_redaction_rules=False,
|
||||
msc3931_push_features=(),
|
||||
msc3989_redaction_rules=False,
|
||||
)
|
||||
@@ -149,8 +155,10 @@ class RoomVersions:
|
||||
msc2403_knocking=False,
|
||||
msc2716_historical=False,
|
||||
msc2716_redactions=False,
|
||||
msc3389_relation_redactions=False,
|
||||
msc3787_knock_restricted_join_rule=False,
|
||||
msc3667_int_only_power_levels=False,
|
||||
msc3821_redaction_rules=False,
|
||||
msc3931_push_features=(),
|
||||
msc3989_redaction_rules=False,
|
||||
)
|
||||
@@ -170,8 +178,10 @@ class RoomVersions:
|
||||
msc2403_knocking=False,
|
||||
msc2716_historical=False,
|
||||
msc2716_redactions=False,
|
||||
msc3389_relation_redactions=False,
|
||||
msc3787_knock_restricted_join_rule=False,
|
||||
msc3667_int_only_power_levels=False,
|
||||
msc3821_redaction_rules=False,
|
||||
msc3931_push_features=(),
|
||||
msc3989_redaction_rules=False,
|
||||
)
|
||||
@@ -191,8 +201,10 @@ class RoomVersions:
|
||||
msc2403_knocking=False,
|
||||
msc2716_historical=False,
|
||||
msc2716_redactions=False,
|
||||
msc3389_relation_redactions=False,
|
||||
msc3787_knock_restricted_join_rule=False,
|
||||
msc3667_int_only_power_levels=False,
|
||||
msc3821_redaction_rules=False,
|
||||
msc3931_push_features=(),
|
||||
msc3989_redaction_rules=False,
|
||||
)
|
||||
@@ -212,8 +224,10 @@ class RoomVersions:
|
||||
msc2403_knocking=False,
|
||||
msc2716_historical=False,
|
||||
msc2716_redactions=False,
|
||||
msc3389_relation_redactions=False,
|
||||
msc3787_knock_restricted_join_rule=False,
|
||||
msc3667_int_only_power_levels=False,
|
||||
msc3821_redaction_rules=False,
|
||||
msc3931_push_features=(),
|
||||
msc3989_redaction_rules=False,
|
||||
)
|
||||
@@ -233,8 +247,10 @@ class RoomVersions:
|
||||
msc2403_knocking=False,
|
||||
msc2716_historical=False,
|
||||
msc2716_redactions=False,
|
||||
msc3389_relation_redactions=False,
|
||||
msc3787_knock_restricted_join_rule=False,
|
||||
msc3667_int_only_power_levels=False,
|
||||
msc3821_redaction_rules=False,
|
||||
msc3931_push_features=(),
|
||||
msc3989_redaction_rules=False,
|
||||
)
|
||||
@@ -254,8 +270,10 @@ class RoomVersions:
|
||||
msc2403_knocking=False,
|
||||
msc2716_historical=False,
|
||||
msc2716_redactions=False,
|
||||
msc3389_relation_redactions=False,
|
||||
msc3787_knock_restricted_join_rule=False,
|
||||
msc3667_int_only_power_levels=False,
|
||||
msc3821_redaction_rules=False,
|
||||
msc3931_push_features=(),
|
||||
msc3989_redaction_rules=False,
|
||||
)
|
||||
@@ -275,8 +293,10 @@ class RoomVersions:
|
||||
msc2403_knocking=True,
|
||||
msc2716_historical=False,
|
||||
msc2716_redactions=False,
|
||||
msc3389_relation_redactions=False,
|
||||
msc3787_knock_restricted_join_rule=False,
|
||||
msc3667_int_only_power_levels=False,
|
||||
msc3821_redaction_rules=False,
|
||||
msc3931_push_features=(),
|
||||
msc3989_redaction_rules=False,
|
||||
)
|
||||
@@ -296,8 +316,10 @@ class RoomVersions:
|
||||
msc2403_knocking=True,
|
||||
msc2716_historical=False,
|
||||
msc2716_redactions=False,
|
||||
msc3389_relation_redactions=False,
|
||||
msc3787_knock_restricted_join_rule=False,
|
||||
msc3667_int_only_power_levels=False,
|
||||
msc3821_redaction_rules=False,
|
||||
msc3931_push_features=(),
|
||||
msc3989_redaction_rules=False,
|
||||
)
|
||||
@@ -317,8 +339,10 @@ class RoomVersions:
|
||||
msc2403_knocking=True,
|
||||
msc2716_historical=False,
|
||||
msc2716_redactions=False,
|
||||
msc3389_relation_redactions=False,
|
||||
msc3787_knock_restricted_join_rule=False,
|
||||
msc3667_int_only_power_levels=False,
|
||||
msc3821_redaction_rules=False,
|
||||
msc3931_push_features=(),
|
||||
msc3989_redaction_rules=False,
|
||||
)
|
||||
@@ -338,8 +362,33 @@ class RoomVersions:
|
||||
msc2403_knocking=True,
|
||||
msc2716_historical=False,
|
||||
msc2716_redactions=False,
|
||||
msc3389_relation_redactions=False,
|
||||
msc3787_knock_restricted_join_rule=True,
|
||||
msc3667_int_only_power_levels=False,
|
||||
msc3821_redaction_rules=False,
|
||||
msc3931_push_features=(),
|
||||
msc3989_redaction_rules=False,
|
||||
)
|
||||
MSC3821 = RoomVersion(
|
||||
"org.matrix.msc3821.opt1",
|
||||
RoomDisposition.UNSTABLE,
|
||||
EventFormatVersions.ROOM_V4_PLUS,
|
||||
StateResolutionVersions.V2,
|
||||
enforce_key_validity=True,
|
||||
special_case_aliases_auth=False,
|
||||
strict_canonicaljson=True,
|
||||
limit_notifications_power_levels=True,
|
||||
msc2175_implicit_room_creator=False,
|
||||
msc2176_redaction_rules=False,
|
||||
msc3083_join_rules=True,
|
||||
msc3375_redaction_rules=True,
|
||||
msc2403_knocking=True,
|
||||
msc2716_historical=False,
|
||||
msc2716_redactions=False,
|
||||
msc3389_relation_redactions=False,
|
||||
msc3787_knock_restricted_join_rule=False,
|
||||
msc3667_int_only_power_levels=False,
|
||||
msc3821_redaction_rules=True,
|
||||
msc3931_push_features=(),
|
||||
msc3989_redaction_rules=False,
|
||||
)
|
||||
@@ -359,8 +408,10 @@ class RoomVersions:
|
||||
msc2403_knocking=True,
|
||||
msc2716_historical=False,
|
||||
msc2716_redactions=False,
|
||||
msc3389_relation_redactions=False,
|
||||
msc3787_knock_restricted_join_rule=True,
|
||||
msc3667_int_only_power_levels=True,
|
||||
msc3821_redaction_rules=False,
|
||||
msc3931_push_features=(),
|
||||
msc3989_redaction_rules=False,
|
||||
)
|
||||
@@ -380,8 +431,10 @@ class RoomVersions:
|
||||
msc2403_knocking=True,
|
||||
msc2716_historical=True,
|
||||
msc2716_redactions=True,
|
||||
msc3389_relation_redactions=False,
|
||||
msc3787_knock_restricted_join_rule=False,
|
||||
msc3667_int_only_power_levels=False,
|
||||
msc3821_redaction_rules=False,
|
||||
msc3931_push_features=(),
|
||||
msc3989_redaction_rules=False,
|
||||
)
|
||||
@@ -402,8 +455,10 @@ class RoomVersions:
|
||||
msc2403_knocking=True,
|
||||
msc2716_historical=False,
|
||||
msc2716_redactions=False,
|
||||
msc3389_relation_redactions=False,
|
||||
msc3787_knock_restricted_join_rule=True,
|
||||
msc3667_int_only_power_levels=True,
|
||||
msc3821_redaction_rules=False,
|
||||
msc3931_push_features=(PushRuleRoomFlag.EXTENSIBLE_EVENTS,),
|
||||
msc3989_redaction_rules=False,
|
||||
)
|
||||
@@ -423,8 +478,10 @@ class RoomVersions:
|
||||
msc2403_knocking=True,
|
||||
msc2716_historical=False,
|
||||
msc2716_redactions=False,
|
||||
msc3389_relation_redactions=False,
|
||||
msc3787_knock_restricted_join_rule=True,
|
||||
msc3667_int_only_power_levels=True,
|
||||
msc3821_redaction_rules=False,
|
||||
msc3931_push_features=(),
|
||||
msc3989_redaction_rules=True,
|
||||
)
|
||||
|
||||
@@ -21,6 +21,7 @@ import socket
import sys
import traceback
import warnings
+ from textwrap import indent
from typing import (
    TYPE_CHECKING,
    Any,
@@ -212,8 +213,12 @@ def handle_startup_exception(e: Exception) -> NoReturn:
    # Exceptions that occur between setting up the logging and forking or starting
    # the reactor are written to the logs, followed by a summary to stderr.
    logger.exception("Exception during startup")
+
+   error_string = "".join(traceback.format_exception(type(e), e, e.__traceback__))
+   indented_error_string = indent(error_string, " ")
+
    quit_with_error(
-       f"Error during initialisation:\n {e}\nThere may be more information in the logs."
+       f"Error during initialisation:\n{indented_error_string}\nThere may be more information in the logs."
    )
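
For context, a standalone sketch (not Synapse code) of what `traceback.format_exception` plus `textwrap.indent` produce here, so the effect of the change is easy to picture:

```python
import traceback
from textwrap import indent

def fail() -> None:
    raise ValueError("bad config")

try:
    fail()
except Exception as e:
    # Same shape as the patched handle_startup_exception: format the full
    # traceback, then indent every line so it stands out in the stderr summary.
    error_string = "".join(traceback.format_exception(type(e), e, e.__traceback__))
    print("Error during initialisation:\n" + indent(error_string, " "))
```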
@@ -64,7 +64,7 @@ from synapse.util.logcontext import LoggingContext
|
||||
logger = logging.getLogger("synapse.app.admin_cmd")
|
||||
|
||||
|
||||
class AdminCmdSlavedStore(
|
||||
class AdminCmdStore(
|
||||
FilteringWorkerStore,
|
||||
ClientIpWorkerStore,
|
||||
DeviceWorkerStore,
|
||||
@@ -75,7 +75,6 @@ class AdminCmdSlavedStore(
|
||||
ApplicationServiceTransactionWorkerStore,
|
||||
ApplicationServiceWorkerStore,
|
||||
RoomMemberWorkerStore,
|
||||
RelationsWorkerStore,
|
||||
EventFederationWorkerStore,
|
||||
EventPushActionsWorkerStore,
|
||||
StateGroupWorkerStore,
|
||||
@@ -101,9 +100,12 @@ class AdminCmdSlavedStore(
|
||||
# should refactor it to take a `Clock` directly.
|
||||
self.clock = hs.get_clock()
|
||||
|
||||
# This is a bit repetitive, but avoids dynamically setting attributes.
|
||||
self.relations = RelationsWorkerStore(database, db_conn, hs)
|
||||
|
||||
|
||||
class AdminCmdServer(HomeServer):
|
||||
DATASTORE_CLASS = AdminCmdSlavedStore # type: ignore
|
||||
DATASTORE_CLASS = AdminCmdStore # type: ignore
|
||||
|
||||
|
||||
async def export_data_command(hs: HomeServer, args: argparse.Namespace) -> None:
|
||||
|
||||
@@ -51,6 +51,7 @@ from synapse.rest.key.v2 import KeyResource
|
||||
from synapse.rest.synapse.client import build_synapse_client_resource_tree
|
||||
from synapse.rest.well_known import well_known_resource
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
|
||||
from synapse.storage.databases.main.account_data import AccountDataWorkerStore
|
||||
from synapse.storage.databases.main.appservice import (
|
||||
ApplicationServiceTransactionWorkerStore,
|
||||
@@ -102,7 +103,7 @@ from synapse.util.httpresourcetree import create_resource_tree
|
||||
logger = logging.getLogger("synapse.app.generic_worker")
|
||||
|
||||
|
||||
class GenericWorkerSlavedStore(
|
||||
class GenericWorkerStore(
|
||||
# FIXME(#3714): We need to add UserDirectoryStore as we write directly
|
||||
# rather than going via the correct worker.
|
||||
UserDirectoryStore,
|
||||
@@ -132,7 +133,6 @@ class GenericWorkerSlavedStore(
|
||||
ServerMetricsStore,
|
||||
PusherWorkerStore,
|
||||
RoomMemberWorkerStore,
|
||||
RelationsWorkerStore,
|
||||
EventFederationWorkerStore,
|
||||
EventPushActionsWorkerStore,
|
||||
StateGroupWorkerStore,
|
||||
@@ -152,9 +152,20 @@ class GenericWorkerSlavedStore(
|
||||
server_name: str
|
||||
config: HomeServerConfig
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
database: DatabasePool,
|
||||
db_conn: LoggingDatabaseConnection,
|
||||
hs: "HomeServer",
|
||||
):
|
||||
super().__init__(database, db_conn, hs)
|
||||
|
||||
# This is a bit repetitive, but avoids dynamically setting attributes.
|
||||
self.relations = RelationsWorkerStore(database, db_conn, hs)
|
||||
|
||||
|
||||
class GenericWorkerServer(HomeServer):
|
||||
DATASTORE_CLASS = GenericWorkerSlavedStore # type: ignore
|
||||
DATASTORE_CLASS = GenericWorkerStore # type: ignore
|
||||
|
||||
def _listen_http(self, listener_config: ListenerConfig) -> None:
|
||||
assert listener_config.http_options is not None
|
||||
|
||||
@@ -35,3 +35,9 @@ class RedisConfig(Config):
        self.redis_port = redis_config.get("port", 6379)
        self.redis_dbid = redis_config.get("dbid", None)
        self.redis_password = redis_config.get("password")
+
+       self.redis_use_tls = redis_config.get("use_tls", False)
+       self.redis_certificate = redis_config.get("certificate_file", None)
+       self.redis_private_key = redis_config.get("private_key_file", None)
+       self.redis_ca_file = redis_config.get("ca_file", None)
+       self.redis_ca_path = redis_config.get("ca_path", None)
|
||||
|
||||
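(For reference, a minimal sketch of how these new options might be set in the homeserver
configuration. The keys mirror the `redis_config.get(...)` calls above; the surrounding
`redis` block is existing Synapse configuration and the file paths are illustrative
assumptions.)

redis:
  enabled: true
  host: localhost
  port: 6379
  # New options added in this change: opt-in TLS for the connection to Redis.
  use_tls: true
  certificate_file: /path/to/redis-client.crt   # assumed path
  private_key_file: /path/to/redis-client.key   # assumed path
  ca_file: /path/to/ca.crt                      # assumed path; ca_path may be used instead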
+61
-17
@@ -39,6 +39,19 @@ The '%s' configuration option is deprecated and will be removed in a future
Synapse version. Please use ``%s: name_of_worker`` instead.
"""

_MISSING_MAIN_PROCESS_INSTANCE_MAP_DATA = """
Missing data for a worker to connect to main process. Please include '%s' in the
`instance_map` declared in your shared yaml configuration, or optionally (as a deprecated
solution) in every worker's yaml as various `worker_replication_*` settings as defined
in the workers documentation here:
`https://matrix-org.github.io/synapse/latest/workers.html#worker-configuration`
"""
# This allows for a handy knob when it's time to change from 'master' to
# something with less 'history'
MAIN_PROCESS_INSTANCE_NAME = "master"
# Use this to adjust what the main process is known as in the yaml instance_map
MAIN_PROCESS_INSTANCE_MAP_NAME = "main"

logger = logging.getLogger(__name__)
@@ -161,27 +174,15 @@ class WorkerConfig(Config):
            raise ConfigError("worker_log_config must be a string")
        self.worker_log_config = worker_log_config

        # The host used to connect to the main synapse
        self.worker_replication_host = config.get("worker_replication_host", None)

        # The port on the main synapse for TCP replication
        if "worker_replication_port" in config:
            raise ConfigError(DIRECT_TCP_ERROR, ("worker_replication_port",))

        # The port on the main synapse for HTTP replication endpoint
        self.worker_replication_http_port = config.get("worker_replication_http_port")

        # The tls mode on the main synapse for HTTP replication endpoint.
        # For backward compatibility this defaults to False.
        self.worker_replication_http_tls = config.get(
            "worker_replication_http_tls", False
        )

        # The shared secret used for authentication when connecting to the main synapse.
        self.worker_replication_secret = config.get("worker_replication_secret", None)

        self.worker_name = config.get("worker_name", self.worker_app)
        self.instance_name = self.worker_name or "master"
        self.instance_name = self.worker_name or MAIN_PROCESS_INSTANCE_NAME

        # FIXME: Remove this check after a suitable amount of time.
        self.worker_main_http_uri = config.get("worker_main_http_uri", None)
@@ -215,12 +216,55 @@ class WorkerConfig(Config):
            )

        # A map from instance name to host/port of their HTTP replication endpoint.
        # Check if the main process is declared. Inject it into the map if it's not,
        # based first on if a 'main' block is declared then on 'worker_replication_*'
        # data. If both are available, default to instance_map. The main process
        # itself doesn't need this data as it would never have to talk to itself.
        instance_map: Dict[str, Any] = config.get("instance_map", {})

        if instance_map and self.instance_name is not MAIN_PROCESS_INSTANCE_NAME:
            # The host used to connect to the main synapse
            main_host = config.get("worker_replication_host", None)

            # The port on the main synapse for HTTP replication endpoint
            main_port = config.get("worker_replication_http_port")

            # The tls mode on the main synapse for HTTP replication endpoint.
            # For backward compatibility this defaults to False.
            main_tls = config.get("worker_replication_http_tls", False)

            # For now, accept 'main' in the instance_map, but the replication system
            # expects 'master', force that into being until it's changed later.
            if MAIN_PROCESS_INSTANCE_MAP_NAME in instance_map:
                instance_map[MAIN_PROCESS_INSTANCE_NAME] = instance_map[
                    MAIN_PROCESS_INSTANCE_MAP_NAME
                ]
                del instance_map[MAIN_PROCESS_INSTANCE_MAP_NAME]

            # This is the backwards compatibility bit that handles the
            # worker_replication_* bits using setdefault() to not overwrite anything.
            elif main_host is not None and main_port is not None:
                instance_map.setdefault(
                    MAIN_PROCESS_INSTANCE_NAME,
                    {
                        "host": main_host,
                        "port": main_port,
                        "tls": main_tls,
                    },
                )

            else:
                # If we've gotten here, it means that the main process is not on the
                # instance_map and that not enough worker_replication_* variables
                # were declared in the worker's yaml.
                raise ConfigError(
                    _MISSING_MAIN_PROCESS_INSTANCE_MAP_DATA
                    % MAIN_PROCESS_INSTANCE_MAP_NAME
                )

        self.instance_map: Dict[
            str, InstanceLocationConfig
        ] = parse_and_validate_mapping(
            config.get("instance_map", {}),
            InstanceLocationConfig,
        )
        ] = parse_and_validate_mapping(instance_map, InstanceLocationConfig)

        # Map from type of streams to source, c.f. WriterLocations.
        writers = config.get("stream_writers") or {}
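(To make the backwards-compatibility handling above concrete, a sketch of the two ways a
worker can now be told how to reach the main process. The `host`/`port`/`tls` keys and the
`main` entry name come from the code above; the host name and port number are illustrative
assumptions.)

# Preferred: declare the main process once in the shared configuration.
instance_map:
  main:
    host: localhost
    port: 8093   # assumed HTTP replication listener port on the main process
    tls: false

# Deprecated fallback, read per worker and folded into the map via setdefault():
worker_replication_host: localhost
worker_replication_http_port: 8093
worker_replication_http_tls: false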
+11
-6
@@ -1054,10 +1054,15 @@ def _verify_third_party_invite(
    """
    if "third_party_invite" not in event.content:
        return False
    if "signed" not in event.content["third_party_invite"]:
    third_party_invite = event.content["third_party_invite"]
    if not isinstance(third_party_invite, collections.abc.Mapping):
        return False
    signed = event.content["third_party_invite"]["signed"]
    for key in {"mxid", "token"}:
    if "signed" not in third_party_invite:
        return False
    signed = third_party_invite["signed"]
    if not isinstance(signed, collections.abc.Mapping):
        return False
    for key in {"mxid", "token", "signatures"}:
        if key not in signed:
            return False

@@ -1075,8 +1080,6 @@ def _verify_third_party_invite(

    if signed["mxid"] != event.state_key:
        return False
    if signed["token"] != token:
        return False

    for public_key_object in get_public_keys(invite_event):
        public_key = public_key_object["public_key"]
@@ -1088,7 +1091,9 @@ def _verify_third_party_invite(
                    verify_key = decode_verify_key_bytes(
                        key_name, decode_base64(public_key)
                    )
                    verify_signed_json(signed, server, verify_key)
                    # verify_signed_json incorrectly states it wants a dict, it
                    # just needs a mapping.
                    verify_signed_json(signed, server, verify_key)  # type: ignore[arg-type]

                    # We got the public key from the invite, so we know that the
                    # correct server signed the signed bundle.
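(A sketch of the event content shape these checks validate. The `third_party_invite`,
`signed`, `mxid`, `token` and `signatures` field names follow the code above; the concrete
values, the `display_name` field and the signature key id are illustrative assumptions.)

content:
  membership: invite
  third_party_invite:
    display_name: "a...@example.com"
    signed:
      mxid: "@alice:example.com"
      token: "abc123"
      signatures:
        identity.example.com:
          ed25519:0: "<base64 signature>"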
@@ -130,6 +130,16 @@ def prune_event_dict(room_version: RoomVersion, event_dict: JsonDict) -> JsonDic
|
||||
add_fields("membership")
|
||||
if room_version.msc3375_redaction_rules:
|
||||
add_fields(EventContentFields.AUTHORISING_USER)
|
||||
if room_version.msc3821_redaction_rules:
|
||||
# Preserve the signed field under third_party_invite.
|
||||
third_party_invite = event_dict["content"].get("third_party_invite")
|
||||
if isinstance(third_party_invite, collections.abc.Mapping):
|
||||
new_content["third_party_invite"] = {}
|
||||
if "signed" in third_party_invite:
|
||||
new_content["third_party_invite"]["signed"] = third_party_invite[
|
||||
"signed"
|
||||
]
|
||||
|
||||
elif event_type == EventTypes.Create:
|
||||
# MSC2176 rules state that create events cannot be redacted.
|
||||
if room_version.msc2176_redaction_rules:
|
||||
@@ -171,6 +181,18 @@ def prune_event_dict(room_version: RoomVersion, event_dict: JsonDict) -> JsonDic
|
||||
elif room_version.msc2716_redactions and event_type == EventTypes.MSC2716_MARKER:
|
||||
add_fields(EventContentFields.MSC2716_INSERTION_EVENT_REFERENCE)
|
||||
|
||||
# Protect the rel_type and event_id fields under the m.relates_to field.
|
||||
if room_version.msc3389_relation_redactions:
|
||||
relates_to = event_dict["content"].get("m.relates_to")
|
||||
if isinstance(relates_to, collections.abc.Mapping):
|
||||
new_relates_to = {}
|
||||
for field in ("rel_type", "event_id"):
|
||||
if field in relates_to:
|
||||
new_relates_to[field] = relates_to[field]
|
||||
# Only include a non-empty relates_to field.
|
||||
if new_relates_to:
|
||||
new_content["m.relates_to"] = new_relates_to
|
||||
|
||||
allowed_fields = {k: v for k, v in event_dict.items() if k in allowed_keys}
|
||||
|
||||
allowed_fields["content"] = new_content
|
||||
|
||||
@@ -1361,7 +1361,9 @@ class EventCreationHandler:
|
||||
else:
|
||||
# There must be some reason that the client knows the event exists,
|
||||
# see if there are existing relations. If so, assume everything is fine.
|
||||
if not await self.store.event_is_target_of_relation(relation.parent_id):
|
||||
if not await self.store.relations.event_is_target_of_relation(
|
||||
relation.parent_id
|
||||
):
|
||||
# Otherwise, the client can't know about the parent event!
|
||||
raise SynapseError(400, "Can't send relation to unknown event")
|
||||
|
||||
@@ -1377,7 +1379,7 @@ class EventCreationHandler:
|
||||
if len(aggregation_key) > 500:
|
||||
raise SynapseError(400, "Aggregation key is too long")
|
||||
|
||||
already_exists = await self.store.has_user_annotated_event(
|
||||
already_exists = await self.store.relations.has_user_annotated_event(
|
||||
relation.parent_id, event.type, aggregation_key, event.sender
|
||||
)
|
||||
if already_exists:
|
||||
@@ -1389,7 +1391,7 @@ class EventCreationHandler:
|
||||
|
||||
# Don't attempt to start a thread if the parent event is a relation.
|
||||
elif relation.rel_type == RelationTypes.THREAD:
|
||||
if await self.store.event_includes_relation(relation.parent_id):
|
||||
if await self.store.relations.event_includes_relation(relation.parent_id):
|
||||
raise SynapseError(
|
||||
400, "Cannot start threads from an event with a relation"
|
||||
)
|
||||
|
||||
@@ -124,7 +124,10 @@ class RelationsHandler:
|
||||
# Note that ignored users are not passed into get_relations_for_event
|
||||
# below. Ignored users are handled in filter_events_for_client (and by
|
||||
# not passing them in here we should get a better cache hit rate).
|
||||
related_events, next_token = await self._main_store.get_relations_for_event(
|
||||
(
|
||||
related_events,
|
||||
next_token,
|
||||
) = await self._main_store.relations.get_relations_for_event(
|
||||
event_id=event_id,
|
||||
event=event,
|
||||
room_id=room_id,
|
||||
@@ -211,7 +214,7 @@ class RelationsHandler:
|
||||
ShadowBanError if the requester is shadow-banned
|
||||
"""
|
||||
related_event_ids = (
|
||||
await self._main_store.get_all_relations_for_event_with_types(
|
||||
await self._main_store.relations.get_all_relations_for_event_with_types(
|
||||
event_id, relation_types
|
||||
)
|
||||
)
|
||||
@@ -250,7 +253,9 @@ class RelationsHandler:
|
||||
A map of event IDs to a list related events.
|
||||
"""
|
||||
|
||||
related_events = await self._main_store.get_references_for_events(event_ids)
|
||||
related_events = await self._main_store.relations.get_references_for_events(
|
||||
event_ids
|
||||
)
|
||||
|
||||
# Avoid additional logic if there are no ignored users.
|
||||
if not ignored_users:
|
||||
@@ -304,7 +309,7 @@ class RelationsHandler:
|
||||
event_ids = [eid for eid in events_by_id.keys() if eid not in relations_by_id]
|
||||
|
||||
# Fetch thread summaries.
|
||||
summaries = await self._main_store.get_thread_summaries(event_ids)
|
||||
summaries = await self._main_store.relations.get_thread_summaries(event_ids)
|
||||
|
||||
# Limit fetching whether the requester has participated in a thread to
|
||||
# events which are thread roots.
|
||||
@@ -320,7 +325,7 @@ class RelationsHandler:
|
||||
# For events the requester did not send, check the database for whether
|
||||
# the requester sent a threaded reply.
|
||||
participated.update(
|
||||
await self._main_store.get_threads_participated(
|
||||
await self._main_store.relations.get_threads_participated(
|
||||
[
|
||||
event_id
|
||||
for event_id in thread_event_ids
|
||||
@@ -331,8 +336,10 @@ class RelationsHandler:
|
||||
)
|
||||
|
||||
# Then subtract off the results for any ignored users.
|
||||
ignored_results = await self._main_store.get_threaded_messages_per_user(
|
||||
thread_event_ids, ignored_users
|
||||
ignored_results = (
|
||||
await self._main_store.relations.get_threaded_messages_per_user(
|
||||
thread_event_ids, ignored_users
|
||||
)
|
||||
)
|
||||
|
||||
# A map of event ID to the thread aggregation.
|
||||
@@ -361,7 +368,10 @@ class RelationsHandler:
|
||||
continue
|
||||
|
||||
# Attempt to find another event to use as the latest event.
|
||||
potential_events, _ = await self._main_store.get_relations_for_event(
|
||||
(
|
||||
potential_events,
|
||||
_,
|
||||
) = await self._main_store.relations.get_relations_for_event(
|
||||
event_id,
|
||||
event,
|
||||
room_id,
|
||||
@@ -498,7 +508,7 @@ class RelationsHandler:
|
||||
Note that there is no use in limiting edits by ignored users since the
|
||||
parent event should be ignored in the first place if the user is ignored.
|
||||
"""
|
||||
edits = await self._main_store.get_applicable_edits(
|
||||
edits = await self._main_store.relations.get_applicable_edits(
|
||||
[
|
||||
event_id
|
||||
for event_id, event in events_by_id.items()
|
||||
@@ -553,7 +563,7 @@ class RelationsHandler:
|
||||
# Note that ignored users are not passed into get_threads
|
||||
# below. Ignored users are handled in filter_events_for_client (and by
|
||||
# not passing them in here we should get a better cache hit rate).
|
||||
thread_roots, next_batch = await self._main_store.get_threads(
|
||||
thread_roots, next_batch = await self._main_store.relations.get_threads(
|
||||
room_id=room_id, limit=limit, from_token=from_token
|
||||
)
|
||||
|
||||
@@ -565,7 +575,7 @@ class RelationsHandler:
|
||||
# For events the requester did not send, check the database for whether
|
||||
# the requester sent a threaded reply.
|
||||
participated.update(
|
||||
await self._main_store.get_threads_participated(
|
||||
await self._main_store.relations.get_threads_participated(
|
||||
[eid for eid, p in participated.items() if not p],
|
||||
user_id,
|
||||
)
|
||||
|
||||
@@ -113,7 +113,7 @@ class UrlPreviewer:
|
||||
1. Checks URL and timestamp against the database cache and returns the result if it
|
||||
has not expired and was successful (a 2xx return code).
|
||||
2. Checks if the URL matches an oEmbed (https://oembed.com/) pattern. If it
|
||||
does, update the URL to download.
|
||||
does and the new URL is not blocked, update the URL to download.
|
||||
3. Downloads the URL and stores it into a file via the media storage provider
|
||||
and saves the local media metadata.
|
||||
4. If the media is an image:
|
||||
@@ -127,14 +127,14 @@ class UrlPreviewer:
|
||||
and saves the local media metadata.
|
||||
2. Convert the oEmbed response to an Open Graph response.
|
||||
3. Override any Open Graph data from the HTML with data from oEmbed.
|
||||
4. If an image exists in the Open Graph response:
|
||||
4. If an image URL exists in the Open Graph response:
|
||||
1. Downloads the URL and stores it into a file via the media storage
|
||||
provider and saves the local media metadata.
|
||||
2. Generates thumbnails.
|
||||
3. Updates the Open Graph response based on image properties.
|
||||
6. If the media is JSON and an oEmbed URL was found:
|
||||
6. If an oEmbed URL was found and the media is JSON:
|
||||
1. Convert the oEmbed response to an Open Graph response.
|
||||
2. If a thumbnail or image is in the oEmbed response:
|
||||
2. If an image URL is in the oEmbed response:
|
||||
1. Downloads the URL and stores it into a file via the media storage
|
||||
provider and saves the local media metadata.
|
||||
2. Generates thumbnails.
|
||||
@@ -144,7 +144,8 @@ class UrlPreviewer:
|
||||
|
||||
If any additional requests (e.g. from oEmbed autodiscovery, step 5.3 or
|
||||
image thumbnailing, step 5.4 or 6.4) fails then the URL preview as a whole
|
||||
does not fail. As much information as possible is returned.
|
||||
does not fail. If any of them are blocked, then those additional requests
|
||||
are skipped. As much information as possible is returned.
|
||||
|
||||
The in-memory cache expires after 1 hour.
|
||||
|
||||
@@ -203,48 +204,14 @@ class UrlPreviewer:
|
||||
)
|
||||
|
||||
async def preview(self, url: str, user: UserID, ts: int) -> bytes:
|
||||
# XXX: we could move this into _do_preview if we wanted.
|
||||
url_tuple = urlsplit(url)
|
||||
for entry in self.url_preview_url_blacklist:
|
||||
match = True
|
||||
for attrib in entry:
|
||||
pattern = entry[attrib]
|
||||
value = getattr(url_tuple, attrib)
|
||||
logger.debug(
|
||||
"Matching attrib '%s' with value '%s' against pattern '%s'",
|
||||
attrib,
|
||||
value,
|
||||
pattern,
|
||||
)
|
||||
|
||||
if value is None:
|
||||
match = False
|
||||
continue
|
||||
|
||||
# Some attributes might not be parsed as strings by urlsplit (such as the
|
||||
# port, which is parsed as an int). Because we use match functions that
|
||||
# expect strings, we want to make sure that's what we give them.
|
||||
value_str = str(value)
|
||||
|
||||
if pattern.startswith("^"):
|
||||
if not re.match(pattern, value_str):
|
||||
match = False
|
||||
continue
|
||||
else:
|
||||
if not fnmatch.fnmatch(value_str, pattern):
|
||||
match = False
|
||||
continue
|
||||
if match:
|
||||
logger.warning("URL %s blocked by url_blacklist entry %s", url, entry)
|
||||
raise SynapseError(
|
||||
403, "URL blocked by url pattern blacklist entry", Codes.UNKNOWN
|
||||
)
|
||||
|
||||
# the in-memory cache:
|
||||
# * ensures that only one request is active at a time
|
||||
# * ensures that only one request to a URL is active at a time
|
||||
# * takes load off the DB for the thundering herds
|
||||
# * also caches any failures (unlike the DB) so we don't keep
|
||||
# requesting the same endpoint
|
||||
# requesting the same endpoint
|
||||
#
|
||||
# Note that autodiscovered oEmbed URLs and pre-caching of images
|
||||
# are not captured in the in-memory cache.
|
||||
|
||||
observable = self._cache.get(url)
|
||||
|
||||
@@ -283,7 +250,7 @@ class UrlPreviewer:
|
||||
og = og.encode("utf8")
|
||||
return og
|
||||
|
||||
# If this URL can be accessed via oEmbed, use that instead.
|
||||
# If this URL can be accessed via an allowed oEmbed, use that instead.
|
||||
url_to_download = url
|
||||
oembed_url = self._oembed.get_oembed_url(url)
|
||||
if oembed_url:
|
||||
@@ -329,6 +296,7 @@ class UrlPreviewer:
|
||||
# defer to that.
|
||||
oembed_url = self._oembed.autodiscover_from_html(tree)
|
||||
og_from_oembed: JsonDict = {}
|
||||
# Only download to the oEmbed URL if it is allowed.
|
||||
if oembed_url:
|
||||
try:
|
||||
oembed_info = await self._handle_url(
|
||||
@@ -411,6 +379,59 @@ class UrlPreviewer:
|
||||
|
||||
return jsonog.encode("utf8")
|
||||
|
||||
    def _is_url_blocked(self, url: str) -> bool:
        """
        Check whether the URL is allowed to be previewed (according to the homeserver
        configuration).

        Args:
            url: The requested URL.

        Return:
            True if the URL is blocked, False if it is allowed.
        """
        url_tuple = urlsplit(url)
        for entry in self.url_preview_url_blacklist:
            match = True
            # Iterate over each entry. If *all* attributes of that entry match
            # the current URL, then reject it.
            for attrib, pattern in entry.items():
                value = getattr(url_tuple, attrib)
                logger.debug(
                    "Matching attrib '%s' with value '%s' against pattern '%s'",
                    attrib,
                    value,
                    pattern,
                )

                if value is None:
                    match = False
                    break

                # Some attributes might not be parsed as strings by urlsplit (such as the
                # port, which is parsed as an int). Because we use match functions that
                # expect strings, we want to make sure that's what we give them.
                value_str = str(value)

                # Check the value against the pattern as either a regular expression or
                # a glob. If it doesn't match, the entry doesn't match.
                if pattern.startswith("^"):
                    if not re.match(pattern, value_str):
                        match = False
                        break
                else:
                    if not fnmatch.fnmatch(value_str, pattern):
                        match = False
                        break

            # All fields matched, return true (the URL is blocked).
            if match:
                logger.warning("URL %s blocked by url_blacklist entry %s", url, entry)
                return match

        # No matches were found, the URL is allowed.
        return False

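(As a sketch of what the entries checked by _is_url_blocked might look like: each item in
url_preview_url_blacklist maps urlsplit attribute names to patterns, globs by default and
regular expressions when the pattern starts with "^". The attribute names below are real
urlsplit fields; the specific patterns are illustrative assumptions.)

url_preview_url_blacklist:
  # Glob match on the netloc attribute: block previews for this host.
  - netloc: "internal.example.com"
  # Block any URL that carries a username component.
  - username: "*"
  # A pattern starting with "^" is treated as a regular expression (re.match).
  - scheme: "^ftp"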
async def _download_url(self, url: str, output_stream: BinaryIO) -> DownloadResult:
|
||||
"""
|
||||
Fetches a remote URL and parses the headers.
|
||||
@@ -547,8 +568,16 @@ class UrlPreviewer:
|
||||
|
||||
Returns:
|
||||
A MediaInfo object describing the fetched content.
|
||||
|
||||
Raises:
|
||||
SynapseError if the URL is blocked.
|
||||
"""
|
||||
|
||||
if self._is_url_blocked(url):
|
||||
raise SynapseError(
|
||||
403, "URL blocked by url pattern blacklist entry", Codes.UNKNOWN
|
||||
)
|
||||
|
||||
# TODO: we should probably honour robots.txt... except in practice
|
||||
# we're most likely being explicitly triggered by a human rather than a
|
||||
# bot, so are we really a robot?
|
||||
@@ -624,7 +653,7 @@ class UrlPreviewer:
|
||||
return
|
||||
|
||||
# The image URL from the HTML might be relative to the previewed page,
|
||||
# convert it to an URL which can be requested directly.
|
||||
# convert it to a URL which can be requested directly.
|
||||
url_parts = urlparse(image_url)
|
||||
if url_parts.scheme != "data":
|
||||
image_url = urljoin(media_info.uri, image_url)
|
||||
|
||||
@@ -134,7 +134,7 @@ from synapse.util.caches.descriptors import CachedFunction, cached as _cached
|
||||
from synapse.util.frozenutils import freeze
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.app.generic_worker import GenericWorkerSlavedStore
|
||||
from synapse.app.generic_worker import GenericWorkerStore
|
||||
from synapse.server import HomeServer
|
||||
|
||||
|
||||
@@ -156,6 +156,7 @@ __all__ = [
|
||||
"parse_json_object_from_request",
|
||||
"respond_with_html",
|
||||
"run_in_background",
|
||||
"run_as_background_process",
|
||||
"cached",
|
||||
"NOT_SPAM",
|
||||
"UserID",
|
||||
@@ -236,9 +237,7 @@ class ModuleApi:
|
||||
|
||||
# TODO: Fix this type hint once the types for the data stores have been ironed
|
||||
# out.
|
||||
self._store: Union[
|
||||
DataStore, "GenericWorkerSlavedStore"
|
||||
] = hs.get_datastores().main
|
||||
self._store: Union[DataStore, "GenericWorkerStore"] = hs.get_datastores().main
|
||||
self._storage_controllers = hs.get_storage_controllers()
|
||||
self._auth = hs.get_auth()
|
||||
self._auth_handler = auth_handler
|
||||
|
||||
@@ -368,7 +368,7 @@ class BulkPushRuleEvaluator:
|
||||
else:
|
||||
# Since the event has not yet been persisted we check whether
|
||||
# the parent is part of a thread.
|
||||
thread_id = await self.store.get_thread_id(relation.parent_id)
|
||||
thread_id = await self.store.relations.get_thread_id(relation.parent_id)
|
||||
|
||||
related_events = await self._related_events(event)
|
||||
|
||||
|
||||
@@ -25,6 +25,7 @@ from twisted.internet.error import ConnectError, DNSLookupError
|
||||
from twisted.web.server import Request
|
||||
|
||||
from synapse.api.errors import HttpResponseException, SynapseError
|
||||
from synapse.config.workers import MAIN_PROCESS_INSTANCE_NAME
|
||||
from synapse.http import RequestTimedOutError
|
||||
from synapse.http.server import HttpServer
|
||||
from synapse.http.servlet import parse_json_object_from_request
|
||||
@@ -197,11 +198,6 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
|
||||
client = hs.get_replication_client()
|
||||
local_instance_name = hs.get_instance_name()
|
||||
|
||||
# The value of these option should match the replication listener settings
|
||||
master_host = hs.config.worker.worker_replication_host
|
||||
master_port = hs.config.worker.worker_replication_http_port
|
||||
master_tls = hs.config.worker.worker_replication_http_tls
|
||||
|
||||
instance_map = hs.config.worker.instance_map
|
||||
|
||||
outgoing_gauge = _pending_outgoing_requests.labels(cls.NAME)
|
||||
@@ -213,7 +209,9 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
|
||||
)
|
||||
|
||||
@trace_with_opname("outgoing_replication_request")
|
||||
async def send_request(*, instance_name: str = "master", **kwargs: Any) -> Any:
|
||||
async def send_request(
|
||||
*, instance_name: str = MAIN_PROCESS_INSTANCE_NAME, **kwargs: Any
|
||||
) -> Any:
|
||||
# We have to pull these out here to avoid circular dependencies...
|
||||
streams = hs.get_replication_command_handler().get_streams_to_replicate()
|
||||
replication = hs.get_replication_data_handler()
|
||||
@@ -221,11 +219,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
|
||||
with outgoing_gauge.track_inprogress():
|
||||
if instance_name == local_instance_name:
|
||||
raise Exception("Trying to send HTTP request to self")
|
||||
if instance_name == "master":
|
||||
host = master_host
|
||||
port = master_port
|
||||
tls = master_tls
|
||||
elif instance_name in instance_map:
|
||||
if instance_name in instance_map:
|
||||
host = instance_map[instance_name].host
|
||||
port = instance_map[instance_name].port
|
||||
tls = instance_map[instance_name].tls
|
||||
|
||||
@@ -60,7 +60,7 @@ _WAIT_FOR_REPLICATION_TIMEOUT_SECONDS = 5
|
||||
class ReplicationDataHandler:
|
||||
"""Handles incoming stream updates from replication.
|
||||
|
||||
This instance notifies the slave data store about updates. Can be subclassed
|
||||
This instance notifies the data store about updates. Can be subclassed
|
||||
to handle updates in additional ways.
|
||||
"""
|
||||
|
||||
@@ -91,7 +91,7 @@ class ReplicationDataHandler:
|
||||
) -> None:
|
||||
"""Called to handle a batch of replication data with a given stream token.
|
||||
|
||||
By default this just pokes the slave store. Can be overridden in subclasses to
|
||||
By default, this just pokes the data store. Can be overridden in subclasses to
|
||||
handle more.
|
||||
|
||||
Args:
|
||||
|
||||
@@ -0,0 +1,34 @@
# Copyright 2023 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from OpenSSL.SSL import Context
from twisted.internet import ssl

from synapse.config.redis import RedisConfig


class ClientContextFactory(ssl.ClientContextFactory):
    def __init__(self, redis_config: RedisConfig):
        self.redis_config = redis_config

    def getContext(self) -> Context:
        ctx = super().getContext()
        if self.redis_config.redis_certificate:
            ctx.use_certificate_file(self.redis_config.redis_certificate)
        if self.redis_config.redis_private_key:
            ctx.use_privatekey_file(self.redis_config.redis_private_key)
        if self.redis_config.redis_ca_file:
            ctx.load_verify_locations(cafile=self.redis_config.redis_ca_file)
        elif self.redis_config.redis_ca_path:
            ctx.load_verify_locations(capath=self.redis_config.redis_ca_path)
        return ctx
@@ -46,6 +46,7 @@ from synapse.replication.tcp.commands import (
|
||||
UserIpCommand,
|
||||
UserSyncCommand,
|
||||
)
|
||||
from synapse.replication.tcp.context import ClientContextFactory
|
||||
from synapse.replication.tcp.protocol import IReplicationConnection
|
||||
from synapse.replication.tcp.streams import (
|
||||
STREAMS_MAP,
|
||||
@@ -348,13 +349,27 @@ class ReplicationCommandHandler:
|
||||
outbound_redis_connection,
|
||||
channel_names=self._channels_to_subscribe_to,
|
||||
)
|
||||
hs.get_reactor().connectTCP(
|
||||
hs.config.redis.redis_host,
|
||||
hs.config.redis.redis_port,
|
||||
self._factory,
|
||||
timeout=30,
|
||||
bindAddress=None,
|
||||
)
|
||||
|
||||
reactor = hs.get_reactor()
|
||||
redis_config = hs.config.redis
|
||||
if hs.config.redis.redis_use_tls:
|
||||
ssl_context_factory = ClientContextFactory(hs.config.redis)
|
||||
reactor.connectSSL(
|
||||
redis_config.redis_host,
|
||||
redis_config.redis_port,
|
||||
self._factory,
|
||||
ssl_context_factory,
|
||||
timeout=30,
|
||||
bindAddress=None,
|
||||
)
|
||||
else:
|
||||
reactor.connectTCP(
|
||||
redis_config.redis_host,
|
||||
redis_config.redis_port,
|
||||
self._factory,
|
||||
timeout=30,
|
||||
bindAddress=None,
|
||||
)
|
||||
|
||||
def get_streams(self) -> Dict[str, Stream]:
|
||||
"""Get a map from stream name to all streams."""
|
||||
|
||||
@@ -35,6 +35,7 @@ from synapse.replication.tcp.commands import (
|
||||
ReplicateCommand,
|
||||
parse_command_from_line,
|
||||
)
|
||||
from synapse.replication.tcp.context import ClientContextFactory
|
||||
from synapse.replication.tcp.protocol import (
|
||||
IReplicationConnection,
|
||||
tcp_inbound_commands_counter,
|
||||
@@ -386,12 +387,24 @@ def lazyConnection(
|
||||
factory.continueTrying = reconnect
|
||||
|
||||
reactor = hs.get_reactor()
|
||||
reactor.connectTCP(
|
||||
host,
|
||||
port,
|
||||
factory,
|
||||
timeout=30,
|
||||
bindAddress=None,
|
||||
)
|
||||
|
||||
if hs.config.redis.redis_use_tls:
|
||||
ssl_context_factory = ClientContextFactory(hs.config.redis)
|
||||
reactor.connectSSL(
|
||||
host,
|
||||
port,
|
||||
factory,
|
||||
ssl_context_factory,
|
||||
timeout=30,
|
||||
bindAddress=None,
|
||||
)
|
||||
else:
|
||||
reactor.connectTCP(
|
||||
host,
|
||||
port,
|
||||
factory,
|
||||
timeout=30,
|
||||
bindAddress=None,
|
||||
)
|
||||
|
||||
return factory.handler
|
||||
|
||||
@@ -137,6 +137,35 @@ class DevicesRestServlet(RestServlet):
|
||||
devices = await self.device_handler.get_devices_by_user(target_user.to_string())
|
||||
return HTTPStatus.OK, {"devices": devices, "total": len(devices)}
|
||||
|
||||
async def on_POST(
|
||||
self, request: SynapseRequest, user_id: str
|
||||
) -> Tuple[int, JsonDict]:
|
||||
"""Creates a new device for the user."""
|
||||
await assert_requester_is_admin(self.auth, request)
|
||||
|
||||
target_user = UserID.from_string(user_id)
|
||||
if not self.is_mine(target_user):
|
||||
raise SynapseError(
|
||||
HTTPStatus.BAD_REQUEST, "Can only create devices for local users"
|
||||
)
|
||||
|
||||
u = await self.store.get_user_by_id(target_user.to_string())
|
||||
if u is None:
|
||||
raise NotFoundError("Unknown user")
|
||||
|
||||
body = parse_json_object_from_request(request)
|
||||
device_id = body.get("device_id")
|
||||
if not device_id:
|
||||
raise SynapseError(HTTPStatus.BAD_REQUEST, "Missing device_id")
|
||||
if not isinstance(device_id, str):
|
||||
raise SynapseError(HTTPStatus.BAD_REQUEST, "device_id must be a string")
|
||||
|
||||
await self.device_handler.check_device_registered(
|
||||
user_id=user_id, device_id=device_id
|
||||
)
|
||||
|
||||
return HTTPStatus.CREATED, {}
|
||||
|
||||
|
||||
class DeleteDevicesRestServlet(RestServlet):
|
||||
"""
|
||||
|
||||
@@ -147,11 +147,15 @@ class ReceiptRestServlet(RestServlet):
|
||||
# If the receipt is on the main timeline, it is enough to check whether
|
||||
# the event is directly related to a thread.
|
||||
if thread_id == MAIN_TIMELINE:
|
||||
return MAIN_TIMELINE == await self._main_store.get_thread_id(event_id)
|
||||
return MAIN_TIMELINE == await self._main_store.relations.get_thread_id(
|
||||
event_id
|
||||
)
|
||||
|
||||
# Otherwise, check if the event is directly part of a thread, or is the
|
||||
# root message (or related to the root message) of a thread.
|
||||
return thread_id == await self._main_store.get_thread_id_for_receipts(event_id)
|
||||
return thread_id == await self._main_store.relations.get_thread_id_for_receipts(
|
||||
event_id
|
||||
)
|
||||
|
||||
|
||||
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
|
||||
|
||||
@@ -79,6 +79,7 @@ class VersionsRestServlet(RestServlet):
|
||||
"v1.3",
|
||||
"v1.4",
|
||||
"v1.5",
|
||||
"v1.6",
|
||||
],
|
||||
# as per MSC1497:
|
||||
"unstable_features": {
|
||||
@@ -125,6 +126,10 @@ class VersionsRestServlet(RestServlet):
|
||||
"org.matrix.msc3912": self.config.experimental.msc3912_enabled,
|
||||
# Adds support for unstable "intentional mentions" behaviour.
|
||||
"org.matrix.msc3952_intentional_mentions": self.config.experimental.msc3952_intentional_mentions,
|
||||
# Whether recursively provide relations is supported.
|
||||
"org.matrix.msc3981": self.config.experimental.msc3981_recurse_relations,
|
||||
# Adds support for deleting account data.
|
||||
"org.matrix.msc3391": self.config.experimental.msc3391_enabled,
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
@@ -34,6 +34,8 @@ class LocalKey(RestServlet):
|
||||
"""HTTP resource containing encoding the TLS X.509 certificate and NACL
|
||||
signature verification keys for this server::
|
||||
|
||||
GET /_matrix/key/v2/server HTTP/1.1
|
||||
|
||||
GET /_matrix/key/v2/server/a.key.id HTTP/1.1
|
||||
|
||||
HTTP/1.1 200 OK
|
||||
@@ -100,6 +102,15 @@ class LocalKey(RestServlet):
|
||||
def on_GET(
|
||||
self, request: Request, key_id: Optional[str] = None
|
||||
) -> Tuple[int, JsonDict]:
|
||||
# Matrix 1.6 drops support for passing the key_id, this is incompatible
|
||||
# with earlier versions and is allowed in order to support both.
|
||||
# A warning is issued to help determine when it is safe to drop this.
|
||||
if key_id:
|
||||
logger.warning(
|
||||
"Request for local server key with deprecated key ID (logging to determine usage level for future removal): %s",
|
||||
key_id,
|
||||
)
|
||||
|
||||
time_now = self.clock.time_msec()
|
||||
# Update the expiry time if less than half the interval remains.
|
||||
if time_now + self.config.key.key_refresh_interval / 2 > self.valid_until_ts:
|
||||
|
||||
@@ -126,6 +126,15 @@ class RemoteKey(RestServlet):
|
||||
self, request: Request, server: str, key_id: Optional[str] = None
|
||||
) -> Tuple[int, JsonDict]:
|
||||
if server and key_id:
|
||||
# Matrix 1.6 drops support for passing the key_id, this is incompatible
|
||||
# with earlier versions and is allowed in order to support both.
|
||||
# A warning is issued to help determine when it is safe to drop this.
|
||||
logger.warning(
|
||||
"Request for remote server key with deprecated key ID (logging to determine usage level for future removal): %s / %s",
|
||||
server,
|
||||
key_id,
|
||||
)
|
||||
|
||||
minimum_valid_until_ts = parse_integer(request, "minimum_valid_until_ts")
|
||||
arguments = {}
|
||||
if minimum_valid_until_ts is not None:
|
||||
@@ -161,7 +170,7 @@ class RemoteKey(RestServlet):
|
||||
|
||||
time_now_ms = self.clock.time_msec()
|
||||
|
||||
# Map server_name->key_id->int. Note that the value of the init is unused.
|
||||
# Map server_name->key_id->int. Note that the value of the int is unused.
|
||||
# XXX: why don't we just use a set?
|
||||
cache_misses: Dict[str, Dict[str, int]] = {}
|
||||
for (server_name, key_id, _), key_results in cached.items():
|
||||
|
||||
@@ -118,7 +118,10 @@ class SQLBaseStore(metaclass=ABCMeta):
|
||||
self._attempt_to_invalidate_cache("get_partial_current_state_ids", (room_id,))
|
||||
|
||||
def _attempt_to_invalidate_cache(
|
||||
self, cache_name: str, key: Optional[Collection[Any]]
|
||||
self,
|
||||
cache_name: str,
|
||||
key: Optional[Collection[Any]],
|
||||
store_name: Optional[str] = None,
|
||||
) -> bool:
|
||||
"""Attempts to invalidate the cache of the given name, ignoring if the
|
||||
cache doesn't exist. Mainly used for invalidating caches on workers,
|
||||
@@ -132,10 +135,21 @@ class SQLBaseStore(metaclass=ABCMeta):
|
||||
cache_name
|
||||
key: Entry to invalidate. If None then invalidates the entire
|
||||
cache.
|
||||
store_name: The name of the store, leave as None for stores which
|
||||
have not yet been split out.
|
||||
"""
|
||||
|
||||
# First get the store.
|
||||
store = self
|
||||
if store_name is not None:
|
||||
try:
|
||||
store = getattr(self, store_name)
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
# Then attempt to find the cache on that store.
|
||||
try:
|
||||
cache = getattr(self, cache_name)
|
||||
cache = getattr(store, cache_name)
|
||||
except AttributeError:
|
||||
# Check if an externally defined module cache has been registered
|
||||
cache = self.external_cached_functions.get(cache_name)
|
||||
|
||||
@@ -561,50 +561,6 @@ class BackgroundUpdater:
|
||||
updater, oneshot=True
|
||||
)
|
||||
|
||||
def register_background_validate_constraint(
|
||||
self, update_name: str, constraint_name: str, table: str
|
||||
) -> None:
|
||||
"""Helper for store classes to do a background validate constraint.
|
||||
|
||||
This only applies on PostgreSQL.
|
||||
|
||||
To use:
|
||||
|
||||
1. use a schema delta file to add a background update. Example:
|
||||
INSERT INTO background_updates (update_name, progress_json) VALUES
|
||||
('validate_my_constraint', '{}');
|
||||
|
||||
2. In the Store constructor, call this method
|
||||
|
||||
Args:
|
||||
update_name: update_name to register for
|
||||
constraint_name: name of constraint to validate
|
||||
table: table the constraint is applied to
|
||||
"""
|
||||
|
||||
def runner(conn: Connection) -> None:
|
||||
c = conn.cursor()
|
||||
|
||||
sql = f"""
|
||||
ALTER TABLE {table} VALIDATE CONSTRAINT {constraint_name};
|
||||
"""
|
||||
logger.debug("[SQL] %s", sql)
|
||||
c.execute(sql)
|
||||
|
||||
async def updater(progress: JsonDict, batch_size: int) -> int:
|
||||
assert isinstance(
|
||||
self.db_pool.engine, engines.PostgresEngine
|
||||
), "validate constraint background update registered for non-Postres database"
|
||||
|
||||
logger.info("Validating constraint %s to %s", constraint_name, table)
|
||||
await self.db_pool.runWithConnection(runner)
|
||||
await self._end_background_update(update_name)
|
||||
return 1
|
||||
|
||||
self._background_update_handlers[update_name] = _BackgroundUpdateHandler(
|
||||
updater, oneshot=True
|
||||
)
|
||||
|
||||
async def create_index_in_background(
|
||||
self,
|
||||
index_name: str,
|
||||
|
||||
@@ -13,8 +13,7 @@
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
from collections import Counter
|
||||
from typing import TYPE_CHECKING, Collection, List, Tuple
|
||||
from typing import TYPE_CHECKING, Collection, Counter, List, Tuple
|
||||
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.storage.database import LoggingTransaction
|
||||
|
||||
@@ -121,7 +121,6 @@ class DataStore(
|
||||
UserErasureStore,
|
||||
MonthlyActiveUsersWorkerStore,
|
||||
StatsStore,
|
||||
RelationsStore,
|
||||
CensorEventsStore,
|
||||
UIAuthStore,
|
||||
EventForwardExtremitiesStore,
|
||||
@@ -141,6 +140,9 @@ class DataStore(
|
||||
|
||||
super().__init__(database, db_conn, hs)
|
||||
|
||||
# This is a bit repetitive, but avoids dynamically setting attributes.
|
||||
self.relations = RelationsStore(database, db_conn, hs)
|
||||
|
||||
async def get_users(self) -> List[JsonDict]:
|
||||
"""Function to retrieve a list of users in users table.
|
||||
|
||||
|
||||
@@ -85,13 +85,10 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
|
||||
writers=hs.config.worker.writers.account_data,
|
||||
)
|
||||
else:
|
||||
# Multiple writers are not supported for SQLite.
|
||||
#
|
||||
# We shouldn't be running in worker mode with SQLite, but its useful
|
||||
# to support it for unit tests.
|
||||
#
|
||||
# If this process is the writer than we need to use
|
||||
# `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
|
||||
# updated over replication. (Multiple writers are not supported for
|
||||
# SQLite).
|
||||
self._account_data_id_gen = StreamIdGenerator(
|
||||
db_conn,
|
||||
hs.get_replication_notifier(),
|
||||
|
||||
@@ -248,10 +248,16 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||
self._invalidate_local_get_event_cache(redacts) # type: ignore[attr-defined]
|
||||
# Caches which might leak edits must be invalidated for the event being
|
||||
# redacted.
|
||||
self._attempt_to_invalidate_cache("get_relations_for_event", (redacts,))
|
||||
self._attempt_to_invalidate_cache("get_applicable_edit", (redacts,))
|
||||
self._attempt_to_invalidate_cache("get_thread_id", (redacts,))
|
||||
self._attempt_to_invalidate_cache("get_thread_id_for_receipts", (redacts,))
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_relations_for_event", (redacts,), "relations"
|
||||
)
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_applicable_edit", (redacts,), "relations"
|
||||
)
|
||||
self._attempt_to_invalidate_cache("get_thread_id", (redacts,), "relations")
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_thread_id_for_receipts", (redacts,), "relations"
|
||||
)
|
||||
|
||||
if etype == EventTypes.Member:
|
||||
self._membership_stream_cache.entity_has_changed(state_key, stream_ordering) # type: ignore[attr-defined]
|
||||
@@ -264,21 +270,31 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||
self._attempt_to_invalidate_cache("get_rooms_for_user", (state_key,))
|
||||
|
||||
if relates_to:
|
||||
self._attempt_to_invalidate_cache("get_relations_for_event", (relates_to,))
|
||||
self._attempt_to_invalidate_cache("get_references_for_event", (relates_to,))
|
||||
self._attempt_to_invalidate_cache("get_applicable_edit", (relates_to,))
|
||||
self._attempt_to_invalidate_cache("get_thread_summary", (relates_to,))
|
||||
self._attempt_to_invalidate_cache("get_thread_participated", (relates_to,))
|
||||
self._attempt_to_invalidate_cache("get_threads", (room_id,))
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_relations_for_event", (relates_to,), "relations"
|
||||
)
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_references_for_event", (relates_to,), "relations"
|
||||
)
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_applicable_edit", (relates_to,), "relations"
|
||||
)
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_thread_summary", (relates_to,), "relations"
|
||||
)
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_thread_participated", (relates_to,), "relations"
|
||||
)
|
||||
self._attempt_to_invalidate_cache("get_threads", (room_id,), "relations")
|
||||
|
||||
async def invalidate_cache_and_stream(
|
||||
self, cache_name: str, keys: Tuple[Any, ...]
|
||||
) -> None:
|
||||
"""Invalidates the cache and adds it to the cache stream so slaves
|
||||
"""Invalidates the cache and adds it to the cache stream so other workers
|
||||
will know to invalidate their caches.
|
||||
|
||||
This should only be used to invalidate caches where slaves won't
|
||||
otherwise know from other replication streams that the cache should
|
||||
This should only be used to invalidate caches where other workers won't
|
||||
otherwise have known from other replication streams that the cache should
|
||||
be invalidated.
|
||||
"""
|
||||
cache_func = getattr(self, cache_name, None)
|
||||
@@ -297,11 +313,11 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||
cache_func: CachedFunction,
|
||||
keys: Tuple[Any, ...],
|
||||
) -> None:
|
||||
"""Invalidates the cache and adds it to the cache stream so slaves
|
||||
"""Invalidates the cache and adds it to the cache stream so other workers
|
||||
will know to invalidate their caches.
|
||||
|
||||
This should only be used to invalidate caches where slaves won't
|
||||
otherwise know from other replication streams that the cache should
|
||||
This should only be used to invalidate caches where other workers won't
|
||||
otherwise have known from other replication streams that the cache should
|
||||
be invalidated.
|
||||
"""
|
||||
txn.call_after(cache_func.invalidate, keys)
|
||||
@@ -310,7 +326,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||
def _invalidate_all_cache_and_stream(
|
||||
self, txn: LoggingTransaction, cache_func: CachedFunction
|
||||
) -> None:
|
||||
"""Invalidates the entire cache and adds it to the cache stream so slaves
|
||||
"""Invalidates the entire cache and adds it to the cache stream so other workers
|
||||
will know to invalidate their caches.
|
||||
"""
|
||||
|
||||
|
||||
@@ -105,8 +105,6 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
|
||||
is_writer=hs.config.worker.worker_app is None,
|
||||
)
|
||||
|
||||
# Type-ignore: _device_list_id_gen is mixed in from either DataStore (as a
|
||||
# StreamIdGenerator) or SlavedDataStore (as a SlavedIdTracker).
|
||||
device_list_max = self._device_list_id_gen.get_current_token()
|
||||
device_list_prefill, min_device_list_id = self.db_pool.get_cache_dict(
|
||||
db_conn,
|
||||
|
||||
@@ -100,6 +100,7 @@ from synapse.storage.database import (
|
||||
)
|
||||
from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
|
||||
from synapse.storage.databases.main.stream import StreamWorkerStore
|
||||
from synapse.types import JsonDict
|
||||
from synapse.util import json_encoder
|
||||
from synapse.util.caches.descriptors import cached
|
||||
|
||||
@@ -288,22 +289,180 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
|
||||
unique=True,
|
||||
)
|
||||
|
||||
self.db_pool.updates.register_background_validate_constraint(
|
||||
"event_push_actions_staging_thread_id",
|
||||
constraint_name="event_push_actions_staging_thread_id",
|
||||
table="event_push_actions_staging",
|
||||
self.db_pool.updates.register_background_update_handler(
|
||||
"event_push_backfill_thread_id",
|
||||
self._background_backfill_thread_id,
|
||||
)
|
||||
self.db_pool.updates.register_background_validate_constraint(
|
||||
"event_push_actions_thread_id",
|
||||
constraint_name="event_push_actions_thread_id",
|
||||
|
||||
# Indexes which will be used to quickly make the thread_id column non-null.
|
||||
self.db_pool.updates.register_background_index_update(
|
||||
"event_push_actions_thread_id_null",
|
||||
index_name="event_push_actions_thread_id_null",
|
||||
table="event_push_actions",
|
||||
columns=["thread_id"],
|
||||
where_clause="thread_id IS NULL",
|
||||
)
|
||||
self.db_pool.updates.register_background_validate_constraint(
|
||||
"event_push_summary_thread_id",
|
||||
constraint_name="event_push_summary_thread_id",
|
||||
self.db_pool.updates.register_background_index_update(
|
||||
"event_push_summary_thread_id_null",
|
||||
index_name="event_push_summary_thread_id_null",
|
||||
table="event_push_summary",
|
||||
columns=["thread_id"],
|
||||
where_clause="thread_id IS NULL",
|
||||
)
|
||||
|
||||
# Check ASAP (and then later, every 1s) to see if we have finished
|
||||
# background updates the event_push_actions and event_push_summary tables.
|
||||
self._clock.call_later(0.0, self._check_event_push_backfill_thread_id)
|
||||
self._event_push_backfill_thread_id_done = False
|
||||
|
||||
@wrap_as_background_process("check_event_push_backfill_thread_id")
|
||||
async def _check_event_push_backfill_thread_id(self) -> None:
|
||||
"""
|
||||
Has thread_id finished backfilling?
|
||||
|
||||
If not, we need to just-in-time update it so the queries work.
|
||||
"""
|
||||
done = await self.db_pool.updates.has_completed_background_update(
|
||||
"event_push_backfill_thread_id"
|
||||
)
|
||||
|
||||
if done:
|
||||
self._event_push_backfill_thread_id_done = True
|
||||
else:
|
||||
# Reschedule to run.
|
||||
self._clock.call_later(15.0, self._check_event_push_backfill_thread_id)
|
||||
|
||||
async def _background_backfill_thread_id(
|
||||
self, progress: JsonDict, batch_size: int
|
||||
) -> int:
|
||||
"""
|
||||
Fill in the thread_id field for event_push_actions and event_push_summary.
|
||||
|
||||
This is preparatory so that it can be made non-nullable in the future.
|
||||
|
||||
Because all current (null) data is done in an unthreaded manner this
|
||||
simply assumes it is on the "main" timeline. Since event_push_actions
|
||||
are periodically cleared it is not possible to correctly re-calculate
|
||||
the thread_id.
|
||||
"""
|
||||
event_push_actions_done = progress.get("event_push_actions_done", False)
|
||||
|
||||
def add_thread_id_txn(
|
||||
txn: LoggingTransaction, start_stream_ordering: int
|
||||
) -> int:
|
||||
sql = """
|
||||
SELECT stream_ordering
|
||||
FROM event_push_actions
|
||||
WHERE
|
||||
thread_id IS NULL
|
||||
AND stream_ordering > ?
|
||||
ORDER BY stream_ordering
|
||||
LIMIT ?
|
||||
"""
|
||||
txn.execute(sql, (start_stream_ordering, batch_size))
|
||||
|
||||
# No more rows to process.
|
||||
rows = txn.fetchall()
|
||||
if not rows:
|
||||
progress["event_push_actions_done"] = True
|
||||
self.db_pool.updates._background_update_progress_txn(
|
||||
txn, "event_push_backfill_thread_id", progress
|
||||
)
|
||||
return 0
|
||||
|
||||
# Update the thread ID for any of those rows.
|
||||
max_stream_ordering = rows[-1][0]
|
||||
|
||||
sql = """
|
||||
UPDATE event_push_actions
|
||||
SET thread_id = 'main'
|
||||
WHERE ? < stream_ordering AND stream_ordering <= ? AND thread_id IS NULL
|
||||
"""
|
||||
txn.execute(
|
||||
sql,
|
||||
(
|
||||
start_stream_ordering,
|
||||
max_stream_ordering,
|
||||
),
|
||||
)
|
||||
|
||||
# Update progress.
|
||||
processed_rows = txn.rowcount
|
||||
progress["max_event_push_actions_stream_ordering"] = max_stream_ordering
|
||||
self.db_pool.updates._background_update_progress_txn(
|
||||
txn, "event_push_backfill_thread_id", progress
|
||||
)
|
||||
|
||||
return processed_rows
|
||||
|
||||
def add_thread_id_summary_txn(txn: LoggingTransaction) -> int:
|
||||
min_user_id = progress.get("max_summary_user_id", "")
|
||||
min_room_id = progress.get("max_summary_room_id", "")
|
||||
|
||||
# Slightly overcomplicated query for getting the Nth user ID / room
|
||||
# ID tuple, or the last if there are less than N remaining.
|
||||
sql = """
|
||||
SELECT user_id, room_id FROM (
|
||||
SELECT user_id, room_id FROM event_push_summary
|
||||
WHERE (user_id, room_id) > (?, ?)
|
||||
AND thread_id IS NULL
|
||||
ORDER BY user_id, room_id
|
||||
LIMIT ?
|
||||
) AS e
|
||||
ORDER BY user_id DESC, room_id DESC
|
||||
LIMIT 1
|
||||
"""
|
||||
|
||||
txn.execute(sql, (min_user_id, min_room_id, batch_size))
|
||||
row = txn.fetchone()
|
||||
if not row:
|
||||
return 0
|
||||
|
||||
max_user_id, max_room_id = row
|
||||
|
||||
sql = """
|
||||
UPDATE event_push_summary
|
||||
SET thread_id = 'main'
|
||||
WHERE
|
||||
(?, ?) < (user_id, room_id) AND (user_id, room_id) <= (?, ?)
|
||||
AND thread_id IS NULL
|
||||
"""
|
||||
txn.execute(sql, (min_user_id, min_room_id, max_user_id, max_room_id))
|
||||
processed_rows = txn.rowcount
|
||||
|
||||
progress["max_summary_user_id"] = max_user_id
|
||||
progress["max_summary_room_id"] = max_room_id
|
||||
self.db_pool.updates._background_update_progress_txn(
|
||||
txn, "event_push_backfill_thread_id", progress
|
||||
)
|
||||
|
||||
return processed_rows
|
||||
|
||||
# First update the event_push_actions table, then the event_push_summary table.
|
||||
#
|
||||
# Note that the event_push_actions_staging table is ignored since it is
|
||||
# assumed that items in that table will only exist for a short period of
|
||||
# time.
|
||||
if not event_push_actions_done:
|
||||
result = await self.db_pool.runInteraction(
|
||||
"event_push_backfill_thread_id",
|
||||
add_thread_id_txn,
|
||||
progress.get("max_event_push_actions_stream_ordering", 0),
|
||||
)
|
||||
else:
|
||||
result = await self.db_pool.runInteraction(
|
||||
"event_push_backfill_thread_id",
|
||||
add_thread_id_summary_txn,
|
||||
)
|
||||
|
||||
# Only done after the event_push_summary table is done.
|
||||
if not result:
|
||||
await self.db_pool.updates._end_background_update(
|
||||
"event_push_backfill_thread_id"
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
async def get_unread_counts_by_room_for_user(self, user_id: str) -> Dict[str, int]:
|
||||
"""Get the notification count by room for a user. Only considers notifications,
|
||||
not highlight or unread counts, and threads are currently aggregated under their room.
|
||||
@@ -552,6 +711,25 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
|
||||
(ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE),
|
||||
)
|
||||
|
||||
# First ensure that the existing rows have an updated thread_id field.
|
||||
if not self._event_push_backfill_thread_id_done:
|
||||
txn.execute(
|
||||
"""
|
||||
UPDATE event_push_summary
|
||||
SET thread_id = ?
|
||||
WHERE room_id = ? AND user_id = ? AND thread_id is NULL
|
||||
""",
|
||||
(MAIN_TIMELINE, room_id, user_id),
|
||||
)
|
||||
txn.execute(
|
||||
"""
|
||||
UPDATE event_push_actions
|
||||
SET thread_id = ?
|
||||
WHERE room_id = ? AND user_id = ? AND thread_id is NULL
|
||||
""",
|
||||
(MAIN_TIMELINE, room_id, user_id),
|
||||
)
|
||||
|
||||
# First we pull the counts from the summary table.
|
||||
#
|
||||
# We check that `last_receipt_stream_ordering` matches the stream ordering of the
|
||||
@@ -1367,6 +1545,25 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
|
||||
(room_id, user_id, stream_ordering, *thread_args),
|
||||
)
|
||||
|
||||
# First ensure that the existing rows have an updated thread_id field.
|
||||
if not self._event_push_backfill_thread_id_done:
|
||||
txn.execute(
|
||||
"""
|
||||
UPDATE event_push_summary
|
||||
SET thread_id = ?
|
||||
WHERE room_id = ? AND user_id = ? AND thread_id is NULL
|
||||
""",
|
||||
(MAIN_TIMELINE, room_id, user_id),
|
||||
)
|
||||
txn.execute(
|
||||
"""
|
||||
UPDATE event_push_actions
|
||||
SET thread_id = ?
|
||||
WHERE room_id = ? AND user_id = ? AND thread_id is NULL
|
||||
""",
|
||||
(MAIN_TIMELINE, room_id, user_id),
|
||||
)
|
||||
|
||||
# Fetch the notification counts between the stream ordering of the
|
||||
# latest receipt and what was previously summarised.
|
||||
unread_counts = self._get_notif_unread_count_for_user_room(
|
||||
@@ -1501,6 +1698,19 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
|
||||
rotate_to_stream_ordering: The new maximum event stream ordering to summarise.
|
||||
"""
|
||||
|
||||
# Ensure that any new actions have an updated thread_id.
|
||||
if not self._event_push_backfill_thread_id_done:
|
||||
txn.execute(
|
||||
"""
|
||||
UPDATE event_push_actions
|
||||
SET thread_id = ?
|
||||
WHERE ? < stream_ordering AND stream_ordering <= ? AND thread_id IS NULL
|
||||
""",
|
||||
(MAIN_TIMELINE, old_rotate_stream_ordering, rotate_to_stream_ordering),
|
||||
)
|
||||
|
||||
# XXX Do we need to update summaries here too?
|
||||
|
||||
# Calculate the new counts that should be upserted into event_push_summary
|
||||
sql = """
|
||||
SELECT user_id, room_id, thread_id,
|
||||
@@ -1563,6 +1773,20 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
|
||||
|
||||
logger.info("Rotating notifications, handling %d rows", len(summaries))
|
||||
|
||||
# Ensure that any updated threads have the proper thread_id.
|
||||
if not self._event_push_backfill_thread_id_done:
|
||||
txn.execute_batch(
|
||||
"""
|
||||
UPDATE event_push_summary
|
||||
SET thread_id = ?
|
||||
WHERE room_id = ? AND user_id = ? AND thread_id is NULL
|
||||
""",
|
||||
[
|
||||
(MAIN_TIMELINE, room_id, user_id)
|
||||
for user_id, room_id, _ in summaries
|
||||
],
|
||||
)
|
||||
|
||||
self.db_pool.simple_upsert_many_txn(
|
||||
txn,
|
||||
table="event_push_summary",
|
||||
|
||||
@@ -2072,25 +2072,29 @@ class PersistEventsStore:

# Any relation information for the related event must be cleared.
self.store._invalidate_cache_and_stream(
txn, self.store.get_relations_for_event, (redacted_relates_to,)
txn, self.store.relations.get_relations_for_event, (redacted_relates_to,)
)
if rel_type == RelationTypes.REFERENCE:
self.store._invalidate_cache_and_stream(
txn, self.store.get_references_for_event, (redacted_relates_to,)
txn,
self.store.relations.get_references_for_event,
(redacted_relates_to,),
)
if rel_type == RelationTypes.REPLACE:
self.store._invalidate_cache_and_stream(
txn, self.store.get_applicable_edit, (redacted_relates_to,)
txn, self.store.relations.get_applicable_edit, (redacted_relates_to,)
)
if rel_type == RelationTypes.THREAD:
self.store._invalidate_cache_and_stream(
txn, self.store.get_thread_summary, (redacted_relates_to,)
txn, self.store.relations.get_thread_summary, (redacted_relates_to,)
)
self.store._invalidate_cache_and_stream(
txn, self.store.get_thread_participated, (redacted_relates_to,)
txn,
self.store.relations.get_thread_participated,
(redacted_relates_to,),
)
self.store._invalidate_cache_and_stream(
txn, self.store.get_threads, (room_id,)
txn, self.store.relations.get_threads, (room_id,)
)

# Find the new latest event in the thread.

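This hunk only re-points the invalidations at the relations store's methods; the underlying pattern is "drop every relation-derived cache entry keyed by the redacted event's parent". A toy illustration of that keyed invalidation is below, using a hypothetical `KeyedCache` class rather than Synapse's `_invalidate_cache_and_stream`.

```python
from typing import Any, Dict, Tuple


class KeyedCache:
    """Minimal stand-in for a per-function result cache keyed by argument tuples."""

    def __init__(self) -> None:
        self._entries: Dict[str, Dict[Tuple[Any, ...], Any]] = {}

    def set(self, func: str, key: Tuple[Any, ...], value: Any) -> None:
        self._entries.setdefault(func, {})[key] = value

    def invalidate(self, func: str, key: Tuple[Any, ...]) -> None:
        # Missing entries are ignored, mirroring an invalidate-if-present cache.
        self._entries.get(func, {}).pop(key, None)


cache = KeyedCache()
parent = "$parent_event"
cache.set("get_relations_for_event", (parent,), ["$child1", "$child2"])
cache.set("get_thread_summary", (parent,), (2, "$child2"))

# On redaction, every relation cache for that parent must be cleared.
for func in (
    "get_relations_for_event",
    "get_references_for_event",
    "get_applicable_edit",
    "get_thread_summary",
    "get_thread_participated",
):
    cache.invalidate(func, (parent,))
```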
@@ -1217,10 +1217,10 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
for parent_id in {r[1] for r in relations_to_insert}:
cache_tuple = (parent_id,)
self._invalidate_cache_and_stream( # type: ignore[attr-defined]
txn, self.get_relations_for_event, cache_tuple # type: ignore[attr-defined]
txn, self.relations.get_relations_for_event, cache_tuple # type: ignore[attr-defined]
)
self._invalidate_cache_and_stream( # type: ignore[attr-defined]
txn, self.get_thread_summary, cache_tuple # type: ignore[attr-defined]
txn, self.relations.get_thread_summary, cache_tuple # type: ignore[attr-defined]
)

if results:

@@ -213,13 +213,10 @@ class EventsWorkerStore(SQLBaseStore):
writers=hs.config.worker.writers.events,
)
else:
# Multiple writers are not supported for SQLite.
#
# We shouldn't be running in worker mode with SQLite, but its useful
# to support it for unit tests.
#
# If this process is the writer than we need to use
# `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
# updated over replication. (Multiple writers are not supported for
# SQLite).
self._stream_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),

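The replacement comment describes the split between the writer process, which allocates new stream IDs itself, and every other process, which only tracks the IDs it sees over replication. The sketch below illustrates that split with two toy classes; they are not Synapse's `StreamIdGenerator` or `SlavedIdTracker`, and the constructor arguments are assumptions for the example.

```python
import itertools


class LocalIdGenerator:
    """Writer process: hands out new, monotonically increasing stream IDs."""

    def __init__(self, current: int) -> None:
        self._counter = itertools.count(current + 1)

    def next_id(self) -> int:
        return next(self._counter)


class ReplicatedIdTracker:
    """Non-writer process: only advances as replication reports new IDs."""

    def __init__(self, current: int) -> None:
        self._current = current

    def advance(self, stream_id: int) -> None:
        self._current = max(self._current, stream_id)

    def get_current(self) -> int:
        return self._current


is_writer = True  # in Synapse this depends on the worker configuration
gen = LocalIdGenerator(100) if is_writer else ReplicatedIdTracker(100)
```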
@@ -25,6 +25,7 @@ from synapse.storage.database import (
LoggingDatabaseConnection,
LoggingTransaction,
)
from synapse.storage.engines import PostgresEngine
from synapse.types import JsonDict, UserID
from synapse.util.caches.descriptors import cached

@@ -40,6 +41,8 @@ class FilteringWorkerStore(SQLBaseStore):
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)
self.server_name: str = hs.hostname
self.database_engine = database.engine
self.db_pool.updates.register_background_index_update(
"full_users_filters_unique_idx",
index_name="full_users_unique_idx",
@@ -48,6 +51,98 @@ class FilteringWorkerStore(SQLBaseStore):
unique=True,
)

self.db_pool.updates.register_background_update_handler(
"populate_full_user_id_user_filters",
self.populate_full_user_id_user_filters,
)

async def populate_full_user_id_user_filters(
self, progress: JsonDict, batch_size: int
) -> int:
"""
Background update to populate the column `full_user_id` of the table
user_filters from entries in the column `user_local_part` of the same table
"""

lower_bound_id = progress.get("lower_bound_id", "")

def _get_last_id(txn: LoggingTransaction) -> Optional[str]:
sql = """
SELECT user_id FROM user_filters
WHERE user_id > ?
ORDER BY user_id
LIMIT 1 OFFSET 50
"""
txn.execute(sql, (lower_bound_id,))
res = txn.fetchone()
if res:
upper_bound_id = res[0]
return upper_bound_id
else:
return None

def _process_batch(
txn: LoggingTransaction, lower_bound_id: str, upper_bound_id: str
) -> None:
sql = """
UPDATE user_filters
SET full_user_id = '@' || user_id || ?
WHERE ? < user_id AND user_id <= ? AND full_user_id IS NULL
"""
txn.execute(sql, (f":{self.server_name}", lower_bound_id, upper_bound_id))

def _final_batch(txn: LoggingTransaction, lower_bound_id: str) -> None:
sql = """
UPDATE user_filters
SET full_user_id = '@' || user_id || ?
WHERE ? < user_id AND full_user_id IS NULL
"""
txn.execute(
sql,
(
f":{self.server_name}",
lower_bound_id,
),
)

if isinstance(self.database_engine, PostgresEngine):
sql = """
ALTER TABLE user_filters VALIDATE CONSTRAINT full_user_id_not_null
"""
txn.execute(sql)

upper_bound_id = await self.db_pool.runInteraction(
"populate_full_user_id_user_filters", _get_last_id
)

if upper_bound_id is None:
await self.db_pool.runInteraction(
"populate_full_user_id_user_filters", _final_batch, lower_bound_id
)

await self.db_pool.updates._end_background_update(
"populate_full_user_id_user_filters"
)
return 1

await self.db_pool.runInteraction(
"populate_full_user_id_user_filters",
_process_batch,
lower_bound_id,
upper_bound_id,
)

progress["lower_bound_id"] = upper_bound_id

await self.db_pool.runInteraction(
"populate_full_user_id_user_filters",
self.db_pool.updates._background_update_progress_txn,
"populate_full_user_id_user_filters",
progress,
)

return 50

@cached(num_args=2)
async def get_user_filter(
self, user_localpart: str, filter_id: Union[int, str]

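The new background update builds `full_user_id` in SQL by concatenating '@', the stored localpart and ':server_name', with the server suffix passed in as a bound parameter. A standalone sketch of that concatenation is below (SQLite here, server name and rows invented for the example; the real update additionally bounds the affected rows by user_id).

```python
import sqlite3

server_name = "example.com"  # stands in for hs.hostname

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE user_filters (user_id TEXT, full_user_id TEXT)")
conn.executemany(
    "INSERT INTO user_filters VALUES (?, ?)",
    [("alice", None), ("bob", None), ("carol", "@carol:example.com")],
)

# Same shape as the hunk above: '@' || localpart || ':server', NULL rows only.
conn.execute(
    "UPDATE user_filters SET full_user_id = '@' || user_id || ? "
    "WHERE full_user_id IS NULL",
    (f":{server_name}",),
)

print(conn.execute("SELECT full_user_id FROM user_filters").fetchall())
# [('@alice:example.com',), ('@bob:example.com',), ('@carol:example.com',)]
```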
@@ -15,9 +15,14 @@ from typing import TYPE_CHECKING, Optional

from synapse.api.errors import StoreError
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
LoggingTransaction,
)
from synapse.storage.databases.main.roommember import ProfileInfo
from synapse.types import UserID
from synapse.storage.engines import PostgresEngine
from synapse.types import JsonDict, UserID

if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -31,6 +36,8 @@ class ProfileWorkerStore(SQLBaseStore):
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)
self.server_name: str = hs.hostname
self.database_engine = database.engine
self.db_pool.updates.register_background_index_update(
"profiles_full_user_id_key_idx",
index_name="profiles_full_user_id_key",
@@ -39,6 +46,97 @@ class ProfileWorkerStore(SQLBaseStore):
unique=True,
)

self.db_pool.updates.register_background_update_handler(
"populate_full_user_id_profiles", self.populate_full_user_id_profiles
)

async def populate_full_user_id_profiles(
self, progress: JsonDict, batch_size: int
) -> int:
"""
Background update to populate the column `full_user_id` of the table
profiles from entries in the column `user_local_part` of the same table
"""

lower_bound_id = progress.get("lower_bound_id", "")

def _get_last_id(txn: LoggingTransaction) -> Optional[str]:
sql = """
SELECT user_id FROM profiles
WHERE user_id > ?
ORDER BY user_id
LIMIT 1 OFFSET 50
"""
txn.execute(sql, (lower_bound_id,))
res = txn.fetchone()
if res:
upper_bound_id = res[0]
return upper_bound_id
else:
return None

def _process_batch(
txn: LoggingTransaction, lower_bound_id: str, upper_bound_id: str
) -> None:
sql = """
UPDATE profiles
SET full_user_id = '@' || user_id || ?
WHERE ? < user_id AND user_id <= ? AND full_user_id IS NULL
"""
txn.execute(sql, (f":{self.server_name}", lower_bound_id, upper_bound_id))

def _final_batch(txn: LoggingTransaction, lower_bound_id: str) -> None:
sql = """
UPDATE profiles
SET full_user_id = '@' || user_id || ?
WHERE ? < user_id AND full_user_id IS NULL
"""
txn.execute(
sql,
(
f":{self.server_name}",
lower_bound_id,
),
)

if isinstance(self.database_engine, PostgresEngine):
sql = """
ALTER TABLE profiles VALIDATE CONSTRAINT full_user_id_not_null
"""
txn.execute(sql)

upper_bound_id = await self.db_pool.runInteraction(
"populate_full_user_id_profiles", _get_last_id
)

if upper_bound_id is None:
await self.db_pool.runInteraction(
"populate_full_user_id_profiles", _final_batch, lower_bound_id
)

await self.db_pool.updates._end_background_update(
"populate_full_user_id_profiles"
)
return 1

await self.db_pool.runInteraction(
"populate_full_user_id_profiles",
_process_batch,
lower_bound_id,
upper_bound_id,
)

progress["lower_bound_id"] = upper_bound_id

await self.db_pool.runInteraction(
"populate_full_user_id_profiles",
self.db_pool.updates._background_update_progress_txn,
"populate_full_user_id_profiles",
progress,
)

return 50

async def get_profileinfo(self, user_localpart: str) -> ProfileInfo:
try:
profile = await self.db_pool.simple_select_one(

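The profiles update mirrors the user_filters one: each pass picks an upper bound by reading the user_id 50 rows past the previous bound (`LIMIT 1 OFFSET 50`), fills the rows in that window, and only on the final pass, when no upper bound is found, validates the Postgres NOT NULL constraint. The sketch below shows just the bounding logic as a plain synchronous loop; the batch size of 3, the SQLite backend and the hard-coded ':example.com' suffix are simplifications, and the real code instead returns to the background-update scheduler between passes.

```python
import sqlite3
from typing import Optional

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE profiles (user_id TEXT, full_user_id TEXT)")
conn.executemany(
    "INSERT INTO profiles VALUES (?, NULL)",
    [(f"user{i:02d}",) for i in range(8)],
)

BATCH = 3  # the real update uses 50


def get_upper_bound(lower: str) -> Optional[str]:
    # user_id of the row BATCH places past the lower bound, if any.
    row = conn.execute(
        "SELECT user_id FROM profiles WHERE user_id > ? ORDER BY user_id "
        f"LIMIT 1 OFFSET {BATCH}",
        (lower,),
    ).fetchone()
    return row[0] if row else None


lower = ""
while True:
    upper = get_upper_bound(lower)
    if upper is None:
        # Final batch: everything above the last bound.
        conn.execute(
            "UPDATE profiles SET full_user_id = '@' || user_id || ':example.com' "
            "WHERE ? < user_id AND full_user_id IS NULL",
            (lower,),
        )
        break
    conn.execute(
        "UPDATE profiles SET full_user_id = '@' || user_id || ':example.com' "
        "WHERE ? < user_id AND user_id <= ? AND full_user_id IS NULL",
        (lower, upper),
    )
    lower = upper

print(conn.execute("SELECT COUNT(*) FROM profiles WHERE full_user_id IS NULL").fetchone())
# (0,)
```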
@@ -85,13 +85,10 @@ class ReceiptsWorkerStore(SQLBaseStore):
else:
self._can_write_to_receipts = True

# Multiple writers are not supported for SQLite.
#
# We shouldn't be running in worker mode with SQLite, but its useful
# to support it for unit tests.
#
# If this process is the writer than we need to use
# `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
# updated over replication. (Multiple writers are not supported for
# SQLite).
self._receipts_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),

@@ -557,14 +557,14 @@ class RelationsWorkerStore(SQLBaseStore):
"get_applicable_edits", _get_applicable_edits_txn
)

edits = await self.get_events(edit_ids.values()) # type: ignore[attr-defined]
edits = await self.hs.get_datastores().main.get_events(edit_ids.values())

# Map to the original event IDs to the edit events.
#
# There might not be an edit event due to there being no edits or
# due to the event not being known, either case is treated the same.
return {
original_event_id: edits.get(edit_ids.get(original_event_id))
original_event_id: edits.get(edit_ids.get(original_event_id)) # type: ignore[arg-type]
for original_event_id in event_ids
}

@@ -671,7 +671,9 @@ class RelationsWorkerStore(SQLBaseStore):
"get_thread_summaries", _get_thread_summaries_txn
)

latest_events = await self.get_events(latest_event_ids.values()) # type: ignore[attr-defined]
latest_events = await self.hs.get_datastores().main.get_events(
latest_event_ids.values()
)

# Map to the event IDs to the thread summary.
#

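The `# type: ignore[arg-type]` in the earlier hunk exists because `edit_ids.get(...)` may return None and passing None to `edits.get(...)` is fine at runtime but not for mypy; both "no edit exists" and "edit event not fetched" therefore collapse to None in the result, as the comment says. A tiny illustration with plain dicts and invented event IDs:

```python
# original event -> event ID of its latest edit
edit_ids = {"$original_a": "$edit_a", "$original_b": "$edit_b"}
# edit events we actually fetched ($edit_b is unknown)
edits = {"$edit_a": {"type": "m.room.message", "content": {"body": "edited"}}}

event_ids = ["$original_a", "$original_b", "$original_c"]
result = {
    original_event_id: edits.get(edit_ids.get(original_event_id))
    for original_event_id in event_ids
}
print(result)
# {'$original_a': {...}, '$original_b': None, '$original_c': None}
```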
Some files were not shown because too many files have changed in this diff.