Merge branch 'rei/rss_inc7' into rei/rss_inc8
This commit is contained in:
+25
-16
@@ -157,9 +157,8 @@ class StatsStore(StateDeltasStore):
|
||||
and return its previous positions – atomically.
|
||||
"""
|
||||
|
||||
with self.stats_delta_processing_lock:
|
||||
old = self._get_stats_positions_txn(txn, for_initial_processor=False)
|
||||
self._update_stats_positions_txn(txn, None, for_initial_processor=False)
|
||||
old = self._get_stats_positions_txn(txn, for_initial_processor=False)
|
||||
self._update_stats_positions_txn(txn, None, for_initial_processor=False)
|
||||
|
||||
return old
|
||||
|
||||
@@ -210,13 +209,17 @@ class StatsStore(StateDeltasStore):
|
||||
WHERE completed_delta_stream_id IS NULL
|
||||
"""
|
||||
|
||||
for _k, (table, id_col) in TYPE_TO_TABLE:
|
||||
for _k, (table, id_col) in TYPE_TO_TABLE.items():
|
||||
txn.execute(sql % (table,))
|
||||
|
||||
# first wedge the incremental processor and reset our promise
|
||||
old_positions = yield self.runInteraction(
|
||||
"populate_stats_wedge", _wedge_incremental_processor
|
||||
)
|
||||
yield self.stats_delta_processing_lock.acquire()
|
||||
try:
|
||||
old_positions = yield self.runInteraction(
|
||||
"populate_stats_wedge", _wedge_incremental_processor
|
||||
)
|
||||
finally:
|
||||
yield self.stats_delta_processing_lock.release()
|
||||
|
||||
if None in old_positions.values():
|
||||
old_positions = None
|
||||
@@ -229,7 +232,12 @@ class StatsStore(StateDeltasStore):
|
||||
|
||||
yield self._unwedge_incremental_processor(old_positions)
|
||||
|
||||
yield self.runInteraction("populate_stats_make_skeletons", _make_skeletons)
|
||||
yield self.runInteraction(
|
||||
"populate_stats_make_skeletons", _make_skeletons, "room"
|
||||
)
|
||||
yield self.runInteraction(
|
||||
"populate_stats_make_skeletons", _make_skeletons, "user"
|
||||
)
|
||||
self.get_earliest_token_for_stats.invalidate_all()
|
||||
|
||||
yield self._end_background_update("populate_stats_prepare")
|
||||
@@ -260,10 +268,11 @@ class StatsStore(StateDeltasStore):
|
||||
|
||||
# Get how many are left to process, so we can give status on how
|
||||
# far we are in processing
|
||||
txn.execute(
|
||||
"SELECT COUNT(*) FROM room_stats_current"
|
||||
" WHERE completed_delta_stream_id IS NULL"
|
||||
)
|
||||
sql = """
|
||||
SELECT COUNT(*) FROM user_stats_current
|
||||
WHERE completed_delta_stream_id IS NULL
|
||||
"""
|
||||
txn.execute(sql)
|
||||
progress["remaining"] = txn.fetchone()[0]
|
||||
|
||||
return users_to_work_on
|
||||
@@ -385,10 +394,10 @@ class StatsStore(StateDeltasStore):
|
||||
|
||||
# Get how many are left to process, so we can give status on how
|
||||
# far we are in processing
|
||||
txn.execute(
|
||||
"SELECT COUNT(*) FROM room_stats_current"
|
||||
" WHERE completed_delta_stream_id IS NULL"
|
||||
)
|
||||
sql = """
|
||||
SELECT COUNT(*) FROM room_stats_current
|
||||
WHERE completed_delta_stream_id IS NULL
|
||||
"""
|
||||
progress["remaining"] = txn.fetchone()[0]
|
||||
|
||||
return rooms_to_work_on
|
||||
|
||||
Reference in New Issue
Block a user