Deduplicate ephemeral events to send conditional
Test cases needed to be updated, as we now always call submit_ephemeral_events_for_as, it may just be with an empty events list.
This commit is contained in:
@@ -97,6 +97,18 @@ class ApplicationServiceScheduler:
|
||||
def submit_ephemeral_events_for_as(
|
||||
self, service: ApplicationService, events: Iterable[JsonDict]
|
||||
) -> None:
|
||||
"""
|
||||
Send ephemeral events to application services, and schedule a new
|
||||
outgoing AS transaction.
|
||||
|
||||
Args:
|
||||
service: The service to send ephemeral events to.
|
||||
events: The ephemeral events to send.
|
||||
"""
|
||||
# Ensure we have some events to send
|
||||
if not events:
|
||||
return
|
||||
|
||||
self.queuer.enqueue_ephemeral(service, events)
|
||||
|
||||
|
||||
|
||||
@@ -302,10 +302,7 @@ class ApplicationServicesHandler:
|
||||
):
|
||||
if stream_key == "receipt_key":
|
||||
events = await self._handle_receipts(service, new_token)
|
||||
if events:
|
||||
self.scheduler.submit_ephemeral_events_for_as(
|
||||
service, events
|
||||
)
|
||||
self.scheduler.submit_ephemeral_events_for_as(service, events)
|
||||
|
||||
# Persist the latest handled stream token for this appservice
|
||||
await self.store.set_type_stream_id_for_appservice(
|
||||
@@ -314,10 +311,7 @@ class ApplicationServicesHandler:
|
||||
|
||||
elif stream_key == "presence_key":
|
||||
events = await self._handle_presence(service, users, new_token)
|
||||
if events:
|
||||
self.scheduler.submit_ephemeral_events_for_as(
|
||||
service, events
|
||||
)
|
||||
self.scheduler.submit_ephemeral_events_for_as(service, events)
|
||||
|
||||
# Persist the latest handled stream token for this appservice
|
||||
await self.store.set_type_stream_id_for_appservice(
|
||||
@@ -328,10 +322,7 @@ class ApplicationServicesHandler:
|
||||
# Retrieve a list of to-device message events, as well as the
|
||||
# maximum stream token of the messages we were able to retrieve.
|
||||
events = await self._handle_to_device(service, new_token, users)
|
||||
if events:
|
||||
self.scheduler.submit_ephemeral_events_for_as(
|
||||
service, events
|
||||
)
|
||||
self.scheduler.submit_ephemeral_events_for_as(service, events)
|
||||
|
||||
# Persist the latest handled stream token for this appservice
|
||||
await self.store.set_type_stream_id_for_appservice(
|
||||
|
||||
@@ -47,6 +47,12 @@ class AppServiceHandlerTestCase(unittest.TestCase):
|
||||
self.handler = ApplicationServicesHandler(hs)
|
||||
self.event_source = hs.get_event_sources()
|
||||
|
||||
# Mock the ApplicationServiceScheduler queuer so that we can track any
|
||||
# outgoing ephemeral events
|
||||
self.mock_service_queuer = Mock()
|
||||
self.mock_service_queuer.enqueue_ephemeral = Mock()
|
||||
hs.get_application_service_handler().scheduler.queuer = self.mock_service_queuer
|
||||
|
||||
def test_notify_interested_services(self):
|
||||
interested_service = self._mkservice(is_interested=True)
|
||||
services = [
|
||||
@@ -279,7 +285,7 @@ class AppServiceHandlerTestCase(unittest.TestCase):
|
||||
self.handler.notify_interested_services_ephemeral(
|
||||
"receipt_key", 580, ["@fakerecipient:example.com"]
|
||||
)
|
||||
self.mock_scheduler.submit_ephemeral_events_for_as.assert_called_once_with(
|
||||
self.mock_service_queuer.enqueue_ephemeral.assert_called_once_with(
|
||||
interested_service, [event]
|
||||
)
|
||||
self.mock_store.set_type_stream_id_for_appservice.assert_called_once_with(
|
||||
@@ -309,7 +315,7 @@ class AppServiceHandlerTestCase(unittest.TestCase):
|
||||
self.handler.notify_interested_services_ephemeral(
|
||||
"receipt_key", 580, ["@fakerecipient:example.com"]
|
||||
)
|
||||
self.mock_scheduler.submit_ephemeral_events_for_as.assert_not_called()
|
||||
self.mock_service_queuer.enqueue_ephemeral.assert_not_called()
|
||||
|
||||
def _mkservice(self, is_interested, protocols=None):
|
||||
service = Mock()
|
||||
@@ -337,12 +343,11 @@ class ApplicationServiceEphemeralEventsTestCase(unittest.HomeserverTestCase):
|
||||
]
|
||||
|
||||
def prepare(self, reactor, clock, hs):
|
||||
# Mock the application service scheduler so that we can track any
|
||||
# outgoing transactions
|
||||
self.mock_scheduler = Mock()
|
||||
self.mock_scheduler.submit_ephemeral_events_for_as = Mock()
|
||||
|
||||
hs.get_application_service_handler().scheduler = self.mock_scheduler
|
||||
# Mock the ApplicationServiceScheduler queuer so that we can track any
|
||||
# outgoing ephemeral events
|
||||
self.mock_service_queuer = Mock()
|
||||
self.mock_service_queuer.enqueue_ephemeral = Mock()
|
||||
hs.get_application_service_handler().scheduler.queuer = self.mock_service_queuer
|
||||
|
||||
self.device1 = "device1"
|
||||
self.user1 = self.register_user("user1", "password")
|
||||
@@ -391,10 +396,8 @@ class ApplicationServiceEphemeralEventsTestCase(unittest.HomeserverTestCase):
|
||||
# Only the user1 -> user2 to-device message should have been forwarded to the AS.
|
||||
#
|
||||
# The uninterested application service should not have been notified at all.
|
||||
self.assertEqual(
|
||||
1, self.mock_scheduler.submit_ephemeral_events_for_as.call_count
|
||||
)
|
||||
service, events = self.mock_scheduler.submit_ephemeral_events_for_as.call_args[
|
||||
self.mock_service_queuer.enqueue_ephemeral.assert_called_once()
|
||||
service, events = self.mock_service_queuer.enqueue_ephemeral.call_args[
|
||||
0
|
||||
]
|
||||
|
||||
@@ -481,14 +484,14 @@ class ApplicationServiceEphemeralEventsTestCase(unittest.HomeserverTestCase):
|
||||
)
|
||||
self.assertEqual(chan.code, 200, chan.result)
|
||||
|
||||
self.mock_service_queuer.enqueue_ephemeral.assert_called()
|
||||
|
||||
# Count the total number of to-device messages that were sent out per-service.
|
||||
# Ensure that we only sent to-device messages to interested services, and that
|
||||
# each interested service received the full count of to-device messages.
|
||||
service_id_to_message_count: Dict[str, int] = {}
|
||||
self.assertGreater(
|
||||
self.mock_scheduler.submit_ephemeral_events_for_as.call_count, 0
|
||||
)
|
||||
for call in self.mock_scheduler.submit_ephemeral_events_for_as.call_args_list:
|
||||
|
||||
for call in self.mock_service_queuer.enqueue_ephemeral.call_args_list:
|
||||
service, events = call[0]
|
||||
|
||||
# Check that this was made to an interested service
|
||||
|
||||
Reference in New Issue
Block a user