1
0

Fold ClientIpStore into ClientIpWorkerStore

This commit is contained in:
Olivier Wilkinson (reivilibre)
2022-03-17 14:51:14 +00:00
parent 4908222720
commit eedf64a20a
2 changed files with 40 additions and 42 deletions
+2 -2
View File
@@ -33,7 +33,7 @@ from .account_data import AccountDataStore
from .appservice import ApplicationServiceStore, ApplicationServiceTransactionStore
from .cache import CacheInvalidationWorkerStore
from .censor_events import CensorEventsStore
from .client_ips import ClientIpStore
from .client_ips import ClientIpWorkerStore
from .deviceinbox import DeviceInboxStore
from .devices import DeviceStore
from .directory import DirectoryStore
@@ -112,7 +112,7 @@ class DataStore(
AccountDataStore,
EventPushActionsStore,
OpenIdStore,
ClientIpStore,
ClientIpWorkerStore,
DeviceStore,
DeviceInboxStore,
UserDirectoryStore,
+38 -40
View File
@@ -399,7 +399,7 @@ class ClientIpBackgroundUpdateStore(SQLBaseStore):
return updated
class ClientIpWorkerStore(ClientIpBackgroundUpdateStore):
class ClientIpWorkerStore(ClientIpBackgroundUpdateStore, MonthlyActiveUsersWorkerStore):
def __init__(
self,
database: DatabasePool,
@@ -408,11 +408,34 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore):
):
super().__init__(database, db_conn, hs)
self._update_on_this_worker = hs.config.worker.run_background_tasks
self.user_ips_max_age = hs.config.server.user_ips_max_age
if hs.config.worker.run_background_tasks and self.user_ips_max_age:
self._clock.looping_call(self._prune_old_user_ips, 5 * 1000)
if self._update_on_this_worker:
# This is the designated worker that can write to the client IP
# tables.
# (user_id, access_token, ip,) -> last_seen
self.client_ip_last_seen = LruCache[Tuple[str, str, str], int](
cache_name="client_ip_last_seen", max_size=50000
)
# (user_id, access_token, ip,) -> (user_agent, device_id, last_seen)
self._batch_row_update: Dict[
Tuple[str, str, str], Tuple[str, Optional[str], int]
] = {}
self._client_ip_looper = self._clock.looping_call(
self._update_client_ips_batch, 5 * 1000
)
self.hs.get_reactor().addSystemEventTrigger(
"before", "shutdown", self._update_client_ips_batch
)
@wrap_as_background_process("prune_old_user_ips")
async def _prune_old_user_ips(self) -> None:
"""Removes entries in user IPs older than the configured period."""
@@ -458,7 +481,7 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore):
"_prune_old_user_ips", _prune_old_user_ips_txn
)
async def get_last_client_ip_by_device(
async def _get_last_client_ip_by_device_from_database(
self, user_id: str, device_id: Optional[str]
) -> Dict[Tuple[str, str], DeviceLastConnectionInfo]:
"""For each device_id listed, give the user_ip it was last seen on.
@@ -489,7 +512,7 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore):
return {(d["user_id"], d["device_id"]): d for d in res}
async def get_user_ip_and_agents(
async def _get_user_ip_and_agents_from_database(
self, user: UserID, since_ts: int = 0
) -> List[LastConnectionInfo]:
"""Fetch the IPs and user agents for a user since the given timestamp.
@@ -541,35 +564,6 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore):
for access_token, ip, user_agent, last_seen in rows
]
class ClientIpStore(ClientIpWorkerStore, MonthlyActiveUsersWorkerStore):
def __init__(
self,
database: DatabasePool,
db_conn: LoggingDatabaseConnection,
hs: "HomeServer",
):
self._update_on_this_worker = hs.config.worker.run_background_tasks
# (user_id, access_token, ip,) -> last_seen
self.client_ip_last_seen = LruCache[Tuple[str, str, str], int](
cache_name="client_ip_last_seen", max_size=50000
)
super().__init__(database, db_conn, hs)
# (user_id, access_token, ip,) -> (user_agent, device_id, last_seen)
self._batch_row_update: Dict[
Tuple[str, str, str], Tuple[str, Optional[str], int]
] = {}
self._client_ip_looper = self._clock.looping_call(
self._update_client_ips_batch, 5 * 1000
)
self.hs.get_reactor().addSystemEventTrigger(
"before", "shutdown", self._update_client_ips_batch
)
async def insert_client_ip(
self,
user_id: str,
@@ -676,11 +670,12 @@ class ClientIpStore(ClientIpWorkerStore, MonthlyActiveUsersWorkerStore):
A dictionary mapping a tuple of (user_id, device_id) to dicts, with
keys giving the column names from the devices table.
"""
assert (
self._update_on_this_worker
), "This worker is not designated to update client IPs"
ret = await self._get_last_client_ip_by_device_from_database(user_id, device_id)
ret = await super().get_last_client_ip_by_device(user_id, device_id)
if not self._update_on_this_worker:
# Only the writing-worker has additional in-memory data to enhance
# the result
return ret
# Update what is retrieved from the database with data which is pending
# insertion, as if it has already been stored in the database.
@@ -725,13 +720,16 @@ class ClientIpStore(ClientIpWorkerStore, MonthlyActiveUsersWorkerStore):
Only the latest user agent for each access token and IP address combination
is available.
"""
assert (
self._update_on_this_worker
), "This worker is not designated to update client IPs"
rows_from_db = await self._get_user_ip_and_agents_from_database(user, since_ts)
if not self._update_on_this_worker:
# Only the writing-worker has additional in-memory data to enhance
# the result
return rows_from_db
results: Dict[Tuple[str, str], LastConnectionInfo] = {
(connection["access_token"], connection["ip"]): connection
for connection in await super().get_user_ip_and_agents(user, since_ts)
for connection in rows_from_db
}
# Overlay data that is pending insertion on top of the results from the