From dc7e6c1cfc7cf9d7ed92b7d9f24511d23f02ba34 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Mon, 13 Dec 2021 16:39:40 +0000 Subject: [PATCH] Find interesting users for the AS when sending OTKs and FBKs --- synapse/appservice/scheduler.py | 38 +++++++++++++++++--- synapse/storage/databases/main/appservice.py | 17 +++++++-- 2 files changed, 48 insertions(+), 7 deletions(-) diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 9dc0cfed8d..e235a6ee95 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -163,6 +163,7 @@ class _ServiceQueuer: self._msc3202_transaction_extensions_enabled: bool = ( hs.config.experimental.msc3202_transaction_extensions ) + self._store = hs.get_datastore() def start_background_request(self, service: ApplicationService) -> None: # start a sender for this appservice if we don't already have one @@ -207,7 +208,7 @@ class _ServiceQueuer: # Lazily compute the one-time key counts and fallback key # usage states for the users which are mentioned in this # transaction, as well as the appservice's sender. - interesting_users = self._determine_interesting_users_for_msc3202_otk_counts_and_fallback_keys( + interesting_users = await self._determine_interesting_users_for_msc3202_otk_counts_and_fallback_keys( service, events, ephemeral, to_device_messages_to_send ) ( @@ -231,11 +232,11 @@ class _ServiceQueuer: finally: self.requests_in_flight.discard(service.id) - def _determine_interesting_users_for_msc3202_otk_counts_and_fallback_keys( + async def _determine_interesting_users_for_msc3202_otk_counts_and_fallback_keys( self, service: ApplicationService, events: Iterable[EventBase], - ephemeral: Iterable[JsonDict], + ephemerals: Iterable[JsonDict], to_device_messages: Iterable[JsonDict], ) -> Set[str]: """ @@ -243,8 +244,35 @@ class _ServiceQueuer: compute a list of application services users that may have interesting updates to the one-time key counts or fallback key usage. """ - # OSTD implement me! - return set() + interesting_users: Set[str] = set() + + # The sender is always included + interesting_users.add(service.sender) + + # All AS users that would receive the PDUs or EDUs sent to these rooms + # are classed as 'interesting'. + rooms_of_interesting_users: Set[str] = set() + # PDUs + rooms_of_interesting_users.update(event.room_id for event in events) + # EDUs + rooms_of_interesting_users.update( + ephemeral["room_id"] for ephemeral in ephemerals + ) + + # Look up the AS users in those rooms + for room_id in rooms_of_interesting_users: + interesting_users.update( + await self._store.get_app_service_users_in_room(room_id, service) + ) + + # Add recipients of to-device messages. + # device_message["user_id"] is the ID of the recipient. + interesting_users.update( + device_message["user_id"] for device_message in to_device_messages + ) + + return interesting_users + async def _compute_msc3202_otk_counts_and_fallback_keys( self, users: Set[str] ) -> Tuple[TransactionOneTimeKeyCounts, TransactionUnusedFallbackKeys]: diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py index c4409983eb..d9a79d6f4f 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py @@ -25,11 +25,13 @@ from synapse.appservice import ( ) from synapse.config.appservice import load_appservices from synapse.events import EventBase -from synapse.storage._base import SQLBaseStore, db_to_json +from synapse.storage._base import db_to_json from synapse.storage.database import DatabasePool, LoggingDatabaseConnection from synapse.storage.databases.main.events_worker import EventsWorkerStore +from synapse.storage.databases.main.roommember import RoomMemberWorkerStore from synapse.types import JsonDict from synapse.util import json_encoder +from synapse.util.caches.descriptors import cached if TYPE_CHECKING: from synapse.server import HomeServer @@ -58,7 +60,7 @@ def _make_exclusive_regex( return exclusive_user_pattern -class ApplicationServiceWorkerStore(SQLBaseStore): +class ApplicationServiceWorkerStore(RoomMemberWorkerStore): def __init__( self, database: DatabasePool, @@ -126,6 +128,17 @@ class ApplicationServiceWorkerStore(SQLBaseStore): return service return None + # OSTD cache invalidation + @cached(iterable=True, prune_unread_entries=False) + async def get_app_service_users_in_room( + self, room_id: str, app_service: "ApplicationService" + ) -> List[str]: + return list( + filter( + app_service.is_interested_in_user, await self.get_users_in_room(room_id) + ) + ) + class ApplicationServiceStore(ApplicationServiceWorkerStore): # This is currently empty due to there not being any AS storage functions