1
0

Insert to-device messages into the new to-device part of AS txns

This commit is contained in:
Andrew Morgan
2021-12-03 19:22:22 +00:00
committed by Olivier Wilkinson (reivilibre)
parent 1447897ac8
commit 74d29c1fd7
2 changed files with 32 additions and 7 deletions

View File

@@ -71,6 +71,9 @@ MAX_PERSISTENT_EVENTS_PER_TRANSACTION = 100
# Maximum number of ephemeral events to provide in an AS transaction.
MAX_EPHEMERAL_EVENTS_PER_TRANSACTION = 100
# Maximum number of to-device messages to provide in an AS transaction.
MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION = 100
class ApplicationServiceScheduler:
"""Public facing API for this module. Does the required DI to tie the
@@ -102,6 +105,7 @@ class ApplicationServiceScheduler:
appservice: ApplicationService,
events: Optional[Iterable[EventBase]] = None,
ephemeral: Optional[Iterable[JsonDict]] = None,
to_device_messages: Optional[Iterable[JsonDict]] = None,
) -> None:
"""
Enqueue some data to be sent off to an application service.
@@ -109,6 +113,9 @@ class ApplicationServiceScheduler:
appservice: The application service to create and send a transaction to.
events: The persistent room events to send.
ephemeral: The ephemeral events to send.
to_device_messages: The to-device messages to send. These differ from normal
to-device messages sent to clients, as they have 'to_device_id' and
'to_user_id' fields.
"""
# We purposefully allow this method to run with empty events/ephemeral
# iterables, so that callers do not need to check iterable size themselves.
@@ -119,6 +126,10 @@ class ApplicationServiceScheduler:
self.queuer.queued_events.setdefault(appservice.id, []).extend(events)
if ephemeral:
self.queuer.queued_ephemeral.setdefault(appservice.id, []).extend(ephemeral)
if to_device_messages:
self.queuer.queued_to_device_messages.setdefault(appservice.id, []).extend(
to_device_messages
)
# Kick off a new application service transaction
self.queuer.start_background_request(appservice)
@@ -137,7 +148,8 @@ class _ServiceQueuer:
self.queued_events: Dict[str, List[EventBase]] = {}
# dict of {service_id: [event_json]}
self.queued_ephemeral: Dict[str, List[JsonDict]] = {}
self.queued_ephemeral: Dict[str, List[JsonDict]] = {}
# dict of {service_id: [to_device_message_json]}
self.queued_to_device_messages: Dict[str, List[JsonDict]] = {}
# the appservices which currently have a transaction in flight
self.requests_in_flight: Set[str] = set()
@@ -169,11 +181,21 @@ class _ServiceQueuer:
ephemeral = all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION]
del all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION]
if not events and not ephemeral:
all_to_device_messages = self.queued_to_device_messages.get(
service.id, []
)
to_device_messages_to_send = all_to_device_messages[
:MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION
]
del all_to_device_messages[:MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION]
if not events and not ephemeral and not to_device_messages_to_send:
return
try:
await self.txn_ctrl.send(service, events, ephemeral)
await self.txn_ctrl.send(
service, events, ephemeral, to_device_messages_to_send
)
except Exception:
logger.exception("AS request failed")
finally:
@@ -205,6 +227,7 @@ class _TransactionController:
service: ApplicationService,
events: List[EventBase],
ephemeral: Optional[List[JsonDict]] = None,
to_device_messages: Optional[List[JsonDict]] = None,
) -> None:
"""
Create a transaction with the given data and send to the provided
@@ -214,10 +237,14 @@ class _TransactionController:
service: The application service to send the transaction to.
events: The persistent events to include in the transaction.
ephemeral: The ephemeral events to include in the transaction.
to_device_messages: The to-device messages to include in the transaction.
"""
try:
txn = await self.store.create_appservice_txn(
service=service, events=events, ephemeral=ephemeral or []
service=service,
events=events,
ephemeral=ephemeral or [],
to_device_messages=to_device_messages or [],
)
service_is_up = await self._is_service_up(service)
if service_is_up:

View File

@@ -327,10 +327,8 @@ class ApplicationServicesHandler:
to_device_messages = await self._get_to_device_messages(
service, new_token, users
)
# REVIEW: In a subsequent commit, we'll move this to a to-device-specific
# key in the AS transaction.
self.scheduler.enqueue_for_appservice(
service, ephemeral=to_device_messages
service, to_device_messages=to_device_messages
)
# Persist the latest handled stream token for this appservice