diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index fa46041676..e08ce2f029 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -28,6 +28,12 @@ from synapse.api.constants import EventTypes, Membership from synapse.events import FrozenEvent from synapse.events.snapshot import EventContext from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable +from synapse.logging.opentracing import ( + get_active_span_context, + set_tag, + start_active_span_follows_from, + trace, +) from synapse.metrics.background_process_metrics import run_as_background_process from synapse.state import StateResolutionStore from synapse.storage.data_stores import DataStores @@ -76,13 +82,16 @@ class _EventPeristenceQueue(object): """ _EventPersistQueueItem = namedtuple( - "_EventPersistQueueItem", ("events_and_contexts", "backfilled", "deferred") + "_EventPersistQueueItem", + ("id", "events_and_contexts", "backfilled", "deferred", "contexts"), ) def __init__(self): self._event_persist_queues = {} self._currently_persisting_rooms = set() + self._entry_seq = 1 + @trace(opname="persist_events_queued") def add_to_queue(self, room_id, events_and_contexts, backfilled): """Add events to the queue, with the given persist_event options. @@ -99,25 +108,40 @@ class _EventPeristenceQueue(object): defer.Deferred: a deferred which will resolve once the events are persisted. Runs its callbacks *without* a logcontext. """ + set_tag("room_id", room_id) queue = self._event_persist_queues.setdefault(room_id, deque()) if queue: # if the last item in the queue has the same `backfilled` setting, # we can just add these new events to that item. end_item = queue[-1] if end_item.backfilled == backfilled: + set_tag("queue_entry_id", end_item.id) end_item.events_and_contexts.extend(events_and_contexts) + end_item.contexts.append(get_active_span_context()) return end_item.deferred.observe() deferred = ObservableDeferred(defer.Deferred(), consumeErrors=True) + # Generate a unique ID to track this queue entry. This is mainly used to + # tag the opentracing spans so that we can correlate between this + # current span and the span started to handle this queued item + # (annoyingly it seems hard in jaeger to see all spans that follow from + # a span). + queue_entry_id = "%s-%s" % (id(self), self._entry_seq) + self._entry_seq += 1 + queue.append( self._EventPersistQueueItem( + id=queue_entry_id, events_and_contexts=events_and_contexts, backfilled=backfilled, deferred=deferred, + contexts=[get_active_span_context()], ) ) + set_tag("queue_entry_id", queue_entry_id) + return deferred.observe() def handle_queue(self, room_id, per_item_callback): @@ -146,7 +170,12 @@ class _EventPeristenceQueue(object): queue = self._get_drainining_queue(room_id) for item in queue: try: - ret = await per_item_callback(item) + with start_active_span_follows_from( + "persist_queue", item.contexts, + ): + set_tag("room_id", room_id) + set_tag("queue_entry_id", item.id) + ret = await per_item_callback(item) except Exception: with PreserveLoggingContext(): item.deferred.errback()