Remove the TCP protocol implementation.
This commit is contained in:
@@ -41,7 +41,7 @@ The general interaction of the classes are:
|
||||
v |
|
||||
+-------------+
|
||||
| Protocols |
|
||||
| (TCP/redis) |
|
||||
| (Redis) |
|
||||
+-------------+
|
||||
|
||||
Where the ReplicationDataHandler (or subclasses) handles incoming stream
|
||||
|
||||
@@ -11,82 +11,10 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""This module contains the implementation of both the client and server
|
||||
protocols.
|
||||
|
||||
The basic structure of the protocol is line based, where the initial word of
|
||||
each line specifies the command. The rest of the line is parsed based on the
|
||||
command. For example, the `RDATA` command is defined as::
|
||||
|
||||
RDATA <stream_name> <token> <row_json>
|
||||
|
||||
(Note that `<row_json>` may contains spaces, but cannot contain newlines.)
|
||||
|
||||
Blank lines are ignored.
|
||||
|
||||
# Example
|
||||
|
||||
An example iteraction is shown below. Each line is prefixed with '>' or '<' to
|
||||
indicate which side is sending, these are *not* included on the wire::
|
||||
|
||||
* connection established *
|
||||
> SERVER localhost:8823
|
||||
> PING 1490197665618
|
||||
< NAME synapse.app.appservice
|
||||
< PING 1490197665618
|
||||
< REPLICATE
|
||||
> POSITION events 1
|
||||
> POSITION backfill 1
|
||||
> POSITION caches 1
|
||||
> RDATA caches 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513]
|
||||
> RDATA events 14 ["ev", ["$149019767112vOHxz:localhost:8823",
|
||||
"!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]]
|
||||
< PING 1490197675618
|
||||
> ERROR server stopping
|
||||
* connection closed by server *
|
||||
"""
|
||||
import fcntl
|
||||
import logging
|
||||
import struct
|
||||
from inspect import isawaitable
|
||||
from typing import TYPE_CHECKING, Any, Collection, List, Optional
|
||||
|
||||
from prometheus_client import Counter
|
||||
from zope.interface import Interface, implementer
|
||||
from zope.interface import Interface
|
||||
|
||||
from twisted.internet import task
|
||||
from twisted.internet.tcp import Connection
|
||||
from twisted.protocols.basic import LineOnlyReceiver
|
||||
from twisted.python.failure import Failure
|
||||
|
||||
from synapse.logging.context import PreserveLoggingContext
|
||||
from synapse.metrics import LaterGauge
|
||||
from synapse.metrics.background_process_metrics import (
|
||||
BackgroundProcessLoggingContext,
|
||||
run_as_background_process,
|
||||
)
|
||||
from synapse.replication.tcp.commands import (
|
||||
VALID_CLIENT_COMMANDS,
|
||||
VALID_SERVER_COMMANDS,
|
||||
Command,
|
||||
ErrorCommand,
|
||||
NameCommand,
|
||||
PingCommand,
|
||||
ReplicateCommand,
|
||||
ServerCommand,
|
||||
parse_command_from_line,
|
||||
)
|
||||
from synapse.util import Clock
|
||||
from synapse.util.stringutils import random_string
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.replication.tcp.handler import ReplicationCommandHandler
|
||||
from synapse.server import HomeServer
|
||||
|
||||
|
||||
connection_close_counter = Counter(
|
||||
"synapse_replication_tcp_protocol_close_reason", "", ["reason_type"]
|
||||
)
|
||||
from synapse.replication.tcp.commands import Command
|
||||
|
||||
tcp_inbound_commands_counter = Counter(
|
||||
"synapse_replication_tcp_protocol_inbound_commands",
|
||||
@@ -100,484 +28,9 @@ tcp_outbound_commands_counter = Counter(
|
||||
["command", "name"],
|
||||
)
|
||||
|
||||
# A list of all connected protocols. This allows us to send metrics about the
|
||||
# connections.
|
||||
connected_connections: "List[BaseReplicationStreamProtocol]" = []
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
PING_TIME = 5000
|
||||
PING_TIMEOUT_MULTIPLIER = 5
|
||||
PING_TIMEOUT_MS = PING_TIME * PING_TIMEOUT_MULTIPLIER
|
||||
|
||||
|
||||
class ConnectionStates:
|
||||
CONNECTING = "connecting"
|
||||
ESTABLISHED = "established"
|
||||
PAUSED = "paused"
|
||||
CLOSED = "closed"
|
||||
|
||||
|
||||
class IReplicationConnection(Interface):
|
||||
"""An interface for replication connections."""
|
||||
|
||||
def send_command(cmd: Command) -> None:
|
||||
"""Send the command down the connection"""
|
||||
|
||||
|
||||
@implementer(IReplicationConnection)
|
||||
class BaseReplicationStreamProtocol(LineOnlyReceiver):
|
||||
"""Base replication protocol shared between client and server.
|
||||
|
||||
Reads lines (ignoring blank ones) and parses them into command classes,
|
||||
asserting that they are valid for the given direction, i.e. server commands
|
||||
are only sent by the server.
|
||||
|
||||
On receiving a new command it calls `on_<COMMAND_NAME>` with the parsed
|
||||
command before delegating to `ReplicationCommandHandler.on_<COMMAND_NAME>`.
|
||||
`ReplicationCommandHandler.on_<COMMAND_NAME>` can optionally return a coroutine;
|
||||
if so, that will get run as a background process.
|
||||
|
||||
It also sends `PING` periodically, and correctly times out remote connections
|
||||
(if they send a `PING` command)
|
||||
"""
|
||||
|
||||
# The transport is going to be an ITCPTransport, but that doesn't have the
|
||||
# (un)registerProducer methods, those are only on the implementation.
|
||||
transport: Connection
|
||||
|
||||
delimiter = b"\n"
|
||||
|
||||
# Valid commands we expect to receive
|
||||
VALID_INBOUND_COMMANDS: Collection[str] = []
|
||||
|
||||
# Valid commands we can send
|
||||
VALID_OUTBOUND_COMMANDS: Collection[str] = []
|
||||
|
||||
max_line_buffer = 10000
|
||||
|
||||
def __init__(self, clock: Clock, handler: "ReplicationCommandHandler"):
|
||||
self.clock = clock
|
||||
self.command_handler = handler
|
||||
|
||||
self.last_received_command = self.clock.time_msec()
|
||||
self.last_sent_command = 0
|
||||
# When we requested the connection be closed
|
||||
self.time_we_closed: Optional[int] = None
|
||||
|
||||
self.received_ping = False # Have we received a ping from the other side
|
||||
|
||||
self.state = ConnectionStates.CONNECTING
|
||||
|
||||
self.name = "anon" # The name sent by a client.
|
||||
self.conn_id = random_string(5) # To dedupe in case of name clashes.
|
||||
|
||||
# List of pending commands to send once we've established the connection
|
||||
self.pending_commands: List[Command] = []
|
||||
|
||||
# The LoopingCall for sending pings.
|
||||
self._send_ping_loop: Optional[task.LoopingCall] = None
|
||||
|
||||
# a logcontext which we use for processing incoming commands. We declare it as a
|
||||
# background process so that the CPU stats get reported to prometheus.
|
||||
with PreserveLoggingContext():
|
||||
# thanks to `PreserveLoggingContext()`, the new logcontext is guaranteed to
|
||||
# capture the sentinel context as its containing context and won't prevent
|
||||
# GC of / unintentionally reactivate what would be the current context.
|
||||
self._logging_context = BackgroundProcessLoggingContext(
|
||||
"replication-conn", self.conn_id
|
||||
)
|
||||
|
||||
def connectionMade(self) -> None:
|
||||
logger.info("[%s] Connection established", self.id())
|
||||
|
||||
self.state = ConnectionStates.ESTABLISHED
|
||||
|
||||
connected_connections.append(self) # Register connection for metrics
|
||||
|
||||
assert self.transport is not None
|
||||
self.transport.registerProducer(self, True) # For the *Producing callbacks
|
||||
|
||||
self._send_pending_commands()
|
||||
|
||||
# Starts sending pings
|
||||
self._send_ping_loop = self.clock.looping_call(self.send_ping, 5000)
|
||||
|
||||
# Always send the initial PING so that the other side knows that they
|
||||
# can time us out.
|
||||
self.send_command(PingCommand(str(self.clock.time_msec())))
|
||||
|
||||
self.command_handler.new_connection(self)
|
||||
|
||||
def send_ping(self) -> None:
|
||||
"""Periodically sends a ping and checks if we should close the connection
|
||||
due to the other side timing out.
|
||||
"""
|
||||
now = self.clock.time_msec()
|
||||
|
||||
if self.time_we_closed:
|
||||
if now - self.time_we_closed > PING_TIMEOUT_MS:
|
||||
logger.info(
|
||||
"[%s] Failed to close connection gracefully, aborting", self.id()
|
||||
)
|
||||
assert self.transport is not None
|
||||
self.transport.abortConnection()
|
||||
else:
|
||||
if now - self.last_sent_command >= PING_TIME:
|
||||
self.send_command(PingCommand(str(now)))
|
||||
|
||||
if (
|
||||
self.received_ping
|
||||
and now - self.last_received_command > PING_TIMEOUT_MS
|
||||
):
|
||||
logger.info(
|
||||
"[%s] Connection hasn't received command in %r ms. Closing.",
|
||||
self.id(),
|
||||
now - self.last_received_command,
|
||||
)
|
||||
self.send_error("ping timeout")
|
||||
|
||||
def lineReceived(self, line: bytes) -> None:
|
||||
"""Called when we've received a line"""
|
||||
with PreserveLoggingContext(self._logging_context):
|
||||
self._parse_and_dispatch_line(line)
|
||||
|
||||
def _parse_and_dispatch_line(self, line: bytes) -> None:
|
||||
if line.strip() == b"":
|
||||
# Ignore blank lines
|
||||
return
|
||||
|
||||
linestr = line.decode("utf-8")
|
||||
|
||||
try:
|
||||
cmd = parse_command_from_line(linestr)
|
||||
except Exception as e:
|
||||
logger.exception("[%s] failed to parse line: %r", self.id(), linestr)
|
||||
self.send_error("failed to parse line: %r (%r):" % (e, linestr))
|
||||
return
|
||||
|
||||
if cmd.NAME not in self.VALID_INBOUND_COMMANDS:
|
||||
logger.error("[%s] invalid command %s", self.id(), cmd.NAME)
|
||||
self.send_error("invalid command: %s", cmd.NAME)
|
||||
return
|
||||
|
||||
self.last_received_command = self.clock.time_msec()
|
||||
|
||||
tcp_inbound_commands_counter.labels(cmd.NAME, self.name).inc()
|
||||
|
||||
self.handle_command(cmd)
|
||||
|
||||
def handle_command(self, cmd: Command) -> None:
|
||||
"""Handle a command we have received over the replication stream.
|
||||
|
||||
First calls `self.on_<COMMAND>` if it exists, then calls
|
||||
`self.command_handler.on_<COMMAND>` if it exists (which can optionally
|
||||
return an Awaitable).
|
||||
|
||||
This allows for protocol level handling of commands (e.g. PINGs), before
|
||||
delegating to the handler.
|
||||
|
||||
Args:
|
||||
cmd: received command
|
||||
"""
|
||||
handled = False
|
||||
|
||||
# First call any command handlers on this instance. These are for TCP
|
||||
# specific handling.
|
||||
cmd_func = getattr(self, "on_%s" % (cmd.NAME,), None)
|
||||
if cmd_func:
|
||||
cmd_func(cmd)
|
||||
handled = True
|
||||
|
||||
# Then call out to the handler.
|
||||
cmd_func = getattr(self.command_handler, "on_%s" % (cmd.NAME,), None)
|
||||
if cmd_func:
|
||||
res = cmd_func(self, cmd)
|
||||
|
||||
# the handler might be a coroutine: fire it off as a background process
|
||||
# if so.
|
||||
|
||||
if isawaitable(res):
|
||||
run_as_background_process(
|
||||
"replication-" + cmd.get_logcontext_id(), lambda: res
|
||||
)
|
||||
|
||||
handled = True
|
||||
|
||||
if not handled:
|
||||
logger.warning("Unhandled command: %r", cmd)
|
||||
|
||||
def close(self) -> None:
|
||||
logger.warning("[%s] Closing connection", self.id())
|
||||
self.time_we_closed = self.clock.time_msec()
|
||||
assert self.transport is not None
|
||||
self.transport.loseConnection()
|
||||
self.on_connection_closed()
|
||||
|
||||
def send_error(self, error_string: str, *args: Any) -> None:
|
||||
"""Send an error to remote and close the connection."""
|
||||
self.send_command(ErrorCommand(error_string % args))
|
||||
self.close()
|
||||
|
||||
def send_command(self, cmd: Command, do_buffer: bool = True) -> None:
|
||||
"""Send a command if connection has been established.
|
||||
|
||||
Args:
|
||||
cmd
|
||||
do_buffer: Whether to buffer the message or always attempt
|
||||
to send the command. This is mostly used to send an error
|
||||
message if we're about to close the connection due our buffers
|
||||
becoming full.
|
||||
"""
|
||||
if self.state == ConnectionStates.CLOSED:
|
||||
logger.debug("[%s] Not sending, connection closed", self.id())
|
||||
return
|
||||
|
||||
if do_buffer and self.state != ConnectionStates.ESTABLISHED:
|
||||
self._queue_command(cmd)
|
||||
return
|
||||
|
||||
tcp_outbound_commands_counter.labels(cmd.NAME, self.name).inc()
|
||||
|
||||
string = "%s %s" % (cmd.NAME, cmd.to_line())
|
||||
if "\n" in string:
|
||||
raise Exception("Unexpected newline in command: %r", string)
|
||||
|
||||
encoded_string = string.encode("utf-8")
|
||||
|
||||
if len(encoded_string) > self.MAX_LENGTH:
|
||||
raise Exception(
|
||||
"Failed to send command %s as too long (%d > %d)"
|
||||
% (cmd.NAME, len(encoded_string), self.MAX_LENGTH)
|
||||
)
|
||||
|
||||
self.sendLine(encoded_string)
|
||||
|
||||
self.last_sent_command = self.clock.time_msec()
|
||||
|
||||
def _queue_command(self, cmd: Command) -> None:
|
||||
"""Queue the command until the connection is ready to write to again."""
|
||||
logger.debug("[%s] Queueing as conn %r, cmd: %r", self.id(), self.state, cmd)
|
||||
self.pending_commands.append(cmd)
|
||||
|
||||
if len(self.pending_commands) > self.max_line_buffer:
|
||||
# The other side is failing to keep up and out buffers are becoming
|
||||
# full, so lets close the connection.
|
||||
# XXX: should we squawk more loudly?
|
||||
logger.error("[%s] Remote failed to keep up", self.id())
|
||||
self.send_command(ErrorCommand("Failed to keep up"), do_buffer=False)
|
||||
self.close()
|
||||
|
||||
def _send_pending_commands(self) -> None:
|
||||
"""Send any queued commandes"""
|
||||
pending = self.pending_commands
|
||||
self.pending_commands = []
|
||||
for cmd in pending:
|
||||
self.send_command(cmd)
|
||||
|
||||
def on_PING(self, cmd: PingCommand) -> None:
|
||||
self.received_ping = True
|
||||
|
||||
def on_ERROR(self, cmd: ErrorCommand) -> None:
|
||||
logger.error("[%s] Remote reported error: %r", self.id(), cmd.data)
|
||||
|
||||
def pauseProducing(self) -> None:
|
||||
"""This is called when both the kernel send buffer and the twisted
|
||||
tcp connection send buffers have become full.
|
||||
|
||||
We don't actually have any control over those sizes, so we buffer some
|
||||
commands ourselves before knifing the connection due to the remote
|
||||
failing to keep up.
|
||||
"""
|
||||
logger.info("[%s] Pause producing", self.id())
|
||||
self.state = ConnectionStates.PAUSED
|
||||
|
||||
def resumeProducing(self) -> None:
|
||||
"""The remote has caught up after we started buffering!"""
|
||||
logger.info("[%s] Resume producing", self.id())
|
||||
self.state = ConnectionStates.ESTABLISHED
|
||||
self._send_pending_commands()
|
||||
|
||||
def stopProducing(self) -> None:
|
||||
"""We're never going to send any more data (normally because either
|
||||
we or the remote has closed the connection)
|
||||
"""
|
||||
logger.info("[%s] Stop producing", self.id())
|
||||
self.on_connection_closed()
|
||||
|
||||
def connectionLost(self, reason: Failure) -> None: # type: ignore[override]
|
||||
logger.info("[%s] Replication connection closed: %r", self.id(), reason)
|
||||
if isinstance(reason, Failure):
|
||||
assert reason.type is not None
|
||||
connection_close_counter.labels(reason.type.__name__).inc()
|
||||
else:
|
||||
connection_close_counter.labels(reason.__class__.__name__).inc() # type: ignore[unreachable]
|
||||
|
||||
try:
|
||||
# Remove us from list of connections to be monitored
|
||||
connected_connections.remove(self)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
# Stop the looping call sending pings.
|
||||
if self._send_ping_loop and self._send_ping_loop.running:
|
||||
self._send_ping_loop.stop()
|
||||
|
||||
self.on_connection_closed()
|
||||
|
||||
def on_connection_closed(self) -> None:
|
||||
logger.info("[%s] Connection was closed", self.id())
|
||||
|
||||
self.state = ConnectionStates.CLOSED
|
||||
self.pending_commands = []
|
||||
|
||||
self.command_handler.lost_connection(self)
|
||||
|
||||
if self.transport:
|
||||
self.transport.unregisterProducer()
|
||||
|
||||
# mark the logging context as finished by triggering `__exit__()`
|
||||
with PreserveLoggingContext():
|
||||
with self._logging_context:
|
||||
pass
|
||||
# the sentinel context is now active, which may not be correct.
|
||||
# PreserveLoggingContext() will restore the correct logging context.
|
||||
|
||||
def __str__(self) -> str:
|
||||
addr = None
|
||||
if self.transport:
|
||||
addr = str(self.transport.getPeer())
|
||||
return "ReplicationConnection<name=%s,conn_id=%s,addr=%s>" % (
|
||||
self.name,
|
||||
self.conn_id,
|
||||
addr,
|
||||
)
|
||||
|
||||
def id(self) -> str:
|
||||
return "%s-%s" % (self.name, self.conn_id)
|
||||
|
||||
def lineLengthExceeded(self, line: str) -> None:
|
||||
"""Called when we receive a line that is above the maximum line length"""
|
||||
self.send_error("Line length exceeded")
|
||||
|
||||
|
||||
class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
|
||||
VALID_INBOUND_COMMANDS = VALID_CLIENT_COMMANDS
|
||||
VALID_OUTBOUND_COMMANDS = VALID_SERVER_COMMANDS
|
||||
|
||||
def __init__(
|
||||
self, server_name: str, clock: Clock, handler: "ReplicationCommandHandler"
|
||||
):
|
||||
super().__init__(clock, handler)
|
||||
|
||||
self.server_name = server_name
|
||||
|
||||
def connectionMade(self) -> None:
|
||||
self.send_command(ServerCommand(self.server_name))
|
||||
super().connectionMade()
|
||||
|
||||
def on_NAME(self, cmd: NameCommand) -> None:
|
||||
logger.info("[%s] Renamed to %r", self.id(), cmd.data)
|
||||
self.name = cmd.data
|
||||
|
||||
|
||||
class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
|
||||
VALID_INBOUND_COMMANDS = VALID_SERVER_COMMANDS
|
||||
VALID_OUTBOUND_COMMANDS = VALID_CLIENT_COMMANDS
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
hs: "HomeServer",
|
||||
client_name: str,
|
||||
server_name: str,
|
||||
clock: Clock,
|
||||
command_handler: "ReplicationCommandHandler",
|
||||
):
|
||||
super().__init__(clock, command_handler)
|
||||
|
||||
self.client_name = client_name
|
||||
self.server_name = server_name
|
||||
|
||||
def connectionMade(self) -> None:
|
||||
self.send_command(NameCommand(self.client_name))
|
||||
super().connectionMade()
|
||||
|
||||
# Once we've connected subscribe to the necessary streams
|
||||
self.replicate()
|
||||
|
||||
def on_SERVER(self, cmd: ServerCommand) -> None:
|
||||
if cmd.data != self.server_name:
|
||||
logger.error("[%s] Connected to wrong remote: %r", self.id(), cmd.data)
|
||||
self.send_error("Wrong remote")
|
||||
|
||||
def replicate(self) -> None:
|
||||
"""Send the subscription request to the server"""
|
||||
logger.info("[%s] Subscribing to replication streams", self.id())
|
||||
|
||||
self.send_command(ReplicateCommand())
|
||||
|
||||
|
||||
# The following simply registers metrics for the replication connections
|
||||
|
||||
pending_commands = LaterGauge(
|
||||
"synapse_replication_tcp_protocol_pending_commands",
|
||||
"",
|
||||
["name"],
|
||||
lambda: {(p.name,): len(p.pending_commands) for p in connected_connections},
|
||||
)
|
||||
|
||||
|
||||
def transport_buffer_size(protocol: BaseReplicationStreamProtocol) -> int:
|
||||
if protocol.transport:
|
||||
size = len(protocol.transport.dataBuffer) + protocol.transport._tempDataLen
|
||||
return size
|
||||
return 0
|
||||
|
||||
|
||||
transport_send_buffer = LaterGauge(
|
||||
"synapse_replication_tcp_protocol_transport_send_buffer",
|
||||
"",
|
||||
["name"],
|
||||
lambda: {(p.name,): transport_buffer_size(p) for p in connected_connections},
|
||||
)
|
||||
|
||||
|
||||
def transport_kernel_read_buffer_size(
|
||||
protocol: BaseReplicationStreamProtocol, read: bool = True
|
||||
) -> int:
|
||||
SIOCINQ = 0x541B
|
||||
SIOCOUTQ = 0x5411
|
||||
|
||||
if protocol.transport:
|
||||
fileno = protocol.transport.getHandle().fileno()
|
||||
if read:
|
||||
op = SIOCINQ
|
||||
else:
|
||||
op = SIOCOUTQ
|
||||
size = struct.unpack("I", fcntl.ioctl(fileno, op, b"\0\0\0\0"))[0]
|
||||
return size
|
||||
return 0
|
||||
|
||||
|
||||
tcp_transport_kernel_send_buffer = LaterGauge(
|
||||
"synapse_replication_tcp_protocol_transport_kernel_send_buffer",
|
||||
"",
|
||||
["name"],
|
||||
lambda: {
|
||||
(p.name,): transport_kernel_read_buffer_size(p, False)
|
||||
for p in connected_connections
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
tcp_transport_kernel_read_buffer = LaterGauge(
|
||||
"synapse_replication_tcp_protocol_transport_kernel_read_buffer",
|
||||
"",
|
||||
["name"],
|
||||
lambda: {
|
||||
(p.name,): transport_kernel_read_buffer_size(p, True)
|
||||
for p in connected_connections
|
||||
},
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user