Compare commits
4 Commits
erikj/dock
...
kegan/to-d
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8e95932e01 | ||
|
|
fab5948f5b | ||
|
|
4850970cac | ||
|
|
38b304eaa7 |
1
changelog.d/16875.bugfix
Normal file
1
changelog.d/16875.bugfix
Normal file
@@ -0,0 +1 @@
|
|||||||
|
Fixed an issue where device list updates were not sent eagerly to remote servers on room join.
|
||||||
@@ -22,8 +22,9 @@
|
|||||||
import logging
|
import logging
|
||||||
from typing import TYPE_CHECKING, Dict, Iterable, List, Mapping, Optional, Set, Tuple
|
from typing import TYPE_CHECKING, Dict, Iterable, List, Mapping, Optional, Set, Tuple
|
||||||
|
|
||||||
|
import synapse.metrics
|
||||||
from synapse.api import errors
|
from synapse.api import errors
|
||||||
from synapse.api.constants import EduTypes, EventTypes
|
from synapse.api.constants import EduTypes, EventTypes, Membership
|
||||||
from synapse.api.errors import (
|
from synapse.api.errors import (
|
||||||
Codes,
|
Codes,
|
||||||
FederationDeniedError,
|
FederationDeniedError,
|
||||||
@@ -38,6 +39,7 @@ from synapse.metrics.background_process_metrics import (
|
|||||||
wrap_as_background_process,
|
wrap_as_background_process,
|
||||||
)
|
)
|
||||||
from synapse.storage.databases.main.client_ips import DeviceLastConnectionInfo
|
from synapse.storage.databases.main.client_ips import DeviceLastConnectionInfo
|
||||||
|
from synapse.storage.databases.main.state_deltas import StateDelta
|
||||||
from synapse.types import (
|
from synapse.types import (
|
||||||
JsonDict,
|
JsonDict,
|
||||||
JsonMapping,
|
JsonMapping,
|
||||||
@@ -54,7 +56,7 @@ from synapse.util import stringutils
|
|||||||
from synapse.util.async_helpers import Linearizer
|
from synapse.util.async_helpers import Linearizer
|
||||||
from synapse.util.caches.expiringcache import ExpiringCache
|
from synapse.util.caches.expiringcache import ExpiringCache
|
||||||
from synapse.util.cancellation import cancellable
|
from synapse.util.cancellation import cancellable
|
||||||
from synapse.util.metrics import measure_func
|
from synapse.util.metrics import Measure, measure_func
|
||||||
from synapse.util.retryutils import (
|
from synapse.util.retryutils import (
|
||||||
NotRetryingDestination,
|
NotRetryingDestination,
|
||||||
filter_destinations_by_retry_limiter,
|
filter_destinations_by_retry_limiter,
|
||||||
@@ -428,6 +430,7 @@ class DeviceHandler(DeviceWorkerHandler):
|
|||||||
self._account_data_handler = hs.get_account_data_handler()
|
self._account_data_handler = hs.get_account_data_handler()
|
||||||
self._storage_controllers = hs.get_storage_controllers()
|
self._storage_controllers = hs.get_storage_controllers()
|
||||||
self.db_pool = hs.get_datastores().main.db_pool
|
self.db_pool = hs.get_datastores().main.db_pool
|
||||||
|
self._is_processing = False
|
||||||
|
|
||||||
self.device_list_updater = DeviceListUpdater(hs, self)
|
self.device_list_updater = DeviceListUpdater(hs, self)
|
||||||
|
|
||||||
@@ -461,6 +464,153 @@ class DeviceHandler(DeviceWorkerHandler):
|
|||||||
self._delete_stale_devices,
|
self._delete_stale_devices,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Listen for state delta updates. We do this so we can send device list updates on room join
|
||||||
|
# to remote servers. We do not remember where we got up to before, as we only need to send
|
||||||
|
# these updates on a best-effort basis, as they quickly heal due to /keys/query requests.
|
||||||
|
# We want to send device list updates eagerly to improve our robustness on unreliable
|
||||||
|
# networks.
|
||||||
|
# See https://github.com/element-hq/synapse/issues/11374#issuecomment-1908396300
|
||||||
|
self._event_pos = self.store.get_room_max_stream_ordering()
|
||||||
|
self._event_processing = False
|
||||||
|
self.notifier.add_replication_callback(self.notify_new_event)
|
||||||
|
|
||||||
|
def notify_new_event(self) -> None:
|
||||||
|
"""Called when there may be more deltas to process"""
|
||||||
|
if self._event_processing:
|
||||||
|
return
|
||||||
|
|
||||||
|
self._event_processing = True
|
||||||
|
|
||||||
|
async def process() -> None:
|
||||||
|
try:
|
||||||
|
await self._unsafe_process()
|
||||||
|
finally:
|
||||||
|
self._event_processing = False
|
||||||
|
|
||||||
|
run_as_background_process("device.notify_new_event", process)
|
||||||
|
|
||||||
|
async def _unsafe_process(self) -> None:
|
||||||
|
# Loop round handling deltas until we're up to date
|
||||||
|
while True:
|
||||||
|
with Measure(self.clock, "device_list_delta"):
|
||||||
|
room_max_stream_ordering = self.store.get_room_max_stream_ordering()
|
||||||
|
if self._event_pos == room_max_stream_ordering:
|
||||||
|
return
|
||||||
|
|
||||||
|
logger.debug(
|
||||||
|
"Processing device list stats %s->%s",
|
||||||
|
self._event_pos,
|
||||||
|
room_max_stream_ordering,
|
||||||
|
)
|
||||||
|
(
|
||||||
|
max_pos,
|
||||||
|
deltas,
|
||||||
|
) = await self._storage_controllers.state.get_current_state_deltas(
|
||||||
|
self._event_pos, room_max_stream_ordering
|
||||||
|
)
|
||||||
|
|
||||||
|
# We may get multiple deltas for different rooms, but we want to
|
||||||
|
# handle them on a room by room basis, so we batch them up by
|
||||||
|
# room.
|
||||||
|
deltas_by_room: Dict[str, List[StateDelta]] = {}
|
||||||
|
for delta in deltas:
|
||||||
|
deltas_by_room.setdefault(delta.room_id, []).append(delta)
|
||||||
|
|
||||||
|
for room_id, deltas_for_room in deltas_by_room.items():
|
||||||
|
newly_joined_local_users = await self._get_newly_joined_local_users(
|
||||||
|
room_id, deltas_for_room
|
||||||
|
)
|
||||||
|
if not newly_joined_local_users:
|
||||||
|
continue
|
||||||
|
# if a local user newly joins a room, we want to broadcast their device lists to
|
||||||
|
# federated servers in that room, if we haven't already.
|
||||||
|
hosts = await self.store.get_current_hosts_in_room(room_id)
|
||||||
|
# filter out ourselves
|
||||||
|
other_hosts = [
|
||||||
|
h for h in hosts if not self.hs.is_mine_server_name(h)
|
||||||
|
]
|
||||||
|
if len(other_hosts) == 0:
|
||||||
|
continue
|
||||||
|
# broadcast device lists for these users in the room
|
||||||
|
num_pokes = 0
|
||||||
|
for user_id in newly_joined_local_users:
|
||||||
|
# the join is for the user, we need to send device list updates for all
|
||||||
|
# their devices.
|
||||||
|
device_ids = await self.store.get_devices_by_user(user_id)
|
||||||
|
for device_id in device_ids.keys():
|
||||||
|
num_pokes += 1
|
||||||
|
await self.store.add_device_list_outbound_pokes(
|
||||||
|
user_id=user_id,
|
||||||
|
device_id=device_id,
|
||||||
|
room_id=room_id,
|
||||||
|
hosts=other_hosts,
|
||||||
|
context=None,
|
||||||
|
)
|
||||||
|
logger.info(
|
||||||
|
"Found %d hosts to send device list updates to for a new room join, "
|
||||||
|
+ "added %s device_list_outbound_pokes",
|
||||||
|
len(other_hosts),
|
||||||
|
num_pokes,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Notify things that device lists need to be sent out.
|
||||||
|
self.notifier.notify_replication()
|
||||||
|
await self.federation_sender.send_device_messages(
|
||||||
|
other_hosts, immediate=False
|
||||||
|
)
|
||||||
|
|
||||||
|
self._event_pos = max_pos
|
||||||
|
|
||||||
|
# Expose current event processing position to prometheus
|
||||||
|
synapse.metrics.event_processing_positions.labels("device").set(max_pos)
|
||||||
|
|
||||||
|
async def _get_newly_joined_local_users(
|
||||||
|
self, room_id: str, deltas: List[StateDelta]
|
||||||
|
) -> Optional[Set[str]]:
|
||||||
|
"""Process current state deltas for the room to find new joins that need
|
||||||
|
to be handled.
|
||||||
|
"""
|
||||||
|
newly_joined_local_users = set()
|
||||||
|
|
||||||
|
for delta in deltas:
|
||||||
|
assert room_id == delta.room_id
|
||||||
|
logger.debug(
|
||||||
|
"device.handling: %r %r, %s",
|
||||||
|
delta.event_type,
|
||||||
|
delta.state_key,
|
||||||
|
delta.event_id,
|
||||||
|
)
|
||||||
|
# Drop any event that isn't a membership join
|
||||||
|
if delta.event_type != EventTypes.Member:
|
||||||
|
continue
|
||||||
|
if delta.event_id is None:
|
||||||
|
# state has been deleted, so this is not a join. We only care about joins.
|
||||||
|
continue
|
||||||
|
# Drop any event that is for a non-local user
|
||||||
|
membership_change_user = UserID.from_string(delta.state_key)
|
||||||
|
if not self.hs.is_mine(membership_change_user):
|
||||||
|
continue
|
||||||
|
event = await self.store.get_event(delta.event_id, allow_none=True)
|
||||||
|
if not event or event.content.get("membership") != Membership.JOIN:
|
||||||
|
# We only care about joins
|
||||||
|
continue
|
||||||
|
if delta.prev_event_id:
|
||||||
|
prev_event = await self.store.get_event(
|
||||||
|
delta.prev_event_id, allow_none=True
|
||||||
|
)
|
||||||
|
if (
|
||||||
|
prev_event
|
||||||
|
and prev_event.content.get("membership") == Membership.JOIN
|
||||||
|
):
|
||||||
|
# Ignore changes to join events.
|
||||||
|
continue
|
||||||
|
newly_joined_local_users.add(delta.state_key)
|
||||||
|
|
||||||
|
if not newly_joined_local_users:
|
||||||
|
# If nobody has joined then there's nothing to do.
|
||||||
|
return None
|
||||||
|
return newly_joined_local_users
|
||||||
|
|
||||||
def _check_device_name_length(self, name: Optional[str]) -> None:
|
def _check_device_name_length(self, name: Optional[str]) -> None:
|
||||||
"""
|
"""
|
||||||
Checks whether a device name is longer than the maximum allowed length.
|
Checks whether a device name is longer than the maximum allowed length.
|
||||||
|
|||||||
Reference in New Issue
Block a user