Add caches to new hot path functions (#18337)

We call these two functions for every authed request when using
delegated auth.
This commit is contained in:
Erik Johnston
2025-04-14 17:54:47 +01:00
committed by GitHub
parent dd05cc55ee
commit ae701e1709
4 changed files with 23 additions and 15 deletions

1
changelog.d/18337.misc Normal file
View File

@@ -0,0 +1 @@
Add cache to storage functions used to auth requests when using delegated auth.

View File

@@ -163,6 +163,8 @@ class DeviceWorkerHandler:
raise errors.NotFoundError()
ips = await self.store.get_last_client_ip_by_device(user_id, device_id)
device = dict(device)
_update_device_from_client_ips(device, ips)
set_tag("device", str(device))

View File

@@ -282,9 +282,10 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
"count_devices_by_users", count_devices_by_users_txn, user_ids
)
@cached()
async def get_device(
self, user_id: str, device_id: str
) -> Optional[Dict[str, Any]]:
) -> Optional[Mapping[str, Any]]:
"""Retrieve a device. Only returns devices that are not marked as
hidden.
@@ -1817,6 +1818,8 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
},
desc="store_device",
)
await self.invalidate_cache_and_stream("get_device", (user_id, device_id))
if not inserted:
# if the device already exists, check if it's a real device, or
# if the device ID is reserved by something else
@@ -1882,6 +1885,9 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
values=device_ids,
keyvalues={"user_id": user_id},
)
self._invalidate_cache_and_stream_bulk(
txn, self.get_device, [(user_id, device_id) for device_id in device_ids]
)
for batch in batch_iter(device_ids, 100):
await self.db_pool.runInteraction(
@@ -1915,6 +1921,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
updatevalues=updates,
desc="update_device",
)
await self.invalidate_cache_and_stream("get_device", (user_id, device_id))
async def update_remote_device_list_cache_entry(
self, user_id: str, device_id: str, content: JsonDict, stream_id: str

View File

@@ -759,6 +759,9 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
external_id: id on that system
user_id: complete mxid that it is mapped to
"""
self._invalidate_cache_and_stream(
txn, self.get_user_by_external_id, (auth_provider, external_id)
)
self.db_pool.simple_insert_txn(
txn,
@@ -789,6 +792,9 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
},
desc="remove_user_external_id",
)
await self.invalidate_cache_and_stream(
"get_user_by_external_id", (auth_provider, external_id)
)
async def replace_user_external_id(
self,
@@ -809,29 +815,20 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
ExternalIDReuseException if the new external_id could not be mapped.
"""
def _remove_user_external_ids_txn(
def _replace_user_external_id_txn(
txn: LoggingTransaction,
user_id: str,
) -> None:
"""Remove all mappings from external user ids to a mxid
If these mappings are not found, this method does nothing.
Args:
user_id: complete mxid that it is mapped to
"""
self.db_pool.simple_delete_txn(
txn,
table="user_external_ids",
keyvalues={"user_id": user_id},
)
def _replace_user_external_id_txn(
txn: LoggingTransaction,
) -> None:
_remove_user_external_ids_txn(txn, user_id)
for auth_provider, external_id in record_external_ids:
self._invalidate_cache_and_stream(
txn, self.get_user_by_external_id, (auth_provider, external_id)
)
self._record_user_external_id_txn(
txn,
auth_provider,
@@ -847,6 +844,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
except self.database_engine.module.IntegrityError:
raise ExternalIDReuseException()
@cached()
async def get_user_by_external_id(
self, auth_provider: str, external_id: str
) -> Optional[str]: