Compare commits
21 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| cbb1575486 | |||
| 7af07f9716 | |||
| a25a37002c | |||
| f7ddfe17a3 | |||
| 05c9c7363b | |||
| bdfff9c36e | |||
| ca3d19b05f | |||
| aec87a0f93 | |||
| ea85a2bf6c | |||
| 956e015413 | |||
| 5e7847dc92 | |||
| 79281f517d | |||
| f8b9abdcdb | |||
| d6f5699737 | |||
| f0b23927fc | |||
| 37f329c9ad | |||
| 9385c41ba4 | |||
| 3dd175b628 | |||
| 94375f7a91 | |||
| 237f03d85b | |||
| d3f36d0f64 |
+81
@@ -1,3 +1,84 @@
|
||||
Synapse 1.66.0rc1 (2022-08-23)
|
||||
==============================
|
||||
|
||||
This release removes the ability for homeservers to delegate email ownership
|
||||
verification and password reset confirmation to identity servers. This removal
|
||||
was originally planned for Synapse 1.64, but was later deferred until now.
|
||||
|
||||
See the [upgrade notes](https://matrix-org.github.io/synapse/v1.66/upgrade.html#upgrading-to-v1660) for more details.
|
||||
|
||||
Features
|
||||
--------
|
||||
|
||||
- Improve validation of request bodies for the following client-server API endpoints: [`/account/password`](https://spec.matrix.org/v1.3/client-server-api/#post_matrixclientv3accountpassword), [`/account/password/email/requestToken`](https://spec.matrix.org/v1.3/client-server-api/#post_matrixclientv3accountpasswordemailrequesttoken), [`/account/deactivate`](https://spec.matrix.org/v1.3/client-server-api/#post_matrixclientv3accountdeactivate) and [`/account/3pid/email/requestToken`](https://spec.matrix.org/v1.3/client-server-api/#post_matrixclientv3account3pidemailrequesttoken). ([\#13188](https://github.com/matrix-org/synapse/issues/13188), [\#13563](https://github.com/matrix-org/synapse/issues/13563))
|
||||
- Add forgotten status to [Room Details Admin API](https://matrix-org.github.io/synapse/latest/admin_api/rooms.html#room-details-api). ([\#13503](https://github.com/matrix-org/synapse/issues/13503))
|
||||
- Add an experimental implementation for [MSC3852 (Expose user agents on `Device`)](https://github.com/matrix-org/matrix-spec-proposals/pull/3852). ([\#13549](https://github.com/matrix-org/synapse/issues/13549))
|
||||
- Add `org.matrix.msc2716v4` experimental room version with updated content fields. Part of [MSC2716 (Importing history)](https://github.com/matrix-org/matrix-spec-proposals/pull/2716). ([\#13551](https://github.com/matrix-org/synapse/issues/13551))
|
||||
- Add support for compression to federation responses. ([\#13537](https://github.com/matrix-org/synapse/issues/13537))
|
||||
- Improve performance of sending messages in rooms with thousands of local users. ([\#13522](https://github.com/matrix-org/synapse/issues/13522), [\#13547](https://github.com/matrix-org/synapse/issues/13547))
|
||||
|
||||
|
||||
Bugfixes
|
||||
--------
|
||||
|
||||
- Faster room joins: make `/joined_members` block whilst the room is partial stated. ([\#13514](https://github.com/matrix-org/synapse/issues/13514))
|
||||
- Fix a bug introduced in Synapse 1.21.0 where the [`/event_reports` Admin API](https://matrix-org.github.io/synapse/develop/admin_api/event_reports.html) could return a total count which was larger than the number of results you can actually query for. ([\#13525](https://github.com/matrix-org/synapse/issues/13525))
|
||||
- Fix a bug introduced in Synapse 1.52.0 where sending server notices fails if `max_avatar_size` or `allowed_avatar_mimetypes` is set and not `system_mxid_avatar_url`. ([\#13566](https://github.com/matrix-org/synapse/issues/13566))
|
||||
- Fix a bug where the `opentracing.force_tracing_for_users` config option would not apply to [`/sendToDevice`](https://spec.matrix.org/v1.3/client-server-api/#put_matrixclientv3sendtodeviceeventtypetxnid) and [`/keys/upload`](https://spec.matrix.org/v1.3/client-server-api/#post_matrixclientv3keysupload) requests. ([\#13574](https://github.com/matrix-org/synapse/issues/13574))
|
||||
|
||||
|
||||
Improved Documentation
|
||||
----------------------
|
||||
|
||||
- Add `openssl` example for generating registration HMAC digest. ([\#13472](https://github.com/matrix-org/synapse/issues/13472))
|
||||
- Tidy up Synapse's README. ([\#13491](https://github.com/matrix-org/synapse/issues/13491))
|
||||
- Document that event purging related to the `redaction_retention_period` config option is executed only every 5 minutes. ([\#13492](https://github.com/matrix-org/synapse/issues/13492))
|
||||
- Add a warning to retention documentation regarding the possibility of database corruption. ([\#13497](https://github.com/matrix-org/synapse/issues/13497))
|
||||
- Document that the `DOCKER_BUILDKIT=1` flag is needed to build the docker image. ([\#13515](https://github.com/matrix-org/synapse/issues/13515))
|
||||
- Add missing links in `user_consent` section of configuration manual. ([\#13536](https://github.com/matrix-org/synapse/issues/13536))
|
||||
- Fix the doc and some warnings that were referring to the nonexistent `custom_templates_directory` setting (instead of `custom_template_directory`). ([\#13538](https://github.com/matrix-org/synapse/issues/13538))
|
||||
|
||||
|
||||
Deprecations and Removals
|
||||
-------------------------
|
||||
|
||||
- Remove the ability for homeservers to delegate email ownership verification
|
||||
and password reset confirmation to identity servers. See [upgrade notes](https://matrix-org.github.io/synapse/v1.66/upgrade.html#upgrading-to-v1660) for more details.
|
||||
|
||||
Internal Changes
|
||||
----------------
|
||||
|
||||
### Faster room joins
|
||||
|
||||
- Update the rejected state of events during de-partial-stating. ([\#13459](https://github.com/matrix-org/synapse/issues/13459))
|
||||
- Avoid blocking lazy-loading `/sync`s during partial joins due to remote memberships. Pull remote memberships from auth events instead of the room state. ([\#13477](https://github.com/matrix-org/synapse/issues/13477))
|
||||
- Refuse to start when faster joins is enabled on a deployment with workers, since worker configurations are not currently supported. ([\#13531](https://github.com/matrix-org/synapse/issues/13531))
|
||||
|
||||
### Metrics and tracing
|
||||
|
||||
- Allow use of both `@trace` and `@tag_args` stacked on the same function. ([\#13453](https://github.com/matrix-org/synapse/issues/13453))
|
||||
- Instrument the federation/backfill part of `/messages` for understandable traces in Jaeger. ([\#13489](https://github.com/matrix-org/synapse/issues/13489))
|
||||
- Instrument `FederationStateIdsServlet` (`/state_ids`) for understandable traces in Jaeger. ([\#13499](https://github.com/matrix-org/synapse/issues/13499), [\#13554](https://github.com/matrix-org/synapse/issues/13554))
|
||||
- Track HTTP response times over 10 seconds from `/messages` (`synapse_room_message_list_rest_servlet_response_time_seconds`). ([\#13533](https://github.com/matrix-org/synapse/issues/13533))
|
||||
- Add metrics to track how the rate limiter is affecting requests (sleep/reject). ([\#13534](https://github.com/matrix-org/synapse/issues/13534), [\#13541](https://github.com/matrix-org/synapse/issues/13541))
|
||||
- Add metrics to time how long it takes us to do backfill processing (`synapse_federation_backfill_processing_before_time_seconds`, `synapse_federation_backfill_processing_after_time_seconds`). ([\#13535](https://github.com/matrix-org/synapse/issues/13535), [\#13584](https://github.com/matrix-org/synapse/issues/13584))
|
||||
- Add metrics to track rate limiter queue timing (`synapse_rate_limit_queue_wait_time_seconds`). ([\#13544](https://github.com/matrix-org/synapse/issues/13544))
|
||||
- Update metrics to track `/messages` response time by room size. ([\#13545](https://github.com/matrix-org/synapse/issues/13545))
|
||||
|
||||
### Everything else
|
||||
|
||||
- Refactor methods in `synapse.api.auth.Auth` to use `Requester` objects everywhere instead of user IDs. ([\#13024](https://github.com/matrix-org/synapse/issues/13024))
|
||||
- Clean-up tests for notifications. ([\#13471](https://github.com/matrix-org/synapse/issues/13471))
|
||||
- Add some miscellaneous comments to document sync, especially around `compute_state_delta`. ([\#13474](https://github.com/matrix-org/synapse/issues/13474))
|
||||
- Use literals in place of `HTTPStatus` constants in tests. ([\#13479](https://github.com/matrix-org/synapse/issues/13479), [\#13488](https://github.com/matrix-org/synapse/issues/13488))
|
||||
- Add comments about how event push actions are rotated. ([\#13485](https://github.com/matrix-org/synapse/issues/13485))
|
||||
- Modify HTML template content to better support mobile devices' screen sizes. ([\#13493](https://github.com/matrix-org/synapse/issues/13493))
|
||||
- Add a linter script which will reject non-strict types in Pydantic models. ([\#13502](https://github.com/matrix-org/synapse/issues/13502))
|
||||
- Reduce the number of tests using legacy TCP replication. ([\#13543](https://github.com/matrix-org/synapse/issues/13543))
|
||||
- Allow specifying additional request fields when using the `HomeServerTestCase.login` helper method. ([\#13549](https://github.com/matrix-org/synapse/issues/13549))
|
||||
- Make `HomeServerTestCase` load any configured homeserver modules automatically. ([\#13558](https://github.com/matrix-org/synapse/issues/13558))
|
||||
|
||||
|
||||
Synapse 1.65.0 (2022-08-16)
|
||||
===========================
|
||||
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
Improve validation of request bodies for the following client-server API endpoints: [`/account/password`](https://spec.matrix.org/v1.3/client-server-api/#post_matrixclientv3accountpassword), [`/account/password/email/requestToken`](https://spec.matrix.org/v1.3/client-server-api/#post_matrixclientv3accountpasswordemailrequesttoken), [`/account/deactivate`](https://spec.matrix.org/v1.3/client-server-api/#post_matrixclientv3accountdeactivate) and [`/account/3pid/email/requestToken`](https://spec.matrix.org/v1.3/client-server-api/#post_matrixclientv3account3pidemailrequesttoken).
|
||||
@@ -0,0 +1 @@
|
||||
Don't fetch the full state during membership changes.
|
||||
@@ -1 +0,0 @@
|
||||
Allow use of both `@trace` and `@tag_args` stacked on the same function (tracing).
|
||||
@@ -1 +0,0 @@
|
||||
Faster joins: update the rejected state of events during de-partial-stating.
|
||||
@@ -1 +0,0 @@
|
||||
Clean-up tests for notifications.
|
||||
@@ -1 +0,0 @@
|
||||
Add `openssl` example for generating registration HMAC digest.
|
||||
@@ -1 +0,0 @@
|
||||
Add some miscellaneous comments to document sync, especially around `compute_state_delta`.
|
||||
@@ -1 +0,0 @@
|
||||
Faster room joins: Avoid blocking lazy-loading `/sync`s during partial joins due to remote memberships. Pull remote memberships from auth events instead of the room state.
|
||||
@@ -1 +0,0 @@
|
||||
Use literals in place of `HTTPStatus` constants in tests.
|
||||
@@ -1 +0,0 @@
|
||||
Add comments about how event push actions are rotated.
|
||||
@@ -1 +0,0 @@
|
||||
Use literals in place of `HTTPStatus` constants in tests.
|
||||
@@ -1 +0,0 @@
|
||||
Instrument the federation/backfill part of `/messages` for understandable traces in Jaeger.
|
||||
@@ -1 +0,0 @@
|
||||
Tidy up Synapse's README.
|
||||
@@ -1 +0,0 @@
|
||||
Document that event purging related to the `redaction_retention_period` config option is executed only every 5 minutes.
|
||||
@@ -1 +0,0 @@
|
||||
Modify HTML template content to better support mobile devices' screen sizes.
|
||||
@@ -1,2 +0,0 @@
|
||||
Add a warning to retention documentation regarding the possibility of database corruption.
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
Instrument `FederationStateIdsServlet` (`/state_ids`) for understandable traces in Jaeger.
|
||||
@@ -1 +0,0 @@
|
||||
Add a linter script which will reject non-strict types in Pydantic models.
|
||||
@@ -1 +0,0 @@
|
||||
Add forgotten status to Room Details API.
|
||||
@@ -1 +0,0 @@
|
||||
Faster room joins: make `/joined_members` block whilst the room is partial stated.
|
||||
@@ -1 +0,0 @@
|
||||
Document that the `DOCKER_BUILDKIT=1` flag is needed to build the docker image.
|
||||
@@ -1 +0,0 @@
|
||||
Improve performance of sending messages in rooms with thousands of local users.
|
||||
@@ -1 +0,0 @@
|
||||
Fix a bug in the `/event_reports` Admin API which meant that the total count could be larger than the number of results you can actually query for.
|
||||
@@ -1 +0,0 @@
|
||||
Faster room joins: Refuse to start when faster joins is enabled on a deployment with workers, since worker configurations are not currently supported.
|
||||
@@ -1 +0,0 @@
|
||||
Track HTTP response times over 10 seconds from `/messages` (`synapse_room_message_list_rest_servlet_response_time_seconds`).
|
||||
@@ -1 +0,0 @@
|
||||
Add metrics to track how the rate limiter is affecting requests (sleep/reject).
|
||||
@@ -1 +0,0 @@
|
||||
Add metrics to time how long it takes us to do backfill processing (`synapse_federation_backfill_processing_before_time_seconds`, `synapse_federation_backfill_processing_after_time_seconds`).
|
||||
@@ -1 +0,0 @@
|
||||
Add missing links in `user_consent` section of configuration manual.
|
||||
@@ -1 +0,0 @@
|
||||
Add support for compression to federation responses.
|
||||
@@ -1 +0,0 @@
|
||||
Fix the doc and some warnings that were referring to the nonexistent `custom_templates_directory` setting (instead of `custom_template_directory`).
|
||||
@@ -1 +0,0 @@
|
||||
Add metrics to track how the rate limiter is affecting requests (sleep/reject).
|
||||
@@ -1 +0,0 @@
|
||||
Reduce the number of tests using legacy TCP replication.
|
||||
@@ -1 +0,0 @@
|
||||
Add metrics to track rate limiter queue timing (`synapse_rate_limit_queue_wait_time_seconds`).
|
||||
@@ -1 +0,0 @@
|
||||
Update metrics to track `/messages` response time by room size.
|
||||
@@ -1 +0,0 @@
|
||||
Improve performance of sending messages in rooms with thousands of local users.
|
||||
@@ -1 +0,0 @@
|
||||
Add an experimental implementation for [MSC3852](https://github.com/matrix-org/matrix-spec-proposals/pull/3852).
|
||||
@@ -1 +0,0 @@
|
||||
Allow specifying additional request fields when using the `HomeServerTestCase.login` helper method.
|
||||
@@ -1 +0,0 @@
|
||||
Add `org.matrix.msc2716v4` experimental room version with updated content fields.
|
||||
@@ -1 +0,0 @@
|
||||
Instrument `FederationStateIdsServlet` (`/state_ids`) for understandable traces in Jaeger.
|
||||
@@ -1 +0,0 @@
|
||||
Make `HomeServerTestCase` load any configured homeserver modules automatically.
|
||||
@@ -1 +0,0 @@
|
||||
Improve validation of request bodies for the following client-server API endpoints: [`/account/password`](https://spec.matrix.org/v1.3/client-server-api/#post_matrixclientv3accountpassword), [`/account/password/email/requestToken`](https://spec.matrix.org/v1.3/client-server-api/#post_matrixclientv3accountpasswordemailrequesttoken), [`/account/deactivate`](https://spec.matrix.org/v1.3/client-server-api/#post_matrixclientv3accountdeactivate) and [`/account/3pid/email/requestToken`](https://spec.matrix.org/v1.3/client-server-api/#post_matrixclientv3account3pidemailrequesttoken).
|
||||
@@ -0,0 +1 @@
|
||||
Cache user IDs instead of profiles to reduce cache memory usage. Contributed by Nick @ Beeper (@fizzadar).
|
||||
@@ -0,0 +1 @@
|
||||
Instrument `_check_sigs_and_hash_and_fetch` to trace time spent in child concurrent calls for understandable traces in Jaeger.
|
||||
@@ -0,0 +1 @@
|
||||
Improve performance of `@cachedList`.
|
||||
@@ -0,0 +1 @@
|
||||
Minor speed up of fetching large numbers of push rules.
|
||||
@@ -0,0 +1 @@
|
||||
Cache user IDs instead of profiles to reduce cache memory usage. Contributed by Nick @ Beeper (@fizzadar).
|
||||
@@ -0,0 +1 @@
|
||||
Improve the description of the ["chain cover index"](https://matrix-org.github.io/synapse/latest/auth_chain_difference_algorithm.html) used internally by Synapse.
|
||||
Vendored
+6
@@ -1,3 +1,9 @@
|
||||
matrix-synapse-py3 (1.66.0~rc1) stable; urgency=medium
|
||||
|
||||
* New Synapse release 1.66.0rc1.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Tue, 23 Aug 2022 09:48:55 +0100
|
||||
|
||||
matrix-synapse-py3 (1.65.0) stable; urgency=medium
|
||||
|
||||
* New Synapse release 1.65.0.
|
||||
|
||||
@@ -337,6 +337,8 @@ A response body like the following is returned:
|
||||
}
|
||||
```
|
||||
|
||||
_Changed in Synapse 1.66:_ Added the `forgotten` key to the response body.
|
||||
|
||||
# Room Members API
|
||||
|
||||
The Room Members admin API allows server admins to get a list of all members of a room.
|
||||
|
||||
@@ -34,13 +34,45 @@ the process of indexing it).
|
||||
## Chain Cover Index
|
||||
|
||||
Synapse computes auth chain differences by pre-computing a "chain cover" index
|
||||
for the auth chain in a room, allowing efficient reachability queries like "is
|
||||
event A in the auth chain of event B". This is done by assigning every event a
|
||||
*chain ID* and *sequence number* (e.g. `(5,3)`), and having a map of *links*
|
||||
between chains (e.g. `(5,3) -> (2,4)`) such that A is reachable by B (i.e. `A`
|
||||
is in the auth chain of `B`) if and only if either:
|
||||
for the auth chain in a room, allowing us to efficiently make reachability queries
|
||||
like "is event `A` in the auth chain of event `B`?". We could do this with an index
|
||||
that tracks all pairs `(A, B)` such that `A` is in the auth chain of `B`. However, this
|
||||
would be prohibitively large, scaling poorly as the room accumulates more state
|
||||
events.
|
||||
|
||||
1. A and B have the same chain ID and `A`'s sequence number is less than `B`'s
|
||||
Instead, we break down the graph into *chains*. A chain is a subset of a DAG
|
||||
with the following property: for any pair of events `E` and `F` in the chain,
|
||||
the chain contains a path `E -> F` or a path `F -> E`. This forces a chain to be
|
||||
linear (without forks), e.g. `E -> F -> G -> ... -> H`. Each event in the chain
|
||||
is given a *sequence number* local to that chain. The oldest event `E` in the
|
||||
chain has sequence number 1. If `E` has a child `F` in the chain, then `F` has
|
||||
sequence number 2. If `E` has a grandchild `G` in the chain, then `G` has
|
||||
sequence number 3; and so on.
|
||||
|
||||
Synapse ensures that each persisted event belongs to exactly one chain, and
|
||||
tracks how the chains are connected to one another. This allows us to
|
||||
efficiently answer reachability queries. Doing so uses less storage than
|
||||
tracking reachability on an event-by-event basis, particularly when we have
|
||||
fewer and longer chains. See
|
||||
|
||||
> Jagadish, H. (1990). [A compression technique to materialize transitive closure](https://doi.org/10.1145/99935.99944).
|
||||
> *ACM Transactions on Database Systems (TODS)*, 15*(4)*, 558-598.
|
||||
|
||||
for the original idea or
|
||||
|
||||
> Y. Chen, Y. Chen, [An efficient algorithm for answering graph
|
||||
> reachability queries](https://doi.org/10.1109/ICDE.2008.4497498),
|
||||
> in: 2008 IEEE 24th International Conference on Data Engineering, April 2008,
|
||||
> pp. 893–902. (PDF available via [Google Scholar](https://scholar.google.com/scholar?q=Y.%20Chen,%20Y.%20Chen,%20An%20efficient%20algorithm%20for%20answering%20graph%20reachability%20queries,%20in:%202008%20IEEE%2024th%20International%20Conference%20on%20Data%20Engineering,%20April%202008,%20pp.%20893902.).)
|
||||
|
||||
for a more modern take.
|
||||
|
||||
In practical terms, the chain cover assigns every event a
|
||||
*chain ID* and *sequence number* (e.g. `(5,3)`), and maintains a map of *links*
|
||||
between events in chains (e.g. `(5,3) -> (2,4)`) such that `A` is reachable by `B`
|
||||
(i.e. `A` is in the auth chain of `B`) if and only if either:
|
||||
|
||||
1. `A` and `B` have the same chain ID and `A`'s sequence number is less than `B`'s
|
||||
sequence number; or
|
||||
2. there is a link `L` between `B`'s chain ID and `A`'s chain ID such that
|
||||
`L.start_seq_no` <= `B.seq_no` and `A.seq_no` <= `L.end_seq_no`.
|
||||
@@ -49,8 +81,9 @@ There are actually two potential implementations, one where we store links from
|
||||
each chain to every other reachable chain (the transitive closure of the links
|
||||
graph), and one where we remove redundant links (the transitive reduction of the
|
||||
links graph) e.g. if we have chains `C3 -> C2 -> C1` then the link `C3 -> C1`
|
||||
would not be stored. Synapse uses the former implementations so that it doesn't
|
||||
need to recurse to test reachability between chains.
|
||||
would not be stored. Synapse uses the former implementation so that it doesn't
|
||||
need to recurse to test reachability between chains. This trades-off extra storage
|
||||
in order to save CPU cycles and DB queries.
|
||||
|
||||
### Example
|
||||
|
||||
|
||||
+3
-1
@@ -174,7 +174,9 @@ oidc_providers:
|
||||
|
||||
1. Create a regular web application for Synapse
|
||||
2. Set the Allowed Callback URLs to `[synapse public baseurl]/_synapse/client/oidc/callback`
|
||||
3. Add a rule to add the `preferred_username` claim.
|
||||
3. Add a rule with any name to add the `preferred_username` claim.
|
||||
(See https://auth0.com/docs/customize/rules/create-rules for more information on how to create rules.)
|
||||
|
||||
<details>
|
||||
<summary>Code sample</summary>
|
||||
|
||||
|
||||
@@ -89,6 +89,25 @@ process, for example:
|
||||
dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
|
||||
```
|
||||
|
||||
# Upgrading to v1.66.0
|
||||
|
||||
## Delegation of email validation no longer supported
|
||||
|
||||
As of this version, Synapse no longer allows the tasks of verifying email address
|
||||
ownership, and password reset confirmation, to be delegated to an identity server.
|
||||
This removal was previously planned for Synapse 1.64.0, but was
|
||||
[delayed](https://github.com/matrix-org/synapse/issues/13421) until now to give
|
||||
homeserver administrators more notice of the change.
|
||||
|
||||
To continue to allow users to add email addresses to their homeserver accounts,
|
||||
and perform password resets, make sure that Synapse is configured with a working
|
||||
email server in the [`email` configuration
|
||||
section](https://matrix-org.github.io/synapse/latest/usage/configuration/config_documentation.html#email)
|
||||
(including, at a minimum, a `notif_from` setting.)
|
||||
|
||||
Specifying an `email` setting under `account_threepid_delegates` will now cause
|
||||
an error at startup.
|
||||
|
||||
# Upgrading to v1.64.0
|
||||
|
||||
## Deprecation of the ability to delegate e-mail verification to identity servers
|
||||
|
||||
@@ -2182,7 +2182,10 @@ their account.
|
||||
by the Matrix Identity Service API
|
||||
[specification](https://matrix.org/docs/spec/identity_service/latest).)
|
||||
|
||||
*Updated in Synapse 1.64.0*: The `email` option is deprecated.
|
||||
*Deprecated in Synapse 1.64.0*: The `email` option is deprecated.
|
||||
|
||||
*Removed in Synapse 1.66.0*: The `email` option has been removed.
|
||||
If present, Synapse will report a configuration error on startup.
|
||||
|
||||
Example configuration:
|
||||
```yaml
|
||||
|
||||
+1
-1
@@ -54,7 +54,7 @@ skip_gitignore = true
|
||||
|
||||
[tool.poetry]
|
||||
name = "matrix-synapse"
|
||||
version = "1.65.0"
|
||||
version = "1.66.0rc1"
|
||||
description = "Homeserver for the Matrix decentralised comms protocol"
|
||||
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
|
||||
license = "Apache-2.0"
|
||||
|
||||
+94
-106
@@ -37,8 +37,7 @@ from synapse.logging.opentracing import (
|
||||
start_active_span,
|
||||
trace,
|
||||
)
|
||||
from synapse.storage.databases.main.registration import TokenLookupResult
|
||||
from synapse.types import Requester, UserID, create_requester
|
||||
from synapse.types import Requester, create_requester
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
@@ -70,14 +69,14 @@ class Auth:
|
||||
async def check_user_in_room(
|
||||
self,
|
||||
room_id: str,
|
||||
user_id: str,
|
||||
requester: Requester,
|
||||
allow_departed_users: bool = False,
|
||||
) -> Tuple[str, Optional[str]]:
|
||||
"""Check if the user is in the room, or was at some point.
|
||||
Args:
|
||||
room_id: The room to check.
|
||||
|
||||
user_id: The user to check.
|
||||
requester: The user making the request, according to the access token.
|
||||
|
||||
current_state: Optional map of the current state of the room.
|
||||
If provided then that map is used to check whether they are a
|
||||
@@ -94,6 +93,7 @@ class Auth:
|
||||
membership event ID of the user.
|
||||
"""
|
||||
|
||||
user_id = requester.user.to_string()
|
||||
(
|
||||
membership,
|
||||
member_event_id,
|
||||
@@ -182,96 +182,69 @@ class Auth:
|
||||
|
||||
access_token = self.get_access_token_from_request(request)
|
||||
|
||||
(
|
||||
user_id,
|
||||
device_id,
|
||||
app_service,
|
||||
) = await self._get_appservice_user_id_and_device_id(request)
|
||||
if user_id and app_service:
|
||||
if ip_addr and self._track_appservice_user_ips:
|
||||
await self.store.insert_client_ip(
|
||||
user_id=user_id,
|
||||
access_token=access_token,
|
||||
ip=ip_addr,
|
||||
user_agent=user_agent,
|
||||
device_id="dummy-device"
|
||||
if device_id is None
|
||||
else device_id, # stubbed
|
||||
)
|
||||
|
||||
requester = create_requester(
|
||||
user_id, app_service=app_service, device_id=device_id
|
||||
# First check if it could be a request from an appservice
|
||||
requester = await self._get_appservice_user(request)
|
||||
if not requester:
|
||||
# If not, it should be from a regular user
|
||||
requester = await self.get_user_by_access_token(
|
||||
access_token, allow_expired=allow_expired
|
||||
)
|
||||
|
||||
request.requester = user_id
|
||||
return requester
|
||||
# Deny the request if the user account has expired.
|
||||
# This check is only done for regular users, not appservice ones.
|
||||
if not allow_expired:
|
||||
if await self._account_validity_handler.is_user_expired(
|
||||
requester.user.to_string()
|
||||
):
|
||||
# Raise the error if either an account validity module has determined
|
||||
# the account has expired, or the legacy account validity
|
||||
# implementation is enabled and determined the account has expired
|
||||
raise AuthError(
|
||||
403,
|
||||
"User account has expired",
|
||||
errcode=Codes.EXPIRED_ACCOUNT,
|
||||
)
|
||||
|
||||
user_info = await self.get_user_by_access_token(
|
||||
access_token, allow_expired=allow_expired
|
||||
)
|
||||
token_id = user_info.token_id
|
||||
is_guest = user_info.is_guest
|
||||
shadow_banned = user_info.shadow_banned
|
||||
|
||||
# Deny the request if the user account has expired.
|
||||
if not allow_expired:
|
||||
if await self._account_validity_handler.is_user_expired(
|
||||
user_info.user_id
|
||||
):
|
||||
# Raise the error if either an account validity module has determined
|
||||
# the account has expired, or the legacy account validity
|
||||
# implementation is enabled and determined the account has expired
|
||||
raise AuthError(
|
||||
403,
|
||||
"User account has expired",
|
||||
errcode=Codes.EXPIRED_ACCOUNT,
|
||||
)
|
||||
|
||||
device_id = user_info.device_id
|
||||
|
||||
if access_token and ip_addr:
|
||||
if ip_addr and (
|
||||
not requester.app_service or self._track_appservice_user_ips
|
||||
):
|
||||
# XXX(quenting): I'm 95% confident that we could skip setting the
|
||||
# device_id to "dummy-device" for appservices, and that the only impact
|
||||
# would be some rows which whould not deduplicate in the 'user_ips'
|
||||
# table during the transition
|
||||
recorded_device_id = (
|
||||
"dummy-device"
|
||||
if requester.device_id is None and requester.app_service is not None
|
||||
else requester.device_id
|
||||
)
|
||||
await self.store.insert_client_ip(
|
||||
user_id=user_info.token_owner,
|
||||
user_id=requester.authenticated_entity,
|
||||
access_token=access_token,
|
||||
ip=ip_addr,
|
||||
user_agent=user_agent,
|
||||
device_id=device_id,
|
||||
device_id=recorded_device_id,
|
||||
)
|
||||
|
||||
# Track also the puppeted user client IP if enabled and the user is puppeting
|
||||
if (
|
||||
user_info.user_id != user_info.token_owner
|
||||
requester.user.to_string() != requester.authenticated_entity
|
||||
and self._track_puppeted_user_ips
|
||||
):
|
||||
await self.store.insert_client_ip(
|
||||
user_id=user_info.user_id,
|
||||
user_id=requester.user.to_string(),
|
||||
access_token=access_token,
|
||||
ip=ip_addr,
|
||||
user_agent=user_agent,
|
||||
device_id=device_id,
|
||||
device_id=requester.device_id,
|
||||
)
|
||||
|
||||
if is_guest and not allow_guest:
|
||||
if requester.is_guest and not allow_guest:
|
||||
raise AuthError(
|
||||
403,
|
||||
"Guest access not allowed",
|
||||
errcode=Codes.GUEST_ACCESS_FORBIDDEN,
|
||||
)
|
||||
|
||||
# Mark the token as used. This is used to invalidate old refresh
|
||||
# tokens after some time.
|
||||
if not user_info.token_used and token_id is not None:
|
||||
await self.store.mark_access_token_as_used(token_id)
|
||||
|
||||
requester = create_requester(
|
||||
user_info.user_id,
|
||||
token_id,
|
||||
is_guest,
|
||||
shadow_banned,
|
||||
device_id,
|
||||
app_service=app_service,
|
||||
authenticated_entity=user_info.token_owner,
|
||||
)
|
||||
|
||||
request.requester = requester
|
||||
return requester
|
||||
except KeyError:
|
||||
@@ -308,9 +281,7 @@ class Auth:
|
||||
403, "Application service has not registered this user (%s)" % user_id
|
||||
)
|
||||
|
||||
async def _get_appservice_user_id_and_device_id(
|
||||
self, request: Request
|
||||
) -> Tuple[Optional[str], Optional[str], Optional[ApplicationService]]:
|
||||
async def _get_appservice_user(self, request: Request) -> Optional[Requester]:
|
||||
"""
|
||||
Given a request, reads the request parameters to determine:
|
||||
- whether it's an application service that's making this request
|
||||
@@ -325,15 +296,13 @@ class Auth:
|
||||
Must use `org.matrix.msc3202.device_id` in place of `device_id` for now.
|
||||
|
||||
Returns:
|
||||
3-tuple of
|
||||
(user ID?, device ID?, application service?)
|
||||
the application service `Requester` of that request
|
||||
|
||||
Postconditions:
|
||||
- If an application service is returned, so is a user ID
|
||||
- A user ID is never returned without an application service
|
||||
- A device ID is never returned without a user ID or an application service
|
||||
- The returned application service, if present, is permitted to control the
|
||||
returned user ID.
|
||||
- The `app_service` field in the returned `Requester` is set
|
||||
- The `user_id` field in the returned `Requester` is either the application
|
||||
service sender or the controlled user set by the `user_id` URI parameter
|
||||
- The returned application service is permitted to control the returned user ID.
|
||||
- The returned device ID, if present, has been checked to be a valid device ID
|
||||
for the returned user ID.
|
||||
"""
|
||||
@@ -343,12 +312,12 @@ class Auth:
|
||||
self.get_access_token_from_request(request)
|
||||
)
|
||||
if app_service is None:
|
||||
return None, None, None
|
||||
return None
|
||||
|
||||
if app_service.ip_range_whitelist:
|
||||
ip_address = IPAddress(request.getClientAddress().host)
|
||||
if ip_address not in app_service.ip_range_whitelist:
|
||||
return None, None, None
|
||||
return None
|
||||
|
||||
# This will always be set by the time Twisted calls us.
|
||||
assert request.args is not None
|
||||
@@ -382,13 +351,15 @@ class Auth:
|
||||
Codes.EXCLUSIVE,
|
||||
)
|
||||
|
||||
return effective_user_id, effective_device_id, app_service
|
||||
return create_requester(
|
||||
effective_user_id, app_service=app_service, device_id=effective_device_id
|
||||
)
|
||||
|
||||
async def get_user_by_access_token(
|
||||
self,
|
||||
token: str,
|
||||
allow_expired: bool = False,
|
||||
) -> TokenLookupResult:
|
||||
) -> Requester:
|
||||
"""Validate access token and get user_id from it
|
||||
|
||||
Args:
|
||||
@@ -405,9 +376,9 @@ class Auth:
|
||||
|
||||
# First look in the database to see if the access token is present
|
||||
# as an opaque token.
|
||||
r = await self.store.get_user_by_access_token(token)
|
||||
if r:
|
||||
valid_until_ms = r.valid_until_ms
|
||||
user_info = await self.store.get_user_by_access_token(token)
|
||||
if user_info:
|
||||
valid_until_ms = user_info.valid_until_ms
|
||||
if (
|
||||
not allow_expired
|
||||
and valid_until_ms is not None
|
||||
@@ -419,7 +390,20 @@ class Auth:
|
||||
msg="Access token has expired", soft_logout=True
|
||||
)
|
||||
|
||||
return r
|
||||
# Mark the token as used. This is used to invalidate old refresh
|
||||
# tokens after some time.
|
||||
await self.store.mark_access_token_as_used(user_info.token_id)
|
||||
|
||||
requester = create_requester(
|
||||
user_id=user_info.user_id,
|
||||
access_token_id=user_info.token_id,
|
||||
is_guest=user_info.is_guest,
|
||||
shadow_banned=user_info.shadow_banned,
|
||||
device_id=user_info.device_id,
|
||||
authenticated_entity=user_info.token_owner,
|
||||
)
|
||||
|
||||
return requester
|
||||
|
||||
# If the token isn't found in the database, then it could still be a
|
||||
# macaroon for a guest, so we check that here.
|
||||
@@ -445,11 +429,12 @@ class Auth:
|
||||
"Guest access token used for regular user"
|
||||
)
|
||||
|
||||
return TokenLookupResult(
|
||||
return create_requester(
|
||||
user_id=user_id,
|
||||
is_guest=True,
|
||||
# all guests get the same device id
|
||||
device_id=GUEST_DEVICE_ID,
|
||||
authenticated_entity=user_id,
|
||||
)
|
||||
except (
|
||||
pymacaroons.exceptions.MacaroonException,
|
||||
@@ -472,32 +457,33 @@ class Auth:
|
||||
request.requester = create_requester(service.sender, app_service=service)
|
||||
return service
|
||||
|
||||
async def is_server_admin(self, user: UserID) -> bool:
|
||||
async def is_server_admin(self, requester: Requester) -> bool:
|
||||
"""Check if the given user is a local server admin.
|
||||
|
||||
Args:
|
||||
user: user to check
|
||||
requester: The user making the request, according to the access token.
|
||||
|
||||
Returns:
|
||||
True if the user is an admin
|
||||
"""
|
||||
return await self.store.is_server_admin(user)
|
||||
return await self.store.is_server_admin(requester.user)
|
||||
|
||||
async def check_can_change_room_list(self, room_id: str, user: UserID) -> bool:
|
||||
async def check_can_change_room_list(
|
||||
self, room_id: str, requester: Requester
|
||||
) -> bool:
|
||||
"""Determine whether the user is allowed to edit the room's entry in the
|
||||
published room list.
|
||||
|
||||
Args:
|
||||
room_id
|
||||
user
|
||||
room_id: The room to check.
|
||||
requester: The user making the request, according to the access token.
|
||||
"""
|
||||
|
||||
is_admin = await self.is_server_admin(user)
|
||||
is_admin = await self.is_server_admin(requester)
|
||||
if is_admin:
|
||||
return True
|
||||
|
||||
user_id = user.to_string()
|
||||
await self.check_user_in_room(room_id, user_id)
|
||||
await self.check_user_in_room(room_id, requester)
|
||||
|
||||
# We currently require the user is a "moderator" in the room. We do this
|
||||
# by checking if they would (theoretically) be able to change the
|
||||
@@ -516,7 +502,9 @@ class Auth:
|
||||
send_level = event_auth.get_send_level(
|
||||
EventTypes.CanonicalAlias, "", power_level_event
|
||||
)
|
||||
user_level = event_auth.get_user_power_level(user_id, auth_events)
|
||||
user_level = event_auth.get_user_power_level(
|
||||
requester.user.to_string(), auth_events
|
||||
)
|
||||
|
||||
return user_level >= send_level
|
||||
|
||||
@@ -574,16 +562,16 @@ class Auth:
|
||||
|
||||
@trace
|
||||
async def check_user_in_room_or_world_readable(
|
||||
self, room_id: str, user_id: str, allow_departed_users: bool = False
|
||||
self, room_id: str, requester: Requester, allow_departed_users: bool = False
|
||||
) -> Tuple[str, Optional[str]]:
|
||||
"""Checks that the user is or was in the room or the room is world
|
||||
readable. If it isn't then an exception is raised.
|
||||
|
||||
Args:
|
||||
room_id: room to check
|
||||
user_id: user to check
|
||||
allow_departed_users: if True, accept users that were previously
|
||||
members but have now departed
|
||||
room_id: The room to check.
|
||||
requester: The user making the request, according to the access token.
|
||||
allow_departed_users: If True, accept users that were previously
|
||||
members but have now departed.
|
||||
|
||||
Returns:
|
||||
Resolves to the current membership of the user in the room and the
|
||||
@@ -598,7 +586,7 @@ class Auth:
|
||||
# * The user is a guest user, and has joined the room
|
||||
# else it will throw.
|
||||
return await self.check_user_in_room(
|
||||
room_id, user_id, allow_departed_users=allow_departed_users
|
||||
room_id, requester, allow_departed_users=allow_departed_users
|
||||
)
|
||||
except AuthError:
|
||||
visibility = await self._storage_controllers.state.get_current_state_event(
|
||||
@@ -613,6 +601,6 @@ class Auth:
|
||||
raise UnstableSpecAuthError(
|
||||
403,
|
||||
"User %s not in room %s, and room previews are disabled"
|
||||
% (user_id, room_id),
|
||||
% (requester.user, room_id),
|
||||
errcode=Codes.NOT_JOINED,
|
||||
)
|
||||
|
||||
@@ -44,7 +44,6 @@ from synapse.app._base import (
|
||||
register_start,
|
||||
)
|
||||
from synapse.config._base import ConfigError, format_config_error
|
||||
from synapse.config.emailconfig import ThreepidBehaviour
|
||||
from synapse.config.homeserver import HomeServerConfig
|
||||
from synapse.config.server import ListenerConfig
|
||||
from synapse.federation.transport.server import TransportLayerServer
|
||||
@@ -202,7 +201,7 @@ class SynapseHomeServer(HomeServer):
|
||||
}
|
||||
)
|
||||
|
||||
if self.config.email.threepid_behaviour_email == ThreepidBehaviour.LOCAL:
|
||||
if self.config.email.can_verify_email:
|
||||
from synapse.rest.synapse.client.password_reset import (
|
||||
PasswordResetSubmitTokenResource,
|
||||
)
|
||||
|
||||
@@ -18,7 +18,6 @@
|
||||
import email.utils
|
||||
import logging
|
||||
import os
|
||||
from enum import Enum
|
||||
from typing import Any
|
||||
|
||||
import attr
|
||||
@@ -136,40 +135,22 @@ class EmailConfig(Config):
|
||||
|
||||
self.email_enable_notifs = email_config.get("enable_notifs", False)
|
||||
|
||||
self.threepid_behaviour_email = (
|
||||
# Have Synapse handle the email sending if account_threepid_delegates.email
|
||||
# is not defined
|
||||
# msisdn is currently always remote while Synapse does not support any method of
|
||||
# sending SMS messages
|
||||
ThreepidBehaviour.REMOTE
|
||||
if self.root.registration.account_threepid_delegate_email
|
||||
else ThreepidBehaviour.LOCAL
|
||||
)
|
||||
|
||||
if config.get("trust_identity_server_for_password_resets"):
|
||||
raise ConfigError(
|
||||
'The config option "trust_identity_server_for_password_resets" has been removed.'
|
||||
"Please consult the configuration manual at docs/usage/configuration/config_documentation.md for "
|
||||
"details and update your config file."
|
||||
'The config option "trust_identity_server_for_password_resets" '
|
||||
"is no longer supported. Please remove it from the config file."
|
||||
)
|
||||
|
||||
self.local_threepid_handling_disabled_due_to_email_config = False
|
||||
if (
|
||||
self.threepid_behaviour_email == ThreepidBehaviour.LOCAL
|
||||
and email_config == {}
|
||||
):
|
||||
# We cannot warn the user this has happened here
|
||||
# Instead do so when a user attempts to reset their password
|
||||
self.local_threepid_handling_disabled_due_to_email_config = True
|
||||
|
||||
self.threepid_behaviour_email = ThreepidBehaviour.OFF
|
||||
# If we have email config settings, assume that we can verify ownership of
|
||||
# email addresses.
|
||||
self.can_verify_email = email_config != {}
|
||||
|
||||
# Get lifetime of a validation token in milliseconds
|
||||
self.email_validation_token_lifetime = self.parse_duration(
|
||||
email_config.get("validation_token_lifetime", "1h")
|
||||
)
|
||||
|
||||
if self.threepid_behaviour_email == ThreepidBehaviour.LOCAL:
|
||||
if self.can_verify_email:
|
||||
missing = []
|
||||
if not self.email_notif_from:
|
||||
missing.append("email.notif_from")
|
||||
@@ -360,18 +341,3 @@ class EmailConfig(Config):
|
||||
"Config option email.invite_client_location must be a http or https URL",
|
||||
path=("email", "invite_client_location"),
|
||||
)
|
||||
|
||||
|
||||
class ThreepidBehaviour(Enum):
|
||||
"""
|
||||
Enum to define the behaviour of Synapse with regards to when it contacts an identity
|
||||
server for 3pid registration and password resets
|
||||
|
||||
REMOTE = use an external server to send tokens
|
||||
LOCAL = send tokens ourselves
|
||||
OFF = disable registration via 3pid and password resets
|
||||
"""
|
||||
|
||||
REMOTE = "remote"
|
||||
LOCAL = "local"
|
||||
OFF = "off"
|
||||
|
||||
@@ -13,7 +13,6 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import argparse
|
||||
import logging
|
||||
from typing import Any, Optional
|
||||
|
||||
from synapse.api.constants import RoomCreationPreset
|
||||
@@ -21,15 +20,11 @@ from synapse.config._base import Config, ConfigError
|
||||
from synapse.types import JsonDict, RoomAlias, UserID
|
||||
from synapse.util.stringutils import random_string_with_symbols, strtobool
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
LEGACY_EMAIL_DELEGATE_WARNING = """\
|
||||
Delegation of email verification to an identity server is now deprecated. To
|
||||
NO_EMAIL_DELEGATE_ERROR = """\
|
||||
Delegation of email verification to an identity server is no longer supported. To
|
||||
continue to allow users to add email addresses to their accounts, and use them for
|
||||
password resets, configure Synapse with an SMTP server via the `email` setting, and
|
||||
remove `account_threepid_delegates.email`.
|
||||
|
||||
This will be an error in a future version.
|
||||
"""
|
||||
|
||||
|
||||
@@ -64,9 +59,7 @@ class RegistrationConfig(Config):
|
||||
|
||||
account_threepid_delegates = config.get("account_threepid_delegates") or {}
|
||||
if "email" in account_threepid_delegates:
|
||||
logger.warning(LEGACY_EMAIL_DELEGATE_WARNING)
|
||||
|
||||
self.account_threepid_delegate_email = account_threepid_delegates.get("email")
|
||||
raise ConfigError(NO_EMAIL_DELEGATE_ERROR)
|
||||
self.account_threepid_delegate_msisdn = account_threepid_delegates.get("msisdn")
|
||||
self.default_identity_server = config.get("default_identity_server")
|
||||
self.allow_guest_access = config.get("allow_guest_access", False)
|
||||
|
||||
@@ -28,6 +28,7 @@ from synapse.api.errors import Codes, SynapseError
|
||||
from synapse.api.room_versions import RoomVersion
|
||||
from synapse.events import EventBase
|
||||
from synapse.events.utils import prune_event, prune_event_dict
|
||||
from synapse.logging.opentracing import trace
|
||||
from synapse.types import JsonDict
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -35,6 +36,7 @@ logger = logging.getLogger(__name__)
|
||||
Hasher = Callable[[bytes], "hashlib._Hash"]
|
||||
|
||||
|
||||
@trace
|
||||
def check_event_content_hash(
|
||||
event: EventBase, hash_algorithm: Hasher = hashlib.sha256
|
||||
) -> bool:
|
||||
|
||||
@@ -32,6 +32,7 @@ from typing_extensions import Literal
|
||||
|
||||
import synapse
|
||||
from synapse.api.errors import Codes
|
||||
from synapse.logging.opentracing import trace
|
||||
from synapse.rest.media.v1._base import FileInfo
|
||||
from synapse.rest.media.v1.media_storage import ReadableFileWrapper
|
||||
from synapse.spam_checker_api import RegistrationBehaviour
|
||||
@@ -378,6 +379,7 @@ class SpamChecker:
|
||||
if check_media_file_for_spam is not None:
|
||||
self._check_media_file_for_spam_callbacks.append(check_media_file_for_spam)
|
||||
|
||||
@trace
|
||||
async def check_event_for_spam(
|
||||
self, event: "synapse.events.EventBase"
|
||||
) -> Union[Tuple[Codes, JsonDict], str]:
|
||||
|
||||
@@ -23,6 +23,7 @@ from synapse.crypto.keyring import Keyring
|
||||
from synapse.events import EventBase, make_event_from_dict
|
||||
from synapse.events.utils import prune_event, validate_canonicaljson
|
||||
from synapse.http.servlet import assert_params_in_dict
|
||||
from synapse.logging.opentracing import log_kv, trace
|
||||
from synapse.types import JsonDict, get_domain_from_id
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -55,6 +56,7 @@ class FederationBase:
|
||||
self._clock = hs.get_clock()
|
||||
self._storage_controllers = hs.get_storage_controllers()
|
||||
|
||||
@trace
|
||||
async def _check_sigs_and_hash(
|
||||
self, room_version: RoomVersion, pdu: EventBase
|
||||
) -> EventBase:
|
||||
@@ -97,17 +99,36 @@ class FederationBase:
|
||||
"Event %s seems to have been redacted; using our redacted copy",
|
||||
pdu.event_id,
|
||||
)
|
||||
log_kv(
|
||||
{
|
||||
"message": "Event seems to have been redacted; using our redacted copy",
|
||||
"event_id": pdu.event_id,
|
||||
}
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
"Event %s content has been tampered, redacting",
|
||||
pdu.event_id,
|
||||
)
|
||||
log_kv(
|
||||
{
|
||||
"message": "Event content has been tampered, redacting",
|
||||
"event_id": pdu.event_id,
|
||||
}
|
||||
)
|
||||
return redacted_event
|
||||
|
||||
spam_check = await self.spam_checker.check_event_for_spam(pdu)
|
||||
|
||||
if spam_check != self.spam_checker.NOT_SPAM:
|
||||
logger.warning("Event contains spam, soft-failing %s", pdu.event_id)
|
||||
log_kv(
|
||||
{
|
||||
"message": "Event contains spam, redacting (to save disk space) "
|
||||
"as well as soft-failing (to stop using the event in prev_events)",
|
||||
"event_id": pdu.event_id,
|
||||
}
|
||||
)
|
||||
# we redact (to save disk space) as well as soft-failing (to stop
|
||||
# using the event in prev_events).
|
||||
redacted_event = prune_event(pdu)
|
||||
@@ -117,6 +138,7 @@ class FederationBase:
|
||||
return pdu
|
||||
|
||||
|
||||
@trace
|
||||
async def _check_sigs_on_pdu(
|
||||
keyring: Keyring, room_version: RoomVersion, pdu: EventBase
|
||||
) -> None:
|
||||
|
||||
@@ -61,7 +61,7 @@ from synapse.federation.federation_base import (
|
||||
)
|
||||
from synapse.federation.transport.client import SendJoinResponse
|
||||
from synapse.http.types import QueryParams
|
||||
from synapse.logging.opentracing import SynapseTags, set_tag, tag_args, trace
|
||||
from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, tag_args, trace
|
||||
from synapse.types import JsonDict, UserID, get_domain_from_id
|
||||
from synapse.util.async_helpers import concurrently_execute
|
||||
from synapse.util.caches.expiringcache import ExpiringCache
|
||||
@@ -587,11 +587,15 @@ class FederationClient(FederationBase):
|
||||
Returns:
|
||||
A list of PDUs that have valid signatures and hashes.
|
||||
"""
|
||||
set_tag(
|
||||
SynapseTags.RESULT_PREFIX + "pdus.length",
|
||||
str(len(pdus)),
|
||||
)
|
||||
|
||||
# We limit how many PDUs we check at once, as if we try to do hundreds
|
||||
# of thousands of PDUs at once we see large memory spikes.
|
||||
|
||||
valid_pdus = []
|
||||
valid_pdus: List[EventBase] = []
|
||||
|
||||
async def _execute(pdu: EventBase) -> None:
|
||||
valid_pdu = await self._check_sigs_and_hash_and_fetch_one(
|
||||
@@ -607,6 +611,8 @@ class FederationClient(FederationBase):
|
||||
|
||||
return valid_pdus
|
||||
|
||||
@trace
|
||||
@tag_args
|
||||
async def _check_sigs_and_hash_and_fetch_one(
|
||||
self,
|
||||
pdu: EventBase,
|
||||
@@ -639,16 +645,27 @@ class FederationClient(FederationBase):
|
||||
except InvalidEventSignatureError as e:
|
||||
logger.warning(
|
||||
"Signature on retrieved event %s was invalid (%s). "
|
||||
"Checking local store/orgin server",
|
||||
"Checking local store/origin server",
|
||||
pdu.event_id,
|
||||
e,
|
||||
)
|
||||
log_kv(
|
||||
{
|
||||
"message": "Signature on retrieved event was invalid. "
|
||||
"Checking local store/origin server",
|
||||
"event_id": pdu.event_id,
|
||||
"InvalidEventSignatureError": e,
|
||||
}
|
||||
)
|
||||
|
||||
# Check local db.
|
||||
res = await self.store.get_event(
|
||||
pdu.event_id, allow_rejected=True, allow_none=True
|
||||
)
|
||||
|
||||
# If the PDU fails its signature check and we don't have it in our
|
||||
# database, we then request it from sender's server (if that is not the
|
||||
# same as `origin`).
|
||||
pdu_origin = get_domain_from_id(pdu.sender)
|
||||
if not res and pdu_origin != origin:
|
||||
try:
|
||||
|
||||
@@ -280,7 +280,7 @@ class AuthHandler:
|
||||
that it isn't stolen by re-authenticating them.
|
||||
|
||||
Args:
|
||||
requester: The user, as given by the access token
|
||||
requester: The user making the request, according to the access token.
|
||||
|
||||
request: The request sent by the client.
|
||||
|
||||
@@ -1435,20 +1435,25 @@ class AuthHandler:
|
||||
access_token: access token to be deleted
|
||||
|
||||
"""
|
||||
user_info = await self.auth.get_user_by_access_token(access_token)
|
||||
token = await self.store.get_user_by_access_token(access_token)
|
||||
if not token:
|
||||
# At this point, the token should already have been fetched once by
|
||||
# the caller, so this should not happen, unless of a race condition
|
||||
# between two delete requests
|
||||
raise SynapseError(HTTPStatus.UNAUTHORIZED, "Unrecognised access token")
|
||||
await self.store.delete_access_token(access_token)
|
||||
|
||||
# see if any modules want to know about this
|
||||
await self.password_auth_provider.on_logged_out(
|
||||
user_id=user_info.user_id,
|
||||
device_id=user_info.device_id,
|
||||
user_id=token.user_id,
|
||||
device_id=token.device_id,
|
||||
access_token=access_token,
|
||||
)
|
||||
|
||||
# delete pushers associated with this access token
|
||||
if user_info.token_id is not None:
|
||||
if token.token_id is not None:
|
||||
await self.hs.get_pusherpool().remove_pushers_by_access_token(
|
||||
user_info.user_id, (user_info.token_id,)
|
||||
token.user_id, (token.token_id,)
|
||||
)
|
||||
|
||||
async def delete_access_tokens_for_user(
|
||||
|
||||
@@ -30,7 +30,7 @@ from synapse.api.errors import (
|
||||
from synapse.appservice import ApplicationService
|
||||
from synapse.module_api import NOT_SPAM
|
||||
from synapse.storage.databases.main.directory import RoomAliasMapping
|
||||
from synapse.types import JsonDict, Requester, RoomAlias, UserID, get_domain_from_id
|
||||
from synapse.types import JsonDict, Requester, RoomAlias, get_domain_from_id
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
@@ -133,7 +133,7 @@ class DirectoryHandler:
|
||||
else:
|
||||
# Server admins are not subject to the same constraints as normal
|
||||
# users when creating an alias (e.g. being in the room).
|
||||
is_admin = await self.auth.is_server_admin(requester.user)
|
||||
is_admin = await self.auth.is_server_admin(requester)
|
||||
|
||||
if (self.require_membership and check_membership) and not is_admin:
|
||||
rooms_for_user = await self.store.get_rooms_for_user(user_id)
|
||||
@@ -197,7 +197,7 @@ class DirectoryHandler:
|
||||
user_id = requester.user.to_string()
|
||||
|
||||
try:
|
||||
can_delete = await self._user_can_delete_alias(room_alias, user_id)
|
||||
can_delete = await self._user_can_delete_alias(room_alias, requester)
|
||||
except StoreError as e:
|
||||
if e.code == 404:
|
||||
raise NotFoundError("Unknown room alias")
|
||||
@@ -400,7 +400,9 @@ class DirectoryHandler:
|
||||
# either no interested services, or no service with an exclusive lock
|
||||
return True
|
||||
|
||||
async def _user_can_delete_alias(self, alias: RoomAlias, user_id: str) -> bool:
|
||||
async def _user_can_delete_alias(
|
||||
self, alias: RoomAlias, requester: Requester
|
||||
) -> bool:
|
||||
"""Determine whether a user can delete an alias.
|
||||
|
||||
One of the following must be true:
|
||||
@@ -413,7 +415,7 @@ class DirectoryHandler:
|
||||
"""
|
||||
creator = await self.store.get_room_alias_creator(alias.to_string())
|
||||
|
||||
if creator == user_id:
|
||||
if creator == requester.user.to_string():
|
||||
return True
|
||||
|
||||
# Resolve the alias to the corresponding room.
|
||||
@@ -422,9 +424,7 @@ class DirectoryHandler:
|
||||
if not room_id:
|
||||
return False
|
||||
|
||||
return await self.auth.check_can_change_room_list(
|
||||
room_id, UserID.from_string(user_id)
|
||||
)
|
||||
return await self.auth.check_can_change_room_list(room_id, requester)
|
||||
|
||||
async def edit_published_room_list(
|
||||
self, requester: Requester, room_id: str, visibility: str
|
||||
@@ -463,7 +463,7 @@ class DirectoryHandler:
|
||||
raise SynapseError(400, "Unknown room")
|
||||
|
||||
can_change_room_list = await self.auth.check_can_change_room_list(
|
||||
room_id, requester.user
|
||||
room_id, requester
|
||||
)
|
||||
if not can_change_room_list:
|
||||
raise AuthError(
|
||||
@@ -528,10 +528,8 @@ class DirectoryHandler:
|
||||
Get a list of the aliases that currently point to this room on this server
|
||||
"""
|
||||
# allow access to server admins and current members of the room
|
||||
is_admin = await self.auth.is_server_admin(requester.user)
|
||||
is_admin = await self.auth.is_server_admin(requester)
|
||||
if not is_admin:
|
||||
await self.auth.check_user_in_room_or_world_readable(
|
||||
room_id, requester.user.to_string()
|
||||
)
|
||||
await self.auth.check_user_in_room_or_world_readable(room_id, requester)
|
||||
|
||||
return await self.store.get_aliases_for_room(room_id)
|
||||
|
||||
@@ -86,9 +86,14 @@ backfill_processing_before_timer = Histogram(
|
||||
"sec",
|
||||
[],
|
||||
buckets=(
|
||||
0.1,
|
||||
0.5,
|
||||
1.0,
|
||||
2.5,
|
||||
5.0,
|
||||
7.5,
|
||||
10.0,
|
||||
15.0,
|
||||
20.0,
|
||||
30.0,
|
||||
40.0,
|
||||
@@ -482,7 +487,7 @@ class FederationHandler:
|
||||
|
||||
processing_end_time = self.clock.time_msec()
|
||||
backfill_processing_before_timer.observe(
|
||||
(processing_start_time - processing_end_time) / 1000
|
||||
(processing_end_time - processing_start_time) / 1000
|
||||
)
|
||||
|
||||
success = await try_backfill(likely_domains)
|
||||
@@ -851,13 +856,20 @@ class FederationHandler:
|
||||
# Note that this requires the /send_join request to come back to the
|
||||
# same server.
|
||||
if room_version.msc3083_join_rules:
|
||||
state_ids = await self._state_storage_controller.get_current_state_ids(
|
||||
room_id
|
||||
partial_state_ids = (
|
||||
await self._state_storage_controller.get_current_state_ids(
|
||||
room_id,
|
||||
state_filter=StateFilter.from_types(
|
||||
[(EventTypes.JoinRules, ""), (EventTypes.Member, user_id)]
|
||||
),
|
||||
)
|
||||
)
|
||||
if await self._event_auth_handler.has_restricted_join_rules(
|
||||
state_ids, room_version
|
||||
partial_state_ids, room_version
|
||||
):
|
||||
prev_member_event_id = state_ids.get((EventTypes.Member, user_id), None)
|
||||
prev_member_event_id = partial_state_ids.get(
|
||||
(EventTypes.Member, user_id), None
|
||||
)
|
||||
# If the user is invited or joined to the room already, then
|
||||
# no additional info is needed.
|
||||
include_auth_user_id = True
|
||||
@@ -869,6 +881,12 @@ class FederationHandler:
|
||||
)
|
||||
|
||||
if include_auth_user_id:
|
||||
state_ids = (
|
||||
await self._state_storage_controller.get_current_state_ids(
|
||||
room_id,
|
||||
)
|
||||
)
|
||||
|
||||
event_content[
|
||||
EventContentFields.AUTHORISING_USER
|
||||
] = await self._event_auth_handler.get_user_which_could_invite(
|
||||
|
||||
@@ -104,15 +104,25 @@ backfill_processing_after_timer = Histogram(
|
||||
"sec",
|
||||
[],
|
||||
buckets=(
|
||||
0.1,
|
||||
0.25,
|
||||
0.5,
|
||||
1.0,
|
||||
2.5,
|
||||
5.0,
|
||||
7.5,
|
||||
10.0,
|
||||
15.0,
|
||||
20.0,
|
||||
25.0,
|
||||
30.0,
|
||||
40.0,
|
||||
50.0,
|
||||
60.0,
|
||||
80.0,
|
||||
100.0,
|
||||
120.0,
|
||||
150.0,
|
||||
180.0,
|
||||
"+Inf",
|
||||
),
|
||||
|
||||
@@ -26,7 +26,6 @@ from synapse.api.errors import (
|
||||
SynapseError,
|
||||
)
|
||||
from synapse.api.ratelimiting import Ratelimiter
|
||||
from synapse.config.emailconfig import ThreepidBehaviour
|
||||
from synapse.http import RequestTimedOutError
|
||||
from synapse.http.client import SimpleHttpClient
|
||||
from synapse.http.site import SynapseRequest
|
||||
@@ -416,48 +415,6 @@ class IdentityHandler:
|
||||
|
||||
return session_id
|
||||
|
||||
async def request_email_token(
|
||||
self,
|
||||
id_server: str,
|
||||
email: str,
|
||||
client_secret: str,
|
||||
send_attempt: int,
|
||||
next_link: Optional[str] = None,
|
||||
) -> JsonDict:
|
||||
"""
|
||||
Request an external server send an email on our behalf for the purposes of threepid
|
||||
validation.
|
||||
|
||||
Args:
|
||||
id_server: The identity server to proxy to
|
||||
email: The email to send the message to
|
||||
client_secret: The unique client_secret sends by the user
|
||||
send_attempt: Which attempt this is
|
||||
next_link: A link to redirect the user to once they submit the token
|
||||
|
||||
Returns:
|
||||
The json response body from the server
|
||||
"""
|
||||
params = {
|
||||
"email": email,
|
||||
"client_secret": client_secret,
|
||||
"send_attempt": send_attempt,
|
||||
}
|
||||
if next_link:
|
||||
params["next_link"] = next_link
|
||||
|
||||
try:
|
||||
data = await self.http_client.post_json_get_json(
|
||||
id_server + "/_matrix/identity/api/v1/validate/email/requestToken",
|
||||
params,
|
||||
)
|
||||
return data
|
||||
except HttpResponseException as e:
|
||||
logger.info("Proxied requestToken failed: %r", e)
|
||||
raise e.to_synapse_error()
|
||||
except RequestTimedOutError:
|
||||
raise SynapseError(500, "Timed out contacting identity server")
|
||||
|
||||
async def requestMsisdnToken(
|
||||
self,
|
||||
id_server: str,
|
||||
@@ -531,18 +488,7 @@ class IdentityHandler:
|
||||
validation_session = None
|
||||
|
||||
# Try to validate as email
|
||||
if self.hs.config.email.threepid_behaviour_email == ThreepidBehaviour.REMOTE:
|
||||
# Remote emails will only be used if a valid identity server is provided.
|
||||
assert (
|
||||
self.hs.config.registration.account_threepid_delegate_email is not None
|
||||
)
|
||||
|
||||
# Ask our delegated email identity server
|
||||
validation_session = await self.threepid_from_creds(
|
||||
self.hs.config.registration.account_threepid_delegate_email,
|
||||
threepid_creds,
|
||||
)
|
||||
elif self.hs.config.email.threepid_behaviour_email == ThreepidBehaviour.LOCAL:
|
||||
if self.hs.config.email.can_verify_email:
|
||||
# Get a validated session matching these details
|
||||
validation_session = await self.store.get_threepid_validation_session(
|
||||
"email", client_secret, sid=sid, validated=True
|
||||
|
||||
@@ -309,18 +309,18 @@ class InitialSyncHandler:
|
||||
if blocked:
|
||||
raise SynapseError(403, "This room has been blocked on this server")
|
||||
|
||||
user_id = requester.user.to_string()
|
||||
|
||||
(
|
||||
membership,
|
||||
member_event_id,
|
||||
) = await self.auth.check_user_in_room_or_world_readable(
|
||||
room_id,
|
||||
user_id,
|
||||
requester,
|
||||
allow_departed_users=True,
|
||||
)
|
||||
is_peeking = member_event_id is None
|
||||
|
||||
user_id = requester.user.to_string()
|
||||
|
||||
if membership == Membership.JOIN:
|
||||
result = await self._room_initial_sync_joined(
|
||||
user_id, room_id, pagin_config, membership, is_peeking
|
||||
|
||||
+12
-11
@@ -104,7 +104,7 @@ class MessageHandler:
|
||||
|
||||
async def get_room_data(
|
||||
self,
|
||||
user_id: str,
|
||||
requester: Requester,
|
||||
room_id: str,
|
||||
event_type: str,
|
||||
state_key: str,
|
||||
@@ -112,7 +112,7 @@ class MessageHandler:
|
||||
"""Get data from a room.
|
||||
|
||||
Args:
|
||||
user_id
|
||||
requester: The user who did the request.
|
||||
room_id
|
||||
event_type
|
||||
state_key
|
||||
@@ -125,7 +125,7 @@ class MessageHandler:
|
||||
membership,
|
||||
membership_event_id,
|
||||
) = await self.auth.check_user_in_room_or_world_readable(
|
||||
room_id, user_id, allow_departed_users=True
|
||||
room_id, requester, allow_departed_users=True
|
||||
)
|
||||
|
||||
if membership == Membership.JOIN:
|
||||
@@ -161,11 +161,10 @@ class MessageHandler:
|
||||
|
||||
async def get_state_events(
|
||||
self,
|
||||
user_id: str,
|
||||
requester: Requester,
|
||||
room_id: str,
|
||||
state_filter: Optional[StateFilter] = None,
|
||||
at_token: Optional[StreamToken] = None,
|
||||
is_guest: bool = False,
|
||||
) -> List[dict]:
|
||||
"""Retrieve all state events for a given room. If the user is
|
||||
joined to the room then return the current state. If the user has
|
||||
@@ -174,14 +173,13 @@ class MessageHandler:
|
||||
visible.
|
||||
|
||||
Args:
|
||||
user_id: The user requesting state events.
|
||||
requester: The user requesting state events.
|
||||
room_id: The room ID to get all state events from.
|
||||
state_filter: The state filter used to fetch state from the database.
|
||||
at_token: the stream token of the at which we are requesting
|
||||
the stats. If the user is not allowed to view the state as of that
|
||||
stream token, we raise a 403 SynapseError. If None, returns the current
|
||||
state based on the current_state_events table.
|
||||
is_guest: whether this user is a guest
|
||||
Returns:
|
||||
A list of dicts representing state events. [{}, {}, {}]
|
||||
Raises:
|
||||
@@ -191,6 +189,7 @@ class MessageHandler:
|
||||
members of this room.
|
||||
"""
|
||||
state_filter = state_filter or StateFilter.all()
|
||||
user_id = requester.user.to_string()
|
||||
|
||||
if at_token:
|
||||
last_event_id = (
|
||||
@@ -223,7 +222,7 @@ class MessageHandler:
|
||||
membership,
|
||||
membership_event_id,
|
||||
) = await self.auth.check_user_in_room_or_world_readable(
|
||||
room_id, user_id, allow_departed_users=True
|
||||
room_id, requester, allow_departed_users=True
|
||||
)
|
||||
|
||||
if membership == Membership.JOIN:
|
||||
@@ -317,12 +316,11 @@ class MessageHandler:
|
||||
Returns:
|
||||
A dict of user_id to profile info
|
||||
"""
|
||||
user_id = requester.user.to_string()
|
||||
if not requester.app_service:
|
||||
# We check AS auth after fetching the room membership, as it
|
||||
# requires us to pull out all joined members anyway.
|
||||
membership, _ = await self.auth.check_user_in_room_or_world_readable(
|
||||
room_id, user_id, allow_departed_users=True
|
||||
room_id, requester, allow_departed_users=True
|
||||
)
|
||||
if membership != Membership.JOIN:
|
||||
raise SynapseError(
|
||||
@@ -340,7 +338,10 @@ class MessageHandler:
|
||||
# If this is an AS, double check that they are allowed to see the members.
|
||||
# This can either be because the AS user is in the room or because there
|
||||
# is a user in the room that the AS is "interested in"
|
||||
if requester.app_service and user_id not in users_with_profile:
|
||||
if (
|
||||
requester.app_service
|
||||
and requester.user.to_string() not in users_with_profile
|
||||
):
|
||||
for uid in users_with_profile:
|
||||
if requester.app_service.is_interested_in_user(uid):
|
||||
break
|
||||
|
||||
@@ -464,7 +464,7 @@ class PaginationHandler:
|
||||
membership,
|
||||
member_event_id,
|
||||
) = await self.auth.check_user_in_room_or_world_readable(
|
||||
room_id, user_id, allow_departed_users=True
|
||||
room_id, requester, allow_departed_users=True
|
||||
)
|
||||
|
||||
if pagin_config.direction == "b":
|
||||
|
||||
@@ -29,7 +29,13 @@ from synapse.api.constants import (
|
||||
JoinRules,
|
||||
LoginType,
|
||||
)
|
||||
from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError
|
||||
from synapse.api.errors import (
|
||||
AuthError,
|
||||
Codes,
|
||||
ConsentNotGivenError,
|
||||
InvalidClientTokenError,
|
||||
SynapseError,
|
||||
)
|
||||
from synapse.appservice import ApplicationService
|
||||
from synapse.config.server import is_threepid_reserved
|
||||
from synapse.http.servlet import assert_params_in_dict
|
||||
@@ -180,10 +186,7 @@ class RegistrationHandler:
|
||||
)
|
||||
if guest_access_token:
|
||||
user_data = await self.auth.get_user_by_access_token(guest_access_token)
|
||||
if (
|
||||
not user_data.is_guest
|
||||
or UserID.from_string(user_data.user_id).localpart != localpart
|
||||
):
|
||||
if not user_data.is_guest or user_data.user.localpart != localpart:
|
||||
raise AuthError(
|
||||
403,
|
||||
"Cannot register taken user ID without valid guest "
|
||||
@@ -618,7 +621,7 @@ class RegistrationHandler:
|
||||
user_id = user.to_string()
|
||||
service = self.store.get_app_service_by_token(as_token)
|
||||
if not service:
|
||||
raise AuthError(403, "Invalid application service token.")
|
||||
raise InvalidClientTokenError()
|
||||
if not service.is_interested_in_user(user_id):
|
||||
raise SynapseError(
|
||||
400,
|
||||
|
||||
@@ -103,7 +103,7 @@ class RelationsHandler:
|
||||
|
||||
# TODO Properly handle a user leaving a room.
|
||||
(_, member_event_id) = await self._auth.check_user_in_room_or_world_readable(
|
||||
room_id, user_id, allow_departed_users=True
|
||||
room_id, requester, allow_departed_users=True
|
||||
)
|
||||
|
||||
# This gets the original event and checks that a) the event exists and
|
||||
|
||||
@@ -721,7 +721,7 @@ class RoomCreationHandler:
|
||||
# allow the server notices mxid to create rooms
|
||||
is_requester_admin = True
|
||||
else:
|
||||
is_requester_admin = await self.auth.is_server_admin(requester.user)
|
||||
is_requester_admin = await self.auth.is_server_admin(requester)
|
||||
|
||||
# Let the third party rules modify the room creation config if needed, or abort
|
||||
# the room creation entirely with an exception.
|
||||
@@ -1279,7 +1279,7 @@ class RoomContextHandler:
|
||||
"""
|
||||
user = requester.user
|
||||
if use_admin_priviledge:
|
||||
await assert_user_is_admin(self.auth, requester.user)
|
||||
await assert_user_is_admin(self.auth, requester)
|
||||
|
||||
before_limit = math.floor(limit / 2.0)
|
||||
after_limit = limit - before_limit
|
||||
|
||||
@@ -16,7 +16,7 @@ import abc
|
||||
import logging
|
||||
import random
|
||||
from http import HTTPStatus
|
||||
from typing import TYPE_CHECKING, Iterable, List, Optional, Set, Tuple
|
||||
from typing import TYPE_CHECKING, Collection, Iterable, List, Optional, Set, Tuple
|
||||
|
||||
from synapse import types
|
||||
from synapse.api.constants import (
|
||||
@@ -179,7 +179,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
"""Try and join a room that this server is not in
|
||||
|
||||
Args:
|
||||
requester
|
||||
requester: The user making the request, according to the access token.
|
||||
remote_room_hosts: List of servers that can be used to join via.
|
||||
room_id: Room that we are trying to join
|
||||
user: User who is trying to join
|
||||
@@ -410,11 +410,13 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
historical=historical,
|
||||
)
|
||||
|
||||
prev_state_ids = await context.get_prev_state_ids(
|
||||
StateFilter.from_types([(EventTypes.Member, None)])
|
||||
prev_member_event_ids = await context.get_prev_state_ids(
|
||||
StateFilter.from_types([(EventTypes.Member, user_id)])
|
||||
)
|
||||
|
||||
prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)
|
||||
prev_member_event_id = prev_member_event_ids.get(
|
||||
(EventTypes.Member, user_id), None
|
||||
)
|
||||
|
||||
if event.membership == Membership.JOIN:
|
||||
newly_joined = True
|
||||
@@ -689,7 +691,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
errcode=Codes.BAD_JSON,
|
||||
)
|
||||
|
||||
if "avatar_url" in content:
|
||||
if "avatar_url" in content and content.get("avatar_url") is not None:
|
||||
if not await self.profile_handler.check_avatar_size_and_mime_type(
|
||||
content["avatar_url"],
|
||||
):
|
||||
@@ -744,7 +746,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
is_requester_admin = True
|
||||
|
||||
else:
|
||||
is_requester_admin = await self.auth.is_server_admin(requester.user)
|
||||
is_requester_admin = await self.auth.is_server_admin(requester)
|
||||
|
||||
if not is_requester_admin:
|
||||
if self.config.server.block_non_admin_invites:
|
||||
@@ -790,14 +792,20 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
|
||||
latest_event_ids = await self.store.get_prev_events_for_room(room_id)
|
||||
|
||||
state_before_join = await self.state_handler.compute_state_after_events(
|
||||
room_id, latest_event_ids
|
||||
old_membership_state_ids = await self.state_handler.compute_state_after_events(
|
||||
room_id,
|
||||
event_ids=latest_event_ids,
|
||||
state_filter=StateFilter.from_types(
|
||||
[(EventTypes.Member, target.to_string())]
|
||||
),
|
||||
)
|
||||
|
||||
# TODO: Refactor into dictionary of explicitly allowed transitions
|
||||
# between old and new state, with specific error messages for some
|
||||
# transitions and generic otherwise
|
||||
old_state_id = state_before_join.get((EventTypes.Member, target.to_string()))
|
||||
old_state_id = old_membership_state_ids.get(
|
||||
(EventTypes.Member, target.to_string())
|
||||
)
|
||||
if old_state_id:
|
||||
old_state = await self.store.get_event(old_state_id, allow_none=True)
|
||||
old_membership = old_state.content.get("membership") if old_state else None
|
||||
@@ -848,11 +856,16 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
if action == "kick":
|
||||
raise AuthError(403, "The target user is not in the room")
|
||||
|
||||
is_host_in_room = await self._is_host_in_room(state_before_join)
|
||||
is_host_in_room = await self.store.is_host_joined(room_id, self._server_name)
|
||||
|
||||
if effective_membership_state == Membership.JOIN:
|
||||
if requester.is_guest:
|
||||
guest_can_join = await self._can_guest_join(state_before_join)
|
||||
guest_access_ids = await self.state_handler.compute_state_after_events(
|
||||
room_id,
|
||||
event_ids=latest_event_ids,
|
||||
state_filter=StateFilter.from_types([(EventTypes.GuestAccess, "")]),
|
||||
)
|
||||
guest_can_join = await self._can_guest_join(guest_access_ids)
|
||||
if not guest_can_join:
|
||||
# This should be an auth check, but guests are a local concept,
|
||||
# so don't really fit into the general auth process.
|
||||
@@ -868,7 +881,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
bypass_spam_checker = True
|
||||
|
||||
else:
|
||||
bypass_spam_checker = await self.auth.is_server_admin(requester.user)
|
||||
bypass_spam_checker = await self.auth.is_server_admin(requester)
|
||||
|
||||
inviter = await self._get_inviter(target.to_string(), room_id)
|
||||
if (
|
||||
@@ -895,7 +908,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
remote_room_hosts,
|
||||
content,
|
||||
is_host_in_room,
|
||||
state_before_join,
|
||||
latest_event_ids,
|
||||
)
|
||||
if remote_join:
|
||||
if ratelimit:
|
||||
@@ -1040,7 +1053,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
remote_room_hosts: List[str],
|
||||
content: JsonDict,
|
||||
is_host_in_room: bool,
|
||||
state_before_join: StateMap[str],
|
||||
latest_event_ids: Collection[str],
|
||||
) -> Tuple[bool, List[str]]:
|
||||
"""
|
||||
Check whether the server should do a remote join (as opposed to a local
|
||||
@@ -1060,8 +1073,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
content: The content to use as the event body of the join. This may
|
||||
be modified.
|
||||
is_host_in_room: True if the host is in the room.
|
||||
state_before_join: The state before the join event (i.e. the resolution of
|
||||
the states after its parent events).
|
||||
latest_event_ids: The parent events of the join event.
|
||||
|
||||
Returns:
|
||||
A tuple of:
|
||||
@@ -1079,16 +1091,26 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
# for restricted join rules, a remote join must be used.
|
||||
room_version = await self.store.get_room_version(room_id)
|
||||
|
||||
# Only fetch the state that we need to check if we need to worry about
|
||||
# restricted join rules.
|
||||
partial_state_ids = await self.state_handler.compute_state_after_events(
|
||||
room_id,
|
||||
latest_event_ids,
|
||||
StateFilter.from_types(
|
||||
[(EventTypes.JoinRules, ""), (EventTypes.Member, user_id)]
|
||||
),
|
||||
)
|
||||
|
||||
# If restricted join rules are not being used, a local join can always
|
||||
# be used.
|
||||
if not await self.event_auth_handler.has_restricted_join_rules(
|
||||
state_before_join, room_version
|
||||
partial_state_ids, room_version
|
||||
):
|
||||
return False, []
|
||||
|
||||
# If the user is invited to the room or already joined, the join
|
||||
# event can always be issued locally.
|
||||
prev_member_event_id = state_before_join.get((EventTypes.Member, user_id), None)
|
||||
prev_member_event_id = partial_state_ids.get((EventTypes.Member, user_id), None)
|
||||
prev_member_event = None
|
||||
if prev_member_event_id:
|
||||
prev_member_event = await self.store.get_event(prev_member_event_id)
|
||||
@@ -1098,15 +1120,22 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
):
|
||||
return False, []
|
||||
|
||||
# Now we need to inspect the full membership, so pull that from the DB.
|
||||
members_before_join = await self.state_handler.compute_state_after_events(
|
||||
room_id,
|
||||
latest_event_ids,
|
||||
StateFilter.from_types([(EventTypes.Member, None)]),
|
||||
)
|
||||
|
||||
# If the local host has a user who can issue invites, then a local
|
||||
# join can be done.
|
||||
#
|
||||
# If not, generate a new list of remote hosts based on which
|
||||
# can issue invites.
|
||||
event_map = await self.store.get_events(state_before_join.values())
|
||||
event_map = await self.store.get_events(members_before_join.values())
|
||||
current_state = {
|
||||
state_key: event_map[event_id]
|
||||
for state_key, event_id in state_before_join.items()
|
||||
for state_key, event_id in members_before_join.items()
|
||||
}
|
||||
allowed_servers = get_servers_from_users(
|
||||
get_users_which_can_issue_invite(current_state)
|
||||
@@ -1120,7 +1149,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
|
||||
# Ensure the member should be allowed access via membership in a room.
|
||||
await self.event_auth_handler.check_restricted_join_rules(
|
||||
state_before_join, room_version, user_id, prev_member_event
|
||||
members_before_join, room_version, user_id, prev_member_event
|
||||
)
|
||||
|
||||
# If this is going to be a local join, additional information must
|
||||
@@ -1130,7 +1159,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
EventContentFields.AUTHORISING_USER
|
||||
] = await self.event_auth_handler.get_user_which_could_invite(
|
||||
room_id,
|
||||
state_before_join,
|
||||
members_before_join,
|
||||
)
|
||||
|
||||
return False, []
|
||||
@@ -1236,7 +1265,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
requester = types.create_requester(target_user)
|
||||
|
||||
prev_state_ids = await context.get_prev_state_ids(
|
||||
StateFilter.from_types([(EventTypes.GuestAccess, None)])
|
||||
StateFilter.from_types(
|
||||
[(EventTypes.GuestAccess, None), (EventTypes.Member, event.state_key)]
|
||||
)
|
||||
)
|
||||
if event.membership == Membership.JOIN:
|
||||
if requester.is_guest:
|
||||
@@ -1410,7 +1441,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
ShadowBanError if the requester has been shadow-banned.
|
||||
"""
|
||||
if self.config.server.block_non_admin_invites:
|
||||
is_requester_admin = await self.auth.is_server_admin(requester.user)
|
||||
is_requester_admin = await self.auth.is_server_admin(requester)
|
||||
if not is_requester_admin:
|
||||
raise SynapseError(
|
||||
403, "Invites have been disabled on this server", Codes.FORBIDDEN
|
||||
@@ -1693,7 +1724,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
|
||||
check_complexity
|
||||
and self.hs.config.server.limit_remote_rooms.admins_can_join
|
||||
):
|
||||
check_complexity = not await self.auth.is_server_admin(user)
|
||||
check_complexity = not await self.store.is_server_admin(user)
|
||||
|
||||
if check_complexity:
|
||||
# Fetch the room complexity
|
||||
|
||||
@@ -2421,10 +2421,10 @@ class SyncHandler:
|
||||
joined_room.room_id, joined_room.event_pos.stream
|
||||
)
|
||||
)
|
||||
users_in_room = await self.state.get_current_users_in_room(
|
||||
user_ids_in_room = await self.state.get_current_user_ids_in_room(
|
||||
joined_room.room_id, extrems
|
||||
)
|
||||
if user_id in users_in_room:
|
||||
if user_id in user_ids_in_room:
|
||||
joined_room_ids.add(joined_room.room_id)
|
||||
|
||||
return frozenset(joined_room_ids)
|
||||
|
||||
@@ -253,12 +253,11 @@ class TypingWriterHandler(FollowerTypingHandler):
|
||||
self, target_user: UserID, requester: Requester, room_id: str, timeout: int
|
||||
) -> None:
|
||||
target_user_id = target_user.to_string()
|
||||
auth_user_id = requester.user.to_string()
|
||||
|
||||
if not self.is_mine_id(target_user_id):
|
||||
raise SynapseError(400, "User is not hosted on this homeserver")
|
||||
|
||||
if target_user_id != auth_user_id:
|
||||
if target_user != requester.user:
|
||||
raise AuthError(400, "Cannot set another user's typing state")
|
||||
|
||||
if requester.shadow_banned:
|
||||
@@ -266,7 +265,7 @@ class TypingWriterHandler(FollowerTypingHandler):
|
||||
await self.clock.sleep(random.randint(1, 10))
|
||||
raise ShadowBanError()
|
||||
|
||||
await self.auth.check_user_in_room(room_id, target_user_id)
|
||||
await self.auth.check_user_in_room(room_id, requester)
|
||||
|
||||
logger.debug("%s has started typing in %s", target_user_id, room_id)
|
||||
|
||||
@@ -289,12 +288,11 @@ class TypingWriterHandler(FollowerTypingHandler):
|
||||
self, target_user: UserID, requester: Requester, room_id: str
|
||||
) -> None:
|
||||
target_user_id = target_user.to_string()
|
||||
auth_user_id = requester.user.to_string()
|
||||
|
||||
if not self.is_mine_id(target_user_id):
|
||||
raise SynapseError(400, "User is not hosted on this homeserver")
|
||||
|
||||
if target_user_id != auth_user_id:
|
||||
if target_user != requester.user:
|
||||
raise AuthError(400, "Cannot set another user's typing state")
|
||||
|
||||
if requester.shadow_banned:
|
||||
@@ -302,7 +300,7 @@ class TypingWriterHandler(FollowerTypingHandler):
|
||||
await self.clock.sleep(random.randint(1, 10))
|
||||
raise ShadowBanError()
|
||||
|
||||
await self.auth.check_user_in_room(room_id, target_user_id)
|
||||
await self.auth.check_user_in_room(room_id, requester)
|
||||
|
||||
logger.debug("%s has stopped typing in %s", target_user_id, room_id)
|
||||
|
||||
|
||||
@@ -19,7 +19,6 @@ from twisted.web.client import PartialDownloadError
|
||||
|
||||
from synapse.api.constants import LoginType
|
||||
from synapse.api.errors import Codes, LoginError, SynapseError
|
||||
from synapse.config.emailconfig import ThreepidBehaviour
|
||||
from synapse.util import json_decoder
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -153,7 +152,7 @@ class _BaseThreepidAuthChecker:
|
||||
|
||||
logger.info("Getting validated threepid. threepidcreds: %r", (threepid_creds,))
|
||||
|
||||
# msisdns are currently always ThreepidBehaviour.REMOTE
|
||||
# msisdns are currently always verified via the IS
|
||||
if medium == "msisdn":
|
||||
if not self.hs.config.registration.account_threepid_delegate_msisdn:
|
||||
raise SynapseError(
|
||||
@@ -164,18 +163,7 @@ class _BaseThreepidAuthChecker:
|
||||
threepid_creds,
|
||||
)
|
||||
elif medium == "email":
|
||||
if (
|
||||
self.hs.config.email.threepid_behaviour_email
|
||||
== ThreepidBehaviour.REMOTE
|
||||
):
|
||||
assert self.hs.config.registration.account_threepid_delegate_email
|
||||
threepid = await identity_handler.threepid_from_creds(
|
||||
self.hs.config.registration.account_threepid_delegate_email,
|
||||
threepid_creds,
|
||||
)
|
||||
elif (
|
||||
self.hs.config.email.threepid_behaviour_email == ThreepidBehaviour.LOCAL
|
||||
):
|
||||
if self.hs.config.email.can_verify_email:
|
||||
threepid = None
|
||||
row = await self.store.get_threepid_validation_session(
|
||||
medium,
|
||||
@@ -227,10 +215,7 @@ class EmailIdentityAuthChecker(UserInteractiveAuthChecker, _BaseThreepidAuthChec
|
||||
_BaseThreepidAuthChecker.__init__(self, hs)
|
||||
|
||||
def is_enabled(self) -> bool:
|
||||
return self.hs.config.email.threepid_behaviour_email in (
|
||||
ThreepidBehaviour.REMOTE,
|
||||
ThreepidBehaviour.LOCAL,
|
||||
)
|
||||
return self.hs.config.email.can_verify_email
|
||||
|
||||
async def check_auth(self, authdict: dict, clientip: str) -> Any:
|
||||
return await self._check_threepid("email", authdict)
|
||||
|
||||
@@ -226,7 +226,7 @@ class SynapseRequest(Request):
|
||||
|
||||
# If this is a request where the target user doesn't match the user who
|
||||
# authenticated (e.g. and admin is puppetting a user) then we return both.
|
||||
if self._requester.user.to_string() != authenticated_entity:
|
||||
if requester != authenticated_entity:
|
||||
return requester, authenticated_entity
|
||||
|
||||
return requester, None
|
||||
|
||||
@@ -31,6 +31,5 @@ class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore):
|
||||
self._push_rules_stream_id_gen.advance(instance_name, token)
|
||||
for row in rows:
|
||||
self.get_push_rules_for_user.invalidate((row.user_id,))
|
||||
self.get_push_rules_enabled_for_user.invalidate((row.user_id,))
|
||||
self.push_rules_stream_cache.entity_has_changed(row.user_id, token)
|
||||
return super().process_replication_rows(stream_name, instance_name, token, rows)
|
||||
|
||||
@@ -19,7 +19,7 @@ from typing import Iterable, Pattern
|
||||
from synapse.api.auth import Auth
|
||||
from synapse.api.errors import AuthError
|
||||
from synapse.http.site import SynapseRequest
|
||||
from synapse.types import UserID
|
||||
from synapse.types import Requester
|
||||
|
||||
|
||||
def admin_patterns(path_regex: str, version: str = "v1") -> Iterable[Pattern]:
|
||||
@@ -48,19 +48,19 @@ async def assert_requester_is_admin(auth: Auth, request: SynapseRequest) -> None
|
||||
AuthError if the requester is not a server admin
|
||||
"""
|
||||
requester = await auth.get_user_by_req(request)
|
||||
await assert_user_is_admin(auth, requester.user)
|
||||
await assert_user_is_admin(auth, requester)
|
||||
|
||||
|
||||
async def assert_user_is_admin(auth: Auth, user_id: UserID) -> None:
|
||||
async def assert_user_is_admin(auth: Auth, requester: Requester) -> None:
|
||||
"""Verify that the given user is an admin user
|
||||
|
||||
Args:
|
||||
auth: Auth singleton
|
||||
user_id: user to check
|
||||
requester: The user making the request, according to the access token.
|
||||
|
||||
Raises:
|
||||
AuthError if the user is not a server admin
|
||||
"""
|
||||
is_admin = await auth.is_server_admin(user_id)
|
||||
is_admin = await auth.is_server_admin(requester)
|
||||
if not is_admin:
|
||||
raise AuthError(HTTPStatus.FORBIDDEN, "You are not a server admin")
|
||||
|
||||
@@ -54,7 +54,7 @@ class QuarantineMediaInRoom(RestServlet):
|
||||
self, request: SynapseRequest, room_id: str
|
||||
) -> Tuple[int, JsonDict]:
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
await assert_user_is_admin(self.auth, requester.user)
|
||||
await assert_user_is_admin(self.auth, requester)
|
||||
|
||||
logging.info("Quarantining room: %s", room_id)
|
||||
|
||||
@@ -81,7 +81,7 @@ class QuarantineMediaByUser(RestServlet):
|
||||
self, request: SynapseRequest, user_id: str
|
||||
) -> Tuple[int, JsonDict]:
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
await assert_user_is_admin(self.auth, requester.user)
|
||||
await assert_user_is_admin(self.auth, requester)
|
||||
|
||||
logging.info("Quarantining media by user: %s", user_id)
|
||||
|
||||
@@ -110,7 +110,7 @@ class QuarantineMediaByID(RestServlet):
|
||||
self, request: SynapseRequest, server_name: str, media_id: str
|
||||
) -> Tuple[int, JsonDict]:
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
await assert_user_is_admin(self.auth, requester.user)
|
||||
await assert_user_is_admin(self.auth, requester)
|
||||
|
||||
logging.info("Quarantining media by ID: %s/%s", server_name, media_id)
|
||||
|
||||
|
||||
@@ -75,7 +75,7 @@ class RoomRestV2Servlet(RestServlet):
|
||||
) -> Tuple[int, JsonDict]:
|
||||
|
||||
requester = await self._auth.get_user_by_req(request)
|
||||
await assert_user_is_admin(self._auth, requester.user)
|
||||
await assert_user_is_admin(self._auth, requester)
|
||||
|
||||
content = parse_json_object_from_request(request)
|
||||
|
||||
@@ -327,7 +327,7 @@ class RoomRestServlet(RestServlet):
|
||||
pagination_handler: "PaginationHandler",
|
||||
) -> Tuple[int, JsonDict]:
|
||||
requester = await auth.get_user_by_req(request)
|
||||
await assert_user_is_admin(auth, requester.user)
|
||||
await assert_user_is_admin(auth, requester)
|
||||
|
||||
content = parse_json_object_from_request(request)
|
||||
|
||||
@@ -461,7 +461,7 @@ class JoinRoomAliasServlet(ResolveRoomIdMixin, RestServlet):
|
||||
assert request.args is not None
|
||||
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
await assert_user_is_admin(self.auth, requester.user)
|
||||
await assert_user_is_admin(self.auth, requester)
|
||||
|
||||
content = parse_json_object_from_request(request)
|
||||
|
||||
@@ -551,7 +551,7 @@ class MakeRoomAdminRestServlet(ResolveRoomIdMixin, RestServlet):
|
||||
self, request: SynapseRequest, room_identifier: str
|
||||
) -> Tuple[int, JsonDict]:
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
await assert_user_is_admin(self.auth, requester.user)
|
||||
await assert_user_is_admin(self.auth, requester)
|
||||
content = parse_json_object_from_request(request, allow_empty_body=True)
|
||||
|
||||
room_id, _ = await self.resolve_room_id(room_identifier)
|
||||
@@ -742,7 +742,7 @@ class RoomEventContextServlet(RestServlet):
|
||||
self, request: SynapseRequest, room_id: str, event_id: str
|
||||
) -> Tuple[int, JsonDict]:
|
||||
requester = await self.auth.get_user_by_req(request, allow_guest=False)
|
||||
await assert_user_is_admin(self.auth, requester.user)
|
||||
await assert_user_is_admin(self.auth, requester)
|
||||
|
||||
limit = parse_integer(request, "limit", default=10)
|
||||
|
||||
@@ -834,7 +834,7 @@ class BlockRoomRestServlet(RestServlet):
|
||||
self, request: SynapseRequest, room_id: str
|
||||
) -> Tuple[int, JsonDict]:
|
||||
requester = await self._auth.get_user_by_req(request)
|
||||
await assert_user_is_admin(self._auth, requester.user)
|
||||
await assert_user_is_admin(self._auth, requester)
|
||||
|
||||
content = parse_json_object_from_request(request)
|
||||
|
||||
|
||||
@@ -183,7 +183,7 @@ class UserRestServletV2(RestServlet):
|
||||
self, request: SynapseRequest, user_id: str
|
||||
) -> Tuple[int, JsonDict]:
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
await assert_user_is_admin(self.auth, requester.user)
|
||||
await assert_user_is_admin(self.auth, requester)
|
||||
|
||||
target_user = UserID.from_string(user_id)
|
||||
body = parse_json_object_from_request(request)
|
||||
@@ -575,10 +575,9 @@ class WhoisRestServlet(RestServlet):
|
||||
) -> Tuple[int, JsonDict]:
|
||||
target_user = UserID.from_string(user_id)
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
auth_user = requester.user
|
||||
|
||||
if target_user != auth_user:
|
||||
await assert_user_is_admin(self.auth, auth_user)
|
||||
if target_user != requester.user:
|
||||
await assert_user_is_admin(self.auth, requester)
|
||||
|
||||
if not self.is_mine(target_user):
|
||||
raise SynapseError(HTTPStatus.BAD_REQUEST, "Can only whois a local user")
|
||||
@@ -601,7 +600,7 @@ class DeactivateAccountRestServlet(RestServlet):
|
||||
self, request: SynapseRequest, target_user_id: str
|
||||
) -> Tuple[int, JsonDict]:
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
await assert_user_is_admin(self.auth, requester.user)
|
||||
await assert_user_is_admin(self.auth, requester)
|
||||
|
||||
if not self.is_mine(UserID.from_string(target_user_id)):
|
||||
raise SynapseError(
|
||||
@@ -693,7 +692,7 @@ class ResetPasswordRestServlet(RestServlet):
|
||||
This needs user to have administrator access in Synapse.
|
||||
"""
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
await assert_user_is_admin(self.auth, requester.user)
|
||||
await assert_user_is_admin(self.auth, requester)
|
||||
|
||||
UserID.from_string(target_user_id)
|
||||
|
||||
@@ -807,7 +806,7 @@ class UserAdminServlet(RestServlet):
|
||||
self, request: SynapseRequest, user_id: str
|
||||
) -> Tuple[int, JsonDict]:
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
await assert_user_is_admin(self.auth, requester.user)
|
||||
await assert_user_is_admin(self.auth, requester)
|
||||
auth_user = requester.user
|
||||
|
||||
target_user = UserID.from_string(user_id)
|
||||
@@ -921,7 +920,7 @@ class UserTokenRestServlet(RestServlet):
|
||||
self, request: SynapseRequest, user_id: str
|
||||
) -> Tuple[int, JsonDict]:
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
await assert_user_is_admin(self.auth, requester.user)
|
||||
await assert_user_is_admin(self.auth, requester)
|
||||
auth_user = requester.user
|
||||
|
||||
if not self.is_mine_id(user_id):
|
||||
|
||||
@@ -29,7 +29,6 @@ from synapse.api.errors import (
|
||||
SynapseError,
|
||||
ThreepidValidationError,
|
||||
)
|
||||
from synapse.config.emailconfig import ThreepidBehaviour
|
||||
from synapse.handlers.ui_auth import UIAuthSessionDataConstants
|
||||
from synapse.http.server import HttpServer, finish_request, respond_with_html
|
||||
from synapse.http.servlet import (
|
||||
@@ -68,7 +67,7 @@ class EmailPasswordRequestTokenRestServlet(RestServlet):
|
||||
self.config = hs.config
|
||||
self.identity_handler = hs.get_identity_handler()
|
||||
|
||||
if self.config.email.threepid_behaviour_email == ThreepidBehaviour.LOCAL:
|
||||
if self.config.email.can_verify_email:
|
||||
self.mailer = Mailer(
|
||||
hs=self.hs,
|
||||
app_name=self.config.email.email_app_name,
|
||||
@@ -77,11 +76,10 @@ class EmailPasswordRequestTokenRestServlet(RestServlet):
|
||||
)
|
||||
|
||||
async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
|
||||
if self.config.email.threepid_behaviour_email == ThreepidBehaviour.OFF:
|
||||
if self.config.email.local_threepid_handling_disabled_due_to_email_config:
|
||||
logger.warning(
|
||||
"User password resets have been disabled due to lack of email config"
|
||||
)
|
||||
if not self.config.email.can_verify_email:
|
||||
logger.warning(
|
||||
"User password resets have been disabled due to lack of email config"
|
||||
)
|
||||
raise SynapseError(
|
||||
400, "Email-based password resets have been disabled on this server"
|
||||
)
|
||||
@@ -117,35 +115,20 @@ class EmailPasswordRequestTokenRestServlet(RestServlet):
|
||||
|
||||
raise SynapseError(400, "Email not found", Codes.THREEPID_NOT_FOUND)
|
||||
|
||||
if self.config.email.threepid_behaviour_email == ThreepidBehaviour.REMOTE:
|
||||
assert self.hs.config.registration.account_threepid_delegate_email
|
||||
|
||||
# Have the configured identity server handle the request
|
||||
ret = await self.identity_handler.request_email_token(
|
||||
self.hs.config.registration.account_threepid_delegate_email,
|
||||
body.email,
|
||||
body.client_secret,
|
||||
body.send_attempt,
|
||||
body.next_link,
|
||||
)
|
||||
else:
|
||||
# Send password reset emails from Synapse
|
||||
sid = await self.identity_handler.send_threepid_validation(
|
||||
body.email,
|
||||
body.client_secret,
|
||||
body.send_attempt,
|
||||
self.mailer.send_password_reset_mail,
|
||||
body.next_link,
|
||||
)
|
||||
|
||||
# Wrap the session id in a JSON object
|
||||
ret = {"sid": sid}
|
||||
|
||||
# Send password reset emails from Synapse
|
||||
sid = await self.identity_handler.send_threepid_validation(
|
||||
body.email,
|
||||
body.client_secret,
|
||||
body.send_attempt,
|
||||
self.mailer.send_password_reset_mail,
|
||||
body.next_link,
|
||||
)
|
||||
threepid_send_requests.labels(type="email", reason="password_reset").observe(
|
||||
body.send_attempt
|
||||
)
|
||||
|
||||
return 200, ret
|
||||
# Wrap the session id in a JSON object
|
||||
return 200, {"sid": sid}
|
||||
|
||||
|
||||
class PasswordRestServlet(RestServlet):
|
||||
@@ -340,7 +323,7 @@ class EmailThreepidRequestTokenRestServlet(RestServlet):
|
||||
self.identity_handler = hs.get_identity_handler()
|
||||
self.store = self.hs.get_datastores().main
|
||||
|
||||
if self.config.email.threepid_behaviour_email == ThreepidBehaviour.LOCAL:
|
||||
if self.config.email.can_verify_email:
|
||||
self.mailer = Mailer(
|
||||
hs=self.hs,
|
||||
app_name=self.config.email.email_app_name,
|
||||
@@ -349,11 +332,10 @@ class EmailThreepidRequestTokenRestServlet(RestServlet):
|
||||
)
|
||||
|
||||
async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
|
||||
if self.config.email.threepid_behaviour_email == ThreepidBehaviour.OFF:
|
||||
if self.config.email.local_threepid_handling_disabled_due_to_email_config:
|
||||
logger.warning(
|
||||
"Adding emails have been disabled due to lack of an email config"
|
||||
)
|
||||
if not self.config.email.can_verify_email:
|
||||
logger.warning(
|
||||
"Adding emails have been disabled due to lack of an email config"
|
||||
)
|
||||
raise SynapseError(
|
||||
400,
|
||||
"Adding an email to your account is disabled on this server",
|
||||
@@ -391,35 +373,21 @@ class EmailThreepidRequestTokenRestServlet(RestServlet):
|
||||
|
||||
raise SynapseError(400, "Email is already in use", Codes.THREEPID_IN_USE)
|
||||
|
||||
if self.config.email.threepid_behaviour_email == ThreepidBehaviour.REMOTE:
|
||||
assert self.hs.config.registration.account_threepid_delegate_email
|
||||
|
||||
# Have the configured identity server handle the request
|
||||
ret = await self.identity_handler.request_email_token(
|
||||
self.hs.config.registration.account_threepid_delegate_email,
|
||||
body.email,
|
||||
body.client_secret,
|
||||
body.send_attempt,
|
||||
body.next_link,
|
||||
)
|
||||
else:
|
||||
# Send threepid validation emails from Synapse
|
||||
sid = await self.identity_handler.send_threepid_validation(
|
||||
body.email,
|
||||
body.client_secret,
|
||||
body.send_attempt,
|
||||
self.mailer.send_add_threepid_mail,
|
||||
body.next_link,
|
||||
)
|
||||
|
||||
# Wrap the session id in a JSON object
|
||||
ret = {"sid": sid}
|
||||
# Send threepid validation emails from Synapse
|
||||
sid = await self.identity_handler.send_threepid_validation(
|
||||
body.email,
|
||||
body.client_secret,
|
||||
body.send_attempt,
|
||||
self.mailer.send_add_threepid_mail,
|
||||
body.next_link,
|
||||
)
|
||||
|
||||
threepid_send_requests.labels(type="email", reason="add_threepid").observe(
|
||||
body.send_attempt
|
||||
)
|
||||
|
||||
return 200, ret
|
||||
# Wrap the session id in a JSON object
|
||||
return 200, {"sid": sid}
|
||||
|
||||
|
||||
class MsisdnThreepidRequestTokenRestServlet(RestServlet):
|
||||
@@ -512,25 +480,19 @@ class AddThreepidEmailSubmitTokenServlet(RestServlet):
|
||||
self.config = hs.config
|
||||
self.clock = hs.get_clock()
|
||||
self.store = hs.get_datastores().main
|
||||
if self.config.email.threepid_behaviour_email == ThreepidBehaviour.LOCAL:
|
||||
if self.config.email.can_verify_email:
|
||||
self._failure_email_template = (
|
||||
self.config.email.email_add_threepid_template_failure_html
|
||||
)
|
||||
|
||||
async def on_GET(self, request: Request) -> None:
|
||||
if self.config.email.threepid_behaviour_email == ThreepidBehaviour.OFF:
|
||||
if self.config.email.local_threepid_handling_disabled_due_to_email_config:
|
||||
logger.warning(
|
||||
"Adding emails have been disabled due to lack of an email config"
|
||||
)
|
||||
if not self.config.email.can_verify_email:
|
||||
logger.warning(
|
||||
"Adding emails have been disabled due to lack of an email config"
|
||||
)
|
||||
raise SynapseError(
|
||||
400, "Adding an email to your account is disabled on this server"
|
||||
)
|
||||
elif self.config.email.threepid_behaviour_email == ThreepidBehaviour.REMOTE:
|
||||
raise SynapseError(
|
||||
400,
|
||||
"This homeserver is not validating threepids.",
|
||||
)
|
||||
|
||||
sid = parse_string(request, "sid", required=True)
|
||||
token = parse_string(request, "token", required=True)
|
||||
|
||||
@@ -26,7 +26,7 @@ from synapse.http.servlet import (
|
||||
parse_string,
|
||||
)
|
||||
from synapse.http.site import SynapseRequest
|
||||
from synapse.logging.opentracing import log_kv, set_tag, trace_with_opname
|
||||
from synapse.logging.opentracing import log_kv, set_tag
|
||||
from synapse.types import JsonDict, StreamToken
|
||||
|
||||
from ._base import client_patterns, interactive_auth_handler
|
||||
@@ -71,7 +71,6 @@ class KeyUploadServlet(RestServlet):
|
||||
self.e2e_keys_handler = hs.get_e2e_keys_handler()
|
||||
self.device_handler = hs.get_device_handler()
|
||||
|
||||
@trace_with_opname("upload_keys")
|
||||
async def on_POST(
|
||||
self, request: SynapseRequest, device_id: Optional[str]
|
||||
) -> Tuple[int, JsonDict]:
|
||||
|
||||
@@ -66,7 +66,7 @@ class ProfileDisplaynameRestServlet(RestServlet):
|
||||
) -> Tuple[int, JsonDict]:
|
||||
requester = await self.auth.get_user_by_req(request, allow_guest=True)
|
||||
user = UserID.from_string(user_id)
|
||||
is_admin = await self.auth.is_server_admin(requester.user)
|
||||
is_admin = await self.auth.is_server_admin(requester)
|
||||
|
||||
content = parse_json_object_from_request(request)
|
||||
|
||||
@@ -123,7 +123,7 @@ class ProfileAvatarURLRestServlet(RestServlet):
|
||||
) -> Tuple[int, JsonDict]:
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
user = UserID.from_string(user_id)
|
||||
is_admin = await self.auth.is_server_admin(requester.user)
|
||||
is_admin = await self.auth.is_server_admin(requester)
|
||||
|
||||
content = parse_json_object_from_request(request)
|
||||
try:
|
||||
|
||||
@@ -31,7 +31,6 @@ from synapse.api.errors import (
|
||||
)
|
||||
from synapse.api.ratelimiting import Ratelimiter
|
||||
from synapse.config import ConfigError
|
||||
from synapse.config.emailconfig import ThreepidBehaviour
|
||||
from synapse.config.homeserver import HomeServerConfig
|
||||
from synapse.config.ratelimiting import FederationRatelimitSettings
|
||||
from synapse.config.server import is_threepid_reserved
|
||||
@@ -74,7 +73,7 @@ class EmailRegisterRequestTokenRestServlet(RestServlet):
|
||||
self.identity_handler = hs.get_identity_handler()
|
||||
self.config = hs.config
|
||||
|
||||
if self.hs.config.email.threepid_behaviour_email == ThreepidBehaviour.LOCAL:
|
||||
if self.hs.config.email.can_verify_email:
|
||||
self.mailer = Mailer(
|
||||
hs=self.hs,
|
||||
app_name=self.config.email.email_app_name,
|
||||
@@ -83,13 +82,10 @@ class EmailRegisterRequestTokenRestServlet(RestServlet):
|
||||
)
|
||||
|
||||
async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
|
||||
if self.hs.config.email.threepid_behaviour_email == ThreepidBehaviour.OFF:
|
||||
if (
|
||||
self.hs.config.email.local_threepid_handling_disabled_due_to_email_config
|
||||
):
|
||||
logger.warning(
|
||||
"Email registration has been disabled due to lack of email config"
|
||||
)
|
||||
if not self.hs.config.email.can_verify_email:
|
||||
logger.warning(
|
||||
"Email registration has been disabled due to lack of email config"
|
||||
)
|
||||
raise SynapseError(
|
||||
400, "Email-based registration has been disabled on this server"
|
||||
)
|
||||
@@ -138,35 +134,21 @@ class EmailRegisterRequestTokenRestServlet(RestServlet):
|
||||
|
||||
raise SynapseError(400, "Email is already in use", Codes.THREEPID_IN_USE)
|
||||
|
||||
if self.config.email.threepid_behaviour_email == ThreepidBehaviour.REMOTE:
|
||||
assert self.hs.config.registration.account_threepid_delegate_email
|
||||
|
||||
# Have the configured identity server handle the request
|
||||
ret = await self.identity_handler.request_email_token(
|
||||
self.hs.config.registration.account_threepid_delegate_email,
|
||||
email,
|
||||
client_secret,
|
||||
send_attempt,
|
||||
next_link,
|
||||
)
|
||||
else:
|
||||
# Send registration emails from Synapse,
|
||||
# wrapping the session id in a JSON object.
|
||||
ret = {
|
||||
"sid": await self.identity_handler.send_threepid_validation(
|
||||
email,
|
||||
client_secret,
|
||||
send_attempt,
|
||||
self.mailer.send_registration_mail,
|
||||
next_link,
|
||||
)
|
||||
}
|
||||
# Send registration emails from Synapse
|
||||
sid = await self.identity_handler.send_threepid_validation(
|
||||
email,
|
||||
client_secret,
|
||||
send_attempt,
|
||||
self.mailer.send_registration_mail,
|
||||
next_link,
|
||||
)
|
||||
|
||||
threepid_send_requests.labels(type="email", reason="register").observe(
|
||||
send_attempt
|
||||
)
|
||||
|
||||
return 200, ret
|
||||
# Wrap the session id in a JSON object
|
||||
return 200, {"sid": sid}
|
||||
|
||||
|
||||
class MsisdnRegisterRequestTokenRestServlet(RestServlet):
|
||||
@@ -260,7 +242,7 @@ class RegistrationSubmitTokenServlet(RestServlet):
|
||||
self.clock = hs.get_clock()
|
||||
self.store = hs.get_datastores().main
|
||||
|
||||
if self.config.email.threepid_behaviour_email == ThreepidBehaviour.LOCAL:
|
||||
if self.config.email.can_verify_email:
|
||||
self._failure_email_template = (
|
||||
self.config.email.email_registration_template_failure_html
|
||||
)
|
||||
@@ -270,11 +252,10 @@ class RegistrationSubmitTokenServlet(RestServlet):
|
||||
raise SynapseError(
|
||||
400, "This medium is currently not supported for registration"
|
||||
)
|
||||
if self.config.email.threepid_behaviour_email == ThreepidBehaviour.OFF:
|
||||
if self.config.email.local_threepid_handling_disabled_due_to_email_config:
|
||||
logger.warning(
|
||||
"User registration via email has been disabled due to lack of email config"
|
||||
)
|
||||
if not self.config.email.can_verify_email:
|
||||
logger.warning(
|
||||
"User registration via email has been disabled due to lack of email config"
|
||||
)
|
||||
raise SynapseError(
|
||||
400, "Email-based registration is disabled on this server"
|
||||
)
|
||||
@@ -484,9 +465,6 @@ class RegisterRestServlet(RestServlet):
|
||||
"Appservice token must be provided when using a type of m.login.application_service",
|
||||
)
|
||||
|
||||
# Verify the AS
|
||||
self.auth.get_appservice_by_req(request)
|
||||
|
||||
# Set the desired user according to the AS API (which uses the
|
||||
# 'user' key not 'username'). Since this is a new addition, we'll
|
||||
# fallback to 'username' if they gave one.
|
||||
|
||||
@@ -116,9 +116,13 @@ messsages_response_timer = Histogram(
|
||||
2.5,
|
||||
5.0,
|
||||
10.0,
|
||||
20.0,
|
||||
30.0,
|
||||
60.0,
|
||||
80.0,
|
||||
100.0,
|
||||
120.0,
|
||||
150.0,
|
||||
180.0,
|
||||
"+Inf",
|
||||
),
|
||||
@@ -229,7 +233,7 @@ class RoomStateEventRestServlet(TransactionRestServlet):
|
||||
|
||||
msg_handler = self.message_handler
|
||||
data = await msg_handler.get_room_data(
|
||||
user_id=requester.user.to_string(),
|
||||
requester=requester,
|
||||
room_id=room_id,
|
||||
event_type=event_type,
|
||||
state_key=state_key,
|
||||
@@ -574,7 +578,7 @@ class RoomMemberListRestServlet(RestServlet):
|
||||
|
||||
events = await handler.get_state_events(
|
||||
room_id=room_id,
|
||||
user_id=requester.user.to_string(),
|
||||
requester=requester,
|
||||
at_token=at_token,
|
||||
state_filter=StateFilter.from_types([(EventTypes.Member, None)]),
|
||||
)
|
||||
@@ -674,7 +678,7 @@ class RoomMessageListRestServlet(RestServlet):
|
||||
room_member_count = await make_deferred_yieldable(room_member_count_deferred)
|
||||
messsages_response_timer.labels(
|
||||
room_size=_RoomSize.from_member_count(room_member_count)
|
||||
).observe((processing_start_time - processing_end_time) / 1000)
|
||||
).observe((processing_end_time - processing_start_time) / 1000)
|
||||
|
||||
return 200, msgs
|
||||
|
||||
@@ -696,8 +700,7 @@ class RoomStateRestServlet(RestServlet):
|
||||
# Get all the current state for this room
|
||||
events = await self.message_handler.get_state_events(
|
||||
room_id=room_id,
|
||||
user_id=requester.user.to_string(),
|
||||
is_guest=requester.is_guest,
|
||||
requester=requester,
|
||||
)
|
||||
return 200, events
|
||||
|
||||
@@ -755,7 +758,7 @@ class RoomEventServlet(RestServlet):
|
||||
== "true"
|
||||
)
|
||||
if include_unredacted_content and not await self.auth.is_server_admin(
|
||||
requester.user
|
||||
requester
|
||||
):
|
||||
power_level_event = (
|
||||
await self._storage_controllers.state.get_current_state_event(
|
||||
@@ -1260,9 +1263,7 @@ class TimestampLookupRestServlet(RestServlet):
|
||||
self, request: SynapseRequest, room_id: str
|
||||
) -> Tuple[int, JsonDict]:
|
||||
requester = await self._auth.get_user_by_req(request)
|
||||
await self._auth.check_user_in_room_or_world_readable(
|
||||
room_id, requester.user.to_string()
|
||||
)
|
||||
await self._auth.check_user_in_room_or_world_readable(room_id, requester)
|
||||
|
||||
timestamp = parse_integer(request, "ts", required=True)
|
||||
direction = parse_string(request, "dir", default="f", allowed_values=["f", "b"])
|
||||
|
||||
@@ -19,7 +19,7 @@ from synapse.http import servlet
|
||||
from synapse.http.server import HttpServer
|
||||
from synapse.http.servlet import assert_params_in_dict, parse_json_object_from_request
|
||||
from synapse.http.site import SynapseRequest
|
||||
from synapse.logging.opentracing import set_tag, trace_with_opname
|
||||
from synapse.logging.opentracing import set_tag
|
||||
from synapse.rest.client.transactions import HttpTransactionCache
|
||||
from synapse.types import JsonDict
|
||||
|
||||
@@ -43,7 +43,6 @@ class SendToDeviceRestServlet(servlet.RestServlet):
|
||||
self.txns = HttpTransactionCache(hs)
|
||||
self.device_message_handler = hs.get_device_message_handler()
|
||||
|
||||
@trace_with_opname("sendToDevice")
|
||||
def on_PUT(
|
||||
self, request: SynapseRequest, message_type: str, txn_id: str
|
||||
) -> Awaitable[Tuple[int, JsonDict]]:
|
||||
|
||||
@@ -17,7 +17,6 @@ from typing import TYPE_CHECKING, Tuple
|
||||
from twisted.web.server import Request
|
||||
|
||||
from synapse.api.errors import ThreepidValidationError
|
||||
from synapse.config.emailconfig import ThreepidBehaviour
|
||||
from synapse.http.server import DirectServeHtmlResource
|
||||
from synapse.http.servlet import parse_string
|
||||
from synapse.util.stringutils import assert_valid_client_secret
|
||||
@@ -46,9 +45,6 @@ class PasswordResetSubmitTokenResource(DirectServeHtmlResource):
|
||||
self.clock = hs.get_clock()
|
||||
self.store = hs.get_datastores().main
|
||||
|
||||
self._local_threepid_handling_disabled_due_to_email_config = (
|
||||
hs.config.email.local_threepid_handling_disabled_due_to_email_config
|
||||
)
|
||||
self._confirmation_email_template = (
|
||||
hs.config.email.email_password_reset_template_confirmation_html
|
||||
)
|
||||
@@ -59,8 +55,8 @@ class PasswordResetSubmitTokenResource(DirectServeHtmlResource):
|
||||
hs.config.email.email_password_reset_template_failure_html
|
||||
)
|
||||
|
||||
# This resource should not be mounted if threepid behaviour is not LOCAL
|
||||
assert hs.config.email.threepid_behaviour_email == ThreepidBehaviour.LOCAL
|
||||
# This resource should only be mounted if email validation is enabled
|
||||
assert hs.config.email.can_verify_email
|
||||
|
||||
async def _async_render_GET(self, request: Request) -> Tuple[int, bytes]:
|
||||
sid = parse_string(request, "sid", required=True)
|
||||
|
||||
@@ -244,7 +244,7 @@ class ServerNoticesManager:
|
||||
assert self.server_notices_mxid is not None
|
||||
|
||||
notice_user_data_in_room = await self._message_handler.get_room_data(
|
||||
self.server_notices_mxid,
|
||||
create_requester(self.server_notices_mxid),
|
||||
room_id,
|
||||
EventTypes.Member,
|
||||
self.server_notices_mxid,
|
||||
|
||||
@@ -44,7 +44,6 @@ from synapse.logging.context import ContextResourceUsage
|
||||
from synapse.replication.http.state import ReplicationUpdateCurrentStateRestServlet
|
||||
from synapse.state import v1, v2
|
||||
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
|
||||
from synapse.storage.roommember import ProfileInfo
|
||||
from synapse.storage.state import StateFilter
|
||||
from synapse.types import StateMap
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
@@ -210,11 +209,11 @@ class StateHandler:
|
||||
ret = await self.resolve_state_groups_for_events(room_id, event_ids)
|
||||
return await ret.get_state(self._state_storage_controller, state_filter)
|
||||
|
||||
async def get_current_users_in_room(
|
||||
async def get_current_user_ids_in_room(
|
||||
self, room_id: str, latest_event_ids: List[str]
|
||||
) -> Dict[str, ProfileInfo]:
|
||||
) -> Set[str]:
|
||||
"""
|
||||
Get the users who are currently in a room.
|
||||
Get the users IDs who are currently in a room.
|
||||
|
||||
Note: This is much slower than using the equivalent method
|
||||
`DataStore.get_users_in_room` or `DataStore.get_users_in_room_with_profiles`,
|
||||
@@ -225,15 +224,15 @@ class StateHandler:
|
||||
room_id: The ID of the room.
|
||||
latest_event_ids: Precomputed list of latest event IDs. Will be computed if None.
|
||||
Returns:
|
||||
Dictionary of user IDs to their profileinfo.
|
||||
Set of user IDs in the room.
|
||||
"""
|
||||
|
||||
assert latest_event_ids is not None
|
||||
|
||||
logger.debug("calling resolve_state_groups from get_current_users_in_room")
|
||||
logger.debug("calling resolve_state_groups from get_current_user_ids_in_room")
|
||||
entry = await self.resolve_state_groups_for_events(room_id, latest_event_ids)
|
||||
state = await entry.get_state(self._state_storage_controller, StateFilter.all())
|
||||
return await self.store.get_joined_users_from_state(room_id, state, entry)
|
||||
return await self.store.get_joined_user_ids_from_state(room_id, state, entry)
|
||||
|
||||
async def get_hosts_in_room_at_events(
|
||||
self, room_id: str, event_ids: Collection[str]
|
||||
|
||||
@@ -650,9 +650,6 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
|
||||
txn, self.get_account_data_for_room, (user_id,)
|
||||
)
|
||||
self._invalidate_cache_and_stream(txn, self.get_push_rules_for_user, (user_id,))
|
||||
self._invalidate_cache_and_stream(
|
||||
txn, self.get_push_rules_enabled_for_user, (user_id,)
|
||||
)
|
||||
# This user might be contained in the ignored_by cache for other users,
|
||||
# so we have to invalidate it all.
|
||||
self._invalidate_all_cache_and_stream(txn, self.ignored_by)
|
||||
|
||||
@@ -165,7 +165,6 @@ class PushRulesWorkerStore(
|
||||
|
||||
return _load_rules(rows, enabled_map, self.hs.config.experimental)
|
||||
|
||||
@cached(max_entries=5000)
|
||||
async def get_push_rules_enabled_for_user(self, user_id: str) -> Dict[str, bool]:
|
||||
results = await self.db_pool.simple_select_list(
|
||||
table="push_rules_enable",
|
||||
@@ -229,9 +228,6 @@ class PushRulesWorkerStore(
|
||||
|
||||
return results
|
||||
|
||||
@cachedList(
|
||||
cached_method_name="get_push_rules_enabled_for_user", list_name="user_ids"
|
||||
)
|
||||
async def bulk_get_push_rules_enabled(
|
||||
self, user_ids: Collection[str]
|
||||
) -> Dict[str, Dict[str, bool]]:
|
||||
@@ -246,6 +242,7 @@ class PushRulesWorkerStore(
|
||||
iterable=user_ids,
|
||||
retcols=("user_name", "rule_id", "enabled"),
|
||||
desc="bulk_get_push_rules_enabled",
|
||||
batch_size=1000,
|
||||
)
|
||||
for row in rows:
|
||||
enabled = bool(row["enabled"])
|
||||
@@ -792,7 +789,6 @@ class PushRuleStore(PushRulesWorkerStore):
|
||||
self.db_pool.simple_insert_txn(txn, "push_rules_stream", values=values)
|
||||
|
||||
txn.call_after(self.get_push_rules_for_user.invalidate, (user_id,))
|
||||
txn.call_after(self.get_push_rules_enabled_for_user.invalidate, (user_id,))
|
||||
txn.call_after(
|
||||
self.push_rules_stream_cache.entity_has_changed, user_id, stream_id
|
||||
)
|
||||
|
||||
@@ -69,9 +69,9 @@ class TokenLookupResult:
|
||||
"""
|
||||
|
||||
user_id: str
|
||||
token_id: int
|
||||
is_guest: bool = False
|
||||
shadow_banned: bool = False
|
||||
token_id: Optional[int] = None
|
||||
device_id: Optional[str] = None
|
||||
valid_until_ms: Optional[int] = None
|
||||
token_owner: str = attr.ib()
|
||||
|
||||
@@ -835,9 +835,9 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
||||
|
||||
return shared_room_ids or frozenset()
|
||||
|
||||
async def get_joined_users_from_state(
|
||||
async def get_joined_user_ids_from_state(
|
||||
self, room_id: str, state: StateMap[str], state_entry: "_StateCacheEntry"
|
||||
) -> Dict[str, ProfileInfo]:
|
||||
) -> Set[str]:
|
||||
state_group: Union[object, int] = state_entry.state_group
|
||||
if not state_group:
|
||||
# If state_group is None it means it has yet to be assigned a
|
||||
@@ -848,25 +848,25 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
||||
|
||||
assert state_group is not None
|
||||
with Measure(self._clock, "get_joined_users_from_state"):
|
||||
return await self._get_joined_users_from_context(
|
||||
return await self._get_joined_user_ids_from_context(
|
||||
room_id, state_group, state, context=state_entry
|
||||
)
|
||||
|
||||
@cached(num_args=2, iterable=True, max_entries=100000)
|
||||
async def _get_joined_users_from_context(
|
||||
async def _get_joined_user_ids_from_context(
|
||||
self,
|
||||
room_id: str,
|
||||
state_group: Union[object, int],
|
||||
current_state_ids: StateMap[str],
|
||||
event: Optional[EventBase] = None,
|
||||
context: Optional["_StateCacheEntry"] = None,
|
||||
) -> Dict[str, ProfileInfo]:
|
||||
) -> Set[str]:
|
||||
# We don't use `state_group`, it's there so that we can cache based
|
||||
# on it. However, it's important that it's never None, since two current_states
|
||||
# with a state_group of None are likely to be different.
|
||||
assert state_group is not None
|
||||
|
||||
users_in_room = {}
|
||||
users_in_room = set()
|
||||
member_event_ids = [
|
||||
e_id
|
||||
for key, e_id in current_state_ids.items()
|
||||
@@ -879,11 +879,11 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
||||
# If we do then we can reuse that result and simply update it with
|
||||
# any membership changes in `delta_ids`
|
||||
if context.prev_group and context.delta_ids:
|
||||
prev_res = self._get_joined_users_from_context.cache.get_immediate(
|
||||
prev_res = self._get_joined_user_ids_from_context.cache.get_immediate(
|
||||
(room_id, context.prev_group), None
|
||||
)
|
||||
if prev_res and isinstance(prev_res, dict):
|
||||
users_in_room = dict(prev_res)
|
||||
if prev_res and isinstance(prev_res, set):
|
||||
users_in_room = prev_res
|
||||
member_event_ids = [
|
||||
e_id
|
||||
for key, e_id in context.delta_ids.items()
|
||||
@@ -891,7 +891,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
||||
]
|
||||
for etype, state_key in context.delta_ids:
|
||||
if etype == EventTypes.Member:
|
||||
users_in_room.pop(state_key, None)
|
||||
users_in_room.discard(state_key)
|
||||
|
||||
# We check if we have any of the member event ids in the event cache
|
||||
# before we ask the DB
|
||||
@@ -908,71 +908,64 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
||||
ev_entry = event_map.get(event_id)
|
||||
if ev_entry and not ev_entry.event.rejected_reason:
|
||||
if ev_entry.event.membership == Membership.JOIN:
|
||||
users_in_room[ev_entry.event.state_key] = ProfileInfo(
|
||||
display_name=ev_entry.event.content.get("displayname", None),
|
||||
avatar_url=ev_entry.event.content.get("avatar_url", None),
|
||||
)
|
||||
users_in_room.add(ev_entry.event.state_key)
|
||||
else:
|
||||
missing_member_event_ids.append(event_id)
|
||||
|
||||
if missing_member_event_ids:
|
||||
event_to_memberships = await self._get_joined_profiles_from_event_ids(
|
||||
event_to_memberships = await self._get_user_ids_from_membership_event_ids(
|
||||
missing_member_event_ids
|
||||
)
|
||||
users_in_room.update(row for row in event_to_memberships.values() if row)
|
||||
users_in_room.update(
|
||||
user_id for user_id in event_to_memberships.values() if user_id
|
||||
)
|
||||
|
||||
if event is not None and event.type == EventTypes.Member:
|
||||
if event.membership == Membership.JOIN:
|
||||
if event.event_id in member_event_ids:
|
||||
users_in_room[event.state_key] = ProfileInfo(
|
||||
display_name=event.content.get("displayname", None),
|
||||
avatar_url=event.content.get("avatar_url", None),
|
||||
)
|
||||
users_in_room.add(event.state_key)
|
||||
|
||||
return users_in_room
|
||||
|
||||
@cached(max_entries=10000)
|
||||
def _get_joined_profile_from_event_id(
|
||||
@cached(
|
||||
max_entries=10000,
|
||||
# This name matches the old function that has been replaced - the cache name
|
||||
# is kept here to maintain backwards compatibility.
|
||||
name="_get_joined_profile_from_event_id",
|
||||
)
|
||||
def _get_user_id_from_membership_event_id(
|
||||
self, event_id: str
|
||||
) -> Optional[Tuple[str, ProfileInfo]]:
|
||||
raise NotImplementedError()
|
||||
|
||||
@cachedList(
|
||||
cached_method_name="_get_joined_profile_from_event_id",
|
||||
cached_method_name="_get_user_id_from_membership_event_id",
|
||||
list_name="event_ids",
|
||||
)
|
||||
async def _get_joined_profiles_from_event_ids(
|
||||
async def _get_user_ids_from_membership_event_ids(
|
||||
self, event_ids: Iterable[str]
|
||||
) -> Dict[str, Optional[Tuple[str, ProfileInfo]]]:
|
||||
) -> Dict[str, Optional[str]]:
|
||||
"""For given set of member event_ids check if they point to a join
|
||||
event and if so return the associated user and profile info.
|
||||
event.
|
||||
|
||||
Args:
|
||||
event_ids: The member event IDs to lookup
|
||||
|
||||
Returns:
|
||||
Map from event ID to `user_id` and ProfileInfo (or None if not join event).
|
||||
Map from event ID to `user_id`, or None if event is not a join.
|
||||
"""
|
||||
|
||||
rows = await self.db_pool.simple_select_many_batch(
|
||||
table="room_memberships",
|
||||
column="event_id",
|
||||
iterable=event_ids,
|
||||
retcols=("user_id", "display_name", "avatar_url", "event_id"),
|
||||
retcols=("user_id", "event_id"),
|
||||
keyvalues={"membership": Membership.JOIN},
|
||||
batch_size=1000,
|
||||
desc="_get_joined_profiles_from_event_ids",
|
||||
desc="_get_user_ids_from_membership_event_ids",
|
||||
)
|
||||
|
||||
return {
|
||||
row["event_id"]: (
|
||||
row["user_id"],
|
||||
ProfileInfo(
|
||||
avatar_url=row["avatar_url"], display_name=row["display_name"]
|
||||
),
|
||||
)
|
||||
for row in rows
|
||||
}
|
||||
return {row["event_id"]: row["user_id"] for row in rows}
|
||||
|
||||
@cached(max_entries=10000)
|
||||
async def is_host_joined(self, room_id: str, host: str) -> bool:
|
||||
@@ -1131,12 +1124,12 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
||||
else:
|
||||
# The cache doesn't match the state group or prev state group,
|
||||
# so we calculate the result from first principles.
|
||||
joined_users = await self.get_joined_users_from_state(
|
||||
joined_user_ids = await self.get_joined_user_ids_from_state(
|
||||
room_id, state, state_entry
|
||||
)
|
||||
|
||||
cache.hosts_to_joined_users = {}
|
||||
for user_id in joined_users:
|
||||
for user_id in joined_user_ids:
|
||||
host = intern_string(get_domain_from_id(user_id))
|
||||
cache.hosts_to_joined_users.setdefault(host, set()).add(user_id)
|
||||
|
||||
|
||||
@@ -14,15 +14,19 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import abc
|
||||
import enum
|
||||
import threading
|
||||
from typing import (
|
||||
Callable,
|
||||
Collection,
|
||||
Dict,
|
||||
Generic,
|
||||
Iterable,
|
||||
MutableMapping,
|
||||
Optional,
|
||||
Set,
|
||||
Sized,
|
||||
Tuple,
|
||||
TypeVar,
|
||||
Union,
|
||||
cast,
|
||||
@@ -31,7 +35,6 @@ from typing import (
|
||||
from prometheus_client import Gauge
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.python import failure
|
||||
from twisted.python.failure import Failure
|
||||
|
||||
from synapse.util.async_helpers import ObservableDeferred
|
||||
@@ -94,7 +97,7 @@ class DeferredCache(Generic[KT, VT]):
|
||||
|
||||
# _pending_deferred_cache maps from the key value to a `CacheEntry` object.
|
||||
self._pending_deferred_cache: Union[
|
||||
TreeCache, "MutableMapping[KT, CacheEntry]"
|
||||
TreeCache, "MutableMapping[KT, CacheEntry[KT, VT]]"
|
||||
] = cache_type()
|
||||
|
||||
def metrics_cb() -> None:
|
||||
@@ -159,15 +162,16 @@ class DeferredCache(Generic[KT, VT]):
|
||||
Raises:
|
||||
KeyError if the key is not found in the cache
|
||||
"""
|
||||
callbacks = [callback] if callback else []
|
||||
val = self._pending_deferred_cache.get(key, _Sentinel.sentinel)
|
||||
if val is not _Sentinel.sentinel:
|
||||
val.callbacks.update(callbacks)
|
||||
val.add_invalidation_callback(key, callback)
|
||||
if update_metrics:
|
||||
m = self.cache.metrics
|
||||
assert m # we always have a name, so should always have metrics
|
||||
m.inc_hits()
|
||||
return val.deferred.observe()
|
||||
return val.deferred(key)
|
||||
|
||||
callbacks = (callback,) if callback else ()
|
||||
|
||||
val2 = self.cache.get(
|
||||
key, _Sentinel.sentinel, callbacks=callbacks, update_metrics=update_metrics
|
||||
@@ -177,6 +181,73 @@ class DeferredCache(Generic[KT, VT]):
|
||||
else:
|
||||
return defer.succeed(val2)
|
||||
|
||||
def get_bulk(
|
||||
self,
|
||||
keys: Collection[KT],
|
||||
callback: Optional[Callable[[], None]] = None,
|
||||
) -> Tuple[Dict[KT, VT], Optional["defer.Deferred[Dict[KT, VT]]"], Collection[KT]]:
|
||||
"""Bulk lookup of items in the cache.
|
||||
|
||||
Returns:
|
||||
A 3-tuple of:
|
||||
1. a dict of key/value of items already cached;
|
||||
2. a deferred that resolves to a dict of key/value of items
|
||||
we're already fetching; and
|
||||
3. a collection of keys that don't appear in the previous two.
|
||||
"""
|
||||
|
||||
# The cached results
|
||||
cached = {}
|
||||
|
||||
# List of pending deferreds
|
||||
pending = []
|
||||
|
||||
# Dict that gets filled out when the pending deferreds complete
|
||||
pending_results = {}
|
||||
|
||||
# List of keys that aren't in either cache
|
||||
missing = []
|
||||
|
||||
callbacks = (callback,) if callback else ()
|
||||
|
||||
for key in keys:
|
||||
# Check if its in the main cache.
|
||||
immediate_value = self.cache.get(
|
||||
key,
|
||||
_Sentinel.sentinel,
|
||||
callbacks=callbacks,
|
||||
)
|
||||
if immediate_value is not _Sentinel.sentinel:
|
||||
cached[key] = immediate_value
|
||||
continue
|
||||
|
||||
# Check if its in the pending cache
|
||||
pending_value = self._pending_deferred_cache.get(key, _Sentinel.sentinel)
|
||||
if pending_value is not _Sentinel.sentinel:
|
||||
pending_value.add_invalidation_callback(key, callback)
|
||||
|
||||
def completed_cb(value: VT, key: KT) -> VT:
|
||||
pending_results[key] = value
|
||||
return value
|
||||
|
||||
# Add a callback to fill out `pending_results` when that completes
|
||||
d = pending_value.deferred(key).addCallback(completed_cb, key)
|
||||
pending.append(d)
|
||||
continue
|
||||
|
||||
# Not in either cache
|
||||
missing.append(key)
|
||||
|
||||
# If we've got pending deferreds, squash them into a single one that
|
||||
# returns `pending_results`.
|
||||
pending_deferred = None
|
||||
if pending:
|
||||
pending_deferred = defer.gatherResults(
|
||||
pending, consumeErrors=True
|
||||
).addCallback(lambda _: pending_results)
|
||||
|
||||
return (cached, pending_deferred, missing)
|
||||
|
||||
def get_immediate(
|
||||
self, key: KT, default: T, update_metrics: bool = True
|
||||
) -> Union[VT, T]:
|
||||
@@ -218,84 +289,89 @@ class DeferredCache(Generic[KT, VT]):
|
||||
value: a deferred which will complete with a result to add to the cache
|
||||
callback: An optional callback to be called when the entry is invalidated
|
||||
"""
|
||||
if not isinstance(value, defer.Deferred):
|
||||
raise TypeError("not a Deferred")
|
||||
|
||||
callbacks = [callback] if callback else []
|
||||
self.check_thread()
|
||||
|
||||
existing_entry = self._pending_deferred_cache.pop(key, None)
|
||||
if existing_entry:
|
||||
existing_entry.invalidate()
|
||||
self._pending_deferred_cache.pop(key, None)
|
||||
|
||||
# XXX: why don't we invalidate the entry in `self.cache` yet?
|
||||
|
||||
# we can save a whole load of effort if the deferred is ready.
|
||||
if value.called:
|
||||
result = value.result
|
||||
if not isinstance(result, failure.Failure):
|
||||
self.cache.set(key, cast(VT, result), callbacks)
|
||||
return value
|
||||
|
||||
# otherwise, we'll add an entry to the _pending_deferred_cache for now,
|
||||
# and add callbacks to add it to the cache properly later.
|
||||
|
||||
observable = ObservableDeferred(value, consumeErrors=True)
|
||||
observer = observable.observe()
|
||||
entry = CacheEntry(deferred=observable, callbacks=callbacks)
|
||||
|
||||
entry = CacheEntrySingle[KT, VT](value)
|
||||
entry.add_invalidation_callback(key, callback)
|
||||
self._pending_deferred_cache[key] = entry
|
||||
|
||||
def compare_and_pop() -> bool:
|
||||
"""Check if our entry is still the one in _pending_deferred_cache, and
|
||||
if so, pop it.
|
||||
|
||||
Returns true if the entries matched.
|
||||
"""
|
||||
existing_entry = self._pending_deferred_cache.pop(key, None)
|
||||
if existing_entry is entry:
|
||||
return True
|
||||
|
||||
# oops, the _pending_deferred_cache has been updated since
|
||||
# we started our query, so we are out of date.
|
||||
#
|
||||
# Better put back whatever we took out. (We do it this way
|
||||
# round, rather than peeking into the _pending_deferred_cache
|
||||
# and then removing on a match, to make the common case faster)
|
||||
if existing_entry is not None:
|
||||
self._pending_deferred_cache[key] = existing_entry
|
||||
|
||||
return False
|
||||
|
||||
def cb(result: VT) -> None:
|
||||
if compare_and_pop():
|
||||
self.cache.set(key, result, entry.callbacks)
|
||||
else:
|
||||
# we're not going to put this entry into the cache, so need
|
||||
# to make sure that the invalidation callbacks are called.
|
||||
# That was probably done when _pending_deferred_cache was
|
||||
# updated, but it's possible that `set` was called without
|
||||
# `invalidate` being previously called, in which case it may
|
||||
# not have been. Either way, let's double-check now.
|
||||
entry.invalidate()
|
||||
|
||||
def eb(_fail: Failure) -> None:
|
||||
compare_and_pop()
|
||||
entry.invalidate()
|
||||
|
||||
# once the deferred completes, we can move the entry from the
|
||||
# _pending_deferred_cache to the real cache.
|
||||
#
|
||||
observer.addCallbacks(cb, eb)
|
||||
deferred = entry.deferred(key).addCallbacks(
|
||||
self._completed_callback,
|
||||
self._error_callback,
|
||||
callbackArgs=(entry, key),
|
||||
errbackArgs=(entry, key),
|
||||
)
|
||||
|
||||
# we return a new Deferred which will be called before any subsequent observers.
|
||||
return observable.observe()
|
||||
return deferred
|
||||
|
||||
def start_bulk_input(
|
||||
self,
|
||||
keys: Collection[KT],
|
||||
callback: Optional[Callable[[], None]] = None,
|
||||
) -> "CacheMultipleEntries[KT, VT]":
|
||||
"""Bulk set API for use when fetching multiple keys at once from the DB.
|
||||
|
||||
Called *before* starting the fetch from the DB, and the caller *must*
|
||||
call either `complete_bulk(..)` or `error_bulk(..)` on the return value.
|
||||
"""
|
||||
|
||||
entry = CacheMultipleEntries[KT, VT]()
|
||||
entry.add_global_invalidation_callback(callback)
|
||||
|
||||
for key in keys:
|
||||
self._pending_deferred_cache[key] = entry
|
||||
|
||||
return entry
|
||||
|
||||
def _completed_callback(
|
||||
self, value: VT, entry: "CacheEntry[KT, VT]", key: KT
|
||||
) -> VT:
|
||||
"""Called when a deferred is completed."""
|
||||
# We check if the current entry matches the entry associated with the
|
||||
# deferred. If they don't match then it got invalidated.
|
||||
current_entry = self._pending_deferred_cache.pop(key, None)
|
||||
if current_entry is not entry:
|
||||
if current_entry:
|
||||
self._pending_deferred_cache[key] = current_entry
|
||||
return value
|
||||
|
||||
self.cache.set(key, value, entry.get_invalidation_callbacks(key))
|
||||
|
||||
return value
|
||||
|
||||
def _error_callback(
|
||||
self,
|
||||
failure: Failure,
|
||||
entry: "CacheEntry[KT, VT]",
|
||||
key: KT,
|
||||
) -> Failure:
|
||||
"""Called when a deferred errors."""
|
||||
|
||||
# We check if the current entry matches the entry associated with the
|
||||
# deferred. If they don't match then it got invalidated.
|
||||
current_entry = self._pending_deferred_cache.pop(key, None)
|
||||
if current_entry is not entry:
|
||||
if current_entry:
|
||||
self._pending_deferred_cache[key] = current_entry
|
||||
return failure
|
||||
|
||||
for cb in entry.get_invalidation_callbacks(key):
|
||||
cb()
|
||||
|
||||
return failure
|
||||
|
||||
def prefill(
|
||||
self, key: KT, value: VT, callback: Optional[Callable[[], None]] = None
|
||||
) -> None:
|
||||
callbacks = [callback] if callback else []
|
||||
callbacks = (callback,) if callback else ()
|
||||
self.cache.set(key, value, callbacks=callbacks)
|
||||
self._pending_deferred_cache.pop(key, None)
|
||||
|
||||
def invalidate(self, key: KT) -> None:
|
||||
"""Delete a key, or tree of entries
|
||||
@@ -311,41 +387,129 @@ class DeferredCache(Generic[KT, VT]):
|
||||
self.cache.del_multi(key)
|
||||
|
||||
# if we have a pending lookup for this key, remove it from the
|
||||
# _pending_deferred_cache, which will (a) stop it being returned
|
||||
# for future queries and (b) stop it being persisted as a proper entry
|
||||
# _pending_deferred_cache, which will (a) stop it being returned for
|
||||
# future queries and (b) stop it being persisted as a proper entry
|
||||
# in self.cache.
|
||||
entry = self._pending_deferred_cache.pop(key, None)
|
||||
|
||||
# run the invalidation callbacks now, rather than waiting for the
|
||||
# deferred to resolve.
|
||||
if entry:
|
||||
# _pending_deferred_cache.pop should either return a CacheEntry, or, in the
|
||||
# case of a TreeCache, a dict of keys to cache entries. Either way calling
|
||||
# iterate_tree_cache_entry on it will do the right thing.
|
||||
for entry in iterate_tree_cache_entry(entry):
|
||||
entry.invalidate()
|
||||
for cb in entry.get_invalidation_callbacks(key):
|
||||
cb()
|
||||
|
||||
def invalidate_all(self) -> None:
|
||||
self.check_thread()
|
||||
self.cache.clear()
|
||||
for entry in self._pending_deferred_cache.values():
|
||||
entry.invalidate()
|
||||
for key, entry in self._pending_deferred_cache.items():
|
||||
for cb in entry.get_invalidation_callbacks(key):
|
||||
cb()
|
||||
|
||||
self._pending_deferred_cache.clear()
|
||||
|
||||
|
||||
class CacheEntry:
|
||||
__slots__ = ["deferred", "callbacks", "invalidated"]
|
||||
class CacheEntry(Generic[KT, VT], metaclass=abc.ABCMeta):
|
||||
"""Abstract class for entries in `DeferredCache[KT, VT]`"""
|
||||
|
||||
def __init__(
|
||||
self, deferred: ObservableDeferred, callbacks: Iterable[Callable[[], None]]
|
||||
):
|
||||
self.deferred = deferred
|
||||
self.callbacks = set(callbacks)
|
||||
self.invalidated = False
|
||||
@abc.abstractmethod
|
||||
def deferred(self, key: KT) -> "defer.Deferred[VT]":
|
||||
"""Get a deferred that a caller can wait on to get the value at the
|
||||
given key"""
|
||||
...
|
||||
|
||||
def invalidate(self) -> None:
|
||||
if not self.invalidated:
|
||||
self.invalidated = True
|
||||
for callback in self.callbacks:
|
||||
callback()
|
||||
self.callbacks.clear()
|
||||
@abc.abstractmethod
|
||||
def add_invalidation_callback(
|
||||
self, key: KT, callback: Optional[Callable[[], None]]
|
||||
) -> None:
|
||||
"""Add an invalidation callback"""
|
||||
...
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_invalidation_callbacks(self, key: KT) -> Collection[Callable[[], None]]:
|
||||
"""Get all invalidation callbacks"""
|
||||
...
|
||||
|
||||
|
||||
class CacheEntrySingle(CacheEntry[KT, VT]):
|
||||
"""An implementation of `CacheEntry` wrapping a deferred that results in a
|
||||
single cache entry.
|
||||
"""
|
||||
|
||||
__slots__ = ["_deferred", "_callbacks"]
|
||||
|
||||
def __init__(self, deferred: "defer.Deferred[VT]") -> None:
|
||||
self._deferred = ObservableDeferred(deferred, consumeErrors=True)
|
||||
self._callbacks: Set[Callable[[], None]] = set()
|
||||
|
||||
def deferred(self, key: KT) -> "defer.Deferred[VT]":
|
||||
return self._deferred.observe()
|
||||
|
||||
def add_invalidation_callback(
|
||||
self, key: KT, callback: Optional[Callable[[], None]]
|
||||
) -> None:
|
||||
if callback is None:
|
||||
return
|
||||
|
||||
self._callbacks.add(callback)
|
||||
|
||||
def get_invalidation_callbacks(self, key: KT) -> Collection[Callable[[], None]]:
|
||||
return self._callbacks
|
||||
|
||||
|
||||
class CacheMultipleEntries(CacheEntry[KT, VT]):
|
||||
"""Cache entry that is used for bulk lookups and insertions."""
|
||||
|
||||
__slots__ = ["_deferred", "_callbacks", "_global_callbacks"]
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._deferred: Optional[ObservableDeferred[Dict[KT, VT]]] = None
|
||||
self._callbacks: Dict[KT, Set[Callable[[], None]]] = {}
|
||||
self._global_callbacks: Set[Callable[[], None]] = set()
|
||||
|
||||
def deferred(self, key: KT) -> "defer.Deferred[VT]":
|
||||
if not self._deferred:
|
||||
self._deferred = ObservableDeferred(defer.Deferred(), consumeErrors=True)
|
||||
return self._deferred.observe().addCallback(lambda res: res.get(key))
|
||||
|
||||
def add_invalidation_callback(
|
||||
self, key: KT, callback: Optional[Callable[[], None]]
|
||||
) -> None:
|
||||
if callback is None:
|
||||
return
|
||||
|
||||
self._callbacks.setdefault(key, set()).add(callback)
|
||||
|
||||
def get_invalidation_callbacks(self, key: KT) -> Collection[Callable[[], None]]:
|
||||
return self._callbacks.get(key, set()) | self._global_callbacks
|
||||
|
||||
def add_global_invalidation_callback(
|
||||
self, callback: Optional[Callable[[], None]]
|
||||
) -> None:
|
||||
"""Add a callback for when any keys get invalidated."""
|
||||
if callback is None:
|
||||
return
|
||||
|
||||
self._global_callbacks.add(callback)
|
||||
|
||||
def complete_bulk(
|
||||
self,
|
||||
cache: DeferredCache[KT, VT],
|
||||
result: Dict[KT, VT],
|
||||
) -> None:
|
||||
"""Called when there is a result"""
|
||||
for key, value in result.items():
|
||||
cache._completed_callback(value, self, key)
|
||||
|
||||
if self._deferred:
|
||||
self._deferred.callback(result)
|
||||
|
||||
def error_bulk(
|
||||
self, cache: DeferredCache[KT, VT], keys: Collection[KT], failure: Failure
|
||||
) -> None:
|
||||
"""Called when bulk lookup failed."""
|
||||
for key in keys:
|
||||
cache._error_callback(failure, self, key)
|
||||
|
||||
if self._deferred:
|
||||
self._deferred.errback(failure)
|
||||
|
||||
@@ -25,6 +25,7 @@ from typing import (
|
||||
Generic,
|
||||
Hashable,
|
||||
Iterable,
|
||||
List,
|
||||
Mapping,
|
||||
Optional,
|
||||
Sequence,
|
||||
@@ -73,8 +74,10 @@ class _CacheDescriptorBase:
|
||||
num_args: Optional[int],
|
||||
uncached_args: Optional[Collection[str]] = None,
|
||||
cache_context: bool = False,
|
||||
name: Optional[str] = None,
|
||||
):
|
||||
self.orig = orig
|
||||
self.name = name or orig.__name__
|
||||
|
||||
arg_spec = inspect.getfullargspec(orig)
|
||||
all_args = arg_spec.args
|
||||
@@ -211,7 +214,7 @@ class LruCacheDescriptor(_CacheDescriptorBase):
|
||||
|
||||
def __get__(self, obj: Optional[Any], owner: Optional[Type]) -> Callable[..., Any]:
|
||||
cache: LruCache[CacheKey, Any] = LruCache(
|
||||
cache_name=self.orig.__name__,
|
||||
cache_name=self.name,
|
||||
max_size=self.max_entries,
|
||||
)
|
||||
|
||||
@@ -241,7 +244,7 @@ class LruCacheDescriptor(_CacheDescriptorBase):
|
||||
|
||||
wrapped = cast(_CachedFunction, _wrapped)
|
||||
wrapped.cache = cache
|
||||
obj.__dict__[self.orig.__name__] = wrapped
|
||||
obj.__dict__[self.name] = wrapped
|
||||
|
||||
return wrapped
|
||||
|
||||
@@ -301,12 +304,14 @@ class DeferredCacheDescriptor(_CacheDescriptorBase):
|
||||
cache_context: bool = False,
|
||||
iterable: bool = False,
|
||||
prune_unread_entries: bool = True,
|
||||
name: Optional[str] = None,
|
||||
):
|
||||
super().__init__(
|
||||
orig,
|
||||
num_args=num_args,
|
||||
uncached_args=uncached_args,
|
||||
cache_context=cache_context,
|
||||
name=name,
|
||||
)
|
||||
|
||||
if tree and self.num_args < 2:
|
||||
@@ -321,7 +326,7 @@ class DeferredCacheDescriptor(_CacheDescriptorBase):
|
||||
|
||||
def __get__(self, obj: Optional[Any], owner: Optional[Type]) -> Callable[..., Any]:
|
||||
cache: DeferredCache[CacheKey, Any] = DeferredCache(
|
||||
name=self.orig.__name__,
|
||||
name=self.name,
|
||||
max_entries=self.max_entries,
|
||||
tree=self.tree,
|
||||
iterable=self.iterable,
|
||||
@@ -372,7 +377,7 @@ class DeferredCacheDescriptor(_CacheDescriptorBase):
|
||||
wrapped.cache = cache
|
||||
wrapped.num_args = self.num_args
|
||||
|
||||
obj.__dict__[self.orig.__name__] = wrapped
|
||||
obj.__dict__[self.name] = wrapped
|
||||
|
||||
return wrapped
|
||||
|
||||
@@ -393,6 +398,7 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase):
|
||||
cached_method_name: str,
|
||||
list_name: str,
|
||||
num_args: Optional[int] = None,
|
||||
name: Optional[str] = None,
|
||||
):
|
||||
"""
|
||||
Args:
|
||||
@@ -403,7 +409,7 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase):
|
||||
but including list_name) to use as cache keys. Defaults to all
|
||||
named args of the function.
|
||||
"""
|
||||
super().__init__(orig, num_args=num_args, uncached_args=None)
|
||||
super().__init__(orig, num_args=num_args, uncached_args=None, name=name)
|
||||
|
||||
self.list_name = list_name
|
||||
|
||||
@@ -435,16 +441,6 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase):
|
||||
keyargs = [arg_dict[arg_nm] for arg_nm in self.arg_names]
|
||||
list_args = arg_dict[self.list_name]
|
||||
|
||||
results = {}
|
||||
|
||||
def update_results_dict(res: Any, arg: Hashable) -> None:
|
||||
results[arg] = res
|
||||
|
||||
# list of deferreds to wait for
|
||||
cached_defers = []
|
||||
|
||||
missing = set()
|
||||
|
||||
# If the cache takes a single arg then that is used as the key,
|
||||
# otherwise a tuple is used.
|
||||
if num_args == 1:
|
||||
@@ -452,6 +448,9 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase):
|
||||
def arg_to_cache_key(arg: Hashable) -> Hashable:
|
||||
return arg
|
||||
|
||||
def cache_key_to_arg(key: tuple) -> Hashable:
|
||||
return key
|
||||
|
||||
else:
|
||||
keylist = list(keyargs)
|
||||
|
||||
@@ -459,58 +458,53 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase):
|
||||
keylist[self.list_pos] = arg
|
||||
return tuple(keylist)
|
||||
|
||||
for arg in list_args:
|
||||
try:
|
||||
res = cache.get(arg_to_cache_key(arg), callback=invalidate_callback)
|
||||
if not res.called:
|
||||
res.addCallback(update_results_dict, arg)
|
||||
cached_defers.append(res)
|
||||
else:
|
||||
results[arg] = res.result
|
||||
except KeyError:
|
||||
missing.add(arg)
|
||||
def cache_key_to_arg(key: tuple) -> Hashable:
|
||||
return key[self.list_pos]
|
||||
|
||||
cache_keys = [arg_to_cache_key(arg) for arg in list_args]
|
||||
immediate_results, pending_deferred, missing = cache.get_bulk(
|
||||
cache_keys, callback=invalidate_callback
|
||||
)
|
||||
|
||||
results = {cache_key_to_arg(key): v for key, v in immediate_results.items()}
|
||||
|
||||
cached_defers: List["defer.Deferred[Any]"] = []
|
||||
if pending_deferred:
|
||||
|
||||
def update_results(r: Dict) -> None:
|
||||
for k, v in r.items():
|
||||
results[cache_key_to_arg(k)] = v
|
||||
|
||||
pending_deferred.addCallback(update_results)
|
||||
cached_defers.append(pending_deferred)
|
||||
|
||||
if missing:
|
||||
# we need a deferred for each entry in the list,
|
||||
# which we put in the cache. Each deferred resolves with the
|
||||
# relevant result for that key.
|
||||
deferreds_map = {}
|
||||
for arg in missing:
|
||||
deferred: "defer.Deferred[Any]" = defer.Deferred()
|
||||
deferreds_map[arg] = deferred
|
||||
key = arg_to_cache_key(arg)
|
||||
cached_defers.append(
|
||||
cache.set(key, deferred, callback=invalidate_callback)
|
||||
)
|
||||
cache_entry = cache.start_bulk_input(missing, invalidate_callback)
|
||||
|
||||
def complete_all(res: Dict[Hashable, Any]) -> None:
|
||||
# the wrapped function has completed. It returns a dict.
|
||||
# We can now update our own result map, and then resolve the
|
||||
# observable deferreds in the cache.
|
||||
for e, d1 in deferreds_map.items():
|
||||
val = res.get(e, None)
|
||||
# make sure we update the results map before running the
|
||||
# deferreds, because as soon as we run the last deferred, the
|
||||
# gatherResults() below will complete and return the result
|
||||
# dict to our caller.
|
||||
results[e] = val
|
||||
d1.callback(val)
|
||||
missing_results = {}
|
||||
for key in missing:
|
||||
arg = cache_key_to_arg(key)
|
||||
val = res.get(arg, None)
|
||||
|
||||
results[arg] = val
|
||||
missing_results[key] = val
|
||||
|
||||
cache_entry.complete_bulk(cache, missing_results)
|
||||
|
||||
def errback_all(f: Failure) -> None:
|
||||
# the wrapped function has failed. Propagate the failure into
|
||||
# the cache, which will invalidate the entry, and cause the
|
||||
# relevant cached_deferreds to fail, which will propagate the
|
||||
# failure to our caller.
|
||||
for d1 in deferreds_map.values():
|
||||
d1.errback(f)
|
||||
cache_entry.error_bulk(cache, missing, f)
|
||||
|
||||
args_to_call = dict(arg_dict)
|
||||
args_to_call[self.list_name] = missing
|
||||
args_to_call[self.list_name] = {
|
||||
cache_key_to_arg(key) for key in missing
|
||||
}
|
||||
|
||||
# dispatch the call, and attach the two handlers
|
||||
defer.maybeDeferred(
|
||||
missing_d = defer.maybeDeferred(
|
||||
preserve_fn(self.orig), **args_to_call
|
||||
).addCallbacks(complete_all, errback_all)
|
||||
cached_defers.append(missing_d)
|
||||
|
||||
if cached_defers:
|
||||
d = defer.gatherResults(cached_defers, consumeErrors=True).addCallbacks(
|
||||
@@ -525,7 +519,7 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase):
|
||||
else:
|
||||
return defer.succeed(results)
|
||||
|
||||
obj.__dict__[self.orig.__name__] = wrapped
|
||||
obj.__dict__[self.name] = wrapped
|
||||
|
||||
return wrapped
|
||||
|
||||
@@ -577,6 +571,7 @@ def cached(
|
||||
cache_context: bool = False,
|
||||
iterable: bool = False,
|
||||
prune_unread_entries: bool = True,
|
||||
name: Optional[str] = None,
|
||||
) -> Callable[[F], _CachedFunction[F]]:
|
||||
func = lambda orig: DeferredCacheDescriptor(
|
||||
orig,
|
||||
@@ -587,13 +582,18 @@ def cached(
|
||||
cache_context=cache_context,
|
||||
iterable=iterable,
|
||||
prune_unread_entries=prune_unread_entries,
|
||||
name=name,
|
||||
)
|
||||
|
||||
return cast(Callable[[F], _CachedFunction[F]], func)
|
||||
|
||||
|
||||
def cachedList(
|
||||
*, cached_method_name: str, list_name: str, num_args: Optional[int] = None
|
||||
*,
|
||||
cached_method_name: str,
|
||||
list_name: str,
|
||||
num_args: Optional[int] = None,
|
||||
name: Optional[str] = None,
|
||||
) -> Callable[[F], _CachedFunction[F]]:
|
||||
"""Creates a descriptor that wraps a function in a `DeferredCacheListDescriptor`.
|
||||
|
||||
@@ -628,6 +628,7 @@ def cachedList(
|
||||
cached_method_name=cached_method_name,
|
||||
list_name=list_name,
|
||||
num_args=num_args,
|
||||
name=name,
|
||||
)
|
||||
|
||||
return cast(Callable[[F], _CachedFunction[F]], func)
|
||||
|
||||
@@ -135,6 +135,9 @@ class TreeCache:
|
||||
def values(self):
|
||||
return iterate_tree_cache_entry(self.root)
|
||||
|
||||
def items(self):
|
||||
return iterate_tree_cache_items((), self.root)
|
||||
|
||||
def __len__(self) -> int:
|
||||
return self.size
|
||||
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user