From 6984cefa79602fbbd472cd39f463e341b09023d9 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 27 Jul 2022 00:55:43 -0500 Subject: [PATCH] Progress towards OTEL --- poetry.lock | 16 +- pyproject.toml | 1 - .../federation/sender/transaction_manager.py | 6 +- synapse/handlers/sync.py | 2 +- synapse/http/client.py | 21 +- synapse/http/matrixfederationclient.py | 25 +- synapse/http/server.py | 12 +- synapse/http/site.py | 8 +- synapse/logging/scopecontextmanager.py | 14 +- synapse/logging/tracing.py | 378 +++++------------- synapse/metrics/background_process_metrics.py | 3 +- synapse/push/httppusher.py | 6 +- synapse/replication/http/_base.py | 4 +- synapse/replication/tcp/external_cache.py | 10 +- synapse/storage/controllers/persist_events.py | 12 +- synapse/storage/database.py | 34 +- synapse/util/caches/response_cache.py | 8 +- 17 files changed, 198 insertions(+), 362 deletions(-) diff --git a/poetry.lock b/poetry.lock index 3310510b4c..e268fdb5c5 100644 --- a/poetry.lock +++ b/poetry.lock @@ -636,17 +636,6 @@ category = "main" optional = true python-versions = ">=3.6" -[[package]] -name = "opentracing" -version = "2.4.0" -description = "OpenTracing API for Python. See documentation at http://opentracing.io" -category = "main" -optional = true -python-versions = "*" - -[package.extras] -tests = ["doubles", "flake8", "flake8-quotes", "mock", "pytest", "pytest-cov", "pytest-mock", "sphinx", "sphinx-rtd-theme", "six (>=1.10.0,<2.0)", "gevent", "tornado"] - [[package]] name = "packaging" version = "21.3" @@ -1535,7 +1524,7 @@ url_preview = ["lxml"] [metadata] lock-version = "1.1" python-versions = "^3.7.1" -content-hash = "14602f17c83b68a9dde71aee3b37d9c902153fed752a9a62d1d84b5ca8a6cd14" +content-hash = "c41657e7ab748ab5cf7cc149e78cde43e35588d2d1d47a94f161085c0c5d4ba5" [metadata.files] attrs = [ @@ -2116,9 +2105,6 @@ opentelemetry-semantic-conventions = [ {file = "opentelemetry-semantic-conventions-0.30b1.tar.gz", hash = "sha256:2fac7c7202602566b87b2ee3c90fbc272be6094725479f8102f083bf425cc253"}, {file = "opentelemetry_semantic_conventions-0.30b1-py3-none-any.whl", hash = "sha256:5213268cd0a7a8fb94c054e4c1bac8c17586f732eca91769463320f3dcd910bb"}, ] -opentracing = [ - {file = "opentracing-2.4.0.tar.gz", hash = "sha256:a173117e6ef580d55874734d1fa7ecb6f3655160b8b8974a2a1e98e5ec9c840d"}, -] packaging = [ {file = "packaging-21.3-py3-none-any.whl", hash = "sha256:ef103e05f519cdc783ae24ea4e2e0f508a9c99b2d4969652eed6a2e1ea5bd522"}, {file = "packaging-21.3.tar.gz", hash = "sha256:dd47c42927d89ab911e606518907cc2d3a1f38bbd026385970643f9c5b8ecfeb"}, diff --git a/pyproject.toml b/pyproject.toml index 4f66a3a969..a6df4a26a6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -182,7 +182,6 @@ parameterized = { version = ">=0.7.4", optional = true } idna = { version = ">=2.5", optional = true } opentelemetry-api = {version = "^1.11.1", optional = true} opentelemetry-sdk = {version = "^1.11.1", optional = true} -opentracing = {version = "^2.4.0", optional = true} [tool.poetry.extras] # NB: Packages that should be part of `pip install matrix-synapse[all]` need to be specified diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py index 7117689108..e8c491758f 100644 --- a/synapse/federation/sender/transaction_manager.py +++ b/synapse/federation/sender/transaction_manager.py @@ -23,9 +23,9 @@ from synapse.federation.persistence import TransactionActions from synapse.federation.units import Edu, Transaction from synapse.logging.tracing import ( extract_text_map, - set_attribute, + set_status, start_active_span_follows_from, - tags, + StatusCode, whitelisted_homeserver, ) from synapse.types import JsonDict @@ -166,7 +166,7 @@ class TransactionManager: except HttpResponseException as e: code = e.code - set_attribute(tags.ERROR, True) + set_status(StatusCode.ERROR) logger.info("TX [%s] {%s} got %d response", destination, txn_id, code) raise diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index ad71feb774..3a40bc1694 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1568,7 +1568,7 @@ class SyncHandler: sync_result_builder, room_entry, ephemeral=ephemeral_by_room.get(room_entry.room_id, []), - tags=tags_by_room.get(room_entry.room_id), + attributes=tags_by_room.get(room_entry.room_id), account_data=account_data_by_room.get(room_entry.room_id, {}), always_include=sync_result_builder.full_state, ) diff --git a/synapse/http/client.py b/synapse/http/client.py index fb21d2f39b..b26a7520bf 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -75,7 +75,13 @@ from synapse.http import QuieterFileBodyProducer, RequestTimedOutError, redact_u from synapse.http.proxyagent import ProxyAgent from synapse.http.types import QueryParams from synapse.logging.context import make_deferred_yieldable -from synapse.logging.tracing import set_attribute, start_active_span, tags +from synapse.logging.tracing import ( + set_status, + start_active_span, + SpanKind, + SpanAttributes, + StatusCode, +) from synapse.types import ISynapseReactor from synapse.util import json_decoder from synapse.util.async_helpers import timeout_deferred @@ -402,12 +408,12 @@ class SimpleHttpClient: with start_active_span( "outgoing-client-request", - tags={ - tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT, - tags.HTTP_METHOD: method, - tags.HTTP_URL: uri, + kind=SpanKind.CLIENT, + attributes={ + SpanAttributes.HTTP_METHOD: method, + SpanAttributes.HTTP_URL: uri, }, - finish_on_close=True, + end_on_exit=True, ): try: body_producer = None @@ -459,8 +465,7 @@ class SimpleHttpClient: type(e).__name__, e.args[0], ) - set_attribute(tags.ERROR, True) - set_attribute("error_reason", e.args[0]) + set_status(StatusCode.ERROR, e.args[0]) raise async def post_urlencoded_get_json( diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 6502f4a632..72e812ff66 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -72,9 +72,14 @@ from synapse.http.client import ( ) from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent from synapse.http.types import QueryParams -from synapse.logging import opentelemetry +from synapse.logging import tracing from synapse.logging.context import make_deferred_yieldable, run_in_background -from synapse.logging.tracing import set_attribute, start_active_span, tags +from synapse.logging.tracing import ( + set_attribute, + start_active_span, + SpanKind, + SpanAttributes, +) from synapse.types import JsonDict from synapse.util import json_decoder from synapse.util.async_helpers import AwakenableSleeper, timeout_deferred @@ -517,18 +522,18 @@ class MatrixFederationHttpClient: scope = start_active_span( "outgoing-federation-request", - tags={ - tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT, - tags.PEER_ADDRESS: request.destination, - tags.HTTP_METHOD: request.method, - tags.HTTP_URL: request.path, + kind=SpanKind.CLIENT, + attributes={ + SpanAttributes.PEER_ADDRESS: request.destination, + SpanAttributes.HTTP_METHOD: request.method, + SpanAttributes.HTTP_URL: request.path, }, - finish_on_close=True, + end_on_exit=True, ) # Inject the span into the headers headers_dict: Dict[bytes, List[bytes]] = {} - opentelemetry.inject_header_dict(headers_dict, request.destination) + tracing.inject_header_dict(headers_dict, request.destination) headers_dict[b"User-Agent"] = [self.version_string_bytes] @@ -614,7 +619,7 @@ class MatrixFederationHttpClient: request.method, response.code ).inc() - set_attribute(tags.HTTP_STATUS_CODE, response.code) + set_attribute(SpanAttributes.HTTP_STATUS_CODE, response.code) response_phrase = response.phrase.decode("ascii", errors="replace") if 200 <= response.code < 300: diff --git a/synapse/http/server.py b/synapse/http/server.py index 7dc7a78ad0..1e89050650 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -66,7 +66,7 @@ from synapse.util.caches import intern_dict from synapse.util.iterutils import chunk_seq if TYPE_CHECKING: - import opentracing + import opentelemetry from synapse.server import HomeServer @@ -868,15 +868,15 @@ async def _async_write_json_to_request_in_thread( expensive. """ - def encode(opentracing_span: "Optional[opentracing.Span]") -> bytes: + def encode(tracing_span: Optional["opentelemetry.trace.span.Span"]) -> bytes: # it might take a while for the threadpool to schedule us, so we write # opentracing logs once we actually get scheduled, so that we can see how # much that contributed. - if opentracing_span: - opentracing_span.log_kv({"event": "scheduled"}) + if tracing_span: + tracing_span.add_event("scheduled", attributes={"event": "scheduled"}) res = json_encoder(json_object) - if opentracing_span: - opentracing_span.log_kv({"event": "encoded"}) + if tracing_span: + tracing_span.add_event("scheduled", attributes={"event": "encoded"}) return res with start_active_span("encode_json_response"): diff --git a/synapse/http/site.py b/synapse/http/site.py index c238b39d82..805b90eb65 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -37,7 +37,7 @@ from synapse.logging.context import ( from synapse.types import Requester if TYPE_CHECKING: - import opentracing + import opentelemetry logger = logging.getLogger(__name__) @@ -87,7 +87,7 @@ class SynapseRequest(Request): # An opentracing span for this request. Will be closed when the request is # completely processed. - self._opentracing_span: "Optional[opentracing.Span]" = None + self._opentracing_span: Optional["opentelemetry.trace.span.Span"] = None # we can't yet create the logcontext, as we don't know the method. self.logcontext: Optional[LoggingContext] = None @@ -164,9 +164,7 @@ class SynapseRequest(Request): # If there's no authenticated entity, it was the requester. self.logcontext.request.authenticated_entity = authenticated_entity or requester - def set_opentracing_span( - self, span: opentelemetry.shim.opentracing_shim.SpanShim - ) -> None: + def set_opentracing_span(self, span: "opentelemetry.trace.span.Span") -> None: """attach an opentracing span to this request Doing so will cause the span to be closed when we finish processing the request diff --git a/synapse/logging/scopecontextmanager.py b/synapse/logging/scopecontextmanager.py index 10877bdfc5..33c4204cf9 100644 --- a/synapse/logging/scopecontextmanager.py +++ b/synapse/logging/scopecontextmanager.py @@ -57,12 +57,12 @@ class LogContextScopeManager(ScopeManager): ctx = current_context() return ctx.scope - def activate(self, span: Span, finish_on_close: bool) -> Scope: + def activate(self, span: Span, end_on_exit: bool) -> Scope: """ Makes a Span active. Args span: the span that should become active. - finish_on_close: whether Span should be automatically finished when + end_on_exit: whether Span should be automatically finished when Scope.close() is called. Returns: @@ -93,7 +93,7 @@ class LogContextScopeManager(ScopeManager): # "Re-starting finished log context" errors). enter_logcontext = False - scope = _LogContextScope(self, span, ctx, enter_logcontext, finish_on_close) + scope = _LogContextScope(self, span, ctx, enter_logcontext, end_on_exit) ctx.scope = scope if enter_logcontext: ctx.__enter__() @@ -118,7 +118,7 @@ class _LogContextScope(Scope): span: Span, logcontext: LoggingContext, enter_logcontext: bool, - finish_on_close: bool, + end_on_exit: bool, ): """ Args: @@ -131,12 +131,12 @@ class _LogContextScope(Scope): the log context to which this scope is attached. enter_logcontext: if True the log context will be exited when the scope is finished - finish_on_close: + end_on_exit: if True finish the span when the scope is closed """ super().__init__(manager, span) self.logcontext = logcontext - self._finish_on_close = finish_on_close + self._end_on_exit = end_on_exit self._enter_logcontext = enter_logcontext def __exit__( @@ -162,7 +162,7 @@ class _LogContextScope(Scope): active_scope, ) - if self._finish_on_close: + if self._end_on_exit: self.span.finish() self.logcontext.scope = None diff --git a/synapse/logging/tracing.py b/synapse/logging/tracing.py index 78ae1119f9..e437b2b627 100644 --- a/synapse/logging/tracing.py +++ b/synapse/logging/tracing.py @@ -176,6 +176,7 @@ from typing import ( Dict, Generator, Iterable, + Iterator, List, Optional, Pattern, @@ -203,51 +204,30 @@ if TYPE_CHECKING: # Helper class +# Always returns the value given for any accessed property +class _DummyLookup(object): + def __init__(self, value): + self.value = value -class _DummyTagNames: - """wrapper of opentracings tags. We need to have them if we - want to reference them without opentracing around. Clearly they - should never actually show up in a trace. `set_attributes` overwrites - these with the correct ones.""" - - INVALID_TAG = "invalid-tag" - COMPONENT = INVALID_TAG - DATABASE_INSTANCE = INVALID_TAG - DATABASE_STATEMENT = INVALID_TAG - DATABASE_TYPE = INVALID_TAG - DATABASE_USER = INVALID_TAG - ERROR = INVALID_TAG - HTTP_METHOD = INVALID_TAG - HTTP_STATUS_CODE = INVALID_TAG - HTTP_URL = INVALID_TAG - MESSAGE_BUS_DESTINATION = INVALID_TAG - PEER_ADDRESS = INVALID_TAG - PEER_HOSTNAME = INVALID_TAG - PEER_HOST_IPV4 = INVALID_TAG - PEER_HOST_IPV6 = INVALID_TAG - PEER_PORT = INVALID_TAG - PEER_SERVICE = INVALID_TAG - SAMPLING_PRIORITY = INVALID_TAG - SERVICE = INVALID_TAG - SPAN_KIND = INVALID_TAG - SPAN_KIND_CONSUMER = INVALID_TAG - SPAN_KIND_PRODUCER = INVALID_TAG - SPAN_KIND_RPC_CLIENT = INVALID_TAG - SPAN_KIND_RPC_SERVER = INVALID_TAG + def __getattribute__(self, name): + return self.value # These dependencies are optional so they can fail to import # and we try: - import opentelemetry - import opentracing + import opentelemetry.trace + import opentelemetry.semconv.trace + + SpanKind = opentelemetry.trace.SpanKind + SpanAttributes = opentelemetry.semconv.trace.SpanAttributes + StatusCode = opentelemetry.trace.StatusCode - # TODO: tags? except ImportError: opentelemetry = None # type: ignore[assignment] - opentracing = None # type: ignore[assignment] - tags = _DummyTagNames # type: ignore[assignment] - + SpanKind = _DummyLookup(0) + SpanAttributes = _DummyLookup("fake-attribute") + StatusCode = _DummyLookup(0) logger = logging.getLogger(__name__) @@ -431,49 +411,33 @@ def whitelisted_homeserver(destination: str) -> bool: # Start spans and scopes -# Could use kwargs but I want these to be explicit -def start_active_span( - operation_name: str, - child_of: Optional[ - Union[ - opentelemetry.shim.opentracing_shim.SpanShim, - opentelemetry.shim.opentracing_shim.SpanContextShim, - ] - ] = None, - references: Optional[List["opentracing.Reference"]] = None, - tags: Optional[Dict[str, str]] = None, - start_time: Optional[float] = None, - ignore_active_span: bool = False, - finish_on_close: bool = True, - *, - tracer: Optional[opentelemetry.shim.opentracing_shim.TracerShim] = None, -) -> opentelemetry.shim.opentracing_shim.ScopeShim: - """Starts an active opentracing span. - Records the start time for the span, and sets it as the "active span" in the - scope manager. - Args: - See opentracing.tracer - Returns: - scope (Scope) or contextlib.nullcontext - """ +def start_active_span( + name: str, + *, + context: Optional["opentelemetry.context.context.Context"] = None, + kind: Optional["opentelemetry.trace.SpanKind"] = None, + attributes: "opentelemetry.util.types.Attributes" = None, + links: Optional[Sequence["opentelemetry.trace.Link"]] = None, + start_time: Optional[int] = None, + record_exception: bool = True, + set_status_on_exception: bool = True, + end_on_exit: bool = True, +) -> Iterator["opentelemetry.trace.span.Span"]: if opentelemetry is None: return contextlib.nullcontext() # type: ignore[unreachable] - if tracer is None: - # use the global tracer by default - otel_tracer = opentelemetry.trace.get_tracer(__name__) - tracerShim = opentelemetry.shim.opentracing_shim.create_tracer(otel_tracer) - tracer = tracerShim - - return tracer.start_active_span( - operation_name, - child_of=child_of, - references=references, - tags=tags, + tracer = opentelemetry.trace.get_tracer(__name__) + return tracer.start_as_current_span( + name=name, + context=context, + kind=kind, + attributes=attributes, + links=links, start_time=start_time, - ignore_active_span=ignore_active_span, - finish_on_close=finish_on_close, + record_exception=record_exception, + set_status_on_exception=set_status_on_exception, + end_on_exit=end_on_exit, ) @@ -482,15 +446,15 @@ def start_active_span_follows_from( contexts: Collection, child_of: Optional[ Union[ - opentelemetry.shim.opentracing_shim.SpanShim, - opentelemetry.shim.opentracing_shim.SpanContextShim, + "opentelemetry.shim.opentracing_shim.SpanShim", + "opentelemetry.shim.opentracing_shim.SpanContextShim", ] ] = None, start_time: Optional[float] = None, *, inherit_force_tracing: bool = False, - tracer: Optional[opentelemetry.shim.opentracing_shim.TracerShim] = None, -) -> opentelemetry.shim.opentracing_shim.ScopeShim: + tracer: Optional["opentelemetry.shim.opentracing_shim.TracerShim"] = None, +) -> Iterator["opentelemetry.trace.span.Span"]: """Starts an active opentracing span, with additional references to previous spans Args: operation_name: name of the operation represented by the new span @@ -504,35 +468,14 @@ def start_active_span_follows_from( forced, the new span will also have tracing forced. tracer: override the opentracing tracer. By default the global tracer is used. """ - if opentelemetry is None: - return contextlib.nullcontext() # type: ignore[unreachable] - - references = [opentracing.follows_from(context) for context in contexts] - scope = start_active_span( - operation_name, - child_of=child_of, - references=references, - start_time=start_time, - tracer=tracer, - ) - - if inherit_force_tracing and any( - is_context_forced_tracing(ctx) for ctx in contexts - ): - force_tracing(scope.span) - - return scope + # TODO + pass def start_active_span_from_edu( edu_content: Dict[str, Any], operation_name: str, - references: Optional[List["opentracing.Reference"]] = None, - tags: Optional[Dict[str, str]] = None, - start_time: Optional[float] = None, - ignore_active_span: bool = False, - finish_on_close: bool = True, -) -> opentelemetry.shim.opentracing_shim.ScopeShim: +) -> Iterator["opentelemetry.trace.span.Span"]: """ Extracts a span context from an edu and uses it to start a new active span @@ -542,50 +485,13 @@ def start_active_span_from_edu( For the other args see opentracing.tracer """ - references = references or [] - - if opentelemetry is None: - return contextlib.nullcontext() # type: ignore[unreachable] - - carrier = json_decoder.decode(edu_content.get("context", "{}")).get( - "opentracing", {} - ) - - otel_tracer = opentelemetry.trace.get_tracer(__name__) - tracerShim = opentelemetry.shim.opentracing_shim.create_tracer(otel_tracer) - - context = tracerShim.extract(opentracing.Format.TEXT_MAP, carrier) - _references = [ - opentracing.Reference( - type=opentracing.ReferenceType.CHILD_OF, - referenced_context=span_context_from_string(x), - ) - for x in carrier.get("references", []) - ] - - # For some reason jaeger decided not to support the visualization of multiple parent - # spans or explicitly show references. I include the span context as a tag here as - # an aid to people debugging but it's really not an ideal solution. - - references += _references - - scope = tracerShim.start_active_span( - operation_name, - child_of=context, - references=references, - tags=tags, - start_time=start_time, - ignore_active_span=ignore_active_span, - finish_on_close=finish_on_close, - ) - - scope.span.set_attribute("references", carrier.get("references", [])) - return scope + # TODO + pass # OpenTelemetry setters for attributes, logs, etc @only_if_tracing -def active_span() -> Optional[opentelemetry.trace.span.Span]: +def active_span() -> Optional["opentelemetry.trace.span.Span"]: """Get the currently active span, if any""" return opentelemetry.trace.get_current_span() @@ -593,24 +499,32 @@ def active_span() -> Optional[opentelemetry.trace.span.Span]: @ensure_active_span("set a tag") def set_attribute(key: str, value: Union[str, bool, int, float]) -> None: """Sets a tag on the active span""" - active_span = active_span() - assert active_span is not None - active_span.set_attribute(key, value) + span = active_span() + assert span is not None + span.set_attribute(key, value) + + +@ensure_active_span("set the status") +def set_status(key: str, status: "opentelemetry.trace.StatusCode") -> None: + """Sets a tag on the active span""" + span = active_span() + assert span is not None + span.set_status(status) @ensure_active_span("log") def log_kv(key_values: Dict[str, Any], timestamp: Optional[float] = None) -> None: """Log to the active span""" - active_span = active_span() - assert active_span is not None - event_name = opentelemetry.ext.opentracing_shim.util.event_name_from_kv(key_values) - active_span.add_event(event_name, timestamp, key_values) + span = active_span() + assert span is not None + event_name = key_values.get("event", "log") + span.add_event(event_name, attributes=key_values, timestamp=timestamp) @only_if_tracing def force_tracing( span: Union[ - opentelemetry.shim.opentracing_shim.SpanShim, _Sentinel + "opentelemetry.shim.opentracing_shim.SpanShim", _Sentinel ] = _Sentinel.sentinel ) -> None: """Force sampling for the active/given span and its children. @@ -618,28 +532,16 @@ def force_tracing( Args: span: span to force tracing for. By default, the active span. """ - if isinstance(span, _Sentinel): - span_to_trace = opentelemetry.trace.get_current_span() - else: - span_to_trace = span - if span_to_trace is None: - logger.error("No active span in force_tracing") - return - - span_to_trace.set_attribute(opentracing.tags.SAMPLING_PRIORITY, 1) - - # also set a bit of baggage, so that we have a way of figuring out if - # it is enabled later - span_to_trace.set_baggage_item(SynapseBaggage.FORCE_TRACING, "1") + # TODO + pass def is_context_forced_tracing( - span_context: Optional[opentelemetry.shim.opentracing_shim.SpanContextShim], + span_context: Optional["opentelemetry.shim.opentracing_shim.SpanContextShim"], ) -> bool: """Check if sampling has been force for the given span context.""" - if span_context is None: - return False - return span_context.baggage.get(SynapseBaggage.FORCE_TRACING) is not None + # TODO + return False # Injection and extraction @@ -669,42 +571,19 @@ def inject_header_dict( here: https://github.com/jaegertracing/jaeger-client-python/blob/master/jaeger_client/constants.py """ - if check_destination: - if destination is None: - raise ValueError( - "destination must be given unless check_destination is False" - ) - if not whitelisted_homeserver(destination): - return - - otel_tracer = opentelemetry.trace.get_tracer(__name__) - tracerShim = opentelemetry.shim.opentracing_shim.create_tracer(otel_tracer) - - span = tracerShim.active_span - - carrier: Dict[str, str] = {} - assert span is not None - - tracerShim.inject(span.context, opentracing.Format.HTTP_HEADERS, carrier) - - for key, value in carrier.items(): - headers[key.encode()] = [value.encode()] + # TODO + pass def inject_response_headers(response_headers: Headers) -> None: """Inject the current trace id into the HTTP response headers""" if not opentelemetry: return - span = opentelemetry.trace.get_current_span() - if not span: + current_span = opentelemetry.trace.get_current_span() + if not current_span: return - # This is a bit implementation-specific. - # - # Jaeger's Spans have a trace_id property; other implementations (including the - # dummy opentracing.span.Span which we use if init_tracer is not called) do not - # expose it - trace_id = getattr(span, "trace_id", None) + trace_id = current_span.get_span_context().trace_id if trace_id is not None: response_headers.addRawHeader("Synapse-Trace-Id", f"{trace_id:x}") @@ -724,66 +603,27 @@ def get_active_span_text_map(destination: Optional[str] = None) -> Dict[str, str Returns: dict: the active span's context if opentracing is enabled, otherwise empty. """ - - if destination and not whitelisted_homeserver(destination): - return {} - + # TODO carrier: Dict[str, str] = {} - otel_tracer = opentelemetry.trace.get_tracer(__name__) - tracerShim = opentelemetry.shim.opentracing_shim.create_tracer(otel_tracer) - assert tracerShim.active_span is not None - tracerShim.inject(tracerShim.context, opentracing.Format.TEXT_MAP, carrier) - return carrier -@ensure_active_span("get the span context as a string.", ret={}) -def active_span_context_as_string() -> str: - """ - Returns: - The active span context encoded as a string. - """ - carrier: Dict[str, str] = {} - if opentelemetry: - otel_tracer = opentelemetry.trace.get_tracer(__name__) - tracerShim = opentelemetry.shim.opentracing_shim.create_tracer(otel_tracer) - assert tracerShim.active_span is not None - tracerShim.inject( - tracerShim.active_span.context, opentracing.Format.TEXT_MAP, carrier - ) - return json_encoder.encode(carrier) - - -def span_context_from_request(request: Request) -> "Optional[opentracing.SpanContext]": +def span_context_from_request( + request: Request, +) -> Optional["opentelemetry.trace.span.SpanContext"]: """Extract an opentracing context from the headers on an HTTP request This is useful when we have received an HTTP request from another part of our system, and want to link our spans to those of the remote system. """ - if not opentracing: - return None - header_dict = { - k.decode(): v[0].decode() for k, v in request.requestHeaders.getAllRawHeaders() - } - return opentracing.tracer.extract(opentracing.Format.HTTP_HEADERS, header_dict) - - -@only_if_tracing -def span_context_from_string( - carrier: str, -) -> Optional[opentelemetry.shim.opentracing_shim.SpanContextShim]: - """ - Returns: - The active span context decoded from a string. - """ - payload: Dict[str, str] = json_decoder.decode(carrier) - return opentracing.tracer.extract(opentracing.Format.TEXT_MAP, payload) + # TODO + return None @only_if_tracing def extract_text_map( carrier: Dict[str, str] -) -> Optional[opentelemetry.shim.opentracing_shim.SpanContextShim]: +) -> Optional["opentelemetry.shim.opentracing_shim.SpanContextShim"]: """ Wrapper method for opentracing's tracer.extract for TEXT_MAP. Args: @@ -792,7 +632,8 @@ def extract_text_map( Returns: The active span context extracted from carrier. """ - return opentracing.tracer.extract(opentracing.Format.TEXT_MAP, carrier) + # TODO + return None # Tracing decorators @@ -807,7 +648,7 @@ def trace_with_opname(opname: str) -> Callable[[Callable[P, R]], Callable[P, R]] """ def decorator(func: Callable[P, R]) -> Callable[P, R]: - if opentracing is None: + if opentelemetry is None: return func # type: ignore[unreachable] if inspect.iscoroutinefunction(func): @@ -878,7 +719,7 @@ def tag_args(func: Callable[P, R]) -> Callable[P, R]: Tags all of the args to the active span. """ - if not opentracing: + if not opentelemetry: return func @wraps(func) @@ -907,16 +748,15 @@ def trace_servlet( context from the request the servlet is handling. """ - if opentracing is None: + if opentelemetry is None: yield # type: ignore[unreachable] return - request_tags = { + request_attrs = { SynapseTags.REQUEST_ID: request.get_request_id(), - tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER, - tags.HTTP_METHOD: request.get_method(), - tags.HTTP_URL: request.get_redacted_uri(), - tags.PEER_HOST_IPV6: request.getClientAddress().host, + SpanAttributes.HTTP_METHOD: request.get_method(), + SpanAttributes.HTTP_URL: request.get_redacted_uri(), + SpanAttributes.PEER_HOST_IPV6: request.getClientAddress().host, } request_name = request.request_metrics.name @@ -925,23 +765,27 @@ def trace_servlet( # we configure the scope not to finish the span immediately on exit, and instead # pass the span into the SynapseRequest, which will finish it once we've finished # sending the response to the client. - scope = start_active_span(request_name, child_of=context, finish_on_close=False) - request.set_opentracing_span(scope.span) + span = start_active_span( + request_name, + kind=opentelemetry.trace.SpanKind.SERVER, + child_of=context, + end_on_exit=False, + ) + request.set_opentracing_span(span) - with scope: - inject_response_headers(request.responseHeaders) - try: - yield - finally: - # We set the operation name again in case its changed (which happens - # with JsonResource). - scope.span.update_name(request.request_metrics.name) + inject_response_headers(request.responseHeaders) + try: + yield + finally: + # We set the operation name again in case its changed (which happens + # with JsonResource). + span.update_name(request.request_metrics.name) - # set the tags *after* the servlet completes, in case it decided to - # prioritise the span (tags will get dropped on unprioritised spans) - request_tags[ - SynapseTags.REQUEST_TAG - ] = request.request_metrics.start_context.tag + # set the tags *after* the servlet completes, in case it decided to + # prioritise the span (tags will get dropped on unprioritised spans) + request_attrs[ + SynapseTags.REQUEST_TAG + ] = request.request_metrics.start_context.tag - for k, v in request_tags.items(): - scope.span.set_attribute(k, v) + for k, v in request_attrs.items(): + span.set_attribute(k, v) diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index 65136dd7f6..fbf4fa282f 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -232,7 +232,8 @@ def run_as_background_process( try: if bg_start_span: ctx = start_active_span( - f"bgproc.{desc}", tags={SynapseTags.REQUEST_ID: str(context)} + f"bgproc.{desc}", + attributes={SynapseTags.REQUEST_ID: str(context)}, ) else: ctx = nullcontext() # type: ignore[assignment] diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 895a13b84e..11299367d2 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -23,7 +23,7 @@ from twisted.internet.interfaces import IDelayedCall from synapse.api.constants import EventTypes from synapse.events import EventBase -from synapse.logging import opentelemetry +from synapse.logging import tracing from synapse.metrics.background_process_metrics import run_as_background_process from synapse.push import Pusher, PusherConfig, PusherConfigException from synapse.storage.databases.main.event_push_actions import HttpPushAction @@ -198,9 +198,9 @@ class HttpPusher(Pusher): ) for push_action in unprocessed: - with opentelemetry.start_active_span( + with tracing.start_active_span( "http-push", - tags={ + attributes={ "authenticated_entity": self.user_id, "event_id": push_action.event_id, "app_id": self.app_id, diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py index e53c986148..76035cd618 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py @@ -28,7 +28,7 @@ from synapse.api.errors import HttpResponseException, SynapseError from synapse.http import RequestTimedOutError from synapse.http.server import HttpServer, is_method_cancellable from synapse.http.site import SynapseRequest -from synapse.logging import opentelemetry +from synapse.logging import tracing from synapse.logging.tracing import trace_with_opname from synapse.types import JsonDict from synapse.util.caches.response_cache import ResponseCache @@ -248,7 +248,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): # Add an authorization header, if configured. if replication_secret: headers[b"Authorization"] = [b"Bearer " + replication_secret] - opentelemetry.inject_header_dict(headers, check_destination=False) + tracing.inject_header_dict(headers, check_destination=False) try: # Keep track of attempts made so we can bail if we don't manage to diff --git a/synapse/replication/tcp/external_cache.py b/synapse/replication/tcp/external_cache.py index c69ba0f73e..e928fded36 100644 --- a/synapse/replication/tcp/external_cache.py +++ b/synapse/replication/tcp/external_cache.py @@ -17,7 +17,7 @@ from typing import TYPE_CHECKING, Any, Optional from prometheus_client import Counter, Histogram -from synapse.logging import opentelemetry +from synapse.logging import tracing from synapse.logging.context import make_deferred_yieldable from synapse.util import json_decoder, json_encoder @@ -94,9 +94,9 @@ class ExternalCache: logger.debug("Caching %s %s: %r", cache_name, key, encoded_value) - with opentelemetry.start_active_span( + with tracing.start_active_span( "ExternalCache.set", - tags={opentelemetry.SynapseTags.CACHE_NAME: cache_name}, + attributes={tracing.SynapseTags.CACHE_NAME: cache_name}, ): with response_timer.labels("set").time(): return await make_deferred_yieldable( @@ -113,9 +113,9 @@ class ExternalCache: if self._redis_connection is None: return None - with opentelemetry.start_active_span( + with tracing.start_active_span( "ExternalCache.get", - tags={opentelemetry.SynapseTags.CACHE_NAME: cache_name}, + attributes={tracing.SynapseTags.CACHE_NAME: cache_name}, ): with response_timer.labels("get").time(): result = await make_deferred_yieldable( diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py index 3ff544cc2e..88abfd78ba 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py @@ -45,7 +45,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership from synapse.events import EventBase from synapse.events.snapshot import EventContext -from synapse.logging import opentelemetry +from synapse.logging import tracing from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.controllers.state import StateStorageController @@ -223,7 +223,7 @@ class _EventPeristenceQueue(Generic[_PersistResult]): queue.append(end_item) # also add our active opentracing span to the item so that we get a link back - span = opentelemetry.active_span() + span = tracing.active_span() if span: end_item.parent_opentracing_span_contexts.append(span.context) @@ -234,7 +234,7 @@ class _EventPeristenceQueue(Generic[_PersistResult]): res = await make_deferred_yieldable(end_item.deferred.observe()) # add another opentracing span which links to the persist trace. - with opentelemetry.start_active_span_follows_from( + with tracing.start_active_span_follows_from( f"{task.name}_complete", (end_item.opentracing_span_context,) ): pass @@ -266,7 +266,7 @@ class _EventPeristenceQueue(Generic[_PersistResult]): queue = self._get_drainining_queue(room_id) for item in queue: try: - with opentelemetry.start_active_span_follows_from( + with tracing.start_active_span_follows_from( item.task.name, item.parent_opentracing_span_contexts, inherit_force_tracing=True, @@ -355,7 +355,7 @@ class EventsPersistenceStorageController: f"Found an unexpected task type in event persistence queue: {task}" ) - @opentelemetry.trace + @tracing.trace async def persist_events( self, events_and_contexts: Iterable[Tuple[EventBase, EventContext]], @@ -418,7 +418,7 @@ class EventsPersistenceStorageController: self.main_store.get_room_max_token(), ) - @opentelemetry.trace + @tracing.trace async def persist_event( self, event: EventBase, context: EventContext, backfilled: bool = False ) -> Tuple[EventBase, PersistedEventPosition, RoomStreamToken]: diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 3445a93225..222fd382f1 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -47,7 +47,7 @@ from twisted.internet.interfaces import IReactorCore from synapse.api.errors import StoreError from synapse.config.database import DatabaseConnectionConfig -from synapse.logging import opentelemetry +from synapse.logging import tracing from synapse.logging.context import ( LoggingContext, current_context, @@ -422,11 +422,11 @@ class LoggingTransaction: start = time.time() try: - with opentelemetry.start_active_span( + with tracing.start_active_span( "db.query", - tags={ - opentelemetry.tags.DATABASE_TYPE: "sql", - opentelemetry.tags.DATABASE_STATEMENT: one_line_sql, + attributes={ + tracing.SpanAttributes.DB_SYSTEM: "sql", + tracing.SpanAttributes.DB_STATEMENT: one_line_sql, }, ): return func(sql, *args, **kwargs) @@ -701,15 +701,15 @@ class DatabasePool: exception_callbacks=exception_callbacks, ) try: - with opentelemetry.start_active_span( + with tracing.start_active_span( "db.txn", - tags={ - opentelemetry.SynapseTags.DB_TXN_DESC: desc, - opentelemetry.SynapseTags.DB_TXN_ID: name, + attributes={ + tracing.SynapseTags.DB_TXN_DESC: desc, + tracing.SynapseTags.DB_TXN_ID: name, }, ): r = func(cursor, *args, **kwargs) - opentelemetry.log_kv({"message": "commit"}) + tracing.log_kv({"message": "commit"}) conn.commit() return r except self.engine.module.OperationalError as e: @@ -725,7 +725,7 @@ class DatabasePool: if i < N: i += 1 try: - with opentelemetry.start_active_span("db.rollback"): + with tracing.start_active_span("db.rollback"): conn.rollback() except self.engine.module.Error as e1: transaction_logger.warning("[TXN EROLL] {%s} %s", name, e1) @@ -739,7 +739,7 @@ class DatabasePool: if i < N: i += 1 try: - with opentelemetry.start_active_span("db.rollback"): + with tracing.start_active_span("db.rollback"): conn.rollback() except self.engine.module.Error as e1: transaction_logger.warning( @@ -845,7 +845,7 @@ class DatabasePool: logger.warning("Starting db txn '%s' from sentinel context", desc) try: - with opentelemetry.start_active_span(f"db.{desc}"): + with tracing.start_active_span(f"db.{desc}"): result = await self.runWithConnection( self.new_transaction, desc, @@ -928,7 +928,7 @@ class DatabasePool: with LoggingContext( str(curr_context), parent_context=parent_context ) as context: - with opentelemetry.start_active_span( + with tracing.start_active_span( operation_name="db.connection", ): sched_duration_sec = monotonic_time() - start_time @@ -944,15 +944,13 @@ class DatabasePool: "Reconnecting database connection over transaction limit" ) conn.reconnect() - opentelemetry.log_kv( - {"message": "reconnected due to txn limit"} - ) + tracing.log_kv({"message": "reconnected due to txn limit"}) self._txn_counters[tid] = 1 if self.engine.is_connection_closed(conn): logger.debug("Reconnecting closed database connection") conn.reconnect() - opentelemetry.log_kv({"message": "reconnected"}) + tracing.log_kv({"message": "reconnected"}) if self._txn_limit > 0: self._txn_counters[tid] = 1 diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index 7edac03680..cc5e1c312a 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -41,7 +41,7 @@ from synapse.util.caches import register_cache logger = logging.getLogger(__name__) if TYPE_CHECKING: - import opentracing + import opentelemetry # the type of the key in the cache KV = TypeVar("KV") @@ -82,7 +82,7 @@ class ResponseCacheEntry: easier to cache Failure results. """ - opentracing_span_context: "Optional[opentracing.SpanContext]" + opentracing_span_context: Optional["opentelemetry.trace.span.SpanContext"] """The opentracing span which generated/is generating the result""" @@ -141,7 +141,7 @@ class ResponseCache(Generic[KV]): self, context: ResponseCacheContext[KV], deferred: "defer.Deferred[RV]", - opentracing_span_context: "Optional[opentracing.SpanContext]", + opentracing_span_context: Optional["opentelemetry.trace.span.SpanContext"], ) -> ResponseCacheEntry: """Set the entry for the given key to the given deferred. @@ -234,7 +234,7 @@ class ResponseCache(Generic[KV]): if cache_context: kwargs["cache_context"] = context - span_context: Optional[opentracing.SpanContext] = None + span_context: Optional["opentelemetry.trace.span.SpanContext"] = None async def cb() -> RV: # NB it is important that we do not `await` before setting span_context!