Use redis for all replication tests.
This commit is contained in:
@@ -549,7 +549,6 @@ class ModuleApiWorkerTestCase(BaseMultiWorkerStreamTestCase):
|
||||
|
||||
def default_config(self):
|
||||
conf = super().default_config()
|
||||
conf["redis"] = {"enabled": "true"}
|
||||
conf["stream_writers"] = {"presence": ["presence_writer"]}
|
||||
conf["instance_map"] = {
|
||||
"presence_writer": {"host": "testserv", "port": 1001},
|
||||
|
||||
@@ -49,6 +49,11 @@ class BaseStreamTestCase(unittest.HomeserverTestCase):
|
||||
if not hiredis:
|
||||
skip = "Requires hiredis"
|
||||
|
||||
def default_config(self):
|
||||
config = super().default_config()
|
||||
config["redis"] = {"enabled": True}
|
||||
return config
|
||||
|
||||
def prepare(self, reactor, clock, hs):
|
||||
# build a replication server
|
||||
server_factory = ReplicationStreamProtocolFactory(hs)
|
||||
@@ -57,6 +62,12 @@ class BaseStreamTestCase(unittest.HomeserverTestCase):
|
||||
IPv4Address("TCP", "127.0.0.1", 0)
|
||||
)
|
||||
|
||||
# Fake in memory Redis server that servers can connect to.
|
||||
self._redis_server = FakeRedisPubSubServer()
|
||||
|
||||
# We may have an attempt to connect to redis for the external cache already.
|
||||
self.connect_any_redis_attempts()
|
||||
|
||||
# Make a new HomeServer object for the worker
|
||||
self.reactor.lookups["testserv"] = "1.2.3.4"
|
||||
self.worker_hs = self.setup_test_homeserver(
|
||||
@@ -212,6 +223,30 @@ class BaseStreamTestCase(unittest.HomeserverTestCase):
|
||||
|
||||
self.assertEqual(request.method, b"GET")
|
||||
|
||||
def connect_any_redis_attempts(self):
|
||||
"""If redis is enabled we need to deal with workers connecting to a
|
||||
redis server. We don't want to use a real Redis server so we use a
|
||||
fake one.
|
||||
"""
|
||||
clients = self.reactor.tcpClients
|
||||
while clients:
|
||||
(host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
|
||||
self.assertEqual(host, "localhost")
|
||||
self.assertEqual(port, 6379)
|
||||
|
||||
client_protocol = client_factory.buildProtocol(None)
|
||||
server_protocol = self._redis_server.buildProtocol(None)
|
||||
|
||||
client_to_server_transport = FakeTransport(
|
||||
server_protocol, self.reactor, client_protocol
|
||||
)
|
||||
client_protocol.makeConnection(client_to_server_transport)
|
||||
|
||||
server_to_client_transport = FakeTransport(
|
||||
client_protocol, self.reactor, server_protocol
|
||||
)
|
||||
server_protocol.makeConnection(server_to_client_transport)
|
||||
|
||||
|
||||
class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase):
|
||||
"""Base class for tests running multiple workers.
|
||||
@@ -220,6 +255,11 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase):
|
||||
unlike `BaseStreamTestCase`.
|
||||
"""
|
||||
|
||||
def default_config(self):
|
||||
config = super().default_config()
|
||||
config["redis"] = {"enabled": True}
|
||||
return config
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
|
||||
|
||||
@@ -27,10 +27,8 @@ class ClientReaderTestCase(BaseMultiWorkerStreamTestCase):
|
||||
servlets = [register.register_servlets]
|
||||
|
||||
def _get_worker_hs_config(self) -> dict:
|
||||
config = self.default_config()
|
||||
config = super()._get_worker_hs_config()
|
||||
config["worker_app"] = "synapse.app.client_reader"
|
||||
config["worker_replication_host"] = "testserv"
|
||||
config["worker_replication_http_port"] = "8765"
|
||||
return config
|
||||
|
||||
def test_register_single_worker(self):
|
||||
|
||||
@@ -51,7 +51,6 @@ class EventPersisterShardTestCase(BaseMultiWorkerStreamTestCase):
|
||||
|
||||
def default_config(self):
|
||||
conf = super().default_config()
|
||||
conf["redis"] = {"enabled": "true"}
|
||||
conf["stream_writers"] = {"events": ["worker1", "worker2"]}
|
||||
conf["instance_map"] = {
|
||||
"worker1": {"host": "testserv", "port": 1001},
|
||||
|
||||
Reference in New Issue
Block a user