generalise sending application service transactions and allow to-device
This commit is contained in:
@@ -329,11 +329,13 @@ class AppServiceTransaction:
|
||||
id: int,
|
||||
events: List[EventBase],
|
||||
ephemeral: List[JsonDict],
|
||||
to_device_messages: List[JsonDict],
|
||||
):
|
||||
self.service = service
|
||||
self.id = id
|
||||
self.events = events
|
||||
self.ephemeral = ephemeral
|
||||
self.to_device_messages = to_device_messages
|
||||
|
||||
async def send(self, as_api: "ApplicationServiceApi") -> bool:
|
||||
"""Sends this transaction using the provided AS API interface.
|
||||
@@ -347,6 +349,7 @@ class AppServiceTransaction:
|
||||
service=self.service,
|
||||
events=self.events,
|
||||
ephemeral=self.ephemeral,
|
||||
to_device_messages=self.to_device_messages,
|
||||
txn_id=self.id,
|
||||
)
|
||||
|
||||
|
||||
@@ -204,8 +204,23 @@ class ApplicationServiceApi(SimpleHttpClient):
|
||||
service: "ApplicationService",
|
||||
events: List[EventBase],
|
||||
ephemeral: List[JsonDict],
|
||||
to_device_messages: List[JsonDict],
|
||||
txn_id: Optional[int] = None,
|
||||
):
|
||||
) -> bool:
|
||||
"""
|
||||
Push data to an application service.
|
||||
|
||||
Args:
|
||||
service: The application service to send to.
|
||||
events: The persistent events to send.
|
||||
ephemeral: The ephemeral events to send.
|
||||
to_device_messages: The to-device messages to send.
|
||||
txn_id: An unique ID to assign to this transaction. Application services should
|
||||
deduplicate transactions received with identitical IDs.
|
||||
|
||||
Returns:
|
||||
True if the task succeeded, False if it failed.
|
||||
"""
|
||||
if service.url is None:
|
||||
return True
|
||||
|
||||
@@ -220,10 +235,16 @@ class ApplicationServiceApi(SimpleHttpClient):
|
||||
uri = service.url + ("/transactions/%s" % urllib.parse.quote(str(txn_id)))
|
||||
|
||||
# Never send ephemeral events to appservices that do not support it
|
||||
body = {"events": events}
|
||||
|
||||
if service.supports_ephemeral:
|
||||
body = {"events": events, "de.sorunome.msc2409.ephemeral": ephemeral}
|
||||
else:
|
||||
body = {"events": events}
|
||||
body.update(
|
||||
{
|
||||
# TODO: Update to stable prefixes once MSC2409 completes FCP merge.
|
||||
"de.sorunome.msc2409.ephemeral": ephemeral,
|
||||
"de.sorunome.msc2409.to_device": to_device_messages,
|
||||
}
|
||||
)
|
||||
|
||||
try:
|
||||
await self.put_json(
|
||||
|
||||
@@ -48,7 +48,7 @@ This is all tied together by the AppServiceScheduler which DIs the required
|
||||
components.
|
||||
"""
|
||||
import logging
|
||||
from typing import Iterable, List, Optional
|
||||
from typing import Dict, Iterable, List, Optional
|
||||
|
||||
from synapse.appservice import ApplicationService, ApplicationServiceState
|
||||
from synapse.events import EventBase
|
||||
@@ -65,6 +65,9 @@ MAX_PERSISTENT_EVENTS_PER_TRANSACTION = 100
|
||||
# Maximum number of ephemeral events to provide in an AS transaction.
|
||||
MAX_EPHEMERAL_EVENTS_PER_TRANSACTION = 100
|
||||
|
||||
# Maximum number of to-device messages to provide in an AS transaction.
|
||||
MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION = 100
|
||||
|
||||
|
||||
class ApplicationServiceScheduler:
|
||||
"""Public facing API for this module. Does the required DI to tie the
|
||||
@@ -91,25 +94,38 @@ class ApplicationServiceScheduler:
|
||||
for service in services:
|
||||
self.txn_ctrl.start_recoverer(service)
|
||||
|
||||
def submit_event_for_as(self, service: ApplicationService, event: EventBase):
|
||||
self.queuer.enqueue_event(service, event)
|
||||
|
||||
def submit_ephemeral_events_for_as(
|
||||
self, service: ApplicationService, events: Iterable[JsonDict]
|
||||
def enqueue_for_appservice(
|
||||
self,
|
||||
appservice: ApplicationService,
|
||||
events: Optional[Iterable[EventBase]] = None,
|
||||
ephemeral: Optional[Iterable[JsonDict]] = None,
|
||||
to_device_messages: Optional[Iterable[JsonDict]] = None,
|
||||
) -> None:
|
||||
"""
|
||||
Send ephemeral events to application services, and schedule a new
|
||||
outgoing AS transaction.
|
||||
Enqueue some data to be sent off to an application service.
|
||||
|
||||
Args:
|
||||
service: The service to send ephemeral events to.
|
||||
events: The ephemeral events to send.
|
||||
appservice: The application service to create and send a transaction to.
|
||||
events: The persistent room events to send.
|
||||
ephemeral: The ephemeral events to send.
|
||||
to_device_messages: The to-device messages to send. These differ from normal
|
||||
to-device messages sent to clients, as they have 'to_device_id' and
|
||||
'to_user_id' fields.
|
||||
"""
|
||||
# Ensure we have some events to send
|
||||
if not events:
|
||||
return
|
||||
# Note that we purposefully allow this method to run with empty events/ephemeral
|
||||
# iterables, and it helps the ergonomics of callers.
|
||||
|
||||
self.queuer.enqueue_ephemeral(service, events)
|
||||
if events:
|
||||
self.queuer.queued_events.setdefault(appservice.id, []).extend(events)
|
||||
if ephemeral:
|
||||
self.queuer.queued_ephemeral.setdefault(appservice.id, []).extend(ephemeral)
|
||||
if to_device_messages:
|
||||
self.queuer.queued_to_device_messages.setdefault(appservice.id, []).extend(
|
||||
to_device_messages
|
||||
)
|
||||
|
||||
# Kick off a new application service transaction
|
||||
self.queuer.start_background_request(appservice)
|
||||
|
||||
|
||||
class _ServiceQueuer:
|
||||
@@ -121,15 +137,19 @@ class _ServiceQueuer:
|
||||
"""
|
||||
|
||||
def __init__(self, txn_ctrl, clock):
|
||||
self.queued_events = {} # dict of {service_id: [events]}
|
||||
self.queued_ephemeral = {} # dict of {service_id: [events]}
|
||||
# dict of {service_id: [events]}
|
||||
self.queued_events: Dict[str, List[EventBase]] = {}
|
||||
# dict of {service_id: [event_json]}
|
||||
self.queued_ephemeral: Dict[str, List[JsonDict]] = {}
|
||||
# dict of {service_id: [to_device_message_json]}
|
||||
self.queued_to_device_messages: Dict[str, List[JsonDict]] = {}
|
||||
|
||||
# the appservices which currently have a transaction in flight
|
||||
self.requests_in_flight = set()
|
||||
self.txn_ctrl = txn_ctrl
|
||||
self.clock = clock
|
||||
|
||||
def _start_background_request(self, service):
|
||||
def start_background_request(self, service):
|
||||
# start a sender for this appservice if we don't already have one
|
||||
if service.id in self.requests_in_flight:
|
||||
return
|
||||
@@ -138,16 +158,6 @@ class _ServiceQueuer:
|
||||
"as-sender-%s" % (service.id,), self._send_request, service
|
||||
)
|
||||
|
||||
def enqueue_event(self, service: ApplicationService, event: EventBase):
|
||||
self.queued_events.setdefault(service.id, []).append(event)
|
||||
self._start_background_request(service)
|
||||
|
||||
def enqueue_ephemeral(
|
||||
self, service: ApplicationService, events: Iterable[JsonDict]
|
||||
) -> None:
|
||||
self.queued_ephemeral.setdefault(service.id, []).extend(events)
|
||||
self._start_background_request(service)
|
||||
|
||||
async def _send_request(self, service: ApplicationService):
|
||||
# sanity-check: we shouldn't get here if this service already has a sender
|
||||
# running.
|
||||
@@ -164,11 +174,21 @@ class _ServiceQueuer:
|
||||
ephemeral = all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION]
|
||||
del all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION]
|
||||
|
||||
if not events and not ephemeral:
|
||||
all_to_device_messages = self.queued_to_device_messages.get(
|
||||
service.id, []
|
||||
)
|
||||
to_device_messages_to_send = all_to_device_messages[
|
||||
:MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION
|
||||
]
|
||||
del all_to_device_messages[:MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION]
|
||||
|
||||
if not events and not ephemeral and not to_device_messages_to_send:
|
||||
return
|
||||
|
||||
try:
|
||||
await self.txn_ctrl.send(service, events, ephemeral)
|
||||
await self.txn_ctrl.send(
|
||||
service, events, ephemeral, to_device_messages_to_send
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("AS request failed")
|
||||
finally:
|
||||
@@ -205,10 +225,24 @@ class _TransactionController:
|
||||
service: ApplicationService,
|
||||
events: List[EventBase],
|
||||
ephemeral: Optional[List[JsonDict]] = None,
|
||||
):
|
||||
to_device_messages: Optional[List[JsonDict]] = None,
|
||||
) -> None:
|
||||
"""
|
||||
Create a transaction with the given data and send to the provided
|
||||
application service.
|
||||
|
||||
Args:
|
||||
service: The application service to send the transaction to.
|
||||
events: The persistent events to include in the transaction.
|
||||
ephemeral: The ephemeral events to include in the transaction.
|
||||
to_device_messages: The to-device messages to include in the transaction.
|
||||
"""
|
||||
try:
|
||||
txn = await self.store.create_appservice_txn(
|
||||
service=service, events=events, ephemeral=ephemeral or []
|
||||
service=service,
|
||||
events=events,
|
||||
ephemeral=ephemeral or [],
|
||||
to_device_messages=to_device_messages or [],
|
||||
)
|
||||
service_is_up = await self._is_service_up(service)
|
||||
if service_is_up:
|
||||
|
||||
@@ -135,7 +135,9 @@ class ApplicationServicesHandler:
|
||||
|
||||
# Fork off pushes to these services
|
||||
for service in services:
|
||||
self.scheduler.submit_event_for_as(service, event)
|
||||
self.scheduler.enqueue_for_appservice(
|
||||
service, events=[event]
|
||||
)
|
||||
|
||||
now = self.clock.time_msec()
|
||||
ts = await self.store.get_received_ts(event.event_id)
|
||||
@@ -292,7 +294,7 @@ class ApplicationServicesHandler:
|
||||
# and, if they apply to this application service, send it off.
|
||||
events = await self._handle_typing(service, new_token)
|
||||
if events:
|
||||
self.scheduler.submit_ephemeral_events_for_as(service, events)
|
||||
self.scheduler.enqueue_for_appservice(service, ephemeral=events)
|
||||
continue
|
||||
|
||||
# Since we read/update the stream position for this AS/stream
|
||||
@@ -303,7 +305,7 @@ class ApplicationServicesHandler:
|
||||
):
|
||||
if stream_key == "receipt_key":
|
||||
events = await self._handle_receipts(service, new_token)
|
||||
self.scheduler.submit_ephemeral_events_for_as(service, events)
|
||||
self.scheduler.enqueue_for_appservice(service, ephemeral=events)
|
||||
|
||||
# Persist the latest handled stream token for this appservice
|
||||
await self.store.set_appservice_stream_type_pos(
|
||||
@@ -312,7 +314,7 @@ class ApplicationServicesHandler:
|
||||
|
||||
elif stream_key == "presence_key":
|
||||
events = await self._handle_presence(service, users, new_token)
|
||||
self.scheduler.submit_ephemeral_events_for_as(service, events)
|
||||
self.scheduler.enqueue_for_appservice(service, ephemeral=events)
|
||||
|
||||
# Persist the latest handled stream token for this appservice
|
||||
await self.store.set_appservice_stream_type_pos(
|
||||
@@ -325,8 +327,8 @@ class ApplicationServicesHandler:
|
||||
to_device_messages = await self._get_to_device_messages(
|
||||
service, new_token, users
|
||||
)
|
||||
self.scheduler.submit_ephemeral_events_for_as(
|
||||
service, to_device_messages
|
||||
self.scheduler.enqueue_for_appservice(
|
||||
service, to_device_messages=to_device_messages
|
||||
)
|
||||
|
||||
# Persist the latest handled stream token for this appservice
|
||||
|
||||
@@ -194,6 +194,7 @@ class ApplicationServiceTransactionWorkerStore(
|
||||
service: ApplicationService,
|
||||
events: List[EventBase],
|
||||
ephemeral: List[JsonDict],
|
||||
to_device_messages: List[JsonDict],
|
||||
) -> AppServiceTransaction:
|
||||
"""Atomically creates a new transaction for this application service
|
||||
with the given list of events. Ephemeral events are NOT persisted to the
|
||||
@@ -203,6 +204,7 @@ class ApplicationServiceTransactionWorkerStore(
|
||||
service: The service who the transaction is for.
|
||||
events: A list of persistent events to put in the transaction.
|
||||
ephemeral: A list of ephemeral events to put in the transaction.
|
||||
to_device_messages: A list of to-device messages to put in the transaction.
|
||||
|
||||
Returns:
|
||||
A new transaction.
|
||||
@@ -233,7 +235,11 @@ class ApplicationServiceTransactionWorkerStore(
|
||||
(service.id, new_txn_id, event_ids),
|
||||
)
|
||||
return AppServiceTransaction(
|
||||
service=service, id=new_txn_id, events=events, ephemeral=ephemeral
|
||||
service=service,
|
||||
id=new_txn_id,
|
||||
events=events,
|
||||
ephemeral=ephemeral,
|
||||
to_device_messages=to_device_messages,
|
||||
)
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
|
||||
Reference in New Issue
Block a user