Remove the new backfill implementation and pull some good parts of the refactor
This commit is contained in:
@@ -1043,217 +1043,6 @@ class FederationHandler:
|
||||
else:
|
||||
return []
|
||||
|
||||
async def get_backfill_events(
|
||||
self, origin: str, room_id: str, event_id_list: list, limit: int
|
||||
) -> List[EventBase]:
|
||||
logger.info(
|
||||
"get_backfill_events(room_id=%s): seeding backfill with event_id_list=%s limit=%s origin=%s",
|
||||
room_id,
|
||||
event_id_list,
|
||||
limit,
|
||||
origin,
|
||||
)
|
||||
|
||||
event_id_results = set()
|
||||
|
||||
# In a PriorityQueue, the lowest valued entries are retrieved first.
|
||||
# We're using depth as the priority in the queue and tie-break based on
|
||||
# stream_ordering. Depth is lowest at the oldest-in-time message and
|
||||
# highest and newest-in-time message. We add events to the queue with a
|
||||
# negative depth so that we process the newest-in-time messages first
|
||||
# going backwards in time. stream_ordering follows the same pattern.
|
||||
queue = PriorityQueue()
|
||||
seed_events = await self.store.get_events_as_list(event_id_list)
|
||||
logger.info(
|
||||
"get_backfill_events(room_id=%s): seed_events=%s",
|
||||
room_id,
|
||||
[
|
||||
BackfillQueueNavigationItem(
|
||||
depth=seed_event.depth,
|
||||
stream_ordering=seed_event.internal_metadata.stream_ordering,
|
||||
event_id=seed_event.event_id,
|
||||
type=seed_event.type,
|
||||
)
|
||||
for seed_event in seed_events
|
||||
],
|
||||
)
|
||||
for seed_event in seed_events:
|
||||
# Make sure the seed event actually pertains to this room. We also
|
||||
# need to make sure the depth is available since our whole DAG
|
||||
# navigation here depends on depth.
|
||||
if seed_event.room_id == room_id and seed_event.depth:
|
||||
queue.put(
|
||||
(
|
||||
-seed_event.depth,
|
||||
-seed_event.internal_metadata.stream_ordering,
|
||||
seed_event.event_id,
|
||||
seed_event.type,
|
||||
)
|
||||
)
|
||||
|
||||
while not queue.empty() and len(event_id_results) < limit:
|
||||
try:
|
||||
_, _, event_id, event_type = queue.get_nowait()
|
||||
except Empty:
|
||||
break
|
||||
|
||||
if event_id in event_id_results:
|
||||
continue
|
||||
|
||||
found_undiscovered_connected_historical_messages = False
|
||||
if self.hs.config.experimental.msc2716_enabled:
|
||||
# Try and find any potential historical batches of message history.
|
||||
#
|
||||
# First we look for an insertion event connected to the current
|
||||
# event (by prev_event). If we find any, we'll add them to the queue
|
||||
# and navigate up the DAG like normal in the next iteration of the
|
||||
# loop.
|
||||
connected_insertion_event_backfill_results = (
|
||||
await self.store.get_connected_insertion_event_backfill_results(
|
||||
event_id, limit - len(event_id_results)
|
||||
)
|
||||
)
|
||||
logger.info(
|
||||
"get_backfill_events(room_id=%s): connected_insertion_event_backfill_results(%s)=%s",
|
||||
room_id,
|
||||
event_id,
|
||||
connected_insertion_event_backfill_results,
|
||||
)
|
||||
for (
|
||||
connected_insertion_event_backfill_item
|
||||
) in connected_insertion_event_backfill_results:
|
||||
if (
|
||||
connected_insertion_event_backfill_item.event_id
|
||||
not in event_id_results
|
||||
):
|
||||
# Check whether the insertion event is already on the
|
||||
# federating homeserver we're trying to send backfill
|
||||
# events to
|
||||
room_version = await self.store.get_room_version(room_id)
|
||||
event_exists_on_remote_server = None
|
||||
try:
|
||||
# Because of the nature of backfill giving events to
|
||||
# the federated homeserver in one chunk and then we
|
||||
# can possibly query about that same event in the
|
||||
# next chunk, we need to avoid getting a cached
|
||||
# response. We want to know *now* whether they have
|
||||
# backfilled the insertion event.
|
||||
event_exists_on_remote_server = await self.federation_client.get_pdu_from_destination_raw(
|
||||
origin,
|
||||
connected_insertion_event_backfill_item.event_id,
|
||||
room_version=room_version,
|
||||
outlier=True,
|
||||
timeout=10000,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.info(
|
||||
"get_backfill_events(room_id=%s): Failed to fetch insertion event_id=%s from origin=%s but we're just going to assume it's not backfilled there yet. error=%s",
|
||||
room_id,
|
||||
connected_insertion_event_backfill_item.event_id,
|
||||
origin,
|
||||
e,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"get_backfill_events(room_id=%s): checked if insertion event_id=%s exists on federated homeserver(origin=%s) already? event_exists_on_remote_server=%s",
|
||||
room_id,
|
||||
connected_insertion_event_backfill_item.event_id,
|
||||
origin,
|
||||
event_exists_on_remote_server,
|
||||
)
|
||||
|
||||
# If the event is already on the federated homeserver,
|
||||
# we don't need to try to branch off onto this
|
||||
# historical chain of messages. Below, we will instead
|
||||
# just go up the `prev_events` as normal.
|
||||
#
|
||||
# This is important so that the first time we backfill
|
||||
# the federated homeserver, we jump off and go down the
|
||||
# historical branch. But after the historical branch is
|
||||
# exhausted and the event comes up again in backfill, we
|
||||
# will choose the "live" DAG.
|
||||
if not event_exists_on_remote_server:
|
||||
found_undiscovered_connected_historical_messages = True
|
||||
queue.put(
|
||||
(
|
||||
-connected_insertion_event_backfill_item.depth,
|
||||
-connected_insertion_event_backfill_item.stream_ordering,
|
||||
connected_insertion_event_backfill_item.event_id,
|
||||
connected_insertion_event_backfill_item.type,
|
||||
)
|
||||
)
|
||||
|
||||
# Second, we need to go and try to find any batch events connected
|
||||
# to a given insertion event (by batch_id). If we find any, we'll
|
||||
# add them to the queue and navigate up the DAG like normal in the
|
||||
# next iteration of the loop.
|
||||
if event_type == EventTypes.MSC2716_INSERTION:
|
||||
connected_batch_event_backfill_results = (
|
||||
await self.store.get_connected_batch_event_backfill_results(
|
||||
event_id, limit - len(event_id_results)
|
||||
)
|
||||
)
|
||||
logger.info(
|
||||
"get_backfill_events(room_id=%s): connected_batch_event_backfill_results(%s)=%s",
|
||||
room_id,
|
||||
event_id,
|
||||
connected_batch_event_backfill_results,
|
||||
)
|
||||
for (
|
||||
connected_batch_event_backfill_item
|
||||
) in connected_batch_event_backfill_results:
|
||||
if (
|
||||
connected_batch_event_backfill_item.event_id
|
||||
not in event_id_results
|
||||
):
|
||||
queue.put(
|
||||
(
|
||||
-connected_batch_event_backfill_item.depth,
|
||||
-connected_batch_event_backfill_item.stream_ordering,
|
||||
connected_batch_event_backfill_item.event_id,
|
||||
connected_batch_event_backfill_item.type,
|
||||
)
|
||||
)
|
||||
|
||||
# If we found a historical branch of history off of the message lets
|
||||
# navigate down that in the next iteration of the loop instead of
|
||||
# the normal prev_event chain.
|
||||
if not found_undiscovered_connected_historical_messages:
|
||||
event_id_results.add(event_id)
|
||||
|
||||
# Now we just look up the DAG by prev_events as normal
|
||||
connected_prev_event_backfill_results = (
|
||||
await self.store.get_connected_prev_event_backfill_results(
|
||||
event_id, limit - len(event_id_results)
|
||||
)
|
||||
)
|
||||
logger.info(
|
||||
"get_backfill_events(room_id=%s): connected_prev_event_backfill_results(%s)=%s",
|
||||
room_id,
|
||||
event_id,
|
||||
connected_prev_event_backfill_results,
|
||||
)
|
||||
for (
|
||||
connected_prev_event_backfill_item
|
||||
) in connected_prev_event_backfill_results:
|
||||
if (
|
||||
connected_prev_event_backfill_item.event_id
|
||||
not in event_id_results
|
||||
):
|
||||
queue.put(
|
||||
(
|
||||
-connected_prev_event_backfill_item.depth,
|
||||
-connected_prev_event_backfill_item.stream_ordering,
|
||||
connected_prev_event_backfill_item.event_id,
|
||||
connected_prev_event_backfill_item.type,
|
||||
)
|
||||
)
|
||||
|
||||
events = await self.store.get_events_as_list(event_id_results)
|
||||
return sorted(
|
||||
events, key=lambda e: (-e.depth, -e.internal_metadata.stream_ordering)
|
||||
)
|
||||
|
||||
@log_function
|
||||
async def on_backfill_request(
|
||||
self, origin: str, room_id: str, pdu_list: List[str], limit: int
|
||||
@@ -1280,22 +1069,6 @@ class FederationHandler:
|
||||
],
|
||||
)
|
||||
|
||||
# events = await self.get_backfill_events(origin, room_id, pdu_list, limit)
|
||||
# logger.info(
|
||||
# "new implementation backfill events(%d)=%s",
|
||||
# len(events),
|
||||
# [
|
||||
# "event_id=%s,depth=%d,body=%s,prevs=%s\n"
|
||||
# % (
|
||||
# event.event_id,
|
||||
# event.depth,
|
||||
# event.content.get("body", event.type),
|
||||
# event.prev_event_ids(),
|
||||
# )
|
||||
# for event in events
|
||||
# ],
|
||||
# )
|
||||
|
||||
events = await filter_events_for_server(self.storage, origin, events)
|
||||
|
||||
return events
|
||||
|
||||
@@ -995,116 +995,70 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
||||
"get_forward_extremeties_for_room", get_forward_extremeties_for_room_txn
|
||||
)
|
||||
|
||||
async def get_connected_insertion_event_backfill_results(
|
||||
self, event_id: str, limit: int
|
||||
) -> List[BackfillQueueNavigationItem]:
|
||||
def _get_connected_insertion_event_backfill_results_txn(txn):
|
||||
# Look for the "insertion" events connected to the given event_id
|
||||
connected_insertion_event_query = """
|
||||
SELECT e.depth, e.stream_ordering, i.event_id, e.type FROM insertion_event_edges AS i
|
||||
/* Get the depth of the insertion event from the events table */
|
||||
INNER JOIN events AS e USING (event_id)
|
||||
/* Find an insertion event which points via prev_events to the given event_id */
|
||||
WHERE i.insertion_prev_event_id = ?
|
||||
LIMIT ?
|
||||
"""
|
||||
def _get_connected_batch_event_backfill_results_txn(
|
||||
self, txn: LoggingTransaction, insertion_event_id: str, limit: int
|
||||
):
|
||||
# Find any batch connections of a given insertion event
|
||||
batch_connection_query = """
|
||||
SELECT e.depth, e.stream_ordering, c.event_id, e.type FROM insertion_events AS i
|
||||
/* Find the batch that connects to the given insertion event */
|
||||
INNER JOIN batch_events AS c
|
||||
ON i.next_batch_id = c.batch_id
|
||||
/* Get the depth of the batch start event from the events table */
|
||||
INNER JOIN events AS e USING (event_id)
|
||||
/* Find an insertion event which matches the given event_id */
|
||||
WHERE i.event_id = ?
|
||||
LIMIT ?
|
||||
"""
|
||||
|
||||
txn.execute(
|
||||
connected_insertion_event_query,
|
||||
(event_id, limit),
|
||||
)
|
||||
connected_insertion_event_id_results = txn.fetchall()
|
||||
return [
|
||||
BackfillQueueNavigationItem(
|
||||
depth=row[0],
|
||||
stream_ordering=row[1],
|
||||
event_id=row[2],
|
||||
type=row[3],
|
||||
)
|
||||
for row in connected_insertion_event_id_results
|
||||
]
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_connected_insertion_event_backfill_results",
|
||||
_get_connected_insertion_event_backfill_results_txn,
|
||||
# Find any batch connections for the given insertion event
|
||||
txn.execute(
|
||||
batch_connection_query,
|
||||
(insertion_event_id, limit),
|
||||
)
|
||||
|
||||
async def get_connected_batch_event_backfill_results(
|
||||
self, insertion_event_id: str, limit: int
|
||||
) -> List[BackfillQueueNavigationItem]:
|
||||
def _get_connected_batch_event_backfill_results_txn(txn):
|
||||
# Find any batch connections of a given insertion event
|
||||
batch_connection_query = """
|
||||
SELECT e.depth, e.stream_ordering, c.event_id, e.type FROM insertion_events AS i
|
||||
/* Find the batch that connects to the given insertion event */
|
||||
INNER JOIN batch_events AS c
|
||||
ON i.next_batch_id = c.batch_id
|
||||
/* Get the depth of the batch start event from the events table */
|
||||
INNER JOIN events AS e USING (event_id)
|
||||
/* Find an insertion event which matches the given event_id */
|
||||
WHERE i.event_id = ?
|
||||
LIMIT ?
|
||||
"""
|
||||
|
||||
# Find any batch connections for the given insertion event
|
||||
txn.execute(
|
||||
batch_connection_query,
|
||||
(insertion_event_id, limit),
|
||||
batch_start_event_id_results = txn.fetchall()
|
||||
return [
|
||||
BackfillQueueNavigationItem(
|
||||
depth=row[0],
|
||||
stream_ordering=row[1],
|
||||
event_id=row[2],
|
||||
type=row[3],
|
||||
)
|
||||
batch_start_event_id_results = txn.fetchall()
|
||||
return [
|
||||
BackfillQueueNavigationItem(
|
||||
depth=row[0],
|
||||
stream_ordering=row[1],
|
||||
event_id=row[2],
|
||||
type=row[3],
|
||||
)
|
||||
for row in batch_start_event_id_results
|
||||
]
|
||||
for row in batch_start_event_id_results
|
||||
]
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_connected_batch_event_backfill_results",
|
||||
_get_connected_batch_event_backfill_results_txn,
|
||||
def _get_connected_prev_event_backfill_results_txn(
|
||||
self, txn: LoggingTransaction, event_id: str, limit: int
|
||||
):
|
||||
# Look for the prev_event_id connected to the given event_id
|
||||
connected_prev_event_query = """
|
||||
SELECT depth, stream_ordering, prev_event_id, events.type FROM event_edges
|
||||
/* Get the depth and stream_ordering of the prev_event_id from the events table */
|
||||
INNER JOIN events
|
||||
ON prev_event_id = events.event_id
|
||||
/* Look for an edge which matches the given event_id */
|
||||
WHERE event_edges.event_id = ?
|
||||
AND event_edges.is_state = ?
|
||||
/* Because we can have many events at the same depth,
|
||||
* we want to also tie-break and sort on stream_ordering */
|
||||
ORDER BY depth DESC, stream_ordering DESC
|
||||
LIMIT ?
|
||||
"""
|
||||
|
||||
txn.execute(
|
||||
connected_prev_event_query,
|
||||
(event_id, False, limit),
|
||||
)
|
||||
|
||||
async def get_connected_prev_event_backfill_results(
|
||||
self, event_id: str, limit: int
|
||||
) -> List[BackfillQueueNavigationItem]:
|
||||
def _get_connected_prev_event_backfill_results_txn(txn):
|
||||
# Look for the prev_event_id connected to the given event_id
|
||||
connected_prev_event_query = """
|
||||
SELECT depth, stream_ordering, prev_event_id, events.type FROM event_edges
|
||||
/* Get the depth and stream_ordering of the prev_event_id from the events table */
|
||||
INNER JOIN events
|
||||
ON prev_event_id = events.event_id
|
||||
/* Look for an edge which matches the given event_id */
|
||||
WHERE event_edges.event_id = ?
|
||||
AND event_edges.is_state = ?
|
||||
/* Because we can have many events at the same depth,
|
||||
* we want to also tie-break and sort on stream_ordering */
|
||||
ORDER BY depth DESC, stream_ordering DESC
|
||||
LIMIT ?
|
||||
"""
|
||||
|
||||
txn.execute(
|
||||
connected_prev_event_query,
|
||||
(event_id, False, limit),
|
||||
prev_event_id_results = txn.fetchall()
|
||||
return [
|
||||
BackfillQueueNavigationItem(
|
||||
depth=row[0],
|
||||
stream_ordering=row[1],
|
||||
event_id=row[2],
|
||||
type=row[3],
|
||||
)
|
||||
prev_event_id_results = txn.fetchall()
|
||||
return [
|
||||
BackfillQueueNavigationItem(
|
||||
depth=row[0],
|
||||
stream_ordering=row[1],
|
||||
event_id=row[2],
|
||||
type=row[3],
|
||||
)
|
||||
for row in prev_event_id_results
|
||||
]
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_connected_prev_event_backfill_results",
|
||||
_get_connected_prev_event_backfill_results_txn,
|
||||
)
|
||||
for row in prev_event_id_results
|
||||
]
|
||||
|
||||
async def get_backfill_events(
|
||||
self, room_id: str, seed_event_id_list: list, limit: int
|
||||
@@ -1130,6 +1084,11 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
||||
)
|
||||
|
||||
def _get_backfill_events(self, txn, room_id, seed_event_id_list, limit):
|
||||
"""
|
||||
We want to make sure that we do a breadth-first, "depth" ordered search.
|
||||
We also handle navigating historical branches of history connected by
|
||||
insertion and batch events.
|
||||
"""
|
||||
logger.info(
|
||||
"_get_backfill_events(room_id=%s): seeding backfill with seed_event_id_list=%s limit=%s",
|
||||
room_id,
|
||||
@@ -1139,47 +1098,6 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
||||
|
||||
event_id_results = set()
|
||||
|
||||
# We want to make sure that we do a breadth-first, "depth" ordered
|
||||
# search.
|
||||
|
||||
# Look for the prev_event_id connected to the given event_id
|
||||
connected_prev_event_query = """
|
||||
SELECT depth, stream_ordering, prev_event_id, events.type FROM event_edges
|
||||
/* Get the depth and stream_ordering of the prev_event_id from the events table */
|
||||
INNER JOIN events
|
||||
ON prev_event_id = events.event_id
|
||||
/* Look for an edge which matches the given event_id */
|
||||
WHERE event_edges.event_id = ?
|
||||
AND event_edges.is_state = ?
|
||||
/* Because we can have many events at the same depth,
|
||||
* we want to also tie-break and sort on stream_ordering */
|
||||
ORDER BY depth DESC, stream_ordering DESC
|
||||
LIMIT ?
|
||||
"""
|
||||
|
||||
# Look for the "insertion" events connected to the given event_id
|
||||
connected_insertion_event_query = """
|
||||
SELECT e.depth, e.stream_ordering, i.event_id, e.type FROM insertion_event_edges AS i
|
||||
/* Get the depth of the insertion event from the events table */
|
||||
INNER JOIN events AS e USING (event_id)
|
||||
/* Find an insertion event which points via prev_events to the given event_id */
|
||||
WHERE i.insertion_prev_event_id = ?
|
||||
LIMIT ?
|
||||
"""
|
||||
|
||||
# Find any batch connections of a given insertion event
|
||||
batch_connection_query = """
|
||||
SELECT e.depth, e.stream_ordering, c.event_id, e.type FROM insertion_events AS i
|
||||
/* Find the batch that connects to the given insertion event */
|
||||
INNER JOIN batch_events AS c
|
||||
ON i.next_batch_id = c.batch_id
|
||||
/* Get the depth of the batch start event from the events table */
|
||||
INNER JOIN events AS e USING (event_id)
|
||||
/* Find an insertion event which matches the given event_id */
|
||||
WHERE i.event_id = ?
|
||||
LIMIT ?
|
||||
"""
|
||||
|
||||
# In a PriorityQueue, the lowest valued entries are retrieved first.
|
||||
# We're using depth as the priority in the queue and tie-break based on
|
||||
# stream_ordering. Depth is lowest at the oldest-in-time message and
|
||||
@@ -1233,70 +1151,61 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
||||
|
||||
# Try and find any potential historical batches of message history.
|
||||
if self.hs.config.experimental.msc2716_enabled:
|
||||
# # First we look for an insertion event connected to the current
|
||||
# # event (by prev_event). If we find any, we'll add them to the queue
|
||||
# # and navigate up the DAG like normal in the next iteration of the
|
||||
# # loop.
|
||||
# txn.execute(
|
||||
# connected_insertion_event_query,
|
||||
# (event_id, limit - len(event_id_results)),
|
||||
# )
|
||||
# connected_insertion_event_id_results = txn.fetchall()
|
||||
# logger.debug(
|
||||
# "_get_backfill_events(room_id=%s): connected_insertion_event_query %s",
|
||||
# room_id,
|
||||
# connected_insertion_event_id_results,
|
||||
# )
|
||||
# for row in connected_insertion_event_id_results:
|
||||
# connected_insertion_event_depth = row[0]
|
||||
# connected_insertion_event_stream_ordering = row[1]
|
||||
# connected_insertion_event_id = row[2]
|
||||
# connected_insertion_event_type = row[3]
|
||||
# if connected_insertion_event_id not in event_id_results:
|
||||
# queue.put(
|
||||
# (
|
||||
# -connected_insertion_event_depth,
|
||||
# -connected_insertion_event_stream_ordering,
|
||||
# connected_insertion_event_id,
|
||||
# connected_insertion_event_type,
|
||||
# )
|
||||
# )
|
||||
|
||||
# Second, we need to go and try to find any batch events connected
|
||||
# We need to go and try to find any batch events connected
|
||||
# to a given insertion event (by batch_id). If we find any, we'll
|
||||
# add them to the queue and navigate up the DAG like normal in the
|
||||
# next iteration of the loop.
|
||||
if event_type == EventTypes.MSC2716_INSERTION:
|
||||
# Find any batch connections for the given insertion event
|
||||
txn.execute(
|
||||
batch_connection_query,
|
||||
(event_id, limit - len(event_id_results)),
|
||||
connected_batch_event_backfill_results = (
|
||||
self._get_connected_batch_event_backfill_results_txn(
|
||||
txn, event_id, limit - len(event_id_results)
|
||||
)
|
||||
)
|
||||
batch_start_event_id_results = txn.fetchall()
|
||||
logger.debug(
|
||||
"_get_backfill_events(room_id=%s): batch_start_event_id_results %s",
|
||||
"_get_backfill_events(room_id=%s): connected_batch_event_backfill_results=%s",
|
||||
room_id,
|
||||
batch_start_event_id_results,
|
||||
connected_batch_event_backfill_results,
|
||||
)
|
||||
for row in batch_start_event_id_results:
|
||||
if row[2] not in event_id_results:
|
||||
queue.put((-row[0], -row[1], row[2], row[3]))
|
||||
for (
|
||||
connected_batch_event_backfill_item
|
||||
) in connected_batch_event_backfill_results:
|
||||
if (
|
||||
connected_batch_event_backfill_item.event_id
|
||||
not in event_id_results
|
||||
):
|
||||
queue.put(
|
||||
(
|
||||
-connected_batch_event_backfill_item.depth,
|
||||
-connected_batch_event_backfill_item.stream_ordering,
|
||||
connected_batch_event_backfill_item.event_id,
|
||||
connected_batch_event_backfill_item.type,
|
||||
)
|
||||
)
|
||||
|
||||
# Now we just look up the DAG by prev_events as normal
|
||||
txn.execute(
|
||||
connected_prev_event_query,
|
||||
(event_id, False, limit - len(event_id_results)),
|
||||
connected_prev_event_backfill_results = (
|
||||
self._get_connected_prev_event_backfill_results_txn(
|
||||
txn, event_id, limit - len(event_id_results)
|
||||
)
|
||||
)
|
||||
prev_event_id_results = txn.fetchall()
|
||||
logger.debug(
|
||||
"_get_backfill_events(room_id=%s): prev_event_ids %s",
|
||||
"_get_backfill_events(room_id=%s): connected_prev_event_backfill_results=%s",
|
||||
room_id,
|
||||
prev_event_id_results,
|
||||
connected_prev_event_backfill_results,
|
||||
)
|
||||
|
||||
for row in prev_event_id_results:
|
||||
if row[2] not in event_id_results:
|
||||
queue.put((-row[0], -row[1], row[2], row[3]))
|
||||
for (
|
||||
connected_prev_event_backfill_item
|
||||
) in connected_prev_event_backfill_results:
|
||||
if connected_prev_event_backfill_item.event_id not in event_id_results:
|
||||
queue.put(
|
||||
(
|
||||
-connected_prev_event_backfill_item.depth,
|
||||
-connected_prev_event_backfill_item.stream_ordering,
|
||||
connected_prev_event_backfill_item.event_id,
|
||||
connected_prev_event_backfill_item.type,
|
||||
)
|
||||
)
|
||||
|
||||
return event_id_results
|
||||
|
||||
|
||||
Reference in New Issue
Block a user