1
0

Introduce total_events tracking and rework statistics tracking.

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
This commit is contained in:
Olivier Wilkinson (reivilibre)
2019-08-08 11:37:29 +01:00
parent 5ca4cd5ad4
commit cf9f7ae366

View File

@@ -19,8 +19,10 @@ from itertools import chain
from threading import Lock
from twisted.internet import defer
from twisted.internet.defer import ensureDeferred
from synapse.api.constants import EventTypes, Membership
from synapse.storage.engines import Sqlite3Engine
from synapse.storage.prepare_database import get_statements
from synapse.storage.state_deltas import StateDeltasStore
from synapse.util.caches.descriptors import cached
@@ -35,14 +37,21 @@ ABSOLUTE_STATS_FIELDS = {
"invited_members",
"left_members",
"banned_members",
"state_events",
"total_events", # TODO review this list
),
"user": ("public_rooms", "private_rooms"),
}
TYPE_TO_ROOM = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
# these fields are per-timeslice and so should be reset to 0 upon a new slice
PER_SLICE_FIELDS = {"room": (), "user": ()}
TEMP_TABLE = "_temp_populate_stats"
TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
class OldCollectionRequired(Exception):
    """
    Raised when stale stats rows must be rolled into the historical tables
    before the attempted write can be retried.
    """
class StatsStore(StateDeltasStore):
@@ -57,120 +66,295 @@ class StatsStore(StateDeltasStore):
self.stats_delta_processing_lock = Lock()
self.register_background_update_handler(
"populate_stats_createtables", self._populate_stats_createtables
"populate_stats_prepare", self._populate_stats_prepare
)
self.register_background_update_handler(
"populate_stats_process_rooms", self._populate_stats_process_rooms
)
self.register_background_update_handler(
"populate_stats_process_users", self._populate_stats_process_users
)
self.register_background_update_handler(
"populate_stats_cleanup", self._populate_stats_cleanup
)
def quantise_stats_time(self, ts):
    """
    Round a timestamp down to the start of its stats bucket.

    Args:
        ts (int): a timestamp, in the same units as self.stats_bucket_size

    Returns:
        int: the largest multiple of self.stats_bucket_size that is <= ts
    """
    return ts - (ts % self.stats_bucket_size)
async def _unwedge_incremental_processor(self, forced_promise):
"""
Make a promise about what this initial background count will handle,
so that we can allow the incremental processor to start doing things
right away 'unwedging' it.
"""
if forced_promise is None:
promised_stats_delta_pos = (
await self.get_max_stream_id_in_current_state_deltas()
)
promised_max = self.get_room_max_stream_ordering()
promised_min = self.get_room_min_stream_ordering()
promised_positions = {
"state_delta_stream_id": promised_stats_delta_pos,
"total_events_min_stream_ordering": promised_min,
"total_events_max_stream_ordering": promised_max,
}
else:
promised_positions = forced_promise
# this stores it for our reference later
await self.update_stats_positions(
promised_positions, for_initial_processor=True
)
# this unwedges the incremental processor
await self.update_stats_positions(
promised_positions, for_initial_processor=False
)
@defer.inlineCallbacks
def _populate_stats_createtables(self, progress, batch_size):
def _populate_stats_prepare(self, progress, batch_size):
if not self.stats_enabled:
yield self._end_background_update("populate_stats_createtables")
yield self._end_background_update("populate_stats_prepare")
defer.returnValue(1)
# Get all the rooms that we want to process.
def _make_staging_area(txn):
# Create the temporary tables
stmts = get_statements(
def _wedge_incremental_processor(txn):
"""
Wedge the incremental processor (by setting its positions to NULL),
and return its previous positions atomically.
"""
with self.stats_delta_processing_lock:
old = self._get_stats_positions_txn(txn, for_initial_processor=False)
self._update_stats_positions_txn(txn, None, for_initial_processor=False)
return old
def _make_skeletons(txn):
"""
Get all the rooms and users that we want to process, and create
'skeletons' (incomplete _stats_current rows) for them, if they do
not already have a row.
"""
if isinstance(self.database_engine, Sqlite3Engine):
sqls = """
INSERT OR IGNORE INTO room_stats_current (room_id)
SELECT room_id FROM rooms;
INSERT OR IGNORE INTO user_stats_current (user_id)
SELECT name AS user_id FROM users;
"""
-- We just recreate the table, we'll be reinserting the
-- correct entries again later anyway.
DROP TABLE IF EXISTS {temp}_rooms;
else:
sqls = """
INSERT INTO room_stats_current (room_id)
SELECT room_id FROM rooms
ON CONFLICT DO NOTHING;
CREATE TABLE IF NOT EXISTS {temp}_rooms(
room_id TEXT NOT NULL,
events BIGINT NOT NULL
);
INSERT INTO user_stats_current (user_id)
SELECT name AS user_id FROM users
ON CONFLICT DO NOTHING;
"""
CREATE INDEX {temp}_rooms_events
ON {temp}_rooms(events);
CREATE INDEX {temp}_rooms_id
ON {temp}_rooms(room_id);
""".format(
temp=TEMP_TABLE
).splitlines()
)
for statement in stmts:
for statement in get_statements(sqls.splitlines()):
txn.execute(statement)
sql = (
"CREATE TABLE IF NOT EXISTS "
+ TEMP_TABLE
+ "_position(position TEXT NOT NULL)"
)
txn.execute(sql)
def _delete_dirty_skeletons(txn):
sqls = """
DELETE FROM room_stats_current
WHERE completed_delta_stream_id IS NULL;
# Get rooms we want to process from the database, only adding
# those that we haven't (i.e. those not in room_stats_earliest_token)
sql = """
INSERT INTO %s_rooms (room_id, events)
SELECT c.room_id, count(*) FROM current_state_events AS c
LEFT JOIN room_stats_earliest_token AS t USING (room_id)
WHERE t.room_id IS NULL
GROUP BY c.room_id
""" % (
TEMP_TABLE,
)
txn.execute(sql)
DELETE FROM user_stats_current
WHERE completed_delta_stream_id IS NULL;
"""
new_pos = yield self.get_max_stream_id_in_current_state_deltas()
yield self.runInteraction("populate_stats_temp_build", _make_staging_area)
yield self._simple_insert(TEMP_TABLE + "_position", {"position": new_pos})
self.get_earliest_token_for_room_stats.invalidate_all()
for statement in get_statements(sqls.splitlines()):
txn.execute(statement)
yield self._end_background_update("populate_stats_createtables")
# first wedge the incremental processor and reset our promise
old_positions = yield self.runInteraction(
"populate_stats_wedge", _wedge_incremental_processor
)
if None in old_positions.values():
old_positions = None
# with the incremental processor wedged, we delete dirty skeleton rows
# since we don't want to double-count them.
yield self.runInteraction(
"populate_stats_delete_dirty_skeletons", _delete_dirty_skeletons
)
yield ensureDeferred(self._unwedge_incremental_processor(old_positions))
yield self.runInteraction("populate_stats_make_skeletons", _make_skeletons)
self.get_earliest_token_for_stats.invalidate_all()
yield self._end_background_update("populate_stats_prepare")
defer.returnValue(1)
@defer.inlineCallbacks
def _populate_stats_cleanup(self, progress, batch_size):
"""
Update the user directory stream position, then clean up the old tables.
"""
if not self.stats_enabled:
yield self._end_background_update("populate_stats_cleanup")
defer.returnValue(1)
position = yield self._simple_select_one_onecol(
TEMP_TABLE + "_position", None, "position"
)
yield self.update_stats_stream_pos(position)
def _delete_staging_area(txn):
txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms")
txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position")
yield self.runInteraction("populate_stats_cleanup", _delete_staging_area)
# TODO is there really no clean-up to be done?
# TODO if not self.stats_enabled….
yield self._end_background_update("populate_stats_cleanup")
defer.returnValue(1)
@defer.inlineCallbacks
def _populate_stats_process_users(self, progress, batch_size):
    """
    Background update which computes initial statistics for users whose
    user_stats_current row has not yet been marked complete.

    Args:
        progress (dict): background-update progress bookkeeping
        batch_size (int): rough cap on the number of memberships to process
            before yielding back to the background updater

    Returns:
        Deferred[int]: number of membership rows processed in this batch
    """
    if not self.stats_enabled:
        yield self._end_background_update("populate_stats_process_users")
        defer.returnValue(1)

    def _get_next_batch(txn):
        # Only fetch 250 users, so we don't fetch too many at once, even
        # if those 250 users have less than batch_size state events.
        sql = """
            SELECT user_id FROM user_stats_current
            WHERE completed_delta_stream_id IS NULL
            LIMIT 250
        """
        txn.execute(sql)
        users_to_work_on = txn.fetchall()

        if not users_to_work_on:
            return None

        # Get how many are left to process, so we can give status on how
        # far we are in processing.
        # FIX: count user_stats_current — this previously counted
        # room_stats_current, giving a bogus "remaining" figure for users.
        txn.execute(
            "SELECT COUNT(*) FROM user_stats_current"
            " WHERE completed_delta_stream_id IS NULL"
        )
        progress["remaining"] = txn.fetchone()[0]

        return users_to_work_on

    users_to_work_on = yield self.runInteraction(
        "populate_stats_users_get_batch", _get_next_batch
    )

    # No more users -- complete the transaction.
    if not users_to_work_on:
        yield self._end_background_update("populate_stats_process_users")
        defer.returnValue(1)

    logger.info(
        "Processing the next %d users of %d remaining",
        len(users_to_work_on),
        progress["remaining"],
    )

    processed_membership_count = 0

    promised_positions = yield self.get_stats_positions(for_initial_processor=True)

    # FIX: test the dict's *values* — `None in promised_positions` tests
    # keys, so the original guard could never fire.
    if None in promised_positions.values():
        logger.error(
            "There is a None in promised_positions;"
            " dependency task must not have been run."
            " promised_positions: %s",
            promised_positions,
        )
        yield self._end_background_update("populate_stats_process_users")
        defer.returnValue(1)

    # FIX: rows from fetchall() are 1-tuples; unpack the user ID rather
    # than passing the tuple around.
    for (user_id,) in users_to_work_on:
        now = self.hs.get_reactor().seconds()

        def _process_user(txn):
            # Get the current token
            current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)

            sql = """
                SELECT
                    (
                        join_rules = 'public'
                        OR history_visibility = 'world_readable'
                    ) AS is_public,
                    COUNT(*) AS count
                FROM room_memberships
                JOIN room_state USING (room_id)
                WHERE
                    user_id = ? AND membership = 'join'
                GROUP BY is_public
            """
            txn.execute(sql, (user_id,))
            room_counts_by_publicness = dict(txn.fetchall())

            try:
                self._update_stats_delta_txn(
                    txn,
                    now,
                    "user",
                    user_id,
                    {},
                    complete_with_stream_id=current_token,
                    absolute_fields={
                        # these are counted absolutely because it is
                        # more difficult to count them from the promised time,
                        # because counting them now can use the quick lookup
                        # tables.
                        "public_rooms": room_counts_by_publicness.get(True, 0),
                        "private_rooms": room_counts_by_publicness.get(False, 0),
                    },
                )
                # TODO CHECK: actually want to **overwrite** some of these!
            except OldCollectionRequired:
                # this can't (shouldn't) actually happen
                # since we only run the background update for incomplete rows
                # and incomplete rows can never be old.
                # However, if it does, the most graceful handling is just to
                # ignore it and carry on processing other users.
                logger.error(
                    "Supposedly Impossible: OldCollectionRequired in initial"
                    " background update, for user ID %s",
                    user_id,
                    exc_info=True,
                )

            # we use this count for rate-limiting
            return sum(room_counts_by_publicness.values())

        processed_membership_count += yield self.runInteraction(
            "update_user_stats", _process_user
        )

        # Update the remaining counter.
        progress["remaining"] -= 1

        if processed_membership_count > batch_size:
            # Don't process any more users, we've hit our batch size.
            defer.returnValue(processed_membership_count)

    yield self.runInteraction(
        "populate_stats",
        self._background_update_progress_txn,
        "populate_stats_process_users",
        progress,
    )

    defer.returnValue(processed_membership_count)
@defer.inlineCallbacks
def _populate_stats_process_rooms(self, progress, batch_size):
if not self.stats_enabled:
yield self._end_background_update("populate_stats_process_rooms")
defer.returnValue(1)
# If we don't have progress filed, delete everything.
if not progress:
yield self.delete_all_stats()
def _get_next_batch(txn):
# Only fetch 250 rooms, so we don't fetch too many at once, even
# if those 250 rooms have less than batch_size state events.
sql = """
SELECT room_id, events FROM %s_rooms
ORDER BY events DESC
SELECT room_id FROM room_stats_current
WHERE completed_delta_stream_id IS NULL
LIMIT 250
""" % (
TEMP_TABLE,
)
"""
txn.execute(sql)
rooms_to_work_on = txn.fetchall()
@@ -179,13 +363,16 @@ class StatsStore(StateDeltasStore):
# Get how many are left to process, so we can give status on how
# far we are in processing
txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
txn.execute(
"SELECT COUNT(*) FROM room_stats_current"
" WHERE completed_delta_stream_id IS NULL"
)
progress["remaining"] = txn.fetchone()[0]
return rooms_to_work_on
rooms_to_work_on = yield self.runInteraction(
"populate_stats_temp_read", _get_next_batch
"populate_stats_rooms_get_batch", _get_next_batch
)
# No more rooms -- complete the transaction.
@@ -202,8 +389,19 @@ class StatsStore(StateDeltasStore):
# Number of state events we've processed by going through each room
processed_event_count = 0
for room_id, event_count in rooms_to_work_on:
promised_positions = yield self.get_stats_positions(for_initial_processor=True)
if None in promised_positions:
logger.error(
"There is a None in promised_positions;"
" dependency task must not have been run."
" promised_positions: %s",
promised_positions,
)
yield self._end_background_update("populate_stats_process_rooms")
defer.returnValue(1)
for room_id in rooms_to_work_on:
current_state_ids = yield self.get_current_state_ids(room_id)
join_rules_id = current_state_ids.get((EventTypes.JoinRules, ""))
@@ -251,11 +449,7 @@ class StatsStore(StateDeltasStore):
now = self.hs.get_reactor().seconds()
# quantise time to the nearest bucket
now = (now // self.stats_bucket_size) * self.stats_bucket_size
def _fetch_data(txn):
# Get the current token of the room
current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
@@ -263,58 +457,140 @@ class StatsStore(StateDeltasStore):
membership_counts = self._get_user_counts_in_room_txn(txn, room_id)
total_state_events = self._get_total_state_event_counts_txn(
txn, room_id
)
self._update_stats_txn(
room_total_event_count = self._count_events_in_room_txn(
txn,
"room",
room_id,
now,
{
"bucket_size": self.stats_bucket_size,
"current_state_events": current_state_events,
"joined_members": membership_counts.get(Membership.JOIN, 0),
"invited_members": membership_counts.get(Membership.INVITE, 0),
"left_members": membership_counts.get(Membership.LEAVE, 0),
"banned_members": membership_counts.get(Membership.BAN, 0),
"state_events": total_state_events,
},
)
self._simple_insert_txn(
txn,
"room_stats_earliest_token",
{"room_id": room_id, "token": current_token},
promised_positions["total_events_min_stream_ordering"],
promised_positions["total_events_max_stream_ordering"],
)
# We've finished a room. Delete it from the table.
self._simple_delete_one_txn(
txn, TEMP_TABLE + "_rooms", {"room_id": room_id}
)
try:
self._update_stats_delta_txn(
txn,
now,
"room",
room_id,
{"total_events": room_total_event_count},
complete_with_stream_id=current_token,
absolute_fields={
# these are counted absolutely because it is
# more difficult to count them from the promised time,
# because counting them now can use the quick lookup
# tables.
"current_state_events": current_state_events,
"joined_members": membership_counts.get(Membership.JOIN, 0),
"invited_members": membership_counts.get(
Membership.INVITE, 0
),
"left_members": membership_counts.get(Membership.LEAVE, 0),
"banned_members": membership_counts.get(Membership.BAN, 0),
},
)
# TODO CHECK: actually want to **overwrite** some of these!
except OldCollectionRequired:
# this can't (shouldn't) actually happen
# since we only run the background update for incomplete rows
# and incomplete rows can never be old.
# However, if it does, the most graceful handling is just to
# ignore it and carry on processing other rooms.
logger.error(
"Supposedly Impossible: OldCollectionRequired in initial"
" background update, for room ID %s",
room_id,
exc_info=True,
)
pass
yield self.runInteraction("update_room_stats", _fetch_data)
# we use this count for rate-limiting
return room_total_event_count
room_event_count = yield self.runInteraction(
"update_room_stats", _fetch_data
)
# Update the remaining counter.
progress["remaining"] -= 1
yield self.runInteraction(
"populate_stats",
self._background_update_progress_txn,
"populate_stats_process_rooms",
progress,
)
processed_event_count += event_count
processed_event_count += room_event_count
if processed_event_count > batch_size:
# Don't process any more rooms, we've hit our batch size.
defer.returnValue(processed_event_count)
yield self.runInteraction(
"populate_stats",
self._background_update_progress_txn,
"populate_stats_process_rooms",
progress,
)
defer.returnValue(processed_event_count)
def update_total_event_count_between_txn(self, txn, low_pos, high_pos):
    """
    Updates the total_events counts for rooms by counting events whose
    stream_ordering falls between two stream positions.

    Args:
        txn: Database transaction. It is assumed that you will have one,
            since you probably want to update pointers at the same time.
        low_pos: The old stream position (stream position of the last event
            that was already handled.)
        high_pos: The new stream position (stream position of the new last
            event to handle.)
    """
    if low_pos >= high_pos:
        # nothing to do here.
        return

    now = self.hs.get_reactor().seconds()

    # we choose comparators based on the signs; negative stream orderings
    # are backfilled events, counted inclusively on the low end.
    # examples:
    #   3, 7   → 3 <  … <= 7   (normal-filled)
    #   -4, -2 → -4 <= … <  -2 (backfilled)
    #   -7, 7  → -7 <= … <= 7  (both)
    if low_pos < 0:
        low_comparator = "<="
    else:
        low_comparator = "<"
    if high_pos < 0:
        high_comparator = "<"
    else:
        high_comparator = "<="

    sql = """
        SELECT room_id, COUNT(*) AS new_events
        FROM events
        WHERE ? %s stream_ordering AND stream_ordering %s ?
        GROUP BY room_id
    """ % (
        low_comparator,
        high_comparator,
    )

    txn.execute(sql, (low_pos, high_pos))

    for room_id, new_events in txn.fetchall():
        # keep retrying the delta write; if the current bucket is stale we
        # roll old rows into history first, then try again.
        while True:
            try:
                self._update_stats_delta_txn(
                    txn, now, "room", room_id, {"total_events": new_events}
                )
                break
            except OldCollectionRequired:
                self._collect_old_txn(txn, "room")
def _count_events_in_room_txn(self, txn, room_id, low_token, high_token):
sql = """
SELECT COUNT(*) AS num_events
FROM events
WHERE room_id = ?
AND ? <= stream_ordering
AND stream_ordering <= ?
"""
txn.execute(sql, (room_id, low_token, high_token))
return txn.fetchone()[0]
def delete_all_stats(self):
"""
Delete all statistics records.
TODO obsolete?
"""
def _delete_all_stats_txn(txn):
@@ -435,7 +711,7 @@ class StatsStore(StateDeltasStore):
)
@cached()
def get_earliest_token_for_room_stats(self, room_id):
def get_earliest_token_for_stats(self, stats_type, id):
"""
Fetch the "earliest token". This is used by the room stats delta
processor to ignore deltas that have been processed between the
@@ -445,15 +721,17 @@ class StatsStore(StateDeltasStore):
Returns:
Deferred[int]
"""
table, id_col = TYPE_TO_TABLE[stats_type]
return self._simple_select_one_onecol(
"room_stats_earliest_token",
{"room_id": room_id},
retcol="token",
"%s_current" % (table,),
{id_col: id},
retcol="completed_delta_stream_id",
allow_none=True,
)
def update_stats(self, stats_type, stats_id, ts, fields):
table, id_col = TYPE_TO_ROOM[stats_type]
table, id_col = TYPE_TO_TABLE[stats_type]
return self._simple_upsert(
table=table,
keyvalues={id_col: stats_id, "ts": ts},
@@ -462,62 +740,218 @@ class StatsStore(StateDeltasStore):
)
def _update_stats_txn(self, txn, stats_type, stats_id, ts, fields):
    """
    Upsert an absolute statistics row for the given entity and timestamp.

    Args:
        txn: database transaction
        stats_type (str): "room" or "user"
        stats_id (str): the room ID or user ID
        ts (int): the timestamp bucket of the row
        fields (dict[str, int]): absolute stats values to store

    Returns:
        the result of the underlying upsert
    """
    # FIX: dropped a stale duplicate lookup against the removed
    # TYPE_TO_ROOM mapping; TYPE_TO_TABLE is its replacement.
    table, id_col = TYPE_TO_TABLE[stats_type]
    return self._simple_upsert_txn(
        txn, table=table, keyvalues={id_col: stats_id, "ts": ts}, values=fields
    )
def update_stats_delta(self, ts, stats_type, stats_id, field, value):
def _update_stats_delta(txn):
table, id_col = TYPE_TO_ROOM[stats_type]
def _collect_old_txn(self, txn, stats_type, limit=500):
# we do them in batches to prevent concurrent updates from
# messing us over with lots of retries
sql = (
"SELECT * FROM %s"
" WHERE %s=? and ts=("
" SELECT MAX(ts) FROM %s"
" WHERE %s=?"
")"
) % (table, id_col, table, id_col)
txn.execute(sql, (stats_id, stats_id))
rows = self.cursor_to_dict(txn)
if len(rows) == 0:
# silently skip as we don't have anything to apply a delta to yet.
# this tries to minimise any race between the initial sync and
# subsequent deltas arriving.
now = self.hs.get_reactor().seconds()
quantised_ts = self.quantise_stats_time(now)
table, id_col = TYPE_TO_TABLE[stats_type]
fields = ", ".join(
field
for field in chain(
ABSOLUTE_STATS_FIELDS[stats_type], PER_SLICE_FIELDS[stats_type]
)
)
sql = ("SELECT %s FROM %s_current WHERE end_ts <= ? LIMIT %d FOR UPDATE") % (
id_col,
table,
limit,
)
txn.execute(sql, (quantised_ts,))
maybe_more = txn.rowcount == limit
updates = txn.fetchall()
sql = (
"INSERT INTO %s_historical (%s, %s, bucket_size, end_ts)"
" SELECT %s, %s, end_ts - start_ts AS bucket_size, end_ts"
" FROM %s_current WHERE %s = ?"
) % (table, id_col, fields, id_col, fields, table, id_col)
txn.executemany(sql, updates)
sql = ("UPDATE %s_current SET start_ts = NULL, end_ts = NULL WHERE %s = ?") % (
table,
id_col,
)
txn.executemany(sql, updates)
return maybe_more
async def collect_old(self, stats_type):
    """
    Repeatedly roll old stats rows of the given type into the historical
    tables until none remain to collect.

    Args:
        stats_type (str): "room" or "user"
    """
    more = True
    while more:
        # _collect_old_txn works in batches and reports whether there may
        # be further rows left to collect.
        more = await self.runInteraction(
            "stats_collect_old", self._collect_old_txn, stats_type
        )
current_ts = ts
latest_ts = rows[0]["ts"]
if current_ts < latest_ts:
# This one is in the past, but we're just encountering it now.
# Mark it as part of the current bucket.
current_ts = latest_ts
elif ts != latest_ts:
# we have to copy our absolute counters over to the new entry.
values = {
key: rows[0][key] for key in ABSOLUTE_STATS_FIELDS[stats_type]
}
values[id_col] = stats_id
values["ts"] = ts
values["bucket_size"] = self.stats_bucket_size
async def update_stats_delta(
self, ts, stats_type, stats_id, fields, complete_with_stream_id=None
):
"""
self._simple_insert_txn(txn, table=table, values=values)
Args:
ts (int):
stats_type (str):
stats_id (str):
fields (dict[str, int]): Deltas of stats values.
complete_with_stream_id (int, optional):
# actually update the new value
if stats_type in ABSOLUTE_STATS_FIELDS[stats_type]:
self._simple_update_txn(
txn,
table=table,
keyvalues={id_col: stats_id, "ts": current_ts},
updatevalues={field: value},
Returns:
"""
while True:
try:
return await self.runInteraction(
"update_stats_delta",
self._update_stats_delta_txn,
ts,
stats_type,
stats_id,
fields,
complete_with_stream_id=complete_with_stream_id,
)
else:
sql = ("UPDATE %s SET %s=%s+? WHERE %s=? AND ts=?") % (
table,
field,
field,
id_col,
)
txn.execute(sql, (value, stats_id, current_ts))
except OldCollectionRequired:
# retry after collecting old rows
await self.collect_old(stats_type)
return self.runInteraction("update_stats_delta", _update_stats_delta)
def _update_stats_delta_txn(
    self,
    txn,
    ts,
    stats_type,
    stats_id,
    fields,
    complete_with_stream_id=None,
    absolute_fields=None,
):
    """
    Apply a delta to the current stats row of an entity, creating or
    repairing the row as needed.

    Args:
        txn: database transaction
        ts (int): timestamp of the delta; quantised to a stats bucket
        stats_type (str): "room" or "user"
        stats_id (str): the room ID or user ID
        fields (dict[str, int]): per-field increments to apply
        complete_with_stream_id (int, optional): if given, also mark the
            row complete as of this stream ID
        absolute_fields (dict[str, int], optional): fields to overwrite
            outright rather than increment

    Raises:
        OldCollectionRequired: the existing row's bucket ended before this
            delta's bucket, so old rows must be collected before retrying.
    """
    table, id_col = TYPE_TO_TABLE[stats_type]

    # quantise the delta's timestamp to its bucket; end_ts is the bucket's
    # exclusive end, which is what the _current row is keyed on.
    quantised_ts = self.quantise_stats_time(int(ts))
    end_ts = quantised_ts + self.stats_bucket_size

    # build "field = field + ?" fragments for the incremental fields…
    field_sqls = ["%s = %s + ?" % (field, field) for field in fields.keys()]
    field_values = list(fields.values())

    # …and plain "field = ?" fragments for the absolute overwrites.
    if absolute_fields is not None:
        field_sqls += ["%s = ?" % (field,) for field in absolute_fields.keys()]
        field_values += list(absolute_fields.values())

    if complete_with_stream_id is not None:
        field_sqls.append("completed_delta_stream_id = ?")
        field_values.append(complete_with_stream_id)

    # Fast path: try to update an existing, live row. The WHERE clause only
    # matches rows whose bucket has not yet ended (end_ts >= ?) or which are
    # still incomplete (completed_delta_stream_id IS NULL).
    sql = (
        "UPDATE %s_current SET end_ts = ?, %s"
        " WHERE (end_ts IS NOT NULL AND (end_ts >= ? OR completed_delta_stream_id IS NULL))"
        " AND %s = ?"
    ) % (table, ", ".join(field_sqls), id_col)

    qargs = [end_ts] + list(field_values) + [end_ts, stats_id]

    txn.execute(sql, qargs)

    if txn.rowcount > 0:
        # success.
        return

    # if we're here, it's because we didn't succeed in updating a stats
    # row. Why? Let's find out…

    current_row = self._simple_select_one_txn(
        txn,
        table + "_current",
        {id_col: stats_id},
        ("end_ts", "completed_delta_stream_id"),
        allow_none=True,
    )

    if current_row is None:
        # we need to insert a row! (insert a dirty, incomplete row)
        insertee = {
            id_col: stats_id,
            "end_ts": end_ts,
            "start_ts": ts,  # TODO or do we use qts?
            "completed_delta_stream_id": complete_with_stream_id,
        }

        # we assume that, by default, blank fields should be zero.
        for field_name in ABSOLUTE_STATS_FIELDS[stats_type]:
            insertee[field_name] = 0

        for field_name in PER_SLICE_FIELDS[stats_type]:
            insertee[field_name] = 0

        # apply the requested increments/overwrites on top of the zeros
        # (on a fresh row, an increment and an overwrite are the same).
        for (field, value) in fields.items():
            insertee[field] = value

        if absolute_fields is not None:
            for (field, value) in absolute_fields.items():
                insertee[field] = value

        self._simple_insert_txn(txn, table + "_current", insertee)

    elif current_row["end_ts"] is None:
        # the row exists but has been voided (NULL end_ts, presumably by
        # old-collection) — revive it by updating the row, including start_ts
        sql = (
            "UPDATE %s_current SET start_ts = ?, end_ts = ?, %s"
            " WHERE end_ts IS NULL AND %s = ?"
        ) % (table, ", ".join(field_sqls), id_col)
        qargs = (
            [end_ts - self.stats_bucket_size, end_ts]
            + list(field_values)
            + [stats_id]
        )
        txn.execute(sql, qargs)
        if txn.rowcount == 0:
            raise RuntimeError(
                "Should be impossible: No rows updated"
                " but all conditions are known to be met."
            )

    elif current_row["end_ts"] < end_ts:
        # we need to perform old collection first
        raise OldCollectionRequired()

    # NOTE(review): if current_row["end_ts"] >= end_ts here, the method
    # returns silently without applying the delta — confirm this is the
    # intended behaviour for out-of-order deltas.
def incremental_update_total_events(self, in_positions):
    """
    Count events added since the previously-recorded stream positions
    (both forward-filled and backfilled) into total_events, then advance
    the recorded positions if they changed.

    Args:
        in_positions (dict): the previously-recorded stream positions;
            not mutated by this method.

    Returns:
        Deferred[dict]: the (possibly advanced) stream positions
    """
    def _incremental_total_events_txn(txn):
        positions = in_positions.copy()

        new_max = self.get_room_max_stream_ordering()
        new_min = self.get_room_min_stream_ordering()
        old_max = positions["total_events_max_stream_ordering"]
        old_min = positions["total_events_min_stream_ordering"]

        # forward side: events that arrived after the old maximum
        self.update_total_event_count_between_txn(
            txn, low_pos=old_max, high_pos=new_max
        )
        # backfill side: events that arrived behind the old minimum
        self.update_total_event_count_between_txn(
            txn, low_pos=new_min, high_pos=old_min
        )

        if (old_max, old_min) != (new_max, new_min):
            positions["total_events_max_stream_ordering"] = new_max
            positions["total_events_min_stream_ordering"] = new_min
            self._update_stats_positions_txn(txn, positions)

        return positions

    return self.runInteraction(
        "stats_incremental_total_events", _incremental_total_events_txn
    )