Support backfilling with MultiWriterIdGen
@@ -80,4 +80,4 @@ class SlavedEventStore(
         return self._stream_id_gen.get_current_token()
 
     def get_room_min_stream_ordering(self):
-        return self._backfill_id_gen.get_current_token()
+        return -self._backfill_id_gen.get_current_token()
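
Note on the sign convention above: the backfill generator only hands out increasing positive IDs, while backfilled events need stream orderings that grow more negative over time. The commit therefore keeps the generator's values positive internally and negates them at the public boundary, as in get_room_min_stream_ordering(). A minimal sketch of the idea, using a stand-in class rather than Synapse's actual code:

    import itertools

    class FakeBackfillIdGen:
        """Stand-in for the backfill ID generator: counts 1, 2, 3, ..."""

        def __init__(self):
            self._ids = itertools.count(1)
            self._current = 0

        def get_next(self):
            self._current = next(self._ids)
            return self._current

        def get_current_token(self):
            return self._current

    gen = FakeBackfillIdGen()
    internal_id = gen.get_next()    # positive, ascending: 1, 2, 3, ...
    stream_ordering = -internal_id  # negative, descending: -1, -2, -3, ...
    # get_room_min_stream_ordering() negates at the same boundary:
    assert -gen.get_current_token() == stream_ordering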
@@ -155,8 +155,8 @@ class PersistEventsStore:
         # Note: Multiple instances of this function cannot be in flight at
         # the same time for the same room.
         if backfilled:
-            stream_ordering_manager = self._backfill_id_gen.get_next_mult(
-                len(events_and_contexts)
+            stream_ordering_manager = await maybe_awaitable(
+                self._backfill_id_gen.get_next_mult(len(events_and_contexts))
             )
         else:
             stream_ordering_manager = await maybe_awaitable(
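
Why maybe_awaitable: depending on configuration, _backfill_id_gen is either a StreamIdGenerator, whose get_next_mult() returns a context manager directly, or a MultiWriterIdGenerator, whose get_next_mult() must be awaited. Wrapping the call lets one call site serve both. A sketch of such a shim, assuming it simply awaits awaitables and passes everything else through (simplified, not necessarily Synapse's exact implementation):

    import inspect

    async def maybe_awaitable(value):
        # Await the value if it is awaitable, otherwise pass it through.
        if inspect.isawaitable(value):
            return await value
        return value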
@@ -164,6 +164,9 @@ class PersistEventsStore:
             )
 
         with stream_ordering_manager as stream_orderings:
+            if backfilled:
+                stream_orderings = [-s for s in stream_orderings]
+
             for (event, context), stream in zip(events_and_contexts, stream_orderings):
                 event.internal_metadata.stream_ordering = stream
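
The added negation is where a batch of backfilled events gets its final orderings: the generator yields ascending positive IDs, which are flipped so that backfilled events sort below zero, i.e. before everything in the forwards stream. Worked through with hypothetical values:

    stream_orderings = [5, 6, 7]             # as handed out by the generator
    stream_orderings = [-s for s in stream_orderings]
    assert stream_orderings == [-5, -6, -7]  # deeper backfill -> more negative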
@@ -37,7 +37,6 @@ from synapse.events import EventBase, make_event_from_dict
 from synapse.events.utils import prune_event
 from synapse.logging.context import PreserveLoggingContext, current_context
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
 from synapse.replication.tcp.streams import BackfillStream
 from synapse.replication.tcp.streams.events import EventsStream
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
@@ -92,18 +91,26 @@ class EventsWorkerStore(SQLBaseStore):
                     id_column="stream_ordering",
                     sequence_name="events_stream_seq",
                 )
+                self._backfill_id_gen = MultiWriterIdGenerator(
+                    db_conn=db_conn,
+                    db=database,
+                    instance_name=hs.get_instance_name(),
+                    table="events",
+                    instance_column="instance_name",
+                    id_column="-stream_ordering",
+                    sequence_name="events_backfill_stream_seq",
+                )
             else:
                 self._stream_id_gen = StreamIdGenerator(
                     db_conn, "events", "stream_ordering",
                 )
 
-                self._backfill_id_gen = StreamIdGenerator(
-                    db_conn,
-                    "events",
-                    "stream_ordering",
-                    step=-1,
-                    extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
-                )
+                self._backfill_id_gen = StreamIdGenerator(
+                    db_conn,
+                    "events",
+                    "-stream_ordering",
+                    # extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
+                )
         else:
             # Another process is in charge of persisting events and generating
             # stream IDs: rely on the replication streams to let us know which
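
The id_column="-stream_ordering" argument is doing real work here: the generator discovers current positions by querying the id column per instance, so handing it a negated column expression makes the usual MAX-based bookkeeping track the furthest-backfilled (most negative) ordering as a positive number. A rough sketch of the assumed query construction (simplified; the real generator's SQL may differ):

    def build_position_query(table, instance_column, id_column):
        # The column names are interpolated into the SQL, so "-stream_ordering"
        # becomes a negated expression inside MAX().
        return "SELECT {i}, MAX({id}) FROM {t} GROUP BY {i}".format(
            i=instance_column, id=id_column, t=table
        )

    print(build_position_query("events", "instance_name", "-stream_ordering"))
    # SELECT instance_name, MAX(-stream_ordering) FROM events GROUP BY instance_name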
@@ -117,8 +124,14 @@ class EventsWorkerStore(SQLBaseStore):
                 id_column="stream_ordering",
                 sequence_name="events_stream_seq",
             )
-            self._backfill_id_gen = SlavedIdTracker(
-                db_conn, "events", "stream_ordering", step=-1
+            self._backfill_id_gen = MultiWriterIdGenerator(
+                db_conn=db_conn,
+                db=database,
+                instance_name=hs.get_instance_name(),
+                table="events",
+                instance_column="instance_name",
+                id_column="-stream_ordering",
+                sequence_name="events_backfill_stream_seq",
             )
 
         self._get_event_cache = Cache(
@@ -136,7 +149,7 @@ class EventsWorkerStore(SQLBaseStore):
         if stream_name == EventsStream.NAME:
             self._stream_id_gen.advance(instance_name, token)
         elif stream_name == BackfillStream.NAME:
-            self._backfill_id_gen.advance(-token)
+            self._backfill_id_gen.advance(instance_name, token)
 
         super().process_replication_rows(stream_name, instance_name, token, rows)
 
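
advance() now takes an instance name because a multi-writer generator keeps one position per writer, and the replication stream tells us which writer moved. A minimal sketch of that shape (assumed, not Synapse's actual class):

    class PerInstanceTracker:
        def __init__(self):
            self._positions = {}  # type: dict

        def advance(self, instance_name, new_id):
            # Only the named writer's position moves forward.
            current = self._positions.get(instance_name, 0)
            self._positions[instance_name] = max(current, new_id)

        def get_current_token(self):
            # The slowest writer bounds the token everyone can rely on.
            return min(self._positions.values()) if self._positions else 1

    tracker = PerInstanceTracker()
    tracker.advance("persister1", 7)
    tracker.advance("persister2", 5)
    assert tracker.get_current_token() == 5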
@@ -987,7 +1000,7 @@ class EventsWorkerStore(SQLBaseStore):
 
     def get_current_backfill_token(self):
         """The current minimum token that backfilled events have reached"""
-        return -self._backfill_id_gen.get_current_token()
+        return self._backfill_id_gen.get_current_token()
 
     def get_current_events_token(self):
         """The current maximum token that events have reached"""
@@ -18,3 +18,9 @@ CREATE SEQUENCE IF NOT EXISTS events_stream_seq;
 SELECT setval('events_stream_seq', (
     SELECT COALESCE(MAX(stream_ordering), 1) FROM events
 ));
+
+CREATE SEQUENCE IF NOT EXISTS events_backfill_stream_seq;
+
+SELECT setval('events_backfill_stream_seq', (
+    SELECT COALESCE(-MIN(stream_ordering), 1) FROM events
+));
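
The backfill sequence is seeded with the positive counterpart of the most negative stream ordering already in the events table (or 1 for an empty table), so the next value the sequence hands out negates to an ordering just past the current backfill frontier. Worked through with hypothetical values:

    existing_orderings = [-42, -7, 3, 10]   # stream_ordering values in events
    seed = max(-min(existing_orderings), 1)  # COALESCE(-MIN(...), 1) -> 42
    next_id = seed + 1                       # what nextval() would return: 43
    assert -next_id == -43                   # the next backfill ordering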
@@ -202,6 +202,7 @@ class MultiWriterIdGenerator:
     ):
         self._db = db
         self._instance_name = instance_name
+        self._sequence_name = sequence_name
 
         # We lock as some functions may be called from DB threads.
         self._lock = threading.Lock()
@@ -219,7 +220,7 @@ class MultiWriterIdGenerator:
         # and b) noting that if we have seen a run of persisted positions
         # without gaps (e.g. 5, 6, 7) then we can skip forward (e.g. to 7).
         self._persisted_upto_position = (
-            min(self._current_positions.values()) if self._current_positions else 0
+            min(self._current_positions.values()) if self._current_positions else 1
         )
         self._known_persisted_positions = []  # type: List[int]
@@ -415,7 +416,8 @@ class MultiWriterIdGenerator:
                 break
 
         logger.info(
-            "Got new_id: %s, setting persited pos to %s",
+            "Got new_id %s: %s, setting persited pos to %s",
+            self._sequence_name,
             new_id,
             self._persisted_upto_position,
         )