1
0

Compare commits

...

49 Commits

Author SHA1 Message Date
Jorik Schellekens 4824e61c97 Log result_dict 2019-08-20 10:42:17 +01:00
Jorik Schellekens 775853e012 How does that '+' keep coming back? 2019-08-20 10:26:31 +01:00
Jorik Schellekens eee4eff050 Bind exception to name 2019-08-05 16:11:54 +01:00
Jorik Schellekens a68119e676 Merge remote-tracking branch 'origin/develop' into joriks/opentracing_e2e 2019-08-05 16:03:37 +01:00
Jorik Schellekens 82d6eb186c Remove unused import 2019-08-05 15:12:17 +01:00
Jorik Schellekens 4f36a2dfec Create and use a method to get the span context as a dict. 2019-08-05 15:02:29 +01:00
Jorik Schellekens bd0ed7b0c4 Docstrings shouldn't lie. 2019-08-05 14:44:19 +01:00
Jorik Schellekens 6a355ca20a Refactor for clarity. 2019-08-05 14:26:30 +01:00
Jorik Schellekens 22b7e6a011 Debug comments gone rampant. 2019-08-05 14:23:11 +01:00
Jorik Schellekens f76b071c99 Remove redundent spans. 2019-08-05 14:21:55 +01:00
Jorik Schellekens 7ab2088ddc Remove redundent tagging. 2019-08-05 14:06:56 +01:00
Jorik Schellekens 03958693ce Use underscores. 2019-08-05 13:59:04 +01:00
Jorik Schellekens 50964d2341 String concatenation without the '+' 2019-08-05 13:56:04 +01:00
Jorik Schellekens e7f42857bb Refactor return value so we don't create identical lists each time. 2019-08-05 13:48:09 +01:00
Jorik Schellekens d859c34fb9 Nicer use of dict update methods. 2019-08-05 12:16:10 +01:00
Jorik Schellekens 46e41ee08d Comment for 'context' column in device_lists_outbound_pokes' 2019-08-05 11:48:10 +01:00
Jorik Schellekens 35eb018c02 Double negatives do not make code that isn't unclear..
Co-Authored-By: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
2019-08-05 11:39:58 +01:00
Jorik Schellekens 8b53f8e9bd Typo
Co-Authored-By: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
2019-08-05 11:36:50 +01:00
Jorik Schellekens 08aaad0756 Nicer changelog
Co-Authored-By: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
2019-08-05 11:30:18 +01:00
Jorik Schellekens 1801578795 Use unified trace method 2019-07-23 16:06:33 +01:00
Jorik Schellekens beea2e31b9 I wish python had a good type system. 2019-07-23 16:06:33 +01:00
Jorik Schellekens d395650bcf Use better decorator names. 2019-07-23 16:06:33 +01:00
Jorik Schellekens cffba280de Trailing .d 2019-07-23 16:06:33 +01:00
Jorik Schellekens d26cbb4e23 Unbreak json parsing. 2019-07-23 16:06:33 +01:00
Jorik Schellekens 6944c994a5 Make sure there is an active span here. 2019-07-23 16:06:33 +01:00
Jorik Schellekens 30738e978e newsfile 2019-07-23 16:06:33 +01:00
Jorik Schellekens 4824e30810 Add user _id 2019-07-23 16:06:33 +01:00
Jorik Schellekens e49487fc91 Better args wrapper 2019-07-23 16:05:58 +01:00
Jorik Schellekens ab191f99f9 A little extra device_list tracing 2019-07-23 16:05:58 +01:00
Jorik Schellekens a293759d8c Though style is subjective it depends on a ruthless objectivity: you either have it, or you don't· 2019-07-23 16:05:58 +01:00
Jorik Schellekens d44f303fe0 Isort of ran out of puns for this one. 2019-07-23 16:05:58 +01:00
Jorik Schellekens 2fd49cedbb Nicer tracing 2019-07-23 16:05:58 +01:00
Jorik Schellekens d4bdc2ba80 Trace key claiming 2019-07-23 16:05:58 +01:00
Jorik Schellekens 794c9e2a75 Cleanup key upload tracing 2019-07-23 16:05:58 +01:00
Jorik Schellekens 01229a4af9 Clean up room key tracing 2019-07-23 16:05:21 +01:00
Jorik Schellekens 1ed790d67a Some tracing 2019-07-23 16:05:21 +01:00
Jorik Schellekens 957cd77e95 Opentracing across streams 2019-07-23 16:05:21 +01:00
Jorik Schellekens bfc50050fd The great logging/ migration 2019-07-23 16:05:20 +01:00
Jorik Schellekens 7ae7e796ff These functions were not deferreds! 2019-07-23 16:04:02 +01:00
Jorik Schellekens d9f0c7ff47 How did that half of the statement get deleted? 2019-07-23 16:04:02 +01:00
Jorik Schellekens 28113ad335 typo 2019-07-23 16:04:02 +01:00
Jorik Schellekens d94897e818 Include servletname in incoming-request trace 2019-07-23 16:04:02 +01:00
Jorik Schellekens 21940cadf0 Update to new access pattern 2019-07-23 16:04:02 +01:00
Jorik Schellekens c988c02c7c Trace device messages. 2019-07-23 16:01:06 +01:00
Jorik Schellekens fd669e5e44 Trace devices 2019-07-23 16:00:46 +01:00
Jorik Schellekens d876cda725 Trace more e2e stuff and less e2e stuff 2019-07-23 15:59:23 +01:00
Jorik Schellekens 1e7099d04c Fix e2e bugs 2019-07-23 15:59:23 +01:00
Jorik Schellekens 565544b603 Trace e2e 2019-07-23 15:59:23 +01:00
Jorik Schellekens 823c34a940 One tracing decorator to rule them all. 2019-07-23 15:56:29 +01:00
17 changed files with 480 additions and 108 deletions
+1
View File
@@ -0,0 +1 @@
Add OpenTracing instrumentation for e2e code paths.
+11
View File
@@ -22,6 +22,7 @@ from netaddr import IPAddress
from twisted.internet import defer
import synapse.logging.opentracing as opentracing
import synapse.types
from synapse import event_auth
from synapse.api.constants import EventTypes, JoinRules, Membership
@@ -178,6 +179,7 @@ class Auth(object):
def get_public_keys(self, invite_event):
return event_auth.get_public_keys(invite_event)
@opentracing.trace
@defer.inlineCallbacks
def get_user_by_req(
self, request, allow_guest=False, rights="access", allow_expired=False
@@ -209,6 +211,10 @@ class Auth(object):
user_id, app_service = yield self._get_appservice_user_id(request)
if user_id:
request.authenticated_entity = user_id
opentracing.set_tag("authenticated_entity", user_id)
# there is at least one other place where authenticated entity is
# set. user_id is tagged in case authenticated_entity is clobbered
opentracing.set_tag("user_id", user_id)
if ip_addr and self.hs.config.track_appservice_user_ips:
yield self.store.insert_client_ip(
@@ -260,6 +266,11 @@ class Auth(object):
request.authenticated_entity = user.to_string()
opentracing.set_tag("authenticated_entity", user.to_string())
# there is at least one other place where authenticated entity is
# set. user_id is tagged incase authenticated_entity is clobbered
opentracing.set_tag("user_id", user.to_string())
return synapse.types.create_requester(
user, token_id, is_guest, device_id, app_service=app_service
)
+12 -6
View File
@@ -25,6 +25,7 @@ from twisted.internet import defer
from twisted.internet.abstract import isIPAddress
from twisted.python import failure
import synapse.logging.opentracing as opentracing
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import (
AuthError,
@@ -507,6 +508,7 @@ class FederationServer(FederationBase):
def on_query_user_devices(self, origin, user_id):
return self.on_query_request("user_devices", user_id)
@opentracing.trace
@defer.inlineCallbacks
@log_function
def on_claim_client_keys(self, origin, content):
@@ -515,6 +517,9 @@ class FederationServer(FederationBase):
for device_id, algorithm in device_keys.items():
query.append((user_id, device_id, algorithm))
opentracing.log_kv(
{"message": "Claiming one time keys.", "user, device pairs": query}
)
results = yield self.store.claim_e2e_one_time_keys(query)
json_result = {}
@@ -808,12 +813,13 @@ class FederationHandlerRegistry(object):
if not handler:
logger.warn("No handler registered for EDU type %s", edu_type)
try:
yield handler(origin, content)
except SynapseError as e:
logger.info("Failed to handle edu %r: %r", edu_type, e)
except Exception:
logger.exception("Failed to handle edu %r", edu_type)
with opentracing.start_active_span_from_edu(content, "handle_edu"):
try:
yield handler(origin, content)
except SynapseError as e:
logger.info("Failed to handle edu %r: %r", edu_type, e)
except Exception:
logger.exception("Failed to handle edu %r", edu_type)
def on_query(self, query_type, args):
handler = self.query_handlers.get(query_type)
@@ -16,10 +16,12 @@
import datetime
import logging
from canonicaljson import json
from prometheus_client import Counter
from twisted.internet import defer
import synapse.logging.opentracing as opentracing
from synapse.api.errors import (
FederationDeniedError,
HttpResponseException,
@@ -204,97 +206,137 @@ class PerDestinationQueue(object):
pending_edus = device_update_edus + to_device_edus
# BEGIN CRITICAL SECTION
#
# In order to avoid a race condition, we need to make sure that
# the following code (from popping the queues up to the point
# where we decide if we actually have any pending messages) is
# atomic - otherwise new PDUs or EDUs might arrive in the
# meantime, but not get sent because we hold the
# transmission_loop_running flag.
# Make a transaction sending span, this span follows on from all the
# edus in that transaction. This needs to be done because if the edus
# are never received on the remote the span effectively has no causality.
pending_pdus = self._pending_pdus
# We can only include at most 50 PDUs per transactions
pending_pdus, self._pending_pdus = pending_pdus[:50], pending_pdus[50:]
pending_edus.extend(self._get_rr_edus(force_flush=False))
pending_presence = self._pending_presence
self._pending_presence = {}
if pending_presence:
pending_edus.append(
Edu(
origin=self._server_name,
destination=self._destination,
edu_type="m.presence",
content={
"push": [
format_user_presence_state(
presence, self._clock.time_msec()
)
for presence in pending_presence.values()
]
},
)
span_contexts = [
opentracing.extract_text_map(
json.loads(
edu.get_dict().get("content", {}).get("context", "{}")
).get("opentracing", {})
)
for edu in pending_edus
]
pending_edus.extend(
self._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus))
)
while (
len(pending_edus) < MAX_EDUS_PER_TRANSACTION
and self._pending_edus_keyed
with opentracing.start_active_span_follows_from(
"send_transaction", span_contexts
):
_, val = self._pending_edus_keyed.popitem()
pending_edus.append(val)
# Link each sent edu to this transaction's span
_pending_edus = []
for edu in pending_edus:
edu_dict = edu.get_dict()
span_context = json.loads(
edu_dict.get("content", {}).get("context", "{}")
).get("opentracing", {})
# If there is no span context then we are either blacklisting
# this destination or we are not tracing
if span_context:
span_context.setdefault("references", []).append(
opentracing.active_span_context_as_string()
)
edu_dict["content"]["context"] = json.dumps(
{"opentracing": span_context}
)
_pending_edus.append(Edu(**edu_dict))
pending_edus = _pending_edus
if pending_pdus:
logger.debug(
"TX [%s] len(pending_pdus_by_dest[dest]) = %d",
self._destination,
len(pending_pdus),
# BEGIN CRITICAL SECTION
#
# In order to avoid a race condition, we need to make sure that
# the following code (from popping the queues up to the point
# where we decide if we actually have any pending messages) is
# atomic - otherwise new PDUs or EDUs might arrive in the
# meantime, but not get sent because we hold the
# transmission_loop_running flag.
pending_pdus = self._pending_pdus
# We can only include at most 50 PDUs per transactions
pending_pdus, self._pending_pdus = (
pending_pdus[:50],
pending_pdus[50:],
)
if not pending_pdus and not pending_edus:
logger.debug("TX [%s] Nothing to send", self._destination)
self._last_device_stream_id = device_stream_id
return
# if we've decided to send a transaction anyway, and we have room, we
# may as well send any pending RRs
if len(pending_edus) < MAX_EDUS_PER_TRANSACTION:
pending_edus.extend(self._get_rr_edus(force_flush=True))
# END CRITICAL SECTION
success = yield self._transaction_manager.send_new_transaction(
self._destination, pending_pdus, pending_edus
)
if success:
sent_transactions_counter.inc()
sent_edus_counter.inc(len(pending_edus))
for edu in pending_edus:
sent_edus_by_type.labels(edu.edu_type).inc()
# Remove the acknowledged device messages from the database
# Only bother if we actually sent some device messages
if to_device_edus:
yield self._store.delete_device_msgs_for_remote(
self._destination, device_stream_id
pending_edus.extend(self._get_rr_edus(force_flush=False))
pending_presence = self._pending_presence
self._pending_presence = {}
if pending_presence:
pending_edus.append(
Edu(
origin=self._server_name,
destination=self._destination,
edu_type="m.presence",
content={
"push": [
format_user_presence_state(
presence, self._clock.time_msec()
)
for presence in pending_presence.values()
]
},
)
)
# also mark the device updates as sent
if device_update_edus:
logger.info(
"Marking as sent %r %r", self._destination, dev_list_id
pending_edus.extend(
self._pop_pending_edus(
MAX_EDUS_PER_TRANSACTION - len(pending_edus)
)
yield self._store.mark_as_sent_devices_by_remote(
self._destination, dev_list_id
)
while (
len(pending_edus) < MAX_EDUS_PER_TRANSACTION
and self._pending_edus_keyed
):
_, val = self._pending_edus_keyed.popitem()
pending_edus.append(val)
if pending_pdus:
logger.debug(
"TX [%s] len(pending_pdus_by_dest[dest]) = %d",
self._destination,
len(pending_pdus),
)
self._last_device_stream_id = device_stream_id
self._last_device_list_stream_id = dev_list_id
else:
break
if not pending_pdus and not pending_edus:
logger.debug("TX [%s] Nothing to send", self._destination)
self._last_device_stream_id = device_stream_id
return
# if we've decided to send a transaction anyway, and we have room, we
# may as well send any pending RRs
if len(pending_edus) < MAX_EDUS_PER_TRANSACTION:
pending_edus.extend(self._get_rr_edus(force_flush=True))
# END CRITICAL SECTION
success = yield self._transaction_manager.send_new_transaction(
self._destination, pending_pdus, pending_edus
)
if success:
sent_transactions_counter.inc()
sent_edus_counter.inc(len(pending_edus))
for edu in pending_edus:
sent_edus_by_type.labels(edu.edu_type).inc()
# Remove the acknowledged device messages from the database
# Only bother if we actually sent some device messages
if to_device_edus:
yield self._store.delete_device_msgs_for_remote(
self._destination, device_stream_id
)
# also mark the device updates as sent
if device_update_edus:
logger.info(
"Marking as sent %r %r", self._destination, dev_list_id
)
yield self._store.mark_as_sent_devices_by_remote(
self._destination, dev_list_id
)
self._last_device_stream_id = device_stream_id
self._last_device_list_stream_id = dev_list_id
else:
break
except NotRetryingDestination as e:
logger.debug(
"TX [%s] not ready for retry yet (next retry at %s) - "
+1
View File
@@ -297,6 +297,7 @@ class BaseFederationServlet(object):
opentracing.tags.HTTP_URL: request.get_redacted_uri(),
opentracing.tags.PEER_HOST_IPV6: request.getClientIP(),
"authenticated_entity": origin,
"servlet_name": request.request_metrics.name,
},
):
if origin:
+66 -3
View File
@@ -18,6 +18,7 @@ from six import iteritems, itervalues
from twisted.internet import defer
import synapse.logging.opentracing as opentracing
from synapse.api import errors
from synapse.api.constants import EventTypes
from synapse.api.errors import (
@@ -45,6 +46,7 @@ class DeviceWorkerHandler(BaseHandler):
self.state = hs.get_state_handler()
self._auth_handler = hs.get_auth_handler()
@opentracing.trace
@defer.inlineCallbacks
def get_devices_by_user(self, user_id):
"""
@@ -56,6 +58,7 @@ class DeviceWorkerHandler(BaseHandler):
defer.Deferred: list[dict[str, X]]: info on each device
"""
opentracing.set_tag("user_id", user_id)
device_map = yield self.store.get_devices_by_user(user_id)
ips = yield self.store.get_last_client_ip_by_device(user_id, device_id=None)
@@ -64,8 +67,10 @@ class DeviceWorkerHandler(BaseHandler):
for device in devices:
_update_device_from_client_ips(device, ips)
opentracing.log_kv(device_map)
return devices
@opentracing.trace
@defer.inlineCallbacks
def get_device(self, user_id, device_id):
""" Retrieve the given device
@@ -85,9 +90,14 @@ class DeviceWorkerHandler(BaseHandler):
raise errors.NotFoundError
ips = yield self.store.get_last_client_ip_by_device(user_id, device_id)
_update_device_from_client_ips(device, ips)
opentracing.set_tag("device", device)
opentracing.set_tag("ips", ips)
return device
@measure_func("device.get_user_ids_changed")
@opentracing.trace
@defer.inlineCallbacks
def get_user_ids_changed(self, user_id, from_token):
"""Get list of users that have had the devices updated, or have newly
@@ -97,6 +107,9 @@ class DeviceWorkerHandler(BaseHandler):
user_id (str)
from_token (StreamToken)
"""
opentracing.set_tag("user_id", user_id)
opentracing.set_tag("from_token", from_token)
now_room_key = yield self.store.get_room_events_max_id()
room_ids = yield self.store.get_rooms_for_user(user_id)
@@ -133,7 +146,7 @@ class DeviceWorkerHandler(BaseHandler):
if etype != EventTypes.Member:
continue
possibly_left.add(state_key)
continue
continue
# Fetch the current state at the time.
try:
@@ -148,6 +161,9 @@ class DeviceWorkerHandler(BaseHandler):
# special-case for an empty prev state: include all members
# in the changed list
if not event_ids:
opentracing.log_kv(
{"event": "encountered empty previous state", "room_id": room_id}
)
for key, event_id in iteritems(current_state_ids):
etype, state_key = key
if etype != EventTypes.Member:
@@ -200,7 +216,11 @@ class DeviceWorkerHandler(BaseHandler):
possibly_joined = []
possibly_left = []
return {"changed": list(possibly_joined), "left": list(possibly_left)}
result = {"changed": list(possibly_joined), "left": list(possibly_left)}
opentracing.log_kv(result)
return {result}
class DeviceHandler(DeviceWorkerHandler):
@@ -267,6 +287,7 @@ class DeviceHandler(DeviceWorkerHandler):
raise errors.StoreError(500, "Couldn't generate a device ID.")
@opentracing.trace
@defer.inlineCallbacks
def delete_device(self, user_id, device_id):
""" Delete the given device
@@ -284,6 +305,10 @@ class DeviceHandler(DeviceWorkerHandler):
except errors.StoreError as e:
if e.code == 404:
# no match
opentracing.set_tag("error", True)
opentracing.log_kv(
{"reason": "User doesn't have device id.", "device_id": device_id}
)
pass
else:
raise
@@ -296,6 +321,7 @@ class DeviceHandler(DeviceWorkerHandler):
yield self.notify_device_update(user_id, [device_id])
@opentracing.trace
@defer.inlineCallbacks
def delete_all_devices_for_user(self, user_id, except_device_id=None):
"""Delete all of the user's devices
@@ -331,6 +357,8 @@ class DeviceHandler(DeviceWorkerHandler):
except errors.StoreError as e:
if e.code == 404:
# no match
opentracing.set_tag("error", True)
opentracing.set_tag("reason", "User doesn't have that device id.")
pass
else:
raise
@@ -371,6 +399,7 @@ class DeviceHandler(DeviceWorkerHandler):
else:
raise
@opentracing.trace
@measure_func("notify_device_update")
@defer.inlineCallbacks
def notify_device_update(self, user_id, device_ids):
@@ -386,6 +415,8 @@ class DeviceHandler(DeviceWorkerHandler):
hosts.update(get_domain_from_id(u) for u in users_who_share_room)
hosts.discard(self.server_name)
opentracing.set_tag("hosts to update", hosts)
position = yield self.store.add_device_change_to_streams(
user_id, device_ids, list(hosts)
)
@@ -405,6 +436,9 @@ class DeviceHandler(DeviceWorkerHandler):
)
for host in hosts:
self.federation_sender.send_device_messages(host)
opentracing.log_kv(
{"message": "sent device update to host", "host": host}
)
@defer.inlineCallbacks
def on_federation_query_user_devices(self, user_id):
@@ -451,12 +485,15 @@ class DeviceListUpdater(object):
iterable=True,
)
@opentracing.trace
@defer.inlineCallbacks
def incoming_device_list_update(self, origin, edu_content):
"""Called on incoming device list update from federation. Responsible
for parsing the EDU and adding to pending updates list.
"""
opentracing.set_tag("origin", origin)
opentracing.set_tag("edu_content", edu_content)
user_id = edu_content.pop("user_id")
device_id = edu_content.pop("device_id")
stream_id = str(edu_content.pop("stream_id")) # They may come as ints
@@ -471,12 +508,30 @@ class DeviceListUpdater(object):
device_id,
origin,
)
opentracing.set_tag("error", True)
opentracing.log_kv(
{
"message": "Got a device list update edu from a user and "
"device which does not match the origin of the request.",
"user_id": user_id,
"device_id": device_id,
}
)
return
room_ids = yield self.store.get_rooms_for_user(user_id)
if not room_ids:
# We don't share any rooms with this user. Ignore update, as we
# probably won't get any further updates.
opentracing.set_tag("error", True)
opentracing.log_kv(
{
"message": "Got an update from a user for which "
"we don't share any rooms",
"other user_id": user_id,
}
)
logger.warning(
"Got device list update edu for %r/%r, but don't share a room",
user_id,
@@ -578,6 +633,7 @@ class DeviceListUpdater(object):
request:
https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid
"""
opentracing.log_kv({"message": "Doing resync to update device list."})
# Fetch all devices for the user.
origin = get_domain_from_id(user_id)
try:
@@ -594,13 +650,20 @@ class DeviceListUpdater(object):
# eventually become consistent.
return
except FederationDeniedError as e:
opentracing.set_tag("error", True)
opentracing.log_kv({"reason": "FederationDeniedError"})
logger.info(e)
return
except Exception:
except Exception as e:
# TODO: Remember that we are now out of sync and try again
# later
opentracing.set_tag("error", True)
opentracing.log_kv(
{"message": "Exception raised by federation request", "exception": e}
)
logger.exception("Failed to handle device list update for %s", user_id)
return
opentracing.log_kv({"result": result})
stream_id = result["stream_id"]
devices = result["devices"]
+21 -7
View File
@@ -15,8 +15,11 @@
import logging
from canonicaljson import json
from twisted.internet import defer
import synapse.logging.opentracing as opentracing
from synapse.api.errors import SynapseError
from synapse.types import UserID, get_domain_from_id
from synapse.util.stringutils import random_string
@@ -78,7 +81,8 @@ class DeviceMessageHandler(object):
@defer.inlineCallbacks
def send_device_message(self, sender_user_id, message_type, messages):
opentracing.set_tag("number_of_messages", len(messages))
opentracing.set_tag("sender", sender_user_id)
local_messages = {}
remote_messages = {}
for user_id, by_device in messages.items():
@@ -100,15 +104,24 @@ class DeviceMessageHandler(object):
message_id = random_string(16)
context = {"opentracing": {}}
opentracing.inject_active_span_text_map(context["opentracing"])
remote_edu_contents = {}
for destination, messages in remote_messages.items():
remote_edu_contents[destination] = {
"messages": messages,
"sender": sender_user_id,
"type": message_type,
"message_id": message_id,
}
with opentracing.start_active_span("to_device_for_user"):
opentracing.set_tag("destination", destination)
remote_edu_contents[destination] = {
"messages": messages,
"sender": sender_user_id,
"type": message_type,
"message_id": message_id,
"context": json.dumps(context)
if opentracing.whitelisted_homeserver(destination)
else "{}",
}
opentracing.log_kv({"local_messages": local_messages})
stream_id = yield self.store.add_messages_to_device_inbox(
local_messages, remote_edu_contents
)
@@ -117,6 +130,7 @@ class DeviceMessageHandler(object):
"to_device_key", stream_id, users=local_messages.keys()
)
opentracing.log_kv({"remote_messages": remote_messages})
for destination in remote_messages.keys():
# Enqueue a new federation transaction to send the new
# device messages to each remote destination.
+56 -1
View File
@@ -22,6 +22,7 @@ from canonicaljson import encode_canonical_json, json
from twisted.internet import defer
import synapse.logging.opentracing as opentracing
from synapse.api.errors import CodeMessageException, SynapseError
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.types import UserID, get_domain_from_id
@@ -46,6 +47,7 @@ class E2eKeysHandler(object):
"client_keys", self.on_federation_query_client_keys
)
@opentracing.trace
@defer.inlineCallbacks
def query_devices(self, query_body, timeout):
""" Handle a device key query from a client
@@ -81,6 +83,9 @@ class E2eKeysHandler(object):
else:
remote_queries[user_id] = device_ids
opentracing.set_tag("local_key_query", local_query)
opentracing.set_tag("remote_key_query", remote_queries)
# First get local devices.
failures = {}
results = {}
@@ -121,6 +126,7 @@ class E2eKeysHandler(object):
r[user_id] = remote_queries[user_id]
# Now fetch any devices that we don't have in our cache
@opentracing.trace
@defer.inlineCallbacks
def do_remote_query(destination):
"""This is called when we are querying the device list of a user on
@@ -131,6 +137,7 @@ class E2eKeysHandler(object):
destination_query = remote_queries_not_in_cache[destination]
opentracing.set_tag("key_query", destination_query)
# We first consider whether we wish to update the device list cache with
# the users device list. We want to track a user's devices when the
# authenticated user shares a room with the queried user and the query
@@ -185,6 +192,8 @@ class E2eKeysHandler(object):
except Exception as e:
failure = _exception_to_failure(e)
failures[destination] = failure
opentracing.set_tag("error", True)
opentracing.set_tag("reason", failure)
yield make_deferred_yieldable(
defer.gatherResults(
@@ -198,6 +207,7 @@ class E2eKeysHandler(object):
return {"device_keys": results, "failures": failures}
@opentracing.trace
@defer.inlineCallbacks
def query_local_devices(self, query):
"""Get E2E device keys for local users
@@ -210,6 +220,7 @@ class E2eKeysHandler(object):
defer.Deferred: (resolves to dict[string, dict[string, dict]]):
map from user_id -> device_id -> device details
"""
opentracing.set_tag("local_query", query)
local_query = []
result_dict = {}
@@ -217,6 +228,14 @@ class E2eKeysHandler(object):
# we use UserID.from_string to catch invalid user ids
if not self.is_mine(UserID.from_string(user_id)):
logger.warning("Request for keys for non-local user %s", user_id)
opentracing.log_kv(
{
"message": "Requested a local key for a user which"
" was not local to the homeserver",
"user_id": user_id,
}
)
opentracing.set_tag("error", True)
raise SynapseError(400, "Not a user here")
if not device_ids:
@@ -241,6 +260,7 @@ class E2eKeysHandler(object):
r["unsigned"]["device_display_name"] = display_name
result_dict[user_id][device_id] = r
opentracing.log_kv(result_dict)
return result_dict
@defer.inlineCallbacks
@@ -251,6 +271,7 @@ class E2eKeysHandler(object):
res = yield self.query_local_devices(device_keys_query)
return {"device_keys": res}
@opentracing.trace
@defer.inlineCallbacks
def claim_one_time_keys(self, query, timeout):
local_query = []
@@ -265,6 +286,9 @@ class E2eKeysHandler(object):
domain = get_domain_from_id(user_id)
remote_queries.setdefault(domain, {})[user_id] = device_keys
opentracing.set_tag("local_key_query", local_query)
opentracing.set_tag("remote_key_query", remote_queries)
results = yield self.store.claim_e2e_one_time_keys(local_query)
json_result = {}
@@ -276,8 +300,10 @@ class E2eKeysHandler(object):
key_id: json.loads(json_bytes)
}
@opentracing.trace
@defer.inlineCallbacks
def claim_client_keys(destination):
opentracing.set_tag("destination", destination)
device_keys = remote_queries[destination]
try:
remote_result = yield self.federation.claim_client_keys(
@@ -290,6 +316,8 @@ class E2eKeysHandler(object):
except Exception as e:
failure = _exception_to_failure(e)
failures[destination] = failure
opentracing.set_tag("error", True)
opentracing.set_tag("reason", failure)
yield make_deferred_yieldable(
defer.gatherResults(
@@ -313,9 +341,11 @@ class E2eKeysHandler(object):
),
)
opentracing.log_kv({"one_time_keys": json_result, "failures": failures})
return {"one_time_keys": json_result, "failures": failures}
@defer.inlineCallbacks
@opentracing.tag_args
def upload_keys_for_user(self, user_id, device_id, keys):
time_now = self.clock.time_msec()
@@ -329,6 +359,13 @@ class E2eKeysHandler(object):
user_id,
time_now,
)
opentracing.log_kv(
{
"message": "Updating device_keys for user.",
"user_id": user_id,
"device_id": device_id,
}
)
# TODO: Sign the JSON with the server key
changed = yield self.store.set_e2e_device_keys(
user_id, device_id, time_now, device_keys
@@ -336,12 +373,26 @@ class E2eKeysHandler(object):
if changed:
# Only notify about device updates *if* the keys actually changed
yield self.device_handler.notify_device_update(user_id, [device_id])
else:
opentracing.log_kv(
{"message": "Not updating device_keys for user", "user_id": user_id}
)
one_time_keys = keys.get("one_time_keys", None)
if one_time_keys:
opentracing.log_kv(
{
"message": "Updating one_time_keys for device.",
"user_id": user_id,
"device_id": device_id,
}
)
yield self._upload_one_time_keys_for_user(
user_id, device_id, time_now, one_time_keys
)
else:
opentracing.log_kv(
{"message": "Did not update one_time_keys", "reason": "no keys given"}
)
# the device should have been registered already, but it may have been
# deleted due to a race with a DELETE request. Or we may be using an
@@ -352,6 +403,7 @@ class E2eKeysHandler(object):
result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
opentracing.set_tag("one_time_key_counts", result)
return {"one_time_key_counts": result}
@defer.inlineCallbacks
@@ -395,6 +447,9 @@ class E2eKeysHandler(object):
(algorithm, key_id, encode_canonical_json(key).decode("ascii"))
)
opentracing.log_kv(
{"message": "Inserting new one_time_keys.", "keys": new_keys}
)
yield self.store.add_e2e_one_time_keys(user_id, device_id, time_now, new_keys)
+26 -2
View File
@@ -19,6 +19,7 @@ from six import iteritems
from twisted.internet import defer
import synapse.logging.opentracing as opentracing
from synapse.api.errors import (
Codes,
NotFoundError,
@@ -49,6 +50,7 @@ class E2eRoomKeysHandler(object):
# changed.
self._upload_linearizer = Linearizer("upload_room_keys_lock")
@opentracing.trace
@defer.inlineCallbacks
def get_room_keys(self, user_id, version, room_id=None, session_id=None):
"""Bulk get the E2E room keys for a given backup, optionally filtered to a given
@@ -84,8 +86,10 @@ class E2eRoomKeysHandler(object):
user_id, version, room_id, session_id
)
opentracing.log_kv(results)
return results
@opentracing.trace
@defer.inlineCallbacks
def delete_room_keys(self, user_id, version, room_id=None, session_id=None):
"""Bulk delete the E2E room keys for a given backup, optionally filtered to a given
@@ -107,6 +111,7 @@ class E2eRoomKeysHandler(object):
with (yield self._upload_linearizer.queue(user_id)):
yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)
@opentracing.trace
@defer.inlineCallbacks
def upload_room_keys(self, user_id, version, room_keys):
"""Bulk upload a list of room keys into a given backup version, asserting
@@ -186,7 +191,14 @@ class E2eRoomKeysHandler(object):
session_id(str): the session whose room_key we're setting
room_key(dict): the room_key being set
"""
opentracing.log_kv(
{
"message": "Trying to upload room key",
"room_id": room_id,
"session_id": session_id,
"user_id": user_id,
}
)
# get the room_key for this particular row
current_room_key = None
try:
@@ -195,14 +207,23 @@ class E2eRoomKeysHandler(object):
)
except StoreError as e:
if e.code == 404:
pass
opentracing.log_kv(
{
"message": "Room key not found.",
"room_id": room_id,
"user_id": user_id,
}
)
else:
raise
if self._should_replace_room_key(current_room_key, room_key):
opentracing.log_kv({"message": "Replacing room key."})
yield self.store.set_e2e_room_key(
user_id, version, room_id, session_id, room_key
)
else:
opentracing.log_kv({"message": "Not replacing room_key."})
@staticmethod
def _should_replace_room_key(current_room_key, room_key):
@@ -236,6 +257,7 @@ class E2eRoomKeysHandler(object):
return False
return True
@opentracing.trace
@defer.inlineCallbacks
def create_version(self, user_id, version_info):
"""Create a new backup version. This automatically becomes the new
@@ -294,6 +316,7 @@ class E2eRoomKeysHandler(object):
raise
return res
@opentracing.trace
@defer.inlineCallbacks
def delete_version(self, user_id, version=None):
"""Deletes a given version of the user's e2e_room_keys backup
@@ -314,6 +337,7 @@ class E2eRoomKeysHandler(object):
else:
raise
@opentracing.trace
@defer.inlineCallbacks
def update_version(self, user_id, version, version_info):
"""Update the info about a given version of the user's backup
+25
View File
@@ -527,6 +527,31 @@ def inject_active_span_text_map(carrier, destination=None):
)
def get_active_span_text_map(destination=None):
    """
    Serialise the currently active span's context into a plain dict.

    An alternative to injecting the span into an empty carrier.

    Args:
        destination (str): the name of the remote server. A span context is
            only included when destination is None or matches the
            homeserver_whitelist.

    Returns:
        A dict containing the span context; empty when tracing is disabled
        or the destination is not whitelisted.
    """
    carrier = {}
    # Only populate the carrier when tracing is enabled and the destination
    # (if given) is one we are allowed to trace to.
    if opentracing and (not destination or whitelisted_homeserver(destination)):
        opentracing.tracer.inject(
            opentracing.tracer.active_span, opentracing.Format.TEXT_MAP, carrier
        )
    return carrier
def active_span_context_as_string():
"""
Returns:
+12 -1
View File
@@ -17,6 +17,7 @@ import logging
from twisted.internet import defer
import synapse.logging.opentracing as opentracing
from synapse.api.errors import SynapseError
from synapse.http.servlet import (
RestServlet,
@@ -68,6 +69,7 @@ class KeyUploadServlet(RestServlet):
self.auth = hs.get_auth()
self.e2e_keys_handler = hs.get_e2e_keys_handler()
@opentracing.trace_using_operation_name("upload_keys")
@defer.inlineCallbacks
def on_POST(self, request, device_id):
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
@@ -78,6 +80,14 @@ class KeyUploadServlet(RestServlet):
# passing the device_id here is deprecated; however, we allow it
# for now for compatibility with older clients.
if requester.device_id is not None and device_id != requester.device_id:
opentracing.set_tag("error", True)
opentracing.log_kv(
{
"message": "Client uploading keys for a different device",
"logged_in_id": requester.device_id,
"key_being_uploaded": device_id,
}
)
logger.warning(
"Client uploading keys for a different device "
"(logged in as %s, uploading for %s)",
@@ -178,10 +188,11 @@ class KeyChangesServlet(RestServlet):
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
from_token_string = parse_string(request, "from")
opentracing.set_tag("from", from_token_string)
# We want to enforce they do pass us one, but we ignore it and return
# changes after the "to" as well as before.
parse_string(request, "to")
opentracing.set_tag("to", parse_string(request, "to"))
from_token = StreamToken.from_string(from_token_string)
@@ -17,6 +17,7 @@ import logging
from twisted.internet import defer
import synapse.logging.opentracing as opentracing
from synapse.http import servlet
from synapse.http.servlet import parse_json_object_from_request
from synapse.rest.client.transactions import HttpTransactionCache
@@ -42,7 +43,10 @@ class SendToDeviceRestServlet(servlet.RestServlet):
self.txns = HttpTransactionCache(hs)
self.device_message_handler = hs.get_device_message_handler()
@opentracing.trace_using_operation_name("sendToDevice")
def on_PUT(self, request, message_type, txn_id):
opentracing.set_tag("message_type", message_type)
opentracing.set_tag("txn_id", txn_id)
return self.txns.fetch_or_execute_request(
request, self._put, request, message_type, txn_id
)
+23
View File
@@ -19,6 +19,7 @@ from canonicaljson import json
from twisted.internet import defer
import synapse.logging.opentracing as opentracing
from synapse.storage._base import SQLBaseStore
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.util.caches.expiringcache import ExpiringCache
@@ -72,6 +73,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
"get_new_messages_for_device", get_new_messages_for_device_txn
)
@opentracing.trace
@defer.inlineCallbacks
def delete_messages_for_device(self, user_id, device_id, up_to_stream_id):
"""
@@ -87,11 +89,15 @@ class DeviceInboxWorkerStore(SQLBaseStore):
last_deleted_stream_id = self._last_device_delete_cache.get(
(user_id, device_id), None
)
opentracing.set_tag("last_deleted_stream_id", last_deleted_stream_id)
if last_deleted_stream_id:
has_changed = self._device_inbox_stream_cache.has_entity_changed(
user_id, last_deleted_stream_id
)
if not has_changed:
opentracing.log_kv({"message": "No changes in cache since last check"})
return 0
def delete_messages_for_device_txn(txn):
@@ -107,6 +113,10 @@ class DeviceInboxWorkerStore(SQLBaseStore):
"delete_messages_for_device", delete_messages_for_device_txn
)
opentracing.log_kv(
{"message": "deleted {} messages for device".format(count), "count": count}
)
# Update the cache, ensuring that we only ever increase the value
last_deleted_stream_id = self._last_device_delete_cache.get(
(user_id, device_id), 0
@@ -117,6 +127,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
return count
@opentracing.trace
def get_new_device_msgs_for_remote(
self, destination, last_stream_id, current_stream_id, limit
):
@@ -132,16 +143,23 @@ class DeviceInboxWorkerStore(SQLBaseStore):
in the stream the messages got to.
"""
opentracing.set_tag("destination", destination)
opentracing.set_tag("last_stream_id", last_stream_id)
opentracing.set_tag("current_stream_id", current_stream_id)
opentracing.set_tag("limit", limit)
has_changed = self._device_federation_outbox_stream_cache.has_entity_changed(
destination, last_stream_id
)
if not has_changed or last_stream_id == current_stream_id:
opentracing.log_kv({"message": "No new messages in stream"})
return defer.succeed(([], current_stream_id))
if limit <= 0:
# This can happen if we run out of room for EDUs in the transaction.
return defer.succeed(([], last_stream_id))
@opentracing.trace
def get_new_messages_for_remote_destination_txn(txn):
sql = (
"SELECT stream_id, messages_json FROM device_federation_outbox"
@@ -156,6 +174,9 @@ class DeviceInboxWorkerStore(SQLBaseStore):
stream_pos = row[0]
messages.append(json.loads(row[1]))
if len(messages) < limit:
opentracing.log_kv(
{"message": "Set stream position to current position"}
)
stream_pos = current_stream_id
return (messages, stream_pos)
@@ -164,6 +185,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
get_new_messages_for_remote_destination_txn,
)
@opentracing.trace
def delete_device_msgs_for_remote(self, destination, up_to_stream_id):
"""Used to delete messages when the remote destination acknowledges
their receipt.
@@ -214,6 +236,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
expiry_ms=30 * 60 * 1000,
)
@opentracing.trace
@defer.inlineCallbacks
def add_messages_to_device_inbox(
self, local_messages_by_user_then_device, remote_messages_by_destination
+25 -6
View File
@@ -20,6 +20,7 @@ from canonicaljson import json
from twisted.internet import defer
import synapse.logging.opentracing as opentracing
from synapse.api.errors import StoreError
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import Cache, SQLBaseStore, db_to_json
@@ -73,6 +74,7 @@ class DeviceWorkerStore(SQLBaseStore):
return {d["device_id"]: d for d in devices}
@opentracing.trace
@defer.inlineCallbacks
def get_devices_by_remote(self, destination, from_stream_id, limit):
"""Get stream of updates to send to remote servers
@@ -127,8 +129,10 @@ class DeviceWorkerStore(SQLBaseStore):
# (user_id, device_id) entries into a map, with the value being
# the max stream_id across each set of duplicate entries
#
# maps (user_id, device_id) -> stream_id
# maps (user_id, device_id) -> (stream_id, context)
# as long as their stream_id does not match that of the last row
# where context is any metadata about the message's context such as
# opentracing data
query_map = {}
for update in updates:
if stream_id_cutoff is not None and update[2] >= stream_id_cutoff:
@@ -136,7 +140,10 @@ class DeviceWorkerStore(SQLBaseStore):
break
key = (update[0], update[1])
query_map[key] = max(query_map.get(key, 0), update[2])
context = update[3]
stream_id = max(query_map.get(key, (0, None))[0], update[2])
query_map[key] = (stream_id, context)
# If we didn't find any updates with a stream_id lower than the cutoff, it
# means that there are more than limit updates all of which have the same
@@ -171,7 +178,7 @@ class DeviceWorkerStore(SQLBaseStore):
List: List of device updates
"""
sql = """
SELECT user_id, device_id, stream_id FROM device_lists_outbound_pokes
SELECT user_id, device_id, stream_id, context FROM device_lists_outbound_pokes
WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ?
ORDER BY stream_id
LIMIT ?
@@ -187,8 +194,8 @@ class DeviceWorkerStore(SQLBaseStore):
Args:
destination (str): The host the device updates are intended for
from_stream_id (int): The minimum stream_id to filter updates by, exclusive
query_map (Dict[(str, str): int]): Dictionary mapping
user_id/device_id to update stream_id
query_map (Dict[(str, str): (int, str)]): Dictionary mapping
            user_id/device_id to update stream_id and the relevant opentracing context
Returns:
List[Dict]: List of objects representing an device update EDU
@@ -210,12 +217,15 @@ class DeviceWorkerStore(SQLBaseStore):
destination, user_id, from_stream_id
)
for device_id, device in iteritems(user_devices):
stream_id = query_map[(user_id, device_id)]
stream_id, _ = query_map[(user_id, device_id)]
result = {
"user_id": user_id,
"device_id": device_id,
"prev_id": [prev_id] if prev_id else [],
"stream_id": stream_id,
"context": query_map[(user_id, device_id)][1]
if opentracing.whitelisted_homeserver(destination)
else "{}",
}
prev_id = stream_id
@@ -299,6 +309,7 @@ class DeviceWorkerStore(SQLBaseStore):
def get_device_stream_token(self):
return self._device_list_id_gen.get_current_token()
@opentracing.trace
@defer.inlineCallbacks
def get_user_devices_from_cache(self, query_list):
"""Get the devices (and keys if any) for remote users from the cache.
@@ -330,6 +341,9 @@ class DeviceWorkerStore(SQLBaseStore):
else:
results[user_id] = yield self._get_cached_devices_for_user(user_id)
opentracing.set_tag("in_cache", results)
opentracing.set_tag("not_in_cache", user_ids_not_in_cache)
return (user_ids_not_in_cache, results)
@cachedInlineCallbacks(num_args=2, tree=True)
@@ -814,6 +828,8 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
],
)
context = {"opentracing": opentracing.get_active_span_text_map()}
self._simple_insert_many_txn(
txn,
table="device_lists_outbound_pokes",
@@ -825,6 +841,9 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
"device_id": device_id,
"sent": False,
"ts": now,
"context": json.dumps(context)
if opentracing.whitelisted_homeserver(destination)
else "{}",
}
for destination in hosts
for device_id in device_ids
+14
View File
@@ -17,6 +17,7 @@ import json
from twisted.internet import defer
import synapse.logging.opentracing as opentracing
from synapse.api.errors import StoreError
from ._base import SQLBaseStore
@@ -94,7 +95,16 @@ class EndToEndRoomKeyStore(SQLBaseStore):
},
lock=False,
)
opentracing.log_kv(
{
"message": "Set room key",
"room_id": room_id,
"session_id": session_id,
"room_key": room_key,
}
)
@opentracing.trace
@defer.inlineCallbacks
def get_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
"""Bulk get the E2E room keys for a given backup, optionally filtered to a given
@@ -153,6 +163,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
return sessions
@opentracing.trace
@defer.inlineCallbacks
def delete_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
"""Bulk delete the E2E room keys for a given backup, optionally filtered to a given
@@ -236,6 +247,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
"get_e2e_room_keys_version_info", _get_e2e_room_keys_version_info_txn
)
@opentracing.trace
def create_e2e_room_keys_version(self, user_id, info):
"""Atomically creates a new version of this user's e2e_room_keys store
with the given version info.
@@ -276,6 +288,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
"create_e2e_room_keys_version_txn", _create_e2e_room_keys_version_txn
)
@opentracing.trace
def update_e2e_room_keys_version(self, user_id, version, info):
"""Update a given backup version
@@ -292,6 +305,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
desc="update_e2e_room_keys_version",
)
@opentracing.trace
def delete_e2e_room_keys_version(self, user_id, version=None):
"""Delete a given backup version of the user's room keys.
Doesn't delete their actual key data.
+39 -3
View File
@@ -18,12 +18,14 @@ from canonicaljson import encode_canonical_json
from twisted.internet import defer
import synapse.logging.opentracing as opentracing
from synapse.util.caches.descriptors import cached
from ._base import SQLBaseStore, db_to_json
class EndToEndKeyWorkerStore(SQLBaseStore):
@opentracing.trace
@defer.inlineCallbacks
def get_e2e_device_keys(
self, query_list, include_all_devices=False, include_deleted_devices=False
@@ -40,6 +42,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
Dict mapping from user-id to dict mapping from device_id to
dict containing "key_json", "device_display_name".
"""
opentracing.set_tag("query_list", query_list)
if not query_list:
return {}
@@ -57,9 +60,13 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
return results
@opentracing.trace
def _get_e2e_device_keys_txn(
self, txn, query_list, include_all_devices=False, include_deleted_devices=False
):
opentracing.set_tag("include_all_devices", include_all_devices)
opentracing.set_tag("include_deleted_devices", include_deleted_devices)
query_clauses = []
query_params = []
@@ -104,6 +111,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
for user_id, device_id in deleted_devices:
result.setdefault(user_id, {})[device_id] = None
opentracing.log_kv(result)
return result
@defer.inlineCallbacks
@@ -129,8 +137,11 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
keyvalues={"user_id": user_id, "device_id": device_id},
desc="add_e2e_one_time_keys_check",
)
return {(row["algorithm"], row["key_id"]): row["key_json"] for row in rows}
result = {(row["algorithm"], row["key_id"]): row["key_json"] for row in rows}
opentracing.log_kv(
{"message": "Fetched one time keys for user", "one_time_keys": result}
)
return result
@defer.inlineCallbacks
def add_e2e_one_time_keys(self, user_id, device_id, time_now, new_keys):
@@ -146,6 +157,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
"""
def _add_e2e_one_time_keys(txn):
opentracing.set_tag("user_id", user_id)
opentracing.set_tag("device_id", device_id)
opentracing.set_tag("new_keys", new_keys)
# We are protected from race between lookup and insertion due to
# a unique constraint. If there is a race of two calls to
# `add_e2e_one_time_keys` then they'll conflict and we will only
@@ -202,6 +216,11 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
"""
def _set_e2e_device_keys_txn(txn):
opentracing.set_tag("user_id", user_id)
opentracing.set_tag("device_id", device_id)
opentracing.set_tag("time_now", time_now)
opentracing.set_tag("device_keys", device_keys)
old_key_json = self._simple_select_one_onecol_txn(
txn,
table="e2e_device_keys_json",
@@ -215,6 +234,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
new_key_json = encode_canonical_json(device_keys).decode("utf-8")
if old_key_json == new_key_json:
opentracing.log_kv({"Message": "Device key already stored."})
return False
self._simple_upsert_txn(
@@ -223,7 +243,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
keyvalues={"user_id": user_id, "device_id": device_id},
values={"ts_added_ms": time_now, "key_json": new_key_json},
)
opentracing.log_kv({"message": "Device keys stored."})
return True
return self.runInteraction("set_e2e_device_keys", _set_e2e_device_keys_txn)
@@ -231,6 +251,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
def claim_e2e_one_time_keys(self, query_list):
"""Take a list of one time keys out of the database"""
@opentracing.trace
def _claim_e2e_one_time_keys(txn):
sql = (
"SELECT key_id, key_json FROM e2e_one_time_keys_json"
@@ -252,7 +273,15 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
" AND key_id = ?"
)
for user_id, device_id, algorithm, key_id in delete:
opentracing.log_kv(
{
"message": "Executing claim e2e_one_time_keys transaction on database."
}
)
txn.execute(sql, (user_id, device_id, algorithm, key_id))
opentracing.log_kv(
{"message": "finished executing and invalidating cache"}
)
self._invalidate_cache_and_stream(
txn, self.count_e2e_one_time_keys, (user_id, device_id)
)
@@ -262,6 +291,13 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
def delete_e2e_keys_by_device(self, user_id, device_id):
def delete_e2e_keys_by_device_txn(txn):
opentracing.log_kv(
{
"message": "Deleting keys for device",
"device_id": device_id,
"user_id": user_id,
}
)
self._simple_delete_txn(
txn,
table="e2e_device_keys_json",
@@ -0,0 +1,23 @@
/* Copyright 2019 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
 * Opentracing needs to inject a span_context into data items which leave the
 * current execution context. Since device list updates are dumped in here
 * and then processed later, they need to include the span context for opentracing.
 * Since we may also decide later to include other tracking information, the column
 * has just been called "context"; the structure of the data within it may change.
 */
ALTER TABLE device_lists_outbound_pokes ADD context TEXT;