1
0

Compare commits

..

23 Commits

Author SHA1 Message Date
Richard van der Hoff 5f6fff3b42 Persist in stitched order 2025-09-03 15:05:40 +01:00
Richard van der Hoff 858f09bd09 Assign stitched ordering when de-outliering 2025-09-03 14:56:41 +01:00
Richard van der Hoff 3d10f0f9c5 Assign stitched orders to pulled events before persisting 2025-09-03 14:34:02 +01:00
Richard van der Hoff 53fb62de61 Update assign_stitched_orders to not require EventContext 2025-09-03 14:16:50 +01:00
Richard van der Hoff 8728124f76 fixup! Move stitched_ordering property to EventBase 2025-09-03 14:16:09 +01:00
Richard van der Hoff fb86791f48 Move assign_stitched_orders to top level 2025-09-03 14:05:21 +01:00
Richard van der Hoff 77bfaf91cc fixup! Move stitched_ordering property to EventBase 2025-09-03 13:57:15 +01:00
Richard van der Hoff d3710a6fd7 Remove code that attempted to assign stitched orderings retrospectively 2025-09-03 13:56:23 +01:00
Richard van der Hoff 4a5cfed5ca Move stitched_ordering property to EventBase
... so that we don't need to have an EventContext when we do the assignment
2025-09-03 13:20:47 +01:00
Richard van der Hoff ccdb734051 lint 2025-09-03 13:10:34 +01:00
Richard van der Hoff 596dfdb4b4 Update stitched_order of backfilled events after persisting 2025-09-03 13:10:34 +01:00
Andy Balaam fe7fef2893 When finding event before a gap, exclude outlier matching the gap 2025-09-03 13:10:34 +01:00
Andy Balaam f2c0b0bd57 Log more while deciding the stitched order 2025-09-03 13:10:34 +01:00
Richard van der Hoff 883bb8a9b1 Handle gaps with unassigned stitched ordering 2025-09-03 13:10:34 +01:00
Richard van der Hoff 93d1c4f1d5 Rework _find_before_gap_event_id
There being no before_gap_event_id is an error case, not normal behaviour, so
we should handle it differently.
2025-09-03 13:10:34 +01:00
Richard van der Hoff 254a83f2f6 tests for find_predecessors 2025-09-03 13:10:34 +01:00
Richard van der Hoff 8415a185e0 WIP on stitched ordering algorithm 2025-09-03 13:10:34 +01:00
Richard van der Hoff d55ab5ea78 Assign ordering to backward extremities
We're going to use backward extremities to represent "gaps" in our
algorithm. (Specifically, a "gap" is represented by a set of backward
extremities which all appear at the same point in the stitched ordering).

Accordingly, we need to assign them a stitched ordering. We're going to do this
indirectly, by referencing the event that appears before it in the stitched
ordering, so that there is only one table that has a stitched ordering column.
2025-09-03 13:10:34 +01:00
Richard van der Hoff 114541e9f8 Test for simple case 2025-09-03 13:10:34 +01:00
Richard van der Hoff c9900ae6dd Assign a dumb stitched ordering to incoming events
For now, we just give each incoming event the next stitched ordering.
2025-09-03 13:10:34 +01:00
Richard van der Hoff dbebdab044 Pass EventContext into _update_backward_extremities 2025-09-03 13:10:32 +01:00
Eric Eastwood b2997a8f20 Suppress "Applying schema" log noise bulk when running Complement tests (#18878)
If Synapse is under test (`SYNAPSE_LOG_TESTING` is set), we don't care
about seeing the "Applying schema" log lines at the INFO level every
time we run the tests (it's 100 lines of bulk for each homeserver).

```
synapse_main | 2025-08-29 22:34:03,453 - synapse.storage.prepare_database - 433 - INFO - main - Applying schema deltas for v73
synapse_main | 2025-08-29 22:34:03,454 - synapse.storage.prepare_database - 541 - INFO - main - Applying schema 73/01event_failed_pull_attempts.sql
synapse_main | 2025-08-29 22:34:03,463 - synapse.storage.prepare_database - 541 - INFO - main - Applying schema 73/02add_pusher_enabled.sql
synapse_main | 2025-08-29 22:34:03,473 - synapse.storage.prepare_database - 541 - INFO - main - Applying schema 73/02room_id_indexes_for_purging.sql
synapse_main | 2025-08-29 22:34:03,482 - synapse.storage.prepare_database - 541 - INFO - main - Applying schema 73/03pusher_device_id.sql
synapse_main | 2025-08-29 22:34:03,492 - synapse.storage.prepare_database - 541 - INFO - main - Applying schema 73/03users_approved_column.sql
synapse_main | 2025-08-29 22:34:03,502 - synapse.storage.prepare_database - 541 - INFO - main - Applying schema 73/04partial_join_details.sql
synapse_main | 2025-08-29 22:34:03,513 - synapse.storage.prepare_database - 541 - INFO - main - Applying schema 73/04pending_device_list_updates.sql
...
```


The Synapse logs are visible when a Complement test fails or you use
`COMPLEMENT_ALWAYS_PRINT_SERVER_LOGS=1`. This is spawning from a
Complement test with three homeservers and wanting less log noise to
scroll through.
2025-09-02 13:34:47 -05:00
Eric Eastwood bff4a11b3f Re-introduce: Fix LaterGauge metrics to collect from all servers (#18791)
Re-introduce: https://github.com/element-hq/synapse/pull/18751 that was
reverted in https://github.com/element-hq/synapse/pull/18789 (explains
why the PR was reverted in the first place).

- Adds a `cleanup` pattern that cleans up metrics from each homeserver
in the tests. Previously, the list of hooks built up until our CI
machines couldn't operate properly, see
https://github.com/element-hq/synapse/pull/18789
- Fix long-standing issue with `synapse_background_update_status`
metrics only tracking the last database listed in the config (see
https://github.com/element-hq/synapse/pull/18791#discussion_r2261706749)
2025-09-02 12:14:27 -05:00
47 changed files with 1041 additions and 806 deletions
-47
View File
@@ -1,50 +1,3 @@
# Synapse 1.138.4 (2025-10-07)
## Bugfixes
- Fix a bug introduced in 1.138.3 where a client could receive an Internal Server Error if they set `device_keys: null` in the request to [`POST /_matrix/client/v3/keys/upload`](https://spec.matrix.org/v1.16/client-server-api/#post_matrixclientv3keysupload). ([\#19023](https://github.com/element-hq/synapse/issues/19023))
# Synapse 1.138.3 (2025-10-07)
## Security Fixes
- Fix [CVE-2025-61672](https://www.cve.org/CVERecord?id=CVE-2025-61672) / [GHSA-fh66-fcv5-jjfr](https://github.com/element-hq/synapse/security/advisories/GHSA-fh66-fcv5-jjfr). Lack of validation for device keys in Synapse before 1.139.1 allows an attacker registered on the victim homeserver to degrade federation functionality, unpredictably breaking outbound federation to other homeservers. ([\#17097](https://github.com/element-hq/synapse/issues/17097))
## Deprecations and Removals
- Drop support for unstable field names from the long-accepted [MSC2732](https://github.com/matrix-org/matrix-spec-proposals/pull/2732) (Olm fallback keys) proposal. This change allows unit tests to pass following the security patch above. ([\#18996](https://github.com/element-hq/synapse/issues/18996))
# Synapse 1.138.2 (2025-09-24)
## Internal Changes
- Drop support for Ubuntu 24.10 Oracular Oriole, and add support for Ubuntu 25.04 Plucky Puffin. ([\#18962](https://github.com/element-hq/synapse/issues/18962))
# Synapse 1.138.1 (2025-09-24)
## Bugfixes
- Fix a performance regression related to the experimental Delayed Events ([MSC4140](https://github.com/matrix-org/matrix-spec-proposals/pull/4140)) feature. ([\#18926](https://github.com/element-hq/synapse/issues/18926))
# Synapse 1.138.0 (2025-09-09)
No significant changes since 1.138.0rc1.
# Synapse 1.138.0rc1 (2025-09-02)
### Features
+1
View File
@@ -0,0 +1 @@
Fix `LaterGauge` metrics to collect from all servers.
+1
View File
@@ -0,0 +1 @@
Suppress "Applying schema" log noise bulk when `SYNAPSE_LOG_TESTING` is set.
-30
View File
@@ -1,33 +1,3 @@
matrix-synapse-py3 (1.138.4) stable; urgency=medium
* New Synapse release 1.138.4.
-- Synapse Packaging team <packages@matrix.org> Tue, 07 Oct 2025 16:28:38 +0100
matrix-synapse-py3 (1.138.3) stable; urgency=medium
* New Synapse release 1.138.3.
-- Synapse Packaging team <packages@matrix.org> Tue, 07 Oct 2025 12:54:18 +0100
matrix-synapse-py3 (1.138.2) stable; urgency=medium
* New Synapse release 1.138.2.
-- Synapse Packaging team <packages@matrix.org> Wed, 24 Sep 2025 12:26:16 +0100
matrix-synapse-py3 (1.138.1) stable; urgency=medium
* New Synapse release 1.138.1.
-- Synapse Packaging team <packages@matrix.org> Wed, 24 Sep 2025 11:32:38 +0100
matrix-synapse-py3 (1.138.0) stable; urgency=medium
* New Synapse release 1.138.0.
-- Synapse Packaging team <packages@matrix.org> Tue, 09 Sep 2025 11:21:25 +0100
matrix-synapse-py3 (1.138.0~rc1) stable; urgency=medium
* New synapse release 1.138.0rc1.
+7
View File
@@ -77,6 +77,13 @@ loggers:
#}
synapse.visibility.filtered_event_debug:
level: DEBUG
{#
If Synapse is under test, we don't care about seeing the "Applying schema" log
lines at the INFO level every time we run the tests (it's 100 lines of bulk)
#}
synapse.storage.prepare_database:
level: WARN
{% endif %}
root:
-10
View File
@@ -117,16 +117,6 @@ each upgrade are complete before moving on to the next upgrade, to avoid
stacking them up. You can monitor the currently running background updates with
[the Admin API](usage/administration/admin_api/background_updates.html#status).
# Upgrading to v1.138.1
## Drop support for Ubuntu 24.10 Oracular Oriole, and add support for Ubuntu 25.04 Plucky Puffin
Ubuntu 24.10 Oracular Oriole [has been end-of-life since 10 Jul
2025](https://endoflife.date/ubuntu). This release drops support for Ubuntu
24.10, and in its place adds support for Ubuntu 25.04 Plucky Puffin.
This notice also applies to the v1.139.0 release.
# Upgrading to v1.136.0
## Deprecate `run_as_background_process` exported as part of the module API interface in favor of `ModuleApi.run_as_background_process`
+1 -1
View File
@@ -101,7 +101,7 @@ module-name = "synapse.synapse_rust"
[tool.poetry]
name = "matrix-synapse"
version = "1.138.4"
version = "1.138.0rc1"
description = "Homeserver for the Matrix decentralised comms protocol"
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
license = "AGPL-3.0-or-later"
+1 -1
View File
@@ -32,7 +32,7 @@ DISTS = (
"debian:sid", # (rolling distro, no EOL)
"ubuntu:jammy", # 22.04 LTS (EOL 2027-04) (our EOL forced by Python 3.10 is 2026-10-04)
"ubuntu:noble", # 24.04 LTS (EOL 2029-06)
"ubuntu:plucky", # 25.04 (EOL 2026-01)
"ubuntu:oracular", # 24.10 (EOL 2025-07)
"debian:trixie", # (EOL not specified yet)
)
+5 -1
View File
@@ -153,9 +153,13 @@ def get_registered_paths_for_default(
"""
hs = MockHomeserver(base_config, worker_app)
# TODO We only do this to avoid an error, but don't need the database etc
hs.setup()
return get_registered_paths_for_hs(hs)
registered_paths = get_registered_paths_for_hs(hs)
hs.cleanup()
return registered_paths
def elide_http_methods_if_unconflicting(
+14 -1
View File
@@ -99,6 +99,7 @@ from synapse.storage.engines import create_engine
from synapse.storage.prepare_database import prepare_database
from synapse.types import ISynapseReactor
from synapse.util import SYNAPSE_VERSION, Clock
from synapse.util.stringutils import random_string
# Cast safety: Twisted does some naughty magic which replaces the
# twisted.internet.reactor module with a Reactor instance at runtime.
@@ -323,6 +324,7 @@ class MockHomeserver:
self.config = config
self.hostname = config.server.server_name
self.version_string = SYNAPSE_VERSION
self.instance_id = random_string(5)
def get_clock(self) -> Clock:
return self.clock
@@ -330,6 +332,9 @@ class MockHomeserver:
def get_reactor(self) -> ISynapseReactor:
return reactor
def get_instance_id(self) -> str:
return self.instance_id
def get_instance_name(self) -> str:
return "master"
@@ -685,7 +690,15 @@ class Porter:
)
prepare_database(db_conn, engine, config=self.hs_config)
# Type safety: ignore that we're using Mock homeservers here.
store = Store(DatabasePool(hs, db_config, engine), db_conn, hs) # type: ignore[arg-type]
store = Store(
DatabasePool(
hs, # type: ignore[arg-type]
db_config,
engine,
),
db_conn,
hs, # type: ignore[arg-type]
)
db_conn.commit()
return store
+16
View File
@@ -208,6 +208,8 @@ class EventBase(metaclass=abc.ABCMeta):
self.internal_metadata = EventInternalMetadata(internal_metadata_dict)
self._stitched_ordering: Optional[int] = None
depth: DictProperty[int] = DictProperty("depth")
content: DictProperty[JsonDict] = DictProperty("content")
hashes: DictProperty[Dict[str, str]] = DictProperty("hashes")
@@ -323,6 +325,20 @@ class EventBase(metaclass=abc.ABCMeta):
# this will be a no-op if the event dict is already frozen.
self._dict = freeze(self._dict)
def assign_stitched_ordering(self, stitched_ordering: int) -> None:
"""Assign a stitched ordering to this event, if one has not already been assigned.
TODO: figure out a way to only expose this on events that have not yet been persisted.
"""
if self._stitched_ordering is not None:
raise RuntimeError("Attempt to assign stitched ordering twice")
self._stitched_ordering = stitched_ordering
@property
def stitched_ordering(self) -> Optional[int]:
"""Return the stitched ordering for this event. If one has not (yet) been assigned, returns `None`."""
return self._stitched_ordering
def __str__(self) -> str:
return self.__repr__()
+28 -15
View File
@@ -37,6 +37,7 @@ Events are replicated via a separate events stream.
"""
import logging
from enum import Enum
from typing import (
TYPE_CHECKING,
Dict,
@@ -67,6 +68,25 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
class QueueNames(str, Enum):
PRESENCE_MAP = "presence_map"
KEYED_EDU = "keyed_edu"
KEYED_EDU_CHANGED = "keyed_edu_changed"
EDUS = "edus"
POS_TIME = "pos_time"
PRESENCE_DESTINATIONS = "presence_destinations"
queue_name_to_gauge_map: Dict[QueueNames, LaterGauge] = {}
for queue_name in QueueNames:
queue_name_to_gauge_map[queue_name] = LaterGauge(
name=f"synapse_federation_send_queue_{queue_name.value}_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
class FederationRemoteSendQueue(AbstractFederationSender):
"""A drop in replacement for FederationSender"""
@@ -111,23 +131,16 @@ class FederationRemoteSendQueue(AbstractFederationSender):
# we make a new function, so we need to make a new function so the inner
# lambda binds to the queue rather than to the name of the queue which
# changes. ARGH.
def register(name: str, queue: Sized) -> None:
LaterGauge(
name="synapse_federation_send_queue_%s_size" % (queue_name,),
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(queue)},
def register(queue_name: QueueNames, queue: Sized) -> None:
queue_name_to_gauge_map[queue_name].register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {(self.server_name,): len(queue)},
)
for queue_name in [
"presence_map",
"keyed_edu",
"keyed_edu_changed",
"edus",
"pos_time",
"presence_destinations",
]:
register(queue_name, getattr(self, queue_name))
for queue_name in QueueNames:
queue = getattr(self, queue_name.value)
assert isinstance(queue, Sized)
register(queue_name, queue=queue)
self.clock.looping_call(self._clear_queue, 30 * 1000)
+27 -16
View File
@@ -199,6 +199,24 @@ sent_pdus_destination_dist_total = Counter(
labelnames=[SERVER_NAME_LABEL],
)
transaction_queue_pending_destinations_gauge = LaterGauge(
name="synapse_federation_transaction_queue_pending_destinations",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
transaction_queue_pending_pdus_gauge = LaterGauge(
name="synapse_federation_transaction_queue_pending_pdus",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
transaction_queue_pending_edus_gauge = LaterGauge(
name="synapse_federation_transaction_queue_pending_edus",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
# Time (in s) to wait before trying to wake up destinations that have
# catch-up outstanding.
# Please note that rate limiting still applies, so while the loop is
@@ -398,11 +416,9 @@ class FederationSender(AbstractFederationSender):
# map from destination to PerDestinationQueue
self._per_destination_queues: Dict[str, PerDestinationQueue] = {}
LaterGauge(
name="synapse_federation_transaction_queue_pending_destinations",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
transaction_queue_pending_destinations_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {
(self.server_name,): sum(
1
for d in self._per_destination_queues.values()
@@ -410,22 +426,17 @@ class FederationSender(AbstractFederationSender):
)
},
)
LaterGauge(
name="synapse_federation_transaction_queue_pending_pdus",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
transaction_queue_pending_pdus_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {
(self.server_name,): sum(
d.pending_pdu_count() for d in self._per_destination_queues.values()
)
},
)
LaterGauge(
name="synapse_federation_transaction_queue_pending_edus",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
transaction_queue_pending_edus_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {
(self.server_name,): sum(
d.pending_edu_count() for d in self._per_destination_queues.values()
)
+10 -96
View File
@@ -18,7 +18,7 @@ from typing import TYPE_CHECKING, List, Optional, Set, Tuple
from twisted.internet.interfaces import IDelayedCall
from synapse.api.constants import EventTypes
from synapse.api.errors import ShadowBanError, SynapseError
from synapse.api.errors import ShadowBanError
from synapse.api.ratelimiting import Ratelimiter
from synapse.config.workers import MAIN_PROCESS_INSTANCE_NAME
from synapse.logging.opentracing import set_tag
@@ -45,7 +45,6 @@ from synapse.types import (
)
from synapse.util.events import generate_fake_event_id
from synapse.util.metrics import Measure
from synapse.util.sentinel import Sentinel
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -147,37 +146,10 @@ class DelayedEventsHandler:
)
async def _unsafe_process_new_event(self) -> None:
# We purposefully fetch the current max room stream ordering before
# doing anything else, as it could increment duing processing of state
# deltas. We want to avoid updating `delayed_events_stream_pos` past
# the stream ordering of the state deltas we've processed. Otherwise
# we'll leave gaps in our processing.
room_max_stream_ordering = self._store.get_room_max_stream_ordering()
# Check that there are actually any delayed events to process. If not, bail early.
delayed_events_count = await self._store.get_count_of_delayed_events()
if delayed_events_count == 0:
# There are no delayed events to process. Update the
# `delayed_events_stream_pos` to the latest `events` stream pos and
# exit early.
self._event_pos = room_max_stream_ordering
logger.debug(
"No delayed events to process. Updating `delayed_events_stream_pos` to max stream ordering (%s)",
room_max_stream_ordering,
)
await self._store.update_delayed_events_stream_pos(room_max_stream_ordering)
event_processing_positions.labels(
name="delayed_events", **{SERVER_NAME_LABEL: self.server_name}
).set(room_max_stream_ordering)
return
# If self._event_pos is None then means we haven't fetched it from the DB yet
if self._event_pos is None:
self._event_pos = await self._store.get_delayed_events_stream_pos()
room_max_stream_ordering = self._store.get_room_max_stream_ordering()
if self._event_pos > room_max_stream_ordering:
# apparently, we've processed more events than exist in the database!
# this can happen if events are removed with history purge or similar.
@@ -195,7 +167,7 @@ class DelayedEventsHandler:
self._clock, name="delayed_events_delta", server_name=self.server_name
):
room_max_stream_ordering = self._store.get_room_max_stream_ordering()
if self._event_pos >= room_max_stream_ordering:
if self._event_pos == room_max_stream_ordering:
return
logger.debug(
@@ -230,81 +202,23 @@ class DelayedEventsHandler:
Process current state deltas to cancel other users' pending delayed events
that target the same state.
"""
# Get the senders of each delta's state event (as sender information is
# not currently stored in the `current_state_deltas` table).
event_id_and_sender_dict = await self._store.get_senders_for_event_ids(
[delta.event_id for delta in deltas if delta.event_id is not None]
)
# Note: No need to batch as `get_current_state_deltas` will only ever
# return 100 rows at a time.
for delta in deltas:
logger.debug(
"Handling: %r %r, %s", delta.event_type, delta.state_key, delta.event_id
)
# `delta.event_id` and `delta.sender` can be `None` in a few valid
# cases (see the docstring of
# `get_current_state_delta_membership_changes_for_user` for details).
if delta.event_id is None:
# TODO: Differentiate between this being caused by a state reset
# which removed a user from a room, or the homeserver
# purposefully having left the room. We can do so by checking
# whether there are any local memberships still left in the
# room. If so, then this is the result of a state reset.
#
# If it is a state reset, we should avoid cancelling new,
# delayed state events due to old state resurfacing. So we
# should skip and log a warning in this case.
#
# If the homeserver has left the room, then we should cancel all
# delayed state events intended for this room, as there is no
# need to try and send a delayed event into a room we've left.
logger.warning(
"Skipping state delta (%r, %r) without corresponding event ID. "
"This can happen if the homeserver has left the room (in which "
"case this can be ignored), or if there has been a state reset "
"which has caused the sender to be kicked out of the room",
logger.debug(
"Not handling delta for deleted state: %r %r",
delta.event_type,
delta.state_key,
)
continue
sender_str = event_id_and_sender_dict.get(
delta.event_id, Sentinel.UNSET_SENTINEL
logger.debug(
"Handling: %r %r, %s", delta.event_type, delta.state_key, delta.event_id
)
if sender_str is None:
# An event exists, but the `sender` field was "null" and Synapse
# incorrectly accepted the event. This is not expected.
logger.error(
"Skipping state delta with event ID '%s' as 'sender' was None. "
"This is unexpected - please report it as a bug!",
delta.event_id,
)
continue
if sender_str is Sentinel.UNSET_SENTINEL:
# We have an event ID, but the event was not found in the
# datastore. This can happen if a room, or its history, is
# purged. State deltas related to the room are left behind, but
# the event no longer exists.
#
# As we cannot get the sender of this event, we can't calculate
# whether to cancel delayed events related to this one. So we skip.
logger.debug(
"Skipping state delta with event ID '%s' - the room, or its history, may have been purged",
delta.event_id,
)
continue
try:
sender = UserID.from_string(sender_str)
except SynapseError as e:
logger.error(
"Skipping state delta with Matrix User ID '%s' that failed to parse: %s",
sender_str,
e,
)
event = await self._store.get_event(delta.event_id, allow_none=True)
if not event:
continue
sender = UserID.from_string(event.sender)
next_send_ts = await self._store.cancel_delayed_state_events(
room_id=delta.room_id,
+8 -12
View File
@@ -57,6 +57,7 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
ONE_TIME_KEY_UPLOAD = "one_time_key_upload_lock"
@@ -847,22 +848,14 @@ class E2eKeysHandler:
"""
time_now = self.clock.time_msec()
# TODO: Validate the JSON to make sure it has the right keys.
device_keys = keys.get("device_keys", None)
if device_keys:
log_kv(
{
"message": "Updating device_keys for user.",
"user_id": user_id,
"device_id": device_id,
}
)
await self.upload_device_keys_for_user(
user_id=user_id,
device_id=device_id,
keys={"device_keys": device_keys},
)
else:
log_kv({"message": "Did not update device_keys", "reason": "not a dict"})
one_time_keys = keys.get("one_time_keys", None)
if one_time_keys:
@@ -880,9 +873,10 @@ class E2eKeysHandler:
log_kv(
{"message": "Did not update one_time_keys", "reason": "no keys given"}
)
fallback_keys = keys.get("fallback_keys")
if fallback_keys:
fallback_keys = keys.get("fallback_keys") or keys.get(
"org.matrix.msc2732.fallback_keys"
)
if fallback_keys and isinstance(fallback_keys, dict):
log_kv(
{
"message": "Updating fallback_keys for device.",
@@ -891,6 +885,8 @@ class E2eKeysHandler:
}
)
await self.store.set_e2e_fallback_keys(user_id, device_id, fallback_keys)
elif fallback_keys:
log_kv({"message": "Did not update fallback_keys", "reason": "not a dict"})
else:
log_kv(
{"message": "Did not update fallback_keys", "reason": "no keys given"}
+50 -7
View File
@@ -86,6 +86,8 @@ from synapse.replication.http.federation import (
ReplicationFederationSendEventsRestServlet,
)
from synapse.state import StateResolutionStore
from synapse.storage.controllers.persist_events import assign_stitched_orders
from synapse.storage.database import LoggingTransaction
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.types import (
PersistedEventPosition,
@@ -894,18 +896,30 @@ class FederationEventHandler:
)
@trace
async def _process_new_pulled_events(new_events: Collection[EventBase]) -> None:
# We want to sort these by depth so we process them and tell clients about
# them in order. It's also more efficient to backfill this way (`depth`
# ascending) because one backfill event is likely to be the `prev_event` of
# the next event we're going to process.
sorted_events = sorted(new_events, key=lambda x: x.depth)
async def _process_new_pulled_events(new_events: List[EventBase]) -> None:
room_id = new_events[0].room_id
await assign_stitched_orders(room_id, new_events, self._store)
# We want to sort these by stitched ordering, so that events that will
# be sent on to clients over /sync will receive stream_orderings that
# are consistent with stitched orderings (i.e. we will serve them to clients
# in the same order as stitched_order).
#
# It's also more efficient to backfill this way, because one backfill event
# is likely to be the `prev_event` of the next event we're going to process.
#
# Outliers will not yet have received a stitched ordering, but it doesn't
# really matter what order they get persisted in, because they don't get
# sent to clients and we don't do so much state resolution for them. We just
# persist them before any other events.
sorted_events = sorted(new_events, key=lambda x: (x.stitched_ordering or 0))
for ev in sorted_events:
with nested_logging_context(ev.event_id):
await self._process_pulled_event(origin, ev, backfilled=backfilled)
# Check if we've already tried to process these events at some point in the
# past. We aren't concerned with the expontntial backoff here, just whether it
# past. We aren't concerned with the exponential backoff here, just whether it
# has failed to be processed before.
event_ids_with_failed_pull_attempts = (
await self._store.get_event_ids_with_failed_pull_attempts(
@@ -2385,3 +2399,32 @@ class FederationEventHandler:
len(ev.auth_event_ids()),
)
raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many auth_events")
def find_predecessors(event_ids: Iterable[str], batch: List[EventBase]) -> Set[str]:
"""
Walk the tree of dependencies (in batch), and return every event that is
in batch, and is an ancestor of one of the supplied events.
"""
found = set()
unexplored = set(event_ids)
while len(unexplored) > 0:
next_unexplored: Set[str] = set()
# Iterate through the incoming events, looking for events in our "unexplored"
# set. For each matching event, add it to the "found" set, and add its
# "prev_events" to the "unexplored" set for the next pass.
for event in batch:
if event.event_id in unexplored:
found.add(event.event_id)
next_unexplored.update(
(
event_id
for event_id in event.prev_event_ids()
if event_id not in found
)
)
unexplored = next_unexplored
return found
+19 -11
View File
@@ -173,6 +173,18 @@ state_transition_counter = Counter(
labelnames=["locality", "from", "to", SERVER_NAME_LABEL],
)
presence_user_to_current_state_size_gauge = LaterGauge(
name="synapse_handlers_presence_user_to_current_state_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
presence_wheel_timer_size_gauge = LaterGauge(
name="synapse_handlers_presence_wheel_timer_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
# If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
# "currently_active"
LAST_ACTIVE_GRANULARITY = 60 * 1000
@@ -779,11 +791,9 @@ class PresenceHandler(BasePresenceHandler):
EduTypes.PRESENCE, self.incoming_presence
)
LaterGauge(
name="synapse_handlers_presence_user_to_current_state_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self.user_to_current_state)},
presence_user_to_current_state_size_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {(self.server_name,): len(self.user_to_current_state)},
)
# The per-device presence state, maps user to devices to per-device presence state.
@@ -882,11 +892,9 @@ class PresenceHandler(BasePresenceHandler):
60 * 1000,
)
LaterGauge(
name="synapse_handlers_presence_wheel_timer_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self.wheel_timer)},
presence_wheel_timer_size_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {(self.server_name,): len(self.wheel_timer)},
)
# Used to handle sending of presence to newly joined users/servers
@@ -1540,7 +1548,7 @@ class PresenceHandler(BasePresenceHandler):
self.clock, name="presence_delta", server_name=self.server_name
):
room_max_stream_ordering = self.store.get_room_max_stream_ordering()
if self._event_pos >= room_max_stream_ordering:
if self._event_pos == room_max_stream_ordering:
return
logger.debug(
+7 -1
View File
@@ -13,6 +13,7 @@
#
import enum
import logging
from itertools import chain
from typing import (
@@ -74,7 +75,6 @@ from synapse.types.handlers.sliding_sync import (
)
from synapse.types.state import StateFilter
from synapse.util import MutableOverlayMapping
from synapse.util.sentinel import Sentinel
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -83,6 +83,12 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
class Sentinel(enum.Enum):
# defining a sentinel in this way allows mypy to correctly handle the
# type of a dictionary lookup and subsequent type narrowing.
UNSET_SENTINEL = object()
# Helper definition for the types that we might return. We do this to avoid
# copying data between types (which can be expensive for many rooms).
RoomsForUserType = Union[RoomsForUserStateReset, RoomsForUser, RoomsForUserSlidingSync]
+4 -2
View File
@@ -164,11 +164,13 @@ def _get_in_flight_counts() -> Mapping[Tuple[str, ...], int]:
return counts
LaterGauge(
in_flight_requests = LaterGauge(
name="synapse_http_server_in_flight_requests_count",
desc="",
labelnames=["method", "servlet", SERVER_NAME_LABEL],
caller=_get_in_flight_counts,
)
in_flight_requests.register_hook(
homeserver_instance_id=None, hook=_get_in_flight_counts
)
+93 -35
View File
@@ -73,8 +73,6 @@ logger = logging.getLogger(__name__)
METRICS_PREFIX = "/_synapse/metrics"
all_gauges: Dict[str, Collector] = {}
HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
SERVER_NAME_LABEL = "server_name"
@@ -163,42 +161,110 @@ class LaterGauge(Collector):
name: str
desc: str
labelnames: Optional[StrSequence] = attr.ib(hash=False)
# callback: should either return a value (if there are no labels for this metric),
# or dict mapping from a label tuple to a value
caller: Callable[
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
]
_instance_id_to_hook_map: Dict[
Optional[str], # instance_id
Callable[
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
],
] = attr.ib(factory=dict, hash=False)
"""
Map from homeserver instance_id to a callback. Each callback should either return a
value (if there are no labels for this metric), or dict mapping from a label tuple
to a value.
We use `instance_id` instead of `server_name` because it's possible to have multiple
workers running in the same process with the same `server_name`.
"""
def collect(self) -> Iterable[Metric]:
# The decision to add `SERVER_NAME_LABEL` is from the `LaterGauge` usage itself
# (we don't enforce it here, one level up).
g = GaugeMetricFamily(self.name, self.desc, labels=self.labelnames) # type: ignore[missing-server-name-label]
try:
calls = self.caller()
except Exception:
logger.exception("Exception running callback for LaterGauge(%s)", self.name)
yield g
return
for homeserver_instance_id, hook in self._instance_id_to_hook_map.items():
try:
hook_result = hook()
except Exception:
logger.exception(
"Exception running callback for LaterGauge(%s) for homeserver_instance_id=%s",
self.name,
homeserver_instance_id,
)
# Continue to return the rest of the metrics that aren't broken
continue
if isinstance(calls, (int, float)):
g.add_metric([], calls)
else:
for k, v in calls.items():
g.add_metric(k, v)
if isinstance(hook_result, (int, float)):
g.add_metric([], hook_result)
else:
for k, v in hook_result.items():
g.add_metric(k, v)
yield g
def register_hook(
self,
*,
homeserver_instance_id: Optional[str],
hook: Callable[
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
],
) -> None:
"""
Register a callback/hook that will be called to generate a metric samples for
the gauge.
Args:
homeserver_instance_id: The unique ID for this Synapse process instance
(`hs.get_instance_id()`) that this hook is associated with. This can be used
later to lookup all hooks associated with a given server name in order to
unregister them. This should only be omitted for global hooks that work
across all homeservers.
hook: A callback that should either return a value (if there are no
labels for this metric), or dict mapping from a label tuple to a value
"""
# We shouldn't have multiple hooks registered for the same homeserver `instance_id`.
existing_hook = self._instance_id_to_hook_map.get(homeserver_instance_id)
assert existing_hook is None, (
f"LaterGauge(name={self.name}) hook already registered for homeserver_instance_id={homeserver_instance_id}. "
"This is likely a Synapse bug and you forgot to unregister the previous hooks for "
"the server (especially in tests)."
)
self._instance_id_to_hook_map[homeserver_instance_id] = hook
def unregister_hooks_for_homeserver_instance_id(
self, homeserver_instance_id: str
) -> None:
"""
Unregister all hooks associated with the given homeserver `instance_id`. This should be
called when a homeserver is shutdown to avoid extra hooks sitting around.
Args:
homeserver_instance_id: The unique ID for this Synapse process instance to
unregister hooks for (`hs.get_instance_id()`).
"""
self._instance_id_to_hook_map.pop(homeserver_instance_id, None)
def __attrs_post_init__(self) -> None:
self._register()
def _register(self) -> None:
if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering", self.name)
REGISTRY.unregister(all_gauges.pop(self.name))
REGISTRY.register(self)
all_gauges[self.name] = self
# We shouldn't have multiple metrics with the same name. Typically, metrics
# should be created globally so you shouldn't be running into this and this will
# catch any stupid mistakes. The `REGISTRY.register(self)` call above will also
# raise an error if the metric already exists but to make things explicit, we'll
# also check here.
existing_gauge = all_later_gauges_to_clean_up_on_shutdown.get(self.name)
assert existing_gauge is None, f"LaterGauge(name={self.name}) already exists. "
# Keep track of the gauge so we can clean it up later.
all_later_gauges_to_clean_up_on_shutdown[self.name] = self
all_later_gauges_to_clean_up_on_shutdown: Dict[str, LaterGauge] = {}
"""
Track all `LaterGauge` instances so we can remove any associated hooks during homeserver
shutdown.
"""
# `MetricsEntry` only makes sense when it is a `Protocol`,
@@ -250,7 +316,7 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
# Protects access to _registrations
self._lock = threading.Lock()
self._register_with_collector()
REGISTRY.register(self)
def register(
self,
@@ -341,14 +407,6 @@ class InFlightGauge(Generic[MetricsEntry], Collector):
gauge.add_metric(labels=key, value=getattr(metrics, name))
yield gauge
def _register_with_collector(self) -> None:
if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering", self.name)
REGISTRY.unregister(all_gauges.pop(self.name))
REGISTRY.register(self)
all_gauges[self.name] = self
class GaugeHistogramMetricFamilyWithLabels(GaugeHistogramMetricFamily):
"""
+26 -16
View File
@@ -86,6 +86,24 @@ users_woken_by_stream_counter = Counter(
labelnames=["stream", SERVER_NAME_LABEL],
)
notifier_listeners_gauge = LaterGauge(
name="synapse_notifier_listeners",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
notifier_rooms_gauge = LaterGauge(
name="synapse_notifier_rooms",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
notifier_users_gauge = LaterGauge(
name="synapse_notifier_users",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
T = TypeVar("T")
@@ -281,28 +299,20 @@ class Notifier:
)
}
LaterGauge(
name="synapse_notifier_listeners",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=count_listeners,
notifier_listeners_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(), hook=count_listeners
)
LaterGauge(
name="synapse_notifier_rooms",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
notifier_rooms_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {
(self.server_name,): count(
bool, list(self.room_to_user_streams.values())
)
},
)
LaterGauge(
name="synapse_notifier_users",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self.user_to_user_stream)},
notifier_users_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {(self.server_name,): len(self.user_to_user_stream)},
)
def add_replication_callback(self, cb: Callable[[], None]) -> None:
+18 -10
View File
@@ -106,6 +106,18 @@ user_ip_cache_counter = Counter(
"synapse_replication_tcp_resource_user_ip_cache", "", labelnames=[SERVER_NAME_LABEL]
)
tcp_resource_total_connections_gauge = LaterGauge(
name="synapse_replication_tcp_resource_total_connections",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
tcp_command_queue_gauge = LaterGauge(
name="synapse_replication_tcp_command_queue",
desc="Number of inbound RDATA/POSITION commands queued for processing",
labelnames=["stream_name", SERVER_NAME_LABEL],
)
# the type of the entries in _command_queues_by_stream
_StreamCommandQueue = Deque[
@@ -243,11 +255,9 @@ class ReplicationCommandHandler:
# outgoing replication commands to.)
self._connections: List[IReplicationConnection] = []
LaterGauge(
name="synapse_replication_tcp_resource_total_connections",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self._connections)},
tcp_resource_total_connections_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {(self.server_name,): len(self._connections)},
)
# When POSITION or RDATA commands arrive, we stick them in a queue and process
@@ -266,11 +276,9 @@ class ReplicationCommandHandler:
# from that connection.
self._streams_by_connection: Dict[IReplicationConnection, Set[str]] = {}
LaterGauge(
name="synapse_replication_tcp_command_queue",
desc="Number of inbound RDATA/POSITION commands queued for processing",
labelnames=["stream_name", SERVER_NAME_LABEL],
caller=lambda: {
tcp_command_queue_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {
(stream_name, self.server_name): len(queue)
for stream_name, queue in self._command_queues_by_stream.items()
},
+16 -4
View File
@@ -527,7 +527,10 @@ pending_commands = LaterGauge(
name="synapse_replication_tcp_protocol_pending_commands",
desc="",
labelnames=["name", SERVER_NAME_LABEL],
caller=lambda: {
)
pending_commands.register_hook(
homeserver_instance_id=None,
hook=lambda: {
(p.name, p.server_name): len(p.pending_commands) for p in connected_connections
},
)
@@ -544,7 +547,10 @@ transport_send_buffer = LaterGauge(
name="synapse_replication_tcp_protocol_transport_send_buffer",
desc="",
labelnames=["name", SERVER_NAME_LABEL],
caller=lambda: {
)
transport_send_buffer.register_hook(
homeserver_instance_id=None,
hook=lambda: {
(p.name, p.server_name): transport_buffer_size(p) for p in connected_connections
},
)
@@ -571,7 +577,10 @@ tcp_transport_kernel_send_buffer = LaterGauge(
name="synapse_replication_tcp_protocol_transport_kernel_send_buffer",
desc="",
labelnames=["name", SERVER_NAME_LABEL],
caller=lambda: {
)
tcp_transport_kernel_send_buffer.register_hook(
homeserver_instance_id=None,
hook=lambda: {
(p.name, p.server_name): transport_kernel_read_buffer_size(p, False)
for p in connected_connections
},
@@ -582,7 +591,10 @@ tcp_transport_kernel_read_buffer = LaterGauge(
name="synapse_replication_tcp_protocol_transport_kernel_read_buffer",
desc="",
labelnames=["name", SERVER_NAME_LABEL],
caller=lambda: {
)
tcp_transport_kernel_read_buffer.register_hook(
homeserver_instance_id=None,
hook=lambda: {
(p.name, p.server_name): transport_kernel_read_buffer_size(p, True)
for p in connected_connections
},
+3 -147
View File
@@ -23,19 +23,10 @@
import logging
import re
from collections import Counter
from http import HTTPStatus
from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple, Union
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple
from typing_extensions import Self
from synapse._pydantic_compat import (
StrictBool,
StrictStr,
validator,
)
from synapse.api.auth.mas import MasDelegatedAuth
from synapse.api.errors import (
Codes,
InteractiveAuthIncompleteError,
InvalidAPICallError,
SynapseError,
@@ -46,13 +37,11 @@ from synapse.http.servlet import (
parse_integer,
parse_json_object_from_request,
parse_string,
validate_json_object,
)
from synapse.http.site import SynapseRequest
from synapse.logging.opentracing import log_kv, set_tag
from synapse.rest.client._base import client_patterns, interactive_auth_handler
from synapse.types import JsonDict, StreamToken
from synapse.types.rest import RequestBodyModel
from synapse.util.cancellation import cancellable
if TYPE_CHECKING:
@@ -70,6 +59,7 @@ class KeyUploadServlet(RestServlet):
"device_keys": {
"user_id": "<user_id>",
"device_id": "<device_id>",
"valid_until_ts": <millisecond_timestamp>,
"algorithms": [
"m.olm.curve25519-aes-sha2",
]
@@ -121,123 +111,12 @@ class KeyUploadServlet(RestServlet):
self._clock = hs.get_clock()
self._store = hs.get_datastores().main
class KeyUploadRequestBody(RequestBodyModel):
"""
The body of a `POST /_matrix/client/v3/keys/upload` request.
Based on https://spec.matrix.org/v1.16/client-server-api/#post_matrixclientv3keysupload.
"""
class DeviceKeys(RequestBodyModel):
algorithms: List[StrictStr]
"""The encryption algorithms supported by this device."""
device_id: StrictStr
"""The ID of the device these keys belong to. Must match the device ID used when logging in."""
keys: Mapping[StrictStr, StrictStr]
"""
Public identity keys. The names of the properties should be in the
format `<algorithm>:<device_id>`. The keys themselves should be encoded as
specified by the key algorithm.
"""
signatures: Mapping[StrictStr, Mapping[StrictStr, StrictStr]]
"""Signatures for the device key object. A map from user ID, to a map from "<algorithm>:<device_id>" to the signature."""
user_id: StrictStr
"""The ID of the user the device belongs to. Must match the user ID used when logging in."""
class KeyObject(RequestBodyModel):
key: StrictStr
"""The key, encoded using unpadded base64."""
fallback: Optional[StrictBool] = False
"""Whether this is a fallback key. Only used when handling fallback keys."""
signatures: Mapping[StrictStr, Mapping[StrictStr, StrictStr]]
"""Signature for the device. Mapped from user ID to another map of key signing identifier to the signature itself.
See the following for more detail: https://spec.matrix.org/v1.16/appendices/#signing-details
"""
device_keys: Optional[DeviceKeys] = None
"""Identity keys for the device. May be absent if no new identity keys are required."""
fallback_keys: Optional[Mapping[StrictStr, Union[StrictStr, KeyObject]]]
"""
The public key which should be used if the device's one-time keys are
exhausted. The fallback key is not deleted once used, but should be
replaced when additional one-time keys are being uploaded. The server
will notify the client of the fallback key being used through `/sync`.
There can only be at most one key per algorithm uploaded, and the server
will only persist one key per algorithm.
When uploading a signed key, an additional fallback: true key should be
included to denote that the key is a fallback key.
May be absent if a new fallback key is not required.
"""
@validator("fallback_keys", pre=True)
def validate_fallback_keys(cls: Self, v: Any) -> Any:
if v is None:
return v
if not isinstance(v, dict):
raise TypeError("fallback_keys must be a mapping")
for k in v.keys():
if not len(k.split(":")) == 2:
raise SynapseError(
code=HTTPStatus.BAD_REQUEST,
errcode=Codes.BAD_JSON,
msg=f"Invalid fallback_keys key {k!r}. "
'Expected "<algorithm>:<device_id>".',
)
return v
one_time_keys: Optional[Mapping[StrictStr, Union[StrictStr, KeyObject]]] = None
"""
One-time public keys for "pre-key" messages. The names of the properties
should be in the format `<algorithm>:<key_id>`.
The format of the key is determined by the key algorithm, see:
https://spec.matrix.org/v1.16/client-server-api/#key-algorithms.
"""
@validator("one_time_keys", pre=True)
def validate_one_time_keys(cls: Self, v: Any) -> Any:
if v is None:
return v
if not isinstance(v, dict):
raise TypeError("one_time_keys must be a mapping")
for k, _ in v.items():
if not len(k.split(":")) == 2:
raise SynapseError(
code=HTTPStatus.BAD_REQUEST,
errcode=Codes.BAD_JSON,
msg=f"Invalid one_time_keys key {k!r}. "
'Expected "<algorithm>:<device_id>".',
)
return v
async def on_POST(
self, request: SynapseRequest, device_id: Optional[str]
) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request, allow_guest=True)
user_id = requester.user.to_string()
# Parse the request body. Validate separately, as the handler expects a
# plain dict, rather than any parsed object.
#
# Note: It would be nice to work with a parsed object, but the handler
# needs to encode portions of the request body as canonical JSON before
# storing the result in the DB. There's little point in converted to a
# parsed object and then back to a dict.
body = parse_json_object_from_request(request)
validate_json_object(body, self.KeyUploadRequestBody)
if device_id is not None:
# Providing the device_id should only be done for setting keys
@@ -270,31 +149,8 @@ class KeyUploadServlet(RestServlet):
400, "To upload keys, you must pass device_id when authenticating"
)
if "device_keys" in body and isinstance(body["device_keys"], dict):
# Validate the provided `user_id` and `device_id` fields in
# `device_keys` match that of the requesting user. We can't do
# this directly in the pydantic model as we don't have access
# to the requester yet.
#
# TODO: We could use ValidationInfo when we switch to Pydantic v2.
# https://docs.pydantic.dev/latest/concepts/validators/#validation-info
if body["device_keys"].get("user_id") != user_id:
raise SynapseError(
code=HTTPStatus.BAD_REQUEST,
errcode=Codes.BAD_JSON,
msg="Provided `user_id` in `device_keys` does not match that of the authenticated user",
)
if body["device_keys"].get("device_id") != device_id:
raise SynapseError(
code=HTTPStatus.BAD_REQUEST,
errcode=Codes.BAD_JSON,
msg="Provided `device_id` in `device_keys` does not match that of the authenticated user device",
)
result = await self.e2e_keys_handler.upload_keys_for_user(
user_id=user_id,
device_id=device_id,
keys=body,
user_id=user_id, device_id=device_id, keys=body
)
return 200, result
+3
View File
@@ -363,6 +363,9 @@ class SyncRestServlet(RestServlet):
# https://github.com/matrix-org/matrix-doc/blob/54255851f642f84a4f1aaf7bc063eebe3d76752b/proposals/2732-olm-fallback-keys.md
# states that this field should always be included, as long as the server supports the feature.
response["org.matrix.msc2732.device_unused_fallback_key_types"] = (
sync_result.device_unused_fallback_key_types
)
response["device_unused_fallback_key_types"] = (
sync_result.device_unused_fallback_key_types
)
+35 -1
View File
@@ -129,7 +129,10 @@ from synapse.http.client import (
)
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
from synapse.media.media_repository import MediaRepository
from synapse.metrics import register_threadpool
from synapse.metrics import (
all_later_gauges_to_clean_up_on_shutdown,
register_threadpool,
)
from synapse.metrics.common_usage_metrics import CommonUsageMetricsManager
from synapse.module_api import ModuleApi
from synapse.module_api.callbacks import ModuleApiCallbacks
@@ -369,6 +372,37 @@ class HomeServer(metaclass=abc.ABCMeta):
if self.config.worker.run_background_tasks:
self.setup_background_tasks()
def __del__(self) -> None:
"""
Called when an the homeserver is garbage collected.
Make sure we actually do some clean-up, rather than leak data.
"""
self.cleanup()
def cleanup(self) -> None:
"""
WIP: Clean-up any references to the homeserver and stop any running related
processes, timers, loops, replication stream, etc.
This should be called wherever you care about the HomeServer being completely
garbage collected like in tests. It's not necessary to call if you plan to just
shut down the whole Python process anyway.
Can be called multiple times.
"""
logger.info("Received cleanup request for %s.", self.hostname)
# TODO: Stop background processes, timers, loops, replication stream, etc.
# Cleanup metrics associated with the homeserver
for later_gauge in all_later_gauges_to_clean_up_on_shutdown.values():
later_gauge.unregister_hooks_for_homeserver_instance_id(
self.get_instance_id()
)
logger.info("Cleanup complete for %s.", self.hostname)
def start_listening(self) -> None: # noqa: B027 (no-op by design)
"""Start the HTTP, manhole, metrics, etc listeners
@@ -63,6 +63,7 @@ from synapse.logging.opentracing import (
)
from synapse.metrics import SERVER_NAME_LABEL
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage import DataStore
from synapse.storage.controllers.state import StateStorageController
from synapse.storage.databases import Databases
from synapse.storage.databases.main.events import DeltaState
@@ -616,6 +617,12 @@ class EventsPersistenceStorageController:
if not events_and_contexts:
return replaced_events
# TODO massive hack
if events_and_contexts[0][0].stitched_ordering is None:
await assign_stitched_orders(
room_id, [ev for (ev, _) in events_and_contexts], self.main_store
)
chunks = [
events_and_contexts[x : x + 100]
for x in range(0, len(events_and_contexts), 100)
@@ -1241,3 +1248,120 @@ class EventsPersistenceStorageController:
return True
return False
def find_predecessors(event_ids: Iterable[str], batch: List[EventBase]) -> Set[str]:
"""
Walk the tree of dependencies (in batch), and return every event that is
in batch, and is an ancestor of one of the supplied events.
"""
found = set()
unexplored = set(event_ids)
while len(unexplored) > 0:
next_unexplored: Set[str] = set()
# Iterate through the incoming events, looking for events in our "unexplored"
# set. For each matching event, add it to the "found" set, and add its
# "prev_events" to the "unexplored" set for the next pass.
for event in batch:
if event.event_id in unexplored:
found.add(event.event_id)
next_unexplored.update(
(
event_id
for event_id in event.prev_event_ids()
if event_id not in found
)
)
unexplored = next_unexplored
return found
async def assign_stitched_orders(
room_id: str,
events: List[EventBase],
store: DataStore,
) -> None:
"""
Updates the events within `events`, to assign a
stitched_ordering to each event.
"""
# Take a copy of the events we have to process
# TODO find a better way to exclude outliers
remaining_batch = list(ev for ev in events if not ev.internal_metadata.is_outlier())
# Find all events in the current batch which are in a timeline gap
gap_events = await store.db_pool.simple_select_many_batch(
"event_backward_extremities",
"event_id",
(ev.event_id for ev in remaining_batch),
["event_id", "before_gap_event_id"],
)
# TODO matching against gaps is pointless here
# TODO sort gap_events by DAG;received order
for gap_event, before_gap_event_id in gap_events:
logger.debug("Processing received gap event %s", gap_event)
matching_events = [gap_event] # TODO find other events in the same gap
# Find all predecessors of those events in the batch
to_insert = find_predecessors(matching_events, remaining_batch)
logger.debug("Processing to_insert set %s", to_insert)
# Find the stitched order of the event before the gap
# TODO consider doing this with a join
previous_event_stitched_order = await store.db_pool.simple_select_one_onecol(
"events",
{"event_id": before_gap_event_id},
"stitched_ordering",
True,
)
logger.debug(
"Previous event stitched_ordering = %i", previous_event_stitched_order
)
# if previous_event_stitched_order is None, that means we have a room
# where there are existing events or gaps without assigned stitched orders.
# Let's give up trying to assign stitched orders here.
if previous_event_stitched_order is None:
# TODO do something better here
logger.warning(
"Found gap event %s without assigned stitched order: bailing",
gap_event,
)
return
still_remaining_batch = []
for event in remaining_batch:
if event.event_id not in to_insert:
still_remaining_batch.append(event)
continue
# TODO we may need to reorder existing events
previous_event_stitched_order += 1
event.assign_stitched_ordering(previous_event_stitched_order)
logger.debug(
"Persisting inserted events with stitched_order=%i",
previous_event_stitched_order,
)
remaining_batch = still_remaining_batch
logger.debug("Remaining events: %s", [ev.event_id for ev in remaining_batch])
logger.debug(
"Remaining events after processing gap matches: %s",
[ev.event_id for ev in remaining_batch],
)
current_max_stream_ordering = (
await store.get_room_max_stitched_ordering(room_id) or 0
)
for event in remaining_batch:
current_max_stream_ordering += 2**16
event.assign_stitched_ordering(current_max_stream_ordering)
-2
View File
@@ -682,8 +682,6 @@ class StateStorageController:
- the stream id which these results go up to
- list of current_state_delta_stream rows. If it is empty, we are
up to date.
A maximum of 100 rows will be returned.
"""
# FIXME(faster_joins): what do we do here?
# https://github.com/matrix-org/synapse/issues/13008
+1 -7
View File
@@ -61,7 +61,7 @@ from synapse.logging.context import (
current_context,
make_deferred_yieldable,
)
from synapse.metrics import SERVER_NAME_LABEL, LaterGauge, register_threadpool
from synapse.metrics import SERVER_NAME_LABEL, register_threadpool
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.background_updates import BackgroundUpdater
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
@@ -611,12 +611,6 @@ class DatabasePool:
)
self.updates = BackgroundUpdater(hs, self)
LaterGauge(
name="synapse_background_update_status",
desc="Background update status",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): self.updates.get_status()},
)
self._previous_txn_total_time = 0.0
self._current_txn_total_time = 0.0
+17
View File
@@ -22,6 +22,7 @@
import logging
from typing import TYPE_CHECKING, Generic, List, Optional, Type, TypeVar
from synapse.metrics import SERVER_NAME_LABEL, LaterGauge
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import DatabasePool, make_conn
from synapse.storage.databases.main.events import PersistEventsStore
@@ -40,6 +41,13 @@ logger = logging.getLogger(__name__)
DataStoreT = TypeVar("DataStoreT", bound=SQLBaseStore, covariant=True)
background_update_status = LaterGauge(
name="synapse_background_update_status",
desc="Background update status",
labelnames=["database_name", SERVER_NAME_LABEL],
)
class Databases(Generic[DataStoreT]):
"""The various databases.
@@ -143,6 +151,15 @@ class Databases(Generic[DataStoreT]):
db_conn.close()
# Track the background update status for each database
background_update_status.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {
(database.name(), server_name): database.updates.get_status()
for database in self.databases
},
)
# Sanity check that we have actually configured all the required stores.
if not main:
raise Exception("No 'main' database configured")
@@ -182,21 +182,6 @@ class DelayedEventsStore(SQLBaseStore):
"restart_delayed_event", restart_delayed_event_txn
)
async def get_count_of_delayed_events(self) -> int:
"""Returns the number of pending delayed events in the DB."""
def _get_count_of_delayed_events(txn: LoggingTransaction) -> int:
sql = "SELECT count(*) FROM delayed_events"
txn.execute(sql)
resp = txn.fetchone()
return resp[0] if resp is not None else 0
return await self.db_pool.runInteraction(
"get_count_of_delayed_events",
_get_count_of_delayed_events,
)
async def get_all_delayed_events_for_user(
self,
user_localpart: str,
+156 -31
View File
@@ -2596,6 +2596,10 @@ class PersistEventsStore:
# scenario. XXX: does this cause bugs? It will mean we won't send such
# events down /sync. In general they will be historical events, so that
# doesn't matter too much, but that is not always the case.
#
# On the other hand, we *will* assign a stitched ordering at this point.
# Outliers are not assigned stitched orderings when they are first
# persisted as outliers.
logger.info(
"_update_outliers_txn: Updating state for ex-outlier event %s",
@@ -2624,12 +2628,12 @@ class PersistEventsStore:
},
)
sql = "UPDATE events SET outlier = FALSE WHERE event_id = ?"
txn.execute(sql, (event.event_id,))
sql = "UPDATE events SET outlier = FALSE, stitched_ordering = ? WHERE event_id = ?"
txn.execute(sql, (event.stitched_ordering, event.event_id,))
# Update the event_backward_extremities table now that this
# event isn't an outlier any more.
self._update_backward_extremeties(txn, [event])
self._update_backward_extremeties(txn, [(event, context)])
return [ec for ec in events_and_contexts if ec[0] not in to_remove]
@@ -2687,6 +2691,7 @@ class PersistEventsStore:
"contains_url",
"state_key",
"rejection_reason",
"stitched_ordering",
),
values=[
(
@@ -2705,6 +2710,7 @@ class PersistEventsStore:
"url" in event.content and isinstance(event.content["url"], str),
event.get_state_key(),
context.rejected,
event.stitched_ordering,
)
for event, context in events_and_contexts
],
@@ -2811,7 +2817,8 @@ class PersistEventsStore:
# Update the event_forward_extremities, event_backward_extremities and
# event_edges tables.
self._handle_mult_prev_events(
txn, events=[event for event, _ in events_and_contexts]
txn,
events_and_contexts,
)
for event, _ in events_and_contexts:
@@ -3517,7 +3524,9 @@ class PersistEventsStore:
)
def _handle_mult_prev_events(
self, txn: LoggingTransaction, events: List[EventBase]
self,
txn: LoggingTransaction,
events_and_contexts: List[EventPersistencePair],
) -> None:
"""
For the given event, update the event edges table and forward and
@@ -3528,14 +3537,18 @@ class PersistEventsStore:
table="event_edges",
keys=("event_id", "prev_event_id"),
values=[
(ev.event_id, e_id) for ev in events for e_id in ev.prev_event_ids()
(ev.event_id, e_id)
for (ev, _) in events_and_contexts
for e_id in ev.prev_event_ids()
],
)
self._update_backward_extremeties(txn, events)
self._update_backward_extremeties(txn, events_and_contexts)
def _update_backward_extremeties(
self, txn: LoggingTransaction, events: List[EventBase]
self,
txn: LoggingTransaction,
events_and_contexts: List[EventPersistencePair],
) -> None:
"""Updates the event_backward_extremities tables based on the new/updated
events being persisted.
@@ -3546,45 +3559,73 @@ class PersistEventsStore:
Forward extremities are handled when we first start persisting the events.
"""
room_id = events[0].room_id
room_id = events_and_contexts[0][0].room_id
potential_backwards_extremities = {
e_id
for ev in events
for e_id in ev.prev_event_ids()
if not ev.internal_metadata.is_outlier()
}
# Map from missing event ID, to the lowest stitched order of the events that reference it.
potential_backwards_extremities: Dict[str, Optional[int]] = {}
for ev, ctx in events_and_contexts:
if ev.internal_metadata.is_outlier():
continue
for prev_event in ev.prev_event_ids():
lowest_referring_ordering = potential_backwards_extremities.get(
"prev_event"
)
persisted_event_stitched_ordering = ev.stitched_ordering
# If any of the events we persisted did not get assigned a stitched order,
# we cannot yet assign a stitched order to the backwards extremity either.
if persisted_event_stitched_ordering is None:
potential_backwards_extremities[prev_event] = None
continue
if lowest_referring_ordering is None:
lowest_referring_ordering = persisted_event_stitched_ordering
else:
lowest_referring_ordering = min(
lowest_referring_ordering, persisted_event_stitched_ordering
)
potential_backwards_extremities[prev_event] = lowest_referring_ordering
if not potential_backwards_extremities:
return
existing_events_outliers = self.db_pool.simple_select_many_txn(
# Filter potential_backwards_extremities to remove events that are in the
# table.
existing_events = self.db_pool.simple_select_many_txn(
txn,
table="events",
column="event_id",
iterable=potential_backwards_extremities,
iterable=potential_backwards_extremities.keys(),
keyvalues={"outlier": False},
retcols=("event_id",),
)
for (ev,) in existing_events:
del potential_backwards_extremities[ev]
potential_backwards_extremities.difference_update(
e for (e,) in existing_events_outliers
)
if potential_backwards_extremities:
self.db_pool.simple_upsert_many_txn(
txn,
table="event_backward_extremities",
key_names=("room_id", "event_id"),
key_values=[(room_id, ev) for ev in potential_backwards_extremities],
value_names=(),
value_values=(),
for (
backward_extremity,
lowest_referring_ordering,
) in potential_backwards_extremities.items():
before_gap_event_id = self._find_before_gap_event_id(
txn, room_id, backward_extremity, lowest_referring_ordering
)
self.db_pool.simple_upsert_txn(
txn,
table="event_backward_extremities",
keyvalues={
"room_id": room_id,
"event_id": backward_extremity,
},
values={"before_gap_event_id": before_gap_event_id},
)
if potential_backwards_extremities:
# Record the stream orderings where we have new gaps.
gap_events = [
(room_id, self._instance_name, ev.internal_metadata.stream_ordering)
for ev in events
for (ev, _) in events_and_contexts
if any(
e_id in potential_backwards_extremities
for e_id in ev.prev_event_ids()
@@ -3605,7 +3646,7 @@ class PersistEventsStore:
)
backward_extremity_tuples_to_remove = [
(ev.event_id, ev.room_id)
for ev in events
for (ev, _) in events_and_contexts
if not ev.internal_metadata.is_outlier()
# If we encountered an event with no prev_events, then we might
# as well remove it now because it won't ever have anything else
@@ -3630,6 +3671,90 @@ class PersistEventsStore:
backward_extremity_tuples_to_remove,
)
@staticmethod
def _find_before_gap_event_id(
txn: LoggingTransaction,
room_id: str,
backward_extremity_event_id: str,
lowest_referring_ordering: Optional[int],
) -> Optional[str]:
"""
Figure out where in the stitched order a gap (or backwards extremity) belongs.
The result is in terms of the event that precedes the gap in the ordering.
"None" means we were unable to find a preceding event, which should only
happen if the create event was not assigned a stitched ordering.
We check if the backwards extremity already exists in the database, at an
earlier ordering than that implied by `lowest_referring_ordering`, and if
so return that location. Otherwise, we return the event before
`lowest_referring_ordering`.
Args:
txn
room_id: ID of the room that the gap is in
backward_extremity_event_id: Event ID of the backwards extremity (i.e.
an event that is not in our database).
lowest_referring_ordering: The lowest stitched ordering of all the events
that we have just inserted, that refer to this backwards extremity.
"""
(new_before_gap_event_id, new_previous_stitched_ordering) = (None, None)
if lowest_referring_ordering is not None:
# Given the lowest stitched ordering of all the events that we have just
# inserted, find the previous event (by stitched ordering); the gap
# will likely come just afterwards.
#
# Note: we include "AND event_id <> backward_extremity_event_id" because
# if this backward extremity is actually an outlier, then that event
# does exist in events, but we don't want to find it.
txn.execute(
"""
SELECT event_id, stitched_ordering FROM events
WHERE room_id = ? AND stitched_ordering < ? AND event_id <> ?
ORDER BY stitched_ordering DESC LIMIT 1
""",
[room_id, lowest_referring_ordering, backward_extremity_event_id],
)
row = txn.fetchone()
if row is not None:
(new_before_gap_event_id, new_previous_stitched_ordering) = row
# If this is an existing backwards extremity, see where it currently
# exists in the order.
txn.execute(
"""
SELECT events.event_id, events.stitched_ordering FROM
event_backward_extremities LEFT JOIN events ON
events.event_id = event_backward_extremities.before_gap_event_id
WHERE event_backward_extremities.event_id = ?
""",
[backward_extremity_event_id],
)
row = txn.fetchone()
if row is None:
# Not an existing backwards extremity: use our new before_gap_event_id
return new_before_gap_event_id
(existing_before_gap_id, existing_previous_stitched_ordering) = row
# If the existing backwards extremity has not yet been assigned a
# stream ordering, use our new before_gap_event_id.
if existing_previous_stitched_ordering is None:
return new_before_gap_event_id
# This is an existing backwards extremity with an assigned stitched ordering.
# Leave it as-is unless we have successfully calculated a new stitched ordering
# which is lower than the existing.
if (
new_previous_stitched_ordering is not None
and new_previous_stitched_ordering < existing_previous_stitched_ordering
):
return new_before_gap_event_id
else:
return existing_previous_stitched_ordering
@attr.s(slots=True, auto_attribs=True)
class _LinkMap:
+18 -33
View File
@@ -2135,39 +2135,6 @@ class EventsWorkerStore(SQLBaseStore):
return rows, to_token, True
async def get_senders_for_event_ids(
self, event_ids: Collection[str]
) -> Dict[str, Optional[str]]:
"""
Given a sequence of event IDs, return the sender associated with each.
Args:
event_ids: A collection of event IDs as strings.
Returns:
A dict of event ID -> sender of the event.
If a given event ID does not exist in the `events` table, then no entry
for that event ID will be returned.
"""
def _get_senders_for_event_ids(
txn: LoggingTransaction,
) -> Dict[str, Optional[str]]:
rows = self.db_pool.simple_select_many_txn(
txn=txn,
table="events",
column="event_id",
iterable=event_ids,
keyvalues={},
retcols=["event_id", "sender"],
)
return dict(rows)
return await self.db_pool.runInteraction(
"get_senders_for_event_ids", _get_senders_for_event_ids
)
@cached(max_entries=5000)
async def get_event_ordering(self, event_id: str, room_id: str) -> Tuple[int, int]:
res = await self.db_pool.simple_select_one(
@@ -2788,3 +2755,21 @@ class EventsWorkerStore(SQLBaseStore):
sender=row[0],
received_ts=row[1],
)
async def get_room_max_stitched_ordering(self, room_id: str) -> Optional[int]:
"""Get the maximum stitched order for any event currently in the room.
If no events in this room have an assigned stitched order, returns None.
"""
def get_room_max_stitched_ordering_txn(
txn: LoggingTransaction,
) -> Optional[int]:
sql = "SELECT MAX(stitched_ordering) FROM events WHERE room_id=?"
txn.execute(sql, [room_id])
ret = [r[0] for r in txn]
return ret[0]
return await self.db_pool.runInteraction(
"get_room_max_stitched_ordering", get_room_max_stitched_ordering_txn
)
+10 -5
View File
@@ -84,6 +84,13 @@ _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME = "current_state_events_membership"
_POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE = 1000
federation_known_servers_gauge = LaterGauge(
name="synapse_federation_known_servers",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
@attr.s(frozen=True, slots=True, auto_attribs=True)
class EventIdMembership:
"""Returned by `get_membership_from_event_ids`"""
@@ -116,11 +123,9 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
1,
self._count_known_servers,
)
LaterGauge(
name="synapse_federation_known_servers",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): self._known_servers_count},
federation_known_servers_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {(self.server_name,): self._known_servers_count},
)
@wrap_as_background_process("_count_known_servers")
@@ -94,8 +94,6 @@ class StateDeltasStore(SQLBaseStore):
- the stream id which these results go up to
- list of current_state_delta_stream rows. If it is empty, we are
up to date.
A maximum of 100 rows will be returned.
"""
prev_stream_id = int(prev_stream_id)
@@ -0,0 +1,40 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2025 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
ALTER TABLE events ADD COLUMN stitched_ordering BIGINT;
CREATE UNIQUE INDEX events_stitched_order ON events(room_id, stitched_ordering); -- TODO make this concurrent
--CREATE TABLE stitched_ordering_gaps (
-- room_id TEXT NOT NULL,
-- following_event_id TEXT NOT NULL,
-- missing_event_id TEXT NOT NULL,
-- UNIQUE (room_id, following_event_id, missing_event_id)
--);
--CREATE INDEX stitched_ordering_gaps_missing_events ON stitched_ordering_gaps(room_id, missing_event_id);
-- Gaps in the stitched ordering are equivalent to a group of backward extremities that appear at
-- the same point in the stitched ordering.
--
-- Rather than explicitly tracking where in the stitched ordering a given gap appears, we record the
-- event id of the event that comes *before* the gap in the stitched ordering. Doing so means that:
--
-- 1. There is only one table that has a `stitched_ordering` column, making it easier to figure out
-- how to insert a batch of events between existing events (and making the UNIQUE constraint effective).
--
-- 2. We don't need to allocate space in the stitched ordering for gaps; in particular we can assign an order
-- to gaps *after* we have persisted the events. (We could probably work around this by double-spacing inserted
-- events? but still, it's a nice property)
--
-- Note that this assumes that we never need to insert an event *before* a gap (or if we did,
-- we'd have to update this table).
ALTER TABLE event_backward_extremities ADD COLUMN before_gap_event_id TEXT REFERENCES events (event_id);
+10 -4
View File
@@ -131,22 +131,28 @@ def _get_counts_from_rate_limiter_instance(
# We track the number of affected hosts per time-period so we can
# differentiate one really noisy homeserver from a general
# ratelimit tuning problem across the federation.
LaterGauge(
sleep_affected_hosts_gauge = LaterGauge(
name="synapse_rate_limit_sleep_affected_hosts",
desc="Number of hosts that had requests put to sleep",
labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
caller=lambda: _get_counts_from_rate_limiter_instance(
)
sleep_affected_hosts_gauge.register_hook(
homeserver_instance_id=None,
hook=lambda: _get_counts_from_rate_limiter_instance(
lambda rate_limiter_instance: sum(
ratelimiter.should_sleep()
for ratelimiter in rate_limiter_instance.ratelimiters.values()
)
),
)
LaterGauge(
reject_affected_hosts_gauge = LaterGauge(
name="synapse_rate_limit_reject_affected_hosts",
desc="Number of hosts that had requests rejected",
labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
caller=lambda: _get_counts_from_rate_limiter_instance(
)
reject_affected_hosts_gauge.register_hook(
homeserver_instance_id=None,
hook=lambda: _get_counts_from_rate_limiter_instance(
lambda rate_limiter_instance: sum(
ratelimiter.should_reject()
for ratelimiter in rate_limiter_instance.ratelimiters.values()
-21
View File
@@ -1,21 +0,0 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
import enum
class Sentinel(enum.Enum):
# defining a sentinel in this way allows mypy to correctly handle the
# type of a dictionary lookup and subsequent type narrowing.
UNSET_SENTINEL = object()
+10 -5
View File
@@ -44,6 +44,13 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
running_tasks_gauge = LaterGauge(
name="synapse_scheduler_running_tasks",
desc="The number of concurrent running tasks handled by the TaskScheduler",
labelnames=[SERVER_NAME_LABEL],
)
class TaskScheduler:
"""
This is a simple task scheduler designed for resumable tasks. Normally,
@@ -130,11 +137,9 @@ class TaskScheduler:
TaskScheduler.SCHEDULE_INTERVAL_MS,
)
LaterGauge(
name="synapse_scheduler_running_tasks",
desc="The number of concurrent running tasks handled by the TaskScheduler",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self._running_tasks)},
running_tasks_gauge.register_hook(
homeserver_instance_id=hs.get_instance_id(),
hook=lambda: {(self.server_name,): len(self._running_tasks)},
)
def register_action(
+23
View File
@@ -410,6 +410,7 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
device_id = "xyz"
fallback_key = {"alg1:k1": "fallback_key1"}
fallback_key2 = {"alg1:k2": "fallback_key2"}
fallback_key3 = {"alg1:k2": "fallback_key3"}
otk = {"alg1:k2": "key2"}
# we shouldn't have any unused fallback keys yet
@@ -530,6 +531,28 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
{"failures": {}, "one_time_keys": {local_user: {device_id: fallback_key2}}},
)
# using the unstable prefix should also set the fallback key
self.get_success(
self.handler.upload_keys_for_user(
local_user,
device_id,
{"org.matrix.msc2732.fallback_keys": fallback_key3},
)
)
claim_res = self.get_success(
self.handler.claim_one_time_keys(
{local_user: {device_id: {"alg1": 1}}},
self.requester,
timeout=None,
always_include_fallback_keys=False,
)
)
self.assertEqual(
claim_res,
{"failures": {}, "one_time_keys": {local_user: {device_id: fallback_key3}}},
)
def test_fallback_key_bulk(self) -> None:
"""Like test_fallback_key, but claims multiple keys in one handler call."""
alice = f"@alice:{self.hs.hostname}"
+98 -2
View File
@@ -18,11 +18,18 @@
# [This file includes modifications made by New Vector Limited]
#
#
from typing import Dict, Protocol, Tuple
from typing import Dict, NoReturn, Protocol, Tuple
from prometheus_client.core import Sample
from synapse.metrics import REGISTRY, InFlightGauge, generate_latest
from synapse.metrics import (
REGISTRY,
SERVER_NAME_LABEL,
InFlightGauge,
LaterGauge,
all_later_gauges_to_clean_up_on_shutdown,
generate_latest,
)
from synapse.util.caches.deferred_cache import DeferredCache
from tests import unittest
@@ -285,6 +292,95 @@ class CacheMetricsTests(unittest.HomeserverTestCase):
self.assertEqual(hs2_cache_max_size_metric_value, "777.0")
class LaterGaugeTests(unittest.HomeserverTestCase):
def setUp(self) -> None:
super().setUp()
self.later_gauge = LaterGauge(
name="foo",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
def tearDown(self) -> None:
super().tearDown()
REGISTRY.unregister(self.later_gauge)
all_later_gauges_to_clean_up_on_shutdown.pop(self.later_gauge.name, None)
def test_later_gauge_multiple_servers(self) -> None:
"""
Test that LaterGauge metrics are reported correctly across multiple servers. We
will have an metrics entry for each homeserver that is labeled with the
`server_name` label.
"""
self.later_gauge.register_hook(
homeserver_instance_id="123", hook=lambda: {("hs1",): 1}
)
self.later_gauge.register_hook(
homeserver_instance_id="456", hook=lambda: {("hs2",): 2}
)
metrics_map = get_latest_metrics()
# Find the metrics from both homeservers
hs1_metric = 'foo{server_name="hs1"}'
hs1_metric_value = metrics_map.get(hs1_metric)
self.assertIsNotNone(
hs1_metric_value,
f"Missing metric {hs1_metric} in metrics {metrics_map}",
)
self.assertEqual(hs1_metric_value, "1.0")
hs2_metric = 'foo{server_name="hs2"}'
hs2_metric_value = metrics_map.get(hs2_metric)
self.assertIsNotNone(
hs2_metric_value,
f"Missing metric {hs2_metric} in metrics {metrics_map}",
)
self.assertEqual(hs2_metric_value, "2.0")
def test_later_gauge_hook_exception(self) -> None:
"""
Test that LaterGauge metrics are collected across multiple servers even if one
hooks is throwing an exception.
"""
def raise_exception() -> NoReturn:
raise Exception("fake error generating data")
# Make the hook for hs1 throw an exception
self.later_gauge.register_hook(
homeserver_instance_id="123", hook=raise_exception
)
# Metrics from hs2 still work fine
self.later_gauge.register_hook(
homeserver_instance_id="456", hook=lambda: {("hs2",): 2}
)
metrics_map = get_latest_metrics()
# Since we encountered an exception while trying to collect metrics from hs1, we
# don't expect to see it here.
hs1_metric = 'foo{server_name="hs1"}'
hs1_metric_value = metrics_map.get(hs1_metric)
self.assertIsNone(
hs1_metric_value,
(
"Since we encountered an exception while trying to collect metrics from hs1"
f"we don't expect to see it the metrics_map {metrics_map}"
),
)
# We should still see metrics from hs2 though
hs2_metric = 'foo{server_name="hs2"}'
hs2_metric_value = metrics_map.get(hs2_metric)
self.assertIsNotNone(
hs2_metric_value,
f"Missing metric {hs2_metric} in cache metrics {metrics_map}",
)
self.assertEqual(hs2_metric_value, "2.0")
def get_latest_metrics() -> Dict[str, str]:
"""
Collect the latest metrics from the registry and parse them into an easy to use map.
+1 -2
View File
@@ -32,7 +32,6 @@ from synapse.config.workers import InstanceTcpLocationConfig, InstanceUnixLocati
from synapse.http.site import SynapseRequest, SynapseSite
from synapse.replication.http import ReplicationRestResource
from synapse.replication.tcp.client import ReplicationDataHandler
from synapse.replication.tcp.handler import ReplicationCommandHandler
from synapse.replication.tcp.protocol import (
ClientReplicationStreamProtocol,
ServerReplicationStreamProtocol,
@@ -97,7 +96,7 @@ class BaseStreamTestCase(unittest.HomeserverTestCase):
self.test_handler = self._build_replication_data_handler()
self.worker_hs._replication_data_handler = self.test_handler # type: ignore[attr-defined]
repl_handler = ReplicationCommandHandler(self.worker_hs)
repl_handler = self.worker_hs.get_replication_command_handler()
self.client = ClientReplicationStreamProtocol(
self.worker_hs,
"client",
-141
View File
@@ -40,147 +40,6 @@ from tests.unittest import override_config
from tests.utils import HAS_AUTHLIB
class KeyUploadTestCase(unittest.HomeserverTestCase):
servlets = [
keys.register_servlets,
admin.register_servlets_for_client_rest_resource,
login.register_servlets,
]
def test_upload_keys_fails_on_invalid_structure(self) -> None:
"""Check that we validate the structure of keys upon upload.
Regression test for https://github.com/element-hq/synapse/pull/17097
"""
self.register_user("alice", "wonderland")
alice_token = self.login("alice", "wonderland")
channel = self.make_request(
"POST",
"/_matrix/client/v3/keys/upload",
{
# Error: device_keys must be a dict
"device_keys": ["some", "stuff", "weewoo"]
},
alice_token,
)
self.assertEqual(channel.code, HTTPStatus.BAD_REQUEST, channel.result)
self.assertEqual(
channel.json_body["errcode"],
Codes.BAD_JSON,
channel.result,
)
channel = self.make_request(
"POST",
"/_matrix/client/v3/keys/upload",
{
# Error: properties of fallback_keys must be in the form `<algorithm>:<device_id>`
"fallback_keys": {"invalid_key": "signature_base64"}
},
alice_token,
)
self.assertEqual(channel.code, HTTPStatus.BAD_REQUEST, channel.result)
self.assertEqual(
channel.json_body["errcode"],
Codes.BAD_JSON,
channel.result,
)
channel = self.make_request(
"POST",
"/_matrix/client/v3/keys/upload",
{
# Same as above, but for one_time_keys
"one_time_keys": {"invalid_key": "signature_base64"}
},
alice_token,
)
self.assertEqual(channel.code, HTTPStatus.BAD_REQUEST, channel.result)
self.assertEqual(
channel.json_body["errcode"],
Codes.BAD_JSON,
channel.result,
)
def test_upload_keys_fails_on_invalid_user_id_or_device_id(self) -> None:
"""
Validate that the requesting user is uploading their own keys and nobody
else's.
"""
device_id = "DEVICE_ID"
alice_user_id = self.register_user("alice", "wonderland")
alice_token = self.login("alice", "wonderland", device_id=device_id)
channel = self.make_request(
"POST",
"/_matrix/client/v3/keys/upload",
{
"device_keys": {
# Included `user_id` does not match requesting user.
"user_id": "@unknown_user:test",
"device_id": device_id,
"algorithms": ["m.olm.curve25519-aes-sha2"],
"keys": {
f"ed25519:{device_id}": "publickey",
},
"signatures": {},
}
},
alice_token,
)
self.assertEqual(channel.code, HTTPStatus.BAD_REQUEST, channel.result)
self.assertEqual(
channel.json_body["errcode"],
Codes.BAD_JSON,
channel.result,
)
channel = self.make_request(
"POST",
"/_matrix/client/v3/keys/upload",
{
"device_keys": {
"user_id": alice_user_id,
# Included `device_id` does not match requesting user's.
"device_id": "UNKNOWN_DEVICE_ID",
"algorithms": ["m.olm.curve25519-aes-sha2"],
"keys": {
f"ed25519:{device_id}": "publickey",
},
"signatures": {},
}
},
alice_token,
)
self.assertEqual(channel.code, HTTPStatus.BAD_REQUEST, channel.result)
self.assertEqual(
channel.json_body["errcode"],
Codes.BAD_JSON,
channel.result,
)
def test_upload_keys_succeeds_when_fields_are_explicitly_set_to_null(self) -> None:
"""
This is a regression test for https://github.com/element-hq/synapse/pull/19023.
"""
device_id = "DEVICE_ID"
self.register_user("alice", "wonderland")
alice_token = self.login("alice", "wonderland", device_id=device_id)
channel = self.make_request(
"POST",
"/_matrix/client/v3/keys/upload",
{
"device_keys": None,
"one_time_keys": None,
"fallback_keys": None,
},
alice_token,
)
self.assertEqual(channel.code, HTTPStatus.OK, channel.result)
class KeyQueryTestCase(unittest.HomeserverTestCase):
servlets = [
keys.register_servlets,
+3
View File
@@ -1145,6 +1145,9 @@ def setup_test_homeserver(
reactor=reactor,
)
# Register the cleanup hook
cleanup_func(hs.cleanup)
# Install @cache_in_self attributes
for key, val in kwargs.items():
setattr(hs, "_" + key, val)
@@ -0,0 +1,91 @@
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
from typing import List
import attr
from tests.unittest import TestCase
from synapse.storage.controllers.persist_events import find_predecessors
class FindPredecessorsTestCase(TestCase):
def test_predecessors_finds_nothing_if_event_is_not_in_batch(self) -> None:
batch = [
FakeEvent(event_id="B", prev_event_ids=["C"]),
]
predecessors = find_predecessors({"A"}, batch) # type: ignore[arg-type]
self.assertEqual(predecessors, set())
def test_predecessors_finds_only_event_if_it_has_no_predecessors(self) -> None:
batch = [
FakeEvent(event_id="E1", prev_event_ids=[]),
FakeEvent(event_id="E2", prev_event_ids=["E3"]),
]
predecessors = find_predecessors({"E1"}, batch) # type: ignore[arg-type]
self.assertEqual(predecessors, {"E1"})
def test_predecessors_finds_all_ancestors(self) -> None:
batch = [
FakeEvent(event_id="A", prev_event_ids=["B", "C"]),
FakeEvent(event_id="B", prev_event_ids=["D"]),
FakeEvent(event_id="C", prev_event_ids=["D"]),
FakeEvent(event_id="D", prev_event_ids=["E"]),
FakeEvent(event_id="E", prev_event_ids=[]),
FakeEvent(event_id="F", prev_event_ids=["G", "H"]),
FakeEvent(event_id="G", prev_event_ids=[]),
]
predecessors = find_predecessors({"A"}, batch) # type: ignore[arg-type]
self.assertEqual(predecessors, {"A", "B", "C", "D", "E"})
def test_predecessors_ignores_cycles(self) -> None:
batch = [
FakeEvent(event_id="E1", prev_event_ids=["E2"]),
FakeEvent(event_id="E2", prev_event_ids=["E1"]),
]
predecessors = find_predecessors({"E1"}, batch) # type: ignore[arg-type]
self.assertEqual(predecessors, {"E1", "E2"})
def test_predecessors_ignores_self_reference_cycles(self) -> None:
batch = [
FakeEvent(event_id="E1", prev_event_ids=["E2"]),
FakeEvent(event_id="E2", prev_event_ids=["E2"]),
]
predecessors = find_predecessors({"E1"}, batch) # type: ignore[arg-type]
self.assertEqual(predecessors, {"E1", "E2"})
def test_predecessors_finds_ancestors_of_multiple_starting_events(self) -> None:
batch = [
FakeEvent(event_id="A", prev_event_ids=["B"]),
FakeEvent(event_id="B", prev_event_ids=[]),
FakeEvent(event_id="C", prev_event_ids=["D"]),
FakeEvent(event_id="D", prev_event_ids=["E"]),
FakeEvent(event_id="E", prev_event_ids=[]),
FakeEvent(event_id="F", prev_event_ids=["G"]),
FakeEvent(event_id="G", prev_event_ids=[]),
]
predecessors = find_predecessors({"A", "C"}, batch) # type: ignore[arg-type]
self.assertEqual(predecessors, {"A", "B", "C", "D", "E"})
@attr.s(auto_attribs=True)
class FakeEvent:
event_id: str
_prev_event_ids: List[str]
def prev_event_ids(self) -> List[str]:
return self._prev_event_ids
+46 -72
View File
@@ -20,7 +20,7 @@
#
import logging
from typing import Dict, List, Optional
from typing import List, Optional
from twisted.internet.testing import MemoryReactor
@@ -31,6 +31,7 @@ from synapse.federation.federation_base import event_from_pdu_json
from synapse.rest import admin
from synapse.rest.client import login, room
from synapse.server import HomeServer
from synapse.storage.controllers.persist_events import assign_stitched_orders
from synapse.types import StateMap
from synapse.util import Clock
@@ -39,77 +40,6 @@ from tests.unittest import HomeserverTestCase
logger = logging.getLogger(__name__)
class EventsTestCase(HomeserverTestCase):
servlets = [
admin.register_servlets,
room.register_servlets,
login.register_servlets,
]
def prepare(
self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
) -> None:
self._store = self.hs.get_datastores().main
def test_get_senders_for_event_ids(self) -> None:
"""Tests the `get_senders_for_event_ids` storage function."""
users_and_tokens: Dict[str, str] = {}
for localpart_suffix in range(10):
localpart = f"user_{localpart_suffix}"
user_id = self.register_user(localpart, "rabbit")
token = self.login(localpart, "rabbit")
users_and_tokens[user_id] = token
room_creator_user_id = self.register_user("room_creator", "rabbit")
room_creator_token = self.login("room_creator", "rabbit")
users_and_tokens[room_creator_user_id] = room_creator_token
# Create a room and invite some users.
room_id = self.helper.create_room_as(
room_creator_user_id, tok=room_creator_token
)
event_ids_to_senders: Dict[str, str] = {}
for user_id, token in users_and_tokens.items():
if user_id == room_creator_user_id:
continue
self.helper.invite(
room=room_id,
targ=user_id,
tok=room_creator_token,
)
# Have the user accept the invite and join the room.
self.helper.join(
room=room_id,
user=user_id,
tok=token,
)
# Have the user send an event.
response = self.helper.send_event(
room_id=room_id,
type="m.room.message",
content={
"msgtype": "m.text",
"body": f"hello, I'm {user_id}!",
},
tok=token,
)
# Record the event ID and sender.
event_id = response["event_id"]
event_ids_to_senders[event_id] = user_id
# Check that `get_senders_for_event_ids` returns the correct data.
response = self.get_success(
self._store.get_senders_for_event_ids(list(event_ids_to_senders.keys()))
)
self.assert_dict(event_ids_to_senders, response)
class ExtremPruneTestCase(HomeserverTestCase):
servlets = [
admin.register_servlets,
@@ -590,3 +520,47 @@ class InvalideUsersInRoomCacheTestCase(HomeserverTestCase):
users = self.get_success(self.store.get_users_in_room(room_id))
self.assertEqual(users, [])
class AssignStitchedOrderingTestCase(HomeserverTestCase):
servlets = [
admin.register_servlets,
room.register_servlets,
login.register_servlets,
]
def prepare(
self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
) -> None:
self.state = self.hs.get_state_handler()
# persistence = self.hs.get_storage_controllers().persistence
# assert persistence is not None
# self._persistence = persistence
self.store = self.hs.get_datastores().main
def test_insert_events(self) -> None:
# Create a room
self.register_user("user", "pass")
token = self.login("user", "pass")
room_id = self.helper.create_room_as(
"user", room_version=RoomVersions.V12.identifier, tok=token
)
# Build a test event
test_event = event_from_pdu_json(
{
"type": EventTypes.Message,
"content": {"body": "blah"},
"room_id": room_id,
"sender": "@user:other",
"depth": 5,
"prev_events": [],
"auth_events": [],
"origin_server_ts": self.clock.time_msec(),
},
RoomVersions.V12,
)
self.get_success(assign_stitched_orders(room_id, [test_event], self.store))
self.assertEqual(test_event.stitched_ordering, 6 * 2**16)