Compare commits

...

12 Commits

Author SHA1 Message Date
David Robertson
b4517d78f6 Master is a fed sender if none are configured 2023-02-24 17:44:42 +00:00
David Robertson
3452c2a23d Complain at startup if we can't find fed senders 2023-02-24 17:18:27 +00:00
David Robertson
5fc4155d2d Upgrade notes and docs 2023-02-24 17:18:27 +00:00
David Robertson
15b357e6cd Make new replication endpoint accessible in complement 2023-02-24 11:39:06 +00:00
David Robertson
4e3d2e8b69 Changelog 2023-02-24 00:41:19 +00:00
David Robertson
3a9aa533d1 Test case 2023-02-24 00:41:19 +00:00
David Robertson
bab3b58f7a Comments 2023-02-24 00:33:04 +00:00
David Robertson
8c5067609c Use new KeyFetcher 2023-02-24 00:33:04 +00:00
David Robertson
cdec54468a Add new KeyFetcher impl 2023-02-24 00:33:04 +00:00
David Robertson
0919513c3a Define a new replication endpoint 2023-02-24 00:33:03 +00:00
David Robertson
e0841c5d3f Keyring: isolate the keyfetching mechanism
So I can call it from federation senders
2023-02-23 23:57:24 +00:00
David Robertson
9a7f925a84 Pull out _FetchKeyRequest
This will help to break an import cycle
2023-02-23 23:55:53 +00:00
11 changed files with 387 additions and 38 deletions

1
changelog.d/15121.misc Normal file
View File

@@ -0,0 +1 @@
Route remote key requests via federation senders.

View File

@@ -100,7 +100,7 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
},
"federation_sender": {
"app": "synapse.app.generic_worker",
"listener_resources": [],
"listener_resources": ["replication"],
"endpoint_patterns": [],
"shared_extra_conf": {},
"worker_extra_conf": "",
@@ -345,7 +345,13 @@ def add_worker_roles_to_shared_config(
shared_config.setdefault("pusher_instances", []).append(worker_name)
elif worker_type == "federation_sender":
# Some outbound federation requests can be routed via federation senders,
# so federation senders need to be accessible by other workers.
shared_config.setdefault("federation_sender_instances", []).append(worker_name)
instance_map[worker_name] = {
"host": "localhost",
"port": worker_port,
}
elif worker_type == "event_persister":
# Event persisters write to the events stream, so we need to update

View File

@@ -87,6 +87,29 @@ process, for example:
wget https://packages.matrix.org/debian/pool/main/m/matrix-synapse-py3/matrix-synapse-py3_1.3.0+stretch1_amd64.deb
dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
```
# Upgrading to v1.79.0
## Changes to federation sender config
_This notice only applies to deployments using multiple workers. Deployments
not using workers are not affected._
From Synapse 1.79, only [federation senders](
https://matrix-org.github.io/synapse/release-v1.79/usage/configuration/config_documentation.html#federation_sender_instances
) will make outgoing key requests to homeservers and [trusted key servers](
https://matrix-org.github.io/synapse/release-v1.79/usage/configuration/config_documentation.html#trusted_key_servers
). This will make it easier for server operators to reason about how Synapse
communicates with the wider federation. As a consequence, all other workers now
ask federation senders to fetch keys on their behalf.
To facilitate this,
- federation senders must now be present in the [instance map](
https://matrix-org.github.io/synapse/release-v1.79/usage/configuration/config_documentation.html#instance_map
), and
- federation senders must now run an [http listener](
https://matrix-org.github.io/synapse/release-v1.79/usage/configuration/config_documentation.html#listeners
) which includes the `replication` resource.
# Upgrading to v1.78.0

View File

@@ -3811,7 +3811,7 @@ send_federation: false
### `federation_sender_instances`
It is possible to scale the processes that handle sending outbound federation requests
by running a [`generic_worker`](../../workers.md#synapseappgeneric_worker) and adding it's [`worker_name`](#worker_name) to
by running a [`generic_worker`](../../workers.md#synapseappgeneric_worker) and adding its [`worker_name`](#worker_name) to
a `federation_sender_instances` map. Doing so will remove handling of this function from
the main process. Multiple workers can be added to this map, in which case the work is
balanced across them.
@@ -3821,6 +3821,10 @@ sending, and if changed all federation sender workers must be stopped at the sam
and then started, to ensure that all instances are running with the same config (otherwise
events may be dropped).
Federation senders should have a replication [`http` listener](#listeners)
configured, and should be present in the [`instance_map`](#instance_map)
so that other workers can make internal http requests to the federation senders.
Example configuration for a single worker:
```yaml
federation_sender_instances:
@@ -3832,6 +3836,10 @@ federation_sender_instances:
- federation_sender1
- federation_sender2
```
_Changed in Synapse 1.79: Federation senders should now have an http listener
listening for `replication`, and should be present in the `instance_map`._
---
### `instance_map`

View File

@@ -590,10 +590,18 @@ It is likely this option will be deprecated in the future and not recommended fo
new installations. Instead, [use `synapse.app.generic_worker` with the `federation_sender_instances`](usage/configuration/config_documentation.md#federation_sender_instances).
Handles sending federation traffic to other servers. Doesn't handle any
REST endpoints itself, but you should set
client-facing REST endpoints itself, but you should set
[`send_federation: false`](usage/configuration/config_documentation.md#send_federation)
in the shared configuration file to stop the main synapse sending this traffic.
Federation senders should have a replication [`http` listener](
usage/configuration/config_documentation.md#listeners
) configured, and
should be present in the [`instance_map`](
usage/configuration/config_documentation.md#instance_map
) so that other workers can make internal
http requests to the federation senders.
If running multiple federation senders then you must list each
instance in the
[`federation_sender_instances`](usage/configuration/config_documentation.md#federation_sender_instances)
@@ -607,6 +615,13 @@ send_federation: false
federation_sender_instances:
- federation_sender1
- federation_sender2
instance_map:
  federation_sender1:
    host: localhost
    port: 1001
  federation_sender2:
    host: localhost
    port: 1002
```
An example for a federation sender instance:
@@ -615,6 +630,9 @@ An example for a federation sender instance:
{{#include systemd-with-workers/workers/federation_sender.yaml}}
```
_Changed in Synapse 1.79: Federation senders should now have an http listener
listening for `replication`, and should be present in the `instance_map`._
### `synapse.app.media_repository`
Handles the media repository. It can handle all endpoints starting with:

View File

@@ -174,7 +174,10 @@ class WorkerConfig(Config):
"synapse.app.federation_sender",
"federation_sender_instances",
)
self.send_federation = self.instance_name in federation_sender_instances
self.send_federation = (self.instance_name in federation_sender_instances) or (
not federation_sender_instances and self.instance_name == "master"
)
self.federation_shard_config = ShardedWorkerHandlingConfig(
federation_sender_instances
)

View File

@@ -14,6 +14,7 @@
import abc
import logging
import random
from typing import TYPE_CHECKING, Callable, Dict, Iterable, List, Optional, Tuple
import attr
@@ -40,10 +41,16 @@ from synapse.api.errors import (
RequestSendFailed,
SynapseError,
)
from synapse.config import ConfigError
from synapse.config.key import TrustedKeyServer
from synapse.crypto.types import _FetchKeyRequest
from synapse.events import EventBase
from synapse.events.utils import prune_event_dict
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.replication.http.keys import (
ReplicationFetchKeysEndpoint,
deserialise_fetch_key_result,
)
from synapse.storage.keys import FetchKeyResult
from synapse.types import JsonDict
from synapse.util import unwrapFirstError
@@ -123,25 +130,6 @@ class KeyLookupError(ValueError):
pass
@attr.s(slots=True, frozen=True, auto_attribs=True)
class _FetchKeyRequest:
"""A request for keys for a given server.
We will continue to try and fetch until we have all the keys listed under
`key_ids` (with an appropriate `valid_until_ts` property) or we run out of
places to fetch keys from.
Attributes:
server_name: The name of the server that owns the keys.
minimum_valid_until_ts: The timestamp which the keys must be valid until.
key_ids: The IDs of the keys to attempt to fetch
"""
server_name: str
minimum_valid_until_ts: int
key_ids: List[str]
class Keyring:
"""Handles verifying signed JSON objects and fetching the keys needed to do
so.
@@ -153,14 +141,22 @@ class Keyring:
self.clock = hs.get_clock()
if key_fetchers is None:
key_fetchers = (
# Fetch keys from the database.
StoreKeyFetcher(hs),
# Fetch keys from a configured Perspectives server.
PerspectivesKeyFetcher(hs),
# Fetch keys from the origin server directly.
ServerKeyFetcher(hs),
)
if hs.config.worker.send_federation:
key_fetchers = (
# Fetch keys from the database.
StoreKeyFetcher(hs),
# Fetch keys from a configured Perspectives server.
PerspectivesKeyFetcher(hs),
# Fetch keys from the origin server directly.
ServerKeyFetcher(hs),
)
else:
key_fetchers = (
# Fetch keys from the database.
StoreKeyFetcher(hs),
# Ask a federation sender to fetch the keys for us.
InternalWorkerRequestKeyFetcher(hs),
)
self._key_fetchers = key_fetchers
self._fetch_keys_queue: BatchingQueue[
@@ -291,9 +287,7 @@ class Keyring:
minimum_valid_until_ts=verify_request.minimum_valid_until_ts,
key_ids=list(key_ids_to_find),
)
found_keys_by_server = await self._fetch_keys_queue.add_to_queue(
key_request, key=verify_request.server_name
)
found_keys_by_server = await self.fetch_keys(key_request)
# Since we batch up requests the returned set of keys may contain keys
# from other servers, so we pull out only the ones we care about.
@@ -320,6 +314,15 @@ class Keyring:
Codes.UNAUTHORIZED,
)
async def fetch_keys(
    self, key_request: _FetchKeyRequest
) -> Dict[str, Dict[str, FetchKeyResult]]:
    """Submit a single key request to the fetch queue and await its result.

    The request is queued under the name of the server whose keys are wanted.

    Args:
        key_request: which server's keys to fetch, and which key IDs.

    Returns:
        {server name: {key id: fetch key result}}
    """
    return await self._fetch_keys_queue.add_to_queue(
        key_request, key=key_request.server_name
    )
async def _process_json(
self, verify_key: VerifyKey, verify_request: VerifyJsonRequest
) -> None:
@@ -469,6 +472,8 @@ class Keyring:
class KeyFetcher(metaclass=abc.ABCMeta):
"""Abstract gadget for fetching keys to validate other homeservers' signatures."""
def __init__(self, hs: "HomeServer"):
self._queue = BatchingQueue(
self.__class__.__name__, hs.get_clock(), self._fetch_keys
@@ -490,11 +495,15 @@ class KeyFetcher(metaclass=abc.ABCMeta):
async def _fetch_keys(
self, keys_to_fetch: List[_FetchKeyRequest]
) -> Dict[str, Dict[str, FetchKeyResult]]:
"""
Returns:
Map from server_name -> key_id -> FetchKeyResult
"""
pass
class StoreKeyFetcher(KeyFetcher):
"""KeyFetcher impl which fetches keys from our data store"""
"""Try to retrieve a previously-fetched key from the DB."""
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
@@ -518,6 +527,8 @@ class StoreKeyFetcher(KeyFetcher):
class BaseV2KeyFetcher(KeyFetcher):
"""Abstract helper. Fetch keys by requesting it from some server."""
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
@@ -620,7 +631,10 @@ class BaseV2KeyFetcher(KeyFetcher):
class PerspectivesKeyFetcher(BaseV2KeyFetcher):
"""KeyFetcher impl which fetches keys from the "perspectives" servers"""
"""Fetch keys for some homeserver X by requesting them from a trusted key server Y.
These trusted key servers were seemingly once known as "perspectives" servers.
"""
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
@@ -803,7 +817,7 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
class ServerKeyFetcher(BaseV2KeyFetcher):
"""KeyFetcher impl which fetches keys from the origin servers"""
"""Fetch keys for some homeserver X by requesting them directly from X."""
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
@@ -903,3 +917,37 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
response_json=response,
time_added_ms=time_now_ms,
)
class InternalWorkerRequestKeyFetcher(KeyFetcher):
    """KeyFetcher which delegates the fetch to a federation_sender worker.

    The federation sender decides how to obtain keys for the target
    homeserver (via a notary, or directly from that homeserver); this class
    only relays the request and decodes the reply.
    """

    def __init__(self, hs: "HomeServer"):
        super().__init__(hs)
        self._federation_shard_config = hs.config.worker.federation_shard_config
        # Without at least one federation sender there is nobody to ask.
        if not self._federation_shard_config.instances:
            raise ConfigError("No federation senders configured")
        self._client = ReplicationFetchKeysEndpoint.make_client(hs)

    async def _fetch_keys(
        self, keys_to_fetch: List[_FetchKeyRequest]
    ) -> Dict[str, Dict[str, FetchKeyResult]]:
        """Forward `keys_to_fetch` to a federation sender and decode its reply.

        Returns:
            Map from server_name -> key_id -> FetchKeyResult
        """
        # For simplicity's sake, any federation sender will do: pick one at random.
        sender = random.choice(self._federation_shard_config.instances)
        reply = await self._client(
            keys_to_fetch=keys_to_fetch,
            instance_name=sender,
        )

        keys_by_server: Dict[str, Dict[str, FetchKeyResult]] = {}
        for server_name, raw_keys in reply["server_keys"].items():
            decoded: Dict[str, FetchKeyResult] = {}
            for key_id, raw_key in raw_keys.items():
                decoded[key_id] = deserialise_fetch_key_result(key_id, raw_key)
            keys_by_server.setdefault(server_name, {}).update(decoded)
        return keys_by_server

35
synapse/crypto/types.py Normal file
View File

@@ -0,0 +1,35 @@
# Copyright 2023 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List
import attr
@attr.s(slots=True, frozen=True, auto_attribs=True)
class _FetchKeyRequest:
    """Describes which keys are wanted from one particular server.

    Fetching continues until every key listed in `key_ids` has been found
    (with an appropriate `valid_until_ts` property), or until there is
    nowhere left to fetch keys from.

    Attributes:
        server_name: The name of the server that owns the keys.
        minimum_valid_until_ts: The timestamp which the keys must be valid until.
        key_ids: The IDs of the keys to attempt to fetch
    """

    server_name: str
    minimum_valid_until_ts: int
    key_ids: List[str]

View File

@@ -19,6 +19,7 @@ from synapse.replication.http import (
account_data,
devices,
federation,
keys,
login,
membership,
presence,
@@ -52,6 +53,7 @@ class ReplicationRestResource(JsonResource):
account_data.register_servlets(hs, self)
push.register_servlets(hs, self)
state.register_servlets(hs, self)
keys.register_servlets(hs, self)
# The following can't currently be instantiated on workers.
if hs.config.worker.worker_app is None:

View File

@@ -0,0 +1,123 @@
# Copyright 2023 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import TYPE_CHECKING, Dict, List, Tuple
import attr
from signedjson.key import decode_verify_key_bytes, encode_verify_key_base64
from unpaddedbase64 import decode_base64
from twisted.web.server import Request
from synapse.crypto.types import _FetchKeyRequest
from synapse.http.server import HttpServer
from synapse.replication.http._base import ReplicationEndpoint
from synapse.storage.keys import FetchKeyResult
from synapse.types import JsonDict
from synapse.util.async_helpers import yieldable_gather_results
if TYPE_CHECKING:
from synapse.server import HomeServer
# Name the logger after the module's dotted name (not its filesystem path) so
# that it takes part in the normal logger hierarchy and per-module log config.
logger = logging.getLogger(__name__)
class ReplicationFetchKeysEndpoint(ReplicationEndpoint):
    """Another worker is asking us to fetch keys for a homeserver X.

    The request looks like:

        POST /_synapse/replication/fetch_keys

        {
            "keys_to_fetch": [
                {
                    "server_name": "example.com",
                    "minimum_valid_until_ts": 123456,
                    "key_ids": ["ABC", "DEF"]
                }
            ]
        }

    Each entry is turned into a `_FetchKeyRequest` and handed to the keyring.
    The keyring yields `FetchKeyResult` structs, but those hold a
    nacl.signing.VerifyKey which is not JSON-serialisable. Instead, each
    result is serialised as its base64-encoded verify key plus its
    `valid_until_ts`, grouped by server name and then by key ID.

    The response takes the form:

        200 OK

        {
            "server_keys": {
                "example.com": {
                    "ABC": {
                        "verify_key": "<base64-encoded verify key>",
                        "valid_until_ts": 123456
                    }
                }
            }
        }

    Callers can rebuild each `FetchKeyResult` with
    `deserialise_fetch_key_result`.
    """

    NAME = "fetch_keys"
    PATH_ARGS = ()
    METHOD = "POST"

    def __init__(self, hs: "HomeServer"):
        super().__init__(hs)
        self._keyring = hs.get_keyring()

    async def _handle_request(  # type: ignore[override]
        self,
        request: Request,
        content: JsonDict,
    ) -> Tuple[int, JsonDict]:
        """Fetch every requested key via the keyring and return them serialised.

        Args:
            request: the incoming replication request (unused here).
            content: parsed JSON body; must contain a `keys_to_fetch` list.

        Returns:
            A `(200, {"server_keys": ...})` tuple, as described on the class.
        """
        parsed_requests = [
            _FetchKeyRequest(**entry) for entry in content["keys_to_fetch"]
        ]

        results: List[
            Dict[str, Dict[str, FetchKeyResult]]
        ] = await yieldable_gather_results(
            self._keyring.fetch_keys,
            parsed_requests,
        )

        # Each result maps server name -> key id -> FetchKeyResult. Merge them
        # all into one JSON-friendly mapping of the same shape.
        union_of_keys: Dict[str, Dict[str, JsonDict]] = {}
        for result in results:
            for server_name, keys in result.items():
                serialised_keys = {
                    key_id: _serialise_fetch_key_result(key_result)
                    for key_id, key_result in keys.items()
                }
                union_of_keys.setdefault(server_name, {}).update(serialised_keys)

        return 200, {"server_keys": union_of_keys}

    @staticmethod
    async def _serialize_payload(*, keys_to_fetch: List[_FetchKeyRequest]) -> JsonDict:  # type: ignore[override]
        """Build the JSON request body from a list of `_FetchKeyRequest`s."""
        return {"keys_to_fetch": [attr.asdict(key) for key in keys_to_fetch]}
def _serialise_fetch_key_result(result: FetchKeyResult) -> JsonDict:
    """Convert a FetchKeyResult into a JSON-serialisable dict.

    The verify key is base64-encoded; `valid_until_ts` is copied across as-is.
    """
    encoded_key = encode_verify_key_base64(result.verify_key)
    return {"verify_key": encoded_key, "valid_until_ts": result.valid_until_ts}
def deserialise_fetch_key_result(key_id: str, data: JsonDict) -> FetchKeyResult:
    """Rebuild a FetchKeyResult from its serialised dict form.

    Args:
        key_id: the ID of the key being decoded; passed to the key decoder.
        data: dict with a base64 `verify_key` and a `valid_until_ts`.
    """
    key_bytes = decode_base64(data["verify_key"])
    verify_key = decode_verify_key_bytes(key_id, key_bytes)
    return FetchKeyResult(
        verify_key=verify_key,
        valid_until_ts=data["valid_until_ts"],
    )
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
    """Register the fetch-keys replication endpoint on federation-sending processes.

    Processes whose config has `send_federation` unset register nothing.
    """
    if not hs.config.worker.send_federation:
        return
    ReplicationFetchKeysEndpoint(hs).register(http_server)

View File

@@ -25,10 +25,12 @@ from signedjson.types import SigningKey, VerifyKey
from twisted.internet import defer
from twisted.internet.defer import Deferred, ensureDeferred
from twisted.test.proto_helpers import MemoryReactor
from twisted.web.resource import NoResource, Resource
from synapse.api.errors import SynapseError
from synapse.api.errors import HttpResponseException, SynapseError
from synapse.crypto import keyring
from synapse.crypto.keyring import (
InternalWorkerRequestKeyFetcher,
PerspectivesKeyFetcher,
ServerKeyFetcher,
StoreKeyFetcher,
@@ -39,12 +41,15 @@ from synapse.logging.context import (
current_context,
make_deferred_yieldable,
)
from synapse.rest.key.v2 import KeyResource
from synapse.server import HomeServer
from synapse.storage.keys import FetchKeyResult
from synapse.types import JsonDict
from synapse.util import Clock
from synapse.util.httpresourcetree import create_resource_tree
from tests import unittest
from tests.replication._base import BaseMultiWorkerStreamTestCase
from tests.test_utils import make_awaitable
from tests.unittest import logcontext_clean, override_config
@@ -757,6 +762,83 @@ class PerspectivesKeyFetcherTestCase(unittest.HomeserverTestCase):
self.assertEqual(keys, {}, "Expected empty dict with missing origin server sig")
class InternalWorkerRequestKeyFetcherTestCase(BaseMultiWorkerStreamTestCase):
    """Checks that keys can be fetched by asking a federation sender worker."""

    def create_test_resource(self) -> Resource:  # type: ignore[override]
        # Only the key API is mounted; the root is a NoResource.
        resources = {"/_matrix/key/v2": KeyResource(self.hs)}
        return create_resource_tree(resources, root_resource=NoResource())

    def default_config(self) -> Dict[str, Any]:
        config = super().default_config()
        config["federation_sender_instances"] = ["federation_sender1"]
        config["instance_map"] = {
            "federation_sender1": {"host": "testserv", "port": 1001},
        }
        return config

    def test_key_fetching_works_across_workers(self) -> None:
        """Test that a non-fed-sender worker requests keys via a fed-sender."""
        server_name = "server2"
        key_id = "ed25519:ver1"
        valid_until_ts = 200 * 1000

        # 1. Mock out the response from the notary server: it knows no keys.
        async def mock_post_json(*args: Any, **kwargs: Any) -> JsonDict:
            """Mock the request to the notary server."""
            if kwargs.get("path") != "/_matrix/key/v2/query":
                raise HttpResponseException(500, "ruh", b"roh")
            return {"server_keys": []}

        # 2. Build a valid response to /_matrix/key/v2/server for the server being
        #    queried, signed with a freshly generated key.
        signing_key = signedjson.key.generate_signing_key("ver1")
        verify_key = signedjson.key.get_verify_key(signing_key)
        server_response = {
            "server_name": server_name,
            "old_verify_keys": {},
            "valid_until_ts": valid_until_ts,
            "verify_keys": {
                key_id: {"key": signedjson.key.encode_verify_key_base64(verify_key)}
            },
        }
        signedjson.sign.sign_json(server_response, server_name, signing_key)

        async def mock_get_json(*args: Any, **kwargs: Any) -> JsonDict:
            if kwargs.get("path") != "/_matrix/key/v2/server":
                raise HttpResponseException(500, "ruh", b"roh")
            return server_response

        http_client = Mock()
        http_client.post_json = mock_post_json
        http_client.get_json = mock_get_json

        # 3. Make a federation homeserver to actually make the request.
        self.make_worker_hs(
            "synapse.app.generic_worker",
            {
                "worker_name": "federation_sender1",
                "federation_sender_instances": ["federation_sender1"],
            },
            federation_http_client=http_client,
        )

        # 4. Use the via-fed-sender fetcher to get keys.
        fetcher = InternalWorkerRequestKeyFetcher(self.hs)
        fetched = self.get_success(
            fetcher.get_keys(server_name, [key_id], 0), by=0.1
        )

        result = fetched[key_id]
        self.assertEqual(result.valid_until_ts, valid_until_ts)
        self.assertEqual(result.verify_key, verify_key)
        self.assertEqual(result.verify_key.alg, "ed25519")
        self.assertEqual(result.verify_key.version, "ver1")
def get_key_id(key: SigningKey) -> str:
    """Get the matrix ID tag for a given SigningKey or VerifyKey.

    The tag is the key's algorithm and version joined by a colon.
    """
    return f"{key.alg}:{key.version}"