From 160d9788c09e850b03115b78967f531fa695cd9f Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 3 Mar 2026 08:24:25 -0600 Subject: [PATCH] Simplify Rust HTTP client response streaming and limiting (#19510) *As suggested by @sandhose in https://github.com/element-hq/synapse/pull/19498#discussion_r2865607737,* Simplify Rust HTTP client response streaming and limiting ### Dev notes Synapse's Rust HTTP client was introduced in https://github.com/element-hq/synapse/pull/18357 ### Pull Request Checklist * [x] Pull request is based on the develop branch * [x] Pull request includes a [changelog file](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#changelog). The entry should: - Be a short description of your change which makes sense to users. "Fixed a bug that prevented receiving messages from other servers." instead of "Moved X method from `EventStore` to `EventWorkerStore`.". - Use markdown where necessary, mostly for `code blocks`. - End with either a period (.) or an exclamation mark (!). - Start with a capital letter. - Feel free to credit yourself, by adding a sentence "Contributed by @github_username." or "Contributed by [Your Name]." to the end of the entry. * [x] [Code style](https://element-hq.github.io/synapse/latest/code_style.html) is correct (run the [linters](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#run-the-linters)) --- Cargo.lock | 1 + changelog.d/19510.misc | 1 + rust/Cargo.toml | 3 ++ rust/src/errors.rs | 2 +- rust/src/http_client.rs | 75 ++++++++------------------ tests/synapse_rust/test_http_client.py | 18 +++++++ 6 files changed, 45 insertions(+), 55 deletions(-) create mode 100644 changelog.d/19510.misc diff --git a/Cargo.lock b/Cargo.lock index c43ee29c76..340114f801 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -811,6 +811,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab53c047fcd1a1d2a8820fe84f05d6be69e9526be40cb03b73f86b6b03e6d87d" dependencies = [ "anyhow", + "bytes", "indoc", "libc", "memoffset", diff --git a/changelog.d/19510.misc b/changelog.d/19510.misc new file mode 100644 index 0000000000..cafc26601f --- /dev/null +++ b/changelog.d/19510.misc @@ -0,0 +1 @@ +Simplify Rust HTTP client response streaming and limiting. diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 350701d327..8199a4e02b 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -35,6 +35,9 @@ pyo3 = { version = "0.27.2", features = [ "anyhow", "abi3", "abi3-py310", + # So we can pass `bytes::Bytes` directly back to Python efficiently, + # https://docs.rs/pyo3/latest/pyo3/bytes/index.html + "bytes", ] } pyo3-log = "0.13.1" pythonize = "0.27.0" diff --git a/rust/src/errors.rs b/rust/src/errors.rs index 149019ff4b..012f9a7990 100644 --- a/rust/src/errors.rs +++ b/rust/src/errors.rs @@ -62,7 +62,7 @@ impl NotFoundError { import_exception!(synapse.api.errors, HttpResponseException); impl HttpResponseException { - pub fn new(status: StatusCode, bytes: Vec) -> pyo3::PyErr { + pub fn new(status: StatusCode, bytes: bytes::Bytes) -> pyo3::PyErr { HttpResponseException::new_err(( status.as_u16(), status.canonical_reason().unwrap_or_default(), diff --git a/rust/src/http_client.rs b/rust/src/http_client.rs index dd37a10426..398ba9041f 100644 --- a/rust/src/http_client.rs +++ b/rust/src/http_client.rs @@ -15,8 +15,7 @@ use std::{collections::HashMap, future::Future, sync::OnceLock}; use anyhow::Context; -use futures::TryStreamExt; -use headers::HeaderMapExt; +use http_body_util::BodyExt; use once_cell::sync::OnceCell; use pyo3::{create_exception, exceptions::PyException, prelude::*}; use reqwest::RequestBuilder; @@ -236,62 +235,30 @@ impl HttpClient { let status = response.status(); - // Find the expected `Content-Length` so we can pre-allocate the buffer - // necessary to read the response. It's expected that not every request will - // have a `Content-Length` header. - // - // `response.content_length()` does exist but the "value does not directly - // represents the value of the `Content-Length` header, but rather the size - // of the response’s body" - // (https://docs.rs/reqwest/latest/reqwest/struct.Response.html#method.content_length) - // and we want to avoid reading the entire body at this point because we - // purposely stream it below until the `response_limit`. - let content_length = { - let content_length = response - .headers() - .typed_get::() - // We need a `usize` for the `Vec::with_capacity(...)` usage below - .and_then(|content_length| content_length.0.try_into().ok()); - - // Sanity check that the request isn't too large from the information - // they told us (may be inaccurate so we also check below as we actually - // read the bytes) - if let Some(content_length_bytes) = content_length { - if content_length_bytes > response_limit { - Err(anyhow::anyhow!( - "Response size (defined by `Content-Length`) too large" - ))?; - } - } - - content_length - }; - - // Stream the response to avoid allocating a giant object on the server - // above our expected `response_limit`. - let mut stream = response.bytes_stream(); - // Pre-allocate the buffer based on the expected `Content-Length` - let mut buffer = Vec::with_capacity( - content_length - // Default to pre-allocating nothing when the request doesn't have a - // `Content-Length` header - .unwrap_or(0), - ); - while let Some(chunk) = stream.try_next().await.context("reading body")? { - if buffer.len() + chunk.len() > response_limit { - Err(anyhow::anyhow!("Response size too large"))?; - } - - buffer.extend_from_slice(&chunk); - } + // A light-weight way to read the response up until the `response_limit`. We + // want to avoid allocating a giant response object on the server above our + // expected `response_limit` to avoid out-of-memory DOS problems. + let body = reqwest::Body::from(response); + let limited_body = http_body_util::Limited::new(body, response_limit); + let collected = limited_body + .collect() + .await + .map_err(anyhow::Error::from_boxed) + .with_context(|| { + format!( + "Response body exceeded response limit ({} bytes)", + response_limit + ) + })?; + let bytes: bytes::Bytes = collected.to_bytes(); if !status.is_success() { - return Err(HttpResponseException::new(status, buffer)); + return Err(HttpResponseException::new(status, bytes)); } - let r = Python::attach(|py| buffer.into_pyobject(py).map(|o| o.unbind()))?; - - Ok(r) + // Because of the `pyo3` `bytes` feature, we can pass this back to Python + // land efficiently + Ok(bytes) }) } } diff --git a/tests/synapse_rust/test_http_client.py b/tests/synapse_rust/test_http_client.py index 032eab77e8..56fab3a0e1 100644 --- a/tests/synapse_rust/test_http_client.py +++ b/tests/synapse_rust/test_http_client.py @@ -171,6 +171,24 @@ class HttpClientTestCase(HomeserverTestCase): self.get_success(self.till_deferred_has_result(do_request())) self.assertEqual(self.server.calls, 1) + def test_request_response_limit_exceeded(self) -> None: + """ + Test to make sure we handle the response limit being exceeded + """ + + async def do_request() -> None: + await self._rust_http_client.get( + url=self.server.endpoint, + # Small limit so we hit the limit + response_limit=1, + ) + + self.assertFailure( + self.till_deferred_has_result(do_request()), + RuntimeError, + ) + self.assertEqual(self.server.calls, 1) + async def test_logging_context(self) -> None: """ Test to make sure the `LoggingContext` (logcontext) is handled correctly