Compare commits: anoa/modul...erikj/bett (4 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 6002debbde | |
| | 4247fa48b5 | |
| | b946b028bd | |
| | f401976fd8 | |
Changes to the event-creation handler (`EventCreationHandler`):

```diff
@@ -76,7 +76,13 @@ from synapse.types import (
     create_requester,
 )
 from synapse.types.state import StateFilter
-from synapse.util import json_decoder, json_encoder, log_failure, unwrapFirstError
+from synapse.util import (
+    Duration,
+    json_decoder,
+    json_encoder,
+    log_failure,
+    unwrapFirstError,
+)
 from synapse.util.async_helpers import Linearizer, gather_results
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.metrics import measure_func
```
```diff
@@ -506,7 +512,13 @@ class EventCreationHandler:
 
         # We limit concurrent event creation for a room to 1. This prevents state resolution
         # from occurring when sending bursts of events to a local room
-        self.limiter = Linearizer(max_count=1, name="room_event_creation_limit")
+        self.limiter = Linearizer(
+            max_count=1,
+            name="room_event_creation_limit",
+            # We timeout queued requests after 90 seconds, as the client will
+            # likely have timed out by then.
+            timeout=90 * Duration.SECOND_MS,
+        )
 
         self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator()
 
```
Changes to `Duration` in `synapse.util`:

```diff
@@ -59,7 +59,8 @@ class Duration:
     """Helper class that holds constants for common time durations in
     milliseconds."""
 
-    MINUTE_MS = 60 * 1000
+    SECOND_MS = 1000
+    MINUTE_MS = 60 * SECOND_MS
     HOUR_MS = 60 * MINUTE_MS
     DAY_MS = 24 * HOUR_MS
 
```
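The refactor leaves the existing values identical (`MINUTE_MS` is still 60 000) while letting callers write durations in terms of seconds. A standalone sketch, copied from the hunk above rather than imported from Synapse, with the 90-second timeout from the `EventCreationHandler` hunk worked through:

```python
# Mirror of the Duration class after this patch (not imported from Synapse).
class Duration:
    """Helper class that holds constants for common time durations in
    milliseconds."""

    SECOND_MS = 1000
    MINUTE_MS = 60 * SECOND_MS
    HOUR_MS = 60 * MINUTE_MS
    DAY_MS = 24 * HOUR_MS


# The EventCreationHandler change above uses 90 * Duration.SECOND_MS:
assert 90 * Duration.SECOND_MS == 90_000  # 90 seconds, expressed in ms
assert Duration.MINUTE_MS == 60_000       # unchanged by the refactor
```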
Changes to `Linearizer` in `synapse.util.async_helpers`:

```diff
@@ -59,6 +59,7 @@ from twisted.internet.defer import CancelledError
 from twisted.internet.interfaces import IReactorTime
 from twisted.python.failure import Failure
 
+from synapse.api.errors import SynapseError
 from synapse.logging.context import (
     PreserveLoggingContext,
     make_deferred_yieldable,
```
```diff
@@ -535,10 +536,19 @@ class Linearizer:
         name: Optional[str] = None,
         max_count: int = 1,
         clock: Optional[Clock] = None,
+        timeout: Optional[int] = None,
+        max_queue_size: Optional[int] = None,
+        limit_error_code: int = 503,
     ):
         """
         Args:
             max_count: The maximum number of concurrent accesses
+            timeout: The maximum time to wait for a lock to be acquired, in
+                milliseconds. Optional.
+            max_queue_size: The maximum number of items for a given key that
+                can be queued up waiting for a lock. Optional.
+            limit_error_code: The HTTP error code to return when one of the
+                above limits is reached.
         """
         if name is None:
             self.name: Union[str, int] = id(self)
```
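A sketch of how a caller might use the new keyword arguments. This assumes a tree with this patch applied, and that `Linearizer.queue` keeps its existing async-context-manager interface; the name and limit values are hypothetical:

```python
from synapse.api.errors import SynapseError
from synapse.util import Duration
from synapse.util.async_helpers import Linearizer

linearizer = Linearizer(
    name="example_limit",             # hypothetical name for illustration
    max_count=1,
    timeout=30 * Duration.SECOND_MS,  # fail queued waiters after 30 seconds
    max_queue_size=100,               # reject new waiters beyond 100 per key
    limit_error_code=503,             # the default, shown for clarity
)


async def guarded(key: str) -> None:
    try:
        async with linearizer.queue(key):
            ...  # critical section: at most one runner per key
    except SynapseError as e:
        # Raised when either the timeout or the queue-size limit is hit.
        assert e.code == 503
```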
```diff
@@ -551,6 +561,9 @@
             clock = Clock(cast(IReactorTime, reactor))
         self._clock = clock
         self.max_count = max_count
+        self._timeout = timeout
+        self._max_queue_size = max_queue_size
+        self._limit_error_code = limit_error_code
 
         # key_to_defer is a map from the key to a _LinearizerEntry.
         self.key_to_defer: Dict[Hashable, _LinearizerEntry] = {}
```
```diff
@@ -594,6 +607,17 @@
             entry.count += 1
             return entry
 
+        # Check if the number of deferreds waiting for this key has reached the
+        # maximum queue size.
+        if self._max_queue_size and len(entry.deferreds) >= self._max_queue_size:
+            logger.warning(
+                "Linearizer %r for key %r has reached max queue size %d",
+                self.name,
+                key,
+                self._max_queue_size,
+            )
+            raise SynapseError(code=self._limit_error_code, msg="Limit exceeded")
+
         # Otherwise, the number of things executing is at the maximum and we have to
         # add a deferred to the list of blocked items.
         # When one of the things currently executing finishes it will callback
```
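Note that the check treats a falsy `max_queue_size` (`None` or `0`) as "no limit", and rejects a waiter *before* it is added, so the per-key queue never exceeds the cap. A minimal standalone sketch of the same guard, with hypothetical names:

```python
from collections import OrderedDict
from typing import Any

MAX_QUEUE_SIZE = 2  # hypothetical cap for illustration

waiters: "OrderedDict[Any, int]" = OrderedDict()


def enqueue(waiter: Any) -> None:
    # Reject before inserting, mirroring the check in the hunk above.
    if MAX_QUEUE_SIZE and len(waiters) >= MAX_QUEUE_SIZE:
        raise RuntimeError("Limit exceeded")  # stand-in for SynapseError
    waiters[waiter] = 1
```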
```diff
@@ -604,7 +628,22 @@
         entry.deferreds[new_defer] = 1
 
         try:
-            await new_defer
+            if self._timeout:
+                await timeout_deferred(
+                    new_defer,
+                    timeout=self._timeout / 1000,
+                    reactor=self._clock._reactor,
+                )
+            else:
+                await new_defer
+        except defer.TimeoutError:
+            logger.warning(
+                "Timed out waiting for linearizer lock %r for key %r",
+                self.name,
+                key,
+            )
+            del entry.deferreds[new_defer]
+            raise SynapseError(code=self._limit_error_code, msg="Limit exceeded")
         except Exception as e:
             logger.info("defer %r got err %r", new_defer, e)
             if isinstance(e, CancelledError):
```
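For intuition, an asyncio analogue of the timeout path (illustrative only, not Synapse code): wait for the lock with a deadline, and convert the timeout into an application-level error, as the patch does with `SynapseError`. Note the same milliseconds-to-seconds conversion as `timeout=self._timeout / 1000` above:

```python
import asyncio


async def acquire_with_timeout(lock: asyncio.Lock, timeout_ms: int) -> None:
    """Acquire `lock`, failing with an application error after `timeout_ms`."""
    try:
        # wait_for takes seconds, hence the /1000 conversion.
        await asyncio.wait_for(lock.acquire(), timeout=timeout_ms / 1000)
    except asyncio.TimeoutError:
        # Stand-in for raising SynapseError(limit_error_code) in the patch.
        raise RuntimeError("Limit exceeded") from None
```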
```diff
@@ -653,12 +692,14 @@
         # blocked waiting to execute and start one of them
         entry.count -= 1
 
+        # Find the first deferred in the list that is pending completion and
+        # call it.
         if entry.deferreds:
             (next_def, _) = entry.deferreds.popitem(last=False)
 
             # we need to run the next thing in the sentinel context.
             with PreserveLoggingContext():
                 next_def.callback(None)
         elif entry.count == 0:
             # We were the last thing for this key: remove it from the
             # map.
```
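The wake-up order here relies on `OrderedDict.popitem(last=False)` popping entries in insertion order, so blocked waiters are released first-in, first-out:

```python
from collections import OrderedDict

queue: "OrderedDict[str, int]" = OrderedDict()
queue["first"] = 1
queue["second"] = 1

# popitem(last=False) pops the oldest entry: waiters wake in arrival order.
assert queue.popitem(last=False) == ("first", 1)
assert queue.popitem(last=False) == ("second", 1)
```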
Changes to the `Linearizer` tests:

```diff
@@ -21,14 +21,15 @@
 
 from typing import Hashable, Protocol, Tuple
 
-from twisted.internet import defer, reactor
-from twisted.internet.base import ReactorBase
+from twisted.internet import defer
 from twisted.internet.defer import CancelledError, Deferred
 
+from synapse.api.errors import SynapseError
 from synapse.logging.context import LoggingContext, current_context
 from synapse.util.async_helpers import Linearizer
 
 from tests import unittest
+from tests.server import get_clock
 
 
 class UnblockFunction(Protocol):
```
```diff
@@ -36,6 +37,9 @@ class UnblockFunction:
 
 
 class LinearizerTestCase(unittest.TestCase):
+    def setUp(self) -> None:
+        self.reactor, self.clock = get_clock()
+
     def _start_task(
         self, linearizer: Linearizer, key: Hashable
     ) -> Tuple["Deferred[None]", "Deferred[None]", UnblockFunction]:
```
```diff
@@ -67,19 +71,13 @@ class LinearizerTestCase(unittest.TestCase):
         # The next task, if it exists, will acquire the lock and require a kick of
         # the reactor to advance.
         if pump_reactor:
-            self._pump()
+            self.reactor.pump([0.0])
 
         return d, acquired_d, unblock
 
-    def _pump(self) -> None:
-        """Pump the reactor to advance `Linearizer`s."""
-        assert isinstance(reactor, ReactorBase)
-        while reactor.getDelayedCalls():
-            reactor.runUntilCurrent()
-
     def test_linearizer(self) -> None:
         """Tests that a task is queued up behind an earlier task."""
-        linearizer = Linearizer()
+        linearizer = Linearizer(clock=self.clock)
 
         key = object()
 
```
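The tests drop the ad-hoc `_pump` helper in favour of the memory reactor returned by `get_clock()`. A sketch of the `pump`/`advance` semantics using `twisted.internet.task.Clock`, which (an assumption here) is the behaviour the test reactor exposes:

```python
from twisted.internet.task import Clock

reactor = Clock()
fired = []
reactor.callLater(0, fired.append, "now")
reactor.callLater(5, fired.append, "later")

reactor.pump([0.0])  # run calls already due, without advancing time
assert fired == ["now"]

reactor.advance(5)   # jump simulated time forward, as test_timeout does
assert fired == ["now", "later"]
```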
```diff
@@ -100,7 +98,7 @@ class LinearizerTestCase(unittest.TestCase):
 
         Runs through the same scenario as `test_linearizer`.
         """
-        linearizer = Linearizer()
+        linearizer = Linearizer(clock=self.clock)
 
         key = object()
 
```
```diff
@@ -131,7 +129,7 @@ class LinearizerTestCase(unittest.TestCase):
 
         The stack should *not* explode when the slow thing completes.
         """
-        linearizer = Linearizer()
+        linearizer = Linearizer(clock=self.clock)
         key = ""
 
         async def func(i: int) -> None:
```
```diff
@@ -151,7 +149,7 @@ class LinearizerTestCase(unittest.TestCase):
 
     def test_multiple_entries(self) -> None:
         """Tests a `Linearizer` with a concurrency above 1."""
-        limiter = Linearizer(max_count=3)
+        limiter = Linearizer(clock=self.clock, max_count=3)
 
         key = object()
 
```
```diff
@@ -192,7 +190,7 @@ class LinearizerTestCase(unittest.TestCase):
 
     def test_cancellation(self) -> None:
         """Tests cancellation while waiting for a `Linearizer`."""
-        linearizer = Linearizer()
+        linearizer = Linearizer(clock=self.clock)
 
         key = object()
 
```
```diff
@@ -226,7 +224,7 @@ class LinearizerTestCase(unittest.TestCase):
 
     def test_cancellation_during_sleep(self) -> None:
         """Tests cancellation during the sleep just after waiting for a `Linearizer`."""
-        linearizer = Linearizer()
+        linearizer = Linearizer(clock=self.clock)
 
         key = object()
 
```
```diff
@@ -246,7 +244,7 @@ class LinearizerTestCase(unittest.TestCase):
         unblock1(pump_reactor=False)
         self.successResultOf(d1)
         d2.cancel()
-        self._pump()
+        self.reactor.pump([0.0])
 
         self.assertTrue(d2.called)
         self.failureResultOf(d2, CancelledError)
```
```diff
@@ -258,3 +256,87 @@ class LinearizerTestCase(unittest.TestCase):
         )
         unblock3()
         self.successResultOf(d3)
+
+    def test_timeout(self) -> None:
+        """Test the `Linearizer` timeout behaviour."""
+        linearizer = Linearizer(
+            clock=self.clock,
+            timeout=10_000,
+            limit_error_code=999,
+        )
+        key = object()
+        d1, acquired_d1, unblock1 = self._start_task(linearizer, key)
+        self.assertTrue(acquired_d1.called)
+
+        # Create a second task, waiting for the first task.
+        d2, acquired_d2, _ = self._start_task(linearizer, key)
+        self.assertFalse(acquired_d2.called)
+        self.assertFalse(d2.called)
+
+        # Wait for the timeout to occur.
+        self.reactor.advance(20_000)
+
+        # We should have received a timeout error for the second task, and *not*
+        # acquired the lock.
+        f = self.failureResultOf(d2, SynapseError)
+        self.assertEqual(f.value.code, 999)
+        self.assertFalse(acquired_d2.called)
+
+        # The first task should still be running.
+        self.assertFalse(d1.called)
+
+        # Create a third task, waiting for the first task.
+        d3, acquired_d3, _ = self._start_task(linearizer, key)
+        self.assertFalse(acquired_d3.called)
+        self.assertFalse(acquired_d2.called)
+
+        # Unblock the first task.
+        unblock1()
+        self.successResultOf(d1)
+        self.assertFalse(acquired_d2.called)
+
+        # The third task should have started running.
+        self.assertTrue(acquired_d3.called)
+
+    def test_max_queue_size(self) -> None:
+        """Test the `Linearizer` max queue size behaviour."""
+        linearizer = Linearizer(
+            clock=self.clock,
+            max_queue_size=2,
+            limit_error_code=999,
+        )
+        key = object()
+
+        # Start four tasks.
+        d1, acquired_d1, unblock1 = self._start_task(linearizer, key)
+        d2, acquired_d2, unblock2 = self._start_task(linearizer, key)
+        d3, _, unblock3 = self._start_task(linearizer, key)
+        d4, _, _ = self._start_task(linearizer, key)
+
+        self.assertTrue(acquired_d1.called)
+        self.assertFalse(d1.called)
+        self.assertFalse(d2.called)
+        self.assertFalse(d3.called)
+
+        # The fourth task should have been rejected.
+        self.failureResultOf(d4, SynapseError)
+
+        # Unblock the first task.
+        unblock1()
+
+        # Second task should now be running.
+        self.assertTrue(acquired_d2.called)
+
+        # Adding one more task should succeed, but further tasks should be rejected.
+        d5, acquired_d5, _ = self._start_task(linearizer, key)
+        self.assertFalse(d5.called)
+
+        d6, _, _ = self._start_task(linearizer, key)
+        self.failureResultOf(d6, SynapseError)
+
+        # Unblocking the second and third tasks should cause the fifth task to start running.
+        unblock2()
+        self.assertTrue(d2.called)
+        unblock3()
+        self.assertTrue(d3.called)
+        self.assertTrue(acquired_d5.called)
```