From caa00044665f8dde72fb613ce51d72bc27b20880 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 27 Nov 2018 11:56:20 +0000 Subject: [PATCH] Track PDU in opentracing --- synapse/federation/transaction_queue.py | 109 ++++++++++++++++++++---- 1 file changed, 94 insertions(+), 15 deletions(-) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index fe02ace8a7..9e28bde32c 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -14,6 +14,9 @@ # limitations under the License. import datetime import logging +import random +import json +import opentracing from six import itervalues @@ -247,17 +250,21 @@ class TransactionQueue(object): self._is_processing = False @defer.inlineCallbacks - def _send_pdu(self, pdu, destinations): + def _send_pdu(self, pdu, destinations, span=None): # We loop through all destinations to see whether we already have # a transaction in progress. If we do, stick it in the pending_pdus # table and we'll get back to it later. - order = self._order - self._order += 1 + references = [] + if span: + references = [opentracing.follows_from(span.context)] - destinations = set(destinations) - destinations.discard(self.server_name) + with self.tracer.start_span('_send_pdu', references=references) as span: + span.set_tag("event_id", pdu.event_id) + span.set_tag("room_id", pdu.room_id) + span.set_tag("sender", pdu.sender) +<<<<<<< HEAD destinations = yield self._compute_relay_destinations( pdu, joined_hosts=destinations, ) @@ -283,6 +290,53 @@ class TransactionQueue(object): ) self._attempt_new_transaction(destination) +======= + order = self._order + self._order += 1 + + destinations = set(destinations) + destinations.discard(self.server_name) + + event_destinations = yield self._compute_relay_destinations( + pdu, joined_hosts=destinations, + ) + + for pdu, destinations in event_destinations: + logger.info("Sending to: %s", str(destinations)) + + pdu_logger.info( + "RelayingPDU", + extra={ + "event_id": pdu.event_id, "room_id": pdu.room_id, + "destinations": json.dumps(destinations), + "server": self.server_name, + }, + ) + + if not destinations: + break + + sent_pdus_destination_dist_total.inc(len(destinations)) + sent_pdus_destination_dist_count.inc() + + # XXX: Should we decide where to route here. + + for destination in destinations: + dest_span = self.tracer.start_span( + '_send_pdu_to_destination', + references=[opentracing.follows_from(span.context)], + ) + dest_span.log_kv({ + "via": destination, + "relay_to": list(pdu.unsigned.get("destinations", {})) + }) + + self.pending_pdus_by_dest.setdefault(destination, []).append( + (pdu, order, dest_span) + ) + + self._attempt_new_transaction(destination) +>>>>>>> 96acdad12... Track PDU in opentracing def _compute_relay_destinations(self, pdu, joined_hosts): """Compute where we should send an event. Returning an empty set stops @@ -453,6 +507,7 @@ class TransactionQueue(object): @defer.inlineCallbacks def _transaction_transmission_loop(self, destination): + pdu_spans = {} pending_pdus = [] try: self.pending_transactions[destination] = 1 @@ -526,13 +581,18 @@ class TransactionQueue(object): ) return + pdu_span_references = [] + for pdu, _, span in pending_pdus: + pdu_spans[pdu.event_id] = span + pdu_span_references.append(opentracing.follows_from(span.context)) + # END CRITICAL SECTION - with self.tracer.start_span('_send_new_transaction') as span: + with self.tracer.start_span('_send_new_transaction', references=pdu_span_references) as span: span.set_tag("destination", destination) success = yield self._send_new_transaction( - destination, pending_pdus, pending_edus, span, + destination, pending_pdus, pending_edus, span, pdu_spans, ) span.set_tag("success", success) if success: @@ -575,6 +635,8 @@ class TransactionQueue(object): finally: # We want to be *very* sure we delete this after we stop processing self.pending_transactions.pop(destination, None) + for span in pdu_spans.values(): + span.finish() @defer.inlineCallbacks def _get_new_device_messages(self, destination): @@ -610,7 +672,7 @@ class TransactionQueue(object): @measure_func("_send_new_transaction") @defer.inlineCallbacks - def _send_new_transaction(self, destination, pending_pdus, pending_edus, span): + def _send_new_transaction(self, destination, pending_pdus, pending_edus, span, pdu_spans): # Sort based on the order field pending_pdus.sort(key=lambda t: t[1]) @@ -714,22 +776,31 @@ class TransactionQueue(object): for p in pdus: yield self._pdu_send_result( destination, txn_id, p, - response=pdu_results.get(p.event_id, {}) + response=pdu_results.get(p.event_id, {}), + span=pdu_spans[p.event_id], ) else: for p in pdus: - yield self._pdu_send_txn_failed(destination, txn_id, p) + yield self._pdu_send_txn_failed( + destination, txn_id, p, + span=pdu_spans[p.event_id], + ) success = False defer.returnValue(success) @defer.inlineCallbacks - def _pdu_send_result(self, destination, txn_id, pdu, response): + def _pdu_send_result(self, destination, txn_id, pdu, response, span): """Gets called after sending the event in a transaction, with the result for the event from the remote server. """ # XXX: Hook for routing shenanigans if "error" in response: + span.log_kv({ + "error.kind": "pdu", + "response.error": response["error"], + }) + logger.warn( "TX [%s] {%s} Remote returned error for %s: %s", destination, txn_id, pdu.event_id, response, @@ -745,7 +816,7 @@ class TransactionQueue(object): new_destinations = set(pdu.unsigned.get("destinations", [])) new_destinations.discard(destination) - yield self._send_pdu(pdu, list(new_destinations)) + yield self._send_pdu(pdu, list(new_destinations), span) elif "did_not_relay" in response and response["did_not_relay"]: new_destinations = set(response["did_not_relay"]) new_destinations.discard(destination) @@ -760,10 +831,14 @@ class TransactionQueue(object): }, ) - yield self._send_pdu(pdu, list(new_destinations)) + span.log_kv({ + "did_not_relay_to": list(new_destinations), + }) + + yield self._send_pdu(pdu, list(new_destinations), span) @defer.inlineCallbacks - def _pdu_send_txn_failed(self, destination, txn_id, pdu): + def _pdu_send_txn_failed(self, destination, txn_id, pdu, span): """Gets called when sending a transaction failed (after retries) """ # XXX: Hook for routing shenanigans @@ -772,6 +847,10 @@ class TransactionQueue(object): destination, txn_id, pdu.event_id, ) + span.log_kv({ + "error.kind": "transaction", + }) + pdu_logger.info( "SendFailPDU", extra={ @@ -783,4 +862,4 @@ class TransactionQueue(object): new_destinations = set(pdu.unsigned.get("destinations", [])) new_destinations.discard(destination) - yield self._send_pdu(pdu, list(new_destinations)) + yield self._send_pdu(pdu, list(new_destinations), span)