
Compare commits


4 Commits

Author         SHA1          Message                                       Date
Devon Hudson   d67e9c5367    Update changelog                              2025-04-16 07:19:27 -06:00
Devon Hudson   2b5c6239de    Merge branch 'develop' into release-v1.129    2025-04-16 07:17:07 -06:00
Devon Hudson   9b8eebbe4e    Changelog tweaks                              2025-04-15 11:12:04 -06:00
Devon Hudson   5ced4efe1d    1.129.0rc1                                    2025-04-15 10:48:32 -06:00
55 changed files with 1051 additions and 285 deletions

View File

@@ -30,7 +30,7 @@ jobs:
run: docker buildx inspect
- name: Install Cosign
uses: sigstore/cosign-installer@3454372f43399081ed03b604cb2d021dabca52bb # v3.8.2
uses: sigstore/cosign-installer@d7d6bc7722e3daa8354c50bcb52f4837da5e9b6a # v3.8.1
- name: Checkout repository
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2

View File

@@ -44,6 +44,6 @@ jobs:
- run: cargo fmt
continue-on-error: true
- uses: stefanzweifel/git-auto-commit-action@b863ae1933cb653a53c021fe36dbb774e1fb9403 # v5.2.0
- uses: stefanzweifel/git-auto-commit-action@e348103e9026cc0eee72ae06630dbe30c8bf7a79 # v5.1.0
with:
commit_message: "Attempt to fix linting"

View File

@@ -203,7 +203,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Download all workflow run artifacts
uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093 # v4.3.0
uses: actions/download-artifact@95815c38cf2ff2164869cbab79da8d1f422bc89e # v4.2.1
- name: Build a tarball for the debs
# We need to merge all the debs uploads into one folder, then compress
# that.
@@ -213,7 +213,7 @@ jobs:
tar -cvJf debs.tar.xz debs
- name: Attach to release
# Pinned to work around https://github.com/softprops/action-gh-release/issues/445
uses: softprops/action-gh-release@c95fe1489396fe8a9eb87c0abf8aa5b2ef267fda # v0.1.15
uses: softprops/action-gh-release@de2c0eb89ae2a093876385947365aca7b0e5f844 # v0.1.15
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:

View File

@@ -11,7 +11,7 @@ jobs:
if: >
contains(github.event.issue.labels.*.name, 'X-Needs-Info')
steps:
- uses: actions/add-to-project@5b1a254a3546aef88e0a7724a77a623fa2e47c36 # main (v1.0.2 + 10 commits)
- uses: actions/add-to-project@280af8ae1f83a494cfad2cb10f02f6d13529caa9 # main (v1.0.2 + 10 commits)
id: add_project
with:
project-url: "https://github.com/orgs/matrix-org/projects/67"

View File

@@ -1,3 +1,30 @@
# Synapse 1.129.0rc1 (2025-04-15)
### Features
- Add `passthrough_authorization_parameters` in OIDC configuration to allow passing parameters to the authorization grant URL. ([\#18232](https://github.com/element-hq/synapse/issues/18232))
- Add `total_event_count`, `total_message_count`, and `total_e2ee_event_count` fields to the homeserver usage statistics. ([\#18260](https://github.com/element-hq/synapse/issues/18260))
### Bugfixes
- Fix `force_tracing_for_users` config when using delegated auth. ([\#18334](https://github.com/element-hq/synapse/issues/18334))
- Fix the token introspection cache logging access tokens when MAS integration is in use. ([\#18335](https://github.com/element-hq/synapse/issues/18335))
- Stop caching introspection failures when delegating auth to MAS. ([\#18339](https://github.com/element-hq/synapse/issues/18339))
- Fix `ExternalIDReuse` exception after migrating to MAS on workers with a high traffic. ([\#18342](https://github.com/element-hq/synapse/issues/18342))
- Fix minor performance regression caused by tracking of room participation. Regressed in v1.128.0. ([\#18345](https://github.com/element-hq/synapse/issues/18345))
### Updates to the Docker image
- Optimize the build of the complement-synapse image. ([\#18294](https://github.com/element-hq/synapse/issues/18294))
### Internal Changes
- Disable statement timeout during room purge. ([\#18133](https://github.com/element-hq/synapse/issues/18133))
- Add cache to storage functions used to auth requests when using delegated auth. ([\#18337](https://github.com/element-hq/synapse/issues/18337))
# Synapse 1.128.0 (2025-04-08)
No significant changes since 1.128.0rc1.

Cargo.lock generated
View File

@@ -13,9 +13,9 @@ dependencies = [
[[package]]
name = "anyhow"
version = "1.0.98"
version = "1.0.97"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487"
checksum = "dcfed56ad506cb2c684a14971b8861fdc3baaaae314b9e5f9bb532cbe3ba7a4f"
[[package]]
name = "arc-swap"
@@ -316,9 +316,9 @@ dependencies = [
[[package]]
name = "pyo3-log"
version = "0.12.3"
version = "0.12.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7079e412e909af5d6be7c04a7f29f6a2837a080410e1c529c9dee2c367383db4"
checksum = "4b78e4983ba15bc62833a0e0941d965bc03690163f1127864f1408db25063466"
dependencies = [
"arc-swap",
"log",

View File

@@ -1 +0,0 @@
Disable statement timeout during room purge.

View File

@@ -1 +0,0 @@
Add `passthrough_authorization_parameters` in OIDC configuration to allow to pass parameters to the authorization grant URL.

View File

@@ -1 +0,0 @@
Add documentation for configuring [Pocket ID](https://github.com/pocket-id/pocket-id) as an OIDC provider.

View File

@@ -1 +0,0 @@
In configure_workers_and_start.py, use the same absolute path of Python in the interpreter shebang, and invoke child Python processes with `sys.executable`.

View File

@@ -1 +0,0 @@
Optimize the build of the workers image.

View File

@@ -1 +0,0 @@
In start_for_complement.sh, replace some external program calls with shell builtins.

View File

@@ -1 +0,0 @@
Optimize the build of the complement-synapse image.

View File

@@ -1 +0,0 @@
When generating container scripts from templates, don't add a leading newline so that their shebangs may be handled correctly.

View File

@@ -1 +0,0 @@
Fix typo in docs about the `push` config option. Contributed by @HarHarLinks.

View File

@@ -1 +0,0 @@
Fix `force_tracing_for_users` config when using delegated auth.

View File

@@ -1 +0,0 @@
Fix the token introspection cache logging access tokens when MAS integration is in use.

View File

@@ -1 +0,0 @@
Add cache to storage functions used to auth requests when using delegated auth.

View File

@@ -1 +0,0 @@
Stop caching introspection failures when delegating auth to MAS.

View File

@@ -1 +0,0 @@
Fix `ExternalIDReuse` exception after migrating to MAS on workers with high traffic.

View File

@@ -1 +0,0 @@
Fix minor performance regression caused by tracking of room participation. Regressed in v1.128.0.

View File

@@ -1 +0,0 @@
Add support for handling `GET /devices/` on workers.

View File

@@ -1 +0,0 @@
Allow `/rooms/` admin API to be run on workers.

View File

@@ -1 +0,0 @@
Fix longstanding bug where Synapse would immediately retry a failing push endpoint when a new event is received, ignoring any backoff timers.

View File

@@ -1 +0,0 @@
Minor performance improvements to the notifier.

View File

@@ -1 +0,0 @@
Slight performance increase when using the ratelimiter.

View File

@@ -1 +0,0 @@
Allow client & media admin apis to coexist.

debian/changelog vendored
View File

@@ -1,3 +1,9 @@
matrix-synapse-py3 (1.129.0~rc1) stable; urgency=medium
* New Synapse release 1.129.0rc1.
-- Synapse Packaging team <packages@matrix.org> Tue, 15 Apr 2025 10:47:43 -0600
matrix-synapse-py3 (1.128.0) stable; urgency=medium
* New Synapse release 1.128.0.

View File

@@ -3,37 +3,18 @@
ARG SYNAPSE_VERSION=latest
ARG FROM=matrixdotorg/synapse:$SYNAPSE_VERSION
ARG DEBIAN_VERSION=bookworm
ARG PYTHON_VERSION=3.12
# first of all, we create a base image with dependencies which we can copy into the
# first of all, we create a base image with an nginx which we can copy into the
# target image. For repeated rebuilds, this is much faster than apt installing
# each time.
FROM ghcr.io/astral-sh/uv:python${PYTHON_VERSION}-${DEBIAN_VERSION} AS deps_base
# Tell apt to keep downloaded package files, as we're using cache mounts.
RUN rm -f /etc/apt/apt.conf.d/docker-clean; echo 'Binary::apt::APT::Keep-Downloaded-Packages "true";' > /etc/apt/apt.conf.d/keep-cache
FROM docker.io/library/debian:${DEBIAN_VERSION}-slim AS deps_base
RUN \
--mount=type=cache,target=/var/cache/apt,sharing=locked \
--mount=type=cache,target=/var/lib/apt,sharing=locked \
apt-get update -qq && \
DEBIAN_FRONTEND=noninteractive apt-get install -yqq --no-install-recommends \
nginx-light
RUN \
# remove default page
rm /etc/nginx/sites-enabled/default && \
# have nginx log to stderr/out
ln -sf /dev/stdout /var/log/nginx/access.log && \
ln -sf /dev/stderr /var/log/nginx/error.log
# --link-mode=copy silences a warning as uv isn't able to do hardlinks between its cache
# (mounted as --mount=type=cache) and the target directory.
RUN --mount=type=cache,target=/root/.cache/uv \
uv pip install --link-mode=copy --prefix="/uv/usr/local" supervisor~=4.2
RUN mkdir -p /uv/etc/supervisor/conf.d
redis-server nginx-light
# Similarly, a base to copy the redis server from.
#
@@ -46,16 +27,31 @@ FROM docker.io/library/redis:7-${DEBIAN_VERSION} AS redis_base
# now build the final image, based on the the regular Synapse docker image
FROM $FROM
# Copy over dependencies
# Install supervisord with uv pip instead of apt, to avoid installing a second
# copy of python.
# --link-mode=copy silences a warning as uv isn't able to do hardlinks between its cache
# (mounted as --mount=type=cache) and the target directory.
RUN \
--mount=type=bind,from=ghcr.io/astral-sh/uv:0.6.8,source=/uv,target=/uv \
--mount=type=cache,target=/root/.cache/uv \
/uv pip install --link-mode=copy --prefix="/usr/local" supervisor~=4.2
RUN mkdir -p /etc/supervisor/conf.d
# Copy over redis and nginx
COPY --from=redis_base /usr/local/bin/redis-server /usr/local/bin
COPY --from=deps_base /uv /
COPY --from=deps_base /usr/sbin/nginx /usr/sbin
COPY --from=deps_base /usr/share/nginx /usr/share/nginx
COPY --from=deps_base /usr/lib/nginx /usr/lib/nginx
COPY --from=deps_base /etc/nginx /etc/nginx
COPY --from=deps_base /var/log/nginx /var/log/nginx
# chown to allow non-root user to write to http-*-temp-path dirs
COPY --from=deps_base --chown=www-data:root /var/lib/nginx /var/lib/nginx
RUN rm /etc/nginx/sites-enabled/default
RUN mkdir /var/log/nginx /var/lib/nginx
RUN chown www-data /var/lib/nginx
# have nginx log to stderr/out
RUN ln -sf /dev/stdout /var/log/nginx/access.log
RUN ln -sf /dev/stderr /var/log/nginx/error.log
# Copy Synapse worker, nginx and supervisord configuration template files
COPY ./docker/conf-workers/* /conf/
@@ -74,4 +70,4 @@ FROM $FROM
# Replace the healthcheck with one which checks *all* the workers. The script
# is generated by configure_workers_and_start.py.
HEALTHCHECK --start-period=5s --interval=15s --timeout=5s \
CMD ["/healthcheck.sh"]
CMD /bin/sh /healthcheck.sh

View File

@@ -58,4 +58,4 @@ ENTRYPOINT ["/start_for_complement.sh"]
# Update the healthcheck to have a shorter check interval
HEALTHCHECK --start-period=5s --interval=1s --timeout=1s \
CMD ["/healthcheck.sh"]
CMD /bin/sh /healthcheck.sh

View File

@@ -9,7 +9,7 @@ echo " Args: $*"
echo " Env: SYNAPSE_COMPLEMENT_DATABASE=$SYNAPSE_COMPLEMENT_DATABASE SYNAPSE_COMPLEMENT_USE_WORKERS=$SYNAPSE_COMPLEMENT_USE_WORKERS SYNAPSE_COMPLEMENT_USE_ASYNCIO_REACTOR=$SYNAPSE_COMPLEMENT_USE_ASYNCIO_REACTOR"
function log {
d=$(printf '%(%Y-%m-%d %H:%M:%S)T,%.3s\n' ${EPOCHREALTIME/./ })
d=$(date +"%Y-%m-%d %H:%M:%S,%3N")
echo "$d $*"
}
@@ -103,11 +103,12 @@ fi
# Note that both the key and certificate are in PEM format (not DER).
# First generate a configuration file to set up a Subject Alternative Name.
echo "\
cat > /conf/server.tls.conf <<EOF
.include /etc/ssl/openssl.cnf
[SAN]
subjectAltName=DNS:${SERVER_NAME}" > /conf/server.tls.conf
subjectAltName=DNS:${SERVER_NAME}
EOF
# Generate an RSA key
openssl genrsa -out /conf/server.tls.key 2048
@@ -122,8 +123,8 @@ openssl x509 -req -in /conf/server.tls.csr \
-out /conf/server.tls.crt -extfile /conf/server.tls.conf -extensions SAN
# Assert that we have a Subject Alternative Name in the certificate.
# (the test will exit with 1 here if there isn't a SAN in the certificate.)
[[ $(openssl x509 -in /conf/server.tls.crt -noout -text) == *DNS:* ]]
# (grep will exit with 1 here if there isn't a SAN in the certificate.)
openssl x509 -in /conf/server.tls.crt -noout -text | grep DNS:
export SYNAPSE_TLS_CERT=/conf/server.tls.crt
export SYNAPSE_TLS_KEY=/conf/server.tls.key
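For reference, the same SAN assertion could be expressed in Python with the `cryptography` package instead of `openssl ... | grep DNS:`. This is only an illustrative alternative under that assumption, not what `start_for_complement.sh` actually runs:

```python
# Illustrative sketch only: check that a PEM certificate carries a DNS SAN.
from cryptography import x509

with open("/conf/server.tls.crt", "rb") as f:
    cert = x509.load_pem_x509_certificate(f.read())

# Raises ExtensionNotFound if the certificate has no SAN extension at all.
san = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
dns_names = san.value.get_values_for_type(x509.DNSName)
assert dns_names, "certificate has no DNS Subject Alternative Name"
```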

View File

@@ -1,4 +1,4 @@
#!/usr/local/bin/python
#!/usr/bin/env python
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
@@ -376,11 +376,9 @@ def convert(src: str, dst: str, **template_vars: object) -> None:
#
# We use append mode in case the files have already been written to by something else
# (for instance, as part of the instructions in a dockerfile).
exists = os.path.isfile(dst)
with open(dst, "a") as outfile:
# In case the existing file doesn't end with a newline
if exists:
outfile.write("\n")
outfile.write("\n")
outfile.write(rendered)
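The append-mode change above goes hand in hand with no longer writing a leading newline (see the changelog entry about container script shebangs): the kernel only honours `#!` when it is the very first two bytes of the file. A minimal sketch, independent of the Synapse templates themselves:

```python
# Minimal sketch: a leading newline pushes the shebang to line 2, where it is ignored.
rendered = "#!/bin/sh\necho hello\n"

with open("/tmp/good.sh", "w") as f:
    f.write(rendered)          # "#!" is the first two bytes: honoured as a shebang

with open("/tmp/bad.sh", "w") as f:
    f.write("\n" + rendered)   # shebang now starts on line 2: treated as a plain file
```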
@@ -606,7 +604,7 @@ def generate_base_homeserver_config() -> None:
# start.py already does this for us, so just call that.
# note that this script is copied in in the official, monolith dockerfile
os.environ["SYNAPSE_HTTP_PORT"] = str(MAIN_PROCESS_HTTP_LISTENER_PORT)
subprocess.run([sys.executable, "/start.py", "migrate_config"], check=True)
subprocess.run(["/usr/local/bin/python", "/start.py", "migrate_config"], check=True)
def parse_worker_types(
@@ -1000,7 +998,6 @@ def generate_worker_files(
"/healthcheck.sh",
healthcheck_urls=healthcheck_urls,
)
os.chmod("/healthcheck.sh", 0o755)
# Ensure the logging directory exists
log_dir = data_dir + "/logs"

View File

@@ -23,7 +23,6 @@ such as [Github][github-idp].
[auth0]: https://auth0.com/
[authentik]: https://goauthentik.io/
[lemonldap]: https://lemonldap-ng.org/
[pocket-id]: https://pocket-id.org/
[okta]: https://www.okta.com/
[dex-idp]: https://github.com/dexidp/dex
[keycloak-idp]: https://www.keycloak.org/docs/latest/server_admin/#sso-protocols
@@ -625,32 +624,6 @@ oidc_providers:
Note that the fields `client_id` and `client_secret` are taken from the CURL response above.
### Pocket ID
[Pocket ID][pocket-id] is a simple OIDC provider that allows users to authenticate with their passkeys.
1. Go to `OIDC Clients`
2. Click on `Add OIDC Client`
3. Add a name, for example `Synapse`
4. Add `"https://auth.example.org/_synapse/client/oidc/callback` to `Callback URLs` # Replace `auth.example.org` with your domain
5. Click on `Save`
6. Note down your `Client ID` and `Client secret`, these will be used later
Synapse config:
```yaml
oidc_providers:
- idp_id: pocket_id
idp_name: Pocket ID
issuer: "https://auth.example.org/" # Replace with your domain
client_id: "your-client-id" # Replace with the "Client ID" you noted down before
client_secret: "your-client-secret" # Replace with the "Client secret" you noted down before
scopes: ["openid", "profile"]
user_mapping_provider:
config:
localpart_template: "{{ user.preferred_username }}"
display_name_template: "{{ user.name }}"
```
### Shibboleth with OIDC Plugin
[Shibboleth](https://www.shibboleth.net/) is an open Standard IdP solution widely used by Universities.

View File

@@ -30,10 +30,13 @@ The following statistics are sent to the configured reporting endpoint:
| `python_version` | string | The Python version number in use (e.g "3.7.1"). Taken from `sys.version_info`. |
| `total_users` | int | The number of registered users on the homeserver. |
| `total_nonbridged_users` | int | The number of users, excluding those created by an Application Service. |
| `daily_user_type_native` | int | The number of native users created in the last 24 hours. |
| `daily_user_type_native` | int | The number of native, non-guest users created in the last 24 hours. |
| `daily_user_type_guest` | int | The number of guest users created in the last 24 hours. |
| `daily_user_type_bridged` | int | The number of users created by Application Services in the last 24 hours. |
| `total_room_count` | int | The total number of rooms present on the homeserver. |
| `total_event_count` | int | The total number of events present on the homeserver. |
| `total_message_count` | int | The total number of non-state events with type `m.room.message` present on the homeserver. |
| `total_e2ee_event_count` | int | The total number of non-state events with type `m.room.encrypted` present on the homeserver. This can be used as a slight over-estimate for the number of encrypted messages. |
| `daily_active_users` | int | The number of unique users[^1] that have used the homeserver in the last 24 hours. |
| `monthly_active_users` | int | The number of unique users[^1] that have used the homeserver in the last 30 days. |
| `daily_active_rooms` | int | The number of rooms that have had a (state) event with the type `m.room.message` sent in them in the last 24 hours. |
@@ -50,8 +53,8 @@ The following statistics are sent to the configured reporting endpoint:
| `cache_factor` | int | The configured [`global factor`](../../configuration/config_documentation.md#caching) value for caching. |
| `event_cache_size` | int | The configured [`event_cache_size`](../../configuration/config_documentation.md#caching) value for caching. |
| `database_engine` | string | The database engine that is in use. Either "psycopg2" meaning PostgreSQL is in use, or "sqlite3" for SQLite3. |
| `database_server_version` | string | The version of the database server. Examples being "10.10" for PostgreSQL server version 10.0, and "3.38.5" for SQLite 3.38.5 installed on the system. |
| `log_level` | string | The log level in use. Examples are "INFO", "WARNING", "ERROR", "DEBUG", etc. |
| `database_server_version` | string | The version of the database server. Examples being "10.10" for PostgreSQL server version 10.0, and "3.38.5" for SQLite 3.38.5 installed on the system. |
| `log_level` | string | The log level in use. Examples are "INFO", "WARNING", "ERROR", "DEBUG", etc. |
[^1]: Native matrix users and guests are always counted. If the
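To illustrate the shape of a report containing the new counters, here is a hypothetical subset of a phone-home payload (values are invented for illustration; field names follow the table above):

```python
# Hypothetical example payload (illustrative values only).
example_report = {
    "homeserver": "example.com",
    "python_version": "3.12.3",
    "total_users": 42,
    "total_room_count": 7,
    "total_event_count": 12345,
    "total_message_count": 4321,     # non-state m.room.message events
    "total_e2ee_event_count": 2100,  # non-state m.room.encrypted events
    "database_engine": "psycopg2",
    "log_level": "INFO",
}
```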

View File

@@ -4018,7 +4018,7 @@ This option has a number of sub-options. They are as follows:
* `include_content`: Clients requesting push notifications can either have the body of
the message sent in the notification poke along with other details
like the sender, or just the event ID and room ID (`event_id_only`).
If clients choose to have the body sent, this option controls whether the
If clients choose the to have the body sent, this option controls whether the
notification request includes the content of the event (other details
like the sender are still included). If `event_id_only` is enabled, it
has no effect.
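As a rough illustration of the difference described above (field values are hypothetical), the notification sent to the push gateway looks like one of the following, depending on whether event content is included:

```python
# Hypothetical sketches of the notification body sent to a push gateway.
with_content = {
    "event_id": "$143273582443PhrSn",        # hypothetical
    "room_id": "!room:example.org",          # hypothetical
    "sender": "@alice:example.org",
    "type": "m.room.message",
    "content": {"msgtype": "m.text", "body": "hello"},
}

event_id_only = {
    "event_id": "$143273582443PhrSn",
    "room_id": "!room:example.org",
}
```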

View File

@@ -249,7 +249,6 @@ information.
^/_matrix/client/(api/v1|r0|v3|unstable)/directory/room/.*$
^/_matrix/client/(r0|v3|unstable)/capabilities$
^/_matrix/client/(r0|v3|unstable)/notifications$
^/_synapse/admin/v1/rooms/
# Encryption requests
^/_matrix/client/(r0|v3|unstable)/keys/query$
@@ -281,7 +280,6 @@ Additionally, the following REST endpoints can be handled for GET requests:
^/_matrix/client/(api/v1|r0|v3|unstable)/pushrules/
^/_matrix/client/unstable/org.matrix.msc4140/delayed_events
^/_matrix/client/(api/v1|r0|v3|unstable)/devices/
# Account data requests
^/_matrix/client/(r0|v3|unstable)/.*/tags

poetry.lock generated
View File

@@ -2053,19 +2053,18 @@ tests = ["hypothesis (>=3.27.0)", "pytest (>=3.2.1,!=3.3.0)"]
[[package]]
name = "pyopenssl"
version = "25.0.0"
version = "24.3.0"
description = "Python wrapper module around the OpenSSL library"
optional = false
python-versions = ">=3.7"
groups = ["main"]
files = [
{file = "pyOpenSSL-25.0.0-py3-none-any.whl", hash = "sha256:424c247065e46e76a37411b9ab1782541c23bb658bf003772c3405fbaa128e90"},
{file = "pyopenssl-25.0.0.tar.gz", hash = "sha256:cd2cef799efa3936bb08e8ccb9433a575722b9dd986023f1cabc4ae64e9dac16"},
{file = "pyOpenSSL-24.3.0-py3-none-any.whl", hash = "sha256:e474f5a473cd7f92221cc04976e48f4d11502804657a08a989fb3be5514c904a"},
{file = "pyopenssl-24.3.0.tar.gz", hash = "sha256:49f7a019577d834746bc55c5fce6ecbcec0f2b4ec5ce1cf43a9a173b8138bb36"},
]
[package.dependencies]
cryptography = ">=41.0.5,<45"
typing-extensions = {version = ">=4.9", markers = "python_version < \"3.13\" and python_version >= \"3.8\""}
[package.extras]
docs = ["sphinx (!=5.2.0,!=5.2.0.post0,!=7.2.5)", "sphinx_rtd_theme"]
@@ -2957,14 +2956,14 @@ files = [
[[package]]
name = "types-jsonschema"
version = "4.23.0.20241208"
version = "4.23.0.20240813"
description = "Typing stubs for jsonschema"
optional = false
python-versions = ">=3.8"
groups = ["dev"]
files = [
{file = "types_jsonschema-4.23.0.20241208-py3-none-any.whl", hash = "sha256:87934bd9231c99d8eff94cacfc06ba668f7973577a9bd9e1f9de957c5737313e"},
{file = "types_jsonschema-4.23.0.20241208.tar.gz", hash = "sha256:e8b15ad01f290ecf6aea53f93fbdf7d4730e4600313e89e8a7f95622f7e87b7c"},
{file = "types-jsonschema-4.23.0.20240813.tar.gz", hash = "sha256:c93f48206f209a5bc4608d295ac39f172fb98b9e24159ce577dbd25ddb79a1c0"},
{file = "types_jsonschema-4.23.0.20240813-py3-none-any.whl", hash = "sha256:be283e23f0b87547316c2ee6b0fd36d95ea30e921db06478029e10b5b6aa6ac3"},
]
[package.dependencies]
@@ -3008,14 +3007,14 @@ files = [
[[package]]
name = "types-psycopg2"
version = "2.9.21.20250318"
version = "2.9.21.20250121"
description = "Typing stubs for psycopg2"
optional = false
python-versions = ">=3.9"
groups = ["dev"]
files = [
{file = "types_psycopg2-2.9.21.20250318-py3-none-any.whl", hash = "sha256:7296d111ad950bbd2fc979a1ab0572acae69047f922280e77db657c00d2c79c0"},
{file = "types_psycopg2-2.9.21.20250318.tar.gz", hash = "sha256:eb6eac5bfb16adfd5f16b818918b9e26a40ede147e0f2bbffdf53a6ef7025a87"},
{file = "types_psycopg2-2.9.21.20250121-py3-none-any.whl", hash = "sha256:b890dc6f5a08b6433f0ff73a4ec9a834deedad3e914f2a4a6fd43df021f745f1"},
{file = "types_psycopg2-2.9.21.20250121.tar.gz", hash = "sha256:2b0e2cd0f3747af1ae25a7027898716d80209604770ef3cbf350fe055b9c349b"},
]
[[package]]

View File

@@ -97,7 +97,7 @@ module-name = "synapse.synapse_rust"
[tool.poetry]
name = "matrix-synapse"
version = "1.128.0"
version = "1.129.0rc1"
description = "Homeserver for the Matrix decentralised comms protocol"
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
license = "AGPL-3.0-or-later"

View File

@@ -20,7 +20,8 @@
#
#
from typing import Dict, Hashable, Optional, Tuple
from collections import OrderedDict
from typing import Hashable, Optional, Tuple
from synapse.api.errors import LimitExceededError
from synapse.config.ratelimiting import RatelimitSettings
@@ -79,14 +80,12 @@ class Ratelimiter:
self.store = store
self._limiter_name = cfg.key
# A dictionary representing the token buckets tracked by this rate
# An ordered dictionary representing the token buckets tracked by this rate
# limiter. Each entry maps a key of arbitrary type to a tuple representing:
# * The number of tokens currently in the bucket,
# * The time point when the bucket was last completely empty, and
# * The rate_hz (leak rate) of this particular bucket.
self.actions: Dict[Hashable, Tuple[float, float, float]] = {}
self.clock.looping_call(self._prune_message_counts, 60 * 1000)
self.actions: OrderedDict[Hashable, Tuple[float, float, float]] = OrderedDict()
def _get_key(
self, requester: Optional[Requester], key: Optional[Hashable]
@@ -170,6 +169,9 @@ class Ratelimiter:
rate_hz = rate_hz if rate_hz is not None else self.rate_hz
burst_count = burst_count if burst_count is not None else self.burst_count
# Remove any expired entries
self._prune_message_counts(time_now_s)
# Check if there is an existing count entry for this key
action_count, time_start, _ = self._get_action_counts(key, time_now_s)
@@ -244,12 +246,13 @@ class Ratelimiter:
action_count, time_start, rate_hz = self._get_action_counts(key, time_now_s)
self.actions[key] = (action_count + n_actions, time_start, rate_hz)
def _prune_message_counts(self) -> None:
def _prune_message_counts(self, time_now_s: float) -> None:
"""Remove message count entries that have not exceeded their defined
rate_hz limit
"""
time_now_s = self.clock.time()
Args:
time_now_s: The current time
"""
# We create a copy of the key list here as the dictionary is modified during
# the loop
for key in list(self.actions.keys()):
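The change above moves pruning from a periodic `looping_call` into the hot path and switches the bucket store to an `OrderedDict`. A self-contained sketch of the same leaky-bucket pruning idea (not the Synapse class; semantics simplified):

```python
import time
from collections import OrderedDict
from typing import Hashable, Tuple

# Simplified leaky-bucket limiter: each key maps to
# (action_count, time_of_last_update, rate_hz). Pruning happens inline on
# every check instead of via a periodic timer.
class TinyRatelimiter:
    def __init__(self, rate_hz: float, burst_count: int) -> None:
        self.rate_hz = rate_hz
        self.burst_count = burst_count
        self.actions: "OrderedDict[Hashable, Tuple[float, float, float]]" = OrderedDict()

    def can_do_action(self, key: Hashable, now: float) -> bool:
        self._prune(now)
        count, last, rate = self.actions.get(key, (0.0, now, self.rate_hz))
        count = max(0.0, count - (now - last) * rate)  # tokens leak out over time
        if count >= self.burst_count:
            return False
        self.actions[key] = (count + 1.0, now, rate)
        return True

    def _prune(self, now: float) -> None:
        # Drop buckets whose tokens have fully leaked away.
        for key in list(self.actions.keys()):
            count, last, rate = self.actions[key]
            if count - (now - last) * rate <= 0:
                del self.actions[key]


limiter = TinyRatelimiter(rate_hz=0.1, burst_count=3)
now = time.time()
print([limiter.can_do_action("user_a", now) for _ in range(4)])  # [True, True, True, False]
```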

View File

@@ -21,7 +21,7 @@
#
import logging
import sys
from typing import Dict, List, cast
from typing import Dict, List
from twisted.web.resource import Resource
@@ -51,8 +51,8 @@ from synapse.http.server import JsonResource, OptionsResource
from synapse.logging.context import LoggingContext
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
from synapse.rest import ClientRestResource, admin
from synapse.rest.admin import AdminRestResource, register_servlets_for_media_repo
from synapse.rest import ClientRestResource
from synapse.rest.admin import register_servlets_for_media_repo
from synapse.rest.health import HealthResource
from synapse.rest.key.v2 import KeyResource
from synapse.rest.synapse.client import build_synapse_client_resource_tree
@@ -190,11 +190,7 @@ class GenericWorkerServer(HomeServer):
resources.update(build_synapse_client_resource_tree(self))
resources["/.well-known"] = well_known_resource(self)
admin_res = resources.get("/_synapse/admin")
if admin_res is not None:
admin.register_servlets(self, cast(JsonResource, admin_res))
else:
resources["/_synapse/admin"] = AdminRestResource(self)
elif name == "federation":
resources[FEDERATION_PREFIX] = TransportLayerServer(self)
elif name == "media":
@@ -203,21 +199,15 @@ class GenericWorkerServer(HomeServer):
# We need to serve the admin servlets for media on the
# worker.
admin_res = resources.get("/_synapse/admin")
if admin_res is not None:
register_servlets_for_media_repo(
self, cast(JsonResource, admin_res)
)
else:
admin_resource = JsonResource(self, canonical_json=False)
register_servlets_for_media_repo(self, admin_resource)
resources["/_synapse/admin"] = admin_resource
admin_resource = JsonResource(self, canonical_json=False)
register_servlets_for_media_repo(self, admin_resource)
resources.update(
{
MEDIA_R0_PREFIX: media_repo,
MEDIA_V3_PREFIX: media_repo,
LEGACY_MEDIA_PREFIX: media_repo,
"/_synapse/admin": admin_resource,
}
)

View File

@@ -34,6 +34,22 @@ if TYPE_CHECKING:
logger = logging.getLogger("synapse.app.homeserver")
ONE_MINUTE_SECONDS = 60
ONE_HOUR_SECONDS = 60 * ONE_MINUTE_SECONDS
MILLISECONDS_PER_SECOND = 1000
INITIAL_DELAY_BEFORE_FIRST_PHONE_HOME_SECONDS = 5 * ONE_MINUTE_SECONDS
"""
We wait 5 minutes to send the first set of stats as the server can be quite busy the
first few minutes
"""
PHONE_HOME_INTERVAL_SECONDS = 3 * ONE_HOUR_SECONDS
"""
Phone home stats are sent every 3 hours
"""
# Contains the list of processes we will be monitoring
# currently either 0 or 1
_stats_process: List[Tuple[int, "resource.struct_rusage"]] = []
@@ -121,6 +137,9 @@ async def phone_stats_home(
room_count = await store.get_room_count()
stats["total_room_count"] = room_count
stats["total_event_count"] = await store.count_total_events()
stats["total_message_count"] = await store.count_total_messages()
stats["total_e2ee_event_count"] = await store.count_total_e2ee_events()
stats["daily_active_users"] = common_metrics.daily_active_users
stats["monthly_active_users"] = await store.count_monthly_users()
@@ -185,12 +204,14 @@ def start_phone_stats_home(hs: "HomeServer") -> None:
# If you increase the loop period, the accuracy of user_daily_visits
# table will decrease
clock.looping_call(
hs.get_datastores().main.generate_user_daily_visits, 5 * 60 * 1000
hs.get_datastores().main.generate_user_daily_visits,
5 * ONE_MINUTE_SECONDS * MILLISECONDS_PER_SECOND,
)
# monthly active user limiting functionality
clock.looping_call(
hs.get_datastores().main.reap_monthly_active_users, 1000 * 60 * 60
hs.get_datastores().main.reap_monthly_active_users,
ONE_HOUR_SECONDS * MILLISECONDS_PER_SECOND,
)
hs.get_datastores().main.reap_monthly_active_users()
@@ -216,12 +237,20 @@ def start_phone_stats_home(hs: "HomeServer") -> None:
if hs.config.server.limit_usage_by_mau or hs.config.server.mau_stats_only:
generate_monthly_active_users()
clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000)
clock.looping_call(
generate_monthly_active_users,
5 * ONE_MINUTE_SECONDS * MILLISECONDS_PER_SECOND,
)
# End of monthly active user settings
if hs.config.metrics.report_stats:
logger.info("Scheduling stats reporting for 3 hour intervals")
clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000, hs, stats)
clock.looping_call(
phone_stats_home,
PHONE_HOME_INTERVAL_SECONDS * MILLISECONDS_PER_SECOND,
hs,
stats,
)
# We need to defer this init for the cases that we daemonize
# otherwise the process ID we get is that of the non-daemon process
@@ -229,4 +258,6 @@ def start_phone_stats_home(hs: "HomeServer") -> None:
# We wait 5 minutes to send the first set of stats as the server can
# be quite busy the first few minutes
clock.call_later(5 * 60, phone_stats_home, hs, stats)
clock.call_later(
INITIAL_DELAY_BEFORE_FIRST_PHONE_HOME_SECONDS, phone_stats_home, hs, stats
)
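The named constants replace the previous millisecond literals; a quick check (plain Python, not Synapse code) that the two forms are equivalent:

```python
# Sanity check that the new named constants equal the old millisecond literals.
ONE_MINUTE_SECONDS = 60
ONE_HOUR_SECONDS = 60 * ONE_MINUTE_SECONDS
MILLISECONDS_PER_SECOND = 1000
PHONE_HOME_INTERVAL_SECONDS = 3 * ONE_HOUR_SECONDS
INITIAL_DELAY_BEFORE_FIRST_PHONE_HOME_SECONDS = 5 * ONE_MINUTE_SECONDS

assert 5 * ONE_MINUTE_SECONDS * MILLISECONDS_PER_SECOND == 5 * 60 * 1000            # 300,000 ms
assert ONE_HOUR_SECONDS * MILLISECONDS_PER_SECOND == 1000 * 60 * 60                 # 3,600,000 ms
assert PHONE_HOME_INTERVAL_SECONDS * MILLISECONDS_PER_SECOND == 3 * 60 * 60 * 1000  # 10,800,000 ms
assert INITIAL_DELAY_BEFORE_FIRST_PHONE_HOME_SECONDS == 5 * 60                      # 300 s
```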

View File

@@ -66,6 +66,7 @@ from synapse.types import (
from synapse.util.async_helpers import (
timeout_deferred,
)
from synapse.util.metrics import Measure
from synapse.util.stringutils import shortstr
from synapse.visibility import filter_events_for_client
@@ -519,22 +520,20 @@ class Notifier:
users = users or []
rooms = rooms or []
user_streams: Set[_NotifierUserStream] = set()
with Measure(self.clock, "on_new_event"):
user_streams: Set[_NotifierUserStream] = set()
log_kv(
{
"waking_up_explicit_users": len(users),
"waking_up_explicit_rooms": len(rooms),
"users": shortstr(users),
"rooms": shortstr(rooms),
"stream": stream_key,
"stream_id": new_token,
}
)
log_kv(
{
"waking_up_explicit_users": len(users),
"waking_up_explicit_rooms": len(rooms),
"users": shortstr(users),
"rooms": shortstr(rooms),
"stream": stream_key,
"stream_id": new_token,
}
)
# Only calculate which user streams to wake up if there are, in fact,
# any user streams registered.
if self.user_to_user_stream or self.room_to_user_streams:
for user in users:
user_stream = self.user_to_user_stream.get(str(user))
if user_stream is not None:
@@ -566,25 +565,25 @@ class Notifier:
# We resolve all these deferreds in one go so that we only need to
# call `PreserveLoggingContext` once, as it has a bunch of overhead
# (to calculate performance stats)
if listeners:
with PreserveLoggingContext():
for listener in listeners:
listener.callback(current_token)
with PreserveLoggingContext():
for listener in listeners:
listener.callback(current_token)
if user_streams:
users_woken_by_stream_counter.labels(stream_key).inc(len(user_streams))
users_woken_by_stream_counter.labels(stream_key).inc(len(user_streams))
self.notify_replication()
self.notify_replication()
# Notify appservices.
try:
self.appservice_handler.notify_interested_services_ephemeral(
stream_key,
new_token,
users,
)
except Exception:
logger.exception("Error notifying application services of ephemeral events")
# Notify appservices.
try:
self.appservice_handler.notify_interested_services_ephemeral(
stream_key,
new_token,
users,
)
except Exception:
logger.exception(
"Error notifying application services of ephemeral events"
)
def on_new_replication_data(self) -> None:
"""Used to inform replication listeners that something has happened

View File

@@ -205,12 +205,6 @@ class HttpPusher(Pusher):
if self._is_processing:
return
# Check if we are trying, but failing, to contact the pusher. If so, we
# don't try and start processing immediately and instead wait for the
# retry loop to try again later (which is controlled by the timer).
if self.failing_since and self.timed_call and self.timed_call.active():
return
run_as_background_process("httppush.process", self._process)
async def _process(self) -> None:

View File

@@ -275,9 +275,7 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
"""
Register all the admin servlets.
"""
RoomRestServlet(hs).register(http_server)
# Admin servlets below may not work on workers.
# Admin servlets aren't registered on workers.
if hs.config.worker.worker_app is not None:
return
@@ -285,6 +283,7 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
BlockRoomRestServlet(hs).register(http_server)
ListRoomRestServlet(hs).register(http_server)
RoomStateRestServlet(hs).register(http_server)
RoomRestServlet(hs).register(http_server)
RoomRestV2Servlet(hs).register(http_server)
RoomMembersRestServlet(hs).register(http_server)
DeleteRoomStatusByDeleteIdRestServlet(hs).register(http_server)

View File

@@ -143,11 +143,11 @@ class DeviceRestServlet(RestServlet):
self.hs = hs
self.auth = hs.get_auth()
handler = hs.get_device_handler()
assert isinstance(handler, DeviceHandler)
self.device_handler = handler
self.auth_handler = hs.get_auth_handler()
self._msc3852_enabled = hs.config.experimental.msc3852_enabled
self._msc3861_oauth_delegation_enabled = hs.config.experimental.msc3861.enabled
self._is_main_process = hs.config.worker.worker_app is None
async def on_GET(
self, request: SynapseRequest, device_id: str
@@ -179,14 +179,6 @@ class DeviceRestServlet(RestServlet):
async def on_DELETE(
self, request: SynapseRequest, device_id: str
) -> Tuple[int, JsonDict]:
# Can only be run on main process, as changes to device lists must
# happen on main.
if not self._is_main_process:
error_message = "DELETE on /devices/ must be routed to main process"
logger.error(error_message)
raise SynapseError(500, error_message)
assert isinstance(self.device_handler, DeviceHandler)
requester = await self.auth.get_user_by_req(request)
try:
@@ -231,14 +223,6 @@ class DeviceRestServlet(RestServlet):
async def on_PUT(
self, request: SynapseRequest, device_id: str
) -> Tuple[int, JsonDict]:
# Can only be run on main process, as changes to device lists must
# happen on main.
if not self._is_main_process:
error_message = "PUT on /devices/ must be routed to main process"
logger.error(error_message)
raise SynapseError(500, error_message)
assert isinstance(self.device_handler, DeviceHandler)
requester = await self.auth.get_user_by_req(request, allow_guest=True)
body = parse_and_validate_json_object_from_request(request, self.PutBody)
@@ -601,9 +585,9 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
):
DeleteDevicesRestServlet(hs).register(http_server)
DevicesRestServlet(hs).register(http_server)
DeviceRestServlet(hs).register(http_server)
if hs.config.worker.worker_app is None:
DeviceRestServlet(hs).register(http_server)
if hs.config.experimental.msc2697_enabled:
DehydratedDeviceServlet(hs).register(http_server)
ClaimDehydratedDeviceServlet(hs).register(http_server)

View File

@@ -24,7 +24,7 @@ from collections import defaultdict
from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple, Union
from synapse.api.constants import AccountDataTypes, EduTypes, Membership, PresenceState
from synapse.api.errors import Codes, StoreError, SynapseError
from synapse.api.errors import Codes, LimitExceededError, StoreError, SynapseError
from synapse.api.filtering import FilterCollection
from synapse.api.presence import UserPresenceState
from synapse.api.ratelimiting import Ratelimiter
@@ -248,8 +248,9 @@ class SyncRestServlet(RestServlet):
await self._server_notices_sender.on_user_syncing(user.to_string())
# ignore the presence update if the ratelimit is exceeded but do not pause the request
allowed, _ = await self._presence_per_user_limiter.can_do_action(requester)
if not allowed:
try:
await self._presence_per_user_limiter.ratelimit(requester, pause=0.0)
except LimitExceededError:
affect_presence = False
logger.debug("User set_presence ratelimit exceeded; ignoring it.")
else:

View File

@@ -47,7 +47,7 @@ from synapse.storage.databases.main.events_worker import (
)
from synapse.storage.databases.main.state_deltas import StateDeltasStore
from synapse.storage.databases.main.stream import StreamWorkerStore
from synapse.storage.engines import PostgresEngine
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.storage.types import Cursor
from synapse.types import JsonDict, RoomStreamToken, StateMap, StrCollection
from synapse.types.handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES
@@ -311,6 +311,12 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
self._sliding_sync_membership_snapshots_fix_forgotten_column_bg_update,
)
# Add a background update to add triggers which track event counts.
self.db_pool.updates.register_background_update_handler(
_BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
self._event_stats_populate_counts_bg_update,
)
# We want this to run on the main database at startup before we start processing
# events.
#
@@ -2547,6 +2553,288 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
return num_rows
async def _event_stats_populate_counts_bg_update(
self, progress: JsonDict, batch_size: int
) -> int:
"""
Background update to populate the `event_stats` table with initial
values, and register DB triggers to continue updating it.
We first register TRIGGERs on rows being added/removed from the `events` table,
which will keep the event counts continuously updated. We also mark the stopping
point for the main population step so we don't double count events.
Then we will iterate through the `events` table in batches and update event
counts until we reach the stopping point.
This data is intended to be used by the phone-home stats to keep track
of total event and message counts. A trigger is preferred to counting
rows in the `events` table, as said table can grow quite large.
It is also preferable to adding an index on the `events` table, as even
an index can grow large. And calculating total counts would require
querying that entire index.
"""
# The last event `stream_ordering` we processed (starting place of this next
# batch).
last_event_stream_ordering = progress.get(
"last_event_stream_ordering", -(1 << 31)
)
# The event `stream_ordering` we should stop at. This is used to avoid double
# counting events that are already accounted for because of the triggers.
stop_event_stream_ordering: Optional[int] = progress.get(
"stop_event_stream_ordering", None
)
def _add_triggers_txn(
txn: LoggingTransaction,
) -> Optional[int]:
"""
Adds the triggers to the `events` table to keep the `event_stats` counts
up-to-date.
Also populates the `stop_event_stream_ordering` background update progress
value. This marks the point at which we added the triggers, so we can avoid
double counting events that are already accounted for in the population
step.
Returns:
The latest event `stream_ordering` in the `events` table when the triggers
were added or `None` if the `events` table is empty.
"""
# Each time an event is inserted into the `events` table, update the stats.
#
# We're using `AFTER` triggers as we want to count successful inserts/deletes and
# not the ones that could potentially fail.
if isinstance(txn.database_engine, Sqlite3Engine):
txn.execute(
"""
CREATE TRIGGER IF NOT EXISTS event_stats_events_insert_trigger
AFTER INSERT ON events
BEGIN
-- Always increment total_event_count
UPDATE event_stats SET total_event_count = total_event_count + 1;
-- Increment unencrypted_message_count for m.room.message events
UPDATE event_stats
SET unencrypted_message_count = unencrypted_message_count + 1
WHERE NEW.type = 'm.room.message' AND NEW.state_key IS NULL;
-- Increment e2ee_event_count for m.room.encrypted events
UPDATE event_stats
SET e2ee_event_count = e2ee_event_count + 1
WHERE NEW.type = 'm.room.encrypted' AND NEW.state_key IS NULL;
END;
"""
)
txn.execute(
"""
CREATE TRIGGER IF NOT EXISTS event_stats_events_delete_trigger
AFTER DELETE ON events
BEGIN
-- Always decrement total_event_count
UPDATE event_stats SET total_event_count = total_event_count - 1;
-- Decrement unencrypted_message_count for m.room.message events
UPDATE event_stats
SET unencrypted_message_count = unencrypted_message_count - 1
WHERE OLD.type = 'm.room.message' AND OLD.state_key IS NULL;
-- Decrement e2ee_event_count for m.room.encrypted events
UPDATE event_stats
SET e2ee_event_count = e2ee_event_count - 1
WHERE OLD.type = 'm.room.encrypted' AND OLD.state_key IS NULL;
END;
"""
)
elif isinstance(txn.database_engine, PostgresEngine):
txn.execute(
"""
CREATE OR REPLACE FUNCTION event_stats_increment_counts() RETURNS trigger AS $BODY$
BEGIN
IF TG_OP = 'INSERT' THEN
-- Always increment total_event_count
UPDATE event_stats SET total_event_count = total_event_count + 1;
-- Increment unencrypted_message_count for m.room.message events
IF NEW.type = 'm.room.message' AND NEW.state_key IS NULL THEN
UPDATE event_stats SET unencrypted_message_count = unencrypted_message_count + 1;
END IF;
-- Increment e2ee_event_count for m.room.encrypted events
IF NEW.type = 'm.room.encrypted' AND NEW.state_key IS NULL THEN
UPDATE event_stats SET e2ee_event_count = e2ee_event_count + 1;
END IF;
-- We're not modifying the row being inserted/deleted, so we return it unchanged.
RETURN NEW;
ELSIF TG_OP = 'DELETE' THEN
-- Always decrement total_event_count
UPDATE event_stats SET total_event_count = total_event_count - 1;
-- Decrement unencrypted_message_count for m.room.message events
IF OLD.type = 'm.room.message' AND OLD.state_key IS NULL THEN
UPDATE event_stats SET unencrypted_message_count = unencrypted_message_count - 1;
END IF;
-- Decrement e2ee_event_count for m.room.encrypted events
IF OLD.type = 'm.room.encrypted' AND OLD.state_key IS NULL THEN
UPDATE event_stats SET e2ee_event_count = e2ee_event_count - 1;
END IF;
-- "The usual idiom in DELETE triggers is to return OLD."
-- (https://www.postgresql.org/docs/current/plpgsql-trigger.html)
RETURN OLD;
END IF;
RAISE EXCEPTION 'update_event_stats() was run with unexpected operation (%%). '
'This indicates a trigger misconfiguration as this function should only'
'run with INSERT/DELETE operations.', TG_OP;
END;
$BODY$ LANGUAGE plpgsql;
"""
)
# We could use `CREATE OR REPLACE TRIGGER` but that's only available in Postgres
# 14 (https://www.postgresql.org/docs/14/sql-createtrigger.html)
txn.execute(
"""
DO
$$BEGIN
CREATE TRIGGER event_stats_increment_counts_trigger
AFTER INSERT OR DELETE ON events
FOR EACH ROW
EXECUTE PROCEDURE event_stats_increment_counts();
EXCEPTION
-- This acts as a "CREATE TRIGGER IF NOT EXISTS" for Postgres
WHEN duplicate_object THEN
NULL;
END;$$;
"""
)
else:
raise NotImplementedError("Unknown database engine")
# Find the latest `stream_ordering` in the `events` table. We need to do
# this in the same transaction as where we add the triggers so we don't miss
# any events.
txn.execute(
"""
SELECT stream_ordering
FROM events
ORDER BY stream_ordering DESC
LIMIT 1
"""
)
row = cast(Optional[Tuple[int]], txn.fetchone())
# Update the progress
if row is not None:
(max_stream_ordering,) = row
self.db_pool.updates._background_update_progress_txn(
txn,
_BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
{"stop_event_stream_ordering": max_stream_ordering},
)
return max_stream_ordering
return None
# First, add the triggers to keep the `event_stats` values up-to-date.
#
# If we don't have a `stop_event_stream_ordering` yet, we need to add the
# triggers to the `events` table and set the stopping point so we don't
# double count `events` later.
if stop_event_stream_ordering is None:
stop_event_stream_ordering = await self.db_pool.runInteraction(
"_event_stats_populate_counts_bg_update_add_triggers",
_add_triggers_txn,
)
# If there is no `stop_event_stream_ordering`, then there are no events
# in the `events` table and we can end the background update altogether.
if stop_event_stream_ordering is None:
await self.db_pool.updates._end_background_update(
_BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE
)
return batch_size
def _populate_txn(
txn: LoggingTransaction,
) -> int:
"""
Updates the `event_stats` table from this batch of events.
"""
# Increment the counts based on the events present in this batch.
txn.execute(
"""
WITH event_batch AS (
SELECT *
FROM events
WHERE stream_ordering > ? AND stream_ordering <= ?
ORDER BY stream_ordering ASC
LIMIT ?
),
batch_stats AS (
SELECT
MAX(stream_ordering) AS max_stream_ordering,
COALESCE(COUNT(*), 0) AS total_event_count,
COALESCE(SUM(CASE WHEN type = 'm.room.message' AND state_key IS NULL THEN 1 ELSE 0 END), 0) AS unencrypted_message_count,
COALESCE(SUM(CASE WHEN type = 'm.room.encrypted' AND state_key IS NULL THEN 1 ELSE 0 END), 0) AS e2ee_event_count
FROM event_batch
UNION ALL
SELECT null, 0, 0, 0
WHERE NOT EXISTS (SELECT 1 FROM event_batch)
LIMIT 1
)
UPDATE event_stats
SET
total_event_count = total_event_count + (SELECT total_event_count FROM batch_stats),
unencrypted_message_count = unencrypted_message_count + (SELECT unencrypted_message_count FROM batch_stats),
e2ee_event_count = e2ee_event_count + (SELECT e2ee_event_count FROM batch_stats)
RETURNING
(SELECT total_event_count FROM batch_stats) AS total_event_count,
(SELECT max_stream_ordering FROM batch_stats) AS max_stream_ordering
""",
(last_event_stream_ordering, stop_event_stream_ordering, batch_size),
)
# Get the results of the update
(total_event_count, max_stream_ordering) = cast(
Tuple[int, Optional[int]], txn.fetchone()
)
# Update the progress
self.db_pool.updates._background_update_progress_txn(
txn,
_BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
{
"last_event_stream_ordering": max_stream_ordering,
"stop_event_stream_ordering": stop_event_stream_ordering,
},
)
return total_event_count
num_rows_processed = await self.db_pool.runInteraction(
"_event_stats_populate_counts_bg_update",
_populate_txn,
)
# No more rows to process, so our background update is complete.
if not num_rows_processed:
await self.db_pool.updates._end_background_update(
_BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE
)
return batch_size
def _resolve_stale_data_in_sliding_sync_tables(
txn: LoggingTransaction,

View File

@@ -126,6 +126,44 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
return await self.db_pool.runInteraction("count_e2ee_messages", _count_messages)
async def count_total_events(self) -> int:
"""
Returns the total number of events present on the server.
"""
return await self.db_pool.simple_select_one_onecol(
table="event_stats",
keyvalues={},
retcol="total_event_count",
desc="count_total_events",
)
async def count_total_messages(self) -> int:
"""
Returns the total number of `m.room.message` events present on the
server.
"""
return await self.db_pool.simple_select_one_onecol(
table="event_stats",
keyvalues={},
retcol="unencrypted_message_count",
desc="count_total_messages",
)
async def count_total_e2ee_events(self) -> int:
"""
Returns the total number of `m.room.encrypted` events present on the
server.
"""
return await self.db_pool.simple_select_one_onecol(
table="event_stats",
keyvalues={},
retcol="e2ee_event_count",
desc="count_total_e2ee_events",
)
async def count_daily_sent_e2ee_messages(self) -> int:
def _count_messages(txn: LoggingTransaction) -> int:
# This is good enough as if you have silly characters in your own

View File

@@ -19,7 +19,7 @@
#
#
SCHEMA_VERSION = 91 # remember to update the list below when updating
SCHEMA_VERSION = 92 # remember to update the list below when updating
"""Represents the expectations made by the codebase about the database schema
This should be incremented whenever the codebase changes its requirements on the
@@ -162,6 +162,12 @@ Changes in SCHEMA_VERSION = 89
Changes in SCHEMA_VERSION = 90
- Add a column `participant` to `room_memberships` table
- Add background update to delete unreferenced state groups.
Changes in SCHEMA_VERSION = 91
- TODO
Changes in SCHEMA_VERSION = 92
- Add `event_stats` table to store global event statistics like total counts
"""

View File

@@ -0,0 +1,33 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2025 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
-- Create the `event_stats` table to store these statistics.
CREATE TABLE event_stats (
total_event_count INTEGER NOT NULL DEFAULT 0,
unencrypted_message_count INTEGER NOT NULL DEFAULT 0,
e2ee_event_count INTEGER NOT NULL DEFAULT 0
);
-- Insert initial values into the table.
INSERT INTO event_stats (
total_event_count,
unencrypted_message_count,
e2ee_event_count
) VALUES (0, 0, 0);
-- Add a background update to populate the `event_stats` table with the current counts
-- from the `events` table and add triggers to keep this count up-to-date.
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(9201, 'event_stats_populate_counts_bg_update', '{}');

View File

@@ -52,3 +52,5 @@ class _BackgroundUpdates:
MARK_UNREFERENCED_STATE_GROUPS_FOR_DELETION_BG_UPDATE = (
"mark_unreferenced_state_groups_for_deletion_bg_update"
)
EVENT_STATS_POPULATE_COUNTS_BG_UPDATE = "event_stats_populate_counts_bg_update"

View File

@@ -220,7 +220,9 @@ class TestRatelimiter(unittest.HomeserverTestCase):
self.assertIn("test_id_1", limiter.actions)
self.reactor.advance(60)
self.get_success_or_raise(
limiter.can_do_action(None, key="test_id_2", _time_now_s=10)
)
self.assertNotIn("test_id_1", limiter.actions)

View File

@@ -0,0 +1,258 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
import logging
from unittest.mock import AsyncMock
from twisted.test.proto_helpers import MemoryReactor
from synapse.app.phone_stats_home import (
PHONE_HOME_INTERVAL_SECONDS,
start_phone_stats_home,
)
from synapse.rest import admin, login, register, room
from synapse.server import HomeServer
from synapse.types import JsonDict
from synapse.util import Clock
from tests import unittest
from tests.server import ThreadedMemoryReactorClock
TEST_REPORT_STATS_ENDPOINT = "https://fake.endpoint/stats"
TEST_SERVER_CONTEXT = "test-server-context"
class PhoneHomeStatsTestCase(unittest.HomeserverTestCase):
servlets = [
admin.register_servlets_for_client_rest_resource,
room.register_servlets,
register.register_servlets,
login.register_servlets,
]
def make_homeserver(
self, reactor: ThreadedMemoryReactorClock, clock: Clock
) -> HomeServer:
# Configure the homeserver to enable stats reporting.
config = self.default_config()
config["report_stats"] = True
config["report_stats_endpoint"] = TEST_REPORT_STATS_ENDPOINT
# Configure the server context so we can check it ends up being reported
config["server_context"] = TEST_SERVER_CONTEXT
# Allow guests to be registered
config["allow_guest_access"] = True
hs = self.setup_test_homeserver(config=config)
# Replace the proxied http client with a mock, so we can inspect outbound requests to
# the configured stats endpoint.
self.put_json_mock = AsyncMock(return_value={})
hs.get_proxied_http_client().put_json = self.put_json_mock # type: ignore[method-assign]
return hs
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
# Wait for the background updates to add the database triggers that keep the
# `event_stats` table up-to-date.
self.wait_for_background_updates()
# Force stats reporting to occur
start_phone_stats_home(hs=hs)
super().prepare(reactor, clock, hs)
def _get_latest_phone_home_stats(self) -> JsonDict:
# Wait for `phone_stats_home` to be called again + a healthy margin (50s).
self.reactor.advance(2 * PHONE_HOME_INTERVAL_SECONDS + 50)
# Extract the reported stats from our http client mock
mock_calls = self.put_json_mock.call_args_list
report_stats_calls = []
for call in mock_calls:
if call.args[0] == TEST_REPORT_STATS_ENDPOINT:
report_stats_calls.append(call)
self.assertGreaterEqual(
(len(report_stats_calls)),
1,
"Expected at-least one call to the report_stats endpoint",
)
# Extract the phone home stats from the call
phone_home_stats = report_stats_calls[0].args[1]
return phone_home_stats
def _perform_user_actions(self) -> None:
"""
Perform some actions on the homeserver that would bump the phone home
stats.
"""
# Create some users
user_1_mxid = self.register_user(
username="test_user_1",
password="test",
)
user_2_mxid = self.register_user(
username="test_user_2",
password="test",
)
# Note: `self.register_user` does not support guest registration, and updating the
# Admin API it calls to add a new parameter would cause the `mac` parameter to fail
# in a backwards-incompatible manner. Hence, we make a manual request here.
_guest_user_mxid = self.make_request(
method="POST",
path="/_matrix/client/v3/register?kind=guest",
content={
"username": "guest_user",
"password": "test",
},
shorthand=False,
)
# Log in to each user
user_1_token = self.login(username=user_1_mxid, password="test")
user_2_token = self.login(username=user_2_mxid, password="test")
# Create a room between the two users
room_1_id = self.helper.create_room_as(
is_public=False,
tok=user_1_token,
)
# Mark this room as end-to-end encrypted
self.helper.send_state(
room_id=room_1_id,
event_type="m.room.encryption",
body={
"algorithm": "m.megolm.v1.aes-sha2",
"rotation_period_ms": 604800000,
"rotation_period_msgs": 100,
},
state_key="",
tok=user_1_token,
)
# User 1 invites user 2
self.helper.invite(
room=room_1_id,
src=user_1_mxid,
targ=user_2_mxid,
tok=user_1_token,
)
# User 2 joins
self.helper.join(
room=room_1_id,
user=user_2_mxid,
tok=user_2_token,
)
# User 1 sends 10 unencrypted messages
for _ in range(10):
self.helper.send(
room_id=room_1_id,
body="Zoinks Scoob! A message!",
tok=user_1_token,
)
# User 2 sends 5 encrypted "messages"
for _ in range(5):
self.helper.send_event(
room_id=room_1_id,
type="m.room.encrypted",
content={
"algorithm": "m.olm.v1.curve25519-aes-sha2",
"sender_key": "some_key",
"ciphertext": {
"some_key": {
"type": 0,
"body": "encrypted_payload",
},
},
},
tok=user_2_token,
)
def test_phone_home_stats(self) -> None:
"""
Test that the phone home stats contain the stats we expect based on
the scenario carried out in `prepare`
"""
# Do things to bump the stats
self._perform_user_actions()
# Wait for the stats to be reported
phone_home_stats = self._get_latest_phone_home_stats()
self.assertEqual(
phone_home_stats["homeserver"], self.hs.config.server.server_name
)
self.assertTrue(isinstance(phone_home_stats["memory_rss"], int))
self.assertTrue(isinstance(phone_home_stats["cpu_average"], int))
self.assertEqual(phone_home_stats["server_context"], TEST_SERVER_CONTEXT)
self.assertTrue(isinstance(phone_home_stats["timestamp"], int))
self.assertTrue(isinstance(phone_home_stats["uptime_seconds"], int))
self.assertTrue(isinstance(phone_home_stats["python_version"], str))
# We expect only our test users to exist on the homeserver
self.assertEqual(phone_home_stats["total_users"], 3)
self.assertEqual(phone_home_stats["total_nonbridged_users"], 3)
self.assertEqual(phone_home_stats["daily_user_type_native"], 2)
self.assertEqual(phone_home_stats["daily_user_type_guest"], 1)
self.assertEqual(phone_home_stats["daily_user_type_bridged"], 0)
self.assertEqual(phone_home_stats["total_room_count"], 1)
self.assertEqual(phone_home_stats["total_event_count"], 24)
self.assertEqual(phone_home_stats["total_message_count"], 10)
self.assertEqual(phone_home_stats["total_e2ee_event_count"], 5)
self.assertEqual(phone_home_stats["daily_active_users"], 2)
self.assertEqual(phone_home_stats["monthly_active_users"], 2)
self.assertEqual(phone_home_stats["daily_active_rooms"], 1)
self.assertEqual(phone_home_stats["daily_active_e2ee_rooms"], 1)
self.assertEqual(phone_home_stats["daily_messages"], 10)
self.assertEqual(phone_home_stats["daily_e2ee_messages"], 5)
self.assertEqual(phone_home_stats["daily_sent_messages"], 10)
self.assertEqual(phone_home_stats["daily_sent_e2ee_messages"], 5)
# Our users have not been around for >30 days, hence these are all 0.
self.assertEqual(phone_home_stats["r30v2_users_all"], 0)
self.assertEqual(phone_home_stats["r30v2_users_android"], 0)
self.assertEqual(phone_home_stats["r30v2_users_ios"], 0)
self.assertEqual(phone_home_stats["r30v2_users_electron"], 0)
self.assertEqual(phone_home_stats["r30v2_users_web"], 0)
self.assertEqual(
phone_home_stats["cache_factor"], self.hs.config.caches.global_factor
)
self.assertEqual(
phone_home_stats["event_cache_size"],
self.hs.config.caches.event_cache_size,
)
self.assertEqual(
phone_home_stats["database_engine"],
self.hs.config.database.databases[0].config["name"],
)
self.assertEqual(
phone_home_stats["database_server_version"],
self.hs.get_datastores().main.database_engine.server_version,
)
synapse_logger = logging.getLogger("synapse")
log_level = synapse_logger.getEffectiveLevel()
self.assertEqual(phone_home_stats["log_level"], logging.getLevelName(log_level))

View File

@@ -1167,81 +1167,3 @@ class HTTPPusherTests(HomeserverTestCase):
self.assertEqual(
self.push_attempts[0][2]["notification"]["counts"]["unread"], 1
)
def test_push_backoff(self) -> None:
"""
The HTTP pusher backs off correctly if it fails to contact the push gateway.
"""
# Register the user who gets notified
user_id = self.register_user("user", "pass")
access_token = self.login("user", "pass")
# Register the user who sends the message
other_user_id = self.register_user("otheruser", "pass")
other_access_token = self.login("otheruser", "pass")
# Register the pusher
user_tuple = self.get_success(
self.hs.get_datastores().main.get_user_by_access_token(access_token)
)
assert user_tuple is not None
device_id = user_tuple.device_id
self.get_success(
self.hs.get_pusherpool().add_or_update_pusher(
user_id=user_id,
device_id=device_id,
kind="http",
app_id="m.http",
app_display_name="HTTP Push Notifications",
device_display_name="pushy push",
pushkey="a@example.com",
lang=None,
data={"url": "http://example.com/_matrix/push/v1/notify"},
)
)
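# The pusher's `data.url` above is where notifications are POSTed; the attempts
# are recorded in `self.push_attempts`, presumably by a stubbed HTTP client set up
# outside this hunk.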
# Create a room with the other user
room = self.helper.create_room_as(user_id, tok=access_token)
self.helper.join(room=room, user=other_user_id, tok=other_access_token)
# The other user sends some messages
self.helper.send(room, body="Message 1", tok=other_access_token)
# One push attempt should have been made
self.assertEqual(len(self.push_attempts), 1)
self.assertEqual(
self.push_attempts[0][1], "http://example.com/_matrix/push/v1/notify"
)
self.assertEqual(
self.push_attempts[0][2]["notification"]["content"]["body"], "Message 1"
)
self.push_attempts[0][0].callback({})
self.pump()
# Send another message; this time the push attempt fails
self.helper.send(room, body="Message 2", tok=other_access_token)
self.assertEqual(len(self.push_attempts), 2)
self.push_attempts[1][0].errback(Exception("couldn't connect"))
self.pump()
# Sending yet another message doesn't trigger a push immediately
self.helper.send(room, body="Message 3", tok=other_access_token)
self.pump()
self.assertEqual(len(self.push_attempts), 2)
# ... but waiting for a bit will cause more pushes
self.reactor.advance(10)
self.assertEqual(len(self.push_attempts), 3)
self.assertEqual(
self.push_attempts[2][2]["notification"]["content"]["body"], "Message 2"
)
self.push_attempts[2][0].callback({})
self.pump()
self.assertEqual(len(self.push_attempts), 4)
self.assertEqual(
self.push_attempts[3][2]["notification"]["content"]["body"], "Message 3"
)
self.push_attempts[3][0].callback({})
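# The behaviour exercised above follows the usual exponential-backoff pattern for
# a failing pusher. A minimal sketch (hypothetical names, not Synapse's actual
# pusher internals):
#
#     delay = MIN_RETRY_DELAY
#     while not send_push(notification):
#         clock.sleep(delay)                       # wait before the next attempt
#         delay = min(delay * 2, MAX_RETRY_DELAY)  # back off exponentially
#     delay = MIN_RETRY_DELAY                      # reset once a push succeeds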

View File

@@ -0,0 +1,237 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
from twisted.test.proto_helpers import MemoryReactor
from synapse.rest import admin, login, register, room
from synapse.server import HomeServer
from synapse.types.storage import _BackgroundUpdates
from synapse.util import Clock
from tests import unittest
class EventStatsTestCase(unittest.HomeserverTestCase):
"""
Tests for the `event_stats` table
"""
servlets = [
admin.register_servlets_for_client_rest_resource,
room.register_servlets,
register.register_servlets,
login.register_servlets,
]
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
# Wait for the background updates to add the database triggers that keep the
# `event_stats` table up-to-date.
#
# This also prevents background updates from running during the tests and
# interfering with the results.
self.wait_for_background_updates()
super().prepare(reactor, clock, hs)
def _perform_user_actions(self) -> None:
"""
Perform some actions on the homeserver that would bump the event counts.
"""
# Create some users
user_1_mxid = self.register_user(
username="test_user_1",
password="test",
)
user_2_mxid = self.register_user(
username="test_user_2",
password="test",
)
# Note: `self.register_user` does not support guest registration, and adding a new
# parameter to the Admin API it calls would break the `mac` (HMAC) calculation in a
# backwards-incompatible way. Hence, we make a manual request here.
_guest_user_mxid = self.make_request(
method="POST",
path="/_matrix/client/v3/register?kind=guest",
content={
"username": "guest_user",
"password": "test",
},
shorthand=False,
)
# Log in to each user
user_1_token = self.login(username=user_1_mxid, password="test")
user_2_token = self.login(username=user_2_mxid, password="test")
# Create a room between the two users
room_1_id = self.helper.create_room_as(
is_public=False,
tok=user_1_token,
)
# Mark this room as end-to-end encrypted
self.helper.send_state(
room_id=room_1_id,
event_type="m.room.encryption",
body={
"algorithm": "m.megolm.v1.aes-sha2",
"rotation_period_ms": 604800000,
"rotation_period_msgs": 100,
},
state_key="",
tok=user_1_token,
)
# User 1 invites user 2
self.helper.invite(
room=room_1_id,
src=user_1_mxid,
targ=user_2_mxid,
tok=user_1_token,
)
# User 2 joins
self.helper.join(
room=room_1_id,
user=user_2_mxid,
tok=user_2_token,
)
# User 1 sends 10 unencrypted messages
for _ in range(10):
self.helper.send(
room_id=room_1_id,
body="Zoinks Scoob! A message!",
tok=user_1_token,
)
# User 2 sends 5 encrypted "messages"
for _ in range(5):
self.helper.send_event(
room_id=room_1_id,
type="m.room.encrypted",
content={
"algorithm": "m.olm.v1.curve25519-aes-sha2",
"sender_key": "some_key",
"ciphertext": {
"some_key": {
"type": 0,
"body": "encrypted_payload",
},
},
},
tok=user_2_token,
)
def test_background_update_with_events(self) -> None:
"""
Test that the background update to populate the `event_stats` table works
correctly when there are events in the database.
"""
# Do things to bump the stats
self._perform_user_actions()
# Keep in mind: these are already populated because the background update has
# already run once when Synapse started, adding the database triggers that
# increment the counts as new events come in.
self.assertEqual(self.get_success(self.store.count_total_events()), 24)
self.assertEqual(self.get_success(self.store.count_total_messages()), 10)
self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 5)
# Run the background update again
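# (Re-queuing is done by inserting a fresh row into the `background_updates` table
# and clearing the runner's 'all done' flag so that `wait_for_background_updates`
# picks the update up again.)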
self.get_success(
self.store.db_pool.simple_insert(
"background_updates",
{
"update_name": _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
"progress_json": "{}",
},
)
)
self.store.db_pool.updates._all_done = False
self.wait_for_background_updates()
# We expect these values to double: the triggers already counted each event once
# as it was created, and re-running the background update counts every existing
# event a second time.
self.assertEqual(self.get_success(self.store.count_total_events()), 48)
self.assertEqual(self.get_success(self.store.count_total_messages()), 20)
self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 10)
def test_background_update_without_events(self) -> None:
"""
Test that the background update to populate the `event_stats` table works
correctly without events in the database.
"""
# Keep in mind: these are already populated because the background update has
# already run once when Synapse started, adding the database triggers that
# increment the counts as new events come in.
#
# In this case, no events have been sent, so we expect the counts to be 0.
self.assertEqual(self.get_success(self.store.count_total_events()), 0)
self.assertEqual(self.get_success(self.store.count_total_messages()), 0)
self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 0)
# Run the background update again
self.get_success(
self.store.db_pool.simple_insert(
"background_updates",
{
"update_name": _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
"progress_json": "{}",
},
)
)
self.store.db_pool.updates._all_done = False
self.wait_for_background_updates()
self.assertEqual(self.get_success(self.store.count_total_events()), 0)
self.assertEqual(self.get_success(self.store.count_total_messages()), 0)
self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 0)
def test_background_update_resume_progress(self) -> None:
"""
Test that the background update to populate the `event_stats` table correctly
resumes from the `progress_json` checkpoint.
"""
# Do things to bump the stats
self._perform_user_actions()
# Keep in mind: these are already populated because the background update has
# already run once when Synapse started, adding the database triggers that
# increment the counts as new events come in.
self.assertEqual(self.get_success(self.store.count_total_events()), 24)
self.assertEqual(self.get_success(self.store.count_total_messages()), 10)
self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 5)
# Run the background update again
self.get_success(
self.store.db_pool.simple_insert(
"background_updates",
{
"update_name": _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
"progress_json": '{ "last_event_stream_ordering": 14, "stop_event_stream_ordering": 21 }',
},
)
)
self.store.db_pool.updates._all_done = False
self.wait_for_background_updates()
# We expect these values to increase: re-running the background update
# double-counts the events in the range specified by the `progress_json`.
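# With `last_event_stream_ordering: 14` and `stop_event_stream_ordering: 21`, the
# re-run counts 7 further events (stream orderings 15-21 inclusive); in this
# scenario those appear to be 6 of the plaintext messages and 1 encrypted message,
# hence the totals below.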
self.assertEqual(self.get_success(self.store.count_total_events()), 24 + 7)
self.assertEqual(self.get_success(self.store.count_total_messages()), 16)
self.assertEqual(self.get_success(self.store.count_total_e2ee_events()), 6)