Check state epoch on state group persistence
This commit is contained in:
@@ -118,8 +118,8 @@ class Databases(Generic[DataStoreT]):
|
||||
if state:
|
||||
raise Exception("'state' data store already configured")
|
||||
|
||||
state = StateGroupDataStore(database, db_conn, hs)
|
||||
state_epochs = StateEpochDataStore(database, db_conn, hs)
|
||||
state = StateGroupDataStore(database, db_conn, hs, state_epochs)
|
||||
|
||||
db_conn.commit()
|
||||
|
||||
|
||||
@@ -157,3 +157,31 @@ class StateEpochDataStore:
|
||||
keyvalues={"state_group": state_group},
|
||||
desc="mark_state_group_as_used",
|
||||
)
|
||||
|
||||
def check_prev_group_before_insertion_txn(
|
||||
self, txn: LoggingTransaction, prev_group: int, new_groups: Collection[int]
|
||||
) -> None:
|
||||
sql = """
|
||||
SELECT state_epoch, (SELECT state_epoch FROM state_epoch)
|
||||
FROM state_groups_pending_deletion
|
||||
WHERE state_group = ?
|
||||
"""
|
||||
txn.execute(sql, (prev_group,))
|
||||
row = txn.fetchone()
|
||||
if row is not None:
|
||||
pending_deletion_epoch, current_epoch = row
|
||||
if current_epoch - pending_deletion_epoch >= 2:
|
||||
raise Exception("") # TODO
|
||||
|
||||
self.db_pool.simple_update_txn(
|
||||
txn,
|
||||
table="state_groups_pending_deletion",
|
||||
keyvalues={"state_group": prev_group},
|
||||
updatevalues={"state_epoch": current_epoch},
|
||||
)
|
||||
self.db_pool.simple_insert_many_txn(
|
||||
txn,
|
||||
table="state_groups_pending_deletion",
|
||||
keys=("state_group", "state_epoch"),
|
||||
values=[(state_group, current_epoch) for state_group in new_groups],
|
||||
)
|
||||
|
||||
@@ -58,6 +58,7 @@ from synapse.util.cancellation import cancellable
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.databases.state.epochs import StateEpochDataStore
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -86,8 +87,10 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
|
||||
database: DatabasePool,
|
||||
db_conn: LoggingDatabaseConnection,
|
||||
hs: "HomeServer",
|
||||
epoch_store: "StateEpochDataStore",
|
||||
):
|
||||
super().__init__(database, db_conn, hs)
|
||||
self._epoch_store = epoch_store
|
||||
|
||||
# Originally the state store used a single DictionaryCache to cache the
|
||||
# event IDs for the state types in a given state group to avoid hammering
|
||||
@@ -471,16 +474,6 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
|
||||
A list of state groups
|
||||
"""
|
||||
|
||||
# TODO: Check state epochs
|
||||
#
|
||||
# If prev_group is in state_deletions? => Mark these groups as for
|
||||
# deletion too in the same state epoch. Then clear them when we
|
||||
# persist the events. Maybe bump epoch to the latest?
|
||||
#
|
||||
# Deny if scheduled for deletion in an old state epoch that is too old?
|
||||
#
|
||||
# OR just copy over stuff from `prev_group`?
|
||||
|
||||
is_in_db = self.db_pool.simple_select_one_onecol_txn(
|
||||
txn,
|
||||
table="state_groups",
|
||||
@@ -560,6 +553,12 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
|
||||
for key, state_id in context.state_delta_due_to_event.items()
|
||||
],
|
||||
)
|
||||
|
||||
# We need to check that the prev group isn't about to be deleted
|
||||
self._epoch_store.check_prev_group_before_insertion_txn(
|
||||
txn, prev_group, state_groups
|
||||
)
|
||||
|
||||
return events_and_context
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
@@ -616,7 +615,6 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
|
||||
needs to be persisted as a full state.
|
||||
"""
|
||||
|
||||
# TODO: Check state epoch.
|
||||
is_in_db = self.db_pool.simple_select_one_onecol_txn(
|
||||
txn,
|
||||
table="state_groups",
|
||||
@@ -660,6 +658,11 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
|
||||
],
|
||||
)
|
||||
|
||||
# We need to check that the prev group isn't about to be deleted
|
||||
self._epoch_store.check_prev_group_before_insertion_txn(
|
||||
txn, prev_group, [state_group]
|
||||
)
|
||||
|
||||
return state_group
|
||||
|
||||
def insert_full_state_txn(
|
||||
|
||||
Reference in New Issue
Block a user