1
0

Compare commits

...

31 Commits

Author SHA1 Message Date
Erik Johnston cbb1575486 Merge branch 'develop' into erikj/less_state_membership 2022-08-24 09:58:29 +01:00
Eric Eastwood 7af07f9716 Instrument _check_sigs_and_hash_and_fetch to trace time spent in child concurrent calls (#13588)
Instrument `_check_sigs_and_hash_and_fetch` to trace time spent in child concurrent calls because I've see `_check_sigs_and_hash_and_fetch` take [10.41s to process 100 events](https://github.com/matrix-org/synapse/issues/13587)

Fix https://github.com/matrix-org/synapse/issues/13587

Part of https://github.com/matrix-org/synapse/issues/13356
2022-08-23 21:53:37 -05:00
David Robertson a25a37002c Write about the chain cover a little. (#13602)
Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com>
2022-08-23 17:41:55 +00:00
Erik Johnston f7ddfe17a3 Speed up @cachedList (#13591)
This speeds things up by ~2x.

The vast majority of the time is now spent in `LruCache` moving things around the linked lists.

We do this via two things:
1. Don't create a deferred per-key during bulk set operations in `DeferredCache`. Instead, only create them if a subsequent caller asks for the key.
2. Add a bulk lookup API to `DeferredCache` rather than use a loop.
2022-08-23 14:53:27 +00:00
Erik Johnston 05c9c7363b Fix regression caused by #13573 (#13600)
Broke in #13573.
2022-08-23 14:14:05 +00:00
nilsKr3 bdfff9c36e Update openid.md (#13568)
Linking the help article may prevent confusion regarding the creation of the necessary rule using auth0.
2022-08-23 14:34:10 +01:00
David Robertson ca3d19b05f Merge tag 'v1.66.0rc1' into develop
Synapse 1.66.0rc1 (2022-08-23)
==============================

This release removes the ability for homeservers to delegate email ownership
verification and password reset confirmation to identity servers. This removal
was originally planned for Synapse 1.64, but was later deferred until now.

See the [upgrade notes](https://matrix-org.github.io/synapse/v1.66/upgrade.html#upgrading-to-v1660) for more details.

Features
--------

- Improve validation of request bodies for the following client-server API endpoints: [`/account/password`](https://spec.matrix.org/v1.3/client-server-api/#post_matrixclientv3accountpassword), [`/account/password/email/requestToken`](https://spec.matrix.org/v1.3/client-server-api/#post_matrixclientv3accountpasswordemailrequesttoken), [`/account/deactivate`](https://spec.matrix.org/v1.3/client-server-api/#post_matrixclientv3accountdeactivate) and [`/account/3pid/email/requestToken`](https://spec.matrix.org/v1.3/client-server-api/#post_matrixclientv3account3pidemailrequesttoken). ([\#13188](https://github.com/matrix-org/synapse/issues/13188), [\#13563](https://github.com/matrix-org/synapse/issues/13563))
- Add forgotten status to [Room Details Admin API](https://matrix-org.github.io/synapse/latest/admin_api/rooms.html#room-details-api). ([\#13503](https://github.com/matrix-org/synapse/issues/13503))
- Add an experimental implementation for [MSC3852 (Expose user agents on `Device`)](https://github.com/matrix-org/matrix-spec-proposals/pull/3852). ([\#13549](https://github.com/matrix-org/synapse/issues/13549))
- Add `org.matrix.msc2716v4` experimental room version with updated content fields. Part of [MSC2716 (Importing history)](https://github.com/matrix-org/matrix-spec-proposals/pull/2716).  ([\#13551](https://github.com/matrix-org/synapse/issues/13551))
- Add support for compression to federation responses. ([\#13537](https://github.com/matrix-org/synapse/issues/13537))
- Improve performance of sending messages in rooms with thousands of local users. ([\#13522](https://github.com/matrix-org/synapse/issues/13522), [\#13547](https://github.com/matrix-org/synapse/issues/13547))

Bugfixes
--------

- Faster room joins: make `/joined_members` block whilst the room is partial stated. ([\#13514](https://github.com/matrix-org/synapse/issues/13514))
- Fix a bug introduced in Synapse 1.21.0 where the [`/event_reports` Admin API](https://matrix-org.github.io/synapse/develop/admin_api/event_reports.html) could return a total count which was larger than the number of results you can actually query for. ([\#13525](https://github.com/matrix-org/synapse/issues/13525))
- Fix a bug introduced in Synapse 1.52.0 where sending server notices fails if `max_avatar_size` or `allowed_avatar_mimetypes` is set and not `system_mxid_avatar_url`. ([\#13566](https://github.com/matrix-org/synapse/issues/13566))
- Fix a bug where the `opentracing.force_tracing_for_users` config option would not apply to [`/sendToDevice`](https://spec.matrix.org/v1.3/client-server-api/#put_matrixclientv3sendtodeviceeventtypetxnid) and [`/keys/upload`](https://spec.matrix.org/v1.3/client-server-api/#post_matrixclientv3keysupload) requests. ([\#13574](https://github.com/matrix-org/synapse/issues/13574))

Improved Documentation
----------------------

- Add `openssl` example for generating registration HMAC digest. ([\#13472](https://github.com/matrix-org/synapse/issues/13472))
- Tidy up Synapse's README. ([\#13491](https://github.com/matrix-org/synapse/issues/13491))
- Document that event purging related to the `redaction_retention_period` config option is executed only every 5 minutes. ([\#13492](https://github.com/matrix-org/synapse/issues/13492))
- Add a warning to retention documentation regarding the possibility of database corruption. ([\#13497](https://github.com/matrix-org/synapse/issues/13497))
- Document that the `DOCKER_BUILDKIT=1` flag is needed to build the docker image. ([\#13515](https://github.com/matrix-org/synapse/issues/13515))
- Add missing links in `user_consent` section of configuration manual. ([\#13536](https://github.com/matrix-org/synapse/issues/13536))
- Fix the doc and some warnings that were referring to the nonexistent `custom_templates_directory` setting (instead of `custom_template_directory`). ([\#13538](https://github.com/matrix-org/synapse/issues/13538))

Deprecations and Removals
-------------------------

- Remove the ability for homeservers to delegate email ownership verification
  and password reset confirmation to identity servers. See [upgrade notes](https://matrix-org.github.io/synapse/v1.66/upgrade.html#upgrading-to-v1660) for more details.

Internal Changes
----------------

- Update the rejected state of events during de-partial-stating. ([\#13459](https://github.com/matrix-org/synapse/issues/13459))
- Avoid blocking lazy-loading `/sync`s during partial joins due to remote memberships. Pull remote memberships from auth events instead of the room state. ([\#13477](https://github.com/matrix-org/synapse/issues/13477))
- Refuse to start when faster joins is enabled on a deployment with workers, since worker configurations are not currently supported. ([\#13531](https://github.com/matrix-org/synapse/issues/13531))

- Allow use of both `@trace` and `@tag_args` stacked on the same function. ([\#13453](https://github.com/matrix-org/synapse/issues/13453))
- Instrument the federation/backfill part of `/messages` for understandable traces in Jaeger. ([\#13489](https://github.com/matrix-org/synapse/issues/13489))
- Instrument `FederationStateIdsServlet` (`/state_ids`) for understandable traces in Jaeger. ([\#13499](https://github.com/matrix-org/synapse/issues/13499), [\#13554](https://github.com/matrix-org/synapse/issues/13554))
- Track HTTP response times over 10 seconds from `/messages` (`synapse_room_message_list_rest_servlet_response_time_seconds`). ([\#13533](https://github.com/matrix-org/synapse/issues/13533))
- Add metrics to track how the rate limiter is affecting requests (sleep/reject). ([\#13534](https://github.com/matrix-org/synapse/issues/13534), [\#13541](https://github.com/matrix-org/synapse/issues/13541))
- Add metrics to time how long it takes us to do backfill processing (`synapse_federation_backfill_processing_before_time_seconds`, `synapse_federation_backfill_processing_after_time_seconds`). ([\#13535](https://github.com/matrix-org/synapse/issues/13535), [\#13584](https://github.com/matrix-org/synapse/issues/13584))
- Add metrics to track rate limiter queue timing (`synapse_rate_limit_queue_wait_time_seconds`). ([\#13544](https://github.com/matrix-org/synapse/issues/13544))
- Update metrics to track `/messages` response time by room size. ([\#13545](https://github.com/matrix-org/synapse/issues/13545))

- Refactor methods in `synapse.api.auth.Auth` to use `Requester` objects everywhere instead of user IDs. ([\#13024](https://github.com/matrix-org/synapse/issues/13024))
- Clean-up tests for notifications. ([\#13471](https://github.com/matrix-org/synapse/issues/13471))
- Add some miscellaneous comments to document sync, especially around `compute_state_delta`. ([\#13474](https://github.com/matrix-org/synapse/issues/13474))
- Use literals in place of `HTTPStatus` constants in tests. ([\#13479](https://github.com/matrix-org/synapse/issues/13479), [\#13488](https://github.com/matrix-org/synapse/issues/13488))
- Add comments about how event push actions are rotated. ([\#13485](https://github.com/matrix-org/synapse/issues/13485))
- Modify HTML template content to better support mobile devices' screen sizes. ([\#13493](https://github.com/matrix-org/synapse/issues/13493))
- Add a linter script which will reject non-strict types in Pydantic models. ([\#13502](https://github.com/matrix-org/synapse/issues/13502))
- Reduce the number of tests using legacy TCP replication. ([\#13543](https://github.com/matrix-org/synapse/issues/13543))
- Allow specifying additional request fields when using the `HomeServerTestCase.login` helper method. ([\#13549](https://github.com/matrix-org/synapse/issues/13549))
- Make `HomeServerTestCase` load any configured homeserver modules automatically. ([\#13558](https://github.com/matrix-org/synapse/issues/13558))
2022-08-23 14:00:09 +01:00
Erik Johnston aec87a0f93 Speed up fetching large numbers of push rules (#13592) 2022-08-23 13:15:43 +01:00
David Robertson ea85a2bf6c Remove manually-added changelog 2022-08-23 12:40:35 +01:00
David Robertson 956e015413 Drop support for delegating email validation, round 2 (#13596) 2022-08-23 11:40:00 +00:00
Nick Mills-Barrett 5e7847dc92 Cache user IDs instead of profile objects (#13573)
The profile objects are never used and increase cache size significantly.
2022-08-23 09:49:59 +00:00
David Robertson 79281f517d Update changelog 2022-08-23 10:22:47 +01:00
David Robertson f8b9abdcdb Adjust changelog 2022-08-23 10:10:20 +01:00
David Robertson d6f5699737 Describe changes to admin API in 1.66
Cross-ref #13525
2022-08-23 10:09:45 +01:00
David Robertson f0b23927fc 1.66.0rc1 2022-08-23 09:49:51 +01:00
Dirk Klimpel 37f329c9ad Fix that sending server notices fail if avatar is None (#13566)
Indroduced in #11846.
2022-08-23 09:48:35 +01:00
Eric Eastwood 9385c41ba4 Fix Prometheus metrics being negative (mixed up start/end) (#13584)
Fix:

 - https://github.com/matrix-org/synapse/pull/13535#discussion_r949582508
 - https://github.com/matrix-org/synapse/pull/13533#discussion_r949577244
2022-08-23 08:47:30 +01:00
Quentin Gliech 3dd175b628 synapse.api.auth.Auth cleanup: make permission-related methods use Requester instead of the UserID (#13024)
Part of #13019

This changes all the permission-related methods to rely on the Requester instead of the UserID. This is a first step towards enabling scoped access tokens at some point, since I expect the Requester to have scope-related informations in it.

It also changes methods which figure out the user/device/appservice out of the access token to return a Requester instead of something else. This avoids having store-related objects in the methods signatures.
2022-08-22 14:17:59 +01:00
Andrew Morgan 94375f7a91 Remove redundant opentracing spans for /sendToDevice and /keys/upload (#13574) 2022-08-22 10:03:11 +01:00
Eric Eastwood 06df5d4250 MSC2716v4 room version - remove namespace from MSC2716 event content fields (#13551)
Complement PR: https://github.com/matrix-org/complement/pull/450

As suggested in
https://github.com/matrix-org/matrix-spec-proposals/pull/2716#discussion_r941444525
2022-08-19 15:37:01 -05:00
Andrew Morgan f9f03426de Implement MSC3852: Expose last_seen_user_agent to users for their own devices; also expose to Admin API (#13549) 2022-08-19 16:17:10 +00:00
Andrew Morgan 40e3e68cd7 Register homeserver modules when creating test homeserver (#13558) 2022-08-19 16:52:20 +01:00
Patrick Cloke f3fba4914d Reduce the number of tests using TCP replication. (#13543)
Uses Redis replication in additional test cases (instead of
TCP replication). A small step towards dropping TCP replication.
2022-08-19 08:25:24 -04:00
reivilibre 3a245f6cfe Fix validation problem that occurs when a user tries to deactivate their account or change their password. (#13563) 2022-08-19 11:03:29 +00:00
Eric Eastwood 2c42673a9b Add metrics to track /messages response time by room size (#13545)
Follow-up to https://github.com/matrix-org/synapse/pull/13533

Part of https://github.com/matrix-org/synapse/issues/13356
2022-08-18 14:15:53 -05:00
Sean Quah b251cff819 Fix incorrect juggling of logging contexts in _PerHostRatelimiter (#13554)
Signed-off-by: Sean Quah <seanq@matrix.org>

Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
2022-08-18 16:26:26 +01:00
Eric Eastwood d64653d062 Track number of hosts affected by the rate limiter (#13541)
Track number of hosts affected by the rate limiter so we can differentiate one really noisy homeserver from a general ratelimit tuning problem across the federation.

Follow-up to https://github.com/matrix-org/synapse/pull/13534

Part of https://github.com/matrix-org/synapse/issues/13356
2022-08-18 10:05:07 -05:00
Ayush Anand 22ea51faf9 Add support for compression to federation responses (#13537)
Closes #13415.

Signed-off-by: Ayush Anand <iamayushanand@gmail.com>
2022-08-18 15:14:47 +01:00
Sean Quah 84169a82dc Avoid blocking lazy-loading /syncs during partial joins (#13477)
Use a state filter or accept partial state in a few places where we
request state, to avoid blocking.

To make lazy-loading `/sync`s work, we need to provide the memberships
of event senders, which are not guaranteed to be in the room state.
Instead we dig through auth events for memberships to present to
clients. The auth events of an event are guaranteed to contain a
passable membership event, otherwise the event would have been rejected.

Note that this only covers the common code paths encountered during
testing. There has been no exhaustive checking of all sync code paths.

Fixes #13146.

Signed-off-by: Sean Quah <seanq@matrix.org>
2022-08-18 11:53:02 +01:00
Erik Johnston 237f03d85b Newsfile 2022-07-21 10:45:08 +01:00
Erik Johnston d3f36d0f64 Don't fetch the full state during membership changes 2022-07-21 10:45:08 +01:00
117 changed files with 1584 additions and 915 deletions
+81
View File
@@ -1,3 +1,84 @@
Synapse 1.66.0rc1 (2022-08-23)
==============================
This release removes the ability for homeservers to delegate email ownership
verification and password reset confirmation to identity servers. This removal
was originally planned for Synapse 1.64, but was later deferred until now.
See the [upgrade notes](https://matrix-org.github.io/synapse/v1.66/upgrade.html#upgrading-to-v1660) for more details.
Features
--------
- Improve validation of request bodies for the following client-server API endpoints: [`/account/password`](https://spec.matrix.org/v1.3/client-server-api/#post_matrixclientv3accountpassword), [`/account/password/email/requestToken`](https://spec.matrix.org/v1.3/client-server-api/#post_matrixclientv3accountpasswordemailrequesttoken), [`/account/deactivate`](https://spec.matrix.org/v1.3/client-server-api/#post_matrixclientv3accountdeactivate) and [`/account/3pid/email/requestToken`](https://spec.matrix.org/v1.3/client-server-api/#post_matrixclientv3account3pidemailrequesttoken). ([\#13188](https://github.com/matrix-org/synapse/issues/13188), [\#13563](https://github.com/matrix-org/synapse/issues/13563))
- Add forgotten status to [Room Details Admin API](https://matrix-org.github.io/synapse/latest/admin_api/rooms.html#room-details-api). ([\#13503](https://github.com/matrix-org/synapse/issues/13503))
- Add an experimental implementation for [MSC3852 (Expose user agents on `Device`)](https://github.com/matrix-org/matrix-spec-proposals/pull/3852). ([\#13549](https://github.com/matrix-org/synapse/issues/13549))
- Add `org.matrix.msc2716v4` experimental room version with updated content fields. Part of [MSC2716 (Importing history)](https://github.com/matrix-org/matrix-spec-proposals/pull/2716). ([\#13551](https://github.com/matrix-org/synapse/issues/13551))
- Add support for compression to federation responses. ([\#13537](https://github.com/matrix-org/synapse/issues/13537))
- Improve performance of sending messages in rooms with thousands of local users. ([\#13522](https://github.com/matrix-org/synapse/issues/13522), [\#13547](https://github.com/matrix-org/synapse/issues/13547))
Bugfixes
--------
- Faster room joins: make `/joined_members` block whilst the room is partial stated. ([\#13514](https://github.com/matrix-org/synapse/issues/13514))
- Fix a bug introduced in Synapse 1.21.0 where the [`/event_reports` Admin API](https://matrix-org.github.io/synapse/develop/admin_api/event_reports.html) could return a total count which was larger than the number of results you can actually query for. ([\#13525](https://github.com/matrix-org/synapse/issues/13525))
- Fix a bug introduced in Synapse 1.52.0 where sending server notices fails if `max_avatar_size` or `allowed_avatar_mimetypes` is set and not `system_mxid_avatar_url`. ([\#13566](https://github.com/matrix-org/synapse/issues/13566))
- Fix a bug where the `opentracing.force_tracing_for_users` config option would not apply to [`/sendToDevice`](https://spec.matrix.org/v1.3/client-server-api/#put_matrixclientv3sendtodeviceeventtypetxnid) and [`/keys/upload`](https://spec.matrix.org/v1.3/client-server-api/#post_matrixclientv3keysupload) requests. ([\#13574](https://github.com/matrix-org/synapse/issues/13574))
Improved Documentation
----------------------
- Add `openssl` example for generating registration HMAC digest. ([\#13472](https://github.com/matrix-org/synapse/issues/13472))
- Tidy up Synapse's README. ([\#13491](https://github.com/matrix-org/synapse/issues/13491))
- Document that event purging related to the `redaction_retention_period` config option is executed only every 5 minutes. ([\#13492](https://github.com/matrix-org/synapse/issues/13492))
- Add a warning to retention documentation regarding the possibility of database corruption. ([\#13497](https://github.com/matrix-org/synapse/issues/13497))
- Document that the `DOCKER_BUILDKIT=1` flag is needed to build the docker image. ([\#13515](https://github.com/matrix-org/synapse/issues/13515))
- Add missing links in `user_consent` section of configuration manual. ([\#13536](https://github.com/matrix-org/synapse/issues/13536))
- Fix the doc and some warnings that were referring to the nonexistent `custom_templates_directory` setting (instead of `custom_template_directory`). ([\#13538](https://github.com/matrix-org/synapse/issues/13538))
Deprecations and Removals
-------------------------
- Remove the ability for homeservers to delegate email ownership verification
and password reset confirmation to identity servers. See [upgrade notes](https://matrix-org.github.io/synapse/v1.66/upgrade.html#upgrading-to-v1660) for more details.
Internal Changes
----------------
### Faster room joins
- Update the rejected state of events during de-partial-stating. ([\#13459](https://github.com/matrix-org/synapse/issues/13459))
- Avoid blocking lazy-loading `/sync`s during partial joins due to remote memberships. Pull remote memberships from auth events instead of the room state. ([\#13477](https://github.com/matrix-org/synapse/issues/13477))
- Refuse to start when faster joins is enabled on a deployment with workers, since worker configurations are not currently supported. ([\#13531](https://github.com/matrix-org/synapse/issues/13531))
### Metrics and tracing
- Allow use of both `@trace` and `@tag_args` stacked on the same function. ([\#13453](https://github.com/matrix-org/synapse/issues/13453))
- Instrument the federation/backfill part of `/messages` for understandable traces in Jaeger. ([\#13489](https://github.com/matrix-org/synapse/issues/13489))
- Instrument `FederationStateIdsServlet` (`/state_ids`) for understandable traces in Jaeger. ([\#13499](https://github.com/matrix-org/synapse/issues/13499), [\#13554](https://github.com/matrix-org/synapse/issues/13554))
- Track HTTP response times over 10 seconds from `/messages` (`synapse_room_message_list_rest_servlet_response_time_seconds`). ([\#13533](https://github.com/matrix-org/synapse/issues/13533))
- Add metrics to track how the rate limiter is affecting requests (sleep/reject). ([\#13534](https://github.com/matrix-org/synapse/issues/13534), [\#13541](https://github.com/matrix-org/synapse/issues/13541))
- Add metrics to time how long it takes us to do backfill processing (`synapse_federation_backfill_processing_before_time_seconds`, `synapse_federation_backfill_processing_after_time_seconds`). ([\#13535](https://github.com/matrix-org/synapse/issues/13535), [\#13584](https://github.com/matrix-org/synapse/issues/13584))
- Add metrics to track rate limiter queue timing (`synapse_rate_limit_queue_wait_time_seconds`). ([\#13544](https://github.com/matrix-org/synapse/issues/13544))
- Update metrics to track `/messages` response time by room size. ([\#13545](https://github.com/matrix-org/synapse/issues/13545))
### Everything else
- Refactor methods in `synapse.api.auth.Auth` to use `Requester` objects everywhere instead of user IDs. ([\#13024](https://github.com/matrix-org/synapse/issues/13024))
- Clean-up tests for notifications. ([\#13471](https://github.com/matrix-org/synapse/issues/13471))
- Add some miscellaneous comments to document sync, especially around `compute_state_delta`. ([\#13474](https://github.com/matrix-org/synapse/issues/13474))
- Use literals in place of `HTTPStatus` constants in tests. ([\#13479](https://github.com/matrix-org/synapse/issues/13479), [\#13488](https://github.com/matrix-org/synapse/issues/13488))
- Add comments about how event push actions are rotated. ([\#13485](https://github.com/matrix-org/synapse/issues/13485))
- Modify HTML template content to better support mobile devices' screen sizes. ([\#13493](https://github.com/matrix-org/synapse/issues/13493))
- Add a linter script which will reject non-strict types in Pydantic models. ([\#13502](https://github.com/matrix-org/synapse/issues/13502))
- Reduce the number of tests using legacy TCP replication. ([\#13543](https://github.com/matrix-org/synapse/issues/13543))
- Allow specifying additional request fields when using the `HomeServerTestCase.login` helper method. ([\#13549](https://github.com/matrix-org/synapse/issues/13549))
- Make `HomeServerTestCase` load any configured homeserver modules automatically. ([\#13558](https://github.com/matrix-org/synapse/issues/13558))
Synapse 1.65.0 (2022-08-16)
===========================
-1
View File
@@ -1 +0,0 @@
Improve validation of request bodies for the following client-server API endpoints: [`/account/password`](https://spec.matrix.org/v1.3/client-server-api/#post_matrixclientv3accountpassword), [`/account/password/email/requestToken`](https://spec.matrix.org/v1.3/client-server-api/#post_matrixclientv3accountpasswordemailrequesttoken), [`/account/deactivate`](https://spec.matrix.org/v1.3/client-server-api/#post_matrixclientv3accountdeactivate) and [`/account/3pid/email/requestToken`](https://spec.matrix.org/v1.3/client-server-api/#post_matrixclientv3account3pidemailrequesttoken).
+1
View File
@@ -0,0 +1 @@
Don't fetch the full state during membership changes.
-1
View File
@@ -1 +0,0 @@
Allow use of both `@trace` and `@tag_args` stacked on the same function (tracing).
-1
View File
@@ -1 +0,0 @@
Faster joins: update the rejected state of events during de-partial-stating.
-1
View File
@@ -1 +0,0 @@
Clean-up tests for notifications.
-1
View File
@@ -1 +0,0 @@
Add `openssl` example for generating registration HMAC digest.
-1
View File
@@ -1 +0,0 @@
Add some miscellaneous comments to document sync, especially around `compute_state_delta`.
-1
View File
@@ -1 +0,0 @@
Use literals in place of `HTTPStatus` constants in tests.
-1
View File
@@ -1 +0,0 @@
Add comments about how event push actions are rotated.
-1
View File
@@ -1 +0,0 @@
Use literals in place of `HTTPStatus` constants in tests.
-1
View File
@@ -1 +0,0 @@
Instrument the federation/backfill part of `/messages` for understandable traces in Jaeger.
-1
View File
@@ -1 +0,0 @@
Tidy up Synapse's README.
-1
View File
@@ -1 +0,0 @@
Document that event purging related to the `redaction_retention_period` config option is executed only every 5 minutes.
-1
View File
@@ -1 +0,0 @@
Modify HTML template content to better support mobile devices' screen sizes.
-2
View File
@@ -1,2 +0,0 @@
Add a warning to retention documentation regarding the possibility of database corruption.
-1
View File
@@ -1 +0,0 @@
Instrument `FederationStateIdsServlet` (`/state_ids`) for understandable traces in Jaeger.
-1
View File
@@ -1 +0,0 @@
Add a linter script which will reject non-strict types in Pydantic models.
-1
View File
@@ -1 +0,0 @@
Add forgotten status to Room Details API.
-1
View File
@@ -1 +0,0 @@
Faster room joins: make `/joined_members` block whilst the room is partial stated.
-1
View File
@@ -1 +0,0 @@
Document that the `DOCKER_BUILDKIT=1` flag is needed to build the docker image.
-1
View File
@@ -1 +0,0 @@
Improve performance of sending messages in rooms with thousands of local users.
-1
View File
@@ -1 +0,0 @@
Fix a bug in the `/event_reports` Admin API which meant that the total count could be larger than the number of results you can actually query for.
-1
View File
@@ -1 +0,0 @@
Faster room joins: Refuse to start when faster joins is enabled on a deployment with workers, since worker configurations are not currently supported.
-1
View File
@@ -1 +0,0 @@
Track HTTP response times over 10 seconds from `/messages` (`synapse_room_message_list_rest_servlet_response_time_seconds`).
-1
View File
@@ -1 +0,0 @@
Add metrics to track how the rate limiter is affecting requests (sleep/reject).
-1
View File
@@ -1 +0,0 @@
Add metrics to time how long it takes us to do backfill processing (`synapse_federation_backfill_processing_before_time_seconds`, `synapse_federation_backfill_processing_after_time_seconds`).
-1
View File
@@ -1 +0,0 @@
Add missing links in `user_consent` section of configuration manual.
-1
View File
@@ -1 +0,0 @@
Fix the doc and some warnings that were referring to the nonexistent `custom_templates_directory` setting (instead of `custom_template_directory`).
-1
View File
@@ -1 +0,0 @@
Add metrics to track rate limiter queue timing (`synapse_rate_limit_queue_wait_time_seconds`).
-1
View File
@@ -1 +0,0 @@
Improve performance of sending messages in rooms with thousands of local users.
+1
View File
@@ -0,0 +1 @@
Cache user IDs instead of profiles to reduce cache memory usage. Contributed by Nick @ Beeper (@fizzadar).
+1
View File
@@ -0,0 +1 @@
Instrument `_check_sigs_and_hash_and_fetch` to trace time spent in child concurrent calls for understandable traces in Jaeger.
+1
View File
@@ -0,0 +1 @@
Improve performance of `@cachedList`.
+1
View File
@@ -0,0 +1 @@
Minor speed up of fetching large numbers of push rules.
+1
View File
@@ -0,0 +1 @@
Cache user IDs instead of profiles to reduce cache memory usage. Contributed by Nick @ Beeper (@fizzadar).
+1
View File
@@ -0,0 +1 @@
Improve the description of the ["chain cover index"](https://matrix-org.github.io/synapse/latest/auth_chain_difference_algorithm.html) used internally by Synapse.
+6
View File
@@ -1,3 +1,9 @@
matrix-synapse-py3 (1.66.0~rc1) stable; urgency=medium
* New Synapse release 1.66.0rc1.
-- Synapse Packaging team <packages@matrix.org> Tue, 23 Aug 2022 09:48:55 +0100
matrix-synapse-py3 (1.65.0) stable; urgency=medium
* New Synapse release 1.65.0.
+2
View File
@@ -337,6 +337,8 @@ A response body like the following is returned:
}
```
_Changed in Synapse 1.66:_ Added the `forgotten` key to the response body.
# Room Members API
The Room Members admin API allows server admins to get a list of all members of a room.
+7
View File
@@ -753,6 +753,7 @@ A response body like the following is returned:
"device_id": "QBUAZIFURK",
"display_name": "android",
"last_seen_ip": "1.2.3.4",
"last_seen_user_agent": "Mozilla/5.0 (X11; Linux x86_64; rv:103.0) Gecko/20100101 Firefox/103.0",
"last_seen_ts": 1474491775024,
"user_id": "<user_id>"
},
@@ -760,6 +761,7 @@ A response body like the following is returned:
"device_id": "AUIECTSRND",
"display_name": "ios",
"last_seen_ip": "1.2.3.5",
"last_seen_user_agent": "Mozilla/5.0 (X11; Linux x86_64; rv:103.0) Gecko/20100101 Firefox/103.0",
"last_seen_ts": 1474491775025,
"user_id": "<user_id>"
}
@@ -786,6 +788,8 @@ The following fields are returned in the JSON response body:
Absent if no name has been set.
- `last_seen_ip` - The IP address where this device was last seen.
(May be a few minutes out of date, for efficiency reasons).
- `last_seen_user_agent` - The user agent of the device when it was last seen.
(May be a few minutes out of date, for efficiency reasons).
- `last_seen_ts` - The timestamp (in milliseconds since the unix epoch) when this
devices was last seen. (May be a few minutes out of date, for efficiency reasons).
- `user_id` - Owner of device.
@@ -837,6 +841,7 @@ A response body like the following is returned:
"device_id": "<device_id>",
"display_name": "android",
"last_seen_ip": "1.2.3.4",
"last_seen_user_agent": "Mozilla/5.0 (X11; Linux x86_64; rv:103.0) Gecko/20100101 Firefox/103.0",
"last_seen_ts": 1474491775024,
"user_id": "<user_id>"
}
@@ -858,6 +863,8 @@ The following fields are returned in the JSON response body:
Absent if no name has been set.
- `last_seen_ip` - The IP address where this device was last seen.
(May be a few minutes out of date, for efficiency reasons).
- `last_seen_user_agent` - The user agent of the device when it was last seen.
(May be a few minutes out of date, for efficiency reasons).
- `last_seen_ts` - The timestamp (in milliseconds since the unix epoch) when this
devices was last seen. (May be a few minutes out of date, for efficiency reasons).
- `user_id` - Owner of device.
+41 -8
View File
@@ -34,13 +34,45 @@ the process of indexing it).
## Chain Cover Index
Synapse computes auth chain differences by pre-computing a "chain cover" index
for the auth chain in a room, allowing efficient reachability queries like "is
event A in the auth chain of event B". This is done by assigning every event a
*chain ID* and *sequence number* (e.g. `(5,3)`), and having a map of *links*
between chains (e.g. `(5,3) -> (2,4)`) such that A is reachable by B (i.e. `A`
is in the auth chain of `B`) if and only if either:
for the auth chain in a room, allowing us to efficiently make reachability queries
like "is event `A` in the auth chain of event `B`?". We could do this with an index
that tracks all pairs `(A, B)` such that `A` is in the auth chain of `B`. However, this
would be prohibitively large, scaling poorly as the room accumulates more state
events.
1. A and B have the same chain ID and `A`'s sequence number is less than `B`'s
Instead, we break down the graph into *chains*. A chain is a subset of a DAG
with the following property: for any pair of events `E` and `F` in the chain,
the chain contains a path `E -> F` or a path `F -> E`. This forces a chain to be
linear (without forks), e.g. `E -> F -> G -> ... -> H`. Each event in the chain
is given a *sequence number* local to that chain. The oldest event `E` in the
chain has sequence number 1. If `E` has a child `F` in the chain, then `F` has
sequence number 2. If `E` has a grandchild `G` in the chain, then `G` has
sequence number 3; and so on.
Synapse ensures that each persisted event belongs to exactly one chain, and
tracks how the chains are connected to one another. This allows us to
efficiently answer reachability queries. Doing so uses less storage than
tracking reachability on an event-by-event basis, particularly when we have
fewer and longer chains. See
> Jagadish, H. (1990). [A compression technique to materialize transitive closure](https://doi.org/10.1145/99935.99944).
> *ACM Transactions on Database Systems (TODS)*, 15*(4)*, 558-598.
for the original idea or
> Y. Chen, Y. Chen, [An efficient algorithm for answering graph
> reachability queries](https://doi.org/10.1109/ICDE.2008.4497498),
> in: 2008 IEEE 24th International Conference on Data Engineering, April 2008,
> pp. 893902. (PDF available via [Google Scholar](https://scholar.google.com/scholar?q=Y.%20Chen,%20Y.%20Chen,%20An%20efficient%20algorithm%20for%20answering%20graph%20reachability%20queries,%20in:%202008%20IEEE%2024th%20International%20Conference%20on%20Data%20Engineering,%20April%202008,%20pp.%20893902.).)
for a more modern take.
In practical terms, the chain cover assigns every event a
*chain ID* and *sequence number* (e.g. `(5,3)`), and maintains a map of *links*
between events in chains (e.g. `(5,3) -> (2,4)`) such that `A` is reachable by `B`
(i.e. `A` is in the auth chain of `B`) if and only if either:
1. `A` and `B` have the same chain ID and `A`'s sequence number is less than `B`'s
sequence number; or
2. there is a link `L` between `B`'s chain ID and `A`'s chain ID such that
`L.start_seq_no` <= `B.seq_no` and `A.seq_no` <= `L.end_seq_no`.
@@ -49,8 +81,9 @@ There are actually two potential implementations, one where we store links from
each chain to every other reachable chain (the transitive closure of the links
graph), and one where we remove redundant links (the transitive reduction of the
links graph) e.g. if we have chains `C3 -> C2 -> C1` then the link `C3 -> C1`
would not be stored. Synapse uses the former implementations so that it doesn't
need to recurse to test reachability between chains.
would not be stored. Synapse uses the former implementation so that it doesn't
need to recurse to test reachability between chains. This trades-off extra storage
in order to save CPU cycles and DB queries.
### Example
+3 -1
View File
@@ -174,7 +174,9 @@ oidc_providers:
1. Create a regular web application for Synapse
2. Set the Allowed Callback URLs to `[synapse public baseurl]/_synapse/client/oidc/callback`
3. Add a rule to add the `preferred_username` claim.
3. Add a rule with any name to add the `preferred_username` claim.
(See https://auth0.com/docs/customize/rules/create-rules for more information on how to create rules.)
<details>
<summary>Code sample</summary>
+19
View File
@@ -89,6 +89,25 @@ process, for example:
dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
```
# Upgrading to v1.66.0
## Delegation of email validation no longer supported
As of this version, Synapse no longer allows the tasks of verifying email address
ownership, and password reset confirmation, to be delegated to an identity server.
This removal was previously planned for Synapse 1.64.0, but was
[delayed](https://github.com/matrix-org/synapse/issues/13421) until now to give
homeserver administrators more notice of the change.
To continue to allow users to add email addresses to their homeserver accounts,
and perform password resets, make sure that Synapse is configured with a working
email server in the [`email` configuration
section](https://matrix-org.github.io/synapse/latest/usage/configuration/config_documentation.html#email)
(including, at a minimum, a `notif_from` setting.)
Specifying an `email` setting under `account_threepid_delegates` will now cause
an error at startup.
# Upgrading to v1.64.0
## Deprecation of the ability to delegate e-mail verification to identity servers
@@ -444,7 +444,7 @@ Sub-options for each listener include:
* `names`: a list of names of HTTP resources. See below for a list of valid resource names.
* `compress`: set to true to enable gzip compression on HTTP bodies for this resource. This is currently only supported with the
`client`, `consent` and `metrics` resources.
`client`, `consent`, `metrics` and `federation` resources.
* `additional_resources`: Only valid for an 'http' listener. A map of
additional endpoints which should be loaded via dynamic modules.
@@ -2182,7 +2182,10 @@ their account.
by the Matrix Identity Service API
[specification](https://matrix.org/docs/spec/identity_service/latest).)
*Updated in Synapse 1.64.0*: The `email` option is deprecated.
*Deprecated in Synapse 1.64.0*: The `email` option is deprecated.
*Removed in Synapse 1.66.0*: The `email` option has been removed.
If present, Synapse will report a configuration error on startup.
Example configuration:
```yaml
+1 -1
View File
@@ -54,7 +54,7 @@ skip_gitignore = true
[tool.poetry]
name = "matrix-synapse"
version = "1.65.0"
version = "1.66.0rc1"
description = "Homeserver for the Matrix decentralised comms protocol"
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
license = "Apache-2.0"
+94 -106
View File
@@ -37,8 +37,7 @@ from synapse.logging.opentracing import (
start_active_span,
trace,
)
from synapse.storage.databases.main.registration import TokenLookupResult
from synapse.types import Requester, UserID, create_requester
from synapse.types import Requester, create_requester
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -70,14 +69,14 @@ class Auth:
async def check_user_in_room(
self,
room_id: str,
user_id: str,
requester: Requester,
allow_departed_users: bool = False,
) -> Tuple[str, Optional[str]]:
"""Check if the user is in the room, or was at some point.
Args:
room_id: The room to check.
user_id: The user to check.
requester: The user making the request, according to the access token.
current_state: Optional map of the current state of the room.
If provided then that map is used to check whether they are a
@@ -94,6 +93,7 @@ class Auth:
membership event ID of the user.
"""
user_id = requester.user.to_string()
(
membership,
member_event_id,
@@ -182,96 +182,69 @@ class Auth:
access_token = self.get_access_token_from_request(request)
(
user_id,
device_id,
app_service,
) = await self._get_appservice_user_id_and_device_id(request)
if user_id and app_service:
if ip_addr and self._track_appservice_user_ips:
await self.store.insert_client_ip(
user_id=user_id,
access_token=access_token,
ip=ip_addr,
user_agent=user_agent,
device_id="dummy-device"
if device_id is None
else device_id, # stubbed
)
requester = create_requester(
user_id, app_service=app_service, device_id=device_id
# First check if it could be a request from an appservice
requester = await self._get_appservice_user(request)
if not requester:
# If not, it should be from a regular user
requester = await self.get_user_by_access_token(
access_token, allow_expired=allow_expired
)
request.requester = user_id
return requester
# Deny the request if the user account has expired.
# This check is only done for regular users, not appservice ones.
if not allow_expired:
if await self._account_validity_handler.is_user_expired(
requester.user.to_string()
):
# Raise the error if either an account validity module has determined
# the account has expired, or the legacy account validity
# implementation is enabled and determined the account has expired
raise AuthError(
403,
"User account has expired",
errcode=Codes.EXPIRED_ACCOUNT,
)
user_info = await self.get_user_by_access_token(
access_token, allow_expired=allow_expired
)
token_id = user_info.token_id
is_guest = user_info.is_guest
shadow_banned = user_info.shadow_banned
# Deny the request if the user account has expired.
if not allow_expired:
if await self._account_validity_handler.is_user_expired(
user_info.user_id
):
# Raise the error if either an account validity module has determined
# the account has expired, or the legacy account validity
# implementation is enabled and determined the account has expired
raise AuthError(
403,
"User account has expired",
errcode=Codes.EXPIRED_ACCOUNT,
)
device_id = user_info.device_id
if access_token and ip_addr:
if ip_addr and (
not requester.app_service or self._track_appservice_user_ips
):
# XXX(quenting): I'm 95% confident that we could skip setting the
# device_id to "dummy-device" for appservices, and that the only impact
# would be some rows which whould not deduplicate in the 'user_ips'
# table during the transition
recorded_device_id = (
"dummy-device"
if requester.device_id is None and requester.app_service is not None
else requester.device_id
)
await self.store.insert_client_ip(
user_id=user_info.token_owner,
user_id=requester.authenticated_entity,
access_token=access_token,
ip=ip_addr,
user_agent=user_agent,
device_id=device_id,
device_id=recorded_device_id,
)
# Track also the puppeted user client IP if enabled and the user is puppeting
if (
user_info.user_id != user_info.token_owner
requester.user.to_string() != requester.authenticated_entity
and self._track_puppeted_user_ips
):
await self.store.insert_client_ip(
user_id=user_info.user_id,
user_id=requester.user.to_string(),
access_token=access_token,
ip=ip_addr,
user_agent=user_agent,
device_id=device_id,
device_id=requester.device_id,
)
if is_guest and not allow_guest:
if requester.is_guest and not allow_guest:
raise AuthError(
403,
"Guest access not allowed",
errcode=Codes.GUEST_ACCESS_FORBIDDEN,
)
# Mark the token as used. This is used to invalidate old refresh
# tokens after some time.
if not user_info.token_used and token_id is not None:
await self.store.mark_access_token_as_used(token_id)
requester = create_requester(
user_info.user_id,
token_id,
is_guest,
shadow_banned,
device_id,
app_service=app_service,
authenticated_entity=user_info.token_owner,
)
request.requester = requester
return requester
except KeyError:
@@ -308,9 +281,7 @@ class Auth:
403, "Application service has not registered this user (%s)" % user_id
)
async def _get_appservice_user_id_and_device_id(
self, request: Request
) -> Tuple[Optional[str], Optional[str], Optional[ApplicationService]]:
async def _get_appservice_user(self, request: Request) -> Optional[Requester]:
"""
Given a request, reads the request parameters to determine:
- whether it's an application service that's making this request
@@ -325,15 +296,13 @@ class Auth:
Must use `org.matrix.msc3202.device_id` in place of `device_id` for now.
Returns:
3-tuple of
(user ID?, device ID?, application service?)
the application service `Requester` of that request
Postconditions:
- If an application service is returned, so is a user ID
- A user ID is never returned without an application service
- A device ID is never returned without a user ID or an application service
- The returned application service, if present, is permitted to control the
returned user ID.
- The `app_service` field in the returned `Requester` is set
- The `user_id` field in the returned `Requester` is either the application
service sender or the controlled user set by the `user_id` URI parameter
- The returned application service is permitted to control the returned user ID.
- The returned device ID, if present, has been checked to be a valid device ID
for the returned user ID.
"""
@@ -343,12 +312,12 @@ class Auth:
self.get_access_token_from_request(request)
)
if app_service is None:
return None, None, None
return None
if app_service.ip_range_whitelist:
ip_address = IPAddress(request.getClientAddress().host)
if ip_address not in app_service.ip_range_whitelist:
return None, None, None
return None
# This will always be set by the time Twisted calls us.
assert request.args is not None
@@ -382,13 +351,15 @@ class Auth:
Codes.EXCLUSIVE,
)
return effective_user_id, effective_device_id, app_service
return create_requester(
effective_user_id, app_service=app_service, device_id=effective_device_id
)
async def get_user_by_access_token(
self,
token: str,
allow_expired: bool = False,
) -> TokenLookupResult:
) -> Requester:
"""Validate access token and get user_id from it
Args:
@@ -405,9 +376,9 @@ class Auth:
# First look in the database to see if the access token is present
# as an opaque token.
r = await self.store.get_user_by_access_token(token)
if r:
valid_until_ms = r.valid_until_ms
user_info = await self.store.get_user_by_access_token(token)
if user_info:
valid_until_ms = user_info.valid_until_ms
if (
not allow_expired
and valid_until_ms is not None
@@ -419,7 +390,20 @@ class Auth:
msg="Access token has expired", soft_logout=True
)
return r
# Mark the token as used. This is used to invalidate old refresh
# tokens after some time.
await self.store.mark_access_token_as_used(user_info.token_id)
requester = create_requester(
user_id=user_info.user_id,
access_token_id=user_info.token_id,
is_guest=user_info.is_guest,
shadow_banned=user_info.shadow_banned,
device_id=user_info.device_id,
authenticated_entity=user_info.token_owner,
)
return requester
# If the token isn't found in the database, then it could still be a
# macaroon for a guest, so we check that here.
@@ -445,11 +429,12 @@ class Auth:
"Guest access token used for regular user"
)
return TokenLookupResult(
return create_requester(
user_id=user_id,
is_guest=True,
# all guests get the same device id
device_id=GUEST_DEVICE_ID,
authenticated_entity=user_id,
)
except (
pymacaroons.exceptions.MacaroonException,
@@ -472,32 +457,33 @@ class Auth:
request.requester = create_requester(service.sender, app_service=service)
return service
async def is_server_admin(self, user: UserID) -> bool:
async def is_server_admin(self, requester: Requester) -> bool:
"""Check if the given user is a local server admin.
Args:
user: user to check
requester: The user making the request, according to the access token.
Returns:
True if the user is an admin
"""
return await self.store.is_server_admin(user)
return await self.store.is_server_admin(requester.user)
async def check_can_change_room_list(self, room_id: str, user: UserID) -> bool:
async def check_can_change_room_list(
self, room_id: str, requester: Requester
) -> bool:
"""Determine whether the user is allowed to edit the room's entry in the
published room list.
Args:
room_id
user
room_id: The room to check.
requester: The user making the request, according to the access token.
"""
is_admin = await self.is_server_admin(user)
is_admin = await self.is_server_admin(requester)
if is_admin:
return True
user_id = user.to_string()
await self.check_user_in_room(room_id, user_id)
await self.check_user_in_room(room_id, requester)
# We currently require the user is a "moderator" in the room. We do this
# by checking if they would (theoretically) be able to change the
@@ -516,7 +502,9 @@ class Auth:
send_level = event_auth.get_send_level(
EventTypes.CanonicalAlias, "", power_level_event
)
user_level = event_auth.get_user_power_level(user_id, auth_events)
user_level = event_auth.get_user_power_level(
requester.user.to_string(), auth_events
)
return user_level >= send_level
@@ -574,16 +562,16 @@ class Auth:
@trace
async def check_user_in_room_or_world_readable(
self, room_id: str, user_id: str, allow_departed_users: bool = False
self, room_id: str, requester: Requester, allow_departed_users: bool = False
) -> Tuple[str, Optional[str]]:
"""Checks that the user is or was in the room or the room is world
readable. If it isn't then an exception is raised.
Args:
room_id: room to check
user_id: user to check
allow_departed_users: if True, accept users that were previously
members but have now departed
room_id: The room to check.
requester: The user making the request, according to the access token.
allow_departed_users: If True, accept users that were previously
members but have now departed.
Returns:
Resolves to the current membership of the user in the room and the
@@ -598,7 +586,7 @@ class Auth:
# * The user is a guest user, and has joined the room
# else it will throw.
return await self.check_user_in_room(
room_id, user_id, allow_departed_users=allow_departed_users
room_id, requester, allow_departed_users=allow_departed_users
)
except AuthError:
visibility = await self._storage_controllers.state.get_current_state_event(
@@ -613,6 +601,6 @@ class Auth:
raise UnstableSpecAuthError(
403,
"User %s not in room %s, and room previews are disabled"
% (user_id, room_id),
% (requester.user, room_id),
errcode=Codes.NOT_JOINED,
)
+3 -3
View File
@@ -216,11 +216,11 @@ class EventContentFields:
MSC2716_HISTORICAL: Final = "org.matrix.msc2716.historical"
# For "insertion" events to indicate what the next batch ID should be in
# order to connect to it
MSC2716_NEXT_BATCH_ID: Final = "org.matrix.msc2716.next_batch_id"
MSC2716_NEXT_BATCH_ID: Final = "next_batch_id"
# Used on "batch" events to indicate which insertion event it connects to
MSC2716_BATCH_ID: Final = "org.matrix.msc2716.batch_id"
MSC2716_BATCH_ID: Final = "batch_id"
# For "marker" events
MSC2716_MARKER_INSERTION: Final = "org.matrix.msc2716.marker.insertion"
MSC2716_INSERTION_EVENT_REFERENCE: Final = "insertion_event_reference"
# The authorising user for joining a restricted room.
AUTHORISING_USER: Final = "join_authorised_via_users_server"
+19 -19
View File
@@ -269,24 +269,6 @@ class RoomVersions:
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
)
MSC2716v3 = RoomVersion(
"org.matrix.msc2716v3",
RoomDisposition.UNSTABLE,
EventFormatVersions.V3,
StateResolutionVersions.V2,
enforce_key_validity=True,
special_case_aliases_auth=False,
strict_canonicaljson=True,
limit_notifications_power_levels=True,
msc2176_redaction_rules=False,
msc3083_join_rules=False,
msc3375_redaction_rules=False,
msc2403_knocking=True,
msc2716_historical=True,
msc2716_redactions=True,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
)
MSC3787 = RoomVersion(
"org.matrix.msc3787",
RoomDisposition.UNSTABLE,
@@ -323,6 +305,24 @@ class RoomVersions:
msc3787_knock_restricted_join_rule=True,
msc3667_int_only_power_levels=True,
)
MSC2716v4 = RoomVersion(
"org.matrix.msc2716v4",
RoomDisposition.UNSTABLE,
EventFormatVersions.V3,
StateResolutionVersions.V2,
enforce_key_validity=True,
special_case_aliases_auth=False,
strict_canonicaljson=True,
limit_notifications_power_levels=True,
msc2176_redaction_rules=False,
msc3083_join_rules=False,
msc3375_redaction_rules=False,
msc2403_knocking=True,
msc2716_historical=True,
msc2716_redactions=True,
msc3787_knock_restricted_join_rule=False,
msc3667_int_only_power_levels=False,
)
KNOWN_ROOM_VERSIONS: Dict[str, RoomVersion] = {
@@ -338,9 +338,9 @@ KNOWN_ROOM_VERSIONS: Dict[str, RoomVersion] = {
RoomVersions.V7,
RoomVersions.V8,
RoomVersions.V9,
RoomVersions.MSC2716v3,
RoomVersions.MSC3787,
RoomVersions.V10,
RoomVersions.MSC2716v4,
)
}
+5 -3
View File
@@ -44,7 +44,6 @@ from synapse.app._base import (
register_start,
)
from synapse.config._base import ConfigError, format_config_error
from synapse.config.emailconfig import ThreepidBehaviour
from synapse.config.homeserver import HomeServerConfig
from synapse.config.server import ListenerConfig
from synapse.federation.transport.server import TransportLayerServer
@@ -202,7 +201,7 @@ class SynapseHomeServer(HomeServer):
}
)
if self.config.email.threepid_behaviour_email == ThreepidBehaviour.LOCAL:
if self.config.email.can_verify_email:
from synapse.rest.synapse.client.password_reset import (
PasswordResetSubmitTokenResource,
)
@@ -220,7 +219,10 @@ class SynapseHomeServer(HomeServer):
resources.update({"/_matrix/consent": consent_resource})
if name == "federation":
resources.update({FEDERATION_PREFIX: TransportLayerServer(self)})
federation_resource: Resource = TransportLayerServer(self)
if compress:
federation_resource = gz_wrap(federation_resource)
resources.update({FEDERATION_PREFIX: federation_resource})
if name == "openid":
resources.update(
+6 -40
View File
@@ -18,7 +18,6 @@
import email.utils
import logging
import os
from enum import Enum
from typing import Any
import attr
@@ -136,40 +135,22 @@ class EmailConfig(Config):
self.email_enable_notifs = email_config.get("enable_notifs", False)
self.threepid_behaviour_email = (
# Have Synapse handle the email sending if account_threepid_delegates.email
# is not defined
# msisdn is currently always remote while Synapse does not support any method of
# sending SMS messages
ThreepidBehaviour.REMOTE
if self.root.registration.account_threepid_delegate_email
else ThreepidBehaviour.LOCAL
)
if config.get("trust_identity_server_for_password_resets"):
raise ConfigError(
'The config option "trust_identity_server_for_password_resets" has been removed.'
"Please consult the configuration manual at docs/usage/configuration/config_documentation.md for "
"details and update your config file."
'The config option "trust_identity_server_for_password_resets" '
"is no longer supported. Please remove it from the config file."
)
self.local_threepid_handling_disabled_due_to_email_config = False
if (
self.threepid_behaviour_email == ThreepidBehaviour.LOCAL
and email_config == {}
):
# We cannot warn the user this has happened here
# Instead do so when a user attempts to reset their password
self.local_threepid_handling_disabled_due_to_email_config = True
self.threepid_behaviour_email = ThreepidBehaviour.OFF
# If we have email config settings, assume that we can verify ownership of
# email addresses.
self.can_verify_email = email_config != {}
# Get lifetime of a validation token in milliseconds
self.email_validation_token_lifetime = self.parse_duration(
email_config.get("validation_token_lifetime", "1h")
)
if self.threepid_behaviour_email == ThreepidBehaviour.LOCAL:
if self.can_verify_email:
missing = []
if not self.email_notif_from:
missing.append("email.notif_from")
@@ -360,18 +341,3 @@ class EmailConfig(Config):
"Config option email.invite_client_location must be a http or https URL",
path=("email", "invite_client_location"),
)
class ThreepidBehaviour(Enum):
"""
Enum to define the behaviour of Synapse with regards to when it contacts an identity
server for 3pid registration and password resets
REMOTE = use an external server to send tokens
LOCAL = send tokens ourselves
OFF = disable registration via 3pid and password resets
"""
REMOTE = "remote"
LOCAL = "local"
OFF = "off"
+3
View File
@@ -90,3 +90,6 @@ class ExperimentalConfig(Config):
# MSC3848: Introduce errcodes for specific event sending failures
self.msc3848_enabled: bool = experimental.get("msc3848_enabled", False)
# MSC3852: Expose last seen user agent field on /_matrix/client/v3/devices.
self.msc3852_enabled: bool = experimental.get("msc3852_enabled", False)
+3 -10
View File
@@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import logging
from typing import Any, Optional
from synapse.api.constants import RoomCreationPreset
@@ -21,15 +20,11 @@ from synapse.config._base import Config, ConfigError
from synapse.types import JsonDict, RoomAlias, UserID
from synapse.util.stringutils import random_string_with_symbols, strtobool
logger = logging.getLogger(__name__)
LEGACY_EMAIL_DELEGATE_WARNING = """\
Delegation of email verification to an identity server is now deprecated. To
NO_EMAIL_DELEGATE_ERROR = """\
Delegation of email verification to an identity server is no longer supported. To
continue to allow users to add email addresses to their accounts, and use them for
password resets, configure Synapse with an SMTP server via the `email` setting, and
remove `account_threepid_delegates.email`.
This will be an error in a future version.
"""
@@ -64,9 +59,7 @@ class RegistrationConfig(Config):
account_threepid_delegates = config.get("account_threepid_delegates") or {}
if "email" in account_threepid_delegates:
logger.warning(LEGACY_EMAIL_DELEGATE_WARNING)
self.account_threepid_delegate_email = account_threepid_delegates.get("email")
raise ConfigError(NO_EMAIL_DELEGATE_ERROR)
self.account_threepid_delegate_msisdn = account_threepid_delegates.get("msisdn")
self.default_identity_server = config.get("default_identity_server")
self.allow_guest_access = config.get("allow_guest_access", False)
+2
View File
@@ -28,6 +28,7 @@ from synapse.api.errors import Codes, SynapseError
from synapse.api.room_versions import RoomVersion
from synapse.events import EventBase
from synapse.events.utils import prune_event, prune_event_dict
from synapse.logging.opentracing import trace
from synapse.types import JsonDict
logger = logging.getLogger(__name__)
@@ -35,6 +36,7 @@ logger = logging.getLogger(__name__)
Hasher = Callable[[bytes], "hashlib._Hash"]
@trace
def check_event_content_hash(
event: EventBase, hash_algorithm: Hasher = hashlib.sha256
) -> bool:
+2
View File
@@ -32,6 +32,7 @@ from typing_extensions import Literal
import synapse
from synapse.api.errors import Codes
from synapse.logging.opentracing import trace
from synapse.rest.media.v1._base import FileInfo
from synapse.rest.media.v1.media_storage import ReadableFileWrapper
from synapse.spam_checker_api import RegistrationBehaviour
@@ -378,6 +379,7 @@ class SpamChecker:
if check_media_file_for_spam is not None:
self._check_media_file_for_spam_callbacks.append(check_media_file_for_spam)
@trace
async def check_event_for_spam(
self, event: "synapse.events.EventBase"
) -> Union[Tuple[Codes, JsonDict], str]:
+1 -1
View File
@@ -161,7 +161,7 @@ def prune_event_dict(room_version: RoomVersion, event_dict: JsonDict) -> JsonDic
elif room_version.msc2716_redactions and event_type == EventTypes.MSC2716_BATCH:
add_fields(EventContentFields.MSC2716_BATCH_ID)
elif room_version.msc2716_redactions and event_type == EventTypes.MSC2716_MARKER:
add_fields(EventContentFields.MSC2716_MARKER_INSERTION)
add_fields(EventContentFields.MSC2716_INSERTION_EVENT_REFERENCE)
allowed_fields = {k: v for k, v in event_dict.items() if k in allowed_keys}
+22
View File
@@ -23,6 +23,7 @@ from synapse.crypto.keyring import Keyring
from synapse.events import EventBase, make_event_from_dict
from synapse.events.utils import prune_event, validate_canonicaljson
from synapse.http.servlet import assert_params_in_dict
from synapse.logging.opentracing import log_kv, trace
from synapse.types import JsonDict, get_domain_from_id
if TYPE_CHECKING:
@@ -55,6 +56,7 @@ class FederationBase:
self._clock = hs.get_clock()
self._storage_controllers = hs.get_storage_controllers()
@trace
async def _check_sigs_and_hash(
self, room_version: RoomVersion, pdu: EventBase
) -> EventBase:
@@ -97,17 +99,36 @@ class FederationBase:
"Event %s seems to have been redacted; using our redacted copy",
pdu.event_id,
)
log_kv(
{
"message": "Event seems to have been redacted; using our redacted copy",
"event_id": pdu.event_id,
}
)
else:
logger.warning(
"Event %s content has been tampered, redacting",
pdu.event_id,
)
log_kv(
{
"message": "Event content has been tampered, redacting",
"event_id": pdu.event_id,
}
)
return redacted_event
spam_check = await self.spam_checker.check_event_for_spam(pdu)
if spam_check != self.spam_checker.NOT_SPAM:
logger.warning("Event contains spam, soft-failing %s", pdu.event_id)
log_kv(
{
"message": "Event contains spam, redacting (to save disk space) "
"as well as soft-failing (to stop using the event in prev_events)",
"event_id": pdu.event_id,
}
)
# we redact (to save disk space) as well as soft-failing (to stop
# using the event in prev_events).
redacted_event = prune_event(pdu)
@@ -117,6 +138,7 @@ class FederationBase:
return pdu
@trace
async def _check_sigs_on_pdu(
keyring: Keyring, room_version: RoomVersion, pdu: EventBase
) -> None:
+20 -3
View File
@@ -61,7 +61,7 @@ from synapse.federation.federation_base import (
)
from synapse.federation.transport.client import SendJoinResponse
from synapse.http.types import QueryParams
from synapse.logging.opentracing import SynapseTags, set_tag, tag_args, trace
from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, tag_args, trace
from synapse.types import JsonDict, UserID, get_domain_from_id
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
@@ -587,11 +587,15 @@ class FederationClient(FederationBase):
Returns:
A list of PDUs that have valid signatures and hashes.
"""
set_tag(
SynapseTags.RESULT_PREFIX + "pdus.length",
str(len(pdus)),
)
# We limit how many PDUs we check at once, as if we try to do hundreds
# of thousands of PDUs at once we see large memory spikes.
valid_pdus = []
valid_pdus: List[EventBase] = []
async def _execute(pdu: EventBase) -> None:
valid_pdu = await self._check_sigs_and_hash_and_fetch_one(
@@ -607,6 +611,8 @@ class FederationClient(FederationBase):
return valid_pdus
@trace
@tag_args
async def _check_sigs_and_hash_and_fetch_one(
self,
pdu: EventBase,
@@ -639,16 +645,27 @@ class FederationClient(FederationBase):
except InvalidEventSignatureError as e:
logger.warning(
"Signature on retrieved event %s was invalid (%s). "
"Checking local store/orgin server",
"Checking local store/origin server",
pdu.event_id,
e,
)
log_kv(
{
"message": "Signature on retrieved event was invalid. "
"Checking local store/origin server",
"event_id": pdu.event_id,
"InvalidEventSignatureError": e,
}
)
# Check local db.
res = await self.store.get_event(
pdu.event_id, allow_rejected=True, allow_none=True
)
# If the PDU fails its signature check and we don't have it in our
# database, we then request it from sender's server (if that is not the
# same as `origin`).
pdu_origin = get_domain_from_id(pdu.sender)
if not res and pdu_origin != origin:
try:
+11 -6
View File
@@ -280,7 +280,7 @@ class AuthHandler:
that it isn't stolen by re-authenticating them.
Args:
requester: The user, as given by the access token
requester: The user making the request, according to the access token.
request: The request sent by the client.
@@ -1435,20 +1435,25 @@ class AuthHandler:
access_token: access token to be deleted
"""
user_info = await self.auth.get_user_by_access_token(access_token)
token = await self.store.get_user_by_access_token(access_token)
if not token:
# At this point, the token should already have been fetched once by
# the caller, so this should not happen, unless of a race condition
# between two delete requests
raise SynapseError(HTTPStatus.UNAUTHORIZED, "Unrecognised access token")
await self.store.delete_access_token(access_token)
# see if any modules want to know about this
await self.password_auth_provider.on_logged_out(
user_id=user_info.user_id,
device_id=user_info.device_id,
user_id=token.user_id,
device_id=token.device_id,
access_token=access_token,
)
# delete pushers associated with this access token
if user_info.token_id is not None:
if token.token_id is not None:
await self.hs.get_pusherpool().remove_pushers_by_access_token(
user_info.user_id, (user_info.token_id,)
token.user_id, (token.token_id,)
)
async def delete_access_tokens_for_user(
+8 -1
View File
@@ -74,6 +74,7 @@ class DeviceWorkerHandler:
self._state_storage = hs.get_storage_controllers().state
self._auth_handler = hs.get_auth_handler()
self.server_name = hs.hostname
self._msc3852_enabled = hs.config.experimental.msc3852_enabled
@trace
async def get_devices_by_user(self, user_id: str) -> List[JsonDict]:
@@ -747,7 +748,13 @@ def _update_device_from_client_ips(
device: JsonDict, client_ips: Mapping[Tuple[str, str], Mapping[str, Any]]
) -> None:
ip = client_ips.get((device["user_id"], device["device_id"]), {})
device.update({"last_seen_ts": ip.get("last_seen"), "last_seen_ip": ip.get("ip")})
device.update(
{
"last_seen_user_agent": ip.get("user_agent"),
"last_seen_ts": ip.get("last_seen"),
"last_seen_ip": ip.get("ip"),
}
)
class DeviceListUpdater:
+11 -13
View File
@@ -30,7 +30,7 @@ from synapse.api.errors import (
from synapse.appservice import ApplicationService
from synapse.module_api import NOT_SPAM
from synapse.storage.databases.main.directory import RoomAliasMapping
from synapse.types import JsonDict, Requester, RoomAlias, UserID, get_domain_from_id
from synapse.types import JsonDict, Requester, RoomAlias, get_domain_from_id
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -133,7 +133,7 @@ class DirectoryHandler:
else:
# Server admins are not subject to the same constraints as normal
# users when creating an alias (e.g. being in the room).
is_admin = await self.auth.is_server_admin(requester.user)
is_admin = await self.auth.is_server_admin(requester)
if (self.require_membership and check_membership) and not is_admin:
rooms_for_user = await self.store.get_rooms_for_user(user_id)
@@ -197,7 +197,7 @@ class DirectoryHandler:
user_id = requester.user.to_string()
try:
can_delete = await self._user_can_delete_alias(room_alias, user_id)
can_delete = await self._user_can_delete_alias(room_alias, requester)
except StoreError as e:
if e.code == 404:
raise NotFoundError("Unknown room alias")
@@ -400,7 +400,9 @@ class DirectoryHandler:
# either no interested services, or no service with an exclusive lock
return True
async def _user_can_delete_alias(self, alias: RoomAlias, user_id: str) -> bool:
async def _user_can_delete_alias(
self, alias: RoomAlias, requester: Requester
) -> bool:
"""Determine whether a user can delete an alias.
One of the following must be true:
@@ -413,7 +415,7 @@ class DirectoryHandler:
"""
creator = await self.store.get_room_alias_creator(alias.to_string())
if creator == user_id:
if creator == requester.user.to_string():
return True
# Resolve the alias to the corresponding room.
@@ -422,9 +424,7 @@ class DirectoryHandler:
if not room_id:
return False
return await self.auth.check_can_change_room_list(
room_id, UserID.from_string(user_id)
)
return await self.auth.check_can_change_room_list(room_id, requester)
async def edit_published_room_list(
self, requester: Requester, room_id: str, visibility: str
@@ -463,7 +463,7 @@ class DirectoryHandler:
raise SynapseError(400, "Unknown room")
can_change_room_list = await self.auth.check_can_change_room_list(
room_id, requester.user
room_id, requester
)
if not can_change_room_list:
raise AuthError(
@@ -528,10 +528,8 @@ class DirectoryHandler:
Get a list of the aliases that currently point to this room on this server
"""
# allow access to server admins and current members of the room
is_admin = await self.auth.is_server_admin(requester.user)
is_admin = await self.auth.is_server_admin(requester)
if not is_admin:
await self.auth.check_user_in_room_or_world_readable(
room_id, requester.user.to_string()
)
await self.auth.check_user_in_room_or_world_readable(room_id, requester)
return await self.store.get_aliases_for_room(room_id)
+23 -5
View File
@@ -86,9 +86,14 @@ backfill_processing_before_timer = Histogram(
"sec",
[],
buckets=(
0.1,
0.5,
1.0,
2.5,
5.0,
7.5,
10.0,
15.0,
20.0,
30.0,
40.0,
@@ -482,7 +487,7 @@ class FederationHandler:
processing_end_time = self.clock.time_msec()
backfill_processing_before_timer.observe(
(processing_start_time - processing_end_time) / 1000
(processing_end_time - processing_start_time) / 1000
)
success = await try_backfill(likely_domains)
@@ -851,13 +856,20 @@ class FederationHandler:
# Note that this requires the /send_join request to come back to the
# same server.
if room_version.msc3083_join_rules:
state_ids = await self._state_storage_controller.get_current_state_ids(
room_id
partial_state_ids = (
await self._state_storage_controller.get_current_state_ids(
room_id,
state_filter=StateFilter.from_types(
[(EventTypes.JoinRules, ""), (EventTypes.Member, user_id)]
),
)
)
if await self._event_auth_handler.has_restricted_join_rules(
state_ids, room_version
partial_state_ids, room_version
):
prev_member_event_id = state_ids.get((EventTypes.Member, user_id), None)
prev_member_event_id = partial_state_ids.get(
(EventTypes.Member, user_id), None
)
# If the user is invited or joined to the room already, then
# no additional info is needed.
include_auth_user_id = True
@@ -869,6 +881,12 @@ class FederationHandler:
)
if include_auth_user_id:
state_ids = (
await self._state_storage_controller.get_current_state_ids(
room_id,
)
)
event_content[
EventContentFields.AUTHORISING_USER
] = await self._event_auth_handler.get_user_which_could_invite(
+11 -1
View File
@@ -104,15 +104,25 @@ backfill_processing_after_timer = Histogram(
"sec",
[],
buckets=(
0.1,
0.25,
0.5,
1.0,
2.5,
5.0,
7.5,
10.0,
15.0,
20.0,
25.0,
30.0,
40.0,
50.0,
60.0,
80.0,
100.0,
120.0,
150.0,
180.0,
"+Inf",
),
@@ -1384,7 +1394,7 @@ class FederationEventHandler:
logger.debug("_handle_marker_event: received %s", marker_event)
insertion_event_id = marker_event.content.get(
EventContentFields.MSC2716_MARKER_INSERTION
EventContentFields.MSC2716_INSERTION_EVENT_REFERENCE
)
if insertion_event_id is None:
+1 -55
View File
@@ -26,7 +26,6 @@ from synapse.api.errors import (
SynapseError,
)
from synapse.api.ratelimiting import Ratelimiter
from synapse.config.emailconfig import ThreepidBehaviour
from synapse.http import RequestTimedOutError
from synapse.http.client import SimpleHttpClient
from synapse.http.site import SynapseRequest
@@ -416,48 +415,6 @@ class IdentityHandler:
return session_id
async def request_email_token(
self,
id_server: str,
email: str,
client_secret: str,
send_attempt: int,
next_link: Optional[str] = None,
) -> JsonDict:
"""
Request an external server send an email on our behalf for the purposes of threepid
validation.
Args:
id_server: The identity server to proxy to
email: The email to send the message to
client_secret: The unique client_secret sends by the user
send_attempt: Which attempt this is
next_link: A link to redirect the user to once they submit the token
Returns:
The json response body from the server
"""
params = {
"email": email,
"client_secret": client_secret,
"send_attempt": send_attempt,
}
if next_link:
params["next_link"] = next_link
try:
data = await self.http_client.post_json_get_json(
id_server + "/_matrix/identity/api/v1/validate/email/requestToken",
params,
)
return data
except HttpResponseException as e:
logger.info("Proxied requestToken failed: %r", e)
raise e.to_synapse_error()
except RequestTimedOutError:
raise SynapseError(500, "Timed out contacting identity server")
async def requestMsisdnToken(
self,
id_server: str,
@@ -531,18 +488,7 @@ class IdentityHandler:
validation_session = None
# Try to validate as email
if self.hs.config.email.threepid_behaviour_email == ThreepidBehaviour.REMOTE:
# Remote emails will only be used if a valid identity server is provided.
assert (
self.hs.config.registration.account_threepid_delegate_email is not None
)
# Ask our delegated email identity server
validation_session = await self.threepid_from_creds(
self.hs.config.registration.account_threepid_delegate_email,
threepid_creds,
)
elif self.hs.config.email.threepid_behaviour_email == ThreepidBehaviour.LOCAL:
if self.hs.config.email.can_verify_email:
# Get a validated session matching these details
validation_session = await self.store.get_threepid_validation_session(
"email", client_secret, sid=sid, validated=True
+3 -3
View File
@@ -309,18 +309,18 @@ class InitialSyncHandler:
if blocked:
raise SynapseError(403, "This room has been blocked on this server")
user_id = requester.user.to_string()
(
membership,
member_event_id,
) = await self.auth.check_user_in_room_or_world_readable(
room_id,
user_id,
requester,
allow_departed_users=True,
)
is_peeking = member_event_id is None
user_id = requester.user.to_string()
if membership == Membership.JOIN:
result = await self._room_initial_sync_joined(
user_id, room_id, pagin_config, membership, is_peeking
+12 -11
View File
@@ -104,7 +104,7 @@ class MessageHandler:
async def get_room_data(
self,
user_id: str,
requester: Requester,
room_id: str,
event_type: str,
state_key: str,
@@ -112,7 +112,7 @@ class MessageHandler:
"""Get data from a room.
Args:
user_id
requester: The user who did the request.
room_id
event_type
state_key
@@ -125,7 +125,7 @@ class MessageHandler:
membership,
membership_event_id,
) = await self.auth.check_user_in_room_or_world_readable(
room_id, user_id, allow_departed_users=True
room_id, requester, allow_departed_users=True
)
if membership == Membership.JOIN:
@@ -161,11 +161,10 @@ class MessageHandler:
async def get_state_events(
self,
user_id: str,
requester: Requester,
room_id: str,
state_filter: Optional[StateFilter] = None,
at_token: Optional[StreamToken] = None,
is_guest: bool = False,
) -> List[dict]:
"""Retrieve all state events for a given room. If the user is
joined to the room then return the current state. If the user has
@@ -174,14 +173,13 @@ class MessageHandler:
visible.
Args:
user_id: The user requesting state events.
requester: The user requesting state events.
room_id: The room ID to get all state events from.
state_filter: The state filter used to fetch state from the database.
at_token: the stream token of the at which we are requesting
the stats. If the user is not allowed to view the state as of that
stream token, we raise a 403 SynapseError. If None, returns the current
state based on the current_state_events table.
is_guest: whether this user is a guest
Returns:
A list of dicts representing state events. [{}, {}, {}]
Raises:
@@ -191,6 +189,7 @@ class MessageHandler:
members of this room.
"""
state_filter = state_filter or StateFilter.all()
user_id = requester.user.to_string()
if at_token:
last_event_id = (
@@ -223,7 +222,7 @@ class MessageHandler:
membership,
membership_event_id,
) = await self.auth.check_user_in_room_or_world_readable(
room_id, user_id, allow_departed_users=True
room_id, requester, allow_departed_users=True
)
if membership == Membership.JOIN:
@@ -317,12 +316,11 @@ class MessageHandler:
Returns:
A dict of user_id to profile info
"""
user_id = requester.user.to_string()
if not requester.app_service:
# We check AS auth after fetching the room membership, as it
# requires us to pull out all joined members anyway.
membership, _ = await self.auth.check_user_in_room_or_world_readable(
room_id, user_id, allow_departed_users=True
room_id, requester, allow_departed_users=True
)
if membership != Membership.JOIN:
raise SynapseError(
@@ -340,7 +338,10 @@ class MessageHandler:
# If this is an AS, double check that they are allowed to see the members.
# This can either be because the AS user is in the room or because there
# is a user in the room that the AS is "interested in"
if requester.app_service and user_id not in users_with_profile:
if (
requester.app_service
and requester.user.to_string() not in users_with_profile
):
for uid in users_with_profile:
if requester.app_service.is_interested_in_user(uid):
break
+1 -1
View File
@@ -464,7 +464,7 @@ class PaginationHandler:
membership,
member_event_id,
) = await self.auth.check_user_in_room_or_world_readable(
room_id, user_id, allow_departed_users=True
room_id, requester, allow_departed_users=True
)
if pagin_config.direction == "b":
+9 -6
View File
@@ -29,7 +29,13 @@ from synapse.api.constants import (
JoinRules,
LoginType,
)
from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError
from synapse.api.errors import (
AuthError,
Codes,
ConsentNotGivenError,
InvalidClientTokenError,
SynapseError,
)
from synapse.appservice import ApplicationService
from synapse.config.server import is_threepid_reserved
from synapse.http.servlet import assert_params_in_dict
@@ -180,10 +186,7 @@ class RegistrationHandler:
)
if guest_access_token:
user_data = await self.auth.get_user_by_access_token(guest_access_token)
if (
not user_data.is_guest
or UserID.from_string(user_data.user_id).localpart != localpart
):
if not user_data.is_guest or user_data.user.localpart != localpart:
raise AuthError(
403,
"Cannot register taken user ID without valid guest "
@@ -618,7 +621,7 @@ class RegistrationHandler:
user_id = user.to_string()
service = self.store.get_app_service_by_token(as_token)
if not service:
raise AuthError(403, "Invalid application service token.")
raise InvalidClientTokenError()
if not service.is_interested_in_user(user_id):
raise SynapseError(
400,
+1 -1
View File
@@ -103,7 +103,7 @@ class RelationsHandler:
# TODO Properly handle a user leaving a room.
(_, member_event_id) = await self._auth.check_user_in_room_or_world_readable(
room_id, user_id, allow_departed_users=True
room_id, requester, allow_departed_users=True
)
# This gets the original event and checks that a) the event exists and
+2 -2
View File
@@ -721,7 +721,7 @@ class RoomCreationHandler:
# allow the server notices mxid to create rooms
is_requester_admin = True
else:
is_requester_admin = await self.auth.is_server_admin(requester.user)
is_requester_admin = await self.auth.is_server_admin(requester)
# Let the third party rules modify the room creation config if needed, or abort
# the room creation entirely with an exception.
@@ -1279,7 +1279,7 @@ class RoomContextHandler:
"""
user = requester.user
if use_admin_priviledge:
await assert_user_is_admin(self.auth, requester.user)
await assert_user_is_admin(self.auth, requester)
before_limit = math.floor(limit / 2.0)
after_limit = limit - before_limit
+57 -26
View File
@@ -16,7 +16,7 @@ import abc
import logging
import random
from http import HTTPStatus
from typing import TYPE_CHECKING, Iterable, List, Optional, Set, Tuple
from typing import TYPE_CHECKING, Collection, Iterable, List, Optional, Set, Tuple
from synapse import types
from synapse.api.constants import (
@@ -179,7 +179,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
"""Try and join a room that this server is not in
Args:
requester
requester: The user making the request, according to the access token.
remote_room_hosts: List of servers that can be used to join via.
room_id: Room that we are trying to join
user: User who is trying to join
@@ -410,11 +410,13 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
historical=historical,
)
prev_state_ids = await context.get_prev_state_ids(
StateFilter.from_types([(EventTypes.Member, None)])
prev_member_event_ids = await context.get_prev_state_ids(
StateFilter.from_types([(EventTypes.Member, user_id)])
)
prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)
prev_member_event_id = prev_member_event_ids.get(
(EventTypes.Member, user_id), None
)
if event.membership == Membership.JOIN:
newly_joined = True
@@ -689,7 +691,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
errcode=Codes.BAD_JSON,
)
if "avatar_url" in content:
if "avatar_url" in content and content.get("avatar_url") is not None:
if not await self.profile_handler.check_avatar_size_and_mime_type(
content["avatar_url"],
):
@@ -744,7 +746,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
is_requester_admin = True
else:
is_requester_admin = await self.auth.is_server_admin(requester.user)
is_requester_admin = await self.auth.is_server_admin(requester)
if not is_requester_admin:
if self.config.server.block_non_admin_invites:
@@ -790,14 +792,20 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
latest_event_ids = await self.store.get_prev_events_for_room(room_id)
state_before_join = await self.state_handler.compute_state_after_events(
room_id, latest_event_ids
old_membership_state_ids = await self.state_handler.compute_state_after_events(
room_id,
event_ids=latest_event_ids,
state_filter=StateFilter.from_types(
[(EventTypes.Member, target.to_string())]
),
)
# TODO: Refactor into dictionary of explicitly allowed transitions
# between old and new state, with specific error messages for some
# transitions and generic otherwise
old_state_id = state_before_join.get((EventTypes.Member, target.to_string()))
old_state_id = old_membership_state_ids.get(
(EventTypes.Member, target.to_string())
)
if old_state_id:
old_state = await self.store.get_event(old_state_id, allow_none=True)
old_membership = old_state.content.get("membership") if old_state else None
@@ -848,11 +856,16 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
if action == "kick":
raise AuthError(403, "The target user is not in the room")
is_host_in_room = await self._is_host_in_room(state_before_join)
is_host_in_room = await self.store.is_host_joined(room_id, self._server_name)
if effective_membership_state == Membership.JOIN:
if requester.is_guest:
guest_can_join = await self._can_guest_join(state_before_join)
guest_access_ids = await self.state_handler.compute_state_after_events(
room_id,
event_ids=latest_event_ids,
state_filter=StateFilter.from_types([(EventTypes.GuestAccess, "")]),
)
guest_can_join = await self._can_guest_join(guest_access_ids)
if not guest_can_join:
# This should be an auth check, but guests are a local concept,
# so don't really fit into the general auth process.
@@ -868,7 +881,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
bypass_spam_checker = True
else:
bypass_spam_checker = await self.auth.is_server_admin(requester.user)
bypass_spam_checker = await self.auth.is_server_admin(requester)
inviter = await self._get_inviter(target.to_string(), room_id)
if (
@@ -895,7 +908,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
remote_room_hosts,
content,
is_host_in_room,
state_before_join,
latest_event_ids,
)
if remote_join:
if ratelimit:
@@ -1040,7 +1053,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
remote_room_hosts: List[str],
content: JsonDict,
is_host_in_room: bool,
state_before_join: StateMap[str],
latest_event_ids: Collection[str],
) -> Tuple[bool, List[str]]:
"""
Check whether the server should do a remote join (as opposed to a local
@@ -1060,8 +1073,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
content: The content to use as the event body of the join. This may
be modified.
is_host_in_room: True if the host is in the room.
state_before_join: The state before the join event (i.e. the resolution of
the states after its parent events).
latest_event_ids: The parent events of the join event.
Returns:
A tuple of:
@@ -1079,16 +1091,26 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
# for restricted join rules, a remote join must be used.
room_version = await self.store.get_room_version(room_id)
# Only fetch the state that we need to check if we need to worry about
# restricted join rules.
partial_state_ids = await self.state_handler.compute_state_after_events(
room_id,
latest_event_ids,
StateFilter.from_types(
[(EventTypes.JoinRules, ""), (EventTypes.Member, user_id)]
),
)
# If restricted join rules are not being used, a local join can always
# be used.
if not await self.event_auth_handler.has_restricted_join_rules(
state_before_join, room_version
partial_state_ids, room_version
):
return False, []
# If the user is invited to the room or already joined, the join
# event can always be issued locally.
prev_member_event_id = state_before_join.get((EventTypes.Member, user_id), None)
prev_member_event_id = partial_state_ids.get((EventTypes.Member, user_id), None)
prev_member_event = None
if prev_member_event_id:
prev_member_event = await self.store.get_event(prev_member_event_id)
@@ -1098,15 +1120,22 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
):
return False, []
# Now we need to inspect the full membership, so pull that from the DB.
members_before_join = await self.state_handler.compute_state_after_events(
room_id,
latest_event_ids,
StateFilter.from_types([(EventTypes.Member, None)]),
)
# If the local host has a user who can issue invites, then a local
# join can be done.
#
# If not, generate a new list of remote hosts based on which
# can issue invites.
event_map = await self.store.get_events(state_before_join.values())
event_map = await self.store.get_events(members_before_join.values())
current_state = {
state_key: event_map[event_id]
for state_key, event_id in state_before_join.items()
for state_key, event_id in members_before_join.items()
}
allowed_servers = get_servers_from_users(
get_users_which_can_issue_invite(current_state)
@@ -1120,7 +1149,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
# Ensure the member should be allowed access via membership in a room.
await self.event_auth_handler.check_restricted_join_rules(
state_before_join, room_version, user_id, prev_member_event
members_before_join, room_version, user_id, prev_member_event
)
# If this is going to be a local join, additional information must
@@ -1130,7 +1159,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
EventContentFields.AUTHORISING_USER
] = await self.event_auth_handler.get_user_which_could_invite(
room_id,
state_before_join,
members_before_join,
)
return False, []
@@ -1236,7 +1265,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
requester = types.create_requester(target_user)
prev_state_ids = await context.get_prev_state_ids(
StateFilter.from_types([(EventTypes.GuestAccess, None)])
StateFilter.from_types(
[(EventTypes.GuestAccess, None), (EventTypes.Member, event.state_key)]
)
)
if event.membership == Membership.JOIN:
if requester.is_guest:
@@ -1410,7 +1441,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
ShadowBanError if the requester has been shadow-banned.
"""
if self.config.server.block_non_admin_invites:
is_requester_admin = await self.auth.is_server_admin(requester.user)
is_requester_admin = await self.auth.is_server_admin(requester)
if not is_requester_admin:
raise SynapseError(
403, "Invites have been disabled on this server", Codes.FORBIDDEN
@@ -1693,7 +1724,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
check_complexity
and self.hs.config.server.limit_remote_rooms.admins_can_join
):
check_complexity = not await self.auth.is_server_admin(user)
check_complexity = not await self.store.is_server_admin(user)
if check_complexity:
# Fetch the room complexity
+226 -33
View File
@@ -16,9 +16,11 @@ import logging
from typing import (
TYPE_CHECKING,
Any,
Collection,
Dict,
FrozenSet,
List,
Mapping,
Optional,
Sequence,
Set,
@@ -517,10 +519,17 @@ class SyncHandler:
# ensure that we always include current state in the timeline
current_state_ids: FrozenSet[str] = frozenset()
if any(e.is_state() for e in recents):
# FIXME(faster_joins): We use the partial state here as
# we don't want to block `/sync` on finishing a lazy join.
# Which should be fine once
# https://github.com/matrix-org/synapse/issues/12989 is resolved,
# since we shouldn't reach here anymore?
# Note that we use the current state as a whitelist for filtering
# `recents`, so partial state is only a problem when a membership
# event turns up in `recents` but has not made it into the current
# state.
current_state_ids_map = (
await self._state_storage_controller.get_current_state_ids(
room_id
)
await self.store.get_partial_current_state_ids(room_id)
)
current_state_ids = frozenset(current_state_ids_map.values())
@@ -589,7 +598,13 @@ class SyncHandler:
if any(e.is_state() for e in loaded_recents):
# FIXME(faster_joins): We use the partial state here as
# we don't want to block `/sync` on finishing a lazy join.
# Is this the correct way of doing it?
# Which should be fine once
# https://github.com/matrix-org/synapse/issues/12989 is resolved,
# since we shouldn't reach here anymore?
# Note that we use the current state as a whitelist for filtering
# `loaded_recents`, so partial state is only a problem when a
# membership event turns up in `loaded_recents` but has not made it
# into the current state.
current_state_ids_map = (
await self.store.get_partial_current_state_ids(room_id)
)
@@ -637,7 +652,10 @@ class SyncHandler:
)
async def get_state_after_event(
self, event_id: str, state_filter: Optional[StateFilter] = None
self,
event_id: str,
state_filter: Optional[StateFilter] = None,
await_full_state: bool = True,
) -> StateMap[str]:
"""
Get the room state after the given event
@@ -645,9 +663,14 @@ class SyncHandler:
Args:
event_id: event of interest
state_filter: The state filter used to fetch state from the database.
await_full_state: if `True`, will block if we do not yet have complete state
at the event and `state_filter` is not satisfied by partial state.
Defaults to `True`.
"""
state_ids = await self._state_storage_controller.get_state_ids_for_event(
event_id, state_filter=state_filter or StateFilter.all()
event_id,
state_filter=state_filter or StateFilter.all(),
await_full_state=await_full_state,
)
# using get_metadata_for_events here (instead of get_event) sidesteps an issue
@@ -670,6 +693,7 @@ class SyncHandler:
room_id: str,
stream_position: StreamToken,
state_filter: Optional[StateFilter] = None,
await_full_state: bool = True,
) -> StateMap[str]:
"""Get the room state at a particular stream position
@@ -677,6 +701,9 @@ class SyncHandler:
room_id: room for which to get state
stream_position: point at which to get state
state_filter: The state filter used to fetch state from the database.
await_full_state: if `True`, will block if we do not yet have complete state
at the last event in the room before `stream_position` and
`state_filter` is not satisfied by partial state. Defaults to `True`.
"""
# FIXME: This gets the state at the latest event before the stream ordering,
# which might not be the same as the "current state" of the room at the time
@@ -688,7 +715,9 @@ class SyncHandler:
if last_event_id:
state = await self.get_state_after_event(
last_event_id, state_filter=state_filter or StateFilter.all()
last_event_id,
state_filter=state_filter or StateFilter.all(),
await_full_state=await_full_state,
)
else:
@@ -891,7 +920,15 @@ class SyncHandler:
with Measure(self.clock, "compute_state_delta"):
# The memberships needed for events in the timeline.
# Only calculated when `lazy_load_members` is on.
members_to_fetch = None
members_to_fetch: Optional[Set[str]] = None
# A dictionary mapping user IDs to the first event in the timeline sent by
# them. Only calculated when `lazy_load_members` is on.
first_event_by_sender_map: Optional[Dict[str, EventBase]] = None
# The contribution to the room state from state events in the timeline.
# Only contains the last event for any given state key.
timeline_state: StateMap[str]
lazy_load_members = sync_config.filter_collection.lazy_load_members()
include_redundant_members = (
@@ -902,10 +939,23 @@ class SyncHandler:
# We only request state for the members needed to display the
# timeline:
members_to_fetch = {
event.sender # FIXME: we also care about invite targets etc.
for event in batch.events
}
timeline_state = {}
members_to_fetch = set()
first_event_by_sender_map = {}
for event in batch.events:
# Build the map from user IDs to the first timeline event they sent.
if event.sender not in first_event_by_sender_map:
first_event_by_sender_map[event.sender] = event
# We need the event's sender, unless their membership was in a
# previous timeline event.
if (EventTypes.Member, event.sender) not in timeline_state:
members_to_fetch.add(event.sender)
# FIXME: we also care about invite targets etc.
if event.is_state():
timeline_state[(event.type, event.state_key)] = event.event_id
if full_state:
# always make sure we LL ourselves so we know we're in the room
@@ -915,16 +965,21 @@ class SyncHandler:
members_to_fetch.add(sync_config.user.to_string())
state_filter = StateFilter.from_lazy_load_member_list(members_to_fetch)
else:
state_filter = StateFilter.all()
# The contribution to the room state from state events in the timeline.
# Only contains the last event for any given state key.
timeline_state = {
(event.type, event.state_key): event.event_id
for event in batch.events
if event.is_state()
}
# We are happy to use partial state to compute the `/sync` response.
# Since partial state may not include the lazy-loaded memberships we
# require, we fix up the state response afterwards with memberships from
# auth events.
await_full_state = False
else:
timeline_state = {
(event.type, event.state_key): event.event_id
for event in batch.events
if event.is_state()
}
state_filter = StateFilter.all()
await_full_state = True
# Now calculate the state to return in the sync response for the room.
# This is more or less the change in state between the end of the previous
@@ -936,19 +991,26 @@ class SyncHandler:
if batch:
state_at_timeline_end = (
await self._state_storage_controller.get_state_ids_for_event(
batch.events[-1].event_id, state_filter=state_filter
batch.events[-1].event_id,
state_filter=state_filter,
await_full_state=await_full_state,
)
)
state_at_timeline_start = (
await self._state_storage_controller.get_state_ids_for_event(
batch.events[0].event_id, state_filter=state_filter
batch.events[0].event_id,
state_filter=state_filter,
await_full_state=await_full_state,
)
)
else:
state_at_timeline_end = await self.get_state_at(
room_id, stream_position=now_token, state_filter=state_filter
room_id,
stream_position=now_token,
state_filter=state_filter,
await_full_state=await_full_state,
)
state_at_timeline_start = state_at_timeline_end
@@ -964,14 +1026,19 @@ class SyncHandler:
if batch:
state_at_timeline_start = (
await self._state_storage_controller.get_state_ids_for_event(
batch.events[0].event_id, state_filter=state_filter
batch.events[0].event_id,
state_filter=state_filter,
await_full_state=await_full_state,
)
)
else:
# We can get here if the user has ignored the senders of all
# the recent events.
state_at_timeline_start = await self.get_state_at(
room_id, stream_position=now_token, state_filter=state_filter
room_id,
stream_position=now_token,
state_filter=state_filter,
await_full_state=await_full_state,
)
# for now, we disable LL for gappy syncs - see
@@ -993,20 +1060,28 @@ class SyncHandler:
# is indeed the case.
assert since_token is not None
state_at_previous_sync = await self.get_state_at(
room_id, stream_position=since_token, state_filter=state_filter
room_id,
stream_position=since_token,
state_filter=state_filter,
await_full_state=await_full_state,
)
if batch:
state_at_timeline_end = (
await self._state_storage_controller.get_state_ids_for_event(
batch.events[-1].event_id, state_filter=state_filter
batch.events[-1].event_id,
state_filter=state_filter,
await_full_state=await_full_state,
)
)
else:
# We can get here if the user has ignored the senders of all
# the recent events.
state_at_timeline_end = await self.get_state_at(
room_id, stream_position=now_token, state_filter=state_filter
room_id,
stream_position=now_token,
state_filter=state_filter,
await_full_state=await_full_state,
)
state_ids = _calculate_state(
@@ -1036,8 +1111,23 @@ class SyncHandler:
(EventTypes.Member, member)
for member in members_to_fetch
),
await_full_state=False,
)
# If we only have partial state for the room, `state_ids` may be missing the
# memberships we wanted. We attempt to find some by digging through the auth
# events of timeline events.
if lazy_load_members and await self.store.is_partial_state_room(room_id):
assert members_to_fetch is not None
assert first_event_by_sender_map is not None
additional_state_ids = (
await self._find_missing_partial_state_memberships(
room_id, members_to_fetch, first_event_by_sender_map, state_ids
)
)
state_ids = {**state_ids, **additional_state_ids}
# At this point, if `lazy_load_members` is enabled, `state_ids` includes
# the memberships of all event senders in the timeline. This is because we
# may not have sent the memberships in a previous sync.
@@ -1086,6 +1176,99 @@ class SyncHandler:
if e.type != EventTypes.Aliases # until MSC2261 or alternative solution
}
async def _find_missing_partial_state_memberships(
self,
room_id: str,
members_to_fetch: Collection[str],
events_with_membership_auth: Mapping[str, EventBase],
found_state_ids: StateMap[str],
) -> StateMap[str]:
"""Finds missing memberships from a set of auth events and returns them as a
state map.
Args:
room_id: The partial state room to find the remaining memberships for.
members_to_fetch: The memberships to find.
events_with_membership_auth: A mapping from user IDs to events whose auth
events are known to contain their membership.
found_state_ids: A dict from (type, state_key) -> state_event_id, containing
memberships that have been previously found. Entries in
`members_to_fetch` that have a membership in `found_state_ids` are
ignored.
Returns:
A dict from ("m.room.member", state_key) -> state_event_id, containing the
memberships missing from `found_state_ids`.
Raises:
KeyError: if `events_with_membership_auth` does not have an entry for a
missing membership. Memberships in `found_state_ids` do not need an
entry in `events_with_membership_auth`.
"""
additional_state_ids: MutableStateMap[str] = {}
# Tracks the missing members for logging purposes.
missing_members = set()
# Identify memberships missing from `found_state_ids` and pick out the auth
# events in which to look for them.
auth_event_ids: Set[str] = set()
for member in members_to_fetch:
if (EventTypes.Member, member) in found_state_ids:
continue
missing_members.add(member)
event_with_membership_auth = events_with_membership_auth[member]
auth_event_ids.update(event_with_membership_auth.auth_event_ids())
auth_events = await self.store.get_events(auth_event_ids)
# Run through the missing memberships once more, picking out the memberships
# from the pile of auth events we have just fetched.
for member in members_to_fetch:
if (EventTypes.Member, member) in found_state_ids:
continue
event_with_membership_auth = events_with_membership_auth[member]
# Dig through the auth events to find the desired membership.
for auth_event_id in event_with_membership_auth.auth_event_ids():
# We only store events once we have all their auth events,
# so the auth event must be in the pile we have just
# fetched.
auth_event = auth_events[auth_event_id]
if (
auth_event.type == EventTypes.Member
and auth_event.state_key == member
):
missing_members.remove(member)
additional_state_ids[
(EventTypes.Member, member)
] = auth_event.event_id
break
if missing_members:
# There really shouldn't be any missing memberships now. Either:
# * we couldn't find an auth event, which shouldn't happen because we do
# not persist events with persisting their auth events first, or
# * the set of auth events did not contain a membership we wanted, which
# means our caller didn't compute the events in `members_to_fetch`
# correctly, or we somehow accepted an event whose auth events were
# dodgy.
logger.error(
"Failed to find memberships for %s in partial state room "
"%s in the auth events of %s.",
missing_members,
room_id,
[
events_with_membership_auth[member].event_id
for member in missing_members
],
)
return additional_state_ids
async def unread_notifs_for_room_id(
self, room_id: str, sync_config: SyncConfig
) -> NotifCounts:
@@ -1730,7 +1913,11 @@ class SyncHandler:
continue
if room_id in sync_result_builder.joined_room_ids or has_join:
old_state_ids = await self.get_state_at(room_id, since_token)
old_state_ids = await self.get_state_at(
room_id,
since_token,
state_filter=StateFilter.from_types([(EventTypes.Member, user_id)]),
)
old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None)
old_mem_ev = None
if old_mem_ev_id:
@@ -1756,7 +1943,13 @@ class SyncHandler:
newly_left_rooms.append(room_id)
else:
if not old_state_ids:
old_state_ids = await self.get_state_at(room_id, since_token)
old_state_ids = await self.get_state_at(
room_id,
since_token,
state_filter=StateFilter.from_types(
[(EventTypes.Member, user_id)]
),
)
old_mem_ev_id = old_state_ids.get(
(EventTypes.Member, user_id), None
)
@@ -2228,10 +2421,10 @@ class SyncHandler:
joined_room.room_id, joined_room.event_pos.stream
)
)
users_in_room = await self.state.get_current_users_in_room(
user_ids_in_room = await self.state.get_current_user_ids_in_room(
joined_room.room_id, extrems
)
if user_id in users_in_room:
if user_id in user_ids_in_room:
joined_room_ids.add(joined_room.room_id)
return frozenset(joined_room_ids)
+4 -6
View File
@@ -253,12 +253,11 @@ class TypingWriterHandler(FollowerTypingHandler):
self, target_user: UserID, requester: Requester, room_id: str, timeout: int
) -> None:
target_user_id = target_user.to_string()
auth_user_id = requester.user.to_string()
if not self.is_mine_id(target_user_id):
raise SynapseError(400, "User is not hosted on this homeserver")
if target_user_id != auth_user_id:
if target_user != requester.user:
raise AuthError(400, "Cannot set another user's typing state")
if requester.shadow_banned:
@@ -266,7 +265,7 @@ class TypingWriterHandler(FollowerTypingHandler):
await self.clock.sleep(random.randint(1, 10))
raise ShadowBanError()
await self.auth.check_user_in_room(room_id, target_user_id)
await self.auth.check_user_in_room(room_id, requester)
logger.debug("%s has started typing in %s", target_user_id, room_id)
@@ -289,12 +288,11 @@ class TypingWriterHandler(FollowerTypingHandler):
self, target_user: UserID, requester: Requester, room_id: str
) -> None:
target_user_id = target_user.to_string()
auth_user_id = requester.user.to_string()
if not self.is_mine_id(target_user_id):
raise SynapseError(400, "User is not hosted on this homeserver")
if target_user_id != auth_user_id:
if target_user != requester.user:
raise AuthError(400, "Cannot set another user's typing state")
if requester.shadow_banned:
@@ -302,7 +300,7 @@ class TypingWriterHandler(FollowerTypingHandler):
await self.clock.sleep(random.randint(1, 10))
raise ShadowBanError()
await self.auth.check_user_in_room(room_id, target_user_id)
await self.auth.check_user_in_room(room_id, requester)
logger.debug("%s has stopped typing in %s", target_user_id, room_id)
+3 -18
View File
@@ -19,7 +19,6 @@ from twisted.web.client import PartialDownloadError
from synapse.api.constants import LoginType
from synapse.api.errors import Codes, LoginError, SynapseError
from synapse.config.emailconfig import ThreepidBehaviour
from synapse.util import json_decoder
if TYPE_CHECKING:
@@ -153,7 +152,7 @@ class _BaseThreepidAuthChecker:
logger.info("Getting validated threepid. threepidcreds: %r", (threepid_creds,))
# msisdns are currently always ThreepidBehaviour.REMOTE
# msisdns are currently always verified via the IS
if medium == "msisdn":
if not self.hs.config.registration.account_threepid_delegate_msisdn:
raise SynapseError(
@@ -164,18 +163,7 @@ class _BaseThreepidAuthChecker:
threepid_creds,
)
elif medium == "email":
if (
self.hs.config.email.threepid_behaviour_email
== ThreepidBehaviour.REMOTE
):
assert self.hs.config.registration.account_threepid_delegate_email
threepid = await identity_handler.threepid_from_creds(
self.hs.config.registration.account_threepid_delegate_email,
threepid_creds,
)
elif (
self.hs.config.email.threepid_behaviour_email == ThreepidBehaviour.LOCAL
):
if self.hs.config.email.can_verify_email:
threepid = None
row = await self.store.get_threepid_validation_session(
medium,
@@ -227,10 +215,7 @@ class EmailIdentityAuthChecker(UserInteractiveAuthChecker, _BaseThreepidAuthChec
_BaseThreepidAuthChecker.__init__(self, hs)
def is_enabled(self) -> bool:
return self.hs.config.email.threepid_behaviour_email in (
ThreepidBehaviour.REMOTE,
ThreepidBehaviour.LOCAL,
)
return self.hs.config.email.can_verify_email
async def check_auth(self, authdict: dict, clientip: str) -> Any:
return await self._check_threepid("email", authdict)
+1 -1
View File
@@ -226,7 +226,7 @@ class SynapseRequest(Request):
# If this is a request where the target user doesn't match the user who
# authenticated (e.g. and admin is puppetting a user) then we return both.
if self._requester.user.to_string() != authenticated_entity:
if requester != authenticated_entity:
return requester, authenticated_entity
return requester, None
@@ -31,6 +31,5 @@ class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore):
self._push_rules_stream_id_gen.advance(instance_name, token)
for row in rows:
self.get_push_rules_for_user.invalidate((row.user_id,))
self.get_push_rules_enabled_for_user.invalidate((row.user_id,))
self.push_rules_stream_cache.entity_has_changed(row.user_id, token)
return super().process_replication_rows(stream_name, instance_name, token, rows)
+5 -5
View File
@@ -19,7 +19,7 @@ from typing import Iterable, Pattern
from synapse.api.auth import Auth
from synapse.api.errors import AuthError
from synapse.http.site import SynapseRequest
from synapse.types import UserID
from synapse.types import Requester
def admin_patterns(path_regex: str, version: str = "v1") -> Iterable[Pattern]:
@@ -48,19 +48,19 @@ async def assert_requester_is_admin(auth: Auth, request: SynapseRequest) -> None
AuthError if the requester is not a server admin
"""
requester = await auth.get_user_by_req(request)
await assert_user_is_admin(auth, requester.user)
await assert_user_is_admin(auth, requester)
async def assert_user_is_admin(auth: Auth, user_id: UserID) -> None:
async def assert_user_is_admin(auth: Auth, requester: Requester) -> None:
"""Verify that the given user is an admin user
Args:
auth: Auth singleton
user_id: user to check
requester: The user making the request, according to the access token.
Raises:
AuthError if the user is not a server admin
"""
is_admin = await auth.is_server_admin(user_id)
is_admin = await auth.is_server_admin(requester)
if not is_admin:
raise AuthError(HTTPStatus.FORBIDDEN, "You are not a server admin")
+3 -3
View File
@@ -54,7 +54,7 @@ class QuarantineMediaInRoom(RestServlet):
self, request: SynapseRequest, room_id: str
) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)
await assert_user_is_admin(self.auth, requester)
logging.info("Quarantining room: %s", room_id)
@@ -81,7 +81,7 @@ class QuarantineMediaByUser(RestServlet):
self, request: SynapseRequest, user_id: str
) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)
await assert_user_is_admin(self.auth, requester)
logging.info("Quarantining media by user: %s", user_id)
@@ -110,7 +110,7 @@ class QuarantineMediaByID(RestServlet):
self, request: SynapseRequest, server_name: str, media_id: str
) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)
await assert_user_is_admin(self.auth, requester)
logging.info("Quarantining media by ID: %s/%s", server_name, media_id)
+6 -6
View File
@@ -75,7 +75,7 @@ class RoomRestV2Servlet(RestServlet):
) -> Tuple[int, JsonDict]:
requester = await self._auth.get_user_by_req(request)
await assert_user_is_admin(self._auth, requester.user)
await assert_user_is_admin(self._auth, requester)
content = parse_json_object_from_request(request)
@@ -327,7 +327,7 @@ class RoomRestServlet(RestServlet):
pagination_handler: "PaginationHandler",
) -> Tuple[int, JsonDict]:
requester = await auth.get_user_by_req(request)
await assert_user_is_admin(auth, requester.user)
await assert_user_is_admin(auth, requester)
content = parse_json_object_from_request(request)
@@ -461,7 +461,7 @@ class JoinRoomAliasServlet(ResolveRoomIdMixin, RestServlet):
assert request.args is not None
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)
await assert_user_is_admin(self.auth, requester)
content = parse_json_object_from_request(request)
@@ -551,7 +551,7 @@ class MakeRoomAdminRestServlet(ResolveRoomIdMixin, RestServlet):
self, request: SynapseRequest, room_identifier: str
) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)
await assert_user_is_admin(self.auth, requester)
content = parse_json_object_from_request(request, allow_empty_body=True)
room_id, _ = await self.resolve_room_id(room_identifier)
@@ -742,7 +742,7 @@ class RoomEventContextServlet(RestServlet):
self, request: SynapseRequest, room_id: str, event_id: str
) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request, allow_guest=False)
await assert_user_is_admin(self.auth, requester.user)
await assert_user_is_admin(self.auth, requester)
limit = parse_integer(request, "limit", default=10)
@@ -834,7 +834,7 @@ class BlockRoomRestServlet(RestServlet):
self, request: SynapseRequest, room_id: str
) -> Tuple[int, JsonDict]:
requester = await self._auth.get_user_by_req(request)
await assert_user_is_admin(self._auth, requester.user)
await assert_user_is_admin(self._auth, requester)
content = parse_json_object_from_request(request)
+7 -8
View File
@@ -183,7 +183,7 @@ class UserRestServletV2(RestServlet):
self, request: SynapseRequest, user_id: str
) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)
await assert_user_is_admin(self.auth, requester)
target_user = UserID.from_string(user_id)
body = parse_json_object_from_request(request)
@@ -575,10 +575,9 @@ class WhoisRestServlet(RestServlet):
) -> Tuple[int, JsonDict]:
target_user = UserID.from_string(user_id)
requester = await self.auth.get_user_by_req(request)
auth_user = requester.user
if target_user != auth_user:
await assert_user_is_admin(self.auth, auth_user)
if target_user != requester.user:
await assert_user_is_admin(self.auth, requester)
if not self.is_mine(target_user):
raise SynapseError(HTTPStatus.BAD_REQUEST, "Can only whois a local user")
@@ -601,7 +600,7 @@ class DeactivateAccountRestServlet(RestServlet):
self, request: SynapseRequest, target_user_id: str
) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)
await assert_user_is_admin(self.auth, requester)
if not self.is_mine(UserID.from_string(target_user_id)):
raise SynapseError(
@@ -693,7 +692,7 @@ class ResetPasswordRestServlet(RestServlet):
This needs user to have administrator access in Synapse.
"""
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)
await assert_user_is_admin(self.auth, requester)
UserID.from_string(target_user_id)
@@ -807,7 +806,7 @@ class UserAdminServlet(RestServlet):
self, request: SynapseRequest, user_id: str
) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)
await assert_user_is_admin(self.auth, requester)
auth_user = requester.user
target_user = UserID.from_string(user_id)
@@ -921,7 +920,7 @@ class UserTokenRestServlet(RestServlet):
self, request: SynapseRequest, user_id: str
) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)
await assert_user_is_admin(self.auth, requester)
auth_user = requester.user
if not self.is_mine_id(user_id):
+38 -76
View File
@@ -29,7 +29,6 @@ from synapse.api.errors import (
SynapseError,
ThreepidValidationError,
)
from synapse.config.emailconfig import ThreepidBehaviour
from synapse.handlers.ui_auth import UIAuthSessionDataConstants
from synapse.http.server import HttpServer, finish_request, respond_with_html
from synapse.http.servlet import (
@@ -68,7 +67,7 @@ class EmailPasswordRequestTokenRestServlet(RestServlet):
self.config = hs.config
self.identity_handler = hs.get_identity_handler()
if self.config.email.threepid_behaviour_email == ThreepidBehaviour.LOCAL:
if self.config.email.can_verify_email:
self.mailer = Mailer(
hs=self.hs,
app_name=self.config.email.email_app_name,
@@ -77,11 +76,10 @@ class EmailPasswordRequestTokenRestServlet(RestServlet):
)
async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
if self.config.email.threepid_behaviour_email == ThreepidBehaviour.OFF:
if self.config.email.local_threepid_handling_disabled_due_to_email_config:
logger.warning(
"User password resets have been disabled due to lack of email config"
)
if not self.config.email.can_verify_email:
logger.warning(
"User password resets have been disabled due to lack of email config"
)
raise SynapseError(
400, "Email-based password resets have been disabled on this server"
)
@@ -117,35 +115,20 @@ class EmailPasswordRequestTokenRestServlet(RestServlet):
raise SynapseError(400, "Email not found", Codes.THREEPID_NOT_FOUND)
if self.config.email.threepid_behaviour_email == ThreepidBehaviour.REMOTE:
assert self.hs.config.registration.account_threepid_delegate_email
# Have the configured identity server handle the request
ret = await self.identity_handler.request_email_token(
self.hs.config.registration.account_threepid_delegate_email,
body.email,
body.client_secret,
body.send_attempt,
body.next_link,
)
else:
# Send password reset emails from Synapse
sid = await self.identity_handler.send_threepid_validation(
body.email,
body.client_secret,
body.send_attempt,
self.mailer.send_password_reset_mail,
body.next_link,
)
# Wrap the session id in a JSON object
ret = {"sid": sid}
# Send password reset emails from Synapse
sid = await self.identity_handler.send_threepid_validation(
body.email,
body.client_secret,
body.send_attempt,
self.mailer.send_password_reset_mail,
body.next_link,
)
threepid_send_requests.labels(type="email", reason="password_reset").observe(
body.send_attempt
)
return 200, ret
# Wrap the session id in a JSON object
return 200, {"sid": sid}
class PasswordRestServlet(RestServlet):
@@ -196,7 +179,7 @@ class PasswordRestServlet(RestServlet):
params, session_id = await self.auth_handler.validate_user_via_ui_auth(
requester,
request,
body.dict(),
body.dict(exclude_unset=True),
"modify your account password",
)
except InteractiveAuthIncompleteError as e:
@@ -219,7 +202,7 @@ class PasswordRestServlet(RestServlet):
result, params, session_id = await self.auth_handler.check_ui_auth(
[[LoginType.EMAIL_IDENTITY]],
request,
body.dict(),
body.dict(exclude_unset=True),
"modify your account password",
)
except InteractiveAuthIncompleteError as e:
@@ -316,7 +299,7 @@ class DeactivateAccountRestServlet(RestServlet):
await self.auth_handler.validate_user_via_ui_auth(
requester,
request,
body.dict(),
body.dict(exclude_unset=True),
"deactivate your account",
)
result = await self._deactivate_account_handler.deactivate_account(
@@ -340,7 +323,7 @@ class EmailThreepidRequestTokenRestServlet(RestServlet):
self.identity_handler = hs.get_identity_handler()
self.store = self.hs.get_datastores().main
if self.config.email.threepid_behaviour_email == ThreepidBehaviour.LOCAL:
if self.config.email.can_verify_email:
self.mailer = Mailer(
hs=self.hs,
app_name=self.config.email.email_app_name,
@@ -349,11 +332,10 @@ class EmailThreepidRequestTokenRestServlet(RestServlet):
)
async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
if self.config.email.threepid_behaviour_email == ThreepidBehaviour.OFF:
if self.config.email.local_threepid_handling_disabled_due_to_email_config:
logger.warning(
"Adding emails have been disabled due to lack of an email config"
)
if not self.config.email.can_verify_email:
logger.warning(
"Adding emails have been disabled due to lack of an email config"
)
raise SynapseError(
400,
"Adding an email to your account is disabled on this server",
@@ -391,35 +373,21 @@ class EmailThreepidRequestTokenRestServlet(RestServlet):
raise SynapseError(400, "Email is already in use", Codes.THREEPID_IN_USE)
if self.config.email.threepid_behaviour_email == ThreepidBehaviour.REMOTE:
assert self.hs.config.registration.account_threepid_delegate_email
# Have the configured identity server handle the request
ret = await self.identity_handler.request_email_token(
self.hs.config.registration.account_threepid_delegate_email,
body.email,
body.client_secret,
body.send_attempt,
body.next_link,
)
else:
# Send threepid validation emails from Synapse
sid = await self.identity_handler.send_threepid_validation(
body.email,
body.client_secret,
body.send_attempt,
self.mailer.send_add_threepid_mail,
body.next_link,
)
# Wrap the session id in a JSON object
ret = {"sid": sid}
# Send threepid validation emails from Synapse
sid = await self.identity_handler.send_threepid_validation(
body.email,
body.client_secret,
body.send_attempt,
self.mailer.send_add_threepid_mail,
body.next_link,
)
threepid_send_requests.labels(type="email", reason="add_threepid").observe(
body.send_attempt
)
return 200, ret
# Wrap the session id in a JSON object
return 200, {"sid": sid}
class MsisdnThreepidRequestTokenRestServlet(RestServlet):
@@ -512,25 +480,19 @@ class AddThreepidEmailSubmitTokenServlet(RestServlet):
self.config = hs.config
self.clock = hs.get_clock()
self.store = hs.get_datastores().main
if self.config.email.threepid_behaviour_email == ThreepidBehaviour.LOCAL:
if self.config.email.can_verify_email:
self._failure_email_template = (
self.config.email.email_add_threepid_template_failure_html
)
async def on_GET(self, request: Request) -> None:
if self.config.email.threepid_behaviour_email == ThreepidBehaviour.OFF:
if self.config.email.local_threepid_handling_disabled_due_to_email_config:
logger.warning(
"Adding emails have been disabled due to lack of an email config"
)
if not self.config.email.can_verify_email:
logger.warning(
"Adding emails have been disabled due to lack of an email config"
)
raise SynapseError(
400, "Adding an email to your account is disabled on this server"
)
elif self.config.email.threepid_behaviour_email == ThreepidBehaviour.REMOTE:
raise SynapseError(
400,
"This homeserver is not validating threepids.",
)
sid = parse_string(request, "sid", required=True)
token = parse_string(request, "token", required=True)
+27
View File
@@ -42,12 +42,26 @@ class DevicesRestServlet(RestServlet):
self.hs = hs
self.auth = hs.get_auth()
self.device_handler = hs.get_device_handler()
self._msc3852_enabled = hs.config.experimental.msc3852_enabled
async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request, allow_guest=True)
devices = await self.device_handler.get_devices_by_user(
requester.user.to_string()
)
# If MSC3852 is disabled, then the "last_seen_user_agent" field will be
# removed from each device. If it is enabled, then the field name will
# be replaced by the unstable identifier.
#
# When MSC3852 is accepted, this block of code can just be removed to
# expose "last_seen_user_agent" to clients.
for device in devices:
last_seen_user_agent = device["last_seen_user_agent"]
del device["last_seen_user_agent"]
if self._msc3852_enabled:
device["org.matrix.msc3852.last_seen_user_agent"] = last_seen_user_agent
return 200, {"devices": devices}
@@ -108,6 +122,7 @@ class DeviceRestServlet(RestServlet):
self.auth = hs.get_auth()
self.device_handler = hs.get_device_handler()
self.auth_handler = hs.get_auth_handler()
self._msc3852_enabled = hs.config.experimental.msc3852_enabled
async def on_GET(
self, request: SynapseRequest, device_id: str
@@ -118,6 +133,18 @@ class DeviceRestServlet(RestServlet):
)
if device is None:
raise NotFoundError("No device found")
# If MSC3852 is disabled, then the "last_seen_user_agent" field will be
# removed from each device. If it is enabled, then the field name will
# be replaced by the unstable identifier.
#
# When MSC3852 is accepted, this block of code can just be removed to
# expose "last_seen_user_agent" to clients.
last_seen_user_agent = device["last_seen_user_agent"]
del device["last_seen_user_agent"]
if self._msc3852_enabled:
device["org.matrix.msc3852.last_seen_user_agent"] = last_seen_user_agent
return 200, device
@interactive_auth_handler
+1 -2
View File
@@ -26,7 +26,7 @@ from synapse.http.servlet import (
parse_string,
)
from synapse.http.site import SynapseRequest
from synapse.logging.opentracing import log_kv, set_tag, trace_with_opname
from synapse.logging.opentracing import log_kv, set_tag
from synapse.types import JsonDict, StreamToken
from ._base import client_patterns, interactive_auth_handler
@@ -71,7 +71,6 @@ class KeyUploadServlet(RestServlet):
self.e2e_keys_handler = hs.get_e2e_keys_handler()
self.device_handler = hs.get_device_handler()
@trace_with_opname("upload_keys")
async def on_POST(
self, request: SynapseRequest, device_id: Optional[str]
) -> Tuple[int, JsonDict]:
+2 -2
View File
@@ -66,7 +66,7 @@ class ProfileDisplaynameRestServlet(RestServlet):
) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request, allow_guest=True)
user = UserID.from_string(user_id)
is_admin = await self.auth.is_server_admin(requester.user)
is_admin = await self.auth.is_server_admin(requester)
content = parse_json_object_from_request(request)
@@ -123,7 +123,7 @@ class ProfileAvatarURLRestServlet(RestServlet):
) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
user = UserID.from_string(user_id)
is_admin = await self.auth.is_server_admin(requester.user)
is_admin = await self.auth.is_server_admin(requester)
content = parse_json_object_from_request(request)
try:
+20 -42
View File
@@ -31,7 +31,6 @@ from synapse.api.errors import (
)
from synapse.api.ratelimiting import Ratelimiter
from synapse.config import ConfigError
from synapse.config.emailconfig import ThreepidBehaviour
from synapse.config.homeserver import HomeServerConfig
from synapse.config.ratelimiting import FederationRatelimitSettings
from synapse.config.server import is_threepid_reserved
@@ -74,7 +73,7 @@ class EmailRegisterRequestTokenRestServlet(RestServlet):
self.identity_handler = hs.get_identity_handler()
self.config = hs.config
if self.hs.config.email.threepid_behaviour_email == ThreepidBehaviour.LOCAL:
if self.hs.config.email.can_verify_email:
self.mailer = Mailer(
hs=self.hs,
app_name=self.config.email.email_app_name,
@@ -83,13 +82,10 @@ class EmailRegisterRequestTokenRestServlet(RestServlet):
)
async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
if self.hs.config.email.threepid_behaviour_email == ThreepidBehaviour.OFF:
if (
self.hs.config.email.local_threepid_handling_disabled_due_to_email_config
):
logger.warning(
"Email registration has been disabled due to lack of email config"
)
if not self.hs.config.email.can_verify_email:
logger.warning(
"Email registration has been disabled due to lack of email config"
)
raise SynapseError(
400, "Email-based registration has been disabled on this server"
)
@@ -138,35 +134,21 @@ class EmailRegisterRequestTokenRestServlet(RestServlet):
raise SynapseError(400, "Email is already in use", Codes.THREEPID_IN_USE)
if self.config.email.threepid_behaviour_email == ThreepidBehaviour.REMOTE:
assert self.hs.config.registration.account_threepid_delegate_email
# Have the configured identity server handle the request
ret = await self.identity_handler.request_email_token(
self.hs.config.registration.account_threepid_delegate_email,
email,
client_secret,
send_attempt,
next_link,
)
else:
# Send registration emails from Synapse,
# wrapping the session id in a JSON object.
ret = {
"sid": await self.identity_handler.send_threepid_validation(
email,
client_secret,
send_attempt,
self.mailer.send_registration_mail,
next_link,
)
}
# Send registration emails from Synapse
sid = await self.identity_handler.send_threepid_validation(
email,
client_secret,
send_attempt,
self.mailer.send_registration_mail,
next_link,
)
threepid_send_requests.labels(type="email", reason="register").observe(
send_attempt
)
return 200, ret
# Wrap the session id in a JSON object
return 200, {"sid": sid}
class MsisdnRegisterRequestTokenRestServlet(RestServlet):
@@ -260,7 +242,7 @@ class RegistrationSubmitTokenServlet(RestServlet):
self.clock = hs.get_clock()
self.store = hs.get_datastores().main
if self.config.email.threepid_behaviour_email == ThreepidBehaviour.LOCAL:
if self.config.email.can_verify_email:
self._failure_email_template = (
self.config.email.email_registration_template_failure_html
)
@@ -270,11 +252,10 @@ class RegistrationSubmitTokenServlet(RestServlet):
raise SynapseError(
400, "This medium is currently not supported for registration"
)
if self.config.email.threepid_behaviour_email == ThreepidBehaviour.OFF:
if self.config.email.local_threepid_handling_disabled_due_to_email_config:
logger.warning(
"User registration via email has been disabled due to lack of email config"
)
if not self.config.email.can_verify_email:
logger.warning(
"User registration via email has been disabled due to lack of email config"
)
raise SynapseError(
400, "Email-based registration is disabled on this server"
)
@@ -484,9 +465,6 @@ class RegisterRestServlet(RestServlet):
"Appservice token must be provided when using a type of m.login.application_service",
)
# Verify the AS
self.auth.get_appservice_by_req(request)
# Set the desired user according to the AS API (which uses the
# 'user' key not 'username'). Since this is a new addition, we'll
# fallback to 'username' if they gave one.
+62 -10
View File
@@ -16,6 +16,7 @@
""" This module contains REST servlets to do with rooms: /rooms/<paths> """
import logging
import re
from enum import Enum
from typing import TYPE_CHECKING, Awaitable, Dict, List, Optional, Tuple
from urllib import parse as urlparse
@@ -48,6 +49,7 @@ from synapse.http.servlet import (
parse_strings_from_args,
)
from synapse.http.site import SynapseRequest
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentracing import set_tag
from synapse.rest.client._base import client_patterns
from synapse.rest.client.transactions import HttpTransactionCache
@@ -62,6 +64,33 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
class _RoomSize(Enum):
"""
Enum to differentiate sizes of rooms. This is a pretty good approximation
about how hard it will be to get events in the room. We could also look at
room "complexity".
"""
# This doesn't necessarily mean the room is a DM, just that there is a DM
# amount of people there.
DM_SIZE = "direct_message_size"
SMALL = "small"
SUBSTANTIAL = "substantial"
LARGE = "large"
@staticmethod
def from_member_count(member_count: int) -> "_RoomSize":
if member_count <= 2:
return _RoomSize.DM_SIZE
elif member_count < 100:
return _RoomSize.SMALL
elif member_count < 1000:
return _RoomSize.SUBSTANTIAL
else:
return _RoomSize.LARGE
# This is an extra metric on top of `synapse_http_server_response_time_seconds`
# which times the same sort of thing but this one allows us to see values
# greater than 10s. We use a separate dedicated histogram with its own buckets
@@ -70,7 +99,11 @@ logger = logging.getLogger(__name__)
messsages_response_timer = Histogram(
"synapse_room_message_list_rest_servlet_response_time_seconds",
"sec",
[],
# We have a label for room size so we can try to see a more realistic
# picture of /messages response time for bigger rooms. We don't want the
# tiny rooms that can always respond fast skewing our results when we're trying
# to optimize the bigger cases.
["room_size"],
buckets=(
0.005,
0.01,
@@ -83,9 +116,13 @@ messsages_response_timer = Histogram(
2.5,
5.0,
10.0,
20.0,
30.0,
60.0,
80.0,
100.0,
120.0,
150.0,
180.0,
"+Inf",
),
@@ -196,7 +233,7 @@ class RoomStateEventRestServlet(TransactionRestServlet):
msg_handler = self.message_handler
data = await msg_handler.get_room_data(
user_id=requester.user.to_string(),
requester=requester,
room_id=room_id,
event_type=event_type,
state_key=state_key,
@@ -541,7 +578,7 @@ class RoomMemberListRestServlet(RestServlet):
events = await handler.get_state_events(
room_id=room_id,
user_id=requester.user.to_string(),
requester=requester,
at_token=at_token,
state_filter=StateFilter.from_types([(EventTypes.Member, None)]),
)
@@ -587,14 +624,26 @@ class RoomMessageListRestServlet(RestServlet):
def __init__(self, hs: "HomeServer"):
super().__init__()
self._hs = hs
self.clock = hs.get_clock()
self.pagination_handler = hs.get_pagination_handler()
self.auth = hs.get_auth()
self.store = hs.get_datastores().main
@messsages_response_timer.time()
async def on_GET(
self, request: SynapseRequest, room_id: str
) -> Tuple[int, JsonDict]:
processing_start_time = self.clock.time_msec()
# Fire off and hope that we get a result by the end.
#
# We're using the mypy type ignore comment because the `@cached`
# decorator on `get_number_joined_users_in_room` doesn't play well with
# the type system. Maybe in the future, it can use some ParamSpec
# wizardry to fix it up.
room_member_count_deferred = run_in_background( # type: ignore[call-arg]
self.store.get_number_joined_users_in_room,
room_id, # type: ignore[arg-type]
)
requester = await self.auth.get_user_by_req(request, allow_guest=True)
pagination_config = await PaginationConfig.from_request(
self.store, request, default_limit=10
@@ -625,6 +674,12 @@ class RoomMessageListRestServlet(RestServlet):
event_filter=event_filter,
)
processing_end_time = self.clock.time_msec()
room_member_count = await make_deferred_yieldable(room_member_count_deferred)
messsages_response_timer.labels(
room_size=_RoomSize.from_member_count(room_member_count)
).observe((processing_end_time - processing_start_time) / 1000)
return 200, msgs
@@ -645,8 +700,7 @@ class RoomStateRestServlet(RestServlet):
# Get all the current state for this room
events = await self.message_handler.get_state_events(
room_id=room_id,
user_id=requester.user.to_string(),
is_guest=requester.is_guest,
requester=requester,
)
return 200, events
@@ -704,7 +758,7 @@ class RoomEventServlet(RestServlet):
== "true"
)
if include_unredacted_content and not await self.auth.is_server_admin(
requester.user
requester
):
power_level_event = (
await self._storage_controllers.state.get_current_state_event(
@@ -1209,9 +1263,7 @@ class TimestampLookupRestServlet(RestServlet):
self, request: SynapseRequest, room_id: str
) -> Tuple[int, JsonDict]:
requester = await self._auth.get_user_by_req(request)
await self._auth.check_user_in_room_or_world_readable(
room_id, requester.user.to_string()
)
await self._auth.check_user_in_room_or_world_readable(room_id, requester)
timestamp = parse_integer(request, "ts", required=True)
direction = parse_string(request, "dir", default="f", allowed_values=["f", "b"])
+1 -2
View File
@@ -19,7 +19,7 @@ from synapse.http import servlet
from synapse.http.server import HttpServer
from synapse.http.servlet import assert_params_in_dict, parse_json_object_from_request
from synapse.http.site import SynapseRequest
from synapse.logging.opentracing import set_tag, trace_with_opname
from synapse.logging.opentracing import set_tag
from synapse.rest.client.transactions import HttpTransactionCache
from synapse.types import JsonDict
@@ -43,7 +43,6 @@ class SendToDeviceRestServlet(servlet.RestServlet):
self.txns = HttpTransactionCache(hs)
self.device_message_handler = hs.get_device_message_handler()
@trace_with_opname("sendToDevice")
def on_PUT(
self, request: SynapseRequest, message_type: str, txn_id: str
) -> Awaitable[Tuple[int, JsonDict]]:
@@ -17,7 +17,6 @@ from typing import TYPE_CHECKING, Tuple
from twisted.web.server import Request
from synapse.api.errors import ThreepidValidationError
from synapse.config.emailconfig import ThreepidBehaviour
from synapse.http.server import DirectServeHtmlResource
from synapse.http.servlet import parse_string
from synapse.util.stringutils import assert_valid_client_secret
@@ -46,9 +45,6 @@ class PasswordResetSubmitTokenResource(DirectServeHtmlResource):
self.clock = hs.get_clock()
self.store = hs.get_datastores().main
self._local_threepid_handling_disabled_due_to_email_config = (
hs.config.email.local_threepid_handling_disabled_due_to_email_config
)
self._confirmation_email_template = (
hs.config.email.email_password_reset_template_confirmation_html
)
@@ -59,8 +55,8 @@ class PasswordResetSubmitTokenResource(DirectServeHtmlResource):
hs.config.email.email_password_reset_template_failure_html
)
# This resource should not be mounted if threepid behaviour is not LOCAL
assert hs.config.email.threepid_behaviour_email == ThreepidBehaviour.LOCAL
# This resource should only be mounted if email validation is enabled
assert hs.config.email.can_verify_email
async def _async_render_GET(self, request: Request) -> Tuple[int, bytes]:
sid = parse_string(request, "sid", required=True)
@@ -244,7 +244,7 @@ class ServerNoticesManager:
assert self.server_notices_mxid is not None
notice_user_data_in_room = await self._message_handler.get_room_data(
self.server_notices_mxid,
create_requester(self.server_notices_mxid),
room_id,
EventTypes.Member,
self.server_notices_mxid,
+6 -7
View File
@@ -44,7 +44,6 @@ from synapse.logging.context import ContextResourceUsage
from synapse.replication.http.state import ReplicationUpdateCurrentStateRestServlet
from synapse.state import v1, v2
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.roommember import ProfileInfo
from synapse.storage.state import StateFilter
from synapse.types import StateMap
from synapse.util.async_helpers import Linearizer
@@ -210,11 +209,11 @@ class StateHandler:
ret = await self.resolve_state_groups_for_events(room_id, event_ids)
return await ret.get_state(self._state_storage_controller, state_filter)
async def get_current_users_in_room(
async def get_current_user_ids_in_room(
self, room_id: str, latest_event_ids: List[str]
) -> Dict[str, ProfileInfo]:
) -> Set[str]:
"""
Get the users who are currently in a room.
Get the users IDs who are currently in a room.
Note: This is much slower than using the equivalent method
`DataStore.get_users_in_room` or `DataStore.get_users_in_room_with_profiles`,
@@ -225,15 +224,15 @@ class StateHandler:
room_id: The ID of the room.
latest_event_ids: Precomputed list of latest event IDs. Will be computed if None.
Returns:
Dictionary of user IDs to their profileinfo.
Set of user IDs in the room.
"""
assert latest_event_ids is not None
logger.debug("calling resolve_state_groups from get_current_users_in_room")
logger.debug("calling resolve_state_groups from get_current_user_ids_in_room")
entry = await self.resolve_state_groups_for_events(room_id, latest_event_ids)
state = await entry.get_state(self._state_storage_controller, StateFilter.all())
return await self.store.get_joined_users_from_state(room_id, state, entry)
return await self.store.get_joined_user_ids_from_state(room_id, state, entry)
async def get_hosts_in_room_at_events(
self, room_id: str, event_ids: Collection[str]
+20 -4
View File
@@ -234,6 +234,7 @@ class StateStorageController:
self,
event_ids: Collection[str],
state_filter: Optional[StateFilter] = None,
await_full_state: bool = True,
) -> Dict[str, StateMap[str]]:
"""
Get the state dicts corresponding to a list of events, containing the event_ids
@@ -242,6 +243,9 @@ class StateStorageController:
Args:
event_ids: events whose state should be returned
state_filter: The state filter used to fetch state from the database.
await_full_state: if `True`, will block if we do not yet have complete state
at these events and `state_filter` is not satisfied by partial state.
Defaults to `True`.
Returns:
A dict from event_id -> (type, state_key) -> event_id
@@ -250,8 +254,12 @@ class StateStorageController:
RuntimeError if we don't have a state group for one or more of the events
(ie they are outliers or unknown)
"""
await_full_state = True
if state_filter and not state_filter.must_await_full_state(self._is_mine_id):
if (
await_full_state
and state_filter
and not state_filter.must_await_full_state(self._is_mine_id)
):
# Full state is not required if the state filter is restrictive enough.
await_full_state = False
event_to_groups = await self.get_state_group_for_events(
@@ -294,7 +302,10 @@ class StateStorageController:
@trace
async def get_state_ids_for_event(
self, event_id: str, state_filter: Optional[StateFilter] = None
self,
event_id: str,
state_filter: Optional[StateFilter] = None,
await_full_state: bool = True,
) -> StateMap[str]:
"""
Get the state dict corresponding to a particular event
@@ -302,6 +313,9 @@ class StateStorageController:
Args:
event_id: event whose state should be returned
state_filter: The state filter used to fetch state from the database.
await_full_state: if `True`, will block if we do not yet have complete state
at the event and `state_filter` is not satisfied by partial state.
Defaults to `True`.
Returns:
A dict from (type, state_key) -> state_event_id
@@ -311,7 +325,9 @@ class StateStorageController:
outlier or is unknown)
"""
state_map = await self.get_state_ids_for_events(
[event_id], state_filter or StateFilter.all()
[event_id],
state_filter or StateFilter.all(),
await_full_state=await_full_state,
)
return state_map[event_id]
@@ -650,9 +650,6 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
txn, self.get_account_data_for_room, (user_id,)
)
self._invalidate_cache_and_stream(txn, self.get_push_rules_for_user, (user_id,))
self._invalidate_cache_and_stream(
txn, self.get_push_rules_enabled_for_user, (user_id,)
)
# This user might be contained in the ignored_by cache for other users,
# so we have to invalidate it all.
self._invalidate_all_cache_and_stream(txn, self.ignored_by)
+1 -5
View File
@@ -165,7 +165,6 @@ class PushRulesWorkerStore(
return _load_rules(rows, enabled_map, self.hs.config.experimental)
@cached(max_entries=5000)
async def get_push_rules_enabled_for_user(self, user_id: str) -> Dict[str, bool]:
results = await self.db_pool.simple_select_list(
table="push_rules_enable",
@@ -229,9 +228,6 @@ class PushRulesWorkerStore(
return results
@cachedList(
cached_method_name="get_push_rules_enabled_for_user", list_name="user_ids"
)
async def bulk_get_push_rules_enabled(
self, user_ids: Collection[str]
) -> Dict[str, Dict[str, bool]]:
@@ -246,6 +242,7 @@ class PushRulesWorkerStore(
iterable=user_ids,
retcols=("user_name", "rule_id", "enabled"),
desc="bulk_get_push_rules_enabled",
batch_size=1000,
)
for row in rows:
enabled = bool(row["enabled"])
@@ -792,7 +789,6 @@ class PushRuleStore(PushRulesWorkerStore):
self.db_pool.simple_insert_txn(txn, "push_rules_stream", values=values)
txn.call_after(self.get_push_rules_for_user.invalidate, (user_id,))
txn.call_after(self.get_push_rules_enabled_for_user.invalidate, (user_id,))
txn.call_after(
self.push_rules_stream_cache.entity_has_changed, user_id, stream_id
)
@@ -69,9 +69,9 @@ class TokenLookupResult:
"""
user_id: str
token_id: int
is_guest: bool = False
shadow_banned: bool = False
token_id: Optional[int] = None
device_id: Optional[str] = None
valid_until_ms: Optional[int] = None
token_owner: str = attr.ib()
+33 -40
View File
@@ -835,9 +835,9 @@ class RoomMemberWorkerStore(EventsWorkerStore):
return shared_room_ids or frozenset()
async def get_joined_users_from_state(
async def get_joined_user_ids_from_state(
self, room_id: str, state: StateMap[str], state_entry: "_StateCacheEntry"
) -> Dict[str, ProfileInfo]:
) -> Set[str]:
state_group: Union[object, int] = state_entry.state_group
if not state_group:
# If state_group is None it means it has yet to be assigned a
@@ -848,25 +848,25 @@ class RoomMemberWorkerStore(EventsWorkerStore):
assert state_group is not None
with Measure(self._clock, "get_joined_users_from_state"):
return await self._get_joined_users_from_context(
return await self._get_joined_user_ids_from_context(
room_id, state_group, state, context=state_entry
)
@cached(num_args=2, iterable=True, max_entries=100000)
async def _get_joined_users_from_context(
async def _get_joined_user_ids_from_context(
self,
room_id: str,
state_group: Union[object, int],
current_state_ids: StateMap[str],
event: Optional[EventBase] = None,
context: Optional["_StateCacheEntry"] = None,
) -> Dict[str, ProfileInfo]:
) -> Set[str]:
# We don't use `state_group`, it's there so that we can cache based
# on it. However, it's important that it's never None, since two current_states
# with a state_group of None are likely to be different.
assert state_group is not None
users_in_room = {}
users_in_room = set()
member_event_ids = [
e_id
for key, e_id in current_state_ids.items()
@@ -879,11 +879,11 @@ class RoomMemberWorkerStore(EventsWorkerStore):
# If we do then we can reuse that result and simply update it with
# any membership changes in `delta_ids`
if context.prev_group and context.delta_ids:
prev_res = self._get_joined_users_from_context.cache.get_immediate(
prev_res = self._get_joined_user_ids_from_context.cache.get_immediate(
(room_id, context.prev_group), None
)
if prev_res and isinstance(prev_res, dict):
users_in_room = dict(prev_res)
if prev_res and isinstance(prev_res, set):
users_in_room = prev_res
member_event_ids = [
e_id
for key, e_id in context.delta_ids.items()
@@ -891,7 +891,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
]
for etype, state_key in context.delta_ids:
if etype == EventTypes.Member:
users_in_room.pop(state_key, None)
users_in_room.discard(state_key)
# We check if we have any of the member event ids in the event cache
# before we ask the DB
@@ -908,71 +908,64 @@ class RoomMemberWorkerStore(EventsWorkerStore):
ev_entry = event_map.get(event_id)
if ev_entry and not ev_entry.event.rejected_reason:
if ev_entry.event.membership == Membership.JOIN:
users_in_room[ev_entry.event.state_key] = ProfileInfo(
display_name=ev_entry.event.content.get("displayname", None),
avatar_url=ev_entry.event.content.get("avatar_url", None),
)
users_in_room.add(ev_entry.event.state_key)
else:
missing_member_event_ids.append(event_id)
if missing_member_event_ids:
event_to_memberships = await self._get_joined_profiles_from_event_ids(
event_to_memberships = await self._get_user_ids_from_membership_event_ids(
missing_member_event_ids
)
users_in_room.update(row for row in event_to_memberships.values() if row)
users_in_room.update(
user_id for user_id in event_to_memberships.values() if user_id
)
if event is not None and event.type == EventTypes.Member:
if event.membership == Membership.JOIN:
if event.event_id in member_event_ids:
users_in_room[event.state_key] = ProfileInfo(
display_name=event.content.get("displayname", None),
avatar_url=event.content.get("avatar_url", None),
)
users_in_room.add(event.state_key)
return users_in_room
@cached(max_entries=10000)
def _get_joined_profile_from_event_id(
@cached(
max_entries=10000,
# This name matches the old function that has been replaced - the cache name
# is kept here to maintain backwards compatibility.
name="_get_joined_profile_from_event_id",
)
def _get_user_id_from_membership_event_id(
self, event_id: str
) -> Optional[Tuple[str, ProfileInfo]]:
raise NotImplementedError()
@cachedList(
cached_method_name="_get_joined_profile_from_event_id",
cached_method_name="_get_user_id_from_membership_event_id",
list_name="event_ids",
)
async def _get_joined_profiles_from_event_ids(
async def _get_user_ids_from_membership_event_ids(
self, event_ids: Iterable[str]
) -> Dict[str, Optional[Tuple[str, ProfileInfo]]]:
) -> Dict[str, Optional[str]]:
"""For given set of member event_ids check if they point to a join
event and if so return the associated user and profile info.
event.
Args:
event_ids: The member event IDs to lookup
Returns:
Map from event ID to `user_id` and ProfileInfo (or None if not join event).
Map from event ID to `user_id`, or None if event is not a join.
"""
rows = await self.db_pool.simple_select_many_batch(
table="room_memberships",
column="event_id",
iterable=event_ids,
retcols=("user_id", "display_name", "avatar_url", "event_id"),
retcols=("user_id", "event_id"),
keyvalues={"membership": Membership.JOIN},
batch_size=1000,
desc="_get_joined_profiles_from_event_ids",
desc="_get_user_ids_from_membership_event_ids",
)
return {
row["event_id"]: (
row["user_id"],
ProfileInfo(
avatar_url=row["avatar_url"], display_name=row["display_name"]
),
)
for row in rows
}
return {row["event_id"]: row["user_id"] for row in rows}
@cached(max_entries=10000)
async def is_host_joined(self, room_id: str, host: str) -> bool:
@@ -1131,12 +1124,12 @@ class RoomMemberWorkerStore(EventsWorkerStore):
else:
# The cache doesn't match the state group or prev state group,
# so we calculate the result from first principles.
joined_users = await self.get_joined_users_from_state(
joined_user_ids = await self.get_joined_user_ids_from_state(
room_id, state, state_entry
)
cache.hosts_to_joined_users = {}
for user_id in joined_users:
for user_id in joined_user_ids:
host = intern_string(get_domain_from_id(user_id))
cache.hosts_to_joined_users.setdefault(host, set()).add(user_id)
+255 -91
View File
@@ -14,15 +14,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import enum
import threading
from typing import (
Callable,
Collection,
Dict,
Generic,
Iterable,
MutableMapping,
Optional,
Set,
Sized,
Tuple,
TypeVar,
Union,
cast,
@@ -31,7 +35,6 @@ from typing import (
from prometheus_client import Gauge
from twisted.internet import defer
from twisted.python import failure
from twisted.python.failure import Failure
from synapse.util.async_helpers import ObservableDeferred
@@ -94,7 +97,7 @@ class DeferredCache(Generic[KT, VT]):
# _pending_deferred_cache maps from the key value to a `CacheEntry` object.
self._pending_deferred_cache: Union[
TreeCache, "MutableMapping[KT, CacheEntry]"
TreeCache, "MutableMapping[KT, CacheEntry[KT, VT]]"
] = cache_type()
def metrics_cb() -> None:
@@ -159,15 +162,16 @@ class DeferredCache(Generic[KT, VT]):
Raises:
KeyError if the key is not found in the cache
"""
callbacks = [callback] if callback else []
val = self._pending_deferred_cache.get(key, _Sentinel.sentinel)
if val is not _Sentinel.sentinel:
val.callbacks.update(callbacks)
val.add_invalidation_callback(key, callback)
if update_metrics:
m = self.cache.metrics
assert m # we always have a name, so should always have metrics
m.inc_hits()
return val.deferred.observe()
return val.deferred(key)
callbacks = (callback,) if callback else ()
val2 = self.cache.get(
key, _Sentinel.sentinel, callbacks=callbacks, update_metrics=update_metrics
@@ -177,6 +181,73 @@ class DeferredCache(Generic[KT, VT]):
else:
return defer.succeed(val2)
def get_bulk(
self,
keys: Collection[KT],
callback: Optional[Callable[[], None]] = None,
) -> Tuple[Dict[KT, VT], Optional["defer.Deferred[Dict[KT, VT]]"], Collection[KT]]:
"""Bulk lookup of items in the cache.
Returns:
A 3-tuple of:
1. a dict of key/value of items already cached;
2. a deferred that resolves to a dict of key/value of items
we're already fetching; and
3. a collection of keys that don't appear in the previous two.
"""
# The cached results
cached = {}
# List of pending deferreds
pending = []
# Dict that gets filled out when the pending deferreds complete
pending_results = {}
# List of keys that aren't in either cache
missing = []
callbacks = (callback,) if callback else ()
for key in keys:
# Check if its in the main cache.
immediate_value = self.cache.get(
key,
_Sentinel.sentinel,
callbacks=callbacks,
)
if immediate_value is not _Sentinel.sentinel:
cached[key] = immediate_value
continue
# Check if its in the pending cache
pending_value = self._pending_deferred_cache.get(key, _Sentinel.sentinel)
if pending_value is not _Sentinel.sentinel:
pending_value.add_invalidation_callback(key, callback)
def completed_cb(value: VT, key: KT) -> VT:
pending_results[key] = value
return value
# Add a callback to fill out `pending_results` when that completes
d = pending_value.deferred(key).addCallback(completed_cb, key)
pending.append(d)
continue
# Not in either cache
missing.append(key)
# If we've got pending deferreds, squash them into a single one that
# returns `pending_results`.
pending_deferred = None
if pending:
pending_deferred = defer.gatherResults(
pending, consumeErrors=True
).addCallback(lambda _: pending_results)
return (cached, pending_deferred, missing)
def get_immediate(
self, key: KT, default: T, update_metrics: bool = True
) -> Union[VT, T]:
@@ -218,84 +289,89 @@ class DeferredCache(Generic[KT, VT]):
value: a deferred which will complete with a result to add to the cache
callback: An optional callback to be called when the entry is invalidated
"""
if not isinstance(value, defer.Deferred):
raise TypeError("not a Deferred")
callbacks = [callback] if callback else []
self.check_thread()
existing_entry = self._pending_deferred_cache.pop(key, None)
if existing_entry:
existing_entry.invalidate()
self._pending_deferred_cache.pop(key, None)
# XXX: why don't we invalidate the entry in `self.cache` yet?
# we can save a whole load of effort if the deferred is ready.
if value.called:
result = value.result
if not isinstance(result, failure.Failure):
self.cache.set(key, cast(VT, result), callbacks)
return value
# otherwise, we'll add an entry to the _pending_deferred_cache for now,
# and add callbacks to add it to the cache properly later.
observable = ObservableDeferred(value, consumeErrors=True)
observer = observable.observe()
entry = CacheEntry(deferred=observable, callbacks=callbacks)
entry = CacheEntrySingle[KT, VT](value)
entry.add_invalidation_callback(key, callback)
self._pending_deferred_cache[key] = entry
def compare_and_pop() -> bool:
"""Check if our entry is still the one in _pending_deferred_cache, and
if so, pop it.
Returns true if the entries matched.
"""
existing_entry = self._pending_deferred_cache.pop(key, None)
if existing_entry is entry:
return True
# oops, the _pending_deferred_cache has been updated since
# we started our query, so we are out of date.
#
# Better put back whatever we took out. (We do it this way
# round, rather than peeking into the _pending_deferred_cache
# and then removing on a match, to make the common case faster)
if existing_entry is not None:
self._pending_deferred_cache[key] = existing_entry
return False
def cb(result: VT) -> None:
if compare_and_pop():
self.cache.set(key, result, entry.callbacks)
else:
# we're not going to put this entry into the cache, so need
# to make sure that the invalidation callbacks are called.
# That was probably done when _pending_deferred_cache was
# updated, but it's possible that `set` was called without
# `invalidate` being previously called, in which case it may
# not have been. Either way, let's double-check now.
entry.invalidate()
def eb(_fail: Failure) -> None:
compare_and_pop()
entry.invalidate()
# once the deferred completes, we can move the entry from the
# _pending_deferred_cache to the real cache.
#
observer.addCallbacks(cb, eb)
deferred = entry.deferred(key).addCallbacks(
self._completed_callback,
self._error_callback,
callbackArgs=(entry, key),
errbackArgs=(entry, key),
)
# we return a new Deferred which will be called before any subsequent observers.
return observable.observe()
return deferred
def start_bulk_input(
self,
keys: Collection[KT],
callback: Optional[Callable[[], None]] = None,
) -> "CacheMultipleEntries[KT, VT]":
"""Bulk set API for use when fetching multiple keys at once from the DB.
Called *before* starting the fetch from the DB, and the caller *must*
call either `complete_bulk(..)` or `error_bulk(..)` on the return value.
"""
entry = CacheMultipleEntries[KT, VT]()
entry.add_global_invalidation_callback(callback)
for key in keys:
self._pending_deferred_cache[key] = entry
return entry
def _completed_callback(
self, value: VT, entry: "CacheEntry[KT, VT]", key: KT
) -> VT:
"""Called when a deferred is completed."""
# We check if the current entry matches the entry associated with the
# deferred. If they don't match then it got invalidated.
current_entry = self._pending_deferred_cache.pop(key, None)
if current_entry is not entry:
if current_entry:
self._pending_deferred_cache[key] = current_entry
return value
self.cache.set(key, value, entry.get_invalidation_callbacks(key))
return value
def _error_callback(
self,
failure: Failure,
entry: "CacheEntry[KT, VT]",
key: KT,
) -> Failure:
"""Called when a deferred errors."""
# We check if the current entry matches the entry associated with the
# deferred. If they don't match then it got invalidated.
current_entry = self._pending_deferred_cache.pop(key, None)
if current_entry is not entry:
if current_entry:
self._pending_deferred_cache[key] = current_entry
return failure
for cb in entry.get_invalidation_callbacks(key):
cb()
return failure
def prefill(
self, key: KT, value: VT, callback: Optional[Callable[[], None]] = None
) -> None:
callbacks = [callback] if callback else []
callbacks = (callback,) if callback else ()
self.cache.set(key, value, callbacks=callbacks)
self._pending_deferred_cache.pop(key, None)
def invalidate(self, key: KT) -> None:
"""Delete a key, or tree of entries
@@ -311,41 +387,129 @@ class DeferredCache(Generic[KT, VT]):
self.cache.del_multi(key)
# if we have a pending lookup for this key, remove it from the
# _pending_deferred_cache, which will (a) stop it being returned
# for future queries and (b) stop it being persisted as a proper entry
# _pending_deferred_cache, which will (a) stop it being returned for
# future queries and (b) stop it being persisted as a proper entry
# in self.cache.
entry = self._pending_deferred_cache.pop(key, None)
# run the invalidation callbacks now, rather than waiting for the
# deferred to resolve.
if entry:
# _pending_deferred_cache.pop should either return a CacheEntry, or, in the
# case of a TreeCache, a dict of keys to cache entries. Either way calling
# iterate_tree_cache_entry on it will do the right thing.
for entry in iterate_tree_cache_entry(entry):
entry.invalidate()
for cb in entry.get_invalidation_callbacks(key):
cb()
def invalidate_all(self) -> None:
self.check_thread()
self.cache.clear()
for entry in self._pending_deferred_cache.values():
entry.invalidate()
for key, entry in self._pending_deferred_cache.items():
for cb in entry.get_invalidation_callbacks(key):
cb()
self._pending_deferred_cache.clear()
class CacheEntry:
__slots__ = ["deferred", "callbacks", "invalidated"]
class CacheEntry(Generic[KT, VT], metaclass=abc.ABCMeta):
"""Abstract class for entries in `DeferredCache[KT, VT]`"""
def __init__(
self, deferred: ObservableDeferred, callbacks: Iterable[Callable[[], None]]
):
self.deferred = deferred
self.callbacks = set(callbacks)
self.invalidated = False
@abc.abstractmethod
def deferred(self, key: KT) -> "defer.Deferred[VT]":
"""Get a deferred that a caller can wait on to get the value at the
given key"""
...
def invalidate(self) -> None:
if not self.invalidated:
self.invalidated = True
for callback in self.callbacks:
callback()
self.callbacks.clear()
@abc.abstractmethod
def add_invalidation_callback(
self, key: KT, callback: Optional[Callable[[], None]]
) -> None:
"""Add an invalidation callback"""
...
@abc.abstractmethod
def get_invalidation_callbacks(self, key: KT) -> Collection[Callable[[], None]]:
"""Get all invalidation callbacks"""
...
class CacheEntrySingle(CacheEntry[KT, VT]):
"""An implementation of `CacheEntry` wrapping a deferred that results in a
single cache entry.
"""
__slots__ = ["_deferred", "_callbacks"]
def __init__(self, deferred: "defer.Deferred[VT]") -> None:
self._deferred = ObservableDeferred(deferred, consumeErrors=True)
self._callbacks: Set[Callable[[], None]] = set()
def deferred(self, key: KT) -> "defer.Deferred[VT]":
return self._deferred.observe()
def add_invalidation_callback(
self, key: KT, callback: Optional[Callable[[], None]]
) -> None:
if callback is None:
return
self._callbacks.add(callback)
def get_invalidation_callbacks(self, key: KT) -> Collection[Callable[[], None]]:
return self._callbacks
class CacheMultipleEntries(CacheEntry[KT, VT]):
"""Cache entry that is used for bulk lookups and insertions."""
__slots__ = ["_deferred", "_callbacks", "_global_callbacks"]
def __init__(self) -> None:
self._deferred: Optional[ObservableDeferred[Dict[KT, VT]]] = None
self._callbacks: Dict[KT, Set[Callable[[], None]]] = {}
self._global_callbacks: Set[Callable[[], None]] = set()
def deferred(self, key: KT) -> "defer.Deferred[VT]":
if not self._deferred:
self._deferred = ObservableDeferred(defer.Deferred(), consumeErrors=True)
return self._deferred.observe().addCallback(lambda res: res.get(key))
def add_invalidation_callback(
self, key: KT, callback: Optional[Callable[[], None]]
) -> None:
if callback is None:
return
self._callbacks.setdefault(key, set()).add(callback)
def get_invalidation_callbacks(self, key: KT) -> Collection[Callable[[], None]]:
return self._callbacks.get(key, set()) | self._global_callbacks
def add_global_invalidation_callback(
self, callback: Optional[Callable[[], None]]
) -> None:
"""Add a callback for when any keys get invalidated."""
if callback is None:
return
self._global_callbacks.add(callback)
def complete_bulk(
self,
cache: DeferredCache[KT, VT],
result: Dict[KT, VT],
) -> None:
"""Called when there is a result"""
for key, value in result.items():
cache._completed_callback(value, self, key)
if self._deferred:
self._deferred.callback(result)
def error_bulk(
self, cache: DeferredCache[KT, VT], keys: Collection[KT], failure: Failure
) -> None:
"""Called when bulk lookup failed."""
for key in keys:
cache._error_callback(failure, self, key)
if self._deferred:
self._deferred.errback(failure)
+58 -57
View File
@@ -25,6 +25,7 @@ from typing import (
Generic,
Hashable,
Iterable,
List,
Mapping,
Optional,
Sequence,
@@ -73,8 +74,10 @@ class _CacheDescriptorBase:
num_args: Optional[int],
uncached_args: Optional[Collection[str]] = None,
cache_context: bool = False,
name: Optional[str] = None,
):
self.orig = orig
self.name = name or orig.__name__
arg_spec = inspect.getfullargspec(orig)
all_args = arg_spec.args
@@ -211,7 +214,7 @@ class LruCacheDescriptor(_CacheDescriptorBase):
def __get__(self, obj: Optional[Any], owner: Optional[Type]) -> Callable[..., Any]:
cache: LruCache[CacheKey, Any] = LruCache(
cache_name=self.orig.__name__,
cache_name=self.name,
max_size=self.max_entries,
)
@@ -241,7 +244,7 @@ class LruCacheDescriptor(_CacheDescriptorBase):
wrapped = cast(_CachedFunction, _wrapped)
wrapped.cache = cache
obj.__dict__[self.orig.__name__] = wrapped
obj.__dict__[self.name] = wrapped
return wrapped
@@ -301,12 +304,14 @@ class DeferredCacheDescriptor(_CacheDescriptorBase):
cache_context: bool = False,
iterable: bool = False,
prune_unread_entries: bool = True,
name: Optional[str] = None,
):
super().__init__(
orig,
num_args=num_args,
uncached_args=uncached_args,
cache_context=cache_context,
name=name,
)
if tree and self.num_args < 2:
@@ -321,7 +326,7 @@ class DeferredCacheDescriptor(_CacheDescriptorBase):
def __get__(self, obj: Optional[Any], owner: Optional[Type]) -> Callable[..., Any]:
cache: DeferredCache[CacheKey, Any] = DeferredCache(
name=self.orig.__name__,
name=self.name,
max_entries=self.max_entries,
tree=self.tree,
iterable=self.iterable,
@@ -372,7 +377,7 @@ class DeferredCacheDescriptor(_CacheDescriptorBase):
wrapped.cache = cache
wrapped.num_args = self.num_args
obj.__dict__[self.orig.__name__] = wrapped
obj.__dict__[self.name] = wrapped
return wrapped
@@ -393,6 +398,7 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase):
cached_method_name: str,
list_name: str,
num_args: Optional[int] = None,
name: Optional[str] = None,
):
"""
Args:
@@ -403,7 +409,7 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase):
but including list_name) to use as cache keys. Defaults to all
named args of the function.
"""
super().__init__(orig, num_args=num_args, uncached_args=None)
super().__init__(orig, num_args=num_args, uncached_args=None, name=name)
self.list_name = list_name
@@ -435,16 +441,6 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase):
keyargs = [arg_dict[arg_nm] for arg_nm in self.arg_names]
list_args = arg_dict[self.list_name]
results = {}
def update_results_dict(res: Any, arg: Hashable) -> None:
results[arg] = res
# list of deferreds to wait for
cached_defers = []
missing = set()
# If the cache takes a single arg then that is used as the key,
# otherwise a tuple is used.
if num_args == 1:
@@ -452,6 +448,9 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase):
def arg_to_cache_key(arg: Hashable) -> Hashable:
return arg
def cache_key_to_arg(key: tuple) -> Hashable:
return key
else:
keylist = list(keyargs)
@@ -459,58 +458,53 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase):
keylist[self.list_pos] = arg
return tuple(keylist)
for arg in list_args:
try:
res = cache.get(arg_to_cache_key(arg), callback=invalidate_callback)
if not res.called:
res.addCallback(update_results_dict, arg)
cached_defers.append(res)
else:
results[arg] = res.result
except KeyError:
missing.add(arg)
def cache_key_to_arg(key: tuple) -> Hashable:
return key[self.list_pos]
cache_keys = [arg_to_cache_key(arg) for arg in list_args]
immediate_results, pending_deferred, missing = cache.get_bulk(
cache_keys, callback=invalidate_callback
)
results = {cache_key_to_arg(key): v for key, v in immediate_results.items()}
cached_defers: List["defer.Deferred[Any]"] = []
if pending_deferred:
def update_results(r: Dict) -> None:
for k, v in r.items():
results[cache_key_to_arg(k)] = v
pending_deferred.addCallback(update_results)
cached_defers.append(pending_deferred)
if missing:
# we need a deferred for each entry in the list,
# which we put in the cache. Each deferred resolves with the
# relevant result for that key.
deferreds_map = {}
for arg in missing:
deferred: "defer.Deferred[Any]" = defer.Deferred()
deferreds_map[arg] = deferred
key = arg_to_cache_key(arg)
cached_defers.append(
cache.set(key, deferred, callback=invalidate_callback)
)
cache_entry = cache.start_bulk_input(missing, invalidate_callback)
def complete_all(res: Dict[Hashable, Any]) -> None:
# the wrapped function has completed. It returns a dict.
# We can now update our own result map, and then resolve the
# observable deferreds in the cache.
for e, d1 in deferreds_map.items():
val = res.get(e, None)
# make sure we update the results map before running the
# deferreds, because as soon as we run the last deferred, the
# gatherResults() below will complete and return the result
# dict to our caller.
results[e] = val
d1.callback(val)
missing_results = {}
for key in missing:
arg = cache_key_to_arg(key)
val = res.get(arg, None)
results[arg] = val
missing_results[key] = val
cache_entry.complete_bulk(cache, missing_results)
def errback_all(f: Failure) -> None:
# the wrapped function has failed. Propagate the failure into
# the cache, which will invalidate the entry, and cause the
# relevant cached_deferreds to fail, which will propagate the
# failure to our caller.
for d1 in deferreds_map.values():
d1.errback(f)
cache_entry.error_bulk(cache, missing, f)
args_to_call = dict(arg_dict)
args_to_call[self.list_name] = missing
args_to_call[self.list_name] = {
cache_key_to_arg(key) for key in missing
}
# dispatch the call, and attach the two handlers
defer.maybeDeferred(
missing_d = defer.maybeDeferred(
preserve_fn(self.orig), **args_to_call
).addCallbacks(complete_all, errback_all)
cached_defers.append(missing_d)
if cached_defers:
d = defer.gatherResults(cached_defers, consumeErrors=True).addCallbacks(
@@ -525,7 +519,7 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase):
else:
return defer.succeed(results)
obj.__dict__[self.orig.__name__] = wrapped
obj.__dict__[self.name] = wrapped
return wrapped
@@ -577,6 +571,7 @@ def cached(
cache_context: bool = False,
iterable: bool = False,
prune_unread_entries: bool = True,
name: Optional[str] = None,
) -> Callable[[F], _CachedFunction[F]]:
func = lambda orig: DeferredCacheDescriptor(
orig,
@@ -587,13 +582,18 @@ def cached(
cache_context=cache_context,
iterable=iterable,
prune_unread_entries=prune_unread_entries,
name=name,
)
return cast(Callable[[F], _CachedFunction[F]], func)
def cachedList(
*, cached_method_name: str, list_name: str, num_args: Optional[int] = None
*,
cached_method_name: str,
list_name: str,
num_args: Optional[int] = None,
name: Optional[str] = None,
) -> Callable[[F], _CachedFunction[F]]:
"""Creates a descriptor that wraps a function in a `DeferredCacheListDescriptor`.
@@ -628,6 +628,7 @@ def cachedList(
cached_method_name=cached_method_name,
list_name=list_name,
num_args=num_args,
name=name,
)
return cast(Callable[[F], _CachedFunction[F]], func)
+3
View File
@@ -135,6 +135,9 @@ class TreeCache:
def values(self):
return iterate_tree_cache_entry(self.root)
def items(self):
return iterate_tree_cache_items((), self.root)
def __len__(self) -> int:
return self.size
+46 -14
View File
@@ -30,7 +30,7 @@ from synapse.logging.context import (
run_in_background,
)
from synapse.logging.opentracing import start_active_span
from synapse.metrics import Histogram
from synapse.metrics import Histogram, LaterGauge
from synapse.util import Clock
if typing.TYPE_CHECKING:
@@ -74,6 +74,27 @@ class FederationRateLimiter:
str, "_PerHostRatelimiter"
] = collections.defaultdict(new_limiter)
# We track the number of affected hosts per time-period so we can
# differentiate one really noisy homeserver from a general
# ratelimit tuning problem across the federation.
LaterGauge(
"synapse_rate_limit_sleep_affected_hosts",
"Number of hosts that had requests put to sleep",
[],
lambda: sum(
ratelimiter.should_sleep() for ratelimiter in self.ratelimiters.values()
),
)
LaterGauge(
"synapse_rate_limit_reject_affected_hosts",
"Number of hosts that had requests rejected",
[],
lambda: sum(
ratelimiter.should_reject()
for ratelimiter in self.ratelimiters.values()
),
)
def ratelimit(self, host: str) -> "_GeneratorContextManager[defer.Deferred[None]]":
"""Used to ratelimit an incoming request from a given host
@@ -133,12 +154,33 @@ class _PerHostRatelimiter:
self.host = host
request_id = object()
ret = self._on_enter(request_id)
# Ideally we'd use `Deferred.fromCoroutine()` here, to save on redundant
# type-checking, but we'd need Twisted >= 21.2.
ret = defer.ensureDeferred(self._on_enter_with_tracing(request_id))
try:
yield ret
finally:
self._on_exit(request_id)
def should_reject(self) -> bool:
"""
Whether to reject the request if we already have too many queued up
(either sleeping or in the ready queue).
"""
queue_size = len(self.ready_request_queue) + len(self.sleeping_requests)
return queue_size > self.reject_limit
def should_sleep(self) -> bool:
"""
Whether to sleep the request if we already have too many requests coming
through within the window.
"""
return len(self.request_times) > self.sleep_limit
async def _on_enter_with_tracing(self, request_id: object) -> None:
with start_active_span("ratelimit wait"), queue_wait_timer.time():
await self._on_enter(request_id)
def _on_enter(self, request_id: object) -> "defer.Deferred[None]":
time_now = self.clock.time_msec()
@@ -149,8 +191,7 @@ class _PerHostRatelimiter:
# reject the request if we already have too many queued up (either
# sleeping or in the ready queue).
queue_size = len(self.ready_request_queue) + len(self.sleeping_requests)
if queue_size > self.reject_limit:
if self.should_reject():
logger.debug("Ratelimiter(%s): rejecting request", self.host)
rate_limit_reject_counter.inc()
raise LimitExceededError(
@@ -180,7 +221,7 @@ class _PerHostRatelimiter:
len(self.request_times),
)
if len(self.request_times) > self.sleep_limit:
if self.should_sleep():
logger.debug(
"Ratelimiter(%s) [%s]: sleeping request for %f sec",
self.host,
@@ -222,17 +263,8 @@ class _PerHostRatelimiter:
# Ensure that we've properly cleaned up.
self.sleeping_requests.discard(request_id)
self.ready_request_queue.pop(request_id, None)
wait_span_scope.__exit__(None, None, None)
wait_timer_cm.__exit__(None, None, None)
return r
# Tracing
wait_span_scope = start_active_span("ratelimit wait")
wait_span_scope.__enter__()
# Metrics
wait_timer_cm = queue_wait_timer.time()
wait_timer_cm.__enter__()
ret_defer.addCallbacks(on_start, on_err)
ret_defer.addBoth(on_both)
return make_deferred_yieldable(ret_defer)
+7 -1
View File
@@ -284,10 +284,13 @@ class AuthTestCase(unittest.HomeserverTestCase):
TokenLookupResult(
user_id="@baldrick:matrix.org",
device_id="device",
token_id=5,
token_owner="@admin:matrix.org",
token_used=True,
)
)
self.store.insert_client_ip = simple_async_mock(None)
self.store.mark_access_token_as_used = simple_async_mock(None)
request = Mock(args={})
request.getClientAddress.return_value.host = "127.0.0.1"
request.args[b"access_token"] = [self.test_token]
@@ -301,10 +304,13 @@ class AuthTestCase(unittest.HomeserverTestCase):
TokenLookupResult(
user_id="@baldrick:matrix.org",
device_id="device",
token_id=5,
token_owner="@admin:matrix.org",
token_used=True,
)
)
self.store.insert_client_ip = simple_async_mock(None)
self.store.mark_access_token_as_used = simple_async_mock(None)
request = Mock(args={})
request.getClientAddress.return_value.host = "127.0.0.1"
request.args[b"access_token"] = [self.test_token]
@@ -347,7 +353,7 @@ class AuthTestCase(unittest.HomeserverTestCase):
serialized = macaroon.serialize()
user_info = self.get_success(self.auth.get_user_by_access_token(serialized))
self.assertEqual(user_id, user_info.user_id)
self.assertEqual(user_id, user_info.user.to_string())
self.assertTrue(user_info.is_guest)
self.store.get_user_by_id.assert_called_with(user_id)
-4
View File
@@ -141,10 +141,6 @@ class PresenceRouterTestCase(FederatingHomeserverTestCase):
hs = self.setup_test_homeserver(
federation_transport_client=fed_transport_client,
)
# Load the modules into the homeserver
module_api = hs.get_module_api()
for module, config in hs.config.modules.loaded_modules:
module(config=config, api=module_api)
load_legacy_presence_router(hs)

Some files were not shown because too many files have changed in this diff Show More