Compare commits
4 Commits
v1.140.0rc
...
anoa/prese
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d24ef5a4f4 | ||
|
|
8a249da4a7 | ||
|
|
1fa720b15f | ||
|
|
bc9cab6a49 |
1
changelog.d/11140.bugfix
Normal file
1
changelog.d/11140.bugfix
Normal file
@@ -0,0 +1 @@
|
||||
Fix a long-standing bug that caused duplicate presence updates to be sent to application services.
|
||||
@@ -48,7 +48,16 @@ This is all tied together by the AppServiceScheduler which DIs the required
|
||||
components.
|
||||
"""
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Awaitable, Callable, Dict, List, Optional, Set
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Awaitable,
|
||||
Callable,
|
||||
Dict,
|
||||
Iterable,
|
||||
List,
|
||||
Optional,
|
||||
Set,
|
||||
)
|
||||
|
||||
from synapse.appservice import ApplicationService, ApplicationServiceState
|
||||
from synapse.appservice.api import ApplicationServiceApi
|
||||
@@ -103,7 +112,7 @@ class ApplicationServiceScheduler:
|
||||
self.queuer.enqueue_event(service, event)
|
||||
|
||||
def submit_ephemeral_events_for_as(
|
||||
self, service: ApplicationService, events: List[JsonDict]
|
||||
self, service: ApplicationService, events: Iterable[JsonDict]
|
||||
) -> None:
|
||||
self.queuer.enqueue_ephemeral(service, events)
|
||||
|
||||
@@ -141,7 +150,7 @@ class _ServiceQueuer:
|
||||
self._start_background_request(service)
|
||||
|
||||
def enqueue_ephemeral(
|
||||
self, service: ApplicationService, events: List[JsonDict]
|
||||
self, service: ApplicationService, events: Iterable[JsonDict]
|
||||
) -> None:
|
||||
self.queued_ephemeral.setdefault(service.id, []).extend(events)
|
||||
self._start_background_request(service)
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Collection, Dict, Iterable, List, Optional, Union
|
||||
from typing import TYPE_CHECKING, Collection, Dict, Iterable, List, Optional, Set, Union
|
||||
|
||||
from prometheus_client import Counter
|
||||
|
||||
@@ -309,7 +309,7 @@ class ApplicationServicesHandler:
|
||||
|
||||
async def _handle_typing(
|
||||
self, service: ApplicationService, new_token: int
|
||||
) -> List[JsonDict]:
|
||||
) -> Set[JsonDict]:
|
||||
"""
|
||||
Return the typing events since the given stream token that the given application
|
||||
service should receive.
|
||||
@@ -323,7 +323,7 @@ class ApplicationServicesHandler:
|
||||
new_token: A typing event stream token.
|
||||
|
||||
Returns:
|
||||
A list of JSON dictionaries containing data derived from the typing events that
|
||||
A set of JSON dictionaries containing data derived from the typing events that
|
||||
should be sent to the given application service.
|
||||
"""
|
||||
typing_source = self.event_sources.sources.typing
|
||||
@@ -344,7 +344,7 @@ class ApplicationServicesHandler:
|
||||
|
||||
async def _handle_receipts(
|
||||
self, service: ApplicationService, new_token: Optional[int]
|
||||
) -> List[JsonDict]:
|
||||
) -> Set[JsonDict]:
|
||||
"""
|
||||
Return the latest read receipts that the given application service should receive.
|
||||
|
||||
@@ -360,7 +360,7 @@ class ApplicationServicesHandler:
|
||||
token. Prevents accidentally duplicating work.
|
||||
|
||||
Returns:
|
||||
A list of JSON dictionaries containing data derived from the read receipts that
|
||||
A set of JSON dictionaries containing data derived from the read receipts that
|
||||
should be sent to the given application service.
|
||||
"""
|
||||
from_key = await self.store.get_type_stream_id_for_appservice(
|
||||
@@ -370,7 +370,7 @@ class ApplicationServicesHandler:
|
||||
logger.debug(
|
||||
"Rejecting token lower than or equal to stored: %s" % (new_token,)
|
||||
)
|
||||
return []
|
||||
return set()
|
||||
|
||||
receipts_source = self.event_sources.sources.receipt
|
||||
receipts, _ = await receipts_source.get_new_events_as(
|
||||
@@ -383,7 +383,7 @@ class ApplicationServicesHandler:
|
||||
service: ApplicationService,
|
||||
users: Collection[Union[str, UserID]],
|
||||
new_token: Optional[int],
|
||||
) -> List[JsonDict]:
|
||||
) -> Set[JsonDict]:
|
||||
"""
|
||||
Return the latest presence updates that the given application service should receive.
|
||||
|
||||
@@ -403,7 +403,7 @@ class ApplicationServicesHandler:
|
||||
A list of json dictionaries containing data derived from the presence events
|
||||
that should be sent to the given application service.
|
||||
"""
|
||||
events: List[JsonDict] = []
|
||||
events: Set[JsonDict] = set()
|
||||
presence_source = self.event_sources.sources.presence
|
||||
from_key = await self.store.get_type_stream_id_for_appservice(
|
||||
service, "presence"
|
||||
@@ -412,7 +412,7 @@ class ApplicationServicesHandler:
|
||||
logger.debug(
|
||||
"Rejecting token lower than or equal to stored: %s" % (new_token,)
|
||||
)
|
||||
return []
|
||||
return set()
|
||||
|
||||
for user in users:
|
||||
if isinstance(user, str):
|
||||
@@ -427,7 +427,7 @@ class ApplicationServicesHandler:
|
||||
from_key=from_key,
|
||||
)
|
||||
time_now = self.clock.time_msec()
|
||||
events.extend(
|
||||
events.update(
|
||||
{
|
||||
"type": "m.presence",
|
||||
"sender": event.user_id,
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple
|
||||
from typing import TYPE_CHECKING, Iterable, List, Optional, Set, Tuple
|
||||
|
||||
from synapse.api.constants import ReadReceiptEventFields, ReceiptTypes
|
||||
from synapse.appservice import ApplicationService
|
||||
@@ -240,7 +240,7 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
|
||||
|
||||
async def get_new_events_as(
|
||||
self, from_key: int, service: ApplicationService
|
||||
) -> Tuple[List[JsonDict], int]:
|
||||
) -> Tuple[Set[JsonDict], int]:
|
||||
"""Returns a set of new read receipt events that an appservice
|
||||
may be interested in.
|
||||
|
||||
@@ -250,7 +250,7 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
|
||||
|
||||
Returns:
|
||||
A two-tuple containing the following:
|
||||
* A list of json dictionaries derived from read receipts that the
|
||||
* A set of json dictionaries derived from read receipts that the
|
||||
appservice may be interested in.
|
||||
* The current read receipt stream token.
|
||||
"""
|
||||
@@ -258,7 +258,7 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
|
||||
to_key = self.get_current_key()
|
||||
|
||||
if from_key == to_key:
|
||||
return [], to_key
|
||||
return set(), to_key
|
||||
|
||||
# Fetch all read receipts for all rooms, up to a limit of 100. This is ordered
|
||||
# by most recent.
|
||||
@@ -267,12 +267,12 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
|
||||
)
|
||||
|
||||
# Then filter down to rooms that the AS can read
|
||||
events = []
|
||||
events = set()
|
||||
for room_id, event in rooms_to_events.items():
|
||||
if not await service.matches_user_in_member_list(room_id, self.store):
|
||||
continue
|
||||
|
||||
events.append(event)
|
||||
events.add(event)
|
||||
|
||||
return events, to_key
|
||||
|
||||
|
||||
@@ -464,7 +464,7 @@ class TypingNotificationEventSource(EventSource[int, JsonDict]):
|
||||
|
||||
async def get_new_events_as(
|
||||
self, from_key: int, service: ApplicationService
|
||||
) -> Tuple[List[JsonDict], int]:
|
||||
) -> Tuple[Set[JsonDict], int]:
|
||||
"""Returns a set of new typing events that an appservice
|
||||
may be interested in.
|
||||
|
||||
@@ -474,14 +474,14 @@ class TypingNotificationEventSource(EventSource[int, JsonDict]):
|
||||
|
||||
Returns:
|
||||
A two-tuple containing the following:
|
||||
* A list of json dictionaries derived from typing events that the
|
||||
* A set of json dictionaries derived from typing events that the
|
||||
appservice may be interested in.
|
||||
* The latest known room serial.
|
||||
"""
|
||||
with Measure(self.clock, "typing.get_new_events_as"):
|
||||
handler = self.get_typing_handler()
|
||||
|
||||
events = []
|
||||
events = set()
|
||||
for room_id in handler._room_serials.keys():
|
||||
if handler._room_serials[room_id] <= from_key:
|
||||
continue
|
||||
@@ -491,7 +491,7 @@ class TypingNotificationEventSource(EventSource[int, JsonDict]):
|
||||
):
|
||||
continue
|
||||
|
||||
events.append(self._make_event_for(room_id))
|
||||
events.add(self._make_event_for(room_id))
|
||||
|
||||
return events, handler._latest_room_serial
|
||||
|
||||
|
||||
Reference in New Issue
Block a user