Compare commits
3 Commits
quenting/l
...
erikj/smoo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2f0c33a540 | ||
|
|
ccdfa36131 | ||
|
|
a1b6dea0b7 |
@@ -18,6 +18,7 @@ import collections
|
|||||||
import inspect
|
import inspect
|
||||||
import logging
|
import logging
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
|
from collections import deque
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
Awaitable,
|
Awaitable,
|
||||||
@@ -30,6 +31,7 @@ from typing import (
|
|||||||
Set,
|
Set,
|
||||||
TypeVar,
|
TypeVar,
|
||||||
Union,
|
Union,
|
||||||
|
Deque,
|
||||||
)
|
)
|
||||||
|
|
||||||
import attr
|
import attr
|
||||||
@@ -37,7 +39,7 @@ from typing_extensions import ContextManager
|
|||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
from twisted.internet.defer import CancelledError
|
from twisted.internet.defer import CancelledError
|
||||||
from twisted.internet.interfaces import IReactorTime
|
from twisted.internet.interfaces import IReactorTime, IDelayedCall
|
||||||
from twisted.python import failure
|
from twisted.python import failure
|
||||||
|
|
||||||
from synapse.logging.context import (
|
from synapse.logging.context import (
|
||||||
@@ -552,3 +554,84 @@ def maybe_awaitable(value: Union[Awaitable[R], R]) -> Awaitable[R]:
|
|||||||
return value
|
return value
|
||||||
|
|
||||||
return DoneAwaitable(value)
|
return DoneAwaitable(value)
|
||||||
|
|
||||||
|
|
||||||
|
@attr.s(slots=True)
|
||||||
|
class _SmootherEntry:
|
||||||
|
scheduled_at_ms = attr.ib(type=int)
|
||||||
|
scheduled_for_ms = attr.ib(type=int)
|
||||||
|
defer = attr.ib(type=defer.Deferred)
|
||||||
|
|
||||||
|
|
||||||
|
@attr.s(slots=True)
|
||||||
|
class Smoother:
|
||||||
|
_reactor = attr.ib(type=IReactorTime)
|
||||||
|
_target_ms = attr.ib(type=int)
|
||||||
|
|
||||||
|
_queue = attr.ib(type=Deque[_SmootherEntry], factory=deque)
|
||||||
|
_last_run = attr.ib(type=int, default=0)
|
||||||
|
_next_call = attr.ib(type=Optional[IDelayedCall], default=None)
|
||||||
|
|
||||||
|
def _fire_next(self):
|
||||||
|
if not self._queue:
|
||||||
|
return
|
||||||
|
|
||||||
|
self._next_call = None
|
||||||
|
|
||||||
|
entry = self._queue.popleft()
|
||||||
|
entry.defer.callback(None)
|
||||||
|
|
||||||
|
async def smooth(self) -> None:
|
||||||
|
now = self._reactor.seconds() * 1000.0
|
||||||
|
|
||||||
|
if not self._queue:
|
||||||
|
scheduled_for_ms = (now + self._target_ms + self._last_run) / 2
|
||||||
|
if scheduled_for_ms <= now:
|
||||||
|
self._last_run = now
|
||||||
|
return
|
||||||
|
|
||||||
|
entry = _SmootherEntry(
|
||||||
|
scheduled_at_ms=now,
|
||||||
|
scheduled_for_ms=scheduled_for_ms,
|
||||||
|
defer=defer.Deferred(),
|
||||||
|
)
|
||||||
|
self._queue.append(entry)
|
||||||
|
|
||||||
|
else:
|
||||||
|
last_entry = self._queue[-1]
|
||||||
|
|
||||||
|
scheduled_for_ms = (now + self._target_ms + last_entry.scheduled_for_ms) / 2
|
||||||
|
|
||||||
|
entry = _SmootherEntry(
|
||||||
|
scheduled_at_ms=now,
|
||||||
|
scheduled_for_ms=scheduled_for_ms,
|
||||||
|
defer=defer.Deferred(),
|
||||||
|
)
|
||||||
|
self._queue.append(entry)
|
||||||
|
|
||||||
|
step = self._target_ms / (len(self._queue) + 1)
|
||||||
|
for idx, entry in enumerate(self._queue):
|
||||||
|
new_time = now + (idx + 1) * step
|
||||||
|
if new_time < entry.scheduled_for_ms:
|
||||||
|
entry.scheduled_for_ms = new_time
|
||||||
|
|
||||||
|
if self._next_call and not self._next_call.active:
|
||||||
|
self._next_call.reset(
|
||||||
|
max(self._queue[0].scheduled_for_ms - now, 0) / 1000.0
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
self._next_call = self._reactor.callLater(
|
||||||
|
max(self._queue[0].scheduled_for_ms - now, 0) / 1000.0, self._fire_next
|
||||||
|
)
|
||||||
|
|
||||||
|
await make_deferred_yieldable(entry.defer)
|
||||||
|
now = self._reactor.seconds() * 1000.0
|
||||||
|
|
||||||
|
self._last_run = now
|
||||||
|
|
||||||
|
if self._queue:
|
||||||
|
self._next_call = self._reactor.callLater(
|
||||||
|
max(self._queue[0].scheduled_for_ms - now, 0) / 1000.0, self._fire_next,
|
||||||
|
)
|
||||||
|
|
||||||
|
return
|
||||||
|
|||||||
@@ -22,7 +22,7 @@ from synapse.logging.context import (
|
|||||||
PreserveLoggingContext,
|
PreserveLoggingContext,
|
||||||
current_context,
|
current_context,
|
||||||
)
|
)
|
||||||
from synapse.util.async_helpers import timeout_deferred
|
from synapse.util.async_helpers import timeout_deferred, Smoother
|
||||||
|
|
||||||
from tests.unittest import TestCase
|
from tests.unittest import TestCase
|
||||||
|
|
||||||
@@ -105,3 +105,54 @@ class TimeoutDeferredTest(TestCase):
|
|||||||
)
|
)
|
||||||
self.failureResultOf(timing_out_d, defer.TimeoutError)
|
self.failureResultOf(timing_out_d, defer.TimeoutError)
|
||||||
self.assertIs(current_context(), context_one)
|
self.assertIs(current_context(), context_one)
|
||||||
|
|
||||||
|
|
||||||
|
class TestSmoother(TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.clock = Clock()
|
||||||
|
|
||||||
|
self.smoother = Smoother(self.clock, 10 * 1000)
|
||||||
|
|
||||||
|
def test_first(self):
|
||||||
|
self.clock.advance(100)
|
||||||
|
|
||||||
|
d = self.smoother.smooth()
|
||||||
|
self.successResultOf(d)
|
||||||
|
|
||||||
|
def test_multiple_at_same_time(self):
|
||||||
|
self.clock.advance(100)
|
||||||
|
|
||||||
|
d1 = defer.ensureDeferred(self.smoother.smooth())
|
||||||
|
self.successResultOf(d1)
|
||||||
|
|
||||||
|
d2 = defer.ensureDeferred(self.smoother.smooth())
|
||||||
|
self.assertNoResult(d2)
|
||||||
|
self.assertAlmostEqual(
|
||||||
|
self.smoother._queue[0].scheduled_for_ms,
|
||||||
|
self.clock.seconds() * 1000 + self.smoother._target_ms / 2,
|
||||||
|
)
|
||||||
|
|
||||||
|
d3 = defer.ensureDeferred(self.smoother.smooth())
|
||||||
|
self.assertNoResult(d3)
|
||||||
|
self.assertAlmostEqual(
|
||||||
|
self.smoother._queue[0].scheduled_for_ms,
|
||||||
|
self.clock.seconds() * 1000 + self.smoother._target_ms / 3,
|
||||||
|
)
|
||||||
|
self.assertAlmostEqual(
|
||||||
|
self.smoother._queue[1].scheduled_for_ms,
|
||||||
|
self.clock.seconds() * 1000 + 2 * self.smoother._target_ms / 3,
|
||||||
|
)
|
||||||
|
|
||||||
|
self.clock.advance(4)
|
||||||
|
self.successResultOf(d2)
|
||||||
|
self.assertNoResult(d3)
|
||||||
|
|
||||||
|
self.clock.advance(0)
|
||||||
|
self.assertNoResult(d3)
|
||||||
|
|
||||||
|
self.clock.advance(4)
|
||||||
|
self.successResultOf(d3)
|
||||||
|
|
||||||
|
self.clock.advance(100)
|
||||||
|
|
||||||
|
self.assertNot(self.smoother._queue)
|
||||||
|
|||||||
Reference in New Issue
Block a user