Add type ignore unused-awaitable to run_as_background_process calls
This commit is contained in:
@@ -374,7 +374,7 @@ async def do_something() -> None:
|
||||
# `do_something_else` will get its own independent
|
||||
# logging context. `request-1` will not count any
|
||||
# metrics from `do_something_else`.
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"do_something_else",
|
||||
do_something_else,
|
||||
to_resolve,
|
||||
|
||||
@@ -199,7 +199,7 @@ class _ServiceQueuer:
|
||||
if service.id in self.requests_in_flight:
|
||||
return
|
||||
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"as-sender-%s" % (service.id,), self._send_request, service
|
||||
)
|
||||
|
||||
@@ -478,7 +478,7 @@ class _Recoverer:
|
||||
|
||||
def recover(self) -> None:
|
||||
def _retry() -> None:
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"as-recoverer-%s" % (self.service.id,), self.retry
|
||||
)
|
||||
|
||||
|
||||
@@ -342,7 +342,7 @@ class FederationSender(AbstractFederationSender):
|
||||
return
|
||||
|
||||
# fire off a processing loop in the background
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"process_event_queue_for_federation", self._process_event_queue_loop
|
||||
)
|
||||
|
||||
|
||||
@@ -262,7 +262,7 @@ class PerDestinationQueue:
|
||||
|
||||
logger.debug("TX [%s] Starting transaction loop", self._destination)
|
||||
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"federation_transaction_transmission_loop",
|
||||
self._transaction_transmission_loop,
|
||||
)
|
||||
|
||||
@@ -144,7 +144,7 @@ class ApplicationServicesHandler:
|
||||
except Exception:
|
||||
logger.error("Application Services Failure")
|
||||
|
||||
run_as_background_process("as_scheduler", start_scheduler)
|
||||
run_as_background_process("as_scheduler", start_scheduler) # type: ignore[unused-awaitable]
|
||||
self.started_scheduler = True
|
||||
|
||||
# Fork off pushes to these services
|
||||
|
||||
@@ -222,7 +222,7 @@ class DeactivateAccountHandler:
|
||||
pending deactivation, if it isn't already running.
|
||||
"""
|
||||
if not self._user_parter_running:
|
||||
run_as_background_process("user_parter_loop", self._user_parter_loop)
|
||||
run_as_background_process("user_parter_loop", self._user_parter_loop) # type: ignore[unused-awaitable]
|
||||
|
||||
async def _user_parter_loop(self) -> None:
|
||||
"""Loop that parts deactivated users from rooms"""
|
||||
|
||||
@@ -174,7 +174,7 @@ class FederationHandler:
|
||||
# any partial-state-resync operations which were in flight when we
|
||||
# were shut down.
|
||||
if not hs.config.worker.worker_app:
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"resume_sync_partial_state_room", self._resume_sync_partial_state_room
|
||||
)
|
||||
|
||||
@@ -678,7 +678,7 @@ class FederationHandler:
|
||||
if ret.partial_state:
|
||||
# Kick off the process of asynchronously fetching the state for this
|
||||
# room.
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
desc="sync_partial_state_room",
|
||||
func=self._sync_partial_state_room,
|
||||
initial_destination=origin,
|
||||
@@ -722,7 +722,7 @@ class FederationHandler:
|
||||
# lots of requests for missing prev_events which we do actually
|
||||
# have. Hence we fire off the background task, but don't wait for it.
|
||||
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"handle_queued_pdus", self._handle_queued_pdus, room_queue
|
||||
)
|
||||
|
||||
@@ -1626,7 +1626,7 @@ class FederationHandler:
|
||||
|
||||
partial_state_rooms = await self.store.get_partial_state_room_resync_info()
|
||||
for room_id, resync_info in partial_state_rooms.items():
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
desc="sync_partial_state_room",
|
||||
func=self._sync_partial_state_room,
|
||||
initial_destination=resync_info.joined_via,
|
||||
|
||||
@@ -1408,7 +1408,7 @@ class FederationEventHandler:
|
||||
resync = True
|
||||
|
||||
if resync:
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"resync_device_due_to_pdu",
|
||||
self._resync_device,
|
||||
event.sender,
|
||||
|
||||
@@ -101,7 +101,7 @@ class MessageHandler:
|
||||
self._scheduled_expiry: Optional[IDelayedCall] = None
|
||||
|
||||
if not hs.config.worker.worker_app:
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"_schedule_next_expiry", self._schedule_next_expiry
|
||||
)
|
||||
|
||||
|
||||
@@ -293,7 +293,7 @@ class PaginationHandler:
|
||||
# We want to purge everything, including local events, and to run the purge in
|
||||
# the background so that it's not blocking any other operation apart from
|
||||
# other purges in the same room.
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"_purge_history",
|
||||
self._purge_history,
|
||||
purge_id,
|
||||
@@ -328,7 +328,7 @@ class PaginationHandler:
|
||||
logger.info("[purge] starting purge_id %s", purge_id)
|
||||
|
||||
self._purges_by_id[purge_id] = PurgeStatus()
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"purge_history",
|
||||
self._purge_history,
|
||||
purge_id,
|
||||
@@ -769,7 +769,7 @@ class PaginationHandler:
|
||||
|
||||
self._delete_by_id[delete_id] = DeleteStatus()
|
||||
self._delete_by_room.setdefault(room_id, []).append(delete_id)
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"shutdown_and_purge_room",
|
||||
self._shutdown_and_purge_room,
|
||||
delete_id,
|
||||
|
||||
@@ -1333,7 +1333,7 @@ class PresenceHandler(BasePresenceHandler):
|
||||
finally:
|
||||
self._event_processing = False
|
||||
|
||||
run_as_background_process("presence.notify_new_event", _process_presence)
|
||||
run_as_background_process("presence.notify_new_event", _process_presence) # type: ignore[unused-awaitable]
|
||||
|
||||
async def _unsafe_process(self) -> None:
|
||||
# Loop round handling deltas until we're up to date
|
||||
|
||||
@@ -75,7 +75,7 @@ class StatsHandler:
|
||||
finally:
|
||||
self._is_processing = False
|
||||
|
||||
run_as_background_process("stats.notify_new_event", process)
|
||||
run_as_background_process("stats.notify_new_event", process) # type: ignore[unused-awaitable]
|
||||
|
||||
async def _unsafe_process(self) -> None:
|
||||
# If self.pos is None then means we haven't fetched it from DB
|
||||
|
||||
@@ -116,7 +116,7 @@ class FollowerTypingHandler:
|
||||
if self.federation and self.is_mine_id(member.user_id):
|
||||
last_fed_poke = self._member_last_federation_poke.get(member, None)
|
||||
if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL <= now:
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"typing._push_remote", self._push_remote, member=member, typing=True
|
||||
)
|
||||
|
||||
@@ -180,7 +180,7 @@ class FollowerTypingHandler:
|
||||
self._room_typing[row.room_id] = now_typing
|
||||
|
||||
if self.federation:
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"_send_changes_in_typing_to_remotes",
|
||||
self._send_changes_in_typing_to_remotes,
|
||||
row.room_id,
|
||||
@@ -327,7 +327,7 @@ class TypingWriterHandler(FollowerTypingHandler):
|
||||
def _push_update(self, member: RoomMember, typing: bool) -> None:
|
||||
if self.hs.is_mine_id(member.user_id):
|
||||
# Only send updates for changes to our own users.
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"typing._push_remote", self._push_remote, member, typing
|
||||
)
|
||||
|
||||
|
||||
@@ -122,7 +122,7 @@ class UserDirectoryHandler(StateDeltasHandler):
|
||||
self._is_processing = False
|
||||
|
||||
self._is_processing = True
|
||||
run_as_background_process("user_directory.notify_new_event", process)
|
||||
run_as_background_process("user_directory.notify_new_event", process) # type: ignore[unused-awaitable]
|
||||
|
||||
async def handle_local_profile_change(
|
||||
self, user_id: str, profile: ProfileInfo
|
||||
|
||||
@@ -113,7 +113,7 @@ class EmailPusher(Pusher):
|
||||
if self._is_processing:
|
||||
return
|
||||
|
||||
run_as_background_process("emailpush.process", self._process)
|
||||
run_as_background_process("emailpush.process", self._process) # type: ignore[unused-awaitable]
|
||||
|
||||
def _pause_processing(self) -> None:
|
||||
"""Used by tests to temporarily pause processing of events.
|
||||
|
||||
@@ -160,7 +160,7 @@ class HttpPusher(Pusher):
|
||||
|
||||
# We could check the receipts are actually m.read receipts here,
|
||||
# but currently that's the only type of receipt anyway...
|
||||
run_as_background_process("http_pusher.on_new_receipts", self._update_badge)
|
||||
run_as_background_process("http_pusher.on_new_receipts", self._update_badge) # type: ignore[unused-awaitable]
|
||||
|
||||
async def _update_badge(self) -> None:
|
||||
# XXX as per https://github.com/matrix-org/matrix-doc/issues/2627, this seems
|
||||
@@ -189,7 +189,7 @@ class HttpPusher(Pusher):
|
||||
if self._is_processing:
|
||||
return
|
||||
|
||||
run_as_background_process("httppush.process", self._process)
|
||||
run_as_background_process("httppush.process", self._process) # type: ignore[unused-awaitable]
|
||||
|
||||
async def _process(self) -> None:
|
||||
# we should never get here if we are already processing
|
||||
|
||||
@@ -92,7 +92,7 @@ class PusherPool:
|
||||
if not self._should_start_pushers:
|
||||
logger.info("Not starting pushers because they are disabled in the config")
|
||||
return
|
||||
run_as_background_process("start_pushers", self._start_pushers)
|
||||
run_as_background_process("start_pushers", self._start_pushers) # type: ignore[unused-awaitable]
|
||||
|
||||
async def add_or_update_pusher(
|
||||
self,
|
||||
|
||||
@@ -451,7 +451,7 @@ class FederationSenderHandler:
|
||||
# no need to queue up another task.
|
||||
return
|
||||
|
||||
run_as_background_process("_save_and_send_ack", self._save_and_send_ack)
|
||||
run_as_background_process("_save_and_send_ack", self._save_and_send_ack) # type: ignore[unused-awaitable]
|
||||
|
||||
async def _save_and_send_ack(self) -> None:
|
||||
"""Save the current federation position in the database and send an ACK
|
||||
|
||||
@@ -292,7 +292,7 @@ class ReplicationCommandHandler:
|
||||
return
|
||||
|
||||
# fire off a background process to start processing the queue.
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"process-replication-data", self._unsafe_process_queue, stream_name
|
||||
)
|
||||
|
||||
|
||||
@@ -300,7 +300,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
|
||||
# if so.
|
||||
|
||||
if isawaitable(res):
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"replication-" + cmd.get_logcontext_id(), lambda: res
|
||||
)
|
||||
|
||||
|
||||
@@ -112,7 +112,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
|
||||
def connectionMade(self) -> None:
|
||||
logger.info("Connected to redis")
|
||||
super().connectionMade()
|
||||
run_as_background_process("subscribe-replication", self._send_subscribe)
|
||||
run_as_background_process("subscribe-replication", self._send_subscribe) # type: ignore[unused-awaitable]
|
||||
|
||||
async def _send_subscribe(self) -> None:
|
||||
# it's important to make sure that we only send the REPLICATE command once we
|
||||
@@ -183,7 +183,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
|
||||
# if so.
|
||||
|
||||
if isawaitable(res):
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"replication-" + cmd.get_logcontext_id(), lambda: res
|
||||
)
|
||||
|
||||
@@ -205,7 +205,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
|
||||
Args:
|
||||
cmd: The command to send
|
||||
"""
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"send-cmd", self._async_send_command, cmd, bg_start_span=False
|
||||
)
|
||||
|
||||
|
||||
@@ -137,7 +137,7 @@ class ReplicationStreamer:
|
||||
logger.debug("Notifier poke loop already running")
|
||||
return
|
||||
|
||||
run_as_background_process("replication_notifier", self._run_notifier_loop)
|
||||
run_as_background_process("replication_notifier", self._run_notifier_loop) # type: ignore[unused-awaitable]
|
||||
|
||||
async def _run_notifier_loop(self) -> None:
|
||||
self.is_looping = True
|
||||
|
||||
@@ -1079,7 +1079,7 @@ class RoomRedactEventRestServlet(TransactionRestServlet):
|
||||
)
|
||||
|
||||
if with_relations:
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"redact_related_events",
|
||||
self._relation_handler.redact_events_related_to,
|
||||
requester=requester,
|
||||
|
||||
@@ -272,7 +272,7 @@ class BackgroundUpdater:
|
||||
# if we start a new background update, not all updates are done.
|
||||
self._all_done = False
|
||||
sleep = self.sleep_enabled
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"background_updates", self.run_background_updates, sleep
|
||||
)
|
||||
|
||||
|
||||
@@ -293,7 +293,7 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
|
||||
self._currently_persisting_rooms.discard(room_id)
|
||||
|
||||
# set handle_queue_loop off in the background
|
||||
run_as_background_process("persist_events", handle_queue_loop)
|
||||
run_as_background_process("persist_events", handle_queue_loop) # type: ignore[unused-awaitable]
|
||||
|
||||
def _get_drainining_queue(
|
||||
self, room_id: str
|
||||
|
||||
@@ -942,7 +942,7 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
should_start = False
|
||||
|
||||
if should_start:
|
||||
run_as_background_process("fetch_events", self._fetch_thread)
|
||||
run_as_background_process("fetch_events", self._fetch_thread) # type: ignore[unused-awaitable]
|
||||
|
||||
async def _fetch_thread(self) -> None:
|
||||
"""Services requests for events from `_event_fetch_list`."""
|
||||
|
||||
@@ -128,7 +128,7 @@ class BatchingQueue(Generic[V, R]):
|
||||
# If we're not currently processing the key fire off a background
|
||||
# process to start processing.
|
||||
if key not in self._processing_keys:
|
||||
run_as_background_process(self._name, self._process_queue, key)
|
||||
run_as_background_process(self._name, self._process_queue, key) # type: ignore[unused-awaitable]
|
||||
|
||||
with self._number_in_flight_metric.track_inprogress():
|
||||
return await make_deferred_yieldable(d)
|
||||
|
||||
@@ -84,7 +84,7 @@ class Distributor:
|
||||
if name not in self.signals:
|
||||
raise KeyError("%r does not have a signal named %s" % (self, name))
|
||||
|
||||
run_as_background_process(name, self.signals[name].fire, *args, **kwargs)
|
||||
run_as_background_process(name, self.signals[name].fire, *args, **kwargs) # type: ignore[unused-awaitable]
|
||||
|
||||
|
||||
P = ParamSpec("P")
|
||||
|
||||
@@ -265,4 +265,4 @@ class RetryDestinationLimiter:
|
||||
logger.exception("Failed to store destination_retry_timings")
|
||||
|
||||
# we deliberately do this in the background.
|
||||
run_as_background_process("store_retry_timings", store_retry_timings)
|
||||
run_as_background_process("store_retry_timings", store_retry_timings) # type: ignore[unused-awaitable]
|
||||
|
||||
Reference in New Issue
Block a user