Compare commits
19 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 5980756e09 | |||
| 4f135c31b8 | |||
| 2dfccdc1c3 | |||
| ac559329b2 | |||
| 665156fe88 | |||
| 3ad65ac778 | |||
| d68b11c323 | |||
| 6f8080ed50 | |||
| be45edb35b | |||
| 4b9c67b33e | |||
| 53ca28a657 | |||
| 312a754664 | |||
| 6ac2f573ec | |||
| b1be9a742a | |||
| adc7903762 | |||
| dc201b1810 | |||
| b0603dabc9 | |||
| a0dfa69ffc | |||
| 6e62451c26 |
@@ -0,0 +1 @@
|
||||
Have mypy report `Awaitable`s that are not `await`ed or otherwise consumed.
|
||||
@@ -374,7 +374,7 @@ async def do_something() -> None:
|
||||
# `do_something_else` will get its own independent
|
||||
# logging context. `request-1` will not count any
|
||||
# metrics from `do_something_else`.
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"do_something_else",
|
||||
do_something_else,
|
||||
to_resolve,
|
||||
|
||||
@@ -12,6 +12,7 @@ local_partial_types = True
|
||||
no_implicit_optional = True
|
||||
disallow_untyped_defs = True
|
||||
strict_equality = True
|
||||
enable_error_code = unused-awaitable
|
||||
warn_redundant_casts = True
|
||||
|
||||
files =
|
||||
|
||||
@@ -59,7 +59,7 @@ def run_background_updates(hs: HomeServer) -> None:
|
||||
|
||||
def run() -> None:
|
||||
# Apply all background updates on the database.
|
||||
defer.ensureDeferred(
|
||||
defer.ensureDeferred( # type: ignore[unused-awaitable]
|
||||
run_as_background_process("background_updates", run_background_updates)
|
||||
)
|
||||
|
||||
|
||||
@@ -189,7 +189,7 @@ def start_phone_stats_home(hs: "HomeServer") -> None:
|
||||
clock.looping_call(
|
||||
hs.get_datastores().main.reap_monthly_active_users, 1000 * 60 * 60
|
||||
)
|
||||
hs.get_datastores().main.reap_monthly_active_users()
|
||||
hs.get_datastores().main.reap_monthly_active_users() # type: ignore[unused-awaitable]
|
||||
|
||||
@wrap_as_background_process("generate_monthly_active_users")
|
||||
async def generate_monthly_active_users() -> None:
|
||||
@@ -212,7 +212,7 @@ def start_phone_stats_home(hs: "HomeServer") -> None:
|
||||
max_mau_gauge.set(float(hs.config.server.max_mau_value))
|
||||
|
||||
if hs.config.server.limit_usage_by_mau or hs.config.server.mau_stats_only:
|
||||
generate_monthly_active_users()
|
||||
generate_monthly_active_users() # type: ignore[unused-awaitable]
|
||||
clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000)
|
||||
# End of monthly active user settings
|
||||
|
||||
|
||||
@@ -200,7 +200,7 @@ class _ServiceQueuer:
|
||||
if service.id in self.requests_in_flight:
|
||||
return
|
||||
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"as-sender-%s" % (service.id,), self._send_request, service
|
||||
)
|
||||
|
||||
@@ -407,10 +407,10 @@ class _TransactionController:
|
||||
if sent:
|
||||
await txn.complete(self.store)
|
||||
else:
|
||||
run_in_background(self._on_txn_fail, service)
|
||||
run_in_background(self._on_txn_fail, service) # type: ignore[unused-awaitable]
|
||||
except Exception:
|
||||
logger.exception("Error creating appservice transaction")
|
||||
run_in_background(self._on_txn_fail, service)
|
||||
run_in_background(self._on_txn_fail, service) # type: ignore[unused-awaitable]
|
||||
|
||||
async def on_recovered(self, recoverer: "_Recoverer") -> None:
|
||||
logger.info(
|
||||
@@ -479,7 +479,7 @@ class _Recoverer:
|
||||
|
||||
def recover(self) -> None:
|
||||
def _retry() -> None:
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"as-recoverer-%s" % (self.service.id,), self.retry
|
||||
)
|
||||
|
||||
|
||||
@@ -200,7 +200,7 @@ class FederationServer(FederationBase):
|
||||
)
|
||||
if lock:
|
||||
logger.info("Handling old staged inbound events in %s", room_id)
|
||||
self._process_incoming_pdus_in_room_inner(
|
||||
self._process_incoming_pdus_in_room_inner( # type: ignore[unused-awaitable]
|
||||
room_id,
|
||||
room_version,
|
||||
lock,
|
||||
@@ -277,7 +277,7 @@ class FederationServer(FederationBase):
|
||||
# any old events in the staging area.
|
||||
if not self._started_handling_of_staged_events:
|
||||
self._started_handling_of_staged_events = True
|
||||
self._handle_old_staged_events()
|
||||
self._handle_old_staged_events() # type: ignore[unused-awaitable]
|
||||
|
||||
# Start a periodic check for old staged events. This is to handle
|
||||
# the case where locks time out, e.g. if another process gets killed
|
||||
@@ -1144,7 +1144,7 @@ class FederationServer(FederationBase):
|
||||
_INBOUND_EVENT_HANDLING_LOCK_NAME, pdu.room_id
|
||||
)
|
||||
if lock:
|
||||
self._process_incoming_pdus_in_room_inner(
|
||||
self._process_incoming_pdus_in_room_inner( # type: ignore[unused-awaitable]
|
||||
pdu.room_id, room_version, lock, origin, pdu
|
||||
)
|
||||
|
||||
|
||||
@@ -187,7 +187,7 @@ class _DestinationWakeupQueue:
|
||||
self.queue[destination] = None
|
||||
|
||||
if not self.processing:
|
||||
self._handle()
|
||||
self._handle() # type: ignore[unused-awaitable]
|
||||
|
||||
@wrap_as_background_process("_DestinationWakeupQueue.handle")
|
||||
async def _handle(self) -> None:
|
||||
@@ -342,7 +342,7 @@ class FederationSender(AbstractFederationSender):
|
||||
return
|
||||
|
||||
# fire off a processing loop in the background
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"process_event_queue_for_federation", self._process_event_queue_loop
|
||||
)
|
||||
|
||||
|
||||
@@ -301,7 +301,7 @@ class PerDestinationQueue:
|
||||
|
||||
logger.debug("TX [%s] Starting transaction loop", self._destination)
|
||||
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"federation_transaction_transmission_loop",
|
||||
self._transaction_transmission_loop,
|
||||
)
|
||||
|
||||
@@ -132,7 +132,7 @@ class Authenticator:
|
||||
# alive
|
||||
retry_timings = await self.store.get_destination_retry_timings(origin)
|
||||
if retry_timings and retry_timings.retry_last_ts:
|
||||
run_in_background(self.reset_retry_timings, origin)
|
||||
run_in_background(self.reset_retry_timings, origin) # type: ignore[unused-awaitable]
|
||||
|
||||
return origin
|
||||
|
||||
|
||||
@@ -97,7 +97,7 @@ class ApplicationServicesHandler:
|
||||
|
||||
# We only start a new background process if necessary rather than
|
||||
# optimistically (to cut down on overhead).
|
||||
self._notify_interested_services(max_token)
|
||||
self._notify_interested_services(max_token) # type: ignore[unused-awaitable]
|
||||
|
||||
@wrap_as_background_process("notify_interested_services")
|
||||
async def _notify_interested_services(self, max_token: RoomStreamToken) -> None:
|
||||
@@ -144,7 +144,7 @@ class ApplicationServicesHandler:
|
||||
except Exception:
|
||||
logger.error("Application Services Failure")
|
||||
|
||||
run_as_background_process("as_scheduler", start_scheduler)
|
||||
run_as_background_process("as_scheduler", start_scheduler) # type: ignore[unused-awaitable]
|
||||
self.started_scheduler = True
|
||||
|
||||
# Fork off pushes to these services
|
||||
@@ -307,7 +307,7 @@ class ApplicationServicesHandler:
|
||||
|
||||
# We only start a new background process if necessary rather than
|
||||
# optimistically (to cut down on overhead).
|
||||
self._notify_interested_services_ephemeral(
|
||||
self._notify_interested_services_ephemeral( # type: ignore[unused-awaitable]
|
||||
services, stream_key, new_token, users
|
||||
)
|
||||
|
||||
|
||||
@@ -223,7 +223,7 @@ class DeactivateAccountHandler:
|
||||
pending deactivation, if it isn't already running.
|
||||
"""
|
||||
if not self._user_parter_running:
|
||||
run_as_background_process("user_parter_loop", self._user_parter_loop)
|
||||
run_as_background_process("user_parter_loop", self._user_parter_loop) # type: ignore[unused-awaitable]
|
||||
|
||||
async def _user_parter_loop(self) -> None:
|
||||
"""Loop that parts deactivated users from rooms"""
|
||||
|
||||
@@ -589,7 +589,7 @@ class DeviceHandler(DeviceWorkerHandler):
|
||||
|
||||
# We may need to do some processing asynchronously for local user IDs.
|
||||
if self.hs.is_mine_id(user_id):
|
||||
self._handle_new_device_update_async()
|
||||
self._handle_new_device_update_async() # type: ignore[unused-awaitable]
|
||||
|
||||
async def notify_user_signature_update(
|
||||
self, from_user_id: str, user_ids: List[str]
|
||||
|
||||
@@ -198,7 +198,7 @@ class DeviceMessageHandler:
|
||||
await self.store.mark_remote_users_device_caches_as_stale((sender_user_id,))
|
||||
|
||||
# Immediately attempt a resync in the background
|
||||
run_in_background(self._user_device_resync, user_id=sender_user_id)
|
||||
run_in_background(self._user_device_resync, user_id=sender_user_id) # type: ignore[unused-awaitable]
|
||||
|
||||
async def send_device_message(
|
||||
self,
|
||||
|
||||
@@ -192,7 +192,7 @@ class FederationHandler:
|
||||
# any partial-state-resync operations which were in flight when we
|
||||
# were shut down.
|
||||
if not hs.config.worker.worker_app:
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"resume_sync_partial_state_room", self._resume_partial_state_room_sync
|
||||
)
|
||||
|
||||
@@ -778,7 +778,7 @@ class FederationHandler:
|
||||
# lots of requests for missing prev_events which we do actually
|
||||
# have. Hence we fire off the background task, but don't wait for it.
|
||||
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"handle_queued_pdus", self._handle_queued_pdus, room_queue
|
||||
)
|
||||
|
||||
@@ -1838,7 +1838,7 @@ class FederationHandler:
|
||||
room_id=room_id,
|
||||
)
|
||||
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
desc="sync_partial_state_room", func=_sync_partial_state_room_wrapper
|
||||
)
|
||||
|
||||
|
||||
@@ -1461,7 +1461,7 @@ class FederationEventHandler:
|
||||
resync = True
|
||||
|
||||
if resync:
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"resync_device_due_to_pdu",
|
||||
self._resync_device,
|
||||
event.sender,
|
||||
|
||||
@@ -101,7 +101,7 @@ class MessageHandler:
|
||||
self._scheduled_expiry: Optional[IDelayedCall] = None
|
||||
|
||||
if not hs.config.worker.worker_app:
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"_schedule_next_expiry", self._schedule_next_expiry
|
||||
)
|
||||
|
||||
@@ -1954,7 +1954,7 @@ class EventCreationHandler:
|
||||
if event.type == EventTypes.Message:
|
||||
# We don't want to block sending messages on any presence code. This
|
||||
# matters as sometimes presence code can take a while.
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"bump_presence_active_time", self._bump_active_time, requester.user
|
||||
)
|
||||
|
||||
@@ -1966,7 +1966,7 @@ class EventCreationHandler:
|
||||
except Exception:
|
||||
logger.exception("Error notifying about new room events")
|
||||
|
||||
run_in_background(_notify)
|
||||
run_in_background(_notify) # type: ignore[unused-awaitable]
|
||||
|
||||
return persisted_events[-1]
|
||||
|
||||
|
||||
@@ -293,7 +293,7 @@ class PaginationHandler:
|
||||
# We want to purge everything, including local events, and to run the purge in
|
||||
# the background so that it's not blocking any other operation apart from
|
||||
# other purges in the same room.
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"_purge_history",
|
||||
self._purge_history,
|
||||
purge_id,
|
||||
@@ -328,7 +328,7 @@ class PaginationHandler:
|
||||
logger.info("[purge] starting purge_id %s", purge_id)
|
||||
|
||||
self._purges_by_id[purge_id] = PurgeStatus()
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"purge_history",
|
||||
self._purge_history,
|
||||
purge_id,
|
||||
@@ -777,7 +777,7 @@ class PaginationHandler:
|
||||
|
||||
self._delete_by_id[delete_id] = DeleteStatus()
|
||||
self._delete_by_room.setdefault(room_id, []).append(delete_id)
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"shutdown_and_purge_room",
|
||||
self._shutdown_and_purge_room,
|
||||
delete_id,
|
||||
|
||||
@@ -1064,7 +1064,7 @@ class PresenceHandler(BasePresenceHandler):
|
||||
yield
|
||||
finally:
|
||||
if affect_presence:
|
||||
run_in_background(_end)
|
||||
run_in_background(_end) # type: ignore[unused-awaitable]
|
||||
|
||||
return _user_syncing()
|
||||
|
||||
@@ -1337,7 +1337,7 @@ class PresenceHandler(BasePresenceHandler):
|
||||
finally:
|
||||
self._event_processing = False
|
||||
|
||||
run_as_background_process("presence.notify_new_event", _process_presence)
|
||||
run_as_background_process("presence.notify_new_event", _process_presence) # type: ignore[unused-awaitable]
|
||||
|
||||
async def _unsafe_process(self) -> None:
|
||||
# Loop round handling deltas until we're up to date
|
||||
|
||||
@@ -75,7 +75,7 @@ class StatsHandler:
|
||||
finally:
|
||||
self._is_processing = False
|
||||
|
||||
run_as_background_process("stats.notify_new_event", process)
|
||||
run_as_background_process("stats.notify_new_event", process) # type: ignore[unused-awaitable]
|
||||
|
||||
async def _unsafe_process(self) -> None:
|
||||
# If self.pos is None then means we haven't fetched it from DB
|
||||
|
||||
@@ -116,7 +116,7 @@ class FollowerTypingHandler:
|
||||
if self.federation and self.is_mine_id(member.user_id):
|
||||
last_fed_poke = self._member_last_federation_poke.get(member, None)
|
||||
if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL <= now:
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"typing._push_remote", self._push_remote, member=member, typing=True
|
||||
)
|
||||
|
||||
@@ -180,7 +180,7 @@ class FollowerTypingHandler:
|
||||
self._room_typing[row.room_id] = now_typing
|
||||
|
||||
if self.federation:
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"_send_changes_in_typing_to_remotes",
|
||||
self._send_changes_in_typing_to_remotes,
|
||||
row.room_id,
|
||||
@@ -327,7 +327,7 @@ class TypingWriterHandler(FollowerTypingHandler):
|
||||
def _push_update(self, member: RoomMember, typing: bool) -> None:
|
||||
if self.hs.is_mine_id(member.user_id):
|
||||
# Only send updates for changes to our own users.
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"typing._push_remote", self._push_remote, member, typing
|
||||
)
|
||||
|
||||
|
||||
@@ -122,7 +122,7 @@ class UserDirectoryHandler(StateDeltasHandler):
|
||||
self._is_processing = False
|
||||
|
||||
self._is_processing = True
|
||||
run_as_background_process("user_directory.notify_new_event", process)
|
||||
run_as_background_process("user_directory.notify_new_event", process) # type: ignore[unused-awaitable]
|
||||
|
||||
async def handle_local_profile_change(
|
||||
self, user_id: str, profile: ProfileInfo
|
||||
|
||||
@@ -458,7 +458,7 @@ class SimpleHttpClient:
|
||||
)
|
||||
|
||||
# turn timeouts into RequestTimedOutErrors
|
||||
request_deferred.addErrback(_timeout_to_request_timed_out_error)
|
||||
request_deferred.addErrback(_timeout_to_request_timed_out_error) # type: ignore[unused-awaitable]
|
||||
|
||||
response = await make_deferred_yieldable(request_deferred)
|
||||
|
||||
|
||||
@@ -102,7 +102,7 @@ class HTTPConnectProxyEndpoint:
|
||||
d = self._proxy_endpoint.connect(f)
|
||||
# once the tcp socket connects successfully, we need to wait for the
|
||||
# CONNECT to complete.
|
||||
d.addCallback(lambda conn: f.on_connection)
|
||||
d.addCallback(lambda conn: f.on_connection) # type: ignore[unused-awaitable]
|
||||
return d
|
||||
|
||||
|
||||
@@ -196,7 +196,7 @@ class HTTPConnectProtocol(protocol.Protocol):
|
||||
self.http_setup_client = HTTPConnectSetupClient(
|
||||
self.host, self.port, self.proxy_creds
|
||||
)
|
||||
self.http_setup_client.on_connected.addCallback(self.proxyConnected)
|
||||
self.http_setup_client.on_connected.addCallback(self.proxyConnected) # type: ignore[unused-awaitable]
|
||||
|
||||
def connectionMade(self) -> None:
|
||||
self.http_setup_client.makeConnection(self.transport)
|
||||
|
||||
@@ -1216,7 +1216,7 @@ class MatrixFederationHttpClient:
|
||||
|
||||
try:
|
||||
d = read_body_with_max_size(response, output_stream, max_size)
|
||||
d.addTimeout(self.default_timeout, self.reactor)
|
||||
d.addTimeout(self.default_timeout, self.reactor) # type: ignore[unused-awaitable]
|
||||
length = await make_deferred_yieldable(d)
|
||||
except BodyExceededMaxSize:
|
||||
msg = "Requested file is too large > %r bytes" % (max_size,)
|
||||
|
||||
@@ -764,7 +764,7 @@ def respond_with_json(
|
||||
if send_cors:
|
||||
set_cors_headers(request)
|
||||
|
||||
run_in_background(
|
||||
run_in_background( # type: ignore[unused-awaitable]
|
||||
_async_write_json_to_request_in_thread, request, encoder, json_object
|
||||
)
|
||||
return NOT_DONE_YET
|
||||
|
||||
@@ -193,7 +193,7 @@ class RemoteHandler(logging.Handler):
|
||||
self._connection_waiter = None
|
||||
|
||||
deferred: Deferred = self._service.whenConnected(failAfterFailures=1)
|
||||
deferred.addCallbacks(writer, fail)
|
||||
deferred.addCallbacks(writer, fail) # type: ignore[unused-awaitable]
|
||||
self._connection_waiter = deferred
|
||||
|
||||
def _handle_pressure(self) -> None:
|
||||
|
||||
@@ -843,7 +843,7 @@ def run_in_background( # type: ignore[misc]
|
||||
# which is supposed to have a single entry and exit point. But
|
||||
# by spawning off another deferred, we are effectively
|
||||
# adding a new exit point.)
|
||||
res.addBoth(_set_context_cb, ctx)
|
||||
res.addBoth(_set_context_cb, ctx) # type: ignore[unused-awaitable]
|
||||
return res
|
||||
|
||||
|
||||
@@ -870,7 +870,7 @@ def make_deferred_yieldable(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]
|
||||
# ok, we can't be sure that a yield won't block, so let's reset the
|
||||
# logcontext, and add a callback to the deferred to restore it.
|
||||
prev_context = set_current_context(SENTINEL_CONTEXT)
|
||||
deferred.addBoth(_set_context_cb, prev_context)
|
||||
deferred.addBoth(_set_context_cb, prev_context) # type: ignore[unused-awaitable]
|
||||
return deferred
|
||||
|
||||
|
||||
|
||||
@@ -933,7 +933,7 @@ def _custom_sync_async_decorator(
|
||||
scope.__exit__(None, None, None)
|
||||
return result
|
||||
|
||||
result.addCallbacks(call_back, err_back)
|
||||
result.addCallbacks(call_back, err_back) # type: ignore[unused-awaitable]
|
||||
|
||||
else:
|
||||
if inspect.isawaitable(result):
|
||||
|
||||
@@ -129,7 +129,7 @@ def install_gc_manager() -> None:
|
||||
gc_unreachable.labels(i).set(unreachable)
|
||||
|
||||
gc_task = task.LoopingCall(_maybe_gc)
|
||||
gc_task.start(0.1)
|
||||
gc_task.start(0.1) # type: ignore[unused-awaitable]
|
||||
|
||||
|
||||
#
|
||||
|
||||
@@ -207,6 +207,9 @@ def run_as_background_process(
|
||||
clock.looping_call and friends (or for firing-and-forgetting in the middle of a
|
||||
normal synapse async function).
|
||||
|
||||
Mypy will flag up this Deferred as unawaited. This is safe to ignore: the background
|
||||
process runs automatically, even if we don't await the returned deferred.
|
||||
|
||||
Args:
|
||||
desc: a description for this background process type
|
||||
func: a function, which may return a Deferred or a coroutine
|
||||
|
||||
@@ -54,7 +54,7 @@ class CommonUsageMetricsManager:
|
||||
|
||||
async def setup(self) -> None:
|
||||
"""Keep the gauges for common usage metrics up to date."""
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
desc="common_usage_metrics_update_gauges", func=self._update_gauges
|
||||
)
|
||||
self._clock.looping_call(
|
||||
|
||||
@@ -113,7 +113,7 @@ class EmailPusher(Pusher):
|
||||
if self._is_processing:
|
||||
return
|
||||
|
||||
run_as_background_process("emailpush.process", self._process)
|
||||
run_as_background_process("emailpush.process", self._process) # type: ignore[unused-awaitable]
|
||||
|
||||
def _pause_processing(self) -> None:
|
||||
"""Used by tests to temporarily pause processing of events.
|
||||
|
||||
@@ -160,7 +160,7 @@ class HttpPusher(Pusher):
|
||||
|
||||
# We could check the receipts are actually m.read receipts here,
|
||||
# but currently that's the only type of receipt anyway...
|
||||
run_as_background_process("http_pusher.on_new_receipts", self._update_badge)
|
||||
run_as_background_process("http_pusher.on_new_receipts", self._update_badge) # type: ignore[unused-awaitable]
|
||||
|
||||
async def _update_badge(self) -> None:
|
||||
# XXX as per https://github.com/matrix-org/matrix-doc/issues/2627, this seems
|
||||
@@ -189,7 +189,7 @@ class HttpPusher(Pusher):
|
||||
if self._is_processing:
|
||||
return
|
||||
|
||||
run_as_background_process("httppush.process", self._process)
|
||||
run_as_background_process("httppush.process", self._process) # type: ignore[unused-awaitable]
|
||||
|
||||
async def _process(self) -> None:
|
||||
# we should never get here if we are already processing
|
||||
|
||||
@@ -92,7 +92,7 @@ class PusherPool:
|
||||
if not self._should_start_pushers:
|
||||
logger.info("Not starting pushers because they are disabled in the config")
|
||||
return
|
||||
run_as_background_process("start_pushers", self._start_pushers)
|
||||
run_as_background_process("start_pushers", self._start_pushers) # type: ignore[unused-awaitable]
|
||||
|
||||
async def add_or_update_pusher(
|
||||
self,
|
||||
@@ -236,7 +236,7 @@ class PusherPool:
|
||||
|
||||
# We only start a new background process if necessary rather than
|
||||
# optimistically (to cut down on overhead).
|
||||
self._on_new_notifications(max_token)
|
||||
self._on_new_notifications(max_token) # type: ignore[unused-awaitable]
|
||||
|
||||
@wrap_as_background_process("on_new_notifications")
|
||||
async def _on_new_notifications(self, max_token: RoomStreamToken) -> None:
|
||||
|
||||
@@ -526,7 +526,7 @@ class FederationSenderHandler:
|
||||
# no need to queue up another task.
|
||||
return
|
||||
|
||||
run_as_background_process("_save_and_send_ack", self._save_and_send_ack)
|
||||
run_as_background_process("_save_and_send_ack", self._save_and_send_ack) # type: ignore[unused-awaitable]
|
||||
|
||||
async def _save_and_send_ack(self) -> None:
|
||||
"""Save the current federation position in the database and send an ACK
|
||||
|
||||
@@ -291,7 +291,7 @@ class ReplicationCommandHandler:
|
||||
return
|
||||
|
||||
# fire off a background process to start processing the queue.
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"process-replication-data", self._unsafe_process_queue, stream_name
|
||||
)
|
||||
|
||||
|
||||
@@ -300,7 +300,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
|
||||
# if so.
|
||||
|
||||
if isawaitable(res):
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"replication-" + cmd.get_logcontext_id(), lambda: res
|
||||
)
|
||||
|
||||
|
||||
@@ -112,7 +112,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
|
||||
def connectionMade(self) -> None:
|
||||
logger.info("Connected to redis")
|
||||
super().connectionMade()
|
||||
run_as_background_process("subscribe-replication", self._send_subscribe)
|
||||
run_as_background_process("subscribe-replication", self._send_subscribe) # type: ignore[unused-awaitable]
|
||||
|
||||
async def _send_subscribe(self) -> None:
|
||||
# it's important to make sure that we only send the REPLICATE command once we
|
||||
@@ -183,7 +183,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
|
||||
# if so.
|
||||
|
||||
if isawaitable(res):
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"replication-" + cmd.get_logcontext_id(), lambda: res
|
||||
)
|
||||
|
||||
@@ -205,7 +205,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
|
||||
Args:
|
||||
cmd: The command to send
|
||||
"""
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"send-cmd", self._async_send_command, cmd, bg_start_span=False
|
||||
)
|
||||
|
||||
|
||||
@@ -137,7 +137,7 @@ class ReplicationStreamer:
|
||||
logger.debug("Notifier poke loop already running")
|
||||
return
|
||||
|
||||
run_as_background_process("replication_notifier", self._run_notifier_loop)
|
||||
run_as_background_process("replication_notifier", self._run_notifier_loop) # type: ignore[unused-awaitable]
|
||||
|
||||
async def _run_notifier_loop(self) -> None:
|
||||
self.is_looping = True
|
||||
|
||||
@@ -1124,7 +1124,7 @@ class RoomRedactEventRestServlet(TransactionRestServlet):
|
||||
)
|
||||
|
||||
if with_relations:
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"redact_related_events",
|
||||
self._relation_handler.redact_events_related_to,
|
||||
requester=requester,
|
||||
|
||||
@@ -116,7 +116,7 @@ class HttpTransactionCache:
|
||||
# we deliberately do not propagate the error any further, as we
|
||||
# expect the observers to have reported it.
|
||||
|
||||
deferred.addErrback(remove_from_map)
|
||||
deferred.addErrback(remove_from_map) # type: ignore[unused-awaitable]
|
||||
|
||||
return make_deferred_yieldable(observable.observe())
|
||||
|
||||
|
||||
@@ -272,7 +272,7 @@ class BackgroundUpdater:
|
||||
# if we start a new background update, not all updates are done.
|
||||
self._all_done = False
|
||||
sleep = self.sleep_enabled
|
||||
run_as_background_process(
|
||||
run_as_background_process( # type: ignore[unused-awaitable]
|
||||
"background_updates", self.run_background_updates, sleep
|
||||
)
|
||||
|
||||
|
||||
@@ -293,7 +293,7 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
|
||||
self._currently_persisting_rooms.discard(room_id)
|
||||
|
||||
# set handle_queue_loop off in the background
|
||||
run_as_background_process("persist_events", handle_queue_loop)
|
||||
run_as_background_process("persist_events", handle_queue_loop) # type: ignore[unused-awaitable]
|
||||
|
||||
def _get_drainining_queue(
|
||||
self, room_id: str
|
||||
|
||||
@@ -1065,7 +1065,7 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
should_start = False
|
||||
|
||||
if should_start:
|
||||
run_as_background_process("fetch_events", self._fetch_thread)
|
||||
run_as_background_process("fetch_events", self._fetch_thread) # type: ignore[unused-awaitable]
|
||||
|
||||
async def _fetch_thread(self) -> None:
|
||||
"""Services requests for events from `_event_fetch_list`."""
|
||||
|
||||
@@ -132,7 +132,7 @@ class Clock:
|
||||
call = task.LoopingCall(f, *args, **kwargs)
|
||||
call.clock = self._reactor
|
||||
d = call.start(msec / 1000.0, now=False)
|
||||
d.addErrback(log_failure, "Looping call died", consumeErrors=False)
|
||||
d.addErrback(log_failure, "Looping call died", consumeErrors=False) # type: ignore[unused-awaitable]
|
||||
return call
|
||||
|
||||
def call_later(
|
||||
|
||||
@@ -154,7 +154,7 @@ class ObservableDeferred(Generic[_T], AbstractObservableDeferred[_T]):
|
||||
else:
|
||||
return f
|
||||
|
||||
deferred.addCallbacks(callback, errback)
|
||||
deferred.addCallbacks(callback, errback) # type: ignore[unused-awaitable]
|
||||
|
||||
def observe(self) -> "defer.Deferred[_T]":
|
||||
"""Observe the underlying deferred.
|
||||
@@ -635,7 +635,7 @@ class ReadWriteLock:
|
||||
# writer waiting for us and it completed entirely within the
|
||||
# `new_defer.callback()` call above.
|
||||
if self.key_to_current_writer.get(key) == new_defer:
|
||||
self.key_to_current_writer.pop(key)
|
||||
self.key_to_current_writer.pop(key) # type: ignore[unused-awaitable]
|
||||
|
||||
return _ctx_manager()
|
||||
|
||||
@@ -693,7 +693,7 @@ def timeout_deferred(
|
||||
raise defer.TimeoutError("Timed out after %gs" % (timeout,))
|
||||
return value
|
||||
|
||||
deferred.addErrback(convert_cancelled)
|
||||
deferred.addErrback(convert_cancelled) # type: ignore[unused-awaitable]
|
||||
|
||||
def cancel_timeout(result: _T) -> _T:
|
||||
# stop the pending call to cancel the deferred if it's been fired
|
||||
@@ -701,7 +701,7 @@ def timeout_deferred(
|
||||
delayed_call.cancel()
|
||||
return result
|
||||
|
||||
deferred.addBoth(cancel_timeout)
|
||||
deferred.addBoth(cancel_timeout) # type: ignore[unused-awaitable]
|
||||
|
||||
def success_cb(val: _T) -> None:
|
||||
if not new_d.called:
|
||||
@@ -711,7 +711,7 @@ def timeout_deferred(
|
||||
if not new_d.called:
|
||||
new_d.errback(val)
|
||||
|
||||
deferred.addCallbacks(success_cb, failure_cb)
|
||||
deferred.addCallbacks(success_cb, failure_cb) # type: ignore[unused-awaitable]
|
||||
|
||||
return new_d
|
||||
|
||||
@@ -759,7 +759,7 @@ def stop_cancellation(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]":
|
||||
wrapped with `make_deferred_yieldable`.
|
||||
"""
|
||||
new_deferred: "defer.Deferred[T]" = defer.Deferred()
|
||||
deferred.chainDeferred(new_deferred)
|
||||
deferred.chainDeferred(new_deferred) # type: ignore[unused-awaitable]
|
||||
return new_deferred
|
||||
|
||||
|
||||
@@ -821,10 +821,10 @@ def delay_cancellation(awaitable: Awaitable[T]) -> Awaitable[T]:
|
||||
new_deferred.pause()
|
||||
new_deferred.errback(Failure(CancelledError()))
|
||||
|
||||
deferred.addBoth(lambda _: new_deferred.unpause())
|
||||
deferred.addBoth(lambda _: new_deferred.unpause()) # type: ignore[unused-awaitable]
|
||||
|
||||
new_deferred: "defer.Deferred[T]" = defer.Deferred(handle_cancel)
|
||||
deferred.chainDeferred(new_deferred)
|
||||
deferred.chainDeferred(new_deferred) # type: ignore[unused-awaitable]
|
||||
return new_deferred
|
||||
|
||||
|
||||
|
||||
@@ -128,7 +128,7 @@ class BatchingQueue(Generic[V, R]):
|
||||
# If we're not currently processing the key fire off a background
|
||||
# process to start processing.
|
||||
if key not in self._processing_keys:
|
||||
run_as_background_process(self._name, self._process_queue, key)
|
||||
run_as_background_process(self._name, self._process_queue, key) # type: ignore[unused-awaitable]
|
||||
|
||||
with self._number_in_flight_metric.track_inprogress():
|
||||
return await make_deferred_yieldable(d)
|
||||
|
||||
@@ -89,7 +89,7 @@ class CachedCall(Generic[TV]):
|
||||
def got_result(r: Union[TV, Failure]) -> None:
|
||||
self._result = r
|
||||
|
||||
self._deferred.addBoth(got_result)
|
||||
self._deferred.addBoth(got_result) # type: ignore[unused-awaitable]
|
||||
|
||||
# TODO: consider cancellation semantics. Currently, if the call to get()
|
||||
# is cancelled, the underlying call will continue (and any future calls
|
||||
|
||||
@@ -377,7 +377,7 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase):
|
||||
for k, v in r.items():
|
||||
results[cache_key_to_arg(k)] = v
|
||||
|
||||
pending_deferred.addCallback(update_results)
|
||||
pending_deferred.addCallback(update_results) # type: ignore[unused-awaitable]
|
||||
cached_defers.append(pending_deferred)
|
||||
|
||||
if missing:
|
||||
|
||||
@@ -84,7 +84,7 @@ class Distributor:
|
||||
if name not in self.signals:
|
||||
raise KeyError("%r does not have a signal named %s" % (self, name))
|
||||
|
||||
run_as_background_process(name, self.signals[name].fire, *args, **kwargs)
|
||||
run_as_background_process(name, self.signals[name].fire, *args, **kwargs) # type: ignore[unused-awaitable]
|
||||
|
||||
|
||||
P = ParamSpec("P")
|
||||
|
||||
@@ -108,7 +108,7 @@ def do_patch() -> None:
|
||||
raise Exception(err)
|
||||
return r
|
||||
|
||||
res.addBoth(check_ctx)
|
||||
res.addBoth(check_ctx) # type: ignore[unused-awaitable]
|
||||
return res
|
||||
|
||||
return wrapped
|
||||
|
||||
@@ -334,7 +334,7 @@ class _PerHostRatelimiter:
|
||||
queue_defer = queue_request()
|
||||
return queue_defer
|
||||
|
||||
ret_defer.addBoth(on_wait_finished)
|
||||
ret_defer.addBoth(on_wait_finished) # type: ignore[unused-awaitable]
|
||||
else:
|
||||
ret_defer = queue_request()
|
||||
|
||||
@@ -358,8 +358,8 @@ class _PerHostRatelimiter:
|
||||
self.ready_request_queue.pop(request_id, None)
|
||||
return r
|
||||
|
||||
ret_defer.addCallbacks(on_start, on_err)
|
||||
ret_defer.addBoth(on_both)
|
||||
ret_defer.addCallbacks(on_start, on_err) # type: ignore[unused-awaitable]
|
||||
ret_defer.addBoth(on_both) # type: ignore[unused-awaitable]
|
||||
return make_deferred_yieldable(ret_defer)
|
||||
|
||||
def _on_exit(self, request_id: object) -> None:
|
||||
|
||||
@@ -265,4 +265,4 @@ class RetryDestinationLimiter:
|
||||
logger.exception("Failed to store destination_retry_timings")
|
||||
|
||||
# we deliberately do this in the background.
|
||||
run_as_background_process("store_retry_timings", store_retry_timings)
|
||||
run_as_background_process("store_retry_timings", store_retry_timings) # type: ignore[unused-awaitable]
|
||||
|
||||
@@ -130,7 +130,9 @@ class KeyringTestCase(unittest.HomeserverTestCase):
|
||||
pass
|
||||
|
||||
self.assertFalse(res_deferreds[0].called)
|
||||
res_deferreds[0].addBoth(self.check_context, None)
|
||||
# type-ignore: Deferred.addBoth returns `self`; it doesn't need to be
|
||||
# awaited as long as we use the await the deferred elsewhere
|
||||
res_deferreds[0].addBoth(self.check_context, None) # type: ignore[unused-awaitable]
|
||||
|
||||
await make_deferred_yieldable(res_deferreds[0])
|
||||
|
||||
@@ -166,7 +168,7 @@ class KeyringTestCase(unittest.HomeserverTestCase):
|
||||
)
|
||||
]
|
||||
)
|
||||
res_deferreds_2[0].addBoth(self.check_context, None)
|
||||
res_deferreds_2[0].addBoth(self.check_context, None) # type: ignore[unused-awaitable]
|
||||
second_lookup_state[0] = 1
|
||||
await make_deferred_yieldable(res_deferreds_2[0])
|
||||
second_lookup_state[0] = 2
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import os
|
||||
from typing import Any, Awaitable, ContextManager, Dict, Optional, Tuple
|
||||
from typing import Any, ContextManager, Dict, Optional, Tuple
|
||||
from unittest.mock import ANY, Mock, patch
|
||||
from urllib.parse import parse_qs, urlparse
|
||||
|
||||
@@ -287,7 +287,7 @@ class OidcHandlerTestCase(HomeserverTestCase):
|
||||
"""Provider metadatas are extensively validated."""
|
||||
h = self.provider
|
||||
|
||||
def force_load_metadata() -> Awaitable[None]:
|
||||
def force_load_metadata() -> "OpenIDProviderMetadata":
|
||||
async def force_load() -> "OpenIDProviderMetadata":
|
||||
return await h.load_metadata(force=True)
|
||||
|
||||
|
||||
@@ -460,7 +460,7 @@ class FederationClientTests(HomeserverTestCase):
|
||||
self.failureResultOf(d)
|
||||
|
||||
def test_client_sends_body(self) -> None:
|
||||
defer.ensureDeferred(
|
||||
defer.ensureDeferred( # type: ignore[unused-awaitable]
|
||||
self.cl.post_json(
|
||||
"testserv:8008", "foo/bar", timeout=10000, data={"a": "b"}
|
||||
)
|
||||
|
||||
@@ -151,7 +151,9 @@ class ApplicationServiceTransactionStoreTestCase(unittest.HomeserverTestCase):
|
||||
outfile.write(yaml.dump(as_yaml))
|
||||
self.as_yaml_files.append(as_token)
|
||||
|
||||
def _set_state(self, id: str, state: ApplicationServiceState) -> defer.Deferred:
|
||||
def _set_state(
|
||||
self, id: str, state: ApplicationServiceState
|
||||
) -> "defer.Deferred[None]":
|
||||
return self.db_pool.runOperation(
|
||||
self.engine.convert_param_style(
|
||||
"INSERT INTO application_services_state(as_id, state) VALUES(?,?)"
|
||||
@@ -297,7 +299,9 @@ class ApplicationServiceTransactionStoreTestCase(unittest.HomeserverTestCase):
|
||||
service = Mock(id=self.as_list[0]["id"])
|
||||
events = [Mock(event_id="e1"), Mock(event_id="e2")]
|
||||
txn_id = 5
|
||||
self._set_state(self.as_list[0]["id"], ApplicationServiceState.UP)
|
||||
self.get_success(
|
||||
self._set_state(self.as_list[0]["id"], ApplicationServiceState.UP)
|
||||
)
|
||||
self.get_success(self._insert_txn(service.id, txn_id, events))
|
||||
self.get_success(
|
||||
self.store.complete_appservice_txn(txn_id=txn_id, service=service)
|
||||
|
||||
@@ -117,7 +117,7 @@ class JsonResourceTests(unittest.TestCase):
|
||||
|
||||
def _callback(request: SynapseRequest, **kwargs: object) -> "Deferred[None]":
|
||||
d: "Deferred[None]" = Deferred()
|
||||
d.addCallback(_throw)
|
||||
d.addCallback(_throw) # type: ignore[unused-awaitable]
|
||||
self.reactor.callLater(0.5, d.callback, True)
|
||||
return make_deferred_yieldable(d)
|
||||
|
||||
|
||||
+1
-1
@@ -563,7 +563,7 @@ class HomeserverTestCase(TestCase):
|
||||
deferred: Deferred[TV] = ensureDeferred(d) # type: ignore[arg-type]
|
||||
|
||||
results: list = []
|
||||
deferred.addBoth(results.append)
|
||||
deferred.addBoth(results.append) # type: ignore[unused-awaitable]
|
||||
|
||||
self.pump(by=by)
|
||||
|
||||
|
||||
@@ -26,7 +26,7 @@ class DeferredCacheTestCase(TestCase):
|
||||
def test_empty(self) -> None:
|
||||
cache: DeferredCache[str, int] = DeferredCache("test")
|
||||
with self.assertRaises(KeyError):
|
||||
cache.get("foo")
|
||||
cache.get("foo") # type: ignore[unused-awaitable]
|
||||
|
||||
def test_hit(self) -> None:
|
||||
cache: DeferredCache[str, int] = DeferredCache("test")
|
||||
@@ -48,7 +48,7 @@ class DeferredCacheTestCase(TestCase):
|
||||
self.assertTrue(set_d.called)
|
||||
return r
|
||||
|
||||
get_d.addCallback(check1)
|
||||
get_d.addCallback(check1) # type: ignore[unused-awaitable]
|
||||
|
||||
# now fire off all the deferreds
|
||||
origin_d.callback(99)
|
||||
@@ -130,7 +130,7 @@ class DeferredCacheTestCase(TestCase):
|
||||
def test_get_immediate(self) -> None:
|
||||
cache: DeferredCache[str, int] = DeferredCache("test")
|
||||
d1: "defer.Deferred[int]" = defer.Deferred()
|
||||
cache.set("key1", d1)
|
||||
cache.set("key1", d1) # type: ignore[unused-awaitable]
|
||||
|
||||
# get_immediate should return default
|
||||
v = cache.get_immediate("key1", 1)
|
||||
@@ -149,7 +149,7 @@ class DeferredCacheTestCase(TestCase):
|
||||
cache.invalidate(("foo",))
|
||||
|
||||
with self.assertRaises(KeyError):
|
||||
cache.get(("foo",))
|
||||
cache.get(("foo",)) # type: ignore[unused-awaitable]
|
||||
|
||||
def test_invalidate_all(self) -> None:
|
||||
cache: DeferredCache[str, str] = DeferredCache("testcache")
|
||||
@@ -161,10 +161,10 @@ class DeferredCacheTestCase(TestCase):
|
||||
|
||||
# add a couple of pending entries
|
||||
d1: "defer.Deferred[str]" = defer.Deferred()
|
||||
cache.set("key1", d1, partial(record_callback, 0))
|
||||
cache.set("key1", d1, partial(record_callback, 0)) # type: ignore[unused-awaitable]
|
||||
|
||||
d2: "defer.Deferred[str]" = defer.Deferred()
|
||||
cache.set("key2", d2, partial(record_callback, 1))
|
||||
cache.set("key2", d2, partial(record_callback, 1)) # type: ignore[unused-awaitable]
|
||||
|
||||
# lookup should return pending deferreds
|
||||
self.assertFalse(cache.get("key1").called)
|
||||
@@ -181,9 +181,9 @@ class DeferredCacheTestCase(TestCase):
|
||||
|
||||
# lookup should fail
|
||||
with self.assertRaises(KeyError):
|
||||
cache.get("key1")
|
||||
cache.get("key1") # type: ignore[unused-awaitable]
|
||||
with self.assertRaises(KeyError):
|
||||
cache.get("key2")
|
||||
cache.get("key2") # type: ignore[unused-awaitable]
|
||||
|
||||
# both callbacks should have been callbacked
|
||||
self.assertTrue(callback_record[0], "Invalidation callback for key1 not called")
|
||||
@@ -192,7 +192,7 @@ class DeferredCacheTestCase(TestCase):
|
||||
# letting the other lookup complete should do nothing
|
||||
d1.callback("result1")
|
||||
with self.assertRaises(KeyError):
|
||||
cache.get("key1", None)
|
||||
cache.get("key1", None) # type: ignore[unused-awaitable]
|
||||
|
||||
def test_eviction(self) -> None:
|
||||
cache: DeferredCache[int, str] = DeferredCache(
|
||||
@@ -204,10 +204,10 @@ class DeferredCacheTestCase(TestCase):
|
||||
cache.prefill(3, "three") # 1 will be evicted
|
||||
|
||||
with self.assertRaises(KeyError):
|
||||
cache.get(1)
|
||||
cache.get(1) # type: ignore[unused-awaitable]
|
||||
|
||||
cache.get(2)
|
||||
cache.get(3)
|
||||
cache.get(2) # type: ignore[unused-awaitable]
|
||||
cache.get(3) # type: ignore[unused-awaitable]
|
||||
|
||||
def test_eviction_lru(self) -> None:
|
||||
cache: DeferredCache[int, str] = DeferredCache(
|
||||
@@ -218,15 +218,15 @@ class DeferredCacheTestCase(TestCase):
|
||||
cache.prefill(2, "two")
|
||||
|
||||
# Now access 1 again, thus causing 2 to be least-recently used
|
||||
cache.get(1)
|
||||
cache.get(1) # type: ignore[unused-awaitable]
|
||||
|
||||
cache.prefill(3, "three")
|
||||
|
||||
with self.assertRaises(KeyError):
|
||||
cache.get(2)
|
||||
cache.get(2) # type: ignore[unused-awaitable]
|
||||
|
||||
cache.get(1)
|
||||
cache.get(3)
|
||||
cache.get(1) # type: ignore[unused-awaitable]
|
||||
cache.get(3) # type: ignore[unused-awaitable]
|
||||
|
||||
def test_eviction_iterable(self) -> None:
|
||||
cache: DeferredCache[int, List[str]] = DeferredCache(
|
||||
@@ -240,40 +240,40 @@ class DeferredCacheTestCase(TestCase):
|
||||
cache.prefill(2, ["three"])
|
||||
|
||||
# Now access 1 again, thus causing 2 to be least-recently used
|
||||
cache.get(1)
|
||||
cache.get(1) # type: ignore[unused-awaitable]
|
||||
|
||||
# Now add an item to the cache, which evicts 2.
|
||||
cache.prefill(3, ["four"])
|
||||
with self.assertRaises(KeyError):
|
||||
cache.get(2)
|
||||
cache.get(2) # type: ignore[unused-awaitable]
|
||||
|
||||
# Ensure 1 & 3 are in the cache.
|
||||
cache.get(1)
|
||||
cache.get(3)
|
||||
cache.get(1) # type: ignore[unused-awaitable]
|
||||
cache.get(3) # type: ignore[unused-awaitable]
|
||||
|
||||
# Now access 1 again, thus causing 3 to be least-recently used
|
||||
cache.get(1)
|
||||
cache.get(1) # type: ignore[unused-awaitable]
|
||||
|
||||
# Now add an item with multiple elements to the cache
|
||||
cache.prefill(4, ["five", "six"])
|
||||
|
||||
# Both 1 and 3 are evicted since there's too many elements.
|
||||
with self.assertRaises(KeyError):
|
||||
cache.get(1)
|
||||
cache.get(1) # type: ignore[unused-awaitable]
|
||||
with self.assertRaises(KeyError):
|
||||
cache.get(3)
|
||||
cache.get(3) # type: ignore[unused-awaitable]
|
||||
|
||||
# Now add another item to fill the cache again.
|
||||
cache.prefill(5, ["seven"])
|
||||
|
||||
# Now access 4, thus causing 5 to be least-recently used
|
||||
cache.get(4)
|
||||
cache.get(4) # type: ignore[unused-awaitable]
|
||||
|
||||
# Add an empty item.
|
||||
cache.prefill(6, [])
|
||||
|
||||
# 5 gets evicted and replaced since an empty element counts as an item.
|
||||
with self.assertRaises(KeyError):
|
||||
cache.get(5)
|
||||
cache.get(4)
|
||||
cache.get(6)
|
||||
cache.get(5) # type: ignore[unused-awaitable]
|
||||
cache.get(4) # type: ignore[unused-awaitable]
|
||||
cache.get(6) # type: ignore[unused-awaitable]
|
||||
|
||||
@@ -292,12 +292,14 @@ class DescriptorTestCase(unittest.TestCase):
|
||||
# set off a deferred which will do a cache lookup
|
||||
d1 = do_lookup()
|
||||
self.assertEqual(current_context(), SENTINEL_CONTEXT)
|
||||
d1.addCallback(check_result)
|
||||
# type-ignore: addCallback returns self, so as long as we await d1 (and d2)
|
||||
# below, this error is a false positive.
|
||||
d1.addCallback(check_result) # type: ignore[unused-awaitable]
|
||||
|
||||
# and another
|
||||
d2 = do_lookup()
|
||||
self.assertEqual(current_context(), SENTINEL_CONTEXT)
|
||||
d2.addCallback(check_result)
|
||||
d2.addCallback(check_result) # type: ignore[unused-awaitable]
|
||||
|
||||
# let the lookup complete
|
||||
complete_lookup.callback(None)
|
||||
|
||||
@@ -57,7 +57,7 @@ class ObservableDeferredTest(TestCase):
|
||||
self.assertFalse(observer2.called)
|
||||
return res
|
||||
|
||||
observer1.addBoth(check_called_first)
|
||||
observer1.addBoth(check_called_first) # type: ignore[unused-awaitable]
|
||||
|
||||
# store the results
|
||||
results: List[Optional[ObservableDeferred[int]]] = [None, None]
|
||||
@@ -68,8 +68,8 @@ class ObservableDeferredTest(TestCase):
|
||||
results[idx] = res
|
||||
return res
|
||||
|
||||
observer1.addCallback(check_val, 0)
|
||||
observer2.addCallback(check_val, 1)
|
||||
observer1.addCallback(check_val, 0) # type: ignore[unused-awaitable]
|
||||
observer2.addCallback(check_val, 1) # type: ignore[unused-awaitable]
|
||||
|
||||
origin_d.callback(123)
|
||||
self.assertEqual(results[0], 123, "observer 1 callback result")
|
||||
@@ -90,7 +90,7 @@ class ObservableDeferredTest(TestCase):
|
||||
self.assertFalse(observer2.called)
|
||||
return res
|
||||
|
||||
observer1.addBoth(check_called_first)
|
||||
observer1.addBoth(check_called_first) # type: ignore[unused-awaitable]
|
||||
|
||||
# store the results
|
||||
results: List[Optional[ObservableDeferred[str]]] = [None, None]
|
||||
@@ -99,8 +99,8 @@ class ObservableDeferredTest(TestCase):
|
||||
results[idx] = res
|
||||
return None
|
||||
|
||||
observer1.addErrback(check_val, 0)
|
||||
observer2.addErrback(check_val, 1)
|
||||
observer1.addErrback(check_val, 0) # type: ignore[unused-awaitable]
|
||||
observer2.addErrback(check_val, 1) # type: ignore[unused-awaitable]
|
||||
|
||||
try:
|
||||
raise Exception("gah!")
|
||||
@@ -208,11 +208,11 @@ class TimeoutDeferredTest(TestCase):
|
||||
return res
|
||||
|
||||
original_deferred = blocking()
|
||||
original_deferred.addErrback(errback, "orig")
|
||||
original_deferred.addErrback(errback, "orig") # type: ignore[unused-awaitable]
|
||||
timing_out_d = timeout_deferred(original_deferred, 1.0, self.clock)
|
||||
self.assertNoResult(timing_out_d)
|
||||
self.assertIs(current_context(), SENTINEL_CONTEXT)
|
||||
timing_out_d.addErrback(errback, "timingout")
|
||||
timing_out_d.addErrback(errback, "timingout") # type: ignore[unused-awaitable]
|
||||
|
||||
self.clock.pump((1.0,))
|
||||
|
||||
|
||||
@@ -140,7 +140,7 @@ class LinearizerTestCase(unittest.TestCase):
|
||||
|
||||
_, _, unblock = self._start_task(linearizer, key)
|
||||
for i in range(1, 100):
|
||||
defer.ensureDeferred(func(i))
|
||||
defer.ensureDeferred(func(i)) # type: ignore[unused-awaitable]
|
||||
|
||||
d = defer.ensureDeferred(func(1000))
|
||||
unblock()
|
||||
|
||||
@@ -74,7 +74,8 @@ class LoggingContextTestCase(unittest.TestCase):
|
||||
callback_completed = True
|
||||
return res
|
||||
|
||||
d2.addCallback(cb)
|
||||
# type-ignore: this doesn't create a new Deferred: allCallback returns self.
|
||||
d2.addCallback(cb) # type: ignore[unused-awaitable]
|
||||
|
||||
self._check_test_key("one")
|
||||
|
||||
@@ -195,5 +196,5 @@ def _chained_deferred_function() -> defer.Deferred:
|
||||
reactor.callLater(0, d2.callback, res)
|
||||
return d2
|
||||
|
||||
d.addCallback(cb)
|
||||
d.addCallback(cb) # type: ignore[unused-awaitable]
|
||||
return d
|
||||
|
||||
@@ -109,7 +109,7 @@ class FederationRateLimiterTestCase(TestCase):
|
||||
await d
|
||||
|
||||
for _ in range(1, 100):
|
||||
defer.ensureDeferred(task())
|
||||
defer.ensureDeferred(task()) # type: ignore[unused-awaitable]
|
||||
|
||||
last_task = defer.ensureDeferred(task())
|
||||
|
||||
|
||||
Reference in New Issue
Block a user