1
0

Compare commits

...

2 Commits

Author SHA1 Message Date
Olivier Wilkinson (reivilibre) e79b3aed87 I mustn't forget my morning corn flake(8)s
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-20 14:40:50 +01:00
Olivier Wilkinson (reivilibre) 279e63aea2 Collect old current stats rows when updating stats with deltas
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-20 14:36:14 +01:00
+81 -2
View File
@@ -15,10 +15,12 @@
# limitations under the License.
import logging
from itertools import chain
from threading import Lock
from twisted.internet import defer
from synapse.storage.engines import Sqlite3Engine
from synapse.storage.state_deltas import StateDeltasStore
from synapse.util.caches.descriptors import cached
@@ -213,6 +215,84 @@ class StatsStore(StateDeltasStore):
allow_none=True,
)
def _collect_old_txn(self, txn, stats_type, limit=500):
    """
    See {collect_old}. Runs only a small batch, specified by limit.

    Selects up to `limit` rows from the `<table>_current` table whose
    `end_ts` is at or before the current (quantised) time, copies each of
    them into `<table>_historical`, then resets `start_ts` and `end_ts` to
    NULL on the current rows.

    Args:
        txn: the database transaction (cursor) to run the queries on.
        stats_type (str): key into TYPE_TO_TABLE selecting which stats
            table to collect.
        limit (int): maximum number of current rows to collect in this
            batch.

    Returns (bool):
        True iff there is possibly more to do (i.e. this needs re-running),
        False otherwise.
    """

    # we do them in batches to prevent concurrent updates from
    # messing us over with lots of retries

    now = self.hs.get_reactor().seconds()
    quantised_ts = self.quantise_stats_time(now)
    table, id_col = TYPE_TO_TABLE[stats_type]

    fields = ", ".join(
        chain(ABSOLUTE_STATS_FIELDS[stats_type], PER_SLICE_FIELDS[stats_type])
    )

    # `end_ts IS NOT NULL` is for partial index optimisation
    sql = (
        "SELECT %s FROM %s_current"
        " WHERE end_ts <= ? AND end_ts IS NOT NULL"
        " LIMIT %d"
    ) % (id_col, table, limit)
    if not isinstance(self.database_engine, Sqlite3Engine):
        # SQLite doesn't support SELECT FOR UPDATE, every other engine
        # gets the row locks.
        sql += " FOR UPDATE"

    txn.execute(sql, (quantised_ts,))
    updates = txn.fetchall()

    # Count the rows we actually fetched rather than trusting
    # `txn.rowcount`: DB-API leaves rowcount undefined after a SELECT, and
    # sqlite3 always reports -1, which would make us stop after one batch.
    maybe_more = len(updates) == limit

    if not updates:
        # nothing was selected, so the statements below would be no-ops.
        return False

    sql = (
        "INSERT INTO %s_historical (%s, %s, bucket_size, end_ts)"
        " SELECT %s, %s, end_ts - start_ts AS bucket_size, end_ts"
        " FROM %s_current WHERE %s = ?"
    ) % (table, id_col, fields, id_col, fields, table, id_col)
    txn.executemany(sql, updates)

    # clearing the time slice marks the current rows as no longer dirty
    sql = ("UPDATE %s_current SET start_ts = NULL, end_ts = NULL WHERE %s = ?") % (
        table,
        id_col,
    )
    txn.executemany(sql, updates)

    return maybe_more
@defer.inlineCallbacks
def collect_old(self, stats_type):
    """
    Run 'old collection' on current stats rows.

    Old collection copies dirty (updated) stats rows from the current
    table into the historical table once those rows have finished their
    stats time slice. Rows that have been collected are then cleared of
    their dirty status.

    The work is done one batch per database interaction, and batches are
    repeated until one reports that nothing more may be pending.

    Args:
        stats_type: "room" or "user" the type of stats to run old collection
            on.
    """
    more_to_collect = True
    while more_to_collect:
        # each interaction handles a single batch and tells us whether
        # another pass might be needed
        more_to_collect = yield self.runInteraction(
            "stats_collect_old", self._collect_old_txn, stats_type
        )
@defer.inlineCallbacks
def update_stats_delta(
self, ts, stats_type, stats_id, fields, complete_with_stream_id=None
@@ -246,8 +326,7 @@ class StatsStore(StateDeltasStore):
return res
except OldCollectionRequired:
# retry after collecting old rows
# TODO (implement later)
raise NotImplementedError("old collection not in this PR")
yield self.collect_old(stats_type)
def _update_stats_delta_txn(
self,