1
0

Compare commits

..

4 Commits

Author SHA1 Message Date
Erik Johnston
f5817281f8 Fixup2 2022-12-15 14:05:59 +00:00
Erik Johnston
87406aa5d3 Fixup 2022-12-15 13:20:20 +00:00
Erik Johnston
6842974391 Fixup 2022-12-15 13:15:47 +00:00
Erik Johnston
c93ef61fa3 WIP Rust HTTP for federation 2022-12-14 11:02:16 +00:00
77 changed files with 2545 additions and 991 deletions

View File

@@ -1,74 +1,3 @@
Synapse 1.74.0rc1 (2022-12-13)
==============================
Features
--------
- Improve user search for international display names. ([\#14464](https://github.com/matrix-org/synapse/issues/14464))
- Stop using deprecated `keyIds` parameter when calling `/_matrix/key/v2/server`. ([\#14490](https://github.com/matrix-org/synapse/issues/14490), [\#14525](https://github.com/matrix-org/synapse/issues/14525))
- Add new `push.enabled` config option to allow opting out of push notification calculation. ([\#14551](https://github.com/matrix-org/synapse/issues/14551), [\#14619](https://github.com/matrix-org/synapse/issues/14619))
- Advertise support for Matrix 1.5 on `/_matrix/client/versions`. ([\#14576](https://github.com/matrix-org/synapse/issues/14576))
- Improve opentracing and logging for to-device message handling. ([\#14598](https://github.com/matrix-org/synapse/issues/14598))
- Allow selecting "prejoin" events by state keys in addition to event types. ([\#14642](https://github.com/matrix-org/synapse/issues/14642))
Bugfixes
--------
- Fix a long-standing bug where a device list update might not be sent to clients in certain circumstances. ([\#14435](https://github.com/matrix-org/synapse/issues/14435), [\#14592](https://github.com/matrix-org/synapse/issues/14592), [\#14604](https://github.com/matrix-org/synapse/issues/14604))
- Suppress a spurious warning when `POST /rooms/<room_id>/<membership>/`, `POST /join/<room_id_or_alias`, or the unspecced `PUT /join/<room_id_or_alias>/<txn_id>` receive an empty HTTP request body. ([\#14600](https://github.com/matrix-org/synapse/issues/14600))
- Return spec-compliant JSON errors when unknown endpoints are requested. ([\#14620](https://github.com/matrix-org/synapse/issues/14620), [\#14621](https://github.com/matrix-org/synapse/issues/14621))
- Update html templates to load images over HTTPS. Contributed by @ashfame. ([\#14625](https://github.com/matrix-org/synapse/issues/14625))
- Fix a long-standing bug where the user directory would return 1 more row than requested. ([\#14631](https://github.com/matrix-org/synapse/issues/14631))
- Reject invalid read receipt requests with empty room or event IDs. Contributed by Nick @ Beeper (@fizzadar). ([\#14632](https://github.com/matrix-org/synapse/issues/14632))
- Fix a bug introduced in Synapse 1.67.0 where not specifying a config file or a server URL would lead to the `register_new_matrix_user` script failing. ([\#14637](https://github.com/matrix-org/synapse/issues/14637))
- Fix a long-standing bug where the user directory and room/user stats might be out of sync. ([\#14639](https://github.com/matrix-org/synapse/issues/14639), [\#14643](https://github.com/matrix-org/synapse/issues/14643))
- Fix a bug introduced in Synapse 1.72.0 where the background updates to add non-thread unique indexes on receipts would fail if they were previously interrupted. ([\#14650](https://github.com/matrix-org/synapse/issues/14650))
- Improve validation of field size limits in events. ([\#14664](https://github.com/matrix-org/synapse/issues/14664))
- Fix bugs introduced in Synapse 1.55.0 and 1.69.0 where application services would not be notified of events in the correct rooms, due to stale caches. ([\#14670](https://github.com/matrix-org/synapse/issues/14670))
Improved Documentation
----------------------
- Update worker settings for `pusher` and `federation_sender` functionality. ([\#14493](https://github.com/matrix-org/synapse/issues/14493))
- Add links to third party package repositories, and point to the bug which highlights Ubuntu's out-of-date packages. ([\#14517](https://github.com/matrix-org/synapse/issues/14517))
- Remove old, incorrect minimum postgres version note and replace with a link to the [Dependency Deprecation Policy](https://matrix-org.github.io/synapse/v1.73/deprecation_policy.html). ([\#14590](https://github.com/matrix-org/synapse/issues/14590))
- Add Single-Sign On setup instructions for Mastodon-based instances. ([\#14594](https://github.com/matrix-org/synapse/issues/14594))
- Change `turn_allow_guests` example value to lowercase `true`. ([\#14634](https://github.com/matrix-org/synapse/issues/14634))
Internal Changes
----------------
- Optimise push badge count calculations. Contributed by Nick @ Beeper (@fizzadar). ([\#14255](https://github.com/matrix-org/synapse/issues/14255))
- Faster remote room joins: stream the un-partial-stating of rooms over replication. ([\#14473](https://github.com/matrix-org/synapse/issues/14473), [\#14474](https://github.com/matrix-org/synapse/issues/14474))
- Share the `ClientRestResource` for both workers and the main process. ([\#14528](https://github.com/matrix-org/synapse/issues/14528))
- Add `--editable` flag to `complement.sh` which uses an editable install of Synapse for faster turn-around times whilst developing iteratively. ([\#14548](https://github.com/matrix-org/synapse/issues/14548))
- Faster joins: use servers list approximation to send read receipts when in partial state instead of waiting for the full state of the room. ([\#14549](https://github.com/matrix-org/synapse/issues/14549))
- Modernize unit tests configuration related to workers. ([\#14568](https://github.com/matrix-org/synapse/issues/14568))
- Bump jsonschema from 4.17.0 to 4.17.3. ([\#14591](https://github.com/matrix-org/synapse/issues/14591))
- Fix Rust lint CI. ([\#14602](https://github.com/matrix-org/synapse/issues/14602))
- Bump JasonEtco/create-an-issue from 2.5.0 to 2.8.1. ([\#14607](https://github.com/matrix-org/synapse/issues/14607))
- Alter some unit test environment parameters to decrease time spent running tests. ([\#14610](https://github.com/matrix-org/synapse/issues/14610))
- Switch to Go recommended installation method for `gotestfmt` template in CI. ([\#14611](https://github.com/matrix-org/synapse/issues/14611))
- Bump phonenumbers from 8.13.0 to 8.13.1. ([\#14612](https://github.com/matrix-org/synapse/issues/14612))
- Bump types-setuptools from 65.5.0.3 to 65.6.0.1. ([\#14613](https://github.com/matrix-org/synapse/issues/14613))
- Bump twine from 4.0.1 to 4.0.2. ([\#14614](https://github.com/matrix-org/synapse/issues/14614))
- Bump types-requests from 2.28.11.2 to 2.28.11.5. ([\#14615](https://github.com/matrix-org/synapse/issues/14615))
- Bump cryptography from 38.0.3 to 38.0.4. ([\#14616](https://github.com/matrix-org/synapse/issues/14616))
- Remove useless cargo install with apt from Dockerfile. ([\#14636](https://github.com/matrix-org/synapse/issues/14636))
- Bump certifi from 2021.10.8 to 2022.12.7. ([\#14645](https://github.com/matrix-org/synapse/issues/14645))
- Bump flake8-bugbear from 22.10.27 to 22.12.6. ([\#14656](https://github.com/matrix-org/synapse/issues/14656))
- Bump packaging from 21.3 to 22.0. ([\#14657](https://github.com/matrix-org/synapse/issues/14657))
- Bump types-pillow from 9.3.0.1 to 9.3.0.4. ([\#14658](https://github.com/matrix-org/synapse/issues/14658))
- Bump serde from 1.0.148 to 1.0.150. ([\#14659](https://github.com/matrix-org/synapse/issues/14659))
- Bump phonenumbers from 8.13.1 to 8.13.2. ([\#14660](https://github.com/matrix-org/synapse/issues/14660))
- Bump authlib from 1.1.0 to 1.2.0. ([\#14661](https://github.com/matrix-org/synapse/issues/14661))
- Move `StateFilter` to `synapse.types`. ([\#14668](https://github.com/matrix-org/synapse/issues/14668))
- Improve type hints. ([\#14597](https://github.com/matrix-org/synapse/issues/14597), [\#14646](https://github.com/matrix-org/synapse/issues/14646), [\#14671](https://github.com/matrix-org/synapse/issues/14671))
Synapse 1.73.0 (2022-12-06)
===========================

1114
Cargo.lock generated

File diff suppressed because it is too large Load Diff

1
changelog.d/14255.misc Normal file
View File

@@ -0,0 +1 @@
Optimise push badge count calculations. Contributed by Nick @ Beeper (@fizzadar).

1
changelog.d/14435.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix a long-standing bug where a device list update might not be sent to clients in certain circumstances.

View File

@@ -0,0 +1 @@
Improve user search for international display names.

1
changelog.d/14473.misc Normal file
View File

@@ -0,0 +1 @@
Faster remote room joins: stream the un-partial-stating of rooms over replication.

1
changelog.d/14474.misc Normal file
View File

@@ -0,0 +1 @@
Faster remote room joins: stream the un-partial-stating of rooms over replication.

View File

@@ -0,0 +1 @@
Stop using deprecated `keyIds` parameter when calling `/_matrix/key/v2/server`.

1
changelog.d/14493.doc Normal file
View File

@@ -0,0 +1 @@
Update worker settings for `pusher` and `federation_sender` functionality.

1
changelog.d/14517.doc Normal file
View File

@@ -0,0 +1 @@
Add links to third party package repositories, and point to the bug which highlights Ubuntu's out-of-date packages.

View File

@@ -0,0 +1 @@
Stop using deprecated `keyIds` parameter when calling `/_matrix/key/v2/server`.

1
changelog.d/14528.misc Normal file
View File

@@ -0,0 +1 @@
Share the `ClientRestResource` for both workers and the main process.

1
changelog.d/14548.misc Normal file
View File

@@ -0,0 +1 @@
Add `--editable` flag to `complement.sh` which uses an editable install of Synapse for faster turn-around times whilst developing iteratively.

1
changelog.d/14549.misc Normal file
View File

@@ -0,0 +1 @@
Faster joins: use servers list approximation to send read receipts when in partial state instead of waiting for the full state of the room.

View File

@@ -0,0 +1 @@
Add new `push.enabled` config option to allow opting out of push notification calculation.

1
changelog.d/14568.misc Normal file
View File

@@ -0,0 +1 @@
Modernize unit tests configuration related to workers.

View File

@@ -0,0 +1 @@
Advertise support for Matrix 1.5 on `/_matrix/client/versions`.

1
changelog.d/14590.doc Normal file
View File

@@ -0,0 +1 @@
Remove old, incorrect minimum postgres version note and replace with a link to the [Dependency Deprecation Policy](https://matrix-org.github.io/synapse/v1.73/deprecation_policy.html).

1
changelog.d/14591.misc Normal file
View File

@@ -0,0 +1 @@
Bump jsonschema from 4.17.0 to 4.17.3.

1
changelog.d/14592.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix a long-standing bug where a device list update might not be sent to clients in certain circumstances.

1
changelog.d/14594.doc Normal file
View File

@@ -0,0 +1 @@
Add Single-Sign On setup instructions for Mastodon-based instances.

1
changelog.d/14597.misc Normal file
View File

@@ -0,0 +1 @@
Add missing type hints.

View File

@@ -0,0 +1 @@
Improve opentracing and logging for to-device message handling.

1
changelog.d/14600.bugfix Normal file
View File

@@ -0,0 +1 @@
Suppress a spurious warning when `POST /rooms/<room_id>/<membership>/`, `POST /join/<room_id_or_alias`, or the unspecced `PUT /join/<room_id_or_alias>/<txn_id>` receive an empty HTTP request body.

1
changelog.d/14602.misc Normal file
View File

@@ -0,0 +1 @@
Fix Rust lint CI.

1
changelog.d/14604.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix a long-standing bug where a device list update might not be sent to clients in certain circumstances.

1
changelog.d/14607.misc Normal file
View File

@@ -0,0 +1 @@
Bump JasonEtco/create-an-issue from 2.5.0 to 2.8.1.

1
changelog.d/14610.misc Normal file
View File

@@ -0,0 +1 @@
Alter some unit test environment parameters to decrease time spent running tests.

1
changelog.d/14611.misc Normal file
View File

@@ -0,0 +1 @@
Switch to Go recommended installation method for `gotestfmt` template in CI.

1
changelog.d/14612.misc Normal file
View File

@@ -0,0 +1 @@
Bump phonenumbers from 8.13.0 to 8.13.1.

1
changelog.d/14613.misc Normal file
View File

@@ -0,0 +1 @@
Bump types-setuptools from 65.5.0.3 to 65.6.0.1.

1
changelog.d/14614.misc Normal file
View File

@@ -0,0 +1 @@
Bump twine from 4.0.1 to 4.0.2.

1
changelog.d/14615.misc Normal file
View File

@@ -0,0 +1 @@
Bump types-requests from 2.28.11.2 to 2.28.11.5.

1
changelog.d/14616.misc Normal file
View File

@@ -0,0 +1 @@
Bump cryptography from 38.0.3 to 38.0.4.

1
changelog.d/14619.doc Normal file
View File

@@ -0,0 +1 @@
Add new `push.enabled` config option to allow opting out of push notification calculation.

1
changelog.d/14620.bugfix Normal file
View File

@@ -0,0 +1 @@
Return spec-compliant JSON errors when unknown endpoints are requested.

1
changelog.d/14621.bugfix Normal file
View File

@@ -0,0 +1 @@
Return spec-compliant JSON errors when unknown endpoints are requested.

1
changelog.d/14625.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix html templates to load images only on HTTPS. Contributed by @ashfame.

1
changelog.d/14631.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix a long-standing bug where the user directory would return 1 more row than requested.

1
changelog.d/14632.bugfix Normal file
View File

@@ -0,0 +1 @@
Reject invalid read receipt requests with empty room or event IDs. Contributed by Nick @ Beeper (@fizzadar).

1
changelog.d/14634.doc Normal file
View File

@@ -0,0 +1 @@
Change `turn_allow_guests` example value to lowercase `true`.

1
changelog.d/14636.misc Normal file
View File

@@ -0,0 +1 @@
Remove useless cargo install with apt from Dockerfile.

1
changelog.d/14637.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix a bug introduced in v1.67.0 where not specifying a config file or a server URL would lead to the `register_new_matrix_user` script failing.

1
changelog.d/14639.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix a long-standing bug where the user directory and room/user stats might be out of sync.

View File

@@ -0,0 +1 @@
Allow selecting "prejoin" events by state keys in addition to event types.

1
changelog.d/14643.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix a long-standing bug where the user directory and room/user stats might be out of sync.

1
changelog.d/14645.misc Normal file
View File

@@ -0,0 +1 @@
Bump certifi from 2021.10.8 to 2022.12.7.

1
changelog.d/14646.misc Normal file
View File

@@ -0,0 +1 @@
Add missing type hints.

2
changelog.d/14650.bugfix Normal file
View File

@@ -0,0 +1,2 @@
Fix a bug introduced in Synapse 1.72.0 where the background updates to add non-thread unique indexes on receipts would fail if they were previously interrupted.

1
changelog.d/14656.misc Normal file
View File

@@ -0,0 +1 @@
Bump flake8-bugbear from 22.10.27 to 22.12.6.

1
changelog.d/14657.misc Normal file
View File

@@ -0,0 +1 @@
Bump packaging from 21.3 to 22.0.

1
changelog.d/14658.misc Normal file
View File

@@ -0,0 +1 @@
Bump types-pillow from 9.3.0.1 to 9.3.0.4.

1
changelog.d/14659.misc Normal file
View File

@@ -0,0 +1 @@
Bump serde from 1.0.148 to 1.0.150.

1
changelog.d/14660.misc Normal file
View File

@@ -0,0 +1 @@
Bump phonenumbers from 8.13.1 to 8.13.2.

1
changelog.d/14661.misc Normal file
View File

@@ -0,0 +1 @@
Bump authlib from 1.1.0 to 1.2.0.

View File

@@ -0,0 +1 @@
(remove from changelog: unreleased) Revert the deletion of stale devices due to performance issues.

1
changelog.d/14668.misc Normal file
View File

@@ -0,0 +1 @@
Move `StateFilter` to `synapse.types`.

1
changelog.d/14670.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix bugs introduced in 1.55.0 and 1.69.0 where application services would not be notified of events in the correct rooms, due to stale caches.

1
changelog.d/14671.misc Normal file
View File

@@ -0,0 +1 @@
Improve type hints.

5
debian/changelog vendored
View File

@@ -1,10 +1,9 @@
matrix-synapse-py3 (1.74.0~rc1) stable; urgency=medium
matrix-synapse-py3 (1.74.0~rc1) UNRELEASED; urgency=medium
* New dependency on libicu-dev to provide improved results for user
search.
* New Synapse release 1.74.0rc1.
-- Synapse Packaging team <packages@matrix.org> Tue, 13 Dec 2022 13:30:01 +0000
-- Synapse Packaging team <packages@matrix.org> Tue, 06 Dec 2022 15:28:10 +0000
matrix-synapse-py3 (1.73.0) stable; urgency=medium

1387
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -57,7 +57,7 @@ manifest-path = "rust/Cargo.toml"
[tool.poetry]
name = "matrix-synapse"
version = "1.74.0rc1"
version = "1.73.0"
description = "Homeserver for the Matrix decentralised comms protocol"
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
license = "Apache-2.0"
@@ -209,7 +209,6 @@ Pympler = { version = "*", optional = true }
parameterized = { version = ">=0.7.4", optional = true }
idna = { version = ">=2.5", optional = true }
pyicu = { version = ">=2.10.2", optional = true }
uvloop = { version = ">=0.17.0", optional = true }
[tool.poetry.extras]
# NB: Packages that should be part of `pip install matrix-synapse[all]` need to be specified
@@ -236,7 +235,6 @@ test = ["parameterized", "idna"]
# requires libicu's development headers installed on the system (e.g. libicu-dev on
# Debian-based distributions).
user-search = ["pyicu"]
uvloop = ["uvloop"]
# The duplication here is awful. I hate hate hate hate hate it. However, for now I want
# to ensure you can still `pip install matrix-synapse[all]` like today. Two motivations:
@@ -270,8 +268,6 @@ all = [
"pympler",
# improved user search
"pyicu",
# uvloop
"uvloop",
# omitted:
# - test: it's useful to have this separate from dev deps in the olddeps job
# - systemd: this is a system-based requirement

View File

@@ -21,14 +21,25 @@ name = "synapse.synapse_rust"
[dependencies]
anyhow = "1.0.63"
env_logger = "0.10.0"
futures = "0.3.25"
futures-util = "0.3.25"
http = "0.2.8"
hyper = { version = "0.14.23", features = ["client", "http1", "http2", "runtime", "server", "full"] }
hyper-tls = "0.5.0"
lazy_static = "1.4.0"
log = "0.4.17"
native-tls = "0.2.11"
pyo3 = { version = "0.17.1", features = ["extension-module", "macros", "anyhow", "abi3", "abi3-py37"] }
pyo3-asyncio = { version = "0.17.0", features = ["tokio", "tokio-runtime"] }
pyo3-log = "0.7.0"
pythonize = "0.17.0"
regex = "1.6.0"
serde = { version = "1.0.144", features = ["derive"] }
serde_json = "1.0.85"
tokio = "1.23.0"
tokio-native-tls = "0.3.0"
trust-dns-resolver = "0.22.0"
[build-dependencies]
blake2 = "0.10.4"

158
rust/src/http/mod.rs Normal file
View File

@@ -0,0 +1,158 @@
use std::collections::HashMap;
use anyhow::Error;
use http::{Request, Uri};
use hyper::Body;
use log::info;
use pyo3::{
pyclass, pymethods,
types::{PyBytes, PyModule},
IntoPy, PyAny, PyObject, PyResult, Python, ToPyObject,
};
use self::resolver::{MatrixConnector, MatrixResolver};
pub mod resolver;
/// Called when registering modules with python.
pub fn register_module(py: Python<'_>, m: &PyModule) -> PyResult<()> {
let child_module = PyModule::new(py, "http")?;
child_module.add_class::<HttpClient>()?;
child_module.add_class::<MatrixResponse>()?;
m.add_submodule(child_module)?;
// We need to manually add the module to sys.modules to make `from
// synapse.synapse_rust import push` work.
py.import("sys")?
.getattr("modules")?
.set_item("synapse.synapse_rust.http", child_module)?;
Ok(())
}
#[derive(Clone, Debug)]
pub struct Bytes(pub Vec<u8>);
impl ToPyObject for Bytes {
fn to_object(&self, py: Python<'_>) -> pyo3::PyObject {
PyBytes::new(py, &self.0).into_py(py)
}
}
impl IntoPy<PyObject> for Bytes {
fn into_py(self, py: Python<'_>) -> PyObject {
self.to_object(py)
}
}
#[derive(Debug)]
#[pyclass]
pub struct MatrixResponse {
#[pyo3(get)]
pub code: u16,
#[pyo3(get)]
pub phrase: &'static str,
#[pyo3(get)]
pub content: Bytes,
#[pyo3(get)]
pub headers: HashMap<String, Bytes>,
}
#[pyclass]
#[derive(Clone)]
pub struct HttpClient {
client: hyper::Client<MatrixConnector>,
resolver: MatrixResolver,
}
impl HttpClient {
pub fn new() -> Result<Self, Error> {
let resolver = MatrixResolver::new()?;
let client =
hyper::Client::builder().build(MatrixConnector::with_resolver(resolver.clone()));
Ok(HttpClient { client, resolver })
}
pub async fn async_request(
&self,
url: String,
method: String,
headers: HashMap<Vec<u8>, Vec<Vec<u8>>>,
body: Option<Vec<u8>>,
) -> Result<MatrixResponse, Error> {
let uri: Uri = url.try_into()?;
let mut builder = Request::builder().method(&*method).uri(uri.clone());
for (key, values) in headers {
for value in values {
builder = builder.header(key.clone(), value);
}
}
if uri.scheme_str() == Some("matrix") {
let endpoints = self.resolver.resolve_server_name_from_uri(&uri).await?;
if let Some(endpoint) = endpoints.first() {
builder = builder.header("Host", &endpoint.host_header);
}
}
let request = if let Some(body) = body {
builder.body(Body::from(body))?
} else {
builder.body(Body::empty())?
};
let response = self.client.request(request).await?;
let code = response.status().as_u16();
let phrase = response.status().canonical_reason().unwrap_or_default();
let headers = response
.headers()
.iter()
.map(|(k, v)| (k.to_string(), Bytes(v.as_bytes().to_owned())))
.collect();
let body = response.into_body();
let bytes = hyper::body::to_bytes(body).await?;
let content = Bytes(bytes.to_vec());
Ok(MatrixResponse {
code,
phrase,
content,
headers,
})
}
}
#[pymethods]
impl HttpClient {
#[new]
fn py_new() -> Result<Self, Error> {
Self::new()
}
fn request<'a>(
&'a self,
py: Python<'a>,
url: String,
method: String,
headers: HashMap<Vec<u8>, Vec<Vec<u8>>>,
body: Option<Vec<u8>>,
) -> PyResult<&'a PyAny> {
pyo3::prepare_freethreaded_python();
let client = self.clone();
pyo3_asyncio::tokio::future_into_py(py, async move {
let resp = client.async_request(url, method, headers, body).await?;
Ok(resp)
})
}
}

432
rust/src/http/resolver.rs Normal file
View File

@@ -0,0 +1,432 @@
use std::collections::BTreeMap;
use std::future::Future;
use std::net::IpAddr;
use std::pin::Pin;
use std::str::FromStr;
use std::{
io::Cursor,
sync::{Arc, Mutex},
task::{self, Poll},
};
use anyhow::{bail, Error};
use futures::{FutureExt, TryFutureExt};
use futures_util::stream::StreamExt;
use http::Uri;
use hyper::client::connect::Connection;
use hyper::client::connect::{Connected, HttpConnector};
use hyper::server::conn::Http;
use hyper::service::Service;
use hyper::Client;
use hyper_tls::HttpsConnector;
use hyper_tls::MaybeHttpsStream;
use log::{debug, info};
use native_tls::TlsConnector;
use serde::Deserialize;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::net::TcpStream;
use tokio_native_tls::TlsConnector as AsyncTlsConnector;
use trust_dns_resolver::error::ResolveErrorKind;
#[derive(Debug, Clone)]
pub struct Endpoint {
pub host: String,
pub port: u16,
pub host_header: String,
pub tls_name: String,
}
#[derive(Clone)]
pub struct MatrixResolver {
resolver: trust_dns_resolver::TokioAsyncResolver,
http_client: Client<HttpsConnector<HttpConnector>>,
}
impl MatrixResolver {
pub fn new() -> Result<MatrixResolver, Error> {
let http_client = hyper::Client::builder().build(HttpsConnector::new());
MatrixResolver::with_client(http_client)
}
pub fn with_client(
http_client: Client<HttpsConnector<HttpConnector>>,
) -> Result<MatrixResolver, Error> {
let resolver = trust_dns_resolver::TokioAsyncResolver::tokio_from_system_conf()?;
Ok(MatrixResolver {
resolver,
http_client,
})
}
/// Does SRV lookup
pub async fn resolve_server_name_from_uri(&self, uri: &Uri) -> Result<Vec<Endpoint>, Error> {
let host = uri.host().expect("URI has no host").to_string();
let port = uri.port_u16();
self.resolve_server_name_from_host_port(host, port).await
}
pub async fn resolve_server_name_from_host_port(
&self,
mut host: String,
mut port: Option<u16>,
) -> Result<Vec<Endpoint>, Error> {
let mut authority = if let Some(p) = port {
format!("{}:{}", host, p)
} else {
host.to_string()
};
// If a literal IP or includes port then we shortcircuit.
if host.parse::<IpAddr>().is_ok() || port.is_some() {
return Ok(vec![Endpoint {
host: host.to_string(),
port: port.unwrap_or(8448),
host_header: authority.to_string(),
tls_name: host.to_string(),
}]);
}
// Do well-known delegation lookup.
if let Some(server) = get_well_known(&self.http_client, &host).await {
let a = http::uri::Authority::from_str(&server.server)?;
host = a.host().to_string();
port = a.port_u16();
authority = a.to_string();
}
// If a literal IP or includes port then we shortcircuit.
if host.parse::<IpAddr>().is_ok() || port.is_some() {
return Ok(vec![Endpoint {
host: host.clone(),
port: port.unwrap_or(8448),
host_header: authority.to_string(),
tls_name: host.clone(),
}]);
}
let result = self
.resolver
.srv_lookup(format!("_matrix._tcp.{}", host))
.await;
let records = match result {
Ok(records) => records,
Err(err) => match err.kind() {
ResolveErrorKind::NoRecordsFound { .. } => {
return Ok(vec![Endpoint {
host: host.clone(),
port: 8448,
host_header: authority.to_string(),
tls_name: host.clone(),
}])
}
_ => return Err(err.into()),
},
};
let mut priority_map: BTreeMap<u16, Vec<_>> = BTreeMap::new();
let mut count = 0;
for record in records {
count += 1;
let priority = record.priority();
priority_map.entry(priority).or_default().push(record);
}
let mut results = Vec::with_capacity(count);
for (_priority, records) in priority_map {
// TODO: Correctly shuffle records
results.extend(records.into_iter().map(|record| Endpoint {
host: record.target().to_utf8(),
port: record.port(),
host_header: host.to_string(),
tls_name: host.to_string(),
}))
}
Ok(results)
}
}
async fn get_well_known<C>(http_client: &Client<C>, host: &str) -> Option<WellKnownServer>
where
C: Service<Uri> + Clone + Sync + Send + 'static,
C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
C::Future: Unpin + Send,
C::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
{
// TODO: Add timeout.
let uri = hyper::Uri::builder()
.scheme("https")
.authority(host)
.path_and_query("/.well-known/matrix/server")
.build()
.ok()?;
let mut body = http_client.get(uri).await.ok()?.into_body();
let mut vec = Vec::new();
while let Some(next) = body.next().await {
let chunk = next.ok()?;
vec.extend(chunk);
}
serde_json::from_slice(&vec).ok()?
}
#[derive(Deserialize)]
struct WellKnownServer {
#[serde(rename = "m.server")]
server: String,
}
#[derive(Clone)]
pub struct MatrixConnector {
resolver: MatrixResolver,
}
impl MatrixConnector {
pub fn with_resolver(resolver: MatrixResolver) -> MatrixConnector {
MatrixConnector { resolver }
}
}
impl Service<Uri> for MatrixConnector {
type Response = MaybeHttpsStream<TcpStream>;
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
// This connector is always ready, but others might not be.
Poll::Ready(Ok(()))
}
fn call(&mut self, dst: Uri) -> Self::Future {
let resolver = self.resolver.clone();
if dst.scheme_str() != Some("matrix") {
debug!("Got non-matrix scheme");
return HttpsConnector::new()
.call(dst)
.map_err(|e| Error::msg(e))
.boxed();
}
async move {
let endpoints = resolver
.resolve_server_name_from_host_port(
dst.host().expect("hostname").to_string(),
dst.port_u16(),
)
.await?;
debug!("Got endpoints: {:?}", endpoints);
for endpoint in endpoints {
match try_connecting(&dst, &endpoint).await {
Ok(r) => return Ok(r),
// Errors here are not unexpected, and we just move on
// with our lives.
Err(e) => info!(
"Failed to connect to {} via {}:{} because {}",
dst.host().expect("hostname"),
endpoint.host,
endpoint.port,
e,
),
}
}
bail!(
"failed to resolve host: {:?} port {:?}",
dst.host(),
dst.port()
)
}
.boxed()
}
}
/// Attempts to connect to a particular endpoint.
async fn try_connecting(
dst: &Uri,
endpoint: &Endpoint,
) -> Result<MaybeHttpsStream<TcpStream>, Error> {
let tcp = TcpStream::connect((&endpoint.host as &str, endpoint.port)).await?;
let connector: AsyncTlsConnector = if dst.host().expect("hostname").contains("localhost") {
TlsConnector::builder()
.danger_accept_invalid_certs(true)
.build()?
.into()
} else {
TlsConnector::new().unwrap().into()
};
let tls = connector.connect(&endpoint.tls_name, tcp).await?;
Ok(tls.into())
}
/// A connector that reutrns a connection which returns 200 OK to all connections.
#[derive(Clone)]
pub struct TestConnector;
impl Service<Uri> for TestConnector {
type Response = TestConnection;
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
// This connector is always ready, but others might not be.
Poll::Ready(Ok(()))
}
fn call(&mut self, _dst: Uri) -> Self::Future {
let (client, server) = TestConnection::double_ended();
{
let service = hyper::service::service_fn(|_| async move {
Ok(hyper::Response::new(hyper::Body::from("Hello World")))
as Result<_, hyper::http::Error>
});
let fut = Http::new().serve_connection(server, service);
tokio::spawn(fut);
}
futures::future::ok(client).boxed()
}
}
#[derive(Default)]
struct TestConnectionInner {
outbound_buffer: Cursor<Vec<u8>>,
inbound_buffer: Cursor<Vec<u8>>,
wakers: Vec<futures::task::Waker>,
}
/// A in memory connection for use with tests.
#[derive(Clone, Default)]
pub struct TestConnection {
inner: Arc<Mutex<TestConnectionInner>>,
direction: bool,
}
impl TestConnection {
pub fn double_ended() -> (TestConnection, TestConnection) {
let inner: Arc<Mutex<TestConnectionInner>> = Arc::default();
let a = TestConnection {
inner: inner.clone(),
direction: false,
};
let b = TestConnection {
inner,
direction: true,
};
(a, b)
}
}
impl AsyncRead for TestConnection {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<Result<(), std::io::Error>> {
let mut conn = self.inner.lock().expect("mutex");
let buffer = if self.direction {
&mut conn.inbound_buffer
} else {
&mut conn.outbound_buffer
};
let bytes_read = std::io::Read::read(buffer, buf.initialize_unfilled())?;
buf.advance(bytes_read);
if bytes_read > 0 {
Poll::Ready(Ok(()))
} else {
conn.wakers.push(cx.waker().clone());
Poll::Pending
}
}
}
impl AsyncWrite for TestConnection {
fn poll_write(
self: Pin<&mut Self>,
_cx: &mut task::Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
let mut conn = self.inner.lock().expect("mutex");
if self.direction {
conn.outbound_buffer.get_mut().extend_from_slice(buf);
} else {
conn.inbound_buffer.get_mut().extend_from_slice(buf);
}
for waker in conn.wakers.drain(..) {
waker.wake()
}
Poll::Ready(Ok(buf.len()))
}
fn poll_flush(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
let mut conn = self.inner.lock().expect("mutex");
if self.direction {
Pin::new(&mut conn.outbound_buffer).poll_flush(cx)
} else {
Pin::new(&mut conn.inbound_buffer).poll_flush(cx)
}
}
fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
let mut conn = self.inner.lock().expect("mutex");
if self.direction {
Pin::new(&mut conn.outbound_buffer).poll_shutdown(cx)
} else {
Pin::new(&mut conn.inbound_buffer).poll_shutdown(cx)
}
}
}
impl Connection for TestConnection {
fn connected(&self) -> Connected {
Connected::new()
}
}
#[tokio::test]
async fn test_memory_connection() {
let client: hyper::Client<_, hyper::Body> = hyper::Client::builder().build(TestConnector);
let response = client
.get("http://localhost".parse().unwrap())
.await
.unwrap();
assert!(response.status().is_success());
let bytes = hyper::body::to_bytes(response.into_body()).await.unwrap();
assert_eq!(&bytes[..], b"Hello World");
}

View File

@@ -1,5 +1,6 @@
use pyo3::prelude::*;
pub mod http;
pub mod push;
/// Returns the hash of all the rust source files at the time it was compiled.
@@ -26,6 +27,7 @@ fn synapse_rust(py: Python<'_>, m: &PyModule) -> PyResult<()> {
m.add_function(wrap_pyfunction!(get_rust_file_digest, m)?)?;
push::register_module(py, m)?;
http::register_module(py, m)?;
Ok(())
}

View File

@@ -0,0 +1,16 @@
from typing import Dict, List, Optional
class MatrixResponse:
code: int
phrase: str
content: bytes
headers: Dict[str, str]
class HttpClient:
async def request(
self,
url: str,
method: str,
headers: Dict[bytes, List[bytes]],
body: Optional[bytes],
) -> MatrixResponse: ...

View File

@@ -45,7 +45,7 @@ class PushRuleEvaluator:
notification_power_levels: Mapping[str, int],
related_events_flattened: Mapping[str, Mapping[str, str]],
related_event_match_enabled: bool,
room_version_feature_flags: Tuple[str, ...],
room_version_feature_flags: list[str],
msc3931_enabled: bool,
): ...
def run(

View File

@@ -29,7 +29,7 @@ if sys.version_info < (3, 7):
sys.exit(1)
# Allow using the asyncio reactor via env var.
if strtobool(os.environ.get("SYNAPSE_ASYNC_IO_REACTOR", "0")):
if strtobool(os.environ.get("SYNAPSE_ASYNC_IO_REACTOR", "0")) or True:
from incremental import Version
import twisted
@@ -44,15 +44,8 @@ if strtobool(os.environ.get("SYNAPSE_ASYNC_IO_REACTOR", "0")):
from twisted.internet import asyncioreactor
if bool(os.environ.get("SYNAPSE_UVLOOP", False)):
import uvloop
uvloop.install()
print("Using uvloop")
asyncioreactor.install(asyncio.get_event_loop())
# Twisted and canonicaljson will fail to import when this file is executed to
# get the __version__ during a fresh install. That's OK and subsequent calls to
# actually start Synapse will import these libraries fine.

View File

@@ -152,7 +152,6 @@ class EduTypes:
class RejectedReason:
AUTH_ERROR: Final = "auth_error"
OVERSIZED_EVENT: Final = "oversized_event"
class RoomCreationPreset:

View File

@@ -424,17 +424,8 @@ class ResourceLimitError(SynapseError):
class EventSizeError(SynapseError):
"""An error raised when an event is too big."""
def __init__(self, msg: str, unpersistable: bool):
"""
unpersistable:
if True, the PDU must not be persisted, not even as a rejected PDU
when received over federation.
This is notably true when the entire PDU exceeds the size limit for a PDU,
(as opposed to an individual key's size limit being exceeded).
"""
def __init__(self, msg: str):
super().__init__(413, msg, Codes.TOO_LARGE)
self.unpersistable = unpersistable
class LoginError(SynapseError):

View File

@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Callable, Dict, Optional, Tuple
from typing import Callable, Dict, List, Optional
import attr
@@ -103,7 +103,7 @@ class RoomVersion:
# is not enough to mark it "supported": the push rule evaluator also needs to
# support the flag. Unknown flags are ignored by the evaluator, making conditions
# fail if used.
msc3931_push_features: Tuple[str, ...] # values from PushRuleRoomFlag
msc3931_push_features: List[str] # values from PushRuleRoomFlag
class RoomVersions:
@@ -124,7 +124,7 @@ class RoomVersions:
msc2716_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
msc3931_push_features=(),
msc3931_push_features=[],
)
V2 = RoomVersion(
"2",
@@ -143,7 +143,7 @@ class RoomVersions:
msc2716_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
msc3931_push_features=(),
msc3931_push_features=[],
)
V3 = RoomVersion(
"3",
@@ -162,7 +162,7 @@ class RoomVersions:
msc2716_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
msc3931_push_features=(),
msc3931_push_features=[],
)
V4 = RoomVersion(
"4",
@@ -181,7 +181,7 @@ class RoomVersions:
msc2716_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
msc3931_push_features=(),
msc3931_push_features=[],
)
V5 = RoomVersion(
"5",
@@ -200,7 +200,7 @@ class RoomVersions:
msc2716_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
msc3931_push_features=(),
msc3931_push_features=[],
)
V6 = RoomVersion(
"6",
@@ -219,7 +219,7 @@ class RoomVersions:
msc2716_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
msc3931_push_features=(),
msc3931_push_features=[],
)
MSC2176 = RoomVersion(
"org.matrix.msc2176",
@@ -238,7 +238,7 @@ class RoomVersions:
msc2716_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
msc3931_push_features=(),
msc3931_push_features=[],
)
V7 = RoomVersion(
"7",
@@ -257,7 +257,7 @@ class RoomVersions:
msc2716_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
msc3931_push_features=(),
msc3931_push_features=[],
)
V8 = RoomVersion(
"8",
@@ -276,7 +276,7 @@ class RoomVersions:
msc2716_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
msc3931_push_features=(),
msc3931_push_features=[],
)
V9 = RoomVersion(
"9",
@@ -295,7 +295,7 @@ class RoomVersions:
msc2716_redactions=False,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
msc3931_push_features=(),
msc3931_push_features=[],
)
MSC3787 = RoomVersion(
"org.matrix.msc3787",
@@ -314,7 +314,7 @@ class RoomVersions:
msc2716_redactions=False,
msc3787_knock_restricted_join_rule=True,
msc3667_int_only_power_levels=False,
msc3931_push_features=(),
msc3931_push_features=[],
)
V10 = RoomVersion(
"10",
@@ -333,7 +333,7 @@ class RoomVersions:
msc2716_redactions=False,
msc3787_knock_restricted_join_rule=True,
msc3667_int_only_power_levels=True,
msc3931_push_features=(),
msc3931_push_features=[],
)
MSC2716v4 = RoomVersion(
"org.matrix.msc2716v4",
@@ -352,7 +352,7 @@ class RoomVersions:
msc2716_redactions=True,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
msc3931_push_features=(),
msc3931_push_features=[],
)
MSC1767v10 = RoomVersion(
# MSC1767 (Extensible Events) based on room version "10"
@@ -372,7 +372,7 @@ class RoomVersions:
msc2716_redactions=False,
msc3787_knock_restricted_join_rule=True,
msc3667_int_only_power_levels=True,
msc3931_push_features=(PushRuleRoomFlag.EXTENSIBLE_EVENTS,),
msc3931_push_features=[PushRuleRoomFlag.EXTENSIBLE_EVENTS],
)

View File

@@ -52,7 +52,6 @@ from synapse.api.room_versions import (
KNOWN_ROOM_VERSIONS,
EventFormatVersions,
RoomVersion,
RoomVersions,
)
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.types import MutableStateMap, StateMap, UserID, get_domain_from_id
@@ -342,80 +341,19 @@ def check_state_dependent_auth_rules(
logger.debug("Allowing! %s", event)
# Set of room versions where Synapse did not apply event key size limits
# in bytes, but rather in codepoints.
# In these room versions, we are more lenient with event size validation.
LENIENT_EVENT_BYTE_LIMITS_ROOM_VERSIONS = {
RoomVersions.V1,
RoomVersions.V2,
RoomVersions.V3,
RoomVersions.V4,
RoomVersions.V5,
RoomVersions.V6,
RoomVersions.MSC2176,
RoomVersions.V7,
RoomVersions.V8,
RoomVersions.V9,
RoomVersions.MSC3787,
RoomVersions.V10,
RoomVersions.MSC2716v4,
RoomVersions.MSC1767v10,
}
def _check_size_limits(event: "EventBase") -> None:
"""
Checks the size limits in a PDU.
The entire size limit of the PDU is checked first.
Then the size of fields is checked, first in codepoints and then in bytes.
The codepoint size limits are only for Synapse compatibility.
Raises:
EventSizeError:
when a size limit has been violated.
unpersistable=True if Synapse never would have accepted the event and
the PDU must NOT be persisted.
unpersistable=False if a prior version of Synapse would have accepted the
event and so the PDU must be persisted as rejected to avoid
breaking the room.
"""
# Whole PDU check
if len(encode_canonical_json(event.get_pdu_json())) > MAX_PDU_SIZE:
raise EventSizeError("event too large", unpersistable=True)
# Codepoint size check: Synapse always enforced these limits, so apply
# them strictly.
if len(event.user_id) > 255:
raise EventSizeError("'user_id' too large", unpersistable=True)
raise EventSizeError("'user_id' too large")
if len(event.room_id) > 255:
raise EventSizeError("'room_id' too large", unpersistable=True)
raise EventSizeError("'room_id' too large")
if event.is_state() and len(event.state_key) > 255:
raise EventSizeError("'state_key' too large", unpersistable=True)
raise EventSizeError("'state_key' too large")
if len(event.type) > 255:
raise EventSizeError("'type' too large", unpersistable=True)
raise EventSizeError("'type' too large")
if len(event.event_id) > 255:
raise EventSizeError("'event_id' too large", unpersistable=True)
strict_byte_limits = (
event.room_version not in LENIENT_EVENT_BYTE_LIMITS_ROOM_VERSIONS
)
# Byte size check: if these fail, then be lenient to avoid breaking rooms.
if len(event.user_id.encode("utf-8")) > 255:
raise EventSizeError("'user_id' too large", unpersistable=strict_byte_limits)
if len(event.room_id.encode("utf-8")) > 255:
raise EventSizeError("'room_id' too large", unpersistable=strict_byte_limits)
if event.is_state() and len(event.state_key.encode("utf-8")) > 255:
raise EventSizeError("'state_key' too large", unpersistable=strict_byte_limits)
if len(event.type.encode("utf-8")) > 255:
raise EventSizeError("'type' too large", unpersistable=strict_byte_limits)
if len(event.event_id.encode("utf-8")) > 255:
raise EventSizeError("'event_id' too large", unpersistable=strict_byte_limits)
raise EventSizeError("'event_id' too large")
if len(encode_canonical_json(event.get_pdu_json())) > MAX_PDU_SIZE:
raise EventSizeError("event too large")
def _check_create(event: "EventBase") -> None:

View File

@@ -43,7 +43,6 @@ from synapse.api.constants import (
from synapse.api.errors import (
AuthError,
Codes,
EventSizeError,
FederationError,
FederationPullAttemptBackoffError,
HttpResponseException,
@@ -1737,15 +1736,6 @@ class FederationEventHandler:
except AuthError as e:
logger.warning("Rejecting %r because %s", event, e)
context.rejected = RejectedReason.AUTH_ERROR
except EventSizeError as e:
if e.unpersistable:
# This event is completely unpersistable.
raise e
# Otherwise, we are somewhat lenient and just persist the event
# as rejected, for moderate compatibility with older Synapse
# versions.
logger.warning("While validating received event %r: %s", event, e)
context.rejected = RejectedReason.OVERSIZED_EVENT
events_and_contexts_to_persist.append((event, context))
@@ -1791,16 +1781,6 @@ class FederationEventHandler:
# TODO: use a different rejected reason here?
context.rejected = RejectedReason.AUTH_ERROR
return
except EventSizeError as e:
if e.unpersistable:
# This event is completely unpersistable.
raise e
# Otherwise, we are somewhat lenient and just persist the event
# as rejected, for moderate compatibility with older Synapse
# versions.
logger.warning("While validating received event %r: %s", event, e)
context.rejected = RejectedReason.OVERSIZED_EVENT
return
# next, check that we have all of the event's auth events.
#

View File

@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import asyncio
import cgi
import codecs
import logging
@@ -42,14 +43,18 @@ from canonicaljson import encode_canonical_json
from prometheus_client import Counter
from signedjson.sign import sign_json
from typing_extensions import Literal
from zope.interface import implementer
from twisted.internet import defer
from twisted.internet.error import DNSLookupError
from twisted.internet.interfaces import IReactorTime
from twisted.internet.protocol import Protocol
from twisted.internet.task import Cooperator
from twisted.web.client import ResponseFailed
from twisted.internet.testing import StringTransport
from twisted.python.failure import Failure
from twisted.web.client import Response, ResponseDone, ResponseFailed
from twisted.web.http_headers import Headers
from twisted.web.iweb import IBodyProducer, IResponse
from twisted.web.iweb import UNKNOWN_LENGTH, IBodyProducer, IResponse
import synapse.metrics
import synapse.util.retryutils
@@ -75,6 +80,7 @@ from synapse.http.types import QueryParams
from synapse.logging import opentracing
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentracing import set_tag, start_active_span, tags
from synapse.synapse_rust.http import HttpClient
from synapse.types import JsonDict
from synapse.util import json_decoder
from synapse.util.async_helpers import AwakenableSleeper, timeout_deferred
@@ -199,6 +205,33 @@ class JsonParser(ByteParser[Union[JsonDict, list]]):
return json_decoder.decode(self._buffer.getvalue())
@attr.s(auto_attribs=True)
@implementer(IResponse)
class RustResponse:
version: tuple
code: int
phrase: bytes
headers: Headers
length: Union[int, UNKNOWN_LENGTH]
# request: Optional[IClientRequest]
# previousResponse: Optional[IResponse]
_data: bytes
def deliverBody(self, protocol: Protocol):
protocol.dataReceived(self._data)
protocol.connectionLost(Failure(ResponseDone("Response body fully received")))
def setPreviousResponse(self, response: IResponse):
pass
async def _handle_response(
reactor: IReactorTime,
timeout_sec: float,
@@ -372,6 +405,8 @@ class MatrixFederationHttpClient:
self._sleeper = AwakenableSleeper(self.reactor)
self._rust_client = HttpClient()
def wake_destination(self, destination: str) -> None:
"""Called when the remote server may have come back online."""
@@ -556,11 +591,8 @@ class MatrixFederationHttpClient:
destination_bytes, method_bytes, url_to_sign_bytes, json
)
data = encode_canonical_json(json)
producer: Optional[IBodyProducer] = QuieterFileBodyProducer(
BytesIO(data), cooperator=self._cooperator
)
else:
producer = None
data = None
auth_headers = self.build_auth_headers(
destination_bytes, method_bytes, url_to_sign_bytes
)
@@ -591,23 +623,33 @@ class MatrixFederationHttpClient:
# * The `Deferred` that joins the forks back together is
# wrapped in `make_deferred_yieldable` to restore the
# logging context regardless of the path taken.
request_deferred = run_in_background(
self.agent.request,
method_bytes,
url_bytes,
headers=Headers(headers_dict),
bodyProducer=producer,
)
request_deferred = timeout_deferred(
request_deferred,
timeout=_sec_timeout,
reactor=self.reactor,
)
# request_deferred = run_in_background(
# self._rust_client.request,
# url_str,
# request.method,
# headers_dict,
# data,
# )
# request_deferred = timeout_deferred(
# request_deferred,
# timeout=_sec_timeout,
# reactor=self.reactor,
# )
response = await make_deferred_yieldable(request_deferred)
# response = await make_deferred_yieldable(request_deferred)
response_d = run_in_background(
self._rust_client.request,
url_str,
request.method,
headers_dict,
data,
)
response = await make_deferred_yieldable(response_d)
except DNSLookupError as e:
raise RequestSendFailed(e, can_retry=retry_on_dns_fail) from e
except Exception as e:
logger.exception("ERROR")
raise RequestSendFailed(e, can_retry=True) from e
incoming_responses_counter.labels(
@@ -615,7 +657,7 @@ class MatrixFederationHttpClient:
).inc()
set_tag(tags.HTTP_STATUS_CODE, response.code)
response_phrase = response.phrase.decode("ascii", errors="replace")
response_phrase = response.phrase
if 200 <= response.code < 300:
logger.debug(
@@ -635,25 +677,7 @@ class MatrixFederationHttpClient:
)
# :'(
# Update transactions table?
d = treq.content(response)
d = timeout_deferred(
d, timeout=_sec_timeout, reactor=self.reactor
)
try:
body = await make_deferred_yieldable(d)
except Exception as e:
# Eh, we're already going to raise an exception so lets
# ignore if this fails.
logger.warning(
"{%s} [%s] Failed to get error response: %s %s: %s",
request.txn_id,
request.destination,
request.method,
url_str,
_flatten_response_never_received(e),
)
body = None
body = response.content
exc = HttpResponseException(
response.code, response_phrase, body
@@ -715,7 +739,19 @@ class MatrixFederationHttpClient:
_flatten_response_never_received(e),
)
raise
return response
headers = Headers()
for key, value in response.headers.items():
headers.addRawHeader(key, value)
return RustResponse(
("HTTP", 1, 1),
response.code,
response.phrase.encode("ascii"),
headers,
UNKNOWN_LENGTH,
response.content,
)
def build_auth_headers(
self,

View File

@@ -26,6 +26,7 @@ import logging
import threading
import typing
import warnings
from asyncio import Future
from types import TracebackType
from typing import (
TYPE_CHECKING,
@@ -814,6 +815,8 @@ def run_in_background( # type: ignore[misc]
res = defer.ensureDeferred(res)
elif isinstance(res, defer.Deferred):
pass
elif isinstance(res, Future):
res = defer.Deferred.fromFuture(res)
elif isinstance(res, Awaitable):
# `res` is probably some kind of completed awaitable, such as a `DoneAwaitable`
# or `Future` from `make_awaitable`.

View File

@@ -342,6 +342,10 @@ class BulkPushRuleEvaluator:
for user_id, level in notification_levels.items():
notification_levels[user_id] = int(level)
room_version_features = event.room_version.msc3931_push_features
if not room_version_features:
room_version_features = []
evaluator = PushRuleEvaluator(
_flatten_dict(event, room_version=event.room_version),
room_member_count,
@@ -349,7 +353,7 @@ class BulkPushRuleEvaluator:
notification_levels,
related_events,
self._related_event_match_enabled,
event.room_version.msc3931_push_features,
room_version_features,
self.hs.config.experimental.msc1767_enabled, # MSC3931 flag
)