Compare commits
5 Commits
v1.140.0rc
...
erikj/jaeg
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9fc4231ad8 | ||
|
|
b4334ff71a | ||
|
|
55b359bd28 | ||
|
|
7e6da9f5d1 | ||
|
|
b71ef075f1 |
@@ -367,15 +367,11 @@ class LoggingContext(object):
|
||||
# we track the current request
|
||||
record.request = self.request
|
||||
|
||||
# we also track the current scope:
|
||||
record.scope = self.scope
|
||||
|
||||
def copy_to_twisted_log_entry(self, record) -> None:
|
||||
"""
|
||||
Copy logging fields from this context to a Twisted log record.
|
||||
"""
|
||||
record["request"] = self.request
|
||||
record["scope"] = self.scope
|
||||
|
||||
def start(self, rusage: "Optional[resource._RUsage]") -> None:
|
||||
"""
|
||||
@@ -659,9 +655,10 @@ def nested_logging_context(
|
||||
context = parent_context # type: LoggingContextOrSentinel
|
||||
else:
|
||||
context = current_context()
|
||||
return LoggingContext(
|
||||
parent_context=context, request=str(context.request) + "-" + suffix
|
||||
)
|
||||
name = str(context.request)
|
||||
if suffix:
|
||||
name = name + "-" + suffix
|
||||
return LoggingContext(parent_context=context, request=name)
|
||||
|
||||
|
||||
def preserve_fn(f):
|
||||
|
||||
@@ -526,6 +526,19 @@ def start_active_span_from_edu(
|
||||
return scope
|
||||
|
||||
|
||||
def get_active_span_context():
|
||||
"""Gets the active span's context, if any.
|
||||
"""
|
||||
if not opentracing:
|
||||
return None
|
||||
|
||||
active_span = opentracing.tracer.active_span
|
||||
if not active_span:
|
||||
return None
|
||||
|
||||
return active_span.context
|
||||
|
||||
|
||||
# Opentracing setters for tags, logs, etc
|
||||
|
||||
|
||||
|
||||
@@ -50,7 +50,12 @@ class LogContextScopeManager(ScopeManager):
|
||||
available.
|
||||
"""
|
||||
ctx = current_context()
|
||||
return ctx.scope
|
||||
while ctx:
|
||||
if ctx.scope:
|
||||
return ctx.scope
|
||||
ctx = ctx.parent_context
|
||||
|
||||
return None
|
||||
|
||||
def activate(self, span, finish_on_close):
|
||||
"""
|
||||
|
||||
@@ -25,6 +25,10 @@ from twisted.internet import defer
|
||||
from twisted.python.failure import Failure
|
||||
|
||||
from synapse.logging.context import LoggingContext, PreserveLoggingContext
|
||||
from synapse.logging.opentracing import (
|
||||
get_active_span_context,
|
||||
start_active_span_follows_from,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
import resource
|
||||
@@ -188,6 +192,8 @@ def run_as_background_process(desc, func, *args, **kwargs):
|
||||
follow the synapse logcontext rules.
|
||||
"""
|
||||
|
||||
previous_span_context = get_active_span_context()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def run():
|
||||
with _bg_metrics_lock:
|
||||
@@ -197,7 +203,7 @@ def run_as_background_process(desc, func, *args, **kwargs):
|
||||
_background_process_start_count.labels(desc).inc()
|
||||
_background_process_in_flight_count.labels(desc).inc()
|
||||
|
||||
with BackgroundProcessLoggingContext(desc) as context:
|
||||
with BackgroundProcessLoggingContext(desc, previous_span_context) as context:
|
||||
context.request = "%s-%i" % (desc, count)
|
||||
|
||||
try:
|
||||
@@ -250,12 +256,14 @@ class BackgroundProcessLoggingContext(LoggingContext):
|
||||
processes.
|
||||
"""
|
||||
|
||||
__slots__ = ["_proc"]
|
||||
__slots__ = ["_proc", "_span", "_previous_spans"]
|
||||
|
||||
def __init__(self, name: str):
|
||||
def __init__(self, name: str, previous_span_context):
|
||||
super().__init__(name)
|
||||
|
||||
self._proc = _BackgroundProcess(name, self)
|
||||
self._span = None
|
||||
self._previous_spans = [previous_span_context] if previous_span_context else []
|
||||
|
||||
def start(self, rusage: "Optional[resource._RUsage]"):
|
||||
"""Log context has started running (again).
|
||||
@@ -269,10 +277,20 @@ class BackgroundProcessLoggingContext(LoggingContext):
|
||||
with _bg_metrics_lock:
|
||||
_background_processes_active_since_last_scrape.add(self._proc)
|
||||
|
||||
def __enter__(self) -> LoggingContext:
|
||||
context = super().__enter__()
|
||||
|
||||
self._span = start_active_span_follows_from(self.name, self._previous_spans)
|
||||
self._span.__enter__()
|
||||
|
||||
return context
|
||||
|
||||
def __exit__(self, type, value, traceback) -> None:
|
||||
"""Log context has finished.
|
||||
"""
|
||||
|
||||
self._span.__exit__(type, value, traceback)
|
||||
|
||||
super().__exit__(type, value, traceback)
|
||||
|
||||
# The background process has finished. We explictly remove and manually
|
||||
|
||||
@@ -28,6 +28,12 @@ from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.events import FrozenEvent
|
||||
from synapse.events.snapshot import EventContext
|
||||
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
|
||||
from synapse.logging.opentracing import (
|
||||
get_active_span_context,
|
||||
set_tag,
|
||||
start_active_span_follows_from,
|
||||
trace,
|
||||
)
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.state import StateResolutionStore
|
||||
from synapse.storage.data_stores import DataStores
|
||||
@@ -76,13 +82,16 @@ class _EventPeristenceQueue(object):
|
||||
"""
|
||||
|
||||
_EventPersistQueueItem = namedtuple(
|
||||
"_EventPersistQueueItem", ("events_and_contexts", "backfilled", "deferred")
|
||||
"_EventPersistQueueItem",
|
||||
("id", "events_and_contexts", "backfilled", "deferred", "contexts"),
|
||||
)
|
||||
|
||||
def __init__(self):
|
||||
self._event_persist_queues = {}
|
||||
self._currently_persisting_rooms = set()
|
||||
self._entry_seq = 1
|
||||
|
||||
@trace(opname="persist_events_queued")
|
||||
def add_to_queue(self, room_id, events_and_contexts, backfilled):
|
||||
"""Add events to the queue, with the given persist_event options.
|
||||
|
||||
@@ -99,25 +108,40 @@ class _EventPeristenceQueue(object):
|
||||
defer.Deferred: a deferred which will resolve once the events are
|
||||
persisted. Runs its callbacks *without* a logcontext.
|
||||
"""
|
||||
set_tag("room_id", room_id)
|
||||
queue = self._event_persist_queues.setdefault(room_id, deque())
|
||||
if queue:
|
||||
# if the last item in the queue has the same `backfilled` setting,
|
||||
# we can just add these new events to that item.
|
||||
end_item = queue[-1]
|
||||
if end_item.backfilled == backfilled:
|
||||
set_tag("queue_entry_id", end_item.id)
|
||||
end_item.events_and_contexts.extend(events_and_contexts)
|
||||
end_item.contexts.append(get_active_span_context())
|
||||
return end_item.deferred.observe()
|
||||
|
||||
deferred = ObservableDeferred(defer.Deferred(), consumeErrors=True)
|
||||
|
||||
# Generate a unique ID to track this queue entry. This is mainly used to
|
||||
# tag the opentracing spans so that we can correlate between this
|
||||
# current span and the span started to handle this queued item
|
||||
# (annoyingly it seems hard in jaeger to see all spans that follow from
|
||||
# a span).
|
||||
queue_entry_id = "%s-%s" % (id(self), self._entry_seq)
|
||||
self._entry_seq += 1
|
||||
|
||||
queue.append(
|
||||
self._EventPersistQueueItem(
|
||||
id=queue_entry_id,
|
||||
events_and_contexts=events_and_contexts,
|
||||
backfilled=backfilled,
|
||||
deferred=deferred,
|
||||
contexts=[get_active_span_context()],
|
||||
)
|
||||
)
|
||||
|
||||
set_tag("queue_entry_id", queue_entry_id)
|
||||
|
||||
return deferred.observe()
|
||||
|
||||
def handle_queue(self, room_id, per_item_callback):
|
||||
@@ -146,7 +170,12 @@ class _EventPeristenceQueue(object):
|
||||
queue = self._get_drainining_queue(room_id)
|
||||
for item in queue:
|
||||
try:
|
||||
ret = await per_item_callback(item)
|
||||
with start_active_span_follows_from(
|
||||
"persist_queue", item.contexts,
|
||||
):
|
||||
set_tag("room_id", room_id)
|
||||
set_tag("queue_entry_id", item.id)
|
||||
ret = await per_item_callback(item)
|
||||
except Exception:
|
||||
with PreserveLoggingContext():
|
||||
item.deferred.errback()
|
||||
|
||||
@@ -22,6 +22,7 @@ from prometheus_client import Counter
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.logging.context import LoggingContext, current_context
|
||||
from synapse.logging.opentracing import start_active_span
|
||||
from synapse.metrics import InFlightGauge
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -93,6 +94,7 @@ class Measure(object):
|
||||
"name",
|
||||
"_logging_context",
|
||||
"start",
|
||||
"_span",
|
||||
]
|
||||
|
||||
def __init__(self, clock, name):
|
||||
@@ -100,6 +102,7 @@ class Measure(object):
|
||||
self.name = name
|
||||
self._logging_context = None
|
||||
self.start = None
|
||||
self._span = None
|
||||
|
||||
def __enter__(self):
|
||||
if self._logging_context:
|
||||
@@ -111,12 +114,19 @@ class Measure(object):
|
||||
"Measure[%s]" % (self.name,), parent_context
|
||||
)
|
||||
self._logging_context.__enter__()
|
||||
|
||||
self._span = start_active_span(self.name)
|
||||
self._span.__enter__()
|
||||
|
||||
in_flight.register((self.name,), self._update_in_flight)
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
if not self._logging_context:
|
||||
raise RuntimeError("Measure() block exited without being entered")
|
||||
|
||||
if self._span:
|
||||
self._span.__exit__(exc_type, exc_val, exc_tb)
|
||||
|
||||
duration = self.clock.time() - self.start
|
||||
usage = self._logging_context.get_resource_usage()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user