From 1c9732d64bdcbdf9a7522e933102a1dbbde0ee36 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 8 Aug 2019 11:43:50 +0100 Subject: [PATCH] Update incremental processor to use new interfaces and track total_events Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/handlers/stats.py | 31 ++++++++++++++++++++----------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index 76bab4a80e..422645cd27 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -85,27 +85,36 @@ class StatsHandler(StateDeltasHandler): @defer.inlineCallbacks def _unsafe_process(self): # If self.pos is None then means we haven't fetched it from DB - if self.pos is None: - self.pos = yield self.store.get_stats_stream_pos() + if self.pos is None or None in self.pos.values(): + self.pos = yield self.store.get_stats_positions() - # If still None then the initial background update hasn't happened yet - if self.pos is None: + # If still None then the initial background update hasn't started yet + if self.pos is None or None in self.pos.values(): defer.returnValue(None) # Loop round handling deltas until we're up to date - while True: - with Measure(self.clock, "stats_delta"): - deltas = yield self.store.get_current_state_deltas(self.pos) + with Measure(self.clock, "stats_delta"): + while True: + deltas = yield self.store.get_current_state_deltas( + self.pos["state_delta_stream_id"] + ) if not deltas: - return + break logger.info("Handling %d state deltas", len(deltas)) yield self._handle_deltas(deltas) - self.pos = deltas[-1]["stream_id"] - yield self.store.update_stats_stream_pos(self.pos) + self.pos["state_delta_stream_id"] = deltas[-1]["stream_id"] - event_processing_positions.labels("stats").set(self.pos) + event_processing_positions.labels("stats").set( + self.pos["state_delta_stream_id"] + ) + + if self.pos is not None: + yield self.store.update_stats_positions(self.pos) + + with Measure(self.clock, "stats_total_events"): + self.pos = yield self.store.incremental_update_total_events(self.pos) @defer.inlineCallbacks def _handle_deltas(self, deltas):