1
0

Merge pull request #5924 from matrix-org/rei/rss_inc6

Separated Statistics [6/7ish]
This commit is contained in:
reivilibre
2019-08-30 14:39:22 +01:00
committed by GitHub
2 changed files with 54 additions and 16 deletions
+12
View File
@@ -869,6 +869,17 @@ class RegistrationStore(
(user_id_obj.localpart, create_profile_with_displayname),
)
if self.hs.config.stats_enabled:
# we create a new completed user statistics row
# we don't strictly need current_token since this user really can't
# have any state deltas before now (as it is a new user), but still,
# we include it for completeness.
current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
self._update_stats_delta_txn(
txn, now, "user", user_id, {}, complete_with_stream_id=current_token
)
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
txn.call_after(self.is_guest.invalidate, (user_id,))
@@ -1140,6 +1151,7 @@ class RegistrationStore(
deferred str|None: A str representing a link to redirect the user
to if there is one.
"""
# Insert everything into a transaction in order to run atomically
def validate_threepid_session_txn(txn):
row = self._simple_select_one_txn(
+42 -16
View File
@@ -260,6 +260,10 @@ class StatsStore(StateDeltasStore):
(i.e. not deltas) of absolute fields.
Does not work with per-slice fields.
"""
if absolute_field_overrides is None:
absolute_field_overrides = {}
table, id_col = TYPE_TO_TABLE[stats_type]
quantised_ts = self.quantise_stats_time(int(ts))
@@ -290,9 +294,6 @@ class StatsStore(StateDeltasStore):
if key not in absolute_field_overrides
}
if absolute_field_overrides is None:
absolute_field_overrides = {}
if complete_with_stream_id is not None:
absolute_field_overrides = absolute_field_overrides.copy()
absolute_field_overrides[
@@ -321,10 +322,8 @@ class StatsStore(StateDeltasStore):
txn=txn,
into_table=table + "_historical",
keyvalues={id_col: stats_id},
extra_dst_keyvalues={
"end_ts": end_ts,
"bucket_size": self.stats_bucket_size,
},
extra_dst_insvalues={"bucket_size": self.stats_bucket_size},
extra_dst_keyvalues={"end_ts": end_ts},
additive_relatives=per_slice_additive_relatives,
src_table=table + "_current",
copy_columns=abs_field_names,
@@ -357,7 +356,7 @@ class StatsStore(StateDeltasStore):
]
insert_cols = []
qargs = [table]
qargs = []
for (key, val) in chain(
keyvalues.items(), absolutes.items(), additive_relatives.items()
@@ -368,20 +367,21 @@ class StatsStore(StateDeltasStore):
sql = """
INSERT INTO %(table)s (%(insert_cols_cs)s)
VALUES (%(insert_vals_qs)s)
ON CONFLICT DO UPDATE SET %(updates)s
ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
""" % {
"table": table,
"insert_cols_cs": ", ".join(insert_cols),
"insert_vals_qs": ", ".join(
["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
),
"key_columns": ", ".join(keyvalues),
"updates": ", ".join(chain(absolute_updates, relative_updates)),
}
txn.execute(sql, qargs)
else:
self.database_engine.lock_table(txn, table)
retcols = chain(absolutes.keys(), additive_relatives.keys())
retcols = list(chain(absolutes.keys(), additive_relatives.keys()))
current_row = self._simple_select_one_txn(
txn, table, keyvalues, retcols, allow_none=True
)
@@ -400,6 +400,7 @@ class StatsStore(StateDeltasStore):
into_table,
keyvalues,
extra_dst_keyvalues,
extra_dst_insvalues,
additive_relatives,
src_table,
copy_columns,
@@ -412,6 +413,8 @@ class StatsStore(StateDeltasStore):
keyvalues (dict[str, any]): Row-identifying key values
extra_dst_keyvalues (dict[str, any]): Additional keyvalues
for `into_table`.
extra_dst_insvalues (dict[str, any]): Additional values to insert
on new row creation for `into_table`.
additive_relatives (dict[str, any]): Fields that will be added onto
if existing row present. (Must be disjoint from copy_columns.)
src_table (str): The source table to copy from
@@ -421,18 +424,28 @@ class StatsStore(StateDeltasStore):
"""
if self.database_engine.can_native_upsert:
ins_columns = chain(
keyvalues, copy_columns, additive_relatives, extra_dst_keyvalues
keyvalues,
copy_columns,
additive_relatives,
extra_dst_keyvalues,
extra_dst_insvalues,
)
sel_exprs = chain(
keyvalues,
copy_columns,
("?" for _ in chain(additive_relatives, extra_dst_keyvalues)),
(
"?"
for _ in chain(
additive_relatives, extra_dst_keyvalues, extra_dst_insvalues
)
),
)
keyvalues_where = ("%s = ?" % f for f in keyvalues)
sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns)
sets_ar = (
"%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f) for f in copy_columns
"%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f)
for f in additive_relatives
)
sql = """
@@ -455,7 +468,14 @@ class StatsStore(StateDeltasStore):
"additional_where": additional_where,
}
qargs = chain(additive_relatives.values(), keyvalues.values())
qargs = list(
chain(
additive_relatives.values(),
extra_dst_keyvalues.values(),
extra_dst_insvalues.values(),
keyvalues.values(),
)
)
txn.execute(sql, qargs)
else:
self.database_engine.lock_table(txn, into_table)
@@ -466,12 +486,18 @@ class StatsStore(StateDeltasStore):
txn,
into_table,
keyvalues,
chain(additive_relatives.keys(), copy_columns),
retcols=list(chain(additive_relatives.keys(), copy_columns)),
allow_none=True,
)
if dest_current_row is None:
merged_dict = {**keyvalues, **src_row, **additive_relatives}
merged_dict = {
**keyvalues,
**extra_dst_keyvalues,
**extra_dst_insvalues,
**src_row,
**additive_relatives,
}
self._simple_insert_txn(txn, into_table, merged_dict)
else:
for (key, val) in additive_relatives.items():