Compare commits
6 Commits
mv/parse-d
...
shay/fix_d
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9cd13d0f26 | ||
|
|
b4cc31d906 | ||
|
|
d69d109ad6 | ||
|
|
a317ccbc7a | ||
|
|
7bc2aefe92 | ||
|
|
67f152b476 |
23
.github/workflows/latest_deps.yml
vendored
23
.github/workflows/latest_deps.yml
vendored
@@ -22,21 +22,7 @@ concurrency:
|
||||
cancel-in-progress: true
|
||||
|
||||
jobs:
|
||||
check_repo:
|
||||
# Prevent this workflow from running on any fork of Synapse other than matrix-org/synapse, as it is
|
||||
# only useful to the Synapse core team.
|
||||
# All other workflow steps depend on this one, thus if 'should_run_workflow' is not 'true', the rest
|
||||
# of the workflow will be skipped as well.
|
||||
runs-on: ubuntu-latest
|
||||
outputs:
|
||||
should_run_workflow: ${{ steps.check_condition.outputs.should_run_workflow }}
|
||||
steps:
|
||||
- id: check_condition
|
||||
run: echo "should_run_workflow=${{ github.repository == 'matrix-org/synapse' }}" >> "$GITHUB_OUTPUT"
|
||||
|
||||
mypy:
|
||||
needs: check_repo
|
||||
if: needs.check_repo.outputs.should_run_workflow == 'true'
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
@@ -61,8 +47,6 @@ jobs:
|
||||
run: sed '/warn_unused_ignores = True/d' -i mypy.ini
|
||||
- run: poetry run mypy
|
||||
trial:
|
||||
needs: check_repo
|
||||
if: needs.check_repo.outputs.should_run_workflow == 'true'
|
||||
runs-on: ubuntu-latest
|
||||
strategy:
|
||||
matrix:
|
||||
@@ -121,8 +105,6 @@ jobs:
|
||||
|
||||
|
||||
sytest:
|
||||
needs: check_repo
|
||||
if: needs.check_repo.outputs.should_run_workflow == 'true'
|
||||
runs-on: ubuntu-latest
|
||||
container:
|
||||
image: matrixdotorg/sytest-synapse:testing
|
||||
@@ -174,8 +156,7 @@ jobs:
|
||||
|
||||
|
||||
complement:
|
||||
needs: check_repo
|
||||
if: "!failure() && !cancelled() && needs.check_repo.outputs.should_run_workflow == 'true'"
|
||||
if: "${{ !failure() && !cancelled() }}"
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
strategy:
|
||||
@@ -211,7 +192,7 @@ jobs:
|
||||
# Open an issue if the build fails, so we know about it.
|
||||
# Only do this if we're not experimenting with this action in a PR.
|
||||
open-issue:
|
||||
if: "failure() && github.event_name != 'push' && github.event_name != 'pull_request' && needs.check_repo.outputs.should_run_workflow == 'true'"
|
||||
if: "failure() && github.event_name != 'push' && github.event_name != 'pull_request'"
|
||||
needs:
|
||||
# TODO: should mypy be included here? It feels more brittle than the others.
|
||||
- mypy
|
||||
|
||||
45
.github/workflows/tests.yml
vendored
45
.github/workflows/tests.yml
vendored
@@ -35,7 +35,7 @@ jobs:
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- name: Install Rust
|
||||
uses: dtolnay/rust-toolchain@1.60.0
|
||||
uses: dtolnay/rust-toolchain@1.58.1
|
||||
- uses: Swatinem/rust-cache@v2
|
||||
- uses: matrix-org/setup-python-poetry@v1
|
||||
with:
|
||||
@@ -45,16 +45,6 @@ jobs:
|
||||
- run: poetry run scripts-dev/generate_sample_config.sh --check
|
||||
- run: poetry run scripts-dev/config-lint.sh
|
||||
|
||||
check-schema-delta:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- uses: actions/setup-python@v4
|
||||
with:
|
||||
python-version: "3.x"
|
||||
- run: "pip install 'click==8.1.1' 'GitPython>=3.1.20'"
|
||||
- run: scripts-dev/check_schema_delta.py --force-colors
|
||||
|
||||
check-lockfile:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
@@ -92,10 +82,6 @@ jobs:
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@v3
|
||||
|
||||
- name: Install Rust
|
||||
uses: dtolnay/rust-toolchain@1.60.0
|
||||
- uses: Swatinem/rust-cache@v2
|
||||
|
||||
- name: Setup Poetry
|
||||
uses: matrix-org/setup-python-poetry@v1
|
||||
with:
|
||||
@@ -107,6 +93,10 @@ jobs:
|
||||
# To make CI green, err towards caution and install the project.
|
||||
install-project: "true"
|
||||
|
||||
- name: Install Rust
|
||||
uses: dtolnay/rust-toolchain@1.58.1
|
||||
- uses: Swatinem/rust-cache@v2
|
||||
|
||||
# Cribbed from
|
||||
# https://github.com/AustinScola/mypy-cache-github-action/blob/85ea4f2972abed39b33bd02c36e341b28ca59213/src/restore.ts#L10-L17
|
||||
- name: Restore/persist mypy's cache
|
||||
@@ -150,7 +140,7 @@ jobs:
|
||||
with:
|
||||
ref: ${{ github.event.pull_request.head.sha }}
|
||||
- name: Install Rust
|
||||
uses: dtolnay/rust-toolchain@1.60.0
|
||||
uses: dtolnay/rust-toolchain@1.58.1
|
||||
- uses: Swatinem/rust-cache@v2
|
||||
- uses: matrix-org/setup-python-poetry@v1
|
||||
with:
|
||||
@@ -167,7 +157,7 @@ jobs:
|
||||
- uses: actions/checkout@v3
|
||||
|
||||
- name: Install Rust
|
||||
uses: dtolnay/rust-toolchain@1.60.0
|
||||
uses: dtolnay/rust-toolchain@1.58.1
|
||||
with:
|
||||
components: clippy
|
||||
- uses: Swatinem/rust-cache@v2
|
||||
@@ -221,7 +211,6 @@ jobs:
|
||||
- lint-newsfile
|
||||
- lint-pydantic
|
||||
- check-sampleconfig
|
||||
- check-schema-delta
|
||||
- check-lockfile
|
||||
- lint-clippy
|
||||
- lint-rustfmt
|
||||
@@ -268,7 +257,7 @@ jobs:
|
||||
postgres:${{ matrix.job.postgres-version }}
|
||||
|
||||
- name: Install Rust
|
||||
uses: dtolnay/rust-toolchain@1.60.0
|
||||
uses: dtolnay/rust-toolchain@1.58.1
|
||||
- uses: Swatinem/rust-cache@v2
|
||||
|
||||
- uses: matrix-org/setup-python-poetry@v1
|
||||
@@ -308,7 +297,7 @@ jobs:
|
||||
- uses: actions/checkout@v3
|
||||
|
||||
- name: Install Rust
|
||||
uses: dtolnay/rust-toolchain@1.60.0
|
||||
uses: dtolnay/rust-toolchain@1.58.1
|
||||
- uses: Swatinem/rust-cache@v2
|
||||
|
||||
# There aren't wheels for some of the older deps, so we need to install
|
||||
@@ -416,7 +405,7 @@ jobs:
|
||||
run: cat sytest-blacklist .ci/worker-blacklist > synapse-blacklist-with-workers
|
||||
|
||||
- name: Install Rust
|
||||
uses: dtolnay/rust-toolchain@1.60.0
|
||||
uses: dtolnay/rust-toolchain@1.58.1
|
||||
- uses: Swatinem/rust-cache@v2
|
||||
|
||||
- name: Run SyTest
|
||||
@@ -556,7 +545,7 @@ jobs:
|
||||
path: synapse
|
||||
|
||||
- name: Install Rust
|
||||
uses: dtolnay/rust-toolchain@1.60.0
|
||||
uses: dtolnay/rust-toolchain@1.58.1
|
||||
- uses: Swatinem/rust-cache@v2
|
||||
|
||||
- uses: actions/setup-go@v4
|
||||
@@ -584,7 +573,7 @@ jobs:
|
||||
- uses: actions/checkout@v3
|
||||
|
||||
- name: Install Rust
|
||||
uses: dtolnay/rust-toolchain@1.60.0
|
||||
uses: dtolnay/rust-toolchain@1.58.1
|
||||
- uses: Swatinem/rust-cache@v2
|
||||
|
||||
- run: cargo test
|
||||
@@ -609,6 +598,16 @@ jobs:
|
||||
|
||||
- run: cargo bench --no-run
|
||||
|
||||
check-schema-delta:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- uses: actions/setup-python@v4
|
||||
with:
|
||||
python-version: "3.x"
|
||||
- run: "pip install 'click==8.1.1' 'GitPython>=3.1.20'"
|
||||
- run: scripts-dev/check_schema_delta.py --force-colors
|
||||
|
||||
# a job which marks all the other jobs as complete, thus allowing PRs to be merged.
|
||||
tests-done:
|
||||
if: ${{ always() }}
|
||||
|
||||
24
.github/workflows/twisted_trunk.yml
vendored
24
.github/workflows/twisted_trunk.yml
vendored
@@ -18,22 +18,7 @@ concurrency:
|
||||
cancel-in-progress: true
|
||||
|
||||
jobs:
|
||||
check_repo:
|
||||
# Prevent this workflow from running on any fork of Synapse other than matrix-org/synapse, as it is
|
||||
# only useful to the Synapse core team.
|
||||
# All other workflow steps depend on this one, thus if 'should_run_workflow' is not 'true', the rest
|
||||
# of the workflow will be skipped as well.
|
||||
if: github.repository == 'matrix-org/synapse'
|
||||
runs-on: ubuntu-latest
|
||||
outputs:
|
||||
should_run_workflow: ${{ steps.check_condition.outputs.should_run_workflow }}
|
||||
steps:
|
||||
- id: check_condition
|
||||
run: echo "should_run_workflow=${{ github.repository == 'matrix-org/synapse' }}" >> "$GITHUB_OUTPUT"
|
||||
|
||||
mypy:
|
||||
needs: check_repo
|
||||
if: needs.check_repo.outputs.should_run_workflow == 'true'
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
steps:
|
||||
@@ -56,8 +41,6 @@ jobs:
|
||||
- run: poetry run mypy
|
||||
|
||||
trial:
|
||||
needs: check_repo
|
||||
if: needs.check_repo.outputs.should_run_workflow == 'true'
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
steps:
|
||||
@@ -92,8 +75,6 @@ jobs:
|
||||
|| true
|
||||
|
||||
sytest:
|
||||
needs: check_repo
|
||||
if: needs.check_repo.outputs.should_run_workflow == 'true'
|
||||
runs-on: ubuntu-latest
|
||||
container:
|
||||
image: matrixdotorg/sytest-synapse:buster
|
||||
@@ -138,8 +119,7 @@ jobs:
|
||||
/logs/**/*.log*
|
||||
|
||||
complement:
|
||||
needs: check_repo
|
||||
if: "!failure() && !cancelled() && needs.check_repo.outputs.should_run_workflow == 'true'"
|
||||
if: "${{ !failure() && !cancelled() }}"
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
strategy:
|
||||
@@ -186,7 +166,7 @@ jobs:
|
||||
|
||||
# open an issue if the build fails, so we know about it.
|
||||
open-issue:
|
||||
if: failure() && needs.check_repo.outputs.should_run_workflow == 'true'
|
||||
if: failure()
|
||||
needs:
|
||||
- mypy
|
||||
- trial
|
||||
|
||||
41
CHANGES.md
41
CHANGES.md
@@ -1,44 +1,3 @@
|
||||
Synapse 1.85.2 (2023-06-08)
|
||||
===========================
|
||||
|
||||
Bugfixes
|
||||
--------
|
||||
|
||||
- Fix regression where using TLS for HTTP replication between workers did not work. Introduced in v1.85.0. ([\#15746](https://github.com/matrix-org/synapse/issues/15746))
|
||||
|
||||
|
||||
Synapse 1.85.1 (2023-06-07)
|
||||
===========================
|
||||
|
||||
Note: this release only fixes a bug that stopped some deployments from upgrading to v1.85.0. There is no need to upgrade to v1.85.1 if successfully running v1.85.0.
|
||||
|
||||
Bugfixes
|
||||
--------
|
||||
|
||||
- Fix bug in schema delta that broke upgrades for some deployments. Introduced in v1.85.0. ([\#15738](https://github.com/matrix-org/synapse/issues/15738), [\#15739](https://github.com/matrix-org/synapse/issues/15739))
|
||||
|
||||
|
||||
Synapse 1.85.0 (2023-06-06)
|
||||
===========================
|
||||
|
||||
No significant changes since 1.85.0rc2.
|
||||
|
||||
|
||||
## Security advisory
|
||||
|
||||
The following issues are fixed in 1.85.0 (and RCs).
|
||||
|
||||
- [GHSA-26c5-ppr8-f33p](https://github.com/matrix-org/synapse/security/advisories/GHSA-26c5-ppr8-f33p) / [CVE-2023-32682](https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-32682) — Low Severity
|
||||
|
||||
It may be possible for a deactivated user to login when using uncommon configurations.
|
||||
|
||||
- [GHSA-98px-6486-j7qc](https://github.com/matrix-org/synapse/security/advisories/GHSA-98px-6486-j7qc) / [CVE-2023-32683](https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-32683) — Low Severity
|
||||
|
||||
A discovered oEmbed or image URL can bypass the `url_preview_url_blacklist` setting potentially allowing server side request forgery or bypassing network policies. Impact is limited to IP addresses allowed by the `url_preview_ip_range_blacklist` setting (by default this only allows public IPs).
|
||||
|
||||
See the advisories for more details. If you have any questions, email security@matrix.org.
|
||||
|
||||
|
||||
Synapse 1.85.0rc2 (2023-06-01)
|
||||
==============================
|
||||
|
||||
|
||||
28
Cargo.lock
generated
28
Cargo.lock
generated
@@ -4,9 +4,9 @@ version = 3
|
||||
|
||||
[[package]]
|
||||
name = "aho-corasick"
|
||||
version = "1.0.2"
|
||||
version = "0.7.19"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "43f6cb1bf222025340178f382c426f13757b2960e89779dfcb319c32542a5a41"
|
||||
checksum = "b4f55bd91a0978cbfd91c457a164bab8b4001c833b7f323132c0a4e1922dd44e"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
]
|
||||
@@ -132,9 +132,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "log"
|
||||
version = "0.4.19"
|
||||
version = "0.4.18"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4"
|
||||
checksum = "518ef76f2f87365916b142844c16d8fefd85039bc5699050210a7778ee1cd1de"
|
||||
|
||||
[[package]]
|
||||
name = "memchr"
|
||||
@@ -229,9 +229,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "pyo3-log"
|
||||
version = "0.8.2"
|
||||
version = "0.8.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c94ff6535a6bae58d7d0b85e60d4c53f7f84d0d0aa35d6a28c3f3e70bfe51444"
|
||||
checksum = "f9c8b57fe71fb5dcf38970ebedc2b1531cf1c14b1b9b4c560a182a57e115575c"
|
||||
dependencies = [
|
||||
"arc-swap",
|
||||
"log",
|
||||
@@ -291,9 +291,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "regex"
|
||||
version = "1.8.4"
|
||||
version = "1.7.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d0ab3ca65655bb1e41f2a8c8cd662eb4fb035e67c3f78da1d61dffe89d07300f"
|
||||
checksum = "8b1f693b24f6ac912f4893ef08244d70b6067480d2f1a46e950c9691e6749d1d"
|
||||
dependencies = [
|
||||
"aho-corasick",
|
||||
"memchr",
|
||||
@@ -302,9 +302,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "regex-syntax"
|
||||
version = "0.7.2"
|
||||
version = "0.6.29"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "436b050e76ed2903236f032a59761c1eb99e1b0aead2c257922771dab1fc8c78"
|
||||
checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
|
||||
|
||||
[[package]]
|
||||
name = "ryu"
|
||||
@@ -320,18 +320,18 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
|
||||
|
||||
[[package]]
|
||||
name = "serde"
|
||||
version = "1.0.164"
|
||||
version = "1.0.163"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9e8c8cf938e98f769bc164923b06dce91cea1751522f46f8466461af04c9027d"
|
||||
checksum = "2113ab51b87a539ae008b5c6c02dc020ffa39afd2d83cffcb3f4eb2722cebec2"
|
||||
dependencies = [
|
||||
"serde_derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_derive"
|
||||
version = "1.0.164"
|
||||
version = "1.0.163"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d9735b638ccc51c28bf6914d90a2e9725b377144fc612c49a611fddd1b631d68"
|
||||
checksum = "8c805777e3930c8883389c602315a24224bcc738b63905ef87cd1420353ea93e"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
Allow for the configuration of max request retries and min/max retry delays in the matrix federation client.
|
||||
@@ -1 +0,0 @@
|
||||
Replace `EventContext` fields `prev_group` and `delta_ids` with field `state_group_deltas`.
|
||||
@@ -1 +0,0 @@
|
||||
Enable support for [MSC3952](https://github.com/matrix-org/matrix-spec-proposals/pull/3952): intentional mentions.
|
||||
@@ -1 +0,0 @@
|
||||
Correctly clear caches when we delete a room.
|
||||
@@ -1 +0,0 @@
|
||||
Add support for tracing functions which return `Awaitable`s.
|
||||
@@ -1 +0,0 @@
|
||||
Check permissions for enabling encryption earlier during room creation to avoid creating broken rooms.
|
||||
@@ -1 +0,0 @@
|
||||
Update docstring and traces on `maybe_backfill()` functions.
|
||||
@@ -1 +0,0 @@
|
||||
Speed up `/messages` by backfilling in the background when there are no backward extremities where we are directly paginating.
|
||||
@@ -1 +0,0 @@
|
||||
Add context for when/why to use the `long_retries` option when sending Federation requests.
|
||||
@@ -1 +0,0 @@
|
||||
Removed some unused fields.
|
||||
1
changelog.d/15724.bugfix
Normal file
1
changelog.d/15724.bugfix
Normal file
@@ -0,0 +1 @@
|
||||
Fix missing dependencies in background jobs.
|
||||
@@ -1 +0,0 @@
|
||||
Update federation error to more plainly explain we can only authorize our own membership events.
|
||||
@@ -1 +0,0 @@
|
||||
Prevent the `latest_deps` and `twisted_trunk` daily GitHub Actions workflows from running on forks of the codebase.
|
||||
@@ -1 +0,0 @@
|
||||
Improve performance of user directory search.
|
||||
@@ -1 +0,0 @@
|
||||
Remove redundant table join with `room_memberships` when doing a `is_host_joined()`/`is_host_invited()` call (`membership` is already part of the `current_state_events`).
|
||||
@@ -1 +0,0 @@
|
||||
Simplify query to find participating servers in a room.
|
||||
@@ -1 +0,0 @@
|
||||
Remove superfluous `room_memberships` join from background update.
|
||||
@@ -1 +0,0 @@
|
||||
Improve `/messages` response time by avoiding backfill when we already have messages to return.
|
||||
@@ -1 +0,0 @@
|
||||
Expose a metric reporting the database background update status.
|
||||
@@ -1 +0,0 @@
|
||||
Speed up typechecking CI.
|
||||
@@ -1 +0,0 @@
|
||||
Fix requesting multiple keys at once over federation, related to [MSC3983](https://github.com/matrix-org/matrix-spec-proposals/pull/3983).
|
||||
@@ -1 +0,0 @@
|
||||
Bump minimum supported Rust version to 1.60.0.
|
||||
@@ -1 +0,0 @@
|
||||
Fix requesting multiple keys at once over federation, related to [MSC3983](https://github.com/matrix-org/matrix-spec-proposals/pull/3983).
|
||||
@@ -1 +0,0 @@
|
||||
Use parse_duration for federation client timeout and retry options.
|
||||
18
debian/changelog
vendored
18
debian/changelog
vendored
@@ -1,21 +1,3 @@
|
||||
matrix-synapse-py3 (1.85.2) stable; urgency=medium
|
||||
|
||||
* New Synapse release 1.85.2.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Thu, 08 Jun 2023 13:04:18 +0100
|
||||
|
||||
matrix-synapse-py3 (1.85.1) stable; urgency=medium
|
||||
|
||||
* New Synapse release 1.85.1.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Wed, 07 Jun 2023 10:51:12 +0100
|
||||
|
||||
matrix-synapse-py3 (1.85.0) stable; urgency=medium
|
||||
|
||||
* New Synapse release 1.85.0.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Tue, 06 Jun 2023 09:39:29 +0100
|
||||
|
||||
matrix-synapse-py3 (1.85.0~rc2) stable; urgency=medium
|
||||
|
||||
* New Synapse release 1.85.0rc2.
|
||||
|
||||
@@ -21,7 +21,7 @@ FROM docker.io/library/debian:bullseye-slim AS deps_base
|
||||
# which makes it much easier to copy (but we need to make sure we use an image
|
||||
# based on the same debian version as the synapse image, to make sure we get
|
||||
# the expected version of libc.
|
||||
FROM docker.io/library/redis:7-bullseye AS redis_base
|
||||
FROM docker.io/library/redis:6-bullseye AS redis_base
|
||||
|
||||
# now build the final image, based on the the regular Synapse docker image
|
||||
FROM $FROM
|
||||
|
||||
@@ -87,14 +87,6 @@ process, for example:
|
||||
wget https://packages.matrix.org/debian/pool/main/m/matrix-synapse-py3/matrix-synapse-py3_1.3.0+stretch1_amd64.deb
|
||||
dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
|
||||
```
|
||||
# Upgrading to v1.86.0
|
||||
|
||||
## Minimum supported Rust version
|
||||
|
||||
The minimum supported Rust version has been increased from v1.58.1 to v1.60.0.
|
||||
Users building from source will need to ensure their `rustc` version is up to
|
||||
date.
|
||||
|
||||
|
||||
# Upgrading to v1.85.0
|
||||
|
||||
|
||||
@@ -27,8 +27,9 @@ What servers are currently participating in this room?
|
||||
Run this sql query on your db:
|
||||
```sql
|
||||
SELECT DISTINCT split_part(state_key, ':', 2)
|
||||
FROM current_state_events
|
||||
WHERE room_id = '!cURbafjkfsMDVwdRDQ:matrix.org' AND membership = 'join';
|
||||
FROM current_state_events AS c
|
||||
INNER JOIN room_memberships AS m USING (room_id, event_id)
|
||||
WHERE room_id = '!cURbafjkfsMDVwdRDQ:matrix.org' AND membership = 'join';
|
||||
```
|
||||
|
||||
What users are registered on my server?
|
||||
|
||||
@@ -1196,32 +1196,6 @@ Example configuration:
|
||||
allow_device_name_lookup_over_federation: true
|
||||
```
|
||||
---
|
||||
### `federation`
|
||||
|
||||
The federation section defines some sub-options related to federation.
|
||||
|
||||
The following options are related to configuring timeout and retry logic for one request,
|
||||
independently of the others.
|
||||
Short retry algorithm is used when something or someone will wait for the request to have an
|
||||
answer, while long retry is used for requests that happen in the background,
|
||||
like sending a federation transaction.
|
||||
|
||||
* `client_timeout`: timeout for the federation requests in seconds. Default to 60s.
|
||||
* `max_short_retry_delay`: maximum delay to be used for the short retry algo in seconds. Default to 2s.
|
||||
* `max_long_retry_delay`: maximum delay to be used for the short retry algo in seconds. Default to 60s.
|
||||
* `max_short_retries`: maximum number of retries for the short retry algo. Default to 3 attempts.
|
||||
* `max_long_retries`: maximum number of retries for the long retry algo. Default to 10 attempts.
|
||||
|
||||
Example configuration:
|
||||
```yaml
|
||||
federation:
|
||||
client_timeout: 180
|
||||
max_short_retry_delay: 7
|
||||
max_long_retry_delay: 100
|
||||
max_short_retries: 5
|
||||
max_long_retries: 20
|
||||
```
|
||||
---
|
||||
## Caching
|
||||
|
||||
Options related to caching.
|
||||
|
||||
287
poetry.lock
generated
287
poetry.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -89,7 +89,7 @@ manifest-path = "rust/Cargo.toml"
|
||||
|
||||
[tool.poetry]
|
||||
name = "matrix-synapse"
|
||||
version = "1.85.2"
|
||||
version = "1.85.0rc2"
|
||||
description = "Homeserver for the Matrix decentralised comms protocol"
|
||||
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
|
||||
license = "Apache-2.0"
|
||||
|
||||
@@ -7,7 +7,7 @@ name = "synapse"
|
||||
version = "0.1.0"
|
||||
|
||||
edition = "2021"
|
||||
rust-version = "1.60.0"
|
||||
rust-version = "1.58.1"
|
||||
|
||||
[lib]
|
||||
name = "synapse"
|
||||
|
||||
@@ -13,6 +13,8 @@
|
||||
// limitations under the License.
|
||||
|
||||
#![feature(test)]
|
||||
use std::collections::BTreeSet;
|
||||
|
||||
use synapse::push::{
|
||||
evaluator::PushRuleEvaluator, Condition, EventMatchCondition, FilteredPushRules, JsonValue,
|
||||
PushRules, SimpleJsonValue,
|
||||
@@ -195,6 +197,7 @@ fn bench_eval_message(b: &mut Bencher) {
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
);
|
||||
|
||||
b.iter(|| eval.run(&rules, Some("bob"), Some("person")));
|
||||
|
||||
@@ -142,11 +142,11 @@ pub const BASE_APPEND_OVERRIDE_RULES: &[PushRule] = &[
|
||||
default_enabled: true,
|
||||
},
|
||||
PushRule {
|
||||
rule_id: Cow::Borrowed("global/override/.m.is_user_mention"),
|
||||
rule_id: Cow::Borrowed(".org.matrix.msc3952.is_user_mention"),
|
||||
priority_class: 5,
|
||||
conditions: Cow::Borrowed(&[Condition::Known(
|
||||
KnownCondition::ExactEventPropertyContainsType(EventPropertyIsTypeCondition {
|
||||
key: Cow::Borrowed("content.m\\.mentions.user_ids"),
|
||||
key: Cow::Borrowed("content.org\\.matrix\\.msc3952\\.mentions.user_ids"),
|
||||
value_type: Cow::Borrowed(&EventMatchPatternType::UserId),
|
||||
}),
|
||||
)]),
|
||||
@@ -163,11 +163,11 @@ pub const BASE_APPEND_OVERRIDE_RULES: &[PushRule] = &[
|
||||
default_enabled: true,
|
||||
},
|
||||
PushRule {
|
||||
rule_id: Cow::Borrowed("global/override/.m.is_room_mention"),
|
||||
rule_id: Cow::Borrowed(".org.matrix.msc3952.is_room_mention"),
|
||||
priority_class: 5,
|
||||
conditions: Cow::Borrowed(&[
|
||||
Condition::Known(KnownCondition::EventPropertyIs(EventPropertyIsCondition {
|
||||
key: Cow::Borrowed("content.m\\.mentions.room"),
|
||||
key: Cow::Borrowed("content.org\\.matrix\\.msc3952\\.mentions.room"),
|
||||
value: Cow::Borrowed(&SimpleJsonValue::Bool(true)),
|
||||
})),
|
||||
Condition::Known(KnownCondition::SenderNotificationPermission {
|
||||
|
||||
@@ -70,9 +70,7 @@ pub struct PushRuleEvaluator {
|
||||
/// The "content.body", if any.
|
||||
body: String,
|
||||
|
||||
/// True if the event has a m.mentions property. (Note that this is a separate
|
||||
/// flag instead of checking flattened_keys since the m.mentions property
|
||||
/// might be an empty map and not appear in flattened_keys.
|
||||
/// True if the event has a mentions property and MSC3952 support is enabled.
|
||||
has_mentions: bool,
|
||||
|
||||
/// The number of users in the room.
|
||||
@@ -157,7 +155,9 @@ impl PushRuleEvaluator {
|
||||
let rule_id = &push_rule.rule_id().to_string();
|
||||
|
||||
// For backwards-compatibility the legacy mention rules are disabled
|
||||
// if the event contains the 'm.mentions' property.
|
||||
// if the event contains the 'm.mentions' property (and if the
|
||||
// experimental feature is enabled, both of these are represented
|
||||
// by the has_mentions flag).
|
||||
if self.has_mentions
|
||||
&& (rule_id == "global/override/.m.rule.contains_display_name"
|
||||
|| rule_id == "global/content/.m.rule.contains_user_name"
|
||||
@@ -562,7 +562,7 @@ fn test_requires_room_version_supports_condition() {
|
||||
};
|
||||
let rules = PushRules::new(vec![custom_rule]);
|
||||
result = evaluator.run(
|
||||
&FilteredPushRules::py_new(rules, BTreeMap::new(), true, false, true, false),
|
||||
&FilteredPushRules::py_new(rules, BTreeMap::new(), true, false, true, false, false),
|
||||
None,
|
||||
None,
|
||||
);
|
||||
|
||||
@@ -527,6 +527,7 @@ pub struct FilteredPushRules {
|
||||
msc1767_enabled: bool,
|
||||
msc3381_polls_enabled: bool,
|
||||
msc3664_enabled: bool,
|
||||
msc3952_intentional_mentions: bool,
|
||||
msc3958_suppress_edits_enabled: bool,
|
||||
}
|
||||
|
||||
@@ -539,6 +540,7 @@ impl FilteredPushRules {
|
||||
msc1767_enabled: bool,
|
||||
msc3381_polls_enabled: bool,
|
||||
msc3664_enabled: bool,
|
||||
msc3952_intentional_mentions: bool,
|
||||
msc3958_suppress_edits_enabled: bool,
|
||||
) -> Self {
|
||||
Self {
|
||||
@@ -547,6 +549,7 @@ impl FilteredPushRules {
|
||||
msc1767_enabled,
|
||||
msc3381_polls_enabled,
|
||||
msc3664_enabled,
|
||||
msc3952_intentional_mentions,
|
||||
msc3958_suppress_edits_enabled,
|
||||
}
|
||||
}
|
||||
@@ -584,6 +587,10 @@ impl FilteredPushRules {
|
||||
return false;
|
||||
}
|
||||
|
||||
if !self.msc3952_intentional_mentions && rule.rule_id.contains("org.matrix.msc3952")
|
||||
{
|
||||
return false;
|
||||
}
|
||||
if !self.msc3958_suppress_edits_enabled
|
||||
&& rule.rule_id == "global/override/.com.beeper.suppress_edits"
|
||||
{
|
||||
|
||||
@@ -46,6 +46,7 @@ class FilteredPushRules:
|
||||
msc1767_enabled: bool,
|
||||
msc3381_polls_enabled: bool,
|
||||
msc3664_enabled: bool,
|
||||
msc3952_intentional_mentions: bool,
|
||||
msc3958_suppress_edits_enabled: bool,
|
||||
): ...
|
||||
def rules(self) -> Collection[Tuple[PushRule, bool]]: ...
|
||||
|
||||
@@ -236,7 +236,7 @@ class EventContentFields:
|
||||
AUTHORISING_USER: Final = "join_authorised_via_users_server"
|
||||
|
||||
# Use for mentioning users.
|
||||
MENTIONS: Final = "m.mentions"
|
||||
MSC3952_MENTIONS: Final = "org.matrix.msc3952.mentions"
|
||||
|
||||
# an unspecced field added to to-device messages to identify them uniquely-ish
|
||||
TO_DEVICE_MSGID: Final = "org.matrix.msgid"
|
||||
|
||||
@@ -358,6 +358,11 @@ class ExperimentalConfig(Config):
|
||||
# MSC3391: Removing account data.
|
||||
self.msc3391_enabled = experimental.get("msc3391_enabled", False)
|
||||
|
||||
# MSC3952: Intentional mentions, this depends on MSC3966.
|
||||
self.msc3952_intentional_mentions = experimental.get(
|
||||
"msc3952_intentional_mentions", False
|
||||
)
|
||||
|
||||
# MSC3959: Do not generate notifications for edits.
|
||||
self.msc3958_supress_edit_notifs = experimental.get(
|
||||
"msc3958_supress_edit_notifs", False
|
||||
|
||||
@@ -22,8 +22,6 @@ class FederationConfig(Config):
|
||||
section = "federation"
|
||||
|
||||
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
|
||||
federation_config = config.setdefault("federation", {})
|
||||
|
||||
# FIXME: federation_domain_whitelist needs sytests
|
||||
self.federation_domain_whitelist: Optional[dict] = None
|
||||
federation_domain_whitelist = config.get("federation_domain_whitelist", None)
|
||||
@@ -51,19 +49,5 @@ class FederationConfig(Config):
|
||||
"allow_device_name_lookup_over_federation", False
|
||||
)
|
||||
|
||||
# Allow for the configuration of timeout, max request retries
|
||||
# and min/max retry delays in the matrix federation client.
|
||||
self.client_timeout = Config.parse_duration(
|
||||
federation_config.get("client_timeout", "60s")
|
||||
)
|
||||
self.max_long_retry_delay = Config.parse_duration(
|
||||
federation_config.get("max_long_retry_delay", "60s")
|
||||
)
|
||||
self.max_short_retry_delay = Config.parse_duration(
|
||||
federation_config.get("max_short_retry_delay", "2s")
|
||||
)
|
||||
self.max_long_retries = federation_config.get("max_long_retries", 10)
|
||||
self.max_short_retries = federation_config.get("max_short_retries", 3)
|
||||
|
||||
|
||||
_METRICS_FOR_DOMAINS_SCHEMA = {"type": "array", "items": {"type": "string"}}
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
|
||||
from typing import TYPE_CHECKING, List, Optional, Tuple
|
||||
|
||||
import attr
|
||||
from immutabledict import immutabledict
|
||||
@@ -107,32 +107,33 @@ class EventContext(UnpersistedEventContextBase):
|
||||
state_delta_due_to_event: If `state_group` and `state_group_before_event` are not None
|
||||
then this is the delta of the state between the two groups.
|
||||
|
||||
state_group_deltas: If not empty, this is a dict collecting a mapping of the state
|
||||
difference between state groups.
|
||||
prev_group: If it is known, ``state_group``'s prev_group. Note that this being
|
||||
None does not necessarily mean that ``state_group`` does not have
|
||||
a prev_group!
|
||||
|
||||
The keys are a tuple of two integers: the initial group and final state group.
|
||||
The corresponding value is a state map representing the state delta between
|
||||
these state groups.
|
||||
If the event is a state event, this is normally the same as
|
||||
``state_group_before_event``.
|
||||
|
||||
The dictionary is expected to have at most two entries with state groups of:
|
||||
If ``state_group`` is None (ie, the event is an outlier), ``prev_group``
|
||||
will always also be ``None``.
|
||||
|
||||
1. The state group before the event and after the event.
|
||||
2. The state group preceding the state group before the event and the
|
||||
state group before the event.
|
||||
Note that this *not* (necessarily) the state group associated with
|
||||
``_prev_state_ids``.
|
||||
|
||||
This information is collected and stored as part of an optimization for persisting
|
||||
events.
|
||||
delta_ids: If ``prev_group`` is not None, the state delta between ``prev_group``
|
||||
and ``state_group``.
|
||||
|
||||
partial_state: if True, we may be storing this event with a temporary,
|
||||
incomplete state.
|
||||
"""
|
||||
|
||||
_storage: "StorageControllers"
|
||||
state_group_deltas: Dict[Tuple[int, int], StateMap[str]]
|
||||
rejected: Optional[str] = None
|
||||
_state_group: Optional[int] = None
|
||||
state_group_before_event: Optional[int] = None
|
||||
_state_delta_due_to_event: Optional[StateMap[str]] = None
|
||||
prev_group: Optional[int] = None
|
||||
delta_ids: Optional[StateMap[str]] = None
|
||||
app_service: Optional[ApplicationService] = None
|
||||
|
||||
partial_state: bool = False
|
||||
@@ -144,14 +145,16 @@ class EventContext(UnpersistedEventContextBase):
|
||||
state_group_before_event: Optional[int],
|
||||
state_delta_due_to_event: Optional[StateMap[str]],
|
||||
partial_state: bool,
|
||||
state_group_deltas: Dict[Tuple[int, int], StateMap[str]],
|
||||
prev_group: Optional[int] = None,
|
||||
delta_ids: Optional[StateMap[str]] = None,
|
||||
) -> "EventContext":
|
||||
return EventContext(
|
||||
storage=storage,
|
||||
state_group=state_group,
|
||||
state_group_before_event=state_group_before_event,
|
||||
state_delta_due_to_event=state_delta_due_to_event,
|
||||
state_group_deltas=state_group_deltas,
|
||||
prev_group=prev_group,
|
||||
delta_ids=delta_ids,
|
||||
partial_state=partial_state,
|
||||
)
|
||||
|
||||
@@ -160,7 +163,7 @@ class EventContext(UnpersistedEventContextBase):
|
||||
storage: "StorageControllers",
|
||||
) -> "EventContext":
|
||||
"""Return an EventContext instance suitable for persisting an outlier event"""
|
||||
return EventContext(storage=storage, state_group_deltas={})
|
||||
return EventContext(storage=storage)
|
||||
|
||||
async def persist(self, event: EventBase) -> "EventContext":
|
||||
return self
|
||||
@@ -180,15 +183,13 @@ class EventContext(UnpersistedEventContextBase):
|
||||
"state_group": self._state_group,
|
||||
"state_group_before_event": self.state_group_before_event,
|
||||
"rejected": self.rejected,
|
||||
"state_group_deltas": _encode_state_group_delta(self.state_group_deltas),
|
||||
"prev_group": self.prev_group,
|
||||
"state_delta_due_to_event": _encode_state_dict(
|
||||
self._state_delta_due_to_event
|
||||
),
|
||||
"delta_ids": _encode_state_dict(self.delta_ids),
|
||||
"app_service_id": self.app_service.id if self.app_service else None,
|
||||
"partial_state": self.partial_state,
|
||||
# add dummy delta_ids and prev_group for backwards compatibility
|
||||
"delta_ids": None,
|
||||
"prev_group": None,
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
@@ -203,24 +204,17 @@ class EventContext(UnpersistedEventContextBase):
|
||||
Returns:
|
||||
The event context.
|
||||
"""
|
||||
# workaround for backwards/forwards compatibility: if the input doesn't have a value
|
||||
# for "state_group_deltas" just assign an empty dict
|
||||
state_group_deltas = input.get("state_group_deltas", None)
|
||||
if state_group_deltas:
|
||||
state_group_deltas = _decode_state_group_delta(state_group_deltas)
|
||||
else:
|
||||
state_group_deltas = {}
|
||||
|
||||
context = EventContext(
|
||||
# We use the state_group and prev_state_id stuff to pull the
|
||||
# current_state_ids out of the DB and construct prev_state_ids.
|
||||
storage=storage,
|
||||
state_group=input["state_group"],
|
||||
state_group_before_event=input["state_group_before_event"],
|
||||
state_group_deltas=state_group_deltas,
|
||||
prev_group=input["prev_group"],
|
||||
state_delta_due_to_event=_decode_state_dict(
|
||||
input["state_delta_due_to_event"]
|
||||
),
|
||||
delta_ids=_decode_state_dict(input["delta_ids"]),
|
||||
rejected=input["rejected"],
|
||||
partial_state=input.get("partial_state", False),
|
||||
)
|
||||
@@ -355,7 +349,7 @@ class UnpersistedEventContext(UnpersistedEventContextBase):
|
||||
_storage: "StorageControllers"
|
||||
state_group_before_event: Optional[int]
|
||||
state_group_after_event: Optional[int]
|
||||
state_delta_due_to_event: Optional[StateMap[str]]
|
||||
state_delta_due_to_event: Optional[dict]
|
||||
prev_group_for_state_group_before_event: Optional[int]
|
||||
delta_ids_to_state_group_before_event: Optional[StateMap[str]]
|
||||
partial_state: bool
|
||||
@@ -386,16 +380,26 @@ class UnpersistedEventContext(UnpersistedEventContextBase):
|
||||
|
||||
events_and_persisted_context = []
|
||||
for event, unpersisted_context in amended_events_and_context:
|
||||
state_group_deltas = unpersisted_context._build_state_group_deltas()
|
||||
|
||||
context = EventContext(
|
||||
storage=unpersisted_context._storage,
|
||||
state_group=unpersisted_context.state_group_after_event,
|
||||
state_group_before_event=unpersisted_context.state_group_before_event,
|
||||
state_delta_due_to_event=unpersisted_context.state_delta_due_to_event,
|
||||
partial_state=unpersisted_context.partial_state,
|
||||
state_group_deltas=state_group_deltas,
|
||||
)
|
||||
if event.is_state():
|
||||
context = EventContext(
|
||||
storage=unpersisted_context._storage,
|
||||
state_group=unpersisted_context.state_group_after_event,
|
||||
state_group_before_event=unpersisted_context.state_group_before_event,
|
||||
state_delta_due_to_event=unpersisted_context.state_delta_due_to_event,
|
||||
partial_state=unpersisted_context.partial_state,
|
||||
prev_group=unpersisted_context.state_group_before_event,
|
||||
delta_ids=unpersisted_context.state_delta_due_to_event,
|
||||
)
|
||||
else:
|
||||
context = EventContext(
|
||||
storage=unpersisted_context._storage,
|
||||
state_group=unpersisted_context.state_group_after_event,
|
||||
state_group_before_event=unpersisted_context.state_group_before_event,
|
||||
state_delta_due_to_event=unpersisted_context.state_delta_due_to_event,
|
||||
partial_state=unpersisted_context.partial_state,
|
||||
prev_group=unpersisted_context.prev_group_for_state_group_before_event,
|
||||
delta_ids=unpersisted_context.delta_ids_to_state_group_before_event,
|
||||
)
|
||||
events_and_persisted_context.append((event, context))
|
||||
return events_and_persisted_context
|
||||
|
||||
@@ -448,11 +452,11 @@ class UnpersistedEventContext(UnpersistedEventContextBase):
|
||||
|
||||
# if the event isn't a state event the state group doesn't change
|
||||
if not self.state_delta_due_to_event:
|
||||
self.state_group_after_event = self.state_group_before_event
|
||||
state_group_after_event = self.state_group_before_event
|
||||
|
||||
# otherwise if it is a state event we need to get a state group for it
|
||||
else:
|
||||
self.state_group_after_event = await self._storage.state.store_state_group(
|
||||
state_group_after_event = await self._storage.state.store_state_group(
|
||||
event.event_id,
|
||||
event.room_id,
|
||||
prev_group=self.state_group_before_event,
|
||||
@@ -460,81 +464,16 @@ class UnpersistedEventContext(UnpersistedEventContextBase):
|
||||
current_state_ids=None,
|
||||
)
|
||||
|
||||
state_group_deltas = self._build_state_group_deltas()
|
||||
|
||||
return EventContext.with_state(
|
||||
storage=self._storage,
|
||||
state_group=self.state_group_after_event,
|
||||
state_group=state_group_after_event,
|
||||
state_group_before_event=self.state_group_before_event,
|
||||
state_delta_due_to_event=self.state_delta_due_to_event,
|
||||
state_group_deltas=state_group_deltas,
|
||||
partial_state=self.partial_state,
|
||||
prev_group=self.state_group_before_event,
|
||||
delta_ids=self.state_delta_due_to_event,
|
||||
)
|
||||
|
||||
def _build_state_group_deltas(self) -> Dict[Tuple[int, int], StateMap]:
|
||||
"""
|
||||
Collect deltas between the state groups associated with this context
|
||||
"""
|
||||
state_group_deltas = {}
|
||||
|
||||
# if we know the state group before the event and after the event, add them and the
|
||||
# state delta between them to state_group_deltas
|
||||
if self.state_group_before_event and self.state_group_after_event:
|
||||
# if we have the state groups we should have the delta
|
||||
assert self.state_delta_due_to_event is not None
|
||||
state_group_deltas[
|
||||
(
|
||||
self.state_group_before_event,
|
||||
self.state_group_after_event,
|
||||
)
|
||||
] = self.state_delta_due_to_event
|
||||
|
||||
# the state group before the event may also have a state group which precedes it, if
|
||||
# we have that and the state group before the event, add them and the state
|
||||
# delta between them to state_group_deltas
|
||||
if (
|
||||
self.prev_group_for_state_group_before_event
|
||||
and self.state_group_before_event
|
||||
):
|
||||
# if we have both state groups we should have the delta between them
|
||||
assert self.delta_ids_to_state_group_before_event is not None
|
||||
state_group_deltas[
|
||||
(
|
||||
self.prev_group_for_state_group_before_event,
|
||||
self.state_group_before_event,
|
||||
)
|
||||
] = self.delta_ids_to_state_group_before_event
|
||||
|
||||
return state_group_deltas
|
||||
|
||||
|
||||
def _encode_state_group_delta(
|
||||
state_group_delta: Dict[Tuple[int, int], StateMap[str]]
|
||||
) -> List[Tuple[int, int, Optional[List[Tuple[str, str, str]]]]]:
|
||||
if not state_group_delta:
|
||||
return []
|
||||
|
||||
state_group_delta_encoded = []
|
||||
for key, value in state_group_delta.items():
|
||||
state_group_delta_encoded.append((key[0], key[1], _encode_state_dict(value)))
|
||||
|
||||
return state_group_delta_encoded
|
||||
|
||||
|
||||
def _decode_state_group_delta(
|
||||
input: List[Tuple[int, int, List[Tuple[str, str, str]]]]
|
||||
) -> Dict[Tuple[int, int], StateMap[str]]:
|
||||
if not input:
|
||||
return {}
|
||||
|
||||
state_group_deltas = {}
|
||||
for state_group_1, state_group_2, state_dict in input:
|
||||
state_map = _decode_state_dict(state_dict)
|
||||
assert state_map is not None
|
||||
state_group_deltas[(state_group_1, state_group_2)] = state_map
|
||||
|
||||
return state_group_deltas
|
||||
|
||||
|
||||
def _encode_state_dict(
|
||||
state_dict: Optional[StateMap[str]],
|
||||
|
||||
@@ -134,8 +134,13 @@ class EventValidator:
|
||||
)
|
||||
|
||||
# If the event contains a mentions key, validate it.
|
||||
if EventContentFields.MENTIONS in event.content:
|
||||
validate_json_object(event.content[EventContentFields.MENTIONS], Mentions)
|
||||
if (
|
||||
EventContentFields.MSC3952_MENTIONS in event.content
|
||||
and config.experimental.msc3952_intentional_mentions
|
||||
):
|
||||
validate_json_object(
|
||||
event.content[EventContentFields.MSC3952_MENTIONS], Mentions
|
||||
)
|
||||
|
||||
def _validate_retention(self, event: EventBase) -> None:
|
||||
"""Checks that an event that defines the retention policy for a room respects the
|
||||
|
||||
@@ -260,9 +260,7 @@ class FederationClient(FederationBase):
|
||||
use_unstable = False
|
||||
for user_id, one_time_keys in query.items():
|
||||
for device_id, algorithms in one_time_keys.items():
|
||||
# If more than one algorithm is requested, attempt to use the unstable
|
||||
# endpoint.
|
||||
if sum(algorithms.values()) > 1:
|
||||
if any(count > 1 for count in algorithms.values()):
|
||||
use_unstable = True
|
||||
if algorithms:
|
||||
# For the stable query, choose only the first algorithm.
|
||||
@@ -298,7 +296,6 @@ class FederationClient(FederationBase):
|
||||
else:
|
||||
logger.debug("Skipping unstable claim client keys API")
|
||||
|
||||
# TODO Potentially attempt multiple queries and combine the results?
|
||||
return await self.transport_layer.claim_client_keys(
|
||||
user, destination, content, timeout
|
||||
)
|
||||
|
||||
@@ -944,7 +944,7 @@ class FederationServer(FederationBase):
|
||||
if not self._is_mine_server_name(authorising_server):
|
||||
raise SynapseError(
|
||||
400,
|
||||
f"Cannot authorise membership event for {authorising_server}. We can only authorise requests from our own homeserver",
|
||||
f"Cannot authorise request from resident server: {authorising_server}",
|
||||
)
|
||||
|
||||
event.signatures.update(
|
||||
@@ -1016,9 +1016,7 @@ class FederationServer(FederationBase):
|
||||
for user_id, device_keys in result.items():
|
||||
for device_id, keys in device_keys.items():
|
||||
for key_id, key in keys.items():
|
||||
json_result.setdefault(user_id, {}).setdefault(device_id, {})[
|
||||
key_id
|
||||
] = key
|
||||
json_result.setdefault(user_id, {})[device_id] = {key_id: key}
|
||||
|
||||
logger.info(
|
||||
"Claimed one-time-keys: %s",
|
||||
|
||||
@@ -200,7 +200,6 @@ class FederationHandler:
|
||||
)
|
||||
|
||||
@trace
|
||||
@tag_args
|
||||
async def maybe_backfill(
|
||||
self, room_id: str, current_depth: int, limit: int
|
||||
) -> bool:
|
||||
@@ -215,9 +214,6 @@ class FederationHandler:
|
||||
limit: The number of events that the pagination request will
|
||||
return. This is used as part of the heuristic to decide if we
|
||||
should back paginate.
|
||||
|
||||
Returns:
|
||||
True if we actually tried to backfill something, otherwise False.
|
||||
"""
|
||||
# Starting the processing time here so we can include the room backfill
|
||||
# linearizer lock queue in the timing
|
||||
@@ -231,8 +227,6 @@ class FederationHandler:
|
||||
processing_start_time=processing_start_time,
|
||||
)
|
||||
|
||||
@trace
|
||||
@tag_args
|
||||
async def _maybe_backfill_inner(
|
||||
self,
|
||||
room_id: str,
|
||||
@@ -253,9 +247,6 @@ class FederationHandler:
|
||||
limit: The max number of events to request from the remote federated server.
|
||||
processing_start_time: The time when `maybe_backfill` started processing.
|
||||
Only used for timing. If `None`, no timing observation will be made.
|
||||
|
||||
Returns:
|
||||
True if we actually tried to backfill something, otherwise False.
|
||||
"""
|
||||
backwards_extremities = [
|
||||
_BackfillPoint(event_id, depth, _BackfillPointType.BACKWARDS_EXTREMITY)
|
||||
@@ -311,30 +302,15 @@ class FederationHandler:
|
||||
len(sorted_backfill_points),
|
||||
sorted_backfill_points,
|
||||
)
|
||||
set_tag(
|
||||
SynapseTags.RESULT_PREFIX + "sorted_backfill_points",
|
||||
str(sorted_backfill_points),
|
||||
)
|
||||
set_tag(
|
||||
SynapseTags.RESULT_PREFIX + "sorted_backfill_points.length",
|
||||
str(len(sorted_backfill_points)),
|
||||
)
|
||||
|
||||
# If we have no backfill points lower than the `current_depth` then either we
|
||||
# can a) bail or b) still attempt to backfill. We opt to try backfilling anyway
|
||||
# just in case we do get relevant events. This is good for eventual consistency
|
||||
# sake but we don't need to block the client for something that is just as
|
||||
# likely not to return anything relevant so we backfill in the background. The
|
||||
# only way, this could return something relevant is if we discover a new branch
|
||||
# of history that extends all the way back to where we are currently paginating
|
||||
# and it's within the 100 events that are returned from `/backfill`.
|
||||
# If we have no backfill points lower than the `current_depth` then
|
||||
# either we can a) bail or b) still attempt to backfill. We opt to try
|
||||
# backfilling anyway just in case we do get relevant events.
|
||||
if not sorted_backfill_points and current_depth != MAX_DEPTH:
|
||||
logger.debug(
|
||||
"_maybe_backfill_inner: all backfill points are *after* current depth. Trying again with later backfill points."
|
||||
)
|
||||
run_as_background_process(
|
||||
"_maybe_backfill_inner_anyway_with_max_depth",
|
||||
self._maybe_backfill_inner,
|
||||
return await self._maybe_backfill_inner(
|
||||
room_id=room_id,
|
||||
# We use `MAX_DEPTH` so that we find all backfill points next
|
||||
# time (all events are below the `MAX_DEPTH`)
|
||||
@@ -345,9 +321,6 @@ class FederationHandler:
|
||||
# overall otherwise the smaller one will throw off the results.
|
||||
processing_start_time=None,
|
||||
)
|
||||
# We return `False` because we're backfilling in the background and there is
|
||||
# no new events immediately for the caller to know about yet.
|
||||
return False
|
||||
|
||||
# Even after recursing with `MAX_DEPTH`, we didn't find any
|
||||
# backward extremities to backfill from.
|
||||
|
||||
@@ -40,11 +40,6 @@ if TYPE_CHECKING:
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# How many single event gaps we tolerate returning in a `/messages` response before we
|
||||
# backfill and try to fill in the history. This is an arbitrarily picked number so feel
|
||||
# free to tune it in the future.
|
||||
BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD = 3
|
||||
|
||||
|
||||
@attr.s(slots=True, auto_attribs=True)
|
||||
class PurgeStatus:
|
||||
@@ -491,35 +486,35 @@ class PaginationHandler:
|
||||
room_id, room_token.stream
|
||||
)
|
||||
|
||||
# If they have left the room then clamp the token to be before
|
||||
# they left the room, to save the effort of loading from the
|
||||
# database.
|
||||
if (
|
||||
pagin_config.direction == Direction.BACKWARDS
|
||||
and not use_admin_priviledge
|
||||
and membership == Membership.LEAVE
|
||||
):
|
||||
# This is only None if the room is world_readable, in which case
|
||||
# "Membership.JOIN" would have been returned and we should never hit
|
||||
# this branch.
|
||||
assert member_event_id
|
||||
if not use_admin_priviledge and membership == Membership.LEAVE:
|
||||
# If they have left the room then clamp the token to be before
|
||||
# they left the room, to save the effort of loading from the
|
||||
# database.
|
||||
|
||||
leave_token = await self.store.get_topological_token_for_event(
|
||||
member_event_id
|
||||
)
|
||||
assert leave_token.topological is not None
|
||||
# This is only None if the room is world_readable, in which
|
||||
# case "JOIN" would have been returned.
|
||||
assert member_event_id
|
||||
|
||||
if leave_token.topological < curr_topo:
|
||||
from_token = from_token.copy_and_replace(
|
||||
StreamKeyType.ROOM, leave_token
|
||||
leave_token = await self.store.get_topological_token_for_event(
|
||||
member_event_id
|
||||
)
|
||||
assert leave_token.topological is not None
|
||||
|
||||
if leave_token.topological < curr_topo:
|
||||
from_token = from_token.copy_and_replace(
|
||||
StreamKeyType.ROOM, leave_token
|
||||
)
|
||||
|
||||
await self.hs.get_federation_handler().maybe_backfill(
|
||||
room_id,
|
||||
curr_topo,
|
||||
limit=pagin_config.limit,
|
||||
)
|
||||
|
||||
to_room_key = None
|
||||
if pagin_config.to_token:
|
||||
to_room_key = pagin_config.to_token.room_key
|
||||
|
||||
# Initially fetch the events from the database. With any luck, we can return
|
||||
# these without blocking on backfill (handled below).
|
||||
events, next_key = await self.store.paginate_room_events(
|
||||
room_id=room_id,
|
||||
from_key=from_token.room_key,
|
||||
@@ -529,94 +524,6 @@ class PaginationHandler:
|
||||
event_filter=event_filter,
|
||||
)
|
||||
|
||||
if pagin_config.direction == Direction.BACKWARDS:
|
||||
# We use a `Set` because there can be multiple events at a given depth
|
||||
# and we only care about looking at the unique continum of depths to
|
||||
# find gaps.
|
||||
event_depths: Set[int] = {event.depth for event in events}
|
||||
sorted_event_depths = sorted(event_depths)
|
||||
|
||||
# Inspect the depths of the returned events to see if there are any gaps
|
||||
found_big_gap = False
|
||||
number_of_gaps = 0
|
||||
previous_event_depth = (
|
||||
sorted_event_depths[0] if len(sorted_event_depths) > 0 else 0
|
||||
)
|
||||
for event_depth in sorted_event_depths:
|
||||
# We don't expect a negative depth but we'll just deal with it in
|
||||
# any case by taking the absolute value to get the true gap between
|
||||
# any two integers.
|
||||
depth_gap = abs(event_depth - previous_event_depth)
|
||||
# A `depth_gap` of 1 is a normal continuous chain to the next event
|
||||
# (1 <-- 2 <-- 3) so anything larger indicates a missing event (it's
|
||||
# also possible there is no event at a given depth but we can't ever
|
||||
# know that for sure)
|
||||
if depth_gap > 1:
|
||||
number_of_gaps += 1
|
||||
|
||||
# We only tolerate a small number single-event long gaps in the
|
||||
# returned events because those are most likely just events we've
|
||||
# failed to pull in the past. Anything longer than that is probably
|
||||
# a sign that we're missing a decent chunk of history and we should
|
||||
# try to backfill it.
|
||||
#
|
||||
# XXX: It's possible we could tolerate longer gaps if we checked
|
||||
# that a given events `prev_events` is one that has failed pull
|
||||
# attempts and we could just treat it like a dead branch of history
|
||||
# for now or at least something that we don't need the block the
|
||||
# client on to try pulling.
|
||||
#
|
||||
# XXX: If we had something like MSC3871 to indicate gaps in the
|
||||
# timeline to the client, we could also get away with any sized gap
|
||||
# and just have the client refetch the holes as they see fit.
|
||||
if depth_gap > 2:
|
||||
found_big_gap = True
|
||||
break
|
||||
previous_event_depth = event_depth
|
||||
|
||||
# Backfill in the foreground if we found a big gap, have too many holes,
|
||||
# or we don't have enough events to fill the limit that the client asked
|
||||
# for.
|
||||
missing_too_many_events = (
|
||||
number_of_gaps > BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD
|
||||
)
|
||||
not_enough_events_to_fill_response = len(events) < pagin_config.limit
|
||||
if (
|
||||
found_big_gap
|
||||
or missing_too_many_events
|
||||
or not_enough_events_to_fill_response
|
||||
):
|
||||
did_backfill = (
|
||||
await self.hs.get_federation_handler().maybe_backfill(
|
||||
room_id,
|
||||
curr_topo,
|
||||
limit=pagin_config.limit,
|
||||
)
|
||||
)
|
||||
|
||||
# If we did backfill something, refetch the events from the database to
|
||||
# catch anything new that might have been added since we last fetched.
|
||||
if did_backfill:
|
||||
events, next_key = await self.store.paginate_room_events(
|
||||
room_id=room_id,
|
||||
from_key=from_token.room_key,
|
||||
to_key=to_room_key,
|
||||
direction=pagin_config.direction,
|
||||
limit=pagin_config.limit,
|
||||
event_filter=event_filter,
|
||||
)
|
||||
else:
|
||||
# Otherwise, we can backfill in the background for eventual
|
||||
# consistency's sake but we don't need to block the client waiting
|
||||
# for a costly federation call and processing.
|
||||
run_as_background_process(
|
||||
"maybe_backfill_in_the_background",
|
||||
self.hs.get_federation_handler().maybe_backfill,
|
||||
room_id,
|
||||
curr_topo,
|
||||
limit=pagin_config.limit,
|
||||
)
|
||||
|
||||
next_token = from_token.copy_and_replace(StreamKeyType.ROOM, next_key)
|
||||
|
||||
# if no events are returned from pagination, that implies
|
||||
|
||||
@@ -648,6 +648,7 @@ class PresenceHandler(BasePresenceHandler):
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__(hs)
|
||||
self.hs = hs
|
||||
self.server_name = hs.hostname
|
||||
self.wheel_timer: WheelTimer[str] = WheelTimer()
|
||||
self.notifier = hs.get_notifier()
|
||||
self._presence_enabled = hs.config.server.use_presence
|
||||
|
||||
@@ -27,6 +27,7 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
class ReadMarkerHandler:
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.server_name = hs.config.server.server_name
|
||||
self.store = hs.get_datastores().main
|
||||
self.account_data_handler = hs.get_account_data_handler()
|
||||
self.read_marker_linearizer = Linearizer(name="read_marker")
|
||||
|
||||
@@ -872,8 +872,6 @@ class RoomCreationHandler:
|
||||
visibility = config.get("visibility", "private")
|
||||
is_public = visibility == "public"
|
||||
|
||||
self._validate_room_config(config, visibility)
|
||||
|
||||
room_id = await self._generate_and_create_room_id(
|
||||
creator_id=user_id,
|
||||
is_public=is_public,
|
||||
@@ -1113,7 +1111,20 @@ class RoomCreationHandler:
|
||||
|
||||
return new_event, new_unpersisted_context
|
||||
|
||||
preset_config, config = self._room_preset_config(room_config)
|
||||
visibility = room_config.get("visibility", "private")
|
||||
preset_config = room_config.get(
|
||||
"preset",
|
||||
RoomCreationPreset.PRIVATE_CHAT
|
||||
if visibility == "private"
|
||||
else RoomCreationPreset.PUBLIC_CHAT,
|
||||
)
|
||||
|
||||
try:
|
||||
config = self._presets_dict[preset_config]
|
||||
except KeyError:
|
||||
raise SynapseError(
|
||||
400, f"'{preset_config}' is not a valid preset", errcode=Codes.BAD_JSON
|
||||
)
|
||||
|
||||
# MSC2175 removes the creator field from the create event.
|
||||
if not room_version.msc2175_implicit_room_creator:
|
||||
@@ -1295,65 +1306,6 @@ class RoomCreationHandler:
|
||||
assert last_event.internal_metadata.stream_ordering is not None
|
||||
return last_event.internal_metadata.stream_ordering, last_event.event_id, depth
|
||||
|
||||
def _validate_room_config(
|
||||
self,
|
||||
config: JsonDict,
|
||||
visibility: str,
|
||||
) -> None:
|
||||
"""Checks configuration parameters for a /createRoom request.
|
||||
|
||||
If validation detects invalid parameters an exception may be raised to
|
||||
cause room creation to be aborted and an error response to be returned
|
||||
to the client.
|
||||
|
||||
Args:
|
||||
config: A dict of configuration options. Originally from the body of
|
||||
the /createRoom request
|
||||
visibility: One of "public" or "private"
|
||||
"""
|
||||
|
||||
# Validate the requested preset, raise a 400 error if not valid
|
||||
preset_name, preset_config = self._room_preset_config(config)
|
||||
|
||||
# If the user is trying to create an encrypted room and this is forbidden
|
||||
# by the configured default_power_level_content_override, then reject the
|
||||
# request before the room is created.
|
||||
raw_initial_state = config.get("initial_state", [])
|
||||
room_encryption_event = any(
|
||||
s.get("type", "") == EventTypes.RoomEncryption for s in raw_initial_state
|
||||
)
|
||||
|
||||
if preset_config["encrypted"] or room_encryption_event:
|
||||
if self._default_power_level_content_override:
|
||||
override = self._default_power_level_content_override.get(preset_name)
|
||||
if override is not None:
|
||||
event_levels = override.get("events", {})
|
||||
room_admin_level = event_levels.get(EventTypes.PowerLevels, 100)
|
||||
encryption_level = event_levels.get(EventTypes.RoomEncryption, 100)
|
||||
if encryption_level > room_admin_level:
|
||||
raise SynapseError(
|
||||
403,
|
||||
f"You cannot create an encrypted room. user_level ({room_admin_level}) < send_level ({encryption_level})",
|
||||
)
|
||||
|
||||
def _room_preset_config(self, room_config: JsonDict) -> Tuple[str, dict]:
|
||||
# The spec says rooms should default to private visibility if
|
||||
# `visibility` is not specified.
|
||||
visibility = room_config.get("visibility", "private")
|
||||
preset_name = room_config.get(
|
||||
"preset",
|
||||
RoomCreationPreset.PRIVATE_CHAT
|
||||
if visibility == "private"
|
||||
else RoomCreationPreset.PUBLIC_CHAT,
|
||||
)
|
||||
try:
|
||||
preset_config = self._presets_dict[preset_name]
|
||||
except KeyError:
|
||||
raise SynapseError(
|
||||
400, f"'{preset_name}' is not a valid preset", errcode=Codes.BAD_JSON
|
||||
)
|
||||
return preset_name, preset_config
|
||||
|
||||
def _generate_room_id(self) -> str:
|
||||
"""Generates a random room ID.
|
||||
|
||||
@@ -1538,6 +1490,7 @@ class RoomContextHandler:
|
||||
|
||||
class TimestampLookupHandler:
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.server_name = hs.hostname
|
||||
self.store = hs.get_datastores().main
|
||||
self.state_handler = hs.get_state_handler()
|
||||
self.federation_client = hs.get_federation_client()
|
||||
|
||||
@@ -42,6 +42,7 @@ class StatsHandler:
|
||||
self.store = hs.get_datastores().main
|
||||
self._storage_controllers = hs.get_storage_controllers()
|
||||
self.state = hs.get_state_handler()
|
||||
self.server_name = hs.hostname
|
||||
self.clock = hs.get_clock()
|
||||
self.notifier = hs.get_notifier()
|
||||
self.is_mine_id = hs.is_mine_id
|
||||
|
||||
@@ -95,6 +95,8 @@ incoming_responses_counter = Counter(
|
||||
)
|
||||
|
||||
|
||||
MAX_LONG_RETRIES = 10
|
||||
MAX_SHORT_RETRIES = 3
|
||||
MAXINT = sys.maxsize
|
||||
|
||||
|
||||
@@ -404,12 +406,7 @@ class MatrixFederationHttpClient:
|
||||
self.clock = hs.get_clock()
|
||||
self._store = hs.get_datastores().main
|
||||
self.version_string_bytes = hs.version_string.encode("ascii")
|
||||
self.default_timeout = hs.config.federation.client_timeout / 1000
|
||||
|
||||
self.max_long_retry_delay = hs.config.federation.max_long_retry_delay / 1000
|
||||
self.max_short_retry_delay = hs.config.federation.max_short_retry_delay / 1000
|
||||
self.max_long_retries = hs.config.federation.max_long_retries
|
||||
self.max_short_retries = hs.config.federation.max_short_retries
|
||||
self.default_timeout = 60
|
||||
|
||||
self._cooperator = Cooperator(scheduler=_make_scheduler(self.reactor))
|
||||
|
||||
@@ -502,15 +499,8 @@ class MatrixFederationHttpClient:
|
||||
Note that the above intervals are *in addition* to the time spent
|
||||
waiting for the request to complete (up to `timeout` ms).
|
||||
|
||||
NB: the long retry algorithm takes over 20 minutes to complete, with a
|
||||
default timeout of 60s! It's best not to use the `long_retries` option
|
||||
for something that is blocking a client so we don't make them wait for
|
||||
aaaaages, whereas some things like sending transactions (server to
|
||||
server) we can be a lot more lenient but its very fuzzy / hand-wavey.
|
||||
|
||||
In the future, we could be more intelligent about doing this sort of
|
||||
thing by looking at things with the bigger picture in mind,
|
||||
https://github.com/matrix-org/synapse/issues/8917
|
||||
NB: the long retry algorithm takes over 20 minutes to complete, with
|
||||
a default timeout of 60s!
|
||||
|
||||
ignore_backoff: true to ignore the historical backoff data
|
||||
and try the request anyway.
|
||||
@@ -538,10 +528,10 @@ class MatrixFederationHttpClient:
|
||||
logger.exception(f"Invalid destination: {request.destination}.")
|
||||
raise FederationDeniedError(request.destination)
|
||||
|
||||
if timeout is None:
|
||||
timeout = int(self.default_timeout)
|
||||
|
||||
_sec_timeout = timeout / 1000
|
||||
if timeout:
|
||||
_sec_timeout = timeout / 1000
|
||||
else:
|
||||
_sec_timeout = self.default_timeout
|
||||
|
||||
if (
|
||||
self.hs.config.federation.federation_domain_whitelist is not None
|
||||
@@ -586,9 +576,9 @@ class MatrixFederationHttpClient:
|
||||
# XXX: Would be much nicer to retry only at the transaction-layer
|
||||
# (once we have reliable transactions in place)
|
||||
if long_retries:
|
||||
retries_left = self.max_long_retries
|
||||
retries_left = MAX_LONG_RETRIES
|
||||
else:
|
||||
retries_left = self.max_short_retries
|
||||
retries_left = MAX_SHORT_RETRIES
|
||||
|
||||
url_bytes = request.uri
|
||||
url_str = url_bytes.decode("ascii")
|
||||
@@ -733,12 +723,12 @@ class MatrixFederationHttpClient:
|
||||
|
||||
if retries_left and not timeout:
|
||||
if long_retries:
|
||||
delay = 4 ** (self.max_long_retries + 1 - retries_left)
|
||||
delay = min(delay, self.max_long_retry_delay)
|
||||
delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
|
||||
delay = min(delay, 60)
|
||||
delay *= random.uniform(0.8, 1.4)
|
||||
else:
|
||||
delay = 0.5 * 2 ** (self.max_short_retries - retries_left)
|
||||
delay = min(delay, self.max_short_retry_delay)
|
||||
delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
|
||||
delay = min(delay, 2)
|
||||
delay *= random.uniform(0.8, 1.4)
|
||||
|
||||
logger.debug(
|
||||
@@ -946,9 +936,10 @@ class MatrixFederationHttpClient:
|
||||
timeout=timeout,
|
||||
)
|
||||
|
||||
if timeout is None:
|
||||
timeout = int(self.default_timeout)
|
||||
_sec_timeout = timeout / 1000
|
||||
if timeout is not None:
|
||||
_sec_timeout = timeout / 1000
|
||||
else:
|
||||
_sec_timeout = self.default_timeout
|
||||
|
||||
if parser is None:
|
||||
parser = cast(ByteParser[T], JsonParser())
|
||||
@@ -1134,9 +1125,10 @@ class MatrixFederationHttpClient:
|
||||
timeout=timeout,
|
||||
)
|
||||
|
||||
if timeout is None:
|
||||
timeout = int(self.default_timeout)
|
||||
_sec_timeout = timeout / 1000
|
||||
if timeout is not None:
|
||||
_sec_timeout = timeout / 1000
|
||||
else:
|
||||
_sec_timeout = self.default_timeout
|
||||
|
||||
if parser is None:
|
||||
parser = cast(ByteParser[T], JsonParser())
|
||||
@@ -1209,9 +1201,10 @@ class MatrixFederationHttpClient:
|
||||
ignore_backoff=ignore_backoff,
|
||||
)
|
||||
|
||||
if timeout is None:
|
||||
timeout = int(self.default_timeout)
|
||||
_sec_timeout = timeout / 1000
|
||||
if timeout is not None:
|
||||
_sec_timeout = timeout / 1000
|
||||
else:
|
||||
_sec_timeout = self.default_timeout
|
||||
|
||||
body = await _handle_response(
|
||||
self.reactor, _sec_timeout, request, response, start_ms, parser=JsonParser()
|
||||
|
||||
@@ -76,7 +76,7 @@ class ReplicationEndpointFactory:
|
||||
endpoint = wrapClientTLS(
|
||||
# The 'port' argument below isn't actually used by the function
|
||||
self.context_factory.creatorForNetloc(
|
||||
self.instance_map[worker_name].host.encode("utf-8"),
|
||||
self.instance_map[worker_name].host,
|
||||
self.instance_map[worker_name].port,
|
||||
),
|
||||
endpoint,
|
||||
|
||||
@@ -171,7 +171,6 @@ from functools import wraps
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Any,
|
||||
Awaitable,
|
||||
Callable,
|
||||
Collection,
|
||||
ContextManager,
|
||||
@@ -904,7 +903,6 @@ def _custom_sync_async_decorator(
|
||||
"""
|
||||
|
||||
if inspect.iscoroutinefunction(func):
|
||||
# For this branch, we handle async functions like `async def func() -> RInner`.
|
||||
# In this branch, R = Awaitable[RInner], for some other type RInner
|
||||
@wraps(func)
|
||||
async def _wrapper(
|
||||
@@ -916,16 +914,15 @@ def _custom_sync_async_decorator(
|
||||
return await func(*args, **kwargs) # type: ignore[misc]
|
||||
|
||||
else:
|
||||
# The other case here handles sync functions including those decorated with
|
||||
# `@defer.inlineCallbacks` or that return a `Deferred` or other `Awaitable`.
|
||||
# The other case here handles both sync functions and those
|
||||
# decorated with inlineDeferred.
|
||||
@wraps(func)
|
||||
def _wrapper(*args: P.args, **kwargs: P.kwargs) -> Any:
|
||||
def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
|
||||
scope = wrapping_logic(func, *args, **kwargs)
|
||||
scope.__enter__()
|
||||
|
||||
try:
|
||||
result = func(*args, **kwargs)
|
||||
|
||||
if isinstance(result, defer.Deferred):
|
||||
|
||||
def call_back(result: R) -> R:
|
||||
@@ -933,32 +930,20 @@ def _custom_sync_async_decorator(
|
||||
return result
|
||||
|
||||
def err_back(result: R) -> R:
|
||||
# TODO: Pass the error details into `scope.__exit__(...)` for
|
||||
# consistency with the other paths.
|
||||
scope.__exit__(None, None, None)
|
||||
return result
|
||||
|
||||
result.addCallbacks(call_back, err_back)
|
||||
|
||||
elif inspect.isawaitable(result):
|
||||
|
||||
async def wrap_awaitable() -> Any:
|
||||
try:
|
||||
assert isinstance(result, Awaitable)
|
||||
awaited_result = await result
|
||||
scope.__exit__(None, None, None)
|
||||
return awaited_result
|
||||
except Exception as e:
|
||||
scope.__exit__(type(e), None, e.__traceback__)
|
||||
raise
|
||||
|
||||
# The original method returned an awaitable, eg. a coroutine, so we
|
||||
# create another awaitable wrapping it that calls
|
||||
# `scope.__exit__(...)`.
|
||||
return wrap_awaitable()
|
||||
else:
|
||||
# Just a simple sync function so we can just exit the scope and
|
||||
# return the result without any fuss.
|
||||
if inspect.isawaitable(result):
|
||||
logger.error(
|
||||
"@trace may not have wrapped %s correctly! "
|
||||
"The function is not async but returned a %s.",
|
||||
func.__qualname__,
|
||||
type(result).__name__,
|
||||
)
|
||||
|
||||
scope.__exit__(None, None, None)
|
||||
|
||||
return result
|
||||
|
||||
@@ -77,8 +77,6 @@ RegistryProxy = cast(CollectorRegistry, _RegistryProxy)
|
||||
|
||||
@attr.s(slots=True, hash=True, auto_attribs=True)
|
||||
class LaterGauge(Collector):
|
||||
"""A Gauge which periodically calls a user-provided callback to produce metrics."""
|
||||
|
||||
name: str
|
||||
desc: str
|
||||
labels: Optional[Sequence[str]] = attr.ib(hash=False)
|
||||
|
||||
@@ -120,6 +120,9 @@ class BulkPushRuleEvaluator:
|
||||
self.should_calculate_push_rules = self.hs.config.push.enable_push
|
||||
|
||||
self._related_event_match_enabled = self.hs.config.experimental.msc3664_enabled
|
||||
self._intentional_mentions_enabled = (
|
||||
self.hs.config.experimental.msc3952_intentional_mentions
|
||||
)
|
||||
|
||||
self.room_push_rule_cache_metrics = register_cache(
|
||||
"cache",
|
||||
@@ -387,7 +390,10 @@ class BulkPushRuleEvaluator:
|
||||
del notification_levels[key]
|
||||
|
||||
# Pull out any user and room mentions.
|
||||
has_mentions = EventContentFields.MENTIONS in event.content
|
||||
has_mentions = (
|
||||
self._intentional_mentions_enabled
|
||||
and EventContentFields.MSC3952_MENTIONS in event.content
|
||||
)
|
||||
|
||||
evaluator = PushRuleEvaluator(
|
||||
_flatten_dict(event),
|
||||
|
||||
@@ -124,6 +124,8 @@ class VersionsRestServlet(RestServlet):
|
||||
is not None,
|
||||
# Adds support for relation-based redactions as per MSC3912.
|
||||
"org.matrix.msc3912": self.config.experimental.msc3912_enabled,
|
||||
# Adds support for unstable "intentional mentions" behaviour.
|
||||
"org.matrix.msc3952_intentional_mentions": self.config.experimental.msc3952_intentional_mentions,
|
||||
# Whether recursively provide relations is supported.
|
||||
"org.matrix.msc3981": self.config.experimental.msc3981_recurse_relations,
|
||||
# Adds support for deleting account data.
|
||||
|
||||
@@ -39,6 +39,7 @@ class UploadResource(DirectServeJsonResource):
|
||||
self.filepaths = media_repo.filepaths
|
||||
self.store = hs.get_datastores().main
|
||||
self.clock = hs.get_clock()
|
||||
self.server_name = hs.hostname
|
||||
self.auth = hs.get_auth()
|
||||
self.max_upload_size = hs.config.media.max_upload_size
|
||||
self.clock = hs.get_clock()
|
||||
|
||||
@@ -86,14 +86,9 @@ class SQLBaseStore(metaclass=ABCMeta):
|
||||
room_id: Room where state changed
|
||||
members_changed: The user_ids of members that have changed
|
||||
"""
|
||||
|
||||
# XXX: If you add something to this function make sure you add it to
|
||||
# `_invalidate_state_caches_all` as well.
|
||||
|
||||
# If there were any membership changes, purge the appropriate caches.
|
||||
for host in {get_domain_from_id(u) for u in members_changed}:
|
||||
self._attempt_to_invalidate_cache("is_host_joined", (room_id, host))
|
||||
self._attempt_to_invalidate_cache("is_host_invited", (room_id, host))
|
||||
if members_changed:
|
||||
self._attempt_to_invalidate_cache("get_users_in_room", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_current_hosts_in_room", (room_id,))
|
||||
@@ -122,32 +117,6 @@ class SQLBaseStore(metaclass=ABCMeta):
|
||||
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_partial_current_state_ids", (room_id,))
|
||||
|
||||
def _invalidate_state_caches_all(self, room_id: str) -> None:
|
||||
"""Invalidates caches that are based on the current state, but does
|
||||
not stream invalidations down replication.
|
||||
|
||||
Same as `_invalidate_state_caches`, except that works when we don't know
|
||||
which memberships have changed.
|
||||
|
||||
Args:
|
||||
room_id: Room where state changed
|
||||
"""
|
||||
self._attempt_to_invalidate_cache("get_partial_current_state_ids", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_users_in_room", (room_id,))
|
||||
self._attempt_to_invalidate_cache("is_host_invited", None)
|
||||
self._attempt_to_invalidate_cache("is_host_joined", None)
|
||||
self._attempt_to_invalidate_cache("get_current_hosts_in_room", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_users_in_room_with_profiles", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_number_joined_users_in_room", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,))
|
||||
self._attempt_to_invalidate_cache("does_pair_of_users_share_a_room", None)
|
||||
self._attempt_to_invalidate_cache("get_user_in_room_with_profile", None)
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_rooms_for_user_with_stream_ordering", None
|
||||
)
|
||||
self._attempt_to_invalidate_cache("get_rooms_for_user", None)
|
||||
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
|
||||
|
||||
def _attempt_to_invalidate_cache(
|
||||
self, cache_name: str, key: Optional[Collection[Any]]
|
||||
) -> bool:
|
||||
|
||||
@@ -12,7 +12,6 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
from enum import IntEnum
|
||||
from types import TracebackType
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
@@ -137,15 +136,6 @@ class BackgroundUpdatePerformance:
|
||||
return float(self.total_item_count) / float(self.total_duration_ms)
|
||||
|
||||
|
||||
class UpdaterStatus(IntEnum):
|
||||
# Use negative values for error conditions.
|
||||
ABORTED = -1
|
||||
DISABLED = 0
|
||||
NOT_STARTED = 1
|
||||
RUNNING_UPDATE = 2
|
||||
COMPLETE = 3
|
||||
|
||||
|
||||
class BackgroundUpdater:
|
||||
"""Background updates are updates to the database that run in the
|
||||
background. Each update processes a batch of data at once. We attempt to
|
||||
@@ -168,16 +158,11 @@ class BackgroundUpdater:
|
||||
|
||||
self._background_update_performance: Dict[str, BackgroundUpdatePerformance] = {}
|
||||
self._background_update_handlers: Dict[str, _BackgroundUpdateHandler] = {}
|
||||
# TODO: all these bool flags make me feel icky---can we combine into a status
|
||||
# enum?
|
||||
self._all_done = False
|
||||
|
||||
# Whether we're currently running updates
|
||||
self._running = False
|
||||
|
||||
# Marker to be set if we abort and halt all background updates.
|
||||
self._aborted = False
|
||||
|
||||
# Whether background updates are enabled. This allows us to
|
||||
# enable/disable background updates via the admin API.
|
||||
self.enabled = True
|
||||
@@ -190,20 +175,6 @@ class BackgroundUpdater:
|
||||
self.sleep_duration_ms = hs.config.background_updates.sleep_duration_ms
|
||||
self.sleep_enabled = hs.config.background_updates.sleep_enabled
|
||||
|
||||
def get_status(self) -> UpdaterStatus:
|
||||
"""An integer summarising the updater status. Used as a metric."""
|
||||
if self._aborted:
|
||||
return UpdaterStatus.ABORTED
|
||||
# TODO: a status for "have seen at least one failure, but haven't aborted yet".
|
||||
if not self.enabled:
|
||||
return UpdaterStatus.DISABLED
|
||||
|
||||
if self._all_done:
|
||||
return UpdaterStatus.COMPLETE
|
||||
if self._running:
|
||||
return UpdaterStatus.RUNNING_UPDATE
|
||||
return UpdaterStatus.NOT_STARTED
|
||||
|
||||
def register_update_controller_callbacks(
|
||||
self,
|
||||
on_update: ON_UPDATE_CALLBACK,
|
||||
@@ -325,7 +296,6 @@ class BackgroundUpdater:
|
||||
except Exception:
|
||||
back_to_back_failures += 1
|
||||
if back_to_back_failures >= 5:
|
||||
self._aborted = True
|
||||
raise RuntimeError(
|
||||
"5 back-to-back background update failures; aborting."
|
||||
)
|
||||
|
||||
@@ -839,8 +839,9 @@ class EventsPersistenceStorageController:
|
||||
"group" % (ev.event_id,)
|
||||
)
|
||||
continue
|
||||
if ctx.state_group_deltas:
|
||||
state_group_deltas.update(ctx.state_group_deltas)
|
||||
|
||||
if ctx.prev_group:
|
||||
state_group_deltas[(ctx.prev_group, ctx.state_group)] = ctx.delta_ids
|
||||
|
||||
# We need to map the event_ids to their state groups. First, let's
|
||||
# check if the event is one we're persisting, in which case we can
|
||||
|
||||
@@ -54,7 +54,7 @@ from synapse.logging.context import (
|
||||
current_context,
|
||||
make_deferred_yieldable,
|
||||
)
|
||||
from synapse.metrics import LaterGauge, register_threadpool
|
||||
from synapse.metrics import register_threadpool
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.storage.background_updates import BackgroundUpdater
|
||||
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
|
||||
@@ -547,12 +547,6 @@ class DatabasePool:
|
||||
self._db_pool = make_pool(hs.get_reactor(), database_config, engine)
|
||||
|
||||
self.updates = BackgroundUpdater(hs, self)
|
||||
LaterGauge(
|
||||
"synapse_background_update_status",
|
||||
"Background update status",
|
||||
[],
|
||||
self.updates.get_status,
|
||||
)
|
||||
|
||||
self._previous_txn_total_time = 0.0
|
||||
self._current_txn_total_time = 0.0
|
||||
|
||||
@@ -46,12 +46,6 @@ logger = logging.getLogger(__name__)
|
||||
# based on the current state when notifying workers over replication.
|
||||
CURRENT_STATE_CACHE_NAME = "cs_cache_fake"
|
||||
|
||||
# As above, but for invalidating event caches on history deletion
|
||||
PURGE_HISTORY_CACHE_NAME = "ph_cache_fake"
|
||||
|
||||
# As above, but for invalidating room caches on room deletion
|
||||
DELETE_ROOM_CACHE_NAME = "dr_cache_fake"
|
||||
|
||||
|
||||
class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||
def __init__(
|
||||
@@ -181,23 +175,6 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||
room_id = row.keys[0]
|
||||
members_changed = set(row.keys[1:])
|
||||
self._invalidate_state_caches(room_id, members_changed)
|
||||
elif row.cache_func == PURGE_HISTORY_CACHE_NAME:
|
||||
if row.keys is None:
|
||||
raise Exception(
|
||||
"Can't send an 'invalidate all' for 'purge history' cache"
|
||||
)
|
||||
|
||||
room_id = row.keys[0]
|
||||
self._invalidate_caches_for_room_events(room_id)
|
||||
elif row.cache_func == DELETE_ROOM_CACHE_NAME:
|
||||
if row.keys is None:
|
||||
raise Exception(
|
||||
"Can't send an 'invalidate all' for 'delete room' cache"
|
||||
)
|
||||
|
||||
room_id = row.keys[0]
|
||||
self._invalidate_caches_for_room_events(room_id)
|
||||
self._invalidate_caches_for_room(room_id)
|
||||
else:
|
||||
self._attempt_to_invalidate_cache(row.cache_func, row.keys)
|
||||
|
||||
@@ -249,9 +226,6 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||
relates_to: Optional[str],
|
||||
backfilled: bool,
|
||||
) -> None:
|
||||
# XXX: If you add something to this function make sure you add it to
|
||||
# `_invalidate_caches_for_room_events` as well.
|
||||
|
||||
# This invalidates any local in-memory cached event objects, the original
|
||||
# process triggering the invalidation is responsible for clearing any external
|
||||
# cached objects.
|
||||
@@ -297,106 +271,6 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||
self._attempt_to_invalidate_cache("get_thread_participated", (relates_to,))
|
||||
self._attempt_to_invalidate_cache("get_threads", (room_id,))
|
||||
|
||||
def _invalidate_caches_for_room_events_and_stream(
|
||||
self, txn: LoggingTransaction, room_id: str
|
||||
) -> None:
|
||||
"""Invalidate caches associated with events in a room, and stream to
|
||||
replication.
|
||||
|
||||
Used when we delete events a room, but don't know which events we've
|
||||
deleted.
|
||||
"""
|
||||
|
||||
self._send_invalidation_to_replication(txn, PURGE_HISTORY_CACHE_NAME, [room_id])
|
||||
txn.call_after(self._invalidate_caches_for_room_events, room_id)
|
||||
|
||||
def _invalidate_caches_for_room_events(self, room_id: str) -> None:
|
||||
"""Invalidate caches associated with events in a room, and stream to
|
||||
replication.
|
||||
|
||||
Used when we delete events in a room, but don't know which events we've
|
||||
deleted.
|
||||
"""
|
||||
|
||||
self._invalidate_local_get_event_cache_all() # type: ignore[attr-defined]
|
||||
|
||||
self._attempt_to_invalidate_cache("have_seen_event", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_latest_event_ids_in_room", (room_id,))
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_unread_event_push_actions_by_room_for_user", (room_id,)
|
||||
)
|
||||
|
||||
self._attempt_to_invalidate_cache("_get_membership_from_event_id", None)
|
||||
self._attempt_to_invalidate_cache("get_relations_for_event", None)
|
||||
self._attempt_to_invalidate_cache("get_applicable_edit", None)
|
||||
self._attempt_to_invalidate_cache("get_thread_id", None)
|
||||
self._attempt_to_invalidate_cache("get_thread_id_for_receipts", None)
|
||||
self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None)
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_rooms_for_user_with_stream_ordering", None
|
||||
)
|
||||
self._attempt_to_invalidate_cache("get_rooms_for_user", None)
|
||||
self._attempt_to_invalidate_cache("get_references_for_event", None)
|
||||
self._attempt_to_invalidate_cache("get_thread_summary", None)
|
||||
self._attempt_to_invalidate_cache("get_thread_participated", None)
|
||||
self._attempt_to_invalidate_cache("get_threads", (room_id,))
|
||||
|
||||
self._attempt_to_invalidate_cache("_get_state_group_for_event", None)
|
||||
|
||||
self._attempt_to_invalidate_cache("get_event_ordering", None)
|
||||
self._attempt_to_invalidate_cache("is_partial_state_event", None)
|
||||
self._attempt_to_invalidate_cache("_get_joined_profile_from_event_id", None)
|
||||
|
||||
def _invalidate_caches_for_room_and_stream(
|
||||
self, txn: LoggingTransaction, room_id: str
|
||||
) -> None:
|
||||
"""Invalidate caches associated with rooms, and stream to replication.
|
||||
|
||||
Used when we delete rooms.
|
||||
"""
|
||||
|
||||
self._send_invalidation_to_replication(txn, DELETE_ROOM_CACHE_NAME, [room_id])
|
||||
txn.call_after(self._invalidate_caches_for_room, room_id)
|
||||
|
||||
def _invalidate_caches_for_room(self, room_id: str) -> None:
|
||||
"""Invalidate caches associated with rooms.
|
||||
|
||||
Used when we delete rooms.
|
||||
"""
|
||||
|
||||
# If we've deleted the room then we also need to purge all event caches.
|
||||
self._invalidate_caches_for_room_events(room_id)
|
||||
|
||||
self._attempt_to_invalidate_cache("get_account_data_for_room", None)
|
||||
self._attempt_to_invalidate_cache("get_account_data_for_room_and_type", None)
|
||||
self._attempt_to_invalidate_cache("get_aliases_for_room", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_latest_event_ids_in_room", (room_id,))
|
||||
self._attempt_to_invalidate_cache("_get_forward_extremeties_for_room", None)
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_unread_event_push_actions_by_room_for_user", (room_id,)
|
||||
)
|
||||
self._attempt_to_invalidate_cache(
|
||||
"_get_linearized_receipts_for_room", (room_id,)
|
||||
)
|
||||
self._attempt_to_invalidate_cache("is_room_blocked", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_retention_policy_for_room", (room_id,))
|
||||
self._attempt_to_invalidate_cache(
|
||||
"_get_partial_state_servers_at_join", (room_id,)
|
||||
)
|
||||
self._attempt_to_invalidate_cache("is_partial_state_room", (room_id,))
|
||||
self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None)
|
||||
self._attempt_to_invalidate_cache(
|
||||
"get_current_hosts_in_room_ordered", (room_id,)
|
||||
)
|
||||
self._attempt_to_invalidate_cache("did_forget", None)
|
||||
self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None)
|
||||
self._attempt_to_invalidate_cache("_get_membership_from_event_id", None)
|
||||
self._attempt_to_invalidate_cache("get_room_version_id", (room_id,))
|
||||
|
||||
# And delete state caches.
|
||||
|
||||
self._invalidate_state_caches_all(room_id)
|
||||
|
||||
async def invalidate_cache_and_stream(
|
||||
self, cache_name: str, keys: Tuple[Any, ...]
|
||||
) -> None:
|
||||
@@ -503,14 +377,6 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||
"Can't stream invalidate all with magic current state cache"
|
||||
)
|
||||
|
||||
if cache_name == PURGE_HISTORY_CACHE_NAME and keys is None:
|
||||
raise Exception(
|
||||
"Can't stream invalidate all with magic purge history cache"
|
||||
)
|
||||
|
||||
if cache_name == DELETE_ROOM_CACHE_NAME and keys is None:
|
||||
raise Exception("Can't stream invalidate all with magic delete room cache")
|
||||
|
||||
if isinstance(self.database_engine, PostgresEngine):
|
||||
assert self._cache_id_gen is not None
|
||||
|
||||
|
||||
@@ -903,15 +903,6 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
self._event_ref.pop(event_id, None)
|
||||
self._current_event_fetches.pop(event_id, None)
|
||||
|
||||
def _invalidate_local_get_event_cache_all(self) -> None:
|
||||
"""Clears the in-memory get event caches.
|
||||
|
||||
Used when we purge room history.
|
||||
"""
|
||||
self._get_event_cache.clear()
|
||||
self._event_ref.clear()
|
||||
self._current_event_fetches.clear()
|
||||
|
||||
async def _get_events_from_cache(
|
||||
self, events: Iterable[str], update_metrics: bool = True
|
||||
) -> Dict[str, EventCacheEntry]:
|
||||
|
||||
@@ -308,8 +308,6 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
|
||||
|
||||
logger.info("[purge] done")
|
||||
|
||||
self._invalidate_caches_for_room_events_and_stream(txn, room_id)
|
||||
|
||||
return referenced_state_groups
|
||||
|
||||
async def purge_room(self, room_id: str) -> List[int]:
|
||||
@@ -487,6 +485,10 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
|
||||
# index on them. In any case we should be clearing out 'stream' tables
|
||||
# periodically anyway (#5888)
|
||||
|
||||
self._invalidate_caches_for_room_and_stream(txn, room_id)
|
||||
# TODO: we could probably usefully do a bunch more cache invalidation here
|
||||
|
||||
# XXX: as with purge_history, this is racy, but no worse than other races
|
||||
# that already exist.
|
||||
self._invalidate_cache_and_stream(txn, self.have_seen_event, (room_id,))
|
||||
|
||||
return state_groups
|
||||
|
||||
@@ -88,6 +88,7 @@ def _load_rules(
|
||||
msc1767_enabled=experimental_config.msc1767_enabled,
|
||||
msc3664_enabled=experimental_config.msc3664_enabled,
|
||||
msc3381_polls_enabled=experimental_config.msc3381_polls_enabled,
|
||||
msc3952_intentional_mentions=experimental_config.msc3952_intentional_mentions,
|
||||
msc3958_suppress_edits_enabled=experimental_config.msc3958_supress_edit_notifs,
|
||||
)
|
||||
|
||||
|
||||
@@ -927,10 +927,11 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
|
||||
raise Exception("Invalid host name")
|
||||
|
||||
sql = """
|
||||
SELECT state_key FROM current_state_events
|
||||
WHERE membership = ?
|
||||
SELECT state_key FROM current_state_events AS c
|
||||
INNER JOIN room_memberships AS m USING (event_id)
|
||||
WHERE m.membership = ?
|
||||
AND type = 'm.room.member'
|
||||
AND room_id = ?
|
||||
AND c.room_id = ?
|
||||
AND state_key LIKE ?
|
||||
LIMIT 1
|
||||
"""
|
||||
@@ -1460,6 +1461,7 @@ class RoomMemberBackgroundUpdateStore(SQLBaseStore):
|
||||
SELECT stream_ordering, event_id, events.room_id, event_json.json
|
||||
FROM events
|
||||
INNER JOIN event_json USING (event_id)
|
||||
INNER JOIN room_memberships USING (event_id)
|
||||
WHERE ? <= stream_ordering AND stream_ordering < ?
|
||||
AND type = 'm.room.member'
|
||||
ORDER BY stream_ordering DESC
|
||||
|
||||
@@ -1061,15 +1061,12 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
|
||||
# The array of numbers are the weights for the various part of the
|
||||
# search: (domain, _, display name, localpart)
|
||||
sql = """
|
||||
WITH matching_users AS (
|
||||
SELECT user_id, vector FROM user_directory_search WHERE vector @@ to_tsquery('simple', ?)
|
||||
LIMIT 10000
|
||||
)
|
||||
SELECT d.user_id AS user_id, display_name, avatar_url
|
||||
FROM matching_users as t
|
||||
FROM user_directory_search as t
|
||||
INNER JOIN user_directory AS d USING (user_id)
|
||||
WHERE
|
||||
%(where_clause)s
|
||||
AND vector @@ to_tsquery('simple', ?)
|
||||
ORDER BY
|
||||
(CASE WHEN d.user_id IS NOT NULL THEN 4.0 ELSE 1.0 END)
|
||||
* (CASE WHEN display_name IS NOT NULL THEN 1.2 ELSE 1.0 END)
|
||||
@@ -1098,9 +1095,8 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
|
||||
"order_case_statements": " ".join(additional_ordering_statements),
|
||||
}
|
||||
args = (
|
||||
(full_query,)
|
||||
+ join_args
|
||||
+ (exact_query, prefix_query)
|
||||
join_args
|
||||
+ (full_query, exact_query, prefix_query)
|
||||
+ ordering_arguments
|
||||
+ (limit + 1,)
|
||||
)
|
||||
|
||||
@@ -22,7 +22,7 @@ from synapse.storage.database import (
|
||||
LoggingDatabaseConnection,
|
||||
LoggingTransaction,
|
||||
)
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
|
||||
from synapse.types import MutableStateMap, StateMap
|
||||
from synapse.types.state import StateFilter
|
||||
from synapse.util.caches import intern_string
|
||||
@@ -328,6 +328,15 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
|
||||
columns=["event_stream_ordering"],
|
||||
)
|
||||
|
||||
self.db_pool.updates.register_background_update_handler(
|
||||
"add_event_stream_ordering",
|
||||
self._add_event_stream_ordering,
|
||||
)
|
||||
|
||||
self.db_pool.updates.register_background_update_handler(
|
||||
"add_stream_ordering_triggers", self._add_triggers_in_bg
|
||||
)
|
||||
|
||||
async def _background_deduplicate_state(
|
||||
self, progress: dict, batch_size: int
|
||||
) -> int:
|
||||
@@ -504,3 +513,175 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
|
||||
)
|
||||
|
||||
return 1
|
||||
|
||||
async def _add_event_stream_ordering(self, progress: dict, batch_size: int) -> int:
|
||||
"""
|
||||
Add denormalised copies of `stream_ordering` from the corresponding row in `events`
|
||||
to the tables current_state_events, local_current_membership, and room_memberships.
|
||||
This is done to improve database performance by reduring JOINs.
|
||||
|
||||
"""
|
||||
tables = [
|
||||
"current_state_events",
|
||||
"local_current_membership",
|
||||
"room_memberships",
|
||||
]
|
||||
|
||||
if isinstance(self.database_engine, PostgresEngine):
|
||||
|
||||
def check_pg_column(txn: LoggingTransaction, table: str) -> list:
|
||||
"""
|
||||
check if the column event_stream_ordering already exists
|
||||
"""
|
||||
check_sql = f"""
|
||||
SELECT column_name FROM information_schema.columns
|
||||
WHERE table_name = '{table}' and column_name = 'event_stream_ordering';
|
||||
"""
|
||||
txn.execute(check_sql)
|
||||
column = txn.fetchall()
|
||||
return column
|
||||
|
||||
def add_pg_column(txn: LoggingTransaction, table: str) -> None:
|
||||
"""
|
||||
Add column event_stream_ordering to A given table
|
||||
"""
|
||||
add_column_sql = f"""
|
||||
ALTER TABLE {table} ADD COLUMN event_stream_ordering BIGINT;
|
||||
"""
|
||||
txn.execute(add_column_sql)
|
||||
|
||||
add_fk_sql = f"""
|
||||
ALTER TABLE {table} ADD CONSTRAINT event_stream_ordering_fkey
|
||||
FOREIGN KEY(event_stream_ordering) REFERENCES events(stream_ordering) NOT VALID;
|
||||
"""
|
||||
txn.execute(add_fk_sql)
|
||||
|
||||
for table in tables:
|
||||
res = await self.db_pool.runInteraction(
|
||||
"check_column", check_pg_column, table
|
||||
)
|
||||
# if the column exists do nothing
|
||||
if not res:
|
||||
await self.db_pool.runInteraction(
|
||||
"add_event_stream_ordering",
|
||||
add_pg_column,
|
||||
table,
|
||||
)
|
||||
await self.db_pool.updates._end_background_update(
|
||||
"add_event_stream_ordering"
|
||||
)
|
||||
return 1
|
||||
|
||||
elif isinstance(self.database_engine, Sqlite3Engine):
|
||||
|
||||
def check_sqlite_column(txn: LoggingTransaction, table: str) -> List[tuple]:
|
||||
"""
|
||||
Get table info (to see if column event_stream_ordering exists)
|
||||
"""
|
||||
check_sql = f"""
|
||||
PRAGMA table_info({table})
|
||||
"""
|
||||
txn.execute(check_sql)
|
||||
res = txn.fetchall()
|
||||
return res
|
||||
|
||||
def add_sqlite_column(txn: LoggingTransaction, table: str) -> None:
|
||||
"""
|
||||
Add column event_stream_ordering to given table
|
||||
"""
|
||||
add_column_sql = f"""
|
||||
ALTER TABLE {table} ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering);
|
||||
"""
|
||||
txn.execute(add_column_sql)
|
||||
|
||||
for table in tables:
|
||||
res = await self.db_pool.runInteraction(
|
||||
"check_column", check_sqlite_column, table
|
||||
)
|
||||
columns = [tup[1] for tup in res]
|
||||
|
||||
# if the column exists do nothing
|
||||
if "event_stream_ordering" not in columns:
|
||||
await self.db_pool.runInteraction(
|
||||
"add_event_stream_ordering", add_sqlite_column, table
|
||||
)
|
||||
|
||||
await self.db_pool.updates._end_background_update(
|
||||
"add_event_stream_ordering"
|
||||
)
|
||||
return 1
|
||||
|
||||
async def _add_triggers_in_bg(self, progress: dict, batch_size: int) -> int:
|
||||
"""
|
||||
Adds triggers to the room membership tables to enforce consistency
|
||||
"""
|
||||
# Complain if the `event_stream_ordering` in membership tables doesn't match
|
||||
# the `stream_ordering` row with the same `event_id` in `events`.
|
||||
if isinstance(self.database_engine, Sqlite3Engine):
|
||||
|
||||
def add_sqlite_triggers(txn: LoggingTransaction) -> None:
|
||||
for table in (
|
||||
"current_state_events",
|
||||
"local_current_membership",
|
||||
"room_memberships",
|
||||
):
|
||||
txn.execute(
|
||||
f"""
|
||||
CREATE TRIGGER IF NOT EXISTS {table}_bad_event_stream_ordering
|
||||
BEFORE INSERT ON {table}
|
||||
FOR EACH ROW
|
||||
BEGIN
|
||||
SELECT RAISE(ABORT, 'Incorrect event_stream_ordering in {table}')
|
||||
WHERE EXISTS (
|
||||
SELECT 1 FROM events
|
||||
WHERE events.event_id = NEW.event_id
|
||||
AND events.stream_ordering != NEW.event_stream_ordering
|
||||
);
|
||||
END;
|
||||
"""
|
||||
)
|
||||
|
||||
await self.db_pool.runInteraction(
|
||||
"add_sqlite_triggers", add_sqlite_triggers
|
||||
)
|
||||
elif isinstance(self.database_engine, PostgresEngine):
|
||||
|
||||
def add_pg_triggers(txn: LoggingTransaction) -> None:
|
||||
txn.execute(
|
||||
"""
|
||||
CREATE OR REPLACE FUNCTION check_event_stream_ordering() RETURNS trigger AS $BODY$
|
||||
BEGIN
|
||||
IF EXISTS (
|
||||
SELECT 1 FROM events
|
||||
WHERE events.event_id = NEW.event_id
|
||||
AND events.stream_ordering != NEW.event_stream_ordering
|
||||
) THEN
|
||||
RAISE EXCEPTION 'Incorrect event_stream_ordering';
|
||||
END IF;
|
||||
RETURN NEW;
|
||||
END;
|
||||
$BODY$ LANGUAGE plpgsql;
|
||||
"""
|
||||
)
|
||||
|
||||
for table in (
|
||||
"current_state_events",
|
||||
"local_current_membership",
|
||||
"room_memberships",
|
||||
):
|
||||
txn.execute(
|
||||
f"""
|
||||
CREATE TRIGGER check_event_stream_ordering BEFORE INSERT OR UPDATE ON {table}
|
||||
FOR EACH ROW
|
||||
EXECUTE PROCEDURE check_event_stream_ordering()
|
||||
"""
|
||||
)
|
||||
|
||||
await self.db_pool.runInteraction("add_postgres_triggers", add_pg_triggers)
|
||||
else:
|
||||
raise NotImplementedError("Unknown database engine")
|
||||
|
||||
await self.db_pool.updates._end_background_update(
|
||||
"add_stream_ordering_triggers"
|
||||
)
|
||||
return 1
|
||||
|
||||
@@ -0,0 +1,18 @@
|
||||
/* Copyright 2023 The Matrix.org Foundation C.I.C
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
INSERT INTO background_updates (ordering, update_name, progress_json, depends_on)
|
||||
VALUES
|
||||
(7403, 'add_event_stream_ordering', '{}', 'replace_stream_ordering_column');
|
||||
@@ -1,29 +0,0 @@
|
||||
/* Copyright 2022 Beeper
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- Each of these are denormalised copies of `stream_ordering` from the corresponding row in` events` which
|
||||
-- we use to improve database performance by reduring JOINs.
|
||||
|
||||
-- NOTE: these are set to NOT VALID to prevent locks while adding the column on large existing tables,
|
||||
-- which will be validated in a later migration. For all new/updated rows the FKEY will be checked.
|
||||
|
||||
ALTER TABLE current_state_events ADD COLUMN event_stream_ordering BIGINT;
|
||||
ALTER TABLE current_state_events ADD CONSTRAINT event_stream_ordering_fkey FOREIGN KEY (event_stream_ordering) REFERENCES events(stream_ordering) NOT VALID;
|
||||
|
||||
ALTER TABLE local_current_membership ADD COLUMN event_stream_ordering BIGINT;
|
||||
ALTER TABLE local_current_membership ADD CONSTRAINT event_stream_ordering_fkey FOREIGN KEY (event_stream_ordering) REFERENCES events(stream_ordering) NOT VALID;
|
||||
|
||||
ALTER TABLE room_memberships ADD COLUMN event_stream_ordering BIGINT;
|
||||
ALTER TABLE room_memberships ADD CONSTRAINT event_stream_ordering_fkey FOREIGN KEY (event_stream_ordering) REFERENCES events(stream_ordering) NOT VALID;
|
||||
@@ -1,23 +0,0 @@
|
||||
/* Copyright 2022 Beeper
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- Each of these are denormalised copies of `stream_ordering` from the corresponding row in` events` which
|
||||
-- we use to improve database performance by reduring JOINs.
|
||||
|
||||
-- NOTE: sqlite does not support ADD CONSTRAINT so we add the new columns with FK constraint as-is
|
||||
|
||||
ALTER TABLE current_state_events ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering);
|
||||
ALTER TABLE local_current_membership ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering);
|
||||
ALTER TABLE room_memberships ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering);
|
||||
@@ -1,79 +0,0 @@
|
||||
# Copyright 2022 Beeper
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
"""
|
||||
This migration adds triggers to the room membership tables to enforce consistency.
|
||||
Triggers cannot be expressed in .sql files, so we have to use a separate file.
|
||||
"""
|
||||
from synapse.storage.database import LoggingTransaction
|
||||
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
|
||||
|
||||
|
||||
def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
|
||||
# Complain if the `event_stream_ordering` in membership tables doesn't match
|
||||
# the `stream_ordering` row with the same `event_id` in `events`.
|
||||
if isinstance(database_engine, Sqlite3Engine):
|
||||
for table in (
|
||||
"current_state_events",
|
||||
"local_current_membership",
|
||||
"room_memberships",
|
||||
):
|
||||
cur.execute(
|
||||
f"""
|
||||
CREATE TRIGGER IF NOT EXISTS {table}_bad_event_stream_ordering
|
||||
BEFORE INSERT ON {table}
|
||||
FOR EACH ROW
|
||||
BEGIN
|
||||
SELECT RAISE(ABORT, 'Incorrect event_stream_ordering in {table}')
|
||||
WHERE EXISTS (
|
||||
SELECT 1 FROM events
|
||||
WHERE events.event_id = NEW.event_id
|
||||
AND events.stream_ordering != NEW.event_stream_ordering
|
||||
);
|
||||
END;
|
||||
"""
|
||||
)
|
||||
elif isinstance(database_engine, PostgresEngine):
|
||||
cur.execute(
|
||||
"""
|
||||
CREATE OR REPLACE FUNCTION check_event_stream_ordering() RETURNS trigger AS $BODY$
|
||||
BEGIN
|
||||
IF EXISTS (
|
||||
SELECT 1 FROM events
|
||||
WHERE events.event_id = NEW.event_id
|
||||
AND events.stream_ordering != NEW.event_stream_ordering
|
||||
) THEN
|
||||
RAISE EXCEPTION 'Incorrect event_stream_ordering';
|
||||
END IF;
|
||||
RETURN NEW;
|
||||
END;
|
||||
$BODY$ LANGUAGE plpgsql;
|
||||
"""
|
||||
)
|
||||
|
||||
for table in (
|
||||
"current_state_events",
|
||||
"local_current_membership",
|
||||
"room_memberships",
|
||||
):
|
||||
cur.execute(
|
||||
f"""
|
||||
CREATE TRIGGER check_event_stream_ordering BEFORE INSERT OR UPDATE ON {table}
|
||||
FOR EACH ROW
|
||||
EXECUTE PROCEDURE check_event_stream_ordering()
|
||||
"""
|
||||
)
|
||||
else:
|
||||
raise NotImplementedError("Unknown database engine")
|
||||
@@ -0,0 +1,22 @@
|
||||
/* Copyright 2023 The Matrix.org Foundation C.I.C
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
|
||||
|
||||
-- This migration adds triggers to the room membership tables to enforce consistency.
|
||||
|
||||
INSERT INTO background_updates (ordering, update_name, progress_json, depends_on)
|
||||
VALUES
|
||||
(7404, 'add_stream_ordering_triggers', '{}', 'add_event_stream_ordering');
|
||||
@@ -21,27 +21,7 @@ DELETE FROM background_updates WHERE update_name = 'event_push_backfill_thread_i
|
||||
-- Overwrite any null thread_id values.
|
||||
UPDATE event_push_actions_staging SET thread_id = 'main' WHERE thread_id IS NULL;
|
||||
UPDATE event_push_actions SET thread_id = 'main' WHERE thread_id IS NULL;
|
||||
|
||||
-- Empirically we can end up with entries in the push summary table with both a
|
||||
-- `NULL` and `main` thread ID, which causes the insert below to fail. We fudge
|
||||
-- this by deleting any `NULL` rows that have a corresponding `main`.
|
||||
DELETE FROM event_push_summary AS a WHERE thread_id IS NULL AND EXISTS (
|
||||
SELECT 1 FROM event_push_summary AS b
|
||||
WHERE b.thread_id = 'main' AND a.user_id = b.user_id AND a.room_id = b.room_id
|
||||
);
|
||||
-- Copy the NULL threads to have a 'main' thread ID.
|
||||
--
|
||||
-- Note: Some people seem to have duplicate rows with a `NULL` thread ID, in
|
||||
-- which case we just fudge it with using MAX of the values. The counts *may* be
|
||||
-- wrong for such rooms, but a) its an edge case, and b) they'll be fixed when
|
||||
-- the user reads the room.
|
||||
INSERT INTO event_push_summary (user_id, room_id, notif_count, stream_ordering, unread_count, last_receipt_stream_ordering, thread_id)
|
||||
SELECT user_id, room_id, MAX(notif_count), MAX(stream_ordering), MAX(unread_count), MAX(last_receipt_stream_ordering), 'main'
|
||||
FROM event_push_summary
|
||||
WHERE thread_id IS NULL
|
||||
GROUP BY user_id, room_id, thread_id;
|
||||
|
||||
DELETE FROM event_push_summary AS a WHERE thread_id IS NULL;
|
||||
UPDATE event_push_summary SET thread_id = 'main' WHERE thread_id IS NULL;
|
||||
|
||||
-- Drop the background updates to calculate the indexes used to find null thread_ids.
|
||||
DELETE FROM background_updates WHERE update_name = 'event_push_actions_thread_id_null';
|
||||
|
||||
@@ -13,8 +13,8 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
INSERT INTO background_updates (ordering, update_name, progress_json)
|
||||
INSERT INTO background_updates (ordering, update_name, progress_json, depends_on)
|
||||
VALUES
|
||||
(7714, 'current_state_events_stream_ordering_idx', '{}'),
|
||||
(7714, 'local_current_membership_stream_ordering_idx', '{}'),
|
||||
(7714, 'room_memberships_stream_ordering_idx', '{}');
|
||||
(7714, 'current_state_events_stream_ordering_idx', '{}', 'add_event_stream_ordering'),
|
||||
(7714, 'local_current_membership_stream_ordering_idx', '{}', 'add_event_stream_ordering'),
|
||||
(7714, 'room_memberships_stream_ordering_idx', '{}', 'add_event_stream_ordering');
|
||||
|
||||
@@ -862,5 +862,5 @@ class AsyncLruCache(Generic[KT, VT]):
|
||||
async def contains(self, key: KT) -> bool:
|
||||
return self._lru_cache.contains(key)
|
||||
|
||||
def clear(self) -> None:
|
||||
async def clear(self) -> None:
|
||||
self._lru_cache.clear()
|
||||
|
||||
@@ -101,7 +101,8 @@ class TestEventContext(unittest.HomeserverTestCase):
|
||||
self.assertEqual(
|
||||
context.state_group_before_event, d_context.state_group_before_event
|
||||
)
|
||||
self.assertEqual(context.state_group_deltas, d_context.state_group_deltas)
|
||||
self.assertEqual(context.prev_group, d_context.prev_group)
|
||||
self.assertEqual(context.delta_ids, d_context.delta_ids)
|
||||
self.assertEqual(context.app_service, d_context.app_service)
|
||||
|
||||
self.assertEqual(
|
||||
|
||||
@@ -163,7 +163,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
|
||||
# Blow away caches (supported room versions can only change due to a restart).
|
||||
self.store.get_rooms_for_user_with_stream_ordering.invalidate_all()
|
||||
self.store.get_rooms_for_user.invalidate_all()
|
||||
self.store._get_event_cache.clear()
|
||||
self.get_success(self.store._get_event_cache.clear())
|
||||
self.store._event_ref.clear()
|
||||
|
||||
# The rooms should be excluded from the sync response.
|
||||
|
||||
@@ -40,7 +40,7 @@ from synapse.server import HomeServer
|
||||
from synapse.util import Clock
|
||||
|
||||
from tests.server import FakeTransport
|
||||
from tests.unittest import HomeserverTestCase, override_config
|
||||
from tests.unittest import HomeserverTestCase
|
||||
|
||||
|
||||
def check_logcontext(context: LoggingContextOrSentinel) -> None:
|
||||
@@ -640,21 +640,3 @@ class FederationClientTests(HomeserverTestCase):
|
||||
self.cl.build_auth_headers(
|
||||
b"", b"GET", b"https://example.com", destination_is=b""
|
||||
)
|
||||
|
||||
@override_config(
|
||||
{
|
||||
"federation": {
|
||||
"client_timeout": "180s",
|
||||
"max_long_retry_delay": "100s",
|
||||
"max_short_retry_delay": "7s",
|
||||
"max_long_retries": 20,
|
||||
"max_short_retries": 5,
|
||||
}
|
||||
}
|
||||
)
|
||||
def test_configurable_retry_and_delay_values(self) -> None:
|
||||
self.assertEqual(self.cl.default_timeout, 180)
|
||||
self.assertEqual(self.cl.max_long_retry_delay, 100)
|
||||
self.assertEqual(self.cl.max_short_retry_delay, 7)
|
||||
self.assertEqual(self.cl.max_long_retries, 20)
|
||||
self.assertEqual(self.cl.max_short_retries, 5)
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from typing import Awaitable, cast
|
||||
from typing import cast
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.test.proto_helpers import MemoryReactorClock
|
||||
@@ -227,6 +227,8 @@ class LogContextScopeManagerTestCase(TestCase):
|
||||
Test whether we can use `@trace_with_opname` (`@trace`) and `@tag_args`
|
||||
with functions that return deferreds
|
||||
"""
|
||||
reactor = MemoryReactorClock()
|
||||
|
||||
with LoggingContext("root context"):
|
||||
|
||||
@trace_with_opname("fixture_deferred_func", tracer=self._tracer)
|
||||
@@ -238,6 +240,9 @@ class LogContextScopeManagerTestCase(TestCase):
|
||||
|
||||
result_d1 = fixture_deferred_func()
|
||||
|
||||
# let the tasks complete
|
||||
reactor.pump((2,) * 8)
|
||||
|
||||
self.assertEqual(self.successResultOf(result_d1), "foo")
|
||||
|
||||
# the span should have been reported
|
||||
@@ -251,6 +256,8 @@ class LogContextScopeManagerTestCase(TestCase):
|
||||
Test whether we can use `@trace_with_opname` (`@trace`) and `@tag_args`
|
||||
with async functions
|
||||
"""
|
||||
reactor = MemoryReactorClock()
|
||||
|
||||
with LoggingContext("root context"):
|
||||
|
||||
@trace_with_opname("fixture_async_func", tracer=self._tracer)
|
||||
@@ -260,6 +267,9 @@ class LogContextScopeManagerTestCase(TestCase):
|
||||
|
||||
d1 = defer.ensureDeferred(fixture_async_func())
|
||||
|
||||
# let the tasks complete
|
||||
reactor.pump((2,) * 8)
|
||||
|
||||
self.assertEqual(self.successResultOf(d1), "foo")
|
||||
|
||||
# the span should have been reported
|
||||
@@ -267,34 +277,3 @@ class LogContextScopeManagerTestCase(TestCase):
|
||||
[span.operation_name for span in self._reporter.get_spans()],
|
||||
["fixture_async_func"],
|
||||
)
|
||||
|
||||
def test_trace_decorator_awaitable_return(self) -> None:
|
||||
"""
|
||||
Test whether we can use `@trace_with_opname` (`@trace`) and `@tag_args`
|
||||
with functions that return an awaitable (e.g. a coroutine)
|
||||
"""
|
||||
with LoggingContext("root context"):
|
||||
# Something we can return without `await` to get a coroutine
|
||||
async def fixture_async_func() -> str:
|
||||
return "foo"
|
||||
|
||||
# The actual kind of function we want to test that returns an awaitable
|
||||
@trace_with_opname("fixture_awaitable_return_func", tracer=self._tracer)
|
||||
@tag_args
|
||||
def fixture_awaitable_return_func() -> Awaitable[str]:
|
||||
return fixture_async_func()
|
||||
|
||||
# Something we can run with `defer.ensureDeferred(runner())` and pump the
|
||||
# whole async tasks through to completion.
|
||||
async def runner() -> str:
|
||||
return await fixture_awaitable_return_func()
|
||||
|
||||
d1 = defer.ensureDeferred(runner())
|
||||
|
||||
self.assertEqual(self.successResultOf(d1), "foo")
|
||||
|
||||
# the span should have been reported
|
||||
self.assertEqual(
|
||||
[span.operation_name for span in self._reporter.get_spans()],
|
||||
["fixture_awaitable_return_func"],
|
||||
)
|
||||
|
||||
@@ -228,6 +228,7 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
|
||||
)
|
||||
return len(result) > 0
|
||||
|
||||
@override_config({"experimental_features": {"msc3952_intentional_mentions": True}})
|
||||
def test_user_mentions(self) -> None:
|
||||
"""Test the behavior of an event which includes invalid user mentions."""
|
||||
bulk_evaluator = BulkPushRuleEvaluator(self.hs)
|
||||
@@ -236,7 +237,9 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
|
||||
self.assertFalse(self._create_and_process(bulk_evaluator))
|
||||
# An empty mentions field should not notify.
|
||||
self.assertFalse(
|
||||
self._create_and_process(bulk_evaluator, {EventContentFields.MENTIONS: {}})
|
||||
self._create_and_process(
|
||||
bulk_evaluator, {EventContentFields.MSC3952_MENTIONS: {}}
|
||||
)
|
||||
)
|
||||
|
||||
# Non-dict mentions should be ignored.
|
||||
@@ -250,7 +253,7 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
|
||||
for mentions in (None, True, False, 1, "foo", []):
|
||||
self.assertFalse(
|
||||
self._create_and_process(
|
||||
bulk_evaluator, {EventContentFields.MENTIONS: mentions}
|
||||
bulk_evaluator, {EventContentFields.MSC3952_MENTIONS: mentions}
|
||||
)
|
||||
)
|
||||
|
||||
@@ -259,7 +262,7 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
|
||||
self.assertFalse(
|
||||
self._create_and_process(
|
||||
bulk_evaluator,
|
||||
{EventContentFields.MENTIONS: {"user_ids": mentions}},
|
||||
{EventContentFields.MSC3952_MENTIONS: {"user_ids": mentions}},
|
||||
)
|
||||
)
|
||||
|
||||
@@ -267,14 +270,14 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
|
||||
self.assertTrue(
|
||||
self._create_and_process(
|
||||
bulk_evaluator,
|
||||
{EventContentFields.MENTIONS: {"user_ids": [self.alice]}},
|
||||
{EventContentFields.MSC3952_MENTIONS: {"user_ids": [self.alice]}},
|
||||
)
|
||||
)
|
||||
self.assertTrue(
|
||||
self._create_and_process(
|
||||
bulk_evaluator,
|
||||
{
|
||||
EventContentFields.MENTIONS: {
|
||||
EventContentFields.MSC3952_MENTIONS: {
|
||||
"user_ids": ["@another:test", self.alice]
|
||||
}
|
||||
},
|
||||
@@ -285,7 +288,11 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
|
||||
self.assertTrue(
|
||||
self._create_and_process(
|
||||
bulk_evaluator,
|
||||
{EventContentFields.MENTIONS: {"user_ids": [self.alice, self.alice]}},
|
||||
{
|
||||
EventContentFields.MSC3952_MENTIONS: {
|
||||
"user_ids": [self.alice, self.alice]
|
||||
}
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
@@ -300,7 +307,7 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
|
||||
self._create_and_process(
|
||||
bulk_evaluator,
|
||||
{
|
||||
EventContentFields.MENTIONS: {
|
||||
EventContentFields.MSC3952_MENTIONS: {
|
||||
"user_ids": [None, True, False, {}, []]
|
||||
}
|
||||
},
|
||||
@@ -310,7 +317,7 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
|
||||
self._create_and_process(
|
||||
bulk_evaluator,
|
||||
{
|
||||
EventContentFields.MENTIONS: {
|
||||
EventContentFields.MSC3952_MENTIONS: {
|
||||
"user_ids": [None, True, False, {}, [], self.alice]
|
||||
}
|
||||
},
|
||||
@@ -324,11 +331,12 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
|
||||
{
|
||||
"body": self.alice,
|
||||
"msgtype": "m.text",
|
||||
EventContentFields.MENTIONS: {},
|
||||
EventContentFields.MSC3952_MENTIONS: {},
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
@override_config({"experimental_features": {"msc3952_intentional_mentions": True}})
|
||||
def test_room_mentions(self) -> None:
|
||||
"""Test the behavior of an event which includes invalid room mentions."""
|
||||
bulk_evaluator = BulkPushRuleEvaluator(self.hs)
|
||||
@@ -336,7 +344,7 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
|
||||
# Room mentions from those without power should not notify.
|
||||
self.assertFalse(
|
||||
self._create_and_process(
|
||||
bulk_evaluator, {EventContentFields.MENTIONS: {"room": True}}
|
||||
bulk_evaluator, {EventContentFields.MSC3952_MENTIONS: {"room": True}}
|
||||
)
|
||||
)
|
||||
|
||||
@@ -350,7 +358,7 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
|
||||
)
|
||||
self.assertTrue(
|
||||
self._create_and_process(
|
||||
bulk_evaluator, {EventContentFields.MENTIONS: {"room": True}}
|
||||
bulk_evaluator, {EventContentFields.MSC3952_MENTIONS: {"room": True}}
|
||||
)
|
||||
)
|
||||
|
||||
@@ -366,7 +374,7 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
|
||||
self.assertFalse(
|
||||
self._create_and_process(
|
||||
bulk_evaluator,
|
||||
{EventContentFields.MENTIONS: {"room": mentions}},
|
||||
{EventContentFields.MSC3952_MENTIONS: {"room": mentions}},
|
||||
)
|
||||
)
|
||||
|
||||
@@ -377,7 +385,7 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
|
||||
{
|
||||
"body": "@room",
|
||||
"msgtype": "m.text",
|
||||
EventContentFields.MENTIONS: {},
|
||||
EventContentFields.MSC3952_MENTIONS: {},
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
@@ -131,6 +131,9 @@ class ReadMarkerTestCase(unittest.HomeserverTestCase):
|
||||
event = self.get_success(self.store.get_event(event_id_1, allow_none=True))
|
||||
assert event is None
|
||||
|
||||
# TODO See https://github.com/matrix-org/synapse/issues/13476
|
||||
self.store.get_event_ordering.invalidate_all()
|
||||
|
||||
# Test moving the read marker to a newer event
|
||||
event_id_2 = send_message()
|
||||
channel = self.make_request(
|
||||
|
||||
@@ -1941,43 +1941,6 @@ class RoomPowerLevelOverridesInPracticeTestCase(RoomBase):
|
||||
channel.json_body["error"],
|
||||
)
|
||||
|
||||
@unittest.override_config(
|
||||
{
|
||||
"default_power_level_content_override": {
|
||||
"private_chat": {
|
||||
"events": {
|
||||
"m.room.avatar": 50,
|
||||
"m.room.canonical_alias": 50,
|
||||
"m.room.encryption": 999,
|
||||
"m.room.history_visibility": 100,
|
||||
"m.room.name": 50,
|
||||
"m.room.power_levels": 100,
|
||||
"m.room.server_acl": 100,
|
||||
"m.room.tombstone": 100,
|
||||
},
|
||||
"events_default": 0,
|
||||
},
|
||||
}
|
||||
},
|
||||
)
|
||||
def test_config_override_blocks_encrypted_room(self) -> None:
|
||||
# Given the server has config for private_chats,
|
||||
|
||||
# When I attempt to create an encrypted private_chat room
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
"/createRoom",
|
||||
'{"creation_content": {"m.federate": false},"name": "Secret Private Room","preset": "private_chat","initial_state": [{"type": "m.room.encryption","state_key": "","content": {"algorithm": "m.megolm.v1.aes-sha2"}}]}',
|
||||
)
|
||||
|
||||
# Then I am not allowed because the required power level is unattainable
|
||||
self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.result["body"])
|
||||
self.assertEqual(
|
||||
"You cannot create an encrypted room. "
|
||||
+ "user_level (100) < send_level (999)",
|
||||
channel.json_body["error"],
|
||||
)
|
||||
|
||||
|
||||
class RoomInitialSyncTestCase(RoomBase):
|
||||
"""Tests /rooms/$room_id/initialSync."""
|
||||
|
||||
@@ -188,7 +188,7 @@ class EventCacheTestCase(unittest.HomeserverTestCase):
|
||||
self.event_id = res["event_id"]
|
||||
|
||||
# Reset the event cache so the tests start with it empty
|
||||
self.store._get_event_cache.clear()
|
||||
self.get_success(self.store._get_event_cache.clear())
|
||||
|
||||
def test_simple(self) -> None:
|
||||
"""Test that we cache events that we pull from the DB."""
|
||||
@@ -205,7 +205,7 @@ class EventCacheTestCase(unittest.HomeserverTestCase):
|
||||
"""
|
||||
|
||||
# Reset the event cache
|
||||
self.store._get_event_cache.clear()
|
||||
self.get_success(self.store._get_event_cache.clear())
|
||||
|
||||
with LoggingContext("test") as ctx:
|
||||
# We keep hold of the event event though we never use it.
|
||||
@@ -215,7 +215,7 @@ class EventCacheTestCase(unittest.HomeserverTestCase):
|
||||
self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1)
|
||||
|
||||
# Reset the event cache
|
||||
self.store._get_event_cache.clear()
|
||||
self.get_success(self.store._get_event_cache.clear())
|
||||
|
||||
with LoggingContext("test") as ctx:
|
||||
self.get_success(self.store.get_event(self.event_id))
|
||||
@@ -390,7 +390,7 @@ class GetEventCancellationTestCase(unittest.HomeserverTestCase):
|
||||
self.event_id = res["event_id"]
|
||||
|
||||
# Reset the event cache so the tests start with it empty
|
||||
self.store._get_event_cache.clear()
|
||||
self.get_success(self.store._get_event_cache.clear())
|
||||
|
||||
@contextmanager
|
||||
def blocking_get_event_calls(
|
||||
|
||||
@@ -401,10 +401,7 @@ class EventChainStoreTestCase(HomeserverTestCase):
|
||||
assert persist_events_store is not None
|
||||
persist_events_store._store_event_txn(
|
||||
txn,
|
||||
[
|
||||
(e, EventContext(self.hs.get_storage_controllers(), {}))
|
||||
for e in events
|
||||
],
|
||||
[(e, EventContext(self.hs.get_storage_controllers())) for e in events],
|
||||
)
|
||||
|
||||
# Actually call the function that calculates the auth chain stuff.
|
||||
|
||||
@@ -555,15 +555,10 @@ class StateTestCase(unittest.TestCase):
|
||||
(e.event_id for e in old_state + [event]), current_state_ids.values()
|
||||
)
|
||||
|
||||
assert context.state_group_before_event is not None
|
||||
assert context.state_group is not None
|
||||
self.assertEqual(
|
||||
context.state_group_deltas.get(
|
||||
(context.state_group_before_event, context.state_group)
|
||||
),
|
||||
{(event.type, event.state_key): event.event_id},
|
||||
)
|
||||
self.assertIsNotNone(context.state_group_before_event)
|
||||
self.assertNotEqual(context.state_group_before_event, context.state_group)
|
||||
self.assertEqual(context.state_group_before_event, context.prev_group)
|
||||
self.assertEqual({("state", ""): event.event_id}, context.delta_ids)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_trivial_annotate_message(
|
||||
|
||||
Reference in New Issue
Block a user