1
0

Compare commits

...

7 Commits

Author SHA1 Message Date
Andrew Morgan
95a49727a6 Note that AS interest via room ID or alias isn't respected
Typing and read receipt events (those that are associated with a room)
should be sent to an AS that has registered interest in that room,
according to MSC2409:

231084da13/proposals/2409-appservice-edus.md (expectations-of-when-an-edu-should-be-pushed-to-an-appservice)
2021-10-20 18:23:56 +01:00
Andrew Morgan
1df714c21c newsfile 2021-10-20 18:04:44 +01:00
Andrew Morgan
e423a94b28 Improve docstring for format_user_presence_state 2021-10-20 17:58:53 +01:00
Andrew Morgan
0ce33e58fc Add comments to _handle* methods and those methods they call 2021-10-20 17:58:51 +01:00
Andrew Morgan
2fd99fefed Add comments to notify_interested_services_ephemeral 2021-10-20 17:42:43 +01:00
Andrew Morgan
1e5a0f2fea Add docstring to _notify_app_services_ephemeral 2021-10-20 17:11:45 +01:00
Andrew Morgan
795d0584f6 Add docstring to on_new_event 2021-10-20 17:11:45 +01:00
7 changed files with 147 additions and 18 deletions

1
changelog.d/11138.misc Normal file
View File

@@ -0,0 +1 @@
Add docstrings and comments to the application service ephemeral event sending code.

View File

@@ -185,19 +185,25 @@ class ApplicationServicesHandler:
new_token: Optional[int],
users: Optional[Collection[Union[str, UserID]]] = None,
) -> None:
"""This is called by the notifier in the background
when a ephemeral event handled by the homeserver.
"""
This is called by the notifier in the background when
an ephemeral event is handled by the homeserver.
This will determine which appservices
are interested in the event, and submit them.
Events will only be pushed to appservices
that have opted into ephemeral events
This will determine which appservices are
interested in the event, and submit them.
Args:
stream_key: The stream the event came from.
new_token: The latest stream token
users: The user(s) involved with the event.
When `stream_key` is "typing_key", "receipt_key" or "presence_key", events
will only be pushed to appservices that have opted into ephemeral events.
Appservices will only receive ephemeral events that fall within their
registered user and room namespaces.
Any other value for `stream_key` will cause this function to return early.
new_token: The latest stream token.
users: The users that should be informed of the new event, if any.
"""
if not self.notify_appservices:
return
@@ -232,21 +238,34 @@ class ApplicationServicesHandler:
for service in services:
# Only handle typing if we have the latest token
if stream_key == "typing_key" and new_token is not None:
# Note that we don't persist the token (via set_type_stream_id_for_appservice)
# for typing_key due to performance reasons and due to their highly
# ephemeral nature.
#
# Instead we simply grab the latest typing update in _handle_typing
# and, if it applies to this application service, send it off.
events = await self._handle_typing(service, new_token)
if events:
self.scheduler.submit_ephemeral_events_for_as(service, events)
# We don't persist the token for typing_key for performance reasons
elif stream_key == "receipt_key":
events = await self._handle_receipts(service)
if events:
self.scheduler.submit_ephemeral_events_for_as(service, events)
# Persist the latest handled stream token for this appservice
# TODO: We seem to update the stream token for each appservice,
# even if sending the ephemeral events to the appservice failed.
await self.store.set_type_stream_id_for_appservice(
service, "read_receipt", new_token
)
elif stream_key == "presence_key":
events = await self._handle_presence(service, users)
if events:
self.scheduler.submit_ephemeral_events_for_as(service, events)
# Persist the latest handled stream token for this appservice
await self.store.set_type_stream_id_for_appservice(
service, "presence", new_token
)
@@ -254,18 +273,49 @@ class ApplicationServicesHandler:
async def _handle_typing(
self, service: ApplicationService, new_token: int
) -> List[JsonDict]:
"""
Given an application service, determine which events it should receive
from the given typing event stream token and now.
Args:
service: The application service to check for which events it should receive.
new_token: The latest typing event stream token.
Returns:
A list of JSON dictionaries containing data derived from the typing events that
should be sent to the given application service.
"""
typing_source = self.event_sources.sources.typing
# Get the typing events from just before current
typing, _ = await typing_source.get_new_events_as(
service=service,
# For performance reasons, we don't persist the previous
# token in the DB and instead fetch the latest typing information
# token in the DB and instead fetch the latest typing event
# for appservices.
# TODO: It'd probably be more efficient to simply fetch the
# typing event with the given 'new_token' stream token and
# checking if the given service was interested, rather than
# iterating over all typing events and only grabbing the
# latest one.
from_key=new_token - 1,
)
return typing
async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]:
"""
Given an application service, determine which events it should receive
from those between the last-recorded typing event stream token for this
appservice and the latest one.
Args:
service: The application service to check for which events it should receive.
new_token: A typing event stream token. Typing events between this token and
the current event stream token will be checked.
Returns:
A list of JSON dictionaries containing data derived from the typing events that
should be sent to the given application service.
"""
from_key = await self.store.get_type_stream_id_for_appservice(
service, "read_receipt"
)
@@ -278,6 +328,19 @@ class ApplicationServicesHandler:
async def _handle_presence(
self, service: ApplicationService, users: Collection[Union[str, UserID]]
) -> List[JsonDict]:
"""
Given an application service and a list of users who should be receiving
presence updates, return a list of presence updates destined for the
application service.
Args:
service: The application service that ephemeral events are being sent to.
users: The users that should receive the presence update.
Returns:
A list of json dictionaries containing data derived from the presence events
that should be sent to the given application service.
"""
events: List[JsonDict] = []
presence_source = self.event_sources.sources.presence
from_key = await self.store.get_type_stream_id_for_appservice(

View File

@@ -454,6 +454,10 @@ class DeviceHandler(DeviceWorkerHandler):
) -> None:
"""Notify that a user's device(s) has changed. Pokes the notifier, and
remote servers if the user is local.
Args:
user_id: The Matrix ID of the user who's device list has been updated.
device_ids: The device IDs that have changed.
"""
if not device_ids:
# No changes to notify about, so this is a no-op.

View File

@@ -1483,11 +1483,38 @@ def should_notify(old_state: UserPresenceState, new_state: UserPresenceState) ->
def format_user_presence_state(
state: UserPresenceState, now: int, include_user_id: bool = True
) -> JsonDict:
"""Convert UserPresenceState to a format that can be sent down to clients
"""Convert UserPresenceState to a JSON format that can be sent down to clients
and to other servers.
The "user_id" is optional so that this function can be used to format presence
updates for client /sync responses and for federation /send requests.
Args:
state: The user presence state to format.
now: The current timestamp since the epoch in ms.
include_user_id: Whether to include `user_id` in the returned dictionary.
As this function can be used both to format presence updates for client /sync
responses and for federation /send requests, only the latter needs the include
the `user_id` field.
Returns:
A JSON dictionary with the following keys:
* presence: The presence state as a str.
* user_id: Optional. Included if `include_user_id` is truthy. The canonical
Matrix ID of the user.
* last_active_ago: Optional. Included if `last_active_ts` is set on `state`.
The timestamp that the user was last active.
* status_msg: Optional. Included if `status_msg` is set on `state`. The user's
status.
* currently_active: Optional. Included only if `state.state` is "online". Set to
the value of `state.currently_active`.
Example:
{
"presence": "online",
"user_id": "@alice:example.com",
"last_active_ago": 16783813918,
"status_msg": "Hello world!",
"currently_active": True
}
"""
content: JsonDict = {"presence": state.state}
if include_user_id:

View File

@@ -241,12 +241,18 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
async def get_new_events_as(
self, from_key: int, service: ApplicationService
) -> Tuple[List[JsonDict], int]:
"""Returns a set of new receipt events that an appservice
"""Returns a set of new read receipt events that an appservice
may be interested in.
Args:
from_key: the stream position at which events should be fetched from
service: The appservice which may be interested
Returns:
A two-tuple containing the following:
* A list of json dictionaries derived from read receipts that the
appservice may be interested in.
* The current read receipt stream token.
"""
from_key = int(from_key)
to_key = self.get_current_key()
@@ -261,6 +267,10 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
)
# Then filter down to rooms that the AS can read
# TODO: This doesn't seem to honour an appservice's registration of room or
# namespace aliases. For instance, if an appservice registered a room namespace
# that matched this room, but it didn't have any members in the room, then that
# appservice wouldn't receive the read receipt.
events = []
for room_id, event in rooms_to_events.items():
if not await service.matches_user_in_member_list(room_id, self.store):

View File

@@ -467,15 +467,25 @@ class TypingNotificationEventSource(EventSource[int, JsonDict]):
Args:
from_key: the stream position at which events should be fetched from
service: The appservice which may be interested
Returns:
A two-tuple containing the following:
* A list of json dictionaries derived from typing events that the
appservice may be interested in.
* The latest known room serial.
"""
with Measure(self.clock, "typing.get_new_events_as"):
from_key = int(from_key)
handler = self.get_typing_handler()
events = []
for room_id in handler._room_serials.keys():
if handler._room_serials[room_id] <= from_key:
continue
# TODO: This doesn't seem to honour an appservice's registration of room or
# namespace aliases. For instance, if an appservice registered a room namespace
# that matched this room, but it didn't have any members in the room, then that
# appservice wouldn't receive the typing event.
if not await service.matches_user_in_member_list(
room_id, handler.store
):

View File

@@ -379,7 +379,14 @@ class Notifier:
stream_key: str,
new_token: Union[int, RoomStreamToken],
users: Optional[Collection[Union[str, UserID]]] = None,
):
) -> None:
"""Notify application services of ephemeral event activity.
Args:
stream_key: The stream the event came from.
new_token: The value of the new stream token.
users: The users that should be informed of the new event, if any.
"""
try:
stream_token = None
if isinstance(new_token, int):
@@ -402,10 +409,17 @@ class Notifier:
new_token: Union[int, RoomStreamToken],
users: Optional[Collection[Union[str, UserID]]] = None,
rooms: Optional[Collection[str]] = None,
):
) -> None:
"""Used to inform listeners that something has happened event wise.
Will wake up all listeners for the given users and rooms.
Args:
stream_key: The stream the event came from.
new_token: The value of the new stream token.
users: The users that should be informed of the new event.
rooms: A collection of room IDs for which each joined member will be
informed of the new event.
"""
users = users or []
rooms = rooms or []