Fixes #19269. Versions of zope-interface from RHEL, Ubuntu LTS 22 & 24 and openSUSE don't support the new Python union `X | Y` syntax for interfaces. This PR partially reverts the change that switched over to fully using the new syntax, adds a minimum supported version of zope-interface to Synapse's dependency list, and removes the linter auto-upgrades which prefer the newer syntax. ### Pull Request Checklist <!-- Please read https://element-hq.github.io/synapse/latest/development/contributing_guide.html before submitting your pull request --> * [X] Pull request is based on the develop branch * [X] Pull request includes a [changelog file](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#changelog). The entry should: - Be a short description of your change which makes sense to users. "Fixed a bug that prevented receiving messages from other servers." instead of "Moved X method from `EventStore` to `EventWorkerStore`.". - Use markdown where necessary, mostly for `code blocks`. - End with either a period (.) or an exclamation mark (!). - Start with a capital letter. - Feel free to credit yourself, by adding a sentence "Contributed by @github_username." or "Contributed by [Your Name]." to the end of the entry. * [X] [Code style](https://element-hq.github.io/synapse/latest/code_style.html) is correct (run the [linters](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#run-the-linters)) --------- Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>
204 lines
6.9 KiB
Python
204 lines
6.9 KiB
Python
#
|
|
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
|
#
|
|
# Copyright 2023 The Matrix.org Foundation C.I.C.
|
|
# Copyright (C) 2023 New Vector, Ltd
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as
|
|
# published by the Free Software Foundation, either version 3 of the
|
|
# License, or (at your option) any later version.
|
|
#
|
|
# See the GNU Affero General Public License for more details:
|
|
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
|
#
|
|
# Originally licensed under the Apache License, Version 2.0:
|
|
# <http://www.apache.org/licenses/LICENSE-2.0>.
|
|
#
|
|
# [This file includes modifications made by New Vector Limited]
|
|
#
|
|
#
|
|
|
|
import logging
|
|
from typing import Optional
|
|
|
|
from zope.interface import implementer
|
|
|
|
from twisted.internet import defer
|
|
from twisted.internet.endpoints import (
|
|
HostnameEndpoint,
|
|
UNIXClientEndpoint,
|
|
wrapClientTLS,
|
|
)
|
|
from twisted.internet.interfaces import IStreamClientEndpoint
|
|
from twisted.python.failure import Failure
|
|
from twisted.web.client import URI, HTTPConnectionPool, _AgentBase
|
|
from twisted.web.error import SchemeNotSupported
|
|
from twisted.web.http_headers import Headers
|
|
from twisted.web.iweb import (
|
|
IAgent,
|
|
IAgentEndpointFactory,
|
|
IBodyProducer,
|
|
IPolicyForHTTPS,
|
|
IResponse,
|
|
)
|
|
|
|
from synapse.config.workers import (
|
|
InstanceLocationConfig,
|
|
InstanceTcpLocationConfig,
|
|
InstanceUnixLocationConfig,
|
|
)
|
|
from synapse.types import ISynapseReactor
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@implementer(IAgentEndpointFactory)
class ReplicationEndpointFactory:
    """Build client endpoints for workers reachable over a TCP or UNIX socket.

    The worker to connect to is named by the URI's netloc; where it actually
    lives is looked up in the instance map supplied at construction time.
    """

    def __init__(
        self,
        reactor: ISynapseReactor,
        instance_map: dict[str, InstanceLocationConfig],
        context_factory: IPolicyForHTTPS,
    ) -> None:
        """
        Args:
            reactor: The reactor handed to every endpoint this factory creates.
            instance_map: Maps a worker name to its location config.
            context_factory: Supplies the TLS connection creator used when a
                TCP location's scheme is "https".
        """
        self.context_factory = context_factory
        self.instance_map = instance_map
        self.reactor = reactor

    def endpointForURI(self, uri: URI) -> IStreamClientEndpoint:
        """
        Pick the endpoint type matching the location configured for a worker.

        Args:
            uri: The pre-parsed URI object; its netloc holds the worker name.

        Returns:
            A `HostnameEndpoint` for TCP locations (passed through
            `wrapClientTLS` when the location's scheme is "https"), or a
            `UNIXClientEndpoint` for UNIX socket locations.

        Raises:
            KeyError: If the worker name is not a key of the instance map.
            SchemeNotSupported: If the location config is neither a TCP nor a
                UNIX socket location.
        """
        # The URI carries a special scheme plus the worker's name; the real
        # connection details come from the instance map entry for that name.
        location = self.instance_map[uri.netloc.decode("utf-8")]
        scheme = location.scheme()

        if isinstance(location, InstanceTcpLocationConfig):
            tcp_endpoint = HostnameEndpoint(
                self.reactor, location.host, location.port
            )
            if scheme != "https":
                return tcp_endpoint

            # `creatorForNetloc` takes a port, but does not actually use it.
            tls_creator = self.context_factory.creatorForNetloc(
                location.host.encode("utf-8"),
                location.port,
            )
            return wrapClientTLS(tls_creator, tcp_endpoint)

        if isinstance(location, InstanceUnixLocationConfig):
            return UNIXClientEndpoint(self.reactor, location.path)

        raise SchemeNotSupported(f"Unsupported scheme: {scheme}")
|
|
|
|
|
|
@implementer(IAgent)
class ReplicationAgent(_AgentBase):
    """
    HTTP/HTTPS client agent used to talk to replication endpoints.

    A large part of this implementation mirrors Twisted's
    twisted.web.client.Agent.
    """

    def __init__(
        self,
        reactor: ISynapseReactor,
        instance_map: dict[str, InstanceLocationConfig],
        contextFactory: IPolicyForHTTPS,
        connectTimeout: float | None = None,
        bindAddress: bytes | None = None,
        pool: HTTPConnectionPool | None = None,
    ):
        """
        Create a ReplicationAgent.

        Args:
            reactor: A reactor for this Agent to place outgoing connections.
            instance_map: Maps a worker name to its location config; handed to
                the `ReplicationEndpointFactory` this agent builds.
            contextFactory: A factory for TLS contexts, handed to the
                `ReplicationEndpointFactory` this agent builds.
            connectTimeout: The amount of time that this Agent will wait
                for the peer to accept a connection.
                NOTE(review): accepted but not referenced anywhere in this
                constructor.
            bindAddress: The local address for client sockets to bind to.
                NOTE(review): accepted but not referenced anywhere in this
                constructor.
            pool: An HTTPConnectionPool instance, or None, in which
                case a non-persistent HTTPConnectionPool instance will be
                created.
        """
        _AgentBase.__init__(self, reactor, pool)
        self._endpointFactory = ReplicationEndpointFactory(
            reactor, instance_map, contextFactory
        )

    def request(
        self,
        method: bytes,
        uri: bytes,
        headers: Headers | None = None,
        # `Optional[...]` rather than `X | None` is deliberate here:
        # `IBodyProducer` is a zope interface, and older zope-interface
        # releases do not support the `|` union syntax on interfaces.
        bodyProducer: Optional[IBodyProducer] = None,
    ) -> "defer.Deferred[IResponse]":
        """
        Send a request to the worker named by the given uri.

        The connection is either reused from the connection pool or freshly
        established.

        Currently, HTTP, HTTPS and UNIX schemes are supported in uri.

        This follows twisted.web.client.Agent.request, with two differences:

        * The pool key is different (it combines the scheme with either host & port
          or socket path).
        * _ensureValidURI(...) is not called, because the strictness of IDNA2008 is
          not required when a worker's name stands in as the 'hostname' for Synapse
          HTTP Replication machinery. Specifically, this allows a range of ascii
          characters such as '+' and '_' in hostnames/worker's names.

        See: twisted.web.iweb.IAgent.request
        """
        target = URI.fromBytes(uri)
        factory = self._endpointFactory

        try:
            endpoint = factory.endpointForURI(target)
        except SchemeNotSupported:
            # `Failure()` with no arguments wraps the exception being handled,
            # so it has to be constructed inside this `except` block.
            return defer.fail(Failure())

        location = factory.instance_map[target.netloc.decode("utf-8")]
        pool_scheme = location.scheme()
        pool_netloc = location.netloc()

        # Connection pool key.
        #
        # `_AgentBase` wants a `(scheme, host, port)` three-tuple of type
        # `bytes`. A Unix socket connection has no real port, so `0` is used.
        pool_key = (pool_scheme.encode("ascii"), pool_netloc.encode("utf-8"), 0)

        # `_requestWithEndpoint` is inherited from `_AgentBase`.
        return self._requestWithEndpoint(
            pool_key,
            endpoint,
            method,
            target,
            headers,
            bodyProducer,
            target.originForm,
        )
|