More robust-ness against dying connections.
This commit is contained in:
@@ -118,13 +118,21 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
|
||||
# have successfully subscribed to the stream - otherwise we might miss the
|
||||
# POSITION response sent back by the other end.
|
||||
logger.info("Sending redis SUBSCRIBE for %s", self.synapse_stream_name)
|
||||
await make_deferred_yieldable(self.subscribe(self.synapse_stream_name))
|
||||
try:
|
||||
await make_deferred_yieldable(self.subscribe(self.synapse_stream_name))
|
||||
except txredisapi.ConnectionError:
|
||||
# The connection died, the factory will attempt to reconnect.
|
||||
return
|
||||
logger.info(
|
||||
"Successfully subscribed to redis stream, sending REPLICATE command"
|
||||
)
|
||||
|
||||
# If the connection has been severed for some reason, bail.
|
||||
if not self.connected:
|
||||
return
|
||||
|
||||
self.synapse_handler.new_connection(self)
|
||||
await self._async_send_command(ReplicateCommand())
|
||||
logger.info("REPLICATE successfully sent")
|
||||
|
||||
# We send out our positions when there is a new connection in case the
|
||||
# other side missed updates. We do this for Redis connections as the
|
||||
@@ -360,6 +368,7 @@ def lazyConnection(
|
||||
reconnect: bool = True,
|
||||
password: Optional[str] = None,
|
||||
replyTimeout: int = 30,
|
||||
handler: Optional[txredisapi.ConnectionHandler] = None,
|
||||
) -> txredisapi.ConnectionHandler:
|
||||
"""Creates a connection to Redis that is lazily set up and reconnects if the
|
||||
connections is lost.
|
||||
|
||||
Reference in New Issue
Block a user