Compare commits
26 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 7ba64b6caf | |||
| 68699d5338 | |||
| 3cabaa84ca | |||
| 74ca7ae720 | |||
| 5102565369 | |||
| 33e0c25279 | |||
| 73a38384f5 | |||
| 4a803e8257 | |||
| 51dbbbb40f | |||
| 6363d63822 | |||
| 3e571561c9 | |||
| a3b80071cd | |||
| f500c7d982 | |||
| df04931f0b | |||
| f56670515b | |||
| db8a8d33fe | |||
| 3b94e40cc8 | |||
| 6b1e3c9c66 | |||
| 1709957395 | |||
| 0de7aa9953 | |||
| e4ca593eb6 | |||
| 978032141b | |||
| 142ba5df89 | |||
| eb5dfc19e5 | |||
| d5da07703d | |||
| 96c556081a |
@@ -14,7 +14,7 @@ jobs:
|
||||
# There's a 'download artifact' action, but it hasn't been updated for the workflow_run action
|
||||
# (https://github.com/actions/download-artifact/issues/60) so instead we get this mess:
|
||||
- name: 📥 Download artifact
|
||||
uses: dawidd6/action-download-artifact@07ab29fd4a977ae4d2b275087cf67563dfdf0295 # v9
|
||||
uses: dawidd6/action-download-artifact@ac66b43f0e6a346234dd65d4d0c8fbb31cb316e5 # v11
|
||||
with:
|
||||
workflow: docs-pr.yaml
|
||||
run_id: ${{ github.event.workflow_run.id }}
|
||||
|
||||
@@ -5,6 +5,9 @@ on:
|
||||
paths:
|
||||
- schema/**
|
||||
- docs/usage/configuration/config_documentation.md
|
||||
push:
|
||||
branches: ["develop", "release-*"]
|
||||
workflow_dispatch:
|
||||
|
||||
jobs:
|
||||
validate-schema:
|
||||
@@ -12,7 +15,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
|
||||
- uses: actions/setup-python@8d9ed9ac5c53483de85588cdf95a591a75ab9f55 # v5.5.0
|
||||
- uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
|
||||
with:
|
||||
python-version: "3.x"
|
||||
- name: Install check-jsonschema
|
||||
@@ -38,7 +41,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
|
||||
- uses: actions/setup-python@8d9ed9ac5c53483de85588cdf95a591a75ab9f55 # v5.5.0
|
||||
- uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
|
||||
with:
|
||||
python-version: "3.x"
|
||||
- name: Install PyYAML
|
||||
@@ -51,4 +54,4 @@ jobs:
|
||||
> docs/usage/configuration/config_documentation.md
|
||||
- name: Error in case of any differences
|
||||
# Errors if there are now any modified files (untracked files are ignored).
|
||||
run: 'git diff || ! git status --porcelain=1 | grep "^ M"'
|
||||
run: 'git diff --exit-code'
|
||||
|
||||
+11
-11
@@ -85,7 +85,7 @@ jobs:
|
||||
steps:
|
||||
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
|
||||
- name: Install Rust
|
||||
uses: dtolnay/rust-toolchain@e05ebb0e73db581a4877c6ce762e29fe1e0b5073 # 1.66.0
|
||||
uses: dtolnay/rust-toolchain@c1678930c21fb233e4987c4ae12158f9125e5762 # 1.81.0
|
||||
- uses: Swatinem/rust-cache@9d47c6ad4b02e050fd481d890b2ea34778fd09d6 # v2.7.8
|
||||
- uses: matrix-org/setup-python-poetry@5bbf6603c5c930615ec8a29f1b5d7d258d905aa4 # v2.0.0
|
||||
with:
|
||||
@@ -149,7 +149,7 @@ jobs:
|
||||
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
|
||||
|
||||
- name: Install Rust
|
||||
uses: dtolnay/rust-toolchain@e05ebb0e73db581a4877c6ce762e29fe1e0b5073 # 1.66.0
|
||||
uses: dtolnay/rust-toolchain@c1678930c21fb233e4987c4ae12158f9125e5762 # 1.81.0
|
||||
- uses: Swatinem/rust-cache@9d47c6ad4b02e050fd481d890b2ea34778fd09d6 # v2.7.8
|
||||
|
||||
- name: Setup Poetry
|
||||
@@ -210,7 +210,7 @@ jobs:
|
||||
with:
|
||||
ref: ${{ github.event.pull_request.head.sha }}
|
||||
- name: Install Rust
|
||||
uses: dtolnay/rust-toolchain@e05ebb0e73db581a4877c6ce762e29fe1e0b5073 # 1.66.0
|
||||
uses: dtolnay/rust-toolchain@c1678930c21fb233e4987c4ae12158f9125e5762 # 1.81.0
|
||||
- uses: Swatinem/rust-cache@9d47c6ad4b02e050fd481d890b2ea34778fd09d6 # v2.7.8
|
||||
- uses: matrix-org/setup-python-poetry@5bbf6603c5c930615ec8a29f1b5d7d258d905aa4 # v2.0.0
|
||||
with:
|
||||
@@ -227,7 +227,7 @@ jobs:
|
||||
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
|
||||
|
||||
- name: Install Rust
|
||||
uses: dtolnay/rust-toolchain@e05ebb0e73db581a4877c6ce762e29fe1e0b5073 # 1.66.0
|
||||
uses: dtolnay/rust-toolchain@0d72692bcfbf448b1e2afa01a67f71b455a9dcec # 1.86.0
|
||||
with:
|
||||
components: clippy
|
||||
- uses: Swatinem/rust-cache@9d47c6ad4b02e050fd481d890b2ea34778fd09d6 # v2.7.8
|
||||
@@ -247,7 +247,7 @@ jobs:
|
||||
- name: Install Rust
|
||||
uses: dtolnay/rust-toolchain@56f84321dbccf38fb67ce29ab63e4754056677e0 # master (rust 1.85.1)
|
||||
with:
|
||||
toolchain: nightly-2022-12-01
|
||||
toolchain: nightly-2025-04-23
|
||||
components: clippy
|
||||
- uses: Swatinem/rust-cache@9d47c6ad4b02e050fd481d890b2ea34778fd09d6 # v2.7.8
|
||||
|
||||
@@ -265,7 +265,7 @@ jobs:
|
||||
uses: dtolnay/rust-toolchain@56f84321dbccf38fb67ce29ab63e4754056677e0 # master (rust 1.85.1)
|
||||
with:
|
||||
# We use nightly so that it correctly groups together imports
|
||||
toolchain: nightly-2022-12-01
|
||||
toolchain: nightly-2025-04-23
|
||||
components: rustfmt
|
||||
- uses: Swatinem/rust-cache@9d47c6ad4b02e050fd481d890b2ea34778fd09d6 # v2.7.8
|
||||
|
||||
@@ -362,7 +362,7 @@ jobs:
|
||||
postgres:${{ matrix.job.postgres-version }}
|
||||
|
||||
- name: Install Rust
|
||||
uses: dtolnay/rust-toolchain@e05ebb0e73db581a4877c6ce762e29fe1e0b5073 # 1.66.0
|
||||
uses: dtolnay/rust-toolchain@c1678930c21fb233e4987c4ae12158f9125e5762 # 1.81.0
|
||||
- uses: Swatinem/rust-cache@9d47c6ad4b02e050fd481d890b2ea34778fd09d6 # v2.7.8
|
||||
|
||||
- uses: matrix-org/setup-python-poetry@5bbf6603c5c930615ec8a29f1b5d7d258d905aa4 # v2.0.0
|
||||
@@ -404,7 +404,7 @@ jobs:
|
||||
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
|
||||
|
||||
- name: Install Rust
|
||||
uses: dtolnay/rust-toolchain@e05ebb0e73db581a4877c6ce762e29fe1e0b5073 # 1.66.0
|
||||
uses: dtolnay/rust-toolchain@c1678930c21fb233e4987c4ae12158f9125e5762 # 1.81.0
|
||||
- uses: Swatinem/rust-cache@9d47c6ad4b02e050fd481d890b2ea34778fd09d6 # v2.7.8
|
||||
|
||||
# There aren't wheels for some of the older deps, so we need to install
|
||||
@@ -519,7 +519,7 @@ jobs:
|
||||
run: cat sytest-blacklist .ci/worker-blacklist > synapse-blacklist-with-workers
|
||||
|
||||
- name: Install Rust
|
||||
uses: dtolnay/rust-toolchain@e05ebb0e73db581a4877c6ce762e29fe1e0b5073 # 1.66.0
|
||||
uses: dtolnay/rust-toolchain@c1678930c21fb233e4987c4ae12158f9125e5762 # 1.81.0
|
||||
- uses: Swatinem/rust-cache@9d47c6ad4b02e050fd481d890b2ea34778fd09d6 # v2.7.8
|
||||
|
||||
- name: Run SyTest
|
||||
@@ -663,7 +663,7 @@ jobs:
|
||||
path: synapse
|
||||
|
||||
- name: Install Rust
|
||||
uses: dtolnay/rust-toolchain@e05ebb0e73db581a4877c6ce762e29fe1e0b5073 # 1.66.0
|
||||
uses: dtolnay/rust-toolchain@c1678930c21fb233e4987c4ae12158f9125e5762 # 1.81.0
|
||||
- uses: Swatinem/rust-cache@9d47c6ad4b02e050fd481d890b2ea34778fd09d6 # v2.7.8
|
||||
|
||||
- name: Prepare Complement's Prerequisites
|
||||
@@ -695,7 +695,7 @@ jobs:
|
||||
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
|
||||
|
||||
- name: Install Rust
|
||||
uses: dtolnay/rust-toolchain@e05ebb0e73db581a4877c6ce762e29fe1e0b5073 # 1.66.0
|
||||
uses: dtolnay/rust-toolchain@c1678930c21fb233e4987c4ae12158f9125e5762 # 1.81.0
|
||||
- uses: Swatinem/rust-cache@9d47c6ad4b02e050fd481d890b2ea34778fd09d6 # v2.7.8
|
||||
|
||||
- run: cargo test
|
||||
|
||||
Generated
+1315
-49
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1 @@
|
||||
Add support for the [MSC4260 user report API](https://github.com/matrix-org/matrix-spec-proposals/pull/4260).
|
||||
@@ -0,0 +1 @@
|
||||
Increase performance of introspecting access tokens when using delegated auth.
|
||||
@@ -0,0 +1 @@
|
||||
Generate config documentation from JSON Schema file.
|
||||
@@ -0,0 +1 @@
|
||||
Fix an issue where during state resolution for v11 rooms Synapse would incorrectly calculate the power level of the creator when there was no power levels event in the room.
|
||||
@@ -0,0 +1 @@
|
||||
Fix long-standing bug where sliding sync did not honour the `room_id_to_include` config option.
|
||||
@@ -0,0 +1 @@
|
||||
Log user deactivations.
|
||||
@@ -0,0 +1 @@
|
||||
Enable [`flake8-logging`](https://docs.astral.sh/ruff/rules/#flake8-logging-log) and [`flake8-logging-format`](https://docs.astral.sh/ruff/rules/#flake8-logging-format-g) rules in Ruff and fix related issues throughout the codebase.
|
||||
@@ -0,0 +1 @@
|
||||
Fix an issue where "Lock timeout is getting excessive" warnings would be logged even when the lock timeout was <10 minutes.
|
||||
@@ -0,0 +1 @@
|
||||
Fix an issue where Synapse could calculate the wrong power level for the creator of the room if there was no power levels event.
|
||||
@@ -0,0 +1 @@
|
||||
Clean up old, unused rows from the `device_federation_inbox` table.
|
||||
@@ -0,0 +1 @@
|
||||
Fix an issue where during state resolution for v11 rooms Synapse would incorrectly calculate the power level of the creator when there was no power levels event in the room.
|
||||
@@ -0,0 +1 @@
|
||||
Run config schema CI on develop and release branches.
|
||||
@@ -0,0 +1 @@
|
||||
Increase performance of introspecting access tokens when using delegated auth.
|
||||
@@ -0,0 +1 @@
|
||||
Fix typo in user type documentation.
|
||||
@@ -0,0 +1 @@
|
||||
Allow worker processes to send server notices.
|
||||
@@ -0,0 +1 @@
|
||||
Update PyO3 to version 0.25.
|
||||
@@ -770,7 +770,7 @@ This setting has the following sub-options:
|
||||
|
||||
* `default_user_type` (string|null): The default user type to use for registering new users when no value has been specified. Defaults to none. Defaults to `null`.
|
||||
|
||||
* `extra_user_types` (list): Array of additional user types to allow. These are treated as real users. Defaults to `[]`.
|
||||
* `extra_user_types` (array): Array of additional user types to allow. These are treated as real users. Defaults to `[]`.
|
||||
|
||||
Example configuration:
|
||||
```yaml
|
||||
@@ -1937,6 +1937,33 @@ rc_delayed_event_mgmt:
|
||||
burst_count: 20.0
|
||||
```
|
||||
---
|
||||
### `rc_reports`
|
||||
|
||||
*(object)* Ratelimiting settings for reporting content.
|
||||
This is a ratelimiting option that ratelimits reports made by users about content they see.
|
||||
Setting this to a high value allows users to report content quickly, possibly in duplicate. This can result in higher database usage.
|
||||
|
||||
This setting has the following sub-options:
|
||||
|
||||
* `per_second` (number): Maximum number of requests a client can send per second.
|
||||
|
||||
* `burst_count` (number): Maximum number of requests a client can send before being throttled.
|
||||
|
||||
Default configuration:
|
||||
```yaml
|
||||
rc_reports:
|
||||
per_user:
|
||||
per_second: 1.0
|
||||
burst_count: 5.0
|
||||
```
|
||||
|
||||
Example configuration:
|
||||
```yaml
|
||||
rc_reports:
|
||||
per_second: 2.0
|
||||
burst_count: 20.0
|
||||
```
|
||||
---
|
||||
### `federation_rr_transactions_per_room_per_second`
|
||||
|
||||
*(integer)* Sets outgoing federation transaction frequency for sending read-receipts, per-room.
|
||||
|
||||
Generated
+7
-7
@@ -2256,19 +2256,19 @@ rpds-py = ">=0.7.0"
|
||||
|
||||
[[package]]
|
||||
name = "requests"
|
||||
version = "2.32.2"
|
||||
version = "2.32.4"
|
||||
description = "Python HTTP for Humans."
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
groups = ["main", "dev"]
|
||||
files = [
|
||||
{file = "requests-2.32.2-py3-none-any.whl", hash = "sha256:fc06670dd0ed212426dfeb94fc1b983d917c4f9847c863f313c9dfaaffb7c23c"},
|
||||
{file = "requests-2.32.2.tar.gz", hash = "sha256:dd951ff5ecf3e3b3aa26b40703ba77495dab41da839ae72ef3c8e5d8e2433289"},
|
||||
{file = "requests-2.32.4-py3-none-any.whl", hash = "sha256:27babd3cda2a6d50b30443204ee89830707d396671944c998b5975b031ac2b2c"},
|
||||
{file = "requests-2.32.4.tar.gz", hash = "sha256:27d0316682c8a29834d3264820024b62a36942083d52caf2f14c0591336d3422"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
certifi = ">=2017.4.17"
|
||||
charset-normalizer = ">=2,<4"
|
||||
charset_normalizer = ">=2,<4"
|
||||
idna = ">=2.5,<4"
|
||||
urllib3 = ">=1.21.1,<3"
|
||||
|
||||
@@ -3058,14 +3058,14 @@ files = [
|
||||
|
||||
[[package]]
|
||||
name = "types-requests"
|
||||
version = "2.32.0.20250328"
|
||||
version = "2.32.4.20250611"
|
||||
description = "Typing stubs for requests"
|
||||
optional = false
|
||||
python-versions = ">=3.9"
|
||||
groups = ["dev"]
|
||||
files = [
|
||||
{file = "types_requests-2.32.0.20250328-py3-none-any.whl", hash = "sha256:72ff80f84b15eb3aa7a8e2625fffb6a93f2ad5a0c20215fc1dcfa61117bcb2a2"},
|
||||
{file = "types_requests-2.32.0.20250328.tar.gz", hash = "sha256:c9e67228ea103bd811c96984fac36ed2ae8da87a36a633964a21f199d60baf32"},
|
||||
{file = "types_requests-2.32.4.20250611-py3-none-any.whl", hash = "sha256:ad2fe5d3b0cb3c2c902c8815a70e7fb2302c4b8c1f77bdcd738192cdb3878072"},
|
||||
{file = "types_requests-2.32.4.20250611.tar.gz", hash = "sha256:741c8777ed6425830bf51e54d6abe245f79b4dcb9019f1622b773463946bf826"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
|
||||
@@ -74,6 +74,10 @@ select = [
|
||||
"PIE",
|
||||
# flake8-executable
|
||||
"EXE",
|
||||
# flake8-logging
|
||||
"LOG",
|
||||
# flake8-logging-format
|
||||
"G",
|
||||
]
|
||||
|
||||
[tool.ruff.lint.isort]
|
||||
|
||||
+12
-4
@@ -7,7 +7,7 @@ name = "synapse"
|
||||
version = "0.1.0"
|
||||
|
||||
edition = "2021"
|
||||
rust-version = "1.66.0"
|
||||
rust-version = "1.81.0"
|
||||
|
||||
[lib]
|
||||
name = "synapse"
|
||||
@@ -30,19 +30,27 @@ http = "1.1.0"
|
||||
lazy_static = "1.4.0"
|
||||
log = "0.4.17"
|
||||
mime = "0.3.17"
|
||||
pyo3 = { version = "0.24.2", features = [
|
||||
pyo3 = { version = "0.25.1", features = [
|
||||
"macros",
|
||||
"anyhow",
|
||||
"abi3",
|
||||
"abi3-py39",
|
||||
] }
|
||||
pyo3-log = "0.12.0"
|
||||
pythonize = "0.24.0"
|
||||
pyo3-log = "0.12.4"
|
||||
pythonize = "0.25.0"
|
||||
regex = "1.6.0"
|
||||
sha2 = "0.10.8"
|
||||
serde = { version = "1.0.144", features = ["derive"] }
|
||||
serde_json = "1.0.85"
|
||||
ulid = "1.1.2"
|
||||
reqwest = { version = "0.12.15", default-features = false, features = [
|
||||
"http2",
|
||||
"stream",
|
||||
"rustls-tls-native-roots",
|
||||
] }
|
||||
http-body-util = "0.1.3"
|
||||
futures = "0.3.31"
|
||||
tokio = { version = "1.44.2", features = ["rt", "rt-multi-thread"] }
|
||||
|
||||
[features]
|
||||
extension-module = ["pyo3/extension-module"]
|
||||
|
||||
@@ -58,3 +58,15 @@ impl NotFoundError {
|
||||
NotFoundError::new_err(())
|
||||
}
|
||||
}
|
||||
|
||||
import_exception!(synapse.api.errors, HttpResponseException);
|
||||
|
||||
impl HttpResponseException {
|
||||
pub fn new(status: StatusCode, bytes: Vec<u8>) -> pyo3::PyErr {
|
||||
HttpResponseException::new_err((
|
||||
status.as_u16(),
|
||||
status.canonical_reason().unwrap_or_default(),
|
||||
bytes,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,218 @@
|
||||
/*
|
||||
* This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
*
|
||||
* Copyright (C) 2025 New Vector, Ltd
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as
|
||||
* published by the Free Software Foundation, either version 3 of the
|
||||
* License, or (at your option) any later version.
|
||||
*
|
||||
* See the GNU Affero General Public License for more details:
|
||||
* <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
*/
|
||||
|
||||
use std::{collections::HashMap, future::Future, panic::AssertUnwindSafe, sync::LazyLock};
|
||||
|
||||
use anyhow::Context;
|
||||
use futures::{FutureExt, TryStreamExt};
|
||||
use pyo3::{exceptions::PyException, prelude::*, types::PyString};
|
||||
use reqwest::RequestBuilder;
|
||||
use tokio::runtime::Runtime;
|
||||
|
||||
use crate::errors::HttpResponseException;
|
||||
|
||||
/// The tokio runtime that we're using to run async Rust libs.
|
||||
static RUNTIME: LazyLock<Runtime> = LazyLock::new(|| {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.worker_threads(4)
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
/// A reference to the `Deferred` python class.
|
||||
static DEFERRED_CLASS: LazyLock<PyObject> = LazyLock::new(|| {
|
||||
Python::with_gil(|py| {
|
||||
py.import("twisted.internet.defer")
|
||||
.expect("module 'twisted.internet.defer' should be importable")
|
||||
.getattr("Deferred")
|
||||
.expect("module 'twisted.internet.defer' should have a 'Deferred' class")
|
||||
.unbind()
|
||||
})
|
||||
});
|
||||
|
||||
/// A reference to the twisted `reactor`.
|
||||
static TWISTED_REACTOR: LazyLock<Py<PyModule>> = LazyLock::new(|| {
|
||||
Python::with_gil(|py| {
|
||||
py.import("twisted.internet.reactor")
|
||||
.expect("module 'twisted.internet.reactor' should be importable")
|
||||
.unbind()
|
||||
})
|
||||
});
|
||||
|
||||
/// Called when registering modules with python.
|
||||
pub fn register_module(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
|
||||
let child_module: Bound<'_, PyModule> = PyModule::new(py, "http_client")?;
|
||||
child_module.add_class::<HttpClient>()?;
|
||||
|
||||
// Make sure we fail early if we can't build the lazy statics.
|
||||
LazyLock::force(&RUNTIME);
|
||||
LazyLock::force(&DEFERRED_CLASS);
|
||||
|
||||
m.add_submodule(&child_module)?;
|
||||
|
||||
// We need to manually add the module to sys.modules to make `from
|
||||
// synapse.synapse_rust import acl` work.
|
||||
py.import("sys")?
|
||||
.getattr("modules")?
|
||||
.set_item("synapse.synapse_rust.http_client", child_module)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[pyclass]
|
||||
#[derive(Clone)]
|
||||
struct HttpClient {
|
||||
client: reqwest::Client,
|
||||
}
|
||||
|
||||
#[pymethods]
|
||||
impl HttpClient {
|
||||
#[new]
|
||||
pub fn py_new(user_agent: &str) -> PyResult<HttpClient> {
|
||||
// The twisted reactor can only be imported after Synapse has been
|
||||
// imported, to allow Synapse to change the twisted reactor. If we try
|
||||
// and import the reactor too early twisted installs a default reactor,
|
||||
// which can't be replaced.
|
||||
LazyLock::force(&TWISTED_REACTOR);
|
||||
|
||||
Ok(HttpClient {
|
||||
client: reqwest::Client::builder()
|
||||
.user_agent(user_agent)
|
||||
.build()
|
||||
.context("building reqwest client")?,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn get<'a>(
|
||||
&self,
|
||||
py: Python<'a>,
|
||||
url: String,
|
||||
response_limit: usize,
|
||||
) -> PyResult<Bound<'a, PyAny>> {
|
||||
self.send_request(py, self.client.get(url), response_limit)
|
||||
}
|
||||
|
||||
pub fn post<'a>(
|
||||
&self,
|
||||
py: Python<'a>,
|
||||
url: String,
|
||||
response_limit: usize,
|
||||
headers: HashMap<String, String>,
|
||||
request_body: String,
|
||||
) -> PyResult<Bound<'a, PyAny>> {
|
||||
let mut builder = self.client.post(url);
|
||||
for (name, value) in headers {
|
||||
builder = builder.header(name, value);
|
||||
}
|
||||
builder = builder.body(request_body);
|
||||
|
||||
self.send_request(py, builder, response_limit)
|
||||
}
|
||||
}
|
||||
|
||||
impl HttpClient {
|
||||
fn send_request<'a>(
|
||||
&self,
|
||||
py: Python<'a>,
|
||||
builder: RequestBuilder,
|
||||
response_limit: usize,
|
||||
) -> PyResult<Bound<'a, PyAny>> {
|
||||
create_deferred(py, async move {
|
||||
let response = builder.send().await.context("sending request")?;
|
||||
|
||||
let status = response.status();
|
||||
|
||||
let mut stream = response.bytes_stream();
|
||||
let mut buffer = Vec::new();
|
||||
while let Some(chunk) = stream.try_next().await.context("reading body")? {
|
||||
if buffer.len() + chunk.len() > response_limit {
|
||||
Err(anyhow::anyhow!("Response size too large"))?;
|
||||
}
|
||||
|
||||
buffer.extend_from_slice(&chunk);
|
||||
}
|
||||
|
||||
if !status.is_success() {
|
||||
return Err(HttpResponseException::new(status, buffer));
|
||||
}
|
||||
|
||||
let r = Python::with_gil(|py| buffer.into_pyobject(py).map(|o| o.unbind()))?;
|
||||
|
||||
Ok(r)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a twisted deferred from the given future, spawning the task on the
|
||||
/// tokio runtime.
|
||||
///
|
||||
/// Does not handle deferred cancellation or contextvars.
|
||||
fn create_deferred<F, O>(py: Python, fut: F) -> PyResult<Bound<'_, PyAny>>
|
||||
where
|
||||
F: Future<Output = PyResult<O>> + Send + 'static,
|
||||
for<'a> O: IntoPyObject<'a>,
|
||||
{
|
||||
let deferred = DEFERRED_CLASS.bind(py).call0()?;
|
||||
let deferred_callback = deferred.getattr("callback")?.unbind();
|
||||
let deferred_errback = deferred.getattr("errback")?.unbind();
|
||||
|
||||
RUNTIME.spawn(async move {
|
||||
// TODO: Is it safe to assert unwind safety here? I think so, as we
|
||||
// don't use anything that could be tainted by the panic afterwards.
|
||||
// Note that `.spawn(..)` asserts unwind safety on the future too.
|
||||
let res = AssertUnwindSafe(fut).catch_unwind().await;
|
||||
|
||||
Python::with_gil(move |py| {
|
||||
// Flatten the panic into standard python error
|
||||
let res = match res {
|
||||
Ok(r) => r,
|
||||
Err(panic_err) => {
|
||||
let panic_message = get_panic_message(&panic_err);
|
||||
Err(PyException::new_err(
|
||||
PyString::new(py, panic_message).unbind(),
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
// Send the result to the deferred, via `.callback(..)` or `.errback(..)`
|
||||
match res {
|
||||
Ok(obj) => {
|
||||
TWISTED_REACTOR
|
||||
.call_method(py, "callFromThread", (deferred_callback, obj), None)
|
||||
.expect("callFromThread should not fail"); // There's nothing we can really do with errors here
|
||||
}
|
||||
Err(err) => {
|
||||
TWISTED_REACTOR
|
||||
.call_method(py, "callFromThread", (deferred_errback, err), None)
|
||||
.expect("callFromThread should not fail"); // There's nothing we can really do with errors here
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
Ok(deferred)
|
||||
}
|
||||
|
||||
/// Try and get the panic message out of the panic
|
||||
fn get_panic_message<'a>(panic_err: &'a (dyn std::any::Any + Send + 'static)) -> &'a str {
|
||||
// Apparently this is how you extract the panic message from a panic
|
||||
if let Some(str_slice) = panic_err.downcast_ref::<&str>() {
|
||||
str_slice
|
||||
} else if let Some(string) = panic_err.downcast_ref::<String>() {
|
||||
string
|
||||
} else {
|
||||
"unknown error"
|
||||
}
|
||||
}
|
||||
@@ -8,6 +8,7 @@ pub mod acl;
|
||||
pub mod errors;
|
||||
pub mod events;
|
||||
pub mod http;
|
||||
pub mod http_client;
|
||||
pub mod identifier;
|
||||
pub mod matrix_const;
|
||||
pub mod push;
|
||||
@@ -50,6 +51,7 @@ fn synapse_rust(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
|
||||
acl::register_module(py, m)?;
|
||||
push::register_module(py, m)?;
|
||||
events::register_module(py, m)?;
|
||||
http_client::register_module(py, m)?;
|
||||
rendezvous::register_module(py, m)?;
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -2185,6 +2185,23 @@ properties:
|
||||
examples:
|
||||
- per_second: 2.0
|
||||
burst_count: 20.0
|
||||
rc_reports:
|
||||
$ref: "#/$defs/rc"
|
||||
description: >-
|
||||
Ratelimiting settings for reporting content.
|
||||
|
||||
This is a ratelimiting option that ratelimits reports made by users
|
||||
about content they see.
|
||||
|
||||
Setting this to a high value allows users to report content quickly, possibly in
|
||||
duplicate. This can result in higher database usage.
|
||||
default:
|
||||
per_user:
|
||||
per_second: 1.0
|
||||
burst_count: 5.0
|
||||
examples:
|
||||
- per_second: 2.0
|
||||
burst_count: 20.0
|
||||
federation_rr_transactions_per_room_per_second:
|
||||
type: integer
|
||||
description: >-
|
||||
|
||||
@@ -243,7 +243,7 @@ def do_lint() -> Set[str]:
|
||||
importlib.import_module(module_info.name)
|
||||
except ModelCheckerException as e:
|
||||
logger.warning(
|
||||
f"Bad annotation found when importing {module_info.name}"
|
||||
"Bad annotation found when importing %s", module_info.name
|
||||
)
|
||||
failures.add(format_model_checker_exception(e))
|
||||
|
||||
|
||||
@@ -37,7 +37,9 @@ from synapse.appservice import ApplicationService
|
||||
from synapse.http import get_request_user_agent
|
||||
from synapse.http.site import SynapseRequest
|
||||
from synapse.logging.opentracing import trace
|
||||
from synapse.state import CREATE_KEY, POWER_KEY
|
||||
from synapse.types import Requester, create_requester
|
||||
from synapse.types.state import StateFilter
|
||||
from synapse.util.cancellation import cancellable
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -216,18 +218,20 @@ class BaseAuth:
|
||||
# by checking if they would (theoretically) be able to change the
|
||||
# m.room.canonical_alias events
|
||||
|
||||
power_level_event = (
|
||||
await self._storage_controllers.state.get_current_state_event(
|
||||
room_id, EventTypes.PowerLevels, ""
|
||||
)
|
||||
auth_events = await self._storage_controllers.state.get_current_state(
|
||||
room_id,
|
||||
StateFilter.from_types(
|
||||
[
|
||||
POWER_KEY,
|
||||
CREATE_KEY,
|
||||
]
|
||||
),
|
||||
)
|
||||
|
||||
auth_events = {}
|
||||
if power_level_event:
|
||||
auth_events[(EventTypes.PowerLevels, "")] = power_level_event
|
||||
|
||||
send_level = event_auth.get_send_level(
|
||||
EventTypes.CanonicalAlias, "", power_level_event
|
||||
EventTypes.CanonicalAlias,
|
||||
"",
|
||||
auth_events.get(POWER_KEY),
|
||||
)
|
||||
user_level = event_auth.get_user_power_level(
|
||||
requester.user.to_string(), auth_events
|
||||
|
||||
@@ -30,9 +30,6 @@ from authlib.oauth2.rfc7662 import IntrospectionToken
|
||||
from authlib.oidc.discovery import OpenIDProviderMetadata, get_well_known_url
|
||||
from prometheus_client import Histogram
|
||||
|
||||
from twisted.web.client import readBody
|
||||
from twisted.web.http_headers import Headers
|
||||
|
||||
from synapse.api.auth.base import BaseAuth
|
||||
from synapse.api.errors import (
|
||||
AuthError,
|
||||
@@ -43,8 +40,14 @@ from synapse.api.errors import (
|
||||
UnrecognizedRequestError,
|
||||
)
|
||||
from synapse.http.site import SynapseRequest
|
||||
from synapse.logging.context import make_deferred_yieldable
|
||||
from synapse.logging.opentracing import active_span, force_tracing, start_active_span
|
||||
from synapse.logging.context import PreserveLoggingContext
|
||||
from synapse.logging.opentracing import (
|
||||
active_span,
|
||||
force_tracing,
|
||||
inject_request_headers,
|
||||
start_active_span,
|
||||
)
|
||||
from synapse.synapse_rust.http_client import HttpClient
|
||||
from synapse.types import Requester, UserID, create_requester
|
||||
from synapse.util import json_decoder
|
||||
from synapse.util.caches.cached_call import RetryOnExceptionCachedCall
|
||||
@@ -179,6 +182,10 @@ class MSC3861DelegatedAuth(BaseAuth):
|
||||
self._admin_token: Callable[[], Optional[str]] = self._config.admin_token
|
||||
self._force_tracing_for_users = hs.config.tracing.force_tracing_for_users
|
||||
|
||||
self._rust_http_client = HttpClient(
|
||||
user_agent=self._http_client.user_agent.decode("utf8")
|
||||
)
|
||||
|
||||
# # Token Introspection Cache
|
||||
# This remembers what users/devices are represented by which access tokens,
|
||||
# in order to reduce overall system load:
|
||||
@@ -301,7 +308,6 @@ class MSC3861DelegatedAuth(BaseAuth):
|
||||
introspection_endpoint = await self._introspection_endpoint()
|
||||
raw_headers: Dict[str, str] = {
|
||||
"Content-Type": "application/x-www-form-urlencoded",
|
||||
"User-Agent": str(self._http_client.user_agent, "utf-8"),
|
||||
"Accept": "application/json",
|
||||
# Tell MAS that we support reading the device ID as an explicit
|
||||
# value, not encoded in the scope. This is supported by MAS 0.15+
|
||||
@@ -315,38 +321,34 @@ class MSC3861DelegatedAuth(BaseAuth):
|
||||
uri, raw_headers, body = self._client_auth.prepare(
|
||||
method="POST", uri=introspection_endpoint, headers=raw_headers, body=body
|
||||
)
|
||||
headers = Headers({k: [v] for (k, v) in raw_headers.items()})
|
||||
|
||||
# Do the actual request
|
||||
# We're not using the SimpleHttpClient util methods as we don't want to
|
||||
# check the HTTP status code, and we do the body encoding ourselves.
|
||||
|
||||
logger.debug("Fetching token from MAS")
|
||||
start_time = self._clock.time()
|
||||
try:
|
||||
response = await self._http_client.request(
|
||||
method="POST",
|
||||
uri=uri,
|
||||
data=body.encode("utf-8"),
|
||||
headers=headers,
|
||||
)
|
||||
|
||||
resp_body = await make_deferred_yieldable(readBody(response))
|
||||
with start_active_span("mas-introspect-token"):
|
||||
inject_request_headers(raw_headers)
|
||||
with PreserveLoggingContext():
|
||||
resp_body = await self._rust_http_client.post(
|
||||
url=uri,
|
||||
response_limit=1 * 1024 * 1024,
|
||||
headers=raw_headers,
|
||||
request_body=body,
|
||||
)
|
||||
except HttpResponseException as e:
|
||||
end_time = self._clock.time()
|
||||
introspection_response_timer.labels(e.code).observe(end_time - start_time)
|
||||
raise
|
||||
except Exception:
|
||||
end_time = self._clock.time()
|
||||
introspection_response_timer.labels("ERR").observe(end_time - start_time)
|
||||
raise
|
||||
|
||||
end_time = self._clock.time()
|
||||
introspection_response_timer.labels(response.code).observe(
|
||||
end_time - start_time
|
||||
)
|
||||
logger.debug("Fetched token from MAS")
|
||||
|
||||
if response.code < 200 or response.code >= 300:
|
||||
raise HttpResponseException(
|
||||
response.code,
|
||||
response.phrase.decode("ascii", errors="replace"),
|
||||
resp_body,
|
||||
)
|
||||
end_time = self._clock.time()
|
||||
introspection_response_timer.labels(200).observe(end_time - start_time)
|
||||
|
||||
resp = json_decoder.decode(resp_body.decode("utf-8"))
|
||||
|
||||
@@ -475,7 +477,7 @@ class MSC3861DelegatedAuth(BaseAuth):
|
||||
# XXX: This is a temporary solution so that the admin API can be called by
|
||||
# the OIDC provider. This will be removed once we have OIDC client
|
||||
# credentials grant support in matrix-authentication-service.
|
||||
logging.info("Admin toked used")
|
||||
logger.info("Admin toked used")
|
||||
# XXX: that user doesn't exist and won't be provisioned.
|
||||
# This is mostly fine for admin calls, but we should also think about doing
|
||||
# requesters without a user_id.
|
||||
|
||||
@@ -445,8 +445,8 @@ def listen_http(
|
||||
# getHost() returns a UNIXAddress which contains an instance variable of 'name'
|
||||
# encoded as a byte string. Decode as utf-8 so pretty.
|
||||
logger.info(
|
||||
"Synapse now listening on Unix Socket at: "
|
||||
f"{ports[0].getHost().name.decode('utf-8')}"
|
||||
"Synapse now listening on Unix Socket at: %s",
|
||||
ports[0].getHost().name.decode("utf-8"),
|
||||
)
|
||||
|
||||
return ports
|
||||
|
||||
@@ -28,15 +28,13 @@ from prometheus_client import Gauge
|
||||
|
||||
from synapse.metrics.background_process_metrics import wrap_as_background_process
|
||||
from synapse.types import JsonDict
|
||||
from synapse.util.constants import ONE_HOUR_SECONDS, ONE_MINUTE_SECONDS
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
logger = logging.getLogger("synapse.app.homeserver")
|
||||
|
||||
ONE_MINUTE_SECONDS = 60
|
||||
ONE_HOUR_SECONDS = 60 * ONE_MINUTE_SECONDS
|
||||
|
||||
MILLISECONDS_PER_SECOND = 1000
|
||||
|
||||
INITIAL_DELAY_BEFORE_FIRST_PHONE_HOME_SECONDS = 5 * ONE_MINUTE_SECONDS
|
||||
@@ -173,7 +171,7 @@ async def phone_stats_home(
|
||||
stats["log_level"] = logging.getLevelName(log_level)
|
||||
|
||||
logger.info(
|
||||
"Reporting stats to %s: %s" % (hs.config.metrics.report_stats_endpoint, stats)
|
||||
"Reporting stats to %s: %s", hs.config.metrics.report_stats_endpoint, stats
|
||||
)
|
||||
try:
|
||||
await hs.get_proxied_http_client().put_json(
|
||||
|
||||
@@ -461,7 +461,7 @@ class _TransactionController:
|
||||
recoverer = self.recoverers.get(service.id)
|
||||
if not recoverer:
|
||||
# No need to force a retry on a happy AS.
|
||||
logger.info(f"{service.id} is not in recovery, not forcing retry")
|
||||
logger.info("%s is not in recovery, not forcing retry", service.id)
|
||||
return
|
||||
|
||||
recoverer.force_retry()
|
||||
|
||||
+12
-10
@@ -51,6 +51,8 @@ if TYPE_CHECKING:
|
||||
from synapse.config.homeserver import HomeServerConfig
|
||||
from synapse.server import HomeServer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
DEFAULT_LOG_CONFIG = Template(
|
||||
"""\
|
||||
# Log configuration for Synapse.
|
||||
@@ -291,7 +293,7 @@ def _load_logging_config(log_config_path: str) -> None:
|
||||
log_config = yaml.safe_load(f.read())
|
||||
|
||||
if not log_config:
|
||||
logging.warning("Loaded a blank logging config?")
|
||||
logger.warning("Loaded a blank logging config?")
|
||||
|
||||
# If the old structured logging configuration is being used, raise an error.
|
||||
if "structured" in log_config and log_config.get("structured"):
|
||||
@@ -312,7 +314,7 @@ def _reload_logging_config(log_config_path: Optional[str]) -> None:
|
||||
return
|
||||
|
||||
_load_logging_config(log_config_path)
|
||||
logging.info("Reloaded log config from %s due to SIGHUP", log_config_path)
|
||||
logger.info("Reloaded log config from %s due to SIGHUP", log_config_path)
|
||||
|
||||
|
||||
def setup_logging(
|
||||
@@ -349,17 +351,17 @@ def setup_logging(
|
||||
appbase.register_sighup(_reload_logging_config, log_config_path)
|
||||
|
||||
# Log immediately so we can grep backwards.
|
||||
logging.warning("***** STARTING SERVER *****")
|
||||
logging.warning(
|
||||
logger.warning("***** STARTING SERVER *****")
|
||||
logger.warning(
|
||||
"Server %s version %s",
|
||||
sys.argv[0],
|
||||
SYNAPSE_VERSION,
|
||||
)
|
||||
logging.warning("Copyright (c) 2023 New Vector, Inc")
|
||||
logging.warning(
|
||||
logger.warning("Copyright (c) 2023 New Vector, Inc")
|
||||
logger.warning(
|
||||
"Licensed under the AGPL 3.0 license. Website: https://github.com/element-hq/synapse"
|
||||
)
|
||||
logging.info("Server hostname: %s", config.server.server_name)
|
||||
logging.info("Public Base URL: %s", config.server.public_baseurl)
|
||||
logging.info("Instance name: %s", hs.get_instance_name())
|
||||
logging.info("Twisted reactor: %s", type(reactor).__name__)
|
||||
logger.info("Server hostname: %s", config.server.server_name)
|
||||
logger.info("Public Base URL: %s", config.server.public_baseurl)
|
||||
logger.info("Instance name: %s", hs.get_instance_name())
|
||||
logger.info("Twisted reactor: %s", type(reactor).__name__)
|
||||
|
||||
@@ -240,3 +240,9 @@ class RatelimitConfig(Config):
|
||||
"rc_delayed_event_mgmt",
|
||||
defaults={"per_second": 1, "burst_count": 5},
|
||||
)
|
||||
|
||||
self.rc_reports = RatelimitSettings.parse(
|
||||
config,
|
||||
"rc_reports",
|
||||
defaults={"per_second": 1, "burst_count": 5},
|
||||
)
|
||||
|
||||
@@ -27,7 +27,7 @@ from synapse.types import JsonDict
|
||||
|
||||
from ._base import Config, ConfigError
|
||||
|
||||
logger = logging.Logger(__name__)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RoomDefaultEncryptionTypes:
|
||||
|
||||
@@ -41,7 +41,7 @@ from synapse.util.stringutils import parse_and_validate_server_name
|
||||
from ._base import Config, ConfigError
|
||||
from ._util import validate_config
|
||||
|
||||
logger = logging.Logger(__name__)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
DIRECT_TCP_ERROR = """
|
||||
Using direct TCP replication for workers is no longer supported.
|
||||
|
||||
+20
-13
@@ -64,6 +64,7 @@ from synapse.api.room_versions import (
|
||||
RoomVersion,
|
||||
RoomVersions,
|
||||
)
|
||||
from synapse.state import CREATE_KEY
|
||||
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
|
||||
from synapse.types import (
|
||||
MutableStateMap,
|
||||
@@ -308,6 +309,13 @@ def check_state_dependent_auth_rules(
|
||||
|
||||
auth_dict = {(e.type, e.state_key): e for e in auth_events}
|
||||
|
||||
# Later code relies on there being a create event e.g _can_federate, _is_membership_change_allowed
|
||||
# so produce a more intelligible error if we don't have one.
|
||||
if auth_dict.get(CREATE_KEY) is None:
|
||||
raise AuthError(
|
||||
403, f"Event {event.event_id} is missing a create event in auth_events."
|
||||
)
|
||||
|
||||
# additional check for m.federate
|
||||
creating_domain = get_domain_from_id(event.room_id)
|
||||
originating_domain = get_domain_from_id(event.sender)
|
||||
@@ -1010,11 +1018,16 @@ def get_user_power_level(user_id: str, auth_events: StateMap["EventBase"]) -> in
|
||||
user_id: user's id to look up in power_levels
|
||||
auth_events:
|
||||
state in force at this point in the room (or rather, a subset of
|
||||
it including at least the create event and power levels event.
|
||||
it including at least the create event, and possibly a power levels event).
|
||||
|
||||
Returns:
|
||||
the user's power level in this room.
|
||||
"""
|
||||
create_event = auth_events.get(CREATE_KEY)
|
||||
assert create_event is not None, (
|
||||
"A create event in the auth events chain is required to calculate user power level correctly,"
|
||||
" but was not found. This indicates a bug"
|
||||
)
|
||||
power_level_event = get_power_level_event(auth_events)
|
||||
if power_level_event:
|
||||
level = power_level_event.content.get("users", {}).get(user_id)
|
||||
@@ -1028,18 +1041,12 @@ def get_user_power_level(user_id: str, auth_events: StateMap["EventBase"]) -> in
|
||||
else:
|
||||
# if there is no power levels event, the creator gets 100 and everyone
|
||||
# else gets 0.
|
||||
|
||||
# some things which call this don't pass the create event: hack around
|
||||
# that.
|
||||
key = (EventTypes.Create, "")
|
||||
create_event = auth_events.get(key)
|
||||
if create_event is not None:
|
||||
if create_event.room_version.implicit_room_creator:
|
||||
creator = create_event.sender
|
||||
else:
|
||||
creator = create_event.content[EventContentFields.ROOM_CREATOR]
|
||||
if creator == user_id:
|
||||
return 100
|
||||
if create_event.room_version.implicit_room_creator:
|
||||
creator = create_event.sender
|
||||
else:
|
||||
creator = create_event.content[EventContentFields.ROOM_CREATOR]
|
||||
if creator == user_id:
|
||||
return 100
|
||||
return 0
|
||||
|
||||
|
||||
|
||||
@@ -195,15 +195,18 @@ class InviteAutoAccepter:
|
||||
except SynapseError as e:
|
||||
if e.code == HTTPStatus.FORBIDDEN:
|
||||
logger.debug(
|
||||
f"Update_room_membership was forbidden. This can sometimes be expected for remote invites. Exception: {e}"
|
||||
"Update_room_membership was forbidden. This can sometimes be expected for remote invites. Exception: %s",
|
||||
e,
|
||||
)
|
||||
else:
|
||||
logger.warn(
|
||||
f"Update_room_membership raised the following unexpected (SynapseError) exception: {e}"
|
||||
logger.warning(
|
||||
"Update_room_membership raised the following unexpected (SynapseError) exception: %s",
|
||||
e,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warn(
|
||||
f"Update_room_membership raised the following unexpected exception: {e}"
|
||||
logger.warning(
|
||||
"Update_room_membership raised the following unexpected exception: %s",
|
||||
e,
|
||||
)
|
||||
|
||||
sleep = 2**retries
|
||||
|
||||
@@ -1818,7 +1818,7 @@ class FederationClient(FederationBase):
|
||||
)
|
||||
return timestamp_to_event_response
|
||||
except SynapseError as e:
|
||||
logger.warn(
|
||||
logger.warning(
|
||||
"timestamp_to_event(room_id=%s, timestamp=%s, direction=%s): encountered error when trying to fetch from destinations: %s",
|
||||
room_id,
|
||||
timestamp,
|
||||
|
||||
@@ -928,7 +928,8 @@ class FederationServer(FederationBase):
|
||||
# joins) or the full state (for full joins).
|
||||
# Return a 404 as we would if we weren't in the room at all.
|
||||
logger.info(
|
||||
f"Rejecting /send_{membership_type} to %s because it's a partial state room",
|
||||
"Rejecting /send_%s to %s because it's a partial state room",
|
||||
membership_type,
|
||||
room_id,
|
||||
)
|
||||
raise SynapseError(
|
||||
|
||||
@@ -495,7 +495,7 @@ class AdminHandler:
|
||||
)
|
||||
except Exception as ex:
|
||||
logger.info(
|
||||
f"Redaction of event {event.event_id} failed due to: {ex}"
|
||||
"Redaction of event %s failed due to: %s", event.event_id, ex
|
||||
)
|
||||
result["failed_redactions"][event.event_id] = str(ex)
|
||||
await self._task_scheduler.update_task(task.id, result=result)
|
||||
|
||||
@@ -465,9 +465,7 @@ class ApplicationServicesHandler:
|
||||
service, "read_receipt"
|
||||
)
|
||||
if new_token is not None and new_token.stream <= from_key:
|
||||
logger.debug(
|
||||
"Rejecting token lower than or equal to stored: %s" % (new_token,)
|
||||
)
|
||||
logger.debug("Rejecting token lower than or equal to stored: %s", new_token)
|
||||
return []
|
||||
|
||||
from_token = MultiWriterStreamToken(stream=from_key)
|
||||
@@ -509,9 +507,7 @@ class ApplicationServicesHandler:
|
||||
service, "presence"
|
||||
)
|
||||
if new_token is not None and new_token <= from_key:
|
||||
logger.debug(
|
||||
"Rejecting token lower than or equal to stored: %s" % (new_token,)
|
||||
)
|
||||
logger.debug("Rejecting token lower than or equal to stored: %s", new_token)
|
||||
return []
|
||||
|
||||
for user in users:
|
||||
|
||||
@@ -1895,7 +1895,7 @@ def load_single_legacy_password_auth_provider(
|
||||
try:
|
||||
provider = module(config=config, account_handler=api)
|
||||
except Exception as e:
|
||||
logger.error("Error while initializing %r: %s", module, e)
|
||||
logger.exception("Error while initializing %r: %s", module, e)
|
||||
raise
|
||||
|
||||
# All methods that the module provides should be async, but this wasn't enforced
|
||||
@@ -2428,7 +2428,7 @@ class PasswordAuthProvider:
|
||||
except CancelledError:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error("Module raised an exception in is_3pid_allowed: %s", e)
|
||||
logger.exception("Module raised an exception in is_3pid_allowed: %s", e)
|
||||
raise SynapseError(code=500, msg="Internal Server Error")
|
||||
|
||||
return True
|
||||
|
||||
@@ -96,6 +96,14 @@ class DeactivateAccountHandler:
|
||||
403, "Deactivation of this user is forbidden", Codes.FORBIDDEN
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"%s requested deactivation of %s erase_data=%s id_server=%s",
|
||||
requester.user,
|
||||
user_id,
|
||||
erase_data,
|
||||
id_server,
|
||||
)
|
||||
|
||||
# FIXME: Theoretically there is a race here wherein user resets
|
||||
# password using threepid.
|
||||
|
||||
|
||||
@@ -1600,7 +1600,7 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
|
||||
if prev_stream_id is not None and cached_devices == {
|
||||
d["device_id"]: d for d in devices
|
||||
}:
|
||||
logging.info(
|
||||
logger.info(
|
||||
"Skipping device list resync for %s, as our cache matches already",
|
||||
user_id,
|
||||
)
|
||||
|
||||
@@ -282,7 +282,7 @@ class DirectoryHandler:
|
||||
except RequestSendFailed:
|
||||
raise SynapseError(502, "Failed to fetch alias")
|
||||
except CodeMessageException as e:
|
||||
logging.warning(
|
||||
logger.warning(
|
||||
"Error retrieving alias %s -> %s %s", room_alias, e.code, e.msg
|
||||
)
|
||||
if e.code == 404:
|
||||
|
||||
@@ -1095,7 +1095,9 @@ class FederationHandler:
|
||||
rule = invite_config.get_invite_rule(event.sender)
|
||||
if rule == InviteRule.BLOCK:
|
||||
logger.info(
|
||||
f"Automatically rejecting invite from {event.sender} due to the invite filtering rules of {event.state_key}"
|
||||
"Automatically rejecting invite from %s due to the invite filtering rules of %s",
|
||||
event.sender,
|
||||
event.state_key,
|
||||
)
|
||||
raise SynapseError(
|
||||
403,
|
||||
|
||||
@@ -218,7 +218,7 @@ class IdentityHandler:
|
||||
|
||||
return data
|
||||
except HttpResponseException as e:
|
||||
logger.error("3PID bind failed with Matrix error: %r", e)
|
||||
logger.exception("3PID bind failed with Matrix error: %r", e)
|
||||
raise e.to_synapse_error()
|
||||
except RequestTimedOutError:
|
||||
raise SynapseError(500, "Timed out contacting identity server")
|
||||
@@ -323,7 +323,7 @@ class IdentityHandler:
|
||||
# The remote server probably doesn't support unbinding (yet)
|
||||
logger.warning("Received %d response while unbinding threepid", e.code)
|
||||
else:
|
||||
logger.error("Failed to unbind threepid on identity server: %s", e)
|
||||
logger.exception("Failed to unbind threepid on identity server: %s", e)
|
||||
raise SynapseError(500, "Failed to contact identity server")
|
||||
except RequestTimedOutError:
|
||||
raise SynapseError(500, "Timed out contacting identity server")
|
||||
|
||||
@@ -460,7 +460,7 @@ class MessageHandler:
|
||||
# date from the database in the same database transaction.
|
||||
await self.store.expire_event(event_id)
|
||||
except Exception as e:
|
||||
logger.error("Could not expire event %s: %r", event_id, e)
|
||||
logger.exception("Could not expire event %s: %r", event_id, e)
|
||||
|
||||
# Schedule the expiry of the next event to expire.
|
||||
await self._schedule_next_expiry()
|
||||
@@ -2061,7 +2061,8 @@ class EventCreationHandler:
|
||||
# dependent on _DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY
|
||||
logger.info(
|
||||
"Failed to send dummy event into room %s. Will exclude it from "
|
||||
"future attempts until cache expires" % (room_id,)
|
||||
"future attempts until cache expires",
|
||||
room_id,
|
||||
)
|
||||
now = self.clock.time_msec()
|
||||
self._rooms_to_exclude_from_dummy_event_insertion[room_id] = now
|
||||
@@ -2120,7 +2121,9 @@ class EventCreationHandler:
|
||||
except AuthError:
|
||||
logger.info(
|
||||
"Failed to send dummy event into room %s for user %s due to "
|
||||
"lack of power. Will try another user" % (room_id, user_id)
|
||||
"lack of power. Will try another user",
|
||||
room_id,
|
||||
user_id,
|
||||
)
|
||||
return False
|
||||
|
||||
|
||||
@@ -563,12 +563,13 @@ class OidcProvider:
|
||||
raise ValueError("Unexpected subject")
|
||||
except Exception:
|
||||
logger.warning(
|
||||
f"OIDC Back-Channel Logout is enabled for issuer {self.issuer!r} "
|
||||
"OIDC Back-Channel Logout is enabled for issuer %r "
|
||||
"but it looks like the configured `user_mapping_provider` "
|
||||
"does not use the `sub` claim as subject. If it is the case, "
|
||||
"and you want Synapse to ignore the `sub` claim in OIDC "
|
||||
"Back-Channel Logouts, set `backchannel_logout_ignore_sub` "
|
||||
"to `true` in the issuer config."
|
||||
"to `true` in the issuer config.",
|
||||
self.issuer,
|
||||
)
|
||||
|
||||
@property
|
||||
@@ -826,10 +827,10 @@ class OidcProvider:
|
||||
if response.code < 400:
|
||||
logger.debug(
|
||||
"Invalid response from the authorization server: "
|
||||
'responded with a "{status}" '
|
||||
"but body has an error field: {error!r}".format(
|
||||
status=status, error=resp["error"]
|
||||
)
|
||||
'responded with a "%s" '
|
||||
"but body has an error field: %r",
|
||||
status,
|
||||
resp["error"],
|
||||
)
|
||||
|
||||
description = resp.get("error_description", error)
|
||||
@@ -1385,7 +1386,8 @@ class OidcProvider:
|
||||
# support dynamic registration in Synapse at some point.
|
||||
if not self._config.backchannel_logout_enabled:
|
||||
logger.warning(
|
||||
f"Received an OIDC Back-Channel Logout request from issuer {self.issuer!r} but it is disabled in config"
|
||||
"Received an OIDC Back-Channel Logout request from issuer %r but it is disabled in config",
|
||||
self.issuer,
|
||||
)
|
||||
|
||||
# TODO: this responds with a 400 status code, which is what the OIDC
|
||||
@@ -1797,5 +1799,5 @@ class JinjaOidcMappingProvider(OidcMappingProvider[JinjaOidcMappingConfig]):
|
||||
extras[key] = template.render(user=userinfo).strip()
|
||||
except Exception as e:
|
||||
# Log an error and skip this value (don't break login for this).
|
||||
logger.error("Failed to render OIDC extra attribute %s: %s" % (key, e))
|
||||
logger.exception("Failed to render OIDC extra attribute %s: %s", key, e)
|
||||
return extras
|
||||
|
||||
@@ -506,7 +506,7 @@ class RegistrationHandler:
|
||||
ratelimit=False,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error("Failed to join new user to %r: %r", r, e)
|
||||
logger.exception("Failed to join new user to %r: %r", r, e)
|
||||
|
||||
async def _join_rooms(self, user_id: str) -> None:
|
||||
"""
|
||||
@@ -596,7 +596,7 @@ class RegistrationHandler:
|
||||
# moving away from bare excepts is a good thing to do.
|
||||
logger.error("Failed to join new user to %r: %r", r, e)
|
||||
except Exception as e:
|
||||
logger.error("Failed to join new user to %r: %r", r, e, exc_info=True)
|
||||
logger.exception("Failed to join new user to %r: %r", r, e)
|
||||
|
||||
async def _auto_join_rooms(self, user_id: str) -> None:
|
||||
"""Automatically joins users to auto join rooms - creating the room in the first place
|
||||
|
||||
@@ -0,0 +1,98 @@
|
||||
#
|
||||
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
#
|
||||
# Copyright 2015, 2016 OpenMarket Ltd
|
||||
# Copyright (C) 2023 New Vector, Ltd
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# See the GNU Affero General Public License for more details:
|
||||
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
#
|
||||
#
|
||||
import logging
|
||||
from http import HTTPStatus
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from synapse.api.errors import Codes, SynapseError
|
||||
from synapse.api.ratelimiting import Ratelimiter
|
||||
from synapse.types import (
|
||||
Requester,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ReportsHandler:
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self._hs = hs
|
||||
self._store = hs.get_datastores().main
|
||||
self._clock = hs.get_clock()
|
||||
|
||||
# Ratelimiter for management of existing delayed events,
|
||||
# keyed by the requesting user ID.
|
||||
self._reports_ratelimiter = Ratelimiter(
|
||||
store=self._store,
|
||||
clock=self._clock,
|
||||
cfg=hs.config.ratelimiting.rc_reports,
|
||||
)
|
||||
|
||||
async def report_user(
|
||||
self, requester: Requester, target_user_id: str, reason: str
|
||||
) -> None:
|
||||
"""Files a report against a user from a user.
|
||||
|
||||
Rate and size limits are applied to the report. If the user being reported
|
||||
does not belong to this server, the report is ignored. This check is done
|
||||
after the limits to reduce DoS potential.
|
||||
|
||||
If the user being reported belongs to this server, but doesn't exist, we
|
||||
similarly ignore the report. The spec allows us to return an error if we
|
||||
want to, but we choose to hide that user's existence instead.
|
||||
|
||||
If the report is otherwise valid (for a user which exists on our server),
|
||||
we append it to the database for later processing.
|
||||
|
||||
Args:
|
||||
requester - The user filing the report.
|
||||
target_user_id - The user being reported.
|
||||
reason - The user-supplied reason the user is being reported.
|
||||
|
||||
Raises:
|
||||
SynapseError for BAD_REQUEST/BAD_JSON if the reason is too long.
|
||||
"""
|
||||
|
||||
await self._check_limits(requester)
|
||||
|
||||
if len(reason) > 1000:
|
||||
raise SynapseError(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
"Reason must be less than 1000 characters",
|
||||
Codes.BAD_JSON,
|
||||
)
|
||||
|
||||
if not self._hs.is_mine_id(target_user_id):
|
||||
return # hide that they're not ours/that we can't do anything about them
|
||||
|
||||
user = await self._store.get_user_by_id(target_user_id)
|
||||
if user is None:
|
||||
return # hide that they don't exist
|
||||
|
||||
await self._store.add_user_report(
|
||||
target_user_id=target_user_id,
|
||||
user_id=requester.user.to_string(),
|
||||
reason=reason,
|
||||
received_ts=self._clock.time_msec(),
|
||||
)
|
||||
|
||||
async def _check_limits(self, requester: Requester) -> None:
|
||||
await self._reports_ratelimiter.ratelimit(
|
||||
requester,
|
||||
requester.user.to_string(),
|
||||
)
|
||||
@@ -698,7 +698,7 @@ class RoomCreationHandler:
|
||||
except SynapseError as e:
|
||||
# again I'm not really expecting this to fail, but if it does, I'd rather
|
||||
# we returned the new room to the client at this point.
|
||||
logger.error("Unable to send updated alias events in old room: %s", e)
|
||||
logger.exception("Unable to send updated alias events in old room: %s", e)
|
||||
|
||||
try:
|
||||
await self.event_creation_handler.create_and_send_nonmember_event(
|
||||
@@ -715,7 +715,7 @@ class RoomCreationHandler:
|
||||
except SynapseError as e:
|
||||
# again I'm not really expecting this to fail, but if it does, I'd rather
|
||||
# we returned the new room to the client at this point.
|
||||
logger.error("Unable to send updated alias events in new room: %s", e)
|
||||
logger.exception("Unable to send updated alias events in new room: %s", e)
|
||||
|
||||
async def create_room(
|
||||
self,
|
||||
|
||||
@@ -922,7 +922,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
rule = invite_config.get_invite_rule(requester.user.to_string())
|
||||
if rule == InviteRule.BLOCK:
|
||||
logger.info(
|
||||
f"Automatically rejecting invite from {target_id} due to the the invite filtering rules of {requester.user}"
|
||||
"Automatically rejecting invite from %s due to the the invite filtering rules of %s",
|
||||
target_id,
|
||||
requester.user,
|
||||
)
|
||||
raise SynapseError(
|
||||
403,
|
||||
@@ -1570,7 +1572,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
require_consent=False,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.exception("Error kicking guest user: %s" % (e,))
|
||||
logger.exception("Error kicking guest user: %s", e)
|
||||
|
||||
async def lookup_room_alias(
|
||||
self, room_alias: RoomAlias
|
||||
|
||||
@@ -124,7 +124,7 @@ class SamlHandler:
|
||||
)
|
||||
|
||||
# Since SAML sessions timeout it is useful to log when they were created.
|
||||
logger.info("Initiating a new SAML session: %s" % (reqid,))
|
||||
logger.info("Initiating a new SAML session: %s", reqid)
|
||||
|
||||
now = self.clock.time_msec()
|
||||
self._outstanding_requests_dict[reqid] = Saml2SessionData(
|
||||
|
||||
@@ -238,7 +238,7 @@ class SendEmailHandler:
|
||||
multipart_msg.attach(text_part)
|
||||
multipart_msg.attach(html_part)
|
||||
|
||||
logger.info("Sending email to %s" % email_address)
|
||||
logger.info("Sending email to %s", email_address)
|
||||
|
||||
await self._sendmail(
|
||||
self._reactor,
|
||||
|
||||
@@ -23,6 +23,7 @@ from typing import (
|
||||
List,
|
||||
Literal,
|
||||
Mapping,
|
||||
MutableMapping,
|
||||
Optional,
|
||||
Set,
|
||||
Tuple,
|
||||
@@ -73,6 +74,7 @@ from synapse.types.handlers.sliding_sync import (
|
||||
SlidingSyncResult,
|
||||
)
|
||||
from synapse.types.state import StateFilter
|
||||
from synapse.util import MutableOverlayMapping
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
@@ -245,9 +247,11 @@ class SlidingSyncRoomLists:
|
||||
# Note: this won't include rooms the user has left themselves. We add back
|
||||
# `newly_left` rooms below. This is more efficient than fetching all rooms and
|
||||
# then filtering out the old left rooms.
|
||||
room_membership_for_user_map = (
|
||||
await self.store.get_sliding_sync_rooms_for_user_from_membership_snapshots(
|
||||
user_id
|
||||
room_membership_for_user_map: MutableMapping[str, RoomsForUserSlidingSync] = (
|
||||
MutableOverlayMapping(
|
||||
await self.store.get_sliding_sync_rooms_for_user_from_membership_snapshots(
|
||||
user_id
|
||||
)
|
||||
)
|
||||
)
|
||||
# To play nice with the rewind logic below, we need to go fetch the rooms the
|
||||
@@ -268,26 +272,12 @@ class SlidingSyncRoomLists:
|
||||
)
|
||||
)
|
||||
if self_leave_room_membership_for_user_map:
|
||||
# FIXME: It would be nice to avoid this copy but since
|
||||
# `get_sliding_sync_rooms_for_user_from_membership_snapshots` is cached, it
|
||||
# can't return a mutable value like a `dict`. We make the copy to get a
|
||||
# mutable dict that we can change. We try to only make a copy when necessary
|
||||
# (if we actually need to change something) as in most cases, the logic
|
||||
# doesn't need to run.
|
||||
room_membership_for_user_map = dict(room_membership_for_user_map)
|
||||
room_membership_for_user_map.update(self_leave_room_membership_for_user_map)
|
||||
|
||||
# Remove invites from ignored users
|
||||
ignored_users = await self.store.ignored_users(user_id)
|
||||
invite_config = await self.store.get_invite_config_for_user(user_id)
|
||||
if ignored_users:
|
||||
# FIXME: It would be nice to avoid this copy but since
|
||||
# `get_sliding_sync_rooms_for_user_from_membership_snapshots` is cached, it
|
||||
# can't return a mutable value like a `dict`. We make the copy to get a
|
||||
# mutable dict that we can change. We try to only make a copy when necessary
|
||||
# (if we actually need to change something) as in most cases, the logic
|
||||
# doesn't need to run.
|
||||
room_membership_for_user_map = dict(room_membership_for_user_map)
|
||||
# Make a copy so we don't run into an error: `dictionary changed size during
|
||||
# iteration`, when we remove items
|
||||
for room_id in list(room_membership_for_user_map.keys()):
|
||||
@@ -316,13 +306,6 @@ class SlidingSyncRoomLists:
|
||||
sync_config.user, room_membership_for_user_map, to_token=to_token
|
||||
)
|
||||
if changes:
|
||||
# FIXME: It would be nice to avoid this copy but since
|
||||
# `get_sliding_sync_rooms_for_user_from_membership_snapshots` is cached, it
|
||||
# can't return a mutable value like a `dict`. We make the copy to get a
|
||||
# mutable dict that we can change. We try to only make a copy when necessary
|
||||
# (if we actually need to change something) as in most cases, the logic
|
||||
# doesn't need to run.
|
||||
room_membership_for_user_map = dict(room_membership_for_user_map)
|
||||
for room_id, change in changes.items():
|
||||
if change is None:
|
||||
# Remove rooms that the user joined after the `to_token`
|
||||
@@ -364,13 +347,6 @@ class SlidingSyncRoomLists:
|
||||
newly_left_room_map.keys() - room_membership_for_user_map.keys()
|
||||
)
|
||||
if missing_newly_left_rooms:
|
||||
# FIXME: It would be nice to avoid this copy but since
|
||||
# `get_sliding_sync_rooms_for_user_from_membership_snapshots` is cached, it
|
||||
# can't return a mutable value like a `dict`. We make the copy to get a
|
||||
# mutable dict that we can change. We try to only make a copy when necessary
|
||||
# (if we actually need to change something) as in most cases, the logic
|
||||
# doesn't need to run.
|
||||
room_membership_for_user_map = dict(room_membership_for_user_map)
|
||||
for room_id in missing_newly_left_rooms:
|
||||
newly_left_room_for_user = newly_left_room_map[room_id]
|
||||
# This should be a given
|
||||
@@ -461,6 +437,10 @@ class SlidingSyncRoomLists:
|
||||
else:
|
||||
room_membership_for_user_map.pop(room_id, None)
|
||||
|
||||
# Remove any rooms that we globally exclude from sync.
|
||||
for room_id in self.rooms_to_exclude_globally:
|
||||
room_membership_for_user_map.pop(room_id, None)
|
||||
|
||||
dm_room_ids = await self._get_dm_rooms_for_user(user_id)
|
||||
|
||||
if sync_config.lists:
|
||||
@@ -577,14 +557,6 @@ class SlidingSyncRoomLists:
|
||||
|
||||
if sync_config.room_subscriptions:
|
||||
with start_active_span("assemble_room_subscriptions"):
|
||||
# FIXME: It would be nice to avoid this copy but since
|
||||
# `get_sliding_sync_rooms_for_user_from_membership_snapshots` is cached, it
|
||||
# can't return a mutable value like a `dict`. We make the copy to get a
|
||||
# mutable dict that we can change. We try to only make a copy when necessary
|
||||
# (if we actually need to change something) as in most cases, the logic
|
||||
# doesn't need to run.
|
||||
room_membership_for_user_map = dict(room_membership_for_user_map)
|
||||
|
||||
# Find which rooms are partially stated and may need to be filtered out
|
||||
# depending on the `required_state` requested (see below).
|
||||
partial_state_rooms = await self.store.get_partial_rooms()
|
||||
|
||||
@@ -1230,12 +1230,16 @@ class SsoHandler:
|
||||
if expected_user_id is not None and user_id != expected_user_id:
|
||||
logger.error(
|
||||
"Received a logout notification from SSO provider "
|
||||
f"{auth_provider_id!r} for the user {expected_user_id!r}, but with "
|
||||
f"a session ID ({auth_provider_session_id!r}) which belongs to "
|
||||
f"{user_id!r}. This may happen when the SSO provider user mapper "
|
||||
"%r for the user %r, but with "
|
||||
"a session ID (%r) which belongs to "
|
||||
"%r. This may happen when the SSO provider user mapper "
|
||||
"uses something else than the standard attribute as mapping ID. "
|
||||
"For OIDC providers, set `backchannel_logout_ignore_sub` to `true` "
|
||||
"in the provider config if that is the case."
|
||||
"in the provider config if that is the case.",
|
||||
auth_provider_id,
|
||||
expected_user_id,
|
||||
auth_provider_session_id,
|
||||
user_id,
|
||||
)
|
||||
continue
|
||||
|
||||
|
||||
@@ -3074,8 +3074,10 @@ class SyncHandler:
|
||||
if batch.limited and since_token:
|
||||
user_id = sync_result_builder.sync_config.user.to_string()
|
||||
logger.debug(
|
||||
"Incremental gappy sync of %s for user %s with %d state events"
|
||||
% (room_id, user_id, len(state))
|
||||
"Incremental gappy sync of %s for user %s with %d state events",
|
||||
room_id,
|
||||
user_id,
|
||||
len(state),
|
||||
)
|
||||
elif room_builder.rtype == "archived":
|
||||
archived_room_sync = ArchivedSyncResult(
|
||||
|
||||
@@ -749,10 +749,9 @@ class UserDirectoryHandler(StateDeltasHandler):
|
||||
)
|
||||
continue
|
||||
except Exception:
|
||||
logger.error(
|
||||
logger.exception(
|
||||
"Failed to refresh profile for %r due to unhandled exception",
|
||||
user_id,
|
||||
exc_info=True,
|
||||
)
|
||||
await self.store.set_remote_user_profile_in_user_dir_stale(
|
||||
user_id,
|
||||
|
||||
@@ -44,12 +44,15 @@ from synapse.logging.opentracing import start_active_span
|
||||
from synapse.metrics.background_process_metrics import wrap_as_background_process
|
||||
from synapse.storage.databases.main.lock import Lock, LockStore
|
||||
from synapse.util.async_helpers import timeout_deferred
|
||||
from synapse.util.constants import ONE_MINUTE_SECONDS
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.logging.opentracing import opentracing
|
||||
from synapse.server import HomeServer
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# This lock is used to avoid creating an event while we are purging the room.
|
||||
# We take a read lock when creating an event, and a write one when purging a room.
|
||||
# This is because it is fine to create several events concurrently, since referenced events
|
||||
@@ -270,9 +273,10 @@ class WaitingLock:
|
||||
def _get_next_retry_interval(self) -> float:
|
||||
next = self._retry_interval
|
||||
self._retry_interval = max(5, next * 2)
|
||||
if self._retry_interval > 5 * 2 ^ 7: # ~10 minutes
|
||||
logging.warning(
|
||||
f"Lock timeout is getting excessive: {self._retry_interval}s. There may be a deadlock."
|
||||
if self._retry_interval > 10 * ONE_MINUTE_SECONDS: # >7 iterations
|
||||
logger.warning(
|
||||
"Lock timeout is getting excessive: %ss. There may be a deadlock.",
|
||||
self._retry_interval,
|
||||
)
|
||||
return next * random.uniform(0.9, 1.1)
|
||||
|
||||
@@ -349,8 +353,9 @@ class WaitingMultiLock:
|
||||
def _get_next_retry_interval(self) -> float:
|
||||
next = self._retry_interval
|
||||
self._retry_interval = max(5, next * 2)
|
||||
if self._retry_interval > 5 * 2 ^ 7: # ~10 minutes
|
||||
logging.warning(
|
||||
f"Lock timeout is getting excessive: {self._retry_interval}s. There may be a deadlock."
|
||||
if self._retry_interval > 10 * ONE_MINUTE_SECONDS: # >7 iterations
|
||||
logger.warning(
|
||||
"Lock timeout is getting excessive: %ss. There may be a deadlock.",
|
||||
self._retry_interval,
|
||||
)
|
||||
return next * random.uniform(0.9, 1.1)
|
||||
|
||||
@@ -213,7 +213,7 @@ class _IPBlockingResolver:
|
||||
|
||||
if _is_ip_blocked(ip_address, self._ip_allowlist, self._ip_blocklist):
|
||||
logger.info(
|
||||
"Blocked %s from DNS resolution to %s" % (ip_address, hostname)
|
||||
"Blocked %s from DNS resolution to %s", ip_address, hostname
|
||||
)
|
||||
has_bad_ip = True
|
||||
|
||||
@@ -318,7 +318,7 @@ class BlocklistingAgentWrapper(Agent):
|
||||
pass
|
||||
else:
|
||||
if _is_ip_blocked(ip_address, self._ip_allowlist, self._ip_blocklist):
|
||||
logger.info("Blocking access to %s" % (ip_address,))
|
||||
logger.info("Blocking access to %s", ip_address)
|
||||
e = SynapseError(HTTPStatus.FORBIDDEN, "IP address blocked")
|
||||
return defer.fail(Failure(e))
|
||||
|
||||
@@ -723,7 +723,7 @@ class BaseHttpClient:
|
||||
resp_headers = dict(response.headers.getAllRawHeaders())
|
||||
|
||||
if response.code > 299:
|
||||
logger.warning("Got %d when downloading %s" % (response.code, url))
|
||||
logger.warning("Got %d when downloading %s", response.code, url)
|
||||
raise SynapseError(
|
||||
HTTPStatus.BAD_GATEWAY, "Got error %d" % (response.code,), Codes.UNKNOWN
|
||||
)
|
||||
@@ -1106,7 +1106,7 @@ class _MultipartParserProtocol(protocol.Protocol):
|
||||
self.stream.write(data[start:end])
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"Exception encountered writing file data to stream: {e}"
|
||||
"Exception encountered writing file data to stream: %s", e
|
||||
)
|
||||
self.deferred.errback()
|
||||
self.file_length += end - start
|
||||
@@ -1129,7 +1129,7 @@ class _MultipartParserProtocol(protocol.Protocol):
|
||||
try:
|
||||
self.parser.write(incoming_data)
|
||||
except Exception as e:
|
||||
logger.warning(f"Exception writing to multipart parser: {e}")
|
||||
logger.warning("Exception writing to multipart parser: %s", e)
|
||||
self.deferred.errback()
|
||||
return
|
||||
|
||||
|
||||
@@ -602,7 +602,7 @@ class MatrixFederationHttpClient:
|
||||
try:
|
||||
parse_and_validate_server_name(request.destination)
|
||||
except ValueError:
|
||||
logger.exception(f"Invalid destination: {request.destination}.")
|
||||
logger.exception("Invalid destination: %s.", request.destination)
|
||||
raise FederationDeniedError(request.destination)
|
||||
|
||||
if timeout is not None:
|
||||
|
||||
@@ -796,6 +796,13 @@ def inject_response_headers(response_headers: Headers) -> None:
|
||||
response_headers.addRawHeader("Synapse-Trace-Id", f"{trace_id:x}")
|
||||
|
||||
|
||||
@ensure_active_span("inject the span into a header dict")
|
||||
def inject_request_headers(headers: Dict[str, str]) -> None:
|
||||
span = opentracing.tracer.active_span
|
||||
assert span is not None
|
||||
opentracing.tracer.inject(span.context, opentracing.Format.HTTP_HEADERS, headers)
|
||||
|
||||
|
||||
@ensure_active_span(
|
||||
"get the active span context as a dict", ret=cast(Dict[str, str], {})
|
||||
)
|
||||
|
||||
@@ -313,7 +313,7 @@ class MediaRepository:
|
||||
logger.info("Stored local media in file %r", fname)
|
||||
|
||||
if should_quarantine:
|
||||
logger.warn(
|
||||
logger.warning(
|
||||
"Media has been automatically quarantined as it matched existing quarantined media"
|
||||
)
|
||||
|
||||
@@ -366,7 +366,7 @@ class MediaRepository:
|
||||
logger.info("Stored local media in file %r", fname)
|
||||
|
||||
if should_quarantine:
|
||||
logger.warn(
|
||||
logger.warning(
|
||||
"Media has been automatically quarantined as it matched existing quarantined media"
|
||||
)
|
||||
|
||||
@@ -1393,8 +1393,8 @@ class MediaRepository:
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"Purging remote media last accessed before"
|
||||
f" {remote_media_threshold_timestamp_ms}"
|
||||
"Purging remote media last accessed before %s",
|
||||
remote_media_threshold_timestamp_ms,
|
||||
)
|
||||
|
||||
await self.delete_old_remote_media(
|
||||
@@ -1409,8 +1409,8 @@ class MediaRepository:
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"Purging local media last accessed before"
|
||||
f" {local_media_threshold_timestamp_ms}"
|
||||
"Purging local media last accessed before %s",
|
||||
local_media_threshold_timestamp_ms,
|
||||
)
|
||||
|
||||
await self.delete_old_local_media(
|
||||
|
||||
@@ -287,7 +287,7 @@ class UrlPreviewer:
|
||||
og["og:image:width"] = dims["width"]
|
||||
og["og:image:height"] = dims["height"]
|
||||
else:
|
||||
logger.warning("Couldn't get dims for %s" % url)
|
||||
logger.warning("Couldn't get dims for %s", url)
|
||||
|
||||
# define our OG response for this media
|
||||
elif _is_html(media_info.media_type):
|
||||
@@ -609,7 +609,7 @@ class UrlPreviewer:
|
||||
should_quarantine = await self.store.get_is_hash_quarantined(sha256)
|
||||
|
||||
if should_quarantine:
|
||||
logger.warn(
|
||||
logger.warning(
|
||||
"Media has been automatically quarantined as it matched existing quarantined media"
|
||||
)
|
||||
|
||||
|
||||
@@ -118,7 +118,7 @@ class LaterGauge(Collector):
|
||||
|
||||
def _register(self) -> None:
|
||||
if self.name in all_gauges.keys():
|
||||
logger.warning("%s already registered, reregistering" % (self.name,))
|
||||
logger.warning("%s already registered, reregistering", self.name)
|
||||
REGISTRY.unregister(all_gauges.pop(self.name))
|
||||
|
||||
REGISTRY.register(self)
|
||||
@@ -244,7 +244,7 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
|
||||
|
||||
def _register_with_collector(self) -> None:
|
||||
if self.name in all_gauges.keys():
|
||||
logger.warning("%s already registered, reregistering" % (self.name,))
|
||||
logger.warning("%s already registered, reregistering", self.name)
|
||||
REGISTRY.unregister(all_gauges.pop(self.name))
|
||||
|
||||
REGISTRY.register(self)
|
||||
|
||||
@@ -50,7 +50,7 @@ from synapse.event_auth import auth_types_for_event, get_user_power_level
|
||||
from synapse.events import EventBase, relation_from_event
|
||||
from synapse.events.snapshot import EventContext
|
||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||
from synapse.state import POWER_KEY
|
||||
from synapse.state import CREATE_KEY, POWER_KEY
|
||||
from synapse.storage.databases.main.roommember import EventIdMembership
|
||||
from synapse.storage.invite_rule import InviteRule
|
||||
from synapse.storage.roommember import ProfileInfo
|
||||
@@ -246,6 +246,7 @@ class BulkPushRuleEvaluator:
|
||||
StateFilter.from_types(event_types)
|
||||
)
|
||||
pl_event_id = prev_state_ids.get(POWER_KEY)
|
||||
create_event_id = prev_state_ids.get(CREATE_KEY)
|
||||
|
||||
# fastpath: if there's a power level event, that's all we need, and
|
||||
# not having a power level event is an extreme edge case
|
||||
@@ -268,6 +269,26 @@ class BulkPushRuleEvaluator:
|
||||
if auth_event:
|
||||
auth_events_dict[auth_event_id] = auth_event
|
||||
auth_events = {(e.type, e.state_key): e for e in auth_events_dict.values()}
|
||||
if auth_events.get(CREATE_KEY) is None:
|
||||
# if the event being checked is the create event, use its own permissions
|
||||
if event.type == EventTypes.Create and event.get_state_key() == "":
|
||||
auth_events[CREATE_KEY] = event
|
||||
else:
|
||||
auth_events[
|
||||
CREATE_KEY
|
||||
] = await self.store.get_create_event_for_room(event.room_id)
|
||||
|
||||
# if we are evaluating the create event, then use itself to determine power levels.
|
||||
if event.type == EventTypes.Create and event.get_state_key() == "":
|
||||
auth_events[CREATE_KEY] = event
|
||||
else:
|
||||
# if we aren't processing the create event, create_event_id should always be set
|
||||
assert create_event_id is not None
|
||||
create_event = event_id_to_event.get(create_event_id)
|
||||
if create_event:
|
||||
auth_events[CREATE_KEY] = create_event
|
||||
else:
|
||||
auth_events[CREATE_KEY] = await self.store.get_event(create_event_id)
|
||||
|
||||
sender_level = get_user_power_level(event.sender, auth_events)
|
||||
|
||||
|
||||
@@ -135,7 +135,7 @@ class Mailer:
|
||||
self.app_name = app_name
|
||||
self.email_subjects: EmailSubjectConfig = hs.config.email.email_subjects
|
||||
|
||||
logger.info("Created Mailer for app_name %s" % app_name)
|
||||
logger.info("Created Mailer for app_name %s", app_name)
|
||||
|
||||
emails_sent_counter.labels("password_reset")
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
#
|
||||
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
#
|
||||
# Copyright (C) 2023-2024 New Vector, Ltd
|
||||
# Copyright (C) 2023-2025 New Vector, Ltd
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
@@ -33,6 +33,7 @@ from synapse.replication.http import (
|
||||
register,
|
||||
send_event,
|
||||
send_events,
|
||||
server_notices,
|
||||
state,
|
||||
streams,
|
||||
)
|
||||
@@ -66,3 +67,4 @@ class ReplicationRestResource(JsonResource):
|
||||
register.register_servlets(hs, self)
|
||||
devices.register_servlets(hs, self)
|
||||
delayed_events.register_servlets(hs, self)
|
||||
server_notices.register_servlets(hs, self)
|
||||
|
||||
@@ -0,0 +1,80 @@
|
||||
#
|
||||
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
#
|
||||
# Copyright (C) 2025 New Vector, Ltd
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# See the GNU Affero General Public License for more details:
|
||||
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
#
|
||||
#
|
||||
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Optional, Tuple
|
||||
|
||||
from twisted.web.server import Request
|
||||
|
||||
from synapse.http.server import HttpServer
|
||||
from synapse.replication.http._base import ReplicationEndpoint
|
||||
from synapse.types import JsonDict
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ReplicationSendServerNoticeServlet(ReplicationEndpoint):
|
||||
"""Send a server notice to a user"""
|
||||
|
||||
NAME = "send_server_notice"
|
||||
PATH_ARGS = ()
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__(hs)
|
||||
self.server_notices_manager = hs.get_server_notices_manager()
|
||||
|
||||
@staticmethod
|
||||
async def _serialize_payload( # type: ignore[override]
|
||||
user_id: str,
|
||||
event_content: dict,
|
||||
type: str,
|
||||
state_key: Optional[str] = None,
|
||||
txn_id: Optional[str] = None,
|
||||
) -> JsonDict:
|
||||
"""
|
||||
Args:
|
||||
user_id: mxid of user to send event to.
|
||||
event_content: content of event to send
|
||||
type: type of event
|
||||
state_key: the state key for the event, if it is a state event
|
||||
txn_id: the transaction ID
|
||||
"""
|
||||
return {
|
||||
"user_id": user_id,
|
||||
"event_content": event_content,
|
||||
"type": type,
|
||||
"state_key": state_key,
|
||||
"txn_id": txn_id,
|
||||
}
|
||||
|
||||
async def _handle_request( # type: ignore[override]
|
||||
self, request: Request, content: JsonDict
|
||||
) -> Tuple[int, JsonDict]:
|
||||
event = await self.server_notices_manager.send_notice(
|
||||
user_id=content["user_id"],
|
||||
event_content=content["event_content"],
|
||||
type=content["type"],
|
||||
state_key=content["state_key"],
|
||||
txn_id=content["txn_id"],
|
||||
)
|
||||
|
||||
return 200, event
|
||||
|
||||
|
||||
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
|
||||
ReplicationSendServerNoticeServlet(hs).register(http_server)
|
||||
@@ -165,7 +165,7 @@ class ClientRestResource(JsonResource):
|
||||
# Fail on unknown servlet groups.
|
||||
if servlet_group not in SERVLET_GROUPS:
|
||||
if servlet_group == "media":
|
||||
logger.warn(
|
||||
logger.warning(
|
||||
"media.can_load_media_repo needs to be configured for the media servlet to be available"
|
||||
)
|
||||
raise RuntimeError(
|
||||
|
||||
+12
-10
@@ -71,7 +71,7 @@ class QuarantineMediaInRoom(RestServlet):
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
await assert_user_is_admin(self.auth, requester)
|
||||
|
||||
logging.info("Quarantining room: %s", room_id)
|
||||
logger.info("Quarantining room: %s", room_id)
|
||||
|
||||
# Quarantine all media in this room
|
||||
num_quarantined = await self.store.quarantine_media_ids_in_room(
|
||||
@@ -98,7 +98,7 @@ class QuarantineMediaByUser(RestServlet):
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
await assert_user_is_admin(self.auth, requester)
|
||||
|
||||
logging.info("Quarantining media by user: %s", user_id)
|
||||
logger.info("Quarantining media by user: %s", user_id)
|
||||
|
||||
# Quarantine all media this user has uploaded
|
||||
num_quarantined = await self.store.quarantine_media_ids_by_user(
|
||||
@@ -127,7 +127,7 @@ class QuarantineMediaByID(RestServlet):
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
await assert_user_is_admin(self.auth, requester)
|
||||
|
||||
logging.info("Quarantining media by ID: %s/%s", server_name, media_id)
|
||||
logger.info("Quarantining media by ID: %s/%s", server_name, media_id)
|
||||
|
||||
# Quarantine this media id
|
||||
await self.store.quarantine_media_by_id(
|
||||
@@ -155,7 +155,7 @@ class UnquarantineMediaByID(RestServlet):
|
||||
) -> Tuple[int, JsonDict]:
|
||||
await assert_requester_is_admin(self.auth, request)
|
||||
|
||||
logging.info("Remove from quarantine media by ID: %s/%s", server_name, media_id)
|
||||
logger.info("Remove from quarantine media by ID: %s/%s", server_name, media_id)
|
||||
|
||||
# Remove from quarantine this media id
|
||||
await self.store.quarantine_media_by_id(server_name, media_id, None)
|
||||
@@ -177,7 +177,7 @@ class ProtectMediaByID(RestServlet):
|
||||
) -> Tuple[int, JsonDict]:
|
||||
await assert_requester_is_admin(self.auth, request)
|
||||
|
||||
logging.info("Protecting local media by ID: %s", media_id)
|
||||
logger.info("Protecting local media by ID: %s", media_id)
|
||||
|
||||
# Protect this media id
|
||||
await self.store.mark_local_media_as_safe(media_id, safe=True)
|
||||
@@ -199,7 +199,7 @@ class UnprotectMediaByID(RestServlet):
|
||||
) -> Tuple[int, JsonDict]:
|
||||
await assert_requester_is_admin(self.auth, request)
|
||||
|
||||
logging.info("Unprotecting local media by ID: %s", media_id)
|
||||
logger.info("Unprotecting local media by ID: %s", media_id)
|
||||
|
||||
# Unprotect this media id
|
||||
await self.store.mark_local_media_as_safe(media_id, safe=False)
|
||||
@@ -280,7 +280,7 @@ class DeleteMediaByID(RestServlet):
|
||||
if await self.store.get_local_media(media_id) is None:
|
||||
raise NotFoundError("Unknown media")
|
||||
|
||||
logging.info("Deleting local media by ID: %s", media_id)
|
||||
logger.info("Deleting local media by ID: %s", media_id)
|
||||
|
||||
deleted_media, total = await self.media_repository.delete_local_media_ids(
|
||||
[media_id]
|
||||
@@ -327,9 +327,11 @@ class DeleteMediaByDateSize(RestServlet):
|
||||
if server_name is not None and self.server_name != server_name:
|
||||
raise SynapseError(HTTPStatus.BAD_REQUEST, "Can only delete local media")
|
||||
|
||||
logging.info(
|
||||
"Deleting local media by timestamp: %s, size larger than: %s, keep profile media: %s"
|
||||
% (before_ts, size_gt, keep_profiles)
|
||||
logger.info(
|
||||
"Deleting local media by timestamp: %s, size larger than: %s, keep profile media: %s",
|
||||
before_ts,
|
||||
size_gt,
|
||||
keep_profiles,
|
||||
)
|
||||
|
||||
deleted_media, total = await self.media_repository.delete_old_local_media(
|
||||
|
||||
@@ -88,9 +88,6 @@ class SendServerNoticeServlet(RestServlet):
|
||||
event_type = body.get("type", EventTypes.Message)
|
||||
state_key = body.get("state_key")
|
||||
|
||||
# We grab the server notices manager here as its initialisation has a check for worker processes,
|
||||
# but worker processes still need to initialise SendServerNoticeServlet (as it is part of the
|
||||
# admin api).
|
||||
if not self.server_notices_manager.is_enabled():
|
||||
raise SynapseError(
|
||||
HTTPStatus.BAD_REQUEST, "Server notices are not enabled on this server"
|
||||
@@ -113,7 +110,7 @@ class SendServerNoticeServlet(RestServlet):
|
||||
txn_id=txn_id,
|
||||
)
|
||||
|
||||
return HTTPStatus.OK, {"event_id": event.event_id}
|
||||
return HTTPStatus.OK, {"event_id": event["event_id"]}
|
||||
|
||||
async def on_POST(
|
||||
self,
|
||||
|
||||
@@ -150,6 +150,44 @@ class ReportRoomRestServlet(RestServlet):
|
||||
return 200, {}
|
||||
|
||||
|
||||
class ReportUserRestServlet(RestServlet):
|
||||
"""This endpoint lets clients report a user for abuse.
|
||||
|
||||
Introduced by MSC4260: https://github.com/matrix-org/matrix-spec-proposals/pull/4260
|
||||
"""
|
||||
|
||||
PATTERNS = list(
|
||||
client_patterns(
|
||||
"/users/(?P<target_user_id>[^/]*)/report$",
|
||||
releases=("v3",),
|
||||
unstable=False,
|
||||
v1=False,
|
||||
)
|
||||
)
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__()
|
||||
self.hs = hs
|
||||
self.auth = hs.get_auth()
|
||||
self.clock = hs.get_clock()
|
||||
self.store = hs.get_datastores().main
|
||||
self.handler = hs.get_reports_handler()
|
||||
|
||||
class PostBody(RequestBodyModel):
|
||||
reason: StrictStr
|
||||
|
||||
async def on_POST(
|
||||
self, request: SynapseRequest, target_user_id: str
|
||||
) -> Tuple[int, JsonDict]:
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
body = parse_and_validate_json_object_from_request(request, self.PostBody)
|
||||
|
||||
await self.handler.report_user(requester, target_user_id, body.reason)
|
||||
|
||||
return 200, {}
|
||||
|
||||
|
||||
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
|
||||
ReportEventRestServlet(hs).register(http_server)
|
||||
ReportRoomRestServlet(hs).register(http_server)
|
||||
ReportUserRestServlet(hs).register(http_server)
|
||||
|
||||
@@ -64,6 +64,7 @@ from synapse.logging.opentracing import set_tag
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.rest.client._base import client_patterns
|
||||
from synapse.rest.client.transactions import HttpTransactionCache
|
||||
from synapse.state import CREATE_KEY, POWER_KEY
|
||||
from synapse.streams.config import PaginationConfig
|
||||
from synapse.types import JsonDict, Requester, StreamToken, ThirdPartyInstanceID, UserID
|
||||
from synapse.types.state import StateFilter
|
||||
@@ -924,16 +925,16 @@ class RoomEventServlet(RestServlet):
|
||||
if include_unredacted_content and not await self.auth.is_server_admin(
|
||||
requester
|
||||
):
|
||||
power_level_event = (
|
||||
await self._storage_controllers.state.get_current_state_event(
|
||||
room_id, EventTypes.PowerLevels, ""
|
||||
)
|
||||
auth_events = await self._storage_controllers.state.get_current_state(
|
||||
room_id,
|
||||
StateFilter.from_types(
|
||||
[
|
||||
POWER_KEY,
|
||||
CREATE_KEY,
|
||||
]
|
||||
),
|
||||
)
|
||||
|
||||
auth_events = {}
|
||||
if power_level_event:
|
||||
auth_events[(EventTypes.PowerLevels, "")] = power_level_event
|
||||
|
||||
redact_level = event_auth.get_named_level(auth_events, "redact", 50)
|
||||
user_level = event_auth.get_user_power_level(
|
||||
requester.user.to_string(), auth_events
|
||||
|
||||
+10
-2
@@ -94,6 +94,7 @@ from synapse.handlers.read_marker import ReadMarkerHandler
|
||||
from synapse.handlers.receipts import ReceiptsHandler
|
||||
from synapse.handlers.register import RegistrationHandler
|
||||
from synapse.handlers.relations import RelationsHandler
|
||||
from synapse.handlers.reports import ReportsHandler
|
||||
from synapse.handlers.room import (
|
||||
RoomContextHandler,
|
||||
RoomCreationHandler,
|
||||
@@ -141,6 +142,9 @@ from synapse.replication.tcp.streams import STREAMS_MAP, Stream
|
||||
from synapse.rest.media.media_repository_resource import MediaRepositoryResource
|
||||
from synapse.server_notices.server_notices_manager import ServerNoticesManager
|
||||
from synapse.server_notices.server_notices_sender import ServerNoticesSender
|
||||
from synapse.server_notices.worker_server_notices_manager import (
|
||||
WorkerServerNoticesManager,
|
||||
)
|
||||
from synapse.server_notices.worker_server_notices_sender import (
|
||||
WorkerServerNoticesSender,
|
||||
)
|
||||
@@ -718,6 +722,10 @@ class HomeServer(metaclass=abc.ABCMeta):
|
||||
def get_receipts_handler(self) -> ReceiptsHandler:
|
||||
return ReceiptsHandler(self)
|
||||
|
||||
@cache_in_self
|
||||
def get_reports_handler(self) -> ReportsHandler:
|
||||
return ReportsHandler(self)
|
||||
|
||||
@cache_in_self
|
||||
def get_read_marker_handler(self) -> ReadMarkerHandler:
|
||||
return ReadMarkerHandler(self)
|
||||
@@ -753,9 +761,9 @@ class HomeServer(metaclass=abc.ABCMeta):
|
||||
return FederationHandlerRegistry(self)
|
||||
|
||||
@cache_in_self
|
||||
def get_server_notices_manager(self) -> ServerNoticesManager:
|
||||
def get_server_notices_manager(self) -> WorkerServerNoticesManager:
|
||||
if self.config.worker.worker_app:
|
||||
raise Exception("Workers cannot send server notices")
|
||||
return WorkerServerNoticesManager(self)
|
||||
return ServerNoticesManager(self)
|
||||
|
||||
@cache_in_self
|
||||
|
||||
@@ -165,7 +165,7 @@ class ResourceLimitsServerNotices:
|
||||
user_id, content, EventTypes.Message
|
||||
)
|
||||
|
||||
content = {"pinned": [event.event_id]}
|
||||
content = {"pinned": [event["event_id"]]}
|
||||
await self._server_notices_manager.send_notice(
|
||||
user_id, content, EventTypes.Pinned, ""
|
||||
)
|
||||
|
||||
@@ -21,7 +21,9 @@ import logging
|
||||
from typing import TYPE_CHECKING, Optional
|
||||
|
||||
from synapse.api.constants import EventTypes, Membership, RoomCreationPreset
|
||||
from synapse.events import EventBase
|
||||
from synapse.server_notices.worker_server_notices_manager import (
|
||||
WorkerServerNoticesManager,
|
||||
)
|
||||
from synapse.types import JsonDict, Requester, StreamKeyType, UserID, create_requester
|
||||
from synapse.util.caches.descriptors import cached
|
||||
|
||||
@@ -33,9 +35,9 @@ logger = logging.getLogger(__name__)
|
||||
SERVER_NOTICE_ROOM_TAG = "m.server_notice"
|
||||
|
||||
|
||||
class ServerNoticesManager:
|
||||
class ServerNoticesManager(WorkerServerNoticesManager):
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self._store = hs.get_datastores().main
|
||||
super().__init__(hs)
|
||||
self._config = hs.config
|
||||
self._account_data_handler = hs.get_account_data_handler()
|
||||
self._room_creation_handler = hs.get_room_creation_handler()
|
||||
@@ -47,11 +49,6 @@ class ServerNoticesManager:
|
||||
self._server_name = hs.hostname
|
||||
|
||||
self._notifier = hs.get_notifier()
|
||||
self.server_notices_mxid = self._config.servernotices.server_notices_mxid
|
||||
|
||||
def is_enabled(self) -> bool:
|
||||
"""Checks if server notices are enabled on this server."""
|
||||
return self.server_notices_mxid is not None
|
||||
|
||||
async def send_notice(
|
||||
self,
|
||||
@@ -60,7 +57,7 @@ class ServerNoticesManager:
|
||||
type: str = EventTypes.Message,
|
||||
state_key: Optional[str] = None,
|
||||
txn_id: Optional[str] = None,
|
||||
) -> EventBase:
|
||||
) -> JsonDict:
|
||||
"""Send a notice to the given user
|
||||
|
||||
Creates the server notices room, if none exists.
|
||||
@@ -69,15 +66,16 @@ class ServerNoticesManager:
|
||||
user_id: mxid of user to send event to.
|
||||
event_content: content of event to send
|
||||
type: type of event
|
||||
is_state_event: Is the event a state event
|
||||
state_key: the state key for the event, if it is a state event
|
||||
txn_id: The transaction ID.
|
||||
"""
|
||||
room_id = await self.get_or_create_notice_room_for_user(user_id)
|
||||
await self.maybe_invite_user_to_room(user_id, room_id)
|
||||
room_id = await self._get_or_create_notice_room_for_user(user_id)
|
||||
await self._maybe_invite_user_to_room(user_id, room_id)
|
||||
|
||||
assert self.server_notices_mxid is not None
|
||||
assert self._server_notices_mxid is not None
|
||||
requester = create_requester(
|
||||
self.server_notices_mxid, authenticated_entity=self._server_name
|
||||
self._server_notices_mxid,
|
||||
authenticated_entity=self._server_name,
|
||||
)
|
||||
|
||||
logger.info("Sending server notice to %s", user_id)
|
||||
@@ -85,7 +83,7 @@ class ServerNoticesManager:
|
||||
event_dict = {
|
||||
"type": type,
|
||||
"room_id": room_id,
|
||||
"sender": self.server_notices_mxid,
|
||||
"sender": self._server_notices_mxid,
|
||||
"content": event_content,
|
||||
}
|
||||
|
||||
@@ -95,45 +93,10 @@ class ServerNoticesManager:
|
||||
event, _ = await self._event_creation_handler.create_and_send_nonmember_event(
|
||||
requester, event_dict, ratelimit=False, txn_id=txn_id
|
||||
)
|
||||
return event
|
||||
return event.get_dict()
|
||||
|
||||
@cached()
|
||||
async def maybe_get_notice_room_for_user(self, user_id: str) -> Optional[str]:
|
||||
"""Try to look up the server notice room for this user if it exists.
|
||||
|
||||
Does not create one if none can be found.
|
||||
|
||||
Args:
|
||||
user_id: the user we want a server notice room for.
|
||||
|
||||
Returns:
|
||||
The room's ID, or None if no room could be found.
|
||||
"""
|
||||
# If there is no server notices MXID, then there is no server notices room
|
||||
if self.server_notices_mxid is None:
|
||||
return None
|
||||
|
||||
rooms = await self._store.get_rooms_for_local_user_where_membership_is(
|
||||
user_id, [Membership.INVITE, Membership.JOIN]
|
||||
)
|
||||
for room in rooms:
|
||||
# it's worth noting that there is an asymmetry here in that we
|
||||
# expect the user to be invited or joined, but the system user must
|
||||
# be joined. This is kinda deliberate, in that if somebody somehow
|
||||
# manages to invite the system user to a room, that doesn't make it
|
||||
# the server notices room.
|
||||
is_server_notices_room = await self._store.check_local_user_in_room(
|
||||
user_id=self.server_notices_mxid, room_id=room.room_id
|
||||
)
|
||||
if is_server_notices_room:
|
||||
# we found a room which our user shares with the system notice
|
||||
# user
|
||||
return room.room_id
|
||||
|
||||
return None
|
||||
|
||||
@cached()
|
||||
async def get_or_create_notice_room_for_user(self, user_id: str) -> str:
|
||||
async def _get_or_create_notice_room_for_user(self, user_id: str) -> str:
|
||||
"""Get the room for notices for a given user
|
||||
|
||||
If we have not yet created a notice room for this user, create it, but don't
|
||||
@@ -145,13 +108,13 @@ class ServerNoticesManager:
|
||||
Returns:
|
||||
room id of notice room.
|
||||
"""
|
||||
if self.server_notices_mxid is None:
|
||||
if self._server_notices_mxid is None:
|
||||
raise Exception("Server notices not enabled")
|
||||
|
||||
assert self._is_mine_id(user_id), "Cannot send server notices to remote users"
|
||||
|
||||
requester = create_requester(
|
||||
self.server_notices_mxid, authenticated_entity=self._server_name
|
||||
self._server_notices_mxid, authenticated_entity=self._server_name
|
||||
)
|
||||
|
||||
room_id = await self.maybe_get_notice_room_for_user(user_id)
|
||||
@@ -246,7 +209,7 @@ class ServerNoticesManager:
|
||||
logger.info("Created server notices room %s for %s", room_id, user_id)
|
||||
return room_id
|
||||
|
||||
async def maybe_invite_user_to_room(self, user_id: str, room_id: str) -> None:
|
||||
async def _maybe_invite_user_to_room(self, user_id: str, room_id: str) -> None:
|
||||
"""Invite the given user to the given server room, unless the user has already
|
||||
joined or been invited to it.
|
||||
|
||||
@@ -254,9 +217,9 @@ class ServerNoticesManager:
|
||||
user_id: The ID of the user to invite.
|
||||
room_id: The ID of the room to invite the user to.
|
||||
"""
|
||||
assert self.server_notices_mxid is not None
|
||||
assert self._server_notices_mxid is not None
|
||||
requester = create_requester(
|
||||
self.server_notices_mxid, authenticated_entity=self._server_name
|
||||
self._server_notices_mxid, authenticated_entity=self._server_name
|
||||
)
|
||||
|
||||
# Check whether the user has already joined or been invited to this room. If
|
||||
@@ -307,13 +270,13 @@ class ServerNoticesManager:
|
||||
"""
|
||||
logger.debug("Checking whether notice user profile has changed for %s", room_id)
|
||||
|
||||
assert self.server_notices_mxid is not None
|
||||
assert self._server_notices_mxid is not None
|
||||
|
||||
notice_user_data_in_room = (
|
||||
await self._storage_controllers.state.get_current_state_event(
|
||||
room_id,
|
||||
EventTypes.Member,
|
||||
self.server_notices_mxid,
|
||||
self._server_notices_mxid,
|
||||
)
|
||||
)
|
||||
|
||||
@@ -327,7 +290,7 @@ class ServerNoticesManager:
|
||||
logger.info("Updating notice user profile in room %s", room_id)
|
||||
await self._room_member_handler.update_membership(
|
||||
requester=requester,
|
||||
target=UserID.from_string(self.server_notices_mxid),
|
||||
target=UserID.from_string(self._server_notices_mxid),
|
||||
room_id=room_id,
|
||||
action="join",
|
||||
ratelimit=False,
|
||||
|
||||
@@ -0,0 +1,101 @@
|
||||
#
|
||||
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
#
|
||||
# Copyright (C) 2025 New Vector, Ltd
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# See the GNU Affero General Public License for more details:
|
||||
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
#
|
||||
#
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Optional
|
||||
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.events import JsonDict
|
||||
from synapse.replication.http.server_notices import (
|
||||
ReplicationSendServerNoticeServlet,
|
||||
)
|
||||
from synapse.util.caches.descriptors import cached
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class WorkerServerNoticesManager:
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self._store = hs.get_datastores().main
|
||||
self._server_notices_mxid = hs.config.servernotices.server_notices_mxid
|
||||
self._send_server_notice = ReplicationSendServerNoticeServlet.make_client(hs)
|
||||
|
||||
def is_enabled(self) -> bool:
|
||||
"""Checks if server notices are enabled on this server."""
|
||||
return self._server_notices_mxid is not None
|
||||
|
||||
async def send_notice(
|
||||
self,
|
||||
user_id: str,
|
||||
event_content: dict,
|
||||
type: str = EventTypes.Message,
|
||||
state_key: Optional[str] = None,
|
||||
txn_id: Optional[str] = None,
|
||||
) -> JsonDict:
|
||||
"""Send a notice to the given user
|
||||
|
||||
Creates the server notices room, if none exists.
|
||||
|
||||
Args:
|
||||
user_id: mxid of user to send event to.
|
||||
event_content: content of event to send
|
||||
type: type of event
|
||||
is_state_event: Is the event a state event
|
||||
txn_id: The transaction ID.
|
||||
"""
|
||||
return await self._send_server_notice(
|
||||
user_id=user_id,
|
||||
event_content=event_content,
|
||||
type=type,
|
||||
state_key=state_key,
|
||||
txn_id=txn_id,
|
||||
)
|
||||
|
||||
@cached()
|
||||
async def maybe_get_notice_room_for_user(self, user_id: str) -> Optional[str]:
|
||||
"""Try to look up the server notice room for this user if it exists.
|
||||
|
||||
Does not create one if none can be found.
|
||||
|
||||
Args:
|
||||
user_id: the user we want a server notice room for.
|
||||
|
||||
Returns:
|
||||
The room's ID, or None if no room could be found.
|
||||
"""
|
||||
# If there is no server notices MXID, then there is no server notices room
|
||||
if self._server_notices_mxid is None:
|
||||
return None
|
||||
|
||||
rooms = await self._store.get_rooms_for_local_user_where_membership_is(
|
||||
user_id, [Membership.INVITE, Membership.JOIN]
|
||||
)
|
||||
for room in rooms:
|
||||
# it's worth noting that there is an asymmetry here in that we
|
||||
# expect the user to be invited or joined, but the system user must
|
||||
# be joined. This is kinda deliberate, in that if somebody somehow
|
||||
# manages to invite the system user to a room, that doesn't make it
|
||||
# the server notices room.
|
||||
is_server_notices_room = await self._store.check_local_user_in_room(
|
||||
user_id=self._server_notices_mxid, room_id=room.room_id
|
||||
)
|
||||
if is_server_notices_room:
|
||||
# we found a room which our user shares with the system notice
|
||||
# user
|
||||
return room.room_id
|
||||
|
||||
return None
|
||||
@@ -83,6 +83,7 @@ EVICTION_TIMEOUT_SECONDS = 60 * 60
|
||||
|
||||
_NEXT_STATE_ID = 1
|
||||
|
||||
CREATE_KEY = (EventTypes.Create, "")
|
||||
POWER_KEY = (EventTypes.PowerLevels, "")
|
||||
|
||||
|
||||
|
||||
+13
-1
@@ -254,7 +254,19 @@ async def _get_power_level_for_sender(
|
||||
room_id, aid, event_map, state_res_store, allow_none=True
|
||||
)
|
||||
if aev and (aev.type, aev.state_key) == (EventTypes.Create, ""):
|
||||
if aev.content.get("creator") == event.sender:
|
||||
creator = (
|
||||
aev.sender
|
||||
if event.room_version.implicit_room_creator
|
||||
else aev.content.get("creator")
|
||||
)
|
||||
if not creator:
|
||||
logger.warning(
|
||||
"_get_power_level_for_sender: event %s has no PL in auth_events and "
|
||||
"creator is missing from create event %s",
|
||||
event_id,
|
||||
aev.event_id,
|
||||
)
|
||||
if creator == event.sender:
|
||||
return 100
|
||||
break
|
||||
return 0
|
||||
|
||||
@@ -240,5 +240,5 @@ def db_to_json(db_content: Union[memoryview, bytes, bytearray, str]) -> Any:
|
||||
try:
|
||||
return json_decoder.decode(db_content)
|
||||
except Exception:
|
||||
logging.warning("Tried to decode '%r' as JSON and failed", db_content)
|
||||
logger.warning("Tried to decode '%r' as JSON and failed", db_content)
|
||||
raise
|
||||
|
||||
@@ -42,6 +42,7 @@ from synapse.logging.opentracing import (
|
||||
start_active_span,
|
||||
trace,
|
||||
)
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.replication.tcp.streams import ToDeviceStream
|
||||
from synapse.storage._base import SQLBaseStore, db_to_json
|
||||
from synapse.storage.database import (
|
||||
@@ -52,7 +53,7 @@ from synapse.storage.database import (
|
||||
)
|
||||
from synapse.storage.util.id_generators import MultiWriterIdGenerator
|
||||
from synapse.types import JsonDict
|
||||
from synapse.util import json_encoder
|
||||
from synapse.util import Duration, json_encoder
|
||||
from synapse.util.caches.expiringcache import ExpiringCache
|
||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||
from synapse.util.stringutils import parse_and_validate_server_name
|
||||
@@ -63,6 +64,18 @@ if TYPE_CHECKING:
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# How long to keep messages in the device federation inbox before deleting them.
|
||||
DEVICE_FEDERATION_INBOX_CLEANUP_DELAY_MS = 7 * Duration.DAY_MS
|
||||
|
||||
# How often to run the task to clean up old device_federation_inbox rows.
|
||||
DEVICE_FEDERATION_INBOX_CLEANUP_INTERVAL_MS = 5 * Duration.MINUTE_MS
|
||||
|
||||
# Update name for the device federation inbox received timestamp index.
|
||||
DEVICE_FEDERATION_INBOX_RECEIVED_INDEX_UPDATE = (
|
||||
"device_federation_inbox_received_ts_index"
|
||||
)
|
||||
|
||||
|
||||
class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
def __init__(
|
||||
self,
|
||||
@@ -134,6 +147,14 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
prefilled_cache=device_outbox_prefill,
|
||||
)
|
||||
|
||||
if hs.config.worker.run_background_tasks:
|
||||
self._clock.looping_call(
|
||||
run_as_background_process,
|
||||
DEVICE_FEDERATION_INBOX_CLEANUP_INTERVAL_MS,
|
||||
"_delete_old_federation_inbox_rows",
|
||||
self._delete_old_federation_inbox_rows,
|
||||
)
|
||||
|
||||
def process_replication_rows(
|
||||
self,
|
||||
stream_name: str,
|
||||
@@ -960,6 +981,52 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
],
|
||||
)
|
||||
|
||||
async def _delete_old_federation_inbox_rows(self, batch_size: int = 1000) -> None:
|
||||
"""Delete old rows from the device_federation_inbox table."""
|
||||
|
||||
# We wait until we have the index on `received_ts`, otherwise the query
|
||||
# will take a very long time.
|
||||
if not await self.db_pool.updates.has_completed_background_update(
|
||||
DEVICE_FEDERATION_INBOX_RECEIVED_INDEX_UPDATE
|
||||
):
|
||||
return
|
||||
|
||||
def _delete_old_federation_inbox_rows_txn(txn: LoggingTransaction) -> bool:
|
||||
# We delete at most 100 rows that are older than
|
||||
# DEVICE_FEDERATION_INBOX_CLEANUP_DELAY_MS
|
||||
delete_before_ts = (
|
||||
self._clock.time_msec() - DEVICE_FEDERATION_INBOX_CLEANUP_DELAY_MS
|
||||
)
|
||||
sql = """
|
||||
WITH to_delete AS (
|
||||
SELECT origin, message_id
|
||||
FROM device_federation_inbox
|
||||
WHERE received_ts < ?
|
||||
ORDER BY received_ts ASC
|
||||
LIMIT ?
|
||||
)
|
||||
DELETE FROM device_federation_inbox
|
||||
WHERE
|
||||
(origin, message_id) IN (
|
||||
SELECT origin, message_id FROM to_delete
|
||||
)
|
||||
"""
|
||||
txn.execute(sql, (delete_before_ts, batch_size))
|
||||
return txn.rowcount < batch_size
|
||||
|
||||
while True:
|
||||
finished = await self.db_pool.runInteraction(
|
||||
"_delete_old_federation_inbox_rows",
|
||||
_delete_old_federation_inbox_rows_txn,
|
||||
db_autocommit=True, # We don't need to run in a transaction
|
||||
)
|
||||
if finished:
|
||||
return
|
||||
|
||||
# We sleep a bit so that we don't hammer the database in a tight
|
||||
# loop first time we run this.
|
||||
self._clock.sleep(1)
|
||||
|
||||
|
||||
class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
|
||||
DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
|
||||
@@ -995,6 +1062,13 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
|
||||
self._cleanup_device_federation_outbox,
|
||||
)
|
||||
|
||||
self.db_pool.updates.register_background_index_update(
|
||||
update_name=DEVICE_FEDERATION_INBOX_RECEIVED_INDEX_UPDATE,
|
||||
index_name="device_federation_inbox_received_ts_index",
|
||||
table="device_federation_inbox",
|
||||
columns=["received_ts"],
|
||||
)
|
||||
|
||||
async def _background_drop_index_device_inbox(
|
||||
self, progress: JsonDict, batch_size: int
|
||||
) -> int:
|
||||
|
||||
@@ -331,7 +331,7 @@ class MonthlyActiveUsersWorkerStore(RegistrationWorkerStore):
|
||||
values={"timestamp": int(self._clock.time_msec())},
|
||||
)
|
||||
else:
|
||||
logger.warning("mau limit reserved threepid %s not found in db" % tp)
|
||||
logger.warning("mau limit reserved threepid %s not found in db", tp)
|
||||
|
||||
async def upsert_monthly_active_user(self, user_id: str) -> None:
|
||||
"""Updates or inserts the user into the monthly active user table, which
|
||||
|
||||
@@ -2421,6 +2421,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
|
||||
|
||||
self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")
|
||||
self._room_reports_id_gen = IdGenerator(db_conn, "room_reports", "id")
|
||||
self._user_reports_id_gen = IdGenerator(db_conn, "user_reports", "id")
|
||||
|
||||
self._instance_name = hs.get_instance_name()
|
||||
|
||||
@@ -2662,6 +2663,37 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
|
||||
)
|
||||
return next_id
|
||||
|
||||
async def add_user_report(
|
||||
self,
|
||||
target_user_id: str,
|
||||
user_id: str,
|
||||
reason: str,
|
||||
received_ts: int,
|
||||
) -> int:
|
||||
"""Add a user report
|
||||
|
||||
Args:
|
||||
target_user_id: The user ID being reported.
|
||||
user_id: User who reported the user.
|
||||
reason: Description that the user specifies.
|
||||
received_ts: Time when the user submitted the report (milliseconds).
|
||||
Returns:
|
||||
ID of the room report.
|
||||
"""
|
||||
next_id = self._user_reports_id_gen.get_next()
|
||||
await self.db_pool.simple_insert(
|
||||
table="user_reports",
|
||||
values={
|
||||
"id": next_id,
|
||||
"received_ts": received_ts,
|
||||
"target_user_id": target_user_id,
|
||||
"user_id": user_id,
|
||||
"reason": reason,
|
||||
},
|
||||
desc="add_user_report",
|
||||
)
|
||||
return next_id
|
||||
|
||||
async def clear_partial_state_room(self, room_id: str) -> Optional[int]:
|
||||
"""Clears the partial state flag for a room.
|
||||
|
||||
|
||||
@@ -253,8 +253,9 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
|
||||
return 1
|
||||
|
||||
logger.debug(
|
||||
"Processing the next %d rooms of %d remaining"
|
||||
% (len(rooms_to_work_on), progress["remaining"])
|
||||
"Processing the next %d rooms of %d remaining",
|
||||
len(rooms_to_work_on),
|
||||
progress["remaining"],
|
||||
)
|
||||
|
||||
processed_event_count = 0
|
||||
|
||||
@@ -50,7 +50,9 @@ class InviteRulesConfig:
|
||||
except Exception as e:
|
||||
# If for whatever reason we can't process this, just ignore it.
|
||||
logger.debug(
|
||||
f"Could not process '{value}' field of invite rule config, ignoring: {e}"
|
||||
"Could not process '%s' field of invite rule config, ignoring: %s",
|
||||
value,
|
||||
e,
|
||||
)
|
||||
|
||||
if account_data:
|
||||
|
||||
@@ -63,8 +63,11 @@ def run_upgrade(
|
||||
if user_id in owned.keys():
|
||||
logger.error(
|
||||
"user_id %s was owned by more than one application"
|
||||
" service (IDs %s and %s); assigning arbitrarily to %s"
|
||||
% (user_id, owned[user_id], appservice.id, owned[user_id])
|
||||
" service (IDs %s and %s); assigning arbitrarily to %s",
|
||||
user_id,
|
||||
owned[user_id],
|
||||
appservice.id,
|
||||
owned[user_id],
|
||||
)
|
||||
owned.setdefault(appservice.id, []).append(user_id)
|
||||
|
||||
|
||||
@@ -0,0 +1,16 @@
|
||||
--
|
||||
-- This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
--
|
||||
-- Copyright (C) 2025 New Vector, Ltd
|
||||
--
|
||||
-- This program is free software: you can redistribute it and/or modify
|
||||
-- it under the terms of the GNU Affero General Public License as
|
||||
-- published by the Free Software Foundation, either version 3 of the
|
||||
-- License, or (at your option) any later version.
|
||||
--
|
||||
-- See the GNU Affero General Public License for more details:
|
||||
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
|
||||
-- Background update that adds an index to `device_federation_inbox.received_ts`
|
||||
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
|
||||
(9206, 'device_federation_inbox_received_ts_index', '{}');
|
||||
@@ -0,0 +1,22 @@
|
||||
--
|
||||
-- This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
--
|
||||
-- Copyright (C) 2025 New Vector, Ltd
|
||||
--
|
||||
-- This program is free software: you can redistribute it and/or modify
|
||||
-- it under the terms of the GNU Affero General Public License as
|
||||
-- published by the Free Software Foundation, either version 3 of the
|
||||
-- License, or (at your option) any later version.
|
||||
--
|
||||
-- See the GNU Affero General Public License for more details:
|
||||
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
|
||||
CREATE TABLE user_reports (
|
||||
id BIGINT NOT NULL PRIMARY KEY,
|
||||
received_ts BIGINT NOT NULL,
|
||||
target_user_id TEXT NOT NULL,
|
||||
user_id TEXT NOT NULL,
|
||||
reason TEXT NOT NULL
|
||||
);
|
||||
CREATE INDEX user_reports_target_user_id ON user_reports(target_user_id); -- for lookups
|
||||
CREATE INDEX user_reports_user_id ON user_reports(user_id); -- for lookups
|
||||
@@ -0,0 +1,24 @@
|
||||
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
#
|
||||
# Copyright (C) 2025 New Vector, Ltd
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# See the GNU Affero General Public License for more details:
|
||||
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
|
||||
from typing import Awaitable, Mapping
|
||||
|
||||
class HttpClient:
|
||||
def __init__(self, user_agent: str) -> None: ...
|
||||
def get(self, url: str, response_limit: int) -> Awaitable[bytes]: ...
|
||||
def post(
|
||||
self,
|
||||
url: str,
|
||||
response_limit: int,
|
||||
headers: Mapping[str, str],
|
||||
request_body: str,
|
||||
) -> Awaitable[bytes]: ...
|
||||
@@ -19,10 +19,22 @@
|
||||
#
|
||||
#
|
||||
|
||||
import collections.abc
|
||||
import json
|
||||
import logging
|
||||
import typing
|
||||
from typing import Any, Callable, Dict, Generator, Optional, Sequence
|
||||
from typing import (
|
||||
Any,
|
||||
Callable,
|
||||
Dict,
|
||||
Generator,
|
||||
Iterator,
|
||||
Mapping,
|
||||
Optional,
|
||||
Sequence,
|
||||
Set,
|
||||
TypeVar,
|
||||
)
|
||||
|
||||
import attr
|
||||
from immutabledict import immutabledict
|
||||
@@ -43,6 +55,15 @@ if typing.TYPE_CHECKING:
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Duration:
|
||||
"""Helper class that holds constants for common time durations in
|
||||
milliseconds."""
|
||||
|
||||
MINUTE_MS = 60 * 1000
|
||||
HOUR_MS = 60 * MINUTE_MS
|
||||
DAY_MS = 24 * HOUR_MS
|
||||
|
||||
|
||||
def _reject_invalid_json(val: Any) -> None:
|
||||
"""Do not allow Infinity, -Infinity, or NaN values in JSON."""
|
||||
raise ValueError("Invalid JSON value: '%s'" % val)
|
||||
@@ -251,3 +272,72 @@ class ExceptionBundle(Exception):
|
||||
parts.append(str(e))
|
||||
super().__init__("\n - ".join(parts))
|
||||
self.exceptions = exceptions
|
||||
|
||||
|
||||
K = TypeVar("K")
|
||||
V = TypeVar("V")
|
||||
|
||||
|
||||
@attr.s(slots=True, auto_attribs=True)
|
||||
class MutableOverlayMapping(collections.abc.MutableMapping[K, V]):
|
||||
"""A mutable mapping that allows changes to a read-only underlying
|
||||
mapping. Supports deletions.
|
||||
|
||||
This is useful for cases where you want to allow modifications to a mapping
|
||||
without changing or copying the original mapping.
|
||||
|
||||
Note: the underlying mapping must not change while this proxy is in use.
|
||||
"""
|
||||
|
||||
_underlying_map: Mapping[K, V]
|
||||
_mutable_map: Dict[K, V] = attr.ib(factory=dict)
|
||||
_deletions: Set[K] = attr.ib(factory=set)
|
||||
|
||||
def __getitem__(self, key: K) -> V:
|
||||
if key in self._deletions:
|
||||
raise KeyError(key)
|
||||
if key in self._mutable_map:
|
||||
return self._mutable_map[key]
|
||||
return self._underlying_map[key]
|
||||
|
||||
def __setitem__(self, key: K, value: V) -> None:
|
||||
self._deletions.discard(key)
|
||||
self._mutable_map[key] = value
|
||||
|
||||
def __delitem__(self, key: K) -> None:
|
||||
if key not in self:
|
||||
raise KeyError(key)
|
||||
|
||||
self._deletions.add(key)
|
||||
self._mutable_map.pop(key, None)
|
||||
|
||||
def __iter__(self) -> Iterator[K]:
|
||||
for key in self._mutable_map:
|
||||
if key not in self._deletions:
|
||||
yield key
|
||||
|
||||
for key in self._underlying_map:
|
||||
if key not in self._deletions and key not in self._mutable_map:
|
||||
# `key` should not be in both _mutable_map and _deletions
|
||||
assert key not in self._mutable_map
|
||||
yield key
|
||||
|
||||
def __len__(self) -> int:
|
||||
count = len(self._underlying_map)
|
||||
for key in self._deletions:
|
||||
if key in self._underlying_map:
|
||||
count -= 1
|
||||
|
||||
for key in self._mutable_map:
|
||||
# `key` should not be in both _mutable_map and _deletions
|
||||
assert key not in self._deletions
|
||||
|
||||
if key not in self._underlying_map:
|
||||
count += 1
|
||||
|
||||
return count
|
||||
|
||||
def clear(self) -> None:
|
||||
self._underlying_map = {}
|
||||
self._mutable_map.clear()
|
||||
self._deletions.clear()
|
||||
|
||||
@@ -37,6 +37,8 @@ DISTRIBUTION_NAME = "matrix-synapse"
|
||||
|
||||
__all__ = ["check_requirements"]
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DependencyException(Exception):
|
||||
@property
|
||||
@@ -211,6 +213,6 @@ def check_requirements(extra: Optional[str] = None) -> None:
|
||||
|
||||
if deps_unfulfilled:
|
||||
for err in errors:
|
||||
logging.error(err)
|
||||
logger.error(err)
|
||||
|
||||
raise DependencyException(deps_unfulfilled)
|
||||
|
||||
@@ -0,0 +1,20 @@
|
||||
#
|
||||
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
#
|
||||
# Copyright (C) 2025 New Vector, Ltd
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# See the GNU Affero General Public License for more details:
|
||||
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
#
|
||||
|
||||
# Time-based constants.
|
||||
#
|
||||
# Laying these out incrementally, even if only some are required, helps with
|
||||
# readability and catching bugs.
|
||||
ONE_MINUTE_SECONDS = 60
|
||||
ONE_HOUR_SECONDS = 60 * ONE_MINUTE_SECONDS
|
||||
@@ -133,7 +133,7 @@ def daemonize_process(pid_file: str, logger: logging.Logger, chdir: str = "/") -
|
||||
|
||||
# write a log line on SIGTERM.
|
||||
def sigterm(signum: int, frame: Optional[FrameType]) -> NoReturn:
|
||||
logger.warning("Caught signal %s. Stopping daemon." % signum)
|
||||
logger.warning("Caught signal %s. Stopping daemon.", signum)
|
||||
sys.exit(0)
|
||||
|
||||
signal.signal(signal.SIGTERM, sigterm)
|
||||
|
||||
@@ -440,7 +440,8 @@ class TaskScheduler:
|
||||
except Exception:
|
||||
f = Failure()
|
||||
logger.error(
|
||||
f"scheduled task {task.id} failed",
|
||||
"scheduled task %s failed",
|
||||
task.id,
|
||||
exc_info=(f.type, f.value, f.getTracebackObject()),
|
||||
)
|
||||
status = TaskStatus.FAILED
|
||||
@@ -473,8 +474,10 @@ class TaskScheduler:
|
||||
self._clock.time_msec()
|
||||
> task.timestamp + TaskScheduler.LAST_UPDATE_BEFORE_WARNING_MS
|
||||
):
|
||||
logger.warn(
|
||||
f"Task {task.id} (action {task.action}) has seen no update for more than 24h and may be stuck"
|
||||
logger.warning(
|
||||
"Task %s (action %s) has seen no update for more than 24h and may be stuck",
|
||||
task.id,
|
||||
task.action,
|
||||
)
|
||||
|
||||
if task.id in self._running_tasks:
|
||||
|
||||
@@ -45,6 +45,8 @@ from synapse.util import Clock
|
||||
from tests import unittest
|
||||
from tests.unittest import override_config
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class FederationServerTests(unittest.FederatingHomeserverTestCase):
|
||||
servlets = [
|
||||
@@ -252,7 +254,7 @@ class MessageAcceptTests(unittest.FederatingHomeserverTestCase):
|
||||
class ServerACLsTestCase(unittest.TestCase):
|
||||
def test_blocked_server(self) -> None:
|
||||
e = _create_acl_event({"allow": ["*"], "deny": ["evil.com"]})
|
||||
logging.info("ACL event: %s", e.content)
|
||||
logger.info("ACL event: %s", e.content)
|
||||
|
||||
server_acl_evalutor = server_acl_evaluator_from_event(e)
|
||||
|
||||
@@ -266,7 +268,7 @@ class ServerACLsTestCase(unittest.TestCase):
|
||||
|
||||
def test_block_ip_literals(self) -> None:
|
||||
e = _create_acl_event({"allow_ip_literals": False, "allow": ["*"]})
|
||||
logging.info("ACL event: %s", e.content)
|
||||
logger.info("ACL event: %s", e.content)
|
||||
|
||||
server_acl_evalutor = server_acl_evaluator_from_event(e)
|
||||
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user