
Compare commits


50 Commits

Author SHA1 Message Date
Jorik Schellekens 33789fc83e I wish python had a good type system. 2019-07-17 16:53:08 +01:00
Jorik Schellekens 794f1e6d8b Use better decorator names. 2019-07-17 15:39:12 +01:00
Jorik Schellekens 7027cfc771 Trailing .d 2019-07-17 14:39:47 +01:00
Jorik Schellekens 7f282d17e6 Unbreak json parsing. 2019-07-17 14:37:25 +01:00
Jorik Schellekens 51e5a12c3a Make sure there is an active span here. 2019-07-17 14:37:25 +01:00
Jorik Schellekens 844aac1d1f newsfile 2019-07-17 14:37:19 +01:00
Jorik Schellekens 5b104b0d02 Add user _id 2019-07-17 14:33:00 +01:00
Jorik Schellekens 310fb51c7d Better args wrapper 2019-07-17 14:33:00 +01:00
Jorik Schellekens 0e8c35c8e8 A little extra device_list tracing 2019-07-17 14:33:00 +01:00
Jorik Schellekens 1197e2e97b Though style is subjective it depends on a ruthless objectivity: you either have it, or you don't. 2019-07-17 14:33:00 +01:00
Jorik Schellekens eb7a79553c Isort of ran out of puns for this one. 2019-07-17 14:33:00 +01:00
Jorik Schellekens 9f599fcaef Nicer tracing 2019-07-17 14:33:00 +01:00
Jorik Schellekens 1e5f5c2ccf Trace key claiming 2019-07-17 14:33:00 +01:00
Jorik Schellekens f6ce4fec23 Cleanup key upload tracing 2019-07-17 14:33:00 +01:00
Jorik Schellekens 5ca5845284 Clean up room key tracing 2019-07-17 14:33:00 +01:00
Jorik Schellekens 52a7f625a8 Some tracing 2019-07-17 14:33:00 +01:00
Jorik Schellekens 783ddc417f Opentracing across streams 2019-07-17 14:33:00 +01:00
Jorik Schellekens 87061c72c4 The great logging/ migration 2019-07-17 14:33:00 +01:00
Jorik Schellekens fbe23fe32d These functions were not deferreds! 2019-07-17 14:33:00 +01:00
Jorik Schellekens 5aaee551a4 How did that half of the statement get deleted? 2019-07-17 14:33:00 +01:00
Jorik Schellekens 2e141b8177 typo 2019-07-17 14:33:00 +01:00
Jorik Schellekens d5b9c97943 Include servletname in incoming-request trace 2019-07-17 14:33:00 +01:00
Jorik Schellekens 34e1de3374 Update to new access pattern 2019-07-17 14:33:00 +01:00
Jorik Schellekens 4e649702ec Trace device messages. 2019-07-17 14:33:00 +01:00
Jorik Schellekens 8c17b7a8da Trace devices 2019-07-17 14:33:00 +01:00
Jorik Schellekens 65c63f983c Trace more e2e stuff and less e2e stuff 2019-07-17 14:33:00 +01:00
Jorik Schellekens 9170c46af9 Fix e2e bugs 2019-07-17 14:33:00 +01:00
Jorik Schellekens 32342d0ec2 Trace e2e 2019-07-17 14:33:00 +01:00
Jorik Schellekens 07fadd4e9b Cleanup 2019-07-17 14:24:04 +01:00
Jorik Schellekens 3e5f4a40de Unbreak json parsing. 2019-07-17 14:22:55 +01:00
Jorik Schellekens bf892473f9 Noop if opentracing is disabled 2019-07-17 14:22:16 +01:00
Jorik Schellekens 75d489f2f3 Better args wrapper 2019-07-17 14:22:14 +01:00
Jorik Schellekens 8f0a00ef35 Fix codestyle 2019-07-17 14:20:50 +01:00
Jorik Schellekens 4c2f39cce4 Isort opentracing 2019-07-17 14:20:50 +01:00
Jorik Schellekens aca1141a62 Rebase doctoring 2019-07-17 14:20:48 +01:00
Jorik Schellekens 7c6bfaed11 Sectioning comments 2019-07-17 14:20:22 +01:00
Jorik Schellekens 62da3aa0af Move whitelisting 2019-07-17 14:20:22 +01:00
Jorik Schellekens 04c3bf430c Move opentracing setters 2019-07-17 14:20:22 +01:00
Jorik Schellekens 4c3e592e39 Group utils 2019-07-17 14:20:22 +01:00
Jorik Schellekens 4d54425b9d Move dummy tags 2019-07-17 14:20:22 +01:00
Jorik Schellekens 54211f5435 Typos and errata 2019-07-17 14:20:22 +01:00
Jorik Schellekens d67376f7fc Better decorator names 2019-07-17 14:20:22 +01:00
Jorik Schellekens 854d75c41b Better inner function names 2019-07-17 14:20:20 +01:00
Jorik Schellekens d0dd14ccee Remove unused function. 2019-07-17 14:18:31 +01:00
Jorik Schellekens c9fa681937 Opentracing survival guide 2019-07-17 14:18:31 +01:00
Jorik Schellekens 161b8077f0 Typo 2019-07-17 14:18:31 +01:00
Jorik Schellekens 55a8ee509a Context and edu utils 2019-07-17 14:18:31 +01:00
Jorik Schellekens 342840f884 Use the new clean contexts 2019-07-17 14:18:29 +01:00
Jorik Schellekens 64c6eee389 Try catch wrapping to make sure the span is finished 2019-07-17 14:17:22 +01:00
Jorik Schellekens 2006f7ae94 Add decerators for tracing functions 2019-07-15 10:21:21 +01:00
20 changed files with 981 additions and 208 deletions
+1
@@ -0,0 +1 @@
Opentrace e2e code paths.
+179
@@ -0,0 +1,179 @@
===========
Opentracing
===========

Background
----------

Opentracing is a semi-standard being adopted by a number of distributed
tracing platforms. It is a standardised API for facilitating vendor-agnostic
tracing instrumentation. That is, we can use the opentracing API and select
one of a number of tracer implementations to do the heavy lifting in the
background. Our current selected implementation is Jaeger.

Opentracing concepts can be found at
https://opentracing.io/docs/overview/what-is-tracing/

Python-specific tracing concepts are at
https://opentracing.io/guides/python/.

Note that synapse wraps opentracing in a small library in order to make the
opentracing dependency optional. That means that the access patterns are
different to those demonstrated here. However, it is still useful to know,
especially if opentracing is included as a full dependency in the future or
if you are modifying synapse's opentracing lib.

For more information about Jaeger's implementation see
https://www.jaegertracing.io/docs/

=================
Setup opentracing
=================

To receive opentracing spans, start up a Jaeger server using docker like so:

.. code-block:: bash

   docker run -d --name jaeger \
     -e COLLECTOR_ZIPKIN_HTTP_PORT=9411 \
     -p 5775:5775/udp \
     -p 6831:6831/udp \
     -p 6832:6832/udp \
     -p 5778:5778 \
     -p 16686:16686 \
     -p 14268:14268 \
     -p 9411:9411 \
     jaegertracing/all-in-one:1.13

The latest documentation is probably at
https://www.jaegertracing.io/docs/1.13/getting-started/

Enable opentracing in synapse
-----------------------------

Opentracing is not enabled by default. It must be enabled in the homeserver
config by uncommenting the config options under ``opentracing``. For example:

.. code-block:: yaml

   opentracing:
     # Enable / disable tracer
     tracer_enabled: true
     # The list of homeservers we wish to expose our current traces to.
     # The list is a list of regexes which are matched against the
     # servername of the homeserver.
     homeserver_whitelist:
       - ".*"

Homeserver whitelisting
-----------------------

The homeserver whitelist is configured using regexes. A list of regexes can
be given and their union will be compared when propagating any spans through
a carrier. Most of the whitelist checks are encapsulated in the lib's
injection and extraction methods, but be aware that using custom carriers or
crossing uncharted waters will require the enforcement of this whitelist.

``logging/opentracing.py`` has a ``whitelisted_homeserver`` method which
takes in a destination and compares it to the whitelist.
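As a rough illustration of that check (a sketch, not the lib's actual implementation — the example regexes and the compiled-union approach are assumptions), the union of the configured regexes can be matched against a destination like this:

```python
import re

# Example whitelist, as it might appear in the homeserver config.
# Both entries here are illustrative values, not defaults.
homeserver_whitelist = [r".*\.example\.com", "localhost"]

# Compile the union of the regexes once, as a single alternation.
_whitelist_re = re.compile(
    "|".join("(?:%s)" % regex for regex in homeserver_whitelist)
)


def whitelisted_homeserver(destination):
    """Return True if `destination` matches any whitelisted regex."""
    return _whitelist_re.fullmatch(destination) is not None
```

Anything propagating spans through a custom carrier would call this before injecting a context destined for another homeserver.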
============================
Using opentracing in synapse
============================

Access to the opentracing API is mediated through the
``logging/opentracing.py`` lib. Opentracing is encapsulated such that no
stateful spans from opentracing are used in synapse's code. This allows
opentracing to be easily disabled in synapse and thereby have opentracing as
an optional dependency. This does, however, limit the number of modifiable
spans at any point in the code to one. From here on, references to
opentracing refer to the lib implemented in synapse.

Tracing
-------

In synapse it is not possible to start a non-active span. Spans can be
started using the ``opentracing.start_active_span`` method. This returns a
scope (see the opentracing docs) which is a context manager that needs to be
entered and exited. This is usually done by using ``with``.

.. code-block:: python

   with start_active_span("operation name"):
       # Do something we want to trace

Forgetting to enter or exit a scope will result in some mysterious and
grievous log context errors.

At any time where there is an active span, ``opentracing.set_tag`` can be
used to set a tag on the current active span.

Tracing functions
-----------------

Functions can be easily traced using decorators. There is a decorator for
'normal' functions and one for functions which return deferreds. The name of
the function becomes the operation name for the span.

.. code-block:: python

   # Start a span using 'normal_function' as the operation name
   @trace
   def normal_function(*args, **kwargs):
       # Does all kinds of cool and expected things
       return something_usual_and_useful

   # Start a span using 'deferred_function' as the operation name
   @trace_deferred
   # Yes, there is a typo in the lib. I will fix this
   def deferred_function(*args, **kwargs):
       # We start
       yield we_wait
       # we finish
       defer.returnValue(something_usual_and_useful)

Operation names can be explicitly set for functions by using
``trace_using_operation_name`` and ``trace_deferred_using_operation_name``:

.. code-block:: python

   @trace_using_operation_name("A *much* better operation name")
   def normal_function(*args, **kwargs):
       # Does all kinds of cool and expected things
       return something_usual_and_useful

   @trace_deferred_using_operation_name("An operation name that fixes the typo!")
   # Yes, there is a typo in the lib. I will fix this
   def deferred_function(*args, **kwargs):
       # We start
       yield we_wait
       # we finish
       defer.returnValue(something_usual_and_useful)

Contexts and carriers
---------------------

There is a selection of wrappers provided for injecting and extracting
contexts from carriers. Unfortunately opentracing's standard three are not
adequate in the majority of cases. Also note that the binary encoding format
mandated by opentracing is not actually implemented by Jaeger, which will
silently noop instead.

Please refer to the end of ``logging/opentracing.py`` for the available
injection and extraction methods.
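The shape of the text-map round trip (as used for EDU contexts elsewhere in this changeset) can be sketched with plain dicts. The two helpers below are simplified stand-ins for the lib's ``inject_active_span_text_map`` and ``extract_text_map`` — the real ones delegate to the tracer and honour the homeserver whitelist — and the trace id is a made-up example value:

```python
import json

# Stand-in for inject_active_span_text_map: writes the active span's
# context into the carrier dict. Jaeger's text-map format keys the
# context as 'uber-trace-id'; the value here is a fake example.
def inject_active_span_text_map(carrier):
    carrier["uber-trace-id"] = "example-trace-id:example-span-id:0:1"

# Stand-in for extract_text_map: reads the context back out.
def extract_text_map(carrier):
    return carrier.get("uber-trace-id")

# Sending side: inject the context and serialise it into the EDU content.
context = {"opentracing": {}}
inject_active_span_text_map(context["opentracing"])
edu_content = {"context": json.dumps(context)}

# Receiving side: parse the carrier out of the EDU and extract the context.
carrier = json.loads(edu_content["context"]).get("opentracing", {})
recovered = extract_text_map(carrier)
```

A received context like this is what ``start_active_span_from_edu`` builds on when it links the handler's span back to the sender.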
==================
Configuring Jaeger
==================

Sampling strategies can be set as in this document:
https://www.jaegertracing.io/docs/1.13/sampling/

=======
Gotchas
=======

- Checking whitelists on span propagation
- Inserting PII
- Forgetting to enter or exit a scope
- Span source: make sure that the span you expect to be active across a
  function call really will be that one. Does the current function have more
  than one caller? Will all of those calling functions be in a context with
  an active span?
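The scope gotcha comes down to context-manager discipline. A minimal stand-in scope (illustrative only, not the lib's real scope class, which also restores the previously active span) shows why ``with`` is the safe pattern: ``__exit__`` always runs, so the span is finished even when the traced code raises.

```python
# A minimal stand-in for an opentracing scope (illustrative only).
class FakeScope:
    def __init__(self):
        self.finished = False

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Always runs, so the span is finished even on error.
        self.finished = True
        return False  # do not swallow the exception

scope = FakeScope()
try:
    with scope:
        raise RuntimeError("traced code blew up")
except RuntimeError:
    pass

# scope.finished is now True despite the exception.
```

Entering a scope manually and returning early (or raising) before exiting it is exactly what produces the mysterious log context errors mentioned above.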
+11
@@ -22,6 +22,7 @@ from netaddr import IPAddress
from twisted.internet import defer
import synapse.logging.opentracing as opentracing
import synapse.types
from synapse import event_auth
from synapse.api.constants import EventTypes, JoinRules, Membership
@@ -178,6 +179,7 @@ class Auth(object):
def get_public_keys(self, invite_event):
return event_auth.get_public_keys(invite_event)
@opentracing.trace_deferred
@defer.inlineCallbacks
def get_user_by_req(
self, request, allow_guest=False, rights="access", allow_expired=False
@@ -209,6 +211,10 @@ class Auth(object):
user_id, app_service = yield self._get_appservice_user_id(request)
if user_id:
request.authenticated_entity = user_id
opentracing.set_tag("authenticated_entity", user_id)
# there is at least one other place where authenticated entity is
# set. user_id is tagged incase authenticated_entity is clobbered
opentracing.set_tag("user_id", user_id)
if ip_addr and self.hs.config.track_appservice_user_ips:
yield self.store.insert_client_ip(
@@ -262,6 +268,11 @@ class Auth(object):
request.authenticated_entity = user.to_string()
opentracing.set_tag("authenticated_entity", user.to_string())
# there is at least one other place where authenticated entity is
# set. user_id is tagged incase authenticated_entity is clobbered
opentracing.set_tag("user_id", user.to_string())
defer.returnValue(
synapse.types.create_requester(
user, token_id, is_guest, device_id, app_service=app_service
+12 -6
@@ -25,6 +25,7 @@ from twisted.internet import defer
from twisted.internet.abstract import isIPAddress
from twisted.python import failure
import synapse.logging.opentracing as opentracing
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import (
AuthError,
@@ -518,6 +519,7 @@ class FederationServer(FederationBase):
def on_query_user_devices(self, origin, user_id):
return self.on_query_request("user_devices", user_id)
@opentracing.trace_deferred
@defer.inlineCallbacks
@log_function
def on_claim_client_keys(self, origin, content):
@@ -526,6 +528,9 @@ class FederationServer(FederationBase):
for device_id, algorithm in device_keys.items():
query.append((user_id, device_id, algorithm))
opentracing.log_kv(
{"message": "Claiming one time keys.", "user, device pairs": query}
)
results = yield self.store.claim_e2e_one_time_keys(query)
json_result = {}
@@ -821,12 +826,13 @@ class FederationHandlerRegistry(object):
if not handler:
logger.warn("No handler registered for EDU type %s", edu_type)
try:
yield handler(origin, content)
except SynapseError as e:
logger.info("Failed to handle edu %r: %r", edu_type, e)
except Exception:
logger.exception("Failed to handle edu %r", edu_type)
with opentracing.start_active_span_from_edu(content, "handle_edu"):
try:
yield handler(origin, content)
except SynapseError as e:
logger.info("Failed to handle edu %r: %r", edu_type, e)
except Exception:
logger.exception("Failed to handle edu %r", edu_type)
def on_query(self, query_type, args):
handler = self.query_handlers.get(query_type)
@@ -16,10 +16,12 @@
import datetime
import logging
from canonicaljson import json
from prometheus_client import Counter
from twisted.internet import defer
import synapse.logging.opentracing as opentracing
from synapse.api.errors import (
FederationDeniedError,
HttpResponseException,
@@ -204,97 +206,142 @@ class PerDestinationQueue(object):
pending_edus = device_update_edus + to_device_edus
# BEGIN CRITICAL SECTION
#
# In order to avoid a race condition, we need to make sure that
# the following code (from popping the queues up to the point
# where we decide if we actually have any pending messages) is
# atomic - otherwise new PDUs or EDUs might arrive in the
# meantime, but not get sent because we hold the
# transmission_loop_running flag.
# Make a transaction sending span, this span follows on from all the
# edus in that transaction. This needs to be done because if the edus
# are never received on the remote the span effectively has no causality.
pending_pdus = self._pending_pdus
# We can only include at most 50 PDUs per transactions
pending_pdus, self._pending_pdus = pending_pdus[:50], pending_pdus[50:]
pending_edus.extend(self._get_rr_edus(force_flush=False))
pending_presence = self._pending_presence
self._pending_presence = {}
if pending_presence:
pending_edus.append(
Edu(
origin=self._server_name,
destination=self._destination,
edu_type="m.presence",
content={
"push": [
format_user_presence_state(
presence, self._clock.time_msec()
)
for presence in pending_presence.values()
]
},
)
span_contexts = [
opentracing.extract_text_map(
json.loads(
edu.get_dict().get("content", {}).get("context", "{}")
).get("opentracing", {})
)
for edu in pending_edus
]
pending_edus.extend(
self._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus))
)
while (
len(pending_edus) < MAX_EDUS_PER_TRANSACTION
and self._pending_edus_keyed
with opentracing.start_active_span_follows_from(
"send_transaction", span_contexts
):
_, val = self._pending_edus_keyed.popitem()
pending_edus.append(val)
# Link each sent edu to this transaction's span
_pending_edus = []
for edu in pending_edus:
edu_dict = edu.get_dict()
span_context = json.loads(
edu_dict.get("content", {}).get("context", "{}")
).get("opentracing", {})
# If there is no span context then we are either blacklisting
# this destination or we are not tracing
if not span_context == {}:
if "references" not in span_context:
span_context["references"] = [
opentracing.active_span_context_as_string()
]
else:
span_context["references"].append(
opentracing.active_span_context_as_string()
)
edu_dict["content"]["context"] = json.dumps(
{"opentracing": span_context}
)
_pending_edus.append(Edu(**edu_dict))
pending_edus = _pending_edus
if pending_pdus:
logger.debug(
"TX [%s] len(pending_pdus_by_dest[dest]) = %d",
self._destination,
len(pending_pdus),
# BEGIN CRITICAL SECTION
#
# In order to avoid a race condition, we need to make sure that
# the following code (from popping the queues up to the point
# where we decide if we actually have any pending messages) is
# atomic - otherwise new PDUs or EDUs might arrive in the
# meantime, but not get sent because we hold the
# transmission_loop_running flag.
pending_pdus = self._pending_pdus
# We can only include at most 50 PDUs per transactions
pending_pdus, self._pending_pdus = (
pending_pdus[:50],
pending_pdus[50:],
)
if not pending_pdus and not pending_edus:
logger.debug("TX [%s] Nothing to send", self._destination)
self._last_device_stream_id = device_stream_id
return
# if we've decided to send a transaction anyway, and we have room, we
# may as well send any pending RRs
if len(pending_edus) < MAX_EDUS_PER_TRANSACTION:
pending_edus.extend(self._get_rr_edus(force_flush=True))
# END CRITICAL SECTION
success = yield self._transaction_manager.send_new_transaction(
self._destination, pending_pdus, pending_edus
)
if success:
sent_transactions_counter.inc()
sent_edus_counter.inc(len(pending_edus))
for edu in pending_edus:
sent_edus_by_type.labels(edu.edu_type).inc()
# Remove the acknowledged device messages from the database
# Only bother if we actually sent some device messages
if to_device_edus:
yield self._store.delete_device_msgs_for_remote(
self._destination, device_stream_id
pending_edus.extend(self._get_rr_edus(force_flush=False))
pending_presence = self._pending_presence
self._pending_presence = {}
if pending_presence:
pending_edus.append(
Edu(
origin=self._server_name,
destination=self._destination,
edu_type="m.presence",
content={
"push": [
format_user_presence_state(
presence, self._clock.time_msec()
)
for presence in pending_presence.values()
]
},
)
)
# also mark the device updates as sent
if device_update_edus:
logger.info(
"Marking as sent %r %r", self._destination, dev_list_id
pending_edus.extend(
self._pop_pending_edus(
MAX_EDUS_PER_TRANSACTION - len(pending_edus)
)
yield self._store.mark_as_sent_devices_by_remote(
self._destination, dev_list_id
)
while (
len(pending_edus) < MAX_EDUS_PER_TRANSACTION
and self._pending_edus_keyed
):
_, val = self._pending_edus_keyed.popitem()
pending_edus.append(val)
if pending_pdus:
logger.debug(
"TX [%s] len(pending_pdus_by_dest[dest]) = %d",
self._destination,
len(pending_pdus),
)
self._last_device_stream_id = device_stream_id
self._last_device_list_stream_id = dev_list_id
else:
break
if not pending_pdus and not pending_edus:
logger.debug("TX [%s] Nothing to send", self._destination)
self._last_device_stream_id = device_stream_id
return
# if we've decided to send a transaction anyway, and we have room, we
# may as well send any pending RRs
if len(pending_edus) < MAX_EDUS_PER_TRANSACTION:
pending_edus.extend(self._get_rr_edus(force_flush=True))
# END CRITICAL SECTION
success = yield self._transaction_manager.send_new_transaction(
self._destination, pending_pdus, pending_edus
)
if success:
sent_transactions_counter.inc()
sent_edus_counter.inc(len(pending_edus))
for edu in pending_edus:
sent_edus_by_type.labels(edu.edu_type).inc()
# Remove the acknowledged device messages from the database
# Only bother if we actually sent some device messages
if to_device_edus:
yield self._store.delete_device_msgs_for_remote(
self._destination, device_stream_id
)
# also mark the device updates as sent
if device_update_edus:
logger.info(
"Marking as sent %r %r", self._destination, dev_list_id
)
yield self._store.mark_as_sent_devices_by_remote(
self._destination, dev_list_id
)
self._last_device_stream_id = device_stream_id
self._last_device_list_stream_id = dev_list_id
else:
break
except NotRetryingDestination as e:
logger.debug(
"TX [%s] not ready for retry yet (next retry at %s) - "
+1
@@ -300,6 +300,7 @@ class BaseFederationServlet(object):
opentracing.tags.HTTP_URL: request.get_redacted_uri(),
opentracing.tags.PEER_HOST_IPV6: request.getClientIP(),
"authenticated_entity": origin,
"servlet_name": request.request_metrics.name,
},
):
if origin:
+66 -3
@@ -18,6 +18,7 @@ from six import iteritems, itervalues
from twisted.internet import defer
import synapse.logging.opentracing as opentracing
from synapse.api import errors
from synapse.api.constants import EventTypes
from synapse.api.errors import (
@@ -45,6 +46,7 @@ class DeviceWorkerHandler(BaseHandler):
self.state = hs.get_state_handler()
self._auth_handler = hs.get_auth_handler()
@opentracing.trace_deferred
@defer.inlineCallbacks
def get_devices_by_user(self, user_id):
"""
@@ -56,6 +58,7 @@ class DeviceWorkerHandler(BaseHandler):
defer.Deferred: list[dict[str, X]]: info on each device
"""
opentracing.set_tag("user_id", user_id)
device_map = yield self.store.get_devices_by_user(user_id)
ips = yield self.store.get_last_client_ip_by_device(user_id, device_id=None)
@@ -64,8 +67,10 @@ class DeviceWorkerHandler(BaseHandler):
for device in devices:
_update_device_from_client_ips(device, ips)
opentracing.log_kv(device_map)
defer.returnValue(devices)
@opentracing.trace_deferred
@defer.inlineCallbacks
def get_device(self, user_id, device_id):
""" Retrieve the given device
@@ -85,9 +90,13 @@ class DeviceWorkerHandler(BaseHandler):
raise errors.NotFoundError
ips = yield self.store.get_last_client_ip_by_device(user_id, device_id)
_update_device_from_client_ips(device, ips)
opentracing.set_tag("device", device)
opentracing.set_tag("ips", ips)
defer.returnValue(device)
@measure_func("device.get_user_ids_changed")
@opentracing.trace_deferred
@defer.inlineCallbacks
def get_user_ids_changed(self, user_id, from_token):
"""Get list of users that have had the devices updated, or have newly
@@ -97,6 +106,9 @@ class DeviceWorkerHandler(BaseHandler):
user_id (str)
from_token (StreamToken)
"""
opentracing.set_tag("user_id", user_id)
opentracing.set_tag("from_token", from_token)
now_room_key = yield self.store.get_room_events_max_id()
room_ids = yield self.store.get_rooms_for_user(user_id)
@@ -133,7 +145,7 @@ class DeviceWorkerHandler(BaseHandler):
if etype != EventTypes.Member:
continue
possibly_left.add(state_key)
continue
continue
# Fetch the current state at the time.
try:
@@ -148,6 +160,9 @@ class DeviceWorkerHandler(BaseHandler):
# special-case for an empty prev state: include all members
# in the changed list
if not event_ids:
opentracing.log_kv(
{"event": "encountered empty previous state", "room_id": room_id}
)
for key, event_id in iteritems(current_state_ids):
etype, state_key = key
if etype != EventTypes.Member:
@@ -200,6 +215,10 @@ class DeviceWorkerHandler(BaseHandler):
possibly_joined = []
possibly_left = []
opentracing.log_kv(
{"changed": list(possibly_joined), "left": list(possibly_left)}
)
defer.returnValue(
{"changed": list(possibly_joined), "left": list(possibly_left)}
)
@@ -269,6 +288,7 @@ class DeviceHandler(DeviceWorkerHandler):
raise errors.StoreError(500, "Couldn't generate a device ID.")
@opentracing.trace_deferred
@defer.inlineCallbacks
def delete_device(self, user_id, device_id):
""" Delete the given device
@@ -286,6 +306,10 @@ class DeviceHandler(DeviceWorkerHandler):
except errors.StoreError as e:
if e.code == 404:
# no match
opentracing.set_tag("error", True)
opentracing.log_kv(
{"reason": "User doesn't have device id.", "device_id": device_id}
)
pass
else:
raise
@@ -298,6 +322,7 @@ class DeviceHandler(DeviceWorkerHandler):
yield self.notify_device_update(user_id, [device_id])
@opentracing.trace_deferred
@defer.inlineCallbacks
def delete_all_devices_for_user(self, user_id, except_device_id=None):
"""Delete all of the user's devices
@@ -333,6 +358,8 @@ class DeviceHandler(DeviceWorkerHandler):
except errors.StoreError as e:
if e.code == 404:
# no match
opentracing.set_tag("error", True)
opentracing.set_tag("reason", "User doesn't have that device id.")
pass
else:
raise
@@ -373,6 +400,7 @@ class DeviceHandler(DeviceWorkerHandler):
else:
raise
@opentracing.trace_deferred
@measure_func("notify_device_update")
@defer.inlineCallbacks
def notify_device_update(self, user_id, device_ids):
@@ -388,6 +416,8 @@ class DeviceHandler(DeviceWorkerHandler):
hosts.update(get_domain_from_id(u) for u in users_who_share_room)
hosts.discard(self.server_name)
opentracing.set_tag("hosts to update", hosts)
position = yield self.store.add_device_change_to_streams(
user_id, device_ids, list(hosts)
)
@@ -407,6 +437,9 @@ class DeviceHandler(DeviceWorkerHandler):
)
for host in hosts:
self.federation_sender.send_device_messages(host)
opentracing.log_kv(
{"message": "sent device update to host", "host": host}
)
@defer.inlineCallbacks
def on_federation_query_user_devices(self, user_id):
@@ -455,12 +488,15 @@ class DeviceListEduUpdater(object):
iterable=True,
)
@opentracing.trace_deferred
@defer.inlineCallbacks
def incoming_device_list_update(self, origin, edu_content):
"""Called on incoming device list update from federation. Responsible
for parsing the EDU and adding to pending updates list.
"""
opentracing.set_tag("origin", origin)
opentracing.set_tag("edu_content", edu_content)
user_id = edu_content.pop("user_id")
device_id = edu_content.pop("device_id")
stream_id = str(edu_content.pop("stream_id")) # They may come as ints
@@ -475,12 +511,29 @@ class DeviceListEduUpdater(object):
device_id,
origin,
)
opentracing.set_tag("error", True)
opentracing.log_kv(
{
"message": "Got a device list update edu from a user and device which does not match the origin of the request.",
"user_id": user_id,
"device_id": device_id,
}
)
return
room_ids = yield self.store.get_rooms_for_user(user_id)
if not room_ids:
# We don't share any rooms with this user. Ignore update, as we
# probably won't get any further updates.
opentracing.set_tag("error", True)
opentracing.log_kv(
{
"message": "Got an update from a user for which "
+ "we don't share any rooms",
"other user_id": user_id,
}
)
logger.warning(
"Got device list update edu for %r/%r, but don't share a room",
user_id,
@@ -523,6 +576,7 @@ class DeviceListEduUpdater(object):
logger.debug("Need to re-sync devices for %r? %r", user_id, resync)
if resync:
opentracing.log_kv({"message": "Doing resync to update device list."})
# Fetch all devices for the user.
origin = get_domain_from_id(user_id)
try:
@@ -543,16 +597,25 @@ class DeviceListEduUpdater(object):
# eventually become consistent.
return
except FederationDeniedError as e:
opentracing.set_tag("error", True)
opentracing.log_kv({"reason": "FederationDeniedError"})
logger.info(e)
return
except Exception:
except Exception as e:
# TODO: Remember that we are now out of sync and try again
# later
opentracing.set_tag("error", True)
opentracing.log_kv(
{
"message": "Exception raised by federation request",
"exception": e,
}
)
logger.exception(
"Failed to handle device list update for %s", user_id
)
return
opentracing.log_kv({"result": result})
stream_id = result["stream_id"]
devices = result["devices"]
+21 -7
@@ -15,8 +15,11 @@
import logging
from canonicaljson import json
from twisted.internet import defer
import synapse.logging.opentracing as opentracing
from synapse.api.errors import SynapseError
from synapse.types import UserID, get_domain_from_id
from synapse.util.stringutils import random_string
@@ -78,7 +81,8 @@ class DeviceMessageHandler(object):
@defer.inlineCallbacks
def send_device_message(self, sender_user_id, message_type, messages):
opentracing.set_tag("number of messages", len(messages))
opentracing.set_tag("sender", sender_user_id)
local_messages = {}
remote_messages = {}
for user_id, by_device in messages.items():
@@ -100,15 +104,24 @@ class DeviceMessageHandler(object):
message_id = random_string(16)
context = {"opentracing": {}}
opentracing.inject_active_span_text_map(context["opentracing"])
remote_edu_contents = {}
for destination, messages in remote_messages.items():
remote_edu_contents[destination] = {
"messages": messages,
"sender": sender_user_id,
"type": message_type,
"message_id": message_id,
}
with opentracing.start_active_span("to_device_for_user"):
opentracing.set_tag("destination", destination)
remote_edu_contents[destination] = {
"messages": messages,
"sender": sender_user_id,
"type": message_type,
"message_id": message_id,
"context": json.dumps(context)
if opentracing.whitelisted_homeserver(destination)
else "{}",
}
opentracing.log_kv({"local_messages": local_messages})
stream_id = yield self.store.add_messages_to_device_inbox(
local_messages, remote_edu_contents
)
@@ -117,6 +130,7 @@ class DeviceMessageHandler(object):
"to_device_key", stream_id, users=local_messages.keys()
)
opentracing.log_kv({"remote_messages": remote_messages})
for destination in remote_messages.keys():
# Enqueue a new federation transaction to send the new
# device messages to each remote destination.
+67 -3
@@ -22,6 +22,7 @@ from canonicaljson import encode_canonical_json, json
from twisted.internet import defer
import synapse.logging.opentracing as opentracing
from synapse.api.errors import CodeMessageException, SynapseError
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.types import UserID, get_domain_from_id
@@ -45,6 +46,7 @@ class E2eKeysHandler(object):
"client_keys", self.on_federation_query_client_keys
)
@opentracing.trace_deferred
@defer.inlineCallbacks
def query_devices(self, query_body, timeout):
""" Handle a device key query from a client
@@ -65,6 +67,7 @@ class E2eKeysHandler(object):
}
}
"""
device_keys_query = query_body.get("device_keys", {})
# separate users by domain.
@@ -79,6 +82,9 @@ class E2eKeysHandler(object):
else:
remote_queries[user_id] = device_ids
opentracing.set_tag("local_key_query", local_query)
opentracing.set_tag("remote_key_query", remote_queries)
# First get local devices.
failures = {}
results = {}
@@ -119,9 +125,12 @@ class E2eKeysHandler(object):
r[user_id] = remote_queries[user_id]
# Now fetch any devices that we don't have in our cache
@opentracing.trace_deferred
@defer.inlineCallbacks
def do_remote_query(destination):
destination_query = remote_queries_not_in_cache[destination]
opentracing.set_tag("key_query", destination_query)
try:
remote_result = yield self.federation.query_client_keys(
destination, {"device_keys": destination_query}, timeout=timeout
@@ -132,7 +141,10 @@ class E2eKeysHandler(object):
results[user_id] = keys
except Exception as e:
failures[destination] = _exception_to_failure(e)
failure = _exception_to_failure(e)
failures[destination] = failure
opentracing.set_tag("error", True)
opentracing.set_tag("reason", failure)
yield make_deferred_yieldable(
defer.gatherResults(
@@ -146,6 +158,7 @@ class E2eKeysHandler(object):
defer.returnValue({"device_keys": results, "failures": failures})
@opentracing.trace_deferred
@defer.inlineCallbacks
def query_local_devices(self, query):
"""Get E2E device keys for local users
@@ -158,6 +171,7 @@ class E2eKeysHandler(object):
defer.Deferred: (resolves to dict[string, dict[string, dict]]):
map from user_id -> device_id -> device details
"""
opentracing.set_tag("local_query", query)
local_query = []
result_dict = {}
@@ -165,6 +179,14 @@ class E2eKeysHandler(object):
# we use UserID.from_string to catch invalid user ids
if not self.is_mine(UserID.from_string(user_id)):
logger.warning("Request for keys for non-local user %s", user_id)
opentracing.log_kv(
{
"message": "Requested a local key for a user which"
+ " was not local to the homeserver",
"user_id": user_id,
}
)
opentracing.set_tag("error", True)
raise SynapseError(400, "Not a user here")
if not device_ids:
@@ -189,6 +211,7 @@ class E2eKeysHandler(object):
r["unsigned"]["device_display_name"] = display_name
result_dict[user_id][device_id] = r
opentracing.log_kv(results)
defer.returnValue(result_dict)
@defer.inlineCallbacks
@@ -199,6 +222,7 @@ class E2eKeysHandler(object):
res = yield self.query_local_devices(device_keys_query)
defer.returnValue({"device_keys": res})
@opentracing.trace_deferred
@defer.inlineCallbacks
def claim_one_time_keys(self, query, timeout):
local_query = []
@@ -213,6 +237,9 @@ class E2eKeysHandler(object):
domain = get_domain_from_id(user_id)
remote_queries.setdefault(domain, {})[user_id] = device_keys
opentracing.set_tag("local_key_query", local_query)
opentracing.set_tag("remote_key_query", remote_queries)
results = yield self.store.claim_e2e_one_time_keys(local_query)
json_result = {}
@@ -224,8 +251,10 @@ class E2eKeysHandler(object):
key_id: json.loads(json_bytes)
}
@opentracing.trace_deferred
@defer.inlineCallbacks
def claim_client_keys(destination):
opentracing.set_tag("destination", destination)
device_keys = remote_queries[destination]
try:
remote_result = yield self.federation.claim_client_keys(
@@ -234,8 +263,12 @@ class E2eKeysHandler(object):
for user_id, keys in remote_result["one_time_keys"].items():
if user_id in device_keys:
json_result[user_id] = keys
except Exception as e:
failure = _exception_to_failure(e)
failures[destination] = failure
opentracing.set_tag("error", True)
opentracing.set_tag("reason", failure)
yield make_deferred_yieldable(
defer.gatherResults(
@@ -259,14 +292,19 @@ class E2eKeysHandler(object):
),
)
opentracing.log_kv({"one_time_keys": json_result, "failures": failures})
defer.returnValue({"one_time_keys": json_result, "failures": failures})
@defer.inlineCallbacks
@opentracing.tag_args
def upload_keys_for_user(self, user_id, device_id, keys):
time_now = self.clock.time_msec()
# TODO: Validate the JSON to make sure it has the right keys.
device_keys = keys.get("device_keys", None)
opentracing.set_tag("device_keys", device_keys)
if device_keys:
logger.info(
"Updating device_keys for device %r for user %s at %d",
@@ -274,6 +312,13 @@ class E2eKeysHandler(object):
user_id,
time_now,
)
opentracing.log_kv(
{
"message": "Updating device_keys for user.",
"user_id": user_id,
"device_id": device_id,
}
)
# TODO: Sign the JSON with the server key
changed = yield self.store.set_e2e_device_keys(
user_id, device_id, time_now, device_keys
@@ -281,12 +326,27 @@ class E2eKeysHandler(object):
if changed:
# Only notify about device updates *if* the keys actually changed
yield self.device_handler.notify_device_update(user_id, [device_id])
else:
opentracing.log_kv(
{"message": "Not updating device_keys for user", "user_id": user_id}
)
one_time_keys = keys.get("one_time_keys", None)
opentracing.set_tag("one_time_keys", one_time_keys)
if one_time_keys:
opentracing.log_kv(
{
"message": "Updating one_time_keys for device.",
"user_id": user_id,
"device_id": device_id,
}
)
yield self._upload_one_time_keys_for_user(
user_id, device_id, time_now, one_time_keys
)
else:
opentracing.log_kv(
{"message": "Did not update one_time_keys", "reason": "no keys given"}
)
# the device should have been registered already, but it may have been
# deleted due to a race with a DELETE request. Or we may be using an
@@ -297,6 +357,7 @@ class E2eKeysHandler(object):
result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
opentracing.set_tag("one_time_key_counts", result)
defer.returnValue({"one_time_key_counts": result})
@defer.inlineCallbacks
@@ -340,6 +401,9 @@ class E2eKeysHandler(object):
(algorithm, key_id, encode_canonical_json(key).decode("ascii"))
)
opentracing.log_kv(
{"message": "Inserting new one_time_keys.", "keys": new_keys}
)
yield self.store.add_e2e_one_time_keys(user_id, device_id, time_now, new_keys)
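The one-time keys inserted above are canonicalised with `encode_canonical_json` before storage. As a rough stdlib sketch (the real `canonicaljson` library also enforces stricter escaping rules, which this ignores), canonical JSON is just sorted keys, no insignificant whitespace, UTF-8 bytes:

```python
import json

# Rough stdlib approximation of canonicaljson.encode_canonical_json:
# sorted keys, compact separators, UTF-8 bytes.
def encode_canonical(obj):
    return json.dumps(
        obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False
    ).encode("utf-8")

# Example key payload (hypothetical values):
key = {"key": "base64+bytes", "signatures": {"@alice:example.com": {"ed25519:DEV": "sig"}}}
# Canonical encoding is idempotent under a decode/re-encode round trip.
assert encode_canonical(key) == encode_canonical(json.loads(encode_canonical(key).decode()))
```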
@@ -19,6 +19,7 @@ from six import iteritems
from twisted.internet import defer
import synapse.logging.opentracing as opentracing
from synapse.api.errors import (
Codes,
NotFoundError,
@@ -49,6 +50,7 @@ class E2eRoomKeysHandler(object):
# changed.
self._upload_linearizer = Linearizer("upload_room_keys_lock")
@opentracing.trace_deferred
@defer.inlineCallbacks
def get_room_keys(self, user_id, version, room_id=None, session_id=None):
"""Bulk get the E2E room keys for a given backup, optionally filtered to a given
@@ -84,8 +86,10 @@ class E2eRoomKeysHandler(object):
user_id, version, room_id, session_id
)
opentracing.log_kv(results)
defer.returnValue(results)
@opentracing.trace_deferred
@defer.inlineCallbacks
def delete_room_keys(self, user_id, version, room_id=None, session_id=None):
"""Bulk delete the E2E room keys for a given backup, optionally filtered to a given
@@ -107,6 +111,7 @@ class E2eRoomKeysHandler(object):
with (yield self._upload_linearizer.queue(user_id)):
yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)
@opentracing.trace_deferred
@defer.inlineCallbacks
def upload_room_keys(self, user_id, version, room_keys):
"""Bulk upload a list of room keys into a given backup version, asserting
@@ -186,7 +191,14 @@ class E2eRoomKeysHandler(object):
session_id(str): the session whose room_key we're setting
room_key(dict): the room_key being set
"""
opentracing.log_kv(
{
"message": "Trying to upload room key",
"room_id": room_id,
"session_id": session_id,
"user_id": user_id,
}
)
# get the room_key for this particular row
current_room_key = None
try:
@@ -195,14 +207,23 @@ class E2eRoomKeysHandler(object):
)
except StoreError as e:
if e.code == 404:
opentracing.log_kv(
{
"message": "Room key not found.",
"room_id": room_id,
"user_id": user_id,
}
)
else:
raise
if self._should_replace_room_key(current_room_key, room_key):
opentracing.log_kv({"message": "Replacing room key."})
yield self.store.set_e2e_room_key(
user_id, version, room_id, session_id, room_key
)
else:
opentracing.log_kv({"message": "Not replacing room_key."})
@staticmethod
def _should_replace_room_key(current_room_key, room_key):
@@ -236,6 +257,7 @@ class E2eRoomKeysHandler(object):
return False
return True
@opentracing.trace_deferred
@defer.inlineCallbacks
def create_version(self, user_id, version_info):
"""Create a new backup version. This automatically becomes the new
@@ -294,6 +316,7 @@ class E2eRoomKeysHandler(object):
raise
defer.returnValue(res)
@opentracing.trace_deferred
@defer.inlineCallbacks
def delete_version(self, user_id, version=None):
"""Deletes a given version of the user's e2e_room_keys backup
@@ -314,6 +337,7 @@ class E2eRoomKeysHandler(object):
else:
raise
@opentracing.trace_deferred
@defer.inlineCallbacks
def update_version(self, user_id, version, version_info):
"""Update the info about a given version of the user's backup
@@ -11,7 +11,7 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# NOTE
@@ -36,45 +36,22 @@ except ImportError:
LogContextScopeManager = None
import contextlib
import inspect
import logging
import re
from functools import wraps
from canonicaljson import json
from twisted.internet import defer
logger = logging.getLogger(__name__)
# Util methods
def only_if_tracing(func):
@@ -91,10 +68,13 @@ def only_if_tracing(func):
return _only_if_tracing_inner
# Block everything by default
_homeserver_whitelist = None
# Setup
def init_tracer(config):
@@ -138,13 +118,38 @@ def init_tracer(config):
tags = opentracing.tags
@contextlib.contextmanager
def _noop_context_manager(*args, **kwargs):
"""Does absolutely nothing really well. Can be entered and exited arbitrarily.
Good substitute for an opentracing scope."""
yield
# Whitelisting
@only_if_tracing
def set_homeserver_whitelist(homeserver_whitelist):
"""Sets the homeserver whitelist
Args:
homeserver_whitelist (iterable of strings): regex of whitelisted homeservers
"""
global _homeserver_whitelist
if homeserver_whitelist:
# Makes a single regex which accepts all passed in regexes in the list
_homeserver_whitelist = re.compile(
"({})".format(")|(".join(homeserver_whitelist))
)
@only_if_tracing
def whitelisted_homeserver(destination):
"""Checks if a destination matches the whitelist
Args:
destination (String)"""
global _homeserver_whitelist
if _homeserver_whitelist:
return _homeserver_whitelist.match(destination)
return False
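The whitelist above collapses a list of homeserver regexes into a single alternation pattern. A minimal stand-alone sketch of the same idea (function name assumed):

```python
import re

# Collapse several regexes into one: "(a)|(b)" matches if either matches.
def compile_whitelist(patterns):
    return re.compile("({})".format(")|(".join(patterns)))

whitelist = compile_whitelist(["synapse\\.example\\.com", ".*\\.internal"])
assert whitelist.match("synapse.example.com")
assert whitelist.match("db.internal")
assert not whitelist.match("evil.com")
```

Note that `re.match` only anchors at the start of the string, so patterns without a trailing `$` will also accept longer destinations with a matching prefix.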
# Start spans and scopes
# Could use kwargs but I want these to be explicit
def start_active_span(
operation_name,
@@ -163,8 +168,10 @@ def start_active_span(
Returns:
scope (Scope) or noop_context_manager
"""
if opentracing is None:
return _noop_context_manager()
else:
# We need to enter the scope here for the logcontext to become active
return opentracing.tracer.start_active_span(
@@ -178,64 +185,13 @@ def start_active_span(
)
def start_active_span_follows_from(operation_name, contexts):
if opentracing is None:
return _noop_context_manager()
else:
references = [opentracing.follows_from(context) for context in contexts]
scope = start_active_span(operation_name, references=references)
return scope
def start_active_span_from_context(
@@ -257,6 +213,7 @@ def start_active_span_from_context(
# Twisted encodes the values as lists whereas opentracing doesn't.
# So, we take the first item in the list.
# Also, twisted uses byte arrays while opentracing expects strings.
if opentracing is None:
return _noop_context_manager()
@@ -274,6 +231,91 @@ def start_active_span_from_context(
)
def start_active_span_from_edu(
edu_content,
operation_name,
references=[],
tags=None,
start_time=None,
ignore_active_span=False,
finish_on_close=True,
):
"""
Extracts a span context from an edu and uses it to start a new active span
Args:
edu_content (Dict): an edu_content with a `context` field whose value is
canonical json for a dict which contains opentracing information.
"""
if opentracing is None:
return _noop_context_manager()
carrier = json.loads(edu_content.get("context", "{}")).get("opentracing", {})
context = opentracing.tracer.extract(opentracing.Format.TEXT_MAP, carrier)
_references = [
opentracing.child_of(span_context_from_string(x))
for x in carrier.get("references", [])
]
# For some reason jaeger decided not to support the visualization of multiple parent
# spans or explicitly show references. I include the span context as a tag here as
# an aid to people debugging but it's really not an ideal solution.
references += _references
scope = opentracing.tracer.start_active_span(
operation_name,
child_of=context,
references=references,
tags=tags,
start_time=start_time,
ignore_active_span=ignore_active_span,
finish_on_close=finish_on_close,
)
scope.span.set_tag("references", carrier.get("references", []))
return scope
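A minimal sketch of the edu_content shape this function consumes (all concrete values here are hypothetical): the `context` field holds canonical JSON whose `opentracing` key is the TEXT_MAP carrier, with any extra parent-span references stored under `references` inside that carrier.

```python
import json

# Hypothetical carrier as a Jaeger-style TEXT_MAP dict.
span_context = {"uber-trace-id": "1234:5678:0:1", "references": []}
edu_content = {
    "edu_type": "m.direct_to_device",
    "context": json.dumps({"opentracing": span_context}),
}

# The same extraction start_active_span_from_edu performs:
carrier = json.loads(edu_content.get("context", "{}")).get("opentracing", {})
assert carrier["uber-trace-id"] == "1234:5678:0:1"
assert carrier.get("references", []) == []
```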
# Opentracing setters for tags, logs, etc
@only_if_tracing
def set_tag(key, value):
"""Sets a tag on the active span"""
opentracing.tracer.active_span.set_tag(key, value)
@only_if_tracing
def log_kv(key_values, timestamp=None):
"""Log to the active span"""
opentracing.tracer.active_span.log_kv(key_values, timestamp)
# Note: we don't have a get baggage items because we're trying to hide all
# scope and span state from synapse. I think this method may also be useless
# as a result
# I also think it's dangerous with respect to PII. If the whitelisting
# is misconfigured or buggy, span information will leak. This is no issue
# if it's Jaeger span IDs, but baggage can contain arbitrary data. I would
# suggest removing this.
@only_if_tracing
def set_baggage_item(key, value):
"""Attach baggage to the active span"""
opentracing.tracer.active_span.set_baggage_item(key, value)
@only_if_tracing
def set_operation_name(operation_name):
"""Sets the operation name of the active span"""
opentracing.tracer.active_span.set_operation_name(operation_name)
# Injection and extraction
@only_if_tracing
def inject_active_span_twisted_headers(headers, destination):
"""
@@ -337,9 +379,160 @@ def inject_active_span_byte_dict(headers, destination):
headers[key.encode()] = [value.encode()]
@only_if_tracing
def inject_active_span_text_map(carrier, destination=None):
if destination and not whitelisted_homeserver(destination):
return
opentracing.tracer.inject(
opentracing.tracer.active_span, opentracing.Format.TEXT_MAP, carrier
)
def active_span_context_as_string():
carrier = {}
if opentracing:
opentracing.tracer.inject(
opentracing.tracer.active_span, opentracing.Format.TEXT_MAP, carrier
)
return json.dumps(carrier)
@only_if_tracing
def span_context_from_string(carrier):
carrier = json.loads(carrier)
return opentracing.tracer.extract(opentracing.Format.TEXT_MAP, carrier)
@only_if_tracing
def extract_text_map(carrier):
return opentracing.tracer.extract(opentracing.Format.TEXT_MAP, carrier)
# Tracing decorators
def trace_deferred(func):
"""Decorator to trace a deferred function. Sets the operation name to that of the
function's."""
if not opentracing:
return func
@wraps(func)
@defer.inlineCallbacks
def _trace_deferred_inner(self, *args, **kwargs):
with start_active_span(func.__name__):
r = yield func(self, *args, **kwargs)
defer.returnValue(r)
return _trace_deferred_inner
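The decorators here all follow the same shape: return the function untouched when tracing is unavailable, otherwise wrap the call in a span while preserving the function's identity with `functools.wraps`. A self-contained sketch of that pattern (names and the list-based "span recorder" are assumptions for illustration):

```python
from functools import wraps

# Simplified stand-in for trace/trace_deferred: no-op when tracing is off,
# otherwise record the operation name before calling through.
def make_tracer(tracing_enabled, spans):
    def trace(func):
        if not tracing_enabled:
            return func  # no-op when tracing is disabled

        @wraps(func)
        def _trace_inner(*args, **kwargs):
            spans.append(func.__name__)  # stands in for start_active_span
            return func(*args, **kwargs)

        return _trace_inner
    return trace

spans = []

@make_tracer(True, spans)
def handle(x):
    return x + 1

assert handle(1) == 2
assert spans == ["handle"]
assert handle.__name__ == "handle"  # @wraps preserves the name
```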
def trace_deferred_using_operation_name(name):
"""Decorator to trace a deferred function. Explicitely sets the operation_name."""
def trace_deferred(func):
if not opentracing:
return func
@wraps(func)
@defer.inlineCallbacks
def _trace_deferred_inner(self, *args, **kwargs):
# Start scope
with start_active_span(name):
r = yield func(self, *args, **kwargs)
defer.returnValue(r)
return _trace_deferred_inner
return trace_deferred
def trace(func):
"""Decorator to trace a normal function. Sets the operation name to that of the
function's."""
if not opentracing:
return func
@wraps(func)
def _trace_inner(self, *args, **kwargs):
with start_active_span(func.__name__):
return func(self, *args, **kwargs)
return _trace_inner
def trace_using_operation_name(operation_name):
"""Decorator to trace a function. Explicitely sets the operation_name."""
def trace(func):
if not opentracing:
return func
@wraps(func)
def _trace_inner(self, *args, **kwargs):
with start_active_span(operation_name):
return func(self, *args, **kwargs)
return _trace_inner
return trace
def tag_args(func):
if not opentracing:
return func
@wraps(func)
def _tag_args_inner(self, *args, **kwargs):
argspec = inspect.getargspec(func)
for i, arg in enumerate(argspec.args[1:]):
set_tag("ARG_" + arg, args[i])
set_tag("args", args[len(argspec.args) :])
set_tag("kwargs", kwargs)
return func(self, *args, **kwargs)
return _tag_args_inner
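`tag_args` uses `inspect` to pair each positional argument with its parameter name before tagging. A sketch of that mapping step, collecting the tags into a dict instead of a span (uses the modern `getfullargspec` rather than the deprecated `getargspec` above; names are assumed for illustration):

```python
import inspect

# Map positional args to "ARG_<name>" tags, skipping `self`.
def collect_arg_tags(func, args):
    names = inspect.getfullargspec(func).args[1:]  # skip `self`
    return {"ARG_" + name: value for name, value in zip(names, args)}

class Handler:
    def upload(self, user_id, device_id):
        pass

tags = collect_arg_tags(Handler.upload, ["@alice:example.com", "DEVICE1"])
assert tags == {"ARG_user_id": "@alice:example.com", "ARG_device_id": "DEVICE1"}
```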
def wrap_in_span(func):
"""Its purpose is to wrap a function that is being passed into a context
which is a complete break from the current logcontext. This function creates
a non active span from the current context and closes it after the function
executes."""
global opentracing
# I haven't use this function yet
if not opentracing:
return func
parent_span = opentracing.tracer.active_span
@wraps(func)
def _wrap_in_span_inner(self, *args, **kwargs):
span = opentracing.tracer.start_span(func.__name__, child_of=parent_span)
try:
return func(self, *args, **kwargs)
except Exception as e:
span.set_tag("error", True)
span.log_kv({"exception", e})
raise
finally:
span.finish()
return _wrap_in_span_inner
def trace_servlet(servlet_name, func):
"""Decorator which traces a serlet. It starts a span with some servlet specific
tags such as the servlet_name and request information"""
if not opentracing:
return func
@wraps(func)
@defer.inlineCallbacks
@@ -357,6 +550,44 @@ def trace_servlet(servlet_name, func):
},
):
result = yield defer.maybeDeferred(func, request, *args, **kwargs)
defer.returnValue(result)
return _trace_servlet_inner
# Helper class
class _DummyTagNames(object):
"""wrapper of opentracings tags. We need to have them if we
want to reference them without opentracing around. Clearly they
should never actually show up in a trace. `set_tags` overwrites
these with the correct ones."""
INVALID_TAG = "invalid-tag"
COMPONENT = INVALID_TAG
DATABASE_INSTANCE = INVALID_TAG
DATABASE_STATEMENT = INVALID_TAG
DATABASE_TYPE = INVALID_TAG
DATABASE_USER = INVALID_TAG
ERROR = INVALID_TAG
HTTP_METHOD = INVALID_TAG
HTTP_STATUS_CODE = INVALID_TAG
HTTP_URL = INVALID_TAG
MESSAGE_BUS_DESTINATION = INVALID_TAG
PEER_ADDRESS = INVALID_TAG
PEER_HOSTNAME = INVALID_TAG
PEER_HOST_IPV4 = INVALID_TAG
PEER_HOST_IPV6 = INVALID_TAG
PEER_PORT = INVALID_TAG
PEER_SERVICE = INVALID_TAG
SAMPLING_PRIORITY = INVALID_TAG
SERVICE = INVALID_TAG
SPAN_KIND = INVALID_TAG
SPAN_KIND_CONSUMER = INVALID_TAG
SPAN_KIND_PRODUCER = INVALID_TAG
SPAN_KIND_RPC_CLIENT = INVALID_TAG
SPAN_KIND_RPC_SERVER = INVALID_TAG
tags = _DummyTagNames
@@ -133,7 +133,7 @@ class _LogContextScope(Scope):
def close(self):
if self.manager.active is not self:
logger.error("Tried to close a none active scope!")
logger.error("Tried to close a non-active scope!")
return
if self._finish_on_close:
@@ -17,6 +17,7 @@ import logging
from twisted.internet import defer
import synapse.logging.opentracing as opentracing
from synapse.api.errors import SynapseError
from synapse.http.servlet import (
RestServlet,
@@ -68,6 +69,7 @@ class KeyUploadServlet(RestServlet):
self.auth = hs.get_auth()
self.e2e_keys_handler = hs.get_e2e_keys_handler()
@opentracing.trace_deferred_using_operation_name("upload_keys")
@defer.inlineCallbacks
def on_POST(self, request, device_id):
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
@@ -78,6 +80,14 @@ class KeyUploadServlet(RestServlet):
# passing the device_id here is deprecated; however, we allow it
# for now for compatibility with older clients.
if requester.device_id is not None and device_id != requester.device_id:
opentracing.set_tag("error", True)
opentracing.log_kv(
{
"message": "Client uploading keys for a different device",
"logged_in_id": requester.device_id,
"key_being_uploaded": device_id,
}
)
logger.warning(
"Client uploading keys for a different device "
"(logged in as %s, uploading for %s)",
@@ -173,20 +183,24 @@ class KeyChangesServlet(RestServlet):
self.auth = hs.get_auth()
self.device_handler = hs.get_device_handler()
@opentracing.trace_deferred
@defer.inlineCallbacks
def on_GET(self, request):
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
from_token_string = parse_string(request, "from")
opentracing.set_tag("from", from_token_string)
# We want to enforce they do pass us one, but we ignore it and return
# changes after the "to" as well as before.
parse_string(request, "to")
opentracing.set_tag("to", parse_string(request, "to"))
from_token = StreamToken.from_string(from_token_string)
user_id = requester.user.to_string()
opentracing.set_tag("user_id", user_id)
results = yield self.device_handler.get_user_ids_changed(user_id, from_token)
defer.returnValue((200, results))
@@ -17,6 +17,7 @@ import logging
from twisted.internet import defer
import synapse.logging.opentracing as opentracing
from synapse.api.errors import Codes, NotFoundError, SynapseError
from synapse.http.servlet import (
RestServlet,
@@ -311,6 +312,7 @@ class RoomKeysVersionServlet(RestServlet):
self.auth = hs.get_auth()
self.e2e_room_keys_handler = hs.get_e2e_room_keys_handler()
@opentracing.trace_deferred_using_operation_name("get_room_keys_version")
@defer.inlineCallbacks
def on_GET(self, request, version):
"""
@@ -17,6 +17,7 @@ import logging
from twisted.internet import defer
import synapse.logging.opentracing as opentracing
from synapse.http import servlet
from synapse.http.servlet import parse_json_object_from_request
from synapse.rest.client.transactions import HttpTransactionCache
@@ -42,7 +43,10 @@ class SendToDeviceRestServlet(servlet.RestServlet):
self.txns = HttpTransactionCache(hs)
self.device_message_handler = hs.get_device_message_handler()
@opentracing.trace_deferred_using_operation_name("sendToDevice")
def on_PUT(self, request, message_type, txn_id):
opentracing.set_tag("message_type", message_type)
opentracing.set_tag("txn_id", txn_id)
return self.txns.fetch_or_execute_request(
request, self._put, request, message_type, txn_id
)
@@ -19,6 +19,7 @@ from canonicaljson import json
from twisted.internet import defer
import synapse.logging.opentracing as opentracing
from synapse.storage._base import SQLBaseStore
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.util.caches.expiringcache import ExpiringCache
@@ -72,6 +73,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
"get_new_messages_for_device", get_new_messages_for_device_txn
)
@opentracing.trace_deferred
@defer.inlineCallbacks
def delete_messages_for_device(self, user_id, device_id, up_to_stream_id):
"""
@@ -87,11 +89,15 @@ class DeviceInboxWorkerStore(SQLBaseStore):
last_deleted_stream_id = self._last_device_delete_cache.get(
(user_id, device_id), None
)
opentracing.set_tag("last_deleted_stream_id", last_deleted_stream_id)
if last_deleted_stream_id:
has_changed = self._device_inbox_stream_cache.has_entity_changed(
user_id, last_deleted_stream_id
)
if not has_changed:
opentracing.log_kv({"message": "No changes in cache since last check"})
defer.returnValue(0)
def delete_messages_for_device_txn(txn):
@@ -107,6 +113,10 @@ class DeviceInboxWorkerStore(SQLBaseStore):
"delete_messages_for_device", delete_messages_for_device_txn
)
opentracing.log_kv(
{"message": "deleted {} messages for device".format(count), "count": count}
)
# Update the cache, ensuring that we only ever increase the value
last_deleted_stream_id = self._last_device_delete_cache.get(
(user_id, device_id), 0
@@ -117,6 +127,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
defer.returnValue(count)
@opentracing.trace
def get_new_device_msgs_for_remote(
self, destination, last_stream_id, current_stream_id, limit
):
@@ -132,16 +143,23 @@ class DeviceInboxWorkerStore(SQLBaseStore):
in the stream the messages got to.
"""
opentracing.set_tag("destination", destination)
opentracing.set_tag("last_stream_id", last_stream_id)
opentracing.set_tag("current_stream_id", current_stream_id)
opentracing.set_tag("limit", limit)
has_changed = self._device_federation_outbox_stream_cache.has_entity_changed(
destination, last_stream_id
)
if not has_changed or last_stream_id == current_stream_id:
opentracing.log_kv({"message": "No new messages in stream"})
return defer.succeed(([], current_stream_id))
if limit <= 0:
# This can happen if we run out of room for EDUs in the transaction.
return defer.succeed(([], last_stream_id))
@opentracing.trace
def get_new_messages_for_remote_destination_txn(txn):
sql = (
"SELECT stream_id, messages_json FROM device_federation_outbox"
@@ -156,6 +174,9 @@ class DeviceInboxWorkerStore(SQLBaseStore):
stream_pos = row[0]
messages.append(json.loads(row[1]))
if len(messages) < limit:
opentracing.log_kv(
{"message": "Set stream position to current position"}
)
stream_pos = current_stream_id
return (messages, stream_pos)
@@ -164,6 +185,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
get_new_messages_for_remote_destination_txn,
)
@opentracing.trace_deferred
def delete_device_msgs_for_remote(self, destination, up_to_stream_id):
"""Used to delete messages when the remote destination acknowledges
their receipt.
@@ -214,6 +236,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
expiry_ms=30 * 60 * 1000,
)
@opentracing.trace_deferred
@defer.inlineCallbacks
def add_messages_to_device_inbox(
self, local_messages_by_user_then_device, remote_messages_by_destination
@@ -20,6 +20,7 @@ from canonicaljson import json
from twisted.internet import defer
import synapse.logging.opentracing as opentracing
from synapse.api.errors import StoreError
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import Cache, SQLBaseStore, db_to_json
@@ -73,6 +74,7 @@ class DeviceWorkerStore(SQLBaseStore):
defer.returnValue({d["device_id"]: d for d in devices})
@opentracing.trace_deferred
@defer.inlineCallbacks
def get_devices_by_remote(self, destination, from_stream_id, limit):
"""Get stream of updates to send to remote servers
@@ -127,8 +129,10 @@ class DeviceWorkerStore(SQLBaseStore):
# (user_id, device_id) entries into a map, with the value being
# the max stream_id across each set of duplicate entries
#
# maps (user_id, device_id) -> (stream_id, context), as long as their
# stream_id does not match that of the last row; context is any metadata
# about the message's context such as opentracing data
query_map = {}
for update in updates:
if stream_id_cutoff is not None and update[2] >= stream_id_cutoff:
@@ -136,7 +140,17 @@ class DeviceWorkerStore(SQLBaseStore):
break
key = (update[0], update[1])
logger.info("++++++++++++++++++++++++ update-0 %s", update[0])
logger.info("++++++++++++++++++++++++ update-1 %s", update[1])
logger.info("++++++++++++++++++++++++ update-2 %s", update[2])
logger.info("++++++++++++++++++++++++ update-3 %s", update[3])
logger.info(
"+++++++++++++++++++++++++++ %s", (query_map.get(key, 0), update[2])
)
query_map[key] = (
max(query_map.get(key, (0, None))[0], update[2]),
update[3],
)
# If we didn't find any updates with a stream_id lower than the cutoff, it
# means that there are more than limit updates all of which have the same
@@ -171,7 +185,7 @@ class DeviceWorkerStore(SQLBaseStore):
List: List of device updates
"""
sql = """
SELECT user_id, device_id, stream_id, context FROM device_lists_outbound_pokes
WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ?
ORDER BY stream_id
LIMIT ?
@@ -210,12 +224,15 @@ class DeviceWorkerStore(SQLBaseStore):
destination, user_id, from_stream_id
)
for device_id, device in iteritems(user_devices):
stream_id = query_map[(user_id, device_id)][0]
result = {
"user_id": user_id,
"device_id": device_id,
"prev_id": [prev_id] if prev_id else [],
"stream_id": stream_id,
"context": query_map[(user_id, device_id)][1]
if opentracing.whitelisted_homeserver(destination)
else "{}",
}
prev_id = stream_id
@@ -299,6 +316,7 @@ class DeviceWorkerStore(SQLBaseStore):
def get_device_stream_token(self):
return self._device_list_id_gen.get_current_token()
@opentracing.trace_deferred
@defer.inlineCallbacks
def get_user_devices_from_cache(self, query_list):
"""Get the devices (and keys if any) for remote users from the cache.
@@ -329,7 +347,8 @@ class DeviceWorkerStore(SQLBaseStore):
results.setdefault(user_id, {})[device_id] = device
else:
results[user_id] = yield self._get_cached_devices_for_user(user_id)
opentracing.set_tag("in_cache", results)
opentracing.set_tag("not_in_cache", user_ids_not_in_cache)
defer.returnValue((user_ids_not_in_cache, results))
@cachedInlineCallbacks(num_args=2, tree=True)
@@ -814,6 +833,9 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
],
)
context = {"opentracing": {}}
opentracing.inject_active_span_text_map(context["opentracing"])
self._simple_insert_many_txn(
txn,
table="device_lists_outbound_pokes",
@@ -825,6 +847,9 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
"device_id": device_id,
"sent": False,
"ts": now,
"context": json.dumps(context)
if opentracing.whitelisted_homeserver(destination)
else "{}",
}
for destination in hosts
for device_id in device_ids
@@ -17,6 +17,7 @@ import json
from twisted.internet import defer
import synapse.logging.opentracing as opentracing
from synapse.api.errors import StoreError
from ._base import SQLBaseStore
@@ -94,7 +95,16 @@ class EndToEndRoomKeyStore(SQLBaseStore):
},
lock=False,
)
opentracing.log_kv(
{
"message": "Set room key",
"room_id": room_id,
"session_id": session_id,
"room_key": room_key,
}
)
@opentracing.trace_deferred
@defer.inlineCallbacks
def get_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
"""Bulk get the E2E room keys for a given backup, optionally filtered to a given
@@ -153,6 +163,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
defer.returnValue(sessions)
@opentracing.trace_deferred
@defer.inlineCallbacks
def delete_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
"""Bulk delete the E2E room keys for a given backup, optionally filtered to a given
@@ -236,6 +247,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
"get_e2e_room_keys_version_info", _get_e2e_room_keys_version_info_txn
)
@opentracing.trace_deferred
def create_e2e_room_keys_version(self, user_id, info):
"""Atomically creates a new version of this user's e2e_room_keys store
with the given version info.
@@ -276,6 +288,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
"create_e2e_room_keys_version_txn", _create_e2e_room_keys_version_txn
)
@opentracing.trace_deferred
def update_e2e_room_keys_version(self, user_id, version, info):
"""Update a given backup version
@@ -292,6 +305,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
desc="update_e2e_room_keys_version",
)
@opentracing.trace_deferred
def delete_e2e_room_keys_version(self, user_id, version=None):
"""Delete a given backup version of the user's room keys.
Doesn't delete their actual key data.
@@ -18,12 +18,14 @@ from canonicaljson import encode_canonical_json
from twisted.internet import defer
import synapse.logging.opentracing as opentracing
from synapse.util.caches.descriptors import cached
from ._base import SQLBaseStore, db_to_json
class EndToEndKeyWorkerStore(SQLBaseStore):
@opentracing.trace_deferred
@defer.inlineCallbacks
def get_e2e_device_keys(
self, query_list, include_all_devices=False, include_deleted_devices=False
@@ -40,6 +42,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
Dict mapping from user-id to dict mapping from device_id to
dict containing "key_json", "device_display_name".
"""
opentracing.set_tag("query_list", query_list)
if not query_list:
defer.returnValue({})
@@ -57,9 +60,13 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
defer.returnValue(results)
@opentracing.trace
def _get_e2e_device_keys_txn(
self, txn, query_list, include_all_devices=False, include_deleted_devices=False
):
opentracing.set_tag("include_all_devices", include_all_devices)
opentracing.set_tag("include_deleted_devices", include_deleted_devices)
query_clauses = []
query_params = []
@@ -104,6 +111,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
for user_id, device_id in deleted_devices:
result.setdefault(user_id, {})[device_id] = None
opentracing.log_kv(result)
return result
@defer.inlineCallbacks
@@ -129,10 +137,11 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
keyvalues={"user_id": user_id, "device_id": device_id},
desc="add_e2e_one_time_keys_check",
)
defer.returnValue(
{(row["algorithm"], row["key_id"]): row["key_json"] for row in rows}
result = {(row["algorithm"], row["key_id"]): row["key_json"] for row in rows}
opentracing.log_kv(
{"message": "Fetched one time keys for user", "one_time_keys": result}
)
defer.returnValue(result)
@defer.inlineCallbacks
def add_e2e_one_time_keys(self, user_id, device_id, time_now, new_keys):
@@ -148,6 +157,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
"""
def _add_e2e_one_time_keys(txn):
opentracing.set_tag("user_id", user_id)
opentracing.set_tag("device_id", device_id)
opentracing.set_tag("new_keys", new_keys)
# We are protected from race between lookup and insertion due to
# a unique constraint. If there is a race of two calls to
# `add_e2e_one_time_keys` then they'll conflict and we will only
@@ -204,6 +216,11 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
"""
def _set_e2e_device_keys_txn(txn):
opentracing.set_tag("user_id", user_id)
opentracing.set_tag("device_id", device_id)
opentracing.set_tag("time_now", time_now)
opentracing.set_tag("device_keys", device_keys)
old_key_json = self._simple_select_one_onecol_txn(
txn,
table="e2e_device_keys_json",
@@ -217,6 +234,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
new_key_json = encode_canonical_json(device_keys).decode("utf-8")
if old_key_json == new_key_json:
opentracing.log_kv({"message": "Device key already stored."})
return False
self._simple_upsert_txn(
@@ -225,7 +243,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
keyvalues={"user_id": user_id, "device_id": device_id},
values={"ts_added_ms": time_now, "key_json": new_key_json},
)
opentracing.log_kv({"message": "Device keys stored."})
return True
return self.runInteraction("set_e2e_device_keys", _set_e2e_device_keys_txn)
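The `_set_e2e_device_keys_txn` hunk skips the upsert when the uploaded keys match what is already stored, by comparing canonical JSON encodings. A hedged sketch of that no-op check, using sorted-key `json.dumps` to approximate `canonicaljson.encode_canonical_json` (the helper names `encode_canonical` and `should_store` are illustrative, not Synapse APIs):

```python
import json

def encode_canonical(obj):
    # Approximation of canonical JSON: sorted keys, no whitespace.
    return json.dumps(obj, sort_keys=True, separators=(",", ":"))

def should_store(old_key_json, device_keys):
    """Return (changed, new_key_json) for the upsert decision:
    re-uploading identical device keys yields changed=False."""
    new_key_json = encode_canonical(device_keys)
    return old_key_json != new_key_json, new_key_json

changed, stored = should_store(None, {"keys": {"ed25519:DEV1": "abc"}})
```

Because the encoding is canonical, semantically identical uploads produce byte-identical JSON, so the comparison is a reliable idempotency check.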
@@ -233,6 +251,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
def claim_e2e_one_time_keys(self, query_list):
"""Take a list of one time keys out of the database"""
@opentracing.trace
def _claim_e2e_one_time_keys(txn):
sql = (
"SELECT key_id, key_json FROM e2e_one_time_keys_json"
@@ -254,7 +273,15 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
" AND key_id = ?"
)
for user_id, device_id, algorithm, key_id in delete:
opentracing.log_kv(
{
"message": "Executing claim e2e_one_time_keys transaction on database."
}
)
txn.execute(sql, (user_id, device_id, algorithm, key_id))
opentracing.log_kv(
{"message": "Executed claim transaction; invalidating cache"}
)
self._invalidate_cache_and_stream(
txn, self.count_e2e_one_time_keys, (user_id, device_id)
)
@@ -264,6 +291,13 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
def delete_e2e_keys_by_device(self, user_id, device_id):
def delete_e2e_keys_by_device_txn(txn):
opentracing.log_kv(
{
"message": "Deleting keys for device",
"device_id": device_id,
"user_id": user_id,
}
)
self._simple_delete_txn(
txn,
table="e2e_device_keys_json",
@@ -0,0 +1,16 @@
/* Copyright 2019 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ALTER TABLE device_lists_outbound_pokes ADD context TEXT;
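The migration above adds a `context TEXT` column to `device_lists_outbound_pokes`, presumably so a serialised span context can ride along with each queued poke and the trace can be resumed when the poke is sent. A hedged sketch of what storing and restoring such a carrier might look like; the carrier contents and helper names are illustrative, not Synapse's actual API (the real code injects/extracts via the configured opentracing tracer):

```python
import json

def pack_span_context(carrier):
    """Serialise an injected carrier dict (header-style key/values)
    into TEXT suitable for the new `context` column."""
    return json.dumps(carrier)

def unpack_span_context(text):
    """Restore the carrier dict, tolerating rows written before the
    migration (NULL/empty context)."""
    return json.loads(text) if text else None

stored = pack_span_context({"uber-trace-id": "example-trace-id"})
restored = unpack_span_context(stored)
```

Making the column nullable `TEXT` keeps old rows valid: pokes queued before the migration simply resume with no parent span.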