Compare commits
50 Commits
| Author | SHA1 | Date |
|---|---|---|
| | 33789fc83e | |
| | 794f1e6d8b | |
| | 7027cfc771 | |
| | 7f282d17e6 | |
| | 51e5a12c3a | |
| | 844aac1d1f | |
| | 5b104b0d02 | |
| | 310fb51c7d | |
| | 0e8c35c8e8 | |
| | 1197e2e97b | |
| | eb7a79553c | |
| | 9f599fcaef | |
| | 1e5f5c2ccf | |
| | f6ce4fec23 | |
| | 5ca5845284 | |
| | 52a7f625a8 | |
| | 783ddc417f | |
| | 87061c72c4 | |
| | fbe23fe32d | |
| | 5aaee551a4 | |
| | 2e141b8177 | |
| | d5b9c97943 | |
| | 34e1de3374 | |
| | 4e649702ec | |
| | 8c17b7a8da | |
| | 65c63f983c | |
| | 9170c46af9 | |
| | 32342d0ec2 | |
| | 07fadd4e9b | |
| | 3e5f4a40de | |
| | bf892473f9 | |
| | 75d489f2f3 | |
| | 8f0a00ef35 | |
| | 4c2f39cce4 | |
| | aca1141a62 | |
| | 7c6bfaed11 | |
| | 62da3aa0af | |
| | 04c3bf430c | |
| | 4c3e592e39 | |
| | 4d54425b9d | |
| | 54211f5435 | |
| | d67376f7fc | |
| | 854d75c41b | |
| | d0dd14ccee | |
| | c9fa681937 | |
| | 161b8077f0 | |
| | 55a8ee509a | |
| | 342840f884 | |
| | 64c6eee389 | |
| | 2006f7ae94 | |
@@ -0,0 +1 @@
Opentrace e2e code paths.
@@ -0,0 +1,179 @@
===========
Opentracing
===========

Background
----------

Opentracing is a semi-standard being adopted by a number of distributed tracing
platforms. It is a standardised API for facilitating vendor-agnostic tracing
instrumentation. That is, we can use the opentracing API and select one of a
number of tracer implementations to do the heavy lifting in the background.
Our current selected implementation is Jaeger.

Opentracing concepts can be found at
https://opentracing.io/docs/overview/what-is-tracing/

Python-specific tracing concepts are at https://opentracing.io/guides/python/.
Note that synapse wraps opentracing in a small library in order to make the
opentracing dependency optional. That means that the access patterns are
different to those demonstrated there. However, it is still useful to know,
especially if opentracing is included as a full dependency in the future or if
you are modifying synapse's opentracing lib.

For more information about Jaeger's implementation see
https://www.jaegertracing.io/docs/

=================
Setup opentracing
=================

To receive opentracing spans, start up a Jaeger server using docker like so:

.. code-block:: bash

   docker run -d --name jaeger \
     -e COLLECTOR_ZIPKIN_HTTP_PORT=9411 \
     -p 5775:5775/udp \
     -p 6831:6831/udp \
     -p 6832:6832/udp \
     -p 5778:5778 \
     -p 16686:16686 \
     -p 14268:14268 \
     -p 9411:9411 \
     jaegertracing/all-in-one:1.13

Latest documentation is probably at
https://www.jaegertracing.io/docs/1.13/getting-started/


Enable opentracing in synapse
-----------------------------

Opentracing is not enabled by default. It must be enabled in the homeserver
config by uncommenting the config options under ``opentracing``. For example:

.. code-block:: yaml

   opentracing:
     # Enable / disable tracer
     tracer_enabled: true
     # The list of homeservers we wish to expose our current traces to.
     # The list is a list of regexes which are matched against the
     # servername of the homeserver
     homeserver_whitelist:
       - ".*"

Homeserver whitelisting
-----------------------

The homeserver whitelist is configured using regular expressions. A list of
regexes can be given, and a destination is checked against their union whenever
a span is propagated through a carrier. Most of the whitelist checks are
encapsulated in the lib's injection and extraction methods, but be aware that
using custom carriers or crossing uncharted waters will require you to enforce
this whitelist yourself.

``logging/opentracing.py`` has a ``whitelisted_homeserver`` method which takes
in a destination and compares it to the whitelist.
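
To make "the union of a list of regexes" concrete, here is a minimal sketch of
one way such a check could be written. ``compile_whitelist`` and
``is_whitelisted`` are hypothetical helpers, not the functions in
``logging/opentracing.py``, and the choice of a full match (rather than a
prefix match) is an assumption made for this example.

```python
import re


def compile_whitelist(patterns):
    """Combine a list of regexes into one compiled union, or None if empty."""
    if not patterns:
        return None
    # Each pattern gets its own group so that alternation cannot leak
    # between neighbouring patterns.
    return re.compile("(" + ")|(".join(patterns) + ")")


def is_whitelisted(whitelist, destination):
    """Return True if the destination server name matches the whitelist."""
    if whitelist is None:
        return False
    # fullmatch is an assumption of this sketch: it requires the whole
    # server name to match, not just a prefix of it.
    return whitelist.fullmatch(destination) is not None


whitelist = compile_whitelist([r"matrix\.org", r".*\.example\.com"])
allowed = is_whitelisted(whitelist, "hs.example.com")
denied = is_whitelisted(whitelist, "matrix.org.evil.net")
```

With an empty list nothing is whitelisted, so span contexts would never be
exposed to other homeservers unless a pattern such as ``".*"`` is configured.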

============================
Using opentracing in synapse
============================

Access to the opentracing API is mediated through the
``logging/opentracing.py`` lib. Opentracing is encapsulated such that
no stateful spans from opentracing are used in synapse's code. This allows
opentracing to be easily disabled in synapse and thereby have opentracing as
an optional dependency. This does, however, limit the number of modifiable spans
at any point in the code to one. From here on, references to opentracing refer
to the lib implemented in synapse.

Tracing
-------

In synapse it is not possible to start a non-active span. Spans can be started
using the ``opentracing.start_active_span`` method. This returns a scope (see
opentracing docs) which is a context manager that needs to be entered and
exited. This is usually done by using ``with``.

.. code-block:: python

   with start_active_span("operation name"):
       # Do something we want to trace
       pass

Forgetting to enter or exit a scope will result in some mysterious and grievous
log context errors.

At any time when there is an active span, ``opentracing.set_tag`` can be used to
set a tag on the current active span.

Tracing functions
-----------------

Functions can be easily traced using decorators. There is a decorator for
'normal' functions and one for functions which return deferreds. The name of
the function becomes the operation name for the span.

.. code-block:: python

   # Start a span using 'normal_function' as the operation name
   @trace
   def normal_function(*args, **kwargs):
       # Does all kinds of cool and expected things
       return something_usual_and_useful

   # Start a span using 'deferred_function' as the operation name
   @trace_deferred
   # Yes, there is a typo in the lib. I will fix this
   def deferred_function(*args, **kwargs):
       # We start
       yield we_wait
       # we finish
       defer.returnValue(something_usual_and_useful)

Operation names can be explicitly set for functions by using
``trace_using_operation_name`` and
``trace_deferred_using_operation_name``:

.. code-block:: python

   @trace_using_operation_name("A *much* better operation name")
   def normal_function(*args, **kwargs):
       # Does all kinds of cool and expected things
       return something_usual_and_useful

   @trace_deferred_using_operation_name("An operation name that fixes the typo!")
   # Yes, there is a typo in the lib. I will fix this
   def deferred_function(*args, **kwargs):
       # We start
       yield we_wait
       # we finish
       defer.returnValue(something_usual_and_useful)
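
For readers wondering how the two synchronous decorators relate, the sketch
below shows one plausible construction in which ``trace`` is simply
``trace_using_operation_name`` applied with the function's own name. This is
an assumption about the shape of such decorators, not a copy of synapse's
lib; ``recorded_operations`` is a hypothetical list standing in for spans sent
to a tracer, and the deferred variants are not covered.

```python
import functools

# Hypothetical: records operation names instead of reporting real spans.
recorded_operations = []


def trace_using_operation_name(operation_name):
    """Decorator factory: trace calls under an explicit operation name."""

    def decorator(func):
        @functools.wraps(func)  # keep the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            # A real implementation would open a scope here and close it
            # when the call returns; this sketch only records the name.
            recorded_operations.append(operation_name)
            return func(*args, **kwargs)

        return wrapper

    return decorator


def trace(func):
    """Trace calls using the function's own name as the operation name."""
    return trace_using_operation_name(func.__name__)(func)


@trace
def normal_function(x):
    return x * 2


@trace_using_operation_name("A *much* better operation name")
def other_function(x):
    return x + 1
```

Building one decorator on top of the other keeps the naming rule in a single
place, so the two cannot drift apart.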

Contexts and carriers
---------------------

A selection of wrappers is provided for injecting contexts into carriers and
extracting them again. Unfortunately, opentracing's three standard carrier
formats are not adequate in the majority of cases. Also note that the binary
encoding format mandated by opentracing is not actually implemented by Jaeger,
and it will silently no-op. Please refer to the end of
``logging/opentracing.py`` for the available injection and extraction methods.
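
As a minimal sketch of what a text-map carrier round trip looks like, the
example below writes a span context into a plain dict, ships it as a JSON
string under a ``context`` key, and reads it back on the other side. The
``context`` / ``opentracing`` nesting mirrors the to-device EDU change in this
diff; everything else (``inject_span_context``, ``extract_span_context`` and
the ``trace_id`` / ``span_id`` fields) is hypothetical and not the lib's API.

```python
import json


def inject_span_context(span_context, carrier):
    """Write each field of the span context into the carrier as a string."""
    carrier.update({key: str(value) for key, value in span_context.items()})


def extract_span_context(carrier):
    """Rebuild a span context from a text-map carrier (a plain dict)."""
    return dict(carrier)


# Sending side: inject into a dict and serialise it alongside the payload.
span_context = {"trace_id": "abc123", "span_id": "def456"}  # hypothetical fields
context = {"opentracing": {}}
inject_span_context(span_context, context["opentracing"])
edu_content = {"messages": {}, "context": json.dumps(context)}

# Receiving side: a missing or empty context simply yields an empty carrier.
received = json.loads(edu_content.get("context", "{}")).get("opentracing", {})
restored = extract_span_context(received)
```

Defaulting to ``"{}"`` means a sender that is not tracing, or that does not
whitelist the destination, produces an empty carrier rather than an error.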

==================
Configuring Jaeger
==================

Sampling strategies can be set as in this document:
https://www.jaegertracing.io/docs/1.13/sampling/

=======
Gotchas
=======

- Checking whitelists on span propagation
- Inserting PII
- Forgetting to enter or exit a scope
- Span source: make sure that the span you expect to be active across a
  function call really will be that one. Does the current function have more
  than one caller? Will all of those calling functions be in a context
  with an active span?
|
||||
@@ -22,6 +22,7 @@ from netaddr import IPAddress

from twisted.internet import defer

import synapse.logging.opentracing as opentracing
import synapse.types
from synapse import event_auth
from synapse.api.constants import EventTypes, JoinRules, Membership
@@ -178,6 +179,7 @@ class Auth(object):
    def get_public_keys(self, invite_event):
        return event_auth.get_public_keys(invite_event)

    @opentracing.trace_deferred
    @defer.inlineCallbacks
    def get_user_by_req(
        self, request, allow_guest=False, rights="access", allow_expired=False
@@ -209,6 +211,10 @@ class Auth(object):
            user_id, app_service = yield self._get_appservice_user_id(request)
            if user_id:
                request.authenticated_entity = user_id
                opentracing.set_tag("authenticated_entity", user_id)
                # there is at least one other place where authenticated entity is
                # set. user_id is tagged incase authenticated_entity is clobbered
                opentracing.set_tag("user_id", user_id)

                if ip_addr and self.hs.config.track_appservice_user_ips:
                    yield self.store.insert_client_ip(
@@ -262,6 +268,11 @@ class Auth(object):

            request.authenticated_entity = user.to_string()

            opentracing.set_tag("authenticated_entity", user.to_string())
            # there is at least one other place where authenticated entity is
            # set. user_id is tagged incase authenticated_entity is clobbered
            opentracing.set_tag("user_id", user.to_string())

            defer.returnValue(
                synapse.types.create_requester(
                    user, token_id, is_guest, device_id, app_service=app_service

@@ -25,6 +25,7 @@ from twisted.internet import defer
from twisted.internet.abstract import isIPAddress
from twisted.python import failure

import synapse.logging.opentracing as opentracing
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import (
    AuthError,
@@ -518,6 +519,7 @@ class FederationServer(FederationBase):
    def on_query_user_devices(self, origin, user_id):
        return self.on_query_request("user_devices", user_id)

    @opentracing.trace_deferred
    @defer.inlineCallbacks
    @log_function
    def on_claim_client_keys(self, origin, content):
@@ -526,6 +528,9 @@ class FederationServer(FederationBase):
            for device_id, algorithm in device_keys.items():
                query.append((user_id, device_id, algorithm))

        opentracing.log_kv(
            {"message": "Claiming one time keys.", "user, device pairs": query}
        )
        results = yield self.store.claim_e2e_one_time_keys(query)

        json_result = {}
@@ -821,12 +826,13 @@ class FederationHandlerRegistry(object):
        if not handler:
            logger.warn("No handler registered for EDU type %s", edu_type)

        try:
            yield handler(origin, content)
        except SynapseError as e:
            logger.info("Failed to handle edu %r: %r", edu_type, e)
        except Exception:
            logger.exception("Failed to handle edu %r", edu_type)
        with opentracing.start_active_span_from_edu(content, "handle_edu"):
            try:
                yield handler(origin, content)
            except SynapseError as e:
                logger.info("Failed to handle edu %r: %r", edu_type, e)
            except Exception:
                logger.exception("Failed to handle edu %r", edu_type)

    def on_query(self, query_type, args):
        handler = self.query_handlers.get(query_type)

@@ -16,10 +16,12 @@
import datetime
import logging

from canonicaljson import json
from prometheus_client import Counter

from twisted.internet import defer

import synapse.logging.opentracing as opentracing
from synapse.api.errors import (
    FederationDeniedError,
    HttpResponseException,
@@ -204,97 +206,142 @@ class PerDestinationQueue(object):

                pending_edus = device_update_edus + to_device_edus

                # BEGIN CRITICAL SECTION
                #
                # In order to avoid a race condition, we need to make sure that
                # the following code (from popping the queues up to the point
                # where we decide if we actually have any pending messages) is
                # atomic - otherwise new PDUs or EDUs might arrive in the
                # meantime, but not get sent because we hold the
                # transmission_loop_running flag.
                # Make a transaction sending span, this span follows on from all the
                # edus in that transaction. This needs to be done because if the edus
                # are never received on the remote the span effectively has no causality.

                pending_pdus = self._pending_pdus

                # We can only include at most 50 PDUs per transactions
                pending_pdus, self._pending_pdus = pending_pdus[:50], pending_pdus[50:]

                pending_edus.extend(self._get_rr_edus(force_flush=False))
                pending_presence = self._pending_presence
                self._pending_presence = {}
                if pending_presence:
                    pending_edus.append(
                        Edu(
                            origin=self._server_name,
                            destination=self._destination,
                            edu_type="m.presence",
                            content={
                                "push": [
                                    format_user_presence_state(
                                        presence, self._clock.time_msec()
                                    )
                                    for presence in pending_presence.values()
                                ]
                            },
                        )
                span_contexts = [
                    opentracing.extract_text_map(
                        json.loads(
                            edu.get_dict().get("content", {}).get("context", "{}")
                        ).get("opentracing", {})
                    )
                    for edu in pending_edus
                ]

                pending_edus.extend(
                    self._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus))
                )
                while (
                    len(pending_edus) < MAX_EDUS_PER_TRANSACTION
                    and self._pending_edus_keyed
                with opentracing.start_active_span_follows_from(
                    "send_transaction", span_contexts
                ):
                    _, val = self._pending_edus_keyed.popitem()
                    pending_edus.append(val)
                    # Link each sent edu to this transaction's span
                    _pending_edus = []
                    for edu in pending_edus:
                        edu_dict = edu.get_dict()
                        span_context = json.loads(
                            edu_dict.get("content", {}).get("context", "{}")
                        ).get("opentracing", {})
                        # If there is no span context then we are either blacklisting
                        # this destination or we are not tracing
                        if not span_context == {}:
                            if "references" not in span_context:
                                span_context["references"] = [
                                    opentracing.active_span_context_as_string()
                                ]
                            else:
                                span_context["references"].append(
                                    opentracing.active_span_context_as_string()
                                )
                            edu_dict["content"]["context"] = json.dumps(
                                {"opentracing": span_context}
                            )
                        _pending_edus.append(Edu(**edu_dict))
                    pending_edus = _pending_edus

                if pending_pdus:
                    logger.debug(
                        "TX [%s] len(pending_pdus_by_dest[dest]) = %d",
                        self._destination,
                        len(pending_pdus),
                    # BEGIN CRITICAL SECTION
                    #
                    # In order to avoid a race condition, we need to make sure that
                    # the following code (from popping the queues up to the point
                    # where we decide if we actually have any pending messages) is
                    # atomic - otherwise new PDUs or EDUs might arrive in the
                    # meantime, but not get sent because we hold the
                    # transmission_loop_running flag.

                    pending_pdus = self._pending_pdus

                    # We can only include at most 50 PDUs per transactions
                    pending_pdus, self._pending_pdus = (
                        pending_pdus[:50],
                        pending_pdus[50:],
                    )

                if not pending_pdus and not pending_edus:
                    logger.debug("TX [%s] Nothing to send", self._destination)
                    self._last_device_stream_id = device_stream_id
                    return

                # if we've decided to send a transaction anyway, and we have room, we
                # may as well send any pending RRs
                if len(pending_edus) < MAX_EDUS_PER_TRANSACTION:
                    pending_edus.extend(self._get_rr_edus(force_flush=True))

                # END CRITICAL SECTION

                success = yield self._transaction_manager.send_new_transaction(
                    self._destination, pending_pdus, pending_edus
                )
                if success:
                    sent_transactions_counter.inc()
                    sent_edus_counter.inc(len(pending_edus))
                    for edu in pending_edus:
                        sent_edus_by_type.labels(edu.edu_type).inc()
                    # Remove the acknowledged device messages from the database
                    # Only bother if we actually sent some device messages
                    if to_device_edus:
                        yield self._store.delete_device_msgs_for_remote(
                            self._destination, device_stream_id
                    pending_edus.extend(self._get_rr_edus(force_flush=False))
                    pending_presence = self._pending_presence
                    self._pending_presence = {}
                    if pending_presence:
                        pending_edus.append(
                            Edu(
                                origin=self._server_name,
                                destination=self._destination,
                                edu_type="m.presence",
                                content={
                                    "push": [
                                        format_user_presence_state(
                                            presence, self._clock.time_msec()
                                        )
                                        for presence in pending_presence.values()
                                    ]
                                },
                            )
                        )

                    # also mark the device updates as sent
                    if device_update_edus:
                        logger.info(
                            "Marking as sent %r %r", self._destination, dev_list_id
                    pending_edus.extend(
                        self._pop_pending_edus(
                            MAX_EDUS_PER_TRANSACTION - len(pending_edus)
                        )
                        yield self._store.mark_as_sent_devices_by_remote(
                            self._destination, dev_list_id
                    )
                    while (
                        len(pending_edus) < MAX_EDUS_PER_TRANSACTION
                        and self._pending_edus_keyed
                    ):
                        _, val = self._pending_edus_keyed.popitem()
                        pending_edus.append(val)

                    if pending_pdus:
                        logger.debug(
                            "TX [%s] len(pending_pdus_by_dest[dest]) = %d",
                            self._destination,
                            len(pending_pdus),
                        )

                    self._last_device_stream_id = device_stream_id
                    self._last_device_list_stream_id = dev_list_id
                else:
                    break
                    if not pending_pdus and not pending_edus:
                        logger.debug("TX [%s] Nothing to send", self._destination)
                        self._last_device_stream_id = device_stream_id
                        return

                    # if we've decided to send a transaction anyway, and we have room, we
                    # may as well send any pending RRs
                    if len(pending_edus) < MAX_EDUS_PER_TRANSACTION:
                        pending_edus.extend(self._get_rr_edus(force_flush=True))

                    # END CRITICAL SECTION

                    success = yield self._transaction_manager.send_new_transaction(
                        self._destination, pending_pdus, pending_edus
                    )
                    if success:
                        sent_transactions_counter.inc()
                        sent_edus_counter.inc(len(pending_edus))
                        for edu in pending_edus:
                            sent_edus_by_type.labels(edu.edu_type).inc()
                        # Remove the acknowledged device messages from the database
                        # Only bother if we actually sent some device messages
                        if to_device_edus:
                            yield self._store.delete_device_msgs_for_remote(
                                self._destination, device_stream_id
                            )

                        # also mark the device updates as sent
                        if device_update_edus:
                            logger.info(
                                "Marking as sent %r %r", self._destination, dev_list_id
                            )
                            yield self._store.mark_as_sent_devices_by_remote(
                                self._destination, dev_list_id
                            )

                        self._last_device_stream_id = device_stream_id
                        self._last_device_list_stream_id = dev_list_id
                    else:
                        break
        except NotRetryingDestination as e:
            logger.debug(
                "TX [%s] not ready for retry yet (next retry at %s) - "

@@ -300,6 +300,7 @@ class BaseFederationServlet(object):
                    opentracing.tags.HTTP_URL: request.get_redacted_uri(),
                    opentracing.tags.PEER_HOST_IPV6: request.getClientIP(),
                    "authenticated_entity": origin,
                    "servlet_name": request.request_metrics.name,
                },
            ):
                if origin:

@@ -18,6 +18,7 @@ from six import iteritems, itervalues

from twisted.internet import defer

import synapse.logging.opentracing as opentracing
from synapse.api import errors
from synapse.api.constants import EventTypes
from synapse.api.errors import (
@@ -45,6 +46,7 @@ class DeviceWorkerHandler(BaseHandler):
        self.state = hs.get_state_handler()
        self._auth_handler = hs.get_auth_handler()

    @opentracing.trace_deferred
    @defer.inlineCallbacks
    def get_devices_by_user(self, user_id):
        """
@@ -56,6 +58,7 @@ class DeviceWorkerHandler(BaseHandler):
            defer.Deferred: list[dict[str, X]]: info on each device
        """

        opentracing.set_tag("user_id", user_id)
        device_map = yield self.store.get_devices_by_user(user_id)

        ips = yield self.store.get_last_client_ip_by_device(user_id, device_id=None)
@@ -64,8 +67,10 @@ class DeviceWorkerHandler(BaseHandler):
        for device in devices:
            _update_device_from_client_ips(device, ips)

        opentracing.log_kv(device_map)
        defer.returnValue(devices)

    @opentracing.trace_deferred
    @defer.inlineCallbacks
    def get_device(self, user_id, device_id):
        """ Retrieve the given device
@@ -85,9 +90,13 @@ class DeviceWorkerHandler(BaseHandler):
            raise errors.NotFoundError
        ips = yield self.store.get_last_client_ip_by_device(user_id, device_id)
        _update_device_from_client_ips(device, ips)

        opentracing.set_tag("device", device)
        opentracing.set_tag("ips", ips)
        defer.returnValue(device)

    @measure_func("device.get_user_ids_changed")
    @opentracing.trace_deferred
    @defer.inlineCallbacks
    def get_user_ids_changed(self, user_id, from_token):
        """Get list of users that have had the devices updated, or have newly
@@ -97,6 +106,9 @@ class DeviceWorkerHandler(BaseHandler):
            user_id (str)
            from_token (StreamToken)
        """

        opentracing.set_tag("user_id", user_id)
        opentracing.set_tag("from_token", from_token)
        now_room_key = yield self.store.get_room_events_max_id()

        room_ids = yield self.store.get_rooms_for_user(user_id)
@@ -133,7 +145,7 @@ class DeviceWorkerHandler(BaseHandler):
                    if etype != EventTypes.Member:
                        continue
                    possibly_left.add(state_key)
                continue
                    continue

            # Fetch the current state at the time.
            try:
@@ -148,6 +160,9 @@ class DeviceWorkerHandler(BaseHandler):
            # special-case for an empty prev state: include all members
            # in the changed list
            if not event_ids:
                opentracing.log_kv(
                    {"event": "encountered empty previous state", "room_id": room_id}
                )
                for key, event_id in iteritems(current_state_ids):
                    etype, state_key = key
                    if etype != EventTypes.Member:
@@ -200,6 +215,10 @@ class DeviceWorkerHandler(BaseHandler):
            possibly_joined = []
            possibly_left = []

        opentracing.log_kv(
            {"changed": list(possibly_joined), "left": list(possibly_left)}
        )

        defer.returnValue(
            {"changed": list(possibly_joined), "left": list(possibly_left)}
        )
@@ -269,6 +288,7 @@ class DeviceHandler(DeviceWorkerHandler):

        raise errors.StoreError(500, "Couldn't generate a device ID.")

    @opentracing.trace_deferred
    @defer.inlineCallbacks
    def delete_device(self, user_id, device_id):
        """ Delete the given device
@@ -286,6 +306,10 @@ class DeviceHandler(DeviceWorkerHandler):
        except errors.StoreError as e:
            if e.code == 404:
                # no match
                opentracing.set_tag("error", True)
                opentracing.log_kv(
                    {"reason": "User doesn't have device id.", "device_id": device_id}
                )
                pass
            else:
                raise
@@ -298,6 +322,7 @@ class DeviceHandler(DeviceWorkerHandler):

        yield self.notify_device_update(user_id, [device_id])

    @opentracing.trace_deferred
    @defer.inlineCallbacks
    def delete_all_devices_for_user(self, user_id, except_device_id=None):
        """Delete all of the user's devices
@@ -333,6 +358,8 @@ class DeviceHandler(DeviceWorkerHandler):
        except errors.StoreError as e:
            if e.code == 404:
                # no match
                opentracing.set_tag("error", True)
                opentracing.set_tag("reason", "User doesn't have that device id.")
                pass
            else:
                raise
@@ -373,6 +400,7 @@ class DeviceHandler(DeviceWorkerHandler):
            else:
                raise

    @opentracing.trace_deferred
    @measure_func("notify_device_update")
    @defer.inlineCallbacks
    def notify_device_update(self, user_id, device_ids):
@@ -388,6 +416,8 @@ class DeviceHandler(DeviceWorkerHandler):
            hosts.update(get_domain_from_id(u) for u in users_who_share_room)
            hosts.discard(self.server_name)

        opentracing.set_tag("hosts to update", hosts)

        position = yield self.store.add_device_change_to_streams(
            user_id, device_ids, list(hosts)
        )
@@ -407,6 +437,9 @@ class DeviceHandler(DeviceWorkerHandler):
            )
            for host in hosts:
                self.federation_sender.send_device_messages(host)
                opentracing.log_kv(
                    {"message": "sent device update to host", "host": host}
                )

    @defer.inlineCallbacks
    def on_federation_query_user_devices(self, user_id):
@@ -455,12 +488,15 @@ class DeviceListEduUpdater(object):
            iterable=True,
        )

    @opentracing.trace_deferred
    @defer.inlineCallbacks
    def incoming_device_list_update(self, origin, edu_content):
        """Called on incoming device list update from federation. Responsible
        for parsing the EDU and adding to pending updates list.
        """

        opentracing.set_tag("origin", origin)
        opentracing.set_tag("edu_content", edu_content)
        user_id = edu_content.pop("user_id")
        device_id = edu_content.pop("device_id")
        stream_id = str(edu_content.pop("stream_id"))  # They may come as ints
@@ -475,12 +511,29 @@ class DeviceListEduUpdater(object):
                device_id,
                origin,
            )

            opentracing.set_tag("error", True)
            opentracing.log_kv(
                {
                    "message": "Got a device list update edu from a user and device which does not match the origin of the request.",
                    "user_id": user_id,
                    "device_id": device_id,
                }
            )
            return

        room_ids = yield self.store.get_rooms_for_user(user_id)
        if not room_ids:
            # We don't share any rooms with this user. Ignore update, as we
            # probably won't get any further updates.
            opentracing.set_tag("error", True)
            opentracing.log_kv(
                {
                    "message": "Got an update from a user for which "
                    + "we don't share any rooms",
                    "other user_id": user_id,
                }
            )
            logger.warning(
                "Got device list update edu for %r/%r, but don't share a room",
                user_id,
@@ -523,6 +576,7 @@ class DeviceListEduUpdater(object):
            logger.debug("Need to re-sync devices for %r? %r", user_id, resync)

            if resync:
                opentracing.log_kv({"message": "Doing resync to update device list."})
                # Fetch all devices for the user.
                origin = get_domain_from_id(user_id)
                try:
@@ -543,16 +597,25 @@ class DeviceListEduUpdater(object):
                    # eventually become consistent.
                    return
                except FederationDeniedError as e:
                    opentracing.set_tag("error", True)
                    opentracing.log_kv({"reason": "FederationDeniedError"})
                    logger.info(e)
                    return
                except Exception:
                except Exception as e:
                    # TODO: Remember that we are now out of sync and try again
                    # later
                    opentracing.set_tag("error", True)
                    opentracing.log_kv(
                        {
                            "message": "Exception raised by federation request",
                            "exception": e,
                        }
                    )
                    logger.exception(
                        "Failed to handle device list update for %s", user_id
                    )
                    return

                opentracing.log_kv({"result": result})
                stream_id = result["stream_id"]
                devices = result["devices"]

@@ -15,8 +15,11 @@

import logging

from canonicaljson import json

from twisted.internet import defer

import synapse.logging.opentracing as opentracing
from synapse.api.errors import SynapseError
from synapse.types import UserID, get_domain_from_id
from synapse.util.stringutils import random_string
@@ -78,7 +81,8 @@ class DeviceMessageHandler(object):

    @defer.inlineCallbacks
    def send_device_message(self, sender_user_id, message_type, messages):

        opentracing.set_tag("number of messages", len(messages))
        opentracing.set_tag("sender", sender_user_id)
        local_messages = {}
        remote_messages = {}
        for user_id, by_device in messages.items():
@@ -100,15 +104,24 @@ class DeviceMessageHandler(object):

        message_id = random_string(16)

        context = {"opentracing": {}}
        opentracing.inject_active_span_text_map(context["opentracing"])

        remote_edu_contents = {}
        for destination, messages in remote_messages.items():
            remote_edu_contents[destination] = {
                "messages": messages,
                "sender": sender_user_id,
                "type": message_type,
                "message_id": message_id,
            }
            with opentracing.start_active_span("to_device_for_user"):
                opentracing.set_tag("destination", destination)
                remote_edu_contents[destination] = {
                    "messages": messages,
                    "sender": sender_user_id,
                    "type": message_type,
                    "message_id": message_id,
                    "context": json.dumps(context)
                    if opentracing.whitelisted_homeserver(destination)
                    else "{}",
                }

        opentracing.log_kv({"local_messages": local_messages})
        stream_id = yield self.store.add_messages_to_device_inbox(
            local_messages, remote_edu_contents
        )
@@ -117,6 +130,7 @@ class DeviceMessageHandler(object):
            "to_device_key", stream_id, users=local_messages.keys()
        )

        opentracing.log_kv({"remote_messages": remote_messages})
        for destination in remote_messages.keys():
            # Enqueue a new federation transaction to send the new
            # device messages to each remote destination.

@@ -22,6 +22,7 @@ from canonicaljson import encode_canonical_json, json

from twisted.internet import defer

import synapse.logging.opentracing as opentracing
from synapse.api.errors import CodeMessageException, SynapseError
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.types import UserID, get_domain_from_id
@@ -45,6 +46,7 @@ class E2eKeysHandler(object):
            "client_keys", self.on_federation_query_client_keys
        )

    @opentracing.trace_deferred
    @defer.inlineCallbacks
    def query_devices(self, query_body, timeout):
        """ Handle a device key query from a client
@@ -65,6 +67,7 @@ class E2eKeysHandler(object):
            }
        }
        """

        device_keys_query = query_body.get("device_keys", {})

        # separate users by domain.
@@ -79,6 +82,9 @@ class E2eKeysHandler(object):
            else:
                remote_queries[user_id] = device_ids

        opentracing.set_tag("local_key_query", local_query)
        opentracing.set_tag("remote_key_query", remote_queries)

        # First get local devices.
        failures = {}
        results = {}
@@ -119,9 +125,12 @@ class E2eKeysHandler(object):
                r[user_id] = remote_queries[user_id]

        # Now fetch any devices that we don't have in our cache
        @opentracing.trace_deferred
        @defer.inlineCallbacks
        def do_remote_query(destination):
            destination_query = remote_queries_not_in_cache[destination]

            opentracing.set_tag("key_query", destination_query)
            try:
                remote_result = yield self.federation.query_client_keys(
                    destination, {"device_keys": destination_query}, timeout=timeout
@@ -132,7 +141,10 @@ class E2eKeysHandler(object):
                        results[user_id] = keys

            except Exception as e:
                failures[destination] = _exception_to_failure(e)
                failure = _exception_to_failure(e)
                failures[destination] = failure
                opentracing.set_tag("error", True)
                opentracing.set_tag("reason", failure)

        yield make_deferred_yieldable(
            defer.gatherResults(
@@ -146,6 +158,7 @@ class E2eKeysHandler(object):

        defer.returnValue({"device_keys": results, "failures": failures})

    @opentracing.trace_deferred
    @defer.inlineCallbacks
    def query_local_devices(self, query):
        """Get E2E device keys for local users
@@ -158,6 +171,7 @@ class E2eKeysHandler(object):
            defer.Deferred: (resolves to dict[string, dict[string, dict]]):
                map from user_id -> device_id -> device details
        """
        opentracing.set_tag("local_query", query)
        local_query = []

        result_dict = {}
@@ -165,6 +179,14 @@ class E2eKeysHandler(object):
|
||||
# we use UserID.from_string to catch invalid user ids
|
||||
if not self.is_mine(UserID.from_string(user_id)):
|
||||
logger.warning("Request for keys for non-local user %s", user_id)
|
||||
opentracing.log_kv(
|
||||
{
|
||||
"message": "Requested a local key for a user which"
|
||||
+ " was not local to the homeserver",
|
||||
"user_id": user_id,
|
||||
}
|
||||
)
|
||||
opentracing.set_tag("error", True)
|
||||
raise SynapseError(400, "Not a user here")
|
||||
|
||||
if not device_ids:
|
||||
@@ -189,6 +211,7 @@ class E2eKeysHandler(object):
|
||||
r["unsigned"]["device_display_name"] = display_name
|
||||
result_dict[user_id][device_id] = r
|
||||
|
||||
opentracing.log_kv(results)
|
||||
defer.returnValue(result_dict)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@@ -199,6 +222,7 @@ class E2eKeysHandler(object):
|
||||
res = yield self.query_local_devices(device_keys_query)
|
||||
defer.returnValue({"device_keys": res})
|
||||
|
||||
@opentracing.trace_deferred
|
||||
@defer.inlineCallbacks
|
||||
def claim_one_time_keys(self, query, timeout):
|
||||
local_query = []
|
||||
@@ -213,6 +237,9 @@ class E2eKeysHandler(object):
|
||||
domain = get_domain_from_id(user_id)
|
||||
remote_queries.setdefault(domain, {})[user_id] = device_keys
|
||||
|
||||
opentracing.set_tag("local_key_query", local_query)
|
||||
opentracing.set_tag("remote_key_query", remote_queries)
|
||||
|
||||
results = yield self.store.claim_e2e_one_time_keys(local_query)
|
||||
|
||||
json_result = {}
|
||||
@@ -224,8 +251,10 @@ class E2eKeysHandler(object):
|
||||
key_id: json.loads(json_bytes)
|
||||
}
|
||||
|
||||
@opentracing.trace_deferred
|
||||
@defer.inlineCallbacks
|
||||
def claim_client_keys(destination):
|
||||
opentracing.set_tag("destination", destination)
|
||||
device_keys = remote_queries[destination]
|
||||
try:
|
||||
remote_result = yield self.federation.claim_client_keys(
|
||||
@@ -234,8 +263,12 @@ class E2eKeysHandler(object):
|
||||
for user_id, keys in remote_result["one_time_keys"].items():
|
||||
if user_id in device_keys:
|
||||
json_result[user_id] = keys
|
||||
|
||||
except Exception as e:
|
||||
failures[destination] = _exception_to_failure(e)
|
||||
failure = _exception_to_failure(e)
|
||||
failures[destination] = failure
|
||||
opentracing.set_tag("error", True)
|
||||
opentracing.set_tag("reason", failure)
|
||||
|
||||
yield make_deferred_yieldable(
|
||||
defer.gatherResults(
|
||||
@@ -259,14 +292,19 @@ class E2eKeysHandler(object):
|
||||
),
|
||||
)
|
||||
|
||||
opentracing.log_kv({"one_time_keys": json_result, "failures": failures})
|
||||
|
||||
defer.returnValue({"one_time_keys": json_result, "failures": failures})
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@opentracing.tag_args
|
||||
def upload_keys_for_user(self, user_id, device_id, keys):
|
||||
|
||||
time_now = self.clock.time_msec()
|
||||
|
||||
# TODO: Validate the JSON to make sure it has the right keys.
|
||||
device_keys = keys.get("device_keys", None)
|
||||
opentracing.set_tag("device_keys", device_keys)
|
||||
if device_keys:
|
||||
logger.info(
|
||||
"Updating device_keys for device %r for user %s at %d",
|
||||
@@ -274,6 +312,13 @@ class E2eKeysHandler(object):
|
||||
user_id,
|
||||
time_now,
|
||||
)
|
||||
opentracing.log_kv(
|
||||
{
|
||||
"message": "Updating device_keys for user.",
|
||||
"user_id": user_id,
|
||||
"device_id": device_id,
|
||||
}
|
||||
)
|
||||
# TODO: Sign the JSON with the server key
|
||||
changed = yield self.store.set_e2e_device_keys(
|
||||
user_id, device_id, time_now, device_keys
|
||||
@@ -281,12 +326,27 @@ class E2eKeysHandler(object):
|
||||
if changed:
|
||||
# Only notify about device updates *if* the keys actually changed
|
||||
yield self.device_handler.notify_device_update(user_id, [device_id])
|
||||
|
||||
else:
|
||||
opentracing.log_kv(
|
||||
{"message": "Not updating device_keys for user", "user_id": user_id}
|
||||
)
|
||||
one_time_keys = keys.get("one_time_keys", None)
|
||||
opentracing.set_tag("one_time_keys", one_time_keys)
|
||||
if one_time_keys:
|
||||
opentracing.log_kv(
|
||||
{
|
||||
"message": "Updating one_time_keys for device.",
|
||||
"user_id": user_id,
|
||||
"device_id": device_id,
|
||||
}
|
||||
)
|
||||
yield self._upload_one_time_keys_for_user(
|
||||
user_id, device_id, time_now, one_time_keys
|
||||
)
|
||||
else:
|
||||
opentracing.log_kv(
|
||||
{"message": "Did not update one_time_keys", "reason": "no keys given"}
|
||||
)
|
||||
|
||||
# the device should have been registered already, but it may have been
|
||||
# deleted due to a race with a DELETE request. Or we may be using an
|
||||
@@ -297,6 +357,7 @@ class E2eKeysHandler(object):
|
||||
|
||||
result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
|
||||
|
||||
opentracing.set_tag("one_time_key_counts", result)
|
||||
defer.returnValue({"one_time_key_counts": result})
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@@ -340,6 +401,9 @@ class E2eKeysHandler(object):
|
||||
(algorithm, key_id, encode_canonical_json(key).decode("ascii"))
|
||||
)
|
||||
|
||||
opentracing.log_kv(
|
||||
{"message": "Inserting new one_time_keys.", "keys": new_keys}
|
||||
)
|
||||
yield self.store.add_e2e_one_time_keys(user_id, device_id, time_now, new_keys)
|
||||
|
||||
|
||||
|
||||
@@ -19,6 +19,7 @@ from six import iteritems
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
import synapse.logging.opentracing as opentracing
|
||||
from synapse.api.errors import (
|
||||
Codes,
|
||||
NotFoundError,
|
||||
@@ -49,6 +50,7 @@ class E2eRoomKeysHandler(object):
|
||||
# changed.
|
||||
self._upload_linearizer = Linearizer("upload_room_keys_lock")
|
||||
|
||||
@opentracing.trace_deferred
|
||||
@defer.inlineCallbacks
|
||||
def get_room_keys(self, user_id, version, room_id=None, session_id=None):
|
||||
"""Bulk get the E2E room keys for a given backup, optionally filtered to a given
|
||||
@@ -84,8 +86,10 @@ class E2eRoomKeysHandler(object):
|
||||
user_id, version, room_id, session_id
|
||||
)
|
||||
|
||||
opentracing.log_kv(results)
|
||||
defer.returnValue(results)
|
||||
|
||||
@opentracing.trace_deferred
|
||||
@defer.inlineCallbacks
|
||||
def delete_room_keys(self, user_id, version, room_id=None, session_id=None):
|
||||
"""Bulk delete the E2E room keys for a given backup, optionally filtered to a given
|
||||
@@ -107,6 +111,7 @@ class E2eRoomKeysHandler(object):
|
||||
with (yield self._upload_linearizer.queue(user_id)):
|
||||
yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)
|
||||
|
||||
@opentracing.trace_deferred
|
||||
@defer.inlineCallbacks
|
||||
def upload_room_keys(self, user_id, version, room_keys):
|
||||
"""Bulk upload a list of room keys into a given backup version, asserting
|
||||
@@ -186,7 +191,14 @@ class E2eRoomKeysHandler(object):
|
||||
session_id(str): the session whose room_key we're setting
|
||||
room_key(dict): the room_key being set
|
||||
"""
|
||||
|
||||
opentracing.log_kv(
|
||||
{
|
||||
"message": "Trying to upload room key",
|
||||
"room_id": room_id,
|
||||
"session_id": session_id,
|
||||
"user_id": user_id,
|
||||
}
|
||||
)
|
||||
# get the room_key for this particular row
|
||||
current_room_key = None
|
||||
try:
|
||||
@@ -195,14 +207,23 @@ class E2eRoomKeysHandler(object):
|
||||
)
|
||||
except StoreError as e:
|
||||
if e.code == 404:
|
||||
pass
|
||||
opentracing.log_kv(
|
||||
{
|
||||
"message": "Room key not found.",
|
||||
"room_id": room_id,
|
||||
"user_id": user_id,
|
||||
}
|
||||
)
|
||||
else:
|
||||
raise
|
||||
|
||||
if self._should_replace_room_key(current_room_key, room_key):
|
||||
opentracing.log_kv({"message": "Replacing room key."})
|
||||
yield self.store.set_e2e_room_key(
|
||||
user_id, version, room_id, session_id, room_key
|
||||
)
|
||||
else:
|
||||
opentracing.log_kv({"message": "Not replacing room_key."})
|
||||
|
||||
@staticmethod
|
||||
def _should_replace_room_key(current_room_key, room_key):
|
||||
@@ -236,6 +257,7 @@ class E2eRoomKeysHandler(object):
|
||||
return False
|
||||
return True
|
||||
|
||||
@opentracing.trace_deferred
|
||||
@defer.inlineCallbacks
|
||||
def create_version(self, user_id, version_info):
|
||||
"""Create a new backup version. This automatically becomes the new
|
||||
@@ -294,6 +316,7 @@ class E2eRoomKeysHandler(object):
|
||||
raise
|
||||
defer.returnValue(res)
|
||||
|
||||
@opentracing.trace_deferred
|
||||
@defer.inlineCallbacks
|
||||
def delete_version(self, user_id, version=None):
|
||||
"""Deletes a given version of the user's e2e_room_keys backup
|
||||
@@ -314,6 +337,7 @@ class E2eRoomKeysHandler(object):
|
||||
else:
|
||||
raise
|
||||
|
||||
@opentracing.trace_deferred
|
||||
@defer.inlineCallbacks
|
||||
def update_version(self, user_id, version, version_info):
|
||||
"""Update the info about a given version of the user's backup
|
||||
|
||||
+328
-97
@@ -11,7 +11,7 @@
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.import opentracing
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
# NOTE
|
||||
@@ -36,45 +36,22 @@ except ImportError:
|
||||
LogContextScopeManager = None
|
||||
|
||||
import contextlib
|
||||
import inspect
|
||||
import logging
|
||||
import re
|
||||
from functools import wraps
|
||||
|
||||
from canonicaljson import json
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class _DumTagNames(object):
|
||||
"""wrapper of opentracings tags. We need to have them if we
|
||||
want to reference them without opentracing around. Clearly they
|
||||
should never actually show up in a trace. `set_tags` overwrites
|
||||
these with the correct ones."""
|
||||
# Block everything by default
|
||||
_homeserver_whitelist = None
|
||||
|
||||
INVALID_TAG = "invalid-tag"
|
||||
COMPONENT = INVALID_TAG
|
||||
DATABASE_INSTANCE = INVALID_TAG
|
||||
DATABASE_STATEMENT = INVALID_TAG
|
||||
DATABASE_TYPE = INVALID_TAG
|
||||
DATABASE_USER = INVALID_TAG
|
||||
ERROR = INVALID_TAG
|
||||
HTTP_METHOD = INVALID_TAG
|
||||
HTTP_STATUS_CODE = INVALID_TAG
|
||||
HTTP_URL = INVALID_TAG
|
||||
MESSAGE_BUS_DESTINATION = INVALID_TAG
|
||||
PEER_ADDRESS = INVALID_TAG
|
||||
PEER_HOSTNAME = INVALID_TAG
|
||||
PEER_HOST_IPV4 = INVALID_TAG
|
||||
PEER_HOST_IPV6 = INVALID_TAG
|
||||
PEER_PORT = INVALID_TAG
|
||||
PEER_SERVICE = INVALID_TAG
|
||||
SAMPLING_PRIORITY = INVALID_TAG
|
||||
SERVICE = INVALID_TAG
|
||||
SPAN_KIND = INVALID_TAG
|
||||
SPAN_KIND_CONSUMER = INVALID_TAG
|
||||
SPAN_KIND_PRODUCER = INVALID_TAG
|
||||
SPAN_KIND_RPC_CLIENT = INVALID_TAG
|
||||
SPAN_KIND_RPC_SERVER = INVALID_TAG
|
||||
# Util methods
|
||||
|
||||
|
||||
def only_if_tracing(func):
|
||||
@@ -91,10 +68,13 @@ def only_if_tracing(func):
|
||||
return _only_if_tracing_inner
|
||||
|
||||
|
||||
# Block everything by default
|
||||
_homeserver_whitelist = None
|
||||
@contextlib.contextmanager
|
||||
def _noop_context_manager(*args, **kwargs):
|
||||
"""Does exactly what it says on the tin"""
|
||||
yield
|
||||
|
||||
tags = _DumTagNames
|
||||
|
||||
# Setup
|
||||
|
||||
|
||||
def init_tracer(config):
|
||||
@@ -138,13 +118,38 @@ def init_tracer(config):
|
||||
tags = opentracing.tags
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _noop_context_manager(*args, **kwargs):
|
||||
"""Does absolutely nothing really well. Can be entered and exited arbitrarily.
|
||||
Good substitute for an opentracing scope."""
|
||||
yield
|
||||
# Whitelisting
|
||||
|
||||
|
||||
@only_if_tracing
def set_homeserver_whitelist(homeserver_whitelist):
    """Sets the homeserver whitelist

    Args:
        homeserver_whitelist (iterable of strings): regex of whitelisted homeservers
    """
    global _homeserver_whitelist
    if homeserver_whitelist:
        # Makes a single regex which accepts all passed in regexes in the list
        _homeserver_whitelist = re.compile(
            "({})".format(")|(".join(homeserver_whitelist))
        )


@only_if_tracing
def whitelisted_homeserver(destination):
    """Checks if a destination matches the whitelist

    Args:
        destination (String)"""
    global _homeserver_whitelist
    if _homeserver_whitelist:
        return _homeserver_whitelist.match(destination)
    return False
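
A minimal sketch of the whitelist check above, kept free of the opentracing dependency. The helper names `compile_whitelist` and `is_whitelisted` and the example hostnames are illustrative assumptions, not part of this change. It also shows that `re.match` anchors only at the start of the string, so a pattern without a trailing `$` accepts any destination that merely begins with a whitelisted name.

```python
import re


def compile_whitelist(patterns):
    """Join several regexes into one alternation, one group per pattern."""
    if not patterns:
        # Mirrors the "block everything by default" behaviour in the diff.
        return None
    return re.compile("({})".format(")|(".join(patterns)))


def is_whitelisted(whitelist, destination):
    """Return True when the destination matches from its first character."""
    if whitelist is None:
        return False
    return whitelist.match(destination) is not None


# Hypothetical whitelist entries, chosen only for illustration.
whitelist = compile_whitelist([r"matrix\.org", r".*\.example\.com"])

print(is_whitelisted(whitelist, "matrix.org"))
print(is_whitelisted(whitelist, "chat.example.com"))
print(is_whitelisted(whitelist, "evil.net"))
# Prefix match: this is accepted because the pattern is not anchored at the end.
print(is_whitelisted(whitelist, "matrix.org.evil.net"))
```

If exact hostname matching is wanted, one option is to end each configured pattern with `$`.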
|
||||
|
||||
|
||||
# Start spans and scopes
|
||||
|
||||
# Could use kwargs but I want these to be explicit
|
||||
def start_active_span(
|
||||
operation_name,
|
||||
@@ -163,8 +168,10 @@ def start_active_span(
|
||||
Returns:
|
||||
scope (Scope) or noop_context_manager
|
||||
"""
|
||||
|
||||
if opentracing is None:
|
||||
return _noop_context_manager()
|
||||
|
||||
else:
|
||||
# We need to enter the scope here for the logcontext to become active
|
||||
return opentracing.tracer.start_active_span(
|
||||
@@ -178,64 +185,13 @@ def start_active_span(
|
||||
)
|
||||
|
||||
|
||||
@only_if_tracing
|
||||
def close_active_span():
|
||||
"""Closes the active span. This will close it's logcontext if the context
|
||||
was made for the span"""
|
||||
opentracing.tracer.scope_manager.active.__exit__(None, None, None)
|
||||
|
||||
|
||||
@only_if_tracing
|
||||
def set_tag(key, value):
|
||||
"""Set's a tag on the active span"""
|
||||
opentracing.tracer.active_span.set_tag(key, value)
|
||||
|
||||
|
||||
@only_if_tracing
|
||||
def log_kv(key_values, timestamp=None):
|
||||
"""Log to the active span"""
|
||||
opentracing.tracer.active_span.log_kv(key_values, timestamp)
|
||||
|
||||
|
||||
# Note: we don't have a get baggage items because we're trying to hide all
|
||||
# scope and span state from synapse. I think this method may also be useless
|
||||
# as a result
|
||||
@only_if_tracing
|
||||
def set_baggage_item(key, value):
|
||||
"""Attach baggage to the active span"""
|
||||
opentracing.tracer.active_span.set_baggage_item(key, value)
|
||||
|
||||
|
||||
@only_if_tracing
|
||||
def set_operation_name(operation_name):
|
||||
"""Sets the operation name of the active span"""
|
||||
opentracing.tracer.active_span.set_operation_name(operation_name)
|
||||
|
||||
|
||||
@only_if_tracing
|
||||
def set_homeserver_whitelist(homeserver_whitelist):
|
||||
"""Sets the whitelist
|
||||
|
||||
Args:
|
||||
homeserver_whitelist (iterable of strings): regex of whitelisted homeservers
|
||||
"""
|
||||
global _homeserver_whitelist
|
||||
if homeserver_whitelist:
|
||||
# Makes a single regex which accepts all passed in regexes in the list
|
||||
_homeserver_whitelist = re.compile(
|
||||
"({})".format(")|(".join(homeserver_whitelist))
|
||||
)
|
||||
|
||||
|
||||
@only_if_tracing
|
||||
def whitelisted_homeserver(destination):
|
||||
"""Checks if a destination matches the whitelist
|
||||
Args:
|
||||
destination (String)"""
|
||||
global _homeserver_whitelist
|
||||
if _homeserver_whitelist:
|
||||
return _homeserver_whitelist.match(destination)
|
||||
return False
|
||||
def start_active_span_follows_from(operation_name, contexts):
|
||||
if opentracing is None:
|
||||
return _noop_context_manager()
|
||||
else:
|
||||
references = [opentracing.follows_from(context) for context in contexts]
|
||||
scope = start_active_span(operation_name, references=references)
|
||||
return scope
|
||||
|
||||
|
||||
def start_active_span_from_context(
|
||||
@@ -257,6 +213,7 @@ def start_active_span_from_context(
|
||||
# Twisted encodes the values as lists whereas opentracing doesn't.
|
||||
# So, we take the first item in the list.
|
||||
# Also, twisted uses byte arrays while opentracing expects strings.
|
||||
|
||||
if opentracing is None:
|
||||
return _noop_context_manager()
|
||||
|
||||
@@ -274,6 +231,91 @@ def start_active_span_from_context(
|
||||
)
|
||||
|
||||
|
||||
def start_active_span_from_edu(
    edu_content,
    operation_name,
    references=None,
    tags=None,
    start_time=None,
    ignore_active_span=False,
    finish_on_close=True,
):
    """
    Extracts a span context from an edu and uses it to start a new active span

    Args:
        edu_content (Dict): an edu_content with a `context` field whose value is
            canonical json for a dict which contains opentracing information.
    """

    if opentracing is None:
        return _noop_context_manager()

    carrier = json.loads(edu_content.get("context", "{}")).get("opentracing", {})
    context = opentracing.tracer.extract(opentracing.Format.TEXT_MAP, carrier)
    _references = [
        opentracing.child_of(span_context_from_string(x))
        for x in carrier.get("references", [])
    ]

    # For some reason jaeger decided not to support the visualization of multiple parent
    # spans or explicitly show references. I include the span context as a tag here as
    # an aid to people debugging but it's really not an ideal solution.

    # Build a new list rather than extending in place, so that neither the
    # caller's list nor a shared default argument is mutated between calls.
    references = list(references or []) + _references

    scope = opentracing.tracer.start_active_span(
        operation_name,
        child_of=context,
        references=references,
        tags=tags,
        start_time=start_time,
        ignore_active_span=ignore_active_span,
        finish_on_close=finish_on_close,
    )

    scope.span.set_tag("references", carrier.get("references", []))
    return scope
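
A rough sketch of how the `context` field travels inside an EDU, using plain dicts instead of a real tracer. The `{"opentracing": carrier}` shape is inferred from the extraction code above. The helper names and the `uber-trace-id` value are assumptions made for illustration only.

```python
import json


def attach_context(edu_content, carrier, whitelisted):
    """Return a copy of the EDU content with a serialised `context` field."""
    edu = dict(edu_content)
    # Non-whitelisted destinations get an empty JSON object, so no span ids leak.
    edu["context"] = json.dumps({"opentracing": carrier}) if whitelisted else "{}"
    return edu


def read_carrier(edu_content):
    """Recover the carrier dict; a missing or empty context yields {}."""
    return json.loads(edu_content.get("context", "{}")).get("opentracing", {})


# Hypothetical carrier, standing in for whatever the tracer would inject.
carrier = {"uber-trace-id": "abc:def:0:1"}

sent = attach_context({"type": "m.room_key_request"}, carrier, whitelisted=True)
blocked = attach_context({"type": "m.room_key_request"}, carrier, whitelisted=False)

print(read_carrier(sent))
print(read_carrier(blocked))
print(read_carrier({}))
```

Defaulting to the string `"{}"` on both sides keeps the receiving end from having to special-case EDUs sent by servers that do not trace.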
|
||||
|
||||
|
||||
# Opentracing setters for tags, logs, etc
|
||||
|
||||
|
||||
@only_if_tracing
|
||||
def set_tag(key, value):
|
||||
"""Sets a tag on the active span"""
|
||||
opentracing.tracer.active_span.set_tag(key, value)
|
||||
|
||||
|
||||
@only_if_tracing
|
||||
def log_kv(key_values, timestamp=None):
|
||||
"""Log to the active span"""
|
||||
opentracing.tracer.active_span.log_kv(key_values, timestamp)
|
||||
|
||||
|
||||
# Note: we don't have a get baggage items because we're trying to hide all
# scope and span state from synapse. I think this method may also be useless
# as a result

# I also think it's dangerous with respect to PII. If the whitelisting
# is misconfigured or buggy, span information will leak. This is no issue
# if it's jaeger span ids but baggage can contain any arbitrary data. I would
# suggest removing this.
|
||||
@only_if_tracing
|
||||
def set_baggage_item(key, value):
|
||||
"""Attach baggage to the active span"""
|
||||
opentracing.tracer.active_span.set_baggage_item(key, value)
|
||||
|
||||
|
||||
@only_if_tracing
|
||||
def set_operation_name(operation_name):
|
||||
"""Sets the operation name of the active span"""
|
||||
opentracing.tracer.active_span.set_operation_name(operation_name)
|
||||
|
||||
|
||||
# Injection and extraction
|
||||
|
||||
|
||||
@only_if_tracing
|
||||
def inject_active_span_twisted_headers(headers, destination):
|
||||
"""
|
||||
@@ -337,9 +379,160 @@ def inject_active_span_byte_dict(headers, destination):
|
||||
headers[key.encode()] = [value.encode()]
|
||||
|
||||
|
||||
@only_if_tracing
|
||||
def inject_active_span_text_map(carrier, destination=None):
|
||||
if destination and not whitelisted_homeserver(destination):
|
||||
return
|
||||
|
||||
opentracing.tracer.inject(
|
||||
opentracing.tracer.active_span, opentracing.Format.TEXT_MAP, carrier
|
||||
)
|
||||
|
||||
|
||||
def active_span_context_as_string():
|
||||
carrier = {}
|
||||
if opentracing:
|
||||
opentracing.tracer.inject(
|
||||
opentracing.tracer.active_span, opentracing.Format.TEXT_MAP, carrier
|
||||
)
|
||||
return json.dumps(carrier)
|
||||
|
||||
|
||||
@only_if_tracing
|
||||
def span_context_from_string(carrier):
|
||||
carrier = json.loads(carrier)
|
||||
return opentracing.tracer.extract(opentracing.Format.TEXT_MAP, carrier)
|
||||
|
||||
|
||||
@only_if_tracing
|
||||
def extract_text_map(carrier):
|
||||
return opentracing.tracer.extract(opentracing.Format.TEXT_MAP, carrier)
|
||||
|
||||
|
||||
# Tracing decorators
|
||||
|
||||
|
||||
def trace_deferred(func):
|
||||
"""Decorator to trace a deferred function. Sets the operation name to that of the
|
||||
function's."""
|
||||
|
||||
if not opentracing:
|
||||
return func
|
||||
|
||||
@wraps(func)
|
||||
@defer.inlineCallbacks
|
||||
def _trace_deferred_inner(self, *args, **kwargs):
|
||||
with start_active_span(func.__name__):
|
||||
r = yield func(self, *args, **kwargs)
|
||||
defer.returnValue(r)
|
||||
|
||||
return _trace_deferred_inner
|
||||
|
||||
|
||||
def trace_deferred_using_operation_name(name):
    """Decorator to trace a deferred function. Explicitly sets the operation_name."""
|
||||
|
||||
def trace_deferred(func):
|
||||
|
||||
if not opentracing:
|
||||
return func
|
||||
|
||||
@wraps(func)
|
||||
@defer.inlineCallbacks
|
||||
def _trace_deferred_inner(self, *args, **kwargs):
|
||||
# Start scope
|
||||
with start_active_span(name):
|
||||
r = yield func(self, *args, **kwargs)
|
||||
defer.returnValue(r)
|
||||
|
||||
return _trace_deferred_inner
|
||||
|
||||
return trace_deferred
|
||||
|
||||
|
||||
def trace(func):
|
||||
"""Decorator to trace a normal function. Sets the operation name to that of the
|
||||
function's."""
|
||||
|
||||
if not opentracing:
|
||||
return func
|
||||
|
||||
@wraps(func)
|
||||
def _trace_inner(self, *args, **kwargs):
|
||||
with start_active_span(func.__name__):
|
||||
return func(self, *args, **kwargs)
|
||||
|
||||
return _trace_inner
|
||||
|
||||
|
||||
def trace_using_operation_name(operation_name):
    """Decorator to trace a function. Explicitly sets the operation_name."""
|
||||
|
||||
def trace(func):
|
||||
|
||||
if not opentracing:
|
||||
return func
|
||||
|
||||
@wraps(func)
|
||||
def _trace_inner(self, *args, **kwargs):
|
||||
with start_active_span(operation_name):
|
||||
return func(self, *args, **kwargs)
|
||||
|
||||
return _trace_inner
|
||||
|
||||
return trace
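
The decorator factories above can be sketched without a tracer installed. Here `start_active_span` is a stand-in that only records start and finish events in a list; that recorder, `on_post`, and the names inside it are assumptions for illustration rather than the module's real behaviour.

```python
import contextlib
from functools import wraps

# Stand-in for a tracer: collects (event, operation_name) pairs.
recorded = []


@contextlib.contextmanager
def start_active_span(operation_name):
    recorded.append(("start", operation_name))
    try:
        yield
    finally:
        # Runs on normal return and on exceptions alike.
        recorded.append(("finish", operation_name))


def trace_using_operation_name(operation_name):
    """Decorator factory: run the wrapped function inside a named span."""

    def decorator(func):
        @wraps(func)
        def inner(*args, **kwargs):
            with start_active_span(operation_name):
                return func(*args, **kwargs)

        return inner

    return decorator


@trace_using_operation_name("upload_keys")
def on_post(device_id):
    return {"device_id": device_id}


result = on_post("DEV1")
print(result, recorded)
```

Using `functools.wraps` keeps the wrapped function's `__name__`, which matters when the plain `trace` variant derives the operation name from it.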
|
||||
|
||||
|
||||
def tag_args(func):

    if not opentracing:
        return func

    @wraps(func)
    def _tag_args_inner(self, *args, **kwargs):
        argspec = inspect.getargspec(func)
        # `args` excludes `self`, so it is offset by one from `argspec.args`.
        # zip stops early when some arguments were passed as keywords.
        for arg, value in zip(argspec.args[1:], args):
            set_tag("ARG_" + arg, value)
        set_tag("args", args[len(argspec.args) - 1 :])
        set_tag("kwargs", kwargs)
        return func(self, *args, **kwargs)

    return _tag_args_inner
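
One possible alternative for tagging arguments, sketched with `inspect.signature` so that positional and keyword arguments are handled the same way. This is a swapped-in technique, not what the diff does. The `tags` dict, this `set_tag`, and the `Handler` class are hypothetical stand-ins for the active span.

```python
import inspect
from functools import wraps

# Stand-in for the active span's tags.
tags = {}


def set_tag(key, value):
    tags[key] = value


def tag_args(func):
    """Tag every bound argument except `self` before calling the function."""
    signature = inspect.signature(func)

    @wraps(func)
    def inner(*args, **kwargs):
        # bind() maps positional and keyword arguments onto parameter names.
        bound = signature.bind(*args, **kwargs)
        for name, value in bound.arguments.items():
            if name != "self":
                set_tag("ARG_" + name, value)
        return func(*args, **kwargs)

    return inner


class Handler:
    @tag_args
    def upload_keys_for_user(self, user_id, device_id, keys):
        return len(keys)


count = Handler().upload_keys_for_user("@alice:example.com", device_id="DEV1", keys={})
print(count, tags)
```

Note that `inspect.signature` is Python 3 only, so this sketch would not apply as-is to a codebase that still supports Python 2.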
|
||||
|
||||
|
||||
def wrap_in_span(func):
    """Its purpose is to wrap a function that is being passed into a context
    which is a complete break from the current logcontext. This function creates
    a non-active span from the current context and closes it after the function
    executes."""
    global opentracing
    # I haven't used this function yet

    if not opentracing:
        return func

    parent_span = opentracing.tracer.active_span

    @wraps(func)
    def _wrap_in_span_inner(self, *args, **kwargs):
        span = opentracing.tracer.start_span(func.__name__, child_of=parent_span)
        try:
            return func(self, *args, **kwargs)
        except Exception as e:
            span.set_tag("error", True)
            # log_kv expects a dict of key/value pairs, not a set.
            span.log_kv({"exception": e})
            raise
        finally:
            span.finish()

    return _wrap_in_span_inner
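
The error-handling shape used in `wrap_in_span` can be exercised with a fake span. `FakeSpan`, `run_in_span`, and `boom` are hypothetical names for this sketch. The fake only records what a real span would be told.

```python
class FakeSpan:
    """Records tags, logs and whether finish() was called."""

    def __init__(self):
        self.tags = {}
        self.logs = []
        self.finished = False

    def set_tag(self, key, value):
        self.tags[key] = value

    def log_kv(self, key_values):
        self.logs.append(key_values)

    def finish(self):
        self.finished = True


def run_in_span(span, func, *args, **kwargs):
    """Call func, tagging the span on failure and always finishing it."""
    try:
        return func(*args, **kwargs)
    except Exception as e:
        span.set_tag("error", True)
        span.log_kv({"exception": str(e)})
        raise
    finally:
        span.finish()


def boom():
    raise ValueError("no such device")


failed_span = FakeSpan()
try:
    run_in_span(failed_span, boom)
except ValueError:
    pass

ok_span = FakeSpan()
value = run_in_span(ok_span, lambda: 42)
print(failed_span.tags, failed_span.logs, ok_span.tags, value)
```

The `finally` clause is what guarantees the span is finished on both paths, while the bare `raise` leaves the original exception visible to the caller.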
|
||||
|
||||
|
||||
def trace_servlet(servlet_name, func):
    """Decorator which traces a servlet. It starts a span with some servlet specific
    tags such as the servlet_name and request information"""
|
||||
if not opentracing:
|
||||
return func
|
||||
|
||||
@wraps(func)
|
||||
@defer.inlineCallbacks
|
||||
@@ -357,6 +550,44 @@ def trace_servlet(servlet_name, func):
|
||||
},
|
||||
):
|
||||
result = yield defer.maybeDeferred(func, request, *args, **kwargs)
|
||||
defer.returnValue(result)
|
||||
defer.returnValue(result)
|
||||
|
||||
return _trace_servlet_inner
|
||||
|
||||
|
||||
# Helper class
|
||||
|
||||
|
||||
class _DummyTagNames(object):
|
||||
"""wrapper of opentracings tags. We need to have them if we
|
||||
want to reference them without opentracing around. Clearly they
|
||||
should never actually show up in a trace. `set_tags` overwrites
|
||||
these with the correct ones."""
|
||||
|
||||
INVALID_TAG = "invalid-tag"
|
||||
COMPONENT = INVALID_TAG
|
||||
DATABASE_INSTANCE = INVALID_TAG
|
||||
DATABASE_STATEMENT = INVALID_TAG
|
||||
DATABASE_TYPE = INVALID_TAG
|
||||
DATABASE_USER = INVALID_TAG
|
||||
ERROR = INVALID_TAG
|
||||
HTTP_METHOD = INVALID_TAG
|
||||
HTTP_STATUS_CODE = INVALID_TAG
|
||||
HTTP_URL = INVALID_TAG
|
||||
MESSAGE_BUS_DESTINATION = INVALID_TAG
|
||||
PEER_ADDRESS = INVALID_TAG
|
||||
PEER_HOSTNAME = INVALID_TAG
|
||||
PEER_HOST_IPV4 = INVALID_TAG
|
||||
PEER_HOST_IPV6 = INVALID_TAG
|
||||
PEER_PORT = INVALID_TAG
|
||||
PEER_SERVICE = INVALID_TAG
|
||||
SAMPLING_PRIORITY = INVALID_TAG
|
||||
SERVICE = INVALID_TAG
|
||||
SPAN_KIND = INVALID_TAG
|
||||
SPAN_KIND_CONSUMER = INVALID_TAG
|
||||
SPAN_KIND_PRODUCER = INVALID_TAG
|
||||
SPAN_KIND_RPC_CLIENT = INVALID_TAG
|
||||
SPAN_KIND_RPC_SERVER = INVALID_TAG
|
||||
|
||||
|
||||
tags = _DummyTagNames
|
||||
|
||||
@@ -133,7 +133,7 @@ class _LogContextScope(Scope):
|
||||
|
||||
def close(self):
|
||||
if self.manager.active is not self:
|
||||
logger.error("Tried to close a none active scope!")
|
||||
logger.error("Tried to close a non-active scope!")
|
||||
return
|
||||
|
||||
if self._finish_on_close:
|
||||
|
||||
@@ -17,6 +17,7 @@ import logging
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
import synapse.logging.opentracing as opentracing
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.http.servlet import (
|
||||
RestServlet,
|
||||
@@ -68,6 +69,7 @@ class KeyUploadServlet(RestServlet):
|
||||
self.auth = hs.get_auth()
|
||||
self.e2e_keys_handler = hs.get_e2e_keys_handler()
|
||||
|
||||
@opentracing.trace_deferred_using_operation_name("upload_keys")
|
||||
@defer.inlineCallbacks
|
||||
def on_POST(self, request, device_id):
|
||||
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
|
||||
@@ -78,6 +80,14 @@ class KeyUploadServlet(RestServlet):
|
||||
# passing the device_id here is deprecated; however, we allow it
|
||||
# for now for compatibility with older clients.
|
||||
if requester.device_id is not None and device_id != requester.device_id:
|
||||
opentracing.set_tag("error", True)
|
||||
opentracing.log_kv(
|
||||
{
|
||||
"message": "Client uploading keys for a different device",
|
||||
"logged_in_id": requester.device_id,
|
||||
"key_being_uploaded": device_id,
|
||||
}
|
||||
)
|
||||
logger.warning(
|
||||
"Client uploading keys for a different device "
|
||||
"(logged in as %s, uploading for %s)",
|
||||
@@ -173,20 +183,24 @@ class KeyChangesServlet(RestServlet):
|
||||
self.auth = hs.get_auth()
|
||||
self.device_handler = hs.get_device_handler()
|
||||
|
||||
@opentracing.trace_deferred
|
||||
@defer.inlineCallbacks
|
||||
def on_GET(self, request):
|
||||
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
|
||||
|
||||
from_token_string = parse_string(request, "from")
|
||||
opentracing.set_tag("from", from_token_string)
|
||||
|
||||
# We want to enforce they do pass us one, but we ignore it and return
|
||||
# changes after the "to" as well as before.
|
||||
parse_string(request, "to")
|
||||
opentracing.set_tag("to", parse_string(request, "to"))
|
||||
|
||||
from_token = StreamToken.from_string(from_token_string)
|
||||
|
||||
user_id = requester.user.to_string()
|
||||
|
||||
opentracing.set_tag("user_id", user_id)
|
||||
|
||||
results = yield self.device_handler.get_user_ids_changed(user_id, from_token)
|
||||
|
||||
defer.returnValue((200, results))
|
||||
|
||||
@@ -17,6 +17,7 @@ import logging
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
import synapse.logging.opentracing as opentracing
|
||||
from synapse.api.errors import Codes, NotFoundError, SynapseError
|
||||
from synapse.http.servlet import (
|
||||
RestServlet,
|
||||
@@ -311,6 +312,7 @@ class RoomKeysVersionServlet(RestServlet):
|
||||
self.auth = hs.get_auth()
|
||||
self.e2e_room_keys_handler = hs.get_e2e_room_keys_handler()
|
||||
|
||||
@opentracing.trace_deferred_using_operation_name("get_room_keys_version")
|
||||
@defer.inlineCallbacks
|
||||
def on_GET(self, request, version):
|
||||
"""
|
||||
|
||||
@@ -17,6 +17,7 @@ import logging
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
import synapse.logging.opentracing as opentracing
|
||||
from synapse.http import servlet
|
||||
from synapse.http.servlet import parse_json_object_from_request
|
||||
from synapse.rest.client.transactions import HttpTransactionCache
|
||||
@@ -42,7 +43,10 @@ class SendToDeviceRestServlet(servlet.RestServlet):
|
||||
self.txns = HttpTransactionCache(hs)
|
||||
self.device_message_handler = hs.get_device_message_handler()
|
||||
|
||||
@opentracing.trace_deferred_using_operation_name("sendToDevice")
|
||||
def on_PUT(self, request, message_type, txn_id):
|
||||
opentracing.set_tag("message_type", message_type)
|
||||
opentracing.set_tag("txn_id", txn_id)
|
||||
return self.txns.fetch_or_execute_request(
|
||||
request, self._put, request, message_type, txn_id
|
||||
)
|
||||
|
||||
@@ -19,6 +19,7 @@ from canonicaljson import json
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
import synapse.logging.opentracing as opentracing
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.background_updates import BackgroundUpdateStore
|
||||
from synapse.util.caches.expiringcache import ExpiringCache
|
||||
@@ -72,6 +73,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
"get_new_messages_for_device", get_new_messages_for_device_txn
|
||||
)
|
||||
|
||||
@opentracing.trace_deferred
|
||||
@defer.inlineCallbacks
|
||||
def delete_messages_for_device(self, user_id, device_id, up_to_stream_id):
|
||||
"""
|
||||
@@ -87,11 +89,15 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
last_deleted_stream_id = self._last_device_delete_cache.get(
|
||||
(user_id, device_id), None
|
||||
)
|
||||
|
||||
opentracing.set_tag("last_deleted_stream_id", last_deleted_stream_id)
|
||||
|
||||
if last_deleted_stream_id:
|
||||
has_changed = self._device_inbox_stream_cache.has_entity_changed(
|
||||
user_id, last_deleted_stream_id
|
||||
)
|
||||
if not has_changed:
|
||||
opentracing.log_kv({"message": "No changes in cache since last check"})
|
||||
defer.returnValue(0)
|
||||
|
||||
def delete_messages_for_device_txn(txn):
|
||||
@@ -107,6 +113,10 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
"delete_messages_for_device", delete_messages_for_device_txn
|
||||
)
|
||||
|
||||
opentracing.log_kv(
|
||||
{"message": "deleted {} messages for device".format(count), "count": count}
|
||||
)
|
||||
|
||||
# Update the cache, ensuring that we only ever increase the value
|
||||
last_deleted_stream_id = self._last_device_delete_cache.get(
|
||||
(user_id, device_id), 0
|
||||
@@ -117,6 +127,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
|
||||
defer.returnValue(count)
|
||||
|
||||
@opentracing.trace
|
||||
def get_new_device_msgs_for_remote(
|
||||
self, destination, last_stream_id, current_stream_id, limit
|
||||
):
|
||||
@@ -132,16 +143,23 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
in the stream the messages got to.
|
||||
"""
|
||||
|
||||
opentracing.set_tag("destination", destination)
|
||||
opentracing.set_tag("last_stream_id", last_stream_id)
|
||||
opentracing.set_tag("current_stream_id", current_stream_id)
|
||||
opentracing.set_tag("limit", limit)
|
||||
|
||||
has_changed = self._device_federation_outbox_stream_cache.has_entity_changed(
|
||||
destination, last_stream_id
|
||||
)
|
||||
if not has_changed or last_stream_id == current_stream_id:
|
||||
opentracing.log_kv({"message": "No new messages in stream"})
|
||||
return defer.succeed(([], current_stream_id))
|
||||
|
||||
if limit <= 0:
|
||||
# This can happen if we run out of room for EDUs in the transaction.
|
||||
return defer.succeed(([], last_stream_id))
|
||||
|
||||
@opentracing.trace
|
||||
def get_new_messages_for_remote_destination_txn(txn):
|
||||
sql = (
|
||||
"SELECT stream_id, messages_json FROM device_federation_outbox"
|
||||
@@ -156,6 +174,9 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
stream_pos = row[0]
|
||||
messages.append(json.loads(row[1]))
|
||||
if len(messages) < limit:
|
||||
opentracing.log_kv(
|
||||
{"message": "Set stream position to current position"}
|
||||
)
|
||||
stream_pos = current_stream_id
|
||||
return (messages, stream_pos)
|
||||
|
||||
@@ -164,6 +185,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
get_new_messages_for_remote_destination_txn,
|
||||
)
|
||||
|
||||
@opentracing.trace_deferred
|
||||
def delete_device_msgs_for_remote(self, destination, up_to_stream_id):
|
||||
"""Used to delete messages when the remote destination acknowledges
|
||||
their receipt.
|
||||
@@ -214,6 +236,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
|
||||
expiry_ms=30 * 60 * 1000,
|
||||
)
|
||||
|
||||
@opentracing.trace_deferred
|
||||
@defer.inlineCallbacks
|
||||
def add_messages_to_device_inbox(
|
||||
self, local_messages_by_user_then_device, remote_messages_by_destination
|
||||
|
||||
@@ -20,6 +20,7 @@ from canonicaljson import json
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
import synapse.logging.opentracing as opentracing
|
||||
from synapse.api.errors import StoreError
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.storage._base import Cache, SQLBaseStore, db_to_json
|
||||
@@ -73,6 +74,7 @@ class DeviceWorkerStore(SQLBaseStore):
|
||||
|
||||
defer.returnValue({d["device_id"]: d for d in devices})
|
||||
|
||||
@opentracing.trace_deferred
|
||||
@defer.inlineCallbacks
|
||||
def get_devices_by_remote(self, destination, from_stream_id, limit):
|
||||
"""Get stream of updates to send to remote servers
|
||||
@@ -127,8 +129,10 @@ class DeviceWorkerStore(SQLBaseStore):
|
||||
# (user_id, device_id) entries into a map, with the value being
|
||||
# the max stream_id across each set of duplicate entries
|
||||
#
|
||||
# maps (user_id, device_id) -> stream_id
|
||||
# maps (user_id, device_id) -> (stream_id, context)
|
||||
# as long as their stream_id does not match that of the last row
|
||||
# where context is any metadata about the message's context such as
|
||||
# opentracing data
|
||||
query_map = {}
|
||||
for update in updates:
|
||||
if stream_id_cutoff is not None and update[2] >= stream_id_cutoff:
|
||||
@@ -136,7 +140,17 @@ class DeviceWorkerStore(SQLBaseStore):
|
||||
break
|
||||
|
||||
key = (update[0], update[1])
|
||||
query_map[key] = max(query_map.get(key, 0), update[2])
|
||||
logger.info("++++++++++++++++++++++++ update-0 %s", update[0])
|
||||
logger.info("++++++++++++++++++++++++ update-1 %s", update[1])
|
||||
logger.info("++++++++++++++++++++++++ update-2 %s", update[2])
|
||||
logger.info("++++++++++++++++++++++++ update-3 %s", update[3])
|
||||
logger.info(
|
||||
"+++++++++++++++++++++++++++ %s", (query_map.get(key, 0), update[2])
|
||||
)
|
||||
query_map[key] = (
|
||||
max(query_map.get(key, (0, None))[0], update[2]),
|
||||
update[3],
|
||||
)
|
||||
|
||||
# If we didn't find any updates with a stream_id lower than the cutoff, it
|
||||
# means that there are more than limit updates all of which have the same
|
||||
@@ -171,7 +185,7 @@ class DeviceWorkerStore(SQLBaseStore):
|
||||
List: List of device updates
|
||||
"""
|
||||
sql = """
|
||||
SELECT user_id, device_id, stream_id FROM device_lists_outbound_pokes
|
||||
SELECT user_id, device_id, stream_id, context FROM device_lists_outbound_pokes
|
||||
WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ?
|
||||
ORDER BY stream_id
|
||||
LIMIT ?
|
||||
@@ -210,12 +224,15 @@ class DeviceWorkerStore(SQLBaseStore):
|
||||
destination, user_id, from_stream_id
|
||||
)
|
||||
for device_id, device in iteritems(user_devices):
|
||||
stream_id = query_map[(user_id, device_id)]
|
||||
stream_id = query_map[(user_id, device_id)][0]
|
||||
result = {
|
||||
"user_id": user_id,
|
||||
"device_id": device_id,
|
||||
"prev_id": [prev_id] if prev_id else [],
|
||||
"stream_id": stream_id,
|
||||
"context": query_map[(user_id, device_id)][1]
|
||||
if opentracing.whitelisted_homeserver(destination)
|
||||
else "{}",
|
||||
}
|
||||
|
||||
prev_id = stream_id
|
||||
@@ -299,6 +316,7 @@ class DeviceWorkerStore(SQLBaseStore):
|
||||
def get_device_stream_token(self):
|
||||
return self._device_list_id_gen.get_current_token()
|
||||
|
||||
@opentracing.trace_deferred
|
||||
@defer.inlineCallbacks
|
||||
def get_user_devices_from_cache(self, query_list):
|
||||
"""Get the devices (and keys if any) for remote users from the cache.
|
||||
@@ -329,7 +347,8 @@ class DeviceWorkerStore(SQLBaseStore):
|
||||
results.setdefault(user_id, {})[device_id] = device
|
||||
else:
|
||||
results[user_id] = yield self._get_cached_devices_for_user(user_id)
|
||||
|
||||
opentracing.set_tag("in_cache", results)
|
||||
opentracing.set_tag("not_in_cache", user_ids_not_in_cache)
|
||||
defer.returnValue((user_ids_not_in_cache, results))
|
||||
|
||||
@cachedInlineCallbacks(num_args=2, tree=True)
|
||||
@@ -814,6 +833,9 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
|
||||
],
|
||||
)
|
||||
|
||||
context = {"opentracing": {}}
|
||||
opentracing.inject_active_span_text_map(context["opentracing"])
|
||||
|
||||
self._simple_insert_many_txn(
|
||||
txn,
|
||||
table="device_lists_outbound_pokes",
|
||||
@@ -825,6 +847,9 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
|
||||
"device_id": device_id,
|
||||
"sent": False,
|
||||
"ts": now,
|
||||
"context": json.dumps(context)
|
||||
if opentracing.whitelisted_homeserver(destination)
|
||||
else "{}",
|
||||
}
|
||||
for destination in hosts
|
||||
for device_id in device_ids
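Reviewer note: the `(user_id, device_id) -> (stream_id, context)` map built in `get_devices_by_remote`, and the whitelist check applied before the context is sent out, can be sketched in isolation. This is a minimal illustration under stated assumptions, not the Synapse implementation. `collapse_updates`, `context_for`, `is_whitelisted` and the sample rows are hypothetical names introduced here. Rows are assumed to be `(user_id, device_id, stream_id, context)` tuples ordered by `stream_id`, which is what the modified SQL query in this diff selects.

```python
def collapse_updates(updates):
    """Collapse duplicate (user_id, device_id) rows into a single entry.

    For each key, keep the highest stream_id seen so far together with the
    context of the most recently processed row for that key.
    """
    query_map = {}
    for user_id, device_id, stream_id, context in updates:
        key = (user_id, device_id)
        previous_stream_id = query_map.get(key, (0, None))[0]
        query_map[key] = (max(previous_stream_id, stream_id), context)
    return query_map


def context_for(destination, context, is_whitelisted):
    """Return the stored context only for whitelisted destinations.

    `is_whitelisted` is a hypothetical stand-in for the whitelist check used
    in the diff; other destinations get an empty JSON object.
    """
    return context if is_whitelisted(destination) else "{}"


# Hypothetical sample rows, ordered by stream_id.
rows = [
    ("@alice:example.org", "DEV1", 5, '{"opentracing": {"a": "1"}}'),
    ("@alice:example.org", "DEV2", 6, "{}"),
    ("@alice:example.org", "DEV1", 8, '{"opentracing": {"a": "2"}}'),
]
collapsed = collapse_updates(rows)
```

Because the rows arrive ordered by `stream_id`, the last row seen for a key is also the one with the highest `stream_id`, so the stored context belongs to the update that is actually sent.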
|
||||
|
||||
@@ -17,6 +17,7 @@ import json
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
import synapse.logging.opentracing as opentracing
|
||||
from synapse.api.errors import StoreError
|
||||
|
||||
from ._base import SQLBaseStore
|
||||
@@ -94,7 +95,16 @@ class EndToEndRoomKeyStore(SQLBaseStore):
|
||||
},
|
||||
lock=False,
|
||||
)
|
||||
opentracing.log_kv(
|
||||
{
|
||||
"message": "Set room key",
|
||||
"room_id": room_id,
|
||||
"session_id": session_id,
|
||||
"room_key": room_key,
|
||||
}
|
||||
)
|
||||
|
||||
@opentracing.trace_deferred
|
||||
@defer.inlineCallbacks
|
||||
def get_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
|
||||
"""Bulk get the E2E room keys for a given backup, optionally filtered to a given
|
||||
@@ -153,6 +163,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
|
||||
|
||||
defer.returnValue(sessions)
|
||||
|
||||
@opentracing.trace_deferred
|
||||
@defer.inlineCallbacks
|
||||
def delete_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
|
||||
"""Bulk delete the E2E room keys for a given backup, optionally filtered to a given
|
||||
@@ -236,6 +247,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
|
||||
"get_e2e_room_keys_version_info", _get_e2e_room_keys_version_info_txn
|
||||
)
|
||||
|
||||
@opentracing.trace_deferred
|
||||
def create_e2e_room_keys_version(self, user_id, info):
|
||||
"""Atomically creates a new version of this user's e2e_room_keys store
|
||||
with the given version info.
|
||||
@@ -276,6 +288,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
|
||||
"create_e2e_room_keys_version_txn", _create_e2e_room_keys_version_txn
|
||||
)
|
||||
|
||||
@opentracing.trace_deferred
|
||||
def update_e2e_room_keys_version(self, user_id, version, info):
|
||||
"""Update a given backup version
|
||||
|
||||
@@ -292,6 +305,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
|
||||
desc="update_e2e_room_keys_version",
|
||||
)
|
||||
|
||||
@opentracing.trace_deferred
|
||||
def delete_e2e_room_keys_version(self, user_id, version=None):
|
||||
"""Delete a given backup version of the user's room keys.
|
||||
Doesn't delete their actual key data.
|
||||
|
||||
@@ -18,12 +18,14 @@ from canonicaljson import encode_canonical_json
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
import synapse.logging.opentracing as opentracing
|
||||
from synapse.util.caches.descriptors import cached
|
||||
|
||||
from ._base import SQLBaseStore, db_to_json
|
||||
|
||||
|
||||
class EndToEndKeyWorkerStore(SQLBaseStore):
|
||||
@opentracing.trace_deferred
|
||||
@defer.inlineCallbacks
|
||||
def get_e2e_device_keys(
|
||||
self, query_list, include_all_devices=False, include_deleted_devices=False
|
||||
@@ -40,6 +42,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
|
||||
Dict mapping from user-id to dict mapping from device_id to
|
||||
dict containing "key_json", "device_display_name".
|
||||
"""
|
||||
opentracing.set_tag("query_list", query_list)
|
||||
if not query_list:
|
||||
defer.returnValue({})
|
||||
|
||||
@@ -57,9 +60,13 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
|
||||
|
||||
defer.returnValue(results)
|
||||
|
||||
@opentracing.trace
|
||||
def _get_e2e_device_keys_txn(
|
||||
self, txn, query_list, include_all_devices=False, include_deleted_devices=False
|
||||
):
|
||||
opentracing.set_tag("include_all_devices", include_all_devices)
|
||||
opentracing.set_tag("include_deleted_devices", include_deleted_devices)
|
||||
|
||||
query_clauses = []
|
||||
query_params = []
|
||||
|
||||
@@ -104,6 +111,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
|
||||
for user_id, device_id in deleted_devices:
|
||||
result.setdefault(user_id, {})[device_id] = None
|
||||
|
||||
opentracing.log_kv(result)
|
||||
return result
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@@ -129,10 +137,11 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
|
||||
keyvalues={"user_id": user_id, "device_id": device_id},
|
||||
desc="add_e2e_one_time_keys_check",
|
||||
)
|
||||
|
||||
defer.returnValue(
|
||||
{(row["algorithm"], row["key_id"]): row["key_json"] for row in rows}
|
||||
result = {(row["algorithm"], row["key_id"]): row["key_json"] for row in rows}
|
||||
opentracing.log_kv(
|
||||
{"message": "Fetched one time keys for user", "one_time_keys": result}
|
||||
)
|
||||
defer.returnValue(result)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def add_e2e_one_time_keys(self, user_id, device_id, time_now, new_keys):
|
||||
@@ -148,6 +157,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
|
||||
"""
|
||||
|
||||
def _add_e2e_one_time_keys(txn):
|
||||
opentracing.set_tag("user_id", user_id)
|
||||
opentracing.set_tag("device_id", device_id)
|
||||
opentracing.set_tag("new_keys", new_keys)
|
||||
# We are protected from race between lookup and insertion due to
|
||||
# a unique constraint. If there is a race of two calls to
|
||||
# `add_e2e_one_time_keys` then they'll conflict and we will only
|
||||
@@ -204,6 +216,11 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
|
||||
"""
|
||||
|
||||
def _set_e2e_device_keys_txn(txn):
|
||||
opentracing.set_tag("user_id", user_id)
|
||||
opentracing.set_tag("device_id", device_id)
|
||||
opentracing.set_tag("time_now", time_now)
|
||||
opentracing.set_tag("device_keys", device_keys)
|
||||
|
||||
old_key_json = self._simple_select_one_onecol_txn(
|
||||
txn,
|
||||
table="e2e_device_keys_json",
|
||||
@@ -217,6 +234,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
|
||||
new_key_json = encode_canonical_json(device_keys).decode("utf-8")
|
||||
|
||||
if old_key_json == new_key_json:
|
||||
opentracing.log_kv({"message": "Device key already stored."})
|
||||
return False
|
||||
|
||||
self._simple_upsert_txn(
|
||||
@@ -225,7 +243,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
|
||||
keyvalues={"user_id": user_id, "device_id": device_id},
|
||||
values={"ts_added_ms": time_now, "key_json": new_key_json},
|
||||
)
|
||||
|
||||
opentracing.log_kv({"message": "Device keys stored."})
|
||||
return True
|
||||
|
||||
return self.runInteraction("set_e2e_device_keys", _set_e2e_device_keys_txn)
|
||||
@@ -233,6 +251,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
|
||||
def claim_e2e_one_time_keys(self, query_list):
|
||||
"""Take a list of one time keys out of the database"""
|
||||
|
||||
@opentracing.trace
|
||||
def _claim_e2e_one_time_keys(txn):
|
||||
sql = (
|
||||
"SELECT key_id, key_json FROM e2e_one_time_keys_json"
|
||||
@@ -254,7 +273,15 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
|
||||
" AND key_id = ?"
|
||||
)
|
||||
for user_id, device_id, algorithm, key_id in delete:
|
||||
opentracing.log_kv(
|
||||
{
|
||||
"message": "Executing claim e2e_one_time_keys transaction on database."
|
||||
}
|
||||
)
|
||||
txn.execute(sql, (user_id, device_id, algorithm, key_id))
|
||||
opentracing.log_kv(
|
||||
{"message": "finished executing and invalidating cache"}
|
||||
)
|
||||
self._invalidate_cache_and_stream(
|
||||
txn, self.count_e2e_one_time_keys, (user_id, device_id)
|
||||
)
|
||||
@@ -264,6 +291,13 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
|
||||
|
||||
def delete_e2e_keys_by_device(self, user_id, device_id):
|
||||
def delete_e2e_keys_by_device_txn(txn):
|
||||
opentracing.log_kv(
|
||||
{
|
||||
"message": "Deleting keys for device",
|
||||
"device_id": device_id,
|
||||
"user_id": user_id,
|
||||
}
|
||||
)
|
||||
self._simple_delete_txn(
|
||||
txn,
|
||||
table="e2e_device_keys_json",
|
||||
|
||||
@@ -0,0 +1,16 @@
|
||||
/* Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
ALTER TABLE device_lists_outbound_pokes ADD context TEXT;
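Reviewer note: one consequence of this migration is that rows written before it runs hold `NULL` in the new `context` column, since no default is given. The sketch below shows that with an in-memory SQLite database. The table definition is a simplified assumption (only the columns that appear in this diff), and `load_context` and the trace header name are hypothetical and not part of Synapse.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")

# Simplified, assumed schema: only the columns referenced in this diff.
conn.execute(
    "CREATE TABLE device_lists_outbound_pokes ("
    " destination TEXT, stream_id BIGINT, user_id TEXT,"
    " device_id TEXT, sent BOOLEAN, ts BIGINT)"
)

# A row that exists before the migration is applied.
conn.execute(
    "INSERT INTO device_lists_outbound_pokes VALUES (?, ?, ?, ?, ?, ?)",
    ("old.example.org", 1, "@alice:example.org", "DEV1", False, 0),
)

# The migration from this diff.
conn.execute("ALTER TABLE device_lists_outbound_pokes ADD context TEXT")

# A row written after the migration, with a JSON-encoded context.
context = {"opentracing": {"hypothetical-trace-header": "abc123"}}
conn.execute(
    "INSERT INTO device_lists_outbound_pokes VALUES (?, ?, ?, ?, ?, ?, ?)",
    ("new.example.org", 2, "@alice:example.org", "DEV1", False, 0,
     json.dumps(context)),
)

rows = conn.execute(
    "SELECT stream_id, context FROM device_lists_outbound_pokes"
    " ORDER BY stream_id"
).fetchall()


def load_context(raw):
    """Decode a stored context, treating NULL or empty values as no context."""
    return json.loads(raw) if raw else {}
```

Code that reads `context` may therefore want to tolerate `None` for rows queued before the upgrade, as `load_context` does here.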