Speed up MAS token introspection (#18357)

We do this by shoving it into Rust. We believe our python http client is
a bit slow.

Also bumps minimum rust version to 1.81.0, released last September (over
six months ago)

To allow for async Rust, includes some adapters between Tokio in Rust
and the Twisted reactor in Python.
This commit is contained in:
Erik Johnston
2025-06-16 10:41:35 -05:00
committed by GitHub
parent df04931f0b
commit f500c7d982
11 changed files with 1828 additions and 301 deletions

View File

@@ -7,7 +7,7 @@ name = "synapse"
version = "0.1.0"
edition = "2021"
rust-version = "1.66.0"
rust-version = "1.81.0"
[lib]
name = "synapse"
@@ -36,13 +36,21 @@ pyo3 = { version = "0.24.2", features = [
"abi3",
"abi3-py39",
] }
pyo3-log = "0.12.0"
pyo3-log = "0.12.3"
pythonize = "0.24.0"
regex = "1.6.0"
sha2 = "0.10.8"
serde = { version = "1.0.144", features = ["derive"] }
serde_json = "1.0.85"
ulid = "1.1.2"
reqwest = { version = "0.12.15", default-features = false, features = [
"http2",
"stream",
"rustls-tls-native-roots",
] }
http-body-util = "0.1.3"
futures = "0.3.31"
tokio = { version = "1.44.2", features = ["rt", "rt-multi-thread"] }
[features]
extension-module = ["pyo3/extension-module"]

View File

@@ -58,3 +58,15 @@ impl NotFoundError {
NotFoundError::new_err(())
}
}
import_exception!(synapse.api.errors, HttpResponseException);
impl HttpResponseException {
pub fn new(status: StatusCode, bytes: Vec<u8>) -> pyo3::PyErr {
HttpResponseException::new_err((
status.as_u16(),
status.canonical_reason().unwrap_or_default(),
bytes,
))
}
}

218
rust/src/http_client.rs Normal file
View File

@@ -0,0 +1,218 @@
/*
* This file is licensed under the Affero General Public License (AGPL) version 3.
*
* Copyright (C) 2025 New Vector, Ltd
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* See the GNU Affero General Public License for more details:
* <https://www.gnu.org/licenses/agpl-3.0.html>.
*/
use std::{collections::HashMap, future::Future, panic::AssertUnwindSafe, sync::LazyLock};
use anyhow::Context;
use futures::{FutureExt, TryStreamExt};
use pyo3::{exceptions::PyException, prelude::*, types::PyString};
use reqwest::RequestBuilder;
use tokio::runtime::Runtime;
use crate::errors::HttpResponseException;
/// The tokio runtime that we're using to run async Rust libs.
static RUNTIME: LazyLock<Runtime> = LazyLock::new(|| {
tokio::runtime::Builder::new_multi_thread()
.worker_threads(4)
.enable_all()
.build()
.unwrap()
});
/// A reference to the `Deferred` python class.
static DEFERRED_CLASS: LazyLock<PyObject> = LazyLock::new(|| {
Python::with_gil(|py| {
py.import("twisted.internet.defer")
.expect("module 'twisted.internet.defer' should be importable")
.getattr("Deferred")
.expect("module 'twisted.internet.defer' should have a 'Deferred' class")
.unbind()
})
});
/// A reference to the twisted `reactor`.
static TWISTED_REACTOR: LazyLock<Py<PyModule>> = LazyLock::new(|| {
Python::with_gil(|py| {
py.import("twisted.internet.reactor")
.expect("module 'twisted.internet.reactor' should be importable")
.unbind()
})
});
/// Called when registering modules with python.
pub fn register_module(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
let child_module: Bound<'_, PyModule> = PyModule::new(py, "http_client")?;
child_module.add_class::<HttpClient>()?;
// Make sure we fail early if we can't build the lazy statics.
LazyLock::force(&RUNTIME);
LazyLock::force(&DEFERRED_CLASS);
m.add_submodule(&child_module)?;
// We need to manually add the module to sys.modules to make `from
// synapse.synapse_rust import acl` work.
py.import("sys")?
.getattr("modules")?
.set_item("synapse.synapse_rust.http_client", child_module)?;
Ok(())
}
#[pyclass]
#[derive(Clone)]
struct HttpClient {
client: reqwest::Client,
}
#[pymethods]
impl HttpClient {
#[new]
pub fn py_new(user_agent: &str) -> PyResult<HttpClient> {
// The twisted reactor can only be imported after Synapse has been
// imported, to allow Synapse to change the twisted reactor. If we try
// and import the reactor too early twisted installs a default reactor,
// which can't be replaced.
LazyLock::force(&TWISTED_REACTOR);
Ok(HttpClient {
client: reqwest::Client::builder()
.user_agent(user_agent)
.build()
.context("building reqwest client")?,
})
}
pub fn get<'a>(
&self,
py: Python<'a>,
url: String,
response_limit: usize,
) -> PyResult<Bound<'a, PyAny>> {
self.send_request(py, self.client.get(url), response_limit)
}
pub fn post<'a>(
&self,
py: Python<'a>,
url: String,
response_limit: usize,
headers: HashMap<String, String>,
request_body: String,
) -> PyResult<Bound<'a, PyAny>> {
let mut builder = self.client.post(url);
for (name, value) in headers {
builder = builder.header(name, value);
}
builder = builder.body(request_body);
self.send_request(py, builder, response_limit)
}
}
impl HttpClient {
fn send_request<'a>(
&self,
py: Python<'a>,
builder: RequestBuilder,
response_limit: usize,
) -> PyResult<Bound<'a, PyAny>> {
create_deferred(py, async move {
let response = builder.send().await.context("sending request")?;
let status = response.status();
let mut stream = response.bytes_stream();
let mut buffer = Vec::new();
while let Some(chunk) = stream.try_next().await.context("reading body")? {
if buffer.len() + chunk.len() > response_limit {
Err(anyhow::anyhow!("Response size too large"))?;
}
buffer.extend_from_slice(&chunk);
}
if !status.is_success() {
return Err(HttpResponseException::new(status, buffer));
}
let r = Python::with_gil(|py| buffer.into_pyobject(py).map(|o| o.unbind()))?;
Ok(r)
})
}
}
/// Creates a twisted deferred from the given future, spawning the task on the
/// tokio runtime.
///
/// Does not handle deferred cancellation or contextvars.
fn create_deferred<F, O>(py: Python, fut: F) -> PyResult<Bound<'_, PyAny>>
where
F: Future<Output = PyResult<O>> + Send + 'static,
for<'a> O: IntoPyObject<'a>,
{
let deferred = DEFERRED_CLASS.bind(py).call0()?;
let deferred_callback = deferred.getattr("callback")?.unbind();
let deferred_errback = deferred.getattr("errback")?.unbind();
RUNTIME.spawn(async move {
// TODO: Is it safe to assert unwind safety here? I think so, as we
// don't use anything that could be tainted by the panic afterwards.
// Note that `.spawn(..)` asserts unwind safety on the future too.
let res = AssertUnwindSafe(fut).catch_unwind().await;
Python::with_gil(move |py| {
// Flatten the panic into standard python error
let res = match res {
Ok(r) => r,
Err(panic_err) => {
let panic_message = get_panic_message(&panic_err);
Err(PyException::new_err(
PyString::new(py, panic_message).unbind(),
))
}
};
// Send the result to the deferred, via `.callback(..)` or `.errback(..)`
match res {
Ok(obj) => {
TWISTED_REACTOR
.call_method(py, "callFromThread", (deferred_callback, obj), None)
.expect("callFromThread should not fail"); // There's nothing we can really do with errors here
}
Err(err) => {
TWISTED_REACTOR
.call_method(py, "callFromThread", (deferred_errback, err), None)
.expect("callFromThread should not fail"); // There's nothing we can really do with errors here
}
}
});
});
Ok(deferred)
}
/// Try and get the panic message out of the panic
fn get_panic_message<'a>(panic_err: &'a (dyn std::any::Any + Send + 'static)) -> &'a str {
// Apparently this is how you extract the panic message from a panic
if let Some(str_slice) = panic_err.downcast_ref::<&str>() {
str_slice
} else if let Some(string) = panic_err.downcast_ref::<String>() {
string
} else {
"unknown error"
}
}

View File

@@ -8,6 +8,7 @@ pub mod acl;
pub mod errors;
pub mod events;
pub mod http;
pub mod http_client;
pub mod identifier;
pub mod matrix_const;
pub mod push;
@@ -50,6 +51,7 @@ fn synapse_rust(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
acl::register_module(py, m)?;
push::register_module(py, m)?;
events::register_module(py, m)?;
http_client::register_module(py, m)?;
rendezvous::register_module(py, m)?;
Ok(())