Compare commits
17 Commits
release-v1
...
madlittlem
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9e99547f65 | ||
|
|
53fd1b29f8 | ||
|
|
4f4bf8064e | ||
|
|
6a11d64acf | ||
|
|
b4d17898be | ||
|
|
19b181b7bc | ||
|
|
e14e5d3fb6 | ||
|
|
edf9ef0b13 | ||
|
|
5e87f89bf4 | ||
|
|
064b30b6f0 | ||
|
|
c1d89d89b5 | ||
|
|
74c5e64dd2 | ||
|
|
2fcd70dbf8 | ||
|
|
01343d6519 | ||
|
|
d1ed34a9ac | ||
|
|
78fd66ce87 | ||
|
|
4cf90e13fa |
@@ -1 +1 @@
|
||||
Move `start_doing_background_updates()` to `SynapseHomeServer.start_background_tasks()`.
|
||||
Move `start_doing_background_updates()` to `HomeServer.start_background_tasks()`.
|
||||
|
||||
1
changelog.d/19057.misc
Normal file
1
changelog.d/19057.misc
Normal file
@@ -0,0 +1 @@
|
||||
Move `start_doing_background_updates()` to `HomeServer.start_background_tasks()`.
|
||||
@@ -317,11 +317,6 @@ class SynapseHomeServer(HomeServer):
|
||||
# during parsing
|
||||
logger.warning("Unrecognized listener type: %s", listener.type)
|
||||
|
||||
def start_background_tasks(self) -> None:
|
||||
super().start_background_tasks()
|
||||
|
||||
self.get_datastores().main.db_pool.updates.start_doing_background_updates()
|
||||
|
||||
|
||||
def load_or_generate_config(argv_options: List[str]) -> HomeServerConfig:
|
||||
"""
|
||||
|
||||
@@ -646,6 +646,7 @@ class HomeServer(metaclass=abc.ABCMeta):
|
||||
self.get_task_scheduler()
|
||||
self.get_common_usage_metrics_manager().setup()
|
||||
start_phone_stats_home(self)
|
||||
self.get_datastores().main.db_pool.updates.start_doing_background_updates()
|
||||
|
||||
def get_reactor(self) -> ISynapseReactor:
|
||||
"""
|
||||
|
||||
@@ -53,8 +53,8 @@ running_tasks_gauge = LaterGauge(
|
||||
class TaskScheduler:
|
||||
"""
|
||||
This is a simple task scheduler designed for resumable tasks. Normally,
|
||||
you'd use `run_in_background` to start a background task or Twisted's
|
||||
`deferLater` if you want to run it later.
|
||||
you'd use `run_in_background` to start a background task or `clock.call_later`
|
||||
if you want to run it later.
|
||||
|
||||
The issue is that these tasks stop completely and won't resume if Synapse is
|
||||
shut down for any reason.
|
||||
|
||||
@@ -27,7 +27,6 @@ from unittest.mock import AsyncMock, Mock
|
||||
|
||||
from parameterized import parameterized
|
||||
|
||||
from twisted.internet.task import deferLater
|
||||
from twisted.internet.testing import MemoryReactor
|
||||
|
||||
import synapse.rest.admin
|
||||
@@ -861,7 +860,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
|
||||
# Mock PaginationHandler.purge_room to sleep for 100s, so we have time to do a second call
|
||||
# before the purge is over. Note that it doesn't purge anymore, but we don't care.
|
||||
async def purge_room(room_id: str, force: bool) -> None:
|
||||
await deferLater(self.hs.get_reactor(), 100, lambda: None)
|
||||
await self.hs.get_clock().sleep(100)
|
||||
|
||||
self.pagination_handler.purge_room = AsyncMock(side_effect=purge_room) # type: ignore[method-assign]
|
||||
|
||||
|
||||
@@ -73,12 +73,12 @@ from synapse.http.server import JsonResource, OptionsResource
|
||||
from synapse.http.site import SynapseRequest, SynapseSite
|
||||
from synapse.logging.context import (
|
||||
SENTINEL_CONTEXT,
|
||||
LoggingContext,
|
||||
current_context,
|
||||
set_current_context,
|
||||
)
|
||||
from synapse.rest import RegisterServletsFunc
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.background_updates import UpdaterStatus
|
||||
from synapse.storage.keys import FetchKeyResult
|
||||
from synapse.types import ISynapseReactor, JsonDict, Requester, UserID, create_requester
|
||||
from synapse.util.clock import Clock
|
||||
@@ -108,6 +108,11 @@ P = ParamSpec("P")
|
||||
R = TypeVar("R")
|
||||
S = TypeVar("S")
|
||||
|
||||
BACKGROUND_UPDATE_TIMEOUT_SECONDS = 5
|
||||
"""
|
||||
We expect this to be pretty immediate as we're working with an empty database.
|
||||
"""
|
||||
|
||||
|
||||
class _TypedFailure(Generic[_ExcType], Protocol):
|
||||
"""Extension to twisted.Failure, where the 'value' has a certain type."""
|
||||
@@ -490,7 +495,11 @@ class HomeserverTestCase(TestCase):
|
||||
|
||||
def wait_for_background_updates(self) -> None:
|
||||
"""Block until all background database updates have completed."""
|
||||
store = self.hs.get_datastores().main
|
||||
self._wait_for_background_updates(self.hs)
|
||||
|
||||
def _wait_for_background_updates(self, hs: HomeServer) -> None:
|
||||
"""Block until all background database updates have completed."""
|
||||
store = hs.get_datastores().main
|
||||
while not self.get_success(
|
||||
store.db_pool.updates.has_completed_background_updates()
|
||||
):
|
||||
@@ -677,10 +686,6 @@ class HomeserverTestCase(TestCase):
|
||||
# construct a homeserver with a matching name.
|
||||
server_name = config_obj.server.server_name
|
||||
|
||||
async def run_bg_updates() -> None:
|
||||
with LoggingContext(name="run_bg_updates", server_name=server_name):
|
||||
self.get_success(stor.db_pool.updates.run_background_updates(False))
|
||||
|
||||
hs = setup_test_homeserver(
|
||||
cleanup_func=self.addCleanup,
|
||||
server_name=server_name,
|
||||
@@ -689,11 +694,48 @@ class HomeserverTestCase(TestCase):
|
||||
clock=clock,
|
||||
**extra_homeserver_attributes,
|
||||
)
|
||||
stor = hs.get_datastores().main
|
||||
|
||||
# Run the database background updates, when running against "master".
|
||||
if hs.__class__.__name__ == "TestHomeServer":
|
||||
self.get_success(run_bg_updates())
|
||||
# Wait for the database background updates to complete. This is important
|
||||
# because tests assume that the database is using the latest schema.
|
||||
#
|
||||
# We could use `self._wait_for_background_updates(hs)` to accomplish the same
|
||||
# thing but we don't want to start or drive the background updates here. We want
|
||||
# to ensure the homeserver itself is doing that.
|
||||
start_time_s = time.time()
|
||||
store = hs.get_datastores().main
|
||||
while not self.get_success(
|
||||
# This check is slightly naive. It only checks if there is anything left in
|
||||
# the `background_updates` database table so it is possible that the
|
||||
# homeserver mistakenly never registered any background updates to be run.
|
||||
# Since `register_background_xxx(...)` is done across the codebase, we can't
|
||||
# really assert that everything was registered as expected.
|
||||
store.db_pool.updates.has_completed_background_updates()
|
||||
):
|
||||
# Timeout if it takes too long. This should be pretty immediate as we're
|
||||
# working with an empty database.
|
||||
current_time_s = time.time()
|
||||
if current_time_s - start_time_s > BACKGROUND_UPDATE_TIMEOUT_SECONDS:
|
||||
background_update_status = store.db_pool.updates.get_status()
|
||||
|
||||
# Add some better context when we give up
|
||||
extra_message = ""
|
||||
if background_update_status == UpdaterStatus.NOT_STARTED:
|
||||
extra_message = (
|
||||
"Did you forget to `start_doing_background_updates()`?"
|
||||
)
|
||||
elif background_update_status == UpdaterStatus.RUNNING_UPDATE:
|
||||
extra_message = "Background updates were still running when we gave up. Are they stuck?"
|
||||
else:
|
||||
extra_message = (
|
||||
f"Background update status was {background_update_status}."
|
||||
)
|
||||
|
||||
raise AssertionError(
|
||||
f"Timed out waiting for background updates to complete ({BACKGROUND_UPDATE_TIMEOUT_SECONDS}s). "
|
||||
+ extra_message
|
||||
)
|
||||
|
||||
self.pump(by=0.1)
|
||||
|
||||
return hs
|
||||
|
||||
|
||||
@@ -20,9 +20,10 @@
|
||||
#
|
||||
from typing import List, Optional, Tuple
|
||||
|
||||
from twisted.internet.task import deferLater
|
||||
from twisted.internet.defer import Deferred
|
||||
from twisted.internet.testing import MemoryReactor
|
||||
|
||||
from synapse.logging.context import make_deferred_yieldable
|
||||
from synapse.server import HomeServer
|
||||
from synapse.types import JsonMapping, ScheduledTask, TaskStatus
|
||||
from synapse.util.clock import Clock
|
||||
@@ -87,7 +88,7 @@ class TestTaskScheduler(HomeserverTestCase):
|
||||
self, task: ScheduledTask
|
||||
) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
|
||||
# Sleep for a second
|
||||
await deferLater(self.reactor, 1, lambda: None)
|
||||
await self.hs.get_clock().sleep(1)
|
||||
return TaskStatus.COMPLETE, None, None
|
||||
|
||||
def test_schedule_lot_of_tasks(self) -> None:
|
||||
@@ -170,8 +171,10 @@ class TestTaskScheduler(HomeserverTestCase):
|
||||
return TaskStatus.COMPLETE, {"success": True}, None
|
||||
else:
|
||||
await self.task_scheduler.update_task(task.id, result={"in_progress": True})
|
||||
# Create a deferred which we will never complete
|
||||
incomplete_d: Deferred = Deferred()
|
||||
# Await forever to simulate an aborted task because of a restart
|
||||
await deferLater(self.reactor, 2**16, lambda: None)
|
||||
await make_deferred_yieldable(incomplete_d)
|
||||
# This should never been called
|
||||
return TaskStatus.ACTIVE, None, None
|
||||
|
||||
|
||||
Reference in New Issue
Block a user