
Compare commits


6 Commits

91 changed files with 737 additions and 1190 deletions


@@ -22,21 +22,7 @@ concurrency:
cancel-in-progress: true
jobs:
check_repo:
# Prevent this workflow from running on any fork of Synapse other than matrix-org/synapse, as it is
# only useful to the Synapse core team.
# All other workflow steps depend on this one, thus if 'should_run_workflow' is not 'true', the rest
# of the workflow will be skipped as well.
runs-on: ubuntu-latest
outputs:
should_run_workflow: ${{ steps.check_condition.outputs.should_run_workflow }}
steps:
- id: check_condition
run: echo "should_run_workflow=${{ github.repository == 'matrix-org/synapse' }}" >> "$GITHUB_OUTPUT"
mypy:
needs: check_repo
if: needs.check_repo.outputs.should_run_workflow == 'true'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
@@ -61,8 +47,6 @@ jobs:
run: sed '/warn_unused_ignores = True/d' -i mypy.ini
- run: poetry run mypy
trial:
needs: check_repo
if: needs.check_repo.outputs.should_run_workflow == 'true'
runs-on: ubuntu-latest
strategy:
matrix:
@@ -121,8 +105,6 @@ jobs:
sytest:
needs: check_repo
if: needs.check_repo.outputs.should_run_workflow == 'true'
runs-on: ubuntu-latest
container:
image: matrixdotorg/sytest-synapse:testing
@@ -174,8 +156,7 @@ jobs:
complement:
needs: check_repo
if: "!failure() && !cancelled() && needs.check_repo.outputs.should_run_workflow == 'true'"
if: "${{ !failure() && !cancelled() }}"
runs-on: ubuntu-latest
strategy:
@@ -211,7 +192,7 @@ jobs:
# Open an issue if the build fails, so we know about it.
# Only do this if we're not experimenting with this action in a PR.
open-issue:
if: "failure() && github.event_name != 'push' && github.event_name != 'pull_request' && needs.check_repo.outputs.should_run_workflow == 'true'"
if: "failure() && github.event_name != 'push' && github.event_name != 'pull_request'"
needs:
# TODO: should mypy be included here? It feels more brittle than the others.
- mypy


@@ -35,7 +35,7 @@ jobs:
steps:
- uses: actions/checkout@v3
- name: Install Rust
uses: dtolnay/rust-toolchain@1.60.0
uses: dtolnay/rust-toolchain@1.58.1
- uses: Swatinem/rust-cache@v2
- uses: matrix-org/setup-python-poetry@v1
with:
@@ -45,16 +45,6 @@ jobs:
- run: poetry run scripts-dev/generate_sample_config.sh --check
- run: poetry run scripts-dev/config-lint.sh
check-schema-delta:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.x"
- run: "pip install 'click==8.1.1' 'GitPython>=3.1.20'"
- run: scripts-dev/check_schema_delta.py --force-colors
check-lockfile:
runs-on: ubuntu-latest
steps:
@@ -92,10 +82,6 @@ jobs:
- name: Checkout repository
uses: actions/checkout@v3
- name: Install Rust
uses: dtolnay/rust-toolchain@1.60.0
- uses: Swatinem/rust-cache@v2
- name: Setup Poetry
uses: matrix-org/setup-python-poetry@v1
with:
@@ -107,6 +93,10 @@ jobs:
# To make CI green, err towards caution and install the project.
install-project: "true"
- name: Install Rust
uses: dtolnay/rust-toolchain@1.58.1
- uses: Swatinem/rust-cache@v2
# Cribbed from
# https://github.com/AustinScola/mypy-cache-github-action/blob/85ea4f2972abed39b33bd02c36e341b28ca59213/src/restore.ts#L10-L17
- name: Restore/persist mypy's cache
@@ -150,7 +140,7 @@ jobs:
with:
ref: ${{ github.event.pull_request.head.sha }}
- name: Install Rust
uses: dtolnay/rust-toolchain@1.60.0
uses: dtolnay/rust-toolchain@1.58.1
- uses: Swatinem/rust-cache@v2
- uses: matrix-org/setup-python-poetry@v1
with:
@@ -167,7 +157,7 @@ jobs:
- uses: actions/checkout@v3
- name: Install Rust
uses: dtolnay/rust-toolchain@1.60.0
uses: dtolnay/rust-toolchain@1.58.1
with:
components: clippy
- uses: Swatinem/rust-cache@v2
@@ -221,7 +211,6 @@ jobs:
- lint-newsfile
- lint-pydantic
- check-sampleconfig
- check-schema-delta
- check-lockfile
- lint-clippy
- lint-rustfmt
@@ -268,7 +257,7 @@ jobs:
postgres:${{ matrix.job.postgres-version }}
- name: Install Rust
uses: dtolnay/rust-toolchain@1.60.0
uses: dtolnay/rust-toolchain@1.58.1
- uses: Swatinem/rust-cache@v2
- uses: matrix-org/setup-python-poetry@v1
@@ -308,7 +297,7 @@ jobs:
- uses: actions/checkout@v3
- name: Install Rust
uses: dtolnay/rust-toolchain@1.60.0
uses: dtolnay/rust-toolchain@1.58.1
- uses: Swatinem/rust-cache@v2
# There aren't wheels for some of the older deps, so we need to install
@@ -416,7 +405,7 @@ jobs:
run: cat sytest-blacklist .ci/worker-blacklist > synapse-blacklist-with-workers
- name: Install Rust
uses: dtolnay/rust-toolchain@1.60.0
uses: dtolnay/rust-toolchain@1.58.1
- uses: Swatinem/rust-cache@v2
- name: Run SyTest
@@ -556,7 +545,7 @@ jobs:
path: synapse
- name: Install Rust
uses: dtolnay/rust-toolchain@1.60.0
uses: dtolnay/rust-toolchain@1.58.1
- uses: Swatinem/rust-cache@v2
- uses: actions/setup-go@v4
@@ -584,7 +573,7 @@ jobs:
- uses: actions/checkout@v3
- name: Install Rust
uses: dtolnay/rust-toolchain@1.60.0
uses: dtolnay/rust-toolchain@1.58.1
- uses: Swatinem/rust-cache@v2
- run: cargo test
@@ -609,6 +598,16 @@ jobs:
- run: cargo bench --no-run
check-schema-delta:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.x"
- run: "pip install 'click==8.1.1' 'GitPython>=3.1.20'"
- run: scripts-dev/check_schema_delta.py --force-colors
# a job which marks all the other jobs as complete, thus allowing PRs to be merged.
tests-done:
if: ${{ always() }}


@@ -18,22 +18,7 @@ concurrency:
cancel-in-progress: true
jobs:
check_repo:
# Prevent this workflow from running on any fork of Synapse other than matrix-org/synapse, as it is
# only useful to the Synapse core team.
# All other workflow steps depend on this one, thus if 'should_run_workflow' is not 'true', the rest
# of the workflow will be skipped as well.
if: github.repository == 'matrix-org/synapse'
runs-on: ubuntu-latest
outputs:
should_run_workflow: ${{ steps.check_condition.outputs.should_run_workflow }}
steps:
- id: check_condition
run: echo "should_run_workflow=${{ github.repository == 'matrix-org/synapse' }}" >> "$GITHUB_OUTPUT"
mypy:
needs: check_repo
if: needs.check_repo.outputs.should_run_workflow == 'true'
runs-on: ubuntu-latest
steps:
@@ -56,8 +41,6 @@ jobs:
- run: poetry run mypy
trial:
needs: check_repo
if: needs.check_repo.outputs.should_run_workflow == 'true'
runs-on: ubuntu-latest
steps:
@@ -92,8 +75,6 @@ jobs:
|| true
sytest:
needs: check_repo
if: needs.check_repo.outputs.should_run_workflow == 'true'
runs-on: ubuntu-latest
container:
image: matrixdotorg/sytest-synapse:buster
@@ -138,8 +119,7 @@ jobs:
/logs/**/*.log*
complement:
needs: check_repo
if: "!failure() && !cancelled() && needs.check_repo.outputs.should_run_workflow == 'true'"
if: "${{ !failure() && !cancelled() }}"
runs-on: ubuntu-latest
strategy:
@@ -186,7 +166,7 @@ jobs:
# open an issue if the build fails, so we know about it.
open-issue:
if: failure() && needs.check_repo.outputs.should_run_workflow == 'true'
if: failure()
needs:
- mypy
- trial


@@ -1,44 +1,3 @@
Synapse 1.85.2 (2023-06-08)
===========================
Bugfixes
--------
- Fix regression where using TLS for HTTP replication between workers did not work. Introduced in v1.85.0. ([\#15746](https://github.com/matrix-org/synapse/issues/15746))
Synapse 1.85.1 (2023-06-07)
===========================
Note: this release only fixes a bug that stopped some deployments from upgrading to v1.85.0. There is no need to upgrade to v1.85.1 if successfully running v1.85.0.
Bugfixes
--------
- Fix bug in schema delta that broke upgrades for some deployments. Introduced in v1.85.0. ([\#15738](https://github.com/matrix-org/synapse/issues/15738), [\#15739](https://github.com/matrix-org/synapse/issues/15739))
Synapse 1.85.0 (2023-06-06)
===========================
No significant changes since 1.85.0rc2.
## Security advisory
The following issues are fixed in 1.85.0 (and RCs).
- [GHSA-26c5-ppr8-f33p](https://github.com/matrix-org/synapse/security/advisories/GHSA-26c5-ppr8-f33p) / [CVE-2023-32682](https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-32682) — Low Severity
It may be possible for a deactivated user to login when using uncommon configurations.
- [GHSA-98px-6486-j7qc](https://github.com/matrix-org/synapse/security/advisories/GHSA-98px-6486-j7qc) / [CVE-2023-32683](https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-32683) — Low Severity
A discovered oEmbed or image URL can bypass the `url_preview_url_blacklist` setting potentially allowing server side request forgery or bypassing network policies. Impact is limited to IP addresses allowed by the `url_preview_ip_range_blacklist` setting (by default this only allows public IPs).
See the advisories for more details. If you have any questions, email security@matrix.org.
Synapse 1.85.0rc2 (2023-06-01)
==============================

Cargo.lock (generated, 28 lines changed)

@@ -4,9 +4,9 @@ version = 3
[[package]]
name = "aho-corasick"
version = "1.0.2"
version = "0.7.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43f6cb1bf222025340178f382c426f13757b2960e89779dfcb319c32542a5a41"
checksum = "b4f55bd91a0978cbfd91c457a164bab8b4001c833b7f323132c0a4e1922dd44e"
dependencies = [
"memchr",
]
@@ -132,9 +132,9 @@ dependencies = [
[[package]]
name = "log"
version = "0.4.19"
version = "0.4.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4"
checksum = "518ef76f2f87365916b142844c16d8fefd85039bc5699050210a7778ee1cd1de"
[[package]]
name = "memchr"
@@ -229,9 +229,9 @@ dependencies = [
[[package]]
name = "pyo3-log"
version = "0.8.2"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c94ff6535a6bae58d7d0b85e60d4c53f7f84d0d0aa35d6a28c3f3e70bfe51444"
checksum = "f9c8b57fe71fb5dcf38970ebedc2b1531cf1c14b1b9b4c560a182a57e115575c"
dependencies = [
"arc-swap",
"log",
@@ -291,9 +291,9 @@ dependencies = [
[[package]]
name = "regex"
version = "1.8.4"
version = "1.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0ab3ca65655bb1e41f2a8c8cd662eb4fb035e67c3f78da1d61dffe89d07300f"
checksum = "8b1f693b24f6ac912f4893ef08244d70b6067480d2f1a46e950c9691e6749d1d"
dependencies = [
"aho-corasick",
"memchr",
@@ -302,9 +302,9 @@ dependencies = [
[[package]]
name = "regex-syntax"
version = "0.7.2"
version = "0.6.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "436b050e76ed2903236f032a59761c1eb99e1b0aead2c257922771dab1fc8c78"
checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
[[package]]
name = "ryu"
@@ -320,18 +320,18 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "serde"
version = "1.0.164"
version = "1.0.163"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e8c8cf938e98f769bc164923b06dce91cea1751522f46f8466461af04c9027d"
checksum = "2113ab51b87a539ae008b5c6c02dc020ffa39afd2d83cffcb3f4eb2722cebec2"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.164"
version = "1.0.163"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9735b638ccc51c28bf6914d90a2e9725b377144fc612c49a611fddd1b631d68"
checksum = "8c805777e3930c8883389c602315a24224bcc738b63905ef87cd1420353ea93e"
dependencies = [
"proc-macro2",
"quote",


@@ -1 +0,0 @@
Allow for the configuration of max request retries and min/max retry delays in the matrix federation client.


@@ -1 +0,0 @@
Replace `EventContext` fields `prev_group` and `delta_ids` with field `state_group_deltas`.


@@ -1 +0,0 @@
Enable support for [MSC3952](https://github.com/matrix-org/matrix-spec-proposals/pull/3952): intentional mentions.


@@ -1 +0,0 @@
Correctly clear caches when we delete a room.


@@ -1 +0,0 @@
Add support for tracing functions which return `Awaitable`s.


@@ -1 +0,0 @@
Check permissions for enabling encryption earlier during room creation to avoid creating broken rooms.


@@ -1 +0,0 @@
Update docstring and traces on `maybe_backfill()` functions.


@@ -1 +0,0 @@
Speed up `/messages` by backfilling in the background when there are no backward extremities where we are directly paginating.


@@ -1 +0,0 @@
Add context for when/why to use the `long_retries` option when sending Federation requests.


@@ -1 +0,0 @@
Removed some unused fields.

changelog.d/15724.bugfix (new file)

@@ -0,0 +1 @@
Fix missing dependencies in background jobs.


@@ -1 +0,0 @@
Update federation error to more plainly explain we can only authorize our own membership events.


@@ -1 +0,0 @@
Prevent the `latest_deps` and `twisted_trunk` daily GitHub Actions workflows from running on forks of the codebase.


@@ -1 +0,0 @@
Improve performance of user directory search.


@@ -1 +0,0 @@
Remove redundant table join with `room_memberships` when doing a `is_host_joined()`/`is_host_invited()` call (`membership` is already part of the `current_state_events`).


@@ -1 +0,0 @@
Simplify query to find participating servers in a room.


@@ -1 +0,0 @@
Remove superfluous `room_memberships` join from background update.


@@ -1 +0,0 @@
Improve `/messages` response time by avoiding backfill when we already have messages to return.


@@ -1 +0,0 @@
Expose a metric reporting the database background update status.


@@ -1 +0,0 @@
Speed up typechecking CI.


@@ -1 +0,0 @@
Fix requesting multiple keys at once over federation, related to [MSC3983](https://github.com/matrix-org/matrix-spec-proposals/pull/3983).


@@ -1 +0,0 @@
Bump minimum supported Rust version to 1.60.0.


@@ -1 +0,0 @@
Fix requesting multiple keys at once over federation, related to [MSC3983](https://github.com/matrix-org/matrix-spec-proposals/pull/3983).


@@ -1 +0,0 @@
Use parse_duration for federation client timeout and retry options.

debian/changelog (vendored, 18 lines changed)

@@ -1,21 +1,3 @@
matrix-synapse-py3 (1.85.2) stable; urgency=medium
* New Synapse release 1.85.2.
-- Synapse Packaging team <packages@matrix.org> Thu, 08 Jun 2023 13:04:18 +0100
matrix-synapse-py3 (1.85.1) stable; urgency=medium
* New Synapse release 1.85.1.
-- Synapse Packaging team <packages@matrix.org> Wed, 07 Jun 2023 10:51:12 +0100
matrix-synapse-py3 (1.85.0) stable; urgency=medium
* New Synapse release 1.85.0.
-- Synapse Packaging team <packages@matrix.org> Tue, 06 Jun 2023 09:39:29 +0100
matrix-synapse-py3 (1.85.0~rc2) stable; urgency=medium
* New Synapse release 1.85.0rc2.


@@ -21,7 +21,7 @@ FROM docker.io/library/debian:bullseye-slim AS deps_base
# which makes it much easier to copy (but we need to make sure we use an image
# based on the same debian version as the synapse image, to make sure we get
# the expected version of libc).
FROM docker.io/library/redis:7-bullseye AS redis_base
FROM docker.io/library/redis:6-bullseye AS redis_base
# now build the final image, based on the regular Synapse docker image
FROM $FROM


@@ -87,14 +87,6 @@ process, for example:
wget https://packages.matrix.org/debian/pool/main/m/matrix-synapse-py3/matrix-synapse-py3_1.3.0+stretch1_amd64.deb
dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
```
# Upgrading to v1.86.0
## Minimum supported Rust version
The minimum supported Rust version has been increased from v1.58.1 to v1.60.0.
Users building from source will need to ensure their `rustc` version is up to
date.
# Upgrading to v1.85.0


@@ -27,8 +27,9 @@ What servers are currently participating in this room?
Run this sql query on your db:
```sql
SELECT DISTINCT split_part(state_key, ':', 2)
FROM current_state_events
WHERE room_id = '!cURbafjkfsMDVwdRDQ:matrix.org' AND membership = 'join';
FROM current_state_events AS c
INNER JOIN room_memberships AS m USING (room_id, event_id)
WHERE room_id = '!cURbafjkfsMDVwdRDQ:matrix.org' AND membership = 'join';
```
What users are registered on my server?


@@ -1196,32 +1196,6 @@ Example configuration:
allow_device_name_lookup_over_federation: true
```
---
### `federation`
The federation section defines some sub-options related to federation.
The following options configure timeout and retry logic for a single request,
independently of the others.
The short retry algorithm is used when something or someone is waiting for the request
to complete, while the long retry algorithm is used for requests that happen in the
background, like sending a federation transaction.
* `client_timeout`: timeout for the federation requests in seconds. Defaults to 60s.
* `max_short_retry_delay`: maximum delay to be used for the short retry algo in seconds. Defaults to 2s.
* `max_long_retry_delay`: maximum delay to be used for the long retry algo in seconds. Defaults to 60s.
* `max_short_retries`: maximum number of retries for the short retry algo. Defaults to 3 attempts.
* `max_long_retries`: maximum number of retries for the long retry algo. Defaults to 10 attempts.
Example configuration:
```yaml
federation:
client_timeout: 180
max_short_retry_delay: 7
max_long_retry_delay: 100
max_short_retries: 5
max_long_retries: 20
```
---
## Caching
Options related to caching.

poetry.lock (generated, 287 lines changed): diff suppressed because it is too large.


@@ -89,7 +89,7 @@ manifest-path = "rust/Cargo.toml"
[tool.poetry]
name = "matrix-synapse"
version = "1.85.2"
version = "1.85.0rc2"
description = "Homeserver for the Matrix decentralised comms protocol"
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
license = "Apache-2.0"


@@ -7,7 +7,7 @@ name = "synapse"
version = "0.1.0"
edition = "2021"
rust-version = "1.60.0"
rust-version = "1.58.1"
[lib]
name = "synapse"


@@ -13,6 +13,8 @@
// limitations under the License.
#![feature(test)]
use std::collections::BTreeSet;
use synapse::push::{
evaluator::PushRuleEvaluator, Condition, EventMatchCondition, FilteredPushRules, JsonValue,
PushRules, SimpleJsonValue,
@@ -195,6 +197,7 @@ fn bench_eval_message(b: &mut Bencher) {
false,
false,
false,
false,
);
b.iter(|| eval.run(&rules, Some("bob"), Some("person")));


@@ -142,11 +142,11 @@ pub const BASE_APPEND_OVERRIDE_RULES: &[PushRule] = &[
default_enabled: true,
},
PushRule {
rule_id: Cow::Borrowed("global/override/.m.is_user_mention"),
rule_id: Cow::Borrowed(".org.matrix.msc3952.is_user_mention"),
priority_class: 5,
conditions: Cow::Borrowed(&[Condition::Known(
KnownCondition::ExactEventPropertyContainsType(EventPropertyIsTypeCondition {
key: Cow::Borrowed("content.m\\.mentions.user_ids"),
key: Cow::Borrowed("content.org\\.matrix\\.msc3952\\.mentions.user_ids"),
value_type: Cow::Borrowed(&EventMatchPatternType::UserId),
}),
)]),
@@ -163,11 +163,11 @@ pub const BASE_APPEND_OVERRIDE_RULES: &[PushRule] = &[
default_enabled: true,
},
PushRule {
rule_id: Cow::Borrowed("global/override/.m.is_room_mention"),
rule_id: Cow::Borrowed(".org.matrix.msc3952.is_room_mention"),
priority_class: 5,
conditions: Cow::Borrowed(&[
Condition::Known(KnownCondition::EventPropertyIs(EventPropertyIsCondition {
key: Cow::Borrowed("content.m\\.mentions.room"),
key: Cow::Borrowed("content.org\\.matrix\\.msc3952\\.mentions.room"),
value: Cow::Borrowed(&SimpleJsonValue::Bool(true)),
})),
Condition::Known(KnownCondition::SenderNotificationPermission {


@@ -70,9 +70,7 @@ pub struct PushRuleEvaluator {
/// The "content.body", if any.
body: String,
/// True if the event has a m.mentions property. (Note that this is a separate
/// flag instead of checking flattened_keys since the m.mentions property
/// might be an empty map and not appear in flattened_keys.)
/// True if the event has a mentions property and MSC3952 support is enabled.
has_mentions: bool,
/// The number of users in the room.
@@ -157,7 +155,9 @@ impl PushRuleEvaluator {
let rule_id = &push_rule.rule_id().to_string();
// For backwards-compatibility the legacy mention rules are disabled
// if the event contains the 'm.mentions' property.
// if the event contains the 'm.mentions' property (and if the
// experimental feature is enabled, both of these are represented
// by the has_mentions flag).
if self.has_mentions
&& (rule_id == "global/override/.m.rule.contains_display_name"
|| rule_id == "global/content/.m.rule.contains_user_name"
@@ -562,7 +562,7 @@ fn test_requires_room_version_supports_condition() {
};
let rules = PushRules::new(vec![custom_rule]);
result = evaluator.run(
&FilteredPushRules::py_new(rules, BTreeMap::new(), true, false, true, false),
&FilteredPushRules::py_new(rules, BTreeMap::new(), true, false, true, false, false),
None,
None,
);


@@ -527,6 +527,7 @@ pub struct FilteredPushRules {
msc1767_enabled: bool,
msc3381_polls_enabled: bool,
msc3664_enabled: bool,
msc3952_intentional_mentions: bool,
msc3958_suppress_edits_enabled: bool,
}
@@ -539,6 +540,7 @@ impl FilteredPushRules {
msc1767_enabled: bool,
msc3381_polls_enabled: bool,
msc3664_enabled: bool,
msc3952_intentional_mentions: bool,
msc3958_suppress_edits_enabled: bool,
) -> Self {
Self {
@@ -547,6 +549,7 @@ impl FilteredPushRules {
msc1767_enabled,
msc3381_polls_enabled,
msc3664_enabled,
msc3952_intentional_mentions,
msc3958_suppress_edits_enabled,
}
}
@@ -584,6 +587,10 @@ impl FilteredPushRules {
return false;
}
if !self.msc3952_intentional_mentions && rule.rule_id.contains("org.matrix.msc3952")
{
return false;
}
if !self.msc3958_suppress_edits_enabled
&& rule.rule_id == "global/override/.com.beeper.suppress_edits"
{


@@ -46,6 +46,7 @@ class FilteredPushRules:
msc1767_enabled: bool,
msc3381_polls_enabled: bool,
msc3664_enabled: bool,
msc3952_intentional_mentions: bool,
msc3958_suppress_edits_enabled: bool,
): ...
def rules(self) -> Collection[Tuple[PushRule, bool]]: ...


@@ -236,7 +236,7 @@ class EventContentFields:
AUTHORISING_USER: Final = "join_authorised_via_users_server"
# Use for mentioning users.
MENTIONS: Final = "m.mentions"
MSC3952_MENTIONS: Final = "org.matrix.msc3952.mentions"
# an unspecced field added to to-device messages to identify them uniquely-ish
TO_DEVICE_MSGID: Final = "org.matrix.msgid"


@@ -358,6 +358,11 @@ class ExperimentalConfig(Config):
# MSC3391: Removing account data.
self.msc3391_enabled = experimental.get("msc3391_enabled", False)
# MSC3952: Intentional mentions, this depends on MSC3966.
self.msc3952_intentional_mentions = experimental.get(
"msc3952_intentional_mentions", False
)
# MSC3958: Do not generate notifications for edits.
self.msc3958_supress_edit_notifs = experimental.get(
"msc3958_supress_edit_notifs", False


@@ -22,8 +22,6 @@ class FederationConfig(Config):
section = "federation"
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
federation_config = config.setdefault("federation", {})
# FIXME: federation_domain_whitelist needs sytests
self.federation_domain_whitelist: Optional[dict] = None
federation_domain_whitelist = config.get("federation_domain_whitelist", None)
@@ -51,19 +49,5 @@ class FederationConfig(Config):
"allow_device_name_lookup_over_federation", False
)
# Allow for the configuration of timeout, max request retries
# and min/max retry delays in the matrix federation client.
self.client_timeout = Config.parse_duration(
federation_config.get("client_timeout", "60s")
)
self.max_long_retry_delay = Config.parse_duration(
federation_config.get("max_long_retry_delay", "60s")
)
self.max_short_retry_delay = Config.parse_duration(
federation_config.get("max_short_retry_delay", "2s")
)
self.max_long_retries = federation_config.get("max_long_retries", 10)
self.max_short_retries = federation_config.get("max_short_retries", 3)
_METRICS_FOR_DOMAINS_SCHEMA = {"type": "array", "items": {"type": "string"}}
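For reference, `Config.parse_duration` (used above, and called out in the "Use parse_duration for federation client timeout and retry options" changelog entry) returns milliseconds, which is why the HTTP client later divides by 1000. The following is a hypothetical, simplified stand-in to illustrate the accepted inputs; it is not Synapse's actual parser:

```python
def parse_duration(value):
    """Hypothetical, simplified stand-in: returns milliseconds."""
    if isinstance(value, (int, float)):
        return int(value)
    units = {"ms": 1, "s": 1000, "m": 60_000, "h": 3_600_000, "d": 86_400_000}
    # Check longer suffixes first so "ms" is not mistaken for "s".
    for suffix in sorted(units, key=len, reverse=True):
        if value.endswith(suffix):
            return int(value[: -len(suffix)]) * units[suffix]
    return int(value)

assert parse_duration("60s") == 60_000  # default client_timeout above
assert parse_duration("2s") == 2_000    # default max_short_retry_delay
```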


@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
from typing import TYPE_CHECKING, List, Optional, Tuple
import attr
from immutabledict import immutabledict
@@ -107,32 +107,33 @@ class EventContext(UnpersistedEventContextBase):
state_delta_due_to_event: If `state_group` and `state_group_before_event` are not None
then this is the delta of the state between the two groups.
state_group_deltas: If not empty, this is a dict collecting a mapping of the state
difference between state groups.
prev_group: If it is known, ``state_group``'s prev_group. Note that this being
None does not necessarily mean that ``state_group`` does not have
a prev_group!
The keys are a tuple of two integers: the initial group and final state group.
The corresponding value is a state map representing the state delta between
these state groups.
If the event is a state event, this is normally the same as
``state_group_before_event``.
The dictionary is expected to have at most two entries with state groups of:
If ``state_group`` is None (ie, the event is an outlier), ``prev_group``
will always also be ``None``.
1. The state group before the event and after the event.
2. The state group preceding the state group before the event and the
state group before the event.
Note that this is *not* (necessarily) the state group associated with
``_prev_state_ids``.
This information is collected and stored as part of an optimization for persisting
events.
delta_ids: If ``prev_group`` is not None, the state delta between ``prev_group``
and ``state_group``.
partial_state: if True, we may be storing this event with a temporary,
incomplete state.
"""
_storage: "StorageControllers"
state_group_deltas: Dict[Tuple[int, int], StateMap[str]]
rejected: Optional[str] = None
_state_group: Optional[int] = None
state_group_before_event: Optional[int] = None
_state_delta_due_to_event: Optional[StateMap[str]] = None
prev_group: Optional[int] = None
delta_ids: Optional[StateMap[str]] = None
app_service: Optional[ApplicationService] = None
partial_state: bool = False
@@ -144,14 +145,16 @@ class EventContext(UnpersistedEventContextBase):
state_group_before_event: Optional[int],
state_delta_due_to_event: Optional[StateMap[str]],
partial_state: bool,
state_group_deltas: Dict[Tuple[int, int], StateMap[str]],
prev_group: Optional[int] = None,
delta_ids: Optional[StateMap[str]] = None,
) -> "EventContext":
return EventContext(
storage=storage,
state_group=state_group,
state_group_before_event=state_group_before_event,
state_delta_due_to_event=state_delta_due_to_event,
state_group_deltas=state_group_deltas,
prev_group=prev_group,
delta_ids=delta_ids,
partial_state=partial_state,
)
@@ -160,7 +163,7 @@ class EventContext(UnpersistedEventContextBase):
storage: "StorageControllers",
) -> "EventContext":
"""Return an EventContext instance suitable for persisting an outlier event"""
return EventContext(storage=storage, state_group_deltas={})
return EventContext(storage=storage)
async def persist(self, event: EventBase) -> "EventContext":
return self
@@ -180,15 +183,13 @@ class EventContext(UnpersistedEventContextBase):
"state_group": self._state_group,
"state_group_before_event": self.state_group_before_event,
"rejected": self.rejected,
"state_group_deltas": _encode_state_group_delta(self.state_group_deltas),
"prev_group": self.prev_group,
"state_delta_due_to_event": _encode_state_dict(
self._state_delta_due_to_event
),
"delta_ids": _encode_state_dict(self.delta_ids),
"app_service_id": self.app_service.id if self.app_service else None,
"partial_state": self.partial_state,
# add dummy delta_ids and prev_group for backwards compatibility
"delta_ids": None,
"prev_group": None,
}
@staticmethod
@@ -203,24 +204,17 @@ class EventContext(UnpersistedEventContextBase):
Returns:
The event context.
"""
# workaround for backwards/forwards compatibility: if the input doesn't have a value
# for "state_group_deltas" just assign an empty dict
state_group_deltas = input.get("state_group_deltas", None)
if state_group_deltas:
state_group_deltas = _decode_state_group_delta(state_group_deltas)
else:
state_group_deltas = {}
context = EventContext(
# We use the state_group and prev_state_id stuff to pull the
# current_state_ids out of the DB and construct prev_state_ids.
storage=storage,
state_group=input["state_group"],
state_group_before_event=input["state_group_before_event"],
state_group_deltas=state_group_deltas,
prev_group=input["prev_group"],
state_delta_due_to_event=_decode_state_dict(
input["state_delta_due_to_event"]
),
delta_ids=_decode_state_dict(input["delta_ids"]),
rejected=input["rejected"],
partial_state=input.get("partial_state", False),
)
@@ -355,7 +349,7 @@ class UnpersistedEventContext(UnpersistedEventContextBase):
_storage: "StorageControllers"
state_group_before_event: Optional[int]
state_group_after_event: Optional[int]
state_delta_due_to_event: Optional[StateMap[str]]
state_delta_due_to_event: Optional[dict]
prev_group_for_state_group_before_event: Optional[int]
delta_ids_to_state_group_before_event: Optional[StateMap[str]]
partial_state: bool
@@ -386,16 +380,26 @@ class UnpersistedEventContext(UnpersistedEventContextBase):
events_and_persisted_context = []
for event, unpersisted_context in amended_events_and_context:
state_group_deltas = unpersisted_context._build_state_group_deltas()
context = EventContext(
storage=unpersisted_context._storage,
state_group=unpersisted_context.state_group_after_event,
state_group_before_event=unpersisted_context.state_group_before_event,
state_delta_due_to_event=unpersisted_context.state_delta_due_to_event,
partial_state=unpersisted_context.partial_state,
state_group_deltas=state_group_deltas,
)
if event.is_state():
context = EventContext(
storage=unpersisted_context._storage,
state_group=unpersisted_context.state_group_after_event,
state_group_before_event=unpersisted_context.state_group_before_event,
state_delta_due_to_event=unpersisted_context.state_delta_due_to_event,
partial_state=unpersisted_context.partial_state,
prev_group=unpersisted_context.state_group_before_event,
delta_ids=unpersisted_context.state_delta_due_to_event,
)
else:
context = EventContext(
storage=unpersisted_context._storage,
state_group=unpersisted_context.state_group_after_event,
state_group_before_event=unpersisted_context.state_group_before_event,
state_delta_due_to_event=unpersisted_context.state_delta_due_to_event,
partial_state=unpersisted_context.partial_state,
prev_group=unpersisted_context.prev_group_for_state_group_before_event,
delta_ids=unpersisted_context.delta_ids_to_state_group_before_event,
)
events_and_persisted_context.append((event, context))
return events_and_persisted_context
@@ -448,11 +452,11 @@ class UnpersistedEventContext(UnpersistedEventContextBase):
# if the event isn't a state event the state group doesn't change
if not self.state_delta_due_to_event:
self.state_group_after_event = self.state_group_before_event
state_group_after_event = self.state_group_before_event
# otherwise if it is a state event we need to get a state group for it
else:
self.state_group_after_event = await self._storage.state.store_state_group(
state_group_after_event = await self._storage.state.store_state_group(
event.event_id,
event.room_id,
prev_group=self.state_group_before_event,
@@ -460,81 +464,16 @@ class UnpersistedEventContext(UnpersistedEventContextBase):
current_state_ids=None,
)
state_group_deltas = self._build_state_group_deltas()
return EventContext.with_state(
storage=self._storage,
state_group=self.state_group_after_event,
state_group=state_group_after_event,
state_group_before_event=self.state_group_before_event,
state_delta_due_to_event=self.state_delta_due_to_event,
state_group_deltas=state_group_deltas,
partial_state=self.partial_state,
prev_group=self.state_group_before_event,
delta_ids=self.state_delta_due_to_event,
)
def _build_state_group_deltas(self) -> Dict[Tuple[int, int], StateMap]:
"""
Collect deltas between the state groups associated with this context
"""
state_group_deltas = {}
# if we know the state group before the event and after the event, add them and the
# state delta between them to state_group_deltas
if self.state_group_before_event and self.state_group_after_event:
# if we have the state groups we should have the delta
assert self.state_delta_due_to_event is not None
state_group_deltas[
(
self.state_group_before_event,
self.state_group_after_event,
)
] = self.state_delta_due_to_event
# the state group before the event may also have a state group which precedes it, if
# we have that and the state group before the event, add them and the state
# delta between them to state_group_deltas
if (
self.prev_group_for_state_group_before_event
and self.state_group_before_event
):
# if we have both state groups we should have the delta between them
assert self.delta_ids_to_state_group_before_event is not None
state_group_deltas[
(
self.prev_group_for_state_group_before_event,
self.state_group_before_event,
)
] = self.delta_ids_to_state_group_before_event
return state_group_deltas
def _encode_state_group_delta(
state_group_delta: Dict[Tuple[int, int], StateMap[str]]
) -> List[Tuple[int, int, Optional[List[Tuple[str, str, str]]]]]:
if not state_group_delta:
return []
state_group_delta_encoded = []
for key, value in state_group_delta.items():
state_group_delta_encoded.append((key[0], key[1], _encode_state_dict(value)))
return state_group_delta_encoded
def _decode_state_group_delta(
input: List[Tuple[int, int, List[Tuple[str, str, str]]]]
) -> Dict[Tuple[int, int], StateMap[str]]:
if not input:
return {}
state_group_deltas = {}
for state_group_1, state_group_2, state_dict in input:
state_map = _decode_state_dict(state_dict)
assert state_map is not None
state_group_deltas[(state_group_1, state_group_2)] = state_map
return state_group_deltas
def _encode_state_dict(
state_dict: Optional[StateMap[str]],
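To make the shape of `state_group_deltas` concrete, here is a self-contained distillation of the `_encode_state_group_delta`/`_decode_state_group_delta` pair above, with plain dicts standing in for `StateMap` and made-up state groups and event IDs:

```python
from typing import Dict, List, Tuple

StateMap = Dict[Tuple[str, str], str]  # (event type, state_key) -> event_id

def encode_state_group_delta(
    deltas: Dict[Tuple[int, int], StateMap]
) -> List[Tuple[int, int, List[Tuple[str, str, str]]]]:
    # Tuple-keyed dicts are not JSON-serialisable, so flatten to lists.
    return [
        (prev, group, [(t, s, e) for (t, s), e in delta.items()])
        for (prev, group), delta in deltas.items()
    ]

def decode_state_group_delta(
    encoded: List[Tuple[int, int, List[Tuple[str, str, str]]]]
) -> Dict[Tuple[int, int], StateMap]:
    return {(g1, g2): {(t, s): e for t, s, e in sd} for g1, g2, sd in encoded}

deltas = {(101, 102): {("m.room.member", "@alice:example.org"): "$membership_event"}}
assert decode_state_group_delta(encode_state_group_delta(deltas)) == deltas
```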


@@ -134,8 +134,13 @@ class EventValidator:
)
# If the event contains a mentions key, validate it.
if EventContentFields.MENTIONS in event.content:
validate_json_object(event.content[EventContentFields.MENTIONS], Mentions)
if (
EventContentFields.MSC3952_MENTIONS in event.content
and config.experimental.msc3952_intentional_mentions
):
validate_json_object(
event.content[EventContentFields.MSC3952_MENTIONS], Mentions
)
def _validate_retention(self, event: EventBase) -> None:
"""Checks that an event that defines the retention policy for a room respects the


@@ -260,9 +260,7 @@ class FederationClient(FederationBase):
use_unstable = False
for user_id, one_time_keys in query.items():
for device_id, algorithms in one_time_keys.items():
# If more than one algorithm is requested, attempt to use the unstable
# endpoint.
if sum(algorithms.values()) > 1:
if any(count > 1 for count in algorithms.values()):
use_unstable = True
if algorithms:
# For the stable query, choose only the first algorithm.
@@ -298,7 +296,6 @@ class FederationClient(FederationBase):
else:
logger.debug("Skipping unstable claim client keys API")
# TODO Potentially attempt multiple queries and combine the results?
return await self.transport_layer.claim_client_keys(
user, destination, content, timeout
)
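The two conditions above are not equivalent: `sum(algorithms.values()) > 1` also triggers when a device requests one key each of two different algorithms, while `any(count > 1 ...)` only triggers when more than one key of a single algorithm is requested. A minimal sketch with a made-up query (the algorithm names are illustrative only):

```python
query = {
    "@alice:example.org": {
        "DEVICE1": {"signed_curve25519": 1, "signed_curve25519.v2": 1},
    },
}

for user_id, one_time_keys in query.items():
    for device_id, algorithms in one_time_keys.items():
        # More than one key requested in total (two algorithms, one key each):
        print(sum(algorithms.values()) > 1)                     # True
        # More than one key of any *single* algorithm:
        print(any(count > 1 for count in algorithms.values()))  # False
```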


@@ -944,7 +944,7 @@ class FederationServer(FederationBase):
if not self._is_mine_server_name(authorising_server):
raise SynapseError(
400,
f"Cannot authorise membership event for {authorising_server}. We can only authorise requests from our own homeserver",
f"Cannot authorise request from resident server: {authorising_server}",
)
event.signatures.update(
@@ -1016,9 +1016,7 @@ class FederationServer(FederationBase):
for user_id, device_keys in result.items():
for device_id, keys in device_keys.items():
for key_id, key in keys.items():
json_result.setdefault(user_id, {}).setdefault(device_id, {})[
key_id
] = key
json_result.setdefault(user_id, {})[device_id] = {key_id: key}
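The nested-`setdefault` form being removed accumulates every claimed key for a device, whereas the restored plain assignment keeps only the last `key_id` seen. A small sketch with invented keys:

```python
result = {"@bob:example.org": {"DEV": {"alg1:aaa": "key1", "alg2:bbb": "key2"}}}

merged: dict = {}
replaced: dict = {}
for user_id, device_keys in result.items():
    for device_id, keys in device_keys.items():
        for key_id, key in keys.items():
            # Nested setdefault accumulates every key_id for the device...
            merged.setdefault(user_id, {}).setdefault(device_id, {})[key_id] = key
            # ...while plain assignment keeps only the last key_id seen.
            replaced.setdefault(user_id, {})[device_id] = {key_id: key}

print(merged["@bob:example.org"]["DEV"])    # {'alg1:aaa': 'key1', 'alg2:bbb': 'key2'}
print(replaced["@bob:example.org"]["DEV"])  # {'alg2:bbb': 'key2'}
```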
logger.info(
"Claimed one-time-keys: %s",


@@ -200,7 +200,6 @@ class FederationHandler:
)
@trace
@tag_args
async def maybe_backfill(
self, room_id: str, current_depth: int, limit: int
) -> bool:
@@ -215,9 +214,6 @@ class FederationHandler:
limit: The number of events that the pagination request will
return. This is used as part of the heuristic to decide if we
should back paginate.
Returns:
True if we actually tried to backfill something, otherwise False.
"""
# Starting the processing time here so we can include the room backfill
# linearizer lock queue in the timing
@@ -231,8 +227,6 @@ class FederationHandler:
processing_start_time=processing_start_time,
)
@trace
@tag_args
async def _maybe_backfill_inner(
self,
room_id: str,
@@ -253,9 +247,6 @@ class FederationHandler:
limit: The max number of events to request from the remote federated server.
processing_start_time: The time when `maybe_backfill` started processing.
Only used for timing. If `None`, no timing observation will be made.
Returns:
True if we actually tried to backfill something, otherwise False.
"""
backwards_extremities = [
_BackfillPoint(event_id, depth, _BackfillPointType.BACKWARDS_EXTREMITY)
@@ -311,30 +302,15 @@ class FederationHandler:
len(sorted_backfill_points),
sorted_backfill_points,
)
set_tag(
SynapseTags.RESULT_PREFIX + "sorted_backfill_points",
str(sorted_backfill_points),
)
set_tag(
SynapseTags.RESULT_PREFIX + "sorted_backfill_points.length",
str(len(sorted_backfill_points)),
)
# If we have no backfill points lower than the `current_depth` then either we
# can a) bail or b) still attempt to backfill. We opt to try backfilling anyway
# just in case we do get relevant events. This is good for eventual consistency's
# sake but we don't need to block the client for something that is just as
# likely not to return anything relevant, so we backfill in the background. The
# only way this could return something relevant is if we discover a new branch
# of history that extends all the way back to where we are currently paginating
# and it's within the 100 events that are returned from `/backfill`.
# If we have no backfill points lower than the `current_depth` then
# either we can a) bail or b) still attempt to backfill. We opt to try
# backfilling anyway just in case we do get relevant events.
if not sorted_backfill_points and current_depth != MAX_DEPTH:
logger.debug(
"_maybe_backfill_inner: all backfill points are *after* current depth. Trying again with later backfill points."
)
run_as_background_process(
"_maybe_backfill_inner_anyway_with_max_depth",
self._maybe_backfill_inner,
return await self._maybe_backfill_inner(
room_id=room_id,
# We use `MAX_DEPTH` so that we find all backfill points next
# time (all events are below the `MAX_DEPTH`)
@@ -345,9 +321,6 @@ class FederationHandler:
# overall otherwise the smaller one will throw off the results.
processing_start_time=None,
)
# We return `False` because we're backfilling in the background and there is
# no new events immediately for the caller to know about yet.
return False
# Even after recursing with `MAX_DEPTH`, we didn't find any
# backward extremities to backfill from.
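The hunk above swaps a fire-and-forget `run_as_background_process` call (which returns `False` to the caller immediately) back to awaiting `_maybe_backfill_inner` inline. A toy sketch of the background-process shape, with asyncio standing in for Synapse's helper (which also handles logging contexts and metrics):

```python
import asyncio

async def _maybe_backfill_inner(room_id: str, current_depth: int) -> bool:
    await asyncio.sleep(0)  # pretend to fetch history from remote servers
    return True

async def maybe_backfill(room_id: str, current_depth: int) -> bool:
    # Fire-and-forget: kick the expensive work off in the background...
    asyncio.get_running_loop().create_task(
        _maybe_backfill_inner(room_id, current_depth)
    )
    # ...and tell the caller nothing new is available *yet*.
    return False

print(asyncio.run(maybe_backfill("!room:example.org", 100)))  # False
```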


@@ -40,11 +40,6 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
# How many single event gaps we tolerate returning in a `/messages` response before we
# backfill and try to fill in the history. This is an arbitrarily picked number so feel
# free to tune it in the future.
BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD = 3
@attr.s(slots=True, auto_attribs=True)
class PurgeStatus:
@@ -491,35 +486,35 @@ class PaginationHandler:
room_id, room_token.stream
)
# If they have left the room then clamp the token to be before
# they left the room, to save the effort of loading from the
# database.
if (
pagin_config.direction == Direction.BACKWARDS
and not use_admin_priviledge
and membership == Membership.LEAVE
):
# This is only None if the room is world_readable, in which case
# "Membership.JOIN" would have been returned and we should never hit
# this branch.
assert member_event_id
if not use_admin_priviledge and membership == Membership.LEAVE:
# If they have left the room then clamp the token to be before
# they left the room, to save the effort of loading from the
# database.
leave_token = await self.store.get_topological_token_for_event(
member_event_id
)
assert leave_token.topological is not None
# This is only None if the room is world_readable, in which
# case "JOIN" would have been returned.
assert member_event_id
if leave_token.topological < curr_topo:
from_token = from_token.copy_and_replace(
StreamKeyType.ROOM, leave_token
leave_token = await self.store.get_topological_token_for_event(
member_event_id
)
assert leave_token.topological is not None
if leave_token.topological < curr_topo:
from_token = from_token.copy_and_replace(
StreamKeyType.ROOM, leave_token
)
await self.hs.get_federation_handler().maybe_backfill(
room_id,
curr_topo,
limit=pagin_config.limit,
)
to_room_key = None
if pagin_config.to_token:
to_room_key = pagin_config.to_token.room_key
# Initially fetch the events from the database. With any luck, we can return
# these without blocking on backfill (handled below).
events, next_key = await self.store.paginate_room_events(
room_id=room_id,
from_key=from_token.room_key,
@@ -529,94 +524,6 @@ class PaginationHandler:
event_filter=event_filter,
)
if pagin_config.direction == Direction.BACKWARDS:
# We use a `Set` because there can be multiple events at a given depth
# and we only care about looking at the unique continuum of depths to
# find gaps.
event_depths: Set[int] = {event.depth for event in events}
sorted_event_depths = sorted(event_depths)
# Inspect the depths of the returned events to see if there are any gaps
found_big_gap = False
number_of_gaps = 0
previous_event_depth = (
sorted_event_depths[0] if len(sorted_event_depths) > 0 else 0
)
for event_depth in sorted_event_depths:
# We don't expect a negative depth but we'll just deal with it in
# any case by taking the absolute value to get the true gap between
# any two integers.
depth_gap = abs(event_depth - previous_event_depth)
# A `depth_gap` of 1 is a normal continuous chain to the next event
# (1 <-- 2 <-- 3) so anything larger indicates a missing event (it's
# also possible there is no event at a given depth but we can't ever
# know that for sure)
if depth_gap > 1:
number_of_gaps += 1
# We only tolerate a small number of single-event-long gaps in the
# returned events because those are most likely just events we've
# failed to pull in the past. Anything longer than that is probably
# a sign that we're missing a decent chunk of history and we should
# try to backfill it.
#
# XXX: It's possible we could tolerate longer gaps if we checked
# that a given event's `prev_events` is one that has failed pull
# attempts and we could just treat it like a dead branch of history
# for now, or at least something that we don't need to block the
# client on to try pulling.
#
# XXX: If we had something like MSC3871 to indicate gaps in the
# timeline to the client, we could also get away with any sized gap
# and just have the client refetch the holes as they see fit.
if depth_gap > 2:
found_big_gap = True
break
previous_event_depth = event_depth
# Backfill in the foreground if we found a big gap, have too many holes,
# or we don't have enough events to fill the limit that the client asked
# for.
missing_too_many_events = (
number_of_gaps > BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD
)
not_enough_events_to_fill_response = len(events) < pagin_config.limit
if (
found_big_gap
or missing_too_many_events
or not_enough_events_to_fill_response
):
did_backfill = (
await self.hs.get_federation_handler().maybe_backfill(
room_id,
curr_topo,
limit=pagin_config.limit,
)
)
# If we did backfill something, refetch the events from the database to
# catch anything new that might have been added since we last fetched.
if did_backfill:
events, next_key = await self.store.paginate_room_events(
room_id=room_id,
from_key=from_token.room_key,
to_key=to_room_key,
direction=pagin_config.direction,
limit=pagin_config.limit,
event_filter=event_filter,
)
else:
# Otherwise, we can backfill in the background for eventual
# consistency's sake but we don't need to block the client waiting
# for a costly federation call and processing.
run_as_background_process(
"maybe_backfill_in_the_background",
self.hs.get_federation_handler().maybe_backfill,
room_id,
curr_topo,
limit=pagin_config.limit,
)
next_token = from_token.copy_and_replace(StreamKeyType.ROOM, next_key)
# if no events are returned from pagination, that implies


@@ -648,6 +648,7 @@ class PresenceHandler(BasePresenceHandler):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.hs = hs
self.server_name = hs.hostname
self.wheel_timer: WheelTimer[str] = WheelTimer()
self.notifier = hs.get_notifier()
self._presence_enabled = hs.config.server.use_presence


@@ -27,6 +27,7 @@ logger = logging.getLogger(__name__)
class ReadMarkerHandler:
def __init__(self, hs: "HomeServer"):
self.server_name = hs.config.server.server_name
self.store = hs.get_datastores().main
self.account_data_handler = hs.get_account_data_handler()
self.read_marker_linearizer = Linearizer(name="read_marker")


@@ -872,8 +872,6 @@ class RoomCreationHandler:
visibility = config.get("visibility", "private")
is_public = visibility == "public"
self._validate_room_config(config, visibility)
room_id = await self._generate_and_create_room_id(
creator_id=user_id,
is_public=is_public,
@@ -1113,7 +1111,20 @@ class RoomCreationHandler:
return new_event, new_unpersisted_context
preset_config, config = self._room_preset_config(room_config)
visibility = room_config.get("visibility", "private")
preset_config = room_config.get(
"preset",
RoomCreationPreset.PRIVATE_CHAT
if visibility == "private"
else RoomCreationPreset.PUBLIC_CHAT,
)
try:
config = self._presets_dict[preset_config]
except KeyError:
raise SynapseError(
400, f"'{preset_config}' is not a valid preset", errcode=Codes.BAD_JSON
)
# MSC2175 removes the creator field from the create event.
if not room_version.msc2175_implicit_room_creator:
@@ -1295,65 +1306,6 @@ class RoomCreationHandler:
assert last_event.internal_metadata.stream_ordering is not None
return last_event.internal_metadata.stream_ordering, last_event.event_id, depth
def _validate_room_config(
self,
config: JsonDict,
visibility: str,
) -> None:
"""Checks configuration parameters for a /createRoom request.
If validation detects invalid parameters an exception may be raised to
cause room creation to be aborted and an error response to be returned
to the client.
Args:
config: A dict of configuration options. Originally from the body of
the /createRoom request
visibility: One of "public" or "private"
"""
# Validate the requested preset, raise a 400 error if not valid
preset_name, preset_config = self._room_preset_config(config)
# If the user is trying to create an encrypted room and this is forbidden
# by the configured default_power_level_content_override, then reject the
# request before the room is created.
raw_initial_state = config.get("initial_state", [])
room_encryption_event = any(
s.get("type", "") == EventTypes.RoomEncryption for s in raw_initial_state
)
if preset_config["encrypted"] or room_encryption_event:
if self._default_power_level_content_override:
override = self._default_power_level_content_override.get(preset_name)
if override is not None:
event_levels = override.get("events", {})
room_admin_level = event_levels.get(EventTypes.PowerLevels, 100)
encryption_level = event_levels.get(EventTypes.RoomEncryption, 100)
if encryption_level > room_admin_level:
raise SynapseError(
403,
f"You cannot create an encrypted room. user_level ({room_admin_level}) < send_level ({encryption_level})",
)
def _room_preset_config(self, room_config: JsonDict) -> Tuple[str, dict]:
# The spec says rooms should default to private visibility if
# `visibility` is not specified.
visibility = room_config.get("visibility", "private")
preset_name = room_config.get(
"preset",
RoomCreationPreset.PRIVATE_CHAT
if visibility == "private"
else RoomCreationPreset.PUBLIC_CHAT,
)
try:
preset_config = self._presets_dict[preset_name]
except KeyError:
raise SynapseError(
400, f"'{preset_name}' is not a valid preset", errcode=Codes.BAD_JSON
)
return preset_name, preset_config
def _generate_room_id(self) -> str:
"""Generates a random room ID.
@@ -1538,6 +1490,7 @@ class RoomContextHandler:
class TimestampLookupHandler:
def __init__(self, hs: "HomeServer"):
self.server_name = hs.hostname
self.store = hs.get_datastores().main
self.state_handler = hs.get_state_handler()
self.federation_client = hs.get_federation_client()


@@ -42,6 +42,7 @@ class StatsHandler:
self.store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
self.state = hs.get_state_handler()
self.server_name = hs.hostname
self.clock = hs.get_clock()
self.notifier = hs.get_notifier()
self.is_mine_id = hs.is_mine_id


@@ -95,6 +95,8 @@ incoming_responses_counter = Counter(
)
MAX_LONG_RETRIES = 10
MAX_SHORT_RETRIES = 3
MAXINT = sys.maxsize
@@ -404,12 +406,7 @@ class MatrixFederationHttpClient:
self.clock = hs.get_clock()
self._store = hs.get_datastores().main
self.version_string_bytes = hs.version_string.encode("ascii")
self.default_timeout = hs.config.federation.client_timeout / 1000
self.max_long_retry_delay = hs.config.federation.max_long_retry_delay / 1000
self.max_short_retry_delay = hs.config.federation.max_short_retry_delay / 1000
self.max_long_retries = hs.config.federation.max_long_retries
self.max_short_retries = hs.config.federation.max_short_retries
self.default_timeout = 60
self._cooperator = Cooperator(scheduler=_make_scheduler(self.reactor))
@@ -502,15 +499,8 @@ class MatrixFederationHttpClient:
Note that the above intervals are *in addition* to the time spent
waiting for the request to complete (up to `timeout` ms).
NB: the long retry algorithm takes over 20 minutes to complete, with a
default timeout of 60s! It's best not to use the `long_retries` option
for something that is blocking a client, so we don't make them wait for
ages; for some things, like sending transactions (server to
server), we can be a lot more lenient, but it's very fuzzy / hand-wavy.
In the future, we could be more intelligent about doing this sort of
thing by looking at things with the bigger picture in mind,
https://github.com/matrix-org/synapse/issues/8917
NB: the long retry algorithm takes over 20 minutes to complete, with
a default timeout of 60s!
ignore_backoff: true to ignore the historical backoff data
and try the request anyway.
@@ -538,10 +528,10 @@ class MatrixFederationHttpClient:
logger.exception(f"Invalid destination: {request.destination}.")
raise FederationDeniedError(request.destination)
if timeout is None:
timeout = int(self.default_timeout)
_sec_timeout = timeout / 1000
if timeout:
_sec_timeout = timeout / 1000
else:
_sec_timeout = self.default_timeout
if (
self.hs.config.federation.federation_domain_whitelist is not None
@@ -586,9 +576,9 @@ class MatrixFederationHttpClient:
# XXX: Would be much nicer to retry only at the transaction-layer
# (once we have reliable transactions in place)
if long_retries:
retries_left = self.max_long_retries
retries_left = MAX_LONG_RETRIES
else:
retries_left = self.max_short_retries
retries_left = MAX_SHORT_RETRIES
url_bytes = request.uri
url_str = url_bytes.decode("ascii")
@@ -733,12 +723,12 @@ class MatrixFederationHttpClient:
if retries_left and not timeout:
if long_retries:
delay = 4 ** (self.max_long_retries + 1 - retries_left)
delay = min(delay, self.max_long_retry_delay)
delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
delay = min(delay, 60)
delay *= random.uniform(0.8, 1.4)
else:
delay = 0.5 * 2 ** (self.max_short_retries - retries_left)
delay = min(delay, self.max_short_retry_delay)
delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
delay = min(delay, 2)
delay *= random.uniform(0.8, 1.4)
logger.debug(
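Plugging the constants back in shows where the docstring's "over 20 minutes" figure comes from. A sketch reproducing the long-retry schedule from the hunk above:

```python
import random

MAX_LONG_RETRIES = 10

def long_retry_delays():
    """Reproduce the long-retry backoff from the hunk above."""
    delays = []
    for retries_left in range(MAX_LONG_RETRIES, 0, -1):
        delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
        delay = min(delay, 60)          # the restored hard-coded cap
        delay *= random.uniform(0.8, 1.4)
        delays.append(delay)
    return delays

# Without jitter the sleeps are 4s, 16s, then 60s for each remaining try:
# ~500s of backoff, plus up to 60s of timeout per attempt, which is roughly
# where the docstring's "over 20 minutes" warning comes from.
print(f"total backoff: {sum(long_retry_delays()):.0f}s")
```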
@@ -946,9 +936,10 @@ class MatrixFederationHttpClient:
timeout=timeout,
)
if timeout is None:
timeout = int(self.default_timeout)
_sec_timeout = timeout / 1000
if timeout is not None:
_sec_timeout = timeout / 1000
else:
_sec_timeout = self.default_timeout
if parser is None:
parser = cast(ByteParser[T], JsonParser())
@@ -1134,9 +1125,10 @@ class MatrixFederationHttpClient:
timeout=timeout,
)
if timeout is None:
timeout = int(self.default_timeout)
_sec_timeout = timeout / 1000
if timeout is not None:
_sec_timeout = timeout / 1000
else:
_sec_timeout = self.default_timeout
if parser is None:
parser = cast(ByteParser[T], JsonParser())
@@ -1209,9 +1201,10 @@ class MatrixFederationHttpClient:
ignore_backoff=ignore_backoff,
)
if timeout is None:
timeout = int(self.default_timeout)
_sec_timeout = timeout / 1000
if timeout is not None:
_sec_timeout = timeout / 1000
else:
_sec_timeout = self.default_timeout
body = await _handle_response(
self.reactor, _sec_timeout, request, response, start_ms, parser=JsonParser()


@@ -76,7 +76,7 @@ class ReplicationEndpointFactory:
endpoint = wrapClientTLS(
# The 'port' argument below isn't actually used by the function
self.context_factory.creatorForNetloc(
self.instance_map[worker_name].host.encode("utf-8"),
self.instance_map[worker_name].host,
self.instance_map[worker_name].port,
),
endpoint,


@@ -171,7 +171,6 @@ from functools import wraps
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Callable,
Collection,
ContextManager,
@@ -904,7 +903,6 @@ def _custom_sync_async_decorator(
"""
if inspect.iscoroutinefunction(func):
# For this branch, we handle async functions like `async def func() -> RInner`.
# In this branch, R = Awaitable[RInner], for some other type RInner
@wraps(func)
async def _wrapper(
@@ -916,16 +914,15 @@ def _custom_sync_async_decorator(
return await func(*args, **kwargs) # type: ignore[misc]
else:
# The other case here handles sync functions including those decorated with
# `@defer.inlineCallbacks` or that return a `Deferred` or other `Awaitable`.
# The other case here handles both sync functions and those
# decorated with `@defer.inlineCallbacks`.
@wraps(func)
def _wrapper(*args: P.args, **kwargs: P.kwargs) -> Any:
def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
scope = wrapping_logic(func, *args, **kwargs)
scope.__enter__()
try:
result = func(*args, **kwargs)
if isinstance(result, defer.Deferred):
def call_back(result: R) -> R:
@@ -933,32 +930,20 @@ def _custom_sync_async_decorator(
return result
def err_back(result: R) -> R:
# TODO: Pass the error details into `scope.__exit__(...)` for
# consistency with the other paths.
scope.__exit__(None, None, None)
return result
result.addCallbacks(call_back, err_back)
elif inspect.isawaitable(result):
async def wrap_awaitable() -> Any:
try:
assert isinstance(result, Awaitable)
awaited_result = await result
scope.__exit__(None, None, None)
return awaited_result
except Exception as e:
scope.__exit__(type(e), None, e.__traceback__)
raise
# The original method returned an awaitable, eg. a coroutine, so we
# create another awaitable wrapping it that calls
# `scope.__exit__(...)`.
return wrap_awaitable()
else:
# Just a simple sync function so we can just exit the scope and
# return the result without any fuss.
if inspect.isawaitable(result):
logger.error(
"@trace may not have wrapped %s correctly! "
"The function is not async but returned a %s.",
func.__qualname__,
type(result).__name__,
)
scope.__exit__(None, None, None)
return result
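The heart of this hunk is when the tracing scope is exited: immediately for plain sync results, but only on resolution for coroutines and, in the removed branch, for awaitables returned by sync functions. A minimal, runnable sketch of that shape, with print statements standing in for `scope.__enter__`/`scope.__exit__`:

```python
import asyncio
import inspect
from functools import wraps

def traced(func):
    if inspect.iscoroutinefunction(func):
        @wraps(func)
        async def _async_wrapper(*args, **kwargs):
            print("enter scope")
            try:
                return await func(*args, **kwargs)
            finally:
                print("exit scope")
        return _async_wrapper

    @wraps(func)
    def _wrapper(*args, **kwargs):
        print("enter scope")
        result = func(*args, **kwargs)
        if inspect.isawaitable(result):
            # A sync function returned an awaitable: keep the scope open
            # until the awaitable actually resolves.
            async def _finish():
                try:
                    return await result
                finally:
                    print("exit scope")
            return _finish()
        print("exit scope")
        return result
    return _wrapper

@traced
async def do_work() -> str:
    await asyncio.sleep(0)
    return "done"

print(asyncio.run(do_work()))
```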


@@ -77,8 +77,6 @@ RegistryProxy = cast(CollectorRegistry, _RegistryProxy)
@attr.s(slots=True, hash=True, auto_attribs=True)
class LaterGauge(Collector):
"""A Gauge which periodically calls a user-provided callback to produce metrics."""
name: str
desc: str
labels: Optional[Sequence[str]] = attr.ib(hash=False)


@@ -120,6 +120,9 @@ class BulkPushRuleEvaluator:
self.should_calculate_push_rules = self.hs.config.push.enable_push
self._related_event_match_enabled = self.hs.config.experimental.msc3664_enabled
self._intentional_mentions_enabled = (
self.hs.config.experimental.msc3952_intentional_mentions
)
self.room_push_rule_cache_metrics = register_cache(
"cache",
@@ -387,7 +390,10 @@ class BulkPushRuleEvaluator:
del notification_levels[key]
# Pull out any user and room mentions.
has_mentions = EventContentFields.MENTIONS in event.content
has_mentions = (
self._intentional_mentions_enabled
and EventContentFields.MSC3952_MENTIONS in event.content
)
evaluator = PushRuleEvaluator(
_flatten_dict(event),
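The rule keys above (`content.m\.mentions.user_ids`, `content.org\.matrix\.msc3952\.mentions.user_ids`) imply that `_flatten_dict` joins path segments with `.` and backslash-escapes literal dots inside a segment. A hedged sketch of that behaviour; this mirrors, but is not, Synapse's implementation:

```python
def flatten_dict(d, prefix=""):
    out = {}
    for key, value in d.items():
        # Escape backslashes first, then literal dots, so "m.mentions"
        # becomes the single segment "m\.mentions".
        escaped = key.replace("\\", "\\\\").replace(".", "\\.")
        path = f"{prefix}.{escaped}" if prefix else escaped
        if isinstance(value, dict):
            out.update(flatten_dict(value, path))
        else:
            out[path] = value
    return out

event_content = {"content": {"m.mentions": {"user_ids": ["@alice:example.org"]}}}
print(flatten_dict(event_content))
# {'content.m\\.mentions.user_ids': ['@alice:example.org']}
```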


@@ -124,6 +124,8 @@ class VersionsRestServlet(RestServlet):
is not None,
# Adds support for relation-based redactions as per MSC3912.
"org.matrix.msc3912": self.config.experimental.msc3912_enabled,
# Adds support for unstable "intentional mentions" behaviour.
"org.matrix.msc3952_intentional_mentions": self.config.experimental.msc3952_intentional_mentions,
# Whether recursively provide relations is supported.
"org.matrix.msc3981": self.config.experimental.msc3981_recurse_relations,
# Adds support for deleting account data.

View File

@@ -39,6 +39,7 @@ class UploadResource(DirectServeJsonResource):
self.filepaths = media_repo.filepaths
self.store = hs.get_datastores().main
self.clock = hs.get_clock()
self.server_name = hs.hostname
self.auth = hs.get_auth()
self.max_upload_size = hs.config.media.max_upload_size
self.clock = hs.get_clock()

View File

@@ -86,14 +86,9 @@ class SQLBaseStore(metaclass=ABCMeta):
room_id: Room where state changed
members_changed: The user_ids of members that have changed
"""
# XXX: If you add something to this function make sure you add it to
# `_invalidate_state_caches_all` as well.
# If there were any membership changes, purge the appropriate caches.
for host in {get_domain_from_id(u) for u in members_changed}:
self._attempt_to_invalidate_cache("is_host_joined", (room_id, host))
self._attempt_to_invalidate_cache("is_host_invited", (room_id, host))
if members_changed:
self._attempt_to_invalidate_cache("get_users_in_room", (room_id,))
self._attempt_to_invalidate_cache("get_current_hosts_in_room", (room_id,))
@@ -122,32 +117,6 @@ class SQLBaseStore(metaclass=ABCMeta):
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
self._attempt_to_invalidate_cache("get_partial_current_state_ids", (room_id,))
def _invalidate_state_caches_all(self, room_id: str) -> None:
"""Invalidates caches that are based on the current state, but does
not stream invalidations down replication.
Same as `_invalidate_state_caches`, except that it works when we don't know
which memberships have changed.
Args:
room_id: Room where state changed
"""
self._attempt_to_invalidate_cache("get_partial_current_state_ids", (room_id,))
self._attempt_to_invalidate_cache("get_users_in_room", (room_id,))
self._attempt_to_invalidate_cache("is_host_invited", None)
self._attempt_to_invalidate_cache("is_host_joined", None)
self._attempt_to_invalidate_cache("get_current_hosts_in_room", (room_id,))
self._attempt_to_invalidate_cache("get_users_in_room_with_profiles", (room_id,))
self._attempt_to_invalidate_cache("get_number_joined_users_in_room", (room_id,))
self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,))
self._attempt_to_invalidate_cache("does_pair_of_users_share_a_room", None)
self._attempt_to_invalidate_cache("get_user_in_room_with_profile", None)
self._attempt_to_invalidate_cache(
"get_rooms_for_user_with_stream_ordering", None
)
self._attempt_to_invalidate_cache("get_rooms_for_user", None)
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
def _attempt_to_invalidate_cache(
self, cache_name: str, key: Optional[Collection[Any]]
) -> bool:

View File

@@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from enum import IntEnum
from types import TracebackType
from typing import (
TYPE_CHECKING,
@@ -137,15 +136,6 @@ class BackgroundUpdatePerformance:
return float(self.total_item_count) / float(self.total_duration_ms)
class UpdaterStatus(IntEnum):
# Use negative values for error conditions.
ABORTED = -1
DISABLED = 0
NOT_STARTED = 1
RUNNING_UPDATE = 2
COMPLETE = 3
class BackgroundUpdater:
"""Background updates are updates to the database that run in the
background. Each update processes a batch of data at once. We attempt to
@@ -168,16 +158,11 @@ class BackgroundUpdater:
self._background_update_performance: Dict[str, BackgroundUpdatePerformance] = {}
self._background_update_handlers: Dict[str, _BackgroundUpdateHandler] = {}
# TODO: all these bool flags make me feel icky---can we combine into a status
# enum?
self._all_done = False
# Whether we're currently running updates
self._running = False
# Marker to be set if we abort and halt all background updates.
self._aborted = False
# Whether background updates are enabled. This allows us to
# enable/disable background updates via the admin API.
self.enabled = True
@@ -190,20 +175,6 @@ class BackgroundUpdater:
self.sleep_duration_ms = hs.config.background_updates.sleep_duration_ms
self.sleep_enabled = hs.config.background_updates.sleep_enabled
def get_status(self) -> UpdaterStatus:
"""An integer summarising the updater status. Used as a metric."""
if self._aborted:
return UpdaterStatus.ABORTED
# TODO: a status for "have seen at least one failure, but haven't aborted yet".
if not self.enabled:
return UpdaterStatus.DISABLED
if self._all_done:
return UpdaterStatus.COMPLETE
if self._running:
return UpdaterStatus.RUNNING_UPDATE
return UpdaterStatus.NOT_STARTED
def register_update_controller_callbacks(
self,
on_update: ON_UPDATE_CALLBACK,
@@ -325,7 +296,6 @@ class BackgroundUpdater:
except Exception:
back_to_back_failures += 1
if back_to_back_failures >= 5:
self._aborted = True
raise RuntimeError(
"5 back-to-back background update failures; aborting."
)
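
A tiny sketch of why an `IntEnum` suits this use: its members behave as ordinary integers, so a method like `get_status` can feed a gauge directly, with negative values reserved for error states as in the hunk above.

    from enum import IntEnum

    class UpdaterStatus(IntEnum):
        ABORTED = -1
        DISABLED = 0
        NOT_STARTED = 1
        RUNNING_UPDATE = 2
        COMPLETE = 3

    status = UpdaterStatus.RUNNING_UPDATE
    assert status == 2 and float(status) == 2.0  # exports as a plain number
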

View File

@@ -839,8 +839,9 @@ class EventsPersistenceStorageController:
"group" % (ev.event_id,)
)
continue
if ctx.state_group_deltas:
state_group_deltas.update(ctx.state_group_deltas)
if ctx.prev_group:
state_group_deltas[(ctx.prev_group, ctx.state_group)] = ctx.delta_ids
# We need to map the event_ids to their state groups. First, let's
# check if the event is one we're persisting, in which case we can

View File

@@ -54,7 +54,7 @@ from synapse.logging.context import (
current_context,
make_deferred_yieldable,
)
from synapse.metrics import LaterGauge, register_threadpool
from synapse.metrics import register_threadpool
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.background_updates import BackgroundUpdater
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
@@ -547,12 +547,6 @@ class DatabasePool:
self._db_pool = make_pool(hs.get_reactor(), database_config, engine)
self.updates = BackgroundUpdater(hs, self)
LaterGauge(
"synapse_background_update_status",
"Background update status",
[],
self.updates.get_status,
)
self._previous_txn_total_time = 0.0
self._current_txn_total_time = 0.0

View File

@@ -46,12 +46,6 @@ logger = logging.getLogger(__name__)
# based on the current state when notifying workers over replication.
CURRENT_STATE_CACHE_NAME = "cs_cache_fake"
# As above, but for invalidating event caches on history deletion
PURGE_HISTORY_CACHE_NAME = "ph_cache_fake"
# As above, but for invalidating room caches on room deletion
DELETE_ROOM_CACHE_NAME = "dr_cache_fake"
class CacheInvalidationWorkerStore(SQLBaseStore):
def __init__(
@@ -181,23 +175,6 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
room_id = row.keys[0]
members_changed = set(row.keys[1:])
self._invalidate_state_caches(room_id, members_changed)
elif row.cache_func == PURGE_HISTORY_CACHE_NAME:
if row.keys is None:
raise Exception(
"Can't send an 'invalidate all' for 'purge history' cache"
)
room_id = row.keys[0]
self._invalidate_caches_for_room_events(room_id)
elif row.cache_func == DELETE_ROOM_CACHE_NAME:
if row.keys is None:
raise Exception(
"Can't send an 'invalidate all' for 'delete room' cache"
)
room_id = row.keys[0]
self._invalidate_caches_for_room_events(room_id)
self._invalidate_caches_for_room(room_id)
else:
self._attempt_to_invalidate_cache(row.cache_func, row.keys)
@@ -249,9 +226,6 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
relates_to: Optional[str],
backfilled: bool,
) -> None:
# XXX: If you add something to this function make sure you add it to
# `_invalidate_caches_for_room_events` as well.
# This invalidates any local in-memory cached event objects, the original
# process triggering the invalidation is responsible for clearing any external
# cached objects.
@@ -297,106 +271,6 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
self._attempt_to_invalidate_cache("get_thread_participated", (relates_to,))
self._attempt_to_invalidate_cache("get_threads", (room_id,))
def _invalidate_caches_for_room_events_and_stream(
self, txn: LoggingTransaction, room_id: str
) -> None:
"""Invalidate caches associated with events in a room, and stream to
replication.
Used when we delete events in a room, but don't know which events we've
deleted.
"""
self._send_invalidation_to_replication(txn, PURGE_HISTORY_CACHE_NAME, [room_id])
txn.call_after(self._invalidate_caches_for_room_events, room_id)
def _invalidate_caches_for_room_events(self, room_id: str) -> None:
"""Invalidate caches associated with events in a room, and stream to
replication.
Used when we delete events in a room, but don't know which events we've
deleted.
"""
self._invalidate_local_get_event_cache_all() # type: ignore[attr-defined]
self._attempt_to_invalidate_cache("have_seen_event", (room_id,))
self._attempt_to_invalidate_cache("get_latest_event_ids_in_room", (room_id,))
self._attempt_to_invalidate_cache(
"get_unread_event_push_actions_by_room_for_user", (room_id,)
)
self._attempt_to_invalidate_cache("_get_membership_from_event_id", None)
self._attempt_to_invalidate_cache("get_relations_for_event", None)
self._attempt_to_invalidate_cache("get_applicable_edit", None)
self._attempt_to_invalidate_cache("get_thread_id", None)
self._attempt_to_invalidate_cache("get_thread_id_for_receipts", None)
self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None)
self._attempt_to_invalidate_cache(
"get_rooms_for_user_with_stream_ordering", None
)
self._attempt_to_invalidate_cache("get_rooms_for_user", None)
self._attempt_to_invalidate_cache("get_references_for_event", None)
self._attempt_to_invalidate_cache("get_thread_summary", None)
self._attempt_to_invalidate_cache("get_thread_participated", None)
self._attempt_to_invalidate_cache("get_threads", (room_id,))
self._attempt_to_invalidate_cache("_get_state_group_for_event", None)
self._attempt_to_invalidate_cache("get_event_ordering", None)
self._attempt_to_invalidate_cache("is_partial_state_event", None)
self._attempt_to_invalidate_cache("_get_joined_profile_from_event_id", None)
def _invalidate_caches_for_room_and_stream(
self, txn: LoggingTransaction, room_id: str
) -> None:
"""Invalidate caches associated with rooms, and stream to replication.
Used when we delete rooms.
"""
self._send_invalidation_to_replication(txn, DELETE_ROOM_CACHE_NAME, [room_id])
txn.call_after(self._invalidate_caches_for_room, room_id)
def _invalidate_caches_for_room(self, room_id: str) -> None:
"""Invalidate caches associated with rooms.
Used when we delete rooms.
"""
# If we've deleted the room then we also need to purge all event caches.
self._invalidate_caches_for_room_events(room_id)
self._attempt_to_invalidate_cache("get_account_data_for_room", None)
self._attempt_to_invalidate_cache("get_account_data_for_room_and_type", None)
self._attempt_to_invalidate_cache("get_aliases_for_room", (room_id,))
self._attempt_to_invalidate_cache("get_latest_event_ids_in_room", (room_id,))
self._attempt_to_invalidate_cache("_get_forward_extremeties_for_room", None)
self._attempt_to_invalidate_cache(
"get_unread_event_push_actions_by_room_for_user", (room_id,)
)
self._attempt_to_invalidate_cache(
"_get_linearized_receipts_for_room", (room_id,)
)
self._attempt_to_invalidate_cache("is_room_blocked", (room_id,))
self._attempt_to_invalidate_cache("get_retention_policy_for_room", (room_id,))
self._attempt_to_invalidate_cache(
"_get_partial_state_servers_at_join", (room_id,)
)
self._attempt_to_invalidate_cache("is_partial_state_room", (room_id,))
self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None)
self._attempt_to_invalidate_cache(
"get_current_hosts_in_room_ordered", (room_id,)
)
self._attempt_to_invalidate_cache("did_forget", None)
self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None)
self._attempt_to_invalidate_cache("_get_membership_from_event_id", None)
self._attempt_to_invalidate_cache("get_room_version_id", (room_id,))
# And delete state caches.
self._invalidate_state_caches_all(room_id)
async def invalidate_cache_and_stream(
self, cache_name: str, keys: Tuple[Any, ...]
) -> None:
@@ -503,14 +377,6 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
"Can't stream invalidate all with magic current state cache"
)
if cache_name == PURGE_HISTORY_CACHE_NAME and keys is None:
raise Exception(
"Can't stream invalidate all with magic purge history cache"
)
if cache_name == DELETE_ROOM_CACHE_NAME and keys is None:
raise Exception("Can't stream invalidate all with magic delete room cache")
if isinstance(self.database_engine, PostgresEngine):
assert self._cache_id_gen is not None

View File

@@ -903,15 +903,6 @@ class EventsWorkerStore(SQLBaseStore):
self._event_ref.pop(event_id, None)
self._current_event_fetches.pop(event_id, None)
def _invalidate_local_get_event_cache_all(self) -> None:
"""Clears the in-memory get event caches.
Used when we purge room history.
"""
self._get_event_cache.clear()
self._event_ref.clear()
self._current_event_fetches.clear()
async def _get_events_from_cache(
self, events: Iterable[str], update_metrics: bool = True
) -> Dict[str, EventCacheEntry]:

View File

@@ -308,8 +308,6 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
logger.info("[purge] done")
self._invalidate_caches_for_room_events_and_stream(txn, room_id)
return referenced_state_groups
async def purge_room(self, room_id: str) -> List[int]:
@@ -487,6 +485,10 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
# index on them. In any case we should be clearing out 'stream' tables
# periodically anyway (#5888)
self._invalidate_caches_for_room_and_stream(txn, room_id)
# TODO: we could probably usefully do a bunch more cache invalidation here
# XXX: as with purge_history, this is racy, but no worse than other races
# that already exist.
self._invalidate_cache_and_stream(txn, self.have_seen_event, (room_id,))
return state_groups

View File

@@ -88,6 +88,7 @@ def _load_rules(
msc1767_enabled=experimental_config.msc1767_enabled,
msc3664_enabled=experimental_config.msc3664_enabled,
msc3381_polls_enabled=experimental_config.msc3381_polls_enabled,
msc3952_intentional_mentions=experimental_config.msc3952_intentional_mentions,
msc3958_suppress_edits_enabled=experimental_config.msc3958_supress_edit_notifs,
)

View File

@@ -927,10 +927,11 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
raise Exception("Invalid host name")
sql = """
SELECT state_key FROM current_state_events
WHERE membership = ?
SELECT state_key FROM current_state_events AS c
INNER JOIN room_memberships AS m USING (event_id)
WHERE m.membership = ?
AND type = 'm.room.member'
AND room_id = ?
AND c.room_id = ?
AND state_key LIKE ?
LIMIT 1
"""
@@ -1460,6 +1461,7 @@ class RoomMemberBackgroundUpdateStore(SQLBaseStore):
SELECT stream_ordering, event_id, events.room_id, event_json.json
FROM events
INNER JOIN event_json USING (event_id)
INNER JOIN room_memberships USING (event_id)
WHERE ? <= stream_ordering AND stream_ordering < ?
AND type = 'm.room.member'
ORDER BY stream_ordering DESC

View File

@@ -1061,15 +1061,12 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
# The array of numbers gives the weights for the various parts of the
# search: (domain, _, display name, localpart)
sql = """
WITH matching_users AS (
SELECT user_id, vector FROM user_directory_search WHERE vector @@ to_tsquery('simple', ?)
LIMIT 10000
)
SELECT d.user_id AS user_id, display_name, avatar_url
FROM matching_users as t
FROM user_directory_search as t
INNER JOIN user_directory AS d USING (user_id)
WHERE
%(where_clause)s
AND vector @@ to_tsquery('simple', ?)
ORDER BY
(CASE WHEN d.user_id IS NOT NULL THEN 4.0 ELSE 1.0 END)
* (CASE WHEN display_name IS NOT NULL THEN 1.2 ELSE 1.0 END)
@@ -1098,9 +1095,8 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
"order_case_statements": " ".join(additional_ordering_statements),
}
args = (
(full_query,)
+ join_args
+ (exact_query, prefix_query)
join_args
+ (full_query, exact_query, prefix_query)
+ ordering_arguments
+ (limit + 1,)
)
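
The reshuffle of `args` follows from DB-API parameter binding: `?` placeholders bind strictly in the order they appear in the SQL, so once the full-text match moves into a leading CTE, `full_query` must move to the front of the tuple. A toy demonstration (assumed schema, SQLite syntax for brevity):

    import sqlite3

    con = sqlite3.connect(":memory:")
    con.execute("CREATE TABLE t (a TEXT, b TEXT)")
    con.execute("INSERT INTO t VALUES ('x', 'y')")
    sql = """
        WITH m AS (SELECT a, b FROM t WHERE a = ?)  -- first placeholder
        SELECT b FROM m WHERE b = ?                 -- second placeholder
    """
    print(con.execute(sql, ("x", "y")).fetchall())  # [('y',)] -- order matters
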

View File

@@ -22,7 +22,7 @@ from synapse.storage.database import (
LoggingDatabaseConnection,
LoggingTransaction,
)
from synapse.storage.engines import PostgresEngine
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.types import MutableStateMap, StateMap
from synapse.types.state import StateFilter
from synapse.util.caches import intern_string
@@ -328,6 +328,15 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
columns=["event_stream_ordering"],
)
self.db_pool.updates.register_background_update_handler(
"add_event_stream_ordering",
self._add_event_stream_ordering,
)
self.db_pool.updates.register_background_update_handler(
"add_stream_ordering_triggers", self._add_triggers_in_bg
)
async def _background_deduplicate_state(
self, progress: dict, batch_size: int
) -> int:
@@ -504,3 +513,175 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
)
return 1
async def _add_event_stream_ordering(self, progress: dict, batch_size: int) -> int:
"""
Add denormalised copies of `stream_ordering` from the corresponding row in `events`
to the tables current_state_events, local_current_membership, and room_memberships.
This is done to improve database performance by reducing JOINs.
"""
tables = [
"current_state_events",
"local_current_membership",
"room_memberships",
]
if isinstance(self.database_engine, PostgresEngine):
def check_pg_column(txn: LoggingTransaction, table: str) -> list:
"""
check if the column event_stream_ordering already exists
"""
check_sql = f"""
SELECT column_name FROM information_schema.columns
WHERE table_name = '{table}' and column_name = 'event_stream_ordering';
"""
txn.execute(check_sql)
column = txn.fetchall()
return column
def add_pg_column(txn: LoggingTransaction, table: str) -> None:
"""
Add column event_stream_ordering to a given table
"""
add_column_sql = f"""
ALTER TABLE {table} ADD COLUMN event_stream_ordering BIGINT;
"""
txn.execute(add_column_sql)
add_fk_sql = f"""
ALTER TABLE {table} ADD CONSTRAINT event_stream_ordering_fkey
FOREIGN KEY(event_stream_ordering) REFERENCES events(stream_ordering) NOT VALID;
"""
txn.execute(add_fk_sql)
for table in tables:
res = await self.db_pool.runInteraction(
"check_column", check_pg_column, table
)
# if the column exists do nothing
if not res:
await self.db_pool.runInteraction(
"add_event_stream_ordering",
add_pg_column,
table,
)
await self.db_pool.updates._end_background_update(
"add_event_stream_ordering"
)
return 1
elif isinstance(self.database_engine, Sqlite3Engine):
def check_sqlite_column(txn: LoggingTransaction, table: str) -> List[tuple]:
"""
Get table info (to see if column event_stream_ordering exists)
"""
check_sql = f"""
PRAGMA table_info({table})
"""
txn.execute(check_sql)
res = txn.fetchall()
return res
def add_sqlite_column(txn: LoggingTransaction, table: str) -> None:
"""
Add column event_stream_ordering to a given table
"""
add_column_sql = f"""
ALTER TABLE {table} ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering);
"""
txn.execute(add_column_sql)
for table in tables:
res = await self.db_pool.runInteraction(
"check_column", check_sqlite_column, table
)
columns = [tup[1] for tup in res]
# if the column exists do nothing
if "event_stream_ordering" not in columns:
await self.db_pool.runInteraction(
"add_event_stream_ordering", add_sqlite_column, table
)
await self.db_pool.updates._end_background_update(
"add_event_stream_ordering"
)
return 1
async def _add_triggers_in_bg(self, progress: dict, batch_size: int) -> int:
"""
Adds triggers to the room membership tables to enforce consistency
"""
# Complain if the `event_stream_ordering` in membership tables doesn't match
# the `stream_ordering` row with the same `event_id` in `events`.
if isinstance(self.database_engine, Sqlite3Engine):
def add_sqlite_triggers(txn: LoggingTransaction) -> None:
for table in (
"current_state_events",
"local_current_membership",
"room_memberships",
):
txn.execute(
f"""
CREATE TRIGGER IF NOT EXISTS {table}_bad_event_stream_ordering
BEFORE INSERT ON {table}
FOR EACH ROW
BEGIN
SELECT RAISE(ABORT, 'Incorrect event_stream_ordering in {table}')
WHERE EXISTS (
SELECT 1 FROM events
WHERE events.event_id = NEW.event_id
AND events.stream_ordering != NEW.event_stream_ordering
);
END;
"""
)
await self.db_pool.runInteraction(
"add_sqlite_triggers", add_sqlite_triggers
)
elif isinstance(self.database_engine, PostgresEngine):
def add_pg_triggers(txn: LoggingTransaction) -> None:
txn.execute(
"""
CREATE OR REPLACE FUNCTION check_event_stream_ordering() RETURNS trigger AS $BODY$
BEGIN
IF EXISTS (
SELECT 1 FROM events
WHERE events.event_id = NEW.event_id
AND events.stream_ordering != NEW.event_stream_ordering
) THEN
RAISE EXCEPTION 'Incorrect event_stream_ordering';
END IF;
RETURN NEW;
END;
$BODY$ LANGUAGE plpgsql;
"""
)
for table in (
"current_state_events",
"local_current_membership",
"room_memberships",
):
txn.execute(
f"""
CREATE TRIGGER check_event_stream_ordering BEFORE INSERT OR UPDATE ON {table}
FOR EACH ROW
EXECUTE PROCEDURE check_event_stream_ordering()
"""
)
await self.db_pool.runInteraction("add_postgres_triggers", add_pg_triggers)
else:
raise NotImplementedError("Unknown database engine")
await self.db_pool.updates._end_background_update(
"add_stream_ordering_triggers"
)
return 1
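
For the SQLite branch above, a minimal runnable demonstration (toy two-column schema, purely illustrative) of how such a trigger aborts inserts whose denormalised column disagrees with `events`:

    import sqlite3

    con = sqlite3.connect(":memory:")
    con.executescript(
        """
        CREATE TABLE events (event_id TEXT PRIMARY KEY, stream_ordering BIGINT);
        CREATE TABLE room_memberships (event_id TEXT, event_stream_ordering BIGINT);
        CREATE TRIGGER room_memberships_bad_event_stream_ordering
        BEFORE INSERT ON room_memberships
        FOR EACH ROW
        BEGIN
            SELECT RAISE(ABORT, 'Incorrect event_stream_ordering in room_memberships')
            WHERE EXISTS (
                SELECT 1 FROM events
                WHERE events.event_id = NEW.event_id
                AND events.stream_ordering != NEW.event_stream_ordering
            );
        END;
        """
    )
    con.execute("INSERT INTO events VALUES ('$e1', 1)")
    con.execute("INSERT INTO room_memberships VALUES ('$e1', 1)")  # consistent: accepted
    try:
        con.execute("INSERT INTO room_memberships VALUES ('$e1', 2)")  # mismatch
    except sqlite3.DatabaseError as e:
        print("rejected:", e)
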

View File

@@ -0,0 +1,18 @@
/* Copyright 2023 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
INSERT INTO background_updates (ordering, update_name, progress_json, depends_on)
VALUES
(7403, 'add_event_stream_ordering', '{}', 'replace_stream_ordering_column');

View File

@@ -1,29 +0,0 @@
/* Copyright 2022 Beeper
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Each of these is a denormalised copy of `stream_ordering` from the corresponding row in `events`, which
-- we use to improve database performance by reducing JOINs.
-- NOTE: these are set to NOT VALID to prevent locks while adding the column on large existing tables,
-- which will be validated in a later migration. For all new/updated rows the FKEY will be checked.
ALTER TABLE current_state_events ADD COLUMN event_stream_ordering BIGINT;
ALTER TABLE current_state_events ADD CONSTRAINT event_stream_ordering_fkey FOREIGN KEY (event_stream_ordering) REFERENCES events(stream_ordering) NOT VALID;
ALTER TABLE local_current_membership ADD COLUMN event_stream_ordering BIGINT;
ALTER TABLE local_current_membership ADD CONSTRAINT event_stream_ordering_fkey FOREIGN KEY (event_stream_ordering) REFERENCES events(stream_ordering) NOT VALID;
ALTER TABLE room_memberships ADD COLUMN event_stream_ordering BIGINT;
ALTER TABLE room_memberships ADD CONSTRAINT event_stream_ordering_fkey FOREIGN KEY (event_stream_ordering) REFERENCES events(stream_ordering) NOT VALID;

View File

@@ -1,23 +0,0 @@
/* Copyright 2022 Beeper
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Each of these is a denormalised copy of `stream_ordering` from the corresponding row in `events`, which
-- we use to improve database performance by reducing JOINs.
-- NOTE: sqlite does not support ADD CONSTRAINT so we add the new columns with FK constraint as-is
ALTER TABLE current_state_events ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering);
ALTER TABLE local_current_membership ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering);
ALTER TABLE room_memberships ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering);

View File

@@ -1,79 +0,0 @@
# Copyright 2022 Beeper
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This migration adds triggers to the room membership tables to enforce consistency.
Triggers cannot be expressed in .sql files, so we have to use a separate file.
"""
from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
# Complain if the `event_stream_ordering` in membership tables doesn't match
# the `stream_ordering` row with the same `event_id` in `events`.
if isinstance(database_engine, Sqlite3Engine):
for table in (
"current_state_events",
"local_current_membership",
"room_memberships",
):
cur.execute(
f"""
CREATE TRIGGER IF NOT EXISTS {table}_bad_event_stream_ordering
BEFORE INSERT ON {table}
FOR EACH ROW
BEGIN
SELECT RAISE(ABORT, 'Incorrect event_stream_ordering in {table}')
WHERE EXISTS (
SELECT 1 FROM events
WHERE events.event_id = NEW.event_id
AND events.stream_ordering != NEW.event_stream_ordering
);
END;
"""
)
elif isinstance(database_engine, PostgresEngine):
cur.execute(
"""
CREATE OR REPLACE FUNCTION check_event_stream_ordering() RETURNS trigger AS $BODY$
BEGIN
IF EXISTS (
SELECT 1 FROM events
WHERE events.event_id = NEW.event_id
AND events.stream_ordering != NEW.event_stream_ordering
) THEN
RAISE EXCEPTION 'Incorrect event_stream_ordering';
END IF;
RETURN NEW;
END;
$BODY$ LANGUAGE plpgsql;
"""
)
for table in (
"current_state_events",
"local_current_membership",
"room_memberships",
):
cur.execute(
f"""
CREATE TRIGGER check_event_stream_ordering BEFORE INSERT OR UPDATE ON {table}
FOR EACH ROW
EXECUTE PROCEDURE check_event_stream_ordering()
"""
)
else:
raise NotImplementedError("Unknown database engine")

View File

@@ -0,0 +1,22 @@
/* Copyright 2023 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- This migration adds triggers to the room membership tables to enforce consistency.
INSERT INTO background_updates (ordering, update_name, progress_json, depends_on)
VALUES
(7404, 'add_stream_ordering_triggers', '{}', 'add_event_stream_ordering');

View File

@@ -21,27 +21,7 @@ DELETE FROM background_updates WHERE update_name = 'event_push_backfill_thread_i
-- Overwrite any null thread_id values.
UPDATE event_push_actions_staging SET thread_id = 'main' WHERE thread_id IS NULL;
UPDATE event_push_actions SET thread_id = 'main' WHERE thread_id IS NULL;
-- Empirically we can end up with entries in the push summary table with both a
-- `NULL` and `main` thread ID, which causes the insert below to fail. We fudge
-- this by deleting any `NULL` rows that have a corresponding `main`.
DELETE FROM event_push_summary AS a WHERE thread_id IS NULL AND EXISTS (
SELECT 1 FROM event_push_summary AS b
WHERE b.thread_id = 'main' AND a.user_id = b.user_id AND a.room_id = b.room_id
);
-- Copy the NULL threads to have a 'main' thread ID.
--
-- Note: Some people seem to have duplicate rows with a `NULL` thread ID, in
-- which case we just fudge it by using MAX of the values. The counts *may* be
-- wrong for such rooms, but a) it's an edge case, and b) they'll be fixed when
-- the user reads the room.
INSERT INTO event_push_summary (user_id, room_id, notif_count, stream_ordering, unread_count, last_receipt_stream_ordering, thread_id)
SELECT user_id, room_id, MAX(notif_count), MAX(stream_ordering), MAX(unread_count), MAX(last_receipt_stream_ordering), 'main'
FROM event_push_summary
WHERE thread_id IS NULL
GROUP BY user_id, room_id, thread_id;
DELETE FROM event_push_summary AS a WHERE thread_id IS NULL;
UPDATE event_push_summary SET thread_id = 'main' WHERE thread_id IS NULL;
-- Drop the background updates to calculate the indexes used to find null thread_ids.
DELETE FROM background_updates WHERE update_name = 'event_push_actions_thread_id_null';
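
The NULL-to-'main' backfill above can be exercised end to end with a toy table (assumed minimal columns): duplicate `NULL`-thread rows collapse via `MAX` before being rewritten.

    import sqlite3

    con = sqlite3.connect(":memory:")
    con.execute(
        "CREATE TABLE event_push_summary (user_id TEXT, room_id TEXT, notif_count INT, thread_id TEXT)"
    )
    con.executemany(
        "INSERT INTO event_push_summary VALUES (?, ?, ?, ?)",
        [("@u:x", "!r:x", 1, None), ("@u:x", "!r:x", 3, None)],
    )
    con.execute(
        """
        INSERT INTO event_push_summary (user_id, room_id, notif_count, thread_id)
        SELECT user_id, room_id, MAX(notif_count), 'main'
        FROM event_push_summary WHERE thread_id IS NULL
        GROUP BY user_id, room_id
        """
    )
    con.execute("DELETE FROM event_push_summary WHERE thread_id IS NULL")
    print(con.execute("SELECT * FROM event_push_summary").fetchall())
    # [('@u:x', '!r:x', 3, 'main')]
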

View File

@@ -13,8 +13,8 @@
* limitations under the License.
*/
INSERT INTO background_updates (ordering, update_name, progress_json)
INSERT INTO background_updates (ordering, update_name, progress_json, depends_on)
VALUES
(7714, 'current_state_events_stream_ordering_idx', '{}'),
(7714, 'local_current_membership_stream_ordering_idx', '{}'),
(7714, 'room_memberships_stream_ordering_idx', '{}');
(7714, 'current_state_events_stream_ordering_idx', '{}', 'add_event_stream_ordering'),
(7714, 'local_current_membership_stream_ordering_idx', '{}', 'add_event_stream_ordering'),
(7714, 'room_memberships_stream_ordering_idx', '{}', 'add_event_stream_ordering');

View File

@@ -862,5 +862,5 @@ class AsyncLruCache(Generic[KT, VT]):
async def contains(self, key: KT) -> bool:
return self._lru_cache.contains(key)
def clear(self) -> None:
async def clear(self) -> None:
self._lru_cache.clear()
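
A small sketch of why this change ripples into the tests further down: once `clear` is a coroutine, a bare `cache.clear()` merely creates a coroutine object and clears nothing until it is driven, hence the switch to `self.get_success(self.store._get_event_cache.clear())` in the test diffs below.

    import asyncio

    class ToyAsyncCache:
        def __init__(self) -> None:
            self._data = {"k": "v"}

        async def clear(self) -> None:  # now async, mirroring the diff above
            self._data.clear()

    cache = ToyAsyncCache()
    cache.clear()  # un-awaited coroutine: nothing is cleared (and Python warns)
    assert cache._data  # still populated
    asyncio.run(cache.clear())  # must be driven for the clear to happen
    assert cache._data == {}
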

View File

@@ -101,7 +101,8 @@ class TestEventContext(unittest.HomeserverTestCase):
self.assertEqual(
context.state_group_before_event, d_context.state_group_before_event
)
self.assertEqual(context.state_group_deltas, d_context.state_group_deltas)
self.assertEqual(context.prev_group, d_context.prev_group)
self.assertEqual(context.delta_ids, d_context.delta_ids)
self.assertEqual(context.app_service, d_context.app_service)
self.assertEqual(

View File

@@ -163,7 +163,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
# Blow away caches (supported room versions can only change due to a restart).
self.store.get_rooms_for_user_with_stream_ordering.invalidate_all()
self.store.get_rooms_for_user.invalidate_all()
self.store._get_event_cache.clear()
self.get_success(self.store._get_event_cache.clear())
self.store._event_ref.clear()
# The rooms should be excluded from the sync response.

View File

@@ -40,7 +40,7 @@ from synapse.server import HomeServer
from synapse.util import Clock
from tests.server import FakeTransport
from tests.unittest import HomeserverTestCase, override_config
from tests.unittest import HomeserverTestCase
def check_logcontext(context: LoggingContextOrSentinel) -> None:
@@ -640,21 +640,3 @@ class FederationClientTests(HomeserverTestCase):
self.cl.build_auth_headers(
b"", b"GET", b"https://example.com", destination_is=b""
)
@override_config(
{
"federation": {
"client_timeout": "180s",
"max_long_retry_delay": "100s",
"max_short_retry_delay": "7s",
"max_long_retries": 20,
"max_short_retries": 5,
}
}
)
def test_configurable_retry_and_delay_values(self) -> None:
self.assertEqual(self.cl.default_timeout, 180)
self.assertEqual(self.cl.max_long_retry_delay, 100)
self.assertEqual(self.cl.max_short_retry_delay, 7)
self.assertEqual(self.cl.max_long_retries, 20)
self.assertEqual(self.cl.max_short_retries, 5)

View File

@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Awaitable, cast
from typing import cast
from twisted.internet import defer
from twisted.test.proto_helpers import MemoryReactorClock
@@ -227,6 +227,8 @@ class LogContextScopeManagerTestCase(TestCase):
Test whether we can use `@trace_with_opname` (`@trace`) and `@tag_args`
with functions that return deferreds
"""
reactor = MemoryReactorClock()
with LoggingContext("root context"):
@trace_with_opname("fixture_deferred_func", tracer=self._tracer)
@@ -238,6 +240,9 @@ class LogContextScopeManagerTestCase(TestCase):
result_d1 = fixture_deferred_func()
# let the tasks complete
reactor.pump((2,) * 8)
self.assertEqual(self.successResultOf(result_d1), "foo")
# the span should have been reported
@@ -251,6 +256,8 @@ class LogContextScopeManagerTestCase(TestCase):
Test whether we can use `@trace_with_opname` (`@trace`) and `@tag_args`
with async functions
"""
reactor = MemoryReactorClock()
with LoggingContext("root context"):
@trace_with_opname("fixture_async_func", tracer=self._tracer)
@@ -260,6 +267,9 @@ class LogContextScopeManagerTestCase(TestCase):
d1 = defer.ensureDeferred(fixture_async_func())
# let the tasks complete
reactor.pump((2,) * 8)
self.assertEqual(self.successResultOf(d1), "foo")
# the span should have been reported
@@ -267,34 +277,3 @@ class LogContextScopeManagerTestCase(TestCase):
[span.operation_name for span in self._reporter.get_spans()],
["fixture_async_func"],
)
def test_trace_decorator_awaitable_return(self) -> None:
"""
Test whether we can use `@trace_with_opname` (`@trace`) and `@tag_args`
with functions that return an awaitable (e.g. a coroutine)
"""
with LoggingContext("root context"):
# Something we can return without `await` to get a coroutine
async def fixture_async_func() -> str:
return "foo"
# The actual kind of function we want to test that returns an awaitable
@trace_with_opname("fixture_awaitable_return_func", tracer=self._tracer)
@tag_args
def fixture_awaitable_return_func() -> Awaitable[str]:
return fixture_async_func()
# Something we can run with `defer.ensureDeferred(runner())` to pump the
# whole chain of async tasks through to completion.
async def runner() -> str:
return await fixture_awaitable_return_func()
d1 = defer.ensureDeferred(runner())
self.assertEqual(self.successResultOf(d1), "foo")
# the span should have been reported
self.assertEqual(
[span.operation_name for span in self._reporter.get_spans()],
["fixture_awaitable_return_func"],
)

View File

@@ -228,6 +228,7 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
)
return len(result) > 0
@override_config({"experimental_features": {"msc3952_intentional_mentions": True}})
def test_user_mentions(self) -> None:
"""Test the behavior of an event which includes invalid user mentions."""
bulk_evaluator = BulkPushRuleEvaluator(self.hs)
@@ -236,7 +237,9 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
self.assertFalse(self._create_and_process(bulk_evaluator))
# An empty mentions field should not notify.
self.assertFalse(
self._create_and_process(bulk_evaluator, {EventContentFields.MENTIONS: {}})
self._create_and_process(
bulk_evaluator, {EventContentFields.MSC3952_MENTIONS: {}}
)
)
# Non-dict mentions should be ignored.
@@ -250,7 +253,7 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
for mentions in (None, True, False, 1, "foo", []):
self.assertFalse(
self._create_and_process(
bulk_evaluator, {EventContentFields.MENTIONS: mentions}
bulk_evaluator, {EventContentFields.MSC3952_MENTIONS: mentions}
)
)
@@ -259,7 +262,7 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
self.assertFalse(
self._create_and_process(
bulk_evaluator,
{EventContentFields.MENTIONS: {"user_ids": mentions}},
{EventContentFields.MSC3952_MENTIONS: {"user_ids": mentions}},
)
)
@@ -267,14 +270,14 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
self.assertTrue(
self._create_and_process(
bulk_evaluator,
{EventContentFields.MENTIONS: {"user_ids": [self.alice]}},
{EventContentFields.MSC3952_MENTIONS: {"user_ids": [self.alice]}},
)
)
self.assertTrue(
self._create_and_process(
bulk_evaluator,
{
EventContentFields.MENTIONS: {
EventContentFields.MSC3952_MENTIONS: {
"user_ids": ["@another:test", self.alice]
}
},
@@ -285,7 +288,11 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
self.assertTrue(
self._create_and_process(
bulk_evaluator,
{EventContentFields.MENTIONS: {"user_ids": [self.alice, self.alice]}},
{
EventContentFields.MSC3952_MENTIONS: {
"user_ids": [self.alice, self.alice]
}
},
)
)
@@ -300,7 +307,7 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
self._create_and_process(
bulk_evaluator,
{
EventContentFields.MENTIONS: {
EventContentFields.MSC3952_MENTIONS: {
"user_ids": [None, True, False, {}, []]
}
},
@@ -310,7 +317,7 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
self._create_and_process(
bulk_evaluator,
{
EventContentFields.MENTIONS: {
EventContentFields.MSC3952_MENTIONS: {
"user_ids": [None, True, False, {}, [], self.alice]
}
},
@@ -324,11 +331,12 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
{
"body": self.alice,
"msgtype": "m.text",
EventContentFields.MENTIONS: {},
EventContentFields.MSC3952_MENTIONS: {},
},
)
)
@override_config({"experimental_features": {"msc3952_intentional_mentions": True}})
def test_room_mentions(self) -> None:
"""Test the behavior of an event which includes invalid room mentions."""
bulk_evaluator = BulkPushRuleEvaluator(self.hs)
@@ -336,7 +344,7 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
# Room mentions from those without power should not notify.
self.assertFalse(
self._create_and_process(
bulk_evaluator, {EventContentFields.MENTIONS: {"room": True}}
bulk_evaluator, {EventContentFields.MSC3952_MENTIONS: {"room": True}}
)
)
@@ -350,7 +358,7 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
)
self.assertTrue(
self._create_and_process(
bulk_evaluator, {EventContentFields.MENTIONS: {"room": True}}
bulk_evaluator, {EventContentFields.MSC3952_MENTIONS: {"room": True}}
)
)
@@ -366,7 +374,7 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
self.assertFalse(
self._create_and_process(
bulk_evaluator,
{EventContentFields.MENTIONS: {"room": mentions}},
{EventContentFields.MSC3952_MENTIONS: {"room": mentions}},
)
)
@@ -377,7 +385,7 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
{
"body": "@room",
"msgtype": "m.text",
EventContentFields.MENTIONS: {},
EventContentFields.MSC3952_MENTIONS: {},
},
)
)

View File

@@ -131,6 +131,9 @@ class ReadMarkerTestCase(unittest.HomeserverTestCase):
event = self.get_success(self.store.get_event(event_id_1, allow_none=True))
assert event is None
# TODO See https://github.com/matrix-org/synapse/issues/13476
self.store.get_event_ordering.invalidate_all()
# Test moving the read marker to a newer event
event_id_2 = send_message()
channel = self.make_request(

View File

@@ -1941,43 +1941,6 @@ class RoomPowerLevelOverridesInPracticeTestCase(RoomBase):
channel.json_body["error"],
)
@unittest.override_config(
{
"default_power_level_content_override": {
"private_chat": {
"events": {
"m.room.avatar": 50,
"m.room.canonical_alias": 50,
"m.room.encryption": 999,
"m.room.history_visibility": 100,
"m.room.name": 50,
"m.room.power_levels": 100,
"m.room.server_acl": 100,
"m.room.tombstone": 100,
},
"events_default": 0,
},
}
},
)
def test_config_override_blocks_encrypted_room(self) -> None:
# Given the server has config for private_chats,
# When I attempt to create an encrypted private_chat room
channel = self.make_request(
"POST",
"/createRoom",
'{"creation_content": {"m.federate": false},"name": "Secret Private Room","preset": "private_chat","initial_state": [{"type": "m.room.encryption","state_key": "","content": {"algorithm": "m.megolm.v1.aes-sha2"}}]}',
)
# Then I am not allowed because the required power level is unattainable
self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.result["body"])
self.assertEqual(
"You cannot create an encrypted room. "
+ "user_level (100) < send_level (999)",
channel.json_body["error"],
)
class RoomInitialSyncTestCase(RoomBase):
"""Tests /rooms/$room_id/initialSync."""

View File

@@ -188,7 +188,7 @@ class EventCacheTestCase(unittest.HomeserverTestCase):
self.event_id = res["event_id"]
# Reset the event cache so the tests start with it empty
self.store._get_event_cache.clear()
self.get_success(self.store._get_event_cache.clear())
def test_simple(self) -> None:
"""Test that we cache events that we pull from the DB."""
@@ -205,7 +205,7 @@ class EventCacheTestCase(unittest.HomeserverTestCase):
"""
# Reset the event cache
self.store._get_event_cache.clear()
self.get_success(self.store._get_event_cache.clear())
with LoggingContext("test") as ctx:
# We keep hold of the event event though we never use it.
@@ -215,7 +215,7 @@ class EventCacheTestCase(unittest.HomeserverTestCase):
self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1)
# Reset the event cache
self.store._get_event_cache.clear()
self.get_success(self.store._get_event_cache.clear())
with LoggingContext("test") as ctx:
self.get_success(self.store.get_event(self.event_id))
@@ -390,7 +390,7 @@ class GetEventCancellationTestCase(unittest.HomeserverTestCase):
self.event_id = res["event_id"]
# Reset the event cache so the tests start with it empty
self.store._get_event_cache.clear()
self.get_success(self.store._get_event_cache.clear())
@contextmanager
def blocking_get_event_calls(

View File

@@ -401,10 +401,7 @@ class EventChainStoreTestCase(HomeserverTestCase):
assert persist_events_store is not None
persist_events_store._store_event_txn(
txn,
[
(e, EventContext(self.hs.get_storage_controllers(), {}))
for e in events
],
[(e, EventContext(self.hs.get_storage_controllers())) for e in events],
)
# Actually call the function that calculates the auth chain stuff.

View File

@@ -555,15 +555,10 @@ class StateTestCase(unittest.TestCase):
(e.event_id for e in old_state + [event]), current_state_ids.values()
)
assert context.state_group_before_event is not None
assert context.state_group is not None
self.assertEqual(
context.state_group_deltas.get(
(context.state_group_before_event, context.state_group)
),
{(event.type, event.state_key): event.event_id},
)
self.assertIsNotNone(context.state_group_before_event)
self.assertNotEqual(context.state_group_before_event, context.state_group)
self.assertEqual(context.state_group_before_event, context.prev_group)
self.assertEqual({("state", ""): event.event_id}, context.delta_ids)
@defer.inlineCallbacks
def test_trivial_annotate_message(