diff --git a/changelog.d/19416.bugfix b/changelog.d/19416.bugfix new file mode 100644 index 0000000000..f0c2872410 --- /dev/null +++ b/changelog.d/19416.bugfix @@ -0,0 +1 @@ +Fix memory leak caused by not cleaning up stopped looping calls. Introduced in v1.140.0. diff --git a/synapse/util/clock.py b/synapse/util/clock.py index 4355704f8a..a3872d6f93 100644 --- a/synapse/util/clock.py +++ b/synapse/util/clock.py @@ -15,10 +15,12 @@ import logging +from functools import wraps from typing import ( Any, Callable, ) +from weakref import WeakSet from typing_extensions import ParamSpec from zope.interface import implementer @@ -86,7 +88,7 @@ class Clock: self._delayed_call_id: int = 0 """Unique ID used to track delayed calls""" - self._looping_calls: list[LoopingCall] = [] + self._looping_calls: WeakSet[LoopingCall] = WeakSet() """List of active looping calls""" self._call_id_to_delayed_call: dict[int, IDelayedCall] = {} @@ -193,6 +195,7 @@ class Clock: if now: looping_call_context_string = "looping_call_now" + @wraps(f) def wrapped_f(*args: P.args, **kwargs: P.kwargs) -> Deferred: clock_debug_logger.debug( "%s(%s): Executing callback", looping_call_context_string, instance_id @@ -240,7 +243,7 @@ class Clock: with context.PreserveLoggingContext(): d = call.start(duration.as_secs(), now=now) d.addErrback(log_failure, "Looping call died", consumeErrors=False) - self._looping_calls.append(call) + self._looping_calls.add(call) clock_debug_logger.debug( "%s(%s): Scheduled looping call every %sms later", @@ -302,6 +305,7 @@ class Clock: if self._is_shutdown: raise Exception("Cannot start delayed call. Clock has been shutdown") + @wraps(callback) def wrapped_callback(*args: Any, **kwargs: Any) -> None: clock_debug_logger.debug("call_later(%s): Executing callback", call_id) diff --git a/tests/util/test_clock.py b/tests/util/test_clock.py new file mode 100644 index 0000000000..6c5a1158f5 --- /dev/null +++ b/tests/util/test_clock.py @@ -0,0 +1,77 @@ +# +# This file is licensed under the Affero General Public License (AGPL) version 3. +# +# Copyright (C) 2025 Element Creations Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# See the GNU Affero General Public License for more details: +# . +# +# + +import weakref + +from synapse.util.duration import Duration + +from tests.unittest import HomeserverTestCase + + +class ClockTestCase(HomeserverTestCase): + def test_looping_calls_are_gced(self) -> None: + """Test that looping calls are garbage collected after being stopped. + + The `Clock` tracks looping calls so to allow stopping of all looping + calls via the clock. + """ + clock = self.hs.get_clock() + + # Create a new looping call, and take a weakref to it. + call = clock.looping_call(lambda: None, Duration(seconds=1)) + + weak_call = weakref.ref(call) + + # Stop the looping call. It should get garbage collected after this. + call.stop() + + # Delete our strong reference to the call (otherwise it won't get garbage collected). + del call + + # Check that the call has been garbage collected. + self.assertIsNone(weak_call()) + + def test_looping_calls_stopped_on_clock_shutdown(self) -> None: + """Test that looping calls are stopped when the clock is shut down.""" + clock = self.hs.get_clock() + + was_called = False + + def on_call() -> None: + nonlocal was_called + was_called = True + + # Create a new looping call. + call = clock.looping_call(on_call, Duration(seconds=1)) + weak_call = weakref.ref(call) + del call # Remove our strong reference to the call. + + # The call should still exist. + self.assertIsNotNone(weak_call()) + + # Advance the clock to trigger the call. + self.reactor.advance(2) + self.assertTrue(was_called) + + # Shut down the clock, which should stop the looping call. + clock.shutdown() + + # The call should have been garbage collected. + self.assertIsNone(weak_call()) + + # Advance the clock again; the call should not be called again. + was_called = False + self.reactor.advance(2) + self.assertFalse(was_called)