1
0

Compare commits

..

1 Commits

Author SHA1 Message Date
Andrew Morgan
20c48d50ef Add debug logging 2020-01-13 11:51:20 +00:00
145 changed files with 1300 additions and 3163 deletions

View File

@@ -3,10 +3,6 @@
<!-- Please read CONTRIBUTING.md before submitting your pull request -->
* [ ] Pull request is based on the develop branch
* [ ] Pull request includes a [changelog file](https://github.com/matrix-org/synapse/blob/master/CONTRIBUTING.md#changelog). The entry should:
- Be a short description of your change which makes sense to users. "Fixed a bug that prevented receiving messages from other servers." instead of "Moved X method from `EventStore` to `EventWorkerStore`.".
- Use markdown where necessary, mostly for `code blocks`.
- End with either a period (.) or an exclamation mark (!).
- Start with a capital letter.
* [ ] Pull request includes a [changelog file](https://github.com/matrix-org/synapse/blob/master/CONTRIBUTING.md#changelog)
* [ ] Pull request includes a [sign off](https://github.com/matrix-org/synapse/blob/master/CONTRIBUTING.md#sign-off)
* [ ] Code style is correct (run the [linters](https://github.com/matrix-org/synapse/blob/master/CONTRIBUTING.md#code-style))

View File

@@ -1,9 +1,6 @@
Synapse 1.8.0 (2020-01-09)
==========================
**WARNING**: As of this release Synapse will refuse to start if the `log_file` config option is specified. Support for the option was removed in v1.3.0.
Bugfixes
--------
@@ -19,7 +16,7 @@ Features
- Add v2 APIs for the `send_join` and `send_leave` federation endpoints (as described in [MSC1802](https://github.com/matrix-org/matrix-doc/pull/1802)). ([\#6349](https://github.com/matrix-org/synapse/issues/6349))
- Add a develop script to generate full SQL schemas. ([\#6394](https://github.com/matrix-org/synapse/issues/6394))
- Add custom SAML username mapping functionality through an external provider plugin. ([\#6411](https://github.com/matrix-org/synapse/issues/6411))
- Add custom SAML username mapping functinality through an external provider plugin. ([\#6411](https://github.com/matrix-org/synapse/issues/6411))
- Automatically delete empty groups/communities. ([\#6453](https://github.com/matrix-org/synapse/issues/6453))
- Add option `limit_profile_requests_to_users_who_share_rooms` to prevent requirement of a local user sharing a room with another user to query their profile information. ([\#6523](https://github.com/matrix-org/synapse/issues/6523))
- Add an `export_signing_key` script to extract the public part of signing keys when rotating them. ([\#6546](https://github.com/matrix-org/synapse/issues/6546))

View File

@@ -101,8 +101,8 @@ in the format of `PRnumber.type`. The type can be one of the following:
The content of the file is your changelog entry, which should be a short
description of your change in the same style as the rest of our [changelog](
https://github.com/matrix-org/synapse/blob/master/CHANGES.md). The file can
contain Markdown formatting, and should end with a full stop (.) or an
exclamation mark (!) for consistency.
contain Markdown formatting, and should end with a full stop ('.') for
consistency.
Adding credits to the changelog is encouraged, we value your
contributions and would like to have you shouted out in the release notes!

View File

@@ -133,11 +133,6 @@ sudo yum install libtiff-devel libjpeg-devel libzip-devel freetype-devel \
sudo yum groupinstall "Development Tools"
```
Note that Synapse does not support versions of SQLite before 3.11, and CentOS 7
uses SQLite 3.7. You may be able to work around this by installing a more
recent SQLite version, but it is recommended that you instead use a Postgres
database: see [docs/postgres.md](docs/postgres.md).
#### macOS
Installing prerequisites on macOS:

View File

@@ -75,15 +75,6 @@ for example:
wget https://packages.matrix.org/debian/pool/main/m/matrix-synapse-py3/matrix-synapse-py3_1.3.0+stretch1_amd64.deb
dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
Upgrading to v1.8.0
===================
Specifying a ``log_file`` config option will now cause Synapse to refuse to
start, and should be replaced by with the ``log_config`` option. Support for
the ``log_file`` option was removed in v1.3.0 and has since had no effect.
Upgrading to v1.7.0
===================

View File

@@ -1 +0,0 @@
Allow admin to create or modify a user. Contributed by Awesome Technologies Innovationslabor GmbH.

View File

@@ -1 +0,0 @@
Fix a typo in the configuration example for purge jobs in the sample configuration file.

View File

@@ -1 +0,0 @@
Add complete documentation of the message retention policies support.

View File

@@ -1 +0,0 @@
Correctly proxy HTTP errors due to API calls to remote group servers.

View File

@@ -1 +0,0 @@
Add `local_current_membership` table for tracking local user membership state in rooms.

View File

@@ -1 +0,0 @@
No more overriding the entire /etc folder of the container in docker-compose.yaml. Contributed by Fabian Meyer.

View File

@@ -1 +0,0 @@
Add some helpful tips about changelog entries to the github pull request template.

View File

@@ -1 +0,0 @@
Fix media repo admin APIs when using a media worker.

View File

@@ -1 +0,0 @@
Add complete documentation of the message retention policies support.

View File

@@ -1 +0,0 @@
Port `synapse.replication.tcp` to async/await.

View File

@@ -1 +0,0 @@
Fixup `synapse.replication` to pass mypy checks.

View File

@@ -1 +0,0 @@
Synapse no longer supports versions of SQLite before 3.11, and will refuse to start when configured to use an older version. Administrators are recommended to migrate their database to Postgres (see instructions [here](docs/postgres.md)).

View File

@@ -1 +0,0 @@
Add new quarantine media admin APIs to quarantine by media ID or by user who uploaded the media.

View File

@@ -1,2 +0,0 @@
Fix "CRITICAL" errors being logged when a request is received for a uri containing non-ascii characters.

View File

@@ -1 +0,0 @@
Clarify the `account_validity` and `email` sections of the sample configuration.

View File

@@ -1 +0,0 @@
Allow additional_resources to implement IResource directly.

View File

@@ -1 +0,0 @@
Allow REST endpoint implementations to raise a RedirectException, which will redirect the user's browser to a given location.

View File

@@ -1 +0,0 @@
Updates and extensions to the module API.

View File

@@ -1 +0,0 @@
Updates to the SAML mapping provider API.

View File

@@ -1 +0,0 @@
Fix a bug where we would assign a numeric userid if somebody tried registering with an empty username.

View File

@@ -1 +0,0 @@
Remove redundant RegistrationError class.

View File

@@ -1 +0,0 @@
Don't block processing of incoming EDUs behind processing PDUs in the same transaction.

View File

@@ -1 +0,0 @@
Add more endpoints to the documentation for Synapse workers.

View File

@@ -1 +0,0 @@
Remove duplicate check for the `session` query parameter on the `/auth/xxx/fallback/web` Client-Server endpoint.

View File

@@ -1 +0,0 @@
Attempt to retry sending a transaction when we detect a remote server has come back online, rather than waiting for a transaction to be triggered by new data.

View File

@@ -1 +0,0 @@
Fix `purge_room` admin API.

View File

@@ -1 +0,0 @@
Add org.matrix.e2e_cross_signing to unstable_features in /versions as per [MSC1756](https://github.com/matrix-org/matrix-doc/pull/1756).

View File

@@ -1 +0,0 @@
Fix a bug causing Synapse to not always purge quiet rooms with a low `max_lifetime` in their message retention policies when running the automated purge jobs.

View File

@@ -1 +0,0 @@
Add StateMap type alias to simplify types.

View File

@@ -1 +0,0 @@
Updates to the SAML mapping provider API.

View File

@@ -1 +0,0 @@
When processing a SAML response, log the assertions for easier configuration.

View File

@@ -1 +0,0 @@
Fix a bug causing `ValueError: unsupported format character ''' (0x27) at index 312` error when running the schema 57 upgrade script.

View File

@@ -18,7 +18,7 @@ services:
- SYNAPSE_CONFIG_PATH=/etc/homeserver.yaml
volumes:
# You may either store all the files in a local folder
- ./matrix-config/homeserver.yaml:/etc/homeserver.yaml
- ./matrix-config:/etc
- ./files:/data
# .. or you may split this between different storage points
# - ./files:/data

View File

@@ -22,81 +22,19 @@ It returns a JSON body like the following:
}
```
# Quarantine media
# Quarantine media in a room
This API 'quarantines' all the media in a room.
The API is:
```
POST /_synapse/admin/v1/quarantine_media/<room_id>
{}
```
Quarantining media means that it is marked as inaccessible by users. It applies
to any local media, and any locally-cached copies of remote media.
The media file itself (and any thumbnails) is not deleted from the server.
## Quarantining media by ID
This API quarantines a single piece of local or remote media.
Request:
```
POST /_synapse/admin/v1/media/quarantine/<server_name>/<media_id>
{}
```
Where `server_name` is in the form of `example.org`, and `media_id` is in the
form of `abcdefg12345...`.
Response:
```
{}
```
## Quarantining media in a room
This API quarantines all local and remote media in a room.
Request:
```
POST /_synapse/admin/v1/room/<room_id>/media/quarantine
{}
```
Where `room_id` is in the form of `!roomid12345:example.org`.
Response:
```
{
"num_quarantined": 10 # The number of media items successfully quarantined
}
```
Note that there is a legacy endpoint, `POST
/_synapse/admin/v1/quarantine_media/<room_id >`, that operates the same.
However, it is deprecated and may be removed in a future release.
## Quarantining all media of a user
This API quarantines all *local* media that a *local* user has uploaded. That is to say, if
you would like to quarantine media uploaded by a user on a remote homeserver, you should
instead use one of the other APIs.
Request:
```
POST /_synapse/admin/v1/user/<user_id>/media/quarantine
{}
```
Where `user_id` is in the form of `@bob:example.org`.
Response:
```
{
"num_quarantined": 10 # The number of media items successfully quarantined
}
```

View File

@@ -1,33 +1,3 @@
Create or modify Account
========================
This API allows an administrator to create or modify a user account with a
specific ``user_id``.
This api is::
PUT /_synapse/admin/v2/users/<user_id>
with a body of:
.. code:: json
{
"password": "user_password",
"displayname": "User",
"avatar_url": "<avatar_url>",
"admin": false,
"deactivated": false
}
including an ``access_token`` of a server admin.
The parameter ``displayname`` is optional and defaults to ``user_id``.
The parameter ``avatar_url`` is optional.
The parameter ``admin`` is optional and defaults to 'false'.
The parameter ``deactivated`` is optional and defaults to 'false'.
If the user already exists then optional parameters default to the current value.
List Accounts
=============
@@ -80,8 +50,7 @@ This API returns information about a specific user account.
The api is::
GET /_synapse/admin/v1/whois/<user_id> (deprecated)
GET /_synapse/admin/v2/users/<user_id>
GET /_synapse/admin/v1/whois/<user_id>
including an ``access_token`` of a server admin.

View File

@@ -1,191 +0,0 @@
# Message retention policies
Synapse admins can enable support for message retention policies on
their homeserver. Message retention policies exist at a room level,
follow the semantics described in
[MSC1763](https://github.com/matrix-org/matrix-doc/blob/matthew/msc1763/proposals/1763-configurable-retention-periods.md),
and allow server and room admins to configure how long messages should
be kept in a homeserver's database before being purged from it.
**Please note that, as this feature isn't part of the Matrix
specification yet, this implementation is to be considered as
experimental.**
A message retention policy is mainly defined by its `max_lifetime`
parameter, which defines how long a message can be kept around after
it was sent to the room. If a room doesn't have a message retention
policy, and there's no default one for a given server, then no message
sent in that room is ever purged on that server.
MSC1763 also specifies semantics for a `min_lifetime` parameter which
defines the amount of time after which an event _can_ get purged (after
it was sent to the room), but Synapse doesn't currently support it
beyond registering it.
Both `max_lifetime` and `min_lifetime` are optional parameters.
Note that message retention policies don't apply to state events.
Once an event reaches its expiry date (defined as the time it was sent
plus the value for `max_lifetime` in the room), two things happen:
* Synapse stops serving the event to clients via any endpoint.
* The message gets picked up by the next purge job (see the "Purge jobs"
section) and is removed from Synapse's database.
Since purge jobs don't run continuously, this means that an event might
stay in a server's database for longer than the value for `max_lifetime`
in the room would allow, though hidden from clients.
Similarly, if a server (with support for message retention policies
enabled) receives from another server an event that should have been
purged according to its room's policy, then the receiving server will
process and store that event until it's picked up by the next purge job,
though it will always hide it from clients.
## Server configuration
Support for this feature can be enabled and configured in the
`retention` section of the Synapse configuration file (see the
[sample file](https://github.com/matrix-org/synapse/blob/v1.7.3/docs/sample_config.yaml#L332-L393)).
To enable support for message retention policies, set the setting
`enabled` in this section to `true`.
### Default policy
A default message retention policy is a policy defined in Synapse's
configuration that is used by Synapse for every room that doesn't have a
message retention policy configured in its state. This allows server
admins to ensure that messages are never kept indefinitely in a server's
database.
A default policy can be defined as such, in the `retention` section of
the configuration file:
```yaml
default_policy:
min_lifetime: 1d
max_lifetime: 1y
```
Here, `min_lifetime` and `max_lifetime` have the same meaning and level
of support as previously described. They can be expressed either as a
duration (using the units `s` (seconds), `m` (minutes), `h` (hours),
`d` (days), `w` (weeks) and `y` (years)) or as a number of milliseconds.
### Purge jobs
Purge jobs are the jobs that Synapse runs in the background to purge
expired events from the database. They are only run if support for
message retention policies is enabled in the server's configuration. If
no configuration for purge jobs is configured by the server admin,
Synapse will use a default configuration, which is described in the
[sample configuration file](https://github.com/matrix-org/synapse/blob/master/docs/sample_config.yaml#L332-L393).
Some server admins might want a finer control on when events are removed
depending on an event's room's policy. This can be done by setting the
`purge_jobs` sub-section in the `retention` section of the configuration
file. An example of such configuration could be:
```yaml
purge_jobs:
- longest_max_lifetime: 3d
interval: 12h
- shortest_max_lifetime: 3d
longest_max_lifetime: 1w
interval: 1d
- shortest_max_lifetime: 1w
interval: 2d
```
In this example, we define three jobs:
* one that runs twice a day (every 12 hours) and purges events in rooms
which policy's `max_lifetime` is lower or equal to 3 days.
* one that runs once a day and purges events in rooms which policy's
`max_lifetime` is between 3 days and a week.
* one that runs once every 2 days and purges events in rooms which
policy's `max_lifetime` is greater than a week.
Note that this example is tailored to show different configurations and
features slightly more jobs than it's probably necessary (in practice, a
server admin would probably consider it better to replace the two last
jobs with one that runs once a day and handles rooms which which
policy's `max_lifetime` is greater than 3 days).
Keep in mind, when configuring these jobs, that a purge job can become
quite heavy on the server if it targets many rooms, therefore prefer
having jobs with a low interval that target a limited set of rooms. Also
make sure to include a job with no minimum and one with no maximum to
make sure your configuration handles every policy.
As previously mentioned in this documentation, while a purge job that
runs e.g. every day means that an expired event might stay in the
database for up to a day after its expiry, Synapse hides expired events
from clients as soon as they expire, so the event is not visible to
local users between its expiry date and the moment it gets purged from
the server's database.
### Lifetime limits
**Note: this feature is mainly useful within a closed federation or on
servers that don't federate, because there currently is no way to
enforce these limits in an open federation.**
Server admins can restrict the values their local users are allowed to
use for both `min_lifetime` and `max_lifetime`. These limits can be
defined as such in the `retention` section of the configuration file:
```yaml
allowed_lifetime_min: 1d
allowed_lifetime_max: 1y
```
Here, `allowed_lifetime_min` is the lowest value a local user can set
for both `min_lifetime` and `max_lifetime`, and `allowed_lifetime_max`
is the highest value. Both parameters are optional (e.g. setting
`allowed_lifetime_min` but not `allowed_lifetime_max` only enforces a
minimum and no maximum).
Like other settings in this section, these parameters can be expressed
either as a duration or as a number of milliseconds.
## Room configuration
To configure a room's message retention policy, a room's admin or
moderator needs to send a state event in that room with the type
`m.room.retention` and the following content:
```json
{
"max_lifetime": ...
}
```
In this event's content, the `max_lifetime` parameter has the same
meaning as previously described, and needs to be expressed in
milliseconds. The event's content can also include a `min_lifetime`
parameter, which has the same meaning and limited support as previously
described.
Note that over every server in the room, only the ones with support for
message retention policies will actually remove expired events. This
support is currently not enabled by default in Synapse.
## Note on reclaiming disk space
While purge jobs actually delete data from the database, the disk space
used by the database might not decrease immediately on the database's
host. However, even though the database engine won't free up the disk
space, it will start writing new data into where the purged data was.
If you want to reclaim the freed disk space anyway and return it to the
operating system, the server admin needs to run `VACUUM FULL;` (or
`VACUUM;` for SQLite databases) on Synapse's database (see the related
[PostgreSQL documentation](https://www.postgresql.org/docs/current/sql-vacuum.html)).

View File

@@ -387,17 +387,17 @@ retention:
#
# The rationale for this per-job configuration is that some rooms might have a
# retention policy with a low 'max_lifetime', where history needs to be purged
# of outdated messages on a more frequent basis than for the rest of the rooms
# (e.g. every 12h), but not want that purge to be performed by a job that's
# iterating over every room it knows, which could be heavy on the server.
# of outdated messages on a very frequent basis (e.g. every 5min), but not want
# that purge to be performed by a job that's iterating over every room it knows,
# which would be quite heavy on the server.
#
#purge_jobs:
# - shortest_max_lifetime: 1d
# longest_max_lifetime: 3d
# interval: 12h
# interval: 5m:
# - shortest_max_lifetime: 3d
# longest_max_lifetime: 1y
# interval: 1d
# interval: 24h
## TLS ##
@@ -874,6 +874,23 @@ media_store_path: "DATADIR/media_store"
# Optional account validity configuration. This allows for accounts to be denied
# any request after a given period.
#
# ``enabled`` defines whether the account validity feature is enabled. Defaults
# to False.
#
# ``period`` allows setting the period after which an account is valid
# after its registration. When renewing the account, its validity period
# will be extended by this amount of time. This parameter is required when using
# the account validity feature.
#
# ``renew_at`` is the amount of time before an account's expiry date at which
# Synapse will send an email to the account's email address with a renewal link.
# This needs the ``email`` and ``public_baseurl`` configuration sections to be
# filled.
#
# ``renew_email_subject`` is the subject of the email sent out with the renewal
# link. ``%(app)s`` can be used as a placeholder for the ``app_name`` parameter
# from the ``email`` section.
#
# Once this feature is enabled, Synapse will look for registered users without an
# expiration date at startup and will add one to every account it found using the
# current settings at that time.
@@ -884,55 +901,21 @@ media_store_path: "DATADIR/media_store"
# date will be randomly selected within a range [now + period - d ; now + period],
# where d is equal to 10% of the validity period.
#
account_validity:
# The account validity feature is disabled by default. Uncomment the
# following line to enable it.
#
#enabled: true
# The period after which an account is valid after its registration. When
# renewing the account, its validity period will be extended by this amount
# of time. This parameter is required when using the account validity
# feature.
#
#period: 6w
# The amount of time before an account's expiry date at which Synapse will
# send an email to the account's email address with a renewal link. By
# default, no such emails are sent.
#
# If you enable this setting, you will also need to fill out the 'email' and
# 'public_baseurl' configuration sections.
#
#renew_at: 1w
# The subject of the email sent out with the renewal link. '%(app)s' can be
# used as a placeholder for the 'app_name' parameter from the 'email'
# section.
#
# Note that the placeholder must be written '%(app)s', including the
# trailing 's'.
#
# If this is not set, a default value is used.
#
#renew_email_subject: "Renew your %(app)s account"
# Directory in which Synapse will try to find templates for the HTML files to
# serve to the user when trying to renew an account. If not set, default
# templates from within the Synapse package will be used.
#
#template_dir: "res/templates"
# File within 'template_dir' giving the HTML to be displayed to the user after
# they successfully renewed their account. If not set, default text is used.
#
#account_renewed_html_path: "account_renewed.html"
# File within 'template_dir' giving the HTML to be displayed when the user
# tries to renew an account with an invalid renewal token. If not set,
# default text is used.
#
#invalid_token_html_path: "invalid_token.html"
#account_validity:
# enabled: true
# period: 6w
# renew_at: 1w
# renew_email_subject: "Renew your %(app)s account"
# # Directory in which Synapse will try to find the HTML files to serve to the
# # user when trying to renew an account. Optional, defaults to
# # synapse/res/templates.
# template_dir: "res/templates"
# # HTML to be displayed to the user after they successfully renewed their
# # account. Optional.
# account_renewed_html_path: "account_renewed.html"
# # HTML to be displayed when the user tries to renew an account with an invalid
# # renewal token. Optional.
# invalid_token_html_path: "invalid_token.html"
# Time that a user's session remains valid for, after they log in.
#
@@ -1370,110 +1353,107 @@ password_config:
#pepper: "EVEN_MORE_SECRET"
# Configuration for sending emails from Synapse.
# Enable sending emails for password resets, notification events or
# account expiry notices
#
email:
# The hostname of the outgoing SMTP server to use. Defaults to 'localhost'.
#
#smtp_host: mail.server
# The port on the mail server for outgoing SMTP. Defaults to 25.
#
#smtp_port: 587
# Username/password for authentication to the SMTP server. By default, no
# authentication is attempted.
#
# smtp_user: "exampleusername"
# smtp_pass: "examplepassword"
# Uncomment the following to require TLS transport security for SMTP.
# By default, Synapse will connect over plain text, and will then switch to
# TLS via STARTTLS *if the SMTP server supports it*. If this option is set,
# Synapse will refuse to connect unless the server supports STARTTLS.
#
#require_transport_security: true
# Enable sending emails for messages that the user has missed
#
#enable_notifs: false
# notif_from defines the "From" address to use when sending emails.
# It must be set if email sending is enabled.
#
# The placeholder '%(app)s' will be replaced by the application name,
# which is normally 'app_name' (below), but may be overridden by the
# Matrix client application.
#
# Note that the placeholder must be written '%(app)s', including the
# trailing 's'.
#
#notif_from: "Your Friendly %(app)s homeserver <noreply@example.com>"
# app_name defines the default value for '%(app)s' in notif_from. It
# defaults to 'Matrix'.
#
#app_name: my_branded_matrix_server
# Uncomment the following to disable automatic subscription to email
# notifications for new users. Enabled by default.
#
#notif_for_new_users: false
# Custom URL for client links within the email notifications. By default
# links will be based on "https://matrix.to".
#
# (This setting used to be called riot_base_url; the old name is still
# supported for backwards-compatibility but is now deprecated.)
#
#client_base_url: "http://localhost/riot"
# Configure the time that a validation email will expire after sending.
# Defaults to 1h.
#
#validation_token_lifetime: 15m
# Directory in which Synapse will try to find the template files below.
# If not set, default templates from within the Synapse package will be used.
#
# DO NOT UNCOMMENT THIS SETTING unless you want to customise the templates.
# If you *do* uncomment it, you will need to make sure that all the templates
# below are in the directory.
#
# Synapse will look for the following templates in this directory:
#
# * The contents of email notifications of missed events: 'notif_mail.html' and
# 'notif_mail.txt'.
#
# * The contents of account expiry notice emails: 'notice_expiry.html' and
# 'notice_expiry.txt'.
#
# * The contents of password reset emails sent by the homeserver:
# 'password_reset.html' and 'password_reset.txt'
#
# * HTML pages for success and failure that a user will see when they follow
# the link in the password reset email: 'password_reset_success.html' and
# 'password_reset_failure.html'
#
# * The contents of address verification emails sent during registration:
# 'registration.html' and 'registration.txt'
#
# * HTML pages for success and failure that a user will see when they follow
# the link in an address verification email sent during registration:
# 'registration_success.html' and 'registration_failure.html'
#
# * The contents of address verification emails sent when an address is added
# to a Matrix account: 'add_threepid.html' and 'add_threepid.txt'
#
# * HTML pages for success and failure that a user will see when they follow
# the link in an address verification email sent when an address is added
# to a Matrix account: 'add_threepid_success.html' and
# 'add_threepid_failure.html'
#
# You can see the default templates at:
# https://github.com/matrix-org/synapse/tree/master/synapse/res/templates
#
#template_dir: "res/templates"
# If your SMTP server requires authentication, the optional smtp_user &
# smtp_pass variables should be used
#
#email:
# enable_notifs: false
# smtp_host: "localhost"
# smtp_port: 25 # SSL: 465, STARTTLS: 587
# smtp_user: "exampleusername"
# smtp_pass: "examplepassword"
# require_transport_security: false
#
# # notif_from defines the "From" address to use when sending emails.
# # It must be set if email sending is enabled.
# #
# # The placeholder '%(app)s' will be replaced by the application name,
# # which is normally 'app_name' (below), but may be overridden by the
# # Matrix client application.
# #
# # Note that the placeholder must be written '%(app)s', including the
# # trailing 's'.
# #
# notif_from: "Your Friendly %(app)s homeserver <noreply@example.com>"
#
# # app_name defines the default value for '%(app)s' in notif_from. It
# # defaults to 'Matrix'.
# #
# #app_name: my_branded_matrix_server
#
# # Enable email notifications by default
# #
# notif_for_new_users: true
#
# # Defining a custom URL for Riot is only needed if email notifications
# # should contain links to a self-hosted installation of Riot; when set
# # the "app_name" setting is ignored
# #
# riot_base_url: "http://localhost/riot"
#
# # Configure the time that a validation email or text message code
# # will expire after sending
# #
# # This is currently used for password resets
# #
# #validation_token_lifetime: 1h
#
# # Template directory. All template files should be stored within this
# # directory. If not set, default templates from within the Synapse
# # package will be used
# #
# # For the list of default templates, please see
# # https://github.com/matrix-org/synapse/tree/master/synapse/res/templates
# #
# #template_dir: res/templates
#
# # Templates for email notifications
# #
# notif_template_html: notif_mail.html
# notif_template_text: notif_mail.txt
#
# # Templates for account expiry notices
# #
# expiry_template_html: notice_expiry.html
# expiry_template_text: notice_expiry.txt
#
# # Templates for password reset emails sent by the homeserver
# #
# #password_reset_template_html: password_reset.html
# #password_reset_template_text: password_reset.txt
#
# # Templates for registration emails sent by the homeserver
# #
# #registration_template_html: registration.html
# #registration_template_text: registration.txt
#
# # Templates for validation emails sent by the homeserver when adding an email to
# # your user account
# #
# #add_threepid_template_html: add_threepid.html
# #add_threepid_template_text: add_threepid.txt
#
# # Templates for password reset success and failure pages that a user
# # will see after attempting to reset their password
# #
# #password_reset_template_success_html: password_reset_success.html
# #password_reset_template_failure_html: password_reset_failure.html
#
# # Templates for registration success and failure pages that a user
# # will see after attempting to register using an email or phone
# #
# #registration_template_success_html: registration_success.html
# #registration_template_failure_html: registration_failure.html
#
# # Templates for success and failure pages that a user will see after attempting
# # to add an email or phone to their account
# #
# #add_threepid_success_html: add_threepid_success.html
# #add_threepid_failure_html: add_threepid_failure.html
#password_providers:

View File

@@ -209,7 +209,7 @@ Where `<token>` may be either:
* a numeric stream_id to stream updates since (exclusive)
* `NOW` to stream all subsequent updates.
The `<stream_name>` is the name of a replication stream to subscribe
The `<stream_name>` is the name of a replication stream to subscribe
to (see [here](../synapse/replication/tcp/streams/_base.py) for a list
of streams). It can also be `ALL` to subscribe to all known streams,
in which case the `<token>` must be set to `NOW`.
@@ -234,10 +234,6 @@ in which case the `<token>` must be set to `NOW`.
Used exclusively in tests
### REMOTE_SERVER_UP (S, C)
Inform other processes that a remote server may have come back online.
See `synapse/replication/tcp/commands.py` for a detailed description and
the format of each command.

View File

@@ -168,11 +168,8 @@ endpoints matching the following regular expressions:
^/_matrix/federation/v1/make_join/
^/_matrix/federation/v1/make_leave/
^/_matrix/federation/v1/send_join/
^/_matrix/federation/v2/send_join/
^/_matrix/federation/v1/send_leave/
^/_matrix/federation/v2/send_leave/
^/_matrix/federation/v1/invite/
^/_matrix/federation/v2/invite/
^/_matrix/federation/v1/query_auth/
^/_matrix/federation/v1/event_auth/
^/_matrix/federation/v1/exchange_third_party_invite/
@@ -202,9 +199,7 @@ Handles the media repository. It can handle all endpoints starting with:
... and the following regular expressions matching media-specific administration APIs:
^/_synapse/admin/v1/purge_media_cache$
^/_synapse/admin/v1/room/.*/media.*$
^/_synapse/admin/v1/user/.*/media.*$
^/_synapse/admin/v1/media/.*$
^/_synapse/admin/v1/room/.*/media$
^/_synapse/admin/v1/quarantine_media/.*$
You should also set `enable_media_repo: False` in the shared configuration
@@ -293,7 +288,6 @@ file. For example:
Handles some event creation. It can handle REST endpoints matching:
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/send
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/state/
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$
^/_matrix/client/(api/v1|r0|unstable)/join/
^/_matrix/client/(api/v1|r0|unstable)/profile/

View File

@@ -447,15 +447,20 @@ class Porter(object):
else:
return
def build_db_store(
self, db_config: DatabaseConnectionConfig, allow_outdated_version: bool = False,
):
def setup_db(self, db_config: DatabaseConnectionConfig, engine):
db_conn = make_conn(db_config, engine)
prepare_database(db_conn, engine, config=None)
db_conn.commit()
return db_conn
@defer.inlineCallbacks
def build_db_store(self, db_config: DatabaseConnectionConfig):
"""Builds and returns a database store using the provided configuration.
Args:
db_config: The database configuration
allow_outdated_version: True to suppress errors about the database server
version being too old to run a complete synapse
config: The database configuration
Returns:
The built Store object.
@@ -463,16 +468,16 @@ class Porter(object):
self.progress.set_state("Preparing %s" % db_config.config["name"])
engine = create_engine(db_config.config)
conn = self.setup_db(db_config, engine)
hs = MockHomeserver(self.hs_config)
with make_conn(db_config, engine) as db_conn:
engine.check_database(
db_conn, allow_outdated_version=allow_outdated_version
)
prepare_database(db_conn, engine, config=self.hs_config)
store = Store(Database(hs, db_config, engine), db_conn, hs)
db_conn.commit()
store = Store(Database(hs, db_config, engine), conn, hs)
yield store.db.runInteraction(
"%s_engine.check_database" % db_config.config["name"],
engine.check_database,
)
return store
@@ -497,10 +502,8 @@ class Porter(object):
@defer.inlineCallbacks
def run(self):
try:
# we allow people to port away from outdated versions of sqlite.
self.sqlite_store = self.build_db_store(
DatabaseConnectionConfig("master-sqlite", self.sqlite_config),
allow_outdated_version=True,
self.sqlite_store = yield self.build_db_store(
DatabaseConnectionConfig("master-sqlite", self.sqlite_config)
)
# Check if all background updates are done, abort if not.
@@ -515,7 +518,7 @@ class Porter(object):
)
defer.returnValue(None)
self.postgres_store = self.build_db_store(
self.postgres_store = yield self.build_db_store(
self.hs_config.get_single_database()
)

View File

@@ -36,7 +36,7 @@ try:
except ImportError:
pass
__version__ = "1.9.0.dev2"
__version__ = "1.8.0"
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when

View File

@@ -14,6 +14,7 @@
# limitations under the License.
import logging
from typing import Dict, Tuple
from six import itervalues
@@ -34,7 +35,7 @@ from synapse.api.errors import (
ResourceLimitError,
)
from synapse.config.server import is_threepid_reserved
from synapse.types import StateMap, UserID
from synapse.types import UserID
from synapse.util.caches import CACHE_SIZE_FACTOR, register_cache
from synapse.util.caches.lrucache import LruCache
from synapse.util.metrics import Measure
@@ -508,7 +509,10 @@ class Auth(object):
return self.store.is_server_admin(user)
def compute_auth_events(
self, event, current_state_ids: StateMap[str], for_verification: bool = False,
self,
event,
current_state_ids: Dict[Tuple[str, str], str],
for_verification: bool = False,
):
"""Given an event and current state return the list of event IDs used
to auth an event.

View File

@@ -17,15 +17,13 @@
"""Contains exceptions and error codes."""
import logging
from typing import Dict, List
from typing import Dict
from six import iteritems
from six.moves import http_client
from canonicaljson import json
from twisted.web import http
logger = logging.getLogger(__name__)
@@ -82,29 +80,6 @@ class CodeMessageException(RuntimeError):
self.msg = msg
class RedirectException(CodeMessageException):
"""A pseudo-error indicating that we want to redirect the client to a different
location
Attributes:
cookies: a list of set-cookies values to add to the response. For example:
b"sessionId=a3fWa; Expires=Wed, 21 Oct 2015 07:28:00 GMT"
"""
def __init__(self, location: bytes, http_code: int = http.FOUND):
"""
Args:
location: the URI to redirect to
http_code: the HTTP response code
"""
msg = "Redirect to %s" % (location.decode("utf-8"),)
super().__init__(code=http_code, msg=msg)
self.location = location
self.cookies = [] # type: List[bytes]
class SynapseError(CodeMessageException):
"""A base exception type for matrix errors which have an errcode and error
message (as well as an HTTP status code).
@@ -183,6 +158,12 @@ class UserDeactivatedError(SynapseError):
)
class RegistrationError(SynapseError):
"""An error raised when a registration event fails."""
pass
class FederationDeniedError(SynapseError):
"""An error raised when the server tries to federate with a server which
is not on its federation whitelist.

View File

@@ -84,7 +84,8 @@ class AdminCmdServer(HomeServer):
class AdminCmdReplicationHandler(ReplicationClientHandler):
async def on_rdata(self, stream_name, token, rows):
@defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
pass
def get_streams_to_replicate(self):

View File

@@ -115,8 +115,9 @@ class ASReplicationHandler(ReplicationClientHandler):
super(ASReplicationHandler, self).__init__(hs.get_datastore())
self.appservice_handler = hs.get_application_service_handler()
async def on_rdata(self, stream_name, token, rows):
await super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
@defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
yield super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
if stream_name == "events":
max_stream_id = self.store.get_room_max_stream_ordering()

View File

@@ -145,8 +145,9 @@ class FederationSenderReplicationHandler(ReplicationClientHandler):
super(FederationSenderReplicationHandler, self).__init__(hs.get_datastore())
self.send_handler = FederationSenderHandler(hs, self)
async def on_rdata(self, stream_name, token, rows):
await super(FederationSenderReplicationHandler, self).on_rdata(
@defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
yield super(FederationSenderReplicationHandler, self).on_rdata(
stream_name, token, rows
)
self.send_handler.process_replication_rows(stream_name, token, rows)
@@ -158,13 +159,6 @@ class FederationSenderReplicationHandler(ReplicationClientHandler):
args.update(self.send_handler.stream_positions())
return args
def on_remote_server_up(self, server: str):
"""Called when get a new REMOTE_SERVER_UP command."""
# Let's wake up the transaction queue for the server in case we have
# pending stuff to send to it.
self.send_handler.wake_destination(server)
def start(config_options):
try:
@@ -212,7 +206,7 @@ class FederationSenderHandler(object):
to the federation sender.
"""
def __init__(self, hs: FederationSenderServer, replication_client):
def __init__(self, hs, replication_client):
self.store = hs.get_datastore()
self._is_mine_id = hs.is_mine_id
self.federation_sender = hs.get_federation_sender()
@@ -233,9 +227,6 @@ class FederationSenderHandler(object):
self.store.get_room_max_stream_ordering()
)
def wake_destination(self, server: str):
self.federation_sender.wake_destination(server)
def stream_positions(self):
return {"federation": self.federation_position}

View File

@@ -31,7 +31,7 @@ from prometheus_client import Gauge
from twisted.application import service
from twisted.internet import defer, reactor
from twisted.python.failure import Failure
from twisted.web.resource import EncodingResourceWrapper, IResource, NoResource
from twisted.web.resource import EncodingResourceWrapper, NoResource
from twisted.web.server import GzipEncoderFactory
from twisted.web.static import File
@@ -109,16 +109,7 @@ class SynapseHomeServer(HomeServer):
for path, resmodule in additional_resources.items():
handler_cls, config = load_module(resmodule)
handler = handler_cls(config, module_api)
if IResource.providedBy(handler):
resource = handler
elif hasattr(handler, "handle_request"):
resource = AdditionalResource(self, handler.handle_request)
else:
raise ConfigError(
"additional_resource %s does not implement a known interface"
% (resmodule["module"],)
)
resources[path] = resource
resources[path] = AdditionalResource(self, handler.handle_request)
# try to find something useful to redirect '/' to
if WEB_CLIENT_PREFIX in resources:

View File

@@ -34,7 +34,6 @@ from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.admin import register_servlets_for_media_repo
@@ -48,7 +47,6 @@ logger = logging.getLogger("synapse.app.media_repository")
class MediaRepositorySlavedStore(
RoomStore,
SlavedApplicationServiceStore,
SlavedRegistrationStore,
SlavedClientIpStore,

View File

@@ -141,8 +141,9 @@ class PusherReplicationHandler(ReplicationClientHandler):
self.pusher_pool = hs.get_pusherpool()
async def on_rdata(self, stream_name, token, rows):
await super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
@defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
yield super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
run_in_background(self.poke_pushers, stream_name, token, rows)
@defer.inlineCallbacks

View File

@@ -358,8 +358,9 @@ class SyncReplicationHandler(ReplicationClientHandler):
self.presence_handler = hs.get_presence_handler()
self.notifier = hs.get_notifier()
async def on_rdata(self, stream_name, token, rows):
await super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
@defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
yield super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
run_in_background(self.process_and_notify, stream_name, token, rows)
def get_streams_to_replicate(self):

View File

@@ -172,8 +172,9 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler):
super(UserDirectoryReplicationHandler, self).__init__(hs.get_datastore())
self.user_directory = hs.get_user_directory_handler()
async def on_rdata(self, stream_name, token, rows):
await super(UserDirectoryReplicationHandler, self).on_rdata(
@defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
yield super(UserDirectoryReplicationHandler, self).on_rdata(
stream_name, token, rows
)
if stream_name == EventsStream.NAME:

View File

@@ -37,12 +37,10 @@ class EmailConfig(Config):
self.email_enable_notifs = False
email_config = config.get("email")
if email_config is None:
email_config = {}
email_config = config.get("email", {})
self.email_smtp_host = email_config.get("smtp_host", "localhost")
self.email_smtp_port = email_config.get("smtp_port", 25)
self.email_smtp_host = email_config.get("smtp_host", None)
self.email_smtp_port = email_config.get("smtp_port", None)
self.email_smtp_user = email_config.get("smtp_user", None)
self.email_smtp_pass = email_config.get("smtp_pass", None)
self.require_transport_security = email_config.get(
@@ -76,9 +74,9 @@ class EmailConfig(Config):
self.email_template_dir = os.path.abspath(template_dir)
self.email_enable_notifs = email_config.get("enable_notifs", False)
account_validity_config = config.get("account_validity") or {}
account_validity_renewal_enabled = account_validity_config.get("renew_at")
account_validity_renewal_enabled = config.get("account_validity", {}).get(
"renew_at"
)
self.threepid_behaviour_email = (
# Have Synapse handle the email sending if account_threepid_delegates.email
@@ -280,9 +278,7 @@ class EmailConfig(Config):
self.email_notif_for_new_users = email_config.get(
"notif_for_new_users", True
)
self.email_riot_base_url = email_config.get(
"client_base_url", email_config.get("riot_base_url", None)
)
self.email_riot_base_url = email_config.get("riot_base_url", None)
if account_validity_renewal_enabled:
self.email_expiry_template_html = email_config.get(
@@ -298,111 +294,107 @@ class EmailConfig(Config):
raise ConfigError("Unable to find email template file %s" % (p,))
def generate_config_section(self, config_dir_path, server_name, **kwargs):
return """\
# Configuration for sending emails from Synapse.
return """
# Enable sending emails for password resets, notification events or
# account expiry notices
#
email:
# The hostname of the outgoing SMTP server to use. Defaults to 'localhost'.
#
#smtp_host: mail.server
# The port on the mail server for outgoing SMTP. Defaults to 25.
#
#smtp_port: 587
# Username/password for authentication to the SMTP server. By default, no
# authentication is attempted.
#
# smtp_user: "exampleusername"
# smtp_pass: "examplepassword"
# Uncomment the following to require TLS transport security for SMTP.
# By default, Synapse will connect over plain text, and will then switch to
# TLS via STARTTLS *if the SMTP server supports it*. If this option is set,
# Synapse will refuse to connect unless the server supports STARTTLS.
#
#require_transport_security: true
# Enable sending emails for messages that the user has missed
#
#enable_notifs: false
# notif_from defines the "From" address to use when sending emails.
# It must be set if email sending is enabled.
#
# The placeholder '%(app)s' will be replaced by the application name,
# which is normally 'app_name' (below), but may be overridden by the
# Matrix client application.
#
# Note that the placeholder must be written '%(app)s', including the
# trailing 's'.
#
#notif_from: "Your Friendly %(app)s homeserver <noreply@example.com>"
# app_name defines the default value for '%(app)s' in notif_from. It
# defaults to 'Matrix'.
#
#app_name: my_branded_matrix_server
# Uncomment the following to disable automatic subscription to email
# notifications for new users. Enabled by default.
#
#notif_for_new_users: false
# Custom URL for client links within the email notifications. By default
# links will be based on "https://matrix.to".
#
# (This setting used to be called riot_base_url; the old name is still
# supported for backwards-compatibility but is now deprecated.)
#
#client_base_url: "http://localhost/riot"
# Configure the time that a validation email will expire after sending.
# Defaults to 1h.
#
#validation_token_lifetime: 15m
# Directory in which Synapse will try to find the template files below.
# If not set, default templates from within the Synapse package will be used.
#
# DO NOT UNCOMMENT THIS SETTING unless you want to customise the templates.
# If you *do* uncomment it, you will need to make sure that all the templates
# below are in the directory.
#
# Synapse will look for the following templates in this directory:
#
# * The contents of email notifications of missed events: 'notif_mail.html' and
# 'notif_mail.txt'.
#
# * The contents of account expiry notice emails: 'notice_expiry.html' and
# 'notice_expiry.txt'.
#
# * The contents of password reset emails sent by the homeserver:
# 'password_reset.html' and 'password_reset.txt'
#
# * HTML pages for success and failure that a user will see when they follow
# the link in the password reset email: 'password_reset_success.html' and
# 'password_reset_failure.html'
#
# * The contents of address verification emails sent during registration:
# 'registration.html' and 'registration.txt'
#
# * HTML pages for success and failure that a user will see when they follow
# the link in an address verification email sent during registration:
# 'registration_success.html' and 'registration_failure.html'
#
# * The contents of address verification emails sent when an address is added
# to a Matrix account: 'add_threepid.html' and 'add_threepid.txt'
#
# * HTML pages for success and failure that a user will see when they follow
# the link in an address verification email sent when an address is added
# to a Matrix account: 'add_threepid_success.html' and
# 'add_threepid_failure.html'
#
# You can see the default templates at:
# https://github.com/matrix-org/synapse/tree/master/synapse/res/templates
#
#template_dir: "res/templates"
# If your SMTP server requires authentication, the optional smtp_user &
# smtp_pass variables should be used
#
#email:
# enable_notifs: false
# smtp_host: "localhost"
# smtp_port: 25 # SSL: 465, STARTTLS: 587
# smtp_user: "exampleusername"
# smtp_pass: "examplepassword"
# require_transport_security: false
#
# # notif_from defines the "From" address to use when sending emails.
# # It must be set if email sending is enabled.
# #
# # The placeholder '%(app)s' will be replaced by the application name,
# # which is normally 'app_name' (below), but may be overridden by the
# # Matrix client application.
# #
# # Note that the placeholder must be written '%(app)s', including the
# # trailing 's'.
# #
# notif_from: "Your Friendly %(app)s homeserver <noreply@example.com>"
#
# # app_name defines the default value for '%(app)s' in notif_from. It
# # defaults to 'Matrix'.
# #
# #app_name: my_branded_matrix_server
#
# # Enable email notifications by default
# #
# notif_for_new_users: true
#
# # Defining a custom URL for Riot is only needed if email notifications
# # should contain links to a self-hosted installation of Riot; when set
# # the "app_name" setting is ignored
# #
# riot_base_url: "http://localhost/riot"
#
# # Configure the time that a validation email or text message code
# # will expire after sending
# #
# # This is currently used for password resets
# #
# #validation_token_lifetime: 1h
#
# # Template directory. All template files should be stored within this
# # directory. If not set, default templates from within the Synapse
# # package will be used
# #
# # For the list of default templates, please see
# # https://github.com/matrix-org/synapse/tree/master/synapse/res/templates
# #
# #template_dir: res/templates
#
# # Templates for email notifications
# #
# notif_template_html: notif_mail.html
# notif_template_text: notif_mail.txt
#
# # Templates for account expiry notices
# #
# expiry_template_html: notice_expiry.html
# expiry_template_text: notice_expiry.txt
#
# # Templates for password reset emails sent by the homeserver
# #
# #password_reset_template_html: password_reset.html
# #password_reset_template_text: password_reset.txt
#
# # Templates for registration emails sent by the homeserver
# #
# #registration_template_html: registration.html
# #registration_template_text: registration.txt
#
# # Templates for validation emails sent by the homeserver when adding an email to
# # your user account
# #
# #add_threepid_template_html: add_threepid.html
# #add_threepid_template_text: add_threepid.txt
#
# # Templates for password reset success and failure pages that a user
# # will see after attempting to reset their password
# #
# #password_reset_template_success_html: password_reset_success.html
# #password_reset_template_failure_html: password_reset_failure.html
#
# # Templates for registration success and failure pages that a user
# # will see after attempting to register using an email or phone
# #
# #registration_template_success_html: registration_success.html
# #registration_template_failure_html: registration_failure.html
#
# # Templates for success and failure pages that a user will see after attempting
# # to add an email or phone to their account
# #
# #add_threepid_success_html: add_threepid_success.html
# #add_threepid_failure_html: add_threepid_failure.html
"""

View File

@@ -35,7 +35,7 @@ class PushConfig(Config):
# Now check for the one in the 'email' section and honour it,
# with a warning.
push_config = config.get("email") or {}
push_config = config.get("email", {})
redact_content = push_config.get("redact_content")
if redact_content is not None:
print(

View File

@@ -27,8 +27,6 @@ class AccountValidityConfig(Config):
section = "accountvalidity"
def __init__(self, config, synapse_config):
if config is None:
return
self.enabled = config.get("enabled", False)
self.renew_by_email_enabled = "renew_at" in config
@@ -161,6 +159,23 @@ class RegistrationConfig(Config):
# Optional account validity configuration. This allows for accounts to be denied
# any request after a given period.
#
# ``enabled`` defines whether the account validity feature is enabled. Defaults
# to False.
#
# ``period`` allows setting the period after which an account is valid
# after its registration. When renewing the account, its validity period
# will be extended by this amount of time. This parameter is required when using
# the account validity feature.
#
# ``renew_at`` is the amount of time before an account's expiry date at which
# Synapse will send an email to the account's email address with a renewal link.
# This needs the ``email`` and ``public_baseurl`` configuration sections to be
# filled.
#
# ``renew_email_subject`` is the subject of the email sent out with the renewal
# link. ``%%(app)s`` can be used as a placeholder for the ``app_name`` parameter
# from the ``email`` section.
#
# Once this feature is enabled, Synapse will look for registered users without an
# expiration date at startup and will add one to every account it found using the
# current settings at that time.
@@ -171,55 +186,21 @@ class RegistrationConfig(Config):
# date will be randomly selected within a range [now + period - d ; now + period],
# where d is equal to 10%% of the validity period.
#
account_validity:
# The account validity feature is disabled by default. Uncomment the
# following line to enable it.
#
#enabled: true
# The period after which an account is valid after its registration. When
# renewing the account, its validity period will be extended by this amount
# of time. This parameter is required when using the account validity
# feature.
#
#period: 6w
# The amount of time before an account's expiry date at which Synapse will
# send an email to the account's email address with a renewal link. By
# default, no such emails are sent.
#
# If you enable this setting, you will also need to fill out the 'email' and
# 'public_baseurl' configuration sections.
#
#renew_at: 1w
# The subject of the email sent out with the renewal link. '%%(app)s' can be
# used as a placeholder for the 'app_name' parameter from the 'email'
# section.
#
# Note that the placeholder must be written '%%(app)s', including the
# trailing 's'.
#
# If this is not set, a default value is used.
#
#renew_email_subject: "Renew your %%(app)s account"
# Directory in which Synapse will try to find templates for the HTML files to
# serve to the user when trying to renew an account. If not set, default
# templates from within the Synapse package will be used.
#
#template_dir: "res/templates"
# File within 'template_dir' giving the HTML to be displayed to the user after
# they successfully renewed their account. If not set, default text is used.
#
#account_renewed_html_path: "account_renewed.html"
# File within 'template_dir' giving the HTML to be displayed when the user
# tries to renew an account with an invalid renewal token. If not set,
# default text is used.
#
#invalid_token_html_path: "invalid_token.html"
#account_validity:
# enabled: true
# period: 6w
# renew_at: 1w
# renew_email_subject: "Renew your %%(app)s account"
# # Directory in which Synapse will try to find the HTML files to serve to the
# # user when trying to renew an account. Optional, defaults to
# # synapse/res/templates.
# template_dir: "res/templates"
# # HTML to be displayed to the user after they successfully renewed their
# # account. Optional.
# account_renewed_html_path: "account_renewed.html"
# # HTML to be displayed when the user tries to renew an account with an invalid
# # renewal token. Optional.
# invalid_token_html_path: "invalid_token.html"
# Time that a user's session remains valid for, after they log in.
#

View File

@@ -121,7 +121,6 @@ class SAML2Config(Config):
required_methods = [
"get_saml_attributes",
"saml_response_to_user_attributes",
"get_remote_user_id",
]
missing_methods = [
method

View File

@@ -948,17 +948,17 @@ class ServerConfig(Config):
#
# The rationale for this per-job configuration is that some rooms might have a
# retention policy with a low 'max_lifetime', where history needs to be purged
# of outdated messages on a more frequent basis than for the rest of the rooms
# (e.g. every 12h), but not want that purge to be performed by a job that's
# iterating over every room it knows, which could be heavy on the server.
# of outdated messages on a very frequent basis (e.g. every 5min), but not want
# that purge to be performed by a job that's iterating over every room it knows,
# which would be quite heavy on the server.
#
#purge_jobs:
# - shortest_max_lifetime: 1d
# longest_max_lifetime: 3d
# interval: 12h
# interval: 5m:
# - shortest_max_lifetime: 3d
# longest_max_lifetime: 1y
# interval: 1d
# interval: 24h
"""
% locals()
)

View File

@@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional, Union
from typing import Dict, Optional, Tuple, Union
from six import iteritems
@@ -23,7 +23,6 @@ from twisted.internet import defer
from synapse.appservice import ApplicationService
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.types import StateMap
@attr.s(slots=True)
@@ -107,11 +106,13 @@ class EventContext:
_state_group = attr.ib(default=None, type=Optional[int])
state_group_before_event = attr.ib(default=None, type=Optional[int])
prev_group = attr.ib(default=None, type=Optional[int])
delta_ids = attr.ib(default=None, type=Optional[StateMap[str]])
delta_ids = attr.ib(default=None, type=Optional[Dict[Tuple[str, str], str]])
app_service = attr.ib(default=None, type=Optional[ApplicationService])
_current_state_ids = attr.ib(default=None, type=Optional[StateMap[str]])
_prev_state_ids = attr.ib(default=None, type=Optional[StateMap[str]])
_current_state_ids = attr.ib(
default=None, type=Optional[Dict[Tuple[str, str], str]]
)
_prev_state_ids = attr.ib(default=None, type=Optional[Dict[Tuple[str, str], str]])
@staticmethod
def with_state(

View File

@@ -15,7 +15,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Dict
import six
from six import iteritems
@@ -23,7 +22,6 @@ from six import iteritems
from canonicaljson import json
from prometheus_client import Counter
from twisted.internet import defer
from twisted.internet.abstract import isIPAddress
from twisted.python import failure
@@ -43,11 +41,7 @@ from synapse.federation.federation_base import FederationBase, event_from_pdu_js
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
from synapse.http.endpoint import parse_server_name
from synapse.logging.context import (
make_deferred_yieldable,
nested_logging_context,
run_in_background,
)
from synapse.logging.context import nested_logging_context
from synapse.logging.opentracing import log_kv, start_active_span_from_edu, trace
from synapse.logging.utils import log_function
from synapse.replication.http.federation import (
@@ -55,7 +49,7 @@ from synapse.replication.http.federation import (
ReplicationGetQueryRestServlet,
)
from synapse.types import get_domain_from_id
from synapse.util import glob_to_regex, unwrapFirstError
from synapse.util import glob_to_regex
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
@@ -166,43 +160,6 @@ class FederationServer(FederationBase):
)
return 400, response
# We process PDUs and EDUs in parallel. This is important as we don't
# want to block things like to device messages from reaching clients
# behind the potentially expensive handling of PDUs.
pdu_results, _ = await make_deferred_yieldable(
defer.gatherResults(
[
run_in_background(
self._handle_pdus_in_txn, origin, transaction, request_time
),
run_in_background(self._handle_edus_in_txn, origin, transaction),
],
consumeErrors=True,
).addErrback(unwrapFirstError)
)
response = {"pdus": pdu_results}
logger.debug("Returning: %s", str(response))
await self.transaction_actions.set_response(origin, transaction, 200, response)
return 200, response
async def _handle_pdus_in_txn(
self, origin: str, transaction: Transaction, request_time: int
) -> Dict[str, dict]:
"""Process the PDUs in a received transaction.
Args:
origin: the server making the request
transaction: incoming transaction
request_time: timestamp that the HTTP request arrived at
Returns:
A map from event ID of a processed PDU to any errors we should
report back to the sending server.
"""
received_pdus_counter.inc(len(transaction.pdus))
origin_host, _ = parse_server_name(origin)
@@ -293,23 +250,20 @@ class FederationServer(FederationBase):
process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT
)
return pdu_results
if hasattr(transaction, "edus"):
for edu in (Edu(**x) for x in transaction.edus):
await self.received_edu(origin, edu.edu_type, edu.content)
async def _handle_edus_in_txn(self, origin: str, transaction: Transaction):
"""Process the EDUs in a received transaction.
"""
response = {"pdus": pdu_results}
async def _process_edu(edu_dict):
received_edus_counter.inc()
logger.debug("Returning: %s", str(response))
edu = Edu(**edu_dict)
await self.registry.on_edu(edu.edu_type, origin, edu.content)
await self.transaction_actions.set_response(origin, transaction, 200, response)
return 200, response
await concurrently_execute(
_process_edu,
getattr(transaction, "edus", []),
TRANSACTION_CONCURRENCY_LIMIT,
)
async def received_edu(self, origin, edu_type, content):
received_edus_counter.inc()
await self.registry.on_edu(edu_type, origin, content)
async def on_context_state_request(self, origin, room_id, event_id):
origin_host, _ = parse_server_name(origin)

View File

@@ -259,9 +259,7 @@ class FederationRemoteSendQueue(object):
def federation_ack(self, token):
self._clear_queue_before_pos(token)
async def get_replication_rows(
self, from_token, to_token, limit, federation_ack=None
):
def get_replication_rows(self, from_token, to_token, limit, federation_ack=None):
"""Get rows to be sent over federation between the two tokens
Args:

View File

@@ -21,7 +21,6 @@ from prometheus_client import Counter
from twisted.internet import defer
import synapse
import synapse.metrics
from synapse.federation.sender.per_destination_queue import PerDestinationQueue
from synapse.federation.sender.transaction_manager import TransactionManager
@@ -55,7 +54,7 @@ sent_pdus_destination_dist_total = Counter(
class FederationSender(object):
def __init__(self, hs: "synapse.server.HomeServer"):
def __init__(self, hs):
self.hs = hs
self.server_name = hs.hostname
@@ -483,20 +482,7 @@ class FederationSender(object):
def send_device_messages(self, destination):
if destination == self.server_name:
logger.warning("Not sending device update to ourselves")
return
self._get_per_destination_queue(destination).attempt_new_transaction()
def wake_destination(self, destination: str):
"""Called when we want to retry sending transactions to a remote.
This is mainly useful if the remote server has been down and we think it
might have come back.
"""
if destination == self.server_name:
logger.warning("Not waking up ourselves")
logger.info("Not sending device update to ourselves")
return
self._get_per_destination_queue(destination).attempt_new_transaction()

View File

@@ -31,7 +31,6 @@ from synapse.handlers.presence import format_user_presence_state
from synapse.metrics import sent_transactions_counter
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.presence import UserPresenceState
from synapse.types import StateMap
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
# This is defined in the Matrix spec and enforced by the receiver.
@@ -78,7 +77,7 @@ class PerDestinationQueue(object):
# Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered
# based on their key (e.g. typing events by room_id)
# Map of (edu_type, key) -> Edu
self._pending_edus_keyed = {} # type: StateMap[Edu]
self._pending_edus_keyed = {} # type: dict[tuple[str, str], Edu]
# Map of user_id -> UserPresenceState of pending presence to be sent to this
# destination

View File

@@ -44,7 +44,6 @@ from synapse.logging.opentracing import (
tags,
whitelisted_homeserver,
)
from synapse.server import HomeServer
from synapse.types import ThirdPartyInstanceID, get_domain_from_id
from synapse.util.ratelimitutils import FederationRateLimiter
from synapse.util.versionstring import get_version_string
@@ -102,17 +101,12 @@ class NoAuthenticationError(AuthenticationError):
class Authenticator(object):
def __init__(self, hs: HomeServer):
def __init__(self, hs):
self._clock = hs.get_clock()
self.keyring = hs.get_keyring()
self.server_name = hs.hostname
self.store = hs.get_datastore()
self.federation_domain_whitelist = hs.config.federation_domain_whitelist
self.notifer = hs.get_notifier()
self.replication_client = None
if hs.config.worker.worker_app:
self.replication_client = hs.get_tcp_replication()
# A method just so we can pass 'self' as the authenticator to the Servlets
async def authenticate_request(self, request, content):
@@ -172,17 +166,6 @@ class Authenticator(object):
try:
logger.info("Marking origin %r as up", origin)
await self.store.set_destination_retry_timings(origin, None, 0, 0)
# Inform the relevant places that the remote server is back up.
self.notifer.notify_remote_server_up(origin)
if self.replication_client:
# If we're on a worker we try and inform master about this. The
# replication client doesn't hook into the notifier to avoid
# infinite loops where we send a `REMOTE_SERVER_UP` command to
# master, which then echoes it back to us which in turn pokes
# the notifier.
self.replication_client.send_remote_server_up(origin)
except Exception:
logger.exception("Error resetting retry timings on %s", origin)

View File

@@ -14,11 +14,9 @@
# limitations under the License.
import logging
from typing import List
from synapse.api.constants import Membership
from synapse.events import FrozenEvent
from synapse.types import RoomStreamToken, StateMap
from synapse.types import RoomStreamToken
from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
@@ -53,15 +51,6 @@ class AdminHandler(BaseHandler):
return ret
async def get_user(self, user):
"""Function to get user details"""
ret = await self.store.get_user_by_id(user.to_string())
if ret:
profile = await self.store.get_profileinfo(user.localpart)
ret["displayname"] = profile.display_name
ret["avatar_url"] = profile.avatar_url
return ret
async def get_users(self):
"""Function to retrieve a list of users in users table.
@@ -136,7 +125,7 @@ class AdminHandler(BaseHandler):
The returned value is that returned by `writer.finished()`.
"""
# Get all rooms the user is in or has been in
rooms = await self.store.get_rooms_for_local_user_where_membership_is(
rooms = await self.store.get_rooms_for_user_where_membership_is(
user_id,
membership_list=(
Membership.JOIN,
@@ -261,26 +250,35 @@ class ExfiltrationWriter(object):
"""Interface used to specify how to write exported data.
"""
def write_events(self, room_id: str, events: List[FrozenEvent]):
def write_events(self, room_id, events):
"""Write a batch of events for a room.
Args:
room_id (str)
events (list[FrozenEvent])
"""
pass
def write_state(self, room_id: str, event_id: str, state: StateMap[FrozenEvent]):
def write_state(self, room_id, event_id, state):
"""Write the state at the given event in the room.
This only gets called for backward extremities rather than for each
event.
Args:
room_id (str)
event_id (str)
state (dict[tuple[str, str], FrozenEvent])
"""
pass
def write_invite(self, room_id: str, event: FrozenEvent, state: StateMap[dict]):
def write_invite(self, room_id, event, state):
"""Write an invite for the room, with associated invite state.
Args:
room_id
event
state: A subset of the state at the
room_id (str)
event (FrozenEvent)
state (dict[tuple[str, str], dict]): A subset of the state at the
invite, with a subset of the event keys (type, state_key
content and sender)
"""

View File

@@ -140,7 +140,7 @@ class DeactivateAccountHandler(BaseHandler):
user_id (str): The user ID to reject pending invites for.
"""
user = UserID.from_string(user_id)
pending_invites = await self.store.get_invited_rooms_for_local_user(user_id)
pending_invites = await self.store.get_invited_rooms_for_user(user_id)
for room in pending_invites:
try:

View File

@@ -64,7 +64,7 @@ from synapse.replication.http.federation import (
from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
from synapse.state import StateResolutionStore, resolve_events_with_store
from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour
from synapse.types import StateMap, UserID, get_domain_from_id
from synapse.types import UserID, get_domain_from_id
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.distributor import user_joined_room
from synapse.util.retryutils import NotRetryingDestination
@@ -89,7 +89,7 @@ class _NewEventInfo:
event = attr.ib(type=EventBase)
state = attr.ib(type=Optional[Sequence[EventBase]], default=None)
auth_events = attr.ib(type=Optional[StateMap[EventBase]], default=None)
auth_events = attr.ib(type=Optional[Dict[Tuple[str, str], EventBase]], default=None)
def shortstr(iterable, maxitems=5):
@@ -352,7 +352,9 @@ class FederationHandler(BaseHandler):
ours = await self.state_store.get_state_groups_ids(room_id, seen)
# state_maps is a list of mappings from (type, state_key) to event_id
state_maps = list(ours.values()) # type: list[StateMap[str]]
state_maps = list(
ours.values()
) # type: list[dict[tuple[str, str], str]]
# we don't need this any more, let's delete it.
del ours
@@ -436,7 +438,9 @@ class FederationHandler(BaseHandler):
if not prevs - seen:
return
logger.info("here1")
latest = await self.store.get_latest_event_ids_in_room(room_id)
logger.info("here2")
# We add the prev events that we have seen to the latest
# list to ensure the remote server doesn't give them to us
@@ -1910,7 +1914,7 @@ class FederationHandler(BaseHandler):
origin: str,
event: EventBase,
state: Optional[Iterable[EventBase]],
auth_events: Optional[StateMap[EventBase]],
auth_events: Optional[Dict[Tuple[str, str], EventBase]],
backfilled: bool,
):
"""

View File

@@ -130,8 +130,6 @@ class GroupsLocalHandler(object):
res = yield self.transport_client.get_group_summary(
get_domain_from_id(group_id), group_id, requester_user_id
)
except HttpResponseException as e:
raise e.to_synapse_error()
except RequestSendFailed:
raise SynapseError(502, "Failed to contact group server")
@@ -192,8 +190,6 @@ class GroupsLocalHandler(object):
res = yield self.transport_client.create_group(
get_domain_from_id(group_id), group_id, user_id, content
)
except HttpResponseException as e:
raise e.to_synapse_error()
except RequestSendFailed:
raise SynapseError(502, "Failed to contact group server")
@@ -235,8 +231,6 @@ class GroupsLocalHandler(object):
res = yield self.transport_client.get_users_in_group(
get_domain_from_id(group_id), group_id, requester_user_id
)
except HttpResponseException as e:
raise e.to_synapse_error()
except RequestSendFailed:
raise SynapseError(502, "Failed to contact group server")
@@ -277,8 +271,6 @@ class GroupsLocalHandler(object):
res = yield self.transport_client.join_group(
get_domain_from_id(group_id), group_id, user_id, content
)
except HttpResponseException as e:
raise e.to_synapse_error()
except RequestSendFailed:
raise SynapseError(502, "Failed to contact group server")
@@ -323,8 +315,6 @@ class GroupsLocalHandler(object):
res = yield self.transport_client.accept_group_invite(
get_domain_from_id(group_id), group_id, user_id, content
)
except HttpResponseException as e:
raise e.to_synapse_error()
except RequestSendFailed:
raise SynapseError(502, "Failed to contact group server")
@@ -371,8 +361,6 @@ class GroupsLocalHandler(object):
requester_user_id,
content,
)
except HttpResponseException as e:
raise e.to_synapse_error()
except RequestSendFailed:
raise SynapseError(502, "Failed to contact group server")
@@ -436,8 +424,6 @@ class GroupsLocalHandler(object):
user_id,
content,
)
except HttpResponseException as e:
raise e.to_synapse_error()
except RequestSendFailed:
raise SynapseError(502, "Failed to contact group server")
@@ -474,8 +460,6 @@ class GroupsLocalHandler(object):
bulk_result = yield self.transport_client.bulk_get_publicised_groups(
get_domain_from_id(user_id), [user_id]
)
except HttpResponseException as e:
raise e.to_synapse_error()
except RequestSendFailed:
raise SynapseError(502, "Failed to contact group server")

View File

@@ -101,7 +101,7 @@ class InitialSyncHandler(BaseHandler):
if include_archived:
memberships.append(Membership.LEAVE)
room_list = await self.store.get_rooms_for_local_user_where_membership_is(
room_list = await self.store.get_rooms_for_user_where_membership_is(
user_id=user_id, membership_list=memberships
)

View File

@@ -156,7 +156,7 @@ class PaginationHandler(object):
stream_ordering = yield self.store.find_first_stream_ordering_after_ts(ts)
r = yield self.store.get_room_event_before_stream_ordering(
r = yield self.store.get_room_event_after_stream_ordering(
room_id, stream_ordering,
)
if not r:

View File

@@ -20,7 +20,13 @@ from twisted.internet import defer
from synapse import types
from synapse.api.constants import MAX_USERID_LENGTH, LoginType
from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError
from synapse.api.errors import (
AuthError,
Codes,
ConsentNotGivenError,
RegistrationError,
SynapseError,
)
from synapse.config.server import is_threepid_reserved
from synapse.http.servlet import assert_params_in_dict
from synapse.replication.http.login import RegisterDeviceReplicationServlet
@@ -159,7 +165,7 @@ class RegistrationHandler(BaseHandler):
Returns:
Deferred[str]: user_id
Raises:
SynapseError if there was a problem registering.
RegistrationError if there was a problem registering.
"""
yield self.check_registration_ratelimit(address)
@@ -168,7 +174,7 @@ class RegistrationHandler(BaseHandler):
if password:
password_hash = yield self._auth_handler.hash(password)
if localpart is not None:
if localpart:
yield self.check_username(localpart, guest_access_token=guest_access_token)
was_guest = guest_access_token is not None
@@ -176,7 +182,7 @@ class RegistrationHandler(BaseHandler):
if not was_guest:
try:
int(localpart)
raise SynapseError(
raise RegistrationError(
400, "Numeric user IDs are reserved for guest users."
)
except ValueError:

View File

@@ -32,15 +32,7 @@ from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, Syna
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.http.endpoint import parse_and_validate_server_name
from synapse.storage.state import StateFilter
from synapse.types import (
Requester,
RoomAlias,
RoomID,
RoomStreamToken,
StateMap,
StreamToken,
UserID,
)
from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID
from synapse.util import stringutils
from synapse.util.async_helpers import Linearizer
from synapse.util.caches.response_cache import ResponseCache
@@ -215,19 +207,15 @@ class RoomCreationHandler(BaseHandler):
@defer.inlineCallbacks
def _update_upgraded_room_pls(
self,
requester: Requester,
old_room_id: str,
new_room_id: str,
old_room_state: StateMap[str],
self, requester, old_room_id, new_room_id, old_room_state,
):
"""Send updated power levels in both rooms after an upgrade
Args:
requester: the user requesting the upgrade
old_room_id: the id of the room to be replaced
new_room_id: the id of the replacement room
old_room_state: the state map for the old room
requester (synapse.types.Requester): the user requesting the upgrade
old_room_id (str): the id of the room to be replaced
new_room_id (str): the id of the replacement room
old_room_state (dict[tuple[str, str], str]): the state map for the old room
Returns:
Deferred

View File

@@ -690,7 +690,7 @@ class RoomMemberHandler(object):
@defer.inlineCallbacks
def _get_inviter(self, user_id, room_id):
invite = yield self.store.get_invite_for_local_user_in_room(
invite = yield self.store.get_invite_for_user_in_room(
user_id=user_id, room_id=room_id
)
if invite:

View File

@@ -24,7 +24,6 @@ from saml2.client import Saml2Client
from synapse.api.errors import SynapseError
from synapse.config import ConfigError
from synapse.http.servlet import parse_string
from synapse.module_api import ModuleApi
from synapse.rest.client.v1.login import SSOAuthHandler
from synapse.types import (
UserID,
@@ -32,7 +31,6 @@ from synapse.types import (
mxid_localpart_allowed_characters,
)
from synapse.util.async_helpers import Linearizer
from synapse.util.iterutils import chunk_seq
logger = logging.getLogger(__name__)
@@ -61,8 +59,7 @@ class SamlHandler:
# plugin to do custom mapping from saml response to mxid
self._user_mapping_provider = hs.config.saml2_user_mapping_provider_class(
hs.config.saml2_user_mapping_provider_config,
ModuleApi(hs, hs.get_auth_handler()),
hs.config.saml2_user_mapping_provider_config
)
# identifier for the external_ids table
@@ -115,10 +112,10 @@ class SamlHandler:
# the dict.
self.expire_sessions()
user_id = await self._map_saml_response_to_user(resp_bytes, relay_state)
user_id = await self._map_saml_response_to_user(resp_bytes)
self._sso_auth_handler.complete_sso_login(user_id, request, relay_state)
async def _map_saml_response_to_user(self, resp_bytes, client_redirect_url):
async def _map_saml_response_to_user(self, resp_bytes):
try:
saml2_auth = self._saml_client.parse_authn_request_response(
resp_bytes,
@@ -133,28 +130,17 @@ class SamlHandler:
logger.warning("SAML2 response was not signed")
raise SynapseError(400, "SAML2 response was not signed")
logger.debug("SAML2 response: %s", saml2_auth.origxml)
for assertion in saml2_auth.assertions:
# kibana limits the length of a log field, whereas this is all rather
# useful, so split it up.
count = 0
for part in chunk_seq(str(assertion), 10000):
logger.info(
"SAML2 assertion: %s%s", "(%i)..." % (count,) if count else "", part
)
count += 1
logger.info("SAML2 response: %s", saml2_auth.origxml)
logger.info("SAML2 mapped attributes: %s", saml2_auth.ava)
try:
remote_user_id = saml2_auth.ava["uid"][0]
except KeyError:
logger.warning("SAML2 response lacks a 'uid' attestation")
raise SynapseError(400, "'uid' not in SAML2 response")
self._outstanding_requests_dict.pop(saml2_auth.in_response_to, None)
remote_user_id = self._user_mapping_provider.get_remote_user_id(
saml2_auth, client_redirect_url
)
if not remote_user_id:
raise Exception("Failed to extract remote user id from SAML response")
with (await self._mapping_lock.queue(self._auth_provider_id)):
# first of all, check if we already have a mapping for this user
logger.info(
@@ -197,7 +183,7 @@ class SamlHandler:
# Map saml response to user attributes using the configured mapping provider
for i in range(1000):
attribute_dict = self._user_mapping_provider.saml_response_to_user_attributes(
saml2_auth, i, client_redirect_url=client_redirect_url,
saml2_auth, i
)
logger.debug(
@@ -230,8 +216,6 @@ class SamlHandler:
500, "Unable to generate a Matrix ID from the SAML response"
)
logger.info("Mapped SAML user to local part %s", localpart)
registered_user_id = await self._registration_handler.register_user(
localpart=localpart, default_display_name=displayname
)
@@ -281,35 +265,17 @@ class SamlConfig(object):
class DefaultSamlMappingProvider(object):
__version__ = "0.0.1"
def __init__(self, parsed_config: SamlConfig, module_api: ModuleApi):
def __init__(self, parsed_config: SamlConfig):
"""The default SAML user mapping provider
Args:
parsed_config: Module configuration
module_api: module api proxy
"""
self._mxid_source_attribute = parsed_config.mxid_source_attribute
self._mxid_mapper = parsed_config.mxid_mapper
self._grandfathered_mxid_source_attribute = (
module_api._hs.config.saml2_grandfathered_mxid_source_attribute
)
def get_remote_user_id(
self, saml_response: saml2.response.AuthnResponse, client_redirect_url: str
):
"""Extracts the remote user id from the SAML response"""
try:
return saml_response.ava["uid"][0]
except KeyError:
logger.warning("SAML2 response lacks a 'uid' attestation")
raise SynapseError(400, "'uid' not in SAML2 response")
def saml_response_to_user_attributes(
self,
saml_response: saml2.response.AuthnResponse,
failures: int,
client_redirect_url: str,
self, saml_response: saml2.response.AuthnResponse, failures: int = 0,
) -> dict:
"""Maps some text from a SAML response to attributes of a new user
@@ -319,8 +285,6 @@ class DefaultSamlMappingProvider(object):
failures: How many times a call to this function with this
saml_response has resulted in a failure
client_redirect_url: where the client wants to redirect to
Returns:
dict: A dict containing new user attributes. Possible keys:
* mxid_localpart (str): Required. The localpart of the user's mxid

View File

@@ -179,7 +179,7 @@ class SearchHandler(BaseHandler):
search_filter = Filter(filter_dict)
# TODO: Search through left rooms too
rooms = yield self.store.get_rooms_for_local_user_where_membership_is(
rooms = yield self.store.get_rooms_for_user_where_membership_is(
user.to_string(),
membership_list=[Membership.JOIN],
# membership_list=[Membership.JOIN, Membership.LEAVE, Membership.Ban],

View File

@@ -1662,7 +1662,7 @@ class SyncHandler(object):
Membership.BAN,
)
room_list = await self.store.get_rooms_for_local_user_where_membership_is(
room_list = await self.store.get_rooms_for_user_where_membership_is(
user_id=user_id, membership_list=membership_list
)

View File

@@ -257,7 +257,7 @@ class TypingHandler(object):
"typing_key", self._latest_room_serial, rooms=[member.room_id]
)
async def get_all_typing_updates(self, last_id, current_id):
def get_all_typing_updates(self, last_id, current_id):
if last_id == current_id:
return []

View File

@@ -14,8 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import cgi
import collections
import html
import http.client
import logging
import types
@@ -36,7 +36,6 @@ import synapse.metrics
from synapse.api.errors import (
CodeMessageException,
Codes,
RedirectException,
SynapseError,
UnrecognizedRequestError,
)
@@ -154,18 +153,14 @@ def _return_html_error(f, request):
Args:
f (twisted.python.failure.Failure):
request (twisted.web.server.Request):
request (twisted.web.iweb.IRequest):
"""
if f.check(CodeMessageException):
cme = f.value
code = cme.code
msg = cme.msg
if isinstance(cme, RedirectException):
logger.info("%s redirect to %s", request, cme.location)
request.setHeader(b"location", cme.location)
request.cookies.extend(cme.cookies)
elif isinstance(cme, SynapseError):
if isinstance(cme, SynapseError):
logger.info("%s SynapseError: %s - %s", request, code, msg)
else:
logger.error(
@@ -183,7 +178,7 @@ def _return_html_error(f, request):
exc_info=(f.type, f.value, f.getTracebackObject()),
)
body = HTML_ERROR_TEMPLATE.format(code=code, msg=html.escape(msg)).encode("utf-8")
body = HTML_ERROR_TEMPLATE.format(code=code, msg=cgi.escape(msg)).encode("utf-8")
request.setResponseCode(code)
request.setHeader(b"Content-Type", b"text/html; charset=utf-8")
request.setHeader(b"Content-Length", b"%i" % (len(body),))

View File

@@ -88,7 +88,7 @@ class SynapseRequest(Request):
def get_redacted_uri(self):
uri = self.uri
if isinstance(uri, bytes):
uri = self.uri.decode("ascii", errors="replace")
uri = self.uri.decode("ascii")
return redact_uri(uri)
def get_method(self):

View File

@@ -571,9 +571,6 @@ def run_in_background(f, *args, **kwargs):
yield or await on (for instance because you want to pass it to
deferred.gatherResults()).
If f returns a Coroutine object, it will be wrapped into a Deferred (which will have
the side effect of executing the coroutine).
Note that if you completely discard the result, you should make sure that
`f` doesn't raise any deferred exceptions, otherwise a scary-looking
CRITICAL error about an unhandled error will be logged without much

View File

@@ -1,6 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2017 New Vector Ltd
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -17,26 +16,18 @@ import logging
from twisted.internet import defer
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.types import UserID
"""
This package defines the 'stable' API which can be used by extension modules which
are loaded into Synapse.
"""
__all__ = ["errors", "make_deferred_yieldable", "run_in_background", "ModuleApi"]
logger = logging.getLogger(__name__)
class ModuleApi(object):
"""A proxy object that gets passed to various plugin modules so they
"""A proxy object that gets passed to password auth providers so they
can register new users etc if necessary.
"""
def __init__(self, hs, auth_handler):
self._hs = hs
self.hs = hs
self._store = hs.get_datastore()
self._auth = hs.get_auth()
@@ -73,7 +64,7 @@ class ModuleApi(object):
"""
if username.startswith("@"):
return username
return UserID(username, self._hs.hostname).to_string()
return UserID(username, self.hs.hostname).to_string()
def check_user_exists(self, user_id):
"""Check if user exists.
@@ -120,14 +111,10 @@ class ModuleApi(object):
displayname (str|None): The displayname of the new user.
emails (List[str]): Emails to bind to the new user.
Raises:
SynapseError if there is an error performing the registration. Check the
'errcode' property for more information on the reason for failure
Returns:
Deferred[str]: user_id
"""
return self._hs.get_registration_handler().register_user(
return self.hs.get_registration_handler().register_user(
localpart=localpart, default_display_name=displayname, bind_emails=emails
)
@@ -144,34 +131,12 @@ class ModuleApi(object):
Returns:
defer.Deferred[tuple[str, str]]: Tuple of device ID and access token
"""
return self._hs.get_registration_handler().register_device(
return self.hs.get_registration_handler().register_device(
user_id=user_id,
device_id=device_id,
initial_display_name=initial_display_name,
)
def record_user_external_id(
self, auth_provider_id: str, remote_user_id: str, registered_user_id: str
) -> defer.Deferred:
"""Record a mapping from an external user id to a mxid
Args:
auth_provider: identifier for the remote auth provider
external_id: id on that system
user_id: complete mxid that it is mapped to
"""
return self._store.record_user_external_id(
auth_provider_id, remote_user_id, registered_user_id
)
def generate_short_term_login_token(
self, user_id: str, duration_in_ms: int = (2 * 60 * 1000)
) -> str:
"""Generate a login token suitable for m.login.token authentication"""
return self._hs.get_macaroon_generator().generate_short_term_login_token(
user_id, duration_in_ms
)
@defer.inlineCallbacks
def invalidate_access_token(self, access_token):
"""Invalidate an access token for a user
@@ -192,7 +157,7 @@ class ModuleApi(object):
user_id = user_info["user"].to_string()
if device_id:
# delete the device, which will also delete its access tokens
yield self._hs.get_device_handler().delete_device(user_id, device_id)
yield self.hs.get_device_handler().delete_device(user_id, device_id)
else:
# no associated device. Just delete the access token.
yield self._auth_handler.delete_access_token(access_token)

View File

@@ -1,18 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Exception types which are exposed as part of the stable module API"""
from synapse.api.errors import RedirectException, SynapseError # noqa: F401

View File

@@ -15,13 +15,11 @@
import logging
from collections import namedtuple
from typing import Callable, List
from prometheus_client import Counter
from twisted.internet import defer
import synapse.server
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError
from synapse.handlers.presence import format_user_presence_state
@@ -156,7 +154,7 @@ class Notifier(object):
UNUSED_STREAM_EXPIRY_MS = 10 * 60 * 1000
def __init__(self, hs: "synapse.server.HomeServer"):
def __init__(self, hs):
self.user_to_user_stream = {}
self.room_to_user_streams = {}
@@ -166,12 +164,7 @@ class Notifier(object):
self.store = hs.get_datastore()
self.pending_new_room_events = []
# Called when there are new things to stream over replication
self.replication_callbacks = [] # type: List[Callable[[], None]]
# Called when remote servers have come back online after having been
# down.
self.remote_server_up_callbacks = [] # type: List[Callable[[str], None]]
self.replication_callbacks = []
self.clock = hs.get_clock()
self.appservice_handler = hs.get_application_service_handler()
@@ -212,7 +205,7 @@ class Notifier(object):
"synapse_notifier_users", "", [], lambda: len(self.user_to_user_stream)
)
def add_replication_callback(self, cb: Callable[[], None]):
def add_replication_callback(self, cb):
"""Add a callback that will be called when some new data is available.
Callback is not given any arguments. It should *not* return a Deferred - if
it needs to do any asynchronous work, a background thread should be started and
@@ -220,12 +213,6 @@ class Notifier(object):
"""
self.replication_callbacks.append(cb)
def add_remote_server_up_callback(self, cb: Callable[[str], None]):
"""Add a callback that will be called when synapse detects a server
has been
"""
self.remote_server_up_callbacks.append(cb)
def on_new_room_event(
self, event, room_stream_id, max_room_stream_id, extra_users=[]
):
@@ -535,15 +522,3 @@ class Notifier(object):
"""Notify the any replication listeners that there's a new event"""
for cb in self.replication_callbacks:
cb()
def notify_remote_server_up(self, server: str):
"""Notify any replication that a remote server has come back up
"""
# We call federation_sender directly rather than registering as a
# callback as a) we already have a reference to it and b) it introduces
# circular dependencies.
if self.federation_sender:
self.federation_sender.wake_destination(server)
for cb in self.remote_server_up_callbacks:
cb(server)

View File

@@ -21,7 +21,7 @@ from synapse.storage import Storage
@defer.inlineCallbacks
def get_badge_count(store, user_id):
invites = yield store.get_invited_rooms_for_local_user(user_id)
invites = yield store.get_invited_rooms_for_user(user_id)
joins = yield store.get_rooms_for_user(user_id)
my_receipts_by_room = yield store.get_receipts_for_user(user_id, "m.read")

View File

@@ -16,7 +16,6 @@
import abc
import logging
import re
from typing import Dict, List, Tuple
from six import raise_from
from six.moves import urllib
@@ -79,8 +78,9 @@ class ReplicationEndpoint(object):
__metaclass__ = abc.ABCMeta
NAME = abc.abstractproperty() # type: str # type: ignore
PATH_ARGS = abc.abstractproperty() # type: Tuple[str, ...] # type: ignore
NAME = abc.abstractproperty()
PATH_ARGS = abc.abstractproperty()
METHOD = "POST"
CACHE = True
RETRY_ON_TIMEOUT = True
@@ -171,7 +171,7 @@ class ReplicationEndpoint(object):
# have a good idea that the request has either succeeded or failed on
# the master, and so whether we should clean up or not.
while True:
headers = {} # type: Dict[bytes, List[bytes]]
headers = {}
inject_active_span_byte_dict(headers, None, check_destination=False)
try:
result = yield request_func(uri, data, headers=headers)
@@ -207,7 +207,7 @@ class ReplicationEndpoint(object):
method = self.METHOD
if self.CACHE:
handler = self._cached_handler # type: ignore
handler = self._cached_handler
url_args.append("txn_id")
args = "/".join("(?P<%s>[^/]+)" % (arg,) for arg in url_args)

View File

@@ -14,7 +14,7 @@
# limitations under the License.
import logging
from typing import Dict, Optional
from typing import Dict
import six
@@ -41,7 +41,7 @@ class BaseSlavedStore(SQLBaseStore):
if isinstance(self.database_engine, PostgresEngine):
self._cache_id_gen = SlavedIdTracker(
db_conn, "cache_invalidation_stream", "stream_id"
) # type: Optional[SlavedIdTracker]
)
else:
self._cache_id_gen = None
@@ -62,8 +62,7 @@ class BaseSlavedStore(SQLBaseStore):
def process_replication_rows(self, stream_name, token, rows):
if stream_name == "caches":
if self._cache_id_gen:
self._cache_id_gen.advance(token)
self._cache_id_gen.advance(token)
for row in rows:
if row.cache_func == CURRENT_STATE_CACHE_NAME:
room_id = row.keys[0]

View File

@@ -152,7 +152,7 @@ class SlavedEventStore(
if etype == EventTypes.Member:
self._membership_stream_cache.entity_has_changed(state_key, stream_ordering)
self.get_invited_rooms_for_local_user.invalidate((state_key,))
self.get_invited_rooms_for_user.invalidate((state_key,))
if relates_to:
self.get_relations_for_event.invalidate_many((relates_to,))

View File

@@ -29,7 +29,7 @@ class SlavedPresenceStore(BaseSlavedStore):
self._presence_on_startup = self._get_active_presence(db_conn)
self.presence_stream_cache = StreamChangeCache(
self.presence_stream_cache = self.presence_stream_cache = StreamChangeCache(
"PresenceStreamChangeCache", self._presence_id_gen.get_current_token()
)

View File

@@ -16,7 +16,7 @@
"""
import logging
from typing import Dict, List, Optional
from typing import Dict
from twisted.internet import defer
from twisted.internet.protocol import ReconnectingClientFactory
@@ -28,7 +28,6 @@ from synapse.replication.tcp.protocol import (
)
from .commands import (
Command,
FederationAckCommand,
InvalidateCacheCommand,
RemovePusherCommand,
@@ -90,15 +89,15 @@ class ReplicationClientHandler(AbstractReplicationClientHandler):
# Any pending commands to be sent once a new connection has been
# established
self.pending_commands = [] # type: List[Command]
self.pending_commands = []
# Map from string -> deferred, to wake up when receiveing a SYNC with
# the given string.
# Used for tests.
self.awaiting_syncs = {} # type: Dict[str, defer.Deferred]
self.awaiting_syncs = {}
# The factory used to create connections.
self.factory = None # type: Optional[ReplicationClientFactory]
self.factory = None
def start_replication(self, hs):
"""Helper method to start a replication connection to the remote server
@@ -110,7 +109,7 @@ class ReplicationClientHandler(AbstractReplicationClientHandler):
port = hs.config.worker_replication_port
hs.get_reactor().connectTCP(host, port, self.factory)
async def on_rdata(self, stream_name, token, rows):
def on_rdata(self, stream_name, token, rows):
"""Called to handle a batch of replication data with a given stream token.
By default this just pokes the slave store. Can be overridden in subclasses to
@@ -121,17 +120,20 @@ class ReplicationClientHandler(AbstractReplicationClientHandler):
token (int): stream token for this batch of rows
rows (list): a list of Stream.ROW_TYPE objects as returned by
Stream.parse_row.
Returns:
Deferred|None
"""
logger.debug("Received rdata %s -> %s", stream_name, token)
self.store.process_replication_rows(stream_name, token, rows)
return self.store.process_replication_rows(stream_name, token, rows)
async def on_position(self, stream_name, token):
def on_position(self, stream_name, token):
"""Called when we get new position data. By default this just pokes
the slave store.
Can be overriden in subclasses to handle more.
"""
self.store.process_replication_rows(stream_name, token, [])
return self.store.process_replication_rows(stream_name, token, [])
def on_sync(self, data):
"""When we received a SYNC we wake up any deferreds that were waiting
@@ -143,9 +145,6 @@ class ReplicationClientHandler(AbstractReplicationClientHandler):
if d:
d.callback(data)
def on_remote_server_up(self, server: str):
"""Called when get a new REMOTE_SERVER_UP command."""
def get_streams_to_replicate(self) -> Dict[str, int]:
"""Called when a new connection has been established and we need to
subscribe to streams.
@@ -236,5 +235,4 @@ class ReplicationClientHandler(AbstractReplicationClientHandler):
# We don't reset the delay any earlier as otherwise if there is a
# problem during start up we'll end up tight looping connecting to the
# server.
if self.factory:
self.factory.resetDelay()
self.factory.resetDelay()

View File

@@ -20,16 +20,15 @@ allowed to be sent by which side.
import logging
import platform
from typing import Tuple, Type
if platform.python_implementation() == "PyPy":
import json
_json_encoder = json.JSONEncoder()
else:
import simplejson as json # type: ignore[no-redef] # noqa: F821
import simplejson as json
_json_encoder = json.JSONEncoder(namedtuple_as_object=False) # type: ignore[call-arg] # noqa: F821
_json_encoder = json.JSONEncoder(namedtuple_as_object=False)
logger = logging.getLogger(__name__)
@@ -45,7 +44,7 @@ class Command(object):
The default implementation creates a command of form `<NAME> <data>`
"""
NAME = None # type: str
NAME = None
def __init__(self, data):
self.data = data
@@ -387,39 +386,25 @@ class UserIpCommand(Command):
)
class RemoteServerUpCommand(Command):
"""Sent when a worker has detected that a remote server is no longer
"down" and retry timings should be reset.
If sent from a client the server will relay to all other workers.
Format::
REMOTE_SERVER_UP <server>
"""
NAME = "REMOTE_SERVER_UP"
_COMMANDS = (
ServerCommand,
RdataCommand,
PositionCommand,
ErrorCommand,
PingCommand,
NameCommand,
ReplicateCommand,
UserSyncCommand,
FederationAckCommand,
SyncCommand,
RemovePusherCommand,
InvalidateCacheCommand,
UserIpCommand,
RemoteServerUpCommand,
) # type: Tuple[Type[Command], ...]
# Map of command name to command type.
COMMAND_MAP = {cmd.NAME: cmd for cmd in _COMMANDS}
COMMAND_MAP = {
cmd.NAME: cmd
for cmd in (
ServerCommand,
RdataCommand,
PositionCommand,
ErrorCommand,
PingCommand,
NameCommand,
ReplicateCommand,
UserSyncCommand,
FederationAckCommand,
SyncCommand,
RemovePusherCommand,
InvalidateCacheCommand,
UserIpCommand,
)
}
# The commands the server is allowed to send
VALID_SERVER_COMMANDS = (
@@ -429,7 +414,6 @@ VALID_SERVER_COMMANDS = (
ErrorCommand.NAME,
PingCommand.NAME,
SyncCommand.NAME,
RemoteServerUpCommand.NAME,
)
# The commands the client is allowed to send
@@ -443,5 +427,4 @@ VALID_CLIENT_COMMANDS = (
InvalidateCacheCommand.NAME,
UserIpCommand.NAME,
ErrorCommand.NAME,
RemoteServerUpCommand.NAME,
)

View File

@@ -53,7 +53,6 @@ import fcntl
import logging
import struct
from collections import defaultdict
from typing import Any, DefaultDict, Dict, List, Set, Tuple
from six import iteritems, iterkeys
@@ -66,26 +65,24 @@ from twisted.python.failure import Failure
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.commands import (
from synapse.util import Clock
from synapse.util.stringutils import random_string
from .commands import (
COMMAND_MAP,
VALID_CLIENT_COMMANDS,
VALID_SERVER_COMMANDS,
Command,
ErrorCommand,
NameCommand,
PingCommand,
PositionCommand,
RdataCommand,
RemoteServerUpCommand,
ReplicateCommand,
ServerCommand,
SyncCommand,
UserSyncCommand,
)
from synapse.replication.tcp.streams import STREAMS_MAP
from synapse.types import Collection
from synapse.util import Clock
from synapse.util.stringutils import random_string
from .streams import STREAMS_MAP
connection_close_counter = Counter(
"synapse_replication_tcp_protocol_close_reason", "", ["reason_type"]
@@ -127,11 +124,8 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
delimiter = b"\n"
# Valid commands we expect to receive
VALID_INBOUND_COMMANDS = [] # type: Collection[str]
# Valid commands we can send
VALID_OUTBOUND_COMMANDS = [] # type: Collection[str]
VALID_INBOUND_COMMANDS = [] # Valid commands we expect to receive
VALID_OUTBOUND_COMMANDS = [] # Valid commans we can send
max_line_buffer = 10000
@@ -150,13 +144,13 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
self.conn_id = random_string(5) # To dedupe in case of name clashes.
# List of pending commands to send once we've established the connection
self.pending_commands = [] # type: List[Command]
self.pending_commands = []
# The LoopingCall for sending pings.
self._send_ping_loop = None
self.inbound_commands_counter = defaultdict(int) # type: DefaultDict[str, int]
self.outbound_commands_counter = defaultdict(int) # type: DefaultDict[str, int]
self.inbound_commands_counter = defaultdict(int)
self.outbound_commands_counter = defaultdict(int)
def connectionMade(self):
logger.info("[%s] Connection established", self.id())
@@ -241,16 +235,19 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
"replication-" + cmd.get_logcontext_id(), self.handle_command, cmd
)
async def handle_command(self, cmd: Command):
def handle_command(self, cmd):
"""Handle a command we have received over the replication stream.
By default delegates to on_<COMMAND>, which should return an awaitable.
By default delegates to on_<COMMAND>
Args:
cmd: received command
cmd (synapse.replication.tcp.commands.Command): received command
Returns:
Deferred
"""
handler = getattr(self, "on_%s" % (cmd.NAME,))
await handler(cmd)
return handler(cmd)
def close(self):
logger.warning("[%s] Closing connection", self.id())
@@ -323,10 +320,10 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
for cmd in pending:
self.send_command(cmd)
async def on_PING(self, line):
def on_PING(self, line):
self.received_ping = True
async def on_ERROR(self, cmd):
def on_ERROR(self, cmd):
logger.error("[%s] Remote reported error: %r", self.id(), cmd.data)
def pauseProducing(self):
@@ -412,30 +409,30 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
self.streamer = streamer
# The streams the client has subscribed to and is up to date with
self.replication_streams = set() # type: Set[str]
self.replication_streams = set()
# The streams the client is currently subscribing to.
self.connecting_streams = set() # type: Set[str]
self.connecting_streams = set()
# Map from stream name to list of updates to send once we've finished
# subscribing the client to the stream.
self.pending_rdata = {} # type: Dict[str, List[Tuple[int, Any]]]
self.pending_rdata = {}
def connectionMade(self):
self.send_command(ServerCommand(self.server_name))
BaseReplicationStreamProtocol.connectionMade(self)
self.streamer.new_connection(self)
async def on_NAME(self, cmd):
def on_NAME(self, cmd):
logger.info("[%s] Renamed to %r", self.id(), cmd.data)
self.name = cmd.data
async def on_USER_SYNC(self, cmd):
await self.streamer.on_user_sync(
def on_USER_SYNC(self, cmd):
return self.streamer.on_user_sync(
self.conn_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms
)
async def on_REPLICATE(self, cmd):
def on_REPLICATE(self, cmd):
stream_name = cmd.stream_name
token = cmd.token
@@ -446,26 +443,23 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
for stream in iterkeys(self.streamer.streams_by_name)
]
await make_deferred_yieldable(
return make_deferred_yieldable(
defer.gatherResults(deferreds, consumeErrors=True)
)
else:
await self.subscribe_to_stream(stream_name, token)
return self.subscribe_to_stream(stream_name, token)
async def on_FEDERATION_ACK(self, cmd):
self.streamer.federation_ack(cmd.token)
def on_FEDERATION_ACK(self, cmd):
return self.streamer.federation_ack(cmd.token)
async def on_REMOVE_PUSHER(self, cmd):
await self.streamer.on_remove_pusher(cmd.app_id, cmd.push_key, cmd.user_id)
def on_REMOVE_PUSHER(self, cmd):
return self.streamer.on_remove_pusher(cmd.app_id, cmd.push_key, cmd.user_id)
async def on_INVALIDATE_CACHE(self, cmd):
self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)
def on_INVALIDATE_CACHE(self, cmd):
return self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)
async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand):
self.streamer.on_remote_server_up(cmd.data)
async def on_USER_IP(self, cmd):
self.streamer.on_user_ip(
def on_USER_IP(self, cmd):
return self.streamer.on_user_ip(
cmd.user_id,
cmd.access_token,
cmd.ip,
@@ -474,7 +468,8 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
cmd.last_seen,
)
async def subscribe_to_stream(self, stream_name, token):
@defer.inlineCallbacks
def subscribe_to_stream(self, stream_name, token):
"""Subscribe the remote to a stream.
This invloves checking if they've missed anything and sending those
@@ -486,7 +481,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
try:
# Get missing updates
updates, current_token = await self.streamer.get_stream_updates(
updates, current_token = yield self.streamer.get_stream_updates(
stream_name, token
)
@@ -559,9 +554,6 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
def send_sync(self, data):
self.send_command(SyncCommand(data))
def send_remote_server_up(self, server: str):
self.send_command(RemoteServerUpCommand(server))
def on_connection_closed(self):
BaseReplicationStreamProtocol.on_connection_closed(self)
self.streamer.lost_connection(self)
@@ -574,7 +566,7 @@ class AbstractReplicationClientHandler(metaclass=abc.ABCMeta):
"""
@abc.abstractmethod
async def on_rdata(self, stream_name, token, rows):
def on_rdata(self, stream_name, token, rows):
"""Called to handle a batch of replication data with a given stream token.
Args:
@@ -582,11 +574,14 @@ class AbstractReplicationClientHandler(metaclass=abc.ABCMeta):
token (int): stream token for this batch of rows
rows (list): a list of Stream.ROW_TYPE objects as returned by
Stream.parse_row.
Returns:
Deferred|None
"""
raise NotImplementedError()
@abc.abstractmethod
async def on_position(self, stream_name, token):
def on_position(self, stream_name, token):
"""Called when we get new position data."""
raise NotImplementedError()
@@ -595,11 +590,6 @@ class AbstractReplicationClientHandler(metaclass=abc.ABCMeta):
"""Called when get a new SYNC command."""
raise NotImplementedError()
@abc.abstractmethod
async def on_remote_server_up(self, server: str):
"""Called when get a new REMOTE_SERVER_UP command."""
raise NotImplementedError()
@abc.abstractmethod
def get_streams_to_replicate(self):
"""Called when a new connection has been established and we need to
@@ -652,11 +642,11 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
# Set of stream names that have been subscribe to, but haven't yet
# caught up with. This is used to track when the client has been fully
# connected to the remote.
self.streams_connecting = set() # type: Set[str]
self.streams_connecting = set()
# Map of stream to batched updates. See RdataCommand for info on how
# batching works.
self.pending_batches = {} # type: Dict[str, Any]
self.pending_batches = {}
def connectionMade(self):
self.send_command(NameCommand(self.client_name))
@@ -680,12 +670,12 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
if not self.streams_connecting:
self.handler.finished_connecting()
async def on_SERVER(self, cmd):
def on_SERVER(self, cmd):
if cmd.data != self.server_name:
logger.error("[%s] Connected to wrong remote: %r", self.id(), cmd.data)
self.send_error("Wrong remote")
async def on_RDATA(self, cmd):
def on_RDATA(self, cmd):
stream_name = cmd.stream_name
inbound_rdata_count.labels(stream_name).inc()
@@ -705,22 +695,19 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
# Check if this is the last of a batch of updates
rows = self.pending_batches.pop(stream_name, [])
rows.append(row)
await self.handler.on_rdata(stream_name, cmd.token, rows)
return self.handler.on_rdata(stream_name, cmd.token, rows)
async def on_POSITION(self, cmd):
def on_POSITION(self, cmd):
# When we get a `POSITION` command it means we've finished getting
# missing updates for the given stream, and are now up to date.
self.streams_connecting.discard(cmd.stream_name)
if not self.streams_connecting:
self.handler.finished_connecting()
await self.handler.on_position(cmd.stream_name, cmd.token)
return self.handler.on_position(cmd.stream_name, cmd.token)
async def on_SYNC(self, cmd):
self.handler.on_sync(cmd.data)
async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand):
self.handler.on_remote_server_up(cmd.data)
def on_SYNC(self, cmd):
return self.handler.on_sync(cmd.data)
def replicate(self, stream_name, token):
"""Send the subscription request to the server
@@ -779,7 +766,7 @@ def transport_kernel_read_buffer_size(protocol, read=True):
op = SIOCINQ
else:
op = SIOCOUTQ
size = struct.unpack("I", fcntl.ioctl(fileno, op, b"\0\0\0\0"))[0]
size = struct.unpack("I", fcntl.ioctl(fileno, op, "\0\0\0\0"))[0]
return size
return 0

View File

@@ -17,12 +17,12 @@
import logging
import random
from typing import List
from six import itervalues
from prometheus_client import Counter
from twisted.internet import defer
from twisted.internet.protocol import Factory
from synapse.metrics import LaterGauge
@@ -79,7 +79,7 @@ class ReplicationStreamer(object):
self._replication_torture_level = hs.config.replication_torture_level
# Current connections.
self.connections = [] # type: List[ServerReplicationStreamProtocol]
self.connections = []
LaterGauge(
"synapse_replication_tcp_resource_total_connections",
@@ -120,7 +120,6 @@ class ReplicationStreamer(object):
self.federation_sender = hs.get_federation_sender()
self.notifier.add_replication_callback(self.on_notifier_poke)
self.notifier.add_remote_server_up_callback(self.send_remote_server_up)
# Keeps track of whether we are currently checking for updates
self.is_looping = False
@@ -155,7 +154,8 @@ class ReplicationStreamer(object):
run_as_background_process("replication_notifier", self._run_notifier_loop)
async def _run_notifier_loop(self):
@defer.inlineCallbacks
def _run_notifier_loop(self):
self.is_looping = True
try:
@@ -184,7 +184,7 @@ class ReplicationStreamer(object):
continue
if self._replication_torture_level:
await self.clock.sleep(
yield self.clock.sleep(
self._replication_torture_level / 1000.0
)
@@ -195,7 +195,7 @@ class ReplicationStreamer(object):
stream.upto_token,
)
try:
updates, current_token = await stream.get_updates()
updates, current_token = yield stream.get_updates()
except Exception:
logger.info("Failed to handle stream %s", stream.NAME)
raise
@@ -232,7 +232,7 @@ class ReplicationStreamer(object):
self.is_looping = False
@measure_func("repl.get_stream_updates")
async def get_stream_updates(self, stream_name, token):
def get_stream_updates(self, stream_name, token):
"""For a given stream get all updates since token. This is called when
a client first subscribes to a stream.
"""
@@ -240,7 +240,7 @@ class ReplicationStreamer(object):
if not stream:
raise Exception("unknown stream %s", stream_name)
return await stream.get_updates_since(token)
return stream.get_updates_since(token)
@measure_func("repl.federation_ack")
def federation_ack(self, token):
@@ -251,20 +251,22 @@ class ReplicationStreamer(object):
self.federation_sender.federation_ack(token)
@measure_func("repl.on_user_sync")
async def on_user_sync(self, conn_id, user_id, is_syncing, last_sync_ms):
@defer.inlineCallbacks
def on_user_sync(self, conn_id, user_id, is_syncing, last_sync_ms):
"""A client has started/stopped syncing on a worker.
"""
user_sync_counter.inc()
await self.presence_handler.update_external_syncs_row(
yield self.presence_handler.update_external_syncs_row(
conn_id, user_id, is_syncing, last_sync_ms
)
@measure_func("repl.on_remove_pusher")
async def on_remove_pusher(self, app_id, push_key, user_id):
@defer.inlineCallbacks
def on_remove_pusher(self, app_id, push_key, user_id):
"""A client has asked us to remove a pusher
"""
remove_pusher_counter.inc()
await self.store.delete_pusher_by_app_id_pushkey_user_id(
yield self.store.delete_pusher_by_app_id_pushkey_user_id(
app_id=app_id, pushkey=push_key, user_id=user_id
)
@@ -278,24 +280,15 @@ class ReplicationStreamer(object):
getattr(self.store, cache_func).invalidate(tuple(keys))
@measure_func("repl.on_user_ip")
async def on_user_ip(
self, user_id, access_token, ip, user_agent, device_id, last_seen
):
@defer.inlineCallbacks
def on_user_ip(self, user_id, access_token, ip, user_agent, device_id, last_seen):
"""The client saw a user request
"""
user_ip_cache_counter.inc()
await self.store.insert_client_ip(
yield self.store.insert_client_ip(
user_id, access_token, ip, user_agent, device_id, last_seen
)
await self._server_notices_sender.on_user_ip(user_id)
@measure_func("repl.on_remote_server_up")
def on_remote_server_up(self, server: str):
self.notifier.notify_remote_server_up(server)
def send_remote_server_up(self, server: str):
for conn in self.connections:
conn.send_remote_server_up(server)
yield self._server_notices_sender.on_user_ip(user_id)
def send_sync_to_all_connections(self, data):
"""Sends a SYNC command to all clients.

View File

@@ -14,10 +14,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import logging
from collections import namedtuple
from typing import Any
from twisted.internet import defer
logger = logging.getLogger(__name__)
@@ -102,9 +104,8 @@ class Stream(object):
time it was called up until the point `advance_current_token` was called.
"""
NAME = None # type: str # The name of the stream
# The type of the row. Used by the default impl of parse_row.
ROW_TYPE = None # type: Any
NAME = None # The name of the stream
ROW_TYPE = None # The type of the row. Used by the default impl of parse_row.
_LIMITED = True # Whether the update function takes a limit
@classmethod
@@ -142,7 +143,8 @@ class Stream(object):
self.upto_token = self.current_token()
self.last_token = self.upto_token
async def get_updates(self):
@defer.inlineCallbacks
def get_updates(self):
"""Gets all updates since the last time this function was called (or
since the stream was constructed if it hadn't been called before),
until the `upto_token`
@@ -153,12 +155,13 @@ class Stream(object):
list of ``(token, row)`` entries. ``row`` will be json-serialised and
sent over the replication steam.
"""
updates, current_token = await self.get_updates_since(self.last_token)
updates, current_token = yield self.get_updates_since(self.last_token)
self.last_token = current_token
return updates, current_token
async def get_updates_since(self, from_token):
@defer.inlineCallbacks
def get_updates_since(self, from_token):
"""Like get_updates except allows specifying from when we should
stream updates
@@ -178,16 +181,15 @@ class Stream(object):
if from_token == current_token:
return [], current_token
logger.info("get_updates_since: %s", self.__class__)
if self._LIMITED:
rows = await self.update_function(
rows = yield self.update_function(
from_token, current_token, limit=MAX_EVENTS_BEHIND + 1
)
# never turn more than MAX_EVENTS_BEHIND + 1 into updates.
rows = itertools.islice(rows, MAX_EVENTS_BEHIND + 1)
else:
rows = await self.update_function(from_token, current_token)
rows = yield self.update_function(from_token, current_token)
updates = [(row[0], row[1:]) for row in rows]
@@ -229,8 +231,8 @@ class BackfillStream(Stream):
def __init__(self, hs):
store = hs.get_datastore()
self.current_token = store.get_current_backfill_token # type: ignore
self.update_function = store.get_all_new_backfill_event_rows # type: ignore
self.current_token = store.get_current_backfill_token
self.update_function = store.get_all_new_backfill_event_rows
super(BackfillStream, self).__init__(hs)
@@ -244,8 +246,8 @@ class PresenceStream(Stream):
store = hs.get_datastore()
presence_handler = hs.get_presence_handler()
self.current_token = store.get_current_presence_token # type: ignore
self.update_function = presence_handler.get_all_presence_updates # type: ignore
self.current_token = store.get_current_presence_token
self.update_function = presence_handler.get_all_presence_updates
super(PresenceStream, self).__init__(hs)
@@ -258,8 +260,8 @@ class TypingStream(Stream):
def __init__(self, hs):
typing_handler = hs.get_typing_handler()
self.current_token = typing_handler.get_current_token # type: ignore
self.update_function = typing_handler.get_all_typing_updates # type: ignore
self.current_token = typing_handler.get_current_token
self.update_function = typing_handler.get_all_typing_updates
super(TypingStream, self).__init__(hs)
@@ -271,8 +273,8 @@ class ReceiptsStream(Stream):
def __init__(self, hs):
store = hs.get_datastore()
self.current_token = store.get_max_receipt_stream_id # type: ignore
self.update_function = store.get_all_updated_receipts # type: ignore
self.current_token = store.get_max_receipt_stream_id
self.update_function = store.get_all_updated_receipts
super(ReceiptsStream, self).__init__(hs)
@@ -292,8 +294,9 @@ class PushRulesStream(Stream):
push_rules_token, _ = self.store.get_push_rules_stream_token()
return push_rules_token
async def update_function(self, from_token, to_token, limit):
rows = await self.store.get_all_push_rule_updates(from_token, to_token, limit)
@defer.inlineCallbacks
def update_function(self, from_token, to_token, limit):
rows = yield self.store.get_all_push_rule_updates(from_token, to_token, limit)
return [(row[0], row[2]) for row in rows]
@@ -307,8 +310,8 @@ class PushersStream(Stream):
def __init__(self, hs):
store = hs.get_datastore()
self.current_token = store.get_pushers_stream_token # type: ignore
self.update_function = store.get_all_updated_pushers_rows # type: ignore
self.current_token = store.get_pushers_stream_token
self.update_function = store.get_all_updated_pushers_rows
super(PushersStream, self).__init__(hs)
@@ -324,8 +327,8 @@ class CachesStream(Stream):
def __init__(self, hs):
store = hs.get_datastore()
self.current_token = store.get_cache_stream_token # type: ignore
self.update_function = store.get_all_updated_caches # type: ignore
self.current_token = store.get_cache_stream_token
self.update_function = store.get_all_updated_caches
super(CachesStream, self).__init__(hs)
@@ -340,8 +343,8 @@ class PublicRoomsStream(Stream):
def __init__(self, hs):
store = hs.get_datastore()
self.current_token = store.get_current_public_room_stream_id # type: ignore
self.update_function = store.get_all_new_public_rooms # type: ignore
self.current_token = store.get_current_public_room_stream_id
self.update_function = store.get_all_new_public_rooms
super(PublicRoomsStream, self).__init__(hs)
@@ -357,8 +360,8 @@ class DeviceListsStream(Stream):
def __init__(self, hs):
store = hs.get_datastore()
self.current_token = store.get_device_stream_token # type: ignore
self.update_function = store.get_all_device_list_changes_for_remotes # type: ignore
self.current_token = store.get_device_stream_token
self.update_function = store.get_all_device_list_changes_for_remotes
super(DeviceListsStream, self).__init__(hs)
@@ -373,8 +376,8 @@ class ToDeviceStream(Stream):
def __init__(self, hs):
store = hs.get_datastore()
self.current_token = store.get_to_device_stream_token # type: ignore
self.update_function = store.get_all_new_device_messages # type: ignore
self.current_token = store.get_to_device_stream_token
self.update_function = store.get_all_new_device_messages
super(ToDeviceStream, self).__init__(hs)
@@ -389,8 +392,8 @@ class TagAccountDataStream(Stream):
def __init__(self, hs):
store = hs.get_datastore()
self.current_token = store.get_max_account_data_stream_id # type: ignore
self.update_function = store.get_all_updated_tags # type: ignore
self.current_token = store.get_max_account_data_stream_id
self.update_function = store.get_all_updated_tags
super(TagAccountDataStream, self).__init__(hs)
@@ -405,12 +408,13 @@ class AccountDataStream(Stream):
def __init__(self, hs):
self.store = hs.get_datastore()
self.current_token = self.store.get_max_account_data_stream_id # type: ignore
self.current_token = self.store.get_max_account_data_stream_id
super(AccountDataStream, self).__init__(hs)
async def update_function(self, from_token, to_token, limit):
global_results, room_results = await self.store.get_all_updated_account_data(
@defer.inlineCallbacks
def update_function(self, from_token, to_token, limit):
global_results, room_results = yield self.store.get_all_updated_account_data(
from_token, from_token, to_token, limit
)
@@ -430,8 +434,8 @@ class GroupServerStream(Stream):
def __init__(self, hs):
store = hs.get_datastore()
self.current_token = store.get_group_stream_token # type: ignore
self.update_function = store.get_all_groups_changes # type: ignore
self.current_token = store.get_group_stream_token
self.update_function = store.get_all_groups_changes
super(GroupServerStream, self).__init__(hs)
@@ -447,7 +451,7 @@ class UserSignatureStream(Stream):
def __init__(self, hs):
store = hs.get_datastore()
self.current_token = store.get_device_stream_token # type: ignore
self.update_function = store.get_all_user_signature_changes_for_remotes # type: ignore
self.current_token = store.get_device_stream_token
self.update_function = store.get_all_user_signature_changes_for_remotes
super(UserSignatureStream, self).__init__(hs)

View File

@@ -13,12 +13,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import heapq
from typing import Tuple, Type
import attr
from twisted.internet import defer
from ._base import Stream
@@ -63,8 +63,7 @@ class BaseEventsStreamRow(object):
Specifies how to identify, serialize and deserialize the different types.
"""
# Unique string that ids the type. Must be overriden in sub classes.
TypeId = None # type: str
TypeId = None # Unique string that ids the type. Must be overriden in sub classes.
@classmethod
def from_data(cls, data):
@@ -100,12 +99,9 @@ class EventsStreamCurrentStateRow(BaseEventsStreamRow):
event_id = attr.ib() # str, optional
_EventRows = (
EventsStreamEventRow,
EventsStreamCurrentStateRow,
) # type: Tuple[Type[BaseEventsStreamRow], ...]
TypeToRow = {Row.TypeId: Row for Row in _EventRows}
TypeToRow = {
Row.TypeId: Row for Row in (EventsStreamEventRow, EventsStreamCurrentStateRow)
}
class EventsStream(Stream):
@@ -116,19 +112,20 @@ class EventsStream(Stream):
def __init__(self, hs):
self._store = hs.get_datastore()
self.current_token = self._store.get_current_events_token # type: ignore
self.current_token = self._store.get_current_events_token
super(EventsStream, self).__init__(hs)
async def update_function(self, from_token, current_token, limit=None):
event_rows = await self._store.get_all_new_forward_event_rows(
@defer.inlineCallbacks
def update_function(self, from_token, current_token, limit=None):
event_rows = yield self._store.get_all_new_forward_event_rows(
from_token, current_token, limit
)
event_updates = (
(row[0], EventsStreamEventRow.TypeId, row[1:]) for row in event_rows
)
state_rows = await self._store.get_all_updated_current_state_deltas(
state_rows = yield self._store.get_all_updated_current_state_deltas(
from_token, current_token, limit
)
state_updates = (

View File

@@ -37,7 +37,7 @@ class FederationStream(Stream):
def __init__(self, hs):
federation_sender = hs.get_federation_sender()
self.current_token = federation_sender.get_current_token # type: ignore
self.update_function = federation_sender.get_replication_rows # type: ignore
self.current_token = federation_sender.get_current_token
self.update_function = federation_sender.get_replication_rows
super(FederationStream, self).__init__(hs)

View File

@@ -38,7 +38,6 @@ from synapse.rest.admin.users import (
SearchUsersRestServlet,
UserAdminServlet,
UserRegisterServlet,
UserRestServletV2,
UsersRestServlet,
UsersRestServletV2,
WhoisRestServlet,
@@ -107,7 +106,7 @@ class PurgeHistoryRestServlet(RestServlet):
stream_ordering = await self.store.find_first_stream_ordering_after_ts(ts)
r = await self.store.get_room_event_before_stream_ordering(
r = await self.store.get_room_event_after_stream_ordering(
room_id, stream_ordering
)
if not r:
@@ -192,7 +191,6 @@ def register_servlets(hs, http_server):
SendServerNoticeServlet(hs).register(http_server)
VersionServlet(hs).register(http_server)
UserAdminServlet(hs).register(http_server)
UserRestServletV2(hs).register(http_server)
UsersRestServletV2(hs).register(http_server)

View File

@@ -32,24 +32,16 @@ class QuarantineMediaInRoom(RestServlet):
this server.
"""
PATTERNS = (
historical_admin_path_patterns("/room/(?P<room_id>[^/]+)/media/quarantine")
+
# This path kept around for legacy reasons
historical_admin_path_patterns("/quarantine_media/(?P<room_id>![^/]+)")
)
PATTERNS = historical_admin_path_patterns("/quarantine_media/(?P<room_id>[^/]+)")
def __init__(self, hs):
self.store = hs.get_datastore()
self.auth = hs.get_auth()
async def on_POST(self, request, room_id: str):
async def on_POST(self, request, room_id):
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)
logging.info("Quarantining room: %s", room_id)
# Quarantine all media in this room
num_quarantined = await self.store.quarantine_media_ids_in_room(
room_id, requester.user.to_string()
)
@@ -57,60 +49,6 @@ class QuarantineMediaInRoom(RestServlet):
return 200, {"num_quarantined": num_quarantined}
class QuarantineMediaByUser(RestServlet):
"""Quarantines all local media by a given user so that no one can download it via
this server.
"""
PATTERNS = historical_admin_path_patterns(
"/user/(?P<user_id>[^/]+)/media/quarantine"
)
def __init__(self, hs):
self.store = hs.get_datastore()
self.auth = hs.get_auth()
async def on_POST(self, request, user_id: str):
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)
logging.info("Quarantining local media by user: %s", user_id)
# Quarantine all media this user has uploaded
num_quarantined = await self.store.quarantine_media_ids_by_user(
user_id, requester.user.to_string()
)
return 200, {"num_quarantined": num_quarantined}
class QuarantineMediaByID(RestServlet):
"""Quarantines local or remote media by a given ID so that no one can download
it via this server.
"""
PATTERNS = historical_admin_path_patterns(
"/media/quarantine/(?P<server_name>[^/]+)/(?P<media_id>[^/]+)"
)
def __init__(self, hs):
self.store = hs.get_datastore()
self.auth = hs.get_auth()
async def on_POST(self, request, server_name: str, media_id: str):
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)
logging.info("Quarantining local media by ID: %s/%s", server_name, media_id)
# Quarantine this media id
await self.store.quarantine_media_by_id(
server_name, media_id, requester.user.to_string()
)
return 200, {}
class ListMediaInRoom(RestServlet):
"""Lists all of the media in a given room.
"""
@@ -156,6 +94,4 @@ def register_servlets_for_media_repo(hs, http_server):
"""
PurgeMediaCacheRestServlet(hs).register(http_server)
QuarantineMediaInRoom(hs).register(http_server)
QuarantineMediaByID(hs).register(http_server)
QuarantineMediaByUser(hs).register(http_server)
ListMediaInRoom(hs).register(http_server)

Some files were not shown because too many files have changed in this diff Show More