Compare commits

...

6 Commits

Author SHA1 Message Date
Andrew Morgan
2a78554196 Remove _notify_app_services_ephemeral; convert token in on_new_event instead
This is a small effort to reduce the layers of methods we have here.

Also, I've opted to continue converting the RoomStreamToken to an int,
but only for when tracking stream tokens of EDUs. As stated in the
comment, this shouldn't have any gaps as long as we *always* convert to
minimum value.
2021-10-26 14:47:17 +01:00
Andrew Morgan
b194d47d7c wip - use int everywhere instead 2021-10-26 12:03:29 +01:00
Andrew Morgan
4821daf312 Add missing return type hints for stream ids
While searching around for all types that were eventually passed into on_new_event,
I found a couple methods which didn't specify the type they returned. So this commit
adds return type hints for those methods.

I didn't do any other arguments as I plan to type-hint those files later anyways.
2021-10-22 17:57:26 +01:00
Andrew Morgan
9abbd08e33 Merge branch 'develop' of github.com:matrix-org/synapse into anoa/e2e_as_room_stream_token 2021-10-21 17:49:38 +01:00
Andrew Morgan
b0d775ad59 Changelog 2021-10-20 16:36:07 +01:00
Andrew Morgan
4d6af01a54 Fix providing a RoomStreamToken instance to _notify_app_services_ephemeral
Previously, if a RoomStreamToken object were provided, new_token would
be set to `None`. In the same changeset, which was intended to prevent
mypy from failing, `_notify_app_services_ephemeral` was configured
to allow `new_token` to be an Optional[int]
(559974fb4bc1872abced395de22ffbd2292b2f8b).

However, if `None` is provided, `_notify_app_services_ephemeral` will
end up passing `None` to `set_type_stream_id_for_appservice`, which will
effectively clear each appservice's stream token for the given
`stream_key`. This seems like a bug to do every time we have a
RoomStreamToken rather than an int.

Instead, I'm converting the RoomStreamToken to an int using
RoomStreamToken.stream. I think this is the right way to convert to an
int. If RoomStreamToken this ends up being a vector clock, `stream` will
be the minimum stream token amongst all workers... which I think is fine?
2021-10-20 16:20:11 +01:00
5 changed files with 25 additions and 38 deletions

1
changelog.d/11137.bugfix Normal file
View File

@@ -0,0 +1 @@
Partially address occasional bursts of old read receipt and presence data being sent to application services that have opted in to receiving them.

View File

@@ -182,7 +182,7 @@ class ApplicationServicesHandler:
def notify_interested_services_ephemeral(
self,
stream_key: str,
new_token: Optional[int],
new_token: Union[int, RoomStreamToken],
users: Optional[Collection[Union[str, UserID]]] = None,
) -> None:
"""
@@ -203,7 +203,7 @@ class ApplicationServicesHandler:
Appservices will only receive ephemeral events that fall within their
registered user and room namespaces.
new_token: The latest stream token.
new_token: The stream token of the event.
users: The users that should be informed of the new event, if any.
"""
if not self.notify_appservices:
@@ -212,6 +212,13 @@ class ApplicationServicesHandler:
if stream_key not in ("typing_key", "receipt_key", "presence_key"):
return
# Convert new_token from a RoomStreamToken to an int if necessary.
# We just use the minimum stream ordering and ignore the vector clock
# component. This is safe to do as long as we *always* ignore the vector
# clock components.
if isinstance(new_token, RoomStreamToken):
new_token = new_token.stream
services = [
service
for service in self.store.get_app_services()
@@ -231,14 +238,13 @@ class ApplicationServicesHandler:
self,
services: List[ApplicationService],
stream_key: str,
new_token: Optional[int],
new_token: int,
users: Collection[Union[str, UserID]],
) -> None:
logger.debug("Checking interested services for %s" % (stream_key))
with Measure(self.clock, "notify_interested_services_ephemeral"):
for service in services:
# Only handle typing if we have the latest token
if stream_key == "typing_key" and new_token is not None:
if stream_key == "typing_key":
# Note that we don't persist the token (via set_type_stream_id_for_appservice)
# for typing_key due to performance reasons and due to their highly
# ephemeral nature.

View File

@@ -271,7 +271,7 @@ class Notifier:
self,
event: EventBase,
event_pos: PersistedEventPosition,
max_room_stream_token: RoomStreamToken,
max_room_stream_token: int,
extra_users: Optional[Collection[UserID]] = None,
):
"""Unwraps event and calls `on_new_room_event_args`."""
@@ -374,29 +374,6 @@ class Notifier:
except Exception:
logger.exception("Error notifying application services of event")
def _notify_app_services_ephemeral(
self,
stream_key: str,
new_token: Union[int, RoomStreamToken],
users: Optional[Collection[Union[str, UserID]]] = None,
) -> None:
"""Notify application services of ephemeral event activity.
Args:
stream_key: The stream the event came from.
new_token: The value of the new stream token.
users: The users that should be informed of the new event, if any.
"""
try:
stream_token = None
if isinstance(new_token, int):
stream_token = new_token
self.appservice_handler.notify_interested_services_ephemeral(
stream_key, stream_token, users or []
)
except Exception:
logger.exception("Error notifying application services of event")
def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
try:
self._pusher_pool.on_new_notifications(max_room_stream_token)
@@ -406,7 +383,7 @@ class Notifier:
def on_new_event(
self,
stream_key: str,
new_token: Union[int, RoomStreamToken],
new_token: int,
users: Optional[Collection[Union[str, UserID]]] = None,
rooms: Optional[Collection[str]] = None,
) -> None:
@@ -459,11 +436,14 @@ class Notifier:
self.notify_replication()
# Notify appservices
self._notify_app_services_ephemeral(
stream_key,
new_token,
users,
)
try:
self.appservice_handler.notify_interested_services_ephemeral(
stream_key,
new_token,
users,
)
except Exception:
logger.exception("Error notifying application services of event")
def on_new_replication_data(self) -> None:
"""Used to inform replication listeners that something has happened

View File

@@ -414,7 +414,7 @@ class DeviceWorkerStore(SQLBaseStore):
user_ids: the users who were signed
Returns:
THe new stream ID.
The new stream ID.
"""
async with self._device_list_id_gen.get_next() as stream_id:
@@ -1302,7 +1302,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
async def add_device_change_to_streams(
self, user_id: str, device_ids: Collection[str], hosts: List[str]
):
) -> int:
"""Persist that a user's devices have been updated, and which hosts
(if any) should be poked.
"""

View File

@@ -92,7 +92,7 @@ class PresenceStore(PresenceBackgroundUpdateStore):
prefilled_cache=presence_cache_prefill,
)
async def update_presence(self, presence_states):
async def update_presence(self, presence_states) -> Tuple[int, int]:
assert self._can_persist_presence
stream_ordering_manager = self._presence_id_gen.get_next_mult(