Some backfill receive sorting fixes but not using it yet
This commit is contained in:
@@ -702,41 +702,37 @@ class FederationEventHandler:
|
||||
event.event_id
|
||||
)
|
||||
|
||||
# # Maybe we can get lucky and save ourselves a lookup
|
||||
# # by checking the events in the backfill first
|
||||
# insertion_event = event_map[
|
||||
# insertion_event_id
|
||||
# ] or await self._store.get_event(
|
||||
# insertion_event_id, allow_none=True
|
||||
# )
|
||||
# Maybe we can get lucky and save ourselves a lookup
|
||||
# by checking the events in the backfill first
|
||||
insertion_event = event_map[
|
||||
insertion_event_id
|
||||
] or await self._store.get_event(
|
||||
insertion_event_id, allow_none=True
|
||||
)
|
||||
|
||||
# if insertion_event:
|
||||
# # Connect the insertion events' `prev_event` successors
|
||||
# # via fake edges pointing to the insertion event itself
|
||||
# # so the insertion event sorts topologically
|
||||
# # behind-in-time the successor. Nestled perfectly
|
||||
# # between the prev_event and the successor.
|
||||
# for insertion_prev_event_id in insertion_event.prev_event_ids():
|
||||
# successor_event_ids = successor_event_id_map[
|
||||
# insertion_prev_event_id
|
||||
# ]
|
||||
# logger.info(
|
||||
# "insertion_event_id=%s successor_event_ids=%s",
|
||||
# insertion_event_id,
|
||||
# successor_event_ids,
|
||||
# )
|
||||
# if successor_event_ids:
|
||||
# for successor_event_id in successor_event_ids:
|
||||
# # Don't add itself back as a successor
|
||||
# if successor_event_id != insertion_event_id:
|
||||
# # Fake edge to point the successor back
|
||||
# # at the insertion event
|
||||
# event_id_graph.setdefault(
|
||||
# successor_event_id, []
|
||||
# ).append(insertion_event_id)
|
||||
if insertion_event:
|
||||
# Connect the insertion events' `prev_event` successors
|
||||
# via fake edges pointing to the insertion event itself
|
||||
# so the insertion event sorts topologically
|
||||
# behind-in-time the successor. Nestled perfectly
|
||||
# between the prev_event and the successor.
|
||||
for insertion_prev_event_id in insertion_event.prev_event_ids():
|
||||
successor_event_ids = successor_event_id_map[
|
||||
insertion_prev_event_id
|
||||
]
|
||||
if successor_event_ids:
|
||||
for successor_event_id in successor_event_ids:
|
||||
# Don't add itself back as a successor
|
||||
if successor_event_id != insertion_event_id:
|
||||
# Fake edge to point the successor back
|
||||
# at the insertion event
|
||||
event_id_graph.setdefault(
|
||||
successor_event_id, []
|
||||
).append(insertion_event_id)
|
||||
|
||||
# TODO: We also need to add fake edges to connect the oldest-in-time messages
|
||||
# in the batch to the event we branched off of, see https://github.com/matrix-org/synapse/pull/11114#discussion_r739300985
|
||||
# TODO: We also need to add fake edges to connect insertion events -> to
|
||||
# the base event in the "live" DAG we branched off of, see scenario 2
|
||||
# https://github.com/matrix-org/synapse/pull/11114#discussion_r739300985
|
||||
|
||||
return event_id_graph
|
||||
|
||||
@@ -774,6 +770,9 @@ class FederationEventHandler:
|
||||
# We want to sort these by depth so we process them and
|
||||
# tell clients about them in order.
|
||||
sorted_events = sorted(events, key=lambda x: x.depth)
|
||||
for ev in sorted_events:
|
||||
with nested_logging_context(ev.event_id):
|
||||
await self._process_pulled_event(origin, ev, backfilled=backfilled)
|
||||
|
||||
# # We want to sort topologically so we process them and tell clients
|
||||
# # about them in order.
|
||||
@@ -783,25 +782,24 @@ class FederationEventHandler:
|
||||
# event_id_graph = await self.generateEventIdGraphFromEvents(events)
|
||||
# for event_id in sorted_topologically(event_ids, event_id_graph):
|
||||
# sorted_events.append(event_map[event_id])
|
||||
# sorted_events = reversed(sorted_events)
|
||||
|
||||
logger.info(
|
||||
"backfill sorted_events=%s",
|
||||
[
|
||||
"event_id=%s,depth=%d,body=%s,prevs=%s\n"
|
||||
% (
|
||||
event.event_id,
|
||||
event.depth,
|
||||
event.content.get("body", event.type),
|
||||
event.prev_event_ids(),
|
||||
)
|
||||
for event in sorted_events
|
||||
],
|
||||
)
|
||||
# logger.info(
|
||||
# "backfill sorted_events=%s",
|
||||
# [
|
||||
# "event_id=%s,depth=%d,body=%s,prevs=%s\n"
|
||||
# % (
|
||||
# event.event_id,
|
||||
# event.depth,
|
||||
# event.content.get("body", event.type),
|
||||
# event.prev_event_ids(),
|
||||
# )
|
||||
# for event in reversed(sorted_events)
|
||||
# ],
|
||||
# )
|
||||
|
||||
for ev in sorted_events:
|
||||
with nested_logging_context(ev.event_id):
|
||||
await self._process_pulled_event(origin, ev, backfilled=backfilled)
|
||||
# for ev in reversed(sorted_events):
|
||||
# with nested_logging_context(ev.event_id):
|
||||
# await self._process_pulled_event(origin, ev, backfilled=backfilled)
|
||||
|
||||
async def _process_pulled_event(
|
||||
self, origin: str, event: EventBase, backfilled: bool
|
||||
|
||||
Reference in New Issue
Block a user