|
|
|
@@ -29,10 +29,11 @@ from zope.interface import implementer
|
|
|
|
|
|
|
|
|
|
from twisted.internet import defer, protocol
|
|
|
|
|
from twisted.internet.error import DNSLookupError
|
|
|
|
|
from twisted.internet.interfaces import IReactorPluggableNameResolver
|
|
|
|
|
from twisted.internet.interfaces import IReactorPluggableNameResolver, IReactorTime
|
|
|
|
|
from twisted.internet.task import _EPSILON, Cooperator
|
|
|
|
|
from twisted.web._newclient import ResponseDone
|
|
|
|
|
from twisted.web.http_headers import Headers
|
|
|
|
|
from twisted.web.iweb import IResponse
|
|
|
|
|
|
|
|
|
|
import synapse.metrics
|
|
|
|
|
import synapse.util.retryutils
|
|
|
|
@@ -74,7 +75,7 @@ MAXINT = sys.maxsize
|
|
|
|
|
_next_id = 1
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@attr.s
|
|
|
|
|
@attr.s(frozen=True)
|
|
|
|
|
class MatrixFederationRequest(object):
|
|
|
|
|
method = attr.ib()
|
|
|
|
|
"""HTTP method
|
|
|
|
@@ -110,26 +111,52 @@ class MatrixFederationRequest(object):
|
|
|
|
|
:type: str|None
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
uri = attr.ib(init=False, type=bytes)
|
|
|
|
|
"""The URI of this request
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
def __attrs_post_init__(self):
|
|
|
|
|
global _next_id
|
|
|
|
|
self.txn_id = "%s-O-%s" % (self.method, _next_id)
|
|
|
|
|
txn_id = "%s-O-%s" % (self.method, _next_id)
|
|
|
|
|
_next_id = (_next_id + 1) % (MAXINT - 1)
|
|
|
|
|
|
|
|
|
|
object.__setattr__(self, "txn_id", txn_id)
|
|
|
|
|
|
|
|
|
|
destination_bytes = self.destination.encode("ascii")
|
|
|
|
|
path_bytes = self.path.encode("ascii")
|
|
|
|
|
if self.query:
|
|
|
|
|
query_bytes = encode_query_args(self.query)
|
|
|
|
|
else:
|
|
|
|
|
query_bytes = b""
|
|
|
|
|
|
|
|
|
|
# The object is frozen so we can pre-compute this.
|
|
|
|
|
uri = urllib.parse.urlunparse(
|
|
|
|
|
(b"matrix", destination_bytes, path_bytes, None, query_bytes, b"")
|
|
|
|
|
)
|
|
|
|
|
object.__setattr__(self, "uri", uri)
|
|
|
|
|
|
|
|
|
|
def get_json(self):
|
|
|
|
|
if self.json_callback:
|
|
|
|
|
return self.json_callback()
|
|
|
|
|
return self.json
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def _handle_json_response(reactor, timeout_sec, request, response):
|
|
|
|
|
async def _handle_json_response(
|
|
|
|
|
reactor: IReactorTime,
|
|
|
|
|
timeout_sec: float,
|
|
|
|
|
request: MatrixFederationRequest,
|
|
|
|
|
response: IResponse,
|
|
|
|
|
start_ms: int,
|
|
|
|
|
):
|
|
|
|
|
"""
|
|
|
|
|
Reads the JSON body of a response, with a timeout
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
reactor (IReactor): twisted reactor, for the timeout
|
|
|
|
|
timeout_sec (float): number of seconds to wait for response to complete
|
|
|
|
|
request (MatrixFederationRequest): the request that triggered the response
|
|
|
|
|
response (IResponse): response to the request
|
|
|
|
|
reactor: twisted reactor, for the timeout
|
|
|
|
|
timeout_sec: number of seconds to wait for response to complete
|
|
|
|
|
request: the request that triggered the response
|
|
|
|
|
response: response to the request
|
|
|
|
|
start_ms: Timestamp when request was made
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
dict: parsed JSON response
|
|
|
|
@@ -143,23 +170,35 @@ async def _handle_json_response(reactor, timeout_sec, request, response):
|
|
|
|
|
body = await make_deferred_yieldable(d)
|
|
|
|
|
except TimeoutError as e:
|
|
|
|
|
logger.warning(
|
|
|
|
|
"{%s} [%s] Timed out reading response", request.txn_id, request.destination,
|
|
|
|
|
"{%s} [%s] Timed out reading response - %s %s",
|
|
|
|
|
request.txn_id,
|
|
|
|
|
request.destination,
|
|
|
|
|
request.method,
|
|
|
|
|
request.uri.decode("ascii"),
|
|
|
|
|
)
|
|
|
|
|
raise RequestSendFailed(e, can_retry=True) from e
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.warning(
|
|
|
|
|
"{%s} [%s] Error reading response: %s",
|
|
|
|
|
"{%s} [%s] Error reading response %s %s: %s",
|
|
|
|
|
request.txn_id,
|
|
|
|
|
request.destination,
|
|
|
|
|
request.method,
|
|
|
|
|
request.uri.decode("ascii"),
|
|
|
|
|
e,
|
|
|
|
|
)
|
|
|
|
|
raise
|
|
|
|
|
|
|
|
|
|
time_taken_secs = reactor.seconds() - start_ms / 1000
|
|
|
|
|
|
|
|
|
|
logger.info(
|
|
|
|
|
"{%s} [%s] Completed: %d %s",
|
|
|
|
|
"{%s} [%s] Completed request: %d %s in %.2f secs - %s %s",
|
|
|
|
|
request.txn_id,
|
|
|
|
|
request.destination,
|
|
|
|
|
response.code,
|
|
|
|
|
response.phrase.decode("ascii", errors="replace"),
|
|
|
|
|
time_taken_secs,
|
|
|
|
|
request.method,
|
|
|
|
|
request.uri.decode("ascii"),
|
|
|
|
|
)
|
|
|
|
|
return body
|
|
|
|
|
|
|
|
|
@@ -261,7 +300,9 @@ class MatrixFederationHttpClient(object):
|
|
|
|
|
# 'M_UNRECOGNIZED' which some endpoints can return when omitting a
|
|
|
|
|
# trailing slash on Synapse <= v0.99.3.
|
|
|
|
|
logger.info("Retrying request with trailing slash")
|
|
|
|
|
request.path += "/"
|
|
|
|
|
|
|
|
|
|
# Request is frozen so we create a new instance
|
|
|
|
|
request = attr.evolve(request, path=request.path + "/")
|
|
|
|
|
|
|
|
|
|
response = await self._send_request(request, **send_request_args)
|
|
|
|
|
|
|
|
|
@@ -373,9 +414,7 @@ class MatrixFederationHttpClient(object):
|
|
|
|
|
else:
|
|
|
|
|
retries_left = MAX_SHORT_RETRIES
|
|
|
|
|
|
|
|
|
|
url_bytes = urllib.parse.urlunparse(
|
|
|
|
|
(b"matrix", destination_bytes, path_bytes, None, query_bytes, b"")
|
|
|
|
|
)
|
|
|
|
|
url_bytes = request.uri
|
|
|
|
|
url_str = url_bytes.decode("ascii")
|
|
|
|
|
|
|
|
|
|
url_to_sign_bytes = urllib.parse.urlunparse(
|
|
|
|
@@ -402,7 +441,7 @@ class MatrixFederationHttpClient(object):
|
|
|
|
|
|
|
|
|
|
headers_dict[b"Authorization"] = auth_headers
|
|
|
|
|
|
|
|
|
|
logger.info(
|
|
|
|
|
logger.debug(
|
|
|
|
|
"{%s} [%s] Sending request: %s %s; timeout %fs",
|
|
|
|
|
request.txn_id,
|
|
|
|
|
request.destination,
|
|
|
|
@@ -436,7 +475,6 @@ class MatrixFederationHttpClient(object):
|
|
|
|
|
except DNSLookupError as e:
|
|
|
|
|
raise RequestSendFailed(e, can_retry=retry_on_dns_fail) from e
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.info("Failed to send request: %s", e)
|
|
|
|
|
raise RequestSendFailed(e, can_retry=True) from e
|
|
|
|
|
|
|
|
|
|
incoming_responses_counter.labels(
|
|
|
|
@@ -496,7 +534,7 @@ class MatrixFederationHttpClient(object):
|
|
|
|
|
|
|
|
|
|
break
|
|
|
|
|
except RequestSendFailed as e:
|
|
|
|
|
logger.warning(
|
|
|
|
|
logger.info(
|
|
|
|
|
"{%s} [%s] Request failed: %s %s: %s",
|
|
|
|
|
request.txn_id,
|
|
|
|
|
request.destination,
|
|
|
|
@@ -654,6 +692,8 @@ class MatrixFederationHttpClient(object):
|
|
|
|
|
json=data,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
start_ms = self.clock.time_msec()
|
|
|
|
|
|
|
|
|
|
response = await self._send_request_with_optional_trailing_slash(
|
|
|
|
|
request,
|
|
|
|
|
try_trailing_slash_on_400,
|
|
|
|
@@ -664,7 +704,7 @@ class MatrixFederationHttpClient(object):
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
body = await _handle_json_response(
|
|
|
|
|
self.reactor, self.default_timeout, request, response
|
|
|
|
|
self.reactor, self.default_timeout, request, response, start_ms
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
return body
|
|
|
|
@@ -720,6 +760,8 @@ class MatrixFederationHttpClient(object):
|
|
|
|
|
method="POST", destination=destination, path=path, query=args, json=data
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
start_ms = self.clock.time_msec()
|
|
|
|
|
|
|
|
|
|
response = await self._send_request(
|
|
|
|
|
request,
|
|
|
|
|
long_retries=long_retries,
|
|
|
|
@@ -733,7 +775,7 @@ class MatrixFederationHttpClient(object):
|
|
|
|
|
_sec_timeout = self.default_timeout
|
|
|
|
|
|
|
|
|
|
body = await _handle_json_response(
|
|
|
|
|
self.reactor, _sec_timeout, request, response
|
|
|
|
|
self.reactor, _sec_timeout, request, response, start_ms,
|
|
|
|
|
)
|
|
|
|
|
return body
|
|
|
|
|
|
|
|
|
@@ -786,6 +828,8 @@ class MatrixFederationHttpClient(object):
|
|
|
|
|
method="GET", destination=destination, path=path, query=args
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
start_ms = self.clock.time_msec()
|
|
|
|
|
|
|
|
|
|
response = await self._send_request_with_optional_trailing_slash(
|
|
|
|
|
request,
|
|
|
|
|
try_trailing_slash_on_400,
|
|
|
|
@@ -796,7 +840,7 @@ class MatrixFederationHttpClient(object):
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
body = await _handle_json_response(
|
|
|
|
|
self.reactor, self.default_timeout, request, response
|
|
|
|
|
self.reactor, self.default_timeout, request, response, start_ms
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
return body
|
|
|
|
@@ -846,6 +890,8 @@ class MatrixFederationHttpClient(object):
|
|
|
|
|
method="DELETE", destination=destination, path=path, query=args
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
start_ms = self.clock.time_msec()
|
|
|
|
|
|
|
|
|
|
response = await self._send_request(
|
|
|
|
|
request,
|
|
|
|
|
long_retries=long_retries,
|
|
|
|
@@ -854,7 +900,7 @@ class MatrixFederationHttpClient(object):
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
body = await _handle_json_response(
|
|
|
|
|
self.reactor, self.default_timeout, request, response
|
|
|
|
|
self.reactor, self.default_timeout, request, response, start_ms
|
|
|
|
|
)
|
|
|
|
|
return body
|
|
|
|
|
|
|
|
|
@@ -914,12 +960,14 @@ class MatrixFederationHttpClient(object):
|
|
|
|
|
)
|
|
|
|
|
raise
|
|
|
|
|
logger.info(
|
|
|
|
|
"{%s} [%s] Completed: %d %s [%d bytes]",
|
|
|
|
|
"{%s} [%s] Completed: %d %s [%d bytes] %s %s",
|
|
|
|
|
request.txn_id,
|
|
|
|
|
request.destination,
|
|
|
|
|
response.code,
|
|
|
|
|
response.phrase.decode("ascii", errors="replace"),
|
|
|
|
|
length,
|
|
|
|
|
request.method,
|
|
|
|
|
request.uri.decode("ascii"),
|
|
|
|
|
)
|
|
|
|
|
return (length, headers)
|
|
|
|
|
|
|
|
|
|