From eedf64a20acfbf61862e2c3b73f8a6790063bed1 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 17 Mar 2022 14:51:14 +0000 Subject: [PATCH] Fold ClientIpStore into ClientIpWorkerStore --- synapse/storage/databases/main/__init__.py | 4 +- synapse/storage/databases/main/client_ips.py | 78 ++++++++++---------- 2 files changed, 40 insertions(+), 42 deletions(-) diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index 9643696170..1ea0b2aa6f 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -33,7 +33,7 @@ from .account_data import AccountDataStore from .appservice import ApplicationServiceStore, ApplicationServiceTransactionStore from .cache import CacheInvalidationWorkerStore from .censor_events import CensorEventsStore -from .client_ips import ClientIpStore +from .client_ips import ClientIpWorkerStore from .deviceinbox import DeviceInboxStore from .devices import DeviceStore from .directory import DirectoryStore @@ -112,7 +112,7 @@ class DataStore( AccountDataStore, EventPushActionsStore, OpenIdStore, - ClientIpStore, + ClientIpWorkerStore, DeviceStore, DeviceInboxStore, UserDirectoryStore, diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py index 065842b29f..71867da01e 100644 --- a/synapse/storage/databases/main/client_ips.py +++ b/synapse/storage/databases/main/client_ips.py @@ -399,7 +399,7 @@ class ClientIpBackgroundUpdateStore(SQLBaseStore): return updated -class ClientIpWorkerStore(ClientIpBackgroundUpdateStore): +class ClientIpWorkerStore(ClientIpBackgroundUpdateStore, MonthlyActiveUsersWorkerStore): def __init__( self, database: DatabasePool, @@ -408,11 +408,34 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore): ): super().__init__(database, db_conn, hs) + self._update_on_this_worker = hs.config.worker.run_background_tasks + self.user_ips_max_age = hs.config.server.user_ips_max_age if hs.config.worker.run_background_tasks and self.user_ips_max_age: self._clock.looping_call(self._prune_old_user_ips, 5 * 1000) + if self._update_on_this_worker: + # This is the designated worker that can write to the client IP + # tables. + + # (user_id, access_token, ip,) -> last_seen + self.client_ip_last_seen = LruCache[Tuple[str, str, str], int]( + cache_name="client_ip_last_seen", max_size=50000 + ) + + # (user_id, access_token, ip,) -> (user_agent, device_id, last_seen) + self._batch_row_update: Dict[ + Tuple[str, str, str], Tuple[str, Optional[str], int] + ] = {} + + self._client_ip_looper = self._clock.looping_call( + self._update_client_ips_batch, 5 * 1000 + ) + self.hs.get_reactor().addSystemEventTrigger( + "before", "shutdown", self._update_client_ips_batch + ) + @wrap_as_background_process("prune_old_user_ips") async def _prune_old_user_ips(self) -> None: """Removes entries in user IPs older than the configured period.""" @@ -458,7 +481,7 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore): "_prune_old_user_ips", _prune_old_user_ips_txn ) - async def get_last_client_ip_by_device( + async def _get_last_client_ip_by_device_from_database( self, user_id: str, device_id: Optional[str] ) -> Dict[Tuple[str, str], DeviceLastConnectionInfo]: """For each device_id listed, give the user_ip it was last seen on. @@ -489,7 +512,7 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore): return {(d["user_id"], d["device_id"]): d for d in res} - async def get_user_ip_and_agents( + async def _get_user_ip_and_agents_from_database( self, user: UserID, since_ts: int = 0 ) -> List[LastConnectionInfo]: """Fetch the IPs and user agents for a user since the given timestamp. @@ -541,35 +564,6 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore): for access_token, ip, user_agent, last_seen in rows ] - -class ClientIpStore(ClientIpWorkerStore, MonthlyActiveUsersWorkerStore): - def __init__( - self, - database: DatabasePool, - db_conn: LoggingDatabaseConnection, - hs: "HomeServer", - ): - self._update_on_this_worker = hs.config.worker.run_background_tasks - - # (user_id, access_token, ip,) -> last_seen - self.client_ip_last_seen = LruCache[Tuple[str, str, str], int]( - cache_name="client_ip_last_seen", max_size=50000 - ) - - super().__init__(database, db_conn, hs) - - # (user_id, access_token, ip,) -> (user_agent, device_id, last_seen) - self._batch_row_update: Dict[ - Tuple[str, str, str], Tuple[str, Optional[str], int] - ] = {} - - self._client_ip_looper = self._clock.looping_call( - self._update_client_ips_batch, 5 * 1000 - ) - self.hs.get_reactor().addSystemEventTrigger( - "before", "shutdown", self._update_client_ips_batch - ) - async def insert_client_ip( self, user_id: str, @@ -676,11 +670,12 @@ class ClientIpStore(ClientIpWorkerStore, MonthlyActiveUsersWorkerStore): A dictionary mapping a tuple of (user_id, device_id) to dicts, with keys giving the column names from the devices table. """ - assert ( - self._update_on_this_worker - ), "This worker is not designated to update client IPs" + ret = await self._get_last_client_ip_by_device_from_database(user_id, device_id) - ret = await super().get_last_client_ip_by_device(user_id, device_id) + if not self._update_on_this_worker: + # Only the writing-worker has additional in-memory data to enhance + # the result + return ret # Update what is retrieved from the database with data which is pending # insertion, as if it has already been stored in the database. @@ -725,13 +720,16 @@ class ClientIpStore(ClientIpWorkerStore, MonthlyActiveUsersWorkerStore): Only the latest user agent for each access token and IP address combination is available. """ - assert ( - self._update_on_this_worker - ), "This worker is not designated to update client IPs" + rows_from_db = await self._get_user_ip_and_agents_from_database(user, since_ts) + + if not self._update_on_this_worker: + # Only the writing-worker has additional in-memory data to enhance + # the result + return rows_from_db results: Dict[Tuple[str, str], LastConnectionInfo] = { (connection["access_token"], connection["ip"]): connection - for connection in await super().get_user_ip_and_agents(user, since_ts) + for connection in rows_from_db } # Overlay data that is pending insertion on top of the results from the