Compare commits


12 Commits

Author SHA1 Message Date
Eric Eastwood  4b2a5fe74f  Merge branch 'develop' into madlittlemods/re-use-work-to-grab-state-from-previous-group  2023-05-18 15:00:07 -05:00
Eric Eastwood  79e6d61141  Fix typo  2023-05-18 02:49:39 -05:00
Eric Eastwood  7abb745003  Fix lint  2023-05-18 02:32:05 -05:00
Eric Eastwood  17aeee764d  More comments  2023-05-18 02:31:12 -05:00
Eric Eastwood  333fc51e6c  Replace sketchy string manip with placeholders  2023-05-18 02:20:31 -05:00
Eric Eastwood  1f60fcb1af  Remove debug logs  2023-05-18 02:06:24 -05:00
Eric Eastwood  ab576b6b6b  Fix when the state_filter prevented us from returning any rows before  2023-05-18 01:59:34 -05:00
Eric Eastwood  3d80449d6b  Fix empty case  2023-05-17 21:57:13 -05:00
Eric Eastwood  5704e3b0fd  Fix lints  2023-05-17 20:37:19 -05:00
Eric Eastwood  02a9959a6f  Add changelog  2023-05-17 20:24:51 -05:00
Eric Eastwood  6a19afcdad  Re-use work from previous state_groups  2023-05-17 20:03:28 -05:00
Eric Eastwood  4676e53e65  Start of idea to re-use work of getting state for a given state_group  2023-05-17 18:57:59 -05:00
2 changed files with 116 additions and 19 deletions


@@ -0,0 +1 @@
Make `/messages` faster by efficiently grabbing state out of the database whenever we have to backfill and process new events.
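For illustration only (editorial note, not part of the change): the mechanism behind this changelog entry is that once the state for a lower-numbered state_group has been fetched, a higher-numbered group that links to it via the edge chain only needs its delta layered on top. A minimal Python sketch of that merge step, with hypothetical names:

# Minimal sketch of the re-use idea (hypothetical names, not Synapse code).
# `results` maps state_group -> state map we already computed.
# `delta` is the partial state fetched for the current group, and
# `state_group_reached` is the previously-fetched group we linked up to (or None).
def build_state_for_group(results, state_group_reached, delta):
    if state_group_reached in results:
        # Start from the state we already worked out and layer the delta on top.
        state = dict(results[state_group_reached])
        state.update(delta)
        return state
    # First group, or a group from an unrelated chain: the delta is everything.
    return dict(delta)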


@@ -13,7 +13,7 @@
# limitations under the License.
import logging
from typing import TYPE_CHECKING, Dict, List, Mapping, Optional, Tuple, Union
from typing import TYPE_CHECKING, Dict, List, Mapping, Optional, Set, Tuple, Union
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import (
@@ -89,6 +89,18 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
groups: List[int],
state_filter: Optional[StateFilter] = None,
) -> Mapping[int, StateMap[str]]:
"""
Given a number of state groups, fetch the latest state for each group.
Args:
txn: The transaction object.
groups: The given state groups that you want to fetch the latest state for.
state_filter: The state filter to apply when fetching state from the database.
Returns:
Map from state_group to a StateMap at that point.
"""
state_filter = state_filter or StateFilter.all()
results: Dict[int, MutableStateMap[str]] = {group: {} for group in groups}
@@ -98,24 +110,49 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
# a temporary hack until we can add the right indices in
txn.execute("SET LOCAL enable_seqscan=off")
# The below query walks the state_group tree so that the "state"
# The query below walks the state_group tree so that the "state"
# table includes all state_groups in the tree. It then joins
# against `state_groups_state` to fetch the latest state.
# It assumes that previous state groups are always numerically
# lesser.
# This may return multiple rows per (type, state_key), but last_value
# should be the same.
sql = """
WITH RECURSIVE sgs(state_group) AS (
VALUES(?::bigint)
WITH RECURSIVE sgs(state_group, state_group_reached) AS (
VALUES(?::bigint, NULL::bigint)
UNION ALL
SELECT prev_state_group FROM state_group_edges e, sgs s
WHERE s.state_group = e.state_group
SELECT
prev_state_group,
CASE
/* Specify state_groups we have already done the work for */
WHEN prev_state_group IN (%s /* state_groups_we_have_already_fetched_string */) THEN prev_state_group
ELSE NULL
END AS state_group_reached
FROM
state_group_edges e, sgs s
WHERE
s.state_group = e.state_group
/* Stop when we connect up to another state_group that we already did the work for */
AND s.state_group_reached IS NULL
)
%s
%s /* overall_select_clause */
"""
overall_select_query_args: List[Union[int, str]] = []
# Make sure we always have a row that tells us if we linked up to another
# state_group chain that we already processed (indicated by
# `state_group_reached`) regardless of whether we find any state according
# to the state_filter.
#
# We use a `UNION ALL` to make sure it is always the first row returned.
# `UNION` will merge and sort in with the rows from the next query
# otherwise.
overall_select_clause = """
(
SELECT NULL, NULL, NULL, state_group_reached
FROM sgs
ORDER BY state_group ASC
LIMIT 1
) UNION ALL (%s /* main_select_clause */)
"""
# This is an optimization to create a select clause per-condition. This
# makes the query planner a lot smarter on what rows should pull out in the
@@ -154,7 +191,7 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
f"""
(
SELECT DISTINCT ON (type, state_key)
type, state_key, event_id
type, state_key, event_id, state_group
FROM state_groups_state
INNER JOIN sgs USING (state_group)
WHERE {where_clause}
@@ -163,7 +200,7 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
"""
)
overall_select_clause = " UNION ".join(select_clause_list)
main_select_clause = " UNION ".join(select_clause_list)
else:
where_clause, where_args = state_filter.make_sql_filter_clause()
# Unless the filter clause is empty, we're going to append it after an
@@ -173,9 +210,9 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
overall_select_query_args.extend(where_args)
overall_select_clause = f"""
main_select_clause = f"""
SELECT DISTINCT ON (type, state_key)
type, state_key, event_id
type, state_key, event_id, state_group
FROM state_groups_state
WHERE state_group IN (
SELECT state_group FROM sgs
@@ -183,15 +220,73 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
ORDER BY type, state_key, state_group DESC
"""
for group in groups:
# We can sort from least to greatest state_group and re-use the work from a
# lesser state_group for a greater one if we see that the edge chain links
# up.
#
# What this means in practice is that if we fetch the latest state for
# `state_group = 20`, and then we want `state_group = 30`, it will traverse
# down the edge chain to `20`, see that we linked up to `20` and bail out
# early and re-use the work we did for `20`. This can have massive savings
# in rooms like Matrix HQ where the edge chain is 88k events long and
# fetching the mostly-same chain over and over isn't very efficient.
sorted_groups = sorted(groups)
state_groups_we_have_already_fetched: Set[int] = {
# We seed this with `-1` just to fill in the query with a value that
# has no effect but keeps the `IN (...)` clause valid when the set
# would otherwise be empty
-1
}
for group in sorted_groups:
args: List[Union[int, str]] = [group]
args.extend(state_groups_we_have_already_fetched)
args.extend(overall_select_query_args)
txn.execute(sql % (overall_select_clause,), args)
state_groups_we_have_already_fetched_string = ", ".join(
["?::bigint"] * len(state_groups_we_have_already_fetched)
)
txn.execute(
sql
% (
state_groups_we_have_already_fetched_string,
overall_select_clause % (main_select_clause,),
),
args,
)
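As a concrete example of the placeholder handling above (illustration only, toy values): the `%s` in the CTE is filled with one `?::bigint` per already-fetched group, and the ids themselves travel in `args` rather than being interpolated into the SQL string.

# Illustration only: the placeholder fragment generated for three entries.
already_fetched = {-1, 20, 25}
placeholders = ", ".join(["?::bigint"] * len(already_fetched))
# placeholders == "?::bigint, ?::bigint, ?::bigint"; the actual ids are passed
# separately as query arguments, so no id is ever spliced into the SQL text.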
# The first row is always our special `state_group_reached` row which
# tells us if we linked up to any other existing state_group that we
# already fetched and if so, which one we linked up to (see the `UNION
# ALL` above which drives this special row)
first_row = txn.fetchone()
if first_row:
_, _, _, state_group_reached = first_row
partial_state_map_for_state_group: MutableStateMap[str] = {}
for row in txn:
typ, state_key, event_id = row
typ, state_key, event_id, _state_group = row
key = (intern_string(typ), intern_string(state_key))
results[group][key] = event_id
partial_state_map_for_state_group[key] = event_id
# If we see a state_group edge link to a previous state_group that we
# already fetched from the database, link up the base state to the
# partial state we retrieved from the database to build on top of.
if state_group_reached in results:
resultant_state_map = dict(results[state_group_reached])
resultant_state_map.update(partial_state_map_for_state_group)
results[group] = resultant_state_map
else:
# It's also completely normal for us not to have a previous
# state_group to build on top of if this is the first group being
# processed or we are processing a bunch of groups from different
rooms which of course will never link together (completely
# different DAGs).
results[group] = partial_state_map_for_state_group
state_groups_we_have_already_fetched.add(group)
else:
max_entries_returned = state_filter.max_entries_returned()
@@ -201,8 +296,9 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
if where_clause:
where_clause = " AND (%s)" % (where_clause,)
# We don't use WITH RECURSIVE on sqlite3 as there are distributions
# that ship with an sqlite3 version that doesn't support it (e.g. wheezy)
# XXX: We could `WITH RECURSIVE` here since it's supported on SQLite 3.8.3
# or higher and our minimum supported version is greater than that. We just
# haven't put in the time to refactor this.
for group in groups:
next_group: Optional[int] = group
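Taking a step back (editorial illustration, not part of the diff): under the assumptions above, the Postgres branch's per-group loop now behaves roughly like this simplified Python sketch, where `fetch_delta_and_link` is a hypothetical stand-in for the recursive-CTE query.

# Simplified sketch of the per-group loop in the Postgres branch
# (hypothetical helper `fetch_delta_and_link`, illustration only).
def get_state_for_groups(groups, fetch_delta_and_link):
    results = {}
    already_fetched = {-1}  # harmless seed so the IN (...) clause is never empty
    for group in sorted(groups):
        # Returns the partial state rows for `group` plus the already-fetched
        # state_group it linked up to via the edge chain (or None).
        delta, state_group_reached = fetch_delta_and_link(group, already_fetched)
        # Build on top of previous work when we linked up to it.
        state = dict(results.get(state_group_reached, {}))
        state.update(delta)
        results[group] = state
        already_fetched.add(group)
    return results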