diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 9bdb0f043c..2633bd9264 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -30,7 +30,7 @@ from synapse.api.errors import ( from synapse.appservice import ApplicationService from synapse.http import get_request_user_agent from synapse.http.site import SynapseRequest -from synapse.logging.tracing import active_span, force_tracing, start_active_span +from synapse.logging.tracing import get_active_span, force_tracing, start_active_span from synapse.storage.databases.main.registration import TokenLookupResult from synapse.types import Requester, UserID, create_requester @@ -132,7 +132,7 @@ class Auth: is invalid. AuthError if access is denied for the user in the access token """ - parent_span = active_span() + parent_span = get_active_span() with start_active_span("get_user_by_req"): requester = await self._wrapped_get_user_by_req( request, allow_guest, allow_expired diff --git a/synapse/federation/transport/server/_base.py b/synapse/federation/transport/server/_base.py index c7ee06a062..b35fefc7dc 100644 --- a/synapse/federation/transport/server/_base.py +++ b/synapse/federation/transport/server/_base.py @@ -26,7 +26,7 @@ from synapse.http.servlet import parse_json_object_from_request from synapse.http.site import SynapseRequest from synapse.logging.context import run_in_background from synapse.logging.tracing import ( - active_span, + get_active_span, set_attribute, span_context_from_request, start_active_span, @@ -318,7 +318,7 @@ class BaseFederationServlet: context = span_context_from_request(request) if context: - servlet_span = active_span() + servlet_span = get_active_span() # a scope which uses the origin's context as a parent processing_start_time = time.time() scope = start_active_span_follows_from( diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 3a40bc1694..ad71feb774 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1568,7 +1568,7 @@ class SyncHandler: sync_result_builder, room_entry, ephemeral=ephemeral_by_room.get(room_entry.room_id, []), - attributes=tags_by_room.get(room_entry.room_id), + tags=tags_by_room.get(room_entry.room_id), account_data=account_data_by_room.get(room_entry.room_id, {}), always_include=sync_result_builder.full_state, ) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 5b5f3e9f40..3b5ce72c28 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -533,7 +533,9 @@ class MatrixFederationHttpClient: # Inject the span into the headers headers_dict: Dict[bytes, List[bytes]] = {} - tracing.inject_header_dict(headers_dict, request.destination) + tracing.inject_active_span_context_into_header_dict( + headers_dict, request.destination + ) headers_dict[b"User-Agent"] = [self.version_string_bytes] diff --git a/synapse/http/server.py b/synapse/http/server.py index 1e89050650..c12028c921 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -60,7 +60,7 @@ from synapse.api.errors import ( ) from synapse.http.site import SynapseRequest from synapse.logging.context import defer_to_thread, preserve_fn, run_in_background -from synapse.logging.tracing import active_span, start_active_span, trace_servlet +from synapse.logging.tracing import get_active_span, start_active_span, trace_servlet from synapse.util import json_encoder from synapse.util.caches import intern_dict from synapse.util.iterutils import chunk_seq @@ -880,7 +880,7 @@ async def _async_write_json_to_request_in_thread( return res with start_active_span("encode_json_response"): - span = active_span() + span = get_active_span() json_str = await defer_to_thread(request.reactor, encode, span) _write_bytes_to_request(request, json_str) diff --git a/synapse/logging/tracing.py b/synapse/logging/tracing.py index d9e2cde8aa..6c5c231c08 100644 --- a/synapse/logging/tracing.py +++ b/synapse/logging/tracing.py @@ -204,7 +204,7 @@ if TYPE_CHECKING: # Helper class -# Always returns the value given for any accessed property +# Always returns the fixed value given for any accessed property class _DummyLookup(object): def __init__(self, value): self.value = value @@ -223,6 +223,7 @@ try: import opentelemetry.sdk.trace.export import opentelemetry.semconv.trace import opentelemetry.trace + import opentelemetry.propagate SpanKind = opentelemetry.trace.SpanKind SpanAttributes = opentelemetry.semconv.trace.SpanAttributes @@ -237,7 +238,7 @@ except ImportError: logger = logging.getLogger(__name__) - +# FIXME: Rename to `SynapseAttributes` so it matches OpenTelemetry `SpanAttributes` class SynapseTags: # The message ID of any to_device message processed TO_DEVICE_MESSAGE_ID = "to_device.message_id" @@ -374,8 +375,6 @@ def init_tracer(hs: "HomeServer") -> None: # Pull out of the config if it was given. Otherwise set it to something sensible. set_homeserver_whitelist(hs.config.tracing.opentelemetry_whitelist) - # TODO: opentelemetry_whitelist - resource = opentelemetry.sdk.resources.Resource( attributes={ opentelemetry.sdk.resources.SERVICE_NAME: f"{hs.config.server.server_name} {hs.get_instance_name()}" @@ -514,7 +513,7 @@ def start_active_span_from_edu( # OpenTelemetry setters for attributes, logs, etc @only_if_tracing -def active_span() -> Optional["opentelemetry.trace.span.Span"]: +def get_active_span() -> Optional["opentelemetry.trace.span.Span"]: """Get the currently active span, if any""" return opentelemetry.trace.get_current_span() @@ -522,26 +521,29 @@ def active_span() -> Optional["opentelemetry.trace.span.Span"]: @ensure_active_span("set a tag") def set_attribute(key: str, value: Union[str, bool, int, float]) -> None: """Sets a tag on the active span""" - span = active_span() - assert span is not None - span.set_attribute(key, value) + active_span = get_active_span() + assert active_span is not None + active_span.set_attribute(key, value) @ensure_active_span("set the status") def set_status(key: str, status: "opentelemetry.trace.StatusCode") -> None: """Sets a tag on the active span""" - span = active_span() + span = get_active_span() assert span is not None span.set_status(status) +DEFAULT_LOG_NAME = "log" + + @ensure_active_span("log") def log_kv(key_values: Dict[str, Any], timestamp: Optional[float] = None) -> None: """Log to the active span""" - span = active_span() - assert span is not None - event_name = key_values.get("event", "log") - span.add_event(event_name, attributes=key_values, timestamp=timestamp) + active_span = get_active_span() + assert active_span is not None + event_name = key_values.get("event", DEFAULT_LOG_NAME) + active_span.add_event(event_name, attributes=key_values, timestamp=timestamp) @only_if_tracing @@ -571,7 +573,7 @@ def is_context_forced_tracing( @ensure_active_span("inject the span into a header dict") -def inject_header_dict( +def inject_active_span_context_into_header_dict( headers: Dict[bytes, List[bytes]], destination: Optional[str] = None, check_destination: bool = True, @@ -582,10 +584,10 @@ def inject_header_dict( Args: headers: the dict to inject headers into destination: address of entity receiving the span context. Must be given unless - check_destination is False. The context will only be injected if the - destination matches the opentracing whitelist - check_destination (bool): If false, destination will be ignored and the context - will always be injected. + `check_destination` is False. + check_destination (bool): If False, destination will be ignored and the context + will always be injected. If True, the context will only be injected if the + destination matches the tracing allowlist Note: The headers set by the tracer are custom to the tracer implementation which @@ -594,19 +596,31 @@ def inject_header_dict( here: https://github.com/jaegertracing/jaeger-client-python/blob/master/jaeger_client/constants.py """ - # TODO - pass + if check_destination: + if destination is None: + raise ValueError( + "destination must be given unless check_destination is False" + ) + if not whitelisted_homeserver(destination): + return + + active_span = get_active_span() + active_span_context = active_span.get_span_context() + + propagator = opentelemetry.propagate.get_global_textmap() + # Put all of SpanContext properties into the headers dict + propagator.inject(headers, context=active_span_context) def inject_response_headers(response_headers: Headers) -> None: """Inject the current trace id into the HTTP response headers""" if not opentelemetry: return - current_span = opentelemetry.trace.get_current_span() - if not current_span: + active_span = get_active_span() + if not active_span: return - trace_id = current_span.get_span_context().trace_id + trace_id = active_span.get_span_context().trace_id if trace_id is not None: response_headers.addRawHeader("Synapse-Trace-Id", f"{trace_id:x}") @@ -626,9 +640,15 @@ def get_active_span_text_map(destination: Optional[str] = None) -> Dict[str, str Returns: dict: the active span's context if opentracing is enabled, otherwise empty. """ - # TODO - carrier: Dict[str, str] = {} - return carrier + active_span = get_active_span() + active_span_context = active_span.get_span_context() + + carrier_text_map: Dict[str, str] = {} + propagator = opentelemetry.propagate.get_global_textmap() + # Put all of SpanContext properties onto the carrier text map that we can return + propagator.inject(carrier_text_map, context=active_span_context) + + return carrier_text_map def span_context_from_request( @@ -639,8 +659,15 @@ def span_context_from_request( This is useful when we have received an HTTP request from another part of our system, and want to link our spans to those of the remote system. """ - # TODO - return None + if not opentelemetry: + return None + header_dict = { + k.decode(): v[0].decode() for k, v in request.requestHeaders.getAllRawHeaders() + } + + # Extract all of the relevant values from the headers to construct a + # SpanContext to return. + return extract_text_map(header_dict) @only_if_tracing @@ -655,8 +682,10 @@ def extract_text_map( Returns: The active span context extracted from carrier. """ - # TODO - return None + propagator = opentelemetry.propagate.get_global_textmap() + # Extract all of the relevant values from the `carrier` to construct a + # SpanContext to return. + return propagator.extract(carrier) # Tracing decorators diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py index 76035cd618..482e92d0af 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py @@ -248,7 +248,9 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): # Add an authorization header, if configured. if replication_secret: headers[b"Authorization"] = [b"Bearer " + replication_secret] - tracing.inject_header_dict(headers, check_destination=False) + tracing.inject_active_span_context_into_header_dict( + headers, check_destination=False + ) try: # Keep track of attempts made so we can bail if we don't manage to diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py index 88abfd78ba..18bc61fc46 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py @@ -223,7 +223,7 @@ class _EventPeristenceQueue(Generic[_PersistResult]): queue.append(end_item) # also add our active opentracing span to the item so that we get a link back - span = tracing.active_span() + span = tracing.get_active_span() if span: end_item.parent_opentracing_span_contexts.append(span.context) diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index cc5e1c312a..1d1a487c71 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -30,7 +30,7 @@ from twisted.internet import defer from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.logging.tracing import ( - active_span, + get_active_span, start_active_span, start_active_span_follows_from, ) @@ -240,7 +240,7 @@ class ResponseCache(Generic[KV]): # NB it is important that we do not `await` before setting span_context! nonlocal span_context with start_active_span(f"ResponseCache[{self._name}].calculate"): - span = active_span() + span = get_active_span() if span: span_context = span.context return await callback(*args, **kwargs)