WIP
This commit is contained in:
@@ -194,6 +194,7 @@ class StateHandler:
|
||||
self._storage_controllers = hs.get_storage_controllers()
|
||||
self._events_shard_config = hs.config.worker.events_shard_config
|
||||
self._instance_name = hs.get_instance_name()
|
||||
self._state_store = hs.get_datastores().state
|
||||
|
||||
self._update_current_state_client = (
|
||||
ReplicationUpdateCurrentStateRestServlet.make_client(hs)
|
||||
@@ -311,6 +312,8 @@ class StateHandler:
|
||||
"""
|
||||
assert not event.internal_metadata.is_outlier()
|
||||
|
||||
state_epoch = await self._state_store.get_state_epoch() # TODO: Get state epoch
|
||||
|
||||
#
|
||||
# first of all, figure out the state before the event, unless we
|
||||
# already have it.
|
||||
@@ -396,6 +399,7 @@ class StateHandler:
|
||||
delta_ids_to_state_group_before_event=deltas_to_state_group_before_event,
|
||||
partial_state=partial_state,
|
||||
state_map_before_event=state_ids_before_event,
|
||||
state_epoch=state_epoch,
|
||||
)
|
||||
|
||||
#
|
||||
@@ -426,6 +430,7 @@ class StateHandler:
|
||||
delta_ids_to_state_group_before_event=deltas_to_state_group_before_event,
|
||||
partial_state=partial_state,
|
||||
state_map_before_event=state_ids_before_event,
|
||||
state_epoch=state_epoch,
|
||||
)
|
||||
|
||||
async def compute_event_context(
|
||||
@@ -511,6 +516,9 @@ class StateHandler:
|
||||
) = await self._state_storage_controller.get_state_group_delta(
|
||||
state_group_id
|
||||
)
|
||||
|
||||
# TODO: Check for deleted state groups
|
||||
|
||||
return _StateCacheEntry(
|
||||
state=None,
|
||||
state_group=state_group_id,
|
||||
@@ -663,6 +671,7 @@ class StateResolutionHandler:
|
||||
async with self.resolve_linearizer.queue(group_names):
|
||||
cache = self._state_cache.get(group_names, None)
|
||||
if cache:
|
||||
# TODO: Check for deleted state groups
|
||||
return cache
|
||||
|
||||
logger.info(
|
||||
|
||||
@@ -635,6 +635,35 @@ class EventsPersistenceStorageController:
|
||||
room_id, [e for e, _ in chunk]
|
||||
)
|
||||
|
||||
# TODO: Clear pending deletion status of state groups that are
|
||||
# referenced. Should we mark as to be deleted in the case we fail to
|
||||
# persist below? No, as we don't know how long the below will take.
|
||||
#
|
||||
# TODO: Check that state epoch isn't too old.
|
||||
#
|
||||
# TODO: Add a table to track what state groups we're currently
|
||||
# inserting? There's a race where this transaction takes so long
|
||||
# that we delete the state groups we're inserting.
|
||||
#
|
||||
# NYARGH: We don't have access to the state store here? Do we do
|
||||
# this a layer above? There's not *that* much happening here?
|
||||
|
||||
min_state_epoch = min(ctx.state_epoch for _, ctx in chunk)
|
||||
state_groups = {
|
||||
ctx.state_group
|
||||
for _, ctx in chunk
|
||||
if ctx.state_group and not ctx.rejected
|
||||
}
|
||||
state_groups.update(
|
||||
ctx.state_group_before_event
|
||||
for _, ctx in chunk
|
||||
if ctx.state_group_before_event is not None
|
||||
)
|
||||
await self.state_store.mark_state_groups_as_used(
|
||||
min_state_epoch,
|
||||
state_groups,
|
||||
)
|
||||
|
||||
await self.persist_events_store._persist_events_and_state_updates(
|
||||
room_id,
|
||||
chunk,
|
||||
|
||||
@@ -36,7 +36,11 @@ import attr
|
||||
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.events import EventBase
|
||||
from synapse.events.snapshot import UnpersistedEventContext, UnpersistedEventContextBase
|
||||
from synapse.events.snapshot import (
|
||||
EventContext,
|
||||
UnpersistedEventContext,
|
||||
UnpersistedEventContextBase,
|
||||
)
|
||||
from synapse.logging.opentracing import tag_args, trace
|
||||
from synapse.metrics.background_process_metrics import wrap_as_background_process
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
@@ -171,6 +175,60 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
|
||||
"_advance_state_epoch", advance_state_epoch_txn, db_autocommit=True
|
||||
)
|
||||
|
||||
async def get_state_epoch(self) -> int:
    """Fetch the current state epoch from the database.

    Reads the ``state_epoch`` column of the ``state_epoch`` table with no
    filter, so the table presumably holds a single row — TODO confirm
    against the schema.
    """
    current_epoch = await self.db_pool.simple_select_one_onecol(
        table="state_epoch",
        keyvalues={},
        retcol="state_epoch",
        desc="get_state_epoch",
    )
    return current_epoch
|
||||
|
||||
async def mark_state_groups_as_used(
    self, state_epoch: int, state_groups: Collection[int]
) -> None:
    """Clear the pending-deletion status of the given state groups.

    Fixes the signature to match the caller in
    ``EventsPersistenceStorageController``, which already computes the
    minimum state epoch and the set of referenced state groups and passes
    them in as ``(min_state_epoch, state_groups)``. The previous
    ``event_and_contexts`` signature both contradicted that call site and
    duplicated its computation (and its ``min(...)`` would have raised
    ``ValueError`` on an empty collection before the empty-guard ran).

    Args:
        state_epoch: the oldest state epoch observed when the state groups
            were created; the transaction rejects epochs that are too far
            behind the current one.
        state_groups: the state group IDs to unmark.
    """
    # Nothing to do — avoid an empty batched delete round-trip.
    if not state_groups:
        return

    await self.db_pool.runInteraction(
        "mark_state_groups_as_used",
        self._mark_state_groups_as_used_txn,
        state_epoch,
        state_groups,
    )
|
||||
|
||||
def _mark_state_groups_as_used_txn(
|
||||
self, txn: LoggingTransaction, state_epoch: int, state_groups: Collection[int]
|
||||
) -> None:
|
||||
current_state_epoch = self.db_pool.simple_select_one_onecol_txn(
|
||||
txn,
|
||||
table="state_epoch",
|
||||
retcol="state_epoch",
|
||||
keyvalues={},
|
||||
)
|
||||
|
||||
# TODO: Move to constant. Is the equality correct?
|
||||
if current_state_epoch - state_epoch >= 2:
|
||||
raise Exception("FOO")
|
||||
|
||||
self.db_pool.simple_delete_many_batch_txn(
|
||||
txn,
|
||||
table="state_groups_pending_deletion",
|
||||
keys=("state_group",),
|
||||
values=[(state_group,) for state_group in state_groups],
|
||||
)
|
||||
|
||||
@cached()
|
||||
async def is_state_group_pending_deletion_before(
|
||||
self, state_epoch: int, state_group: int
|
||||
@@ -535,7 +593,11 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
|
||||
#
|
||||
# If prev_group is in state_deletions? => Mark these groups as for
|
||||
# deletion too in the same state epoch. Then clear them when we
|
||||
# persist the events.
|
||||
# persist the events. Maybe bump epoch to the latest?
|
||||
#
|
||||
# Deny if scheduled for deletion in an old state epoch that is too old?
|
||||
#
|
||||
# OR just copy over stuff from `prev_group`?
|
||||
|
||||
is_in_db = self.db_pool.simple_select_one_onecol_txn(
|
||||
txn,
|
||||
|
||||
Reference in New Issue
Block a user