1
0

Compare commits

...

35 Commits

Author SHA1 Message Date
Patrick Cloke
7c70e00393 Only return LPDUs to linearized matrix servers. 2023-07-26 15:48:42 -04:00
Patrick Cloke
6522bbb4b3 Actually fetch if a server is linearized. 2023-07-26 15:13:21 -04:00
Patrick Cloke
02867aeb25 Merge remote-tracking branch 'origin/develop' into clokep/lm 2023-07-26 12:45:17 -04:00
Patrick Cloke
ed5b522085 FIx-up content hash checking for PDUs vs. delegated PDUs. 2023-07-25 14:29:10 -04:00
Andrew Morgan
8d5ab38584 Print out error from remote homeserver if failed to invite 2023-07-22 22:29:20 +01:00
Andrew Morgan
73589e86ef Use room version, not "hub_server" field when adding/validating "origin"
As other events sent in a LM room that are not on behalf of
participating servers will not have this field.
2023-07-22 22:29:20 +01:00
Andrew Morgan
ebb76e6917 If we are acting on a PDU, keep the "lpdu" hash intact.
To match the logic defined by section 9.1.1 of draft-ralston-mimi-linearized-matrix-03.
2023-07-20 17:26:57 +01:00
Andrew Morgan
98ff8d6f30 Fix a couple type hints. 2023-07-20 17:13:15 +01:00
Andrew Morgan
cbf930edf8 Do not add origin field to LPDUs.
Synapse has a bad habit of adding `origin` to the top-level of event content.

The Linearized Matrix spec does not include `origin`, and in fact cannot as that will cause the content hashes and signatures computed by the participant server to differ from Synapse's own results.

So, we perform a hack here to drop `origin` from any event derived from an LPDU event. Other DAG homeserver implementations might be sad however.
2023-07-20 17:12:49 +01:00
Travis Ralston
565b9f983d Fix https://github.com/matrix-org/synapse/issues/15962
Not breaking anything in LM, but might as well while we're here.
2023-07-19 15:49:11 -06:00
Travis Ralston
1e77a62c1d Fix(?) MSC2176 implementation
Pending resolution of https://matrix.to/#/!NasysSDfxKxZBzJJoE:matrix.org/$yCZra9xATiCfKehho6N52m_22ag9TD_tMoqX3GXEwL0?via=matrix.org&via=libera.chat&via=element.io
2023-07-19 15:48:32 -06:00
Patrick Cloke
c50cb6e391 Add note 2023-07-18 12:13:43 -04:00
Patrick Cloke
e0164d9a21 Linting. 2023-07-18 11:57:05 -04:00
Patrick Cloke
36cd55f6a0 Sort auth/state. 2023-07-18 11:56:49 -04:00
Patrick Cloke
98c864bd46 Do not return depth. 2023-07-18 11:56:16 -04:00
Patrick Cloke
b04530ebd3 Fix accepting an invite. 2023-07-18 11:41:50 -04:00
Patrick Cloke
ab75a7d2ac Fix creating events from LPDUs. 2023-07-18 10:12:55 -04:00
Patrick Cloke
3fc09bb13a Remove unsigned and depth in more places. 2023-07-18 09:50:05 -04:00
Patrick Cloke
dcd3d5cdc6 Provide templated LPDUs for make_{join,leave,knock} requests. 2023-07-17 11:05:44 -04:00
Patrick Cloke
547a7076b4 Accept LPDUs in transactions and fan them back out. 2023-07-17 11:05:44 -04:00
Patrick Cloke
c74a073389 Convert new-style EDUs to old-style EDUs. 2023-07-17 11:05:44 -04:00
Patrick Cloke
b90edc168e Implement new device endpoint. 2023-07-17 11:05:44 -04:00
Patrick Cloke
48193d339f Implement new send endpoint. 2023-07-17 11:05:44 -04:00
Patrick Cloke
ae26625694 Implement new invite endpoint. 2023-07-17 11:05:44 -04:00
Patrick Cloke
a2d697b745 Implement new send_{join,leave,knock} endpoints. 2023-07-17 11:05:44 -04:00
Patrick Cloke
23cd415b9e Implement new event and backfill endpoints. 2023-07-17 11:05:43 -04:00
Patrick Cloke
b38ba4a8b1 Handle LPDU content hash. 2023-07-17 11:05:43 -04:00
Patrick Cloke
2c7001679e Ignore non-state events sent in state. 2023-07-17 11:05:43 -04:00
Patrick Cloke
c02f115306 Linearized Matrix events do not have a depth. 2023-07-17 11:05:43 -04:00
Patrick Cloke
ddd3d43049 Remove references to m.room.hub events. 2023-07-17 11:05:43 -04:00
Patrick Cloke
cf26b9f897 Update for v2. 2023-07-17 11:05:43 -04:00
Patrick Cloke
24647487a0 Revert "Initial routing of events." 2023-07-17 11:05:43 -04:00
Patrick Cloke
b33bea983c Initial routing of events. 2023-07-17 11:05:43 -04:00
Patrick Cloke
a93540b60d Disable TLS over federation. 2023-07-17 11:05:43 -04:00
Patrick Cloke
123b63a443 Initial cut at signature verification. 2023-07-17 11:05:43 -04:00
22 changed files with 1275 additions and 191 deletions

View File

@@ -30,12 +30,14 @@ class EventFormatVersions:
ROOM_V1_V2 = 1 # $id:server event id format: used for room v1 and v2
ROOM_V3 = 2 # MSC1659-style $hash event id format: used for room v3
ROOM_V4_PLUS = 3 # MSC1884-style $hash format: introduced for room v4
LINEARIZED = 4 # Delegated Linearized event
KNOWN_EVENT_FORMAT_VERSIONS = {
EventFormatVersions.ROOM_V1_V2,
EventFormatVersions.ROOM_V3,
EventFormatVersions.ROOM_V4_PLUS,
EventFormatVersions.LINEARIZED,
}
@@ -101,6 +103,7 @@ class RoomVersion:
# support the flag. Unknown flags are ignored by the evaluator, making conditions
# fail if used.
msc3931_push_features: Tuple[str, ...] # values from PushRuleRoomFlag
linearized_matrix: bool
class RoomVersions:
@@ -122,6 +125,7 @@ class RoomVersions:
knock_restricted_join_rule=False,
enforce_int_power_levels=False,
msc3931_push_features=(),
linearized_matrix=False,
)
V2 = RoomVersion(
"2",
@@ -141,6 +145,7 @@ class RoomVersions:
knock_restricted_join_rule=False,
enforce_int_power_levels=False,
msc3931_push_features=(),
linearized_matrix=False,
)
V3 = RoomVersion(
"3",
@@ -160,6 +165,7 @@ class RoomVersions:
knock_restricted_join_rule=False,
enforce_int_power_levels=False,
msc3931_push_features=(),
linearized_matrix=False,
)
V4 = RoomVersion(
"4",
@@ -179,6 +185,7 @@ class RoomVersions:
knock_restricted_join_rule=False,
enforce_int_power_levels=False,
msc3931_push_features=(),
linearized_matrix=False,
)
V5 = RoomVersion(
"5",
@@ -198,6 +205,7 @@ class RoomVersions:
knock_restricted_join_rule=False,
enforce_int_power_levels=False,
msc3931_push_features=(),
linearized_matrix=False,
)
V6 = RoomVersion(
"6",
@@ -217,6 +225,7 @@ class RoomVersions:
knock_restricted_join_rule=False,
enforce_int_power_levels=False,
msc3931_push_features=(),
linearized_matrix=False,
)
V7 = RoomVersion(
"7",
@@ -236,6 +245,7 @@ class RoomVersions:
knock_restricted_join_rule=False,
enforce_int_power_levels=False,
msc3931_push_features=(),
linearized_matrix=False,
)
V8 = RoomVersion(
"8",
@@ -255,6 +265,7 @@ class RoomVersions:
knock_restricted_join_rule=False,
enforce_int_power_levels=False,
msc3931_push_features=(),
linearized_matrix=False,
)
V9 = RoomVersion(
"9",
@@ -274,6 +285,7 @@ class RoomVersions:
knock_restricted_join_rule=False,
enforce_int_power_levels=False,
msc3931_push_features=(),
linearized_matrix=False,
)
V10 = RoomVersion(
"10",
@@ -293,6 +305,7 @@ class RoomVersions:
knock_restricted_join_rule=True,
enforce_int_power_levels=True,
msc3931_push_features=(),
linearized_matrix=False,
)
MSC1767v10 = RoomVersion(
# MSC1767 (Extensible Events) based on room version "10"
@@ -313,6 +326,7 @@ class RoomVersions:
knock_restricted_join_rule=True,
enforce_int_power_levels=True,
msc3931_push_features=(PushRuleRoomFlag.EXTENSIBLE_EVENTS,),
linearized_matrix=False,
)
V11 = RoomVersion(
"11",
@@ -332,6 +346,32 @@ class RoomVersions:
knock_restricted_join_rule=True,
enforce_int_power_levels=True,
msc3931_push_features=(),
linearized_matrix=False,
)
# Based on room version 11:
#
# - Enable MSC2176, MSC2175, MSC3989, MSC2174, MSC1767, MSC3821.
# - Disable 'restricted' and 'knock_restricted' join rules.
# - Mark as linearized.
LINEARIZED = RoomVersion(
"org.matrix.i-d.ralston-mimi-linearized-matrix.02",
RoomDisposition.UNSTABLE,
EventFormatVersions.LINEARIZED,
StateResolutionVersions.V2,
enforce_key_validity=True,
special_case_aliases_auth=False,
strict_canonicaljson=True,
limit_notifications_power_levels=True,
implicit_room_creator=True,
updated_redaction_rules=True,
restricted_join_rule=True,
restricted_join_rule_fix=True,
knock_join_rule=True,
msc3389_relation_redactions=False,
knock_restricted_join_rule=True,
enforce_int_power_levels=True,
msc3931_push_features=(),
linearized_matrix=True,
)
@@ -349,6 +389,7 @@ KNOWN_ROOM_VERSIONS: Dict[str, RoomVersion] = {
RoomVersions.V9,
RoomVersions.V10,
RoomVersions.V11,
RoomVersions.LINEARIZED,
)
}

View File

@@ -36,31 +36,38 @@ logger = logging.getLogger(__name__)
Hasher = Callable[[bytes], "hashlib._Hash"]
@trace
def check_event_content_hash(
event: EventBase, hash_algorithm: Hasher = hashlib.sha256
def _check_dict_hash(
event_id: str,
hash_log: str,
hashes: Any,
d: JsonDict,
hash_algorithm: Hasher,
lpdu_hash: bool,
) -> bool:
"""Check whether the hash for this PDU matches the contents"""
name, expected_hash = compute_content_hash(event.get_pdu_json(), hash_algorithm)
name, expected_hash = compute_content_hash(d, hash_algorithm, lpdu_hash)
logger.debug(
"Verifying content hash on %s (expecting: %s)",
event.event_id,
"Verifying %s hash on %s (expecting: %s)",
hash_log,
event_id,
encode_base64(expected_hash),
)
# some malformed events lack a 'hashes'. Protect against it being missing
# or a weird type by basically treating it the same as an unhashed event.
hashes = event.get("hashes")
# nb it might be a immutabledict or a dict
if not isinstance(hashes, collections.abc.Mapping):
raise SynapseError(
400, "Malformed 'hashes': %s" % (type(hashes),), Codes.UNAUTHORIZED
400,
"Malformed %s hashes: %s"
% (
hash_log,
type(hashes),
),
Codes.UNAUTHORIZED,
)
if name not in hashes:
raise SynapseError(
400,
"Algorithm %s not in hashes %s" % (name, list(hashes)),
"Algorithm %s not in %s hashes %s" % (name, hash_log, list(hashes)),
Codes.UNAUTHORIZED,
)
message_hash_base64 = hashes[name]
@@ -73,8 +80,45 @@ def check_event_content_hash(
return message_hash_bytes == expected_hash
@trace
def check_event_content_hash(
event: EventBase, hash_algorithm: Hasher = hashlib.sha256
) -> bool:
"""Check whether the hash for this PDU matches the contents"""
# some malformed events lack a 'hashes'. Protect against it being missing
# or a weird type by basically treating it the same as an unhashed event.
hashes = event.get("hashes")
if not _check_dict_hash(
event.event_id,
"content",
hashes,
event.get_pdu_json(),
hash_algorithm,
lpdu_hash=False,
):
return False
# Check the content hash of the LPDU, if this was sent via a hub.
if event.room_version.linearized_matrix and event.hub_server:
# hashes must be a dictionary to have passed _check_dict_hash above.
lpdu_hashes = hashes.get("lpdu")
return _check_dict_hash(
event.event_id,
"linearized content",
lpdu_hashes,
event.get_linearized_pdu_json(),
hash_algorithm,
lpdu_hash=True,
)
# Non-linearized matrix doesn't care about other checks.
return True
def compute_content_hash(
event_dict: Dict[str, Any], hash_algorithm: Hasher
event_dict: Dict[str, Any], hash_algorithm: Hasher, lpdu_hash: False
) -> Tuple[str, bytes]:
"""Compute the content hash of an event, which is the hash of the
unredacted event.
@@ -83,6 +127,7 @@ def compute_content_hash(
event_dict: The unredacted event as a dict
hash_algorithm: A hasher from `hashlib`, e.g. hashlib.sha256, to use
to hash the event
lpdu_hash: True if the LPDU hash is being calculated.
Returns:
A tuple of the name of hash and the hash as raw bytes.
@@ -91,7 +136,17 @@ def compute_content_hash(
event_dict.pop("age_ts", None)
event_dict.pop("unsigned", None)
event_dict.pop("signatures", None)
event_dict.pop("hashes", None)
hashes = event_dict.pop("hashes", {})
# If we are calculating the content hash of a PDU sent on behalf of another
# server, hashes only`contains the lpdu hash.
#
# TODO(LM) This is wrong in that an incorrectly formatted event could have
# bad content hashes from this.
if not lpdu_hash and "lpdu" in hashes:
event_dict["hashes"] = {"lpdu": hashes["lpdu"]}
event_dict.pop("outlier", None)
event_dict.pop("destinations", None)
@@ -176,7 +231,10 @@ def add_hashes_and_signatures(
signing_key: The key to sign with
"""
name, digest = compute_content_hash(event_dict, hash_algorithm=hashlib.sha256)
# Synapse only ever generates PDUs, not LPDUs.
name, digest = compute_content_hash(
event_dict, hash_algorithm=hashlib.sha256, lpdu_hash=False
)
event_dict.setdefault("hashes", {})[name] = encode_base64(digest)

View File

@@ -49,6 +49,7 @@ from synapse.types import JsonDict
from synapse.util import unwrapFirstError
from synapse.util.async_helpers import yieldable_gather_results
from synapse.util.batching_queue import BatchingQueue
from synapse.util.caches.descriptors import cached
from synapse.util.retryutils import NotRetryingDestination
if TYPE_CHECKING:
@@ -189,6 +190,39 @@ class Keyring:
valid_until_ts=2**63, # fake future timestamp
)
self._client = hs.get_federation_http_client()
@cached()
async def is_server_linearized(self, server_name: str) -> bool:
# TODO(LM) Cache this in the database.
# TODO(LM) Support perspectives server.
try:
response = await self._client.get_json(
destination=server_name,
path="/_matrix/key/v2/server",
ignore_backoff=True,
# we only give the remote server 10s to respond. It should be an
# easy request to handle, so if it doesn't reply within 10s, it's
# probably not going to.
#
# Furthermore, when we are acting as a notary server, we cannot
# wait all day for all of the origin servers, as the requesting
# server will otherwise time out before we can respond.
#
# (Note that get_json may make 4 attempts, so this can still take
# almost 45 seconds to fetch the headers, plus up to another 60s to
# read the response).
timeout=10000,
)
except (NotRetryingDestination, RequestSendFailed) as e:
# these both have str() representations which we can't really improve
# upon
raise KeyLookupError(str(e))
except HttpResponseException as e:
raise KeyLookupError("Remote server returned an error: %s" % (e,))
return response.get("m.linearized", False)
async def verify_json_for_server(
self,
server_name: str,

View File

@@ -117,7 +117,7 @@ def validate_event_for_room_version(event: "EventBase") -> None:
if not is_invite_via_3pid:
raise AuthError(403, "Event not signed by sender's server")
if event.format_version in (EventFormatVersions.ROOM_V1_V2,):
if event.format_version == EventFormatVersions.ROOM_V1_V2:
# Only older room versions have event IDs to check.
event_id_domain = get_domain_from_id(event.event_id)
@@ -125,6 +125,15 @@ def validate_event_for_room_version(event: "EventBase") -> None:
if not event.signatures.get(event_id_domain):
raise AuthError(403, "Event not signed by sending server")
if event.format_version == EventFormatVersions.LINEARIZED:
# TODO Are these handling DAG-native events properly? Is the null-checks
# a bypass?
# Check that the hub server signed it.
hub_server = event.hub_server
if hub_server and not event.signatures.get(hub_server):
raise AuthError(403, "Event not signed by hub server")
is_invite_via_allow_rule = (
event.room_version.restricted_join_rule
and event.type == EventTypes.Member
@@ -277,6 +286,13 @@ def check_state_dependent_auth_rules(
auth_dict = {(e.type, e.state_key): e for e in auth_events}
# If the event was sent from a hub server it must be the current hub server.
if event.room_version.linearized_matrix:
if event.hub_server:
current_hub_server = get_hub_server(auth_dict)
if current_hub_server != event.hub_server:
raise AuthError(403, "Event sent from incorrect hub server.")
# additional check for m.federate
creating_domain = get_domain_from_id(event.room_id)
originating_domain = get_domain_from_id(event.sender)
@@ -1063,6 +1079,12 @@ def _verify_third_party_invite(
return False
def get_hub_server(auth_events: StateMap["EventBase"]) -> str:
# The current hub is the sender of the create event.
create_event = auth_events[(EventTypes.Create, "")]
return get_domain_from_id(create_event.sender)
def get_public_keys(invite_event: "EventBase") -> List[Dict[str, Any]]:
public_keys = []
if "public_key" in invite_event.content:

View File

@@ -39,7 +39,7 @@ from unpaddedbase64 import encode_base64
from synapse.api.constants import RelationTypes
from synapse.api.room_versions import EventFormatVersions, RoomVersion, RoomVersions
from synapse.types import JsonDict, RoomStreamToken, StrCollection
from synapse.types import JsonDict, RoomStreamToken, StrCollection, get_domain_from_id
from synapse.util.caches import intern_dict
from synapse.util.frozenutils import freeze
from synapse.util.stringutils import strtobool
@@ -334,11 +334,18 @@ class EventBase(metaclass=abc.ABCMeta):
state_key: DictProperty[str] = DictProperty("state_key")
type: DictProperty[str] = DictProperty("type")
user_id: DictProperty[str] = DictProperty("sender")
# Should only matter for Linearized Matrix.
hub_server: DictProperty[Optional[str]] = DefaultDictProperty("hub_server", None)
@property
def event_id(self) -> str:
raise NotImplementedError()
@property
def pdu_domain(self) -> str:
"""The domain which added this event to the DAG."""
return get_domain_from_id(self.sender)
@property
def membership(self) -> str:
return self.content["membership"]
@@ -382,6 +389,9 @@ class EventBase(metaclass=abc.ABCMeta):
return pdu_json
def get_linearized_pdu_json(self) -> JsonDict:
raise NotImplementedError()
def get_templated_pdu_json(self) -> JsonDict:
"""
Return a JSON object suitable for a templated event, as used in the
@@ -396,6 +406,9 @@ class EventBase(metaclass=abc.ABCMeta):
return template_json
def get_templated_linearized_pdu_json(self) -> JsonDict:
raise NotImplementedError()
def __getitem__(self, field: str) -> Optional[Any]:
return self._dict[field]
@@ -591,6 +604,68 @@ class FrozenEventV3(FrozenEventV2):
return self._event_id
class FrozenLinearizedEvent(FrozenEventV3):
"""
Represents a Delegated Linearized PDU.
"""
format_version = EventFormatVersions.LINEARIZED
# TODO(LM): Do we re-calculate depth at some point?
depth = 0 # type: ignore[assignment]
@property
def origin(self) -> str:
return get_domain_from_id(self.sender)
@property
def pdu_domain(self) -> str:
"""The domain which added this event to the DAG.
It could be the authorized server or the sender."""
if self.hub_server is not None:
return self.hub_server
return super().pdu_domain
def get_pdu_json(self, time_now: Optional[int] = None) -> JsonDict:
pdu = super().get_pdu_json()
# TODO(LM) Why does this sometimes exist?
pdu.pop("depth", None)
# Internally Synapse uses depth & unsigned, but this isn't part of LM.
pdu.pop("unsigned")
return pdu
def get_linearized_pdu_json(self) -> JsonDict:
# Get the full PDU and then remove fields from it.
pdu = self.get_pdu_json()
# Strip everything except for the lpdu property from the hashes.
pdu["hashes"] = {"lpdu": pdu["hashes"]["lpdu"]}
pdu.pop("auth_events")
pdu.pop("prev_events")
return pdu
def get_templated_linearized_pdu_json(self) -> JsonDict:
"""
Return a JSON object suitable for a templated event, as used in the
make_{join,leave,knock} workflow.
"""
# By using _dict directly we don't pull in signatures/unsigned.
template_json = dict(self._dict)
# The hashes (similar to the signature) need to be recalculated by the
# joining/leaving/knocking server after (potentially) modifying the
# event.
template_json.pop("hashes")
# Linearized Matrix servers don't know about auth/prev events.
template_json.pop("auth_events")
template_json.pop("prev_events")
return template_json
def _event_type_from_format_version(
format_version: int,
) -> Type[Union[FrozenEvent, FrozenEventV2, FrozenEventV3]]:
@@ -610,6 +685,8 @@ def _event_type_from_format_version(
return FrozenEventV2
elif format_version == EventFormatVersions.ROOM_V4_PLUS:
return FrozenEventV3
elif format_version == EventFormatVersions.LINEARIZED:
return FrozenLinearizedEvent
else:
raise Exception("No event format %r" % (format_version,))

View File

@@ -87,6 +87,11 @@ class EventBuilder:
_redacts: Optional[str] = None
_origin_server_ts: Optional[int] = None
# TODO(LM) If Synapse is acting as a hub-server this should be itself.
hub_server: Optional[str] = None
_lpdu_hashes: Optional[Dict[str, str]] = None
_lpdu_signatures: Optional[Dict[str, Dict[str, str]]] = None
internal_metadata: _EventInternalMetadata = attr.Factory(
lambda: _EventInternalMetadata({})
)
@@ -145,20 +150,6 @@ class EventBuilder:
auth_events = auth_event_ids
prev_events = prev_event_ids
# Otherwise, progress the depth as normal
if depth is None:
(
_,
most_recent_prev_event_depth,
) = await self._store.get_max_depth_of(prev_event_ids)
depth = most_recent_prev_event_depth + 1
# we cap depth of generated events, to ensure that they are not
# rejected by other servers (and so that they can be persisted in
# the db)
depth = min(depth, MAX_DEPTH)
event_dict: Dict[str, Any] = {
"auth_events": auth_events,
"prev_events": prev_events,
@@ -167,8 +158,25 @@ class EventBuilder:
"sender": self.sender,
"content": self.content,
"unsigned": self.unsigned,
"depth": depth,
}
if not self.room_version.linearized_matrix:
# Otherwise, progress the depth as normal
if depth is None:
(
_,
most_recent_prev_event_depth,
) = await self._store.get_max_depth_of(prev_event_ids)
depth = most_recent_prev_event_depth + 1
# we cap depth of generated events, to ensure that they are not
# rejected by other servers (and so that they can be persisted in
# the db)
depth = min(depth, MAX_DEPTH)
event_dict["depth"] = depth
elif self.hub_server is not None:
event_dict["hub_server"] = self.hub_server
if self.is_state():
event_dict["state_key"] = self._state_key
@@ -181,7 +189,12 @@ class EventBuilder:
if self._origin_server_ts is not None:
event_dict["origin_server_ts"] = self._origin_server_ts
return create_local_event_from_event_dict(
if self.room_version.linearized_matrix and self._lpdu_hashes:
event_dict.setdefault("hashes", {})["lpdu"] = self._lpdu_hashes
if self.room_version.linearized_matrix and self._lpdu_signatures:
event_dict.setdefault("signatures", {}).update(self._lpdu_signatures)
event = create_local_event_from_event_dict(
clock=self._clock,
hostname=self._hostname,
signing_key=self._signing_key,
@@ -190,6 +203,8 @@ class EventBuilder:
internal_metadata_dict=self.internal_metadata.get_dict(),
)
return event
class EventBuilderFactory:
def __init__(self, hs: "HomeServer"):
@@ -230,6 +245,9 @@ class EventBuilderFactory:
unsigned=key_values.get("unsigned", {}),
redacts=key_values.get("redacts", None),
origin_server_ts=key_values.get("origin_server_ts", None),
hub_server=key_values.get("hub_server", None),
lpdu_hashes=key_values.get("lpdu_hashes", None),
lpdu_signatures=key_values.get("lpdu_signatures", None),
)
@@ -258,7 +276,11 @@ def create_local_event_from_event_dict(
if format_version == EventFormatVersions.ROOM_V1_V2:
event_dict["event_id"] = _create_event_id(clock, hostname)
event_dict["origin"] = hostname
if not room_version.linearized_matrix:
# Do not add "origin" field to events (LPDUs and PDUs) sent in
# rooms that are meant to be compatible with linearized matrix.
event_dict["origin"] = hostname
event_dict.setdefault("origin_server_ts", time_now)
event_dict.setdefault("unsigned", {})

View File

@@ -64,6 +64,7 @@ def prune_event(event: EventBase) -> EventBase:
the user has specified, but we do want to keep necessary information like
type, state_key etc.
"""
pruned_event_dict = prune_event_dict(event.room_version, event.get_dict())
from . import make_event_from_dict
@@ -102,7 +103,6 @@ def prune_event_dict(room_version: RoomVersion, event_dict: JsonDict) -> JsonDic
"content",
"type",
"state_key",
"depth",
"prev_events",
"auth_events",
"origin_server_ts",
@@ -112,6 +112,12 @@ def prune_event_dict(room_version: RoomVersion, event_dict: JsonDict) -> JsonDic
if not room_version.updated_redaction_rules:
allowed_keys.extend(["prev_state", "membership", "origin"])
# The hub server should not be redacted for linear matrix.
if room_version.linearized_matrix:
allowed_keys.append("hub_server")
else:
allowed_keys.append("depth")
event_type = event_dict["type"]
new_content = {}

View File

@@ -66,6 +66,11 @@ class EventValidator:
"type",
]
if event.room_version.linearized_matrix:
# Do not add "origin" field to events (LPDUs and PDUs) sent in
# rooms that are meant to be compatible with linearized matrix.
required.remove("origin")
for k in required:
if k not in event:
raise SynapseError(400, "Event does not have key %s" % (k,))

View File

@@ -21,7 +21,7 @@ from synapse.api.room_versions import EventFormatVersions, RoomVersion
from synapse.crypto.event_signing import check_event_content_hash
from synapse.crypto.keyring import Keyring
from synapse.events import EventBase, make_event_from_dict
from synapse.events.utils import prune_event, validate_canonicaljson
from synapse.events.utils import prune_event, prune_event_dict, validate_canonicaljson
from synapse.http.servlet import assert_params_in_dict
from synapse.logging.opentracing import log_kv, trace
from synapse.types import JsonDict, get_domain_from_id
@@ -174,7 +174,7 @@ async def _check_sigs_on_pdu(
# we want to check that the event is signed by:
#
# (a) the sender's server
# (a) the sender's server or the hub
#
# - except in the case of invites created from a 3pid invite, which are exempt
# from this check, because the sender has to match that of the original 3pid
@@ -193,19 +193,22 @@ async def _check_sigs_on_pdu(
# let's start by getting the domain for each pdu, and flattening the event back
# to JSON.
# First we check that the sender event is signed by the sender's domain
# First we check that the sender event is signed by the domain of the server
# which added it to the DAG.
# (except if its a 3pid invite, in which case it may be sent by any server)
pdu_domain = pdu.pdu_domain
sender_domain = get_domain_from_id(pdu.sender)
origin_server_ts_for_signing = (
pdu.origin_server_ts if room_version.enforce_key_validity else 0
)
if not _is_invite_via_3pid(pdu):
try:
await keyring.verify_event_for_server(
sender_domain,
pdu,
pdu.origin_server_ts if room_version.enforce_key_validity else 0,
pdu_domain, pdu, origin_server_ts_for_signing
)
except Exception as e:
raise InvalidEventSignatureError(
f"unable to verify signature for sender domain {sender_domain}: {e}",
f"unable to verify signature for PDU domain {pdu_domain}: {e}",
pdu.event_id,
) from None
@@ -218,9 +221,7 @@ async def _check_sigs_on_pdu(
if event_domain != sender_domain:
try:
await keyring.verify_event_for_server(
event_domain,
pdu,
pdu.origin_server_ts if room_version.enforce_key_validity else 0,
event_domain, pdu, origin_server_ts_for_signing
)
except Exception as e:
raise InvalidEventSignatureError(
@@ -241,16 +242,35 @@ async def _check_sigs_on_pdu(
)
try:
await keyring.verify_event_for_server(
authorising_server,
pdu,
pdu.origin_server_ts if room_version.enforce_key_validity else 0,
authorising_server, pdu, origin_server_ts_for_signing
)
except Exception as e:
raise InvalidEventSignatureError(
f"unable to verify signature for authorising serve {authorising_server}: {e}",
f"unable to verify signature for authorising server {authorising_server}: {e}",
pdu.event_id,
) from None
# If this is a linearized PDU we may need to check signatures of the hub
# and sender.
if room_version.event_format == EventFormatVersions.LINEARIZED:
# If the event was sent via a hub server, check the signature of the
# sender against the Linear PDU. (But only if the sender isn't the hub.)
#
# Note that the signature of the hub server, if one exists, is checked
# against the full PDU above.
if pdu.hub_server and pdu.hub_server != sender_domain:
try:
await keyring.verify_json_for_server(
sender_domain,
prune_event_dict(pdu.room_version, pdu.get_linearized_pdu_json()),
origin_server_ts_for_signing,
)
except Exception as e:
raise InvalidEventSignatureError(
f"unable to verify signature for sender {sender_domain}: {e}",
pdu.event_id,
) from None
def _is_invite_via_3pid(event: EventBase) -> bool:
return (
@@ -273,21 +293,26 @@ def event_from_pdu_json(pdu_json: JsonDict, room_version: RoomVersion) -> EventB
"""
# we could probably enforce a bunch of other fields here (room_id, sender,
# origin, etc etc)
assert_params_in_dict(pdu_json, ("type", "depth"))
if room_version.event_format == EventFormatVersions.LINEARIZED:
assert_params_in_dict(pdu_json, ("type",))
else:
assert_params_in_dict(pdu_json, ("type", "depth"))
depth = pdu_json["depth"]
if type(depth) is not int:
raise SynapseError(
400, "Depth %r not an integer" % (depth,), Codes.BAD_JSON
)
if depth < 0:
raise SynapseError(400, "Depth too small", Codes.BAD_JSON)
elif depth > MAX_DEPTH:
raise SynapseError(400, "Depth too large", Codes.BAD_JSON)
# Strip any unauthorized values from "unsigned" if they exist
if "unsigned" in pdu_json:
_strip_unsigned_values(pdu_json)
depth = pdu_json["depth"]
if type(depth) is not int:
raise SynapseError(400, "Depth %r not an intger" % (depth,), Codes.BAD_JSON)
if depth < 0:
raise SynapseError(400, "Depth too small", Codes.BAD_JSON)
elif depth > MAX_DEPTH:
raise SynapseError(400, "Depth too large", Codes.BAD_JSON)
# Validate that the JSON conforms to the specification.
if room_version.strict_canonicaljson:
validate_canonicaljson(pdu_json)

View File

@@ -325,9 +325,21 @@ class FederationClient(FederationBase):
if not extremities:
return None
transaction_data = await self.transport_layer.backfill(
dest, room_id, extremities, limit
)
try:
# Note that this only returns pdus now, but this is close enough to a transaction.
transaction_data = await self.transport_layer.backfill_unstable(
dest, room_id, extremities, limit
)
except HttpResponseException as e:
# If an error is received that is due to an unrecognised endpoint,
# fallback to the v1 endpoint. Otherwise, consider it a legitimate error
# and raise.
if not is_unknown_endpoint(e):
raise
transaction_data = await self.transport_layer.backfill(
dest, room_id, extremities, limit
)
logger.debug("backfill transaction_data=%r", transaction_data)
@@ -373,45 +385,58 @@ class FederationClient(FederationBase):
Raises:
SynapseError, NotRetryingDestination, FederationDeniedError
"""
transaction_data = await self.transport_layer.get_event(
destination, event_id, timeout=timeout
)
try:
# Note that this only returns pdus now, but this is close enough to a transaction.
pdu_json = await self.transport_layer.get_event_unstable(
destination, event_id, timeout=timeout
)
pdu = event_from_pdu_json(pdu_json, room_version)
except HttpResponseException as e:
# If an error is received that is due to an unrecognised endpoint,
# fallback to the v1 endpoint. Otherwise, consider it a legitimate error
# and raise.
if not is_unknown_endpoint(e):
raise
transaction_data = await self.transport_layer.get_event(
destination, event_id, timeout=timeout
)
pdu_list: List[EventBase] = [
event_from_pdu_json(p, room_version) for p in transaction_data["pdus"]
]
if pdu_list and pdu_list[0]:
pdu = pdu_list[0]
else:
return None
logger.debug(
"get_pdu_from_destination_raw: retrieved event id %s from %s: %r",
event_id,
destination,
transaction_data,
pdu,
)
pdu_list: List[EventBase] = [
event_from_pdu_json(p, room_version) for p in transaction_data["pdus"]
]
# Check signatures are correct.
try:
if pdu_list and pdu_list[0]:
pdu = pdu_list[0]
# Check signatures are correct.
try:
async def _record_failure_callback(
event: EventBase, cause: str
) -> None:
await self.store.record_event_failed_pull_attempt(
event.room_id, event.event_id, cause
)
signed_pdu = await self._check_sigs_and_hash(
room_version, pdu, _record_failure_callback
async def _record_failure_callback(event: EventBase, cause: str) -> None:
await self.store.record_event_failed_pull_attempt(
event.room_id, event.event_id, cause
)
except InvalidEventSignatureError as e:
errmsg = f"event id {pdu.event_id}: {e}"
logger.warning("%s", errmsg)
raise SynapseError(403, errmsg, Codes.FORBIDDEN)
return signed_pdu
signed_pdu = await self._check_sigs_and_hash(
room_version, pdu, _record_failure_callback
)
except InvalidEventSignatureError as e:
errmsg = f"event id {pdu.event_id}: {e}"
logger.warning("%s", errmsg)
raise SynapseError(403, errmsg, Codes.FORBIDDEN)
return None
return signed_pdu
@trace
@tag_args
@@ -1222,6 +1247,19 @@ class FederationClient(FederationBase):
) -> SendJoinResponse:
time_now = self._clock.time_msec()
try:
return await self.transport_layer.send_join_unstable(
room_version=room_version,
destination=destination,
txn_id=pdu.event_id,
content=pdu.get_pdu_json(time_now),
)
except HttpResponseException as e:
# If an error is received that is due to an unrecognised endpoint,
# fallback to the v2 endpoint.
if not is_unknown_endpoint(e):
raise
try:
return await self.transport_layer.send_join_v2(
room_version=room_version,
@@ -1293,6 +1331,22 @@ class FederationClient(FederationBase):
"""
time_now = self._clock.time_msec()
try:
return await self.transport_layer.send_invite_unstable(
destination=destination,
txn_id=pdu.event_id,
content={
"event": pdu.get_pdu_json(time_now),
"room_version": room_version.identifier,
"invite_room_state": pdu.unsigned.get("invite_room_state", []),
},
)
except HttpResponseException as e:
# If an error is received that is due to an unrecognised endpoint,
# fallback to the v2.
if not is_unknown_endpoint(e):
raise
try:
return await self.transport_layer.send_invite_v2(
destination=destination,
@@ -1359,6 +1413,18 @@ class FederationClient(FederationBase):
async def _do_send_leave(self, destination: str, pdu: EventBase) -> JsonDict:
time_now = self._clock.time_msec()
try:
return await self.transport_layer.send_leave_unstable(
destination=destination,
txn_id=pdu.event_id,
content=pdu.get_pdu_json(time_now),
)
except HttpResponseException as e:
# If an error is received that is due to an unrecognised endpoint,
# fallback to the v2.
if not is_unknown_endpoint(e):
raise
try:
return await self.transport_layer.send_leave_v2(
destination=destination,
@@ -1435,6 +1501,18 @@ class FederationClient(FederationBase):
"""
time_now = self._clock.time_msec()
try:
return await self.transport_layer.send_knock_unstable(
destination=destination,
txn_id=pdu.event_id,
content=pdu.get_pdu_json(time_now),
)
except HttpResponseException as e:
# If an error is received that is due to an unrecognised endpoint,
# fallback to the v2.
if not is_unknown_endpoint(e):
raise
return await self.transport_layer.send_knock_v1(
destination=destination,
room_id=pdu.room_id,

View File

@@ -13,6 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections.abc
import logging
import random
from typing import (
@@ -26,7 +27,6 @@ from typing import (
Mapping,
Optional,
Tuple,
Union,
)
from matrix_common.regex import glob_to_regex
@@ -56,13 +56,14 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
from synapse.crypto.event_signing import compute_event_signature
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.events.utils import validate_canonicaljson
from synapse.federation.federation_base import (
FederationBase,
InvalidEventSignatureError,
event_from_pdu_json,
)
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
from synapse.federation.units import Transaction
from synapse.http.servlet import assert_params_in_dict
from synapse.logging.context import (
make_deferred_yieldable,
@@ -213,19 +214,15 @@ class FederationServer(FederationBase):
async def on_backfill_request(
self, origin: str, room_id: str, versions: List[str], limit: int
) -> Tuple[int, Dict[str, Any]]:
) -> List[EventBase]:
async with self._server_linearizer.queue((origin, room_id)):
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, room_id)
pdus = await self.handler.on_backfill_request(
return await self.handler.on_backfill_request(
origin, room_id, versions, limit
)
res = self._transaction_dict_from_pdus(pdus)
return 200, res
async def on_timestamp_to_event_request(
self, origin: str, room_id: str, timestamp: int, direction: Direction
) -> Tuple[int, Dict[str, Any]]:
@@ -463,7 +460,21 @@ class FederationServer(FederationBase):
logger.info("Ignoring PDU: %s", e)
continue
event = event_from_pdu_json(p, room_version)
# If the transaction is from a linearized matrix server, create PDUs
# from the LPDUs.
if (
room_version.linearized_matrix
and await self.keyring.is_server_linearized(origin)
):
event = await self._on_lpdu_event(p, room_version)
# Send this event on behalf of the other server.
#
# We are acting as the hub for this transaction and need to send
# this event out to other participants.
event.internal_metadata.send_on_behalf_of = origin
else:
event = event_from_pdu_json(p, room_version)
pdus_by_room.setdefault(room_id, []).append(event)
if event.origin_server_ts > newest_pdu_ts:
@@ -534,13 +545,45 @@ class FederationServer(FederationBase):
async def _process_edu(edu_dict: JsonDict) -> None:
received_edus_counter.inc()
edu = Edu(
origin=origin,
destination=self.server_name,
edu_type=edu_dict["edu_type"],
content=edu_dict["content"],
)
await self.registry.on_edu(edu.edu_type, origin, edu.content)
# TODO(LM) Handle this more natively instead of munging to the current form.
if "type" in edu_dict:
edu_type = edu_dict["type"]
content = edu_dict["content"]
sender = edu_dict["sender"]
if edu_type == EduTypes.DEVICE_LIST_UPDATE:
for device_info in content.get("changed", []):
device_info["stream_id"] = 0 # XXX Will this work?
await self.registry.on_edu(edu_type, origin, device_info)
for device_id in content.get("removed", []):
new_content = {
"device_id": device_id,
"deleted": True,
"stream_id": 0, # XXX Will this work?
"user_id": sender,
}
await self.registry.on_edu(edu_type, origin, new_content)
elif edu_type == EduTypes.DIRECT_TO_DEVICE:
new_content = {
"message_id": 0, # XXX Will this work?
"messages": {
content["target"]: {
content["target_device"]: content["message"]
}
},
"sender": sender,
"type": content["message_type"],
}
await self.registry.on_edu(edu_type, origin, new_content)
else:
raise ValueError()
else:
edu_type = edu_dict["edu_type"]
content = edu_dict["content"]
await self.registry.on_edu(edu_type, origin, content)
await concurrently_execute(
_process_edu,
@@ -616,15 +659,8 @@ class FederationServer(FederationBase):
"auth_chain": [pdu.get_pdu_json() for pdu in auth_chain],
}
async def on_pdu_request(
self, origin: str, event_id: str
) -> Tuple[int, Union[JsonDict, str]]:
pdu = await self.handler.get_persisted_pdu(origin, event_id)
if pdu:
return 200, self._transaction_dict_from_pdus([pdu])
else:
return 404, ""
async def on_pdu_request(self, origin: str, event_id: str) -> Optional[EventBase]:
return await self.handler.get_persisted_pdu(origin, event_id)
async def on_query_request(
self, query_type: str, args: Dict[str, str]
@@ -639,12 +675,14 @@ class FederationServer(FederationBase):
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, room_id)
room_version = await self.store.get_room_version_id(room_id)
if room_version not in supported_versions:
# checking the room version will check that we've actually heard of the room
# (and return a 404 otherwise)
room_version_id = await self.store.get_room_version_id(room_id)
if room_version_id not in supported_versions:
logger.warning(
"Room version %s not in %s", room_version, supported_versions
"Room version %s not in %s", room_version_id, supported_versions
)
raise IncompatibleRoomVersionError(room_version=room_version)
raise IncompatibleRoomVersionError(room_version=room_version_id)
# Refuse the request if that room has seen too many joins recently.
# This is in addition to the HS-level rate limiting applied by
@@ -655,8 +693,21 @@ class FederationServer(FederationBase):
key=room_id,
update=False,
)
pdu = await self.handler.on_make_join_request(origin, room_id, user_id)
return {"event": pdu.get_templated_pdu_json(), "room_version": room_version}
pdu = await self.handler.on_make_join_request(
origin, room_id, KNOWN_ROOM_VERSIONS[room_version_id], user_id
)
# For a linearized matrix server return a templated LPDU, otherwise a templated PDU.
# TODO(LM) Is it an error for a LM server to not be on a LM room version?
if (
pdu.room_version.linearized_matrix
and await self.keyring.is_server_linearized(origin)
):
event_json = pdu.get_templated_linearized_pdu_json()
else:
event_json = pdu.get_templated_pdu_json()
return {"event": event_json, "room_version": room_version_id}
async def on_invite_request(
self, origin: str, content: JsonDict, room_version_id: str
@@ -686,13 +737,14 @@ class FederationServer(FederationBase):
self,
origin: str,
content: JsonDict,
room_id: str,
expected_room_id: Optional[str] = None,
caller_supports_partial_state: bool = False,
) -> Dict[str, Any]:
set_tag(
SynapseTags.SEND_JOIN_RESPONSE_IS_PARTIAL_STATE,
caller_supports_partial_state,
)
room_id = content["room_id"]
await self._room_member_handler._join_rate_per_room_limiter.ratelimit( # type: ignore[has-type]
requester=None,
key=room_id,
@@ -700,7 +752,7 @@ class FederationServer(FederationBase):
)
event, context = await self._on_send_membership_event(
origin, content, Membership.JOIN, room_id
origin, content, Membership.JOIN, expected_room_id
)
prev_state_ids = await context.get_prev_state_ids()
@@ -731,6 +783,14 @@ class FederationServer(FederationBase):
auth_chain_events = await self.store.get_events_as_list(auth_chain_event_ids)
state_events = await self.store.get_events_as_list(state_event_ids)
# TODO(LM) eigen-server wants events in order.
auth_chain_events = sorted(
auth_chain_events, key=lambda e: e.internal_metadata.stream_ordering
)
state_events = sorted(
state_events, key=lambda e: e.internal_metadata.stream_ordering
)
# we try to do all the async stuff before this point, so that time_now is as
# accurate as possible.
time_now = self._clock.time_msec()
@@ -756,13 +816,25 @@ class FederationServer(FederationBase):
room_version = await self.store.get_room_version_id(room_id)
return {"event": pdu.get_templated_pdu_json(), "room_version": room_version}
# For a linearized matrix server return a templated LPDU, otherwise a templated PDU.
# TODO(LM) Is it an error for a LM server to not be on a LM room version?
if (
pdu.room_version.linearized_matrix
and await self.keyring.is_server_linearized(origin)
):
event_json = pdu.get_templated_linearized_pdu_json()
else:
event_json = pdu.get_templated_pdu_json()
return {"event": event_json, "room_version": room_version}
async def on_send_leave_request(
self, origin: str, content: JsonDict, room_id: str
self, origin: str, content: JsonDict, expected_room_id: Optional[str] = None
) -> dict:
logger.debug("on_send_leave_request: content: %s", content)
await self._on_send_membership_event(origin, content, Membership.LEAVE, room_id)
await self._on_send_membership_event(
origin, content, Membership.LEAVE, expected_room_id
)
return {}
async def on_make_knock_request(
@@ -814,16 +886,24 @@ class FederationServer(FederationBase):
)
pdu = await self.handler.on_make_knock_request(origin, room_id, user_id)
# For a linearized matrix server return a templated LPDU, otherwise a templated PDU.
# TODO(LM) Is it an error for a LM server to not be on a LM room version?
if (
pdu.room_version.linearized_matrix
and await self.keyring.is_server_linearized(origin)
):
event_json = pdu.get_templated_linearized_pdu_json()
else:
event_json = pdu.get_templated_pdu_json()
return {
"event": pdu.get_templated_pdu_json(),
"event": event_json,
"room_version": room_version.identifier,
}
async def on_send_knock_request(
self,
origin: str,
content: JsonDict,
room_id: str,
self, origin: str, content: JsonDict, expected_room_id: Optional[str] = None
) -> Dict[str, List[JsonDict]]:
"""
We have received a knock event for a room. Verify and send the event into the room
@@ -833,13 +913,13 @@ class FederationServer(FederationBase):
Args:
origin: The remote homeserver of the knocking user.
content: The content of the request.
room_id: The ID of the room to knock on.
expected_room_id: The room ID included in the request.
Returns:
The stripped room state.
"""
_, context = await self._on_send_membership_event(
origin, content, Membership.KNOCK, room_id
origin, content, Membership.KNOCK, expected_room_id
)
# Retrieve stripped state events from the room and send them back to the remote
@@ -860,7 +940,11 @@ class FederationServer(FederationBase):
}
async def _on_send_membership_event(
self, origin: str, content: JsonDict, membership_type: str, room_id: str
self,
origin: str,
content: JsonDict,
membership_type: str,
expected_room_id: Optional[str],
) -> Tuple[EventBase, EventContext]:
"""Handle an on_send_{join,leave,knock} request
@@ -872,8 +956,8 @@ class FederationServer(FederationBase):
content: The body of the send_* request - a complete membership event
membership_type: The expected membership type (join or leave, depending
on the endpoint)
room_id: The room_id from the request, to be validated against the room_id
in the event
expected_room_id: The room_id from the request, to be validated against
the room_id in the event. None if the request did not include a room ID.
Returns:
The event and context of the event after inserting it into the room graph.
@@ -883,12 +967,16 @@ class FederationServer(FederationBase):
the room_id not matching or the event not being authorized.
"""
assert_params_in_dict(content, ["room_id"])
if content["room_id"] != room_id:
if expected_room_id is None:
room_id = content["room_id"]
elif content["room_id"] != expected_room_id:
raise SynapseError(
400,
"Room ID in body does not match that in request path",
Codes.BAD_JSON,
)
else:
room_id = expected_room_id
# Note that get_room_version throws if the room does not exist here.
room_version = await self.store.get_room_version(room_id)
@@ -916,7 +1004,14 @@ class FederationServer(FederationBase):
errcode=Codes.FORBIDDEN,
)
event = event_from_pdu_json(content, room_version)
# Linearized Matrix requires building the event (i.e. adding auth/prev
# events). The input content is an LPDU.
if room_version.linearized_matrix and await self.keyring.is_server_linearized(
origin
):
event = await self._on_lpdu_event(content, room_version)
else:
event = event_from_pdu_json(content, room_version)
if event.type != EventTypes.Member or not event.is_state():
raise SynapseError(400, "Not an m.room.member event", Codes.BAD_JSON)
@@ -978,6 +1073,120 @@ class FederationServer(FederationBase):
origin, event
)
async def _on_lpdu_event(
self, lpdu_json: JsonDict, room_version: RoomVersion
) -> EventBase:
"""Construct an EventBase from a linearized event json received over federation
See event_from_pdu_json.
Args:
lpdu_json: lpdu as received over federation
room_version: The version of the room this event belongs to
Raises:
SynapseError: if the lpdu is missing required fields or is otherwise
not a valid matrix event
"""
if not room_version.linearized_matrix:
raise ValueError("Cannot be used on non-linearized matrix")
# The minimum fields to create an LPDU.
assert_params_in_dict(
lpdu_json,
(
"type",
"room_id",
"sender",
"hub_server",
"origin_server_ts",
"content",
"hashes",
"signatures",
),
)
# The participant server should *not* provide auth/prev events.
for field in ("auth_events", "prev_events"):
if field in lpdu_json:
raise SynapseError(400, f"LPDU contained {field}", Codes.BAD_JSON)
# Hashes must contain (only) "lpdu".
hashes = lpdu_json.pop("hashes")
if not isinstance(hashes, collections.abc.Mapping):
raise SynapseError(400, "Invalid hashes", Codes.BAD_JSON)
if hashes.keys() != {"lpdu"}:
raise SynapseError(
400, "hashes must contain exactly one key: 'lpdu'", Codes.BAD_JSON
)
lpdu_json["lpdu_hashes"] = hashes["lpdu"]
lpdu_json["lpdu_signatures"] = lpdu_json.pop("signatures")
# Validate that the JSON conforms to the specification.
if room_version.strict_canonicaljson:
validate_canonicaljson(lpdu_json)
# An LPDU doesn't have enough information to just create an event, it needs
# to be built and signed, etc.
builder = self.hs.get_event_builder_factory().for_room_version(
room_version, lpdu_json
)
try:
(
event,
unpersisted_context,
) = await self.hs.get_event_creation_handler().create_new_client_event(
builder=builder
)
except SynapseError as e:
logger.warning(
"Failed to create PDU from template for room %s because %s",
lpdu_json["room_id"],
e,
)
raise
if not event.hub_server:
raise SynapseError(400, "Cannot send PDU via hub server", Codes.BAD_JSON)
# If an LPDU is trying to be sent through us, but we're not the hub
# then deny.
if not self._is_mine_server_name(event.hub_server):
raise SynapseError(
400,
f"Cannot authorise event for hub server: {event.hub_server}",
Codes.FORBIDDEN,
)
# Double check that we *are* the hub.
state_ids = await self._state_storage_controller.get_current_state_ids(
event.room_id
)
# Get the current hub server from the sender of the create event.
create_event = await self.store.get_event(state_ids[(EventTypes.Create, "")])
hub_server = get_domain_from_id(create_event.sender)
if not self._is_mine_server_name(hub_server):
raise SynapseError(400, "Not the hub server", Codes.FORBIDDEN)
# Sign the event as the hub.
event.signatures.update(
compute_event_signature(
room_version,
event.get_pdu_json(),
self.hs.hostname,
self.hs.signing_key,
)
)
# Note that the signatures, etc. get checked later on in _handle_received_pdu.
# Server ACLs are checked in the caller: _handle_pdus_in_txn.
# TODO(LM) Do we need to check that the event will be accepted here?
return event
async def on_event_auth(
self, origin: str, room_id: str, event_id: str
) -> Tuple[int, Dict[str, Any]]:
@@ -1073,21 +1282,6 @@ class FederationServer(FederationBase):
ts_now_ms = self._clock.time_msec()
return await self.store.get_user_id_for_open_id_token(token, ts_now_ms)
def _transaction_dict_from_pdus(self, pdu_list: List[EventBase]) -> JsonDict:
"""Returns a new Transaction containing the given PDUs suitable for
transmission.
"""
time_now = self._clock.time_msec()
pdus = [p.get_pdu_json(time_now) for p in pdu_list]
return Transaction(
# Just need a dummy transaction ID and destination since it won't be used.
transaction_id="",
origin=self.server_name,
pdus=pdus,
origin_server_ts=int(time_now),
destination="",
).get_dict()
async def _handle_received_pdu(self, origin: str, pdu: EventBase) -> None:
"""Process a PDU received in a federation /send/ transaction.

View File

@@ -489,10 +489,19 @@ class FederationSender(AbstractFederationSender):
break
async def handle_event(event: EventBase) -> None:
# Only send events for this server.
# Send events which this server is sending on behalf of another,
# e.g. an event due to send_{join,leave,knock}.
send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
# Send events which originate from this server.
is_mine = self.is_mine_id(event.sender)
if not is_mine and send_on_behalf_of is None:
# Finally, if the server is acting as a linearized matrix hub,
# then send to any participating servers off the hub.
is_hub_server = (
event.room_version.linearized_matrix
and event.hub_server
and self.is_mine_server_name(event.hub_server)
)
if not is_mine and send_on_behalf_of is None and not is_hub_server:
logger.debug("Not sending remote-origin event %s", event)
return
@@ -542,6 +551,7 @@ class FederationSender(AbstractFederationSender):
)
return
# TODO(LM): Is the calculation of all destinations correct?
destinations: Optional[Collection[str]] = None
if not event.prev_event_ids():
# If there are no prev event IDs then the state is empty
@@ -614,10 +624,16 @@ class FederationSender(AbstractFederationSender):
)
}
if send_on_behalf_of is not None:
if (
send_on_behalf_of is not None
and not event.room_version.linearized_matrix
):
# If we are sending the event on behalf of another server
# then it already has the event and there is no reason to
# send the event to it.
#
# For linearized matrix, send it back to the origin.
# TODO(LM) Do not send back to DAG servers?
sharded_destinations.discard(send_on_behalf_of)
logger.debug("Sending %s to %r", event, sharded_destinations)

View File

@@ -21,6 +21,7 @@ from synapse.api.errors import HttpResponseException
from synapse.events import EventBase
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
from synapse.http.client import is_unknown_endpoint
from synapse.logging.opentracing import (
extract_text_map,
set_tag,
@@ -159,17 +160,46 @@ class TransactionManager:
del p["age_ts"]
return data
def json_data_cb_unstable() -> JsonDict:
data = {
"pdus": transaction.pdus,
}
if transaction.edus:
data["edus"] = transaction.edus
return data
try:
response = await self._transport_layer.send_transaction(
transaction, json_data_cb
response = await self._transport_layer.send_unstable_transaction(
transaction, json_data_cb_unstable
)
except HttpResponseException as e:
code = e.code
# If an error is received that is due to an unrecognised endpoint,
# fallback to the v1 endpoint. Otherwise, consider it a legitimate error
# and raise.
if not is_unknown_endpoint(e):
code = e.code
set_tag(tags.ERROR, True)
set_tag(tags.ERROR, True)
logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
raise
logger.info(
"TX [%s] {%s} got %d response", destination, txn_id, code
)
raise
try:
response = await self._transport_layer.send_transaction(
transaction, json_data_cb
)
except HttpResponseException as e:
code = e.code
set_tag(tags.ERROR, True)
logger.info(
"TX [%s] {%s} got %d response", destination, txn_id, code
)
raise
logger.info("TX [%s] {%s} got 200 response", destination, txn_id)

View File

@@ -134,6 +134,31 @@ class TransportLayerClient:
destination, path=path, timeout=timeout, try_trailing_slash_on_400=True
)
async def get_event_unstable(
self, destination: str, event_id: str, timeout: Optional[int] = None
) -> JsonDict:
"""Requests the pdu with give id and origin from the given server.
Args:
destination: The host name of the remote homeserver we want
to get the state from.
event_id: The id of the event being requested.
timeout: How long to try (in ms) the destination for before
giving up. None indicates no timeout.
Returns:
Results in a dict received from the remote homeserver.
"""
logger.debug("get_pdu dest=%s, event_id=%s", destination, event_id)
path = f"/_matrix/federation/unstable/org.matrix.i-d.ralston-mimi-linearized-matrix.02/event/{event_id}"
result = await self.client.get_json(
destination, path=path, timeout=timeout, try_trailing_slash_on_400=True
)
# Note that this has many callers, convert the result into the v1 response
# (i.e. a transaction).
return {"pdus": [result]}
async def backfill(
self, destination: str, room_id: str, event_tuples: Collection[str], limit: int
) -> Optional[Union[JsonDict, list]]:
@@ -171,6 +196,43 @@ class TransportLayerClient:
destination, path=path, args=args, try_trailing_slash_on_400=True
)
async def backfill_unstable(
self, destination: str, room_id: str, event_tuples: Collection[str], limit: int
) -> Optional[Union[JsonDict, list]]:
"""Requests `limit` previous PDUs in a given context before list of
PDUs.
Args:
destination
room_id
event_tuples:
Must be a Collection that is falsy when empty.
(Iterable is not enough here!)
limit
Returns:
Results in a dict received from the remote homeserver.
"""
logger.debug(
"backfill dest=%s, room_id=%s, event_tuples=%r, limit=%s",
destination,
room_id,
event_tuples,
str(limit),
)
if not event_tuples:
# TODO: raise?
return None
path = f"/_matrix/federation/unstable/org.matrix.i-d.ralston-mimi-linearized-matrix.02/backfill/{room_id}"
args = {"v": event_tuples, "limit": [str(limit)]}
return await self.client.get_json(
destination, path=path, args=args, try_trailing_slash_on_400=True
)
async def timestamp_to_event(
self, destination: str, room_id: str, timestamp: int, direction: Direction
) -> Union[JsonDict, List]:
@@ -253,6 +315,51 @@ class TransportLayerClient:
try_trailing_slash_on_400=True,
)
async def send_unstable_transaction(
self,
transaction: Transaction,
json_data_callback: Optional[Callable[[], JsonDict]] = None,
) -> JsonDict:
"""Sends the given Transaction to its destination
Args:
transaction
Returns:
Succeeds when we get a 2xx HTTP response. The result
will be the decoded JSON body.
Fails with ``HTTPRequestException`` if we get an HTTP response
code >= 300.
Fails with ``NotRetryingDestination`` if we are not yet ready
to retry this server.
Fails with ``FederationDeniedError`` if this destination
is not on our federation whitelist
"""
logger.debug(
"send_data dest=%s, txid=%s",
transaction.destination,
transaction.transaction_id,
)
if self._is_mine_server_name(transaction.destination):
raise RuntimeError("Transport layer cannot send to itself!")
path = f"/_matrix/federation/unstable/org.matrix.i-d.ralston-mimi-linearized-matrix.02/send/{transaction.transaction_id}"
result = await self.client.put_json(
transaction.destination,
path=path,
json_data_callback=json_data_callback,
long_retries=True,
backoff_on_404=True, # If we get a 404 the other side has gone
try_trailing_slash_on_400=True,
)
# Convert the result to match the v1 result.
return {"pdus": result.get("failed_pdus", {})}
async def make_query(
self,
destination: str,
@@ -373,6 +480,22 @@ class TransportLayerClient:
parser=SendJoinParser(room_version, v1_api=False),
)
async def send_join_unstable(
self,
room_version: RoomVersion,
destination: str,
txn_id: str,
content: JsonDict,
) -> "SendJoinResponse":
path = f"/_matrix/federation/unstable/org.matrix.i-d.ralston-mimi-linearized-matrix.02/send_join/{txn_id}"
return await self.client.post_json(
destination=destination,
path=path,
data=content,
parser=SendJoinParser(room_version, v1_api=False),
)
async def send_leave_v1(
self, destination: str, room_id: str, event_id: str, content: JsonDict
) -> Tuple[int, JsonDict]:
@@ -406,6 +529,22 @@ class TransportLayerClient:
ignore_backoff=True,
)
async def send_leave_unstable(
self, destination: str, txn_id: str, content: JsonDict
) -> JsonDict:
path = f"/_matrix/federation/unstable/org.matrix.i-d.ralston-mimi-linearized-matrix.02/send_leave/{txn_id}"
return await self.client.post_json(
destination=destination,
path=path,
data=content,
# we want to do our best to send this through. The problem is
# that if it fails, we won't retry it later, so if the remote
# server was just having a momentary blip, the room will be out of
# sync.
ignore_backoff=True,
)
async def send_knock_v1(
self,
destination: str,
@@ -439,6 +578,15 @@ class TransportLayerClient:
destination=destination, path=path, data=content
)
async def send_knock_unstable(
self, destination: str, txn_id: str, content: JsonDict
) -> JsonDict:
path = f"/_matrix/federation/unstable/org.matrix.i-d.ralston-mimi-linearized-matrix.02/send_knock/{txn_id}"
return await self.client.post_json(
destination=destination, path=path, data=content
)
async def send_invite_v1(
self, destination: str, room_id: str, event_id: str, content: JsonDict
) -> Tuple[int, JsonDict]:
@@ -461,6 +609,15 @@ class TransportLayerClient:
destination=destination, path=path, data=content, ignore_backoff=True
)
async def send_invite_unstable(
self, destination: str, txn_id: str, content: JsonDict
) -> JsonDict:
path = f"/_matrix/federation/unstable/org.matrix.i-d.ralston-mimi-linearized-matrix.02/invite/{txn_id}"
return await self.client.post_json(
destination=destination, path=path, data=content, ignore_backoff=True
)
async def get_public_rooms(
self,
remote_server: str,

View File

@@ -35,6 +35,7 @@ from synapse.federation.transport.server._base import (
Authenticator,
BaseFederationServlet,
)
from synapse.federation.units import Transaction
from synapse.http.servlet import (
parse_boolean_from_args,
parse_integer_from_args,
@@ -67,6 +68,7 @@ class BaseFederationServerServlet(BaseFederationServlet):
):
super().__init__(hs, authenticator, ratelimiter, server_name)
self.handler = hs.get_federation_server()
self._clock = hs.get_clock()
class FederationSendServlet(BaseFederationServerServlet):
@@ -138,6 +140,48 @@ class FederationSendServlet(BaseFederationServerServlet):
return code, response
class FederationUnstableSendServlet(FederationSendServlet):
PREFIX = (
FEDERATION_UNSTABLE_PREFIX + "/org.matrix.i-d.ralston-mimi-linearized-matrix.02"
)
# This is when someone is trying to send us a bunch of data.
async def on_PUT(
self,
origin: str,
content: JsonDict,
query: Dict[bytes, List[bytes]],
transaction_id: str,
) -> Tuple[int, JsonDict]:
"""Called on PUT /send/<transaction_id>/
Args:
transaction_id: The transaction_id associated with this request. This
is *not* None.
Returns:
Tuple of `(code, response)`, where
`response` is a python dict to be converted into JSON that is
used as the response body.
"""
# The removed fields (origin and origin_server_ts) are unused, but the
# response is slightly different.
code, response = await super().on_PUT(origin, content, query, transaction_id)
# The response only includes failed PDUs.
response = {
"failed_pdus": {
event_id: result
for event_id, result in response["pdus"].items()
if "error" in result
}
}
return code, response
class FederationEventServlet(BaseFederationServerServlet):
PATH = "/event/(?P<event_id>[^/]*)/?"
CATEGORY = "Federation requests"
@@ -150,7 +194,46 @@ class FederationEventServlet(BaseFederationServerServlet):
query: Dict[bytes, List[bytes]],
event_id: str,
) -> Tuple[int, Union[JsonDict, str]]:
return await self.handler.on_pdu_request(origin, event_id)
event = await self.handler.on_pdu_request(origin, event_id)
if event:
# Returns a new Transaction containing the given PDUs suitable for transmission.
time_now = self._clock.time_msec()
pdus = [event.get_pdu_json(time_now)]
return (
200,
Transaction(
# Just need a dummy transaction ID and destination since it won't be used.
transaction_id="",
origin=self.server_name,
pdus=pdus,
origin_server_ts=int(time_now),
destination="",
).get_dict(),
)
return 404, ""
class FederationUnstableEventServlet(BaseFederationServerServlet):
PREFIX = (
FEDERATION_UNSTABLE_PREFIX + "/org.matrix.i-d.ralston-mimi-linearized-matrix.02"
)
PATH = "/event/(?P<event_id>[^/]*)/?"
CATEGORY = "Federation requests"
# This is when someone asks for a data item for a given server data_id pair.
async def on_GET(
self,
origin: str,
content: Literal[None],
query: Dict[bytes, List[bytes]],
event_id: str,
) -> Tuple[int, Union[JsonDict, str]]:
event = await self.handler.on_pdu_request(origin, event_id)
if event:
return 200, event.get_dict()
return 404, ""
class FederationStateV1Servlet(BaseFederationServerServlet):
@@ -207,7 +290,52 @@ class FederationBackfillServlet(BaseFederationServerServlet):
if not limit:
return 400, {"error": "Did not include limit param"}
return await self.handler.on_backfill_request(origin, room_id, versions, limit)
pdu_list = await self.handler.on_backfill_request(
origin, room_id, versions, limit
)
# Returns a new Transaction containing the given PDUs suitable for transmission.
time_now = self._clock.time_msec()
pdus = [p.get_pdu_json(time_now) for p in pdu_list]
return (
200,
Transaction(
# Just need a dummy transaction ID and destination since it won't be used.
transaction_id="",
origin=self.server_name,
pdus=pdus,
origin_server_ts=int(time_now),
destination="",
).get_dict(),
)
class FederationUnstableBackfillServlet(BaseFederationServerServlet):
PREFIX = (
FEDERATION_UNSTABLE_PREFIX + "/org.matrix.i-d.ralston-mimi-linearized-matrix.02"
)
PATH = "/backfill/(?P<room_id>[^/]*)/?"
CATEGORY = "Federation requests"
async def on_GET(
self,
origin: str,
content: Literal[None],
query: Dict[bytes, List[bytes]],
room_id: str,
) -> Tuple[int, JsonDict]:
versions = [x.decode("ascii") for x in query[b"v"]]
# TODO(LM) Only a single version is allowed for Linearized Matrix.
limit = parse_integer_from_args(query, "limit", None)
if not limit:
return 400, {"error": "Did not include limit param"}
pdu_list = await self.handler.on_backfill_request(
origin, room_id, versions, limit
)
return 200, {"pdus": [p.get_pdu_json() for p in pdu_list]}
class FederationTimestampLookupServlet(BaseFederationServerServlet):
@@ -354,6 +482,25 @@ class FederationV2SendLeaveServlet(BaseFederationServerServlet):
return 200, result
class FederationUnstableSendLeaveServlet(BaseFederationServerServlet):
PREFIX = (
FEDERATION_UNSTABLE_PREFIX + "/org.matrix.i-d.ralston-mimi-linearized-matrix.02"
)
PATH = "/send_leave/(?P<txn_id>[^/]*)"
CATEGORY = "Federation requests"
async def on_POST(
self,
origin: str,
content: JsonDict,
query: Dict[bytes, List[bytes]],
txn_id: str,
) -> Tuple[int, JsonDict]:
# TODO Use the txn_id for idempotency.
result = await self.handler.on_send_leave_request(origin, content)
return 200, result
class FederationMakeKnockServlet(BaseFederationServerServlet):
PATH = "/make_knock/(?P<room_id>[^/]*)/(?P<user_id>[^/]*)"
CATEGORY = "Federation requests"
@@ -393,6 +540,25 @@ class FederationV1SendKnockServlet(BaseFederationServerServlet):
return 200, result
class FederationUnstableSendKnockServlet(BaseFederationServerServlet):
PREFIX = (
FEDERATION_UNSTABLE_PREFIX + "/org.matrix.i-d.ralston-mimi-linearized-matrix.02"
)
PATH = "/send_knock/(?P<txn_id>[^/]*)"
CATEGORY = "Federation requests"
async def on_POST(
self,
origin: str,
content: JsonDict,
query: Dict[bytes, List[bytes]],
txn_id: str,
) -> Tuple[int, JsonDict]:
# TODO Use the txn_id for idempotency.
result = await self.handler.on_send_knock_request(origin, content)
return 200, {"stripped_state": result["knock_room_state"]}
class FederationEventAuthServlet(BaseFederationServerServlet):
PATH = "/event_auth/(?P<room_id>[^/]*)/(?P<event_id>[^/]*)"
CATEGORY = "Federation requests"
@@ -451,6 +617,30 @@ class FederationV2SendJoinServlet(BaseFederationServerServlet):
return 200, result
class FederationUnstableSendJoinServlet(BaseFederationServerServlet):
PREFIX = (
FEDERATION_UNSTABLE_PREFIX + "/org.matrix.i-d.ralston-mimi-linearized-matrix.02"
)
PATH = "/send_join/(?P<txn_id>[^/]*)"
CATEGORY = "Federation requests"
async def on_POST(
self,
origin: str,
content: JsonDict,
query: Dict[bytes, List[bytes]],
txn_id: str,
) -> Tuple[int, JsonDict]:
# TODO Use the txn_id for idempotency.
result = await self.handler.on_send_join_request(origin, content)
return 200, {
"event": result["event"],
"state": result["state"],
"auth_chain": result["auth_chain"],
}
class FederationV1InviteServlet(BaseFederationServerServlet):
PATH = "/invite/(?P<room_id>[^/]*)/(?P<event_id>[^/]*)"
CATEGORY = "Federation requests"
@@ -513,6 +703,42 @@ class FederationV2InviteServlet(BaseFederationServerServlet):
return 200, result
class FederationUnstableInviteServlet(BaseFederationServerServlet):
PREFIX = (
FEDERATION_UNSTABLE_PREFIX + "/org.matrix.i-d.ralston-mimi-linearized-matrix.02"
)
PATH = "/invite/(?P<txn_id>[^/]*)"
CATEGORY = "Federation requests"
async def on_POST(
self,
origin: str,
content: JsonDict,
query: Dict[bytes, List[bytes]],
txn_id: str,
) -> Tuple[int, JsonDict]:
# TODO Use the txn_id for idempotency.
room_version = content["room_version"]
event = content["event"]
invite_room_state = content.get("invite_room_state", [])
# Synapse expects invite_room_state to be in unsigned, as it is in v1
# API
event.setdefault("unsigned", {})["invite_room_state"] = invite_room_state
result = await self.handler.on_invite_request(
origin, event, room_version_id=room_version
)
# We only store invite_room_state for internal use, so remove it before
# returning the event to the remote homeserver.
result["event"].get("unsigned", {}).pop("invite_room_state", None)
return 200, result
class FederationThirdPartyInviteExchangeServlet(BaseFederationServerServlet):
PATH = "/exchange_third_party_invite/(?P<room_id>[^/]*)"
CATEGORY = "Federation requests"
@@ -552,6 +778,33 @@ class FederationUserDevicesQueryServlet(BaseFederationServerServlet):
return await self.handler.on_query_user_devices(origin, user_id)
class FederationUnstableUserDeviceQueryServlet(BaseFederationServerServlet):
PREFIX = "unstable/org.matrix.i-d.ralston-mimi-linearized-matrix.02"
PATH = "/user/(?P<user_id>[^/]*)/device/(?P<device_id>[^/]*)"
CATEGORY = "Federation requests"
async def on_GET(
self,
origin: str,
content: Literal[None],
query: Dict[bytes, List[bytes]],
user_id: str,
device_id: str,
) -> Tuple[int, JsonDict]:
# TODO This is not efficient, we should only query the individual device.
_, result = await self.handler.on_query_user_devices(origin, user_id)
# TODO 404 if not a local user or unknown user.
devices = result["devices"]
if device_id not in devices:
return 404, {}
return 200, {
"user_id": user_id,
**devices[device_id],
}
class FederationClientKeysClaimServlet(BaseFederationServerServlet):
PATH = "/user/keys/claim"
CATEGORY = "Federation requests"
@@ -811,4 +1064,13 @@ FEDERATION_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = (
FederationV1SendKnockServlet,
FederationMakeKnockServlet,
FederationAccountStatusServlet,
# TODO(LM) Linearized Matrix additions.
FederationUnstableSendServlet,
FederationUnstableEventServlet,
FederationUnstableBackfillServlet,
FederationUnstableInviteServlet,
FederationUnstableSendJoinServlet,
FederationUnstableSendLeaveServlet,
FederationUnstableSendKnockServlet,
FederationUnstableUserDeviceQueryServlet,
)

View File

@@ -551,6 +551,9 @@ class FederationHandler:
)
except RequestSendFailed:
raise SynapseError(502, f"Can't connect to server {target_host}")
except HttpResponseException as e:
logger.error(f"Error sending invite to '{target_host}'", e.response)
raise e
return pdu
@@ -662,7 +665,9 @@ class FederationHandler:
origin = ret.origin
state = ret.state
auth_chain = ret.auth_chain
auth_chain.sort(key=lambda e: e.depth)
# TODO(LM) Assume the auth chain is reasonable ordered.
if not room_version_obj.linearized_matrix:
auth_chain.sort(key=lambda e: e.depth)
logger.debug("do_invite_join auth_chain: %s", auth_chain)
logger.debug("do_invite_join state: %s", state)
@@ -897,7 +902,7 @@ class FederationHandler:
)
async def on_make_join_request(
self, origin: str, room_id: str, user_id: str
self, origin: str, room_id: str, room_version: RoomVersion, user_id: str
) -> EventBase:
"""We've received a /make_join/ request, so we create a partial
join event for the room and return that. We do *not* persist or
@@ -906,6 +911,7 @@ class FederationHandler:
Args:
origin: The (verified) server name of the requesting server.
room_id: Room to create join event in
room_version: The room's room version.
user_id: The user to create the join for
"""
if get_domain_from_id(user_id) != origin:
@@ -916,10 +922,6 @@ class FederationHandler:
)
raise SynapseError(403, "User not from origin", Codes.FORBIDDEN)
# checking the room version will check that we've actually heard of the room
# (and return a 404 otherwise)
room_version = await self.store.get_room_version(room_id)
if await self.store.is_partial_state_room(room_id):
# If our server is still only partially joined, we can't give a complete
# response to /make_join, so return a 404 as we would if we weren't in the
@@ -995,16 +997,15 @@ class FederationHandler:
state_ids,
)
builder = self.event_builder_factory.for_room_version(
room_version,
{
"type": EventTypes.Member,
"content": event_content,
"room_id": room_id,
"sender": user_id,
"state_key": user_id,
},
)
event_dict = {
"type": EventTypes.Member,
"content": event_content,
"room_id": room_id,
"sender": user_id,
"state_key": user_id,
}
builder = self.event_builder_factory.for_room_version(room_version, event_dict)
try:
(

View File

@@ -495,6 +495,9 @@ class FederationEventHandler:
PartialStateConflictError if the homeserver is already in the room and it
has been un-partial stated.
"""
# TODO(LM) LM server currently includes non-state events.
state = [e for e in state if e.is_state()]
create_event = None
for e in state:
if (e.type, e.state_key) == (EventTypes.Create, ""):

View File

@@ -1011,6 +1011,7 @@ class MatrixFederationHttpClient:
return body
@overload
async def post_json(
self,
destination: str,
@@ -1020,7 +1021,35 @@ class MatrixFederationHttpClient:
timeout: Optional[int] = None,
ignore_backoff: bool = False,
args: Optional[QueryParams] = None,
parser: Literal[None] = None,
) -> JsonDict:
...
@overload
async def post_json(
self,
destination: str,
path: str,
data: Optional[JsonDict] = None,
long_retries: bool = False,
timeout: Optional[int] = None,
ignore_backoff: bool = False,
args: Optional[QueryParams] = None,
parser: Optional[ByteParser[T]] = None,
) -> T:
...
async def post_json(
self,
destination: str,
path: str,
data: Optional[JsonDict] = None,
long_retries: bool = False,
timeout: Optional[int] = None,
ignore_backoff: bool = False,
args: Optional[QueryParams] = None,
parser: Optional[ByteParser[T]] = None,
) -> Union[JsonDict, T]:
"""Sends the specified json data using POST
Args:
@@ -1046,6 +1075,9 @@ class MatrixFederationHttpClient:
try the request anyway.
args: query params
parser: The parser to use to decode the response. Defaults to
parsing as JSON.
Returns:
Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body.
@@ -1078,8 +1110,11 @@ class MatrixFederationHttpClient:
else:
_sec_timeout = self.default_timeout_seconds
if parser is None:
parser = cast(ByteParser[T], JsonParser())
body = await _handle_response(
self.reactor, _sec_timeout, request, response, start_ms, parser=JsonParser()
self.reactor, _sec_timeout, request, response, start_ms, parser=parser
)
return body

View File

@@ -38,7 +38,6 @@ from synapse.api.ratelimiting import Ratelimiter, RequestRatelimiter
from synapse.appservice.api import ApplicationServiceApi
from synapse.appservice.scheduler import ApplicationServiceScheduler
from synapse.config.homeserver import HomeServerConfig
from synapse.crypto import context_factory
from synapse.crypto.context_factory import RegularPolicyForHTTPS
from synapse.crypto.keyring import Keyring
from synapse.events.builder import EventBuilderFactory
@@ -475,9 +474,11 @@ class HomeServer(metaclass=abc.ABCMeta):
"""
An HTTP client for federation.
"""
tls_client_options_factory = context_factory.FederationPolicyForHTTPS(
self.config
)
# XXX Disable TLS for federation.
# tls_client_options_factory = context_factory.FederationPolicyForHTTPS(
# self.config
# )
tls_client_options_factory = None
return MatrixFederationHttpClient(self, tls_client_options_factory)
@cache_in_self

View File

@@ -21,6 +21,7 @@ from twisted.internet import defer
from twisted.test.proto_helpers import MemoryReactor
from synapse.api.constants import EduTypes, RoomEncryptionAlgorithms
from synapse.api.errors import HttpResponseException
from synapse.federation.units import Transaction
from synapse.handlers.device import DeviceHandler
from synapse.rest import admin
@@ -274,7 +275,7 @@ class FederationSenderDevicesTestCases(HomeserverTestCase):
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
self.federation_transport_client = Mock(
spec=["send_transaction", "query_user_devices"]
spec=["send_transaction", "send_unstable_transaction", "query_user_devices"]
)
return self.setup_test_homeserver(
federation_transport_client=self.federation_transport_client,
@@ -313,6 +314,11 @@ class FederationSenderDevicesTestCases(HomeserverTestCase):
# whenever send_transaction is called, record the edu data
self.edus: List[JsonDict] = []
self.federation_transport_client.send_unstable_transaction.side_effect = (
HttpResponseException(
404, "Unknown", response=b'{"errcode":"M_UNRECOGNIZED"}'
)
)
self.federation_transport_client.send_transaction.side_effect = (
self.record_transaction
)

View File

@@ -27,7 +27,7 @@ from synapse.api.errors import (
NotFoundError,
SynapseError,
)
from synapse.api.room_versions import RoomVersions
from synapse.api.room_versions import RoomVersion, RoomVersions
from synapse.events import EventBase, make_event_from_dict
from synapse.federation.federation_base import event_from_pdu_json
from synapse.federation.federation_client import SendJoinResult
@@ -124,7 +124,9 @@ class FederationTestCase(unittest.FederatingHomeserverTestCase):
room_version = self.get_success(self.store.get_room_version(room_id))
# pretend that another server has joined
join_event = self._build_and_send_join_event(OTHER_SERVER, OTHER_USER, room_id)
join_event = self._build_and_send_join_event(
OTHER_SERVER, OTHER_USER, room_id, room_version
)
# check the state group
sg = self.get_success(
@@ -177,7 +179,9 @@ class FederationTestCase(unittest.FederatingHomeserverTestCase):
room_version = self.get_success(self.store.get_room_version(room_id))
# pretend that another server has joined
join_event = self._build_and_send_join_event(OTHER_SERVER, OTHER_USER, room_id)
join_event = self._build_and_send_join_event(
OTHER_SERVER, OTHER_USER, room_id, room_version
)
# check the state group
sg = self.get_success(
@@ -479,10 +483,16 @@ class FederationTestCase(unittest.FederatingHomeserverTestCase):
)
def _build_and_send_join_event(
self, other_server: str, other_user: str, room_id: str
self,
other_server: str,
other_user: str,
room_id: str,
room_version: RoomVersion,
) -> EventBase:
join_event = self.get_success(
self.handler.on_make_join_request(other_server, room_id, other_user)
self.handler.on_make_join_request(
other_server, room_id, room_version, other_user
)
)
# the auth code requires that a signature exists, but doesn't check that
# signature... go figure.

View File

@@ -85,6 +85,7 @@ class FakeEvent:
self.state_key = state_key
self.content = content
self.room_id = ROOM_ID
self.hub_server = None
def to_event(self, auth_events: List[str], prev_events: List[str]) -> EventBase:
"""Given the auth_events and prev_events, convert to a Frozen Event