1
0

Use threading.Lock to prevent concurrent incremental position updates

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
This commit is contained in:
Olivier Wilkinson (reivilibre)
2019-08-08 11:28:42 +01:00
parent 259014b7ad
commit 5216299124
2 changed files with 15 additions and 8 deletions

View File

@@ -49,9 +49,6 @@ class StatsHandler(StateDeltasHandler):
# The current position in the current_state_delta stream
self.pos = None
# Guard to ensure we only process deltas one at a time
self._is_processing = False
if hs.config.stats_enabled:
self.notifier.add_replication_callback(self.notify_new_event)
@@ -65,18 +62,24 @@ class StatsHandler(StateDeltasHandler):
if not self.hs.config.stats_enabled:
return
if self._is_processing:
return
lock = self.store.stats_delta_processing_lock
@defer.inlineCallbacks
def process():
try:
yield self._unsafe_process()
finally:
self._is_processing = False
lock.release()
self._is_processing = True
run_as_background_process("stats.notify_new_event", process)
if lock.acquire(blocking=False):
# we only want to run this process one-at-a-time,
# and also, if the initial background updater wants us to keep out,
# we should respect that.
try:
run_as_background_process("stats.notify_new_event", process)
except: # noqa: E722 re-raised so fine
lock.release()
raise
@defer.inlineCallbacks
def _unsafe_process(self):

View File

@@ -14,6 +14,8 @@
# limitations under the License.
import logging
from itertools import chain
from threading import Lock
from twisted.internet import defer
@@ -51,6 +53,8 @@ class StatsStore(StateDeltasStore):
self.stats_enabled = hs.config.stats_enabled
self.stats_bucket_size = hs.config.stats_bucket_size
self.stats_delta_processing_lock = Lock()
self.register_background_update_handler(
"populate_stats_createtables", self._populate_stats_createtables
)