1
0

Compare commits

...

120 Commits

Author SHA1 Message Date
Erik Johnston
57a60365da Merge branch 'develop' of github.com:matrix-org/synapse into erikj/debug_direct_message_checks 2020-01-22 16:53:28 +00:00
Andrew Morgan
ce84dd9e20 Remove unnecessary abstractions in admin handler (#6751) 2020-01-22 15:09:57 +00:00
Brendan Abolivier
33f7e5ce2a Fixup warning about workers changes 2020-01-22 14:49:21 +00:00
Brendan Abolivier
91085ef49e Add deprecation headers 2020-01-22 14:30:22 +00:00
Brendan Abolivier
ffa637050d Fixup changelog 2020-01-22 14:19:23 +00:00
Brendan Abolivier
0d0f32bc53 1.9.0rc1 2020-01-22 14:03:46 +00:00
Andrew Morgan
90a28fb475 Admin API to list, filter and sort rooms (#6720) 2020-01-22 13:36:43 +00:00
Brendan Abolivier
ae6cf586b0 Merge pull request #6764 from matrix-org/babolivier/fix-thumbnail
Fix typo in _select_thumbnail
2020-01-22 13:21:00 +00:00
Brendan Abolivier
6ae0c8db33 Lint + changelog 2020-01-22 12:38:18 +00:00
Brendan Abolivier
d9a8728b11 Remove unused import 2020-01-22 12:30:49 +00:00
Brendan Abolivier
67aa18e8dc Add tests for thumbnailing 2020-01-22 12:28:07 +00:00
Brendan Abolivier
ed83c3a018 Fix typo in _select_thumbnail 2020-01-22 12:27:42 +00:00
Andrew Morgan
aa9b00fb2f Fix and add test to deprecated quarantine media admin api (#6756) 2020-01-22 11:05:50 +00:00
Neil Johnson
5e52d8563b Allow monthly active user limiting support for worker mode, fixes #4639. (#6742) 2020-01-22 11:05:14 +00:00
Erik Johnston
5d7a6ad223 Allow streaming cache invalidate all to workers. (#6749) 2020-01-22 10:37:00 +00:00
Erik Johnston
2093f83ea0 Remove unused CI docker compose files (#6754)
These now exist in the pipelines repo.
2020-01-22 10:36:48 +00:00
Ivan Vilata-i-Balaguer
837f62266b Avoid attribute error when password_config present but empty (#6753)
The old statement returned `None` for such a `password_config` (like the one
created on first run), thus retrieval of the `pepper` key failed with
`AttributeError`.

Fixes #5315

Signed-off-by: Ivan Vilata i Balaguer <ivan@selidor.net>
2020-01-22 07:32:52 +00:00
Brendan Abolivier
07124d028d Port synapse_port_db to async/await (#6718)
* Raise an exception if there are pending background updates

So we return with a non-0 code

* Changelog

* Port synapse_port_db to async/await

* Port update_database to async/await

* Add version string to mocked homeservers

* Remove unused imports

* Convert overseen bits to async/await

* Fixup logging contexts

* Fix imports

* Add a way to print an error without raising an exception

* Incorporate review
2020-01-21 19:04:58 +00:00
Erik Johnston
0e68760078 Add a DeltaState to track changes to be made to current state (#6716) 2020-01-20 18:07:20 +00:00
Erik Johnston
b0a66ab83c Fixup synapse.rest to pass mypy (#6732) 2020-01-20 17:38:21 +00:00
Erik Johnston
74b74462f1 Fix /events/:event_id deprecated API. (#6731) 2020-01-20 17:38:09 +00:00
Erik Johnston
0f6e525be3 Fixup synapse.api to pass mypy (#6733) 2020-01-20 17:34:13 +00:00
Erik Johnston
ceecedc68b Fix changing password via user admin API. (#6730) 2020-01-20 17:23:59 +00:00
Andrew Morgan
e9e066055f Fix empty account_validity config block (#6747) 2020-01-20 16:21:59 +00:00
Andrew Morgan
351fdfede6 Update changelog.d/6747.bugfix
Co-Authored-By: Erik Johnston <erik@matrix.org>
2020-01-20 15:58:44 +00:00
Erik Johnston
2f23eb27b3 Revert "Newsfile"
This reverts commit 11c23af465.
2020-01-20 15:12:58 +00:00
Erik Johnston
11c23af465 Newsfile 2020-01-20 15:11:38 +00:00
Andrew Morgan
026f4bdf3c Add changelog 2020-01-20 14:12:21 +00:00
Andrew Morgan
198d52da3a Fix empty account_validity config block 2020-01-20 14:05:29 +00:00
Brendan Abolivier
a17f64361c Add more logging around message retention policies support (#6717)
So we can debug issues like #6683 more easily
2020-01-17 20:51:44 +00:00
Erik Johnston
5909751936 Fix up changelog 2020-01-17 15:13:27 +00:00
Richard van der Hoff
0b885d62ef bump version to v1.9.0.dev2 2020-01-17 14:58:58 +00:00
Satsuki Yanagi
722b4f302d Fix syntax error in run_upgrade for schema 57 (#6728)
Fix #6727
Related #6655

Co-authored-by: Erik Johnston <erikj@jki.re>
2020-01-17 14:30:35 +00:00
Brendan Abolivier
3b72bb780a Merge pull request #6714 from matrix-org/babolivier/retention_select_event
Fix instantiation of message retention purge jobs
2020-01-17 14:23:51 +00:00
Richard van der Hoff
1dee1e900b bump version to v1.9.0.dev1 2020-01-17 10:44:12 +00:00
Richard van der Hoff
59dc87c618 Merge pull request #6724 from matrix-org/rav/log_saml_attributes
Log saml assertions rather than the whole response
2020-01-17 10:33:24 +00:00
Richard van der Hoff
2b6a77fcde Delegate remote_user_id mapping to the saml mapping provider (#6723)
Turns out that figuring out a remote user id for the SAML user isn't quite as obvious as it seems. Factor it out to the SamlMappingProvider so that it's easy to control.
2020-01-17 10:32:47 +00:00
Erik Johnston
a8a50f5b57 Wake up transaction queue when remote server comes back online (#6706)
This will be used to retry outbound transactions to a remote server if
we think it might have come back up.
2020-01-17 10:27:19 +00:00
Richard van der Hoff
5ce0b17e38 Clarify the account_validity and email sections of the sample configuration. (#6685)
Generally try to make this more comprehensible, and make it match the
conventions.

I've removed the documentation for all the settings which allow you to change
the names of the template files, because I can't really see why they are
useful.
2020-01-17 10:04:15 +00:00
Richard van der Hoff
95c5b9bfb3 changelog 2020-01-16 22:29:06 +00:00
Richard van der Hoff
acc7820574 Log saml assertions rather than the whole response
... since the whole response is huge.

We even need to break up the assertions, since kibana otherwise truncates them.
2020-01-16 22:26:34 +00:00
Richard van der Hoff
14d8f342d5 move batch_iter to a separate module 2020-01-16 22:25:32 +00:00
Brendan Abolivier
4fb3cb208a Precise changelog 2020-01-16 20:27:07 +00:00
Brendan Abolivier
dac148341b Fixup diff 2020-01-16 20:25:09 +00:00
Brendan Abolivier
842c2cfbf1 Remove get_room_event_after_stream_ordering entirely 2020-01-16 20:24:17 +00:00
Erik Johnston
d386f2f339 Add StateMap type alias (#6715) 2020-01-16 13:31:22 +00:00
Brendan Abolivier
e601f35d3b Lint 2020-01-16 09:55:11 +00:00
Andrew Morgan
7b14c4a018 Add tips for the changelog to the pull request template (#6663) 2020-01-16 09:46:36 +00:00
Neil Johnson
38e0e59f42 Add org.matrix.e2e_cross_signing to unstable_features in /versions as per MSC1756 (#6712) 2020-01-16 09:46:14 +00:00
Erik Johnston
48c3a96886 Port synapse.replication.tcp to async/await (#6666)
* Port synapse.replication.tcp to async/await

* Newsfile

* Correctly document type of on_<FOO> functions as async

* Don't be overenthusiastic with the asyncing....
2020-01-16 09:16:12 +00:00
Brendan Abolivier
48e57a6452 Rename changelog 2020-01-15 19:40:52 +00:00
Brendan Abolivier
914e73cdd9 Changelog 2020-01-15 19:36:19 +00:00
Brendan Abolivier
066b9f52b8 Correctly order when selecting before stream ordering 2020-01-15 19:32:47 +00:00
Brendan Abolivier
8363588237 Fix typo 2020-01-15 19:13:22 +00:00
Brendan Abolivier
855af069a4 Fix instantiation of message retention purge jobs
When figuring out which topological token to start a purge job at, we
need to do the following:

1. Figure out a timestamp before which events will be purged
2. Select the first stream ordering after that timestamp
3. Select info about the first event after that stream ordering
4. Build a topological token from that info

In some situations (e.g. quiet rooms with a short max_lifetime), there
might not be an event after the stream ordering at step 3, therefore we
abort the purge with the error `No event found`. To mitigate that, this
patch fetches the first event _before_ the stream ordering, instead of
after.
2020-01-15 18:56:18 +00:00
Erik Johnston
19a1aac48c Fix purge_room admin API (#6711) 2020-01-15 18:13:47 +00:00
Andrew Morgan
edc244eec4 Remove duplicate session check in web fallback servlet (#6702) 2020-01-15 18:05:18 +00:00
Richard van der Hoff
608bf7d741 Merge pull request #6688 from matrix-org/rav/module_api_extensions
Cleanups and additions to the module API
2020-01-15 16:43:13 +00:00
Richard van der Hoff
107f256cd8 Merge branch 'develop' into rav/module_api_extensions 2020-01-15 16:00:24 +00:00
Richard van der Hoff
8f5d7302ac Implement RedirectException (#6687)
Allow REST endpoint implemnentations to raise a RedirectException, which will
redirect the user's browser to a given location.
2020-01-15 15:58:55 +00:00
Erik Johnston
28c98e51ff Add local_current_membership table (#6655)
Currently we rely on `current_state_events` to figure out what rooms a
user was in and their last membership event in there. However, if the
server leaves the room then the table may be cleaned up and that
information is lost. So lets add a table that separately holds that
information.
2020-01-15 14:59:33 +00:00
Erik Johnston
b5ce7f5874 Process EDUs in parallel with PDUs. (#6697)
This means that things like to device messages don't get blocked behind
processing PDUs, which can potentially take *ages*.
2020-01-14 14:08:35 +00:00
Erik Johnston
e8b68a4e4b Fixup synapse.replication to pass mypy checks (#6667) 2020-01-14 14:08:06 +00:00
Andrew Morgan
1177d3f3a3 Quarantine media by ID or user ID (#6681) 2020-01-13 18:10:43 +00:00
Erik Johnston
f0ff854911 Pull out more info about room key requests 2020-01-13 15:36:55 +00:00
Richard van der Hoff
47f4f493f0 Document more supported endpoints for workers (#6698) 2020-01-13 15:32:02 +00:00
Erik Johnston
1bb87fec0c Log received 'm.room_key_request' EDUs 2020-01-13 15:24:46 +00:00
Richard van der Hoff
326c893d24 Kill off RegistrationError (#6691)
This is pretty pointless. Let's just use SynapseError.
2020-01-13 12:48:22 +00:00
Richard van der Hoff
2d07c73777 Don't assign numeric IDs for empty usernames (#6690)
Fix a bug where we would assign a numeric userid if somebody tried registering
with an empty username
2020-01-13 12:47:30 +00:00
Richard van der Hoff
3cfac9593c Merge pull request #6689 from matrix-org/rav/saml_mapping_provider_updates
Updates to the SAML mapping provider API
2020-01-13 12:44:55 +00:00
Richard van der Hoff
8039685051 Allow additional_resources to implement Resource directly (#6686)
AdditionalResource really doesn't add any value, and it gets in the way for
resources which want to support child resources or the like. So, if the
resource object already implements the IResource interface, don't bother
wrapping it.
2020-01-13 12:42:44 +00:00
Richard van der Hoff
feee819973 Fix exceptions on requests for non-ascii urls (#6682)
Fixes #6402
2020-01-13 12:41:51 +00:00
Richard van der Hoff
da4e52544e comment for run_in_background 2020-01-12 21:53:47 +00:00
Richard van der Hoff
d56e95ea8b changelog 2020-01-12 21:42:15 +00:00
Richard van der Hoff
dc69a1cf43 Pass client redirect URL into SAML mapping providers 2020-01-12 21:40:49 +00:00
Richard van der Hoff
47e63cc67a Pass the module_api into the SamlMappingProvider
... for consistency with other modules, and because we'll need it sooner or
later and it will be a pain to introduce later.
2020-01-12 21:40:49 +00:00
Richard van der Hoff
96ed33739a changelog 2020-01-12 21:36:10 +00:00
Richard van der Hoff
01243b98e1 Handle config not being set for synapse plugin modules
Some modules don't need any config, so having to define a `config` property
just to keep the loader happy is a bit annoying.
2020-01-12 21:34:36 +00:00
Richard van der Hoff
473d3801b6 Cleanups and additions to the module API
Add some useful things, such as error types and logcontext handling, to the
API.

Make `hs` a private member to dissuade people from using it (hopefully
they aren't already).

Add a couple of new methods (`record_user_external_id` and
`generate_short_term_login_token`).
2020-01-12 21:31:44 +00:00
Richard van der Hoff
1d16f5ea0e Merge pull request #6675 from matrix-org/rav/die_sqlite37_die_die_die
Refuse to start if sqlite is older than 3.11.0
2020-01-10 12:17:22 +00:00
Erik Johnston
4171a8d19e Newsfile 2020-01-10 11:53:16 +00:00
Erik Johnston
6bcd38a50c Check inbound to device messages for correct devices.
To aid debugging we want to sanity check the device direct messages sent
between servers to try and make sure that remote servers have the
correct device lists.
2020-01-10 11:50:56 +00:00
Richard van der Hoff
937dea42e7 update install notes for CentOS 2020-01-09 18:11:04 +00:00
Richard van der Hoff
c3843fd075 changelog 2020-01-09 18:11:04 +00:00
Richard van der Hoff
bf46821180 Refuse to start if sqlite is older than 3.11.0 2020-01-09 18:11:04 +00:00
Richard van der Hoff
e48ba84e0b Check postgres version in check_database
this saves doing it on each connection, and will allow us to pass extra options
in.
2020-01-09 18:05:59 +00:00
Richard van der Hoff
e97d1cf001 Modify check_database to take a connection rather than a cursor
We might not need the cursor at all.
2020-01-09 18:05:50 +00:00
Erik Johnston
645b1f0ea1 Merge branch 'master' of github.com:matrix-org/synapse into develop 2020-01-09 17:14:02 +00:00
Erik Johnston
c2ba994dbb Add note about log_file no longer be accepted (#6674) 2020-01-09 17:13:36 +00:00
Manuel Stahl
d2906fe666 Allow admin users to create or modify users without a shared secret (#6495)
Signed-off-by: Manuel Stahl <manuel.stahl@awesome-technologies.de>
2020-01-09 13:31:00 +00:00
Erik Johnston
d773290cb1 Merge branch 'master' into develop 2020-01-09 13:25:48 +00:00
Erik Johnston
7c232bd98b Merge pull request #6664 from matrix-org/erikj/media_admin_apis
Fix media repo admin APIs when using a media worker.
2020-01-08 15:50:06 +00:00
Erik Johnston
d74054afda Shuffle the code 2020-01-08 14:57:45 +00:00
Erik Johnston
bca3455b38 Comments 2020-01-08 14:27:35 +00:00
Erik Johnston
187dc6ad02 Do not rely on streaming events, as media repo doesn't 2020-01-08 14:24:28 +00:00
Brendan Abolivier
e16521faab Merge pull request #6665 from matrix-org/babolivier/retention_doc_typo
Fix typo in message retention policies doc
2020-01-08 13:57:02 +00:00
Erik Johnston
4e2a072a05 Newsfile 2020-01-08 13:28:19 +00:00
Brendan Abolivier
32ad2a3349 Changelog 2020-01-08 13:28:12 +00:00
Brendan Abolivier
3889fcd9d7 Fix typo in message retention policies doc 2020-01-08 13:27:29 +00:00
Richard van der Hoff
b064a41291 Merge remote-tracking branch 'origin/release-v1.8.0' into develop 2020-01-08 13:27:17 +00:00
Erik Johnston
1adf27c82a Import RoomStore in media worker to fix admin APIs 2020-01-08 13:26:20 +00:00
Erik Johnston
3cf7d6d5b6 Move media admin store functions to worker store 2020-01-08 13:26:20 +00:00
Brendan Abolivier
cff1cb8685 Merge pull request #6624 from matrix-org/babolivier/retention_doc
Add complete documentation of the message retention policies support
2020-01-08 11:24:47 +00:00
Fabian Meyer
dd57715de2 contrib/docker-compose: fixing mount that overrides containers' /etc (#6656)
The mount in the form of ./matrix-config:/etc overwrites the contents of the container /etc folder. Since all valid ca certificates are stored in /etc, the synapse.push.httppusher, for example, cannot validate the certificate from matrix.org.
2020-01-08 07:25:05 +00:00
Matthew Hodgson
91718b3f23 typo 2020-01-07 15:46:04 +00:00
Erik Johnston
be29ed7ad8 Correctly proxy remote group HTTP errors. (#6654)
e.g. if remote returns a 404 then that shouldn't be treated as an error
but should be proxied through.
2020-01-07 15:36:41 +00:00
Brendan Abolivier
2b6b7f482a Merge pull request #6621 from matrix-org/babolivier/purge_job_config_typo
Fix a typo in the purge jobs configuration example
2020-01-07 16:17:40 +01:00
Brendan Abolivier
3675fb9bc6 Fix reference 2020-01-07 15:15:16 +00:00
Brendan Abolivier
7ba98a2874 Incorporate review 2020-01-07 15:14:33 +00:00
Brendan Abolivier
4be582d7c8 Merge branch 'develop' into babolivier/retention_doc 2020-01-07 15:07:19 +00:00
Brendan Abolivier
01fbd95736 Apply suggestions from code review
Co-Authored-By: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
2020-01-07 15:59:38 +01:00
Brendan Abolivier
03edfc5850 Update changelog.d/6624.doc
Co-Authored-By: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
2020-01-07 15:59:05 +01:00
Brendan Abolivier
391fb47791 Reword 2020-01-07 14:54:32 +00:00
Brendan Abolivier
3a86477162 Change the example from 5min to 12h
Have a purge job running every 5min is probably not something we want to advise admins to do as a sort-of default.
2020-01-07 14:53:07 +00:00
Brendan Abolivier
b7dec300b7 Fix vacuum instructions for sqlite 2020-01-03 13:51:59 +01:00
Brendan Abolivier
51b8a21f0c Rename changelog 2020-01-03 13:49:12 +01:00
Brendan Abolivier
9279a2c4e4 Add a complete documentation of the message retention policies support 2020-01-03 13:45:03 +01:00
Brendan Abolivier
9c59bc59c8 Changelog 2020-01-03 13:00:32 +01:00
Brendan Abolivier
dd2954f78d Update sample config 2020-01-03 12:58:12 +01:00
Brendan Abolivier
4efe1d4d3f Fix a typo in the purge jobs configuration example 2020-01-03 12:57:24 +01:00
147 changed files with 4648 additions and 1771 deletions

View File

@@ -1,22 +0,0 @@
version: '3.1'
services:
postgres:
image: postgres:9.5
environment:
POSTGRES_PASSWORD: postgres
command: -c fsync=off
testenv:
image: python:3.5
depends_on:
- postgres
env_file: .env
environment:
SYNAPSE_POSTGRES_HOST: postgres
SYNAPSE_POSTGRES_USER: postgres
SYNAPSE_POSTGRES_PASSWORD: postgres
working_dir: /src
volumes:
- ..:/src

View File

@@ -1,22 +0,0 @@
version: '3.1'
services:
postgres:
image: postgres:11
environment:
POSTGRES_PASSWORD: postgres
command: -c fsync=off
testenv:
image: python:3.7
depends_on:
- postgres
env_file: .env
environment:
SYNAPSE_POSTGRES_HOST: postgres
SYNAPSE_POSTGRES_USER: postgres
SYNAPSE_POSTGRES_PASSWORD: postgres
working_dir: /src
volumes:
- ..:/src

View File

@@ -1,22 +0,0 @@
version: '3.1'
services:
postgres:
image: postgres:9.5
environment:
POSTGRES_PASSWORD: postgres
command: -c fsync=off
testenv:
image: python:3.7
depends_on:
- postgres
env_file: .env
environment:
SYNAPSE_POSTGRES_HOST: postgres
SYNAPSE_POSTGRES_USER: postgres
SYNAPSE_POSTGRES_PASSWORD: postgres
working_dir: /src
volumes:
- ..:/src

View File

@@ -3,6 +3,10 @@
<!-- Please read CONTRIBUTING.md before submitting your pull request -->
* [ ] Pull request is based on the develop branch
* [ ] Pull request includes a [changelog file](https://github.com/matrix-org/synapse/blob/master/CONTRIBUTING.md#changelog)
* [ ] Pull request includes a [changelog file](https://github.com/matrix-org/synapse/blob/master/CONTRIBUTING.md#changelog). The entry should:
- Be a short description of your change which makes sense to users. "Fixed a bug that prevented receiving messages from other servers." instead of "Moved X method from `EventStore` to `EventWorkerStore`.".
- Use markdown where necessary, mostly for `code blocks`.
- End with either a period (.) or an exclamation mark (!).
- Start with a capital letter.
* [ ] Pull request includes a [sign off](https://github.com/matrix-org/synapse/blob/master/CONTRIBUTING.md#sign-off)
* [ ] Code style is correct (run the [linters](https://github.com/matrix-org/synapse/blob/master/CONTRIBUTING.md#code-style))

View File

@@ -1,6 +1,83 @@
Synapse 1.9.0rc1 (2020-01-22)
=============================
**WARNING**: As of this release, Synapse no longer supports versions of SQLite before 3.11, and will refuse to start when configured to use an older version. Administrators are recommended to migrate their database to Postgres (see instructions [here](docs/postgres.md)).
If your Synapse deployment uses workers, note that the reverse-proxy configurations for the `synapse.app.media_repository`, `synapse.app.federation_reader` and `synapse.app.event_creator` have changed, with the addition of a few paths (see the updated configurations [here](docs/workers.md#available-worker-applications)).
Features
--------
- Allow admin to create or modify a user. Contributed by Awesome Technologies Innovationslabor GmbH. ([\#5742](https://github.com/matrix-org/synapse/issues/5742))
- Add new quarantine media admin APIs to quarantine by media ID or by user who uploaded the media. ([\#6681](https://github.com/matrix-org/synapse/issues/6681), [\#6756](https://github.com/matrix-org/synapse/issues/6756))
- Add `org.matrix.e2e_cross_signing` to `unstable_features` in `/versions` as per [MSC1756](https://github.com/matrix-org/matrix-doc/pull/1756). ([\#6712](https://github.com/matrix-org/synapse/issues/6712))
- Add a new admin API to list and filter rooms on the server. ([\#6720](https://github.com/matrix-org/synapse/issues/6720))
Bugfixes
--------
- Correctly proxy HTTP errors due to API calls to remote group servers. ([\#6654](https://github.com/matrix-org/synapse/issues/6654))
- Fix media repo admin APIs when using a media worker. ([\#6664](https://github.com/matrix-org/synapse/issues/6664))
- Fix "CRITICAL" errors being logged when a request is received for a uri containing non-ascii characters. ([\#6682](https://github.com/matrix-org/synapse/issues/6682))
- Fix a bug where we would assign a numeric user ID if somebody tried registering with an empty username. ([\#6690](https://github.com/matrix-org/synapse/issues/6690))
- Fix `purge_room` admin API. ([\#6711](https://github.com/matrix-org/synapse/issues/6711))
- Fix a bug causing Synapse to not always purge quiet rooms with a low `max_lifetime` in their message retention policies when running the automated purge jobs. ([\#6714](https://github.com/matrix-org/synapse/issues/6714))
- Fix the `synapse_port_db` not correctly running background updates. Thanks @tadzik for reporting. ([\#6718](https://github.com/matrix-org/synapse/issues/6718))
- Fix changing password via user admin API. ([\#6730](https://github.com/matrix-org/synapse/issues/6730))
- Fix `/events/:event_id` deprecated API. ([\#6731](https://github.com/matrix-org/synapse/issues/6731))
- Fix monthly active user limiting support for worker mode, fixes [#4639](https://github.com/matrix-org/synapse/issues/4639). ([\#6742](https://github.com/matrix-org/synapse/issues/6742))
- Fix bug when setting `account_validity` to an empty block in the config. Thanks to @Sorunome for reporting. ([\#6747](https://github.com/matrix-org/synapse/issues/6747))
- Fix `AttributeError: 'NoneType' object has no attribute 'get'` in `hash_password` when configuration has an empty `password_config`. Contributed by @ivilata. ([\#6753](https://github.com/matrix-org/synapse/issues/6753))
- Fix the `docker-compose.yaml` overriding the entire `/etc` folder of the container. Contributed by Fabian Meyer. ([\#6656](https://github.com/matrix-org/synapse/issues/6656))
Improved Documentation
----------------------
- Fix a typo in the configuration example for purge jobs in the sample configuration file. ([\#6621](https://github.com/matrix-org/synapse/issues/6621))
- Add complete documentation of the message retention policies support. ([\#6624](https://github.com/matrix-org/synapse/issues/6624), [\#6665](https://github.com/matrix-org/synapse/issues/6665))
- Add some helpful tips about changelog entries to the GitHub pull request template. ([\#6663](https://github.com/matrix-org/synapse/issues/6663))
- Clarify the `account_validity` and `email` sections of the sample configuration. ([\#6685](https://github.com/matrix-org/synapse/issues/6685))
- Add more endpoints to the documentation for Synapse workers. ([\#6698](https://github.com/matrix-org/synapse/issues/6698))
Deprecations and Removals
-------------------------
- Synapse no longer supports versions of SQLite before 3.11, and will refuse to start when configured to use an older version. Administrators are recommended to migrate their database to Postgres (see instructions [here](docs/postgres.md)). ([\#6675](https://github.com/matrix-org/synapse/issues/6675))
Internal Changes
----------------
- Add `local_current_membership` table for tracking local user membership state in rooms. ([\#6655](https://github.com/matrix-org/synapse/issues/6655), [\#6728](https://github.com/matrix-org/synapse/issues/6728))
- Port `synapse.replication.tcp` to async/await. ([\#6666](https://github.com/matrix-org/synapse/issues/6666))
- Fixup `synapse.replication` to pass mypy checks. ([\#6667](https://github.com/matrix-org/synapse/issues/6667))
- Allow `additional_resources` to implement `IResource` directly. ([\#6686](https://github.com/matrix-org/synapse/issues/6686))
- Allow REST endpoint implementations to raise a `RedirectException`, which will redirect the user's browser to a given location. ([\#6687](https://github.com/matrix-org/synapse/issues/6687))
- Updates and extensions to the module API. ([\#6688](https://github.com/matrix-org/synapse/issues/6688))
- Updates to the SAML mapping provider API. ([\#6689](https://github.com/matrix-org/synapse/issues/6689), [\#6723](https://github.com/matrix-org/synapse/issues/6723))
- Remove redundant `RegistrationError` class. ([\#6691](https://github.com/matrix-org/synapse/issues/6691))
- Don't block processing of incoming EDUs behind processing PDUs in the same transaction. ([\#6697](https://github.com/matrix-org/synapse/issues/6697))
- Remove duplicate check for the `session` query parameter on the `/auth/xxx/fallback/web` Client-Server endpoint. ([\#6702](https://github.com/matrix-org/synapse/issues/6702))
- Attempt to retry sending a transaction when we detect a remote server has come back online, rather than waiting for a transaction to be triggered by new data. ([\#6706](https://github.com/matrix-org/synapse/issues/6706))
- Add `StateMap` type alias to simplify types. ([\#6715](https://github.com/matrix-org/synapse/issues/6715))
- Add a `DeltaState` to track changes to be made to current state during event persistence. ([\#6716](https://github.com/matrix-org/synapse/issues/6716))
- Add more logging around message retention policies support. ([\#6717](https://github.com/matrix-org/synapse/issues/6717))
- When processing a SAML response, log the assertions for easier configuration. ([\#6724](https://github.com/matrix-org/synapse/issues/6724))
- Fixup `synapse.rest` to pass mypy. ([\#6732](https://github.com/matrix-org/synapse/issues/6732), [\#6764](https://github.com/matrix-org/synapse/issues/6764))
- Fixup `synapse.api` to pass mypy. ([\#6733](https://github.com/matrix-org/synapse/issues/6733))
- Allow streaming cache 'invalidate all' to workers. ([\#6749](https://github.com/matrix-org/synapse/issues/6749))
- Remove unused CI docker compose files. ([\#6754](https://github.com/matrix-org/synapse/issues/6754))
Synapse 1.8.0 (2020-01-09)
==========================
**WARNING**: As of this release Synapse will refuse to start if the `log_file` config option is specified. Support for the option was removed in v1.3.0.
Bugfixes
--------
@@ -16,7 +93,7 @@ Features
- Add v2 APIs for the `send_join` and `send_leave` federation endpoints (as described in [MSC1802](https://github.com/matrix-org/matrix-doc/pull/1802)). ([\#6349](https://github.com/matrix-org/synapse/issues/6349))
- Add a develop script to generate full SQL schemas. ([\#6394](https://github.com/matrix-org/synapse/issues/6394))
- Add custom SAML username mapping functinality through an external provider plugin. ([\#6411](https://github.com/matrix-org/synapse/issues/6411))
- Add custom SAML username mapping functionality through an external provider plugin. ([\#6411](https://github.com/matrix-org/synapse/issues/6411))
- Automatically delete empty groups/communities. ([\#6453](https://github.com/matrix-org/synapse/issues/6453))
- Add option `limit_profile_requests_to_users_who_share_rooms` to prevent requirement of a local user sharing a room with another user to query their profile information. ([\#6523](https://github.com/matrix-org/synapse/issues/6523))
- Add an `export_signing_key` script to extract the public part of signing keys when rotating them. ([\#6546](https://github.com/matrix-org/synapse/issues/6546))

View File

@@ -101,8 +101,8 @@ in the format of `PRnumber.type`. The type can be one of the following:
The content of the file is your changelog entry, which should be a short
description of your change in the same style as the rest of our [changelog](
https://github.com/matrix-org/synapse/blob/master/CHANGES.md). The file can
contain Markdown formatting, and should end with a full stop ('.') for
consistency.
contain Markdown formatting, and should end with a full stop (.) or an
exclamation mark (!) for consistency.
Adding credits to the changelog is encouraged, we value your
contributions and would like to have you shouted out in the release notes!

View File

@@ -133,6 +133,11 @@ sudo yum install libtiff-devel libjpeg-devel libzip-devel freetype-devel \
sudo yum groupinstall "Development Tools"
```
Note that Synapse does not support versions of SQLite before 3.11, and CentOS 7
uses SQLite 3.7. You may be able to work around this by installing a more
recent SQLite version, but it is recommended that you instead use a Postgres
database: see [docs/postgres.md](docs/postgres.md).
#### macOS
Installing prerequisites on macOS:

View File

@@ -75,6 +75,15 @@ for example:
wget https://packages.matrix.org/debian/pool/main/m/matrix-synapse-py3/matrix-synapse-py3_1.3.0+stretch1_amd64.deb
dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
Upgrading to v1.8.0
===================
Specifying a ``log_file`` config option will now cause Synapse to refuse to
start, and should be replaced by with the ``log_config`` option. Support for
the ``log_file`` option was removed in v1.3.0 and has since had no effect.
Upgrading to v1.7.0
===================

1
changelog.d/6678.misc Normal file
View File

@@ -0,0 +1 @@
Check inbound to device messages for correct devices and log any inconsistencies.

1
changelog.d/6751.misc Normal file
View File

@@ -0,0 +1 @@
Remove some unnecessary admin handler abstraction methods.

View File

@@ -18,7 +18,7 @@ services:
- SYNAPSE_CONFIG_PATH=/etc/homeserver.yaml
volumes:
# You may either store all the files in a local folder
- ./matrix-config:/etc
- ./matrix-config/homeserver.yaml:/etc/homeserver.yaml
- ./files:/data
# .. or you may split this between different storage points
# - ./files:/data

View File

@@ -22,19 +22,81 @@ It returns a JSON body like the following:
}
```
# Quarantine media in a room
This API 'quarantines' all the media in a room.
The API is:
```
POST /_synapse/admin/v1/quarantine_media/<room_id>
{}
```
# Quarantine media
Quarantining media means that it is marked as inaccessible by users. It applies
to any local media, and any locally-cached copies of remote media.
The media file itself (and any thumbnails) is not deleted from the server.
## Quarantining media by ID
This API quarantines a single piece of local or remote media.
Request:
```
POST /_synapse/admin/v1/media/quarantine/<server_name>/<media_id>
{}
```
Where `server_name` is in the form of `example.org`, and `media_id` is in the
form of `abcdefg12345...`.
Response:
```
{}
```
## Quarantining media in a room
This API quarantines all local and remote media in a room.
Request:
```
POST /_synapse/admin/v1/room/<room_id>/media/quarantine
{}
```
Where `room_id` is in the form of `!roomid12345:example.org`.
Response:
```
{
"num_quarantined": 10 # The number of media items successfully quarantined
}
```
Note that there is a legacy endpoint, `POST
/_synapse/admin/v1/quarantine_media/<room_id >`, that operates the same.
However, it is deprecated and may be removed in a future release.
## Quarantining all media of a user
This API quarantines all *local* media that a *local* user has uploaded. That is to say, if
you would like to quarantine media uploaded by a user on a remote homeserver, you should
instead use one of the other APIs.
Request:
```
POST /_synapse/admin/v1/user/<user_id>/media/quarantine
{}
```
Where `user_id` is in the form of `@bob:example.org`.
Response:
```
{
"num_quarantined": 10 # The number of media items successfully quarantined
}
```

173
docs/admin_api/rooms.md Normal file
View File

@@ -0,0 +1,173 @@
# List Room API
The List Room admin API allows server admins to get a list of rooms on their
server. There are various parameters available that allow for filtering and
sorting the returned list. This API supports pagination.
## Parameters
The following query parameters are available:
* `from` - Offset in the returned list. Defaults to `0`.
* `limit` - Maximum amount of rooms to return. Defaults to `100`.
* `order_by` - The method in which to sort the returned list of rooms. Valid values are:
- `alphabetical` - Rooms are ordered alphabetically by room name. This is the default.
- `size` - Rooms are ordered by the number of members. Largest to smallest.
* `dir` - Direction of room order. Either `f` for forwards or `b` for backwards. Setting
this value to `b` will reverse the above sort order. Defaults to `f`.
* `search_term` - Filter rooms by their room name. Search term can be contained in any
part of the room name. Defaults to no filtering.
The following fields are possible in the JSON response body:
* `rooms` - An array of objects, each containing information about a room.
- Room objects contain the following fields:
- `room_id` - The ID of the room.
- `name` - The name of the room.
- `canonical_alias` - The canonical (main) alias address of the room.
- `joined_members` - How many users are currently in the room.
* `offset` - The current pagination offset in rooms. This parameter should be
used instead of `next_token` for room offset as `next_token` is
not intended to be parsed.
* `total_rooms` - The total number of rooms this query can return. Using this
and `offset`, you have enough information to know the current
progression through the list.
* `next_batch` - If this field is present, we know that there are potentially
more rooms on the server that did not all fit into this response.
We can use `next_batch` to get the "next page" of results. To do
so, simply repeat your request, setting the `from` parameter to
the value of `next_batch`.
* `prev_batch` - If this field is present, it is possible to paginate backwards.
Use `prev_batch` for the `from` value in the next request to
get the "previous page" of results.
## Usage
A standard request with no filtering:
```
GET /_synapse/admin/rooms
{}
```
Response:
```
{
"rooms": [
{
"room_id": "!OGEhHVWSdvArJzumhm:matrix.org",
"name": "Matrix HQ",
"canonical_alias": "#matrix:matrix.org",
"joined_members": 8326
},
... (8 hidden items) ...
{
"room_id": "!xYvNcQPhnkrdUmYczI:matrix.org",
"name": "This Week In Matrix (TWIM)",
"canonical_alias": "#twim:matrix.org",
"joined_members": 314
}
],
"offset": 0,
"total_rooms": 10
}
```
Filtering by room name:
```
GET /_synapse/admin/rooms?search_term=TWIM
{}
```
Response:
```
{
"rooms": [
{
"room_id": "!xYvNcQPhnkrdUmYczI:matrix.org",
"name": "This Week In Matrix (TWIM)",
"canonical_alias": "#twim:matrix.org",
"joined_members": 314
}
],
"offset": 0,
"total_rooms": 1
}
```
Paginating through a list of rooms:
```
GET /_synapse/admin/rooms?order_by=size
{}
```
Response:
```
{
"rooms": [
{
"room_id": "!OGEhHVWSdvArJzumhm:matrix.org",
"name": "Matrix HQ",
"canonical_alias": "#matrix:matrix.org",
"joined_members": 8326
},
... (98 hidden items) ...
{
"room_id": "!xYvNcQPhnkrdUmYczI:matrix.org",
"name": "This Week In Matrix (TWIM)",
"canonical_alias": "#twim:matrix.org",
"joined_members": 314
}
],
"offset": 0,
"total_rooms": 150
"next_token": 100
}
```
The presence of the `next_token` parameter tells us that there are more rooms
than returned in this request, and we need to make another request to get them.
To get the next batch of room results, we repeat our request, setting the `from`
parameter to the value of `next_token`.
```
GET /_synapse/admin/rooms?order_by=size&from=100
{}
```
Response:
```
{
"rooms": [
{
"room_id": "!mscvqgqpHYjBGDxNym:matrix.org",
"name": "Music Theory",
"canonical_alias": "#musictheory:matrix.org",
"joined_members": 127
},
... (48 hidden items) ...
{
"room_id": "!twcBhHVdZlQWuuxBhN:termina.org.uk",
"name": "weechat-matrix",
"canonical_alias": "#weechat-matrix:termina.org.uk",
"joined_members": 137
}
],
"offset": 100,
"prev_batch": 0,
"total_rooms": 150
}
```
Once the `next_token` parameter is no longer present, we know we've reached the
end of the list.

View File

@@ -1,3 +1,33 @@
Create or modify Account
========================
This API allows an administrator to create or modify a user account with a
specific ``user_id``.
This api is::
PUT /_synapse/admin/v2/users/<user_id>
with a body of:
.. code:: json
{
"password": "user_password",
"displayname": "User",
"avatar_url": "<avatar_url>",
"admin": false,
"deactivated": false
}
including an ``access_token`` of a server admin.
The parameter ``displayname`` is optional and defaults to ``user_id``.
The parameter ``avatar_url`` is optional.
The parameter ``admin`` is optional and defaults to 'false'.
The parameter ``deactivated`` is optional and defaults to 'false'.
If the user already exists then optional parameters default to the current value.
List Accounts
=============
@@ -50,7 +80,8 @@ This API returns information about a specific user account.
The api is::
GET /_synapse/admin/v1/whois/<user_id>
GET /_synapse/admin/v1/whois/<user_id> (deprecated)
GET /_synapse/admin/v2/users/<user_id>
including an ``access_token`` of a server admin.

View File

@@ -0,0 +1,191 @@
# Message retention policies
Synapse admins can enable support for message retention policies on
their homeserver. Message retention policies exist at a room level,
follow the semantics described in
[MSC1763](https://github.com/matrix-org/matrix-doc/blob/matthew/msc1763/proposals/1763-configurable-retention-periods.md),
and allow server and room admins to configure how long messages should
be kept in a homeserver's database before being purged from it.
**Please note that, as this feature isn't part of the Matrix
specification yet, this implementation is to be considered as
experimental.**
A message retention policy is mainly defined by its `max_lifetime`
parameter, which defines how long a message can be kept around after
it was sent to the room. If a room doesn't have a message retention
policy, and there's no default one for a given server, then no message
sent in that room is ever purged on that server.
MSC1763 also specifies semantics for a `min_lifetime` parameter which
defines the amount of time after which an event _can_ get purged (after
it was sent to the room), but Synapse doesn't currently support it
beyond registering it.
Both `max_lifetime` and `min_lifetime` are optional parameters.
Note that message retention policies don't apply to state events.
Once an event reaches its expiry date (defined as the time it was sent
plus the value for `max_lifetime` in the room), two things happen:
* Synapse stops serving the event to clients via any endpoint.
* The message gets picked up by the next purge job (see the "Purge jobs"
section) and is removed from Synapse's database.
Since purge jobs don't run continuously, this means that an event might
stay in a server's database for longer than the value for `max_lifetime`
in the room would allow, though hidden from clients.
Similarly, if a server (with support for message retention policies
enabled) receives from another server an event that should have been
purged according to its room's policy, then the receiving server will
process and store that event until it's picked up by the next purge job,
though it will always hide it from clients.
## Server configuration
Support for this feature can be enabled and configured in the
`retention` section of the Synapse configuration file (see the
[sample file](https://github.com/matrix-org/synapse/blob/v1.7.3/docs/sample_config.yaml#L332-L393)).
To enable support for message retention policies, set the setting
`enabled` in this section to `true`.
### Default policy
A default message retention policy is a policy defined in Synapse's
configuration that is used by Synapse for every room that doesn't have a
message retention policy configured in its state. This allows server
admins to ensure that messages are never kept indefinitely in a server's
database.
A default policy can be defined as such, in the `retention` section of
the configuration file:
```yaml
default_policy:
min_lifetime: 1d
max_lifetime: 1y
```
Here, `min_lifetime` and `max_lifetime` have the same meaning and level
of support as previously described. They can be expressed either as a
duration (using the units `s` (seconds), `m` (minutes), `h` (hours),
`d` (days), `w` (weeks) and `y` (years)) or as a number of milliseconds.
### Purge jobs
Purge jobs are the jobs that Synapse runs in the background to purge
expired events from the database. They are only run if support for
message retention policies is enabled in the server's configuration. If
no configuration for purge jobs is configured by the server admin,
Synapse will use a default configuration, which is described in the
[sample configuration file](https://github.com/matrix-org/synapse/blob/master/docs/sample_config.yaml#L332-L393).
Some server admins might want a finer control on when events are removed
depending on an event's room's policy. This can be done by setting the
`purge_jobs` sub-section in the `retention` section of the configuration
file. An example of such configuration could be:
```yaml
purge_jobs:
- longest_max_lifetime: 3d
interval: 12h
- shortest_max_lifetime: 3d
longest_max_lifetime: 1w
interval: 1d
- shortest_max_lifetime: 1w
interval: 2d
```
In this example, we define three jobs:
* one that runs twice a day (every 12 hours) and purges events in rooms
which policy's `max_lifetime` is lower or equal to 3 days.
* one that runs once a day and purges events in rooms which policy's
`max_lifetime` is between 3 days and a week.
* one that runs once every 2 days and purges events in rooms which
policy's `max_lifetime` is greater than a week.
Note that this example is tailored to show different configurations and
features slightly more jobs than it's probably necessary (in practice, a
server admin would probably consider it better to replace the two last
jobs with one that runs once a day and handles rooms which which
policy's `max_lifetime` is greater than 3 days).
Keep in mind, when configuring these jobs, that a purge job can become
quite heavy on the server if it targets many rooms, therefore prefer
having jobs with a low interval that target a limited set of rooms. Also
make sure to include a job with no minimum and one with no maximum to
make sure your configuration handles every policy.
As previously mentioned in this documentation, while a purge job that
runs e.g. every day means that an expired event might stay in the
database for up to a day after its expiry, Synapse hides expired events
from clients as soon as they expire, so the event is not visible to
local users between its expiry date and the moment it gets purged from
the server's database.
### Lifetime limits
**Note: this feature is mainly useful within a closed federation or on
servers that don't federate, because there currently is no way to
enforce these limits in an open federation.**
Server admins can restrict the values their local users are allowed to
use for both `min_lifetime` and `max_lifetime`. These limits can be
defined as such in the `retention` section of the configuration file:
```yaml
allowed_lifetime_min: 1d
allowed_lifetime_max: 1y
```
Here, `allowed_lifetime_min` is the lowest value a local user can set
for both `min_lifetime` and `max_lifetime`, and `allowed_lifetime_max`
is the highest value. Both parameters are optional (e.g. setting
`allowed_lifetime_min` but not `allowed_lifetime_max` only enforces a
minimum and no maximum).
Like other settings in this section, these parameters can be expressed
either as a duration or as a number of milliseconds.
## Room configuration
To configure a room's message retention policy, a room's admin or
moderator needs to send a state event in that room with the type
`m.room.retention` and the following content:
```json
{
"max_lifetime": ...
}
```
In this event's content, the `max_lifetime` parameter has the same
meaning as previously described, and needs to be expressed in
milliseconds. The event's content can also include a `min_lifetime`
parameter, which has the same meaning and limited support as previously
described.
Note that over every server in the room, only the ones with support for
message retention policies will actually remove expired events. This
support is currently not enabled by default in Synapse.
## Note on reclaiming disk space
While purge jobs actually delete data from the database, the disk space
used by the database might not decrease immediately on the database's
host. However, even though the database engine won't free up the disk
space, it will start writing new data into where the purged data was.
If you want to reclaim the freed disk space anyway and return it to the
operating system, the server admin needs to run `VACUUM FULL;` (or
`VACUUM;` for SQLite databases) on Synapse's database (see the related
[PostgreSQL documentation](https://www.postgresql.org/docs/current/sql-vacuum.html)).

View File

@@ -387,17 +387,17 @@ retention:
#
# The rationale for this per-job configuration is that some rooms might have a
# retention policy with a low 'max_lifetime', where history needs to be purged
# of outdated messages on a very frequent basis (e.g. every 5min), but not want
# that purge to be performed by a job that's iterating over every room it knows,
# which would be quite heavy on the server.
# of outdated messages on a more frequent basis than for the rest of the rooms
# (e.g. every 12h), but not want that purge to be performed by a job that's
# iterating over every room it knows, which could be heavy on the server.
#
#purge_jobs:
# - shortest_max_lifetime: 1d
# longest_max_lifetime: 3d
# interval: 5m:
# interval: 12h
# - shortest_max_lifetime: 3d
# longest_max_lifetime: 1y
# interval: 24h
# interval: 1d
## TLS ##
@@ -874,23 +874,6 @@ media_store_path: "DATADIR/media_store"
# Optional account validity configuration. This allows for accounts to be denied
# any request after a given period.
#
# ``enabled`` defines whether the account validity feature is enabled. Defaults
# to False.
#
# ``period`` allows setting the period after which an account is valid
# after its registration. When renewing the account, its validity period
# will be extended by this amount of time. This parameter is required when using
# the account validity feature.
#
# ``renew_at`` is the amount of time before an account's expiry date at which
# Synapse will send an email to the account's email address with a renewal link.
# This needs the ``email`` and ``public_baseurl`` configuration sections to be
# filled.
#
# ``renew_email_subject`` is the subject of the email sent out with the renewal
# link. ``%(app)s`` can be used as a placeholder for the ``app_name`` parameter
# from the ``email`` section.
#
# Once this feature is enabled, Synapse will look for registered users without an
# expiration date at startup and will add one to every account it found using the
# current settings at that time.
@@ -901,21 +884,55 @@ media_store_path: "DATADIR/media_store"
# date will be randomly selected within a range [now + period - d ; now + period],
# where d is equal to 10% of the validity period.
#
#account_validity:
# enabled: true
# period: 6w
# renew_at: 1w
# renew_email_subject: "Renew your %(app)s account"
# # Directory in which Synapse will try to find the HTML files to serve to the
# # user when trying to renew an account. Optional, defaults to
# # synapse/res/templates.
# template_dir: "res/templates"
# # HTML to be displayed to the user after they successfully renewed their
# # account. Optional.
# account_renewed_html_path: "account_renewed.html"
# # HTML to be displayed when the user tries to renew an account with an invalid
# # renewal token. Optional.
# invalid_token_html_path: "invalid_token.html"
account_validity:
# The account validity feature is disabled by default. Uncomment the
# following line to enable it.
#
#enabled: true
# The period after which an account is valid after its registration. When
# renewing the account, its validity period will be extended by this amount
# of time. This parameter is required when using the account validity
# feature.
#
#period: 6w
# The amount of time before an account's expiry date at which Synapse will
# send an email to the account's email address with a renewal link. By
# default, no such emails are sent.
#
# If you enable this setting, you will also need to fill out the 'email' and
# 'public_baseurl' configuration sections.
#
#renew_at: 1w
# The subject of the email sent out with the renewal link. '%(app)s' can be
# used as a placeholder for the 'app_name' parameter from the 'email'
# section.
#
# Note that the placeholder must be written '%(app)s', including the
# trailing 's'.
#
# If this is not set, a default value is used.
#
#renew_email_subject: "Renew your %(app)s account"
# Directory in which Synapse will try to find templates for the HTML files to
# serve to the user when trying to renew an account. If not set, default
# templates from within the Synapse package will be used.
#
#template_dir: "res/templates"
# File within 'template_dir' giving the HTML to be displayed to the user after
# they successfully renewed their account. If not set, default text is used.
#
#account_renewed_html_path: "account_renewed.html"
# File within 'template_dir' giving the HTML to be displayed when the user
# tries to renew an account with an invalid renewal token. If not set,
# default text is used.
#
#invalid_token_html_path: "invalid_token.html"
# Time that a user's session remains valid for, after they log in.
#
@@ -1353,107 +1370,110 @@ password_config:
#pepper: "EVEN_MORE_SECRET"
# Configuration for sending emails from Synapse.
#
email:
# The hostname of the outgoing SMTP server to use. Defaults to 'localhost'.
#
#smtp_host: mail.server
# Enable sending emails for password resets, notification events or
# account expiry notices
#
# If your SMTP server requires authentication, the optional smtp_user &
# smtp_pass variables should be used
#
#email:
# enable_notifs: false
# smtp_host: "localhost"
# smtp_port: 25 # SSL: 465, STARTTLS: 587
# smtp_user: "exampleusername"
# smtp_pass: "examplepassword"
# require_transport_security: false
#
# # notif_from defines the "From" address to use when sending emails.
# # It must be set if email sending is enabled.
# #
# # The placeholder '%(app)s' will be replaced by the application name,
# # which is normally 'app_name' (below), but may be overridden by the
# # Matrix client application.
# #
# # Note that the placeholder must be written '%(app)s', including the
# # trailing 's'.
# #
# notif_from: "Your Friendly %(app)s homeserver <noreply@example.com>"
#
# # app_name defines the default value for '%(app)s' in notif_from. It
# # defaults to 'Matrix'.
# #
# #app_name: my_branded_matrix_server
#
# # Enable email notifications by default
# #
# notif_for_new_users: true
#
# # Defining a custom URL for Riot is only needed if email notifications
# # should contain links to a self-hosted installation of Riot; when set
# # the "app_name" setting is ignored
# #
# riot_base_url: "http://localhost/riot"
#
# # Configure the time that a validation email or text message code
# # will expire after sending
# #
# # This is currently used for password resets
# #
# #validation_token_lifetime: 1h
#
# # Template directory. All template files should be stored within this
# # directory. If not set, default templates from within the Synapse
# # package will be used
# #
# # For the list of default templates, please see
# # https://github.com/matrix-org/synapse/tree/master/synapse/res/templates
# #
# #template_dir: res/templates
#
# # Templates for email notifications
# #
# notif_template_html: notif_mail.html
# notif_template_text: notif_mail.txt
#
# # Templates for account expiry notices
# #
# expiry_template_html: notice_expiry.html
# expiry_template_text: notice_expiry.txt
#
# # Templates for password reset emails sent by the homeserver
# #
# #password_reset_template_html: password_reset.html
# #password_reset_template_text: password_reset.txt
#
# # Templates for registration emails sent by the homeserver
# #
# #registration_template_html: registration.html
# #registration_template_text: registration.txt
#
# # Templates for validation emails sent by the homeserver when adding an email to
# # your user account
# #
# #add_threepid_template_html: add_threepid.html
# #add_threepid_template_text: add_threepid.txt
#
# # Templates for password reset success and failure pages that a user
# # will see after attempting to reset their password
# #
# #password_reset_template_success_html: password_reset_success.html
# #password_reset_template_failure_html: password_reset_failure.html
#
# # Templates for registration success and failure pages that a user
# # will see after attempting to register using an email or phone
# #
# #registration_template_success_html: registration_success.html
# #registration_template_failure_html: registration_failure.html
#
# # Templates for success and failure pages that a user will see after attempting
# # to add an email or phone to their account
# #
# #add_threepid_success_html: add_threepid_success.html
# #add_threepid_failure_html: add_threepid_failure.html
# The port on the mail server for outgoing SMTP. Defaults to 25.
#
#smtp_port: 587
# Username/password for authentication to the SMTP server. By default, no
# authentication is attempted.
#
# smtp_user: "exampleusername"
# smtp_pass: "examplepassword"
# Uncomment the following to require TLS transport security for SMTP.
# By default, Synapse will connect over plain text, and will then switch to
# TLS via STARTTLS *if the SMTP server supports it*. If this option is set,
# Synapse will refuse to connect unless the server supports STARTTLS.
#
#require_transport_security: true
# Enable sending emails for messages that the user has missed
#
#enable_notifs: false
# notif_from defines the "From" address to use when sending emails.
# It must be set if email sending is enabled.
#
# The placeholder '%(app)s' will be replaced by the application name,
# which is normally 'app_name' (below), but may be overridden by the
# Matrix client application.
#
# Note that the placeholder must be written '%(app)s', including the
# trailing 's'.
#
#notif_from: "Your Friendly %(app)s homeserver <noreply@example.com>"
# app_name defines the default value for '%(app)s' in notif_from. It
# defaults to 'Matrix'.
#
#app_name: my_branded_matrix_server
# Uncomment the following to disable automatic subscription to email
# notifications for new users. Enabled by default.
#
#notif_for_new_users: false
# Custom URL for client links within the email notifications. By default
# links will be based on "https://matrix.to".
#
# (This setting used to be called riot_base_url; the old name is still
# supported for backwards-compatibility but is now deprecated.)
#
#client_base_url: "http://localhost/riot"
# Configure the time that a validation email will expire after sending.
# Defaults to 1h.
#
#validation_token_lifetime: 15m
# Directory in which Synapse will try to find the template files below.
# If not set, default templates from within the Synapse package will be used.
#
# DO NOT UNCOMMENT THIS SETTING unless you want to customise the templates.
# If you *do* uncomment it, you will need to make sure that all the templates
# below are in the directory.
#
# Synapse will look for the following templates in this directory:
#
# * The contents of email notifications of missed events: 'notif_mail.html' and
# 'notif_mail.txt'.
#
# * The contents of account expiry notice emails: 'notice_expiry.html' and
# 'notice_expiry.txt'.
#
# * The contents of password reset emails sent by the homeserver:
# 'password_reset.html' and 'password_reset.txt'
#
# * HTML pages for success and failure that a user will see when they follow
# the link in the password reset email: 'password_reset_success.html' and
# 'password_reset_failure.html'
#
# * The contents of address verification emails sent during registration:
# 'registration.html' and 'registration.txt'
#
# * HTML pages for success and failure that a user will see when they follow
# the link in an address verification email sent during registration:
# 'registration_success.html' and 'registration_failure.html'
#
# * The contents of address verification emails sent when an address is added
# to a Matrix account: 'add_threepid.html' and 'add_threepid.txt'
#
# * HTML pages for success and failure that a user will see when they follow
# the link in an address verification email sent when an address is added
# to a Matrix account: 'add_threepid_success.html' and
# 'add_threepid_failure.html'
#
# You can see the default templates at:
# https://github.com/matrix-org/synapse/tree/master/synapse/res/templates
#
#template_dir: "res/templates"
#password_providers:

View File

@@ -209,7 +209,7 @@ Where `<token>` may be either:
* a numeric stream_id to stream updates since (exclusive)
* `NOW` to stream all subsequent updates.
The `<stream_name>` is the name of a replication stream to subscribe
The `<stream_name>` is the name of a replication stream to subscribe
to (see [here](../synapse/replication/tcp/streams/_base.py) for a list
of streams). It can also be `ALL` to subscribe to all known streams,
in which case the `<token>` must be set to `NOW`.
@@ -234,6 +234,10 @@ in which case the `<token>` must be set to `NOW`.
Used exclusively in tests
### REMOTE_SERVER_UP (S, C)
Inform other processes that a remote server may have come back online.
See `synapse/replication/tcp/commands.py` for a detailed description and
the format of each command.
@@ -250,6 +254,11 @@ and they key to invalidate. For example:
> RDATA caches 550953771 ["get_user_by_id", ["@bob:example.com"], 1550574873251]
Alternatively, an entire cache can be invalidated by sending down a `null`
instead of the key. For example:
> RDATA caches 550953772 ["get_user_by_id", null, 1550574873252]
However, there are times when a number of caches need to be invalidated
at the same time with the same key. To reduce traffic we batch those
invalidations into a single poke by defining a special cache name that

View File

@@ -168,8 +168,11 @@ endpoints matching the following regular expressions:
^/_matrix/federation/v1/make_join/
^/_matrix/federation/v1/make_leave/
^/_matrix/federation/v1/send_join/
^/_matrix/federation/v2/send_join/
^/_matrix/federation/v1/send_leave/
^/_matrix/federation/v2/send_leave/
^/_matrix/federation/v1/invite/
^/_matrix/federation/v2/invite/
^/_matrix/federation/v1/query_auth/
^/_matrix/federation/v1/event_auth/
^/_matrix/federation/v1/exchange_third_party_invite/
@@ -199,7 +202,9 @@ Handles the media repository. It can handle all endpoints starting with:
... and the following regular expressions matching media-specific administration APIs:
^/_synapse/admin/v1/purge_media_cache$
^/_synapse/admin/v1/room/.*/media$
^/_synapse/admin/v1/room/.*/media.*$
^/_synapse/admin/v1/user/.*/media.*$
^/_synapse/admin/v1/media/.*$
^/_synapse/admin/v1/quarantine_media/.*$
You should also set `enable_media_repo: False` in the shared configuration
@@ -288,6 +293,7 @@ file. For example:
Handles some event creation. It can handle REST endpoints matching:
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/send
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/state/
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$
^/_matrix/client/(api/v1|r0|unstable)/join/
^/_matrix/client/(api/v1|r0|unstable)/profile/

View File

@@ -7,6 +7,9 @@ show_error_codes = True
show_traceback = True
mypy_path = stubs
[mypy-pymacaroons.*]
ignore_missing_imports = True
[mypy-zope]
ignore_missing_imports = True
@@ -63,3 +66,12 @@ ignore_missing_imports = True
[mypy-sentry_sdk]
ignore_missing_imports = True
[mypy-PIL.*]
ignore_missing_imports = True
[mypy-lxml]
ignore_missing_imports = True
[mypy-jwt.*]
ignore_missing_imports = True

View File

@@ -22,10 +22,12 @@ import yaml
from twisted.internet import defer, reactor
import synapse
from synapse.config.homeserver import HomeServerConfig
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.server import HomeServer
from synapse.storage import DataStore
from synapse.util.versionstring import get_version_string
logger = logging.getLogger("update_database")
@@ -38,6 +40,8 @@ class MockHomeserver(HomeServer):
config.server_name, reactor=reactor, config=config, **kwargs
)
self.version_string = "Synapse/"+get_version_string(synapse)
if __name__ == "__main__":
parser = argparse.ArgumentParser(
@@ -81,15 +85,17 @@ if __name__ == "__main__":
hs.setup()
store = hs.get_datastore()
@defer.inlineCallbacks
def run_background_updates():
yield store.db.updates.run_background_updates(sleep=False)
async def run_background_updates():
await store.db.updates.run_background_updates(sleep=False)
# Stop the reactor to exit the script once every background update is run.
reactor.stop()
# Apply all background updates on the database.
reactor.callWhenRunning(
lambda: run_as_background_process("background_updates", run_background_updates)
)
def run():
# Apply all background updates on the database.
defer.ensureDeferred(
run_as_background_process("background_updates", run_background_updates)
)
reactor.callWhenRunning(run)
reactor.run()

View File

@@ -52,7 +52,7 @@ if __name__ == "__main__":
if "config" in args and args.config:
config = yaml.safe_load(args.config)
bcrypt_rounds = config.get("bcrypt_rounds", bcrypt_rounds)
password_config = config.get("password_config", {})
password_config = config.get("password_config", None) or {}
password_pepper = password_config.get("pepper", password_pepper)
password = args.password

View File

@@ -27,13 +27,16 @@ from six import string_types
import yaml
from twisted.enterprise import adbapi
from twisted.internet import defer, reactor
import synapse
from synapse.config.database import DatabaseConnectionConfig
from synapse.config.homeserver import HomeServerConfig
from synapse.logging.context import PreserveLoggingContext
from synapse.storage._base import LoggingTransaction
from synapse.logging.context import (
LoggingContext,
make_deferred_yieldable,
run_in_background,
)
from synapse.storage.data_stores.main.client_ips import ClientIpBackgroundUpdateStore
from synapse.storage.data_stores.main.deviceinbox import (
DeviceInboxBackgroundUpdateStore,
@@ -61,6 +64,7 @@ from synapse.storage.database import Database, make_conn
from synapse.storage.engines import create_engine
from synapse.storage.prepare_database import prepare_database
from synapse.util import Clock
from synapse.util.versionstring import get_version_string
logger = logging.getLogger("synapse_port_db")
@@ -125,6 +129,13 @@ APPEND_ONLY_TABLES = [
]
# Error returned by the run function. Used at the top-level part of the script to
# handle errors and return codes.
end_error = None
# The exec_info for the error, if any. If error is defined but not exec_info the script
# will show only the error message without the stacktrace, if exec_info is defined but
# not the error then the script will show nothing outside of what's printed in the run
# function. If both are defined, the script will print both the error and the stacktrace.
end_error_exec_info = None
@@ -177,6 +188,7 @@ class MockHomeserver:
self.clock = Clock(reactor)
self.config = config
self.hostname = config.server_name
self.version_string = "Synapse/"+get_version_string(synapse)
def get_clock(self):
return self.clock
@@ -189,11 +201,10 @@ class Porter(object):
def __init__(self, **kwargs):
self.__dict__.update(kwargs)
@defer.inlineCallbacks
def setup_table(self, table):
async def setup_table(self, table):
if table in APPEND_ONLY_TABLES:
# It's safe to just carry on inserting.
row = yield self.postgres_store.db.simple_select_one(
row = await self.postgres_store.db.simple_select_one(
table="port_from_sqlite3",
keyvalues={"table_name": table},
retcols=("forward_rowid", "backward_rowid"),
@@ -207,10 +218,10 @@ class Porter(object):
forward_chunk,
already_ported,
total_to_port,
) = yield self._setup_sent_transactions()
) = await self._setup_sent_transactions()
backward_chunk = 0
else:
yield self.postgres_store.db.simple_insert(
await self.postgres_store.db.simple_insert(
table="port_from_sqlite3",
values={
"table_name": table,
@@ -227,7 +238,7 @@ class Porter(object):
backward_chunk = row["backward_rowid"]
if total_to_port is None:
already_ported, total_to_port = yield self._get_total_count_to_port(
already_ported, total_to_port = await self._get_total_count_to_port(
table, forward_chunk, backward_chunk
)
else:
@@ -238,9 +249,9 @@ class Porter(object):
)
txn.execute("TRUNCATE %s CASCADE" % (table,))
yield self.postgres_store.execute(delete_all)
await self.postgres_store.execute(delete_all)
yield self.postgres_store.db.simple_insert(
await self.postgres_store.db.simple_insert(
table="port_from_sqlite3",
values={"table_name": table, "forward_rowid": 1, "backward_rowid": 0},
)
@@ -248,16 +259,13 @@ class Porter(object):
forward_chunk = 1
backward_chunk = 0
already_ported, total_to_port = yield self._get_total_count_to_port(
already_ported, total_to_port = await self._get_total_count_to_port(
table, forward_chunk, backward_chunk
)
defer.returnValue(
(table, already_ported, total_to_port, forward_chunk, backward_chunk)
)
return table, already_ported, total_to_port, forward_chunk, backward_chunk
@defer.inlineCallbacks
def handle_table(
async def handle_table(
self, table, postgres_size, table_size, forward_chunk, backward_chunk
):
logger.info(
@@ -275,7 +283,7 @@ class Porter(object):
self.progress.add_table(table, postgres_size, table_size)
if table == "event_search":
yield self.handle_search_table(
await self.handle_search_table(
postgres_size, table_size, forward_chunk, backward_chunk
)
return
@@ -294,7 +302,7 @@ class Porter(object):
if table == "user_directory_stream_pos":
# We need to make sure there is a single row, `(X, null), as that is
# what synapse expects to be there.
yield self.postgres_store.db.simple_insert(
await self.postgres_store.db.simple_insert(
table=table, values={"stream_id": None}
)
self.progress.update(table, table_size) # Mark table as done
@@ -335,7 +343,7 @@ class Porter(object):
return headers, forward_rows, backward_rows
headers, frows, brows = yield self.sqlite_store.db.runInteraction(
headers, frows, brows = await self.sqlite_store.db.runInteraction(
"select", r
)
@@ -361,7 +369,7 @@ class Porter(object):
},
)
yield self.postgres_store.execute(insert)
await self.postgres_store.execute(insert)
postgres_size += len(rows)
@@ -369,8 +377,7 @@ class Porter(object):
else:
return
@defer.inlineCallbacks
def handle_search_table(
async def handle_search_table(
self, postgres_size, table_size, forward_chunk, backward_chunk
):
select = (
@@ -390,7 +397,7 @@ class Porter(object):
return headers, rows
headers, rows = yield self.sqlite_store.db.runInteraction("select", r)
headers, rows = await self.sqlite_store.db.runInteraction("select", r)
if rows:
forward_chunk = rows[-1][0] + 1
@@ -438,7 +445,7 @@ class Porter(object):
},
)
yield self.postgres_store.execute(insert)
await self.postgres_store.execute(insert)
postgres_size += len(rows)
@@ -447,20 +454,15 @@ class Porter(object):
else:
return
def setup_db(self, db_config: DatabaseConnectionConfig, engine):
db_conn = make_conn(db_config, engine)
prepare_database(db_conn, engine, config=None)
db_conn.commit()
return db_conn
@defer.inlineCallbacks
def build_db_store(self, db_config: DatabaseConnectionConfig):
def build_db_store(
self, db_config: DatabaseConnectionConfig, allow_outdated_version: bool = False,
):
"""Builds and returns a database store using the provided configuration.
Args:
config: The database configuration
db_config: The database configuration
allow_outdated_version: True to suppress errors about the database server
version being too old to run a complete synapse
Returns:
The built Store object.
@@ -468,24 +470,23 @@ class Porter(object):
self.progress.set_state("Preparing %s" % db_config.config["name"])
engine = create_engine(db_config.config)
conn = self.setup_db(db_config, engine)
hs = MockHomeserver(self.hs_config)
store = Store(Database(hs, db_config, engine), conn, hs)
yield store.db.runInteraction(
"%s_engine.check_database" % db_config.config["name"],
engine.check_database,
)
with make_conn(db_config, engine) as db_conn:
engine.check_database(
db_conn, allow_outdated_version=allow_outdated_version
)
prepare_database(db_conn, engine, config=self.hs_config)
store = Store(Database(hs, db_config, engine), db_conn, hs)
db_conn.commit()
return store
@defer.inlineCallbacks
def run_background_updates_on_postgres(self):
async def run_background_updates_on_postgres(self):
# Manually apply all background updates on the PostgreSQL database.
postgres_ready = (
yield self.postgres_store.db.updates.has_completed_background_updates()
await self.postgres_store.db.updates.has_completed_background_updates()
)
if not postgres_ready:
@@ -494,35 +495,44 @@ class Porter(object):
self.progress.set_state("Running background updates on PostgreSQL")
while not postgres_ready:
yield self.postgres_store.db.updates.do_next_background_update(100)
postgres_ready = yield (
await self.postgres_store.db.updates.do_next_background_update(100)
postgres_ready = await (
self.postgres_store.db.updates.has_completed_background_updates()
)
@defer.inlineCallbacks
def run(self):
async def run(self):
"""Ports the SQLite database to a PostgreSQL database.
When a fatal error is met, its message is assigned to the global "end_error"
variable. When this error comes with a stacktrace, its exec_info is assigned to
the global "end_error_exec_info" variable.
"""
global end_error
try:
self.sqlite_store = yield self.build_db_store(
DatabaseConnectionConfig("master-sqlite", self.sqlite_config)
# we allow people to port away from outdated versions of sqlite.
self.sqlite_store = self.build_db_store(
DatabaseConnectionConfig("master-sqlite", self.sqlite_config),
allow_outdated_version=True,
)
# Check if all background updates are done, abort if not.
updates_complete = (
yield self.sqlite_store.db.updates.has_completed_background_updates()
await self.sqlite_store.db.updates.has_completed_background_updates()
)
if not updates_complete:
sys.stderr.write(
end_error = (
"Pending background updates exist in the SQLite3 database."
" Please start Synapse again and wait until every update has finished"
" before running this script.\n"
)
defer.returnValue(None)
return
self.postgres_store = yield self.build_db_store(
self.postgres_store = self.build_db_store(
self.hs_config.get_single_database()
)
yield self.run_background_updates_on_postgres()
await self.run_background_updates_on_postgres()
self.progress.set_state("Creating port tables")
@@ -550,22 +560,22 @@ class Porter(object):
)
try:
yield self.postgres_store.db.runInteraction("alter_table", alter_table)
await self.postgres_store.db.runInteraction("alter_table", alter_table)
except Exception:
# On Error Resume Next
pass
yield self.postgres_store.db.runInteraction(
await self.postgres_store.db.runInteraction(
"create_port_table", create_port_table
)
# Step 2. Get tables.
self.progress.set_state("Fetching tables")
sqlite_tables = yield self.sqlite_store.db.simple_select_onecol(
sqlite_tables = await self.sqlite_store.db.simple_select_onecol(
table="sqlite_master", keyvalues={"type": "table"}, retcol="name"
)
postgres_tables = yield self.postgres_store.db.simple_select_onecol(
postgres_tables = await self.postgres_store.db.simple_select_onecol(
table="information_schema.tables",
keyvalues={},
retcol="distinct table_name",
@@ -576,28 +586,34 @@ class Porter(object):
# Step 3. Figure out what still needs copying
self.progress.set_state("Checking on port progress")
setup_res = yield defer.gatherResults(
[
self.setup_table(table)
for table in tables
if table not in ["schema_version", "applied_schema_deltas"]
and not table.startswith("sqlite_")
],
consumeErrors=True,
setup_res = await make_deferred_yieldable(
defer.gatherResults(
[
run_in_background(self.setup_table, table)
for table in tables
if table not in ["schema_version", "applied_schema_deltas"]
and not table.startswith("sqlite_")
],
consumeErrors=True,
)
)
# Step 4. Do the copying.
self.progress.set_state("Copying to postgres")
yield defer.gatherResults(
[self.handle_table(*res) for res in setup_res], consumeErrors=True
await make_deferred_yieldable(
defer.gatherResults(
[run_in_background(self.handle_table, *res) for res in setup_res],
consumeErrors=True,
)
)
# Step 5. Do final post-processing
yield self._setup_state_group_id_seq()
await self._setup_state_group_id_seq()
self.progress.done()
except Exception:
except Exception as e:
global end_error_exec_info
end_error = e
end_error_exec_info = sys.exc_info()
logger.exception("")
finally:
@@ -637,8 +653,7 @@ class Porter(object):
return outrows
@defer.inlineCallbacks
def _setup_sent_transactions(self):
async def _setup_sent_transactions(self):
# Only save things from the last day
yesterday = int(time.time() * 1000) - 86400000
@@ -659,7 +674,7 @@ class Porter(object):
return headers, [r for r in rows if r[ts_ind] < yesterday]
headers, rows = yield self.sqlite_store.db.runInteraction("select", r)
headers, rows = await self.sqlite_store.db.runInteraction("select", r)
rows = self._convert_rows("sent_transactions", headers, rows)
@@ -672,7 +687,7 @@ class Porter(object):
txn, "sent_transactions", headers[1:], rows
)
yield self.postgres_store.execute(insert)
await self.postgres_store.execute(insert)
else:
max_inserted_rowid = 0
@@ -689,10 +704,10 @@ class Porter(object):
else:
return 1
next_chunk = yield self.sqlite_store.execute(get_start_id)
next_chunk = await self.sqlite_store.execute(get_start_id)
next_chunk = max(max_inserted_rowid + 1, next_chunk)
yield self.postgres_store.db.simple_insert(
await self.postgres_store.db.simple_insert(
table="port_from_sqlite3",
values={
"table_name": "sent_transactions",
@@ -708,46 +723,49 @@ class Porter(object):
(size,) = txn.fetchone()
return int(size)
remaining_count = yield self.sqlite_store.execute(get_sent_table_size)
remaining_count = await self.sqlite_store.execute(get_sent_table_size)
total_count = remaining_count + inserted_rows
defer.returnValue((next_chunk, inserted_rows, total_count))
return next_chunk, inserted_rows, total_count
@defer.inlineCallbacks
def _get_remaining_count_to_port(self, table, forward_chunk, backward_chunk):
frows = yield self.sqlite_store.execute_sql(
async def _get_remaining_count_to_port(self, table, forward_chunk, backward_chunk):
frows = await self.sqlite_store.execute_sql(
"SELECT count(*) FROM %s WHERE rowid >= ?" % (table,), forward_chunk
)
brows = yield self.sqlite_store.execute_sql(
brows = await self.sqlite_store.execute_sql(
"SELECT count(*) FROM %s WHERE rowid <= ?" % (table,), backward_chunk
)
defer.returnValue(frows[0][0] + brows[0][0])
return frows[0][0] + brows[0][0]
@defer.inlineCallbacks
def _get_already_ported_count(self, table):
rows = yield self.postgres_store.execute_sql(
async def _get_already_ported_count(self, table):
rows = await self.postgres_store.execute_sql(
"SELECT count(*) FROM %s" % (table,)
)
defer.returnValue(rows[0][0])
return rows[0][0]
@defer.inlineCallbacks
def _get_total_count_to_port(self, table, forward_chunk, backward_chunk):
remaining, done = yield defer.gatherResults(
[
self._get_remaining_count_to_port(table, forward_chunk, backward_chunk),
self._get_already_ported_count(table),
],
consumeErrors=True,
async def _get_total_count_to_port(self, table, forward_chunk, backward_chunk):
remaining, done = await make_deferred_yieldable(
defer.gatherResults(
[
run_in_background(
self._get_remaining_count_to_port,
table,
forward_chunk,
backward_chunk,
),
run_in_background(self._get_already_ported_count, table),
],
)
)
remaining = int(remaining) if remaining else 0
done = int(done) if done else 0
defer.returnValue((done, remaining + done))
return done, remaining + done
def _setup_state_group_id_seq(self):
def r(txn):
@@ -1013,7 +1031,12 @@ if __name__ == "__main__":
hs_config=config,
)
reactor.callWhenRunning(porter.run)
@defer.inlineCallbacks
def run():
with LoggingContext("synapse_port_db_run"):
yield defer.ensureDeferred(porter.run())
reactor.callWhenRunning(run)
reactor.run()
@@ -1022,7 +1045,11 @@ if __name__ == "__main__":
else:
start()
if end_error_exec_info:
exc_type, exc_value, exc_traceback = end_error_exec_info
traceback.print_exception(exc_type, exc_value, exc_traceback)
if end_error:
if end_error_exec_info:
exc_type, exc_value, exc_traceback = end_error_exec_info
traceback.print_exception(exc_type, exc_value, exc_traceback)
sys.stderr.write(end_error)
sys.exit(5)

View File

@@ -36,7 +36,7 @@ try:
except ImportError:
pass
__version__ = "1.8.0"
__version__ = "1.9.0rc1"
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when

View File

@@ -14,7 +14,6 @@
# limitations under the License.
import logging
from typing import Dict, Tuple
from six import itervalues
@@ -35,7 +34,7 @@ from synapse.api.errors import (
ResourceLimitError,
)
from synapse.config.server import is_threepid_reserved
from synapse.types import UserID
from synapse.types import StateMap, UserID
from synapse.util.caches import CACHE_SIZE_FACTOR, register_cache
from synapse.util.caches.lrucache import LruCache
from synapse.util.metrics import Measure
@@ -509,10 +508,7 @@ class Auth(object):
return self.store.is_server_admin(user)
def compute_auth_events(
self,
event,
current_state_ids: Dict[Tuple[str, str], str],
for_verification: bool = False,
self, event, current_state_ids: StateMap[str], for_verification: bool = False,
):
"""Given an event and current state return the list of event IDs used
to auth an event.

View File

@@ -17,13 +17,15 @@
"""Contains exceptions and error codes."""
import logging
from typing import Dict
from typing import Dict, List
from six import iteritems
from six.moves import http_client
from canonicaljson import json
from twisted.web import http
logger = logging.getLogger(__name__)
@@ -80,6 +82,29 @@ class CodeMessageException(RuntimeError):
self.msg = msg
class RedirectException(CodeMessageException):
"""A pseudo-error indicating that we want to redirect the client to a different
location
Attributes:
cookies: a list of set-cookies values to add to the response. For example:
b"sessionId=a3fWa; Expires=Wed, 21 Oct 2015 07:28:00 GMT"
"""
def __init__(self, location: bytes, http_code: int = http.FOUND):
"""
Args:
location: the URI to redirect to
http_code: the HTTP response code
"""
msg = "Redirect to %s" % (location.decode("utf-8"),)
super().__init__(code=http_code, msg=msg)
self.location = location
self.cookies = [] # type: List[bytes]
class SynapseError(CodeMessageException):
"""A base exception type for matrix errors which have an errcode and error
message (as well as an HTTP status code).
@@ -158,12 +183,6 @@ class UserDeactivatedError(SynapseError):
)
class RegistrationError(SynapseError):
"""An error raised when a registration event fails."""
pass
class FederationDeniedError(SynapseError):
"""An error raised when the server tries to federate with a server which
is not on its federation whitelist.

View File

@@ -15,6 +15,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List
from six import text_type
import jsonschema
@@ -293,7 +295,7 @@ class Filter(object):
room_id = None
ev_type = "m.presence"
contains_url = False
labels = []
labels = [] # type: List[str]
else:
sender = event.get("sender", None)
if not sender:

View File

@@ -12,7 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
from collections import OrderedDict
from typing import Any, Optional, Tuple
from synapse.api.errors import LimitExceededError
@@ -23,7 +24,9 @@ class Ratelimiter(object):
"""
def __init__(self):
self.message_counts = collections.OrderedDict()
self.message_counts = (
OrderedDict()
) # type: OrderedDict[Any, Tuple[float, int, Optional[float]]]
def can_do_action(self, key, time_now_s, rate_hz, burst_count, update=True):
"""Can the entity (e.g. user or IP address) perform the action?

View File

@@ -84,8 +84,7 @@ class AdminCmdServer(HomeServer):
class AdminCmdReplicationHandler(ReplicationClientHandler):
@defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
async def on_rdata(self, stream_name, token, rows):
pass
def get_streams_to_replicate(self):

View File

@@ -115,9 +115,8 @@ class ASReplicationHandler(ReplicationClientHandler):
super(ASReplicationHandler, self).__init__(hs.get_datastore())
self.appservice_handler = hs.get_application_service_handler()
@defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
yield super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
async def on_rdata(self, stream_name, token, rows):
await super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
if stream_name == "events":
max_stream_id = self.store.get_room_max_stream_ordering()

View File

@@ -62,6 +62,9 @@ from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet
from synapse.rest.client.v2_alpha.register import RegisterRestServlet
from synapse.rest.client.versions import VersionsRestServlet
from synapse.server import HomeServer
from synapse.storage.data_stores.main.monthly_active_users import (
MonthlyActiveUsersWorkerStore,
)
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
@@ -85,6 +88,7 @@ class ClientReaderSlavedStore(
SlavedTransactionStore,
SlavedProfileStore,
SlavedClientIpStore,
MonthlyActiveUsersWorkerStore,
BaseSlavedStore,
):
pass

View File

@@ -56,6 +56,9 @@ from synapse.rest.client.v1.room import (
RoomStateEventRestServlet,
)
from synapse.server import HomeServer
from synapse.storage.data_stores.main.monthly_active_users import (
MonthlyActiveUsersWorkerStore,
)
from synapse.storage.data_stores.main.user_directory import UserDirectoryStore
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
@@ -81,6 +84,7 @@ class EventCreatorSlavedStore(
SlavedEventStore,
SlavedRegistrationStore,
RoomStore,
MonthlyActiveUsersWorkerStore,
BaseSlavedStore,
):
pass

View File

@@ -46,6 +46,9 @@ from synapse.replication.slave.storage.transactions import SlavedTransactionStor
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.key.v2 import KeyApiV2Resource
from synapse.server import HomeServer
from synapse.storage.data_stores.main.monthly_active_users import (
MonthlyActiveUsersWorkerStore,
)
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
@@ -66,6 +69,7 @@ class FederationReaderSlavedStore(
RoomStore,
DirectoryStore,
SlavedTransactionStore,
MonthlyActiveUsersWorkerStore,
BaseSlavedStore,
):
pass

View File

@@ -145,9 +145,8 @@ class FederationSenderReplicationHandler(ReplicationClientHandler):
super(FederationSenderReplicationHandler, self).__init__(hs.get_datastore())
self.send_handler = FederationSenderHandler(hs, self)
@defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
yield super(FederationSenderReplicationHandler, self).on_rdata(
async def on_rdata(self, stream_name, token, rows):
await super(FederationSenderReplicationHandler, self).on_rdata(
stream_name, token, rows
)
self.send_handler.process_replication_rows(stream_name, token, rows)
@@ -159,6 +158,13 @@ class FederationSenderReplicationHandler(ReplicationClientHandler):
args.update(self.send_handler.stream_positions())
return args
def on_remote_server_up(self, server: str):
"""Called when get a new REMOTE_SERVER_UP command."""
# Let's wake up the transaction queue for the server in case we have
# pending stuff to send to it.
self.send_handler.wake_destination(server)
def start(config_options):
try:
@@ -206,7 +212,7 @@ class FederationSenderHandler(object):
to the federation sender.
"""
def __init__(self, hs, replication_client):
def __init__(self, hs: FederationSenderServer, replication_client):
self.store = hs.get_datastore()
self._is_mine_id = hs.is_mine_id
self.federation_sender = hs.get_federation_sender()
@@ -227,6 +233,9 @@ class FederationSenderHandler(object):
self.store.get_room_max_stream_ordering()
)
def wake_destination(self, server: str):
self.federation_sender.wake_destination(server)
def stream_positions(self):
return {"federation": self.federation_position}

View File

@@ -31,7 +31,7 @@ from prometheus_client import Gauge
from twisted.application import service
from twisted.internet import defer, reactor
from twisted.python.failure import Failure
from twisted.web.resource import EncodingResourceWrapper, NoResource
from twisted.web.resource import EncodingResourceWrapper, IResource, NoResource
from twisted.web.server import GzipEncoderFactory
from twisted.web.static import File
@@ -109,7 +109,16 @@ class SynapseHomeServer(HomeServer):
for path, resmodule in additional_resources.items():
handler_cls, config = load_module(resmodule)
handler = handler_cls(config, module_api)
resources[path] = AdditionalResource(self, handler.handle_request)
if IResource.providedBy(handler):
resource = handler
elif hasattr(handler, "handle_request"):
resource = AdditionalResource(self, handler.handle_request)
else:
raise ConfigError(
"additional_resource %s does not implement a known interface"
% (resmodule["module"],)
)
resources[path] = resource
# try to find something useful to redirect '/' to
if WEB_CLIENT_PREFIX in resources:

View File

@@ -34,6 +34,7 @@ from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.admin import register_servlets_for_media_repo
@@ -47,6 +48,7 @@ logger = logging.getLogger("synapse.app.media_repository")
class MediaRepositorySlavedStore(
RoomStore,
SlavedApplicationServiceStore,
SlavedRegistrationStore,
SlavedClientIpStore,

View File

@@ -141,9 +141,8 @@ class PusherReplicationHandler(ReplicationClientHandler):
self.pusher_pool = hs.get_pusherpool()
@defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
yield super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
async def on_rdata(self, stream_name, token, rows):
await super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
run_in_background(self.poke_pushers, stream_name, token, rows)
@defer.inlineCallbacks

View File

@@ -54,6 +54,9 @@ from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
from synapse.rest.client.v2_alpha import sync
from synapse.server import HomeServer
from synapse.storage.data_stores.main.monthly_active_users import (
MonthlyActiveUsersWorkerStore,
)
from synapse.storage.data_stores.main.presence import UserPresenceState
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
@@ -77,6 +80,7 @@ class SynchrotronSlavedStore(
SlavedEventStore,
SlavedClientIpStore,
RoomStore,
MonthlyActiveUsersWorkerStore,
BaseSlavedStore,
):
pass
@@ -358,9 +362,8 @@ class SyncReplicationHandler(ReplicationClientHandler):
self.presence_handler = hs.get_presence_handler()
self.notifier = hs.get_notifier()
@defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
yield super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
async def on_rdata(self, stream_name, token, rows):
await super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
run_in_background(self.process_and_notify, stream_name, token, rows)
def get_streams_to_replicate(self):

View File

@@ -172,9 +172,8 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler):
super(UserDirectoryReplicationHandler, self).__init__(hs.get_datastore())
self.user_directory = hs.get_user_directory_handler()
@defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
yield super(UserDirectoryReplicationHandler, self).on_rdata(
async def on_rdata(self, stream_name, token, rows):
await super(UserDirectoryReplicationHandler, self).on_rdata(
stream_name, token, rows
)
if stream_name == EventsStream.NAME:

View File

@@ -37,10 +37,12 @@ class EmailConfig(Config):
self.email_enable_notifs = False
email_config = config.get("email", {})
email_config = config.get("email")
if email_config is None:
email_config = {}
self.email_smtp_host = email_config.get("smtp_host", None)
self.email_smtp_port = email_config.get("smtp_port", None)
self.email_smtp_host = email_config.get("smtp_host", "localhost")
self.email_smtp_port = email_config.get("smtp_port", 25)
self.email_smtp_user = email_config.get("smtp_user", None)
self.email_smtp_pass = email_config.get("smtp_pass", None)
self.require_transport_security = email_config.get(
@@ -74,9 +76,9 @@ class EmailConfig(Config):
self.email_template_dir = os.path.abspath(template_dir)
self.email_enable_notifs = email_config.get("enable_notifs", False)
account_validity_renewal_enabled = config.get("account_validity", {}).get(
"renew_at"
)
account_validity_config = config.get("account_validity") or {}
account_validity_renewal_enabled = account_validity_config.get("renew_at")
self.threepid_behaviour_email = (
# Have Synapse handle the email sending if account_threepid_delegates.email
@@ -278,7 +280,9 @@ class EmailConfig(Config):
self.email_notif_for_new_users = email_config.get(
"notif_for_new_users", True
)
self.email_riot_base_url = email_config.get("riot_base_url", None)
self.email_riot_base_url = email_config.get(
"client_base_url", email_config.get("riot_base_url", None)
)
if account_validity_renewal_enabled:
self.email_expiry_template_html = email_config.get(
@@ -294,107 +298,111 @@ class EmailConfig(Config):
raise ConfigError("Unable to find email template file %s" % (p,))
def generate_config_section(self, config_dir_path, server_name, **kwargs):
return """
# Enable sending emails for password resets, notification events or
# account expiry notices
return """\
# Configuration for sending emails from Synapse.
#
# If your SMTP server requires authentication, the optional smtp_user &
# smtp_pass variables should be used
#
#email:
# enable_notifs: false
# smtp_host: "localhost"
# smtp_port: 25 # SSL: 465, STARTTLS: 587
# smtp_user: "exampleusername"
# smtp_pass: "examplepassword"
# require_transport_security: false
#
# # notif_from defines the "From" address to use when sending emails.
# # It must be set if email sending is enabled.
# #
# # The placeholder '%(app)s' will be replaced by the application name,
# # which is normally 'app_name' (below), but may be overridden by the
# # Matrix client application.
# #
# # Note that the placeholder must be written '%(app)s', including the
# # trailing 's'.
# #
# notif_from: "Your Friendly %(app)s homeserver <noreply@example.com>"
#
# # app_name defines the default value for '%(app)s' in notif_from. It
# # defaults to 'Matrix'.
# #
# #app_name: my_branded_matrix_server
#
# # Enable email notifications by default
# #
# notif_for_new_users: true
#
# # Defining a custom URL for Riot is only needed if email notifications
# # should contain links to a self-hosted installation of Riot; when set
# # the "app_name" setting is ignored
# #
# riot_base_url: "http://localhost/riot"
#
# # Configure the time that a validation email or text message code
# # will expire after sending
# #
# # This is currently used for password resets
# #
# #validation_token_lifetime: 1h
#
# # Template directory. All template files should be stored within this
# # directory. If not set, default templates from within the Synapse
# # package will be used
# #
# # For the list of default templates, please see
# # https://github.com/matrix-org/synapse/tree/master/synapse/res/templates
# #
# #template_dir: res/templates
#
# # Templates for email notifications
# #
# notif_template_html: notif_mail.html
# notif_template_text: notif_mail.txt
#
# # Templates for account expiry notices
# #
# expiry_template_html: notice_expiry.html
# expiry_template_text: notice_expiry.txt
#
# # Templates for password reset emails sent by the homeserver
# #
# #password_reset_template_html: password_reset.html
# #password_reset_template_text: password_reset.txt
#
# # Templates for registration emails sent by the homeserver
# #
# #registration_template_html: registration.html
# #registration_template_text: registration.txt
#
# # Templates for validation emails sent by the homeserver when adding an email to
# # your user account
# #
# #add_threepid_template_html: add_threepid.html
# #add_threepid_template_text: add_threepid.txt
#
# # Templates for password reset success and failure pages that a user
# # will see after attempting to reset their password
# #
# #password_reset_template_success_html: password_reset_success.html
# #password_reset_template_failure_html: password_reset_failure.html
#
# # Templates for registration success and failure pages that a user
# # will see after attempting to register using an email or phone
# #
# #registration_template_success_html: registration_success.html
# #registration_template_failure_html: registration_failure.html
#
# # Templates for success and failure pages that a user will see after attempting
# # to add an email or phone to their account
# #
# #add_threepid_success_html: add_threepid_success.html
# #add_threepid_failure_html: add_threepid_failure.html
email:
# The hostname of the outgoing SMTP server to use. Defaults to 'localhost'.
#
#smtp_host: mail.server
# The port on the mail server for outgoing SMTP. Defaults to 25.
#
#smtp_port: 587
# Username/password for authentication to the SMTP server. By default, no
# authentication is attempted.
#
# smtp_user: "exampleusername"
# smtp_pass: "examplepassword"
# Uncomment the following to require TLS transport security for SMTP.
# By default, Synapse will connect over plain text, and will then switch to
# TLS via STARTTLS *if the SMTP server supports it*. If this option is set,
# Synapse will refuse to connect unless the server supports STARTTLS.
#
#require_transport_security: true
# Enable sending emails for messages that the user has missed
#
#enable_notifs: false
# notif_from defines the "From" address to use when sending emails.
# It must be set if email sending is enabled.
#
# The placeholder '%(app)s' will be replaced by the application name,
# which is normally 'app_name' (below), but may be overridden by the
# Matrix client application.
#
# Note that the placeholder must be written '%(app)s', including the
# trailing 's'.
#
#notif_from: "Your Friendly %(app)s homeserver <noreply@example.com>"
# app_name defines the default value for '%(app)s' in notif_from. It
# defaults to 'Matrix'.
#
#app_name: my_branded_matrix_server
# Uncomment the following to disable automatic subscription to email
# notifications for new users. Enabled by default.
#
#notif_for_new_users: false
# Custom URL for client links within the email notifications. By default
# links will be based on "https://matrix.to".
#
# (This setting used to be called riot_base_url; the old name is still
# supported for backwards-compatibility but is now deprecated.)
#
#client_base_url: "http://localhost/riot"
# Configure the time that a validation email will expire after sending.
# Defaults to 1h.
#
#validation_token_lifetime: 15m
# Directory in which Synapse will try to find the template files below.
# If not set, default templates from within the Synapse package will be used.
#
# DO NOT UNCOMMENT THIS SETTING unless you want to customise the templates.
# If you *do* uncomment it, you will need to make sure that all the templates
# below are in the directory.
#
# Synapse will look for the following templates in this directory:
#
# * The contents of email notifications of missed events: 'notif_mail.html' and
# 'notif_mail.txt'.
#
# * The contents of account expiry notice emails: 'notice_expiry.html' and
# 'notice_expiry.txt'.
#
# * The contents of password reset emails sent by the homeserver:
# 'password_reset.html' and 'password_reset.txt'
#
# * HTML pages for success and failure that a user will see when they follow
# the link in the password reset email: 'password_reset_success.html' and
# 'password_reset_failure.html'
#
# * The contents of address verification emails sent during registration:
# 'registration.html' and 'registration.txt'
#
# * HTML pages for success and failure that a user will see when they follow
# the link in an address verification email sent during registration:
# 'registration_success.html' and 'registration_failure.html'
#
# * The contents of address verification emails sent when an address is added
# to a Matrix account: 'add_threepid.html' and 'add_threepid.txt'
#
# * HTML pages for success and failure that a user will see when they follow
# the link in an address verification email sent when an address is added
# to a Matrix account: 'add_threepid_success.html' and
# 'add_threepid_failure.html'
#
# You can see the default templates at:
# https://github.com/matrix-org/synapse/tree/master/synapse/res/templates
#
#template_dir: "res/templates"
"""

View File

@@ -35,7 +35,7 @@ class PushConfig(Config):
# Now check for the one in the 'email' section and honour it,
# with a warning.
push_config = config.get("email", {})
push_config = config.get("email") or {}
redact_content = push_config.get("redact_content")
if redact_content is not None:
print(

View File

@@ -27,6 +27,9 @@ class AccountValidityConfig(Config):
section = "accountvalidity"
def __init__(self, config, synapse_config):
if config is None:
return
super(AccountValidityConfig, self).__init__()
self.enabled = config.get("enabled", False)
self.renew_by_email_enabled = "renew_at" in config
@@ -91,7 +94,7 @@ class RegistrationConfig(Config):
)
self.account_validity = AccountValidityConfig(
config.get("account_validity", {}), config
config.get("account_validity") or {}, config
)
self.registrations_require_3pid = config.get("registrations_require_3pid", [])
@@ -159,23 +162,6 @@ class RegistrationConfig(Config):
# Optional account validity configuration. This allows for accounts to be denied
# any request after a given period.
#
# ``enabled`` defines whether the account validity feature is enabled. Defaults
# to False.
#
# ``period`` allows setting the period after which an account is valid
# after its registration. When renewing the account, its validity period
# will be extended by this amount of time. This parameter is required when using
# the account validity feature.
#
# ``renew_at`` is the amount of time before an account's expiry date at which
# Synapse will send an email to the account's email address with a renewal link.
# This needs the ``email`` and ``public_baseurl`` configuration sections to be
# filled.
#
# ``renew_email_subject`` is the subject of the email sent out with the renewal
# link. ``%%(app)s`` can be used as a placeholder for the ``app_name`` parameter
# from the ``email`` section.
#
# Once this feature is enabled, Synapse will look for registered users without an
# expiration date at startup and will add one to every account it found using the
# current settings at that time.
@@ -186,21 +172,55 @@ class RegistrationConfig(Config):
# date will be randomly selected within a range [now + period - d ; now + period],
# where d is equal to 10%% of the validity period.
#
#account_validity:
# enabled: true
# period: 6w
# renew_at: 1w
# renew_email_subject: "Renew your %%(app)s account"
# # Directory in which Synapse will try to find the HTML files to serve to the
# # user when trying to renew an account. Optional, defaults to
# # synapse/res/templates.
# template_dir: "res/templates"
# # HTML to be displayed to the user after they successfully renewed their
# # account. Optional.
# account_renewed_html_path: "account_renewed.html"
# # HTML to be displayed when the user tries to renew an account with an invalid
# # renewal token. Optional.
# invalid_token_html_path: "invalid_token.html"
account_validity:
# The account validity feature is disabled by default. Uncomment the
# following line to enable it.
#
#enabled: true
# The period after which an account is valid after its registration. When
# renewing the account, its validity period will be extended by this amount
# of time. This parameter is required when using the account validity
# feature.
#
#period: 6w
# The amount of time before an account's expiry date at which Synapse will
# send an email to the account's email address with a renewal link. By
# default, no such emails are sent.
#
# If you enable this setting, you will also need to fill out the 'email' and
# 'public_baseurl' configuration sections.
#
#renew_at: 1w
# The subject of the email sent out with the renewal link. '%%(app)s' can be
# used as a placeholder for the 'app_name' parameter from the 'email'
# section.
#
# Note that the placeholder must be written '%%(app)s', including the
# trailing 's'.
#
# If this is not set, a default value is used.
#
#renew_email_subject: "Renew your %%(app)s account"
# Directory in which Synapse will try to find templates for the HTML files to
# serve to the user when trying to renew an account. If not set, default
# templates from within the Synapse package will be used.
#
#template_dir: "res/templates"
# File within 'template_dir' giving the HTML to be displayed to the user after
# they successfully renewed their account. If not set, default text is used.
#
#account_renewed_html_path: "account_renewed.html"
# File within 'template_dir' giving the HTML to be displayed when the user
# tries to renew an account with an invalid renewal token. If not set,
# default text is used.
#
#invalid_token_html_path: "invalid_token.html"
# Time that a user's session remains valid for, after they log in.
#

View File

@@ -121,6 +121,7 @@ class SAML2Config(Config):
required_methods = [
"get_saml_attributes",
"saml_response_to_user_attributes",
"get_remote_user_id",
]
missing_methods = [
method

View File

@@ -294,6 +294,14 @@ class ServerConfig(Config):
self.retention_default_min_lifetime = None
self.retention_default_max_lifetime = None
if self.retention_enabled:
logger.info(
"Message retention policies support enabled with the following default"
" policy: min_lifetime = %s ; max_lifetime = %s",
self.retention_default_min_lifetime,
self.retention_default_max_lifetime,
)
self.retention_allowed_lifetime_min = retention_config.get(
"allowed_lifetime_min"
)
@@ -948,17 +956,17 @@ class ServerConfig(Config):
#
# The rationale for this per-job configuration is that some rooms might have a
# retention policy with a low 'max_lifetime', where history needs to be purged
# of outdated messages on a very frequent basis (e.g. every 5min), but not want
# that purge to be performed by a job that's iterating over every room it knows,
# which would be quite heavy on the server.
# of outdated messages on a more frequent basis than for the rest of the rooms
# (e.g. every 12h), but not want that purge to be performed by a job that's
# iterating over every room it knows, which could be heavy on the server.
#
#purge_jobs:
# - shortest_max_lifetime: 1d
# longest_max_lifetime: 3d
# interval: 5m:
# interval: 12h
# - shortest_max_lifetime: 3d
# longest_max_lifetime: 1y
# interval: 24h
# interval: 1d
"""
% locals()
)

View File

@@ -634,7 +634,7 @@ def get_public_keys(invite_event):
return public_keys
def auth_types_for_event(event) -> Set[Tuple[str]]:
def auth_types_for_event(event) -> Set[Tuple[str, str]]:
"""Given an event, return a list of (EventType, StateKey) that may be
needed to auth the event. The returned list may be a superset of what
would actually be required depending on the full state of the room.

View File

@@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Dict, Optional, Tuple, Union
from typing import Optional, Union
from six import iteritems
@@ -23,6 +23,7 @@ from twisted.internet import defer
from synapse.appservice import ApplicationService
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.types import StateMap
@attr.s(slots=True)
@@ -106,13 +107,11 @@ class EventContext:
_state_group = attr.ib(default=None, type=Optional[int])
state_group_before_event = attr.ib(default=None, type=Optional[int])
prev_group = attr.ib(default=None, type=Optional[int])
delta_ids = attr.ib(default=None, type=Optional[Dict[Tuple[str, str], str]])
delta_ids = attr.ib(default=None, type=Optional[StateMap[str]])
app_service = attr.ib(default=None, type=Optional[ApplicationService])
_current_state_ids = attr.ib(
default=None, type=Optional[Dict[Tuple[str, str], str]]
)
_prev_state_ids = attr.ib(default=None, type=Optional[Dict[Tuple[str, str], str]])
_current_state_ids = attr.ib(default=None, type=Optional[StateMap[str]])
_prev_state_ids = attr.ib(default=None, type=Optional[StateMap[str]])
@staticmethod
def with_state(

View File

@@ -15,6 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Dict
import six
from six import iteritems
@@ -22,6 +23,7 @@ from six import iteritems
from canonicaljson import json
from prometheus_client import Counter
from twisted.internet import defer
from twisted.internet.abstract import isIPAddress
from twisted.python import failure
@@ -41,7 +43,11 @@ from synapse.federation.federation_base import FederationBase, event_from_pdu_js
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
from synapse.http.endpoint import parse_server_name
from synapse.logging.context import nested_logging_context
from synapse.logging.context import (
make_deferred_yieldable,
nested_logging_context,
run_in_background,
)
from synapse.logging.opentracing import log_kv, start_active_span_from_edu, trace
from synapse.logging.utils import log_function
from synapse.replication.http.federation import (
@@ -49,7 +55,7 @@ from synapse.replication.http.federation import (
ReplicationGetQueryRestServlet,
)
from synapse.types import get_domain_from_id
from synapse.util import glob_to_regex
from synapse.util import glob_to_regex, unwrapFirstError
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
@@ -160,6 +166,43 @@ class FederationServer(FederationBase):
)
return 400, response
# We process PDUs and EDUs in parallel. This is important as we don't
# want to block things like to device messages from reaching clients
# behind the potentially expensive handling of PDUs.
pdu_results, _ = await make_deferred_yieldable(
defer.gatherResults(
[
run_in_background(
self._handle_pdus_in_txn, origin, transaction, request_time
),
run_in_background(self._handle_edus_in_txn, origin, transaction),
],
consumeErrors=True,
).addErrback(unwrapFirstError)
)
response = {"pdus": pdu_results}
logger.debug("Returning: %s", str(response))
await self.transaction_actions.set_response(origin, transaction, 200, response)
return 200, response
async def _handle_pdus_in_txn(
self, origin: str, transaction: Transaction, request_time: int
) -> Dict[str, dict]:
"""Process the PDUs in a received transaction.
Args:
origin: the server making the request
transaction: incoming transaction
request_time: timestamp that the HTTP request arrived at
Returns:
A map from event ID of a processed PDU to any errors we should
report back to the sending server.
"""
received_pdus_counter.inc(len(transaction.pdus))
origin_host, _ = parse_server_name(origin)
@@ -250,20 +293,23 @@ class FederationServer(FederationBase):
process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT
)
if hasattr(transaction, "edus"):
for edu in (Edu(**x) for x in transaction.edus):
await self.received_edu(origin, edu.edu_type, edu.content)
return pdu_results
response = {"pdus": pdu_results}
async def _handle_edus_in_txn(self, origin: str, transaction: Transaction):
"""Process the EDUs in a received transaction.
"""
logger.debug("Returning: %s", str(response))
async def _process_edu(edu_dict):
received_edus_counter.inc()
await self.transaction_actions.set_response(origin, transaction, 200, response)
return 200, response
edu = Edu(**edu_dict)
await self.registry.on_edu(edu.edu_type, origin, edu.content)
async def received_edu(self, origin, edu_type, content):
received_edus_counter.inc()
await self.registry.on_edu(edu_type, origin, content)
await concurrently_execute(
_process_edu,
getattr(transaction, "edus", []),
TRANSACTION_CONCURRENCY_LIMIT,
)
async def on_context_state_request(self, origin, room_id, event_id):
origin_host, _ = parse_server_name(origin)

View File

@@ -259,7 +259,9 @@ class FederationRemoteSendQueue(object):
def federation_ack(self, token):
self._clear_queue_before_pos(token)
def get_replication_rows(self, from_token, to_token, limit, federation_ack=None):
async def get_replication_rows(
self, from_token, to_token, limit, federation_ack=None
):
"""Get rows to be sent over federation between the two tokens
Args:

View File

@@ -21,6 +21,7 @@ from prometheus_client import Counter
from twisted.internet import defer
import synapse
import synapse.metrics
from synapse.federation.sender.per_destination_queue import PerDestinationQueue
from synapse.federation.sender.transaction_manager import TransactionManager
@@ -54,7 +55,7 @@ sent_pdus_destination_dist_total = Counter(
class FederationSender(object):
def __init__(self, hs):
def __init__(self, hs: "synapse.server.HomeServer"):
self.hs = hs
self.server_name = hs.hostname
@@ -482,7 +483,20 @@ class FederationSender(object):
def send_device_messages(self, destination):
if destination == self.server_name:
logger.info("Not sending device update to ourselves")
logger.warning("Not sending device update to ourselves")
return
self._get_per_destination_queue(destination).attempt_new_transaction()
def wake_destination(self, destination: str):
"""Called when we want to retry sending transactions to a remote.
This is mainly useful if the remote server has been down and we think it
might have come back.
"""
if destination == self.server_name:
logger.warning("Not waking up ourselves")
return
self._get_per_destination_queue(destination).attempt_new_transaction()

View File

@@ -31,6 +31,7 @@ from synapse.handlers.presence import format_user_presence_state
from synapse.metrics import sent_transactions_counter
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.presence import UserPresenceState
from synapse.types import StateMap
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
# This is defined in the Matrix spec and enforced by the receiver.
@@ -77,7 +78,7 @@ class PerDestinationQueue(object):
# Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered
# based on their key (e.g. typing events by room_id)
# Map of (edu_type, key) -> Edu
self._pending_edus_keyed = {} # type: dict[tuple[str, str], Edu]
self._pending_edus_keyed = {} # type: StateMap[Edu]
# Map of user_id -> UserPresenceState of pending presence to be sent to this
# destination

View File

@@ -44,6 +44,7 @@ from synapse.logging.opentracing import (
tags,
whitelisted_homeserver,
)
from synapse.server import HomeServer
from synapse.types import ThirdPartyInstanceID, get_domain_from_id
from synapse.util.ratelimitutils import FederationRateLimiter
from synapse.util.versionstring import get_version_string
@@ -101,12 +102,17 @@ class NoAuthenticationError(AuthenticationError):
class Authenticator(object):
def __init__(self, hs):
def __init__(self, hs: HomeServer):
self._clock = hs.get_clock()
self.keyring = hs.get_keyring()
self.server_name = hs.hostname
self.store = hs.get_datastore()
self.federation_domain_whitelist = hs.config.federation_domain_whitelist
self.notifer = hs.get_notifier()
self.replication_client = None
if hs.config.worker.worker_app:
self.replication_client = hs.get_tcp_replication()
# A method just so we can pass 'self' as the authenticator to the Servlets
async def authenticate_request(self, request, content):
@@ -166,6 +172,17 @@ class Authenticator(object):
try:
logger.info("Marking origin %r as up", origin)
await self.store.set_destination_retry_timings(origin, None, 0, 0)
# Inform the relevant places that the remote server is back up.
self.notifer.notify_remote_server_up(origin)
if self.replication_client:
# If we're on a worker we try and inform master about this. The
# replication client doesn't hook into the notifier to avoid
# infinite loops where we send a `REMOTE_SERVER_UP` command to
# master, which then echoes it back to us which in turn pokes
# the notifier.
self.replication_client.send_remote_server_up(origin)
except Exception:
logger.exception("Error resetting retry timings on %s", origin)

View File

@@ -14,9 +14,11 @@
# limitations under the License.
import logging
from typing import List
from synapse.api.constants import Membership
from synapse.types import RoomStreamToken
from synapse.events import FrozenEvent
from synapse.types import RoomStreamToken, StateMap
from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
@@ -51,68 +53,15 @@ class AdminHandler(BaseHandler):
return ret
async def get_users(self):
"""Function to retrieve a list of users in users table.
Args:
Returns:
defer.Deferred: resolves to list[dict[str, Any]]
"""
ret = await self.store.get_users()
async def get_user(self, user):
"""Function to get user details"""
ret = await self.store.get_user_by_id(user.to_string())
if ret:
profile = await self.store.get_profileinfo(user.localpart)
ret["displayname"] = profile.display_name
ret["avatar_url"] = profile.avatar_url
return ret
async def get_users_paginate(self, start, limit, name, guests, deactivated):
"""Function to retrieve a paginated list of users from
users list. This will return a json list of users.
Args:
start (int): start number to begin the query from
limit (int): number of rows to retrieve
name (string): filter for user names
guests (bool): whether to in include guest users
deactivated (bool): whether to include deactivated users
Returns:
defer.Deferred: resolves to json list[dict[str, Any]]
"""
ret = await self.store.get_users_paginate(
start, limit, name, guests, deactivated
)
return ret
async def search_users(self, term):
"""Function to search users list for one or more users with
the matched term.
Args:
term (str): search term
Returns:
defer.Deferred: resolves to list[dict[str, Any]]
"""
ret = await self.store.search_users(term)
return ret
def get_user_server_admin(self, user):
"""
Get the admin bit on a user.
Args:
user_id (UserID): the (necessarily local) user to manipulate
"""
return self.store.is_server_admin(user)
def set_user_server_admin(self, user, admin):
"""
Set the admin bit on a user.
Args:
user_id (UserID): the (necessarily local) user to manipulate
admin (bool): whether or not the user should be an admin of this server
"""
return self.store.set_server_admin(user, admin)
async def export_user_data(self, user_id, writer):
"""Write all data we have on the user to the given writer.
@@ -125,7 +74,7 @@ class AdminHandler(BaseHandler):
The returned value is that returned by `writer.finished()`.
"""
# Get all rooms the user is in or has been in
rooms = await self.store.get_rooms_for_user_where_membership_is(
rooms = await self.store.get_rooms_for_local_user_where_membership_is(
user_id,
membership_list=(
Membership.JOIN,
@@ -250,35 +199,26 @@ class ExfiltrationWriter(object):
"""Interface used to specify how to write exported data.
"""
def write_events(self, room_id, events):
def write_events(self, room_id: str, events: List[FrozenEvent]):
"""Write a batch of events for a room.
Args:
room_id (str)
events (list[FrozenEvent])
"""
pass
def write_state(self, room_id, event_id, state):
def write_state(self, room_id: str, event_id: str, state: StateMap[FrozenEvent]):
"""Write the state at the given event in the room.
This only gets called for backward extremities rather than for each
event.
Args:
room_id (str)
event_id (str)
state (dict[tuple[str, str], FrozenEvent])
"""
pass
def write_invite(self, room_id, event, state):
def write_invite(self, room_id: str, event: FrozenEvent, state: StateMap[dict]):
"""Write an invite for the room, with associated invite state.
Args:
room_id (str)
event (FrozenEvent)
state (dict[tuple[str, str], dict]): A subset of the state at the
room_id
event
state: A subset of the state at the
invite, with a subset of the event keys (type, state_key
content and sender)
"""

View File

@@ -140,7 +140,7 @@ class DeactivateAccountHandler(BaseHandler):
user_id (str): The user ID to reject pending invites for.
"""
user = UserID.from_string(user_id)
pending_invites = await self.store.get_invited_rooms_for_user(user_id)
pending_invites = await self.store.get_invited_rooms_for_local_user(user_id)
for room in pending_invites:
try:

View File

@@ -19,6 +19,7 @@ from canonicaljson import json
from twisted.internet import defer
import synapse
from synapse.api.errors import SynapseError
from synapse.logging.opentracing import (
get_active_span_text_map,
@@ -31,9 +32,11 @@ from synapse.util.stringutils import random_string
logger = logging.getLogger(__name__)
device_list_debugging_logger = logging.getLogger("synapse.devices.DEBUG_TRACKING")
class DeviceMessageHandler(object):
def __init__(self, hs):
def __init__(self, hs: "synapse.server.HomeServer"):
"""
Args:
hs (synapse.server.HomeServer): server
@@ -65,6 +68,9 @@ class DeviceMessageHandler(object):
logger.warning("Request for keys for non-local user %s", user_id)
raise SynapseError(400, "Not a user here")
if not by_device:
continue
messages_by_device = {
device_id: {
"content": message_content,
@@ -73,8 +79,65 @@ class DeviceMessageHandler(object):
}
for device_id, message_content in by_device.items()
}
if messages_by_device:
local_messages[user_id] = messages_by_device
local_messages[user_id] = messages_by_device
if (
device_list_debugging_logger.isEnabledFor(logging.INFO)
and message_type == "m.room_key_request"
):
# If we get a request to get keys then may mean the recipient
# didn't know about the sender's device (or might just mean
# things are being a bit slow to propogate).
received_devices = set(by_device)
requesting_device_id = list(by_device.values())[0].get(
"requesting_device_id", "<unknown>"
)
request_id = list(by_device.values())[0].get("request_id", "<unknown>")
action = list(by_device.values())[0].get("action", "<unknown>")
device_list_debugging_logger.info(
"Received room_key %s direct message (%s, %s, %s) from %s (%s) to %s (%s).",
action,
message_type,
message_id,
request_id,
sender_user_id,
requesting_device_id,
user_id,
received_devices,
)
elif device_list_debugging_logger.isEnabledFor(logging.INFO):
# We expect the sending user to send the message to all the devices
# to the user, if they don't then that is potentially suspicious and
# so we log for debugging purposes.
expected_devices = yield self.store.get_devices_by_user(user_id)
expected_devices = set(expected_devices)
received_devices = set(by_device)
if received_devices != {"*"} and received_devices != expected_devices:
# Devices that the remote didn't send to
missed = expected_devices - received_devices
# Devices the remote sent to that we don't know bout
extraneous = received_devices - expected_devices
# We try and pull out the `sender_key` from the first message,
# if it has one. This just helps figure out which device the
# message came from.
sender_key = list(by_device.values())[0].get(
"sender_key", "<unknown>"
)
device_list_debugging_logger.info(
"Received direct message (%s, %s) from %s (%s) to %s with mismatched devices."
" Missing: %s, extraneous: %s",
message_type,
message_id,
sender_user_id,
sender_key,
user_id,
missed,
extraneous,
)
stream_id = yield self.store.add_messages_from_remote_to_device_inbox(
origin, message_id, local_messages

View File

@@ -64,7 +64,7 @@ from synapse.replication.http.federation import (
from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
from synapse.state import StateResolutionStore, resolve_events_with_store
from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour
from synapse.types import UserID, get_domain_from_id
from synapse.types import StateMap, UserID, get_domain_from_id
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.distributor import user_joined_room
from synapse.util.retryutils import NotRetryingDestination
@@ -89,7 +89,7 @@ class _NewEventInfo:
event = attr.ib(type=EventBase)
state = attr.ib(type=Optional[Sequence[EventBase]], default=None)
auth_events = attr.ib(type=Optional[Dict[Tuple[str, str], EventBase]], default=None)
auth_events = attr.ib(type=Optional[StateMap[EventBase]], default=None)
def shortstr(iterable, maxitems=5):
@@ -352,9 +352,7 @@ class FederationHandler(BaseHandler):
ours = await self.state_store.get_state_groups_ids(room_id, seen)
# state_maps is a list of mappings from (type, state_key) to event_id
state_maps = list(
ours.values()
) # type: list[dict[tuple[str, str], str]]
state_maps = list(ours.values()) # type: list[StateMap[str]]
# we don't need this any more, let's delete it.
del ours
@@ -1912,7 +1910,7 @@ class FederationHandler(BaseHandler):
origin: str,
event: EventBase,
state: Optional[Iterable[EventBase]],
auth_events: Optional[Dict[Tuple[str, str], EventBase]],
auth_events: Optional[StateMap[EventBase]],
backfilled: bool,
):
"""

View File

@@ -130,6 +130,8 @@ class GroupsLocalHandler(object):
res = yield self.transport_client.get_group_summary(
get_domain_from_id(group_id), group_id, requester_user_id
)
except HttpResponseException as e:
raise e.to_synapse_error()
except RequestSendFailed:
raise SynapseError(502, "Failed to contact group server")
@@ -190,6 +192,8 @@ class GroupsLocalHandler(object):
res = yield self.transport_client.create_group(
get_domain_from_id(group_id), group_id, user_id, content
)
except HttpResponseException as e:
raise e.to_synapse_error()
except RequestSendFailed:
raise SynapseError(502, "Failed to contact group server")
@@ -231,6 +235,8 @@ class GroupsLocalHandler(object):
res = yield self.transport_client.get_users_in_group(
get_domain_from_id(group_id), group_id, requester_user_id
)
except HttpResponseException as e:
raise e.to_synapse_error()
except RequestSendFailed:
raise SynapseError(502, "Failed to contact group server")
@@ -271,6 +277,8 @@ class GroupsLocalHandler(object):
res = yield self.transport_client.join_group(
get_domain_from_id(group_id), group_id, user_id, content
)
except HttpResponseException as e:
raise e.to_synapse_error()
except RequestSendFailed:
raise SynapseError(502, "Failed to contact group server")
@@ -315,6 +323,8 @@ class GroupsLocalHandler(object):
res = yield self.transport_client.accept_group_invite(
get_domain_from_id(group_id), group_id, user_id, content
)
except HttpResponseException as e:
raise e.to_synapse_error()
except RequestSendFailed:
raise SynapseError(502, "Failed to contact group server")
@@ -361,6 +371,8 @@ class GroupsLocalHandler(object):
requester_user_id,
content,
)
except HttpResponseException as e:
raise e.to_synapse_error()
except RequestSendFailed:
raise SynapseError(502, "Failed to contact group server")
@@ -424,6 +436,8 @@ class GroupsLocalHandler(object):
user_id,
content,
)
except HttpResponseException as e:
raise e.to_synapse_error()
except RequestSendFailed:
raise SynapseError(502, "Failed to contact group server")
@@ -460,6 +474,8 @@ class GroupsLocalHandler(object):
bulk_result = yield self.transport_client.bulk_get_publicised_groups(
get_domain_from_id(user_id), [user_id]
)
except HttpResponseException as e:
raise e.to_synapse_error()
except RequestSendFailed:
raise SynapseError(502, "Failed to contact group server")

View File

@@ -101,7 +101,7 @@ class InitialSyncHandler(BaseHandler):
if include_archived:
memberships.append(Membership.LEAVE)
room_list = await self.store.get_rooms_for_user_where_membership_is(
room_list = await self.store.get_rooms_for_local_user_where_membership_is(
user_id=user_id, membership_list=memberships
)

View File

@@ -88,6 +88,8 @@ class PaginationHandler(object):
if hs.config.retention_enabled:
# Run the purge jobs described in the configuration file.
for job in hs.config.retention_purge_jobs:
logger.info("Setting up purge job with config: %s", job)
self.clock.looping_call(
run_as_background_process,
job["interval"],
@@ -130,11 +132,22 @@ class PaginationHandler(object):
else:
include_null = False
logger.info(
"[purge] Running purge job for %d < max_lifetime <= %d (include NULLs = %s)",
min_ms,
max_ms,
include_null,
)
rooms = yield self.store.get_rooms_for_retention_period_in_range(
min_ms, max_ms, include_null
)
logger.debug("[purge] Rooms to purge: %s", rooms)
for room_id, retention_policy in iteritems(rooms):
logger.info("[purge] Attempting to purge messages in room %s", room_id)
if room_id in self._purges_in_progress_by_room:
logger.warning(
"[purge] not purging room %s as there's an ongoing purge running"
@@ -156,7 +169,7 @@ class PaginationHandler(object):
stream_ordering = yield self.store.find_first_stream_ordering_after_ts(ts)
r = yield self.store.get_room_event_after_stream_ordering(
r = yield self.store.get_room_event_before_stream_ordering(
room_id, stream_ordering,
)
if not r:

View File

@@ -20,13 +20,7 @@ from twisted.internet import defer
from synapse import types
from synapse.api.constants import MAX_USERID_LENGTH, LoginType
from synapse.api.errors import (
AuthError,
Codes,
ConsentNotGivenError,
RegistrationError,
SynapseError,
)
from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError
from synapse.config.server import is_threepid_reserved
from synapse.http.servlet import assert_params_in_dict
from synapse.replication.http.login import RegisterDeviceReplicationServlet
@@ -165,7 +159,7 @@ class RegistrationHandler(BaseHandler):
Returns:
Deferred[str]: user_id
Raises:
RegistrationError if there was a problem registering.
SynapseError if there was a problem registering.
"""
yield self.check_registration_ratelimit(address)
@@ -174,7 +168,7 @@ class RegistrationHandler(BaseHandler):
if password:
password_hash = yield self._auth_handler.hash(password)
if localpart:
if localpart is not None:
yield self.check_username(localpart, guest_access_token=guest_access_token)
was_guest = guest_access_token is not None
@@ -182,7 +176,7 @@ class RegistrationHandler(BaseHandler):
if not was_guest:
try:
int(localpart)
raise RegistrationError(
raise SynapseError(
400, "Numeric user IDs are reserved for guest users."
)
except ValueError:

View File

@@ -32,7 +32,15 @@ from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, Syna
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.http.endpoint import parse_and_validate_server_name
from synapse.storage.state import StateFilter
from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID
from synapse.types import (
Requester,
RoomAlias,
RoomID,
RoomStreamToken,
StateMap,
StreamToken,
UserID,
)
from synapse.util import stringutils
from synapse.util.async_helpers import Linearizer
from synapse.util.caches.response_cache import ResponseCache
@@ -207,15 +215,19 @@ class RoomCreationHandler(BaseHandler):
@defer.inlineCallbacks
def _update_upgraded_room_pls(
self, requester, old_room_id, new_room_id, old_room_state,
self,
requester: Requester,
old_room_id: str,
new_room_id: str,
old_room_state: StateMap[str],
):
"""Send updated power levels in both rooms after an upgrade
Args:
requester (synapse.types.Requester): the user requesting the upgrade
old_room_id (str): the id of the room to be replaced
new_room_id (str): the id of the replacement room
old_room_state (dict[tuple[str, str], str]): the state map for the old room
requester: the user requesting the upgrade
old_room_id: the id of the room to be replaced
new_room_id: the id of the replacement room
old_room_state: the state map for the old room
Returns:
Deferred

View File

@@ -690,7 +690,7 @@ class RoomMemberHandler(object):
@defer.inlineCallbacks
def _get_inviter(self, user_id, room_id):
invite = yield self.store.get_invite_for_user_in_room(
invite = yield self.store.get_invite_for_local_user_in_room(
user_id=user_id, room_id=room_id
)
if invite:

View File

@@ -24,6 +24,7 @@ from saml2.client import Saml2Client
from synapse.api.errors import SynapseError
from synapse.config import ConfigError
from synapse.http.servlet import parse_string
from synapse.module_api import ModuleApi
from synapse.rest.client.v1.login import SSOAuthHandler
from synapse.types import (
UserID,
@@ -31,6 +32,7 @@ from synapse.types import (
mxid_localpart_allowed_characters,
)
from synapse.util.async_helpers import Linearizer
from synapse.util.iterutils import chunk_seq
logger = logging.getLogger(__name__)
@@ -59,7 +61,8 @@ class SamlHandler:
# plugin to do custom mapping from saml response to mxid
self._user_mapping_provider = hs.config.saml2_user_mapping_provider_class(
hs.config.saml2_user_mapping_provider_config
hs.config.saml2_user_mapping_provider_config,
ModuleApi(hs, hs.get_auth_handler()),
)
# identifier for the external_ids table
@@ -112,10 +115,10 @@ class SamlHandler:
# the dict.
self.expire_sessions()
user_id = await self._map_saml_response_to_user(resp_bytes)
user_id = await self._map_saml_response_to_user(resp_bytes, relay_state)
self._sso_auth_handler.complete_sso_login(user_id, request, relay_state)
async def _map_saml_response_to_user(self, resp_bytes):
async def _map_saml_response_to_user(self, resp_bytes, client_redirect_url):
try:
saml2_auth = self._saml_client.parse_authn_request_response(
resp_bytes,
@@ -130,17 +133,28 @@ class SamlHandler:
logger.warning("SAML2 response was not signed")
raise SynapseError(400, "SAML2 response was not signed")
logger.info("SAML2 response: %s", saml2_auth.origxml)
logger.debug("SAML2 response: %s", saml2_auth.origxml)
for assertion in saml2_auth.assertions:
# kibana limits the length of a log field, whereas this is all rather
# useful, so split it up.
count = 0
for part in chunk_seq(str(assertion), 10000):
logger.info(
"SAML2 assertion: %s%s", "(%i)..." % (count,) if count else "", part
)
count += 1
logger.info("SAML2 mapped attributes: %s", saml2_auth.ava)
try:
remote_user_id = saml2_auth.ava["uid"][0]
except KeyError:
logger.warning("SAML2 response lacks a 'uid' attestation")
raise SynapseError(400, "'uid' not in SAML2 response")
self._outstanding_requests_dict.pop(saml2_auth.in_response_to, None)
remote_user_id = self._user_mapping_provider.get_remote_user_id(
saml2_auth, client_redirect_url
)
if not remote_user_id:
raise Exception("Failed to extract remote user id from SAML response")
with (await self._mapping_lock.queue(self._auth_provider_id)):
# first of all, check if we already have a mapping for this user
logger.info(
@@ -183,7 +197,7 @@ class SamlHandler:
# Map saml response to user attributes using the configured mapping provider
for i in range(1000):
attribute_dict = self._user_mapping_provider.saml_response_to_user_attributes(
saml2_auth, i
saml2_auth, i, client_redirect_url=client_redirect_url,
)
logger.debug(
@@ -216,6 +230,8 @@ class SamlHandler:
500, "Unable to generate a Matrix ID from the SAML response"
)
logger.info("Mapped SAML user to local part %s", localpart)
registered_user_id = await self._registration_handler.register_user(
localpart=localpart, default_display_name=displayname
)
@@ -265,17 +281,35 @@ class SamlConfig(object):
class DefaultSamlMappingProvider(object):
__version__ = "0.0.1"
def __init__(self, parsed_config: SamlConfig):
def __init__(self, parsed_config: SamlConfig, module_api: ModuleApi):
"""The default SAML user mapping provider
Args:
parsed_config: Module configuration
module_api: module api proxy
"""
self._mxid_source_attribute = parsed_config.mxid_source_attribute
self._mxid_mapper = parsed_config.mxid_mapper
self._grandfathered_mxid_source_attribute = (
module_api._hs.config.saml2_grandfathered_mxid_source_attribute
)
def get_remote_user_id(
self, saml_response: saml2.response.AuthnResponse, client_redirect_url: str
):
"""Extracts the remote user id from the SAML response"""
try:
return saml_response.ava["uid"][0]
except KeyError:
logger.warning("SAML2 response lacks a 'uid' attestation")
raise SynapseError(400, "'uid' not in SAML2 response")
def saml_response_to_user_attributes(
self, saml_response: saml2.response.AuthnResponse, failures: int = 0,
self,
saml_response: saml2.response.AuthnResponse,
failures: int,
client_redirect_url: str,
) -> dict:
"""Maps some text from a SAML response to attributes of a new user
@@ -285,6 +319,8 @@ class DefaultSamlMappingProvider(object):
failures: How many times a call to this function with this
saml_response has resulted in a failure
client_redirect_url: where the client wants to redirect to
Returns:
dict: A dict containing new user attributes. Possible keys:
* mxid_localpart (str): Required. The localpart of the user's mxid

View File

@@ -179,7 +179,7 @@ class SearchHandler(BaseHandler):
search_filter = Filter(filter_dict)
# TODO: Search through left rooms too
rooms = yield self.store.get_rooms_for_user_where_membership_is(
rooms = yield self.store.get_rooms_for_local_user_where_membership_is(
user.to_string(),
membership_list=[Membership.JOIN],
# membership_list=[Membership.JOIN, Membership.LEAVE, Membership.Ban],

View File

@@ -1662,7 +1662,7 @@ class SyncHandler(object):
Membership.BAN,
)
room_list = await self.store.get_rooms_for_user_where_membership_is(
room_list = await self.store.get_rooms_for_local_user_where_membership_is(
user_id=user_id, membership_list=membership_list
)

View File

@@ -257,7 +257,7 @@ class TypingHandler(object):
"typing_key", self._latest_room_serial, rooms=[member.room_id]
)
def get_all_typing_updates(self, last_id, current_id):
async def get_all_typing_updates(self, last_id, current_id):
if last_id == current_id:
return []

View File

@@ -14,8 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import cgi
import collections
import html
import http.client
import logging
import types
@@ -36,6 +36,7 @@ import synapse.metrics
from synapse.api.errors import (
CodeMessageException,
Codes,
RedirectException,
SynapseError,
UnrecognizedRequestError,
)
@@ -153,14 +154,18 @@ def _return_html_error(f, request):
Args:
f (twisted.python.failure.Failure):
request (twisted.web.iweb.IRequest):
request (twisted.web.server.Request):
"""
if f.check(CodeMessageException):
cme = f.value
code = cme.code
msg = cme.msg
if isinstance(cme, SynapseError):
if isinstance(cme, RedirectException):
logger.info("%s redirect to %s", request, cme.location)
request.setHeader(b"location", cme.location)
request.cookies.extend(cme.cookies)
elif isinstance(cme, SynapseError):
logger.info("%s SynapseError: %s - %s", request, code, msg)
else:
logger.error(
@@ -178,7 +183,7 @@ def _return_html_error(f, request):
exc_info=(f.type, f.value, f.getTracebackObject()),
)
body = HTML_ERROR_TEMPLATE.format(code=code, msg=cgi.escape(msg)).encode("utf-8")
body = HTML_ERROR_TEMPLATE.format(code=code, msg=html.escape(msg)).encode("utf-8")
request.setResponseCode(code)
request.setHeader(b"Content-Type", b"text/html; charset=utf-8")
request.setHeader(b"Content-Length", b"%i" % (len(body),))

View File

@@ -88,7 +88,7 @@ class SynapseRequest(Request):
def get_redacted_uri(self):
uri = self.uri
if isinstance(uri, bytes):
uri = self.uri.decode("ascii")
uri = self.uri.decode("ascii", errors="replace")
return redact_uri(uri)
def get_method(self):

View File

@@ -571,6 +571,9 @@ def run_in_background(f, *args, **kwargs):
yield or await on (for instance because you want to pass it to
deferred.gatherResults()).
If f returns a Coroutine object, it will be wrapped into a Deferred (which will have
the side effect of executing the coroutine).
Note that if you completely discard the result, you should make sure that
`f` doesn't raise any deferred exceptions, otherwise a scary-looking
CRITICAL error about an unhandled error will be logged without much

View File

@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2017 New Vector Ltd
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -16,18 +17,26 @@ import logging
from twisted.internet import defer
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.types import UserID
"""
This package defines the 'stable' API which can be used by extension modules which
are loaded into Synapse.
"""
__all__ = ["errors", "make_deferred_yieldable", "run_in_background", "ModuleApi"]
logger = logging.getLogger(__name__)
class ModuleApi(object):
"""A proxy object that gets passed to password auth providers so they
"""A proxy object that gets passed to various plugin modules so they
can register new users etc if necessary.
"""
def __init__(self, hs, auth_handler):
self.hs = hs
self._hs = hs
self._store = hs.get_datastore()
self._auth = hs.get_auth()
@@ -64,7 +73,7 @@ class ModuleApi(object):
"""
if username.startswith("@"):
return username
return UserID(username, self.hs.hostname).to_string()
return UserID(username, self._hs.hostname).to_string()
def check_user_exists(self, user_id):
"""Check if user exists.
@@ -111,10 +120,14 @@ class ModuleApi(object):
displayname (str|None): The displayname of the new user.
emails (List[str]): Emails to bind to the new user.
Raises:
SynapseError if there is an error performing the registration. Check the
'errcode' property for more information on the reason for failure
Returns:
Deferred[str]: user_id
"""
return self.hs.get_registration_handler().register_user(
return self._hs.get_registration_handler().register_user(
localpart=localpart, default_display_name=displayname, bind_emails=emails
)
@@ -131,12 +144,34 @@ class ModuleApi(object):
Returns:
defer.Deferred[tuple[str, str]]: Tuple of device ID and access token
"""
return self.hs.get_registration_handler().register_device(
return self._hs.get_registration_handler().register_device(
user_id=user_id,
device_id=device_id,
initial_display_name=initial_display_name,
)
def record_user_external_id(
self, auth_provider_id: str, remote_user_id: str, registered_user_id: str
) -> defer.Deferred:
"""Record a mapping from an external user id to a mxid
Args:
auth_provider: identifier for the remote auth provider
external_id: id on that system
user_id: complete mxid that it is mapped to
"""
return self._store.record_user_external_id(
auth_provider_id, remote_user_id, registered_user_id
)
def generate_short_term_login_token(
self, user_id: str, duration_in_ms: int = (2 * 60 * 1000)
) -> str:
"""Generate a login token suitable for m.login.token authentication"""
return self._hs.get_macaroon_generator().generate_short_term_login_token(
user_id, duration_in_ms
)
@defer.inlineCallbacks
def invalidate_access_token(self, access_token):
"""Invalidate an access token for a user
@@ -157,7 +192,7 @@ class ModuleApi(object):
user_id = user_info["user"].to_string()
if device_id:
# delete the device, which will also delete its access tokens
yield self.hs.get_device_handler().delete_device(user_id, device_id)
yield self._hs.get_device_handler().delete_device(user_id, device_id)
else:
# no associated device. Just delete the access token.
yield self._auth_handler.delete_access_token(access_token)

View File

@@ -0,0 +1,18 @@
# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Exception types which are exposed as part of the stable module API"""
from synapse.api.errors import RedirectException, SynapseError # noqa: F401

View File

@@ -15,11 +15,13 @@
import logging
from collections import namedtuple
from typing import Callable, List
from prometheus_client import Counter
from twisted.internet import defer
import synapse.server
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError
from synapse.handlers.presence import format_user_presence_state
@@ -154,7 +156,7 @@ class Notifier(object):
UNUSED_STREAM_EXPIRY_MS = 10 * 60 * 1000
def __init__(self, hs):
def __init__(self, hs: "synapse.server.HomeServer"):
self.user_to_user_stream = {}
self.room_to_user_streams = {}
@@ -164,7 +166,12 @@ class Notifier(object):
self.store = hs.get_datastore()
self.pending_new_room_events = []
self.replication_callbacks = []
# Called when there are new things to stream over replication
self.replication_callbacks = [] # type: List[Callable[[], None]]
# Called when remote servers have come back online after having been
# down.
self.remote_server_up_callbacks = [] # type: List[Callable[[str], None]]
self.clock = hs.get_clock()
self.appservice_handler = hs.get_application_service_handler()
@@ -205,7 +212,7 @@ class Notifier(object):
"synapse_notifier_users", "", [], lambda: len(self.user_to_user_stream)
)
def add_replication_callback(self, cb):
def add_replication_callback(self, cb: Callable[[], None]):
"""Add a callback that will be called when some new data is available.
Callback is not given any arguments. It should *not* return a Deferred - if
it needs to do any asynchronous work, a background thread should be started and
@@ -213,6 +220,12 @@ class Notifier(object):
"""
self.replication_callbacks.append(cb)
def add_remote_server_up_callback(self, cb: Callable[[str], None]):
"""Add a callback that will be called when synapse detects a server
has been
"""
self.remote_server_up_callbacks.append(cb)
def on_new_room_event(
self, event, room_stream_id, max_room_stream_id, extra_users=[]
):
@@ -522,3 +535,15 @@ class Notifier(object):
"""Notify the any replication listeners that there's a new event"""
for cb in self.replication_callbacks:
cb()
def notify_remote_server_up(self, server: str):
"""Notify any replication that a remote server has come back up
"""
# We call federation_sender directly rather than registering as a
# callback as a) we already have a reference to it and b) it introduces
# circular dependencies.
if self.federation_sender:
self.federation_sender.wake_destination(server)
for cb in self.remote_server_up_callbacks:
cb(server)

View File

@@ -21,7 +21,7 @@ from synapse.storage import Storage
@defer.inlineCallbacks
def get_badge_count(store, user_id):
invites = yield store.get_invited_rooms_for_user(user_id)
invites = yield store.get_invited_rooms_for_local_user(user_id)
joins = yield store.get_rooms_for_user(user_id)
my_receipts_by_room = yield store.get_receipts_for_user(user_id, "m.read")

View File

@@ -16,6 +16,7 @@
import abc
import logging
import re
from typing import Dict, List, Tuple
from six import raise_from
from six.moves import urllib
@@ -78,9 +79,8 @@ class ReplicationEndpoint(object):
__metaclass__ = abc.ABCMeta
NAME = abc.abstractproperty()
PATH_ARGS = abc.abstractproperty()
NAME = abc.abstractproperty() # type: str # type: ignore
PATH_ARGS = abc.abstractproperty() # type: Tuple[str, ...] # type: ignore
METHOD = "POST"
CACHE = True
RETRY_ON_TIMEOUT = True
@@ -171,7 +171,7 @@ class ReplicationEndpoint(object):
# have a good idea that the request has either succeeded or failed on
# the master, and so whether we should clean up or not.
while True:
headers = {}
headers = {} # type: Dict[bytes, List[bytes]]
inject_active_span_byte_dict(headers, None, check_destination=False)
try:
result = yield request_func(uri, data, headers=headers)
@@ -207,7 +207,7 @@ class ReplicationEndpoint(object):
method = self.METHOD
if self.CACHE:
handler = self._cached_handler
handler = self._cached_handler # type: ignore
url_args.append("txn_id")
args = "/".join("(?P<%s>[^/]+)" % (arg,) for arg in url_args)

View File

@@ -14,7 +14,7 @@
# limitations under the License.
import logging
from typing import Dict
from typing import Dict, Optional
import six
@@ -41,7 +41,7 @@ class BaseSlavedStore(SQLBaseStore):
if isinstance(self.database_engine, PostgresEngine):
self._cache_id_gen = SlavedIdTracker(
db_conn, "cache_invalidation_stream", "stream_id"
)
) # type: Optional[SlavedIdTracker]
else:
self._cache_id_gen = None
@@ -62,14 +62,20 @@ class BaseSlavedStore(SQLBaseStore):
def process_replication_rows(self, stream_name, token, rows):
if stream_name == "caches":
self._cache_id_gen.advance(token)
if self._cache_id_gen:
self._cache_id_gen.advance(token)
for row in rows:
if row.cache_func == CURRENT_STATE_CACHE_NAME:
if row.keys is None:
raise Exception(
"Can't send an 'invalidate all' for current state cache"
)
room_id = row.keys[0]
members_changed = set(row.keys[1:])
self._invalidate_state_caches(room_id, members_changed)
else:
self._attempt_to_invalidate_cache(row.cache_func, tuple(row.keys))
self._attempt_to_invalidate_cache(row.cache_func, row.keys)
def _invalidate_cache_and_stream(self, txn, cache_func, keys):
txn.call_after(cache_func.invalidate, keys)

View File

@@ -152,7 +152,7 @@ class SlavedEventStore(
if etype == EventTypes.Member:
self._membership_stream_cache.entity_has_changed(state_key, stream_ordering)
self.get_invited_rooms_for_user.invalidate((state_key,))
self.get_invited_rooms_for_local_user.invalidate((state_key,))
if relates_to:
self.get_relations_for_event.invalidate_many((relates_to,))

View File

@@ -29,7 +29,7 @@ class SlavedPresenceStore(BaseSlavedStore):
self._presence_on_startup = self._get_active_presence(db_conn)
self.presence_stream_cache = self.presence_stream_cache = StreamChangeCache(
self.presence_stream_cache = StreamChangeCache(
"PresenceStreamChangeCache", self._presence_id_gen.get_current_token()
)

View File

@@ -16,7 +16,7 @@
"""
import logging
from typing import Dict
from typing import Dict, List, Optional
from twisted.internet import defer
from twisted.internet.protocol import ReconnectingClientFactory
@@ -28,6 +28,7 @@ from synapse.replication.tcp.protocol import (
)
from .commands import (
Command,
FederationAckCommand,
InvalidateCacheCommand,
RemovePusherCommand,
@@ -89,15 +90,15 @@ class ReplicationClientHandler(AbstractReplicationClientHandler):
# Any pending commands to be sent once a new connection has been
# established
self.pending_commands = []
self.pending_commands = [] # type: List[Command]
# Map from string -> deferred, to wake up when receiveing a SYNC with
# the given string.
# Used for tests.
self.awaiting_syncs = {}
self.awaiting_syncs = {} # type: Dict[str, defer.Deferred]
# The factory used to create connections.
self.factory = None
self.factory = None # type: Optional[ReplicationClientFactory]
def start_replication(self, hs):
"""Helper method to start a replication connection to the remote server
@@ -109,7 +110,7 @@ class ReplicationClientHandler(AbstractReplicationClientHandler):
port = hs.config.worker_replication_port
hs.get_reactor().connectTCP(host, port, self.factory)
def on_rdata(self, stream_name, token, rows):
async def on_rdata(self, stream_name, token, rows):
"""Called to handle a batch of replication data with a given stream token.
By default this just pokes the slave store. Can be overridden in subclasses to
@@ -120,20 +121,17 @@ class ReplicationClientHandler(AbstractReplicationClientHandler):
token (int): stream token for this batch of rows
rows (list): a list of Stream.ROW_TYPE objects as returned by
Stream.parse_row.
Returns:
Deferred|None
"""
logger.debug("Received rdata %s -> %s", stream_name, token)
return self.store.process_replication_rows(stream_name, token, rows)
self.store.process_replication_rows(stream_name, token, rows)
def on_position(self, stream_name, token):
async def on_position(self, stream_name, token):
"""Called when we get new position data. By default this just pokes
the slave store.
Can be overriden in subclasses to handle more.
"""
return self.store.process_replication_rows(stream_name, token, [])
self.store.process_replication_rows(stream_name, token, [])
def on_sync(self, data):
"""When we received a SYNC we wake up any deferreds that were waiting
@@ -145,6 +143,9 @@ class ReplicationClientHandler(AbstractReplicationClientHandler):
if d:
d.callback(data)
def on_remote_server_up(self, server: str):
"""Called when get a new REMOTE_SERVER_UP command."""
def get_streams_to_replicate(self) -> Dict[str, int]:
"""Called when a new connection has been established and we need to
subscribe to streams.
@@ -235,4 +236,5 @@ class ReplicationClientHandler(AbstractReplicationClientHandler):
# We don't reset the delay any earlier as otherwise if there is a
# problem during start up we'll end up tight looping connecting to the
# server.
self.factory.resetDelay()
if self.factory:
self.factory.resetDelay()

View File

@@ -20,15 +20,16 @@ allowed to be sent by which side.
import logging
import platform
from typing import Tuple, Type
if platform.python_implementation() == "PyPy":
import json
_json_encoder = json.JSONEncoder()
else:
import simplejson as json
import simplejson as json # type: ignore[no-redef] # noqa: F821
_json_encoder = json.JSONEncoder(namedtuple_as_object=False)
_json_encoder = json.JSONEncoder(namedtuple_as_object=False) # type: ignore[call-arg] # noqa: F821
logger = logging.getLogger(__name__)
@@ -44,7 +45,7 @@ class Command(object):
The default implementation creates a command of form `<NAME> <data>`
"""
NAME = None
NAME = None # type: str
def __init__(self, data):
self.data = data
@@ -386,25 +387,39 @@ class UserIpCommand(Command):
)
class RemoteServerUpCommand(Command):
"""Sent when a worker has detected that a remote server is no longer
"down" and retry timings should be reset.
If sent from a client the server will relay to all other workers.
Format::
REMOTE_SERVER_UP <server>
"""
NAME = "REMOTE_SERVER_UP"
_COMMANDS = (
ServerCommand,
RdataCommand,
PositionCommand,
ErrorCommand,
PingCommand,
NameCommand,
ReplicateCommand,
UserSyncCommand,
FederationAckCommand,
SyncCommand,
RemovePusherCommand,
InvalidateCacheCommand,
UserIpCommand,
RemoteServerUpCommand,
) # type: Tuple[Type[Command], ...]
# Map of command name to command type.
COMMAND_MAP = {
cmd.NAME: cmd
for cmd in (
ServerCommand,
RdataCommand,
PositionCommand,
ErrorCommand,
PingCommand,
NameCommand,
ReplicateCommand,
UserSyncCommand,
FederationAckCommand,
SyncCommand,
RemovePusherCommand,
InvalidateCacheCommand,
UserIpCommand,
)
}
COMMAND_MAP = {cmd.NAME: cmd for cmd in _COMMANDS}
# The commands the server is allowed to send
VALID_SERVER_COMMANDS = (
@@ -414,6 +429,7 @@ VALID_SERVER_COMMANDS = (
ErrorCommand.NAME,
PingCommand.NAME,
SyncCommand.NAME,
RemoteServerUpCommand.NAME,
)
# The commands the client is allowed to send
@@ -427,4 +443,5 @@ VALID_CLIENT_COMMANDS = (
InvalidateCacheCommand.NAME,
UserIpCommand.NAME,
ErrorCommand.NAME,
RemoteServerUpCommand.NAME,
)

View File

@@ -53,6 +53,7 @@ import fcntl
import logging
import struct
from collections import defaultdict
from typing import Any, DefaultDict, Dict, List, Set, Tuple
from six import iteritems, iterkeys
@@ -65,24 +66,26 @@ from twisted.python.failure import Failure
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util import Clock
from synapse.util.stringutils import random_string
from .commands import (
from synapse.replication.tcp.commands import (
COMMAND_MAP,
VALID_CLIENT_COMMANDS,
VALID_SERVER_COMMANDS,
Command,
ErrorCommand,
NameCommand,
PingCommand,
PositionCommand,
RdataCommand,
RemoteServerUpCommand,
ReplicateCommand,
ServerCommand,
SyncCommand,
UserSyncCommand,
)
from .streams import STREAMS_MAP
from synapse.replication.tcp.streams import STREAMS_MAP
from synapse.types import Collection
from synapse.util import Clock
from synapse.util.stringutils import random_string
connection_close_counter = Counter(
"synapse_replication_tcp_protocol_close_reason", "", ["reason_type"]
@@ -124,8 +127,11 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
delimiter = b"\n"
VALID_INBOUND_COMMANDS = [] # Valid commands we expect to receive
VALID_OUTBOUND_COMMANDS = [] # Valid commans we can send
# Valid commands we expect to receive
VALID_INBOUND_COMMANDS = [] # type: Collection[str]
# Valid commands we can send
VALID_OUTBOUND_COMMANDS = [] # type: Collection[str]
max_line_buffer = 10000
@@ -144,13 +150,13 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
self.conn_id = random_string(5) # To dedupe in case of name clashes.
# List of pending commands to send once we've established the connection
self.pending_commands = []
self.pending_commands = [] # type: List[Command]
# The LoopingCall for sending pings.
self._send_ping_loop = None
self.inbound_commands_counter = defaultdict(int)
self.outbound_commands_counter = defaultdict(int)
self.inbound_commands_counter = defaultdict(int) # type: DefaultDict[str, int]
self.outbound_commands_counter = defaultdict(int) # type: DefaultDict[str, int]
def connectionMade(self):
logger.info("[%s] Connection established", self.id())
@@ -235,19 +241,16 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
"replication-" + cmd.get_logcontext_id(), self.handle_command, cmd
)
def handle_command(self, cmd):
async def handle_command(self, cmd: Command):
"""Handle a command we have received over the replication stream.
By default delegates to on_<COMMAND>
By default delegates to on_<COMMAND>, which should return an awaitable.
Args:
cmd (synapse.replication.tcp.commands.Command): received command
Returns:
Deferred
cmd: received command
"""
handler = getattr(self, "on_%s" % (cmd.NAME,))
return handler(cmd)
await handler(cmd)
def close(self):
logger.warning("[%s] Closing connection", self.id())
@@ -320,10 +323,10 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
for cmd in pending:
self.send_command(cmd)
def on_PING(self, line):
async def on_PING(self, line):
self.received_ping = True
def on_ERROR(self, cmd):
async def on_ERROR(self, cmd):
logger.error("[%s] Remote reported error: %r", self.id(), cmd.data)
def pauseProducing(self):
@@ -409,30 +412,30 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
self.streamer = streamer
# The streams the client has subscribed to and is up to date with
self.replication_streams = set()
self.replication_streams = set() # type: Set[str]
# The streams the client is currently subscribing to.
self.connecting_streams = set()
self.connecting_streams = set() # type: Set[str]
# Map from stream name to list of updates to send once we've finished
# subscribing the client to the stream.
self.pending_rdata = {}
self.pending_rdata = {} # type: Dict[str, List[Tuple[int, Any]]]
def connectionMade(self):
self.send_command(ServerCommand(self.server_name))
BaseReplicationStreamProtocol.connectionMade(self)
self.streamer.new_connection(self)
def on_NAME(self, cmd):
async def on_NAME(self, cmd):
logger.info("[%s] Renamed to %r", self.id(), cmd.data)
self.name = cmd.data
def on_USER_SYNC(self, cmd):
return self.streamer.on_user_sync(
async def on_USER_SYNC(self, cmd):
await self.streamer.on_user_sync(
self.conn_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms
)
def on_REPLICATE(self, cmd):
async def on_REPLICATE(self, cmd):
stream_name = cmd.stream_name
token = cmd.token
@@ -443,23 +446,26 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
for stream in iterkeys(self.streamer.streams_by_name)
]
return make_deferred_yieldable(
await make_deferred_yieldable(
defer.gatherResults(deferreds, consumeErrors=True)
)
else:
return self.subscribe_to_stream(stream_name, token)
await self.subscribe_to_stream(stream_name, token)
def on_FEDERATION_ACK(self, cmd):
return self.streamer.federation_ack(cmd.token)
async def on_FEDERATION_ACK(self, cmd):
self.streamer.federation_ack(cmd.token)
def on_REMOVE_PUSHER(self, cmd):
return self.streamer.on_remove_pusher(cmd.app_id, cmd.push_key, cmd.user_id)
async def on_REMOVE_PUSHER(self, cmd):
await self.streamer.on_remove_pusher(cmd.app_id, cmd.push_key, cmd.user_id)
def on_INVALIDATE_CACHE(self, cmd):
return self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)
async def on_INVALIDATE_CACHE(self, cmd):
self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)
def on_USER_IP(self, cmd):
return self.streamer.on_user_ip(
async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand):
self.streamer.on_remote_server_up(cmd.data)
async def on_USER_IP(self, cmd):
self.streamer.on_user_ip(
cmd.user_id,
cmd.access_token,
cmd.ip,
@@ -468,8 +474,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
cmd.last_seen,
)
@defer.inlineCallbacks
def subscribe_to_stream(self, stream_name, token):
async def subscribe_to_stream(self, stream_name, token):
"""Subscribe the remote to a stream.
This invloves checking if they've missed anything and sending those
@@ -481,7 +486,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
try:
# Get missing updates
updates, current_token = yield self.streamer.get_stream_updates(
updates, current_token = await self.streamer.get_stream_updates(
stream_name, token
)
@@ -554,6 +559,9 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
def send_sync(self, data):
self.send_command(SyncCommand(data))
def send_remote_server_up(self, server: str):
self.send_command(RemoteServerUpCommand(server))
def on_connection_closed(self):
BaseReplicationStreamProtocol.on_connection_closed(self)
self.streamer.lost_connection(self)
@@ -566,7 +574,7 @@ class AbstractReplicationClientHandler(metaclass=abc.ABCMeta):
"""
@abc.abstractmethod
def on_rdata(self, stream_name, token, rows):
async def on_rdata(self, stream_name, token, rows):
"""Called to handle a batch of replication data with a given stream token.
Args:
@@ -574,14 +582,11 @@ class AbstractReplicationClientHandler(metaclass=abc.ABCMeta):
token (int): stream token for this batch of rows
rows (list): a list of Stream.ROW_TYPE objects as returned by
Stream.parse_row.
Returns:
Deferred|None
"""
raise NotImplementedError()
@abc.abstractmethod
def on_position(self, stream_name, token):
async def on_position(self, stream_name, token):
"""Called when we get new position data."""
raise NotImplementedError()
@@ -590,6 +595,11 @@ class AbstractReplicationClientHandler(metaclass=abc.ABCMeta):
"""Called when get a new SYNC command."""
raise NotImplementedError()
@abc.abstractmethod
async def on_remote_server_up(self, server: str):
"""Called when get a new REMOTE_SERVER_UP command."""
raise NotImplementedError()
@abc.abstractmethod
def get_streams_to_replicate(self):
"""Called when a new connection has been established and we need to
@@ -642,11 +652,11 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
# Set of stream names that have been subscribe to, but haven't yet
# caught up with. This is used to track when the client has been fully
# connected to the remote.
self.streams_connecting = set()
self.streams_connecting = set() # type: Set[str]
# Map of stream to batched updates. See RdataCommand for info on how
# batching works.
self.pending_batches = {}
self.pending_batches = {} # type: Dict[str, Any]
def connectionMade(self):
self.send_command(NameCommand(self.client_name))
@@ -670,12 +680,12 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
if not self.streams_connecting:
self.handler.finished_connecting()
def on_SERVER(self, cmd):
async def on_SERVER(self, cmd):
if cmd.data != self.server_name:
logger.error("[%s] Connected to wrong remote: %r", self.id(), cmd.data)
self.send_error("Wrong remote")
def on_RDATA(self, cmd):
async def on_RDATA(self, cmd):
stream_name = cmd.stream_name
inbound_rdata_count.labels(stream_name).inc()
@@ -695,19 +705,22 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
# Check if this is the last of a batch of updates
rows = self.pending_batches.pop(stream_name, [])
rows.append(row)
return self.handler.on_rdata(stream_name, cmd.token, rows)
await self.handler.on_rdata(stream_name, cmd.token, rows)
def on_POSITION(self, cmd):
async def on_POSITION(self, cmd):
# When we get a `POSITION` command it means we've finished getting
# missing updates for the given stream, and are now up to date.
self.streams_connecting.discard(cmd.stream_name)
if not self.streams_connecting:
self.handler.finished_connecting()
return self.handler.on_position(cmd.stream_name, cmd.token)
await self.handler.on_position(cmd.stream_name, cmd.token)
def on_SYNC(self, cmd):
return self.handler.on_sync(cmd.data)
async def on_SYNC(self, cmd):
self.handler.on_sync(cmd.data)
async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand):
self.handler.on_remote_server_up(cmd.data)
def replicate(self, stream_name, token):
"""Send the subscription request to the server
@@ -766,7 +779,7 @@ def transport_kernel_read_buffer_size(protocol, read=True):
op = SIOCINQ
else:
op = SIOCOUTQ
size = struct.unpack("I", fcntl.ioctl(fileno, op, "\0\0\0\0"))[0]
size = struct.unpack("I", fcntl.ioctl(fileno, op, b"\0\0\0\0"))[0]
return size
return 0

View File

@@ -17,12 +17,12 @@
import logging
import random
from typing import List
from six import itervalues
from prometheus_client import Counter
from twisted.internet import defer
from twisted.internet.protocol import Factory
from synapse.metrics import LaterGauge
@@ -79,7 +79,7 @@ class ReplicationStreamer(object):
self._replication_torture_level = hs.config.replication_torture_level
# Current connections.
self.connections = []
self.connections = [] # type: List[ServerReplicationStreamProtocol]
LaterGauge(
"synapse_replication_tcp_resource_total_connections",
@@ -120,6 +120,7 @@ class ReplicationStreamer(object):
self.federation_sender = hs.get_federation_sender()
self.notifier.add_replication_callback(self.on_notifier_poke)
self.notifier.add_remote_server_up_callback(self.send_remote_server_up)
# Keeps track of whether we are currently checking for updates
self.is_looping = False
@@ -154,8 +155,7 @@ class ReplicationStreamer(object):
run_as_background_process("replication_notifier", self._run_notifier_loop)
@defer.inlineCallbacks
def _run_notifier_loop(self):
async def _run_notifier_loop(self):
self.is_looping = True
try:
@@ -184,7 +184,7 @@ class ReplicationStreamer(object):
continue
if self._replication_torture_level:
yield self.clock.sleep(
await self.clock.sleep(
self._replication_torture_level / 1000.0
)
@@ -195,7 +195,7 @@ class ReplicationStreamer(object):
stream.upto_token,
)
try:
updates, current_token = yield stream.get_updates()
updates, current_token = await stream.get_updates()
except Exception:
logger.info("Failed to handle stream %s", stream.NAME)
raise
@@ -232,7 +232,7 @@ class ReplicationStreamer(object):
self.is_looping = False
@measure_func("repl.get_stream_updates")
def get_stream_updates(self, stream_name, token):
async def get_stream_updates(self, stream_name, token):
"""For a given stream get all updates since token. This is called when
a client first subscribes to a stream.
"""
@@ -240,7 +240,7 @@ class ReplicationStreamer(object):
if not stream:
raise Exception("unknown stream %s", stream_name)
return stream.get_updates_since(token)
return await stream.get_updates_since(token)
@measure_func("repl.federation_ack")
def federation_ack(self, token):
@@ -251,22 +251,20 @@ class ReplicationStreamer(object):
self.federation_sender.federation_ack(token)
@measure_func("repl.on_user_sync")
@defer.inlineCallbacks
def on_user_sync(self, conn_id, user_id, is_syncing, last_sync_ms):
async def on_user_sync(self, conn_id, user_id, is_syncing, last_sync_ms):
"""A client has started/stopped syncing on a worker.
"""
user_sync_counter.inc()
yield self.presence_handler.update_external_syncs_row(
await self.presence_handler.update_external_syncs_row(
conn_id, user_id, is_syncing, last_sync_ms
)
@measure_func("repl.on_remove_pusher")
@defer.inlineCallbacks
def on_remove_pusher(self, app_id, push_key, user_id):
async def on_remove_pusher(self, app_id, push_key, user_id):
"""A client has asked us to remove a pusher
"""
remove_pusher_counter.inc()
yield self.store.delete_pusher_by_app_id_pushkey_user_id(
await self.store.delete_pusher_by_app_id_pushkey_user_id(
app_id=app_id, pushkey=push_key, user_id=user_id
)
@@ -280,15 +278,24 @@ class ReplicationStreamer(object):
getattr(self.store, cache_func).invalidate(tuple(keys))
@measure_func("repl.on_user_ip")
@defer.inlineCallbacks
def on_user_ip(self, user_id, access_token, ip, user_agent, device_id, last_seen):
async def on_user_ip(
self, user_id, access_token, ip, user_agent, device_id, last_seen
):
"""The client saw a user request
"""
user_ip_cache_counter.inc()
yield self.store.insert_client_ip(
await self.store.insert_client_ip(
user_id, access_token, ip, user_agent, device_id, last_seen
)
yield self._server_notices_sender.on_user_ip(user_id)
await self._server_notices_sender.on_user_ip(user_id)
@measure_func("repl.on_remote_server_up")
def on_remote_server_up(self, server: str):
self.notifier.notify_remote_server_up(server)
def send_remote_server_up(self, server: str):
for conn in self.connections:
conn.send_remote_server_up(server)
def send_sync_to_all_connections(self, data):
"""Sends a SYNC command to all clients.

View File

@@ -14,12 +14,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import logging
from collections import namedtuple
from typing import Any, List, Optional
from twisted.internet import defer
import attr
logger = logging.getLogger(__name__)
@@ -67,10 +67,24 @@ PushersStreamRow = namedtuple(
"PushersStreamRow",
("user_id", "app_id", "pushkey", "deleted"), # str # str # str # bool
)
CachesStreamRow = namedtuple(
"CachesStreamRow",
("cache_func", "keys", "invalidation_ts"), # str # list(str) # int
)
@attr.s
class CachesStreamRow:
"""Stream to inform workers they should invalidate their cache.
Attributes:
cache_func: Name of the cached function.
keys: The entry in the cache to invalidate. If None then will
invalidate all.
invalidation_ts: Timestamp of when the invalidation took place.
"""
cache_func = attr.ib(type=str)
keys = attr.ib(type=Optional[List[Any]])
invalidation_ts = attr.ib(type=int)
PublicRoomsStreamRow = namedtuple(
"PublicRoomsStreamRow",
(
@@ -104,8 +118,9 @@ class Stream(object):
time it was called up until the point `advance_current_token` was called.
"""
NAME = None # The name of the stream
ROW_TYPE = None # The type of the row. Used by the default impl of parse_row.
NAME = None # type: str # The name of the stream
# The type of the row. Used by the default impl of parse_row.
ROW_TYPE = None # type: Any
_LIMITED = True # Whether the update function takes a limit
@classmethod
@@ -143,8 +158,7 @@ class Stream(object):
self.upto_token = self.current_token()
self.last_token = self.upto_token
@defer.inlineCallbacks
def get_updates(self):
async def get_updates(self):
"""Gets all updates since the last time this function was called (or
since the stream was constructed if it hadn't been called before),
until the `upto_token`
@@ -155,13 +169,12 @@ class Stream(object):
list of ``(token, row)`` entries. ``row`` will be json-serialised and
sent over the replication steam.
"""
updates, current_token = yield self.get_updates_since(self.last_token)
updates, current_token = await self.get_updates_since(self.last_token)
self.last_token = current_token
return updates, current_token
@defer.inlineCallbacks
def get_updates_since(self, from_token):
async def get_updates_since(self, from_token):
"""Like get_updates except allows specifying from when we should
stream updates
@@ -181,15 +194,16 @@ class Stream(object):
if from_token == current_token:
return [], current_token
logger.info("get_updates_since: %s", self.__class__)
if self._LIMITED:
rows = yield self.update_function(
rows = await self.update_function(
from_token, current_token, limit=MAX_EVENTS_BEHIND + 1
)
# never turn more than MAX_EVENTS_BEHIND + 1 into updates.
rows = itertools.islice(rows, MAX_EVENTS_BEHIND + 1)
else:
rows = yield self.update_function(from_token, current_token)
rows = await self.update_function(from_token, current_token)
updates = [(row[0], row[1:]) for row in rows]
@@ -231,8 +245,8 @@ class BackfillStream(Stream):
def __init__(self, hs):
store = hs.get_datastore()
self.current_token = store.get_current_backfill_token
self.update_function = store.get_all_new_backfill_event_rows
self.current_token = store.get_current_backfill_token # type: ignore
self.update_function = store.get_all_new_backfill_event_rows # type: ignore
super(BackfillStream, self).__init__(hs)
@@ -246,8 +260,8 @@ class PresenceStream(Stream):
store = hs.get_datastore()
presence_handler = hs.get_presence_handler()
self.current_token = store.get_current_presence_token
self.update_function = presence_handler.get_all_presence_updates
self.current_token = store.get_current_presence_token # type: ignore
self.update_function = presence_handler.get_all_presence_updates # type: ignore
super(PresenceStream, self).__init__(hs)
@@ -260,8 +274,8 @@ class TypingStream(Stream):
def __init__(self, hs):
typing_handler = hs.get_typing_handler()
self.current_token = typing_handler.get_current_token
self.update_function = typing_handler.get_all_typing_updates
self.current_token = typing_handler.get_current_token # type: ignore
self.update_function = typing_handler.get_all_typing_updates # type: ignore
super(TypingStream, self).__init__(hs)
@@ -273,8 +287,8 @@ class ReceiptsStream(Stream):
def __init__(self, hs):
store = hs.get_datastore()
self.current_token = store.get_max_receipt_stream_id
self.update_function = store.get_all_updated_receipts
self.current_token = store.get_max_receipt_stream_id # type: ignore
self.update_function = store.get_all_updated_receipts # type: ignore
super(ReceiptsStream, self).__init__(hs)
@@ -294,9 +308,8 @@ class PushRulesStream(Stream):
push_rules_token, _ = self.store.get_push_rules_stream_token()
return push_rules_token
@defer.inlineCallbacks
def update_function(self, from_token, to_token, limit):
rows = yield self.store.get_all_push_rule_updates(from_token, to_token, limit)
async def update_function(self, from_token, to_token, limit):
rows = await self.store.get_all_push_rule_updates(from_token, to_token, limit)
return [(row[0], row[2]) for row in rows]
@@ -310,8 +323,8 @@ class PushersStream(Stream):
def __init__(self, hs):
store = hs.get_datastore()
self.current_token = store.get_pushers_stream_token
self.update_function = store.get_all_updated_pushers_rows
self.current_token = store.get_pushers_stream_token # type: ignore
self.update_function = store.get_all_updated_pushers_rows # type: ignore
super(PushersStream, self).__init__(hs)
@@ -327,8 +340,8 @@ class CachesStream(Stream):
def __init__(self, hs):
store = hs.get_datastore()
self.current_token = store.get_cache_stream_token
self.update_function = store.get_all_updated_caches
self.current_token = store.get_cache_stream_token # type: ignore
self.update_function = store.get_all_updated_caches # type: ignore
super(CachesStream, self).__init__(hs)
@@ -343,8 +356,8 @@ class PublicRoomsStream(Stream):
def __init__(self, hs):
store = hs.get_datastore()
self.current_token = store.get_current_public_room_stream_id
self.update_function = store.get_all_new_public_rooms
self.current_token = store.get_current_public_room_stream_id # type: ignore
self.update_function = store.get_all_new_public_rooms # type: ignore
super(PublicRoomsStream, self).__init__(hs)
@@ -360,8 +373,8 @@ class DeviceListsStream(Stream):
def __init__(self, hs):
store = hs.get_datastore()
self.current_token = store.get_device_stream_token
self.update_function = store.get_all_device_list_changes_for_remotes
self.current_token = store.get_device_stream_token # type: ignore
self.update_function = store.get_all_device_list_changes_for_remotes # type: ignore
super(DeviceListsStream, self).__init__(hs)
@@ -376,8 +389,8 @@ class ToDeviceStream(Stream):
def __init__(self, hs):
store = hs.get_datastore()
self.current_token = store.get_to_device_stream_token
self.update_function = store.get_all_new_device_messages
self.current_token = store.get_to_device_stream_token # type: ignore
self.update_function = store.get_all_new_device_messages # type: ignore
super(ToDeviceStream, self).__init__(hs)
@@ -392,8 +405,8 @@ class TagAccountDataStream(Stream):
def __init__(self, hs):
store = hs.get_datastore()
self.current_token = store.get_max_account_data_stream_id
self.update_function = store.get_all_updated_tags
self.current_token = store.get_max_account_data_stream_id # type: ignore
self.update_function = store.get_all_updated_tags # type: ignore
super(TagAccountDataStream, self).__init__(hs)
@@ -408,13 +421,12 @@ class AccountDataStream(Stream):
def __init__(self, hs):
self.store = hs.get_datastore()
self.current_token = self.store.get_max_account_data_stream_id
self.current_token = self.store.get_max_account_data_stream_id # type: ignore
super(AccountDataStream, self).__init__(hs)
@defer.inlineCallbacks
def update_function(self, from_token, to_token, limit):
global_results, room_results = yield self.store.get_all_updated_account_data(
async def update_function(self, from_token, to_token, limit):
global_results, room_results = await self.store.get_all_updated_account_data(
from_token, from_token, to_token, limit
)
@@ -434,8 +446,8 @@ class GroupServerStream(Stream):
def __init__(self, hs):
store = hs.get_datastore()
self.current_token = store.get_group_stream_token
self.update_function = store.get_all_groups_changes
self.current_token = store.get_group_stream_token # type: ignore
self.update_function = store.get_all_groups_changes # type: ignore
super(GroupServerStream, self).__init__(hs)
@@ -451,7 +463,7 @@ class UserSignatureStream(Stream):
def __init__(self, hs):
store = hs.get_datastore()
self.current_token = store.get_device_stream_token
self.update_function = store.get_all_user_signature_changes_for_remotes
self.current_token = store.get_device_stream_token # type: ignore
self.update_function = store.get_all_user_signature_changes_for_remotes # type: ignore
super(UserSignatureStream, self).__init__(hs)

View File

@@ -13,12 +13,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import heapq
from typing import Tuple, Type
import attr
from twisted.internet import defer
from ._base import Stream
@@ -63,7 +63,8 @@ class BaseEventsStreamRow(object):
Specifies how to identify, serialize and deserialize the different types.
"""
TypeId = None # Unique string that ids the type. Must be overriden in sub classes.
# Unique string that ids the type. Must be overriden in sub classes.
TypeId = None # type: str
@classmethod
def from_data(cls, data):
@@ -99,9 +100,12 @@ class EventsStreamCurrentStateRow(BaseEventsStreamRow):
event_id = attr.ib() # str, optional
TypeToRow = {
Row.TypeId: Row for Row in (EventsStreamEventRow, EventsStreamCurrentStateRow)
}
_EventRows = (
EventsStreamEventRow,
EventsStreamCurrentStateRow,
) # type: Tuple[Type[BaseEventsStreamRow], ...]
TypeToRow = {Row.TypeId: Row for Row in _EventRows}
class EventsStream(Stream):
@@ -112,20 +116,19 @@ class EventsStream(Stream):
def __init__(self, hs):
self._store = hs.get_datastore()
self.current_token = self._store.get_current_events_token
self.current_token = self._store.get_current_events_token # type: ignore
super(EventsStream, self).__init__(hs)
@defer.inlineCallbacks
def update_function(self, from_token, current_token, limit=None):
event_rows = yield self._store.get_all_new_forward_event_rows(
async def update_function(self, from_token, current_token, limit=None):
event_rows = await self._store.get_all_new_forward_event_rows(
from_token, current_token, limit
)
event_updates = (
(row[0], EventsStreamEventRow.TypeId, row[1:]) for row in event_rows
)
state_rows = yield self._store.get_all_updated_current_state_deltas(
state_rows = await self._store.get_all_updated_current_state_deltas(
from_token, current_token, limit
)
state_updates = (

View File

@@ -37,7 +37,7 @@ class FederationStream(Stream):
def __init__(self, hs):
federation_sender = hs.get_federation_sender()
self.current_token = federation_sender.get_current_token
self.update_function = federation_sender.get_replication_rows
self.current_token = federation_sender.get_current_token # type: ignore
self.update_function = federation_sender.get_replication_rows # type: ignore
super(FederationStream, self).__init__(hs)

View File

@@ -29,7 +29,7 @@ from synapse.rest.admin._base import (
from synapse.rest.admin.groups import DeleteGroupAdminRestServlet
from synapse.rest.admin.media import ListMediaInRoom, register_servlets_for_media_repo
from synapse.rest.admin.purge_room_servlet import PurgeRoomServlet
from synapse.rest.admin.rooms import ShutdownRoomRestServlet
from synapse.rest.admin.rooms import ListRoomRestServlet, ShutdownRoomRestServlet
from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet
from synapse.rest.admin.users import (
AccountValidityRenewServlet,
@@ -38,6 +38,7 @@ from synapse.rest.admin.users import (
SearchUsersRestServlet,
UserAdminServlet,
UserRegisterServlet,
UserRestServletV2,
UsersRestServlet,
UsersRestServletV2,
WhoisRestServlet,
@@ -106,7 +107,7 @@ class PurgeHistoryRestServlet(RestServlet):
stream_ordering = await self.store.find_first_stream_ordering_after_ts(ts)
r = await self.store.get_room_event_after_stream_ordering(
r = await self.store.get_room_event_before_stream_ordering(
room_id, stream_ordering
)
if not r:
@@ -187,10 +188,12 @@ def register_servlets(hs, http_server):
Register all the admin servlets.
"""
register_servlets_for_client_rest_resource(hs, http_server)
ListRoomRestServlet(hs).register(http_server)
PurgeRoomServlet(hs).register(http_server)
SendServerNoticeServlet(hs).register(http_server)
VersionServlet(hs).register(http_server)
UserAdminServlet(hs).register(http_server)
UserRestServletV2(hs).register(http_server)
UsersRestServletV2(hs).register(http_server)

View File

@@ -40,6 +40,21 @@ def historical_admin_path_patterns(path_regex):
)
def admin_patterns(path_regex: str):
"""Returns the list of patterns for an admin endpoint
Args:
path_regex: The regex string to match. This should NOT have a ^
as this will be prefixed.
Returns:
A list of regex patterns.
"""
admin_prefix = "^/_synapse/admin/v1"
patterns = [re.compile(admin_prefix + path_regex)]
return patterns
async def assert_requester_is_admin(auth, request):
"""Verify that the requester is an admin user

View File

@@ -32,16 +32,24 @@ class QuarantineMediaInRoom(RestServlet):
this server.
"""
PATTERNS = historical_admin_path_patterns("/quarantine_media/(?P<room_id>[^/]+)")
PATTERNS = (
historical_admin_path_patterns("/room/(?P<room_id>[^/]+)/media/quarantine")
+
# This path kept around for legacy reasons
historical_admin_path_patterns("/quarantine_media/(?P<room_id>[^/]+)")
)
def __init__(self, hs):
self.store = hs.get_datastore()
self.auth = hs.get_auth()
async def on_POST(self, request, room_id):
async def on_POST(self, request, room_id: str):
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)
logging.info("Quarantining room: %s", room_id)
# Quarantine all media in this room
num_quarantined = await self.store.quarantine_media_ids_in_room(
room_id, requester.user.to_string()
)
@@ -49,6 +57,60 @@ class QuarantineMediaInRoom(RestServlet):
return 200, {"num_quarantined": num_quarantined}
class QuarantineMediaByUser(RestServlet):
"""Quarantines all local media by a given user so that no one can download it via
this server.
"""
PATTERNS = historical_admin_path_patterns(
"/user/(?P<user_id>[^/]+)/media/quarantine"
)
def __init__(self, hs):
self.store = hs.get_datastore()
self.auth = hs.get_auth()
async def on_POST(self, request, user_id: str):
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)
logging.info("Quarantining local media by user: %s", user_id)
# Quarantine all media this user has uploaded
num_quarantined = await self.store.quarantine_media_ids_by_user(
user_id, requester.user.to_string()
)
return 200, {"num_quarantined": num_quarantined}
class QuarantineMediaByID(RestServlet):
"""Quarantines local or remote media by a given ID so that no one can download
it via this server.
"""
PATTERNS = historical_admin_path_patterns(
"/media/quarantine/(?P<server_name>[^/]+)/(?P<media_id>[^/]+)"
)
def __init__(self, hs):
self.store = hs.get_datastore()
self.auth = hs.get_auth()
async def on_POST(self, request, server_name: str, media_id: str):
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)
logging.info("Quarantining local media by ID: %s/%s", server_name, media_id)
# Quarantine this media id
await self.store.quarantine_media_by_id(
server_name, media_id, requester.user.to_string()
)
return 200, {}
class ListMediaInRoom(RestServlet):
"""Lists all of the media in a given room.
"""
@@ -94,4 +156,6 @@ def register_servlets_for_media_repo(hs, http_server):
"""
PurgeMediaCacheRestServlet(hs).register(http_server)
QuarantineMediaInRoom(hs).register(http_server)
QuarantineMediaByID(hs).register(http_server)
QuarantineMediaByUser(hs).register(http_server)
ListMediaInRoom(hs).register(http_server)

View File

@@ -15,15 +15,20 @@
import logging
from synapse.api.constants import Membership
from synapse.api.errors import Codes, SynapseError
from synapse.http.servlet import (
RestServlet,
assert_params_in_dict,
parse_integer,
parse_json_object_from_request,
parse_string,
)
from synapse.rest.admin._base import (
admin_patterns,
assert_user_is_admin,
historical_admin_path_patterns,
)
from synapse.storage.data_stores.main.room import RoomSortOrder
from synapse.types import create_requester
from synapse.util.async_helpers import maybe_awaitable
@@ -155,3 +160,80 @@ class ShutdownRoomRestServlet(RestServlet):
"new_room_id": new_room_id,
},
)
class ListRoomRestServlet(RestServlet):
"""
List all rooms that are known to the homeserver. Results are returned
in a dictionary containing room information. Supports pagination.
"""
PATTERNS = admin_patterns("/rooms")
def __init__(self, hs):
self.store = hs.get_datastore()
self.auth = hs.get_auth()
self.admin_handler = hs.get_handlers().admin_handler
async def on_GET(self, request):
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)
# Extract query parameters
start = parse_integer(request, "from", default=0)
limit = parse_integer(request, "limit", default=100)
order_by = parse_string(request, "order_by", default="alphabetical")
if order_by not in (
RoomSortOrder.ALPHABETICAL.value,
RoomSortOrder.SIZE.value,
):
raise SynapseError(
400,
"Unknown value for order_by: %s" % (order_by,),
errcode=Codes.INVALID_PARAM,
)
search_term = parse_string(request, "search_term")
if search_term == "":
raise SynapseError(
400,
"search_term cannot be an empty string",
errcode=Codes.INVALID_PARAM,
)
direction = parse_string(request, "dir", default="f")
if direction not in ("f", "b"):
raise SynapseError(
400, "Unknown direction: %s" % (direction,), errcode=Codes.INVALID_PARAM
)
reverse_order = True if direction == "b" else False
# Return list of rooms according to parameters
rooms, total_rooms = await self.store.get_rooms_paginate(
start, limit, order_by, reverse_order, search_term
)
response = {
# next_token should be opaque, so return a value the client can parse
"offset": start,
"rooms": rooms,
"total_rooms": total_rooms,
}
# Are there more rooms to paginate through after this?
if (start + limit) < total_rooms:
# There are. Calculate where the query should start from next time
# to get the next part of the list
response["next_batch"] = start + limit
# Is it possible to paginate backwards? Check if we currently have an
# offset
if start > 0:
if start > limit:
# Going back one iteration won't take us to the start.
# Calculate new offset
response["prev_batch"] = start - limit
else:
response["prev_batch"] = 0
return 200, response

View File

@@ -45,6 +45,7 @@ class UsersRestServlet(RestServlet):
def __init__(self, hs):
self.hs = hs
self.store = hs.get_datastore()
self.auth = hs.get_auth()
self.admin_handler = hs.get_handlers().admin_handler
@@ -55,7 +56,7 @@ class UsersRestServlet(RestServlet):
if not self.hs.is_mine(target_user):
raise SynapseError(400, "Can only users a local user")
ret = await self.admin_handler.get_users()
ret = await self.store.get_users()
return 200, ret
@@ -80,6 +81,7 @@ class UsersRestServletV2(RestServlet):
def __init__(self, hs):
self.hs = hs
self.store = hs.get_datastore()
self.auth = hs.get_auth()
self.admin_handler = hs.get_handlers().admin_handler
@@ -92,7 +94,7 @@ class UsersRestServletV2(RestServlet):
guests = parse_boolean(request, "guests", default=True)
deactivated = parse_boolean(request, "deactivated", default=False)
users = await self.admin_handler.get_users_paginate(
users = await self.store.get_users_paginate(
start, limit, user_id, guests, deactivated
)
ret = {"users": users}
@@ -102,6 +104,148 @@ class UsersRestServletV2(RestServlet):
return 200, ret
class UserRestServletV2(RestServlet):
PATTERNS = (re.compile("^/_synapse/admin/v2/users/(?P<user_id>@[^/]+)$"),)
"""Get request to list user details.
This needs user to have administrator access in Synapse.
GET /_synapse/admin/v2/users/<user_id>
returns:
200 OK with user details if success otherwise an error.
Put request to allow an administrator to add or modify a user.
This needs user to have administrator access in Synapse.
We use PUT instead of POST since we already know the id of the user
object to create. POST could be used to create guests.
PUT /_synapse/admin/v2/users/<user_id>
{
"password": "secret",
"displayname": "User"
}
returns:
201 OK with new user object if user was created or
200 OK with modified user object if user was modified
otherwise an error.
"""
def __init__(self, hs):
self.hs = hs
self.auth = hs.get_auth()
self.admin_handler = hs.get_handlers().admin_handler
self.profile_handler = hs.get_profile_handler()
self.set_password_handler = hs.get_set_password_handler()
self.deactivate_account_handler = hs.get_deactivate_account_handler()
self.registration_handler = hs.get_registration_handler()
async def on_GET(self, request, user_id):
await assert_requester_is_admin(self.auth, request)
target_user = UserID.from_string(user_id)
if not self.hs.is_mine(target_user):
raise SynapseError(400, "Can only lookup local users")
ret = await self.admin_handler.get_user(target_user)
return 200, ret
async def on_PUT(self, request, user_id):
await assert_requester_is_admin(self.auth, request)
target_user = UserID.from_string(user_id)
body = parse_json_object_from_request(request)
if not self.hs.is_mine(target_user):
raise SynapseError(400, "This endpoint can only be used with local users")
user = await self.admin_handler.get_user(target_user)
if user: # modify user
requester = await self.auth.get_user_by_req(request)
if "displayname" in body:
await self.profile_handler.set_displayname(
target_user, requester, body["displayname"], True
)
if "avatar_url" in body:
await self.profile_handler.set_avatar_url(
target_user, requester, body["avatar_url"], True
)
if "admin" in body:
set_admin_to = bool(body["admin"])
if set_admin_to != user["admin"]:
auth_user = requester.user
if target_user == auth_user and not set_admin_to:
raise SynapseError(400, "You may not demote yourself.")
await self.admin_handler.set_user_server_admin(
target_user, set_admin_to
)
if "password" in body:
if (
not isinstance(body["password"], text_type)
or len(body["password"]) > 512
):
raise SynapseError(400, "Invalid password")
else:
new_password = body["password"]
await self.set_password_handler.set_password(
target_user.to_string(), new_password, requester
)
if "deactivated" in body:
deactivate = bool(body["deactivated"])
if deactivate and not user["deactivated"]:
result = await self.deactivate_account_handler.deactivate_account(
target_user.to_string(), False
)
if not result:
raise SynapseError(500, "Could not deactivate user")
user = await self.admin_handler.get_user(target_user)
return 200, user
else: # create user
if "password" not in body:
raise SynapseError(
400, "password must be specified", errcode=Codes.BAD_JSON
)
elif (
not isinstance(body["password"], text_type)
or len(body["password"]) > 512
):
raise SynapseError(400, "Invalid password")
admin = body.get("admin", None)
user_type = body.get("user_type", None)
displayname = body.get("displayname", None)
if user_type is not None and user_type not in UserTypes.ALL_USER_TYPES:
raise SynapseError(400, "Invalid user type")
user_id = await self.registration_handler.register_user(
localpart=target_user.localpart,
password=body["password"],
admin=bool(admin),
default_display_name=displayname,
user_type=user_type,
)
if "avatar_url" in body:
await self.profile_handler.set_avatar_url(
user_id, requester, body["avatar_url"], True
)
ret = await self.admin_handler.get_user(target_user)
return 201, ret
class UserRegisterServlet(RestServlet):
"""
Attributes:
@@ -196,21 +340,22 @@ class UserRegisterServlet(RestServlet):
got_mac = body["mac"]
want_mac = hmac.new(
want_mac_builder = hmac.new(
key=self.hs.config.registration_shared_secret.encode(),
digestmod=hashlib.sha1,
)
want_mac.update(nonce.encode("utf8"))
want_mac.update(b"\x00")
want_mac.update(username)
want_mac.update(b"\x00")
want_mac.update(password)
want_mac.update(b"\x00")
want_mac.update(b"admin" if admin else b"notadmin")
want_mac_builder.update(nonce.encode("utf8"))
want_mac_builder.update(b"\x00")
want_mac_builder.update(username)
want_mac_builder.update(b"\x00")
want_mac_builder.update(password)
want_mac_builder.update(b"\x00")
want_mac_builder.update(b"admin" if admin else b"notadmin")
if user_type:
want_mac.update(b"\x00")
want_mac.update(user_type.encode("utf8"))
want_mac = want_mac.hexdigest()
want_mac_builder.update(b"\x00")
want_mac_builder.update(user_type.encode("utf8"))
want_mac = want_mac_builder.hexdigest()
if not hmac.compare_digest(want_mac.encode("ascii"), got_mac.encode("ascii")):
raise SynapseError(403, "HMAC incorrect")
@@ -373,8 +518,8 @@ class SearchUsersRestServlet(RestServlet):
PATTERNS = historical_admin_path_patterns("/search_users/(?P<target_user_id>[^/]*)")
def __init__(self, hs):
self.store = hs.get_datastore()
self.hs = hs
self.store = hs.get_datastore()
self.auth = hs.get_auth()
self.handlers = hs.get_handlers()
@@ -397,7 +542,7 @@ class SearchUsersRestServlet(RestServlet):
term = parse_string(request, "term", required=True)
logger.info("term: %s ", term)
ret = await self.handlers.admin_handler.search_users(term)
ret = await self.handlers.store.search_users(term)
return 200, ret
@@ -431,8 +576,8 @@ class UserAdminServlet(RestServlet):
def __init__(self, hs):
self.hs = hs
self.store = hs.get_datastore()
self.auth = hs.get_auth()
self.handlers = hs.get_handlers()
async def on_GET(self, request, user_id):
await assert_requester_is_admin(self.auth, request)
@@ -442,8 +587,7 @@ class UserAdminServlet(RestServlet):
if not self.hs.is_mine(target_user):
raise SynapseError(400, "Only local users can be admins of this homeserver")
is_admin = await self.handlers.admin_handler.get_user_server_admin(target_user)
is_admin = bool(is_admin)
is_admin = await self.store.is_server_admin(target_user)
return 200, {"admin": is_admin}
@@ -466,8 +610,6 @@ class UserAdminServlet(RestServlet):
if target_user == auth_user and not set_admin_to:
raise SynapseError(400, "You may not demote yourself.")
await self.handlers.admin_handler.set_user_server_admin(
target_user, set_admin_to
)
await self.store.set_user_server_admin(target_user, set_admin_to)
return 200, {}

View File

@@ -70,7 +70,6 @@ class EventStreamRestServlet(RestServlet):
return 200, {}
# TODO: Unit test gets, with and without auth, with different kinds of events.
class EventRestServlet(RestServlet):
PATTERNS = client_patterns("/events/(?P<event_id>[^/]*)$", v1=True)
@@ -78,6 +77,7 @@ class EventRestServlet(RestServlet):
super(EventRestServlet, self).__init__()
self.clock = hs.get_clock()
self.event_handler = hs.get_event_handler()
self.auth = hs.get_auth()
self._event_serializer = hs.get_event_client_serializer()
async def on_GET(self, request, event_id):

View File

@@ -514,7 +514,7 @@ class CasTicketServlet(RestServlet):
if user is None:
raise Exception("CAS response does not contain user")
except Exception:
logger.error("Error parsing CAS response", exc_info=1)
logger.exception("Error parsing CAS response")
raise LoginError(401, "Invalid CAS response", errcode=Codes.UNAUTHORIZED)
if not success:
raise LoginError(

View File

@@ -16,6 +16,7 @@
""" This module contains REST servlets to do with rooms: /rooms/<paths> """
import logging
from typing import List, Optional
from six.moves.urllib import parse as urlparse
@@ -207,7 +208,7 @@ class RoomStateEventRestServlet(TransactionRestServlet):
requester, event_dict, txn_id=txn_id
)
ret = {}
ret = {} # type: dict
if event:
set_tag("event_id", event.event_id)
ret = {"event_id": event.event_id}
@@ -285,7 +286,7 @@ class JoinRoomAliasServlet(TransactionRestServlet):
try:
remote_room_hosts = [
x.decode("ascii") for x in request.args[b"server_name"]
]
] # type: Optional[List[str]]
except Exception:
remote_room_hosts = None
elif RoomAlias.is_valid(room_identifier):
@@ -375,7 +376,7 @@ class PublicRoomListRestServlet(TransactionRestServlet):
server = parse_string(request, "server", default=None)
content = parse_json_object_from_request(request)
limit = int(content.get("limit", 100))
limit = int(content.get("limit", 100)) # type: Optional[int]
since_token = content.get("since", None)
search_filter = content.get("filter", None)
@@ -504,11 +505,16 @@ class RoomMessageListRestServlet(RestServlet):
filter_bytes = parse_string(request, b"filter", encoding=None)
if filter_bytes:
filter_json = urlparse.unquote(filter_bytes.decode("UTF-8"))
event_filter = Filter(json.loads(filter_json))
if event_filter.filter_json.get("event_format", "client") == "federation":
event_filter = Filter(json.loads(filter_json)) # type: Optional[Filter]
if (
event_filter
and event_filter.filter_json.get("event_format", "client")
== "federation"
):
as_client_event = False
else:
event_filter = None
msgs = await self.pagination_handler.get_messages(
room_id=room_id,
requester=requester,
@@ -611,7 +617,7 @@ class RoomEventContextServlet(RestServlet):
filter_bytes = parse_string(request, "filter")
if filter_bytes:
filter_json = urlparse.unquote(filter_bytes)
event_filter = Filter(json.loads(filter_json))
event_filter = Filter(json.loads(filter_json)) # type: Optional[Filter]
else:
event_filter = None

View File

@@ -32,7 +32,7 @@ def client_patterns(path_regex, releases=(0,), unstable=True, v1=False):
Args:
path_regex (str): The regex string to match. This should NOT have a ^
as this will be prefixed.
as this will be prefixed.
Returns:
SRE_Pattern
"""

View File

@@ -206,10 +206,6 @@ class AuthRestServlet(RestServlet):
return None
elif stagetype == LoginType.TERMS:
if ("session" not in request.args or len(request.args["session"])) == 0:
raise SynapseError(400, "No session supplied")
session = request.args["session"][0]
authdict = {"session": session}
success = await self.auth_handler.add_oob_auth(

View File

@@ -21,6 +21,7 @@ from typing import List, Union
from six import string_types
import synapse
import synapse.api.auth
import synapse.types
from synapse.api.constants import LoginType
from synapse.api.errors import (
@@ -405,7 +406,7 @@ class RegisterRestServlet(RestServlet):
return ret
elif kind != b"user":
raise UnrecognizedRequestError(
"Do not understand membership kind: %s" % (kind,)
"Do not understand membership kind: %s" % (kind.decode("utf8"),)
)
# we do basic sanity checks here because the auth layer will store these

View File

@@ -14,6 +14,7 @@
# limitations under the License.
import logging
from typing import Tuple
from synapse.http import servlet
from synapse.http.servlet import parse_json_object_from_request
@@ -60,7 +61,7 @@ class SendToDeviceRestServlet(servlet.RestServlet):
sender_user_id, message_type, content["messages"]
)
response = (200, {})
response = (200, {}) # type: Tuple[int, dict]
return response

View File

@@ -71,6 +71,8 @@ class VersionsRestServlet(RestServlet):
# Implements support for label-based filtering as described in
# MSC2326.
"org.matrix.label_based_filtering": True,
# Implements support for cross signing as described in MSC1756
"org.matrix.e2e_cross_signing": True,
},
},
)

View File

@@ -13,6 +13,7 @@
# limitations under the License.
import logging
from typing import Dict, Set
from canonicaljson import encode_canonical_json, json
from signedjson.sign import sign_json
@@ -103,7 +104,7 @@ class RemoteKey(DirectServeResource):
async def _async_render_GET(self, request):
if len(request.postpath) == 1:
(server,) = request.postpath
query = {server.decode("ascii"): {}}
query = {server.decode("ascii"): {}} # type: dict
elif len(request.postpath) == 2:
server, key_id = request.postpath
minimum_valid_until_ts = parse_integer(request, "minimum_valid_until_ts")
@@ -148,7 +149,7 @@ class RemoteKey(DirectServeResource):
time_now_ms = self.clock.time_msec()
cache_misses = dict()
cache_misses = dict() # type: Dict[str, Set[str]]
for (server_name, key_id, from_server), results in cached.items():
results = [(result["ts_added_ms"], result) for result in results]

View File

@@ -18,6 +18,7 @@ import errno
import logging
import os
import shutil
from typing import Dict, Tuple
from six import iteritems
@@ -605,7 +606,7 @@ class MediaRepository(object):
# We deduplicate the thumbnail sizes by ignoring the cropped versions if
# they have the same dimensions of a scaled one.
thumbnails = {}
thumbnails = {} # type: Dict[Tuple[int, int, str], str]
for r_width, r_height, r_method, r_type in requirements:
if r_method == "crop":
thumbnails.setdefault((r_width, r_height, r_type), r_method)

View File

@@ -23,6 +23,7 @@ import re
import shutil
import sys
import traceback
from typing import Dict, Optional
import six
from six import string_types
@@ -237,8 +238,8 @@ class PreviewUrlResource(DirectServeResource):
# If we don't find a match, we'll look at the HTTP Content-Type, and
# if that doesn't exist, we'll fall back to UTF-8.
if not encoding:
match = _content_type_match.match(media_info["media_type"])
encoding = match.group(1) if match else "utf-8"
content_match = _content_type_match.match(media_info["media_type"])
encoding = content_match.group(1) if content_match else "utf-8"
og = decode_and_calc_og(body, media_info["uri"], encoding)
@@ -518,7 +519,7 @@ def _calc_og(tree, media_uri):
# "og:video:height" : "720",
# "og:video:secure_url": "https://www.youtube.com/v/LXDBoHyjmtw?version=3",
og = {}
og = {} # type: Dict[str, Optional[str]]
for tag in tree.xpath("//*/meta[starts-with(@property, 'og:')]"):
if "content" in tag.attrib:
# if we've got more than 50 tags, someone is taking the piss

View File

@@ -296,8 +296,8 @@ class ThumbnailResource(DirectServeResource):
d_h = desired_height
if desired_method.lower() == "crop":
info_list = []
info_list2 = []
crop_info_list = []
crop_info_list2 = []
for info in thumbnail_infos:
t_w = info["thumbnail_width"]
t_h = info["thumbnail_height"]
@@ -309,7 +309,7 @@ class ThumbnailResource(DirectServeResource):
type_quality = desired_type != info["thumbnail_type"]
length_quality = info["thumbnail_length"]
if t_w >= d_w or t_h >= d_h:
info_list.append(
crop_info_list.append(
(
aspect_quality,
min_quality,
@@ -320,7 +320,7 @@ class ThumbnailResource(DirectServeResource):
)
)
else:
info_list2.append(
crop_info_list2.append(
(
aspect_quality,
min_quality,
@@ -330,10 +330,10 @@ class ThumbnailResource(DirectServeResource):
info,
)
)
if info_list:
return min(info_list)[-1]
if crop_info_list:
return min(crop_info_list)[-1]
else:
return min(info_list2)[-1]
return min(crop_info_list2)[-1]
else:
info_list = []
info_list2 = []

View File

@@ -1,3 +1,5 @@
import twisted.internet
import synapse.api.auth
import synapse.config.homeserver
import synapse.federation.sender
@@ -9,10 +11,12 @@ import synapse.handlers.deactivate_account
import synapse.handlers.device
import synapse.handlers.e2e_keys
import synapse.handlers.message
import synapse.handlers.presence
import synapse.handlers.room
import synapse.handlers.room_member
import synapse.handlers.set_password
import synapse.http.client
import synapse.notifier
import synapse.rest.media.v1.media_repository
import synapse.server_notices.server_notices_manager
import synapse.server_notices.server_notices_sender
@@ -85,3 +89,11 @@ class HomeServer(object):
self,
) -> synapse.server_notices.server_notices_sender.ServerNoticesSender:
pass
def get_notifier(self) -> synapse.notifier.Notifier:
pass
def get_presence_handler(self) -> synapse.handlers.presence.PresenceHandler:
pass
def get_clock(self) -> synapse.util.Clock:
pass
def get_reactor(self) -> twisted.internet.base.ReactorBase:
pass

Some files were not shown because too many files have changed in this diff Show More