Fix some lints (mistakes) and better trace when fetching events
This commit is contained in:
@@ -61,8 +61,8 @@ from synapse.federation.federation_client import InvalidResponseError
|
||||
from synapse.logging.context import nested_logging_context
|
||||
from synapse.logging.tracing import (
|
||||
SynapseTags,
|
||||
start_active_span,
|
||||
set_attribute,
|
||||
start_active_span,
|
||||
tag_args,
|
||||
trace,
|
||||
)
|
||||
@@ -722,7 +722,7 @@ class FederationEventHandler:
|
||||
|
||||
@trace
|
||||
async def _process_pulled_events(
|
||||
self, origin: str, events: Iterable[EventBase], backfilled: bool
|
||||
self, origin: str, events: List[EventBase], backfilled: bool
|
||||
) -> None:
|
||||
"""Process a batch of events we have pulled from a remote server
|
||||
|
||||
@@ -1662,7 +1662,7 @@ class FederationEventHandler:
|
||||
origin, event
|
||||
)
|
||||
set_attribute(
|
||||
"claimed_auth_events", [ev.event_id for ev in claimed_auth_events]
|
||||
"claimed_auth_events", str([ev.event_id for ev in claimed_auth_events])
|
||||
)
|
||||
|
||||
# ... and check that the event passes auth at those auth events.
|
||||
|
||||
@@ -390,15 +390,18 @@ class EventsPersistenceStorageController:
|
||||
PartialStateConflictError: if attempting to persist a partial state event in
|
||||
a room that has been un-partial stated.
|
||||
"""
|
||||
set_attribute(
|
||||
SynapseTags.FUNC_ARG_PREFIX + f"event_ids ({len(events_and_contexts)})",
|
||||
str([e.event_id for e, _ in events_and_contexts]),
|
||||
)
|
||||
set_attribute(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled))
|
||||
|
||||
event_ids: List[str] = []
|
||||
partitioned: Dict[str, List[Tuple[EventBase, EventContext]]] = {}
|
||||
for event, ctx in events_and_contexts:
|
||||
partitioned.setdefault(event.room_id, []).append((event, ctx))
|
||||
event_ids.append(event.event_id)
|
||||
|
||||
set_attribute(
|
||||
SynapseTags.FUNC_ARG_PREFIX + f"event_ids ({len(event_ids)})",
|
||||
str(event_ids),
|
||||
)
|
||||
set_attribute(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled))
|
||||
|
||||
async def enqueue(
|
||||
item: Tuple[str, List[Tuple[EventBase, EventContext]]]
|
||||
|
||||
@@ -54,7 +54,7 @@ from synapse.logging.context import (
|
||||
current_context,
|
||||
make_deferred_yieldable,
|
||||
)
|
||||
from synapse.logging.tracing import tag_args, trace
|
||||
from synapse.logging.tracing import start_active_span, tag_args, trace
|
||||
from synapse.metrics.background_process_metrics import (
|
||||
run_as_background_process,
|
||||
wrap_as_background_process,
|
||||
@@ -395,8 +395,6 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
|
||||
return event
|
||||
|
||||
@trace
|
||||
@tag_args
|
||||
async def get_events(
|
||||
self,
|
||||
event_ids: Collection[str],
|
||||
@@ -433,6 +431,8 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
|
||||
return {e.event_id: e for e in events}
|
||||
|
||||
@trace
|
||||
@tag_args
|
||||
async def get_events_as_list(
|
||||
self,
|
||||
event_ids: Collection[str],
|
||||
@@ -1034,6 +1034,11 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
fetched_events: Dict[str, _EventRow] = {}
|
||||
events_to_fetch = event_ids
|
||||
|
||||
is_recording_redaction_trace = False
|
||||
fetching_redactions_tracing_span_cm = start_active_span(
|
||||
"recursively fetching redactions"
|
||||
)
|
||||
|
||||
while events_to_fetch:
|
||||
row_map = await self._enqueue_events(events_to_fetch)
|
||||
|
||||
@@ -1049,6 +1054,14 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
events_to_fetch = redaction_ids.difference(fetched_event_ids)
|
||||
if events_to_fetch:
|
||||
logger.debug("Also fetching redaction events %s", events_to_fetch)
|
||||
# Start tracing how long it takes for us to get all of the redactions
|
||||
if not is_recording_redaction_trace:
|
||||
fetching_redactions_tracing_span_cm.__enter__()
|
||||
is_recording_redaction_trace = True
|
||||
|
||||
# Only stop recording if we were recording in the first place
|
||||
if is_recording_redaction_trace:
|
||||
fetching_redactions_tracing_span_cm.__exit__(None, None, None)
|
||||
|
||||
# build a map from event_id to EventBase
|
||||
event_map: Dict[str, EventBase] = {}
|
||||
|
||||
Reference in New Issue
Block a user