diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py index b63c151cfb..2688c1ee8e 100644 --- a/synapse/replication/tcp/redis.py +++ b/synapse/replication/tcp/redis.py @@ -118,13 +118,21 @@ class RedisSubscriber(txredisapi.SubscriberProtocol): # have successfully subscribed to the stream - otherwise we might miss the # POSITION response sent back by the other end. logger.info("Sending redis SUBSCRIBE for %s", self.synapse_stream_name) - await make_deferred_yieldable(self.subscribe(self.synapse_stream_name)) + try: + await make_deferred_yieldable(self.subscribe(self.synapse_stream_name)) + except txredisapi.ConnectionError: + # The connection died, the factory will attempt to reconnect. + return logger.info( "Successfully subscribed to redis stream, sending REPLICATE command" ) + + # If the connection has been severed for some reason, bail. + if not self.connected: + return + self.synapse_handler.new_connection(self) await self._async_send_command(ReplicateCommand()) - logger.info("REPLICATE successfully sent") # We send out our positions when there is a new connection in case the # other side missed updates. We do this for Redis connections as the @@ -360,6 +368,7 @@ def lazyConnection( reconnect: bool = True, password: Optional[str] = None, replyTimeout: int = 30, + handler: Optional[txredisapi.ConnectionHandler] = None, ) -> txredisapi.ConnectionHandler: """Creates a connection to Redis that is lazily set up and reconnects if the connections is lost.