Compare commits

...

4 Commits

Author SHA1 Message Date
Andrew Morgan
d24ef5a4f4 Use a set for _handle_typing and _handle_reciepts as well
This wasn't necessary, but mypy complained that 'events' could be both
a List and a Set of JsonDict. So rather than define it with a Union type
as such, I figured it'd be best to convert the other methods to use Set
as well. We may even eliminate other duplicates in the process.
2022-01-14 15:30:17 +00:00
Andrew Morgan
8a249da4a7 Changelog 2022-01-14 14:43:23 +00:00
Andrew Morgan
1fa720b15f Remove the need to convert to a List 2022-01-14 14:43:19 +00:00
Andrew Morgan
bc9cab6a49 Deduplicate presence updates before sending them to application services
We calculate presence updates for application services based on the
users that application service is interested in. For each of these
users, we determine which presence updates they are set to receive,
compile that into a list, and then send every update from the list
to the application service.

However, because a single presence update can cause a notification
for many different users, we're likely to end up with lots of
duplicated presence updates being collected here. Currently, all of
these are sent to the application service.

By using a Set, this deduplication happens automatically.
2022-01-14 14:40:47 +00:00
5 changed files with 33 additions and 23 deletions

1
changelog.d/11140.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix a long-standing bug that caused duplicate presence updates to be sent to application services.

View File

@@ -48,7 +48,16 @@ This is all tied together by the AppServiceScheduler which DIs the required
components.
"""
import logging
from typing import TYPE_CHECKING, Awaitable, Callable, Dict, List, Optional, Set
from typing import (
TYPE_CHECKING,
Awaitable,
Callable,
Dict,
Iterable,
List,
Optional,
Set,
)
from synapse.appservice import ApplicationService, ApplicationServiceState
from synapse.appservice.api import ApplicationServiceApi
@@ -103,7 +112,7 @@ class ApplicationServiceScheduler:
self.queuer.enqueue_event(service, event)
def submit_ephemeral_events_for_as(
self, service: ApplicationService, events: List[JsonDict]
self, service: ApplicationService, events: Iterable[JsonDict]
) -> None:
self.queuer.enqueue_ephemeral(service, events)
@@ -141,7 +150,7 @@ class _ServiceQueuer:
self._start_background_request(service)
def enqueue_ephemeral(
self, service: ApplicationService, events: List[JsonDict]
self, service: ApplicationService, events: Iterable[JsonDict]
) -> None:
self.queued_ephemeral.setdefault(service.id, []).extend(events)
self._start_background_request(service)

View File

@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import TYPE_CHECKING, Collection, Dict, Iterable, List, Optional, Union
from typing import TYPE_CHECKING, Collection, Dict, Iterable, List, Optional, Set, Union
from prometheus_client import Counter
@@ -309,7 +309,7 @@ class ApplicationServicesHandler:
async def _handle_typing(
self, service: ApplicationService, new_token: int
) -> List[JsonDict]:
) -> Set[JsonDict]:
"""
Return the typing events since the given stream token that the given application
service should receive.
@@ -323,7 +323,7 @@ class ApplicationServicesHandler:
new_token: A typing event stream token.
Returns:
A list of JSON dictionaries containing data derived from the typing events that
A set of JSON dictionaries containing data derived from the typing events that
should be sent to the given application service.
"""
typing_source = self.event_sources.sources.typing
@@ -344,7 +344,7 @@ class ApplicationServicesHandler:
async def _handle_receipts(
self, service: ApplicationService, new_token: Optional[int]
) -> List[JsonDict]:
) -> Set[JsonDict]:
"""
Return the latest read receipts that the given application service should receive.
@@ -360,7 +360,7 @@ class ApplicationServicesHandler:
token. Prevents accidentally duplicating work.
Returns:
A list of JSON dictionaries containing data derived from the read receipts that
A set of JSON dictionaries containing data derived from the read receipts that
should be sent to the given application service.
"""
from_key = await self.store.get_type_stream_id_for_appservice(
@@ -370,7 +370,7 @@ class ApplicationServicesHandler:
logger.debug(
"Rejecting token lower than or equal to stored: %s" % (new_token,)
)
return []
return set()
receipts_source = self.event_sources.sources.receipt
receipts, _ = await receipts_source.get_new_events_as(
@@ -383,7 +383,7 @@ class ApplicationServicesHandler:
service: ApplicationService,
users: Collection[Union[str, UserID]],
new_token: Optional[int],
) -> List[JsonDict]:
) -> Set[JsonDict]:
"""
Return the latest presence updates that the given application service should receive.
@@ -403,7 +403,7 @@ class ApplicationServicesHandler:
A list of json dictionaries containing data derived from the presence events
that should be sent to the given application service.
"""
events: List[JsonDict] = []
events: Set[JsonDict] = set()
presence_source = self.event_sources.sources.presence
from_key = await self.store.get_type_stream_id_for_appservice(
service, "presence"
@@ -412,7 +412,7 @@ class ApplicationServicesHandler:
logger.debug(
"Rejecting token lower than or equal to stored: %s" % (new_token,)
)
return []
return set()
for user in users:
if isinstance(user, str):
@@ -427,7 +427,7 @@ class ApplicationServicesHandler:
from_key=from_key,
)
time_now = self.clock.time_msec()
events.extend(
events.update(
{
"type": "m.presence",
"sender": event.user_id,

View File

@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple
from typing import TYPE_CHECKING, Iterable, List, Optional, Set, Tuple
from synapse.api.constants import ReadReceiptEventFields, ReceiptTypes
from synapse.appservice import ApplicationService
@@ -240,7 +240,7 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
async def get_new_events_as(
self, from_key: int, service: ApplicationService
) -> Tuple[List[JsonDict], int]:
) -> Tuple[Set[JsonDict], int]:
"""Returns a set of new read receipt events that an appservice
may be interested in.
@@ -250,7 +250,7 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
Returns:
A two-tuple containing the following:
* A list of json dictionaries derived from read receipts that the
* A set of json dictionaries derived from read receipts that the
appservice may be interested in.
* The current read receipt stream token.
"""
@@ -258,7 +258,7 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
to_key = self.get_current_key()
if from_key == to_key:
return [], to_key
return set(), to_key
# Fetch all read receipts for all rooms, up to a limit of 100. This is ordered
# by most recent.
@@ -267,12 +267,12 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
)
# Then filter down to rooms that the AS can read
events = []
events = set()
for room_id, event in rooms_to_events.items():
if not await service.matches_user_in_member_list(room_id, self.store):
continue
events.append(event)
events.add(event)
return events, to_key

View File

@@ -464,7 +464,7 @@ class TypingNotificationEventSource(EventSource[int, JsonDict]):
async def get_new_events_as(
self, from_key: int, service: ApplicationService
) -> Tuple[List[JsonDict], int]:
) -> Tuple[Set[JsonDict], int]:
"""Returns a set of new typing events that an appservice
may be interested in.
@@ -474,14 +474,14 @@ class TypingNotificationEventSource(EventSource[int, JsonDict]):
Returns:
A two-tuple containing the following:
* A list of json dictionaries derived from typing events that the
* A set of json dictionaries derived from typing events that the
appservice may be interested in.
* The latest known room serial.
"""
with Measure(self.clock, "typing.get_new_events_as"):
handler = self.get_typing_handler()
events = []
events = set()
for room_id in handler._room_serials.keys():
if handler._room_serials[room_id] <= from_key:
continue
@@ -491,7 +491,7 @@ class TypingNotificationEventSource(EventSource[int, JsonDict]):
):
continue
events.append(self._make_event_for(room_id))
events.add(self._make_event_for(room_id))
return events, handler._latest_room_serial