Compare commits
12 Commits
v1.140.0rc
...
dmr/key-re
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b4517d78f6 | ||
|
|
3452c2a23d | ||
|
|
5fc4155d2d | ||
|
|
15b357e6cd | ||
|
|
4e3d2e8b69 | ||
|
|
3a9aa533d1 | ||
|
|
bab3b58f7a | ||
|
|
8c5067609c | ||
|
|
cdec54468a | ||
|
|
0919513c3a | ||
|
|
e0841c5d3f | ||
|
|
9a7f925a84 |
1
changelog.d/15121.misc
Normal file
1
changelog.d/15121.misc
Normal file
@@ -0,0 +1 @@
|
||||
Route remote key requests via federation senders.
|
||||
@@ -100,7 +100,7 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
|
||||
},
|
||||
"federation_sender": {
|
||||
"app": "synapse.app.generic_worker",
|
||||
"listener_resources": [],
|
||||
"listener_resources": ["replication"],
|
||||
"endpoint_patterns": [],
|
||||
"shared_extra_conf": {},
|
||||
"worker_extra_conf": "",
|
||||
@@ -345,7 +345,13 @@ def add_worker_roles_to_shared_config(
|
||||
shared_config.setdefault("pusher_instances", []).append(worker_name)
|
||||
|
||||
elif worker_type == "federation_sender":
|
||||
# Some outbound federation requests can be routed via federation senders,
|
||||
# so federation senders need to be accessible by other workers.
|
||||
shared_config.setdefault("federation_sender_instances", []).append(worker_name)
|
||||
instance_map[worker_name] = {
|
||||
"host": "localhost",
|
||||
"port": worker_port,
|
||||
}
|
||||
|
||||
elif worker_type == "event_persister":
|
||||
# Event persisters write to the events stream, so we need to update
|
||||
|
||||
@@ -87,6 +87,29 @@ process, for example:
|
||||
wget https://packages.matrix.org/debian/pool/main/m/matrix-synapse-py3/matrix-synapse-py3_1.3.0+stretch1_amd64.deb
|
||||
dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
|
||||
```
|
||||
# Upgrading to v1.79.0
|
||||
|
||||
## Changes to federation sender config
|
||||
|
||||
_This notice only applies to deployments using multiple workers. Deployments
|
||||
not using workers are not affected._
|
||||
|
||||
From Synapse 1.79, only [federation senders](
|
||||
https://matrix-org.github.io/synapse/release-v1.79/usage/configuration/config_documentation.html#federation_sender_instances
|
||||
) will make outgoing key requests to homeservers and [trusted key servers](
|
||||
https://matrix-org.github.io/synapse/release-v1.79/usage/configuration/config_documentation.html#trusted_key_servers
|
||||
). This will make it easier for server operators to reason about how Synapse
|
||||
communicates with the wider federation. As a consequence, all other workers now
|
||||
ask federation senders to fetch keys on their behalf.
|
||||
|
||||
To facilitate this,
|
||||
|
||||
- federation senders must now be present in the [instance map](
|
||||
https://matrix-org.github.io/synapse/release-v1.79/usage/configuration/config_documentation.html#instance_map
|
||||
), and
|
||||
- federation senders must now run an [http listener](
|
||||
https://matrix-org.github.io/synapse/latest/usage/configuration/config_documentation.html#listeners
|
||||
) which includes the `replication` resource.
|
||||
|
||||
# Upgrading to v1.78.0
|
||||
|
||||
|
||||
@@ -3811,7 +3811,7 @@ send_federation: false
|
||||
### `federation_sender_instances`
|
||||
|
||||
It is possible to scale the processes that handle sending outbound federation requests
|
||||
by running a [`generic_worker`](../../workers.md#synapseappgeneric_worker) and adding it's [`worker_name`](#worker_name) to
|
||||
by running a [`generic_worker`](../../workers.md#synapseappgeneric_worker) and adding its [`worker_name`](#worker_name) to
|
||||
a `federation_sender_instances` map. Doing so will remove handling of this function from
|
||||
the main process. Multiple workers can be added to this map, in which case the work is
|
||||
balanced across them.
|
||||
@@ -3821,6 +3821,10 @@ sending, and if changed all federation sender workers must be stopped at the sam
|
||||
and then started, to ensure that all instances are running with the same config (otherwise
|
||||
events may be dropped).
|
||||
|
||||
Federation senders should have a replication [`http` listener](#listeners)
|
||||
configured, and should be present in the [`instance_map`](#instance_map)
|
||||
so that other workers can make internal http requests to the federation senders.
|
||||
|
||||
Example configuration for a single worker:
|
||||
```yaml
|
||||
federation_sender_instances:
|
||||
@@ -3832,6 +3836,10 @@ federation_sender_instances:
|
||||
- federation_sender1
|
||||
- federation_sender2
|
||||
```
|
||||
|
||||
_Changed in Synapse 1.79: Federation senders should now have an http listener
|
||||
listening for `replication`, and should be present in the `instance_map`._
|
||||
|
||||
---
|
||||
### `instance_map`
|
||||
|
||||
|
||||
@@ -590,10 +590,18 @@ It is likely this option will be deprecated in the future and not recommended fo
|
||||
new installations. Instead, [use `synapse.app.generic_worker` with the `federation_sender_instances`](usage/configuration/config_documentation.md#federation_sender_instances).
|
||||
|
||||
Handles sending federation traffic to other servers. Doesn't handle any
|
||||
REST endpoints itself, but you should set
|
||||
client-facing REST endpoints itself, but you should set
|
||||
[`send_federation: false`](usage/configuration/config_documentation.md#send_federation)
|
||||
in the shared configuration file to stop the main synapse sending this traffic.
|
||||
|
||||
Federation senders should have a replication [`http` listener](
|
||||
usage/configuration/config_documentation.md#listeners
|
||||
) configured, and
|
||||
should be present in the [`instance_map`](
|
||||
usage/configuration/config_documentation.md#instance_map
|
||||
) so that other workers can make internal
|
||||
http requests to the federation senders.
|
||||
|
||||
If running multiple federation senders then you must list each
|
||||
instance in the
|
||||
[`federation_sender_instances`](usage/configuration/config_documentation.md#federation_sender_instances)
|
||||
@@ -607,6 +615,13 @@ send_federation: false
|
||||
federation_sender_instances:
|
||||
- federation_sender1
|
||||
- federation_sender2
|
||||
instance_map:
|
||||
- federation_sender1:
|
||||
- host: localhost
|
||||
- port: 1001
|
||||
- federation_sender2:
|
||||
- host: localhost
|
||||
- port: 1002
|
||||
```
|
||||
|
||||
An example for a federation sender instance:
|
||||
@@ -615,6 +630,9 @@ An example for a federation sender instance:
|
||||
{{#include systemd-with-workers/workers/federation_sender.yaml}}
|
||||
```
|
||||
|
||||
_Changed in Synapse 1.79: Federation senders should now have an http listener
|
||||
listening for `replication`, and should be present in the `instance_map`._
|
||||
|
||||
### `synapse.app.media_repository`
|
||||
|
||||
Handles the media repository. It can handle all endpoints starting with:
|
||||
|
||||
@@ -174,7 +174,10 @@ class WorkerConfig(Config):
|
||||
"synapse.app.federation_sender",
|
||||
"federation_sender_instances",
|
||||
)
|
||||
self.send_federation = self.instance_name in federation_sender_instances
|
||||
self.send_federation = (self.instance_name in federation_sender_instances) or (
|
||||
not federation_sender_instances and self.instance_name == "master"
|
||||
)
|
||||
|
||||
self.federation_shard_config = ShardedWorkerHandlingConfig(
|
||||
federation_sender_instances
|
||||
)
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
import abc
|
||||
import logging
|
||||
import random
|
||||
from typing import TYPE_CHECKING, Callable, Dict, Iterable, List, Optional, Tuple
|
||||
|
||||
import attr
|
||||
@@ -40,10 +41,16 @@ from synapse.api.errors import (
|
||||
RequestSendFailed,
|
||||
SynapseError,
|
||||
)
|
||||
from synapse.config import ConfigError
|
||||
from synapse.config.key import TrustedKeyServer
|
||||
from synapse.crypto.types import _FetchKeyRequest
|
||||
from synapse.events import EventBase
|
||||
from synapse.events.utils import prune_event_dict
|
||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||
from synapse.replication.http.keys import (
|
||||
ReplicationFetchKeysEndpoint,
|
||||
deserialise_fetch_key_result,
|
||||
)
|
||||
from synapse.storage.keys import FetchKeyResult
|
||||
from synapse.types import JsonDict
|
||||
from synapse.util import unwrapFirstError
|
||||
@@ -123,25 +130,6 @@ class KeyLookupError(ValueError):
|
||||
pass
|
||||
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class _FetchKeyRequest:
|
||||
"""A request for keys for a given server.
|
||||
|
||||
We will continue to try and fetch until we have all the keys listed under
|
||||
`key_ids` (with an appropriate `valid_until_ts` property) or we run out of
|
||||
places to fetch keys from.
|
||||
|
||||
Attributes:
|
||||
server_name: The name of the server that owns the keys.
|
||||
minimum_valid_until_ts: The timestamp which the keys must be valid until.
|
||||
key_ids: The IDs of the keys to attempt to fetch
|
||||
"""
|
||||
|
||||
server_name: str
|
||||
minimum_valid_until_ts: int
|
||||
key_ids: List[str]
|
||||
|
||||
|
||||
class Keyring:
|
||||
"""Handles verifying signed JSON objects and fetching the keys needed to do
|
||||
so.
|
||||
@@ -153,14 +141,22 @@ class Keyring:
|
||||
self.clock = hs.get_clock()
|
||||
|
||||
if key_fetchers is None:
|
||||
key_fetchers = (
|
||||
# Fetch keys from the database.
|
||||
StoreKeyFetcher(hs),
|
||||
# Fetch keys from a configured Perspectives server.
|
||||
PerspectivesKeyFetcher(hs),
|
||||
# Fetch keys from the origin server directly.
|
||||
ServerKeyFetcher(hs),
|
||||
)
|
||||
if hs.config.worker.send_federation:
|
||||
key_fetchers = (
|
||||
# Fetch keys from the database.
|
||||
StoreKeyFetcher(hs),
|
||||
# Fetch keys from a configured Perspectives server.
|
||||
PerspectivesKeyFetcher(hs),
|
||||
# Fetch keys from the origin server directly.
|
||||
ServerKeyFetcher(hs),
|
||||
)
|
||||
else:
|
||||
key_fetchers = (
|
||||
# Fetch keys from the database.
|
||||
StoreKeyFetcher(hs),
|
||||
# Ask a federation sender to fetch the keys for us.
|
||||
InternalWorkerRequestKeyFetcher(hs),
|
||||
)
|
||||
self._key_fetchers = key_fetchers
|
||||
|
||||
self._fetch_keys_queue: BatchingQueue[
|
||||
@@ -291,9 +287,7 @@ class Keyring:
|
||||
minimum_valid_until_ts=verify_request.minimum_valid_until_ts,
|
||||
key_ids=list(key_ids_to_find),
|
||||
)
|
||||
found_keys_by_server = await self._fetch_keys_queue.add_to_queue(
|
||||
key_request, key=verify_request.server_name
|
||||
)
|
||||
found_keys_by_server = await self.fetch_keys(key_request)
|
||||
|
||||
# Since we batch up requests the returned set of keys may contain keys
|
||||
# from other servers, so we pull out only the ones we care about.
|
||||
@@ -320,6 +314,15 @@ class Keyring:
|
||||
Codes.UNAUTHORIZED,
|
||||
)
|
||||
|
||||
async def fetch_keys(
|
||||
self, key_request: _FetchKeyRequest
|
||||
) -> Dict[str, Dict[str, FetchKeyResult]]:
|
||||
"""Returns: {server name: {key id: fetch key result}}"""
|
||||
found_keys_by_server = await self._fetch_keys_queue.add_to_queue(
|
||||
key_request, key=key_request.server_name
|
||||
)
|
||||
return found_keys_by_server
|
||||
|
||||
async def _process_json(
|
||||
self, verify_key: VerifyKey, verify_request: VerifyJsonRequest
|
||||
) -> None:
|
||||
@@ -469,6 +472,8 @@ class Keyring:
|
||||
|
||||
|
||||
class KeyFetcher(metaclass=abc.ABCMeta):
|
||||
"""Abstract gadget for fetching keys to validate other homeservers' signatures."""
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self._queue = BatchingQueue(
|
||||
self.__class__.__name__, hs.get_clock(), self._fetch_keys
|
||||
@@ -490,11 +495,15 @@ class KeyFetcher(metaclass=abc.ABCMeta):
|
||||
async def _fetch_keys(
|
||||
self, keys_to_fetch: List[_FetchKeyRequest]
|
||||
) -> Dict[str, Dict[str, FetchKeyResult]]:
|
||||
"""
|
||||
Returns:
|
||||
Map from server_name -> key_id -> FetchKeyResult
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class StoreKeyFetcher(KeyFetcher):
|
||||
"""KeyFetcher impl which fetches keys from our data store"""
|
||||
"""Try to retrieve a previously-fetched key from the DB."""
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__(hs)
|
||||
@@ -518,6 +527,8 @@ class StoreKeyFetcher(KeyFetcher):
|
||||
|
||||
|
||||
class BaseV2KeyFetcher(KeyFetcher):
|
||||
"""Abstract helper. Fetch keys by requesting it from some server."""
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__(hs)
|
||||
|
||||
@@ -620,7 +631,10 @@ class BaseV2KeyFetcher(KeyFetcher):
|
||||
|
||||
|
||||
class PerspectivesKeyFetcher(BaseV2KeyFetcher):
|
||||
"""KeyFetcher impl which fetches keys from the "perspectives" servers"""
|
||||
"""Fetch keys for some homeserver X by requesting them from a trusted key server Y.
|
||||
|
||||
These trusted key servers were seemingly once known as "perspectives" servers.
|
||||
"""
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__(hs)
|
||||
@@ -803,7 +817,7 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
|
||||
|
||||
|
||||
class ServerKeyFetcher(BaseV2KeyFetcher):
|
||||
"""KeyFetcher impl which fetches keys from the origin servers"""
|
||||
"""Fetch keys for some homeserver X by requesting them directly from X."""
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__(hs)
|
||||
@@ -903,3 +917,37 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
|
||||
response_json=response,
|
||||
time_added_ms=time_now_ms,
|
||||
)
|
||||
|
||||
|
||||
class InternalWorkerRequestKeyFetcher(KeyFetcher):
|
||||
"""Ask a federation_sender worker to request keys for some homeserver X.
|
||||
|
||||
It may choose to do so via a notary or directly from X itself; we don't care.
|
||||
"""
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__(hs)
|
||||
self._federation_shard_config = hs.config.worker.federation_shard_config
|
||||
if not self._federation_shard_config.instances:
|
||||
raise ConfigError("No federation senders configured")
|
||||
self._client = ReplicationFetchKeysEndpoint.make_client(hs)
|
||||
|
||||
async def _fetch_keys(
|
||||
self, keys_to_fetch: List[_FetchKeyRequest]
|
||||
) -> Dict[str, Dict[str, FetchKeyResult]]:
|
||||
# For simplicity's sake, pick a random federation sender
|
||||
instance_name = random.choice(self._federation_shard_config.instances)
|
||||
response = await self._client(
|
||||
keys_to_fetch=keys_to_fetch,
|
||||
instance_name=instance_name,
|
||||
)
|
||||
|
||||
parsed_response: Dict[str, Dict[str, FetchKeyResult]] = {}
|
||||
for server_name, keys in response["server_keys"].items():
|
||||
deserialised_keys = {
|
||||
key_id: deserialise_fetch_key_result(key_id, verify_key)
|
||||
for key_id, verify_key in keys.items()
|
||||
}
|
||||
parsed_response.setdefault(server_name, {}).update(deserialised_keys)
|
||||
|
||||
return parsed_response
|
||||
|
||||
35
synapse/crypto/types.py
Normal file
35
synapse/crypto/types.py
Normal file
@@ -0,0 +1,35 @@
|
||||
# Copyright 2023- The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from typing import List
|
||||
|
||||
import attr
|
||||
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class _FetchKeyRequest:
|
||||
"""A request for keys for a given server.
|
||||
|
||||
We will continue to try and fetch until we have all the keys listed under
|
||||
`key_ids` (with an appropriate `valid_until_ts` property) or we run out of
|
||||
places to fetch keys from.
|
||||
|
||||
Attributes:
|
||||
server_name: The name of the server that owns the keys.
|
||||
minimum_valid_until_ts: The timestamp which the keys must be valid until.
|
||||
key_ids: The IDs of the keys to attempt to fetch
|
||||
"""
|
||||
|
||||
server_name: str
|
||||
minimum_valid_until_ts: int
|
||||
key_ids: List[str]
|
||||
@@ -19,6 +19,7 @@ from synapse.replication.http import (
|
||||
account_data,
|
||||
devices,
|
||||
federation,
|
||||
keys,
|
||||
login,
|
||||
membership,
|
||||
presence,
|
||||
@@ -52,6 +53,7 @@ class ReplicationRestResource(JsonResource):
|
||||
account_data.register_servlets(hs, self)
|
||||
push.register_servlets(hs, self)
|
||||
state.register_servlets(hs, self)
|
||||
keys.register_servlets(hs, self)
|
||||
|
||||
# The following can't currently be instantiated on workers.
|
||||
if hs.config.worker.worker_app is None:
|
||||
|
||||
123
synapse/replication/http/keys.py
Normal file
123
synapse/replication/http/keys.py
Normal file
@@ -0,0 +1,123 @@
|
||||
# Copyright 2023 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Dict, List, Tuple
|
||||
|
||||
import attr
|
||||
from signedjson.key import decode_verify_key_bytes, encode_verify_key_base64
|
||||
from unpaddedbase64 import decode_base64
|
||||
|
||||
from twisted.web.server import Request
|
||||
|
||||
from synapse.crypto.types import _FetchKeyRequest
|
||||
from synapse.http.server import HttpServer
|
||||
from synapse.replication.http._base import ReplicationEndpoint
|
||||
from synapse.storage.keys import FetchKeyResult
|
||||
from synapse.types import JsonDict
|
||||
from synapse.util.async_helpers import yieldable_gather_results
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
logger = logging.getLogger(__file__)
|
||||
|
||||
|
||||
class ReplicationFetchKeysEndpoint(ReplicationEndpoint):
|
||||
"""Another worker is asking us to fetch keys for a homeserver X.
|
||||
|
||||
The request looks like:
|
||||
|
||||
POST /_synapse/replication/fetch_keys
|
||||
{
|
||||
keys_to_fetch: [
|
||||
{
|
||||
"server_name": "example.com",
|
||||
"minimum_valid_until_ts": 123456,
|
||||
"key_ids": ["ABC", "DEF"]
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
We would normally return a group of FetchKeyResponse structs like the
|
||||
normal code path does, but FetchKeyResponse holds a nacl.signing.VerifyKey
|
||||
which is not JSON-serialisable. Instead, for each requested key we respond
|
||||
with a boolean: `true` meaning we fetched this key, and `false` meaning we
|
||||
didn't.
|
||||
|
||||
The response takes the form:
|
||||
|
||||
200 OK
|
||||
{
|
||||
"fetched_count": 1
|
||||
}
|
||||
"""
|
||||
|
||||
NAME = "fetch_keys"
|
||||
PATH_ARGS = ()
|
||||
METHOD = "POST"
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__(hs)
|
||||
|
||||
self._keyring = hs.get_keyring()
|
||||
|
||||
async def _handle_request( # type: ignore[override]
|
||||
self,
|
||||
request: Request,
|
||||
content: JsonDict,
|
||||
) -> Tuple[int, JsonDict]:
|
||||
parsed_requests = [
|
||||
_FetchKeyRequest(**entry) for entry in content["keys_to_fetch"]
|
||||
]
|
||||
|
||||
results: List[
|
||||
Dict[str, Dict[str, FetchKeyResult]]
|
||||
] = await yieldable_gather_results(
|
||||
self._keyring.fetch_keys,
|
||||
parsed_requests,
|
||||
)
|
||||
|
||||
union_of_keys: Dict[str, Dict[str, JsonDict]] = {}
|
||||
for result in results:
|
||||
for server_name, keys in result.items():
|
||||
serialised_keys = {
|
||||
key_id: _serialise_fetch_key_result(verify_key)
|
||||
for key_id, verify_key in keys.items()
|
||||
}
|
||||
union_of_keys.setdefault(server_name, {}).update(serialised_keys)
|
||||
|
||||
return 200, {"server_keys": union_of_keys}
|
||||
|
||||
@staticmethod
|
||||
async def _serialize_payload(*, keys_to_fetch: List[_FetchKeyRequest]) -> JsonDict: # type: ignore[override]
|
||||
return {"keys_to_fetch": [attr.asdict(key) for key in keys_to_fetch]}
|
||||
|
||||
|
||||
def _serialise_fetch_key_result(result: FetchKeyResult) -> JsonDict:
|
||||
return {
|
||||
"verify_key": encode_verify_key_base64(result.verify_key),
|
||||
"valid_until_ts": result.valid_until_ts,
|
||||
}
|
||||
|
||||
|
||||
def deserialise_fetch_key_result(key_id: str, data: JsonDict) -> FetchKeyResult:
|
||||
return FetchKeyResult(
|
||||
verify_key=decode_verify_key_bytes(key_id, decode_base64(data["verify_key"])),
|
||||
valid_until_ts=data["valid_until_ts"],
|
||||
)
|
||||
|
||||
|
||||
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
|
||||
if hs.config.worker.send_federation:
|
||||
ReplicationFetchKeysEndpoint(hs).register(http_server)
|
||||
@@ -25,10 +25,12 @@ from signedjson.types import SigningKey, VerifyKey
|
||||
from twisted.internet import defer
|
||||
from twisted.internet.defer import Deferred, ensureDeferred
|
||||
from twisted.test.proto_helpers import MemoryReactor
|
||||
from twisted.web.resource import NoResource, Resource
|
||||
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.api.errors import HttpResponseException, SynapseError
|
||||
from synapse.crypto import keyring
|
||||
from synapse.crypto.keyring import (
|
||||
InternalWorkerRequestKeyFetcher,
|
||||
PerspectivesKeyFetcher,
|
||||
ServerKeyFetcher,
|
||||
StoreKeyFetcher,
|
||||
@@ -39,12 +41,15 @@ from synapse.logging.context import (
|
||||
current_context,
|
||||
make_deferred_yieldable,
|
||||
)
|
||||
from synapse.rest.key.v2 import KeyResource
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.keys import FetchKeyResult
|
||||
from synapse.types import JsonDict
|
||||
from synapse.util import Clock
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
|
||||
from tests import unittest
|
||||
from tests.replication._base import BaseMultiWorkerStreamTestCase
|
||||
from tests.test_utils import make_awaitable
|
||||
from tests.unittest import logcontext_clean, override_config
|
||||
|
||||
@@ -757,6 +762,83 @@ class PerspectivesKeyFetcherTestCase(unittest.HomeserverTestCase):
|
||||
self.assertEqual(keys, {}, "Expected empty dict with missing origin server sig")
|
||||
|
||||
|
||||
class InternalWorkerRequestKeyFetcherTestCase(BaseMultiWorkerStreamTestCase):
|
||||
def create_test_resource(self) -> Resource: # type: ignore[override]
|
||||
return create_resource_tree(
|
||||
{"/_matrix/key/v2": KeyResource(self.hs)}, root_resource=NoResource()
|
||||
)
|
||||
|
||||
def default_config(self) -> Dict[str, Any]:
|
||||
config = super().default_config()
|
||||
config.update(
|
||||
federation_sender_instances=["federation_sender1"],
|
||||
instance_map={
|
||||
"federation_sender1": {"host": "testserv", "port": 1001},
|
||||
},
|
||||
)
|
||||
return config
|
||||
|
||||
def test_key_fetching_works_across_workers(self) -> None:
|
||||
"""Test that a non-fed-sender worker requests keys via a fed-sender."""
|
||||
mock_http_client = Mock()
|
||||
|
||||
# 1. Mock out the response from the notary server.
|
||||
async def mock_post_json(*args: Any, **kwargs: Any) -> JsonDict:
|
||||
"""Mock the request to the notary server."""
|
||||
if kwargs.get("path") != "/_matrix/key/v2/query":
|
||||
raise HttpResponseException(500, "ruh", b"roh")
|
||||
return {"server_keys": []}
|
||||
|
||||
mock_http_client.post_json = mock_post_json
|
||||
|
||||
# 2. Build a valid response to /_matrix/key/v2/server for the server being
|
||||
# queried.
|
||||
SERVER_NAME = "server2"
|
||||
testkey = signedjson.key.generate_signing_key("ver1")
|
||||
testverifykey = signedjson.key.get_verify_key(testkey)
|
||||
testverifykey_id = "ed25519:ver1"
|
||||
VALID_UNTIL_TS = 200 * 1000
|
||||
response = {
|
||||
"server_name": SERVER_NAME,
|
||||
"old_verify_keys": {},
|
||||
"valid_until_ts": VALID_UNTIL_TS,
|
||||
"verify_keys": {
|
||||
testverifykey_id: {
|
||||
"key": signedjson.key.encode_verify_key_base64(testverifykey)
|
||||
}
|
||||
},
|
||||
}
|
||||
signedjson.sign.sign_json(response, SERVER_NAME, testkey)
|
||||
|
||||
async def mock_get_json(*args: Any, **kwargs: Any) -> JsonDict:
|
||||
if kwargs.get("path") != "/_matrix/key/v2/server":
|
||||
raise HttpResponseException(500, "ruh", b"roh")
|
||||
return response
|
||||
|
||||
mock_http_client.get_json = mock_get_json
|
||||
|
||||
# 3. Make a federation homeserver to actually make the request.
|
||||
self.make_worker_hs(
|
||||
"synapse.app.generic_worker",
|
||||
{
|
||||
"worker_name": "federation_sender1",
|
||||
"federation_sender_instances": ["federation_sender1"],
|
||||
},
|
||||
federation_http_client=mock_http_client,
|
||||
)
|
||||
|
||||
# 4. Use the via-fed-sender fetcher to get keys.
|
||||
fetcher = InternalWorkerRequestKeyFetcher(self.hs)
|
||||
keys = self.get_success(
|
||||
fetcher.get_keys(SERVER_NAME, [testverifykey_id], 0), by=0.1
|
||||
)
|
||||
k = keys[testverifykey_id]
|
||||
self.assertEqual(k.valid_until_ts, VALID_UNTIL_TS)
|
||||
self.assertEqual(k.verify_key, testverifykey)
|
||||
self.assertEqual(k.verify_key.alg, "ed25519")
|
||||
self.assertEqual(k.verify_key.version, "ver1")
|
||||
|
||||
|
||||
def get_key_id(key: SigningKey) -> str:
|
||||
"""Get the matrix ID tag for a given SigningKey or VerifyKey"""
|
||||
return "%s:%s" % (key.alg, key.version)
|
||||
|
||||
Reference in New Issue
Block a user