1
0
Files
synapse/synapse/http/proxy.py
Erik Johnston 0779587f9f Lift pausing on ratelimited requests to http layer (#18595)
When a request gets ratelimited we (optionally) wait ~500ms before
returning, to mitigate clients that like to tight-loop on request
failures. However, this is currently implemented by pausing request
processing when we check for ratelimits, which might be deep within
request processing, and e.g. while locks are held. Instead, let's hoist
the pause to the very top of the HTTP handler.

Hopefully, this mitigates the issue where a user sending lots of events
to a single room can see their requests time out due to the combination
of the linearizer and the pausing of the request. Instead, they should
see the requests 429 after ~500ms.

The first commit is a refactor to pass the `Clock` to `AsyncResource`,
the second commit is the behavioural change.
2025-06-25 14:32:55 +00:00

291 lines
10 KiB
Python

#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright 2023 The Matrix.org Foundation C.I.C.
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Originally licensed under the Apache License, Version 2.0:
# <http://www.apache.org/licenses/LICENSE-2.0>.
#
# [This file includes modifications made by New Vector Limited]
#
#
import hmac
import json
import logging
import urllib.parse
from typing import TYPE_CHECKING, Any, Optional, Set, Tuple, cast

from twisted.internet import protocol
from twisted.internet.interfaces import ITCPTransport
from twisted.internet.protocol import connectionDone
from twisted.python import failure
from twisted.python.failure import Failure
from twisted.web.client import ResponseDone
from twisted.web.http_headers import Headers
from twisted.web.iweb import IResponse
from twisted.web.resource import IResource
from twisted.web.server import Request, Site

from synapse.api.errors import Codes, InvalidProxyCredentialsError
from synapse.http import QuieterFileBodyProducer
from synapse.http.server import _AsyncResource
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.types import ISynapseReactor
from synapse.util.async_helpers import timeout_deferred

if TYPE_CHECKING:
    from synapse.http.site import SynapseRequest
    from synapse.server import HomeServer
# Logger for this module.
logger = logging.getLogger(__name__)

# The "hop-by-hop" headers (in contrast to "end-to-end" headers), as defined by
# RFC2616 section 13.5.1 and referenced in RFC9110 section 7.6.1. They are only
# meant for the immediate recipient of a message and must not be forwarded on.
#
# Every entry is stored lowercased so that membership can be tested against a
# lowercased header name.
HOP_BY_HOP_HEADERS_LOWERCASE = {
    "connection",
    "keep-alive",
    "proxy-authenticate",
    "proxy-authorization",
    "te",
    "trailers",
    "transfer-encoding",
    "upgrade",
}
# Guard against someone adding a mixed-case entry to the set above.
assert not any(
    header != header.lower() for header in HOP_BY_HOP_HEADERS_LOWERCASE
)
def parse_connection_header_value(
    connection_header_value: Optional[bytes],
) -> Set[str]:
    """
    Parse the `Connection` header to determine which headers should not be copied
    over from the remote response.

    As defined by RFC2616 section 14.10 and RFC9110 section 7.6.1

    Example: `Connection: close, X-Foo, X-Bar` will return
    `{"close", "x-foo", "x-bar"}`

    Even though "close" is a special directive, let's just treat it as just another
    header for simplicity. If people want to check for this directive, they can simply
    check for `"close" in headers`.

    Args:
        connection_header_value: The value of the `Connection` header, or `None`
            if the header is absent.

    Returns:
        The set of header names that should not be copied over from the remote
        response. The names are lowercased and stripped of surrounding whitespace;
        empty list members (e.g. from `close,, x-foo`) are ignored.

    Raises:
        UnicodeDecodeError: if the header value contains non-ASCII bytes.
    """
    if not connection_header_value:
        return set()

    connection_options = (
        raw_option.decode("ascii").strip().lower()
        for raw_option in connection_header_value.split(b",")
    )
    # Drop empty members so that stray commas don't put `""` in the result.
    return {option for option in connection_options if option}
class ProxyResource(_AsyncResource):
    """
    A stub resource that proxies any requests with a `matrix-federation://` scheme
    through the given `federation_agent` to the remote homeserver and ferries back the
    info.
    """

    isLeaf = True

    def __init__(self, reactor: ISynapseReactor, hs: "HomeServer"):
        """
        Args:
            reactor: The reactor used to time out outgoing proxied requests.
            hs: The homeserver; supplies the clock, the federation HTTP agent and
                the shared secret that callers must present.
        """
        super().__init__(hs.get_clock(), True)
        self.reactor = reactor
        self.agent = hs.get_federation_http_client().agent

        self._proxy_authorization_secret = hs.config.worker.worker_replication_secret

    def _check_auth(self, request: Request) -> None:
        """
        Check that the request carries exactly one `Proxy-Authorization: Bearer
        <secret>` header whose secret matches the configured one.

        Args:
            request: The incoming request to authenticate.

        Raises:
            InvalidProxyCredentialsError: if the header is missing, duplicated,
                malformed, or carries the wrong secret.
        """
        # The `matrix-federation://` proxy functionality can only be used with auth.
        # Protect homeserver admins forgetting to configure a secret.
        assert self._proxy_authorization_secret is not None

        # Get the authorization header.
        auth_headers = request.requestHeaders.getRawHeaders(b"Proxy-Authorization")

        if not auth_headers:
            raise InvalidProxyCredentialsError(
                "Missing Proxy-Authorization header.", Codes.MISSING_TOKEN
            )
        if len(auth_headers) > 1:
            raise InvalidProxyCredentialsError(
                "Too many Proxy-Authorization headers.", Codes.UNAUTHORIZED
            )

        parts = auth_headers[0].split(b" ")
        if parts[0] == b"Bearer" and len(parts) == 2:
            # Compare as bytes and in constant time: this avoids leaking how much
            # of the secret matched via timing, and means a token containing
            # non-ASCII bytes is rejected as invalid credentials rather than
            # blowing up while being decoded.
            expected_secret = self._proxy_authorization_secret.encode("utf-8")
            if hmac.compare_digest(expected_secret, parts[1]):
                # Success!
                return

        raise InvalidProxyCredentialsError(
            "Invalid Proxy-Authorization header.", Codes.UNAUTHORIZED
        )

    async def _async_render(self, request: "SynapseRequest") -> Tuple[int, Any]:
        """
        Authenticate the request and forward it to the remote homeserver.

        Only the `User-Agent`, `Authorization` and `Content-Type` request headers
        are forwarded.

        Args:
            request: The incoming `matrix-federation://` request.

        Returns:
            A tuple of the remote response code and the remote response object.

        Raises:
            InvalidProxyCredentialsError: if the request is not authenticated.
        """
        uri = urllib.parse.urlparse(request.uri)
        assert uri.scheme == b"matrix-federation"

        # Check the authorization headers before handling the request.
        self._check_auth(request)

        headers = Headers()
        for header_name in (b"User-Agent", b"Authorization", b"Content-Type"):
            header_value = request.getHeader(header_name)
            if header_value:
                headers.addRawHeader(header_name, header_value)

        request_deferred = run_in_background(
            self.agent.request,
            request.method,
            request.uri,
            headers=headers,
            bodyProducer=QuieterFileBodyProducer(request.content),
        )
        request_deferred = timeout_deferred(
            request_deferred,
            # This should be set longer than the timeout in `MatrixFederationHttpClient`
            # so that it has enough time to complete and pass us the data before we give
            # up.
            timeout=90,
            reactor=self.reactor,
        )

        response = await make_deferred_yieldable(request_deferred)

        return response.code, response

    def _send_response(
        self,
        request: "SynapseRequest",
        code: int,
        response_object: Any,
    ) -> None:
        """
        Copy the remote response (status code, end-to-end headers and body) onto
        the local request.

        Args:
            request: The local request to respond to.
            code: The response code to send.
            response_object: The remote response, as returned by `_async_render`.
        """
        response = cast(IResponse, response_object)
        response_headers = cast(Headers, response.headers)

        request.setResponseCode(code)

        # The `Connection` header also defines which headers should not be copied over.
        # The header may be split over several lines; joining the values with a comma
        # gives the equivalent single list, so that options on every line are honoured
        # rather than only those on the first.
        connection_header_values = response_headers.getRawHeaders(b"connection")
        extra_headers_to_remove_lowercase = parse_connection_header_value(
            b",".join(connection_header_values) if connection_header_values else None
        )

        # Copy headers.
        for k, v in response_headers.getAllRawHeaders():
            # Do not copy over any hop-by-hop headers. These are meant to only be
            # consumed by the immediate recipient and not be forwarded on.
            header_key_lowercase = k.decode("ascii").lower()
            if (
                header_key_lowercase in HOP_BY_HOP_HEADERS_LOWERCASE
                or header_key_lowercase in extra_headers_to_remove_lowercase
            ):
                continue

            request.responseHeaders.setRawHeaders(k, v)

        response.deliverBody(_ProxyResponseBody(request))

    def _send_error_response(
        self,
        f: failure.Failure,
        request: "SynapseRequest",
    ) -> None:
        """
        Send a JSON error body for a failed proxy attempt.

        Credential failures are reported with their own status code and errcode;
        anything else is reported as a 502 with `Codes.UNKNOWN`.

        Args:
            f: The failure that occurred while handling the request.
            request: The local request to respond to.
        """
        if request._disconnected:
            # Twisted refuses `finish()` on a request whose connection has already
            # been lost, and there is nobody left to read the error anyway.
            logger.warning(
                "Not sending error response to request %s, already disconnected.",
                request,
            )
            return

        if isinstance(f.value, InvalidProxyCredentialsError):
            error_response_code = f.value.code
            error_response_json = {"errcode": f.value.errcode, "err": f.value.msg}
        else:
            error_response_code = 502
            error_response_json = {
                "errcode": Codes.UNKNOWN,
                "err": "ProxyResource: Error when proxying request: %s %s -> %s"
                % (
                    request.method.decode("ascii"),
                    request.uri.decode("ascii"),
                    f,
                ),
            }

        request.setResponseCode(error_response_code)
        request.setHeader(b"Content-Type", b"application/json")
        request.write((json.dumps(error_response_json)).encode())
        request.finish()
class _ProxyResponseBody(protocol.Protocol):
    """
    A protocol that proxies the given remote response data back out to the given local
    request.
    """

    # The transport carrying the remote response; `None` until one is attached.
    transport: Optional[ITCPTransport] = None

    def __init__(self, request: "SynapseRequest") -> None:
        """
        Args:
            request: The local request that the remote response body is written to.
        """
        self._request = request

    def dataReceived(self, data: bytes) -> None:
        """
        Forward a chunk of the remote response body to the local request.

        Args:
            data: The chunk of body data received from the remote.
        """
        # Avoid sending response data to the local request that already disconnected
        if self._request._disconnected and self.transport is not None:
            # Close the connection (forcefully) since all the data will get
            # discarded anyway.
            self.transport.abortConnection()
            return

        self._request.write(data)

    def connectionLost(self, reason: Failure = connectionDone) -> None:
        """
        Finish or abort the local request once the remote response has ended.

        Args:
            reason: Why the remote connection ended; `ResponseDone` means the body
                was delivered in full.
        """
        # If the local request is already finished (successfully or failed), or the
        # client has gone away, don't worry about sending anything back. In
        # particular, Twisted refuses `finish()` on a request whose connection has
        # already been lost.
        if self._request.finished or self._request._disconnected:
            return

        if reason.check(ResponseDone):
            self._request.finish()
        elif self._request.channel:
            # Abort the underlying request since our remote request also failed.
            self._request.channel.forceAbortClient()
class ProxySite(Site):
    """
    A `Site` that routes requests with a `matrix-federation://` scheme to a
    `ProxyResource` (which forwards them via the federation agent). All other
    requests are handled exactly as a normal `Site` would handle them.
    """

    def __init__(
        self,
        resource: IResource,
        reactor: ISynapseReactor,
        hs: "HomeServer",
    ):
        """
        Args:
            resource: The root resource serving non-proxied requests.
            reactor: The reactor handed to both the `Site` and the proxy resource.
            hs: The homeserver, handed to the proxy resource.
        """
        super().__init__(resource, reactor=reactor)

        self._proxy_resource = ProxyResource(reactor, hs=hs)

    def getResourceFor(self, request: "SynapseRequest") -> IResource:
        """
        Pick the resource that should handle `request`.

        Args:
            request: The incoming request.

        Returns:
            The proxy resource for `matrix-federation://` URIs, otherwise whatever
            the parent `Site` resolves.
        """
        scheme = urllib.parse.urlparse(request.uri).scheme
        if scheme != b"matrix-federation":
            return super().getResourceFor(request)

        return self._proxy_resource