diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py index d649309e11..38559a2baf 100644 --- a/synapse/federation/sender/transaction_manager.py +++ b/synapse/federation/sender/transaction_manager.py @@ -97,7 +97,7 @@ class TransactionManager: with start_active_span( "send_transaction", - links=[Link(context) for context in span_contexts], + links=[Link(span_context) for span_context in span_contexts], ): logger.debug("TX [%s] _attempt_new_transaction", destination) diff --git a/synapse/federation/transport/server/_base.py b/synapse/federation/transport/server/_base.py index 1dc71d7807..80abece72c 100644 --- a/synapse/federation/transport/server/_base.py +++ b/synapse/federation/transport/server/_base.py @@ -26,11 +26,13 @@ from synapse.http.site import SynapseRequest from synapse.logging.context import run_in_background from synapse.logging.tracing import ( Link, + context_from_request, create_non_recording_span, get_active_span, set_attribute, - span_context_from_request, start_active_span, + start_span, + use_span, whitelisted_homeserver, ) from synapse.types import JsonDict @@ -313,45 +315,49 @@ class BaseFederationServlet: # if the origin is authenticated and whitelisted, use its span context # as the parent. - origin_span_context = None + origin_context = None if origin and whitelisted_homeserver(origin): - origin_span_context = span_context_from_request(request) + origin_context = context_from_request(request) - if origin_span_context: + if origin_context: local_servlet_span = get_active_span() - # Create a span which uses the `origin_span_context` as a parent + # Create a span which uses the `origin_context` as a parent # so we can see how the incoming payload was processed while # we're looking at the outgoing trace. Since the parent is set # to a remote span (from the origin), it won't show up in the # local trace which is why we create another span below for the # local trace. A span can only have one parent so we have to # create two separate ones. - remote_parent_span = start_active_span( + remote_parent_span = start_span( "incoming-federation-request", - context=origin_span_context, + context=origin_context, # Cross-link back to the local trace so we can jump # to the incoming side from the remote origin trace. - links=[Link(local_servlet_span.get_span_context())], + links=[Link(local_servlet_span.get_span_context())] + if local_servlet_span + else None, ) # Create a local span to appear in the local trace - local_parent_span = start_active_span( + local_parent_span_cm = start_active_span( "process-federation-request", - # Cross-link back to the remote outgoing trace so we jump over - # there. + # Cross-link back to the remote outgoing trace so we can + # jump over there. links=[Link(remote_parent_span.get_span_context())], ) - else: - # Otherwise just use our local context as a parent - local_parent_span = start_active_span( + # Otherwise just use our local active servlet context as a parent + local_parent_span_cm = start_active_span( "process-federation-request", ) - # Don't need to record anything for the remote + # Don't need to record anything for the remote because no remote + # trace context given. remote_parent_span = create_non_recording_span() - with remote_parent_span, local_parent_span: + remote_parent_span_cm = use_span(remote_parent_span, end_on_exit=True) + + with remote_parent_span_cm, local_parent_span_cm: if origin and self.RATELIMIT: with ratelimiter.ratelimit(origin) as d: await d diff --git a/synapse/federation/units.py b/synapse/federation/units.py index b9b12fbea5..ffcf425324 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -55,10 +55,10 @@ class Edu: } def get_context(self) -> str: - return getattr(self, "content", {}).get("org.matrix.opentracing_context", "{}") + return getattr(self, "content", {}).get("org.matrix.tracing_context", "{}") def strip_context(self) -> None: - getattr(self, "content", {})["org.matrix.opentracing_context"] = "{}" + getattr(self, "content", {})["org.matrix.tracing_context"] = "{}" def _none_to_list(edus: Optional[List[JsonDict]]) -> List[JsonDict]: diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index a84d417a8a..659ee0ef5e 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -688,7 +688,7 @@ class DeviceHandler(DeviceWorkerHandler): else: return - for user_id, device_id, room_id, stream_id, opentracing_context in rows: + for user_id, device_id, room_id, stream_id, tracing_context in rows: hosts = set() # Ignore any users that aren't ours @@ -707,7 +707,7 @@ class DeviceHandler(DeviceWorkerHandler): room_id=room_id, stream_id=stream_id, hosts=hosts, - context=opentracing_context, + context=tracing_context, ) # Notify replication that we've updated the device list stream. diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index c0924adaf7..62ffdce58a 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -273,7 +273,7 @@ class DeviceMessageHandler: "sender": sender_user_id, "type": message_type, "message_id": message_id, - "org.matrix.opentracing_context": json_encoder.encode(context), + "org.matrix.tracing_context": json_encoder.encode(context), } # Add messages to the database. diff --git a/synapse/logging/tracing.py b/synapse/logging/tracing.py index ca9d3ba3cd..083453554f 100644 --- a/synapse/logging/tracing.py +++ b/synapse/logging/tracing.py @@ -202,31 +202,33 @@ if TYPE_CHECKING: # Helper class +T = TypeVar("T") + class _DummyLookup(object): """This will always returns the fixed value given for any accessed property""" - def __init__(self, value): + def __init__(self, value: T) -> None: self.value = value - def __getattribute__(self, name): + def __getattribute__(self, name: str) -> T: return object.__getattribute__(self, "value") class DummyLink(ABC): """Dummy placeholder for `opentelemetry.trace.Link`""" - def __init__(self): + def __init__(self) -> None: self.not_implemented_message = ( - "opentelemetry wasn't imported so this is just a dummy link placeholder" + "opentelemetry isn't installed so this is just a dummy link placeholder" ) @property - def context(self): + def context(self) -> None: raise NotImplementedError(self.not_implemented_message) @property - def attributes(self): + def attributes(self) -> None: raise NotImplementedError(self.not_implemented_message) @@ -242,17 +244,18 @@ try: import opentelemetry.semconv.trace import opentelemetry.trace import opentelemetry.trace.propagation + import opentelemetry.trace.status SpanKind = opentelemetry.trace.SpanKind SpanAttributes = opentelemetry.semconv.trace.SpanAttributes - StatusCode = opentelemetry.trace.StatusCode + StatusCode = opentelemetry.trace.status.StatusCode Link = opentelemetry.trace.Link except ImportError: opentelemetry = None # type: ignore[assignment] - SpanKind = _DummyLookup(0) - SpanAttributes = _DummyLookup("fake-attribute") - StatusCode = _DummyLookup(0) - Link = DummyLink + SpanKind = _DummyLookup(0) # type: ignore + SpanAttributes = _DummyLookup("fake-attribute") # type: ignore + StatusCode = _DummyLookup(0) # type: ignore + Link = DummyLink # type: ignore logger = logging.getLogger(__name__) @@ -303,7 +306,6 @@ class _Sentinel(enum.Enum): P = ParamSpec("P") R = TypeVar("R") -T = TypeVar("T") def only_if_tracing(func: Callable[P, R]) -> Callable[P, Optional[R]]: @@ -455,15 +457,61 @@ def whitelisted_homeserver(destination: str) -> bool: # Start spans and scopes -def create_non_recording_span(): +def use_span( + span: "opentelemetry.trace.span.Span", + end_on_exit: bool = True, +) -> ContextManager["opentelemetry.trace.span.Span"]: if opentelemetry is None: return contextlib.nullcontext() # type: ignore[unreachable] + return opentelemetry.trace.use_span(span=span, end_on_exit=end_on_exit) + + +def create_non_recording_span() -> "opentelemetry.trace.span.Span": + """Create a no-op span that does not record or become part of a recorded trace""" + return opentelemetry.trace.NonRecordingSpan( opentelemetry.trace.INVALID_SPAN_CONTEXT ) +def start_span( + name: str, + *, + context: Optional["opentelemetry.context.context.Context"] = None, + kind: Optional["opentelemetry.trace.SpanKind"] = SpanKind.INTERNAL, + attributes: "opentelemetry.util.types.Attributes" = None, + links: Optional[Sequence["opentelemetry.trace.Link"]] = None, + start_time: Optional[int] = None, + record_exception: bool = True, + set_status_on_exception: bool = True, + end_on_exit: bool = True, + # For testing only + tracer: Optional["opentelemetry.trace.Tracer"] = None, +) -> "opentelemetry.trace.span.Span": + if opentelemetry is None: + raise Exception("Not able to create span without opentelemetry installed.") + + if tracer is None: + tracer = opentelemetry.trace.get_tracer(__name__) + + # TODO: Why is this necessary to satisfy this error? It has a default? + # ` error: Argument "kind" to "start_span" of "Tracer" has incompatible type "Optional[SpanKind]"; expected "SpanKind" [arg-type]` + if kind is None: + kind = SpanKind.INTERNAL + + return tracer.start_span( + name=name, + context=context, + kind=kind, + attributes=attributes, + links=links, + start_time=start_time, + record_exception=record_exception, + set_status_on_exception=set_status_on_exception, + ) + + def start_active_span( name: str, *, @@ -476,15 +524,12 @@ def start_active_span( set_status_on_exception: bool = True, end_on_exit: bool = True, # For testing only - tracer: Optional["opentelemetry.sdk.trace.TracerProvider"] = None, + tracer: Optional["opentelemetry.trace.Tracer"] = None, ) -> ContextManager["opentelemetry.trace.span.Span"]: if opentelemetry is None: return contextlib.nullcontext() # type: ignore[unreachable] - if tracer is None: - tracer = opentelemetry.trace.get_tracer(__name__) - - return tracer.start_as_current_span( + span = start_span( name=name, context=context, kind=kind, @@ -493,7 +538,14 @@ def start_active_span( start_time=start_time, record_exception=record_exception, set_status_on_exception=set_status_on_exception, + ) + + # Equivalent to `tracer.start_as_current_span` + return opentelemetry.trace.use_span( + span, end_on_exit=end_on_exit, + record_exception=record_exception, + set_status_on_exception=set_status_on_exception, ) @@ -531,7 +583,7 @@ def set_attribute(key: str, value: Union[str, bool, int, float]) -> None: @ensure_active_span("set the status") def set_status( - status: "opentelemetry.trace.StatusCode", exc: Optional[Exception] + status: "opentelemetry.trace.status.StatusCode", exc: Optional[Exception] ) -> None: """Sets a tag on the active span""" active_span = get_active_span() @@ -545,7 +597,7 @@ DEFAULT_LOG_NAME = "log" @ensure_active_span("log") -def log_kv(key_values: Dict[str, Any], timestamp: Optional[float] = None) -> None: +def log_kv(key_values: Dict[str, Any], timestamp: Optional[int] = None) -> None: """Log to the active span""" active_span = get_active_span() assert active_span is not None @@ -665,9 +717,9 @@ def get_active_span_text_map(destination: Optional[str] = None) -> Dict[str, str return carrier_text_map -def span_context_from_request( +def context_from_request( request: Request, -) -> Optional["opentelemetry.trace.span.SpanContext"]: +) -> Optional["opentelemetry.context.context.Context"]: """Extract an opentracing context from the headers on an HTTP request This is useful when we have received an HTTP request from another part of our @@ -687,18 +739,18 @@ def span_context_from_request( @only_if_tracing def extract_text_map( carrier: Dict[str, str] -) -> Optional["opentelemetry.shim.opentracing_shim.SpanContextShim"]: +) -> Optional["opentelemetry.context.context.Context"]: """ Wrapper method for opentracing's tracer.extract for TEXT_MAP. Args: - carrier: a dict possibly containing a span context. + carrier: a dict possibly containing a context. Returns: - The active span context extracted from carrier. + The active context extracted from carrier. """ propagator = opentelemetry.propagate.get_global_textmap() # Extract all of the relevant values from the `carrier` to construct a - # SpanContext to return. + # Context to return. return propagator.extract(carrier) @@ -826,7 +878,7 @@ def trace_servlet( } request_name = request.request_metrics.name - span_context = span_context_from_request(request) if extract_context else None + tracing_context = context_from_request(request) if extract_context else None # we configure the scope not to finish the span immediately on exit, and instead # pass the span into the SynapseRequest, which will finish it once we've finished @@ -835,7 +887,7 @@ def trace_servlet( with start_active_span( request_name, kind=SpanKind.SERVER, - context=span_context, + context=tracing_context, end_on_exit=False, ) as span: request.set_tracing_span(span) diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 293fbe3539..6a36f0cd4b 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -333,12 +333,12 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore): # (user_id, device_id) entries into a map, with the value being # the max stream_id across each set of duplicate entries # - # maps (user_id, device_id) -> (stream_id, opentracing_context) + # maps (user_id, device_id) -> (stream_id,tracing_context) # - # opentracing_context contains the opentracing metadata for the request + # tracing_context contains the opentelemetry metadata for the request # that created the poke # - # The most recent request's opentracing_context is used as the + # The most recent request's tracing_context is used as the # context which created the Edu. # This is the stream ID that we will return for the consumer to resume @@ -468,11 +468,11 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore): - user_id - device_id - stream_id - - opentracing_context + - tracing_context """ # get the list of device updates that need to be sent sql = """ - SELECT user_id, device_id, stream_id, opentracing_context FROM device_lists_outbound_pokes + SELECT user_id, device_id, stream_id, tracing_context FROM device_lists_outbound_pokes WHERE destination = ? AND ? < stream_id AND stream_id <= ? ORDER BY stream_id LIMIT ? @@ -531,13 +531,13 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore): for device_id in device_ids: device = user_devices[device_id] - stream_id, opentracing_context = query_map[(user_id, device_id)] + stream_id, tracing_context = query_map[(user_id, device_id)] result = { "user_id": user_id, "device_id": device_id, "prev_id": [prev_id] if prev_id else [], "stream_id": stream_id, - "org.matrix.opentracing_context": opentracing_context, + "org.matrix.tracing_context": tracing_context, } prev_id = stream_id @@ -1801,7 +1801,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): "device_id", "sent", "ts", - "opentracing_context", + "tracing_context", ), values=[ ( @@ -1846,7 +1846,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): "room_id", "stream_id", "converted_to_destinations", - "opentracing_context", + "tracing_context", ), values=[ ( @@ -1870,11 +1870,11 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): written to `device_lists_outbound_pokes`. Returns: - A list of user ID, device ID, room ID, stream ID and optional opentracing context. + A list of user ID, device ID, room ID, stream ID and optional opentelemetry context. """ sql = """ - SELECT user_id, device_id, room_id, stream_id, opentracing_context + SELECT user_id, device_id, room_id, stream_id, tracing_context FROM device_lists_changes_in_room WHERE NOT converted_to_destinations ORDER BY stream_id @@ -1892,9 +1892,9 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): device_id, room_id, stream_id, - db_to_json(opentracing_context), + db_to_json(tracing_context), ) - for user_id, device_id, room_id, stream_id, opentracing_context in txn + for user_id, device_id, room_id, stream_id, tracing_context in txn ] return await self.db_pool.runInteraction( diff --git a/synapse/storage/schema/main/delta/73/01rename_opentelemtetry_tracing_context.sql b/synapse/storage/schema/main/delta/73/01rename_opentelemtetry_tracing_context.sql new file mode 100644 index 0000000000..f8c333011d --- /dev/null +++ b/synapse/storage/schema/main/delta/73/01rename_opentelemtetry_tracing_context.sql @@ -0,0 +1,17 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Rename to generalized `tracing_context` since we're moving from opentracing to opentelemetry +ALTER TABLE device_lists_outbound_pokes RENAME COLUMN opentracing_context TO tracing_context;