1
0

Refactor and generalise the sending of arbitrary fields over AS transactions

Things were starting to get a little inflexible as we kept adding new
types of data to send to application services. It's better to just
have one method for adding data to AS transactions, than one for
each type of data.

Note that subsequent PRs will need to add device lists, one-time keys
and fallback keys to these transactions. Adding those are additional
arguments to a method is much nicer than a new method for each one.

Plus with this setup we can add multiple different types of data at
once without kicking off an AS transaction for each type. This will
be useful for OTK/fallback keys, as we plan to lazily attach those
when handling other event types.
This commit is contained in:
Andrew Morgan
2021-12-03 20:00:30 +00:00
committed by Olivier Wilkinson (reivilibre)
parent 49bbf65796
commit 8375272fa3
2 changed files with 42 additions and 35 deletions

View File

@@ -48,7 +48,7 @@ This is all tied together by the AppServiceScheduler which DIs the required
components.
"""
import logging
from typing import TYPE_CHECKING, Awaitable, Callable, Dict, List, Optional, Set
from typing import TYPE_CHECKING, Awaitable, Callable, Dict, Iterable, List, Optional, Set
from synapse.appservice import ApplicationService, ApplicationServiceState
from synapse.appservice.api import ApplicationServiceApi
@@ -97,27 +97,31 @@ class ApplicationServiceScheduler:
for service in services:
self.txn_ctrl.start_recoverer(service)
def submit_event_for_as(
self, service: ApplicationService, event: EventBase
) -> None:
self.queuer.enqueue_event(service, event)
def submit_ephemeral_events_for_as(
self, service: ApplicationService, events: Iterable[JsonDict]
def enqueue_for_appservice(
self,
appservice: ApplicationService,
events: Optional[Iterable[EventBase]] = None,
ephemeral: Optional[Iterable[JsonDict]] = None,
) -> None:
"""
Send ephemeral events to application services, and schedule a new
outgoing AS transaction.
Enqueue some data to be sent off to an application service.
Args:
service: The service to send ephemeral events to.
events: The ephemeral events to send.
appservice: The application service to create and send a transaction to.
events: The persistent room events to send.
ephemeral: The ephemeral events to send.
"""
# Ensure we have some events to send
if not events:
# We purposefully allow this method to run with empty events/ephemeral
# iterables, so that callers do not need to check iterable size themselves.
if not events and not ephemeral and not to_device_messages:
return
self.queuer.enqueue_ephemeral(service, events)
if events:
self.queuer.queued_events.setdefault(appservice.id, []).extend(events)
if ephemeral:
self.queuer.queued_ephemeral.setdefault(appservice.id, []).extend(ephemeral)
# Kick off a new application service transaction
self.queuer.start_background_request(appservice)
class _ServiceQueuer:
@@ -131,7 +135,8 @@ class _ServiceQueuer:
def __init__(self, txn_ctrl: "_TransactionController", clock: Clock):
# dict of {service_id: [events]}
self.queued_events: Dict[str, List[EventBase]] = {}
# dict of {service_id: [events]}
# dict of {service_id: [event_json]}
self.queued_ephemeral: Dict[str, List[JsonDict]] = {}
self.queued_ephemeral: Dict[str, List[JsonDict]] = {}
# the appservices which currently have a transaction in flight
@@ -139,7 +144,7 @@ class _ServiceQueuer:
self.txn_ctrl = txn_ctrl
self.clock = clock
def _start_background_request(self, service: ApplicationService) -> None:
def start_background_request(self, service: ApplicationService) -> None:
# start a sender for this appservice if we don't already have one
if service.id in self.requests_in_flight:
return
@@ -148,17 +153,7 @@ class _ServiceQueuer:
"as-sender-%s" % (service.id,), self._send_request, service
)
def enqueue_event(self, service: ApplicationService, event: EventBase) -> None:
self.queued_events.setdefault(service.id, []).append(event)
self._start_background_request(service)
def enqueue_ephemeral(
self, service: ApplicationService, events: Iterable[JsonDict]
) -> None:
self.queued_ephemeral.setdefault(service.id, []).extend(events)
self._start_background_request(service)
async def _send_request(self, service: ApplicationService) -> None:
async def _send_request(self, service: ApplicationService):
# sanity-check: we shouldn't get here if this service already has a sender
# running.
assert service.id not in self.requests_in_flight
@@ -211,6 +206,14 @@ class _TransactionController:
events: List[EventBase],
ephemeral: Optional[List[JsonDict]] = None,
) -> None:
"""
Create a transaction with the given data and send to the provided
application service.
Args:
service: The application service to send the transaction to.
events: The persistent events to include in the transaction.
ephemeral: The ephemeral events to include in the transaction.
"""
try:
txn = await self.store.create_appservice_txn(
service=service, events=events, ephemeral=ephemeral or []

View File

@@ -135,7 +135,9 @@ class ApplicationServicesHandler:
# Fork off pushes to these services
for service in services:
self.scheduler.submit_event_for_as(service, event)
self.scheduler.enqueue_for_appservice(
service, events=[event]
)
now = self.clock.time_msec()
ts = await self.store.get_received_ts(event.event_id)
@@ -292,7 +294,7 @@ class ApplicationServicesHandler:
# and, if they apply to this application service, send it off.
events = await self._handle_typing(service, new_token)
if events:
self.scheduler.submit_ephemeral_events_for_as(service, events)
self.scheduler.enqueue_for_appservice(service, ephemeral=events)
continue
# Since we read/update the stream position for this AS/stream
@@ -303,7 +305,7 @@ class ApplicationServicesHandler:
):
if stream_key == "receipt_key":
events = await self._handle_receipts(service, new_token)
self.scheduler.submit_ephemeral_events_for_as(service, events)
self.scheduler.enqueue_for_appservice(service, ephemeral=events)
# Persist the latest handled stream token for this appservice
await self.store.set_appservice_stream_type_pos(
@@ -312,7 +314,7 @@ class ApplicationServicesHandler:
elif stream_key == "presence_key":
events = await self._handle_presence(service, users, new_token)
self.scheduler.submit_ephemeral_events_for_as(service, events)
self.scheduler.enqueue_for_appservice(service, ephemeral=events)
# Persist the latest handled stream token for this appservice
await self.store.set_appservice_stream_type_pos(
@@ -325,8 +327,10 @@ class ApplicationServicesHandler:
to_device_messages = await self._get_to_device_messages(
service, new_token, users
)
self.scheduler.submit_ephemeral_events_for_as(
service, to_device_messages
# REVIEW: In a subsequent commit, we'll move this to a to-device-specific
# key in the AS transaction.
self.scheduler.enqueue_for_appservice(
service, ephemeral=to_device_messages
)
# Persist the latest handled stream token for this appservice