Join persist events bg job with requests
This commit is contained in:
@@ -28,6 +28,12 @@ from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.events import FrozenEvent
|
||||
from synapse.events.snapshot import EventContext
|
||||
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
|
||||
from synapse.logging.opentracing import (
|
||||
get_active_span_context,
|
||||
set_tag,
|
||||
start_active_span_follows_from,
|
||||
trace,
|
||||
)
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.state import StateResolutionStore
|
||||
from synapse.storage.data_stores import DataStores
|
||||
@@ -76,13 +82,16 @@ class _EventPeristenceQueue(object):
|
||||
"""
|
||||
|
||||
_EventPersistQueueItem = namedtuple(
|
||||
"_EventPersistQueueItem", ("events_and_contexts", "backfilled", "deferred")
|
||||
"_EventPersistQueueItem",
|
||||
("id", "events_and_contexts", "backfilled", "deferred", "contexts"),
|
||||
)
|
||||
|
||||
def __init__(self):
|
||||
self._event_persist_queues = {}
|
||||
self._currently_persisting_rooms = set()
|
||||
self._entry_seq = 1
|
||||
|
||||
@trace(opname="persist_events_queued")
|
||||
def add_to_queue(self, room_id, events_and_contexts, backfilled):
|
||||
"""Add events to the queue, with the given persist_event options.
|
||||
|
||||
@@ -99,25 +108,40 @@ class _EventPeristenceQueue(object):
|
||||
defer.Deferred: a deferred which will resolve once the events are
|
||||
persisted. Runs its callbacks *without* a logcontext.
|
||||
"""
|
||||
set_tag("room_id", room_id)
|
||||
queue = self._event_persist_queues.setdefault(room_id, deque())
|
||||
if queue:
|
||||
# if the last item in the queue has the same `backfilled` setting,
|
||||
# we can just add these new events to that item.
|
||||
end_item = queue[-1]
|
||||
if end_item.backfilled == backfilled:
|
||||
set_tag("queue_entry_id", end_item.id)
|
||||
end_item.events_and_contexts.extend(events_and_contexts)
|
||||
end_item.contexts.append(get_active_span_context())
|
||||
return end_item.deferred.observe()
|
||||
|
||||
deferred = ObservableDeferred(defer.Deferred(), consumeErrors=True)
|
||||
|
||||
# Generate a unique ID to track this queue entry. This is mainly used to
|
||||
# tag the opentracing spans so that we can correlate between this
|
||||
# current span and the span started to handle this queued item
|
||||
# (annoyingly it seems hard in jaeger to see all spans that follow from
|
||||
# a span).
|
||||
queue_entry_id = "%s-%s" % (id(self), self._entry_seq)
|
||||
self._entry_seq += 1
|
||||
|
||||
queue.append(
|
||||
self._EventPersistQueueItem(
|
||||
id=queue_entry_id,
|
||||
events_and_contexts=events_and_contexts,
|
||||
backfilled=backfilled,
|
||||
deferred=deferred,
|
||||
contexts=[get_active_span_context()],
|
||||
)
|
||||
)
|
||||
|
||||
set_tag("queue_entry_id", queue_entry_id)
|
||||
|
||||
return deferred.observe()
|
||||
|
||||
def handle_queue(self, room_id, per_item_callback):
|
||||
@@ -146,7 +170,12 @@ class _EventPeristenceQueue(object):
|
||||
queue = self._get_drainining_queue(room_id)
|
||||
for item in queue:
|
||||
try:
|
||||
ret = await per_item_callback(item)
|
||||
with start_active_span_follows_from(
|
||||
"persist_queue", item.contexts,
|
||||
):
|
||||
set_tag("room_id", room_id)
|
||||
set_tag("queue_entry_id", item.id)
|
||||
ret = await per_item_callback(item)
|
||||
except Exception:
|
||||
with PreserveLoggingContext():
|
||||
item.deferred.errback()
|
||||
|
||||
Reference in New Issue
Block a user