WIP for updating the stats store
This commit is contained in:
@@ -210,8 +210,8 @@ class StatsHandler(object):
|
||||
|
||||
self.store.update_room_stats(
|
||||
room_id,
|
||||
now,
|
||||
{
|
||||
"ts": now,
|
||||
"bucket_size": self.stats_bucket_size,
|
||||
"current_state_events": current_state_events,
|
||||
"joined_members": joined_members,
|
||||
|
||||
@@ -502,7 +502,7 @@ class SQLBaseStore(object):
|
||||
|
||||
Args:
|
||||
table (str): The table to upsert into
|
||||
keyvalues (dict): The unique key tables and their new values
|
||||
keyvalues (dict): The unique key columns and their new values
|
||||
values (dict): The nonunique columns and their new values
|
||||
insertion_values (dict): additional key/values to use only when
|
||||
inserting
|
||||
|
||||
@@ -276,7 +276,7 @@ class GroupServerStore(SQLBaseStore):
|
||||
"category_id": category_id,
|
||||
"room_id": room_id,
|
||||
},
|
||||
values=to_update,
|
||||
updatevalues=to_update,
|
||||
)
|
||||
else:
|
||||
if is_public is None:
|
||||
@@ -562,7 +562,7 @@ class GroupServerStore(SQLBaseStore):
|
||||
"role_id": role_id,
|
||||
"user_id": user_id,
|
||||
},
|
||||
values=to_update,
|
||||
updatevalues=to_update,
|
||||
)
|
||||
else:
|
||||
if is_public is None:
|
||||
|
||||
@@ -33,6 +33,8 @@ CREATE TABLE user_stats (
|
||||
sent_file_size INT NOT NULL,
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX user_stats_user_ts ON user_stats(user_id, ts);
|
||||
|
||||
CREATE TABLE room_stats (
|
||||
room_id TEXT NOT NULL,
|
||||
ts BIGINT NOT NULL,
|
||||
@@ -48,6 +50,8 @@ CREATE TABLE room_stats (
|
||||
sent_events INT NOT NULL, -- number sent per timeslice
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX room_stats_room_ts ON room_stats(room_id, ts);
|
||||
|
||||
-- cache of current room state; useful for the publicRooms list
|
||||
CREATE TABLE room_state (
|
||||
room_id TEXT NOT NULL,
|
||||
@@ -60,6 +64,8 @@ CREATE TABLE room_state (
|
||||
-- get aliases straight from the right table
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX room_state_room ON room_state(room_id);
|
||||
|
||||
CREATE TABLE media_stats (
|
||||
ts BIGINT NOT NULL,
|
||||
bucket_size INT NOT NULL,
|
||||
@@ -67,4 +73,6 @@ CREATE TABLE media_stats (
|
||||
local_media_size INT NOT NULL,
|
||||
remote_media_count INT NOT NULL,
|
||||
remote_media_size INT NOT NULL,
|
||||
);
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX media_stats_ts ON media_stats(ts);
|
||||
|
||||
@@ -37,3 +37,94 @@ class StatsStore(StateDeltasStore):
|
||||
updatevalues={"stream_id": stream_id},
|
||||
desc="update_stats_stream_pos",
|
||||
)
|
||||
|
||||
def update_room_state(self, room_id, fields):
|
||||
return self._simple_upsert(
|
||||
table="room_state",
|
||||
keyvalues={
|
||||
"room_id": room_id,
|
||||
},
|
||||
values=fields,
|
||||
desc="update_room_state",
|
||||
)
|
||||
|
||||
def update_stats(self, stats_type, stats_id, ts, fields):
|
||||
return self._simple_upsert(
|
||||
table=("%s_stats" % stats_type),
|
||||
keyvalues={
|
||||
("%s_id" % stats_type): stats_id,
|
||||
"ts": ts,
|
||||
},
|
||||
updatevalues=fields,
|
||||
desc="update_stats",
|
||||
)
|
||||
|
||||
# these fields track relative numbers (e.g. number of events sent in this timeslice)
|
||||
RELATIVE_STATS_FIELDS = {
|
||||
"room": {
|
||||
"sent_events": True
|
||||
}
|
||||
}
|
||||
|
||||
# these fields track rather than absolutes (e.g. total number of rooms on the server)
|
||||
ABSOLUTE_STATS_FIELDS = {
|
||||
"room": (
|
||||
"current_state_events",
|
||||
"joined_members",
|
||||
"invited_members",
|
||||
"left_members",
|
||||
"banned_members",
|
||||
"state_events",
|
||||
"local_events",
|
||||
"remote_events",
|
||||
)
|
||||
}
|
||||
|
||||
def update_stats_delta(self, stats_type, stats_id, field, value):
|
||||
def _update_stats_delta(txn):
|
||||
table = "%s_stats" % stats_type
|
||||
id_col = "%s_id" % stats_type
|
||||
|
||||
sql = (
|
||||
"SELECT * FROM %s"
|
||||
" WHERE %s=? and ts=("
|
||||
" SELECT MAX(ts) FROM %s"
|
||||
" WHERE where %s=?"
|
||||
")"
|
||||
) % (table, id_col, table, id_col)
|
||||
txn.execute(sql, (stats_id, stats_id))
|
||||
rows = self.cursor_to_dict(txn)
|
||||
if len(rows) == 0:
|
||||
# silently skip as we don't have anything to apply a delta to yet.
|
||||
return
|
||||
|
||||
latest_ts = rows[0]["ts"]
|
||||
if ts != latest_ts:
|
||||
# we have to copy our absolute counters over to the new entry.
|
||||
self._simple_insert_txn(
|
||||
txn,
|
||||
table=table,
|
||||
values={
|
||||
id_col: stats_id,
|
||||
"ts": ts,
|
||||
key: rows[0][key]
|
||||
for key in ABSOLUTE_STATS_FIELDS[stats_type],
|
||||
}
|
||||
)
|
||||
|
||||
# actually update the new value
|
||||
self._simple_update_txn(
|
||||
txn,
|
||||
table=table,
|
||||
keyvalues={
|
||||
id_col: stats_id,
|
||||
"ts": ts,
|
||||
}
|
||||
updatevalues={
|
||||
field: value
|
||||
}
|
||||
)
|
||||
|
||||
return self.runInteraction(
|
||||
"update_stats_delta", _update_stats_delta
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user