Track more stats positions
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2018, 2019 New Vector Ltd
|
||||
# Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@@ -324,20 +325,56 @@ class StatsStore(StateDeltasStore):
|
||||
|
||||
return self.runInteraction("delete_all_stats", _delete_all_stats_txn)
|
||||
|
||||
def get_stats_stream_pos(self):
|
||||
return self._simple_select_one_onecol(
|
||||
table="stats_stream_pos",
|
||||
keyvalues={},
|
||||
retcol="stream_id",
|
||||
desc="stats_stream_pos",
|
||||
def get_stats_positions(self, for_initial_processor=False):
|
||||
return self._simple_select_one(
|
||||
table="stats_incremental_position",
|
||||
keyvalues={"is_background_contract": for_initial_processor},
|
||||
retcols=(
|
||||
"state_delta_stream_id",
|
||||
"total_events_min_stream_ordering",
|
||||
"total_events_max_stream_ordering",
|
||||
),
|
||||
desc="stats_incremental_position",
|
||||
)
|
||||
|
||||
def update_stats_stream_pos(self, stream_id):
|
||||
def _get_stats_positions_txn(self, txn, for_initial_processor=False):
|
||||
return self._simple_select_one_txn(
|
||||
txn=txn,
|
||||
table="stats_incremental_position",
|
||||
keyvalues={"is_background_contract": for_initial_processor},
|
||||
retcols=(
|
||||
"state_delta_stream_id",
|
||||
"total_events_min_stream_ordering",
|
||||
"total_events_max_stream_ordering",
|
||||
),
|
||||
)
|
||||
|
||||
def update_stats_positions(self, positions, for_initial_processor=False):
|
||||
if positions is None:
|
||||
positions = {
|
||||
"state_delta_stream_id": None,
|
||||
"total_events_min_stream_ordering": None,
|
||||
"total_events_max_stream_ordering": None,
|
||||
}
|
||||
return self._simple_update_one(
|
||||
table="stats_stream_pos",
|
||||
keyvalues={},
|
||||
updatevalues={"stream_id": stream_id},
|
||||
desc="update_stats_stream_pos",
|
||||
table="stats_incremental_position",
|
||||
keyvalues={"is_background_contract": for_initial_processor},
|
||||
updatevalues=positions,
|
||||
desc="update_stats_incremental_position",
|
||||
)
|
||||
|
||||
def _update_stats_positions_txn(self, txn, positions, for_initial_processor=False):
|
||||
if positions is None:
|
||||
positions = {
|
||||
"state_delta_stream_id": None,
|
||||
"total_events_min_stream_ordering": None,
|
||||
"total_events_max_stream_ordering": None,
|
||||
}
|
||||
return self._simple_update_one_txn(
|
||||
txn,
|
||||
table="stats_incremental_position",
|
||||
keyvalues={"is_background_contract": for_initial_processor},
|
||||
updatevalues=positions,
|
||||
)
|
||||
|
||||
def update_room_state(self, room_id, fields):
|
||||
|
||||
Reference in New Issue
Block a user