From ec81f8d38f4fb2de664e41b2779d494ad0b682c4 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Mon, 13 Dec 2021 16:39:40 +0000 Subject: [PATCH] Find interesting users for the AS when sending OTKs and FBKs --- synapse/appservice/scheduler.py | 38 +++++++++++++++++--- synapse/storage/databases/main/appservice.py | 27 +++++++++++--- 2 files changed, 55 insertions(+), 10 deletions(-) diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 8657aae7d0..9c8bec5ed7 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -160,6 +160,7 @@ class _ServiceQueuer: self._msc3202_transaction_extensions_enabled: bool = ( hs.config.experimental.msc3202_transaction_extensions ) + self._store = hs.get_datastore() def start_background_request(self, service): # start a sender for this appservice if we don't already have one @@ -204,7 +205,7 @@ class _ServiceQueuer: # Lazily compute the one-time key counts and fallback key # usage states for the users which are mentioned in this # transaction, as well as the appservice's sender. - interesting_users = self._determine_interesting_users_for_msc3202_otk_counts_and_fallback_keys( + interesting_users = await self._determine_interesting_users_for_msc3202_otk_counts_and_fallback_keys( service, events, ephemeral, to_device_messages_to_send ) ( @@ -228,11 +229,11 @@ class _ServiceQueuer: finally: self.requests_in_flight.discard(service.id) - def _determine_interesting_users_for_msc3202_otk_counts_and_fallback_keys( + async def _determine_interesting_users_for_msc3202_otk_counts_and_fallback_keys( self, service: ApplicationService, events: Iterable[EventBase], - ephemeral: Iterable[JsonDict], + ephemerals: Iterable[JsonDict], to_device_messages: Iterable[JsonDict], ) -> Set[str]: """ @@ -240,8 +241,35 @@ class _ServiceQueuer: compute a list of application services users that may have interesting updates to the one-time key counts or fallback key usage. """ - # OSTD implement me! - return set() + interesting_users: Set[str] = set() + + # The sender is always included + interesting_users.add(service.sender) + + # All AS users that would receive the PDUs or EDUs sent to these rooms + # are classed as 'interesting'. + rooms_of_interesting_users: Set[str] = set() + # PDUs + rooms_of_interesting_users.update(event.room_id for event in events) + # EDUs + rooms_of_interesting_users.update( + ephemeral["room_id"] for ephemeral in ephemerals + ) + + # Look up the AS users in those rooms + for room_id in rooms_of_interesting_users: + interesting_users.update( + await self._store.get_app_service_users_in_room(room_id, service) + ) + + # Add recipients of to-device messages. + # device_message["user_id"] is the ID of the recipient. + interesting_users.update( + device_message["user_id"] for device_message in to_device_messages + ) + + return interesting_users + async def _compute_msc3202_otk_counts_and_fallback_keys( self, users: Set[str] ) -> Tuple[TransactionOneTimeKeyCounts, TransactionUnusedFallbackKeys]: diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py index a0404e2391..d9a79d6f4f 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py @@ -25,12 +25,13 @@ from synapse.appservice import ( ) from synapse.config.appservice import load_appservices from synapse.events import EventBase -from synapse.storage._base import SQLBaseStore, db_to_json -from synapse.storage.database import DatabasePool +from synapse.storage._base import db_to_json +from synapse.storage.database import DatabasePool, LoggingDatabaseConnection from synapse.storage.databases.main.events_worker import EventsWorkerStore -from synapse.storage.types import Connection +from synapse.storage.databases.main.roommember import RoomMemberWorkerStore from synapse.types import JsonDict from synapse.util import json_encoder +from synapse.util.caches.descriptors import cached if TYPE_CHECKING: from synapse.server import HomeServer @@ -59,8 +60,13 @@ def _make_exclusive_regex( return exclusive_user_pattern -class ApplicationServiceWorkerStore(SQLBaseStore): - def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"): +class ApplicationServiceWorkerStore(RoomMemberWorkerStore): + def __init__( + self, + database: DatabasePool, + db_conn: LoggingDatabaseConnection, + hs: "HomeServer", + ): self.services_cache = load_appservices( hs.hostname, hs.config.appservice.app_service_config_files ) @@ -122,6 +128,17 @@ class ApplicationServiceWorkerStore(SQLBaseStore): return service return None + # OSTD cache invalidation + @cached(iterable=True, prune_unread_entries=False) + async def get_app_service_users_in_room( + self, room_id: str, app_service: "ApplicationService" + ) -> List[str]: + return list( + filter( + app_service.is_interested_in_user, await self.get_users_in_room(room_id) + ) + ) + class ApplicationServiceStore(ApplicationServiceWorkerStore): # This is currently empty due to there not being any AS storage functions