Fixup some todos
This commit is contained in:
@@ -30,7 +30,7 @@ from synapse.api.errors import (
|
||||
from synapse.appservice import ApplicationService
|
||||
from synapse.http import get_request_user_agent
|
||||
from synapse.http.site import SynapseRequest
|
||||
from synapse.logging.tracing import active_span, force_tracing, start_active_span
|
||||
from synapse.logging.tracing import get_active_span, force_tracing, start_active_span
|
||||
from synapse.storage.databases.main.registration import TokenLookupResult
|
||||
from synapse.types import Requester, UserID, create_requester
|
||||
|
||||
@@ -132,7 +132,7 @@ class Auth:
|
||||
is invalid.
|
||||
AuthError if access is denied for the user in the access token
|
||||
"""
|
||||
parent_span = active_span()
|
||||
parent_span = get_active_span()
|
||||
with start_active_span("get_user_by_req"):
|
||||
requester = await self._wrapped_get_user_by_req(
|
||||
request, allow_guest, allow_expired
|
||||
|
||||
@@ -26,7 +26,7 @@ from synapse.http.servlet import parse_json_object_from_request
|
||||
from synapse.http.site import SynapseRequest
|
||||
from synapse.logging.context import run_in_background
|
||||
from synapse.logging.tracing import (
|
||||
active_span,
|
||||
get_active_span,
|
||||
set_attribute,
|
||||
span_context_from_request,
|
||||
start_active_span,
|
||||
@@ -318,7 +318,7 @@ class BaseFederationServlet:
|
||||
context = span_context_from_request(request)
|
||||
|
||||
if context:
|
||||
servlet_span = active_span()
|
||||
servlet_span = get_active_span()
|
||||
# a scope which uses the origin's context as a parent
|
||||
processing_start_time = time.time()
|
||||
scope = start_active_span_follows_from(
|
||||
|
||||
@@ -1568,7 +1568,7 @@ class SyncHandler:
|
||||
sync_result_builder,
|
||||
room_entry,
|
||||
ephemeral=ephemeral_by_room.get(room_entry.room_id, []),
|
||||
attributes=tags_by_room.get(room_entry.room_id),
|
||||
tags=tags_by_room.get(room_entry.room_id),
|
||||
account_data=account_data_by_room.get(room_entry.room_id, {}),
|
||||
always_include=sync_result_builder.full_state,
|
||||
)
|
||||
|
||||
@@ -533,7 +533,9 @@ class MatrixFederationHttpClient:
|
||||
|
||||
# Inject the span into the headers
|
||||
headers_dict: Dict[bytes, List[bytes]] = {}
|
||||
tracing.inject_header_dict(headers_dict, request.destination)
|
||||
tracing.inject_active_span_context_into_header_dict(
|
||||
headers_dict, request.destination
|
||||
)
|
||||
|
||||
headers_dict[b"User-Agent"] = [self.version_string_bytes]
|
||||
|
||||
|
||||
@@ -60,7 +60,7 @@ from synapse.api.errors import (
|
||||
)
|
||||
from synapse.http.site import SynapseRequest
|
||||
from synapse.logging.context import defer_to_thread, preserve_fn, run_in_background
|
||||
from synapse.logging.tracing import active_span, start_active_span, trace_servlet
|
||||
from synapse.logging.tracing import get_active_span, start_active_span, trace_servlet
|
||||
from synapse.util import json_encoder
|
||||
from synapse.util.caches import intern_dict
|
||||
from synapse.util.iterutils import chunk_seq
|
||||
@@ -880,7 +880,7 @@ async def _async_write_json_to_request_in_thread(
|
||||
return res
|
||||
|
||||
with start_active_span("encode_json_response"):
|
||||
span = active_span()
|
||||
span = get_active_span()
|
||||
json_str = await defer_to_thread(request.reactor, encode, span)
|
||||
|
||||
_write_bytes_to_request(request, json_str)
|
||||
|
||||
@@ -204,7 +204,7 @@ if TYPE_CHECKING:
|
||||
|
||||
# Helper class
|
||||
|
||||
# Always returns the value given for any accessed property
|
||||
# Always returns the fixed value given for any accessed property
|
||||
class _DummyLookup(object):
|
||||
def __init__(self, value):
|
||||
self.value = value
|
||||
@@ -223,6 +223,7 @@ try:
|
||||
import opentelemetry.sdk.trace.export
|
||||
import opentelemetry.semconv.trace
|
||||
import opentelemetry.trace
|
||||
import opentelemetry.propagate
|
||||
|
||||
SpanKind = opentelemetry.trace.SpanKind
|
||||
SpanAttributes = opentelemetry.semconv.trace.SpanAttributes
|
||||
@@ -237,7 +238,7 @@ except ImportError:
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# FIXME: Rename to `SynapseAttributes` so it matches OpenTelemetry `SpanAttributes`
|
||||
class SynapseTags:
|
||||
# The message ID of any to_device message processed
|
||||
TO_DEVICE_MESSAGE_ID = "to_device.message_id"
|
||||
@@ -374,8 +375,6 @@ def init_tracer(hs: "HomeServer") -> None:
|
||||
# Pull out of the config if it was given. Otherwise set it to something sensible.
|
||||
set_homeserver_whitelist(hs.config.tracing.opentelemetry_whitelist)
|
||||
|
||||
# TODO: opentelemetry_whitelist
|
||||
|
||||
resource = opentelemetry.sdk.resources.Resource(
|
||||
attributes={
|
||||
opentelemetry.sdk.resources.SERVICE_NAME: f"{hs.config.server.server_name} {hs.get_instance_name()}"
|
||||
@@ -514,7 +513,7 @@ def start_active_span_from_edu(
|
||||
|
||||
# OpenTelemetry setters for attributes, logs, etc
|
||||
@only_if_tracing
|
||||
def active_span() -> Optional["opentelemetry.trace.span.Span"]:
|
||||
def get_active_span() -> Optional["opentelemetry.trace.span.Span"]:
|
||||
"""Get the currently active span, if any"""
|
||||
return opentelemetry.trace.get_current_span()
|
||||
|
||||
@@ -522,26 +521,29 @@ def active_span() -> Optional["opentelemetry.trace.span.Span"]:
|
||||
@ensure_active_span("set a tag")
|
||||
def set_attribute(key: str, value: Union[str, bool, int, float]) -> None:
|
||||
"""Sets a tag on the active span"""
|
||||
span = active_span()
|
||||
assert span is not None
|
||||
span.set_attribute(key, value)
|
||||
active_span = get_active_span()
|
||||
assert active_span is not None
|
||||
active_span.set_attribute(key, value)
|
||||
|
||||
|
||||
@ensure_active_span("set the status")
|
||||
def set_status(key: str, status: "opentelemetry.trace.StatusCode") -> None:
|
||||
"""Sets a tag on the active span"""
|
||||
span = active_span()
|
||||
span = get_active_span()
|
||||
assert span is not None
|
||||
span.set_status(status)
|
||||
|
||||
|
||||
DEFAULT_LOG_NAME = "log"
|
||||
|
||||
|
||||
@ensure_active_span("log")
|
||||
def log_kv(key_values: Dict[str, Any], timestamp: Optional[float] = None) -> None:
|
||||
"""Log to the active span"""
|
||||
span = active_span()
|
||||
assert span is not None
|
||||
event_name = key_values.get("event", "log")
|
||||
span.add_event(event_name, attributes=key_values, timestamp=timestamp)
|
||||
active_span = get_active_span()
|
||||
assert active_span is not None
|
||||
event_name = key_values.get("event", DEFAULT_LOG_NAME)
|
||||
active_span.add_event(event_name, attributes=key_values, timestamp=timestamp)
|
||||
|
||||
|
||||
@only_if_tracing
|
||||
@@ -571,7 +573,7 @@ def is_context_forced_tracing(
|
||||
|
||||
|
||||
@ensure_active_span("inject the span into a header dict")
|
||||
def inject_header_dict(
|
||||
def inject_active_span_context_into_header_dict(
|
||||
headers: Dict[bytes, List[bytes]],
|
||||
destination: Optional[str] = None,
|
||||
check_destination: bool = True,
|
||||
@@ -582,10 +584,10 @@ def inject_header_dict(
|
||||
Args:
|
||||
headers: the dict to inject headers into
|
||||
destination: address of entity receiving the span context. Must be given unless
|
||||
check_destination is False. The context will only be injected if the
|
||||
destination matches the opentracing whitelist
|
||||
check_destination (bool): If false, destination will be ignored and the context
|
||||
will always be injected.
|
||||
`check_destination` is False.
|
||||
check_destination (bool): If False, destination will be ignored and the context
|
||||
will always be injected. If True, the context will only be injected if the
|
||||
destination matches the tracing allowlist
|
||||
|
||||
Note:
|
||||
The headers set by the tracer are custom to the tracer implementation which
|
||||
@@ -594,19 +596,31 @@ def inject_header_dict(
|
||||
here:
|
||||
https://github.com/jaegertracing/jaeger-client-python/blob/master/jaeger_client/constants.py
|
||||
"""
|
||||
# TODO
|
||||
pass
|
||||
if check_destination:
|
||||
if destination is None:
|
||||
raise ValueError(
|
||||
"destination must be given unless check_destination is False"
|
||||
)
|
||||
if not whitelisted_homeserver(destination):
|
||||
return
|
||||
|
||||
active_span = get_active_span()
|
||||
active_span_context = active_span.get_span_context()
|
||||
|
||||
propagator = opentelemetry.propagate.get_global_textmap()
|
||||
# Put all of SpanContext properties into the headers dict
|
||||
propagator.inject(headers, context=active_span_context)
|
||||
|
||||
|
||||
def inject_response_headers(response_headers: Headers) -> None:
|
||||
"""Inject the current trace id into the HTTP response headers"""
|
||||
if not opentelemetry:
|
||||
return
|
||||
current_span = opentelemetry.trace.get_current_span()
|
||||
if not current_span:
|
||||
active_span = get_active_span()
|
||||
if not active_span:
|
||||
return
|
||||
|
||||
trace_id = current_span.get_span_context().trace_id
|
||||
trace_id = active_span.get_span_context().trace_id
|
||||
|
||||
if trace_id is not None:
|
||||
response_headers.addRawHeader("Synapse-Trace-Id", f"{trace_id:x}")
|
||||
@@ -626,9 +640,15 @@ def get_active_span_text_map(destination: Optional[str] = None) -> Dict[str, str
|
||||
Returns:
|
||||
dict: the active span's context if opentracing is enabled, otherwise empty.
|
||||
"""
|
||||
# TODO
|
||||
carrier: Dict[str, str] = {}
|
||||
return carrier
|
||||
active_span = get_active_span()
|
||||
active_span_context = active_span.get_span_context()
|
||||
|
||||
carrier_text_map: Dict[str, str] = {}
|
||||
propagator = opentelemetry.propagate.get_global_textmap()
|
||||
# Put all of SpanContext properties onto the carrier text map that we can return
|
||||
propagator.inject(carrier_text_map, context=active_span_context)
|
||||
|
||||
return carrier_text_map
|
||||
|
||||
|
||||
def span_context_from_request(
|
||||
@@ -639,8 +659,15 @@ def span_context_from_request(
|
||||
This is useful when we have received an HTTP request from another part of our
|
||||
system, and want to link our spans to those of the remote system.
|
||||
"""
|
||||
# TODO
|
||||
return None
|
||||
if not opentelemetry:
|
||||
return None
|
||||
header_dict = {
|
||||
k.decode(): v[0].decode() for k, v in request.requestHeaders.getAllRawHeaders()
|
||||
}
|
||||
|
||||
# Extract all of the relevant values from the headers to construct a
|
||||
# SpanContext to return.
|
||||
return extract_text_map(header_dict)
|
||||
|
||||
|
||||
@only_if_tracing
|
||||
@@ -655,8 +682,10 @@ def extract_text_map(
|
||||
Returns:
|
||||
The active span context extracted from carrier.
|
||||
"""
|
||||
# TODO
|
||||
return None
|
||||
propagator = opentelemetry.propagate.get_global_textmap()
|
||||
# Extract all of the relevant values from the `carrier` to construct a
|
||||
# SpanContext to return.
|
||||
return propagator.extract(carrier)
|
||||
|
||||
|
||||
# Tracing decorators
|
||||
|
||||
@@ -248,7 +248,9 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
|
||||
# Add an authorization header, if configured.
|
||||
if replication_secret:
|
||||
headers[b"Authorization"] = [b"Bearer " + replication_secret]
|
||||
tracing.inject_header_dict(headers, check_destination=False)
|
||||
tracing.inject_active_span_context_into_header_dict(
|
||||
headers, check_destination=False
|
||||
)
|
||||
|
||||
try:
|
||||
# Keep track of attempts made so we can bail if we don't manage to
|
||||
|
||||
@@ -223,7 +223,7 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
|
||||
queue.append(end_item)
|
||||
|
||||
# also add our active opentracing span to the item so that we get a link back
|
||||
span = tracing.active_span()
|
||||
span = tracing.get_active_span()
|
||||
if span:
|
||||
end_item.parent_opentracing_span_contexts.append(span.context)
|
||||
|
||||
|
||||
@@ -30,7 +30,7 @@ from twisted.internet import defer
|
||||
|
||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||
from synapse.logging.tracing import (
|
||||
active_span,
|
||||
get_active_span,
|
||||
start_active_span,
|
||||
start_active_span_follows_from,
|
||||
)
|
||||
@@ -240,7 +240,7 @@ class ResponseCache(Generic[KV]):
|
||||
# NB it is important that we do not `await` before setting span_context!
|
||||
nonlocal span_context
|
||||
with start_active_span(f"ResponseCache[{self._name}].calculate"):
|
||||
span = active_span()
|
||||
span = get_active_span()
|
||||
if span:
|
||||
span_context = span.context
|
||||
return await callback(*args, **kwargs)
|
||||
|
||||
Reference in New Issue
Block a user