From d448469ea7ddb11eb0f856738a3ee6d9e69004bd Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 17 Feb 2023 11:57:41 +0000 Subject: [PATCH] Add a background update stage to sort the remote users into the stale profile queue as appropriate --- .../storage/databases/main/user_directory.py | 102 ++++++++++++++++++ 1 file changed, 102 insertions(+) diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index 376daefdcf..1beb01cd77 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -93,6 +93,10 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): "populate_user_directory_process_users", self._populate_user_directory_process_local_users, ) + self.db_pool.updates.register_background_update_handler( + "populate_user_directory_process_remote_users", + self._populate_user_directory_process_remote_users, + ) self.db_pool.updates.register_background_update_handler( "populate_user_directory_cleanup", self._populate_user_directory_cleanup ) @@ -441,6 +445,104 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): return len(users_to_work_on) + async def _populate_user_directory_process_remote_users( + self, progress: JsonDict, batch_size: int + ) -> int: + """ + Sorts through the `_remote_users_needing_lookup` table and adds the + users within to the list of stale remote profiles, + unless we already populated a user directory entry for them (i.e. they were + also in a public room). + """ + + def _get_next_batch_txn( + txn: LoggingTransaction, done_up_to_user_id: str + ) -> Optional[str]: + """ + Given the last user ID we've processed, + Returns + - a user ID to process up to and including; or + - `None` if there is no limit left (i.e. we should just process all + remaining rows). + """ + # Should be a B-Tree index only scan: so reasonably efficient despite the + # OFFSET + # If we're lucky, will also warm up the disk cache for the subsequent query + # that actually does some work. + txn.execute( + f""" + SELECT user_id + FROM {TEMP_TABLE}_remote_users_needing_lookup + WHERE user_id > ? + ORDER BY user_id + LIMIT 1 OFFSET ? + """, + (done_up_to_user_id, batch_size), + ) + row = txn.fetchone() + if row: + return row[0] + else: + return None + + def _add_private_only_users_to_stale_profile_refresh_queue_txn( + txn: LoggingTransaction, from_exc: str, until_inc: Optional[str] + ) -> None: + end_condition = "AND user_id <= ?" if until_inc is not None else "" + end_args = (until_inc,) if until_inc is not None else () + + user_id_serverpart: str + if isinstance(self.database_engine, PostgresEngine): + user_id_serverpart = ( + "SUBSTRING(user_id FROM POSITION(':' IN user_id) + 1)" + ) + elif isinstance(self.database_engine, Sqlite3Engine): + user_id_serverpart = "SUBSTR(user_id, INSTR(user_id, ':') + 1)" + else: + raise RuntimeError("Unknown database engine!") + + txn.execute( + f""" + INSERT INTO user_directory_stale_remote_users + (user_id, next_try_at_ts, retry_counter, user_server_name) + SELECT + user_id, 0, 0, {user_id_serverpart} + FROM {TEMP_TABLE}_remote_users_needing_lookup AS runl + LEFT JOIN user_directory AS ud USING (user_id) + WHERE ud.user_id IS NULL + AND ? < user_id {end_condition} + """, + (from_exc,) + end_args, + ) + + def _do_txn(txn: LoggingTransaction) -> None: + """ + Does a step of background update. + """ + last_user_id = progress.get("last_user_id", "@") + next_end_limit_inc = _get_next_batch_txn(txn, last_user_id) + _add_private_only_users_to_stale_profile_refresh_queue_txn( + txn, last_user_id, next_end_limit_inc + ) + + # Update the progress + progress["last_user_id"] = next_end_limit_inc + self.db_pool.updates._background_update_progress_txn( + txn, "populate_user_directory_process_remote_users", progress + ) + + if progress.get("last_user_id", "@") is None: + await self.db_pool.updates._end_background_update( + "populate_user_directory_process_remote_users" + ) + return 1 + + await self.db_pool.runInteraction( + "populate_user_directory_process_remote_users", + _do_txn, + ) + return batch_size + async def should_include_local_user_in_dir(self, user: str) -> bool: """Certain classes of local user are omitted from the user directory. Is this user one of them?