Compare commits
22 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 074e7b185f | |||
| 52135531cc | |||
| 83864cec6a | |||
| d910c4418e | |||
| f73a196ff2 | |||
| 06bae97015 | |||
| 75483b6bf9 | |||
| 9ee798e327 | |||
| 458040a081 | |||
| 38c2c5a215 | |||
| f7c873a643 | |||
| bc604e7f94 | |||
| 1a6ae33309 | |||
| ef20aa52eb | |||
| 7093790fbc | |||
| 5ade977d08 | |||
| 909827b422 | |||
| 93bc9d73bf | |||
| 1d65292e94 | |||
| a0d294c306 | |||
| b9cfd3c375 | |||
| 90d17a3d28 |
@@ -0,0 +1 @@
|
||||
Compatibility with v2 Identity Service APIs other than /lookup.
|
||||
@@ -0,0 +1 @@
|
||||
Add POST /_matrix/client/r0/account/3pid/unbind endpoint from MSC2140 for unbinding a 3PID from an identity server without removing it from the homeserver user account.
|
||||
@@ -0,0 +1 @@
|
||||
Include missing opentracing contexts in outbout replication requests.
|
||||
@@ -0,0 +1 @@
|
||||
Add minimum opentracing for client servlets.
|
||||
@@ -0,0 +1 @@
|
||||
Fix sending of EDUs when opentracing is enabled with an empty whitelist.
|
||||
@@ -0,0 +1 @@
|
||||
Trace replication send times.
|
||||
@@ -0,0 +1 @@
|
||||
Fix invalid references to None while opentracing if the log context slips.
|
||||
@@ -0,0 +1 @@
|
||||
Give appropriate exit codes when synctl fails.
|
||||
@@ -268,6 +268,7 @@ class SynapseCmd(cmd.Cmd):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _do_emailrequest(self, args):
|
||||
# TODO: Update to use v2 Identity Service API endpoint
|
||||
url = (
|
||||
self._identityServerUrl()
|
||||
+ "/_matrix/identity/api/v1/validate/email/requestToken"
|
||||
@@ -302,6 +303,7 @@ class SynapseCmd(cmd.Cmd):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _do_emailvalidate(self, args):
|
||||
# TODO: Update to use v2 Identity Service API endpoint
|
||||
url = (
|
||||
self._identityServerUrl()
|
||||
+ "/_matrix/identity/api/v1/validate/email/submitToken"
|
||||
@@ -330,6 +332,7 @@ class SynapseCmd(cmd.Cmd):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _do_3pidbind(self, args):
|
||||
# TODO: Update to use v2 Identity Service API endpoint
|
||||
url = self._identityServerUrl() + "/_matrix/identity/api/v1/3pid/bind"
|
||||
|
||||
json_res = yield self.http_client.do_request(
|
||||
@@ -398,6 +401,7 @@ class SynapseCmd(cmd.Cmd):
|
||||
@defer.inlineCallbacks
|
||||
def _do_invite(self, roomid, userstring):
|
||||
if not userstring.startswith("@") and self._is_on("complete_usernames"):
|
||||
# TODO: Update to use v2 Identity Service API endpoint
|
||||
url = self._identityServerUrl() + "/_matrix/identity/api/v1/lookup"
|
||||
|
||||
json_res = yield self.http_client.do_request(
|
||||
@@ -407,6 +411,7 @@ class SynapseCmd(cmd.Cmd):
|
||||
mxid = None
|
||||
|
||||
if "mxid" in json_res and "signatures" in json_res:
|
||||
# TODO: Update to use v2 Identity Service API endpoint
|
||||
url = (
|
||||
self._identityServerUrl()
|
||||
+ "/_matrix/identity/api/v1/pubkey/ed25519"
|
||||
|
||||
@@ -26,6 +26,7 @@ from synapse.logging.opentracing import (
|
||||
set_tag,
|
||||
start_active_span_follows_from,
|
||||
tags,
|
||||
whitelisted_homeserver,
|
||||
)
|
||||
from synapse.util.metrics import measure_func
|
||||
|
||||
@@ -59,9 +60,15 @@ class TransactionManager(object):
|
||||
# The span_contexts is a generator so that it won't be evaluated if
|
||||
# opentracing is disabled. (Yay speed!)
|
||||
|
||||
span_contexts = (
|
||||
extract_text_map(json.loads(edu.get_context())) for edu in pending_edus
|
||||
)
|
||||
span_contexts = []
|
||||
keep_destination = whitelisted_homeserver(destination)
|
||||
|
||||
for edu in pending_edus:
|
||||
context = edu.get_context()
|
||||
if context:
|
||||
span_contexts.append(extract_text_map(json.loads(context)))
|
||||
if keep_destination:
|
||||
edu.strip_context()
|
||||
|
||||
with start_active_span_follows_from("send_transaction", span_contexts):
|
||||
|
||||
|
||||
@@ -342,7 +342,11 @@ class BaseFederationServlet(object):
|
||||
continue
|
||||
|
||||
server.register_paths(
|
||||
method, (pattern,), self._wrap(code), self.__class__.__name__
|
||||
method,
|
||||
(pattern,),
|
||||
self._wrap(code),
|
||||
self.__class__.__name__,
|
||||
trace=False,
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -41,6 +41,9 @@ class Edu(JsonEncodedObject):
|
||||
def get_context(self):
|
||||
return getattr(self, "content", {}).get("org.matrix.opentracing_context", "{}")
|
||||
|
||||
def strip_context(self):
|
||||
getattr(self, "content", {})["org.matrix.opentracing_context"] = "{}"
|
||||
|
||||
|
||||
class Transaction(JsonEncodedObject):
|
||||
""" A transaction is a list of Pdus and Edus to be sent to a remote home
|
||||
|
||||
@@ -25,7 +25,6 @@ from synapse.logging.opentracing import (
|
||||
log_kv,
|
||||
set_tag,
|
||||
start_active_span,
|
||||
whitelisted_homeserver,
|
||||
)
|
||||
from synapse.types import UserID, get_domain_from_id
|
||||
from synapse.util.stringutils import random_string
|
||||
@@ -121,9 +120,7 @@ class DeviceMessageHandler(object):
|
||||
"sender": sender_user_id,
|
||||
"type": message_type,
|
||||
"message_id": message_id,
|
||||
"org.matrix.opentracing_context": json.dumps(context)
|
||||
if whitelisted_homeserver(destination)
|
||||
else None,
|
||||
"org.matrix.opentracing_context": json.dumps(context),
|
||||
}
|
||||
|
||||
log_kv({"local_messages": local_messages})
|
||||
|
||||
+122
-43
@@ -61,21 +61,76 @@ class IdentityHandler(BaseHandler):
|
||||
return False
|
||||
return True
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def threepid_from_creds(self, creds):
|
||||
if "id_server" in creds:
|
||||
id_server = creds["id_server"]
|
||||
elif "idServer" in creds:
|
||||
id_server = creds["idServer"]
|
||||
else:
|
||||
raise SynapseError(400, "No id_server in creds")
|
||||
def _extract_items_from_creds_dict(self, creds):
|
||||
"""
|
||||
Retrieve entries from a "credentials" dictionary
|
||||
|
||||
if "client_secret" in creds:
|
||||
client_secret = creds["client_secret"]
|
||||
elif "clientSecret" in creds:
|
||||
client_secret = creds["clientSecret"]
|
||||
Args:
|
||||
creds (dict[str, str]): Dictionary of credentials that contain the following keys:
|
||||
* client_secret|clientSecret: A unique secret str provided by the client
|
||||
* id_server|idServer: the domain of the identity server to query
|
||||
* id_access_token: The access token to authenticate to the identity
|
||||
server with.
|
||||
|
||||
Returns:
|
||||
tuple(str, str, str|None): A tuple containing the client_secret, the id_server,
|
||||
and the id_access_token value if available.
|
||||
"""
|
||||
client_secret = creds.get("client_secret") or creds.get("clientSecret")
|
||||
if not client_secret:
|
||||
raise SynapseError(
|
||||
400, "No client_secret in creds", errcode=Codes.MISSING_PARAM
|
||||
)
|
||||
|
||||
id_server = creds.get("id_server") or creds.get("idServer")
|
||||
if not id_server:
|
||||
raise SynapseError(
|
||||
400, "No id_server in creds", errcode=Codes.MISSING_PARAM
|
||||
)
|
||||
|
||||
id_access_token = creds.get("id_access_token")
|
||||
return client_secret, id_server, id_access_token
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def threepid_from_creds(self, creds, use_v2=True):
|
||||
"""
|
||||
Retrieve and validate a threepid identitier from a "credentials" dictionary
|
||||
|
||||
Args:
|
||||
creds (dict[str, str]): Dictionary of credentials that contain the following keys:
|
||||
* client_secret|clientSecret: A unique secret str provided by the client
|
||||
* id_server|idServer: the domain of the identity server to query
|
||||
* id_access_token: The access token to authenticate to the identity
|
||||
server with. Required if use_v2 is true
|
||||
use_v2 (bool): Whether to use v2 Identity Service API endpoints
|
||||
|
||||
Returns:
|
||||
Deferred[dict[str,str|int]|None]: A dictionary consisting of response params to
|
||||
the /getValidated3pid endpoint of the Identity Service API, or None if the
|
||||
threepid was not found
|
||||
"""
|
||||
client_secret, id_server, id_access_token = self._extract_items_from_creds_dict(
|
||||
creds
|
||||
)
|
||||
|
||||
# If an id_access_token is not supplied, force usage of v1
|
||||
if id_access_token is None:
|
||||
use_v2 = False
|
||||
|
||||
query_params = {"sid": creds["sid"], "client_secret": client_secret}
|
||||
|
||||
# Decide which API endpoint URLs and query parameters to use
|
||||
if use_v2:
|
||||
url = "https://%s%s" % (
|
||||
id_server,
|
||||
"/_matrix/identity/v2/3pid/getValidated3pid",
|
||||
)
|
||||
query_params["id_access_token"] = id_access_token
|
||||
else:
|
||||
raise SynapseError(400, "No client_secret in creds")
|
||||
url = "https://%s%s" % (
|
||||
id_server,
|
||||
"/_matrix/identity/api/v1/3pid/getValidated3pid",
|
||||
)
|
||||
|
||||
if not self._should_trust_id_server(id_server):
|
||||
logger.warn(
|
||||
@@ -85,43 +140,55 @@ class IdentityHandler(BaseHandler):
|
||||
return None
|
||||
|
||||
try:
|
||||
data = yield self.http_client.get_json(
|
||||
"https://%s%s"
|
||||
% (id_server, "/_matrix/identity/api/v1/3pid/getValidated3pid"),
|
||||
{"sid": creds["sid"], "client_secret": client_secret},
|
||||
)
|
||||
data = yield self.http_client.get_json(url, query_params)
|
||||
return data if "medium" in data else None
|
||||
except HttpResponseException as e:
|
||||
logger.info("getValidated3pid failed with Matrix error: %r", e)
|
||||
raise e.to_synapse_error()
|
||||
if e.code != 404 or not use_v2:
|
||||
# Generic failure
|
||||
logger.info("getValidated3pid failed with Matrix error: %r", e)
|
||||
raise e.to_synapse_error()
|
||||
|
||||
if "medium" in data:
|
||||
return data
|
||||
return None
|
||||
# This identity server is too old to understand Identity Service API v2
|
||||
# Attempt v1 endpoint
|
||||
logger.info("Got 404 when POSTing JSON %s, falling back to v1 URL", url)
|
||||
return (yield self.threepid_from_creds(creds, use_v2=False))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def bind_threepid(self, creds, mxid):
|
||||
def bind_threepid(self, creds, mxid, use_v2=True):
|
||||
"""Bind a 3PID to an identity server
|
||||
|
||||
Args:
|
||||
creds (dict[str, str]): Dictionary of credentials that contain the following keys:
|
||||
* client_secret|clientSecret: A unique secret str provided by the client
|
||||
* id_server|idServer: the domain of the identity server to query
|
||||
* id_access_token: The access token to authenticate to the identity
|
||||
server with. Required if use_v2 is true
|
||||
mxid (str): The MXID to bind the 3PID to
|
||||
use_v2 (bool): Whether to use v2 Identity Service API endpoints
|
||||
|
||||
Returns:
|
||||
Deferred[dict]: The response from the identity server
|
||||
"""
|
||||
logger.debug("binding threepid %r to %s", creds, mxid)
|
||||
data = None
|
||||
|
||||
if "id_server" in creds:
|
||||
id_server = creds["id_server"]
|
||||
elif "idServer" in creds:
|
||||
id_server = creds["idServer"]
|
||||
else:
|
||||
raise SynapseError(400, "No id_server in creds")
|
||||
client_secret, id_server, id_access_token = self._extract_items_from_creds_dict(
|
||||
creds
|
||||
)
|
||||
|
||||
if "client_secret" in creds:
|
||||
client_secret = creds["client_secret"]
|
||||
elif "clientSecret" in creds:
|
||||
client_secret = creds["clientSecret"]
|
||||
# If an id_access_token is not supplied, force usage of v1
|
||||
if id_access_token is None:
|
||||
use_v2 = False
|
||||
|
||||
# Decide which API endpoint URLs to use
|
||||
bind_data = {"sid": creds["sid"], "client_secret": client_secret, "mxid": mxid}
|
||||
if use_v2:
|
||||
bind_url = "https://%s/_matrix/identity/v2/3pid/bind" % (id_server,)
|
||||
bind_data["id_access_token"] = id_access_token
|
||||
else:
|
||||
raise SynapseError(400, "No client_secret in creds")
|
||||
bind_url = "https://%s/_matrix/identity/api/v1/3pid/bind" % (id_server,)
|
||||
|
||||
try:
|
||||
data = yield self.http_client.post_json_get_json(
|
||||
"https://%s%s" % (id_server, "/_matrix/identity/api/v1/3pid/bind"),
|
||||
{"sid": creds["sid"], "client_secret": client_secret, "mxid": mxid},
|
||||
)
|
||||
data = yield self.http_client.post_json_get_json(bind_url, bind_data)
|
||||
logger.debug("bound threepid %r to %s", creds, mxid)
|
||||
|
||||
# Remember where we bound the threepid
|
||||
@@ -131,13 +198,23 @@ class IdentityHandler(BaseHandler):
|
||||
address=data["address"],
|
||||
id_server=id_server,
|
||||
)
|
||||
|
||||
return data
|
||||
except HttpResponseException as e:
|
||||
if e.code != 404 or not use_v2:
|
||||
logger.error("3PID bind failed with Matrix error: %r", e)
|
||||
raise e.to_synapse_error()
|
||||
except CodeMessageException as e:
|
||||
data = json.loads(e.msg) # XXX WAT?
|
||||
return data
|
||||
return data
|
||||
|
||||
logger.info("Got 404 when POSTing JSON %s, falling back to v1 URL", bind_url)
|
||||
return (yield self.bind_threepid(creds, mxid, use_v2=False))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def try_unbind_threepid(self, mxid, threepid):
|
||||
"""Removes a binding from an identity server
|
||||
"""Attempt to remove a 3PID from an identity server, or if one is not provided, all
|
||||
identity servers we're aware the binding is present on
|
||||
|
||||
Args:
|
||||
mxid (str): Matrix user ID of binding to be removed
|
||||
@@ -188,6 +265,8 @@ class IdentityHandler(BaseHandler):
|
||||
server doesn't support unbinding
|
||||
"""
|
||||
url = "https://%s/_matrix/identity/api/v1/3pid/unbind" % (id_server,)
|
||||
url_bytes = "/_matrix/identity/api/v1/3pid/unbind".encode("ascii")
|
||||
|
||||
content = {
|
||||
"mxid": mxid,
|
||||
"threepid": {"medium": threepid["medium"], "address": threepid["address"]},
|
||||
@@ -199,7 +278,7 @@ class IdentityHandler(BaseHandler):
|
||||
auth_headers = self.federation_http_client.build_auth_headers(
|
||||
destination=None,
|
||||
method="POST",
|
||||
url_bytes="/_matrix/identity/api/v1/3pid/unbind".encode("ascii"),
|
||||
url_bytes=url_bytes,
|
||||
content=content,
|
||||
destination_is=id_server,
|
||||
)
|
||||
|
||||
+49
-34
@@ -46,6 +46,7 @@ from synapse.http import (
|
||||
redact_uri,
|
||||
)
|
||||
from synapse.logging.context import make_deferred_yieldable
|
||||
from synapse.logging.opentracing import set_tag, start_active_span, tags
|
||||
from synapse.util.async_helpers import timeout_deferred
|
||||
from synapse.util.caches import CACHE_SIZE_FACTOR
|
||||
|
||||
@@ -269,42 +270,56 @@ class SimpleHttpClient(object):
|
||||
# log request but strip `access_token` (AS requests for example include this)
|
||||
logger.info("Sending request %s %s", method, redact_uri(uri))
|
||||
|
||||
try:
|
||||
body_producer = None
|
||||
if data is not None:
|
||||
body_producer = QuieterFileBodyProducer(BytesIO(data))
|
||||
with start_active_span(
|
||||
"outgoing-client-request",
|
||||
tags={
|
||||
tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
|
||||
tags.HTTP_METHOD: method,
|
||||
tags.HTTP_URL: uri,
|
||||
},
|
||||
finish_on_close=True,
|
||||
):
|
||||
try:
|
||||
body_producer = None
|
||||
if data is not None:
|
||||
body_producer = QuieterFileBodyProducer(BytesIO(data))
|
||||
|
||||
request_deferred = treq.request(
|
||||
method,
|
||||
uri,
|
||||
agent=self.agent,
|
||||
data=body_producer,
|
||||
headers=headers,
|
||||
**self._extra_treq_args
|
||||
)
|
||||
request_deferred = timeout_deferred(
|
||||
request_deferred,
|
||||
60,
|
||||
self.hs.get_reactor(),
|
||||
cancelled_to_request_timed_out_error,
|
||||
)
|
||||
response = yield make_deferred_yieldable(request_deferred)
|
||||
request_deferred = treq.request(
|
||||
method,
|
||||
uri,
|
||||
agent=self.agent,
|
||||
data=body_producer,
|
||||
headers=headers,
|
||||
**self._extra_treq_args
|
||||
)
|
||||
request_deferred = timeout_deferred(
|
||||
request_deferred,
|
||||
60,
|
||||
self.hs.get_reactor(),
|
||||
cancelled_to_request_timed_out_error,
|
||||
)
|
||||
response = yield make_deferred_yieldable(request_deferred)
|
||||
|
||||
incoming_responses_counter.labels(method, response.code).inc()
|
||||
logger.info(
|
||||
"Received response to %s %s: %s", method, redact_uri(uri), response.code
|
||||
)
|
||||
return response
|
||||
except Exception as e:
|
||||
incoming_responses_counter.labels(method, "ERR").inc()
|
||||
logger.info(
|
||||
"Error sending request to %s %s: %s %s",
|
||||
method,
|
||||
redact_uri(uri),
|
||||
type(e).__name__,
|
||||
e.args[0],
|
||||
)
|
||||
raise
|
||||
incoming_responses_counter.labels(method, response.code).inc()
|
||||
logger.info(
|
||||
"Received response to %s %s: %s",
|
||||
method,
|
||||
redact_uri(uri),
|
||||
response.code,
|
||||
)
|
||||
return response
|
||||
except Exception as e:
|
||||
incoming_responses_counter.labels(method, "ERR").inc()
|
||||
logger.info(
|
||||
"Error sending request to %s %s: %s %s",
|
||||
method,
|
||||
redact_uri(uri),
|
||||
type(e).__name__,
|
||||
e.args[0],
|
||||
)
|
||||
set_tag(tags.ERROR, True)
|
||||
set_tag("error_reason", e.args[0])
|
||||
raise
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def post_urlencoded_get_json(self, uri, args={}, headers=None):
|
||||
|
||||
@@ -345,7 +345,6 @@ class MatrixFederationHttpClient(object):
|
||||
else:
|
||||
query_bytes = b""
|
||||
|
||||
# Retreive current span
|
||||
scope = start_active_span(
|
||||
"outgoing-federation-request",
|
||||
tags={
|
||||
|
||||
+12
-1
@@ -40,6 +40,7 @@ from synapse.api.errors import (
|
||||
UnrecognizedRequestError,
|
||||
)
|
||||
from synapse.logging.context import preserve_fn
|
||||
from synapse.logging.opentracing import trace_servlet
|
||||
from synapse.util.caches import intern_dict
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -257,7 +258,9 @@ class JsonResource(HttpServer, resource.Resource):
|
||||
self.path_regexs = {}
|
||||
self.hs = hs
|
||||
|
||||
def register_paths(self, method, path_patterns, callback, servlet_classname):
|
||||
def register_paths(
|
||||
self, method, path_patterns, callback, servlet_classname, trace=True
|
||||
):
|
||||
"""
|
||||
Registers a request handler against a regular expression. Later request URLs are
|
||||
checked against these regular expressions in order to identify an appropriate
|
||||
@@ -273,8 +276,16 @@ class JsonResource(HttpServer, resource.Resource):
|
||||
|
||||
servlet_classname (str): The name of the handler to be used in prometheus
|
||||
and opentracing logs.
|
||||
|
||||
trace (bool): Whether we should start a span to trace the servlet.
|
||||
"""
|
||||
method = method.encode("utf-8") # method is bytes on py3
|
||||
|
||||
if trace:
|
||||
# We don't extract the context from the servlet because we can't
|
||||
# trust the sender
|
||||
callback = trace_servlet(servlet_classname)(callback)
|
||||
|
||||
for path_pattern in path_patterns:
|
||||
logger.debug("Registering for %s %s", method, path_pattern.pattern)
|
||||
self.path_regexs.setdefault(method, []).append(
|
||||
|
||||
@@ -20,7 +20,6 @@ import logging
|
||||
from canonicaljson import json
|
||||
|
||||
from synapse.api.errors import Codes, SynapseError
|
||||
from synapse.logging.opentracing import trace_servlet
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -298,10 +297,7 @@ class RestServlet(object):
|
||||
servlet_classname = self.__class__.__name__
|
||||
method_handler = getattr(self, "on_%s" % (method,))
|
||||
http_server.register_paths(
|
||||
method,
|
||||
patterns,
|
||||
trace_servlet(servlet_classname)(method_handler),
|
||||
servlet_classname,
|
||||
method, patterns, method_handler, servlet_classname
|
||||
)
|
||||
|
||||
else:
|
||||
|
||||
@@ -239,8 +239,7 @@ _homeserver_whitelist = None
|
||||
|
||||
|
||||
def only_if_tracing(func):
|
||||
"""Executes the function only if we're tracing. Otherwise return.
|
||||
Assumes the function wrapped may return None"""
|
||||
"""Executes the function only if we're tracing. Otherwise returns None."""
|
||||
|
||||
@wraps(func)
|
||||
def _only_if_tracing_inner(*args, **kwargs):
|
||||
@@ -252,6 +251,41 @@ def only_if_tracing(func):
|
||||
return _only_if_tracing_inner
|
||||
|
||||
|
||||
def ensure_active_span(message, ret=None):
|
||||
"""Executes the operation only if opentracing is enabled and there is an active span.
|
||||
If there is no active span it logs message at the error level.
|
||||
|
||||
Args:
|
||||
message (str): Message which fills in "There was no active span when trying to %s"
|
||||
in the error log if there is no active span and opentracing is enabled.
|
||||
ret (object): return value if opentracing is None or there is no active span.
|
||||
|
||||
Returns (object): The result of the func or ret if opentracing is disabled or there
|
||||
was no active span.
|
||||
"""
|
||||
|
||||
def ensure_active_span_inner_1(func):
|
||||
@wraps(func)
|
||||
def ensure_active_span_inner_2(*args, **kwargs):
|
||||
if not opentracing:
|
||||
return ret
|
||||
|
||||
if not opentracing.tracer.active_span:
|
||||
logger.error(
|
||||
"There was no active span when trying to %s."
|
||||
" Did you forget to start one or did a context slip?",
|
||||
message,
|
||||
)
|
||||
|
||||
return ret
|
||||
|
||||
return func(*args, **kwargs)
|
||||
|
||||
return ensure_active_span_inner_2
|
||||
|
||||
return ensure_active_span_inner_1
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _noop_context_manager(*args, **kwargs):
|
||||
"""Does exactly what it says on the tin"""
|
||||
@@ -319,7 +353,7 @@ def whitelisted_homeserver(destination):
|
||||
Args:
|
||||
destination (str)
|
||||
"""
|
||||
_homeserver_whitelist
|
||||
|
||||
if _homeserver_whitelist:
|
||||
return _homeserver_whitelist.match(destination)
|
||||
return False
|
||||
@@ -349,26 +383,24 @@ def start_active_span(
|
||||
if opentracing is None:
|
||||
return _noop_context_manager()
|
||||
|
||||
else:
|
||||
# We need to enter the scope here for the logcontext to become active
|
||||
return opentracing.tracer.start_active_span(
|
||||
operation_name,
|
||||
child_of=child_of,
|
||||
references=references,
|
||||
tags=tags,
|
||||
start_time=start_time,
|
||||
ignore_active_span=ignore_active_span,
|
||||
finish_on_close=finish_on_close,
|
||||
)
|
||||
return opentracing.tracer.start_active_span(
|
||||
operation_name,
|
||||
child_of=child_of,
|
||||
references=references,
|
||||
tags=tags,
|
||||
start_time=start_time,
|
||||
ignore_active_span=ignore_active_span,
|
||||
finish_on_close=finish_on_close,
|
||||
)
|
||||
|
||||
|
||||
def start_active_span_follows_from(operation_name, contexts):
|
||||
if opentracing is None:
|
||||
return _noop_context_manager()
|
||||
else:
|
||||
references = [opentracing.follows_from(context) for context in contexts]
|
||||
scope = start_active_span(operation_name, references=references)
|
||||
return scope
|
||||
|
||||
references = [opentracing.follows_from(context) for context in contexts]
|
||||
scope = start_active_span(operation_name, references=references)
|
||||
return scope
|
||||
|
||||
|
||||
def start_active_span_from_request(
|
||||
@@ -465,19 +497,19 @@ def start_active_span_from_edu(
|
||||
# Opentracing setters for tags, logs, etc
|
||||
|
||||
|
||||
@only_if_tracing
|
||||
@ensure_active_span("set a tag")
|
||||
def set_tag(key, value):
|
||||
"""Sets a tag on the active span"""
|
||||
opentracing.tracer.active_span.set_tag(key, value)
|
||||
|
||||
|
||||
@only_if_tracing
|
||||
@ensure_active_span("log")
|
||||
def log_kv(key_values, timestamp=None):
|
||||
"""Log to the active span"""
|
||||
opentracing.tracer.active_span.log_kv(key_values, timestamp)
|
||||
|
||||
|
||||
@only_if_tracing
|
||||
@ensure_active_span("set the traces operation name")
|
||||
def set_operation_name(operation_name):
|
||||
"""Sets the operation name of the active span"""
|
||||
opentracing.tracer.active_span.set_operation_name(operation_name)
|
||||
@@ -486,13 +518,18 @@ def set_operation_name(operation_name):
|
||||
# Injection and extraction
|
||||
|
||||
|
||||
@only_if_tracing
|
||||
@ensure_active_span("inject the span into a header")
|
||||
def inject_active_span_twisted_headers(headers, destination, check_destination=True):
|
||||
"""
|
||||
Injects a span context into twisted headers in-place
|
||||
|
||||
Args:
|
||||
headers (twisted.web.http_headers.Headers)
|
||||
destination (str): address of entity receiving the span context. If check_destination
|
||||
is true the context will only be injected if the destination matches the
|
||||
opentracing whitelist
|
||||
check_destination (bool): If false, destination will be ignored and the context
|
||||
will always be injected.
|
||||
span (opentracing.Span)
|
||||
|
||||
Returns:
|
||||
@@ -517,7 +554,7 @@ def inject_active_span_twisted_headers(headers, destination, check_destination=T
|
||||
headers.addRawHeaders(key, value)
|
||||
|
||||
|
||||
@only_if_tracing
|
||||
@ensure_active_span("inject the span into a byte dict")
|
||||
def inject_active_span_byte_dict(headers, destination, check_destination=True):
|
||||
"""
|
||||
Injects a span context into a dict where the headers are encoded as byte
|
||||
@@ -525,6 +562,11 @@ def inject_active_span_byte_dict(headers, destination, check_destination=True):
|
||||
|
||||
Args:
|
||||
headers (dict)
|
||||
destination (str): address of entity receiving the span context. If check_destination
|
||||
is true the context will only be injected if the destination matches the
|
||||
opentracing whitelist
|
||||
check_destination (bool): If false, destination will be ignored and the context
|
||||
will always be injected.
|
||||
span (opentracing.Span)
|
||||
|
||||
Returns:
|
||||
@@ -537,7 +579,7 @@ def inject_active_span_byte_dict(headers, destination, check_destination=True):
|
||||
here:
|
||||
https://github.com/jaegertracing/jaeger-client-python/blob/master/jaeger_client/constants.py
|
||||
"""
|
||||
if not whitelisted_homeserver(destination):
|
||||
if check_destination and not whitelisted_homeserver(destination):
|
||||
return
|
||||
|
||||
span = opentracing.tracer.active_span
|
||||
@@ -549,16 +591,18 @@ def inject_active_span_byte_dict(headers, destination, check_destination=True):
|
||||
headers[key.encode()] = [value.encode()]
|
||||
|
||||
|
||||
@only_if_tracing
|
||||
@ensure_active_span("inject the span into a text map")
|
||||
def inject_active_span_text_map(carrier, destination, check_destination=True):
|
||||
"""
|
||||
Injects a span context into a dict
|
||||
|
||||
Args:
|
||||
carrier (dict)
|
||||
destination (str): the name of the remote server. The span context
|
||||
will only be injected if the destination matches the homeserver_whitelist
|
||||
or destination is None.
|
||||
destination (str): address of entity receiving the span context. If check_destination
|
||||
is true the context will only be injected if the destination matches the
|
||||
opentracing whitelist
|
||||
check_destination (bool): If false, destination will be ignored and the context
|
||||
will always be injected.
|
||||
|
||||
Returns:
|
||||
In-place modification of carrier
|
||||
@@ -579,6 +623,7 @@ def inject_active_span_text_map(carrier, destination, check_destination=True):
|
||||
)
|
||||
|
||||
|
||||
@ensure_active_span("get the active span context as a dict", ret={})
|
||||
def get_active_span_text_map(destination=None):
|
||||
"""
|
||||
Gets a span context as a dict. This can be used instead of manually
|
||||
@@ -591,7 +636,7 @@ def get_active_span_text_map(destination=None):
|
||||
dict: the active span's context if opentracing is enabled, otherwise empty.
|
||||
"""
|
||||
|
||||
if not opentracing or (destination and not whitelisted_homeserver(destination)):
|
||||
if destination and not whitelisted_homeserver(destination):
|
||||
return {}
|
||||
|
||||
carrier = {}
|
||||
@@ -602,6 +647,7 @@ def get_active_span_text_map(destination=None):
|
||||
return carrier
|
||||
|
||||
|
||||
@ensure_active_span("get the span context as a string.", ret={})
|
||||
def active_span_context_as_string():
|
||||
"""
|
||||
Returns:
|
||||
|
||||
@@ -22,13 +22,17 @@ from six.moves import urllib
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
import synapse.logging.opentracing as opentracing
|
||||
from synapse.api.errors import (
|
||||
CodeMessageException,
|
||||
HttpResponseException,
|
||||
RequestSendFailed,
|
||||
SynapseError,
|
||||
)
|
||||
from synapse.logging.opentracing import (
|
||||
inject_active_span_byte_dict,
|
||||
trace,
|
||||
trace_servlet,
|
||||
)
|
||||
from synapse.util.caches.response_cache import ResponseCache
|
||||
from synapse.util.stringutils import random_string
|
||||
|
||||
@@ -129,6 +133,7 @@ class ReplicationEndpoint(object):
|
||||
|
||||
client = hs.get_simple_http_client()
|
||||
|
||||
@trace(opname="outgoing_replication_request")
|
||||
@defer.inlineCallbacks
|
||||
def send_request(**kwargs):
|
||||
data = yield cls._serialize_payload(**kwargs)
|
||||
@@ -167,9 +172,7 @@ class ReplicationEndpoint(object):
|
||||
# the master, and so whether we should clean up or not.
|
||||
while True:
|
||||
headers = {}
|
||||
opentracing.inject_active_span_byte_dict(
|
||||
headers, None, check_destination=False
|
||||
)
|
||||
inject_active_span_byte_dict(headers, None, check_destination=False)
|
||||
try:
|
||||
result = yield request_func(uri, data, headers=headers)
|
||||
break
|
||||
@@ -210,13 +213,11 @@ class ReplicationEndpoint(object):
|
||||
args = "/".join("(?P<%s>[^/]+)" % (arg,) for arg in url_args)
|
||||
pattern = re.compile("^/_synapse/replication/%s/%s$" % (self.NAME, args))
|
||||
|
||||
handler = trace_servlet(self.__class__.__name__, extract_context=True)(handler)
|
||||
# We don't let register paths trace this servlet using the default tracing
|
||||
# options because we wish to extract the context explicitly.
|
||||
http_server.register_paths(
|
||||
method,
|
||||
[pattern],
|
||||
opentracing.trace_servlet(self.__class__.__name__, extract_context=True)(
|
||||
handler
|
||||
),
|
||||
self.__class__.__name__,
|
||||
method, [pattern], handler, self.__class__.__name__, trace=False
|
||||
)
|
||||
|
||||
def _cached_handler(self, request, txn_id, **kwargs):
|
||||
|
||||
@@ -542,15 +542,16 @@ class ThreepidRestServlet(RestServlet):
|
||||
def on_POST(self, request):
|
||||
body = parse_json_object_from_request(request)
|
||||
|
||||
threePidCreds = body.get("threePidCreds")
|
||||
threePidCreds = body.get("three_pid_creds", threePidCreds)
|
||||
if threePidCreds is None:
|
||||
raise SynapseError(400, "Missing param", Codes.MISSING_PARAM)
|
||||
threepid_creds = body.get("threePidCreds") or body.get("three_pid_creds")
|
||||
if threepid_creds is None:
|
||||
raise SynapseError(
|
||||
400, "Missing param three_pid_creds", Codes.MISSING_PARAM
|
||||
)
|
||||
|
||||
requester = yield self.auth.get_user_by_req(request)
|
||||
user_id = requester.user.to_string()
|
||||
|
||||
threepid = yield self.identity_handler.threepid_from_creds(threePidCreds)
|
||||
threepid = yield self.identity_handler.threepid_from_creds(threepid_creds)
|
||||
|
||||
if not threepid:
|
||||
raise SynapseError(400, "Failed to auth 3pid", Codes.THREEPID_AUTH_FAILED)
|
||||
@@ -566,11 +567,43 @@ class ThreepidRestServlet(RestServlet):
|
||||
|
||||
if "bind" in body and body["bind"]:
|
||||
logger.debug("Binding threepid %s to %s", threepid, user_id)
|
||||
yield self.identity_handler.bind_threepid(threePidCreds, user_id)
|
||||
yield self.identity_handler.bind_threepid(threepid_creds, user_id)
|
||||
|
||||
return 200, {}
|
||||
|
||||
|
||||
class ThreepidUnbindRestServlet(RestServlet):
|
||||
PATTERNS = client_patterns("/account/3pid/unbind$")
|
||||
|
||||
def __init__(self, hs):
|
||||
super(ThreepidUnbindRestServlet, self).__init__()
|
||||
self.hs = hs
|
||||
self.identity_handler = hs.get_handlers().identity_handler
|
||||
self.auth = hs.get_auth()
|
||||
self.datastore = self.hs.get_datastore()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_POST(self, request):
|
||||
"""Unbind the given 3pid from a specific identity server, or identity servers that are
|
||||
known to have this 3pid bound
|
||||
"""
|
||||
requester = yield self.auth.get_user_by_req(request)
|
||||
body = parse_json_object_from_request(request)
|
||||
assert_params_in_dict(body, ["medium", "address"])
|
||||
|
||||
medium = body.get("medium")
|
||||
address = body.get("address")
|
||||
id_server = body.get("id_server")
|
||||
|
||||
# Attempt to unbind the threepid from an identity server. If id_server is None, try to
|
||||
# unbind from all identity servers this threepid has been added to in the past
|
||||
result = yield self.identity_handler.try_unbind_threepid(
|
||||
requester.user.to_string(),
|
||||
{"address": address, "medium": medium, "id_server": id_server},
|
||||
)
|
||||
return 200, {"id_server_unbind_result": "success" if result else "no-support"}
|
||||
|
||||
|
||||
class ThreepidDeleteRestServlet(RestServlet):
|
||||
PATTERNS = client_patterns("/account/3pid/delete$")
|
||||
|
||||
@@ -629,5 +662,6 @@ def register_servlets(hs, http_server):
|
||||
EmailThreepidRequestTokenRestServlet(hs).register(http_server)
|
||||
MsisdnThreepidRequestTokenRestServlet(hs).register(http_server)
|
||||
ThreepidRestServlet(hs).register(http_server)
|
||||
ThreepidUnbindRestServlet(hs).register(http_server)
|
||||
ThreepidDeleteRestServlet(hs).register(http_server)
|
||||
WhoamiRestServlet(hs).register(http_server)
|
||||
|
||||
@@ -856,7 +856,7 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
|
||||
"ts": now,
|
||||
"opentracing_context": json.dumps(context)
|
||||
if whitelisted_homeserver(destination)
|
||||
else None,
|
||||
else "{}",
|
||||
}
|
||||
for destination in hosts
|
||||
for device_id in device_ids
|
||||
|
||||
@@ -71,7 +71,20 @@ def abort(message, colour=RED, stream=sys.stderr):
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def start(configfile, daemonize=True):
|
||||
def start(configfile: str, daemonize: bool = True) -> bool:
|
||||
"""Attempts to start synapse.
|
||||
Args:
|
||||
configfile: path to a yaml synapse config file
|
||||
daemonize: whether to daemonize synapse or keep it attached to the current
|
||||
session
|
||||
|
||||
Returns:
|
||||
True if the process started successfully
|
||||
False if there was an error starting the process
|
||||
|
||||
If deamonize is False it will only return once synapse exits.
|
||||
"""
|
||||
|
||||
write("Starting ...")
|
||||
args = SYNAPSE
|
||||
|
||||
@@ -83,25 +96,40 @@ def start(configfile, daemonize=True):
|
||||
try:
|
||||
subprocess.check_call(args)
|
||||
write("started synapse.app.homeserver(%r)" % (configfile,), colour=GREEN)
|
||||
return True
|
||||
except subprocess.CalledProcessError as e:
|
||||
write(
|
||||
"error starting (exit code: %d); see above for logs" % e.returncode,
|
||||
colour=RED,
|
||||
)
|
||||
return False
|
||||
|
||||
|
||||
def start_worker(app, configfile, worker_configfile):
|
||||
def start_worker(app: str, configfile: str, worker_configfile: str) -> bool:
|
||||
"""Attempts to start a synapse worker.
|
||||
Args:
|
||||
app: name of the worker's appservice
|
||||
configfile: path to a yaml synapse config file
|
||||
worker_configfile: path to worker specific yaml synapse file
|
||||
|
||||
Returns:
|
||||
True if the process started successfully
|
||||
False if there was an error starting the process
|
||||
"""
|
||||
|
||||
args = [sys.executable, "-B", "-m", app, "-c", configfile, "-c", worker_configfile]
|
||||
|
||||
try:
|
||||
subprocess.check_call(args)
|
||||
write("started %s(%r)" % (app, worker_configfile), colour=GREEN)
|
||||
return True
|
||||
except subprocess.CalledProcessError as e:
|
||||
write(
|
||||
"error starting %s(%r) (exit code: %d); see above for logs"
|
||||
% (app, worker_configfile, e.returncode),
|
||||
colour=RED,
|
||||
)
|
||||
return False
|
||||
|
||||
|
||||
def stop(pidfile, app):
|
||||
@@ -292,11 +320,14 @@ def main():
|
||||
write("All processes exited; now restarting...")
|
||||
|
||||
if action == "start" or action == "restart":
|
||||
error = False
|
||||
if start_stop_synapse:
|
||||
# Check if synapse is already running
|
||||
if os.path.exists(pidfile) and pid_running(int(open(pidfile).read())):
|
||||
abort("synapse.app.homeserver already running")
|
||||
start(configfile, bool(options.daemonize))
|
||||
|
||||
if not start(configfile, bool(options.daemonize)):
|
||||
error = True
|
||||
|
||||
for worker in workers:
|
||||
env = os.environ.copy()
|
||||
@@ -307,12 +338,16 @@ def main():
|
||||
for cache_name, factor in iteritems(worker.cache_factors):
|
||||
os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
|
||||
|
||||
start_worker(worker.app, configfile, worker.configfile)
|
||||
if not start_worker(worker.app, configfile, worker.configfile):
|
||||
error = True
|
||||
|
||||
# Reset env back to the original
|
||||
os.environ.clear()
|
||||
os.environ.update(env)
|
||||
|
||||
if error:
|
||||
exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user