1
0

Compare commits

...

17 Commits

Author SHA1 Message Date
Eric Eastwood
9e99547f65 Fix lints 2025-10-13 17:45:31 -05:00
Eric Eastwood
53fd1b29f8 Remove mentions of deferLater 2025-10-13 16:35:52 -05:00
Eric Eastwood
4f4bf8064e Remove `deferLater from other tests 2025-10-13 16:35:22 -05:00
Eric Eastwood
6a11d64acf Even better adaption incomplete_d 2025-10-13 16:33:15 -05:00
Eric Eastwood
b4d17898be Better conversion 2025-10-13 16:26:50 -05:00
Eric Eastwood
19b181b7bc Fix logcontext problems in tests/util/test_task_scheduler.py
```
builtins.AssertionError: Expected `looping_call` callback from the reactor to start with the sentinel logcontext but saw task-_resumable_task-0-IBzAmHUoepQfLnEA. In other words, another task shouldn't have leaked their logcontext to us.
```
2025-10-13 16:21:52 -05:00
Eric Eastwood
e14e5d3fb6 Fix lints 2025-10-13 15:27:47 -05:00
Eric Eastwood
edf9ef0b13 Comment about slight naivety 2025-10-13 15:25:09 -05:00
Eric Eastwood
5e87f89bf4 Better context for error 2025-10-13 15:23:27 -05:00
Eric Eastwood
064b30b6f0 Wait for homeserver to drive database background updates 2025-10-13 15:14:49 -05:00
Eric Eastwood
c1d89d89b5 Remove assumption
See https://github.com/element-hq/synapse/pull/19057#discussion_r2427083757
2025-10-13 14:39:19 -05:00
Eric Eastwood
74c5e64dd2 Fix lints 2025-10-13 14:37:45 -05:00
Eric Eastwood
2fcd70dbf8 Add future TODO 2025-10-13 14:35:05 -05:00
Eric Eastwood
01343d6519 Remove duplicate database background update running
This already happens as part of `HomeServer.start_background_tasks()`
2025-10-13 14:33:28 -05:00
Eric Eastwood
d1ed34a9ac Fix logcontext problems
See https://github.com/element-hq/synapse/pull/19057#discussion_r2427137480

Running any `HomeserverTestCase` results in:
```
$ SYNAPSE_TEST_LOG_LEVEL=DEBUG poetry run trial tests.rest.client.sliding_sync.test_sliding_sync.SlidingSyncTestCase_new.test_sync_list

...

builtins.AssertionError: Expected `call_later` callback from the reactor to start with the sentinel logcontext but saw run_bg_updates. In other words, another task shouldn't have leaked their logcontext to us.
```
2025-10-13 14:21:50 -05:00
Eric Eastwood
78fd66ce87 Add changelog 2025-10-13 13:51:31 -05:00
Eric Eastwood
4cf90e13fa Move start_doing_background_updates() to HomeServer.start_background_tasks() 2025-10-13 13:46:15 -05:00
8 changed files with 64 additions and 23 deletions

View File

@@ -1 +1 @@
Move `start_doing_background_updates()` to `SynapseHomeServer.start_background_tasks()`.
Move `start_doing_background_updates()` to `HomeServer.start_background_tasks()`.

1
changelog.d/19057.misc Normal file
View File

@@ -0,0 +1 @@
Move `start_doing_background_updates()` to `HomeServer.start_background_tasks()`.

View File

@@ -317,11 +317,6 @@ class SynapseHomeServer(HomeServer):
# during parsing
logger.warning("Unrecognized listener type: %s", listener.type)
def start_background_tasks(self) -> None:
super().start_background_tasks()
self.get_datastores().main.db_pool.updates.start_doing_background_updates()
def load_or_generate_config(argv_options: List[str]) -> HomeServerConfig:
"""

View File

@@ -646,6 +646,7 @@ class HomeServer(metaclass=abc.ABCMeta):
self.get_task_scheduler()
self.get_common_usage_metrics_manager().setup()
start_phone_stats_home(self)
self.get_datastores().main.db_pool.updates.start_doing_background_updates()
def get_reactor(self) -> ISynapseReactor:
"""

View File

@@ -53,8 +53,8 @@ running_tasks_gauge = LaterGauge(
class TaskScheduler:
"""
This is a simple task scheduler designed for resumable tasks. Normally,
you'd use `run_in_background` to start a background task or Twisted's
`deferLater` if you want to run it later.
you'd use `run_in_background` to start a background task or `clock.call_later`
if you want to run it later.
The issue is that these tasks stop completely and won't resume if Synapse is
shut down for any reason.

View File

@@ -27,7 +27,6 @@ from unittest.mock import AsyncMock, Mock
from parameterized import parameterized
from twisted.internet.task import deferLater
from twisted.internet.testing import MemoryReactor
import synapse.rest.admin
@@ -861,7 +860,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
# Mock PaginationHandler.purge_room to sleep for 100s, so we have time to do a second call
# before the purge is over. Note that it doesn't purge anymore, but we don't care.
async def purge_room(room_id: str, force: bool) -> None:
await deferLater(self.hs.get_reactor(), 100, lambda: None)
await self.hs.get_clock().sleep(100)
self.pagination_handler.purge_room = AsyncMock(side_effect=purge_room) # type: ignore[method-assign]

View File

@@ -73,12 +73,12 @@ from synapse.http.server import JsonResource, OptionsResource
from synapse.http.site import SynapseRequest, SynapseSite
from synapse.logging.context import (
SENTINEL_CONTEXT,
LoggingContext,
current_context,
set_current_context,
)
from synapse.rest import RegisterServletsFunc
from synapse.server import HomeServer
from synapse.storage.background_updates import UpdaterStatus
from synapse.storage.keys import FetchKeyResult
from synapse.types import ISynapseReactor, JsonDict, Requester, UserID, create_requester
from synapse.util.clock import Clock
@@ -108,6 +108,11 @@ P = ParamSpec("P")
R = TypeVar("R")
S = TypeVar("S")
BACKGROUND_UPDATE_TIMEOUT_SECONDS = 5
"""
We expect this to be pretty immediate as we're working with an empty database.
"""
class _TypedFailure(Generic[_ExcType], Protocol):
"""Extension to twisted.Failure, where the 'value' has a certain type."""
@@ -490,7 +495,11 @@ class HomeserverTestCase(TestCase):
def wait_for_background_updates(self) -> None:
"""Block until all background database updates have completed."""
store = self.hs.get_datastores().main
self._wait_for_background_updates(self.hs)
def _wait_for_background_updates(self, hs: HomeServer) -> None:
"""Block until all background database updates have completed."""
store = hs.get_datastores().main
while not self.get_success(
store.db_pool.updates.has_completed_background_updates()
):
@@ -677,10 +686,6 @@ class HomeserverTestCase(TestCase):
# construct a homeserver with a matching name.
server_name = config_obj.server.server_name
async def run_bg_updates() -> None:
with LoggingContext(name="run_bg_updates", server_name=server_name):
self.get_success(stor.db_pool.updates.run_background_updates(False))
hs = setup_test_homeserver(
cleanup_func=self.addCleanup,
server_name=server_name,
@@ -689,11 +694,48 @@ class HomeserverTestCase(TestCase):
clock=clock,
**extra_homeserver_attributes,
)
stor = hs.get_datastores().main
# Run the database background updates, when running against "master".
if hs.__class__.__name__ == "TestHomeServer":
self.get_success(run_bg_updates())
# Wait for the database background updates to complete. This is important
# because tests assume that the database is using the latest schema.
#
# We could use `self._wait_for_background_updates(hs)` to accomplish the same
# thing but we don't want to start or drive the background updates here. We want
# to ensure the homeserver itself is doing that.
start_time_s = time.time()
store = hs.get_datastores().main
while not self.get_success(
# This check is slightly naive. It only checks if there is anything left in
# the `background_updates` database table so it is possible that the
# homeserver mistakenly never registered any background updates to be run.
# Since `register_background_xxx(...)` is done across the codebase, we can't
# really assert that everything was registered as expected.
store.db_pool.updates.has_completed_background_updates()
):
# Timeout if it takes too long. This should be pretty immediate as we're
# working with an empty database.
current_time_s = time.time()
if current_time_s - start_time_s > BACKGROUND_UPDATE_TIMEOUT_SECONDS:
background_update_status = store.db_pool.updates.get_status()
# Add some better context when we give up
extra_message = ""
if background_update_status == UpdaterStatus.NOT_STARTED:
extra_message = (
"Did you forget to `start_doing_background_updates()`?"
)
elif background_update_status == UpdaterStatus.RUNNING_UPDATE:
extra_message = "Background updates were still running when we gave up. Are they stuck?"
else:
extra_message = (
f"Background update status was {background_update_status}."
)
raise AssertionError(
f"Timed out waiting for background updates to complete ({BACKGROUND_UPDATE_TIMEOUT_SECONDS}s). "
+ extra_message
)
self.pump(by=0.1)
return hs

View File

@@ -20,9 +20,10 @@
#
from typing import List, Optional, Tuple
from twisted.internet.task import deferLater
from twisted.internet.defer import Deferred
from twisted.internet.testing import MemoryReactor
from synapse.logging.context import make_deferred_yieldable
from synapse.server import HomeServer
from synapse.types import JsonMapping, ScheduledTask, TaskStatus
from synapse.util.clock import Clock
@@ -87,7 +88,7 @@ class TestTaskScheduler(HomeserverTestCase):
self, task: ScheduledTask
) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
# Sleep for a second
await deferLater(self.reactor, 1, lambda: None)
await self.hs.get_clock().sleep(1)
return TaskStatus.COMPLETE, None, None
def test_schedule_lot_of_tasks(self) -> None:
@@ -170,8 +171,10 @@ class TestTaskScheduler(HomeserverTestCase):
return TaskStatus.COMPLETE, {"success": True}, None
else:
await self.task_scheduler.update_task(task.id, result={"in_progress": True})
# Create a deferred which we will never complete
incomplete_d: Deferred = Deferred()
# Await forever to simulate an aborted task because of a restart
await deferLater(self.reactor, 2**16, lambda: None)
await make_deferred_yieldable(incomplete_d)
# This should never been called
return TaskStatus.ACTIVE, None, None