From e7d98d3429902919b0dc34462153caafb4114138 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 22 Sep 2025 14:51:13 -0500 Subject: [PATCH] Remove `sentinel` logcontext in `Clock` utilities (`looping_call`, `looping_call_now`, `call_later`) (#18907) Part of https://github.com/element-hq/synapse/issues/18905 Lints for ensuring we use `Clock.call_later` instead of `reactor.callLater`, etc are coming in https://github.com/element-hq/synapse/pull/18944 ### Testing strategy 1. Configure Synapse to log at the `DEBUG` level 1. Start Synapse: `poetry run synapse_homeserver --config-path homeserver.yaml` 1. Wait 10 seconds for the [database profiling loop](https://github.com/element-hq/synapse/blob/9cc400177822805e2a08d4d934daad6f3bc2a4df/synapse/storage/database.py#L711) to execute 1. Notice the logcontext being used for the `Total database time` log line Before (`sentinel`): ``` 2025-09-10 16:36:58,651 - synapse.storage.TIME - 707 - DEBUG - sentinel - Total database time: 0.646% {room_forgetter_stream_pos(2): 0.131%, reap_monthly_active_users(1): 0.083%, get_device_change_last_converted_pos(1): 0.078%} ``` After (`looping_call`): ``` 2025-09-10 16:36:58,651 - synapse.storage.TIME - 707 - DEBUG - looping_call - Total database time: 0.646% {room_forgetter_stream_pos(2): 0.131%, reap_monthly_active_users(1): 0.083%, get_device_change_last_converted_pos(1): 0.078%} ``` --- changelog.d/18907.misc | 1 + synapse/util/clock.py | 86 +++++++++++-- tests/push/test_email.py | 3 +- tests/util/test_logcontext.py | 226 +++++++++++++++++++++++++++++++--- 4 files changed, 286 insertions(+), 30 deletions(-) create mode 100644 changelog.d/18907.misc diff --git a/changelog.d/18907.misc b/changelog.d/18907.misc new file mode 100644 index 0000000000..4fca9ec8fb --- /dev/null +++ b/changelog.d/18907.misc @@ -0,0 +1 @@ +Remove `sentinel` logcontext usage in `Clock` utilities like `looping_call` and `call_later`. diff --git a/synapse/util/clock.py b/synapse/util/clock.py index 043b06a108..d28dbac357 100644 --- a/synapse/util/clock.py +++ b/synapse/util/clock.py @@ -23,6 +23,7 @@ import attr from typing_extensions import ParamSpec from twisted.internet import defer, task +from twisted.internet.defer import Deferred from twisted.internet.interfaces import IDelayedCall from twisted.internet.task import LoopingCall @@ -46,6 +47,8 @@ class Clock: async def sleep(self, seconds: float) -> None: d: defer.Deferred[float] = defer.Deferred() + # Start task in the `sentinel` logcontext, to avoid leaking the current context + # into the reactor once it finishes. with context.PreserveLoggingContext(): self._reactor.callLater(seconds, d.callback, seconds) await d @@ -74,8 +77,9 @@ class Clock: this functionality thanks to this function being a thin wrapper around `twisted.internet.task.LoopingCall`. - Note that the function will be called with no logcontext, so if it is anything - other than trivial, you probably want to wrap it in run_as_background_process. + Note that the function will be called with generic `looping_call` logcontext, so + if it is anything other than a trivial task, you probably want to wrap it in + `run_as_background_process` to give it more specific label and track metrics. Args: f: The function to call repeatedly. @@ -97,8 +101,9 @@ class Clock: As with `looping_call`: subsequent calls are not scheduled until after the the Awaitable returned by a previous call has finished. - Also as with `looping_call`: the function is called with no logcontext and - you probably want to wrap it in `run_as_background_process`. + Note that the function will be called with generic `looping_call` logcontext, so + if it is anything other than a trivial task, you probably want to wrap it in + `run_as_background_process` to give it more specific label and track metrics. Args: f: The function to call repeatedly. @@ -117,9 +122,43 @@ class Clock: **kwargs: P.kwargs, ) -> LoopingCall: """Common functionality for `looping_call` and `looping_call_now`""" - call = task.LoopingCall(f, *args, **kwargs) + + def wrapped_f(*args: P.args, **kwargs: P.kwargs) -> Deferred: + assert context.current_context() is context.SENTINEL_CONTEXT, ( + "Expected `looping_call` callback from the reactor to start with the sentinel logcontext " + f"but saw {context.current_context()}. In other words, another task shouldn't have " + "leaked their logcontext to us." + ) + + # Because this is a callback from the reactor, we will be using the + # `sentinel` log context at this point. We want the function to log with + # some logcontext as we want to know which server the logs came from. + # + # We use `PreserveLoggingContext` to prevent our new `looping_call` + # logcontext from finishing as soon as we exit this function, in case `f` + # returns an awaitable/deferred which would continue running and may try to + # restore the `loop_call` context when it's done (because it's trying to + # adhere to the Synapse logcontext rules.) + # + # This also ensures that we return to the `sentinel` context when we exit + # this function and yield control back to the reactor to avoid leaking the + # current logcontext to the reactor (which would then get picked up and + # associated with the next thing the reactor does) + with context.PreserveLoggingContext(context.LoggingContext("looping_call")): + # We use `run_in_background` to reset the logcontext after `f` (or the + # awaitable returned by `f`) completes to avoid leaking the current + # logcontext to the reactor + return context.run_in_background(f, *args, **kwargs) + + call = task.LoopingCall(wrapped_f, *args, **kwargs) call.clock = self._reactor - d = call.start(msec / 1000.0, now=now) + # If `now=true`, the function will be called here immediately so we need to be + # in the sentinel context now. + # + # We want to start the task in the `sentinel` logcontext, to avoid leaking the + # current context into the reactor after the function finishes. + with context.PreserveLoggingContext(): + d = call.start(msec / 1000.0, now=now) d.addErrback(log_failure, "Looping call died", consumeErrors=False) return call @@ -128,8 +167,9 @@ class Clock: ) -> IDelayedCall: """Call something later - Note that the function will be called with no logcontext, so if it is anything - other than trivial, you probably want to wrap it in run_as_background_process. + Note that the function will be called with generic `call_later` logcontext, so + if it is anything other than a trivial task, you probably want to wrap it in + `run_as_background_process` to give it more specific label and track metrics. Args: delay: How long to wait in seconds. @@ -139,11 +179,33 @@ class Clock: """ def wrapped_callback(*args: Any, **kwargs: Any) -> None: - with context.PreserveLoggingContext(): - callback(*args, **kwargs) + assert context.current_context() is context.SENTINEL_CONTEXT, ( + "Expected `call_later` callback from the reactor to start with the sentinel logcontext " + f"but saw {context.current_context()}. In other words, another task shouldn't have " + "leaked their logcontext to us." + ) - with context.PreserveLoggingContext(): - return self._reactor.callLater(delay, wrapped_callback, *args, **kwargs) + # Because this is a callback from the reactor, we will be using the + # `sentinel` log context at this point. We want the function to log with + # some logcontext as we want to know which server the logs came from. + # + # We use `PreserveLoggingContext` to prevent our new `call_later` + # logcontext from finishing as soon as we exit this function, in case `f` + # returns an awaitable/deferred which would continue running and may try to + # restore the `loop_call` context when it's done (because it's trying to + # adhere to the Synapse logcontext rules.) + # + # This also ensures that we return to the `sentinel` context when we exit + # this function and yield control back to the reactor to avoid leaking the + # current logcontext to the reactor (which would then get picked up and + # associated with the next thing the reactor does) + with context.PreserveLoggingContext(context.LoggingContext("call_later")): + # We use `run_in_background` to reset the logcontext after `f` (or the + # awaitable returned by `f`) completes to avoid leaking the current + # logcontext to the reactor + context.run_in_background(callback, *args, **kwargs) + + return self._reactor.callLater(delay, wrapped_callback, *args, **kwargs) def cancel_call_later(self, timer: IDelayedCall, ignore_errs: bool = False) -> None: try: diff --git a/tests/push/test_email.py b/tests/push/test_email.py index 80a22044dd..26819e2d3c 100644 --- a/tests/push/test_email.py +++ b/tests/push/test_email.py @@ -31,6 +31,7 @@ from twisted.internet.testing import MemoryReactor import synapse.rest.admin from synapse.api.errors import Codes, SynapseError +from synapse.logging.context import make_deferred_yieldable from synapse.push.emailpusher import EmailPusher from synapse.rest.client import login, room from synapse.rest.synapse.client.unsubscribe import UnsubscribeResource @@ -89,7 +90,7 @@ class EmailPusherTests(HomeserverTestCase): # This mocks out synapse.reactor.send_email._sendmail. d: Deferred = Deferred() self.email_attempts.append((d, args, kwargs)) - return d + return make_deferred_yieldable(d) hs.get_send_email_handler()._sendmail = sendmail # type: ignore[assignment] diff --git a/tests/util/test_logcontext.py b/tests/util/test_logcontext.py index 43912d05da..0ecf712bab 100644 --- a/tests/util/test_logcontext.py +++ b/tests/util/test_logcontext.py @@ -19,6 +19,7 @@ # # +import logging from typing import Callable, Generator, cast import twisted.python.failure @@ -28,6 +29,7 @@ from synapse.logging.context import ( SENTINEL_CONTEXT, LoggingContext, PreserveLoggingContext, + _Sentinel, current_context, make_deferred_yieldable, nested_logging_context, @@ -36,7 +38,10 @@ from synapse.logging.context import ( from synapse.types import ISynapseReactor from synapse.util.clock import Clock -from .. import unittest +from tests import unittest +from tests.unittest import logcontext_clean + +logger = logging.getLogger(__name__) reactor = cast(ISynapseReactor, _reactor) @@ -44,33 +49,212 @@ reactor = cast(ISynapseReactor, _reactor) class LoggingContextTestCase(unittest.TestCase): def _check_test_key(self, value: str) -> None: context = current_context() - assert isinstance(context, LoggingContext) - self.assertEqual(context.name, value) + assert isinstance(context, LoggingContext) or isinstance(context, _Sentinel), ( + f"Expected LoggingContext({value}) but saw {context}" + ) + self.assertEqual( + str(context), value, f"Expected LoggingContext({value}) but saw {context}" + ) + @logcontext_clean def test_with_context(self) -> None: with LoggingContext("test"): self._check_test_key("test") + @logcontext_clean async def test_sleep(self) -> None: + """ + Test `Clock.sleep` + """ clock = Clock(reactor) + # Sanity check that we start in the sentinel context + self._check_test_key("sentinel") + + callback_finished = False + async def competing_callback() -> None: - with LoggingContext("competing"): - await clock.sleep(0) - self._check_test_key("competing") + nonlocal callback_finished + try: + # A callback from the reactor should start with the sentinel context. In + # other words, another task shouldn't have leaked their context to us. + self._check_test_key("sentinel") + + with LoggingContext("competing"): + await clock.sleep(0) + self._check_test_key("competing") + + self._check_test_key("sentinel") + finally: + # When exceptions happen, we still want to mark the callback as finished + # so that the test can complete and we see the underlying error. + callback_finished = True reactor.callLater(0, lambda: defer.ensureDeferred(competing_callback())) - with LoggingContext("one"): + with LoggingContext("foo"): await clock.sleep(0) - self._check_test_key("one") + self._check_test_key("foo") + await clock.sleep(0) + self._check_test_key("foo") + + self.assertTrue( + callback_finished, + "Callback never finished which means the test probably didn't wait long enough", + ) + + # Back to the sentinel context + self._check_test_key("sentinel") + + @logcontext_clean + async def test_looping_call(self) -> None: + """ + Test `Clock.looping_call` + """ + clock = Clock(reactor) + + # Sanity check that we start in the sentinel context + self._check_test_key("sentinel") + + callback_finished = False + + async def competing_callback() -> None: + nonlocal callback_finished + try: + # A `looping_call` callback should have *some* logcontext since we should know + # which server spawned this loop and which server the logs came from. + self._check_test_key("looping_call") + + with LoggingContext("competing"): + await clock.sleep(0) + self._check_test_key("competing") + + self._check_test_key("looping_call") + finally: + # When exceptions happen, we still want to mark the callback as finished + # so that the test can complete and we see the underlying error. + callback_finished = True + + with LoggingContext("foo"): + lc = clock.looping_call( + lambda: defer.ensureDeferred(competing_callback()), 0 + ) + self._check_test_key("foo") + await clock.sleep(0) + self._check_test_key("foo") + await clock.sleep(0) + self._check_test_key("foo") + + self.assertTrue( + callback_finished, + "Callback never finished which means the test probably didn't wait long enough", + ) + + # Back to the sentinel context + self._check_test_key("sentinel") + + # Stop the looping call to prevent "Reactor was unclean" errors + lc.stop() + + @logcontext_clean + async def test_looping_call_now(self) -> None: + """ + Test `Clock.looping_call_now` + """ + clock = Clock(reactor) + + # Sanity check that we start in the sentinel context + self._check_test_key("sentinel") + + callback_finished = False + + async def competing_callback() -> None: + nonlocal callback_finished + try: + # A `looping_call` callback should have *some* logcontext since we should know + # which server spawned this loop and which server the logs came from. + self._check_test_key("looping_call") + + with LoggingContext("competing"): + await clock.sleep(0) + self._check_test_key("competing") + + self._check_test_key("looping_call") + finally: + # When exceptions happen, we still want to mark the callback as finished + # so that the test can complete and we see the underlying error. + callback_finished = True + + with LoggingContext("foo"): + lc = clock.looping_call_now( + lambda: defer.ensureDeferred(competing_callback()), 0 + ) + self._check_test_key("foo") + await clock.sleep(0) + self._check_test_key("foo") + + self.assertTrue( + callback_finished, + "Callback never finished which means the test probably didn't wait long enough", + ) + + # Back to the sentinel context + self._check_test_key("sentinel") + + # Stop the looping call to prevent "Reactor was unclean" errors + lc.stop() + + @logcontext_clean + async def test_call_later(self) -> None: + """ + Test `Clock.call_later` + """ + clock = Clock(reactor) + + # Sanity check that we start in the sentinel context + self._check_test_key("sentinel") + + callback_finished = False + + async def competing_callback() -> None: + nonlocal callback_finished + try: + # A `call_later` callback should have *some* logcontext since we should know + # which server spawned this loop and which server the logs came from. + self._check_test_key("call_later") + + with LoggingContext("competing"): + await clock.sleep(0) + self._check_test_key("competing") + + self._check_test_key("call_later") + finally: + # When exceptions happen, we still want to mark the callback as finished + # so that the test can complete and we see the underlying error. + callback_finished = True + + with LoggingContext("foo"): + clock.call_later(0, lambda: defer.ensureDeferred(competing_callback())) + self._check_test_key("foo") + await clock.sleep(0) + self._check_test_key("foo") + await clock.sleep(0) + self._check_test_key("foo") + + self.assertTrue( + callback_finished, + "Callback never finished which means the test probably didn't wait long enough", + ) + + # Back to the sentinel context + self._check_test_key("sentinel") def _test_run_in_background(self, function: Callable[[], object]) -> defer.Deferred: sentinel_context = current_context() callback_completed = False - with LoggingContext("one"): + with LoggingContext("foo"): # fire off function, but don't wait on it. d2 = run_in_background(function) @@ -81,7 +265,7 @@ class LoggingContextTestCase(unittest.TestCase): d2.addCallback(cb) - self._check_test_key("one") + self._check_test_key("foo") # now wait for the function under test to have run, and check that # the logcontext is left in a sane state. @@ -105,12 +289,14 @@ class LoggingContextTestCase(unittest.TestCase): # test is done once d2 finishes return d2 + @logcontext_clean def test_run_in_background_with_blocking_fn(self) -> defer.Deferred: async def blocking_function() -> None: await Clock(reactor).sleep(0) return self._test_run_in_background(blocking_function) + @logcontext_clean def test_run_in_background_with_non_blocking_fn(self) -> defer.Deferred: @defer.inlineCallbacks def nonblocking_function() -> Generator["defer.Deferred[object]", object, None]: @@ -119,6 +305,7 @@ class LoggingContextTestCase(unittest.TestCase): return self._test_run_in_background(nonblocking_function) + @logcontext_clean def test_run_in_background_with_chained_deferred(self) -> defer.Deferred: # a function which returns a deferred which looks like it has been # called, but is actually paused @@ -127,22 +314,25 @@ class LoggingContextTestCase(unittest.TestCase): return self._test_run_in_background(testfunc) + @logcontext_clean def test_run_in_background_with_coroutine(self) -> defer.Deferred: async def testfunc() -> None: - self._check_test_key("one") + self._check_test_key("foo") d = defer.ensureDeferred(Clock(reactor).sleep(0)) self.assertIs(current_context(), SENTINEL_CONTEXT) await d - self._check_test_key("one") + self._check_test_key("foo") return self._test_run_in_background(testfunc) + @logcontext_clean def test_run_in_background_with_nonblocking_coroutine(self) -> defer.Deferred: async def testfunc() -> None: - self._check_test_key("one") + self._check_test_key("foo") return self._test_run_in_background(testfunc) + @logcontext_clean @defer.inlineCallbacks def test_make_deferred_yieldable( self, @@ -156,7 +346,7 @@ class LoggingContextTestCase(unittest.TestCase): sentinel_context = current_context() - with LoggingContext("one"): + with LoggingContext("foo"): d1 = make_deferred_yieldable(blocking_function()) # make sure that the context was reset by make_deferred_yieldable self.assertIs(current_context(), sentinel_context) @@ -164,15 +354,16 @@ class LoggingContextTestCase(unittest.TestCase): yield d1 # now it should be restored - self._check_test_key("one") + self._check_test_key("foo") + @logcontext_clean @defer.inlineCallbacks def test_make_deferred_yieldable_with_chained_deferreds( self, ) -> Generator["defer.Deferred[object]", object, None]: sentinel_context = current_context() - with LoggingContext("one"): + with LoggingContext("foo"): d1 = make_deferred_yieldable(_chained_deferred_function()) # make sure that the context was reset by make_deferred_yieldable self.assertIs(current_context(), sentinel_context) @@ -180,8 +371,9 @@ class LoggingContextTestCase(unittest.TestCase): yield d1 # now it should be restored - self._check_test_key("one") + self._check_test_key("foo") + @logcontext_clean def test_nested_logging_context(self) -> None: with LoggingContext("foo"): nested_context = nested_logging_context(suffix="bar")