Add timeout and max queue size support
This commit is contained in:
@@ -59,6 +59,7 @@ from twisted.internet.defer import CancelledError
|
||||
from twisted.internet.interfaces import IReactorTime
|
||||
from twisted.python.failure import Failure
|
||||
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.logging.context import (
|
||||
PreserveLoggingContext,
|
||||
make_deferred_yieldable,
|
||||
@@ -535,10 +536,19 @@ class Linearizer:
|
||||
name: Optional[str] = None,
|
||||
max_count: int = 1,
|
||||
clock: Optional[Clock] = None,
|
||||
timeout: Optional[int] = None,
|
||||
max_queue_size: Optional[int] = None,
|
||||
limit_error_code: int = 503,
|
||||
):
|
||||
"""
|
||||
Args:
|
||||
max_count: The maximum number of concurrent accesses
|
||||
timeout: The maximum time to wait for a lock to be acquired, in
|
||||
milliseconds. Optional.
|
||||
max_queue_size: The maximum number of items for a given key that
|
||||
can be queued up waiting for a lock. Optional.
|
||||
limit_error_code: The HTTP error code to return when one of the
|
||||
above limits is reached.
|
||||
"""
|
||||
if name is None:
|
||||
self.name: Union[str, int] = id(self)
|
||||
@@ -551,6 +561,9 @@ class Linearizer:
|
||||
clock = Clock(cast(IReactorTime, reactor))
|
||||
self._clock = clock
|
||||
self.max_count = max_count
|
||||
self._timeout = timeout
|
||||
self._max_queue_size = max_queue_size
|
||||
self._limit_error_code = limit_error_code
|
||||
|
||||
# key_to_defer is a map from the key to a _LinearizerEntry.
|
||||
self.key_to_defer: Dict[Hashable, _LinearizerEntry] = {}
|
||||
@@ -594,6 +607,17 @@ class Linearizer:
|
||||
entry.count += 1
|
||||
return entry
|
||||
|
||||
# Check if the number of deferreds waiting for this key has reached the
|
||||
# maximum queue size.
|
||||
if self._max_queue_size and len(entry.deferreds) >= self._max_queue_size:
|
||||
logger.warning(
|
||||
"Linearizer %r for key %r has reached max queue size %d",
|
||||
self.name,
|
||||
key,
|
||||
self._max_queue_size,
|
||||
)
|
||||
raise SynapseError(code=self._limit_error_code, msg="Limit exceeded")
|
||||
|
||||
# Otherwise, the number of things executing is at the maximum and we have to
|
||||
# add a deferred to the list of blocked items.
|
||||
# When one of the things currently executing finishes it will callback
|
||||
@@ -604,7 +628,22 @@ class Linearizer:
|
||||
entry.deferreds[new_defer] = 1
|
||||
|
||||
try:
|
||||
await new_defer
|
||||
if self._timeout:
|
||||
await timeout_deferred(
|
||||
new_defer,
|
||||
timeout=self._timeout / 1000,
|
||||
reactor=self._clock._reactor,
|
||||
)
|
||||
else:
|
||||
await new_defer
|
||||
except defer.TimeoutError:
|
||||
logger.warning(
|
||||
"Timed out waiting for linearizer lock %r for key %r",
|
||||
self.name,
|
||||
key,
|
||||
)
|
||||
del entry.deferreds[new_defer]
|
||||
raise SynapseError(code=self._limit_error_code, msg="Limit exceeded")
|
||||
except Exception as e:
|
||||
logger.info("defer %r got err %r", new_defer, e)
|
||||
if isinstance(e, CancelledError):
|
||||
@@ -653,12 +692,14 @@ class Linearizer:
|
||||
# blocked waiting to execute and start one of them
|
||||
entry.count -= 1
|
||||
|
||||
# Find the first deferred in the list that is pending completion and
|
||||
# call it.
|
||||
if entry.deferreds:
|
||||
(next_def, _) = entry.deferreds.popitem(last=False)
|
||||
|
||||
# we need to run the next thing in the sentinel context.
|
||||
with PreserveLoggingContext():
|
||||
next_def.callback(None)
|
||||
|
||||
elif entry.count == 0:
|
||||
# We were the last thing for this key: remove it from the
|
||||
# map.
|
||||
|
||||
Reference in New Issue
Block a user