Support sending device lists everywhere; needs cleaning up
This commit is contained in:
@@ -18,7 +18,7 @@ from typing import TYPE_CHECKING, Iterable, List, Match, Optional
|
||||
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.events import EventBase
|
||||
from synapse.types import GroupID, JsonDict, UserID, get_domain_from_id
|
||||
from synapse.types import DeviceLists, GroupID, JsonDict, UserID, get_domain_from_id
|
||||
from synapse.util.caches.descriptors import _CacheContext, cached
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -305,10 +305,7 @@ class ApplicationService:
|
||||
return False
|
||||
|
||||
def is_user_in_namespace(self, user_id: str) -> bool:
|
||||
return (
|
||||
bool(self._matches_regex(user_id, ApplicationService.NS_USERS))
|
||||
or user_id == self.sender
|
||||
)
|
||||
return bool(self._matches_regex(user_id, ApplicationService.NS_USERS))
|
||||
|
||||
def is_room_alias_in_namespace(self, alias: str) -> bool:
|
||||
return bool(self._matches_regex(alias, ApplicationService.NS_ALIASES))
|
||||
@@ -377,12 +374,14 @@ class AppServiceTransaction:
|
||||
events: List[EventBase],
|
||||
ephemeral: List[JsonDict],
|
||||
to_device_messages: List[JsonDict],
|
||||
device_list_summary: DeviceLists,
|
||||
):
|
||||
self.service = service
|
||||
self.id = id
|
||||
self.events = events
|
||||
self.ephemeral = ephemeral
|
||||
self.to_device_messages = to_device_messages
|
||||
self.device_list_summary = device_list_summary
|
||||
|
||||
async def send(self, as_api: "ApplicationServiceApi") -> bool:
|
||||
"""Sends this transaction using the provided AS API interface.
|
||||
@@ -397,6 +396,7 @@ class AppServiceTransaction:
|
||||
events=self.events,
|
||||
ephemeral=self.ephemeral,
|
||||
to_device_messages=self.to_device_messages,
|
||||
device_list_summary=self.device_list_summary,
|
||||
txn_id=self.id,
|
||||
)
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
# Copyright 2015, 2016 OpenMarket Ltd
|
||||
# Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@@ -13,7 +14,7 @@
|
||||
# limitations under the License.
|
||||
import logging
|
||||
import urllib
|
||||
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
|
||||
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union
|
||||
|
||||
from prometheus_client import Counter
|
||||
|
||||
@@ -22,7 +23,7 @@ from synapse.api.errors import CodeMessageException
|
||||
from synapse.events import EventBase
|
||||
from synapse.events.utils import serialize_event
|
||||
from synapse.http.client import SimpleHttpClient
|
||||
from synapse.types import JsonDict, ThirdPartyInstanceID
|
||||
from synapse.types import DeviceLists, JsonDict, ThirdPartyInstanceID
|
||||
from synapse.util.caches.response_cache import ResponseCache
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -205,6 +206,7 @@ class ApplicationServiceApi(SimpleHttpClient):
|
||||
events: List[EventBase],
|
||||
ephemeral: List[JsonDict],
|
||||
to_device_messages: List[JsonDict],
|
||||
device_list_summary: DeviceLists,
|
||||
txn_id: Optional[int] = None,
|
||||
) -> bool:
|
||||
"""
|
||||
@@ -233,7 +235,7 @@ class ApplicationServiceApi(SimpleHttpClient):
|
||||
uri = service.url + ("/transactions/%s" % urllib.parse.quote(str(txn_id)))
|
||||
|
||||
# Never send ephemeral events to appservices that do not support it
|
||||
body: Dict[str, List[JsonDict]] = {"events": serialized_events}
|
||||
body: Dict[str, Union[JsonDict, List[JsonDict]]] = {"events": serialized_events}
|
||||
if service.supports_ephemeral:
|
||||
body.update(
|
||||
{
|
||||
@@ -243,6 +245,19 @@ class ApplicationServiceApi(SimpleHttpClient):
|
||||
}
|
||||
)
|
||||
|
||||
# Send device list summaries if needed
|
||||
if device_list_summary:
|
||||
logger.info("Sending device list summary: %s", device_list_summary)
|
||||
body.update(
|
||||
{
|
||||
# TODO: Update to stable prefix once MSC3202 completes FCP merge
|
||||
"org.matrix.msc3202.device_lists": {
|
||||
"changed": list(device_list_summary.changed),
|
||||
"left": list(device_list_summary.left),
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
try:
|
||||
await self.put_json(
|
||||
uri=uri,
|
||||
|
||||
@@ -54,7 +54,7 @@ from synapse.appservice import ApplicationService, ApplicationServiceState
|
||||
from synapse.events import EventBase
|
||||
from synapse.logging.context import run_in_background
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.types import JsonDict
|
||||
from synapse.types import DeviceLists, JsonDict
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -100,9 +100,11 @@ class ApplicationServiceScheduler:
|
||||
events: Optional[Iterable[EventBase]] = None,
|
||||
ephemeral: Optional[Iterable[JsonDict]] = None,
|
||||
to_device_messages: Optional[Iterable[JsonDict]] = None,
|
||||
device_list_summary: Optional[DeviceLists] = None,
|
||||
) -> None:
|
||||
"""
|
||||
Enqueue some data to be sent off to an application service.
|
||||
|
||||
Args:
|
||||
appservice: The application service to create and send a transaction to.
|
||||
events: The persistent room events to send.
|
||||
@@ -110,10 +112,18 @@ class ApplicationServiceScheduler:
|
||||
to_device_messages: The to-device messages to send. These differ from normal
|
||||
to-device messages sent to clients, as they have 'to_device_id' and
|
||||
'to_user_id' fields.
|
||||
device_list_summary: A summary of users that the application service either needs
|
||||
to refresh the device lists of, or those that the application service need no
|
||||
longer track the device lists of.
|
||||
"""
|
||||
# We purposefully allow this method to run with empty events/ephemeral
|
||||
# iterables, so that callers do not need to check iterable size themselves.
|
||||
if not events and not ephemeral and not to_device_messages:
|
||||
if (
|
||||
not events
|
||||
and not ephemeral
|
||||
and not to_device_messages
|
||||
and not device_list_summary
|
||||
):
|
||||
return
|
||||
|
||||
if events:
|
||||
@@ -124,6 +134,10 @@ class ApplicationServiceScheduler:
|
||||
self.queuer.queued_to_device_messages.setdefault(appservice.id, []).extend(
|
||||
to_device_messages
|
||||
)
|
||||
if device_list_summary:
|
||||
self.queuer.queued_device_list_summaries.setdefault(
|
||||
appservice.id, []
|
||||
).append(device_list_summary)
|
||||
|
||||
# Kick off a new application service transaction
|
||||
self.queuer.start_background_request(appservice)
|
||||
@@ -144,6 +158,8 @@ class _ServiceQueuer:
|
||||
self.queued_ephemeral: Dict[str, List[JsonDict]] = {}
|
||||
# dict of {service_id: [to_device_message_json]}
|
||||
self.queued_to_device_messages: Dict[str, List[JsonDict]] = {}
|
||||
# dict of {service_id: [device_list_summary]}
|
||||
self.queued_device_list_summaries: Dict[str, List[DeviceLists]] = {}
|
||||
|
||||
# the appservices which currently have a transaction in flight
|
||||
self.requests_in_flight = set()
|
||||
@@ -183,12 +199,49 @@ class _ServiceQueuer:
|
||||
]
|
||||
del all_to_device_messages[:MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION]
|
||||
|
||||
if not events and not ephemeral and not to_device_messages_to_send:
|
||||
# Consolidate any pending device list summaries into a single, up-to-date
|
||||
# summary.
|
||||
# Note: this code assumes that in a single DeviceLists, a user will
|
||||
# never be in both "changed" and "left" sets.
|
||||
device_list_summary = DeviceLists()
|
||||
while self.queued_device_list_summaries.get(service.id, []):
|
||||
# Pop a summary off the front of the queue
|
||||
summary = self.queued_device_list_summaries[service.id].pop(0)
|
||||
|
||||
# For every user in the incoming "changed" set:
|
||||
# * Remove them from the existing "left" set if necessary
|
||||
# (as we need to start tracking them again)
|
||||
# * Add them to the existing "changed" set if necessary.
|
||||
for user_id in summary.changed:
|
||||
if user_id in device_list_summary.left:
|
||||
device_list_summary.left.remove(user_id)
|
||||
device_list_summary.changed.add(user_id)
|
||||
|
||||
# For every user in the incoming "left" set:
|
||||
# * Remove them from the existing "changed" set if necessary
|
||||
# (we no longer need to track them)
|
||||
# * Add them to the existing "left" set if necessary.
|
||||
for user_id in summary.left:
|
||||
if user_id in device_list_summary.changed:
|
||||
device_list_summary.changed.remove(user_id)
|
||||
device_list_summary.left.add(user_id)
|
||||
|
||||
if (
|
||||
not events
|
||||
and not ephemeral
|
||||
and not to_device_messages_to_send
|
||||
# Note that DeviceLists implements __bool__
|
||||
and not device_list_summary
|
||||
):
|
||||
return
|
||||
|
||||
try:
|
||||
await self.txn_ctrl.send(
|
||||
service, events, ephemeral, to_device_messages_to_send
|
||||
service,
|
||||
events,
|
||||
ephemeral,
|
||||
to_device_messages_to_send,
|
||||
device_list_summary,
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("AS request failed")
|
||||
@@ -227,6 +280,7 @@ class _TransactionController:
|
||||
events: List[EventBase],
|
||||
ephemeral: Optional[List[JsonDict]] = None,
|
||||
to_device_messages: Optional[List[JsonDict]] = None,
|
||||
device_list_summary: Optional[DeviceLists] = None,
|
||||
) -> None:
|
||||
"""
|
||||
Create a transaction with the given data and send to the provided
|
||||
@@ -237,6 +291,7 @@ class _TransactionController:
|
||||
events: The persistent events to include in the transaction.
|
||||
ephemeral: The ephemeral events to include in the transaction.
|
||||
to_device_messages: The to-device messages to include in the transaction.
|
||||
device_list_summary: The device list summary to include in the transaction.
|
||||
"""
|
||||
try:
|
||||
txn = await self.store.create_appservice_txn(
|
||||
@@ -244,6 +299,7 @@ class _TransactionController:
|
||||
events=events,
|
||||
ephemeral=ephemeral or [],
|
||||
to_device_messages=to_device_messages or [],
|
||||
device_list_summary=device_list_summary or DeviceLists(),
|
||||
)
|
||||
service_is_up = await self._is_service_up(service)
|
||||
if service_is_up:
|
||||
|
||||
@@ -33,9 +33,8 @@ from synapse.metrics.background_process_metrics import (
|
||||
wrap_as_background_process,
|
||||
)
|
||||
from synapse.storage.databases.main.directory import RoomAliasMapping
|
||||
from synapse.types import JsonDict, RoomAlias, RoomStreamToken, UserID
|
||||
from synapse.types import DeviceLists, JsonDict, RoomAlias, RoomStreamToken, UserID
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.caches.descriptors import _CacheContext, cached
|
||||
from synapse.util.metrics import Measure
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -344,22 +343,16 @@ class ApplicationServicesHandler:
|
||||
)
|
||||
|
||||
elif stream_key == "device_list_key":
|
||||
users_whose_device_lists_changed = await self._get_device_list_changes(
|
||||
device_list_summary = await self._get_device_list_summary(
|
||||
service, new_token
|
||||
)
|
||||
if users_whose_device_lists_changed:
|
||||
# TODO: Have a way of including things in an outgoing appservice
|
||||
# transaction that's not "events" or "ephemeral"
|
||||
payload = [{
|
||||
"changed": users_whose_device_lists_changed,
|
||||
"left": [],
|
||||
}]
|
||||
self.scheduler.submit_ephemeral_events_for_as(
|
||||
service, payload
|
||||
if device_list_summary:
|
||||
self.scheduler.enqueue_for_appservice(
|
||||
service, device_list_summary=device_list_summary
|
||||
)
|
||||
|
||||
# Persist the latest handled stream token for this appservice
|
||||
await self.store.set_type_stream_id_for_appservice(
|
||||
await self.store.set_appservice_stream_type_pos(
|
||||
service, "device_list", new_token
|
||||
)
|
||||
|
||||
@@ -568,11 +561,11 @@ class ApplicationServicesHandler:
|
||||
|
||||
return message_payload
|
||||
|
||||
async def _get_device_list_changes(
|
||||
async def _get_device_list_summary(
|
||||
self,
|
||||
appservice: ApplicationService,
|
||||
new_key: int,
|
||||
) -> List[str]:
|
||||
) -> DeviceLists:
|
||||
"""
|
||||
Retrieve a list of users who have changed their device lists.
|
||||
|
||||
@@ -581,8 +574,9 @@ class ApplicationServicesHandler:
|
||||
new_key: The stream key of the device list change that triggered this method call.
|
||||
|
||||
Returns:
|
||||
A list of users whose device lists have changed and need to be resynced by the
|
||||
appservice.
|
||||
A set of device list updates, comprised of users that the appservices needs to:
|
||||
* resync the device list of, and
|
||||
* stop tracking the device list of.
|
||||
"""
|
||||
# Fetch the last successfully processed device list update stream ID
|
||||
# for this appservice.
|
||||
@@ -591,21 +585,31 @@ class ApplicationServicesHandler:
|
||||
)
|
||||
|
||||
# Fetch the users who have modified their device list since then.
|
||||
users_with_changed_device_lists = await self.store.get_users_whose_devices_changed(
|
||||
from_key, filter_user_ids=None, to_key=new_key
|
||||
users_with_changed_device_lists = (
|
||||
await self.store.get_users_whose_devices_changed(
|
||||
from_key, filter_user_ids=None, to_key=new_key
|
||||
)
|
||||
)
|
||||
|
||||
# Filter out any users the application service is not interested in
|
||||
#
|
||||
# For each user who changed their device list, we want to check whether this
|
||||
# appservice would be interested in the change
|
||||
filtered_users_with_changed_device_lists = [
|
||||
# appservice would be interested in the change.
|
||||
filtered_users_with_changed_device_lists = {
|
||||
user_id
|
||||
for user_id in users_with_changed_device_lists
|
||||
if self._is_appservice_interested_in_device_lists_of_user(appservice, user_id)
|
||||
]
|
||||
if self._is_appservice_interested_in_device_lists_of_user(
|
||||
appservice, user_id
|
||||
)
|
||||
}
|
||||
|
||||
return filtered_users_with_changed_device_lists
|
||||
# Create a summary of "changed" and "left" users.
|
||||
# TODO: Calculate "left" users.
|
||||
device_list_summary = DeviceLists(
|
||||
changed=filtered_users_with_changed_device_lists
|
||||
)
|
||||
|
||||
return device_list_summary
|
||||
|
||||
async def _is_appservice_interested_in_device_lists_of_user(
|
||||
self,
|
||||
@@ -641,9 +645,7 @@ class ApplicationServicesHandler:
|
||||
for room_id in room_ids:
|
||||
# This method covers checking room members for appservice interest as well as
|
||||
# room ID and alias checks.
|
||||
if await appservice.is_interested_in_room(
|
||||
room_id, self.store
|
||||
):
|
||||
if await appservice.is_interested_in_room(room_id, self.store):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
@@ -27,7 +27,7 @@ from synapse.storage._base import SQLBaseStore, db_to_json
|
||||
from synapse.storage.database import DatabasePool
|
||||
from synapse.storage.databases.main.events_worker import EventsWorkerStore
|
||||
from synapse.storage.types import Connection
|
||||
from synapse.types import JsonDict
|
||||
from synapse.types import DeviceLists, JsonDict
|
||||
from synapse.util import json_encoder
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -195,6 +195,7 @@ class ApplicationServiceTransactionWorkerStore(
|
||||
events: List[EventBase],
|
||||
ephemeral: List[JsonDict],
|
||||
to_device_messages: List[JsonDict],
|
||||
device_list_summary: DeviceLists,
|
||||
) -> AppServiceTransaction:
|
||||
"""Atomically creates a new transaction for this application service
|
||||
with the given list of events. Ephemeral events are NOT persisted to the
|
||||
@@ -205,6 +206,7 @@ class ApplicationServiceTransactionWorkerStore(
|
||||
events: A list of persistent events to put in the transaction.
|
||||
ephemeral: A list of ephemeral events to put in the transaction.
|
||||
to_device_messages: A list of to-device messages to put in the transaction.
|
||||
device_list_summary: The device list summary to include in the transaction.
|
||||
|
||||
Returns:
|
||||
A new transaction.
|
||||
@@ -240,6 +242,7 @@ class ApplicationServiceTransactionWorkerStore(
|
||||
events=events,
|
||||
ephemeral=ephemeral,
|
||||
to_device_messages=to_device_messages,
|
||||
device_list_summary=device_list_summary,
|
||||
)
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
@@ -337,6 +340,7 @@ class ApplicationServiceTransactionWorkerStore(
|
||||
events=events,
|
||||
ephemeral=[],
|
||||
to_device_messages=[],
|
||||
device_list_summary=DeviceLists(),
|
||||
)
|
||||
|
||||
def _get_last_txn(self, txn, service_id: Optional[str]) -> int:
|
||||
|
||||
@@ -555,7 +555,10 @@ class DeviceWorkerStore(SQLBaseStore):
|
||||
}
|
||||
|
||||
async def get_users_whose_devices_changed(
|
||||
self, from_key: int, filter_user_ids: Optional[Iterable[str]] = None, to_key: Optional[int] = None
|
||||
self,
|
||||
from_key: int,
|
||||
filter_user_ids: Optional[Iterable[str]] = None,
|
||||
to_key: Optional[int] = None,
|
||||
) -> Set[str]:
|
||||
"""Get set of users whose devices have changed since `from_key` that
|
||||
are in the given list of user_ids.
|
||||
|
||||
Reference in New Issue
Block a user