WIP
This commit is contained in:
@@ -644,25 +644,7 @@ class EventsPersistenceStorageController:
|
||||
# TODO: Add a table to track what state groups we're currently
|
||||
# inserting? There's a race where this transaction takes so long
|
||||
# that we delete the state groups we're inserting.
|
||||
#
|
||||
# NYARGH: We don't have access to the state store here? Do we do
|
||||
# this a layer above? There's not *that* much happening here?
|
||||
|
||||
min_state_epoch = min(ctx.state_epoch for _, ctx in chunk)
|
||||
state_groups = {
|
||||
ctx.state_group
|
||||
for _, ctx in chunk
|
||||
if ctx.state_group and not ctx.rejected
|
||||
}
|
||||
state_groups.update(
|
||||
ctx.state_group_before_event
|
||||
for _, ctx in chunk
|
||||
if ctx.state_group_before_event is not None
|
||||
)
|
||||
await self.state_store.mark_state_groups_as_used(
|
||||
min_state_epoch,
|
||||
state_groups,
|
||||
)
|
||||
await self.state_store.mark_state_groups_as_used(events_and_contexts)
|
||||
|
||||
await self.persist_events_store._persist_events_and_state_updates(
|
||||
room_id,
|
||||
|
||||
@@ -186,26 +186,34 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
|
||||
async def mark_state_groups_as_used(
|
||||
self, event_and_contexts: Collection[Tuple[EventBase, EventContext]]
|
||||
) -> None:
|
||||
min_state_epoch = min(ctx.state_epoch for _, ctx in event_and_contexts)
|
||||
state_groups = {
|
||||
ctx.state_group
|
||||
for _, ctx in event_and_contexts
|
||||
if ctx.state_group and not ctx.rejected
|
||||
}
|
||||
state_groups.update(
|
||||
ctx.state_group_before_event
|
||||
for _, ctx in event_and_contexts
|
||||
if ctx.state_group_before_event is not None
|
||||
)
|
||||
referenced_state_groups = []
|
||||
state_epochs = []
|
||||
for event, ctx in event_and_contexts:
|
||||
if ctx.rejected or event.internal_metadata.is_outlier():
|
||||
continue
|
||||
|
||||
if not state_groups:
|
||||
assert ctx.state_epoch is not None
|
||||
assert ctx.state_group is not None
|
||||
|
||||
state_epochs.append(ctx.state_epoch)
|
||||
|
||||
referenced_state_groups.append(ctx.state_group)
|
||||
|
||||
if ctx.state_group_before_event:
|
||||
referenced_state_groups.append(ctx.state_group_before_event)
|
||||
|
||||
if not referenced_state_groups:
|
||||
# We don't reference any state groups, so nothing to do
|
||||
return
|
||||
|
||||
assert state_epochs # If we have state groups we have a state epoch
|
||||
min_state_epoch = min(state_epochs)
|
||||
|
||||
await self.db_pool.runInteraction(
|
||||
"mark_state_groups_as_used",
|
||||
self._mark_state_groups_as_used_txn,
|
||||
min_state_epoch,
|
||||
state_groups,
|
||||
referenced_state_groups,
|
||||
)
|
||||
|
||||
def _mark_state_groups_as_used_txn(
|
||||
|
||||
@@ -19,7 +19,7 @@
|
||||
#
|
||||
#
|
||||
|
||||
SCHEMA_VERSION = 88 # remember to update the list below when updating
|
||||
SCHEMA_VERSION = 89 # remember to update the list below when updating
|
||||
"""Represents the expectations made by the codebase about the database schema
|
||||
|
||||
This should be incremented whenever the codebase changes its requirements on the
|
||||
|
||||
Reference in New Issue
Block a user