1
0

Migrate tracing to the OpenTelemetry opentracing shim and add new tracing code

This commit is contained in:
Eric Eastwood
2022-07-26 21:53:11 -05:00
parent 0cc610ecbe
commit 2fe6911957
33 changed files with 233 additions and 198 deletions

38
poetry.lock generated
View File

@@ -636,6 +636,17 @@ category = "main"
optional = true
python-versions = ">=3.6"
[[package]]
name = "opentracing"
version = "2.4.0"
description = "OpenTracing API for Python. See documentation at http://opentracing.io"
category = "main"
optional = true
python-versions = "*"
[package.extras]
tests = ["doubles", "flake8", "flake8-quotes", "mock", "pytest", "pytest-cov", "pytest-mock", "sphinx", "sphinx-rtd-theme", "six (>=1.10.0,<2.0)", "gevent", "tornado"]
[[package]]
name = "packaging"
version = "21.3"
@@ -707,7 +718,7 @@ test = ["appdirs (==1.4.4)", "pytest (>=6)", "pytest-cov (>=2.7)", "pytest-mock
[[package]]
name = "prometheus-client"
version = "0.14.0"
version = "0.14.1"
description = "Python client for the Prometheus monitoring system."
category = "main"
optional = false
@@ -1330,14 +1341,6 @@ category = "dev"
optional = false
python-versions = "*"
[[package]]
name = "types-opentracing"
version = "2.4.7"
description = "Typing stubs for opentracing"
category = "dev"
optional = false
python-versions = "*"
[[package]]
name = "types-pillow"
version = "9.0.15"
@@ -1515,12 +1518,12 @@ docs = ["sphinx", "repoze.sphinx.autointerface"]
test = ["zope.i18nmessageid", "zope.testing", "zope.testrunner"]
[extras]
all = ["matrix-synapse-ldap3", "psycopg2", "psycopg2cffi", "psycopg2cffi-compat", "pysaml2", "authlib", "lxml", "sentry-sdk", "txredisapi", "hiredis", "Pympler"]
all = ["matrix-synapse-ldap3", "psycopg2", "psycopg2cffi", "psycopg2cffi-compat", "pysaml2", "authlib", "lxml", "sentry-sdk", "opentelemetry-api", "opentelemetry-sdk", "txredisapi", "hiredis", "Pympler"]
cache_memory = ["Pympler"]
jwt = ["authlib"]
matrix-synapse-ldap3 = ["matrix-synapse-ldap3"]
oidc = ["authlib"]
opentracing = []
opentelemetry = ["opentelemetry-api", "opentelemetry-sdk"]
postgres = ["psycopg2", "psycopg2cffi", "psycopg2cffi-compat"]
redis = ["txredisapi", "hiredis"]
saml2 = ["pysaml2"]
@@ -1532,7 +1535,7 @@ url_preview = ["lxml"]
[metadata]
lock-version = "1.1"
python-versions = "^3.7.1"
content-hash = "8e54010315066ba2049baf7617b73ad055d4f34122c1d3c4ffa50bf0a8c4e36e"
content-hash = "14602f17c83b68a9dde71aee3b37d9c902153fed752a9a62d1d84b5ca8a6cd14"
[metadata.files]
attrs = [
@@ -2113,6 +2116,9 @@ opentelemetry-semantic-conventions = [
{file = "opentelemetry-semantic-conventions-0.30b1.tar.gz", hash = "sha256:2fac7c7202602566b87b2ee3c90fbc272be6094725479f8102f083bf425cc253"},
{file = "opentelemetry_semantic_conventions-0.30b1-py3-none-any.whl", hash = "sha256:5213268cd0a7a8fb94c054e4c1bac8c17586f732eca91769463320f3dcd910bb"},
]
opentracing = [
{file = "opentracing-2.4.0.tar.gz", hash = "sha256:a173117e6ef580d55874734d1fa7ecb6f3655160b8b8974a2a1e98e5ec9c840d"},
]
packaging = [
{file = "packaging-21.3-py3-none-any.whl", hash = "sha256:ef103e05f519cdc783ae24ea4e2e0f508a9c99b2d4969652eed6a2e1ea5bd522"},
{file = "packaging-21.3.tar.gz", hash = "sha256:dd47c42927d89ab911e606518907cc2d3a1f38bbd026385970643f9c5b8ecfeb"},
@@ -2175,8 +2181,8 @@ platformdirs = [
{file = "platformdirs-2.5.1.tar.gz", hash = "sha256:7535e70dfa32e84d4b34996ea99c5e432fa29a708d0f4e394bbcb2a8faa4f16d"},
]
prometheus-client = [
{file = "prometheus_client-0.14.0-py3-none-any.whl", hash = "sha256:f4aba3fdd1735852049f537c1f0ab177159b7ab76f271ecc4d2f45aa2a1d01f2"},
{file = "prometheus_client-0.14.0.tar.gz", hash = "sha256:8f7a922dd5455ad524b6ba212ce8eb2b4b05e073f4ec7218287f88b1cac34750"},
{file = "prometheus_client-0.14.1-py3-none-any.whl", hash = "sha256:522fded625282822a89e2773452f42df14b5a8e84a86433e3f8a189c1d54dc01"},
{file = "prometheus_client-0.14.1.tar.gz", hash = "sha256:5459c427624961076277fdc6dc50540e2bacb98eebde99886e59ec55ed92093a"},
]
psycopg2 = [
{file = "psycopg2-2.9.3-cp310-cp310-win32.whl", hash = "sha256:083707a696e5e1c330af2508d8fab36f9700b26621ccbcb538abe22e15485362"},
@@ -2555,10 +2561,6 @@ types-jsonschema = [
{file = "types-jsonschema-4.4.6.tar.gz", hash = "sha256:7f2a804618756768c7c0616f8c794b61fcfe3077c7ee1ad47dcf01c5e5f692bb"},
{file = "types_jsonschema-4.4.6-py3-none-any.whl", hash = "sha256:1db9031ca49a8444d01bd2ce8cf2f89318382b04610953b108321e6f8fb03390"},
]
types-opentracing = [
{file = "types-opentracing-2.4.7.tar.gz", hash = "sha256:be60e9618355aa892571ace002e6b353702538b1c0dc4fbc1c921219d6658830"},
{file = "types_opentracing-2.4.7-py3-none-any.whl", hash = "sha256:861fb8103b07cf717f501dd400cb274ca9992552314d4d6c7a824b11a215e512"},
]
types-pillow = [
{file = "types-Pillow-9.0.15.tar.gz", hash = "sha256:d2e385fe5c192e75970f18accce69f5c2a9f186f3feb578a9b91cd6fdf64211d"},
{file = "types_Pillow-9.0.15-py3-none-any.whl", hash = "sha256:c9646595dfafdf8b63d4b1443292ead17ee0fc7b18a143e497b68e0ea2dc1eb6"},

View File

@@ -182,6 +182,7 @@ parameterized = { version = ">=0.7.4", optional = true }
idna = { version = ">=2.5", optional = true }
opentelemetry-api = {version = "^1.11.1", optional = true}
opentelemetry-sdk = {version = "^1.11.1", optional = true}
opentracing = {version = "^2.4.0", optional = true}
[tool.poetry.extras]
# NB: Packages that should be part of `pip install matrix-synapse[all]` need to be specified

View File

@@ -30,7 +30,7 @@ from synapse.api.errors import (
from synapse.appservice import ApplicationService
from synapse.http import get_request_user_agent
from synapse.http.site import SynapseRequest
from synapse.logging.opentelemetry import active_span, force_tracing, start_active_span
from synapse.logging.tracing import active_span, force_tracing, start_active_span
from synapse.storage.databases.main.registration import TokenLookupResult
from synapse.types import Requester, UserID, create_requester
@@ -147,14 +147,14 @@ class Auth:
# so we also force it on for that.
force_tracing()
force_tracing(parent_span)
parent_span.set_tag(
parent_span.set_attribute(
"authenticated_entity", requester.authenticated_entity
)
parent_span.set_tag("user_id", requester.user.to_string())
parent_span.set_attribute("user_id", requester.user.to_string())
if requester.device_id is not None:
parent_span.set_tag("device_id", requester.device_id)
parent_span.set_attribute("device_id", requester.device_id)
if requester.app_service is not None:
parent_span.set_tag("appservice_id", requester.app_service.id)
parent_span.set_attribute("appservice_id", requester.app_service.id)
return requester
async def _wrapped_get_user_by_req(

View File

@@ -62,7 +62,7 @@ from synapse.events.spamcheck import load_legacy_spam_checkers
from synapse.events.third_party_rules import load_legacy_third_party_event_rules
from synapse.handlers.auth import load_legacy_password_auth_providers
from synapse.logging.context import PreserveLoggingContext
from synapse.logging.opentelemetry import init_tracer
from synapse.logging.tracing import init_tracer
from synapse.metrics import install_gc_manager, register_threadpool
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.metrics.jemalloc import setup_jemalloc_stats
@@ -549,7 +549,7 @@ def setup_sentry(hs: "HomeServer") -> None:
# We set some default tags that give some context to this instance
with sentry_sdk.configure_scope() as scope:
scope.set_tag("matrix_server_name", hs.config.server.server_name)
scope.set_attribute("matrix_server_name", hs.config.server.server_name)
app = (
hs.config.worker.worker_app
@@ -557,8 +557,8 @@ def setup_sentry(hs: "HomeServer") -> None:
else "synapse.app.homeserver"
)
name = hs.get_instance_name()
scope.set_tag("worker_app", app)
scope.set_tag("worker_name", name)
scope.set_attribute("worker_app", app)
scope.set_attribute("worker_name", name)
def setup_sdnotify(hs: "HomeServer") -> None:

View File

@@ -51,15 +51,17 @@ class TracerConfig(Config):
if not isinstance(self.opentelemetry_whitelist, list):
raise ConfigError("Tracer homeserver_whitelist config is malformed")
force_tracing_for_users = opentelemetry_config.get("force_tracing_for_users", [])
force_tracing_for_users = opentelemetry_config.get(
"force_tracing_for_users", []
)
if not isinstance(force_tracing_for_users, list):
raise ConfigError(
"Expected a list", ("opentracing", "force_tracing_for_users")
"Expected a list", ("opentelemetry", "force_tracing_for_users")
)
for i, u in enumerate(force_tracing_for_users):
if not isinstance(u, str):
raise ConfigError(
"Expected a string",
("opentracing", "force_tracing_for_users", f"index {i}"),
("opentelemetry", "force_tracing_for_users", f"index {i}"),
)
self.force_tracing_for_users.add(u)

View File

@@ -61,7 +61,7 @@ from synapse.logging.context import (
nested_logging_context,
run_in_background,
)
from synapse.logging.opentelemetry import log_kv, start_active_span_from_edu, trace
from synapse.logging.tracing import log_kv, start_active_span_from_edu, trace
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.replication.http.federation import (
ReplicationFederationSendEduRestServlet,

View File

@@ -32,7 +32,7 @@ from synapse.events import EventBase
from synapse.federation.units import Edu
from synapse.handlers.presence import format_user_presence_state
from synapse.logging import issue9533_logger
from synapse.logging.opentelemetry import SynapseTags, set_tag
from synapse.logging.tracing import SynapseTags, set_attribute
from synapse.metrics import sent_transactions_counter
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import ReadReceipt
@@ -596,7 +596,7 @@ class PerDestinationQueue:
if not message_id:
continue
set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
set_attribute(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
edus = [
Edu(

View File

@@ -21,9 +21,9 @@ from synapse.api.errors import HttpResponseException
from synapse.events import EventBase
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
from synapse.logging.opentelemetry import (
from synapse.logging.tracing import (
extract_text_map,
set_tag,
set_attribute,
start_active_span_follows_from,
tags,
whitelisted_homeserver,
@@ -166,7 +166,7 @@ class TransactionManager:
except HttpResponseException as e:
code = e.code
set_tag(tags.ERROR, True)
set_attribute(tags.ERROR, True)
logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
raise

View File

@@ -25,9 +25,9 @@ from synapse.http.server import HttpServer, ServletCallback, is_method_cancellab
from synapse.http.servlet import parse_json_object_from_request
from synapse.http.site import SynapseRequest
from synapse.logging.context import run_in_background
from synapse.logging.opentelemetry import (
from synapse.logging.tracing import (
active_span,
set_tag,
set_attribute,
span_context_from_request,
start_active_span,
start_active_span_follows_from,
@@ -309,7 +309,7 @@ class BaseFederationServlet:
raise
# update the active opentracing span with the authenticated entity
set_tag("authenticated_entity", str(origin))
set_attribute("authenticated_entity", str(origin))
# if the origin is authenticated and whitelisted, use its span context
# as the parent.

View File

@@ -36,7 +36,7 @@ from synapse.api.errors import (
RequestSendFailed,
SynapseError,
)
from synapse.logging.opentelemetry import log_kv, set_tag, trace
from synapse.logging.tracing import log_kv, set_attribute, trace
from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
@@ -86,7 +86,7 @@ class DeviceWorkerHandler:
info on each device
"""
set_tag("user_id", user_id)
set_attribute("user_id", user_id)
device_map = await self.store.get_devices_by_user(user_id)
ips = await self.store.get_last_client_ip_by_device(user_id, device_id=None)
@@ -118,8 +118,8 @@ class DeviceWorkerHandler:
ips = await self.store.get_last_client_ip_by_device(user_id, device_id)
_update_device_from_client_ips(device, ips)
set_tag("device", str(device))
set_tag("ips", str(ips))
set_attribute("device", str(device))
set_attribute("ips", str(ips))
return device
@@ -169,8 +169,8 @@ class DeviceWorkerHandler:
joined a room, that `user_id` may be interested in.
"""
set_tag("user_id", user_id)
set_tag("from_token", str(from_token))
set_attribute("user_id", user_id)
set_attribute("from_token", str(from_token))
now_room_key = self.store.get_room_max_token()
room_ids = await self.store.get_rooms_for_user(user_id)
@@ -461,8 +461,8 @@ class DeviceHandler(DeviceWorkerHandler):
except errors.StoreError as e:
if e.code == 404:
# no match
set_tag("error", True)
set_tag("reason", "User doesn't have that device id.")
set_attribute("error", True)
set_attribute("reason", "User doesn't have that device id.")
else:
raise
@@ -794,8 +794,8 @@ class DeviceListUpdater:
for parsing the EDU and adding to pending updates list.
"""
set_tag("origin", origin)
set_tag("edu_content", str(edu_content))
set_attribute("origin", origin)
set_attribute("edu_content", str(edu_content))
user_id = edu_content.pop("user_id")
device_id = edu_content.pop("device_id")
stream_id = str(edu_content.pop("stream_id")) # They may come as ints
@@ -815,7 +815,7 @@ class DeviceListUpdater:
origin,
)
set_tag("error", True)
set_attribute("error", True)
log_kv(
{
"message": "Got a device list update edu from a user and "
@@ -830,7 +830,7 @@ class DeviceListUpdater:
if not room_ids:
# We don't share any rooms with this user. Ignore update, as we
# probably won't get any further updates.
set_tag("error", True)
set_attribute("error", True)
log_kv(
{
"message": "Got an update from a user for which "
@@ -1027,12 +1027,12 @@ class DeviceListUpdater:
# eventually become consistent.
return None
except FederationDeniedError as e:
set_tag("error", True)
set_attribute("error", True)
log_kv({"reason": "FederationDeniedError"})
logger.info(e)
return None
except Exception as e:
set_tag("error", True)
set_attribute("error", True)
log_kv(
{"message": "Exception raised by federation request", "exception": e}
)

View File

@@ -19,11 +19,11 @@ from synapse.api.constants import EduTypes, ToDeviceEventTypes
from synapse.api.errors import SynapseError
from synapse.api.ratelimiting import Ratelimiter
from synapse.logging.context import run_in_background
from synapse.logging.opentelemetry import (
from synapse.logging.tracing import (
SynapseTags,
get_active_span_text_map,
log_kv,
set_tag,
set_attribute,
)
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
from synapse.types import JsonDict, Requester, StreamKeyType, UserID, get_domain_from_id
@@ -217,10 +217,10 @@ class DeviceMessageHandler:
sender_user_id = requester.user.to_string()
message_id = random_string(16)
set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
set_attribute(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
log_kv({"number_of_to_device_messages": len(messages)})
set_tag("sender", sender_user_id)
set_attribute("sender", sender_user_id)
local_messages = {}
remote_messages: Dict[str, Dict[str, Dict[str, JsonDict]]] = {}
for user_id, by_device in messages.items():

View File

@@ -28,7 +28,7 @@ from twisted.internet import defer
from synapse.api.constants import EduTypes
from synapse.api.errors import CodeMessageException, Codes, NotFoundError, SynapseError
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentelemetry import log_kv, set_tag, tag_args, trace
from synapse.logging.tracing import log_kv, set_attribute, tag_args, trace
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
from synapse.types import (
JsonDict,
@@ -138,8 +138,8 @@ class E2eKeysHandler:
else:
remote_queries[user_id] = device_ids
set_tag("local_key_query", str(local_query))
set_tag("remote_key_query", str(remote_queries))
set_attribute("local_key_query", str(local_query))
set_attribute("remote_key_query", str(remote_queries))
# First get local devices.
# A map of destination -> failure response.
@@ -342,8 +342,8 @@ class E2eKeysHandler:
except Exception as e:
failure = _exception_to_failure(e)
failures[destination] = failure
set_tag("error", True)
set_tag("reason", str(failure))
set_attribute("error", True)
set_attribute("reason", str(failure))
return
@@ -405,7 +405,7 @@ class E2eKeysHandler:
Returns:
A map from user_id -> device_id -> device details
"""
set_tag("local_query", str(query))
set_attribute("local_query", str(query))
local_query: List[Tuple[str, Optional[str]]] = []
result_dict: Dict[str, Dict[str, dict]] = {}
@@ -420,7 +420,7 @@ class E2eKeysHandler:
"user_id": user_id,
}
)
set_tag("error", True)
set_attribute("error", True)
raise SynapseError(400, "Not a user here")
if not device_ids:
@@ -477,8 +477,8 @@ class E2eKeysHandler:
domain = get_domain_from_id(user_id)
remote_queries.setdefault(domain, {})[user_id] = one_time_keys
set_tag("local_key_query", str(local_query))
set_tag("remote_key_query", str(remote_queries))
set_attribute("local_key_query", str(local_query))
set_attribute("remote_key_query", str(remote_queries))
results = await self.store.claim_e2e_one_time_keys(local_query)
@@ -494,7 +494,7 @@ class E2eKeysHandler:
@trace
async def claim_client_keys(destination: str) -> None:
set_tag("destination", destination)
set_attribute("destination", destination)
device_keys = remote_queries[destination]
try:
remote_result = await self.federation.claim_client_keys(
@@ -507,8 +507,8 @@ class E2eKeysHandler:
except Exception as e:
failure = _exception_to_failure(e)
failures[destination] = failure
set_tag("error", True)
set_tag("reason", str(failure))
set_attribute("error", True)
set_attribute("reason", str(failure))
await make_deferred_yieldable(
defer.gatherResults(
@@ -611,7 +611,7 @@ class E2eKeysHandler:
result = await self.store.count_e2e_one_time_keys(user_id, device_id)
set_tag("one_time_key_counts", str(result))
set_attribute("one_time_key_counts", str(result))
return {"one_time_key_counts": result}
async def _upload_one_time_keys_for_user(

View File

@@ -25,7 +25,7 @@ from synapse.api.errors import (
StoreError,
SynapseError,
)
from synapse.logging.opentelemetry import log_kv, trace
from synapse.logging.tracing import log_kv, trace
from synapse.storage.databases.main.e2e_room_keys import RoomKey
from synapse.types import JsonDict
from synapse.util.async_helpers import Linearizer

View File

@@ -25,7 +25,12 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import EventBase
from synapse.handlers.relations import BundledAggregations
from synapse.logging.context import current_context
from synapse.logging.opentelemetry import SynapseTags, log_kv, set_tag, start_active_span
from synapse.logging.tracing import (
SynapseTags,
log_kv,
set_attribute,
start_active_span,
)
from synapse.push.clientformat import format_push_rules_for_user
from synapse.storage.databases.main.event_push_actions import NotifCounts
from synapse.storage.roommember import MemberSummary
@@ -396,7 +401,7 @@ class SyncHandler:
sync_config, since_token, full_state
)
set_tag(SynapseTags.SYNC_RESULT, bool(sync_result))
set_attribute(SynapseTags.SYNC_RESULT, bool(sync_result))
return sync_result
async def push_rules_for_user(self, user: UserID) -> Dict[str, Dict[str, list]]:
@@ -1337,7 +1342,7 @@ class SyncHandler:
# `/sync`
message_id = message.pop("message_id", None)
if message_id:
set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
set_attribute(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
logger.debug(
"Returning %d to-device messages between %d and %d (current token: %d)",
@@ -1997,7 +2002,7 @@ class SyncHandler:
upto_token = room_builder.upto_token
with start_active_span("sync.generate_room_entry"):
set_tag("room_id", room_id)
set_attribute("room_id", room_id)
log_kv({"events": len(events or ())})
log_kv(

View File

@@ -75,7 +75,7 @@ from synapse.http import QuieterFileBodyProducer, RequestTimedOutError, redact_u
from synapse.http.proxyagent import ProxyAgent
from synapse.http.types import QueryParams
from synapse.logging.context import make_deferred_yieldable
from synapse.logging.opentelemetry import set_tag, start_active_span, tags
from synapse.logging.tracing import set_attribute, start_active_span, tags
from synapse.types import ISynapseReactor
from synapse.util import json_decoder
from synapse.util.async_helpers import timeout_deferred
@@ -459,8 +459,8 @@ class SimpleHttpClient:
type(e).__name__,
e.args[0],
)
set_tag(tags.ERROR, True)
set_tag("error_reason", e.args[0])
set_attribute(tags.ERROR, True)
set_attribute("error_reason", e.args[0])
raise
async def post_urlencoded_get_json(

View File

@@ -74,7 +74,7 @@ from synapse.http.federation.matrix_federation_agent import MatrixFederationAgen
from synapse.http.types import QueryParams
from synapse.logging import opentelemetry
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentelemetry import set_tag, start_active_span, tags
from synapse.logging.tracing import set_attribute, start_active_span, tags
from synapse.types import JsonDict
from synapse.util import json_decoder
from synapse.util.async_helpers import AwakenableSleeper, timeout_deferred
@@ -614,7 +614,7 @@ class MatrixFederationHttpClient:
request.method, response.code
).inc()
set_tag(tags.HTTP_STATUS_CODE, response.code)
set_attribute(tags.HTTP_STATUS_CODE, response.code)
response_phrase = response.phrase.decode("ascii", errors="replace")
if 200 <= response.code < 300:

View File

@@ -60,7 +60,7 @@ from synapse.api.errors import (
)
from synapse.http.site import SynapseRequest
from synapse.logging.context import defer_to_thread, preserve_fn, run_in_background
from synapse.logging.opentelemetry import active_span, start_active_span, trace_servlet
from synapse.logging.tracing import active_span, start_active_span, trace_servlet
from synapse.util import json_encoder
from synapse.util.caches import intern_dict
from synapse.util.iterutils import chunk_seq

View File

@@ -164,7 +164,9 @@ class SynapseRequest(Request):
# If there's no authenticated entity, it was the requester.
self.logcontext.request.authenticated_entity = authenticated_entity or requester
def set_opentracing_span(self, span: "opentracing.Span") -> None:
def set_opentracing_span(
self, span: opentelemetry.shim.opentracing_shim.SpanShim
) -> None:
"""attach an opentracing span to this request
Doing so will cause the span to be closed when we finish processing the request

View File

@@ -64,7 +64,7 @@ exited. This is usually done by using ``with``.
Forgetting to enter or exit a scope will result in some mysterious and grievous log
context errors.
At anytime where there is an active span ``opentracing.set_tag`` can be used to
At anytime where there is an active span ``opentracing.set_attribute`` can be used to
set a tag on the current active span.
Tracing functions
@@ -102,9 +102,9 @@ To set a tag on the active span do
.. code-block:: python
from synapse.logging.opentracing import set_tag
from synapse.logging.opentracing import set_attribute
set_tag(tag_name, tag_value)
set_attribute(tag_name, tag_value)
There's a convenient decorator to tag all the args of the method. It uses
inspection in order to use the formal parameter names prefixed with 'ARG_' as
@@ -179,6 +179,7 @@ from typing import (
List,
Optional,
Pattern,
Sequence,
Type,
TypeVar,
Union,
@@ -206,7 +207,7 @@ if TYPE_CHECKING:
class _DummyTagNames:
"""wrapper of opentracings tags. We need to have them if we
want to reference them without opentracing around. Clearly they
should never actually show up in a trace. `set_tags` overwrites
should never actually show up in a trace. `set_attributes` overwrites
these with the correct ones."""
INVALID_TAG = "invalid-tag"
@@ -234,14 +235,17 @@ class _DummyTagNames:
SPAN_KIND_RPC_CLIENT = INVALID_TAG
SPAN_KIND_RPC_SERVER = INVALID_TAG
# These dependencies are optional so they can fail to import
# and we
# and we
try:
import opentelemetry
import opentracing
# TODO: tags?
except ImportError:
opentelemetry = None # type: ignore[assignment]
opentracing = None # type: ignore[assignment]
tags = _DummyTagNames # type: ignore[assignment]
@@ -368,7 +372,7 @@ def ensure_active_span(
def init_tracer(hs: "HomeServer") -> None:
"""Set the whitelists and initialise the JaegerClient tracer"""
"""Set the whitelists and initialise the OpenTelemetry tracer"""
global opentelemetry
if not hs.config.tracing.opentelemetry_enabled:
# We don't have a tracer
@@ -393,6 +397,7 @@ def init_tracer(hs: "HomeServer") -> None:
# Sets the global default tracer provider
trace.set_tracer_provider(provider)
# Whitelisting
@@ -429,20 +434,23 @@ def whitelisted_homeserver(destination: str) -> bool:
# Could use kwargs but I want these to be explicit
def start_active_span(
operation_name: str,
child_of: Optional[Union["opentracing.Span", "opentracing.SpanContext"]] = None,
child_of: Optional[
Union[
opentelemetry.shim.opentracing_shim.SpanShim,
opentelemetry.shim.opentracing_shim.SpanContextShim,
]
] = None,
references: Optional[List["opentracing.Reference"]] = None,
tags: Optional[Dict[str, str]] = None,
start_time: Optional[float] = None,
ignore_active_span: bool = False,
finish_on_close: bool = True,
*,
tracer: Optional["opentracing.Tracer"] = None,
) -> "opentracing.Scope":
tracer: Optional[opentelemetry.shim.opentracing_shim.TracerShim] = None,
) -> opentelemetry.shim.opentracing_shim.ScopeShim:
"""Starts an active opentracing span.
Records the start time for the span, and sets it as the "active span" in the
scope manager.
Args:
See opentracing.tracer
Returns:
@@ -454,9 +462,11 @@ def start_active_span(
if tracer is None:
# use the global tracer by default
tracer = opentelemetry.trace.get_tracer(__name__)
otel_tracer = opentelemetry.trace.get_tracer(__name__)
tracerShim = opentelemetry.shim.opentracing_shim.create_tracer(otel_tracer)
tracer = tracerShim
return tracer.start_as_current_span(
return tracer.start_active_span(
operation_name,
child_of=child_of,
references=references,
@@ -470,30 +480,31 @@ def start_active_span(
def start_active_span_follows_from(
operation_name: str,
contexts: Collection,
child_of: Optional[Union["opentracing.Span", "opentracing.SpanContext"]] = None,
child_of: Optional[
Union[
opentelemetry.shim.opentracing_shim.SpanShim,
opentelemetry.shim.opentracing_shim.SpanContextShim,
]
] = None,
start_time: Optional[float] = None,
*,
inherit_force_tracing: bool = False,
tracer: Optional["opentracing.Tracer"] = None,
) -> "opentracing.Scope":
tracer: Optional[opentelemetry.shim.opentracing_shim.TracerShim] = None,
) -> opentelemetry.shim.opentracing_shim.ScopeShim:
"""Starts an active opentracing span, with additional references to previous spans
Args:
operation_name: name of the operation represented by the new span
contexts: the previous spans to inherit from
child_of: optionally override the parent span. If unset, the currently active
span will be the parent. (If there is no currently active span, the first
span in `contexts` will be the parent.)
start_time: optional override for the start time of the created span. Seconds
since the epoch.
inherit_force_tracing: if set, and any of the previous contexts have had tracing
forced, the new span will also have tracing forced.
tracer: override the opentracing tracer. By default the global tracer is used.
"""
if opentracing is None:
if opentelemetry is None:
return contextlib.nullcontext() # type: ignore[unreachable]
references = [opentracing.follows_from(context) for context in contexts]
@@ -521,7 +532,7 @@ def start_active_span_from_edu(
start_time: Optional[float] = None,
ignore_active_span: bool = False,
finish_on_close: bool = True,
) -> "opentracing.Scope":
) -> opentelemetry.shim.opentracing_shim.ScopeShim:
"""
Extracts a span context from an edu and uses it to start a new active span
@@ -533,15 +544,22 @@ def start_active_span_from_edu(
"""
references = references or []
if opentracing is None:
if opentelemetry is None:
return contextlib.nullcontext() # type: ignore[unreachable]
carrier = json_decoder.decode(edu_content.get("context", "{}")).get(
"opentracing", {}
)
context = opentracing.tracer.extract(opentracing.Format.TEXT_MAP, carrier)
otel_tracer = opentelemetry.trace.get_tracer(__name__)
tracerShim = opentelemetry.shim.opentracing_shim.create_tracer(otel_tracer)
context = tracerShim.extract(opentracing.Format.TEXT_MAP, carrier)
_references = [
opentracing.child_of(span_context_from_string(x))
opentracing.Reference(
type=opentracing.ReferenceType.CHILD_OF,
referenced_context=span_context_from_string(x),
)
for x in carrier.get("references", [])
]
@@ -551,7 +569,7 @@ def start_active_span_from_edu(
references += _references
scope = opentracing.tracer.start_active_span(
scope = tracerShim.start_active_span(
operation_name,
child_of=context,
references=references,
@@ -561,41 +579,39 @@ def start_active_span_from_edu(
finish_on_close=finish_on_close,
)
scope.span.set_tag("references", carrier.get("references", []))
scope.span.set_attribute("references", carrier.get("references", []))
return scope
# Opentracing setters for tags, logs, etc
# OpenTelemetry setters for attributes, logs, etc
@only_if_tracing
def active_span() -> Optional["opentracing.Span"]:
def active_span() -> Optional[opentelemetry.trace.span.Span]:
"""Get the currently active span, if any"""
return opentracing.tracer.active_span
return opentelemetry.trace.get_current_span()
@ensure_active_span("set a tag")
def set_tag(key: str, value: Union[str, bool, int, float]) -> None:
def set_attribute(key: str, value: Union[str, bool, int, float]) -> None:
"""Sets a tag on the active span"""
assert opentracing.tracer.active_span is not None
opentracing.tracer.active_span.set_tag(key, value)
active_span = active_span()
assert active_span is not None
active_span.set_attribute(key, value)
@ensure_active_span("log")
def log_kv(key_values: Dict[str, Any], timestamp: Optional[float] = None) -> None:
"""Log to the active span"""
assert opentracing.tracer.active_span is not None
opentracing.tracer.active_span.log_kv(key_values, timestamp)
@ensure_active_span("set the traces operation name")
def set_operation_name(operation_name: str) -> None:
"""Sets the operation name of the active span"""
assert opentracing.tracer.active_span is not None
opentracing.tracer.active_span.set_operation_name(operation_name)
active_span = active_span()
assert active_span is not None
event_name = opentelemetry.ext.opentracing_shim.util.event_name_from_kv(key_values)
active_span.add_event(event_name, timestamp, key_values)
@only_if_tracing
def force_tracing(
span: Union["opentracing.Span", _Sentinel] = _Sentinel.sentinel
span: Union[
opentelemetry.shim.opentracing_shim.SpanShim, _Sentinel
] = _Sentinel.sentinel
) -> None:
"""Force sampling for the active/given span and its children.
@@ -603,14 +619,14 @@ def force_tracing(
span: span to force tracing for. By default, the active span.
"""
if isinstance(span, _Sentinel):
span_to_trace = opentracing.tracer.active_span
span_to_trace = opentelemetry.trace.get_current_span()
else:
span_to_trace = span
if span_to_trace is None:
logger.error("No active span in force_tracing")
return
span_to_trace.set_tag(opentracing.tags.SAMPLING_PRIORITY, 1)
span_to_trace.set_attribute(opentracing.tags.SAMPLING_PRIORITY, 1)
# also set a bit of baggage, so that we have a way of figuring out if
# it is enabled later
@@ -618,7 +634,7 @@ def force_tracing(
def is_context_forced_tracing(
span_context: Optional["opentracing.SpanContext"],
span_context: Optional[opentelemetry.shim.opentracing_shim.SpanContextShim],
) -> bool:
"""Check if sampling has been force for the given span context."""
if span_context is None:
@@ -661,11 +677,15 @@ def inject_header_dict(
if not whitelisted_homeserver(destination):
return
span = opentracing.tracer.active_span
otel_tracer = opentelemetry.trace.get_tracer(__name__)
tracerShim = opentelemetry.shim.opentracing_shim.create_tracer(otel_tracer)
span = tracerShim.active_span
carrier: Dict[str, str] = {}
assert span is not None
opentracing.tracer.inject(span.context, opentracing.Format.HTTP_HEADERS, carrier)
tracerShim.inject(span.context, opentracing.Format.HTTP_HEADERS, carrier)
for key, value in carrier.items():
headers[key.encode()] = [value.encode()]
@@ -673,9 +693,9 @@ def inject_header_dict(
def inject_response_headers(response_headers: Headers) -> None:
"""Inject the current trace id into the HTTP response headers"""
if not opentracing:
if not opentelemetry:
return
span = opentracing.tracer.active_span
span = opentelemetry.trace.get_current_span()
if not span:
return
@@ -709,10 +729,10 @@ def get_active_span_text_map(destination: Optional[str] = None) -> Dict[str, str
return {}
carrier: Dict[str, str] = {}
assert opentracing.tracer.active_span is not None
opentracing.tracer.inject(
opentracing.tracer.active_span.context, opentracing.Format.TEXT_MAP, carrier
)
otel_tracer = opentelemetry.trace.get_tracer(__name__)
tracerShim = opentelemetry.shim.opentracing_shim.create_tracer(otel_tracer)
assert tracerShim.active_span is not None
tracerShim.inject(tracerShim.context, opentracing.Format.TEXT_MAP, carrier)
return carrier
@@ -724,10 +744,12 @@ def active_span_context_as_string() -> str:
The active span context encoded as a string.
"""
carrier: Dict[str, str] = {}
if opentracing:
assert opentracing.tracer.active_span is not None
opentracing.tracer.inject(
opentracing.tracer.active_span.context, opentracing.Format.TEXT_MAP, carrier
if opentelemetry:
otel_tracer = opentelemetry.trace.get_tracer(__name__)
tracerShim = opentelemetry.shim.opentracing_shim.create_tracer(otel_tracer)
assert tracerShim.active_span is not None
tracerShim.inject(
tracerShim.active_span.context, opentracing.Format.TEXT_MAP, carrier
)
return json_encoder.encode(carrier)
@@ -747,7 +769,9 @@ def span_context_from_request(request: Request) -> "Optional[opentracing.SpanCon
@only_if_tracing
def span_context_from_string(carrier: str) -> Optional["opentracing.SpanContext"]:
def span_context_from_string(
carrier: str,
) -> Optional[opentelemetry.shim.opentracing_shim.SpanContextShim]:
"""
Returns:
The active span context decoded from a string.
@@ -757,7 +781,9 @@ def span_context_from_string(carrier: str) -> Optional["opentracing.SpanContext"
@only_if_tracing
def extract_text_map(carrier: Dict[str, str]) -> Optional["opentracing.SpanContext"]:
def extract_text_map(
carrier: Dict[str, str]
) -> Optional[opentelemetry.shim.opentracing_shim.SpanContextShim]:
"""
Wrapper method for opentracing's tracer.extract for TEXT_MAP.
Args:
@@ -859,9 +885,9 @@ def tag_args(func: Callable[P, R]) -> Callable[P, R]:
def _tag_args_inner(*args: P.args, **kwargs: P.kwargs) -> R:
argspec = inspect.getfullargspec(func)
for i, arg in enumerate(argspec.args[1:]):
set_tag("ARG_" + arg, args[i]) # type: ignore[index]
set_tag("args", args[len(argspec.args) :]) # type: ignore[index]
set_tag("kwargs", str(kwargs))
set_attribute("ARG_" + arg, args[i]) # type: ignore[index]
set_attribute("args", args[len(argspec.args) :]) # type: ignore[index]
set_attribute("kwargs", str(kwargs))
return func(*args, **kwargs)
return _tag_args_inner
@@ -909,7 +935,7 @@ def trace_servlet(
finally:
# We set the operation name again in case its changed (which happens
# with JsonResource).
scope.span.set_operation_name(request.request_metrics.name)
scope.span.update_name(request.request_metrics.name)
# set the tags *after* the servlet completes, in case it decided to
# prioritise the span (tags will get dropped on unprioritised spans)
@@ -918,4 +944,4 @@ def trace_servlet(
] = request.request_metrics.start_context.tag
for k, v in request_tags.items():
scope.span.set_tag(k, v)
scope.span.set_attribute(k, v)

View File

@@ -42,7 +42,7 @@ from synapse.logging.context import (
LoggingContext,
PreserveLoggingContext,
)
from synapse.logging.opentelemetry import SynapseTags, start_active_span
from synapse.logging.tracing import SynapseTags, start_active_span
from synapse.metrics._types import Collector
if TYPE_CHECKING:

View File

@@ -39,7 +39,7 @@ from synapse.events import EventBase
from synapse.handlers.presence import format_user_presence_state
from synapse.logging import issue9533_logger
from synapse.logging.context import PreserveLoggingContext
from synapse.logging.opentelemetry import log_kv, start_active_span
from synapse.logging.tracing import log_kv, start_active_span
from synapse.metrics import LaterGauge
from synapse.streams.config import PaginationConfig
from synapse.types import (

View File

@@ -29,7 +29,7 @@ from synapse.http import RequestTimedOutError
from synapse.http.server import HttpServer, is_method_cancellable
from synapse.http.site import SynapseRequest
from synapse.logging import opentelemetry
from synapse.logging.opentelemetry import trace_with_opname
from synapse.logging.tracing import trace_with_opname
from synapse.types import JsonDict
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.stringutils import random_string

View File

@@ -26,7 +26,7 @@ from synapse.http.servlet import (
parse_string,
)
from synapse.http.site import SynapseRequest
from synapse.logging.opentelemetry import log_kv, set_tag, trace_with_opname
from synapse.logging.tracing import log_kv, set_attribute, trace_with_opname
from synapse.types import JsonDict, StreamToken
from ._base import client_patterns, interactive_auth_handler
@@ -88,7 +88,7 @@ class KeyUploadServlet(RestServlet):
user_id
)
if dehydrated_device is not None and device_id != dehydrated_device[0]:
set_tag("error", True)
set_attribute("error", True)
log_kv(
{
"message": "Client uploading keys for a different device",
@@ -204,13 +204,13 @@ class KeyChangesServlet(RestServlet):
requester = await self.auth.get_user_by_req(request, allow_guest=True)
from_token_string = parse_string(request, "from", required=True)
set_tag("from", from_token_string)
set_attribute("from", from_token_string)
# We want to enforce they do pass us one, but we ignore it and return
# changes after the "to" as well as before.
#
# XXX This does not enforce that "to" is passed.
set_tag("to", str(parse_string(request, "to")))
set_attribute("to", str(parse_string(request, "to")))
from_token = await StreamToken.from_string(self.store, from_token_string)

View File

@@ -24,7 +24,7 @@ from synapse.http.servlet import (
parse_strings_from_args,
)
from synapse.http.site import SynapseRequest
from synapse.logging.opentelemetry import set_tag
from synapse.logging.tracing import set_attribute
from synapse.rest.client.transactions import HttpTransactionCache
from synapse.types import JsonDict, RoomAlias, RoomID
@@ -97,7 +97,7 @@ class KnockRoomAliasServlet(RestServlet):
def on_PUT(
self, request: SynapseRequest, room_identifier: str, txn_id: str
) -> Awaitable[Tuple[int, JsonDict]]:
set_tag("txn_id", txn_id)
set_attribute("txn_id", txn_id)
return self.txns.fetch_or_execute_request(
request, self.on_POST, request, room_identifier, txn_id

View File

@@ -46,7 +46,7 @@ from synapse.http.servlet import (
parse_strings_from_args,
)
from synapse.http.site import SynapseRequest
from synapse.logging.opentelemetry import set_tag
from synapse.logging.tracing import set_attribute
from synapse.rest.client._base import client_patterns
from synapse.rest.client.transactions import HttpTransactionCache
from synapse.storage.state import StateFilter
@@ -82,7 +82,7 @@ class RoomCreateRestServlet(TransactionRestServlet):
def on_PUT(
self, request: SynapseRequest, txn_id: str
) -> Awaitable[Tuple[int, JsonDict]]:
set_tag("txn_id", txn_id)
set_attribute("txn_id", txn_id)
return self.txns.fetch_or_execute_request(request, self.on_POST, request)
async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
@@ -194,7 +194,7 @@ class RoomStateEventRestServlet(TransactionRestServlet):
requester = await self.auth.get_user_by_req(request, allow_guest=True)
if txn_id:
set_tag("txn_id", txn_id)
set_attribute("txn_id", txn_id)
content = parse_json_object_from_request(request)
@@ -229,7 +229,7 @@ class RoomStateEventRestServlet(TransactionRestServlet):
except ShadowBanError:
event_id = "$" + random_string(43)
set_tag("event_id", event_id)
set_attribute("event_id", event_id)
ret = {"event_id": event_id}
return 200, ret
@@ -279,7 +279,7 @@ class RoomSendEventRestServlet(TransactionRestServlet):
except ShadowBanError:
event_id = "$" + random_string(43)
set_tag("event_id", event_id)
set_attribute("event_id", event_id)
return 200, {"event_id": event_id}
def on_GET(
@@ -290,7 +290,7 @@ class RoomSendEventRestServlet(TransactionRestServlet):
def on_PUT(
self, request: SynapseRequest, room_id: str, event_type: str, txn_id: str
) -> Awaitable[Tuple[int, JsonDict]]:
set_tag("txn_id", txn_id)
set_attribute("txn_id", txn_id)
return self.txns.fetch_or_execute_request(
request, self.on_POST, request, room_id, event_type, txn_id
@@ -348,7 +348,7 @@ class JoinRoomAliasServlet(ResolveRoomIdMixin, TransactionRestServlet):
def on_PUT(
self, request: SynapseRequest, room_identifier: str, txn_id: str
) -> Awaitable[Tuple[int, JsonDict]]:
set_tag("txn_id", txn_id)
set_attribute("txn_id", txn_id)
return self.txns.fetch_or_execute_request(
request, self.on_POST, request, room_identifier, txn_id
@@ -816,7 +816,7 @@ class RoomForgetRestServlet(TransactionRestServlet):
def on_PUT(
self, request: SynapseRequest, room_id: str, txn_id: str
) -> Awaitable[Tuple[int, JsonDict]]:
set_tag("txn_id", txn_id)
set_attribute("txn_id", txn_id)
return self.txns.fetch_or_execute_request(
request, self.on_POST, request, room_id, txn_id
@@ -916,7 +916,7 @@ class RoomMembershipRestServlet(TransactionRestServlet):
def on_PUT(
self, request: SynapseRequest, room_id: str, membership_action: str, txn_id: str
) -> Awaitable[Tuple[int, JsonDict]]:
set_tag("txn_id", txn_id)
set_attribute("txn_id", txn_id)
return self.txns.fetch_or_execute_request(
request, self.on_POST, request, room_id, membership_action, txn_id
@@ -962,13 +962,13 @@ class RoomRedactEventRestServlet(TransactionRestServlet):
except ShadowBanError:
event_id = "$" + random_string(43)
set_tag("event_id", event_id)
set_attribute("event_id", event_id)
return 200, {"event_id": event_id}
def on_PUT(
self, request: SynapseRequest, room_id: str, event_id: str, txn_id: str
) -> Awaitable[Tuple[int, JsonDict]]:
set_tag("txn_id", txn_id)
set_attribute("txn_id", txn_id)
return self.txns.fetch_or_execute_request(
request, self.on_POST, request, room_id, event_id, txn_id

View File

@@ -19,7 +19,7 @@ from synapse.http import servlet
from synapse.http.server import HttpServer
from synapse.http.servlet import assert_params_in_dict, parse_json_object_from_request
from synapse.http.site import SynapseRequest
from synapse.logging.opentelemetry import set_tag, trace_with_opname
from synapse.logging.tracing import set_attribute, trace_with_opname
from synapse.rest.client.transactions import HttpTransactionCache
from synapse.types import JsonDict
@@ -47,8 +47,8 @@ class SendToDeviceRestServlet(servlet.RestServlet):
def on_PUT(
self, request: SynapseRequest, message_type: str, txn_id: str
) -> Awaitable[Tuple[int, JsonDict]]:
set_tag("message_type", message_type)
set_tag("txn_id", txn_id)
set_attribute("message_type", message_type)
set_attribute("txn_id", txn_id)
return self.txns.fetch_or_execute_request(
request, self._put, request, message_type, txn_id
)

View File

@@ -37,7 +37,7 @@ from synapse.handlers.sync import (
from synapse.http.server import HttpServer
from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string
from synapse.http.site import SynapseRequest
from synapse.logging.opentelemetry import trace_with_opname
from synapse.logging.tracing import trace_with_opname
from synapse.types import JsonDict, StreamToken
from synapse.util import json_decoder

View File

@@ -27,7 +27,7 @@ from typing import (
)
from synapse.logging import issue9533_logger
from synapse.logging.opentelemetry import log_kv, set_tag, trace
from synapse.logging.tracing import log_kv, set_attribute, trace
from synapse.replication.tcp.streams import ToDeviceStream
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import (
@@ -436,7 +436,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
(user_id, device_id), None
)
set_tag("last_deleted_stream_id", str(last_deleted_stream_id))
set_attribute("last_deleted_stream_id", str(last_deleted_stream_id))
if last_deleted_stream_id:
has_changed = self._device_inbox_stream_cache.has_entity_changed(
@@ -485,10 +485,10 @@ class DeviceInboxWorkerStore(SQLBaseStore):
A list of messages for the device and where in the stream the messages got to.
"""
set_tag("destination", destination)
set_tag("last_stream_id", last_stream_id)
set_tag("current_stream_id", current_stream_id)
set_tag("limit", limit)
set_attribute("destination", destination)
set_attribute("last_stream_id", last_stream_id)
set_attribute("current_stream_id", current_stream_id)
set_attribute("limit", limit)
has_changed = self._device_federation_outbox_stream_cache.has_entity_changed(
destination, last_stream_id

View File

@@ -32,9 +32,9 @@ from typing_extensions import Literal
from synapse.api.constants import EduTypes
from synapse.api.errors import Codes, StoreError
from synapse.logging.opentelemetry import (
from synapse.logging.tracing import (
get_active_span_text_map,
set_tag,
set_attribute,
trace,
whitelisted_homeserver,
)
@@ -706,8 +706,8 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore):
else:
results[user_id] = await self.get_cached_devices_for_user(user_id)
set_tag("in_cache", str(results))
set_tag("not_in_cache", str(user_ids_not_in_cache))
set_attribute("in_cache", str(results))
set_attribute("not_in_cache", str(user_ids_not_in_cache))
return user_ids_not_in_cache, results

View File

@@ -18,7 +18,7 @@ from typing import Dict, Iterable, Mapping, Optional, Tuple, cast
from typing_extensions import Literal, TypedDict
from synapse.api.errors import StoreError
from synapse.logging.opentelemetry import log_kv, trace
from synapse.logging.tracing import log_kv, trace
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import LoggingTransaction
from synapse.types import JsonDict, JsonSerializable, StreamKeyType

View File

@@ -36,7 +36,7 @@ from synapse.appservice import (
TransactionOneTimeKeyCounts,
TransactionUnusedFallbackKeys,
)
from synapse.logging.opentelemetry import log_kv, set_tag, trace
from synapse.logging.tracing import log_kv, set_attribute, trace
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import (
DatabasePool,
@@ -146,7 +146,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
key data. The key data will be a dict in the same format as the
DeviceKeys type returned by POST /_matrix/client/r0/keys/query.
"""
set_tag("query_list", str(query_list))
set_attribute("query_list", str(query_list))
if not query_list:
return {}
@@ -228,8 +228,8 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
Dict mapping from user-id to dict mapping from device_id to
key data.
"""
set_tag("include_all_devices", include_all_devices)
set_tag("include_deleted_devices", include_deleted_devices)
set_attribute("include_all_devices", include_all_devices)
set_attribute("include_deleted_devices", include_deleted_devices)
result = await self.db_pool.runInteraction(
"get_e2e_device_keys",
@@ -416,9 +416,9 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
"""
def _add_e2e_one_time_keys(txn: LoggingTransaction) -> None:
set_tag("user_id", user_id)
set_tag("device_id", device_id)
set_tag("new_keys", str(new_keys))
set_attribute("user_id", user_id)
set_attribute("device_id", device_id)
set_attribute("new_keys", str(new_keys))
# We are protected from race between lookup and insertion due to
# a unique constraint. If there is a race of two calls to
# `add_e2e_one_time_keys` then they'll conflict and we will only
@@ -1158,10 +1158,10 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
"""
def _set_e2e_device_keys_txn(txn: LoggingTransaction) -> bool:
set_tag("user_id", user_id)
set_tag("device_id", device_id)
set_tag("time_now", time_now)
set_tag("device_keys", str(device_keys))
set_attribute("user_id", user_id)
set_attribute("device_id", device_id)
set_attribute("time_now", time_now)
set_attribute("device_keys", str(device_keys))
old_key_json = self.db_pool.simple_select_one_onecol_txn(
txn,

View File

@@ -29,7 +29,7 @@ import attr
from twisted.internet import defer
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentelemetry import (
from synapse.logging.tracing import (
active_span,
start_active_span,
start_active_span_follows_from,

View File

@@ -22,10 +22,7 @@ from synapse.logging.context import (
make_deferred_yieldable,
run_in_background,
)
from synapse.logging.opentelemetry import (
start_active_span,
start_active_span_follows_from,
)
from synapse.logging.tracing import start_active_span, start_active_span_follows_from
from synapse.util import Clock
try: