Merge pull request #5941 from matrix-org/rei/rss_inc7
Separated Statistics [7/7ish]
This commit is contained in:
@@ -17,9 +17,12 @@
|
||||
import logging
|
||||
from itertools import chain
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.internet.defer import DeferredLock
|
||||
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.storage import PostgresEngine
|
||||
from synapse.storage.engines import Sqlite3Engine
|
||||
from synapse.storage.state_deltas import StateDeltasStore
|
||||
from synapse.util.caches.descriptors import cached
|
||||
|
||||
@@ -49,6 +52,9 @@ PER_SLICE_FIELDS = {"room": (), "user": ()}
|
||||
|
||||
# Maps each stats type to its stats table-name prefix and the ID column
# used in that table.
TYPE_TO_TABLE = {
    "room": ("room_stats", "room_id"),
    "user": ("user_stats", "user_id"),
}

# these are the tables (& ID columns) which contain our actual subjects
TYPE_TO_ORIGIN_TABLE = {
    "room": ("rooms", "room_id"),
    "user": ("users", "name"),
}
|
||||
|
||||
|
||||
class StatsStore(StateDeltasStore):
|
||||
def __init__(self, db_conn, hs):
|
||||
@@ -61,8 +67,18 @@ class StatsStore(StateDeltasStore):
|
||||
|
||||
self.stats_delta_processing_lock = DeferredLock()
|
||||
|
||||
self.register_noop_background_update("populate_stats_createtables")
|
||||
self.register_noop_background_update("populate_stats_process_rooms")
|
||||
self.register_background_update_handler(
|
||||
"populate_stats_prepare", self._populate_stats_prepare
|
||||
)
|
||||
self.register_background_update_handler(
|
||||
"populate_stats_process_rooms", self._populate_stats_process_rooms
|
||||
)
|
||||
self.register_background_update_handler(
|
||||
"populate_stats_process_users", self._populate_stats_process_users
|
||||
)
|
||||
# we no longer need to perform clean-up, but we will give ourselves
|
||||
# the potential to reintroduce it in the future – so documentation
|
||||
# will still encourage the use of this no-op handler.
|
||||
self.register_noop_background_update("populate_stats_cleanup")
|
||||
|
||||
def quantise_stats_time(self, ts):
|
||||
@@ -81,6 +97,456 @@ class StatsStore(StateDeltasStore):
|
||||
"""
|
||||
return (ts // self.stats_bucket_size) * self.stats_bucket_size
|
||||
|
||||
@defer.inlineCallbacks
def _unwedge_incremental_processor(self, forced_promise):
    """
    Record a promise about which positions this stats regeneration will
    cover, then hand the same positions to the incremental processor so
    that it can resume work straight away ('unwedging' it).

    Args:
        forced_promise (dict of positions):
            If not None, this exact promise is used.
            If None, a promise is built from the current maximum state
            delta stream ID and the room min/max stream orderings, which
            reduces the amount of work left to the incremental processor.
    """
    promised_positions = forced_promise

    if promised_positions is None:
        # nothing was forced upon us, so promise "everything up to now"
        delta_stream_id = yield self.get_max_stream_id_in_current_state_deltas()
        max_ordering = self.get_room_max_stream_ordering()
        min_ordering = self.get_room_min_stream_ordering()

        promised_positions = {
            "state_delta_stream_id": delta_stream_id,
            "total_events_min_stream_ordering": min_ordering,
            "total_events_max_stream_ordering": max_ordering,
        }

    # First pass (for_initial_processor=True) stores the promise for our own
    # reference later; second pass (False) is what actually unwedges the
    # incremental processor.
    for for_initial in (True, False):
        yield self.update_stats_positions(
            promised_positions, for_initial_processor=for_initial
        )

    # with the delta processor unwedged, now let it catch up in case
    # anything was missed during the wedge period
    self.clock.call_later(0, self.hs.get_stats_handler().notify_new_event)
|
||||
|
||||
@defer.inlineCallbacks
def _populate_stats_prepare(self, progress, batch_size):
    """
    Background update that gets the database ready for statistics
    regeneration.

    In order, it: wedges the incremental processor (while holding
    `stats_delta_processing_lock`), deletes incomplete `*_current` rows,
    unwedges the incremental processor, and finally creates skeleton rows
    for every room and user.

    Neither `progress` nor `batch_size` is consulted. Always returns 1
    after ending the "populate_stats_prepare" background update.
    """
    update_name = "populate_stats_prepare"

    if not self.stats_enabled:
        yield self._end_background_update(update_name)
        return 1

    def _wedge_incremental_processor(txn):
        """
        Atomically read the incremental processor's positions and then set
        them to NULL (which wedges it). Returns the positions as they were
        before wedging.
        """
        previous = self._get_stats_positions_txn(txn, for_initial_processor=False)
        self._update_stats_positions_txn(txn, None, for_initial_processor=False)
        return previous

    def _make_skeletons(txn, stats_type):
        """
        Insert a 'skeleton' (incomplete _stats_current row, with all counters
        zeroed and a NULL completed_delta_stream_id) for every subject of the
        given type that does not already have a row.
        """
        table, id_col = TYPE_TO_TABLE[stats_type]
        origin_table, origin_id_col = TYPE_TO_ORIGIN_TABLE[stats_type]
        zero_cols = list(ABSOLUTE_STATS_FIELDS[stats_type]) + list(
            PER_SLICE_FIELDS[stats_type]
        )

        substitutions = {
            "table": table,
            "id_col": id_col,
            "origin_id_col": origin_id_col,
            "origin_table": origin_table,
            "zero_cols": ", ".join(zero_cols),
            "zeroes": ", ".join(["0"] * len(zero_cols)),
        }

        # the "skip rows that already exist" clause is spelt differently
        # depending on the database engine
        if isinstance(self.database_engine, Sqlite3Engine):
            sql = """
                INSERT OR IGNORE INTO %(table)s_current
                (%(id_col)s, completed_delta_stream_id, %(zero_cols)s)
                SELECT %(origin_id_col)s, NULL, %(zeroes)s FROM %(origin_table)s
            """
        else:
            sql = """
                INSERT INTO %(table)s_current
                (%(id_col)s, completed_delta_stream_id, %(zero_cols)s)
                SELECT %(origin_id_col)s, NULL, %(zeroes)s FROM %(origin_table)s
                ON CONFLICT DO NOTHING
            """

        txn.execute(sql % substitutions)

    def _delete_dirty_skeletons(txn):
        """
        Delete pre-existing rows which are incomplete.
        """
        sql = """
            DELETE FROM %s_current
            WHERE completed_delta_stream_id IS NULL
        """

        for table, _id_col in TYPE_TO_TABLE.values():
            txn.execute(sql % (table,))

    # first wedge the incremental processor and reset our promise
    yield self.stats_delta_processing_lock.acquire()
    try:
        old_positions = yield self.runInteraction(
            "populate_stats_wedge", _wedge_incremental_processor
        )
    finally:
        yield self.stats_delta_processing_lock.release()

    # a partially-NULL set of positions is treated as no positions at all
    forced_promise = None if None in old_positions.values() else old_positions

    # with the incremental processor wedged, we delete dirty skeleton rows
    # since we don't want to double-count them.
    yield self.runInteraction(
        "populate_stats_delete_dirty_skeletons", _delete_dirty_skeletons
    )

    yield self._unwedge_incremental_processor(forced_promise)

    for stats_type in ("room", "user"):
        yield self.runInteraction(
            "populate_stats_make_skeletons", _make_skeletons, stats_type
        )

    self.get_earliest_token_for_stats.invalidate_all()

    yield self._end_background_update(update_name)
    return 1
|
||||
|
||||
@defer.inlineCallbacks
def _populate_stats_process_users(self, progress, batch_size):
    """
    This is a background update which regenerates statistics for users.

    Args:
        progress (dict): background update progress. Its "remaining" entry
            is overwritten with the number of incomplete user rows and is
            persisted when a batch is worked through to the end.
        batch_size (int): soft limit; once more than this many joined
            memberships have been counted, the call returns early.

    Returns:
        Deferred[int]: 1 if stats are disabled, no users are left, or the
        promised positions are missing (in each of those cases the
        background update is ended); otherwise the number of joined
        memberships counted during this call.
    """
    if not self.stats_enabled:
        yield self._end_background_update("populate_stats_process_users")
        return 1

    def _get_next_batch(txn):
        # Only fetch 250 users, so we don't fetch too many at once, even
        # if those 250 users have less than batch_size state events.
        sql = """
            SELECT user_id FROM user_stats_current
            WHERE completed_delta_stream_id IS NULL
            LIMIT 250
        """
        txn.execute(sql)
        users_to_work_on = txn.fetchall()

        if not users_to_work_on:
            return None

        # Get how many are left to process, so we can give status on how
        # far we are in processing
        sql = """
            SELECT COUNT(*) FROM user_stats_current
            WHERE completed_delta_stream_id IS NULL
        """
        txn.execute(sql)
        progress["remaining"] = txn.fetchone()[0]

        return users_to_work_on

    users_to_work_on = yield self.runInteraction(
        "populate_stats_users_get_batch", _get_next_batch
    )

    # No more users -- complete the transaction.
    if not users_to_work_on:
        yield self._end_background_update("populate_stats_process_users")
        return 1

    logger.info(
        "Processing the next %d users of %d remaining",
        len(users_to_work_on),
        progress["remaining"],
    )

    processed_membership_count = 0

    promised_positions = yield self.get_stats_positions(for_initial_processor=True)

    # promised_positions is a dict of position name -> value, so the values
    # must be inspected: `None in promised_positions` would only look at the
    # keys and this guard would never trigger.
    if None in promised_positions.values():
        logger.error(
            "There is a None in promised_positions;"
            " dependency task must not have been run."
            " promised_positions: %r",
            promised_positions,
        )
        yield self._end_background_update("populate_stats_process_users")
        return 1

    for (user_id,) in users_to_work_on:
        now = self.clock.time_msec()

        def _process_user(txn):
            # Get the current token
            current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)

            sql = """
                SELECT
                    (
                        join_rules = 'public'
                        OR history_visibility = 'world_readable'
                    ) AS is_public,
                    COUNT(*) AS count
                FROM room_memberships
                JOIN room_stats_state USING (room_id)
                WHERE
                    user_id = ? AND membership = 'join'
                GROUP BY is_public
            """
            txn.execute(sql, (user_id,))
            room_counts_by_publicness = dict(txn.fetchall())

            self._update_stats_delta_txn(
                txn,
                now,
                "user",
                user_id,
                {},
                complete_with_stream_id=current_token,
                absolute_field_overrides={
                    # these are counted absolutely because it is
                    # more difficult to count them from the promised time,
                    # because counting them now can use the quick lookup
                    # tables.
                    "public_rooms": room_counts_by_publicness.get(True, 0),
                    "private_rooms": room_counts_by_publicness.get(False, 0),
                },
            )

            # we use this count for rate-limiting
            return sum(room_counts_by_publicness.values())

        # the closure is run to completion before the next loop iteration,
        # so it always sees this iteration's user_id and now
        processed_membership_count += yield self.runInteraction(
            "update_user_stats", _process_user
        )

        # Update the remaining counter.
        progress["remaining"] -= 1

        if processed_membership_count > batch_size:
            # Don't process any more users, we've hit our batch size.
            # (Returning here skips persisting `progress` below.)
            return processed_membership_count

    yield self.runInteraction(
        "populate_stats",
        self._background_update_progress_txn,
        "populate_stats_process_users",
        progress,
    )

    return processed_membership_count
|
||||
|
||||
@defer.inlineCallbacks
def _populate_stats_process_rooms(self, progress, batch_size):
    """
    This is a background update which regenerates statistics for rooms.

    Args:
        progress (dict): background update progress. Its "remaining" entry
            is overwritten with the number of incomplete room rows and is
            persisted when a batch is worked through to the end.
        batch_size (int): soft limit; once more than this many events have
            been counted, the call returns early.

    Returns:
        Deferred[int]: 1 if stats are disabled, no rooms are left, or the
        promised positions are missing (in each of those cases the
        background update is ended); otherwise the number of events
        counted during this call.
    """
    if not self.stats_enabled:
        yield self._end_background_update("populate_stats_process_rooms")
        return 1

    def _get_next_batch(txn):
        # Only fetch 250 rooms, so we don't fetch too many at once, even
        # if those 250 rooms have less than batch_size state events.
        sql = """
            SELECT room_id FROM room_stats_current
            WHERE completed_delta_stream_id IS NULL
            LIMIT 250
        """
        txn.execute(sql)
        rooms_to_work_on = txn.fetchall()

        if not rooms_to_work_on:
            return None

        # Get how many are left to process, so we can give status on how
        # far we are in processing
        sql = """
            SELECT COUNT(*) FROM room_stats_current
            WHERE completed_delta_stream_id IS NULL
        """
        txn.execute(sql)
        progress["remaining"] = txn.fetchone()[0]

        return rooms_to_work_on

    rooms_to_work_on = yield self.runInteraction(
        "populate_stats_rooms_get_batch", _get_next_batch
    )

    # No more rooms -- complete the transaction.
    if not rooms_to_work_on:
        yield self._end_background_update("populate_stats_process_rooms")
        return 1

    logger.info(
        "Processing the next %d rooms of %d remaining",
        len(rooms_to_work_on),
        progress["remaining"],
    )

    # Number of state events we've processed by going through each room
    processed_event_count = 0

    promised_positions = yield self.get_stats_positions(for_initial_processor=True)

    # promised_positions is a dict (it is indexed by name further down), so
    # the values must be inspected: `None in promised_positions` would only
    # look at the keys and this guard would never trigger.
    if None in promised_positions.values():
        logger.error(
            "There is a None in promised_positions;"
            " dependency task must not have been run."
            " promised_positions: %s",
            promised_positions,
        )
        yield self._end_background_update("populate_stats_process_rooms")
        return 1

    for (room_id,) in rooms_to_work_on:
        current_state_ids = yield self.get_current_state_ids(room_id)

        join_rules_id = current_state_ids.get((EventTypes.JoinRules, ""))
        history_visibility_id = current_state_ids.get(
            (EventTypes.RoomHistoryVisibility, "")
        )
        encryption_id = current_state_ids.get((EventTypes.RoomEncryption, ""))
        name_id = current_state_ids.get((EventTypes.Name, ""))
        topic_id = current_state_ids.get((EventTypes.Topic, ""))
        avatar_id = current_state_ids.get((EventTypes.RoomAvatar, ""))
        canonical_alias_id = current_state_ids.get((EventTypes.CanonicalAlias, ""))

        event_ids = [
            join_rules_id,
            history_visibility_id,
            encryption_id,
            name_id,
            topic_id,
            avatar_id,
            canonical_alias_id,
        ]

        # only look up the state events that the room actually has
        state_events = yield self.get_events(
            [ev for ev in event_ids if ev is not None]
        )

        def _get_or_none(event_id, arg):
            # content field `arg` of the given event, or None if the event
            # is absent (or falsy)
            event = state_events.get(event_id)
            if event:
                return event.content.get(arg)
            return None

        yield self.update_room_state(
            room_id,
            {
                "join_rules": _get_or_none(join_rules_id, "join_rule"),
                "history_visibility": _get_or_none(
                    history_visibility_id, "history_visibility"
                ),
                "encryption": _get_or_none(encryption_id, "algorithm"),
                "name": _get_or_none(name_id, "name"),
                "topic": _get_or_none(topic_id, "topic"),
                "avatar": _get_or_none(avatar_id, "url"),
                "canonical_alias": _get_or_none(canonical_alias_id, "alias"),
            },
        )

        now = self.clock.time_msec()

        def _fetch_data(txn):
            # Get the current token of the room
            current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)

            current_state_events = len(current_state_ids)

            membership_counts = self._get_user_counts_in_room_txn(txn, room_id)

            # events are only counted within the promised stream-ordering
            # range
            (
                room_total_event_count,
                room_total_event_bytes,
            ) = self._count_events_and_bytes_in_room_txn(
                txn,
                room_id,
                promised_positions["total_events_min_stream_ordering"],
                promised_positions["total_events_max_stream_ordering"],
            )

            self._update_stats_delta_txn(
                txn,
                now,
                "room",
                room_id,
                {
                    "total_events": room_total_event_count,
                    "total_event_bytes": room_total_event_bytes,
                },
                complete_with_stream_id=current_token,
                absolute_field_overrides={
                    # these are counted absolutely because it is
                    # more difficult to count them from the promised time,
                    # because counting them now can use the quick lookup
                    # tables.
                    "current_state_events": current_state_events,
                    "joined_members": membership_counts.get(Membership.JOIN, 0),
                    "invited_members": membership_counts.get(Membership.INVITE, 0),
                    "left_members": membership_counts.get(Membership.LEAVE, 0),
                    "banned_members": membership_counts.get(Membership.BAN, 0),
                },
            )

            # we use this count for rate-limiting
            return room_total_event_count

        # the closure is run to completion before the next loop iteration,
        # so it always sees this iteration's room_id, now and state IDs
        room_event_count = yield self.runInteraction(
            "update_room_stats", _fetch_data
        )

        # Update the remaining counter.
        progress["remaining"] -= 1

        processed_event_count += room_event_count

        if processed_event_count > batch_size:
            # Don't process any more rooms, we've hit our batch size.
            # (Returning here skips persisting `progress` below.)
            return processed_event_count

    yield self.runInteraction(
        "populate_stats",
        self._background_update_progress_txn,
        "populate_stats_process_rooms",
        progress,
    )

    return processed_event_count
|
||||
|
||||
def get_stats_positions(self, for_initial_processor=False):
|
||||
"""
|
||||
Returns the stats processor positions.
|
||||
@@ -581,7 +1047,7 @@ class StatsStore(StateDeltasStore):
|
||||
# nothing to do here.
|
||||
return
|
||||
|
||||
now = self.hs.clock.time_msec()
|
||||
now = self.clock.time_msec()
|
||||
|
||||
# we choose comparators based on the signs
|
||||
low_comparator = "<=" if low_pos < 0 else "<"
|
||||
@@ -613,3 +1079,38 @@ class StatsStore(StateDeltasStore):
|
||||
room_id,
|
||||
{"total_events": new_events, "total_event_bytes": new_bytes},
|
||||
)
|
||||
|
||||
def _count_events_and_bytes_in_room_txn(self, txn, room_id, low_token, high_token):
    """
    Count the number of events and event bytes in a room between two tokens,
    inclusive.

    Args:
        txn (cursor): The database
        room_id (str): The ID of the room to count events for
        low_token (int): the minimum stream ordering to count
        high_token (int): the maximum stream ordering to count

    Returns (tuple[int, int]):
        First element (int):
            the number of events
        Second element (int):
            the number of bytes in events' event JSON
            (0 when no events fall within the range)
    """

    # the byte-length function differs between database engines
    if isinstance(self.database_engine, PostgresEngine):
        bytes_expression = "OCTET_LENGTH(json)"
    else:
        bytes_expression = "LENGTH(CAST(json AS BLOB))"

    # SUM() over zero rows yields NULL rather than 0, so it is wrapped in
    # COALESCE to keep the byte count an integer, as documented above.
    sql = """
        SELECT COUNT(*) AS num_events, COALESCE(SUM(%s), 0) AS num_bytes
        FROM events
        JOIN event_json USING (event_id)
        WHERE events.room_id = ?
            AND ? <= stream_ordering
            AND stream_ordering <= ?
    """ % (
        bytes_expression,
    )
    txn.execute(sql, (room_id, low_token, high_token))
    return txn.fetchone()
|
||||
|
||||
Reference in New Issue
Block a user