Compare commits
71 Commits
squah/leav
...
rei/librep
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3bf7d6457c | ||
|
|
ecd5a03e46 | ||
|
|
c26843c3bb | ||
|
|
4da1fdfd28 | ||
|
|
7abb4bbe73 | ||
|
|
9858a2ca68 | ||
|
|
10bdbf1f5b | ||
|
|
3ae955ea0b | ||
|
|
d9800c3f6f | ||
|
|
efe0059b98 | ||
|
|
0142367bf2 | ||
|
|
13a16f5d03 | ||
|
|
232be1d25d | ||
|
|
8325ddd0bc | ||
|
|
8428ef66c7 | ||
|
|
1847d027e6 | ||
|
|
43f5cc7adc | ||
|
|
c7fe32edb4 | ||
|
|
f901f8b70e | ||
|
|
323151b787 | ||
|
|
17886d2603 | ||
|
|
ecfcd9bbbe | ||
|
|
0147b3de20 | ||
|
|
2519beaad2 | ||
|
|
70ca05373b | ||
|
|
a91698df90 | ||
|
|
4dd9ea8f4f | ||
|
|
92906e1b60 | ||
|
|
9f3c7e85a4 | ||
|
|
a4dce5b53d | ||
|
|
33abbc3278 | ||
|
|
ff6fd52160 | ||
|
|
eb39da6782 | ||
|
|
5305a5e881 | ||
|
|
1abfb15f07 | ||
|
|
6da8591f2e | ||
|
|
e5cdb9e233 | ||
|
|
aa8708ebed | ||
|
|
8391bd6ab5 | ||
|
|
fd2dadb815 | ||
|
|
f0562183e7 | ||
|
|
86e7a6d16e | ||
|
|
9562f0c2f1 | ||
|
|
3b8872299a | ||
|
|
0cc3bf97b4 | ||
|
|
941ebe49ff | ||
|
|
b47d10dc46 | ||
|
|
b3bcacf3c1 | ||
|
|
afa0a5e4fc | ||
|
|
d93362d87f | ||
|
|
7ecaa3b976 | ||
|
|
83a74d9350 | ||
|
|
365e9482fe | ||
|
|
ff7cc17b57 | ||
|
|
158d73ebdd | ||
|
|
e38f7953ef | ||
|
|
db840d2ad5 | ||
|
|
5352f2109c | ||
|
|
8ea530a7e5 | ||
|
|
f78a082989 | ||
|
|
af85ac449d | ||
|
|
58ef32e272 | ||
|
|
cc76d9f100 | ||
|
|
a30042b16a | ||
|
|
857b2d2039 | ||
|
|
831a7a4592 | ||
|
|
7dcdab407c | ||
|
|
247f558c1c | ||
|
|
d998903d46 | ||
|
|
1fceefd65d | ||
|
|
363565e6ac |
22
CHANGES.md
22
CHANGES.md
@@ -1,5 +1,20 @@
|
||||
Synapse 1.49.0rc1 (2021-12-07)
|
||||
==============================
|
||||
Synapse 1.49.0 (2021-12-14)
|
||||
===========================
|
||||
|
||||
No significant changes since version 1.49.0rc1.
|
||||
|
||||
|
||||
Support for Ubuntu 21.04 ends next month on the 20th of January
|
||||
---------------------------------------------------------------
|
||||
|
||||
For users of Ubuntu 21.04 (Hirsute Hippo), please be aware that [upstream support for this version of Ubuntu will end next month][Ubuntu2104EOL].
|
||||
We will stop producing packages for Ubuntu 21.04 after upstream support ends.
|
||||
|
||||
[Ubuntu2104EOL]: https://lists.ubuntu.com/archives/ubuntu-announce/2021-December/000275.html
|
||||
|
||||
|
||||
The wiki has been migrated to the documentation website
|
||||
-------------------------------------------------------
|
||||
|
||||
We've decided to move the existing, somewhat stagnant pages from the GitHub wiki
|
||||
to the [documentation website](https://matrix-org.github.io/synapse/latest/).
|
||||
@@ -16,6 +31,9 @@ requests](https://github.com/matrix-org/synapse/pulls). Please visit [#synapse-d
|
||||
if you need help with the process!
|
||||
|
||||
|
||||
Synapse 1.49.0rc1 (2021-12-07)
|
||||
==============================
|
||||
|
||||
Features
|
||||
--------
|
||||
|
||||
|
||||
1
changelog.d/10870.misc
Normal file
1
changelog.d/10870.misc
Normal file
@@ -0,0 +1 @@
|
||||
Deduplicate in-flight requests in `_get_state_for_groups`.
|
||||
1
changelog.d/11243.misc
Normal file
1
changelog.d/11243.misc
Normal file
@@ -0,0 +1 @@
|
||||
Allow specific, experimental events to be created without `prev_events`. Used by [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716).
|
||||
@@ -1 +0,0 @@
|
||||
Add an admin API endpoint to force a local user to leave all non-public rooms in a space.
|
||||
1
changelog.d/11360.misc
Normal file
1
changelog.d/11360.misc
Normal file
@@ -0,0 +1 @@
|
||||
Add type hints to `synapse.appservice`.
|
||||
1
changelog.d/11378.feature
Normal file
1
changelog.d/11378.feature
Normal file
@@ -0,0 +1 @@
|
||||
Allow guests to send state events per [MSC3419](https://github.com/matrix-org/matrix-doc/pull/3419).
|
||||
1
changelog.d/11427.doc
Normal file
1
changelog.d/11427.doc
Normal file
@@ -0,0 +1 @@
|
||||
Document the usage of refresh tokens.
|
||||
1
changelog.d/11480.misc
Normal file
1
changelog.d/11480.misc
Normal file
@@ -0,0 +1 @@
|
||||
Add missing type hints to `synapse.config` module.
|
||||
1
changelog.d/11487.misc
Normal file
1
changelog.d/11487.misc
Normal file
@@ -0,0 +1 @@
|
||||
Add test to ensure we share the same `state_group` across the whole historical batch when using the [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) `/batch_send` endpoint.
|
||||
1
changelog.d/11516.bugfix
Normal file
1
changelog.d/11516.bugfix
Normal file
@@ -0,0 +1 @@
|
||||
Fix a long-standing bug where relations from other rooms could be included in the bundled aggregations of an event.
|
||||
1
changelog.d/11520.misc
Normal file
1
changelog.d/11520.misc
Normal file
@@ -0,0 +1 @@
|
||||
Use HTTPStatus constants in place of literals in `tests.rest.client.test_auth`.
|
||||
1
changelog.d/11531.misc
Normal file
1
changelog.d/11531.misc
Normal file
@@ -0,0 +1 @@
|
||||
Add a receipt types constant for `m.read`.
|
||||
1
changelog.d/11535.misc
Normal file
1
changelog.d/11535.misc
Normal file
@@ -0,0 +1 @@
|
||||
Clean up `synapse.rest.admin`.
|
||||
1
changelog.d/11536.misc
Normal file
1
changelog.d/11536.misc
Normal file
@@ -0,0 +1 @@
|
||||
Improvements to log messages around handling stream ids.
|
||||
1
changelog.d/11538.feature
Normal file
1
changelog.d/11538.feature
Normal file
@@ -0,0 +1 @@
|
||||
Add experimental support for MSC3202: allowing application services to masquerade as specific devices.
|
||||
1
changelog.d/11541.misc
Normal file
1
changelog.d/11541.misc
Normal file
@@ -0,0 +1 @@
|
||||
Support unprefixed versions of fallback key property names.
|
||||
1
changelog.d/11542.misc
Normal file
1
changelog.d/11542.misc
Normal file
@@ -0,0 +1 @@
|
||||
Add missing `errcode` to `parse_string` and `parse_boolean`.
|
||||
1
changelog.d/11543.misc
Normal file
1
changelog.d/11543.misc
Normal file
@@ -0,0 +1 @@
|
||||
Use HTTPStatus constants in place of literals in `synapse.http`.
|
||||
1
changelog.d/11546.misc
Normal file
1
changelog.d/11546.misc
Normal file
@@ -0,0 +1 @@
|
||||
Add missing type hints to storage classes.
|
||||
1
changelog.d/11547.bugfix
Normal file
1
changelog.d/11547.bugfix
Normal file
@@ -0,0 +1 @@
|
||||
Fix a bug introduced in Synapse 1.17.0 where a pusher created for an email with capital letters would fail to be created.
|
||||
1
changelog.d/11549.misc
Normal file
1
changelog.d/11549.misc
Normal file
@@ -0,0 +1 @@
|
||||
Add missing type hints to storage classes.
|
||||
1
changelog.d/11550.misc
Normal file
1
changelog.d/11550.misc
Normal file
@@ -0,0 +1 @@
|
||||
Fix an inaccurate and misleading comment in the `/sync` code.
|
||||
1
changelog.d/11551.misc
Normal file
1
changelog.d/11551.misc
Normal file
@@ -0,0 +1 @@
|
||||
Add missing type hints to storage classes.
|
||||
1
changelog.d/11555.misc
Normal file
1
changelog.d/11555.misc
Normal file
@@ -0,0 +1 @@
|
||||
Add missing type hints to storage classes.
|
||||
1
changelog.d/11556.misc
Normal file
1
changelog.d/11556.misc
Normal file
@@ -0,0 +1 @@
|
||||
Add missing type hints to `synapse.logging.context`.
|
||||
1
changelog.d/11558.misc
Normal file
1
changelog.d/11558.misc
Normal file
@@ -0,0 +1 @@
|
||||
Stop populating unused database column `state_events.prev_state`.
|
||||
1
changelog.d/11560.misc
Normal file
1
changelog.d/11560.misc
Normal file
@@ -0,0 +1 @@
|
||||
Minor efficiency improvements in event persistence.
|
||||
1
changelog.d/11564.misc
Normal file
1
changelog.d/11564.misc
Normal file
@@ -0,0 +1 @@
|
||||
Add some safety checks that storage functions are used correctly.
|
||||
1
changelog.d/11565.misc
Normal file
1
changelog.d/11565.misc
Normal file
@@ -0,0 +1 @@
|
||||
Make `get_device` return `None` if the device doesn't exist rather than raising an exception.
|
||||
1
changelog.d/11566.misc
Normal file
1
changelog.d/11566.misc
Normal file
@@ -0,0 +1 @@
|
||||
Split the HTML parsing code from the URL preview resource code.
|
||||
1
changelog.d/11570.misc
Normal file
1
changelog.d/11570.misc
Normal file
@@ -0,0 +1 @@
|
||||
Remove redundant `COALESCE()`s around `COUNT()`s in database queries.
|
||||
1
changelog.d/11571.misc
Normal file
1
changelog.d/11571.misc
Normal file
@@ -0,0 +1 @@
|
||||
Add missing type hints to `synapse.http`.
|
||||
1
changelog.d/11574.misc
Normal file
1
changelog.d/11574.misc
Normal file
@@ -0,0 +1 @@
|
||||
Convert `EventStreamResult` from a `namedtuple` to `attrs` to improve type hints.
|
||||
1
changelog.d/11575.misc
Normal file
1
changelog.d/11575.misc
Normal file
@@ -0,0 +1 @@
|
||||
Add missing type hints to storage classes.
|
||||
1
changelog.d/11580.misc
Normal file
1
changelog.d/11580.misc
Normal file
@@ -0,0 +1 @@
|
||||
Add some safety checks that storage functions are used correctly.
|
||||
1
changelog.d/11582.misc
Normal file
1
changelog.d/11582.misc
Normal file
@@ -0,0 +1 @@
|
||||
Add [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) and [MSC3030](https://github.com/matrix-org/matrix-doc/pull/3030) to `/versions` -> `unstable_features` to detect server support.
|
||||
1
changelog.d/11589.misc
Normal file
1
changelog.d/11589.misc
Normal file
@@ -0,0 +1 @@
|
||||
Add missing type hints to storage classes.
|
||||
1
changelog.d/11590.misc
Normal file
1
changelog.d/11590.misc
Normal file
@@ -0,0 +1 @@
|
||||
Add type hints to `synapse/tests/rest/admin`.
|
||||
1
changelog.d/11608.misc
Normal file
1
changelog.d/11608.misc
Normal file
@@ -0,0 +1 @@
|
||||
Deduplicate in-flight requests in `_get_state_for_groups`.
|
||||
1
changelog.d/11610.misc
Normal file
1
changelog.d/11610.misc
Normal file
@@ -0,0 +1 @@
|
||||
Deduplicate in-flight requests in `_get_state_for_groups`.
|
||||
6
debian/changelog
vendored
6
debian/changelog
vendored
@@ -1,3 +1,9 @@
|
||||
matrix-synapse-py3 (1.49.0) stable; urgency=medium
|
||||
|
||||
* New synapse release 1.49.0.
|
||||
|
||||
-- Synapse Packaging team <packages@matrix.org> Tue, 14 Dec 2021 12:39:46 +0000
|
||||
|
||||
matrix-synapse-py3 (1.49.0~rc1) stable; urgency=medium
|
||||
|
||||
* New synapse release 1.49.0~rc1.
|
||||
|
||||
@@ -30,6 +30,7 @@
|
||||
- [SSO Mapping Providers](sso_mapping_providers.md)
|
||||
- [Password Auth Providers](password_auth_providers.md)
|
||||
- [JSON Web Tokens](jwt.md)
|
||||
- [Refresh Tokens](usage/configuration/user_authentication/refresh_tokens.md)
|
||||
- [Registration Captcha](CAPTCHA_SETUP.md)
|
||||
- [Application Services](application_services.md)
|
||||
- [Server Notices](server_notices.md)
|
||||
@@ -61,7 +62,6 @@
|
||||
- [Registration Tokens](usage/administration/admin_api/registration_tokens.md)
|
||||
- [Manipulate Room Membership](admin_api/room_membership.md)
|
||||
- [Rooms](admin_api/rooms.md)
|
||||
- [Spaces](usage/administration/admin_api/spaces.md)
|
||||
- [Server Notices](admin_api/server_notices.md)
|
||||
- [Statistics](admin_api/statistics.md)
|
||||
- [Users](admin_api/user_admin_api.md)
|
||||
|
||||
@@ -1,57 +0,0 @@
|
||||
# Spaces API
|
||||
|
||||
This API allows a server administrator to manage spaces.
|
||||
|
||||
## Remove local user
|
||||
|
||||
This API forces a local user to leave all non-public rooms in a space.
|
||||
|
||||
The space itself is always left, regardless of whether it is public.
|
||||
|
||||
May succeed partially if the user fails to leave some rooms.
|
||||
|
||||
The API is:
|
||||
|
||||
```
|
||||
DELETE /_synapse/admin/v1/rooms/<room_id>/hierarchy/members/<user_id>
|
||||
```
|
||||
|
||||
with an optional body of:
|
||||
|
||||
```json
|
||||
{
|
||||
"include_remote_spaces": true,
|
||||
}
|
||||
```
|
||||
|
||||
`include_remote_spaces` controls whether to process subspaces that the
|
||||
local homeserver is not participating in. The listings of such subspaces
|
||||
have to be retrieved over federation and their accuracy cannot be
|
||||
guaranteed.
|
||||
|
||||
Returning:
|
||||
|
||||
```json
|
||||
{
|
||||
"left_rooms": ["!room1:example.net", "!room2:example.net", ...],
|
||||
"inaccessible_rooms": ["!subspace1:example.net", ...],
|
||||
"failed_rooms": {
|
||||
"!room4:example.net": "Failed to leave room.",
|
||||
...
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
`left_rooms`: A list of rooms that the user has been made to leave.
|
||||
|
||||
`inaccessible_rooms`: A list of rooms and spaces that the local
|
||||
homeserver is not in, and may have not been fully processed. Rooms may
|
||||
appear here if:
|
||||
* The room is a space that the local homeserver is not in, and so its
|
||||
full list of child rooms could not be determined.
|
||||
* The room is inaccessible to the local homeserver, and it is not
|
||||
known whether the room is a subspace containing further rooms.
|
||||
|
||||
`failed_rooms`: A dictionary of errors encountered when leaving rooms.
|
||||
The keys of the dictionary are room IDs and the values of the dictionary
|
||||
are error messages.
|
||||
139
docs/usage/configuration/user_authentication/refresh_tokens.md
Normal file
139
docs/usage/configuration/user_authentication/refresh_tokens.md
Normal file
@@ -0,0 +1,139 @@
|
||||
# Refresh Tokens
|
||||
|
||||
Synapse supports refresh tokens since version 1.49 (some earlier versions had support for an earlier, experimental draft of [MSC2918] which is not compatible).
|
||||
|
||||
|
||||
[MSC2918]: https://github.com/matrix-org/matrix-doc/blob/main/proposals/2918-refreshtokens.md#msc2918-refresh-tokens
|
||||
|
||||
|
||||
## Background and motivation
|
||||
|
||||
Synapse users' sessions are identified by **access tokens**; access tokens are
|
||||
issued to users on login. Each session gets a unique access token which identifies
|
||||
it; the access token must be kept secret as it grants access to the user's account.
|
||||
|
||||
Traditionally, these access tokens were eternally valid (at least until the user
|
||||
explicitly chose to log out).
|
||||
|
||||
In some cases, it may be desirable for these access tokens to expire so that the
|
||||
potential damage caused by leaking an access token is reduced.
|
||||
On the other hand, forcing a user to re-authenticate (log in again) often might
|
||||
be too much of an inconvenience.
|
||||
|
||||
**Refresh tokens** are a mechanism to avoid some of this inconvenience whilst
|
||||
still getting most of the benefits of short access token lifetimes.
|
||||
Refresh tokens are also a concept present in OAuth 2 — further reading is available
|
||||
[here](https://datatracker.ietf.org/doc/html/rfc6749#section-1.5).
|
||||
|
||||
When refresh tokens are in use, both an access token and a refresh token will be
|
||||
issued to users on login. The access token will expire after a predetermined amount
|
||||
of time, but otherwise works in the same way as before. When the access token is
|
||||
close to expiring (or has expired), the user's client should present the homeserver
|
||||
(Synapse) with the refresh token.
|
||||
|
||||
The homeserver will then generate a new access token and refresh token for the user
|
||||
and return them. The old refresh token is invalidated and can not be used again*.
|
||||
|
||||
Finally, refresh tokens also make it possible for sessions to be logged out if they
|
||||
are inactive for too long, before the session naturally ends; see the configuration
|
||||
guide below.
|
||||
|
||||
|
||||
*To prevent issues if clients lose connection half-way through refreshing a token,
|
||||
the refresh token is only invalidated once the new access token has been used at
|
||||
least once. For all intents and purposes, the above simplification is sufficient.
|
||||
|
||||
|
||||
## Caveats
|
||||
|
||||
There are some caveats:
|
||||
|
||||
* If a third party gets both your access token and refresh token, they will be able to
|
||||
continue to enjoy access to your session.
|
||||
* This is still an improvement because you (the user) will notice when *your*
|
||||
session expires and you're not able to use your refresh token.
|
||||
That would be a giveaway that someone else has compromised your session.
|
||||
You would be able to log in again and terminate that session.
|
||||
Previously (with long-lived access tokens), a third party that has your access
|
||||
token could go undetected for a very long time.
|
||||
* Clients need to implement support for refresh tokens in order for them to be a
|
||||
useful mechanism.
|
||||
* It is up to homeserver administrators if they want to issue long-lived access
|
||||
tokens to clients not implementing refresh tokens.
|
||||
* For compatibility, it is likely that they should, at least until client support
|
||||
is widespread.
|
||||
* Users with clients that support refresh tokens will still benefit from the
|
||||
added security; it's not possible to downgrade a session to using long-lived
|
||||
access tokens so this effectively gives users the choice.
|
||||
* In a closed environment where all users use known clients, this may not be
|
||||
an issue as the homeserver administrator can know if the clients have refresh
|
||||
token support. In that case, the non-refreshable access token lifetime
|
||||
may be set to a short duration so that a similar level of security is provided.
|
||||
|
||||
|
||||
## Configuration Guide
|
||||
|
||||
The following configuration options, in the `registration` section, are related:
|
||||
|
||||
* `session_lifetime`: maximum length of a session, even if it's refreshed.
|
||||
In other words, the client must log in again after this time period.
|
||||
In most cases, this can be unset (infinite) or set to a long time (years or months).
|
||||
* `refreshable_access_token_lifetime`: lifetime of access tokens that are created
|
||||
by clients supporting refresh tokens.
|
||||
This should be short; a good value might be 5 minutes (`5m`).
|
||||
* `nonrefreshable_access_token_lifetime`: lifetime of access tokens that are created
|
||||
by clients which don't support refresh tokens.
|
||||
Make this short if you want to effectively force use of refresh tokens.
|
||||
Make this long if you don't want to inconvenience users of clients which don't
|
||||
support refresh tokens (by forcing them to frequently re-authenticate using
|
||||
login credentials).
|
||||
* `refresh_token_lifetime`: lifetime of refresh tokens.
|
||||
In other words, the client must refresh within this time period to maintain its session.
|
||||
Unless you want to log inactive sessions out, it is often fine to use a long
|
||||
value here or even leave it unset (infinite).
|
||||
Beware that making it too short will inconvenience clients that do not connect
|
||||
very often, including mobile clients and clients of infrequent users (by making
|
||||
it more difficult for them to refresh in time, which may force them to need to
|
||||
re-authenticate using login credentials).
|
||||
|
||||
**Note:** All four options above only apply when tokens are created (by logging in or refreshing).
|
||||
Changes to these settings do not apply retroactively.
|
||||
|
||||
|
||||
### Using refresh token expiry to log out inactive sessions
|
||||
|
||||
If you'd like to force sessions to be logged out upon inactivity, you can enable
|
||||
refreshable access token expiry and refresh token expiry.
|
||||
|
||||
This works because a client must refresh at least once within a period of
|
||||
`refresh_token_lifetime` in order to maintain valid credentials to access the
|
||||
account.
|
||||
|
||||
(It's suggested that `refresh_token_lifetime` should be longer than
|
||||
`refreshable_access_token_lifetime` and this section assumes that to be the case
|
||||
for simplicity.)
|
||||
|
||||
Note: this will only affect sessions using refresh tokens. You may wish to
|
||||
set a short `nonrefreshable_access_token_lifetime` to prevent this being bypassed
|
||||
by clients that do not support refresh tokens.
|
||||
|
||||
|
||||
#### Choosing values that guarantee permitting some inactivity
|
||||
|
||||
It may be desirable to permit some short periods of inactivity, for example to
|
||||
accommodate brief outages in client connectivity.
|
||||
|
||||
The following model aims to provide guidance for choosing `refresh_token_lifetime`
|
||||
and `refreshable_access_token_lifetime` to satisfy requirements of the form:
|
||||
|
||||
1. inactivity longer than `L` **MUST** cause the session to be logged out; and
|
||||
2. inactivity shorter than `S` **MUST NOT** cause the session to be logged out.
|
||||
|
||||
This model makes the weakest assumption that all active clients will refresh as
|
||||
needed to maintain an active access token, but no sooner.
|
||||
*In reality, clients may refresh more often than this model assumes, but the
|
||||
above requirements will still hold.*
|
||||
|
||||
To satisfy the above model,
|
||||
* `refresh_token_lifetime` should be set to `L`; and
|
||||
* `refreshable_access_token_lifetime` should be set to `L - S`.
|
||||
32
mypy.ini
32
mypy.ini
@@ -25,11 +25,8 @@ exclude = (?x)
|
||||
^(
|
||||
|synapse/storage/databases/__init__.py
|
||||
|synapse/storage/databases/main/__init__.py
|
||||
|synapse/storage/databases/main/account_data.py
|
||||
|synapse/storage/databases/main/cache.py
|
||||
|synapse/storage/databases/main/devices.py
|
||||
|synapse/storage/databases/main/e2e_room_keys.py
|
||||
|synapse/storage/databases/main/end_to_end_keys.py
|
||||
|synapse/storage/databases/main/event_federation.py
|
||||
|synapse/storage/databases/main/event_push_actions.py
|
||||
|synapse/storage/databases/main/events_bg_updates.py
|
||||
@@ -40,12 +37,10 @@ exclude = (?x)
|
||||
|synapse/storage/databases/main/purge_events.py
|
||||
|synapse/storage/databases/main/push_rule.py
|
||||
|synapse/storage/databases/main/receipts.py
|
||||
|synapse/storage/databases/main/room.py
|
||||
|synapse/storage/databases/main/roommember.py
|
||||
|synapse/storage/databases/main/search.py
|
||||
|synapse/storage/databases/main/state.py
|
||||
|synapse/storage/databases/main/stats.py
|
||||
|synapse/storage/databases/main/transactions.py
|
||||
|synapse/storage/databases/main/user_directory.py
|
||||
|synapse/storage/schema/
|
||||
|
||||
@@ -145,6 +140,9 @@ disallow_untyped_defs = True
|
||||
[mypy-synapse.app.*]
|
||||
disallow_untyped_defs = True
|
||||
|
||||
[mypy-synapse.appservice.*]
|
||||
disallow_untyped_defs = True
|
||||
|
||||
[mypy-synapse.config._base]
|
||||
disallow_untyped_defs = True
|
||||
|
||||
@@ -163,6 +161,12 @@ disallow_untyped_defs = False
|
||||
[mypy-synapse.handlers.*]
|
||||
disallow_untyped_defs = True
|
||||
|
||||
[mypy-synapse.http.server]
|
||||
disallow_untyped_defs = True
|
||||
|
||||
[mypy-synapse.logging.context]
|
||||
disallow_untyped_defs = True
|
||||
|
||||
[mypy-synapse.metrics.*]
|
||||
disallow_untyped_defs = True
|
||||
|
||||
@@ -181,15 +185,27 @@ disallow_untyped_defs = True
|
||||
[mypy-synapse.state.*]
|
||||
disallow_untyped_defs = True
|
||||
|
||||
[mypy-synapse.storage.databases.main.account_data]
|
||||
disallow_untyped_defs = True
|
||||
|
||||
[mypy-synapse.storage.databases.main.client_ips]
|
||||
disallow_untyped_defs = True
|
||||
|
||||
[mypy-synapse.storage.databases.main.directory]
|
||||
disallow_untyped_defs = True
|
||||
|
||||
[mypy-synapse.storage.databases.main.e2e_room_keys]
|
||||
disallow_untyped_defs = True
|
||||
|
||||
[mypy-synapse.storage.databases.main.end_to_end_keys]
|
||||
disallow_untyped_defs = True
|
||||
|
||||
[mypy-synapse.storage.databases.main.events_worker]
|
||||
disallow_untyped_defs = True
|
||||
|
||||
[mypy-synapse.storage.databases.main.room]
|
||||
disallow_untyped_defs = True
|
||||
|
||||
[mypy-synapse.storage.databases.main.room_batch]
|
||||
disallow_untyped_defs = True
|
||||
|
||||
@@ -199,6 +215,9 @@ disallow_untyped_defs = True
|
||||
[mypy-synapse.storage.databases.main.state_deltas]
|
||||
disallow_untyped_defs = True
|
||||
|
||||
[mypy-synapse.storage.databases.main.transactions]
|
||||
disallow_untyped_defs = True
|
||||
|
||||
[mypy-synapse.storage.databases.main.user_erasure_store]
|
||||
disallow_untyped_defs = True
|
||||
|
||||
@@ -223,6 +242,9 @@ disallow_untyped_defs = True
|
||||
[mypy-tests.storage.test_user_directory]
|
||||
disallow_untyped_defs = True
|
||||
|
||||
[mypy-tests.rest.admin.*]
|
||||
disallow_untyped_defs = True
|
||||
|
||||
[mypy-tests.rest.client.test_directory]
|
||||
disallow_untyped_defs = True
|
||||
|
||||
|
||||
@@ -17,11 +17,12 @@
|
||||
from typing import Any, List, Optional, Type, Union
|
||||
|
||||
from twisted.internet import protocol
|
||||
from twisted.internet.defer import Deferred
|
||||
|
||||
class RedisProtocol(protocol.Protocol):
|
||||
def publish(self, channel: str, message: bytes): ...
|
||||
async def ping(self) -> None: ...
|
||||
async def set(
|
||||
def ping(self) -> "Deferred[None]": ...
|
||||
def set(
|
||||
self,
|
||||
key: str,
|
||||
value: Any,
|
||||
@@ -29,8 +30,8 @@ class RedisProtocol(protocol.Protocol):
|
||||
pexpire: Optional[int] = None,
|
||||
only_if_not_exists: bool = False,
|
||||
only_if_exists: bool = False,
|
||||
) -> None: ...
|
||||
async def get(self, key: str) -> Any: ...
|
||||
) -> "Deferred[None]": ...
|
||||
def get(self, key: str) -> "Deferred[Any]": ...
|
||||
|
||||
class SubscriberProtocol(RedisProtocol):
|
||||
def __init__(self, *args, **kwargs): ...
|
||||
|
||||
@@ -47,7 +47,7 @@ try:
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
__version__ = "1.49.0rc1"
|
||||
__version__ = "1.49.0"
|
||||
|
||||
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
|
||||
# We import here so that we don't have to install a bunch of deps when
|
||||
|
||||
@@ -155,7 +155,11 @@ class Auth:
|
||||
|
||||
access_token = self.get_access_token_from_request(request)
|
||||
|
||||
user_id, app_service = await self._get_appservice_user_id(request)
|
||||
(
|
||||
user_id,
|
||||
device_id,
|
||||
app_service,
|
||||
) = await self._get_appservice_user_id_and_device_id(request)
|
||||
if user_id and app_service:
|
||||
if ip_addr and self._track_appservice_user_ips:
|
||||
await self.store.insert_client_ip(
|
||||
@@ -163,16 +167,22 @@ class Auth:
|
||||
access_token=access_token,
|
||||
ip=ip_addr,
|
||||
user_agent=user_agent,
|
||||
device_id="dummy-device", # stubbed
|
||||
device_id="dummy-device"
|
||||
if device_id is None
|
||||
else device_id, # stubbed
|
||||
)
|
||||
|
||||
requester = create_requester(user_id, app_service=app_service)
|
||||
requester = create_requester(
|
||||
user_id, app_service=app_service, device_id=device_id
|
||||
)
|
||||
|
||||
request.requester = user_id
|
||||
if user_id in self._force_tracing_for_users:
|
||||
opentracing.force_tracing()
|
||||
opentracing.set_tag("authenticated_entity", user_id)
|
||||
opentracing.set_tag("user_id", user_id)
|
||||
if device_id is not None:
|
||||
opentracing.set_tag("device_id", device_id)
|
||||
opentracing.set_tag("appservice_id", app_service.id)
|
||||
|
||||
return requester
|
||||
@@ -274,33 +284,81 @@ class Auth:
|
||||
403, "Application service has not registered this user (%s)" % user_id
|
||||
)
|
||||
|
||||
async def _get_appservice_user_id(
|
||||
async def _get_appservice_user_id_and_device_id(
|
||||
self, request: Request
|
||||
) -> Tuple[Optional[str], Optional[ApplicationService]]:
|
||||
) -> Tuple[Optional[str], Optional[str], Optional[ApplicationService]]:
|
||||
"""
|
||||
Given a request, reads the request parameters to determine:
|
||||
- whether it's an application service that's making this request
|
||||
- what user the application service should be treated as controlling
|
||||
(the user_id URI parameter allows an application service to masquerade
|
||||
any applicable user in its namespace)
|
||||
- what device the application service should be treated as controlling
|
||||
(the device_id[^1] URI parameter allows an application service to masquerade
|
||||
as any device that exists for the relevant user)
|
||||
|
||||
[^1] Unstable and provided by MSC3202.
|
||||
Must use `org.matrix.msc3202.device_id` in place of `device_id` for now.
|
||||
|
||||
Returns:
|
||||
3-tuple of
|
||||
(user ID?, device ID?, application service?)
|
||||
|
||||
Postconditions:
|
||||
- If an application service is returned, so is a user ID
|
||||
- A user ID is never returned without an application service
|
||||
- A device ID is never returned without a user ID or an application service
|
||||
- The returned application service, if present, is permitted to control the
|
||||
returned user ID.
|
||||
- The returned device ID, if present, has been checked to be a valid device ID
|
||||
for the returned user ID.
|
||||
"""
|
||||
DEVICE_ID_ARG_NAME = b"org.matrix.msc3202.device_id"
|
||||
|
||||
app_service = self.store.get_app_service_by_token(
|
||||
self.get_access_token_from_request(request)
|
||||
)
|
||||
if app_service is None:
|
||||
return None, None
|
||||
return None, None, None
|
||||
|
||||
if app_service.ip_range_whitelist:
|
||||
ip_address = IPAddress(request.getClientIP())
|
||||
if ip_address not in app_service.ip_range_whitelist:
|
||||
return None, None
|
||||
return None, None, None
|
||||
|
||||
# This will always be set by the time Twisted calls us.
|
||||
assert request.args is not None
|
||||
|
||||
if b"user_id" not in request.args:
|
||||
return app_service.sender, app_service
|
||||
if b"user_id" in request.args:
|
||||
effective_user_id = request.args[b"user_id"][0].decode("utf8")
|
||||
await self.validate_appservice_can_control_user_id(
|
||||
app_service, effective_user_id
|
||||
)
|
||||
else:
|
||||
effective_user_id = app_service.sender
|
||||
|
||||
user_id = request.args[b"user_id"][0].decode("utf8")
|
||||
await self.validate_appservice_can_control_user_id(app_service, user_id)
|
||||
effective_device_id: Optional[str] = None
|
||||
|
||||
if app_service.sender == user_id:
|
||||
return app_service.sender, app_service
|
||||
if (
|
||||
self.hs.config.experimental.msc3202_device_masquerading_enabled
|
||||
and DEVICE_ID_ARG_NAME in request.args
|
||||
):
|
||||
effective_device_id = request.args[DEVICE_ID_ARG_NAME][0].decode("utf8")
|
||||
# We only just set this so it can't be None!
|
||||
assert effective_device_id is not None
|
||||
device_opt = await self.store.get_device(
|
||||
effective_user_id, effective_device_id
|
||||
)
|
||||
if device_opt is None:
|
||||
# For now, use 400 M_EXCLUSIVE if the device doesn't exist.
|
||||
# This is an open thread of discussion on MSC3202 as of 2021-12-09.
|
||||
raise AuthError(
|
||||
400,
|
||||
f"Application service trying to use a device that doesn't exist ('{effective_device_id}' for {effective_user_id})",
|
||||
Codes.EXCLUSIVE,
|
||||
)
|
||||
|
||||
return user_id, app_service
|
||||
return effective_user_id, effective_device_id, app_service
|
||||
|
||||
async def get_user_by_access_token(
|
||||
self,
|
||||
|
||||
@@ -253,5 +253,9 @@ class GuestAccess:
|
||||
FORBIDDEN: Final = "forbidden"
|
||||
|
||||
|
||||
class ReceiptTypes:
|
||||
READ: Final = "m.read"
|
||||
|
||||
|
||||
class ReadReceiptEventFields:
|
||||
MSC2285_HIDDEN: Final = "org.matrix.msc2285.hidden"
|
||||
|
||||
@@ -11,10 +11,14 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import re
|
||||
from enum import Enum
|
||||
from typing import TYPE_CHECKING, Iterable, List, Match, Optional
|
||||
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Pattern
|
||||
|
||||
import attr
|
||||
from netaddr import IPSet
|
||||
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.events import EventBase
|
||||
@@ -33,6 +37,13 @@ class ApplicationServiceState(Enum):
|
||||
UP = "up"
|
||||
|
||||
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class Namespace:
|
||||
exclusive: bool
|
||||
group_id: Optional[str]
|
||||
regex: Pattern[str]
|
||||
|
||||
|
||||
class ApplicationService:
|
||||
"""Defines an application service. This definition is mostly what is
|
||||
provided to the /register AS API.
|
||||
@@ -50,17 +61,17 @@ class ApplicationService:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
token,
|
||||
hostname,
|
||||
id,
|
||||
sender,
|
||||
url=None,
|
||||
namespaces=None,
|
||||
hs_token=None,
|
||||
protocols=None,
|
||||
rate_limited=True,
|
||||
ip_range_whitelist=None,
|
||||
supports_ephemeral=False,
|
||||
token: str,
|
||||
hostname: str,
|
||||
id: str,
|
||||
sender: str,
|
||||
url: Optional[str] = None,
|
||||
namespaces: Optional[JsonDict] = None,
|
||||
hs_token: Optional[str] = None,
|
||||
protocols: Optional[Iterable[str]] = None,
|
||||
rate_limited: bool = True,
|
||||
ip_range_whitelist: Optional[IPSet] = None,
|
||||
supports_ephemeral: bool = False,
|
||||
):
|
||||
self.token = token
|
||||
self.url = (
|
||||
@@ -85,27 +96,33 @@ class ApplicationService:
|
||||
|
||||
self.rate_limited = rate_limited
|
||||
|
||||
def _check_namespaces(self, namespaces):
|
||||
def _check_namespaces(
|
||||
self, namespaces: Optional[JsonDict]
|
||||
) -> Dict[str, List[Namespace]]:
|
||||
# Sanity check that it is of the form:
|
||||
# {
|
||||
# users: [ {regex: "[A-z]+.*", exclusive: true}, ...],
|
||||
# aliases: [ {regex: "[A-z]+.*", exclusive: true}, ...],
|
||||
# rooms: [ {regex: "[A-z]+.*", exclusive: true}, ...],
|
||||
# }
|
||||
if not namespaces:
|
||||
if namespaces is None:
|
||||
namespaces = {}
|
||||
|
||||
result: Dict[str, List[Namespace]] = {}
|
||||
|
||||
for ns in ApplicationService.NS_LIST:
|
||||
result[ns] = []
|
||||
|
||||
if ns not in namespaces:
|
||||
namespaces[ns] = []
|
||||
continue
|
||||
|
||||
if type(namespaces[ns]) != list:
|
||||
if not isinstance(namespaces[ns], list):
|
||||
raise ValueError("Bad namespace value for '%s'" % ns)
|
||||
for regex_obj in namespaces[ns]:
|
||||
if not isinstance(regex_obj, dict):
|
||||
raise ValueError("Expected dict regex for ns '%s'" % ns)
|
||||
if not isinstance(regex_obj.get("exclusive"), bool):
|
||||
exclusive = regex_obj.get("exclusive")
|
||||
if not isinstance(exclusive, bool):
|
||||
raise ValueError("Expected bool for 'exclusive' in ns '%s'" % ns)
|
||||
group_id = regex_obj.get("group_id")
|
||||
if group_id:
|
||||
@@ -126,22 +143,26 @@ class ApplicationService:
|
||||
)
|
||||
|
||||
regex = regex_obj.get("regex")
|
||||
if isinstance(regex, str):
|
||||
regex_obj["regex"] = re.compile(regex) # Pre-compile regex
|
||||
else:
|
||||
if not isinstance(regex, str):
|
||||
raise ValueError("Expected string for 'regex' in ns '%s'" % ns)
|
||||
return namespaces
|
||||
|
||||
def _matches_regex(self, test_string: str, namespace_key: str) -> Optional[Match]:
|
||||
for regex_obj in self.namespaces[namespace_key]:
|
||||
if regex_obj["regex"].match(test_string):
|
||||
return regex_obj
|
||||
# Pre-compile regex.
|
||||
result[ns].append(Namespace(exclusive, group_id, re.compile(regex)))
|
||||
|
||||
return result
|
||||
|
||||
def _matches_regex(
|
||||
self, namespace_key: str, test_string: str
|
||||
) -> Optional[Namespace]:
|
||||
for namespace in self.namespaces[namespace_key]:
|
||||
if namespace.regex.match(test_string):
|
||||
return namespace
|
||||
return None
|
||||
|
||||
def _is_exclusive(self, ns_key: str, test_string: str) -> bool:
|
||||
regex_obj = self._matches_regex(test_string, ns_key)
|
||||
if regex_obj:
|
||||
return regex_obj["exclusive"]
|
||||
def _is_exclusive(self, namespace_key: str, test_string: str) -> bool:
|
||||
namespace = self._matches_regex(namespace_key, test_string)
|
||||
if namespace:
|
||||
return namespace.exclusive
|
||||
return False
|
||||
|
||||
async def _matches_user(
|
||||
@@ -260,15 +281,15 @@ class ApplicationService:
|
||||
|
||||
def is_interested_in_user(self, user_id: str) -> bool:
|
||||
return (
|
||||
bool(self._matches_regex(user_id, ApplicationService.NS_USERS))
|
||||
bool(self._matches_regex(ApplicationService.NS_USERS, user_id))
|
||||
or user_id == self.sender
|
||||
)
|
||||
|
||||
def is_interested_in_alias(self, alias: str) -> bool:
|
||||
return bool(self._matches_regex(alias, ApplicationService.NS_ALIASES))
|
||||
return bool(self._matches_regex(ApplicationService.NS_ALIASES, alias))
|
||||
|
||||
def is_interested_in_room(self, room_id: str) -> bool:
|
||||
return bool(self._matches_regex(room_id, ApplicationService.NS_ROOMS))
|
||||
return bool(self._matches_regex(ApplicationService.NS_ROOMS, room_id))
|
||||
|
||||
def is_exclusive_user(self, user_id: str) -> bool:
|
||||
return (
|
||||
@@ -285,14 +306,14 @@ class ApplicationService:
|
||||
def is_exclusive_room(self, room_id: str) -> bool:
|
||||
return self._is_exclusive(ApplicationService.NS_ROOMS, room_id)
|
||||
|
||||
def get_exclusive_user_regexes(self):
|
||||
def get_exclusive_user_regexes(self) -> List[Pattern[str]]:
|
||||
"""Get the list of regexes used to determine if a user is exclusively
|
||||
registered by the AS
|
||||
"""
|
||||
return [
|
||||
regex_obj["regex"]
|
||||
for regex_obj in self.namespaces[ApplicationService.NS_USERS]
|
||||
if regex_obj["exclusive"]
|
||||
namespace.regex
|
||||
for namespace in self.namespaces[ApplicationService.NS_USERS]
|
||||
if namespace.exclusive
|
||||
]
|
||||
|
||||
def get_groups_for_user(self, user_id: str) -> Iterable[str]:
|
||||
@@ -305,15 +326,15 @@ class ApplicationService:
|
||||
An iterable that yields group_id strings.
|
||||
"""
|
||||
return (
|
||||
regex_obj["group_id"]
|
||||
for regex_obj in self.namespaces[ApplicationService.NS_USERS]
|
||||
if "group_id" in regex_obj and regex_obj["regex"].match(user_id)
|
||||
namespace.group_id
|
||||
for namespace in self.namespaces[ApplicationService.NS_USERS]
|
||||
if namespace.group_id and namespace.regex.match(user_id)
|
||||
)
|
||||
|
||||
def is_rate_limited(self) -> bool:
|
||||
return self.rate_limited
|
||||
|
||||
def __str__(self):
|
||||
def __str__(self) -> str:
|
||||
# copy dictionary and redact token fields so they don't get logged
|
||||
dict_copy = self.__dict__.copy()
|
||||
dict_copy["token"] = "<redacted>"
|
||||
|
||||
@@ -12,8 +12,8 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
import urllib
|
||||
from typing import TYPE_CHECKING, List, Optional, Tuple
|
||||
import urllib.parse
|
||||
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple
|
||||
|
||||
from prometheus_client import Counter
|
||||
|
||||
@@ -53,7 +53,7 @@ HOUR_IN_MS = 60 * 60 * 1000
|
||||
APP_SERVICE_PREFIX = "/_matrix/app/unstable"
|
||||
|
||||
|
||||
def _is_valid_3pe_metadata(info):
|
||||
def _is_valid_3pe_metadata(info: JsonDict) -> bool:
|
||||
if "instances" not in info:
|
||||
return False
|
||||
if not isinstance(info["instances"], list):
|
||||
@@ -61,7 +61,7 @@ def _is_valid_3pe_metadata(info):
|
||||
return True
|
||||
|
||||
|
||||
def _is_valid_3pe_result(r, field):
|
||||
def _is_valid_3pe_result(r: JsonDict, field: str) -> bool:
|
||||
if not isinstance(r, dict):
|
||||
return False
|
||||
|
||||
@@ -93,9 +93,13 @@ class ApplicationServiceApi(SimpleHttpClient):
|
||||
hs.get_clock(), "as_protocol_meta", timeout_ms=HOUR_IN_MS
|
||||
)
|
||||
|
||||
async def query_user(self, service, user_id):
|
||||
async def query_user(self, service: "ApplicationService", user_id: str) -> bool:
|
||||
if service.url is None:
|
||||
return False
|
||||
|
||||
# This is required by the configuration.
|
||||
assert service.hs_token is not None
|
||||
|
||||
uri = service.url + ("/users/%s" % urllib.parse.quote(user_id))
|
||||
try:
|
||||
response = await self.get_json(uri, {"access_token": service.hs_token})
|
||||
@@ -109,9 +113,13 @@ class ApplicationServiceApi(SimpleHttpClient):
|
||||
logger.warning("query_user to %s threw exception %s", uri, ex)
|
||||
return False
|
||||
|
||||
async def query_alias(self, service, alias):
|
||||
async def query_alias(self, service: "ApplicationService", alias: str) -> bool:
|
||||
if service.url is None:
|
||||
return False
|
||||
|
||||
# This is required by the configuration.
|
||||
assert service.hs_token is not None
|
||||
|
||||
uri = service.url + ("/rooms/%s" % urllib.parse.quote(alias))
|
||||
try:
|
||||
response = await self.get_json(uri, {"access_token": service.hs_token})
|
||||
@@ -125,7 +133,13 @@ class ApplicationServiceApi(SimpleHttpClient):
|
||||
logger.warning("query_alias to %s threw exception %s", uri, ex)
|
||||
return False
|
||||
|
||||
async def query_3pe(self, service, kind, protocol, fields):
|
||||
async def query_3pe(
|
||||
self,
|
||||
service: "ApplicationService",
|
||||
kind: str,
|
||||
protocol: str,
|
||||
fields: Dict[bytes, List[bytes]],
|
||||
) -> List[JsonDict]:
|
||||
if kind == ThirdPartyEntityKind.USER:
|
||||
required_field = "userid"
|
||||
elif kind == ThirdPartyEntityKind.LOCATION:
|
||||
@@ -205,11 +219,14 @@ class ApplicationServiceApi(SimpleHttpClient):
|
||||
events: List[EventBase],
|
||||
ephemeral: List[JsonDict],
|
||||
txn_id: Optional[int] = None,
|
||||
):
|
||||
) -> bool:
|
||||
if service.url is None:
|
||||
return True
|
||||
|
||||
events = self._serialize(service, events)
|
||||
# This is required by the configuration.
|
||||
assert service.hs_token is not None
|
||||
|
||||
serialized_events = self._serialize(service, events)
|
||||
|
||||
if txn_id is None:
|
||||
logger.warning(
|
||||
@@ -221,9 +238,12 @@ class ApplicationServiceApi(SimpleHttpClient):
|
||||
|
||||
# Never send ephemeral events to appservices that do not support it
|
||||
if service.supports_ephemeral:
|
||||
body = {"events": events, "de.sorunome.msc2409.ephemeral": ephemeral}
|
||||
body = {
|
||||
"events": serialized_events,
|
||||
"de.sorunome.msc2409.ephemeral": ephemeral,
|
||||
}
|
||||
else:
|
||||
body = {"events": events}
|
||||
body = {"events": serialized_events}
|
||||
|
||||
try:
|
||||
await self.put_json(
|
||||
@@ -238,7 +258,7 @@ class ApplicationServiceApi(SimpleHttpClient):
|
||||
[event.get("event_id") for event in events],
|
||||
)
|
||||
sent_transactions_counter.labels(service.id).inc()
|
||||
sent_events_counter.labels(service.id).inc(len(events))
|
||||
sent_events_counter.labels(service.id).inc(len(serialized_events))
|
||||
return True
|
||||
except CodeMessageException as e:
|
||||
logger.warning(
|
||||
@@ -260,7 +280,9 @@ class ApplicationServiceApi(SimpleHttpClient):
|
||||
failed_transactions_counter.labels(service.id).inc()
|
||||
return False
|
||||
|
||||
def _serialize(self, service, events):
|
||||
def _serialize(
|
||||
self, service: "ApplicationService", events: Iterable[EventBase]
|
||||
) -> List[JsonDict]:
|
||||
time_now = self.clock.time_msec()
|
||||
return [
|
||||
serialize_event(
|
||||
|
||||
@@ -48,13 +48,19 @@ This is all tied together by the AppServiceScheduler which DIs the required
|
||||
components.
|
||||
"""
|
||||
import logging
|
||||
from typing import List, Optional
|
||||
from typing import TYPE_CHECKING, Awaitable, Callable, Dict, List, Optional, Set
|
||||
|
||||
from synapse.appservice import ApplicationService, ApplicationServiceState
|
||||
from synapse.appservice.api import ApplicationServiceApi
|
||||
from synapse.events import EventBase
|
||||
from synapse.logging.context import run_in_background
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.storage.databases.main import DataStore
|
||||
from synapse.types import JsonDict
|
||||
from synapse.util import Clock
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -72,7 +78,7 @@ class ApplicationServiceScheduler:
|
||||
case is a simple array.
|
||||
"""
|
||||
|
||||
def __init__(self, hs):
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.clock = hs.get_clock()
|
||||
self.store = hs.get_datastore()
|
||||
self.as_api = hs.get_application_service_api()
|
||||
@@ -80,7 +86,7 @@ class ApplicationServiceScheduler:
|
||||
self.txn_ctrl = _TransactionController(self.clock, self.store, self.as_api)
|
||||
self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock)
|
||||
|
||||
async def start(self):
|
||||
async def start(self) -> None:
|
||||
logger.info("Starting appservice scheduler")
|
||||
|
||||
# check for any DOWN ASes and start recoverers for them.
|
||||
@@ -91,12 +97,14 @@ class ApplicationServiceScheduler:
|
||||
for service in services:
|
||||
self.txn_ctrl.start_recoverer(service)
|
||||
|
||||
def submit_event_for_as(self, service: ApplicationService, event: EventBase):
|
||||
def submit_event_for_as(
|
||||
self, service: ApplicationService, event: EventBase
|
||||
) -> None:
|
||||
self.queuer.enqueue_event(service, event)
|
||||
|
||||
def submit_ephemeral_events_for_as(
|
||||
self, service: ApplicationService, events: List[JsonDict]
|
||||
):
|
||||
) -> None:
|
||||
self.queuer.enqueue_ephemeral(service, events)
|
||||
|
||||
|
||||
@@ -108,16 +116,18 @@ class _ServiceQueuer:
|
||||
appservice at a given time.
|
||||
"""
|
||||
|
||||
def __init__(self, txn_ctrl, clock):
|
||||
self.queued_events = {} # dict of {service_id: [events]}
|
||||
self.queued_ephemeral = {} # dict of {service_id: [events]}
|
||||
def __init__(self, txn_ctrl: "_TransactionController", clock: Clock):
|
||||
# dict of {service_id: [events]}
|
||||
self.queued_events: Dict[str, List[EventBase]] = {}
|
||||
# dict of {service_id: [events]}
|
||||
self.queued_ephemeral: Dict[str, List[JsonDict]] = {}
|
||||
|
||||
# the appservices which currently have a transaction in flight
|
||||
self.requests_in_flight = set()
|
||||
self.requests_in_flight: Set[str] = set()
|
||||
self.txn_ctrl = txn_ctrl
|
||||
self.clock = clock
|
||||
|
||||
def _start_background_request(self, service):
|
||||
def _start_background_request(self, service: ApplicationService) -> None:
|
||||
# start a sender for this appservice if we don't already have one
|
||||
if service.id in self.requests_in_flight:
|
||||
return
|
||||
@@ -126,15 +136,17 @@ class _ServiceQueuer:
|
||||
"as-sender-%s" % (service.id,), self._send_request, service
|
||||
)
|
||||
|
||||
def enqueue_event(self, service: ApplicationService, event: EventBase):
|
||||
def enqueue_event(self, service: ApplicationService, event: EventBase) -> None:
|
||||
self.queued_events.setdefault(service.id, []).append(event)
|
||||
self._start_background_request(service)
|
||||
|
||||
def enqueue_ephemeral(self, service: ApplicationService, events: List[JsonDict]):
|
||||
def enqueue_ephemeral(
|
||||
self, service: ApplicationService, events: List[JsonDict]
|
||||
) -> None:
|
||||
self.queued_ephemeral.setdefault(service.id, []).extend(events)
|
||||
self._start_background_request(service)
|
||||
|
||||
async def _send_request(self, service: ApplicationService):
|
||||
async def _send_request(self, service: ApplicationService) -> None:
|
||||
# sanity-check: we shouldn't get here if this service already has a sender
|
||||
# running.
|
||||
assert service.id not in self.requests_in_flight
|
||||
@@ -168,20 +180,15 @@ class _TransactionController:
|
||||
if a transaction fails.
|
||||
|
||||
(Note we have only have one of these in the homeserver.)
|
||||
|
||||
Args:
|
||||
clock (synapse.util.Clock):
|
||||
store (synapse.storage.DataStore):
|
||||
as_api (synapse.appservice.api.ApplicationServiceApi):
|
||||
"""
|
||||
|
||||
def __init__(self, clock, store, as_api):
|
||||
def __init__(self, clock: Clock, store: DataStore, as_api: ApplicationServiceApi):
|
||||
self.clock = clock
|
||||
self.store = store
|
||||
self.as_api = as_api
|
||||
|
||||
# map from service id to recoverer instance
|
||||
self.recoverers = {}
|
||||
self.recoverers: Dict[str, "_Recoverer"] = {}
|
||||
|
||||
# for UTs
|
||||
self.RECOVERER_CLASS = _Recoverer
|
||||
@@ -191,7 +198,7 @@ class _TransactionController:
|
||||
service: ApplicationService,
|
||||
events: List[EventBase],
|
||||
ephemeral: Optional[List[JsonDict]] = None,
|
||||
):
|
||||
) -> None:
|
||||
try:
|
||||
txn = await self.store.create_appservice_txn(
|
||||
service=service, events=events, ephemeral=ephemeral or []
|
||||
@@ -207,7 +214,7 @@ class _TransactionController:
|
||||
logger.exception("Error creating appservice transaction")
|
||||
run_in_background(self._on_txn_fail, service)
|
||||
|
||||
async def on_recovered(self, recoverer):
|
||||
async def on_recovered(self, recoverer: "_Recoverer") -> None:
|
||||
logger.info(
|
||||
"Successfully recovered application service AS ID %s", recoverer.service.id
|
||||
)
|
||||
@@ -217,18 +224,18 @@ class _TransactionController:
|
||||
recoverer.service, ApplicationServiceState.UP
|
||||
)
|
||||
|
||||
async def _on_txn_fail(self, service):
|
||||
async def _on_txn_fail(self, service: ApplicationService) -> None:
|
||||
try:
|
||||
await self.store.set_appservice_state(service, ApplicationServiceState.DOWN)
|
||||
self.start_recoverer(service)
|
||||
except Exception:
|
||||
logger.exception("Error starting AS recoverer")
|
||||
|
||||
def start_recoverer(self, service):
|
||||
def start_recoverer(self, service: ApplicationService) -> None:
|
||||
"""Start a Recoverer for the given service
|
||||
|
||||
Args:
|
||||
service (synapse.appservice.ApplicationService):
|
||||
service:
|
||||
"""
|
||||
logger.info("Starting recoverer for AS ID %s", service.id)
|
||||
assert service.id not in self.recoverers
|
||||
@@ -257,7 +264,14 @@ class _Recoverer:
|
||||
callback (callable[_Recoverer]): called once the service recovers.
|
||||
"""
|
||||
|
||||
def __init__(self, clock, store, as_api, service, callback):
|
||||
def __init__(
|
||||
self,
|
||||
clock: Clock,
|
||||
store: DataStore,
|
||||
as_api: ApplicationServiceApi,
|
||||
service: ApplicationService,
|
||||
callback: Callable[["_Recoverer"], Awaitable[None]],
|
||||
):
|
||||
self.clock = clock
|
||||
self.store = store
|
||||
self.as_api = as_api
|
||||
@@ -265,8 +279,8 @@ class _Recoverer:
|
||||
self.callback = callback
|
||||
self.backoff_counter = 1
|
||||
|
||||
def recover(self):
|
||||
def _retry():
|
||||
def recover(self) -> None:
|
||||
def _retry() -> None:
|
||||
run_as_background_process(
|
||||
"as-recoverer-%s" % (self.service.id,), self.retry
|
||||
)
|
||||
@@ -275,13 +289,13 @@ class _Recoverer:
|
||||
logger.info("Scheduling retries on %s in %fs", self.service.id, delay)
|
||||
self.clock.call_later(delay, _retry)
|
||||
|
||||
def _backoff(self):
|
||||
def _backoff(self) -> None:
|
||||
# cap the backoff to be around 8.5min => (2^9) = 512 secs
|
||||
if self.backoff_counter < 9:
|
||||
self.backoff_counter += 1
|
||||
self.recover()
|
||||
|
||||
async def retry(self):
|
||||
async def retry(self) -> None:
|
||||
logger.info("Starting retries on %s", self.service.id)
|
||||
try:
|
||||
while True:
|
||||
|
||||
@@ -147,8 +147,7 @@ def _load_appservice(
|
||||
# protocols check
|
||||
protocols = as_info.get("protocols")
|
||||
if protocols:
|
||||
# Because strings are lists in python
|
||||
if isinstance(protocols, str) or not isinstance(protocols, list):
|
||||
if not isinstance(protocols, list):
|
||||
raise KeyError("Optional 'protocols' must be a list if present.")
|
||||
for p in protocols:
|
||||
if not isinstance(p, str):
|
||||
|
||||
@@ -32,7 +32,7 @@ class ExperimentalConfig(Config):
|
||||
# MSC3026 (busy presence state)
|
||||
self.msc3026_enabled: bool = experimental.get("msc3026_enabled", False)
|
||||
|
||||
# MSC2716 (backfill existing history)
|
||||
# MSC2716 (importing historical messages)
|
||||
self.msc2716_enabled: bool = experimental.get("msc2716_enabled", False)
|
||||
|
||||
# MSC2285 (hidden read receipts)
|
||||
@@ -49,3 +49,8 @@ class ExperimentalConfig(Config):
|
||||
|
||||
# MSC3030 (Jump to date API endpoint)
|
||||
self.msc3030_enabled: bool = experimental.get("msc3030_enabled", False)
|
||||
|
||||
# The portion of MSC3202 which is related to device masquerading.
|
||||
self.msc3202_device_masquerading_enabled: bool = experimental.get(
|
||||
"msc3202_device_masquerading", False
|
||||
)
|
||||
|
||||
@@ -16,12 +16,14 @@
|
||||
import hashlib
|
||||
import logging
|
||||
import os
|
||||
from typing import Any, Dict
|
||||
from typing import Any, Dict, Iterator, List, Optional
|
||||
|
||||
import attr
|
||||
import jsonschema
|
||||
from signedjson.key import (
|
||||
NACL_ED25519,
|
||||
SigningKey,
|
||||
VerifyKey,
|
||||
decode_signing_key_base64,
|
||||
decode_verify_key_bytes,
|
||||
generate_signing_key,
|
||||
@@ -31,6 +33,7 @@ from signedjson.key import (
|
||||
)
|
||||
from unpaddedbase64 import decode_base64
|
||||
|
||||
from synapse.types import JsonDict
|
||||
from synapse.util.stringutils import random_string, random_string_with_symbols
|
||||
|
||||
from ._base import Config, ConfigError
|
||||
@@ -81,14 +84,13 @@ To suppress this warning and continue using 'matrix.org', admins should set
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@attr.s
|
||||
@attr.s(slots=True, auto_attribs=True)
|
||||
class TrustedKeyServer:
|
||||
# string: name of the server.
|
||||
server_name = attr.ib()
|
||||
# name of the server.
|
||||
server_name: str
|
||||
|
||||
# dict[str,VerifyKey]|None: map from key id to key object, or None to disable
|
||||
# signature verification.
|
||||
verify_keys = attr.ib(default=None)
|
||||
# map from key id to key object, or None to disable signature verification.
|
||||
verify_keys: Optional[Dict[str, VerifyKey]] = None
|
||||
|
||||
|
||||
class KeyConfig(Config):
|
||||
@@ -279,15 +281,15 @@ class KeyConfig(Config):
|
||||
% locals()
|
||||
)
|
||||
|
||||
def read_signing_keys(self, signing_key_path, name):
|
||||
def read_signing_keys(self, signing_key_path: str, name: str) -> List[SigningKey]:
|
||||
"""Read the signing keys in the given path.
|
||||
|
||||
Args:
|
||||
signing_key_path (str)
|
||||
name (str): Associated config key name
|
||||
signing_key_path
|
||||
name: Associated config key name
|
||||
|
||||
Returns:
|
||||
list[SigningKey]
|
||||
The signing keys read from the given path.
|
||||
"""
|
||||
|
||||
signing_keys = self.read_file(signing_key_path, name)
|
||||
@@ -296,7 +298,9 @@ class KeyConfig(Config):
|
||||
except Exception as e:
|
||||
raise ConfigError("Error reading %s: %s" % (name, str(e)))
|
||||
|
||||
def read_old_signing_keys(self, old_signing_keys):
|
||||
def read_old_signing_keys(
|
||||
self, old_signing_keys: Optional[JsonDict]
|
||||
) -> Dict[str, VerifyKey]:
|
||||
if old_signing_keys is None:
|
||||
return {}
|
||||
keys = {}
|
||||
@@ -340,7 +344,7 @@ class KeyConfig(Config):
|
||||
write_signing_keys(signing_key_file, (key,))
|
||||
|
||||
|
||||
def _perspectives_to_key_servers(config):
|
||||
def _perspectives_to_key_servers(config: JsonDict) -> Iterator[JsonDict]:
|
||||
"""Convert old-style 'perspectives' configs into new-style 'trusted_key_servers'
|
||||
|
||||
Returns an iterable of entries to add to trusted_key_servers.
|
||||
@@ -402,7 +406,9 @@ TRUSTED_KEY_SERVERS_SCHEMA = {
|
||||
}
|
||||
|
||||
|
||||
def _parse_key_servers(key_servers, federation_verify_certificates):
|
||||
def _parse_key_servers(
|
||||
key_servers: List[Any], federation_verify_certificates: bool
|
||||
) -> Iterator[TrustedKeyServer]:
|
||||
try:
|
||||
jsonschema.validate(key_servers, TRUSTED_KEY_SERVERS_SCHEMA)
|
||||
except jsonschema.ValidationError as e:
|
||||
@@ -444,7 +450,7 @@ def _parse_key_servers(key_servers, federation_verify_certificates):
|
||||
yield result
|
||||
|
||||
|
||||
def _assert_keyserver_has_verify_keys(trusted_key_server):
|
||||
def _assert_keyserver_has_verify_keys(trusted_key_server: TrustedKeyServer) -> None:
|
||||
if not trusted_key_server.verify_keys:
|
||||
raise ConfigError(INSECURE_NOTARY_ERROR)
|
||||
|
||||
|
||||
@@ -22,10 +22,12 @@ from ._base import Config, ConfigError
|
||||
|
||||
@attr.s
|
||||
class MetricsFlags:
|
||||
known_servers = attr.ib(default=False, validator=attr.validators.instance_of(bool))
|
||||
known_servers: bool = attr.ib(
|
||||
default=False, validator=attr.validators.instance_of(bool)
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def all_off(cls):
|
||||
def all_off(cls) -> "MetricsFlags":
|
||||
"""
|
||||
Instantiate the flags with all options set to off.
|
||||
"""
|
||||
|
||||
@@ -1257,7 +1257,7 @@ class ServerConfig(Config):
|
||||
help="Turn on the twisted telnet manhole service on the given port.",
|
||||
)
|
||||
|
||||
def read_gc_intervals(self, durations) -> Optional[Tuple[float, float, float]]:
|
||||
def read_gc_intervals(self, durations: Any) -> Optional[Tuple[float, float, float]]:
|
||||
"""Reads the three durations for the GC min interval option, returning seconds."""
|
||||
if durations is None:
|
||||
return None
|
||||
|
||||
@@ -132,7 +132,7 @@ class TlsConfig(Config):
|
||||
self.tls_certificate: Optional[crypto.X509] = None
|
||||
self.tls_private_key: Optional[crypto.PKey] = None
|
||||
|
||||
def read_certificate_from_disk(self):
|
||||
def read_certificate_from_disk(self) -> None:
|
||||
"""
|
||||
Read the certificates and private key from disk.
|
||||
"""
|
||||
|
||||
@@ -454,23 +454,26 @@ class EventClientSerializer:
|
||||
return
|
||||
|
||||
event_id = event.event_id
|
||||
room_id = event.room_id
|
||||
|
||||
# The bundled aggregations to include.
|
||||
aggregations = {}
|
||||
|
||||
annotations = await self.store.get_aggregation_groups_for_event(event_id)
|
||||
annotations = await self.store.get_aggregation_groups_for_event(
|
||||
event_id, room_id
|
||||
)
|
||||
if annotations.chunk:
|
||||
aggregations[RelationTypes.ANNOTATION] = annotations.to_dict()
|
||||
|
||||
references = await self.store.get_relations_for_event(
|
||||
event_id, RelationTypes.REFERENCE, direction="f"
|
||||
event_id, room_id, RelationTypes.REFERENCE, direction="f"
|
||||
)
|
||||
if references.chunk:
|
||||
aggregations[RelationTypes.REFERENCE] = references.to_dict()
|
||||
|
||||
edit = None
|
||||
if event.type == EventTypes.Message:
|
||||
edit = await self.store.get_applicable_edit(event_id)
|
||||
edit = await self.store.get_applicable_edit(event_id, room_id)
|
||||
|
||||
if edit:
|
||||
# If there is an edit replace the content, preserving existing
|
||||
@@ -503,7 +506,7 @@ class EventClientSerializer:
|
||||
(
|
||||
thread_count,
|
||||
latest_thread_event,
|
||||
) = await self.store.get_thread_summary(event_id)
|
||||
) = await self.store.get_thread_summary(event_id, room_id)
|
||||
if latest_thread_event:
|
||||
aggregations[RelationTypes.THREAD] = {
|
||||
# Don't bundle aggregations as this could recurse forever.
|
||||
|
||||
@@ -30,7 +30,6 @@ from typing import (
|
||||
|
||||
from prometheus_client import Counter, Gauge, Histogram
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.internet.abstract import isIPAddress
|
||||
from twisted.python import failure
|
||||
|
||||
@@ -67,7 +66,7 @@ from synapse.replication.http.federation import (
|
||||
from synapse.storage.databases.main.lock import Lock
|
||||
from synapse.types import JsonDict, get_domain_from_id
|
||||
from synapse.util import glob_to_regex, json_decoder, unwrapFirstError
|
||||
from synapse.util.async_helpers import Linearizer, concurrently_execute
|
||||
from synapse.util.async_helpers import Linearizer, concurrently_execute, gather_results
|
||||
from synapse.util.caches.response_cache import ResponseCache
|
||||
from synapse.util.stringutils import parse_server_name
|
||||
|
||||
@@ -360,13 +359,13 @@ class FederationServer(FederationBase):
|
||||
# want to block things like to device messages from reaching clients
|
||||
# behind the potentially expensive handling of PDUs.
|
||||
pdu_results, _ = await make_deferred_yieldable(
|
||||
defer.gatherResults(
|
||||
[
|
||||
gather_results(
|
||||
(
|
||||
run_in_background(
|
||||
self._handle_pdus_in_txn, origin, transaction, request_time
|
||||
),
|
||||
run_in_background(self._handle_edus_in_txn, origin, transaction),
|
||||
],
|
||||
),
|
||||
consumeErrors=True,
|
||||
).addErrback(unwrapFirstError)
|
||||
)
|
||||
|
||||
@@ -997,9 +997,7 @@ class AuthHandler:
|
||||
# really don't want is active access_tokens without a record of the
|
||||
# device, so we double-check it here.
|
||||
if device_id is not None:
|
||||
try:
|
||||
await self.store.get_device(user_id, device_id)
|
||||
except StoreError:
|
||||
if await self.store.get_device(user_id, device_id) is None:
|
||||
await self.store.delete_access_token(access_token)
|
||||
raise StoreError(400, "Login raced against device deletion")
|
||||
|
||||
|
||||
@@ -106,10 +106,10 @@ class DeviceWorkerHandler:
|
||||
Raises:
|
||||
errors.NotFoundError: if the device was not found
|
||||
"""
|
||||
try:
|
||||
device = await self.store.get_device(user_id, device_id)
|
||||
except errors.StoreError:
|
||||
raise errors.NotFoundError
|
||||
device = await self.store.get_device(user_id, device_id)
|
||||
if device is None:
|
||||
raise errors.NotFoundError()
|
||||
|
||||
ips = await self.store.get_last_client_ip_by_device(user_id, device_id)
|
||||
_update_device_from_client_ips(device, ips)
|
||||
|
||||
@@ -602,6 +602,8 @@ class DeviceHandler(DeviceWorkerHandler):
|
||||
access_token, device_id
|
||||
)
|
||||
old_device = await self.store.get_device(user_id, old_device_id)
|
||||
if old_device is None:
|
||||
raise errors.NotFoundError()
|
||||
await self.store.update_device(user_id, device_id, old_device["display_name"])
|
||||
# can't call self.delete_device because that will clobber the
|
||||
# access token so call the storage layer directly
|
||||
|
||||
@@ -580,7 +580,9 @@ class E2eKeysHandler:
|
||||
log_kv(
|
||||
{"message": "Did not update one_time_keys", "reason": "no keys given"}
|
||||
)
|
||||
fallback_keys = keys.get("org.matrix.msc2732.fallback_keys", None)
|
||||
fallback_keys = keys.get("fallback_keys") or keys.get(
|
||||
"org.matrix.msc2732.fallback_keys"
|
||||
)
|
||||
if fallback_keys and isinstance(fallback_keys, dict):
|
||||
log_kv(
|
||||
{
|
||||
|
||||
@@ -14,7 +14,9 @@
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, List, Optional
|
||||
from typing import TYPE_CHECKING, Dict, Optional
|
||||
|
||||
from typing_extensions import Literal
|
||||
|
||||
from synapse.api.errors import (
|
||||
Codes,
|
||||
@@ -24,6 +26,7 @@ from synapse.api.errors import (
|
||||
SynapseError,
|
||||
)
|
||||
from synapse.logging.opentracing import log_kv, trace
|
||||
from synapse.storage.databases.main.e2e_room_keys import RoomKey
|
||||
from synapse.types import JsonDict
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
|
||||
@@ -58,7 +61,9 @@ class E2eRoomKeysHandler:
|
||||
version: str,
|
||||
room_id: Optional[str] = None,
|
||||
session_id: Optional[str] = None,
|
||||
) -> List[JsonDict]:
|
||||
) -> Dict[
|
||||
Literal["rooms"], Dict[str, Dict[Literal["sessions"], Dict[str, RoomKey]]]
|
||||
]:
|
||||
"""Bulk get the E2E room keys for a given backup, optionally filtered to a given
|
||||
room, or a given session.
|
||||
See EndToEndRoomKeyStore.get_e2e_room_keys for full details.
|
||||
@@ -72,8 +77,8 @@ class E2eRoomKeysHandler:
|
||||
Raises:
|
||||
NotFoundError: if the backup version does not exist
|
||||
Returns:
|
||||
A list of dicts giving the session_data and message metadata for
|
||||
these room keys.
|
||||
A dict giving the session_data and message metadata for these room keys.
|
||||
`{"rooms": {room_id: {"sessions": {session_id: room_key}}}}`
|
||||
"""
|
||||
|
||||
# we deliberately take the lock to get keys so that changing the version
|
||||
@@ -273,7 +278,7 @@ class E2eRoomKeysHandler:
|
||||
|
||||
@staticmethod
|
||||
def _should_replace_room_key(
|
||||
current_room_key: Optional[JsonDict], room_key: JsonDict
|
||||
current_room_key: Optional[RoomKey], room_key: RoomKey
|
||||
) -> bool:
|
||||
"""
|
||||
Determine whether to replace a given current_room_key (if any)
|
||||
|
||||
@@ -79,13 +79,14 @@ class EventStreamHandler:
|
||||
# thundering herds on restart.
|
||||
timeout = random.randint(int(timeout * 0.9), int(timeout * 1.1))
|
||||
|
||||
events, tokens = await self.notifier.get_events_for(
|
||||
stream_result = await self.notifier.get_events_for(
|
||||
auth_user,
|
||||
pagin_config,
|
||||
timeout,
|
||||
is_guest=is_guest,
|
||||
explicit_room_id=room_id,
|
||||
)
|
||||
events = stream_result.events
|
||||
|
||||
time_now = self.clock.time_msec()
|
||||
|
||||
@@ -128,8 +129,8 @@ class EventStreamHandler:
|
||||
|
||||
chunk = {
|
||||
"chunk": chunks,
|
||||
"start": await tokens[0].to_string(self.store),
|
||||
"end": await tokens[1].to_string(self.store),
|
||||
"start": await stream_result.start_token.to_string(self.store),
|
||||
"end": await stream_result.end_token.to_string(self.store),
|
||||
}
|
||||
|
||||
return chunk
|
||||
|
||||
@@ -360,31 +360,34 @@ class FederationHandler:
|
||||
|
||||
logger.debug("calling resolve_state_groups in _maybe_backfill")
|
||||
resolve = preserve_fn(self.state_handler.resolve_state_groups_for_events)
|
||||
states = await make_deferred_yieldable(
|
||||
states_list = await make_deferred_yieldable(
|
||||
defer.gatherResults(
|
||||
[resolve(room_id, [e]) for e in event_ids], consumeErrors=True
|
||||
)
|
||||
)
|
||||
|
||||
# dict[str, dict[tuple, str]], a map from event_id to state map of
|
||||
# event_ids.
|
||||
states = dict(zip(event_ids, [s.state for s in states]))
|
||||
# A map from event_id to state map of event_ids.
|
||||
state_ids: Dict[str, StateMap[str]] = dict(
|
||||
zip(event_ids, [s.state for s in states_list])
|
||||
)
|
||||
|
||||
state_map = await self.store.get_events(
|
||||
[e_id for ids in states.values() for e_id in ids.values()],
|
||||
[e_id for ids in state_ids.values() for e_id in ids.values()],
|
||||
get_prev_content=False,
|
||||
)
|
||||
states = {
|
||||
|
||||
# A map from event_id to state map of events.
|
||||
state_events: Dict[str, StateMap[EventBase]] = {
|
||||
key: {
|
||||
k: state_map[e_id]
|
||||
for k, e_id in state_dict.items()
|
||||
if e_id in state_map
|
||||
}
|
||||
for key, state_dict in states.items()
|
||||
for key, state_dict in state_ids.items()
|
||||
}
|
||||
|
||||
for e_id in event_ids:
|
||||
likely_extremeties_domains = get_domains_from_state(states[e_id])
|
||||
likely_extremeties_domains = get_domains_from_state(state_events[e_id])
|
||||
|
||||
success = await try_backfill(
|
||||
[
|
||||
|
||||
@@ -13,21 +13,27 @@
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, List, Optional, Tuple
|
||||
|
||||
from twisted.internet import defer
|
||||
from typing import TYPE_CHECKING, List, Optional, Tuple, cast
|
||||
|
||||
from synapse.api.constants import EduTypes, EventTypes, Membership
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.events import EventBase
|
||||
from synapse.events.validator import EventValidator
|
||||
from synapse.handlers.presence import format_user_presence_state
|
||||
from synapse.handlers.receipts import ReceiptEventSource
|
||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||
from synapse.storage.roommember import RoomsForUser
|
||||
from synapse.streams.config import PaginationConfig
|
||||
from synapse.types import JsonDict, Requester, RoomStreamToken, StreamToken, UserID
|
||||
from synapse.types import (
|
||||
JsonDict,
|
||||
Requester,
|
||||
RoomStreamToken,
|
||||
StateMap,
|
||||
StreamToken,
|
||||
UserID,
|
||||
)
|
||||
from synapse.util import unwrapFirstError
|
||||
from synapse.util.async_helpers import concurrently_execute
|
||||
from synapse.util.async_helpers import concurrently_execute, gather_results
|
||||
from synapse.util.caches.response_cache import ResponseCache
|
||||
from synapse.visibility import filter_events_for_client
|
||||
|
||||
@@ -190,14 +196,13 @@ class InitialSyncHandler:
|
||||
)
|
||||
deferred_room_state = run_in_background(
|
||||
self.state_store.get_state_for_events, [event.event_id]
|
||||
)
|
||||
deferred_room_state.addCallback(
|
||||
lambda states: states[event.event_id]
|
||||
).addCallback(
|
||||
lambda states: cast(StateMap[EventBase], states[event.event_id])
|
||||
)
|
||||
|
||||
(messages, token), current_state = await make_deferred_yieldable(
|
||||
defer.gatherResults(
|
||||
[
|
||||
gather_results(
|
||||
(
|
||||
run_in_background(
|
||||
self.store.get_recent_events_for_room,
|
||||
event.room_id,
|
||||
@@ -205,7 +210,7 @@ class InitialSyncHandler:
|
||||
end_token=room_end_token,
|
||||
),
|
||||
deferred_room_state,
|
||||
]
|
||||
)
|
||||
)
|
||||
).addErrback(unwrapFirstError)
|
||||
|
||||
@@ -454,8 +459,8 @@ class InitialSyncHandler:
|
||||
return receipts
|
||||
|
||||
presence, receipts, (messages, token) = await make_deferred_yieldable(
|
||||
defer.gatherResults(
|
||||
[
|
||||
gather_results(
|
||||
(
|
||||
run_in_background(get_presence),
|
||||
run_in_background(get_receipts),
|
||||
run_in_background(
|
||||
@@ -464,7 +469,7 @@ class InitialSyncHandler:
|
||||
limit=limit,
|
||||
end_token=now_token.room_key,
|
||||
),
|
||||
],
|
||||
),
|
||||
consumeErrors=True,
|
||||
).addErrback(unwrapFirstError)
|
||||
)
|
||||
|
||||
@@ -21,7 +21,6 @@ from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple
|
||||
|
||||
from canonicaljson import encode_canonical_json
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.internet.interfaces import IDelayedCall
|
||||
|
||||
from synapse import event_auth
|
||||
@@ -57,7 +56,7 @@ from synapse.storage.databases.main.events_worker import EventRedactBehaviour
|
||||
from synapse.storage.state import StateFilter
|
||||
from synapse.types import Requester, RoomAlias, StreamToken, UserID, create_requester
|
||||
from synapse.util import json_decoder, json_encoder, log_failure
|
||||
from synapse.util.async_helpers import Linearizer, unwrapFirstError
|
||||
from synapse.util.async_helpers import Linearizer, gather_results, unwrapFirstError
|
||||
from synapse.util.caches.expiringcache import ExpiringCache
|
||||
from synapse.util.metrics import measure_func
|
||||
from synapse.visibility import filter_events_for_client
|
||||
@@ -496,6 +495,7 @@ class EventCreationHandler:
|
||||
require_consent: bool = True,
|
||||
outlier: bool = False,
|
||||
historical: bool = False,
|
||||
allow_no_prev_events: bool = False,
|
||||
depth: Optional[int] = None,
|
||||
) -> Tuple[EventBase, EventContext]:
|
||||
"""
|
||||
@@ -607,6 +607,7 @@ class EventCreationHandler:
|
||||
prev_event_ids=prev_event_ids,
|
||||
auth_event_ids=auth_event_ids,
|
||||
depth=depth,
|
||||
allow_no_prev_events=allow_no_prev_events,
|
||||
)
|
||||
|
||||
# In an ideal world we wouldn't need the second part of this condition. However,
|
||||
@@ -882,6 +883,7 @@ class EventCreationHandler:
|
||||
prev_event_ids: Optional[List[str]] = None,
|
||||
auth_event_ids: Optional[List[str]] = None,
|
||||
depth: Optional[int] = None,
|
||||
allow_no_prev_events: bool = False,
|
||||
) -> Tuple[EventBase, EventContext]:
|
||||
"""Create a new event for a local client
|
||||
|
||||
@@ -912,6 +914,7 @@ class EventCreationHandler:
|
||||
full_state_ids_at_event = None
|
||||
if auth_event_ids is not None:
|
||||
# If auth events are provided, prev events must be also.
|
||||
# prev_event_ids could be an empty array though.
|
||||
assert prev_event_ids is not None
|
||||
|
||||
# Copy the full auth state before it stripped down
|
||||
@@ -943,14 +946,22 @@ class EventCreationHandler:
|
||||
else:
|
||||
prev_event_ids = await self.store.get_prev_events_for_room(builder.room_id)
|
||||
|
||||
# we now ought to have some prev_events (unless it's a create event).
|
||||
#
|
||||
# do a quick sanity check here, rather than waiting until we've created the
|
||||
# Do a quick sanity check here, rather than waiting until we've created the
|
||||
# event and then try to auth it (which fails with a somewhat confusing "No
|
||||
# create event in auth events")
|
||||
assert (
|
||||
builder.type == EventTypes.Create or len(prev_event_ids) > 0
|
||||
), "Attempting to create an event with no prev_events"
|
||||
if allow_no_prev_events:
|
||||
# We allow events with no `prev_events` but it better have some `auth_events`
|
||||
assert (
|
||||
builder.type == EventTypes.Create
|
||||
# Allow an event to have empty list of prev_event_ids
|
||||
# only if it has auth_event_ids.
|
||||
or auth_event_ids
|
||||
), "Attempting to create a non-m.room.create event with no prev_events or auth_event_ids"
|
||||
else:
|
||||
# we now ought to have some prev_events (unless it's a create event).
|
||||
assert (
|
||||
builder.type == EventTypes.Create or prev_event_ids
|
||||
), "Attempting to create a non-m.room.create event with no prev_events"
|
||||
|
||||
event = await builder.build(
|
||||
prev_event_ids=prev_event_ids,
|
||||
@@ -1156,9 +1167,9 @@ class EventCreationHandler:
|
||||
|
||||
# We now persist the event (and update the cache in parallel, since we
|
||||
# don't want to block on it).
|
||||
result = await make_deferred_yieldable(
|
||||
defer.gatherResults(
|
||||
[
|
||||
result, _ = await make_deferred_yieldable(
|
||||
gather_results(
|
||||
(
|
||||
run_in_background(
|
||||
self._persist_event,
|
||||
requester=requester,
|
||||
@@ -1170,12 +1181,12 @@ class EventCreationHandler:
|
||||
run_in_background(
|
||||
self.cache_joined_hosts_for_event, event, context
|
||||
).addErrback(log_failure, "cache_joined_hosts_for_event failed"),
|
||||
],
|
||||
),
|
||||
consumeErrors=True,
|
||||
)
|
||||
).addErrback(unwrapFirstError)
|
||||
|
||||
return result[0]
|
||||
return result
|
||||
|
||||
async def _persist_event(
|
||||
self,
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple
|
||||
|
||||
from synapse.api.constants import ReadReceiptEventFields
|
||||
from synapse.api.constants import ReadReceiptEventFields, ReceiptTypes
|
||||
from synapse.appservice import ApplicationService
|
||||
from synapse.streams import EventSource
|
||||
from synapse.types import JsonDict, ReadReceipt, UserID, get_domain_from_id
|
||||
@@ -178,7 +178,7 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
|
||||
|
||||
for event_id in content.keys():
|
||||
event_content = content.get(event_id, {})
|
||||
m_read = event_content.get("m.read", {})
|
||||
m_read = event_content.get(ReceiptTypes.READ, {})
|
||||
|
||||
# If m_read is missing copy over the original event_content as there is nothing to process here
|
||||
if not m_read:
|
||||
@@ -206,7 +206,7 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
|
||||
|
||||
# Set new users unless empty
|
||||
if len(new_users.keys()) > 0:
|
||||
new_event["content"][event_id] = {"m.read": new_users}
|
||||
new_event["content"][event_id] = {ReceiptTypes.READ: new_users}
|
||||
|
||||
# Append new_event to visible_events unless empty
|
||||
if len(new_event["content"].keys()) > 0:
|
||||
|
||||
@@ -658,7 +658,8 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
if block_invite:
|
||||
raise SynapseError(403, "Invites have been disabled on this server")
|
||||
|
||||
if prev_event_ids:
|
||||
# An empty prev_events list is allowed as long as the auth_event_ids are present
|
||||
if prev_event_ids is not None:
|
||||
return await self._local_membership_update(
|
||||
requester=requester,
|
||||
target=target,
|
||||
@@ -1019,7 +1020,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
# Add new room to the room directory if the old room was there
|
||||
# Remove old room from the room directory
|
||||
old_room = await self.store.get_room(old_room_id)
|
||||
if old_room and old_room["is_public"]:
|
||||
if old_room is not None and old_room["is_public"]:
|
||||
await self.store.set_room_is_public(old_room_id, False)
|
||||
await self.store.set_room_is_public(room_id, True)
|
||||
|
||||
@@ -1030,7 +1031,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
local_group_ids = await self.store.get_local_groups_for_room(old_room_id)
|
||||
for group_id in local_group_ids:
|
||||
# Add new the new room to those groups
|
||||
await self.store.add_room_to_group(group_id, room_id, old_room["is_public"])
|
||||
await self.store.add_room_to_group(
|
||||
group_id, room_id, old_room is not None and old_room["is_public"]
|
||||
)
|
||||
|
||||
# Remove the old room from those groups
|
||||
await self.store.remove_room_from_group(group_id, old_room_id)
|
||||
|
||||
@@ -1045,7 +1045,7 @@ class RoomSummaryHandler:
|
||||
|
||||
# filter out any events without a "via" (which implies it has been redacted),
|
||||
# and order to ensure we return stable results.
|
||||
return sorted(filter(has_valid_via, events), key=child_events_comparison_key)
|
||||
return sorted(filter(_has_valid_via, events), key=_child_events_comparison_key)
|
||||
|
||||
async def get_room_summary(
|
||||
self,
|
||||
@@ -1139,7 +1139,7 @@ class _RoomEntry:
|
||||
return result
|
||||
|
||||
|
||||
def has_valid_via(e: EventBase) -> bool:
|
||||
def _has_valid_via(e: EventBase) -> bool:
|
||||
via = e.content.get("via")
|
||||
if not via or not isinstance(via, Sequence):
|
||||
return False
|
||||
@@ -1162,7 +1162,7 @@ def _is_suggested_child_event(edge_event: EventBase) -> bool:
|
||||
_INVALID_ORDER_CHARS_RE = re.compile(r"[^\x20-\x7E]")
|
||||
|
||||
|
||||
def child_events_comparison_key(
|
||||
def _child_events_comparison_key(
|
||||
child: EventBase,
|
||||
) -> Tuple[bool, Optional[str], int, str]:
|
||||
"""
|
||||
|
||||
@@ -1,294 +0,0 @@
|
||||
# Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Dict,
|
||||
Iterable,
|
||||
List,
|
||||
Mapping,
|
||||
Optional,
|
||||
Sequence,
|
||||
Tuple,
|
||||
)
|
||||
|
||||
from synapse.api.constants import EventContentFields, EventTypes, RoomTypes
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.handlers.room_summary import child_events_comparison_key, has_valid_via
|
||||
from synapse.storage.state import StateFilter
|
||||
from synapse.types import JsonDict
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SpaceHierarchyHandler:
|
||||
"""Provides methods for walking over space hierarchies.
|
||||
|
||||
Also see `RoomSummaryHandler`, which has similar functionality.
|
||||
"""
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self._store = hs.get_datastore()
|
||||
self._federation_client = hs.get_federation_client()
|
||||
|
||||
self._server_name = hs.hostname
|
||||
|
||||
async def get_space_descendants(
|
||||
self,
|
||||
space_id: str,
|
||||
via: Optional[Iterable[str]] = None,
|
||||
enable_federation: Optional[bool] = True,
|
||||
) -> Tuple[Sequence[Tuple[str, Iterable[str]]], Sequence[str]]:
|
||||
"""Gets the children of a space, recursively.
|
||||
|
||||
Args:
|
||||
space_id: The room ID of the space.
|
||||
via: A list of servers which may know about the space.
|
||||
enable_federation: A boolean controlling whether children of unknown rooms
|
||||
should be fetched over federation. Defaults to `True`.
|
||||
|
||||
Returns:
|
||||
A tuple containing:
|
||||
* A list of (room ID, via) tuples, representing the descendants of the
|
||||
space. `space_id` is included in the list.
|
||||
* A list of room IDs whose children could not be fully listed.
|
||||
Rooms in this list are either spaces not known locally, and thus require
|
||||
listing over federation, or are unknown rooms or subspaces completely
|
||||
inaccessible to the local homeserver which may contain further rooms.
|
||||
Subspaces requiring listing over federation are always included here,
|
||||
regardless of the value of the `enable_federation` flag.
|
||||
|
||||
This list is a subset of the previous list, except it may include
|
||||
`space_id`.
|
||||
"""
|
||||
via = via or []
|
||||
|
||||
# (room ID, via, federation room chunks)
|
||||
todo: List[Tuple[str, Iterable[str], Mapping[str, Optional[JsonDict]]]] = [
|
||||
(space_id, via, {})
|
||||
]
|
||||
# [(room ID, via)]
|
||||
descendants: List[Tuple[str, Iterable[str]]] = []
|
||||
|
||||
seen = {space_id}
|
||||
|
||||
inaccessible_room_ids: List[str] = []
|
||||
|
||||
while todo:
|
||||
space_id, via, federation_room_chunks = todo.pop()
|
||||
descendants.append((space_id, via))
|
||||
try:
|
||||
(
|
||||
is_in_room,
|
||||
children,
|
||||
federation_room_chunks,
|
||||
) = await self._get_space_children(
|
||||
space_id,
|
||||
via,
|
||||
federation_room_chunks,
|
||||
enable_federation=enable_federation,
|
||||
)
|
||||
except SynapseError:
|
||||
# Could not list children over federation
|
||||
inaccessible_room_ids.append(space_id)
|
||||
continue
|
||||
|
||||
# Children were retrieved over federation, which is not guaranteed to be
|
||||
# the full list.
|
||||
if not is_in_room:
|
||||
inaccessible_room_ids.append(space_id)
|
||||
|
||||
for child_room_id, child_via in reversed(children):
|
||||
if child_room_id in seen:
|
||||
continue
|
||||
|
||||
seen.add(child_room_id)
|
||||
|
||||
# Queue up the child for processing.
|
||||
# The child may not actually be a space, but that's checked by
|
||||
# `_get_space_children`.
|
||||
todo.append((child_room_id, child_via, federation_room_chunks))
|
||||
|
||||
return descendants, inaccessible_room_ids
|
||||
|
||||
async def _get_space_children(
|
||||
self,
|
||||
space_id: str,
|
||||
via: Optional[Iterable[str]] = None,
|
||||
federation_room_chunks: Optional[Mapping[str, Optional[JsonDict]]] = None,
|
||||
enable_federation: Optional[bool] = True,
|
||||
) -> Tuple[
|
||||
bool, Sequence[Tuple[str, Iterable[str]]], Mapping[str, Optional[JsonDict]]
|
||||
]:
|
||||
"""Gets the direct children of a space.
|
||||
|
||||
Args:
|
||||
space_id: The room ID of the space.
|
||||
via: A list of servers which may know about the space.
|
||||
federation_room_chunks: A cache of room chunks previously returned by
|
||||
`_get_space_children` that may be used to skip federation requests for
|
||||
inaccessible or non-space rooms.
|
||||
|
||||
Returns:
|
||||
A tuple containing:
|
||||
* A boolean indicating whether `space_id` is known to the local homeserver.
|
||||
* A list of (room ID, via) tuples, representing the children of the space,
|
||||
if `space_id` refers to a space; an empty list otherwise.
|
||||
* A dictionary of child room ID: `PublicRoomsChunk`s returned over
|
||||
federation:
|
||||
https://spec.matrix.org/latest/client-server-api/#get_matrixclientv3publicrooms
|
||||
These are supposed to include extra `room_type` and `allowed_room_ids`
|
||||
fields, as described in MSC2946.
|
||||
|
||||
Contains `None` for rooms to which the remote homeserver thinks we do not
|
||||
have access.
|
||||
|
||||
Local information about rooms should be trusted over data in this
|
||||
dictionary.
|
||||
|
||||
Raises:
|
||||
SynapseError: if `space_id` is not known locally and its children could not
|
||||
be retrieved over federation or `enable_federation` is `False`.
|
||||
"""
|
||||
via = via or []
|
||||
federation_room_chunks = federation_room_chunks or {}
|
||||
|
||||
is_in_room = await self._store.is_host_joined(space_id, self._server_name)
|
||||
if is_in_room:
|
||||
children = await self._get_space_children_local(space_id)
|
||||
return True, children, {}
|
||||
else:
|
||||
# Check the room chunks previously returned over federation to see if we
|
||||
# should really make a request.
|
||||
# `federation_room_chunks` is intentionally not used earlier since we want
|
||||
# to trust local data over data from federation.
|
||||
if space_id in federation_room_chunks:
|
||||
room_chunk = federation_room_chunks[space_id]
|
||||
if room_chunk is None:
|
||||
# `space_id` is inaccessible to the local homeserver according to
|
||||
# federation.
|
||||
raise SynapseError(
|
||||
502, f"{space_id} is not accessible to the local homeserver"
|
||||
)
|
||||
elif room_chunk.get("room_type") != RoomTypes.SPACE:
|
||||
# `space_id` is not a space according to federation.
|
||||
return False, [], {}
|
||||
|
||||
if not enable_federation:
|
||||
raise SynapseError(
|
||||
502, f"{space_id} is not accessible to the local homeserver"
|
||||
)
|
||||
|
||||
children, room_chunks = await self._get_space_children_remote(space_id, via)
|
||||
return False, children, room_chunks
|
||||
|
||||
async def _get_space_children_local(
|
||||
self, space_id: str
|
||||
) -> Sequence[Tuple[str, Iterable[str]]]:
|
||||
"""Gets the direct children of a space that the local homeserver is in.
|
||||
|
||||
Args:
|
||||
space_id: The room ID of the space.
|
||||
|
||||
Returns:
|
||||
A list of (room ID, via) tuples, representing the children of the space,
|
||||
if `space_id` refers to a space; an empty list otherwise.
|
||||
|
||||
Raises:
|
||||
ValueError: if `space_id` is not known locally.
|
||||
"""
|
||||
# Fetch the `m.room.create` and `m.space.child` events for `space_id`
|
||||
state_filter = StateFilter.from_types(
|
||||
[(EventTypes.Create, ""), (EventTypes.SpaceChild, None)]
|
||||
)
|
||||
current_state_ids = await self._store.get_filtered_current_state_ids(
|
||||
space_id, state_filter
|
||||
)
|
||||
state_events = await self._store.get_events_as_list(current_state_ids.values())
|
||||
assert len(state_events) == len(current_state_ids)
|
||||
|
||||
create_event_id = current_state_ids.get((EventTypes.Create, ""))
|
||||
if create_event_id is None:
|
||||
# The local homeserver is not in this room
|
||||
raise ValueError(f"{space_id} is not a room known locally.")
|
||||
|
||||
create_event = next(
|
||||
event for event in state_events if event.event_id == create_event_id
|
||||
)
|
||||
if create_event.content.get(EventContentFields.ROOM_TYPE) != RoomTypes.SPACE:
|
||||
# `space_id` is a regular room and not a space.
|
||||
# Ignore any `m.space.child` events.
|
||||
return []
|
||||
|
||||
child_events = [
|
||||
event
|
||||
for event in state_events
|
||||
# Ignore events with a missing or non-array `via`, as per MSC1772
|
||||
if event.event_id != create_event_id and has_valid_via(event)
|
||||
]
|
||||
child_events.sort(key=child_events_comparison_key)
|
||||
return [(event.state_key, event.content["via"]) for event in child_events]
|
||||
|
||||
async def _get_space_children_remote(
|
||||
self, space_id: str, via: Iterable[str]
|
||||
) -> Tuple[Sequence[Tuple[str, Iterable[str]]], Mapping[str, Optional[JsonDict]]]:
|
||||
"""Gets the direct children of a space over federation.
|
||||
|
||||
Args:
|
||||
space_id: The room ID of the space.
|
||||
via: A list of servers which may know about the space.
|
||||
|
||||
Returns:
|
||||
A tuple containing:
|
||||
* A list of (room ID, via) tuples, representing the children of the space,
|
||||
if `space_id` refers to a space; an empty list otherwise.
|
||||
* A dictionary of child room ID: `PublicRoomsChunk`s returned over
|
||||
federation:
|
||||
https://spec.matrix.org/latest/client-server-api/#get_matrixclientv3publicrooms
|
||||
These are supposed to include extra `room_type` and `allowed_room_ids`
|
||||
fields, as described in MSC2946.
|
||||
|
||||
Contains `None` for rooms to which the remote homeserver thinks we do not
|
||||
have access.
|
||||
|
||||
Raises:
|
||||
SynapseError: if none of the remote servers provided us with the space's
|
||||
children.
|
||||
"""
|
||||
(
|
||||
room,
|
||||
children_chunks,
|
||||
inaccessible_children,
|
||||
) = await self._federation_client.get_room_hierarchy(
|
||||
via, space_id, suggested_only=False
|
||||
)
|
||||
|
||||
child_events: List[JsonDict] = room["children_state"]
|
||||
children = [
|
||||
(child_event["room_id"], child_event["content"]["via"])
|
||||
for child_event in child_events
|
||||
]
|
||||
|
||||
room_chunks: Dict[str, Optional[JsonDict]] = {}
|
||||
room_chunks.update((room_id, None) for room_id in inaccessible_children)
|
||||
room_chunks.update(
|
||||
(room_chunk["room_id"], room_chunk) for room_chunk in children_chunks
|
||||
)
|
||||
|
||||
return children, room_chunks
|
||||
@@ -28,7 +28,7 @@ from typing import (
|
||||
import attr
|
||||
from prometheus_client import Counter
|
||||
|
||||
from synapse.api.constants import AccountDataTypes, EventTypes, Membership
|
||||
from synapse.api.constants import AccountDataTypes, EventTypes, Membership, ReceiptTypes
|
||||
from synapse.api.filtering import FilterCollection
|
||||
from synapse.api.presence import UserPresenceState
|
||||
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
|
||||
@@ -1046,7 +1046,7 @@ class SyncHandler:
|
||||
last_unread_event_id = await self.store.get_last_receipt_event_id_for_user(
|
||||
user_id=sync_config.user.to_string(),
|
||||
room_id=room_id,
|
||||
receipt_type="m.read",
|
||||
receipt_type=ReceiptTypes.READ,
|
||||
)
|
||||
|
||||
notifs = await self.store.get_unread_event_push_actions_by_room_for_user(
|
||||
@@ -1662,20 +1662,20 @@ class SyncHandler:
|
||||
) -> _RoomChanges:
|
||||
"""Determine the changes in rooms to report to the user.
|
||||
|
||||
Ideally, we want to report all events whose stream ordering `s` lies in the
|
||||
range `since_token < s <= now_token`, where the two tokens are read from the
|
||||
sync_result_builder.
|
||||
This function is a first pass at generating the rooms part of the sync response.
|
||||
It determines which rooms have changed during the sync period, and categorises
|
||||
them into four buckets: "knock", "invite", "join" and "leave".
|
||||
|
||||
If there are too many events in that range to report, things get complicated.
|
||||
In this situation we return a truncated list of the most recent events, and
|
||||
indicate in the response that there is a "gap" of omitted events. Additionally:
|
||||
1. Finds all membership changes for the user in the sync period (from
|
||||
`since_token` up to `now_token`).
|
||||
2. Uses those to place the room in one of the four categories above.
|
||||
3. Builds a `_RoomChanges` struct to record this, and return that struct.
|
||||
|
||||
- we include a "state_delta", to describe the changes in state over the gap,
|
||||
- we include all membership events applying to the user making the request,
|
||||
even those in the gap.
|
||||
|
||||
See the spec for the rationale:
|
||||
https://spec.matrix.org/v1.1/client-server-api/#syncing
|
||||
For rooms classified as "knock", "invite" or "leave", we just need to report
|
||||
a single membership event in the eventual /sync response. For "join" we need
|
||||
to fetch additional non-membership events, e.g. messages in the room. That is
|
||||
more complicated, so instead we report an intermediary `RoomSyncResultBuilder`
|
||||
struct, and leave the additional work to `_generate_room_entry`.
|
||||
|
||||
The sync_result_builder is not modified by this function.
|
||||
"""
|
||||
@@ -1686,16 +1686,6 @@ class SyncHandler:
|
||||
|
||||
assert since_token
|
||||
|
||||
# The spec
|
||||
# https://spec.matrix.org/v1.1/client-server-api/#get_matrixclientv3sync
|
||||
# notes that membership events need special consideration:
|
||||
#
|
||||
# > When a sync is limited, the server MUST return membership events for events
|
||||
# > in the gap (between since and the start of the returned timeline), regardless
|
||||
# > as to whether or not they are redundant.
|
||||
#
|
||||
# We fetch such events here, but we only seem to use them for categorising rooms
|
||||
# as newly joined, newly left, invited or knocked.
|
||||
# TODO: we've already called this function and ran this query in
|
||||
# _have_rooms_changed. We could keep the results in memory to avoid a
|
||||
# second query, at the cost of more complicated source code.
|
||||
@@ -2009,6 +1999,23 @@ class SyncHandler:
|
||||
"""Populates the `joined` and `archived` section of `sync_result_builder`
|
||||
based on the `room_builder`.
|
||||
|
||||
Ideally, we want to report all events whose stream ordering `s` lies in the
|
||||
range `since_token < s <= now_token`, where the two tokens are read from the
|
||||
sync_result_builder.
|
||||
|
||||
If there are too many events in that range to report, things get complicated.
|
||||
In this situation we return a truncated list of the most recent events, and
|
||||
indicate in the response that there is a "gap" of omitted events. Lots of this
|
||||
is handled in `_load_filtered_recents`, but some of is handled in this method.
|
||||
|
||||
Additionally:
|
||||
- we include a "state_delta", to describe the changes in state over the gap,
|
||||
- we include all membership events applying to the user making the request,
|
||||
even those in the gap.
|
||||
|
||||
See the spec for the rationale:
|
||||
https://spec.matrix.org/v1.1/client-server-api/#syncing
|
||||
|
||||
Args:
|
||||
sync_result_builder
|
||||
ignored_users: Set of users ignored by user.
|
||||
|
||||
@@ -25,7 +25,7 @@ from synapse.api.errors import SynapseError
|
||||
class RequestTimedOutError(SynapseError):
|
||||
"""Exception representing timeout of an outbound request"""
|
||||
|
||||
def __init__(self, msg):
|
||||
def __init__(self, msg: str):
|
||||
super().__init__(504, msg)
|
||||
|
||||
|
||||
@@ -33,7 +33,7 @@ ACCESS_TOKEN_RE = re.compile(r"(\?.*access(_|%5[Ff])token=)[^&]*(.*)$")
|
||||
CLIENT_SECRET_RE = re.compile(r"(\?.*client(_|%5[Ff])secret=)[^&]*(.*)$")
|
||||
|
||||
|
||||
def redact_uri(uri):
|
||||
def redact_uri(uri: str) -> str:
|
||||
"""Strips sensitive information from the uri replaces with <redacted>"""
|
||||
uri = ACCESS_TOKEN_RE.sub(r"\1<redacted>\3", uri)
|
||||
return CLIENT_SECRET_RE.sub(r"\1<redacted>\3", uri)
|
||||
@@ -46,7 +46,7 @@ class QuieterFileBodyProducer(FileBodyProducer):
|
||||
https://twistedmatrix.com/trac/ticket/6528
|
||||
"""
|
||||
|
||||
def stopProducing(self):
|
||||
def stopProducing(self) -> None:
|
||||
try:
|
||||
FileBodyProducer.stopProducing(self)
|
||||
except task.TaskStopped:
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Optional, Tuple
|
||||
|
||||
from twisted.web.server import Request
|
||||
|
||||
@@ -32,7 +32,11 @@ class AdditionalResource(DirectServeJsonResource):
|
||||
and exception handling.
|
||||
"""
|
||||
|
||||
def __init__(self, hs: "HomeServer", handler):
|
||||
def __init__(
|
||||
self,
|
||||
hs: "HomeServer",
|
||||
handler: Callable[[Request], Awaitable[Optional[Tuple[int, Any]]]],
|
||||
):
|
||||
"""Initialise AdditionalResource
|
||||
|
||||
The ``handler`` should return a deferred which completes when it has
|
||||
@@ -47,7 +51,7 @@ class AdditionalResource(DirectServeJsonResource):
|
||||
super().__init__()
|
||||
self._handler = handler
|
||||
|
||||
def _async_render(self, request: Request):
|
||||
async def _async_render(self, request: Request) -> Optional[Tuple[int, Any]]:
|
||||
# Cheekily pass the result straight through, so we don't need to worry
|
||||
# if its an awaitable or not.
|
||||
return self._handler(request)
|
||||
return await self._handler(request)
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
# limitations under the License.
|
||||
import logging
|
||||
import urllib.parse
|
||||
from http import HTTPStatus
|
||||
from io import BytesIO
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
@@ -280,7 +281,9 @@ class BlacklistingAgentWrapper(Agent):
|
||||
ip_address, self._ip_whitelist, self._ip_blacklist
|
||||
):
|
||||
logger.info("Blocking access to %s due to blacklist" % (ip_address,))
|
||||
e = SynapseError(403, "IP address blocked by IP blacklist entry")
|
||||
e = SynapseError(
|
||||
HTTPStatus.FORBIDDEN, "IP address blocked by IP blacklist entry"
|
||||
)
|
||||
return defer.fail(Failure(e))
|
||||
|
||||
return self._agent.request(
|
||||
@@ -719,7 +722,9 @@ class SimpleHttpClient:
|
||||
|
||||
if response.code > 299:
|
||||
logger.warning("Got %d when downloading %s" % (response.code, url))
|
||||
raise SynapseError(502, "Got error %d" % (response.code,), Codes.UNKNOWN)
|
||||
raise SynapseError(
|
||||
HTTPStatus.BAD_GATEWAY, "Got error %d" % (response.code,), Codes.UNKNOWN
|
||||
)
|
||||
|
||||
# TODO: if our Content-Type is HTML or something, just read the first
|
||||
# N bytes into RAM rather than saving it all to disk only to read it
|
||||
@@ -731,12 +736,14 @@ class SimpleHttpClient:
|
||||
)
|
||||
except BodyExceededMaxSize:
|
||||
raise SynapseError(
|
||||
502,
|
||||
HTTPStatus.BAD_GATEWAY,
|
||||
"Requested file is too large > %r bytes" % (max_size,),
|
||||
Codes.TOO_LARGE,
|
||||
)
|
||||
except Exception as e:
|
||||
raise SynapseError(502, ("Failed to download remote body: %s" % e)) from e
|
||||
raise SynapseError(
|
||||
HTTPStatus.BAD_GATEWAY, ("Failed to download remote body: %s" % e)
|
||||
) from e
|
||||
|
||||
return (
|
||||
length,
|
||||
|
||||
@@ -25,6 +25,7 @@ from zope.interface import implementer
|
||||
from twisted.internet import defer
|
||||
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
|
||||
from twisted.internet.interfaces import (
|
||||
IProtocol,
|
||||
IProtocolFactory,
|
||||
IReactorCore,
|
||||
IStreamClientEndpoint,
|
||||
@@ -309,12 +310,14 @@ class MatrixHostnameEndpoint:
|
||||
|
||||
self._srv_resolver = srv_resolver
|
||||
|
||||
def connect(self, protocol_factory: IProtocolFactory) -> defer.Deferred:
|
||||
def connect(
|
||||
self, protocol_factory: IProtocolFactory
|
||||
) -> "defer.Deferred[IProtocol]":
|
||||
"""Implements IStreamClientEndpoint interface"""
|
||||
|
||||
return run_in_background(self._do_connect, protocol_factory)
|
||||
|
||||
async def _do_connect(self, protocol_factory: IProtocolFactory) -> None:
|
||||
async def _do_connect(self, protocol_factory: IProtocolFactory) -> IProtocol:
|
||||
first_exception = None
|
||||
|
||||
server_list = await self._resolve_server()
|
||||
|
||||
@@ -19,6 +19,7 @@ import random
|
||||
import sys
|
||||
import typing
|
||||
import urllib.parse
|
||||
from http import HTTPStatus
|
||||
from io import BytesIO, StringIO
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
@@ -1154,7 +1155,7 @@ class MatrixFederationHttpClient:
|
||||
request.destination,
|
||||
msg,
|
||||
)
|
||||
raise SynapseError(502, msg, Codes.TOO_LARGE)
|
||||
raise SynapseError(HTTPStatus.BAD_GATEWAY, msg, Codes.TOO_LARGE)
|
||||
except defer.TimeoutError as e:
|
||||
logger.warning(
|
||||
"{%s} [%s] Timed out reading response - %s %s",
|
||||
|
||||
@@ -30,6 +30,7 @@ from typing import (
|
||||
Iterable,
|
||||
Iterator,
|
||||
List,
|
||||
NoReturn,
|
||||
Optional,
|
||||
Pattern,
|
||||
Tuple,
|
||||
@@ -170,7 +171,9 @@ def return_html_error(
|
||||
respond_with_html(request, code, body)
|
||||
|
||||
|
||||
def wrap_async_request_handler(h):
|
||||
def wrap_async_request_handler(
|
||||
h: Callable[["_AsyncResource", SynapseRequest], Awaitable[None]]
|
||||
) -> Callable[["_AsyncResource", SynapseRequest], "defer.Deferred[None]"]:
|
||||
"""Wraps an async request handler so that it calls request.processing.
|
||||
|
||||
This helps ensure that work done by the request handler after the request is completed
|
||||
@@ -183,7 +186,9 @@ def wrap_async_request_handler(h):
|
||||
logged until the deferred completes.
|
||||
"""
|
||||
|
||||
async def wrapped_async_request_handler(self, request):
|
||||
async def wrapped_async_request_handler(
|
||||
self: "_AsyncResource", request: SynapseRequest
|
||||
) -> None:
|
||||
with request.processing():
|
||||
await h(self, request)
|
||||
|
||||
@@ -240,18 +245,18 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta):
|
||||
context from the request the servlet is handling.
|
||||
"""
|
||||
|
||||
def __init__(self, extract_context=False):
|
||||
def __init__(self, extract_context: bool = False):
|
||||
super().__init__()
|
||||
|
||||
self._extract_context = extract_context
|
||||
|
||||
def render(self, request):
|
||||
def render(self, request: SynapseRequest) -> int:
|
||||
"""This gets called by twisted every time someone sends us a request."""
|
||||
defer.ensureDeferred(self._async_render_wrapper(request))
|
||||
return NOT_DONE_YET
|
||||
|
||||
@wrap_async_request_handler
|
||||
async def _async_render_wrapper(self, request: SynapseRequest):
|
||||
async def _async_render_wrapper(self, request: SynapseRequest) -> None:
|
||||
"""This is a wrapper that delegates to `_async_render` and handles
|
||||
exceptions, return values, metrics, etc.
|
||||
"""
|
||||
@@ -271,7 +276,7 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta):
|
||||
f = failure.Failure()
|
||||
self._send_error_response(f, request)
|
||||
|
||||
async def _async_render(self, request: Request):
|
||||
async def _async_render(self, request: SynapseRequest) -> Optional[Tuple[int, Any]]:
|
||||
"""Delegates to `_async_render_<METHOD>` methods, or returns a 400 if
|
||||
no appropriate method exists. Can be overridden in sub classes for
|
||||
different routing.
|
||||
@@ -318,7 +323,7 @@ class DirectServeJsonResource(_AsyncResource):
|
||||
formatting responses and errors as JSON.
|
||||
"""
|
||||
|
||||
def __init__(self, canonical_json=False, extract_context=False):
|
||||
def __init__(self, canonical_json: bool = False, extract_context: bool = False):
|
||||
super().__init__(extract_context)
|
||||
self.canonical_json = canonical_json
|
||||
|
||||
@@ -327,7 +332,7 @@ class DirectServeJsonResource(_AsyncResource):
|
||||
request: SynapseRequest,
|
||||
code: int,
|
||||
response_object: Any,
|
||||
):
|
||||
) -> None:
|
||||
"""Implements _AsyncResource._send_response"""
|
||||
# TODO: Only enable CORS for the requests that need it.
|
||||
respond_with_json(
|
||||
@@ -368,34 +373,45 @@ class JsonResource(DirectServeJsonResource):
|
||||
|
||||
isLeaf = True
|
||||
|
||||
def __init__(self, hs: "HomeServer", canonical_json=True, extract_context=False):
|
||||
def __init__(
|
||||
self,
|
||||
hs: "HomeServer",
|
||||
canonical_json: bool = True,
|
||||
extract_context: bool = False,
|
||||
):
|
||||
super().__init__(canonical_json, extract_context)
|
||||
self.clock = hs.get_clock()
|
||||
self.path_regexs: Dict[bytes, List[_PathEntry]] = {}
|
||||
self.hs = hs
|
||||
|
||||
def register_paths(self, method, path_patterns, callback, servlet_classname):
|
||||
def register_paths(
|
||||
self,
|
||||
method: str,
|
||||
path_patterns: Iterable[Pattern],
|
||||
callback: ServletCallback,
|
||||
servlet_classname: str,
|
||||
) -> None:
|
||||
"""
|
||||
Registers a request handler against a regular expression. Later request URLs are
|
||||
checked against these regular expressions in order to identify an appropriate
|
||||
handler for that request.
|
||||
|
||||
Args:
|
||||
method (str): GET, POST etc
|
||||
method: GET, POST etc
|
||||
|
||||
path_patterns (Iterable[str]): A list of regular expressions to which
|
||||
the request URLs are compared.
|
||||
path_patterns: A list of regular expressions to which the request
|
||||
URLs are compared.
|
||||
|
||||
callback (function): The handler for the request. Usually a Servlet
|
||||
callback: The handler for the request. Usually a Servlet
|
||||
|
||||
servlet_classname (str): The name of the handler to be used in prometheus
|
||||
servlet_classname: The name of the handler to be used in prometheus
|
||||
and opentracing logs.
|
||||
"""
|
||||
method = method.encode("utf-8") # method is bytes on py3
|
||||
method_bytes = method.encode("utf-8")
|
||||
|
||||
for path_pattern in path_patterns:
|
||||
logger.debug("Registering for %s %s", method, path_pattern.pattern)
|
||||
self.path_regexs.setdefault(method, []).append(
|
||||
self.path_regexs.setdefault(method_bytes, []).append(
|
||||
_PathEntry(path_pattern, callback, servlet_classname)
|
||||
)
|
||||
|
||||
@@ -427,7 +443,7 @@ class JsonResource(DirectServeJsonResource):
|
||||
# Huh. No one wanted to handle that? Fiiiiiine. Send 400.
|
||||
return _unrecognised_request_handler, "unrecognised_request_handler", {}
|
||||
|
||||
async def _async_render(self, request):
|
||||
async def _async_render(self, request: SynapseRequest) -> Tuple[int, Any]:
|
||||
callback, servlet_classname, group_dict = self._get_handler_for_request(request)
|
||||
|
||||
# Make sure we have an appropriate name for this handler in prometheus
|
||||
@@ -468,7 +484,7 @@ class DirectServeHtmlResource(_AsyncResource):
|
||||
request: SynapseRequest,
|
||||
code: int,
|
||||
response_object: Any,
|
||||
):
|
||||
) -> None:
|
||||
"""Implements _AsyncResource._send_response"""
|
||||
# We expect to get bytes for us to write
|
||||
assert isinstance(response_object, bytes)
|
||||
@@ -492,12 +508,12 @@ class StaticResource(File):
|
||||
Differs from the File resource by adding clickjacking protection.
|
||||
"""
|
||||
|
||||
def render_GET(self, request: Request):
|
||||
def render_GET(self, request: Request) -> bytes:
|
||||
set_clickjacking_protection_headers(request)
|
||||
return super().render_GET(request)
|
||||
|
||||
|
||||
def _unrecognised_request_handler(request):
|
||||
def _unrecognised_request_handler(request: Request) -> NoReturn:
|
||||
"""Request handler for unrecognised requests
|
||||
|
||||
This is a request handler suitable for return from
|
||||
@@ -505,7 +521,7 @@ def _unrecognised_request_handler(request):
|
||||
UnrecognizedRequestError.
|
||||
|
||||
Args:
|
||||
request (twisted.web.http.Request):
|
||||
request: Unused, but passed in to match the signature of ServletCallback.
|
||||
"""
|
||||
raise UnrecognizedRequestError()
|
||||
|
||||
@@ -513,14 +529,14 @@ def _unrecognised_request_handler(request):
|
||||
class RootRedirect(resource.Resource):
|
||||
"""Redirects the root '/' path to another path."""
|
||||
|
||||
def __init__(self, path):
|
||||
def __init__(self, path: str):
|
||||
resource.Resource.__init__(self)
|
||||
self.url = path
|
||||
|
||||
def render_GET(self, request):
|
||||
def render_GET(self, request: Request) -> bytes:
|
||||
return redirectTo(self.url.encode("ascii"), request)
|
||||
|
||||
def getChild(self, name, request):
|
||||
def getChild(self, name: str, request: Request) -> resource.Resource:
|
||||
if len(name) == 0:
|
||||
return self # select ourselves as the child to render
|
||||
return resource.Resource.getChild(self, name, request)
|
||||
@@ -529,7 +545,7 @@ class RootRedirect(resource.Resource):
|
||||
class OptionsResource(resource.Resource):
|
||||
"""Responds to OPTION requests for itself and all children."""
|
||||
|
||||
def render_OPTIONS(self, request):
|
||||
def render_OPTIONS(self, request: Request) -> bytes:
|
||||
request.setResponseCode(204)
|
||||
request.setHeader(b"Content-Length", b"0")
|
||||
|
||||
@@ -537,7 +553,7 @@ class OptionsResource(resource.Resource):
|
||||
|
||||
return b""
|
||||
|
||||
def getChildWithDefault(self, path, request):
|
||||
def getChildWithDefault(self, path: str, request: Request) -> resource.Resource:
|
||||
if request.method == b"OPTIONS":
|
||||
return self # select ourselves as the child to render
|
||||
return resource.Resource.getChildWithDefault(self, path, request)
|
||||
@@ -649,7 +665,7 @@ def respond_with_json(
|
||||
json_object: Any,
|
||||
send_cors: bool = False,
|
||||
canonical_json: bool = True,
|
||||
):
|
||||
) -> Optional[int]:
|
||||
"""Sends encoded JSON in response to the given request.
|
||||
|
||||
Args:
|
||||
@@ -696,7 +712,7 @@ def respond_with_json_bytes(
|
||||
code: int,
|
||||
json_bytes: bytes,
|
||||
send_cors: bool = False,
|
||||
):
|
||||
) -> Optional[int]:
|
||||
"""Sends encoded JSON in response to the given request.
|
||||
|
||||
Args:
|
||||
@@ -713,7 +729,7 @@ def respond_with_json_bytes(
|
||||
logger.warning(
|
||||
"Not sending response to request %s, already disconnected.", request
|
||||
)
|
||||
return
|
||||
return None
|
||||
|
||||
request.setResponseCode(code)
|
||||
request.setHeader(b"Content-Type", b"application/json")
|
||||
@@ -731,7 +747,7 @@ async def _async_write_json_to_request_in_thread(
|
||||
request: SynapseRequest,
|
||||
json_encoder: Callable[[Any], bytes],
|
||||
json_object: Any,
|
||||
):
|
||||
) -> None:
|
||||
"""Encodes the given JSON object on a thread and then writes it to the
|
||||
request.
|
||||
|
||||
@@ -773,7 +789,7 @@ def _write_bytes_to_request(request: Request, bytes_to_write: bytes) -> None:
|
||||
_ByteProducer(request, bytes_generator)
|
||||
|
||||
|
||||
def set_cors_headers(request: Request):
|
||||
def set_cors_headers(request: Request) -> None:
|
||||
"""Set the CORS headers so that javascript running in a web browsers can
|
||||
use this API
|
||||
|
||||
@@ -790,14 +806,14 @@ def set_cors_headers(request: Request):
|
||||
)
|
||||
|
||||
|
||||
def respond_with_html(request: Request, code: int, html: str):
|
||||
def respond_with_html(request: Request, code: int, html: str) -> None:
|
||||
"""
|
||||
Wraps `respond_with_html_bytes` by first encoding HTML from a str to UTF-8 bytes.
|
||||
"""
|
||||
respond_with_html_bytes(request, code, html.encode("utf-8"))
|
||||
|
||||
|
||||
def respond_with_html_bytes(request: Request, code: int, html_bytes: bytes):
|
||||
def respond_with_html_bytes(request: Request, code: int, html_bytes: bytes) -> None:
|
||||
"""
|
||||
Sends HTML (encoded as UTF-8 bytes) as the response to the given request.
|
||||
|
||||
@@ -815,7 +831,7 @@ def respond_with_html_bytes(request: Request, code: int, html_bytes: bytes):
|
||||
logger.warning(
|
||||
"Not sending response to request %s, already disconnected.", request
|
||||
)
|
||||
return
|
||||
return None
|
||||
|
||||
request.setResponseCode(code)
|
||||
request.setHeader(b"Content-Type", b"text/html; charset=utf-8")
|
||||
@@ -828,7 +844,7 @@ def respond_with_html_bytes(request: Request, code: int, html_bytes: bytes):
|
||||
finish_request(request)
|
||||
|
||||
|
||||
def set_clickjacking_protection_headers(request: Request):
|
||||
def set_clickjacking_protection_headers(request: Request) -> None:
|
||||
"""
|
||||
Set headers to guard against clickjacking of embedded content.
|
||||
|
||||
@@ -850,7 +866,7 @@ def respond_with_redirect(request: Request, url: bytes) -> None:
|
||||
finish_request(request)
|
||||
|
||||
|
||||
def finish_request(request: Request):
|
||||
def finish_request(request: Request) -> None:
|
||||
"""Finish writing the response to the request.
|
||||
|
||||
Twisted throws a RuntimeException if the connection closed before the
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
""" This module contains base REST classes for constructing REST servlets. """
|
||||
import logging
|
||||
from http import HTTPStatus
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Iterable,
|
||||
@@ -30,6 +31,7 @@ from typing_extensions import Literal
|
||||
from twisted.web.server import Request
|
||||
|
||||
from synapse.api.errors import Codes, SynapseError
|
||||
from synapse.http.server import HttpServer
|
||||
from synapse.types import JsonDict, RoomAlias, RoomID
|
||||
from synapse.util import json_decoder
|
||||
|
||||
@@ -137,11 +139,15 @@ def parse_integer_from_args(
|
||||
return int(args[name_bytes][0])
|
||||
except Exception:
|
||||
message = "Query parameter %r must be an integer" % (name,)
|
||||
raise SynapseError(400, message, errcode=Codes.INVALID_PARAM)
|
||||
raise SynapseError(
|
||||
HTTPStatus.BAD_REQUEST, message, errcode=Codes.INVALID_PARAM
|
||||
)
|
||||
else:
|
||||
if required:
|
||||
message = "Missing integer query parameter %r" % (name,)
|
||||
raise SynapseError(400, message, errcode=Codes.MISSING_PARAM)
|
||||
raise SynapseError(
|
||||
HTTPStatus.BAD_REQUEST, message, errcode=Codes.MISSING_PARAM
|
||||
)
|
||||
else:
|
||||
return default
|
||||
|
||||
@@ -246,11 +252,15 @@ def parse_boolean_from_args(
|
||||
message = (
|
||||
"Boolean query parameter %r must be one of ['true', 'false']"
|
||||
) % (name,)
|
||||
raise SynapseError(400, message)
|
||||
raise SynapseError(
|
||||
HTTPStatus.BAD_REQUEST, message, errcode=Codes.INVALID_PARAM
|
||||
)
|
||||
else:
|
||||
if required:
|
||||
message = "Missing boolean query parameter %r" % (name,)
|
||||
raise SynapseError(400, message, errcode=Codes.MISSING_PARAM)
|
||||
raise SynapseError(
|
||||
HTTPStatus.BAD_REQUEST, message, errcode=Codes.MISSING_PARAM
|
||||
)
|
||||
else:
|
||||
return default
|
||||
|
||||
@@ -313,7 +323,7 @@ def parse_bytes_from_args(
|
||||
return args[name_bytes][0]
|
||||
elif required:
|
||||
message = "Missing string query parameter %s" % (name,)
|
||||
raise SynapseError(400, message, errcode=Codes.MISSING_PARAM)
|
||||
raise SynapseError(HTTPStatus.BAD_REQUEST, message, errcode=Codes.MISSING_PARAM)
|
||||
|
||||
return default
|
||||
|
||||
@@ -407,14 +417,16 @@ def _parse_string_value(
|
||||
try:
|
||||
value_str = value.decode(encoding)
|
||||
except ValueError:
|
||||
raise SynapseError(400, "Query parameter %r must be %s" % (name, encoding))
|
||||
raise SynapseError(
|
||||
HTTPStatus.BAD_REQUEST, "Query parameter %r must be %s" % (name, encoding)
|
||||
)
|
||||
|
||||
if allowed_values is not None and value_str not in allowed_values:
|
||||
message = "Query parameter %r must be one of [%s]" % (
|
||||
name,
|
||||
", ".join(repr(v) for v in allowed_values),
|
||||
)
|
||||
raise SynapseError(400, message)
|
||||
raise SynapseError(HTTPStatus.BAD_REQUEST, message, errcode=Codes.INVALID_PARAM)
|
||||
else:
|
||||
return value_str
|
||||
|
||||
@@ -510,7 +522,9 @@ def parse_strings_from_args(
|
||||
else:
|
||||
if required:
|
||||
message = "Missing string query parameter %r" % (name,)
|
||||
raise SynapseError(400, message, errcode=Codes.MISSING_PARAM)
|
||||
raise SynapseError(
|
||||
HTTPStatus.BAD_REQUEST, message, errcode=Codes.MISSING_PARAM
|
||||
)
|
||||
|
||||
return default
|
||||
|
||||
@@ -638,7 +652,7 @@ def parse_json_value_from_request(
|
||||
try:
|
||||
content_bytes = request.content.read() # type: ignore
|
||||
except Exception:
|
||||
raise SynapseError(400, "Error reading JSON content.")
|
||||
raise SynapseError(HTTPStatus.BAD_REQUEST, "Error reading JSON content.")
|
||||
|
||||
if not content_bytes and allow_empty_body:
|
||||
return None
|
||||
@@ -647,7 +661,9 @@ def parse_json_value_from_request(
|
||||
content = json_decoder.decode(content_bytes.decode("utf-8"))
|
||||
except Exception as e:
|
||||
logger.warning("Unable to parse JSON: %s (%s)", e, content_bytes)
|
||||
raise SynapseError(400, "Content not JSON.", errcode=Codes.NOT_JSON)
|
||||
raise SynapseError(
|
||||
HTTPStatus.BAD_REQUEST, "Content not JSON.", errcode=Codes.NOT_JSON
|
||||
)
|
||||
|
||||
return content
|
||||
|
||||
@@ -673,7 +689,7 @@ def parse_json_object_from_request(
|
||||
|
||||
if not isinstance(content, dict):
|
||||
message = "Content must be a JSON object."
|
||||
raise SynapseError(400, message, errcode=Codes.BAD_JSON)
|
||||
raise SynapseError(HTTPStatus.BAD_REQUEST, message, errcode=Codes.BAD_JSON)
|
||||
|
||||
return content
|
||||
|
||||
@@ -685,7 +701,9 @@ def assert_params_in_dict(body: JsonDict, required: Iterable[str]) -> None:
|
||||
absent.append(k)
|
||||
|
||||
if len(absent) > 0:
|
||||
raise SynapseError(400, "Missing params: %r" % absent, Codes.MISSING_PARAM)
|
||||
raise SynapseError(
|
||||
HTTPStatus.BAD_REQUEST, "Missing params: %r" % absent, Codes.MISSING_PARAM
|
||||
)
|
||||
|
||||
|
||||
class RestServlet:
|
||||
@@ -709,7 +727,7 @@ class RestServlet:
|
||||
into the appropriate HTTP response.
|
||||
"""
|
||||
|
||||
def register(self, http_server):
|
||||
def register(self, http_server: HttpServer) -> None:
|
||||
"""Register this servlet with the given HTTP server."""
|
||||
patterns = getattr(self, "PATTERNS", None)
|
||||
if patterns:
|
||||
@@ -758,10 +776,12 @@ class ResolveRoomIdMixin:
|
||||
resolved_room_id = room_id.to_string()
|
||||
else:
|
||||
raise SynapseError(
|
||||
400, "%s was not legal room ID or room alias" % (room_identifier,)
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
"%s was not legal room ID or room alias" % (room_identifier,),
|
||||
)
|
||||
if not resolved_room_id:
|
||||
raise SynapseError(
|
||||
400, "Unknown room ID or room alias %s" % room_identifier
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
"Unknown room ID or room alias %s" % room_identifier,
|
||||
)
|
||||
return resolved_room_id, remote_room_hosts
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
import contextlib
|
||||
import logging
|
||||
import time
|
||||
from typing import Generator, Optional, Tuple, Union
|
||||
from typing import Any, Generator, Optional, Tuple, Union
|
||||
|
||||
import attr
|
||||
from zope.interface import implementer
|
||||
@@ -66,9 +66,9 @@ class SynapseRequest(Request):
|
||||
self,
|
||||
channel: HTTPChannel,
|
||||
site: "SynapseSite",
|
||||
*args,
|
||||
*args: Any,
|
||||
max_request_body_size: int = 1024,
|
||||
**kw,
|
||||
**kw: Any,
|
||||
):
|
||||
super().__init__(channel, *args, **kw)
|
||||
self._max_request_body_size = max_request_body_size
|
||||
@@ -557,7 +557,7 @@ class SynapseSite(Site):
|
||||
proxied = config.http_options.x_forwarded
|
||||
request_class = XForwardedForRequest if proxied else SynapseRequest
|
||||
|
||||
def request_factory(channel, queued: bool) -> Request:
|
||||
def request_factory(channel: HTTPChannel, queued: bool) -> Request:
|
||||
return request_class(
|
||||
channel,
|
||||
self,
|
||||
|
||||
@@ -22,20 +22,33 @@ them.
|
||||
|
||||
See doc/log_contexts.rst for details on how this works.
|
||||
"""
|
||||
import inspect
|
||||
import logging
|
||||
import threading
|
||||
import typing
|
||||
import warnings
|
||||
from typing import TYPE_CHECKING, Optional, Tuple, TypeVar, Union
|
||||
from types import TracebackType
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Any,
|
||||
Awaitable,
|
||||
Callable,
|
||||
Optional,
|
||||
Tuple,
|
||||
Type,
|
||||
TypeVar,
|
||||
Union,
|
||||
overload,
|
||||
)
|
||||
|
||||
import attr
|
||||
from typing_extensions import Literal
|
||||
|
||||
from twisted.internet import defer, threads
|
||||
from twisted.python.threadpool import ThreadPool
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.logging.scopecontextmanager import _LogContextScope
|
||||
from synapse.types import ISynapseReactor
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -66,7 +79,7 @@ except Exception:
|
||||
|
||||
|
||||
# a hook which can be set during testing to assert that we aren't abusing logcontexts.
|
||||
def logcontext_error(msg: str):
|
||||
def logcontext_error(msg: str) -> None:
|
||||
logger.warning(msg)
|
||||
|
||||
|
||||
@@ -223,22 +236,19 @@ class _Sentinel:
|
||||
def __str__(self) -> str:
|
||||
return "sentinel"
|
||||
|
||||
def copy_to(self, record):
|
||||
def start(self, rusage: "Optional[resource.struct_rusage]") -> None:
|
||||
pass
|
||||
|
||||
def start(self, rusage: "Optional[resource.struct_rusage]"):
|
||||
def stop(self, rusage: "Optional[resource.struct_rusage]") -> None:
|
||||
pass
|
||||
|
||||
def stop(self, rusage: "Optional[resource.struct_rusage]"):
|
||||
def add_database_transaction(self, duration_sec: float) -> None:
|
||||
pass
|
||||
|
||||
def add_database_transaction(self, duration_sec):
|
||||
def add_database_scheduled(self, sched_sec: float) -> None:
|
||||
pass
|
||||
|
||||
def add_database_scheduled(self, sched_sec):
|
||||
pass
|
||||
|
||||
def record_event_fetch(self, event_count):
|
||||
def record_event_fetch(self, event_count: int) -> None:
|
||||
pass
|
||||
|
||||
def __bool__(self) -> Literal[False]:
|
||||
@@ -379,7 +389,12 @@ class LoggingContext:
|
||||
)
|
||||
return self
|
||||
|
||||
def __exit__(self, type, value, traceback) -> None:
|
||||
def __exit__(
|
||||
self,
|
||||
type: Optional[Type[BaseException]],
|
||||
value: Optional[BaseException],
|
||||
traceback: Optional[TracebackType],
|
||||
) -> None:
|
||||
"""Restore the logging context in thread local storage to the state it
|
||||
was before this context was entered.
|
||||
Returns:
|
||||
@@ -399,17 +414,6 @@ class LoggingContext:
|
||||
# recorded against the correct metrics.
|
||||
self.finished = True
|
||||
|
||||
def copy_to(self, record) -> None:
|
||||
"""Copy logging fields from this context to a log record or
|
||||
another LoggingContext
|
||||
"""
|
||||
|
||||
# we track the current request
|
||||
record.request = self.request
|
||||
|
||||
# we also track the current scope:
|
||||
record.scope = self.scope
|
||||
|
||||
def start(self, rusage: "Optional[resource.struct_rusage]") -> None:
|
||||
"""
|
||||
Record that this logcontext is currently running.
|
||||
@@ -626,7 +630,12 @@ class PreserveLoggingContext:
|
||||
def __enter__(self) -> None:
|
||||
self._old_context = set_current_context(self._new_context)
|
||||
|
||||
def __exit__(self, type, value, traceback) -> None:
|
||||
def __exit__(
|
||||
self,
|
||||
type: Optional[Type[BaseException]],
|
||||
value: Optional[BaseException],
|
||||
traceback: Optional[TracebackType],
|
||||
) -> None:
|
||||
context = set_current_context(self._old_context)
|
||||
|
||||
if context != self._new_context:
|
||||
@@ -711,16 +720,61 @@ def nested_logging_context(suffix: str) -> LoggingContext:
|
||||
)
|
||||
|
||||
|
||||
def preserve_fn(f):
|
||||
R = TypeVar("R")
|
||||
|
||||
|
||||
@overload
|
||||
def preserve_fn( # type: ignore[misc]
|
||||
f: Callable[..., Awaitable[R]],
|
||||
) -> Callable[..., "defer.Deferred[R]"]:
|
||||
# The `type: ignore[misc]` above suppresses
|
||||
# "Overloaded function signatures 1 and 2 overlap with incompatible return types"
|
||||
...
|
||||
|
||||
|
||||
@overload
|
||||
def preserve_fn(f: Callable[..., R]) -> Callable[..., "defer.Deferred[R]"]:
|
||||
...
|
||||
|
||||
|
||||
def preserve_fn(
|
||||
f: Union[
|
||||
Callable[..., R],
|
||||
Callable[..., Awaitable[R]],
|
||||
]
|
||||
) -> Callable[..., "defer.Deferred[R]"]:
|
||||
"""Function decorator which wraps the function with run_in_background"""
|
||||
|
||||
def g(*args, **kwargs):
|
||||
def g(*args: Any, **kwargs: Any) -> "defer.Deferred[R]":
|
||||
return run_in_background(f, *args, **kwargs)
|
||||
|
||||
return g
|
||||
|
||||
|
||||
def run_in_background(f, *args, **kwargs) -> defer.Deferred:
|
||||
@overload
|
||||
def run_in_background( # type: ignore[misc]
|
||||
f: Callable[..., Awaitable[R]], *args: Any, **kwargs: Any
|
||||
) -> "defer.Deferred[R]":
|
||||
# The `type: ignore[misc]` above suppresses
|
||||
# "Overloaded function signatures 1 and 2 overlap with incompatible return types"
|
||||
...
|
||||
|
||||
|
||||
@overload
|
||||
def run_in_background(
|
||||
f: Callable[..., R], *args: Any, **kwargs: Any
|
||||
) -> "defer.Deferred[R]":
|
||||
...
|
||||
|
||||
|
||||
def run_in_background(
|
||||
f: Union[
|
||||
Callable[..., R],
|
||||
Callable[..., Awaitable[R]],
|
||||
],
|
||||
*args: Any,
|
||||
**kwargs: Any,
|
||||
) -> "defer.Deferred[R]":
|
||||
"""Calls a function, ensuring that the current context is restored after
|
||||
return from the function, and that the sentinel context is set once the
|
||||
deferred returned by the function completes.
|
||||
@@ -751,6 +805,10 @@ def run_in_background(f, *args, **kwargs) -> defer.Deferred:
|
||||
# At this point we should have a Deferred, if not then f was a synchronous
|
||||
# function, wrap it in a Deferred for consistency.
|
||||
if not isinstance(res, defer.Deferred):
|
||||
# `res` is not a `Deferred` and not a `Coroutine`.
|
||||
# There are no other types of `Awaitable`s we expect to encounter in Synapse.
|
||||
assert not isinstance(res, Awaitable)
|
||||
|
||||
return defer.succeed(res)
|
||||
|
||||
if res.called and not res.paused:
|
||||
@@ -778,13 +836,14 @@ def run_in_background(f, *args, **kwargs) -> defer.Deferred:
|
||||
return res
|
||||
|
||||
|
||||
def make_deferred_yieldable(deferred):
|
||||
"""Given a deferred (or coroutine), make it follow the Synapse logcontext
|
||||
rules:
|
||||
T = TypeVar("T")
|
||||
|
||||
If the deferred has completed (or is not actually a Deferred), essentially
|
||||
does nothing (just returns another completed deferred with the
|
||||
result/failure).
|
||||
|
||||
def make_deferred_yieldable(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]":
|
||||
"""Given a deferred, make it follow the Synapse logcontext rules:
|
||||
|
||||
If the deferred has completed, essentially does nothing (just returns another
|
||||
completed deferred with the result/failure).
|
||||
|
||||
If the deferred has not yet completed, resets the logcontext before
|
||||
returning a deferred. Then, when the deferred completes, restores the
|
||||
@@ -792,16 +851,6 @@ def make_deferred_yieldable(deferred):
|
||||
|
||||
(This is more-or-less the opposite operation to run_in_background.)
|
||||
"""
|
||||
if inspect.isawaitable(deferred):
|
||||
# If we're given a coroutine we convert it to a deferred so that we
|
||||
# run it and find out if it immediately finishes, it it does then we
|
||||
# don't need to fiddle with log contexts at all and can return
|
||||
# immediately.
|
||||
deferred = defer.ensureDeferred(deferred)
|
||||
|
||||
if not isinstance(deferred, defer.Deferred):
|
||||
return deferred
|
||||
|
||||
if deferred.called and not deferred.paused:
|
||||
# it looks like this deferred is ready to run any callbacks we give it
|
||||
# immediately. We may as well optimise out the logcontext faffery.
|
||||
@@ -823,7 +872,9 @@ def _set_context_cb(result: ResultT, context: LoggingContext) -> ResultT:
|
||||
return result
|
||||
|
||||
|
||||
def defer_to_thread(reactor, f, *args, **kwargs):
|
||||
def defer_to_thread(
|
||||
reactor: "ISynapseReactor", f: Callable[..., R], *args: Any, **kwargs: Any
|
||||
) -> "defer.Deferred[R]":
|
||||
"""
|
||||
Calls the function `f` using a thread from the reactor's default threadpool and
|
||||
returns the result as a Deferred.
|
||||
@@ -855,7 +906,13 @@ def defer_to_thread(reactor, f, *args, **kwargs):
|
||||
return defer_to_threadpool(reactor, reactor.getThreadPool(), f, *args, **kwargs)
|
||||
|
||||
|
||||
def defer_to_threadpool(reactor, threadpool, f, *args, **kwargs):
|
||||
def defer_to_threadpool(
|
||||
reactor: "ISynapseReactor",
|
||||
threadpool: ThreadPool,
|
||||
f: Callable[..., R],
|
||||
*args: Any,
|
||||
**kwargs: Any,
|
||||
) -> "defer.Deferred[R]":
|
||||
"""
|
||||
A wrapper for twisted.internet.threads.deferToThreadpool, which handles
|
||||
logcontexts correctly.
|
||||
@@ -897,7 +954,7 @@ def defer_to_threadpool(reactor, threadpool, f, *args, **kwargs):
|
||||
assert isinstance(curr_context, LoggingContext)
|
||||
parent_context = curr_context
|
||||
|
||||
def g():
|
||||
def g() -> R:
|
||||
with LoggingContext(str(curr_context), parent_context=parent_context):
|
||||
return f(*args, **kwargs)
|
||||
|
||||
|
||||
@@ -13,7 +13,6 @@
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
from collections import namedtuple
|
||||
from typing import (
|
||||
Awaitable,
|
||||
Callable,
|
||||
@@ -44,7 +43,13 @@ from synapse.logging.opentracing import log_kv, start_active_span
|
||||
from synapse.logging.utils import log_function
|
||||
from synapse.metrics import LaterGauge
|
||||
from synapse.streams.config import PaginationConfig
|
||||
from synapse.types import PersistedEventPosition, RoomStreamToken, StreamToken, UserID
|
||||
from synapse.types import (
|
||||
JsonDict,
|
||||
PersistedEventPosition,
|
||||
RoomStreamToken,
|
||||
StreamToken,
|
||||
UserID,
|
||||
)
|
||||
from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
|
||||
from synapse.util.metrics import Measure
|
||||
from synapse.visibility import filter_events_for_client
|
||||
@@ -178,7 +183,12 @@ class _NotifierUserStream:
|
||||
return _NotificationListener(self.notify_deferred.observe())
|
||||
|
||||
|
||||
class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))):
|
||||
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||
class EventStreamResult:
|
||||
events: List[Union[JsonDict, EventBase]]
|
||||
start_token: StreamToken
|
||||
end_token: StreamToken
|
||||
|
||||
def __bool__(self):
|
||||
return bool(self.events)
|
||||
|
||||
@@ -582,9 +592,12 @@ class Notifier:
|
||||
before_token: StreamToken, after_token: StreamToken
|
||||
) -> EventStreamResult:
|
||||
if after_token == before_token:
|
||||
return EventStreamResult([], (from_token, from_token))
|
||||
return EventStreamResult([], from_token, from_token)
|
||||
|
||||
events: List[EventBase] = []
|
||||
# The events fetched from each source are a JsonDict, EventBase, or
|
||||
# UserPresenceState, but see below for UserPresenceState being
|
||||
# converted to JsonDict.
|
||||
events: List[Union[JsonDict, EventBase]] = []
|
||||
end_token = from_token
|
||||
|
||||
for name, source in self.event_sources.sources.get_sources():
|
||||
@@ -623,7 +636,7 @@ class Notifier:
|
||||
events.extend(new_events)
|
||||
end_token = end_token.copy_and_replace(keyname, new_key)
|
||||
|
||||
return EventStreamResult(events, (from_token, end_token))
|
||||
return EventStreamResult(events, from_token, end_token)
|
||||
|
||||
user_id_for_stream = user.to_string()
|
||||
if is_peeking:
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
# limitations under the License.
|
||||
from typing import Dict
|
||||
|
||||
from synapse.api.constants import ReceiptTypes
|
||||
from synapse.events import EventBase
|
||||
from synapse.push.presentable_names import calculate_room_name, name_from_member_event
|
||||
from synapse.storage import Storage
|
||||
@@ -23,7 +24,7 @@ async def get_badge_count(store: DataStore, user_id: str, group_by_room: bool) -
|
||||
invites = await store.get_invited_rooms_for_local_user(user_id)
|
||||
joins = await store.get_rooms_for_user(user_id)
|
||||
|
||||
my_receipts_by_room = await store.get_receipts_for_user(user_id, "m.read")
|
||||
my_receipts_by_room = await store.get_receipts_for_user(user_id, ReceiptTypes.READ)
|
||||
|
||||
badge = len(invites)
|
||||
|
||||
|
||||
@@ -27,6 +27,7 @@ from synapse.push.pusher import PusherFactory
|
||||
from synapse.replication.http.push import ReplicationRemovePusherRestServlet
|
||||
from synapse.types import JsonDict, RoomStreamToken
|
||||
from synapse.util.async_helpers import concurrently_execute
|
||||
from synapse.util.threepids import canonicalise_email
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
@@ -113,7 +114,9 @@ class PusherPool:
|
||||
"""
|
||||
|
||||
if kind == "email":
|
||||
email_owner = await self.store.get_user_id_by_threepid("email", pushkey)
|
||||
email_owner = await self.store.get_user_id_by_threepid(
|
||||
"email", canonicalise_email(pushkey)
|
||||
)
|
||||
if email_owner != user_id:
|
||||
raise SynapseError(400, "Email not found", Codes.THREEPID_NOT_FOUND)
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Optional
|
||||
|
||||
from synapse.storage.database import DatabasePool
|
||||
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
|
||||
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
from synapse.storage.util.id_generators import MultiWriterIdGenerator
|
||||
@@ -27,7 +27,12 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class BaseSlavedStore(CacheInvalidationWorkerStore):
|
||||
def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
|
||||
def __init__(
|
||||
self,
|
||||
database: DatabasePool,
|
||||
db_conn: LoggingDatabaseConnection,
|
||||
hs: "HomeServer",
|
||||
):
|
||||
super().__init__(database, db_conn, hs)
|
||||
if isinstance(self.database_engine, PostgresEngine):
|
||||
self._cache_id_gen: Optional[
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from synapse.storage.database import DatabasePool
|
||||
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
|
||||
from synapse.storage.databases.main.client_ips import LAST_SEEN_GRANULARITY
|
||||
from synapse.util.caches.lrucache import LruCache
|
||||
|
||||
@@ -25,7 +25,12 @@ if TYPE_CHECKING:
|
||||
|
||||
|
||||
class SlavedClientIpStore(BaseSlavedStore):
|
||||
def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
|
||||
def __init__(
|
||||
self,
|
||||
database: DatabasePool,
|
||||
db_conn: LoggingDatabaseConnection,
|
||||
hs: "HomeServer",
|
||||
):
|
||||
super().__init__(database, db_conn, hs)
|
||||
|
||||
self.client_ip_last_seen: LruCache[tuple, int] = LruCache(
|
||||
|
||||
@@ -17,7 +17,7 @@ from typing import TYPE_CHECKING
|
||||
from synapse.replication.slave.storage._base import BaseSlavedStore
|
||||
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
|
||||
from synapse.replication.tcp.streams._base import DeviceListsStream, UserSignatureStream
|
||||
from synapse.storage.database import DatabasePool
|
||||
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
|
||||
from synapse.storage.databases.main.devices import DeviceWorkerStore
|
||||
from synapse.storage.databases.main.end_to_end_keys import EndToEndKeyWorkerStore
|
||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||
@@ -27,7 +27,12 @@ if TYPE_CHECKING:
|
||||
|
||||
|
||||
class SlavedDeviceStore(EndToEndKeyWorkerStore, DeviceWorkerStore, BaseSlavedStore):
|
||||
def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
|
||||
def __init__(
|
||||
self,
|
||||
database: DatabasePool,
|
||||
db_conn: LoggingDatabaseConnection,
|
||||
hs: "HomeServer",
|
||||
):
|
||||
super().__init__(database, db_conn, hs)
|
||||
|
||||
self.hs = hs
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
import logging
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from synapse.storage.database import DatabasePool
|
||||
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
|
||||
from synapse.storage.databases.main.event_federation import EventFederationWorkerStore
|
||||
from synapse.storage.databases.main.event_push_actions import (
|
||||
EventPushActionsWorkerStore,
|
||||
@@ -58,7 +58,12 @@ class SlavedEventStore(
|
||||
RelationsWorkerStore,
|
||||
BaseSlavedStore,
|
||||
):
|
||||
def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
|
||||
def __init__(
|
||||
self,
|
||||
database: DatabasePool,
|
||||
db_conn: LoggingDatabaseConnection,
|
||||
hs: "HomeServer",
|
||||
):
|
||||
super().__init__(database, db_conn, hs)
|
||||
|
||||
events_max = self._stream_id_gen.get_current_token()
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from synapse.storage.database import DatabasePool
|
||||
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
|
||||
from synapse.storage.databases.main.filtering import FilteringStore
|
||||
|
||||
from ._base import BaseSlavedStore
|
||||
@@ -24,7 +24,12 @@ if TYPE_CHECKING:
|
||||
|
||||
|
||||
class SlavedFilteringStore(BaseSlavedStore):
|
||||
def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
|
||||
def __init__(
|
||||
self,
|
||||
database: DatabasePool,
|
||||
db_conn: LoggingDatabaseConnection,
|
||||
hs: "HomeServer",
|
||||
):
|
||||
super().__init__(database, db_conn, hs)
|
||||
|
||||
# Filters are immutable so this cache doesn't need to be expired
|
||||
|
||||
@@ -17,7 +17,7 @@ from typing import TYPE_CHECKING
|
||||
from synapse.replication.slave.storage._base import BaseSlavedStore
|
||||
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
|
||||
from synapse.replication.tcp.streams import GroupServerStream
|
||||
from synapse.storage.database import DatabasePool
|
||||
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
|
||||
from synapse.storage.databases.main.group_server import GroupServerWorkerStore
|
||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||
|
||||
@@ -26,7 +26,12 @@ if TYPE_CHECKING:
|
||||
|
||||
|
||||
class SlavedGroupServerStore(GroupServerWorkerStore, BaseSlavedStore):
|
||||
def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
|
||||
def __init__(
|
||||
self,
|
||||
database: DatabasePool,
|
||||
db_conn: LoggingDatabaseConnection,
|
||||
hs: "HomeServer",
|
||||
):
|
||||
super().__init__(database, db_conn, hs)
|
||||
|
||||
self.hs = hs
|
||||
|
||||
@@ -66,7 +66,6 @@ from synapse.rest.admin.rooms import (
|
||||
RoomStateRestServlet,
|
||||
)
|
||||
from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet
|
||||
from synapse.rest.admin.space import RemoveSpaceMemberRestServlet
|
||||
from synapse.rest.admin.statistics import UserMediaStatisticsRestServlet
|
||||
from synapse.rest.admin.username_available import UsernameAvailableRestServlet
|
||||
from synapse.rest.admin.users import (
|
||||
@@ -109,7 +108,7 @@ class VersionServlet(RestServlet):
|
||||
|
||||
class PurgeHistoryRestServlet(RestServlet):
|
||||
PATTERNS = admin_patterns(
|
||||
"/purge_history/(?P<room_id>[^/]*)(/(?P<event_id>[^/]+))?"
|
||||
"/purge_history/(?P<room_id>[^/]*)(/(?P<event_id>[^/]*))?$"
|
||||
)
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
@@ -196,7 +195,7 @@ class PurgeHistoryRestServlet(RestServlet):
|
||||
|
||||
|
||||
class PurgeHistoryStatusRestServlet(RestServlet):
|
||||
PATTERNS = admin_patterns("/purge_history_status/(?P<purge_id>[^/]+)")
|
||||
PATTERNS = admin_patterns("/purge_history_status/(?P<purge_id>[^/]*)$")
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.pagination_handler = hs.get_pagination_handler()
|
||||
@@ -268,7 +267,6 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
|
||||
RegistrationTokenRestServlet(hs).register(http_server)
|
||||
DestinationsRestServlet(hs).register(http_server)
|
||||
ListDestinationsRestServlet(hs).register(http_server)
|
||||
RemoveSpaceMemberRestServlet(hs).register(http_server)
|
||||
|
||||
# Some servlets only get registered for the main process.
|
||||
if hs.config.worker.worker_app is None:
|
||||
|
||||
@@ -22,7 +22,7 @@ from synapse.http.servlet import (
|
||||
parse_json_object_from_request,
|
||||
)
|
||||
from synapse.http.site import SynapseRequest
|
||||
from synapse.rest.admin._base import admin_patterns, assert_user_is_admin
|
||||
from synapse.rest.admin._base import admin_patterns, assert_requester_is_admin
|
||||
from synapse.types import JsonDict
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -41,8 +41,7 @@ class BackgroundUpdateEnabledRestServlet(RestServlet):
|
||||
self._data_stores = hs.get_datastores()
|
||||
|
||||
async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
|
||||
requester = await self._auth.get_user_by_req(request)
|
||||
await assert_user_is_admin(self._auth, requester.user)
|
||||
await assert_requester_is_admin(self._auth, request)
|
||||
|
||||
# We need to check that all configured databases have updates enabled.
|
||||
# (They *should* all be in sync.)
|
||||
@@ -51,8 +50,7 @@ class BackgroundUpdateEnabledRestServlet(RestServlet):
|
||||
return HTTPStatus.OK, {"enabled": enabled}
|
||||
|
||||
async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
|
||||
requester = await self._auth.get_user_by_req(request)
|
||||
await assert_user_is_admin(self._auth, requester.user)
|
||||
await assert_requester_is_admin(self._auth, request)
|
||||
|
||||
body = parse_json_object_from_request(request)
|
||||
|
||||
@@ -84,8 +82,7 @@ class BackgroundUpdateRestServlet(RestServlet):
|
||||
self._data_stores = hs.get_datastores()
|
||||
|
||||
async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
|
||||
requester = await self._auth.get_user_by_req(request)
|
||||
await assert_user_is_admin(self._auth, requester.user)
|
||||
await assert_requester_is_admin(self._auth, request)
|
||||
|
||||
# We need to check that all configured databases have updates enabled.
|
||||
# (They *should* all be in sync.)
|
||||
@@ -111,15 +108,14 @@ class BackgroundUpdateRestServlet(RestServlet):
|
||||
class BackgroundUpdateStartJobRestServlet(RestServlet):
|
||||
"""Allows to start specific background updates"""
|
||||
|
||||
PATTERNS = admin_patterns("/background_updates/start_job")
|
||||
PATTERNS = admin_patterns("/background_updates/start_job$")
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self._auth = hs.get_auth()
|
||||
self._store = hs.get_datastore()
|
||||
|
||||
async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
|
||||
requester = await self._auth.get_user_by_req(request)
|
||||
await assert_user_is_admin(self._auth, requester.user)
|
||||
await assert_requester_is_admin(self._auth, request)
|
||||
|
||||
body = parse_json_object_from_request(request)
|
||||
assert_params_in_dict(body, ["job_name"])
|
||||
|
||||
@@ -42,10 +42,10 @@ class DeviceRestServlet(RestServlet):
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__()
|
||||
self.hs = hs
|
||||
self.auth = hs.get_auth()
|
||||
self.device_handler = hs.get_device_handler()
|
||||
self.store = hs.get_datastore()
|
||||
self.is_mine = hs.is_mine
|
||||
|
||||
async def on_GET(
|
||||
self, request: SynapseRequest, user_id: str, device_id: str
|
||||
@@ -53,7 +53,7 @@ class DeviceRestServlet(RestServlet):
|
||||
await assert_requester_is_admin(self.auth, request)
|
||||
|
||||
target_user = UserID.from_string(user_id)
|
||||
if not self.hs.is_mine(target_user):
|
||||
if not self.is_mine(target_user):
|
||||
raise SynapseError(HTTPStatus.BAD_REQUEST, "Can only lookup local users")
|
||||
|
||||
u = await self.store.get_user_by_id(target_user.to_string())
|
||||
@@ -63,6 +63,8 @@ class DeviceRestServlet(RestServlet):
|
||||
device = await self.device_handler.get_device(
|
||||
target_user.to_string(), device_id
|
||||
)
|
||||
if device is None:
|
||||
raise NotFoundError("No device found")
|
||||
return HTTPStatus.OK, device
|
||||
|
||||
async def on_DELETE(
|
||||
@@ -71,7 +73,7 @@ class DeviceRestServlet(RestServlet):
|
||||
await assert_requester_is_admin(self.auth, request)
|
||||
|
||||
target_user = UserID.from_string(user_id)
|
||||
if not self.hs.is_mine(target_user):
|
||||
if not self.is_mine(target_user):
|
||||
raise SynapseError(HTTPStatus.BAD_REQUEST, "Can only lookup local users")
|
||||
|
||||
u = await self.store.get_user_by_id(target_user.to_string())
|
||||
@@ -87,7 +89,7 @@ class DeviceRestServlet(RestServlet):
|
||||
await assert_requester_is_admin(self.auth, request)
|
||||
|
||||
target_user = UserID.from_string(user_id)
|
||||
if not self.hs.is_mine(target_user):
|
||||
if not self.is_mine(target_user):
|
||||
raise SynapseError(HTTPStatus.BAD_REQUEST, "Can only lookup local users")
|
||||
|
||||
u = await self.store.get_user_by_id(target_user.to_string())
|
||||
@@ -109,14 +111,10 @@ class DevicesRestServlet(RestServlet):
|
||||
PATTERNS = admin_patterns("/users/(?P<user_id>[^/]*)/devices$", "v2")
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
"""
|
||||
Args:
|
||||
hs: server
|
||||
"""
|
||||
self.hs = hs
|
||||
self.auth = hs.get_auth()
|
||||
self.device_handler = hs.get_device_handler()
|
||||
self.store = hs.get_datastore()
|
||||
self.is_mine = hs.is_mine
|
||||
|
||||
async def on_GET(
|
||||
self, request: SynapseRequest, user_id: str
|
||||
@@ -124,7 +122,7 @@ class DevicesRestServlet(RestServlet):
|
||||
await assert_requester_is_admin(self.auth, request)
|
||||
|
||||
target_user = UserID.from_string(user_id)
|
||||
if not self.hs.is_mine(target_user):
|
||||
if not self.is_mine(target_user):
|
||||
raise SynapseError(HTTPStatus.BAD_REQUEST, "Can only lookup local users")
|
||||
|
||||
u = await self.store.get_user_by_id(target_user.to_string())
|
||||
@@ -144,10 +142,10 @@ class DeleteDevicesRestServlet(RestServlet):
|
||||
PATTERNS = admin_patterns("/users/(?P<user_id>[^/]*)/delete_devices$", "v2")
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.hs = hs
|
||||
self.auth = hs.get_auth()
|
||||
self.device_handler = hs.get_device_handler()
|
||||
self.store = hs.get_datastore()
|
||||
self.is_mine = hs.is_mine
|
||||
|
||||
async def on_POST(
|
||||
self, request: SynapseRequest, user_id: str
|
||||
@@ -155,7 +153,7 @@ class DeleteDevicesRestServlet(RestServlet):
|
||||
await assert_requester_is_admin(self.auth, request)
|
||||
|
||||
target_user = UserID.from_string(user_id)
|
||||
if not self.hs.is_mine(target_user):
|
||||
if not self.is_mine(target_user):
|
||||
raise SynapseError(HTTPStatus.BAD_REQUEST, "Can only lookup local users")
|
||||
|
||||
u = await self.store.get_user_by_id(target_user.to_string())
|
||||
|
||||
@@ -52,7 +52,6 @@ class EventReportsRestServlet(RestServlet):
|
||||
PATTERNS = admin_patterns("/event_reports$")
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.hs = hs
|
||||
self.auth = hs.get_auth()
|
||||
self.store = hs.get_datastore()
|
||||
|
||||
@@ -115,7 +114,6 @@ class EventReportDetailRestServlet(RestServlet):
|
||||
PATTERNS = admin_patterns("/event_reports/(?P<report_id>[^/]*)$")
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.hs = hs
|
||||
self.auth = hs.get_auth()
|
||||
self.store = hs.get_datastore()
|
||||
|
||||
|
||||
@@ -100,7 +100,7 @@ class DestinationsRestServlet(RestServlet):
|
||||
200 OK with details of a destination if success otherwise an error.
|
||||
"""
|
||||
|
||||
PATTERNS = admin_patterns("/federation/destinations/(?P<destination>[^/]+)$")
|
||||
PATTERNS = admin_patterns("/federation/destinations/(?P<destination>[^/]*)$")
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self._auth = hs.get_auth()
|
||||
|
||||
@@ -30,7 +30,7 @@ logger = logging.getLogger(__name__)
|
||||
class DeleteGroupAdminRestServlet(RestServlet):
|
||||
"""Allows deleting of local groups"""
|
||||
|
||||
PATTERNS = admin_patterns("/delete_group/(?P<group_id>[^/]*)")
|
||||
PATTERNS = admin_patterns("/delete_group/(?P<group_id>[^/]*)$")
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.group_server = hs.get_groups_server_handler()
|
||||
|
||||
@@ -17,7 +17,7 @@ import logging
|
||||
from http import HTTPStatus
|
||||
from typing import TYPE_CHECKING, Tuple
|
||||
|
||||
from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError
|
||||
from synapse.api.errors import Codes, NotFoundError, SynapseError
|
||||
from synapse.http.server import HttpServer
|
||||
from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string
|
||||
from synapse.http.site import SynapseRequest
|
||||
@@ -41,9 +41,9 @@ class QuarantineMediaInRoom(RestServlet):
|
||||
"""
|
||||
|
||||
PATTERNS = [
|
||||
*admin_patterns("/room/(?P<room_id>[^/]+)/media/quarantine$"),
|
||||
*admin_patterns("/room/(?P<room_id>[^/]*)/media/quarantine$"),
|
||||
# This path kept around for legacy reasons
|
||||
*admin_patterns("/quarantine_media/(?P<room_id>[^/]+)"),
|
||||
*admin_patterns("/quarantine_media/(?P<room_id>[^/]*)$"),
|
||||
]
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
@@ -71,7 +71,7 @@ class QuarantineMediaByUser(RestServlet):
|
||||
this server.
|
||||
"""
|
||||
|
||||
PATTERNS = admin_patterns("/user/(?P<user_id>[^/]+)/media/quarantine$")
|
||||
PATTERNS = admin_patterns("/user/(?P<user_id>[^/]*)/media/quarantine$")
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.store = hs.get_datastore()
|
||||
@@ -99,7 +99,7 @@ class QuarantineMediaByID(RestServlet):
|
||||
"""
|
||||
|
||||
PATTERNS = admin_patterns(
|
||||
"/media/quarantine/(?P<server_name>[^/]+)/(?P<media_id>[^/]+)"
|
||||
"/media/quarantine/(?P<server_name>[^/]*)/(?P<media_id>[^/]*)$"
|
||||
)
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
@@ -128,7 +128,7 @@ class UnquarantineMediaByID(RestServlet):
|
||||
"""
|
||||
|
||||
PATTERNS = admin_patterns(
|
||||
"/media/unquarantine/(?P<server_name>[^/]+)/(?P<media_id>[^/]+)"
|
||||
"/media/unquarantine/(?P<server_name>[^/]*)/(?P<media_id>[^/]*)$"
|
||||
)
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
@@ -138,8 +138,7 @@ class UnquarantineMediaByID(RestServlet):
|
||||
async def on_POST(
|
||||
self, request: SynapseRequest, server_name: str, media_id: str
|
||||
) -> Tuple[int, JsonDict]:
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
await assert_user_is_admin(self.auth, requester.user)
|
||||
await assert_requester_is_admin(self.auth, request)
|
||||
|
||||
logging.info(
|
||||
"Remove from quarantine local media by ID: %s/%s", server_name, media_id
|
||||
@@ -154,7 +153,7 @@ class UnquarantineMediaByID(RestServlet):
|
||||
class ProtectMediaByID(RestServlet):
|
||||
"""Protect local media from being quarantined."""
|
||||
|
||||
PATTERNS = admin_patterns("/media/protect/(?P<media_id>[^/]+)")
|
||||
PATTERNS = admin_patterns("/media/protect/(?P<media_id>[^/]*)$")
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.store = hs.get_datastore()
|
||||
@@ -163,8 +162,7 @@ class ProtectMediaByID(RestServlet):
|
||||
async def on_POST(
|
||||
self, request: SynapseRequest, media_id: str
|
||||
) -> Tuple[int, JsonDict]:
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
await assert_user_is_admin(self.auth, requester.user)
|
||||
await assert_requester_is_admin(self.auth, request)
|
||||
|
||||
logging.info("Protecting local media by ID: %s", media_id)
|
||||
|
||||
@@ -177,7 +175,7 @@ class ProtectMediaByID(RestServlet):
|
||||
class UnprotectMediaByID(RestServlet):
|
||||
"""Unprotect local media from being quarantined."""
|
||||
|
||||
PATTERNS = admin_patterns("/media/unprotect/(?P<media_id>[^/]+)")
|
||||
PATTERNS = admin_patterns("/media/unprotect/(?P<media_id>[^/]*)$")
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.store = hs.get_datastore()
|
||||
@@ -186,8 +184,7 @@ class UnprotectMediaByID(RestServlet):
|
||||
async def on_POST(
|
||||
self, request: SynapseRequest, media_id: str
|
||||
) -> Tuple[int, JsonDict]:
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
await assert_user_is_admin(self.auth, requester.user)
|
||||
await assert_requester_is_admin(self.auth, request)
|
||||
|
||||
logging.info("Unprotecting local media by ID: %s", media_id)
|
||||
|
||||
@@ -200,7 +197,7 @@ class UnprotectMediaByID(RestServlet):
|
||||
class ListMediaInRoom(RestServlet):
|
||||
"""Lists all of the media in a given room."""
|
||||
|
||||
PATTERNS = admin_patterns("/room/(?P<room_id>[^/]+)/media$")
|
||||
PATTERNS = admin_patterns("/room/(?P<room_id>[^/]*)/media$")
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.store = hs.get_datastore()
|
||||
@@ -209,10 +206,7 @@ class ListMediaInRoom(RestServlet):
|
||||
async def on_GET(
|
||||
self, request: SynapseRequest, room_id: str
|
||||
) -> Tuple[int, JsonDict]:
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
is_admin = await self.auth.is_server_admin(requester.user)
|
||||
if not is_admin:
|
||||
raise AuthError(HTTPStatus.FORBIDDEN, "You are not a server admin")
|
||||
await assert_requester_is_admin(self.auth, request)
|
||||
|
||||
local_mxcs, remote_mxcs = await self.store.get_media_mxcs_in_room(room_id)
|
||||
|
||||
@@ -254,7 +248,7 @@ class PurgeMediaCacheRestServlet(RestServlet):
|
||||
class DeleteMediaByID(RestServlet):
|
||||
"""Delete local media by a given ID. Removes it from this server."""
|
||||
|
||||
PATTERNS = admin_patterns("/media/(?P<server_name>[^/]+)/(?P<media_id>[^/]+)")
|
||||
PATTERNS = admin_patterns("/media/(?P<server_name>[^/]*)/(?P<media_id>[^/]*)$")
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.store = hs.get_datastore()
|
||||
@@ -286,7 +280,7 @@ class DeleteMediaByDateSize(RestServlet):
|
||||
timestamp and size.
|
||||
"""
|
||||
|
||||
PATTERNS = admin_patterns("/media/(?P<server_name>[^/]+)/delete$")
|
||||
PATTERNS = admin_patterns("/media/(?P<server_name>[^/]*)/delete$")
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.store = hs.get_datastore()
|
||||
@@ -353,7 +347,7 @@ class UserMediaRestServlet(RestServlet):
|
||||
media that exist given for this user
|
||||
"""
|
||||
|
||||
PATTERNS = admin_patterns("/users/(?P<user_id>[^/]+)/media$")
|
||||
PATTERNS = admin_patterns("/users/(?P<user_id>[^/]*)/media$")
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.is_mine = hs.is_mine
|
||||
@@ -403,16 +397,7 @@ class UserMediaRestServlet(RestServlet):
|
||||
request,
|
||||
"order_by",
|
||||
default=MediaSortOrder.CREATED_TS.value,
|
||||
allowed_values=(
|
||||
MediaSortOrder.MEDIA_ID.value,
|
||||
MediaSortOrder.UPLOAD_NAME.value,
|
||||
MediaSortOrder.CREATED_TS.value,
|
||||
MediaSortOrder.LAST_ACCESS_TS.value,
|
||||
MediaSortOrder.MEDIA_LENGTH.value,
|
||||
MediaSortOrder.MEDIA_TYPE.value,
|
||||
MediaSortOrder.QUARANTINED_BY.value,
|
||||
MediaSortOrder.SAFE_FROM_QUARANTINE.value,
|
||||
),
|
||||
allowed_values=[sort_order.value for sort_order in MediaSortOrder],
|
||||
)
|
||||
direction = parse_string(
|
||||
request, "dir", default="f", allowed_values=("f", "b")
|
||||
@@ -470,16 +455,7 @@ class UserMediaRestServlet(RestServlet):
|
||||
request,
|
||||
"order_by",
|
||||
default=MediaSortOrder.CREATED_TS.value,
|
||||
allowed_values=(
|
||||
MediaSortOrder.MEDIA_ID.value,
|
||||
MediaSortOrder.UPLOAD_NAME.value,
|
||||
MediaSortOrder.CREATED_TS.value,
|
||||
MediaSortOrder.LAST_ACCESS_TS.value,
|
||||
MediaSortOrder.MEDIA_LENGTH.value,
|
||||
MediaSortOrder.MEDIA_TYPE.value,
|
||||
MediaSortOrder.QUARANTINED_BY.value,
|
||||
MediaSortOrder.SAFE_FROM_QUARANTINE.value,
|
||||
),
|
||||
allowed_values=[sort_order.value for sort_order in MediaSortOrder],
|
||||
)
|
||||
direction = parse_string(
|
||||
request, "dir", default="f", allowed_values=("f", "b")
|
||||
|
||||
@@ -70,7 +70,6 @@ class ListRegistrationTokensRestServlet(RestServlet):
|
||||
PATTERNS = admin_patterns("/registration_tokens$")
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.hs = hs
|
||||
self.auth = hs.get_auth()
|
||||
self.store = hs.get_datastore()
|
||||
|
||||
@@ -109,7 +108,6 @@ class NewRegistrationTokenRestServlet(RestServlet):
|
||||
PATTERNS = admin_patterns("/registration_tokens/new$")
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.hs = hs
|
||||
self.auth = hs.get_auth()
|
||||
self.store = hs.get_datastore()
|
||||
self.clock = hs.get_clock()
|
||||
@@ -260,7 +258,6 @@ class RegistrationTokenRestServlet(RestServlet):
|
||||
PATTERNS = admin_patterns("/registration_tokens/(?P<token>[^/]*)$")
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.hs = hs
|
||||
self.clock = hs.get_clock()
|
||||
self.auth = hs.get_auth()
|
||||
self.store = hs.get_datastore()
|
||||
|
||||
@@ -61,7 +61,7 @@ class RoomRestV2Servlet(RestServlet):
|
||||
If 'purge' is true, it will remove all traces of a room from the database.
|
||||
"""
|
||||
|
||||
PATTERNS = admin_patterns("/rooms/(?P<room_id>[^/]+)$", "v2")
|
||||
PATTERNS = admin_patterns("/rooms/(?P<room_id>[^/]*)$", "v2")
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self._auth = hs.get_auth()
|
||||
@@ -123,7 +123,7 @@ class RoomRestV2Servlet(RestServlet):
|
||||
class DeleteRoomStatusByRoomIdRestServlet(RestServlet):
|
||||
"""Get the status of the delete room background task."""
|
||||
|
||||
PATTERNS = admin_patterns("/rooms/(?P<room_id>[^/]+)/delete_status$", "v2")
|
||||
PATTERNS = admin_patterns("/rooms/(?P<room_id>[^/]*)/delete_status$", "v2")
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self._auth = hs.get_auth()
|
||||
@@ -160,7 +160,7 @@ class DeleteRoomStatusByRoomIdRestServlet(RestServlet):
|
||||
class DeleteRoomStatusByDeleteIdRestServlet(RestServlet):
|
||||
"""Get the status of the delete room background task."""
|
||||
|
||||
PATTERNS = admin_patterns("/rooms/delete_status/(?P<delete_id>[^/]+)$", "v2")
|
||||
PATTERNS = admin_patterns("/rooms/delete_status/(?P<delete_id>[^/]*)$", "v2")
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self._auth = hs.get_auth()
|
||||
@@ -193,35 +193,17 @@ class ListRoomRestServlet(RestServlet):
|
||||
self.admin_handler = hs.get_admin_handler()
|
||||
|
||||
async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
await assert_user_is_admin(self.auth, requester.user)
|
||||
await assert_requester_is_admin(self.auth, request)
|
||||
|
||||
# Extract query parameters
|
||||
start = parse_integer(request, "from", default=0)
|
||||
limit = parse_integer(request, "limit", default=100)
|
||||
order_by = parse_string(request, "order_by", default=RoomSortOrder.NAME.value)
|
||||
if order_by not in (
|
||||
RoomSortOrder.ALPHABETICAL.value,
|
||||
RoomSortOrder.SIZE.value,
|
||||
RoomSortOrder.NAME.value,
|
||||
RoomSortOrder.CANONICAL_ALIAS.value,
|
||||
RoomSortOrder.JOINED_MEMBERS.value,
|
||||
RoomSortOrder.JOINED_LOCAL_MEMBERS.value,
|
||||
RoomSortOrder.VERSION.value,
|
||||
RoomSortOrder.CREATOR.value,
|
||||
RoomSortOrder.ENCRYPTION.value,
|
||||
RoomSortOrder.FEDERATABLE.value,
|
||||
RoomSortOrder.PUBLIC.value,
|
||||
RoomSortOrder.JOIN_RULES.value,
|
||||
RoomSortOrder.GUEST_ACCESS.value,
|
||||
RoomSortOrder.HISTORY_VISIBILITY.value,
|
||||
RoomSortOrder.STATE_EVENTS.value,
|
||||
):
|
||||
raise SynapseError(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
"Unknown value for order_by: %s" % (order_by,),
|
||||
errcode=Codes.INVALID_PARAM,
|
||||
)
|
||||
order_by = parse_string(
|
||||
request,
|
||||
"order_by",
|
||||
default=RoomSortOrder.NAME.value,
|
||||
allowed_values=[sort_order.value for sort_order in RoomSortOrder],
|
||||
)
|
||||
|
||||
search_term = parse_string(request, "search_term", encoding="utf-8")
|
||||
if search_term == "":
|
||||
@@ -292,10 +274,9 @@ class RoomRestServlet(RestServlet):
|
||||
TODO: Add on_POST to allow room creation without joining the room
|
||||
"""
|
||||
|
||||
PATTERNS = admin_patterns("/rooms/(?P<room_id>[^/]+)$")
|
||||
PATTERNS = admin_patterns("/rooms/(?P<room_id>[^/]*)$")
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.hs = hs
|
||||
self.auth = hs.get_auth()
|
||||
self.store = hs.get_datastore()
|
||||
self.room_shutdown_handler = hs.get_room_shutdown_handler()
|
||||
@@ -397,10 +378,9 @@ class RoomMembersRestServlet(RestServlet):
|
||||
Get members list of a room.
|
||||
"""
|
||||
|
||||
PATTERNS = admin_patterns("/rooms/(?P<room_id>[^/]+)/members")
|
||||
PATTERNS = admin_patterns("/rooms/(?P<room_id>[^/]*)/members$")
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.hs = hs
|
||||
self.auth = hs.get_auth()
|
||||
self.store = hs.get_datastore()
|
||||
|
||||
@@ -424,10 +404,9 @@ class RoomStateRestServlet(RestServlet):
|
||||
Get full state within a room.
|
||||
"""
|
||||
|
||||
PATTERNS = admin_patterns("/rooms/(?P<room_id>[^/]+)/state")
|
||||
PATTERNS = admin_patterns("/rooms/(?P<room_id>[^/]*)/state$")
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.hs = hs
|
||||
self.auth = hs.get_auth()
|
||||
self.store = hs.get_datastore()
|
||||
self.clock = hs.get_clock()
|
||||
@@ -436,8 +415,7 @@ class RoomStateRestServlet(RestServlet):
|
||||
async def on_GET(
|
||||
self, request: SynapseRequest, room_id: str
|
||||
) -> Tuple[int, JsonDict]:
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
await assert_user_is_admin(self.auth, requester.user)
|
||||
await assert_requester_is_admin(self.auth, request)
|
||||
|
||||
ret = await self.store.get_room(room_id)
|
||||
if not ret:
|
||||
@@ -454,14 +432,14 @@ class RoomStateRestServlet(RestServlet):
|
||||
|
||||
class JoinRoomAliasServlet(ResolveRoomIdMixin, RestServlet):
|
||||
|
||||
PATTERNS = admin_patterns("/join/(?P<room_identifier>[^/]*)")
|
||||
PATTERNS = admin_patterns("/join/(?P<room_identifier>[^/]*)$")
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__(hs)
|
||||
self.hs = hs
|
||||
self.auth = hs.get_auth()
|
||||
self.admin_handler = hs.get_admin_handler()
|
||||
self.state_handler = hs.get_state_handler()
|
||||
self.is_mine = hs.is_mine
|
||||
|
||||
async def on_POST(
|
||||
self, request: SynapseRequest, room_identifier: str
|
||||
@@ -477,7 +455,7 @@ class JoinRoomAliasServlet(ResolveRoomIdMixin, RestServlet):
|
||||
assert_params_in_dict(content, ["user_id"])
|
||||
target_user = UserID.from_string(content["user_id"])
|
||||
|
||||
if not self.hs.is_mine(target_user):
|
||||
if not self.is_mine(target_user):
|
||||
raise SynapseError(
|
||||
HTTPStatus.BAD_REQUEST,
|
||||
"This endpoint can only be used with local users",
|
||||
@@ -542,11 +520,10 @@ class MakeRoomAdminRestServlet(ResolveRoomIdMixin, RestServlet):
|
||||
}
|
||||
"""
|
||||
|
||||
PATTERNS = admin_patterns("/rooms/(?P<room_identifier>[^/]*)/make_room_admin")
|
||||
PATTERNS = admin_patterns("/rooms/(?P<room_identifier>[^/]*)/make_room_admin$")
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__(hs)
|
||||
self.hs = hs
|
||||
self.auth = hs.get_auth()
|
||||
self.store = hs.get_datastore()
|
||||
self.event_creation_handler = hs.get_event_creation_handler()
|
||||
@@ -688,19 +665,17 @@ class ForwardExtremitiesRestServlet(ResolveRoomIdMixin, RestServlet):
|
||||
GET /_synapse/admin/v1/rooms/<room_id_or_alias>/forward_extremities
|
||||
"""
|
||||
|
||||
PATTERNS = admin_patterns("/rooms/(?P<room_identifier>[^/]*)/forward_extremities")
|
||||
PATTERNS = admin_patterns("/rooms/(?P<room_identifier>[^/]*)/forward_extremities$")
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
super().__init__(hs)
|
||||
self.hs = hs
|
||||
self.auth = hs.get_auth()
|
||||
self.store = hs.get_datastore()
|
||||
|
||||
async def on_DELETE(
|
||||
self, request: SynapseRequest, room_identifier: str
|
||||
) -> Tuple[int, JsonDict]:
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
await assert_user_is_admin(self.auth, requester.user)
|
||||
await assert_requester_is_admin(self.auth, request)
|
||||
|
||||
room_id, _ = await self.resolve_room_id(room_identifier)
|
||||
|
||||
@@ -710,8 +685,7 @@ class ForwardExtremitiesRestServlet(ResolveRoomIdMixin, RestServlet):
|
||||
async def on_GET(
|
||||
self, request: SynapseRequest, room_identifier: str
|
||||
) -> Tuple[int, JsonDict]:
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
await assert_user_is_admin(self.auth, requester.user)
|
||||
await assert_requester_is_admin(self.auth, request)
|
||||
|
||||
room_id, _ = await self.resolve_room_id(room_identifier)
|
||||
|
||||
@@ -793,7 +767,7 @@ class BlockRoomRestServlet(RestServlet):
|
||||
On GET: Get blocking status of room and user who has blocked this room.
|
||||
"""
|
||||
|
||||
PATTERNS = admin_patterns("/rooms/(?P<room_id>[^/]+)/block$")
|
||||
PATTERNS = admin_patterns("/rooms/(?P<room_id>[^/]*)/block$")
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self._auth = hs.get_auth()
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user