Compare commits
23 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 5f6fff3b42 | |||
| 858f09bd09 | |||
| 3d10f0f9c5 | |||
| 53fb62de61 | |||
| 8728124f76 | |||
| fb86791f48 | |||
| 77bfaf91cc | |||
| d3710a6fd7 | |||
| 4a5cfed5ca | |||
| ccdb734051 | |||
| 596dfdb4b4 | |||
| fe7fef2893 | |||
| f2c0b0bd57 | |||
| 883bb8a9b1 | |||
| 93d1c4f1d5 | |||
| 254a83f2f6 | |||
| 8415a185e0 | |||
| d55ab5ea78 | |||
| 114541e9f8 | |||
| c9900ae6dd | |||
| dbebdab044 | |||
| b2997a8f20 | |||
| bff4a11b3f |
-47
@@ -1,50 +1,3 @@
|
||||
# Synapse 1.138.4 (2025-10-07)
|
||||
|
||||
## Bugfixes
|
||||
|
||||
- Fix a bug introduced in 1.138.3 where a client could receive an Internal Server Error if they set `device_keys: null` in the request to [`POST /_matrix/client/v3/keys/upload`](https://spec.matrix.org/v1.16/client-server-api/#post_matrixclientv3keysupload). ([\#19023](https://github.com/element-hq/synapse/issues/19023))
|
||||
|
||||
|
||||
|
||||
|
||||
# Synapse 1.138.3 (2025-10-07)
|
||||
|
||||
## Security Fixes
|
||||
|
||||
- Fix [CVE-2025-61672](https://www.cve.org/CVERecord?id=CVE-2025-61672) / [GHSA-fh66-fcv5-jjfr](https://github.com/element-hq/synapse/security/advisories/GHSA-fh66-fcv5-jjfr). Lack of validation for device keys in Synapse before 1.139.1 allows an attacker registered on the victim homeserver to degrade federation functionality, unpredictably breaking outbound federation to other homeservers. ([\#17097](https://github.com/element-hq/synapse/issues/17097))
|
||||
|
||||
## Deprecations and Removals
|
||||
|
||||
- Drop support for unstable field names from the long-accepted [MSC2732](https://github.com/matrix-org/matrix-spec-proposals/pull/2732) (Olm fallback keys) proposal. This change allows unit tests to pass following the security patch above. ([\#18996](https://github.com/element-hq/synapse/issues/18996))
|
||||
|
||||
|
||||
|
||||
|
||||
# Synapse 1.138.2 (2025-09-24)
|
||||
|
||||
## Internal Changes
|
||||
|
||||
- Drop support for Ubuntu 24.10 Oracular Oriole, and add support for Ubuntu 25.04 Plucky Puffin. ([\#18962](https://github.com/element-hq/synapse/issues/18962))
|
||||
|
||||
|
||||
|
||||
|
||||
# Synapse 1.138.1 (2025-09-24)
|
||||
|
||||
## Bugfixes
|
||||
|
||||
- Fix a performance regression related to the experimental Delayed Events ([MSC4140](https://github.com/matrix-org/matrix-spec-proposals/pull/4140)) feature. ([\#18926](https://github.com/element-hq/synapse/issues/18926))
|
||||
|
||||
|
||||
|
||||
|
||||
# Synapse 1.138.0 (2025-09-09)
|
||||
|
||||
No significant changes since 1.138.0rc1.
|
||||
|
||||
|
||||
|
||||
|
||||
# Synapse 1.138.0rc1 (2025-09-02)
|
||||
|
||||
### Features
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
Fix `LaterGauge` metrics to collect from all servers.
|
||||
@@ -0,0 +1 @@
|
||||
Suppress "Applying schema" log noise bulk when `SYNAPSE_LOG_TESTING` is set.
|
||||
Vendored
-30
@@ -1,33 +1,3 @@
|
||||
matrix-synapse-py3 (1.138.4) stable; urgency=medium
|
||||
|
||||
* New Synapse release 1.138.4.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Tue, 07 Oct 2025 16:28:38 +0100
|
||||
|
||||
matrix-synapse-py3 (1.138.3) stable; urgency=medium
|
||||
|
||||
* New Synapse release 1.138.3.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Tue, 07 Oct 2025 12:54:18 +0100
|
||||
|
||||
matrix-synapse-py3 (1.138.2) stable; urgency=medium
|
||||
|
||||
* New Synapse release 1.138.2.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Wed, 24 Sep 2025 12:26:16 +0100
|
||||
|
||||
matrix-synapse-py3 (1.138.1) stable; urgency=medium
|
||||
|
||||
* New Synapse release 1.138.1.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Wed, 24 Sep 2025 11:32:38 +0100
|
||||
|
||||
matrix-synapse-py3 (1.138.0) stable; urgency=medium
|
||||
|
||||
* New Synapse release 1.138.0.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Tue, 09 Sep 2025 11:21:25 +0100
|
||||
|
||||
matrix-synapse-py3 (1.138.0~rc1) stable; urgency=medium
|
||||
|
||||
* New synapse release 1.138.0rc1.
|
||||
|
||||
@@ -77,6 +77,13 @@ loggers:
|
||||
#}
|
||||
synapse.visibility.filtered_event_debug:
|
||||
level: DEBUG
|
||||
|
||||
{#
|
||||
If Synapse is under test, we don't care about seeing the "Applying schema" log
|
||||
lines at the INFO level every time we run the tests (it's 100 lines of bulk)
|
||||
#}
|
||||
synapse.storage.prepare_database:
|
||||
level: WARN
|
||||
{% endif %}
|
||||
|
||||
root:
|
||||
|
||||
@@ -117,16 +117,6 @@ each upgrade are complete before moving on to the next upgrade, to avoid
|
||||
stacking them up. You can monitor the currently running background updates with
|
||||
[the Admin API](usage/administration/admin_api/background_updates.html#status).
|
||||
|
||||
# Upgrading to v1.138.1
|
||||
|
||||
## Drop support for Ubuntu 24.10 Oracular Oriole, and add support for Ubuntu 25.04 Plucky Puffin
|
||||
|
||||
Ubuntu 24.10 Oracular Oriole [has been end-of-life since 10 Jul
|
||||
2025](https://endoflife.date/ubuntu). This release drops support for Ubuntu
|
||||
24.10, and in its place adds support for Ubuntu 25.04 Plucky Puffin.
|
||||
|
||||
This notice also applies to the v1.139.0 release.
|
||||
|
||||
# Upgrading to v1.136.0
|
||||
|
||||
## Deprecate `run_as_background_process` exported as part of the module API interface in favor of `ModuleApi.run_as_background_process`
|
||||
|
||||
+1
-1
@@ -101,7 +101,7 @@ module-name = "synapse.synapse_rust"
|
||||
|
||||
[tool.poetry]
|
||||
name = "matrix-synapse"
|
||||
version = "1.138.4"
|
||||
version = "1.138.0rc1"
|
||||
description = "Homeserver for the Matrix decentralised comms protocol"
|
||||
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
|
||||
license = "AGPL-3.0-or-later"
|
||||
|
||||
@@ -32,7 +32,7 @@ DISTS = (
|
||||
"debian:sid", # (rolling distro, no EOL)
|
||||
"ubuntu:jammy", # 22.04 LTS (EOL 2027-04) (our EOL forced by Python 3.10 is 2026-10-04)
|
||||
"ubuntu:noble", # 24.04 LTS (EOL 2029-06)
|
||||
"ubuntu:plucky", # 25.04 (EOL 2026-01)
|
||||
"ubuntu:oracular", # 24.10 (EOL 2025-07)
|
||||
"debian:trixie", # (EOL not specified yet)
|
||||
)
|
||||
|
||||
|
||||
@@ -153,9 +153,13 @@ def get_registered_paths_for_default(
|
||||
"""
|
||||
|
||||
hs = MockHomeserver(base_config, worker_app)
|
||||
|
||||
# TODO We only do this to avoid an error, but don't need the database etc
|
||||
hs.setup()
|
||||
return get_registered_paths_for_hs(hs)
|
||||
registered_paths = get_registered_paths_for_hs(hs)
|
||||
hs.cleanup()
|
||||
|
||||
return registered_paths
|
||||
|
||||
|
||||
def elide_http_methods_if_unconflicting(
|
||||
|
||||
@@ -99,6 +99,7 @@ from synapse.storage.engines import create_engine
|
||||
from synapse.storage.prepare_database import prepare_database
|
||||
from synapse.types import ISynapseReactor
|
||||
from synapse.util import SYNAPSE_VERSION, Clock
|
||||
from synapse.util.stringutils import random_string
|
||||
|
||||
# Cast safety: Twisted does some naughty magic which replaces the
|
||||
# twisted.internet.reactor module with a Reactor instance at runtime.
|
||||
@@ -323,6 +324,7 @@ class MockHomeserver:
|
||||
self.config = config
|
||||
self.hostname = config.server.server_name
|
||||
self.version_string = SYNAPSE_VERSION
|
||||
self.instance_id = random_string(5)
|
||||
|
||||
def get_clock(self) -> Clock:
|
||||
return self.clock
|
||||
@@ -330,6 +332,9 @@ class MockHomeserver:
|
||||
def get_reactor(self) -> ISynapseReactor:
|
||||
return reactor
|
||||
|
||||
def get_instance_id(self) -> str:
|
||||
return self.instance_id
|
||||
|
||||
def get_instance_name(self) -> str:
|
||||
return "master"
|
||||
|
||||
@@ -685,7 +690,15 @@ class Porter:
|
||||
)
|
||||
prepare_database(db_conn, engine, config=self.hs_config)
|
||||
# Type safety: ignore that we're using Mock homeservers here.
|
||||
store = Store(DatabasePool(hs, db_config, engine), db_conn, hs) # type: ignore[arg-type]
|
||||
store = Store(
|
||||
DatabasePool(
|
||||
hs, # type: ignore[arg-type]
|
||||
db_config,
|
||||
engine,
|
||||
),
|
||||
db_conn,
|
||||
hs, # type: ignore[arg-type]
|
||||
)
|
||||
db_conn.commit()
|
||||
|
||||
return store
|
||||
|
||||
@@ -208,6 +208,8 @@ class EventBase(metaclass=abc.ABCMeta):
|
||||
|
||||
self.internal_metadata = EventInternalMetadata(internal_metadata_dict)
|
||||
|
||||
self._stitched_ordering: Optional[int] = None
|
||||
|
||||
depth: DictProperty[int] = DictProperty("depth")
|
||||
content: DictProperty[JsonDict] = DictProperty("content")
|
||||
hashes: DictProperty[Dict[str, str]] = DictProperty("hashes")
|
||||
@@ -323,6 +325,20 @@ class EventBase(metaclass=abc.ABCMeta):
|
||||
# this will be a no-op if the event dict is already frozen.
|
||||
self._dict = freeze(self._dict)
|
||||
|
||||
def assign_stitched_ordering(self, stitched_ordering: int) -> None:
|
||||
"""Assign a stitched ordering to this event, if one has not already been assigned.
|
||||
|
||||
TODO: figure out a way to only expose this on events that have not yet been persisted.
|
||||
"""
|
||||
if self._stitched_ordering is not None:
|
||||
raise RuntimeError("Attempt to assign stitched ordering twice")
|
||||
self._stitched_ordering = stitched_ordering
|
||||
|
||||
@property
|
||||
def stitched_ordering(self) -> Optional[int]:
|
||||
"""Return the stitched ordering for this event. If one has not (yet) been assigned, returns `None`."""
|
||||
return self._stitched_ordering
|
||||
|
||||
def __str__(self) -> str:
|
||||
return self.__repr__()
|
||||
|
||||
|
||||
@@ -37,6 +37,7 @@ Events are replicated via a separate events stream.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from enum import Enum
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Dict,
|
||||
@@ -67,6 +68,25 @@ if TYPE_CHECKING:
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class QueueNames(str, Enum):
|
||||
PRESENCE_MAP = "presence_map"
|
||||
KEYED_EDU = "keyed_edu"
|
||||
KEYED_EDU_CHANGED = "keyed_edu_changed"
|
||||
EDUS = "edus"
|
||||
POS_TIME = "pos_time"
|
||||
PRESENCE_DESTINATIONS = "presence_destinations"
|
||||
|
||||
|
||||
queue_name_to_gauge_map: Dict[QueueNames, LaterGauge] = {}
|
||||
|
||||
for queue_name in QueueNames:
|
||||
queue_name_to_gauge_map[queue_name] = LaterGauge(
|
||||
name=f"synapse_federation_send_queue_{queue_name.value}_size",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
)
|
||||
|
||||
|
||||
class FederationRemoteSendQueue(AbstractFederationSender):
|
||||
"""A drop in replacement for FederationSender"""
|
||||
|
||||
@@ -111,23 +131,16 @@ class FederationRemoteSendQueue(AbstractFederationSender):
|
||||
# we make a new function, so we need to make a new function so the inner
|
||||
# lambda binds to the queue rather than to the name of the queue which
|
||||
# changes. ARGH.
|
||||
def register(name: str, queue: Sized) -> None:
|
||||
LaterGauge(
|
||||
name="synapse_federation_send_queue_%s_size" % (queue_name,),
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
caller=lambda: {(self.server_name,): len(queue)},
|
||||
def register(queue_name: QueueNames, queue: Sized) -> None:
|
||||
queue_name_to_gauge_map[queue_name].register_hook(
|
||||
homeserver_instance_id=hs.get_instance_id(),
|
||||
hook=lambda: {(self.server_name,): len(queue)},
|
||||
)
|
||||
|
||||
for queue_name in [
|
||||
"presence_map",
|
||||
"keyed_edu",
|
||||
"keyed_edu_changed",
|
||||
"edus",
|
||||
"pos_time",
|
||||
"presence_destinations",
|
||||
]:
|
||||
register(queue_name, getattr(self, queue_name))
|
||||
for queue_name in QueueNames:
|
||||
queue = getattr(self, queue_name.value)
|
||||
assert isinstance(queue, Sized)
|
||||
register(queue_name, queue=queue)
|
||||
|
||||
self.clock.looping_call(self._clear_queue, 30 * 1000)
|
||||
|
||||
|
||||
@@ -199,6 +199,24 @@ sent_pdus_destination_dist_total = Counter(
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
)
|
||||
|
||||
transaction_queue_pending_destinations_gauge = LaterGauge(
|
||||
name="synapse_federation_transaction_queue_pending_destinations",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
)
|
||||
|
||||
transaction_queue_pending_pdus_gauge = LaterGauge(
|
||||
name="synapse_federation_transaction_queue_pending_pdus",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
)
|
||||
|
||||
transaction_queue_pending_edus_gauge = LaterGauge(
|
||||
name="synapse_federation_transaction_queue_pending_edus",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
)
|
||||
|
||||
# Time (in s) to wait before trying to wake up destinations that have
|
||||
# catch-up outstanding.
|
||||
# Please note that rate limiting still applies, so while the loop is
|
||||
@@ -398,11 +416,9 @@ class FederationSender(AbstractFederationSender):
|
||||
# map from destination to PerDestinationQueue
|
||||
self._per_destination_queues: Dict[str, PerDestinationQueue] = {}
|
||||
|
||||
LaterGauge(
|
||||
name="synapse_federation_transaction_queue_pending_destinations",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
caller=lambda: {
|
||||
transaction_queue_pending_destinations_gauge.register_hook(
|
||||
homeserver_instance_id=hs.get_instance_id(),
|
||||
hook=lambda: {
|
||||
(self.server_name,): sum(
|
||||
1
|
||||
for d in self._per_destination_queues.values()
|
||||
@@ -410,22 +426,17 @@ class FederationSender(AbstractFederationSender):
|
||||
)
|
||||
},
|
||||
)
|
||||
|
||||
LaterGauge(
|
||||
name="synapse_federation_transaction_queue_pending_pdus",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
caller=lambda: {
|
||||
transaction_queue_pending_pdus_gauge.register_hook(
|
||||
homeserver_instance_id=hs.get_instance_id(),
|
||||
hook=lambda: {
|
||||
(self.server_name,): sum(
|
||||
d.pending_pdu_count() for d in self._per_destination_queues.values()
|
||||
)
|
||||
},
|
||||
)
|
||||
LaterGauge(
|
||||
name="synapse_federation_transaction_queue_pending_edus",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
caller=lambda: {
|
||||
transaction_queue_pending_edus_gauge.register_hook(
|
||||
homeserver_instance_id=hs.get_instance_id(),
|
||||
hook=lambda: {
|
||||
(self.server_name,): sum(
|
||||
d.pending_edu_count() for d in self._per_destination_queues.values()
|
||||
)
|
||||
|
||||
@@ -18,7 +18,7 @@ from typing import TYPE_CHECKING, List, Optional, Set, Tuple
|
||||
from twisted.internet.interfaces import IDelayedCall
|
||||
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.api.errors import ShadowBanError, SynapseError
|
||||
from synapse.api.errors import ShadowBanError
|
||||
from synapse.api.ratelimiting import Ratelimiter
|
||||
from synapse.config.workers import MAIN_PROCESS_INSTANCE_NAME
|
||||
from synapse.logging.opentracing import set_tag
|
||||
@@ -45,7 +45,6 @@ from synapse.types import (
|
||||
)
|
||||
from synapse.util.events import generate_fake_event_id
|
||||
from synapse.util.metrics import Measure
|
||||
from synapse.util.sentinel import Sentinel
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
@@ -147,37 +146,10 @@ class DelayedEventsHandler:
|
||||
)
|
||||
|
||||
async def _unsafe_process_new_event(self) -> None:
|
||||
# We purposefully fetch the current max room stream ordering before
|
||||
# doing anything else, as it could increment duing processing of state
|
||||
# deltas. We want to avoid updating `delayed_events_stream_pos` past
|
||||
# the stream ordering of the state deltas we've processed. Otherwise
|
||||
# we'll leave gaps in our processing.
|
||||
room_max_stream_ordering = self._store.get_room_max_stream_ordering()
|
||||
|
||||
# Check that there are actually any delayed events to process. If not, bail early.
|
||||
delayed_events_count = await self._store.get_count_of_delayed_events()
|
||||
if delayed_events_count == 0:
|
||||
# There are no delayed events to process. Update the
|
||||
# `delayed_events_stream_pos` to the latest `events` stream pos and
|
||||
# exit early.
|
||||
self._event_pos = room_max_stream_ordering
|
||||
|
||||
logger.debug(
|
||||
"No delayed events to process. Updating `delayed_events_stream_pos` to max stream ordering (%s)",
|
||||
room_max_stream_ordering,
|
||||
)
|
||||
|
||||
await self._store.update_delayed_events_stream_pos(room_max_stream_ordering)
|
||||
|
||||
event_processing_positions.labels(
|
||||
name="delayed_events", **{SERVER_NAME_LABEL: self.server_name}
|
||||
).set(room_max_stream_ordering)
|
||||
|
||||
return
|
||||
|
||||
# If self._event_pos is None then means we haven't fetched it from the DB yet
|
||||
if self._event_pos is None:
|
||||
self._event_pos = await self._store.get_delayed_events_stream_pos()
|
||||
room_max_stream_ordering = self._store.get_room_max_stream_ordering()
|
||||
if self._event_pos > room_max_stream_ordering:
|
||||
# apparently, we've processed more events than exist in the database!
|
||||
# this can happen if events are removed with history purge or similar.
|
||||
@@ -195,7 +167,7 @@ class DelayedEventsHandler:
|
||||
self._clock, name="delayed_events_delta", server_name=self.server_name
|
||||
):
|
||||
room_max_stream_ordering = self._store.get_room_max_stream_ordering()
|
||||
if self._event_pos >= room_max_stream_ordering:
|
||||
if self._event_pos == room_max_stream_ordering:
|
||||
return
|
||||
|
||||
logger.debug(
|
||||
@@ -230,81 +202,23 @@ class DelayedEventsHandler:
|
||||
Process current state deltas to cancel other users' pending delayed events
|
||||
that target the same state.
|
||||
"""
|
||||
# Get the senders of each delta's state event (as sender information is
|
||||
# not currently stored in the `current_state_deltas` table).
|
||||
event_id_and_sender_dict = await self._store.get_senders_for_event_ids(
|
||||
[delta.event_id for delta in deltas if delta.event_id is not None]
|
||||
)
|
||||
|
||||
# Note: No need to batch as `get_current_state_deltas` will only ever
|
||||
# return 100 rows at a time.
|
||||
for delta in deltas:
|
||||
logger.debug(
|
||||
"Handling: %r %r, %s", delta.event_type, delta.state_key, delta.event_id
|
||||
)
|
||||
|
||||
# `delta.event_id` and `delta.sender` can be `None` in a few valid
|
||||
# cases (see the docstring of
|
||||
# `get_current_state_delta_membership_changes_for_user` for details).
|
||||
if delta.event_id is None:
|
||||
# TODO: Differentiate between this being caused by a state reset
|
||||
# which removed a user from a room, or the homeserver
|
||||
# purposefully having left the room. We can do so by checking
|
||||
# whether there are any local memberships still left in the
|
||||
# room. If so, then this is the result of a state reset.
|
||||
#
|
||||
# If it is a state reset, we should avoid cancelling new,
|
||||
# delayed state events due to old state resurfacing. So we
|
||||
# should skip and log a warning in this case.
|
||||
#
|
||||
# If the homeserver has left the room, then we should cancel all
|
||||
# delayed state events intended for this room, as there is no
|
||||
# need to try and send a delayed event into a room we've left.
|
||||
logger.warning(
|
||||
"Skipping state delta (%r, %r) without corresponding event ID. "
|
||||
"This can happen if the homeserver has left the room (in which "
|
||||
"case this can be ignored), or if there has been a state reset "
|
||||
"which has caused the sender to be kicked out of the room",
|
||||
logger.debug(
|
||||
"Not handling delta for deleted state: %r %r",
|
||||
delta.event_type,
|
||||
delta.state_key,
|
||||
)
|
||||
continue
|
||||
|
||||
sender_str = event_id_and_sender_dict.get(
|
||||
delta.event_id, Sentinel.UNSET_SENTINEL
|
||||
logger.debug(
|
||||
"Handling: %r %r, %s", delta.event_type, delta.state_key, delta.event_id
|
||||
)
|
||||
if sender_str is None:
|
||||
# An event exists, but the `sender` field was "null" and Synapse
|
||||
# incorrectly accepted the event. This is not expected.
|
||||
logger.error(
|
||||
"Skipping state delta with event ID '%s' as 'sender' was None. "
|
||||
"This is unexpected - please report it as a bug!",
|
||||
delta.event_id,
|
||||
)
|
||||
continue
|
||||
if sender_str is Sentinel.UNSET_SENTINEL:
|
||||
# We have an event ID, but the event was not found in the
|
||||
# datastore. This can happen if a room, or its history, is
|
||||
# purged. State deltas related to the room are left behind, but
|
||||
# the event no longer exists.
|
||||
#
|
||||
# As we cannot get the sender of this event, we can't calculate
|
||||
# whether to cancel delayed events related to this one. So we skip.
|
||||
logger.debug(
|
||||
"Skipping state delta with event ID '%s' - the room, or its history, may have been purged",
|
||||
delta.event_id,
|
||||
)
|
||||
continue
|
||||
|
||||
try:
|
||||
sender = UserID.from_string(sender_str)
|
||||
except SynapseError as e:
|
||||
logger.error(
|
||||
"Skipping state delta with Matrix User ID '%s' that failed to parse: %s",
|
||||
sender_str,
|
||||
e,
|
||||
)
|
||||
event = await self._store.get_event(delta.event_id, allow_none=True)
|
||||
if not event:
|
||||
continue
|
||||
sender = UserID.from_string(event.sender)
|
||||
|
||||
next_send_ts = await self._store.cancel_delayed_state_events(
|
||||
room_id=delta.room_id,
|
||||
|
||||
@@ -57,6 +57,7 @@ if TYPE_CHECKING:
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
ONE_TIME_KEY_UPLOAD = "one_time_key_upload_lock"
|
||||
|
||||
|
||||
@@ -847,22 +848,14 @@ class E2eKeysHandler:
|
||||
"""
|
||||
time_now = self.clock.time_msec()
|
||||
|
||||
# TODO: Validate the JSON to make sure it has the right keys.
|
||||
device_keys = keys.get("device_keys", None)
|
||||
if device_keys:
|
||||
log_kv(
|
||||
{
|
||||
"message": "Updating device_keys for user.",
|
||||
"user_id": user_id,
|
||||
"device_id": device_id,
|
||||
}
|
||||
)
|
||||
await self.upload_device_keys_for_user(
|
||||
user_id=user_id,
|
||||
device_id=device_id,
|
||||
keys={"device_keys": device_keys},
|
||||
)
|
||||
else:
|
||||
log_kv({"message": "Did not update device_keys", "reason": "not a dict"})
|
||||
|
||||
one_time_keys = keys.get("one_time_keys", None)
|
||||
if one_time_keys:
|
||||
@@ -880,9 +873,10 @@ class E2eKeysHandler:
|
||||
log_kv(
|
||||
{"message": "Did not update one_time_keys", "reason": "no keys given"}
|
||||
)
|
||||
|
||||
fallback_keys = keys.get("fallback_keys")
|
||||
if fallback_keys:
|
||||
fallback_keys = keys.get("fallback_keys") or keys.get(
|
||||
"org.matrix.msc2732.fallback_keys"
|
||||
)
|
||||
if fallback_keys and isinstance(fallback_keys, dict):
|
||||
log_kv(
|
||||
{
|
||||
"message": "Updating fallback_keys for device.",
|
||||
@@ -891,6 +885,8 @@ class E2eKeysHandler:
|
||||
}
|
||||
)
|
||||
await self.store.set_e2e_fallback_keys(user_id, device_id, fallback_keys)
|
||||
elif fallback_keys:
|
||||
log_kv({"message": "Did not update fallback_keys", "reason": "not a dict"})
|
||||
else:
|
||||
log_kv(
|
||||
{"message": "Did not update fallback_keys", "reason": "no keys given"}
|
||||
|
||||
@@ -86,6 +86,8 @@ from synapse.replication.http.federation import (
|
||||
ReplicationFederationSendEventsRestServlet,
|
||||
)
|
||||
from synapse.state import StateResolutionStore
|
||||
from synapse.storage.controllers.persist_events import assign_stitched_orders
|
||||
from synapse.storage.database import LoggingTransaction
|
||||
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
|
||||
from synapse.types import (
|
||||
PersistedEventPosition,
|
||||
@@ -894,18 +896,30 @@ class FederationEventHandler:
|
||||
)
|
||||
|
||||
@trace
|
||||
async def _process_new_pulled_events(new_events: Collection[EventBase]) -> None:
|
||||
# We want to sort these by depth so we process them and tell clients about
|
||||
# them in order. It's also more efficient to backfill this way (`depth`
|
||||
# ascending) because one backfill event is likely to be the `prev_event` of
|
||||
# the next event we're going to process.
|
||||
sorted_events = sorted(new_events, key=lambda x: x.depth)
|
||||
async def _process_new_pulled_events(new_events: List[EventBase]) -> None:
|
||||
room_id = new_events[0].room_id
|
||||
await assign_stitched_orders(room_id, new_events, self._store)
|
||||
|
||||
# We want to sort these by stitched ordering, so that events that will
|
||||
# be sent on to clients over /sync will receive stream_orderings that
|
||||
# are consistent with stitched orderings (i.e. we will serve them to clients
|
||||
# in the same order as stitched_order).
|
||||
#
|
||||
# It's also more efficient to backfill this way, because one backfill event
|
||||
# is likely to be the `prev_event` of the next event we're going to process.
|
||||
#
|
||||
# Outliers will not yet have received a stitched ordering, but it doesn't
|
||||
# really matter what order they get persisted in, because they don't get
|
||||
# sent to clients and we don't do so much state resolution for them. We just
|
||||
# persist them before any other events.
|
||||
|
||||
sorted_events = sorted(new_events, key=lambda x: (x.stitched_ordering or 0))
|
||||
for ev in sorted_events:
|
||||
with nested_logging_context(ev.event_id):
|
||||
await self._process_pulled_event(origin, ev, backfilled=backfilled)
|
||||
|
||||
# Check if we've already tried to process these events at some point in the
|
||||
# past. We aren't concerned with the expontntial backoff here, just whether it
|
||||
# past. We aren't concerned with the exponential backoff here, just whether it
|
||||
# has failed to be processed before.
|
||||
event_ids_with_failed_pull_attempts = (
|
||||
await self._store.get_event_ids_with_failed_pull_attempts(
|
||||
@@ -2385,3 +2399,32 @@ class FederationEventHandler:
|
||||
len(ev.auth_event_ids()),
|
||||
)
|
||||
raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many auth_events")
|
||||
|
||||
|
||||
def find_predecessors(event_ids: Iterable[str], batch: List[EventBase]) -> Set[str]:
|
||||
"""
|
||||
Walk the tree of dependencies (in batch), and return every event that is
|
||||
in batch, and is an ancestor of one of the supplied events.
|
||||
"""
|
||||
found = set()
|
||||
unexplored = set(event_ids)
|
||||
while len(unexplored) > 0:
|
||||
next_unexplored: Set[str] = set()
|
||||
|
||||
# Iterate through the incoming events, looking for events in our "unexplored"
|
||||
# set. For each matching event, add it to the "found" set, and add its
|
||||
# "prev_events" to the "unexplored" set for the next pass.
|
||||
for event in batch:
|
||||
if event.event_id in unexplored:
|
||||
found.add(event.event_id)
|
||||
next_unexplored.update(
|
||||
(
|
||||
event_id
|
||||
for event_id in event.prev_event_ids()
|
||||
if event_id not in found
|
||||
)
|
||||
)
|
||||
|
||||
unexplored = next_unexplored
|
||||
|
||||
return found
|
||||
|
||||
@@ -173,6 +173,18 @@ state_transition_counter = Counter(
|
||||
labelnames=["locality", "from", "to", SERVER_NAME_LABEL],
|
||||
)
|
||||
|
||||
presence_user_to_current_state_size_gauge = LaterGauge(
|
||||
name="synapse_handlers_presence_user_to_current_state_size",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
)
|
||||
|
||||
presence_wheel_timer_size_gauge = LaterGauge(
|
||||
name="synapse_handlers_presence_wheel_timer_size",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
)
|
||||
|
||||
# If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
|
||||
# "currently_active"
|
||||
LAST_ACTIVE_GRANULARITY = 60 * 1000
|
||||
@@ -779,11 +791,9 @@ class PresenceHandler(BasePresenceHandler):
|
||||
EduTypes.PRESENCE, self.incoming_presence
|
||||
)
|
||||
|
||||
LaterGauge(
|
||||
name="synapse_handlers_presence_user_to_current_state_size",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
caller=lambda: {(self.server_name,): len(self.user_to_current_state)},
|
||||
presence_user_to_current_state_size_gauge.register_hook(
|
||||
homeserver_instance_id=hs.get_instance_id(),
|
||||
hook=lambda: {(self.server_name,): len(self.user_to_current_state)},
|
||||
)
|
||||
|
||||
# The per-device presence state, maps user to devices to per-device presence state.
|
||||
@@ -882,11 +892,9 @@ class PresenceHandler(BasePresenceHandler):
|
||||
60 * 1000,
|
||||
)
|
||||
|
||||
LaterGauge(
|
||||
name="synapse_handlers_presence_wheel_timer_size",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
caller=lambda: {(self.server_name,): len(self.wheel_timer)},
|
||||
presence_wheel_timer_size_gauge.register_hook(
|
||||
homeserver_instance_id=hs.get_instance_id(),
|
||||
hook=lambda: {(self.server_name,): len(self.wheel_timer)},
|
||||
)
|
||||
|
||||
# Used to handle sending of presence to newly joined users/servers
|
||||
@@ -1540,7 +1548,7 @@ class PresenceHandler(BasePresenceHandler):
|
||||
self.clock, name="presence_delta", server_name=self.server_name
|
||||
):
|
||||
room_max_stream_ordering = self.store.get_room_max_stream_ordering()
|
||||
if self._event_pos >= room_max_stream_ordering:
|
||||
if self._event_pos == room_max_stream_ordering:
|
||||
return
|
||||
|
||||
logger.debug(
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
#
|
||||
|
||||
|
||||
import enum
|
||||
import logging
|
||||
from itertools import chain
|
||||
from typing import (
|
||||
@@ -74,7 +75,6 @@ from synapse.types.handlers.sliding_sync import (
|
||||
)
|
||||
from synapse.types.state import StateFilter
|
||||
from synapse.util import MutableOverlayMapping
|
||||
from synapse.util.sentinel import Sentinel
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
@@ -83,6 +83,12 @@ if TYPE_CHECKING:
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Sentinel(enum.Enum):
|
||||
# defining a sentinel in this way allows mypy to correctly handle the
|
||||
# type of a dictionary lookup and subsequent type narrowing.
|
||||
UNSET_SENTINEL = object()
|
||||
|
||||
|
||||
# Helper definition for the types that we might return. We do this to avoid
|
||||
# copying data between types (which can be expensive for many rooms).
|
||||
RoomsForUserType = Union[RoomsForUserStateReset, RoomsForUser, RoomsForUserSlidingSync]
|
||||
|
||||
@@ -164,11 +164,13 @@ def _get_in_flight_counts() -> Mapping[Tuple[str, ...], int]:
|
||||
return counts
|
||||
|
||||
|
||||
LaterGauge(
|
||||
in_flight_requests = LaterGauge(
|
||||
name="synapse_http_server_in_flight_requests_count",
|
||||
desc="",
|
||||
labelnames=["method", "servlet", SERVER_NAME_LABEL],
|
||||
caller=_get_in_flight_counts,
|
||||
)
|
||||
in_flight_requests.register_hook(
|
||||
homeserver_instance_id=None, hook=_get_in_flight_counts
|
||||
)
|
||||
|
||||
|
||||
|
||||
+93
-35
@@ -73,8 +73,6 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
METRICS_PREFIX = "/_synapse/metrics"
|
||||
|
||||
all_gauges: Dict[str, Collector] = {}
|
||||
|
||||
HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
|
||||
|
||||
SERVER_NAME_LABEL = "server_name"
|
||||
@@ -163,42 +161,110 @@ class LaterGauge(Collector):
|
||||
name: str
|
||||
desc: str
|
||||
labelnames: Optional[StrSequence] = attr.ib(hash=False)
|
||||
# callback: should either return a value (if there are no labels for this metric),
|
||||
# or dict mapping from a label tuple to a value
|
||||
caller: Callable[
|
||||
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
|
||||
]
|
||||
_instance_id_to_hook_map: Dict[
|
||||
Optional[str], # instance_id
|
||||
Callable[
|
||||
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
|
||||
],
|
||||
] = attr.ib(factory=dict, hash=False)
|
||||
"""
|
||||
Map from homeserver instance_id to a callback. Each callback should either return a
|
||||
value (if there are no labels for this metric), or dict mapping from a label tuple
|
||||
to a value.
|
||||
|
||||
We use `instance_id` instead of `server_name` because it's possible to have multiple
|
||||
workers running in the same process with the same `server_name`.
|
||||
"""
|
||||
|
||||
def collect(self) -> Iterable[Metric]:
|
||||
# The decision to add `SERVER_NAME_LABEL` is from the `LaterGauge` usage itself
|
||||
# (we don't enforce it here, one level up).
|
||||
g = GaugeMetricFamily(self.name, self.desc, labels=self.labelnames) # type: ignore[missing-server-name-label]
|
||||
|
||||
try:
|
||||
calls = self.caller()
|
||||
except Exception:
|
||||
logger.exception("Exception running callback for LaterGauge(%s)", self.name)
|
||||
yield g
|
||||
return
|
||||
for homeserver_instance_id, hook in self._instance_id_to_hook_map.items():
|
||||
try:
|
||||
hook_result = hook()
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"Exception running callback for LaterGauge(%s) for homeserver_instance_id=%s",
|
||||
self.name,
|
||||
homeserver_instance_id,
|
||||
)
|
||||
# Continue to return the rest of the metrics that aren't broken
|
||||
continue
|
||||
|
||||
if isinstance(calls, (int, float)):
|
||||
g.add_metric([], calls)
|
||||
else:
|
||||
for k, v in calls.items():
|
||||
g.add_metric(k, v)
|
||||
if isinstance(hook_result, (int, float)):
|
||||
g.add_metric([], hook_result)
|
||||
else:
|
||||
for k, v in hook_result.items():
|
||||
g.add_metric(k, v)
|
||||
|
||||
yield g
|
||||
|
||||
def register_hook(
|
||||
self,
|
||||
*,
|
||||
homeserver_instance_id: Optional[str],
|
||||
hook: Callable[
|
||||
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
|
||||
],
|
||||
) -> None:
|
||||
"""
|
||||
Register a callback/hook that will be called to generate a metric samples for
|
||||
the gauge.
|
||||
|
||||
Args:
|
||||
homeserver_instance_id: The unique ID for this Synapse process instance
|
||||
(`hs.get_instance_id()`) that this hook is associated with. This can be used
|
||||
later to lookup all hooks associated with a given server name in order to
|
||||
unregister them. This should only be omitted for global hooks that work
|
||||
across all homeservers.
|
||||
hook: A callback that should either return a value (if there are no
|
||||
labels for this metric), or dict mapping from a label tuple to a value
|
||||
"""
|
||||
# We shouldn't have multiple hooks registered for the same homeserver `instance_id`.
|
||||
existing_hook = self._instance_id_to_hook_map.get(homeserver_instance_id)
|
||||
assert existing_hook is None, (
|
||||
f"LaterGauge(name={self.name}) hook already registered for homeserver_instance_id={homeserver_instance_id}. "
|
||||
"This is likely a Synapse bug and you forgot to unregister the previous hooks for "
|
||||
"the server (especially in tests)."
|
||||
)
|
||||
|
||||
self._instance_id_to_hook_map[homeserver_instance_id] = hook
|
||||
|
||||
def unregister_hooks_for_homeserver_instance_id(
|
||||
self, homeserver_instance_id: str
|
||||
) -> None:
|
||||
"""
|
||||
Unregister all hooks associated with the given homeserver `instance_id`. This should be
|
||||
called when a homeserver is shutdown to avoid extra hooks sitting around.
|
||||
|
||||
Args:
|
||||
homeserver_instance_id: The unique ID for this Synapse process instance to
|
||||
unregister hooks for (`hs.get_instance_id()`).
|
||||
"""
|
||||
self._instance_id_to_hook_map.pop(homeserver_instance_id, None)
|
||||
|
||||
def __attrs_post_init__(self) -> None:
|
||||
self._register()
|
||||
|
||||
def _register(self) -> None:
|
||||
if self.name in all_gauges.keys():
|
||||
logger.warning("%s already registered, reregistering", self.name)
|
||||
REGISTRY.unregister(all_gauges.pop(self.name))
|
||||
|
||||
REGISTRY.register(self)
|
||||
all_gauges[self.name] = self
|
||||
|
||||
# We shouldn't have multiple metrics with the same name. Typically, metrics
|
||||
# should be created globally so you shouldn't be running into this and this will
|
||||
# catch any stupid mistakes. The `REGISTRY.register(self)` call above will also
|
||||
# raise an error if the metric already exists but to make things explicit, we'll
|
||||
# also check here.
|
||||
existing_gauge = all_later_gauges_to_clean_up_on_shutdown.get(self.name)
|
||||
assert existing_gauge is None, f"LaterGauge(name={self.name}) already exists. "
|
||||
|
||||
# Keep track of the gauge so we can clean it up later.
|
||||
all_later_gauges_to_clean_up_on_shutdown[self.name] = self
|
||||
|
||||
|
||||
all_later_gauges_to_clean_up_on_shutdown: Dict[str, LaterGauge] = {}
|
||||
"""
|
||||
Track all `LaterGauge` instances so we can remove any associated hooks during homeserver
|
||||
shutdown.
|
||||
"""
|
||||
|
||||
|
||||
# `MetricsEntry` only makes sense when it is a `Protocol`,
|
||||
@@ -250,7 +316,7 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
|
||||
# Protects access to _registrations
|
||||
self._lock = threading.Lock()
|
||||
|
||||
self._register_with_collector()
|
||||
REGISTRY.register(self)
|
||||
|
||||
def register(
|
||||
self,
|
||||
@@ -341,14 +407,6 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
|
||||
gauge.add_metric(labels=key, value=getattr(metrics, name))
|
||||
yield gauge
|
||||
|
||||
def _register_with_collector(self) -> None:
|
||||
if self.name in all_gauges.keys():
|
||||
logger.warning("%s already registered, reregistering", self.name)
|
||||
REGISTRY.unregister(all_gauges.pop(self.name))
|
||||
|
||||
REGISTRY.register(self)
|
||||
all_gauges[self.name] = self
|
||||
|
||||
|
||||
class GaugeHistogramMetricFamilyWithLabels(GaugeHistogramMetricFamily):
|
||||
"""
|
||||
|
||||
+26
-16
@@ -86,6 +86,24 @@ users_woken_by_stream_counter = Counter(
|
||||
labelnames=["stream", SERVER_NAME_LABEL],
|
||||
)
|
||||
|
||||
|
||||
notifier_listeners_gauge = LaterGauge(
|
||||
name="synapse_notifier_listeners",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
)
|
||||
|
||||
notifier_rooms_gauge = LaterGauge(
|
||||
name="synapse_notifier_rooms",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
)
|
||||
notifier_users_gauge = LaterGauge(
|
||||
name="synapse_notifier_users",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
)
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
@@ -281,28 +299,20 @@ class Notifier:
|
||||
)
|
||||
}
|
||||
|
||||
LaterGauge(
|
||||
name="synapse_notifier_listeners",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
caller=count_listeners,
|
||||
notifier_listeners_gauge.register_hook(
|
||||
homeserver_instance_id=hs.get_instance_id(), hook=count_listeners
|
||||
)
|
||||
|
||||
LaterGauge(
|
||||
name="synapse_notifier_rooms",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
caller=lambda: {
|
||||
notifier_rooms_gauge.register_hook(
|
||||
homeserver_instance_id=hs.get_instance_id(),
|
||||
hook=lambda: {
|
||||
(self.server_name,): count(
|
||||
bool, list(self.room_to_user_streams.values())
|
||||
)
|
||||
},
|
||||
)
|
||||
LaterGauge(
|
||||
name="synapse_notifier_users",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
caller=lambda: {(self.server_name,): len(self.user_to_user_stream)},
|
||||
notifier_users_gauge.register_hook(
|
||||
homeserver_instance_id=hs.get_instance_id(),
|
||||
hook=lambda: {(self.server_name,): len(self.user_to_user_stream)},
|
||||
)
|
||||
|
||||
def add_replication_callback(self, cb: Callable[[], None]) -> None:
|
||||
|
||||
@@ -106,6 +106,18 @@ user_ip_cache_counter = Counter(
|
||||
"synapse_replication_tcp_resource_user_ip_cache", "", labelnames=[SERVER_NAME_LABEL]
|
||||
)
|
||||
|
||||
tcp_resource_total_connections_gauge = LaterGauge(
|
||||
name="synapse_replication_tcp_resource_total_connections",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
)
|
||||
|
||||
tcp_command_queue_gauge = LaterGauge(
|
||||
name="synapse_replication_tcp_command_queue",
|
||||
desc="Number of inbound RDATA/POSITION commands queued for processing",
|
||||
labelnames=["stream_name", SERVER_NAME_LABEL],
|
||||
)
|
||||
|
||||
|
||||
# the type of the entries in _command_queues_by_stream
|
||||
_StreamCommandQueue = Deque[
|
||||
@@ -243,11 +255,9 @@ class ReplicationCommandHandler:
|
||||
# outgoing replication commands to.)
|
||||
self._connections: List[IReplicationConnection] = []
|
||||
|
||||
LaterGauge(
|
||||
name="synapse_replication_tcp_resource_total_connections",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
caller=lambda: {(self.server_name,): len(self._connections)},
|
||||
tcp_resource_total_connections_gauge.register_hook(
|
||||
homeserver_instance_id=hs.get_instance_id(),
|
||||
hook=lambda: {(self.server_name,): len(self._connections)},
|
||||
)
|
||||
|
||||
# When POSITION or RDATA commands arrive, we stick them in a queue and process
|
||||
@@ -266,11 +276,9 @@ class ReplicationCommandHandler:
|
||||
# from that connection.
|
||||
self._streams_by_connection: Dict[IReplicationConnection, Set[str]] = {}
|
||||
|
||||
LaterGauge(
|
||||
name="synapse_replication_tcp_command_queue",
|
||||
desc="Number of inbound RDATA/POSITION commands queued for processing",
|
||||
labelnames=["stream_name", SERVER_NAME_LABEL],
|
||||
caller=lambda: {
|
||||
tcp_command_queue_gauge.register_hook(
|
||||
homeserver_instance_id=hs.get_instance_id(),
|
||||
hook=lambda: {
|
||||
(stream_name, self.server_name): len(queue)
|
||||
for stream_name, queue in self._command_queues_by_stream.items()
|
||||
},
|
||||
|
||||
@@ -527,7 +527,10 @@ pending_commands = LaterGauge(
|
||||
name="synapse_replication_tcp_protocol_pending_commands",
|
||||
desc="",
|
||||
labelnames=["name", SERVER_NAME_LABEL],
|
||||
caller=lambda: {
|
||||
)
|
||||
pending_commands.register_hook(
|
||||
homeserver_instance_id=None,
|
||||
hook=lambda: {
|
||||
(p.name, p.server_name): len(p.pending_commands) for p in connected_connections
|
||||
},
|
||||
)
|
||||
@@ -544,7 +547,10 @@ transport_send_buffer = LaterGauge(
|
||||
name="synapse_replication_tcp_protocol_transport_send_buffer",
|
||||
desc="",
|
||||
labelnames=["name", SERVER_NAME_LABEL],
|
||||
caller=lambda: {
|
||||
)
|
||||
transport_send_buffer.register_hook(
|
||||
homeserver_instance_id=None,
|
||||
hook=lambda: {
|
||||
(p.name, p.server_name): transport_buffer_size(p) for p in connected_connections
|
||||
},
|
||||
)
|
||||
@@ -571,7 +577,10 @@ tcp_transport_kernel_send_buffer = LaterGauge(
|
||||
name="synapse_replication_tcp_protocol_transport_kernel_send_buffer",
|
||||
desc="",
|
||||
labelnames=["name", SERVER_NAME_LABEL],
|
||||
caller=lambda: {
|
||||
)
|
||||
tcp_transport_kernel_send_buffer.register_hook(
|
||||
homeserver_instance_id=None,
|
||||
hook=lambda: {
|
||||
(p.name, p.server_name): transport_kernel_read_buffer_size(p, False)
|
||||
for p in connected_connections
|
||||
},
|
||||
@@ -582,7 +591,10 @@ tcp_transport_kernel_read_buffer = LaterGauge(
|
||||
name="synapse_replication_tcp_protocol_transport_kernel_read_buffer",
|
||||
desc="",
|
||||
labelnames=["name", SERVER_NAME_LABEL],
|
||||
caller=lambda: {
|
||||
)
|
||||
tcp_transport_kernel_read_buffer.register_hook(
|
||||
homeserver_instance_id=None,
|
||||
hook=lambda: {
|
||||
(p.name, p.server_name): transport_kernel_read_buffer_size(p, True)
|
||||
for p in connected_connections
|
||||
},
|
||||
|
||||
+3
-147
@@ -23,19 +23,10 @@
|
||||
import logging
|
||||
import re
|
||||
from collections import Counter
|
||||
from http import HTTPStatus
|
||||
from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple, Union
|
||||
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple
|
||||
|
||||
from typing_extensions import Self
|
||||
|
||||
from synapse._pydantic_compat import (
|
||||
StrictBool,
|
||||
StrictStr,
|
||||
validator,
|
||||
)
|
||||
from synapse.api.auth.mas import MasDelegatedAuth
|
||||
from synapse.api.errors import (
|
||||
Codes,
|
||||
InteractiveAuthIncompleteError,
|
||||
InvalidAPICallError,
|
||||
SynapseError,
|
||||
@@ -46,13 +37,11 @@ from synapse.http.servlet import (
|
||||
parse_integer,
|
||||
parse_json_object_from_request,
|
||||
parse_string,
|
||||
validate_json_object,
|
||||
)
|
||||
from synapse.http.site import SynapseRequest
|
||||
from synapse.logging.opentracing import log_kv, set_tag
|
||||
from synapse.rest.client._base import client_patterns, interactive_auth_handler
|
||||
from synapse.types import JsonDict, StreamToken
|
||||
from synapse.types.rest import RequestBodyModel
|
||||
from synapse.util.cancellation import cancellable
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -70,6 +59,7 @@ class KeyUploadServlet(RestServlet):
|
||||
"device_keys": {
|
||||
"user_id": "<user_id>",
|
||||
"device_id": "<device_id>",
|
||||
"valid_until_ts": <millisecond_timestamp>,
|
||||
"algorithms": [
|
||||
"m.olm.curve25519-aes-sha2",
|
||||
]
|
||||
@@ -121,123 +111,12 @@ class KeyUploadServlet(RestServlet):
|
||||
self._clock = hs.get_clock()
|
||||
self._store = hs.get_datastores().main
|
||||
|
||||
class KeyUploadRequestBody(RequestBodyModel):
|
||||
"""
|
||||
The body of a `POST /_matrix/client/v3/keys/upload` request.
|
||||
|
||||
Based on https://spec.matrix.org/v1.16/client-server-api/#post_matrixclientv3keysupload.
|
||||
"""
|
||||
|
||||
class DeviceKeys(RequestBodyModel):
|
||||
algorithms: List[StrictStr]
|
||||
"""The encryption algorithms supported by this device."""
|
||||
|
||||
device_id: StrictStr
|
||||
"""The ID of the device these keys belong to. Must match the device ID used when logging in."""
|
||||
|
||||
keys: Mapping[StrictStr, StrictStr]
|
||||
"""
|
||||
Public identity keys. The names of the properties should be in the
|
||||
format `<algorithm>:<device_id>`. The keys themselves should be encoded as
|
||||
specified by the key algorithm.
|
||||
"""
|
||||
|
||||
signatures: Mapping[StrictStr, Mapping[StrictStr, StrictStr]]
|
||||
"""Signatures for the device key object. A map from user ID, to a map from "<algorithm>:<device_id>" to the signature."""
|
||||
|
||||
user_id: StrictStr
|
||||
"""The ID of the user the device belongs to. Must match the user ID used when logging in."""
|
||||
|
||||
class KeyObject(RequestBodyModel):
|
||||
key: StrictStr
|
||||
"""The key, encoded using unpadded base64."""
|
||||
|
||||
fallback: Optional[StrictBool] = False
|
||||
"""Whether this is a fallback key. Only used when handling fallback keys."""
|
||||
|
||||
signatures: Mapping[StrictStr, Mapping[StrictStr, StrictStr]]
|
||||
"""Signature for the device. Mapped from user ID to another map of key signing identifier to the signature itself.
|
||||
|
||||
See the following for more detail: https://spec.matrix.org/v1.16/appendices/#signing-details
|
||||
"""
|
||||
|
||||
device_keys: Optional[DeviceKeys] = None
|
||||
"""Identity keys for the device. May be absent if no new identity keys are required."""
|
||||
|
||||
fallback_keys: Optional[Mapping[StrictStr, Union[StrictStr, KeyObject]]]
|
||||
"""
|
||||
The public key which should be used if the device's one-time keys are
|
||||
exhausted. The fallback key is not deleted once used, but should be
|
||||
replaced when additional one-time keys are being uploaded. The server
|
||||
will notify the client of the fallback key being used through `/sync`.
|
||||
|
||||
There can only be at most one key per algorithm uploaded, and the server
|
||||
will only persist one key per algorithm.
|
||||
|
||||
When uploading a signed key, an additional fallback: true key should be
|
||||
included to denote that the key is a fallback key.
|
||||
|
||||
May be absent if a new fallback key is not required.
|
||||
"""
|
||||
|
||||
@validator("fallback_keys", pre=True)
|
||||
def validate_fallback_keys(cls: Self, v: Any) -> Any:
|
||||
if v is None:
|
||||
return v
|
||||
if not isinstance(v, dict):
|
||||
raise TypeError("fallback_keys must be a mapping")
|
||||
|
||||
for k in v.keys():
|
||||
if not len(k.split(":")) == 2:
|
||||
raise SynapseError(
|
||||
code=HTTPStatus.BAD_REQUEST,
|
||||
errcode=Codes.BAD_JSON,
|
||||
msg=f"Invalid fallback_keys key {k!r}. "
|
||||
'Expected "<algorithm>:<device_id>".',
|
||||
)
|
||||
return v
|
||||
|
||||
one_time_keys: Optional[Mapping[StrictStr, Union[StrictStr, KeyObject]]] = None
|
||||
"""
|
||||
One-time public keys for "pre-key" messages. The names of the properties
|
||||
should be in the format `<algorithm>:<key_id>`.
|
||||
|
||||
The format of the key is determined by the key algorithm, see:
|
||||
https://spec.matrix.org/v1.16/client-server-api/#key-algorithms.
|
||||
"""
|
||||
|
||||
@validator("one_time_keys", pre=True)
|
||||
def validate_one_time_keys(cls: Self, v: Any) -> Any:
|
||||
if v is None:
|
||||
return v
|
||||
if not isinstance(v, dict):
|
||||
raise TypeError("one_time_keys must be a mapping")
|
||||
|
||||
for k, _ in v.items():
|
||||
if not len(k.split(":")) == 2:
|
||||
raise SynapseError(
|
||||
code=HTTPStatus.BAD_REQUEST,
|
||||
errcode=Codes.BAD_JSON,
|
||||
msg=f"Invalid one_time_keys key {k!r}. "
|
||||
'Expected "<algorithm>:<device_id>".',
|
||||
)
|
||||
return v
|
||||
|
||||
async def on_POST(
|
||||
self, request: SynapseRequest, device_id: Optional[str]
|
||||
) -> Tuple[int, JsonDict]:
|
||||
requester = await self.auth.get_user_by_req(request, allow_guest=True)
|
||||
user_id = requester.user.to_string()
|
||||
|
||||
# Parse the request body. Validate separately, as the handler expects a
|
||||
# plain dict, rather than any parsed object.
|
||||
#
|
||||
# Note: It would be nice to work with a parsed object, but the handler
|
||||
# needs to encode portions of the request body as canonical JSON before
|
||||
# storing the result in the DB. There's little point in converted to a
|
||||
# parsed object and then back to a dict.
|
||||
body = parse_json_object_from_request(request)
|
||||
validate_json_object(body, self.KeyUploadRequestBody)
|
||||
|
||||
if device_id is not None:
|
||||
# Providing the device_id should only be done for setting keys
|
||||
@@ -270,31 +149,8 @@ class KeyUploadServlet(RestServlet):
|
||||
400, "To upload keys, you must pass device_id when authenticating"
|
||||
)
|
||||
|
||||
if "device_keys" in body and isinstance(body["device_keys"], dict):
|
||||
# Validate the provided `user_id` and `device_id` fields in
|
||||
# `device_keys` match that of the requesting user. We can't do
|
||||
# this directly in the pydantic model as we don't have access
|
||||
# to the requester yet.
|
||||
#
|
||||
# TODO: We could use ValidationInfo when we switch to Pydantic v2.
|
||||
# https://docs.pydantic.dev/latest/concepts/validators/#validation-info
|
||||
if body["device_keys"].get("user_id") != user_id:
|
||||
raise SynapseError(
|
||||
code=HTTPStatus.BAD_REQUEST,
|
||||
errcode=Codes.BAD_JSON,
|
||||
msg="Provided `user_id` in `device_keys` does not match that of the authenticated user",
|
||||
)
|
||||
if body["device_keys"].get("device_id") != device_id:
|
||||
raise SynapseError(
|
||||
code=HTTPStatus.BAD_REQUEST,
|
||||
errcode=Codes.BAD_JSON,
|
||||
msg="Provided `device_id` in `device_keys` does not match that of the authenticated user device",
|
||||
)
|
||||
|
||||
result = await self.e2e_keys_handler.upload_keys_for_user(
|
||||
user_id=user_id,
|
||||
device_id=device_id,
|
||||
keys=body,
|
||||
user_id=user_id, device_id=device_id, keys=body
|
||||
)
|
||||
|
||||
return 200, result
|
||||
|
||||
@@ -363,6 +363,9 @@ class SyncRestServlet(RestServlet):
|
||||
|
||||
# https://github.com/matrix-org/matrix-doc/blob/54255851f642f84a4f1aaf7bc063eebe3d76752b/proposals/2732-olm-fallback-keys.md
|
||||
# states that this field should always be included, as long as the server supports the feature.
|
||||
response["org.matrix.msc2732.device_unused_fallback_key_types"] = (
|
||||
sync_result.device_unused_fallback_key_types
|
||||
)
|
||||
response["device_unused_fallback_key_types"] = (
|
||||
sync_result.device_unused_fallback_key_types
|
||||
)
|
||||
|
||||
+35
-1
@@ -129,7 +129,10 @@ from synapse.http.client import (
|
||||
)
|
||||
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
|
||||
from synapse.media.media_repository import MediaRepository
|
||||
from synapse.metrics import register_threadpool
|
||||
from synapse.metrics import (
|
||||
all_later_gauges_to_clean_up_on_shutdown,
|
||||
register_threadpool,
|
||||
)
|
||||
from synapse.metrics.common_usage_metrics import CommonUsageMetricsManager
|
||||
from synapse.module_api import ModuleApi
|
||||
from synapse.module_api.callbacks import ModuleApiCallbacks
|
||||
@@ -369,6 +372,37 @@ class HomeServer(metaclass=abc.ABCMeta):
|
||||
if self.config.worker.run_background_tasks:
|
||||
self.setup_background_tasks()
|
||||
|
||||
def __del__(self) -> None:
|
||||
"""
|
||||
Called when an the homeserver is garbage collected.
|
||||
|
||||
Make sure we actually do some clean-up, rather than leak data.
|
||||
"""
|
||||
self.cleanup()
|
||||
|
||||
def cleanup(self) -> None:
|
||||
"""
|
||||
WIP: Clean-up any references to the homeserver and stop any running related
|
||||
processes, timers, loops, replication stream, etc.
|
||||
|
||||
This should be called wherever you care about the HomeServer being completely
|
||||
garbage collected like in tests. It's not necessary to call if you plan to just
|
||||
shut down the whole Python process anyway.
|
||||
|
||||
Can be called multiple times.
|
||||
"""
|
||||
logger.info("Received cleanup request for %s.", self.hostname)
|
||||
|
||||
# TODO: Stop background processes, timers, loops, replication stream, etc.
|
||||
|
||||
# Cleanup metrics associated with the homeserver
|
||||
for later_gauge in all_later_gauges_to_clean_up_on_shutdown.values():
|
||||
later_gauge.unregister_hooks_for_homeserver_instance_id(
|
||||
self.get_instance_id()
|
||||
)
|
||||
|
||||
logger.info("Cleanup complete for %s.", self.hostname)
|
||||
|
||||
def start_listening(self) -> None: # noqa: B027 (no-op by design)
|
||||
"""Start the HTTP, manhole, metrics, etc listeners
|
||||
|
||||
|
||||
@@ -63,6 +63,7 @@ from synapse.logging.opentracing import (
|
||||
)
|
||||
from synapse.metrics import SERVER_NAME_LABEL
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.storage import DataStore
|
||||
from synapse.storage.controllers.state import StateStorageController
|
||||
from synapse.storage.databases import Databases
|
||||
from synapse.storage.databases.main.events import DeltaState
|
||||
@@ -616,6 +617,12 @@ class EventsPersistenceStorageController:
|
||||
if not events_and_contexts:
|
||||
return replaced_events
|
||||
|
||||
# TODO massive hack
|
||||
if events_and_contexts[0][0].stitched_ordering is None:
|
||||
await assign_stitched_orders(
|
||||
room_id, [ev for (ev, _) in events_and_contexts], self.main_store
|
||||
)
|
||||
|
||||
chunks = [
|
||||
events_and_contexts[x : x + 100]
|
||||
for x in range(0, len(events_and_contexts), 100)
|
||||
@@ -1241,3 +1248,120 @@ class EventsPersistenceStorageController:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def find_predecessors(event_ids: Iterable[str], batch: List[EventBase]) -> Set[str]:
|
||||
"""
|
||||
Walk the tree of dependencies (in batch), and return every event that is
|
||||
in batch, and is an ancestor of one of the supplied events.
|
||||
"""
|
||||
found = set()
|
||||
unexplored = set(event_ids)
|
||||
while len(unexplored) > 0:
|
||||
next_unexplored: Set[str] = set()
|
||||
|
||||
# Iterate through the incoming events, looking for events in our "unexplored"
|
||||
# set. For each matching event, add it to the "found" set, and add its
|
||||
# "prev_events" to the "unexplored" set for the next pass.
|
||||
for event in batch:
|
||||
if event.event_id in unexplored:
|
||||
found.add(event.event_id)
|
||||
next_unexplored.update(
|
||||
(
|
||||
event_id
|
||||
for event_id in event.prev_event_ids()
|
||||
if event_id not in found
|
||||
)
|
||||
)
|
||||
|
||||
unexplored = next_unexplored
|
||||
|
||||
return found
|
||||
|
||||
|
||||
async def assign_stitched_orders(
|
||||
room_id: str,
|
||||
events: List[EventBase],
|
||||
store: DataStore,
|
||||
) -> None:
|
||||
"""
|
||||
Updates the events within `events`, to assign a
|
||||
stitched_ordering to each event.
|
||||
"""
|
||||
# Take a copy of the events we have to process
|
||||
# TODO find a better way to exclude outliers
|
||||
remaining_batch = list(ev for ev in events if not ev.internal_metadata.is_outlier())
|
||||
|
||||
# Find all events in the current batch which are in a timeline gap
|
||||
gap_events = await store.db_pool.simple_select_many_batch(
|
||||
"event_backward_extremities",
|
||||
"event_id",
|
||||
(ev.event_id for ev in remaining_batch),
|
||||
["event_id", "before_gap_event_id"],
|
||||
)
|
||||
|
||||
# TODO matching against gaps is pointless here
|
||||
# TODO sort gap_events by DAG;received order
|
||||
for gap_event, before_gap_event_id in gap_events:
|
||||
logger.debug("Processing received gap event %s", gap_event)
|
||||
|
||||
matching_events = [gap_event] # TODO find other events in the same gap
|
||||
|
||||
# Find all predecessors of those events in the batch
|
||||
to_insert = find_predecessors(matching_events, remaining_batch)
|
||||
|
||||
logger.debug("Processing to_insert set %s", to_insert)
|
||||
|
||||
# Find the stitched order of the event before the gap
|
||||
# TODO consider doing this with a join
|
||||
previous_event_stitched_order = await store.db_pool.simple_select_one_onecol(
|
||||
"events",
|
||||
{"event_id": before_gap_event_id},
|
||||
"stitched_ordering",
|
||||
True,
|
||||
)
|
||||
|
||||
logger.debug(
|
||||
"Previous event stitched_ordering = %i", previous_event_stitched_order
|
||||
)
|
||||
|
||||
# if previous_event_stitched_order is None, that means we have a room
|
||||
# where there are existing events or gaps without assigned stitched orders.
|
||||
# Let's give up trying to assign stitched orders here.
|
||||
if previous_event_stitched_order is None:
|
||||
# TODO do something better here
|
||||
logger.warning(
|
||||
"Found gap event %s without assigned stitched order: bailing",
|
||||
gap_event,
|
||||
)
|
||||
return
|
||||
|
||||
still_remaining_batch = []
|
||||
for event in remaining_batch:
|
||||
if event.event_id not in to_insert:
|
||||
still_remaining_batch.append(event)
|
||||
continue
|
||||
|
||||
# TODO we may need to reorder existing events
|
||||
previous_event_stitched_order += 1
|
||||
event.assign_stitched_ordering(previous_event_stitched_order)
|
||||
logger.debug(
|
||||
"Persisting inserted events with stitched_order=%i",
|
||||
previous_event_stitched_order,
|
||||
)
|
||||
|
||||
remaining_batch = still_remaining_batch
|
||||
logger.debug("Remaining events: %s", [ev.event_id for ev in remaining_batch])
|
||||
|
||||
logger.debug(
|
||||
"Remaining events after processing gap matches: %s",
|
||||
[ev.event_id for ev in remaining_batch],
|
||||
)
|
||||
|
||||
current_max_stream_ordering = (
|
||||
await store.get_room_max_stitched_ordering(room_id) or 0
|
||||
)
|
||||
|
||||
for event in remaining_batch:
|
||||
current_max_stream_ordering += 2**16
|
||||
event.assign_stitched_ordering(current_max_stream_ordering)
|
||||
|
||||
@@ -682,8 +682,6 @@ class StateStorageController:
|
||||
- the stream id which these results go up to
|
||||
- list of current_state_delta_stream rows. If it is empty, we are
|
||||
up to date.
|
||||
|
||||
A maximum of 100 rows will be returned.
|
||||
"""
|
||||
# FIXME(faster_joins): what do we do here?
|
||||
# https://github.com/matrix-org/synapse/issues/13008
|
||||
|
||||
@@ -61,7 +61,7 @@ from synapse.logging.context import (
|
||||
current_context,
|
||||
make_deferred_yieldable,
|
||||
)
|
||||
from synapse.metrics import SERVER_NAME_LABEL, LaterGauge, register_threadpool
|
||||
from synapse.metrics import SERVER_NAME_LABEL, register_threadpool
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.storage.background_updates import BackgroundUpdater
|
||||
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
|
||||
@@ -611,12 +611,6 @@ class DatabasePool:
|
||||
)
|
||||
|
||||
self.updates = BackgroundUpdater(hs, self)
|
||||
LaterGauge(
|
||||
name="synapse_background_update_status",
|
||||
desc="Background update status",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
caller=lambda: {(self.server_name,): self.updates.get_status()},
|
||||
)
|
||||
|
||||
self._previous_txn_total_time = 0.0
|
||||
self._current_txn_total_time = 0.0
|
||||
|
||||
@@ -22,6 +22,7 @@
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Generic, List, Optional, Type, TypeVar
|
||||
|
||||
from synapse.metrics import SERVER_NAME_LABEL, LaterGauge
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.database import DatabasePool, make_conn
|
||||
from synapse.storage.databases.main.events import PersistEventsStore
|
||||
@@ -40,6 +41,13 @@ logger = logging.getLogger(__name__)
|
||||
DataStoreT = TypeVar("DataStoreT", bound=SQLBaseStore, covariant=True)
|
||||
|
||||
|
||||
background_update_status = LaterGauge(
|
||||
name="synapse_background_update_status",
|
||||
desc="Background update status",
|
||||
labelnames=["database_name", SERVER_NAME_LABEL],
|
||||
)
|
||||
|
||||
|
||||
class Databases(Generic[DataStoreT]):
|
||||
"""The various databases.
|
||||
|
||||
@@ -143,6 +151,15 @@ class Databases(Generic[DataStoreT]):
|
||||
|
||||
db_conn.close()
|
||||
|
||||
# Track the background update status for each database
|
||||
background_update_status.register_hook(
|
||||
homeserver_instance_id=hs.get_instance_id(),
|
||||
hook=lambda: {
|
||||
(database.name(), server_name): database.updates.get_status()
|
||||
for database in self.databases
|
||||
},
|
||||
)
|
||||
|
||||
# Sanity check that we have actually configured all the required stores.
|
||||
if not main:
|
||||
raise Exception("No 'main' database configured")
|
||||
|
||||
@@ -182,21 +182,6 @@ class DelayedEventsStore(SQLBaseStore):
|
||||
"restart_delayed_event", restart_delayed_event_txn
|
||||
)
|
||||
|
||||
async def get_count_of_delayed_events(self) -> int:
|
||||
"""Returns the number of pending delayed events in the DB."""
|
||||
|
||||
def _get_count_of_delayed_events(txn: LoggingTransaction) -> int:
|
||||
sql = "SELECT count(*) FROM delayed_events"
|
||||
|
||||
txn.execute(sql)
|
||||
resp = txn.fetchone()
|
||||
return resp[0] if resp is not None else 0
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_count_of_delayed_events",
|
||||
_get_count_of_delayed_events,
|
||||
)
|
||||
|
||||
async def get_all_delayed_events_for_user(
|
||||
self,
|
||||
user_localpart: str,
|
||||
|
||||
@@ -2596,6 +2596,10 @@ class PersistEventsStore:
|
||||
# scenario. XXX: does this cause bugs? It will mean we won't send such
|
||||
# events down /sync. In general they will be historical events, so that
|
||||
# doesn't matter too much, but that is not always the case.
|
||||
#
|
||||
# On the other hand, we *will* assign a stitched ordering at this point.
|
||||
# Outliers are not assigned stitched orderings when they are first
|
||||
# persisted as outliers.
|
||||
|
||||
logger.info(
|
||||
"_update_outliers_txn: Updating state for ex-outlier event %s",
|
||||
@@ -2624,12 +2628,12 @@ class PersistEventsStore:
|
||||
},
|
||||
)
|
||||
|
||||
sql = "UPDATE events SET outlier = FALSE WHERE event_id = ?"
|
||||
txn.execute(sql, (event.event_id,))
|
||||
sql = "UPDATE events SET outlier = FALSE, stitched_ordering = ? WHERE event_id = ?"
|
||||
txn.execute(sql, (event.stitched_ordering, event.event_id,))
|
||||
|
||||
# Update the event_backward_extremities table now that this
|
||||
# event isn't an outlier any more.
|
||||
self._update_backward_extremeties(txn, [event])
|
||||
self._update_backward_extremeties(txn, [(event, context)])
|
||||
|
||||
return [ec for ec in events_and_contexts if ec[0] not in to_remove]
|
||||
|
||||
@@ -2687,6 +2691,7 @@ class PersistEventsStore:
|
||||
"contains_url",
|
||||
"state_key",
|
||||
"rejection_reason",
|
||||
"stitched_ordering",
|
||||
),
|
||||
values=[
|
||||
(
|
||||
@@ -2705,6 +2710,7 @@ class PersistEventsStore:
|
||||
"url" in event.content and isinstance(event.content["url"], str),
|
||||
event.get_state_key(),
|
||||
context.rejected,
|
||||
event.stitched_ordering,
|
||||
)
|
||||
for event, context in events_and_contexts
|
||||
],
|
||||
@@ -2811,7 +2817,8 @@ class PersistEventsStore:
|
||||
# Update the event_forward_extremities, event_backward_extremities and
|
||||
# event_edges tables.
|
||||
self._handle_mult_prev_events(
|
||||
txn, events=[event for event, _ in events_and_contexts]
|
||||
txn,
|
||||
events_and_contexts,
|
||||
)
|
||||
|
||||
for event, _ in events_and_contexts:
|
||||
@@ -3517,7 +3524,9 @@ class PersistEventsStore:
|
||||
)
|
||||
|
||||
def _handle_mult_prev_events(
|
||||
self, txn: LoggingTransaction, events: List[EventBase]
|
||||
self,
|
||||
txn: LoggingTransaction,
|
||||
events_and_contexts: List[EventPersistencePair],
|
||||
) -> None:
|
||||
"""
|
||||
For the given event, update the event edges table and forward and
|
||||
@@ -3528,14 +3537,18 @@ class PersistEventsStore:
|
||||
table="event_edges",
|
||||
keys=("event_id", "prev_event_id"),
|
||||
values=[
|
||||
(ev.event_id, e_id) for ev in events for e_id in ev.prev_event_ids()
|
||||
(ev.event_id, e_id)
|
||||
for (ev, _) in events_and_contexts
|
||||
for e_id in ev.prev_event_ids()
|
||||
],
|
||||
)
|
||||
|
||||
self._update_backward_extremeties(txn, events)
|
||||
self._update_backward_extremeties(txn, events_and_contexts)
|
||||
|
||||
def _update_backward_extremeties(
|
||||
self, txn: LoggingTransaction, events: List[EventBase]
|
||||
self,
|
||||
txn: LoggingTransaction,
|
||||
events_and_contexts: List[EventPersistencePair],
|
||||
) -> None:
|
||||
"""Updates the event_backward_extremities tables based on the new/updated
|
||||
events being persisted.
|
||||
@@ -3546,45 +3559,73 @@ class PersistEventsStore:
|
||||
Forward extremities are handled when we first start persisting the events.
|
||||
"""
|
||||
|
||||
room_id = events[0].room_id
|
||||
room_id = events_and_contexts[0][0].room_id
|
||||
|
||||
potential_backwards_extremities = {
|
||||
e_id
|
||||
for ev in events
|
||||
for e_id in ev.prev_event_ids()
|
||||
if not ev.internal_metadata.is_outlier()
|
||||
}
|
||||
# Map from missing event ID, to the lowest stitched order of the events that reference it.
|
||||
potential_backwards_extremities: Dict[str, Optional[int]] = {}
|
||||
for ev, ctx in events_and_contexts:
|
||||
if ev.internal_metadata.is_outlier():
|
||||
continue
|
||||
|
||||
for prev_event in ev.prev_event_ids():
|
||||
lowest_referring_ordering = potential_backwards_extremities.get(
|
||||
"prev_event"
|
||||
)
|
||||
persisted_event_stitched_ordering = ev.stitched_ordering
|
||||
|
||||
# If any of the events we persisted did not get assigned a stitched order,
|
||||
# we cannot yet assign a stitched order to the backwards extremity either.
|
||||
if persisted_event_stitched_ordering is None:
|
||||
potential_backwards_extremities[prev_event] = None
|
||||
continue
|
||||
|
||||
if lowest_referring_ordering is None:
|
||||
lowest_referring_ordering = persisted_event_stitched_ordering
|
||||
else:
|
||||
lowest_referring_ordering = min(
|
||||
lowest_referring_ordering, persisted_event_stitched_ordering
|
||||
)
|
||||
potential_backwards_extremities[prev_event] = lowest_referring_ordering
|
||||
|
||||
if not potential_backwards_extremities:
|
||||
return
|
||||
|
||||
existing_events_outliers = self.db_pool.simple_select_many_txn(
|
||||
# Filter potential_backwards_extremities to remove events that are in the
|
||||
# table.
|
||||
existing_events = self.db_pool.simple_select_many_txn(
|
||||
txn,
|
||||
table="events",
|
||||
column="event_id",
|
||||
iterable=potential_backwards_extremities,
|
||||
iterable=potential_backwards_extremities.keys(),
|
||||
keyvalues={"outlier": False},
|
||||
retcols=("event_id",),
|
||||
)
|
||||
for (ev,) in existing_events:
|
||||
del potential_backwards_extremities[ev]
|
||||
|
||||
potential_backwards_extremities.difference_update(
|
||||
e for (e,) in existing_events_outliers
|
||||
)
|
||||
|
||||
if potential_backwards_extremities:
|
||||
self.db_pool.simple_upsert_many_txn(
|
||||
txn,
|
||||
table="event_backward_extremities",
|
||||
key_names=("room_id", "event_id"),
|
||||
key_values=[(room_id, ev) for ev in potential_backwards_extremities],
|
||||
value_names=(),
|
||||
value_values=(),
|
||||
for (
|
||||
backward_extremity,
|
||||
lowest_referring_ordering,
|
||||
) in potential_backwards_extremities.items():
|
||||
before_gap_event_id = self._find_before_gap_event_id(
|
||||
txn, room_id, backward_extremity, lowest_referring_ordering
|
||||
)
|
||||
|
||||
self.db_pool.simple_upsert_txn(
|
||||
txn,
|
||||
table="event_backward_extremities",
|
||||
keyvalues={
|
||||
"room_id": room_id,
|
||||
"event_id": backward_extremity,
|
||||
},
|
||||
values={"before_gap_event_id": before_gap_event_id},
|
||||
)
|
||||
|
||||
if potential_backwards_extremities:
|
||||
# Record the stream orderings where we have new gaps.
|
||||
gap_events = [
|
||||
(room_id, self._instance_name, ev.internal_metadata.stream_ordering)
|
||||
for ev in events
|
||||
for (ev, _) in events_and_contexts
|
||||
if any(
|
||||
e_id in potential_backwards_extremities
|
||||
for e_id in ev.prev_event_ids()
|
||||
@@ -3605,7 +3646,7 @@ class PersistEventsStore:
|
||||
)
|
||||
backward_extremity_tuples_to_remove = [
|
||||
(ev.event_id, ev.room_id)
|
||||
for ev in events
|
||||
for (ev, _) in events_and_contexts
|
||||
if not ev.internal_metadata.is_outlier()
|
||||
# If we encountered an event with no prev_events, then we might
|
||||
# as well remove it now because it won't ever have anything else
|
||||
@@ -3630,6 +3671,90 @@ class PersistEventsStore:
|
||||
backward_extremity_tuples_to_remove,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _find_before_gap_event_id(
|
||||
txn: LoggingTransaction,
|
||||
room_id: str,
|
||||
backward_extremity_event_id: str,
|
||||
lowest_referring_ordering: Optional[int],
|
||||
) -> Optional[str]:
|
||||
"""
|
||||
Figure out where in the stitched order a gap (or backwards extremity) belongs.
|
||||
|
||||
The result is in terms of the event that precedes the gap in the ordering.
|
||||
"None" means we were unable to find a preceding event, which should only
|
||||
happen if the create event was not assigned a stitched ordering.
|
||||
|
||||
We check if the backwards extremity already exists in the database, at an
|
||||
earlier ordering than that implied by `lowest_referring_ordering`, and if
|
||||
so return that location. Otherwise, we return the event before
|
||||
`lowest_referring_ordering`.
|
||||
|
||||
Args:
|
||||
txn
|
||||
room_id: ID of the room that the gap is in
|
||||
backward_extremity_event_id: Event ID of the backwards extremity (i.e.
|
||||
an event that is not in our database).
|
||||
lowest_referring_ordering: The lowest stitched ordering of all the events
|
||||
that we have just inserted, that refer to this backwards extremity.
|
||||
"""
|
||||
(new_before_gap_event_id, new_previous_stitched_ordering) = (None, None)
|
||||
|
||||
if lowest_referring_ordering is not None:
|
||||
# Given the lowest stitched ordering of all the events that we have just
|
||||
# inserted, find the previous event (by stitched ordering); the gap
|
||||
# will likely come just afterwards.
|
||||
#
|
||||
# Note: we include "AND event_id <> backward_extremity_event_id" because
|
||||
# if this backward extremity is actually an outlier, then that event
|
||||
# does exist in events, but we don't want to find it.
|
||||
txn.execute(
|
||||
"""
|
||||
SELECT event_id, stitched_ordering FROM events
|
||||
WHERE room_id = ? AND stitched_ordering < ? AND event_id <> ?
|
||||
ORDER BY stitched_ordering DESC LIMIT 1
|
||||
""",
|
||||
[room_id, lowest_referring_ordering, backward_extremity_event_id],
|
||||
)
|
||||
row = txn.fetchone()
|
||||
if row is not None:
|
||||
(new_before_gap_event_id, new_previous_stitched_ordering) = row
|
||||
|
||||
# If this is an existing backwards extremity, see where it currently
|
||||
# exists in the order.
|
||||
txn.execute(
|
||||
"""
|
||||
SELECT events.event_id, events.stitched_ordering FROM
|
||||
event_backward_extremities LEFT JOIN events ON
|
||||
events.event_id = event_backward_extremities.before_gap_event_id
|
||||
WHERE event_backward_extremities.event_id = ?
|
||||
""",
|
||||
[backward_extremity_event_id],
|
||||
)
|
||||
row = txn.fetchone()
|
||||
|
||||
if row is None:
|
||||
# Not an existing backwards extremity: use our new before_gap_event_id
|
||||
return new_before_gap_event_id
|
||||
|
||||
(existing_before_gap_id, existing_previous_stitched_ordering) = row
|
||||
|
||||
# If the existing backwards extremity has not yet been assigned a
|
||||
# stream ordering, use our new before_gap_event_id.
|
||||
if existing_previous_stitched_ordering is None:
|
||||
return new_before_gap_event_id
|
||||
|
||||
# This is an existing backwards extremity with an assigned stitched ordering.
|
||||
# Leave it as-is unless we have successfully calculated a new stitched ordering
|
||||
# which is lower than the existing.
|
||||
if (
|
||||
new_previous_stitched_ordering is not None
|
||||
and new_previous_stitched_ordering < existing_previous_stitched_ordering
|
||||
):
|
||||
return new_before_gap_event_id
|
||||
else:
|
||||
return existing_previous_stitched_ordering
|
||||
|
||||
|
||||
@attr.s(slots=True, auto_attribs=True)
|
||||
class _LinkMap:
|
||||
|
||||
@@ -2135,39 +2135,6 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
|
||||
return rows, to_token, True
|
||||
|
||||
async def get_senders_for_event_ids(
|
||||
self, event_ids: Collection[str]
|
||||
) -> Dict[str, Optional[str]]:
|
||||
"""
|
||||
Given a sequence of event IDs, return the sender associated with each.
|
||||
|
||||
Args:
|
||||
event_ids: A collection of event IDs as strings.
|
||||
|
||||
Returns:
|
||||
A dict of event ID -> sender of the event.
|
||||
|
||||
If a given event ID does not exist in the `events` table, then no entry
|
||||
for that event ID will be returned.
|
||||
"""
|
||||
|
||||
def _get_senders_for_event_ids(
|
||||
txn: LoggingTransaction,
|
||||
) -> Dict[str, Optional[str]]:
|
||||
rows = self.db_pool.simple_select_many_txn(
|
||||
txn=txn,
|
||||
table="events",
|
||||
column="event_id",
|
||||
iterable=event_ids,
|
||||
keyvalues={},
|
||||
retcols=["event_id", "sender"],
|
||||
)
|
||||
return dict(rows)
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_senders_for_event_ids", _get_senders_for_event_ids
|
||||
)
|
||||
|
||||
@cached(max_entries=5000)
|
||||
async def get_event_ordering(self, event_id: str, room_id: str) -> Tuple[int, int]:
|
||||
res = await self.db_pool.simple_select_one(
|
||||
@@ -2788,3 +2755,21 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
sender=row[0],
|
||||
received_ts=row[1],
|
||||
)
|
||||
|
||||
async def get_room_max_stitched_ordering(self, room_id: str) -> Optional[int]:
|
||||
"""Get the maximum stitched order for any event currently in the room.
|
||||
|
||||
If no events in this room have an assigned stitched order, returns None.
|
||||
"""
|
||||
|
||||
def get_room_max_stitched_ordering_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> Optional[int]:
|
||||
sql = "SELECT MAX(stitched_ordering) FROM events WHERE room_id=?"
|
||||
txn.execute(sql, [room_id])
|
||||
ret = [r[0] for r in txn]
|
||||
return ret[0]
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_room_max_stitched_ordering", get_room_max_stitched_ordering_txn
|
||||
)
|
||||
|
||||
@@ -84,6 +84,13 @@ _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME = "current_state_events_membership"
|
||||
_POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE = 1000
|
||||
|
||||
|
||||
federation_known_servers_gauge = LaterGauge(
|
||||
name="synapse_federation_known_servers",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
)
|
||||
|
||||
|
||||
@attr.s(frozen=True, slots=True, auto_attribs=True)
|
||||
class EventIdMembership:
|
||||
"""Returned by `get_membership_from_event_ids`"""
|
||||
@@ -116,11 +123,9 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
|
||||
1,
|
||||
self._count_known_servers,
|
||||
)
|
||||
LaterGauge(
|
||||
name="synapse_federation_known_servers",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
caller=lambda: {(self.server_name,): self._known_servers_count},
|
||||
federation_known_servers_gauge.register_hook(
|
||||
homeserver_instance_id=hs.get_instance_id(),
|
||||
hook=lambda: {(self.server_name,): self._known_servers_count},
|
||||
)
|
||||
|
||||
@wrap_as_background_process("_count_known_servers")
|
||||
|
||||
@@ -94,8 +94,6 @@ class StateDeltasStore(SQLBaseStore):
|
||||
- the stream id which these results go up to
|
||||
- list of current_state_delta_stream rows. If it is empty, we are
|
||||
up to date.
|
||||
|
||||
A maximum of 100 rows will be returned.
|
||||
"""
|
||||
prev_stream_id = int(prev_stream_id)
|
||||
|
||||
|
||||
@@ -0,0 +1,40 @@
|
||||
--
|
||||
-- This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
--
|
||||
-- Copyright (C) 2025 New Vector, Ltd
|
||||
--
|
||||
-- This program is free software: you can redistribute it and/or modify
|
||||
-- it under the terms of the GNU Affero General Public License as
|
||||
-- published by the Free Software Foundation, either version 3 of the
|
||||
-- License, or (at your option) any later version.
|
||||
--
|
||||
-- See the GNU Affero General Public License for more details:
|
||||
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
|
||||
ALTER TABLE events ADD COLUMN stitched_ordering BIGINT;
|
||||
CREATE UNIQUE INDEX events_stitched_order ON events(room_id, stitched_ordering); -- TODO make this concurrent
|
||||
|
||||
--CREATE TABLE stitched_ordering_gaps (
|
||||
-- room_id TEXT NOT NULL,
|
||||
-- following_event_id TEXT NOT NULL,
|
||||
-- missing_event_id TEXT NOT NULL,
|
||||
-- UNIQUE (room_id, following_event_id, missing_event_id)
|
||||
--);
|
||||
--CREATE INDEX stitched_ordering_gaps_missing_events ON stitched_ordering_gaps(room_id, missing_event_id);
|
||||
|
||||
-- Gaps in the stitched ordering are equivalent to a group of backward extremities that appear at
|
||||
-- the same point in the stitched ordering.
|
||||
--
|
||||
-- Rather than explicitly tracking where in the stitched ordering a given gap appears, we record the
|
||||
-- event id of the event that comes *before* the gap in the stitched ordering. Doing so means that:
|
||||
--
|
||||
-- 1. There is only one table that has a `stitched_ordering` column, making it easier to figure out
|
||||
-- how to insert a batch of events between existing events (and making the UNIQUE constraint effective).
|
||||
--
|
||||
-- 2. We don't need to allocate space in the stitched ordering for gaps; in particular we can assign an order
|
||||
-- to gaps *after* we have persisted the events. (We could probably work around this by double-spacing inserted
|
||||
-- events? but still, it's a nice property)
|
||||
--
|
||||
-- Note that this assumes that we never need to insert an event *before* a gap (or if we did,
|
||||
-- we'd have to update this table).
|
||||
ALTER TABLE event_backward_extremities ADD COLUMN before_gap_event_id TEXT REFERENCES events (event_id);
|
||||
@@ -131,22 +131,28 @@ def _get_counts_from_rate_limiter_instance(
|
||||
# We track the number of affected hosts per time-period so we can
|
||||
# differentiate one really noisy homeserver from a general
|
||||
# ratelimit tuning problem across the federation.
|
||||
LaterGauge(
|
||||
sleep_affected_hosts_gauge = LaterGauge(
|
||||
name="synapse_rate_limit_sleep_affected_hosts",
|
||||
desc="Number of hosts that had requests put to sleep",
|
||||
labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
|
||||
caller=lambda: _get_counts_from_rate_limiter_instance(
|
||||
)
|
||||
sleep_affected_hosts_gauge.register_hook(
|
||||
homeserver_instance_id=None,
|
||||
hook=lambda: _get_counts_from_rate_limiter_instance(
|
||||
lambda rate_limiter_instance: sum(
|
||||
ratelimiter.should_sleep()
|
||||
for ratelimiter in rate_limiter_instance.ratelimiters.values()
|
||||
)
|
||||
),
|
||||
)
|
||||
LaterGauge(
|
||||
reject_affected_hosts_gauge = LaterGauge(
|
||||
name="synapse_rate_limit_reject_affected_hosts",
|
||||
desc="Number of hosts that had requests rejected",
|
||||
labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
|
||||
caller=lambda: _get_counts_from_rate_limiter_instance(
|
||||
)
|
||||
reject_affected_hosts_gauge.register_hook(
|
||||
homeserver_instance_id=None,
|
||||
hook=lambda: _get_counts_from_rate_limiter_instance(
|
||||
lambda rate_limiter_instance: sum(
|
||||
ratelimiter.should_reject()
|
||||
for ratelimiter in rate_limiter_instance.ratelimiters.values()
|
||||
|
||||
@@ -1,21 +0,0 @@
|
||||
#
|
||||
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
#
|
||||
# Copyright (C) 2025 New Vector, Ltd
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# See the GNU Affero General Public License for more details:
|
||||
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
#
|
||||
|
||||
import enum
|
||||
|
||||
|
||||
class Sentinel(enum.Enum):
|
||||
# defining a sentinel in this way allows mypy to correctly handle the
|
||||
# type of a dictionary lookup and subsequent type narrowing.
|
||||
UNSET_SENTINEL = object()
|
||||
@@ -44,6 +44,13 @@ if TYPE_CHECKING:
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
running_tasks_gauge = LaterGauge(
|
||||
name="synapse_scheduler_running_tasks",
|
||||
desc="The number of concurrent running tasks handled by the TaskScheduler",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
)
|
||||
|
||||
|
||||
class TaskScheduler:
|
||||
"""
|
||||
This is a simple task scheduler designed for resumable tasks. Normally,
|
||||
@@ -130,11 +137,9 @@ class TaskScheduler:
|
||||
TaskScheduler.SCHEDULE_INTERVAL_MS,
|
||||
)
|
||||
|
||||
LaterGauge(
|
||||
name="synapse_scheduler_running_tasks",
|
||||
desc="The number of concurrent running tasks handled by the TaskScheduler",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
caller=lambda: {(self.server_name,): len(self._running_tasks)},
|
||||
running_tasks_gauge.register_hook(
|
||||
homeserver_instance_id=hs.get_instance_id(),
|
||||
hook=lambda: {(self.server_name,): len(self._running_tasks)},
|
||||
)
|
||||
|
||||
def register_action(
|
||||
|
||||
@@ -410,6 +410,7 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
|
||||
device_id = "xyz"
|
||||
fallback_key = {"alg1:k1": "fallback_key1"}
|
||||
fallback_key2 = {"alg1:k2": "fallback_key2"}
|
||||
fallback_key3 = {"alg1:k2": "fallback_key3"}
|
||||
otk = {"alg1:k2": "key2"}
|
||||
|
||||
# we shouldn't have any unused fallback keys yet
|
||||
@@ -530,6 +531,28 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
|
||||
{"failures": {}, "one_time_keys": {local_user: {device_id: fallback_key2}}},
|
||||
)
|
||||
|
||||
# using the unstable prefix should also set the fallback key
|
||||
self.get_success(
|
||||
self.handler.upload_keys_for_user(
|
||||
local_user,
|
||||
device_id,
|
||||
{"org.matrix.msc2732.fallback_keys": fallback_key3},
|
||||
)
|
||||
)
|
||||
|
||||
claim_res = self.get_success(
|
||||
self.handler.claim_one_time_keys(
|
||||
{local_user: {device_id: {"alg1": 1}}},
|
||||
self.requester,
|
||||
timeout=None,
|
||||
always_include_fallback_keys=False,
|
||||
)
|
||||
)
|
||||
self.assertEqual(
|
||||
claim_res,
|
||||
{"failures": {}, "one_time_keys": {local_user: {device_id: fallback_key3}}},
|
||||
)
|
||||
|
||||
def test_fallback_key_bulk(self) -> None:
|
||||
"""Like test_fallback_key, but claims multiple keys in one handler call."""
|
||||
alice = f"@alice:{self.hs.hostname}"
|
||||
|
||||
@@ -18,11 +18,18 @@
|
||||
# [This file includes modifications made by New Vector Limited]
|
||||
#
|
||||
#
|
||||
from typing import Dict, Protocol, Tuple
|
||||
from typing import Dict, NoReturn, Protocol, Tuple
|
||||
|
||||
from prometheus_client.core import Sample
|
||||
|
||||
from synapse.metrics import REGISTRY, InFlightGauge, generate_latest
|
||||
from synapse.metrics import (
|
||||
REGISTRY,
|
||||
SERVER_NAME_LABEL,
|
||||
InFlightGauge,
|
||||
LaterGauge,
|
||||
all_later_gauges_to_clean_up_on_shutdown,
|
||||
generate_latest,
|
||||
)
|
||||
from synapse.util.caches.deferred_cache import DeferredCache
|
||||
|
||||
from tests import unittest
|
||||
@@ -285,6 +292,95 @@ class CacheMetricsTests(unittest.HomeserverTestCase):
|
||||
self.assertEqual(hs2_cache_max_size_metric_value, "777.0")
|
||||
|
||||
|
||||
class LaterGaugeTests(unittest.HomeserverTestCase):
|
||||
def setUp(self) -> None:
|
||||
super().setUp()
|
||||
self.later_gauge = LaterGauge(
|
||||
name="foo",
|
||||
desc="",
|
||||
labelnames=[SERVER_NAME_LABEL],
|
||||
)
|
||||
|
||||
def tearDown(self) -> None:
|
||||
super().tearDown()
|
||||
|
||||
REGISTRY.unregister(self.later_gauge)
|
||||
all_later_gauges_to_clean_up_on_shutdown.pop(self.later_gauge.name, None)
|
||||
|
||||
def test_later_gauge_multiple_servers(self) -> None:
|
||||
"""
|
||||
Test that LaterGauge metrics are reported correctly across multiple servers. We
|
||||
will have an metrics entry for each homeserver that is labeled with the
|
||||
`server_name` label.
|
||||
"""
|
||||
self.later_gauge.register_hook(
|
||||
homeserver_instance_id="123", hook=lambda: {("hs1",): 1}
|
||||
)
|
||||
self.later_gauge.register_hook(
|
||||
homeserver_instance_id="456", hook=lambda: {("hs2",): 2}
|
||||
)
|
||||
|
||||
metrics_map = get_latest_metrics()
|
||||
|
||||
# Find the metrics from both homeservers
|
||||
hs1_metric = 'foo{server_name="hs1"}'
|
||||
hs1_metric_value = metrics_map.get(hs1_metric)
|
||||
self.assertIsNotNone(
|
||||
hs1_metric_value,
|
||||
f"Missing metric {hs1_metric} in metrics {metrics_map}",
|
||||
)
|
||||
self.assertEqual(hs1_metric_value, "1.0")
|
||||
|
||||
hs2_metric = 'foo{server_name="hs2"}'
|
||||
hs2_metric_value = metrics_map.get(hs2_metric)
|
||||
self.assertIsNotNone(
|
||||
hs2_metric_value,
|
||||
f"Missing metric {hs2_metric} in metrics {metrics_map}",
|
||||
)
|
||||
self.assertEqual(hs2_metric_value, "2.0")
|
||||
|
||||
def test_later_gauge_hook_exception(self) -> None:
|
||||
"""
|
||||
Test that LaterGauge metrics are collected across multiple servers even if one
|
||||
hooks is throwing an exception.
|
||||
"""
|
||||
|
||||
def raise_exception() -> NoReturn:
|
||||
raise Exception("fake error generating data")
|
||||
|
||||
# Make the hook for hs1 throw an exception
|
||||
self.later_gauge.register_hook(
|
||||
homeserver_instance_id="123", hook=raise_exception
|
||||
)
|
||||
# Metrics from hs2 still work fine
|
||||
self.later_gauge.register_hook(
|
||||
homeserver_instance_id="456", hook=lambda: {("hs2",): 2}
|
||||
)
|
||||
|
||||
metrics_map = get_latest_metrics()
|
||||
|
||||
# Since we encountered an exception while trying to collect metrics from hs1, we
|
||||
# don't expect to see it here.
|
||||
hs1_metric = 'foo{server_name="hs1"}'
|
||||
hs1_metric_value = metrics_map.get(hs1_metric)
|
||||
self.assertIsNone(
|
||||
hs1_metric_value,
|
||||
(
|
||||
"Since we encountered an exception while trying to collect metrics from hs1"
|
||||
f"we don't expect to see it the metrics_map {metrics_map}"
|
||||
),
|
||||
)
|
||||
|
||||
# We should still see metrics from hs2 though
|
||||
hs2_metric = 'foo{server_name="hs2"}'
|
||||
hs2_metric_value = metrics_map.get(hs2_metric)
|
||||
self.assertIsNotNone(
|
||||
hs2_metric_value,
|
||||
f"Missing metric {hs2_metric} in cache metrics {metrics_map}",
|
||||
)
|
||||
self.assertEqual(hs2_metric_value, "2.0")
|
||||
|
||||
|
||||
def get_latest_metrics() -> Dict[str, str]:
|
||||
"""
|
||||
Collect the latest metrics from the registry and parse them into an easy to use map.
|
||||
|
||||
@@ -32,7 +32,6 @@ from synapse.config.workers import InstanceTcpLocationConfig, InstanceUnixLocati
|
||||
from synapse.http.site import SynapseRequest, SynapseSite
|
||||
from synapse.replication.http import ReplicationRestResource
|
||||
from synapse.replication.tcp.client import ReplicationDataHandler
|
||||
from synapse.replication.tcp.handler import ReplicationCommandHandler
|
||||
from synapse.replication.tcp.protocol import (
|
||||
ClientReplicationStreamProtocol,
|
||||
ServerReplicationStreamProtocol,
|
||||
@@ -97,7 +96,7 @@ class BaseStreamTestCase(unittest.HomeserverTestCase):
|
||||
self.test_handler = self._build_replication_data_handler()
|
||||
self.worker_hs._replication_data_handler = self.test_handler # type: ignore[attr-defined]
|
||||
|
||||
repl_handler = ReplicationCommandHandler(self.worker_hs)
|
||||
repl_handler = self.worker_hs.get_replication_command_handler()
|
||||
self.client = ClientReplicationStreamProtocol(
|
||||
self.worker_hs,
|
||||
"client",
|
||||
|
||||
@@ -40,147 +40,6 @@ from tests.unittest import override_config
|
||||
from tests.utils import HAS_AUTHLIB
|
||||
|
||||
|
||||
class KeyUploadTestCase(unittest.HomeserverTestCase):
|
||||
servlets = [
|
||||
keys.register_servlets,
|
||||
admin.register_servlets_for_client_rest_resource,
|
||||
login.register_servlets,
|
||||
]
|
||||
|
||||
def test_upload_keys_fails_on_invalid_structure(self) -> None:
|
||||
"""Check that we validate the structure of keys upon upload.
|
||||
|
||||
Regression test for https://github.com/element-hq/synapse/pull/17097
|
||||
"""
|
||||
self.register_user("alice", "wonderland")
|
||||
alice_token = self.login("alice", "wonderland")
|
||||
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
"/_matrix/client/v3/keys/upload",
|
||||
{
|
||||
# Error: device_keys must be a dict
|
||||
"device_keys": ["some", "stuff", "weewoo"]
|
||||
},
|
||||
alice_token,
|
||||
)
|
||||
self.assertEqual(channel.code, HTTPStatus.BAD_REQUEST, channel.result)
|
||||
self.assertEqual(
|
||||
channel.json_body["errcode"],
|
||||
Codes.BAD_JSON,
|
||||
channel.result,
|
||||
)
|
||||
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
"/_matrix/client/v3/keys/upload",
|
||||
{
|
||||
# Error: properties of fallback_keys must be in the form `<algorithm>:<device_id>`
|
||||
"fallback_keys": {"invalid_key": "signature_base64"}
|
||||
},
|
||||
alice_token,
|
||||
)
|
||||
self.assertEqual(channel.code, HTTPStatus.BAD_REQUEST, channel.result)
|
||||
self.assertEqual(
|
||||
channel.json_body["errcode"],
|
||||
Codes.BAD_JSON,
|
||||
channel.result,
|
||||
)
|
||||
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
"/_matrix/client/v3/keys/upload",
|
||||
{
|
||||
# Same as above, but for one_time_keys
|
||||
"one_time_keys": {"invalid_key": "signature_base64"}
|
||||
},
|
||||
alice_token,
|
||||
)
|
||||
self.assertEqual(channel.code, HTTPStatus.BAD_REQUEST, channel.result)
|
||||
self.assertEqual(
|
||||
channel.json_body["errcode"],
|
||||
Codes.BAD_JSON,
|
||||
channel.result,
|
||||
)
|
||||
|
||||
def test_upload_keys_fails_on_invalid_user_id_or_device_id(self) -> None:
|
||||
"""
|
||||
Validate that the requesting user is uploading their own keys and nobody
|
||||
else's.
|
||||
"""
|
||||
device_id = "DEVICE_ID"
|
||||
alice_user_id = self.register_user("alice", "wonderland")
|
||||
alice_token = self.login("alice", "wonderland", device_id=device_id)
|
||||
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
"/_matrix/client/v3/keys/upload",
|
||||
{
|
||||
"device_keys": {
|
||||
# Included `user_id` does not match requesting user.
|
||||
"user_id": "@unknown_user:test",
|
||||
"device_id": device_id,
|
||||
"algorithms": ["m.olm.curve25519-aes-sha2"],
|
||||
"keys": {
|
||||
f"ed25519:{device_id}": "publickey",
|
||||
},
|
||||
"signatures": {},
|
||||
}
|
||||
},
|
||||
alice_token,
|
||||
)
|
||||
self.assertEqual(channel.code, HTTPStatus.BAD_REQUEST, channel.result)
|
||||
self.assertEqual(
|
||||
channel.json_body["errcode"],
|
||||
Codes.BAD_JSON,
|
||||
channel.result,
|
||||
)
|
||||
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
"/_matrix/client/v3/keys/upload",
|
||||
{
|
||||
"device_keys": {
|
||||
"user_id": alice_user_id,
|
||||
# Included `device_id` does not match requesting user's.
|
||||
"device_id": "UNKNOWN_DEVICE_ID",
|
||||
"algorithms": ["m.olm.curve25519-aes-sha2"],
|
||||
"keys": {
|
||||
f"ed25519:{device_id}": "publickey",
|
||||
},
|
||||
"signatures": {},
|
||||
}
|
||||
},
|
||||
alice_token,
|
||||
)
|
||||
self.assertEqual(channel.code, HTTPStatus.BAD_REQUEST, channel.result)
|
||||
self.assertEqual(
|
||||
channel.json_body["errcode"],
|
||||
Codes.BAD_JSON,
|
||||
channel.result,
|
||||
)
|
||||
|
||||
def test_upload_keys_succeeds_when_fields_are_explicitly_set_to_null(self) -> None:
|
||||
"""
|
||||
This is a regression test for https://github.com/element-hq/synapse/pull/19023.
|
||||
"""
|
||||
device_id = "DEVICE_ID"
|
||||
self.register_user("alice", "wonderland")
|
||||
alice_token = self.login("alice", "wonderland", device_id=device_id)
|
||||
|
||||
channel = self.make_request(
|
||||
"POST",
|
||||
"/_matrix/client/v3/keys/upload",
|
||||
{
|
||||
"device_keys": None,
|
||||
"one_time_keys": None,
|
||||
"fallback_keys": None,
|
||||
},
|
||||
alice_token,
|
||||
)
|
||||
self.assertEqual(channel.code, HTTPStatus.OK, channel.result)
|
||||
|
||||
|
||||
class KeyQueryTestCase(unittest.HomeserverTestCase):
|
||||
servlets = [
|
||||
keys.register_servlets,
|
||||
|
||||
@@ -1145,6 +1145,9 @@ def setup_test_homeserver(
|
||||
reactor=reactor,
|
||||
)
|
||||
|
||||
# Register the cleanup hook
|
||||
cleanup_func(hs.cleanup)
|
||||
|
||||
# Install @cache_in_self attributes
|
||||
for key, val in kwargs.items():
|
||||
setattr(hs, "_" + key, val)
|
||||
|
||||
@@ -0,0 +1,91 @@
|
||||
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
#
|
||||
# Copyright (C) 2025 New Vector, Ltd
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# See the GNU Affero General Public License for more details:
|
||||
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
|
||||
from typing import List
|
||||
|
||||
import attr
|
||||
|
||||
from tests.unittest import TestCase
|
||||
|
||||
from synapse.storage.controllers.persist_events import find_predecessors
|
||||
|
||||
|
||||
class FindPredecessorsTestCase(TestCase):
|
||||
def test_predecessors_finds_nothing_if_event_is_not_in_batch(self) -> None:
|
||||
batch = [
|
||||
FakeEvent(event_id="B", prev_event_ids=["C"]),
|
||||
]
|
||||
|
||||
predecessors = find_predecessors({"A"}, batch) # type: ignore[arg-type]
|
||||
self.assertEqual(predecessors, set())
|
||||
|
||||
def test_predecessors_finds_only_event_if_it_has_no_predecessors(self) -> None:
|
||||
batch = [
|
||||
FakeEvent(event_id="E1", prev_event_ids=[]),
|
||||
FakeEvent(event_id="E2", prev_event_ids=["E3"]),
|
||||
]
|
||||
|
||||
predecessors = find_predecessors({"E1"}, batch) # type: ignore[arg-type]
|
||||
self.assertEqual(predecessors, {"E1"})
|
||||
|
||||
def test_predecessors_finds_all_ancestors(self) -> None:
|
||||
batch = [
|
||||
FakeEvent(event_id="A", prev_event_ids=["B", "C"]),
|
||||
FakeEvent(event_id="B", prev_event_ids=["D"]),
|
||||
FakeEvent(event_id="C", prev_event_ids=["D"]),
|
||||
FakeEvent(event_id="D", prev_event_ids=["E"]),
|
||||
FakeEvent(event_id="E", prev_event_ids=[]),
|
||||
FakeEvent(event_id="F", prev_event_ids=["G", "H"]),
|
||||
FakeEvent(event_id="G", prev_event_ids=[]),
|
||||
]
|
||||
predecessors = find_predecessors({"A"}, batch) # type: ignore[arg-type]
|
||||
self.assertEqual(predecessors, {"A", "B", "C", "D", "E"})
|
||||
|
||||
def test_predecessors_ignores_cycles(self) -> None:
|
||||
batch = [
|
||||
FakeEvent(event_id="E1", prev_event_ids=["E2"]),
|
||||
FakeEvent(event_id="E2", prev_event_ids=["E1"]),
|
||||
]
|
||||
|
||||
predecessors = find_predecessors({"E1"}, batch) # type: ignore[arg-type]
|
||||
self.assertEqual(predecessors, {"E1", "E2"})
|
||||
|
||||
def test_predecessors_ignores_self_reference_cycles(self) -> None:
|
||||
batch = [
|
||||
FakeEvent(event_id="E1", prev_event_ids=["E2"]),
|
||||
FakeEvent(event_id="E2", prev_event_ids=["E2"]),
|
||||
]
|
||||
|
||||
predecessors = find_predecessors({"E1"}, batch) # type: ignore[arg-type]
|
||||
self.assertEqual(predecessors, {"E1", "E2"})
|
||||
|
||||
def test_predecessors_finds_ancestors_of_multiple_starting_events(self) -> None:
|
||||
batch = [
|
||||
FakeEvent(event_id="A", prev_event_ids=["B"]),
|
||||
FakeEvent(event_id="B", prev_event_ids=[]),
|
||||
FakeEvent(event_id="C", prev_event_ids=["D"]),
|
||||
FakeEvent(event_id="D", prev_event_ids=["E"]),
|
||||
FakeEvent(event_id="E", prev_event_ids=[]),
|
||||
FakeEvent(event_id="F", prev_event_ids=["G"]),
|
||||
FakeEvent(event_id="G", prev_event_ids=[]),
|
||||
]
|
||||
predecessors = find_predecessors({"A", "C"}, batch) # type: ignore[arg-type]
|
||||
self.assertEqual(predecessors, {"A", "B", "C", "D", "E"})
|
||||
|
||||
|
||||
@attr.s(auto_attribs=True)
|
||||
class FakeEvent:
|
||||
event_id: str
|
||||
_prev_event_ids: List[str]
|
||||
|
||||
def prev_event_ids(self) -> List[str]:
|
||||
return self._prev_event_ids
|
||||
@@ -20,7 +20,7 @@
|
||||
#
|
||||
|
||||
import logging
|
||||
from typing import Dict, List, Optional
|
||||
from typing import List, Optional
|
||||
|
||||
from twisted.internet.testing import MemoryReactor
|
||||
|
||||
@@ -31,6 +31,7 @@ from synapse.federation.federation_base import event_from_pdu_json
|
||||
from synapse.rest import admin
|
||||
from synapse.rest.client import login, room
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.controllers.persist_events import assign_stitched_orders
|
||||
from synapse.types import StateMap
|
||||
from synapse.util import Clock
|
||||
|
||||
@@ -39,77 +40,6 @@ from tests.unittest import HomeserverTestCase
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class EventsTestCase(HomeserverTestCase):
|
||||
servlets = [
|
||||
admin.register_servlets,
|
||||
room.register_servlets,
|
||||
login.register_servlets,
|
||||
]
|
||||
|
||||
def prepare(
|
||||
self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
|
||||
) -> None:
|
||||
self._store = self.hs.get_datastores().main
|
||||
|
||||
def test_get_senders_for_event_ids(self) -> None:
|
||||
"""Tests the `get_senders_for_event_ids` storage function."""
|
||||
|
||||
users_and_tokens: Dict[str, str] = {}
|
||||
for localpart_suffix in range(10):
|
||||
localpart = f"user_{localpart_suffix}"
|
||||
user_id = self.register_user(localpart, "rabbit")
|
||||
token = self.login(localpart, "rabbit")
|
||||
|
||||
users_and_tokens[user_id] = token
|
||||
|
||||
room_creator_user_id = self.register_user("room_creator", "rabbit")
|
||||
room_creator_token = self.login("room_creator", "rabbit")
|
||||
users_and_tokens[room_creator_user_id] = room_creator_token
|
||||
|
||||
# Create a room and invite some users.
|
||||
room_id = self.helper.create_room_as(
|
||||
room_creator_user_id, tok=room_creator_token
|
||||
)
|
||||
event_ids_to_senders: Dict[str, str] = {}
|
||||
for user_id, token in users_and_tokens.items():
|
||||
if user_id == room_creator_user_id:
|
||||
continue
|
||||
|
||||
self.helper.invite(
|
||||
room=room_id,
|
||||
targ=user_id,
|
||||
tok=room_creator_token,
|
||||
)
|
||||
|
||||
# Have the user accept the invite and join the room.
|
||||
self.helper.join(
|
||||
room=room_id,
|
||||
user=user_id,
|
||||
tok=token,
|
||||
)
|
||||
|
||||
# Have the user send an event.
|
||||
response = self.helper.send_event(
|
||||
room_id=room_id,
|
||||
type="m.room.message",
|
||||
content={
|
||||
"msgtype": "m.text",
|
||||
"body": f"hello, I'm {user_id}!",
|
||||
},
|
||||
tok=token,
|
||||
)
|
||||
|
||||
# Record the event ID and sender.
|
||||
event_id = response["event_id"]
|
||||
event_ids_to_senders[event_id] = user_id
|
||||
|
||||
# Check that `get_senders_for_event_ids` returns the correct data.
|
||||
response = self.get_success(
|
||||
self._store.get_senders_for_event_ids(list(event_ids_to_senders.keys()))
|
||||
)
|
||||
self.assert_dict(event_ids_to_senders, response)
|
||||
|
||||
|
||||
class ExtremPruneTestCase(HomeserverTestCase):
|
||||
servlets = [
|
||||
admin.register_servlets,
|
||||
@@ -590,3 +520,47 @@ class InvalideUsersInRoomCacheTestCase(HomeserverTestCase):
|
||||
|
||||
users = self.get_success(self.store.get_users_in_room(room_id))
|
||||
self.assertEqual(users, [])
|
||||
|
||||
|
||||
class AssignStitchedOrderingTestCase(HomeserverTestCase):
|
||||
servlets = [
|
||||
admin.register_servlets,
|
||||
room.register_servlets,
|
||||
login.register_servlets,
|
||||
]
|
||||
|
||||
def prepare(
|
||||
self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
|
||||
) -> None:
|
||||
self.state = self.hs.get_state_handler()
|
||||
# persistence = self.hs.get_storage_controllers().persistence
|
||||
# assert persistence is not None
|
||||
# self._persistence = persistence
|
||||
self.store = self.hs.get_datastores().main
|
||||
|
||||
def test_insert_events(self) -> None:
|
||||
# Create a room
|
||||
self.register_user("user", "pass")
|
||||
token = self.login("user", "pass")
|
||||
room_id = self.helper.create_room_as(
|
||||
"user", room_version=RoomVersions.V12.identifier, tok=token
|
||||
)
|
||||
|
||||
# Build a test event
|
||||
test_event = event_from_pdu_json(
|
||||
{
|
||||
"type": EventTypes.Message,
|
||||
"content": {"body": "blah"},
|
||||
"room_id": room_id,
|
||||
"sender": "@user:other",
|
||||
"depth": 5,
|
||||
"prev_events": [],
|
||||
"auth_events": [],
|
||||
"origin_server_ts": self.clock.time_msec(),
|
||||
},
|
||||
RoomVersions.V12,
|
||||
)
|
||||
|
||||
self.get_success(assign_stitched_orders(room_id, [test_event], self.store))
|
||||
|
||||
self.assertEqual(test_event.stitched_ordering, 6 * 2**16)
|
||||
|
||||
Reference in New Issue
Block a user