Route full mesh if message contains 'mesh'
This commit is contained in:
committed by
Brendan Abolivier
parent
5f52a2c25e
commit
62fa8570ec
@@ -25,6 +25,7 @@ from prometheus_client import Counter
|
||||
from twisted.internet import defer
|
||||
|
||||
import synapse.metrics
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.api.errors import FederationDeniedError, HttpResponseException
|
||||
from synapse.events import FrozenEvent
|
||||
from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
|
||||
@@ -278,7 +279,6 @@ class TransactionQueue(object):
|
||||
span.set_tag("room_id", pdu.room_id)
|
||||
span.set_tag("sender", pdu.sender)
|
||||
|
||||
<<<<<<< HEAD
|
||||
destinations = yield self._compute_relay_destinations(
|
||||
pdu, joined_hosts=destinations,
|
||||
)
|
||||
@@ -304,53 +304,6 @@ class TransactionQueue(object):
|
||||
)
|
||||
|
||||
self._attempt_new_transaction(destination)
|
||||
=======
|
||||
order = self._order
|
||||
self._order += 1
|
||||
|
||||
destinations = set(destinations)
|
||||
destinations.discard(self.server_name)
|
||||
|
||||
event_destinations = yield self._compute_relay_destinations(
|
||||
pdu, joined_hosts=destinations,
|
||||
)
|
||||
|
||||
for pdu, destinations in event_destinations:
|
||||
logger.info("Sending to: %s", str(destinations))
|
||||
|
||||
pdu_logger.info(
|
||||
"RelayingPDU",
|
||||
extra={
|
||||
"event_id": pdu.event_id, "room_id": pdu.room_id,
|
||||
"destinations": json.dumps(destinations),
|
||||
"server": self.server_name,
|
||||
},
|
||||
)
|
||||
|
||||
if not destinations:
|
||||
break
|
||||
|
||||
sent_pdus_destination_dist_total.inc(len(destinations))
|
||||
sent_pdus_destination_dist_count.inc()
|
||||
|
||||
# XXX: Should we decide where to route here.
|
||||
|
||||
for destination in destinations:
|
||||
dest_span = self.tracer.start_span(
|
||||
'_send_pdu_to_destination',
|
||||
references=[opentracing.follows_from(span.context)],
|
||||
)
|
||||
dest_span.log_kv({
|
||||
"via": destination,
|
||||
"relay_to": list(pdu.unsigned.get("destinations", {}))
|
||||
})
|
||||
|
||||
self.pending_pdus_by_dest.setdefault(destination, []).append(
|
||||
(pdu, order, dest_span)
|
||||
)
|
||||
|
||||
self._attempt_new_transaction(destination)
|
||||
>>>>>>> 96acdad12... Track PDU in opentracing
|
||||
|
||||
def _compute_relay_destinations(self, pdu, joined_hosts):
|
||||
"""Compute where we should send an event. Returning an empty set stops
|
||||
@@ -372,12 +325,6 @@ class TransactionQueue(object):
|
||||
|
||||
# XXX: Hook for routing shenanigans
|
||||
|
||||
<<<<<<< HEAD
|
||||
=======
|
||||
if from_federation and event.unsigned.get("destinations"):
|
||||
return True
|
||||
|
||||
>>>>>>> efdec3252... Only relay 'live' events
|
||||
send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
|
||||
is_mine = self.is_mine_id(event.event_id)
|
||||
if not is_mine and send_on_behalf_of is None:
|
||||
|
||||
Reference in New Issue
Block a user