|
|
|
@@ -21,7 +21,7 @@ from synapse.storage.database import (
|
|
|
|
|
LoggingDatabaseConnection,
|
|
|
|
|
LoggingTransaction,
|
|
|
|
|
)
|
|
|
|
|
from synapse.storage.engines import PostgresEngine
|
|
|
|
|
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
|
|
|
|
|
from synapse.types import MutableStateMap, StateMap
|
|
|
|
|
from synapse.types.state import StateFilter
|
|
|
|
|
from synapse.util.caches import intern_string
|
|
|
|
@@ -98,81 +98,115 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
|
|
|
|
|
# a temporary hack until we can add the right indices in
|
|
|
|
|
txn.execute("SET LOCAL enable_seqscan=off")
|
|
|
|
|
|
|
|
|
|
# The below query walks the state_group tree so that the "state"
|
|
|
|
|
# table includes all state_groups in the tree. It then joins
|
|
|
|
|
# against `state_groups_state` to fetch the latest state.
|
|
|
|
|
# It assumes that previous state groups are always numerically
|
|
|
|
|
# lesser.
|
|
|
|
|
# This may return multiple rows per (type, state_key), but last_value
|
|
|
|
|
# should be the same.
|
|
|
|
|
sql = """
|
|
|
|
|
WITH RECURSIVE sgs(state_group) AS (
|
|
|
|
|
VALUES(?::bigint)
|
|
|
|
|
UNION ALL
|
|
|
|
|
SELECT prev_state_group FROM state_group_edges e, sgs s
|
|
|
|
|
WHERE s.state_group = e.state_group
|
|
|
|
|
)
|
|
|
|
|
%s
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
overall_select_query_args: List[Union[int, str]] = []
|
|
|
|
|
|
|
|
|
|
# This is an optimization to create a select clause per-condition. This
|
|
|
|
|
# makes the query planner a lot smarter about which rows it should pull out in the
|
|
|
|
|
# first place and we end up with something that takes 10x less time to get a
|
|
|
|
|
# result.
|
|
|
|
|
use_condition_optimization = (
|
|
|
|
|
not state_filter.include_others and not state_filter.is_full()
|
|
|
|
|
# The below query walks the state_group tree so that the "state"
|
|
|
|
|
# table includes all state_groups in the tree. It then joins
|
|
|
|
|
# against `state_groups_state` to fetch the latest state.
|
|
|
|
|
# It assumes that previous state groups are always numerically
|
|
|
|
|
# lesser.
|
|
|
|
|
sql = """
|
|
|
|
|
WITH RECURSIVE sgs(state_group) AS (
|
|
|
|
|
VALUES(CAST(? AS bigint))
|
|
|
|
|
UNION ALL
|
|
|
|
|
SELECT prev_state_group FROM state_group_edges e, sgs s
|
|
|
|
|
WHERE s.state_group = e.state_group
|
|
|
|
|
)
|
|
|
|
|
state_filter_condition_combos: List[Tuple[str, Optional[str]]] = []
|
|
|
|
|
# We don't need to calculate this list if we're not using the condition
|
|
|
|
|
# optimization
|
|
|
|
|
if use_condition_optimization:
|
|
|
|
|
for etype, state_keys in state_filter.types.items():
|
|
|
|
|
if state_keys is None:
|
|
|
|
|
state_filter_condition_combos.append((etype, None))
|
|
|
|
|
else:
|
|
|
|
|
for state_key in state_keys:
|
|
|
|
|
state_filter_condition_combos.append((etype, state_key))
|
|
|
|
|
# And here is the optimization itself. We don't want to do the optimization
|
|
|
|
|
# if there are too many individual conditions. 10 is an arbitrary number
|
|
|
|
|
# with no testing behind it but we do know that we specifically made this
|
|
|
|
|
# optimization for when we grab the necessary state out for
|
|
|
|
|
# `filter_events_for_client` which just uses 2 conditions
|
|
|
|
|
# (`EventTypes.RoomHistoryVisibility` and `EventTypes.Member`).
|
|
|
|
|
if use_condition_optimization and len(state_filter_condition_combos) < 10:
|
|
|
|
|
select_clause_list: List[str] = []
|
|
|
|
|
for etype, skey in state_filter_condition_combos:
|
|
|
|
|
if skey is None:
|
|
|
|
|
where_clause = "(type = ?)"
|
|
|
|
|
overall_select_query_args.extend([etype])
|
|
|
|
|
else:
|
|
|
|
|
where_clause = "(type = ? AND state_key = ?)"
|
|
|
|
|
overall_select_query_args.extend([etype, skey])
|
|
|
|
|
%s
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
select_clause_list.append(
|
|
|
|
|
overall_select_query_args: List[Union[int, str]] = []
|
|
|
|
|
|
|
|
|
|
# This is an optimization to create a select clause per-condition. This
|
|
|
|
|
# makes the query planner a lot smarter about which rows it should pull out in the
|
|
|
|
|
# first place and we end up with something that takes 10x less time to get a
|
|
|
|
|
# result.
|
|
|
|
|
use_condition_optimization = (
|
|
|
|
|
not state_filter.include_others and not state_filter.is_full()
|
|
|
|
|
)
|
|
|
|
|
state_filter_condition_combos: List[Tuple[str, Optional[str]]] = []
|
|
|
|
|
# We only need to calculate this list if we're using the condition optimization
|
|
|
|
|
if use_condition_optimization:
|
|
|
|
|
for etype, state_keys in state_filter.types.items():
|
|
|
|
|
if state_keys is None:
|
|
|
|
|
state_filter_condition_combos.append((etype, None))
|
|
|
|
|
else:
|
|
|
|
|
for state_key in state_keys:
|
|
|
|
|
state_filter_condition_combos.append((etype, state_key))
|
|
|
|
|
# And here is the optimization itself. We don't want to do the optimization
|
|
|
|
|
# if there are too many individual conditions. 10 is an arbitrary number
|
|
|
|
|
# with no testing behind it but we do know that we specifically made this
|
|
|
|
|
# optimization for when we grab the necessary state out for
|
|
|
|
|
# `filter_events_for_client` which just uses 2 conditions
|
|
|
|
|
# (`EventTypes.RoomHistoryVisibility` and `EventTypes.Member`).
|
|
|
|
|
if use_condition_optimization and len(state_filter_condition_combos) < 10:
|
|
|
|
|
select_clause_list: List[str] = []
|
|
|
|
|
for etype, skey in state_filter_condition_combos:
|
|
|
|
|
if skey is None:
|
|
|
|
|
where_clause = "(type = ?)"
|
|
|
|
|
overall_select_query_args.extend([etype])
|
|
|
|
|
else:
|
|
|
|
|
where_clause = "(type = ? AND state_key = ?)"
|
|
|
|
|
overall_select_query_args.extend([etype, skey])
|
|
|
|
|
|
|
|
|
|
# Small helper function to wrap the union clause in parentheses if we're
|
|
|
|
|
# using postgres. This is because SQLite doesn't allow `LIMIT`/`ORDER`
|
|
|
|
|
# clauses in the union subquery but postgres does as long as they are
|
|
|
|
|
# wrapped in parentheses; this function hides that difference from the
|
|
|
|
|
# calling code.
|
|
|
|
|
def wrap_union_if_postgres(
    union_clause: str, extra_order_or_limit_clause: str = ""
) -> str:
    """Adapt one member of the `UNION` to the database engine in use.

    Args:
        union_clause: The `SELECT` statement forming one member of the union.
        extra_order_or_limit_clause: Optional trailing SQL (such as an
            `ORDER BY`/`LIMIT`) that is only applied on Postgres.

    Returns:
        On Postgres, `union_clause` followed by a space and
        `extra_order_or_limit_clause`, all enclosed in parentheses. On any
        other engine, `union_clause` unchanged (the extra clause is dropped).
    """
    running_on_postgres = isinstance(self.database_engine, PostgresEngine)
    if not running_on_postgres:
        # Non-Postgres engines get the bare clause without the extra suffix.
        return union_clause

    return f"({union_clause} {extra_order_or_limit_clause})"
|
|
|
|
|
|
|
|
|
|
# We could use `SELECT DISTINCT ON` here to align with the query below
|
|
|
|
|
# but that isn't compatible with SQLite and we can get away with `LIMIT
|
|
|
|
|
# 1` here instead because the `WHERE` clause will only ever match and
|
|
|
|
|
# target one event; and is simpler anyway. And it's better to use
|
|
|
|
|
# something that's simpler and compatible with both Database engines.
|
|
|
|
|
select_clause_list.append(
|
|
|
|
|
wrap_union_if_postgres(
|
|
|
|
|
# We only select `state_group` here for use in the `ORDER`
|
|
|
|
|
# clause later after the `UNION`
|
|
|
|
|
f"""
|
|
|
|
|
(
|
|
|
|
|
SELECT DISTINCT ON (type, state_key)
|
|
|
|
|
type, state_key, event_id
|
|
|
|
|
FROM state_groups_state
|
|
|
|
|
INNER JOIN sgs USING (state_group)
|
|
|
|
|
WHERE {where_clause}
|
|
|
|
|
ORDER BY type, state_key, state_group DESC
|
|
|
|
|
)
|
|
|
|
|
SELECT type, state_key, event_id, state_group
|
|
|
|
|
FROM state_groups_state
|
|
|
|
|
INNER JOIN sgs USING (state_group)
|
|
|
|
|
WHERE {where_clause}
|
|
|
|
|
""",
|
|
|
|
|
# The `ORDER BY`/`LIMIT` is an extra nicety that saves us from
|
|
|
|
|
# having to ferry a bunch of duplicate state pairs back from the
|
|
|
|
|
# database since we only need the one with the greatest
|
|
|
|
|
# state_group (most recent). Since this only applies to
|
|
|
|
|
# postgres, we do have to be careful to take care of the
|
|
|
|
|
# duplicate pairs in the downstream code when running with
|
|
|
|
|
# SQLite.
|
|
|
|
|
"""
|
|
|
|
|
ORDER BY type, state_key, state_group DESC
|
|
|
|
|
LIMIT 1
|
|
|
|
|
""",
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
overall_select_clause = " UNION ".join(select_clause_list)
|
|
|
|
|
else:
|
|
|
|
|
where_clause, where_args = state_filter.make_sql_filter_clause()
|
|
|
|
|
# Unless the filter clause is empty, we're going to append it after an
|
|
|
|
|
# existing where clause
|
|
|
|
|
if where_clause:
|
|
|
|
|
where_clause = " AND (%s)" % (where_clause,)
|
|
|
|
|
overall_select_clause = " UNION ".join(select_clause_list)
|
|
|
|
|
|
|
|
|
|
overall_select_query_args.extend(where_args)
|
|
|
|
|
if isinstance(self.database_engine, Sqlite3Engine):
|
|
|
|
|
# We `ORDER` after the union results because it's compatible with both
|
|
|
|
|
# Postgres and SQLite. And we need the rows to be ordered by
|
|
|
|
|
# `state_group` so the greatest state_group pairs are first and we only
|
|
|
|
|
# care about the first distinct (type, state_key) pair later on.
|
|
|
|
|
overall_select_clause += " ORDER BY type, state_key, state_group DESC"
|
|
|
|
|
else:
|
|
|
|
|
where_clause, where_args = state_filter.make_sql_filter_clause()
|
|
|
|
|
# Unless the filter clause is empty, we're going to append it after an
|
|
|
|
|
# existing where clause
|
|
|
|
|
if where_clause:
|
|
|
|
|
where_clause = " AND (%s)" % (where_clause,)
|
|
|
|
|
|
|
|
|
|
overall_select_query_args.extend(where_args)
|
|
|
|
|
|
|
|
|
|
if isinstance(self.database_engine, PostgresEngine):
|
|
|
|
|
overall_select_clause = f"""
|
|
|
|
|
SELECT DISTINCT ON (type, state_key)
|
|
|
|
|
type, state_key, event_id
|
|
|
|
@@ -182,69 +216,35 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
|
|
|
|
|
) {where_clause}
|
|
|
|
|
ORDER BY type, state_key, state_group DESC
|
|
|
|
|
"""
|
|
|
|
|
else:
|
|
|
|
|
# SQLite doesn't support `SELECT DISTINCT ON`, so we have to just get
|
|
|
|
|
# some potential duplicate (type, state_key) pairs and then only use the
|
|
|
|
|
# first of each kind we see.
|
|
|
|
|
overall_select_clause = f"""
|
|
|
|
|
SELECT type, state_key, event_id
|
|
|
|
|
FROM state_groups_state
|
|
|
|
|
WHERE state_group IN (
|
|
|
|
|
SELECT state_group FROM sgs
|
|
|
|
|
) {where_clause}
|
|
|
|
|
ORDER BY type, state_key, state_group DESC
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
for group in groups:
|
|
|
|
|
args: List[Union[int, str]] = [group]
|
|
|
|
|
args.extend(overall_select_query_args)
|
|
|
|
|
for group in groups:
|
|
|
|
|
args: List[Union[int, str]] = [group]
|
|
|
|
|
args.extend(overall_select_query_args)
|
|
|
|
|
|
|
|
|
|
txn.execute(sql % (overall_select_clause,), args)
|
|
|
|
|
for row in txn:
|
|
|
|
|
typ, state_key, event_id = row
|
|
|
|
|
key = (intern_string(typ), intern_string(state_key))
|
|
|
|
|
txn.execute(sql % (overall_select_clause,), args)
|
|
|
|
|
for row in txn:
|
|
|
|
|
# The `*_` rest syntax is to ignore the `state_group` column which we
|
|
|
|
|
# only select in the optimized case
|
|
|
|
|
typ, state_key, event_id, *_ = row
|
|
|
|
|
key = (intern_string(typ), intern_string(state_key))
|
|
|
|
|
# Deal with the potential duplicate (type, state_key) pairs from the
|
|
|
|
|
# SQLite specific query above. We only want to use the first row which
|
|
|
|
|
# is from the greatest state group (most-recent) because that is the
|
|
|
|
|
# applicable state in that state group.
|
|
|
|
|
if key not in results[group]:
|
|
|
|
|
results[group][key] = event_id
|
|
|
|
|
else:
|
|
|
|
|
max_entries_returned = state_filter.max_entries_returned()
|
|
|
|
|
|
|
|
|
|
where_clause, where_args = state_filter.make_sql_filter_clause()
|
|
|
|
|
# Unless the filter clause is empty, we're going to append it after an
|
|
|
|
|
# existing where clause
|
|
|
|
|
if where_clause:
|
|
|
|
|
where_clause = " AND (%s)" % (where_clause,)
|
|
|
|
|
|
|
|
|
|
# We don't use WITH RECURSIVE on sqlite3 as there are distributions
|
|
|
|
|
# that ship with an sqlite3 version that doesn't support it (e.g. wheezy)
|
|
|
|
|
for group in groups:
|
|
|
|
|
next_group: Optional[int] = group
|
|
|
|
|
|
|
|
|
|
while next_group:
|
|
|
|
|
# We did this before by getting the list of group ids, and
|
|
|
|
|
# then passing that list to sqlite to get latest event for
|
|
|
|
|
# each (type, state_key). However, that was terribly slow
|
|
|
|
|
# without the right indices (which we can't add until
|
|
|
|
|
# after we finish deduping state, which requires this func)
|
|
|
|
|
args = [next_group]
|
|
|
|
|
args.extend(where_args)
|
|
|
|
|
|
|
|
|
|
txn.execute(
|
|
|
|
|
"SELECT type, state_key, event_id FROM state_groups_state"
|
|
|
|
|
" WHERE state_group = ? " + where_clause,
|
|
|
|
|
args,
|
|
|
|
|
)
|
|
|
|
|
results[group].update(
|
|
|
|
|
((typ, state_key), event_id)
|
|
|
|
|
for typ, state_key, event_id in txn
|
|
|
|
|
if (typ, state_key) not in results[group]
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# If the number of entries in the (type,state_key)->event_id dict
|
|
|
|
|
# matches the number of (type,state_keys) types we were searching
|
|
|
|
|
# for, then we must have found them all, so no need to go walk
|
|
|
|
|
# further down the tree... UNLESS our types filter contained
|
|
|
|
|
# wildcards (i.e. Nones) in which case we have to do an exhaustive
|
|
|
|
|
# search
|
|
|
|
|
if (
|
|
|
|
|
max_entries_returned is not None
|
|
|
|
|
and len(results[group]) == max_entries_returned
|
|
|
|
|
):
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
next_group = self.db_pool.simple_select_one_onecol_txn(
|
|
|
|
|
txn,
|
|
|
|
|
table="state_group_edges",
|
|
|
|
|
keyvalues={"state_group": next_group},
|
|
|
|
|
retcol="prev_state_group",
|
|
|
|
|
allow_none=True,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# The results shouldn't be considered mutable.
|
|
|
|
|
return results
|
|
|
|
|