1
0

Compare commits

...

3 Commits

Author SHA1 Message Date
Erik Johnston
2f0c33a540 dfasd 2021-01-28 20:03:36 +00:00
Erik Johnston
ccdfa36131 Fixup 2021-01-28 19:42:41 +00:00
Erik Johnston
a1b6dea0b7 Add smoother 2021-01-28 19:28:22 +00:00
2 changed files with 136 additions and 2 deletions

View File

@@ -18,6 +18,7 @@ import collections
import inspect import inspect
import logging import logging
from contextlib import contextmanager from contextlib import contextmanager
from collections import deque
from typing import ( from typing import (
Any, Any,
Awaitable, Awaitable,
@@ -30,6 +31,7 @@ from typing import (
Set, Set,
TypeVar, TypeVar,
Union, Union,
Deque,
) )
import attr import attr
@@ -37,7 +39,7 @@ from typing_extensions import ContextManager
from twisted.internet import defer from twisted.internet import defer
from twisted.internet.defer import CancelledError from twisted.internet.defer import CancelledError
from twisted.internet.interfaces import IReactorTime from twisted.internet.interfaces import IReactorTime, IDelayedCall
from twisted.python import failure from twisted.python import failure
from synapse.logging.context import ( from synapse.logging.context import (
@@ -552,3 +554,84 @@ def maybe_awaitable(value: Union[Awaitable[R], R]) -> Awaitable[R]:
return value return value
return DoneAwaitable(value) return DoneAwaitable(value)
@attr.s(slots=True)
class _SmootherEntry:
scheduled_at_ms = attr.ib(type=int)
scheduled_for_ms = attr.ib(type=int)
defer = attr.ib(type=defer.Deferred)
@attr.s(slots=True)
class Smoother:
_reactor = attr.ib(type=IReactorTime)
_target_ms = attr.ib(type=int)
_queue = attr.ib(type=Deque[_SmootherEntry], factory=deque)
_last_run = attr.ib(type=int, default=0)
_next_call = attr.ib(type=Optional[IDelayedCall], default=None)
def _fire_next(self):
if not self._queue:
return
self._next_call = None
entry = self._queue.popleft()
entry.defer.callback(None)
async def smooth(self) -> None:
now = self._reactor.seconds() * 1000.0
if not self._queue:
scheduled_for_ms = (now + self._target_ms + self._last_run) / 2
if scheduled_for_ms <= now:
self._last_run = now
return
entry = _SmootherEntry(
scheduled_at_ms=now,
scheduled_for_ms=scheduled_for_ms,
defer=defer.Deferred(),
)
self._queue.append(entry)
else:
last_entry = self._queue[-1]
scheduled_for_ms = (now + self._target_ms + last_entry.scheduled_for_ms) / 2
entry = _SmootherEntry(
scheduled_at_ms=now,
scheduled_for_ms=scheduled_for_ms,
defer=defer.Deferred(),
)
self._queue.append(entry)
step = self._target_ms / (len(self._queue) + 1)
for idx, entry in enumerate(self._queue):
new_time = now + (idx + 1) * step
if new_time < entry.scheduled_for_ms:
entry.scheduled_for_ms = new_time
if self._next_call and not self._next_call.active:
self._next_call.reset(
max(self._queue[0].scheduled_for_ms - now, 0) / 1000.0
)
else:
self._next_call = self._reactor.callLater(
max(self._queue[0].scheduled_for_ms - now, 0) / 1000.0, self._fire_next
)
await make_deferred_yieldable(entry.defer)
now = self._reactor.seconds() * 1000.0
self._last_run = now
if self._queue:
self._next_call = self._reactor.callLater(
max(self._queue[0].scheduled_for_ms - now, 0) / 1000.0, self._fire_next,
)
return

View File

@@ -22,7 +22,7 @@ from synapse.logging.context import (
PreserveLoggingContext, PreserveLoggingContext,
current_context, current_context,
) )
from synapse.util.async_helpers import timeout_deferred from synapse.util.async_helpers import timeout_deferred, Smoother
from tests.unittest import TestCase from tests.unittest import TestCase
@@ -105,3 +105,54 @@ class TimeoutDeferredTest(TestCase):
) )
self.failureResultOf(timing_out_d, defer.TimeoutError) self.failureResultOf(timing_out_d, defer.TimeoutError)
self.assertIs(current_context(), context_one) self.assertIs(current_context(), context_one)
class TestSmoother(TestCase):
def setUp(self):
self.clock = Clock()
self.smoother = Smoother(self.clock, 10 * 1000)
def test_first(self):
self.clock.advance(100)
d = self.smoother.smooth()
self.successResultOf(d)
def test_multiple_at_same_time(self):
self.clock.advance(100)
d1 = defer.ensureDeferred(self.smoother.smooth())
self.successResultOf(d1)
d2 = defer.ensureDeferred(self.smoother.smooth())
self.assertNoResult(d2)
self.assertAlmostEqual(
self.smoother._queue[0].scheduled_for_ms,
self.clock.seconds() * 1000 + self.smoother._target_ms / 2,
)
d3 = defer.ensureDeferred(self.smoother.smooth())
self.assertNoResult(d3)
self.assertAlmostEqual(
self.smoother._queue[0].scheduled_for_ms,
self.clock.seconds() * 1000 + self.smoother._target_ms / 3,
)
self.assertAlmostEqual(
self.smoother._queue[1].scheduled_for_ms,
self.clock.seconds() * 1000 + 2 * self.smoother._target_ms / 3,
)
self.clock.advance(4)
self.successResultOf(d2)
self.assertNoResult(d3)
self.clock.advance(0)
self.assertNoResult(d3)
self.clock.advance(4)
self.successResultOf(d3)
self.clock.advance(100)
self.assertNot(self.smoother._queue)