diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index c021add936..a4e760f935 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -14,6 +14,13 @@ # limitations under the License. import datetime import logging +<<<<<<< HEAD +======= +import random +import json +import opentracing +import string +>>>>>>> 46aae2456... Reduce size of fed transaction IDs from six import itervalues @@ -128,7 +135,7 @@ class TransactionQueue(object): self.last_device_list_stream_id_by_dest = {} # HACK to get unique tx id - self._next_txn_id = int(self.clock.time_msec()) + self._next_txn_id = 1 self._order = 1 @@ -423,6 +430,15 @@ class TransactionQueue(object): pending_pdus = [] while True: +<<<<<<< HEAD +======= + txn_id = _encode_id(self._next_txn_id) + self._next_txn_id += 1 + + for s in pdu_spans.values(): + s.set_tag("txn-id", txn_id) + +>>>>>>> 46aae2456... Reduce size of fed transaction IDs device_message_edus, device_stream_id, dev_list_id = ( yield self._get_new_device_messages(destination) ) @@ -670,3 +686,19 @@ class TransactionQueue(object): success = False defer.returnValue(success) + + +def _numberToBase(n, b): + if n == 0: + return [0] + digits = [] + while n: + digits.append(int(n % b)) + n //= b + return digits[::-1] + + +def _encode_id(i): + digits = string.digits + string.ascii_letters + val_slice = _numberToBase(i, len(digits)) + return "".join(digits[x] for x in val_slice) diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py index 91025037a3..df528113a4 100644 --- a/synapse/http/endpoint.py +++ b/synapse/http/endpoint.py @@ -194,8 +194,11 @@ class _WrappedConnection(object): # In Twisted >18.4; the TLS connection will be None if it has closed # which will make abortConnection() throw. Check that the TLS connection # is not None before trying to close it. - if self.transport.getHandle() is not None: - self.transport.abortConnection() + try: + if self.transport.getHandle() is not None: + self.transport.abortConnection() + except: + logger.warning("Failed to abort connection") def request(self, request): self.last_request = time.time()