1
0

Add counting for total_events in room statistics.

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
This commit is contained in:
Olivier Wilkinson (reivilibre)
2019-07-22 17:32:19 +01:00
parent df3fafa827
commit 0887bb3052
2 changed files with 129 additions and 58 deletions

View File

@@ -88,20 +88,42 @@ class StatsHandler(StateDeltasHandler):
if self.pos is None:
defer.returnValue(None)
# Loop round handling deltas until we're up to date
while True:
with Measure(self.clock, "stats_delta"):
deltas = yield self.store.get_current_state_deltas(self.pos)
max_pos = yield self.store.get_room_max_stream_ordering()
if max_pos == self.pos:
return
pos_of_delta = self.pos
with Measure(self.clock, "stats_delta"):
# Loop round handling deltas until we're up to date with deltas
while True:
# note that get_current_state_deltas isn't greedy
# it is limited
deltas = yield self.store.get_current_state_deltas(pos_of_delta)
if not deltas:
return
break
logger.info("Handling %d state deltas", len(deltas))
yield self._handle_deltas(deltas)
self.pos = deltas[-1]["stream_id"]
yield self.store.update_stats_stream_pos(self.pos)
pos_of_delta = deltas[-1]["stream_id"]
event_processing_positions.labels("stats").set(self.pos)
if pos_of_delta > self.pos:
new_pos = max(pos_of_delta, max_pos)
else:
new_pos = max_pos
with Measure(self.clock, "stats_total_events"):
yield self.store.update_total_event_count_between(
old_pos=self.pos, new_pos=new_pos
)
self.pos = new_pos
yield self.store.update_stats_stream_pos(self.pos)
event_processing_positions.labels("stats").set(self.pos)
@defer.inlineCallbacks
def _handle_deltas(self, deltas):
@@ -268,12 +290,15 @@ class StatsHandler(StateDeltasHandler):
now,
{
"bucket_size": self.stats_bucket_size,
# This m.room.create state event is indeed a state event
# so we count it as one.
"current_state_events": 1,
"joined_members": 0,
"invited_members": 0,
"left_members": 0,
"banned_members": 0,
# this column is disused but not yet removed from the
"total_events": 0,
# This column is disused but not yet removed from the
# schema, so fill with -1.
"state_events": -1,
},

View File

@@ -97,7 +97,7 @@ class StatsStore(StateDeltasStore):
sql = (
"CREATE TABLE IF NOT EXISTS "
+ TEMP_TABLE
+ "_position(position TEXT NOT NULL)"
+ "_position(position BIGINT NOT NULL)"
)
txn.execute(sql)
@@ -258,6 +258,8 @@ class StatsStore(StateDeltasStore):
membership_counts = self._get_user_counts_in_room_txn(txn, room_id)
room_total_event_count = self._count_events_in_room_txn(txn, room_id, current_token)
self._update_stats_txn(
txn,
"room",
@@ -270,6 +272,7 @@ class StatsStore(StateDeltasStore):
"invited_members": membership_counts.get(Membership.INVITE, 0),
"left_members": membership_counts.get(Membership.LEAVE, 0),
"banned_members": membership_counts.get(Membership.BAN, 0),
"total_events": room_total_event_count,
# this column is disused but not (yet) removed from the
# schema, so we fill it with -1.
"state_events": -1,
@@ -425,56 +428,99 @@ class StatsStore(StateDeltasStore):
)
def update_stats_delta(self, ts, stats_type, stats_id, field, value):
def _update_stats_delta(txn):
table, id_col = TYPE_TO_ROOM[stats_type]
return self.runInteraction("update_stats_delta", self._update_stats_delta_txn, ts, stats_type, stats_id, field, value)
sql = (
"SELECT * FROM %s"
" WHERE %s=? and ts=("
" SELECT MAX(ts) FROM %s"
" WHERE %s=?"
")"
) % (table, id_col, table, id_col)
txn.execute(sql, (stats_id, stats_id))
rows = self.cursor_to_dict(txn)
if len(rows) == 0:
# silently skip as we don't have anything to apply a delta to yet.
# this tries to minimise any race between the initial sync and
# subsequent deltas arriving.
return
def _update_stats_delta_txn(self, txn, ts, stats_type, stats_id, field, value):
table, id_col = TYPE_TO_ROOM[stats_type]
current_ts = ts
latest_ts = rows[0]["ts"]
if current_ts < latest_ts:
# This one is in the past, but we're just encountering it now.
# Mark it as part of the current bucket.
current_ts = latest_ts
elif ts != latest_ts:
# we have to copy our absolute counters over to the new entry.
values = {
key: rows[0][key] for key in ABSOLUTE_STATS_FIELDS[stats_type]
}
values[id_col] = stats_id
values["ts"] = ts
values["bucket_size"] = self.stats_bucket_size
sql = (
"SELECT * FROM %s"
" WHERE %s=? and ts=("
" SELECT MAX(ts) FROM %s"
" WHERE %s=?"
")"
) % (table, id_col, table, id_col)
txn.execute(sql, (stats_id, stats_id))
rows = self.cursor_to_dict(txn)
if len(rows) == 0:
# silently skip as we don't have anything to apply a delta to yet.
# this tries to minimise any race between the initial sync and
# subsequent deltas arriving.
return
self._simple_insert_txn(txn, table=table, values=values)
current_ts = ts
latest_ts = rows[0]["ts"]
if current_ts < latest_ts:
# This one is in the past, but we're just encountering it now.
# Mark it as part of the current bucket.
current_ts = latest_ts
elif ts != latest_ts:
# we have to copy our absolute counters over to the new entry.
values = {
key: rows[0][key] for key in ABSOLUTE_STATS_FIELDS[stats_type]
}
values[id_col] = stats_id
values["ts"] = ts
values["bucket_size"] = self.stats_bucket_size
# actually update the new value
if stats_type in ABSOLUTE_STATS_FIELDS[stats_type]:
self._simple_update_txn(
txn,
table=table,
keyvalues={id_col: stats_id, "ts": current_ts},
updatevalues={field: value},
)
else:
sql = ("UPDATE %s SET %s=%s+? WHERE %s=? AND ts=?") % (
table,
field,
field,
id_col,
)
txn.execute(sql, (value, stats_id, current_ts))
self._simple_insert_txn(txn, table=table, values=values)
return self.runInteraction("update_stats_delta", _update_stats_delta)
# actually update the new value
if stats_type in ABSOLUTE_STATS_FIELDS[stats_type]:
self._simple_update_txn(
txn,
table=table,
keyvalues={id_col: stats_id, "ts": current_ts},
updatevalues={field: value},
)
else:
sql = ("UPDATE %s SET %s=%s+? WHERE %s=? AND ts=?") % (
table,
field,
field,
id_col,
)
txn.execute(sql, (value, stats_id, current_ts))
def update_total_event_count_between(self, old_pos, new_pos):
"""
Updates the total_events counts for rooms
Args:
old_pos: The old stream position (stream position of the last event
that was already handled.)
new_pos: The new stream position (stream position of the new last
event to handle.)
"""
# TODO pass in now or calc here?
now = self.hs.get_reactor().seconds()
# quantise time to the nearest bucket
now = (now // self.stats_bucket_size) * self.stats_bucket_size
def _update_total_event_count_between(txn):
sql = """
SELECT room_id, COUNT(*) AS new_events
FROM events
WHERE ? < stream_ordering AND stream_ordering <= ?
GROUP BY room_id
"""
txn.execute(sql, (old_pos, new_pos))
for room_id, new_events in txn:
self._update_stats_delta_txn(txn, now, "room", room_id, "total_events", new_events)
return self.runInteraction("update_total_event_count_between", _update_total_event_count_between)
def _count_events_in_room_txn(self, txn, room_id, up_to_token):
sql = """
SELECT COUNT(*) AS num_events
FROM events
WHERE room_id = ? AND stream_ordering <= ?
"""
txn.execute(sql, (room_id, up_to_token))
return txn.fetchone()[0]