Move back to defer.inlineCallbacks from async as it makes stats
unergonomic if we move to `async` from the bottom-up. Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
This commit is contained in:
@@ -16,7 +16,6 @@
|
||||
import logging
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.internet.defer import ensureDeferred
|
||||
|
||||
from synapse.api.constants import EventTypes, JoinRules, Membership
|
||||
from synapse.handlers.state_deltas import StateDeltasHandler
|
||||
@@ -245,10 +244,8 @@ class StatsHandler(StateDeltasHandler):
|
||||
field = "public_rooms" if public else "private_rooms"
|
||||
delta = +1 if membership == Membership.JOIN else -1
|
||||
|
||||
yield ensureDeferred(
|
||||
self.store.update_stats_delta(
|
||||
now, "user", user_id, {field: delta}
|
||||
)
|
||||
yield self.store.update_stats_delta(
|
||||
now, "user", user_id, {field: delta}
|
||||
)
|
||||
|
||||
elif typ == EventTypes.Create:
|
||||
@@ -329,20 +326,17 @@ class StatsHandler(StateDeltasHandler):
|
||||
)
|
||||
|
||||
if room_stats_complete:
|
||||
yield ensureDeferred(
|
||||
self.store.update_stats_delta(
|
||||
now,
|
||||
"room",
|
||||
room_id,
|
||||
room_stats_delta,
|
||||
complete_with_stream_id=stream_id,
|
||||
)
|
||||
yield self.store.update_stats_delta(
|
||||
now,
|
||||
"room",
|
||||
room_id,
|
||||
room_stats_delta,
|
||||
complete_with_stream_id=stream_id,
|
||||
)
|
||||
|
||||
elif len(room_stats_delta) > 0:
|
||||
yield ensureDeferred(
|
||||
self.store.update_stats_delta(
|
||||
now, "room", room_id, room_stats_delta
|
||||
)
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, room_stats_delta
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@@ -362,16 +356,14 @@ class StatsHandler(StateDeltasHandler):
|
||||
|
||||
for user_id in user_ids:
|
||||
if self.hs.is_mine(UserID.from_string(user_id)):
|
||||
yield ensureDeferred(
|
||||
self.store.update_stats_delta(
|
||||
ts,
|
||||
"user",
|
||||
user_id,
|
||||
{
|
||||
"public_rooms": +1 if is_public else -1,
|
||||
"private_rooms": -1 if is_public else +1,
|
||||
},
|
||||
)
|
||||
yield self.store.update_stats_delta(
|
||||
ts,
|
||||
"user",
|
||||
user_id,
|
||||
{
|
||||
"public_rooms": +1 if is_public else -1,
|
||||
"private_rooms": -1 if is_public else +1,
|
||||
},
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
||||
@@ -19,7 +19,6 @@ from itertools import chain
|
||||
from threading import Lock
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.internet.defer import ensureDeferred
|
||||
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.storage.engines import Sqlite3Engine
|
||||
@@ -81,7 +80,8 @@ class StatsStore(StateDeltasStore):
|
||||
def quantise_stats_time(self, ts):
|
||||
return (ts // self.stats_bucket_size) * self.stats_bucket_size
|
||||
|
||||
async def _unwedge_incremental_processor(self, forced_promise):
|
||||
@defer.inlineCallbacks
|
||||
def _unwedge_incremental_processor(self, forced_promise):
|
||||
"""
|
||||
Make a promise about what this initial background count will handle,
|
||||
so that we can allow the incremental processor to start doing things
|
||||
@@ -90,7 +90,7 @@ class StatsStore(StateDeltasStore):
|
||||
|
||||
if forced_promise is None:
|
||||
promised_stats_delta_pos = (
|
||||
await self.get_max_stream_id_in_current_state_deltas()
|
||||
yield self.get_max_stream_id_in_current_state_deltas()
|
||||
)
|
||||
|
||||
promised_max = self.get_room_max_stream_ordering()
|
||||
@@ -105,15 +105,19 @@ class StatsStore(StateDeltasStore):
|
||||
promised_positions = forced_promise
|
||||
|
||||
# this stores it for our reference later
|
||||
await self.update_stats_positions(
|
||||
yield self.update_stats_positions(
|
||||
promised_positions, for_initial_processor=True
|
||||
)
|
||||
|
||||
# this unwedges the incremental processor
|
||||
await self.update_stats_positions(
|
||||
yield self.update_stats_positions(
|
||||
promised_positions, for_initial_processor=False
|
||||
)
|
||||
|
||||
# with the delta processor unwedged, now let it catch up in case
|
||||
# anything was missed during the wedge period
|
||||
self.clock.call_later(0, self.hs.get_stats_handler().notify_new_event)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _populate_stats_prepare(self, progress, batch_size):
|
||||
|
||||
@@ -188,7 +192,7 @@ class StatsStore(StateDeltasStore):
|
||||
"populate_stats_delete_dirty_skeletons", _delete_dirty_skeletons
|
||||
)
|
||||
|
||||
yield ensureDeferred(self._unwedge_incremental_processor(old_positions))
|
||||
yield self._unwedge_incremental_processor(old_positions)
|
||||
|
||||
yield self.runInteraction("populate_stats_make_skeletons", _make_skeletons)
|
||||
self.get_earliest_token_for_stats.invalidate_all()
|
||||
@@ -821,15 +825,17 @@ class StatsStore(StateDeltasStore):
|
||||
|
||||
return maybe_more
|
||||
|
||||
async def collect_old(self, stats_type):
|
||||
@defer.inlineCallbacks
|
||||
def collect_old(self, stats_type):
|
||||
while True:
|
||||
maybe_more = await self.runInteraction(
|
||||
maybe_more = yield self.runInteraction(
|
||||
"stats_collect_old", self._collect_old_txn, stats_type
|
||||
)
|
||||
if not maybe_more:
|
||||
return
|
||||
defer.returnValue(None)
|
||||
|
||||
async def update_stats_delta(
|
||||
@defer.inlineCallbacks
|
||||
def update_stats_delta(
|
||||
self, ts, stats_type, stats_id, fields, complete_with_stream_id=None
|
||||
):
|
||||
"""
|
||||
@@ -847,7 +853,7 @@ class StatsStore(StateDeltasStore):
|
||||
|
||||
while True:
|
||||
try:
|
||||
return await self.runInteraction(
|
||||
res = yield self.runInteraction(
|
||||
"update_stats_delta",
|
||||
self._update_stats_delta_txn,
|
||||
ts,
|
||||
@@ -856,9 +862,10 @@ class StatsStore(StateDeltasStore):
|
||||
fields,
|
||||
complete_with_stream_id=complete_with_stream_id,
|
||||
)
|
||||
defer.returnValue(res)
|
||||
except OldCollectionRequired:
|
||||
# retry after collecting old rows
|
||||
await self.collect_old(stats_type)
|
||||
yield self.collect_old(stats_type)
|
||||
|
||||
def _update_stats_delta_txn(
|
||||
self,
|
||||
|
||||
Reference in New Issue
Block a user