Add new KeyFetcher impl
This commit is contained in:
@@ -14,6 +14,7 @@
|
||||
|
||||
import abc
|
||||
import logging
|
||||
import random
|
||||
from typing import TYPE_CHECKING, Callable, Dict, Iterable, List, Optional, Tuple
|
||||
|
||||
import attr
|
||||
@@ -45,6 +46,10 @@ from synapse.crypto.types import _FetchKeyRequest
|
||||
from synapse.events import EventBase
|
||||
from synapse.events.utils import prune_event_dict
|
||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||
from synapse.replication.http.keys import (
|
||||
ReplicationFetchKeysEndpoint,
|
||||
deserialise_fetch_key_result,
|
||||
)
|
||||
from synapse.storage.keys import FetchKeyResult
|
||||
from synapse.types import JsonDict
|
||||
from synapse.util import unwrapFirstError
|
||||
@@ -892,3 +897,35 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
|
||||
response_json=response,
|
||||
time_added_ms=time_now_ms,
|
||||
)
|
||||
|
||||
|
||||
class InternalWorkerRequestKeyFetcher(KeyFetcher):
|
||||
"""Ask a federation_sender worker to request keys for some homeserver X.
|
||||
|
||||
It may choose to do so via a notary or directly from X itself; we don't care.
|
||||
"""
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__(hs)
|
||||
self._federation_shard_config = hs.config.worker.federation_shard_config
|
||||
self._client = ReplicationFetchKeysEndpoint.make_client(hs)
|
||||
|
||||
async def _fetch_keys(
|
||||
self, keys_to_fetch: List[_FetchKeyRequest]
|
||||
) -> Dict[str, Dict[str, FetchKeyResult]]:
|
||||
# For simplicity's sake, pick a random federation sender
|
||||
instance_name = random.choice(self._federation_shard_config.instances)
|
||||
response = await self._client(
|
||||
keys_to_fetch=keys_to_fetch,
|
||||
instance_name=instance_name,
|
||||
)
|
||||
|
||||
parsed_response: Dict[str, Dict[str, FetchKeyResult]] = {}
|
||||
for server_name, keys in response["server_keys"].items():
|
||||
deserialised_keys = {
|
||||
key_id: deserialise_fetch_key_result(key_id, verify_key)
|
||||
for key_id, verify_key in keys.items()
|
||||
}
|
||||
parsed_response.setdefault(server_name, {}).update(deserialised_keys)
|
||||
|
||||
return parsed_response
|
||||
|
||||
Reference in New Issue
Block a user