Compare commits
58 Commits
| SHA1 |
|---|
| 2f0c33a540 |
| ccdfa36131 |
| a1b6dea0b7 |
| 4b73488e81 |
| 54a6afeee3 |
| a78016dadf |
| 34efb4c604 |
| 00e97a7774 |
| ccb9616f26 |
| 2e537a0280 |
| 300d0d756a |
| fbd9de6d1f |
| 7fa1346f93 |
| 17b713850f |
| b685c5e7f1 |
| e54746bdf7 |
| 71c46652a2 |
| 73ed289bd2 |
| 93b61589b0 |
| cfcc4bfcaf |
| a737cc2713 |
| a64c29926e |
| 1baab20352 |
| 26837d5dbe |
| dd8da8c5f6 |
| 4937fe3d6b |
| e74bb96733 |
| e5b659e9e1 |
| a1ff1e967f |
| 4936fc59fc |
| cee4010f94 |
| e20f18a766 |
| fdf8346944 |
| 5b857b77f7 |
| 4a55d267ee |
| 2547d9d4d7 |
| 65fb3b2e25 |
| a71be9d62d |
| fe18882bb5 |
| e448dbbf5b |
| 69961c7e9f |
| a01605c136 |
| 6f7417c3db |
| 8965b6cfec |
| 930ba00971 |
| 056327457f |
| 28f255d5f3 |
| c177faf5a9 |
| 49c619a9a2 |
| da16d06301 |
| 0b77329fe2 |
| b52fb703f7 |
| e2c16edc78 |
| 2eb421b606 |
| 90ad4d443a |
| 85c0999bfb |
| c91045f56c |
| b849e46139 |
@@ -9,3 +9,8 @@ apt-get update

apt-get install -y python3.5 python3.5-dev python3-pip libxml2-dev libxslt-dev xmlsec1 zlib1g-dev tox

+export LANG="C.UTF-8"
+
+# Prevent virtualenv from auto-updating pip to an incompatible version
+export VIRTUALENV_NO_DOWNLOAD=1

exec tox -e py35-old,combine

+35
-2
@@ -1,9 +1,42 @@

Unreleased
==========

Note that this release includes a change in Synapse to use Redis as a cache, as well as a pub/sub mechanism, if Redis support is enabled. No action is needed by server administrators, and we do not expect resource usage of the Redis instance to change dramatically.


Synapse 1.26.0 (2021-01-27)
===========================

This release brings a new schema version for Synapse and rolling back to a previous
version is not trivial. Please review [UPGRADE.rst](UPGRADE.rst) for more details
on these changes and for general upgrade guidance.

No significant changes since 1.26.0rc2.


Synapse 1.26.0rc2 (2021-01-25)
==============================

Bugfixes
--------

- Fix receipts and account data not being sent down sync. Introduced in v1.26.0rc1. ([\#9193](https://github.com/matrix-org/synapse/issues/9193), [\#9195](https://github.com/matrix-org/synapse/issues/9195))
- Fix chain cover update to handle events with duplicate auth events. Introduced in v1.26.0rc1. ([\#9210](https://github.com/matrix-org/synapse/issues/9210))


Internal Changes
----------------

- Add an `oidc-` prefix to any `idp_id`s which are given in the `oidc_providers` configuration. ([\#9189](https://github.com/matrix-org/synapse/issues/9189))
- Bump minimum `psycopg2` version to v2.8. ([\#9204](https://github.com/matrix-org/synapse/issues/9204))


Synapse 1.26.0rc1 (2021-01-20)
==============================

This release brings a new schema version for Synapse and rolling back to a previous
version is not trivial. Please review [UPGRADE.rst](UPGRADE.rst) for more details
on these changes and for general upgrade guidance.

Features
--------

+37
@@ -85,6 +85,43 @@ for example:

    wget https://packages.matrix.org/debian/pool/main/m/matrix-synapse-py3/matrix-synapse-py3_1.3.0+stretch1_amd64.deb
    dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb

Upgrading to v1.27.0
====================

Changes to HTML templates
-------------------------

The HTML templates for SSO and email notifications now have `Jinja2's autoescape <https://jinja.palletsprojects.com/en/2.11.x/api/#autoescaping>`_
enabled for files ending in ``.html``, ``.htm``, and ``.xml``. If you have customised
these templates and see issues when viewing them you might need to update them.
It is expected that most configurations will need no changes.

If you have customised the *names* of these templates, it is recommended
to verify that they end in ``.html`` to ensure that autoescape is enabled.

The above applies to the following templates:

* ``add_threepid.html``
* ``add_threepid_failure.html``
* ``add_threepid_success.html``
* ``notice_expiry.html``
* ``notif_mail.html`` (which, by default, includes ``room.html`` and ``notif.html``)
* ``password_reset.html``
* ``password_reset_confirmation.html``
* ``password_reset_failure.html``
* ``password_reset_success.html``
* ``registration.html``
* ``registration_failure.html``
* ``registration_success.html``
* ``sso_account_deactivated.html``
* ``sso_auth_bad_user.html``
* ``sso_auth_confirm.html``
* ``sso_auth_success.html``
* ``sso_error.html``
* ``sso_login_idp_picker.html``
* ``sso_redirect_confirm.html``
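
To see concretely what autoescape changes, here is a minimal, self-contained
illustration (the template names and the ``subject`` variable are invented for
this example; they are not Synapse templates)::

    import jinja2

    env = jinja2.Environment(
        loader=jinja2.DictLoader(
            {
                "notice.html": "<p>{{ subject }}</p>",
                "notice.txt": "{{ subject }}",
            }
        ),
        # Enables escaping for template names ending in .html, .htm and .xml.
        autoescape=jinja2.select_autoescape(),
    )

    # ".html" is autoescaped, so markup in the variable is rendered inert:
    print(env.get_template("notice.html").render(subject="<b>hi</b>"))
    # -> <p>&lt;b&gt;hi&lt;/b&gt;</p>

    # ".txt" is not autoescaped, so the value passes through unchanged:
    print(env.get_template("notice.txt").render(subject="<b>hi</b>"))
    # -> <b>hi</b>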

Upgrading to v1.26.0
====================

@@ -0,0 +1 @@
Add admin API for getting and deleting forward extremities for a room.

@@ -0,0 +1 @@
Fix spurious errors in logs when deleting a non-existent pusher.

@@ -0,0 +1 @@
Fix a long-standing bug where an internal server error was raised when attempting to preview an HTML document in an unknown character encoding.

@@ -0,0 +1 @@
Fix a long-standing bug where invalid data could cause errors when calculating the presentable room name for push.

@@ -0,0 +1 @@
Add experimental support for allowing clients to pick an SSO Identity Provider ([MSC2858](https://github.com/matrix-org/matrix-doc/pull/2858)).

@@ -1 +0,0 @@
Add an `oidc-` prefix to any `idp_id`s which are given in the `oidc_providers` configuration.

@@ -1 +0,0 @@
Fix receipts or account data not being sent down sync. Introduced in v1.26.0rc1.

@@ -1 +0,0 @@
Fix receipts or account data not being sent down sync. Introduced in v1.26.0rc1.

@@ -0,0 +1 @@
Precompute joined hosts and store in Redis.

@@ -0,0 +1 @@
The `service_url` parameter in `cas_config` is deprecated in favor of `public_baseurl`.

@@ -0,0 +1 @@
Clean-up template loading code.

@@ -0,0 +1 @@
Add an admin API endpoint for shadow-banning users.

@@ -0,0 +1 @@
Fix the Python 3.5 old dependencies build.

@@ -0,0 +1 @@
Fix bug where we sometimes didn't detect that Redis connections had died, causing workers to not see new data.

@@ -0,0 +1 @@
Update `isort` to v5.7.0 to bypass a bug where it would disagree with `black` about formatting.

@@ -0,0 +1 @@
Add type hints to handlers code.

@@ -0,0 +1 @@
Precompute joined hosts and store in Redis.

@@ -0,0 +1 @@
Fix a bug where `None` was passed to Synapse modules instead of an empty dictionary if an empty module `config` block was provided in the homeserver config.

@@ -0,0 +1 @@
Add type hints to handlers code.

@@ -0,0 +1 @@
Fix a bug in the `make_room_admin` admin API where it failed if the admin with the greatest power level was not in the room. Contributed by Pankaj Yadav.

@@ -0,0 +1 @@
Add ratelimits to the 3PID `/requestToken` API.

@@ -0,0 +1 @@
Add notes on integrating with Facebook for SSO login.

@@ -0,0 +1 @@
Minor performance improvement during TLS handshake.
+6
-2
@@ -1,8 +1,12 @@
-matrix-synapse-py3 (1.25.0ubuntu1) UNRELEASED; urgency=medium
+matrix-synapse-py3 (1.26.0) stable; urgency=medium

  [ Richard van der Hoff ]
  * Remove dependency on `python3-distutils`.

- -- Richard van der Hoff <richard@matrix.org> Fri, 15 Jan 2021 12:44:19 +0000
+  [ Synapse Packaging team ]
+  * New synapse release 1.26.0.
+
+ -- Synapse Packaging team <packages@matrix.org> Wed, 27 Jan 2021 12:43:35 -0500

matrix-synapse-py3 (1.25.0) stable; urgency=medium
@@ -9,6 +9,7 @@
* [Response](#response)
* [Undoing room shutdowns](#undoing-room-shutdowns)
- [Make Room Admin API](#make-room-admin-api)
- [Forward Extremities Admin API](#forward-extremities-admin-api)

# List Room API
@@ -511,3 +512,55 @@ optionally be specified, e.g.:

  "user_id": "@foo:example.com"
}
```

# Forward Extremities Admin API

Enables querying and deleting forward extremities from rooms. When a lot of forward
extremities accumulate in a room, performance can become degraded. For details, see
[#1760](https://github.com/matrix-org/synapse/issues/1760).

## Check for forward extremities

To check the status of forward extremities for a room:

```
GET /_synapse/admin/v1/rooms/<room_id_or_alias>/forward_extremities
```

A response like the following will be returned:

```json
{
  "count": 1,
  "results": [
    {
      "event_id": "$M5SP266vsnxctfwFgFLNceaCo3ujhRtg_NiiHabcdefgh",
      "state_group": 439,
      "depth": 123,
      "received_ts": 1611263016761
    }
  ]
}
```

## Deleting forward extremities

**WARNING**: Please ensure you know what you're doing and have read
the related issue [#1760](https://github.com/matrix-org/synapse/issues/1760).
Under no circumstances should this API be executed as an automated maintenance task!

If a room has lots of forward extremities, the extra ones can be
deleted as follows:

```
DELETE /_synapse/admin/v1/rooms/<room_id_or_alias>/forward_extremities
```

A response like the following will be returned, indicating the number of forward extremities
that were deleted:

```json
{
  "deleted": 1
}
```
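
As an illustration, here is a minimal sketch of driving both endpoints from
Python with the `requests` library (the homeserver URL, admin access token and
room ID are placeholders; authenticating with a server admin's access token is
assumed, as for the other admin APIs):

```python
import requests
from urllib.parse import quote

BASE = "https://homeserver.example.com"  # placeholder
HEADERS = {"Authorization": "Bearer <admin access token>"}  # placeholder
ROOM = quote("!roomid:example.com", safe="")  # room IDs must be URL-encoded

# Check how many forward extremities the room currently has.
url = f"{BASE}/_synapse/admin/v1/rooms/{ROOM}/forward_extremities"
print(requests.get(url, headers=HEADERS).json()["count"])

# Delete the extra extremities. Read the warning above before doing this!
print(requests.delete(url, headers=HEADERS).json()["deleted"])
```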

@@ -760,3 +760,33 @@ The following fields are returned in the JSON response body:

- ``total`` - integer - Number of pushers.

See also `Client-Server API Spec <https://matrix.org/docs/spec/client_server/latest#get-matrix-client-r0-pushers>`_

Shadow-banning users
====================

Shadow-banning is a useful tool for moderating malicious or egregiously abusive users.
A shadow-banned user receives successful responses to their client-server API requests,
but the events are not propagated into rooms. This can be an effective tool, as it
(hopefully) takes longer for the user to realise they are being moderated before
they pivot to another account.

Shadow-banning a user should be used as a tool of last resort and may lead to confusing
or broken behaviour for the client. A shadow-banned user will not receive any
notification, and it is generally more appropriate to ban or kick abusive users.
A shadow-banned user will be unable to contact anyone on the server.

The API is::

    POST /_synapse/admin/v1/users/<user_id>/shadow_ban

To use it, you will need to authenticate by providing an ``access_token`` for a
server admin: see `README.rst <README.rst>`_.

An empty JSON dict is returned.

**Parameters**

The following parameters should be set in the URL:

- ``user_id`` - The fully qualified MXID: for example, ``@user:server.com``. The user must
  be local.
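
As an illustration, a minimal sketch of calling this API from Python with the
``requests`` library (the homeserver URL and token are placeholders)::

    import requests

    BASE = "https://homeserver.example.com"  # placeholder
    TOKEN = "<admin access token>"           # placeholder

    resp = requests.post(
        f"{BASE}/_synapse/admin/v1/users/@user:server.com/shadow_ban",
        headers={"Authorization": f"Bearer {TOKEN}"},
    )
    resp.raise_for_status()
    assert resp.json() == {}  # an empty JSON dict is returned on success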

+48
-5
@@ -44,7 +44,7 @@ as follows:

To enable the OpenID integration, you should then add a section to the `oidc_providers`
setting in your configuration file (or uncomment one of the existing examples).
See [sample_config.yaml](./sample_config.yaml) for some sample settings, as well as
the text below for example configurations for specific providers.

## Sample configs

@@ -52,11 +52,11 @@ the text below for example configurations for specific providers.

Here are a few configs for providers that should work with Synapse.

### Microsoft Azure Active Directory

Azure AD can act as an OpenID Connect Provider. Register a new application under
*App registrations* in the Azure AD management console. The RedirectURI for your
application should point to your matrix server: `[synapse public baseurl]/_synapse/oidc/callback`

Go to *Certificates & secrets* and register a new client secret. Make note of your
Directory (tenant) ID as it will be used in the Azure links.
Edit your Synapse config file and change the `oidc_config` section:

@@ -118,7 +118,7 @@ oidc_providers:

```

### [Keycloak][keycloak-idp]

[Keycloak][keycloak-idp] is an open-source IdP maintained by Red Hat.

Follow the [Getting Started Guide](https://www.keycloak.org/getting-started) to install Keycloak and set up a realm.

@@ -194,7 +194,7 @@ Synapse config:

```yaml
oidc_providers:
  - idp_id: auth0
    idp_name: Auth0
    issuer: "https://your-tier.eu.auth0.com/" # TO BE FILLED
    client_id: "your-client-id" # TO BE FILLED

@@ -307,3 +307,46 @@ oidc_providers:
        localpart_template: '{{ user.nickname }}'
        display_name_template: '{{ user.name }}'
```

### Facebook

Like GitHub, Facebook provides a custom OAuth2 API rather than an OIDC-compliant
one, so it requires a little more configuration.

0. You will need a Facebook developer account. You can register for one
   [here](https://developers.facebook.com/async/registration/).
1. On the [apps](https://developers.facebook.com/apps/) page of the developer
   console, select "Create App", and choose "Build Connected Experiences".
2. Once the app is created, add "Facebook Login" and choose "Web". You don't
   need to go through the whole form here.
3. In the left-hand menu, open "Products"/"Facebook Login"/"Settings".
   * Add `[synapse public baseurl]/_synapse/oidc/callback` as an OAuth Redirect
     URL.
4. In the left-hand menu, open "Settings/Basic". Here you can copy the "App ID"
   and "App Secret" for use below.

Synapse config:

```yaml
  - idp_id: facebook
    idp_name: Facebook
    idp_brand: "org.matrix.facebook"  # optional: styling hint for clients
    discover: false
    issuer: "https://facebook.com"
    client_id: "your-client-id" # TO BE FILLED
    client_secret: "your-client-secret" # TO BE FILLED
    scopes: ["openid", "email"]
    authorization_endpoint: https://facebook.com/dialog/oauth
    token_endpoint: https://graph.facebook.com/v9.0/oauth/access_token
    user_profile_method: "userinfo_endpoint"
    userinfo_endpoint: "https://graph.facebook.com/v9.0/me?fields=id,name,email,picture"
    user_mapping_provider:
      config:
        subject_claim: "id"
        display_name_template: "{{ user.name }}"
```

Relevant documents:
* https://developers.facebook.com/docs/facebook-login/manually-build-a-login-flow
* Using Facebook's Graph API: https://developers.facebook.com/docs/graph-api/using-graph-api/
* Reference to the User endpoint: https://developers.facebook.com/docs/graph-api/reference/user
@@ -824,6 +824,7 @@ log_config: "CONFDIR/SERVERNAME.log.config"
#   users are joining rooms the server is already in (this is cheap) vs
#   "remote" for when users are trying to join rooms not on the server (which
#   can be more expensive)
+# - one for ratelimiting how often a user or IP can attempt to validate a 3PID.
#
# The defaults are as shown below.
#
@@ -857,7 +858,10 @@ log_config: "CONFDIR/SERVERNAME.log.config"
#  remote:
#    per_second: 0.01
#    burst_count: 3
+#
+#rc_3pid_validation:
+#  per_second: 0.003
+#  burst_count: 5

# Ratelimiting settings for incoming federation
#
@@ -1878,10 +1882,6 @@ cas_config:
#
#server_url: "https://cas-server.com"

-# The public URL of the homeserver.
-#
-#service_url: "https://homeserver.domain.com:8448"
-
# The attribute of the CAS response to use as the display name.
#
# If unset, no displayname will be set.
+4
-1
@@ -40,6 +40,9 @@ which relays replication commands between processes. This can give a significant
CPU saving on the main process and will be a prerequisite for upcoming
performance improvements.

+If Redis support is enabled Synapse will use it as a shared cache, as well as a
+pub/sub mechanism.
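
For reference, the changelog above notes that no action is needed to get this:
the shared cache is used whenever Redis support is turned on. A minimal sketch
of the homeserver config that enables it (the host and port shown are the usual
Redis defaults, and the password is optional):

```yaml
redis:
  enabled: true
  host: localhost
  port: 6379
  #password: <secret_password>
```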

See the [Architectural diagram](#architectural-diagram) section at the end for
a visualisation of what this looks like.

@@ -271,7 +274,7 @@ using):

Note that an HTTP listener with `client` and `federation` resources must be
configured in the `worker_listeners` option in the worker config.

Ensure that all SSO logins go to a single process (usually the main process).
For multiple workers not handling the SSO endpoints properly, see
[#7530](https://github.com/matrix-org/synapse/issues/7530).

@@ -23,39 +23,7 @@ files =
  synapse/events/validator.py,
  synapse/events/spamcheck.py,
  synapse/federation,
-  synapse/handlers/_base.py,
-  synapse/handlers/account_data.py,
-  synapse/handlers/account_validity.py,
-  synapse/handlers/admin.py,
-  synapse/handlers/appservice.py,
-  synapse/handlers/auth.py,
-  synapse/handlers/cas_handler.py,
-  synapse/handlers/deactivate_account.py,
-  synapse/handlers/device.py,
-  synapse/handlers/devicemessage.py,
-  synapse/handlers/directory.py,
-  synapse/handlers/events.py,
-  synapse/handlers/federation.py,
-  synapse/handlers/identity.py,
-  synapse/handlers/initial_sync.py,
-  synapse/handlers/message.py,
-  synapse/handlers/oidc_handler.py,
-  synapse/handlers/pagination.py,
-  synapse/handlers/password_policy.py,
-  synapse/handlers/presence.py,
-  synapse/handlers/profile.py,
-  synapse/handlers/read_marker.py,
-  synapse/handlers/receipts.py,
-  synapse/handlers/register.py,
-  synapse/handlers/room.py,
-  synapse/handlers/room_list.py,
-  synapse/handlers/room_member.py,
-  synapse/handlers/room_member_worker.py,
-  synapse/handlers/saml_handler.py,
-  synapse/handlers/sso.py,
-  synapse/handlers/sync.py,
-  synapse/handlers/user_directory.py,
-  synapse/handlers/ui_auth,
+  synapse/handlers,
  synapse/http/client.py,
  synapse/http/federation/matrix_federation_agent.py,
  synapse/http/federation/well_known_resolver.py,
@@ -194,3 +162,9 @@ ignore_missing_imports = True

[mypy-hiredis]
ignore_missing_imports = True

+[mypy-josepy.*]
+ignore_missing_imports = True
+
+[mypy-txacme.*]
+ignore_missing_imports = True
@@ -96,7 +96,7 @@ CONDITIONAL_REQUIREMENTS["all"] = list(ALL_OPTIONAL_REQUIREMENTS)
#
# We pin black so that our tests don't start failing on new releases.
CONDITIONAL_REQUIREMENTS["lint"] = [
-    "isort==5.0.3",
+    "isort==5.7.0",
    "black==19.10b0",
    "flake8-comprehensions",
    "flake8",
+19
-6
@@ -15,13 +15,23 @@

"""Contains *incomplete* type hints for txredisapi.
"""

-from typing import List, Optional, Type, Union
+from typing import Any, List, Optional, Type, Union

class RedisProtocol:
    def publish(self, channel: str, message: bytes): ...
+    async def ping(self) -> None: ...
+    async def set(
+        self,
+        key: str,
+        value: Any,
+        expire: Optional[int] = None,
+        pexpire: Optional[int] = None,
+        only_if_not_exists: bool = False,
+        only_if_exists: bool = False,
+    ) -> None: ...
+    async def get(self, key: str) -> Any: ...

-class SubscriberProtocol:
+class SubscriberProtocol(RedisProtocol):
    def __init__(self, *args, **kwargs): ...
    password: Optional[str]
    def subscribe(self, channels: Union[str, List[str]]): ...

@@ -40,14 +50,13 @@ def lazyConnection(
    convertNumbers: bool = ...,
) -> RedisProtocol: ...

-class SubscriberFactory:
-    def buildProtocol(self, addr): ...

class ConnectionHandler: ...

class RedisFactory:
    continueTrying: bool
    handler: RedisProtocol
    pool: List[RedisProtocol]
+    replyTimeout: Optional[int]
    def __init__(
        self,
        uuid: str,

@@ -60,3 +69,7 @@ class RedisFactory:
        replyTimeout: Optional[int] = None,
        convertNumbers: Optional[int] = True,
    ): ...
+    def buildProtocol(self, addr) -> RedisProtocol: ...
+
+class SubscriberFactory(RedisFactory):
+    def __init__(self): ...
+1
-1
@@ -48,7 +48,7 @@ try:
except ImportError:
    pass

-__version__ = "1.26.0rc1"
+__version__ = "1.26.0"

if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
    # We import here so that we don't have to install a bunch of deps when
+26
-16
@@ -203,11 +203,28 @@ class Config:
        with open(file_path) as file_stream:
            return file_stream.read()

+    def read_template(self, filename: str) -> jinja2.Template:
+        """Load a template file from disk.
+
+        This function will attempt to load the given template from the default Synapse
+        template directory.
+
+        Files read are treated as Jinja templates. The template is not rendered yet
+        and has autoescape enabled.
+
+        Args:
+            filename: A template filename to read.
+
+        Raises:
+            ConfigError: if the file's path is incorrect or otherwise cannot be read.
+
+        Returns:
+            A jinja2 template.
+        """
+        return self.read_templates([filename])[0]
+
    def read_templates(
-        self,
-        filenames: List[str],
-        custom_template_directory: Optional[str] = None,
-        autoescape: bool = False,
+        self, filenames: List[str], custom_template_directory: Optional[str] = None,
    ) -> List[jinja2.Template]:
        """Load a list of template files from disk using the given variables.

@@ -215,7 +232,8 @@ class Config:
        template directory. If `custom_template_directory` is supplied, that directory
        is tried first.

-        Files read are treated as Jinja templates. These templates are not rendered yet.
+        Files read are treated as Jinja templates. The templates are not rendered yet
+        and have autoescape enabled.

        Args:
            filenames: A list of template filenames to read.
@@ -223,16 +241,12 @@ class Config:
            custom_template_directory: A directory to try to look for the templates
                before using the default Synapse template directory instead.

-            autoescape: Whether to autoescape variables before inserting them into the
-                template.
-
        Raises:
            ConfigError: if the file's path is incorrect or otherwise cannot be read.

        Returns:
            A list of jinja2 templates.
        """
-        templates = []
        search_directories = [self.default_template_dir]

        # The loader will first look in the custom template directory (if specified) for the
@@ -249,7 +263,7 @@ class Config:
            search_directories.insert(0, custom_template_directory)

        loader = jinja2.FileSystemLoader(search_directories)
-        env = jinja2.Environment(loader=loader, autoescape=autoescape)
+        env = jinja2.Environment(loader=loader, autoescape=jinja2.select_autoescape(),)

        # Update the environment with our custom filters
        env.filters.update(
@@ -259,12 +273,8 @@ class Config:
            }
        )

-        for filename in filenames:
-            # Load the template
-            template = env.get_template(filename)
-            templates.append(template)
-
-        return templates
+        # Load the templates
+        return [env.get_template(filename) for filename in filenames]


def _format_ts_filter(value: int, format: str):
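
A quick way to see what the `jinja2.select_autoescape()` call in the new
`Environment` enables (this snippet is illustrative and not part of the diff;
`select_autoescape()` returns the predicate the environment consults for each
template name):

```python
import jinja2

autoescape = jinja2.select_autoescape()
print(autoescape("recaptcha.html"))  # True: .html/.htm/.xml names are escaped
print(autoescape("notes.txt"))       # False: other extensions are left alone
print(autoescape(None))              # True: string templates (default_for_string)
```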
@@ -9,6 +9,7 @@ from synapse.config import (
    consent_config,
    database,
    emailconfig,
+    experimental,
    groups,
    jwt_config,
    key,
@@ -18,6 +19,7 @@ from synapse.config import (
    password_auth_providers,
    push,
    ratelimiting,
+    redis,
    registration,
    repository,
    room_directory,
@@ -48,10 +50,11 @@ def path_exists(file_path: str): ...

class RootConfig:
    server: server.ServerConfig
+    experimental: experimental.ExperimentalConfig
    tls: tls.TlsConfig
    database: database.DatabaseConfig
    logging: logger.LoggingConfig
-    ratelimit: ratelimiting.RatelimitConfig
+    ratelimiting: ratelimiting.RatelimitConfig
    media: repository.ContentRepositoryConfig
    captcha: captcha.CaptchaConfig
    voip: voip.VoipConfig
@@ -79,6 +82,7 @@ class RootConfig:
    roomdirectory: room_directory.RoomDirectoryConfig
    thirdpartyrules: third_party_event_rules.ThirdPartyRulesConfig
    tracer: tracer.TracerConfig
+    redis: redis.RedisConfig

    config_classes: List = ...
    def __init__(self) -> None: ...
@@ -28,9 +28,7 @@ class CaptchaConfig(Config):
            "recaptcha_siteverify_api",
            "https://www.recaptcha.net/recaptcha/api/siteverify",
        )
-        self.recaptcha_template = self.read_templates(
-            ["recaptcha.html"], autoescape=True
-        )[0]
+        self.recaptcha_template = self.read_template("recaptcha.html")

    def generate_config_section(self, **kwargs):
        return """\
@@ -30,7 +30,13 @@ class CasConfig(Config):

        if self.cas_enabled:
            self.cas_server_url = cas_config["server_url"]
-            self.cas_service_url = cas_config["service_url"]
+            public_base_url = cas_config.get("service_url") or self.public_baseurl
+            if public_base_url[-1] != "/":
+                public_base_url += "/"
+            # TODO Update this to a _synapse URL.
+            self.cas_service_url = (
+                public_base_url + "_matrix/client/r0/login/cas/ticket"
+            )
            self.cas_displayname_attribute = cas_config.get("displayname_attribute")
            self.cas_required_attributes = cas_config.get("required_attributes") or {}
        else:
@@ -53,10 +59,6 @@ class CasConfig(Config):
        #
        #server_url: "https://cas-server.com"

-        # The public URL of the homeserver.
-        #
-        #service_url: "https://homeserver.domain.com:8448"
-
        # The attribute of the CAS response to use as the display name.
        #
        # If unset, no displayname will be set.
@@ -89,7 +89,7 @@ class ConsentConfig(Config):

    def read_config(self, config, **kwargs):
        consent_config = config.get("user_consent")
-        self.terms_template = self.read_templates(["terms.html"], autoescape=True)[0]
+        self.terms_template = self.read_template("terms.html")

        if consent_config is None:
            return
@@ -0,0 +1,29 @@
# -*- coding: utf-8 -*-
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from synapse.config._base import Config
from synapse.types import JsonDict


class ExperimentalConfig(Config):
    """Config section for enabling experimental features"""

    section = "experimental"

    def read_config(self, config: JsonDict, **kwargs):
        experimental = config.get("experimental_features") or {}

        # MSC2858 (multiple SSO identity providers)
        self.msc2858_enabled = experimental.get("msc2858_enabled", False)  # type: bool
@@ -24,6 +24,7 @@ from .cas import CasConfig
from .consent_config import ConsentConfig
from .database import DatabaseConfig
from .emailconfig import EmailConfig
+from .experimental import ExperimentalConfig
from .federation import FederationConfig
from .groups import GroupsConfig
from .jwt_config import JWTConfig
@@ -57,6 +58,7 @@ class HomeServerConfig(RootConfig):

    config_classes = [
        ServerConfig,
+        ExperimentalConfig,
        TlsConfig,
        FederationConfig,
        CacheConfig,
@@ -54,8 +54,7 @@ class OIDCConfig(Config):
                "Multiple OIDC providers have the idp_id %r." % idp_id
            )

-        public_baseurl = self.public_baseurl
-        self.oidc_callback_url = public_baseurl + "_synapse/oidc/callback"
+        self.oidc_callback_url = self.public_baseurl + "_synapse/oidc/callback"

    @property
    def oidc_enabled(self) -> bool:
@@ -24,7 +24,7 @@ class RateLimitConfig:
        defaults={"per_second": 0.17, "burst_count": 3.0},
    ):
        self.per_second = config.get("per_second", defaults["per_second"])
-        self.burst_count = config.get("burst_count", defaults["burst_count"])
+        self.burst_count = int(config.get("burst_count", defaults["burst_count"]))


class FederationRateLimitConfig:
@@ -102,6 +102,11 @@ class RatelimitConfig(Config):
            defaults={"per_second": 0.01, "burst_count": 3},
        )

+        self.rc_3pid_validation = RateLimitConfig(
+            config.get("rc_3pid_validation") or {},
+            defaults={"per_second": 0.003, "burst_count": 5},
+        )
+
    def generate_config_section(self, **kwargs):
        return """\
        ## Ratelimiting ##
@@ -131,6 +136,7 @@ class RatelimitConfig(Config):
        #   users are joining rooms the server is already in (this is cheap) vs
        #   "remote" for when users are trying to join rooms not on the server (which
        #   can be more expensive)
+        # - one for ratelimiting how often a user or IP can attempt to validate a 3PID.
        #
        # The defaults are as shown below.
        #
@@ -164,7 +170,10 @@ class RatelimitConfig(Config):
        #  remote:
        #    per_second: 0.01
        #    burst_count: 3
+        #
+        #rc_3pid_validation:
+        #  per_second: 0.003
+        #  burst_count: 5

        # Ratelimiting settings for incoming federation
        #
@@ -176,9 +176,7 @@ class RegistrationConfig(Config):
        self.session_lifetime = session_lifetime

        # The success template used during fallback auth.
-        self.fallback_success_template = self.read_templates(
-            ["auth_success.html"], autoescape=True
-        )[0]
+        self.fallback_success_template = self.read_template("auth_success.html")

    def generate_config_section(self, generate_secrets=False, **kwargs):
        if generate_secrets:
@@ -125,19 +125,24 @@ class FederationPolicyForHTTPS:
        self._no_verify_ssl_context = _no_verify_ssl.getContext()
        self._no_verify_ssl_context.set_info_callback(_context_info_cb)

+        self._should_verify = self._config.federation_verify_certificates
+
+        self._federation_certificate_verification_whitelist = (
+            self._config.federation_certificate_verification_whitelist
+        )
+
    def get_options(self, host: bytes):
        # IPolicyForHTTPS.get_options takes bytes, but we want to compare
        # against the str whitelist. The hostnames in the whitelist are already
        # IDNA-encoded like the hosts will be here.
        ascii_host = host.decode("ascii")

        # Check if certificate verification has been enabled
-        should_verify = self._config.federation_verify_certificates
+        should_verify = self._should_verify

        # Check if we've disabled certificate verification for this host
-        if should_verify:
-            for regex in self._config.federation_certificate_verification_whitelist:
+        if self._should_verify:
+            for regex in self._federation_certificate_verification_whitelist:
                if regex.match(ascii_host):
                    should_verify = False
                    break
@@ -142,6 +142,8 @@ class FederationSender:
            self._wake_destinations_needing_catchup,
        )

+        self._external_cache = hs.get_external_cache()
+
    def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue:
        """Get or create a PerDestinationQueue for the given destination

@@ -197,22 +199,40 @@ class FederationSender:
            if not event.internal_metadata.should_proactively_send():
                return

-            try:
-                # Get the state from before the event.
-                # We need to make sure that this is the state from before
-                # the event and not from after it.
-                # Otherwise if the last member on a server in a room is
-                # banned then it won't receive the event because it won't
-                # be in the room after the ban.
-                destinations = await self.state.get_hosts_in_room_at_events(
-                    event.room_id, event_ids=event.prev_event_ids()
-                )
-            except Exception:
-                logger.exception(
-                    "Failed to calculate hosts in room for event: %s",
-                    event.event_id,
-                )
-                return
+            destinations = None  # type: Optional[Set[str]]
+            if not event.prev_event_ids():
+                # If there are no prev event IDs then the state is empty
+                # and so no remote servers in the room
+                destinations = set()
+            else:
+                # We check the external cache for the destinations, which is
+                # stored per state group.
+
+                sg = await self._external_cache.get(
+                    "event_to_prev_state_group", event.event_id
+                )
+                if sg:
+                    destinations = await self._external_cache.get(
+                        "get_joined_hosts", str(sg)
+                    )
+
+            if destinations is None:
+                try:
+                    # Get the state from before the event.
+                    # We need to make sure that this is the state from before
+                    # the event and not from after it.
+                    # Otherwise if the last member on a server in a room is
+                    # banned then it won't receive the event because it won't
+                    # be in the room after the ban.
+                    destinations = await self.state.get_hosts_in_room_at_events(
+                        event.room_id, event_ids=event.prev_event_ids()
+                    )
+                except Exception:
+                    logger.exception(
+                        "Failed to calculate hosts in room for event: %s",
+                        event.event_id,
+                    )
+                    return

            destinations = {
                d
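
The lookup added above is a two-step indirection: event ID to previous state
group, then state group to joined hosts, so events sharing a state group reuse
one cached host list. A condensed sketch of just that flow (the cache names and
`get` calls mirror the diff; the standalone function itself is invented for
illustration):

```python
from typing import Optional, Set


async def cached_destinations(external_cache, event) -> Optional[Set[str]]:
    """Return cached remote hosts for `event`, or None on a cache miss."""
    if not event.prev_event_ids():
        return set()  # no prev events: empty state, so no remote servers
    sg = await external_cache.get("event_to_prev_state_group", event.event_id)
    if sg is None:
        return None  # miss: caller falls back to get_hosts_in_room_at_events
    return await external_cache.get("get_joined_hosts", str(sg))
```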
@@ -14,6 +14,7 @@
# limitations under the License.

import logging
+from typing import TYPE_CHECKING

import twisted
import twisted.internet.error
@@ -22,6 +23,9 @@ from twisted.web.resource import Resource

from synapse.app import check_bind_error

+if TYPE_CHECKING:
+    from synapse.app.homeserver import HomeServer
+
logger = logging.getLogger(__name__)

ACME_REGISTER_FAIL_ERROR = """
@@ -35,12 +39,12 @@ solutions, please read https://github.com/matrix-org/synapse/blob/master/docs/ACME.md


class AcmeHandler:
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
        self.hs = hs
        self.reactor = hs.get_reactor()
        self._acme_domain = hs.config.acme_domain

-    async def start_listening(self):
+    async def start_listening(self) -> None:
        from synapse.handlers import acme_issuing_service

        # Configure logging for txacme, if you need to debug
@@ -85,7 +89,7 @@ class AcmeHandler:
            logger.error(ACME_REGISTER_FAIL_ERROR)
            raise

-    async def provision_certificate(self):
+    async def provision_certificate(self) -> None:

        logger.warning("Reprovisioning %s", self._acme_domain)

@@ -110,5 +114,3 @@ class AcmeHandler:
        except Exception:
            logger.exception("Failed saving!")
            raise
-
-        return True
@@ -22,8 +22,10 @@ only need (and may only have available) if we are doing ACME, so is designed to
imported conditionally.
"""
import logging
+from typing import Dict, Iterable, List

import attr
+import pem
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from josepy import JWKRSA
@@ -36,20 +38,27 @@ from txacme.util import generate_private_key
from zope.interface import implementer

from twisted.internet import defer
+from twisted.internet.interfaces import IReactorTCP
from twisted.python.filepath import FilePath
from twisted.python.url import URL
+from twisted.web.resource import IResource

logger = logging.getLogger(__name__)


-def create_issuing_service(reactor, acme_url, account_key_file, well_known_resource):
+def create_issuing_service(
+    reactor: IReactorTCP,
+    acme_url: str,
+    account_key_file: str,
+    well_known_resource: IResource,
+) -> AcmeIssuingService:
    """Create an ACME issuing service, and attach it to a web Resource

    Args:
        reactor: twisted reactor
-        acme_url (str): URL to use to request certificates
-        account_key_file (str): where to store the account key
-        well_known_resource (twisted.web.IResource): web resource for .well-known.
+        acme_url: URL to use to request certificates
+        account_key_file: where to store the account key
+        well_known_resource: web resource for .well-known.
            we will attach a child resource for "acme-challenge".

    Returns:
@@ -83,18 +92,20 @@ class ErsatzStore:
    A store that only stores in memory.
    """

-    certs = attr.ib(default=attr.Factory(dict))
+    certs = attr.ib(type=Dict[bytes, List[bytes]], default=attr.Factory(dict))

-    def store(self, server_name, pem_objects):
+    def store(
+        self, server_name: bytes, pem_objects: Iterable[pem.AbstractPEMObject]
+    ) -> defer.Deferred:
        self.certs[server_name] = [o.as_bytes() for o in pem_objects]
        return defer.succeed(None)


-def load_or_create_client_key(key_file):
+def load_or_create_client_key(key_file: str) -> JWKRSA:
    """Load the ACME account key from a file, creating it if it does not exist.

    Args:
-        key_file (str): name of the file to use as the account key
+        key_file: name of the file to use as the account key
    """
    # this is based on txacme.endpoint.load_or_create_client_key, but doesn't
    # hardcode the 'client.key' filename
@@ -99,11 +99,7 @@ class CasHandler:
        Returns:
            The URL to use as a "service" parameter.
        """
-        return "%s%s?%s" % (
-            self._cas_service_url,
-            "/_matrix/client/r0/login/cas/ticket",
-            urllib.parse.urlencode(args),
-        )
+        return "%s?%s" % (self._cas_service_url, urllib.parse.urlencode(args),)

    async def _validate_ticket(
        self, ticket: str, service_args: Dict[str, str]
@@ -15,7 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple
+from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Set, Tuple

from synapse.api import errors
from synapse.api.constants import EventTypes
@@ -62,7 +62,7 @@ class DeviceWorkerHandler(BaseHandler):
        self._auth_handler = hs.get_auth_handler()

    @trace
-    async def get_devices_by_user(self, user_id: str) -> List[Dict[str, Any]]:
+    async def get_devices_by_user(self, user_id: str) -> List[JsonDict]:
        """
        Retrieve the given user's devices

@@ -85,7 +85,7 @@ class DeviceWorkerHandler(BaseHandler):
        return devices

    @trace
-    async def get_device(self, user_id: str, device_id: str) -> Dict[str, Any]:
+    async def get_device(self, user_id: str, device_id: str) -> JsonDict:
        """ Retrieve the given device

        Args:
@@ -598,7 +598,7 @@ class DeviceHandler(DeviceWorkerHandler):


def _update_device_from_client_ips(
-    device: Dict[str, Any], client_ips: Dict[Tuple[str, str], Dict[str, Any]]
+    device: JsonDict, client_ips: Dict[Tuple[str, str], JsonDict]
) -> None:
    ip = client_ips.get((device["user_id"], device["device_id"]), {})
    device.update({"last_seen_ts": ip.get("last_seen"), "last_seen_ip": ip.get("ip")})
@@ -946,8 +946,8 @@ class DeviceListUpdater:
    async def process_cross_signing_key_update(
        self,
        user_id: str,
-        master_key: Optional[Dict[str, Any]],
-        self_signing_key: Optional[Dict[str, Any]],
+        master_key: Optional[JsonDict],
+        self_signing_key: Optional[JsonDict],
    ) -> List[str]:
        """Process the given new master and self-signing key for the given remote user.
+129
-94
@@ -16,7 +16,7 @@
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple
|
||||
|
||||
import attr
|
||||
from canonicaljson import encode_canonical_json
|
||||
@@ -31,6 +31,7 @@ from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||
from synapse.logging.opentracing import log_kv, set_tag, tag_args, trace
|
||||
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
|
||||
from synapse.types import (
|
||||
JsonDict,
|
||||
UserID,
|
||||
get_domain_from_id,
|
||||
get_verify_key_from_cross_signing_key,
|
||||
@@ -40,11 +41,14 @@ from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.caches.expiringcache import ExpiringCache
|
||||
from synapse.util.retryutils import NotRetryingDestination
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.app.homeserver import HomeServer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class E2eKeysHandler:
|
||||
def __init__(self, hs):
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.store = hs.get_datastore()
|
||||
self.federation = hs.get_federation_client()
|
||||
self.device_handler = hs.get_device_handler()
|
||||
@@ -78,7 +82,9 @@ class E2eKeysHandler:
|
||||
)
|
||||
|
||||
@trace
|
||||
async def query_devices(self, query_body, timeout, from_user_id):
|
||||
async def query_devices(
|
||||
self, query_body: JsonDict, timeout: int, from_user_id: str
|
||||
) -> JsonDict:
|
||||
""" Handle a device key query from a client
|
||||
|
||||
{
|
||||
@@ -98,12 +104,14 @@ class E2eKeysHandler:
|
||||
}
|
||||
|
||||
Args:
|
||||
from_user_id (str): the user making the query. This is used when
|
||||
from_user_id: the user making the query. This is used when
|
||||
adding cross-signing signatures to limit what signatures users
|
||||
can see.
|
||||
"""
|
||||
|
||||
device_keys_query = query_body.get("device_keys", {})
|
||||
device_keys_query = query_body.get(
|
||||
"device_keys", {}
|
||||
) # type: Dict[str, Iterable[str]]
|
||||
|
||||
# separate users by domain.
|
||||
# make a map from domain to user_id to device_ids
|
||||
@@ -121,7 +129,8 @@ class E2eKeysHandler:
|
||||
set_tag("remote_key_query", remote_queries)
|
||||
|
||||
# First get local devices.
|
||||
failures = {}
|
||||
# A map of destination -> failure response.
|
||||
failures = {} # type: Dict[str, JsonDict]
|
||||
results = {}
|
||||
if local_query:
|
||||
local_result = await self.query_local_devices(local_query)
|
||||
@@ -135,9 +144,10 @@ class E2eKeysHandler:
|
||||
)
|
||||
|
||||
# Now attempt to get any remote devices from our local cache.
|
||||
remote_queries_not_in_cache = {}
|
||||
# A map of destination -> user ID -> device IDs.
|
||||
remote_queries_not_in_cache = {} # type: Dict[str, Dict[str, Iterable[str]]]
|
||||
if remote_queries:
|
||||
query_list = []
|
||||
query_list = [] # type: List[Tuple[str, Optional[str]]]
|
||||
for user_id, device_ids in remote_queries.items():
|
||||
if device_ids:
|
||||
query_list.extend((user_id, device_id) for device_id in device_ids)
|
||||
@@ -284,15 +294,15 @@ class E2eKeysHandler:
|
||||
return ret
|
||||
|
||||
async def get_cross_signing_keys_from_cache(
|
||||
self, query, from_user_id
|
||||
self, query: Iterable[str], from_user_id: Optional[str]
|
||||
) -> Dict[str, Dict[str, dict]]:
|
||||
"""Get cross-signing keys for users from the database
|
||||
|
||||
Args:
|
||||
query (Iterable[string]) an iterable of user IDs. A dict whose keys
|
||||
query: an iterable of user IDs. A dict whose keys
|
||||
are user IDs satisfies this, so the query format used for
|
||||
query_devices can be used here.
|
||||
from_user_id (str): the user making the query. This is used when
|
||||
from_user_id: the user making the query. This is used when
|
||||
adding cross-signing signatures to limit what signatures users
|
||||
can see.
|
||||
|
||||
@@ -315,14 +325,12 @@ class E2eKeysHandler:
|
||||
if "self_signing" in user_info:
|
||||
self_signing_keys[user_id] = user_info["self_signing"]
|
||||
|
||||
if (
|
||||
from_user_id in keys
|
||||
and keys[from_user_id] is not None
|
||||
and "user_signing" in keys[from_user_id]
|
||||
):
|
||||
# users can see other users' master and self-signing keys, but can
|
||||
# only see their own user-signing keys
|
||||
user_signing_keys[from_user_id] = keys[from_user_id]["user_signing"]
|
||||
# users can see other users' master and self-signing keys, but can
|
||||
# only see their own user-signing keys
|
||||
if from_user_id:
|
||||
from_user_key = keys.get(from_user_id)
|
||||
if from_user_key and "user_signing" in from_user_key:
|
||||
user_signing_keys[from_user_id] = from_user_key["user_signing"]
|
||||
|
||||
return {
|
||||
"master_keys": master_keys,
|
||||
@@ -344,9 +352,9 @@ class E2eKeysHandler:
|
||||
A map from user_id -> device_id -> device details
|
||||
"""
|
||||
set_tag("local_query", query)
|
||||
local_query = []
|
||||
local_query = [] # type: List[Tuple[str, Optional[str]]]
|
||||
|
||||
result_dict = {}
|
||||
result_dict = {} # type: Dict[str, Dict[str, dict]]
|
||||
for user_id, device_ids in query.items():
|
||||
# we use UserID.from_string to catch invalid user ids
|
||||
if not self.is_mine(UserID.from_string(user_id)):
|
||||
@@ -380,10 +388,14 @@ class E2eKeysHandler:
|
||||
log_kv(results)
|
||||
return result_dict
|
||||
|
||||
async def on_federation_query_client_keys(self, query_body):
|
||||
async def on_federation_query_client_keys(
|
||||
self, query_body: Dict[str, Dict[str, Optional[List[str]]]]
|
||||
) -> JsonDict:
|
||||
""" Handle a device key query from a federated server
|
||||
"""
|
||||
device_keys_query = query_body.get("device_keys", {})
|
||||
device_keys_query = query_body.get(
|
||||
"device_keys", {}
|
||||
) # type: Dict[str, Optional[List[str]]]
|
||||
res = await self.query_local_devices(device_keys_query)
|
||||
ret = {"device_keys": res}
|
||||
|
||||
@@ -397,31 +409,34 @@ class E2eKeysHandler:
|
||||
return ret
|
||||
|
||||
@trace
|
||||
async def claim_one_time_keys(self, query, timeout):
|
||||
local_query = []
|
||||
remote_queries = {}
|
||||
async def claim_one_time_keys(
|
||||
self, query: Dict[str, Dict[str, Dict[str, str]]], timeout: int
|
||||
) -> JsonDict:
|
||||
local_query = [] # type: List[Tuple[str, str, str]]
|
||||
remote_queries = {} # type: Dict[str, Dict[str, Dict[str, str]]]
|
||||
|
||||
for user_id, device_keys in query.get("one_time_keys", {}).items():
|
||||
for user_id, one_time_keys in query.get("one_time_keys", {}).items():
|
||||
# we use UserID.from_string to catch invalid user ids
|
||||
if self.is_mine(UserID.from_string(user_id)):
|
||||
for device_id, algorithm in device_keys.items():
|
||||
for device_id, algorithm in one_time_keys.items():
|
||||
local_query.append((user_id, device_id, algorithm))
|
||||
else:
|
||||
domain = get_domain_from_id(user_id)
|
||||
remote_queries.setdefault(domain, {})[user_id] = device_keys
|
||||
remote_queries.setdefault(domain, {})[user_id] = one_time_keys
|
||||
|
||||
set_tag("local_key_query", local_query)
|
||||
set_tag("remote_key_query", remote_queries)
|
||||
|
||||
results = await self.store.claim_e2e_one_time_keys(local_query)
|
||||
|
||||
json_result = {}
|
||||
failures = {}
|
||||
# A map of user ID -> device ID -> key ID -> key.
|
||||
json_result = {} # type: Dict[str, Dict[str, Dict[str, JsonDict]]]
|
||||
failures = {} # type: Dict[str, JsonDict]
|
||||
for user_id, device_keys in results.items():
|
||||
for device_id, keys in device_keys.items():
|
||||
for key_id, json_bytes in keys.items():
|
||||
for key_id, json_str in keys.items():
|
||||
json_result.setdefault(user_id, {})[device_id] = {
|
||||
key_id: json_decoder.decode(json_bytes)
|
||||
key_id: json_decoder.decode(json_str)
|
||||
}
|
||||
|
||||
@trace
|
||||
@@ -468,7 +483,9 @@ class E2eKeysHandler:
|
||||
return {"one_time_keys": json_result, "failures": failures}
|
||||
|
||||
@tag_args
|
||||
async def upload_keys_for_user(self, user_id, device_id, keys):
|
||||
async def upload_keys_for_user(
|
||||
self, user_id: str, device_id: str, keys: JsonDict
|
||||
) -> JsonDict:
|
||||
|
||||
time_now = self.clock.time_msec()
|
||||
|
||||
@@ -543,8 +560,8 @@ class E2eKeysHandler:
|
||||
return {"one_time_key_counts": result}
|
||||
|
||||
async def _upload_one_time_keys_for_user(
|
||||
self, user_id, device_id, time_now, one_time_keys
|
||||
):
|
||||
self, user_id: str, device_id: str, time_now: int, one_time_keys: JsonDict
|
||||
) -> None:
|
||||
logger.info(
|
||||
"Adding one_time_keys %r for device %r for user %r at %d",
|
||||
one_time_keys.keys(),
|
||||
@@ -585,12 +602,14 @@ class E2eKeysHandler:
|
||||
log_kv({"message": "Inserting new one_time_keys.", "keys": new_keys})
|
||||
await self.store.add_e2e_one_time_keys(user_id, device_id, time_now, new_keys)
|
||||
|
||||
async def upload_signing_keys_for_user(self, user_id, keys):
|
||||
async def upload_signing_keys_for_user(
|
||||
self, user_id: str, keys: JsonDict
|
||||
) -> JsonDict:
|
||||
"""Upload signing keys for cross-signing
|
||||
|
||||
Args:
|
||||
user_id (string): the user uploading the keys
|
||||
keys (dict[string, dict]): the signing keys
|
||||
user_id: the user uploading the keys
|
||||
keys: the signing keys
|
||||
"""
|
||||
|
||||
# if a master key is uploaded, then check it. Otherwise, load the
|
||||
@@ -667,16 +686,17 @@ class E2eKeysHandler:
|
||||
|
||||
return {}
|
||||
|
||||
async def upload_signatures_for_device_keys(self, user_id, signatures):
|
||||
async def upload_signatures_for_device_keys(
|
||||
self, user_id: str, signatures: JsonDict
|
||||
) -> JsonDict:
|
||||
"""Upload device signatures for cross-signing
|
||||
|
||||
Args:
|
||||
user_id (string): the user uploading the signatures
|
||||
signatures (dict[string, dict[string, dict]]): map of users to
|
||||
devices to signed keys. This is the submission from the user; an
|
||||
exception will be raised if it is malformed.
user_id: the user uploading the signatures
signatures: map of users to devices to signed keys. This is the submission
from the user; an exception will be raised if it is malformed.
Returns:
dict: response to be sent back to the client. The response will have
The response to be sent back to the client. The response will have
a "failures" key, which will be a dict mapping users to devices
to errors for the signatures that failed.
Raises:
@@ -719,7 +739,9 @@ class E2eKeysHandler:

return {"failures": failures}

async def _process_self_signatures(self, user_id, signatures):
async def _process_self_signatures(
self, user_id: str, signatures: JsonDict
) -> Tuple[List["SignatureListItem"], Dict[str, Dict[str, dict]]]:
"""Process uploaded signatures of the user's own keys.

Signatures of the user's own keys from this API come in two forms:
@@ -731,15 +753,14 @@ class E2eKeysHandler:
signatures (dict[string, dict]): map of devices to signed keys

Returns:
(list[SignatureListItem], dict[string, dict[string, dict]]):
a list of signatures to store, and a map of users to devices to failure
reasons
A tuple of a list of signatures to store, and a map of users to
devices to failure reasons

Raises:
SynapseError: if the input is malformed
"""
signature_list = []
failures = {}
signature_list = [] # type: List[SignatureListItem]
failures = {} # type: Dict[str, Dict[str, JsonDict]]
if not signatures:
return signature_list, failures

@@ -834,19 +855,24 @@ class E2eKeysHandler:
return signature_list, failures

def _check_master_key_signature(
self, user_id, master_key_id, signed_master_key, stored_master_key, devices
):
self,
user_id: str,
master_key_id: str,
signed_master_key: JsonDict,
stored_master_key: JsonDict,
devices: Dict[str, Dict[str, JsonDict]],
) -> List["SignatureListItem"]:
"""Check signatures of a user's master key made by their devices.

Args:
user_id (string): the user whose master key is being checked
master_key_id (string): the ID of the user's master key
signed_master_key (dict): the user's signed master key that was uploaded
stored_master_key (dict): our previously-stored copy of the user's master key
devices (iterable(dict)): the user's devices
user_id: the user whose master key is being checked
master_key_id: the ID of the user's master key
signed_master_key: the user's signed master key that was uploaded
stored_master_key: our previously-stored copy of the user's master key
devices: the user's devices

Returns:
list[SignatureListItem]: a list of signatures to store
A list of signatures to store

Raises:
SynapseError: if a signature is invalid
@@ -877,25 +903,26 @@ class E2eKeysHandler:

return master_key_signature_list

async def _process_other_signatures(self, user_id, signatures):
async def _process_other_signatures(
self, user_id: str, signatures: Dict[str, dict]
) -> Tuple[List["SignatureListItem"], Dict[str, Dict[str, dict]]]:
"""Process uploaded signatures of other users' keys. These will be the
target user's master keys, signed by the uploading user's user-signing
key.

Args:
user_id (string): the user uploading the keys
signatures (dict[string, dict]): map of users to devices to signed keys
user_id: the user uploading the keys
signatures: map of users to devices to signed keys

Returns:
(list[SignatureListItem], dict[string, dict[string, dict]]):
a list of signatures to store, and a map of users to devices to failure
A list of signatures to store, and a map of users to devices to failure
reasons

Raises:
SynapseError: if the input is malformed
"""
signature_list = []
failures = {}
signature_list = [] # type: List[SignatureListItem]
failures = {} # type: Dict[str, Dict[str, JsonDict]]
if not signatures:
return signature_list, failures

@@ -983,7 +1010,7 @@ class E2eKeysHandler:

async def _get_e2e_cross_signing_verify_key(
self, user_id: str, key_type: str, from_user_id: str = None
):
) -> Tuple[JsonDict, str, VerifyKey]:
"""Fetch locally or remotely query for a cross-signing public key.

First, attempt to fetch the cross-signing public key from storage.
@@ -997,8 +1024,7 @@ class E2eKeysHandler:
This affects what signatures are fetched.

Returns:
dict, str, VerifyKey: the raw key data, the key ID, and the
signedjson verify key
The raw key data, the key ID, and the signedjson verify key

Raises:
NotFoundError: if the key is not found
@@ -1135,16 +1161,18 @@ class E2eKeysHandler:
return desired_key, desired_key_id, desired_verify_key


def _check_cross_signing_key(key, user_id, key_type, signing_key=None):
def _check_cross_signing_key(
key: JsonDict, user_id: str, key_type: str, signing_key: Optional[VerifyKey] = None
) -> None:
"""Check a cross-signing key uploaded by a user. Performs some basic sanity
checking, and ensures that it is signed, if a signature is required.

Args:
key (dict): the key data to verify
user_id (str): the user whose key is being checked
key_type (str): the type of key that the key should be
signing_key (VerifyKey): (optional) the signing key that the key should
be signed with. If omitted, signatures will not be checked.
key: the key data to verify
user_id: the user whose key is being checked
key_type: the type of key that the key should be
signing_key: the signing key that the key should be signed with. If
omitted, signatures will not be checked.
"""
if (
key.get("user_id") != user_id
@@ -1162,16 +1190,21 @@ def _check_cross_signing_key(key, user_id, key_type, signing_key=None):
)


def _check_device_signature(user_id, verify_key, signed_device, stored_device):
def _check_device_signature(
user_id: str,
verify_key: VerifyKey,
signed_device: JsonDict,
stored_device: JsonDict,
) -> None:
"""Check that a signature on a device or cross-signing key is correct and
matches the copy of the device/key that we have stored. Throws an
exception if an error is detected.

Args:
user_id (str): the user ID whose signature is being checked
verify_key (VerifyKey): the key to verify the device with
signed_device (dict): the uploaded signed device data
stored_device (dict): our previously stored copy of the device
user_id: the user ID whose signature is being checked
verify_key: the key to verify the device with
signed_device: the uploaded signed device data
stored_device: our previously stored copy of the device

Raises:
SynapseError: if the signature was invalid or the sent device is not the
@@ -1201,7 +1234,7 @@ def _check_device_signature(user_id, verify_key, signed_device, stored_device):
raise SynapseError(400, "Invalid signature", Codes.INVALID_SIGNATURE)


def _exception_to_failure(e):
def _exception_to_failure(e: Exception) -> JsonDict:
if isinstance(e, SynapseError):
return {"status": e.code, "errcode": e.errcode, "message": str(e)}

@@ -1218,7 +1251,7 @@ def _exception_to_failure(e):
return {"status": 503, "message": str(e)}


def _one_time_keys_match(old_key_json, new_key):
def _one_time_keys_match(old_key_json: str, new_key: JsonDict) -> bool:
old_key = json_decoder.decode(old_key_json)

# if either is a string rather than an object, they must match exactly
@@ -1239,16 +1272,16 @@ class SignatureListItem:
"""An item in the signature list as used by upload_signatures_for_device_keys.
"""

signing_key_id = attr.ib()
target_user_id = attr.ib()
target_device_id = attr.ib()
signature = attr.ib()
signing_key_id = attr.ib(type=str)
target_user_id = attr.ib(type=str)
target_device_id = attr.ib(type=str)
signature = attr.ib(type=JsonDict)
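The hunk above replaces bare `attr.ib()` declarations with `attr.ib(type=...)`, which records each attribute's type in the attrs metadata so mypy can check it. A minimal sketch of the same pattern (the `@attr.s(slots=True)` decorator here is an assumption about the surrounding class definition, which this hunk does not show):

```python
import attr

@attr.s(slots=True)
class SignatureListItem:
    # attr.ib(type=...) exposes the attribute types to type checkers
    # without requiring class-level annotation syntax.
    signing_key_id = attr.ib(type=str)
    target_user_id = attr.ib(type=str)
    target_device_id = attr.ib(type=str)
    signature = attr.ib(type=dict)
```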


class SigningKeyEduUpdater:
"""Handles incoming signing key updates from federation and updates the DB"""

def __init__(self, hs, e2e_keys_handler):
def __init__(self, hs: "HomeServer", e2e_keys_handler: E2eKeysHandler):
self.store = hs.get_datastore()
self.federation = hs.get_federation_client()
self.clock = hs.get_clock()
@@ -1257,7 +1290,7 @@ class SigningKeyEduUpdater:
self._remote_edu_linearizer = Linearizer(name="remote_signing_key")

# user_id -> list of updates waiting to be handled.
self._pending_updates = {}
self._pending_updates = {} # type: Dict[str, List[Tuple[JsonDict, JsonDict]]]

# Recently seen stream ids. We don't bother keeping these in the DB,
# but they're useful to have them about to reduce the number of spurious
@@ -1270,13 +1303,15 @@ class SigningKeyEduUpdater:
iterable=True,
)

async def incoming_signing_key_update(self, origin, edu_content):
async def incoming_signing_key_update(
self, origin: str, edu_content: JsonDict
) -> None:
"""Called on incoming signing key update from federation. Responsible for
parsing the EDU and adding to pending updates list.

Args:
origin (string): the server that sent the EDU
edu_content (dict): the contents of the EDU
origin: the server that sent the EDU
edu_content: the contents of the EDU
"""

user_id = edu_content.pop("user_id")
@@ -1299,11 +1334,11 @@ class SigningKeyEduUpdater:

await self._handle_signing_key_updates(user_id)

async def _handle_signing_key_updates(self, user_id):
async def _handle_signing_key_updates(self, user_id: str) -> None:
"""Actually handle pending updates.

Args:
user_id (string): the user whose updates we are processing
user_id: the user whose updates we are processing
"""

device_handler = self.e2e_keys_handler.device_handler
@@ -1315,7 +1350,7 @@ class SigningKeyEduUpdater:
# This can happen since we batch updates
return

device_ids = []
device_ids = [] # type: List[str]

logger.info("pending updates: %r", pending_updates)
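Note the `# type:` comment style used for the new annotations in this file: it is the PEP 484 comment form, which keeps the module valid on interpreters that predate variable-annotation syntax while still informing mypy. For example:

```python
from typing import Dict, List, Tuple

# equivalent to `device_ids: List[str] = []` on newer Pythons
device_ids = []  # type: List[str]
pending_updates = {}  # type: Dict[str, List[Tuple[dict, dict]]]
```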


@@ -15,6 +15,7 @@
# limitations under the License.

import logging
from typing import TYPE_CHECKING, List, Optional

from synapse.api.errors import (
Codes,
@@ -24,8 +25,12 @@ from synapse.api.errors import (
SynapseError,
)
from synapse.logging.opentracing import log_kv, trace
from synapse.types import JsonDict
from synapse.util.async_helpers import Linearizer

if TYPE_CHECKING:
from synapse.app.homeserver import HomeServer

logger = logging.getLogger(__name__)


@@ -37,7 +42,7 @@ class E2eRoomKeysHandler:
The actual payload of the encrypted keys is completely opaque to the handler.
"""

def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastore()

# Used to lock whenever a client is uploading key data. This prevents collisions
@@ -48,21 +53,27 @@ class E2eRoomKeysHandler:
self._upload_linearizer = Linearizer("upload_room_keys_lock")

@trace
async def get_room_keys(self, user_id, version, room_id=None, session_id=None):
async def get_room_keys(
self,
user_id: str,
version: str,
room_id: Optional[str] = None,
session_id: Optional[str] = None,
) -> List[JsonDict]:
"""Bulk get the E2E room keys for a given backup, optionally filtered to a given
room, or a given session.
See EndToEndRoomKeyStore.get_e2e_room_keys for full details.

Args:
user_id(str): the user whose keys we're getting
version(str): the version ID of the backup we're getting keys from
room_id(string): room ID to get keys for, for None to get keys for all rooms
session_id(string): session ID to get keys for, for None to get keys for all
user_id: the user whose keys we're getting
version: the version ID of the backup we're getting keys from
room_id: room ID to get keys for, for None to get keys for all rooms
session_id: session ID to get keys for, for None to get keys for all
sessions
Raises:
NotFoundError: if the backup version does not exist
Returns:
A deferred list of dicts giving the session_data and message metadata for
A list of dicts giving the session_data and message metadata for
these room keys.
"""

@@ -86,17 +97,23 @@ class E2eRoomKeysHandler:
return results

@trace
async def delete_room_keys(self, user_id, version, room_id=None, session_id=None):
async def delete_room_keys(
self,
user_id: str,
version: str,
room_id: Optional[str] = None,
session_id: Optional[str] = None,
) -> JsonDict:
"""Bulk delete the E2E room keys for a given backup, optionally filtered to a given
room or a given session.
See EndToEndRoomKeyStore.delete_e2e_room_keys for full details.

Args:
user_id(str): the user whose backup we're deleting
version(str): the version ID of the backup we're deleting
room_id(string): room ID to delete keys for, for None to delete keys for all
user_id: the user whose backup we're deleting
version: the version ID of the backup we're deleting
room_id: room ID to delete keys for, for None to delete keys for all
rooms
session_id(string): session ID to delete keys for, for None to delete keys
session_id: session ID to delete keys for, for None to delete keys
for all sessions
Raises:
NotFoundError: if the backup version does not exist
@@ -128,15 +145,17 @@ class E2eRoomKeysHandler:
return {"etag": str(version_etag), "count": count}

@trace
async def upload_room_keys(self, user_id, version, room_keys):
async def upload_room_keys(
self, user_id: str, version: str, room_keys: JsonDict
) -> JsonDict:
"""Bulk upload a list of room keys into a given backup version, asserting
that the given version is the current backup version. room_keys are merged
into the current backup as described in RoomKeysServlet.on_PUT().

Args:
user_id(str): the user whose backup we're setting
version(str): the version ID of the backup we're updating
room_keys(dict): a nested dict describing the room_keys we're setting:
user_id: the user whose backup we're setting
version: the version ID of the backup we're updating
room_keys: a nested dict describing the room_keys we're setting:

{
"rooms": {
@@ -254,14 +273,16 @@ class E2eRoomKeysHandler:
return {"etag": str(version_etag), "count": count}

@staticmethod
def _should_replace_room_key(current_room_key, room_key):
def _should_replace_room_key(
current_room_key: Optional[JsonDict], room_key: JsonDict
) -> bool:
"""
Determine whether to replace a given current_room_key (if any)
with a newly uploaded room_key backup

Args:
current_room_key (dict): Optional, the current room_key dict if any
room_key (dict): The new room_key dict which may or may not be fit to
current_room_key: Optional, the current room_key dict if any
room_key: The new room_key dict which may or may not be fit to
replace the current_room_key

Returns:
@@ -286,14 +307,14 @@ class E2eRoomKeysHandler:
return True
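For context, the replacement policy behind `_should_replace_room_key` weighs the uploaded key's metadata against the stored copy. A rough sketch of that decision, reconstructed from the room-key backup rules rather than copied from this diff (the field names `is_verified`, `first_message_index` and `forwarded_count` come from the backup format; the exact ordering of the checks is an assumption):

```python
from typing import Optional


def should_replace(current: Optional[dict], new: dict) -> bool:
    if current is None:
        return True  # nothing stored yet: accept the upload
    # prefer verified keys, then keys covering earlier messages,
    # then keys that have been forwarded fewer times
    if new["is_verified"] and not current["is_verified"]:
        return True
    if new["first_message_index"] < current["first_message_index"]:
        return True
    if new["forwarded_count"] < current["forwarded_count"]:
        return True
    return False
```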

@trace
async def create_version(self, user_id, version_info):
async def create_version(self, user_id: str, version_info: JsonDict) -> str:
"""Create a new backup version. This automatically becomes the new
backup version for the user's keys; previous backups will no longer be
written to.

Args:
user_id(str): the user whose backup version we're creating
version_info(dict): metadata about the new version being created
user_id: the user whose backup version we're creating
version_info: metadata about the new version being created

{
"algorithm": "m.megolm_backup.v1",
@@ -301,7 +322,7 @@ class E2eRoomKeysHandler:
}

Returns:
A deferred of a string that gives the new version number.
The new version number.
"""

# TODO: Validate the JSON to make sure it has the right keys.
@@ -313,17 +334,19 @@ class E2eRoomKeysHandler:
)
return new_version

async def get_version_info(self, user_id, version=None):
async def get_version_info(
self, user_id: str, version: Optional[str] = None
) -> JsonDict:
"""Get the info about a given version of the user's backup

Args:
user_id(str): the user whose current backup version we're querying
version(str): Optional; if None gives the most recent version
user_id: the user whose current backup version we're querying
version: Optional; if None gives the most recent version
otherwise a historical one.
Raises:
NotFoundError: if the requested backup version doesn't exist
Returns:
A deferred of a info dict that gives the info about the new version.
An info dict that gives the info about the new version.

{
"version": "1234",
@@ -346,7 +369,7 @@ class E2eRoomKeysHandler:
return res

@trace
async def delete_version(self, user_id, version=None):
async def delete_version(self, user_id: str, version: Optional[str] = None) -> None:
"""Deletes a given version of the user's e2e_room_keys backup

Args:
@@ -366,17 +389,19 @@ class E2eRoomKeysHandler:
raise

@trace
async def update_version(self, user_id, version, version_info):
async def update_version(
self, user_id: str, version: str, version_info: JsonDict
) -> JsonDict:
"""Update the info about a given version of the user's backup

Args:
user_id(str): the user whose current backup version we're updating
version(str): the backup version we're updating
version_info(dict): the new information about the backup
user_id: the user whose current backup version we're updating
version: the backup version we're updating
version_info: the new information about the backup
Raises:
NotFoundError: if the requested backup version doesn't exist
Returns:
A deferred of an empty dict.
An empty dict.
"""
if "version" not in version_info:
version_info["version"] = version

@@ -2093,6 +2093,11 @@ class FederationHandler(BaseHandler):
if event.type == EventTypes.GuestAccess and not context.rejected:
await self.maybe_kick_guest_users(event)

# If we are going to send this event over federation we precalculate
# the joined hosts.
if event.internal_metadata.get_send_on_behalf_of():
await self.event_creation_handler.cache_joined_hosts_for_event(event)

return context

async def _check_for_soft_fail(

@@ -15,9 +15,13 @@
# limitations under the License.

import logging
from typing import TYPE_CHECKING, Dict, Iterable, List, Set

from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError
from synapse.types import GroupID, get_domain_from_id
from synapse.types import GroupID, JsonDict, get_domain_from_id

if TYPE_CHECKING:
from synapse.app.homeserver import HomeServer

logger = logging.getLogger(__name__)

@@ -56,7 +60,7 @@ def _create_rerouter(func_name):


class GroupsLocalWorkerHandler:
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.store = hs.get_datastore()
self.room_list_handler = hs.get_room_list_handler()
@@ -84,7 +88,9 @@ class GroupsLocalWorkerHandler:
get_group_role = _create_rerouter("get_group_role")
get_group_roles = _create_rerouter("get_group_roles")

async def get_group_summary(self, group_id, requester_user_id):
async def get_group_summary(
self, group_id: str, requester_user_id: str
) -> JsonDict:
"""Get the group summary for a group.

If the group is remote we check that the users have valid attestations.
@@ -137,14 +143,15 @@ class GroupsLocalWorkerHandler:

return res

async def get_users_in_group(self, group_id, requester_user_id):
async def get_users_in_group(
self, group_id: str, requester_user_id: str
) -> JsonDict:
"""Get users in a group
"""
if self.is_mine_id(group_id):
res = await self.groups_server_handler.get_users_in_group(
return await self.groups_server_handler.get_users_in_group(
group_id, requester_user_id
)
return res

group_server_name = get_domain_from_id(group_id)

@@ -178,11 +185,11 @@ class GroupsLocalWorkerHandler:

return res

async def get_joined_groups(self, user_id):
async def get_joined_groups(self, user_id: str) -> JsonDict:
group_ids = await self.store.get_joined_groups(user_id)
return {"groups": group_ids}

async def get_publicised_groups_for_user(self, user_id):
async def get_publicised_groups_for_user(self, user_id: str) -> JsonDict:
if self.hs.is_mine_id(user_id):
result = await self.store.get_publicised_groups_for_user(user_id)

@@ -206,8 +213,10 @@ class GroupsLocalWorkerHandler:
# TODO: Verify attestations
return {"groups": result}

async def bulk_get_publicised_groups(self, user_ids, proxy=True):
destinations = {}
async def bulk_get_publicised_groups(
self, user_ids: Iterable[str], proxy: bool = True
) -> JsonDict:
destinations = {} # type: Dict[str, Set[str]]
local_users = set()

for user_id in user_ids:
@@ -220,7 +229,7 @@ class GroupsLocalWorkerHandler:
raise SynapseError(400, "Some user_ids are not local")

results = {}
failed_results = []
failed_results = [] # type: List[str]
for destination, dest_user_ids in destinations.items():
try:
r = await self.transport_client.bulk_get_publicised_groups(
@@ -242,7 +251,7 @@ class GroupsLocalWorkerHandler:


class GroupsLocalHandler(GroupsLocalWorkerHandler):
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)

# Ensure attestations get renewed
@@ -271,7 +280,9 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler):

set_group_join_policy = _create_rerouter("set_group_join_policy")

async def create_group(self, group_id, user_id, content):
async def create_group(
self, group_id: str, user_id: str, content: JsonDict
) -> JsonDict:
"""Create a group
"""

@@ -284,27 +295,7 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler):
local_attestation = None
remote_attestation = None
else:
local_attestation = self.attestations.create_attestation(group_id, user_id)
content["attestation"] = local_attestation

content["user_profile"] = await self.profile_handler.get_profile(user_id)

try:
res = await self.transport_client.create_group(
get_domain_from_id(group_id), group_id, user_id, content
)
except HttpResponseException as e:
raise e.to_synapse_error()
except RequestSendFailed:
raise SynapseError(502, "Failed to contact group server")

remote_attestation = res["attestation"]
await self.attestations.verify_attestation(
remote_attestation,
group_id=group_id,
user_id=user_id,
server_name=get_domain_from_id(group_id),
)
raise SynapseError(400, "Unable to create remote groups")

is_publicised = content.get("publicise", False)
token = await self.store.register_user_group_membership(
@@ -320,7 +311,9 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler):

return res

async def join_group(self, group_id, user_id, content):
async def join_group(
self, group_id: str, user_id: str, content: JsonDict
) -> JsonDict:
"""Request to join a group
"""
if self.is_mine_id(group_id):
@@ -365,7 +358,9 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler):

return {}

async def accept_invite(self, group_id, user_id, content):
async def accept_invite(
self, group_id: str, user_id: str, content: JsonDict
) -> JsonDict:
"""Accept an invite to a group
"""
if self.is_mine_id(group_id):
@@ -410,7 +405,9 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler):

return {}

async def invite(self, group_id, user_id, requester_user_id, config):
async def invite(
self, group_id: str, user_id: str, requester_user_id: str, config: JsonDict
) -> JsonDict:
"""Invite a user to a group
"""
content = {"requester_user_id": requester_user_id, "config": config}
@@ -434,7 +431,9 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler):

return res

async def on_invite(self, group_id, user_id, content):
async def on_invite(
self, group_id: str, user_id: str, content: JsonDict
) -> JsonDict:
"""One of our users was invited to a group
"""
# TODO: Support auto join and rejection
@@ -465,8 +464,8 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler):
return {"state": "invite", "user_profile": user_profile}

async def remove_user_from_group(
self, group_id, user_id, requester_user_id, content
):
self, group_id: str, user_id: str, requester_user_id: str, content: JsonDict
) -> JsonDict:
"""Remove a user from a group
"""
if user_id == requester_user_id:
@@ -499,7 +498,9 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler):

return res

async def user_removed_from_group(self, group_id, user_id, content):
async def user_removed_from_group(
self, group_id: str, user_id: str, content: JsonDict
) -> None:
"""One of our users was removed/kicked from a group
"""
# TODO: Check if user in group

@@ -27,9 +27,11 @@ from synapse.api.errors import (
HttpResponseException,
SynapseError,
)
from synapse.api.ratelimiting import Ratelimiter
from synapse.config.emailconfig import ThreepidBehaviour
from synapse.http import RequestTimedOutError
from synapse.http.client import SimpleHttpClient
from synapse.http.site import SynapseRequest
from synapse.types import JsonDict, Requester
from synapse.util import json_decoder
from synapse.util.hash import sha256_and_url_safe_base64
@@ -57,6 +59,32 @@ class IdentityHandler(BaseHandler):

self._web_client_location = hs.config.invite_client_location

# Ratelimiters for `/requestToken` endpoints.
self._3pid_validation_ratelimiter_ip = Ratelimiter(
clock=hs.get_clock(),
rate_hz=hs.config.ratelimiting.rc_3pid_validation.per_second,
burst_count=hs.config.ratelimiting.rc_3pid_validation.burst_count,
)
self._3pid_validation_ratelimiter_address = Ratelimiter(
clock=hs.get_clock(),
rate_hz=hs.config.ratelimiting.rc_3pid_validation.per_second,
burst_count=hs.config.ratelimiting.rc_3pid_validation.burst_count,
)

def ratelimit_request_token_requests(
self, request: SynapseRequest, medium: str, address: str,
):
"""Used to ratelimit requests to `/requestToken` by IP and address.

Args:
request: The associated request
medium: The type of threepid, e.g. "msisdn" or "email"
address: The actual threepid ID, e.g. the phone number or email address
"""

self._3pid_validation_ratelimiter_ip.ratelimit((medium, request.getClientIP()))
self._3pid_validation_ratelimiter_address.ratelimit((medium, address))
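Since the two limiters are keyed on `(medium, IP)` and `(medium, address)` respectively, a single address cannot be hammered from many IPs, nor one IP used to probe many addresses. A hypothetical call site, to show the intended usage (the servlet shape and the `read_json` helper are illustrative, not part of this diff):

```python
async def on_POST(self, request):  # e.g. a /requestToken servlet
    body = await read_json(request)  # assumed JSON-parsing helper
    # Raises a rate-limit error as soon as either the per-IP or the
    # per-address bucket for "email" is exhausted.
    self.identity_handler.ratelimit_request_token_requests(
        request, "email", body["email"]
    )
    return 200, {}
```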

async def threepid_from_creds(
self, id_server: str, creds: Dict[str, str]
) -> Optional[JsonDict]:

@@ -432,6 +432,8 @@ class EventCreationHandler:

self._ephemeral_events_enabled = hs.config.enable_ephemeral_messages

self._external_cache = hs.get_external_cache()

async def create_event(
self,
requester: Requester,
@@ -939,6 +941,8 @@ class EventCreationHandler:

await self.action_generator.handle_push_actions_for_event(event, context)

await self.cache_joined_hosts_for_event(event)

try:
# If we're a worker we need to hit out to the master.
writer_instance = self._events_shard_config.get_instance(event.room_id)
@@ -978,6 +982,44 @@ class EventCreationHandler:
await self.store.remove_push_actions_from_staging(event.event_id)
raise

async def cache_joined_hosts_for_event(self, event: EventBase) -> None:
"""Precalculate the joined hosts at the event, when using Redis, so that
external federation senders don't have to recalculate it themselves.
"""

if not self._external_cache.is_enabled():
return

# We actually store two mappings, event ID -> prev state group,
# state group -> joined hosts, which is much more space efficient
# than event ID -> joined hosts.
#
# Note: We have to cache event ID -> prev state group, as we don't
# store that in the DB.
#
# Note: We always set the state group -> joined hosts cache, even if
# we already set it, so that the expiry time is reset.

state_entry = await self.state.resolve_state_groups_for_events(
event.room_id, event_ids=event.prev_event_ids()
)

if state_entry.state_group:
joined_hosts = await self.store.get_joined_hosts(event.room_id, state_entry)

await self._external_cache.set(
"event_to_prev_state_group",
event.event_id,
state_entry.state_group,
expiry_ms=60 * 60 * 1000,
)
await self._external_cache.set(
"get_joined_hosts",
str(state_entry.state_group),
list(joined_hosts),
expiry_ms=60 * 60 * 1000,
)
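The two-level layout means a reader resolves joined hosts in two hops. A sketch of the corresponding lookup on an external federation sender (only the cache names come from the hunk above; the `get` method and this reader function are assumptions):

```python
async def lookup_joined_hosts(external_cache, event_id: str):
    # hop 1: event ID -> prev state group (one small entry per event)
    state_group = await external_cache.get("event_to_prev_state_group", event_id)
    if state_group is None:
        return None  # cache miss: caller falls back to recomputing
    # hop 2: state group -> joined hosts; every event whose previous
    # state resolves to the same group shares this single entry
    return await external_cache.get("get_joined_hosts", str(state_group))
```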

async def _validate_canonical_alias(
self, directory_handler, room_alias_str: str, expected_room_id: str
) -> None:

+22
-16
@@ -15,23 +15,28 @@

import itertools
import logging
from typing import Iterable
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional

from unpaddedbase64 import decode_base64, encode_base64

from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import NotFoundError, SynapseError
from synapse.api.filtering import Filter
from synapse.events import EventBase
from synapse.storage.state import StateFilter
from synapse.types import JsonDict, UserID
from synapse.visibility import filter_events_for_client

from ._base import BaseHandler

if TYPE_CHECKING:
from synapse.app.homeserver import HomeServer

logger = logging.getLogger(__name__)


class SearchHandler(BaseHandler):
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self._event_serializer = hs.get_event_client_serializer()
self.storage = hs.get_storage()
@@ -87,13 +92,15 @@ class SearchHandler(BaseHandler):

return historical_room_ids

async def search(self, user, content, batch=None):
async def search(
self, user: UserID, content: JsonDict, batch: Optional[str] = None
) -> JsonDict:
"""Performs a full text search for a user.

Args:
user (UserID)
content (dict): Search parameters
batch (str): The next_batch parameter. Used for pagination.
user
content: Search parameters
batch: The next_batch parameter. Used for pagination.

Returns:
dict to be returned to the client with results of search
@@ -186,7 +193,7 @@ class SearchHandler(BaseHandler):
# If doing a subset of all rooms search, check if any of the rooms
# are from an upgraded room, and search their contents as well
if search_filter.rooms:
historical_room_ids = []
historical_room_ids = [] # type: List[str]
for room_id in search_filter.rooms:
# Add any previous rooms to the search if they exist
ids = await self.get_old_rooms_from_upgraded_room(room_id)
@@ -209,8 +216,10 @@ class SearchHandler(BaseHandler):

rank_map = {} # event_id -> rank of event
allowed_events = []
room_groups = {} # Holds result of grouping by room, if applicable
sender_group = {} # Holds result of grouping by sender, if applicable
# Holds result of grouping by room, if applicable
room_groups = {} # type: Dict[str, JsonDict]
# Holds result of grouping by sender, if applicable
sender_group = {} # type: Dict[str, JsonDict]

# Holds the next_batch for the entire result set if one of those exists
global_next_batch = None
@@ -254,7 +263,7 @@ class SearchHandler(BaseHandler):
s["results"].append(e.event_id)

elif order_by == "recent":
room_events = []
room_events = [] # type: List[EventBase]
i = 0

pagination_token = batch_token
@@ -418,13 +427,10 @@ class SearchHandler(BaseHandler):

state_results = {}
if include_state:
rooms = {e.room_id for e in allowed_events}
for room_id in rooms:
for room_id in {e.room_id for e in allowed_events}:
state = await self.state_handler.get_current_state(room_id)
state_results[room_id] = list(state.values())

state_results.values()

# We're now about to serialize the events. We should not make any
# blocking calls after this. Otherwise the 'age' will be wrong

@@ -448,9 +454,9 @@ class SearchHandler(BaseHandler):

if state_results:
s = {}
for room_id, state in state_results.items():
for room_id, state_events in state_results.items():
s[room_id] = await self._event_serializer.serialize_events(
state, time_now
state_events, time_now
)

rooms_cat_res["state"] = s

@@ -13,24 +13,26 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Optional
from typing import TYPE_CHECKING, Optional

from synapse.api.errors import Codes, StoreError, SynapseError
from synapse.types import Requester

from ._base import BaseHandler

if TYPE_CHECKING:
from synapse.app.homeserver import HomeServer

logger = logging.getLogger(__name__)


class SetPasswordHandler(BaseHandler):
"""Handler which deals with changing user account passwords"""

def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self._auth_handler = hs.get_auth_handler()
self._device_handler = hs.get_device_handler()
self._password_policy_handler = hs.get_password_policy_handler()

async def set_password(
self,
@@ -38,7 +40,7 @@ class SetPasswordHandler(BaseHandler):
password_hash: str,
logout_devices: bool,
requester: Optional[Requester] = None,
):
) -> None:
if not self.hs.config.password_localdb_enabled:
raise SynapseError(403, "Password change disabled", errcode=Codes.FORBIDDEN)


+18
-5
@@ -23,7 +23,7 @@ from typing_extensions import NoReturn, Protocol
from twisted.web.http import Request

from synapse.api.constants import LoginType
from synapse.api.errors import Codes, RedirectException, SynapseError
from synapse.api.errors import Codes, NotFoundError, RedirectException, SynapseError
from synapse.handlers.ui_auth import UIAuthSessionDataConstants
from synapse.http import get_request_user_agent
from synapse.http.server import respond_with_html
@@ -235,7 +235,10 @@ class SsoHandler:
respond_with_html(request, code, html)

async def handle_redirect_request(
self, request: SynapseRequest, client_redirect_url: bytes,
self,
request: SynapseRequest,
client_redirect_url: bytes,
idp_id: Optional[str],
) -> str:
"""Handle a request to /login/sso/redirect

@@ -243,6 +246,7 @@ class SsoHandler:
request: incoming HTTP request
client_redirect_url: the URL that we should redirect the
client to after login.
idp_id: optional identity provider chosen by the client

Returns:
the URI to redirect to
@@ -252,10 +256,19 @@ class SsoHandler:
400, "Homeserver not configured for SSO.", errcode=Codes.UNRECOGNIZED
)

# if the client chose an IdP, use that
idp = None # type: Optional[SsoIdentityProvider]
if idp_id:
idp = self._identity_providers.get(idp_id)
if not idp:
raise NotFoundError("Unknown identity provider")

# if we only have one auth provider, redirect to it directly
if len(self._identity_providers) == 1:
ap = next(iter(self._identity_providers.values()))
return await ap.handle_redirect_request(request, client_redirect_url)
elif len(self._identity_providers) == 1:
idp = next(iter(self._identity_providers.values()))

if idp:
return await idp.handle_redirect_request(request, client_redirect_url)

# otherwise, redirect to the IDP picker
return "/_synapse/client/pick_idp?" + urlencode(

@@ -14,15 +14,25 @@
# limitations under the License.

import logging
from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
from synapse.app.homeserver import HomeServer

logger = logging.getLogger(__name__)


class StateDeltasHandler:
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastore()

async def _get_key_change(self, prev_event_id, event_id, key_name, public_value):
async def _get_key_change(
self,
prev_event_id: Optional[str],
event_id: Optional[str],
key_name: str,
public_value: str,
) -> Optional[bool]:
"""Given two events check if the `key_name` field in content changed
from not matching `public_value` to doing so.


+23
-16
@@ -12,13 +12,19 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from collections import Counter
from typing import TYPE_CHECKING, Any, Dict, Iterable, Optional, Tuple

from typing_extensions import Counter as CounterType

from synapse.api.constants import EventTypes, Membership
from synapse.metrics import event_processing_positions
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import JsonDict

if TYPE_CHECKING:
from synapse.app.homeserver import HomeServer

logger = logging.getLogger(__name__)

@@ -31,7 +37,7 @@ class StatsHandler:
Heavily derived from UserDirectoryHandler
"""

def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.store = hs.get_datastore()
self.state = hs.get_state_handler()
@@ -44,7 +50,7 @@ class StatsHandler:
self.stats_enabled = hs.config.stats_enabled

# The current position in the current_state_delta stream
self.pos = None
self.pos = None # type: Optional[int]

# Guard to ensure we only process deltas one at a time
self._is_processing = False
@@ -56,7 +62,7 @@ class StatsHandler:
# we start populating stats
self.clock.call_later(0, self.notify_new_event)

def notify_new_event(self):
def notify_new_event(self) -> None:
"""Called when there may be more deltas to process
"""
if not self.stats_enabled or self._is_processing:
@@ -72,7 +78,7 @@ class StatsHandler:

run_as_background_process("stats.notify_new_event", process)

async def _unsafe_process(self):
async def _unsafe_process(self) -> None:
# If self.pos is None then it means we haven't fetched it from the DB
if self.pos is None:
self.pos = await self.store.get_stats_positions()
@@ -110,10 +116,10 @@ class StatsHandler:
)

for room_id, fields in room_count.items():
room_deltas.setdefault(room_id, {}).update(fields)
room_deltas.setdefault(room_id, Counter()).update(fields)

for user_id, fields in user_count.items():
user_deltas.setdefault(user_id, {}).update(fields)
user_deltas.setdefault(user_id, Counter()).update(fields)

logger.debug("room_deltas: %s", room_deltas)
logger.debug("user_deltas: %s", user_deltas)
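The dict-to-`Counter` switch above is behavioural, not just cosmetic: `Counter.update` adds to existing counts, whereas `dict.update` overwrites them, so repeated deltas for the same room now accumulate. A quick self-contained check:

```python
from collections import Counter

room_deltas = {}
room_deltas.setdefault("!room:example.org", Counter()).update({"joined_members": 2})
room_deltas.setdefault("!room:example.org", Counter()).update({"joined_members": 1})
# Counter accumulates across deltas: 2 + 1 == 3 (a plain dict would hold 1)
assert room_deltas["!room:example.org"]["joined_members"] == 3
```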
@@ -131,19 +137,20 @@ class StatsHandler:

self.pos = max_pos

async def _handle_deltas(self, deltas):
async def _handle_deltas(
self, deltas: Iterable[JsonDict]
) -> Tuple[Dict[str, CounterType[str]], Dict[str, CounterType[str]]]:
"""Called with the state deltas to process

Returns:
tuple[dict[str, Counter], dict[str, counter]]
Two dicts: the room deltas and the user deltas,
mapping from room/user ID to changes in the various fields.
"""

room_to_stats_deltas = {}
user_to_stats_deltas = {}
room_to_stats_deltas = {} # type: Dict[str, CounterType[str]]
user_to_stats_deltas = {} # type: Dict[str, CounterType[str]]

room_to_state_updates = {}
room_to_state_updates = {} # type: Dict[str, Dict[str, Any]]

for delta in deltas:
typ = delta["type"]
@@ -173,7 +180,7 @@ class StatsHandler:
)
continue

event_content = {}
event_content = {} # type: JsonDict

sender = None
if event_id is not None:
@@ -257,13 +264,13 @@ class StatsHandler:
)

if has_changed_joinedness:
delta = +1 if membership == Membership.JOIN else -1
membership_delta = +1 if membership == Membership.JOIN else -1

user_to_stats_deltas.setdefault(user_id, Counter())[
"joined_rooms"
] += delta
] += membership_delta

room_stats_delta["local_users_in_room"] += delta
room_stats_delta["local_users_in_room"] += membership_delta

elif typ == EventTypes.Create:
room_state["is_federatable"] = (

+38
-31
@@ -15,13 +15,13 @@
import logging
import random
from collections import namedtuple
from typing import TYPE_CHECKING, List, Set, Tuple
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Set, Tuple

from synapse.api.errors import AuthError, ShadowBanError, SynapseError
from synapse.appservice import ApplicationService
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.streams import TypingStream
from synapse.types import JsonDict, UserID, get_domain_from_id
from synapse.types import JsonDict, Requester, UserID, get_domain_from_id
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
@@ -65,17 +65,17 @@ class FollowerTypingHandler:
)

# map room IDs to serial numbers
self._room_serials = {}
self._room_serials = {} # type: Dict[str, int]
# map room IDs to sets of users currently typing
self._room_typing = {}
self._room_typing = {} # type: Dict[str, Set[str]]

self._member_last_federation_poke = {}
self._member_last_federation_poke = {} # type: Dict[RoomMember, int]
self.wheel_timer = WheelTimer(bucket_size=5000)
self._latest_room_serial = 0

self.clock.looping_call(self._handle_timeouts, 5000)

def _reset(self):
def _reset(self) -> None:
"""Reset the typing handler's data caches.
"""
# map room IDs to serial numbers
@@ -86,7 +86,7 @@ class FollowerTypingHandler:
self._member_last_federation_poke = {}
self.wheel_timer = WheelTimer(bucket_size=5000)

def _handle_timeouts(self):
def _handle_timeouts(self) -> None:
logger.debug("Checking for typing timeouts")

now = self.clock.time_msec()
@@ -96,7 +96,7 @@ class FollowerTypingHandler:
for member in members:
self._handle_timeout_for_member(now, member)

def _handle_timeout_for_member(self, now: int, member: RoomMember):
def _handle_timeout_for_member(self, now: int, member: RoomMember) -> None:
if not self.is_typing(member):
# Nothing to do if they're no longer typing
return
@@ -114,10 +114,10 @@ class FollowerTypingHandler:
# each person typing.
self.wheel_timer.insert(now=now, obj=member, then=now + 60 * 1000)
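The wheel timer groups expiries into coarse buckets so one periodic sweep replaces a timer per member. Roughly how the pieces above fit together (the `fetch` call is inferred from how `_handle_timeouts` consumes the timer; it does not appear in this diff):

```python
timer = WheelTimer(bucket_size=5000)  # 5-second buckets, as above

# schedule a re-check of this member roughly a minute from now
timer.insert(now=now, obj=member, then=now + 60 * 1000)

# on each sweep, collect everything whose deadline has passed
for expired in timer.fetch(now):
    handle_timeout(now, expired)  # no-op if the member is still typing
```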

def is_typing(self, member):
def is_typing(self, member: RoomMember) -> bool:
return member.user_id in self._room_typing.get(member.room_id, [])

async def _push_remote(self, member, typing):
async def _push_remote(self, member: RoomMember, typing: bool) -> None:
if not self.federation:
return

@@ -148,7 +148,7 @@ class FollowerTypingHandler:

def process_replication_rows(
self, token: int, rows: List[TypingStream.TypingStreamRow]
):
) -> None:
"""Should be called whenever we receive updates for typing stream.
"""

@@ -178,7 +178,7 @@ class FollowerTypingHandler:

async def _send_changes_in_typing_to_remotes(
self, room_id: str, prev_typing: Set[str], now_typing: Set[str]
):
) -> None:
"""Process a change in typing of a room from replication, sending EDUs
for any local users.
"""
@@ -194,12 +194,12 @@ class FollowerTypingHandler:
if self.is_mine_id(user_id):
await self._push_remote(RoomMember(room_id, user_id), False)

def get_current_token(self):
def get_current_token(self) -> int:
return self._latest_room_serial


class TypingWriterHandler(FollowerTypingHandler):
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)

assert hs.config.worker.writers.typing == hs.get_instance_name()
@@ -213,14 +213,15 @@ class TypingWriterHandler(FollowerTypingHandler):

hs.get_distributor().observe("user_left_room", self.user_left_room)

self._member_typing_until = {} # clock time we expect to stop
# clock time we expect to stop
self._member_typing_until = {} # type: Dict[RoomMember, int]

# caches which room_ids changed at which serials
self._typing_stream_change_cache = StreamChangeCache(
"TypingStreamChangeCache", self._latest_room_serial
)

def _handle_timeout_for_member(self, now: int, member: RoomMember):
def _handle_timeout_for_member(self, now: int, member: RoomMember) -> None:
super()._handle_timeout_for_member(now, member)

if not self.is_typing(member):
@@ -233,7 +234,9 @@ class TypingWriterHandler(FollowerTypingHandler):
self._stopped_typing(member)
return

async def started_typing(self, target_user, requester, room_id, timeout):
async def started_typing(
self, target_user: UserID, requester: Requester, room_id: str, timeout: int
) -> None:
target_user_id = target_user.to_string()
auth_user_id = requester.user.to_string()

@@ -263,11 +266,13 @@ class TypingWriterHandler(FollowerTypingHandler):

if was_present:
# No point sending another notification
return None
return

self._push_update(member=member, typing=True)

async def stopped_typing(self, target_user, requester, room_id):
async def stopped_typing(
self, target_user: UserID, requester: Requester, room_id: str
) -> None:
target_user_id = target_user.to_string()
auth_user_id = requester.user.to_string()

@@ -290,23 +295,23 @@ class TypingWriterHandler(FollowerTypingHandler):

self._stopped_typing(member)

def user_left_room(self, user, room_id):
def user_left_room(self, user: UserID, room_id: str) -> None:
user_id = user.to_string()
if self.is_mine_id(user_id):
member = RoomMember(room_id=room_id, user_id=user_id)
self._stopped_typing(member)

def _stopped_typing(self, member):
def _stopped_typing(self, member: RoomMember) -> None:
if member.user_id not in self._room_typing.get(member.room_id, set()):
# No point
return None
return

self._member_typing_until.pop(member, None)
self._member_last_federation_poke.pop(member, None)

self._push_update(member=member, typing=False)

def _push_update(self, member, typing):
def _push_update(self, member: RoomMember, typing: bool) -> None:
if self.hs.is_mine_id(member.user_id):
# Only send updates for changes to our own users.
run_as_background_process(
@@ -315,7 +320,7 @@ class TypingWriterHandler(FollowerTypingHandler):

self._push_update_local(member=member, typing=typing)

async def _recv_edu(self, origin, content):
async def _recv_edu(self, origin: str, content: JsonDict) -> None:
room_id = content["room_id"]
user_id = content["user_id"]

@@ -340,7 +345,7 @@ class TypingWriterHandler(FollowerTypingHandler):
self.wheel_timer.insert(now=now, obj=member, then=now + FEDERATION_TIMEOUT)
self._push_update_local(member=member, typing=content["typing"])

def _push_update_local(self, member, typing):
def _push_update_local(self, member: RoomMember, typing: bool) -> None:
room_set = self._room_typing.setdefault(member.room_id, set())
if typing:
room_set.add(member.user_id)
@@ -386,7 +391,7 @@ class TypingWriterHandler(FollowerTypingHandler):

changed_rooms = self._typing_stream_change_cache.get_all_entities_changed(
last_id
)
) # type: Optional[Iterable[str]]

if changed_rooms is None:
changed_rooms = self._room_serials
@@ -412,13 +417,13 @@ class TypingWriterHandler(FollowerTypingHandler):

def process_replication_rows(
self, token: int, rows: List[TypingStream.TypingStreamRow]
):
) -> None:
# The writing process should never get updates from replication.
raise Exception("Typing writer instance got typing info over replication")


class TypingNotificationEventSource:
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.clock = hs.get_clock()
# We can't call get_typing_handler here because there's a cycle:
@@ -427,7 +432,7 @@ class TypingNotificationEventSource:
#
self.get_typing_handler = hs.get_typing_handler

def _make_event_for(self, room_id):
def _make_event_for(self, room_id: str) -> JsonDict:
typing = self.get_typing_handler()._room_typing[room_id]
return {
"type": "m.typing",
@@ -462,7 +467,9 @@ class TypingNotificationEventSource:

return (events, handler._latest_room_serial)

async def get_new_events(self, from_key, room_ids, **kwargs):
async def get_new_events(
self, from_key: int, room_ids: Iterable[str], **kwargs
) -> Tuple[List[JsonDict], int]:
with Measure(self.clock, "typing.get_new_events"):
from_key = int(from_key)
handler = self.get_typing_handler()
@@ -478,5 +485,5 @@ class TypingNotificationEventSource:

return (events, handler._latest_room_serial)

def get_current_key(self):
def get_current_key(self) -> int:
return self.get_typing_handler()._latest_room_serial

@@ -145,10 +145,6 @@ class UserDirectoryHandler(StateDeltasHandler):
if self.pos is None:
self.pos = await self.store.get_user_directory_stream_pos()

# If still None then the initial background update hasn't happened yet
if self.pos is None:
return None

# Loop round handling deltas until we're up to date
while True:
with Measure(self.clock, "user_dir_delta"):
@@ -233,6 +229,11 @@ class UserDirectoryHandler(StateDeltasHandler):

if change: # The user joined
event = await self.store.get_event(event_id, allow_none=True)
# It isn't expected for this event to not exist, but we
# don't want the entire background process to break.
if event is None:
continue

profile = ProfileInfo(
avatar_url=event.content.get("avatar_url"),
display_name=event.content.get("displayname"),

+36
-8
@@ -22,10 +22,22 @@ import types
import urllib
from http import HTTPStatus
from io import BytesIO
from typing import Any, Callable, Dict, Iterator, List, Tuple, Union
from typing import (
Any,
Awaitable,
Callable,
Dict,
Iterable,
Iterator,
List,
Pattern,
Tuple,
Union,
)

import jinja2
from canonicaljson import iterencode_canonical_json
from typing_extensions import Protocol
from zope.interface import implementer

from twisted.internet import defer, interfaces
@@ -168,11 +180,25 @@ def wrap_async_request_handler(h):
return preserve_fn(wrapped_async_request_handler)


class HttpServer:
# Type of a callback method for processing requests
# it is actually called with a SynapseRequest and a kwargs dict for the params,
# but I can't figure out how to represent that.
ServletCallback = Callable[
..., Union[None, Awaitable[None], Tuple[int, Any], Awaitable[Tuple[int, Any]]]
]
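In other words, a conforming callback takes the request plus any matched groups and returns nothing or a `(code, response)` pair, synchronously or as an awaitable. A minimal handler matching the alias (purely illustrative):

```python
async def on_GET(request, room_id: str):
    # matched regex groups arrive as extra arguments; returning a
    # (status, body) tuple satisfies ServletCallback
    return 200, {"room_id": room_id}
```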


class HttpServer(Protocol):
""" Interface for registering callbacks on a HTTP server
"""

def register_paths(self, method, path_patterns, callback):
def register_paths(
self,
method: str,
path_patterns: Iterable[Pattern],
callback: ServletCallback,
servlet_classname: str,
) -> None:
""" Register a callback that gets fired if we receive a http request
with the given method for a path that matches the given regex.

@@ -180,12 +206,14 @@ class HttpServer:
an unpacked tuple.

Args:
method (str): The method to listen to.
path_patterns (list<SRE_Pattern>): The regex used to match requests.
callback (function): The function to fire if we receive a matched
method: The HTTP method to listen to.
path_patterns: The regex used to match requests.
callback: The function to fire if we receive a matched
request. The first argument will be the request object and
subsequent arguments will be any matched groups from the regex.
This should return a tuple of (code, response).
This should return either a tuple of (code, response), or None.
servlet_classname (str): The name of the handler to be used in prometheus
and opentracing logs.
"""
pass

@@ -354,7 +382,7 @@ class JsonResource(DirectServeJsonResource):

def _get_handler_for_request(
self, request: SynapseRequest
) -> Tuple[Callable, str, Dict[str, str]]:
) -> Tuple[ServletCallback, str, Dict[str, str]]:
"""Finds a callback method to handle the given request.

Returns:

@@ -791,7 +791,7 @@ def tag_args(func):

@wraps(func)
def _tag_args_inner(*args, **kwargs):
argspec = inspect.getargspec(func)
argspec = inspect.getfullargspec(func)
for i, arg in enumerate(argspec.args[1:]):
set_tag("ARG_" + arg, args[i])
set_tag("args", args[len(argspec.args) :])
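`inspect.getargspec` has long been deprecated and raises `ValueError` for functions that use keyword-only arguments or annotations, so the hunk swaps in `getfullargspec`, which handles both and returns the same `args` list. For example:

```python
import inspect

def handler(self, room_id, *extra, limit=10, **kwargs):
    pass

spec = inspect.getfullargspec(handler)
assert spec.args == ["self", "room_id"]
assert spec.varargs == "extra"
# inspect.getargspec(handler) would raise ValueError here because of
# the keyword-only `limit` argument.
```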

+16
-2
@@ -668,6 +668,15 @@ class Mailer:


def safe_markup(raw_html: str) -> jinja2.Markup:
"""
Sanitise a raw HTML string to a set of allowed tags and attributes, and linkify any bare URLs.

Args:
raw_html: Unsafe HTML.

Returns:
A Markup object ready to safely use in a Jinja template.
"""
return jinja2.Markup(
bleach.linkify(
bleach.clean(
@@ -684,8 +693,13 @@ def safe_markup(raw_html: str) -> jinja2.Markup:

def safe_text(raw_text: str) -> jinja2.Markup:
"""
Process text: treat it as HTML but escape any tags (ie. just escape the
HTML) then linkify it.
Sanitise text (escape any HTML tags), and then linkify any bare URLs.

Args:
raw_text: Unsafe text which might include HTML markup.

Returns:
A Markup object ready to safely use in a Jinja template.
"""
return jinja2.Markup(
bleach.linkify(bleach.clean(raw_text, tags=[], attributes={}, strip=False))
|
||||
|
||||
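As a rough, self-contained sketch of the sanitise-then-linkify pattern used by `safe_markup`/`safe_text` above (the allow-list here is illustrative, not Synapse's actual configuration):

import bleach

allowed_tags = ["a", "b", "i", "p"]  # hypothetical allow-list

unsafe = '<script>alert(1)</script> see https://example.com <b>soon</b>'

# Strip or escape anything outside the allow-list, then turn bare URLs
# into <a> tags.
cleaned = bleach.clean(unsafe, tags=allowed_tags)
print(bleach.linkify(cleaned))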
@@ -17,7 +17,7 @@ import logging
 import re
 from typing import TYPE_CHECKING, Dict, Iterable, Optional

-from synapse.api.constants import EventTypes
+from synapse.api.constants import EventTypes, Membership
 from synapse.events import EventBase
 from synapse.types import StateMap

@@ -63,7 +63,7 @@ async def calculate_room_name(
     m_room_name = await store.get_event(
         room_state_ids[(EventTypes.Name, "")], allow_none=True
     )
-    if m_room_name and m_room_name.content and m_room_name.content["name"]:
+    if m_room_name and m_room_name.content and m_room_name.content.get("name"):
         return m_room_name.content["name"]

     # does it have a canonical alias?
@@ -74,15 +74,11 @@ async def calculate_room_name(
     if (
         canon_alias
         and canon_alias.content
-        and canon_alias.content["alias"]
+        and canon_alias.content.get("alias")
         and _looks_like_an_alias(canon_alias.content["alias"])
     ):
         return canon_alias.content["alias"]

-    # at this point we're going to need to search the state by all state keys
-    # for an event type, so rearrange the data structure
-    room_state_bytype_ids = _state_as_two_level_dict(room_state_ids)
-
     if not fallback_to_members:
         return None

@@ -94,7 +90,7 @@ async def calculate_room_name(

     if (
         my_member_event is not None
-        and my_member_event.content["membership"] == "invite"
+        and my_member_event.content.get("membership") == Membership.INVITE
     ):
         if (EventTypes.Member, my_member_event.sender) in room_state_ids:
             inviter_member_event = await store.get_event(
@@ -111,6 +107,10 @@ async def calculate_room_name(
         else:
             return "Room Invite"

+    # at this point we're going to need to search the state by all state keys
+    # for an event type, so rearrange the data structure
+    room_state_bytype_ids = _state_as_two_level_dict(room_state_ids)
+
     # we're going to have to generate a name based on who's in the room,
     # so find out who is in the room that isn't the user.
     if EventTypes.Member in room_state_bytype_ids:
@@ -120,8 +120,8 @@ async def calculate_room_name(
         all_members = [
             ev
             for ev in member_events.values()
-            if ev.content["membership"] == "join"
-            or ev.content["membership"] == "invite"
+            if ev.content.get("membership") == Membership.JOIN
+            or ev.content.get("membership") == Membership.INVITE
         ]
         # Sort the member events oldest-first so that we name people in the
         # order they joined (it should at least be deterministic rather than
@@ -194,11 +194,7 @@ def descriptor_from_member_events(member_events: Iterable[EventBase]) -> str:


 def name_from_member_event(member_event: EventBase) -> str:
-    if (
-        member_event.content
-        and "displayname" in member_event.content
-        and member_event.content["displayname"]
-    ):
+    if member_event.content and member_event.content.get("displayname"):
         return member_event.content["displayname"]
     return member_event.state_key
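A minimal illustration (not from the diff) of why these hunks switch from content["key"] to content.get("key"): event content arriving over federation may be missing expected fields, and indexing then raises instead of letting the code fall through to the next naming rule.

content = {}  # a malformed m.room.name event content

try:
    name = content["name"]  # old style: raises KeyError
except KeyError:
    name = None

name = content.get("name")  # new style: returns None, condition stays falsy
print(name)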
@@ -86,8 +86,8 @@ REQUIREMENTS = [

 CONDITIONAL_REQUIREMENTS = {
     "matrix-synapse-ldap3": ["matrix-synapse-ldap3>=0.1"],
-    # we use execute_batch, which arrived in psycopg 2.7.
-    "postgres": ["psycopg2>=2.7"],
+    # we use execute_values with the fetch param, which arrived in psycopg 2.8.
+    "postgres": ["psycopg2>=2.8"],
     # ACME support is required to provision TLS certificates from authorities
     # that use the protocol, such as Let's Encrypt.
     "acme": [
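A hedged sketch of the psycopg2 feature motivating the bump (connection details are placeholders): execute_values() only grew its fetch parameter, needed for batched INSERT ... RETURNING, in psycopg2 2.8.

import psycopg2
from psycopg2.extras import execute_values

conn = psycopg2.connect("dbname=synapse user=synapse")  # placeholder DSN
with conn, conn.cursor() as cur:
    rows = execute_values(
        cur,
        "INSERT INTO demo (a, b) VALUES %s RETURNING id",
        [(1, 2), (3, 4)],
        fetch=True,  # new in psycopg2 2.8
    )
    print([r[0] for r in rows])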
@@ -0,0 +1,105 @@
# -*- coding: utf-8 -*-
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from typing import TYPE_CHECKING, Any, Optional

from prometheus_client import Counter

from synapse.logging.context import make_deferred_yieldable
from synapse.util import json_decoder, json_encoder

if TYPE_CHECKING:
    from synapse.server import HomeServer

set_counter = Counter(
    "synapse_external_cache_set",
    "Number of times we set a cache",
    labelnames=["cache_name"],
)

get_counter = Counter(
    "synapse_external_cache_get",
    "Number of times we get a cache",
    labelnames=["cache_name", "hit"],
)


logger = logging.getLogger(__name__)


class ExternalCache:
    """A cache backed by an external Redis. Does nothing if no Redis is
    configured.
    """

    def __init__(self, hs: "HomeServer"):
        self._redis_connection = hs.get_outbound_redis_connection()

    def _get_redis_key(self, cache_name: str, key: str) -> str:
        return "cache_v1:%s:%s" % (cache_name, key)

    def is_enabled(self) -> bool:
        """Whether the external cache is used or not.

        It's safe to use the cache when this returns false, the methods will
        just no-op, but the function is useful to avoid doing unnecessary work.
        """
        return self._redis_connection is not None

    async def set(self, cache_name: str, key: str, value: Any, expiry_ms: int) -> None:
        """Add the key/value to the named cache, with the expiry time given.
        """

        if self._redis_connection is None:
            return

        set_counter.labels(cache_name).inc()

        # txredisapi requires the value to be string, bytes or numbers, so we
        # encode stuff in JSON.
        encoded_value = json_encoder.encode(value)

        logger.debug("Caching %s %s: %r", cache_name, key, encoded_value)

        return await make_deferred_yieldable(
            self._redis_connection.set(
                self._get_redis_key(cache_name, key), encoded_value, pexpire=expiry_ms,
            )
        )

    async def get(self, cache_name: str, key: str) -> Optional[Any]:
        """Look up a key/value in the named cache.
        """

        if self._redis_connection is None:
            return None

        result = await make_deferred_yieldable(
            self._redis_connection.get(self._get_redis_key(cache_name, key))
        )

        logger.debug("Got cache result %s %s: %r", cache_name, key, result)

        get_counter.labels(cache_name, result is not None).inc()

        if not result:
            return None

        # For some reason the integers get magically converted back to integers
        if isinstance(result, int):
            return result

        return json_decoder.decode(result)
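A rough usage sketch for the new ExternalCache (the names are from the diff; the surrounding homeserver wiring is assumed, not shown):

async def cache_example(hs):
    cache = hs.get_external_cache()

    if cache.is_enabled():
        # Values are JSON-encoded, so anything the JSON encoder accepts works.
        await cache.set("example_cache", "some-key", {"answer": 42}, expiry_ms=60000)

    # Returns None on a miss, or when Redis is not configured.
    return await cache.get("example_cache", "some-key")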
@@ -15,6 +15,7 @@
 # limitations under the License.
 import logging
 from typing import (
+    TYPE_CHECKING,
     Any,
     Awaitable,
     Dict,
@@ -63,6 +64,9 @@ from synapse.replication.tcp.streams import (
     TypingStream,
 )

+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 logger = logging.getLogger(__name__)


@@ -88,7 +92,7 @@ class ReplicationCommandHandler:
     back out to connections.
     """

-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         self._replication_data_handler = hs.get_replication_data_handler()
         self._presence_handler = hs.get_presence_handler()
         self._store = hs.get_datastore()
@@ -282,13 +286,6 @@ class ReplicationCommandHandler:
         if hs.config.redis.redis_enabled:
             from synapse.replication.tcp.redis import (
                 RedisDirectTcpReplicationClientFactory,
-                lazyConnection,
             )

-            logger.info(
-                "Connecting to redis (host=%r port=%r)",
-                hs.config.redis_host,
-                hs.config.redis_port,
-            )

            # First let's ensure that we have a ReplicationStreamer started.
@@ -299,13 +296,7 @@ class ReplicationCommandHandler:
            # connection after SUBSCRIBE is called).

            # First create the connection for sending commands.
-            outbound_redis_connection = lazyConnection(
-                reactor=hs.get_reactor(),
-                host=hs.config.redis_host,
-                port=hs.config.redis_port,
-                password=hs.config.redis.redis_password,
-                reconnect=True,
-            )
+            outbound_redis_connection = hs.get_outbound_redis_connection()

            # Now create the factory/connection for the subscription stream.
            self._factory = RedisDirectTcpReplicationClientFactory(
@@ -15,7 +15,7 @@

 import logging
 from inspect import isawaitable
-from typing import TYPE_CHECKING, Optional
+from typing import TYPE_CHECKING, Optional, Type, cast

 import txredisapi

@@ -23,6 +23,7 @@ from synapse.logging.context import PreserveLoggingContext, make_deferred_yielda
 from synapse.metrics.background_process_metrics import (
     BackgroundProcessLoggingContext,
     run_as_background_process,
+    wrap_as_background_process,
 )
 from synapse.replication.tcp.commands import (
     Command,
@@ -59,16 +60,16 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection):
     immediately after initialisation.

     Attributes:
-        handler: The command handler to handle incoming commands.
-        stream_name: The *redis* stream name to subscribe to and publish from
-            (not anything to do with Synapse replication streams).
-        outbound_redis_connection: The connection to redis to use to send
+        synapse_handler: The command handler to handle incoming commands.
+        synapse_stream_name: The *redis* stream name to subscribe to and publish
+            from (not anything to do with Synapse replication streams).
+        synapse_outbound_redis_connection: The connection to redis to use to send
             commands.
     """

-    handler = None  # type: ReplicationCommandHandler
-    stream_name = None  # type: str
-    outbound_redis_connection = None  # type: txredisapi.RedisProtocol
+    synapse_handler = None  # type: ReplicationCommandHandler
+    synapse_stream_name = None  # type: str
+    synapse_outbound_redis_connection = None  # type: txredisapi.RedisProtocol

     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
@@ -88,19 +89,19 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection):
         # it's important to make sure that we only send the REPLICATE command once we
         # have successfully subscribed to the stream - otherwise we might miss the
         # POSITION response sent back by the other end.
-        logger.info("Sending redis SUBSCRIBE for %s", self.stream_name)
-        await make_deferred_yieldable(self.subscribe(self.stream_name))
+        logger.info("Sending redis SUBSCRIBE for %s", self.synapse_stream_name)
+        await make_deferred_yieldable(self.subscribe(self.synapse_stream_name))
         logger.info(
             "Successfully subscribed to redis stream, sending REPLICATE command"
         )
-        self.handler.new_connection(self)
+        self.synapse_handler.new_connection(self)
         await self._async_send_command(ReplicateCommand())
         logger.info("REPLICATE successfully sent")

         # We send out our positions when there is a new connection in case the
         # other side missed updates. We do this for Redis connections as the
         # otherside won't know we've connected and so won't issue a REPLICATE.
-        self.handler.send_positions_to_connection(self)
+        self.synapse_handler.send_positions_to_connection(self)

     def messageReceived(self, pattern: str, channel: str, message: str):
         """Received a message from redis.
@@ -137,7 +138,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection):
             cmd: received command
         """

-        cmd_func = getattr(self.handler, "on_%s" % (cmd.NAME,), None)
+        cmd_func = getattr(self.synapse_handler, "on_%s" % (cmd.NAME,), None)
         if not cmd_func:
             logger.warning("Unhandled command: %r", cmd)
             return
@@ -155,7 +156,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection):
     def connectionLost(self, reason):
         logger.info("Lost connection to redis")
         super().connectionLost(reason)
-        self.handler.lost_connection(self)
+        self.synapse_handler.lost_connection(self)

         # mark the logging context as finished
         self._logging_context.__exit__(None, None, None)
@@ -183,11 +184,54 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection):
         tcp_outbound_commands_counter.labels(cmd.NAME, "redis").inc()

         await make_deferred_yieldable(
-            self.outbound_redis_connection.publish(self.stream_name, encoded_string)
+            self.synapse_outbound_redis_connection.publish(
+                self.synapse_stream_name, encoded_string
+            )
         )


-class RedisDirectTcpReplicationClientFactory(txredisapi.SubscriberFactory):
+class SynapseRedisFactory(txredisapi.RedisFactory):
+    """A subclass of RedisFactory that periodically sends pings to ensure that
+    we detect dead connections.
+    """
+
+    def __init__(
+        self,
+        hs: "HomeServer",
+        uuid: str,
+        dbid: Optional[int],
+        poolsize: int,
+        isLazy: bool = False,
+        handler: Type = txredisapi.ConnectionHandler,
+        charset: str = "utf-8",
+        password: Optional[str] = None,
+        replyTimeout: int = 30,
+        convertNumbers: Optional[int] = True,
+    ):
+        super().__init__(
+            uuid=uuid,
+            dbid=dbid,
+            poolsize=poolsize,
+            isLazy=isLazy,
+            handler=handler,
+            charset=charset,
+            password=password,
+            replyTimeout=replyTimeout,
+            convertNumbers=convertNumbers,
+        )
+
+        hs.get_clock().looping_call(self._send_ping, 30 * 1000)
+
+    @wrap_as_background_process("redis_ping")
+    async def _send_ping(self):
+        for connection in self.pool:
+            try:
+                await make_deferred_yieldable(connection.ping())
+            except Exception:
+                logger.warning("Failed to send ping to a redis connection")
+
+
+class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory):
     """This is a reconnecting factory that connects to redis and immediately
     subscribes to a stream.

@@ -206,65 +250,62 @@ class RedisDirectTcpReplicationClientFactory(txredisapi.SubscriberFactory):
         self, hs: "HomeServer", outbound_redis_connection: txredisapi.RedisProtocol
     ):

-        super().__init__()
+        super().__init__(
+            hs,
+            uuid="subscriber",
+            dbid=None,
+            poolsize=1,
+            replyTimeout=30,
+            password=hs.config.redis.redis_password,
+        )

-        # This sets the password on the RedisFactory base class (as
-        # SubscriberFactory constructor doesn't pass it through).
-        self.password = hs.config.redis.redis_password
-
-        self.handler = hs.get_tcp_replication()
-        self.stream_name = hs.hostname
-
-        self.outbound_redis_connection = outbound_redis_connection
+        self.synapse_handler = hs.get_tcp_replication()
+        self.synapse_stream_name = hs.hostname
+
+        self.synapse_outbound_redis_connection = outbound_redis_connection

     def buildProtocol(self, addr):
-        p = super().buildProtocol(addr)  # type: RedisSubscriber
+        p = super().buildProtocol(addr)
+        p = cast(RedisSubscriber, p)

         # We do this here rather than add to the constructor of `RedisSubscriber`
         # as to do so would involve overriding `buildProtocol` entirely, however
         # the base method does some other things than just instantiating the
         # protocol.
-        p.handler = self.handler
-        p.outbound_redis_connection = self.outbound_redis_connection
-        p.stream_name = self.stream_name
-        p.password = self.password
+        p.synapse_handler = self.synapse_handler
+        p.synapse_outbound_redis_connection = self.synapse_outbound_redis_connection
+        p.synapse_stream_name = self.synapse_stream_name

         return p


 def lazyConnection(
-    reactor,
+    hs: "HomeServer",
     host: str = "localhost",
     port: int = 6379,
     dbid: Optional[int] = None,
     reconnect: bool = True,
-    charset: str = "utf-8",
     password: Optional[str] = None,
-    connectTimeout: Optional[int] = None,
-    replyTimeout: Optional[int] = None,
-    convertNumbers: bool = True,
+    replyTimeout: int = 30,
 ) -> txredisapi.RedisProtocol:
-    """Equivalent to `txredisapi.lazyConnection`, except allows specifying a
-    reactor.
+    """Creates a connection to Redis that is lazily set up and reconnects if the
+    connection is lost.
     """

-    isLazy = True
-    poolsize = 1
-
     uuid = "%s:%d" % (host, port)
-    factory = txredisapi.RedisFactory(
-        uuid,
-        dbid,
-        poolsize,
-        isLazy,
-        txredisapi.ConnectionHandler,
-        charset,
-        password,
-        replyTimeout,
-        convertNumbers,
+    factory = SynapseRedisFactory(
+        hs,
+        uuid=uuid,
+        dbid=dbid,
+        poolsize=1,
+        isLazy=True,
+        handler=txredisapi.ConnectionHandler,
+        password=password,
+        replyTimeout=replyTimeout,
     )
     factory.continueTrying = reconnect
-    for x in range(poolsize):
-        reactor.connectTCP(host, port, factory, connectTimeout)
+
+    reactor = hs.get_reactor()
+    reactor.connectTCP(host, port, factory, 30)

     return factory.handler
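To show how the rewritten helper is meant to be called, a sketch mirroring the caller added elsewhere in this changeset (`hs` is an assumed HomeServer; the factory pings every 30 seconds to detect dead connections and reconnects automatically):

from synapse.replication.tcp.redis import lazyConnection

def make_redis_connection(hs):
    return lazyConnection(
        hs=hs,
        host=hs.config.redis_host,
        port=hs.config.redis_port,
        password=hs.config.redis.redis_password,
        reconnect=True,
    )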
@@ -5,7 +5,7 @@
 <body>
   <div>
     <p>
-      We were unable to validate your <tt>{{server_name | e}}</tt> account via
+      We were unable to validate your <tt>{{ server_name }}</tt> account via
       single-sign-on (SSO), because the SSO Identity Provider returned
       different details than when you logged in.
     </p>

@@ -5,8 +5,8 @@
 <body>
   <div>
     <p>
-      A client is trying to {{ description | e }}. To confirm this action,
-      <a href="{{ redirect_url | e }}">re-authenticate with single sign-on</a>.
+      A client is trying to {{ description }}. To confirm this action,
+      <a href="{{ redirect_url }}">re-authenticate with single sign-on</a>.
       If you did not expect this, your account may be compromised!
     </p>
   </div>

@@ -12,7 +12,7 @@
 <p>
   There was an error during authentication:
 </p>
-<div id="errormsg" style="margin:20px 80px">{{ error_description | e }}</div>
+<div id="errormsg" style="margin:20px 80px">{{ error_description }}</div>
 <p>
   If you are seeing this page after clicking a link sent to you via email, make
   sure you only click the confirmation link once, and that you open the

@@ -3,22 +3,22 @@
 <head>
   <meta charset="UTF-8">
   <link rel="stylesheet" href="/_matrix/static/client/login/style.css">
-  <title>{{server_name | e}} Login</title>
+  <title>{{ server_name }} Login</title>
 </head>
 <body>
   <div id="container">
-    <h1 id="title">{{server_name | e}} Login</h1>
+    <h1 id="title">{{ server_name }} Login</h1>
     <div class="login_flow">
       <p>Choose one of the following identity providers:</p>
       <form>
-        <input type="hidden" name="redirectUrl" value="{{redirect_url | e}}">
+        <input type="hidden" name="redirectUrl" value="{{ redirect_url }}">
         <ul class="radiobuttons">
 {% for p in providers %}
           <li>
-            <input type="radio" name="idp" id="prov{{loop.index}}" value="{{p.idp_id}}">
-            <label for="prov{{loop.index}}">{{p.idp_name | e}}</label>
+            <input type="radio" name="idp" id="prov{{ loop.index }}" value="{{ p.idp_id }}">
+            <label for="prov{{ loop.index }}">{{ p.idp_name }}</label>
             {% if p.idp_icon %}
-              <img src="{{p.idp_icon | mxc_to_http(32, 32)}}"/>
+              <img src="{{ p.idp_icon | mxc_to_http(32, 32) }}"/>
             {% endif %}
           </li>
 {% endfor %}

@@ -5,10 +5,10 @@
   <title>SSO redirect confirmation</title>
 </head>
 <body>
-  <p>The application at <span style="font-weight:bold">{{ display_url | e }}</span> is requesting full access to your <span style="font-weight:bold">{{ server_name }}</span> Matrix account.</p>
+  <p>The application at <span style="font-weight:bold">{{ display_url }}</span> is requesting full access to your <span style="font-weight:bold">{{ server_name }}</span> Matrix account.</p>
   <p>If you don't recognise this address, you should ignore this and close this tab.</p>
   <p>
-    <a href="{{ redirect_url | e }}">I trust this address</a>
+    <a href="{{ redirect_url }}">I trust this address</a>
   </p>
 </body>
-</html>
+</html>
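The template hunks above drop the explicit | e filters; that only stays safe because the Jinja environment is assumed to enable autoescaping. A standalone sketch of the behaviour:

import jinja2

env = jinja2.Environment(autoescape=True)
template = env.from_string("<p>{{ server_name }}</p>")
print(template.render(server_name="<script>evil</script>"))
# -> <p>&lt;script&gt;evil&lt;/script&gt;</p>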
@@ -1,6 +1,8 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
 # Copyright 2018-2019 New Vector Ltd
+# Copyright 2020, 2021 The Matrix.org Foundation C.I.C.
+
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -36,6 +38,7 @@ from synapse.rest.admin.media import ListMediaInRoom, register_servlets_for_medi
 from synapse.rest.admin.purge_room_servlet import PurgeRoomServlet
 from synapse.rest.admin.rooms import (
     DeleteRoomRestServlet,
+    ForwardExtremitiesRestServlet,
     JoinRoomAliasServlet,
     ListRoomRestServlet,
     MakeRoomAdminRestServlet,
@@ -51,6 +54,7 @@ from synapse.rest.admin.users import (
     PushersRestServlet,
     ResetPasswordRestServlet,
     SearchUsersRestServlet,
+    ShadowBanRestServlet,
     UserAdminServlet,
     UserMediaRestServlet,
     UserMembershipRestServlet,
@@ -230,6 +234,8 @@ def register_servlets(hs, http_server):
     EventReportsRestServlet(hs).register(http_server)
     PushersRestServlet(hs).register(http_server)
     MakeRoomAdminRestServlet(hs).register(http_server)
+    ShadowBanRestServlet(hs).register(http_server)
+    ForwardExtremitiesRestServlet(hs).register(http_server)


 def register_servlets_for_client_rest_resource(hs, http_server):
@@ -1,5 +1,5 @@
 # -*- coding: utf-8 -*-
-# Copyright 2019 The Matrix.org Foundation C.I.C.
+# Copyright 2019-2021 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -431,7 +431,17 @@ class MakeRoomAdminRestServlet(RestServlet):
             if not admin_users:
                 raise SynapseError(400, "No local admin user in room")

-            admin_user_id = admin_users[-1]
+            admin_user_id = None
+
+            for admin_user in reversed(admin_users):
+                if room_state.get((EventTypes.Member, admin_user)):
+                    admin_user_id = admin_user
+                    break
+
+            if not admin_user_id:
+                raise SynapseError(
+                    400, "No local admin user in room",
+                )

             pl_content = power_levels.content
         else:
@@ -499,3 +509,60 @@ class MakeRoomAdminRestServlet(RestServlet):
         )

         return 200, {}


class ForwardExtremitiesRestServlet(RestServlet):
    """Allows a server admin to get or clear forward extremities.

    Clearing does not require restarting the server.

        Clear forward extremities:
        DELETE /_synapse/admin/v1/rooms/<room_id_or_alias>/forward_extremities

        Get forward_extremities:
        GET /_synapse/admin/v1/rooms/<room_id_or_alias>/forward_extremities
    """

    PATTERNS = admin_patterns("/rooms/(?P<room_identifier>[^/]*)/forward_extremities")

    def __init__(self, hs: "HomeServer"):
        self.hs = hs
        self.auth = hs.get_auth()
        self.room_member_handler = hs.get_room_member_handler()
        self.store = hs.get_datastore()

    async def resolve_room_id(self, room_identifier: str) -> str:
        """Resolve to a room ID, if necessary."""
        if RoomID.is_valid(room_identifier):
            resolved_room_id = room_identifier
        elif RoomAlias.is_valid(room_identifier):
            room_alias = RoomAlias.from_string(room_identifier)
            room_id, _ = await self.room_member_handler.lookup_room_alias(room_alias)
            resolved_room_id = room_id.to_string()
        else:
            raise SynapseError(
                400, "%s was not legal room ID or room alias" % (room_identifier,)
            )
        if not resolved_room_id:
            raise SynapseError(
                400, "Unknown room ID or room alias %s" % room_identifier
            )
        return resolved_room_id

    async def on_DELETE(self, request, room_identifier):
        requester = await self.auth.get_user_by_req(request)
        await assert_user_is_admin(self.auth, requester.user)

        room_id = await self.resolve_room_id(room_identifier)

        deleted_count = await self.store.delete_forward_extremities_for_room(room_id)
        return 200, {"deleted": deleted_count}

    async def on_GET(self, request, room_identifier):
        requester = await self.auth.get_user_by_req(request)
        await assert_user_is_admin(self.auth, requester.user)

        room_id = await self.resolve_room_id(room_identifier)

        extremities = await self.store.get_forward_extremities_for_room(room_id)
        return 200, {"count": len(extremities), "results": extremities}
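A hypothetical client call against the new endpoints documented above (base URL and token are placeholders; URL-encoding of the room ID is omitted for brevity):

import requests

base = "https://homeserver.example.com"  # placeholder
headers = {"Authorization": "Bearer <admin_access_token>"}  # placeholder
room = "!someroom:example.com"

# Inspect the current forward extremities.
r = requests.get(f"{base}/_synapse/admin/v1/rooms/{room}/forward_extremities", headers=headers)
print(r.json())  # {"count": ..., "results": [...]}

# Delete the extras; the most recent extremity is kept.
r = requests.delete(f"{base}/_synapse/admin/v1/rooms/{room}/forward_extremities", headers=headers)
print(r.json())  # {"deleted": ...}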
@@ -890,3 +890,39 @@ class UserTokenRestServlet(RestServlet):
         )

         return 200, {"access_token": token}


class ShadowBanRestServlet(RestServlet):
    """An admin API for shadow-banning a user.

    A shadow-banned user receives successful responses to their client-server
    API requests, but the events are not propagated into rooms.

    Shadow-banning a user should be used as a tool of last resort and may lead
    to confusing or broken behaviour for the client.

    Example:

        POST /_synapse/admin/v1/users/@test:example.com/shadow_ban
        {}

        200 OK
        {}
    """

    PATTERNS = admin_patterns("/users/(?P<user_id>[^/]*)/shadow_ban")

    def __init__(self, hs: "HomeServer"):
        self.hs = hs
        self.store = hs.get_datastore()
        self.auth = hs.get_auth()

    async def on_POST(self, request, user_id):
        await assert_requester_is_admin(self.auth, request)

        if not self.hs.is_mine_id(user_id):
            raise SynapseError(400, "Only local users can be shadow-banned")

        await self.store.set_shadow_banned(UserID.from_string(user_id), True)

        return 200, {}
@@ -19,7 +19,8 @@ from typing import TYPE_CHECKING, Awaitable, Callable, Dict, Optional
 from synapse.api.errors import Codes, LoginError, SynapseError
 from synapse.api.ratelimiting import Ratelimiter
 from synapse.appservice import ApplicationService
-from synapse.http.server import finish_request
+from synapse.handlers.sso import SsoIdentityProvider
+from synapse.http.server import HttpServer, finish_request
 from synapse.http.servlet import (
     RestServlet,
     parse_json_object_from_request,
@@ -60,11 +61,14 @@ class LoginRestServlet(RestServlet):
         self.saml2_enabled = hs.config.saml2_enabled
         self.cas_enabled = hs.config.cas_enabled
         self.oidc_enabled = hs.config.oidc_enabled
+        self._msc2858_enabled = hs.config.experimental.msc2858_enabled

         self.auth = hs.get_auth()

         self.auth_handler = self.hs.get_auth_handler()
         self.registration_handler = hs.get_registration_handler()
+        self._sso_handler = hs.get_sso_handler()

         self._well_known_builder = WellKnownBuilder(hs)
         self._address_ratelimiter = Ratelimiter(
             clock=hs.get_clock(),
@@ -89,8 +93,17 @@ class LoginRestServlet(RestServlet):
             flows.append({"type": LoginRestServlet.CAS_TYPE})

         if self.cas_enabled or self.saml2_enabled or self.oidc_enabled:
-            flows.append({"type": LoginRestServlet.SSO_TYPE})
-            # While its valid for us to advertise this login type generally,
+            sso_flow = {"type": LoginRestServlet.SSO_TYPE}  # type: JsonDict
+
+            if self._msc2858_enabled:
+                sso_flow["org.matrix.msc2858.identity_providers"] = [
+                    _get_auth_flow_dict_for_idp(idp)
+                    for idp in self._sso_handler.get_identity_providers().values()
+                ]
+
+            flows.append(sso_flow)
+
+            # While it's valid for us to advertise this login type generally,
             # synapse currently only gives out these tokens as part of the
             # SSO login flow.
             # Generally we don't want to advertise login flows that clients
@@ -311,8 +324,20 @@ class LoginRestServlet(RestServlet):
         return result


+def _get_auth_flow_dict_for_idp(idp: SsoIdentityProvider) -> JsonDict:
+    """Return an entry for the login flow dict
+
+    Returns an entry suitable for inclusion in "identity_providers" in the
+    response to GET /_matrix/client/r0/login
+    """
+    e = {"id": idp.idp_id, "name": idp.idp_name}  # type: JsonDict
+    if idp.idp_icon:
+        e["icon"] = idp.idp_icon
+    return e
+
+
 class SsoRedirectServlet(RestServlet):
-    PATTERNS = client_patterns("/login/(cas|sso)/redirect", v1=True)
+    PATTERNS = client_patterns("/login/(cas|sso)/redirect$", v1=True)

     def __init__(self, hs: "HomeServer"):
         # make sure that the relevant handlers are instantiated, so that they
@@ -324,13 +349,31 @@ class SsoRedirectServlet(RestServlet):
         if hs.config.oidc_enabled:
             hs.get_oidc_handler()
         self._sso_handler = hs.get_sso_handler()
+        self._msc2858_enabled = hs.config.experimental.msc2858_enabled

-    async def on_GET(self, request: SynapseRequest):
+    def register(self, http_server: HttpServer) -> None:
+        super().register(http_server)
+        if self._msc2858_enabled:
+            # expose additional endpoint for MSC2858 support
+            http_server.register_paths(
+                "GET",
+                client_patterns(
+                    "/org.matrix.msc2858/login/sso/redirect/(?P<idp_id>[A-Za-z0-9_.~-]+)$",
+                    releases=(),
+                    unstable=True,
+                ),
+                self.on_GET,
+                self.__class__.__name__,
+            )
+
+    async def on_GET(
+        self, request: SynapseRequest, idp_id: Optional[str] = None
+    ) -> None:
         client_redirect_url = parse_string(
             request, "redirectUrl", required=True, encoding=None
         )
         sso_url = await self._sso_handler.handle_redirect_request(
-            request, client_redirect_url
+            request, client_redirect_url, idp_id,
         )
         logger.info("Redirecting to %s", sso_url)
         request.redirect(sso_url)
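For illustration only, the shape of the SSO login flow entry the code above advertises when MSC2858 is enabled (values are placeholders):

sso_flow = {
    "type": "m.login.sso",
    "org.matrix.msc2858.identity_providers": [
        {"id": "oidc-github", "name": "GitHub", "icon": "mxc://example.com/abc123"},
    ],
}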
@@ -54,7 +54,7 @@ logger = logging.getLogger(__name__)
 class EmailPasswordRequestTokenRestServlet(RestServlet):
     PATTERNS = client_patterns("/account/password/email/requestToken$")

-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         super().__init__()
         self.hs = hs
         self.datastore = hs.get_datastore()
@@ -103,6 +103,8 @@ class EmailPasswordRequestTokenRestServlet(RestServlet):
             # Raise if the provided next_link value isn't valid
             assert_valid_next_link(self.hs, next_link)

+        self.identity_handler.ratelimit_request_token_requests(request, "email", email)
+
         # The email will be sent to the stored address.
         # This avoids a potential account hijack by requesting a password reset to
         # an email address which is controlled by the attacker but which, after
@@ -379,6 +381,8 @@ class EmailThreepidRequestTokenRestServlet(RestServlet):
                 Codes.THREEPID_DENIED,
             )

+        self.identity_handler.ratelimit_request_token_requests(request, "email", email)
+
         if next_link:
             # Raise if the provided next_link value isn't valid
             assert_valid_next_link(self.hs, next_link)
@@ -430,7 +434,7 @@ class EmailThreepidRequestTokenRestServlet(RestServlet):
 class MsisdnThreepidRequestTokenRestServlet(RestServlet):
     PATTERNS = client_patterns("/account/3pid/msisdn/requestToken$")

-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         self.hs = hs
         super().__init__()
         self.store = self.hs.get_datastore()
@@ -458,6 +462,10 @@ class MsisdnThreepidRequestTokenRestServlet(RestServlet):
                 Codes.THREEPID_DENIED,
             )

+        self.identity_handler.ratelimit_request_token_requests(
+            request, "msisdn", msisdn
+        )
+
         if next_link:
             # Raise if the provided next_link value isn't valid
             assert_valid_next_link(self.hs, next_link)

@@ -126,6 +126,8 @@ class EmailRegisterRequestTokenRestServlet(RestServlet):
                 Codes.THREEPID_DENIED,
             )

+        self.identity_handler.ratelimit_request_token_requests(request, "email", email)
+
         existing_user_id = await self.hs.get_datastore().get_user_id_by_threepid(
             "email", email
         )
@@ -205,6 +207,10 @@ class MsisdnRegisterRequestTokenRestServlet(RestServlet):
                 Codes.THREEPID_DENIED,
             )

+        self.identity_handler.ratelimit_request_token_requests(
+            request, "msisdn", msisdn
+        )
+
         existing_user_id = await self.hs.get_datastore().get_user_id_by_threepid(
             "msisdn", msisdn
         )
@@ -386,7 +386,7 @@ class PreviewUrlResource(DirectServeJsonResource):
         """
         Check whether the URL should be downloaded as oEmbed content instead.

-        Params:
+        Args:
             url: The URL to check.

         Returns:
@@ -403,7 +403,7 @@ class PreviewUrlResource(DirectServeJsonResource):
         """
         Request content from an oEmbed endpoint.

-        Params:
+        Args:
             endpoint: The oEmbed API endpoint.
             url: The URL to pass to the API.

@@ -692,27 +692,51 @@ class PreviewUrlResource(DirectServeJsonResource):
 def decode_and_calc_og(
     body: bytes, media_uri: str, request_encoding: Optional[str] = None
 ) -> Dict[str, Optional[str]]:
+    """
+    Calculate metadata for an HTML document.
+
+    This uses lxml to parse the HTML document into the OG response. If errors
+    occur during processing of the document, an empty response is returned.
+
+    Args:
+        body: The HTML document, as bytes.
+        media_uri: The URI used to download the body.
+        request_encoding: The character encoding of the body, as a string.
+
+    Returns:
+        The OG response as a dictionary.
+    """
+    # If there's no body, nothing useful is going to be found.
+    if not body:
+        return {}
+
     from lxml import etree

+    # Create an HTML parser. If this fails, log and return no metadata.
     try:
         parser = etree.HTMLParser(recover=True, encoding=request_encoding)
-        tree = etree.fromstring(body, parser)
-        og = _calc_og(tree, media_uri)
     except LookupError:
         # blindly consider the encoding as utf-8.
         parser = etree.HTMLParser(recover=True, encoding="utf-8")
+    except Exception as e:
+        logger.warning("Unable to create HTML parser: %s" % (e,))
+        return {}
+
+    def _attempt_calc_og(body_attempt: Union[bytes, str]) -> Dict[str, Optional[str]]:
+        # Attempt to parse the body. If this fails, log and return no metadata.
+        tree = etree.fromstring(body_attempt, parser)
+        return _calc_og(tree, media_uri)
+
+    # Attempt to parse the body. If this fails, log and return no metadata.
+    try:
+        return _attempt_calc_og(body)
     except UnicodeDecodeError:
         # blindly try decoding the body as utf-8, which seems to fix
         # the charset mismatches on https://google.com
-        parser = etree.HTMLParser(recover=True, encoding=request_encoding)
-        tree = etree.fromstring(body.decode("utf-8", "ignore"), parser)
-        og = _calc_og(tree, media_uri)
-
-    return og
+        return _attempt_calc_og(body.decode("utf-8", "ignore"))


-def _calc_og(tree, media_uri: str) -> Dict[str, Optional[str]]:
+def _calc_og(tree: "etree.Element", media_uri: str) -> Dict[str, Optional[str]]:
     # suck our tree into lxml and define our OG response.

     # if we see any image URLs in the OG response, then spider them
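A self-contained illustration of the parsing strategy in decode_and_calc_og above (assuming lxml is installed): build a recovering HTMLParser, fall back to utf-8 when the declared encoding is unknown (the LookupError branch), and re-decode on UnicodeDecodeError.

from lxml import etree

body = b"<html><head><title>Example</title></head></html>"

try:
    parser = etree.HTMLParser(recover=True, encoding="not-a-real-charset")
except LookupError:
    # Unknown encoding: blindly fall back to utf-8.
    parser = etree.HTMLParser(recover=True, encoding="utf-8")

tree = etree.fromstring(body, parser)
print(tree.findtext(".//title"))  # Example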
@@ -103,6 +103,7 @@ from synapse.notifier import Notifier
 from synapse.push.action_generator import ActionGenerator
 from synapse.push.pusherpool import PusherPool
 from synapse.replication.tcp.client import ReplicationDataHandler
+from synapse.replication.tcp.external_cache import ExternalCache
 from synapse.replication.tcp.handler import ReplicationCommandHandler
 from synapse.replication.tcp.resource import ReplicationStreamer
 from synapse.replication.tcp.streams import STREAMS_MAP, Stream
@@ -128,6 +129,8 @@ from synapse.util.stringutils import random_string
 logger = logging.getLogger(__name__)

 if TYPE_CHECKING:
+    from txredisapi import RedisProtocol
+
     from synapse.handlers.oidc_handler import OidcHandler
     from synapse.handlers.saml_handler import SamlHandler

@@ -716,6 +719,33 @@ class HomeServer(metaclass=abc.ABCMeta):
     def get_account_data_handler(self) -> AccountDataHandler:
         return AccountDataHandler(self)

+    @cache_in_self
+    def get_external_cache(self) -> ExternalCache:
+        return ExternalCache(self)
+
+    @cache_in_self
+    def get_outbound_redis_connection(self) -> Optional["RedisProtocol"]:
+        if not self.config.redis.redis_enabled:
+            return None
+
+        # We only want to import redis module if we're using it, as we have
+        # `txredisapi` as an optional dependency.
+        from synapse.replication.tcp.redis import lazyConnection
+
+        logger.info(
+            "Connecting to redis (host=%r port=%r) for external cache",
+            self.config.redis_host,
+            self.config.redis_port,
+        )
+
+        return lazyConnection(
+            hs=self,
+            host=self.config.redis_host,
+            port=self.config.redis_port,
+            password=self.config.redis.redis_password,
+            reconnect=True,
+        )
+
     async def remove_pusher(self, app_id: str, push_key: str, user_id: str):
         return await self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
@@ -310,6 +310,7 @@ class StateHandler:
             state_group_before_event = None
             state_group_before_event_prev_group = None
             deltas_to_state_group_before_event = None
+            entry = None

         else:
             # otherwise, we'll need to resolve the state across the prev_events.
@@ -340,9 +341,13 @@ class StateHandler:
                 current_state_ids=state_ids_before_event,
             )

-            # XXX: can we update the state cache entry for the new state group? or
-            # could we set a flag on resolve_state_groups_for_events to tell it to
-            # always make a state group?
+            # Assign the new state group to the cached state entry.
+            #
+            # Note that this can race in that we could generate multiple state
+            # groups for the same state entry, but that is just inefficient
+            # rather than dangerous.
+            if entry and entry.state_group is None:
+                entry.state_group = state_group_before_event

         #
         # now if it's not a state event, we're done
@@ -1,7 +1,7 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
 # Copyright 2018 New Vector Ltd
-# Copyright 2019 The Matrix.org Foundation C.I.C.
+# Copyright 2019-2021 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -43,6 +43,7 @@ from .end_to_end_keys import EndToEndKeyStore
 from .event_federation import EventFederationStore
 from .event_push_actions import EventPushActionsStore
 from .events_bg_updates import EventsBackgroundUpdatesStore
+from .events_forward_extremities import EventForwardExtremitiesStore
 from .filtering import FilteringStore
 from .group_server import GroupServerStore
 from .keys import KeyStore
@@ -118,6 +119,7 @@ class DataStore(
     UIAuthStore,
     CacheInvalidationWorkerStore,
     ServerMetricsStore,
+    EventForwardExtremitiesStore,
 ):
     def __init__(self, database: DatabasePool, db_conn, hs):
         self.hs = hs
@@ -634,7 +634,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore):

     async def get_e2e_cross_signing_keys_bulk(
         self, user_ids: List[str], from_user_id: Optional[str] = None
-    ) -> Dict[str, Dict[str, dict]]:
+    ) -> Dict[str, Optional[Dict[str, dict]]]:
         """Returns the cross-signing keys for a set of users.

         Args:
@@ -724,7 +724,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore):

     async def claim_e2e_one_time_keys(
         self, query_list: Iterable[Tuple[str, str, str]]
-    ) -> Dict[str, Dict[str, Dict[str, bytes]]]:
+    ) -> Dict[str, Dict[str, Dict[str, str]]]:
         """Take a list of one time keys out of the database.

         Args:
@@ -0,0 +1,101 @@
# -*- coding: utf-8 -*-
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from typing import Dict, List

from synapse.api.errors import SynapseError
from synapse.storage._base import SQLBaseStore

logger = logging.getLogger(__name__)


class EventForwardExtremitiesStore(SQLBaseStore):
    async def delete_forward_extremities_for_room(self, room_id: str) -> int:
        """Delete any extra forward extremities for a room.

        Invalidates the "get_latest_event_ids_in_room" cache if any forward
        extremities were deleted.

        Returns count deleted.
        """

        def delete_forward_extremities_for_room_txn(txn):
            # First we need to get the event_id to not delete
            sql = """
                SELECT event_id FROM event_forward_extremities
                INNER JOIN events USING (room_id, event_id)
                WHERE room_id = ?
                ORDER BY stream_ordering DESC
                LIMIT 1
            """
            txn.execute(sql, (room_id,))
            rows = txn.fetchall()
            try:
                event_id = rows[0][0]
                logger.debug(
                    "Found event_id %s as the forward extremity to keep for room %s",
                    event_id,
                    room_id,
                )
            except IndexError:
                # fetchall() returns a list, so an empty result raises
                # IndexError rather than KeyError.
                msg = "No forward extremity event found for room %s" % room_id
                logger.warning(msg)
                raise SynapseError(400, msg)

            # Now delete the extra forward extremities
            sql = """
                DELETE FROM event_forward_extremities
                WHERE event_id != ? AND room_id = ?
            """

            txn.execute(sql, (event_id, room_id))
            logger.info(
                "Deleted %s extra forward extremities for room %s",
                txn.rowcount,
                room_id,
            )

            if txn.rowcount > 0:
                # Invalidate the cache
                self._invalidate_cache_and_stream(
                    txn, self.get_latest_event_ids_in_room, (room_id,),
                )

            return txn.rowcount

        return await self.db_pool.runInteraction(
            "delete_forward_extremities_for_room",
            delete_forward_extremities_for_room_txn,
        )

    async def get_forward_extremities_for_room(self, room_id: str) -> List[Dict]:
        """Get list of forward extremities for a room."""

        def get_forward_extremities_for_room_txn(txn):
            sql = """
                SELECT event_id, state_group, depth, received_ts
                FROM event_forward_extremities
                INNER JOIN event_to_state_groups USING (event_id)
                INNER JOIN events USING (room_id, event_id)
                WHERE room_id = ?
            """

            txn.execute(sql, (room_id,))
            return self.db_pool.cursor_to_dict(txn)

        return await self.db_pool.runInteraction(
            "get_forward_extremities_for_room", get_forward_extremities_for_room_txn,
        )
@@ -344,7 +344,9 @@ class PusherStore(PusherWorkerStore):
             txn, self.get_if_user_has_pusher, (user_id,)
         )

-        self.db_pool.simple_delete_one_txn(
+        # It is expected that there is exactly one pusher to delete, but
+        # if it isn't there (or there are multiple) delete them all.
+        self.db_pool.simple_delete_txn(
             txn,
             "pushers",
             {"app_id": app_id, "pushkey": pushkey, "user_name": user_id},

@@ -360,6 +360,35 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):

         await self.db_pool.runInteraction("set_server_admin", set_server_admin_txn)

+    async def set_shadow_banned(self, user: UserID, shadow_banned: bool) -> None:
+        """Sets whether a user is shadow-banned.
+
+        Args:
+            user: user ID of the user to shadow-ban
+            shadow_banned: true iff the user is to be shadow-banned, false otherwise.
+        """
+
+        def set_shadow_banned_txn(txn):
+            self.db_pool.simple_update_one_txn(
+                txn,
+                table="users",
+                keyvalues={"name": user.to_string()},
+                updatevalues={"shadow_banned": shadow_banned},
+            )
+            # In order for this to apply immediately, clear the cache for this user.
+            tokens = self.db_pool.simple_select_onecol_txn(
+                txn,
+                table="access_tokens",
+                keyvalues={"user_id": user.to_string()},
+                retcol="token",
+            )
+            for token in tokens:
+                self._invalidate_cache_and_stream(
+                    txn, self.get_user_by_access_token, (token,)
+                )
+
+        await self.db_pool.runInteraction("set_shadow_banned", set_shadow_banned_txn)
+
     def _query_for_auth(self, txn, token: str) -> Optional[TokenLookupResult]:
         sql = """
             SELECT users.name as user_id,
@@ -24,6 +24,7 @@ from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_cla
 from synapse.storage.database import DatabasePool
 from synapse.storage.databases.main.events_worker import EventRedactBehaviour
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+from synapse.types import Collection

 logger = logging.getLogger(__name__)

@@ -460,7 +461,7 @@ class SearchStore(SearchBackgroundUpdateStore):

     async def search_rooms(
         self,
-        room_ids: List[str],
+        room_ids: Collection[str],
         search_term: str,
         keys: List[str],
         limit,
@@ -15,11 +15,12 @@
 # limitations under the License.

 import logging
-from collections import Counter
 from enum import Enum
 from itertools import chain
 from typing import Any, Dict, List, Optional, Tuple

+from typing_extensions import Counter
+
 from twisted.internet.defer import DeferredLock

 from synapse.api.constants import EventTypes, Membership
@@ -319,7 +320,9 @@ class StatsStore(StateDeltasStore):
         return slice_list

     @cached()
-    async def get_earliest_token_for_stats(self, stats_type: str, id: str) -> int:
+    async def get_earliest_token_for_stats(
+        self, stats_type: str, id: str
+    ) -> Optional[int]:
         """
         Fetch the "earliest token". This is used by the room stats delta
         processor to ignore deltas that have been processed between the
@@ -339,7 +342,7 @@ class StatsStore(StateDeltasStore):
         )

     async def bulk_update_stats_delta(
-        self, ts: int, updates: Dict[str, Dict[str, Dict[str, Counter]]], stream_id: int
+        self, ts: int, updates: Dict[str, Dict[str, Counter[str]]], stream_id: int
     ) -> None:
         """Bulk update stats tables for a given stream_id and updates the stats
         incremental position.
@@ -665,7 +668,7 @@ class StatsStore(StateDeltasStore):

     async def get_changes_room_total_events_and_bytes(
         self, min_pos: int, max_pos: int
-    ) -> Dict[str, Dict[str, int]]:
+    ) -> Tuple[Dict[str, Dict[str, int]], Dict[str, Dict[str, int]]]:
         """Fetches the counts of events in the given range of stream IDs.

         Args:
@@ -683,18 +686,19 @@ class StatsStore(StateDeltasStore):
             max_pos,
         )

-    def get_changes_room_total_events_and_bytes_txn(self, txn, low_pos, high_pos):
+    def get_changes_room_total_events_and_bytes_txn(
+        self, txn, low_pos: int, high_pos: int
+    ) -> Tuple[Dict[str, Dict[str, int]], Dict[str, Dict[str, int]]]:
         """Gets the total_events and total_event_bytes counts for rooms and
         senders, in a range of stream_orderings (including backfilled events).

         Args:
             txn
-            low_pos (int): Low stream ordering
-            high_pos (int): High stream ordering
+            low_pos: Low stream ordering
+            high_pos: High stream ordering

         Returns:
-            tuple[dict[str, dict[str, int]], dict[str, dict[str, int]]]: The
-                room and user deltas for total_events/total_event_bytes in the
+            The room and user deltas for total_events/total_event_bytes in the
                 format of `stats_id` -> fields
         """
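A small sketch of why the import above moves to typing_extensions: Counter[str], used as a generic in the new signatures, needs typing's subscriptable Counter, which typing_extensions re-exports for older Pythons; collections.Counter is not subscriptable before Python 3.9.

from typing import List

from typing_extensions import Counter

def count_tokens(tokens: List[str]) -> Counter[str]:
    c: Counter[str] = Counter()
    for t in tokens:
        c[t] += 1
    return c

print(count_tokens(["a", "b", "a"]))  # Counter({'a': 2, 'b': 1})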
@@ -540,7 +540,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
         desc="get_user_in_directory",
     )

-    async def update_user_directory_stream_pos(self, stream_id: str) -> None:
+    async def update_user_directory_stream_pos(self, stream_id: int) -> None:
         await self.db_pool.simple_update_one(
             table="user_directory_stream_pos",
             keyvalues={},
@@ -18,6 +18,7 @@ import collections
 import inspect
 import logging
 from contextlib import contextmanager
+from collections import deque
 from typing import (
     Any,
     Awaitable,
@@ -30,6 +31,7 @@ from typing import (
     Set,
     TypeVar,
     Union,
+    Deque,
 )

 import attr
@@ -37,7 +39,7 @@ from typing_extensions import ContextManager

 from twisted.internet import defer
 from twisted.internet.defer import CancelledError
-from twisted.internet.interfaces import IReactorTime
+from twisted.internet.interfaces import IReactorTime, IDelayedCall
 from twisted.python import failure

 from synapse.logging.context import (
@@ -552,3 +554,84 @@ def maybe_awaitable(value: Union[Awaitable[R], R]) -> Awaitable[R]:
         return value

     return DoneAwaitable(value)


@attr.s(slots=True)
class _SmootherEntry:
    scheduled_at_ms = attr.ib(type=int)
    scheduled_for_ms = attr.ib(type=int)
    defer = attr.ib(type=defer.Deferred)


@attr.s(slots=True)
class Smoother:
    _reactor = attr.ib(type=IReactorTime)
    _target_ms = attr.ib(type=int)

    _queue = attr.ib(type=Deque[_SmootherEntry], factory=deque)
    _last_run = attr.ib(type=int, default=0)
    _next_call = attr.ib(type=Optional[IDelayedCall], default=None)

    def _fire_next(self):
        if not self._queue:
            return

        self._next_call = None

        entry = self._queue.popleft()
        entry.defer.callback(None)

    async def smooth(self) -> None:
        now = self._reactor.seconds() * 1000.0

        if not self._queue:
            scheduled_for_ms = (now + self._target_ms + self._last_run) / 2
            if scheduled_for_ms <= now:
                self._last_run = now
                return

            entry = _SmootherEntry(
                scheduled_at_ms=now,
                scheduled_for_ms=scheduled_for_ms,
                defer=defer.Deferred(),
            )
            self._queue.append(entry)

        else:
            last_entry = self._queue[-1]

            scheduled_for_ms = (now + self._target_ms + last_entry.scheduled_for_ms) / 2

            entry = _SmootherEntry(
                scheduled_at_ms=now,
                scheduled_for_ms=scheduled_for_ms,
                defer=defer.Deferred(),
            )
            self._queue.append(entry)

        step = self._target_ms / (len(self._queue) + 1)
        for idx, entry in enumerate(self._queue):
            new_time = now + (idx + 1) * step
            if new_time < entry.scheduled_for_ms:
                entry.scheduled_for_ms = new_time

        # IDelayedCall.active is a method; reschedule the pending call if it
        # has not fired yet, otherwise create a fresh one.
        if self._next_call and self._next_call.active():
            self._next_call.reset(
                max(self._queue[0].scheduled_for_ms - now, 0) / 1000.0
            )
        else:
            self._next_call = self._reactor.callLater(
                max(self._queue[0].scheduled_for_ms - now, 0) / 1000.0, self._fire_next
            )

        await make_deferred_yieldable(entry.defer)
        now = self._reactor.seconds() * 1000.0

        self._last_run = now

        if self._queue:
            self._next_call = self._reactor.callLater(
                max(self._queue[0].scheduled_for_ms - now, 0) / 1000.0, self._fire_next,
            )

        return
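A rough, deterministic driver for the draft Smoother above, using Twisted's fake clock; this is a sketch of intent, not a test from the changeset:

from twisted.internet import defer
from twisted.internet.task import Clock

clock = Clock()
smoother = Smoother(reactor=clock, target_ms=1000)

# The first call is scheduled roughly half the target interval out, so
# bursts of callers get spread over time rather than all running at once.
d = defer.ensureDeferred(smoother.smooth())
clock.advance(0.5)  # fires the queued entry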
@@ -78,7 +78,7 @@ def sorted_topologically(
         if node not in degree_map:
             continue

-        for edge in edges:
+        for edge in set(edges):
             if edge in degree_map:
                 degree_map[node] += 1
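A tiny reproduction (not from the diff) of the bug that set(edges) fixes: duplicate edges previously inflated a node's in-degree, so it never reached zero and was silently dropped from the topological sort.

from collections import Counter

graph = {"a": [], "b": ["a", "a"]}  # duplicate edge b -> a

degree = Counter({node: 0 for node in graph})
for node, edges in graph.items():
    for edge in set(edges):  # count each distinct parent once
        if edge in degree:
            degree[node] += 1

print(degree)  # Counter({'b': 1, 'a': 0})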
@@ -49,7 +49,8 @@ def load_module(provider: dict, config_path: Iterable[str]) -> Tuple[Type, Any]:
     module = importlib.import_module(module)
     provider_class = getattr(module, clz)

-    module_config = provider.get("config")
+    # Load the module config. If None, pass an empty dictionary instead
+    module_config = provider.get("config") or {}
     try:
         provider_config = provider_class.parse_config(module_config)
     except jsonschema.ValidationError as e:
@@ -0,0 +1,229 @@
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Iterable, Optional, Tuple

from synapse.api.constants import EventTypes, Membership
from synapse.api.room_versions import RoomVersions
from synapse.events import FrozenEvent
from synapse.push.presentable_names import calculate_room_name
from synapse.types import StateKey, StateMap

from tests import unittest


class MockDataStore:
    """
    A fake data store which stores a mapping of state key to event content.
    (I.e. the state key is used as the event ID.)
    """

    def __init__(self, events: Iterable[Tuple[StateKey, dict]]):
        """
        Args:
            events: A state map to event contents.
        """
        self._events = {}

        for i, (event_id, content) in enumerate(events):
            self._events[event_id] = FrozenEvent(
                {
                    "event_id": "$event_id",
                    "type": event_id[0],
                    "sender": "@user:test",
                    "state_key": event_id[1],
                    "room_id": "#room:test",
                    "content": content,
                    "origin_server_ts": i,
                },
                RoomVersions.V1,
            )

    async def get_event(
        self, event_id: StateKey, allow_none: bool = False
    ) -> Optional[FrozenEvent]:
        assert allow_none, "Mock not configured for allow_none = False"

        return self._events.get(event_id)

    async def get_events(self, event_ids: Iterable[StateKey]):
        # This is cheating since it just returns all events.
        return self._events

class PresentableNamesTestCase(unittest.HomeserverTestCase):
    USER_ID = "@test:test"
    OTHER_USER_ID = "@user:test"

    def _calculate_room_name(
        self,
        events: StateMap[dict],
        user_id: str = "",
        fallback_to_members: bool = True,
        fallback_to_single_member: bool = True,
    ):
        # This isn't 100% accurate, but works with MockDataStore.
        room_state_ids = {k[0]: k[0] for k in events}

        return self.get_success(
            calculate_room_name(
                MockDataStore(events),
                room_state_ids,
                user_id or self.USER_ID,
                fallback_to_members,
                fallback_to_single_member,
            )
        )

    def test_name(self):
        """A room name event should be used."""
        events = [
            ((EventTypes.Name, ""), {"name": "test-name"}),
        ]
        self.assertEqual("test-name", self._calculate_room_name(events))

        # Check if the event content has garbage.
        events = [((EventTypes.Name, ""), {"foo": 1})]
        self.assertEqual("Empty Room", self._calculate_room_name(events))

        events = [((EventTypes.Name, ""), {"name": 1})]
        self.assertEqual(1, self._calculate_room_name(events))

    def test_canonical_alias(self):
        """A canonical alias should be used."""
        events = [
            ((EventTypes.CanonicalAlias, ""), {"alias": "#test-name:test"}),
        ]
        self.assertEqual("#test-name:test", self._calculate_room_name(events))

        # Check if the event content has garbage.
        events = [((EventTypes.CanonicalAlias, ""), {"foo": 1})]
        self.assertEqual("Empty Room", self._calculate_room_name(events))

        events = [((EventTypes.CanonicalAlias, ""), {"alias": "test-name"})]
        self.assertEqual("Empty Room", self._calculate_room_name(events))

    def test_invite(self):
        """An invite has special behaviour."""
        events = [
            ((EventTypes.Member, self.USER_ID), {"membership": Membership.INVITE}),
            ((EventTypes.Member, self.OTHER_USER_ID), {"displayname": "Other User"}),
        ]
        self.assertEqual("Invite from Other User", self._calculate_room_name(events))
        self.assertIsNone(
            self._calculate_room_name(events, fallback_to_single_member=False)
        )
        # Ensure this logic is skipped if we don't fallback to members.
        self.assertIsNone(self._calculate_room_name(events, fallback_to_members=False))

        # Check if the event content has garbage.
        events = [
            ((EventTypes.Member, self.USER_ID), {"membership": Membership.INVITE}),
            ((EventTypes.Member, self.OTHER_USER_ID), {"foo": 1}),
        ]
        self.assertEqual("Invite from @user:test", self._calculate_room_name(events))

        # No member event for sender.
        events = [
            ((EventTypes.Member, self.USER_ID), {"membership": Membership.INVITE}),
        ]
        self.assertEqual("Room Invite", self._calculate_room_name(events))

    def test_no_members(self):
        """Behaviour of an empty room."""
        events = []
        self.assertEqual("Empty Room", self._calculate_room_name(events))

        # Note that events with invalid (or missing) membership are ignored.
        events = [
            ((EventTypes.Member, self.OTHER_USER_ID), {"foo": 1}),
            ((EventTypes.Member, "@foo:test"), {"membership": "foo"}),
        ]
        self.assertEqual("Empty Room", self._calculate_room_name(events))

    def test_no_other_members(self):
        """Behaviour of a room with no other members in it."""
        events = [
            (
                (EventTypes.Member, self.USER_ID),
                {"membership": Membership.JOIN, "displayname": "Me"},
            ),
        ]
        self.assertEqual("Me", self._calculate_room_name(events))

        # Check if the event content has no displayname.
        events = [
            ((EventTypes.Member, self.USER_ID), {"membership": Membership.JOIN}),
        ]
        self.assertEqual("@test:test", self._calculate_room_name(events))

        # 3pid invite, use the other user (who is set as the sender).
        events = [
            ((EventTypes.Member, self.OTHER_USER_ID), {"membership": Membership.JOIN}),
        ]
        self.assertEqual(
            "nobody", self._calculate_room_name(events, user_id=self.OTHER_USER_ID)
        )

        events = [
            ((EventTypes.Member, self.OTHER_USER_ID), {"membership": Membership.JOIN}),
            ((EventTypes.ThirdPartyInvite, self.OTHER_USER_ID), {}),
        ]
        self.assertEqual(
            "Inviting email address",
            self._calculate_room_name(events, user_id=self.OTHER_USER_ID),
        )

    def test_one_other_member(self):
        """Behaviour of a room with a single other member."""
        events = [
            ((EventTypes.Member, self.USER_ID), {"membership": Membership.JOIN}),
            (
                (EventTypes.Member, self.OTHER_USER_ID),
                {"membership": Membership.JOIN, "displayname": "Other User"},
            ),
        ]
        self.assertEqual("Other User", self._calculate_room_name(events))
        self.assertIsNone(
            self._calculate_room_name(events, fallback_to_single_member=False)
        )

        # Check if the event content has no displayname and is an invite.
        events = [
            ((EventTypes.Member, self.USER_ID), {"membership": Membership.JOIN}),
            (
                (EventTypes.Member, self.OTHER_USER_ID),
                {"membership": Membership.INVITE},
            ),
        ]
        self.assertEqual("@user:test", self._calculate_room_name(events))

    def test_other_members(self):
        """Behaviour of a room with multiple other members."""
        # Two other members.
        events = [
            ((EventTypes.Member, self.USER_ID), {"membership": Membership.JOIN}),
            (
                (EventTypes.Member, self.OTHER_USER_ID),
                {"membership": Membership.JOIN, "displayname": "Other User"},
            ),
            ((EventTypes.Member, "@foo:test"), {"membership": Membership.JOIN}),
        ]
        self.assertEqual("Other User and @foo:test", self._calculate_room_name(events))

        # Three or more other members.
        events.append(
            ((EventTypes.Member, "@fourth:test"), {"membership": Membership.INVITE})
        )
        self.assertEqual("Other User and 2 others", self._calculate_room_name(events))

@@ -29,7 +29,7 @@ class PushRuleEvaluatorTestCase(unittest.TestCase):
                "type": "m.room.history_visibility",
                "sender": "@user:test",
                "state_key": "",
-               "room_id": "@room:test",
+               "room_id": "#room:test",
                "content": content,
            },
            RoomVersions.V1,

Some files were not shown because too many files have changed in this diff.