1
0

Merge branch 'erikj/stream_deserealize' into erikj/test_send

This commit is contained in:
Erik Johnston
2021-04-30 14:21:58 +01:00
3 changed files with 29 additions and 11 deletions
+8 -3
View File
@@ -33,6 +33,7 @@ from typing import (
)
import attr
import ijson
from prometheus_client import Counter
from twisted.internet import defer
@@ -669,18 +670,20 @@ class FederationClient(FederationBase):
logger.debug("Got content: %s", content)
logger.info("send_join content: %d", len(content))
# logger.info("send_join content: %d", len(content))
content.seek(0)
state = [
event_from_pdu_json(p, room_version, outlier=True)
for p in content.get("state", [])
for p in ijson.items(content, "state.item")
]
logger.info("Parsed auth chain: %d", len(state))
content.seek(0)
auth_chain = [
event_from_pdu_json(p, room_version, outlier=True)
for p in content.get("auth_chain", [])
for p in ijson.items(content, "auth_chain.item")
]
logger.info("Parsed auth chain: %d", len(auth_chain))
@@ -779,6 +782,8 @@ class FederationClient(FederationBase):
if not self._is_unknown_endpoint(e):
raise
raise NotImplementedError()
logger.debug("Couldn't send_join with the v2 API, falling back to the v1 API")
resp = await self.transport_layer.send_join_v1(
+8 -2
View File
@@ -244,7 +244,10 @@ class TransportLayerClient:
path = _create_v1_path("/send_join/%s/%s", room_id, event_id)
response = await self.client.put_json(
destination=destination, path=path, data=content
destination=destination,
path=path,
data=content,
return_string_io=True,
)
return response
@@ -254,7 +257,10 @@ class TransportLayerClient:
path = _create_v2_path("/send_join/%s/%s", room_id, event_id)
response = await self.client.put_json(
destination=destination, path=path, data=content
destination=destination,
path=path,
data=content,
return_string_io=True,
)
return response
+13 -6
View File
@@ -154,6 +154,7 @@ async def _handle_json_response(
request: MatrixFederationRequest,
response: IResponse,
start_ms: int,
return_string_io=False,
) -> JsonDict:
"""
Reads the JSON body of a response, with a timeout
@@ -175,12 +176,12 @@ async def _handle_json_response(
d = read_body_with_max_size(response, BinaryIOWrapper(buf), MAX_RESPONSE_SIZE)
d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor)
def parse(_len: int):
return json_decoder.decode(buf.getvalue())
await make_deferred_yieldable(d)
d.addCallback(parse)
body = await make_deferred_yieldable(d)
if return_string_io:
body = buf
else:
body = json_decoder.decode(buf.getvalue())
except BodyExceededMaxSize as e:
# The response was too big.
logger.warning(
@@ -684,6 +685,7 @@ class MatrixFederationHttpClient:
ignore_backoff: bool = False,
backoff_on_404: bool = False,
try_trailing_slash_on_400: bool = False,
return_string_io=False,
) -> Union[JsonDict, list]:
"""Sends the specified json data using PUT
@@ -758,7 +760,12 @@ class MatrixFederationHttpClient:
_sec_timeout = self.default_timeout
body = await _handle_json_response(
self.reactor, _sec_timeout, request, response, start_ms
self.reactor,
_sec_timeout,
request,
response,
start_ms,
return_string_io=return_string_io,
)
return body