Compare commits

4 Commits: devon/medi ... v1.129.0rc

| Author | SHA1 | Date |
|---|---|---|
|  | d67e9c5367 |  |
|  | 2b5c6239de |  |
|  | 9b8eebbe4e |  |
|  | 5ced4efe1d |  |
.github/workflows/docker.yml (2 changed lines, vendored)
@@ -30,7 +30,7 @@ jobs:
run: docker buildx inspect

- name: Install Cosign
uses: sigstore/cosign-installer@3454372f43399081ed03b604cb2d021dabca52bb # v3.8.2
uses: sigstore/cosign-installer@d7d6bc7722e3daa8354c50bcb52f4837da5e9b6a # v3.8.1

- name: Checkout repository
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
.github/workflows/fix_lint.yaml (2 changed lines, vendored)
@@ -44,6 +44,6 @@ jobs:
- run: cargo fmt
continue-on-error: true

- uses: stefanzweifel/git-auto-commit-action@b863ae1933cb653a53c021fe36dbb774e1fb9403 # v5.2.0
- uses: stefanzweifel/git-auto-commit-action@e348103e9026cc0eee72ae06630dbe30c8bf7a79 # v5.1.0
with:
commit_message: "Attempt to fix linting"
.github/workflows/release-artifacts.yml (4 changed lines, vendored)
@@ -203,7 +203,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Download all workflow run artifacts
uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093 # v4.3.0
uses: actions/download-artifact@95815c38cf2ff2164869cbab79da8d1f422bc89e # v4.2.1
- name: Build a tarball for the debs
# We need to merge all the debs uploads into one folder, then compress
# that.

@@ -213,7 +213,7 @@ jobs:
tar -cvJf debs.tar.xz debs
- name: Attach to release
# Pinned to work around https://github.com/softprops/action-gh-release/issues/445
uses: softprops/action-gh-release@c95fe1489396fe8a9eb87c0abf8aa5b2ef267fda # v0.1.15
uses: softprops/action-gh-release@de2c0eb89ae2a093876385947365aca7b0e5f844 # v0.1.15
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
.github/workflows/triage_labelled.yml (2 changed lines, vendored)
@@ -11,7 +11,7 @@ jobs:
if: >
contains(github.event.issue.labels.*.name, 'X-Needs-Info')
steps:
- uses: actions/add-to-project@5b1a254a3546aef88e0a7724a77a623fa2e47c36 # main (v1.0.2 + 10 commits)
- uses: actions/add-to-project@280af8ae1f83a494cfad2cb10f02f6d13529caa9 # main (v1.0.2 + 10 commits)
id: add_project
with:
project-url: "https://github.com/orgs/matrix-org/projects/67"
CHANGES.md (27 changed lines)
@@ -1,3 +1,30 @@
# Synapse 1.129.0rc1 (2025-04-15)

### Features

- Add `passthrough_authorization_parameters` in OIDC configuration to allow passing parameters to the authorization grant URL. ([\#18232](https://github.com/element-hq/synapse/issues/18232))
- Add `total_event_count`, `total_message_count`, and `total_e2ee_event_count` fields to the homeserver usage statistics. ([\#18260](https://github.com/element-hq/synapse/issues/18260))
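For context on the first feature above, a minimal, hypothetical sketch of how the new option might appear in an OIDC provider block (the exact shape and the `login_hint` parameter name are illustrative assumptions, not taken from this diff):

```yaml
oidc_providers:
  - idp_id: example
    idp_name: Example IdP
    issuer: "https://idp.example.org/"
    client_id: "your-client-id"
    client_secret: "your-client-secret"
    scopes: ["openid", "profile"]
    # Hypothetical: forward the client-supplied `login_hint` query parameter
    # on to the authorization grant URL.
    passthrough_authorization_parameters: ["login_hint"]
```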

### Bugfixes

- Fix `force_tracing_for_users` config when using delegated auth. ([\#18334](https://github.com/element-hq/synapse/issues/18334))
- Fix the token introspection cache logging access tokens when MAS integration is in use. ([\#18335](https://github.com/element-hq/synapse/issues/18335))
- Stop caching introspection failures when delegating auth to MAS. ([\#18339](https://github.com/element-hq/synapse/issues/18339))
- Fix `ExternalIDReuse` exception after migrating to MAS on workers with a high traffic. ([\#18342](https://github.com/element-hq/synapse/issues/18342))
- Fix minor performance regression caused by tracking of room participation. Regressed in v1.128.0. ([\#18345](https://github.com/element-hq/synapse/issues/18345))

### Updates to the Docker image

- Optimize the build of the complement-synapse image. ([\#18294](https://github.com/element-hq/synapse/issues/18294))

### Internal Changes

- Disable statement timeout during room purge. ([\#18133](https://github.com/element-hq/synapse/issues/18133))
- Add cache to storage functions used to auth requests when using delegated auth. ([\#18337](https://github.com/element-hq/synapse/issues/18337))


# Synapse 1.128.0 (2025-04-08)

No significant changes since 1.128.0rc1.
Cargo.lock (8 changed lines, generated)
@@ -13,9 +13,9 @@ dependencies = [

[[package]]
name = "anyhow"
version = "1.0.98"
version = "1.0.97"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487"
checksum = "dcfed56ad506cb2c684a14971b8861fdc3baaaae314b9e5f9bb532cbe3ba7a4f"

[[package]]
name = "arc-swap"

@@ -316,9 +316,9 @@ dependencies = [

[[package]]
name = "pyo3-log"
version = "0.12.3"
version = "0.12.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7079e412e909af5d6be7c04a7f29f6a2837a080410e1c529c9dee2c367383db4"
checksum = "4b78e4983ba15bc62833a0e0941d965bc03690163f1127864f1408db25063466"
dependencies = [
"arc-swap",
"log",
@@ -1 +0,0 @@
Disable statement timeout during room purge.
@@ -1 +0,0 @@
Add `passthrough_authorization_parameters` in OIDC configuration to allow to pass parameters to the authorization grant URL.
@@ -1 +0,0 @@
Add documentation for configuring [Pocket ID](https://github.com/pocket-id/pocket-id) as an OIDC provider.
@@ -1 +0,0 @@
In configure_workers_and_start.py, use the same absolute path of Python in the interpreter shebang, and invoke child Python processes with `sys.executable`.
@@ -1 +0,0 @@
Optimize the build of the workers image.
@@ -1 +0,0 @@
In start_for_complement.sh, replace some external program calls with shell builtins.
@@ -1 +0,0 @@
Optimize the build of the complement-synapse image.
@@ -1 +0,0 @@
When generating container scripts from templates, don't add a leading newline so that their shebangs may be handled correctly.
@@ -1 +0,0 @@
Fix typo in docs about the `push` config option. Contributed by @HarHarLinks.
@@ -1 +0,0 @@
Fix `force_tracing_for_users` config when using delegated auth.
@@ -1 +0,0 @@
Fix the token introspection cache logging access tokens when MAS integration is in use.
@@ -1 +0,0 @@
Add cache to storage functions used to auth requests when using delegated auth.
@@ -1 +0,0 @@
Stop caching introspection failures when delegating auth to MAS.
@@ -1 +0,0 @@
Fix `ExternalIDReuse` exception after migrating to MAS on workers with a high traffic.
@@ -1 +0,0 @@
Fix minor performance regression caused by tracking of room participation. Regressed in v1.128.0.
@@ -1 +0,0 @@
Add support for handling `GET /devices/` on workers.
@@ -1 +0,0 @@
Allow `/rooms/` admin API to be run on workers.
@@ -1 +0,0 @@
Fix longstanding bug where Synapse would immediately retry a failing push endpoint when a new event is received, ignoring any backoff timers.
@@ -1 +0,0 @@
Minor performance improvements to the notifier.
@@ -1 +0,0 @@
Slight performance increase when using the ratelimiter.
@@ -1 +0,0 @@
Allow client & media admin apis to coexist.
debian/changelog (6 changed lines, vendored)
@@ -1,3 +1,9 @@
matrix-synapse-py3 (1.129.0~rc1) stable; urgency=medium

* New Synapse release 1.129.0rc1.

-- Synapse Packaging team <packages@matrix.org> Tue, 15 Apr 2025 10:47:43 -0600

matrix-synapse-py3 (1.128.0) stable; urgency=medium

* New Synapse release 1.128.0.
@@ -3,37 +3,18 @@
|
||||
ARG SYNAPSE_VERSION=latest
|
||||
ARG FROM=matrixdotorg/synapse:$SYNAPSE_VERSION
|
||||
ARG DEBIAN_VERSION=bookworm
|
||||
ARG PYTHON_VERSION=3.12
|
||||
|
||||
# first of all, we create a base image with dependencies which we can copy into the
|
||||
# first of all, we create a base image with an nginx which we can copy into the
|
||||
# target image. For repeated rebuilds, this is much faster than apt installing
|
||||
# each time.
|
||||
|
||||
FROM ghcr.io/astral-sh/uv:python${PYTHON_VERSION}-${DEBIAN_VERSION} AS deps_base
|
||||
|
||||
# Tell apt to keep downloaded package files, as we're using cache mounts.
|
||||
RUN rm -f /etc/apt/apt.conf.d/docker-clean; echo 'Binary::apt::APT::Keep-Downloaded-Packages "true";' > /etc/apt/apt.conf.d/keep-cache
|
||||
|
||||
FROM docker.io/library/debian:${DEBIAN_VERSION}-slim AS deps_base
|
||||
RUN \
|
||||
--mount=type=cache,target=/var/cache/apt,sharing=locked \
|
||||
--mount=type=cache,target=/var/lib/apt,sharing=locked \
|
||||
apt-get update -qq && \
|
||||
DEBIAN_FRONTEND=noninteractive apt-get install -yqq --no-install-recommends \
|
||||
nginx-light
|
||||
|
||||
RUN \
|
||||
# remove default page
|
||||
rm /etc/nginx/sites-enabled/default && \
|
||||
# have nginx log to stderr/out
|
||||
ln -sf /dev/stdout /var/log/nginx/access.log && \
|
||||
ln -sf /dev/stderr /var/log/nginx/error.log
|
||||
|
||||
# --link-mode=copy silences a warning as uv isn't able to do hardlinks between its cache
|
||||
# (mounted as --mount=type=cache) and the target directory.
|
||||
RUN --mount=type=cache,target=/root/.cache/uv \
|
||||
uv pip install --link-mode=copy --prefix="/uv/usr/local" supervisor~=4.2
|
||||
|
||||
RUN mkdir -p /uv/etc/supervisor/conf.d
|
||||
redis-server nginx-light
|
||||
|
||||
# Similarly, a base to copy the redis server from.
|
||||
#
|
||||
@@ -46,16 +27,31 @@ FROM docker.io/library/redis:7-${DEBIAN_VERSION} AS redis_base
|
||||
# now build the final image, based on the the regular Synapse docker image
|
||||
FROM $FROM
|
||||
|
||||
# Copy over dependencies
|
||||
# Install supervisord with uv pip instead of apt, to avoid installing a second
|
||||
# copy of python.
|
||||
# --link-mode=copy silences a warning as uv isn't able to do hardlinks between its cache
|
||||
# (mounted as --mount=type=cache) and the target directory.
|
||||
RUN \
|
||||
--mount=type=bind,from=ghcr.io/astral-sh/uv:0.6.8,source=/uv,target=/uv \
|
||||
--mount=type=cache,target=/root/.cache/uv \
|
||||
/uv pip install --link-mode=copy --prefix="/usr/local" supervisor~=4.2
|
||||
|
||||
RUN mkdir -p /etc/supervisor/conf.d
|
||||
|
||||
# Copy over redis and nginx
|
||||
COPY --from=redis_base /usr/local/bin/redis-server /usr/local/bin
|
||||
COPY --from=deps_base /uv /
|
||||
|
||||
COPY --from=deps_base /usr/sbin/nginx /usr/sbin
|
||||
COPY --from=deps_base /usr/share/nginx /usr/share/nginx
|
||||
COPY --from=deps_base /usr/lib/nginx /usr/lib/nginx
|
||||
COPY --from=deps_base /etc/nginx /etc/nginx
|
||||
COPY --from=deps_base /var/log/nginx /var/log/nginx
|
||||
# chown to allow non-root user to write to http-*-temp-path dirs
|
||||
COPY --from=deps_base --chown=www-data:root /var/lib/nginx /var/lib/nginx
|
||||
RUN rm /etc/nginx/sites-enabled/default
|
||||
RUN mkdir /var/log/nginx /var/lib/nginx
|
||||
RUN chown www-data /var/lib/nginx
|
||||
|
||||
# have nginx log to stderr/out
|
||||
RUN ln -sf /dev/stdout /var/log/nginx/access.log
|
||||
RUN ln -sf /dev/stderr /var/log/nginx/error.log
|
||||
|
||||
# Copy Synapse worker, nginx and supervisord configuration template files
|
||||
COPY ./docker/conf-workers/* /conf/
|
||||
@@ -74,4 +70,4 @@ FROM $FROM
# Replace the healthcheck with one which checks *all* the workers. The script
# is generated by configure_workers_and_start.py.
HEALTHCHECK --start-period=5s --interval=15s --timeout=5s \
CMD ["/healthcheck.sh"]
CMD /bin/sh /healthcheck.sh
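As background (not part of this diff): Docker's exec-form healthcheck runs the target directly, so `/healthcheck.sh` must be executable and start with a shebang, while the shell form wraps it in `/bin/sh`. A generic, illustrative sketch of the two forms:

```dockerfile
# Illustrative only; not taken from this repository's Dockerfiles.
# Shell form: an extra /bin/sh process wraps the script.
HEALTHCHECK --interval=15s --timeout=5s CMD /bin/sh /healthcheck.sh
# Exec form: the script runs directly; it needs a shebang and the executable bit.
HEALTHCHECK --interval=15s --timeout=5s CMD ["/healthcheck.sh"]
```

The related `os.chmod("/healthcheck.sh", 0o755)` call and the shebang-preserving template change appear later in this comparison.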
@@ -58,4 +58,4 @@ ENTRYPOINT ["/start_for_complement.sh"]
|
||||
|
||||
# Update the healthcheck to have a shorter check interval
|
||||
HEALTHCHECK --start-period=5s --interval=1s --timeout=1s \
|
||||
CMD ["/healthcheck.sh"]
|
||||
CMD /bin/sh /healthcheck.sh
|
||||
|
||||
@@ -9,7 +9,7 @@ echo " Args: $*"
|
||||
echo " Env: SYNAPSE_COMPLEMENT_DATABASE=$SYNAPSE_COMPLEMENT_DATABASE SYNAPSE_COMPLEMENT_USE_WORKERS=$SYNAPSE_COMPLEMENT_USE_WORKERS SYNAPSE_COMPLEMENT_USE_ASYNCIO_REACTOR=$SYNAPSE_COMPLEMENT_USE_ASYNCIO_REACTOR"
|
||||
|
||||
function log {
|
||||
d=$(printf '%(%Y-%m-%d %H:%M:%S)T,%.3s\n' ${EPOCHREALTIME/./ })
|
||||
d=$(date +"%Y-%m-%d %H:%M:%S,%3N")
|
||||
echo "$d $*"
|
||||
}
|
||||
|
||||
@@ -103,11 +103,12 @@ fi
|
||||
# Note that both the key and certificate are in PEM format (not DER).
|
||||
|
||||
# First generate a configuration file to set up a Subject Alternative Name.
|
||||
echo "\
|
||||
cat > /conf/server.tls.conf <<EOF
|
||||
.include /etc/ssl/openssl.cnf
|
||||
|
||||
[SAN]
|
||||
subjectAltName=DNS:${SERVER_NAME}" > /conf/server.tls.conf
|
||||
subjectAltName=DNS:${SERVER_NAME}
|
||||
EOF
|
||||
|
||||
# Generate an RSA key
|
||||
openssl genrsa -out /conf/server.tls.key 2048
|
||||
@@ -122,8 +123,8 @@ openssl x509 -req -in /conf/server.tls.csr \
|
||||
-out /conf/server.tls.crt -extfile /conf/server.tls.conf -extensions SAN
|
||||
|
||||
# Assert that we have a Subject Alternative Name in the certificate.
|
||||
# (the test will exit with 1 here if there isn't a SAN in the certificate.)
|
||||
[[ $(openssl x509 -in /conf/server.tls.crt -noout -text) == *DNS:* ]]
|
||||
# (grep will exit with 1 here if there isn't a SAN in the certificate.)
|
||||
openssl x509 -in /conf/server.tls.crt -noout -text | grep DNS:
|
||||
|
||||
export SYNAPSE_TLS_CERT=/conf/server.tls.crt
|
||||
export SYNAPSE_TLS_KEY=/conf/server.tls.key
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
#!/usr/local/bin/python
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||
#
|
||||
@@ -376,11 +376,9 @@ def convert(src: str, dst: str, **template_vars: object) -> None:
|
||||
#
|
||||
# We use append mode in case the files have already been written to by something else
|
||||
# (for instance, as part of the instructions in a dockerfile).
|
||||
exists = os.path.isfile(dst)
|
||||
with open(dst, "a") as outfile:
|
||||
# In case the existing file doesn't end with a newline
|
||||
if exists:
|
||||
outfile.write("\n")
|
||||
outfile.write("\n")
|
||||
|
||||
outfile.write(rendered)
|
||||
|
||||
@@ -606,7 +604,7 @@ def generate_base_homeserver_config() -> None:
|
||||
# start.py already does this for us, so just call that.
|
||||
# note that this script is copied in in the official, monolith dockerfile
|
||||
os.environ["SYNAPSE_HTTP_PORT"] = str(MAIN_PROCESS_HTTP_LISTENER_PORT)
|
||||
subprocess.run([sys.executable, "/start.py", "migrate_config"], check=True)
|
||||
subprocess.run(["/usr/local/bin/python", "/start.py", "migrate_config"], check=True)
|
||||
|
||||
|
||||
def parse_worker_types(
|
||||
@@ -1000,7 +998,6 @@ def generate_worker_files(
|
||||
"/healthcheck.sh",
|
||||
healthcheck_urls=healthcheck_urls,
|
||||
)
|
||||
os.chmod("/healthcheck.sh", 0o755)
|
||||
|
||||
# Ensure the logging directory exists
|
||||
log_dir = data_dir + "/logs"
|
||||
|
||||
@@ -23,7 +23,6 @@ such as [Github][github-idp].
|
||||
[auth0]: https://auth0.com/
|
||||
[authentik]: https://goauthentik.io/
|
||||
[lemonldap]: https://lemonldap-ng.org/
|
||||
[pocket-id]: https://pocket-id.org/
|
||||
[okta]: https://www.okta.com/
|
||||
[dex-idp]: https://github.com/dexidp/dex
|
||||
[keycloak-idp]: https://www.keycloak.org/docs/latest/server_admin/#sso-protocols
|
||||
@@ -625,32 +624,6 @@ oidc_providers:
|
||||
|
||||
Note that the fields `client_id` and `client_secret` are taken from the CURL response above.
|
||||
|
||||
### Pocket ID
|
||||
|
||||
[Pocket ID][pocket-id] is a simple OIDC provider that allows users to authenticate with their passkeys.
|
||||
1. Go to `OIDC Clients`
|
||||
2. Click on `Add OIDC Client`
|
||||
3. Add a name, for example `Synapse`
|
||||
4. Add `"https://auth.example.org/_synapse/client/oidc/callback` to `Callback URLs` # Replace `auth.example.org` with your domain
|
||||
5. Click on `Save`
|
||||
6. Note down your `Client ID` and `Client secret`, these will be used later
|
||||
|
||||
Synapse config:
|
||||
|
||||
```yaml
|
||||
oidc_providers:
|
||||
- idp_id: pocket_id
|
||||
idp_name: Pocket ID
|
||||
issuer: "https://auth.example.org/" # Replace with your domain
|
||||
client_id: "your-client-id" # Replace with the "Client ID" you noted down before
|
||||
client_secret: "your-client-secret" # Replace with the "Client secret" you noted down before
|
||||
scopes: ["openid", "profile"]
|
||||
user_mapping_provider:
|
||||
config:
|
||||
localpart_template: "{{ user.preferred_username }}"
|
||||
display_name_template: "{{ user.name }}"
|
||||
```
|
||||
|
||||
### Shibboleth with OIDC Plugin
|
||||
|
||||
[Shibboleth](https://www.shibboleth.net/) is an open Standard IdP solution widely used by Universities.
|
||||
|
||||
@@ -30,10 +30,13 @@ The following statistics are sent to the configured reporting endpoint:
|
||||
| `python_version` | string | The Python version number in use (e.g "3.7.1"). Taken from `sys.version_info`. |
|
||||
| `total_users` | int | The number of registered users on the homeserver. |
|
||||
| `total_nonbridged_users` | int | The number of users, excluding those created by an Application Service. |
|
||||
| `daily_user_type_native` | int | The number of native users created in the last 24 hours. |
|
||||
| `daily_user_type_native` | int | The number of native, non-guest users created in the last 24 hours. |
|
||||
| `daily_user_type_guest` | int | The number of guest users created in the last 24 hours. |
|
||||
| `daily_user_type_bridged` | int | The number of users created by Application Services in the last 24 hours. |
|
||||
| `total_room_count` | int | The total number of rooms present on the homeserver. |
|
||||
| `total_event_count` | int | The total number of events present on the homeserver. |
|
||||
| `total_message_count` | int | The total number of non-state events with type `m.room.message` present on the homeserver. |
|
||||
| `total_e2ee_event_count` | int | The total number of non-state events with type `m.room.encrypted` present on the homeserver. This can be used as a slight over-estimate for the number of encrypted messages. |
|
||||
| `daily_active_users` | int | The number of unique users[^1] that have used the homeserver in the last 24 hours. |
|
||||
| `monthly_active_users` | int | The number of unique users[^1] that have used the homeserver in the last 30 days. |
|
||||
| `daily_active_rooms` | int | The number of rooms that have had a (state) event with the type `m.room.message` sent in them in the last 24 hours. |
|
||||
@@ -50,8 +53,8 @@ The following statistics are sent to the configured reporting endpoint:
|
||||
| `cache_factor` | int | The configured [`global factor`](../../configuration/config_documentation.md#caching) value for caching. |
|
||||
| `event_cache_size` | int | The configured [`event_cache_size`](../../configuration/config_documentation.md#caching) value for caching. |
|
||||
| `database_engine` | string | The database engine that is in use. Either "psycopg2" meaning PostgreSQL is in use, or "sqlite3" for SQLite3. |
|
||||
| `database_server_version` | string | The version of the database server. Examples being "10.10" for PostgreSQL server version 10.0, and "3.38.5" for SQLite 3.38.5 installed on the system. |
|
||||
| `log_level` | string | The log level in use. Examples are "INFO", "WARNING", "ERROR", "DEBUG", etc. |
|
||||
| `database_server_version` | string | The version of the database server. Examples being "10.10" for PostgreSQL server version 10.0, and "3.38.5" for SQLite 3.38.5 installed on the system. |
|
||||
| `log_level` | string | The log level in use. Examples are "INFO", "WARNING", "ERROR", "DEBUG", etc. |
|
||||
|
||||
|
||||
[^1]: Native matrix users and guests are always counted. If the
|
||||
|
||||
@@ -4018,7 +4018,7 @@ This option has a number of sub-options. They are as follows:
|
||||
* `include_content`: Clients requesting push notifications can either have the body of
|
||||
the message sent in the notification poke along with other details
|
||||
like the sender, or just the event ID and room ID (`event_id_only`).
|
||||
If clients choose to have the body sent, this option controls whether the
|
||||
If clients choose the to have the body sent, this option controls whether the
|
||||
notification request includes the content of the event (other details
|
||||
like the sender are still included). If `event_id_only` is enabled, it
|
||||
has no effect.
|
||||
|
||||
@@ -249,7 +249,6 @@ information.
|
||||
^/_matrix/client/(api/v1|r0|v3|unstable)/directory/room/.*$
|
||||
^/_matrix/client/(r0|v3|unstable)/capabilities$
|
||||
^/_matrix/client/(r0|v3|unstable)/notifications$
|
||||
^/_synapse/admin/v1/rooms/
|
||||
|
||||
# Encryption requests
|
||||
^/_matrix/client/(r0|v3|unstable)/keys/query$
|
||||
@@ -281,7 +280,6 @@ Additionally, the following REST endpoints can be handled for GET requests:
|
||||
|
||||
^/_matrix/client/(api/v1|r0|v3|unstable)/pushrules/
|
||||
^/_matrix/client/unstable/org.matrix.msc4140/delayed_events
|
||||
^/_matrix/client/(api/v1|r0|v3|unstable)/devices/
|
||||
|
||||
# Account data requests
|
||||
^/_matrix/client/(r0|v3|unstable)/.*/tags
|
||||
|
||||
poetry.lock (19 changed lines, generated)
@@ -2053,19 +2053,18 @@ tests = ["hypothesis (>=3.27.0)", "pytest (>=3.2.1,!=3.3.0)"]
|
||||
|
||||
[[package]]
|
||||
name = "pyopenssl"
|
||||
version = "25.0.0"
|
||||
version = "24.3.0"
|
||||
description = "Python wrapper module around the OpenSSL library"
|
||||
optional = false
|
||||
python-versions = ">=3.7"
|
||||
groups = ["main"]
|
||||
files = [
|
||||
{file = "pyOpenSSL-25.0.0-py3-none-any.whl", hash = "sha256:424c247065e46e76a37411b9ab1782541c23bb658bf003772c3405fbaa128e90"},
|
||||
{file = "pyopenssl-25.0.0.tar.gz", hash = "sha256:cd2cef799efa3936bb08e8ccb9433a575722b9dd986023f1cabc4ae64e9dac16"},
|
||||
{file = "pyOpenSSL-24.3.0-py3-none-any.whl", hash = "sha256:e474f5a473cd7f92221cc04976e48f4d11502804657a08a989fb3be5514c904a"},
|
||||
{file = "pyopenssl-24.3.0.tar.gz", hash = "sha256:49f7a019577d834746bc55c5fce6ecbcec0f2b4ec5ce1cf43a9a173b8138bb36"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
cryptography = ">=41.0.5,<45"
|
||||
typing-extensions = {version = ">=4.9", markers = "python_version < \"3.13\" and python_version >= \"3.8\""}
|
||||
|
||||
[package.extras]
|
||||
docs = ["sphinx (!=5.2.0,!=5.2.0.post0,!=7.2.5)", "sphinx_rtd_theme"]
|
||||
@@ -2957,14 +2956,14 @@ files = [
|
||||
|
||||
[[package]]
|
||||
name = "types-jsonschema"
|
||||
version = "4.23.0.20241208"
|
||||
version = "4.23.0.20240813"
|
||||
description = "Typing stubs for jsonschema"
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
groups = ["dev"]
|
||||
files = [
|
||||
{file = "types_jsonschema-4.23.0.20241208-py3-none-any.whl", hash = "sha256:87934bd9231c99d8eff94cacfc06ba668f7973577a9bd9e1f9de957c5737313e"},
|
||||
{file = "types_jsonschema-4.23.0.20241208.tar.gz", hash = "sha256:e8b15ad01f290ecf6aea53f93fbdf7d4730e4600313e89e8a7f95622f7e87b7c"},
|
||||
{file = "types-jsonschema-4.23.0.20240813.tar.gz", hash = "sha256:c93f48206f209a5bc4608d295ac39f172fb98b9e24159ce577dbd25ddb79a1c0"},
|
||||
{file = "types_jsonschema-4.23.0.20240813-py3-none-any.whl", hash = "sha256:be283e23f0b87547316c2ee6b0fd36d95ea30e921db06478029e10b5b6aa6ac3"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
@@ -3008,14 +3007,14 @@ files = [
|
||||
|
||||
[[package]]
|
||||
name = "types-psycopg2"
|
||||
version = "2.9.21.20250318"
|
||||
version = "2.9.21.20250121"
|
||||
description = "Typing stubs for psycopg2"
|
||||
optional = false
|
||||
python-versions = ">=3.9"
|
||||
groups = ["dev"]
|
||||
files = [
|
||||
{file = "types_psycopg2-2.9.21.20250318-py3-none-any.whl", hash = "sha256:7296d111ad950bbd2fc979a1ab0572acae69047f922280e77db657c00d2c79c0"},
|
||||
{file = "types_psycopg2-2.9.21.20250318.tar.gz", hash = "sha256:eb6eac5bfb16adfd5f16b818918b9e26a40ede147e0f2bbffdf53a6ef7025a87"},
|
||||
{file = "types_psycopg2-2.9.21.20250121-py3-none-any.whl", hash = "sha256:b890dc6f5a08b6433f0ff73a4ec9a834deedad3e914f2a4a6fd43df021f745f1"},
|
||||
{file = "types_psycopg2-2.9.21.20250121.tar.gz", hash = "sha256:2b0e2cd0f3747af1ae25a7027898716d80209604770ef3cbf350fe055b9c349b"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
@@ -97,7 +97,7 @@ module-name = "synapse.synapse_rust"
|
||||
|
||||
[tool.poetry]
|
||||
name = "matrix-synapse"
|
||||
version = "1.128.0"
|
||||
version = "1.129.0rc1"
|
||||
description = "Homeserver for the Matrix decentralised comms protocol"
|
||||
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
|
||||
license = "AGPL-3.0-or-later"
|
||||
|
||||
@@ -20,7 +20,8 @@
|
||||
#
|
||||
#
|
||||
|
||||
from typing import Dict, Hashable, Optional, Tuple
|
||||
from collections import OrderedDict
|
||||
from typing import Hashable, Optional, Tuple
|
||||
|
||||
from synapse.api.errors import LimitExceededError
|
||||
from synapse.config.ratelimiting import RatelimitSettings
|
||||
@@ -79,14 +80,12 @@ class Ratelimiter:
|
||||
self.store = store
|
||||
self._limiter_name = cfg.key
|
||||
|
||||
# A dictionary representing the token buckets tracked by this rate
|
||||
# An ordered dictionary representing the token buckets tracked by this rate
|
||||
# limiter. Each entry maps a key of arbitrary type to a tuple representing:
|
||||
# * The number of tokens currently in the bucket,
|
||||
# * The time point when the bucket was last completely empty, and
|
||||
# * The rate_hz (leak rate) of this particular bucket.
|
||||
self.actions: Dict[Hashable, Tuple[float, float, float]] = {}
|
||||
|
||||
self.clock.looping_call(self._prune_message_counts, 60 * 1000)
|
||||
self.actions: OrderedDict[Hashable, Tuple[float, float, float]] = OrderedDict()
|
||||
|
||||
def _get_key(
|
||||
self, requester: Optional[Requester], key: Optional[Hashable]
|
||||
@@ -170,6 +169,9 @@ class Ratelimiter:
|
||||
rate_hz = rate_hz if rate_hz is not None else self.rate_hz
|
||||
burst_count = burst_count if burst_count is not None else self.burst_count
|
||||
|
||||
# Remove any expired entries
|
||||
self._prune_message_counts(time_now_s)
|
||||
|
||||
# Check if there is an existing count entry for this key
|
||||
action_count, time_start, _ = self._get_action_counts(key, time_now_s)
|
||||
|
||||
@@ -244,12 +246,13 @@ class Ratelimiter:
|
||||
action_count, time_start, rate_hz = self._get_action_counts(key, time_now_s)
|
||||
self.actions[key] = (action_count + n_actions, time_start, rate_hz)
|
||||
|
||||
def _prune_message_counts(self) -> None:
|
||||
def _prune_message_counts(self, time_now_s: float) -> None:
|
||||
"""Remove message count entries that have not exceeded their defined
|
||||
rate_hz limit
|
||||
"""
|
||||
time_now_s = self.clock.time()
|
||||
|
||||
Args:
|
||||
time_now_s: The current time
|
||||
"""
|
||||
# We create a copy of the key list here as the dictionary is modified during
|
||||
# the loop
|
||||
for key in list(self.actions.keys()):
|
||||
|
||||
@@ -21,7 +21,7 @@
|
||||
#
|
||||
import logging
|
||||
import sys
|
||||
from typing import Dict, List, cast
|
||||
from typing import Dict, List
|
||||
|
||||
from twisted.web.resource import Resource
|
||||
|
||||
@@ -51,8 +51,8 @@ from synapse.http.server import JsonResource, OptionsResource
|
||||
from synapse.logging.context import LoggingContext
|
||||
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
|
||||
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
|
||||
from synapse.rest import ClientRestResource, admin
|
||||
from synapse.rest.admin import AdminRestResource, register_servlets_for_media_repo
|
||||
from synapse.rest import ClientRestResource
|
||||
from synapse.rest.admin import register_servlets_for_media_repo
|
||||
from synapse.rest.health import HealthResource
|
||||
from synapse.rest.key.v2 import KeyResource
|
||||
from synapse.rest.synapse.client import build_synapse_client_resource_tree
|
||||
@@ -190,11 +190,7 @@ class GenericWorkerServer(HomeServer):
|
||||
|
||||
resources.update(build_synapse_client_resource_tree(self))
|
||||
resources["/.well-known"] = well_known_resource(self)
|
||||
admin_res = resources.get("/_synapse/admin")
|
||||
if admin_res is not None:
|
||||
admin.register_servlets(self, cast(JsonResource, admin_res))
|
||||
else:
|
||||
resources["/_synapse/admin"] = AdminRestResource(self)
|
||||
|
||||
elif name == "federation":
|
||||
resources[FEDERATION_PREFIX] = TransportLayerServer(self)
|
||||
elif name == "media":
|
||||
@@ -203,21 +199,15 @@ class GenericWorkerServer(HomeServer):
|
||||
|
||||
# We need to serve the admin servlets for media on the
|
||||
# worker.
|
||||
admin_res = resources.get("/_synapse/admin")
|
||||
if admin_res is not None:
|
||||
register_servlets_for_media_repo(
|
||||
self, cast(JsonResource, admin_res)
|
||||
)
|
||||
else:
|
||||
admin_resource = JsonResource(self, canonical_json=False)
|
||||
register_servlets_for_media_repo(self, admin_resource)
|
||||
resources["/_synapse/admin"] = admin_resource
|
||||
admin_resource = JsonResource(self, canonical_json=False)
|
||||
register_servlets_for_media_repo(self, admin_resource)
|
||||
|
||||
resources.update(
|
||||
{
|
||||
MEDIA_R0_PREFIX: media_repo,
|
||||
MEDIA_V3_PREFIX: media_repo,
|
||||
LEGACY_MEDIA_PREFIX: media_repo,
|
||||
"/_synapse/admin": admin_resource,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@@ -34,6 +34,22 @@ if TYPE_CHECKING:
|
||||
|
||||
logger = logging.getLogger("synapse.app.homeserver")
|
||||
|
||||
ONE_MINUTE_SECONDS = 60
|
||||
ONE_HOUR_SECONDS = 60 * ONE_MINUTE_SECONDS
|
||||
|
||||
MILLISECONDS_PER_SECOND = 1000
|
||||
|
||||
INITIAL_DELAY_BEFORE_FIRST_PHONE_HOME_SECONDS = 5 * ONE_MINUTE_SECONDS
|
||||
"""
|
||||
We wait 5 minutes to send the first set of stats as the server can be quite busy the
|
||||
first few minutes
|
||||
"""
|
||||
|
||||
PHONE_HOME_INTERVAL_SECONDS = 3 * ONE_HOUR_SECONDS
|
||||
"""
|
||||
Phone home stats are sent every 3 hours
|
||||
"""
|
||||
|
||||
# Contains the list of processes we will be monitoring
|
||||
# currently either 0 or 1
|
||||
_stats_process: List[Tuple[int, "resource.struct_rusage"]] = []
|
||||
@@ -121,6 +137,9 @@ async def phone_stats_home(
|
||||
|
||||
room_count = await store.get_room_count()
|
||||
stats["total_room_count"] = room_count
|
||||
stats["total_event_count"] = await store.count_total_events()
|
||||
stats["total_message_count"] = await store.count_total_messages()
|
||||
stats["total_e2ee_event_count"] = await store.count_total_e2ee_events()
|
||||
|
||||
stats["daily_active_users"] = common_metrics.daily_active_users
|
||||
stats["monthly_active_users"] = await store.count_monthly_users()
|
||||
@@ -185,12 +204,14 @@ def start_phone_stats_home(hs: "HomeServer") -> None:
|
||||
# If you increase the loop period, the accuracy of user_daily_visits
|
||||
# table will decrease
|
||||
clock.looping_call(
|
||||
hs.get_datastores().main.generate_user_daily_visits, 5 * 60 * 1000
|
||||
hs.get_datastores().main.generate_user_daily_visits,
|
||||
5 * ONE_MINUTE_SECONDS * MILLISECONDS_PER_SECOND,
|
||||
)
|
||||
|
||||
# monthly active user limiting functionality
|
||||
clock.looping_call(
|
||||
hs.get_datastores().main.reap_monthly_active_users, 1000 * 60 * 60
|
||||
hs.get_datastores().main.reap_monthly_active_users,
|
||||
ONE_HOUR_SECONDS * MILLISECONDS_PER_SECOND,
|
||||
)
|
||||
hs.get_datastores().main.reap_monthly_active_users()
|
||||
|
||||
@@ -216,12 +237,20 @@ def start_phone_stats_home(hs: "HomeServer") -> None:
|
||||
|
||||
if hs.config.server.limit_usage_by_mau or hs.config.server.mau_stats_only:
|
||||
generate_monthly_active_users()
|
||||
clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000)
|
||||
clock.looping_call(
|
||||
generate_monthly_active_users,
|
||||
5 * ONE_MINUTE_SECONDS * MILLISECONDS_PER_SECOND,
|
||||
)
|
||||
# End of monthly active user settings
|
||||
|
||||
if hs.config.metrics.report_stats:
|
||||
logger.info("Scheduling stats reporting for 3 hour intervals")
|
||||
clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000, hs, stats)
|
||||
clock.looping_call(
|
||||
phone_stats_home,
|
||||
PHONE_HOME_INTERVAL_SECONDS * MILLISECONDS_PER_SECOND,
|
||||
hs,
|
||||
stats,
|
||||
)
|
||||
|
||||
# We need to defer this init for the cases that we daemonize
|
||||
# otherwise the process ID we get is that of the non-daemon process
|
||||
@@ -229,4 +258,6 @@ def start_phone_stats_home(hs: "HomeServer") -> None:
|
||||
|
||||
# We wait 5 minutes to send the first set of stats as the server can
|
||||
# be quite busy the first few minutes
|
||||
clock.call_later(5 * 60, phone_stats_home, hs, stats)
|
||||
clock.call_later(
|
||||
INITIAL_DELAY_BEFORE_FIRST_PHONE_HOME_SECONDS, phone_stats_home, hs, stats
|
||||
)
|
||||
|
||||
@@ -66,6 +66,7 @@ from synapse.types import (
|
||||
from synapse.util.async_helpers import (
|
||||
timeout_deferred,
|
||||
)
|
||||
from synapse.util.metrics import Measure
|
||||
from synapse.util.stringutils import shortstr
|
||||
from synapse.visibility import filter_events_for_client
|
||||
|
||||
@@ -519,22 +520,20 @@ class Notifier:
|
||||
users = users or []
|
||||
rooms = rooms or []
|
||||
|
||||
user_streams: Set[_NotifierUserStream] = set()
|
||||
with Measure(self.clock, "on_new_event"):
|
||||
user_streams: Set[_NotifierUserStream] = set()
|
||||
|
||||
log_kv(
|
||||
{
|
||||
"waking_up_explicit_users": len(users),
|
||||
"waking_up_explicit_rooms": len(rooms),
|
||||
"users": shortstr(users),
|
||||
"rooms": shortstr(rooms),
|
||||
"stream": stream_key,
|
||||
"stream_id": new_token,
|
||||
}
|
||||
)
|
||||
log_kv(
|
||||
{
|
||||
"waking_up_explicit_users": len(users),
|
||||
"waking_up_explicit_rooms": len(rooms),
|
||||
"users": shortstr(users),
|
||||
"rooms": shortstr(rooms),
|
||||
"stream": stream_key,
|
||||
"stream_id": new_token,
|
||||
}
|
||||
)
|
||||
|
||||
# Only calculate which user streams to wake up if there are, in fact,
|
||||
# any user streams registered.
|
||||
if self.user_to_user_stream or self.room_to_user_streams:
|
||||
for user in users:
|
||||
user_stream = self.user_to_user_stream.get(str(user))
|
||||
if user_stream is not None:
|
||||
@@ -566,25 +565,25 @@ class Notifier:
|
||||
# We resolve all these deferreds in one go so that we only need to
|
||||
# call `PreserveLoggingContext` once, as it has a bunch of overhead
|
||||
# (to calculate performance stats)
|
||||
if listeners:
|
||||
with PreserveLoggingContext():
|
||||
for listener in listeners:
|
||||
listener.callback(current_token)
|
||||
with PreserveLoggingContext():
|
||||
for listener in listeners:
|
||||
listener.callback(current_token)
|
||||
|
||||
if user_streams:
|
||||
users_woken_by_stream_counter.labels(stream_key).inc(len(user_streams))
|
||||
users_woken_by_stream_counter.labels(stream_key).inc(len(user_streams))
|
||||
|
||||
self.notify_replication()
|
||||
self.notify_replication()
|
||||
|
||||
# Notify appservices.
|
||||
try:
|
||||
self.appservice_handler.notify_interested_services_ephemeral(
|
||||
stream_key,
|
||||
new_token,
|
||||
users,
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Error notifying application services of ephemeral events")
|
||||
# Notify appservices.
|
||||
try:
|
||||
self.appservice_handler.notify_interested_services_ephemeral(
|
||||
stream_key,
|
||||
new_token,
|
||||
users,
|
||||
)
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"Error notifying application services of ephemeral events"
|
||||
)
|
||||
|
||||
def on_new_replication_data(self) -> None:
|
||||
"""Used to inform replication listeners that something has happened
|
||||
|
||||
@@ -205,12 +205,6 @@ class HttpPusher(Pusher):
|
||||
if self._is_processing:
|
||||
return
|
||||
|
||||
# Check if we are trying, but failing, to contact the pusher. If so, we
|
||||
# don't try and start processing immediately and instead wait for the
|
||||
# retry loop to try again later (which is controlled by the timer).
|
||||
if self.failing_since and self.timed_call and self.timed_call.active():
|
||||
return
|
||||
|
||||
run_as_background_process("httppush.process", self._process)
|
||||
|
||||
async def _process(self) -> None:
|
||||
|
||||
@@ -275,9 +275,7 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
|
||||
"""
|
||||
Register all the admin servlets.
|
||||
"""
|
||||
RoomRestServlet(hs).register(http_server)
|
||||
|
||||
# Admin servlets below may not work on workers.
|
||||
# Admin servlets aren't registered on workers.
|
||||
if hs.config.worker.worker_app is not None:
|
||||
return
|
||||
|
||||
@@ -285,6 +283,7 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
|
||||
BlockRoomRestServlet(hs).register(http_server)
|
||||
ListRoomRestServlet(hs).register(http_server)
|
||||
RoomStateRestServlet(hs).register(http_server)
|
||||
RoomRestServlet(hs).register(http_server)
|
||||
RoomRestV2Servlet(hs).register(http_server)
|
||||
RoomMembersRestServlet(hs).register(http_server)
|
||||
DeleteRoomStatusByDeleteIdRestServlet(hs).register(http_server)
|
||||
|
||||
@@ -143,11 +143,11 @@ class DeviceRestServlet(RestServlet):
|
||||
self.hs = hs
|
||||
self.auth = hs.get_auth()
|
||||
handler = hs.get_device_handler()
|
||||
assert isinstance(handler, DeviceHandler)
|
||||
self.device_handler = handler
|
||||
self.auth_handler = hs.get_auth_handler()
|
||||
self._msc3852_enabled = hs.config.experimental.msc3852_enabled
|
||||
self._msc3861_oauth_delegation_enabled = hs.config.experimental.msc3861.enabled
|
||||
self._is_main_process = hs.config.worker.worker_app is None
|
||||
|
||||
async def on_GET(
|
||||
self, request: SynapseRequest, device_id: str
|
||||
@@ -179,14 +179,6 @@ class DeviceRestServlet(RestServlet):
|
||||
async def on_DELETE(
|
||||
self, request: SynapseRequest, device_id: str
|
||||
) -> Tuple[int, JsonDict]:
|
||||
# Can only be run on main process, as changes to device lists must
|
||||
# happen on main.
|
||||
if not self._is_main_process:
|
||||
error_message = "DELETE on /devices/ must be routed to main process"
|
||||
logger.error(error_message)
|
||||
raise SynapseError(500, error_message)
|
||||
assert isinstance(self.device_handler, DeviceHandler)
|
||||
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
|
||||
try:
|
||||
@@ -231,14 +223,6 @@ class DeviceRestServlet(RestServlet):
|
||||
async def on_PUT(
|
||||
self, request: SynapseRequest, device_id: str
|
||||
) -> Tuple[int, JsonDict]:
|
||||
# Can only be run on main process, as changes to device lists must
|
||||
# happen on main.
|
||||
if not self._is_main_process:
|
||||
error_message = "PUT on /devices/ must be routed to main process"
|
||||
logger.error(error_message)
|
||||
raise SynapseError(500, error_message)
|
||||
assert isinstance(self.device_handler, DeviceHandler)
|
||||
|
||||
requester = await self.auth.get_user_by_req(request, allow_guest=True)
|
||||
|
||||
body = parse_and_validate_json_object_from_request(request, self.PutBody)
|
||||
@@ -601,9 +585,9 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
|
||||
):
|
||||
DeleteDevicesRestServlet(hs).register(http_server)
|
||||
DevicesRestServlet(hs).register(http_server)
|
||||
DeviceRestServlet(hs).register(http_server)
|
||||
|
||||
if hs.config.worker.worker_app is None:
|
||||
DeviceRestServlet(hs).register(http_server)
|
||||
if hs.config.experimental.msc2697_enabled:
|
||||
DehydratedDeviceServlet(hs).register(http_server)
|
||||
ClaimDehydratedDeviceServlet(hs).register(http_server)
|
||||
|
||||
@@ -24,7 +24,7 @@ from collections import defaultdict
|
||||
from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple, Union
|
||||
|
||||
from synapse.api.constants import AccountDataTypes, EduTypes, Membership, PresenceState
|
||||
from synapse.api.errors import Codes, StoreError, SynapseError
|
||||
from synapse.api.errors import Codes, LimitExceededError, StoreError, SynapseError
|
||||
from synapse.api.filtering import FilterCollection
|
||||
from synapse.api.presence import UserPresenceState
|
||||
from synapse.api.ratelimiting import Ratelimiter
|
||||
@@ -248,8 +248,9 @@ class SyncRestServlet(RestServlet):
|
||||
await self._server_notices_sender.on_user_syncing(user.to_string())
|
||||
|
||||
# ignore the presence update if the ratelimit is exceeded but do not pause the request
|
||||
allowed, _ = await self._presence_per_user_limiter.can_do_action(requester)
|
||||
if not allowed:
|
||||
try:
|
||||
await self._presence_per_user_limiter.ratelimit(requester, pause=0.0)
|
||||
except LimitExceededError:
|
||||
affect_presence = False
|
||||
logger.debug("User set_presence ratelimit exceeded; ignoring it.")
|
||||
else:
|
||||
|
||||
@@ -47,7 +47,7 @@ from synapse.storage.databases.main.events_worker import (
|
||||
)
|
||||
from synapse.storage.databases.main.state_deltas import StateDeltasStore
|
||||
from synapse.storage.databases.main.stream import StreamWorkerStore
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
|
||||
from synapse.storage.types import Cursor
|
||||
from synapse.types import JsonDict, RoomStreamToken, StateMap, StrCollection
|
||||
from synapse.types.handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES
|
||||
@@ -311,6 +311,12 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
|
||||
self._sliding_sync_membership_snapshots_fix_forgotten_column_bg_update,
|
||||
)
|
||||
|
||||
# Add a background update to add triggers which track event counts.
|
||||
self.db_pool.updates.register_background_update_handler(
|
||||
_BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
|
||||
self._event_stats_populate_counts_bg_update,
|
||||
)
|
||||
|
||||
# We want this to run on the main database at startup before we start processing
|
||||
# events.
|
||||
#
|
||||
@@ -2547,6 +2553,288 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
|
||||
|
||||
return num_rows
|
||||
|
||||
async def _event_stats_populate_counts_bg_update(
|
||||
self, progress: JsonDict, batch_size: int
|
||||
) -> int:
|
||||
"""
|
||||
Background update to populate the `event_stats` table with initial
|
||||
values, and register DB triggers to continue updating it.
|
||||
|
||||
We first register TRIGGERs on rows being added/removed from the `events` table,
|
||||
which will keep the event counts continuously updated. We also mark the stopping
|
||||
point for the main population step so we don't double count events.
|
||||
|
||||
Then we will iterate through the `events` table in batches and update event
|
||||
counts until we reach the stopping point.
|
||||
|
||||
This data is intended to be used by the phone-home stats to keep track
|
||||
of total event and message counts. A trigger is preferred to counting
|
||||
rows in the `events` table, as said table can grow quite large.
|
||||
|
||||
It is also preferable to adding an index on the `events` table, as even
|
||||
an index can grow large. And calculating total counts would require
|
||||
querying that entire index.
|
||||
"""
|
||||
# The last event `stream_ordering` we processed (starting place of this next
|
||||
# batch).
|
||||
last_event_stream_ordering = progress.get(
|
||||
"last_event_stream_ordering", -(1 << 31)
|
||||
)
|
||||
# The event `stream_ordering` we should stop at. This is used to avoid double
|
||||
# counting events that are already accounted for because of the triggers.
|
||||
stop_event_stream_ordering: Optional[int] = progress.get(
|
||||
"stop_event_stream_ordering", None
|
||||
)
|
||||
|
||||
def _add_triggers_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> Optional[int]:
|
||||
"""
|
||||
Adds the triggers to the `events` table to keep the `event_stats` counts
|
||||
up-to-date.
|
||||
|
||||
Also populates the `stop_event_stream_ordering` background update progress
|
||||
value. This marks the point at which we added the triggers, so we can avoid
|
||||
double counting events that are already accounted for in the population
|
||||
step.
|
||||
|
||||
Returns:
|
||||
The latest event `stream_ordering` in the `events` table when the triggers
|
||||
were added or `None` if the `events` table is empty.
|
||||
"""
|
||||
|
||||
# Each time an event is inserted into the `events` table, update the stats.
|
||||
#
|
||||
# We're using `AFTER` triggers as we want to count successful inserts/deletes and
|
||||
# not the ones that could potentially fail.
|
||||
if isinstance(txn.database_engine, Sqlite3Engine):
|
||||
txn.execute(
|
||||
"""
|
||||
CREATE TRIGGER IF NOT EXISTS event_stats_events_insert_trigger
|
||||
AFTER INSERT ON events
|
||||
BEGIN
|
||||
-- Always increment total_event_count
|
||||
UPDATE event_stats SET total_event_count = total_event_count + 1;
|
||||
|
||||
-- Increment unencrypted_message_count for m.room.message events
|
||||
UPDATE event_stats
|
||||
SET unencrypted_message_count = unencrypted_message_count + 1
|
||||
WHERE NEW.type = 'm.room.message' AND NEW.state_key IS NULL;
|
||||
|
||||
-- Increment e2ee_event_count for m.room.encrypted events
|
||||
UPDATE event_stats
|
||||
SET e2ee_event_count = e2ee_event_count + 1
|
||||
WHERE NEW.type = 'm.room.encrypted' AND NEW.state_key IS NULL;
|
||||
END;
|
||||
"""
|
||||
)
|
||||
|
||||
txn.execute(
|
||||
"""
|
||||
CREATE TRIGGER IF NOT EXISTS event_stats_events_delete_trigger
|
||||
AFTER DELETE ON events
|
||||
BEGIN
|
||||
-- Always decrement total_event_count
|
||||
UPDATE event_stats SET total_event_count = total_event_count - 1;
|
||||
|
||||
-- Decrement unencrypted_message_count for m.room.message events
|
||||
UPDATE event_stats
|
||||
SET unencrypted_message_count = unencrypted_message_count - 1
|
||||
WHERE OLD.type = 'm.room.message' AND OLD.state_key IS NULL;
|
||||
|
||||
-- Decrement e2ee_event_count for m.room.encrypted events
|
||||
UPDATE event_stats
|
||||
SET e2ee_event_count = e2ee_event_count - 1
|
||||
WHERE OLD.type = 'm.room.encrypted' AND OLD.state_key IS NULL;
|
||||
END;
|
||||
"""
|
||||
)
|
||||
elif isinstance(txn.database_engine, PostgresEngine):
|
||||
txn.execute(
|
||||
"""
|
||||
CREATE OR REPLACE FUNCTION event_stats_increment_counts() RETURNS trigger AS $BODY$
|
||||
BEGIN
|
||||
IF TG_OP = 'INSERT' THEN
|
||||
-- Always increment total_event_count
|
||||
UPDATE event_stats SET total_event_count = total_event_count + 1;
|
||||
|
||||
-- Increment unencrypted_message_count for m.room.message events
|
||||
IF NEW.type = 'm.room.message' AND NEW.state_key IS NULL THEN
|
||||
UPDATE event_stats SET unencrypted_message_count = unencrypted_message_count + 1;
|
||||
END IF;
|
||||
|
||||
-- Increment e2ee_event_count for m.room.encrypted events
|
||||
IF NEW.type = 'm.room.encrypted' AND NEW.state_key IS NULL THEN
|
||||
UPDATE event_stats SET e2ee_event_count = e2ee_event_count + 1;
|
||||
END IF;
|
||||
|
||||
-- We're not modifying the row being inserted/deleted, so we return it unchanged.
|
||||
RETURN NEW;
|
||||
|
||||
ELSIF TG_OP = 'DELETE' THEN
|
||||
-- Always decrement total_event_count
|
||||
UPDATE event_stats SET total_event_count = total_event_count - 1;
|
||||
|
||||
-- Decrement unencrypted_message_count for m.room.message events
|
||||
IF OLD.type = 'm.room.message' AND OLD.state_key IS NULL THEN
|
||||
UPDATE event_stats SET unencrypted_message_count = unencrypted_message_count - 1;
|
||||
END IF;
|
||||
|
||||
-- Decrement e2ee_event_count for m.room.encrypted events
|
||||
IF OLD.type = 'm.room.encrypted' AND OLD.state_key IS NULL THEN
|
||||
UPDATE event_stats SET e2ee_event_count = e2ee_event_count - 1;
|
||||
END IF;
|
||||
|
||||
-- "The usual idiom in DELETE triggers is to return OLD."
|
||||
-- (https://www.postgresql.org/docs/current/plpgsql-trigger.html)
|
||||
RETURN OLD;
|
||||
END IF;
|
||||
|
||||
RAISE EXCEPTION 'update_event_stats() was run with unexpected operation (%%). '
|
||||
'This indicates a trigger misconfiguration as this function should only'
|
||||
'run with INSERT/DELETE operations.', TG_OP;
|
||||
END;
|
||||
$BODY$ LANGUAGE plpgsql;
|
||||
"""
|
||||
)
|
||||
|
||||
# We could use `CREATE OR REPLACE TRIGGER` but that's only available in Postgres
|
||||
# 14 (https://www.postgresql.org/docs/14/sql-createtrigger.html)
|
||||
txn.execute(
|
||||
"""
|
||||
DO
|
||||
$$BEGIN
|
||||
CREATE TRIGGER event_stats_increment_counts_trigger
|
||||
AFTER INSERT OR DELETE ON events
|
||||
FOR EACH ROW
|
||||
EXECUTE PROCEDURE event_stats_increment_counts();
|
||||
EXCEPTION
|
||||
-- This acts as a "CREATE TRIGGER IF NOT EXISTS" for Postgres
|
||||
WHEN duplicate_object THEN
|
||||
NULL;
|
||||
END;$$;
|
||||
"""
|
||||
)
|
||||
else:
|
||||
raise NotImplementedError("Unknown database engine")
|
||||
|
||||
# Find the latest `stream_ordering` in the `events` table. We need to do
|
||||
# this in the same transaction as where we add the triggers so we don't miss
|
||||
# any events.
|
||||
txn.execute(
|
||||
"""
|
||||
SELECT stream_ordering
|
||||
FROM events
|
||||
ORDER BY stream_ordering DESC
|
||||
LIMIT 1
|
||||
"""
|
||||
)
|
||||
row = cast(Optional[Tuple[int]], txn.fetchone())
|
||||
|
||||
# Update the progress
|
||||
if row is not None:
|
||||
(max_stream_ordering,) = row
|
||||
self.db_pool.updates._background_update_progress_txn(
|
||||
txn,
|
||||
_BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
|
||||
{"stop_event_stream_ordering": max_stream_ordering},
|
||||
)
|
||||
return max_stream_ordering
|
||||
|
||||
return None
|
||||
|
||||
# First, add the triggers to keep the `event_stats` values up-to-date.
|
||||
#
|
||||
# If we don't have a `stop_event_stream_ordering` yet, we need to add the
|
||||
# triggers to the `events` table and set the stopping point so we don't
|
||||
# double count `events` later.
|
||||
if stop_event_stream_ordering is None:
|
||||
stop_event_stream_ordering = await self.db_pool.runInteraction(
|
||||
"_event_stats_populate_counts_bg_update_add_triggers",
|
||||
_add_triggers_txn,
|
||||
)
|
||||
|
||||
# If there is no `stop_event_stream_ordering`, then there are no events
|
||||
# in the `events` table and we can end the background update altogether.
|
||||
if stop_event_stream_ordering is None:
|
||||
await self.db_pool.updates._end_background_update(
|
||||
_BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE
|
||||
)
|
||||
return batch_size
|
||||
|
||||
def _populate_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> int:
|
||||
"""
|
||||
Updates the `event_stats` table from this batch of events.
|
||||
"""
|
||||
|
||||
# Increment the counts based on the events present in this batch.
|
||||
txn.execute(
|
||||
"""
|
||||
WITH event_batch AS (
|
||||
SELECT *
|
||||
FROM events
|
||||
WHERE stream_ordering > ? AND stream_ordering <= ?
|
||||
ORDER BY stream_ordering ASC
|
||||
LIMIT ?
|
||||
),
|
||||
batch_stats AS (
|
||||
SELECT
|
||||
MAX(stream_ordering) AS max_stream_ordering,
|
||||
COALESCE(COUNT(*), 0) AS total_event_count,
|
||||
COALESCE(SUM(CASE WHEN type = 'm.room.message' AND state_key IS NULL THEN 1 ELSE 0 END), 0) AS unencrypted_message_count,
|
||||
COALESCE(SUM(CASE WHEN type = 'm.room.encrypted' AND state_key IS NULL THEN 1 ELSE 0 END), 0) AS e2ee_event_count
|
||||
FROM event_batch
|
||||
|
||||
UNION ALL
|
||||
|
||||
SELECT null, 0, 0, 0
|
||||
WHERE NOT EXISTS (SELECT 1 FROM event_batch)
|
||||
LIMIT 1
|
||||
)
|
||||
UPDATE event_stats
|
||||
SET
|
||||
total_event_count = total_event_count + (SELECT total_event_count FROM batch_stats),
|
||||
unencrypted_message_count = unencrypted_message_count + (SELECT unencrypted_message_count FROM batch_stats),
|
||||
e2ee_event_count = e2ee_event_count + (SELECT e2ee_event_count FROM batch_stats)
|
||||
RETURNING
|
||||
(SELECT total_event_count FROM batch_stats) AS total_event_count,
|
||||
(SELECT max_stream_ordering FROM batch_stats) AS max_stream_ordering
|
||||
""",
|
||||
(last_event_stream_ordering, stop_event_stream_ordering, batch_size),
|
||||
)
|
||||
|
||||
# Get the results of the update
|
||||
(total_event_count, max_stream_ordering) = cast(
|
||||
Tuple[int, Optional[int]], txn.fetchone()
|
||||
)
|
||||
|
||||
# Update the progress
|
||||
self.db_pool.updates._background_update_progress_txn(
|
||||
txn,
|
||||
_BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
|
||||
{
|
||||
"last_event_stream_ordering": max_stream_ordering,
|
||||
"stop_event_stream_ordering": stop_event_stream_ordering,
|
||||
},
|
||||
)
|
||||
|
||||
return total_event_count
|
||||
|
||||
num_rows_processed = await self.db_pool.runInteraction(
|
||||
"_event_stats_populate_counts_bg_update",
|
||||
_populate_txn,
|
||||
)
|
||||
|
||||
# No more rows to process, so our background update is complete.
|
||||
if not num_rows_processed:
|
||||
await self.db_pool.updates._end_background_update(
|
||||
_BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE
|
||||
)
|
||||
|
||||
return batch_size
|
||||
|
||||
|
||||
def _resolve_stale_data_in_sliding_sync_tables(
|
||||
txn: LoggingTransaction,
|
||||
|
||||
@@ -126,6 +126,44 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
|
||||
|
||||
return await self.db_pool.runInteraction("count_e2ee_messages", _count_messages)
|
||||
|
||||
async def count_total_events(self) -> int:
|
||||
"""
|
||||
Returns the total number of events present on the server.
|
||||
"""
|
||||
|
||||
return await self.db_pool.simple_select_one_onecol(
|
||||
table="event_stats",
|
||||
keyvalues={},
|
||||
retcol="total_event_count",
|
||||
desc="count_total_events",
|
||||
)
|
||||
|
||||
async def count_total_messages(self) -> int:
|
||||
"""
|
||||
Returns the total number of `m.room.message` events present on the
|
||||
server.
|
||||
"""
|
||||
|
||||
return await self.db_pool.simple_select_one_onecol(
|
||||
table="event_stats",
|
||||
keyvalues={},
|
||||
retcol="unencrypted_message_count",
|
||||
desc="count_total_messages",
|
||||
)
|
||||
|
||||
async def count_total_e2ee_events(self) -> int:
|
||||
"""
|
||||
Returns the total number of `m.room.encrypted` events present on the
|
||||
server.
|
||||
"""
|
||||
|
||||
return await self.db_pool.simple_select_one_onecol(
|
||||
table="event_stats",
|
||||
keyvalues={},
|
||||
retcol="e2ee_event_count",
|
||||
desc="count_total_e2ee_events",
|
||||
)
|
||||
|
||||
async def count_daily_sent_e2ee_messages(self) -> int:
|
||||
def _count_messages(txn: LoggingTransaction) -> int:
|
||||
# This is good enough as if you have silly characters in your own
|
||||
|
||||
@@ -19,7 +19,7 @@
|
||||
#
|
||||
#
|
||||
|
||||
SCHEMA_VERSION = 91 # remember to update the list below when updating
|
||||
SCHEMA_VERSION = 92 # remember to update the list below when updating
|
||||
"""Represents the expectations made by the codebase about the database schema
|
||||
|
||||
This should be incremented whenever the codebase changes its requirements on the
|
||||
@@ -162,6 +162,12 @@ Changes in SCHEMA_VERSION = 89
|
||||
Changes in SCHEMA_VERSION = 90
|
||||
- Add a column `participant` to `room_memberships` table
|
||||
- Add background update to delete unreferenced state groups.
|
||||
|
||||
Changes in SCHEMA_VERSION = 91
|
||||
- TODO
|
||||
|
||||
Changes in SCHEMA_VERSION = 92
|
||||
- Add `event_stats` table to store global event statistics like total counts
|
||||
"""
|
||||
|
||||
|
||||
|
||||
synapse/storage/schema/main/delta/92/01_event_stats.sql (33 changed lines, Normal file)
@@ -0,0 +1,33 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2025 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.


-- Create the `event_stats` table to store global event statistics.
CREATE TABLE event_stats (
    total_event_count INTEGER NOT NULL DEFAULT 0,
    unencrypted_message_count INTEGER NOT NULL DEFAULT 0,
    e2ee_event_count INTEGER NOT NULL DEFAULT 0
);

-- Insert initial values into the table.
INSERT INTO event_stats (
    total_event_count,
    unencrypted_message_count,
    e2ee_event_count
) VALUES (0, 0, 0);

-- Add a background update to populate the `event_stats` table with the current counts
-- from the `events` table and add triggers to keep these counts up-to-date.
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
    (9201, 'event_stats_populate_counts_bg_update', '{}');
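The trigger DDL itself is installed by the background update rather than by this delta file. Purely as an illustration, here is a sketch of the kind of trigger that would keep `total_event_count` current, in SQLite syntax; the trigger name and the helper it sits in are assumptions for this example, not code from the diff.

```python
# Hypothetical helper: install a SQLite trigger that bumps the running total
# whenever a new row is inserted into `events`. The real background update
# installs engine-specific triggers for all three counters.
def _add_event_stats_trigger_txn(txn) -> None:
    txn.execute(
        """
        CREATE TRIGGER IF NOT EXISTS event_stats_events_insert_trigger
        AFTER INSERT ON events
        BEGIN
            UPDATE event_stats SET total_event_count = total_event_count + 1;
        END
        """
    )
```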
@@ -52,3 +52,5 @@ class _BackgroundUpdates:
    MARK_UNREFERENCED_STATE_GROUPS_FOR_DELETION_BG_UPDATE = (
        "mark_unreferenced_state_groups_for_deletion_bg_update"
    )

    EVENT_STATS_POPULATE_COUNTS_BG_UPDATE = "event_stats_populate_counts_bg_update"
@@ -220,7 +220,9 @@ class TestRatelimiter(unittest.HomeserverTestCase):

        self.assertIn("test_id_1", limiter.actions)

        self.reactor.advance(60)
        self.get_success_or_raise(
            limiter.can_do_action(None, key="test_id_2", _time_now_s=10)
        )

        self.assertNotIn("test_id_1", limiter.actions)
258
tests/metrics/test_phone_home_stats.py
Normal file
@@ -0,0 +1,258 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.

import logging
from unittest.mock import AsyncMock

from twisted.test.proto_helpers import MemoryReactor

from synapse.app.phone_stats_home import (
    PHONE_HOME_INTERVAL_SECONDS,
    start_phone_stats_home,
)
from synapse.rest import admin, login, register, room
from synapse.server import HomeServer
from synapse.types import JsonDict
from synapse.util import Clock

from tests import unittest
from tests.server import ThreadedMemoryReactorClock

TEST_REPORT_STATS_ENDPOINT = "https://fake.endpoint/stats"
TEST_SERVER_CONTEXT = "test-server-context"


class PhoneHomeStatsTestCase(unittest.HomeserverTestCase):
    servlets = [
        admin.register_servlets_for_client_rest_resource,
        room.register_servlets,
        register.register_servlets,
        login.register_servlets,
    ]

    def make_homeserver(
        self, reactor: ThreadedMemoryReactorClock, clock: Clock
    ) -> HomeServer:
        # Configure the homeserver to enable stats reporting.
        config = self.default_config()
        config["report_stats"] = True
        config["report_stats_endpoint"] = TEST_REPORT_STATS_ENDPOINT

        # Configure the server context so we can check it ends up being reported
        config["server_context"] = TEST_SERVER_CONTEXT

        # Allow guests to be registered
        config["allow_guest_access"] = True

        hs = self.setup_test_homeserver(config=config)

        # Replace the proxied http client with a mock, so we can inspect outbound requests to
        # the configured stats endpoint.
        self.put_json_mock = AsyncMock(return_value={})
        hs.get_proxied_http_client().put_json = self.put_json_mock  # type: ignore[method-assign]
        return hs

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        self.store = hs.get_datastores().main

        # Wait for the background updates to add the database triggers that keep the
        # `event_stats` table up-to-date.
        self.wait_for_background_updates()

        # Force stats reporting to occur
        start_phone_stats_home(hs=hs)

        super().prepare(reactor, clock, hs)

    def _get_latest_phone_home_stats(self) -> JsonDict:
        # Wait for `phone_stats_home` to be called again + a healthy margin (50s).
        self.reactor.advance(2 * PHONE_HOME_INTERVAL_SECONDS + 50)

        # Extract the reported stats from our http client mock
        mock_calls = self.put_json_mock.call_args_list
        report_stats_calls = []
        for call in mock_calls:
            if call.args[0] == TEST_REPORT_STATS_ENDPOINT:
                report_stats_calls.append(call)

        self.assertGreaterEqual(
            len(report_stats_calls),
            1,
            "Expected at least one call to the report_stats endpoint",
        )

        # Extract the phone home stats from the call
        phone_home_stats = report_stats_calls[0].args[1]

        return phone_home_stats

    def _perform_user_actions(self) -> None:
        """
        Perform some actions on the homeserver that would bump the phone home
        stats.
        """
        # Create some users
        user_1_mxid = self.register_user(
            username="test_user_1",
            password="test",
        )
        user_2_mxid = self.register_user(
            username="test_user_2",
            password="test",
        )
        # Note: `self.register_user` does not support guest registration, and updating the
        # Admin API it calls to add a new parameter would cause the `mac` parameter to fail
        # in a backwards-incompatible manner. Hence, we make a manual request here.
        _guest_user_mxid = self.make_request(
            method="POST",
            path="/_matrix/client/v3/register?kind=guest",
            content={
                "username": "guest_user",
                "password": "test",
            },
            shorthand=False,
        )

        # Log in to each user
        user_1_token = self.login(username=user_1_mxid, password="test")
        user_2_token = self.login(username=user_2_mxid, password="test")

        # Create a room between the two users
        room_1_id = self.helper.create_room_as(
            is_public=False,
            tok=user_1_token,
        )

        # Mark this room as end-to-end encrypted
        self.helper.send_state(
            room_id=room_1_id,
            event_type="m.room.encryption",
            body={
                "algorithm": "m.megolm.v1.aes-sha2",
                "rotation_period_ms": 604800000,
                "rotation_period_msgs": 100,
            },
            state_key="",
            tok=user_1_token,
        )

        # User 1 invites user 2
        self.helper.invite(
            room=room_1_id,
            src=user_1_mxid,
            targ=user_2_mxid,
            tok=user_1_token,
        )

        # User 2 joins
        self.helper.join(
            room=room_1_id,
            user=user_2_mxid,
            tok=user_2_token,
        )

        # User 1 sends 10 unencrypted messages
        for _ in range(10):
            self.helper.send(
                room_id=room_1_id,
                body="Zoinks Scoob! A message!",
                tok=user_1_token,
            )

        # User 2 sends 5 encrypted "messages"
        for _ in range(5):
            self.helper.send_event(
                room_id=room_1_id,
                type="m.room.encrypted",
                content={
                    "algorithm": "m.olm.v1.curve25519-aes-sha2",
                    "sender_key": "some_key",
                    "ciphertext": {
                        "some_key": {
                            "type": 0,
                            "body": "encrypted_payload",
                        },
                    },
                },
                tok=user_2_token,
            )

    def test_phone_home_stats(self) -> None:
        """
        Test that the phone home stats contain the stats we expect based on
        the scenario carried out in `prepare`
        """
        # Do things to bump the stats
        self._perform_user_actions()

        # Wait for the stats to be reported
        phone_home_stats = self._get_latest_phone_home_stats()

        self.assertEqual(
            phone_home_stats["homeserver"], self.hs.config.server.server_name
        )

        self.assertTrue(isinstance(phone_home_stats["memory_rss"], int))
        self.assertTrue(isinstance(phone_home_stats["cpu_average"], int))

        self.assertEqual(phone_home_stats["server_context"], TEST_SERVER_CONTEXT)

        self.assertTrue(isinstance(phone_home_stats["timestamp"], int))
        self.assertTrue(isinstance(phone_home_stats["uptime_seconds"], int))
        self.assertTrue(isinstance(phone_home_stats["python_version"], str))

        # We expect only our test users to exist on the homeserver
        self.assertEqual(phone_home_stats["total_users"], 3)
        self.assertEqual(phone_home_stats["total_nonbridged_users"], 3)
        self.assertEqual(phone_home_stats["daily_user_type_native"], 2)
        self.assertEqual(phone_home_stats["daily_user_type_guest"], 1)
        self.assertEqual(phone_home_stats["daily_user_type_bridged"], 0)
        self.assertEqual(phone_home_stats["total_room_count"], 1)
        self.assertEqual(phone_home_stats["total_event_count"], 24)
        self.assertEqual(phone_home_stats["total_message_count"], 10)
        self.assertEqual(phone_home_stats["total_e2ee_event_count"], 5)
        self.assertEqual(phone_home_stats["daily_active_users"], 2)
        self.assertEqual(phone_home_stats["monthly_active_users"], 2)
        self.assertEqual(phone_home_stats["daily_active_rooms"], 1)
        self.assertEqual(phone_home_stats["daily_active_e2ee_rooms"], 1)
        self.assertEqual(phone_home_stats["daily_messages"], 10)
        self.assertEqual(phone_home_stats["daily_e2ee_messages"], 5)
        self.assertEqual(phone_home_stats["daily_sent_messages"], 10)
        self.assertEqual(phone_home_stats["daily_sent_e2ee_messages"], 5)

        # Our users have not been around for >30 days, hence these are all 0.
        self.assertEqual(phone_home_stats["r30v2_users_all"], 0)
        self.assertEqual(phone_home_stats["r30v2_users_android"], 0)
        self.assertEqual(phone_home_stats["r30v2_users_ios"], 0)
        self.assertEqual(phone_home_stats["r30v2_users_electron"], 0)
        self.assertEqual(phone_home_stats["r30v2_users_web"], 0)
        self.assertEqual(
            phone_home_stats["cache_factor"], self.hs.config.caches.global_factor
        )
        self.assertEqual(
            phone_home_stats["event_cache_size"],
            self.hs.config.caches.event_cache_size,
        )
        self.assertEqual(
            phone_home_stats["database_engine"],
            self.hs.config.database.databases[0].config["name"],
        )
        self.assertEqual(
            phone_home_stats["database_server_version"],
            self.hs.get_datastores().main.database_engine.server_version,
        )

        synapse_logger = logging.getLogger("synapse")
        log_level = synapse_logger.getEffectiveLevel()
        self.assertEqual(phone_home_stats["log_level"], logging.getLevelName(log_level))
@@ -1167,81 +1167,3 @@ class HTTPPusherTests(HomeserverTestCase):
        self.assertEqual(
            self.push_attempts[0][2]["notification"]["counts"]["unread"], 1
        )

    def test_push_backoff(self) -> None:
        """
        The HTTP pusher will back off correctly if it fails to contact the pusher.
        """

        # Register the user who gets notified
        user_id = self.register_user("user", "pass")
        access_token = self.login("user", "pass")

        # Register the user who sends the message
        other_user_id = self.register_user("otheruser", "pass")
        other_access_token = self.login("otheruser", "pass")

        # Register the pusher
        user_tuple = self.get_success(
            self.hs.get_datastores().main.get_user_by_access_token(access_token)
        )
        assert user_tuple is not None
        device_id = user_tuple.device_id

        self.get_success(
            self.hs.get_pusherpool().add_or_update_pusher(
                user_id=user_id,
                device_id=device_id,
                kind="http",
                app_id="m.http",
                app_display_name="HTTP Push Notifications",
                device_display_name="pushy push",
                pushkey="a@example.com",
                lang=None,
                data={"url": "http://example.com/_matrix/push/v1/notify"},
            )
        )

        # Create a room with the other user
        room = self.helper.create_room_as(user_id, tok=access_token)
        self.helper.join(room=room, user=other_user_id, tok=other_access_token)

        # The other user sends some messages
        self.helper.send(room, body="Message 1", tok=other_access_token)

        # One push was attempted to be sent
        self.assertEqual(len(self.push_attempts), 1)
        self.assertEqual(
            self.push_attempts[0][1], "http://example.com/_matrix/push/v1/notify"
        )
        self.assertEqual(
            self.push_attempts[0][2]["notification"]["content"]["body"], "Message 1"
        )
        self.push_attempts[0][0].callback({})
        self.pump()

        # Send another message, this time it fails
        self.helper.send(room, body="Message 2", tok=other_access_token)
        self.assertEqual(len(self.push_attempts), 2)
        self.push_attempts[1][0].errback(Exception("couldn't connect"))
        self.pump()

        # Sending yet another message doesn't trigger a push immediately
        self.helper.send(room, body="Message 3", tok=other_access_token)
        self.pump()
        self.assertEqual(len(self.push_attempts), 2)

        # .. but waiting for a bit will cause more pushes
        self.reactor.advance(10)
        self.assertEqual(len(self.push_attempts), 3)
        self.assertEqual(
            self.push_attempts[2][2]["notification"]["content"]["body"], "Message 2"
        )
        self.push_attempts[2][0].callback({})
        self.pump()

        self.assertEqual(len(self.push_attempts), 4)
        self.assertEqual(
            self.push_attempts[3][2]["notification"]["content"]["body"], "Message 3"
        )
        self.push_attempts[3][0].callback({})
237
tests/storage/test_event_stats.py
Normal file
@@ -0,0 +1,237 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.


from twisted.test.proto_helpers import MemoryReactor

from synapse.rest import admin, login, register, room
from synapse.server import HomeServer
from synapse.types.storage import _BackgroundUpdates
from synapse.util import Clock

from tests import unittest


class EventStatsTestCase(unittest.HomeserverTestCase):
    """
    Tests for the `event_stats` table
    """

    servlets = [
        admin.register_servlets_for_client_rest_resource,
        room.register_servlets,
        register.register_servlets,
        login.register_servlets,
    ]

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        self.store = hs.get_datastores().main

        # Wait for the background updates to add the database triggers that keep the
        # `event_stats` table up-to-date.
        #
        # This also prevents background updates running during the tests and messing
        # with the results.
        self.wait_for_background_updates()

        super().prepare(reactor, clock, hs)

    def _perform_user_actions(self) -> None:
        """
        Perform some actions on the homeserver that would bump the event counts.
        """
        # Create some users
        user_1_mxid = self.register_user(
            username="test_user_1",
            password="test",
        )
        user_2_mxid = self.register_user(
            username="test_user_2",
            password="test",
        )
        # Note: `self.register_user` does not support guest registration, and updating the
        # Admin API it calls to add a new parameter would cause the `mac` parameter to fail
        # in a backwards-incompatible manner. Hence, we make a manual request here.
        _guest_user_mxid = self.make_request(
            method="POST",
            path="/_matrix/client/v3/register?kind=guest",
            content={
                "username": "guest_user",
                "password": "test",
            },
            shorthand=False,
        )

        # Log in to each user
        user_1_token = self.login(username=user_1_mxid, password="test")
        user_2_token = self.login(username=user_2_mxid, password="test")

        # Create a room between the two users
        room_1_id = self.helper.create_room_as(
            is_public=False,
            tok=user_1_token,
        )

        # Mark this room as end-to-end encrypted
        self.helper.send_state(
            room_id=room_1_id,
            event_type="m.room.encryption",
            body={
                "algorithm": "m.megolm.v1.aes-sha2",
                "rotation_period_ms": 604800000,
                "rotation_period_msgs": 100,
            },
            state_key="",
            tok=user_1_token,
        )

        # User 1 invites user 2
        self.helper.invite(
            room=room_1_id,
            src=user_1_mxid,
            targ=user_2_mxid,
            tok=user_1_token,
        )

        # User 2 joins
        self.helper.join(
            room=room_1_id,
            user=user_2_mxid,
            tok=user_2_token,
        )

        # User 1 sends 10 unencrypted messages
        for _ in range(10):
            self.helper.send(
                room_id=room_1_id,
                body="Zoinks Scoob! A message!",
                tok=user_1_token,
            )

        # User 2 sends 5 encrypted "messages"
        for _ in range(5):
            self.helper.send_event(
                room_id=room_1_id,
                type="m.room.encrypted",
                content={
                    "algorithm": "m.olm.v1.curve25519-aes-sha2",
                    "sender_key": "some_key",
                    "ciphertext": {
                        "some_key": {
                            "type": 0,
                            "body": "encrypted_payload",
                        },
                    },
                },
                tok=user_2_token,
            )

    def test_background_update_with_events(self) -> None:
        """
        Test that the background update to populate the `event_stats` table works
        correctly when there are events in the database.
        """
        # Do things to bump the stats
        self._perform_user_actions()

        # Keep in mind: These are already populated as the background update has already
        # run once when Synapse started and added the database triggers which are
        # incrementing things as new events come in.
        self.assertEqual(self.get_success(self.store.count_total_events()), 24)
        self.assertEqual(self.get_success(self.store.count_total_messages()), 10)
        self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 5)

        # Run the background update again
        self.get_success(
            self.store.db_pool.simple_insert(
                "background_updates",
                {
                    "update_name": _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
                    "progress_json": "{}",
                },
            )
        )
        self.store.db_pool.updates._all_done = False
        self.wait_for_background_updates()

        # We expect these values to double as the background update is being run *again*
        # and will double-count the `events`.
        self.assertEqual(self.get_success(self.store.count_total_events()), 48)
        self.assertEqual(self.get_success(self.store.count_total_messages()), 20)
        self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 10)

    def test_background_update_without_events(self) -> None:
        """
        Test that the background update to populate the `event_stats` table works
        correctly without events in the database.
        """
        # Keep in mind: These are already populated as the background update has already
        # run once when Synapse started and added the database triggers which are
        # incrementing things as new events come in.
        #
        # In this case, no events have been sent, so we expect the counts to be 0.
        self.assertEqual(self.get_success(self.store.count_total_events()), 0)
        self.assertEqual(self.get_success(self.store.count_total_messages()), 0)
        self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 0)

        # Run the background update again
        self.get_success(
            self.store.db_pool.simple_insert(
                "background_updates",
                {
                    "update_name": _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
                    "progress_json": "{}",
                },
            )
        )
        self.store.db_pool.updates._all_done = False
        self.wait_for_background_updates()

        self.assertEqual(self.get_success(self.store.count_total_events()), 0)
        self.assertEqual(self.get_success(self.store.count_total_messages()), 0)
        self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 0)

    def test_background_update_resume_progress(self) -> None:
        """
        Test that the background update to populate the `event_stats` table works
        correctly when resuming from `progress_json`.
        """
        # Do things to bump the stats
        self._perform_user_actions()

        # Keep in mind: These are already populated as the background update has already
        # run once when Synapse started and added the database triggers which are
        # incrementing things as new events come in.
        self.assertEqual(self.get_success(self.store.count_total_events()), 24)
        self.assertEqual(self.get_success(self.store.count_total_messages()), 10)
        self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 5)

        # Run the background update again
        self.get_success(
            self.store.db_pool.simple_insert(
                "background_updates",
                {
                    "update_name": _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
                    "progress_json": '{ "last_event_stream_ordering": 14, "stop_event_stream_ordering": 21 }',
                },
            )
        )
        self.store.db_pool.updates._all_done = False
        self.wait_for_background_updates()

        # We expect these values to increase as the background update is being run
        # *again* and will double-count some of the `events` over the range specified
        # by the `progress_json`.
        self.assertEqual(self.get_success(self.store.count_total_events()), 24 + 7)
        self.assertEqual(self.get_success(self.store.count_total_messages()), 16)
        self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 6)
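A brief sanity check of the expected deltas in `test_background_update_resume_progress`, derived purely from the assertions above: the re-run re-counts 7 events inside the configured stream-ordering window, of which 6 are `m.room.message` events and 1 is `m.room.encrypted`.

```python
# Derived from the test's assertions; the 6/1 split follows from the deltas.
assert 24 + 7 == 31  # total_event_count
assert 10 + 6 == 16  # unencrypted_message_count
assert 5 + 1 == 6    # e2ee_event_count
```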