1
0

Add a background update stage to sort the remote users into the stale profile queue as appropriate

This commit is contained in:
Olivier Wilkinson (reivilibre)
2023-02-17 11:57:41 +00:00
parent 461cdb631f
commit d448469ea7
@@ -93,6 +93,10 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
"populate_user_directory_process_users",
self._populate_user_directory_process_local_users,
)
self.db_pool.updates.register_background_update_handler(
"populate_user_directory_process_remote_users",
self._populate_user_directory_process_remote_users,
)
self.db_pool.updates.register_background_update_handler(
"populate_user_directory_cleanup", self._populate_user_directory_cleanup
)
@@ -441,6 +445,104 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
return len(users_to_work_on)
async def _populate_user_directory_process_remote_users(
    self, progress: JsonDict, batch_size: int
) -> int:
    """
    Sorts through the `_remote_users_needing_lookup` table and adds the
    users within to the list of stale remote profiles,
    unless we already populated a user directory entry for them (i.e. they were
    also in a public room).

    Args:
        progress: the stored progress of this background update. The
            `last_user_id` key is the (inclusive) user ID that has been
            processed up to; it is absent before the first step and `None`
            once every row has been processed.
        batch_size: the number of rows to skip ahead when choosing the end of
            this step's range (see `_get_next_batch_txn`).

    Returns:
        `1` when the update is being marked as finished, otherwise
        `batch_size`.
    """
    update_name = "populate_user_directory_process_remote_users"

    # Read the cursor exactly once, *outside* the transaction function.
    # The transaction function may be invoked more than once if the database
    # layer retries it, so it must not read state that it also modifies.
    last_user_id: Optional[str] = progress.get("last_user_id", "@")

    if last_user_id is None:
        # The previous step processed all remaining rows: we are done.
        await self.db_pool.updates._end_background_update(update_name)
        return 1

    # Separate, non-Optional name so the closures below see a plain `str`.
    from_user_id_exc: str = last_user_id

    def _get_next_batch_txn(
        txn: LoggingTransaction, done_up_to_user_id: str
    ) -> Optional[str]:
        """
        Given the last user ID we've processed,
        Returns
        - a user ID to process up to and including; or
        - `None` if there is no limit left (i.e. we should just process all
          remaining rows).

        Note that the `OFFSET` skips `batch_size` rows, so the returned user ID
        is the `(batch_size + 1)`-th row after `done_up_to_user_id`.
        """
        # Should be a B-Tree index only scan: so reasonably efficient despite the
        # OFFSET
        # If we're lucky, will also warm up the disk cache for the subsequent query
        # that actually does some work.
        txn.execute(
            f"""
            SELECT user_id
            FROM {TEMP_TABLE}_remote_users_needing_lookup
            WHERE user_id > ?
            ORDER BY user_id
            LIMIT 1 OFFSET ?
            """,
            (done_up_to_user_id, batch_size),
        )
        row = txn.fetchone()
        return row[0] if row else None

    def _add_private_only_users_to_stale_profile_refresh_queue_txn(
        txn: LoggingTransaction, from_exc: str, until_inc: Optional[str]
    ) -> None:
        """
        Inserts into `user_directory_stale_remote_users` every user in the
        range `(from_exc, until_inc]` of the needing-lookup table that has no
        row in `user_directory`. If `until_inc` is `None` the range has no
        upper bound.
        """
        end_condition = "AND user_id <= ?" if until_inc is not None else ""
        end_args = (until_inc,) if until_inc is not None else ()

        # SQL expression extracting everything after the first ':' of the
        # user ID; the syntax differs between the two supported engines.
        user_id_serverpart: str
        if isinstance(self.database_engine, PostgresEngine):
            user_id_serverpart = (
                "SUBSTRING(user_id FROM POSITION(':' IN user_id) + 1)"
            )
        elif isinstance(self.database_engine, Sqlite3Engine):
            user_id_serverpart = "SUBSTR(user_id, INSTR(user_id, ':') + 1)"
        else:
            raise RuntimeError("Unknown database engine!")

        # NOTE(review): this assumes none of the selected users already has a
        # row in `user_directory_stale_remote_users` — confirm against the
        # table's constraints.
        txn.execute(
            f"""
            INSERT INTO user_directory_stale_remote_users
                (user_id, next_try_at_ts, retry_counter, user_server_name)
            SELECT
                user_id, 0, 0, {user_id_serverpart}
            FROM {TEMP_TABLE}_remote_users_needing_lookup AS runl
            LEFT JOIN user_directory AS ud USING (user_id)
            WHERE ud.user_id IS NULL
                AND ? < user_id {end_condition}
            """,
            (from_exc,) + end_args,
        )

    def _do_txn(txn: LoggingTransaction) -> Optional[str]:
        """
        Does a step of background update.

        Returns the new value of `last_user_id`. Deliberately does not mutate
        `progress`, so that running this function again (e.g. on a retried
        transaction) repeats exactly the same range.
        """
        next_end_limit_inc = _get_next_batch_txn(txn, from_user_id_exc)
        _add_private_only_users_to_stale_profile_refresh_queue_txn(
            txn, from_user_id_exc, next_end_limit_inc
        )

        # Persist the progress using a fresh dict, as part of this transaction.
        self.db_pool.updates._background_update_progress_txn(
            txn, update_name, {**progress, "last_user_id": next_end_limit_inc}
        )
        return next_end_limit_inc

    new_last_user_id = await self.db_pool.runInteraction(update_name, _do_txn)

    # Only now that the step has succeeded, reflect it in the caller's dict.
    progress["last_user_id"] = new_last_user_id

    return batch_size
async def should_include_local_user_in_dir(self, user: str) -> bool:
"""Certain classes of local user are omitted from the user directory.
Is this user one of them?