From d0e2a456353bbf33b7a888c7d1996569fa09774b Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 30 Apr 2025 23:07:33 -0500 Subject: [PATCH] Try more things --- tests/storage/test_event_stats.py | 42 ++++++++++++++++++------------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/tests/storage/test_event_stats.py b/tests/storage/test_event_stats.py index a289e31963..8ce69723b8 100644 --- a/tests/storage/test_event_stats.py +++ b/tests/storage/test_event_stats.py @@ -14,17 +14,23 @@ import logging import time from unittest.mock import patch -from twisted.internet.defer import ensureDeferred from twisted.test.proto_helpers import MemoryReactor +from twisted.internet import defer +from synapse.logging.context import defer_to_thread from synapse.rest import admin, login, register, room from synapse.server import HomeServer from synapse.storage.database import ( LoggingTransaction, ) -from synapse.logging.context import defer_to_thread, preserve_fn, run_in_background from synapse.types.storage import _BackgroundUpdates from synapse.util import Clock +from synapse.logging.context import ( + PreserveLoggingContext, + make_deferred_yieldable, + run_coroutine_in_background, + run_in_background, +) from tests import unittest from tests.replication._base import BaseMultiWorkerStreamTestCase @@ -183,11 +189,15 @@ class EventStatsTestCase(unittest.HomeserverTestCase): # We need to return something, so we return None. return None - def asdf() -> None: - self.get_success(self.store.db_pool.runInteraction("test", _todo_txn)) + # def asdf() -> None: + # self.get_success(self.store.db_pool.runInteraction("test", _todo_txn)) # Start a transaction that is interacting with the `event_stats` table - start_txn = defer_to_thread(self.reactor, asdf) + # start_txn = defer_to_thread(self.reactor, asdf) + + start_txn = run_in_background( + self.store.db_pool.runInteraction, "test", _todo_txn + ) logger.info("asdf1") _event_response = self.get_success( @@ -381,7 +391,7 @@ class EventStatsConcurrentEventsTestCase(BaseMultiWorkerStreamTestCase): {"worker_name": "worker1"}, ) - self.make_worker_hs( + worker_hs2 = self.make_worker_hs( "synapse.app.generic_worker", {"worker_name": "worker2"}, ) @@ -417,19 +427,17 @@ class EventStatsConcurrentEventsTestCase(BaseMultiWorkerStreamTestCase): # We need to return something, so we return None. return None - def asdf() -> None: - worker1_store = worker_hs1.get_datastores().main - self.get_success(worker1_store.db_pool.runInteraction("test", _todo_txn)) - # Start a transaction that is interacting with the `event_stats` table - start_txn = defer_to_thread(self.reactor, asdf) - - logger.info("asdf1") - _event_response = self.get_success( - defer_to_thread( - self.reactor, self.helper.send, room_id1, "activity", tok=user1_tok - ) + # + # Try from worker2 which may have it's own thread pool. + worker2_store = worker_hs2.get_datastores().main + start_txn = run_in_background( + worker2_store.db_pool.runInteraction, "test", _todo_txn ) + + # Then in room1 (handled by worker1) we send an event. + logger.info("asdf1") + _event_response = self.helper.send(room_id1, "activity", tok=user1_tok) logger.info("asdf2") block = False