Implement background update for chunks

Erik Johnston
2018-06-05 10:49:43 +01:00
parent fbafc86aca
commit 2d97fb6740
2 changed files with 76 additions and 0 deletions

View File

@@ -202,6 +202,7 @@ def _retry_on_integrity_error(func):
class EventsStore(EventsWorkerStore):
    EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
    EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
    EVENT_FIELDS_CHUNK = "event_fields_chunk_id"

    def __init__(self, db_conn, hs):
        super(EventsStore, self).__init__(db_conn, hs)
@@ -242,6 +243,11 @@ class EventsStore(EventsWorkerStore):
            psql_only=True,
        )

        self.register_background_update_handler(
            self.EVENT_FIELDS_CHUNK,
            self._background_compute_chunks,
        )

        self._event_persist_queue = _EventPeristenceQueue()
        self._state_resolution_handler = hs.get_state_resolution_handler()
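
Registration on its own only maps the update name to a handler; the work actually starts once a matching row exists in the background_updates table (added by the schema delta further down). A minimal sketch of that dispatch idea follows; apart from the 'event_fields_chunk_id' name and the (progress, batch_size) handler signature, everything here is an assumption rather than Synapse's real updater code.

import json

# Simplified stand-in for the background updater, for illustration only.
# It shows how a registered name is matched against a stored progress_json
# row and the handler invoked with a batch size.
class BackgroundUpdaterSketch(object):
    def __init__(self):
        self._handlers = {}

    def register_background_update_handler(self, update_name, handler):
        # Mirrors the call made in EventsStore.__init__ above.
        self._handlers[update_name] = handler

    def run_once(self, update_name, progress_json, batch_size=100):
        # Parse the stored progress and hand it to the matching handler.
        progress = json.loads(progress_json) if progress_json else {}
        return self._handlers[update_name](progress, batch_size)
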
@@ -1823,6 +1829,72 @@ class EventsStore(EventsWorkerStore):
        defer.returnValue(result)

    @defer.inlineCallbacks
    def _background_compute_chunks(self, progress, batch_size):
        """Iterates over events and assigns them chunk IDs
        """
        up_to_stream_id = progress.get("up_to_stream_id")
        if up_to_stream_id is None:
            up_to_stream_id = self.get_current_events_token() + 1

        rows_inserted = progress.get("rows_inserted", 0)

        def reindex_chunks_txn(txn):
            txn.execute("""
                SELECT stream_ordering, room_id, event_id FROM events
                WHERE stream_ordering < ? AND outlier = ? AND chunk_id IS NULL
                ORDER BY stream_ordering DESC
                LIMIT ?
            """, (up_to_stream_id, False, batch_size))

            rows = txn.fetchall()

            stream_ordering = up_to_stream_id
            for stream_ordering, room_id, event_id in rows:
                prev_events = self._simple_select_onecol_txn(
                    txn,
                    table="event_edges",
                    keyvalues={
                        "event_id": event_id,
                    },
                    retcol="prev_event_id",
                )

                chunk_id, topo = self._insert_into_chunk_txn(
                    txn, room_id, event_id, prev_events,
                )

                self._simple_update_txn(
                    txn,
                    table="events",
                    keyvalues={"event_id": event_id},
                    updatevalues={
                        "chunk_id": chunk_id,
                        "topological_ordering": topo,
                    },
                )

            progress = {
                "up_to_stream_id": stream_ordering,
                "rows_inserted": rows_inserted + len(rows)
            }

            self._background_update_progress_txn(
                txn, self.EVENT_FIELDS_CHUNK, progress
            )

            return len(rows)

        result = yield self.runInteraction(
            self.EVENT_FIELDS_CHUNK, reindex_chunks_txn
        )

        if not result:
            yield self._end_background_update(self.EVENT_FIELDS_CHUNK)

        defer.returnValue(result)

    def get_current_backfill_token(self):
        """The current minimum token that backfilled events have reached"""
        return -self._backfill_id_gen.get_current_token()
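
The handler follows the usual background-update contract: it is called repeatedly, each call chunks at most batch_size events working downwards through stream_ordering, saves its position in the progress dict, and the update is torn down once a call finds nothing left below up_to_stream_id. A rough Twisted-style driver sketch is below; it is not this commit's code, get_saved_progress is a hypothetical helper for reading progress_json back out of background_updates, and Synapse's real updater additionally tunes batch_size from timing measurements.

from twisted.internet import defer


@defer.inlineCallbacks
def drive_chunk_update(store, get_saved_progress, batch_size=100):
    """Repeatedly run the chunk background update until it finishes."""
    while True:
        # Hypothetical helper: returns the parsed progress dict for
        # 'event_fields_chunk_id', or None once _end_background_update has
        # deleted the row.
        progress = yield get_saved_progress(store, "event_fields_chunk_id")
        if progress is None:
            break

        rows_done = yield store._background_compute_chunks(progress, batch_size)
        if not rows_done:
            # The handler has already ended the update; every remaining event
            # below up_to_stream_id has a chunk_id.
            break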

View File

@@ -58,6 +58,10 @@ CREATE TABLE chunk_linearized (
CREATE UNIQUE INDEX chunk_linearized_id ON chunk_linearized (chunk_id);
CREATE INDEX chunk_linearized_ordering ON chunk_linearized (room_id, ordering);
INSERT into background_updates (update_name, progress_json)
VALUES ('event_fields_chunk_id', '{}');
"""