Merge remote-tracking branch 'origin/release-v1.129' into matrix-org-hotfixes
CHANGES.md
@@ -1,10 +1,42 @@

# Synapse 1.129.0rc1 (2025-04-15)

### Features

- Add `passthrough_authorization_parameters` in OIDC configuration to allow passing parameters to the authorization grant URL. ([\#18232](https://github.com/element-hq/synapse/issues/18232))
- Add `total_event_count`, `total_message_count`, and `total_e2ee_event_count` fields to the homeserver usage statistics. ([\#18260](https://github.com/element-hq/synapse/issues/18260))

### Bugfixes

- Fix `force_tracing_for_users` config when using delegated auth. ([\#18334](https://github.com/element-hq/synapse/issues/18334))
- Fix the token introspection cache logging access tokens when MAS integration is in use. ([\#18335](https://github.com/element-hq/synapse/issues/18335))
- Stop caching introspection failures when delegating auth to MAS. ([\#18339](https://github.com/element-hq/synapse/issues/18339))

### Updates to the Docker image

- Optimize the build of the complement-synapse image. ([\#18294](https://github.com/element-hq/synapse/issues/18294))

### Internal Changes

- Disable statement timeout during room purge. ([\#18133](https://github.com/element-hq/synapse/issues/18133))
- Add cache to storage functions used to auth requests when using delegated auth. ([\#18337](https://github.com/element-hq/synapse/issues/18337))

# Synapse 1.128.0 (2025-04-08)

No significant changes since 1.128.0rc1.

# Synapse 1.128.0rc1 (2025-04-01)

### Features

- Add an access token introspection cache to make Matrix Authentication Service integration ([MSC3861](https://github.com/matrix-org/matrix-doc/pull/3861)) more efficient. ([\#18231](https://github.com/element-hq/synapse/issues/18231))
- Add background job to clear unreferenced state groups. ([\#18254](https://github.com/element-hq/synapse/issues/18254))
- Hashes of media files are now tracked by Synapse. Media quarantines will now apply to all files with the same hash. ([\#18277](https://github.com/element-hq/synapse/issues/18277), [\#18302](https://github.com/element-hq/synapse/issues/18302))
- Hashes of media files are now tracked by Synapse. Media quarantines will now apply to all files with the same hash. ([\#18277](https://github.com/element-hq/synapse/issues/18277), [\#18302](https://github.com/element-hq/synapse/issues/18302), [\#18296](https://github.com/element-hq/synapse/issues/18296))

### Bugfixes

@@ -14,7 +46,7 @@

- Specify the architecture of installed packages via an APT config option, which is more reliable than appending package names with `:{arch}`. ([\#18271](https://github.com/element-hq/synapse/issues/18271))
- Always specify base image debian versions with a build argument. ([\#18272](https://github.com/element-hq/synapse/issues/18272))
- Allow passing arguments to `start_for_complement.sh (to be sent to `configure_workers_and_start.py`). ([\#18273](https://github.com/element-hq/synapse/issues/18273))
- Allow passing arguments to `start_for_complement.sh` (to be sent to `configure_workers_and_start.py`). ([\#18273](https://github.com/element-hq/synapse/issues/18273))
- Make some improvements to the `prefix-log` script in the workers image. ([\#18274](https://github.com/element-hq/synapse/issues/18274))
- Use `uv pip` to install `supervisor` in the worker image. ([\#18275](https://github.com/element-hq/synapse/issues/18275))
- Avoid needing to download & use `rsync` in a build layer. ([\#18287](https://github.com/element-hq/synapse/issues/18287))

@@ -24,7 +56,6 @@

- Fix how to obtain access token and change naming from riot to element ([\#18225](https://github.com/element-hq/synapse/issues/18225))
- Correct a small typo in the SSO mapping providers documentation. ([\#18276](https://github.com/element-hq/synapse/issues/18276))
- Add docs for how to clear out the Poetry wheel cache. ([\#18283](https://github.com/element-hq/synapse/issues/18283))
- Hashes of media files are now tracked by Synapse. Media quarantines will now apply to all files with the same hash. ([\#18296](https://github.com/element-hq/synapse/issues/18296))

### Internal Changes

debian/changelog (vendored)
@@ -1,3 +1,15 @@

matrix-synapse-py3 (1.129.0~rc1) stable; urgency=medium

  * New Synapse release 1.129.0rc1.

 -- Synapse Packaging team <packages@matrix.org>  Tue, 15 Apr 2025 10:47:43 -0600

matrix-synapse-py3 (1.128.0) stable; urgency=medium

  * New Synapse release 1.128.0.

 -- Synapse Packaging team <packages@matrix.org>  Tue, 08 Apr 2025 14:09:54 +0100

matrix-synapse-py3 (1.128.0~rc1) stable; urgency=medium

  * Update Poetry to 2.1.1.

@@ -25,7 +25,7 @@ FROM $FROM
RUN adduser --system --uid 999 postgres --home /var/lib/postgresql
COPY --from=postgres_base /usr/lib/postgresql /usr/lib/postgresql
COPY --from=postgres_base /usr/share/postgresql /usr/share/postgresql
RUN mkdir /var/run/postgresql && chown postgres /var/run/postgresql
COPY --from=postgres_base --chown=postgres /var/run/postgresql /var/run/postgresql
ENV PATH="${PATH}:/usr/lib/postgresql/13/bin"
ENV PGDATA=/var/lib/postgresql/data

@@ -30,10 +30,13 @@ The following statistics are sent to the configured reporting endpoint:

| `python_version` | string | The Python version number in use (e.g "3.7.1"). Taken from `sys.version_info`. |
| `total_users` | int | The number of registered users on the homeserver. |
| `total_nonbridged_users` | int | The number of users, excluding those created by an Application Service. |
| `daily_user_type_native` | int | The number of native users created in the last 24 hours. |
| `daily_user_type_native` | int | The number of native, non-guest users created in the last 24 hours. |
| `daily_user_type_guest` | int | The number of guest users created in the last 24 hours. |
| `daily_user_type_bridged` | int | The number of users created by Application Services in the last 24 hours. |
| `total_room_count` | int | The total number of rooms present on the homeserver. |
| `total_event_count` | int | The total number of events present on the homeserver. |
| `total_message_count` | int | The total number of non-state events with type `m.room.message` present on the homeserver. |
| `total_e2ee_event_count` | int | The total number of non-state events with type `m.room.encrypted` present on the homeserver. This can be used as a slight over-estimate for the number of encrypted messages. |
| `daily_active_users` | int | The number of unique users[^1] that have used the homeserver in the last 24 hours. |
| `monthly_active_users` | int | The number of unique users[^1] that have used the homeserver in the last 30 days. |
| `daily_active_rooms` | int | The number of rooms that have had a (state) event with the type `m.room.message` sent in them in the last 24 hours. |
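As a rough illustration of how the three new totals relate (this is not Synapse's actual implementation, which reads pre-aggregated counters from the new `event_stats` table shown later in this diff), here is a minimal sketch that classifies a list of events the same way the table above describes them: every event counts towards `total_event_count`, while only non-state `m.room.message` and `m.room.encrypted` events count towards the two message totals.

```python
from typing import List, Optional, TypedDict


class Event(TypedDict):
    type: str
    state_key: Optional[str]  # None for non-state events


def usage_totals(events: List[Event]) -> dict:
    """Compute the three totals as defined in the table above (illustrative only)."""
    return {
        "total_event_count": len(events),
        "total_message_count": sum(
            1 for e in events if e["type"] == "m.room.message" and e["state_key"] is None
        ),
        "total_e2ee_event_count": sum(
            1 for e in events if e["type"] == "m.room.encrypted" and e["state_key"] is None
        ),
    }


if __name__ == "__main__":
    sample = [
        {"type": "m.room.create", "state_key": ""},
        {"type": "m.room.message", "state_key": None},
        {"type": "m.room.encrypted", "state_key": None},
    ]
    # -> {'total_event_count': 3, 'total_message_count': 1, 'total_e2ee_event_count': 1}
    print(usage_totals(sample))
```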
@@ -3672,6 +3672,9 @@ Options for each entry include:

* `additional_authorization_parameters`: String to string dictionary that will be passed as
  additional parameters to the authorization grant URL.

* `passthrough_authorization_parameters`: List of parameters that will be passed through from the redirect endpoint
  to the authorization grant URL.

* `allow_existing_users`: set to true to allow a user logging in via OIDC to
  match a pre-existing account instead of failing. This could be used if
  switching from password logins to OIDC. Defaults to false.

@@ -3798,6 +3801,7 @@ oidc_providers:

    jwks_uri: "https://accounts.example.com/.well-known/jwks.json"
    additional_authorization_parameters:
      acr_values: 2fa
    passthrough_authorization_parameters: ["login_hint"]
    skip_verification: true
    enable_registration: true
    user_mapping_provider:
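A small, hypothetical sketch of what `passthrough_authorization_parameters` means in practice: parameters named in the allow-list are copied from the incoming redirect request onto the authorization grant URL, alongside the static `additional_authorization_parameters`. This only illustrates the behaviour described above; the real handler code is further down in this diff.

```python
from urllib.parse import parse_qs, urlencode

# Taken from the config example above.
PASSTHROUGH_PARAMS = ["login_hint"]
ADDITIONAL_PARAMS = {"acr_values": "2fa"}


def build_grant_query(redirect_query: str) -> str:
    """Combine static and passed-through parameters for the authorization grant URL."""
    incoming = parse_qs(redirect_query)
    params = dict(ADDITIONAL_PARAMS)
    for name in PASSTHROUGH_PARAMS:
        values = incoming.get(name)
        if values:
            # Only allow-listed parameters are forwarded; anything else is dropped.
            params[name] = values[0]
    return urlencode(params)


# e.g. the login redirect request arrives with ?login_hint=alice&foo=bar
print(build_grant_query("login_hint=alice&foo=bar"))
# -> acr_values=2fa&login_hint=alice
```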
@@ -97,7 +97,7 @@ module-name = "synapse.synapse_rust"
|
||||
|
||||
[tool.poetry]
|
||||
name = "matrix-synapse"
|
||||
version = "1.128.0rc1"
|
||||
version = "1.129.0rc1"
|
||||
description = "Homeserver for the Matrix decentralised comms protocol"
|
||||
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
|
||||
license = "AGPL-3.0-or-later"
|
||||
|
||||
@@ -45,10 +45,11 @@ from synapse.api.errors import (
|
||||
)
|
||||
from synapse.http.site import SynapseRequest
|
||||
from synapse.logging.context import make_deferred_yieldable
|
||||
from synapse.logging.opentracing import active_span, force_tracing, start_active_span
|
||||
from synapse.types import Requester, UserID, create_requester
|
||||
from synapse.util import json_decoder
|
||||
from synapse.util.caches.cached_call import RetryOnExceptionCachedCall
|
||||
from synapse.util.caches.response_cache import ResponseCache
|
||||
from synapse.util.caches.response_cache import ResponseCache, ResponseCacheContext
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.rest.admin.experimental_features import ExperimentalFeature
|
||||
@@ -177,6 +178,7 @@ class MSC3861DelegatedAuth(BaseAuth):
|
||||
self._http_client = hs.get_proxied_http_client()
|
||||
self._hostname = hs.hostname
|
||||
self._admin_token: Callable[[], Optional[str]] = self._config.admin_token
|
||||
self._force_tracing_for_users = hs.config.tracing.force_tracing_for_users
|
||||
|
||||
# # Token Introspection Cache
|
||||
# This remembers what users/devices are represented by which access tokens,
|
||||
@@ -201,6 +203,8 @@ class MSC3861DelegatedAuth(BaseAuth):
|
||||
self._clock,
|
||||
"token_introspection",
|
||||
timeout_ms=120_000,
|
||||
# don't log because the keys are access tokens
|
||||
enable_logging=False,
|
||||
)
|
||||
|
||||
self._issuer_metadata = RetryOnExceptionCachedCall[OpenIDProviderMetadata](
|
||||
@@ -275,7 +279,9 @@ class MSC3861DelegatedAuth(BaseAuth):
|
||||
metadata = await self._issuer_metadata.get()
|
||||
return metadata.get("introspection_endpoint")
|
||||
|
||||
async def _introspect_token(self, token: str) -> IntrospectionResult:
|
||||
async def _introspect_token(
|
||||
self, token: str, cache_context: ResponseCacheContext[str]
|
||||
) -> IntrospectionResult:
|
||||
"""
|
||||
Send a token to the introspection endpoint and returns the introspection response
|
||||
|
||||
@@ -291,6 +297,8 @@ class MSC3861DelegatedAuth(BaseAuth):
|
||||
Returns:
|
||||
The introspection response
|
||||
"""
|
||||
# By default, we shouldn't cache the result unless we know it's valid
|
||||
cache_context.should_cache = False
|
||||
introspection_endpoint = await self._introspection_endpoint()
|
||||
raw_headers: Dict[str, str] = {
|
||||
"Content-Type": "application/x-www-form-urlencoded",
|
||||
@@ -348,6 +356,8 @@ class MSC3861DelegatedAuth(BaseAuth):
|
||||
"The introspection endpoint returned an invalid JSON response."
|
||||
)
|
||||
|
||||
# We had a valid response, so we can cache it
|
||||
cache_context.should_cache = True
|
||||
return IntrospectionResult(
|
||||
IntrospectionToken(**resp), retrieved_at_ms=self._clock.time_msec()
|
||||
)
|
||||
@@ -361,6 +371,55 @@ class MSC3861DelegatedAuth(BaseAuth):
|
||||
allow_guest: bool = False,
|
||||
allow_expired: bool = False,
|
||||
allow_locked: bool = False,
|
||||
) -> Requester:
|
||||
"""Get a registered user's ID.
|
||||
|
||||
Args:
|
||||
request: An HTTP request with an access_token query parameter.
|
||||
allow_guest: If False, will raise an AuthError if the user making the
|
||||
request is a guest.
|
||||
allow_expired: If True, allow the request through even if the account
|
||||
is expired, or session token lifetime has ended. Note that
|
||||
/login will deliver access tokens regardless of expiration.
|
||||
|
||||
Returns:
|
||||
Resolves to the requester
|
||||
Raises:
|
||||
InvalidClientCredentialsError if no user by that token exists or the token
|
||||
is invalid.
|
||||
AuthError if access is denied for the user in the access token
|
||||
"""
|
||||
parent_span = active_span()
|
||||
with start_active_span("get_user_by_req"):
|
||||
requester = await self._wrapped_get_user_by_req(
|
||||
request, allow_guest, allow_expired, allow_locked
|
||||
)
|
||||
|
||||
if parent_span:
|
||||
if requester.authenticated_entity in self._force_tracing_for_users:
|
||||
# request tracing is enabled for this user, so we need to force it
|
||||
# tracing on for the parent span (which will be the servlet span).
|
||||
#
|
||||
# It's too late for the get_user_by_req span to inherit the setting,
|
||||
# so we also force it on for that.
|
||||
force_tracing()
|
||||
force_tracing(parent_span)
|
||||
parent_span.set_tag(
|
||||
"authenticated_entity", requester.authenticated_entity
|
||||
)
|
||||
parent_span.set_tag("user_id", requester.user.to_string())
|
||||
if requester.device_id is not None:
|
||||
parent_span.set_tag("device_id", requester.device_id)
|
||||
if requester.app_service is not None:
|
||||
parent_span.set_tag("appservice_id", requester.app_service.id)
|
||||
return requester
|
||||
|
||||
async def _wrapped_get_user_by_req(
|
||||
self,
|
||||
request: SynapseRequest,
|
||||
allow_guest: bool = False,
|
||||
allow_expired: bool = False,
|
||||
allow_locked: bool = False,
|
||||
) -> Requester:
|
||||
access_token = self.get_access_token_from_request(request)
|
||||
|
||||
@@ -429,7 +488,7 @@ class MSC3861DelegatedAuth(BaseAuth):
|
||||
|
||||
try:
|
||||
introspection_result = await self._introspection_cache.wrap(
|
||||
token, self._introspect_token, token
|
||||
token, self._introspect_token, token, cache_context=True
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Failed to introspect token")
|
||||
|
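The two introspection fixes above (#18335, #18339) rest on two `ResponseCache` features that appear later in this diff: an `enable_logging=False` flag so access-token cache keys never reach the logs, and a `ResponseCacheContext` whose `should_cache` flag lets the callback opt out of caching its result. A simplified, self-contained model of the should-cache pattern (this is not Synapse's actual `ResponseCache`, just a sketch of the idea) looks roughly like this:

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Dict


@dataclass
class CacheContext:
    key: str
    should_cache: bool = True  # the callback can flip this off


class TinyResponseCache:
    """Minimal stand-in for a response cache keyed by access token."""

    def __init__(self) -> None:
        self._results: Dict[str, dict] = {}

    async def wrap(
        self, key: str, callback: Callable[[str, CacheContext], Awaitable[dict]]
    ) -> dict:
        if key in self._results:
            return self._results[key]
        context = CacheContext(key=key)
        result = await callback(key, context)
        if context.should_cache:
            # Only successful introspections are remembered; failures are retried.
            self._results[key] = result
        return result


async def introspect(token: str, context: CacheContext) -> dict:
    context.should_cache = False  # pessimistic default, as in the change above
    response = {"active": True}  # pretend the introspection endpoint answered cleanly
    context.should_cache = True  # only cache once we have a valid response
    return response


print(asyncio.run(TinyResponseCache().wrap("syt_secret_token", introspect)))
```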
||||
@@ -34,6 +34,22 @@ if TYPE_CHECKING:
|
||||
|
||||
logger = logging.getLogger("synapse.app.homeserver")
|
||||
|
||||
ONE_MINUTE_SECONDS = 60
|
||||
ONE_HOUR_SECONDS = 60 * ONE_MINUTE_SECONDS
|
||||
|
||||
MILLISECONDS_PER_SECOND = 1000
|
||||
|
||||
INITIAL_DELAY_BEFORE_FIRST_PHONE_HOME_SECONDS = 5 * ONE_MINUTE_SECONDS
|
||||
"""
|
||||
We wait 5 minutes to send the first set of stats as the server can be quite busy the
|
||||
first few minutes
|
||||
"""
|
||||
|
||||
PHONE_HOME_INTERVAL_SECONDS = 3 * ONE_HOUR_SECONDS
|
||||
"""
|
||||
Phone home stats are sent every 3 hours
|
||||
"""
|
||||
|
||||
# Contains the list of processes we will be monitoring
|
||||
# currently either 0 or 1
|
||||
_stats_process: List[Tuple[int, "resource.struct_rusage"]] = []
|
||||
@@ -121,6 +137,9 @@ async def phone_stats_home(
|
||||
|
||||
room_count = await store.get_room_count()
|
||||
stats["total_room_count"] = room_count
|
||||
stats["total_event_count"] = await store.count_total_events()
|
||||
stats["total_message_count"] = await store.count_total_messages()
|
||||
stats["total_e2ee_event_count"] = await store.count_total_e2ee_events()
|
||||
|
||||
stats["daily_active_users"] = common_metrics.daily_active_users
|
||||
stats["monthly_active_users"] = await store.count_monthly_users()
|
||||
@@ -185,12 +204,14 @@ def start_phone_stats_home(hs: "HomeServer") -> None:
|
||||
# If you increase the loop period, the accuracy of user_daily_visits
|
||||
# table will decrease
|
||||
clock.looping_call(
|
||||
hs.get_datastores().main.generate_user_daily_visits, 5 * 60 * 1000
|
||||
hs.get_datastores().main.generate_user_daily_visits,
|
||||
5 * ONE_MINUTE_SECONDS * MILLISECONDS_PER_SECOND,
|
||||
)
|
||||
|
||||
# monthly active user limiting functionality
|
||||
clock.looping_call(
|
||||
hs.get_datastores().main.reap_monthly_active_users, 1000 * 60 * 60
|
||||
hs.get_datastores().main.reap_monthly_active_users,
|
||||
ONE_HOUR_SECONDS * MILLISECONDS_PER_SECOND,
|
||||
)
|
||||
hs.get_datastores().main.reap_monthly_active_users()
|
||||
|
||||
@@ -216,12 +237,20 @@ def start_phone_stats_home(hs: "HomeServer") -> None:
|
||||
|
||||
if hs.config.server.limit_usage_by_mau or hs.config.server.mau_stats_only:
|
||||
generate_monthly_active_users()
|
||||
clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000)
|
||||
clock.looping_call(
|
||||
generate_monthly_active_users,
|
||||
5 * ONE_MINUTE_SECONDS * MILLISECONDS_PER_SECOND,
|
||||
)
|
||||
# End of monthly active user settings
|
||||
|
||||
if hs.config.metrics.report_stats:
|
||||
logger.info("Scheduling stats reporting for 3 hour intervals")
|
||||
clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000, hs, stats)
|
||||
clock.looping_call(
|
||||
phone_stats_home,
|
||||
PHONE_HOME_INTERVAL_SECONDS * MILLISECONDS_PER_SECOND,
|
||||
hs,
|
||||
stats,
|
||||
)
|
||||
|
||||
# We need to defer this init for the cases that we daemonize
|
||||
# otherwise the process ID we get is that of the non-daemon process
|
||||
@@ -229,4 +258,6 @@ def start_phone_stats_home(hs: "HomeServer") -> None:
|
||||
|
||||
# We wait 5 minutes to send the first set of stats as the server can
|
||||
# be quite busy the first few minutes
|
||||
clock.call_later(5 * 60, phone_stats_home, hs, stats)
|
||||
clock.call_later(
|
||||
INITIAL_DELAY_BEFORE_FIRST_PHONE_HOME_SECONDS, phone_stats_home, hs, stats
|
||||
)
|
||||
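The scheduling changes above are a pure refactor: the magic numbers are replaced by named constants, but the resulting millisecond intervals are unchanged. A quick check of that arithmetic, using the constant names introduced above:

```python
ONE_MINUTE_SECONDS = 60
ONE_HOUR_SECONDS = 60 * ONE_MINUTE_SECONDS
MILLISECONDS_PER_SECOND = 1000

# user_daily_visits / monthly-active-user generation loops: every 5 minutes, as before
assert 5 * ONE_MINUTE_SECONDS * MILLISECONDS_PER_SECOND == 5 * 60 * 1000
# monthly active user reaping: every hour, as before
assert ONE_HOUR_SECONDS * MILLISECONDS_PER_SECOND == 1000 * 60 * 60
# phone-home stats: every 3 hours, as before
PHONE_HOME_INTERVAL_SECONDS = 3 * ONE_HOUR_SECONDS
assert PHONE_HOME_INTERVAL_SECONDS * MILLISECONDS_PER_SECOND == 3 * 60 * 60 * 1000
# first report still goes out 5 minutes after startup
INITIAL_DELAY_BEFORE_FIRST_PHONE_HOME_SECONDS = 5 * ONE_MINUTE_SECONDS
assert INITIAL_DELAY_BEFORE_FIRST_PHONE_HOME_SECONDS == 5 * 60
```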
|
||||
@@ -356,6 +356,9 @@ def _parse_oidc_config_dict(
|
||||
additional_authorization_parameters=oidc_config.get(
|
||||
"additional_authorization_parameters", {}
|
||||
),
|
||||
passthrough_authorization_parameters=oidc_config.get(
|
||||
"passthrough_authorization_parameters", []
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@@ -501,3 +504,6 @@ class OidcProviderConfig:
|
||||
|
||||
# Additional parameters that will be passed to the authorization grant URL
|
||||
additional_authorization_parameters: Mapping[str, str]
|
||||
|
||||
# Allow query parameters to the redirect endpoint that will be passed to the authorization grant URL
|
||||
passthrough_authorization_parameters: Collection[str]
|
||||
|
||||
@@ -163,6 +163,8 @@ class DeviceWorkerHandler:
|
||||
raise errors.NotFoundError()
|
||||
|
||||
ips = await self.store.get_last_client_ip_by_device(user_id, device_id)
|
||||
|
||||
device = dict(device)
|
||||
_update_device_from_client_ips(device, ips)
|
||||
|
||||
set_tag("device", str(device))
|
||||
|
||||
@@ -467,6 +467,10 @@ class OidcProvider:
|
||||
|
||||
self._sso_handler.register_identity_provider(self)
|
||||
|
||||
self.passthrough_authorization_parameters = (
|
||||
provider.passthrough_authorization_parameters
|
||||
)
|
||||
|
||||
def _validate_metadata(self, m: OpenIDProviderMetadata) -> None:
|
||||
"""Verifies the provider metadata.
|
||||
|
||||
@@ -1005,7 +1009,6 @@ class OidcProvider:
|
||||
when everything is done (or None for UI Auth)
|
||||
ui_auth_session_id: The session ID of the ongoing UI Auth (or
|
||||
None if this is a login).
|
||||
|
||||
Returns:
|
||||
The redirect URL to the authorization endpoint.
|
||||
|
||||
@@ -1078,6 +1081,13 @@ class OidcProvider:
|
||||
)
|
||||
)
|
||||
|
||||
# add passthrough additional authorization parameters
|
||||
passthrough_authorization_parameters = self.passthrough_authorization_parameters
|
||||
for parameter in passthrough_authorization_parameters:
|
||||
parameter_value = parse_string(request, parameter)
|
||||
if parameter_value:
|
||||
additional_authorization_parameters.update({parameter: parameter_value})
|
||||
|
||||
authorization_endpoint = metadata.get("authorization_endpoint")
|
||||
return prepare_grant_uri(
|
||||
authorization_endpoint,
|
||||
|
||||
@@ -284,9 +284,10 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
|
||||
"count_devices_by_users", count_devices_by_users_txn, user_ids
|
||||
)
|
||||
|
||||
@cached()
|
||||
async def get_device(
|
||||
self, user_id: str, device_id: str
|
||||
) -> Optional[Dict[str, Any]]:
|
||||
) -> Optional[Mapping[str, Any]]:
|
||||
"""Retrieve a device. Only returns devices that are not marked as
|
||||
hidden.
|
||||
|
||||
@@ -1819,6 +1820,8 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
|
||||
},
|
||||
desc="store_device",
|
||||
)
|
||||
await self.invalidate_cache_and_stream("get_device", (user_id, device_id))
|
||||
|
||||
if not inserted:
|
||||
# if the device already exists, check if it's a real device, or
|
||||
# if the device ID is reserved by something else
|
||||
@@ -1884,6 +1887,9 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
|
||||
values=device_ids,
|
||||
keyvalues={"user_id": user_id},
|
||||
)
|
||||
self._invalidate_cache_and_stream_bulk(
|
||||
txn, self.get_device, [(user_id, device_id) for device_id in device_ids]
|
||||
)
|
||||
|
||||
for batch in batch_iter(device_ids, 100):
|
||||
await self.db_pool.runInteraction(
|
||||
@@ -1917,6 +1923,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
|
||||
updatevalues=updates,
|
||||
desc="update_device",
|
||||
)
|
||||
await self.invalidate_cache_and_stream("get_device", (user_id, device_id))
|
||||
|
||||
async def update_remote_device_list_cache_entry(
|
||||
self, user_id: str, device_id: str, content: JsonDict, stream_id: str
|
||||
|
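The device-store change above follows a common Synapse pattern: decorate the read path with `@cached()`, invalidate (and stream the invalidation to workers) on every write path, and return an immutable `Mapping` so callers cannot mutate the shared cached value, which is why the handler now copies with `device = dict(device)` before editing it. A generic sketch of that pattern, with hypothetical names rather than the actual `DeviceWorkerStore` code:

```python
from types import MappingProxyType
from typing import Dict, Mapping, Optional, Tuple


class TinyDeviceStore:
    """Illustrative cache-then-invalidate pattern; not the real DeviceWorkerStore."""

    def __init__(self) -> None:
        self._db: Dict[Tuple[str, str], Dict[str, str]] = {}
        self._cache: Dict[Tuple[str, str], Mapping[str, str]] = {}

    def get_device(self, user_id: str, device_id: str) -> Optional[Mapping[str, str]]:
        key = (user_id, device_id)
        if key not in self._cache and key in self._db:
            # Return a read-only view so callers can't mutate the cached value.
            self._cache[key] = MappingProxyType(self._db[key])
        return self._cache.get(key)

    def store_device(self, user_id: str, device_id: str, display_name: str) -> None:
        self._db[(user_id, device_id)] = {"display_name": display_name}
        # Every write path must invalidate the cached read, as in the diff above.
        self._cache.pop((user_id, device_id), None)


store = TinyDeviceStore()
store.store_device("@alice:example.com", "DEV1", "phone")
device = store.get_device("@alice:example.com", "DEV1")
# Callers that need to modify the result must copy it first, e.g. dict(device).
print(dict(device or {}))
```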
||||
@@ -47,7 +47,7 @@ from synapse.storage.databases.main.events_worker import (
|
||||
)
|
||||
from synapse.storage.databases.main.state_deltas import StateDeltasStore
|
||||
from synapse.storage.databases.main.stream import StreamWorkerStore
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
|
||||
from synapse.storage.types import Cursor
|
||||
from synapse.types import JsonDict, RoomStreamToken, StateMap, StrCollection
|
||||
from synapse.types.handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES
|
||||
@@ -311,6 +311,12 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
|
||||
self._sliding_sync_membership_snapshots_fix_forgotten_column_bg_update,
|
||||
)
|
||||
|
||||
# Add a background update to add triggers which track event counts.
|
||||
self.db_pool.updates.register_background_update_handler(
|
||||
_BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
|
||||
self._event_stats_populate_counts_bg_update,
|
||||
)
|
||||
|
||||
# We want this to run on the main database at startup before we start processing
|
||||
# events.
|
||||
#
|
||||
@@ -2547,6 +2553,288 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
|
||||
|
||||
return num_rows
|
||||
|
||||
async def _event_stats_populate_counts_bg_update(
|
||||
self, progress: JsonDict, batch_size: int
|
||||
) -> int:
|
||||
"""
|
||||
Background update to populate the `event_stats` table with initial
|
||||
values, and register DB triggers to continue updating it.
|
||||
|
||||
We first register TRIGGERs on rows being added/removed from the `events` table,
|
||||
which will keep the event counts continuously updated. We also mark the stopping
|
||||
point for the main population step so we don't double count events.
|
||||
|
||||
Then we will iterate through the `events` table in batches and update event
|
||||
counts until we reach the stopping point.
|
||||
|
||||
This data is intended to be used by the phone-home stats to keep track
|
||||
of total event and message counts. A trigger is preferred to counting
|
||||
rows in the `events` table, as said table can grow quite large.
|
||||
|
||||
It is also preferable to adding an index on the `events` table, as even
|
||||
an index can grow large. And calculating total counts would require
|
||||
querying that entire index.
|
||||
"""
|
||||
# The last event `stream_ordering` we processed (starting place of this next
|
||||
# batch).
|
||||
last_event_stream_ordering = progress.get(
|
||||
"last_event_stream_ordering", -(1 << 31)
|
||||
)
|
||||
# The event `stream_ordering` we should stop at. This is used to avoid double
|
||||
# counting events that are already accounted for because of the triggers.
|
||||
stop_event_stream_ordering: Optional[int] = progress.get(
|
||||
"stop_event_stream_ordering", None
|
||||
)
|
||||
|
||||
def _add_triggers_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> Optional[int]:
|
||||
"""
|
||||
Adds the triggers to the `events` table to keep the `event_stats` counts
|
||||
up-to-date.
|
||||
|
||||
Also populates the `stop_event_stream_ordering` background update progress
|
||||
value. This marks the point at which we added the triggers, so we can avoid
|
||||
double counting events that are already accounted for in the population
|
||||
step.
|
||||
|
||||
Returns:
|
||||
The latest event `stream_ordering` in the `events` table when the triggers
|
||||
were added or `None` if the `events` table is empty.
|
||||
"""
|
||||
|
||||
# Each time an event is inserted into the `events` table, update the stats.
|
||||
#
|
||||
# We're using `AFTER` triggers as we want to count successful inserts/deletes and
|
||||
# not the ones that could potentially fail.
|
||||
if isinstance(txn.database_engine, Sqlite3Engine):
|
||||
txn.execute(
|
||||
"""
|
||||
CREATE TRIGGER IF NOT EXISTS event_stats_events_insert_trigger
|
||||
AFTER INSERT ON events
|
||||
BEGIN
|
||||
-- Always increment total_event_count
|
||||
UPDATE event_stats SET total_event_count = total_event_count + 1;
|
||||
|
||||
-- Increment unencrypted_message_count for m.room.message events
|
||||
UPDATE event_stats
|
||||
SET unencrypted_message_count = unencrypted_message_count + 1
|
||||
WHERE NEW.type = 'm.room.message' AND NEW.state_key IS NULL;
|
||||
|
||||
-- Increment e2ee_event_count for m.room.encrypted events
|
||||
UPDATE event_stats
|
||||
SET e2ee_event_count = e2ee_event_count + 1
|
||||
WHERE NEW.type = 'm.room.encrypted' AND NEW.state_key IS NULL;
|
||||
END;
|
||||
"""
|
||||
)
|
||||
|
||||
txn.execute(
|
||||
"""
|
||||
CREATE TRIGGER IF NOT EXISTS event_stats_events_delete_trigger
|
||||
AFTER DELETE ON events
|
||||
BEGIN
|
||||
-- Always decrement total_event_count
|
||||
UPDATE event_stats SET total_event_count = total_event_count - 1;
|
||||
|
||||
-- Decrement unencrypted_message_count for m.room.message events
|
||||
UPDATE event_stats
|
||||
SET unencrypted_message_count = unencrypted_message_count - 1
|
||||
WHERE OLD.type = 'm.room.message' AND OLD.state_key IS NULL;
|
||||
|
||||
-- Decrement e2ee_event_count for m.room.encrypted events
|
||||
UPDATE event_stats
|
||||
SET e2ee_event_count = e2ee_event_count - 1
|
||||
WHERE OLD.type = 'm.room.encrypted' AND OLD.state_key IS NULL;
|
||||
END;
|
||||
"""
|
||||
)
|
||||
elif isinstance(txn.database_engine, PostgresEngine):
|
||||
txn.execute(
|
||||
"""
|
||||
CREATE OR REPLACE FUNCTION event_stats_increment_counts() RETURNS trigger AS $BODY$
|
||||
BEGIN
|
||||
IF TG_OP = 'INSERT' THEN
|
||||
-- Always increment total_event_count
|
||||
UPDATE event_stats SET total_event_count = total_event_count + 1;
|
||||
|
||||
-- Increment unencrypted_message_count for m.room.message events
|
||||
IF NEW.type = 'm.room.message' AND NEW.state_key IS NULL THEN
|
||||
UPDATE event_stats SET unencrypted_message_count = unencrypted_message_count + 1;
|
||||
END IF;
|
||||
|
||||
-- Increment e2ee_event_count for m.room.encrypted events
|
||||
IF NEW.type = 'm.room.encrypted' AND NEW.state_key IS NULL THEN
|
||||
UPDATE event_stats SET e2ee_event_count = e2ee_event_count + 1;
|
||||
END IF;
|
||||
|
||||
-- We're not modifying the row being inserted/deleted, so we return it unchanged.
|
||||
RETURN NEW;
|
||||
|
||||
ELSIF TG_OP = 'DELETE' THEN
|
||||
-- Always decrement total_event_count
|
||||
UPDATE event_stats SET total_event_count = total_event_count - 1;
|
||||
|
||||
-- Decrement unencrypted_message_count for m.room.message events
|
||||
IF OLD.type = 'm.room.message' AND OLD.state_key IS NULL THEN
|
||||
UPDATE event_stats SET unencrypted_message_count = unencrypted_message_count - 1;
|
||||
END IF;
|
||||
|
||||
-- Decrement e2ee_event_count for m.room.encrypted events
|
||||
IF OLD.type = 'm.room.encrypted' AND OLD.state_key IS NULL THEN
|
||||
UPDATE event_stats SET e2ee_event_count = e2ee_event_count - 1;
|
||||
END IF;
|
||||
|
||||
-- "The usual idiom in DELETE triggers is to return OLD."
|
||||
-- (https://www.postgresql.org/docs/current/plpgsql-trigger.html)
|
||||
RETURN OLD;
|
||||
END IF;
|
||||
|
||||
RAISE EXCEPTION 'update_event_stats() was run with unexpected operation (%%). '
|
||||
'This indicates a trigger misconfiguration as this function should only'
|
||||
'run with INSERT/DELETE operations.', TG_OP;
|
||||
END;
|
||||
$BODY$ LANGUAGE plpgsql;
|
||||
"""
|
||||
)
|
||||
|
||||
# We could use `CREATE OR REPLACE TRIGGER` but that's only available in Postgres
|
||||
# 14 (https://www.postgresql.org/docs/14/sql-createtrigger.html)
|
||||
txn.execute(
|
||||
"""
|
||||
DO
|
||||
$$BEGIN
|
||||
CREATE TRIGGER event_stats_increment_counts_trigger
|
||||
AFTER INSERT OR DELETE ON events
|
||||
FOR EACH ROW
|
||||
EXECUTE PROCEDURE event_stats_increment_counts();
|
||||
EXCEPTION
|
||||
-- This acts as a "CREATE TRIGGER IF NOT EXISTS" for Postgres
|
||||
WHEN duplicate_object THEN
|
||||
NULL;
|
||||
END;$$;
|
||||
"""
|
||||
)
|
||||
else:
|
||||
raise NotImplementedError("Unknown database engine")
|
||||
|
||||
# Find the latest `stream_ordering` in the `events` table. We need to do
|
||||
# this in the same transaction as where we add the triggers so we don't miss
|
||||
# any events.
|
||||
txn.execute(
|
||||
"""
|
||||
SELECT stream_ordering
|
||||
FROM events
|
||||
ORDER BY stream_ordering DESC
|
||||
LIMIT 1
|
||||
"""
|
||||
)
|
||||
row = cast(Optional[Tuple[int]], txn.fetchone())
|
||||
|
||||
# Update the progress
|
||||
if row is not None:
|
||||
(max_stream_ordering,) = row
|
||||
self.db_pool.updates._background_update_progress_txn(
|
||||
txn,
|
||||
_BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
|
||||
{"stop_event_stream_ordering": max_stream_ordering},
|
||||
)
|
||||
return max_stream_ordering
|
||||
|
||||
return None
|
||||
|
||||
# First, add the triggers to keep the `event_stats` values up-to-date.
|
||||
#
|
||||
# If we don't have a `stop_event_stream_ordering` yet, we need to add the
|
||||
# triggers to the `events` table and set the stopping point so we don't
|
||||
# double count `events` later.
|
||||
if stop_event_stream_ordering is None:
|
||||
stop_event_stream_ordering = await self.db_pool.runInteraction(
|
||||
"_event_stats_populate_counts_bg_update_add_triggers",
|
||||
_add_triggers_txn,
|
||||
)
|
||||
|
||||
# If there is no `stop_event_stream_ordering`, then there are no events
|
||||
# in the `events` table and we can end the background update altogether.
|
||||
if stop_event_stream_ordering is None:
|
||||
await self.db_pool.updates._end_background_update(
|
||||
_BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE
|
||||
)
|
||||
return batch_size
|
||||
|
||||
def _populate_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> int:
|
||||
"""
|
||||
Updates the `event_stats` table from this batch of events.
|
||||
"""
|
||||
|
||||
# Increment the counts based on the events present in this batch.
|
||||
txn.execute(
|
||||
"""
|
||||
WITH event_batch AS (
|
||||
SELECT *
|
||||
FROM events
|
||||
WHERE stream_ordering > ? AND stream_ordering <= ?
|
||||
ORDER BY stream_ordering ASC
|
||||
LIMIT ?
|
||||
),
|
||||
batch_stats AS (
|
||||
SELECT
|
||||
MAX(stream_ordering) AS max_stream_ordering,
|
||||
COALESCE(COUNT(*), 0) AS total_event_count,
|
||||
COALESCE(SUM(CASE WHEN type = 'm.room.message' AND state_key IS NULL THEN 1 ELSE 0 END), 0) AS unencrypted_message_count,
|
||||
COALESCE(SUM(CASE WHEN type = 'm.room.encrypted' AND state_key IS NULL THEN 1 ELSE 0 END), 0) AS e2ee_event_count
|
||||
FROM event_batch
|
||||
|
||||
UNION ALL
|
||||
|
||||
SELECT null, 0, 0, 0
|
||||
WHERE NOT EXISTS (SELECT 1 FROM event_batch)
|
||||
LIMIT 1
|
||||
)
|
||||
UPDATE event_stats
|
||||
SET
|
||||
total_event_count = total_event_count + (SELECT total_event_count FROM batch_stats),
|
||||
unencrypted_message_count = unencrypted_message_count + (SELECT unencrypted_message_count FROM batch_stats),
|
||||
e2ee_event_count = e2ee_event_count + (SELECT e2ee_event_count FROM batch_stats)
|
||||
RETURNING
|
||||
(SELECT total_event_count FROM batch_stats) AS total_event_count,
|
||||
(SELECT max_stream_ordering FROM batch_stats) AS max_stream_ordering
|
||||
""",
|
||||
(last_event_stream_ordering, stop_event_stream_ordering, batch_size),
|
||||
)
|
||||
|
||||
# Get the results of the update
|
||||
(total_event_count, max_stream_ordering) = cast(
|
||||
Tuple[int, Optional[int]], txn.fetchone()
|
||||
)
|
||||
|
||||
# Update the progress
|
||||
self.db_pool.updates._background_update_progress_txn(
|
||||
txn,
|
||||
_BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
|
||||
{
|
||||
"last_event_stream_ordering": max_stream_ordering,
|
||||
"stop_event_stream_ordering": stop_event_stream_ordering,
|
||||
},
|
||||
)
|
||||
|
||||
return total_event_count
|
||||
|
||||
num_rows_processed = await self.db_pool.runInteraction(
|
||||
"_event_stats_populate_counts_bg_update",
|
||||
_populate_txn,
|
||||
)
|
||||
|
||||
# No more rows to process, so our background update is complete.
|
||||
if not num_rows_processed:
|
||||
await self.db_pool.updates._end_background_update(
|
||||
_BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE
|
||||
)
|
||||
|
||||
return batch_size
|
||||
|
||||
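The background update above pairs database triggers with a one-off batched backfill, so the counters stay exact without ever scanning the whole `events` table at reporting time. A self-contained SQLite sketch of the trigger half, mirroring the migration and the SQLite branch above but run against a throwaway in-memory database:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE events (type TEXT NOT NULL, state_key TEXT);
    CREATE TABLE event_stats (
        total_event_count INTEGER NOT NULL DEFAULT 0,
        unencrypted_message_count INTEGER NOT NULL DEFAULT 0,
        e2ee_event_count INTEGER NOT NULL DEFAULT 0
    );
    INSERT INTO event_stats DEFAULT VALUES;

    -- AFTER INSERT: only successful inserts bump the counters.
    CREATE TRIGGER event_stats_events_insert_trigger
    AFTER INSERT ON events
    BEGIN
        UPDATE event_stats SET total_event_count = total_event_count + 1;
        UPDATE event_stats
            SET unencrypted_message_count = unencrypted_message_count + 1
            WHERE NEW.type = 'm.room.message' AND NEW.state_key IS NULL;
        UPDATE event_stats
            SET e2ee_event_count = e2ee_event_count + 1
            WHERE NEW.type = 'm.room.encrypted' AND NEW.state_key IS NULL;
    END;
    """
)

conn.executemany(
    "INSERT INTO events (type, state_key) VALUES (?, ?)",
    [("m.room.create", ""), ("m.room.message", None), ("m.room.encrypted", None)],
)
print(conn.execute("SELECT * FROM event_stats").fetchone())  # (3, 1, 1)
```

A matching `AFTER DELETE` trigger (as in the real migration) would decrement the same counters, and the backfill then only has to count events created before the triggers existed.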
|
||||
def _resolve_stale_data_in_sliding_sync_tables(
|
||||
txn: LoggingTransaction,
|
||||
|
||||
@@ -126,6 +126,44 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
|
||||
|
||||
return await self.db_pool.runInteraction("count_e2ee_messages", _count_messages)
|
||||
|
||||
async def count_total_events(self) -> int:
|
||||
"""
|
||||
Returns the total number of events present on the server.
|
||||
"""
|
||||
|
||||
return await self.db_pool.simple_select_one_onecol(
|
||||
table="event_stats",
|
||||
keyvalues={},
|
||||
retcol="total_event_count",
|
||||
desc="count_total_events",
|
||||
)
|
||||
|
||||
async def count_total_messages(self) -> int:
|
||||
"""
|
||||
Returns the total number of `m.room.message` events present on the
|
||||
server.
|
||||
"""
|
||||
|
||||
return await self.db_pool.simple_select_one_onecol(
|
||||
table="event_stats",
|
||||
keyvalues={},
|
||||
retcol="unencrypted_message_count",
|
||||
desc="count_total_messages",
|
||||
)
|
||||
|
||||
async def count_total_e2ee_events(self) -> int:
|
||||
"""
|
||||
Returns the total number of `m.room.encrypted` events present on the
|
||||
server.
|
||||
"""
|
||||
|
||||
return await self.db_pool.simple_select_one_onecol(
|
||||
table="event_stats",
|
||||
keyvalues={},
|
||||
retcol="e2ee_event_count",
|
||||
desc="count_total_e2ee_events",
|
||||
)
|
||||
|
||||
async def count_daily_sent_e2ee_messages(self) -> int:
|
||||
def _count_messages(txn: LoggingTransaction) -> int:
|
||||
# This is good enough as if you have silly characters in your own
|
||||
|
||||
@@ -759,6 +759,9 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
|
||||
external_id: id on that system
|
||||
user_id: complete mxid that it is mapped to
|
||||
"""
|
||||
self._invalidate_cache_and_stream(
|
||||
txn, self.get_user_by_external_id, (auth_provider, external_id)
|
||||
)
|
||||
|
||||
self.db_pool.simple_insert_txn(
|
||||
txn,
|
||||
@@ -789,6 +792,9 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
|
||||
},
|
||||
desc="remove_user_external_id",
|
||||
)
|
||||
await self.invalidate_cache_and_stream(
|
||||
"get_user_by_external_id", (auth_provider, external_id)
|
||||
)
|
||||
|
||||
async def replace_user_external_id(
|
||||
self,
|
||||
@@ -809,29 +815,20 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
|
||||
ExternalIDReuseException if the new external_id could not be mapped.
|
||||
"""
|
||||
|
||||
def _remove_user_external_ids_txn(
|
||||
def _replace_user_external_id_txn(
|
||||
txn: LoggingTransaction,
|
||||
user_id: str,
|
||||
) -> None:
|
||||
"""Remove all mappings from external user ids to a mxid
|
||||
If these mappings are not found, this method does nothing.
|
||||
|
||||
Args:
|
||||
user_id: complete mxid that it is mapped to
|
||||
"""
|
||||
|
||||
self.db_pool.simple_delete_txn(
|
||||
txn,
|
||||
table="user_external_ids",
|
||||
keyvalues={"user_id": user_id},
|
||||
)
|
||||
|
||||
def _replace_user_external_id_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> None:
|
||||
_remove_user_external_ids_txn(txn, user_id)
|
||||
|
||||
for auth_provider, external_id in record_external_ids:
|
||||
self._invalidate_cache_and_stream(
|
||||
txn, self.get_user_by_external_id, (auth_provider, external_id)
|
||||
)
|
||||
|
||||
self._record_user_external_id_txn(
|
||||
txn,
|
||||
auth_provider,
|
||||
@@ -847,6 +844,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
|
||||
except self.database_engine.module.IntegrityError:
|
||||
raise ExternalIDReuseException()
|
||||
|
||||
@cached()
|
||||
async def get_user_by_external_id(
|
||||
self, auth_provider: str, external_id: str
|
||||
) -> Optional[str]:
|
||||
|
||||
@@ -48,6 +48,7 @@ from synapse.storage.database import (
|
||||
LoggingTransaction,
|
||||
)
|
||||
from synapse.storage.databases.state.bg_updates import StateBackgroundUpdateStore
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
from synapse.storage.types import Cursor
|
||||
from synapse.storage.util.sequence import build_sequence_generator
|
||||
from synapse.types import MutableStateMap, StateKey, StateMap
|
||||
@@ -914,6 +915,12 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
|
||||
) -> None:
|
||||
# Delete all edges that reference a state group linked to room_id
|
||||
logger.info("[purge] removing %s from state_group_edges", room_id)
|
||||
|
||||
if isinstance(self.database_engine, PostgresEngine):
|
||||
# Disable statement timeouts for this transaction; purging rooms can
|
||||
# take a while!
|
||||
txn.execute("SET LOCAL statement_timeout = 0")
|
||||
|
||||
txn.execute(
|
||||
"""
|
||||
DELETE FROM state_group_edges AS sge WHERE sge.state_group IN (
|
||||
|
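`SET LOCAL` scopes the setting to the current transaction only, so the purge transaction can run without a statement timeout while every other connection keeps whatever `statement_timeout` is configured globally. A hedged sketch of the same idea using psycopg2 with a hypothetical DSN and an illustrative query; Synapse itself issues the statement through its own transaction machinery, as shown above:

```python
import psycopg2  # assumed available; purely illustrative

conn = psycopg2.connect("dbname=synapse user=synapse_user")  # hypothetical DSN
try:
    with conn:  # one transaction: commits on success, rolls back on error
        with conn.cursor() as cur:
            # Lift the statement timeout for this transaction only; the
            # session/global value applies again after COMMIT or ROLLBACK.
            cur.execute("SET LOCAL statement_timeout = 0")
            cur.execute(
                "DELETE FROM state_group_edges WHERE state_group IN "
                "(SELECT id FROM state_groups WHERE room_id = %s)",
                ("!some_room:example.com",),
            )
finally:
    conn.close()
```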
||||
@@ -19,7 +19,7 @@
|
||||
#
|
||||
#
|
||||
|
||||
SCHEMA_VERSION = 91 # remember to update the list below when updating
|
||||
SCHEMA_VERSION = 92 # remember to update the list below when updating
|
||||
"""Represents the expectations made by the codebase about the database schema
|
||||
|
||||
This should be incremented whenever the codebase changes its requirements on the
|
||||
@@ -162,6 +162,12 @@ Changes in SCHEMA_VERSION = 89
|
||||
Changes in SCHEMA_VERSION = 90
|
||||
- Add a column `participant` to `room_memberships` table
|
||||
- Add background update to delete unreferenced state groups.
|
||||
|
||||
Changes in SCHEMA_VERSION = 91
|
||||
- TODO
|
||||
|
||||
Changes in SCHEMA_VERSION = 92
|
||||
- Add `event_stats` table to store global event statistics like total counts
|
||||
"""
|
||||
|
||||
|
||||
|
||||
synapse/storage/schema/main/delta/92/01_event_stats.sql (new file)
@@ -0,0 +1,33 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2025 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.


-- Create the `event_stats` table to store these statistics.
CREATE TABLE event_stats (
    total_event_count INTEGER NOT NULL DEFAULT 0,
    unencrypted_message_count INTEGER NOT NULL DEFAULT 0,
    e2ee_event_count INTEGER NOT NULL DEFAULT 0
);

-- Insert initial values into the table.
INSERT INTO event_stats (
    total_event_count,
    unencrypted_message_count,
    e2ee_event_count
) VALUES (0, 0, 0);

-- Add a background update to populate the `event_stats` table with the current counts
-- from the `events` table and add triggers to keep this count up-to-date.
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
    (9201, 'event_stats_populate_counts_bg_update', '{}');

@@ -52,3 +52,5 @@ class _BackgroundUpdates:
|
||||
MARK_UNREFERENCED_STATE_GROUPS_FOR_DELETION_BG_UPDATE = (
|
||||
"mark_unreferenced_state_groups_for_deletion_bg_update"
|
||||
)
|
||||
|
||||
EVENT_STATS_POPULATE_COUNTS_BG_UPDATE = "event_stats_populate_counts_bg_update"
|
||||
|
||||
@@ -101,7 +101,13 @@ class ResponseCache(Generic[KV]):
|
||||
used rather than trying to compute a new response.
|
||||
"""
|
||||
|
||||
def __init__(self, clock: Clock, name: str, timeout_ms: float = 0):
|
||||
def __init__(
|
||||
self,
|
||||
clock: Clock,
|
||||
name: str,
|
||||
timeout_ms: float = 0,
|
||||
enable_logging: bool = True,
|
||||
):
|
||||
self._result_cache: Dict[KV, ResponseCacheEntry] = {}
|
||||
|
||||
self.clock = clock
|
||||
@@ -109,6 +115,7 @@ class ResponseCache(Generic[KV]):
|
||||
|
||||
self._name = name
|
||||
self._metrics = register_cache("response_cache", name, self, resizable=False)
|
||||
self._enable_logging = enable_logging
|
||||
|
||||
def size(self) -> int:
|
||||
return len(self._result_cache)
|
||||
@@ -246,8 +253,11 @@ class ResponseCache(Generic[KV]):
|
||||
"""
|
||||
entry = self._get(key)
|
||||
if not entry:
|
||||
if self._enable_logging:
|
||||
logger.debug(
|
||||
"[%s]: no cached result for [%s], calculating new one", self._name, key
|
||||
"[%s]: no cached result for [%s], calculating new one",
|
||||
self._name,
|
||||
key,
|
||||
)
|
||||
context = ResponseCacheContext(cache_key=key)
|
||||
if cache_context:
|
||||
@@ -269,8 +279,11 @@ class ResponseCache(Generic[KV]):
|
||||
return await make_deferred_yieldable(entry.result.observe())
|
||||
|
||||
result = entry.result.observe()
|
||||
if self._enable_logging:
|
||||
if result.called:
|
||||
logger.info("[%s]: using completed cached result for [%s]", self._name, key)
|
||||
logger.info(
|
||||
"[%s]: using completed cached result for [%s]", self._name, key
|
||||
)
|
||||
else:
|
||||
logger.info(
|
||||
"[%s]: using incomplete cached result for [%s]", self._name, key
|
||||
|
||||
@@ -484,6 +484,32 @@ class OidcHandlerTestCase(HomeserverTestCase):
|
||||
self.assertEqual(code_verifier, "")
|
||||
self.assertEqual(redirect, "http://client/redirect")
|
||||
|
||||
@override_config(
|
||||
{
|
||||
"oidc_config": {
|
||||
**DEFAULT_CONFIG,
|
||||
"passthrough_authorization_parameters": ["additional_parameter"],
|
||||
}
|
||||
}
|
||||
)
|
||||
def test_passthrough_parameters(self) -> None:
|
||||
"""The redirect request has additional parameters, one is authorized, one is not"""
|
||||
req = Mock(spec=["cookies", "args"])
|
||||
req.cookies = []
|
||||
req.args = {}
|
||||
req.args[b"additional_parameter"] = ["a_value".encode("utf-8")]
|
||||
req.args[b"not_authorized_parameter"] = ["any".encode("utf-8")]
|
||||
|
||||
url = urlparse(
|
||||
self.get_success(
|
||||
self.provider.handle_redirect_request(req, b"http://client/redirect")
|
||||
)
|
||||
)
|
||||
|
||||
params = parse_qs(url.query)
|
||||
self.assertEqual(params["additional_parameter"], ["a_value"])
|
||||
self.assertNotIn("not_authorized_parameters", params)
|
||||
|
||||
@override_config({"oidc_config": DEFAULT_CONFIG})
|
||||
def test_redirect_request_with_code_challenge(self) -> None:
|
||||
"""The redirect request has the right arguments & generates a valid session cookie."""
|
||||
|
||||
tests/metrics/test_phone_home_stats.py (new file)
@@ -0,0 +1,258 @@
|
||||
#
|
||||
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
#
|
||||
# Copyright (C) 2025 New Vector, Ltd
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# See the GNU Affero General Public License for more details:
|
||||
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
|
||||
import logging
|
||||
from unittest.mock import AsyncMock
|
||||
|
||||
from twisted.test.proto_helpers import MemoryReactor
|
||||
|
||||
from synapse.app.phone_stats_home import (
|
||||
PHONE_HOME_INTERVAL_SECONDS,
|
||||
start_phone_stats_home,
|
||||
)
|
||||
from synapse.rest import admin, login, register, room
|
||||
from synapse.server import HomeServer
|
||||
from synapse.types import JsonDict
|
||||
from synapse.util import Clock
|
||||
|
||||
from tests import unittest
|
||||
from tests.server import ThreadedMemoryReactorClock
|
||||
|
||||
TEST_REPORT_STATS_ENDPOINT = "https://fake.endpoint/stats"
|
||||
TEST_SERVER_CONTEXT = "test-server-context"
|
||||
|
||||
|
||||
class PhoneHomeStatsTestCase(unittest.HomeserverTestCase):
|
||||
servlets = [
|
||||
admin.register_servlets_for_client_rest_resource,
|
||||
room.register_servlets,
|
||||
register.register_servlets,
|
||||
login.register_servlets,
|
||||
]
|
||||
|
||||
def make_homeserver(
|
||||
self, reactor: ThreadedMemoryReactorClock, clock: Clock
|
||||
) -> HomeServer:
|
||||
# Configure the homeserver to enable stats reporting.
|
||||
config = self.default_config()
|
||||
config["report_stats"] = True
|
||||
config["report_stats_endpoint"] = TEST_REPORT_STATS_ENDPOINT
|
||||
|
||||
# Configure the server context so we can check it ends up being reported
|
||||
config["server_context"] = TEST_SERVER_CONTEXT
|
||||
|
||||
# Allow guests to be registered
|
||||
config["allow_guest_access"] = True
|
||||
|
||||
hs = self.setup_test_homeserver(config=config)
|
||||
|
||||
# Replace the proxied http client with a mock, so we can inspect outbound requests to
|
||||
# the configured stats endpoint.
|
||||
self.put_json_mock = AsyncMock(return_value={})
|
||||
hs.get_proxied_http_client().put_json = self.put_json_mock # type: ignore[method-assign]
|
||||
return hs
|
||||
|
||||
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
|
||||
self.store = hs.get_datastores().main
|
||||
|
||||
# Wait for the background updates to add the database triggers that keep the
|
||||
# `event_stats` table up-to-date.
|
||||
self.wait_for_background_updates()
|
||||
|
||||
# Force stats reporting to occur
|
||||
start_phone_stats_home(hs=hs)
|
||||
|
||||
super().prepare(reactor, clock, hs)
|
||||
|
||||
def _get_latest_phone_home_stats(self) -> JsonDict:
|
||||
# Wait for `phone_stats_home` to be called again + a healthy margin (50s).
|
||||
self.reactor.advance(2 * PHONE_HOME_INTERVAL_SECONDS + 50)
|
||||
|
||||
# Extract the reported stats from our http client mock
|
||||
mock_calls = self.put_json_mock.call_args_list
|
||||
report_stats_calls = []
|
||||
for call in mock_calls:
|
||||
if call.args[0] == TEST_REPORT_STATS_ENDPOINT:
|
||||
report_stats_calls.append(call)
|
||||
|
||||
self.assertGreaterEqual(
|
||||
(len(report_stats_calls)),
|
||||
1,
|
||||
"Expected at-least one call to the report_stats endpoint",
|
||||
)
|
||||
|
||||
# Extract the phone home stats from the call
|
||||
phone_home_stats = report_stats_calls[0].args[1]
|
||||
|
||||
return phone_home_stats
|
||||
|
||||
def _perform_user_actions(self) -> None:
|
||||
"""
|
||||
Perform some actions on the homeserver that would bump the phone home
|
||||
stats.
|
||||
"""
|
||||
|
||||
# Create some users
|
||||
user_1_mxid = self.register_user(
|
||||
username="test_user_1",
|
||||
password="test",
|
||||
)
|
||||
user_2_mxid = self.register_user(
|
||||
username="test_user_2",
|
||||
password="test",
|
||||
)
|
||||
# Note: `self.register_user` does not support guest registration, and updating the
|
||||
# Admin API it calls to add a new parameter would cause the `mac` parameter to fail
|
||||
# in a backwards-incompatible manner. Hence, we make a manual request here.
|
||||
_guest_user_mxid = self.make_request(
|
||||
method="POST",
|
||||
path="/_matrix/client/v3/register?kind=guest",
|
||||
content={
|
||||
"username": "guest_user",
|
||||
"password": "test",
|
||||
},
|
||||
shorthand=False,
|
||||
)
|
||||
|
||||
# Log in to each user
|
||||
user_1_token = self.login(username=user_1_mxid, password="test")
|
||||
user_2_token = self.login(username=user_2_mxid, password="test")
|
||||
|
||||
# Create a room between the two users
|
||||
room_1_id = self.helper.create_room_as(
|
||||
is_public=False,
|
||||
tok=user_1_token,
|
||||
)
|
||||
|
||||
# Mark this room as end-to-end encrypted
|
||||
self.helper.send_state(
|
||||
room_id=room_1_id,
|
||||
event_type="m.room.encryption",
|
||||
body={
|
||||
"algorithm": "m.megolm.v1.aes-sha2",
|
||||
"rotation_period_ms": 604800000,
|
||||
"rotation_period_msgs": 100,
|
||||
},
|
||||
state_key="",
|
||||
tok=user_1_token,
|
||||
)
|
||||
|
||||
# User 1 invites user 2
|
||||
self.helper.invite(
|
||||
room=room_1_id,
|
||||
src=user_1_mxid,
|
||||
targ=user_2_mxid,
|
||||
tok=user_1_token,
|
||||
)
|
||||
|
||||
# User 2 joins
|
||||
self.helper.join(
|
||||
room=room_1_id,
|
||||
user=user_2_mxid,
|
||||
tok=user_2_token,
|
||||
)
|
||||
|
||||
# User 1 sends 10 unencrypted messages
|
||||
for _ in range(10):
|
||||
self.helper.send(
|
||||
room_id=room_1_id,
|
||||
body="Zoinks Scoob! A message!",
|
||||
tok=user_1_token,
|
||||
)
|
||||
|
||||
# User 2 sends 5 encrypted "messages"
|
||||
for _ in range(5):
|
||||
self.helper.send_event(
|
||||
room_id=room_1_id,
|
||||
type="m.room.encrypted",
|
||||
content={
|
||||
"algorithm": "m.olm.v1.curve25519-aes-sha2",
|
||||
"sender_key": "some_key",
|
||||
"ciphertext": {
|
||||
"some_key": {
|
||||
"type": 0,
|
||||
"body": "encrypted_payload",
|
||||
},
|
||||
},
|
||||
},
|
||||
tok=user_2_token,
|
||||
)
|
||||
|
||||
def test_phone_home_stats(self) -> None:
|
||||
"""
|
||||
Test that the phone home stats contain the stats we expect based on
|
||||
the scenario carried out in `prepare`
|
||||
"""
|
||||
# Do things to bump the stats
|
||||
self._perform_user_actions()
|
||||
|
||||
# Wait for the stats to be reported
|
||||
phone_home_stats = self._get_latest_phone_home_stats()
|
||||
|
||||
self.assertEqual(
|
||||
phone_home_stats["homeserver"], self.hs.config.server.server_name
|
||||
)
|
||||
|
||||
self.assertTrue(isinstance(phone_home_stats["memory_rss"], int))
|
||||
self.assertTrue(isinstance(phone_home_stats["cpu_average"], int))
|
||||
|
||||
self.assertEqual(phone_home_stats["server_context"], TEST_SERVER_CONTEXT)
|
||||
|
||||
self.assertTrue(isinstance(phone_home_stats["timestamp"], int))
|
||||
self.assertTrue(isinstance(phone_home_stats["uptime_seconds"], int))
|
||||
self.assertTrue(isinstance(phone_home_stats["python_version"], str))
|
||||
|
||||
# We expect only our test users to exist on the homeserver
|
||||
self.assertEqual(phone_home_stats["total_users"], 3)
|
||||
self.assertEqual(phone_home_stats["total_nonbridged_users"], 3)
|
||||
self.assertEqual(phone_home_stats["daily_user_type_native"], 2)
|
||||
self.assertEqual(phone_home_stats["daily_user_type_guest"], 1)
|
||||
self.assertEqual(phone_home_stats["daily_user_type_bridged"], 0)
|
||||
self.assertEqual(phone_home_stats["total_room_count"], 1)
|
||||
self.assertEqual(phone_home_stats["total_event_count"], 24)
|
||||
self.assertEqual(phone_home_stats["total_message_count"], 10)
|
||||
self.assertEqual(phone_home_stats["total_e2ee_event_count"], 5)
|
||||
self.assertEqual(phone_home_stats["daily_active_users"], 2)
|
||||
self.assertEqual(phone_home_stats["monthly_active_users"], 2)
|
||||
self.assertEqual(phone_home_stats["daily_active_rooms"], 1)
|
||||
self.assertEqual(phone_home_stats["daily_active_e2ee_rooms"], 1)
|
||||
self.assertEqual(phone_home_stats["daily_messages"], 10)
|
||||
self.assertEqual(phone_home_stats["daily_e2ee_messages"], 5)
|
||||
self.assertEqual(phone_home_stats["daily_sent_messages"], 10)
|
||||
self.assertEqual(phone_home_stats["daily_sent_e2ee_messages"], 5)
|
||||
|
||||
# Our users have not been around for >30 days, hence these are all 0.
|
||||
self.assertEqual(phone_home_stats["r30v2_users_all"], 0)
|
||||
self.assertEqual(phone_home_stats["r30v2_users_android"], 0)
|
||||
self.assertEqual(phone_home_stats["r30v2_users_ios"], 0)
|
||||
self.assertEqual(phone_home_stats["r30v2_users_electron"], 0)
|
||||
self.assertEqual(phone_home_stats["r30v2_users_web"], 0)
|
||||
self.assertEqual(
|
||||
phone_home_stats["cache_factor"], self.hs.config.caches.global_factor
|
||||
)
|
||||
self.assertEqual(
|
||||
phone_home_stats["event_cache_size"],
|
||||
self.hs.config.caches.event_cache_size,
|
||||
)
|
||||
self.assertEqual(
|
||||
phone_home_stats["database_engine"],
|
||||
self.hs.config.database.databases[0].config["name"],
|
||||
)
|
||||
self.assertEqual(
|
||||
phone_home_stats["database_server_version"],
|
||||
self.hs.get_datastores().main.database_engine.server_version,
|
||||
)
|
||||
|
||||
synapse_logger = logging.getLogger("synapse")
|
||||
log_level = synapse_logger.getEffectiveLevel()
|
||||
self.assertEqual(phone_home_stats["log_level"], logging.getLevelName(log_level))
|
||||
tests/storage/test_event_stats.py (new file)
@@ -0,0 +1,237 @@
|
||||
#
|
||||
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
#
|
||||
# Copyright (C) 2025 New Vector, Ltd
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# See the GNU Affero General Public License for more details:
|
||||
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||
|
||||
|
||||
from twisted.test.proto_helpers import MemoryReactor
|
||||
|
||||
from synapse.rest import admin, login, register, room
|
||||
from synapse.server import HomeServer
|
||||
from synapse.types.storage import _BackgroundUpdates
|
||||
from synapse.util import Clock
|
||||
|
||||
from tests import unittest
|
||||
|
||||
|
||||
class EventStatsTestCase(unittest.HomeserverTestCase):
|
||||
"""
|
||||
Tests for the `event_stats` table
|
||||
"""
|
||||
|
||||
servlets = [
|
||||
admin.register_servlets_for_client_rest_resource,
|
||||
room.register_servlets,
|
||||
register.register_servlets,
|
||||
login.register_servlets,
|
||||
]
|
||||
|
||||
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
|
||||
self.store = hs.get_datastores().main
|
||||
|
||||
# Wait for the background updates to add the database triggers that keep the
|
||||
# `event_stats` table up-to-date.
|
||||
#
|
||||
# This also prevents background updates running during the tests and messing
|
||||
# with the results.
|
||||
self.wait_for_background_updates()
|
||||
|
||||
super().prepare(reactor, clock, hs)
|
||||
|
||||
def _perform_user_actions(self) -> None:
|
||||
"""
|
||||
Perform some actions on the homeserver that would bump the event counts.
|
||||
"""
|
||||
# Create some users
|
||||
user_1_mxid = self.register_user(
|
||||
username="test_user_1",
|
||||
password="test",
|
||||
)
|
||||
user_2_mxid = self.register_user(
|
||||
username="test_user_2",
|
||||
password="test",
|
||||
)
|
||||
# Note: `self.register_user` does not support guest registration, and updating the
|
||||
# Admin API it calls to add a new parameter would cause the `mac` parameter to fail
|
||||
# in a backwards-incompatible manner. Hence, we make a manual request here.
|
||||
_guest_user_mxid = self.make_request(
|
||||
method="POST",
|
||||
path="/_matrix/client/v3/register?kind=guest",
|
||||
content={
|
||||
"username": "guest_user",
|
||||
"password": "test",
|
||||
},
|
||||
shorthand=False,
|
||||
)
|
||||
|
||||
# Log in to each user
|
||||
user_1_token = self.login(username=user_1_mxid, password="test")
|
||||
user_2_token = self.login(username=user_2_mxid, password="test")
|
||||
|
||||
# Create a room between the two users
|
||||
room_1_id = self.helper.create_room_as(
|
||||
is_public=False,
|
||||
tok=user_1_token,
|
||||
)
|
||||
|
||||
# Mark this room as end-to-end encrypted
|
||||
self.helper.send_state(
|
||||
room_id=room_1_id,
|
||||
event_type="m.room.encryption",
|
||||
body={
|
||||
"algorithm": "m.megolm.v1.aes-sha2",
|
||||
"rotation_period_ms": 604800000,
|
||||
"rotation_period_msgs": 100,
|
||||
},
|
||||
state_key="",
|
||||
tok=user_1_token,
|
||||
)
|
||||
|
||||
# User 1 invites user 2
|
||||
self.helper.invite(
|
||||
room=room_1_id,
|
||||
src=user_1_mxid,
|
||||
targ=user_2_mxid,
|
||||
tok=user_1_token,
|
||||
)
|
||||
|
||||
# User 2 joins
|
||||
self.helper.join(
|
||||
room=room_1_id,
|
||||
user=user_2_mxid,
|
||||
tok=user_2_token,
|
||||
)
|
||||
|
||||
# User 1 sends 10 unencrypted messages
|
||||
for _ in range(10):
|
||||
self.helper.send(
|
||||
room_id=room_1_id,
|
||||
body="Zoinks Scoob! A message!",
|
||||
tok=user_1_token,
|
||||
)
|
||||
|
||||
# User 2 sends 5 encrypted "messages"
|
||||
for _ in range(5):
|
||||
self.helper.send_event(
|
||||
room_id=room_1_id,
|
||||
type="m.room.encrypted",
|
||||
content={
|
||||
"algorithm": "m.olm.v1.curve25519-aes-sha2",
|
||||
"sender_key": "some_key",
|
||||
"ciphertext": {
|
||||
"some_key": {
|
||||
"type": 0,
|
||||
"body": "encrypted_payload",
|
||||
},
|
||||
},
|
||||
},
|
||||
tok=user_2_token,
|
||||
)
|
||||
|
||||
def test_background_update_with_events(self) -> None:
|
||||
"""
|
||||
Test that the background update to populate the `event_stats` table works
|
||||
correctly when there are events in the database.
|
||||
"""
|
||||
# Do things to bump the stats
|
||||
self._perform_user_actions()
|
||||
|
||||
# Keep in mind: These are already populated as the background update has already
|
||||
# ran once when Synapse started and added the database triggers which are
|
||||
# incrementing things as new events come in.
|
||||
self.assertEqual(self.get_success(self.store.count_total_events()), 24)
|
||||
self.assertEqual(self.get_success(self.store.count_total_messages()), 10)
|
||||
self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 5)
|
||||
|
||||
# Run the background update again
|
||||
self.get_success(
|
||||
self.store.db_pool.simple_insert(
|
||||
"background_updates",
|
||||
{
|
||||
"update_name": _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
|
||||
"progress_json": "{}",
|
||||
},
|
||||
)
|
||||
)
|
||||
self.store.db_pool.updates._all_done = False
|
||||
self.wait_for_background_updates()
|
||||
|
||||
# We expect these values to double as the background update is being run *again*
|
||||
# and will double-count the `events`.
|
||||
self.assertEqual(self.get_success(self.store.count_total_events()), 48)
|
||||
self.assertEqual(self.get_success(self.store.count_total_messages()), 20)
|
||||
self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 10)
|
||||
|
||||
def test_background_update_without_events(self) -> None:
|
||||
"""
|
||||
Test that the background update to populate the `event_stats` table works
|
||||
correctly without events in the database.
|
||||
"""
|
||||
# Keep in mind: These are already populated as the background update has already
|
||||
# ran once when Synapse started and added the database triggers which are
|
||||
# incrementing things as new events come in.
|
||||
#
|
||||
# In this case, no events have been sent, so we expect the counts to be 0.
|
||||
self.assertEqual(self.get_success(self.store.count_total_events()), 0)
|
||||
self.assertEqual(self.get_success(self.store.count_total_messages()), 0)
|
||||
self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 0)
|
||||
|
||||
# Run the background update again
|
||||
self.get_success(
|
||||
self.store.db_pool.simple_insert(
|
||||
"background_updates",
|
||||
{
|
||||
"update_name": _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
|
||||
"progress_json": "{}",
|
||||
},
|
||||
)
|
||||
)
|
||||
self.store.db_pool.updates._all_done = False
|
||||
self.wait_for_background_updates()
|
||||
|
||||
self.assertEqual(self.get_success(self.store.count_total_events()), 0)
|
||||
self.assertEqual(self.get_success(self.store.count_total_messages()), 0)
|
||||
self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 0)
|
||||
|
||||
def test_background_update_resume_progress(self) -> None:
|
||||
"""
|
||||
Test that the background update to populate the `event_stats` table works
|
||||
correctly to resume from `progress_json`.
|
||||
"""
|
||||
# Do things to bump the stats
|
||||
self._perform_user_actions()
|
||||
|
||||
# Keep in mind: These are already populated as the background update has already
|
||||
# ran once when Synapse started and added the database triggers which are
|
||||
# incrementing things as new events come in.
|
||||
self.assertEqual(self.get_success(self.store.count_total_events()), 24)
|
||||
self.assertEqual(self.get_success(self.store.count_total_messages()), 10)
|
||||
self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 5)
|
||||
|
||||
# Run the background update again
|
||||
self.get_success(
|
||||
self.store.db_pool.simple_insert(
|
||||
"background_updates",
|
||||
{
|
||||
"update_name": _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
|
||||
"progress_json": '{ "last_event_stream_ordering": 14, "stop_event_stream_ordering": 21 }',
|
||||
},
|
||||
)
|
||||
)
|
||||
self.store.db_pool.updates._all_done = False
|
||||
self.wait_for_background_updates()
|
||||
|
||||
# We expect these values to increase as the background update is being run
|
||||
# *again* and will double-count some of the `events` over the range specified
|
||||
# by the `progress_json`.
|
||||
self.assertEqual(self.get_success(self.store.count_total_events()), 24 + 7)
|
||||
self.assertEqual(self.get_success(self.store.count_total_messages()), 16)
|
||||
self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 6)
|
||||