Track PDU in opentracing
This commit is contained in:
committed by
Brendan Abolivier
parent
ae7460b9f4
commit
caa0004466
@@ -14,6 +14,9 @@
|
||||
# limitations under the License.
|
||||
import datetime
|
||||
import logging
|
||||
import random
|
||||
import json
|
||||
import opentracing
|
||||
|
||||
from six import itervalues
|
||||
|
||||
@@ -247,17 +250,21 @@ class TransactionQueue(object):
|
||||
self._is_processing = False
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _send_pdu(self, pdu, destinations):
|
||||
def _send_pdu(self, pdu, destinations, span=None):
|
||||
# We loop through all destinations to see whether we already have
|
||||
# a transaction in progress. If we do, stick it in the pending_pdus
|
||||
# table and we'll get back to it later.
|
||||
|
||||
order = self._order
|
||||
self._order += 1
|
||||
references = []
|
||||
if span:
|
||||
references = [opentracing.follows_from(span.context)]
|
||||
|
||||
destinations = set(destinations)
|
||||
destinations.discard(self.server_name)
|
||||
with self.tracer.start_span('_send_pdu', references=references) as span:
|
||||
span.set_tag("event_id", pdu.event_id)
|
||||
span.set_tag("room_id", pdu.room_id)
|
||||
span.set_tag("sender", pdu.sender)
|
||||
|
||||
<<<<<<< HEAD
|
||||
destinations = yield self._compute_relay_destinations(
|
||||
pdu, joined_hosts=destinations,
|
||||
)
|
||||
@@ -283,6 +290,53 @@ class TransactionQueue(object):
|
||||
)
|
||||
|
||||
self._attempt_new_transaction(destination)
|
||||
=======
|
||||
order = self._order
|
||||
self._order += 1
|
||||
|
||||
destinations = set(destinations)
|
||||
destinations.discard(self.server_name)
|
||||
|
||||
event_destinations = yield self._compute_relay_destinations(
|
||||
pdu, joined_hosts=destinations,
|
||||
)
|
||||
|
||||
for pdu, destinations in event_destinations:
|
||||
logger.info("Sending to: %s", str(destinations))
|
||||
|
||||
pdu_logger.info(
|
||||
"RelayingPDU",
|
||||
extra={
|
||||
"event_id": pdu.event_id, "room_id": pdu.room_id,
|
||||
"destinations": json.dumps(destinations),
|
||||
"server": self.server_name,
|
||||
},
|
||||
)
|
||||
|
||||
if not destinations:
|
||||
break
|
||||
|
||||
sent_pdus_destination_dist_total.inc(len(destinations))
|
||||
sent_pdus_destination_dist_count.inc()
|
||||
|
||||
# XXX: Should we decide where to route here.
|
||||
|
||||
for destination in destinations:
|
||||
dest_span = self.tracer.start_span(
|
||||
'_send_pdu_to_destination',
|
||||
references=[opentracing.follows_from(span.context)],
|
||||
)
|
||||
dest_span.log_kv({
|
||||
"via": destination,
|
||||
"relay_to": list(pdu.unsigned.get("destinations", {}))
|
||||
})
|
||||
|
||||
self.pending_pdus_by_dest.setdefault(destination, []).append(
|
||||
(pdu, order, dest_span)
|
||||
)
|
||||
|
||||
self._attempt_new_transaction(destination)
|
||||
>>>>>>> 96acdad12... Track PDU in opentracing
|
||||
|
||||
def _compute_relay_destinations(self, pdu, joined_hosts):
|
||||
"""Compute where we should send an event. Returning an empty set stops
|
||||
@@ -453,6 +507,7 @@ class TransactionQueue(object):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _transaction_transmission_loop(self, destination):
|
||||
pdu_spans = {}
|
||||
pending_pdus = []
|
||||
try:
|
||||
self.pending_transactions[destination] = 1
|
||||
@@ -526,13 +581,18 @@ class TransactionQueue(object):
|
||||
)
|
||||
return
|
||||
|
||||
pdu_span_references = []
|
||||
for pdu, _, span in pending_pdus:
|
||||
pdu_spans[pdu.event_id] = span
|
||||
pdu_span_references.append(opentracing.follows_from(span.context))
|
||||
|
||||
# END CRITICAL SECTION
|
||||
|
||||
with self.tracer.start_span('_send_new_transaction') as span:
|
||||
with self.tracer.start_span('_send_new_transaction', references=pdu_span_references) as span:
|
||||
span.set_tag("destination", destination)
|
||||
|
||||
success = yield self._send_new_transaction(
|
||||
destination, pending_pdus, pending_edus, span,
|
||||
destination, pending_pdus, pending_edus, span, pdu_spans,
|
||||
)
|
||||
span.set_tag("success", success)
|
||||
if success:
|
||||
@@ -575,6 +635,8 @@ class TransactionQueue(object):
|
||||
finally:
|
||||
# We want to be *very* sure we delete this after we stop processing
|
||||
self.pending_transactions.pop(destination, None)
|
||||
for span in pdu_spans.values():
|
||||
span.finish()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_new_device_messages(self, destination):
|
||||
@@ -610,7 +672,7 @@ class TransactionQueue(object):
|
||||
|
||||
@measure_func("_send_new_transaction")
|
||||
@defer.inlineCallbacks
|
||||
def _send_new_transaction(self, destination, pending_pdus, pending_edus, span):
|
||||
def _send_new_transaction(self, destination, pending_pdus, pending_edus, span, pdu_spans):
|
||||
|
||||
# Sort based on the order field
|
||||
pending_pdus.sort(key=lambda t: t[1])
|
||||
@@ -714,22 +776,31 @@ class TransactionQueue(object):
|
||||
for p in pdus:
|
||||
yield self._pdu_send_result(
|
||||
destination, txn_id, p,
|
||||
response=pdu_results.get(p.event_id, {})
|
||||
response=pdu_results.get(p.event_id, {}),
|
||||
span=pdu_spans[p.event_id],
|
||||
)
|
||||
else:
|
||||
for p in pdus:
|
||||
yield self._pdu_send_txn_failed(destination, txn_id, p)
|
||||
yield self._pdu_send_txn_failed(
|
||||
destination, txn_id, p,
|
||||
span=pdu_spans[p.event_id],
|
||||
)
|
||||
success = False
|
||||
|
||||
defer.returnValue(success)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _pdu_send_result(self, destination, txn_id, pdu, response):
|
||||
def _pdu_send_result(self, destination, txn_id, pdu, response, span):
|
||||
"""Gets called after sending the event in a transaction, with the
|
||||
result for the event from the remote server.
|
||||
"""
|
||||
# XXX: Hook for routing shenanigans
|
||||
if "error" in response:
|
||||
span.log_kv({
|
||||
"error.kind": "pdu",
|
||||
"response.error": response["error"],
|
||||
})
|
||||
|
||||
logger.warn(
|
||||
"TX [%s] {%s} Remote returned error for %s: %s",
|
||||
destination, txn_id, pdu.event_id, response,
|
||||
@@ -745,7 +816,7 @@ class TransactionQueue(object):
|
||||
|
||||
new_destinations = set(pdu.unsigned.get("destinations", []))
|
||||
new_destinations.discard(destination)
|
||||
yield self._send_pdu(pdu, list(new_destinations))
|
||||
yield self._send_pdu(pdu, list(new_destinations), span)
|
||||
elif "did_not_relay" in response and response["did_not_relay"]:
|
||||
new_destinations = set(response["did_not_relay"])
|
||||
new_destinations.discard(destination)
|
||||
@@ -760,10 +831,14 @@ class TransactionQueue(object):
|
||||
},
|
||||
)
|
||||
|
||||
yield self._send_pdu(pdu, list(new_destinations))
|
||||
span.log_kv({
|
||||
"did_not_relay_to": list(new_destinations),
|
||||
})
|
||||
|
||||
yield self._send_pdu(pdu, list(new_destinations), span)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _pdu_send_txn_failed(self, destination, txn_id, pdu):
|
||||
def _pdu_send_txn_failed(self, destination, txn_id, pdu, span):
|
||||
"""Gets called when sending a transaction failed (after retries)
|
||||
"""
|
||||
# XXX: Hook for routing shenanigans
|
||||
@@ -772,6 +847,10 @@ class TransactionQueue(object):
|
||||
destination, txn_id, pdu.event_id,
|
||||
)
|
||||
|
||||
span.log_kv({
|
||||
"error.kind": "transaction",
|
||||
})
|
||||
|
||||
pdu_logger.info(
|
||||
"SendFailPDU",
|
||||
extra={
|
||||
@@ -783,4 +862,4 @@ class TransactionQueue(object):
|
||||
|
||||
new_destinations = set(pdu.unsigned.get("destinations", []))
|
||||
new_destinations.discard(destination)
|
||||
yield self._send_pdu(pdu, list(new_destinations))
|
||||
yield self._send_pdu(pdu, list(new_destinations), span)
|
||||
|
||||
Reference in New Issue
Block a user