Compare commits

35 Commits: rav/device...clokep/lm

| Author | SHA1 | Date |
|---|---|---|
| | 7c70e00393 | |
| | 6522bbb4b3 | |
| | 02867aeb25 | |
| | ed5b522085 | |
| | 8d5ab38584 | |
| | 73589e86ef | |
| | ebb76e6917 | |
| | 98ff8d6f30 | |
| | cbf930edf8 | |
| | 565b9f983d | |
| | 1e77a62c1d | |
| | c50cb6e391 | |
| | e0164d9a21 | |
| | 36cd55f6a0 | |
| | 98c864bd46 | |
| | b04530ebd3 | |
| | ab75a7d2ac | |
| | 3fc09bb13a | |
| | dcd3d5cdc6 | |
| | 547a7076b4 | |
| | c74a073389 | |
| | b90edc168e | |
| | 48193d339f | |
| | ae26625694 | |
| | a2d697b745 | |
| | 23cd415b9e | |
| | b38ba4a8b1 | |
| | 2c7001679e | |
| | c02f115306 | |
| | ddd3d43049 | |
| | cf26b9f897 | |
| | 24647487a0 | |
| | b33bea983c | |
| | a93540b60d | |
| | 123b63a443 | |
@@ -30,12 +30,14 @@ class EventFormatVersions:
ROOM_V1_V2 = 1  # $id:server event id format: used for room v1 and v2
ROOM_V3 = 2  # MSC1659-style $hash event id format: used for room v3
ROOM_V4_PLUS = 3  # MSC1884-style $hash format: introduced for room v4
LINEARIZED = 4  # Delegated Linearized event

KNOWN_EVENT_FORMAT_VERSIONS = {
EventFormatVersions.ROOM_V1_V2,
EventFormatVersions.ROOM_V3,
EventFormatVersions.ROOM_V4_PLUS,
EventFormatVersions.LINEARIZED,
}

@@ -101,6 +103,7 @@ class RoomVersion:
# support the flag. Unknown flags are ignored by the evaluator, making conditions
# fail if used.
msc3931_push_features: Tuple[str, ...]  # values from PushRuleRoomFlag
linearized_matrix: bool

class RoomVersions:
@@ -122,6 +125,7 @@ class RoomVersions:
knock_restricted_join_rule=False,
enforce_int_power_levels=False,
msc3931_push_features=(),
linearized_matrix=False,
)
V2 = RoomVersion(
"2",
@@ -141,6 +145,7 @@ class RoomVersions:
knock_restricted_join_rule=False,
enforce_int_power_levels=False,
msc3931_push_features=(),
linearized_matrix=False,
)
V3 = RoomVersion(
"3",
@@ -160,6 +165,7 @@ class RoomVersions:
knock_restricted_join_rule=False,
enforce_int_power_levels=False,
msc3931_push_features=(),
linearized_matrix=False,
)
V4 = RoomVersion(
"4",
@@ -179,6 +185,7 @@ class RoomVersions:
knock_restricted_join_rule=False,
enforce_int_power_levels=False,
msc3931_push_features=(),
linearized_matrix=False,
)
V5 = RoomVersion(
"5",
@@ -198,6 +205,7 @@ class RoomVersions:
knock_restricted_join_rule=False,
enforce_int_power_levels=False,
msc3931_push_features=(),
linearized_matrix=False,
)
V6 = RoomVersion(
"6",
@@ -217,6 +225,7 @@ class RoomVersions:
knock_restricted_join_rule=False,
enforce_int_power_levels=False,
msc3931_push_features=(),
linearized_matrix=False,
)
V7 = RoomVersion(
"7",
@@ -236,6 +245,7 @@ class RoomVersions:
knock_restricted_join_rule=False,
enforce_int_power_levels=False,
msc3931_push_features=(),
linearized_matrix=False,
)
V8 = RoomVersion(
"8",
@@ -255,6 +265,7 @@ class RoomVersions:
knock_restricted_join_rule=False,
enforce_int_power_levels=False,
msc3931_push_features=(),
linearized_matrix=False,
)
V9 = RoomVersion(
"9",
@@ -274,6 +285,7 @@ class RoomVersions:
knock_restricted_join_rule=False,
enforce_int_power_levels=False,
msc3931_push_features=(),
linearized_matrix=False,
)
V10 = RoomVersion(
"10",
@@ -293,6 +305,7 @@ class RoomVersions:
knock_restricted_join_rule=True,
enforce_int_power_levels=True,
msc3931_push_features=(),
linearized_matrix=False,
)
MSC1767v10 = RoomVersion(
# MSC1767 (Extensible Events) based on room version "10"
@@ -313,6 +326,7 @@ class RoomVersions:
knock_restricted_join_rule=True,
enforce_int_power_levels=True,
msc3931_push_features=(PushRuleRoomFlag.EXTENSIBLE_EVENTS,),
linearized_matrix=False,
)
V11 = RoomVersion(
"11",
@@ -332,6 +346,32 @@ class RoomVersions:
knock_restricted_join_rule=True,
enforce_int_power_levels=True,
msc3931_push_features=(),
linearized_matrix=False,
)
# Based on room version 11:
#
# - Enable MSC2176, MSC2175, MSC3989, MSC2174, MSC1767, MSC3821.
# - Disable 'restricted' and 'knock_restricted' join rules.
# - Mark as linearized.
LINEARIZED = RoomVersion(
"org.matrix.i-d.ralston-mimi-linearized-matrix.02",
RoomDisposition.UNSTABLE,
EventFormatVersions.LINEARIZED,
StateResolutionVersions.V2,
enforce_key_validity=True,
special_case_aliases_auth=False,
strict_canonicaljson=True,
limit_notifications_power_levels=True,
implicit_room_creator=True,
updated_redaction_rules=True,
restricted_join_rule=True,
restricted_join_rule_fix=True,
knock_join_rule=True,
msc3389_relation_redactions=False,
knock_restricted_join_rule=True,
enforce_int_power_levels=True,
msc3931_push_features=(),
linearized_matrix=True,
)

@@ -349,6 +389,7 @@ KNOWN_ROOM_VERSIONS: Dict[str, RoomVersion] = {
RoomVersions.V9,
RoomVersions.V10,
RoomVersions.V11,
RoomVersions.LINEARIZED,
)
}
@@ -36,31 +36,38 @@ logger = logging.getLogger(__name__)
Hasher = Callable[[bytes], "hashlib._Hash"]

@trace
def check_event_content_hash(
event: EventBase, hash_algorithm: Hasher = hashlib.sha256
def _check_dict_hash(
event_id: str,
hash_log: str,
hashes: Any,
d: JsonDict,
hash_algorithm: Hasher,
lpdu_hash: bool,
) -> bool:
"""Check whether the hash for this PDU matches the contents"""
name, expected_hash = compute_content_hash(event.get_pdu_json(), hash_algorithm)
name, expected_hash = compute_content_hash(d, hash_algorithm, lpdu_hash)
logger.debug(
"Verifying content hash on %s (expecting: %s)",
event.event_id,
"Verifying %s hash on %s (expecting: %s)",
hash_log,
event_id,
encode_base64(expected_hash),
)

# some malformed events lack a 'hashes'. Protect against it being missing
# or a weird type by basically treating it the same as an unhashed event.
hashes = event.get("hashes")
# nb it might be a immutabledict or a dict
if not isinstance(hashes, collections.abc.Mapping):
raise SynapseError(
400, "Malformed 'hashes': %s" % (type(hashes),), Codes.UNAUTHORIZED
400,
"Malformed %s hashes: %s"
% (
hash_log,
type(hashes),
),
Codes.UNAUTHORIZED,
)

if name not in hashes:
raise SynapseError(
400,
"Algorithm %s not in hashes %s" % (name, list(hashes)),
"Algorithm %s not in %s hashes %s" % (name, hash_log, list(hashes)),
Codes.UNAUTHORIZED,
)
message_hash_base64 = hashes[name]
@@ -73,8 +80,45 @@ def check_event_content_hash(
return message_hash_bytes == expected_hash

@trace
def check_event_content_hash(
event: EventBase, hash_algorithm: Hasher = hashlib.sha256
) -> bool:
"""Check whether the hash for this PDU matches the contents"""

# some malformed events lack a 'hashes'. Protect against it being missing
# or a weird type by basically treating it the same as an unhashed event.
hashes = event.get("hashes")

if not _check_dict_hash(
event.event_id,
"content",
hashes,
event.get_pdu_json(),
hash_algorithm,
lpdu_hash=False,
):
return False

# Check the content hash of the LPDU, if this was sent via a hub.
if event.room_version.linearized_matrix and event.hub_server:
# hashes must be a dictionary to have passed _check_dict_hash above.
lpdu_hashes = hashes.get("lpdu")
return _check_dict_hash(
event.event_id,
"linearized content",
lpdu_hashes,
event.get_linearized_pdu_json(),
hash_algorithm,
lpdu_hash=True,
)

# Non-linearized matrix doesn't care about other checks.
return True

def compute_content_hash(
event_dict: Dict[str, Any], hash_algorithm: Hasher
event_dict: Dict[str, Any], hash_algorithm: Hasher, lpdu_hash: False
) -> Tuple[str, bytes]:
"""Compute the content hash of an event, which is the hash of the
unredacted event.
@@ -83,6 +127,7 @@ def compute_content_hash(
event_dict: The unredacted event as a dict
hash_algorithm: A hasher from `hashlib`, e.g. hashlib.sha256, to use
to hash the event
lpdu_hash: True if the LPDU hash is being calculated.

Returns:
A tuple of the name of hash and the hash as raw bytes.
@@ -91,7 +136,17 @@ def compute_content_hash(
event_dict.pop("age_ts", None)
event_dict.pop("unsigned", None)
event_dict.pop("signatures", None)
event_dict.pop("hashes", None)

hashes = event_dict.pop("hashes", {})

# If we are calculating the content hash of a PDU sent on behalf of another
# server, hashes only contains the lpdu hash.
#
# TODO(LM) This is wrong in that an incorrectly formatted event could have
# bad content hashes from this.
if not lpdu_hash and "lpdu" in hashes:
event_dict["hashes"] = {"lpdu": hashes["lpdu"]}

event_dict.pop("outlier", None)
event_dict.pop("destinations", None)

@@ -176,7 +231,10 @@ def add_hashes_and_signatures(
signing_key: The key to sign with
"""

name, digest = compute_content_hash(event_dict, hash_algorithm=hashlib.sha256)
# Synapse only ever generates PDUs, not LPDUs.
name, digest = compute_content_hash(
event_dict, hash_algorithm=hashlib.sha256, lpdu_hash=False
)

event_dict.setdefault("hashes", {})[name] = encode_base64(digest)
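As a rough illustration of the hashing rules in the hunks above, the following standalone sketch computes a content hash over an event dict, keeping only the `lpdu` entry of `hashes` when hashing the full PDU. It assumes the `canonicaljson` and `unpaddedbase64` packages are installed; the function name is illustrative and not part of the diff.

```python
import hashlib
from typing import Any, Dict, Tuple

from canonicaljson import encode_canonical_json
from unpaddedbase64 import encode_base64


def sketch_content_hash(
    event_dict: Dict[str, Any], lpdu_hash: bool = False
) -> Tuple[str, str]:
    """Minimal sketch of the content-hash flow described above (illustrative only)."""
    d = dict(event_dict)
    # Transient and signature-related fields never feed into the content hash.
    for key in ("age_ts", "unsigned", "signatures", "outlier", "destinations"):
        d.pop(key, None)

    hashes = d.pop("hashes", {})
    # When hashing the full PDU (not the LPDU), keep only the lpdu hash entry.
    if not lpdu_hash and "lpdu" in hashes:
        d["hashes"] = {"lpdu": hashes["lpdu"]}

    digest = hashlib.sha256(encode_canonical_json(d)).digest()
    return "sha256", encode_base64(digest)
```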
@@ -49,6 +49,7 @@ from synapse.types import JsonDict
from synapse.util import unwrapFirstError
from synapse.util.async_helpers import yieldable_gather_results
from synapse.util.batching_queue import BatchingQueue
from synapse.util.caches.descriptors import cached
from synapse.util.retryutils import NotRetryingDestination

if TYPE_CHECKING:
@@ -189,6 +190,39 @@ class Keyring:
valid_until_ts=2**63,  # fake future timestamp
)

self._client = hs.get_federation_http_client()

@cached()
async def is_server_linearized(self, server_name: str) -> bool:
# TODO(LM) Cache this in the database.
# TODO(LM) Support perspectives server.
try:
response = await self._client.get_json(
destination=server_name,
path="/_matrix/key/v2/server",
ignore_backoff=True,
# we only give the remote server 10s to respond. It should be an
# easy request to handle, so if it doesn't reply within 10s, it's
# probably not going to.
#
# Furthermore, when we are acting as a notary server, we cannot
# wait all day for all of the origin servers, as the requesting
# server will otherwise time out before we can respond.
#
# (Note that get_json may make 4 attempts, so this can still take
# almost 45 seconds to fetch the headers, plus up to another 60s to
# read the response).
timeout=10000,
)
except (NotRetryingDestination, RequestSendFailed) as e:
# these both have str() representations which we can't really improve
# upon
raise KeyLookupError(str(e))
except HttpResponseException as e:
raise KeyLookupError("Remote server returned an error: %s" % (e,))

return response.get("m.linearized", False)

async def verify_json_for_server(
self,
server_name: str,
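A minimal, synchronous sketch of the server-key lookup above: fetch `/_matrix/key/v2/server` and read the advertised `m.linearized` flag. It is illustrative only; it uses `urllib` rather than Synapse's federation client and assumes the remote serves plain HTTPS on port 8448 (real federation would resolve `.well-known`/SRV first).

```python
import json
import urllib.request


def is_server_linearized(server_name: str, timeout: float = 10.0) -> bool:
    """Fetch /_matrix/key/v2/server and report the advertised m.linearized flag."""
    url = f"https://{server_name}:8448/_matrix/key/v2/server"
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        response = json.load(resp)
    # An absent flag means a regular (DAG-native) homeserver.
    return bool(response.get("m.linearized", False))
```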
@@ -117,7 +117,7 @@ def validate_event_for_room_version(event: "EventBase") -> None:
if not is_invite_via_3pid:
raise AuthError(403, "Event not signed by sender's server")

if event.format_version in (EventFormatVersions.ROOM_V1_V2,):
if event.format_version == EventFormatVersions.ROOM_V1_V2:
# Only older room versions have event IDs to check.
event_id_domain = get_domain_from_id(event.event_id)

@@ -125,6 +125,15 @@ def validate_event_for_room_version(event: "EventBase") -> None:
if not event.signatures.get(event_id_domain):
raise AuthError(403, "Event not signed by sending server")

if event.format_version == EventFormatVersions.LINEARIZED:
# TODO Are these handling DAG-native events properly? Is the null-checks
# a bypass?

# Check that the hub server signed it.
hub_server = event.hub_server
if hub_server and not event.signatures.get(hub_server):
raise AuthError(403, "Event not signed by hub server")

is_invite_via_allow_rule = (
event.room_version.restricted_join_rule
and event.type == EventTypes.Member
@@ -277,6 +286,13 @@ def check_state_dependent_auth_rules(

auth_dict = {(e.type, e.state_key): e for e in auth_events}

# If the event was sent from a hub server it must be the current hub server.
if event.room_version.linearized_matrix:
if event.hub_server:
current_hub_server = get_hub_server(auth_dict)
if current_hub_server != event.hub_server:
raise AuthError(403, "Event sent from incorrect hub server.")

# additional check for m.federate
creating_domain = get_domain_from_id(event.room_id)
originating_domain = get_domain_from_id(event.sender)
@@ -1063,6 +1079,12 @@ def _verify_third_party_invite(
return False

def get_hub_server(auth_events: StateMap["EventBase"]) -> str:
# The current hub is the sender of the create event.
create_event = auth_events[(EventTypes.Create, "")]
return get_domain_from_id(create_event.sender)

def get_public_keys(invite_event: "EventBase") -> List[Dict[str, Any]]:
public_keys = []
if "public_key" in invite_event.content:
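The hub check added above boils down to "the hub is the domain of whoever sent m.room.create". A standalone sketch of that rule, using a toy event type in place of Synapse's EventBase (names here are illustrative):

```python
from dataclasses import dataclass
from typing import Dict, Optional, Tuple


@dataclass
class ToyEvent:
    type: str
    state_key: str
    sender: str
    hub_server: Optional[str] = None


def get_hub_server(auth_events: Dict[Tuple[str, str], ToyEvent]) -> str:
    # The current hub is the domain of the create event's sender.
    create_event = auth_events[("m.room.create", "")]
    return create_event.sender.split(":", 1)[1]


def check_hub(event: ToyEvent, auth_events: Dict[Tuple[str, str], ToyEvent]) -> None:
    # Events delegated via a hub must name the room's current hub.
    if event.hub_server and event.hub_server != get_hub_server(auth_events):
        raise PermissionError("Event sent from incorrect hub server.")
```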
@@ -39,7 +39,7 @@ from unpaddedbase64 import encode_base64

from synapse.api.constants import RelationTypes
from synapse.api.room_versions import EventFormatVersions, RoomVersion, RoomVersions
from synapse.types import JsonDict, RoomStreamToken, StrCollection
from synapse.types import JsonDict, RoomStreamToken, StrCollection, get_domain_from_id
from synapse.util.caches import intern_dict
from synapse.util.frozenutils import freeze
from synapse.util.stringutils import strtobool
@@ -334,11 +334,18 @@ class EventBase(metaclass=abc.ABCMeta):
state_key: DictProperty[str] = DictProperty("state_key")
type: DictProperty[str] = DictProperty("type")
user_id: DictProperty[str] = DictProperty("sender")
# Should only matter for Linearized Matrix.
hub_server: DictProperty[Optional[str]] = DefaultDictProperty("hub_server", None)

@property
def event_id(self) -> str:
raise NotImplementedError()

@property
def pdu_domain(self) -> str:
"""The domain which added this event to the DAG."""
return get_domain_from_id(self.sender)

@property
def membership(self) -> str:
return self.content["membership"]
@@ -382,6 +389,9 @@ class EventBase(metaclass=abc.ABCMeta):

return pdu_json

def get_linearized_pdu_json(self) -> JsonDict:
raise NotImplementedError()

def get_templated_pdu_json(self) -> JsonDict:
"""
Return a JSON object suitable for a templated event, as used in the
@@ -396,6 +406,9 @@ class EventBase(metaclass=abc.ABCMeta):

return template_json

def get_templated_linearized_pdu_json(self) -> JsonDict:
raise NotImplementedError()

def __getitem__(self, field: str) -> Optional[Any]:
return self._dict[field]

@@ -591,6 +604,68 @@ class FrozenEventV3(FrozenEventV2):
return self._event_id

class FrozenLinearizedEvent(FrozenEventV3):
"""
Represents a Delegated Linearized PDU.
"""

format_version = EventFormatVersions.LINEARIZED

# TODO(LM): Do we re-calculate depth at some point?
depth = 0  # type: ignore[assignment]

@property
def origin(self) -> str:
return get_domain_from_id(self.sender)

@property
def pdu_domain(self) -> str:
"""The domain which added this event to the DAG.

It could be the authorized server or the sender."""
if self.hub_server is not None:
return self.hub_server
return super().pdu_domain

def get_pdu_json(self, time_now: Optional[int] = None) -> JsonDict:
pdu = super().get_pdu_json()

# TODO(LM) Why does this sometimes exist?
pdu.pop("depth", None)

# Internally Synapse uses depth & unsigned, but this isn't part of LM.
pdu.pop("unsigned")

return pdu

def get_linearized_pdu_json(self) -> JsonDict:
# Get the full PDU and then remove fields from it.
pdu = self.get_pdu_json()
# Strip everything except for the lpdu property from the hashes.
pdu["hashes"] = {"lpdu": pdu["hashes"]["lpdu"]}
pdu.pop("auth_events")
pdu.pop("prev_events")
return pdu

def get_templated_linearized_pdu_json(self) -> JsonDict:
"""
Return a JSON object suitable for a templated event, as used in the
make_{join,leave,knock} workflow.
"""
# By using _dict directly we don't pull in signatures/unsigned.
template_json = dict(self._dict)
# The hashes (similar to the signature) need to be recalculated by the
# joining/leaving/knocking server after (potentially) modifying the
# event.
template_json.pop("hashes")

# Linearized Matrix servers don't know about auth/prev events.
template_json.pop("auth_events")
template_json.pop("prev_events")

return template_json

def _event_type_from_format_version(
format_version: int,
) -> Type[Union[FrozenEvent, FrozenEventV2, FrozenEventV3]]:
@@ -610,6 +685,8 @@ def _event_type_from_format_version(
return FrozenEventV2
elif format_version == EventFormatVersions.ROOM_V4_PLUS:
return FrozenEventV3
elif format_version == EventFormatVersions.LINEARIZED:
return FrozenLinearizedEvent
else:
raise Exception("No event format %r" % (format_version,))
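The LPDU view built by `get_linearized_pdu_json` above is just "the full PDU minus the hub's additions". A dict-level sketch of that stripping, assuming the input already carries an `lpdu` hash entry (names are illustrative):

```python
from typing import Any, Dict


def linearized_view(pdu: Dict[str, Any]) -> Dict[str, Any]:
    """Strip a full PDU dict down to its LPDU form, per the hunk above (sketch)."""
    lpdu = dict(pdu)
    # Only the participant's own hash survives; the hub's content hash does not.
    lpdu["hashes"] = {"lpdu": pdu["hashes"]["lpdu"]}
    # DAG metadata is the hub's business, not the participant's.
    lpdu.pop("auth_events", None)
    lpdu.pop("prev_events", None)
    return lpdu
```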
@@ -87,6 +87,11 @@ class EventBuilder:
_redacts: Optional[str] = None
_origin_server_ts: Optional[int] = None

# TODO(LM) If Synapse is acting as a hub-server this should be itself.
hub_server: Optional[str] = None
_lpdu_hashes: Optional[Dict[str, str]] = None
_lpdu_signatures: Optional[Dict[str, Dict[str, str]]] = None

internal_metadata: _EventInternalMetadata = attr.Factory(
lambda: _EventInternalMetadata({})
)
@@ -145,20 +150,6 @@ class EventBuilder:
auth_events = auth_event_ids
prev_events = prev_event_ids

# Otherwise, progress the depth as normal
if depth is None:
(
_,
most_recent_prev_event_depth,
) = await self._store.get_max_depth_of(prev_event_ids)

depth = most_recent_prev_event_depth + 1

# we cap depth of generated events, to ensure that they are not
# rejected by other servers (and so that they can be persisted in
# the db)
depth = min(depth, MAX_DEPTH)

event_dict: Dict[str, Any] = {
"auth_events": auth_events,
"prev_events": prev_events,
@@ -167,8 +158,25 @@ class EventBuilder:
"sender": self.sender,
"content": self.content,
"unsigned": self.unsigned,
"depth": depth,
}
if not self.room_version.linearized_matrix:
# Otherwise, progress the depth as normal
if depth is None:
(
_,
most_recent_prev_event_depth,
) = await self._store.get_max_depth_of(prev_event_ids)

depth = most_recent_prev_event_depth + 1

# we cap depth of generated events, to ensure that they are not
# rejected by other servers (and so that they can be persisted in
# the db)
depth = min(depth, MAX_DEPTH)

event_dict["depth"] = depth
elif self.hub_server is not None:
event_dict["hub_server"] = self.hub_server

if self.is_state():
event_dict["state_key"] = self._state_key
@@ -181,7 +189,12 @@ class EventBuilder:
if self._origin_server_ts is not None:
event_dict["origin_server_ts"] = self._origin_server_ts

return create_local_event_from_event_dict(
if self.room_version.linearized_matrix and self._lpdu_hashes:
event_dict.setdefault("hashes", {})["lpdu"] = self._lpdu_hashes
if self.room_version.linearized_matrix and self._lpdu_signatures:
event_dict.setdefault("signatures", {}).update(self._lpdu_signatures)

event = create_local_event_from_event_dict(
clock=self._clock,
hostname=self._hostname,
signing_key=self._signing_key,
@@ -190,6 +203,8 @@ class EventBuilder:
internal_metadata_dict=self.internal_metadata.get_dict(),
)

return event

class EventBuilderFactory:
def __init__(self, hs: "HomeServer"):
@@ -230,6 +245,9 @@ class EventBuilderFactory:
unsigned=key_values.get("unsigned", {}),
redacts=key_values.get("redacts", None),
origin_server_ts=key_values.get("origin_server_ts", None),
hub_server=key_values.get("hub_server", None),
lpdu_hashes=key_values.get("lpdu_hashes", None),
lpdu_signatures=key_values.get("lpdu_signatures", None),
)

@@ -258,7 +276,11 @@ def create_local_event_from_event_dict(
if format_version == EventFormatVersions.ROOM_V1_V2:
event_dict["event_id"] = _create_event_id(clock, hostname)

event_dict["origin"] = hostname
if not room_version.linearized_matrix:
# Do not add "origin" field to events (LPDUs and PDUs) sent in
# rooms that are meant to be compatible with linearized matrix.
event_dict["origin"] = hostname

event_dict.setdefault("origin_server_ts", time_now)

event_dict.setdefault("unsigned", {})
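The builder change above splits on room type: DAG rooms carry a capped `depth`, linearized rooms instead carry `hub_server` when the event was delegated through a hub. A self-contained sketch of that branching (the MAX_DEPTH constant here is a stand-in, not Synapse's import):

```python
from typing import Any, Dict, Optional

MAX_DEPTH = 2**63 - 1  # stand-in for synapse.api.constants.MAX_DEPTH


def finish_event_dict(
    event_dict: Dict[str, Any],
    linearized: bool,
    depth: int,
    hub_server: Optional[str],
) -> Dict[str, Any]:
    """Sketch of the hunk above: depth for DAG rooms, hub_server for linearized ones."""
    if not linearized:
        # Cap depth so that other servers will accept (and persist) the event.
        event_dict["depth"] = min(depth, MAX_DEPTH)
    elif hub_server is not None:
        event_dict["hub_server"] = hub_server
    return event_dict
```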
@@ -64,6 +64,7 @@ def prune_event(event: EventBase) -> EventBase:
the user has specified, but we do want to keep necessary information like
type, state_key etc.
"""

pruned_event_dict = prune_event_dict(event.room_version, event.get_dict())

from . import make_event_from_dict
@@ -102,7 +103,6 @@ def prune_event_dict(room_version: RoomVersion, event_dict: JsonDict) -> JsonDic
"content",
"type",
"state_key",
"depth",
"prev_events",
"auth_events",
"origin_server_ts",
@@ -112,6 +112,12 @@ def prune_event_dict(room_version: RoomVersion, event_dict: JsonDict) -> JsonDic
if not room_version.updated_redaction_rules:
allowed_keys.extend(["prev_state", "membership", "origin"])

# The hub server should not be redacted for linear matrix.
if room_version.linearized_matrix:
allowed_keys.append("hub_server")
else:
allowed_keys.append("depth")

event_type = event_dict["type"]

new_content = {}
@@ -66,6 +66,11 @@ class EventValidator:
"type",
]

if event.room_version.linearized_matrix:
# Do not add "origin" field to events (LPDUs and PDUs) sent in
# rooms that are meant to be compatible with linearized matrix.
required.remove("origin")

for k in required:
if k not in event:
raise SynapseError(400, "Event does not have key %s" % (k,))
@@ -21,7 +21,7 @@ from synapse.api.room_versions import EventFormatVersions, RoomVersion
from synapse.crypto.event_signing import check_event_content_hash
from synapse.crypto.keyring import Keyring
from synapse.events import EventBase, make_event_from_dict
from synapse.events.utils import prune_event, validate_canonicaljson
from synapse.events.utils import prune_event, prune_event_dict, validate_canonicaljson
from synapse.http.servlet import assert_params_in_dict
from synapse.logging.opentracing import log_kv, trace
from synapse.types import JsonDict, get_domain_from_id
@@ -174,7 +174,7 @@ async def _check_sigs_on_pdu(

# we want to check that the event is signed by:
#
# (a) the sender's server
# (a) the sender's server or the hub
#
# - except in the case of invites created from a 3pid invite, which are exempt
# from this check, because the sender has to match that of the original 3pid
@@ -193,19 +193,22 @@ async def _check_sigs_on_pdu(
# let's start by getting the domain for each pdu, and flattening the event back
# to JSON.

# First we check that the sender event is signed by the sender's domain
# First we check that the sender event is signed by the domain of the server
# which added it to the DAG.
# (except if its a 3pid invite, in which case it may be sent by any server)
pdu_domain = pdu.pdu_domain
sender_domain = get_domain_from_id(pdu.sender)
origin_server_ts_for_signing = (
pdu.origin_server_ts if room_version.enforce_key_validity else 0
)
if not _is_invite_via_3pid(pdu):
try:
await keyring.verify_event_for_server(
sender_domain,
pdu,
pdu.origin_server_ts if room_version.enforce_key_validity else 0,
pdu_domain, pdu, origin_server_ts_for_signing
)
except Exception as e:
raise InvalidEventSignatureError(
f"unable to verify signature for sender domain {sender_domain}: {e}",
f"unable to verify signature for PDU domain {pdu_domain}: {e}",
pdu.event_id,
) from None

@@ -218,9 +221,7 @@ async def _check_sigs_on_pdu(
if event_domain != sender_domain:
try:
await keyring.verify_event_for_server(
event_domain,
pdu,
pdu.origin_server_ts if room_version.enforce_key_validity else 0,
event_domain, pdu, origin_server_ts_for_signing
)
except Exception as e:
raise InvalidEventSignatureError(
@@ -241,16 +242,35 @@ async def _check_sigs_on_pdu(
)
try:
await keyring.verify_event_for_server(
authorising_server,
pdu,
pdu.origin_server_ts if room_version.enforce_key_validity else 0,
authorising_server, pdu, origin_server_ts_for_signing
)
except Exception as e:
raise InvalidEventSignatureError(
f"unable to verify signature for authorising serve {authorising_server}: {e}",
f"unable to verify signature for authorising server {authorising_server}: {e}",
pdu.event_id,
) from None

# If this is a linearized PDU we may need to check signatures of the hub
# and sender.
if room_version.event_format == EventFormatVersions.LINEARIZED:
# If the event was sent via a hub server, check the signature of the
# sender against the Linear PDU. (But only if the sender isn't the hub.)
#
# Note that the signature of the hub server, if one exists, is checked
# against the full PDU above.
if pdu.hub_server and pdu.hub_server != sender_domain:
try:
await keyring.verify_json_for_server(
sender_domain,
prune_event_dict(pdu.room_version, pdu.get_linearized_pdu_json()),
origin_server_ts_for_signing,
)
except Exception as e:
raise InvalidEventSignatureError(
f"unable to verify signature for sender {sender_domain}: {e}",
pdu.event_id,
) from None

def _is_invite_via_3pid(event: EventBase) -> bool:
return (
@@ -273,21 +293,26 @@ def event_from_pdu_json(pdu_json: JsonDict, room_version: RoomVersion) -> EventB
"""
# we could probably enforce a bunch of other fields here (room_id, sender,
# origin, etc etc)
assert_params_in_dict(pdu_json, ("type", "depth"))
if room_version.event_format == EventFormatVersions.LINEARIZED:
assert_params_in_dict(pdu_json, ("type",))
else:
assert_params_in_dict(pdu_json, ("type", "depth"))

depth = pdu_json["depth"]
if type(depth) is not int:
raise SynapseError(
400, "Depth %r not an integer" % (depth,), Codes.BAD_JSON
)

if depth < 0:
raise SynapseError(400, "Depth too small", Codes.BAD_JSON)
elif depth > MAX_DEPTH:
raise SynapseError(400, "Depth too large", Codes.BAD_JSON)

# Strip any unauthorized values from "unsigned" if they exist
if "unsigned" in pdu_json:
_strip_unsigned_values(pdu_json)

depth = pdu_json["depth"]
if type(depth) is not int:
raise SynapseError(400, "Depth %r not an intger" % (depth,), Codes.BAD_JSON)

if depth < 0:
raise SynapseError(400, "Depth too small", Codes.BAD_JSON)
elif depth > MAX_DEPTH:
raise SynapseError(400, "Depth too large", Codes.BAD_JSON)

# Validate that the JSON conforms to the specification.
if room_version.strict_canonicaljson:
validate_canonicaljson(pdu_json)
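Read together, the signature checks above require different domains to have signed an incoming PDU depending on how it arrived. A small sketch that collects those requirements into one place (illustrative only; the parameters mirror the values computed in the hunks):

```python
from typing import List, Optional


def required_signers(
    pdu_domain: str,
    sender_domain: str,
    hub_server: Optional[str],
    event_id_domain: Optional[str],
    authorising_server: Optional[str],
    is_3pid_invite: bool,
) -> List[str]:
    """Sketch of which domains must have signed an incoming PDU, per the hunks above."""
    signers: List[str] = []
    if not is_3pid_invite:
        # The server that added the event to the DAG: the sender, or the hub
        # when the event was delegated through one.
        signers.append(pdu_domain)
    if event_id_domain and event_id_domain != sender_domain:
        # Room v1/v2 only: the server named in the event ID.
        signers.append(event_id_domain)
    if authorising_server:
        # Restricted joins: the server that authorised the join.
        signers.append(authorising_server)
    if hub_server and hub_server != sender_domain:
        # Linearized rooms: the sender also signs its own LPDU.
        signers.append(sender_domain)
    return signers
```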
@@ -325,9 +325,21 @@ class FederationClient(FederationBase):
if not extremities:
return None

transaction_data = await self.transport_layer.backfill(
dest, room_id, extremities, limit
)
try:
# Note that this only returns pdus now, but this is close enough to a transaction.
transaction_data = await self.transport_layer.backfill_unstable(
dest, room_id, extremities, limit
)
except HttpResponseException as e:
# If an error is received that is due to an unrecognised endpoint,
# fallback to the v1 endpoint. Otherwise, consider it a legitimate error
# and raise.
if not is_unknown_endpoint(e):
raise

transaction_data = await self.transport_layer.backfill(
dest, room_id, extremities, limit
)

logger.debug("backfill transaction_data=%r", transaction_data)

@@ -373,45 +385,58 @@ class FederationClient(FederationBase):
Raises:
SynapseError, NotRetryingDestination, FederationDeniedError
"""
transaction_data = await self.transport_layer.get_event(
destination, event_id, timeout=timeout
)
try:
# Note that this only returns pdus now, but this is close enough to a transaction.
pdu_json = await self.transport_layer.get_event_unstable(
destination, event_id, timeout=timeout
)

pdu = event_from_pdu_json(pdu_json, room_version)

except HttpResponseException as e:
# If an error is received that is due to an unrecognised endpoint,
# fallback to the v1 endpoint. Otherwise, consider it a legitimate error
# and raise.
if not is_unknown_endpoint(e):
raise

transaction_data = await self.transport_layer.get_event(
destination, event_id, timeout=timeout
)

pdu_list: List[EventBase] = [
event_from_pdu_json(p, room_version) for p in transaction_data["pdus"]
]

if pdu_list and pdu_list[0]:
pdu = pdu_list[0]
else:
return None

logger.debug(
"get_pdu_from_destination_raw: retrieved event id %s from %s: %r",
event_id,
destination,
transaction_data,
pdu,
)

pdu_list: List[EventBase] = [
event_from_pdu_json(p, room_version) for p in transaction_data["pdus"]
]
# Check signatures are correct.
try:

if pdu_list and pdu_list[0]:
pdu = pdu_list[0]

# Check signatures are correct.
try:

async def _record_failure_callback(
event: EventBase, cause: str
) -> None:
await self.store.record_event_failed_pull_attempt(
event.room_id, event.event_id, cause
)

signed_pdu = await self._check_sigs_and_hash(
room_version, pdu, _record_failure_callback
async def _record_failure_callback(event: EventBase, cause: str) -> None:
await self.store.record_event_failed_pull_attempt(
event.room_id, event.event_id, cause
)
except InvalidEventSignatureError as e:
errmsg = f"event id {pdu.event_id}: {e}"
logger.warning("%s", errmsg)
raise SynapseError(403, errmsg, Codes.FORBIDDEN)

return signed_pdu
signed_pdu = await self._check_sigs_and_hash(
room_version, pdu, _record_failure_callback
)
except InvalidEventSignatureError as e:
errmsg = f"event id {pdu.event_id}: {e}"
logger.warning("%s", errmsg)
raise SynapseError(403, errmsg, Codes.FORBIDDEN)

return None
return signed_pdu

@trace
@tag_args
@@ -1222,6 +1247,19 @@ class FederationClient(FederationBase):
) -> SendJoinResponse:
time_now = self._clock.time_msec()

try:
return await self.transport_layer.send_join_unstable(
room_version=room_version,
destination=destination,
txn_id=pdu.event_id,
content=pdu.get_pdu_json(time_now),
)
except HttpResponseException as e:
# If an error is received that is due to an unrecognised endpoint,
# fallback to the v2 endpoint.
if not is_unknown_endpoint(e):
raise

try:
return await self.transport_layer.send_join_v2(
room_version=room_version,
destination=destination,
@@ -1293,6 +1331,22 @@ class FederationClient(FederationBase):
"""
time_now = self._clock.time_msec()

try:
return await self.transport_layer.send_invite_unstable(
destination=destination,
txn_id=pdu.event_id,
content={
"event": pdu.get_pdu_json(time_now),
"room_version": room_version.identifier,
"invite_room_state": pdu.unsigned.get("invite_room_state", []),
},
)
except HttpResponseException as e:
# If an error is received that is due to an unrecognised endpoint,
# fallback to the v2.
if not is_unknown_endpoint(e):
raise

try:
return await self.transport_layer.send_invite_v2(
destination=destination,
@@ -1359,6 +1413,18 @@ class FederationClient(FederationBase):
async def _do_send_leave(self, destination: str, pdu: EventBase) -> JsonDict:
time_now = self._clock.time_msec()

try:
return await self.transport_layer.send_leave_unstable(
destination=destination,
txn_id=pdu.event_id,
content=pdu.get_pdu_json(time_now),
)
except HttpResponseException as e:
# If an error is received that is due to an unrecognised endpoint,
# fallback to the v2.
if not is_unknown_endpoint(e):
raise

try:
return await self.transport_layer.send_leave_v2(
destination=destination,
@@ -1435,6 +1501,18 @@ class FederationClient(FederationBase):
"""
time_now = self._clock.time_msec()

try:
return await self.transport_layer.send_knock_unstable(
destination=destination,
txn_id=pdu.event_id,
content=pdu.get_pdu_json(time_now),
)
except HttpResponseException as e:
# If an error is received that is due to an unrecognised endpoint,
# fallback to the v2.
if not is_unknown_endpoint(e):
raise

return await self.transport_layer.send_knock_v1(
destination=destination,
room_id=pdu.room_id,
@@ -13,6 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections.abc
import logging
import random
from typing import (
@@ -26,7 +27,6 @@ from typing import (
Mapping,
Optional,
Tuple,
Union,
)

from matrix_common.regex import glob_to_regex
@@ -56,13 +56,14 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
from synapse.crypto.event_signing import compute_event_signature
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.events.utils import validate_canonicaljson
from synapse.federation.federation_base import (
FederationBase,
InvalidEventSignatureError,
event_from_pdu_json,
)
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
from synapse.federation.units import Transaction
from synapse.http.servlet import assert_params_in_dict
from synapse.logging.context import (
make_deferred_yieldable,
@@ -213,19 +214,15 @@ class FederationServer(FederationBase):

async def on_backfill_request(
self, origin: str, room_id: str, versions: List[str], limit: int
) -> Tuple[int, Dict[str, Any]]:
) -> List[EventBase]:
async with self._server_linearizer.queue((origin, room_id)):
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, room_id)

pdus = await self.handler.on_backfill_request(
return await self.handler.on_backfill_request(
origin, room_id, versions, limit
)

res = self._transaction_dict_from_pdus(pdus)

return 200, res

async def on_timestamp_to_event_request(
self, origin: str, room_id: str, timestamp: int, direction: Direction
) -> Tuple[int, Dict[str, Any]]:
@@ -463,7 +460,21 @@ class FederationServer(FederationBase):
logger.info("Ignoring PDU: %s", e)
continue

event = event_from_pdu_json(p, room_version)
# If the transaction is from a linearized matrix server, create PDUs
# from the LPDUs.
if (
room_version.linearized_matrix
and await self.keyring.is_server_linearized(origin)
):
event = await self._on_lpdu_event(p, room_version)

# Send this event on behalf of the other server.
#
# We are acting as the hub for this transaction and need to send
# this event out to other participants.
event.internal_metadata.send_on_behalf_of = origin
else:
event = event_from_pdu_json(p, room_version)
pdus_by_room.setdefault(room_id, []).append(event)

if event.origin_server_ts > newest_pdu_ts:
@@ -534,13 +545,45 @@ class FederationServer(FederationBase):
async def _process_edu(edu_dict: JsonDict) -> None:
received_edus_counter.inc()

edu = Edu(
origin=origin,
destination=self.server_name,
edu_type=edu_dict["edu_type"],
content=edu_dict["content"],
)
await self.registry.on_edu(edu.edu_type, origin, edu.content)
# TODO(LM) Handle this more natively instead of munging to the current form.
if "type" in edu_dict:
edu_type = edu_dict["type"]
content = edu_dict["content"]
sender = edu_dict["sender"]

if edu_type == EduTypes.DEVICE_LIST_UPDATE:
for device_info in content.get("changed", []):
device_info["stream_id"] = 0  # XXX Will this work?
await self.registry.on_edu(edu_type, origin, device_info)

for device_id in content.get("removed", []):
new_content = {
"device_id": device_id,
"deleted": True,
"stream_id": 0,  # XXX Will this work?
"user_id": sender,
}
await self.registry.on_edu(edu_type, origin, new_content)
elif edu_type == EduTypes.DIRECT_TO_DEVICE:
new_content = {
"message_id": 0,  # XXX Will this work?
"messages": {
content["target"]: {
content["target_device"]: content["message"]
}
},
"sender": sender,
"type": content["message_type"],
}
await self.registry.on_edu(edu_type, origin, new_content)
else:
raise ValueError()

else:
edu_type = edu_dict["edu_type"]
content = edu_dict["content"]

await self.registry.on_edu(edu_type, origin, content)

await concurrently_execute(
_process_edu,
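The EDU munging above maps Linearized Matrix to-device payloads onto the shape Synapse's existing EDU handlers expect. A minimal dict-level sketch of that mapping (illustrative; key names follow the hunk):

```python
from typing import Any, Dict


def lm_to_device_edu_to_synapse(edu: Dict[str, Any]) -> Dict[str, Any]:
    """Rewrite an LM direct-to-device EDU into the content Synapse's handler expects."""
    content = edu["content"]
    return {
        "message_id": 0,  # placeholder; the LM payload carries no message_id
        "messages": {
            content["target"]: {content["target_device"]: content["message"]}
        },
        "sender": edu["sender"],
        "type": content["message_type"],
    }
```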
@@ -616,15 +659,8 @@ class FederationServer(FederationBase):
"auth_chain": [pdu.get_pdu_json() for pdu in auth_chain],
}

async def on_pdu_request(
self, origin: str, event_id: str
) -> Tuple[int, Union[JsonDict, str]]:
pdu = await self.handler.get_persisted_pdu(origin, event_id)

if pdu:
return 200, self._transaction_dict_from_pdus([pdu])
else:
return 404, ""
async def on_pdu_request(self, origin: str, event_id: str) -> Optional[EventBase]:
return await self.handler.get_persisted_pdu(origin, event_id)

async def on_query_request(
self, query_type: str, args: Dict[str, str]
@@ -639,12 +675,14 @@ class FederationServer(FederationBase):
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, room_id)

room_version = await self.store.get_room_version_id(room_id)
if room_version not in supported_versions:
# checking the room version will check that we've actually heard of the room
# (and return a 404 otherwise)
room_version_id = await self.store.get_room_version_id(room_id)
if room_version_id not in supported_versions:
logger.warning(
"Room version %s not in %s", room_version, supported_versions
"Room version %s not in %s", room_version_id, supported_versions
)
raise IncompatibleRoomVersionError(room_version=room_version)
raise IncompatibleRoomVersionError(room_version=room_version_id)

# Refuse the request if that room has seen too many joins recently.
# This is in addition to the HS-level rate limiting applied by
@@ -655,8 +693,21 @@ class FederationServer(FederationBase):
key=room_id,
update=False,
)
pdu = await self.handler.on_make_join_request(origin, room_id, user_id)
return {"event": pdu.get_templated_pdu_json(), "room_version": room_version}
pdu = await self.handler.on_make_join_request(
origin, room_id, KNOWN_ROOM_VERSIONS[room_version_id], user_id
)

# For a linearized matrix server return a templated LPDU, otherwise a templated PDU.
# TODO(LM) Is it an error for a LM server to not be on a LM room version?
if (
pdu.room_version.linearized_matrix
and await self.keyring.is_server_linearized(origin)
):
event_json = pdu.get_templated_linearized_pdu_json()
else:
event_json = pdu.get_templated_pdu_json()

return {"event": event_json, "room_version": room_version_id}

async def on_invite_request(
self, origin: str, content: JsonDict, room_version_id: str
@@ -686,13 +737,14 @@ class FederationServer(FederationBase):
self,
origin: str,
content: JsonDict,
room_id: str,
expected_room_id: Optional[str] = None,
caller_supports_partial_state: bool = False,
) -> Dict[str, Any]:
set_tag(
SynapseTags.SEND_JOIN_RESPONSE_IS_PARTIAL_STATE,
caller_supports_partial_state,
)
room_id = content["room_id"]
await self._room_member_handler._join_rate_per_room_limiter.ratelimit(  # type: ignore[has-type]
requester=None,
key=room_id,
@@ -700,7 +752,7 @@ class FederationServer(FederationBase):
)

event, context = await self._on_send_membership_event(
origin, content, Membership.JOIN, room_id
origin, content, Membership.JOIN, expected_room_id
)

prev_state_ids = await context.get_prev_state_ids()
@@ -731,6 +783,14 @@ class FederationServer(FederationBase):
auth_chain_events = await self.store.get_events_as_list(auth_chain_event_ids)
state_events = await self.store.get_events_as_list(state_event_ids)

# TODO(LM) eigen-server wants events in order.
auth_chain_events = sorted(
auth_chain_events, key=lambda e: e.internal_metadata.stream_ordering
)
state_events = sorted(
state_events, key=lambda e: e.internal_metadata.stream_ordering
)

# we try to do all the async stuff before this point, so that time_now is as
# accurate as possible.
time_now = self._clock.time_msec()
@@ -756,13 +816,25 @@ class FederationServer(FederationBase):

room_version = await self.store.get_room_version_id(room_id)

return {"event": pdu.get_templated_pdu_json(), "room_version": room_version}
# For a linearized matrix server return a templated LPDU, otherwise a templated PDU.
# TODO(LM) Is it an error for a LM server to not be on a LM room version?
if (
pdu.room_version.linearized_matrix
and await self.keyring.is_server_linearized(origin)
):
event_json = pdu.get_templated_linearized_pdu_json()
else:
event_json = pdu.get_templated_pdu_json()

return {"event": event_json, "room_version": room_version}

async def on_send_leave_request(
self, origin: str, content: JsonDict, room_id: str
self, origin: str, content: JsonDict, expected_room_id: Optional[str] = None
) -> dict:
logger.debug("on_send_leave_request: content: %s", content)
await self._on_send_membership_event(origin, content, Membership.LEAVE, room_id)
await self._on_send_membership_event(
origin, content, Membership.LEAVE, expected_room_id
)
return {}

async def on_make_knock_request(
@@ -814,16 +886,24 @@ class FederationServer(FederationBase):
)

pdu = await self.handler.on_make_knock_request(origin, room_id, user_id)

# For a linearized matrix server return a templated LPDU, otherwise a templated PDU.
# TODO(LM) Is it an error for a LM server to not be on a LM room version?
if (
pdu.room_version.linearized_matrix
and await self.keyring.is_server_linearized(origin)
):
event_json = pdu.get_templated_linearized_pdu_json()
else:
event_json = pdu.get_templated_pdu_json()

return {
"event": pdu.get_templated_pdu_json(),
"event": event_json,
"room_version": room_version.identifier,
}

async def on_send_knock_request(
self,
origin: str,
content: JsonDict,
room_id: str,
self, origin: str, content: JsonDict, expected_room_id: Optional[str] = None
) -> Dict[str, List[JsonDict]]:
"""
We have received a knock event for a room. Verify and send the event into the room
@@ -833,13 +913,13 @@ class FederationServer(FederationBase):
Args:
origin: The remote homeserver of the knocking user.
content: The content of the request.
room_id: The ID of the room to knock on.
expected_room_id: The room ID included in the request.

Returns:
The stripped room state.
"""
_, context = await self._on_send_membership_event(
origin, content, Membership.KNOCK, room_id
origin, content, Membership.KNOCK, expected_room_id
)

# Retrieve stripped state events from the room and send them back to the remote
@@ -860,7 +940,11 @@ class FederationServer(FederationBase):
}

async def _on_send_membership_event(
self, origin: str, content: JsonDict, membership_type: str, room_id: str
self,
origin: str,
content: JsonDict,
membership_type: str,
expected_room_id: Optional[str],
) -> Tuple[EventBase, EventContext]:
"""Handle an on_send_{join,leave,knock} request

@@ -872,8 +956,8 @@ class FederationServer(FederationBase):
content: The body of the send_* request - a complete membership event
membership_type: The expected membership type (join or leave, depending
on the endpoint)
room_id: The room_id from the request, to be validated against the room_id
in the event
expected_room_id: The room_id from the request, to be validated against
the room_id in the event. None if the request did not include a room ID.

Returns:
The event and context of the event after inserting it into the room graph.
@@ -883,12 +967,16 @@ class FederationServer(FederationBase):
the room_id not matching or the event not being authorized.
"""
assert_params_in_dict(content, ["room_id"])
if content["room_id"] != room_id:
if expected_room_id is None:
room_id = content["room_id"]
elif content["room_id"] != expected_room_id:
raise SynapseError(
400,
"Room ID in body does not match that in request path",
Codes.BAD_JSON,
)
else:
room_id = expected_room_id

# Note that get_room_version throws if the room does not exist here.
room_version = await self.store.get_room_version(room_id)
@@ -916,7 +1004,14 @@ class FederationServer(FederationBase):
errcode=Codes.FORBIDDEN,
)

event = event_from_pdu_json(content, room_version)
# Linearized Matrix requires building the event (i.e. adding auth/prev
# events). The input content is an LPDU.
if room_version.linearized_matrix and await self.keyring.is_server_linearized(
origin
):
event = await self._on_lpdu_event(content, room_version)
else:
event = event_from_pdu_json(content, room_version)

if event.type != EventTypes.Member or not event.is_state():
raise SynapseError(400, "Not an m.room.member event", Codes.BAD_JSON)
@@ -978,6 +1073,120 @@ class FederationServer(FederationBase):
|
||||
origin, event
|
||||
)
|
||||
|
||||
async def _on_lpdu_event(
|
||||
self, lpdu_json: JsonDict, room_version: RoomVersion
|
||||
) -> EventBase:
|
||||
"""Construct an EventBase from a linearized event json received over federation
|
||||
|
||||
See event_from_pdu_json.
|
||||
|
||||
Args:
|
||||
lpdu_json: lpdu as received over federation
|
||||
room_version: The version of the room this event belongs to
|
||||
|
||||
Raises:
|
||||
SynapseError: if the lpdu is missing required fields or is otherwise
|
||||
not a valid matrix event
|
||||
"""
|
||||
if not room_version.linearized_matrix:
|
||||
raise ValueError("Cannot be used on non-linearized matrix")
|
||||
|
||||
# The minimum fields to create an LPDU.
|
||||
assert_params_in_dict(
|
||||
lpdu_json,
|
||||
(
|
||||
"type",
|
||||
"room_id",
|
||||
"sender",
|
||||
"hub_server",
|
||||
"origin_server_ts",
|
||||
"content",
|
||||
"hashes",
|
||||
"signatures",
|
||||
),
|
||||
)
|
||||
|
||||
# The participant server should *not* provide auth/prev events.
|
||||
for field in ("auth_events", "prev_events"):
|
||||
if field in lpdu_json:
|
||||
raise SynapseError(400, f"LPDU contained {field}", Codes.BAD_JSON)
|
||||
|
||||
# Hashes must contain (only) "lpdu".
|
||||
hashes = lpdu_json.pop("hashes")
|
||||
if not isinstance(hashes, collections.abc.Mapping):
|
||||
raise SynapseError(400, "Invalid hashes", Codes.BAD_JSON)
|
||||
if hashes.keys() != {"lpdu"}:
|
||||
raise SynapseError(
|
||||
400, "hashes must contain exactly one key: 'lpdu'", Codes.BAD_JSON
|
||||
)
|
||||
lpdu_json["lpdu_hashes"] = hashes["lpdu"]
|
||||
lpdu_json["lpdu_signatures"] = lpdu_json.pop("signatures")
|
||||
|
||||
# Validate that the JSON conforms to the specification.
|
||||
if room_version.strict_canonicaljson:
|
||||
validate_canonicaljson(lpdu_json)
|
||||
|
||||
# An LPDU doesn't have enough information to just create an event, it needs
|
||||
# to be built and signed, etc.
|
||||
builder = self.hs.get_event_builder_factory().for_room_version(
|
||||
room_version, lpdu_json
|
||||
)
|
||||
|
||||
try:
|
||||
(
|
||||
event,
|
||||
unpersisted_context,
|
||||
) = await self.hs.get_event_creation_handler().create_new_client_event(
|
||||
builder=builder
|
||||
)
|
||||
except SynapseError as e:
|
||||
logger.warning(
|
||||
"Failed to create PDU from template for room %s because %s",
|
||||
lpdu_json["room_id"],
|
||||
e,
|
||||
)
|
||||
raise
|
||||
|
||||
if not event.hub_server:
|
||||
raise SynapseError(400, "Cannot send PDU via hub server", Codes.BAD_JSON)
|
||||
|
||||
# If an LPDU is trying to be sent through us, but we're not the hub
|
||||
# then deny.
|
||||
if not self._is_mine_server_name(event.hub_server):
|
||||
raise SynapseError(
|
||||
400,
|
||||
f"Cannot authorise event for hub server: {event.hub_server}",
|
||||
Codes.FORBIDDEN,
|
||||
)
|
||||
|
||||
# Double check that we *are* the hub.
|
||||
state_ids = await self._state_storage_controller.get_current_state_ids(
|
||||
event.room_id
|
||||
)
|
||||
# Get the current hub server from the sender of the create event.
|
||||
create_event = await self.store.get_event(state_ids[(EventTypes.Create, "")])
|
||||
hub_server = get_domain_from_id(create_event.sender)
|
||||
|
||||
if not self._is_mine_server_name(hub_server):
|
||||
raise SynapseError(400, "Not the hub server", Codes.FORBIDDEN)
|
||||
|
||||
# Sign the event as the hub.
|
||||
event.signatures.update(
|
||||
compute_event_signature(
|
||||
room_version,
|
||||
event.get_pdu_json(),
|
||||
self.hs.hostname,
|
||||
self.hs.signing_key,
|
||||
)
|
||||
)
|
||||
|
||||
# Note that the signatures, etc. get checked later on in _handle_received_pdu.
|
||||
# Server ACLs are checked in the caller: _handle_pdus_in_txn.
|
||||
|
||||
# TODO(LM) Do we need to check that the event will be accepted here?
|
||||
|
||||
return event
|
||||
|
async def on_event_auth(
self, origin: str, room_id: str, event_id: str
) -> Tuple[int, Dict[str, Any]]:

@@ -1073,21 +1282,6 @@ class FederationServer(FederationBase):
ts_now_ms = self._clock.time_msec()
return await self.store.get_user_id_for_open_id_token(token, ts_now_ms)

def _transaction_dict_from_pdus(self, pdu_list: List[EventBase]) -> JsonDict:
"""Returns a new Transaction containing the given PDUs suitable for
transmission.
"""
time_now = self._clock.time_msec()
pdus = [p.get_pdu_json(time_now) for p in pdu_list]
return Transaction(
# Just need a dummy transaction ID and destination since it won't be used.
transaction_id="",
origin=self.server_name,
pdus=pdus,
origin_server_ts=int(time_now),
destination="",
).get_dict()

async def _handle_received_pdu(self, origin: str, pdu: EventBase) -> None:
"""Process a PDU received in a federation /send/ transaction.
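For reference, the Transaction wrapper in the hunk above only exists to give callers the familiar `pdus` envelope; the routing fields are placeholders. A rough, non-Synapse sketch of the resulting dict shape:

```python
# Illustrative only: the transaction_id/destination values are dummies, as in
# the hunk above, because the receiver never uses them.
import time

def transaction_dict_from_pdus(server_name: str, pdu_jsons: list) -> dict:
    time_now = int(time.time() * 1000)
    return {
        "transaction_id": "",      # dummy, never used
        "origin": server_name,
        "origin_server_ts": time_now,
        "destination": "",         # dummy, never used
        "pdus": pdu_jsons,
    }

print(transaction_dict_from_pdus("hub.example.org", [{"type": "m.room.message"}]))
```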
@@ -489,10 +489,19 @@ class FederationSender(AbstractFederationSender):
break

async def handle_event(event: EventBase) -> None:
# Only send events for this server.
# Send events which this server is sending on behalf of another,
# e.g. an event due to send_{join,leave,knock}.
send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
# Send events which originate from this server.
is_mine = self.is_mine_id(event.sender)
if not is_mine and send_on_behalf_of is None:
# Finally, if the server is acting as a linearized matrix hub,
# then send to any participating servers off the hub.
is_hub_server = (
event.room_version.linearized_matrix
and event.hub_server
and self.is_mine_server_name(event.hub_server)
)
if not is_mine and send_on_behalf_of is None and not is_hub_server:
logger.debug("Not sending remote-origin event %s", event)
return

@@ -542,6 +551,7 @@ class FederationSender(AbstractFederationSender):
)
return

# TODO(LM): Is the calculation of all destinations correct?
destinations: Optional[Collection[str]] = None
if not event.prev_event_ids():
# If there are no prev event IDs then the state is empty

@@ -614,10 +624,16 @@ class FederationSender(AbstractFederationSender):
)
}

if send_on_behalf_of is not None:
if (
send_on_behalf_of is not None
and not event.room_version.linearized_matrix
):
# If we are sending the event on behalf of another server
# then it already has the event and there is no reason to
# send the event to it.
#
# For linearized matrix, send it back to the origin.
# TODO(LM) Do not send back to DAG servers?
sharded_destinations.discard(send_on_behalf_of)

logger.debug("Sending %s to %r", event, sharded_destinations)
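The sender hunks above widen the "should we forward this event?" test to cover the hub role. A self-contained sketch of that decision, with the three predicates reduced to plain booleans (the real checks live on the event and homeserver objects):

```python
# Toy version of the forwarding decision in handle_event above.
def should_send(is_mine: bool, send_on_behalf_of, is_hub_server: bool) -> bool:
    # Forward events we authored, events we send on behalf of another server
    # (send_join/leave/knock), or events we relay as the room's linearized
    # matrix hub.
    return is_mine or send_on_behalf_of is not None or is_hub_server

assert should_send(True, None, False)                   # locally-authored event
assert should_send(False, "leaf.example.org", False)    # send-on-behalf-of
assert should_send(False, None, True)                   # we are the hub
assert not should_send(False, None, False)              # plain remote event
```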
@@ -21,6 +21,7 @@ from synapse.api.errors import HttpResponseException
from synapse.events import EventBase
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
from synapse.http.client import is_unknown_endpoint
from synapse.logging.opentracing import (
extract_text_map,
set_tag,

@@ -159,17 +160,46 @@ class TransactionManager:
del p["age_ts"]
return data

def json_data_cb_unstable() -> JsonDict:
data = {
"pdus": transaction.pdus,
}
if transaction.edus:
data["edus"] = transaction.edus
return data

try:
response = await self._transport_layer.send_transaction(
transaction, json_data_cb
response = await self._transport_layer.send_unstable_transaction(
transaction, json_data_cb_unstable
)
except HttpResponseException as e:
code = e.code
# If an error is received that is due to an unrecognised endpoint,
# fall back to the v1 endpoint. Otherwise, consider it a legitimate error
# and raise.
if not is_unknown_endpoint(e):
code = e.code

set_tag(tags.ERROR, True)
set_tag(tags.ERROR, True)

logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
raise
logger.info(
"TX [%s] {%s} got %d response", destination, txn_id, code
)

raise

try:
response = await self._transport_layer.send_transaction(
transaction, json_data_cb
)
except HttpResponseException as e:
code = e.code

set_tag(tags.ERROR, True)

logger.info(
"TX [%s] {%s} got %d response", destination, txn_id, code
)
raise

logger.info("TX [%s] {%s} got 200 response", destination, txn_id)
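The TransactionManager hunk implements a "try the unstable endpoint first, fall back to v1 only on an unrecognised-endpoint error" pattern. A generic, runnable sketch of that pattern under stated assumptions (`send_unstable`, `send_stable` and the error check are stand-ins for the real transport-layer calls):

```python
import asyncio

async def send_with_fallback(send_unstable, send_stable, is_unknown_endpoint):
    # Try the unstable (linearized matrix) route first; only fall back to the
    # stable v1 route when the remote clearly does not know the endpoint.
    try:
        return await send_unstable()
    except Exception as e:
        if not is_unknown_endpoint(e):
            raise  # a genuine error from a server that understood the request
    return await send_stable()

async def main():
    async def unstable():
        raise RuntimeError("M_UNRECOGNIZED")
    async def stable():
        return {"pdus": {}}
    result = await send_with_fallback(
        unstable, stable, lambda e: "M_UNRECOGNIZED" in str(e)
    )
    print(result)  # {'pdus': {}}

asyncio.run(main())
```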
@@ -134,6 +134,31 @@ class TransportLayerClient:
destination, path=path, timeout=timeout, try_trailing_slash_on_400=True
)

async def get_event_unstable(
self, destination: str, event_id: str, timeout: Optional[int] = None
) -> JsonDict:
"""Requests the pdu with the given id from the given server.

Args:
destination: The host name of the remote homeserver we want
to get the state from.
event_id: The id of the event being requested.
timeout: How long to try (in ms) the destination for before
giving up. None indicates no timeout.

Returns:
Results in a dict received from the remote homeserver.
"""
logger.debug("get_pdu dest=%s, event_id=%s", destination, event_id)

path = f"/_matrix/federation/unstable/org.matrix.i-d.ralston-mimi-linearized-matrix.02/event/{event_id}"
result = await self.client.get_json(
destination, path=path, timeout=timeout, try_trailing_slash_on_400=True
)
# Note that this has many callers; convert the result into the v1 response
# (i.e. a transaction).
return {"pdus": [result]}

async def backfill(
self, destination: str, room_id: str, event_tuples: Collection[str], limit: int
) -> Optional[Union[JsonDict, list]]:

@@ -171,6 +196,43 @@ class TransportLayerClient:
destination, path=path, args=args, try_trailing_slash_on_400=True
)

async def backfill_unstable(
self, destination: str, room_id: str, event_tuples: Collection[str], limit: int
) -> Optional[Union[JsonDict, list]]:
"""Requests `limit` previous PDUs in a given context before the given list of
PDUs.

Args:
destination
room_id
event_tuples:
Must be a Collection that is falsy when empty.
(Iterable is not enough here!)
limit

Returns:
Results in a dict received from the remote homeserver.
"""
logger.debug(
"backfill dest=%s, room_id=%s, event_tuples=%r, limit=%s",
destination,
room_id,
event_tuples,
str(limit),
)

if not event_tuples:
# TODO: raise?
return None

path = f"/_matrix/federation/unstable/org.matrix.i-d.ralston-mimi-linearized-matrix.02/backfill/{room_id}"

args = {"v": event_tuples, "limit": [str(limit)]}

return await self.client.get_json(
destination, path=path, args=args, try_trailing_slash_on_400=True
)
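For clarity, here is the request shape `backfill_unstable` builds, as a tiny standalone sketch. The unstable prefix constant is copied from the hunk; the helper name and sample values are hypothetical:

```python
UNSTABLE_PREFIX = (
    "/_matrix/federation/unstable/org.matrix.i-d.ralston-mimi-linearized-matrix.02"
)

def backfill_request(room_id: str, event_ids: list, limit: int) -> tuple:
    path = f"{UNSTABLE_PREFIX}/backfill/{room_id}"
    # `v` carries the events to backfill from; `limit` is stringified, as the
    # query-arg helpers expect lists of strings.
    args = {"v": event_ids, "limit": [str(limit)]}
    return path, args

print(backfill_request("!room:example.org", ["$abc", "$def"], 50))
```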
async def timestamp_to_event(
self, destination: str, room_id: str, timestamp: int, direction: Direction
) -> Union[JsonDict, List]:

@@ -253,6 +315,51 @@ class TransportLayerClient:
try_trailing_slash_on_400=True,
)

async def send_unstable_transaction(
self,
transaction: Transaction,
json_data_callback: Optional[Callable[[], JsonDict]] = None,
) -> JsonDict:
"""Sends the given Transaction to its destination

Args:
transaction

Returns:
Succeeds when we get a 2xx HTTP response. The result
will be the decoded JSON body.

Fails with ``HTTPRequestException`` if we get an HTTP response
code >= 300.

Fails with ``NotRetryingDestination`` if we are not yet ready
to retry this server.

Fails with ``FederationDeniedError`` if this destination
is not on our federation whitelist
"""
logger.debug(
"send_data dest=%s, txid=%s",
transaction.destination,
transaction.transaction_id,
)

if self._is_mine_server_name(transaction.destination):
raise RuntimeError("Transport layer cannot send to itself!")

path = f"/_matrix/federation/unstable/org.matrix.i-d.ralston-mimi-linearized-matrix.02/send/{transaction.transaction_id}"

result = await self.client.put_json(
transaction.destination,
path=path,
json_data_callback=json_data_callback,
long_retries=True,
backoff_on_404=True,  # If we get a 404 the other side has gone
try_trailing_slash_on_400=True,
)
# Convert the result to match the v1 result.
return {"pdus": result.get("failed_pdus", {})}
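The unstable `/send` response only reports failures, so the client re-shapes it into the v1 form callers expect. A toy illustration of that conversion (sample event IDs invented):

```python
def unstable_to_v1_send_response(result: dict) -> dict:
    # Mirrors the last line of the hunk above: missing "failed_pdus" means
    # every PDU succeeded, which maps to an empty v1 "pdus" map.
    return {"pdus": result.get("failed_pdus", {})}

print(unstable_to_v1_send_response({"failed_pdus": {"$bad": {"error": "rejected"}}}))
# {'pdus': {'$bad': {'error': 'rejected'}}}
print(unstable_to_v1_send_response({}))  # {'pdus': {}}
```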
async def make_query(
self,
destination: str,

@@ -373,6 +480,22 @@ class TransportLayerClient:
parser=SendJoinParser(room_version, v1_api=False),
)

async def send_join_unstable(
self,
room_version: RoomVersion,
destination: str,
txn_id: str,
content: JsonDict,
) -> "SendJoinResponse":
path = f"/_matrix/federation/unstable/org.matrix.i-d.ralston-mimi-linearized-matrix.02/send_join/{txn_id}"

return await self.client.post_json(
destination=destination,
path=path,
data=content,
parser=SendJoinParser(room_version, v1_api=False),
)

async def send_leave_v1(
self, destination: str, room_id: str, event_id: str, content: JsonDict
) -> Tuple[int, JsonDict]:

@@ -406,6 +529,22 @@ class TransportLayerClient:
ignore_backoff=True,
)

async def send_leave_unstable(
self, destination: str, txn_id: str, content: JsonDict
) -> JsonDict:
path = f"/_matrix/federation/unstable/org.matrix.i-d.ralston-mimi-linearized-matrix.02/send_leave/{txn_id}"

return await self.client.post_json(
destination=destination,
path=path,
data=content,
# we want to do our best to send this through. The problem is
# that if it fails, we won't retry it later, so if the remote
# server was just having a momentary blip, the room will be out of
# sync.
ignore_backoff=True,
)

async def send_knock_v1(
self,
destination: str,

@@ -439,6 +578,15 @@ class TransportLayerClient:
destination=destination, path=path, data=content
)

async def send_knock_unstable(
self, destination: str, txn_id: str, content: JsonDict
) -> JsonDict:
path = f"/_matrix/federation/unstable/org.matrix.i-d.ralston-mimi-linearized-matrix.02/send_knock/{txn_id}"

return await self.client.post_json(
destination=destination, path=path, data=content
)

async def send_invite_v1(
self, destination: str, room_id: str, event_id: str, content: JsonDict
) -> Tuple[int, JsonDict]:

@@ -461,6 +609,15 @@ class TransportLayerClient:
destination=destination, path=path, data=content, ignore_backoff=True
)

async def send_invite_unstable(
self, destination: str, txn_id: str, content: JsonDict
) -> JsonDict:
path = f"/_matrix/federation/unstable/org.matrix.i-d.ralston-mimi-linearized-matrix.02/invite/{txn_id}"

return await self.client.post_json(
destination=destination, path=path, data=content, ignore_backoff=True
)
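All of these unstable membership endpoints are keyed by a transaction id rather than by room id and event id, as the stable v1/v2 routes are. A small hypothetical path builder makes the common shape explicit (only the prefix and operation names come from the hunks above):

```python
UNSTABLE_PREFIX = (
    "/_matrix/federation/unstable/org.matrix.i-d.ralston-mimi-linearized-matrix.02"
)

def unstable_membership_path(operation: str, txn_id: str) -> str:
    # The operations added above; the txn_id replaces the v1 room_id/event_id pair.
    assert operation in {"send_join", "send_leave", "send_knock", "invite"}
    return f"{UNSTABLE_PREFIX}/{operation}/{txn_id}"

print(unstable_membership_path("send_join", "abc123"))
```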
async def get_public_rooms(
self,
remote_server: str,

@@ -35,6 +35,7 @@ from synapse.federation.transport.server._base import (
Authenticator,
BaseFederationServlet,
)
from synapse.federation.units import Transaction
from synapse.http.servlet import (
parse_boolean_from_args,
parse_integer_from_args,

@@ -67,6 +68,7 @@ class BaseFederationServerServlet(BaseFederationServlet):
):
super().__init__(hs, authenticator, ratelimiter, server_name)
self.handler = hs.get_federation_server()
self._clock = hs.get_clock()


class FederationSendServlet(BaseFederationServerServlet):

@@ -138,6 +140,48 @@ class FederationSendServlet(BaseFederationServerServlet):
return code, response


class FederationUnstableSendServlet(FederationSendServlet):
PREFIX = (
FEDERATION_UNSTABLE_PREFIX + "/org.matrix.i-d.ralston-mimi-linearized-matrix.02"
)

# This is when someone is trying to send us a bunch of data.
async def on_PUT(
self,
origin: str,
content: JsonDict,
query: Dict[bytes, List[bytes]],
transaction_id: str,
) -> Tuple[int, JsonDict]:
"""Called on PUT /send/<transaction_id>/

Args:
transaction_id: The transaction_id associated with this request. This
is *not* None.

Returns:
Tuple of `(code, response)`, where
`response` is a python dict to be converted into JSON that is
used as the response body.
"""

# The removed fields (origin and origin_server_ts) are unused, but the
# response is slightly different.

code, response = await super().on_PUT(origin, content, query, transaction_id)

# The response only includes failed PDUs.
response = {
"failed_pdus": {
event_id: result
for event_id, result in response["pdus"].items()
if "error" in result
}
}

return code, response
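The server-side counterpart of the client conversion shown earlier: the unstable send servlet filters the v1 response down to failures only, so a fully successful transaction returns an empty map. A runnable toy version of that filter (sample data invented):

```python
def only_failed_pdus(v1_response: dict) -> dict:
    # Same dict comprehension as in the servlet above.
    return {
        "failed_pdus": {
            event_id: result
            for event_id, result in v1_response["pdus"].items()
            if "error" in result
        }
    }

v1 = {"pdus": {"$ok": {}, "$bad": {"error": "Event rejected"}}}
print(only_failed_pdus(v1))
# {'failed_pdus': {'$bad': {'error': 'Event rejected'}}}
```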
class FederationEventServlet(BaseFederationServerServlet):
PATH = "/event/(?P<event_id>[^/]*)/?"
CATEGORY = "Federation requests"

@@ -150,7 +194,46 @@ class FederationEventServlet(BaseFederationServerServlet):
query: Dict[bytes, List[bytes]],
event_id: str,
) -> Tuple[int, Union[JsonDict, str]]:
return await self.handler.on_pdu_request(origin, event_id)
event = await self.handler.on_pdu_request(origin, event_id)

if event:
# Returns a new Transaction containing the given PDUs suitable for transmission.
time_now = self._clock.time_msec()
pdus = [event.get_pdu_json(time_now)]
return (
200,
Transaction(
# Just need a dummy transaction ID and destination since it won't be used.
transaction_id="",
origin=self.server_name,
pdus=pdus,
origin_server_ts=int(time_now),
destination="",
).get_dict(),
)

return 404, ""


class FederationUnstableEventServlet(BaseFederationServerServlet):
PREFIX = (
FEDERATION_UNSTABLE_PREFIX + "/org.matrix.i-d.ralston-mimi-linearized-matrix.02"
)
PATH = "/event/(?P<event_id>[^/]*)/?"
CATEGORY = "Federation requests"

# This is when someone asks for a data item for a given server data_id pair.
async def on_GET(
self,
origin: str,
content: Literal[None],
query: Dict[bytes, List[bytes]],
event_id: str,
) -> Tuple[int, Union[JsonDict, str]]:
event = await self.handler.on_pdu_request(origin, event_id)
if event:
return 200, event.get_dict()
return 404, ""


class FederationStateV1Servlet(BaseFederationServerServlet):

@@ -207,7 +290,52 @@ class FederationBackfillServlet(BaseFederationServerServlet):
if not limit:
return 400, {"error": "Did not include limit param"}

return await self.handler.on_backfill_request(origin, room_id, versions, limit)
pdu_list = await self.handler.on_backfill_request(
origin, room_id, versions, limit
)

# Returns a new Transaction containing the given PDUs suitable for transmission.
time_now = self._clock.time_msec()
pdus = [p.get_pdu_json(time_now) for p in pdu_list]
return (
200,
Transaction(
# Just need a dummy transaction ID and destination since it won't be used.
transaction_id="",
origin=self.server_name,
pdus=pdus,
origin_server_ts=int(time_now),
destination="",
).get_dict(),
)


class FederationUnstableBackfillServlet(BaseFederationServerServlet):
PREFIX = (
FEDERATION_UNSTABLE_PREFIX + "/org.matrix.i-d.ralston-mimi-linearized-matrix.02"
)
PATH = "/backfill/(?P<room_id>[^/]*)/?"
CATEGORY = "Federation requests"

async def on_GET(
self,
origin: str,
content: Literal[None],
query: Dict[bytes, List[bytes]],
room_id: str,
) -> Tuple[int, JsonDict]:
versions = [x.decode("ascii") for x in query[b"v"]]
# TODO(LM) Only a single version is allowed for Linearized Matrix.
limit = parse_integer_from_args(query, "limit", None)

if not limit:
return 400, {"error": "Did not include limit param"}

pdu_list = await self.handler.on_backfill_request(
origin, room_id, versions, limit
)

return 200, {"pdus": [p.get_pdu_json() for p in pdu_list]}

class FederationTimestampLookupServlet(BaseFederationServerServlet):

@@ -354,6 +482,25 @@ class FederationV2SendLeaveServlet(BaseFederationServerServlet):
return 200, result


class FederationUnstableSendLeaveServlet(BaseFederationServerServlet):
PREFIX = (
FEDERATION_UNSTABLE_PREFIX + "/org.matrix.i-d.ralston-mimi-linearized-matrix.02"
)
PATH = "/send_leave/(?P<txn_id>[^/]*)"
CATEGORY = "Federation requests"

async def on_POST(
self,
origin: str,
content: JsonDict,
query: Dict[bytes, List[bytes]],
txn_id: str,
) -> Tuple[int, JsonDict]:
# TODO Use the txn_id for idempotency.
result = await self.handler.on_send_leave_request(origin, content)
return 200, result


class FederationMakeKnockServlet(BaseFederationServerServlet):
PATH = "/make_knock/(?P<room_id>[^/]*)/(?P<user_id>[^/]*)"
CATEGORY = "Federation requests"

@@ -393,6 +540,25 @@ class FederationV1SendKnockServlet(BaseFederationServerServlet):
return 200, result


class FederationUnstableSendKnockServlet(BaseFederationServerServlet):
PREFIX = (
FEDERATION_UNSTABLE_PREFIX + "/org.matrix.i-d.ralston-mimi-linearized-matrix.02"
)
PATH = "/send_knock/(?P<txn_id>[^/]*)"
CATEGORY = "Federation requests"

async def on_POST(
self,
origin: str,
content: JsonDict,
query: Dict[bytes, List[bytes]],
txn_id: str,
) -> Tuple[int, JsonDict]:
# TODO Use the txn_id for idempotency.
result = await self.handler.on_send_knock_request(origin, content)
return 200, {"stripped_state": result["knock_room_state"]}


class FederationEventAuthServlet(BaseFederationServerServlet):
PATH = "/event_auth/(?P<room_id>[^/]*)/(?P<event_id>[^/]*)"
CATEGORY = "Federation requests"

@@ -451,6 +617,30 @@ class FederationV2SendJoinServlet(BaseFederationServerServlet):
return 200, result


class FederationUnstableSendJoinServlet(BaseFederationServerServlet):
PREFIX = (
FEDERATION_UNSTABLE_PREFIX + "/org.matrix.i-d.ralston-mimi-linearized-matrix.02"
)
PATH = "/send_join/(?P<txn_id>[^/]*)"
CATEGORY = "Federation requests"

async def on_POST(
self,
origin: str,
content: JsonDict,
query: Dict[bytes, List[bytes]],
txn_id: str,
) -> Tuple[int, JsonDict]:
# TODO Use the txn_id for idempotency.

result = await self.handler.on_send_join_request(origin, content)
return 200, {
"event": result["event"],
"state": result["state"],
"auth_chain": result["auth_chain"],
}

class FederationV1InviteServlet(BaseFederationServerServlet):
PATH = "/invite/(?P<room_id>[^/]*)/(?P<event_id>[^/]*)"
CATEGORY = "Federation requests"

@@ -513,6 +703,42 @@ class FederationV2InviteServlet(BaseFederationServerServlet):
return 200, result


class FederationUnstableInviteServlet(BaseFederationServerServlet):
PREFIX = (
FEDERATION_UNSTABLE_PREFIX + "/org.matrix.i-d.ralston-mimi-linearized-matrix.02"
)
PATH = "/invite/(?P<txn_id>[^/]*)"
CATEGORY = "Federation requests"

async def on_POST(
self,
origin: str,
content: JsonDict,
query: Dict[bytes, List[bytes]],
txn_id: str,
) -> Tuple[int, JsonDict]:
# TODO Use the txn_id for idempotency.

room_version = content["room_version"]
event = content["event"]
invite_room_state = content.get("invite_room_state", [])

# Synapse expects invite_room_state to be in unsigned, as it is in the
# v1 API.

event.setdefault("unsigned", {})["invite_room_state"] = invite_room_state

result = await self.handler.on_invite_request(
origin, event, room_version_id=room_version
)

# We only store invite_room_state for internal use, so remove it before
# returning the event to the remote homeserver.
result["event"].get("unsigned", {}).pop("invite_room_state", None)

return 200, result

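The invite servlet above shuffles the stripped room state from the unstable top-level field into `unsigned` for the internal v1-style handler, then strips it again before replying. A toy, non-Synapse sketch of that round trip:

```python
# Hypothetical helpers illustrating the invite_room_state round trip.
def wrap_invite(content: dict) -> dict:
    event = content["event"]
    event.setdefault("unsigned", {})["invite_room_state"] = content.get(
        "invite_room_state", []
    )
    return event

def unwrap_invite(signed_event: dict) -> dict:
    # Drop the internal-only field before sending the event back out.
    signed_event.get("unsigned", {}).pop("invite_room_state", None)
    return signed_event

event = wrap_invite(
    {"event": {"type": "m.room.member"}, "invite_room_state": [{"type": "m.room.name"}]}
)
print(unwrap_invite(event))  # {'type': 'm.room.member', 'unsigned': {}}
```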
class FederationThirdPartyInviteExchangeServlet(BaseFederationServerServlet):
PATH = "/exchange_third_party_invite/(?P<room_id>[^/]*)"
CATEGORY = "Federation requests"

@@ -552,6 +778,33 @@ class FederationUserDevicesQueryServlet(BaseFederationServerServlet):
return await self.handler.on_query_user_devices(origin, user_id)


class FederationUnstableUserDeviceQueryServlet(BaseFederationServerServlet):
PREFIX = "unstable/org.matrix.i-d.ralston-mimi-linearized-matrix.02"
PATH = "/user/(?P<user_id>[^/]*)/device/(?P<device_id>[^/]*)"
CATEGORY = "Federation requests"

async def on_GET(
self,
origin: str,
content: Literal[None],
query: Dict[bytes, List[bytes]],
user_id: str,
device_id: str,
) -> Tuple[int, JsonDict]:
# TODO This is inefficient; we should only query the individual device.
_, result = await self.handler.on_query_user_devices(origin, user_id)

# TODO 404 if not a local user or unknown user.
devices = result["devices"]
if device_id not in devices:
return 404, {}

return 200, {
"user_id": user_id,
**devices[device_id],
}

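The per-device query servlet carves a single device out of the full user-devices result and merges the user id into the response. A small runnable sketch of that selection, with invented data shapes:

```python
def single_device_response(user_id: str, device_id: str, result: dict):
    devices = result["devices"]
    if device_id not in devices:
        return 404, {}
    # Merge the user_id into the device record, as the servlet above does.
    return 200, {"user_id": user_id, **devices[device_id]}

result = {"devices": {"DEV1": {"device_id": "DEV1", "keys": {}}}}
print(single_device_response("@alice:example.org", "DEV1", result))
print(single_device_response("@alice:example.org", "DEV2", result))  # (404, {})
```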
class FederationClientKeysClaimServlet(BaseFederationServerServlet):
PATH = "/user/keys/claim"
CATEGORY = "Federation requests"

@@ -811,4 +1064,13 @@ FEDERATION_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = (
FederationV1SendKnockServlet,
FederationMakeKnockServlet,
FederationAccountStatusServlet,
# TODO(LM) Linearized Matrix additions.
FederationUnstableSendServlet,
FederationUnstableEventServlet,
FederationUnstableBackfillServlet,
FederationUnstableInviteServlet,
FederationUnstableSendJoinServlet,
FederationUnstableSendLeaveServlet,
FederationUnstableSendKnockServlet,
FederationUnstableUserDeviceQueryServlet,
)

@@ -551,6 +551,9 @@ class FederationHandler:
)
except RequestSendFailed:
raise SynapseError(502, f"Can't connect to server {target_host}")
except HttpResponseException as e:
logger.error("Error sending invite to '%s': %s", target_host, e.response)
raise e

return pdu

@@ -662,7 +665,9 @@ class FederationHandler:
origin = ret.origin
state = ret.state
auth_chain = ret.auth_chain
auth_chain.sort(key=lambda e: e.depth)
# TODO(LM) Assume the auth chain is reasonably ordered.
if not room_version_obj.linearized_matrix:
auth_chain.sort(key=lambda e: e.depth)

logger.debug("do_invite_join auth_chain: %s", auth_chain)
logger.debug("do_invite_join state: %s", state)

@@ -897,7 +902,7 @@ class FederationHandler:
)

async def on_make_join_request(
self, origin: str, room_id: str, user_id: str
self, origin: str, room_id: str, room_version: RoomVersion, user_id: str
) -> EventBase:
"""We've received a /make_join/ request, so we create a partial
join event for the room and return that. We do *not* persist or

@@ -906,6 +911,7 @@ class FederationHandler:
Args:
origin: The (verified) server name of the requesting server.
room_id: Room to create join event in
room_version: The room's room version.
user_id: The user to create the join for
"""
if get_domain_from_id(user_id) != origin:

@@ -916,10 +922,6 @@ class FederationHandler:
)
raise SynapseError(403, "User not from origin", Codes.FORBIDDEN)

# checking the room version will check that we've actually heard of the room
# (and return a 404 otherwise)
room_version = await self.store.get_room_version(room_id)

if await self.store.is_partial_state_room(room_id):
# If our server is still only partially joined, we can't give a complete
# response to /make_join, so return a 404 as we would if we weren't in the

@@ -995,16 +997,15 @@ class FederationHandler:
state_ids,
)

builder = self.event_builder_factory.for_room_version(
room_version,
{
"type": EventTypes.Member,
"content": event_content,
"room_id": room_id,
"sender": user_id,
"state_key": user_id,
},
)
event_dict = {
"type": EventTypes.Member,
"content": event_content,
"room_id": room_id,
"sender": user_id,
"state_key": user_id,
}

builder = self.event_builder_factory.for_room_version(room_version, event_dict)

try:
(
@@ -495,6 +495,9 @@ class FederationEventHandler:
PartialStateConflictError if the homeserver is already in the room and it
has been un-partial stated.
"""
# TODO(LM) LM server currently includes non-state events.
state = [e for e in state if e.is_state()]

create_event = None
for e in state:
if (e.type, e.state_key) == (EventTypes.Create, ""):

@@ -1011,6 +1011,7 @@ class MatrixFederationHttpClient:

return body

@overload
async def post_json(
self,
destination: str,

@@ -1020,7 +1021,35 @@ class MatrixFederationHttpClient:
timeout: Optional[int] = None,
ignore_backoff: bool = False,
args: Optional[QueryParams] = None,
parser: Literal[None] = None,
) -> JsonDict:
...

@overload
async def post_json(
self,
destination: str,
path: str,
data: Optional[JsonDict] = None,
long_retries: bool = False,
timeout: Optional[int] = None,
ignore_backoff: bool = False,
args: Optional[QueryParams] = None,
parser: Optional[ByteParser[T]] = None,
) -> T:
...

async def post_json(
self,
destination: str,
path: str,
data: Optional[JsonDict] = None,
long_retries: bool = False,
timeout: Optional[int] = None,
ignore_backoff: bool = False,
args: Optional[QueryParams] = None,
parser: Optional[ByteParser[T]] = None,
) -> Union[JsonDict, T]:
"""Sends the specified json data using POST

Args:

@@ -1046,6 +1075,9 @@ class MatrixFederationHttpClient:
try the request anyway.

args: query params

parser: The parser to use to decode the response. Defaults to
parsing as JSON.
Returns:
Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body.

@@ -1078,8 +1110,11 @@ class MatrixFederationHttpClient:
else:
_sec_timeout = self.default_timeout_seconds

if parser is None:
parser = cast(ByteParser[T], JsonParser())

body = await _handle_response(
self.reactor, _sec_timeout, request, response, start_ms, parser=JsonParser()
self.reactor, _sec_timeout, request, response, start_ms, parser=parser
)
return body

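The `post_json` change above adds an optional response parser that defaults to JSON, mirroring what `get_json`/`put_json` already do. A generic sketch of that optional-parser pattern, with plain callables standing in for Synapse's `ByteParser`/`JsonParser` classes:

```python
import json
from typing import Callable, Optional, TypeVar

T = TypeVar("T")

def parse_body(raw: bytes, parser: Optional[Callable[[bytes], T]] = None):
    # Fall back to a JSON parser when the caller does not supply one,
    # otherwise hand the raw bytes to the custom parser.
    if parser is None:
        parser = json.loads
    return parser(raw)

print(parse_body(b'{"ok": true}'))                           # {'ok': True}
print(parse_body(b"a,b", lambda b: b.decode().split(",")))   # ['a', 'b']
```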
@@ -38,7 +38,6 @@ from synapse.api.ratelimiting import Ratelimiter, RequestRatelimiter
from synapse.appservice.api import ApplicationServiceApi
from synapse.appservice.scheduler import ApplicationServiceScheduler
from synapse.config.homeserver import HomeServerConfig
from synapse.crypto import context_factory
from synapse.crypto.context_factory import RegularPolicyForHTTPS
from synapse.crypto.keyring import Keyring
from synapse.events.builder import EventBuilderFactory

@@ -475,9 +474,11 @@ class HomeServer(metaclass=abc.ABCMeta):
"""
An HTTP client for federation.
"""
tls_client_options_factory = context_factory.FederationPolicyForHTTPS(
self.config
)
# XXX Disable TLS for federation.
# tls_client_options_factory = context_factory.FederationPolicyForHTTPS(
#     self.config
# )
tls_client_options_factory = None
return MatrixFederationHttpClient(self, tls_client_options_factory)

@cache_in_self

@@ -21,6 +21,7 @@ from twisted.internet import defer
from twisted.test.proto_helpers import MemoryReactor

from synapse.api.constants import EduTypes, RoomEncryptionAlgorithms
from synapse.api.errors import HttpResponseException
from synapse.federation.units import Transaction
from synapse.handlers.device import DeviceHandler
from synapse.rest import admin

@@ -274,7 +275,7 @@ class FederationSenderDevicesTestCases(HomeserverTestCase):

def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
self.federation_transport_client = Mock(
spec=["send_transaction", "query_user_devices"]
spec=["send_transaction", "send_unstable_transaction", "query_user_devices"]
)
return self.setup_test_homeserver(
federation_transport_client=self.federation_transport_client,

@@ -313,6 +314,11 @@ class FederationSenderDevicesTestCases(HomeserverTestCase):

# whenever send_transaction is called, record the edu data
self.edus: List[JsonDict] = []
self.federation_transport_client.send_unstable_transaction.side_effect = (
HttpResponseException(
404, "Unknown", response=b'{"errcode":"M_UNRECOGNIZED"}'
)
)
self.federation_transport_client.send_transaction.side_effect = (
self.record_transaction
)
@@ -27,7 +27,7 @@ from synapse.api.errors import (
NotFoundError,
SynapseError,
)
from synapse.api.room_versions import RoomVersions
from synapse.api.room_versions import RoomVersion, RoomVersions
from synapse.events import EventBase, make_event_from_dict
from synapse.federation.federation_base import event_from_pdu_json
from synapse.federation.federation_client import SendJoinResult

@@ -124,7 +124,9 @@ class FederationTestCase(unittest.FederatingHomeserverTestCase):
room_version = self.get_success(self.store.get_room_version(room_id))

# pretend that another server has joined
join_event = self._build_and_send_join_event(OTHER_SERVER, OTHER_USER, room_id)
join_event = self._build_and_send_join_event(
OTHER_SERVER, OTHER_USER, room_id, room_version
)

# check the state group
sg = self.get_success(

@@ -177,7 +179,9 @@ class FederationTestCase(unittest.FederatingHomeserverTestCase):
room_version = self.get_success(self.store.get_room_version(room_id))

# pretend that another server has joined
join_event = self._build_and_send_join_event(OTHER_SERVER, OTHER_USER, room_id)
join_event = self._build_and_send_join_event(
OTHER_SERVER, OTHER_USER, room_id, room_version
)

# check the state group
sg = self.get_success(

@@ -479,10 +483,16 @@ class FederationTestCase(unittest.FederatingHomeserverTestCase):
)

def _build_and_send_join_event(
self, other_server: str, other_user: str, room_id: str
self,
other_server: str,
other_user: str,
room_id: str,
room_version: RoomVersion,
) -> EventBase:
join_event = self.get_success(
self.handler.on_make_join_request(other_server, room_id, other_user)
self.handler.on_make_join_request(
other_server, room_id, room_version, other_user
)
)
# the auth code requires that a signature exists, but doesn't check that
# signature... go figure.

@@ -85,6 +85,7 @@ class FakeEvent:
self.state_key = state_key
self.content = content
self.room_id = ROOM_ID
self.hub_server = None

def to_event(self, auth_events: List[str], prev_events: List[str]) -> EventBase:
"""Given the auth_events and prev_events, convert to a Frozen Event
