Compare commits
49 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 4824e61c97 | |||
| 775853e012 | |||
| eee4eff050 | |||
| a68119e676 | |||
| 82d6eb186c | |||
| 4f36a2dfec | |||
| bd0ed7b0c4 | |||
| 6a355ca20a | |||
| 22b7e6a011 | |||
| f76b071c99 | |||
| 7ab2088ddc | |||
| 03958693ce | |||
| 50964d2341 | |||
| e7f42857bb | |||
| d859c34fb9 | |||
| 46e41ee08d | |||
| 35eb018c02 | |||
| 8b53f8e9bd | |||
| 08aaad0756 | |||
| 1801578795 | |||
| beea2e31b9 | |||
| d395650bcf | |||
| cffba280de | |||
| d26cbb4e23 | |||
| 6944c994a5 | |||
| 30738e978e | |||
| 4824e30810 | |||
| e49487fc91 | |||
| ab191f99f9 | |||
| a293759d8c | |||
| d44f303fe0 | |||
| 2fd49cedbb | |||
| d4bdc2ba80 | |||
| 794c9e2a75 | |||
| 01229a4af9 | |||
| 1ed790d67a | |||
| 957cd77e95 | |||
| bfc50050fd | |||
| 7ae7e796ff | |||
| d9f0c7ff47 | |||
| 28113ad335 | |||
| d94897e818 | |||
| 21940cadf0 | |||
| c988c02c7c | |||
| fd669e5e44 | |||
| d876cda725 | |||
| 1e7099d04c | |||
| 565544b603 | |||
| 823c34a940 |
@@ -0,0 +1 @@
|
||||
Add OpenTracing instrumentation for e2e code paths.
|
||||
@@ -22,6 +22,7 @@ from netaddr import IPAddress
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
import synapse.logging.opentracing as opentracing
|
||||
import synapse.types
|
||||
from synapse import event_auth
|
||||
from synapse.api.constants import EventTypes, JoinRules, Membership
|
||||
@@ -178,6 +179,7 @@ class Auth(object):
|
||||
def get_public_keys(self, invite_event):
|
||||
return event_auth.get_public_keys(invite_event)
|
||||
|
||||
@opentracing.trace
|
||||
@defer.inlineCallbacks
|
||||
def get_user_by_req(
|
||||
self, request, allow_guest=False, rights="access", allow_expired=False
|
||||
@@ -209,6 +211,10 @@ class Auth(object):
|
||||
user_id, app_service = yield self._get_appservice_user_id(request)
|
||||
if user_id:
|
||||
request.authenticated_entity = user_id
|
||||
opentracing.set_tag("authenticated_entity", user_id)
|
||||
# there is at least one other place where authenticated entity is
|
||||
# set. user_id is tagged in case authenticated_entity is clobbered
|
||||
opentracing.set_tag("user_id", user_id)
|
||||
|
||||
if ip_addr and self.hs.config.track_appservice_user_ips:
|
||||
yield self.store.insert_client_ip(
|
||||
@@ -260,6 +266,11 @@ class Auth(object):
|
||||
|
||||
request.authenticated_entity = user.to_string()
|
||||
|
||||
opentracing.set_tag("authenticated_entity", user.to_string())
|
||||
# there is at least one other place where authenticated entity is
|
||||
# set. user_id is tagged incase authenticated_entity is clobbered
|
||||
opentracing.set_tag("user_id", user.to_string())
|
||||
|
||||
return synapse.types.create_requester(
|
||||
user, token_id, is_guest, device_id, app_service=app_service
|
||||
)
|
||||
|
||||
@@ -25,6 +25,7 @@ from twisted.internet import defer
|
||||
from twisted.internet.abstract import isIPAddress
|
||||
from twisted.python import failure
|
||||
|
||||
import synapse.logging.opentracing as opentracing
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.api.errors import (
|
||||
AuthError,
|
||||
@@ -507,6 +508,7 @@ class FederationServer(FederationBase):
|
||||
def on_query_user_devices(self, origin, user_id):
|
||||
return self.on_query_request("user_devices", user_id)
|
||||
|
||||
@opentracing.trace
|
||||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
def on_claim_client_keys(self, origin, content):
|
||||
@@ -515,6 +517,9 @@ class FederationServer(FederationBase):
|
||||
for device_id, algorithm in device_keys.items():
|
||||
query.append((user_id, device_id, algorithm))
|
||||
|
||||
opentracing.log_kv(
|
||||
{"message": "Claiming one time keys.", "user, device pairs": query}
|
||||
)
|
||||
results = yield self.store.claim_e2e_one_time_keys(query)
|
||||
|
||||
json_result = {}
|
||||
@@ -808,12 +813,13 @@ class FederationHandlerRegistry(object):
|
||||
if not handler:
|
||||
logger.warn("No handler registered for EDU type %s", edu_type)
|
||||
|
||||
try:
|
||||
yield handler(origin, content)
|
||||
except SynapseError as e:
|
||||
logger.info("Failed to handle edu %r: %r", edu_type, e)
|
||||
except Exception:
|
||||
logger.exception("Failed to handle edu %r", edu_type)
|
||||
with opentracing.start_active_span_from_edu(content, "handle_edu"):
|
||||
try:
|
||||
yield handler(origin, content)
|
||||
except SynapseError as e:
|
||||
logger.info("Failed to handle edu %r: %r", edu_type, e)
|
||||
except Exception:
|
||||
logger.exception("Failed to handle edu %r", edu_type)
|
||||
|
||||
def on_query(self, query_type, args):
|
||||
handler = self.query_handlers.get(query_type)
|
||||
|
||||
@@ -16,10 +16,12 @@
|
||||
import datetime
|
||||
import logging
|
||||
|
||||
from canonicaljson import json
|
||||
from prometheus_client import Counter
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
import synapse.logging.opentracing as opentracing
|
||||
from synapse.api.errors import (
|
||||
FederationDeniedError,
|
||||
HttpResponseException,
|
||||
@@ -204,97 +206,137 @@ class PerDestinationQueue(object):
|
||||
|
||||
pending_edus = device_update_edus + to_device_edus
|
||||
|
||||
# BEGIN CRITICAL SECTION
|
||||
#
|
||||
# In order to avoid a race condition, we need to make sure that
|
||||
# the following code (from popping the queues up to the point
|
||||
# where we decide if we actually have any pending messages) is
|
||||
# atomic - otherwise new PDUs or EDUs might arrive in the
|
||||
# meantime, but not get sent because we hold the
|
||||
# transmission_loop_running flag.
|
||||
# Make a transaction sending span, this span follows on from all the
|
||||
# edus in that transaction. This needs to be done because if the edus
|
||||
# are never received on the remote the span effectively has no causality.
|
||||
|
||||
pending_pdus = self._pending_pdus
|
||||
|
||||
# We can only include at most 50 PDUs per transactions
|
||||
pending_pdus, self._pending_pdus = pending_pdus[:50], pending_pdus[50:]
|
||||
|
||||
pending_edus.extend(self._get_rr_edus(force_flush=False))
|
||||
pending_presence = self._pending_presence
|
||||
self._pending_presence = {}
|
||||
if pending_presence:
|
||||
pending_edus.append(
|
||||
Edu(
|
||||
origin=self._server_name,
|
||||
destination=self._destination,
|
||||
edu_type="m.presence",
|
||||
content={
|
||||
"push": [
|
||||
format_user_presence_state(
|
||||
presence, self._clock.time_msec()
|
||||
)
|
||||
for presence in pending_presence.values()
|
||||
]
|
||||
},
|
||||
)
|
||||
span_contexts = [
|
||||
opentracing.extract_text_map(
|
||||
json.loads(
|
||||
edu.get_dict().get("content", {}).get("context", "{}")
|
||||
).get("opentracing", {})
|
||||
)
|
||||
for edu in pending_edus
|
||||
]
|
||||
|
||||
pending_edus.extend(
|
||||
self._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus))
|
||||
)
|
||||
while (
|
||||
len(pending_edus) < MAX_EDUS_PER_TRANSACTION
|
||||
and self._pending_edus_keyed
|
||||
with opentracing.start_active_span_follows_from(
|
||||
"send_transaction", span_contexts
|
||||
):
|
||||
_, val = self._pending_edus_keyed.popitem()
|
||||
pending_edus.append(val)
|
||||
# Link each sent edu to this transaction's span
|
||||
_pending_edus = []
|
||||
for edu in pending_edus:
|
||||
edu_dict = edu.get_dict()
|
||||
span_context = json.loads(
|
||||
edu_dict.get("content", {}).get("context", "{}")
|
||||
).get("opentracing", {})
|
||||
# If there is no span context then we are either blacklisting
|
||||
# this destination or we are not tracing
|
||||
if span_context:
|
||||
span_context.setdefault("references", []).append(
|
||||
opentracing.active_span_context_as_string()
|
||||
)
|
||||
edu_dict["content"]["context"] = json.dumps(
|
||||
{"opentracing": span_context}
|
||||
)
|
||||
_pending_edus.append(Edu(**edu_dict))
|
||||
pending_edus = _pending_edus
|
||||
|
||||
if pending_pdus:
|
||||
logger.debug(
|
||||
"TX [%s] len(pending_pdus_by_dest[dest]) = %d",
|
||||
self._destination,
|
||||
len(pending_pdus),
|
||||
# BEGIN CRITICAL SECTION
|
||||
#
|
||||
# In order to avoid a race condition, we need to make sure that
|
||||
# the following code (from popping the queues up to the point
|
||||
# where we decide if we actually have any pending messages) is
|
||||
# atomic - otherwise new PDUs or EDUs might arrive in the
|
||||
# meantime, but not get sent because we hold the
|
||||
# transmission_loop_running flag.
|
||||
|
||||
pending_pdus = self._pending_pdus
|
||||
|
||||
# We can only include at most 50 PDUs per transactions
|
||||
pending_pdus, self._pending_pdus = (
|
||||
pending_pdus[:50],
|
||||
pending_pdus[50:],
|
||||
)
|
||||
|
||||
if not pending_pdus and not pending_edus:
|
||||
logger.debug("TX [%s] Nothing to send", self._destination)
|
||||
self._last_device_stream_id = device_stream_id
|
||||
return
|
||||
|
||||
# if we've decided to send a transaction anyway, and we have room, we
|
||||
# may as well send any pending RRs
|
||||
if len(pending_edus) < MAX_EDUS_PER_TRANSACTION:
|
||||
pending_edus.extend(self._get_rr_edus(force_flush=True))
|
||||
|
||||
# END CRITICAL SECTION
|
||||
|
||||
success = yield self._transaction_manager.send_new_transaction(
|
||||
self._destination, pending_pdus, pending_edus
|
||||
)
|
||||
if success:
|
||||
sent_transactions_counter.inc()
|
||||
sent_edus_counter.inc(len(pending_edus))
|
||||
for edu in pending_edus:
|
||||
sent_edus_by_type.labels(edu.edu_type).inc()
|
||||
# Remove the acknowledged device messages from the database
|
||||
# Only bother if we actually sent some device messages
|
||||
if to_device_edus:
|
||||
yield self._store.delete_device_msgs_for_remote(
|
||||
self._destination, device_stream_id
|
||||
pending_edus.extend(self._get_rr_edus(force_flush=False))
|
||||
pending_presence = self._pending_presence
|
||||
self._pending_presence = {}
|
||||
if pending_presence:
|
||||
pending_edus.append(
|
||||
Edu(
|
||||
origin=self._server_name,
|
||||
destination=self._destination,
|
||||
edu_type="m.presence",
|
||||
content={
|
||||
"push": [
|
||||
format_user_presence_state(
|
||||
presence, self._clock.time_msec()
|
||||
)
|
||||
for presence in pending_presence.values()
|
||||
]
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
# also mark the device updates as sent
|
||||
if device_update_edus:
|
||||
logger.info(
|
||||
"Marking as sent %r %r", self._destination, dev_list_id
|
||||
pending_edus.extend(
|
||||
self._pop_pending_edus(
|
||||
MAX_EDUS_PER_TRANSACTION - len(pending_edus)
|
||||
)
|
||||
yield self._store.mark_as_sent_devices_by_remote(
|
||||
self._destination, dev_list_id
|
||||
)
|
||||
while (
|
||||
len(pending_edus) < MAX_EDUS_PER_TRANSACTION
|
||||
and self._pending_edus_keyed
|
||||
):
|
||||
_, val = self._pending_edus_keyed.popitem()
|
||||
pending_edus.append(val)
|
||||
|
||||
if pending_pdus:
|
||||
logger.debug(
|
||||
"TX [%s] len(pending_pdus_by_dest[dest]) = %d",
|
||||
self._destination,
|
||||
len(pending_pdus),
|
||||
)
|
||||
|
||||
self._last_device_stream_id = device_stream_id
|
||||
self._last_device_list_stream_id = dev_list_id
|
||||
else:
|
||||
break
|
||||
if not pending_pdus and not pending_edus:
|
||||
logger.debug("TX [%s] Nothing to send", self._destination)
|
||||
self._last_device_stream_id = device_stream_id
|
||||
return
|
||||
|
||||
# if we've decided to send a transaction anyway, and we have room, we
|
||||
# may as well send any pending RRs
|
||||
if len(pending_edus) < MAX_EDUS_PER_TRANSACTION:
|
||||
pending_edus.extend(self._get_rr_edus(force_flush=True))
|
||||
|
||||
# END CRITICAL SECTION
|
||||
|
||||
success = yield self._transaction_manager.send_new_transaction(
|
||||
self._destination, pending_pdus, pending_edus
|
||||
)
|
||||
if success:
|
||||
sent_transactions_counter.inc()
|
||||
sent_edus_counter.inc(len(pending_edus))
|
||||
for edu in pending_edus:
|
||||
sent_edus_by_type.labels(edu.edu_type).inc()
|
||||
# Remove the acknowledged device messages from the database
|
||||
# Only bother if we actually sent some device messages
|
||||
if to_device_edus:
|
||||
yield self._store.delete_device_msgs_for_remote(
|
||||
self._destination, device_stream_id
|
||||
)
|
||||
|
||||
# also mark the device updates as sent
|
||||
if device_update_edus:
|
||||
logger.info(
|
||||
"Marking as sent %r %r", self._destination, dev_list_id
|
||||
)
|
||||
yield self._store.mark_as_sent_devices_by_remote(
|
||||
self._destination, dev_list_id
|
||||
)
|
||||
|
||||
self._last_device_stream_id = device_stream_id
|
||||
self._last_device_list_stream_id = dev_list_id
|
||||
else:
|
||||
break
|
||||
except NotRetryingDestination as e:
|
||||
logger.debug(
|
||||
"TX [%s] not ready for retry yet (next retry at %s) - "
|
||||
|
||||
@@ -297,6 +297,7 @@ class BaseFederationServlet(object):
|
||||
opentracing.tags.HTTP_URL: request.get_redacted_uri(),
|
||||
opentracing.tags.PEER_HOST_IPV6: request.getClientIP(),
|
||||
"authenticated_entity": origin,
|
||||
"servlet_name": request.request_metrics.name,
|
||||
},
|
||||
):
|
||||
if origin:
|
||||
|
||||
@@ -18,6 +18,7 @@ from six import iteritems, itervalues
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
import synapse.logging.opentracing as opentracing
|
||||
from synapse.api import errors
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.api.errors import (
|
||||
@@ -45,6 +46,7 @@ class DeviceWorkerHandler(BaseHandler):
|
||||
self.state = hs.get_state_handler()
|
||||
self._auth_handler = hs.get_auth_handler()
|
||||
|
||||
@opentracing.trace
|
||||
@defer.inlineCallbacks
|
||||
def get_devices_by_user(self, user_id):
|
||||
"""
|
||||
@@ -56,6 +58,7 @@ class DeviceWorkerHandler(BaseHandler):
|
||||
defer.Deferred: list[dict[str, X]]: info on each device
|
||||
"""
|
||||
|
||||
opentracing.set_tag("user_id", user_id)
|
||||
device_map = yield self.store.get_devices_by_user(user_id)
|
||||
|
||||
ips = yield self.store.get_last_client_ip_by_device(user_id, device_id=None)
|
||||
@@ -64,8 +67,10 @@ class DeviceWorkerHandler(BaseHandler):
|
||||
for device in devices:
|
||||
_update_device_from_client_ips(device, ips)
|
||||
|
||||
opentracing.log_kv(device_map)
|
||||
return devices
|
||||
|
||||
@opentracing.trace
|
||||
@defer.inlineCallbacks
|
||||
def get_device(self, user_id, device_id):
|
||||
""" Retrieve the given device
|
||||
@@ -85,9 +90,14 @@ class DeviceWorkerHandler(BaseHandler):
|
||||
raise errors.NotFoundError
|
||||
ips = yield self.store.get_last_client_ip_by_device(user_id, device_id)
|
||||
_update_device_from_client_ips(device, ips)
|
||||
|
||||
opentracing.set_tag("device", device)
|
||||
opentracing.set_tag("ips", ips)
|
||||
|
||||
return device
|
||||
|
||||
@measure_func("device.get_user_ids_changed")
|
||||
@opentracing.trace
|
||||
@defer.inlineCallbacks
|
||||
def get_user_ids_changed(self, user_id, from_token):
|
||||
"""Get list of users that have had the devices updated, or have newly
|
||||
@@ -97,6 +107,9 @@ class DeviceWorkerHandler(BaseHandler):
|
||||
user_id (str)
|
||||
from_token (StreamToken)
|
||||
"""
|
||||
|
||||
opentracing.set_tag("user_id", user_id)
|
||||
opentracing.set_tag("from_token", from_token)
|
||||
now_room_key = yield self.store.get_room_events_max_id()
|
||||
|
||||
room_ids = yield self.store.get_rooms_for_user(user_id)
|
||||
@@ -133,7 +146,7 @@ class DeviceWorkerHandler(BaseHandler):
|
||||
if etype != EventTypes.Member:
|
||||
continue
|
||||
possibly_left.add(state_key)
|
||||
continue
|
||||
continue
|
||||
|
||||
# Fetch the current state at the time.
|
||||
try:
|
||||
@@ -148,6 +161,9 @@ class DeviceWorkerHandler(BaseHandler):
|
||||
# special-case for an empty prev state: include all members
|
||||
# in the changed list
|
||||
if not event_ids:
|
||||
opentracing.log_kv(
|
||||
{"event": "encountered empty previous state", "room_id": room_id}
|
||||
)
|
||||
for key, event_id in iteritems(current_state_ids):
|
||||
etype, state_key = key
|
||||
if etype != EventTypes.Member:
|
||||
@@ -200,7 +216,11 @@ class DeviceWorkerHandler(BaseHandler):
|
||||
possibly_joined = []
|
||||
possibly_left = []
|
||||
|
||||
return {"changed": list(possibly_joined), "left": list(possibly_left)}
|
||||
result = {"changed": list(possibly_joined), "left": list(possibly_left)}
|
||||
|
||||
opentracing.log_kv(result)
|
||||
|
||||
return {result}
|
||||
|
||||
|
||||
class DeviceHandler(DeviceWorkerHandler):
|
||||
@@ -267,6 +287,7 @@ class DeviceHandler(DeviceWorkerHandler):
|
||||
|
||||
raise errors.StoreError(500, "Couldn't generate a device ID.")
|
||||
|
||||
@opentracing.trace
|
||||
@defer.inlineCallbacks
|
||||
def delete_device(self, user_id, device_id):
|
||||
""" Delete the given device
|
||||
@@ -284,6 +305,10 @@ class DeviceHandler(DeviceWorkerHandler):
|
||||
except errors.StoreError as e:
|
||||
if e.code == 404:
|
||||
# no match
|
||||
opentracing.set_tag("error", True)
|
||||
opentracing.log_kv(
|
||||
{"reason": "User doesn't have device id.", "device_id": device_id}
|
||||
)
|
||||
pass
|
||||
else:
|
||||
raise
|
||||
@@ -296,6 +321,7 @@ class DeviceHandler(DeviceWorkerHandler):
|
||||
|
||||
yield self.notify_device_update(user_id, [device_id])
|
||||
|
||||
@opentracing.trace
|
||||
@defer.inlineCallbacks
|
||||
def delete_all_devices_for_user(self, user_id, except_device_id=None):
|
||||
"""Delete all of the user's devices
|
||||
@@ -331,6 +357,8 @@ class DeviceHandler(DeviceWorkerHandler):
|
||||
except errors.StoreError as e:
|
||||
if e.code == 404:
|
||||
# no match
|
||||
opentracing.set_tag("error", True)
|
||||
opentracing.set_tag("reason", "User doesn't have that device id.")
|
||||
pass
|
||||
else:
|
||||
raise
|
||||
@@ -371,6 +399,7 @@ class DeviceHandler(DeviceWorkerHandler):
|
||||
else:
|
||||
raise
|
||||
|
||||
@opentracing.trace
|
||||
@measure_func("notify_device_update")
|
||||
@defer.inlineCallbacks
|
||||
def notify_device_update(self, user_id, device_ids):
|
||||
@@ -386,6 +415,8 @@ class DeviceHandler(DeviceWorkerHandler):
|
||||
hosts.update(get_domain_from_id(u) for u in users_who_share_room)
|
||||
hosts.discard(self.server_name)
|
||||
|
||||
opentracing.set_tag("hosts to update", hosts)
|
||||
|
||||
position = yield self.store.add_device_change_to_streams(
|
||||
user_id, device_ids, list(hosts)
|
||||
)
|
||||
@@ -405,6 +436,9 @@ class DeviceHandler(DeviceWorkerHandler):
|
||||
)
|
||||
for host in hosts:
|
||||
self.federation_sender.send_device_messages(host)
|
||||
opentracing.log_kv(
|
||||
{"message": "sent device update to host", "host": host}
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_federation_query_user_devices(self, user_id):
|
||||
@@ -451,12 +485,15 @@ class DeviceListUpdater(object):
|
||||
iterable=True,
|
||||
)
|
||||
|
||||
@opentracing.trace
|
||||
@defer.inlineCallbacks
|
||||
def incoming_device_list_update(self, origin, edu_content):
|
||||
"""Called on incoming device list update from federation. Responsible
|
||||
for parsing the EDU and adding to pending updates list.
|
||||
"""
|
||||
|
||||
opentracing.set_tag("origin", origin)
|
||||
opentracing.set_tag("edu_content", edu_content)
|
||||
user_id = edu_content.pop("user_id")
|
||||
device_id = edu_content.pop("device_id")
|
||||
stream_id = str(edu_content.pop("stream_id")) # They may come as ints
|
||||
@@ -471,12 +508,30 @@ class DeviceListUpdater(object):
|
||||
device_id,
|
||||
origin,
|
||||
)
|
||||
|
||||
opentracing.set_tag("error", True)
|
||||
opentracing.log_kv(
|
||||
{
|
||||
"message": "Got a device list update edu from a user and "
|
||||
"device which does not match the origin of the request.",
|
||||
"user_id": user_id,
|
||||
"device_id": device_id,
|
||||
}
|
||||
)
|
||||
return
|
||||
|
||||
room_ids = yield self.store.get_rooms_for_user(user_id)
|
||||
if not room_ids:
|
||||
# We don't share any rooms with this user. Ignore update, as we
|
||||
# probably won't get any further updates.
|
||||
opentracing.set_tag("error", True)
|
||||
opentracing.log_kv(
|
||||
{
|
||||
"message": "Got an update from a user for which "
|
||||
"we don't share any rooms",
|
||||
"other user_id": user_id,
|
||||
}
|
||||
)
|
||||
logger.warning(
|
||||
"Got device list update edu for %r/%r, but don't share a room",
|
||||
user_id,
|
||||
@@ -578,6 +633,7 @@ class DeviceListUpdater(object):
|
||||
request:
|
||||
https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid
|
||||
"""
|
||||
opentracing.log_kv({"message": "Doing resync to update device list."})
|
||||
# Fetch all devices for the user.
|
||||
origin = get_domain_from_id(user_id)
|
||||
try:
|
||||
@@ -594,13 +650,20 @@ class DeviceListUpdater(object):
|
||||
# eventually become consistent.
|
||||
return
|
||||
except FederationDeniedError as e:
|
||||
opentracing.set_tag("error", True)
|
||||
opentracing.log_kv({"reason": "FederationDeniedError"})
|
||||
logger.info(e)
|
||||
return
|
||||
except Exception:
|
||||
except Exception as e:
|
||||
# TODO: Remember that we are now out of sync and try again
|
||||
# later
|
||||
opentracing.set_tag("error", True)
|
||||
opentracing.log_kv(
|
||||
{"message": "Exception raised by federation request", "exception": e}
|
||||
)
|
||||
logger.exception("Failed to handle device list update for %s", user_id)
|
||||
return
|
||||
opentracing.log_kv({"result": result})
|
||||
stream_id = result["stream_id"]
|
||||
devices = result["devices"]
|
||||
|
||||
|
||||
@@ -15,8 +15,11 @@
|
||||
|
||||
import logging
|
||||
|
||||
from canonicaljson import json
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
import synapse.logging.opentracing as opentracing
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.types import UserID, get_domain_from_id
|
||||
from synapse.util.stringutils import random_string
|
||||
@@ -78,7 +81,8 @@ class DeviceMessageHandler(object):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def send_device_message(self, sender_user_id, message_type, messages):
|
||||
|
||||
opentracing.set_tag("number_of_messages", len(messages))
|
||||
opentracing.set_tag("sender", sender_user_id)
|
||||
local_messages = {}
|
||||
remote_messages = {}
|
||||
for user_id, by_device in messages.items():
|
||||
@@ -100,15 +104,24 @@ class DeviceMessageHandler(object):
|
||||
|
||||
message_id = random_string(16)
|
||||
|
||||
context = {"opentracing": {}}
|
||||
opentracing.inject_active_span_text_map(context["opentracing"])
|
||||
|
||||
remote_edu_contents = {}
|
||||
for destination, messages in remote_messages.items():
|
||||
remote_edu_contents[destination] = {
|
||||
"messages": messages,
|
||||
"sender": sender_user_id,
|
||||
"type": message_type,
|
||||
"message_id": message_id,
|
||||
}
|
||||
with opentracing.start_active_span("to_device_for_user"):
|
||||
opentracing.set_tag("destination", destination)
|
||||
remote_edu_contents[destination] = {
|
||||
"messages": messages,
|
||||
"sender": sender_user_id,
|
||||
"type": message_type,
|
||||
"message_id": message_id,
|
||||
"context": json.dumps(context)
|
||||
if opentracing.whitelisted_homeserver(destination)
|
||||
else "{}",
|
||||
}
|
||||
|
||||
opentracing.log_kv({"local_messages": local_messages})
|
||||
stream_id = yield self.store.add_messages_to_device_inbox(
|
||||
local_messages, remote_edu_contents
|
||||
)
|
||||
@@ -117,6 +130,7 @@ class DeviceMessageHandler(object):
|
||||
"to_device_key", stream_id, users=local_messages.keys()
|
||||
)
|
||||
|
||||
opentracing.log_kv({"remote_messages": remote_messages})
|
||||
for destination in remote_messages.keys():
|
||||
# Enqueue a new federation transaction to send the new
|
||||
# device messages to each remote destination.
|
||||
|
||||
@@ -22,6 +22,7 @@ from canonicaljson import encode_canonical_json, json
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
import synapse.logging.opentracing as opentracing
|
||||
from synapse.api.errors import CodeMessageException, SynapseError
|
||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||
from synapse.types import UserID, get_domain_from_id
|
||||
@@ -46,6 +47,7 @@ class E2eKeysHandler(object):
|
||||
"client_keys", self.on_federation_query_client_keys
|
||||
)
|
||||
|
||||
@opentracing.trace
|
||||
@defer.inlineCallbacks
|
||||
def query_devices(self, query_body, timeout):
|
||||
""" Handle a device key query from a client
|
||||
@@ -81,6 +83,9 @@ class E2eKeysHandler(object):
|
||||
else:
|
||||
remote_queries[user_id] = device_ids
|
||||
|
||||
opentracing.set_tag("local_key_query", local_query)
|
||||
opentracing.set_tag("remote_key_query", remote_queries)
|
||||
|
||||
# First get local devices.
|
||||
failures = {}
|
||||
results = {}
|
||||
@@ -121,6 +126,7 @@ class E2eKeysHandler(object):
|
||||
r[user_id] = remote_queries[user_id]
|
||||
|
||||
# Now fetch any devices that we don't have in our cache
|
||||
@opentracing.trace
|
||||
@defer.inlineCallbacks
|
||||
def do_remote_query(destination):
|
||||
"""This is called when we are querying the device list of a user on
|
||||
@@ -131,6 +137,7 @@ class E2eKeysHandler(object):
|
||||
|
||||
destination_query = remote_queries_not_in_cache[destination]
|
||||
|
||||
opentracing.set_tag("key_query", destination_query)
|
||||
# We first consider whether we wish to update the device list cache with
|
||||
# the users device list. We want to track a user's devices when the
|
||||
# authenticated user shares a room with the queried user and the query
|
||||
@@ -185,6 +192,8 @@ class E2eKeysHandler(object):
|
||||
except Exception as e:
|
||||
failure = _exception_to_failure(e)
|
||||
failures[destination] = failure
|
||||
opentracing.set_tag("error", True)
|
||||
opentracing.set_tag("reason", failure)
|
||||
|
||||
yield make_deferred_yieldable(
|
||||
defer.gatherResults(
|
||||
@@ -198,6 +207,7 @@ class E2eKeysHandler(object):
|
||||
|
||||
return {"device_keys": results, "failures": failures}
|
||||
|
||||
@opentracing.trace
|
||||
@defer.inlineCallbacks
|
||||
def query_local_devices(self, query):
|
||||
"""Get E2E device keys for local users
|
||||
@@ -210,6 +220,7 @@ class E2eKeysHandler(object):
|
||||
defer.Deferred: (resolves to dict[string, dict[string, dict]]):
|
||||
map from user_id -> device_id -> device details
|
||||
"""
|
||||
opentracing.set_tag("local_query", query)
|
||||
local_query = []
|
||||
|
||||
result_dict = {}
|
||||
@@ -217,6 +228,14 @@ class E2eKeysHandler(object):
|
||||
# we use UserID.from_string to catch invalid user ids
|
||||
if not self.is_mine(UserID.from_string(user_id)):
|
||||
logger.warning("Request for keys for non-local user %s", user_id)
|
||||
opentracing.log_kv(
|
||||
{
|
||||
"message": "Requested a local key for a user which"
|
||||
" was not local to the homeserver",
|
||||
"user_id": user_id,
|
||||
}
|
||||
)
|
||||
opentracing.set_tag("error", True)
|
||||
raise SynapseError(400, "Not a user here")
|
||||
|
||||
if not device_ids:
|
||||
@@ -241,6 +260,7 @@ class E2eKeysHandler(object):
|
||||
r["unsigned"]["device_display_name"] = display_name
|
||||
result_dict[user_id][device_id] = r
|
||||
|
||||
opentracing.log_kv(result_dict)
|
||||
return result_dict
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@@ -251,6 +271,7 @@ class E2eKeysHandler(object):
|
||||
res = yield self.query_local_devices(device_keys_query)
|
||||
return {"device_keys": res}
|
||||
|
||||
@opentracing.trace
|
||||
@defer.inlineCallbacks
|
||||
def claim_one_time_keys(self, query, timeout):
|
||||
local_query = []
|
||||
@@ -265,6 +286,9 @@ class E2eKeysHandler(object):
|
||||
domain = get_domain_from_id(user_id)
|
||||
remote_queries.setdefault(domain, {})[user_id] = device_keys
|
||||
|
||||
opentracing.set_tag("local_key_query", local_query)
|
||||
opentracing.set_tag("remote_key_query", remote_queries)
|
||||
|
||||
results = yield self.store.claim_e2e_one_time_keys(local_query)
|
||||
|
||||
json_result = {}
|
||||
@@ -276,8 +300,10 @@ class E2eKeysHandler(object):
|
||||
key_id: json.loads(json_bytes)
|
||||
}
|
||||
|
||||
@opentracing.trace
|
||||
@defer.inlineCallbacks
|
||||
def claim_client_keys(destination):
|
||||
opentracing.set_tag("destination", destination)
|
||||
device_keys = remote_queries[destination]
|
||||
try:
|
||||
remote_result = yield self.federation.claim_client_keys(
|
||||
@@ -290,6 +316,8 @@ class E2eKeysHandler(object):
|
||||
except Exception as e:
|
||||
failure = _exception_to_failure(e)
|
||||
failures[destination] = failure
|
||||
opentracing.set_tag("error", True)
|
||||
opentracing.set_tag("reason", failure)
|
||||
|
||||
yield make_deferred_yieldable(
|
||||
defer.gatherResults(
|
||||
@@ -313,9 +341,11 @@ class E2eKeysHandler(object):
|
||||
),
|
||||
)
|
||||
|
||||
opentracing.log_kv({"one_time_keys": json_result, "failures": failures})
|
||||
return {"one_time_keys": json_result, "failures": failures}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@opentracing.tag_args
|
||||
def upload_keys_for_user(self, user_id, device_id, keys):
|
||||
|
||||
time_now = self.clock.time_msec()
|
||||
@@ -329,6 +359,13 @@ class E2eKeysHandler(object):
|
||||
user_id,
|
||||
time_now,
|
||||
)
|
||||
opentracing.log_kv(
|
||||
{
|
||||
"message": "Updating device_keys for user.",
|
||||
"user_id": user_id,
|
||||
"device_id": device_id,
|
||||
}
|
||||
)
|
||||
# TODO: Sign the JSON with the server key
|
||||
changed = yield self.store.set_e2e_device_keys(
|
||||
user_id, device_id, time_now, device_keys
|
||||
@@ -336,12 +373,26 @@ class E2eKeysHandler(object):
|
||||
if changed:
|
||||
# Only notify about device updates *if* the keys actually changed
|
||||
yield self.device_handler.notify_device_update(user_id, [device_id])
|
||||
|
||||
else:
|
||||
opentracing.log_kv(
|
||||
{"message": "Not updating device_keys for user", "user_id": user_id}
|
||||
)
|
||||
one_time_keys = keys.get("one_time_keys", None)
|
||||
if one_time_keys:
|
||||
opentracing.log_kv(
|
||||
{
|
||||
"message": "Updating one_time_keys for device.",
|
||||
"user_id": user_id,
|
||||
"device_id": device_id,
|
||||
}
|
||||
)
|
||||
yield self._upload_one_time_keys_for_user(
|
||||
user_id, device_id, time_now, one_time_keys
|
||||
)
|
||||
else:
|
||||
opentracing.log_kv(
|
||||
{"message": "Did not update one_time_keys", "reason": "no keys given"}
|
||||
)
|
||||
|
||||
# the device should have been registered already, but it may have been
|
||||
# deleted due to a race with a DELETE request. Or we may be using an
|
||||
@@ -352,6 +403,7 @@ class E2eKeysHandler(object):
|
||||
|
||||
result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
|
||||
|
||||
opentracing.set_tag("one_time_key_counts", result)
|
||||
return {"one_time_key_counts": result}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@@ -395,6 +447,9 @@ class E2eKeysHandler(object):
|
||||
(algorithm, key_id, encode_canonical_json(key).decode("ascii"))
|
||||
)
|
||||
|
||||
opentracing.log_kv(
|
||||
{"message": "Inserting new one_time_keys.", "keys": new_keys}
|
||||
)
|
||||
yield self.store.add_e2e_one_time_keys(user_id, device_id, time_now, new_keys)
|
||||
|
||||
|
||||
|
||||
@@ -19,6 +19,7 @@ from six import iteritems
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
import synapse.logging.opentracing as opentracing
|
||||
from synapse.api.errors import (
|
||||
Codes,
|
||||
NotFoundError,
|
||||
@@ -49,6 +50,7 @@ class E2eRoomKeysHandler(object):
|
||||
# changed.
|
||||
self._upload_linearizer = Linearizer("upload_room_keys_lock")
|
||||
|
||||
@opentracing.trace
|
||||
@defer.inlineCallbacks
|
||||
def get_room_keys(self, user_id, version, room_id=None, session_id=None):
|
||||
"""Bulk get the E2E room keys for a given backup, optionally filtered to a given
|
||||
@@ -84,8 +86,10 @@ class E2eRoomKeysHandler(object):
|
||||
user_id, version, room_id, session_id
|
||||
)
|
||||
|
||||
opentracing.log_kv(results)
|
||||
return results
|
||||
|
||||
@opentracing.trace
|
||||
@defer.inlineCallbacks
|
||||
def delete_room_keys(self, user_id, version, room_id=None, session_id=None):
|
||||
"""Bulk delete the E2E room keys for a given backup, optionally filtered to a given
|
||||
@@ -107,6 +111,7 @@ class E2eRoomKeysHandler(object):
|
||||
with (yield self._upload_linearizer.queue(user_id)):
|
||||
yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)
|
||||
|
||||
@opentracing.trace
|
||||
@defer.inlineCallbacks
|
||||
def upload_room_keys(self, user_id, version, room_keys):
|
||||
"""Bulk upload a list of room keys into a given backup version, asserting
|
||||
@@ -186,7 +191,14 @@ class E2eRoomKeysHandler(object):
|
||||
session_id(str): the session whose room_key we're setting
|
||||
room_key(dict): the room_key being set
|
||||
"""
|
||||
|
||||
opentracing.log_kv(
|
||||
{
|
||||
"message": "Trying to upload room key",
|
||||
"room_id": room_id,
|
||||
"session_id": session_id,
|
||||
"user_id": user_id,
|
||||
}
|
||||
)
|
||||
# get the room_key for this particular row
|
||||
current_room_key = None
|
||||
try:
|
||||
@@ -195,14 +207,23 @@ class E2eRoomKeysHandler(object):
|
||||
)
|
||||
except StoreError as e:
|
||||
if e.code == 404:
|
||||
pass
|
||||
opentracing.log_kv(
|
||||
{
|
||||
"message": "Room key not found.",
|
||||
"room_id": room_id,
|
||||
"user_id": user_id,
|
||||
}
|
||||
)
|
||||
else:
|
||||
raise
|
||||
|
||||
if self._should_replace_room_key(current_room_key, room_key):
|
||||
opentracing.log_kv({"message": "Replacing room key."})
|
||||
yield self.store.set_e2e_room_key(
|
||||
user_id, version, room_id, session_id, room_key
|
||||
)
|
||||
else:
|
||||
opentracing.log_kv({"message": "Not replacing room_key."})
|
||||
|
||||
@staticmethod
|
||||
def _should_replace_room_key(current_room_key, room_key):
|
||||
@@ -236,6 +257,7 @@ class E2eRoomKeysHandler(object):
|
||||
return False
|
||||
return True
|
||||
|
||||
@opentracing.trace
|
||||
@defer.inlineCallbacks
|
||||
def create_version(self, user_id, version_info):
|
||||
"""Create a new backup version. This automatically becomes the new
|
||||
@@ -294,6 +316,7 @@ class E2eRoomKeysHandler(object):
|
||||
raise
|
||||
return res
|
||||
|
||||
@opentracing.trace
|
||||
@defer.inlineCallbacks
|
||||
def delete_version(self, user_id, version=None):
|
||||
"""Deletes a given version of the user's e2e_room_keys backup
|
||||
@@ -314,6 +337,7 @@ class E2eRoomKeysHandler(object):
|
||||
else:
|
||||
raise
|
||||
|
||||
@opentracing.trace
|
||||
@defer.inlineCallbacks
|
||||
def update_version(self, user_id, version, version_info):
|
||||
"""Update the info about a given version of the user's backup
|
||||
|
||||
@@ -527,6 +527,31 @@ def inject_active_span_text_map(carrier, destination=None):
|
||||
)
|
||||
|
||||
|
||||
def get_active_span_text_map(destination=None):
|
||||
"""
|
||||
Gets a span context as a dict. This can be used instead of injecting a span
|
||||
into an empty carrier.
|
||||
|
||||
Args:
|
||||
destination (str): the name of the remote server. The dict will only
|
||||
contain a span context if the destination matches the homeserver_whitelist
|
||||
or if destination is None.
|
||||
|
||||
Returns:
|
||||
A dict containing the span context.
|
||||
"""
|
||||
|
||||
if not opentracing or (destination and not whitelisted_homeserver(destination)):
|
||||
return {}
|
||||
|
||||
carrier = {}
|
||||
opentracing.tracer.inject(
|
||||
opentracing.tracer.active_span, opentracing.Format.TEXT_MAP, carrier
|
||||
)
|
||||
|
||||
return carrier
|
||||
|
||||
|
||||
def active_span_context_as_string():
|
||||
"""
|
||||
Returns:
|
||||
|
||||
@@ -17,6 +17,7 @@ import logging
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
import synapse.logging.opentracing as opentracing
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.http.servlet import (
|
||||
RestServlet,
|
||||
@@ -68,6 +69,7 @@ class KeyUploadServlet(RestServlet):
|
||||
self.auth = hs.get_auth()
|
||||
self.e2e_keys_handler = hs.get_e2e_keys_handler()
|
||||
|
||||
@opentracing.trace_using_operation_name("upload_keys")
|
||||
@defer.inlineCallbacks
|
||||
def on_POST(self, request, device_id):
|
||||
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
|
||||
@@ -78,6 +80,14 @@ class KeyUploadServlet(RestServlet):
|
||||
# passing the device_id here is deprecated; however, we allow it
|
||||
# for now for compatibility with older clients.
|
||||
if requester.device_id is not None and device_id != requester.device_id:
|
||||
opentracing.set_tag("error", True)
|
||||
opentracing.log_kv(
|
||||
{
|
||||
"message": "Client uploading keys for a different device",
|
||||
"logged_in_id": requester.device_id,
|
||||
"key_being_uploaded": device_id,
|
||||
}
|
||||
)
|
||||
logger.warning(
|
||||
"Client uploading keys for a different device "
|
||||
"(logged in as %s, uploading for %s)",
|
||||
@@ -178,10 +188,11 @@ class KeyChangesServlet(RestServlet):
|
||||
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
|
||||
|
||||
from_token_string = parse_string(request, "from")
|
||||
opentracing.set_tag("from", from_token_string)
|
||||
|
||||
# We want to enforce they do pass us one, but we ignore it and return
|
||||
# changes after the "to" as well as before.
|
||||
parse_string(request, "to")
|
||||
opentracing.set_tag("to", parse_string(request, "to"))
|
||||
|
||||
from_token = StreamToken.from_string(from_token_string)
|
||||
|
||||
|
||||
@@ -17,6 +17,7 @@ import logging
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
import synapse.logging.opentracing as opentracing
|
||||
from synapse.http import servlet
|
||||
from synapse.http.servlet import parse_json_object_from_request
|
||||
from synapse.rest.client.transactions import HttpTransactionCache
|
||||
@@ -42,7 +43,10 @@ class SendToDeviceRestServlet(servlet.RestServlet):
|
||||
self.txns = HttpTransactionCache(hs)
|
||||
self.device_message_handler = hs.get_device_message_handler()
|
||||
|
||||
@opentracing.trace_using_operation_name("sendToDevice")
|
||||
def on_PUT(self, request, message_type, txn_id):
|
||||
opentracing.set_tag("message_type", message_type)
|
||||
opentracing.set_tag("txn_id", txn_id)
|
||||
return self.txns.fetch_or_execute_request(
|
||||
request, self._put, request, message_type, txn_id
|
||||
)
|
||||
|
||||
@@ -19,6 +19,7 @@ from canonicaljson import json
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
import synapse.logging.opentracing as opentracing
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.background_updates import BackgroundUpdateStore
|
||||
from synapse.util.caches.expiringcache import ExpiringCache
|
||||
@@ -72,6 +73,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
"get_new_messages_for_device", get_new_messages_for_device_txn
|
||||
)
|
||||
|
||||
@opentracing.trace
|
||||
@defer.inlineCallbacks
|
||||
def delete_messages_for_device(self, user_id, device_id, up_to_stream_id):
|
||||
"""
|
||||
@@ -87,11 +89,15 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
last_deleted_stream_id = self._last_device_delete_cache.get(
|
||||
(user_id, device_id), None
|
||||
)
|
||||
|
||||
opentracing.set_tag("last_deleted_stream_id", last_deleted_stream_id)
|
||||
|
||||
if last_deleted_stream_id:
|
||||
has_changed = self._device_inbox_stream_cache.has_entity_changed(
|
||||
user_id, last_deleted_stream_id
|
||||
)
|
||||
if not has_changed:
|
||||
opentracing.log_kv({"message": "No changes in cache since last check"})
|
||||
return 0
|
||||
|
||||
def delete_messages_for_device_txn(txn):
|
||||
@@ -107,6 +113,10 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
"delete_messages_for_device", delete_messages_for_device_txn
|
||||
)
|
||||
|
||||
opentracing.log_kv(
|
||||
{"message": "deleted {} messages for device".format(count), "count": count}
|
||||
)
|
||||
|
||||
# Update the cache, ensuring that we only ever increase the value
|
||||
last_deleted_stream_id = self._last_device_delete_cache.get(
|
||||
(user_id, device_id), 0
|
||||
@@ -117,6 +127,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
|
||||
return count
|
||||
|
||||
@opentracing.trace
|
||||
def get_new_device_msgs_for_remote(
|
||||
self, destination, last_stream_id, current_stream_id, limit
|
||||
):
|
||||
@@ -132,16 +143,23 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
in the stream the messages got to.
|
||||
"""
|
||||
|
||||
opentracing.set_tag("destination", destination)
|
||||
opentracing.set_tag("last_stream_id", last_stream_id)
|
||||
opentracing.set_tag("current_stream_id", current_stream_id)
|
||||
opentracing.set_tag("limit", limit)
|
||||
|
||||
has_changed = self._device_federation_outbox_stream_cache.has_entity_changed(
|
||||
destination, last_stream_id
|
||||
)
|
||||
if not has_changed or last_stream_id == current_stream_id:
|
||||
opentracing.log_kv({"message": "No new messages in stream"})
|
||||
return defer.succeed(([], current_stream_id))
|
||||
|
||||
if limit <= 0:
|
||||
# This can happen if we run out of room for EDUs in the transaction.
|
||||
return defer.succeed(([], last_stream_id))
|
||||
|
||||
@opentracing.trace
|
||||
def get_new_messages_for_remote_destination_txn(txn):
|
||||
sql = (
|
||||
"SELECT stream_id, messages_json FROM device_federation_outbox"
|
||||
@@ -156,6 +174,9 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
stream_pos = row[0]
|
||||
messages.append(json.loads(row[1]))
|
||||
if len(messages) < limit:
|
||||
opentracing.log_kv(
|
||||
{"message": "Set stream position to current position"}
|
||||
)
|
||||
stream_pos = current_stream_id
|
||||
return (messages, stream_pos)
|
||||
|
||||
@@ -164,6 +185,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
||||
get_new_messages_for_remote_destination_txn,
|
||||
)
|
||||
|
||||
@opentracing.trace
|
||||
def delete_device_msgs_for_remote(self, destination, up_to_stream_id):
|
||||
"""Used to delete messages when the remote destination acknowledges
|
||||
their receipt.
|
||||
@@ -214,6 +236,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
|
||||
expiry_ms=30 * 60 * 1000,
|
||||
)
|
||||
|
||||
@opentracing.trace
|
||||
@defer.inlineCallbacks
|
||||
def add_messages_to_device_inbox(
|
||||
self, local_messages_by_user_then_device, remote_messages_by_destination
|
||||
|
||||
@@ -20,6 +20,7 @@ from canonicaljson import json
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
import synapse.logging.opentracing as opentracing
|
||||
from synapse.api.errors import StoreError
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.storage._base import Cache, SQLBaseStore, db_to_json
|
||||
@@ -73,6 +74,7 @@ class DeviceWorkerStore(SQLBaseStore):
|
||||
|
||||
return {d["device_id"]: d for d in devices}
|
||||
|
||||
@opentracing.trace
|
||||
@defer.inlineCallbacks
|
||||
def get_devices_by_remote(self, destination, from_stream_id, limit):
|
||||
"""Get stream of updates to send to remote servers
|
||||
@@ -127,8 +129,10 @@ class DeviceWorkerStore(SQLBaseStore):
|
||||
# (user_id, device_id) entries into a map, with the value being
|
||||
# the max stream_id across each set of duplicate entries
|
||||
#
|
||||
# maps (user_id, device_id) -> stream_id
|
||||
# maps (user_id, device_id) -> (stream_id, context)
|
||||
# as long as their stream_id does not match that of the last row
|
||||
# where context is any metadata about the message's context such as
|
||||
# opentracing data
|
||||
query_map = {}
|
||||
for update in updates:
|
||||
if stream_id_cutoff is not None and update[2] >= stream_id_cutoff:
|
||||
@@ -136,7 +140,10 @@ class DeviceWorkerStore(SQLBaseStore):
|
||||
break
|
||||
|
||||
key = (update[0], update[1])
|
||||
query_map[key] = max(query_map.get(key, 0), update[2])
|
||||
context = update[3]
|
||||
stream_id = max(query_map.get(key, (0, None))[0], update[2])
|
||||
|
||||
query_map[key] = (stream_id, context)
|
||||
|
||||
# If we didn't find any updates with a stream_id lower than the cutoff, it
|
||||
# means that there are more than limit updates all of which have the same
|
||||
@@ -171,7 +178,7 @@ class DeviceWorkerStore(SQLBaseStore):
|
||||
List: List of device updates
|
||||
"""
|
||||
sql = """
|
||||
SELECT user_id, device_id, stream_id FROM device_lists_outbound_pokes
|
||||
SELECT user_id, device_id, stream_id, context FROM device_lists_outbound_pokes
|
||||
WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ?
|
||||
ORDER BY stream_id
|
||||
LIMIT ?
|
||||
@@ -187,8 +194,8 @@ class DeviceWorkerStore(SQLBaseStore):
|
||||
Args:
|
||||
destination (str): The host the device updates are intended for
|
||||
from_stream_id (int): The minimum stream_id to filter updates by, exclusive
|
||||
query_map (Dict[(str, str): int]): Dictionary mapping
|
||||
user_id/device_id to update stream_id
|
||||
query_map (Dict[(str, str): (int, str)]): Dictionary mapping
|
||||
user_id/device_id to update stream_id and the relevent opentracing context
|
||||
|
||||
Returns:
|
||||
List[Dict]: List of objects representing an device update EDU
|
||||
@@ -210,12 +217,15 @@ class DeviceWorkerStore(SQLBaseStore):
|
||||
destination, user_id, from_stream_id
|
||||
)
|
||||
for device_id, device in iteritems(user_devices):
|
||||
stream_id = query_map[(user_id, device_id)]
|
||||
stream_id, _ = query_map[(user_id, device_id)]
|
||||
result = {
|
||||
"user_id": user_id,
|
||||
"device_id": device_id,
|
||||
"prev_id": [prev_id] if prev_id else [],
|
||||
"stream_id": stream_id,
|
||||
"context": query_map[(user_id, device_id)][1]
|
||||
if opentracing.whitelisted_homeserver(destination)
|
||||
else "{}",
|
||||
}
|
||||
|
||||
prev_id = stream_id
|
||||
@@ -299,6 +309,7 @@ class DeviceWorkerStore(SQLBaseStore):
|
||||
def get_device_stream_token(self):
|
||||
return self._device_list_id_gen.get_current_token()
|
||||
|
||||
@opentracing.trace
|
||||
@defer.inlineCallbacks
|
||||
def get_user_devices_from_cache(self, query_list):
|
||||
"""Get the devices (and keys if any) for remote users from the cache.
|
||||
@@ -330,6 +341,9 @@ class DeviceWorkerStore(SQLBaseStore):
|
||||
else:
|
||||
results[user_id] = yield self._get_cached_devices_for_user(user_id)
|
||||
|
||||
opentracing.set_tag("in_cache", results)
|
||||
opentracing.set_tag("not_in_cache", user_ids_not_in_cache)
|
||||
|
||||
return (user_ids_not_in_cache, results)
|
||||
|
||||
@cachedInlineCallbacks(num_args=2, tree=True)
|
||||
@@ -814,6 +828,8 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
|
||||
],
|
||||
)
|
||||
|
||||
context = {"opentracing": opentracing.get_active_span_text_map()}
|
||||
|
||||
self._simple_insert_many_txn(
|
||||
txn,
|
||||
table="device_lists_outbound_pokes",
|
||||
@@ -825,6 +841,9 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
|
||||
"device_id": device_id,
|
||||
"sent": False,
|
||||
"ts": now,
|
||||
"context": json.dumps(context)
|
||||
if opentracing.whitelisted_homeserver(destination)
|
||||
else "{}",
|
||||
}
|
||||
for destination in hosts
|
||||
for device_id in device_ids
|
||||
|
||||
@@ -17,6 +17,7 @@ import json
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
import synapse.logging.opentracing as opentracing
|
||||
from synapse.api.errors import StoreError
|
||||
|
||||
from ._base import SQLBaseStore
|
||||
@@ -94,7 +95,16 @@ class EndToEndRoomKeyStore(SQLBaseStore):
|
||||
},
|
||||
lock=False,
|
||||
)
|
||||
opentracing.log_kv(
|
||||
{
|
||||
"message": "Set room key",
|
||||
"room_id": room_id,
|
||||
"session_id": session_id,
|
||||
"room_key": room_key,
|
||||
}
|
||||
)
|
||||
|
||||
@opentracing.trace
|
||||
@defer.inlineCallbacks
|
||||
def get_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
|
||||
"""Bulk get the E2E room keys for a given backup, optionally filtered to a given
|
||||
@@ -153,6 +163,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
|
||||
|
||||
return sessions
|
||||
|
||||
@opentracing.trace
|
||||
@defer.inlineCallbacks
|
||||
def delete_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
|
||||
"""Bulk delete the E2E room keys for a given backup, optionally filtered to a given
|
||||
@@ -236,6 +247,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
|
||||
"get_e2e_room_keys_version_info", _get_e2e_room_keys_version_info_txn
|
||||
)
|
||||
|
||||
@opentracing.trace
|
||||
def create_e2e_room_keys_version(self, user_id, info):
|
||||
"""Atomically creates a new version of this user's e2e_room_keys store
|
||||
with the given version info.
|
||||
@@ -276,6 +288,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
|
||||
"create_e2e_room_keys_version_txn", _create_e2e_room_keys_version_txn
|
||||
)
|
||||
|
||||
@opentracing.trace
|
||||
def update_e2e_room_keys_version(self, user_id, version, info):
|
||||
"""Update a given backup version
|
||||
|
||||
@@ -292,6 +305,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
|
||||
desc="update_e2e_room_keys_version",
|
||||
)
|
||||
|
||||
@opentracing.trace
|
||||
def delete_e2e_room_keys_version(self, user_id, version=None):
|
||||
"""Delete a given backup version of the user's room keys.
|
||||
Doesn't delete their actual key data.
|
||||
|
||||
@@ -18,12 +18,14 @@ from canonicaljson import encode_canonical_json
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
import synapse.logging.opentracing as opentracing
|
||||
from synapse.util.caches.descriptors import cached
|
||||
|
||||
from ._base import SQLBaseStore, db_to_json
|
||||
|
||||
|
||||
class EndToEndKeyWorkerStore(SQLBaseStore):
|
||||
@opentracing.trace
|
||||
@defer.inlineCallbacks
|
||||
def get_e2e_device_keys(
|
||||
self, query_list, include_all_devices=False, include_deleted_devices=False
|
||||
@@ -40,6 +42,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
|
||||
Dict mapping from user-id to dict mapping from device_id to
|
||||
dict containing "key_json", "device_display_name".
|
||||
"""
|
||||
opentracing.set_tag("query_list", query_list)
|
||||
if not query_list:
|
||||
return {}
|
||||
|
||||
@@ -57,9 +60,13 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
|
||||
|
||||
return results
|
||||
|
||||
@opentracing.trace
|
||||
def _get_e2e_device_keys_txn(
|
||||
self, txn, query_list, include_all_devices=False, include_deleted_devices=False
|
||||
):
|
||||
opentracing.set_tag("include_all_devices", include_all_devices)
|
||||
opentracing.set_tag("include_deleted_devices", include_deleted_devices)
|
||||
|
||||
query_clauses = []
|
||||
query_params = []
|
||||
|
||||
@@ -104,6 +111,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
|
||||
for user_id, device_id in deleted_devices:
|
||||
result.setdefault(user_id, {})[device_id] = None
|
||||
|
||||
opentracing.log_kv(result)
|
||||
return result
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@@ -129,8 +137,11 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
|
||||
keyvalues={"user_id": user_id, "device_id": device_id},
|
||||
desc="add_e2e_one_time_keys_check",
|
||||
)
|
||||
|
||||
return {(row["algorithm"], row["key_id"]): row["key_json"] for row in rows}
|
||||
result = {(row["algorithm"], row["key_id"]): row["key_json"] for row in rows}
|
||||
opentracing.log_kv(
|
||||
{"message": "Fetched one time keys for user", "one_time_keys": result}
|
||||
)
|
||||
return result
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def add_e2e_one_time_keys(self, user_id, device_id, time_now, new_keys):
|
||||
@@ -146,6 +157,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
|
||||
"""
|
||||
|
||||
def _add_e2e_one_time_keys(txn):
|
||||
opentracing.set_tag("user_id", user_id)
|
||||
opentracing.set_tag("device_id", device_id)
|
||||
opentracing.set_tag("new_keys", new_keys)
|
||||
# We are protected from race between lookup and insertion due to
|
||||
# a unique constraint. If there is a race of two calls to
|
||||
# `add_e2e_one_time_keys` then they'll conflict and we will only
|
||||
@@ -202,6 +216,11 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
|
||||
"""
|
||||
|
||||
def _set_e2e_device_keys_txn(txn):
|
||||
opentracing.set_tag("user_id", user_id)
|
||||
opentracing.set_tag("device_id", device_id)
|
||||
opentracing.set_tag("time_now", time_now)
|
||||
opentracing.set_tag("device_keys", device_keys)
|
||||
|
||||
old_key_json = self._simple_select_one_onecol_txn(
|
||||
txn,
|
||||
table="e2e_device_keys_json",
|
||||
@@ -215,6 +234,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
|
||||
new_key_json = encode_canonical_json(device_keys).decode("utf-8")
|
||||
|
||||
if old_key_json == new_key_json:
|
||||
opentracing.log_kv({"Message": "Device key already stored."})
|
||||
return False
|
||||
|
||||
self._simple_upsert_txn(
|
||||
@@ -223,7 +243,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
|
||||
keyvalues={"user_id": user_id, "device_id": device_id},
|
||||
values={"ts_added_ms": time_now, "key_json": new_key_json},
|
||||
)
|
||||
|
||||
opentracing.log_kv({"message": "Device keys stored."})
|
||||
return True
|
||||
|
||||
return self.runInteraction("set_e2e_device_keys", _set_e2e_device_keys_txn)
|
||||
@@ -231,6 +251,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
|
||||
def claim_e2e_one_time_keys(self, query_list):
|
||||
"""Take a list of one time keys out of the database"""
|
||||
|
||||
@opentracing.trace
|
||||
def _claim_e2e_one_time_keys(txn):
|
||||
sql = (
|
||||
"SELECT key_id, key_json FROM e2e_one_time_keys_json"
|
||||
@@ -252,7 +273,15 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
|
||||
" AND key_id = ?"
|
||||
)
|
||||
for user_id, device_id, algorithm, key_id in delete:
|
||||
opentracing.log_kv(
|
||||
{
|
||||
"message": "Executing claim e2e_one_time_keys transaction on database."
|
||||
}
|
||||
)
|
||||
txn.execute(sql, (user_id, device_id, algorithm, key_id))
|
||||
opentracing.log_kv(
|
||||
{"message": "finished executing and invalidating cache"}
|
||||
)
|
||||
self._invalidate_cache_and_stream(
|
||||
txn, self.count_e2e_one_time_keys, (user_id, device_id)
|
||||
)
|
||||
@@ -262,6 +291,13 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
|
||||
|
||||
def delete_e2e_keys_by_device(self, user_id, device_id):
|
||||
def delete_e2e_keys_by_device_txn(txn):
|
||||
opentracing.log_kv(
|
||||
{
|
||||
"message": "Deleting keys for device",
|
||||
"device_id": device_id,
|
||||
"user_id": user_id,
|
||||
}
|
||||
)
|
||||
self._simple_delete_txn(
|
||||
txn,
|
||||
table="e2e_device_keys_json",
|
||||
|
||||
@@ -0,0 +1,23 @@
|
||||
/* Copyright 2019 The Matrix.org Foundation C.I.C
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Opentracing needs to inject a span_context into data item which leaves the
|
||||
* current execution context. Since device list updates are dumped in here
|
||||
* and then processed later they need to include the span context for opentraing.
|
||||
* Since we may also decide later to include other tracking information the column
|
||||
* has just been called "context", the structure of the data within it may change.
|
||||
*/
|
||||
ALTER TABLE device_lists_outbound_pokes ADD context TEXT;
|
||||
Reference in New Issue
Block a user