Prune sliding_sync_connection_required_state table (#19306)
When we change the `required_state` config for a room in sliding sync, we insert a new entry into the `sliding_sync_connection_required_state` table. As the sliding sync connection advances we can accrue a lot of stale entries, so let's clear those out. This is a follow-on from #19211 --------- Co-authored-by: Eric Eastwood <erice@element.io>
This commit is contained in:
1
changelog.d/19306.misc
Normal file
1
changelog.d/19306.misc
Normal file
@@ -0,0 +1 @@
|
||||
Prune stale entries from `sliding_sync_connection_required_state` table.
|
||||
@@ -450,6 +450,9 @@ class SlidingSyncStore(SQLBaseStore):
|
||||
|
||||
# Now that we have seen the client has received and used the connection
|
||||
# position, we can delete all the other connection positions.
|
||||
#
|
||||
# Note: the rest of the code here assumes this is the only remaining
|
||||
# connection position.
|
||||
sql = """
|
||||
DELETE FROM sliding_sync_connection_positions
|
||||
WHERE connection_key = ? AND connection_position != ?
|
||||
@@ -485,9 +488,10 @@ class SlidingSyncStore(SQLBaseStore):
|
||||
),
|
||||
)
|
||||
|
||||
required_state_map: dict[int, dict[str, set[str]]] = {}
|
||||
# Map from required_state_id -> event type -> set of state keys.
|
||||
stored_required_state_id_maps: dict[int, dict[str, set[str]]] = {}
|
||||
for row in rows:
|
||||
state = required_state_map[row[0]] = {}
|
||||
state = stored_required_state_id_maps[row[0]] = {}
|
||||
for event_type, state_key in db_to_json(row[1]):
|
||||
state.setdefault(event_type, set()).add(state_key)
|
||||
|
||||
@@ -512,7 +516,44 @@ class SlidingSyncStore(SQLBaseStore):
|
||||
) in room_config_rows:
|
||||
room_configs[room_id] = RoomSyncConfig(
|
||||
timeline_limit=timeline_limit,
|
||||
required_state_map=required_state_map[required_state_id],
|
||||
required_state_map=stored_required_state_id_maps[required_state_id],
|
||||
)
|
||||
|
||||
# Clean up any `required_state_id`s that are no longer used by any
|
||||
# connection position on this connection.
|
||||
#
|
||||
# We store the required state config per-connection per-room. Since this
|
||||
# can be a lot of data, we deduplicate the required state JSON and store
|
||||
# it separately, with multiple rooms referencing the same `required_state_id`.
|
||||
# Over time as the required state configs change, some `required_state_id`s
|
||||
# may no longer be referenced by any room config, so we need
|
||||
# to clean them up.
|
||||
#
|
||||
# We do this by noting that we have pulled out *all* rows from
|
||||
# `sliding_sync_connection_required_state` for this connection above. We
|
||||
# have also pulled out all referenced `required_state_id`s for *this*
|
||||
# connection position, which is the only connection position that
|
||||
# remains (we deleted the others above).
|
||||
#
|
||||
# Thus we can compute the unused `required_state_id`s by looking for any
|
||||
# `required_state_id`s that are not referenced by the remaining connection
|
||||
# position.
|
||||
used_required_state_ids = {
|
||||
required_state_id for _, _, required_state_id in room_config_rows
|
||||
}
|
||||
|
||||
unused_required_state_ids = (
|
||||
stored_required_state_id_maps.keys() - used_required_state_ids
|
||||
)
|
||||
if unused_required_state_ids:
|
||||
self.db_pool.simple_delete_many_batch_txn(
|
||||
txn,
|
||||
table="sliding_sync_connection_required_state",
|
||||
keys=("connection_key", "required_state_id"),
|
||||
values=[
|
||||
(connection_key, required_state_id)
|
||||
for required_state_id in unused_required_state_ids
|
||||
],
|
||||
)
|
||||
|
||||
# Now look up the per-room stream data.
|
||||
|
||||
@@ -3120,6 +3120,248 @@ class SlidingSyncTablesTestCase(SlidingSyncTablesTestCaseBase):
|
||||
# The timestamp for user1 should be updated.
|
||||
self.assertGreater(lazy_member_entries[user1_id], prev_timestamp)
|
||||
|
||||
def test_pruning_sliding_sync_connection_required_state(self) -> None:
|
||||
"""Test that we prune old entries from
|
||||
`sliding_sync_connection_required_state`.
|
||||
"""
|
||||
|
||||
user1_id = self.register_user("user1", "pass")
|
||||
user1_tok = self.login(user1_id, "pass")
|
||||
|
||||
room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
|
||||
self.helper.send_state(
|
||||
room_id, EventTypes.Name, {"name": "A room"}, tok=user1_tok
|
||||
)
|
||||
|
||||
# Do an initial sync, this will pull down the above room and thus cause
|
||||
# us to store a single required state entry for the room.
|
||||
sync_body = {
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [[0, 1]],
|
||||
"required_state": [
|
||||
[EventTypes.Member, StateValues.LAZY],
|
||||
],
|
||||
"timeline_limit": 1,
|
||||
}
|
||||
}
|
||||
}
|
||||
_, from_token = self.do_sync(sync_body, tok=user1_tok)
|
||||
|
||||
# Check that we have an entry in sliding_sync_connection_required_state
|
||||
connection_pos1 = self.get_success(
|
||||
SlidingSyncStreamToken.from_string(self.store, from_token)
|
||||
).connection_position
|
||||
|
||||
connection_key = self.get_success(
|
||||
self.store.db_pool.simple_select_one_onecol(
|
||||
table="sliding_sync_connection_positions",
|
||||
keyvalues={"connection_position": connection_pos1},
|
||||
retcol="connection_key",
|
||||
)
|
||||
)
|
||||
|
||||
required_state_entries = self.get_success(
|
||||
self.store.db_pool.simple_select_list(
|
||||
table="sliding_sync_connection_required_state",
|
||||
keyvalues={"connection_key": connection_key},
|
||||
retcols=("required_state_id", "required_state"),
|
||||
)
|
||||
)
|
||||
|
||||
# We expect a single entry here for the one room ID.
|
||||
self.assertEqual(len(required_state_entries), 1)
|
||||
first_required_state_id = required_state_entries[0][0]
|
||||
|
||||
# Update the sync body to request more required state, so that we get
|
||||
# another entry in the table.
|
||||
sync_body = {
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [[0, 1]],
|
||||
"required_state": [
|
||||
[EventTypes.Name, ""],
|
||||
[EventTypes.Member, StateValues.LAZY],
|
||||
],
|
||||
"timeline_limit": 1,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
# We need to send a message to cause the room to come down the next
|
||||
# sync. This shouldn't be necessary, but we don't currently implement
|
||||
# immediately sending down the room when required_state is updated,
|
||||
# see https://github.com/element-hq/synapse/issues/18844
|
||||
self.helper.send(room_id, "msg1", tok=user1_tok)
|
||||
|
||||
_, from_token = self.do_sync(sync_body, since=from_token, tok=user1_tok)
|
||||
|
||||
required_state_entries = self.get_success(
|
||||
self.store.db_pool.simple_select_list(
|
||||
table="sliding_sync_connection_required_state",
|
||||
keyvalues={"connection_key": connection_key},
|
||||
retcols=("required_state_id", "required_state"),
|
||||
)
|
||||
)
|
||||
|
||||
# We expect two entries here, one for old state and one for new state.
|
||||
# The old entry doesn't get pruned yet as the previous from_token could
|
||||
# still be used.
|
||||
self.assertEqual(len(required_state_entries), 2)
|
||||
|
||||
# Sync again with the latest token. This time we expect the old
|
||||
# entry to be pruned.
|
||||
self.do_sync(sync_body, since=from_token, tok=user1_tok)
|
||||
|
||||
required_state_entries = self.get_success(
|
||||
self.store.db_pool.simple_select_list(
|
||||
table="sliding_sync_connection_required_state",
|
||||
keyvalues={"connection_key": connection_key},
|
||||
retcols=("required_state_id", "required_state"),
|
||||
)
|
||||
)
|
||||
|
||||
self.assertEqual(len(required_state_entries), 1)
|
||||
|
||||
# Double check that we have pruned the old entry.
|
||||
self.assertNotEqual(required_state_entries[0][0], first_required_state_id)
|
||||
|
||||
def test_pruning_sliding_sync_connection_required_state_forks(self) -> None:
|
||||
"""Test that we prune entries in
|
||||
`sliding_sync_connection_required_state` for forked positions.
|
||||
"""
|
||||
|
||||
user1_id = self.register_user("user1", "pass")
|
||||
user1_tok = self.login(user1_id, "pass")
|
||||
|
||||
room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
|
||||
self.helper.send_state(
|
||||
room_id, EventTypes.Name, {"name": "A room"}, tok=user1_tok
|
||||
)
|
||||
|
||||
# Do an initial sync, this will pull down the above room and thus cause
|
||||
# us to store a single required state entry for the room.
|
||||
sync_body = {
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [[0, 1]],
|
||||
"required_state": [
|
||||
[EventTypes.Member, StateValues.LAZY],
|
||||
],
|
||||
"timeline_limit": 1,
|
||||
}
|
||||
}
|
||||
}
|
||||
_, from_token = self.do_sync(sync_body, tok=user1_tok)
|
||||
|
||||
# Check that we have an entry in sliding_sync_connection_required_state
|
||||
connection_pos1 = self.get_success(
|
||||
SlidingSyncStreamToken.from_string(self.store, from_token)
|
||||
).connection_position
|
||||
|
||||
connection_key = self.get_success(
|
||||
self.store.db_pool.simple_select_one_onecol(
|
||||
table="sliding_sync_connection_positions",
|
||||
keyvalues={"connection_position": connection_pos1},
|
||||
retcol="connection_key",
|
||||
)
|
||||
)
|
||||
|
||||
required_state_entries = self.get_success(
|
||||
self.store.db_pool.simple_select_list(
|
||||
table="sliding_sync_connection_required_state",
|
||||
keyvalues={"connection_key": connection_key},
|
||||
retcols=("required_state_id", "required_state"),
|
||||
)
|
||||
)
|
||||
|
||||
# We expect a single entry here for the one room ID.
|
||||
self.assertEqual(len(required_state_entries), 1)
|
||||
first_required_state_id = required_state_entries[0][0]
|
||||
|
||||
# Update the sync body to request more required state, so that we get
|
||||
# another entry in the table.
|
||||
sync_body = {
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [[0, 1]],
|
||||
"required_state": [
|
||||
[EventTypes.Name, ""],
|
||||
[EventTypes.Member, StateValues.LAZY],
|
||||
],
|
||||
"timeline_limit": 1,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
# We need to send a message to cause the room to come down the next
|
||||
# sync. This shouldn't be necessary, but we don't currently implement
|
||||
# immediately sending down the room when required_state is updated,
|
||||
# see https://github.com/element-hq/synapse/issues/18844
|
||||
self.helper.send(room_id, "msg1", tok=user1_tok)
|
||||
|
||||
_, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
|
||||
|
||||
required_state_entries = self.get_success(
|
||||
self.store.db_pool.simple_select_list(
|
||||
table="sliding_sync_connection_required_state",
|
||||
keyvalues={"connection_key": connection_key},
|
||||
retcols=("required_state_id", "required_state"),
|
||||
)
|
||||
)
|
||||
|
||||
# We expect two entries here, one for old state and one for new state.
|
||||
# The old entry doesn't get pruned yet as the previous from_token could
|
||||
# still be used.
|
||||
self.assertEqual(len(required_state_entries), 2)
|
||||
second_required_state_id = sorted(required_state_entries)[1][0]
|
||||
|
||||
# We sync again, but with the old token, creating a fork in the
|
||||
# connection positions. We change the sync body again so that the
|
||||
# `required_state` doesn't get deduplicated.
|
||||
sync_body = {
|
||||
"lists": {
|
||||
"foo-list": {
|
||||
"ranges": [[0, 1]],
|
||||
"required_state": [
|
||||
[EventTypes.Topic, ""],
|
||||
[EventTypes.Member, StateValues.LAZY],
|
||||
],
|
||||
"timeline_limit": 1,
|
||||
}
|
||||
}
|
||||
}
|
||||
_, from_token = self.do_sync(sync_body, since=from_token, tok=user1_tok)
|
||||
|
||||
# There should now be three entries, one for each of the required_state.
|
||||
required_state_entries = self.get_success(
|
||||
self.store.db_pool.simple_select_list(
|
||||
table="sliding_sync_connection_required_state",
|
||||
keyvalues={"connection_key": connection_key},
|
||||
retcols=("required_state_id", "required_state"),
|
||||
)
|
||||
)
|
||||
|
||||
self.assertEqual(len(required_state_entries), 3)
|
||||
|
||||
# Sync again with the latest token. This should prune all except the
|
||||
# latest entry in `sliding_sync_connection_required_state`.
|
||||
_, from_token = self.do_sync(sync_body, since=from_token, tok=user1_tok)
|
||||
|
||||
required_state_entries = self.get_success(
|
||||
self.store.db_pool.simple_select_list(
|
||||
table="sliding_sync_connection_required_state",
|
||||
keyvalues={"connection_key": connection_key},
|
||||
retcols=("required_state_id", "required_state"),
|
||||
)
|
||||
)
|
||||
|
||||
self.assertEqual(len(required_state_entries), 1)
|
||||
|
||||
# Double check that we have pruned the old entry.
|
||||
self.assertNotEqual(required_state_entries[0][0], first_required_state_id)
|
||||
self.assertNotEqual(required_state_entries[0][0], second_required_state_id)
|
||||
|
||||
|
||||
class SlidingSyncTablesBackgroundUpdatesTestCase(SlidingSyncTablesTestCaseBase):
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user