Compare commits
10 Commits
v1.99.0
...
madlittlem
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8c8c9a6216 | ||
|
|
3b722499d2 | ||
|
|
51ff192af0 | ||
|
|
d42df2786e | ||
|
|
a790a45671 | ||
|
|
8b103f46a7 | ||
|
|
95a9e030e5 | ||
|
|
76c6e50f3f | ||
|
|
f0ee9f85d2 | ||
|
|
5b4f3ac0ce |
1
changelog.d/14531.misc
Normal file
1
changelog.d/14531.misc
Normal file
@@ -0,0 +1 @@
|
||||
Remove database differentiation (SQLite vs Postgres specific code) in `_get_state_groups_from_groups_txn`.
|
||||
@@ -21,7 +21,7 @@ from synapse.storage.database import (
|
||||
LoggingDatabaseConnection,
|
||||
LoggingTransaction,
|
||||
)
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
|
||||
from synapse.types import MutableStateMap, StateMap
|
||||
from synapse.types.state import StateFilter
|
||||
from synapse.util.caches import intern_string
|
||||
@@ -98,81 +98,115 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
|
||||
# a temporary hack until we can add the right indices in
|
||||
txn.execute("SET LOCAL enable_seqscan=off")
|
||||
|
||||
# The below query walks the state_group tree so that the "state"
|
||||
# table includes all state_groups in the tree. It then joins
|
||||
# against `state_groups_state` to fetch the latest state.
|
||||
# It assumes that previous state groups are always numerically
|
||||
# lesser.
|
||||
# This may return multiple rows per (type, state_key), but last_value
|
||||
# should be the same.
|
||||
sql = """
|
||||
WITH RECURSIVE sgs(state_group) AS (
|
||||
VALUES(?::bigint)
|
||||
UNION ALL
|
||||
SELECT prev_state_group FROM state_group_edges e, sgs s
|
||||
WHERE s.state_group = e.state_group
|
||||
)
|
||||
%s
|
||||
"""
|
||||
|
||||
overall_select_query_args: List[Union[int, str]] = []
|
||||
|
||||
# This is an optimization to create a select clause per-condition. This
|
||||
# makes the query planner a lot smarter on what rows should pull out in the
|
||||
# first place and we end up with something that takes 10x less time to get a
|
||||
# result.
|
||||
use_condition_optimization = (
|
||||
not state_filter.include_others and not state_filter.is_full()
|
||||
# The below query walks the state_group tree so that the "state"
|
||||
# table includes all state_groups in the tree. It then joins
|
||||
# against `state_groups_state` to fetch the latest state.
|
||||
# It assumes that previous state groups are always numerically
|
||||
# lesser.
|
||||
sql = """
|
||||
WITH RECURSIVE sgs(state_group) AS (
|
||||
VALUES(CAST(? AS bigint))
|
||||
UNION ALL
|
||||
SELECT prev_state_group FROM state_group_edges e, sgs s
|
||||
WHERE s.state_group = e.state_group
|
||||
)
|
||||
state_filter_condition_combos: List[Tuple[str, Optional[str]]] = []
|
||||
# We don't need to caclculate this list if we're not using the condition
|
||||
# optimization
|
||||
if use_condition_optimization:
|
||||
for etype, state_keys in state_filter.types.items():
|
||||
if state_keys is None:
|
||||
state_filter_condition_combos.append((etype, None))
|
||||
else:
|
||||
for state_key in state_keys:
|
||||
state_filter_condition_combos.append((etype, state_key))
|
||||
# And here is the optimization itself. We don't want to do the optimization
|
||||
# if there are too many individual conditions. 10 is an arbitrary number
|
||||
# with no testing behind it but we do know that we specifically made this
|
||||
# optimization for when we grab the necessary state out for
|
||||
# `filter_events_for_client` which just uses 2 conditions
|
||||
# (`EventTypes.RoomHistoryVisibility` and `EventTypes.Member`).
|
||||
if use_condition_optimization and len(state_filter_condition_combos) < 10:
|
||||
select_clause_list: List[str] = []
|
||||
for etype, skey in state_filter_condition_combos:
|
||||
if skey is None:
|
||||
where_clause = "(type = ?)"
|
||||
overall_select_query_args.extend([etype])
|
||||
else:
|
||||
where_clause = "(type = ? AND state_key = ?)"
|
||||
overall_select_query_args.extend([etype, skey])
|
||||
%s
|
||||
"""
|
||||
|
||||
select_clause_list.append(
|
||||
overall_select_query_args: List[Union[int, str]] = []
|
||||
|
||||
# This is an optimization to create a select clause per-condition. This
|
||||
# makes the query planner a lot smarter on what rows should pull out in the
|
||||
# first place and we end up with something that takes 10x less time to get a
|
||||
# result.
|
||||
use_condition_optimization = (
|
||||
not state_filter.include_others and not state_filter.is_full()
|
||||
)
|
||||
state_filter_condition_combos: List[Tuple[str, Optional[str]]] = []
|
||||
# We only need to caclculate this list if we're using the condition optimization
|
||||
if use_condition_optimization:
|
||||
for etype, state_keys in state_filter.types.items():
|
||||
if state_keys is None:
|
||||
state_filter_condition_combos.append((etype, None))
|
||||
else:
|
||||
for state_key in state_keys:
|
||||
state_filter_condition_combos.append((etype, state_key))
|
||||
# And here is the optimization itself. We don't want to do the optimization
|
||||
# if there are too many individual conditions. 10 is an arbitrary number
|
||||
# with no testing behind it but we do know that we specifically made this
|
||||
# optimization for when we grab the necessary state out for
|
||||
# `filter_events_for_client` which just uses 2 conditions
|
||||
# (`EventTypes.RoomHistoryVisibility` and `EventTypes.Member`).
|
||||
if use_condition_optimization and len(state_filter_condition_combos) < 10:
|
||||
select_clause_list: List[str] = []
|
||||
for etype, skey in state_filter_condition_combos:
|
||||
if skey is None:
|
||||
where_clause = "(type = ?)"
|
||||
overall_select_query_args.extend([etype])
|
||||
else:
|
||||
where_clause = "(type = ? AND state_key = ?)"
|
||||
overall_select_query_args.extend([etype, skey])
|
||||
|
||||
# Small helper function to wrap the union clause in parenthesis if we're
|
||||
# using postgres. This is because SQLite doesn't allow `LIMIT`/`ORDER`
|
||||
# clauses in the union subquery but postgres does as long as they are
|
||||
# wrapped in parenthesis which this function handles the complexity of
|
||||
# handling.
|
||||
def wrap_union_if_postgres(
|
||||
union_clause: str, extra_order_or_limit_clause: str = ""
|
||||
) -> str:
|
||||
if isinstance(self.database_engine, PostgresEngine):
|
||||
return f"""({union_clause} {extra_order_or_limit_clause})"""
|
||||
|
||||
return union_clause
|
||||
|
||||
# We could use `SELECT DISTINCT ON` here to align with the query below
|
||||
# but that isn't compatible with SQLite and we can get away with `LIMIT
|
||||
# 1` here instead because the `WHERE` clause will only ever match and
|
||||
# target one event; and is simpler anyway. And it's better to use
|
||||
# something that's simpler and compatible with both Database engines.
|
||||
select_clause_list.append(
|
||||
wrap_union_if_postgres(
|
||||
# We only select `state_group` here for use in the `ORDER`
|
||||
# clause later after the `UNION`
|
||||
f"""
|
||||
(
|
||||
SELECT DISTINCT ON (type, state_key)
|
||||
type, state_key, event_id
|
||||
FROM state_groups_state
|
||||
INNER JOIN sgs USING (state_group)
|
||||
WHERE {where_clause}
|
||||
ORDER BY type, state_key, state_group DESC
|
||||
)
|
||||
SELECT type, state_key, event_id, state_group
|
||||
FROM state_groups_state
|
||||
INNER JOIN sgs USING (state_group)
|
||||
WHERE {where_clause}
|
||||
""",
|
||||
# The `ORDER BY`/`LIMIT` is an extra nicety that saves us from
|
||||
# having to ferry a bunch of duplicate state pairs back from the
|
||||
# database since we only need the one with the greatest
|
||||
# state_group (most recent). Since this only applies to
|
||||
# postgres, we do have to be careful to take care of the
|
||||
# duplicate pairs in the downstream code when running with
|
||||
# SQLite.
|
||||
"""
|
||||
ORDER BY type, state_key, state_group DESC
|
||||
LIMIT 1
|
||||
""",
|
||||
)
|
||||
)
|
||||
|
||||
overall_select_clause = " UNION ".join(select_clause_list)
|
||||
else:
|
||||
where_clause, where_args = state_filter.make_sql_filter_clause()
|
||||
# Unless the filter clause is empty, we're going to append it after an
|
||||
# existing where clause
|
||||
if where_clause:
|
||||
where_clause = " AND (%s)" % (where_clause,)
|
||||
overall_select_clause = " UNION ".join(select_clause_list)
|
||||
|
||||
overall_select_query_args.extend(where_args)
|
||||
if isinstance(self.database_engine, Sqlite3Engine):
|
||||
# We `ORDER` after the union results because it's compatible with both
|
||||
# Postgres and SQLite. And we need the rows to by ordered by
|
||||
# `state_group` so the greatest state_group pairs are first and we only
|
||||
# care about the first distinct (type, state_key) pair later on.
|
||||
overall_select_clause += " ORDER BY type, state_key, state_group DESC"
|
||||
else:
|
||||
where_clause, where_args = state_filter.make_sql_filter_clause()
|
||||
# Unless the filter clause is empty, we're going to append it after an
|
||||
# existing where clause
|
||||
if where_clause:
|
||||
where_clause = " AND (%s)" % (where_clause,)
|
||||
|
||||
overall_select_query_args.extend(where_args)
|
||||
|
||||
if isinstance(self.database_engine, PostgresEngine):
|
||||
overall_select_clause = f"""
|
||||
SELECT DISTINCT ON (type, state_key)
|
||||
type, state_key, event_id
|
||||
@@ -182,69 +216,35 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
|
||||
) {where_clause}
|
||||
ORDER BY type, state_key, state_group DESC
|
||||
"""
|
||||
else:
|
||||
# SQLite doesn't support `SELECT DISTINCT ON`, so we have to just get
|
||||
# some potential duplicate (type, state_key) pairs and then only use the
|
||||
# first of each kind we see.
|
||||
overall_select_clause = f"""
|
||||
SELECT type, state_key, event_id
|
||||
FROM state_groups_state
|
||||
WHERE state_group IN (
|
||||
SELECT state_group FROM sgs
|
||||
) {where_clause}
|
||||
ORDER BY type, state_key, state_group DESC
|
||||
"""
|
||||
|
||||
for group in groups:
|
||||
args: List[Union[int, str]] = [group]
|
||||
args.extend(overall_select_query_args)
|
||||
for group in groups:
|
||||
args: List[Union[int, str]] = [group]
|
||||
args.extend(overall_select_query_args)
|
||||
|
||||
txn.execute(sql % (overall_select_clause,), args)
|
||||
for row in txn:
|
||||
typ, state_key, event_id = row
|
||||
key = (intern_string(typ), intern_string(state_key))
|
||||
txn.execute(sql % (overall_select_clause,), args)
|
||||
for row in txn:
|
||||
# The `*_` rest syntax is to ignore the `state_group` column which we
|
||||
# only select in the optimized case
|
||||
typ, state_key, event_id, *_ = row
|
||||
key = (intern_string(typ), intern_string(state_key))
|
||||
# Deal with the potential duplicate (type, state_key) pairs from the
|
||||
# SQLite specific query above. We only want to use the first row which
|
||||
# is from the greatest state group (most-recent) because that is that
|
||||
# applicable state in that state group.
|
||||
if key not in results[group]:
|
||||
results[group][key] = event_id
|
||||
else:
|
||||
max_entries_returned = state_filter.max_entries_returned()
|
||||
|
||||
where_clause, where_args = state_filter.make_sql_filter_clause()
|
||||
# Unless the filter clause is empty, we're going to append it after an
|
||||
# existing where clause
|
||||
if where_clause:
|
||||
where_clause = " AND (%s)" % (where_clause,)
|
||||
|
||||
# We don't use WITH RECURSIVE on sqlite3 as there are distributions
|
||||
# that ship with an sqlite3 version that doesn't support it (e.g. wheezy)
|
||||
for group in groups:
|
||||
next_group: Optional[int] = group
|
||||
|
||||
while next_group:
|
||||
# We did this before by getting the list of group ids, and
|
||||
# then passing that list to sqlite to get latest event for
|
||||
# each (type, state_key). However, that was terribly slow
|
||||
# without the right indices (which we can't add until
|
||||
# after we finish deduping state, which requires this func)
|
||||
args = [next_group]
|
||||
args.extend(where_args)
|
||||
|
||||
txn.execute(
|
||||
"SELECT type, state_key, event_id FROM state_groups_state"
|
||||
" WHERE state_group = ? " + where_clause,
|
||||
args,
|
||||
)
|
||||
results[group].update(
|
||||
((typ, state_key), event_id)
|
||||
for typ, state_key, event_id in txn
|
||||
if (typ, state_key) not in results[group]
|
||||
)
|
||||
|
||||
# If the number of entries in the (type,state_key)->event_id dict
|
||||
# matches the number of (type,state_keys) types we were searching
|
||||
# for, then we must have found them all, so no need to go walk
|
||||
# further down the tree... UNLESS our types filter contained
|
||||
# wildcards (i.e. Nones) in which case we have to do an exhaustive
|
||||
# search
|
||||
if (
|
||||
max_entries_returned is not None
|
||||
and len(results[group]) == max_entries_returned
|
||||
):
|
||||
break
|
||||
|
||||
next_group = self.db_pool.simple_select_one_onecol_txn(
|
||||
txn,
|
||||
table="state_group_edges",
|
||||
keyvalues={"state_group": next_group},
|
||||
retcol="prev_state_group",
|
||||
allow_none=True,
|
||||
)
|
||||
|
||||
# The results shouldn't be considered mutable.
|
||||
return results
|
||||
|
||||
Reference in New Issue
Block a user