1
0

Accept LPDUs in transactions and fan them back out.

This commit is contained in:
Patrick Cloke
2023-07-13 08:07:24 -04:00
parent c74a073389
commit 547a7076b4
3 changed files with 151 additions and 10 deletions

View File

@@ -189,6 +189,10 @@ class Keyring:
valid_until_ts=2**63, # fake future timestamp
)
async def is_server_linearized(self, server_name: str) -> bool:
# TODO(LM) Fetch whether the key response of the origin contains m.linearized.
return not self._is_mine_server_name(server_name)
async def verify_json_for_server(
self,
server_name: str,

View File

@@ -13,6 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections.abc
import logging
import random
from typing import (
@@ -55,6 +56,7 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
from synapse.crypto.event_signing import compute_event_signature
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.events.utils import validate_canonicaljson
from synapse.federation.federation_base import (
FederationBase,
InvalidEventSignatureError,
@@ -285,11 +287,6 @@ class FederationServer(FederationBase):
# accurate as possible.
request_time = self._clock.time_msec()
# TODO(LM): If we are the hub server and the destination is a participant
# server then pdus are Linear PDUs.
#
# TODO(LM): If we are the hub's DAG server we need to linearize and give
# it the results.
transaction = Transaction(
transaction_id=transaction_id,
destination=destination,
@@ -463,7 +460,21 @@ class FederationServer(FederationBase):
logger.info("Ignoring PDU: %s", e)
continue
event = event_from_pdu_json(p, room_version)
# If the transaction is from a linearized matrix server, create PDUs
# from the LPDUs.
if (
room_version.linearized_matrix
and await self.keyring.is_server_linearized(origin)
):
event = await self._on_lpdu_event(p, room_version)
# Send this event on behalf of the other server.
#
# We are acting as the hub for this transaction and need to send
# this event out to other participants.
event.internal_metadata.send_on_behalf_of = origin
else:
event = event_from_pdu_json(p, room_version)
pdus_by_room.setdefault(room_id, []).append(event)
if event.origin_server_ts > newest_pdu_ts:
@@ -531,7 +542,6 @@ class FederationServer(FederationBase):
async def _handle_edus_in_txn(self, origin: str, transaction: Transaction) -> None:
"""Process the EDUs in a received transaction."""
async def _process_edu(edu_dict: JsonDict) -> None:
received_edus_counter.inc()
@@ -1012,6 +1022,117 @@ class FederationServer(FederationBase):
origin, event
)
async def _on_lpdu_event(
self, lpdu_json: JsonDict, room_version: RoomVersion
) -> EventBase:
"""Construct an EventBase from a linearized event json received over federation
See event_from_pdu_json.
Args:
lpdu_json: lpdu as received over federation
room_version: The version of the room this event belongs to
Raises:
SynapseError: if the lpdu is missing required fields or is otherwise
not a valid matrix event
"""
if not room_version.linearized_matrix:
raise ValueError("Cannot be used on non-linearized matrix")
# The minimum fields to create an LPDU.
assert_params_in_dict(
lpdu_json,
(
"type",
"room_Id",
"sender",
"origin",
"hub_server",
"origin_server_ts",
"content",
"hashes",
),
)
# The participant server should *not* provide auth/prev events.
for field in ("auth_events", "prev_events"):
if field in lpdu_json:
raise SynapseError(400, f"LPDU contained {field}", Codes.BAD_JSON)
# Hashes must contain (only) "lpdu".
if not isinstance(lpdu_json["hashes"], collections.abc.Mapping):
raise SynapseError(400, "Invalid hashes", Codes.BAD_JSON)
if lpdu_json["hashes"].keys() != {"lpdu"}:
raise SynapseError(
400, "hashes must contain exactly one key: 'lpdu'", Codes.BAD_JSON
)
# Validate that the JSON conforms to the specification.
if room_version.strict_canonicaljson:
validate_canonicaljson(lpdu_json)
# An LPDU doesn't have enough information to just create an event, it needs
# to be built and signed, etc.
builder = self.hs.get_event_builder_factory().for_room_version(
room_version, lpdu_json
)
try:
(
event,
unpersisted_context,
) = await self.hs.get_event_creation_handler().create_new_client_event(
builder=builder
)
except SynapseError as e:
logger.warning(
"Failed to create PDU from template for room %s because %s",
lpdu_json["room_id"],
e,
)
raise
if not event.hub_server:
raise SynapseError(400, "Cannot send PDU via hub server", Codes.BAD_JSON)
# If an LPDU is trying to be sent through us, but we're not the hub
# then deny.
if not self._is_mine_server_name(event.hub_server):
raise SynapseError(
400,
f"Cannot authorise event for hub server: {event.hub_server}",
Codes.FORBIDDEN,
)
# Double check that we *are* the hub.
state_ids = await self._state_storage_controller.get_current_state_ids(
event.room_id
)
# Get the current hub server from the sender of the create event.
create_event = await self.store.get_event(state_ids[(EventTypes.Create, "")])
hub_server = get_domain_from_id(create_event.sender)
if not self._is_mine_server_name(hub_server):
raise SynapseError(400, "Not the hub server", Codes.FORBIDDEN)
# Sign the event as the hub.
event.signatures.update(
compute_event_signature(
room_version,
event.get_pdu_json(),
self.hs.hostname,
self.hs.signing_key,
)
)
# Note that the signatures, etc. get checked later on in _handle_received_pdu.
# Server ACLs are checked in the caller: _handle_pdus_in_txn.
# TODO(LM) Do we need to check that the event will be accepted here?
return event
async def on_event_auth(
self, origin: str, room_id: str, event_id: str
) -> Tuple[int, Dict[str, Any]]:

View File

@@ -489,10 +489,19 @@ class FederationSender(AbstractFederationSender):
break
async def handle_event(event: EventBase) -> None:
# Only send events for this server.
# Send events which this server is sending on behalf of another,
# e.g. an event due to send_{join,leave,knock}.
send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
# Send events which originate from this server.
is_mine = self.is_mine_id(event.sender)
if not is_mine and send_on_behalf_of is None:
# Finally, if the server is acting as a linearized matrix hub,
# then send to any participating servers off the hub.
is_hub_server = (
event.room_version.linearized_matrix
and event.hub_server
and self.is_mine_server_name(event.hub_server)
)
if not is_mine and send_on_behalf_of is None and not is_hub_server:
logger.debug("Not sending remote-origin event %s", event)
return
@@ -542,6 +551,7 @@ class FederationSender(AbstractFederationSender):
)
return
# TODO(LM): Is the calculation of all destinations correct?
destinations: Optional[Collection[str]] = None
if not event.prev_event_ids():
# If there are no prev event IDs then the state is empty
@@ -614,10 +624,16 @@ class FederationSender(AbstractFederationSender):
)
}
if send_on_behalf_of is not None:
if (
send_on_behalf_of is not None
and not event.room_version.linearized_matrix
):
# If we are sending the event on behalf of another server
# then it already has the event and there is no reason to
# send the event to it.
#
# For linearized matrix, send it back to the origin.
# TODO(LM) Do not send back to DAG servers?
sharded_destinations.discard(send_on_behalf_of)
logger.debug("Sending %s to %r", event, sharded_destinations)