diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index e596e1ed20..f01f799771 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -59,6 +59,7 @@ from twisted.internet.defer import CancelledError from twisted.internet.interfaces import IReactorTime from twisted.python.failure import Failure +from synapse.api.errors import SynapseError from synapse.logging.context import ( PreserveLoggingContext, make_deferred_yieldable, @@ -535,10 +536,19 @@ class Linearizer: name: Optional[str] = None, max_count: int = 1, clock: Optional[Clock] = None, + timeout: Optional[int] = None, + max_queue_size: Optional[int] = None, + limit_error_code: int = 503, ): """ Args: max_count: The maximum number of concurrent accesses + timeout: The maximum time to wait for a lock to be acquired, in + milliseconds. Optional. + max_queue_size: The maximum number of items for a given key that + can be queued up waiting for a lock. Optional. + limit_error_code: The HTTP error code to return when one of the + above limits is reached. """ if name is None: self.name: Union[str, int] = id(self) @@ -551,6 +561,9 @@ class Linearizer: clock = Clock(cast(IReactorTime, reactor)) self._clock = clock self.max_count = max_count + self._timeout = timeout + self._max_queue_size = max_queue_size + self._limit_error_code = limit_error_code # key_to_defer is a map from the key to a _LinearizerEntry. self.key_to_defer: Dict[Hashable, _LinearizerEntry] = {} @@ -594,6 +607,17 @@ class Linearizer: entry.count += 1 return entry + # Check if the number of deferreds waiting for this key has reached the + # maximum queue size. + if self._max_queue_size and len(entry.deferreds) >= self._max_queue_size: + logger.warning( + "Linearizer %r for key %r has reached max queue size %d", + self.name, + key, + self._max_queue_size, + ) + raise SynapseError(code=self._limit_error_code, msg="Limit exceeded") + # Otherwise, the number of things executing is at the maximum and we have to # add a deferred to the list of blocked items. # When one of the things currently executing finishes it will callback @@ -604,7 +628,22 @@ class Linearizer: entry.deferreds[new_defer] = 1 try: - await new_defer + if self._timeout: + await timeout_deferred( + new_defer, + timeout=self._timeout / 1000, + reactor=self._clock._reactor, + ) + else: + await new_defer + except defer.TimeoutError: + logger.warning( + "Timed out waiting for linearizer lock %r for key %r", + self.name, + key, + ) + del entry.deferreds[new_defer] + raise SynapseError(code=self._limit_error_code, msg="Limit exceeded") except Exception as e: logger.info("defer %r got err %r", new_defer, e) if isinstance(e, CancelledError): @@ -653,12 +692,14 @@ class Linearizer: # blocked waiting to execute and start one of them entry.count -= 1 + # Find the first deferred in the list that is pending completion and + # call it. if entry.deferreds: (next_def, _) = entry.deferreds.popitem(last=False) - # we need to run the next thing in the sentinel context. with PreserveLoggingContext(): next_def.callback(None) + elif entry.count == 0: # We were the last thing for this key: remove it from the # map.