Implement pagination using chunks
@@ -235,7 +235,7 @@ class MessageHandler(BaseHandler):
                 room_id, max_topo
             )
 
-        events, next_key = yield self.store.paginate_room_events(
+        events, next_key, extremities = yield self.store.paginate_room_events(
             room_id=room_id,
             from_key=source_config.from_key,
             to_key=source_config.to_key,
@@ -534,7 +534,7 @@ class RoomEventSource(object):
 
     @defer.inlineCallbacks
     def get_pagination_rows(self, user, config, key):
-        events, next_key = yield self.store.paginate_room_events(
+        events, next_key, _ = yield self.store.paginate_room_events(
             room_id=key,
             from_key=config.from_key,
             to_key=config.to_key,
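A rough sketch of how a caller might consume the new three-element return value. Only the shape of the `paginate_room_events` call comes from the hunks above; the free function, the elided keyword arguments, and the suggested use of `extremities` are illustrative assumptions.

from twisted.internet import defer


@defer.inlineCallbacks
def get_page(store, room_id, source_config):
    # Illustrative only: unpack the new (events, next_key, extremities)
    # return value of paginate_room_events. Keyword arguments other than
    # the three shown in the hunks above are elided.
    events, next_key, extremities = yield store.paginate_room_events(
        room_id=room_id,
        from_key=source_config.from_key,
        to_key=source_config.to_key,
    )

    # Assumption: a caller could use the backwards extremities of the
    # chunks that were walked to decide whether missing history needs to
    # be backfilled before the page is returned.
    defer.returnValue((events, next_key, extremities))

Callers that do not care about the extremities simply discard them, as the `_` in the RoomEventSource hunk above shows.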
@@ -41,6 +41,7 @@ from synapse.storage.events import EventsWorkerStore
 from synapse.types import RoomStreamToken
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.logcontext import make_deferred_yieldable, run_in_background
+from synapse.storage.chunk_ordered_table import ChunkDBOrderedListStore
 from synapse.storage.engines import PostgresEngine
 
 import abc
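The new import is used further down in this commit through a small surface: construction with `(txn, room_id, self.clock)` and `get_prev`/`get_next` calls. A minimal in-memory stand-in for that interface, purely for illustration; the real class in synapse/storage/chunk_ordered_table.py is database-backed.

class InMemoryChunkOrder(object):
    """In-memory stand-in for the get_prev/get_next surface used below.

    The real ChunkDBOrderedListStore is constructed with (txn, room_id,
    clock) and reads the ordering from the database; this sketch only
    mirrors the two calls the diff makes on it.
    """

    def __init__(self, ordered_chunk_ids):
        self._order = list(ordered_chunk_ids)

    def get_prev(self, chunk_id):
        # Chunk immediately before `chunk_id`, or None at the start.
        idx = self._order.index(chunk_id)
        return self._order[idx - 1] if idx > 0 else None

    def get_next(self, chunk_id):
        # Chunk immediately after `chunk_id`, or None at the end.
        idx = self._order.index(chunk_id)
        return self._order[idx + 1] if idx + 1 < len(self._order) else None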
@@ -394,7 +395,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         end_token = RoomStreamToken.parse(end_token)
 
-        rows, token = yield self.runInteraction(
+        rows, token, _ = yield self.runInteraction(
             "get_recent_event_ids_for_room", self._paginate_room_events_txn,
             room_id, from_token=end_token, limit=limit,
         )
@@ -632,12 +633,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             results["stream_ordering"],
         )
 
-        rows, start_token = self._paginate_room_events_txn(
+        rows, start_token, _ = self._paginate_room_events_txn(
             txn, room_id, before_token, direction='b', limit=before_limit,
         )
         events_before = [r.event_id for r in rows]
 
-        rows, end_token = self._paginate_room_events_txn(
+        rows, end_token, _ = self._paginate_room_events_txn(
             txn, room_id, after_token, direction='f', limit=after_limit,
         )
         events_after = [r.event_id for r in rows]
@@ -720,12 +721,19 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                 those that match the filter.
 
         Returns:
-            Deferred[tuple[list[_EventDictReturn], str]]: Returns the results
-            as a list of _EventDictReturn and a token that points to the end
-            of the result set.
+            Deferred[tuple[list[_EventDictReturn], str, list[int]]]: Returns
+            the results as a list of _EventDictReturn, a token that points to
+            the end of the result set, and a list of chunks iterated over.
         """
 
-        assert int(limit) >= 0
+        limit = int(limit)  # Sometimes we are passed a string from somewhere
+        assert limit >= 0
+
+        # There are two modes of fetching events: by stream order or by
+        # topological order. This is determined by whether the from_token is a
+        # stream or topological token. If stream then we can simply do a select
+        # ordered by stream_ordering column. If topological, then we need to
+        # fetch events from one chunk at a time until we hit the limit.
 
         # Tokens really represent positions between elements, but we use
         # the convention of pointing to the event before the gap. Hence
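To make the stream-versus-topological distinction in the new comment concrete, a simplified sketch. The `Token` namedtuple is a stand-in, not the real RoomStreamToken, and only mirrors the attributes this diff reads.

from collections import namedtuple

# Stand-in token with only the attributes the diff reads; the real
# RoomStreamToken has its own parsing and serialisation.
Token = namedtuple("Token", ("topological", "stream", "chunk"))


def pagination_mode(from_token):
    # Stream tokens carry no topological position: a single SELECT ordered
    # by stream_ordering is enough.
    if from_token.topological is None:
        return "stream"
    # Topological tokens mean events are fetched one chunk at a time until
    # the limit is hit.
    return "topological"


def needs_chunk_walk(from_token):
    # Mirrors the check made later in this commit: only walk chunks when the
    # token carries a chunk id (the FIXME notes a topological token may not).
    return from_token.chunk is not None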
@@ -756,7 +764,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             bounds += " AND " + filter_clause
             args.extend(filter_args)
 
-        args.append(int(limit))
+        args.append(limit)
 
         sql = (
             "SELECT event_id, chunk_id, topological_ordering, stream_ordering"
@@ -771,7 +779,65 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         txn.execute(sql, args)
 
-        rows = [_EventDictReturn(row[0], row[1], row[2]) for row in txn]
+        rows = [_EventDictReturn(row[0], row[1], row[2], row[3]) for row in txn]
+
+        # If we are paginating topologically and we haven't hit the limit on
+        # number of events then we need to fetch events from the previous or
+        # next chunk.
+
+        iterated_chunks = []
+
+        chunk_id = None
+        if from_token.chunk:  # FIXME: may be topological but no chunk.
+            if rows:
+                chunk_id = rows[-1].chunk_id
+                iterated_chunks = [r.chunk_id for r in rows]
+            else:
+                chunk_id = from_token.chunk
+                iterated_chunks = [chunk_id]
+
+        table = ChunkDBOrderedListStore(
+            txn, room_id, self.clock,
+        )
+
+        if filter_clause:
+            filter_clause = "AND " + filter_clause
+
+        sql = (
+            "SELECT event_id, chunk_id, topological_ordering, stream_ordering"
+            " FROM events"
+            " WHERE outlier = ? AND room_id = ? %(filter_clause)s"
+            " ORDER BY topological_ordering %(order)s,"
+            " stream_ordering %(order)s LIMIT ?"
+        ) % {
+            "filter_clause": filter_clause,
+            "order": order,
+        }
+
+        args = [False, room_id] + filter_args + [limit]
+
+        while chunk_id and (limit <= 0 or len(rows) < limit):
+            if chunk_id not in iterated_chunks:
+                iterated_chunks.append(chunk_id)
+
+            if direction == 'b':
+                chunk_id = table.get_prev(chunk_id)
+            else:
+                chunk_id = table.get_next(chunk_id)
+
+            if chunk_id is None:
+                break
+
+            txn.execute(sql, args)
+            new_rows = [_EventDictReturn(row[0], row[1], row[2], row[3]) for row in txn]
+
+            if not new_rows:
+                break
+
+            rows.extend(new_rows)
+
+        if limit > 0:
+            rows = rows[:limit]
 
         if rows:
            chunk = rows[-1].chunk_id
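A self-contained toy version of the chunk walk added above, using plain dictionaries instead of SQL. It mirrors the shape of the loop (follow prev/next links until the limit is reached), not its exact bookkeeping; `events_by_chunk` and `chunk_order` are hypothetical stand-ins.

def paginate_by_chunk(events_by_chunk, chunk_order, start_chunk, limit, direction='b'):
    """Toy version of the chunk walk, without the SQL.

    `events_by_chunk` maps chunk_id -> list of events and `chunk_order`
    offers get_prev/get_next (see the in-memory stand-in earlier). Returns
    the collected events plus the chunks that contributed to them.
    """
    rows = list(events_by_chunk.get(start_chunk, []))
    iterated_chunks = [start_chunk]

    chunk_id = start_chunk
    while chunk_id and (limit <= 0 or len(rows) < limit):
        # Step to the neighbouring chunk in the requested direction.
        if direction == 'b':
            chunk_id = chunk_order.get_prev(chunk_id)
        else:
            chunk_id = chunk_order.get_next(chunk_id)

        if chunk_id is None:
            break

        new_rows = events_by_chunk.get(chunk_id, [])
        if not new_rows:
            break

        iterated_chunks.append(chunk_id)
        rows.extend(new_rows)

    if limit > 0:
        rows = rows[:limit]

    return rows, iterated_chunks

For example, with three chunks linked a <- b <- c, paginating backwards from c with a limit of five keeps pulling events from c, then b, then a until five rows have been collected or the chain runs out.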
@@ -809,18 +875,43 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                 those that match the filter.
 
         Returns:
-            tuple[list[dict], str]: Returns the results as a list of dicts and
-            a token that points to the end of the result set. The dicts have
-            the keys "event_id", "topological_ordering" and "stream_orderign".
+            tuple[list[dict], str, list[str]]: Returns the results as a list of
+            dicts, a token that points to the end of the result set, and a list
+            of backwards extremities. The dicts have the keys "event_id",
+            "topological_ordering" and "stream_ordering".
         """
 
         from_key = RoomStreamToken.parse(from_key)
         if to_key:
             to_key = RoomStreamToken.parse(to_key)
 
-        rows, token = yield self.runInteraction(
-            "paginate_room_events", self._paginate_room_events_txn,
-            room_id, from_key, to_key, direction, limit, event_filter,
+        def _do_paginate_room_events(txn):
+            rows, token, chunks = self._paginate_room_events_txn(
+                txn, room_id, from_key, to_key, direction, limit, event_filter,
+            )
+
+            # We now fetch the extremities by fetching the extremities for
+            # each chunk we iterated over.
+            extremities = []
+            seen = set()
+            for chunk_id in chunks:
+                if chunk_id in seen:
+                    continue
+                seen.add(chunk_id)
+
+                event_ids = self._simple_select_onecol_txn(
+                    txn,
+                    table="chunk_backwards_extremities",
+                    keyvalues={"chunk_id": chunk_id},
+                    retcol="event_id"
+                )
+
+                extremities.extend(e for e in event_ids if e not in extremities)
+
+            return rows, token, extremities
+
+        rows, token, extremities = yield self.runInteraction(
+            "paginate_room_events", _do_paginate_room_events,
         )
 
         events = yield self._get_events(
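The extremity gathering above, wrapped into a stand-alone helper for illustration. `get_chunk_backwards_extremities` is a hypothetical lookup standing in for the `chunk_backwards_extremities` table query; the helper keeps first-seen order while de-duplicating.

def gather_extremities(iterated_chunks, get_chunk_backwards_extremities):
    # For each chunk touched during pagination, collect its backwards
    # extremities, skipping chunks and event IDs already seen so the result
    # keeps first-seen order without duplicates.
    extremities = []
    seen_chunks = set()
    for chunk_id in iterated_chunks:
        if chunk_id in seen_chunks:
            continue
        seen_chunks.add(chunk_id)

        for event_id in get_chunk_backwards_extremities(chunk_id):
            if event_id not in extremities:
                extremities.append(event_id)

    return extremities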
@@ -830,7 +921,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         self._set_before_and_after(events, rows)
 
-        defer.returnValue((events, token))
+        defer.returnValue((events, token, extremities))
 
 
 class StreamStore(StreamWorkerStore):