1
0

Compare commits

..

5 Commits

Author SHA1 Message Date
Olivier Wilkinson (reivilibre) 3933bb17de Fix running background tasks on BG worker 2022-03-18 13:53:55 +00:00
Olivier Wilkinson (reivilibre) 70ee28f386 WORKING workers setup template 2022-03-17 10:42:28 +00:00
Olivier Wilkinson (reivilibre) f5bb3887c8 STASH 2022-03-16 14:02:58 +00:00
Olivier Wilkinson (reivilibre) 85774e17be STASH 2022-03-11 11:36:47 +00:00
Olivier Wilkinson (reivilibre) a0e699b301 STASH 2022-02-24 17:10:23 +00:00
203 changed files with 3626 additions and 3875 deletions
+3 -10
View File
@@ -10,19 +10,12 @@ concurrency:
cancel-in-progress: true
jobs:
check-sampleconfig:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/setup-python@v2
- run: pip install -e .
- run: scripts-dev/generate_sample_config --check
lint:
runs-on: ubuntu-latest
strategy:
matrix:
toxenv:
- "check-sampleconfig"
- "check_codestyle"
- "check_isort"
- "mypy"
@@ -50,7 +43,7 @@ jobs:
ref: ${{ github.event.pull_request.head.sha }}
fetch-depth: 0
- uses: actions/setup-python@v2
- run: "pip install 'towncrier>=18.6.0rc1'"
- run: pip install tox
- run: scripts-dev/check-newsfragment
env:
PULL_REQUEST_NUMBER: ${{ github.event.number }}
@@ -58,7 +51,7 @@ jobs:
# Dummy step to gate other tests on without repeating the whole list
linting-done:
if: ${{ !cancelled() }} # Run this even if prior jobs were skipped
needs: [lint, lint-crlf, lint-newsfile, check-sampleconfig]
needs: [lint, lint-crlf, lint-newsfile]
runs-on: ubuntu-latest
steps:
- run: "true"
-117
View File
@@ -1,120 +1,3 @@
Synapse 1.54.0 (2022-03-08)
===========================
Please note that this will be the last release of Synapse that is compatible with Mjolnir 1.3.1 and earlier.
Administrators of servers which have the Mjolnir module installed are advised to upgrade Mjolnir to version 1.3.2 or later.
Bugfixes
--------
- Fix a bug introduced in Synapse 1.54.0rc1 preventing the new module callbacks introduced in this release from being registered by modules. ([\#12141](https://github.com/matrix-org/synapse/issues/12141))
- Fix a bug introduced in Synapse 1.54.0rc1 where runtime dependency version checks would mistakenly check development dependencies if they were present and would not accept pre-release versions of dependencies. ([\#12129](https://github.com/matrix-org/synapse/issues/12129), [\#12177](https://github.com/matrix-org/synapse/issues/12177))
Internal Changes
----------------
- Update release script to insert the previous version when writing "No significant changes" line in the changelog. ([\#12127](https://github.com/matrix-org/synapse/issues/12127))
- Relax the version guard for "packaging" added in [\#12088](https://github.com/matrix-org/synapse/issues/12088). ([\#12166](https://github.com/matrix-org/synapse/issues/12166))
Synapse 1.54.0rc1 (2022-03-02)
==============================
Features
--------
- Add support for [MSC3202](https://github.com/matrix-org/matrix-doc/pull/3202): sending one-time key counts and fallback key usage states to Application Services. ([\#11617](https://github.com/matrix-org/synapse/issues/11617))
- Improve the generated URL previews for some web pages. Contributed by @AndrewRyanChama. ([\#11985](https://github.com/matrix-org/synapse/issues/11985))
- Track cache invalidations in Prometheus metrics, as already happens for cache eviction based on size or time. ([\#12000](https://github.com/matrix-org/synapse/issues/12000))
- Implement experimental support for [MSC3720](https://github.com/matrix-org/matrix-doc/pull/3720) (account status endpoints). ([\#12001](https://github.com/matrix-org/synapse/issues/12001), [\#12067](https://github.com/matrix-org/synapse/issues/12067))
- Enable modules to set a custom display name when registering a user. ([\#12009](https://github.com/matrix-org/synapse/issues/12009))
- Advertise Matrix 1.1 and 1.2 support on `/_matrix/client/versions`. ([\#12020](https://github.com/matrix-org/synapse/issues/12020), ([\#12022](https://github.com/matrix-org/synapse/issues/12022))
- Support only the stable identifier for [MSC3069](https://github.com/matrix-org/matrix-doc/pull/3069)'s `is_guest` on `/_matrix/client/v3/account/whoami`. ([\#12021](https://github.com/matrix-org/synapse/issues/12021))
- Use room version 9 as the default room version (per [MSC3589](https://github.com/matrix-org/matrix-doc/pull/3589)). ([\#12058](https://github.com/matrix-org/synapse/issues/12058))
- Add module callbacks to react to user deactivation status changes (i.e. deactivations and reactivations) and profile updates. ([\#12062](https://github.com/matrix-org/synapse/issues/12062))
Bugfixes
--------
- Fix a bug introduced in Synapse 1.48.0 where an edit of the latest event in a thread would not be properly applied to the thread summary. ([\#11992](https://github.com/matrix-org/synapse/issues/11992))
- Fix long-standing bug where the `get_rooms_for_user` cache was not correctly invalidated for remote users when the server left a room. ([\#11999](https://github.com/matrix-org/synapse/issues/11999))
- Fix a 500 error with Postgres when looking backwards with the [MSC3030](https://github.com/matrix-org/matrix-doc/pull/3030) `/timestamp_to_event?dir=b` endpoint. ([\#12024](https://github.com/matrix-org/synapse/issues/12024))
- Properly fix a long-standing bug where wrong data could be inserted into the `event_search` table when using SQLite. This could block running `synapse_port_db` with an `argument of type 'int' is not iterable` error. This bug was partially fixed by a change in Synapse 1.44.0. ([\#12037](https://github.com/matrix-org/synapse/issues/12037))
- Fix slow performance of `/logout` in some cases where refresh tokens are in use. The slowness existed since the initial implementation of refresh tokens in version 1.38.0. ([\#12056](https://github.com/matrix-org/synapse/issues/12056))
- Fix a long-standing bug where Synapse would make additional failing requests over federation for missing data. ([\#12077](https://github.com/matrix-org/synapse/issues/12077))
- Fix occasional `Unhandled error in Deferred` error message. ([\#12089](https://github.com/matrix-org/synapse/issues/12089))
- Fix a bug introduced in Synapse 1.51.0 where incoming federation transactions containing at least one EDU would be dropped if debug logging was enabled for `synapse.8631_debug`. ([\#12098](https://github.com/matrix-org/synapse/issues/12098))
- Fix a long-standing bug which could cause push notifications to malfunction if `use_frozen_dicts` was set in the configuration. ([\#12100](https://github.com/matrix-org/synapse/issues/12100))
- Fix an extremely rare, long-standing bug in `ReadWriteLock` that would cause an error when a newly unblocked writer completes instantly. ([\#12105](https://github.com/matrix-org/synapse/issues/12105))
- Make a `POST` to `/rooms/<room_id>/receipt/m.read/<event_id>` only trigger a push notification if the count of unread messages is different to the one in the last successfully sent push. This reduces server load and load on the receiving device. ([\#11835](https://github.com/matrix-org/synapse/issues/11835))
Updates to the Docker image
---------------------------
- The Docker image no longer automatically creates a temporary volume at `/data`. This is not expected to affect normal usage. ([\#11997](https://github.com/matrix-org/synapse/issues/11997))
- Use Python 3.9 in Docker images by default. ([\#12112](https://github.com/matrix-org/synapse/issues/12112))
Improved Documentation
----------------------
- Document support for the `to_device`, `account_data`, `receipts`, and `presence` stream writers for workers. ([\#11599](https://github.com/matrix-org/synapse/issues/11599))
- Explain the meaning of spam checker callbacks' return values. ([\#12003](https://github.com/matrix-org/synapse/issues/12003))
- Clarify information about external Identity Provider IDs. ([\#12004](https://github.com/matrix-org/synapse/issues/12004))
Deprecations and Removals
-------------------------
- Deprecate using `synctl` with the config option `synctl_cache_factor` and print a warning if a user still uses this option. ([\#11865](https://github.com/matrix-org/synapse/issues/11865))
- Remove support for the legacy structured logging configuration (please see the the [upgrade notes](https://matrix-org.github.io/synapse/develop/upgrade#legacy-structured-logging-configuration-removal) if you are using `structured: true` in the Synapse configuration). ([\#12008](https://github.com/matrix-org/synapse/issues/12008))
- Drop support for [MSC3283](https://github.com/matrix-org/matrix-doc/pull/3283) unstable flags now that the stable flags are supported. ([\#12018](https://github.com/matrix-org/synapse/issues/12018))
- Remove the unstable `/spaces` endpoint from [MSC2946](https://github.com/matrix-org/matrix-doc/pull/2946). ([\#12073](https://github.com/matrix-org/synapse/issues/12073))
Internal Changes
----------------
- Make the `get_room_version` method use `get_room_version_id` to benefit from caching. ([\#11808](https://github.com/matrix-org/synapse/issues/11808))
- Remove unnecessary condition on knock -> leave auth rule check. ([\#11900](https://github.com/matrix-org/synapse/issues/11900))
- Add tests for device list changes between local users. ([\#11972](https://github.com/matrix-org/synapse/issues/11972))
- Optimise calculating `device_list` changes in `/sync`. ([\#11974](https://github.com/matrix-org/synapse/issues/11974))
- Add missing type hints to storage classes. ([\#11984](https://github.com/matrix-org/synapse/issues/11984))
- Refactor the search code for improved readability. ([\#11991](https://github.com/matrix-org/synapse/issues/11991))
- Move common deduplication code down into `_auth_and_persist_outliers`. ([\#11994](https://github.com/matrix-org/synapse/issues/11994))
- Limit concurrent joins from applications services. ([\#11996](https://github.com/matrix-org/synapse/issues/11996))
- Preparation for faster-room-join work: when parsing the `send_join` response, get the `m.room.create` event from `state`, not `auth_chain`. ([\#12005](https://github.com/matrix-org/synapse/issues/12005), [\#12039](https://github.com/matrix-org/synapse/issues/12039))
- Preparation for faster-room-join work: parse MSC3706 fields in send_join response. ([\#12011](https://github.com/matrix-org/synapse/issues/12011))
- Preparation for faster-room-join work: persist information on which events and rooms have partial state to the database. ([\#12012](https://github.com/matrix-org/synapse/issues/12012))
- Preparation for faster-room-join work: Support for calling `/federation/v1/state` on a remote server. ([\#12013](https://github.com/matrix-org/synapse/issues/12013))
- Configure `tox` to use `venv` rather than `virtualenv`. ([\#12015](https://github.com/matrix-org/synapse/issues/12015))
- Fix bug in `StateFilter.return_expanded()` and add some tests. ([\#12016](https://github.com/matrix-org/synapse/issues/12016))
- Use Matrix v1.1 endpoints (`/_matrix/client/v3/auth/...`) in fallback auth HTML forms. ([\#12019](https://github.com/matrix-org/synapse/issues/12019))
- Update the `olddeps` CI job to use an old version of `markupsafe`. ([\#12025](https://github.com/matrix-org/synapse/issues/12025))
- Upgrade Mypy to version 0.931. ([\#12030](https://github.com/matrix-org/synapse/issues/12030))
- Remove legacy `HomeServer.get_datastore()`. ([\#12031](https://github.com/matrix-org/synapse/issues/12031), [\#12070](https://github.com/matrix-org/synapse/issues/12070))
- Minor typing fixes. ([\#12034](https://github.com/matrix-org/synapse/issues/12034), [\#12069](https://github.com/matrix-org/synapse/issues/12069))
- After joining a room, create a dedicated logcontext to process the queued events. ([\#12041](https://github.com/matrix-org/synapse/issues/12041))
- Tidy up GitHub Actions config which builds distributions for PyPI. ([\#12051](https://github.com/matrix-org/synapse/issues/12051))
- Move configuration out of `setup.cfg`. ([\#12052](https://github.com/matrix-org/synapse/issues/12052), [\#12059](https://github.com/matrix-org/synapse/issues/12059))
- Fix error message when a worker process fails to talk to another worker process. ([\#12060](https://github.com/matrix-org/synapse/issues/12060))
- Fix using the `complement.sh` script without specifying a directory or a branch. Contributed by Nico on behalf of Famedly. ([\#12063](https://github.com/matrix-org/synapse/issues/12063))
- Add type hints to `tests/rest/client`. ([\#12066](https://github.com/matrix-org/synapse/issues/12066), [\#12072](https://github.com/matrix-org/synapse/issues/12072), [\#12084](https://github.com/matrix-org/synapse/issues/12084), [\#12094](https://github.com/matrix-org/synapse/issues/12094))
- Add some logging to `/sync` to try and track down #11916. ([\#12068](https://github.com/matrix-org/synapse/issues/12068))
- Inspect application dependencies using `importlib.metadata` or its backport. ([\#12088](https://github.com/matrix-org/synapse/issues/12088))
- Use `assertEqual` instead of the deprecated `assertEquals` in test code. ([\#12092](https://github.com/matrix-org/synapse/issues/12092))
- Move experimental support for [MSC3440](https://github.com/matrix-org/matrix-doc/pull/3440) to `/versions`. ([\#12099](https://github.com/matrix-org/synapse/issues/12099))
- Add `stop_cancellation` utility function to stop `Deferred`s from being cancelled. ([\#12106](https://github.com/matrix-org/synapse/issues/12106))
- Improve exception handling for concurrent execution. ([\#12109](https://github.com/matrix-org/synapse/issues/12109))
- Advertise support for Python 3.10 in packaging files. ([\#12111](https://github.com/matrix-org/synapse/issues/12111))
- Move CI checks out of tox, to facilitate a move to using poetry. ([\#12119](https://github.com/matrix-org/synapse/issues/12119))
Synapse 1.53.0 (2022-02-22)
===========================
+1
View File
@@ -0,0 +1 @@
Deduplicate in-flight requests in `_get_state_for_groups`.
+1
View File
@@ -0,0 +1 @@
Deduplicate in-flight requests in `_get_state_for_groups`.
+1
View File
@@ -0,0 +1 @@
Make a `POST` to `/rooms/<room_id>/receipt/m.read/<event_id>` only trigger a push notification if the count of unread messages is different to the one in the last successfully sent push.
+1
View File
@@ -0,0 +1 @@
Remove unnecessary condition on knock->leave auth rule check.
+1
View File
@@ -0,0 +1 @@
Add tests for device list changes between local users.
+1
View File
@@ -0,0 +1 @@
Optimise calculating device_list changes in `/sync`.
+1
View File
@@ -0,0 +1 @@
Add missing type hints to storage classes.
+1
View File
@@ -0,0 +1 @@
Fetch images when previewing Twitter URLs. Contributed by @AndrewRyanChama.
+1
View File
@@ -0,0 +1 @@
Refactor the search code for improved readability.
+1
View File
@@ -0,0 +1 @@
Fix a bug introduced in Synapse v1.48.0 where an edit of the latest event in a thread would not be properly applied to the thread summary.
+1
View File
@@ -0,0 +1 @@
Move common deduplication code down into `_auth_and_persist_outliers`.
+1
View File
@@ -0,0 +1 @@
Limit concurrent joins from applications services.
+1
View File
@@ -0,0 +1 @@
The docker image no longer automatically creates a temporary volume at `/data`. This is not expected to affect normal usage.
+1
View File
@@ -0,0 +1 @@
Fix long standing bug where `get_rooms_for_user` was not correctly invalidated for remote users when the server left a room.
+1
View File
@@ -0,0 +1 @@
Track cache invalidations in Prometheus metrics, as already happens for cache eviction based on size or time.
+1
View File
@@ -0,0 +1 @@
Implement experimental support for [MSC3720](https://github.com/matrix-org/matrix-doc/pull/3720) (account status endpoints).
+1
View File
@@ -0,0 +1 @@
Explain the meaning of spam checker callbacks' return values.
+1
View File
@@ -0,0 +1 @@
Clarify information about external Identity Provider IDs.
+1
View File
@@ -0,0 +1 @@
Preparation for faster-room-join work: when parsing the `send_join` response, get the `m.room.create` event from `state`, not `auth_chain`.
+1
View File
@@ -0,0 +1 @@
Remove support for the legacy structured logging configuration (please see the the [upgrade notes](https://matrix-org.github.io/synapse/develop/upgrade#legacy-structured-logging-configuration-removal) if you are using `structured: true` in the Synapse configuration).
+1
View File
@@ -0,0 +1 @@
Enable modules to set a custom display name when registering a user.
+1
View File
@@ -0,0 +1 @@
Preparation for faster-room-join work: parse msc3706 fields in send_join response.
+1
View File
@@ -0,0 +1 @@
Preparation for faster-room-join work: Support for calling `/federation/v1/state` on a remote server.
+1
View File
@@ -0,0 +1 @@
Configure `tox` to use `venv` rather than `virtualenv`.
+1
View File
@@ -0,0 +1 @@
Fix bug in `StateFilter.return_expanded()` and add some tests.
+1
View File
@@ -0,0 +1 @@
Drop support for [MSC3283](https://github.com/matrix-org/matrix-doc/pull/3283) unstable flags now that the stable flags are supported.
+1
View File
@@ -0,0 +1 @@
Use Matrix v1.1 endpoints (`/_matrix/client/v3/auth/...`) in fallback auth HTML forms.
+1
View File
@@ -0,0 +1 @@
Advertise Matrix 1.1 support on `/_matrix/client/versions`.
+1
View File
@@ -0,0 +1 @@
Support only the stable identifier for [MSC3069](https://github.com/matrix-org/matrix-doc/pull/3069)'s `is_guest` on `/_matrix/client/v3/account/whoami`.
+1
View File
@@ -0,0 +1 @@
Advertise Matrix 1.2 support on `/_matrix/client/versions`.
+1
View File
@@ -0,0 +1 @@
Fix 500 error with Postgres when looking backwards with the [MSC3030](https://github.com/matrix-org/matrix-doc/pull/3030) `/timestamp_to_event?dir=b` endpoint.
+1
View File
@@ -0,0 +1 @@
Update the `olddeps` CI job to use an old version of `markupsafe`.
+1
View File
@@ -0,0 +1 @@
Upgrade mypy to version 0.931.
+1
View File
@@ -0,0 +1 @@
Remove legacy `HomeServer.get_datastore()`.
+1
View File
@@ -0,0 +1 @@
Deduplicate in-flight requests in `_get_state_for_groups`.
+1
View File
@@ -0,0 +1 @@
Minor typing fixes.
+1
View File
@@ -0,0 +1 @@
Preparation for faster-room-join work: when parsing the `send_join` response, get the `m.room.create` event from `state`, not `auth_chain`.
+1
View File
@@ -0,0 +1 @@
After joining a room, create a dedicated logcontext to process the queued events.
+1
View File
@@ -0,0 +1 @@
Tidy up GitHub Actions config which builds distributions for PyPI.
+1
View File
@@ -0,0 +1 @@
Move configuration out of `setup.cfg`.
+1
View File
@@ -0,0 +1 @@
Fix slow performance of `/logout` in some cases where refresh tokens are in use. The slowness existed since the initial implementation of refresh tokens.
+1
View File
@@ -0,0 +1 @@
Use room version 9 as the default room version (per [MSC3589](https://github.com/matrix-org/matrix-doc/pull/3589)).
+1
View File
@@ -0,0 +1 @@
Move configuration out of `setup.cfg`.
+1
View File
@@ -0,0 +1 @@
Fix error message when a worker process fails to talk to another worker process.
+1
View File
@@ -0,0 +1 @@
Fix using the complement.sh script without specifying a dir or a branch. Contributed by Nico on behalf of Famedly.
-12
View File
@@ -1,15 +1,3 @@
matrix-synapse-py3 (1.54.0) stable; urgency=medium
* New synapse release 1.54.0.
-- Synapse Packaging team <packages@matrix.org> Tue, 08 Mar 2022 10:54:52 +0000
matrix-synapse-py3 (1.54.0~rc1) stable; urgency=medium
* New synapse release 1.54.0~rc1.
-- Synapse Packaging team <packages@matrix.org> Wed, 02 Mar 2022 10:43:22 +0000
matrix-synapse-py3 (1.53.0) stable; urgency=medium
* New synapse release 1.53.0.
+2 -2
View File
@@ -11,10 +11,10 @@
# There is an optional PYTHON_VERSION build argument which sets the
# version of python to build against: for example:
#
# DOCKER_BUILDKIT=1 docker build -f docker/Dockerfile --build-arg PYTHON_VERSION=3.10 .
# DOCKER_BUILDKIT=1 docker build -f docker/Dockerfile --build-arg PYTHON_VERSION=3.9 .
#
ARG PYTHON_VERSION=3.9
ARG PYTHON_VERSION=3.8
###
### Stage 0: builder
@@ -148,62 +148,6 @@ deny an incoming event, see [`check_event_for_spam`](spam_checker_callbacks.md#c
If multiple modules implement this callback, Synapse runs them all in order.
### `on_profile_update`
_First introduced in Synapse v1.54.0_
```python
async def on_profile_update(
user_id: str,
new_profile: "synapse.module_api.ProfileInfo",
by_admin: bool,
deactivation: bool,
) -> None:
```
Called after updating a local user's profile. The update can be triggered either by the
user themselves or a server admin. The update can also be triggered by a user being
deactivated (in which case their display name is set to an empty string (`""`) and the
avatar URL is set to `None`). The module is passed the Matrix ID of the user whose profile
has been updated, their new profile, as well as a `by_admin` boolean that is `True` if the
update was triggered by a server admin (and `False` otherwise), and a `deactivated`
boolean that is `True` if the update is a result of the user being deactivated.
Note that the `by_admin` boolean is also `True` if the profile change happens as a result
of the user logging in through Single Sign-On, or if a server admin updates their own
profile.
Per-room profile changes do not trigger this callback to be called. Synapse administrators
wishing this callback to be called on every profile change are encouraged to disable
per-room profiles globally using the `allow_per_room_profiles` configuration setting in
Synapse's configuration file.
This callback is not called when registering a user, even when setting it through the
[`get_displayname_for_registration`](https://matrix-org.github.io/synapse/latest/modules/password_auth_provider_callbacks.html#get_displayname_for_registration)
module callback.
If multiple modules implement this callback, Synapse runs them all in order.
### `on_user_deactivation_status_changed`
_First introduced in Synapse v1.54.0_
```python
async def on_user_deactivation_status_changed(
user_id: str, deactivated: bool, by_admin: bool
) -> None:
```
Called after deactivating a local user, or reactivating them through the admin API. The
deactivation can be triggered either by the user themselves or a server admin. The module
is passed the Matrix ID of the user whose status is changed, as well as a `deactivated`
boolean that is `True` if the user is being deactivated and `False` if they're being
reactivated, and a `by_admin` boolean that is `True` if the deactivation was triggered by
a server admin (and `False` otherwise). This latter `by_admin` boolean is always `True`
if the user is being reactivated, as this operation can only be performed through the
admin API.
If multiple modules implement this callback, Synapse runs them all in order.
## Example
The example below is a module that implements the third-party rules callback
+16 -76
View File
@@ -178,11 +178,8 @@ recommend the use of `systemd` where available: for information on setting up
### `synapse.app.generic_worker`
This worker can handle API requests matching the following regular expressions.
These endpoints can be routed to any worker. If a worker is set up to handle a
stream then, for maximum efficiency, additional endpoints should be routed to that
worker: refer to the [stream writers](#stream-writers) section below for further
information.
This worker can handle API requests matching the following regular
expressions:
# Sync requests
^/_matrix/client/(v2_alpha|r0|v3)/sync$
@@ -212,6 +209,7 @@ information.
^/_matrix/federation/v1/user/devices/
^/_matrix/federation/v1/get_groups_publicised$
^/_matrix/key/v2/query
^/_matrix/federation/unstable/org.matrix.msc2946/spaces/
^/_matrix/federation/(v1|unstable/org.matrix.msc2946)/hierarchy/
# Inbound federation transaction request
@@ -224,25 +222,22 @@ information.
^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/context/.*$
^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/members$
^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/state$
^/_matrix/client/unstable/org.matrix.msc2946/rooms/.*/spaces$
^/_matrix/client/(v1|unstable/org.matrix.msc2946)/rooms/.*/hierarchy$
^/_matrix/client/unstable/im.nheko.summary/rooms/.*/summary$
^/_matrix/client/(r0|v3|unstable)/account/3pid$
^/_matrix/client/(r0|v3|unstable)/devices$
^/_matrix/client/(api/v1|r0|v3|unstable)/account/3pid$
^/_matrix/client/(api/v1|r0|v3|unstable)/devices$
^/_matrix/client/(api/v1|r0|v3|unstable)/keys/query$
^/_matrix/client/(api/v1|r0|v3|unstable)/keys/changes$
^/_matrix/client/versions$
^/_matrix/client/(api/v1|r0|v3|unstable)/voip/turnServer$
^/_matrix/client/(r0|v3|unstable)/joined_groups$
^/_matrix/client/(r0|v3|unstable)/publicised_groups$
^/_matrix/client/(r0|v3|unstable)/publicised_groups/
^/_matrix/client/(api/v1|r0|v3|unstable)/joined_groups$
^/_matrix/client/(api/v1|r0|v3|unstable)/publicised_groups$
^/_matrix/client/(api/v1|r0|v3|unstable)/publicised_groups/
^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/event/
^/_matrix/client/(api/v1|r0|v3|unstable)/joined_rooms$
^/_matrix/client/(api/v1|r0|v3|unstable)/search$
# Encryption requests
^/_matrix/client/(r0|v3|unstable)/keys/query$
^/_matrix/client/(r0|v3|unstable)/keys/changes$
^/_matrix/client/(r0|v3|unstable)/keys/claim$
^/_matrix/client/(r0|v3|unstable)/room_keys/
# Registration/login requests
^/_matrix/client/(api/v1|r0|v3|unstable)/login$
^/_matrix/client/(r0|v3|unstable)/register$
@@ -256,20 +251,6 @@ information.
^/_matrix/client/(api/v1|r0|v3|unstable)/join/
^/_matrix/client/(api/v1|r0|v3|unstable)/profile/
# Device requests
^/_matrix/client/(r0|v3|unstable)/sendToDevice/
# Account data requests
^/_matrix/client/(r0|v3|unstable)/.*/tags
^/_matrix/client/(r0|v3|unstable)/.*/account_data
# Receipts requests
^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt
^/_matrix/client/(r0|v3|unstable)/rooms/.*/read_markers
# Presence requests
^/_matrix/client/(api/v1|r0|v3|unstable)/presence/
Additionally, the following REST endpoints can be handled for GET requests:
@@ -349,10 +330,12 @@ Additionally, there is *experimental* support for moving writing of specific
streams (such as events) off of the main process to a particular worker. (This
is only supported with Redis-based replication.)
Currently supported streams are `events` and `typing`.
To enable this, the worker must have a HTTP replication listener configured,
have a `worker_name` and be listed in the `instance_map` config. The same worker
can handle multiple streams. For example, to move event persistence off to a
dedicated worker, the shared configuration would include:
have a `worker_name` and be listed in the `instance_map` config. For example to
move event persistence off to a dedicated worker, the shared configuration would
include:
```yaml
instance_map:
@@ -364,12 +347,6 @@ stream_writers:
events: event_persister1
```
Some of the streams have associated endpoints which, for maximum efficiency, should
be routed to the workers handling that stream. See below for the currently supported
streams and the endpoints associated with them:
##### The `events` stream
The `events` stream also experimentally supports having multiple writers, where
work is sharded between them by room ID. Note that you *must* restart all worker
instances when adding or removing event persisters. An example `stream_writers`
@@ -382,43 +359,6 @@ stream_writers:
- event_persister2
```
##### The `typing` stream
The following endpoints should be routed directly to the workers configured as
stream writers for the `typing` stream:
^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing
##### The `to_device` stream
The following endpoints should be routed directly to the workers configured as
stream writers for the `to_device` stream:
^/_matrix/client/(api/v1|r0|v3|unstable)/sendToDevice/
##### The `account_data` stream
The following endpoints should be routed directly to the workers configured as
stream writers for the `account_data` stream:
^/_matrix/client/(api/v1|r0|v3|unstable)/.*/tags
^/_matrix/client/(api/v1|r0|v3|unstable)/.*/account_data
##### The `receipts` stream
The following endpoints should be routed directly to the workers configured as
stream writers for the `receipts` stream:
^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/receipt
^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/read_markers
##### The `presence` stream
The following endpoints should be routed directly to the workers configured as
stream writers for the `presence` stream:
^/_matrix/client/(api/v1|r0|v3|unstable)/presence/
#### Background tasks
There is also *experimental* support for moving background tasks to a separate
+5 -1
View File
@@ -75,12 +75,16 @@ exclude = (?x)
|tests/push/test_presentable_names.py
|tests/push/test_push_rule_evaluator.py
|tests/rest/client/test_account.py
|tests/rest/client/test_events.py
|tests/rest/client/test_filter.py
|tests/rest/client/test_groups.py
|tests/rest/client/test_register.py
|tests/rest/client/test_report_event.py
|tests/rest/client/test_rooms.py
|tests/rest/client/test_third_party_rules.py
|tests/rest/client/test_transactions.py
|tests/rest/client/test_typing.py
|tests/rest/client/utils.py
|tests/rest/key/v2/test_remote_key_resource.py
|tests/rest/media/v1/test_base.py
|tests/rest/media/v1/test_media_storage.py
@@ -249,7 +253,7 @@ disallow_untyped_defs = True
[mypy-tests.rest.admin.*]
disallow_untyped_defs = True
[mypy-tests.rest.client.*]
[mypy-tests.rest.client.test_directory]
disallow_untyped_defs = True
[mypy-tests.federation.transport.test_client]
+1 -1
View File
@@ -35,7 +35,7 @@ CONTRIBUTING_GUIDE_TEXT="!! Please see the contributing guide for help writing y
https://github.com/matrix-org/synapse/blob/develop/CONTRIBUTING.md#changelog"
# If check-newsfragment returns a non-zero exit code, print the contributing guide and exit
python -m towncrier.check --compare-with=origin/develop || (echo -e "$CONTRIBUTING_GUIDE_TEXT" >&2 && exit 1)
tox -qe check-newsfragment || (echo -e "$CONTRIBUTING_GUIDE_TEXT" >&2 && exit 1)
echo
echo "--------------------------"
+2 -28
View File
@@ -17,8 +17,6 @@
"""An interactive script for doing a release. See `cli()` below.
"""
import glob
import os
import re
import subprocess
import sys
@@ -211,8 +209,8 @@ def prepare():
with open("synapse/__init__.py", "w") as f:
f.write(parsed_synapse_ast.dumps())
# Generate changelogs.
generate_and_write_changelog(current_version)
# Generate changelogs
run_until_successful("python3 -m towncrier", shell=True)
# Generate debian changelogs
if parsed_new_version.pre is not None:
@@ -525,29 +523,5 @@ def get_changes_for_version(wanted_version: version.Version) -> str:
return "\n".join(version_changelog)
def generate_and_write_changelog(current_version: version.Version):
# We do this by getting a draft so that we can edit it before writing to the
# changelog.
result = run_until_successful(
"python3 -m towncrier --draft", shell=True, capture_output=True
)
new_changes = result.stdout.decode("utf-8")
new_changes = new_changes.replace(
"No significant changes.", f"No significant changes since {current_version}."
)
# Prepend changes to changelog
with open("CHANGES.md", "r+") as f:
existing_content = f.read()
f.seek(0, 0)
f.write(new_changes)
f.write("\n")
f.write(existing_content)
# Remove all the news fragments
for f in glob.iglob("changelog.d/*.*"):
os.remove(f)
if __name__ == "__main__":
cli()
+61
View File
@@ -0,0 +1,61 @@
# workers_setup
This gives you a **development-grade** installation of workerised Synapse.
DO NOT USE ME IN PRODUCTION.
## Known defects
* Non-generic workers aren't set up properly with their worker type.
* I haven't checked the routes that well; they are probably wrong.
## Requirements from you:
* Redis on default port (unauthenticated)
```
# You need Redis. On Ubuntu, this gets you what you need running on the right port:
apt install redis-server redis-tools
```
* Postgres on default port, using UNIX sockets for authentication.
This means you want your normal user account to have a corresponding Postgres account,
and let Postgres authenticate you automatically.
On Ubuntu, this just means you need to `createuser <your Linux account name>`.
You need a database with the same name as your server_name (I used `syn7`).
It should be owned by your user; see `createdb` to do that properly (and don't
forget to follow the Synapse instructions to use a C locale!)
Typing `psql syn7` should just work once your database is ready.
(If your UNIX socket is not numbered 5432, you might have to add `port: 5433`
to the config. Somehow I messed up my Postgres installation ages ago that it
chose port 5433 rather than the default 5432...)
* Virtualenv with Synapse (don't forget: `[postgres,redis]`)
* You'll need a bog standard Caddy binary (as the reverse proxy / router).
The website offers pre-built static binaries.
* (Optional): If you want to federate, you can set up TLS yourself afterwards.
I haven't bothered so far.
## Run the script
```
# python scripts-dev/workers_setup.py (path to server dir) (server name)
python scripts-dev/workers_setup.py ../servers/syn7_auto syn7
```
## Launching the homeserver
```
cd syn7_auto
/path/to/synapse/.venv/bin/synctl start homeserver.yaml -a workers
/path/to/caddy run
```
## Stopping the homeserver
```
# ^C to stop Caddy
/path/to/synapse/.venv/bin/synctl stop homeserver.yaml -a workers
```
+305
View File
@@ -0,0 +1,305 @@
#!/usr/bin/env python
# Copyright 2022 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import dataclasses
import sys
from os.path import dirname
from pathlib import Path
from typing import Collection, Dict, Iterable, List, Sequence, Tuple
from jinja2 import Environment, FileSystemLoader
from signedjson.key import generate_signing_key, write_signing_keys
from synapse.util.stringutils import random_string
DESIRED_WORKERS = (
("main", 1),
("synchrotron", 2),
("federation_inbound", 2),
("federation_reader", 2),
("federation_sender", 2),
("typing", 1),
("appservice", 1),
("client_reader", 2),
("event_creator", 2),
("event_persister", 2),
("media_repository", 1),
("pusher", 2),
("user_dir", 1),
("background_worker", 1),
# TODO ("encryption", 1), # ??
("receipts_account_data", 1)
# TODO frontend_proxy?
)
# TODO These are probably all wrong
# ^/_matrix/client/(api/v1|r0|v3|unstable)/sendToDevice/ ?
# ^/_matrix/client/(api/v1|r0|v3|unstable)/.*/tags
# ^/_matrix/client/(api/v1|r0|v3|unstable)/.*/account_data ?
# ^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/receipt
# ^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/read_markers ?
# ^/_matrix/client/(api/v1|r0|v3|unstable)/presence/ ?
WORKER_ROUTES: Dict[str, Tuple[str, ...]] = {
"main": (),
"synchrotron": (
"^/_matrix/client/(v2_alpha|r0|v3)/sync$",
"^/_matrix/client/(api/v1|v2_alpha|r0|v3)/events$",
"^/_matrix/client/(api/v1|r0|v3)/initialSync$",
"^/_matrix/client/(api/v1|r0|v3)/rooms/[^/]+/initialSync$",
),
"federation_inbound": ("^/_matrix/federation/v1/send/",),
"federation_reader": (
"^/_matrix/federation/v1/event/",
"^/_matrix/federation/v1/state/",
"^/_matrix/federation/v1/state_ids/",
"^/_matrix/federation/v1/backfill/",
"^/_matrix/federation/v1/get_missing_events/",
"^/_matrix/federation/v1/publicRooms",
"^/_matrix/federation/v1/query/",
"^/_matrix/federation/v1/make_join/",
"^/_matrix/federation/v1/make_leave/",
"^/_matrix/federation/v1/send_join/",
"^/_matrix/federation/v2/send_join/",
"^/_matrix/federation/v1/send_leave/",
"^/_matrix/federation/v2/send_leave/",
"^/_matrix/federation/v1/invite/",
"^/_matrix/federation/v2/invite/",
"^/_matrix/federation/v1/query_auth/",
"^/_matrix/federation/v1/event_auth/",
"^/_matrix/federation/v1/exchange_third_party_invite/",
"^/_matrix/federation/v1/user/devices/",
"^/_matrix/federation/v1/get_groups_publicised$",
"^/_matrix/key/v2/query",
"^/_matrix/federation/(v1|unstable/org.matrix.msc2946)/hierarchy/",
),
"federation_sender": (),
"typing": ("^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing",),
"appservice": (),
"client_reader": (
"^/_matrix/client/(api/v1|r0|v3|unstable)/createRoom$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/publicRooms$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/joined_members$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/context/.*$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/members$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/state$",
"^/_matrix/client/(v1|unstable/org.matrix.msc2946)/rooms/.*/hierarchy$",
"^/_matrix/client/unstable/im.nheko.summary/rooms/.*/summary$",
"^/_matrix/client/(r0|v3|unstable)/account/3pid$",
"^/_matrix/client/(r0|v3|unstable)/devices$",
"^/_matrix/client/versions$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/voip/turnServer$",
"^/_matrix/client/(r0|v3|unstable)/joined_groups$",
"^/_matrix/client/(r0|v3|unstable)/publicised_groups$",
"^/_matrix/client/(r0|v3|unstable)/publicised_groups/",
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/event/",
"^/_matrix/client/(api/v1|r0|v3|unstable)/joined_rooms$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/search$",
"^/_matrix/client/(r0|v3|unstable)/keys/query$",
"^/_matrix/client/(r0|v3|unstable)/keys/changes$",
"^/_matrix/client/(r0|v3|unstable)/keys/claim$",
"^/_matrix/client/(r0|v3|unstable)/room_keys/",
#
"^/_matrix/client/(api/v1|r0|v3|unstable)/login$",
"^/_matrix/client/(r0|v3|unstable)/register$",
"^/_matrix/client/v1/register/m.login.registration_token/validity$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/redact",
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/send",
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/state/",
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/join/",
"^/_matrix/client/(api/v1|r0|v3|unstable)/profile/",
"^/_matrix/client/(r0|v3|unstable)/sendToDevice/",
"^/_matrix/client/(r0|v3|unstable)/.*/tags",
"^/_matrix/client/(r0|v3|unstable)/.*/account_data",
"^/_matrix/client/(api/v1|r0|v3|unstable)/presence/",
"^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt",
"^/_matrix/client/(r0|v3|unstable)/rooms/.*/read_markers",
"^/_matrix/client/(r0|v3|unstable)/.*/tags",
"^/_matrix/client/(r0|v3|unstable)/.*/account_data",
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/messages$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/login/sso/redirect",
"^/_synapse/client/pick_idp$",
"^/_synapse/client/pick_username",
"^/_synapse/client/new_user_consent$",
"^/_synapse/client/sso_register$",
"^/_synapse/client/oidc/callback$",
"^/_synapse/client/saml2/authn_response$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/login/cas/ticket$",
),
"event_creator": (),
"event_persister": (),
"media_repository": (
"^/_synapse/admin/v1/purge_media_cache$",
"^/_synapse/admin/v1/room/.*/media.*$",
"^/_synapse/admin/v1/user/.*/media.*$",
"^/_synapse/admin/v1/media/.*$",
"^/_synapse/admin/v1/quarantine_media/.*$",
"^/_synapse/admin/v1/users/.*/media$",
),
"pusher": (),
"user_dir": ("^/_matrix/client/(api/v1|r0|v3|unstable)/user_directory/search$",),
"background_worker": (),
"receipts_account_data": (),
}
@dataclasses.dataclass
class Worker:
name: str
kind: str
index: int
ip: str
def worker_num_to_ip(num: int) -> str:
return f"127.0.57.{num}"
def make_workers(workers: Iterable[Tuple[str, int]]) -> List[Worker]:
result = []
worker_overall_num = 0
for worker_type, worker_type_count in workers:
for worker_idx in range(1, worker_type_count + 1):
worker_overall_num += 1
if worker_type == "main":
worker_name = "main"
else:
worker_name = f"{worker_type}{worker_idx}"
result.append(
Worker(
worker_name,
worker_type,
worker_idx,
worker_num_to_ip(worker_overall_num),
)
)
return result
def generate(
worker_counts: Tuple[Tuple[str, int], ...], target_path: Path, server_name: str
) -> None:
if target_path.exists():
print("Target path already exists. Won't overwrite.")
return
target_path.mkdir()
# Generate a signing key
key_id = "a_" + random_string(4)
key = (generate_signing_key(key_id),)
with open(target_path.joinpath("signing.key"), "w") as fout:
write_signing_keys(fout, key)
macaroon_secret_key = random_string(32)
env = Environment(loader=FileSystemLoader(dirname(__file__) + "/workers_setup"))
hs_template = env.get_template("homeserver.yaml.j2")
worker_template = env.get_template("worker.yaml.j2")
logging_template = env.get_template("logging.yaml.j2")
rp_template = env.get_template("Caddyfile.j2")
worker_dir = target_path.joinpath("workers")
worker_dir.mkdir()
worker_logging_dir = target_path.joinpath("workers.logging")
worker_logging_dir.mkdir()
worker_dir = worker_dir.resolve()
logs_dir = target_path.joinpath("logs")
logs_dir.mkdir()
logs_dir = logs_dir.resolve()
all_workers = make_workers(worker_counts)
workers_by_name = {worker.name: worker for worker in all_workers}
for worker in all_workers:
log_config_path = worker_logging_dir.joinpath(f"{worker.name}.logging.yaml")
log_config = logging_template.render(
worker=worker,
worker_dir=worker_dir,
logs_dir=logs_dir,
all_workers=all_workers,
workers_by_name=workers_by_name,
)
with open(log_config_path, "w") as fout:
fout.write(log_config)
# if worker.name == "main":
# Main can't use a worker file.
# continue
worker_config_path = worker_dir.joinpath(f"{worker.name}.yaml")
worker_config = worker_template.render(
worker=worker,
worker_dir=worker_dir,
logs_dir=logs_dir,
all_workers=all_workers,
workers_by_name=workers_by_name,
)
with open(worker_config_path, "w") as fout:
fout.write(worker_config)
hs_config_path = target_path.joinpath("homeserver.yaml")
hs_config = hs_template.render(
all_workers=all_workers,
workers_by_name=workers_by_name,
worker_dir=worker_dir,
logs_dir=logs_dir,
server_name=server_name,
macaroon_secret_key=macaroon_secret_key,
)
with open(hs_config_path, "w") as fout:
fout.write(hs_config)
caddy_config_path = target_path.joinpath("Caddyfile")
caddy_config = rp_template.render(
server_name=server_name,
port=8447,
http_ip=worker_num_to_ip(1),
routing=build_routes_template_var(all_workers),
main_server=f"{worker_num_to_ip(1)}:8080",
)
with open(caddy_config_path, "w") as fout:
fout.write(caddy_config)
def build_routes_template_var(
all_workers: List[Worker],
) -> Sequence[Tuple[str, Collection[str], List[str]]]:
route_groups = {}
for worker in all_workers:
if worker.kind not in route_groups:
if WORKER_ROUTES[worker.kind]:
route_groups[worker.kind] = worker.kind, WORKER_ROUTES[worker.kind], []
else:
continue
_, _routes, server_endpoints = route_groups[worker.kind]
server_endpoints.append(f"{worker.ip}:8080")
return tuple(route_groups.values())
def main(target_path: Path, server_name: str) -> None:
generate(DESIRED_WORKERS, target_path, server_name)
if __name__ == "__main__":
target_path = Path(sys.argv[1])
server_name = sys.argv[2]
main(target_path, server_name)
+24
View File
@@ -0,0 +1,24 @@
{
# Prevents Caddy from asking for sudo password to install a root cert that
# we don't even want to use here.
skip_install_trust
}
# If you want TLS, you can add https:// schemes and configure the TLS cert... somehow.
http://{{ server_name }}:{{ port }}, http://{{ http_ip }}:{{ port }} {
{%- for route_group_name, routes, route_servers in routing %}
@{{ route_group_name }} {
{%- for route in routes %}
path_regexp {{ route }}
{%- endfor %}
}
route @{{ route_group_name }} {
reverse_proxy {% for server in route_servers %} {{ server }} {% endfor %}
}
{%- endfor %}
# fallback to main
route {
reverse_proxy {{ main_server }}
}
}
@@ -0,0 +1,96 @@
server_name: {{ server_name }}
report_stats: false
signing_key_path: "{{ worker_dir }}/../signing.key"
macaroon_secret_key: "{{ macaroon_secret_key }}"
enable_registration: true
redis:
enabled: true
#host: localhost
#port: 6379
trusted_key_servers: []
listeners:
- port: 8080
bind_address: {{ workers_by_name.main.ip }}
type: http
resources:
- names: [client, federation]
# The HTTP replication port
- port: 9090
bind_address: {{ workers_by_name.main.ip }}
type: http
resources:
- names: [replication]
database:
name: psycopg2
args:
# Comment out user, password and host to use UNIX socket auth.
# For testing, create a database owned by your Postgres user that is logged
# in with your UNIX user
#user: "synapse"
#password:
database: "{{ server_name }}"
#host: "localhost"
cp_min: 5
cp_max: 10
instance_map:
{%- for worker in all_workers %}
{{ worker.name }}:
host: {{ worker.ip }}
port: 9090
{%- endfor %}
stream_writers:
events:
{%- for worker in all_workers %}
{%- if worker.kind == "event_persister" %}
- {{ worker.name }}
{%- endif %}
{%- endfor %}
typing:
{%- for worker in all_workers %}
{%- if worker.kind == "typing" %}
- {{ worker.name }}
{%- endif %}
{%- endfor %}
start_pushers: false
pusher_instances:
{% for worker in all_workers -%}
{%- if worker.kind == "pusher" %}
- {{ worker.name }}
{%- endif %}
{%- endfor %}
notify_appservices: False
federation_sender_instances:
{% for worker in all_workers -%}
{%- if worker.kind == "federation_sender" %}
- {{ worker.name }}
{% endif -%}
{% endfor %}
enable_media_repo: False
media_instance_running_background_jobs: "media1"
update_user_directory: False
pid_file: "{{ logs_dir }}/main.pid"
log_config: '{{ worker_dir }}.logging/main.logging.yaml'
run_background_tasks_on: background_worker1
+32
View File
@@ -0,0 +1,32 @@
version: 1
formatters:
precise:
format: '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s - %(message)s'
filters:
context:
(): synapse.util.logcontext.LoggingContextFilter
request: ""
handlers:
console:
class: logging.FileHandler
formatter: precise
filters: [context]
encoding: 'UTF-8'
filename: '{{ logs_dir }}/{{ worker.name }}.log'
loggers:
synapse:
level: DEBUG
synapse.storage.SQL:
# beware: increasing this to DEBUG will make synapse log sensitive
# information such as access tokens.
level: INFO
root:
level: WARNING
handlers: [console]
+36
View File
@@ -0,0 +1,36 @@
{% set main_worker = workers_by_name.main %}
{% if worker.kind == "main" %}
worker_app: synapse.app.homeserver
{% else %}
worker_app: synapse.app.generic_worker
worker_name: {{ worker.name }}
# The replication listener on the main synapse process.
worker_replication_host: {{ main_worker.ip }}
worker_replication_http_port: 9090
worker_listeners:
- type: http
bind_address: {{ worker.ip }}
port: 8080
resources:
- names:
- client
- federation
{%- if worker.kind == "media" %}
- media
{%- endif %}
- type: http
bind_address: {{ worker.ip }}
port: 9090
resources:
- names: [replication]
worker_log_config: '{{ worker_dir }}.logging/{{ worker.name }}.logging.yaml'
worker_pid_file: '{{ logs_dir }}/{{ worker.name }}.pid'
worker_main_http_uri: http://{{ main_worker.ip }}:8080
{% endif %}
-1
View File
@@ -165,7 +165,6 @@ setup(
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
],
scripts=["synctl"] + glob.glob("scripts/*"),
cmdclass={"test": TestCommand},
+1 -1
View File
@@ -47,7 +47,7 @@ try:
except ImportError:
pass
__version__ = "1.54.0"
__version__ = "1.53.0"
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when
+2 -3
View File
@@ -22,7 +22,6 @@ from typing import (
Dict,
Iterable,
List,
Mapping,
Optional,
Set,
TypeVar,
@@ -362,10 +361,10 @@ class Filter:
return self._check_fields(field_matchers)
else:
content = event.get("content")
# Content is assumed to be a mapping below, so ensure it is. This should
# Content is assumed to be a dict below, so ensure it is. This should
# always be true for events, but account_data has been allowed to
# have non-dict content.
if not isinstance(content, Mapping):
if not isinstance(content, dict):
content = {}
sender = event.get("sender", None)
+3 -3
View File
@@ -15,13 +15,13 @@ import logging
import sys
from typing import Container
from synapse.util import check_dependencies
from synapse import python_dependencies # noqa: E402
logger = logging.getLogger(__name__)
try:
check_dependencies.check_requirements()
except check_dependencies.DependencyException as e:
python_dependencies.check_requirements()
except python_dependencies.DependencyException as e:
sys.stderr.writelines(
e.message # noqa: B306, DependencyException.message is a property
)
+1 -1
View File
@@ -59,6 +59,7 @@ from synapse.http.server import (
from synapse.http.site import SynapseSite
from synapse.logging.context import LoggingContext
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.python_dependencies import check_requirements
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
from synapse.rest import ClientRestResource
@@ -69,7 +70,6 @@ from synapse.rest.synapse.client import build_synapse_client_resource_tree
from synapse.rest.well_known import well_known_resource
from synapse.server import HomeServer
from synapse.storage import DataStore
from synapse.util.check_dependencies import check_requirements
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.module_loader import load_module
-16
View File
@@ -31,14 +31,6 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
# Type for the `device_one_time_key_counts` field in an appservice transaction
# user ID -> {device ID -> {algorithm -> count}}
TransactionOneTimeKeyCounts = Dict[str, Dict[str, Dict[str, int]]]
# Type for the `device_unused_fallback_keys` field in an appservice transaction
# user ID -> {device ID -> [algorithm]}
TransactionUnusedFallbackKeys = Dict[str, Dict[str, List[str]]]
class ApplicationServiceState(Enum):
DOWN = "down"
@@ -80,7 +72,6 @@ class ApplicationService:
rate_limited: bool = True,
ip_range_whitelist: Optional[IPSet] = None,
supports_ephemeral: bool = False,
msc3202_transaction_extensions: bool = False,
):
self.token = token
self.url = (
@@ -93,7 +84,6 @@ class ApplicationService:
self.id = id
self.ip_range_whitelist = ip_range_whitelist
self.supports_ephemeral = supports_ephemeral
self.msc3202_transaction_extensions = msc3202_transaction_extensions
if "|" in self.id:
raise Exception("application service ID cannot contain '|' character")
@@ -349,16 +339,12 @@ class AppServiceTransaction:
events: List[EventBase],
ephemeral: List[JsonDict],
to_device_messages: List[JsonDict],
one_time_key_counts: TransactionOneTimeKeyCounts,
unused_fallback_keys: TransactionUnusedFallbackKeys,
):
self.service = service
self.id = id
self.events = events
self.ephemeral = ephemeral
self.to_device_messages = to_device_messages
self.one_time_key_counts = one_time_key_counts
self.unused_fallback_keys = unused_fallback_keys
async def send(self, as_api: "ApplicationServiceApi") -> bool:
"""Sends this transaction using the provided AS API interface.
@@ -373,8 +359,6 @@ class AppServiceTransaction:
events=self.events,
ephemeral=self.ephemeral,
to_device_messages=self.to_device_messages,
one_time_key_counts=self.one_time_key_counts,
unused_fallback_keys=self.unused_fallback_keys,
txn_id=self.id,
)
+2 -18
View File
@@ -19,11 +19,6 @@ from prometheus_client import Counter
from synapse.api.constants import EventTypes, Membership, ThirdPartyEntityKind
from synapse.api.errors import CodeMessageException
from synapse.appservice import (
ApplicationService,
TransactionOneTimeKeyCounts,
TransactionUnusedFallbackKeys,
)
from synapse.events import EventBase
from synapse.events.utils import serialize_event
from synapse.http.client import SimpleHttpClient
@@ -31,6 +26,7 @@ from synapse.types import JsonDict, ThirdPartyInstanceID
from synapse.util.caches.response_cache import ResponseCache
if TYPE_CHECKING:
from synapse.appservice import ApplicationService
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
@@ -223,8 +219,6 @@ class ApplicationServiceApi(SimpleHttpClient):
events: List[EventBase],
ephemeral: List[JsonDict],
to_device_messages: List[JsonDict],
one_time_key_counts: TransactionOneTimeKeyCounts,
unused_fallback_keys: TransactionUnusedFallbackKeys,
txn_id: Optional[int] = None,
) -> bool:
"""
@@ -258,7 +252,7 @@ class ApplicationServiceApi(SimpleHttpClient):
uri = service.url + ("/transactions/%s" % urllib.parse.quote(str(txn_id)))
# Never send ephemeral events to appservices that do not support it
body: JsonDict = {"events": serialized_events}
body: Dict[str, List[JsonDict]] = {"events": serialized_events}
if service.supports_ephemeral:
body.update(
{
@@ -268,16 +262,6 @@ class ApplicationServiceApi(SimpleHttpClient):
}
)
if service.msc3202_transaction_extensions:
if one_time_key_counts:
body[
"org.matrix.msc3202.device_one_time_key_counts"
] = one_time_key_counts
if unused_fallback_keys:
body[
"org.matrix.msc3202.device_unused_fallback_keys"
] = unused_fallback_keys
try:
await self.put_json(
uri=uri,
+4 -94
View File
@@ -54,19 +54,12 @@ from typing import (
Callable,
Collection,
Dict,
Iterable,
List,
Optional,
Set,
Tuple,
)
from synapse.appservice import (
ApplicationService,
ApplicationServiceState,
TransactionOneTimeKeyCounts,
TransactionUnusedFallbackKeys,
)
from synapse.appservice import ApplicationService, ApplicationServiceState
from synapse.appservice.api import ApplicationServiceApi
from synapse.events import EventBase
from synapse.logging.context import run_in_background
@@ -103,7 +96,7 @@ class ApplicationServiceScheduler:
self.as_api = hs.get_application_service_api()
self.txn_ctrl = _TransactionController(self.clock, self.store, self.as_api)
self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock, hs)
self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock)
async def start(self) -> None:
logger.info("Starting appservice scheduler")
@@ -160,9 +153,7 @@ class _ServiceQueuer:
appservice at a given time.
"""
def __init__(
self, txn_ctrl: "_TransactionController", clock: Clock, hs: "HomeServer"
):
def __init__(self, txn_ctrl: "_TransactionController", clock: Clock):
# dict of {service_id: [events]}
self.queued_events: Dict[str, List[EventBase]] = {}
# dict of {service_id: [events]}
@@ -174,10 +165,6 @@ class _ServiceQueuer:
self.requests_in_flight: Set[str] = set()
self.txn_ctrl = txn_ctrl
self.clock = clock
self._msc3202_transaction_extensions_enabled: bool = (
hs.config.experimental.msc3202_transaction_extensions
)
self._store = hs.get_datastores().main
def start_background_request(self, service: ApplicationService) -> None:
# start a sender for this appservice if we don't already have one
@@ -215,84 +202,15 @@ class _ServiceQueuer:
if not events and not ephemeral and not to_device_messages_to_send:
return
one_time_key_counts: Optional[TransactionOneTimeKeyCounts] = None
unused_fallback_keys: Optional[TransactionUnusedFallbackKeys] = None
if (
self._msc3202_transaction_extensions_enabled
and service.msc3202_transaction_extensions
):
# Compute the one-time key counts and fallback key usage states
# for the users which are mentioned in this transaction,
# as well as the appservice's sender.
(
one_time_key_counts,
unused_fallback_keys,
) = await self._compute_msc3202_otk_counts_and_fallback_keys(
service, events, ephemeral, to_device_messages_to_send
)
try:
await self.txn_ctrl.send(
service,
events,
ephemeral,
to_device_messages_to_send,
one_time_key_counts,
unused_fallback_keys,
service, events, ephemeral, to_device_messages_to_send
)
except Exception:
logger.exception("AS request failed")
finally:
self.requests_in_flight.discard(service.id)
async def _compute_msc3202_otk_counts_and_fallback_keys(
self,
service: ApplicationService,
events: Iterable[EventBase],
ephemerals: Iterable[JsonDict],
to_device_messages: Iterable[JsonDict],
) -> Tuple[TransactionOneTimeKeyCounts, TransactionUnusedFallbackKeys]:
"""
Given a list of the events, ephemeral messages and to-device messages,
- first computes a list of application services users that may have
interesting updates to the one-time key counts or fallback key usage.
- then computes one-time key counts and fallback key usages for those users.
Given a list of application service users that are interesting,
compute one-time key counts and fallback key usages for the users.
"""
# Set of 'interesting' users who may have updates
users: Set[str] = set()
# The sender is always included
users.add(service.sender)
# All AS users that would receive the PDUs or EDUs sent to these rooms
# are classed as 'interesting'.
rooms_of_interesting_users: Set[str] = set()
# PDUs
rooms_of_interesting_users.update(event.room_id for event in events)
# EDUs
rooms_of_interesting_users.update(
ephemeral["room_id"] for ephemeral in ephemerals
)
# Look up the AS users in those rooms
for room_id in rooms_of_interesting_users:
users.update(
await self._store.get_app_service_users_in_room(room_id, service)
)
# Add recipients of to-device messages.
# device_message["user_id"] is the ID of the recipient.
users.update(device_message["user_id"] for device_message in to_device_messages)
# Compute and return the counts / fallback key usage states
otk_counts = await self._store.count_bulk_e2e_one_time_keys_for_as(users)
unused_fbks = await self._store.get_e2e_bulk_unused_fallback_key_types(users)
return otk_counts, unused_fbks
class _TransactionController:
"""Transaction manager.
@@ -320,8 +238,6 @@ class _TransactionController:
events: List[EventBase],
ephemeral: Optional[List[JsonDict]] = None,
to_device_messages: Optional[List[JsonDict]] = None,
one_time_key_counts: Optional[TransactionOneTimeKeyCounts] = None,
unused_fallback_keys: Optional[TransactionUnusedFallbackKeys] = None,
) -> None:
"""
Create a transaction with the given data and send to the provided
@@ -332,10 +248,6 @@ class _TransactionController:
events: The persistent events to include in the transaction.
ephemeral: The ephemeral events to include in the transaction.
to_device_messages: The to-device messages to include in the transaction.
one_time_key_counts: Counts of remaining one-time keys for relevant
appservice devices in the transaction.
unused_fallback_keys: Lists of unused fallback keys for relevant
appservice devices in the transaction.
"""
try:
txn = await self.store.create_appservice_txn(
@@ -343,8 +255,6 @@ class _TransactionController:
events=events,
ephemeral=ephemeral or [],
to_device_messages=to_device_messages or [],
one_time_key_counts=one_time_key_counts or {},
unused_fallback_keys=unused_fallback_keys or {},
)
service_is_up = await self._is_service_up(service)
if service_is_up:
+1 -12
View File
@@ -166,16 +166,6 @@ def _load_appservice(
supports_ephemeral = as_info.get("de.sorunome.msc2409.push_ephemeral", False)
# Opt-in flag for the MSC3202-specific transactional behaviour.
# When enabled, appservice transactions contain the following information:
# - device One-Time Key counts
# - device unused fallback key usage states
msc3202_transaction_extensions = as_info.get("org.matrix.msc3202", False)
if not isinstance(msc3202_transaction_extensions, bool):
raise ValueError(
"The `org.matrix.msc3202` option should be true or false if specified."
)
return ApplicationService(
token=as_info["as_token"],
hostname=hostname,
@@ -184,9 +174,8 @@ def _load_appservice(
hs_token=as_info["hs_token"],
sender=user_id,
id=as_info["id"],
supports_ephemeral=supports_ephemeral,
protocols=protocols,
rate_limited=rate_limited,
ip_range_whitelist=ip_range_whitelist,
supports_ephemeral=supports_ephemeral,
msc3202_transaction_extensions=msc3202_transaction_extensions,
)
+1 -1
View File
@@ -20,7 +20,7 @@ from typing import Callable, Dict, Optional
import attr
from synapse.util.check_dependencies import DependencyException, check_requirements
from synapse.python_dependencies import DependencyException, check_requirements
from ._base import Config, ConfigError
+5 -11
View File
@@ -47,22 +47,16 @@ class ExperimentalConfig(Config):
# MSC3030 (Jump to date API endpoint)
self.msc3030_enabled: bool = experimental.get("msc3030_enabled", False)
# MSC2409 (this setting only relates to optionally sending to-device messages).
# Presence, typing and read receipt EDUs are already sent to application services that
# have opted in to receive them. If enabled, this adds to-device messages to that list.
self.msc2409_to_device_messages_enabled: bool = experimental.get(
"msc2409_to_device_messages_enabled", False
)
# The portion of MSC3202 which is related to device masquerading.
self.msc3202_device_masquerading_enabled: bool = experimental.get(
"msc3202_device_masquerading", False
)
# Portion of MSC3202 related to transaction extensions:
# sending one-time key counts and fallback key usage to application services.
self.msc3202_transaction_extensions: bool = experimental.get(
"msc3202_transaction_extensions", False
# MSC2409 (this setting only relates to optionally sending to-device messages).
# Presence, typing and read receipt EDUs are already sent to application services that
# have opted in to receive them. If enabled, this adds to-device messages to that list.
self.msc2409_to_device_messages_enabled: bool = experimental.get(
"msc2409_to_device_messages_enabled", False
)
# MSC3706 (server-side support for partial state in /send_join responses)
+1 -1
View File
@@ -15,7 +15,7 @@
import attr
from synapse.util.check_dependencies import DependencyException, check_requirements
from synapse.python_dependencies import DependencyException, check_requirements
from ._base import Config, ConfigError
+1 -1
View File
@@ -20,11 +20,11 @@ import attr
from synapse.config._util import validate_config
from synapse.config.sso import SsoAttributeRequirement
from synapse.python_dependencies import DependencyException, check_requirements
from synapse.types import JsonDict
from synapse.util.module_loader import load_module
from synapse.util.stringutils import parse_and_validate_mxc_uri
from ..util.check_dependencies import DependencyException, check_requirements
from ._base import Config, ConfigError, read_file
DEFAULT_USER_MAPPING_PROVIDER = "synapse.handlers.oidc.JinjaOidcMappingProvider"
+1 -1
View File
@@ -13,7 +13,7 @@
# limitations under the License.
from synapse.config._base import Config
from synapse.util.check_dependencies import check_requirements
from synapse.python_dependencies import check_requirements
class RedisConfig(Config):
+1 -1
View File
@@ -20,8 +20,8 @@ from urllib.request import getproxies_environment # type: ignore
import attr
from synapse.config.server import DEFAULT_IP_RANGE_BLACKLIST, generate_ip_set
from synapse.python_dependencies import DependencyException, check_requirements
from synapse.types import JsonDict
from synapse.util.check_dependencies import DependencyException, check_requirements
from synapse.util.module_loader import load_module
from ._base import Config, ConfigError
+1 -1
View File
@@ -17,8 +17,8 @@ import logging
from typing import Any, List, Set
from synapse.config.sso import SsoAttributeRequirement
from synapse.python_dependencies import DependencyException, check_requirements
from synapse.types import JsonDict
from synapse.util.check_dependencies import DependencyException, check_requirements
from synapse.util.module_loader import load_module, load_python_module
from ._base import Config, ConfigError
+1 -1
View File
@@ -14,7 +14,7 @@
from typing import Set
from synapse.util.check_dependencies import DependencyException, check_requirements
from synapse.python_dependencies import DependencyException, check_requirements
from ._base import Config, ConfigError
-9
View File
@@ -101,9 +101,6 @@ class EventContext:
As with _current_state_ids, this is a private attribute. It should be
accessed via get_prev_state_ids.
partial_state: if True, we may be storing this event with a temporary,
incomplete state.
"""
rejected: Union[bool, str] = False
@@ -116,15 +113,12 @@ class EventContext:
_current_state_ids: Optional[StateMap[str]] = None
_prev_state_ids: Optional[StateMap[str]] = None
partial_state: bool = False
@staticmethod
def with_state(
state_group: Optional[int],
state_group_before_event: Optional[int],
current_state_ids: Optional[StateMap[str]],
prev_state_ids: Optional[StateMap[str]],
partial_state: bool,
prev_group: Optional[int] = None,
delta_ids: Optional[StateMap[str]] = None,
) -> "EventContext":
@@ -135,7 +129,6 @@ class EventContext:
state_group_before_event=state_group_before_event,
prev_group=prev_group,
delta_ids=delta_ids,
partial_state=partial_state,
)
@staticmethod
@@ -177,7 +170,6 @@ class EventContext:
"prev_group": self.prev_group,
"delta_ids": _encode_state_dict(self.delta_ids),
"app_service_id": self.app_service.id if self.app_service else None,
"partial_state": self.partial_state,
}
@staticmethod
@@ -204,7 +196,6 @@ class EventContext:
prev_group=input["prev_group"],
delta_ids=_decode_state_dict(input["delta_ids"]),
rejected=input["rejected"],
partial_state=input.get("partial_state", False),
)
app_service_id = input["app_service_id"]
+3 -57
View File
@@ -17,7 +17,6 @@ from typing import TYPE_CHECKING, Any, Awaitable, Callable, List, Optional, Tupl
from synapse.api.errors import ModuleFailedException, SynapseError
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.storage.roommember import ProfileInfo
from synapse.types import Requester, StateMap
from synapse.util.async_helpers import maybe_awaitable
@@ -38,8 +37,6 @@ CHECK_VISIBILITY_CAN_BE_MODIFIED_CALLBACK = Callable[
[str, StateMap[EventBase], str], Awaitable[bool]
]
ON_NEW_EVENT_CALLBACK = Callable[[EventBase, StateMap[EventBase]], Awaitable]
ON_PROFILE_UPDATE_CALLBACK = Callable[[str, ProfileInfo, bool, bool], Awaitable]
ON_USER_DEACTIVATION_STATUS_CHANGED_CALLBACK = Callable[[str, bool, bool], Awaitable]
def load_legacy_third_party_event_rules(hs: "HomeServer") -> None:
@@ -157,10 +154,6 @@ class ThirdPartyEventRules:
CHECK_VISIBILITY_CAN_BE_MODIFIED_CALLBACK
] = []
self._on_new_event_callbacks: List[ON_NEW_EVENT_CALLBACK] = []
self._on_profile_update_callbacks: List[ON_PROFILE_UPDATE_CALLBACK] = []
self._on_user_deactivation_status_changed_callbacks: List[
ON_USER_DEACTIVATION_STATUS_CHANGED_CALLBACK
] = []
def register_third_party_rules_callbacks(
self,
@@ -173,10 +166,6 @@ class ThirdPartyEventRules:
CHECK_VISIBILITY_CAN_BE_MODIFIED_CALLBACK
] = None,
on_new_event: Optional[ON_NEW_EVENT_CALLBACK] = None,
on_profile_update: Optional[ON_PROFILE_UPDATE_CALLBACK] = None,
on_user_deactivation_status_changed: Optional[
ON_USER_DEACTIVATION_STATUS_CHANGED_CALLBACK
] = None,
) -> None:
"""Register callbacks from modules for each hook."""
if check_event_allowed is not None:
@@ -198,14 +187,6 @@ class ThirdPartyEventRules:
if on_new_event is not None:
self._on_new_event_callbacks.append(on_new_event)
if on_profile_update is not None:
self._on_profile_update_callbacks.append(on_profile_update)
if on_user_deactivation_status_changed is not None:
self._on_user_deactivation_status_changed_callbacks.append(
on_user_deactivation_status_changed,
)
async def check_event_allowed(
self, event: EventBase, context: EventContext
) -> Tuple[bool, Optional[dict]]:
@@ -353,6 +334,9 @@ class ThirdPartyEventRules:
Args:
event_id: The ID of the event.
Raises:
ModuleFailureError if a callback raised any exception.
"""
# Bail out early without hitting the store if we don't have any callbacks
if len(self._on_new_event_callbacks) == 0:
@@ -386,41 +370,3 @@ class ThirdPartyEventRules:
state_events[key] = room_state_events[event_id]
return state_events
async def on_profile_update(
self, user_id: str, new_profile: ProfileInfo, by_admin: bool, deactivation: bool
) -> None:
"""Called after the global profile of a user has been updated. Does not include
per-room profile changes.
Args:
user_id: The user whose profile was changed.
new_profile: The updated profile for the user.
by_admin: Whether the profile update was performed by a server admin.
deactivation: Whether this change was made while deactivating the user.
"""
for callback in self._on_profile_update_callbacks:
try:
await callback(user_id, new_profile, by_admin, deactivation)
except Exception as e:
logger.exception(
"Failed to run module API callback %s: %s", callback, e
)
async def on_user_deactivation_status_changed(
self, user_id: str, deactivated: bool, by_admin: bool
) -> None:
"""Called after a user has been deactivated or reactivated.
Args:
user_id: The deactivated user.
deactivated: Whether the user is now deactivated.
by_admin: Whether the deactivation was performed by a server admin.
"""
for callback in self._on_user_deactivation_status_changed_callbacks:
try:
await callback(user_id, deactivated, by_admin)
except Exception as e:
logger.exception(
"Failed to run module API callback %s: %s", callback, e
)
+203 -45
View File
@@ -615,15 +615,11 @@ class FederationClient(FederationBase):
synapse_error = e.to_synapse_error()
# There is no good way to detect an "unknown" endpoint.
#
# Dendrite returns a 404 (with a body of "404 page not found");
# Conduit returns a 404 (with no body); and Synapse returns a 400
# Dendrite returns a 404 (with no body); synapse returns a 400
# with M_UNRECOGNISED.
#
# This needs to be rather specific as some endpoints truly do return 404
# errors.
return (
e.code == 404 and (not e.response or e.response == b"404 page not found")
) or (e.code == 400 and synapse_error.errcode == Codes.UNRECOGNIZED)
return e.code == 404 or (
e.code == 400 and synapse_error.errcode == Codes.UNRECOGNIZED
)
async def _try_destination_list(
self,
@@ -1006,7 +1002,7 @@ class FederationClient(FederationBase):
)
except HttpResponseException as e:
# If an error is received that is due to an unrecognised endpoint,
# fallback to the v1 endpoint. Otherwise, consider it a legitimate error
# fallback to the v1 endpoint. Otherwise consider it a legitmate error
# and raise.
if not self._is_unknown_endpoint(e):
raise
@@ -1075,7 +1071,7 @@ class FederationClient(FederationBase):
except HttpResponseException as e:
# If an error is received that is due to an unrecognised endpoint,
# fallback to the v1 endpoint if the room uses old-style event IDs.
# Otherwise, consider it a legitimate error and raise.
# Otherwise consider it a legitmate error and raise.
err = e.to_synapse_error()
if self._is_unknown_endpoint(e, err):
if room_version.event_format != EventFormatVersions.V1:
@@ -1136,7 +1132,7 @@ class FederationClient(FederationBase):
)
except HttpResponseException as e:
# If an error is received that is due to an unrecognised endpoint,
# fallback to the v1 endpoint. Otherwise, consider it a legitimate error
# fallback to the v1 endpoint. Otherwise consider it a legitmate error
# and raise.
if not self._is_unknown_endpoint(e):
raise
@@ -1362,6 +1358,61 @@ class FederationClient(FederationBase):
# server doesn't give it to us.
return None
async def get_space_summary(
self,
destinations: Iterable[str],
room_id: str,
suggested_only: bool,
max_rooms_per_space: Optional[int],
exclude_rooms: List[str],
) -> "FederationSpaceSummaryResult":
"""
Call other servers to get a summary of the given space
Args:
destinations: The remote servers. We will try them in turn, omitting any
that have been blacklisted.
room_id: ID of the space to be queried
suggested_only: If true, ask the remote server to only return children
with the "suggested" flag set
max_rooms_per_space: A limit on the number of children to return for each
space
exclude_rooms: A list of room IDs to tell the remote server to skip
Returns:
a parsed FederationSpaceSummaryResult
Raises:
SynapseError if we were unable to get a valid summary from any of the
remote servers
"""
async def send_request(destination: str) -> FederationSpaceSummaryResult:
res = await self.transport_layer.get_space_summary(
destination=destination,
room_id=room_id,
suggested_only=suggested_only,
max_rooms_per_space=max_rooms_per_space,
exclude_rooms=exclude_rooms,
)
try:
return FederationSpaceSummaryResult.from_json_dict(res)
except ValueError as e:
raise InvalidResponseError(str(e))
return await self._try_destination_list(
"fetch space summary",
destinations,
send_request,
failover_on_unknown_endpoint=True,
)
async def get_room_hierarchy(
self,
destinations: Iterable[str],
@@ -1407,8 +1458,8 @@ class FederationClient(FederationBase):
)
except HttpResponseException as e:
# If an error is received that is due to an unrecognised endpoint,
# fallback to the unstable endpoint. Otherwise, consider it a
# legitimate error and raise.
# fallback to the unstable endpoint. Otherwise consider it a
# legitmate error and raise.
if not self._is_unknown_endpoint(e):
raise
@@ -1433,8 +1484,10 @@ class FederationClient(FederationBase):
if any(not isinstance(e, dict) for e in children_state):
raise InvalidResponseError("Invalid event in 'children_state' list")
try:
for child_state in children_state:
_validate_hierarchy_event(child_state)
[
FederationSpaceSummaryEventResult.from_json_dict(e)
for e in children_state
]
except ValueError as e:
raise InvalidResponseError(str(e))
@@ -1456,12 +1509,62 @@ class FederationClient(FederationBase):
return room, children_state, children, inaccessible_children
result = await self._try_destination_list(
"fetch room hierarchy",
destinations,
send_request,
failover_on_unknown_endpoint=True,
)
try:
result = await self._try_destination_list(
"fetch room hierarchy",
destinations,
send_request,
failover_on_unknown_endpoint=True,
)
except SynapseError as e:
# If an unexpected error occurred, re-raise it.
if e.code != 502:
raise
logger.debug(
"Couldn't fetch room hierarchy, falling back to the spaces API"
)
# Fallback to the old federation API and translate the results if
# no servers implement the new API.
#
# The algorithm below is a bit inefficient as it only attempts to
# parse information for the requested room, but the legacy API may
# return additional layers.
legacy_result = await self.get_space_summary(
destinations,
room_id,
suggested_only,
max_rooms_per_space=None,
exclude_rooms=[],
)
# Find the requested room in the response (and remove it).
for _i, room in enumerate(legacy_result.rooms):
if room.get("room_id") == room_id:
break
else:
# The requested room was not returned, nothing we can do.
raise
requested_room = legacy_result.rooms.pop(_i)
# Find any children events of the requested room.
children_events = []
children_room_ids = set()
for event in legacy_result.events:
if event.room_id == room_id:
children_events.append(event.data)
children_room_ids.add(event.state_key)
# Find the children rooms.
children = []
for room in legacy_result.rooms:
if room.get("room_id") in children_room_ids:
children.append(room)
# It isn't clear from the response whether some of the rooms are
# not accessible.
result = (requested_room, children_events, children, ())
# Cache the result to avoid fetching data over federation every time.
self._get_room_hierarchy_cache[(room_id, suggested_only)] = result
@@ -1603,34 +1706,89 @@ class TimestampToEventResponse:
return cls(event_id, origin_server_ts, d)
def _validate_hierarchy_event(d: JsonDict) -> None:
"""Validate an event within the result of a /hierarchy request
@attr.s(frozen=True, slots=True, auto_attribs=True)
class FederationSpaceSummaryEventResult:
"""Represents a single event in the result of a successful get_space_summary call.
Args:
d: json object to be parsed
Raises:
ValueError if d is not a valid event
It's essentially just a serialised event object, but we do a bit of parsing and
validation in `from_json_dict` and store some of the validated properties in
object attributes.
"""
event_type = d.get("type")
if not isinstance(event_type, str):
raise ValueError("Invalid event: 'event_type' must be a str")
event_type: str
room_id: str
state_key: str
via: Sequence[str]
room_id = d.get("room_id")
if not isinstance(room_id, str):
raise ValueError("Invalid event: 'room_id' must be a str")
# the raw data, including the above keys
data: JsonDict
state_key = d.get("state_key")
if not isinstance(state_key, str):
raise ValueError("Invalid event: 'state_key' must be a str")
@classmethod
def from_json_dict(cls, d: JsonDict) -> "FederationSpaceSummaryEventResult":
"""Parse an event within the result of a /spaces/ request
content = d.get("content")
if not isinstance(content, dict):
raise ValueError("Invalid event: 'content' must be a dict")
Args:
d: json object to be parsed
via = content.get("via")
if not isinstance(via, Sequence):
raise ValueError("Invalid event: 'via' must be a list")
if any(not isinstance(v, str) for v in via):
raise ValueError("Invalid event: 'via' must be a list of strings")
Raises:
ValueError if d is not a valid event
"""
event_type = d.get("type")
if not isinstance(event_type, str):
raise ValueError("Invalid event: 'event_type' must be a str")
room_id = d.get("room_id")
if not isinstance(room_id, str):
raise ValueError("Invalid event: 'room_id' must be a str")
state_key = d.get("state_key")
if not isinstance(state_key, str):
raise ValueError("Invalid event: 'state_key' must be a str")
content = d.get("content")
if not isinstance(content, dict):
raise ValueError("Invalid event: 'content' must be a dict")
via = content.get("via")
if not isinstance(via, Sequence):
raise ValueError("Invalid event: 'via' must be a list")
if any(not isinstance(v, str) for v in via):
raise ValueError("Invalid event: 'via' must be a list of strings")
return cls(event_type, room_id, state_key, via, d)
@attr.s(frozen=True, slots=True, auto_attribs=True)
class FederationSpaceSummaryResult:
"""Represents the data returned by a successful get_space_summary call."""
rooms: List[JsonDict]
events: Sequence[FederationSpaceSummaryEventResult]
@classmethod
def from_json_dict(cls, d: JsonDict) -> "FederationSpaceSummaryResult":
"""Parse the result of a /spaces/ request
Args:
d: json object to be parsed
Raises:
ValueError if d is not a valid /spaces/ response
"""
rooms = d.get("rooms")
if not isinstance(rooms, List):
raise ValueError("'rooms' must be a list")
if any(not isinstance(r, dict) for r in rooms):
raise ValueError("Invalid room in 'rooms' list")
events = d.get("events")
if not isinstance(events, Sequence):
raise ValueError("'events' must be a list")
if any(not isinstance(e, dict) for e in events):
raise ValueError("Invalid event in 'events' list")
parsed_events = [
FederationSpaceSummaryEventResult.from_json_dict(e) for e in events
]
return cls(rooms, parsed_events)
+33
View File
@@ -1179,6 +1179,39 @@ class TransportLayerClient:
return await self.client.get_json(destination=destination, path=path)
async def get_space_summary(
self,
destination: str,
room_id: str,
suggested_only: bool,
max_rooms_per_space: Optional[int],
exclude_rooms: List[str],
) -> JsonDict:
"""
Args:
destination: The remote server
room_id: The room ID to ask about.
suggested_only: if True, only suggested rooms will be returned
max_rooms_per_space: an optional limit to the number of children to be
returned per space
exclude_rooms: a list of any rooms we can skip
"""
# TODO When switching to the stable endpoint, use GET instead of POST.
path = _create_path(
FEDERATION_UNSTABLE_PREFIX, "/org.matrix.msc2946/spaces/%s", room_id
)
params = {
"suggested_only": suggested_only,
"exclude_rooms": exclude_rooms,
}
if max_rooms_per_space is not None:
params["max_rooms_per_space"] = max_rooms_per_space
return await self.client.post_json(
destination=destination, path=path, data=params
)
async def get_room_hierarchy(
self, destination: str, room_id: str, suggested_only: bool
) -> JsonDict:
@@ -110,7 +110,7 @@ class FederationSendServlet(BaseFederationServerServlet):
if issue_8631_logger.isEnabledFor(logging.DEBUG):
DEVICE_UPDATE_EDUS = ["m.device_list_update", "m.signing_key_update"]
device_list_updates = [
edu.get("content", {})
edu.content
for edu in transaction_data.get("edus", [])
if edu.get("edu_type") in DEVICE_UPDATE_EDUS
]
@@ -624,6 +624,81 @@ class FederationVersionServlet(BaseFederationServlet):
)
class FederationSpaceSummaryServlet(BaseFederationServlet):
PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.msc2946"
PATH = "/spaces/(?P<room_id>[^/]*)"
def __init__(
self,
hs: "HomeServer",
authenticator: Authenticator,
ratelimiter: FederationRateLimiter,
server_name: str,
):
super().__init__(hs, authenticator, ratelimiter, server_name)
self.handler = hs.get_room_summary_handler()
async def on_GET(
self,
origin: str,
content: Literal[None],
query: Mapping[bytes, Sequence[bytes]],
room_id: str,
) -> Tuple[int, JsonDict]:
suggested_only = parse_boolean_from_args(query, "suggested_only", default=False)
max_rooms_per_space = parse_integer_from_args(query, "max_rooms_per_space")
if max_rooms_per_space is not None and max_rooms_per_space < 0:
raise SynapseError(
400,
"Value for 'max_rooms_per_space' must be a non-negative integer",
Codes.BAD_JSON,
)
exclude_rooms = parse_strings_from_args(query, "exclude_rooms", default=[])
return 200, await self.handler.federation_space_summary(
origin, room_id, suggested_only, max_rooms_per_space, exclude_rooms
)
# TODO When switching to the stable endpoint, remove the POST handler.
async def on_POST(
self,
origin: str,
content: JsonDict,
query: Mapping[bytes, Sequence[bytes]],
room_id: str,
) -> Tuple[int, JsonDict]:
suggested_only = content.get("suggested_only", False)
if not isinstance(suggested_only, bool):
raise SynapseError(
400, "'suggested_only' must be a boolean", Codes.BAD_JSON
)
exclude_rooms = content.get("exclude_rooms", [])
if not isinstance(exclude_rooms, list) or any(
not isinstance(x, str) for x in exclude_rooms
):
raise SynapseError(400, "bad value for 'exclude_rooms'", Codes.BAD_JSON)
max_rooms_per_space = content.get("max_rooms_per_space")
if max_rooms_per_space is not None:
if not isinstance(max_rooms_per_space, int):
raise SynapseError(
400, "bad value for 'max_rooms_per_space'", Codes.BAD_JSON
)
if max_rooms_per_space < 0:
raise SynapseError(
400,
"Value for 'max_rooms_per_space' must be a non-negative integer",
Codes.BAD_JSON,
)
return 200, await self.handler.federation_space_summary(
origin, room_id, suggested_only, max_rooms_per_space, exclude_rooms
)
class FederationRoomHierarchyServlet(BaseFederationServlet):
PATH = "/hierarchy/(?P<room_id>[^/]*)"
@@ -751,6 +826,7 @@ FEDERATION_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = (
On3pidBindServlet,
FederationVersionServlet,
RoomComplexityServlet,
FederationSpaceSummaryServlet,
FederationRoomHierarchyServlet,
FederationRoomHierarchyUnstableServlet,
FederationV1SendKnockServlet,
+2 -2
View File
@@ -23,7 +23,7 @@ if TYPE_CHECKING:
class AccountHandler:
def __init__(self, hs: "HomeServer"):
self._main_store = hs.get_datastores().main
self._store = hs.get_datastore()
self._is_mine = hs.is_mine
self._federation_client = hs.get_federation_client()
@@ -98,7 +98,7 @@ class AccountHandler:
"""
status = {"exists": False}
userinfo = await self._main_store.get_userinfo_by_id(user_id.to_string())
userinfo = await self._store.get_userinfo_by_id(user_id.to_string())
if userinfo is not None:
status = {
+2 -18
View File
@@ -38,7 +38,6 @@ class DeactivateAccountHandler:
self._profile_handler = hs.get_profile_handler()
self.user_directory_handler = hs.get_user_directory_handler()
self._server_name = hs.hostname
self._third_party_rules = hs.get_third_party_event_rules()
# Flag that indicates whether the process to part users from rooms is running
self._user_parter_running = False
@@ -136,13 +135,9 @@ class DeactivateAccountHandler:
if erase_data:
user = UserID.from_string(user_id)
# Remove avatar URL from this user
await self._profile_handler.set_avatar_url(
user, requester, "", by_admin, deactivation=True
)
await self._profile_handler.set_avatar_url(user, requester, "", by_admin)
# Remove displayname from this user
await self._profile_handler.set_displayname(
user, requester, "", by_admin, deactivation=True
)
await self._profile_handler.set_displayname(user, requester, "", by_admin)
logger.info("Marking %s as erased", user_id)
await self.store.mark_user_erased(user_id)
@@ -165,13 +160,6 @@ class DeactivateAccountHandler:
# Remove account data (including ignored users and push rules).
await self.store.purge_account_data_for_user(user_id)
# Let modules know the user has been deactivated.
await self._third_party_rules.on_user_deactivation_status_changed(
user_id,
True,
by_admin,
)
return identity_server_supports_unbinding
async def _reject_pending_invites_for_user(self, user_id: str) -> None:
@@ -276,10 +264,6 @@ class DeactivateAccountHandler:
# Mark the user as active.
await self.store.set_user_deactivated_status(user_id, False)
await self._third_party_rules.on_user_deactivation_status_changed(
user_id, False, True
)
# Add the user to the directory, if necessary. Note that
# this must be done after the user is re-activated, because
# deactivated users are excluded from the user directory.
+1 -10
View File
@@ -519,17 +519,8 @@ class FederationHandler:
state_events=state,
)
if ret.partial_state:
await self.store.store_partial_state_room(room_id, ret.servers_in_room)
max_stream_id = await self._federation_event_handler.process_remote_join(
origin,
room_id,
auth_chain,
state,
event,
room_version_obj,
partial_state=ret.partial_state,
origin, room_id, auth_chain, state, event, room_version_obj
)
# We wait here until this instance has seen the events come down
+2 -11
View File
@@ -397,7 +397,6 @@ class FederationEventHandler:
state: List[EventBase],
event: EventBase,
room_version: RoomVersion,
partial_state: bool,
) -> int:
"""Persists the events returned by a send_join
@@ -413,7 +412,6 @@ class FederationEventHandler:
event
room_version: The room version we expect this room to have, and
will raise if it doesn't match the version in the create event.
partial_state: True if the state omits non-critical membership events
Returns:
The stream ID after which all events have been persisted.
@@ -455,14 +453,10 @@ class FederationEventHandler:
)
# and now persist the join event itself.
logger.info(
"Peristing join-via-remote %s (partial_state: %s)", event, partial_state
)
logger.info("Peristing join-via-remote %s", event)
with nested_logging_context(suffix=event.event_id):
context = await self._state_handler.compute_event_context(
event,
old_state=state,
partial_state=partial_state,
event, old_state=state
)
context = await self._check_event_auth(origin, event, context)
@@ -704,8 +698,6 @@ class FederationEventHandler:
try:
state = await self._resolve_state_at_missing_prevs(origin, event)
# TODO(faster_joins): make sure that _resolve_state_at_missing_prevs does
# not return partial state
await self._process_received_pdu(
origin, event, state=state, backfilled=backfilled
)
@@ -1799,7 +1791,6 @@ class FederationEventHandler:
prev_state_ids=prev_state_ids,
prev_group=prev_group,
delta_ids=state_updates,
partial_state=context.partial_state,
)
async def _run_push_actions_and_persist_event(
+2 -4
View File
@@ -55,8 +55,8 @@ from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.state import StateFilter
from synapse.types import Requester, RoomAlias, StreamToken, UserID, create_requester
from synapse.util import json_decoder, json_encoder, log_failure, unwrapFirstError
from synapse.util.async_helpers import Linearizer, gather_results
from synapse.util import json_decoder, json_encoder, log_failure
from synapse.util.async_helpers import Linearizer, gather_results, unwrapFirstError
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.metrics import measure_func
from synapse.visibility import filter_events_for_client
@@ -992,8 +992,6 @@ class EventCreationHandler:
and full_state_ids_at_event
and builder.internal_metadata.is_historical()
):
# TODO(faster_joins): figure out how this works, and make sure that the
# old state is complete.
old_state = await self.store.get_events_as_list(full_state_ids_at_event)
context = await self.state.compute_event_context(event, old_state=old_state)
else:
-14
View File
@@ -71,8 +71,6 @@ class ProfileHandler:
self.server_name = hs.config.server.server_name
self._third_party_rules = hs.get_third_party_event_rules()
if hs.config.worker.run_background_tasks:
self.clock.looping_call(
self._update_remote_profile_cache, self.PROFILE_UPDATE_MS
@@ -173,7 +171,6 @@ class ProfileHandler:
requester: Requester,
new_displayname: str,
by_admin: bool = False,
deactivation: bool = False,
) -> None:
"""Set the displayname of a user
@@ -182,7 +179,6 @@ class ProfileHandler:
requester: The user attempting to make this change.
new_displayname: The displayname to give this user.
by_admin: Whether this change was made by an administrator.
deactivation: Whether this change was made while deactivating the user.
"""
if not self.hs.is_mine(target_user):
raise SynapseError(400, "User is not hosted on this homeserver")
@@ -231,10 +227,6 @@ class ProfileHandler:
target_user.to_string(), profile
)
await self._third_party_rules.on_profile_update(
target_user.to_string(), profile, by_admin, deactivation
)
await self._update_join_states(requester, target_user)
async def get_avatar_url(self, target_user: UserID) -> Optional[str]:
@@ -269,7 +261,6 @@ class ProfileHandler:
requester: Requester,
new_avatar_url: str,
by_admin: bool = False,
deactivation: bool = False,
) -> None:
"""Set a new avatar URL for a user.
@@ -278,7 +269,6 @@ class ProfileHandler:
requester: The user attempting to make this change.
new_avatar_url: The avatar URL to give this user.
by_admin: Whether this change was made by an administrator.
deactivation: Whether this change was made while deactivating the user.
"""
if not self.hs.is_mine(target_user):
raise SynapseError(400, "User is not hosted on this homeserver")
@@ -325,10 +315,6 @@ class ProfileHandler:
target_user.to_string(), profile
)
await self._third_party_rules.on_profile_update(
target_user.to_string(), profile, by_admin, deactivation
)
await self._update_join_states(requester, target_user)
@cached()
+312 -11
View File
@@ -15,6 +15,7 @@
import itertools
import logging
import re
from collections import deque
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Sequence, Set, Tuple
import attr
@@ -106,6 +107,153 @@ class RoomSummaryHandler:
"get_room_hierarchy",
)
async def get_space_summary(
self,
requester: str,
room_id: str,
suggested_only: bool = False,
max_rooms_per_space: Optional[int] = None,
) -> JsonDict:
"""
Implementation of the space summary C-S API
Args:
requester: user id of the user making this request
room_id: room id to start the summary at
suggested_only: whether we should only return children with the "suggested"
flag set.
max_rooms_per_space: an optional limit on the number of child rooms we will
return. This does not apply to the root room (ie, room_id), and
is overridden by MAX_ROOMS_PER_SPACE.
Returns:
summary dict to return
"""
# First of all, check that the room is accessible.
if not await self._is_local_room_accessible(room_id, requester):
raise AuthError(
403,
"User %s not in room %s, and room previews are disabled"
% (requester, room_id),
)
# the queue of rooms to process
room_queue = deque((_RoomQueueEntry(room_id, ()),))
# rooms we have already processed
processed_rooms: Set[str] = set()
# events we have already processed. We don't necessarily have their event ids,
# so instead we key on (room id, state key)
processed_events: Set[Tuple[str, str]] = set()
rooms_result: List[JsonDict] = []
events_result: List[JsonDict] = []
if max_rooms_per_space is None or max_rooms_per_space > MAX_ROOMS_PER_SPACE:
max_rooms_per_space = MAX_ROOMS_PER_SPACE
while room_queue and len(rooms_result) < MAX_ROOMS:
queue_entry = room_queue.popleft()
room_id = queue_entry.room_id
if room_id in processed_rooms:
# already done this room
continue
logger.debug("Processing room %s", room_id)
is_in_room = await self._store.is_host_joined(room_id, self._server_name)
# The client-specified max_rooms_per_space limit doesn't apply to the
# room_id specified in the request, so we ignore it if this is the
# first room we are processing.
max_children = max_rooms_per_space if processed_rooms else MAX_ROOMS
if is_in_room:
room_entry = await self._summarize_local_room(
requester, None, room_id, suggested_only, max_children
)
events: Sequence[JsonDict] = []
if room_entry:
rooms_result.append(room_entry.room)
events = room_entry.children_state_events
logger.debug(
"Query of local room %s returned events %s",
room_id,
["%s->%s" % (ev["room_id"], ev["state_key"]) for ev in events],
)
else:
fed_rooms = await self._summarize_remote_room(
queue_entry,
suggested_only,
max_children,
exclude_rooms=processed_rooms,
)
# The results over federation might include rooms that the we,
# as the requesting server, are allowed to see, but the requesting
# user is not permitted see.
#
# Filter the returned results to only what is accessible to the user.
events = []
for room_entry in fed_rooms:
room = room_entry.room
fed_room_id = room_entry.room_id
# The user can see the room, include it!
if await self._is_remote_room_accessible(
requester, fed_room_id, room
):
# Before returning to the client, remove the allowed_room_ids
# and allowed_spaces keys.
room.pop("allowed_room_ids", None)
room.pop("allowed_spaces", None) # historical
rooms_result.append(room)
events.extend(room_entry.children_state_events)
# All rooms returned don't need visiting again (even if the user
# didn't have access to them).
processed_rooms.add(fed_room_id)
logger.debug(
"Query of %s returned rooms %s, events %s",
room_id,
[room_entry.room.get("room_id") for room_entry in fed_rooms],
["%s->%s" % (ev["room_id"], ev["state_key"]) for ev in events],
)
# the room we queried may or may not have been returned, but don't process
# it again, anyway.
processed_rooms.add(room_id)
# XXX: is it ok that we blindly iterate through any events returned by
# a remote server, whether or not they actually link to any rooms in our
# tree?
for ev in events:
# remote servers might return events we have already processed
# (eg, Dendrite returns inward pointers as well as outward ones), so
# we need to filter them out, to avoid returning duplicate links to the
# client.
ev_key = (ev["room_id"], ev["state_key"])
if ev_key in processed_events:
continue
events_result.append(ev)
# add the child to the queue. we have already validated
# that the vias are a list of server names.
room_queue.append(
_RoomQueueEntry(ev["state_key"], ev["content"]["via"])
)
processed_events.add(ev_key)
return {"rooms": rooms_result, "events": events_result}
async def get_room_hierarchy(
self,
requester: Requester,
@@ -250,6 +398,8 @@ class RoomSummaryHandler:
None,
room_id,
suggested_only,
# Do not limit the maximum children.
max_children=None,
)
# Otherwise, attempt to use information for federation.
@@ -338,6 +488,74 @@ class RoomSummaryHandler:
return result
async def federation_space_summary(
self,
origin: str,
room_id: str,
suggested_only: bool,
max_rooms_per_space: Optional[int],
exclude_rooms: Iterable[str],
) -> JsonDict:
"""
Implementation of the space summary Federation API
Args:
origin: The server requesting the spaces summary.
room_id: room id to start the summary at
suggested_only: whether we should only return children with the "suggested"
flag set.
max_rooms_per_space: an optional limit on the number of child rooms we will
return. Unlike the C-S API, this applies to the root room (room_id).
It is clipped to MAX_ROOMS_PER_SPACE.
exclude_rooms: a list of rooms to skip over (presumably because the
calling server has already seen them).
Returns:
summary dict to return
"""
# the queue of rooms to process
room_queue = deque((room_id,))
# the set of rooms that we should not walk further. Initialise it with the
# excluded-rooms list; we will add other rooms as we process them so that
# we do not loop.
processed_rooms: Set[str] = set(exclude_rooms)
rooms_result: List[JsonDict] = []
events_result: List[JsonDict] = []
# Set a limit on the number of rooms to return.
if max_rooms_per_space is None or max_rooms_per_space > MAX_ROOMS_PER_SPACE:
max_rooms_per_space = MAX_ROOMS_PER_SPACE
while room_queue and len(rooms_result) < MAX_ROOMS:
room_id = room_queue.popleft()
if room_id in processed_rooms:
# already done this room
continue
room_entry = await self._summarize_local_room(
None, origin, room_id, suggested_only, max_rooms_per_space
)
processed_rooms.add(room_id)
if room_entry:
rooms_result.append(room_entry.room)
events_result.extend(room_entry.children_state_events)
# add any children to the queue
room_queue.extend(
edge_event["state_key"]
for edge_event in room_entry.children_state_events
)
return {"rooms": rooms_result, "events": events_result}
async def get_federation_hierarchy(
self,
origin: str,
@@ -361,7 +579,7 @@ class RoomSummaryHandler:
The JSON hierarchy dictionary.
"""
root_room_entry = await self._summarize_local_room(
None, origin, requested_room_id, suggested_only
None, origin, requested_room_id, suggested_only, max_children=None
)
if root_room_entry is None:
# Room is inaccessible to the requesting server.
@@ -382,7 +600,7 @@ class RoomSummaryHandler:
continue
room_entry = await self._summarize_local_room(
None, origin, room_id, suggested_only, include_children=False
None, origin, room_id, suggested_only, max_children=0
)
# If the room is accessible, include it in the results.
#
@@ -408,7 +626,7 @@ class RoomSummaryHandler:
origin: Optional[str],
room_id: str,
suggested_only: bool,
include_children: bool = True,
max_children: Optional[int],
) -> Optional["_RoomEntry"]:
"""
Generate a room entry and a list of event entries for a given room.
@@ -423,8 +641,9 @@ class RoomSummaryHandler:
room_id: The room ID to summarize.
suggested_only: True if only suggested children should be returned.
Otherwise, all children are returned.
include_children:
Whether to include the events of any children.
max_children:
The maximum number of children rooms to include. A value of None
means no limit.
Returns:
A room entry if the room should be returned. None, otherwise.
@@ -434,8 +653,9 @@ class RoomSummaryHandler:
room_entry = await self._build_room_entry(room_id, for_federation=bool(origin))
# If the room is not a space return just the room information.
if room_entry.get("room_type") != RoomTypes.SPACE or not include_children:
# If the room is not a space or the children don't matter, return just
# the room information.
if room_entry.get("room_type") != RoomTypes.SPACE or max_children == 0:
return _RoomEntry(room_id, room_entry)
# Otherwise, look for child rooms/spaces.
@@ -445,6 +665,14 @@ class RoomSummaryHandler:
# we only care about suggested children
child_events = filter(_is_suggested_child_event, child_events)
# TODO max_children is legacy code for the /spaces endpoint.
if max_children is not None:
child_iter: Iterable[EventBase] = itertools.islice(
child_events, max_children
)
else:
child_iter = child_events
stripped_events: List[JsonDict] = [
{
"type": e.type,
@@ -454,10 +682,80 @@ class RoomSummaryHandler:
"sender": e.sender,
"origin_server_ts": e.origin_server_ts,
}
for e in child_events
for e in child_iter
]
return _RoomEntry(room_id, room_entry, stripped_events)
async def _summarize_remote_room(
self,
room: "_RoomQueueEntry",
suggested_only: bool,
max_children: Optional[int],
exclude_rooms: Iterable[str],
) -> Iterable["_RoomEntry"]:
"""
Request room entries and a list of event entries for a given room by querying a remote server.
Args:
room: The room to summarize.
suggested_only: True if only suggested children should be returned.
Otherwise, all children are returned.
max_children:
The maximum number of children rooms to include. This is capped
to a server-set limit.
exclude_rooms:
Rooms IDs which do not need to be summarized.
Returns:
An iterable of room entries.
"""
room_id = room.room_id
logger.info("Requesting summary for %s via %s", room_id, room.via)
# we need to make the exclusion list json-serialisable
exclude_rooms = list(exclude_rooms)
via = itertools.islice(room.via, MAX_SERVERS_PER_SPACE)
try:
res = await self._federation_client.get_space_summary(
via,
room_id,
suggested_only=suggested_only,
max_rooms_per_space=max_children,
exclude_rooms=exclude_rooms,
)
except Exception as e:
logger.warning(
"Unable to get summary of %s via federation: %s",
room_id,
e,
exc_info=logger.isEnabledFor(logging.DEBUG),
)
return ()
# Group the events by their room.
children_by_room: Dict[str, List[JsonDict]] = {}
for ev in res.events:
if ev.event_type == EventTypes.SpaceChild:
children_by_room.setdefault(ev.room_id, []).append(ev.data)
# Generate the final results.
results = []
for fed_room in res.rooms:
fed_room_id = fed_room.get("room_id")
if not fed_room_id or not isinstance(fed_room_id, str):
continue
results.append(
_RoomEntry(
fed_room_id,
fed_room,
children_by_room.get(fed_room_id, []),
)
)
return results
async def _summarize_remote_room_hierarchy(
self, room: "_RoomQueueEntry", suggested_only: bool
) -> Tuple[Optional["_RoomEntry"], Dict[str, JsonDict], Set[str]]:
@@ -660,8 +958,9 @@ class RoomSummaryHandler:
):
return True
# Check if the user is a member of any of the allowed rooms from the response.
allowed_rooms = room.get("allowed_room_ids")
# Check if the user is a member of any of the allowed spaces
# from the response.
allowed_rooms = room.get("allowed_room_ids") or room.get("allowed_spaces")
if allowed_rooms and isinstance(allowed_rooms, list):
if await self._event_auth_handler.is_user_in_rooms(
allowed_rooms, requester
@@ -729,6 +1028,8 @@ class RoomSummaryHandler:
)
if allowed_rooms:
entry["allowed_room_ids"] = allowed_rooms
# TODO Remove this key once the API is stable.
entry["allowed_spaces"] = allowed_rooms
# Filter out Nones rather omit the field altogether
room_entry = {k: v for k, v in entry.items() if v is not None}
@@ -793,7 +1094,7 @@ class RoomSummaryHandler:
room_id,
# Suggested-only doesn't matter since no children are requested.
suggested_only=False,
include_children=False,
max_children=0,
)
if not room_entry:
-9
View File
@@ -697,15 +697,6 @@ class SyncHandler:
else:
# no events in this room - so presumably no state
state = {}
# (erikj) This should be rarely hit, but we've had some reports that
# we get more state down gappy syncs than we should, so let's add
# some logging.
logger.info(
"Failed to find any events in room %s at %s",
room_id,
stream_position.room_key,
)
return state
async def compute_summary(
-9
View File
@@ -59,8 +59,6 @@ from synapse.events.third_party_rules import (
CHECK_VISIBILITY_CAN_BE_MODIFIED_CALLBACK,
ON_CREATE_ROOM_CALLBACK,
ON_NEW_EVENT_CALLBACK,
ON_PROFILE_UPDATE_CALLBACK,
ON_USER_DEACTIVATION_STATUS_CHANGED_CALLBACK,
)
from synapse.handlers.account_validity import (
IS_USER_EXPIRED_CALLBACK,
@@ -147,7 +145,6 @@ __all__ = [
"JsonDict",
"EventBase",
"StateMap",
"ProfileInfo",
]
logger = logging.getLogger(__name__)
@@ -283,10 +280,6 @@ class ModuleApi:
CHECK_VISIBILITY_CAN_BE_MODIFIED_CALLBACK
] = None,
on_new_event: Optional[ON_NEW_EVENT_CALLBACK] = None,
on_profile_update: Optional[ON_PROFILE_UPDATE_CALLBACK] = None,
on_user_deactivation_status_changed: Optional[
ON_USER_DEACTIVATION_STATUS_CHANGED_CALLBACK
] = None,
) -> None:
"""Registers callbacks for third party event rules capabilities.
@@ -298,8 +291,6 @@ class ModuleApi:
check_threepid_can_be_invited=check_threepid_can_be_invited,
check_visibility_can_be_modified=check_visibility_can_be_modified,
on_new_event=on_new_event,
on_profile_update=on_profile_update,
on_user_deactivation_status_changed=on_user_deactivation_status_changed,
)
def register_presence_router_callbacks(
+4 -4
View File
@@ -15,12 +15,12 @@
import logging
import re
from typing import Any, Dict, List, Mapping, Optional, Pattern, Tuple, Union
from typing import Any, Dict, List, Optional, Pattern, Tuple, Union
from matrix_common.regex import glob_to_regex, to_word_pattern
from synapse.events import EventBase
from synapse.types import UserID
from synapse.types import JsonDict, UserID
from synapse.util.caches.lrucache import LruCache
logger = logging.getLogger(__name__)
@@ -223,7 +223,7 @@ def _glob_matches(glob: str, value: str, word_boundary: bool = False) -> bool:
def _flatten_dict(
d: Union[EventBase, Mapping[str, Any]],
d: Union[EventBase, JsonDict],
prefix: Optional[List[str]] = None,
result: Optional[Dict[str, str]] = None,
) -> Dict[str, str]:
@@ -234,7 +234,7 @@ def _flatten_dict(
for key, value in d.items():
if isinstance(value, str):
result[".".join(prefix + [key])] = value.lower()
elif isinstance(value, Mapping):
elif isinstance(value, dict):
_flatten_dict(value, prefix=(prefix + [key]), result=result)
return result
+104 -3
View File
@@ -17,7 +17,14 @@
import itertools
import logging
from typing import Set
from typing import List, Set
from pkg_resources import (
DistributionNotFound,
Requirement,
VersionConflict,
get_provider,
)
logger = logging.getLogger(__name__)
@@ -83,8 +90,6 @@ REQUIREMENTS = [
# ijson 3.1.4 fixes a bug with "." in property names
"ijson>=3.1.4",
"matrix-common~=1.1.0",
# We need packaging.requirements.Requirement, added in 16.1.
"packaging>=16.1",
]
CONDITIONAL_REQUIREMENTS = {
@@ -139,6 +144,102 @@ def list_requirements():
return list(set(REQUIREMENTS) | ALL_OPTIONAL_REQUIREMENTS)
class DependencyException(Exception):
@property
def message(self):
return "\n".join(
[
"Missing Requirements: %s" % (", ".join(self.dependencies),),
"To install run:",
" pip install --upgrade --force %s" % (" ".join(self.dependencies),),
"",
]
)
@property
def dependencies(self):
for i in self.args[0]:
yield '"' + i + '"'
def check_requirements(for_feature=None):
deps_needed = []
errors = []
if for_feature:
reqs = CONDITIONAL_REQUIREMENTS[for_feature]
else:
reqs = REQUIREMENTS
for dependency in reqs:
try:
_check_requirement(dependency)
except VersionConflict as e:
deps_needed.append(dependency)
errors.append(
"Needed %s, got %s==%s"
% (
dependency,
e.dist.project_name, # type: ignore[attr-defined] # noqa
e.dist.version, # type: ignore[attr-defined] # noqa
)
)
except DistributionNotFound:
deps_needed.append(dependency)
if for_feature:
errors.append(
"Needed %s for the '%s' feature but it was not installed"
% (dependency, for_feature)
)
else:
errors.append("Needed %s but it was not installed" % (dependency,))
if not for_feature:
# Check the optional dependencies are up to date. We allow them to not be
# installed.
OPTS: List[str] = sum(CONDITIONAL_REQUIREMENTS.values(), [])
for dependency in OPTS:
try:
_check_requirement(dependency)
except VersionConflict as e:
deps_needed.append(dependency)
errors.append(
"Needed optional %s, got %s==%s"
% (
dependency,
e.dist.project_name, # type: ignore[attr-defined] # noqa
e.dist.version, # type: ignore[attr-defined] # noqa
)
)
except DistributionNotFound:
# If it's not found, we don't care
pass
if deps_needed:
for err in errors:
logging.error(err)
raise DependencyException(deps_needed)
def _check_requirement(dependency_string):
"""Parses a dependency string, and checks if the specified requirement is installed
Raises:
VersionConflict if the requirement is installed, but with the the wrong version
DistributionNotFound if nothing is found to provide the requirement
"""
req = Requirement.parse(dependency_string)
# first check if the markers specify that this requirement needs installing
if req.marker is not None and not req.marker.evaluate():
# not required for this environment
return
get_provider(req)
if __name__ == "__main__":
import sys
+3 -1
View File
@@ -227,6 +227,8 @@ class ReplicationCommandHandler:
self._is_master = hs.config.worker.worker_app is None
self._is_user_ip_handler = self._is_master # TODO
self._federation_sender = None
if self._is_master and not hs.config.worker.send_federation:
self._federation_sender = hs.get_federation_sender()
@@ -403,7 +405,7 @@ class ReplicationCommandHandler:
) -> Optional[Awaitable[None]]:
user_ip_cache_counter.inc()
if self._is_master:
if self._is_user_ip_handler:
return self._handle_user_ip(cmd)
else:
return None
+3
View File
@@ -904,6 +904,9 @@ class AccountStatusRestServlet(RestServlet):
def __init__(self, hs: "HomeServer"):
super().__init__()
self._auth = hs.get_auth()
self._store = hs.get_datastore()
self._is_mine = hs.is_mine
self._federation_client = hs.get_federation_client()
self._account_handler = hs.get_account_handler()
async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
+3
View File
@@ -72,6 +72,9 @@ class CapabilitiesRestServlet(RestServlet):
"org.matrix.msc3244.room_capabilities"
] = MSC3244_CAPABILITIES
if self.config.experimental.msc3440_enabled:
response["capabilities"]["io.element.thread"] = {"enabled": True}
if self.config.experimental.msc3720_enabled:
response["capabilities"]["org.matrix.msc3720.account_status"] = {
"enabled": True,
+68
View File
@@ -1141,6 +1141,73 @@ class TimestampLookupRestServlet(RestServlet):
}
class RoomSpaceSummaryRestServlet(RestServlet):
PATTERNS = (
re.compile(
"^/_matrix/client/unstable/org.matrix.msc2946"
"/rooms/(?P<room_id>[^/]*)/spaces$"
),
)
def __init__(self, hs: "HomeServer"):
super().__init__()
self._auth = hs.get_auth()
self._room_summary_handler = hs.get_room_summary_handler()
async def on_GET(
self, request: SynapseRequest, room_id: str
) -> Tuple[int, JsonDict]:
requester = await self._auth.get_user_by_req(request, allow_guest=True)
max_rooms_per_space = parse_integer(request, "max_rooms_per_space")
if max_rooms_per_space is not None and max_rooms_per_space < 0:
raise SynapseError(
400,
"Value for 'max_rooms_per_space' must be a non-negative integer",
Codes.BAD_JSON,
)
return 200, await self._room_summary_handler.get_space_summary(
requester.user.to_string(),
room_id,
suggested_only=parse_boolean(request, "suggested_only", default=False),
max_rooms_per_space=max_rooms_per_space,
)
# TODO When switching to the stable endpoint, remove the POST handler.
async def on_POST(
self, request: SynapseRequest, room_id: str
) -> Tuple[int, JsonDict]:
requester = await self._auth.get_user_by_req(request, allow_guest=True)
content = parse_json_object_from_request(request)
suggested_only = content.get("suggested_only", False)
if not isinstance(suggested_only, bool):
raise SynapseError(
400, "'suggested_only' must be a boolean", Codes.BAD_JSON
)
max_rooms_per_space = content.get("max_rooms_per_space")
if max_rooms_per_space is not None:
if not isinstance(max_rooms_per_space, int):
raise SynapseError(
400, "'max_rooms_per_space' must be an integer", Codes.BAD_JSON
)
if max_rooms_per_space < 0:
raise SynapseError(
400,
"Value for 'max_rooms_per_space' must be a non-negative integer",
Codes.BAD_JSON,
)
return 200, await self._room_summary_handler.get_space_summary(
requester.user.to_string(),
room_id,
suggested_only=suggested_only,
max_rooms_per_space=max_rooms_per_space,
)
class RoomHierarchyRestServlet(RestServlet):
PATTERNS = (
re.compile(
@@ -1234,6 +1301,7 @@ def register_servlets(
RoomRedactEventRestServlet(hs).register(http_server)
RoomTypingRestServlet(hs).register(http_server)
RoomEventContextServlet(hs).register(http_server)
RoomSpaceSummaryRestServlet(hs).register(http_server)
RoomHierarchyRestServlet(hs).register(http_server)
if hs.config.experimental.msc3266_enabled:
RoomSummaryRestServlet(hs).register(http_server)
-2
View File
@@ -99,8 +99,6 @@ class VersionsRestServlet(RestServlet):
"org.matrix.msc2716": self.config.experimental.msc2716_enabled,
# Adds support for jump to date endpoints (/timestamp_to_event) as per MSC3030
"org.matrix.msc3030": self.config.experimental.msc3030_enabled,
# Adds support for thread relations, per MSC3440.
"org.matrix.msc3440": self.config.experimental.msc3440_enabled,
},
},
)
@@ -54,6 +54,8 @@ class ServerNoticesSender(WorkerServerNoticesSender):
Args:
user_id: mxid
"""
# TODO this will need to run on the user IP handler, at least in a
# stubbed capacity!
# The synchrotrons use a stubbed version of ServerNoticesSender, so
# we check for notices to send to the user in on_user_ip as well as
# in on_user_syncing
+2 -29
View File
@@ -258,10 +258,7 @@ class StateHandler:
return await self.store.get_joined_hosts(room_id, entry)
async def compute_event_context(
self,
event: EventBase,
old_state: Optional[Iterable[EventBase]] = None,
partial_state: bool = False,
self, event: EventBase, old_state: Optional[Iterable[EventBase]] = None
) -> EventContext:
"""Build an EventContext structure for a non-outlier event.
@@ -276,8 +273,6 @@ class StateHandler:
calculated from existing events. This is normally only specified
when receiving an event from federation where we don't have the
prev events for, e.g. when backfilling.
partial_state: True if `old_state` is partial and omits non-critical
membership events
Returns:
The event context.
"""
@@ -300,28 +295,8 @@ class StateHandler:
else:
# otherwise, we'll need to resolve the state across the prev_events.
# partial_state should not be set explicitly in this case:
# we work it out dynamically
assert not partial_state
# if any of the prev-events have partial state, so do we.
# (This is slightly racy - the prev-events might get fixed up before we use
# their states - but I don't think that really matters; it just means we
# might redundantly recalculate the state for this event later.)
prev_event_ids = event.prev_event_ids()
incomplete_prev_events = await self.store.get_partial_state_events(
prev_event_ids
)
if any(incomplete_prev_events.values()):
logger.debug(
"New/incoming event %s refers to prev_events %s with partial state",
event.event_id,
[k for (k, v) in incomplete_prev_events.items() if v],
)
partial_state = True
logger.debug("calling resolve_state_groups from compute_event_context")
entry = await self.resolve_state_groups_for_events(
event.room_id, event.prev_event_ids()
)
@@ -367,7 +342,6 @@ class StateHandler:
prev_state_ids=state_ids_before_event,
prev_group=state_group_before_event_prev_group,
delta_ids=deltas_to_state_group_before_event,
partial_state=partial_state,
)
#
@@ -399,7 +373,6 @@ class StateHandler:
prev_state_ids=state_ids_before_event,
prev_group=state_group_before_event,
delta_ids=delta_ids,
partial_state=partial_state,
)
@measure_func()

Some files were not shown because too many files have changed in this diff Show More