diff --git a/tests/storage/test_event_stats.py b/tests/storage/test_event_stats.py index a289e31963..8ce69723b8 100644 --- a/tests/storage/test_event_stats.py +++ b/tests/storage/test_event_stats.py @@ -14,17 +14,23 @@ import logging import time from unittest.mock import patch -from twisted.internet.defer import ensureDeferred from twisted.test.proto_helpers import MemoryReactor +from twisted.internet import defer +from synapse.logging.context import defer_to_thread from synapse.rest import admin, login, register, room from synapse.server import HomeServer from synapse.storage.database import ( LoggingTransaction, ) -from synapse.logging.context import defer_to_thread, preserve_fn, run_in_background from synapse.types.storage import _BackgroundUpdates from synapse.util import Clock +from synapse.logging.context import ( + PreserveLoggingContext, + make_deferred_yieldable, + run_coroutine_in_background, + run_in_background, +) from tests import unittest from tests.replication._base import BaseMultiWorkerStreamTestCase @@ -183,11 +189,15 @@ class EventStatsTestCase(unittest.HomeserverTestCase): # We need to return something, so we return None. return None - def asdf() -> None: - self.get_success(self.store.db_pool.runInteraction("test", _todo_txn)) + # def asdf() -> None: + # self.get_success(self.store.db_pool.runInteraction("test", _todo_txn)) # Start a transaction that is interacting with the `event_stats` table - start_txn = defer_to_thread(self.reactor, asdf) + # start_txn = defer_to_thread(self.reactor, asdf) + + start_txn = run_in_background( + self.store.db_pool.runInteraction, "test", _todo_txn + ) logger.info("asdf1") _event_response = self.get_success( @@ -381,7 +391,7 @@ class EventStatsConcurrentEventsTestCase(BaseMultiWorkerStreamTestCase): {"worker_name": "worker1"}, ) - self.make_worker_hs( + worker_hs2 = self.make_worker_hs( "synapse.app.generic_worker", {"worker_name": "worker2"}, ) @@ -417,19 +427,17 @@ class EventStatsConcurrentEventsTestCase(BaseMultiWorkerStreamTestCase): # We need to return something, so we return None. return None - def asdf() -> None: - worker1_store = worker_hs1.get_datastores().main - self.get_success(worker1_store.db_pool.runInteraction("test", _todo_txn)) - # Start a transaction that is interacting with the `event_stats` table - start_txn = defer_to_thread(self.reactor, asdf) - - logger.info("asdf1") - _event_response = self.get_success( - defer_to_thread( - self.reactor, self.helper.send, room_id1, "activity", tok=user1_tok - ) + # + # Try from worker2 which may have it's own thread pool. + worker2_store = worker_hs2.get_datastores().main + start_txn = run_in_background( + worker2_store.db_pool.runInteraction, "test", _todo_txn ) + + # Then in room1 (handled by worker1) we send an event. + logger.info("asdf1") + _event_response = self.helper.send(room_id1, "activity", tok=user1_tok) logger.info("asdf2") block = False