Fix some lints
This commit is contained in:
@@ -97,7 +97,7 @@ class TransactionManager:
|
||||
|
||||
with start_active_span(
|
||||
"send_transaction",
|
||||
links=[Link(context) for context in span_contexts],
|
||||
links=[Link(span_context) for span_context in span_contexts],
|
||||
):
|
||||
logger.debug("TX [%s] _attempt_new_transaction", destination)
|
||||
|
||||
|
||||
@@ -26,11 +26,13 @@ from synapse.http.site import SynapseRequest
|
||||
from synapse.logging.context import run_in_background
|
||||
from synapse.logging.tracing import (
|
||||
Link,
|
||||
context_from_request,
|
||||
create_non_recording_span,
|
||||
get_active_span,
|
||||
set_attribute,
|
||||
span_context_from_request,
|
||||
start_active_span,
|
||||
start_span,
|
||||
use_span,
|
||||
whitelisted_homeserver,
|
||||
)
|
||||
from synapse.types import JsonDict
|
||||
@@ -313,45 +315,49 @@ class BaseFederationServlet:
|
||||
|
||||
# if the origin is authenticated and whitelisted, use its span context
|
||||
# as the parent.
|
||||
origin_span_context = None
|
||||
origin_context = None
|
||||
if origin and whitelisted_homeserver(origin):
|
||||
origin_span_context = span_context_from_request(request)
|
||||
origin_context = context_from_request(request)
|
||||
|
||||
if origin_span_context:
|
||||
if origin_context:
|
||||
local_servlet_span = get_active_span()
|
||||
# Create a span which uses the `origin_span_context` as a parent
|
||||
# Create a span which uses the `origin_context` as a parent
|
||||
# so we can see how the incoming payload was processed while
|
||||
# we're looking at the outgoing trace. Since the parent is set
|
||||
# to a remote span (from the origin), it won't show up in the
|
||||
# local trace which is why we create another span below for the
|
||||
# local trace. A span can only have one parent so we have to
|
||||
# create two separate ones.
|
||||
remote_parent_span = start_active_span(
|
||||
remote_parent_span = start_span(
|
||||
"incoming-federation-request",
|
||||
context=origin_span_context,
|
||||
context=origin_context,
|
||||
# Cross-link back to the local trace so we can jump
|
||||
# to the incoming side from the remote origin trace.
|
||||
links=[Link(local_servlet_span.get_span_context())],
|
||||
links=[Link(local_servlet_span.get_span_context())]
|
||||
if local_servlet_span
|
||||
else None,
|
||||
)
|
||||
|
||||
# Create a local span to appear in the local trace
|
||||
local_parent_span = start_active_span(
|
||||
local_parent_span_cm = start_active_span(
|
||||
"process-federation-request",
|
||||
# Cross-link back to the remote outgoing trace so we jump over
|
||||
# there.
|
||||
# Cross-link back to the remote outgoing trace so we can
|
||||
# jump over there.
|
||||
links=[Link(remote_parent_span.get_span_context())],
|
||||
)
|
||||
|
||||
else:
|
||||
# Otherwise just use our local context as a parent
|
||||
local_parent_span = start_active_span(
|
||||
# Otherwise just use our local active servlet context as a parent
|
||||
local_parent_span_cm = start_active_span(
|
||||
"process-federation-request",
|
||||
)
|
||||
|
||||
# Don't need to record anything for the remote
|
||||
# Don't need to record anything for the remote because no remote
|
||||
# trace context given.
|
||||
remote_parent_span = create_non_recording_span()
|
||||
|
||||
with remote_parent_span, local_parent_span:
|
||||
remote_parent_span_cm = use_span(remote_parent_span, end_on_exit=True)
|
||||
|
||||
with remote_parent_span_cm, local_parent_span_cm:
|
||||
if origin and self.RATELIMIT:
|
||||
with ratelimiter.ratelimit(origin) as d:
|
||||
await d
|
||||
|
||||
@@ -55,10 +55,10 @@ class Edu:
|
||||
}
|
||||
|
||||
def get_context(self) -> str:
|
||||
return getattr(self, "content", {}).get("org.matrix.opentracing_context", "{}")
|
||||
return getattr(self, "content", {}).get("org.matrix.tracing_context", "{}")
|
||||
|
||||
def strip_context(self) -> None:
|
||||
getattr(self, "content", {})["org.matrix.opentracing_context"] = "{}"
|
||||
getattr(self, "content", {})["org.matrix.tracing_context"] = "{}"
|
||||
|
||||
|
||||
def _none_to_list(edus: Optional[List[JsonDict]]) -> List[JsonDict]:
|
||||
|
||||
@@ -688,7 +688,7 @@ class DeviceHandler(DeviceWorkerHandler):
|
||||
else:
|
||||
return
|
||||
|
||||
for user_id, device_id, room_id, stream_id, opentracing_context in rows:
|
||||
for user_id, device_id, room_id, stream_id, tracing_context in rows:
|
||||
hosts = set()
|
||||
|
||||
# Ignore any users that aren't ours
|
||||
@@ -707,7 +707,7 @@ class DeviceHandler(DeviceWorkerHandler):
|
||||
room_id=room_id,
|
||||
stream_id=stream_id,
|
||||
hosts=hosts,
|
||||
context=opentracing_context,
|
||||
context=tracing_context,
|
||||
)
|
||||
|
||||
# Notify replication that we've updated the device list stream.
|
||||
|
||||
@@ -273,7 +273,7 @@ class DeviceMessageHandler:
|
||||
"sender": sender_user_id,
|
||||
"type": message_type,
|
||||
"message_id": message_id,
|
||||
"org.matrix.opentracing_context": json_encoder.encode(context),
|
||||
"org.matrix.tracing_context": json_encoder.encode(context),
|
||||
}
|
||||
|
||||
# Add messages to the database.
|
||||
|
||||
@@ -202,31 +202,33 @@ if TYPE_CHECKING:
|
||||
|
||||
# Helper class
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
class _DummyLookup(object):
|
||||
"""This will always returns the fixed value given for any accessed property"""
|
||||
|
||||
def __init__(self, value):
|
||||
def __init__(self, value: T) -> None:
|
||||
self.value = value
|
||||
|
||||
def __getattribute__(self, name):
|
||||
def __getattribute__(self, name: str) -> T:
|
||||
return object.__getattribute__(self, "value")
|
||||
|
||||
|
||||
class DummyLink(ABC):
|
||||
"""Dummy placeholder for `opentelemetry.trace.Link`"""
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self) -> None:
|
||||
self.not_implemented_message = (
|
||||
"opentelemetry wasn't imported so this is just a dummy link placeholder"
|
||||
"opentelemetry isn't installed so this is just a dummy link placeholder"
|
||||
)
|
||||
|
||||
@property
|
||||
def context(self):
|
||||
def context(self) -> None:
|
||||
raise NotImplementedError(self.not_implemented_message)
|
||||
|
||||
@property
|
||||
def attributes(self):
|
||||
def attributes(self) -> None:
|
||||
raise NotImplementedError(self.not_implemented_message)
|
||||
|
||||
|
||||
@@ -242,17 +244,18 @@ try:
|
||||
import opentelemetry.semconv.trace
|
||||
import opentelemetry.trace
|
||||
import opentelemetry.trace.propagation
|
||||
import opentelemetry.trace.status
|
||||
|
||||
SpanKind = opentelemetry.trace.SpanKind
|
||||
SpanAttributes = opentelemetry.semconv.trace.SpanAttributes
|
||||
StatusCode = opentelemetry.trace.StatusCode
|
||||
StatusCode = opentelemetry.trace.status.StatusCode
|
||||
Link = opentelemetry.trace.Link
|
||||
except ImportError:
|
||||
opentelemetry = None # type: ignore[assignment]
|
||||
SpanKind = _DummyLookup(0)
|
||||
SpanAttributes = _DummyLookup("fake-attribute")
|
||||
StatusCode = _DummyLookup(0)
|
||||
Link = DummyLink
|
||||
SpanKind = _DummyLookup(0) # type: ignore
|
||||
SpanAttributes = _DummyLookup("fake-attribute") # type: ignore
|
||||
StatusCode = _DummyLookup(0) # type: ignore
|
||||
Link = DummyLink # type: ignore
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -303,7 +306,6 @@ class _Sentinel(enum.Enum):
|
||||
|
||||
P = ParamSpec("P")
|
||||
R = TypeVar("R")
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
def only_if_tracing(func: Callable[P, R]) -> Callable[P, Optional[R]]:
|
||||
@@ -455,15 +457,61 @@ def whitelisted_homeserver(destination: str) -> bool:
|
||||
# Start spans and scopes
|
||||
|
||||
|
||||
def create_non_recording_span():
|
||||
def use_span(
|
||||
span: "opentelemetry.trace.span.Span",
|
||||
end_on_exit: bool = True,
|
||||
) -> ContextManager["opentelemetry.trace.span.Span"]:
|
||||
if opentelemetry is None:
|
||||
return contextlib.nullcontext() # type: ignore[unreachable]
|
||||
|
||||
return opentelemetry.trace.use_span(span=span, end_on_exit=end_on_exit)
|
||||
|
||||
|
||||
def create_non_recording_span() -> "opentelemetry.trace.span.Span":
|
||||
"""Create a no-op span that does not record or become part of a recorded trace"""
|
||||
|
||||
return opentelemetry.trace.NonRecordingSpan(
|
||||
opentelemetry.trace.INVALID_SPAN_CONTEXT
|
||||
)
|
||||
|
||||
|
||||
def start_span(
|
||||
name: str,
|
||||
*,
|
||||
context: Optional["opentelemetry.context.context.Context"] = None,
|
||||
kind: Optional["opentelemetry.trace.SpanKind"] = SpanKind.INTERNAL,
|
||||
attributes: "opentelemetry.util.types.Attributes" = None,
|
||||
links: Optional[Sequence["opentelemetry.trace.Link"]] = None,
|
||||
start_time: Optional[int] = None,
|
||||
record_exception: bool = True,
|
||||
set_status_on_exception: bool = True,
|
||||
end_on_exit: bool = True,
|
||||
# For testing only
|
||||
tracer: Optional["opentelemetry.trace.Tracer"] = None,
|
||||
) -> "opentelemetry.trace.span.Span":
|
||||
if opentelemetry is None:
|
||||
raise Exception("Not able to create span without opentelemetry installed.")
|
||||
|
||||
if tracer is None:
|
||||
tracer = opentelemetry.trace.get_tracer(__name__)
|
||||
|
||||
# TODO: Why is this necessary to satisfy this error? It has a default?
|
||||
# ` error: Argument "kind" to "start_span" of "Tracer" has incompatible type "Optional[SpanKind]"; expected "SpanKind" [arg-type]`
|
||||
if kind is None:
|
||||
kind = SpanKind.INTERNAL
|
||||
|
||||
return tracer.start_span(
|
||||
name=name,
|
||||
context=context,
|
||||
kind=kind,
|
||||
attributes=attributes,
|
||||
links=links,
|
||||
start_time=start_time,
|
||||
record_exception=record_exception,
|
||||
set_status_on_exception=set_status_on_exception,
|
||||
)
|
||||
|
||||
|
||||
def start_active_span(
|
||||
name: str,
|
||||
*,
|
||||
@@ -476,15 +524,12 @@ def start_active_span(
|
||||
set_status_on_exception: bool = True,
|
||||
end_on_exit: bool = True,
|
||||
# For testing only
|
||||
tracer: Optional["opentelemetry.sdk.trace.TracerProvider"] = None,
|
||||
tracer: Optional["opentelemetry.trace.Tracer"] = None,
|
||||
) -> ContextManager["opentelemetry.trace.span.Span"]:
|
||||
if opentelemetry is None:
|
||||
return contextlib.nullcontext() # type: ignore[unreachable]
|
||||
|
||||
if tracer is None:
|
||||
tracer = opentelemetry.trace.get_tracer(__name__)
|
||||
|
||||
return tracer.start_as_current_span(
|
||||
span = start_span(
|
||||
name=name,
|
||||
context=context,
|
||||
kind=kind,
|
||||
@@ -493,7 +538,14 @@ def start_active_span(
|
||||
start_time=start_time,
|
||||
record_exception=record_exception,
|
||||
set_status_on_exception=set_status_on_exception,
|
||||
)
|
||||
|
||||
# Equivalent to `tracer.start_as_current_span`
|
||||
return opentelemetry.trace.use_span(
|
||||
span,
|
||||
end_on_exit=end_on_exit,
|
||||
record_exception=record_exception,
|
||||
set_status_on_exception=set_status_on_exception,
|
||||
)
|
||||
|
||||
|
||||
@@ -531,7 +583,7 @@ def set_attribute(key: str, value: Union[str, bool, int, float]) -> None:
|
||||
|
||||
@ensure_active_span("set the status")
|
||||
def set_status(
|
||||
status: "opentelemetry.trace.StatusCode", exc: Optional[Exception]
|
||||
status: "opentelemetry.trace.status.StatusCode", exc: Optional[Exception]
|
||||
) -> None:
|
||||
"""Sets a tag on the active span"""
|
||||
active_span = get_active_span()
|
||||
@@ -545,7 +597,7 @@ DEFAULT_LOG_NAME = "log"
|
||||
|
||||
|
||||
@ensure_active_span("log")
|
||||
def log_kv(key_values: Dict[str, Any], timestamp: Optional[float] = None) -> None:
|
||||
def log_kv(key_values: Dict[str, Any], timestamp: Optional[int] = None) -> None:
|
||||
"""Log to the active span"""
|
||||
active_span = get_active_span()
|
||||
assert active_span is not None
|
||||
@@ -665,9 +717,9 @@ def get_active_span_text_map(destination: Optional[str] = None) -> Dict[str, str
|
||||
return carrier_text_map
|
||||
|
||||
|
||||
def span_context_from_request(
|
||||
def context_from_request(
|
||||
request: Request,
|
||||
) -> Optional["opentelemetry.trace.span.SpanContext"]:
|
||||
) -> Optional["opentelemetry.context.context.Context"]:
|
||||
"""Extract an opentracing context from the headers on an HTTP request
|
||||
|
||||
This is useful when we have received an HTTP request from another part of our
|
||||
@@ -687,18 +739,18 @@ def span_context_from_request(
|
||||
@only_if_tracing
|
||||
def extract_text_map(
|
||||
carrier: Dict[str, str]
|
||||
) -> Optional["opentelemetry.shim.opentracing_shim.SpanContextShim"]:
|
||||
) -> Optional["opentelemetry.context.context.Context"]:
|
||||
"""
|
||||
Wrapper method for opentracing's tracer.extract for TEXT_MAP.
|
||||
Args:
|
||||
carrier: a dict possibly containing a span context.
|
||||
carrier: a dict possibly containing a context.
|
||||
|
||||
Returns:
|
||||
The active span context extracted from carrier.
|
||||
The active context extracted from carrier.
|
||||
"""
|
||||
propagator = opentelemetry.propagate.get_global_textmap()
|
||||
# Extract all of the relevant values from the `carrier` to construct a
|
||||
# SpanContext to return.
|
||||
# Context to return.
|
||||
return propagator.extract(carrier)
|
||||
|
||||
|
||||
@@ -826,7 +878,7 @@ def trace_servlet(
|
||||
}
|
||||
|
||||
request_name = request.request_metrics.name
|
||||
span_context = span_context_from_request(request) if extract_context else None
|
||||
tracing_context = context_from_request(request) if extract_context else None
|
||||
|
||||
# we configure the scope not to finish the span immediately on exit, and instead
|
||||
# pass the span into the SynapseRequest, which will finish it once we've finished
|
||||
@@ -835,7 +887,7 @@ def trace_servlet(
|
||||
with start_active_span(
|
||||
request_name,
|
||||
kind=SpanKind.SERVER,
|
||||
context=span_context,
|
||||
context=tracing_context,
|
||||
end_on_exit=False,
|
||||
) as span:
|
||||
request.set_tracing_span(span)
|
||||
|
||||
@@ -333,12 +333,12 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore):
|
||||
# (user_id, device_id) entries into a map, with the value being
|
||||
# the max stream_id across each set of duplicate entries
|
||||
#
|
||||
# maps (user_id, device_id) -> (stream_id, opentracing_context)
|
||||
# maps (user_id, device_id) -> (stream_id,tracing_context)
|
||||
#
|
||||
# opentracing_context contains the opentracing metadata for the request
|
||||
# tracing_context contains the opentelemetry metadata for the request
|
||||
# that created the poke
|
||||
#
|
||||
# The most recent request's opentracing_context is used as the
|
||||
# The most recent request's tracing_context is used as the
|
||||
# context which created the Edu.
|
||||
|
||||
# This is the stream ID that we will return for the consumer to resume
|
||||
@@ -468,11 +468,11 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore):
|
||||
- user_id
|
||||
- device_id
|
||||
- stream_id
|
||||
- opentracing_context
|
||||
- tracing_context
|
||||
"""
|
||||
# get the list of device updates that need to be sent
|
||||
sql = """
|
||||
SELECT user_id, device_id, stream_id, opentracing_context FROM device_lists_outbound_pokes
|
||||
SELECT user_id, device_id, stream_id, tracing_context FROM device_lists_outbound_pokes
|
||||
WHERE destination = ? AND ? < stream_id AND stream_id <= ?
|
||||
ORDER BY stream_id
|
||||
LIMIT ?
|
||||
@@ -531,13 +531,13 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore):
|
||||
|
||||
for device_id in device_ids:
|
||||
device = user_devices[device_id]
|
||||
stream_id, opentracing_context = query_map[(user_id, device_id)]
|
||||
stream_id, tracing_context = query_map[(user_id, device_id)]
|
||||
result = {
|
||||
"user_id": user_id,
|
||||
"device_id": device_id,
|
||||
"prev_id": [prev_id] if prev_id else [],
|
||||
"stream_id": stream_id,
|
||||
"org.matrix.opentracing_context": opentracing_context,
|
||||
"org.matrix.tracing_context": tracing_context,
|
||||
}
|
||||
|
||||
prev_id = stream_id
|
||||
@@ -1801,7 +1801,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
|
||||
"device_id",
|
||||
"sent",
|
||||
"ts",
|
||||
"opentracing_context",
|
||||
"tracing_context",
|
||||
),
|
||||
values=[
|
||||
(
|
||||
@@ -1846,7 +1846,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
|
||||
"room_id",
|
||||
"stream_id",
|
||||
"converted_to_destinations",
|
||||
"opentracing_context",
|
||||
"tracing_context",
|
||||
),
|
||||
values=[
|
||||
(
|
||||
@@ -1870,11 +1870,11 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
|
||||
written to `device_lists_outbound_pokes`.
|
||||
|
||||
Returns:
|
||||
A list of user ID, device ID, room ID, stream ID and optional opentracing context.
|
||||
A list of user ID, device ID, room ID, stream ID and optional opentelemetry context.
|
||||
"""
|
||||
|
||||
sql = """
|
||||
SELECT user_id, device_id, room_id, stream_id, opentracing_context
|
||||
SELECT user_id, device_id, room_id, stream_id, tracing_context
|
||||
FROM device_lists_changes_in_room
|
||||
WHERE NOT converted_to_destinations
|
||||
ORDER BY stream_id
|
||||
@@ -1892,9 +1892,9 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
|
||||
device_id,
|
||||
room_id,
|
||||
stream_id,
|
||||
db_to_json(opentracing_context),
|
||||
db_to_json(tracing_context),
|
||||
)
|
||||
for user_id, device_id, room_id, stream_id, opentracing_context in txn
|
||||
for user_id, device_id, room_id, stream_id, tracing_context in txn
|
||||
]
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
|
||||
@@ -0,0 +1,17 @@
|
||||
/* Copyright 2021 The Matrix.org Foundation C.I.C
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- Rename to generalized `tracing_context` since we're moving from opentracing to opentelemetry
|
||||
ALTER TABLE device_lists_outbound_pokes RENAME COLUMN opentracing_context TO tracing_context;
|
||||
Reference in New Issue
Block a user