1
0

Compare commits

..

6 Commits

Author SHA1 Message Date
Olivier Wilkinson (reivilibre)
3bf7d6457c Add logging to diagnose a suspicion 2021-12-20 17:01:31 +00:00
Olivier Wilkinson (reivilibre)
ecd5a03e46 Bypass caching from the old system 2021-12-20 16:45:43 +00:00
Olivier Wilkinson (reivilibre)
c26843c3bb Bleh logbugfix 2021-12-20 16:39:08 +00:00
Olivier Wilkinson (reivilibre)
4da1fdfd28 Change things around 2021-12-20 16:31:06 +00:00
Olivier Wilkinson (reivilibre)
7abb4bbe73 Log a little bit more 2021-12-20 16:21:33 +00:00
Olivier Wilkinson (reivilibre)
9858a2ca68 Add some logging and validation to help determine whether this all works or not :) 2021-12-20 13:21:30 +00:00

View File

@@ -13,6 +13,7 @@
# limitations under the License.
import logging
from collections import Counter
from typing import (
TYPE_CHECKING,
Collection,
@@ -52,6 +53,11 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
validation_logger = logging.getLogger(f"{__name__}.validation")
validation_counter: Counter[bool] = Counter()
effectiveness_counter: Counter[str] = Counter()
log_ticker = 0
MAX_STATE_DELTA_HOPS = 100
MAX_INFLIGHT_REQUESTS_PER_GROUP = 5
@@ -146,6 +152,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
self._state_group_inflight_requests: Dict[
int, SortedDict[StateFilter, ObservableDeferred[StateMap[str]]]
] = {}
validation_logger.info("(Validation Logging present)")
def get_max_state_group_txn(txn: Cursor) -> int:
txn.execute("SELECT COALESCE(max(id), 0) FROM state_groups")
@@ -402,6 +409,13 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
state_filter_left_over,
) = self._get_state_for_group_gather_inflight_requests(group, state_filter)
if state_filter_left_over == StateFilter.none():
effectiveness_counter["fully"] += 1
elif state_filter_left_over == state_filter:
effectiveness_counter["useless"] += 1
else:
effectiveness_counter["partially"] += 1
if state_filter_left_over != StateFilter.none():
# Fetch remaining state
remaining = await self._get_state_for_group_fire_request(
@@ -426,6 +440,40 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
async def _get_state_for_groups(
self, groups: Iterable[int], state_filter: Optional[StateFilter] = None
) -> Dict[int, MutableStateMap[str]]:
global log_ticker, effectiveness_counter, validation_counter
old_result = await self._OLD_get_state_for_groups(groups, state_filter)
new_result = await self._NEW_get_state_for_groups(groups, state_filter)
the_same = old_result == new_result
validation_counter[the_same] += 1
if not the_same:
validation_logger.critical(
"NOT THE SAME: for groups %r SF %r", groups, state_filter
)
log_ticker += 1
if log_ticker % 20 == 0:
validation_logger.info(
"LT=%d. Correct: %r. Effective: %r",
log_ticker,
validation_counter,
effectiveness_counter,
)
if len(set(groups)) != len(groups):
try:
raise RuntimeError("Inefficient")
except RuntimeError:
validation_logger.exception(
"Warning (not of bug, but fishy): len(s(g)) != len(g). g=%r.",
groups,
)
return old_result
async def _NEW_get_state_for_groups(
self, groups: Iterable[int], state_filter: Optional[StateFilter] = None
) -> Dict[int, MutableStateMap[str]]:
"""Gets the state at each of a list of state groups, optionally
filtering by type/state_key
@@ -476,6 +524,76 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
return state
async def _OLD_get_state_for_groups(
self, groups: Iterable[int], state_filter: Optional[StateFilter] = None
) -> Dict[int, MutableStateMap[str]]:
"""Gets the state at each of a list of state groups, optionally
filtering by type/state_key
Args:
groups: list of state groups for which we want
to get the state.
state_filter: The state filter used to fetch state
from the database.
Returns:
Dict of state group to state map.
"""
state_filter = state_filter or StateFilter.all()
member_filter, non_member_filter = state_filter.get_member_split()
# Now we look them up in the member and non-member caches
# (
# non_member_state,
# incomplete_groups_nm,
# ) = self._get_state_for_groups_using_cache(
# groups, self._state_group_cache, state_filter=non_member_filter
# )
#
# (member_state, incomplete_groups_m,) = self._get_state_for_groups_using_cache(
# groups, self._state_group_members_cache, state_filter=member_filter
# )
state = {}
# state = dict(non_member_state)
# for group in groups:
# state[group].update(member_state[group])
# Now fetch any missing groups from the database
# incomplete_groups = incomplete_groups_m | incomplete_groups_nm
# if not incomplete_groups:
# return state
incomplete_groups = groups
# cache_sequence_nm = self._state_group_cache.sequence
# cache_sequence_m = self._state_group_members_cache.sequence
# Help the cache hit ratio by expanding the filter a bit
db_state_filter = state_filter.return_expanded()
group_to_state_dict = await self._get_state_groups_from_groups(
list(incomplete_groups), state_filter=db_state_filter
)
# Now lets update the caches
# self._insert_into_cache(
# group_to_state_dict,
# db_state_filter,
# cache_seq_num_members=cache_sequence_m,
# cache_seq_num_non_members=cache_sequence_nm,
# )
# And finally update the result dict, by filtering out any extra
# stuff we pulled out of the database.
for group, group_state_dict in group_to_state_dict.items():
# We just replace any existing entries, as we will have loaded
# everything we need from the database anyway.
state[group] = state_filter.filter_state(group_state_dict)
return state
def _get_state_for_groups_using_cache(
self,
groups: Iterable[int],