1
0

Compare commits

...

19 Commits

Author SHA1 Message Date
Mathieu Velten 5980756e09 Merge remote-tracking branch 'origin/develop' into mv/mypy-unused-awaitable 2023-03-13 13:59:16 +01:00
Mathieu Velten 4f135c31b8 Update changelog.d/14519.misc
Co-authored-by: David Robertson <davidr@element.io>
2023-03-13 12:54:02 +01:00
David Robertson 2dfccdc1c3 Ignore other false-positives 2023-02-06 14:35:43 +00:00
David Robertson ac559329b2 DeferredCache.get & .set are fine to not await(?) 2023-02-06 14:35:43 +00:00
David Robertson 665156fe88 Ignore false positives from run_as_background_process 2023-02-06 13:55:08 +00:00
David Robertson 3ad65ac778 Missing get_success-es 2023-02-06 13:55:08 +00:00
David Robertson d68b11c323 Fix force_load_metadata return type 2023-02-06 13:07:17 +00:00
David Robertson 6f8080ed50 Merge remote-tracking branch 'origin/develop' into mv/mypy-unused-awaitable 2023-02-06 12:56:27 +00:00
David Robertson be45edb35b Merge remote-tracking branch 'origin/develop' into mv/mypy-unused-awaitable 2023-02-06 12:55:59 +00:00
Mathieu Velten 4b9c67b33e Manual format of md example 2022-11-22 18:12:17 +01:00
Mathieu Velten 53ca28a657 Reformat 2022-11-22 18:10:40 +01:00
Mathieu Velten 312a754664 Add type ignore unused-awaitable to LoopingCall start call 2022-11-22 18:10:08 +01:00
Mathieu Velten 6ac2f573ec Add type ignore unused-awaitable to looping_call immediate call 2022-11-22 18:10:08 +01:00
Mathieu Velten b1be9a742a Add type ignore unused-awaitable to places adding callbacks to a deferred 2022-11-22 18:10:08 +01:00
Mathieu Velten adc7903762 Add type ignore unused-awaitable to ensureDeferred calls 2022-11-22 18:10:08 +01:00
Mathieu Velten dc201b1810 Add type ignore to calls of fcts using wrap_as_background_process 2022-11-22 18:10:08 +01:00
Mathieu Velten b0603dabc9 Add type ignore unused-awaitable to run_in_background calls 2022-11-22 18:09:13 +01:00
Mathieu Velten a0dfa69ffc Add type ignore unused-awaitable to run_as_background_process calls 2022-11-22 18:09:13 +01:00
Mathieu Velten 6e62451c26 Enable unused awaitable error in mypy 2022-11-22 17:49:37 +01:00
66 changed files with 148 additions and 134 deletions
+1
View File
@@ -0,0 +1 @@
Have mypy report `Awaitable`s that are not `await`ed or otherwise consumed.
@@ -374,7 +374,7 @@ async def do_something() -> None:
# `do_something_else` will get its own independent
# logging context. `request-1` will not count any
# metrics from `do_something_else`.
run_as_background_process(
run_as_background_process( # type: ignore[unused-awaitable]
"do_something_else",
do_something_else,
to_resolve,
+1
View File
@@ -12,6 +12,7 @@ local_partial_types = True
no_implicit_optional = True
disallow_untyped_defs = True
strict_equality = True
enable_error_code = unused-awaitable
warn_redundant_casts = True
files =
+1 -1
View File
@@ -59,7 +59,7 @@ def run_background_updates(hs: HomeServer) -> None:
def run() -> None:
# Apply all background updates on the database.
defer.ensureDeferred(
defer.ensureDeferred( # type: ignore[unused-awaitable]
run_as_background_process("background_updates", run_background_updates)
)
+2 -2
View File
@@ -189,7 +189,7 @@ def start_phone_stats_home(hs: "HomeServer") -> None:
clock.looping_call(
hs.get_datastores().main.reap_monthly_active_users, 1000 * 60 * 60
)
hs.get_datastores().main.reap_monthly_active_users()
hs.get_datastores().main.reap_monthly_active_users() # type: ignore[unused-awaitable]
@wrap_as_background_process("generate_monthly_active_users")
async def generate_monthly_active_users() -> None:
@@ -212,7 +212,7 @@ def start_phone_stats_home(hs: "HomeServer") -> None:
max_mau_gauge.set(float(hs.config.server.max_mau_value))
if hs.config.server.limit_usage_by_mau or hs.config.server.mau_stats_only:
generate_monthly_active_users()
generate_monthly_active_users() # type: ignore[unused-awaitable]
clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000)
# End of monthly active user settings
+4 -4
View File
@@ -200,7 +200,7 @@ class _ServiceQueuer:
if service.id in self.requests_in_flight:
return
run_as_background_process(
run_as_background_process( # type: ignore[unused-awaitable]
"as-sender-%s" % (service.id,), self._send_request, service
)
@@ -407,10 +407,10 @@ class _TransactionController:
if sent:
await txn.complete(self.store)
else:
run_in_background(self._on_txn_fail, service)
run_in_background(self._on_txn_fail, service) # type: ignore[unused-awaitable]
except Exception:
logger.exception("Error creating appservice transaction")
run_in_background(self._on_txn_fail, service)
run_in_background(self._on_txn_fail, service) # type: ignore[unused-awaitable]
async def on_recovered(self, recoverer: "_Recoverer") -> None:
logger.info(
@@ -479,7 +479,7 @@ class _Recoverer:
def recover(self) -> None:
def _retry() -> None:
run_as_background_process(
run_as_background_process( # type: ignore[unused-awaitable]
"as-recoverer-%s" % (self.service.id,), self.retry
)
+3 -3
View File
@@ -200,7 +200,7 @@ class FederationServer(FederationBase):
)
if lock:
logger.info("Handling old staged inbound events in %s", room_id)
self._process_incoming_pdus_in_room_inner(
self._process_incoming_pdus_in_room_inner( # type: ignore[unused-awaitable]
room_id,
room_version,
lock,
@@ -277,7 +277,7 @@ class FederationServer(FederationBase):
# any old events in the staging area.
if not self._started_handling_of_staged_events:
self._started_handling_of_staged_events = True
self._handle_old_staged_events()
self._handle_old_staged_events() # type: ignore[unused-awaitable]
# Start a periodic check for old staged events. This is to handle
# the case where locks time out, e.g. if another process gets killed
@@ -1144,7 +1144,7 @@ class FederationServer(FederationBase):
_INBOUND_EVENT_HANDLING_LOCK_NAME, pdu.room_id
)
if lock:
self._process_incoming_pdus_in_room_inner(
self._process_incoming_pdus_in_room_inner( # type: ignore[unused-awaitable]
pdu.room_id, room_version, lock, origin, pdu
)
+2 -2
View File
@@ -187,7 +187,7 @@ class _DestinationWakeupQueue:
self.queue[destination] = None
if not self.processing:
self._handle()
self._handle() # type: ignore[unused-awaitable]
@wrap_as_background_process("_DestinationWakeupQueue.handle")
async def _handle(self) -> None:
@@ -342,7 +342,7 @@ class FederationSender(AbstractFederationSender):
return
# fire off a processing loop in the background
run_as_background_process(
run_as_background_process( # type: ignore[unused-awaitable]
"process_event_queue_for_federation", self._process_event_queue_loop
)
@@ -301,7 +301,7 @@ class PerDestinationQueue:
logger.debug("TX [%s] Starting transaction loop", self._destination)
run_as_background_process(
run_as_background_process( # type: ignore[unused-awaitable]
"federation_transaction_transmission_loop",
self._transaction_transmission_loop,
)
+1 -1
View File
@@ -132,7 +132,7 @@ class Authenticator:
# alive
retry_timings = await self.store.get_destination_retry_timings(origin)
if retry_timings and retry_timings.retry_last_ts:
run_in_background(self.reset_retry_timings, origin)
run_in_background(self.reset_retry_timings, origin) # type: ignore[unused-awaitable]
return origin
+3 -3
View File
@@ -97,7 +97,7 @@ class ApplicationServicesHandler:
# We only start a new background process if necessary rather than
# optimistically (to cut down on overhead).
self._notify_interested_services(max_token)
self._notify_interested_services(max_token) # type: ignore[unused-awaitable]
@wrap_as_background_process("notify_interested_services")
async def _notify_interested_services(self, max_token: RoomStreamToken) -> None:
@@ -144,7 +144,7 @@ class ApplicationServicesHandler:
except Exception:
logger.error("Application Services Failure")
run_as_background_process("as_scheduler", start_scheduler)
run_as_background_process("as_scheduler", start_scheduler) # type: ignore[unused-awaitable]
self.started_scheduler = True
# Fork off pushes to these services
@@ -307,7 +307,7 @@ class ApplicationServicesHandler:
# We only start a new background process if necessary rather than
# optimistically (to cut down on overhead).
self._notify_interested_services_ephemeral(
self._notify_interested_services_ephemeral( # type: ignore[unused-awaitable]
services, stream_key, new_token, users
)
+1 -1
View File
@@ -223,7 +223,7 @@ class DeactivateAccountHandler:
pending deactivation, if it isn't already running.
"""
if not self._user_parter_running:
run_as_background_process("user_parter_loop", self._user_parter_loop)
run_as_background_process("user_parter_loop", self._user_parter_loop) # type: ignore[unused-awaitable]
async def _user_parter_loop(self) -> None:
"""Loop that parts deactivated users from rooms"""
+1 -1
View File
@@ -589,7 +589,7 @@ class DeviceHandler(DeviceWorkerHandler):
# We may need to do some processing asynchronously for local user IDs.
if self.hs.is_mine_id(user_id):
self._handle_new_device_update_async()
self._handle_new_device_update_async() # type: ignore[unused-awaitable]
async def notify_user_signature_update(
self, from_user_id: str, user_ids: List[str]
+1 -1
View File
@@ -198,7 +198,7 @@ class DeviceMessageHandler:
await self.store.mark_remote_users_device_caches_as_stale((sender_user_id,))
# Immediately attempt a resync in the background
run_in_background(self._user_device_resync, user_id=sender_user_id)
run_in_background(self._user_device_resync, user_id=sender_user_id) # type: ignore[unused-awaitable]
async def send_device_message(
self,
+3 -3
View File
@@ -192,7 +192,7 @@ class FederationHandler:
# any partial-state-resync operations which were in flight when we
# were shut down.
if not hs.config.worker.worker_app:
run_as_background_process(
run_as_background_process( # type: ignore[unused-awaitable]
"resume_sync_partial_state_room", self._resume_partial_state_room_sync
)
@@ -778,7 +778,7 @@ class FederationHandler:
# lots of requests for missing prev_events which we do actually
# have. Hence we fire off the background task, but don't wait for it.
run_as_background_process(
run_as_background_process( # type: ignore[unused-awaitable]
"handle_queued_pdus", self._handle_queued_pdus, room_queue
)
@@ -1838,7 +1838,7 @@ class FederationHandler:
room_id=room_id,
)
run_as_background_process(
run_as_background_process( # type: ignore[unused-awaitable]
desc="sync_partial_state_room", func=_sync_partial_state_room_wrapper
)
+1 -1
View File
@@ -1461,7 +1461,7 @@ class FederationEventHandler:
resync = True
if resync:
run_as_background_process(
run_as_background_process( # type: ignore[unused-awaitable]
"resync_device_due_to_pdu",
self._resync_device,
event.sender,
+3 -3
View File
@@ -101,7 +101,7 @@ class MessageHandler:
self._scheduled_expiry: Optional[IDelayedCall] = None
if not hs.config.worker.worker_app:
run_as_background_process(
run_as_background_process( # type: ignore[unused-awaitable]
"_schedule_next_expiry", self._schedule_next_expiry
)
@@ -1954,7 +1954,7 @@ class EventCreationHandler:
if event.type == EventTypes.Message:
# We don't want to block sending messages on any presence code. This
# matters as sometimes presence code can take a while.
run_as_background_process(
run_as_background_process( # type: ignore[unused-awaitable]
"bump_presence_active_time", self._bump_active_time, requester.user
)
@@ -1966,7 +1966,7 @@ class EventCreationHandler:
except Exception:
logger.exception("Error notifying about new room events")
run_in_background(_notify)
run_in_background(_notify) # type: ignore[unused-awaitable]
return persisted_events[-1]
+3 -3
View File
@@ -293,7 +293,7 @@ class PaginationHandler:
# We want to purge everything, including local events, and to run the purge in
# the background so that it's not blocking any other operation apart from
# other purges in the same room.
run_as_background_process(
run_as_background_process( # type: ignore[unused-awaitable]
"_purge_history",
self._purge_history,
purge_id,
@@ -328,7 +328,7 @@ class PaginationHandler:
logger.info("[purge] starting purge_id %s", purge_id)
self._purges_by_id[purge_id] = PurgeStatus()
run_as_background_process(
run_as_background_process( # type: ignore[unused-awaitable]
"purge_history",
self._purge_history,
purge_id,
@@ -777,7 +777,7 @@ class PaginationHandler:
self._delete_by_id[delete_id] = DeleteStatus()
self._delete_by_room.setdefault(room_id, []).append(delete_id)
run_as_background_process(
run_as_background_process( # type: ignore[unused-awaitable]
"shutdown_and_purge_room",
self._shutdown_and_purge_room,
delete_id,
+2 -2
View File
@@ -1064,7 +1064,7 @@ class PresenceHandler(BasePresenceHandler):
yield
finally:
if affect_presence:
run_in_background(_end)
run_in_background(_end) # type: ignore[unused-awaitable]
return _user_syncing()
@@ -1337,7 +1337,7 @@ class PresenceHandler(BasePresenceHandler):
finally:
self._event_processing = False
run_as_background_process("presence.notify_new_event", _process_presence)
run_as_background_process("presence.notify_new_event", _process_presence) # type: ignore[unused-awaitable]
async def _unsafe_process(self) -> None:
# Loop round handling deltas until we're up to date
+1 -1
View File
@@ -75,7 +75,7 @@ class StatsHandler:
finally:
self._is_processing = False
run_as_background_process("stats.notify_new_event", process)
run_as_background_process("stats.notify_new_event", process) # type: ignore[unused-awaitable]
async def _unsafe_process(self) -> None:
# If self.pos is None then means we haven't fetched it from DB
+3 -3
View File
@@ -116,7 +116,7 @@ class FollowerTypingHandler:
if self.federation and self.is_mine_id(member.user_id):
last_fed_poke = self._member_last_federation_poke.get(member, None)
if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL <= now:
run_as_background_process(
run_as_background_process( # type: ignore[unused-awaitable]
"typing._push_remote", self._push_remote, member=member, typing=True
)
@@ -180,7 +180,7 @@ class FollowerTypingHandler:
self._room_typing[row.room_id] = now_typing
if self.federation:
run_as_background_process(
run_as_background_process( # type: ignore[unused-awaitable]
"_send_changes_in_typing_to_remotes",
self._send_changes_in_typing_to_remotes,
row.room_id,
@@ -327,7 +327,7 @@ class TypingWriterHandler(FollowerTypingHandler):
def _push_update(self, member: RoomMember, typing: bool) -> None:
if self.hs.is_mine_id(member.user_id):
# Only send updates for changes to our own users.
run_as_background_process(
run_as_background_process( # type: ignore[unused-awaitable]
"typing._push_remote", self._push_remote, member, typing
)
+1 -1
View File
@@ -122,7 +122,7 @@ class UserDirectoryHandler(StateDeltasHandler):
self._is_processing = False
self._is_processing = True
run_as_background_process("user_directory.notify_new_event", process)
run_as_background_process("user_directory.notify_new_event", process) # type: ignore[unused-awaitable]
async def handle_local_profile_change(
self, user_id: str, profile: ProfileInfo
+1 -1
View File
@@ -458,7 +458,7 @@ class SimpleHttpClient:
)
# turn timeouts into RequestTimedOutErrors
request_deferred.addErrback(_timeout_to_request_timed_out_error)
request_deferred.addErrback(_timeout_to_request_timed_out_error) # type: ignore[unused-awaitable]
response = await make_deferred_yieldable(request_deferred)
+2 -2
View File
@@ -102,7 +102,7 @@ class HTTPConnectProxyEndpoint:
d = self._proxy_endpoint.connect(f)
# once the tcp socket connects successfully, we need to wait for the
# CONNECT to complete.
d.addCallback(lambda conn: f.on_connection)
d.addCallback(lambda conn: f.on_connection) # type: ignore[unused-awaitable]
return d
@@ -196,7 +196,7 @@ class HTTPConnectProtocol(protocol.Protocol):
self.http_setup_client = HTTPConnectSetupClient(
self.host, self.port, self.proxy_creds
)
self.http_setup_client.on_connected.addCallback(self.proxyConnected)
self.http_setup_client.on_connected.addCallback(self.proxyConnected) # type: ignore[unused-awaitable]
def connectionMade(self) -> None:
self.http_setup_client.makeConnection(self.transport)
+1 -1
View File
@@ -1216,7 +1216,7 @@ class MatrixFederationHttpClient:
try:
d = read_body_with_max_size(response, output_stream, max_size)
d.addTimeout(self.default_timeout, self.reactor)
d.addTimeout(self.default_timeout, self.reactor) # type: ignore[unused-awaitable]
length = await make_deferred_yieldable(d)
except BodyExceededMaxSize:
msg = "Requested file is too large > %r bytes" % (max_size,)
+1 -1
View File
@@ -764,7 +764,7 @@ def respond_with_json(
if send_cors:
set_cors_headers(request)
run_in_background(
run_in_background( # type: ignore[unused-awaitable]
_async_write_json_to_request_in_thread, request, encoder, json_object
)
return NOT_DONE_YET
+1 -1
View File
@@ -193,7 +193,7 @@ class RemoteHandler(logging.Handler):
self._connection_waiter = None
deferred: Deferred = self._service.whenConnected(failAfterFailures=1)
deferred.addCallbacks(writer, fail)
deferred.addCallbacks(writer, fail) # type: ignore[unused-awaitable]
self._connection_waiter = deferred
def _handle_pressure(self) -> None:
+2 -2
View File
@@ -843,7 +843,7 @@ def run_in_background( # type: ignore[misc]
# which is supposed to have a single entry and exit point. But
# by spawning off another deferred, we are effectively
# adding a new exit point.)
res.addBoth(_set_context_cb, ctx)
res.addBoth(_set_context_cb, ctx) # type: ignore[unused-awaitable]
return res
@@ -870,7 +870,7 @@ def make_deferred_yieldable(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]
# ok, we can't be sure that a yield won't block, so let's reset the
# logcontext, and add a callback to the deferred to restore it.
prev_context = set_current_context(SENTINEL_CONTEXT)
deferred.addBoth(_set_context_cb, prev_context)
deferred.addBoth(_set_context_cb, prev_context) # type: ignore[unused-awaitable]
return deferred
+1 -1
View File
@@ -933,7 +933,7 @@ def _custom_sync_async_decorator(
scope.__exit__(None, None, None)
return result
result.addCallbacks(call_back, err_back)
result.addCallbacks(call_back, err_back) # type: ignore[unused-awaitable]
else:
if inspect.isawaitable(result):
+1 -1
View File
@@ -129,7 +129,7 @@ def install_gc_manager() -> None:
gc_unreachable.labels(i).set(unreachable)
gc_task = task.LoopingCall(_maybe_gc)
gc_task.start(0.1)
gc_task.start(0.1) # type: ignore[unused-awaitable]
#
@@ -207,6 +207,9 @@ def run_as_background_process(
clock.looping_call and friends (or for firing-and-forgetting in the middle of a
normal synapse async function).
Mypy will flag up this Deferred as unawaited. This is safe to ignore: the background
process runs automatically, even if we don't await the returned deferred.
Args:
desc: a description for this background process type
func: a function, which may return a Deferred or a coroutine
+1 -1
View File
@@ -54,7 +54,7 @@ class CommonUsageMetricsManager:
async def setup(self) -> None:
"""Keep the gauges for common usage metrics up to date."""
run_as_background_process(
run_as_background_process( # type: ignore[unused-awaitable]
desc="common_usage_metrics_update_gauges", func=self._update_gauges
)
self._clock.looping_call(
+1 -1
View File
@@ -113,7 +113,7 @@ class EmailPusher(Pusher):
if self._is_processing:
return
run_as_background_process("emailpush.process", self._process)
run_as_background_process("emailpush.process", self._process) # type: ignore[unused-awaitable]
def _pause_processing(self) -> None:
"""Used by tests to temporarily pause processing of events.
+2 -2
View File
@@ -160,7 +160,7 @@ class HttpPusher(Pusher):
# We could check the receipts are actually m.read receipts here,
# but currently that's the only type of receipt anyway...
run_as_background_process("http_pusher.on_new_receipts", self._update_badge)
run_as_background_process("http_pusher.on_new_receipts", self._update_badge) # type: ignore[unused-awaitable]
async def _update_badge(self) -> None:
# XXX as per https://github.com/matrix-org/matrix-doc/issues/2627, this seems
@@ -189,7 +189,7 @@ class HttpPusher(Pusher):
if self._is_processing:
return
run_as_background_process("httppush.process", self._process)
run_as_background_process("httppush.process", self._process) # type: ignore[unused-awaitable]
async def _process(self) -> None:
# we should never get here if we are already processing
+2 -2
View File
@@ -92,7 +92,7 @@ class PusherPool:
if not self._should_start_pushers:
logger.info("Not starting pushers because they are disabled in the config")
return
run_as_background_process("start_pushers", self._start_pushers)
run_as_background_process("start_pushers", self._start_pushers) # type: ignore[unused-awaitable]
async def add_or_update_pusher(
self,
@@ -236,7 +236,7 @@ class PusherPool:
# We only start a new background process if necessary rather than
# optimistically (to cut down on overhead).
self._on_new_notifications(max_token)
self._on_new_notifications(max_token) # type: ignore[unused-awaitable]
@wrap_as_background_process("on_new_notifications")
async def _on_new_notifications(self, max_token: RoomStreamToken) -> None:
+1 -1
View File
@@ -526,7 +526,7 @@ class FederationSenderHandler:
# no need to queue up another task.
return
run_as_background_process("_save_and_send_ack", self._save_and_send_ack)
run_as_background_process("_save_and_send_ack", self._save_and_send_ack) # type: ignore[unused-awaitable]
async def _save_and_send_ack(self) -> None:
"""Save the current federation position in the database and send an ACK
+1 -1
View File
@@ -291,7 +291,7 @@ class ReplicationCommandHandler:
return
# fire off a background process to start processing the queue.
run_as_background_process(
run_as_background_process( # type: ignore[unused-awaitable]
"process-replication-data", self._unsafe_process_queue, stream_name
)
+1 -1
View File
@@ -300,7 +300,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
# if so.
if isawaitable(res):
run_as_background_process(
run_as_background_process( # type: ignore[unused-awaitable]
"replication-" + cmd.get_logcontext_id(), lambda: res
)
+3 -3
View File
@@ -112,7 +112,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
def connectionMade(self) -> None:
logger.info("Connected to redis")
super().connectionMade()
run_as_background_process("subscribe-replication", self._send_subscribe)
run_as_background_process("subscribe-replication", self._send_subscribe) # type: ignore[unused-awaitable]
async def _send_subscribe(self) -> None:
# it's important to make sure that we only send the REPLICATE command once we
@@ -183,7 +183,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
# if so.
if isawaitable(res):
run_as_background_process(
run_as_background_process( # type: ignore[unused-awaitable]
"replication-" + cmd.get_logcontext_id(), lambda: res
)
@@ -205,7 +205,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
Args:
cmd: The command to send
"""
run_as_background_process(
run_as_background_process( # type: ignore[unused-awaitable]
"send-cmd", self._async_send_command, cmd, bg_start_span=False
)
+1 -1
View File
@@ -137,7 +137,7 @@ class ReplicationStreamer:
logger.debug("Notifier poke loop already running")
return
run_as_background_process("replication_notifier", self._run_notifier_loop)
run_as_background_process("replication_notifier", self._run_notifier_loop) # type: ignore[unused-awaitable]
async def _run_notifier_loop(self) -> None:
self.is_looping = True
+1 -1
View File
@@ -1124,7 +1124,7 @@ class RoomRedactEventRestServlet(TransactionRestServlet):
)
if with_relations:
run_as_background_process(
run_as_background_process( # type: ignore[unused-awaitable]
"redact_related_events",
self._relation_handler.redact_events_related_to,
requester=requester,
+1 -1
View File
@@ -116,7 +116,7 @@ class HttpTransactionCache:
# we deliberately do not propagate the error any further, as we
# expect the observers to have reported it.
deferred.addErrback(remove_from_map)
deferred.addErrback(remove_from_map) # type: ignore[unused-awaitable]
return make_deferred_yieldable(observable.observe())
+1 -1
View File
@@ -272,7 +272,7 @@ class BackgroundUpdater:
# if we start a new background update, not all updates are done.
self._all_done = False
sleep = self.sleep_enabled
run_as_background_process(
run_as_background_process( # type: ignore[unused-awaitable]
"background_updates", self.run_background_updates, sleep
)
@@ -293,7 +293,7 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
self._currently_persisting_rooms.discard(room_id)
# set handle_queue_loop off in the background
run_as_background_process("persist_events", handle_queue_loop)
run_as_background_process("persist_events", handle_queue_loop) # type: ignore[unused-awaitable]
def _get_drainining_queue(
self, room_id: str
@@ -1065,7 +1065,7 @@ class EventsWorkerStore(SQLBaseStore):
should_start = False
if should_start:
run_as_background_process("fetch_events", self._fetch_thread)
run_as_background_process("fetch_events", self._fetch_thread) # type: ignore[unused-awaitable]
async def _fetch_thread(self) -> None:
"""Services requests for events from `_event_fetch_list`."""
+1 -1
View File
@@ -132,7 +132,7 @@ class Clock:
call = task.LoopingCall(f, *args, **kwargs)
call.clock = self._reactor
d = call.start(msec / 1000.0, now=False)
d.addErrback(log_failure, "Looping call died", consumeErrors=False)
d.addErrback(log_failure, "Looping call died", consumeErrors=False) # type: ignore[unused-awaitable]
return call
def call_later(
+8 -8
View File
@@ -154,7 +154,7 @@ class ObservableDeferred(Generic[_T], AbstractObservableDeferred[_T]):
else:
return f
deferred.addCallbacks(callback, errback)
deferred.addCallbacks(callback, errback) # type: ignore[unused-awaitable]
def observe(self) -> "defer.Deferred[_T]":
"""Observe the underlying deferred.
@@ -635,7 +635,7 @@ class ReadWriteLock:
# writer waiting for us and it completed entirely within the
# `new_defer.callback()` call above.
if self.key_to_current_writer.get(key) == new_defer:
self.key_to_current_writer.pop(key)
self.key_to_current_writer.pop(key) # type: ignore[unused-awaitable]
return _ctx_manager()
@@ -693,7 +693,7 @@ def timeout_deferred(
raise defer.TimeoutError("Timed out after %gs" % (timeout,))
return value
deferred.addErrback(convert_cancelled)
deferred.addErrback(convert_cancelled) # type: ignore[unused-awaitable]
def cancel_timeout(result: _T) -> _T:
# stop the pending call to cancel the deferred if it's been fired
@@ -701,7 +701,7 @@ def timeout_deferred(
delayed_call.cancel()
return result
deferred.addBoth(cancel_timeout)
deferred.addBoth(cancel_timeout) # type: ignore[unused-awaitable]
def success_cb(val: _T) -> None:
if not new_d.called:
@@ -711,7 +711,7 @@ def timeout_deferred(
if not new_d.called:
new_d.errback(val)
deferred.addCallbacks(success_cb, failure_cb)
deferred.addCallbacks(success_cb, failure_cb) # type: ignore[unused-awaitable]
return new_d
@@ -759,7 +759,7 @@ def stop_cancellation(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]":
wrapped with `make_deferred_yieldable`.
"""
new_deferred: "defer.Deferred[T]" = defer.Deferred()
deferred.chainDeferred(new_deferred)
deferred.chainDeferred(new_deferred) # type: ignore[unused-awaitable]
return new_deferred
@@ -821,10 +821,10 @@ def delay_cancellation(awaitable: Awaitable[T]) -> Awaitable[T]:
new_deferred.pause()
new_deferred.errback(Failure(CancelledError()))
deferred.addBoth(lambda _: new_deferred.unpause())
deferred.addBoth(lambda _: new_deferred.unpause()) # type: ignore[unused-awaitable]
new_deferred: "defer.Deferred[T]" = defer.Deferred(handle_cancel)
deferred.chainDeferred(new_deferred)
deferred.chainDeferred(new_deferred) # type: ignore[unused-awaitable]
return new_deferred
+1 -1
View File
@@ -128,7 +128,7 @@ class BatchingQueue(Generic[V, R]):
# If we're not currently processing the key fire off a background
# process to start processing.
if key not in self._processing_keys:
run_as_background_process(self._name, self._process_queue, key)
run_as_background_process(self._name, self._process_queue, key) # type: ignore[unused-awaitable]
with self._number_in_flight_metric.track_inprogress():
return await make_deferred_yieldable(d)
+1 -1
View File
@@ -89,7 +89,7 @@ class CachedCall(Generic[TV]):
def got_result(r: Union[TV, Failure]) -> None:
self._result = r
self._deferred.addBoth(got_result)
self._deferred.addBoth(got_result) # type: ignore[unused-awaitable]
# TODO: consider cancellation semantics. Currently, if the call to get()
# is cancelled, the underlying call will continue (and any future calls
+1 -1
View File
@@ -377,7 +377,7 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase):
for k, v in r.items():
results[cache_key_to_arg(k)] = v
pending_deferred.addCallback(update_results)
pending_deferred.addCallback(update_results) # type: ignore[unused-awaitable]
cached_defers.append(pending_deferred)
if missing:
+1 -1
View File
@@ -84,7 +84,7 @@ class Distributor:
if name not in self.signals:
raise KeyError("%r does not have a signal named %s" % (self, name))
run_as_background_process(name, self.signals[name].fire, *args, **kwargs)
run_as_background_process(name, self.signals[name].fire, *args, **kwargs) # type: ignore[unused-awaitable]
P = ParamSpec("P")
+1 -1
View File
@@ -108,7 +108,7 @@ def do_patch() -> None:
raise Exception(err)
return r
res.addBoth(check_ctx)
res.addBoth(check_ctx) # type: ignore[unused-awaitable]
return res
return wrapped
+3 -3
View File
@@ -334,7 +334,7 @@ class _PerHostRatelimiter:
queue_defer = queue_request()
return queue_defer
ret_defer.addBoth(on_wait_finished)
ret_defer.addBoth(on_wait_finished) # type: ignore[unused-awaitable]
else:
ret_defer = queue_request()
@@ -358,8 +358,8 @@ class _PerHostRatelimiter:
self.ready_request_queue.pop(request_id, None)
return r
ret_defer.addCallbacks(on_start, on_err)
ret_defer.addBoth(on_both)
ret_defer.addCallbacks(on_start, on_err) # type: ignore[unused-awaitable]
ret_defer.addBoth(on_both) # type: ignore[unused-awaitable]
return make_deferred_yieldable(ret_defer)
def _on_exit(self, request_id: object) -> None:
+1 -1
View File
@@ -265,4 +265,4 @@ class RetryDestinationLimiter:
logger.exception("Failed to store destination_retry_timings")
# we deliberately do this in the background.
run_as_background_process("store_retry_timings", store_retry_timings)
run_as_background_process("store_retry_timings", store_retry_timings) # type: ignore[unused-awaitable]
+4 -2
View File
@@ -130,7 +130,9 @@ class KeyringTestCase(unittest.HomeserverTestCase):
pass
self.assertFalse(res_deferreds[0].called)
res_deferreds[0].addBoth(self.check_context, None)
# type-ignore: Deferred.addBoth returns `self`; it doesn't need to be
# awaited as long as we await the deferred elsewhere
res_deferreds[0].addBoth(self.check_context, None) # type: ignore[unused-awaitable]
await make_deferred_yieldable(res_deferreds[0])
@@ -166,7 +168,7 @@ class KeyringTestCase(unittest.HomeserverTestCase):
)
]
)
res_deferreds_2[0].addBoth(self.check_context, None)
res_deferreds_2[0].addBoth(self.check_context, None) # type: ignore[unused-awaitable]
second_lookup_state[0] = 1
await make_deferred_yieldable(res_deferreds_2[0])
second_lookup_state[0] = 2
+2 -2
View File
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from typing import Any, Awaitable, ContextManager, Dict, Optional, Tuple
from typing import Any, ContextManager, Dict, Optional, Tuple
from unittest.mock import ANY, Mock, patch
from urllib.parse import parse_qs, urlparse
@@ -287,7 +287,7 @@ class OidcHandlerTestCase(HomeserverTestCase):
"""Provider metadatas are extensively validated."""
h = self.provider
def force_load_metadata() -> Awaitable[None]:
def force_load_metadata() -> "OpenIDProviderMetadata":
async def force_load() -> "OpenIDProviderMetadata":
return await h.load_metadata(force=True)
+1 -1
View File
@@ -460,7 +460,7 @@ class FederationClientTests(HomeserverTestCase):
self.failureResultOf(d)
def test_client_sends_body(self) -> None:
defer.ensureDeferred(
defer.ensureDeferred( # type: ignore[unused-awaitable]
self.cl.post_json(
"testserv:8008", "foo/bar", timeout=10000, data={"a": "b"}
)
+6 -2
View File
@@ -151,7 +151,9 @@ class ApplicationServiceTransactionStoreTestCase(unittest.HomeserverTestCase):
outfile.write(yaml.dump(as_yaml))
self.as_yaml_files.append(as_token)
def _set_state(self, id: str, state: ApplicationServiceState) -> defer.Deferred:
def _set_state(
self, id: str, state: ApplicationServiceState
) -> "defer.Deferred[None]":
return self.db_pool.runOperation(
self.engine.convert_param_style(
"INSERT INTO application_services_state(as_id, state) VALUES(?,?)"
@@ -297,7 +299,9 @@ class ApplicationServiceTransactionStoreTestCase(unittest.HomeserverTestCase):
service = Mock(id=self.as_list[0]["id"])
events = [Mock(event_id="e1"), Mock(event_id="e2")]
txn_id = 5
self._set_state(self.as_list[0]["id"], ApplicationServiceState.UP)
self.get_success(
self._set_state(self.as_list[0]["id"], ApplicationServiceState.UP)
)
self.get_success(self._insert_txn(service.id, txn_id, events))
self.get_success(
self.store.complete_appservice_txn(txn_id=txn_id, service=service)
+1 -1
View File
@@ -117,7 +117,7 @@ class JsonResourceTests(unittest.TestCase):
def _callback(request: SynapseRequest, **kwargs: object) -> "Deferred[None]":
d: "Deferred[None]" = Deferred()
d.addCallback(_throw)
d.addCallback(_throw) # type: ignore[unused-awaitable]
self.reactor.callLater(0.5, d.callback, True)
return make_deferred_yieldable(d)
+1 -1
View File
@@ -563,7 +563,7 @@ class HomeserverTestCase(TestCase):
deferred: Deferred[TV] = ensureDeferred(d) # type: ignore[arg-type]
results: list = []
deferred.addBoth(results.append)
deferred.addBoth(results.append) # type: ignore[unused-awaitable]
self.pump(by=by)
+27 -27
View File
@@ -26,7 +26,7 @@ class DeferredCacheTestCase(TestCase):
def test_empty(self) -> None:
cache: DeferredCache[str, int] = DeferredCache("test")
with self.assertRaises(KeyError):
cache.get("foo")
cache.get("foo") # type: ignore[unused-awaitable]
def test_hit(self) -> None:
cache: DeferredCache[str, int] = DeferredCache("test")
@@ -48,7 +48,7 @@ class DeferredCacheTestCase(TestCase):
self.assertTrue(set_d.called)
return r
get_d.addCallback(check1)
get_d.addCallback(check1) # type: ignore[unused-awaitable]
# now fire off all the deferreds
origin_d.callback(99)
@@ -130,7 +130,7 @@ class DeferredCacheTestCase(TestCase):
def test_get_immediate(self) -> None:
cache: DeferredCache[str, int] = DeferredCache("test")
d1: "defer.Deferred[int]" = defer.Deferred()
cache.set("key1", d1)
cache.set("key1", d1) # type: ignore[unused-awaitable]
# get_immediate should return default
v = cache.get_immediate("key1", 1)
@@ -149,7 +149,7 @@ class DeferredCacheTestCase(TestCase):
cache.invalidate(("foo",))
with self.assertRaises(KeyError):
cache.get(("foo",))
cache.get(("foo",)) # type: ignore[unused-awaitable]
def test_invalidate_all(self) -> None:
cache: DeferredCache[str, str] = DeferredCache("testcache")
@@ -161,10 +161,10 @@ class DeferredCacheTestCase(TestCase):
# add a couple of pending entries
d1: "defer.Deferred[str]" = defer.Deferred()
cache.set("key1", d1, partial(record_callback, 0))
cache.set("key1", d1, partial(record_callback, 0)) # type: ignore[unused-awaitable]
d2: "defer.Deferred[str]" = defer.Deferred()
cache.set("key2", d2, partial(record_callback, 1))
cache.set("key2", d2, partial(record_callback, 1)) # type: ignore[unused-awaitable]
# lookup should return pending deferreds
self.assertFalse(cache.get("key1").called)
@@ -181,9 +181,9 @@ class DeferredCacheTestCase(TestCase):
# lookup should fail
with self.assertRaises(KeyError):
cache.get("key1")
cache.get("key1") # type: ignore[unused-awaitable]
with self.assertRaises(KeyError):
cache.get("key2")
cache.get("key2") # type: ignore[unused-awaitable]
# both callbacks should have been callbacked
self.assertTrue(callback_record[0], "Invalidation callback for key1 not called")
@@ -192,7 +192,7 @@ class DeferredCacheTestCase(TestCase):
# letting the other lookup complete should do nothing
d1.callback("result1")
with self.assertRaises(KeyError):
cache.get("key1", None)
cache.get("key1", None) # type: ignore[unused-awaitable]
def test_eviction(self) -> None:
cache: DeferredCache[int, str] = DeferredCache(
@@ -204,10 +204,10 @@ class DeferredCacheTestCase(TestCase):
cache.prefill(3, "three") # 1 will be evicted
with self.assertRaises(KeyError):
cache.get(1)
cache.get(1) # type: ignore[unused-awaitable]
cache.get(2)
cache.get(3)
cache.get(2) # type: ignore[unused-awaitable]
cache.get(3) # type: ignore[unused-awaitable]
def test_eviction_lru(self) -> None:
cache: DeferredCache[int, str] = DeferredCache(
@@ -218,15 +218,15 @@ class DeferredCacheTestCase(TestCase):
cache.prefill(2, "two")
# Now access 1 again, thus causing 2 to be least-recently used
cache.get(1)
cache.get(1) # type: ignore[unused-awaitable]
cache.prefill(3, "three")
with self.assertRaises(KeyError):
cache.get(2)
cache.get(2) # type: ignore[unused-awaitable]
cache.get(1)
cache.get(3)
cache.get(1) # type: ignore[unused-awaitable]
cache.get(3) # type: ignore[unused-awaitable]
def test_eviction_iterable(self) -> None:
cache: DeferredCache[int, List[str]] = DeferredCache(
@@ -240,40 +240,40 @@ class DeferredCacheTestCase(TestCase):
cache.prefill(2, ["three"])
# Now access 1 again, thus causing 2 to be least-recently used
cache.get(1)
cache.get(1) # type: ignore[unused-awaitable]
# Now add an item to the cache, which evicts 2.
cache.prefill(3, ["four"])
with self.assertRaises(KeyError):
cache.get(2)
cache.get(2) # type: ignore[unused-awaitable]
# Ensure 1 & 3 are in the cache.
cache.get(1)
cache.get(3)
cache.get(1) # type: ignore[unused-awaitable]
cache.get(3) # type: ignore[unused-awaitable]
# Now access 1 again, thus causing 3 to be least-recently used
cache.get(1)
cache.get(1) # type: ignore[unused-awaitable]
# Now add an item with multiple elements to the cache
cache.prefill(4, ["five", "six"])
# Both 1 and 3 are evicted since there's too many elements.
with self.assertRaises(KeyError):
cache.get(1)
cache.get(1) # type: ignore[unused-awaitable]
with self.assertRaises(KeyError):
cache.get(3)
cache.get(3) # type: ignore[unused-awaitable]
# Now add another item to fill the cache again.
cache.prefill(5, ["seven"])
# Now access 4, thus causing 5 to be least-recently used
cache.get(4)
cache.get(4) # type: ignore[unused-awaitable]
# Add an empty item.
cache.prefill(6, [])
# 5 gets evicted and replaced since an empty element counts as an item.
with self.assertRaises(KeyError):
cache.get(5)
cache.get(4)
cache.get(6)
cache.get(5) # type: ignore[unused-awaitable]
cache.get(4) # type: ignore[unused-awaitable]
cache.get(6) # type: ignore[unused-awaitable]
+4 -2
View File
@@ -292,12 +292,14 @@ class DescriptorTestCase(unittest.TestCase):
# set off a deferred which will do a cache lookup
d1 = do_lookup()
self.assertEqual(current_context(), SENTINEL_CONTEXT)
d1.addCallback(check_result)
# type-ignore: addCallback returns self, so as long as we await d1 (and d2)
# below, this error is a false positive.
d1.addCallback(check_result) # type: ignore[unused-awaitable]
# and another
d2 = do_lookup()
self.assertEqual(current_context(), SENTINEL_CONTEXT)
d2.addCallback(check_result)
d2.addCallback(check_result) # type: ignore[unused-awaitable]
# let the lookup complete
complete_lookup.callback(None)
+8 -8
View File
@@ -57,7 +57,7 @@ class ObservableDeferredTest(TestCase):
self.assertFalse(observer2.called)
return res
observer1.addBoth(check_called_first)
observer1.addBoth(check_called_first) # type: ignore[unused-awaitable]
# store the results
results: List[Optional[ObservableDeferred[int]]] = [None, None]
@@ -68,8 +68,8 @@ class ObservableDeferredTest(TestCase):
results[idx] = res
return res
observer1.addCallback(check_val, 0)
observer2.addCallback(check_val, 1)
observer1.addCallback(check_val, 0) # type: ignore[unused-awaitable]
observer2.addCallback(check_val, 1) # type: ignore[unused-awaitable]
origin_d.callback(123)
self.assertEqual(results[0], 123, "observer 1 callback result")
@@ -90,7 +90,7 @@ class ObservableDeferredTest(TestCase):
self.assertFalse(observer2.called)
return res
observer1.addBoth(check_called_first)
observer1.addBoth(check_called_first) # type: ignore[unused-awaitable]
# store the results
results: List[Optional[ObservableDeferred[str]]] = [None, None]
@@ -99,8 +99,8 @@ class ObservableDeferredTest(TestCase):
results[idx] = res
return None
observer1.addErrback(check_val, 0)
observer2.addErrback(check_val, 1)
observer1.addErrback(check_val, 0) # type: ignore[unused-awaitable]
observer2.addErrback(check_val, 1) # type: ignore[unused-awaitable]
try:
raise Exception("gah!")
@@ -208,11 +208,11 @@ class TimeoutDeferredTest(TestCase):
return res
original_deferred = blocking()
original_deferred.addErrback(errback, "orig")
original_deferred.addErrback(errback, "orig") # type: ignore[unused-awaitable]
timing_out_d = timeout_deferred(original_deferred, 1.0, self.clock)
self.assertNoResult(timing_out_d)
self.assertIs(current_context(), SENTINEL_CONTEXT)
timing_out_d.addErrback(errback, "timingout")
timing_out_d.addErrback(errback, "timingout") # type: ignore[unused-awaitable]
self.clock.pump((1.0,))
+1 -1
View File
@@ -140,7 +140,7 @@ class LinearizerTestCase(unittest.TestCase):
_, _, unblock = self._start_task(linearizer, key)
for i in range(1, 100):
defer.ensureDeferred(func(i))
defer.ensureDeferred(func(i)) # type: ignore[unused-awaitable]
d = defer.ensureDeferred(func(1000))
unblock()
+3 -2
View File
@@ -74,7 +74,8 @@ class LoggingContextTestCase(unittest.TestCase):
callback_completed = True
return res
d2.addCallback(cb)
# type-ignore: this doesn't create a new Deferred: addCallback returns self.
d2.addCallback(cb) # type: ignore[unused-awaitable]
self._check_test_key("one")
@@ -195,5 +196,5 @@ def _chained_deferred_function() -> defer.Deferred:
reactor.callLater(0, d2.callback, res)
return d2
d.addCallback(cb)
d.addCallback(cb) # type: ignore[unused-awaitable]
return d
+1 -1
View File
@@ -109,7 +109,7 @@ class FederationRateLimiterTestCase(TestCase):
await d
for _ in range(1, 100):
defer.ensureDeferred(task())
defer.ensureDeferred(task()) # type: ignore[unused-awaitable]
last_task = defer.ensureDeferred(task())