Compare commits: anoa/modul...erikj/cont

8 Commits

| Author | SHA1 | Date |
|---|---|---|
| | dfb7a8563b | |
| | 7a0d090adc | |
| | 032c0d9970 | |
| | 3368c1cb0a | |
| | c9a743d053 | |
| | 47f157b510 | |
| | e75de1d7a6 | |
| | 3c3c0dd419 | |
@@ -171,9 +171,8 @@ canonicaljson = "^2.0.0"
 signedjson = "^1.1.0"
 # validating SSL certs for IP addresses requires service_identity 18.1.
 service-identity = ">=18.1.0"
-# Twisted 18.9 introduces some logger improvements that the structured
-# logger utilises
-Twisted = {extras = ["tls"], version = ">=18.9.0"}
+# Twisted 21.2 introduces contextvar support
+Twisted = { extras = ["tls", "contextvars"], version = ">=21.2.0" }
 treq = ">=15.1"
 # Twisted has required pyopenssl 16.0 since about Twisted 16.6.
 pyOpenSSL = ">=16.0.0"
@@ -72,7 +72,12 @@ from synapse.api.errors import (
     UnrecognizedRequestError,
 )
 from synapse.config.homeserver import HomeServerConfig
-from synapse.logging.context import defer_to_thread, preserve_fn, run_in_background
+from synapse.logging.context import (
+    defer_to_thread,
+    measure_coroutine,
+    preserve_fn,
+    run_in_background,
+)
 from synapse.logging.opentracing import active_span, start_active_span, trace_servlet
 from synapse.util import json_encoder
 from synapse.util.caches import intern_dict
@@ -330,7 +335,6 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta):
 
             with trace_servlet(request, self._extract_context):
                 callback_return = await self._async_render(request)
-
                 if callback_return is not None:
                     code, response = callback_return
                     self._send_response(request, code, response)
@@ -361,7 +365,9 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta):
 
             # Is it synchronous? We'll allow this for now.
             if isawaitable(raw_callback_return):
-                callback_return = await raw_callback_return
+                callback_return = await measure_coroutine(
+                    request.request_metrics.name, raw_callback_return
+                )
             else:
                 callback_return = raw_callback_return
 
@@ -541,7 +547,9 @@ class JsonResource(DirectServeJsonResource):
 
         # Is it synchronous? We'll allow this for now.
         if isinstance(raw_callback_return, (defer.Deferred, types.CoroutineType)):
-            callback_return = await raw_callback_return
+            callback_return = await measure_coroutine(
+                request.request_metrics.name, raw_callback_return
+            )
         else:
             callback_return = raw_callback_return
 
@@ -33,9 +33,11 @@ import logging
 import threading
 import typing
 import warnings
+from collections.abc import Coroutine, Generator
 from types import TracebackType
 from typing import (
     TYPE_CHECKING,
+    Any,
     Awaitable,
     Callable,
     Optional,
@@ -47,6 +49,7 @@ from typing import (
 )
 
 import attr
+from prometheus_client import Counter
 from typing_extensions import Literal, ParamSpec
 
 from twisted.internet import defer, threads
@@ -58,6 +61,10 @@ if TYPE_CHECKING:
 
 logger = logging.getLogger(__name__)
 
+context_ru_utime = Counter(
+    "synapse_logging_context_ru_utime", "utime in log context", ("name",)
+)
+
 try:
     import resource
 
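The hunk above adds a labelled Prometheus counter, `synapse_logging_context_ru_utime`, next to the module logger. For readers unfamiliar with `prometheus_client`, here is a minimal standalone sketch of how such a labelled counter is defined, incremented, and scraped; the metric name, label values, and amounts below are invented for illustration and are not what Synapse exposes.

```python
# Minimal, standalone sketch of a labelled Counter with prometheus_client.
from prometheus_client import CollectorRegistry, Counter, generate_latest

registry = CollectorRegistry()

ru_utime = Counter(
    "example_context_ru_utime",      # metric name (illustrative)
    "user CPU seconds per context",  # help text
    ("name",),                       # one label: the logging-context name
    registry=registry,
)

# Each labelled child accumulates independently; inc() takes a float amount.
ru_utime.labels("RoomStateRestServlet").inc(0.012)
ru_utime.labels("RoomStateRestServlet").inc(0.003)
ru_utime.labels("sync").inc(0.250)

# The scrape output carries one time series per label value.
print(generate_latest(registry).decode("ascii"))
```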
@@ -227,7 +234,14 @@ LoggingContextOrSentinel = Union["LoggingContext", "_Sentinel"]
 class _Sentinel:
     """Sentinel to represent the root context"""
 
-    __slots__ = ["previous_context", "finished", "request", "scope", "tag"]
+    __slots__ = [
+        "previous_context",
+        "finished",
+        "request",
+        "scope",
+        "tag",
+        "metrics_name",
+    ]
 
     def __init__(self) -> None:
         # Minimal set for compatibility with LoggingContext
@@ -236,6 +250,7 @@ class _Sentinel:
         self.request = None
         self.scope = None
         self.tag = None
+        self.metrics_name = None
 
     def __str__(self) -> str:
         return "sentinel"
@@ -288,6 +303,7 @@ class LoggingContext:
         "request",
         "tag",
         "scope",
+        "metrics_name",
     ]
 
     def __init__(
@@ -298,6 +314,8 @@ class LoggingContext:
     ) -> None:
         self.previous_context = current_context()
 
+        self.metrics_name: Optional[str] = None
+
         # track the resources used by this context so far
         self._resource_usage = ContextResourceUsage()
 
@@ -331,6 +349,7 @@ class LoggingContext:
         # if we don't have a `name`, but do have a parent context, use its name.
         if self.parent_context and name is None:
             name = str(self.parent_context)
+            self.metrics_name = self.parent_context.metrics_name
         if name is None:
             raise ValueError(
                 "LoggingContext must be given either a name or a parent context"
@@ -813,13 +832,15 @@ def run_in_background(
     d: "defer.Deferred[R]"
     if isinstance(res, typing.Coroutine):
         # Wrap the coroutine in a `Deferred`.
-        d = defer.ensureDeferred(res)
+        d = defer.ensureDeferred(measure_coroutine(current.metrics_name, res))
     elif isinstance(res, defer.Deferred):
         d = res
    elif isinstance(res, Awaitable):
         # `res` is probably some kind of completed awaitable, such as a `DoneAwaitable`
         # or `Future` from `make_awaitable`.
-        d = defer.ensureDeferred(_unwrap_awaitable(res))
+        d = defer.ensureDeferred(
+            measure_coroutine(current.metrics_name, _unwrap_awaitable(res))
+        )
     else:
         # `res` is a plain value. Wrap it in a `Deferred`.
         d = defer.succeed(res)
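In `run_in_background`, the coroutine handed to `defer.ensureDeferred` is now first passed through `measure_coroutine`, keyed on the current context's `metrics_name`. Interposing a wrapper coroutine like this is transparent to Twisted, since `ensureDeferred` accepts any coroutine. A minimal sketch under that assumption, using a no-op stand-in for the wrapper (none of these names are Synapse's):

```python
# Illustrative only: wrapping a coroutine in another coroutine before
# defer.ensureDeferred() changes nothing for the caller.
from twisted.internet import defer, task


async def work() -> int:
    # Stand-in for the background work run_in_background would schedule.
    return 7


async def wrapped(co):
    # No-op stand-in for measure_coroutine: just awaits the inner coroutine.
    return await co


def main(reactor):
    d = defer.ensureDeferred(wrapped(work()))
    d.addCallback(lambda value: print("result:", value))
    return d


task.react(main)
```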
@@ -971,3 +992,98 @@ def defer_to_threadpool(
         return f(*args, **kwargs)
 
     return make_deferred_yieldable(threads.deferToThreadPool(reactor, threadpool, g))
+
+
+_T = TypeVar("_T")
+
+
+@attr.s(frozen=True, slots=True, auto_attribs=True)
+class _ResourceTracker(Generator[defer.Deferred[Any], Any, _T]):
+    name: str
+    gen: Generator[defer.Deferred[Any], Any, _T]
+
+    def send(self, val: Any) -> defer.Deferred[_T]:
+        rusage_start = get_thread_resource_usage()
+
+        try:
+            return self.gen.send(val)
+        finally:
+            rusage_end = get_thread_resource_usage()
+            if rusage_start and rusage_end:
+                context_ru_utime.labels(self.name).inc(
+                    max(0, rusage_end.ru_utime - rusage_start.ru_utime)
+                )
+
+    @overload
+    def throw(
+        self,
+        a: Type[BaseException],
+        b: object = ...,
+        c: Optional[TracebackType] = ...,
+        /,
+    ) -> defer.Deferred[Any]: ...
+
+    @overload
+    def throw(
+        self, a: BaseException, v: None = ..., c: Optional[TracebackType] = ..., /
+    ) -> defer.Deferred[Any]: ...
+
+    def throw(self, a: Any, b: Any = None, c: Any = None) -> defer.Deferred[Any]:
+        try:
+            return self.gen.throw(a, b, c)
+        finally:
+            pass
+
+
+@attr.s(frozen=True, slots=True, auto_attribs=True)
+class _ResourceTracker2(Coroutine[defer.Deferred[Any], Any, _T]):
+    name: str
+    gen: Coroutine[defer.Deferred[Any], Any, _T]
+
+    def send(self, val: Any) -> defer.Deferred[_T]:
+        rusage_start = get_thread_resource_usage()
+
+        try:
+            return self.gen.send(val)
+        finally:
+            rusage_end = get_thread_resource_usage()
+            if rusage_start and rusage_end:
+                context_ru_utime.labels(self.name).inc(
+                    max(0, rusage_end.ru_utime - rusage_start.ru_utime)
+                )
+
+    @overload
+    def throw(
+        self,
+        a: Type[BaseException],
+        b: object = ...,
+        c: Optional[TracebackType] = ...,
+        /,
+    ) -> defer.Deferred[Any]: ...
+
+    @overload
+    def throw(
+        self, a: BaseException, v: None = ..., c: Optional[TracebackType] = ..., /
+    ) -> defer.Deferred[Any]: ...
+
+    def throw(self, a: Any, b: Any = None, c: Any = None) -> defer.Deferred[Any]:
+        try:
+            return self.gen.throw(a, b, c)
+        finally:
+            pass
+
+    def __await__(self) -> Generator[defer.Deferred[Any], Any, _T]:
+        return _ResourceTracker(self.name, self.gen.__await__())
+
+    def close(self) -> None:
+        return self.gen.close()
+
+
+async def measure_coroutine(
+    name: Optional[str], co: Coroutine[defer.Deferred[Any], Any, _T]
+) -> _T:
+    if not name:
+        return await co
+
+    current_context().metrics_name = name
+    return await _ResourceTracker2(name, co)
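The block appended at the end of this hunk is the heart of the change: `measure_coroutine` wraps a coroutine in `_ResourceTracker2`, whose `send` brackets every resumption with `get_thread_resource_usage()` and adds the user-CPU delta to the `context_ru_utime` counter under the context's `metrics_name`. The following self-contained sketch reproduces the same delegating-generator technique outside Synapse and Twisted; all names are invented, and it uses `time.thread_time()` rather than `getrusage` so it runs on any platform.

```python
# Standalone illustration of the measuring-wrapper technique in this diff:
# a coroutine is awaited through a delegating generator whose send() times
# each resumption and accumulates the CPU spent between suspension points.
import asyncio
import time
from collections import defaultdict
from typing import Any, Coroutine, Dict, Generator, TypeVar

_T = TypeVar("_T")

# Stand-in for the Prometheus counter: CPU seconds accumulated per name.
cpu_seconds: Dict[str, float] = defaultdict(float)


class _TimedSteps(Generator[Any, Any, _T]):
    """Delegating generator: times each send() into the wrapped __await__ iterator."""

    def __init__(self, name: str, gen: Generator[Any, Any, _T]) -> None:
        self._name = name
        self._gen = gen

    def send(self, value: Any) -> Any:
        start = time.thread_time()
        try:
            return self._gen.send(value)
        finally:
            # Only CPU spent between suspension points is attributed to `name`.
            cpu_seconds[self._name] += max(0.0, time.thread_time() - start)

    def throw(self, typ, val=None, tb=None):
        return self._gen.throw(typ, val, tb)

    def close(self) -> None:
        self._gen.close()


class _Measured:
    """Awaitable that routes every resumption of `co` through _TimedSteps."""

    def __init__(self, name: str, co: Coroutine[Any, Any, _T]) -> None:
        self._name = name
        self._co = co

    def __await__(self) -> Generator[Any, Any, _T]:
        return _TimedSteps(self._name, self._co.__await__())


async def measure(name: str, co: Coroutine[Any, Any, _T]) -> _T:
    # Analogue of measure_coroutine: await the coroutine via the wrapper.
    return await _Measured(name, co)


async def busy_handler() -> int:
    total = sum(i * i for i in range(200_000))  # CPU work: attributed
    await asyncio.sleep(0.1)                    # suspension: not attributed
    return total


async def main() -> None:
    await measure("busy_handler", busy_handler())
    print(dict(cpu_seconds))  # e.g. {'busy_handler': 0.01...}


asyncio.run(main())
```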