1
0

Compare commits

..

2 Commits

Author SHA1 Message Date
Andrew Morgan
978483e01e changelog 2024-06-28 15:45:28 +01:00
Andrew Morgan
a74b82734a Migrate Synapse flake to element-hq/nix-flakes 2024-06-25 15:19:33 +01:00
76 changed files with 407 additions and 1444 deletions

View File

@@ -2,4 +2,4 @@
(using a matrix.org account if necessary). We do not use GitHub issues for
support.
**If you want to report a security issue** please see https://element.io/security/security-disclosure-policy
**If you want to report a security issue** please see https://matrix.org/security-disclosure-policy/

View File

@@ -7,7 +7,7 @@ body:
**THIS IS NOT A SUPPORT CHANNEL!**
**IF YOU HAVE SUPPORT QUESTIONS ABOUT RUNNING OR CONFIGURING YOUR OWN HOME SERVER**, please ask in **[#synapse:matrix.org](https://matrix.to/#/#synapse:matrix.org)** (using a matrix.org account if necessary).
If you want to report a security issue, please see https://element.io/security/security-disclosure-policy
If you want to report a security issue, please see https://matrix.org/security-disclosure-policy/
This is a bug report form. By following the instructions below and completing the sections with your information, you will help the us to get all the necessary data to fix your issue.

View File

@@ -1,115 +1,3 @@
# Synapse 1.110.0 (2024-07-03)
No significant changes since 1.110.0rc3.
# Synapse 1.110.0rc3 (2024-07-02)
### Bugfixes
- Fix bug where `/sync` requests could get blocked indefinitely after an upgrade from Synapse versions before v1.109.0. ([\#17386](https://github.com/element-hq/synapse/issues/17386), [\#17391](https://github.com/element-hq/synapse/issues/17391))
### Internal Changes
- Limit size of presence EDUs to 50 entries. ([\#17371](https://github.com/element-hq/synapse/issues/17371))
- Fix building debian package for debian sid. ([\#17389](https://github.com/element-hq/synapse/issues/17389))
# Synapse 1.110.0rc2 (2024-06-26)
### Internal Changes
- Fix uploading packages to PyPi. ([\#17363](https://github.com/element-hq/synapse/issues/17363))
# Synapse 1.110.0rc1 (2024-06-26)
### Features
- Add initial implementation of an experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. ([\#17187](https://github.com/element-hq/synapse/issues/17187))
- Add experimental support for [MSC3823](https://github.com/matrix-org/matrix-spec-proposals/pull/3823) - Account suspension. ([\#17255](https://github.com/element-hq/synapse/issues/17255))
- Improve ratelimiting in Synapse. ([\#17256](https://github.com/element-hq/synapse/issues/17256))
- Add support for the unstable [MSC4151](https://github.com/matrix-org/matrix-spec-proposals/pull/4151) report room API. ([\#17270](https://github.com/element-hq/synapse/issues/17270), [\#17296](https://github.com/element-hq/synapse/issues/17296))
- Filter for public and empty rooms added to Admin-API [List Room API](https://element-hq.github.io/synapse/latest/admin_api/rooms.html#list-room-api). ([\#17276](https://github.com/element-hq/synapse/issues/17276))
- Add `is_dm` filtering to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. ([\#17277](https://github.com/element-hq/synapse/issues/17277))
- Add `is_encrypted` filtering to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. ([\#17281](https://github.com/element-hq/synapse/issues/17281))
- Include user membership in events served to clients, per [MSC4115](https://github.com/matrix-org/matrix-spec-proposals/pull/4115). ([\#17282](https://github.com/element-hq/synapse/issues/17282))
- Do not require user-interactive authentication for uploading cross-signing keys for the first time, per [MSC3967](https://github.com/matrix-org/matrix-spec-proposals/pull/3967). ([\#17284](https://github.com/element-hq/synapse/issues/17284))
- Add `stream_ordering` sort to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. ([\#17293](https://github.com/element-hq/synapse/issues/17293))
- `register_new_matrix_user` now supports a --password-file flag, which
is useful for scripting. ([\#17294](https://github.com/element-hq/synapse/issues/17294))
- `register_new_matrix_user` now supports a --exists-ok flag to allow registration of users that already exist in the database.
This is useful for scripts that bootstrap user accounts with initial passwords. ([\#17304](https://github.com/element-hq/synapse/issues/17304))
- Add support for via query parameter from [MSC4156](https://github.com/matrix-org/matrix-spec-proposals/pull/4156). ([\#17322](https://github.com/element-hq/synapse/issues/17322))
- Add `is_invite` filtering to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. ([\#17335](https://github.com/element-hq/synapse/issues/17335))
- Support [MSC3916](https://github.com/matrix-org/matrix-spec-proposals/blob/rav/authentication-for-media/proposals/3916-authentication-for-media.md) by adding a federation /download endpoint. ([\#17350](https://github.com/element-hq/synapse/issues/17350))
### Bugfixes
- Fix searching for users with their exact localpart whose ID includes a hyphen. ([\#17254](https://github.com/element-hq/synapse/issues/17254))
- Fix wrong retention policy being used when filtering events. ([\#17272](https://github.com/element-hq/synapse/issues/17272))
- Fix bug where OTKs were not always included in `/sync` response when using workers. ([\#17275](https://github.com/element-hq/synapse/issues/17275))
- Fix a long-standing bug where an invalid 'from' parameter to [`/notifications`](https://spec.matrix.org/v1.10/client-server-api/#get_matrixclientv3notifications) would result in an Internal Server Error. ([\#17283](https://github.com/element-hq/synapse/issues/17283))
- Fix edge case in `/sync` returning the wrong the state when using sharded event persisters. ([\#17295](https://github.com/element-hq/synapse/issues/17295))
- Add initial implementation of an experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. ([\#17301](https://github.com/element-hq/synapse/issues/17301))
- Fix email notification subject when invited to a space. ([\#17336](https://github.com/element-hq/synapse/issues/17336))
### Improved Documentation
- Add missing quotes for example for `exclude_rooms_from_sync`. ([\#17308](https://github.com/element-hq/synapse/issues/17308))
- Update header in the README to visually fix the the auto-generated table of contents. ([\#17329](https://github.com/element-hq/synapse/issues/17329))
- Fix stale references to the Foundation's Security Disclosure Policy. ([\#17341](https://github.com/element-hq/synapse/issues/17341))
- Add default values for `rc_invites.per_issuer` to docs. ([\#17347](https://github.com/element-hq/synapse/issues/17347))
- Fix an error in the docs for `search_all_users` parameter under `user_directory`. ([\#17348](https://github.com/element-hq/synapse/issues/17348))
### Internal Changes
- Remove unused `expire_access_token` option in the Synapse Docker config file. Contributed by @AaronDewes. ([\#17198](https://github.com/element-hq/synapse/issues/17198))
- Use fully-qualified `PersistedEventPosition` when returning `RoomsForUser` to facilitate proper comparisons and `RoomStreamToken` generation. ([\#17265](https://github.com/element-hq/synapse/issues/17265))
- Add debug logging for when room keys are uploaded, including whether they are replacing other room keys. ([\#17266](https://github.com/element-hq/synapse/issues/17266))
- Handle OTK uploads off master. ([\#17271](https://github.com/element-hq/synapse/issues/17271))
- Don't try and resync devices for remote users whose servers are marked as down. ([\#17273](https://github.com/element-hq/synapse/issues/17273))
- Re-organize Pydantic models and types used in handlers. ([\#17279](https://github.com/element-hq/synapse/issues/17279))
- Expose the worker instance that persisted the event on `event.internal_metadata.instance_name`. ([\#17300](https://github.com/element-hq/synapse/issues/17300))
- Update the README with Element branding, improve headers and fix the #synapse:matrix.org support room link rendering. ([\#17324](https://github.com/element-hq/synapse/issues/17324))
- Change path of the experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync implementation to `/org.matrix.simplified_msc3575/sync` since our simplified API is slightly incompatible with what's in the current MSC. ([\#17331](https://github.com/element-hq/synapse/issues/17331))
- Handle device lists notifications for large accounts more efficiently in worker mode. ([\#17333](https://github.com/element-hq/synapse/issues/17333), [\#17358](https://github.com/element-hq/synapse/issues/17358))
- Do not block event sending/receiving while calculating large event auth chains. ([\#17338](https://github.com/element-hq/synapse/issues/17338))
- Tidy up `parse_integer` docs and call sites to reflect the fact that they require non-negative integers by default, and bring `parse_integer_from_args` default in alignment. Contributed by Denis Kasak (@dkasak). ([\#17339](https://github.com/element-hq/synapse/issues/17339))
### Updates to locked dependencies
* Bump authlib from 1.3.0 to 1.3.1. ([\#17343](https://github.com/element-hq/synapse/issues/17343))
* Bump dawidd6/action-download-artifact from 3.1.4 to 5. ([\#17289](https://github.com/element-hq/synapse/issues/17289))
* Bump dawidd6/action-download-artifact from 5 to 6. ([\#17313](https://github.com/element-hq/synapse/issues/17313))
* Bump docker/build-push-action from 5 to 6. ([\#17312](https://github.com/element-hq/synapse/issues/17312))
* Bump jinja2 from 3.1.3 to 3.1.4. ([\#17287](https://github.com/element-hq/synapse/issues/17287))
* Bump lazy_static from 1.4.0 to 1.5.0. ([\#17355](https://github.com/element-hq/synapse/issues/17355))
* Bump msgpack from 1.0.7 to 1.0.8. ([\#17317](https://github.com/element-hq/synapse/issues/17317))
* Bump netaddr from 1.2.1 to 1.3.0. ([\#17353](https://github.com/element-hq/synapse/issues/17353))
* Bump packaging from 24.0 to 24.1. ([\#17352](https://github.com/element-hq/synapse/issues/17352))
* Bump phonenumbers from 8.13.37 to 8.13.39. ([\#17315](https://github.com/element-hq/synapse/issues/17315))
* Bump regex from 1.10.4 to 1.10.5. ([\#17290](https://github.com/element-hq/synapse/issues/17290))
* Bump requests from 2.31.0 to 2.32.2. ([\#17345](https://github.com/element-hq/synapse/issues/17345))
* Bump sentry-sdk from 2.1.1 to 2.3.1. ([\#17263](https://github.com/element-hq/synapse/issues/17263))
* Bump sentry-sdk from 2.3.1 to 2.6.0. ([\#17351](https://github.com/element-hq/synapse/issues/17351))
* Bump tornado from 6.4 to 6.4.1. ([\#17344](https://github.com/element-hq/synapse/issues/17344))
* Bump mypy from 1.8.0 to 1.9.0. ([\#17297](https://github.com/element-hq/synapse/issues/17297))
* Bump types-jsonschema from 4.21.0.20240311 to 4.22.0.20240610. ([\#17288](https://github.com/element-hq/synapse/issues/17288))
* Bump types-netaddr from 1.2.0.20240219 to 1.3.0.20240530. ([\#17314](https://github.com/element-hq/synapse/issues/17314))
* Bump types-pillow from 10.2.0.20240423 to 10.2.0.20240520. ([\#17285](https://github.com/element-hq/synapse/issues/17285))
* Bump types-pyyaml from 6.0.12.12 to 6.0.12.20240311. ([\#17316](https://github.com/element-hq/synapse/issues/17316))
* Bump typing-extensions from 4.11.0 to 4.12.2. ([\#17354](https://github.com/element-hq/synapse/issues/17354))
* Bump urllib3 from 2.0.7 to 2.2.2. ([\#17346](https://github.com/element-hq/synapse/issues/17346))
# Synapse 1.109.0 (2024-06-18)
### Internal Changes

View File

@@ -1,20 +1,20 @@
.. image:: https://github.com/element-hq/product/assets/87339233/7abf477a-5277-47f3-be44-ea44917d8ed7
:height: 60px
**Element Synapse - Matrix homeserver implementation**
===========================================================================================================
Element Synapse - Matrix homeserver implementation |support| |development| |documentation| |license| |pypi| |python|
===========================================================================================================
|support| |development| |documentation| |license| |pypi| |python|
Synapse is an open source `Matrix <https://matrix.org>`__ homeserver
Synapse is an open source `Matrix <https://matrix.org>`_ homeserver
implementation, written and maintained by `Element <https://element.io>`_.
`Matrix <https://github.com/matrix-org>`__ is the open standard for
`Matrix <https://github.com/matrix-org>`_ is the open standard for
secure and interoperable real time communications. You can directly run
and manage the source code in this repository, available under an AGPL
license. There is no support provided from Element unless you have a
subscription.
Subscription alternative
========================
------------------------
Alternatively, for those that need an enterprise-ready solution, Element
Server Suite (ESS) is `available as a subscription <https://element.io/pricing>`_.
@@ -119,7 +119,7 @@ impact to other applications will be minimal.
🧪 Testing a new installation
=============================
============================
The easiest way to try out your new Synapse installation is by connecting to it
from a web client.
@@ -173,10 +173,10 @@ As when logging in, you will need to specify a "Custom server". Specify your
desired ``localpart`` in the 'User name' box.
🎯 Troubleshooting and support
==============================
=============================
🚀 Professional support
-----------------------
----------------------
Enterprise quality support for Synapse including SLAs is available as part of an
`Element Server Suite (ESS) <https://element.io/pricing>` subscription.
@@ -185,7 +185,7 @@ If you are an existing ESS subscriber then you can raise a `support request <htt
and access the `knowledge base <https://ems-docs.element.io>`.
🤝 Community support
--------------------
-------------------
The `Admin FAQ <https://element-hq.github.io/synapse/latest/usage/administration/admin_faq.html>`_
includes tips on dealing with some common problems. For more details, see
@@ -202,7 +202,7 @@ issues for support requests, only for bug reports and feature requests.
.. _docs: docs
🪪 Identity Servers
===================
==================
Identity servers have the job of mapping email addresses and other 3rd Party
IDs (3PIDs) to Matrix user IDs, as well as verifying the ownership of 3PIDs

View File

@@ -0,0 +1 @@
Add initial implementation of an experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.

1
changelog.d/17198.misc Normal file
View File

@@ -0,0 +1 @@
Remove unused `expire_access_token` option in the Synapse Docker config file. Contributed by @AaronDewes.

1
changelog.d/17254.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix searching for users with their exact localpart whose ID includes a hyphen.

View File

@@ -0,0 +1 @@
Add support for [MSC823](https://github.com/matrix-org/matrix-spec-proposals/pull/3823) - Account suspension.

View File

@@ -0,0 +1 @@
Improve ratelimiting in Synapse (#17256).

1
changelog.d/17265.misc Normal file
View File

@@ -0,0 +1 @@
Use fully-qualified `PersistedEventPosition` when returning `RoomsForUser` to facilitate proper comparisons and `RoomStreamToken` generation.

1
changelog.d/17266.misc Normal file
View File

@@ -0,0 +1 @@
Add debug logging for when room keys are uploaded, including whether they are replacing other room keys.

View File

@@ -0,0 +1 @@
Add support for the unstable [MSC4151](https://github.com/matrix-org/matrix-spec-proposals/pull/4151) report room API.

1
changelog.d/17271.misc Normal file
View File

@@ -0,0 +1 @@
Handle OTK uploads off master.

1
changelog.d/17272.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix wrong retention policy being used when filtering events.

1
changelog.d/17273.misc Normal file
View File

@@ -0,0 +1 @@
Don't try and resync devices for remote users whose servers are marked as down.

1
changelog.d/17275.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix bug where OTKs were not always included in `/sync` response when using workers.

View File

@@ -0,0 +1 @@
Filter for public and empty rooms added to Admin-API [List Room API](https://element-hq.github.io/synapse/latest/admin_api/rooms.html#list-room-api).

View File

@@ -0,0 +1 @@
Add `is_dm` filtering to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.

1
changelog.d/17279.misc Normal file
View File

@@ -0,0 +1 @@
Re-organize Pydantic models and types used in handlers.

View File

@@ -0,0 +1 @@
Add `is_encrypted` filtering to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.

View File

@@ -0,0 +1 @@
Include user membership in events served to clients, per MSC4115.

1
changelog.d/17283.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix a long-standing bug where an invalid 'from' parameter to [`/notifications`](https://spec.matrix.org/v1.10/client-server-api/#get_matrixclientv3notifications) would result in an Internal Server Error.

View File

@@ -0,0 +1 @@
Do not require user-interactive authentication for uploading cross-signing keys for the first time, per MSC3967.

View File

@@ -0,0 +1 @@
Add `stream_ordering` sort to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.

View File

@@ -0,0 +1,2 @@
`register_new_matrix_user` now supports a --password-file flag, which
is useful for scripting.

1
changelog.d/17295.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix edge case in `/sync` returning the wrong the state when using sharded event persisters.

View File

@@ -0,0 +1 @@
Add support for the unstable [MSC4151](https://github.com/matrix-org/matrix-spec-proposals/pull/4151) report room API.

1
changelog.d/17297.misc Normal file
View File

@@ -0,0 +1 @@
Bump `mypy` from 1.8.0 to 1.9.0.

1
changelog.d/17300.misc Normal file
View File

@@ -0,0 +1 @@
Expose the worker instance that persisted the event on `event.internal_metadata.instance_name`.

1
changelog.d/17301.bugfix Normal file
View File

@@ -0,0 +1 @@
Add initial implementation of an experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.

View File

@@ -0,0 +1,2 @@
`register_new_matrix_user` now supports a --exists-ok flag to allow registration of users that already exist in the database.
This is useful for scripts that bootstrap user accounts with initial passwords.

1
changelog.d/17308.doc Normal file
View File

@@ -0,0 +1 @@
Add missing quotes for example for `exclude_rooms_from_sync`.

View File

@@ -0,0 +1 @@
Add support for via query parameter from MSC415.

1
changelog.d/17324.misc Normal file
View File

@@ -0,0 +1 @@
Update the README with Element branding, improve headers and fix the #synapse:matrix.org support room link rendering.

1
changelog.d/17325.misc Normal file
View File

@@ -0,0 +1 @@
This is a changelog so tests will run.

1
changelog.d/17331.misc Normal file
View File

@@ -0,0 +1 @@
Change path of the experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync implementation to `/org.matrix.simplified_msc3575/sync` since our simplified API is slightly incompatible with what's in the current MSC.

1
changelog.d/17333.misc Normal file
View File

@@ -0,0 +1 @@
Handle device lists notifications for large accounts more efficiently in worker mode.

View File

@@ -0,0 +1 @@
Add `is_invite` filtering to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.

1
changelog.d/17336.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix email notification subject when invited to a space.

1
changelog.d/17338.misc Normal file
View File

@@ -0,0 +1 @@
Do not block event sending/receiving while calculating large event auth chains.

1
changelog.d/17339.misc Normal file
View File

@@ -0,0 +1 @@
Tidy up `parse_integer` docs and call sites to reflect the fact that they require non-negative integers by default, and bring `parse_integer_from_args` default in alignment. Contributed by Denis Kasak (@dkasak).

1
changelog.d/17347.doc Normal file
View File

@@ -0,0 +1 @@
Add default values for `rc_invites.per_issuer` to docs.

1
changelog.d/17348.doc Normal file
View File

@@ -0,0 +1 @@
Fix an error in the docs for `search_all_users` parameter under `user_directory`.

1
changelog.d/17372.misc Normal file
View File

@@ -0,0 +1 @@
Migrate the nix flake to a separate repo ([element-hq/nix-flakes](https://github.com/element-hq/nix-flakes)).

23
debian/changelog vendored
View File

@@ -1,27 +1,8 @@
matrix-synapse-py3 (1.110.0) stable; urgency=medium
* New Synapse release 1.110.0.
-- Synapse Packaging team <packages@matrix.org> Wed, 03 Jul 2024 09:08:59 -0600
matrix-synapse-py3 (1.110.0~rc3) stable; urgency=medium
* New Synapse release 1.110.0rc3.
-- Synapse Packaging team <packages@matrix.org> Tue, 02 Jul 2024 08:28:56 -0600
matrix-synapse-py3 (1.110.0~rc2) stable; urgency=medium
* New Synapse release 1.110.0rc2.
-- Synapse Packaging team <packages@matrix.org> Wed, 26 Jun 2024 18:14:48 +0200
matrix-synapse-py3 (1.110.0~rc1) stable; urgency=medium
matrix-synapse-py3 (1.109.0+nmu1) UNRELEASED; urgency=medium
* `register_new_matrix_user` now supports a --password-file and a --exists-ok flag.
* New Synapse release 1.110.0rc1.
-- Synapse Packaging team <packages@matrix.org> Wed, 26 Jun 2024 14:07:56 +0200
-- Synapse Packaging team <packages@matrix.org> Tue, 18 Jun 2024 13:29:36 +0100
matrix-synapse-py3 (1.109.0) stable; urgency=medium

View File

@@ -73,8 +73,6 @@ RUN apt-get update -qq -o Acquire::Languages=none \
curl \
debhelper \
devscripts \
# Required for building cffi from source.
libffi-dev \
libsystemd-dev \
lsb-release \
pkg-config \

View File

@@ -2719,7 +2719,7 @@ Example configuration:
session_lifetime: 24h
```
---
### `refreshable_access_token_lifetime`
### `refresh_access_token_lifetime`
Time that an access token remains valid for, if the session is using refresh tokens.

View File

@@ -62,6 +62,6 @@ following documentation:
## Reporting a security vulnerability
If you've found a security issue in Synapse or any other Element project,
please report it to us in accordance with our [Security Disclosure
Policy](https://element.io/security/security-disclosure-policy). Thank you!
If you've found a security issue in Synapse or any other Matrix.org Foundation
project, please report it to us in accordance with our [Security Disclosure
Policy](https://www.matrix.org/security-disclosure-policy/). Thank you!

342
flake.lock generated
View File

@@ -1,24 +1,112 @@
{
"nodes": {
"devenv": {
"cachix": {
"inputs": {
"flake-compat": "flake-compat",
"nix": "nix",
"nixpkgs": "nixpkgs",
"pre-commit-hooks": "pre-commit-hooks"
"devenv": "devenv_2",
"flake-compat": [
"element-nix-flakes",
"devenv",
"flake-compat"
],
"nixpkgs": [
"element-nix-flakes",
"devenv",
"nixpkgs"
],
"pre-commit-hooks": [
"element-nix-flakes",
"devenv",
"pre-commit-hooks"
]
},
"locked": {
"lastModified": 1688058187,
"narHash": "sha256-ipDcc7qrucpJ0+0eYNlwnE+ISTcq4m03qW+CWUshRXI=",
"lastModified": 1712055811,
"narHash": "sha256-7FcfMm5A/f02yyzuavJe06zLa9hcMHsagE28ADcmQvk=",
"owner": "cachix",
"repo": "devenv",
"rev": "c8778e3dc30eb9043e218aaa3861d42d4992de77",
"repo": "cachix",
"rev": "02e38da89851ec7fec3356a5c04bc8349cae0e30",
"type": "github"
},
"original": {
"owner": "cachix",
"ref": "v0.6.3",
"repo": "cachix",
"type": "github"
}
},
"devenv": {
"inputs": {
"cachix": "cachix",
"flake-compat": "flake-compat_2",
"nix": "nix_2",
"nixpkgs": "nixpkgs_2",
"pre-commit-hooks": "pre-commit-hooks"
},
"locked": {
"lastModified": 1718265154,
"narHash": "sha256-eTbBvYwGlKExMSTyHQya6+6kdx1rtva/aVfyAZu2NUU=",
"owner": "cachix",
"repo": "devenv",
"rev": "1983f635c29dc68bb0d29b3a7e227579a1d98788",
"type": "github"
},
"original": {
"owner": "cachix",
"ref": "v1.0.7",
"repo": "devenv",
"type": "github"
}
},
"devenv_2": {
"inputs": {
"flake-compat": [
"element-nix-flakes",
"devenv",
"cachix",
"flake-compat"
],
"nix": "nix",
"nixpkgs": "nixpkgs",
"poetry2nix": "poetry2nix",
"pre-commit-hooks": [
"element-nix-flakes",
"devenv",
"cachix",
"pre-commit-hooks"
]
},
"locked": {
"lastModified": 1708704632,
"narHash": "sha256-w+dOIW60FKMaHI1q5714CSibk99JfYxm0CzTinYWr+Q=",
"owner": "cachix",
"repo": "devenv",
"rev": "2ee4450b0f4b95a1b90f2eb5ffea98b90e48c196",
"type": "github"
},
"original": {
"owner": "cachix",
"ref": "python-rewrite",
"repo": "devenv",
"type": "github"
}
},
"element-nix-flakes": {
"inputs": {
"devenv": "devenv",
"nixpkgs": "nixpkgs_3",
"rust-overlay": "rust-overlay",
"systems": "systems_3"
},
"locked": {
"lastModified": 1719311692,
"narHash": "sha256-WYtaML+xsn5r+de6cgi41rXa+6OP9RFLHd4FFxJQCvg=",
"owner": "element-hq",
"repo": "nix-flakes",
"rev": "3d1e3ee99be86b5535c371d13b94a9af02c0ce06",
"type": "github"
},
"original": {
"owner": "element-hq",
"repo": "nix-flakes",
"type": "github"
}
},
@@ -38,16 +126,32 @@
"type": "github"
}
},
"flake-compat_2": {
"flake": false,
"locked": {
"lastModified": 1696426674,
"narHash": "sha256-kvjfFW7WAETZlt09AgDn1MrtKzP7t90Vf7vypd3OL1U=",
"owner": "edolstra",
"repo": "flake-compat",
"rev": "0f9255e01c2351cc7d116c072cb317785dd33b33",
"type": "github"
},
"original": {
"owner": "edolstra",
"repo": "flake-compat",
"type": "github"
}
},
"flake-utils": {
"inputs": {
"systems": "systems"
},
"locked": {
"lastModified": 1685518550,
"narHash": "sha256-o2d0KcvaXzTrPRIo0kOLV0/QXHhDQ5DTi+OxcjO8xqY=",
"lastModified": 1689068808,
"narHash": "sha256-6ixXo3wt24N/melDWjq70UuHQLxGV8jZvooRanIHXw0=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "a1720a10a6cfe8234c0e93907ffe81be440f4cef",
"rev": "919d646de7be200f3bf08cb76ae1f09402b6f9b4",
"type": "github"
},
"original": {
@@ -61,11 +165,11 @@
"systems": "systems_2"
},
"locked": {
"lastModified": 1681202837,
"narHash": "sha256-H+Rh19JDwRtpVPAWp64F+rlEtxUWBAQW28eAi3SRSzg=",
"lastModified": 1710146030,
"narHash": "sha256-SZ5L6eA7HJ/nmkzGG7/ISclqe6oZdOZTNoesiInkXPQ=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "cfacdce06f30d2b68473a46042957675eebb3401",
"rev": "b1d9ab70662946ef0850d488da1c9019f3a9752a",
"type": "github"
},
"original": {
@@ -77,17 +181,18 @@
"gitignore": {
"inputs": {
"nixpkgs": [
"element-nix-flakes",
"devenv",
"pre-commit-hooks",
"nixpkgs"
]
},
"locked": {
"lastModified": 1660459072,
"narHash": "sha256-8DFJjXG8zqoONA1vXtgeKXy68KdJL5UaXR8NtVMUbx8=",
"lastModified": 1709087332,
"narHash": "sha256-HG2cCnktfHsKV0s4XW83gU3F57gaTljL9KNSuG6bnQs=",
"owner": "hercules-ci",
"repo": "gitignore.nix",
"rev": "a20de23b925fd8264fd7fad6454652e142fd7f73",
"rev": "637db329424fd7e46cf4185293b9cc8c88c95394",
"type": "github"
},
"original": {
@@ -96,53 +201,94 @@
"type": "github"
}
},
"lowdown-src": {
"flake": false,
"locked": {
"lastModified": 1633514407,
"narHash": "sha256-Dw32tiMjdK9t3ETl5fzGrutQTzh2rufgZV4A/BbxuD4=",
"owner": "kristapsdz",
"repo": "lowdown",
"rev": "d2c2b44ff6c27b936ec27358a2653caaef8f73b8",
"type": "github"
},
"original": {
"owner": "kristapsdz",
"repo": "lowdown",
"type": "github"
}
},
"nix": {
"inputs": {
"lowdown-src": "lowdown-src",
"flake-compat": "flake-compat",
"nixpkgs": [
"element-nix-flakes",
"devenv",
"cachix",
"devenv",
"nixpkgs"
],
"nixpkgs-regression": "nixpkgs-regression"
},
"locked": {
"lastModified": 1676545802,
"narHash": "sha256-EK4rZ+Hd5hsvXnzSzk2ikhStJnD63odF7SzsQ8CuSPU=",
"lastModified": 1712911606,
"narHash": "sha256-BGvBhepCufsjcUkXnEEXhEVjwdJAwPglCC2+bInc794=",
"owner": "domenkozar",
"repo": "nix",
"rev": "7c91803598ffbcfe4a55c44ac6d49b2cf07a527f",
"rev": "b24a9318ea3f3600c1e24b4a00691ee912d4de12",
"type": "github"
},
"original": {
"owner": "domenkozar",
"ref": "relaxed-flakes",
"ref": "devenv-2.21",
"repo": "nix",
"type": "github"
}
},
"nix-github-actions": {
"inputs": {
"nixpkgs": [
"element-nix-flakes",
"devenv",
"cachix",
"devenv",
"poetry2nix",
"nixpkgs"
]
},
"locked": {
"lastModified": 1688870561,
"narHash": "sha256-4UYkifnPEw1nAzqqPOTL2MvWtm3sNGw1UTYTalkTcGY=",
"owner": "nix-community",
"repo": "nix-github-actions",
"rev": "165b1650b753316aa7f1787f3005a8d2da0f5301",
"type": "github"
},
"original": {
"owner": "nix-community",
"repo": "nix-github-actions",
"type": "github"
}
},
"nix_2": {
"inputs": {
"flake-compat": [
"element-nix-flakes",
"devenv",
"flake-compat"
],
"nixpkgs": [
"element-nix-flakes",
"devenv",
"nixpkgs"
],
"nixpkgs-regression": "nixpkgs-regression_2"
},
"locked": {
"lastModified": 1712911606,
"narHash": "sha256-BGvBhepCufsjcUkXnEEXhEVjwdJAwPglCC2+bInc794=",
"owner": "domenkozar",
"repo": "nix",
"rev": "b24a9318ea3f3600c1e24b4a00691ee912d4de12",
"type": "github"
},
"original": {
"owner": "domenkozar",
"ref": "devenv-2.21",
"repo": "nix",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1678875422,
"narHash": "sha256-T3o6NcQPwXjxJMn2shz86Chch4ljXgZn746c2caGxd8=",
"lastModified": 1692808169,
"narHash": "sha256-x9Opq06rIiwdwGeK2Ykj69dNc2IvUH1fY55Wm7atwrE=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "126f49a01de5b7e35a43fd43f891ecf6d3a51459",
"rev": "9201b5ff357e781bf014d0330d18555695df7ba8",
"type": "github"
},
"original": {
@@ -168,45 +314,73 @@
"type": "github"
}
},
"nixpkgs-stable": {
"nixpkgs-regression_2": {
"locked": {
"lastModified": 1685801374,
"narHash": "sha256-otaSUoFEMM+LjBI1XL/xGB5ao6IwnZOXc47qhIgJe8U=",
"lastModified": 1643052045,
"narHash": "sha256-uGJ0VXIhWKGXxkeNnq4TvV3CIOkUJ3PAoLZ3HMzNVMw=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "c37ca420157f4abc31e26f436c1145f8951ff373",
"rev": "215d4d0fd80ca5163643b03a33fde804a29cc1e2",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-23.05",
"repo": "nixpkgs",
"rev": "215d4d0fd80ca5163643b03a33fde804a29cc1e2",
"type": "github"
}
},
"nixpkgs-stable": {
"locked": {
"lastModified": 1710695816,
"narHash": "sha256-3Eh7fhEID17pv9ZxrPwCLfqXnYP006RKzSs0JptsN84=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "614b4613980a522ba49f0d194531beddbb7220d3",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-23.11",
"repo": "nixpkgs",
"type": "github"
}
},
"nixpkgs_2": {
"locked": {
"lastModified": 1690535733,
"narHash": "sha256-WgjUPscQOw3cB8yySDGlyzo6cZNihnRzUwE9kadv/5I=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "8cacc05fbfffeaab910e8c2c9e2a7c6b32ce881a",
"lastModified": 1713361204,
"narHash": "sha256-TA6EDunWTkc5FvDCqU3W2T3SFn0gRZqh6D/hJnM02MM=",
"owner": "cachix",
"repo": "devenv-nixpkgs",
"rev": "285676e87ad9f0ca23d8714a6ab61e7e027020c6",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "master",
"repo": "nixpkgs",
"owner": "cachix",
"ref": "rolling",
"repo": "devenv-nixpkgs",
"type": "github"
}
},
"nixpkgs_3": {
"locked": {
"lastModified": 1681358109,
"narHash": "sha256-eKyxW4OohHQx9Urxi7TQlFBTDWII+F+x2hklDOQPB50=",
"lastModified": 0,
"narHash": "sha256-CyyxvOwFf12I91PBWz43iGT1kjsf5oi6ax7CrvaMyAo=",
"path": "/nix/store/yqy82fn77fy3rv7lpwa9m11w3a2nnqg5-source",
"type": "path"
},
"original": {
"id": "nixpkgs",
"type": "indirect"
}
},
"nixpkgs_4": {
"locked": {
"lastModified": 1718428119,
"narHash": "sha256-WdWDpNaq6u1IPtxtYHHWpl5BmabtpmLnMAx0RdJ/vo8=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "96ba1c52e54e74c3197f4d43026b3f3d92e83ff9",
"rev": "e6cea36f83499eb4e9cd184c8a8e823296b50ad5",
"type": "github"
},
"original": {
@@ -216,26 +390,54 @@
"type": "github"
}
},
"poetry2nix": {
"inputs": {
"flake-utils": "flake-utils",
"nix-github-actions": "nix-github-actions",
"nixpkgs": [
"element-nix-flakes",
"devenv",
"cachix",
"devenv",
"nixpkgs"
]
},
"locked": {
"lastModified": 1692876271,
"narHash": "sha256-IXfZEkI0Mal5y1jr6IRWMqK8GW2/f28xJenZIPQqkY0=",
"owner": "nix-community",
"repo": "poetry2nix",
"rev": "d5006be9c2c2417dafb2e2e5034d83fabd207ee3",
"type": "github"
},
"original": {
"owner": "nix-community",
"repo": "poetry2nix",
"type": "github"
}
},
"pre-commit-hooks": {
"inputs": {
"flake-compat": [
"element-nix-flakes",
"devenv",
"flake-compat"
],
"flake-utils": "flake-utils",
"flake-utils": "flake-utils_2",
"gitignore": "gitignore",
"nixpkgs": [
"element-nix-flakes",
"devenv",
"nixpkgs"
],
"nixpkgs-stable": "nixpkgs-stable"
},
"locked": {
"lastModified": 1688056373,
"narHash": "sha256-2+SDlNRTKsgo3LBRiMUcoEUb6sDViRNQhzJquZ4koOI=",
"lastModified": 1713775815,
"narHash": "sha256-Wu9cdYTnGQQwtT20QQMg7jzkANKQjwBD9iccfGKkfls=",
"owner": "cachix",
"repo": "pre-commit-hooks.nix",
"rev": "5843cf069272d92b60c3ed9e55b7a8989c01d4c7",
"rev": "2ac4dcbf55ed43f3be0bae15e181f08a57af24a4",
"type": "github"
},
"original": {
@@ -246,23 +448,19 @@
},
"root": {
"inputs": {
"devenv": "devenv",
"nixpkgs": "nixpkgs_2",
"rust-overlay": "rust-overlay",
"systems": "systems_3"
"element-nix-flakes": "element-nix-flakes"
}
},
"rust-overlay": {
"inputs": {
"flake-utils": "flake-utils_2",
"nixpkgs": "nixpkgs_3"
"nixpkgs": "nixpkgs_4"
},
"locked": {
"lastModified": 1693966243,
"narHash": "sha256-a2CA1aMIPE67JWSVIGoGtD3EGlFdK9+OlJQs0FOWCKY=",
"lastModified": 1719281921,
"narHash": "sha256-LIBMfhM9pMOlEvBI757GOK5l0R58SRi6YpwfYMbf4yc=",
"owner": "oxalica",
"repo": "rust-overlay",
"rev": "a8b4bb4cbb744baaabc3e69099f352f99164e2c1",
"rev": "b6032d3a404d8a52ecfc8571ff0c26dfbe221d07",
"type": "github"
},
"original": {

230
flake.nix
View File

@@ -39,224 +39,22 @@
{
inputs = {
# Use the master/unstable branch of nixpkgs. Used to fetch the latest
# available versions of packages.
nixpkgs.url = "github:NixOS/nixpkgs/master";
# Output a development shell for x86_64/aarch64 Linux/Darwin (MacOS).
systems.url = "github:nix-systems/default";
# A development environment manager built on Nix. See https://devenv.sh.
devenv.url = "github:cachix/devenv/v0.6.3";
# Rust toolchain.
rust-overlay.url = "github:oxalica/rust-overlay";
# A repository of nix development environment flakes.
element-nix-flakes.url = "github:element-hq/nix-flakes";
};
outputs = { self, nixpkgs, devenv, systems, rust-overlay, ... } @ inputs:
let
forEachSystem = nixpkgs.lib.genAttrs (import systems);
in {
devShells = forEachSystem (system:
let
overlays = [ (import rust-overlay) ];
pkgs = import nixpkgs {
inherit system overlays;
};
in {
# Everything is configured via devenv - a Nix module for creating declarative
# developer environments. See https://devenv.sh/reference/options/ for a list
# of all possible options.
default = devenv.lib.mkShell {
inherit inputs pkgs;
modules = [
{
# Make use of the Starship command prompt when this development environment
# is manually activated (via `nix develop --impure`).
# See https://starship.rs/ for details on the prompt itself.
starship.enable = true;
outputs = { self, element-nix-flakes, ... }:
{
# Use the `composeShell` function provided by nix-flakes
# and specify the projects we'd like dependencies for.
devShells = element-nix-flakes.outputs.composeShell [
"complement"
"synapse"
"sytest"
];
# Configure packages to install.
# Search for package names at https://search.nixos.org/packages?channel=unstable
packages = with pkgs; [
# The rust toolchain and related tools.
# This will install the "default" profile of rust components.
# https://rust-lang.github.io/rustup/concepts/profiles.html
#
# NOTE: We currently need to set the Rust version unnecessarily high
# in order to work around https://github.com/matrix-org/synapse/issues/15939
(rust-bin.stable."1.71.1".default.override {
# Additionally install the "rust-src" extension to allow diving into the
# Rust source code in an IDE (rust-analyzer will also make use of it).
extensions = [ "rust-src" ];
})
# The rust-analyzer language server implementation.
rust-analyzer
# GCC includes a linker; needed for building `ruff`
gcc
# Needed for building `ruff`
gnumake
# Native dependencies for running Synapse.
icu
libffi
libjpeg
libpqxx
libwebp
libxml2
libxslt
sqlite
# Native dependencies for unit tests (SyTest also requires OpenSSL).
openssl
xmlsec
# Native dependencies for running Complement.
olm
# For building the Synapse documentation website.
mdbook
# For releasing Synapse
debian-devscripts # (`dch` for manipulating the Debian changelog)
libnotify # (the release script uses `notify-send` to tell you when CI jobs are done)
];
# Install Python and manage a virtualenv with Poetry.
languages.python.enable = true;
languages.python.poetry.enable = true;
# Automatically activate the poetry virtualenv upon entering the shell.
languages.python.poetry.activate.enable = true;
# Install all extra Python dependencies; this is needed to run the unit
# tests and utilitise all Synapse features.
languages.python.poetry.install.arguments = ["--extras all"];
# Install the 'matrix-synapse' package from the local checkout.
languages.python.poetry.install.installRootPackage = true;
# This is a work-around for NixOS systems. NixOS is special in
# that you can have multiple versions of packages installed at
# once, including your libc linker!
#
# Some binaries built for Linux expect those to be in a certain
# filepath, but that is not the case on NixOS. In that case, we
# force compiling those binaries locally instead.
env.POETRY_INSTALLER_NO_BINARY = "ruff";
# Install dependencies for the additional programming languages
# involved with Synapse development.
#
# * Golang is needed to run the Complement test suite.
# * Perl is needed to run the SyTest test suite.
# * Rust is used for developing and running Synapse.
# It is installed manually with `packages` above.
languages.go.enable = true;
languages.perl.enable = true;
# Postgres is needed to run Synapse with postgres support and
# to run certain unit tests that require postgres.
services.postgres.enable = true;
# On the first invocation of `devenv up`, create a database for
# Synapse to store data in.
services.postgres.initdbArgs = ["--locale=C" "--encoding=UTF8"];
services.postgres.initialDatabases = [
{ name = "synapse"; }
];
# Create a postgres user called 'synapse_user' which has ownership
# over the 'synapse' database.
services.postgres.initialScript = ''
CREATE USER synapse_user;
ALTER DATABASE synapse OWNER TO synapse_user;
'';
# Redis is needed in order to run Synapse in worker mode.
services.redis.enable = true;
# Configure and start Synapse. Before starting Synapse, this shell code:
# * generates a default homeserver.yaml config file if one does not exist, and
# * ensures a directory containing two additional homeserver config files exists;
# one to configure using the development environment's PostgreSQL as the
# database backend and another for enabling Redis support.
process.before = ''
python -m synapse.app.homeserver -c homeserver.yaml --generate-config --server-name=synapse.dev --report-stats=no
mkdir -p homeserver-config-overrides.d
cat > homeserver-config-overrides.d/database.yaml << EOF
## Do not edit this file. This file is generated by flake.nix
database:
name: psycopg2
args:
user: synapse_user
database: synapse
host: $PGHOST
cp_min: 5
cp_max: 10
EOF
cat > homeserver-config-overrides.d/redis.yaml << EOF
## Do not edit this file. This file is generated by flake.nix
redis:
enabled: true
EOF
'';
# Start synapse when `devenv up` is run.
processes.synapse.exec = "poetry run python -m synapse.app.homeserver -c homeserver.yaml -c homeserver-config-overrides.d";
# Define the perl modules we require to run SyTest.
#
# This list was compiled by cross-referencing https://metacpan.org/
# with the modules defined in './cpanfile' and then finding the
# corresponding Nix packages on https://search.nixos.org/packages.
#
# This was done until `./install-deps.pl --dryrun` produced no output.
env.PERL5LIB = "${with pkgs.perl536Packages; makePerlPath [
DBI
ClassMethodModifiers
CryptEd25519
DataDump
DBDPg
DigestHMAC
DigestSHA1
EmailAddressXS
EmailMIME
EmailSimple # required by Email::Mime
EmailMessageID # required by Email::Mime
EmailMIMEContentType # required by Email::Mime
TextUnidecode # required by Email::Mime
ModuleRuntime # required by Email::Mime
EmailMIMEEncodings # required by Email::Mime
FilePath
FileSlurper
Future
GetoptLong
HTTPMessage
IOAsync
IOAsyncSSL
IOSocketSSL
NetSSLeay
JSON
ListUtilsBy
ScalarListUtils
ModulePluggable
NetAsyncHTTP
MetricsAny # required by Net::Async::HTTP
NetAsyncHTTPServer
StructDumb
URI
YAMLLibYAML
]}";
# Clear the LD_LIBRARY_PATH environment variable on shell init.
#
# By default, devenv will set LD_LIBRARY_PATH to point to .devenv/profile/lib. This causes
# issues when we include `gcc` as a dependency to build C libraries, as the version of glibc
# that the development environment's cc compiler uses may differ from that of the system.
#
# When LD_LIBRARY_PATH is set, system tools will attempt to use the development environment's
# libraries. Which, when built against a different glibc version lead, to "version 'GLIBC_X.YY'
# not found" errors.
enterShell = ''
unset LD_LIBRARY_PATH
'';
}
];
};
});
# Use the `setupPackages` function provided by nix-flakes
# in order to make `devenv up` work.
packages = element-nix-flakes.outputs.setupPackages "synapse";
};
}

View File

@@ -96,7 +96,7 @@ module-name = "synapse.synapse_rust"
[tool.poetry]
name = "matrix-synapse"
version = "1.110.0"
version = "1.109.0"
description = "Homeserver for the Matrix decentralised comms protocol"
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
license = "AGPL-3.0-or-later"

View File

@@ -21,7 +21,6 @@
#
import datetime
import logging
from collections import OrderedDict
from types import TracebackType
from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple, Type
@@ -69,10 +68,6 @@ sent_edus_by_type = Counter(
# If the retry interval is larger than this then we enter "catchup" mode
CATCHUP_RETRY_INTERVAL = 60 * 60 * 1000
# Limit how many presence states we add to each presence EDU, to ensure that
# they are bounded in size.
MAX_PRESENCE_STATES_PER_EDU = 50
class PerDestinationQueue:
"""
@@ -149,7 +144,7 @@ class PerDestinationQueue:
# Map of user_id -> UserPresenceState of pending presence to be sent to this
# destination
self._pending_presence: OrderedDict[str, UserPresenceState] = OrderedDict()
self._pending_presence: Dict[str, UserPresenceState] = {}
# List of room_id -> receipt_type -> user_id -> receipt_dict,
#
@@ -404,7 +399,7 @@ class PerDestinationQueue:
# through another mechanism, because this is all volatile!
self._pending_edus = []
self._pending_edus_keyed = {}
self._pending_presence.clear()
self._pending_presence = {}
self._pending_receipt_edus = []
self._start_catching_up()
@@ -726,26 +721,22 @@ class _TransactionQueueManager:
# Add presence EDU.
if self.queue._pending_presence:
# Only send max 50 presence entries in the EDU, to bound the amount
# of data we're sending.
presence_to_add: List[JsonDict] = []
while (
self.queue._pending_presence
and len(presence_to_add) < MAX_PRESENCE_STATES_PER_EDU
):
_, presence = self.queue._pending_presence.popitem(last=False)
presence_to_add.append(
format_user_presence_state(presence, self.queue._clock.time_msec())
)
pending_edus.append(
Edu(
origin=self.queue._server_name,
destination=self.queue._destination,
edu_type=EduTypes.PRESENCE,
content={"push": presence_to_add},
content={
"push": [
format_user_presence_state(
presence, self.queue._clock.time_msec()
)
for presence in self.queue._pending_presence.values()
]
},
)
)
self.queue._pending_presence = {}
# Add read receipt EDUs.
pending_edus.extend(self.queue._get_receipt_edus(force_flush=False, limit=5))

View File

@@ -33,7 +33,6 @@ from synapse.federation.transport.server.federation import (
FEDERATION_SERVLET_CLASSES,
FederationAccountStatusServlet,
FederationUnstableClientKeysClaimServlet,
FederationUnstableMediaDownloadServlet,
)
from synapse.http.server import HttpServer, JsonResource
from synapse.http.servlet import (
@@ -316,13 +315,6 @@ def register_servlets(
):
continue
if servletclass == FederationUnstableMediaDownloadServlet:
if (
not hs.config.server.enable_media_repo
or not hs.config.experimental.msc3916_authenticated_media_enabled
):
continue
servletclass(
hs=hs,
authenticator=authenticator,

View File

@@ -360,29 +360,13 @@ class BaseFederationServlet:
"request"
)
return None
if (
func.__self__.__class__.__name__ # type: ignore
== "FederationUnstableMediaDownloadServlet"
):
response = await func(
origin, content, request, *args, **kwargs
)
else:
response = await func(
origin, content, request.args, *args, **kwargs
)
else:
if (
func.__self__.__class__.__name__ # type: ignore
== "FederationUnstableMediaDownloadServlet"
):
response = await func(
origin, content, request, *args, **kwargs
)
else:
response = await func(
origin, content, request.args, *args, **kwargs
)
else:
response = await func(
origin, content, request.args, *args, **kwargs
)
finally:
# if we used the origin's context as the parent, add a new span using
# the servlet span as a parent, so that we have a link

View File

@@ -44,13 +44,10 @@ from synapse.federation.transport.server._base import (
)
from synapse.http.servlet import (
parse_boolean_from_args,
parse_integer,
parse_integer_from_args,
parse_string_from_args,
parse_strings_from_args,
)
from synapse.http.site import SynapseRequest
from synapse.media._base import DEFAULT_MAX_TIMEOUT_MS, MAXIMUM_ALLOWED_MAX_TIMEOUT_MS
from synapse.types import JsonDict
from synapse.util import SYNAPSE_VERSION
from synapse.util.ratelimitutils import FederationRateLimiter
@@ -790,43 +787,6 @@ class FederationAccountStatusServlet(BaseFederationServerServlet):
return 200, {"account_statuses": statuses, "failures": failures}
class FederationUnstableMediaDownloadServlet(BaseFederationServerServlet):
"""
Implementation of new federation media `/download` endpoint outlined in MSC3916. Returns
a multipart/mixed response consisting of a JSON object and the requested media
item. This endpoint only returns local media.
"""
PATH = "/media/download/(?P<media_id>[^/]*)"
PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.msc3916"
RATELIMIT = True
def __init__(
self,
hs: "HomeServer",
ratelimiter: FederationRateLimiter,
authenticator: Authenticator,
server_name: str,
):
super().__init__(hs, authenticator, ratelimiter, server_name)
self.media_repo = self.hs.get_media_repository()
async def on_GET(
self,
origin: Optional[str],
content: Literal[None],
request: SynapseRequest,
media_id: str,
) -> None:
max_timeout_ms = parse_integer(
request, "timeout_ms", default=DEFAULT_MAX_TIMEOUT_MS
)
max_timeout_ms = min(max_timeout_ms, MAXIMUM_ALLOWED_MAX_TIMEOUT_MS)
await self.media_repo.get_local_media(
request, media_id, None, max_timeout_ms, federation=True
)
FEDERATION_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = (
FederationSendServlet,
FederationEventServlet,
@@ -858,5 +818,4 @@ FEDERATION_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = (
FederationV1SendKnockServlet,
FederationMakeKnockServlet,
FederationAccountStatusServlet,
FederationUnstableMediaDownloadServlet,
)

View File

@@ -25,16 +25,7 @@ import os
import urllib
from abc import ABC, abstractmethod
from types import TracebackType
from typing import (
TYPE_CHECKING,
Awaitable,
Dict,
Generator,
List,
Optional,
Tuple,
Type,
)
from typing import Awaitable, Dict, Generator, List, Optional, Tuple, Type
import attr
@@ -46,13 +37,8 @@ from synapse.api.errors import Codes, cs_error
from synapse.http.server import finish_request, respond_with_json
from synapse.http.site import SynapseRequest
from synapse.logging.context import make_deferred_yieldable
from synapse.util import Clock
from synapse.util.stringutils import is_ascii
if TYPE_CHECKING:
from synapse.storage.databases.main.media_repository import LocalMedia
logger = logging.getLogger(__name__)
# list all text content types that will have the charset default to UTF-8 when
@@ -274,68 +260,6 @@ def _can_encode_filename_as_token(x: str) -> bool:
return True
async def respond_with_multipart_responder(
clock: Clock,
request: SynapseRequest,
responder: "Optional[Responder]",
media_info: "LocalMedia",
) -> None:
"""
Responds to requests originating from the federation media `/download` endpoint by
streaming a multipart/mixed response
Args:
clock:
request: the federation request to respond to
responder: the responder which will send the response
media_info: metadata about the media item
"""
if not responder:
respond_404(request)
return
# If we have a responder we *must* use it as a context manager.
with responder:
if request._disconnected:
logger.warning(
"Not sending response to request %s, already disconnected.", request
)
return
from synapse.media.media_storage import MultipartFileConsumer
# note that currently the json_object is just {}, this will change when linked media
# is implemented
multipart_consumer = MultipartFileConsumer(
clock, request, media_info.media_type, {}, media_info.media_length
)
logger.debug("Responding to media request with responder %s", responder)
if media_info.media_length is not None:
content_length = multipart_consumer.content_length()
assert content_length is not None
request.setHeader(b"Content-Length", b"%d" % (content_length,))
request.setHeader(
b"Content-Type",
b"multipart/mixed; boundary=%s" % multipart_consumer.boundary,
)
try:
await responder.write_to_consumer(multipart_consumer)
except Exception as e:
# The majority of the time this will be due to the client having gone
# away. Unfortunately, Twisted simply throws a generic exception at us
# in that case.
logger.warning("Failed to write to consumer: %s %s", type(e), e)
# Unregister the producer, if it has one, so Twisted doesn't complain
if request.producer:
request.unregisterProducer()
finish_request(request)
async def respond_with_responder(
request: SynapseRequest,
responder: "Optional[Responder]",

View File

@@ -54,7 +54,6 @@ from synapse.media._base import (
ThumbnailInfo,
get_filename_from_headers,
respond_404,
respond_with_multipart_responder,
respond_with_responder,
)
from synapse.media.filepath import MediaFilePaths
@@ -430,7 +429,6 @@ class MediaRepository:
media_id: str,
name: Optional[str],
max_timeout_ms: int,
federation: bool = False,
) -> None:
"""Responds to requests for local media, if exists, or returns 404.
@@ -442,7 +440,6 @@ class MediaRepository:
the filename in the Content-Disposition header of the response.
max_timeout_ms: the maximum number of milliseconds to wait for the
media to be uploaded.
federation: whether the local media being fetched is for a federation request
Returns:
Resolves once a response has successfully been written to request
@@ -463,14 +460,9 @@ class MediaRepository:
file_info = FileInfo(None, media_id, url_cache=bool(url_cache))
responder = await self.media_storage.fetch_media(file_info)
if federation:
await respond_with_multipart_responder(
self.clock, request, responder, media_info
)
else:
await respond_with_responder(
request, responder, media_type, media_length, upload_name
)
await respond_with_responder(
request, responder, media_type, media_length, upload_name
)
async def get_remote_media(
self,

View File

@@ -19,12 +19,9 @@
#
#
import contextlib
import json
import logging
import os
import shutil
from contextlib import closing
from io import BytesIO
from types import TracebackType
from typing import (
IO,
@@ -33,35 +30,24 @@ from typing import (
AsyncIterator,
BinaryIO,
Callable,
List,
Optional,
Sequence,
Tuple,
Type,
Union,
cast,
)
from uuid import uuid4
import attr
from zope.interface import implementer
from twisted.internet import interfaces
from twisted.internet.defer import Deferred
from twisted.internet.interfaces import IConsumer
from twisted.protocols.basic import FileSender
from synapse.api.errors import NotFoundError
from synapse.logging.context import (
defer_to_thread,
make_deferred_yieldable,
run_in_background,
)
from synapse.logging.context import defer_to_thread, make_deferred_yieldable
from synapse.logging.opentracing import start_active_span, trace, trace_with_opname
from synapse.util import Clock
from synapse.util.file_consumer import BackgroundFileConsumer
from ..types import JsonDict
from ._base import FileInfo, Responder
from .filepath import MediaFilePaths
@@ -71,8 +57,6 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
CRLF = b"\r\n"
class MediaStorage:
"""Responsible for storing/fetching files from local sources.
@@ -190,7 +174,7 @@ class MediaStorage:
and configured storage providers.
Args:
file_info: Metadata about the media file
file_info
Returns:
Returns a Responder if the file was found, otherwise None.
@@ -332,7 +316,7 @@ class FileResponder(Responder):
"""Wraps an open file that can be sent to a request.
Args:
open_file: A file like object to be streamed to the client,
open_file: A file like object to be streamed ot the client,
is closed when finished streaming.
"""
@@ -386,240 +370,3 @@ class ReadableFileWrapper:
# We yield to the reactor by sleeping for 0 seconds.
await self.clock.sleep(0)
@implementer(interfaces.IConsumer)
@implementer(interfaces.IPushProducer)
class MultipartFileConsumer:
"""Wraps a given consumer so that any data that gets written to it gets
converted to a multipart format.
"""
def __init__(
self,
clock: Clock,
wrapped_consumer: interfaces.IConsumer,
file_content_type: str,
json_object: JsonDict,
content_length: Optional[int] = None,
) -> None:
self.clock = clock
self.wrapped_consumer = wrapped_consumer
self.json_field = json_object
self.json_field_written = False
self.content_type_written = False
self.file_content_type = file_content_type
self.boundary = uuid4().hex.encode("ascii")
# The producer that registered with us, and if it's a push or pull
# producer.
self.producer: Optional["interfaces.IProducer"] = None
self.streaming: Optional[bool] = None
# Whether the wrapped consumer has asked us to pause.
self.paused = False
self.length = content_length
### IConsumer APIs ###
def registerProducer(
self, producer: "interfaces.IProducer", streaming: bool
) -> None:
"""
Register to receive data from a producer.
This sets self to be a consumer for a producer. When this object runs
out of data (as when a send(2) call on a socket succeeds in moving the
last data from a userspace buffer into a kernelspace buffer), it will
ask the producer to resumeProducing().
For L{IPullProducer} providers, C{resumeProducing} will be called once
each time data is required.
For L{IPushProducer} providers, C{pauseProducing} will be called
whenever the write buffer fills up and C{resumeProducing} will only be
called when it empties. The consumer will only call C{resumeProducing}
to balance a previous C{pauseProducing} call; the producer is assumed
to start in an un-paused state.
@param streaming: C{True} if C{producer} provides L{IPushProducer},
C{False} if C{producer} provides L{IPullProducer}.
@raise RuntimeError: If a producer is already registered.
"""
self.producer = producer
self.streaming = streaming
self.wrapped_consumer.registerProducer(self, True)
# kick off producing if `self.producer` is not a streaming producer
if not streaming:
self.resumeProducing()
def unregisterProducer(self) -> None:
"""
Stop consuming data from a producer, without disconnecting.
"""
self.wrapped_consumer.write(CRLF + b"--" + self.boundary + b"--" + CRLF)
self.wrapped_consumer.unregisterProducer()
self.paused = True
def write(self, data: bytes) -> None:
"""
The producer will write data by calling this method.
The implementation must be non-blocking and perform whatever
buffering is necessary. If the producer has provided enough data
for now and it is a L{IPushProducer}, the consumer may call its
C{pauseProducing} method.
"""
if not self.json_field_written:
self.wrapped_consumer.write(CRLF + b"--" + self.boundary + CRLF)
content_type = Header(b"Content-Type", b"application/json")
self.wrapped_consumer.write(bytes(content_type) + CRLF)
json_field = json.dumps(self.json_field)
json_bytes = json_field.encode("utf-8")
self.wrapped_consumer.write(CRLF + json_bytes)
self.wrapped_consumer.write(CRLF + b"--" + self.boundary + CRLF)
self.json_field_written = True
# if we haven't written the content type yet, do so
if not self.content_type_written:
type = self.file_content_type.encode("utf-8")
content_type = Header(b"Content-Type", type)
self.wrapped_consumer.write(bytes(content_type) + CRLF + CRLF)
self.content_type_written = True
self.wrapped_consumer.write(data)
### IPushProducer APIs ###
def stopProducing(self) -> None:
"""
Stop producing data.
This tells a producer that its consumer has died, so it must stop
producing data for good.
"""
assert self.producer is not None
self.paused = True
self.producer.stopProducing()
def pauseProducing(self) -> None:
"""
Pause producing data.
Tells a producer that it has produced too much data to process for
the time being, and to stop until C{resumeProducing()} is called.
"""
assert self.producer is not None
self.paused = True
if self.streaming:
cast("interfaces.IPushProducer", self.producer).pauseProducing()
else:
self.paused = True
def resumeProducing(self) -> None:
"""
Resume producing data.
This tells a producer to re-add itself to the main loop and produce
more data for its consumer.
"""
assert self.producer is not None
if self.streaming:
cast("interfaces.IPushProducer", self.producer).resumeProducing()
else:
# If the producer is not a streaming producer we need to start
# repeatedly calling `resumeProducing` in a loop.
run_in_background(self._resumeProducingRepeatedly)
def content_length(self) -> Optional[int]:
"""
Calculate the content length of the multipart response
in bytes.
"""
if not self.length:
return None
# calculate length of json field and content-type header
json_field = json.dumps(self.json_field)
json_bytes = json_field.encode("utf-8")
json_length = len(json_bytes)
type = self.file_content_type.encode("utf-8")
content_type = Header(b"Content-Type", type)
type_length = len(bytes(content_type))
# 154 is the length of the elements that aren't variable, ie
# CRLFs and boundary strings, etc
self.length += json_length + type_length + 154
return self.length
### Internal APIs. ###
async def _resumeProducingRepeatedly(self) -> None:
assert self.producer is not None
assert not self.streaming
producer = cast("interfaces.IPullProducer", self.producer)
self.paused = False
while not self.paused:
producer.resumeProducing()
await self.clock.sleep(0)
class Header:
"""
`Header` This class is a tiny wrapper that produces
request headers. We can't use standard python header
class because it encodes unicode fields using =? bla bla ?=
encoding, which is correct, but no one in HTTP world expects
that, everyone wants utf-8 raw bytes. (stolen from treq.multipart)
"""
def __init__(
self,
name: bytes,
value: Any,
params: Optional[List[Tuple[Any, Any]]] = None,
):
self.name = name
self.value = value
self.params = params or []
def add_param(self, name: Any, value: Any) -> None:
self.params.append((name, value))
def __bytes__(self) -> bytes:
with closing(BytesIO()) as h:
h.write(self.name + b": " + escape(self.value).encode("us-ascii"))
if self.params:
for name, val in self.params:
h.write(b"; ")
h.write(escape(name).encode("us-ascii"))
h.write(b"=")
h.write(b'"' + escape(val).encode("utf-8") + b'"')
h.seek(0)
return h.read()
def escape(value: Union[str, bytes]) -> str:
"""
This function prevents header values from corrupting the request,
a newline in the file name parameter makes form-data request unreadable
for a majority of parsers. (stolen from treq.multipart)
"""
if isinstance(value, bytes):
value = value.decode("utf-8")
return value.replace("\r", "").replace("\n", "").replace('"', '\\"')

View File

@@ -764,13 +764,6 @@ class Notifier:
async def wait_for_stream_token(self, stream_token: StreamToken) -> bool:
"""Wait for this worker to catch up with the given stream token."""
current_token = self.event_sources.get_current_token()
if stream_token.is_before_or_eq(current_token):
return True
# Work around a bug where older Synapse versions gave out tokens "from
# the future", i.e. that are ahead of the tokens persisted in the DB.
stream_token = await self.event_sources.bound_future_token(stream_token)
start = self.clock.time_msec()
while True:

View File

@@ -43,7 +43,10 @@ from synapse.storage.database import (
)
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.databases.main.push_rule import PushRulesWorkerStore
from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.storage.util.id_generators import (
AbstractStreamIdGenerator,
MultiWriterIdGenerator,
)
from synapse.types import JsonDict, JsonMapping
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
@@ -68,7 +71,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
self._instance_name in hs.config.worker.writers.account_data
)
self._account_data_id_gen: MultiWriterIdGenerator
self._account_data_id_gen: AbstractStreamIdGenerator
self._account_data_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
@@ -110,9 +113,6 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
"""
return self._account_data_id_gen.get_current_token()
def get_account_data_id_generator(self) -> MultiWriterIdGenerator:
return self._account_data_id_gen
@cached()
async def get_global_account_data_for_user(
self, user_id: str

View File

@@ -50,7 +50,10 @@ from synapse.storage.database import (
LoggingTransaction,
make_in_list_sql_clause,
)
from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.storage.util.id_generators import (
AbstractStreamIdGenerator,
MultiWriterIdGenerator,
)
from synapse.types import JsonDict
from synapse.util import json_encoder
from synapse.util.caches.expiringcache import ExpiringCache
@@ -89,7 +92,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
self._instance_name in hs.config.worker.writers.to_device
)
self._to_device_msg_id_gen: MultiWriterIdGenerator = MultiWriterIdGenerator(
self._to_device_msg_id_gen: AbstractStreamIdGenerator = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
@@ -166,9 +169,6 @@ class DeviceInboxWorkerStore(SQLBaseStore):
def get_to_device_stream_token(self) -> int:
return self._to_device_msg_id_gen.get_current_token()
def get_to_device_id_generator(self) -> MultiWriterIdGenerator:
return self._to_device_msg_id_gen
async def get_messages_for_user_devices(
self,
user_ids: Collection[str],

View File

@@ -243,9 +243,6 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
def get_device_stream_token(self) -> int:
return self._device_list_id_gen.get_current_token()
def get_device_stream_id_generator(self) -> MultiWriterIdGenerator:
return self._device_list_id_gen
async def count_devices_by_users(
self, user_ids: Optional[Collection[str]] = None
) -> int:
@@ -2134,7 +2131,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
user_id: str,
device_id: str,
hosts: Collection[str],
stream_id: int,
stream_ids: List[int],
context: Optional[Dict[str, str]],
) -> None:
if self._device_list_federation_stream_cache:
@@ -2142,10 +2139,11 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
txn.call_after(
self._device_list_federation_stream_cache.entity_has_changed,
host,
stream_id,
stream_ids[-1],
)
now = self._clock.time_msec()
stream_id_iterator = iter(stream_ids)
encoded_context = json_encoder.encode(context)
mark_sent = not self.hs.is_mine_id(user_id)
@@ -2154,7 +2152,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
(
destination,
self._instance_name,
stream_id,
next(stream_id_iterator),
user_id,
device_id,
mark_sent,
@@ -2339,22 +2337,22 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
return
def add_device_list_outbound_pokes_txn(
txn: LoggingTransaction, stream_id: int
txn: LoggingTransaction, stream_ids: List[int]
) -> None:
self._add_device_outbound_poke_to_stream_txn(
txn,
user_id=user_id,
device_id=device_id,
hosts=hosts,
stream_id=stream_id,
stream_ids=stream_ids,
context=context,
)
async with self._device_list_id_gen.get_next() as stream_id:
async with self._device_list_id_gen.get_next_mult(len(hosts)) as stream_ids:
return await self.db_pool.runInteraction(
"add_device_list_outbound_pokes",
add_device_list_outbound_pokes_txn,
stream_id,
stream_ids,
)
async def add_remote_device_list_to_pending(

View File

@@ -192,8 +192,8 @@ class EventsWorkerStore(SQLBaseStore):
):
super().__init__(database, db_conn, hs)
self._stream_id_gen: MultiWriterIdGenerator
self._backfill_id_gen: MultiWriterIdGenerator
self._stream_id_gen: AbstractStreamIdGenerator
self._backfill_id_gen: AbstractStreamIdGenerator
self._stream_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,

View File

@@ -42,7 +42,10 @@ from synapse.storage.database import (
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.engines._base import IsolationLevel
from synapse.storage.types import Connection
from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.storage.util.id_generators import (
AbstractStreamIdGenerator,
MultiWriterIdGenerator,
)
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.iterutils import batch_iter
@@ -80,7 +83,7 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore)
super().__init__(database, db_conn, hs)
self._instance_name = hs.get_instance_name()
self._presence_id_gen: MultiWriterIdGenerator
self._presence_id_gen: AbstractStreamIdGenerator
self._can_persist_presence = (
self._instance_name in hs.config.worker.writers.presence
@@ -452,9 +455,6 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore)
def get_current_presence_token(self) -> int:
return self._presence_id_gen.get_current_token()
def get_presence_stream_id_gen(self) -> MultiWriterIdGenerator:
return self._presence_id_gen
def _get_active_presence(self, db_conn: Connection) -> List[UserPresenceState]:
"""Fetch non-offline presence from the database so that we can register
the appropriate time outs.

View File

@@ -178,9 +178,6 @@ class PushRulesWorkerStore(
"""
return self._push_rules_stream_id_gen.get_current_token()
def get_push_rules_stream_id_gen(self) -> MultiWriterIdGenerator:
return self._push_rules_stream_id_gen
def process_replication_rows(
self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
) -> None:

View File

@@ -45,7 +45,10 @@ from synapse.storage.database import (
LoggingTransaction,
)
from synapse.storage.engines._base import IsolationLevel
from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.storage.util.id_generators import (
AbstractStreamIdGenerator,
MultiWriterIdGenerator,
)
from synapse.types import (
JsonDict,
JsonMapping,
@@ -73,7 +76,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
# In the worker store this is an ID tracker which we overwrite in the non-worker
# class below that is used on the main process.
self._receipts_id_gen: MultiWriterIdGenerator
self._receipts_id_gen: AbstractStreamIdGenerator
self._can_write_to_receipts = (
self._instance_name in hs.config.worker.writers.receipts
@@ -133,9 +136,6 @@ class ReceiptsWorkerStore(SQLBaseStore):
def get_receipt_stream_id_for_instance(self, instance_name: str) -> int:
return self._receipts_id_gen.get_current_token_for_writer(instance_name)
def get_receipts_stream_id_gen(self) -> MultiWriterIdGenerator:
return self._receipts_id_gen
def get_last_unthreaded_receipt_for_user_txn(
self,
txn: LoggingTransaction,

View File

@@ -59,7 +59,11 @@ from synapse.storage.database import (
)
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.types import Cursor
from synapse.storage.util.id_generators import IdGenerator, MultiWriterIdGenerator
from synapse.storage.util.id_generators import (
AbstractStreamIdGenerator,
IdGenerator,
MultiWriterIdGenerator,
)
from synapse.types import JsonDict, RetentionPolicy, StrCollection, ThirdPartyInstanceID
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached, cachedList
@@ -147,7 +151,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
self.config: HomeServerConfig = hs.config
self._un_partial_stated_rooms_stream_id_gen: MultiWriterIdGenerator
self._un_partial_stated_rooms_stream_id_gen: AbstractStreamIdGenerator
self._un_partial_stated_rooms_stream_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
@@ -1405,9 +1409,6 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
instance_name
)
def get_un_partial_stated_rooms_id_generator(self) -> MultiWriterIdGenerator:
return self._un_partial_stated_rooms_stream_id_gen
async def get_un_partial_stated_rooms_between(
self, last_id: int, current_id: int, room_ids: Collection[str]
) -> Set[str]:

View File

@@ -577,9 +577,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
return RoomStreamToken(stream=min_pos, instance_map=immutabledict(positions))
def get_events_stream_id_generator(self) -> MultiWriterIdGenerator:
return self._stream_id_gen
async def get_room_events_stream_for_rooms(
self,
room_ids: Collection[str],

View File

@@ -812,11 +812,6 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
pos = self.get_current_token_for_writer(self._instance_name)
txn.execute(sql, (self._stream_name, self._instance_name, pos))
async def get_max_allocated_token(self) -> int:
return await self._db.runInteraction(
"get_max_allocated_token", self._sequence_gen.get_max_allocated
)
@attr.s(frozen=True, auto_attribs=True)
class _AsyncCtxManagerWrapper(Generic[T]):

View File

@@ -88,10 +88,6 @@ class SequenceGenerator(metaclass=abc.ABCMeta):
"""
...
@abc.abstractmethod
def get_max_allocated(self, txn: Cursor) -> int:
"""Get the maximum ID that we have allocated"""
class PostgresSequenceGenerator(SequenceGenerator):
"""An implementation of SequenceGenerator which uses a postgres sequence"""
@@ -194,17 +190,6 @@ class PostgresSequenceGenerator(SequenceGenerator):
% {"seq": self._sequence_name, "stream_name": stream_name}
)
def get_max_allocated(self, txn: Cursor) -> int:
# We just read from the sequence what the last value we fetched was.
txn.execute(f"SELECT last_value, is_called FROM {self._sequence_name}")
row = txn.fetchone()
assert row is not None
last_value, is_called = row
if not is_called:
last_value -= 1
return last_value
GetFirstCallbackType = Callable[[Cursor], int]
@@ -263,15 +248,6 @@ class LocalSequenceGenerator(SequenceGenerator):
# There is nothing to do for in memory sequences
pass
def get_max_allocated(self, txn: Cursor) -> int:
with self._lock:
if self._current_max_id is None:
assert self._callback is not None
self._current_max_id = self._callback(txn)
self._callback = None
return self._current_max_id
def build_sequence_generator(
db_conn: "LoggingDatabaseConnection",

View File

@@ -19,7 +19,6 @@
#
#
import logging
from typing import TYPE_CHECKING, Sequence, Tuple
import attr
@@ -31,20 +30,12 @@ from synapse.handlers.room import RoomEventSource
from synapse.handlers.typing import TypingNotificationEventSource
from synapse.logging.opentracing import trace
from synapse.streams import EventSource
from synapse.types import (
AbstractMultiWriterStreamToken,
MultiWriterStreamToken,
StreamKeyType,
StreamToken,
)
from synapse.types import MultiWriterStreamToken, StreamKeyType, StreamToken
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
@attr.s(frozen=True, slots=True, auto_attribs=True)
class _EventSourcesInner:
room: RoomEventSource
@@ -100,77 +91,6 @@ class EventSources:
)
return token
async def bound_future_token(self, token: StreamToken) -> StreamToken:
"""Bound a token that is ahead of the current token to the maximum
persisted values.
This ensures that if we wait for the given token we know the stream will
eventually advance to that point.
This works around a bug where older Synapse versions will give out
tokens for streams, and then after a restart will give back tokens where
the stream has "gone backwards".
"""
current_token = self.get_current_token()
stream_key_to_id_gen = {
StreamKeyType.ROOM: self.store.get_events_stream_id_generator(),
StreamKeyType.PRESENCE: self.store.get_presence_stream_id_gen(),
StreamKeyType.RECEIPT: self.store.get_receipts_stream_id_gen(),
StreamKeyType.ACCOUNT_DATA: self.store.get_account_data_id_generator(),
StreamKeyType.PUSH_RULES: self.store.get_push_rules_stream_id_gen(),
StreamKeyType.TO_DEVICE: self.store.get_to_device_id_generator(),
StreamKeyType.DEVICE_LIST: self.store.get_device_stream_id_generator(),
StreamKeyType.UN_PARTIAL_STATED_ROOMS: self.store.get_un_partial_stated_rooms_id_generator(),
}
for _, key in StreamKeyType.__members__.items():
if key == StreamKeyType.TYPING:
# Typing stream is allowed to "reset", and so comparisons don't
# really make sense as is.
# TODO: Figure out a better way of tracking resets.
continue
token_value = token.get_field(key)
current_value = current_token.get_field(key)
if isinstance(token_value, AbstractMultiWriterStreamToken):
assert type(current_value) is type(token_value)
if not token_value.is_before_or_eq(current_value): # type: ignore[arg-type]
max_token = await stream_key_to_id_gen[
key
].get_max_allocated_token()
if max_token < token_value.get_max_stream_pos():
logger.error(
"Bounding token from the future '%s': token: %s, bound: %s",
key,
token_value,
max_token,
)
token = token.copy_and_replace(
key, token_value.bound_stream_token(max_token)
)
else:
assert isinstance(current_value, int)
if current_value < token_value:
max_token = await stream_key_to_id_gen[
key
].get_max_allocated_token()
if max_token < token_value:
logger.error(
"Bounding token from the future '%s': token: %s, bound: %s",
key,
token_value,
max_token,
)
token = token.copy_and_replace(key, max_token)
return token
@trace
async def get_start_token_for_pagination(self, room_id: str) -> StreamToken:
"""Get the start token for a given room to be used to paginate

View File

@@ -536,16 +536,6 @@ class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta):
return True
def bound_stream_token(self, max_stream: int) -> "Self":
"""Bound the stream positions to a maximum value"""
return type(self)(
stream=min(self.stream, max_stream),
instance_map=immutabledict(
{k: min(s, max_stream) for k, s in self.instance_map.items()}
),
)
@attr.s(frozen=True, slots=True, order=False)
class RoomStreamToken(AbstractMultiWriterStreamToken):
@@ -732,14 +722,6 @@ class RoomStreamToken(AbstractMultiWriterStreamToken):
else:
return "s%d" % (self.stream,)
def bound_stream_token(self, max_stream: int) -> "RoomStreamToken":
"""See super class"""
# This only makes sense for stream tokens.
assert self.topological is None
return super().bound_stream_token(max_stream)
@attr.s(frozen=True, slots=True, order=False)
class MultiWriterStreamToken(AbstractMultiWriterStreamToken):

View File

@@ -1,173 +0,0 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2024 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Originally licensed under the Apache License, Version 2.0:
# <http://www.apache.org/licenses/LICENSE-2.0>.
#
# [This file includes modifications made by New Vector Limited]
#
#
import io
import os
import shutil
import tempfile
from twisted.test.proto_helpers import MemoryReactor
from synapse.media.filepath import MediaFilePaths
from synapse.media.media_storage import MediaStorage
from synapse.media.storage_provider import (
FileStorageProviderBackend,
StorageProviderWrapper,
)
from synapse.server import HomeServer
from synapse.types import UserID
from synapse.util import Clock
from tests import unittest
from tests.test_utils import SMALL_PNG
from tests.unittest import override_config
class FederationUnstableMediaDownloadsTest(unittest.FederatingHomeserverTestCase):
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
super().prepare(reactor, clock, hs)
self.test_dir = tempfile.mkdtemp(prefix="synapse-tests-")
self.addCleanup(shutil.rmtree, self.test_dir)
self.primary_base_path = os.path.join(self.test_dir, "primary")
self.secondary_base_path = os.path.join(self.test_dir, "secondary")
hs.config.media.media_store_path = self.primary_base_path
storage_providers = [
StorageProviderWrapper(
FileStorageProviderBackend(hs, self.secondary_base_path),
store_local=True,
store_remote=False,
store_synchronous=True,
)
]
self.filepaths = MediaFilePaths(self.primary_base_path)
self.media_storage = MediaStorage(
hs, self.primary_base_path, self.filepaths, storage_providers
)
self.media_repo = hs.get_media_repository()
@override_config(
{"experimental_features": {"msc3916_authenticated_media_enabled": True}}
)
def test_file_download(self) -> None:
content = io.BytesIO(b"file_to_stream")
content_uri = self.get_success(
self.media_repo.create_content(
"text/plain",
"test_upload",
content,
46,
UserID.from_string("@user_id:whatever.org"),
)
)
# test with a text file
channel = self.make_signed_federation_request(
"GET",
f"/_matrix/federation/unstable/org.matrix.msc3916/media/download/{content_uri.media_id}",
)
self.pump()
self.assertEqual(200, channel.code)
content_type = channel.headers.getRawHeaders("content-type")
assert content_type is not None
assert "multipart/mixed" in content_type[0]
assert "boundary" in content_type[0]
# extract boundary
boundary = content_type[0].split("boundary=")[1]
# split on boundary and check that json field and expected value exist
stripped = channel.text_body.split("\r\n" + "--" + boundary)
# TODO: the json object expected will change once MSC3911 is implemented, currently
# {} is returned for all requests as a placeholder (per MSC3196)
found_json = any(
"\r\nContent-Type: application/json\r\n\r\n{}" in field
for field in stripped
)
self.assertTrue(found_json)
# check that the text file and expected value exist
found_file = any(
"\r\nContent-Type: text/plain\r\n\r\nfile_to_stream" in field
for field in stripped
)
self.assertTrue(found_file)
content = io.BytesIO(SMALL_PNG)
content_uri = self.get_success(
self.media_repo.create_content(
"image/png",
"test_png_upload",
content,
67,
UserID.from_string("@user_id:whatever.org"),
)
)
# test with an image file
channel = self.make_signed_federation_request(
"GET",
f"/_matrix/federation/unstable/org.matrix.msc3916/media/download/{content_uri.media_id}",
)
self.pump()
self.assertEqual(200, channel.code)
content_type = channel.headers.getRawHeaders("content-type")
assert content_type is not None
assert "multipart/mixed" in content_type[0]
assert "boundary" in content_type[0]
# extract boundary
boundary = content_type[0].split("boundary=")[1]
# split on boundary and check that json field and expected value exist
body = channel.result.get("body")
assert body is not None
stripped_bytes = body.split(b"\r\n" + b"--" + boundary.encode("utf-8"))
found_json = any(
b"\r\nContent-Type: application/json\r\n\r\n{}" in field
for field in stripped_bytes
)
self.assertTrue(found_json)
# check that the png file exists and matches what was uploaded
found_file = any(SMALL_PNG in field for field in stripped_bytes)
self.assertTrue(found_file)
@override_config(
{"experimental_features": {"msc3916_authenticated_media_enabled": False}}
)
def test_disable_config(self) -> None:
content = io.BytesIO(b"file_to_stream")
content_uri = self.get_success(
self.media_repo.create_content(
"text/plain",
"test_upload",
content,
46,
UserID.from_string("@user_id:whatever.org"),
)
)
channel = self.make_signed_federation_request(
"GET",
f"/_matrix/federation/unstable/org.matrix.msc3916/media/download/{content_uri.media_id}",
)
self.pump()
self.assertEqual(404, channel.code)
self.assertEqual(channel.json_body.get("errcode"), "M_UNRECOGNIZED")

View File

@@ -27,8 +27,6 @@ from twisted.internet import defer
from twisted.test.proto_helpers import MemoryReactor
from synapse.api.constants import EduTypes, RoomEncryptionAlgorithms
from synapse.api.presence import UserPresenceState
from synapse.federation.sender.per_destination_queue import MAX_PRESENCE_STATES_PER_EDU
from synapse.federation.units import Transaction
from synapse.handlers.device import DeviceHandler
from synapse.rest import admin
@@ -268,123 +266,6 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase):
)
class FederationSenderPresenceTestCases(HomeserverTestCase):
"""
Test federation sending for presence updates.
"""
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
self.federation_transport_client = Mock(spec=["send_transaction"])
self.federation_transport_client.send_transaction = AsyncMock()
hs = self.setup_test_homeserver(
federation_transport_client=self.federation_transport_client,
)
return hs
def default_config(self) -> JsonDict:
config = super().default_config()
config["federation_sender_instances"] = None
return config
def test_presence_simple(self) -> None:
"Test that sending a single presence update works"
mock_send_transaction: AsyncMock = (
self.federation_transport_client.send_transaction
)
mock_send_transaction.return_value = {}
sender = self.hs.get_federation_sender()
self.get_success(
sender.send_presence_to_destinations(
[UserPresenceState.default("@user:test")],
["server"],
)
)
self.pump()
# expect a call to send_transaction
mock_send_transaction.assert_awaited_once()
json_cb = mock_send_transaction.call_args[0][1]
data = json_cb()
self.assertEqual(
data["edus"],
[
{
"edu_type": EduTypes.PRESENCE,
"content": {
"push": [
{
"presence": "offline",
"user_id": "@user:test",
}
]
},
}
],
)
def test_presence_batched(self) -> None:
"""Test that sending lots of presence updates to a destination are
batched, rather than having them all sent in one EDU."""
mock_send_transaction: AsyncMock = (
self.federation_transport_client.send_transaction
)
mock_send_transaction.return_value = {}
sender = self.hs.get_federation_sender()
# We now send lots of presence updates to force the federation sender to
# batch the mup.
number_presence_updates_to_send = MAX_PRESENCE_STATES_PER_EDU * 2
self.get_success(
sender.send_presence_to_destinations(
[
UserPresenceState.default(f"@user{i}:test")
for i in range(number_presence_updates_to_send)
],
["server"],
)
)
self.pump()
# We should have seen at least one transcation be sent by now.
mock_send_transaction.assert_called()
# We don't want to specify exactly how the presence EDUs get sent out,
# could be one per transaction or multiple per transaction. We just want
# to assert that a) each presence EDU has bounded number of updates, and
# b) that all updates get sent out.
presence_edus = []
for transaction_call in mock_send_transaction.call_args_list:
json_cb = transaction_call[0][1]
data = json_cb()
for edu in data["edus"]:
self.assertEqual(edu.get("edu_type"), EduTypes.PRESENCE)
presence_edus.append(edu)
# A set of all user presence we see, this should end up matching the
# number we sent out above.
seen_users: Set[str] = set()
for edu in presence_edus:
presence_states = edu["content"]["push"]
# This is where we actually check that the number of presence
# updates is bounded.
self.assertLessEqual(len(presence_states), MAX_PRESENCE_STATES_PER_EDU)
seen_users.update(p["user_id"] for p in presence_states)
self.assertEqual(len(seen_users), number_presence_updates_to_send)
class FederationSenderDevicesTestCases(HomeserverTestCase):
"""
Test federation sending to update devices.

View File

@@ -22,7 +22,6 @@ from unittest.mock import AsyncMock, Mock, patch
from parameterized import parameterized
from twisted.internet import defer
from twisted.test.proto_helpers import MemoryReactor
from synapse.api.constants import AccountDataTypes, EventTypes, JoinRules
@@ -36,14 +35,7 @@ from synapse.handlers.sync import SyncConfig, SyncRequestKey, SyncResult, SyncVe
from synapse.rest import admin
from synapse.rest.client import knock, login, room
from synapse.server import HomeServer
from synapse.types import (
JsonDict,
MultiWriterStreamToken,
RoomStreamToken,
StreamKeyType,
UserID,
create_requester,
)
from synapse.types import JsonDict, UserID, create_requester
from synapse.util import Clock
import tests.unittest
@@ -967,94 +959,6 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.fail("No push rules found")
def test_wait_for_future_sync_token(self) -> None:
"""Test that if we receive a token that is ahead of our current token,
we'll wait until the stream position advances.
This can happen if replication streams start lagging, and the client's
previous sync request was serviced by a worker ahead of ours.
"""
user = self.register_user("alice", "password")
# We simulate a lagging stream by getting a stream ID from the ID gen
# and then waiting to mark it as "persisted".
presence_id_gen = self.store.get_presence_stream_id_gen()
ctx_mgr = presence_id_gen.get_next()
stream_id = self.get_success(ctx_mgr.__aenter__())
# Create the new token based on the stream ID above.
current_token = self.hs.get_event_sources().get_current_token()
since_token = current_token.copy_and_advance(StreamKeyType.PRESENCE, stream_id)
sync_d = defer.ensureDeferred(
self.sync_handler.wait_for_sync_for_user(
create_requester(user),
generate_sync_config(user),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
since_token=since_token,
timeout=0,
)
)
# This should block waiting for the presence stream to update
self.pump()
self.assertFalse(sync_d.called)
# Marking the stream ID as persisted should unblock the request.
self.get_success(ctx_mgr.__aexit__(None, None, None))
self.get_success(sync_d, by=1.0)
@parameterized.expand(
[(key,) for key in StreamKeyType.__members__.values()],
name_func=lambda func, _, param: f"{func.__name__}_{param.args[0].name}",
)
def test_wait_for_invalid_future_sync_token(
self, stream_key: StreamKeyType
) -> None:
"""Like the previous test, except we give a token that has a stream
position ahead of what is in the DB, i.e. its invalid and we shouldn't
wait for the stream to advance (as it may never do so).
This can happen due to older versions of Synapse giving out stream
positions without persisting them in the DB, and so on restart the
stream would get reset back to an older position.
"""
user = self.register_user("alice", "password")
# Create a token and advance one of the streams.
current_token = self.hs.get_event_sources().get_current_token()
token_value = current_token.get_field(stream_key)
# How we advance the streams depends on the type.
if isinstance(token_value, int):
since_token = current_token.copy_and_advance(stream_key, token_value + 1)
elif isinstance(token_value, MultiWriterStreamToken):
since_token = current_token.copy_and_advance(
stream_key, MultiWriterStreamToken(stream=token_value.stream + 1)
)
elif isinstance(token_value, RoomStreamToken):
since_token = current_token.copy_and_advance(
stream_key, RoomStreamToken(stream=token_value.stream + 1)
)
else:
raise Exception("Unreachable")
sync_d = defer.ensureDeferred(
self.sync_handler.wait_for_sync_for_user(
create_requester(user),
generate_sync_config(user),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
since_token=since_token,
timeout=0,
)
)
# We should return without waiting for the presence stream to advance.
self.get_success(sync_d)
def generate_sync_config(
user_id: str,

View File

@@ -1386,12 +1386,10 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
# Create a future token that will cause us to wait. Since we never send a new
# event to reach that future stream_ordering, the worker will wait until the
# full timeout.
stream_id_gen = self.store.get_events_stream_id_generator()
stream_id = self.get_success(stream_id_gen.get_next().__aenter__())
current_token = self.event_sources.get_current_token()
future_position_token = current_token.copy_and_replace(
StreamKeyType.ROOM,
RoomStreamToken(stream=stream_id),
RoomStreamToken(stream=current_token.room_key.stream + 1),
)
future_position_token_serialized = self.get_success(