Merge pull request #5923 from matrix-org/rei/rss_inc5
Separated Statistics [5/7ish]
This commit is contained in:
@@ -91,25 +91,31 @@ class StatsHandler(StateDeltasHandler):
|
||||
return None
|
||||
|
||||
# Loop round handling deltas until we're up to date
|
||||
with Measure(self.clock, "stats_delta"):
|
||||
while True:
|
||||
|
||||
while True:
|
||||
with Measure(self.clock, "stats_delta"):
|
||||
deltas = yield self.store.get_current_state_deltas(
|
||||
self.pos["state_delta_stream_id"]
|
||||
)
|
||||
if not deltas:
|
||||
break
|
||||
|
||||
logger.debug("Handling %d state deltas", len(deltas))
|
||||
yield self._handle_deltas(deltas)
|
||||
|
||||
self.pos["state_delta_stream_id"] = deltas[-1]["stream_id"]
|
||||
yield self.store.update_stats_positions(self.pos)
|
||||
|
||||
event_processing_positions.labels("stats").set(
|
||||
self.pos["state_delta_stream_id"]
|
||||
)
|
||||
|
||||
if self.pos is not None:
|
||||
yield self.store.update_stats_positions(self.pos)
|
||||
# Then count deltas for total_events and total_event_bytes.
|
||||
with Measure(self.clock, "stats_total_events_and_bytes"):
|
||||
self.pos, had_counts = yield self.store.incremental_update_room_total_events_and_bytes(
|
||||
self.pos
|
||||
)
|
||||
|
||||
if not deltas and not had_counts:
|
||||
break
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _handle_deltas(self, deltas):
|
||||
|
||||
@@ -69,6 +69,7 @@ CREATE TABLE IF NOT EXISTS room_stats_current (
|
||||
|
||||
current_state_events INT NOT NULL,
|
||||
total_events INT NOT NULL,
|
||||
total_event_bytes BIGINT NOT NULL,
|
||||
joined_members INT NOT NULL,
|
||||
invited_members INT NOT NULL,
|
||||
left_members INT NOT NULL,
|
||||
@@ -91,6 +92,7 @@ CREATE TABLE IF NOT EXISTS room_stats_historical (
|
||||
|
||||
current_state_events INT NOT NULL,
|
||||
total_events INT NOT NULL,
|
||||
total_event_bytes BIGINT NOT NULL,
|
||||
joined_members INT NOT NULL,
|
||||
invited_members INT NOT NULL,
|
||||
left_members INT NOT NULL,
|
||||
|
||||
@@ -19,6 +19,7 @@ from itertools import chain
|
||||
|
||||
from twisted.internet.defer import DeferredLock
|
||||
|
||||
from synapse.storage import PostgresEngine
|
||||
from synapse.storage.state_deltas import StateDeltasStore
|
||||
from synapse.util.caches.descriptors import cached
|
||||
|
||||
@@ -36,6 +37,7 @@ ABSOLUTE_STATS_FIELDS = {
|
||||
"left_members",
|
||||
"banned_members",
|
||||
"total_events",
|
||||
"total_event_bytes",
|
||||
),
|
||||
"user": ("public_rooms", "private_rooms"),
|
||||
}
|
||||
@@ -475,3 +477,113 @@ class StatsStore(StateDeltasStore):
|
||||
for (key, val) in additive_relatives.items():
|
||||
src_row[key] = dest_current_row[key] + val
|
||||
self._simple_update_txn(txn, into_table, keyvalues, src_row)
|
||||
|
||||
def incremental_update_room_total_events_and_bytes(self, in_positions):
    """
    Counts the number of events and total event bytes per-room and then adds
    these to the respective total_events and total_event_bytes room counts.

    Args:
        in_positions (dict): Positions,
            as retrieved from L{get_stats_positions}.

    Returns (Deferred[tuple[dict, bool]]):
        First element (dict):
            The new positions. Note that this is for reference only –
            the new positions WILL be committed by this function.
        Second element (bool):
            true iff there was a change to the positions, false otherwise
    """

    def _incremental_totals_txn(txn):
        new_positions = dict(in_positions)

        high_watermark = self.get_room_max_stream_ordering()
        low_watermark = self.get_room_min_stream_ordering()

        # Count events added at the "new" end of the stream since the last
        # recorded maximum position.
        self.update_total_event_and_bytes_count_between_txn(
            txn,
            low_pos=new_positions["total_events_max_stream_ordering"],
            high_pos=high_watermark,
        )

        # Count backfilled events below the last recorded minimum position.
        self.update_total_event_and_bytes_count_between_txn(
            txn,
            low_pos=low_watermark,
            high_pos=new_positions["total_events_min_stream_ordering"],
        )

        # If neither end of the stream moved, there is nothing to commit.
        if (
            new_positions["total_events_max_stream_ordering"] == high_watermark
            and new_positions["total_events_min_stream_ordering"] == low_watermark
        ):
            return new_positions, False

        new_positions["total_events_max_stream_ordering"] = high_watermark
        new_positions["total_events_min_stream_ordering"] = low_watermark

        self._update_stats_positions_txn(txn, new_positions)

        return new_positions, True

    return self.runInteraction(
        "stats_incremental_total_events_and_bytes", _incremental_totals_txn
    )
|
||||
|
||||
def update_total_event_and_bytes_count_between_txn(self, txn, low_pos, high_pos):
    """
    Updates the total_events and total_event_bytes counts for rooms,
    in a range of stream_orderings.

    Inclusivity of low_pos and high_pos is dependent upon their signs.
    This makes it intuitive to use this function for both backfilled
    and non-backfilled events.

    Examples:
        (low, high) → (kind)
        (3, 7) → 3 < … <= 7 (normal-filled; low already processed before)
        (-4, -2) → -4 <= … < -2 (backfilled; high already processed before)
        (-7, 7) → -7 <= … <= 7 (both)

    Args:
        txn: Database transaction.
        low_pos: Low stream ordering
        high_pos: High stream ordering
    """

    if low_pos >= high_pos:
        # nothing to do here.
        return

    now = self.hs.clock.time_msec()

    # we choose comparators based on the signs
    low_comparator = "<=" if low_pos < 0 else "<"
    high_comparator = "<" if high_pos < 0 else "<="

    # event_json.json holds the raw event JSON; its length gives us the
    # byte count. SQLite stores it as text, so cast to BLOB to count bytes
    # rather than characters.
    if isinstance(self.database_engine, PostgresEngine):
        new_bytes_expression = "OCTET_LENGTH(json)"
    else:
        new_bytes_expression = "LENGTH(CAST(json AS BLOB))"

    # BUG FIX: the substitution tuple was previously
    # (low_comparator, high_comparator, new_bytes_expression), which put a
    # comparator inside SUM(...) and the byte expression into the WHERE
    # clause, producing invalid SQL. The template's placeholders are, in
    # order: SUM(%s), then the low comparator, then the high comparator.
    sql = """
        SELECT room_id, COUNT(*) AS new_events, SUM(%s) AS new_bytes
        FROM events INNER JOIN event_json USING (event_id)
        WHERE ? %s stream_ordering AND stream_ordering %s ?
        GROUP BY room_id
        """ % (
        new_bytes_expression,
        low_comparator,
        high_comparator,
    )

    txn.execute(sql, (low_pos, high_pos))

    # Fold the per-room counts into the room stats deltas.
    for room_id, new_events, new_bytes in txn.fetchall():
        self._update_stats_delta_txn(
            txn,
            now,
            "room",
            room_id,
            {"total_events": new_events, "total_event_bytes": new_bytes},
        )
|
||||
|
||||
Reference in New Issue
Block a user