Compare commits


6 Commits

Author SHA1 Message Date
Olivier Wilkinson (reivilibre)
023036838c STASH dead-ish end 1 2022-03-18 14:24:14 +00:00
Olivier Wilkinson (reivilibre)
7973fe82ce Make the background worker handle USER_IP replication commands 2022-03-18 13:59:52 +00:00
Olivier Wilkinson (reivilibre)
eedf64a20a Fold ClientIpStore into ClientIpWorkerStore 2022-03-17 14:53:28 +00:00
Olivier Wilkinson (reivilibre)
4908222720 Add assertions that client IP update methods only run on a designated worker 2022-03-17 14:53:28 +00:00
Olivier Wilkinson (reivilibre)
a2476fdada Fold MonthlyActiveUsersStore into MonthlyActiveUsersWorkerStore
This is to prepare for moving it off the main process, so we no longer
give one store special treatment.
2022-03-17 14:53:27 +00:00
Olivier Wilkinson (reivilibre)
9c52c1dc06 Add assertions that some MAU updating methods only run on the background worker
These currently don't hold, but it's preparation for the next step
2022-03-17 13:42:36 +00:00
5 changed files with 234 additions and 157 deletions
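
Taken together, these commits move client-IP (and monthly-active-user) writes off the main process: ordinary workers forward USER_IP commands over replication, and only the designated background worker writes to the database. A self-contained toy sketch of that flow (every name below is illustrative, not Synapse's real API):

    from typing import Dict, List, Tuple

    class ReplicationLink:
        """Stands in for the replication connection between workers."""
        def __init__(self) -> None:
            self.commands: List[Tuple[str, str]] = []

        def send_user_ip(self, user_id: str, ip: str) -> None:
            self.commands.append((user_id, ip))

    class BackgroundWorker:
        """Only this designated worker may write client IPs."""
        def __init__(self) -> None:
            self.user_ips: Dict[str, str] = {}

        def handle_user_ip(self, user_id: str, ip: str) -> None:
            self.user_ips[user_id] = ip  # stands in for the batched DB upsert

    link = ReplicationLink()
    worker = BackgroundWorker()

    # A generic worker observes a request and streams the IP instead of writing:
    link.send_user_ip("@alice:example.org", "10.0.0.1")

    # The designated worker drains the stream and performs the writes:
    for user_id, ip in link.commands:
        worker.handle_user_ip(user_id, ip)
    assert worker.user_ips["@alice:example.org"] == "10.0.0.1"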

View File

@@ -11,27 +11,173 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import logging
from typing import TYPE_CHECKING, Dict, Mapping, Optional, Tuple
from typing import TYPE_CHECKING
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.databases.main.client_ips import LAST_SEEN_GRANULARITY
from synapse.util.caches.lrucache import LruCache
from ._base import BaseSlavedStore
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
class AbstractClientIpStrategy(abc.ABC):
"""
Abstract interface for the operations that a store should be able to provide
for dealing with client IPs.
See `DatabaseWritingClientIpStrategy` (the single designated writer)
and `ReplicationStreamingClientIpStrategy` (the strategy used by all other
workers, which streams writes to the designated writer over replication).
"""
async def insert_client_ip(
self, user_id: str, access_token: str, ip: str, user_agent: str, device_id: str
) -> None:
"""
Insert a client IP.
TODO docstring
"""
...
class DatabaseWritingClientIpStrategy(AbstractClientIpStrategy):
"""
Strategy for writing client IPs by direct database access.
This is intended to be used on a single designated Synapse worker
(the background worker).
"""
class SlavedClientIpStore(BaseSlavedStore):
def __init__(
self,
database: DatabasePool,
db_conn: LoggingDatabaseConnection,
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)
def __init__(
self,
db_pool: DatabasePool,
hs: "HomeServer",
) -> None:
assert (
hs.config.worker.run_background_tasks
), "This worker is not designated to update client IPs"
self._clock = hs.get_clock()
self._store = hs.get_datastores().main
self._db_pool = db_pool
# This is the designated worker that can write to the client IP
# tables.
# (user_id, access_token, ip,) -> last_seen
self.client_ip_last_seen = LruCache[Tuple[str, str, str], int](
cache_name="client_ip_last_seen", max_size=50000
)
# (user_id, access_token, ip,) -> (user_agent, device_id, last_seen)
self._batch_row_update: Dict[
Tuple[str, str, str], Tuple[str, Optional[str], int]
] = {}
self._client_ip_looper = self._clock.looping_call(
self._update_client_ips_batch, 5 * 1000
)
hs.get_reactor().addSystemEventTrigger(
"before", "shutdown", self._update_client_ips_batch
)
async def insert_client_ip(
self,
user_id: str,
access_token: str,
ip: str,
user_agent: str,
device_id: Optional[str],
now: Optional[int] = None,
) -> None:
if not now:
now = int(self._clock.time_msec())
key = (user_id, access_token, ip)
try:
last_seen = self.client_ip_last_seen.get(key)
except KeyError:
last_seen = None
await self._store.populate_monthly_active_users(user_id)
# Rate-limited inserts
if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
return
self.client_ip_last_seen.set(key, now)
self._batch_row_update[key] = (user_agent, device_id, now)
@wrap_as_background_process("update_client_ips")
async def _update_client_ips_batch(self) -> None:
# If the DB pool has already terminated, don't try updating
if not self._db_pool.is_running():
return
to_update = self._batch_row_update
self._batch_row_update = {}
await self._db_pool.runInteraction(
"_update_client_ips_batch", self._update_client_ips_batch_txn, to_update
)
def _update_client_ips_batch_txn(
self,
txn: LoggingTransaction,
to_update: Mapping[Tuple[str, str, str], Tuple[str, Optional[str], int]],
) -> None:
db_pool = self._db_pool
if "user_ips" in db_pool._unsafe_to_upsert_tables or (
not db_pool.engine.can_native_upsert
):
db_pool.engine.lock_table(txn, "user_ips")
for entry in to_update.items():
(user_id, access_token, ip), (user_agent, device_id, last_seen) = entry
db_pool.simple_upsert_txn(
txn,
table="user_ips",
keyvalues={"user_id": user_id, "access_token": access_token, "ip": ip},
values={
"user_agent": user_agent,
"device_id": device_id,
"last_seen": last_seen,
},
lock=False,
)
# Technically an access token might not be associated with
# a device so we need to check.
if device_id:
# this is always an update rather than an upsert: the row should
# already exist, and if it doesn't, that may be because it has been
# deleted, and we don't want to re-create it.
db_pool.simple_update_txn(
txn,
table="devices",
keyvalues={"user_id": user_id, "device_id": device_id},
updatevalues={
"user_agent": user_agent,
"last_seen": last_seen,
"ip": ip,
},
)
class ReplicationStreamingClientIpStrategy(AbstractClientIpStrategy):
"""
Strategy for writing client IPs by streaming them over replication to
a designated writer worker.
"""
def __init__(self, hs: "HomeServer"):
self.hs = hs
self._clock = hs.get_clock()
self.client_ip_last_seen: LruCache[tuple, int] = LruCache(
cache_name="client_ip_last_seen", max_size=50000

View File

@@ -235,6 +235,8 @@ class ReplicationCommandHandler:
if self._is_master:
self._server_notices_sender = hs.get_server_notices_sender()
self._is_background_worker = hs.config.worker.run_background_tasks
def _add_command_to_stream_queue(
self, conn: IReplicationConnection, cmd: Union[RdataCommand, PositionCommand]
) -> None:
@@ -401,12 +403,27 @@ class ReplicationCommandHandler:
) -> Optional[Awaitable[None]]:
user_ip_cache_counter.inc()
if self._is_master:
if self._is_master or self._is_background_worker:
return self._handle_user_ip(cmd)
else:
return None
async def _handle_user_ip(self, cmd: UserIpCommand) -> None:
"""
Handles a User IP, branching depending on whether we are the main process
and/or the background worker.
"""
if self._is_master:
await self._handle_user_ip_as_master(cmd)
if self._is_background_worker:
await self._handle_user_ip_as_background_worker(cmd)
async def _handle_user_ip_as_master(self, cmd: UserIpCommand) -> None:
assert self._server_notices_sender is not None
await self._server_notices_sender.on_user_ip(cmd.user_id)
async def _handle_user_ip_as_background_worker(self, cmd: UserIpCommand) -> None:
await self._store.insert_client_ip(
cmd.user_id,
cmd.access_token,
@@ -416,9 +433,6 @@ class ReplicationCommandHandler:
cmd.last_seen,
)
assert self._server_notices_sender is not None
await self._server_notices_sender.on_user_ip(cmd.user_id)
def on_RDATA(self, conn: IReplicationConnection, cmd: RdataCommand) -> None:
if cmd.instance_name == self._instance_name:
# Ignore RDATA that are just our own echoes
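
Note that `_handle_user_ip` uses two independent `if`s rather than `if`/`elif`: a monolithic process is both the main process and the background worker, so it must both notify the server-notices sender and persist the IP. A minimal self-contained sketch of that dispatch shape:

    import asyncio
    from typing import List, Tuple

    class ToyHandler:
        """Mimics the dispatch above: both roles may apply to one process."""
        def __init__(self, is_master: bool, is_background_worker: bool) -> None:
            self._is_master = is_master
            self._is_background_worker = is_background_worker
            self.actions: List[Tuple[str, str]] = []

        async def handle_user_ip(self, user_id: str) -> None:
            # Deliberately not elif: a monolith takes both branches.
            if self._is_master:
                self.actions.append(("server_notices", user_id))
            if self._is_background_worker:
                self.actions.append(("insert_client_ip", user_id))

    monolith = ToyHandler(is_master=True, is_background_worker=True)
    asyncio.run(monolith.handle_user_ip("@alice:example.org"))
    assert len(monolith.actions) == 2  # both branches ran in one process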

View File

@@ -33,7 +33,7 @@ from .account_data import AccountDataStore
from .appservice import ApplicationServiceStore, ApplicationServiceTransactionStore
from .cache import CacheInvalidationWorkerStore
from .censor_events import CensorEventsStore
from .client_ips import ClientIpStore
from .client_ips import ClientIpWorkerStore
from .deviceinbox import DeviceInboxStore
from .devices import DeviceStore
from .directory import DirectoryStore
@@ -49,7 +49,7 @@ from .keys import KeyStore
from .lock import LockStore
from .media_repository import MediaRepositoryStore
from .metrics import ServerMetricsStore
from .monthly_active_users import MonthlyActiveUsersStore
from .monthly_active_users import MonthlyActiveUsersWorkerStore
from .openid import OpenIdStore
from .presence import PresenceStore
from .profile import ProfileStore
@@ -112,13 +112,13 @@ class DataStore(
AccountDataStore,
EventPushActionsStore,
OpenIdStore,
ClientIpStore,
ClientIpWorkerStore,
DeviceStore,
DeviceInboxStore,
UserDirectoryStore,
GroupServerStore,
UserErasureStore,
MonthlyActiveUsersStore,
MonthlyActiveUsersWorkerStore,
StatsStore,
RelationsStore,
CensorEventsStore,
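
The only changes in this file are the renames: `DataStore` now mixes in `ClientIpWorkerStore` and `MonthlyActiveUsersWorkerStore` directly, which works because the write-capable subclasses were folded into the worker stores. A toy illustration of folding a writer subclass into its base while the composed class keeps working:

    class WorkerStore:
        """After the fold: one class holds reads plus guarded writes."""
        def __init__(self, designated_writer: bool) -> None:
            self._update_on_this_worker = designated_writer

        def get_ip(self) -> str:
            return "10.0.0.1"  # read path: available on every worker

        def insert_ip(self, ip: str) -> None:
            # Write path: guarded by an assertion instead of a subclass.
            assert self._update_on_this_worker, "not the designated writer"

    class DataStore(WorkerStore):  # no separate writer subclass needed any more
        pass

    DataStore(designated_writer=True).insert_ip("10.0.0.1")  # ok
    # DataStore(designated_writer=False).insert_ip(...) would raise AssertionError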

View File

@@ -13,7 +13,7 @@
# limitations under the License.
import logging
from typing import TYPE_CHECKING, Dict, List, Mapping, Optional, Tuple, Union, cast
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union, cast
from typing_extensions import TypedDict
@@ -25,9 +25,10 @@ from synapse.storage.database import (
LoggingTransaction,
make_tuple_comparison_clause,
)
from synapse.storage.databases.main.monthly_active_users import MonthlyActiveUsersStore
from synapse.storage.databases.main.monthly_active_users import (
MonthlyActiveUsersWorkerStore,
)
from synapse.types import JsonDict, UserID
from synapse.util.caches.lrucache import LruCache
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -397,7 +398,7 @@ class ClientIpBackgroundUpdateStore(SQLBaseStore):
return updated
class ClientIpWorkerStore(ClientIpBackgroundUpdateStore):
class ClientIpWorkerStore(ClientIpBackgroundUpdateStore, MonthlyActiveUsersWorkerStore):
def __init__(
self,
database: DatabasePool,
@@ -406,11 +407,16 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore):
):
super().__init__(database, db_conn, hs)
self._update_on_this_worker = hs.config.worker.run_background_tasks
self.user_ips_max_age = hs.config.server.user_ips_max_age
if hs.config.worker.run_background_tasks and self.user_ips_max_age:
self._clock.looping_call(self._prune_old_user_ips, 5 * 1000)
if self._update_on_this_worker:
... # TODO
@wrap_as_background_process("prune_old_user_ips")
async def _prune_old_user_ips(self) -> None:
"""Removes entries in user IPs older than the configured period."""
@@ -456,7 +462,7 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore):
"_prune_old_user_ips", _prune_old_user_ips_txn
)
async def get_last_client_ip_by_device(
async def _get_last_client_ip_by_device_from_database(
self, user_id: str, device_id: Optional[str]
) -> Dict[Tuple[str, str], DeviceLastConnectionInfo]:
"""For each device_id listed, give the user_ip it was last seen on.
@@ -487,7 +493,7 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore):
return {(d["user_id"], d["device_id"]): d for d in res}
async def get_user_ip_and_agents(
async def _get_user_ip_and_agents_from_database(
self, user: UserID, since_ts: int = 0
) -> List[LastConnectionInfo]:
"""Fetch the IPs and user agents for a user since the given timestamp.
@@ -539,115 +545,7 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore):
for access_token, ip, user_agent, last_seen in rows
]
class ClientIpStore(ClientIpWorkerStore, MonthlyActiveUsersStore):
def __init__(
self,
database: DatabasePool,
db_conn: LoggingDatabaseConnection,
hs: "HomeServer",
):
# (user_id, access_token, ip,) -> last_seen
self.client_ip_last_seen = LruCache[Tuple[str, str, str], int](
cache_name="client_ip_last_seen", max_size=50000
)
super().__init__(database, db_conn, hs)
# (user_id, access_token, ip,) -> (user_agent, device_id, last_seen)
self._batch_row_update: Dict[
Tuple[str, str, str], Tuple[str, Optional[str], int]
] = {}
self._client_ip_looper = self._clock.looping_call(
self._update_client_ips_batch, 5 * 1000
)
self.hs.get_reactor().addSystemEventTrigger(
"before", "shutdown", self._update_client_ips_batch
)
async def insert_client_ip(
self,
user_id: str,
access_token: str,
ip: str,
user_agent: str,
device_id: Optional[str],
now: Optional[int] = None,
) -> None:
if not now:
now = int(self._clock.time_msec())
key = (user_id, access_token, ip)
try:
last_seen = self.client_ip_last_seen.get(key)
except KeyError:
last_seen = None
await self.populate_monthly_active_users(user_id)
# Rate-limited inserts
if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
return
self.client_ip_last_seen.set(key, now)
self._batch_row_update[key] = (user_agent, device_id, now)
@wrap_as_background_process("update_client_ips")
async def _update_client_ips_batch(self) -> None:
# If the DB pool has already terminated, don't try updating
if not self.db_pool.is_running():
return
to_update = self._batch_row_update
self._batch_row_update = {}
await self.db_pool.runInteraction(
"_update_client_ips_batch", self._update_client_ips_batch_txn, to_update
)
def _update_client_ips_batch_txn(
self,
txn: LoggingTransaction,
to_update: Mapping[Tuple[str, str, str], Tuple[str, Optional[str], int]],
) -> None:
if "user_ips" in self.db_pool._unsafe_to_upsert_tables or (
not self.database_engine.can_native_upsert
):
self.database_engine.lock_table(txn, "user_ips")
for entry in to_update.items():
(user_id, access_token, ip), (user_agent, device_id, last_seen) = entry
self.db_pool.simple_upsert_txn(
txn,
table="user_ips",
keyvalues={"user_id": user_id, "access_token": access_token, "ip": ip},
values={
"user_agent": user_agent,
"device_id": device_id,
"last_seen": last_seen,
},
lock=False,
)
# Technically an access token might not be associated with
# a device so we need to check.
if device_id:
# this is always an update rather than an upsert: the row should
# already exist, and if it doesn't, that may be because it has been
# deleted, and we don't want to re-create it.
self.db_pool.simple_update_txn(
txn,
table="devices",
keyvalues={"user_id": user_id, "device_id": device_id},
updatevalues={
"user_agent": user_agent,
"last_seen": last_seen,
"ip": ip,
},
)
# TODO here
async def get_last_client_ip_by_device(
self, user_id: str, device_id: Optional[str]
@@ -662,7 +560,12 @@ class ClientIpStore(ClientIpWorkerStore, MonthlyActiveUsersStore):
A dictionary mapping a tuple of (user_id, device_id) to dicts, with
keys giving the column names from the devices table.
"""
ret = await super().get_last_client_ip_by_device(user_id, device_id)
ret = await self._get_last_client_ip_by_device_from_database(user_id, device_id)
if not self._update_on_this_worker:
# Only the writing-worker has additional in-memory data to enhance
# the result
return ret
# Update what is retrieved from the database with data which is pending
# insertion, as if it has already been stored in the database.
@@ -707,9 +610,16 @@ class ClientIpStore(ClientIpWorkerStore, MonthlyActiveUsersStore):
Only the latest user agent for each access token and IP address combination
is available.
"""
rows_from_db = await self._get_user_ip_and_agents_from_database(user, since_ts)
if not self._update_on_this_worker:
# Only the writing-worker has additional in-memory data to enhance
# the result
return rows_from_db
results: Dict[Tuple[str, str], LastConnectionInfo] = {
(connection["access_token"], connection["ip"]): connection
for connection in await super().get_user_ip_and_agents(user, since_ts)
for connection in rows_from_db
}
# Overlay data that is pending insertion on top of the results from the
# database.
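
The merge above keys rows by `(access_token, ip)` so that pending in-memory updates overwrite their database counterparts. A self-contained sketch of that overlay, assuming rows are dicts carrying those two keys:

    from typing import Dict, List, Tuple

    def overlay_pending(rows_from_db: List[dict], pending: List[dict]) -> List[dict]:
        # Index DB rows by (access_token, ip), then let pending rows win.
        results: Dict[Tuple[str, str], dict] = {
            (row["access_token"], row["ip"]): row for row in rows_from_db
        }
        for row in pending:
            results[(row["access_token"], row["ip"])] = row
        return list(results.values())

    db = [{"access_token": "t1", "ip": "10.0.0.1", "last_seen": 100}]
    mem = [{"access_token": "t1", "ip": "10.0.0.1", "last_seen": 200}]
    assert overlay_pending(db, mem)[0]["last_seen"] == 200  # pending wins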

View File

@@ -45,9 +45,24 @@ class MonthlyActiveUsersWorkerStore(SQLBaseStore):
self._clock = hs.get_clock()
self.hs = hs
self._update_on_this_worker = hs.config.worker.run_background_tasks
self._limit_usage_by_mau = hs.config.server.limit_usage_by_mau
self._max_mau_value = hs.config.server.max_mau_value
self._mau_stats_only = hs.config.server.mau_stats_only
if self._update_on_this_worker:
# Do not add more reserved users than the total allowable number
self.db_pool.new_transaction(
db_conn,
"initialise_mau_threepids",
[],
[],
self._initialise_reserved_users,
hs.config.server.mau_limits_reserved_threepids[: self._max_mau_value],
)
@cached(num_args=0)
async def get_monthly_active_count(self) -> int:
"""Generates current count of monthly active users
@@ -220,28 +235,6 @@ class MonthlyActiveUsersWorkerStore(SQLBaseStore):
"reap_monthly_active_users", _reap_users, reserved_users
)
class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
def __init__(
self,
database: DatabasePool,
db_conn: LoggingDatabaseConnection,
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)
self._mau_stats_only = hs.config.server.mau_stats_only
# Do not add more reserved users than the total allowable number
self.db_pool.new_transaction(
db_conn,
"initialise_mau_threepids",
[],
[],
self._initialise_reserved_users,
hs.config.server.mau_limits_reserved_threepids[: self._max_mau_value],
)
def _initialise_reserved_users(self, txn, threepids):
"""Ensures that reserved threepids are accounted for in the MAU table, should
be called on start up.
@@ -250,6 +243,9 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
txn (cursor):
threepids (list[dict]): List of threepid dicts to reserve
"""
assert (
self._update_on_this_worker
), "This worker is not designated to update MAUs"
# XXX what is this function trying to achieve? It upserts into
# monthly_active_users for each *registered* reserved mau user, but why?
@@ -283,6 +279,10 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
Args:
user_id: user to add/update
"""
assert (
self._update_on_this_worker
), "This worker is not designated to update MAUs"
# Support user never to be included in MAU stats. Note I can't easily call this
# from upsert_monthly_active_user_txn because then I need a _txn form of
# is_support_user which is complicated because I want to cache the result.
@@ -316,6 +316,9 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
txn (cursor):
user_id (str): user to add/update
"""
assert (
self._update_on_this_worker
), "This worker is not designated to update MAUs"
# Am consciously deciding to lock the table on the basis that it ought
# never be a big table and alternative approaches (batching multiple
@@ -343,6 +346,10 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
Args:
user_id(str): the user_id to query
"""
assert (
self._update_on_this_worker
), "This worker is not designated to update MAUs"
if self._limit_usage_by_mau or self._mau_stats_only:
# Trial users and guests should not be included as part of MAU group
is_guest = await self.is_guest(user_id)
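
This final hunk adds the designated-worker assertion to `populate_monthly_active_users`, which only tracks a user when MAU limiting or MAU stats are enabled and never counts guests. A runnable toy of that gating order (the store below is a stand-in, not Synapse's API):

    import asyncio

    class FakeMauStore:
        """Toy store illustrating the gating; not Synapse's real store."""
        limit_usage_by_mau = True
        mau_stats_only = False

        def __init__(self) -> None:
            self.active: set = set()

        async def is_guest(self, user_id: str) -> bool:
            return user_id.startswith("@guest")

        async def populate_monthly_active_users(self, user_id: str) -> None:
            # Mirrors the hunk above: only bother when MAU limiting or MAU
            # stats are enabled, and never count guests towards MAU.
            if not (self.limit_usage_by_mau or self.mau_stats_only):
                return
            if await self.is_guest(user_id):
                return
            self.active.add(user_id)

    store = FakeMauStore()
    asyncio.run(store.populate_monthly_active_users("@guest-1:example.org"))
    asyncio.run(store.populate_monthly_active_users("@alice:example.org"))
    assert store.active == {"@alice:example.org"}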