1
0

Compare commits

..

2 Commits

Author SHA1 Message Date
Erik Johnston
b1e498d6e1 Handle prefill 2019-11-22 18:39:26 +00:00
Erik Johnston
9cca5ee743 Track cache hit ratios 2019-11-22 18:19:55 +00:00
319 changed files with 7873 additions and 12698 deletions

View File

@@ -1,6 +1,6 @@
#!/usr/bin/env bash
set -e
set -ex
if [[ "$BUILDKITE_BRANCH" =~ ^(develop|master|dinsic|shhs|release-.*)$ ]]; then
echo "Not merging forward, as this is a release branch"
@@ -18,8 +18,6 @@ else
GITBASE=$BUILDKITE_PULL_REQUEST_BASE_BRANCH
fi
echo "--- merge_base_branch $GITBASE"
# Show what we are before
git --no-pager show -s

View File

@@ -1,7 +1,7 @@
# Configuration file used for testing the 'synapse_port_db' script.
# Tells the script to connect to the postgresql database that will be available in the
# CI's Docker setup at the point where this file is considered.
server_name: "localhost:8800"
server_name: "test"
signing_key_path: "/src/.buildkite/test.signing.key"

View File

@@ -1,7 +1,7 @@
# Configuration file used for testing the 'synapse_port_db' script.
# Tells the 'update_database' script to connect to the test SQLite database to upgrade its
# schema and run background updates on it.
server_name: "localhost:8800"
server_name: "test"
signing_key_path: "/src/.buildkite/test.signing.key"

View File

@@ -28,14 +28,3 @@ User sees updates to presence from other users in the incremental sync.
Gapped incremental syncs include all state changes
Old members are included in gappy incr LL sync if they start speaking
# new failures as of https://github.com/matrix-org/sytest/pull/732
Device list doesn't change if remote server is down
Remote servers cannot set power levels in rooms without existing powerlevels
Remote servers should reject attempts by non-creators to set the power levels
# https://buildkite.com/matrix-dot-org/synapse/builds/6134#6f67bf47-e234-474d-80e8-c6e1868b15c5
Server correctly handles incoming m.device_list_update
# this fails reliably with a torture level of 100 due to https://github.com/matrix-org/synapse/issues/6536
Outbound federation requests missing prev_events and then asks for /state_ids and resolves the state

View File

@@ -1,8 +1,8 @@
### Pull Request Checklist
<!-- Please read CONTRIBUTING.md before submitting your pull request -->
<!-- Please read CONTRIBUTING.rst before submitting your pull request -->
* [ ] Pull request is based on the develop branch
* [ ] Pull request includes a [changelog file](https://github.com/matrix-org/synapse/blob/master/CONTRIBUTING.md#changelog)
* [ ] Pull request includes a [sign off](https://github.com/matrix-org/synapse/blob/master/CONTRIBUTING.md#sign-off)
* [ ] Code style is correct (run the [linters](https://github.com/matrix-org/synapse/blob/master/CONTRIBUTING.md#code-style))
* [ ] Pull request includes a [changelog file](https://github.com/matrix-org/synapse/blob/master/CONTRIBUTING.rst#changelog)
* [ ] Pull request includes a [sign off](https://github.com/matrix-org/synapse/blob/master/CONTRIBUTING.rst#sign-off)
* [ ] Code style is correct (run the [linters](https://github.com/matrix-org/synapse/blob/master/CONTRIBUTING.rst#code-style))

View File

@@ -46,6 +46,3 @@ Joseph Weston <joseph at weston.cloud>
Benjamin Saunders <ben.e.saunders at gmail dot com>
* Documentation improvements
Werner Sembach <werner.sembach at fau dot de>
* Automatically remove a group/community when it is empty

View File

@@ -1,251 +1,3 @@
Synapse 1.8.0 (2020-01-09)
==========================
Bugfixes
--------
- Fix `GET` request on `/_synapse/admin/v2/users` endpoint. Contributed by Awesome Technologies Innovationslabor GmbH. ([\#6563](https://github.com/matrix-org/synapse/issues/6563))
- Fix incorrect signing of responses from the key server implementation. ([\#6657](https://github.com/matrix-org/synapse/issues/6657))
Synapse 1.8.0rc1 (2020-01-07)
=============================
Features
--------
- Add v2 APIs for the `send_join` and `send_leave` federation endpoints (as described in [MSC1802](https://github.com/matrix-org/matrix-doc/pull/1802)). ([\#6349](https://github.com/matrix-org/synapse/issues/6349))
- Add a develop script to generate full SQL schemas. ([\#6394](https://github.com/matrix-org/synapse/issues/6394))
- Add custom SAML username mapping functinality through an external provider plugin. ([\#6411](https://github.com/matrix-org/synapse/issues/6411))
- Automatically delete empty groups/communities. ([\#6453](https://github.com/matrix-org/synapse/issues/6453))
- Add option `limit_profile_requests_to_users_who_share_rooms` to prevent requirement of a local user sharing a room with another user to query their profile information. ([\#6523](https://github.com/matrix-org/synapse/issues/6523))
- Add an `export_signing_key` script to extract the public part of signing keys when rotating them. ([\#6546](https://github.com/matrix-org/synapse/issues/6546))
- Add experimental config option to specify multiple databases. ([\#6580](https://github.com/matrix-org/synapse/issues/6580))
- Raise an error if someone tries to use the `log_file` config option. ([\#6626](https://github.com/matrix-org/synapse/issues/6626))
Bugfixes
--------
- Prevent redacted events from being returned during message search. ([\#6377](https://github.com/matrix-org/synapse/issues/6377), [\#6522](https://github.com/matrix-org/synapse/issues/6522))
- Prevent error on trying to search a upgraded room when the server is not in the predecessor room. ([\#6385](https://github.com/matrix-org/synapse/issues/6385))
- Improve performance of looking up cross-signing keys. ([\#6486](https://github.com/matrix-org/synapse/issues/6486))
- Fix race which occasionally caused deleted devices to reappear. ([\#6514](https://github.com/matrix-org/synapse/issues/6514))
- Fix missing row in `device_max_stream_id` that could cause unable to decrypt errors after server restart. ([\#6555](https://github.com/matrix-org/synapse/issues/6555))
- Fix a bug which meant that we did not send systemd notifications on startup if acme was enabled. ([\#6571](https://github.com/matrix-org/synapse/issues/6571))
- Fix exception when fetching the `matrix.org:ed25519:auto` key. ([\#6625](https://github.com/matrix-org/synapse/issues/6625))
- Fix bug where a moderator upgraded a room and became an admin in the new room. ([\#6633](https://github.com/matrix-org/synapse/issues/6633))
- Fix an error which was thrown by the `PresenceHandler` `_on_shutdown` handler. ([\#6640](https://github.com/matrix-org/synapse/issues/6640))
- Fix exceptions in the synchrotron worker log when events are rejected. ([\#6645](https://github.com/matrix-org/synapse/issues/6645))
- Ensure that upgraded rooms are removed from the directory. ([\#6648](https://github.com/matrix-org/synapse/issues/6648))
- Fix a bug causing Synapse not to fetch missing events when it believes it has every event in the room. ([\#6652](https://github.com/matrix-org/synapse/issues/6652))
Improved Documentation
----------------------
- Document the Room Shutdown Admin API. ([\#6541](https://github.com/matrix-org/synapse/issues/6541))
- Reword sections of [docs/federate.md](docs/federate.md) that explained delegation at time of Synapse 1.0 transition. ([\#6601](https://github.com/matrix-org/synapse/issues/6601))
- Added the section 'Configuration' in [docs/turn-howto.md](docs/turn-howto.md). ([\#6614](https://github.com/matrix-org/synapse/issues/6614))
Deprecations and Removals
-------------------------
- Remove redundant code from event authorisation implementation. ([\#6502](https://github.com/matrix-org/synapse/issues/6502))
- Remove unused, undocumented `/_matrix/content` API. ([\#6628](https://github.com/matrix-org/synapse/issues/6628))
Internal Changes
----------------
- Add *experimental* support for multiple physical databases and split out state storage to separate data store. ([\#6245](https://github.com/matrix-org/synapse/issues/6245), [\#6510](https://github.com/matrix-org/synapse/issues/6510), [\#6511](https://github.com/matrix-org/synapse/issues/6511), [\#6513](https://github.com/matrix-org/synapse/issues/6513), [\#6564](https://github.com/matrix-org/synapse/issues/6564), [\#6565](https://github.com/matrix-org/synapse/issues/6565))
- Port sections of code base to async/await. ([\#6496](https://github.com/matrix-org/synapse/issues/6496), [\#6504](https://github.com/matrix-org/synapse/issues/6504), [\#6505](https://github.com/matrix-org/synapse/issues/6505), [\#6517](https://github.com/matrix-org/synapse/issues/6517), [\#6559](https://github.com/matrix-org/synapse/issues/6559), [\#6647](https://github.com/matrix-org/synapse/issues/6647), [\#6653](https://github.com/matrix-org/synapse/issues/6653))
- Remove `SnapshotCache` in favour of `ResponseCache`. ([\#6506](https://github.com/matrix-org/synapse/issues/6506))
- Silence mypy errors for files outside those specified. ([\#6512](https://github.com/matrix-org/synapse/issues/6512))
- Clean up some logging when handling incoming events over federation. ([\#6515](https://github.com/matrix-org/synapse/issues/6515))
- Test more folders against mypy. ([\#6534](https://github.com/matrix-org/synapse/issues/6534))
- Update `mypy` to new version. ([\#6537](https://github.com/matrix-org/synapse/issues/6537))
- Adjust the sytest blacklist for worker mode. ([\#6538](https://github.com/matrix-org/synapse/issues/6538))
- Remove unused `get_pagination_rows` methods from `EventSource` classes. ([\#6557](https://github.com/matrix-org/synapse/issues/6557))
- Clean up logs from the push notifier at startup. ([\#6558](https://github.com/matrix-org/synapse/issues/6558))
- Improve diagnostics on database upgrade failure. ([\#6570](https://github.com/matrix-org/synapse/issues/6570))
- Reduce the reconnect time when worker replication fails, to make it easier to catch up. ([\#6617](https://github.com/matrix-org/synapse/issues/6617))
- Simplify http handling by removing redundant `SynapseRequestFactory`. ([\#6619](https://github.com/matrix-org/synapse/issues/6619))
- Add a workaround for synapse raising exceptions when fetching the notary's own key from the notary. ([\#6620](https://github.com/matrix-org/synapse/issues/6620))
- Automate generation of the sample log config. ([\#6627](https://github.com/matrix-org/synapse/issues/6627))
- Simplify event creation code by removing redundant queries on the `event_reference_hashes` table. ([\#6629](https://github.com/matrix-org/synapse/issues/6629))
- Fix errors when `frozen_dicts` are enabled. ([\#6642](https://github.com/matrix-org/synapse/issues/6642))
Synapse 1.7.3 (2019-12-31)
==========================
This release fixes a long-standing bug in the state resolution algorithm.
Bugfixes
--------
- Fix exceptions caused by state resolution choking on malformed events. ([\#6608](https://github.com/matrix-org/synapse/issues/6608))
Synapse 1.7.2 (2019-12-20)
==========================
This release fixes some regressions introduced in Synapse 1.7.0 and 1.7.1.
Bugfixes
--------
- Fix a regression introduced in Synapse 1.7.1 which caused errors when attempting to backfill rooms over federation. ([\#6576](https://github.com/matrix-org/synapse/issues/6576))
- Fix a bug introduced in Synapse 1.7.0 which caused an error on startup when upgrading from versions before 1.3.0. ([\#6578](https://github.com/matrix-org/synapse/issues/6578))
Synapse 1.7.1 (2019-12-18)
==========================
This release includes several security fixes as well as a fix to a bug exposed by the security fixes. Administrators are encouraged to upgrade as soon as possible.
Security updates
----------------
- Fix a bug which could cause room events to be incorrectly authorized using events from a different room. ([\#6501](https://github.com/matrix-org/synapse/issues/6501), [\#6503](https://github.com/matrix-org/synapse/issues/6503), [\#6521](https://github.com/matrix-org/synapse/issues/6521), [\#6524](https://github.com/matrix-org/synapse/issues/6524), [\#6530](https://github.com/matrix-org/synapse/issues/6530), [\#6531](https://github.com/matrix-org/synapse/issues/6531))
- Fix a bug causing responses to the `/context` client endpoint to not use the pruned version of the event. ([\#6553](https://github.com/matrix-org/synapse/issues/6553))
- Fix a cause of state resets in room versions 2 onwards. ([\#6556](https://github.com/matrix-org/synapse/issues/6556), [\#6560](https://github.com/matrix-org/synapse/issues/6560))
Bugfixes
--------
- Fix a bug which could cause the federation server to incorrectly return errors when handling certain obscure event graphs. ([\#6526](https://github.com/matrix-org/synapse/issues/6526), [\#6527](https://github.com/matrix-org/synapse/issues/6527))
Synapse 1.7.0 (2019-12-13)
==========================
This release changes the default settings so that only local authenticated users can query the server's room directory. See the [upgrade notes](UPGRADE.rst#upgrading-to-v170) for details.
Support for SQLite versions before 3.11 is now deprecated. A future release will refuse to start if used with an SQLite version before 3.11.
Administrators are reminded that SQLite should not be used for production instances. Instructions for migrating to Postgres are available [here](docs/postgres.md). A future release of synapse will, by default, disable federation for servers using SQLite.
No significant changes since 1.7.0rc2.
Synapse 1.7.0rc2 (2019-12-11)
=============================
Bugfixes
--------
- Fix incorrect error message for invalid requests when setting user's avatar URL. ([\#6497](https://github.com/matrix-org/synapse/issues/6497))
- Fix support for SQLite 3.7. ([\#6499](https://github.com/matrix-org/synapse/issues/6499))
- Fix regression where sending email push would not work when using a pusher worker. ([\#6507](https://github.com/matrix-org/synapse/issues/6507), [\#6509](https://github.com/matrix-org/synapse/issues/6509))
Synapse 1.7.0rc1 (2019-12-09)
=============================
Features
--------
- Implement per-room message retention policies. ([\#5815](https://github.com/matrix-org/synapse/issues/5815), [\#6436](https://github.com/matrix-org/synapse/issues/6436))
- Add etag and count fields to key backup endpoints to help clients guess if there are new keys. ([\#5858](https://github.com/matrix-org/synapse/issues/5858))
- Add `/admin/v2/users` endpoint with pagination. Contributed by Awesome Technologies Innovationslabor GmbH. ([\#5925](https://github.com/matrix-org/synapse/issues/5925))
- Require User-Interactive Authentication for `/account/3pid/add`, meaning the user's password will be required to add a third-party ID to their account. ([\#6119](https://github.com/matrix-org/synapse/issues/6119))
- Implement the `/_matrix/federation/unstable/net.atleastfornow/state/<context>` API as drafted in MSC2314. ([\#6176](https://github.com/matrix-org/synapse/issues/6176))
- Configure privacy-preserving settings by default for the room directory. ([\#6355](https://github.com/matrix-org/synapse/issues/6355))
- Add ephemeral messages support by partially implementing [MSC2228](https://github.com/matrix-org/matrix-doc/pull/2228). ([\#6409](https://github.com/matrix-org/synapse/issues/6409))
- Add support for [MSC 2367](https://github.com/matrix-org/matrix-doc/pull/2367), which allows specifying a reason on all membership events. ([\#6434](https://github.com/matrix-org/synapse/issues/6434))
Bugfixes
--------
- Transfer non-standard power levels on room upgrade. ([\#6237](https://github.com/matrix-org/synapse/issues/6237))
- Fix error from the Pillow library when uploading RGBA images. ([\#6241](https://github.com/matrix-org/synapse/issues/6241))
- Correctly apply the event filter to the `state`, `events_before` and `events_after` fields in the response to `/context` requests. ([\#6329](https://github.com/matrix-org/synapse/issues/6329))
- Fix caching devices for remote users when using workers, so that we don't attempt to refetch (and potentially fail) each time a user requests devices. ([\#6332](https://github.com/matrix-org/synapse/issues/6332))
- Prevent account data syncs getting lost across TCP replication. ([\#6333](https://github.com/matrix-org/synapse/issues/6333))
- Fix bug: TypeError in `register_user()` while using LDAP auth module. ([\#6406](https://github.com/matrix-org/synapse/issues/6406))
- Fix an intermittent exception when handling read-receipts. ([\#6408](https://github.com/matrix-org/synapse/issues/6408))
- Fix broken guest registration when there are existing blocks of numeric user IDs. ([\#6420](https://github.com/matrix-org/synapse/issues/6420))
- Fix startup error when http proxy is defined. ([\#6421](https://github.com/matrix-org/synapse/issues/6421))
- Fix error when using synapse_port_db on a vanilla synapse db. ([\#6449](https://github.com/matrix-org/synapse/issues/6449))
- Fix uploading multiple cross signing signatures for the same user. ([\#6451](https://github.com/matrix-org/synapse/issues/6451))
- Fix bug which lead to exceptions being thrown in a loop when a cross-signed device is deleted. ([\#6462](https://github.com/matrix-org/synapse/issues/6462))
- Fix `synapse_port_db` not exiting with a 0 code if something went wrong during the port process. ([\#6470](https://github.com/matrix-org/synapse/issues/6470))
- Improve sanity-checking when receiving events over federation. ([\#6472](https://github.com/matrix-org/synapse/issues/6472))
- Fix inaccurate per-block Prometheus metrics. ([\#6491](https://github.com/matrix-org/synapse/issues/6491))
- Fix small performance regression for sending invites. ([\#6493](https://github.com/matrix-org/synapse/issues/6493))
- Back out cross-signing code added in Synapse 1.5.0, which caused a performance regression. ([\#6494](https://github.com/matrix-org/synapse/issues/6494))
Improved Documentation
----------------------
- Update documentation and variables in user contributed systemd reference file. ([\#6369](https://github.com/matrix-org/synapse/issues/6369), [\#6490](https://github.com/matrix-org/synapse/issues/6490))
- Fix link in the user directory documentation. ([\#6388](https://github.com/matrix-org/synapse/issues/6388))
- Add build instructions to the docker readme. ([\#6390](https://github.com/matrix-org/synapse/issues/6390))
- Switch Ubuntu package install recommendation to use python3 packages in INSTALL.md. ([\#6443](https://github.com/matrix-org/synapse/issues/6443))
- Write some docs for the quarantine_media api. ([\#6458](https://github.com/matrix-org/synapse/issues/6458))
- Convert CONTRIBUTING.rst to markdown (among other small fixes). ([\#6461](https://github.com/matrix-org/synapse/issues/6461))
Deprecations and Removals
-------------------------
- Remove admin/v1/users_paginate endpoint. Contributed by Awesome Technologies Innovationslabor GmbH. ([\#5925](https://github.com/matrix-org/synapse/issues/5925))
- Remove fallback for federation with old servers which lack the /federation/v1/state_ids API. ([\#6488](https://github.com/matrix-org/synapse/issues/6488))
Internal Changes
----------------
- Add benchmarks for structured logging and improve output performance. ([\#6266](https://github.com/matrix-org/synapse/issues/6266))
- Improve the performance of outputting structured logging. ([\#6322](https://github.com/matrix-org/synapse/issues/6322))
- Refactor some code in the event authentication path for clarity. ([\#6343](https://github.com/matrix-org/synapse/issues/6343), [\#6468](https://github.com/matrix-org/synapse/issues/6468), [\#6480](https://github.com/matrix-org/synapse/issues/6480))
- Clean up some unnecessary quotation marks around the codebase. ([\#6362](https://github.com/matrix-org/synapse/issues/6362))
- Complain on startup instead of 500'ing during runtime when `public_baseurl` isn't set when necessary. ([\#6379](https://github.com/matrix-org/synapse/issues/6379))
- Add a test scenario to make sure room history purges don't break `/messages` in the future. ([\#6392](https://github.com/matrix-org/synapse/issues/6392))
- Clarifications for the email configuration settings. ([\#6423](https://github.com/matrix-org/synapse/issues/6423))
- Add more tests to the blacklist when running in worker mode. ([\#6429](https://github.com/matrix-org/synapse/issues/6429))
- Refactor data store layer to support multiple databases in the future. ([\#6454](https://github.com/matrix-org/synapse/issues/6454), [\#6464](https://github.com/matrix-org/synapse/issues/6464), [\#6469](https://github.com/matrix-org/synapse/issues/6469), [\#6487](https://github.com/matrix-org/synapse/issues/6487))
- Port synapse.rest.client.v1 to async/await. ([\#6482](https://github.com/matrix-org/synapse/issues/6482))
- Port synapse.rest.client.v2_alpha to async/await. ([\#6483](https://github.com/matrix-org/synapse/issues/6483))
- Port SyncHandler to async/await. ([\#6484](https://github.com/matrix-org/synapse/issues/6484))
Synapse 1.6.1 (2019-11-28)
==========================
Security updates
----------------
This release includes a security fix ([\#6426](https://github.com/matrix-org/synapse/issues/6426), below). Administrators are encouraged to upgrade as soon as possible.
Bugfixes
--------
- Clean up local threepids from user on account deactivation. ([\#6426](https://github.com/matrix-org/synapse/issues/6426))
- Fix startup error when http proxy is defined. ([\#6421](https://github.com/matrix-org/synapse/issues/6421))
Synapse 1.6.0 (2019-11-26)
==========================
Bugfixes
--------
- Fix phone home stats reporting. ([\#6418](https://github.com/matrix-org/synapse/issues/6418))
Synapse 1.6.0rc2 (2019-11-25)
=============================
Bugfixes
--------
- Fix a bug which could cause the background database update hander for event labels to get stuck in a loop raising exceptions. ([\#6407](https://github.com/matrix-org/synapse/issues/6407))
Synapse 1.6.0rc1 (2019-11-20)
=============================

View File

@@ -1,210 +0,0 @@
# Contributing code to Matrix
Everyone is welcome to contribute code to Matrix
(https://github.com/matrix-org), provided that they are willing to license
their contributions under the same license as the project itself. We follow a
simple 'inbound=outbound' model for contributions: the act of submitting an
'inbound' contribution means that the contributor agrees to license the code
under the same terms as the project's overall 'outbound' license - in our
case, this is almost always Apache Software License v2 (see [LICENSE](LICENSE)).
## How to contribute
The preferred and easiest way to contribute changes to Matrix is to fork the
relevant project on github, and then [create a pull request](
https://help.github.com/articles/using-pull-requests/) to ask us to pull
your changes into our repo.
**The single biggest thing you need to know is: please base your changes on
the develop branch - *not* master.**
We use the master branch to track the most recent release, so that folks who
blindly clone the repo and automatically check out master get something that
works. Develop is the unstable branch where all the development actually
happens: the workflow is that contributors should fork the develop branch to
make a 'feature' branch for a particular contribution, and then make a pull
request to merge this back into the matrix.org 'official' develop branch. We
use github's pull request workflow to review the contribution, and either ask
you to make any refinements needed or merge it and make them ourselves. The
changes will then land on master when we next do a release.
We use [Buildkite](https://buildkite.com/matrix-dot-org/synapse) for continuous
integration. If your change breaks the build, this will be shown in GitHub, so
please keep an eye on the pull request for feedback.
To run unit tests in a local development environment, you can use:
- ``tox -e py35`` (requires tox to be installed by ``pip install tox``)
for SQLite-backed Synapse on Python 3.5.
- ``tox -e py36`` for SQLite-backed Synapse on Python 3.6.
- ``tox -e py36-postgres`` for PostgreSQL-backed Synapse on Python 3.6
(requires a running local PostgreSQL with access to create databases).
- ``./test_postgresql.sh`` for PostgreSQL-backed Synapse on Python 3.5
(requires Docker). Entirely self-contained, recommended if you don't want to
set up PostgreSQL yourself.
Docker images are available for running the integration tests (SyTest) locally,
see the [documentation in the SyTest repo](
https://github.com/matrix-org/sytest/blob/develop/docker/README.md) for more
information.
## Code style
All Matrix projects have a well-defined code-style - and sometimes we've even
got as far as documenting it... For instance, synapse's code style doc lives
[here](docs/code_style.md).
To facilitate meeting these criteria you can run `scripts-dev/lint.sh`
locally. Since this runs the tools listed in the above document, you'll need
python 3.6 and to install each tool:
```
# Install the dependencies
pip install -U black flake8 isort
# Run the linter script
./scripts-dev/lint.sh
```
**Note that the script does not just test/check, but also reformats code, so you
may wish to ensure any new code is committed first**. By default this script
checks all files and can take some time; if you alter only certain files, you
might wish to specify paths as arguments to reduce the run-time:
```
./scripts-dev/lint.sh path/to/file1.py path/to/file2.py path/to/folder
```
Before pushing new changes, ensure they don't produce linting errors. Commit any
files that were corrected.
Please ensure your changes match the cosmetic style of the existing project,
and **never** mix cosmetic and functional changes in the same commit, as it
makes it horribly hard to review otherwise.
## Changelog
All changes, even minor ones, need a corresponding changelog / newsfragment
entry. These are managed by [Towncrier](https://github.com/hawkowl/towncrier).
To create a changelog entry, make a new file in the `changelog.d` directory named
in the format of `PRnumber.type`. The type can be one of the following:
* `feature`
* `bugfix`
* `docker` (for updates to the Docker image)
* `doc` (for updates to the documentation)
* `removal` (also used for deprecations)
* `misc` (for internal-only changes)
The content of the file is your changelog entry, which should be a short
description of your change in the same style as the rest of our [changelog](
https://github.com/matrix-org/synapse/blob/master/CHANGES.md). The file can
contain Markdown formatting, and should end with a full stop ('.') for
consistency.
Adding credits to the changelog is encouraged, we value your
contributions and would like to have you shouted out in the release notes!
For example, a fix in PR #1234 would have its changelog entry in
`changelog.d/1234.bugfix`, and contain content like "The security levels of
Florbs are now validated when received over federation. Contributed by Jane
Matrix.".
## Debian changelog
Changes which affect the debian packaging files (in `debian`) are an
exception.
In this case, you will need to add an entry to the debian changelog for the
next release. For this, run the following command:
```
dch
```
This will make up a new version number (if there isn't already an unreleased
version in flight), and open an editor where you can add a new changelog entry.
(Our release process will ensure that the version number and maintainer name is
corrected for the release.)
If your change affects both the debian packaging *and* files outside the debian
directory, you will need both a regular newsfragment *and* an entry in the
debian changelog. (Though typically such changes should be submitted as two
separate pull requests.)
## Sign off
In order to have a concrete record that your contribution is intentional
and you agree to license it under the same terms as the project's license, we've adopted the
same lightweight approach that the Linux Kernel
[submitting patches process](
https://www.kernel.org/doc/html/latest/process/submitting-patches.html#sign-your-work-the-developer-s-certificate-of-origin>),
[Docker](https://github.com/docker/docker/blob/master/CONTRIBUTING.md), and many other
projects use: the DCO (Developer Certificate of Origin:
http://developercertificate.org/). This is a simple declaration that you wrote
the contribution or otherwise have the right to contribute it to Matrix:
```
Developer Certificate of Origin
Version 1.1
Copyright (C) 2004, 2006 The Linux Foundation and its contributors.
660 York Street, Suite 102,
San Francisco, CA 94110 USA
Everyone is permitted to copy and distribute verbatim copies of this
license document, but changing it is not allowed.
Developer's Certificate of Origin 1.1
By making a contribution to this project, I certify that:
(a) The contribution was created in whole or in part by me and I
have the right to submit it under the open source license
indicated in the file; or
(b) The contribution is based upon previous work that, to the best
of my knowledge, is covered under an appropriate open source
license and I have the right under that license to submit that
work with modifications, whether created in whole or in part
by me, under the same open source license (unless I am
permitted to submit under a different license), as indicated
in the file; or
(c) The contribution was provided directly to me by some other
person who certified (a), (b) or (c) and I have not modified
it.
(d) I understand and agree that this project and the contribution
are public and that a record of the contribution (including all
personal information I submit with it, including my sign-off) is
maintained indefinitely and may be redistributed consistent with
this project or the open source license(s) involved.
```
If you agree to this for your contribution, then all that's needed is to
include the line in your commit or pull request comment:
```
Signed-off-by: Your Name <your@email.example.org>
```
We accept contributions under a legally identifiable name, such as
your name on government documentation or common-law names (names
claimed by legitimate usage or repute). Unfortunately, we cannot
accept anonymous contributions at this time.
Git allows you to add this signoff automatically when using the `-s`
flag to `git commit`, which uses the name and email set in your
`user.name` and `user.email` git configs.
## Conclusion
That's it! Matrix is a very open and collaborative project as you might expect
given our obsession with open communication. If we're going to successfully
matrix together all the fragmented communication technologies out there we are
reliant on contributions and collaboration from the community to do so. So
please get involved - and we hope you have as much fun hacking on Matrix as we
do!

206
CONTRIBUTING.rst Normal file
View File

@@ -0,0 +1,206 @@
Contributing code to Matrix
===========================
Everyone is welcome to contribute code to Matrix
(https://github.com/matrix-org), provided that they are willing to license
their contributions under the same license as the project itself. We follow a
simple 'inbound=outbound' model for contributions: the act of submitting an
'inbound' contribution means that the contributor agrees to license the code
under the same terms as the project's overall 'outbound' license - in our
case, this is almost always Apache Software License v2 (see LICENSE).
How to contribute
~~~~~~~~~~~~~~~~~
The preferred and easiest way to contribute changes to Matrix is to fork the
relevant project on github, and then create a pull request to ask us to pull
your changes into our repo
(https://help.github.com/articles/using-pull-requests/)
**The single biggest thing you need to know is: please base your changes on
the develop branch - /not/ master.**
We use the master branch to track the most recent release, so that folks who
blindly clone the repo and automatically check out master get something that
works. Develop is the unstable branch where all the development actually
happens: the workflow is that contributors should fork the develop branch to
make a 'feature' branch for a particular contribution, and then make a pull
request to merge this back into the matrix.org 'official' develop branch. We
use github's pull request workflow to review the contribution, and either ask
you to make any refinements needed or merge it and make them ourselves. The
changes will then land on master when we next do a release.
We use `Buildkite <https://buildkite.com/matrix-dot-org/synapse>`_ for
continuous integration. Buildkite builds need to be authorised by a
maintainer. If your change breaks the build, this will be shown in GitHub, so
please keep an eye on the pull request for feedback.
To run unit tests in a local development environment, you can use:
- ``tox -e py35`` (requires tox to be installed by ``pip install tox``)
for SQLite-backed Synapse on Python 3.5.
- ``tox -e py36`` for SQLite-backed Synapse on Python 3.6.
- ``tox -e py36-postgres`` for PostgreSQL-backed Synapse on Python 3.6
(requires a running local PostgreSQL with access to create databases).
- ``./test_postgresql.sh`` for PostgreSQL-backed Synapse on Python 3.5
(requires Docker). Entirely self-contained, recommended if you don't want to
set up PostgreSQL yourself.
Docker images are available for running the integration tests (SyTest) locally,
see the `documentation in the SyTest repo
<https://github.com/matrix-org/sytest/blob/develop/docker/README.md>`_ for more
information.
Code style
~~~~~~~~~~
All Matrix projects have a well-defined code-style - and sometimes we've even
got as far as documenting it... For instance, synapse's code style doc lives
at https://github.com/matrix-org/synapse/tree/master/docs/code_style.md.
To facilitate meeting these criteria you can run ``scripts-dev/lint.sh``
locally. Since this runs the tools listed in the above document, you'll need
python 3.6 and to install each tool. **Note that the script does not just
test/check, but also reformats code, so you may wish to ensure any new code is
committed first**. By default this script checks all files and can take some
time; if you alter only certain files, you might wish to specify paths as
arguments to reduce the run-time.
Please ensure your changes match the cosmetic style of the existing project,
and **never** mix cosmetic and functional changes in the same commit, as it
makes it horribly hard to review otherwise.
Before doing a commit, ensure the changes you've made don't produce
linting errors. You can do this by running the linters as follows. Ensure to
commit any files that were corrected.
::
# Install the dependencies
pip install -U black flake8 isort
# Run the linter script
./scripts-dev/lint.sh
Changelog
~~~~~~~~~
All changes, even minor ones, need a corresponding changelog / newsfragment
entry. These are managed by Towncrier
(https://github.com/hawkowl/towncrier).
To create a changelog entry, make a new file in the ``changelog.d`` file named
in the format of ``PRnumber.type``. The type can be one of the following:
* ``feature``.
* ``bugfix``.
* ``docker`` (for updates to the Docker image).
* ``doc`` (for updates to the documentation).
* ``removal`` (also used for deprecations).
* ``misc`` (for internal-only changes).
The content of the file is your changelog entry, which should be a short
description of your change in the same style as the rest of our `changelog
<https://github.com/matrix-org/synapse/blob/master/CHANGES.md>`_. The file can
contain Markdown formatting, and should end with a full stop ('.') for
consistency.
Adding credits to the changelog is encouraged, we value your
contributions and would like to have you shouted out in the release notes!
For example, a fix in PR #1234 would have its changelog entry in
``changelog.d/1234.bugfix``, and contain content like "The security levels of
Florbs are now validated when recieved over federation. Contributed by Jane
Matrix.".
Debian changelog
----------------
Changes which affect the debian packaging files (in ``debian``) are an
exception.
In this case, you will need to add an entry to the debian changelog for the
next release. For this, run the following command::
dch
This will make up a new version number (if there isn't already an unreleased
version in flight), and open an editor where you can add a new changelog entry.
(Our release process will ensure that the version number and maintainer name is
corrected for the release.)
If your change affects both the debian packaging *and* files outside the debian
directory, you will need both a regular newsfragment *and* an entry in the
debian changelog. (Though typically such changes should be submitted as two
separate pull requests.)
Sign off
~~~~~~~~
In order to have a concrete record that your contribution is intentional
and you agree to license it under the same terms as the project's license, we've adopted the
same lightweight approach that the Linux Kernel
`submitting patches process <https://www.kernel.org/doc/html/latest/process/submitting-patches.html#sign-your-work-the-developer-s-certificate-of-origin>`_, Docker
(https://github.com/docker/docker/blob/master/CONTRIBUTING.md), and many other
projects use: the DCO (Developer Certificate of Origin:
http://developercertificate.org/). This is a simple declaration that you wrote
the contribution or otherwise have the right to contribute it to Matrix::
Developer Certificate of Origin
Version 1.1
Copyright (C) 2004, 2006 The Linux Foundation and its contributors.
660 York Street, Suite 102,
San Francisco, CA 94110 USA
Everyone is permitted to copy and distribute verbatim copies of this
license document, but changing it is not allowed.
Developer's Certificate of Origin 1.1
By making a contribution to this project, I certify that:
(a) The contribution was created in whole or in part by me and I
have the right to submit it under the open source license
indicated in the file; or
(b) The contribution is based upon previous work that, to the best
of my knowledge, is covered under an appropriate open source
license and I have the right under that license to submit that
work with modifications, whether created in whole or in part
by me, under the same open source license (unless I am
permitted to submit under a different license), as indicated
in the file; or
(c) The contribution was provided directly to me by some other
person who certified (a), (b) or (c) and I have not modified
it.
(d) I understand and agree that this project and the contribution
are public and that a record of the contribution (including all
personal information I submit with it, including my sign-off) is
maintained indefinitely and may be redistributed consistent with
this project or the open source license(s) involved.
If you agree to this for your contribution, then all that's needed is to
include the line in your commit or pull request comment::
Signed-off-by: Your Name <your@email.example.org>
We accept contributions under a legally identifiable name, such as
your name on government documentation or common-law names (names
claimed by legitimate usage or repute). Unfortunately, we cannot
accept anonymous contributions at this time.
Git allows you to add this signoff automatically when using the ``-s``
flag to ``git commit``, which uses the name and email set in your
``user.name`` and ``user.email`` git configs.
Conclusion
~~~~~~~~~~
That's it! Matrix is a very open and collaborative project as you might expect
given our obsession with open communication. If we're going to successfully
matrix together all the fragmented communication technologies out there we are
reliant on contributions and collaboration from the community to do so. So
please get involved - and we hope you have as much fun hacking on Matrix as we
do!

View File

@@ -109,8 +109,8 @@ Installing prerequisites on Ubuntu or Debian:
```
sudo apt-get install build-essential python3-dev libffi-dev \
python3-pip python3-setuptools sqlite3 \
libssl-dev python3-virtualenv libjpeg-dev libxslt1-dev
python-pip python-setuptools sqlite3 \
libssl-dev python-virtualenv libjpeg-dev libxslt1-dev
```
#### ArchLinux

View File

@@ -393,4 +393,4 @@ something like the following in their logs::
2019-09-11 19:32:04,271 - synapse.federation.transport.server - 288 - WARNING - GET-11752 - authenticate_request failed: 401: Invalid signature for server <server> with key ed25519:a_EqML: Unable to verify signature for <server>
This is normally caused by a misconfiguration in your reverse-proxy. See
`<docs/reverse_proxy.md>`_ and double-check that your settings are correct.
`<docs/reverse_proxy.rst>`_ and double-check that your settings are correct.

View File

@@ -75,23 +75,6 @@ for example:
wget https://packages.matrix.org/debian/pool/main/m/matrix-synapse-py3/matrix-synapse-py3_1.3.0+stretch1_amd64.deb
dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
Upgrading to v1.7.0
===================
In an attempt to configure Synapse in a privacy preserving way, the default
behaviours of ``allow_public_rooms_without_auth`` and
``allow_public_rooms_over_federation`` have been inverted. This means that by
default, only authenticated users querying the Client/Server API will be able
to query the room directory, and relatedly that the server will not share
room directory information with other servers over federation.
If your installation does not explicitly set these settings one way or the other
and you want either setting to be ``true`` then it will necessary to update
your homeserver configuration file accordingly.
For more details on the surrounding context see our `explainer
<https://matrix.org/blog/2019/11/09/avoiding-unwelcome-visitors-on-private-matrix-servers>`_.
Upgrading to v1.5.0
===================

1
changelog.d/6362.misc Normal file
View File

@@ -0,0 +1 @@
Clean up some unnecessary quotation marks around the codebase.

1
changelog.d/6388.doc Normal file
View File

@@ -0,0 +1 @@
Fix link in the user directory documentation.

View File

@@ -1,17 +0,0 @@
# Setup Synapse with Systemd
This is a setup for managing synapse with a user contributed systemd unit
file. It provides a `matrix-synapse` systemd unit file that should be tailored
to accommodate your installation in accordance with the installation
instructions provided in [installation instructions](../../INSTALL.md).
## Setup
1. Under the service section, ensure the `User` variable matches which user
you installed synapse under and wish to run it as.
2. Under the service section, ensure the `WorkingDirectory` variable matches
where you have installed synapse.
3. Under the service section, ensure the `ExecStart` variable matches the
appropriate locations of your installation.
4. Copy the `matrix-synapse.service` to `/etc/systemd/system/`
5. Start Synapse: `sudo systemctl start matrix-synapse`
6. Verify Synapse is running: `sudo systemctl status matrix-synapse`
7. *optional* Enable Synapse to start at system boot: `sudo systemctl enable matrix-synapse`

View File

@@ -4,11 +4,8 @@
# systemctl enable matrix-synapse
# systemctl start matrix-synapse
#
# This assumes that Synapse has been installed by a user named
# synapse.
#
# This assumes that Synapse has been installed in a virtualenv in
# the user's home directory: `/home/synapse/synapse/env`.
# /opt/synapse/env.
#
# **NOTE:** This is an example service file that may change in the future. If you
# wish to use this please copy rather than symlink it.
@@ -25,8 +22,8 @@ Restart=on-abort
User=synapse
Group=nogroup
WorkingDirectory=/home/synapse/synapse
ExecStart=/home/synapse/synapse/env/bin/python -m synapse.app.homeserver --config-path=/home/synapse/synapse/homeserver.yaml
WorkingDirectory=/opt/synapse
ExecStart=/opt/synapse/env/bin/python -m synapse.app.homeserver --config-path=/opt/synapse/homeserver.yaml
SyslogIdentifier=matrix-synapse
# adjust the cache factor if necessary

View File

@@ -85,9 +85,6 @@ PYTHONPATH="$tmpdir" \
' > "${PACKAGE_BUILD_DIR}/etc/matrix-synapse/homeserver.yaml"
# build the log config file
"${TARGET_PYTHON}" -B "${VIRTUALENV_DIR}/bin/generate_log_config" \
--output-file="${PACKAGE_BUILD_DIR}/etc/matrix-synapse/log.yaml"
# add a dependency on the right version of python to substvars.
PYPKG=`basename $SNAKE`

46
debian/changelog vendored
View File

@@ -1,49 +1,3 @@
matrix-synapse-py3 (1.8.0) stable; urgency=medium
[ Richard van der Hoff ]
* Automate generation of the default log configuration file.
[ Synapse Packaging team ]
* New synapse release 1.8.0.
-- Synapse Packaging team <packages@matrix.org> Thu, 09 Jan 2020 11:39:27 +0000
matrix-synapse-py3 (1.7.3) stable; urgency=medium
* New synapse release 1.7.3.
-- Synapse Packaging team <packages@matrix.org> Tue, 31 Dec 2019 10:45:04 +0000
matrix-synapse-py3 (1.7.2) stable; urgency=medium
* New synapse release 1.7.2.
-- Synapse Packaging team <packages@matrix.org> Fri, 20 Dec 2019 10:56:50 +0000
matrix-synapse-py3 (1.7.1) stable; urgency=medium
* New synapse release 1.7.1.
-- Synapse Packaging team <packages@matrix.org> Wed, 18 Dec 2019 09:37:59 +0000
matrix-synapse-py3 (1.7.0) stable; urgency=medium
* New synapse release 1.7.0.
-- Synapse Packaging team <packages@matrix.org> Fri, 13 Dec 2019 10:19:38 +0000
matrix-synapse-py3 (1.6.1) stable; urgency=medium
* New synapse release 1.6.1.
-- Synapse Packaging team <packages@matrix.org> Thu, 28 Nov 2019 11:10:40 +0000
matrix-synapse-py3 (1.6.0) stable; urgency=medium
* New synapse release 1.6.0.
-- Synapse Packaging team <packages@matrix.org> Tue, 26 Nov 2019 12:15:40 +0000
matrix-synapse-py3 (1.5.1) stable; urgency=medium
* New synapse release 1.5.1.

1
debian/install vendored
View File

@@ -1 +1,2 @@
debian/log.yaml etc/matrix-synapse
debian/manage_debconf.pl /opt/venvs/matrix-synapse/lib/

36
debian/log.yaml vendored Normal file
View File

@@ -0,0 +1,36 @@
version: 1
formatters:
precise:
format: '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s- %(message)s'
filters:
context:
(): synapse.logging.context.LoggingContextFilter
request: ""
handlers:
file:
class: logging.handlers.RotatingFileHandler
formatter: precise
filename: /var/log/matrix-synapse/homeserver.log
maxBytes: 104857600
backupCount: 10
filters: [context]
encoding: utf8
console:
class: logging.StreamHandler
formatter: precise
level: WARN
loggers:
synapse:
level: INFO
synapse.storage.SQL:
level: INFO
root:
level: INFO
handlers: [file, console]

View File

@@ -130,15 +130,3 @@ docker run -it --rm \
This will generate the same configuration file as the legacy mode used, but
will store it in `/data/homeserver.yaml` instead of a temporary location. You
can then use it as shown above at [Running synapse](#running-synapse).
## Building the image
If you need to build the image from a Synapse checkout, use the following `docker
build` command from the repo's root:
```
docker build -t matrixdotorg/synapse -f docker/Dockerfile .
```
You can choose to build a different docker image by changing the value of the `-f` flag to
point to another Dockerfile.

View File

@@ -21,20 +21,3 @@ It returns a JSON body like the following:
]
}
```
# Quarantine media in a room
This API 'quarantines' all the media in a room.
The API is:
```
POST /_synapse/admin/v1/quarantine_media/<room_id>
{}
```
Quarantining media means that it is marked as inaccessible by users. It applies
to any local media, and any locally-cached copies of remote media.
The media file itself (and any thumbnails) is not deleted from the server.

View File

@@ -1,72 +0,0 @@
# Shutdown room API
Shuts down a room, preventing new joins and moves local users and room aliases automatically
to a new room. The new room will be created with the user specified by the
`new_room_user_id` parameter as room administrator and will contain a message
explaining what happened. Users invited to the new room will have power level
-10 by default, and thus be unable to speak. The old room's power levels will be changed to
disallow any further invites or joins.
The local server will only have the power to move local user and room aliases to
the new room. Users on other servers will be unaffected.
## API
You will need to authenticate with an access token for an admin user.
### URL
`POST /_synapse/admin/v1/shutdown_room/{room_id}`
### URL Parameters
* `room_id` - The ID of the room (e.g `!someroom:example.com`)
### JSON Body Parameters
* `new_room_user_id` - Required. A string representing the user ID of the user that will admin
the new room that all users in the old room will be moved to.
* `room_name` - Optional. A string representing the name of the room that new users will be
invited to.
* `message` - Optional. A string containing the first message that will be sent as
`new_room_user_id` in the new room. Ideally this will clearly convey why the
original room was shut down.
If not specified, the default value of `room_name` is "Content Violation
Notification". The default value of `message` is "Sharing illegal content on
othis server is not permitted and rooms in violation will be blocked."
### Response Parameters
* `kicked_users` - An integer number representing the number of users that
were kicked.
* `failed_to_kick_users` - An integer number representing the number of users
that were not kicked.
* `local_aliases` - An array of strings representing the local aliases that were migrated from
the old room to the new.
* `new_room_id` - A string representing the room ID of the new room.
## Example
Request:
```
POST /_synapse/admin/v1/shutdown_room/!somebadroom%3Aexample.com
{
"new_room_user_id": "@someuser:example.com",
"room_name": "Content Violation Notification",
"message": "Bad Room has been shutdown due to content violations on this server. Please review our Terms of Service."
}
```
Response:
```
{
"kicked_users": 5,
"failed_to_kick_users": 0,
"local_aliases": ["#badroom:example.com", "#evilsaloon:example.com],
"new_room_id": "!newroomid:example.com",
},
```

View File

@@ -1,48 +1,3 @@
List Accounts
=============
This API returns all local user accounts.
The api is::
GET /_synapse/admin/v2/users?from=0&limit=10&guests=false
including an ``access_token`` of a server admin.
The parameters ``from`` and ``limit`` are required only for pagination.
By default, a ``limit`` of 100 is used.
The parameter ``user_id`` can be used to select only users with user ids that
contain this value.
The parameter ``guests=false`` can be used to exclude guest users,
default is to include guest users.
The parameter ``deactivated=true`` can be used to include deactivated users,
default is to exclude deactivated users.
If the endpoint does not return a ``next_token`` then there are no more users left.
It returns a JSON body like the following:
.. code:: json
{
"users": [
{
"name": "<user_id1>",
"password_hash": "<password_hash1>",
"is_guest": 0,
"admin": 0,
"user_type": null,
"deactivated": 0
}, {
"name": "<user_id2>",
"password_hash": "<password_hash2>",
"is_guest": 0,
"admin": 1,
"user_type": null,
"deactivated": 0
}
],
"next_token": "100"
}
Query Account
=============

View File

@@ -137,7 +137,6 @@ Some guidelines follow:
correctly handles the top-level option being set to `None` (as it
will be if no sub-options are enabled).
- Lines should be wrapped at 80 characters.
- Use two-space indents.
Example:
@@ -156,13 +155,13 @@ Example:
# Settings for the frobber
#
frobber:
# frobbing speed. Defaults to 1.
#
#speed: 10
# frobbing speed. Defaults to 1.
#
#speed: 10
# frobbing distance. Defaults to 1000.
#
#distance: 100
# frobbing distance. Defaults to 1000.
#
#distance: 100
Note that the sample configuration is generated from the synapse code
and is maintained by a script, `scripts-dev/generate_sample_config`.

View File

@@ -66,6 +66,10 @@ therefore cannot gain access to the necessary certificate. With .well-known,
federation servers will check for a valid TLS certificate for the delegated
hostname (in our example: ``synapse.example.com``).
.well-known support first appeared in Synapse v0.99.0. To federate with older
servers you may need to additionally configure SRV delegation. Alternatively,
encourage the server admin in question to upgrade :).
### DNS SRV delegation
To use this delegation method, you need to have write access to your
@@ -107,15 +111,29 @@ giving it a `server_name` of `example.com`, and once [ACME](acme.md) support is
it would automatically generate a valid TLS certificate for you via Let's Encrypt
and no SRV record or .well-known URI would be needed.
This is the common case, although you can add an SRV record or
`.well-known/matrix/server` URI for completeness if you wish.
**However**, if your server does not listen on port 8448, or if your `server_name`
does not point to the host that your homeserver runs on, you will need to let
other servers know how to find it. The way to do this is via .well-known or an
SRV record.
#### I have created a .well-known URI. Do I also need an SRV record?
#### I have created a .well-known URI. Do I still need an SRV record?
No. You can use either `.well-known` delegation or use an SRV record for delegation. You
do not need to use both to delegate to the same location.
As of Synapse 0.99, Synapse will first check for the existence of a .well-known
URI and follow any delegation it suggests. It will only then check for the
existence of an SRV record.
That means that the SRV record will often be redundant. However, you should
remember that there may still be older versions of Synapse in the federation
which do not understand .well-known URIs, so if you removed your SRV record
you would no longer be able to federate with them.
It is therefore best to leave the SRV record in place for now. Synapse 0.34 and
earlier will follow the SRV record (and not care about the invalid
certificate). Synapse 0.99 and later will follow the .well-known URI, with the
correct certificate chain.
#### Can I manage my own certificates rather than having Synapse renew certificates itself?

View File

@@ -1,77 +0,0 @@
# SAML Mapping Providers
A SAML mapping provider is a Python class (loaded via a Python module) that
works out how to map attributes of a SAML response object to Matrix-specific
user attributes. Details such as user ID localpart, displayname, and even avatar
URLs are all things that can be mapped from talking to a SSO service.
As an example, a SSO service may return the email address
"john.smith@example.com" for a user, whereas Synapse will need to figure out how
to turn that into a displayname when creating a Matrix user for this individual.
It may choose `John Smith`, or `Smith, John [Example.com]` or any number of
variations. As each Synapse configuration may want something different, this is
where SAML mapping providers come into play.
## Enabling Providers
External mapping providers are provided to Synapse in the form of an external
Python module. Retrieve this module from [PyPi](https://pypi.org) or elsewhere,
then tell Synapse where to look for the handler class by editing the
`saml2_config.user_mapping_provider.module` config option.
`saml2_config.user_mapping_provider.config` allows you to provide custom
configuration options to the module. Check with the module's documentation for
what options it provides (if any). The options listed by default are for the
user mapping provider built in to Synapse. If using a custom module, you should
comment these options out and use those specified by the module instead.
## Building a Custom Mapping Provider
A custom mapping provider must specify the following methods:
* `__init__(self, parsed_config)`
- Arguments:
- `parsed_config` - A configuration object that is the return value of the
`parse_config` method. You should set any configuration options needed by
the module here.
* `saml_response_to_user_attributes(self, saml_response, failures)`
- Arguments:
- `saml_response` - A `saml2.response.AuthnResponse` object to extract user
information from.
- `failures` - An `int` that represents the amount of times the returned
mxid localpart mapping has failed. This should be used
to create a deduplicated mxid localpart which should be
returned instead. For example, if this method returns
`john.doe` as the value of `mxid_localpart` in the returned
dict, and that is already taken on the homeserver, this
method will be called again with the same parameters but
with failures=1. The method should then return a different
`mxid_localpart` value, such as `john.doe1`.
- This method must return a dictionary, which will then be used by Synapse
to build a new user. The following keys are allowed:
* `mxid_localpart` - Required. The mxid localpart of the new user.
* `displayname` - The displayname of the new user. If not provided, will default to
the value of `mxid_localpart`.
* `parse_config(config)`
- This method should have the `@staticmethod` decoration.
- Arguments:
- `config` - A `dict` representing the parsed content of the
`saml2_config.user_mapping_provider.config` homeserver config option.
Runs on homeserver startup. Providers should extract any option values
they need here.
- Whatever is returned will be passed back to the user mapping provider module's
`__init__` method during construction.
* `get_saml_attributes(config)`
- This method should have the `@staticmethod` decoration.
- Arguments:
- `config` - A object resulting from a call to `parse_config`.
- Returns a tuple of two sets. The first set equates to the saml auth
response attributes that are required for the module to function, whereas
the second set consists of those attributes which can be used if available,
but are not necessary.
## Synapse's Default Provider
Synapse has a built-in SAML mapping provider if a custom provider isn't
specified in the config. It is located at
[`synapse.handlers.saml_handler.DefaultSamlMappingProvider`](../synapse/handlers/saml_handler.py).

View File

@@ -54,23 +54,15 @@ pid_file: DATADIR/homeserver.pid
#
#require_auth_for_profile_requests: true
# Uncomment to require a user to share a room with another user in order
# to retrieve their profile information. Only checked on Client-Server
# requests. Profile requests from other servers should be checked by the
# requesting server. Defaults to 'false'.
# If set to 'false', requires authentication to access the server's public rooms
# directory through the client API. Defaults to 'true'.
#
#limit_profile_requests_to_users_who_share_rooms: true
#allow_public_rooms_without_auth: false
# If set to 'true', removes the need for authentication to access the server's
# public rooms directory through the client API, meaning that anyone can
# query the room directory. Defaults to 'false'.
# If set to 'false', forbids any other homeserver to fetch the server's public
# rooms directory via federation. Defaults to 'true'.
#
#allow_public_rooms_without_auth: true
# If set to 'true', allows any other homeserver to fetch the server's public
# rooms directory via federation. Defaults to 'false'.
#
#allow_public_rooms_over_federation: true
#allow_public_rooms_over_federation: false
# The default room version for newly created rooms.
#
@@ -336,69 +328,6 @@ listeners:
#
#user_ips_max_age: 14d
# Message retention policy at the server level.
#
# Room admins and mods can define a retention period for their rooms using the
# 'm.room.retention' state event, and server admins can cap this period by setting
# the 'allowed_lifetime_min' and 'allowed_lifetime_max' config options.
#
# If this feature is enabled, Synapse will regularly look for and purge events
# which are older than the room's maximum retention period. Synapse will also
# filter events received over federation so that events that should have been
# purged are ignored and not stored again.
#
retention:
# The message retention policies feature is disabled by default. Uncomment the
# following line to enable it.
#
#enabled: true
# Default retention policy. If set, Synapse will apply it to rooms that lack the
# 'm.room.retention' state event. Currently, the value of 'min_lifetime' doesn't
# matter much because Synapse doesn't take it into account yet.
#
#default_policy:
# min_lifetime: 1d
# max_lifetime: 1y
# Retention policy limits. If set, a user won't be able to send a
# 'm.room.retention' event which features a 'min_lifetime' or a 'max_lifetime'
# that's not within this range. This is especially useful in closed federations,
# in which server admins can make sure every federating server applies the same
# rules.
#
#allowed_lifetime_min: 1d
#allowed_lifetime_max: 1y
# Server admins can define the settings of the background jobs purging the
# events which lifetime has expired under the 'purge_jobs' section.
#
# If no configuration is provided, a single job will be set up to delete expired
# events in every room daily.
#
# Each job's configuration defines which range of message lifetimes the job
# takes care of. For example, if 'shortest_max_lifetime' is '2d' and
# 'longest_max_lifetime' is '3d', the job will handle purging expired events in
# rooms whose state defines a 'max_lifetime' that's both higher than 2 days, and
# lower than or equal to 3 days. Both the minimum and the maximum value of a
# range are optional, e.g. a job with no 'shortest_max_lifetime' and a
# 'longest_max_lifetime' of '3d' will handle every room with a retention policy
# which 'max_lifetime' is lower than or equal to three days.
#
# The rationale for this per-job configuration is that some rooms might have a
# retention policy with a low 'max_lifetime', where history needs to be purged
# of outdated messages on a very frequent basis (e.g. every 5min), but not want
# that purge to be performed by a job that's iterating over every room it knows,
# which would be quite heavy on the server.
#
#purge_jobs:
# - shortest_max_lifetime: 1d
# longest_max_lifetime: 3d
# interval: 5m:
# - shortest_max_lifetime: 3d
# longest_max_lifetime: 1y
# interval: 24h
## TLS ##
@@ -692,6 +621,10 @@ media_store_path: "DATADIR/media_store"
# config:
# directory: /mnt/some/other/directory
# Directory where in-progress uploads are stored.
#
uploads_path: "DATADIR/uploads"
# The largest allowed upload size in bytes
#
#max_upload_size: 10M
@@ -1118,19 +1051,14 @@ metrics_flags:
signing_key_path: "CONFDIR/SERVERNAME.signing.key"
# The keys that the server used to sign messages with but won't use
# to sign new messages.
# to sign new messages. E.g. it has lost its private key
#
old_signing_keys:
# For each key, `key` should be the base64-encoded public key, and
# `expired_ts`should be the time (in milliseconds since the unix epoch) that
# it was last used.
#
# It is possible to build an entry from an old signing.key file using the
# `export_signing_key` script which is provided with synapse.
#
# For example:
#
#"ed25519:id": { key: "base64string", expired_ts: 123456789123 }
#old_signing_keys:
# "ed25519:auto":
# # Base64 encoded public key
# key: "The public part of your old signing key."
# # Millisecond POSIX timestamp when the key expired.
# expired_ts: 123456789123
# How long key response published by this server is valid for.
# Used to set the valid_until_ts in /key/v2 APIs.
@@ -1258,58 +1186,33 @@ saml2_config:
#
#config_path: "CONFDIR/sp_conf.py"
# The lifetime of a SAML session. This defines how long a user has to
# the lifetime of a SAML session. This defines how long a user has to
# complete the authentication process, if allow_unsolicited is unset.
# The default is 5 minutes.
#
#saml_session_lifetime: 5m
# An external module can be provided here as a custom solution to
# mapping attributes returned from a saml provider onto a matrix user.
# The SAML attribute (after mapping via the attribute maps) to use to derive
# the Matrix ID from. 'uid' by default.
#
user_mapping_provider:
# The custom module's class. Uncomment to use a custom module.
#
#module: mapping_provider.SamlMappingProvider
#mxid_source_attribute: displayName
# Custom configuration values for the module. Below options are
# intended for the built-in provider, they should be changed if
# using a custom module. This section will be passed as a Python
# dictionary to the module's `parse_config` method.
#
config:
# The SAML attribute (after mapping via the attribute maps) to use
# to derive the Matrix ID from. 'uid' by default.
#
# Note: This used to be configured by the
# saml2_config.mxid_source_attribute option. If that is still
# defined, its value will be used instead.
#
#mxid_source_attribute: displayName
# The mapping system to use for mapping the saml attribute onto a matrix ID.
# Options include:
# * 'hexencode' (which maps unpermitted characters to '=xx')
# * 'dotreplace' (which replaces unpermitted characters with '.').
# The default is 'hexencode'.
#
#mxid_mapping: dotreplace
# The mapping system to use for mapping the saml attribute onto a
# matrix ID.
#
# Options include:
# * 'hexencode' (which maps unpermitted characters to '=xx')
# * 'dotreplace' (which replaces unpermitted characters with
# '.').
# The default is 'hexencode'.
#
# Note: This used to be configured by the
# saml2_config.mxid_mapping option. If that is still defined, its
# value will be used instead.
#
#mxid_mapping: dotreplace
# In previous versions of synapse, the mapping from SAML attribute to
# MXID was always calculated dynamically rather than stored in a
# table. For backwards- compatibility, we will look for user_ids
# matching such a pattern before creating a new account.
# In previous versions of synapse, the mapping from SAML attribute to MXID was
# always calculated dynamically rather than stored in a table. For backwards-
# compatibility, we will look for user_ids matching such a pattern before
# creating a new account.
#
# This setting controls the SAML attribute which will be used for this
# backwards-compatibility lookup. Typically it should be 'uid', but if
# the attribute maps are changed, it may be necessary to change it.
# backwards-compatibility lookup. Typically it should be 'uid', but if the
# attribute maps are changed, it may be necessary to change it.
#
# The default is 'uid'.
#
@@ -1367,23 +1270,8 @@ password_config:
# smtp_user: "exampleusername"
# smtp_pass: "examplepassword"
# require_transport_security: false
#
# # notif_from defines the "From" address to use when sending emails.
# # It must be set if email sending is enabled.
# #
# # The placeholder '%(app)s' will be replaced by the application name,
# # which is normally 'app_name' (below), but may be overridden by the
# # Matrix client application.
# #
# # Note that the placeholder must be written '%(app)s', including the
# # trailing 's'.
# #
# notif_from: "Your Friendly %(app)s homeserver <noreply@example.com>"
#
# # app_name defines the default value for '%(app)s' in notif_from. It
# # defaults to 'Matrix'.
# #
# #app_name: my_branded_matrix_server
# app_name: Matrix
#
# # Enable email notifications by default
# #

View File

@@ -1,43 +0,0 @@
# Log configuration for Synapse.
#
# This is a YAML file containing a standard Python logging configuration
# dictionary. See [1] for details on the valid settings.
#
# [1]: https://docs.python.org/3.7/library/logging.config.html#configuration-dictionary-schema
version: 1
formatters:
precise:
format: '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s - %(message)s'
filters:
context:
(): synapse.logging.context.LoggingContextFilter
request: ""
handlers:
file:
class: logging.handlers.RotatingFileHandler
formatter: precise
filename: /var/log/matrix-synapse/homeserver.log
maxBytes: 104857600
backupCount: 10
filters: [context]
encoding: utf8
console:
class: logging.StreamHandler
formatter: precise
filters: [context]
loggers:
synapse.storage.SQL:
# beware: increasing this to DEBUG will make synapse log sensitive
# information such as access tokens.
level: INFO
root:
level: INFO
handlers: [file, console]
disable_existing_loggers: false

View File

@@ -39,8 +39,6 @@ The TURN daemon `coturn` is available from a variety of sources such as native p
make
make install
### Configuration
1. Create or edit the config file in `/etc/turnserver.conf`. The relevant
lines, with example values, are:

View File

@@ -196,7 +196,7 @@ Handles the media repository. It can handle all endpoints starting with:
/_matrix/media/
... and the following regular expressions matching media-specific administration APIs:
And the following regular expressions matching media-specific administration APIs:
^/_synapse/admin/v1/purge_media_cache$
^/_synapse/admin/v1/room/.*/media$
@@ -206,18 +206,6 @@ You should also set `enable_media_repo: False` in the shared configuration
file to stop the main synapse running background jobs related to managing the
media repository.
In the `media_repository` worker configuration file, configure the http listener to
expose the `media` resource. For example:
```yaml
worker_listeners:
- type: http
port: 8085
resources:
- names:
- media
```
Note this worker cannot be load-balanced: only one instance should be active.
### `synapse.app.client_reader`

View File

@@ -1,7 +1,7 @@
[mypy]
namespace_packages = True
plugins = mypy_zope:plugin
follow_imports = silent
follow_imports = normal
check_untyped_defs = True
show_error_codes = True
show_traceback = True

View File

@@ -7,22 +7,12 @@ set -e
cd `dirname $0`/..
SAMPLE_CONFIG="docs/sample_config.yaml"
SAMPLE_LOG_CONFIG="docs/sample_log_config.yaml"
check() {
diff -u "$SAMPLE_LOG_CONFIG" <(./scripts/generate_log_config) >/dev/null || return 1
}
if [ "$1" == "--check" ]; then
diff -u "$SAMPLE_CONFIG" <(./scripts/generate_config --header-file docs/.sample_config_header.yaml) >/dev/null || {
echo -e "\e[1m\e[31m$SAMPLE_CONFIG is not up-to-date. Regenerate it with \`scripts-dev/generate_sample_config\`.\e[0m" >&2
exit 1
}
diff -u "$SAMPLE_LOG_CONFIG" <(./scripts/generate_log_config) >/dev/null || {
echo -e "\e[1m\e[31m$SAMPLE_LOG_CONFIG is not up-to-date. Regenerate it with \`scripts-dev/generate_sample_config\`.\e[0m" >&2
exit 1
}
else
./scripts/generate_config --header-file docs/.sample_config_header.yaml -o "$SAMPLE_CONFIG"
./scripts/generate_log_config -o "$SAMPLE_LOG_CONFIG"
fi

View File

@@ -27,7 +27,7 @@ class Store(object):
"_store_pdu_reference_hash_txn"
]
_store_prev_pdu_hash_txn = SignatureStore.__dict__["_store_prev_pdu_hash_txn"]
simple_insert_txn = SQLBaseStore.__dict__["simple_insert_txn"]
_simple_insert_txn = SQLBaseStore.__dict__["_simple_insert_txn"]
store = Store()

View File

@@ -1,184 +0,0 @@
#!/bin/bash
#
# This script generates SQL files for creating a brand new Synapse DB with the latest
# schema, on both SQLite3 and Postgres.
#
# It does so by having Synapse generate an up-to-date SQLite DB, then running
# synapse_port_db to convert it to Postgres. It then dumps the contents of both.
POSTGRES_HOST="localhost"
POSTGRES_DB_NAME="synapse_full_schema.$$"
SQLITE_FULL_SCHEMA_OUTPUT_FILE="full.sql.sqlite"
POSTGRES_FULL_SCHEMA_OUTPUT_FILE="full.sql.postgres"
REQUIRED_DEPS=("matrix-synapse" "psycopg2")
usage() {
echo
echo "Usage: $0 -p <postgres_username> -o <path> [-c] [-n] [-h]"
echo
echo "-p <postgres_username>"
echo " Username to connect to local postgres instance. The password will be requested"
echo " during script execution."
echo "-c"
echo " CI mode. Enables coverage tracking and prints every command that the script runs."
echo "-o <path>"
echo " Directory to output full schema files to."
echo "-h"
echo " Display this help text."
}
while getopts "p:co:h" opt; do
case $opt in
p)
POSTGRES_USERNAME=$OPTARG
;;
c)
# Print all commands that are being executed
set -x
# Modify required dependencies for coverage
REQUIRED_DEPS+=("coverage" "coverage-enable-subprocess")
COVERAGE=1
;;
o)
command -v realpath > /dev/null || (echo "The -o flag requires the 'realpath' binary to be installed" && exit 1)
OUTPUT_DIR="$(realpath "$OPTARG")"
;;
h)
usage
exit
;;
\?)
echo "ERROR: Invalid option: -$OPTARG" >&2
usage
exit
;;
esac
done
# Check that required dependencies are installed
unsatisfied_requirements=()
for dep in "${REQUIRED_DEPS[@]}"; do
pip show "$dep" --quiet || unsatisfied_requirements+=("$dep")
done
if [ ${#unsatisfied_requirements} -ne 0 ]; then
echo "Please install the following python packages: ${unsatisfied_requirements[*]}"
exit 1
fi
if [ -z "$POSTGRES_USERNAME" ]; then
echo "No postgres username supplied"
usage
exit 1
fi
if [ -z "$OUTPUT_DIR" ]; then
echo "No output directory supplied"
usage
exit 1
fi
# Create the output directory if it doesn't exist
mkdir -p "$OUTPUT_DIR"
read -rsp "Postgres password for '$POSTGRES_USERNAME': " POSTGRES_PASSWORD
echo ""
# Exit immediately if a command fails
set -e
# cd to root of the synapse directory
cd "$(dirname "$0")/.."
# Create temporary SQLite and Postgres homeserver db configs and key file
TMPDIR=$(mktemp -d)
KEY_FILE=$TMPDIR/test.signing.key # default Synapse signing key path
SQLITE_CONFIG=$TMPDIR/sqlite.conf
SQLITE_DB=$TMPDIR/homeserver.db
POSTGRES_CONFIG=$TMPDIR/postgres.conf
# Ensure these files are delete on script exit
trap 'rm -rf $TMPDIR' EXIT
cat > "$SQLITE_CONFIG" <<EOF
server_name: "test"
signing_key_path: "$KEY_FILE"
macaroon_secret_key: "abcde"
report_stats: false
database:
name: "sqlite3"
args:
database: "$SQLITE_DB"
# Suppress the key server warning.
trusted_key_servers: []
EOF
cat > "$POSTGRES_CONFIG" <<EOF
server_name: "test"
signing_key_path: "$KEY_FILE"
macaroon_secret_key: "abcde"
report_stats: false
database:
name: "psycopg2"
args:
user: "$POSTGRES_USERNAME"
host: "$POSTGRES_HOST"
password: "$POSTGRES_PASSWORD"
database: "$POSTGRES_DB_NAME"
# Suppress the key server warning.
trusted_key_servers: []
EOF
# Generate the server's signing key.
echo "Generating SQLite3 db schema..."
python -m synapse.app.homeserver --generate-keys -c "$SQLITE_CONFIG"
# Make sure the SQLite3 database is using the latest schema and has no pending background update.
echo "Running db background jobs..."
scripts-dev/update_database --database-config "$SQLITE_CONFIG"
# Create the PostgreSQL database.
echo "Creating postgres database..."
createdb $POSTGRES_DB_NAME
echo "Copying data from SQLite3 to Postgres with synapse_port_db..."
if [ -z "$COVERAGE" ]; then
# No coverage needed
scripts/synapse_port_db --sqlite-database "$SQLITE_DB" --postgres-config "$POSTGRES_CONFIG"
else
# Coverage desired
coverage run scripts/synapse_port_db --sqlite-database "$SQLITE_DB" --postgres-config "$POSTGRES_CONFIG"
fi
# Delete schema_version, applied_schema_deltas and applied_module_schemas tables
# This needs to be done after synapse_port_db is run
echo "Dropping unwanted db tables..."
SQL="
DROP TABLE schema_version;
DROP TABLE applied_schema_deltas;
DROP TABLE applied_module_schemas;
"
sqlite3 "$SQLITE_DB" <<< "$SQL"
psql $POSTGRES_DB_NAME -U "$POSTGRES_USERNAME" -w <<< "$SQL"
echo "Dumping SQLite3 schema to '$OUTPUT_DIR/$SQLITE_FULL_SCHEMA_OUTPUT_FILE'..."
sqlite3 "$SQLITE_DB" ".dump" > "$OUTPUT_DIR/$SQLITE_FULL_SCHEMA_OUTPUT_FILE"
echo "Dumping Postgres schema to '$OUTPUT_DIR/$POSTGRES_FULL_SCHEMA_OUTPUT_FILE'..."
pg_dump --format=plain --no-tablespaces --no-acl --no-owner $POSTGRES_DB_NAME | sed -e '/^--/d' -e 's/public\.//g' -e '/^SET /d' -e '/^SELECT /d' > "$OUTPUT_DIR/$POSTGRES_FULL_SCHEMA_OUTPUT_FILE"
echo "Cleaning up temporary Postgres database..."
dropdb $POSTGRES_DB_NAME
echo "Done! Files dumped to: $OUTPUT_DIR"

View File

@@ -26,6 +26,8 @@ from synapse.config.homeserver import HomeServerConfig
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.server import HomeServer
from synapse.storage import DataStore
from synapse.storage.engines import create_engine
from synapse.storage.prepare_database import prepare_database
logger = logging.getLogger("update_database")
@@ -33,11 +35,21 @@ logger = logging.getLogger("update_database")
class MockHomeserver(HomeServer):
DATASTORE_CLASS = DataStore
def __init__(self, config, **kwargs):
def __init__(self, config, database_engine, db_conn, **kwargs):
super(MockHomeserver, self).__init__(
config.server_name, reactor=reactor, config=config, **kwargs
config.server_name,
reactor=reactor,
config=config,
database_engine=database_engine,
**kwargs
)
self.database_engine = database_engine
self.db_conn = db_conn
def get_db_conn(self):
return self.db_conn
if __name__ == "__main__":
parser = argparse.ArgumentParser(
@@ -46,10 +58,10 @@ if __name__ == "__main__":
" on it."
)
)
parser.add_argument("-v", action="store_true")
parser.add_argument("-v", action='store_true')
parser.add_argument(
"--database-config",
type=argparse.FileType("r"),
type=argparse.FileType('r'),
required=True,
help="A database config file for either a SQLite3 database or a PostgreSQL one.",
)
@@ -73,23 +85,40 @@ if __name__ == "__main__":
config = HomeServerConfig()
config.parse_config_dict(hs_config, "", "")
# Instantiate and initialise the homeserver object.
hs = MockHomeserver(config)
# Create the database engine and a connection to it.
database_engine = create_engine(config.database_config)
db_conn = database_engine.module.connect(
**{
k: v
for k, v in config.database_config.get("args", {}).items()
if not k.startswith("cp_")
}
)
# Setup instantiates the store within the homeserver object and updates the
# DB.
# Update the database to the latest schema.
prepare_database(db_conn, database_engine, config=config)
db_conn.commit()
# Instantiate and initialise the homeserver object.
hs = MockHomeserver(
config,
database_engine,
db_conn,
db_config=config.database_config,
)
# setup instantiates the store within the homeserver object.
hs.setup()
store = hs.get_datastore()
@defer.inlineCallbacks
def run_background_updates():
yield store.db.updates.run_background_updates(sleep=False)
yield store.run_background_updates(sleep=False)
# Stop the reactor to exit the script once every background update is run.
reactor.stop()
# Apply all background updates on the database.
reactor.callWhenRunning(
lambda: run_as_background_process("background_updates", run_background_updates)
)
reactor.callWhenRunning(lambda: run_as_background_process(
"background_updates", run_background_updates
))
reactor.run()

View File

@@ -1,94 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import sys
import time
from typing import Optional
import nacl.signing
from signedjson.key import encode_verify_key_base64, get_verify_key, read_signing_keys
def exit(status: int = 0, message: Optional[str] = None):
if message:
print(message, file=sys.stderr)
sys.exit(status)
def format_plain(public_key: nacl.signing.VerifyKey):
print(
"%s:%s %s"
% (public_key.alg, public_key.version, encode_verify_key_base64(public_key),)
)
def format_for_config(public_key: nacl.signing.VerifyKey, expiry_ts: int):
print(
' "%s:%s": { key: "%s", expired_ts: %i }'
% (
public_key.alg,
public_key.version,
encode_verify_key_base64(public_key),
expiry_ts,
)
)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"key_file", nargs="+", type=argparse.FileType("r"), help="The key file to read",
)
parser.add_argument(
"-x",
action="store_true",
dest="for_config",
help="format the output for inclusion in the old_signing_keys config setting",
)
parser.add_argument(
"--expiry-ts",
type=int,
default=int(time.time() * 1000) + 6*3600000,
help=(
"The expiry time to use for -x, in milliseconds since 1970. The default "
"is (now+6h)."
),
)
args = parser.parse_args()
formatter = (
(lambda k: format_for_config(k, args.expiry_ts))
if args.for_config
else format_plain
)
keys = []
for file in args.key_file:
try:
res = read_signing_keys(file)
except Exception as e:
exit(
status=1,
message="Error reading key from file %s: %s %s"
% (file.name, type(e), e),
)
res = []
for key in res:
formatter(get_verify_key(key))

View File

@@ -1,43 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import sys
from synapse.config.logger import DEFAULT_LOG_CONFIG
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"-o",
"--output-file",
type=argparse.FileType("w"),
default=sys.stdout,
help="File to write the configuration to. Default: stdout",
)
parser.add_argument(
"-f",
"--log-file",
type=str,
default="/var/log/matrix-synapse/homeserver.log",
help="name of the log file",
)
args = parser.parse_args()
args.output_file.write(DEFAULT_LOG_CONFIG.substitute(log_file=args.log_file))

View File

@@ -30,7 +30,6 @@ import yaml
from twisted.enterprise import adbapi
from twisted.internet import defer, reactor
from synapse.config.database import DatabaseConnectionConfig
from synapse.config.homeserver import HomeServerConfig
from synapse.logging.context import PreserveLoggingContext
from synapse.storage._base import LoggingTransaction
@@ -48,16 +47,13 @@ from synapse.storage.data_stores.main.media_repository import (
from synapse.storage.data_stores.main.registration import (
RegistrationBackgroundUpdateStore,
)
from synapse.storage.data_stores.main.room import RoomBackgroundUpdateStore
from synapse.storage.data_stores.main.roommember import RoomMemberBackgroundUpdateStore
from synapse.storage.data_stores.main.search import SearchBackgroundUpdateStore
from synapse.storage.data_stores.main.state import MainStateBackgroundUpdateStore
from synapse.storage.data_stores.main.state import StateBackgroundUpdateStore
from synapse.storage.data_stores.main.stats import StatsStore
from synapse.storage.data_stores.main.user_directory import (
UserDirectoryBackgroundUpdateStore,
)
from synapse.storage.data_stores.state.bg_updates import StateBackgroundUpdateStore
from synapse.storage.database import Database, make_conn
from synapse.storage.engines import create_engine
from synapse.storage.prepare_database import prepare_database
from synapse.util import Clock
@@ -135,23 +131,54 @@ class Store(
EventsBackgroundUpdatesStore,
MediaRepositoryBackgroundUpdateStore,
RegistrationBackgroundUpdateStore,
RoomBackgroundUpdateStore,
RoomMemberBackgroundUpdateStore,
SearchBackgroundUpdateStore,
StateBackgroundUpdateStore,
MainStateBackgroundUpdateStore,
UserDirectoryBackgroundUpdateStore,
StatsStore,
):
def __init__(self, db_conn, hs):
super().__init__(db_conn, hs)
self.db_pool = hs.get_db_pool()
@defer.inlineCallbacks
def runInteraction(self, desc, func, *args, **kwargs):
def r(conn):
try:
i = 0
N = 5
while True:
try:
txn = conn.cursor()
return func(
LoggingTransaction(txn, desc, self.database_engine, [], []),
*args,
**kwargs
)
except self.database_engine.module.DatabaseError as e:
if self.database_engine.is_deadlock(e):
logger.warning("[TXN DEADLOCK] {%s} %d/%d", desc, i, N)
if i < N:
i += 1
conn.rollback()
continue
raise
except Exception as e:
logger.debug("[TXN FAIL] {%s} %s", desc, e)
raise
with PreserveLoggingContext():
return (yield self.db_pool.runWithConnection(r))
def execute(self, f, *args, **kwargs):
return self.db.runInteraction(f.__name__, f, *args, **kwargs)
return self.runInteraction(f.__name__, f, *args, **kwargs)
def execute_sql(self, sql, *args):
def r(txn):
txn.execute(sql, args)
return txn.fetchall()
return self.db.runInteraction("execute_sql", r)
return self.runInteraction("execute_sql", r)
def insert_many_txn(self, txn, table, headers, rows):
sql = "INSERT INTO %s (%s) VALUES (%s)" % (
@@ -166,24 +193,25 @@ class Store(
logger.exception("Failed to insert: %s", table)
raise
def set_room_is_public(self, room_id, is_public):
raise Exception(
"Attempt to set room_is_public during port_db: database not empty?"
)
class MockHomeserver:
def __init__(self, config):
def __init__(self, config, database_engine, db_conn, db_pool):
self.database_engine = database_engine
self.db_conn = db_conn
self.db_pool = db_pool
self.clock = Clock(reactor)
self.config = config
self.hostname = config.server_name
def get_db_conn(self):
return self.db_conn
def get_db_pool(self):
return self.db_pool
def get_clock(self):
return self.clock
def get_reactor(self):
return reactor
class Porter(object):
def __init__(self, **kwargs):
@@ -193,7 +221,7 @@ class Porter(object):
def setup_table(self, table):
if table in APPEND_ONLY_TABLES:
# It's safe to just carry on inserting.
row = yield self.postgres_store.db.simple_select_one(
row = yield self.postgres_store._simple_select_one(
table="port_from_sqlite3",
keyvalues={"table_name": table},
retcols=("forward_rowid", "backward_rowid"),
@@ -203,14 +231,12 @@ class Porter(object):
total_to_port = None
if row is None:
if table == "sent_transactions":
(
forward_chunk,
already_ported,
total_to_port,
) = yield self._setup_sent_transactions()
forward_chunk, already_ported, total_to_port = (
yield self._setup_sent_transactions()
)
backward_chunk = 0
else:
yield self.postgres_store.db.simple_insert(
yield self.postgres_store._simple_insert(
table="port_from_sqlite3",
values={
"table_name": table,
@@ -240,7 +266,7 @@ class Porter(object):
yield self.postgres_store.execute(delete_all)
yield self.postgres_store.db.simple_insert(
yield self.postgres_store._simple_insert(
table="port_from_sqlite3",
values={"table_name": table, "forward_rowid": 1, "backward_rowid": 0},
)
@@ -294,7 +320,7 @@ class Porter(object):
if table == "user_directory_stream_pos":
# We need to make sure there is a single row, `(X, null), as that is
# what synapse expects to be there.
yield self.postgres_store.db.simple_insert(
yield self.postgres_store._simple_insert(
table=table, values={"stream_id": None}
)
self.progress.update(table, table_size) # Mark table as done
@@ -335,9 +361,7 @@ class Porter(object):
return headers, forward_rows, backward_rows
headers, frows, brows = yield self.sqlite_store.db.runInteraction(
"select", r
)
headers, frows, brows = yield self.sqlite_store.runInteraction("select", r)
if frows or brows:
if frows:
@@ -351,7 +375,7 @@ class Porter(object):
def insert(txn):
self.postgres_store.insert_many_txn(txn, table, headers[1:], rows)
self.postgres_store.db.simple_update_one_txn(
self.postgres_store._simple_update_one_txn(
txn,
table="port_from_sqlite3",
keyvalues={"table_name": table},
@@ -390,7 +414,7 @@ class Porter(object):
return headers, rows
headers, rows = yield self.sqlite_store.db.runInteraction("select", r)
headers, rows = yield self.sqlite_store.runInteraction("select", r)
if rows:
forward_chunk = rows[-1][0] + 1
@@ -407,8 +431,8 @@ class Porter(object):
rows_dict = []
for row in rows:
d = dict(zip(headers, row))
if "\0" in d["value"]:
logger.warning("dropping search row %s", d)
if "\0" in d['value']:
logger.warning('dropping search row %s', d)
else:
rows_dict.append(d)
@@ -428,7 +452,7 @@ class Porter(object):
],
)
self.postgres_store.db.simple_update_one_txn(
self.postgres_store._simple_update_one_txn(
txn,
table="port_from_sqlite3",
keyvalues={"table_name": "event_search"},
@@ -447,35 +471,47 @@ class Porter(object):
else:
return
def setup_db(self, db_config: DatabaseConnectionConfig, engine):
db_conn = make_conn(db_config, engine)
prepare_database(db_conn, engine, config=None)
def setup_db(self, db_config, database_engine):
db_conn = database_engine.module.connect(
**{
k: v
for k, v in db_config.get("args", {}).items()
if not k.startswith("cp_")
}
)
prepare_database(db_conn, database_engine, config=None)
db_conn.commit()
return db_conn
@defer.inlineCallbacks
def build_db_store(self, db_config: DatabaseConnectionConfig):
def build_db_store(self, config):
"""Builds and returns a database store using the provided configuration.
Args:
config: The database configuration
config: The database configuration, i.e. a dict following the structure of
the "database" section of Synapse's configuration file.
Returns:
The built Store object.
"""
self.progress.set_state("Preparing %s" % db_config.config["name"])
engine = create_engine(config)
engine = create_engine(db_config.config)
conn = self.setup_db(db_config, engine)
self.progress.set_state("Preparing %s" % config["name"])
conn = self.setup_db(config, engine)
hs = MockHomeserver(self.hs_config)
db_pool = adbapi.ConnectionPool(
config["name"], **config["args"]
)
store = Store(Database(hs, db_config, engine), conn, hs)
hs = MockHomeserver(self.hs_config, engine, conn, db_pool)
yield store.db.runInteraction(
"%s_engine.check_database" % db_config.config["name"],
store = Store(conn, hs)
yield store.runInteraction(
"%s_engine.check_database" % config["name"],
engine.check_database,
)
@@ -484,9 +520,7 @@ class Porter(object):
@defer.inlineCallbacks
def run_background_updates_on_postgres(self):
# Manually apply all background updates on the PostgreSQL database.
postgres_ready = (
yield self.postgres_store.db.updates.has_completed_background_updates()
)
postgres_ready = yield self.postgres_store.has_completed_background_updates()
if not postgres_ready:
# Only say that we're running background updates when there are background
@@ -494,22 +528,18 @@ class Porter(object):
self.progress.set_state("Running background updates on PostgreSQL")
while not postgres_ready:
yield self.postgres_store.db.updates.do_next_background_update(100)
yield self.postgres_store.do_next_background_update(100)
postgres_ready = yield (
self.postgres_store.db.updates.has_completed_background_updates()
self.postgres_store.has_completed_background_updates()
)
@defer.inlineCallbacks
def run(self):
try:
self.sqlite_store = yield self.build_db_store(
DatabaseConnectionConfig("master-sqlite", self.sqlite_config)
)
self.sqlite_store = yield self.build_db_store(self.sqlite_config)
# Check if all background updates are done, abort if not.
updates_complete = (
yield self.sqlite_store.db.updates.has_completed_background_updates()
)
updates_complete = yield self.sqlite_store.has_completed_background_updates()
if not updates_complete:
sys.stderr.write(
"Pending background updates exist in the SQLite3 database."
@@ -519,7 +549,7 @@ class Porter(object):
defer.returnValue(None)
self.postgres_store = yield self.build_db_store(
self.hs_config.get_single_database()
self.hs_config.database_config
)
yield self.run_background_updates_on_postgres()
@@ -550,22 +580,22 @@ class Porter(object):
)
try:
yield self.postgres_store.db.runInteraction("alter_table", alter_table)
yield self.postgres_store.runInteraction("alter_table", alter_table)
except Exception:
# On Error Resume Next
pass
yield self.postgres_store.db.runInteraction(
yield self.postgres_store.runInteraction(
"create_port_table", create_port_table
)
# Step 2. Get tables.
self.progress.set_state("Fetching tables")
sqlite_tables = yield self.sqlite_store.db.simple_select_onecol(
sqlite_tables = yield self.sqlite_store._simple_select_onecol(
table="sqlite_master", keyvalues={"type": "table"}, retcol="name"
)
postgres_tables = yield self.postgres_store.db.simple_select_onecol(
postgres_tables = yield self.postgres_store._simple_select_onecol(
table="information_schema.tables",
keyvalues={},
retcol="distinct table_name",
@@ -655,11 +685,11 @@ class Porter(object):
rows = txn.fetchall()
headers = [column[0] for column in txn.description]
ts_ind = headers.index("ts")
ts_ind = headers.index('ts')
return headers, [r for r in rows if r[ts_ind] < yesterday]
headers, rows = yield self.sqlite_store.db.runInteraction("select", r)
headers, rows = yield self.sqlite_store.runInteraction("select", r)
rows = self._convert_rows("sent_transactions", headers, rows)
@@ -692,7 +722,7 @@ class Porter(object):
next_chunk = yield self.sqlite_store.execute(get_start_id)
next_chunk = max(max_inserted_rowid + 1, next_chunk)
yield self.postgres_store.db.simple_insert(
yield self.postgres_store._simple_insert(
table="port_from_sqlite3",
values={
"table_name": "sent_transactions",
@@ -705,7 +735,7 @@ class Porter(object):
txn.execute(
"SELECT count(*) FROM sent_transactions" " WHERE ts >= ?", (yesterday,)
)
(size,) = txn.fetchone()
size, = txn.fetchone()
return int(size)
remaining_count = yield self.sqlite_store.execute(get_sent_table_size)
@@ -752,13 +782,10 @@ class Porter(object):
def _setup_state_group_id_seq(self):
def r(txn):
txn.execute("SELECT MAX(id) FROM state_groups")
curr_id = txn.fetchone()[0]
if not curr_id:
return
next_id = curr_id + 1
next_id = txn.fetchone()[0] + 1
txn.execute("ALTER SEQUENCE state_group_id_seq RESTART WITH %s", (next_id,))
return self.postgres_store.db.runInteraction("setup_state_group_id_seq", r)
return self.postgres_store.runInteraction("setup_state_group_id_seq", r)
##############################################
@@ -839,7 +866,7 @@ class CursesProgress(Progress):
duration = int(now) - int(self.start_time)
minutes, seconds = divmod(duration, 60)
duration_str = "%02dm %02ds" % (minutes, seconds)
duration_str = '%02dm %02ds' % (minutes, seconds)
if self.finished:
status = "Time spent: %s (Done!)" % (duration_str,)
@@ -849,7 +876,7 @@ class CursesProgress(Progress):
left = float(self.total_remaining) / self.total_processed
est_remaining = (int(now) - self.start_time) * left
est_remaining_str = "%02dm %02ds remaining" % divmod(est_remaining, 60)
est_remaining_str = '%02dm %02ds remaining' % divmod(est_remaining, 60)
else:
est_remaining_str = "Unknown"
status = "Time spent: %s (est. remaining: %s)" % (
@@ -935,7 +962,7 @@ if __name__ == "__main__":
description="A script to port an existing synapse SQLite database to"
" a new PostgreSQL database."
)
parser.add_argument("-v", action="store_true")
parser.add_argument("-v", action='store_true')
parser.add_argument(
"--sqlite-database",
required=True,
@@ -944,12 +971,12 @@ if __name__ == "__main__":
)
parser.add_argument(
"--postgres-config",
type=argparse.FileType("r"),
type=argparse.FileType('r'),
required=True,
help="The database config file for the PostgreSQL database",
)
parser.add_argument(
"--curses", action="store_true", help="display a curses based progress UI"
"--curses", action='store_true', help="display a curses based progress UI"
)
parser.add_argument(
@@ -1025,4 +1052,3 @@ if __name__ == "__main__":
if end_error_exec_info:
exc_type, exc_value, exc_traceback = end_error_exec_info
traceback.print_exception(exc_type, exc_value, exc_traceback)
sys.exit(5)

View File

@@ -36,7 +36,7 @@ try:
except ImportError:
pass
__version__ = "1.8.0"
__version__ = "1.6.0rc1"
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when

View File

@@ -14,7 +14,6 @@
# limitations under the License.
import logging
from typing import Dict, Tuple
from six import itervalues
@@ -26,7 +25,13 @@ from twisted.internet import defer
import synapse.logging.opentracing as opentracing
import synapse.types
from synapse import event_auth
from synapse.api.constants import EventTypes, LimitBlockingTypes, Membership, UserTypes
from synapse.api.constants import (
EventTypes,
JoinRules,
LimitBlockingTypes,
Membership,
UserTypes,
)
from synapse.api.errors import (
AuthError,
Codes,
@@ -79,7 +84,7 @@ class Auth(object):
@defer.inlineCallbacks
def check_from_context(self, room_version, event, context, do_sig_check=True):
prev_state_ids = yield context.get_prev_state_ids()
prev_state_ids = yield context.get_prev_state_ids(self.store)
auth_events_ids = yield self.compute_auth_events(
event, prev_state_ids, for_verification=True
)
@@ -508,43 +513,71 @@ class Auth(object):
"""
return self.store.is_server_admin(user)
def compute_auth_events(
self,
event,
current_state_ids: Dict[Tuple[str, str], str],
for_verification: bool = False,
):
"""Given an event and current state return the list of event IDs used
to auth an event.
If `for_verification` is False then only return auth events that
should be added to the event's `auth_events`.
Returns:
defer.Deferred(list[str]): List of event IDs.
"""
@defer.inlineCallbacks
def compute_auth_events(self, event, current_state_ids, for_verification=False):
if event.type == EventTypes.Create:
return defer.succeed([])
# Currently we ignore the `for_verification` flag even though there are
# some situations where we can drop particular auth events when adding
# to the event's `auth_events` (e.g. joins pointing to previous joins
# when room is publically joinable). Dropping event IDs has the
# advantage that the auth chain for the room grows slower, but we use
# the auth chain in state resolution v2 to order events, which means
# care must be taken if dropping events to ensure that it doesn't
# introduce undesirable "state reset" behaviour.
#
# All of which sounds a bit tricky so we don't bother for now.
return []
auth_ids = []
for etype, state_key in event_auth.auth_types_for_event(event):
auth_ev_id = current_state_ids.get((etype, state_key))
if auth_ev_id:
auth_ids.append(auth_ev_id)
return defer.succeed(auth_ids)
key = (EventTypes.PowerLevels, "")
power_level_event_id = current_state_ids.get(key)
if power_level_event_id:
auth_ids.append(power_level_event_id)
key = (EventTypes.JoinRules, "")
join_rule_event_id = current_state_ids.get(key)
key = (EventTypes.Member, event.sender)
member_event_id = current_state_ids.get(key)
key = (EventTypes.Create, "")
create_event_id = current_state_ids.get(key)
if create_event_id:
auth_ids.append(create_event_id)
if join_rule_event_id:
join_rule_event = yield self.store.get_event(join_rule_event_id)
join_rule = join_rule_event.content.get("join_rule")
is_public = join_rule == JoinRules.PUBLIC if join_rule else False
else:
is_public = False
if event.type == EventTypes.Member:
e_type = event.content["membership"]
if e_type in [Membership.JOIN, Membership.INVITE]:
if join_rule_event_id:
auth_ids.append(join_rule_event_id)
if e_type == Membership.JOIN:
if member_event_id and not is_public:
auth_ids.append(member_event_id)
else:
if member_event_id:
auth_ids.append(member_event_id)
if for_verification:
key = (EventTypes.Member, event.state_key)
existing_event_id = current_state_ids.get(key)
if existing_event_id:
auth_ids.append(existing_event_id)
if e_type == Membership.INVITE:
if "third_party_invite" in event.content:
key = (
EventTypes.ThirdPartyInvite,
event.content["third_party_invite"]["signed"]["token"],
)
third_party_invite_id = current_state_ids.get(key)
if third_party_invite_id:
auth_ids.append(third_party_invite_id)
elif member_event_id:
member_event = yield self.store.get_event(member_event_id)
if member_event.content["membership"] == Membership.JOIN:
auth_ids.append(member_event.event_id)
return auth_ids
@defer.inlineCallbacks
def check_can_change_room_list(self, room_id, user):

View File

@@ -1,8 +1,7 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2017 Vector Creations Ltd
# Copyright 2018-2019 New Vector Ltd
# Copyright 2019 The Matrix.org Foundation C.I.C.
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -95,8 +94,6 @@ class EventTypes(object):
ServerACL = "m.room.server_acl"
Pinned = "m.room.pinned_events"
Retention = "m.room.retention"
class RejectedReason(object):
AUTH_ERROR = "auth_error"
@@ -148,7 +145,3 @@ class EventContentFields(object):
# Labels for the event, cf https://github.com/matrix-org/matrix-doc/pull/2326
LABELS = "org.matrix.labels"
# Timestamp to delete the event after
# cf https://github.com/matrix-org/matrix-doc/pull/2228
SELF_DESTRUCT_AFTER = "org.matrix.self_destruct_after"

View File

@@ -1,8 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2017 Vector Creations Ltd
# Copyright 2018-2019 New Vector Ltd
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.

View File

@@ -29,6 +29,7 @@ FEDERATION_V2_PREFIX = FEDERATION_PREFIX + "/v2"
FEDERATION_UNSTABLE_PREFIX = FEDERATION_PREFIX + "/unstable"
STATIC_PREFIX = "/_matrix/static"
WEB_CLIENT_PREFIX = "/_matrix/client"
CONTENT_REPO_PREFIX = "/_matrix/content"
SERVER_KEY_V2_PREFIX = "/_matrix/key/v2"
MEDIA_PREFIX = "/_matrix/media/r0"
LEGACY_MEDIA_PREFIX = "/_matrix/media/v1"

View File

@@ -237,12 +237,6 @@ def start(hs, listeners=None):
"""
Start a Synapse server or worker.
Should be called once the reactor is running and (if we're using ACME) the
TLS certificates are in place.
Will start the main HTTP listeners and do some other startup tasks, and then
notify systemd.
Args:
hs (synapse.server.HomeServer)
listeners (list[dict]): Listener configuration ('listeners' in homeserver.yaml)
@@ -275,7 +269,7 @@ def start(hs, listeners=None):
# It is now safe to start your Synapse.
hs.start_listening(listeners)
hs.get_datastore().db.start_profiling()
hs.get_datastore().start_profiling()
setup_sentry(hs)
setup_sdnotify(hs)
@@ -317,7 +311,9 @@ def setup_sdnotify(hs):
# Tell systemd our state, if we're using it. This will silently fail if
# we're not using systemd.
sdnotify(b"READY=1\nMAINPID=%i" % (os.getpid(),))
hs.get_reactor().addSystemEventTrigger(
"after", "startup", sdnotify, b"READY=1\nMAINPID=%i" % (os.getpid(),)
)
hs.get_reactor().addSystemEventTrigger(
"before", "shutdown", sdnotify, b"STOPPING=1"

View File

@@ -45,6 +45,7 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.logcontext import LoggingContext
from synapse.util.versionstring import get_version_string
@@ -104,10 +105,8 @@ def export_data_command(hs, args):
user_id = args.user_id
directory = args.output_directory
res = yield defer.ensureDeferred(
hs.get_handlers().admin_handler.export_user_data(
user_id, FileExfiltrationWriter(user_id, directory=directory)
)
res = yield hs.get_handlers().admin_handler.export_user_data(
user_id, FileExfiltrationWriter(user_id, directory=directory)
)
print(res)
@@ -230,10 +229,14 @@ def start(config_options):
synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
ss = AdminCmdServer(
config.server_name,
db_config=config.database_config,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
)
setup_logging(ss, config, use_worker_options=True)

View File

@@ -34,6 +34,7 @@ from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
@@ -142,6 +143,8 @@ def start(config_options):
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
if config.notify_appservices:
sys.stderr.write(
"\nThe appservices must be disabled in the main synapse process"
@@ -156,8 +159,10 @@ def start(config_options):
ps = AppserviceServer(
config.server_name,
db_config=config.database_config,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
)
setup_logging(ps, config, use_worker_options=True)

View File

@@ -62,6 +62,7 @@ from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet
from synapse.rest.client.v2_alpha.register import RegisterRestServlet
from synapse.rest.client.versions import VersionsRestServlet
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
@@ -180,10 +181,14 @@ def start(config_options):
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
ss = ClientReaderServer(
config.server_name,
db_config=config.database_config,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
)
setup_logging(ss, config, use_worker_options=True)

View File

@@ -57,6 +57,7 @@ from synapse.rest.client.v1.room import (
)
from synapse.server import HomeServer
from synapse.storage.data_stores.main.user_directory import UserDirectoryStore
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
@@ -179,10 +180,14 @@ def start(config_options):
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
ss = EventCreatorServer(
config.server_name,
db_config=config.database_config,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
)
setup_logging(ss, config, use_worker_options=True)

View File

@@ -46,6 +46,7 @@ from synapse.replication.slave.storage.transactions import SlavedTransactionStor
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.key.v2 import KeyApiV2Resource
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
@@ -161,10 +162,14 @@ def start(config_options):
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
ss = FederationReaderServer(
config.server_name,
db_config=config.database_config,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
)
setup_logging(ss, config, use_worker_options=True)

View File

@@ -40,7 +40,7 @@ from synapse.replication.slave.storage.transactions import SlavedTransactionStor
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.replication.tcp.streams._base import ReceiptsStream
from synapse.server import HomeServer
from synapse.storage.database import Database
from synapse.storage.engines import create_engine
from synapse.types import ReadReceipt
from synapse.util.async_helpers import Linearizer
from synapse.util.httpresourcetree import create_resource_tree
@@ -59,8 +59,8 @@ class FederationSenderSlaveStore(
SlavedDeviceStore,
SlavedPresenceStore,
):
def __init__(self, database: Database, db_conn, hs):
super(FederationSenderSlaveStore, self).__init__(database, db_conn, hs)
def __init__(self, db_conn, hs):
super(FederationSenderSlaveStore, self).__init__(db_conn, hs)
# We pull out the current federation stream position now so that we
# always have a known value for the federation position in memory so
@@ -173,6 +173,8 @@ def start(config_options):
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
if config.send_federation:
sys.stderr.write(
"\nThe send_federation must be disabled in the main synapse process"
@@ -187,8 +189,10 @@ def start(config_options):
ss = FederationSenderServer(
config.server_name,
db_config=config.database_config,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
)
setup_logging(ss, config, use_worker_options=True)

View File

@@ -39,6 +39,7 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v2_alpha._base import client_patterns
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
@@ -233,10 +234,14 @@ def start(config_options):
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
ss = FrontendProxyServer(
config.server_name,
db_config=config.database_config,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
)
setup_logging(ss, config, use_worker_options=True)

View File

@@ -39,6 +39,7 @@ import synapse
import synapse.config.logger
from synapse import events
from synapse.api.urls import (
CONTENT_REPO_PREFIX,
FEDERATION_PREFIX,
LEGACY_MEDIA_PREFIX,
MEDIA_PREFIX,
@@ -64,11 +65,12 @@ from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
from synapse.rest import ClientRestResource
from synapse.rest.admin import AdminRestResource
from synapse.rest.key.v2 import KeyApiV2Resource
from synapse.rest.media.v0.content_repository import ContentRepoResource
from synapse.rest.well_known import WellKnownResource
from synapse.server import HomeServer
from synapse.storage import DataStore
from synapse.storage.engines import IncorrectDatabaseSetup
from synapse.storage.prepare_database import UpgradeDatabaseException
from synapse.storage import DataStore, are_all_users_on_domain
from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
@@ -221,7 +223,13 @@ class SynapseHomeServer(HomeServer):
if self.get_config().enable_media_repo:
media_repo = self.get_media_repository_resource()
resources.update(
{MEDIA_PREFIX: media_repo, LEGACY_MEDIA_PREFIX: media_repo}
{
MEDIA_PREFIX: media_repo,
LEGACY_MEDIA_PREFIX: media_repo,
CONTENT_REPO_PREFIX: ContentRepoResource(
self, self.config.uploads_path
),
}
)
elif name == "media":
raise ConfigError(
@@ -286,6 +294,22 @@ class SynapseHomeServer(HomeServer):
else:
logger.warning("Unrecognized listener type: %s", listener["type"])
def run_startup_checks(self, db_conn, database_engine):
all_users_native = are_all_users_on_domain(
db_conn.cursor(), database_engine, self.hostname
)
if not all_users_native:
quit_with_error(
"Found users in database not native to %s!\n"
"You cannot changed a synapse server_name after it's been configured"
% (self.hostname,)
)
try:
database_engine.check_database(db_conn.cursor())
except IncorrectDatabaseSetup as e:
quit_with_error(str(e))
# Gauges to expose monthly active user control metrics
current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU")
@@ -310,7 +334,7 @@ def setup(config_options):
"Synapse Homeserver", config_options
)
except ConfigError as e:
sys.stderr.write("\nERROR: %s\n" % (e,))
sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
if not config:
@@ -320,23 +344,40 @@ def setup(config_options):
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
config.database_config["args"]["cp_openfun"] = database_engine.on_new_connection
hs = SynapseHomeServer(
config.server_name,
db_config=config.database_config,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
)
synapse.config.logger.setup_logging(hs, config, use_worker_options=False)
logger.info("Setting up server")
logger.info("Preparing database: %s...", config.database_config["name"])
try:
hs.setup()
except IncorrectDatabaseSetup as e:
quit_with_error(str(e))
except UpgradeDatabaseException as e:
quit_with_error("Failed to upgrade database: %s" % (e,))
with hs.get_db_conn(run_new_connection=False) as db_conn:
prepare_database(db_conn, database_engine, config=config)
database_engine.on_new_connection(db_conn)
hs.run_startup_checks(db_conn, database_engine)
db_conn.commit()
except UpgradeDatabaseException:
sys.stderr.write(
"\nFailed to upgrade database.\n"
"Have you checked for version specific instructions in"
" UPGRADES.rst?\n"
)
sys.exit(1)
logger.info("Database prepared in %s.", config.database_config["name"])
hs.setup()
hs.setup_master()
@defer.inlineCallbacks
@@ -395,7 +436,7 @@ def setup(config_options):
_base.start(hs, config.listeners)
hs.get_pusherpool().start()
hs.get_datastore().db.updates.start_doing_background_updates()
hs.get_datastore().start_doing_background_updates()
except Exception:
# Print the exception and bail out.
print("Error during startup:", file=sys.stderr)
@@ -501,10 +542,8 @@ def phone_stats_home(hs, stats, stats_process=_stats_process):
# Database version
#
# This only reports info about the *main* database.
stats["database_engine"] = hs.get_datastore().db.engine.module.__name__
stats["database_server_version"] = hs.get_datastore().db.engine.server_version
stats["database_engine"] = hs.get_datastore().database_engine_name
stats["database_server_version"] = hs.get_datastore().get_server_version()
logger.info("Reporting stats to %s: %s" % (hs.config.report_stats_endpoint, stats))
try:
yield hs.get_proxied_http_client().put_json(
@@ -546,7 +585,7 @@ def run(hs):
def performance_stats_init():
_stats_process.clear()
_stats_process.append(
(int(hs.get_clock().time()), resource.getrusage(resource.RUSAGE_SELF))
(int(hs.get_clock().time(), resource.getrusage(resource.RUSAGE_SELF)))
)
def start_phone_stats_home():

View File

@@ -21,7 +21,7 @@ from twisted.web.resource import NoResource
import synapse
from synapse import events
from synapse.api.urls import LEGACY_MEDIA_PREFIX, MEDIA_PREFIX
from synapse.api.urls import CONTENT_REPO_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX
from synapse.app import _base
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
@@ -37,8 +37,10 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.admin import register_servlets_for_media_repo
from synapse.rest.media.v0.content_repository import ContentRepoResource
from synapse.server import HomeServer
from synapse.storage.data_stores.main.media_repository import MediaRepositoryStore
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
@@ -81,6 +83,9 @@ class MediaRepositoryServer(HomeServer):
{
MEDIA_PREFIX: media_repo,
LEGACY_MEDIA_PREFIX: media_repo,
CONTENT_REPO_PREFIX: ContentRepoResource(
self, self.config.uploads_path
),
"/_synapse/admin": admin_resource,
}
)
@@ -152,10 +157,14 @@ def start(config_options):
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
ss = MediaRepositoryServer(
config.server_name,
db_config=config.database_config,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
)
setup_logging(ss, config, use_worker_options=True)

View File

@@ -33,10 +33,10 @@ from synapse.replication.slave.storage.account_data import SlavedAccountDataStor
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.pushers import SlavedPusherStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer
from synapse.storage import DataStore
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
@@ -45,11 +45,7 @@ logger = logging.getLogger("synapse.app.pusher")
class PusherSlaveStore(
SlavedEventStore,
SlavedPusherStore,
SlavedReceiptsStore,
SlavedAccountDataStore,
RoomStore,
SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore, SlavedAccountDataStore
):
update_pusher_last_stream_ordering_and_success = __func__(
DataStore.update_pusher_last_stream_ordering_and_success
@@ -202,10 +198,14 @@ def start(config_options):
# Force the pushers to start since they will be disabled in the main config
config.start_pushers = True
database_engine = create_engine(config.database_config)
ps = PusherServer(
config.server_name,
db_config=config.database_config,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
)
setup_logging(ps, config, use_worker_options=True)

View File

@@ -48,13 +48,14 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.replication.tcp.streams.events import EventsStreamEventRow, EventsStreamRow
from synapse.replication.tcp.streams.events import EventsStreamEventRow
from synapse.rest.client.v1 import events
from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
from synapse.rest.client.v2_alpha import sync
from synapse.server import HomeServer
from synapse.storage.data_stores.main.presence import UserPresenceState
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
from synapse.util.stringutils import random_string
@@ -150,7 +151,7 @@ class SynchrotronPresence(object):
def set_state(self, user, state, ignore_status_msg=False):
# TODO Hows this supposed to work?
return defer.succeed(None)
pass
get_states = __func__(PresenceHandler.get_states)
get_state = __func__(PresenceHandler.get_state)
@@ -371,7 +372,8 @@ class SyncReplicationHandler(ReplicationClientHandler):
def get_currently_syncing_users(self):
return self.presence_handler.get_currently_syncing_users()
async def process_and_notify(self, stream_name, token, rows):
@defer.inlineCallbacks
def process_and_notify(self, stream_name, token, rows):
try:
if stream_name == "events":
# We shouldn't get multiple rows per token for events stream, so
@@ -379,14 +381,7 @@ class SyncReplicationHandler(ReplicationClientHandler):
for row in rows:
if row.type != EventsStreamEventRow.TypeId:
continue
assert isinstance(row, EventsStreamRow)
event = await self.store.get_event(
row.data.event_id, allow_rejected=True
)
if event.rejected_reason:
continue
event = yield self.store.get_event(row.data.event_id)
extra_users = ()
if event.type == EventTypes.Member:
extra_users = (event.state_key,)
@@ -418,11 +413,11 @@ class SyncReplicationHandler(ReplicationClientHandler):
elif stream_name == "device_lists":
all_room_ids = set()
for row in rows:
room_ids = await self.store.get_rooms_for_user(row.user_id)
room_ids = yield self.store.get_rooms_for_user(row.user_id)
all_room_ids.update(room_ids)
self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids)
elif stream_name == "presence":
await self.presence_handler.process_replication_rows(token, rows)
yield self.presence_handler.process_replication_rows(token, rows)
elif stream_name == "receipts":
self.notifier.on_new_event(
"groups_key", token, users=[row.user_id for row in rows]
@@ -442,10 +437,14 @@ def start(config_options):
synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
ss = SynchrotronServer(
config.server_name,
db_config=config.database_config,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
application_service_handler=SynchrotronApplicationService(),
)

View File

@@ -43,7 +43,7 @@ from synapse.replication.tcp.streams.events import (
from synapse.rest.client.v2_alpha import user_directory
from synapse.server import HomeServer
from synapse.storage.data_stores.main.user_directory import UserDirectoryStore
from synapse.storage.database import Database
from synapse.storage.engines import create_engine
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
@@ -60,11 +60,11 @@ class UserDirectorySlaveStore(
UserDirectoryStore,
BaseSlavedStore,
):
def __init__(self, database: Database, db_conn, hs):
super(UserDirectorySlaveStore, self).__init__(database, db_conn, hs)
def __init__(self, db_conn, hs):
super(UserDirectorySlaveStore, self).__init__(db_conn, hs)
events_max = self._stream_id_gen.get_current_token()
curr_state_delta_prefill, min_curr_state_delta_id = self.db.get_cache_dict(
curr_state_delta_prefill, min_curr_state_delta_id = self._get_cache_dict(
db_conn,
"current_state_delta_stream",
entity_column="room_id",
@@ -199,6 +199,8 @@ def start(config_options):
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
if config.update_user_directory:
sys.stderr.write(
"\nThe update_user_directory must be disabled in the main synapse process"
@@ -213,8 +215,10 @@ def start(config_options):
ss = UserDirectoryServer(
config.server_name,
db_config=config.database_config,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
)
setup_logging(ss, config, use_worker_options=True)

View File

@@ -12,45 +12,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
from textwrap import indent
import yaml
from synapse.config._base import Config, ConfigError
logger = logging.getLogger(__name__)
class DatabaseConnectionConfig:
"""Contains the connection config for a particular database.
Args:
name: A label for the database, used for logging.
db_config: The config for a particular database, as per `database`
section of main config. Has three fields: `name` for database
module name, `args` for the args to give to the database
connector, and optional `data_stores` that is a list of stores to
provision on this database (defaulting to all).
"""
def __init__(self, name: str, db_config: dict):
if db_config["name"] not in ("sqlite3", "psycopg2"):
raise ConfigError("Unsupported database type %r" % (db_config["name"],))
if db_config["name"] == "sqlite3":
db_config.setdefault("args", {}).update(
{"cp_min": 1, "cp_max": 1, "check_same_thread": False}
)
data_stores = db_config.get("data_stores")
if data_stores is None:
data_stores = ["main", "state"]
self.name = name
self.config = db_config
self.data_stores = data_stores
from ._base import Config
class DatabaseConfig(Config):
@@ -59,43 +26,22 @@ class DatabaseConfig(Config):
def read_config(self, config, **kwargs):
self.event_cache_size = self.parse_size(config.get("event_cache_size", "10K"))
# We *experimentally* support specifying multiple databases via the
# `databases` key. This is a map from a label to database config in the
# same format as the `database` config option, plus an extra
# `data_stores` key to specify which data store goes where. For example:
#
# databases:
# master:
# name: psycopg2
# data_stores: ["main"]
# args: {}
# state:
# name: psycopg2
# data_stores: ["state"]
# args: {}
self.database_config = config.get("database")
multi_database_config = config.get("databases")
database_config = config.get("database")
if multi_database_config and database_config:
raise ConfigError("Can't specify both 'database' and 'datbases' in config")
if multi_database_config:
if config.get("database_path"):
raise ConfigError("Can't specify 'database_path' with 'databases'")
self.databases = [
DatabaseConnectionConfig(name, db_conf)
for name, db_conf in multi_database_config.items()
]
if self.database_config is None:
self.database_config = {"name": "sqlite3", "args": {}}
name = self.database_config.get("name", None)
if name == "psycopg2":
pass
elif name == "sqlite3":
self.database_config.setdefault("args", {}).update(
{"cp_min": 1, "cp_max": 1, "check_same_thread": False}
)
else:
if database_config is None:
database_config = {"name": "sqlite3", "args": {}}
raise RuntimeError("Unsupported database type '%s'" % (name,))
self.databases = [DatabaseConnectionConfig("master", database_config)]
self.set_databasepath(config.get("database_path"))
self.set_databasepath(config.get("database_path"))
def generate_config_section(self, data_dir_path, database_conf, **kwargs):
if not database_conf:
@@ -130,24 +76,11 @@ class DatabaseConfig(Config):
self.set_databasepath(args.database_path)
def set_databasepath(self, database_path):
if database_path is None:
return
if database_path != ":memory:":
database_path = self.abspath(database_path)
# We only support setting a database path if we have a single sqlite3
# database.
if len(self.databases) != 1:
raise ConfigError("Cannot specify 'database_path' with multiple databases")
database = self.get_single_database()
if database.config["name"] != "sqlite3":
# We don't raise here as we haven't done so before for this case.
logger.warn("Ignoring 'database_path' for non-sqlite3 database")
return
database.config["args"]["database"] = database_path
if self.database_config.get("name", None) == "sqlite3":
if database_path is not None:
self.database_config["args"]["database"] = database_path
@staticmethod
def add_arguments(parser):
@@ -158,11 +91,3 @@ class DatabaseConfig(Config):
metavar="SQLITE_DATABASE_PATH",
help="The path to a sqlite database to use.",
)
def get_single_database(self) -> DatabaseConnectionConfig:
"""Returns the database if there is only one, useful for e.g. tests
"""
if len(self.databases) != 1:
raise Exception("More than one database exists")
return self.databases[0]

View File

@@ -21,7 +21,6 @@ from __future__ import print_function
import email.utils
import os
from enum import Enum
from typing import Optional
import pkg_resources
@@ -102,7 +101,7 @@ class EmailConfig(Config):
# both in RegistrationConfig and here. We should factor this bit out
self.account_threepid_delegate_email = self.trusted_third_party_id_servers[
0
] # type: Optional[str]
]
self.using_identity_server_from_trusted_list = True
else:
raise ConfigError(
@@ -147,8 +146,6 @@ class EmailConfig(Config):
if k not in email_config:
missing.append("email." + k)
# public_baseurl is required to build password reset and validation links that
# will be emailed to users
if config.get("public_baseurl") is None:
missing.append("public_baseurl")
@@ -308,23 +305,8 @@ class EmailConfig(Config):
# smtp_user: "exampleusername"
# smtp_pass: "examplepassword"
# require_transport_security: false
#
# # notif_from defines the "From" address to use when sending emails.
# # It must be set if email sending is enabled.
# #
# # The placeholder '%(app)s' will be replaced by the application name,
# # which is normally 'app_name' (below), but may be overridden by the
# # Matrix client application.
# #
# # Note that the placeholder must be written '%(app)s', including the
# # trailing 's'.
# #
# notif_from: "Your Friendly %(app)s homeserver <noreply@example.com>"
#
# # app_name defines the default value for '%(app)s' in notif_from. It
# # defaults to 'Matrix'.
# #
# #app_name: my_branded_matrix_server
# app_name: Matrix
#
# # Enable email notifications by default
# #

View File

@@ -108,7 +108,7 @@ class KeyConfig(Config):
self.signing_key = self.read_signing_keys(signing_key_path, "signing_key")
self.old_signing_keys = self.read_old_signing_keys(
config.get("old_signing_keys")
config.get("old_signing_keys", {})
)
self.key_refresh_interval = self.parse_duration(
config.get("key_refresh_interval", "1d")
@@ -199,19 +199,14 @@ class KeyConfig(Config):
signing_key_path: "%(base_key_name)s.signing.key"
# The keys that the server used to sign messages with but won't use
# to sign new messages.
# to sign new messages. E.g. it has lost its private key
#
old_signing_keys:
# For each key, `key` should be the base64-encoded public key, and
# `expired_ts`should be the time (in milliseconds since the unix epoch) that
# it was last used.
#
# It is possible to build an entry from an old signing.key file using the
# `export_signing_key` script which is provided with synapse.
#
# For example:
#
#"ed25519:id": { key: "base64string", expired_ts: 123456789123 }
#old_signing_keys:
# "ed25519:auto":
# # Base64 encoded public key
# key: "The public part of your old signing key."
# # Millisecond POSIX timestamp when the key expired.
# expired_ts: 123456789123
# How long key response published by this server is valid for.
# Used to set the valid_until_ts in /key/v2 APIs.
@@ -295,8 +290,6 @@ class KeyConfig(Config):
raise ConfigError("Error reading %s: %s" % (name, str(e)))
def read_old_signing_keys(self, old_signing_keys):
if old_signing_keys is None:
return {}
keys = {}
for key_id, key_data in old_signing_keys.items():
if is_signing_algorithm_supported(key_id):

View File

@@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import logging
import logging.config
import os
@@ -37,17 +37,10 @@ from synapse.logging._structured import (
from synapse.logging.context import LoggingContextFilter
from synapse.util.versionstring import get_version_string
from ._base import Config, ConfigError
from ._base import Config
DEFAULT_LOG_CONFIG = Template(
"""\
# Log configuration for Synapse.
#
# This is a YAML file containing a standard Python logging configuration
# dictionary. See [1] for details on the valid settings.
#
# [1]: https://docs.python.org/3.7/library/logging.config.html#configuration-dictionary-schema
"""
version: 1
formatters:
@@ -88,18 +81,11 @@ disable_existing_loggers: false
"""
)
LOG_FILE_ERROR = """\
Support for the log_file configuration option and --log-file command-line option was
removed in Synapse 1.3.0. You should instead set up a separate log configuration file.
"""
class LoggingConfig(Config):
section = "logging"
def read_config(self, config, **kwargs):
if config.get("log_file"):
raise ConfigError(LOG_FILE_ERROR)
self.log_config = self.abspath(config.get("log_config"))
self.no_redirect_stdio = config.get("no_redirect_stdio", False)
@@ -120,8 +106,6 @@ class LoggingConfig(Config):
def read_arguments(self, args):
if args.no_redirect_stdio is not None:
self.no_redirect_stdio = args.no_redirect_stdio
if args.log_file is not None:
raise ConfigError(LOG_FILE_ERROR)
@staticmethod
def add_arguments(parser):
@@ -134,10 +118,6 @@ class LoggingConfig(Config):
help="Do not redirect stdout/stderr to the log",
)
logging_group.add_argument(
"-f", "--log-file", dest="log_file", help=argparse.SUPPRESS,
)
def generate_files(self, config, config_dir_path):
log_config = config.get("log_config")
if log_config and not os.path.exists(log_config):

View File

@@ -83,9 +83,10 @@ class RatelimitConfig(Config):
)
rc_admin_redaction = config.get("rc_admin_redaction")
self.rc_admin_redaction = None
if rc_admin_redaction:
self.rc_admin_redaction = RateLimitConfig(rc_admin_redaction)
else:
self.rc_admin_redaction = None
def generate_config_section(self, **kwargs):
return """\

View File

@@ -106,13 +106,6 @@ class RegistrationConfig(Config):
account_threepid_delegates = config.get("account_threepid_delegates") or {}
self.account_threepid_delegate_email = account_threepid_delegates.get("email")
self.account_threepid_delegate_msisdn = account_threepid_delegates.get("msisdn")
if self.account_threepid_delegate_msisdn and not self.public_baseurl:
raise ConfigError(
"The configuration option `public_baseurl` is required if "
"`account_threepid_delegate.msisdn` is set, such that "
"clients know where to submit validation tokens to. Please "
"configure `public_baseurl`."
)
self.default_identity_server = config.get("default_identity_server")
self.allow_guest_access = config.get("allow_guest_access", False)

View File

@@ -156,6 +156,7 @@ class ContentRepositoryConfig(Config):
(provider_class, parsed_config, wrapper_config)
)
self.uploads_path = self.ensure_directory(config.get("uploads_path", "uploads"))
self.dynamic_thumbnails = config.get("dynamic_thumbnails", False)
self.thumbnail_requirements = parse_thumbnail_requirements(
config.get("thumbnail_sizes", DEFAULT_THUMBNAIL_SIZES)
@@ -230,6 +231,10 @@ class ContentRepositoryConfig(Config):
# config:
# directory: /mnt/some/other/directory
# Directory where in-progress uploads are stored.
#
uploads_path: "%(uploads_path)s"
# The largest allowed upload size in bytes
#
#max_upload_size: 10M

View File

@@ -14,19 +14,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import re
from synapse.python_dependencies import DependencyException, check_requirements
from synapse.util.module_loader import load_module, load_python_module
from synapse.types import (
map_username_to_mxid_localpart,
mxid_localpart_allowed_characters,
)
from synapse.util.module_loader import load_python_module
from ._base import Config, ConfigError
logger = logging.getLogger(__name__)
DEFAULT_USER_MAPPING_PROVIDER = (
"synapse.handlers.saml_handler.DefaultSamlMappingProvider"
)
def _dict_merge(merge_dict, into_dict):
"""Do a deep merge of two dicts
@@ -77,69 +75,15 @@ class SAML2Config(Config):
self.saml2_enabled = True
self.saml2_mxid_source_attribute = saml2_config.get(
"mxid_source_attribute", "uid"
)
self.saml2_grandfathered_mxid_source_attribute = saml2_config.get(
"grandfathered_mxid_source_attribute", "uid"
)
# user_mapping_provider may be None if the key is present but has no value
ump_dict = saml2_config.get("user_mapping_provider") or {}
# Use the default user mapping provider if not set
ump_dict.setdefault("module", DEFAULT_USER_MAPPING_PROVIDER)
# Ensure a config is present
ump_dict["config"] = ump_dict.get("config") or {}
if ump_dict["module"] == DEFAULT_USER_MAPPING_PROVIDER:
# Load deprecated options for use by the default module
old_mxid_source_attribute = saml2_config.get("mxid_source_attribute")
if old_mxid_source_attribute:
logger.warning(
"The config option saml2_config.mxid_source_attribute is deprecated. "
"Please use saml2_config.user_mapping_provider.config"
".mxid_source_attribute instead."
)
ump_dict["config"]["mxid_source_attribute"] = old_mxid_source_attribute
old_mxid_mapping = saml2_config.get("mxid_mapping")
if old_mxid_mapping:
logger.warning(
"The config option saml2_config.mxid_mapping is deprecated. Please "
"use saml2_config.user_mapping_provider.config.mxid_mapping instead."
)
ump_dict["config"]["mxid_mapping"] = old_mxid_mapping
# Retrieve an instance of the module's class
# Pass the config dictionary to the module for processing
(
self.saml2_user_mapping_provider_class,
self.saml2_user_mapping_provider_config,
) = load_module(ump_dict)
# Ensure loaded user mapping module has defined all necessary methods
# Note parse_config() is already checked during the call to load_module
required_methods = [
"get_saml_attributes",
"saml_response_to_user_attributes",
]
missing_methods = [
method
for method in required_methods
if not hasattr(self.saml2_user_mapping_provider_class, method)
]
if missing_methods:
raise ConfigError(
"Class specified by saml2_config."
"user_mapping_provider.module is missing required "
"methods: %s" % (", ".join(missing_methods),)
)
# Get the desired saml auth response attributes from the module
saml2_config_dict = self._default_saml_config_dict(
*self.saml2_user_mapping_provider_class.get_saml_attributes(
self.saml2_user_mapping_provider_config
)
)
saml2_config_dict = self._default_saml_config_dict()
_dict_merge(
merge_dict=saml2_config.get("sp_config", {}), into_dict=saml2_config_dict
)
@@ -159,27 +103,22 @@ class SAML2Config(Config):
saml2_config.get("saml_session_lifetime", "5m")
)
def _default_saml_config_dict(
self, required_attributes: set, optional_attributes: set
):
"""Generate a configuration dictionary with required and optional attributes that
will be needed to process new user registration
mapping = saml2_config.get("mxid_mapping", "hexencode")
try:
self.saml2_mxid_mapper = MXID_MAPPER_MAP[mapping]
except KeyError:
raise ConfigError("%s is not a known mxid_mapping" % (mapping,))
Args:
required_attributes: SAML auth response attributes that are
necessary to function
optional_attributes: SAML auth response attributes that can be used to add
additional information to Synapse user accounts, but are not required
Returns:
dict: A SAML configuration dictionary
"""
def _default_saml_config_dict(self):
import saml2
public_baseurl = self.public_baseurl
if public_baseurl is None:
raise ConfigError("saml2_config requires a public_baseurl to be set")
required_attributes = {"uid", self.saml2_mxid_source_attribute}
optional_attributes = {"displayName"}
if self.saml2_grandfathered_mxid_source_attribute:
optional_attributes.add(self.saml2_grandfathered_mxid_source_attribute)
optional_attributes -= required_attributes
@@ -268,58 +207,33 @@ class SAML2Config(Config):
#
#config_path: "%(config_dir_path)s/sp_conf.py"
# The lifetime of a SAML session. This defines how long a user has to
# the lifetime of a SAML session. This defines how long a user has to
# complete the authentication process, if allow_unsolicited is unset.
# The default is 5 minutes.
#
#saml_session_lifetime: 5m
# An external module can be provided here as a custom solution to
# mapping attributes returned from a saml provider onto a matrix user.
# The SAML attribute (after mapping via the attribute maps) to use to derive
# the Matrix ID from. 'uid' by default.
#
user_mapping_provider:
# The custom module's class. Uncomment to use a custom module.
#
#module: mapping_provider.SamlMappingProvider
#mxid_source_attribute: displayName
# Custom configuration values for the module. Below options are
# intended for the built-in provider, they should be changed if
# using a custom module. This section will be passed as a Python
# dictionary to the module's `parse_config` method.
#
config:
# The SAML attribute (after mapping via the attribute maps) to use
# to derive the Matrix ID from. 'uid' by default.
#
# Note: This used to be configured by the
# saml2_config.mxid_source_attribute option. If that is still
# defined, its value will be used instead.
#
#mxid_source_attribute: displayName
# The mapping system to use for mapping the saml attribute onto a matrix ID.
# Options include:
# * 'hexencode' (which maps unpermitted characters to '=xx')
# * 'dotreplace' (which replaces unpermitted characters with '.').
# The default is 'hexencode'.
#
#mxid_mapping: dotreplace
# The mapping system to use for mapping the saml attribute onto a
# matrix ID.
#
# Options include:
# * 'hexencode' (which maps unpermitted characters to '=xx')
# * 'dotreplace' (which replaces unpermitted characters with
# '.').
# The default is 'hexencode'.
#
# Note: This used to be configured by the
# saml2_config.mxid_mapping option. If that is still defined, its
# value will be used instead.
#
#mxid_mapping: dotreplace
# In previous versions of synapse, the mapping from SAML attribute to
# MXID was always calculated dynamically rather than stored in a
# table. For backwards- compatibility, we will look for user_ids
# matching such a pattern before creating a new account.
# In previous versions of synapse, the mapping from SAML attribute to MXID was
# always calculated dynamically rather than stored in a table. For backwards-
# compatibility, we will look for user_ids matching such a pattern before
# creating a new account.
#
# This setting controls the SAML attribute which will be used for this
# backwards-compatibility lookup. Typically it should be 'uid', but if
# the attribute maps are changed, it may be necessary to change it.
# backwards-compatibility lookup. Typically it should be 'uid', but if the
# attribute maps are changed, it may be necessary to change it.
#
# The default is 'uid'.
#
@@ -327,3 +241,23 @@ class SAML2Config(Config):
""" % {
"config_dir_path": config_dir_path
}
DOT_REPLACE_PATTERN = re.compile(
("[^%s]" % (re.escape("".join(mxid_localpart_allowed_characters)),))
)
def dot_replace_for_mxid(username: str) -> str:
username = username.lower()
username = DOT_REPLACE_PATTERN.sub(".", username)
# regular mxids aren't allowed to start with an underscore either
username = re.sub("^_", "", username)
return username
MXID_MAPPER_MAP = {
"hexencode": map_username_to_mxid_localpart,
"dotreplace": dot_replace_for_mxid,
}

View File

@@ -19,7 +19,7 @@ import logging
import os.path
import re
from textwrap import indent
from typing import Dict, List, Optional
from typing import List
import attr
import yaml
@@ -102,12 +102,6 @@ class ServerConfig(Config):
"require_auth_for_profile_requests", False
)
# Whether to require sharing a room with a user to retrieve their
# profile data
self.limit_profile_requests_to_users_who_share_rooms = config.get(
"limit_profile_requests_to_users_who_share_rooms", False,
)
if "restrict_public_rooms_to_local_users" in config and (
"allow_public_rooms_without_auth" in config
or "allow_public_rooms_over_federation" in config
@@ -124,16 +118,15 @@ class ServerConfig(Config):
self.allow_public_rooms_without_auth = False
self.allow_public_rooms_over_federation = False
else:
# If set to 'true', removes the need for authentication to access the server's
# public rooms directory through the client API, meaning that anyone can
# query the room directory. Defaults to 'false'.
# If set to 'False', requires authentication to access the server's public
# rooms directory through the client API. Defaults to 'True'.
self.allow_public_rooms_without_auth = config.get(
"allow_public_rooms_without_auth", False
"allow_public_rooms_without_auth", True
)
# If set to 'true', allows any other homeserver to fetch the server's public
# rooms directory via federation. Defaults to 'false'.
# If set to 'False', forbids any other homeserver to fetch the server's public
# rooms directory via federation. Defaults to 'True'.
self.allow_public_rooms_over_federation = config.get(
"allow_public_rooms_over_federation", False
"allow_public_rooms_over_federation", True
)
default_room_version = config.get("default_room_version", DEFAULT_ROOM_VERSION)
@@ -206,7 +199,7 @@ class ServerConfig(Config):
self.admin_contact = config.get("admin_contact", None)
# FIXME: federation_domain_whitelist needs sytests
self.federation_domain_whitelist = None # type: Optional[dict]
self.federation_domain_whitelist = None
federation_domain_whitelist = config.get("federation_domain_whitelist", None)
if federation_domain_whitelist is not None:
@@ -253,124 +246,6 @@ class ServerConfig(Config):
# events with profile information that differ from the target's global profile.
self.allow_per_room_profiles = config.get("allow_per_room_profiles", True)
retention_config = config.get("retention")
if retention_config is None:
retention_config = {}
self.retention_enabled = retention_config.get("enabled", False)
retention_default_policy = retention_config.get("default_policy")
if retention_default_policy is not None:
self.retention_default_min_lifetime = retention_default_policy.get(
"min_lifetime"
)
if self.retention_default_min_lifetime is not None:
self.retention_default_min_lifetime = self.parse_duration(
self.retention_default_min_lifetime
)
self.retention_default_max_lifetime = retention_default_policy.get(
"max_lifetime"
)
if self.retention_default_max_lifetime is not None:
self.retention_default_max_lifetime = self.parse_duration(
self.retention_default_max_lifetime
)
if (
self.retention_default_min_lifetime is not None
and self.retention_default_max_lifetime is not None
and (
self.retention_default_min_lifetime
> self.retention_default_max_lifetime
)
):
raise ConfigError(
"The default retention policy's 'min_lifetime' can not be greater"
" than its 'max_lifetime'"
)
else:
self.retention_default_min_lifetime = None
self.retention_default_max_lifetime = None
self.retention_allowed_lifetime_min = retention_config.get(
"allowed_lifetime_min"
)
if self.retention_allowed_lifetime_min is not None:
self.retention_allowed_lifetime_min = self.parse_duration(
self.retention_allowed_lifetime_min
)
self.retention_allowed_lifetime_max = retention_config.get(
"allowed_lifetime_max"
)
if self.retention_allowed_lifetime_max is not None:
self.retention_allowed_lifetime_max = self.parse_duration(
self.retention_allowed_lifetime_max
)
if (
self.retention_allowed_lifetime_min is not None
and self.retention_allowed_lifetime_max is not None
and self.retention_allowed_lifetime_min
> self.retention_allowed_lifetime_max
):
raise ConfigError(
"Invalid retention policy limits: 'allowed_lifetime_min' can not be"
" greater than 'allowed_lifetime_max'"
)
self.retention_purge_jobs = [] # type: List[Dict[str, Optional[int]]]
for purge_job_config in retention_config.get("purge_jobs", []):
interval_config = purge_job_config.get("interval")
if interval_config is None:
raise ConfigError(
"A retention policy's purge jobs configuration must have the"
" 'interval' key set."
)
interval = self.parse_duration(interval_config)
shortest_max_lifetime = purge_job_config.get("shortest_max_lifetime")
if shortest_max_lifetime is not None:
shortest_max_lifetime = self.parse_duration(shortest_max_lifetime)
longest_max_lifetime = purge_job_config.get("longest_max_lifetime")
if longest_max_lifetime is not None:
longest_max_lifetime = self.parse_duration(longest_max_lifetime)
if (
shortest_max_lifetime is not None
and longest_max_lifetime is not None
and shortest_max_lifetime > longest_max_lifetime
):
raise ConfigError(
"A retention policy's purge jobs configuration's"
" 'shortest_max_lifetime' value can not be greater than its"
" 'longest_max_lifetime' value."
)
self.retention_purge_jobs.append(
{
"interval": interval,
"shortest_max_lifetime": shortest_max_lifetime,
"longest_max_lifetime": longest_max_lifetime,
}
)
if not self.retention_purge_jobs:
self.retention_purge_jobs = [
{
"interval": self.parse_duration("1d"),
"shortest_max_lifetime": None,
"longest_max_lifetime": None,
}
]
self.listeners = [] # type: List[dict]
for listener in config.get("listeners", []):
if not isinstance(listener.get("port", None), int):
@@ -497,8 +372,6 @@ class ServerConfig(Config):
"cleanup_extremities_with_dummy_events", True
)
self.enable_ephemeral_messages = config.get("enable_ephemeral_messages", False)
def has_tls_listener(self) -> bool:
return any(l["tls"] for l in self.listeners)
@@ -627,23 +500,15 @@ class ServerConfig(Config):
#
#require_auth_for_profile_requests: true
# Uncomment to require a user to share a room with another user in order
# to retrieve their profile information. Only checked on Client-Server
# requests. Profile requests from other servers should be checked by the
# requesting server. Defaults to 'false'.
# If set to 'false', requires authentication to access the server's public rooms
# directory through the client API. Defaults to 'true'.
#
#limit_profile_requests_to_users_who_share_rooms: true
#allow_public_rooms_without_auth: false
# If set to 'true', removes the need for authentication to access the server's
# public rooms directory through the client API, meaning that anyone can
# query the room directory. Defaults to 'false'.
# If set to 'false', forbids any other homeserver to fetch the server's public
# rooms directory via federation. Defaults to 'true'.
#
#allow_public_rooms_without_auth: true
# If set to 'true', allows any other homeserver to fetch the server's public
# rooms directory via federation. Defaults to 'false'.
#
#allow_public_rooms_over_federation: true
#allow_public_rooms_over_federation: false
# The default room version for newly created rooms.
#
@@ -896,69 +761,6 @@ class ServerConfig(Config):
# Defaults to `28d`. Set to `null` to disable clearing out of old rows.
#
#user_ips_max_age: 14d
# Message retention policy at the server level.
#
# Room admins and mods can define a retention period for their rooms using the
# 'm.room.retention' state event, and server admins can cap this period by setting
# the 'allowed_lifetime_min' and 'allowed_lifetime_max' config options.
#
# If this feature is enabled, Synapse will regularly look for and purge events
# which are older than the room's maximum retention period. Synapse will also
# filter events received over federation so that events that should have been
# purged are ignored and not stored again.
#
retention:
# The message retention policies feature is disabled by default. Uncomment the
# following line to enable it.
#
#enabled: true
# Default retention policy. If set, Synapse will apply it to rooms that lack the
# 'm.room.retention' state event. Currently, the value of 'min_lifetime' doesn't
# matter much because Synapse doesn't take it into account yet.
#
#default_policy:
# min_lifetime: 1d
# max_lifetime: 1y
# Retention policy limits. If set, a user won't be able to send a
# 'm.room.retention' event which features a 'min_lifetime' or a 'max_lifetime'
# that's not within this range. This is especially useful in closed federations,
# in which server admins can make sure every federating server applies the same
# rules.
#
#allowed_lifetime_min: 1d
#allowed_lifetime_max: 1y
# Server admins can define the settings of the background jobs purging the
# events which lifetime has expired under the 'purge_jobs' section.
#
# If no configuration is provided, a single job will be set up to delete expired
# events in every room daily.
#
# Each job's configuration defines which range of message lifetimes the job
# takes care of. For example, if 'shortest_max_lifetime' is '2d' and
# 'longest_max_lifetime' is '3d', the job will handle purging expired events in
# rooms whose state defines a 'max_lifetime' that's both higher than 2 days, and
# lower than or equal to 3 days. Both the minimum and the maximum value of a
# range are optional, e.g. a job with no 'shortest_max_lifetime' and a
# 'longest_max_lifetime' of '3d' will handle every room with a retention policy
# which 'max_lifetime' is lower than or equal to three days.
#
# The rationale for this per-job configuration is that some rooms might have a
# retention policy with a low 'max_lifetime', where history needs to be purged
# of outdated messages on a very frequent basis (e.g. every 5min), but not want
# that purge to be performed by a job that's iterating over every room it knows,
# which would be quite heavy on the server.
#
#purge_jobs:
# - shortest_max_lifetime: 1d
# longest_max_lifetime: 3d
# interval: 5m:
# - shortest_max_lifetime: 3d
# longest_max_lifetime: 1y
# interval: 24h
"""
% locals()
)

View File

@@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import collections.abc
import hashlib
import logging
@@ -40,11 +40,8 @@ def check_event_content_hash(event, hash_algorithm=hashlib.sha256):
# some malformed events lack a 'hashes'. Protect against it being missing
# or a weird type by basically treating it the same as an unhashed event.
hashes = event.get("hashes")
# nb it might be a frozendict or a dict
if not isinstance(hashes, collections.abc.Mapping):
raise SynapseError(
400, "Malformed 'hashes': %s" % (type(hashes),), Codes.UNAUTHORIZED
)
if not isinstance(hashes, dict):
raise SynapseError(400, "Malformed 'hashes'", Codes.UNAUTHORIZED)
if name not in hashes:
raise SynapseError(

View File

@@ -511,18 +511,17 @@ class BaseV2KeyFetcher(object):
server_name = response_json["server_name"]
verified = False
for key_id in response_json["signatures"].get(server_name, {}):
# each of the keys used for the signature must be present in the response
# json.
key = verify_keys.get(key_id)
if not key:
# the key may not be present in verify_keys if:
# * we got the key from the notary server, and:
# * the key belongs to the notary server, and:
# * the notary server is using a different key to sign notary
# responses.
continue
raise KeyLookupError(
"Key response is signed by key id %s:%s but that key is not "
"present in the response" % (server_name, key_id)
)
verify_signed_json(response_json, server_name, key.verify_key)
verified = True
break
if not verified:
raise KeyLookupError(

View File

@@ -14,7 +14,6 @@
# limitations under the License.
import logging
from typing import Set, Tuple
from canonicaljson import encode_canonical_json
from signedjson.key import decode_verify_key_bytes
@@ -43,26 +42,12 @@ def check(room_version, event, auth_events, do_sig_check=True, do_size_check=Tru
Returns:
if the auth checks pass.
"""
assert isinstance(auth_events, dict)
if do_size_check:
_check_size_limits(event)
if not hasattr(event, "room_id"):
raise AuthError(500, "Event has no room_id: %s" % event)
room_id = event.room_id
# I'm not really expecting to get auth events in the wrong room, but let's
# sanity-check it
for auth_event in auth_events.values():
if auth_event.room_id != room_id:
raise Exception(
"During auth for event %s in room %s, found event %s in the state "
"which is in room %s"
% (event.event_id, room_id, auth_event.event_id, auth_event.room_id)
)
if do_sig_check:
sender_domain = get_domain_from_id(event.sender)
@@ -89,6 +74,12 @@ def check(room_version, event, auth_events, do_sig_check=True, do_size_check=Tru
if not event.signatures.get(event_id_domain):
raise AuthError(403, "Event not signed by sending server")
if auth_events is None:
# Oh, we don't know what the state of the room was, so we
# are trusting that this is allowed (at least for now)
logger.warning("Trusting event: %s", event.event_id)
return
if event.type == EventTypes.Create:
sender_domain = get_domain_from_id(event.sender)
room_id_domain = get_domain_from_id(event.room_id)
@@ -634,7 +625,7 @@ def get_public_keys(invite_event):
return public_keys
def auth_types_for_event(event) -> Set[Tuple[str]]:
def auth_types_for_event(event):
"""Given an event, return a list of (EventType, StateKey) that may be
needed to auth the event. The returned list may be a superset of what
would actually be required depending on the full state of the room.
@@ -643,20 +634,20 @@ def auth_types_for_event(event) -> Set[Tuple[str]]:
actually auth the event.
"""
if event.type == EventTypes.Create:
return set()
return []
auth_types = {
auth_types = [
(EventTypes.PowerLevels, ""),
(EventTypes.Member, event.sender),
(EventTypes.Create, ""),
}
]
if event.type == EventTypes.Member:
membership = event.content["membership"]
if membership in [Membership.JOIN, Membership.INVITE]:
auth_types.add((EventTypes.JoinRules, ""))
auth_types.append((EventTypes.JoinRules, ""))
auth_types.add((EventTypes.Member, event.state_key))
auth_types.append((EventTypes.Member, event.state_key))
if membership == Membership.INVITE:
if "third_party_invite" in event.content:
@@ -664,6 +655,6 @@ def auth_types_for_event(event) -> Set[Tuple[str]]:
EventTypes.ThirdPartyInvite,
event.content["third_party_invite"]["signed"]["token"],
)
auth_types.add(key)
auth_types.append(key)
return auth_types

View File

@@ -149,7 +149,7 @@ class EventContext:
# the prev_state_ids, so if we're a state event we include the event
# id that we replaced in the state.
if event.is_state():
prev_state_ids = yield self.get_prev_state_ids()
prev_state_ids = yield self.get_prev_state_ids(store)
prev_state_id = prev_state_ids.get((event.type, event.state_key))
else:
prev_state_id = None
@@ -167,13 +167,12 @@ class EventContext:
}
@staticmethod
def deserialize(storage, input):
def deserialize(store, input):
"""Converts a dict that was produced by `serialize` back into a
EventContext.
Args:
storage (Storage): Used to convert AS ID to AS object and fetch
state.
store (DataStore): Used to convert AS ID to AS object
input (dict): A dict produced by `serialize`
Returns:
@@ -182,7 +181,6 @@ class EventContext:
context = _AsyncEventContextImpl(
# We use the state_group and prev_state_id stuff to pull the
# current_state_ids out of the DB and construct prev_state_ids.
storage=storage,
prev_state_id=input["prev_state_id"],
event_type=input["event_type"],
event_state_key=input["event_state_key"],
@@ -195,7 +193,7 @@ class EventContext:
app_service_id = input["app_service_id"]
if app_service_id:
context.app_service = storage.main.get_app_service_by_id(app_service_id)
context.app_service = store.get_app_service_by_id(app_service_id)
return context
@@ -218,7 +216,7 @@ class EventContext:
return self._state_group
@defer.inlineCallbacks
def get_current_state_ids(self):
def get_current_state_ids(self, store):
"""
Gets the room state map, including this event - ie, the state in ``state_group``
@@ -236,11 +234,11 @@ class EventContext:
if self.rejected:
raise RuntimeError("Attempt to access state_ids of rejected event")
yield self._ensure_fetched()
yield self._ensure_fetched(store)
return self._current_state_ids
@defer.inlineCallbacks
def get_prev_state_ids(self):
def get_prev_state_ids(self, store):
"""
Gets the room state map, excluding this event.
@@ -252,7 +250,7 @@ class EventContext:
Maps a (type, state_key) to the event ID of the state event matching
this tuple.
"""
yield self._ensure_fetched()
yield self._ensure_fetched(store)
return self._prev_state_ids
def get_cached_current_state_ids(self):
@@ -272,7 +270,7 @@ class EventContext:
return self._current_state_ids
def _ensure_fetched(self):
def _ensure_fetched(self, store):
return defer.succeed(None)
@@ -284,8 +282,6 @@ class _AsyncEventContextImpl(EventContext):
Attributes:
_storage (Storage)
_fetching_state_deferred (Deferred|None): Resolves when *_state_ids have
been calculated. None if we haven't started calculating yet
@@ -299,30 +295,28 @@ class _AsyncEventContextImpl(EventContext):
that was replaced.
"""
# This needs to have a default as we're inheriting
_storage = attr.ib(default=None)
_prev_state_id = attr.ib(default=None)
_event_type = attr.ib(default=None)
_event_state_key = attr.ib(default=None)
_fetching_state_deferred = attr.ib(default=None)
def _ensure_fetched(self):
def _ensure_fetched(self, store):
if not self._fetching_state_deferred:
self._fetching_state_deferred = run_in_background(self._fill_out_state)
self._fetching_state_deferred = run_in_background(
self._fill_out_state, store
)
return make_deferred_yieldable(self._fetching_state_deferred)
@defer.inlineCallbacks
def _fill_out_state(self):
def _fill_out_state(self, store):
"""Called to populate the _current_state_ids and _prev_state_ids
attributes by loading from the database.
"""
if self.state_group is None:
return
self._current_state_ids = yield self._storage.state.get_state_ids_for_group(
self.state_group
)
self._current_state_ids = yield store.get_state_ids_for_group(self.state_group)
if self._prev_state_id and self._event_state_key is not None:
self._prev_state_ids = dict(self._current_state_ids)

View File

@@ -53,7 +53,7 @@ class ThirdPartyEventRules(object):
if self.third_party_rules is None:
return True
prev_state_ids = yield context.get_prev_state_ids()
prev_state_ids = yield context.get_prev_state_ids(self.store)
# Retrieve the state events from the database.
state_events = {}

View File

@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from six import integer_types, string_types
from six import string_types
from synapse.api.constants import MAX_ALIAS_LENGTH, EventTypes, Membership
from synapse.api.errors import Codes, SynapseError
@@ -22,12 +22,11 @@ from synapse.types import EventID, RoomID, UserID
class EventValidator(object):
def validate_new(self, event, config):
def validate_new(self, event):
"""Validates the event has roughly the right format
Args:
event (FrozenEvent): The event to validate.
config (Config): The homeserver's configuration.
event (FrozenEvent)
"""
self.validate_builder(event)
@@ -68,99 +67,6 @@ class EventValidator(object):
Codes.INVALID_PARAM,
)
if event.type == EventTypes.Retention:
self._validate_retention(event, config)
def _validate_retention(self, event, config):
"""Checks that an event that defines the retention policy for a room respects the
boundaries imposed by the server's administrator.
Args:
event (FrozenEvent): The event to validate.
config (Config): The homeserver's configuration.
"""
min_lifetime = event.content.get("min_lifetime")
max_lifetime = event.content.get("max_lifetime")
if min_lifetime is not None:
if not isinstance(min_lifetime, integer_types):
raise SynapseError(
code=400,
msg="'min_lifetime' must be an integer",
errcode=Codes.BAD_JSON,
)
if (
config.retention_allowed_lifetime_min is not None
and min_lifetime < config.retention_allowed_lifetime_min
):
raise SynapseError(
code=400,
msg=(
"'min_lifetime' can't be lower than the minimum allowed"
" value enforced by the server's administrator"
),
errcode=Codes.BAD_JSON,
)
if (
config.retention_allowed_lifetime_max is not None
and min_lifetime > config.retention_allowed_lifetime_max
):
raise SynapseError(
code=400,
msg=(
"'min_lifetime' can't be greater than the maximum allowed"
" value enforced by the server's administrator"
),
errcode=Codes.BAD_JSON,
)
if max_lifetime is not None:
if not isinstance(max_lifetime, integer_types):
raise SynapseError(
code=400,
msg="'max_lifetime' must be an integer",
errcode=Codes.BAD_JSON,
)
if (
config.retention_allowed_lifetime_min is not None
and max_lifetime < config.retention_allowed_lifetime_min
):
raise SynapseError(
code=400,
msg=(
"'max_lifetime' can't be lower than the minimum allowed value"
" enforced by the server's administrator"
),
errcode=Codes.BAD_JSON,
)
if (
config.retention_allowed_lifetime_max is not None
and max_lifetime > config.retention_allowed_lifetime_max
):
raise SynapseError(
code=400,
msg=(
"'max_lifetime' can't be greater than the maximum allowed"
" value enforced by the server's administrator"
),
errcode=Codes.BAD_JSON,
)
if (
min_lifetime is not None
and max_lifetime is not None
and min_lifetime > max_lifetime
):
raise SynapseError(
code=400,
msg="'min_lifetime' can't be greater than 'max_lifetime",
errcode=Codes.BAD_JSON,
)
def validate_builder(self, event):
"""Validates that the builder/event has roughly the right format. Only
checks values that we expect a proto event to have, rather than all the

View File

@@ -18,6 +18,8 @@ import copy
import itertools
import logging
from six.moves import range
from prometheus_client import Counter
from twisted.internet import defer
@@ -37,7 +39,7 @@ from synapse.api.room_versions import (
)
from synapse.events import builder, room_version_to_event_format
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
from synapse.logging.context import make_deferred_yieldable
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.utils import log_function
from synapse.util import unwrapFirstError
from synapse.util.caches.expiringcache import ExpiringCache
@@ -308,26 +310,162 @@ class FederationClient(FederationBase):
return signed_pdu
@defer.inlineCallbacks
def get_room_state_ids(self, destination: str, room_id: str, event_id: str):
"""Calls the /state_ids endpoint to fetch the state at a particular point
in the room, and the auth events for the given event
@log_function
def get_state_for_room(self, destination, room_id, event_id):
"""Requests all of the room state at a given event from a remote homeserver.
Args:
destination (str): The remote homeserver to query for the state.
room_id (str): The id of the room we're interested in.
event_id (str): The id of the event we want the state at.
Returns:
Tuple[List[str], List[str]]: a tuple of (state event_ids, auth event_ids)
Deferred[Tuple[List[EventBase], List[EventBase]]]:
A list of events in the state, and a list of events in the auth chain
for the given event.
"""
result = yield self.transport_layer.get_room_state_ids(
try:
# First we try and ask for just the IDs, as thats far quicker if
# we have most of the state and auth_chain already.
# However, this may 404 if the other side has an old synapse.
result = yield self.transport_layer.get_room_state_ids(
destination, room_id, event_id=event_id
)
state_event_ids = result["pdu_ids"]
auth_event_ids = result.get("auth_chain_ids", [])
fetched_events, failed_to_fetch = yield self.get_events_from_store_or_dest(
destination, room_id, set(state_event_ids + auth_event_ids)
)
if failed_to_fetch:
logger.warning(
"Failed to fetch missing state/auth events for %s: %s",
room_id,
failed_to_fetch,
)
event_map = {ev.event_id: ev for ev in fetched_events}
pdus = [event_map[e_id] for e_id in state_event_ids if e_id in event_map]
auth_chain = [
event_map[e_id] for e_id in auth_event_ids if e_id in event_map
]
auth_chain.sort(key=lambda e: e.depth)
return pdus, auth_chain
except HttpResponseException as e:
if e.code == 400 or e.code == 404:
logger.info("Failed to use get_room_state_ids API, falling back")
else:
raise e
result = yield self.transport_layer.get_room_state(
destination, room_id, event_id=event_id
)
state_event_ids = result["pdu_ids"]
auth_event_ids = result.get("auth_chain_ids", [])
room_version = yield self.store.get_room_version(room_id)
format_ver = room_version_to_event_format(room_version)
if not isinstance(state_event_ids, list) or not isinstance(
auth_event_ids, list
):
raise Exception("invalid response from /state_ids")
pdus = [
event_from_pdu_json(p, format_ver, outlier=True) for p in result["pdus"]
]
return state_event_ids, auth_event_ids
auth_chain = [
event_from_pdu_json(p, format_ver, outlier=True)
for p in result.get("auth_chain", [])
]
seen_events = yield self.store.get_events(
[ev.event_id for ev in itertools.chain(pdus, auth_chain)]
)
signed_pdus = yield self._check_sigs_and_hash_and_fetch(
destination,
[p for p in pdus if p.event_id not in seen_events],
outlier=True,
room_version=room_version,
)
signed_pdus.extend(
seen_events[p.event_id] for p in pdus if p.event_id in seen_events
)
signed_auth = yield self._check_sigs_and_hash_and_fetch(
destination,
[p for p in auth_chain if p.event_id not in seen_events],
outlier=True,
room_version=room_version,
)
signed_auth.extend(
seen_events[p.event_id] for p in auth_chain if p.event_id in seen_events
)
signed_auth.sort(key=lambda e: e.depth)
return signed_pdus, signed_auth
@defer.inlineCallbacks
def get_events_from_store_or_dest(self, destination, room_id, event_ids):
"""Fetch events from a remote destination, checking if we already have them.
Args:
destination (str)
room_id (str)
event_ids (list)
Returns:
Deferred: A deferred resolving to a 2-tuple where the first is a list of
events and the second is a list of event ids that we failed to fetch.
"""
seen_events = yield self.store.get_events(event_ids, allow_rejected=True)
signed_events = list(seen_events.values())
failed_to_fetch = set()
missing_events = set(event_ids)
for k in seen_events:
missing_events.discard(k)
if not missing_events:
return signed_events, failed_to_fetch
logger.debug(
"Fetching unknown state/auth events %s for room %s",
missing_events,
event_ids,
)
room_version = yield self.store.get_room_version(room_id)
batch_size = 20
missing_events = list(missing_events)
for i in range(0, len(missing_events), batch_size):
batch = set(missing_events[i : i + batch_size])
deferreds = [
run_in_background(
self.get_pdu,
destinations=[destination],
event_id=e_id,
room_version=room_version,
)
for e_id in batch
]
res = yield make_deferred_yieldable(
defer.DeferredList(deferreds, consumeErrors=True)
)
for success, result in res:
if success and result:
signed_events.append(result)
batch.discard(result.event_id)
# We removed all events we successfully fetched from `batch`
failed_to_fetch.update(batch)
return signed_events, failed_to_fetch
@defer.inlineCallbacks
@log_function
@@ -526,7 +664,13 @@ class FederationClient(FederationBase):
@defer.inlineCallbacks
def send_request(destination):
content = yield self._do_send_join(destination, pdu)
time_now = self._clock.time_msec()
_, content = yield self.transport_layer.send_join(
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
content=pdu.get_pdu_json(time_now),
)
logger.debug("Got content: %s", content)
@@ -593,44 +737,6 @@ class FederationClient(FederationBase):
return self._try_destination_list("send_join", destinations, send_request)
@defer.inlineCallbacks
def _do_send_join(self, destination, pdu):
time_now = self._clock.time_msec()
try:
content = yield self.transport_layer.send_join_v2(
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
content=pdu.get_pdu_json(time_now),
)
return content
except HttpResponseException as e:
if e.code in [400, 404]:
err = e.to_synapse_error()
# If we receive an error response that isn't a generic error, or an
# unrecognised endpoint error, we assume that the remote understands
# the v2 invite API and this is a legitimate error.
if err.errcode not in [Codes.UNKNOWN, Codes.UNRECOGNIZED]:
raise err
else:
raise e.to_synapse_error()
logger.debug("Couldn't send_join with the v2 API, falling back to the v1 API")
resp = yield self.transport_layer.send_join_v1(
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
content=pdu.get_pdu_json(time_now),
)
# We expect the v1 API to respond with [200, content], so we only return the
# content.
return resp[1]
@defer.inlineCallbacks
def send_invite(self, destination, room_id, event_id, pdu):
room_version = yield self.store.get_room_version(room_id)
@@ -740,50 +846,18 @@ class FederationClient(FederationBase):
@defer.inlineCallbacks
def send_request(destination):
content = yield self._do_send_leave(destination, pdu)
logger.debug("Got content: %s", content)
return None
return self._try_destination_list("send_leave", destinations, send_request)
@defer.inlineCallbacks
def _do_send_leave(self, destination, pdu):
time_now = self._clock.time_msec()
try:
content = yield self.transport_layer.send_leave_v2(
time_now = self._clock.time_msec()
_, content = yield self.transport_layer.send_leave(
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
content=pdu.get_pdu_json(time_now),
)
return content
except HttpResponseException as e:
if e.code in [400, 404]:
err = e.to_synapse_error()
logger.debug("Got content: %s", content)
return None
# If we receive an error response that isn't a generic error, or an
# unrecognised endpoint error, we assume that the remote understands
# the v2 invite API and this is a legitimate error.
if err.errcode not in [Codes.UNKNOWN, Codes.UNRECOGNIZED]:
raise err
else:
raise e.to_synapse_error()
logger.debug("Couldn't send_leave with the v2 API, falling back to the v1 API")
resp = yield self.transport_layer.send_leave_v1(
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
content=pdu.get_pdu_json(time_now),
)
# We expect the v1 API to respond with [200, content], so we only return the
# content.
return resp[1]
return self._try_destination_list("send_leave", destinations, send_request)
def get_public_rooms(
self,

View File

@@ -1,7 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
# Copyright 2019 Matrix.org Federation C.I.C
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -74,7 +73,6 @@ class FederationServer(FederationBase):
self.auth = hs.get_auth()
self.handler = hs.get_handlers().federation_handler
self.state = hs.get_state_handler()
self._server_linearizer = Linearizer("fed_server")
self._transaction_linearizer = Linearizer("fed_txn_handler")
@@ -266,6 +264,9 @@ class FederationServer(FederationBase):
await self.registry.on_edu(edu_type, origin, content)
async def on_context_state_request(self, origin, room_id, event_id):
if not event_id:
raise NotImplementedError("Specify an event")
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, room_id)
@@ -279,18 +280,13 @@ class FederationServer(FederationBase):
# - but that's non-trivial to get right, and anyway somewhat defeats
# the point of the linearizer.
with (await self._server_linearizer.queue((origin, room_id))):
resp = dict(
await self._state_resp_cache.wrap(
(room_id, event_id),
self._on_context_state_request_compute,
room_id,
event_id,
)
resp = await self._state_resp_cache.wrap(
(room_id, event_id),
self._on_context_state_request_compute,
room_id,
event_id,
)
room_version = await self.store.get_room_version(room_id)
resp["room_version"] = room_version
return 200, resp
async def on_state_ids_request(self, origin, room_id, event_id):
@@ -310,11 +306,7 @@ class FederationServer(FederationBase):
return 200, {"pdu_ids": state_ids, "auth_chain_ids": auth_chain_ids}
async def _on_context_state_request_compute(self, room_id, event_id):
if event_id:
pdus = await self.handler.get_state_for_pdu(room_id, event_id)
else:
pdus = (await self.state.get_current_state(room_id)).values()
pdus = await self.handler.get_state_for_pdu(room_id, event_id)
auth_chain = await self.store.get_auth_chain([pdu.event_id for pdu in pdus])
return {
@@ -384,10 +376,15 @@ class FederationServer(FederationBase):
res_pdus = await self.handler.on_send_join_request(origin, pdu)
time_now = self._clock.time_msec()
return {
"state": [p.get_pdu_json(time_now) for p in res_pdus["state"]],
"auth_chain": [p.get_pdu_json(time_now) for p in res_pdus["auth_chain"]],
}
return (
200,
{
"state": [p.get_pdu_json(time_now) for p in res_pdus["state"]],
"auth_chain": [
p.get_pdu_json(time_now) for p in res_pdus["auth_chain"]
],
},
)
async def on_make_leave_request(self, origin, room_id, user_id):
origin_host, _ = parse_server_name(origin)
@@ -414,7 +411,7 @@ class FederationServer(FederationBase):
pdu = await self._check_sigs_and_hash(room_version, pdu)
await self.handler.on_send_leave_request(origin, pdu)
return {}
return 200, {}
async def on_event_auth(self, origin, room_id, event_id):
with (await self._server_linearizer.queue((origin, room_id))):

View File

@@ -38,6 +38,30 @@ class TransportLayerClient(object):
self.server_name = hs.hostname
self.client = hs.get_http_client()
@log_function
def get_room_state(self, destination, room_id, event_id):
""" Requests all state for a given room from the given server at the
given event.
Args:
destination (str): The host name of the remote homeserver we want
to get the state from.
context (str): The name of the context we want the state of
event_id (str): The event we want the context at.
Returns:
Deferred: Results in a dict received from the remote homeserver.
"""
logger.debug("get_room_state dest=%s, room=%s", destination, room_id)
path = _create_v1_path("/state/%s", room_id)
return self.client.get_json(
destination,
path=path,
args={"event_id": event_id},
try_trailing_slash_on_400=True,
)
@log_function
def get_room_state_ids(self, destination, room_id, event_id):
""" Requests all state for a given room from the given server at the
@@ -243,7 +267,7 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
def send_join_v1(self, destination, room_id, event_id, content):
def send_join(self, destination, room_id, event_id, content):
path = _create_v1_path("/send_join/%s/%s", room_id, event_id)
response = yield self.client.put_json(
@@ -254,18 +278,7 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
def send_join_v2(self, destination, room_id, event_id, content):
path = _create_v2_path("/send_join/%s/%s", room_id, event_id)
response = yield self.client.put_json(
destination=destination, path=path, data=content
)
return response
@defer.inlineCallbacks
@log_function
def send_leave_v1(self, destination, room_id, event_id, content):
def send_leave(self, destination, room_id, event_id, content):
path = _create_v1_path("/send_leave/%s/%s", room_id, event_id)
response = yield self.client.put_json(
@@ -281,24 +294,6 @@ class TransportLayerClient(object):
return response
@defer.inlineCallbacks
@log_function
def send_leave_v2(self, destination, room_id, event_id, content):
path = _create_v2_path("/send_leave/%s/%s", room_id, event_id)
response = yield self.client.put_json(
destination=destination,
path=path,
data=content,
# we want to do our best to send this through. The problem is
# that if it fails, we won't retry it later, so if the remote
# server was just having a momentary blip, the room will be out of
# sync.
ignore_backoff=True,
)
return response
@defer.inlineCallbacks
@log_function
def send_invite_v1(self, destination, room_id, event_id, content):

View File

@@ -421,7 +421,7 @@ class FederationEventServlet(BaseFederationServlet):
return await self.handler.on_pdu_request(origin, event_id)
class FederationStateV1Servlet(BaseFederationServlet):
class FederationStateServlet(BaseFederationServlet):
PATH = "/state/(?P<context>[^/]*)/?"
# This is when someone asks for all data for a given context.
@@ -429,7 +429,7 @@ class FederationStateV1Servlet(BaseFederationServlet):
return await self.handler.on_context_state_request(
origin,
context,
parse_string_from_args(query, "event_id", None, required=False),
parse_string_from_args(query, "event_id", None, required=True),
)
@@ -506,19 +506,9 @@ class FederationMakeLeaveServlet(BaseFederationServlet):
return 200, content
class FederationV1SendLeaveServlet(BaseFederationServlet):
class FederationSendLeaveServlet(BaseFederationServlet):
PATH = "/send_leave/(?P<room_id>[^/]*)/(?P<event_id>[^/]*)"
async def on_PUT(self, origin, content, query, room_id, event_id):
content = await self.handler.on_send_leave_request(origin, content, room_id)
return 200, (200, content)
class FederationV2SendLeaveServlet(BaseFederationServlet):
PATH = "/send_leave/(?P<room_id>[^/]*)/(?P<event_id>[^/]*)"
PREFIX = FEDERATION_V2_PREFIX
async def on_PUT(self, origin, content, query, room_id, event_id):
content = await self.handler.on_send_leave_request(origin, content, room_id)
return 200, content
@@ -531,21 +521,9 @@ class FederationEventAuthServlet(BaseFederationServlet):
return await self.handler.on_event_auth(origin, context, event_id)
class FederationV1SendJoinServlet(BaseFederationServlet):
class FederationSendJoinServlet(BaseFederationServlet):
PATH = "/send_join/(?P<context>[^/]*)/(?P<event_id>[^/]*)"
async def on_PUT(self, origin, content, query, context, event_id):
# TODO(paul): assert that context/event_id parsed from path actually
# match those given in content
content = await self.handler.on_send_join_request(origin, content, context)
return 200, (200, content)
class FederationV2SendJoinServlet(BaseFederationServlet):
PATH = "/send_join/(?P<context>[^/]*)/(?P<event_id>[^/]*)"
PREFIX = FEDERATION_V2_PREFIX
async def on_PUT(self, origin, content, query, context, event_id):
# TODO(paul): assert that context/event_id parsed from path actually
# match those given in content
@@ -1382,17 +1360,15 @@ class RoomComplexityServlet(BaseFederationServlet):
FEDERATION_SERVLET_CLASSES = (
FederationSendServlet,
FederationEventServlet,
FederationStateV1Servlet,
FederationStateServlet,
FederationStateIdsServlet,
FederationBackfillServlet,
FederationQueryServlet,
FederationMakeJoinServlet,
FederationMakeLeaveServlet,
FederationEventServlet,
FederationV1SendJoinServlet,
FederationV2SendJoinServlet,
FederationV1SendLeaveServlet,
FederationV2SendLeaveServlet,
FederationSendJoinServlet,
FederationSendLeaveServlet,
FederationV1InviteServlet,
FederationV2InviteServlet,
FederationQueryAuthServlet,

View File

@@ -773,11 +773,6 @@ class GroupsServerHandler(object):
if not self.hs.is_mine_id(user_id):
yield self.store.maybe_delete_remote_profile_cache(user_id)
# Delete group if the last user has left
users = yield self.store.get_users_in_group(group_id, include_private=True)
if not users:
yield self.store.delete_group(group_id)
return {}
@defer.inlineCallbacks

View File

@@ -134,7 +134,7 @@ class BaseHandler(object):
guest_access = event.content.get("guest_access", "forbidden")
if guest_access != "can_join":
if context:
current_state_ids = yield context.get_current_state_ids()
current_state_ids = yield context.get_current_state_ids(self.store)
current_state = yield self.store.get_events(
list(current_state_ids.values())
)

View File

@@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer
class AccountDataEventSource(object):
def __init__(self, hs):
@@ -21,14 +23,15 @@ class AccountDataEventSource(object):
def get_current_key(self, direction="f"):
return self.store.get_max_account_data_stream_id()
async def get_new_events(self, user, from_key, **kwargs):
@defer.inlineCallbacks
def get_new_events(self, user, from_key, **kwargs):
user_id = user.to_string()
last_stream_id = from_key
current_stream_id = self.store.get_max_account_data_stream_id()
current_stream_id = yield self.store.get_max_account_data_stream_id()
results = []
tags = await self.store.get_updated_tags(user_id, last_stream_id)
tags = yield self.store.get_updated_tags(user_id, last_stream_id)
for room_id, room_tags in tags.items():
results.append(
@@ -38,7 +41,7 @@ class AccountDataEventSource(object):
(
account_data,
room_account_data,
) = await self.store.get_updated_account_data_for_user(user_id, last_stream_id)
) = yield self.store.get_updated_account_data_for_user(user_id, last_stream_id)
for account_data_type, content in account_data.items():
results.append({"type": account_data_type, "content": content})
@@ -50,3 +53,7 @@ class AccountDataEventSource(object):
)
return results, current_stream_id
@defer.inlineCallbacks
def get_pagination_rows(self, user, config, key):
return [], config.to_id

View File

@@ -18,7 +18,8 @@ import email.utils
import logging
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import List
from twisted.internet import defer
from synapse.api.errors import StoreError
from synapse.logging.context import make_deferred_yieldable
@@ -77,39 +78,42 @@ class AccountValidityHandler(object):
# run as a background process to make sure that the database transactions
# have a logcontext to report to
return run_as_background_process(
"send_renewals", self._send_renewal_emails
"send_renewals", self.send_renewal_emails
)
self.clock.looping_call(send_emails, 30 * 60 * 1000)
async def _send_renewal_emails(self):
@defer.inlineCallbacks
def send_renewal_emails(self):
"""Gets the list of users whose account is expiring in the amount of time
configured in the ``renew_at`` parameter from the ``account_validity``
configuration, and sends renewal emails to all of these users as long as they
have an email 3PID attached to their account.
"""
expiring_users = await self.store.get_users_expiring_soon()
expiring_users = yield self.store.get_users_expiring_soon()
if expiring_users:
for user in expiring_users:
await self._send_renewal_email(
yield self._send_renewal_email(
user_id=user["user_id"], expiration_ts=user["expiration_ts_ms"]
)
async def send_renewal_email_to_user(self, user_id: str):
expiration_ts = await self.store.get_expiration_ts_for_user(user_id)
await self._send_renewal_email(user_id, expiration_ts)
@defer.inlineCallbacks
def send_renewal_email_to_user(self, user_id):
expiration_ts = yield self.store.get_expiration_ts_for_user(user_id)
yield self._send_renewal_email(user_id, expiration_ts)
async def _send_renewal_email(self, user_id: str, expiration_ts: int):
@defer.inlineCallbacks
def _send_renewal_email(self, user_id, expiration_ts):
"""Sends out a renewal email to every email address attached to the given user
with a unique link allowing them to renew their account.
Args:
user_id: ID of the user to send email(s) to.
expiration_ts: Timestamp in milliseconds for the expiration date of
user_id (str): ID of the user to send email(s) to.
expiration_ts (int): Timestamp in milliseconds for the expiration date of
this user's account (used in the email templates).
"""
addresses = await self._get_email_addresses_for_user(user_id)
addresses = yield self._get_email_addresses_for_user(user_id)
# Stop right here if the user doesn't have at least one email address.
# In this case, they will have to ask their server admin to renew their
@@ -121,7 +125,7 @@ class AccountValidityHandler(object):
return
try:
user_display_name = await self.store.get_profile_displayname(
user_display_name = yield self.store.get_profile_displayname(
UserID.from_string(user_id).localpart
)
if user_display_name is None:
@@ -129,7 +133,7 @@ class AccountValidityHandler(object):
except StoreError:
user_display_name = user_id
renewal_token = await self._get_renewal_token(user_id)
renewal_token = yield self._get_renewal_token(user_id)
url = "%s_matrix/client/unstable/account_validity/renew?token=%s" % (
self.hs.config.public_baseurl,
renewal_token,
@@ -161,7 +165,7 @@ class AccountValidityHandler(object):
logger.info("Sending renewal email to %s", address)
await make_deferred_yieldable(
yield make_deferred_yieldable(
self.sendmail(
self.hs.config.email_smtp_host,
self._raw_from,
@@ -176,18 +180,19 @@ class AccountValidityHandler(object):
)
)
await self.store.set_renewal_mail_status(user_id=user_id, email_sent=True)
yield self.store.set_renewal_mail_status(user_id=user_id, email_sent=True)
async def _get_email_addresses_for_user(self, user_id: str) -> List[str]:
@defer.inlineCallbacks
def _get_email_addresses_for_user(self, user_id):
"""Retrieve the list of email addresses attached to a user's account.
Args:
user_id: ID of the user to lookup email addresses for.
user_id (str): ID of the user to lookup email addresses for.
Returns:
Email addresses for this account.
defer.Deferred[list[str]]: Email addresses for this account.
"""
threepids = await self.store.user_get_threepids(user_id)
threepids = yield self.store.user_get_threepids(user_id)
addresses = []
for threepid in threepids:
@@ -196,15 +201,16 @@ class AccountValidityHandler(object):
return addresses
async def _get_renewal_token(self, user_id: str) -> str:
@defer.inlineCallbacks
def _get_renewal_token(self, user_id):
"""Generates a 32-byte long random string that will be inserted into the
user's renewal email's unique link, then saves it into the database.
Args:
user_id: ID of the user to generate a string for.
user_id (str): ID of the user to generate a string for.
Returns:
The generated string.
defer.Deferred[str]: The generated string.
Raises:
StoreError(500): Couldn't generate a unique string after 5 attempts.
@@ -213,52 +219,52 @@ class AccountValidityHandler(object):
while attempts < 5:
try:
renewal_token = stringutils.random_string(32)
await self.store.set_renewal_token_for_user(user_id, renewal_token)
yield self.store.set_renewal_token_for_user(user_id, renewal_token)
return renewal_token
except StoreError:
attempts += 1
raise StoreError(500, "Couldn't generate a unique string as refresh string.")
async def renew_account(self, renewal_token: str) -> bool:
@defer.inlineCallbacks
def renew_account(self, renewal_token):
"""Renews the account attached to a given renewal token by pushing back the
expiration date by the current validity period in the server's configuration.
Args:
renewal_token: Token sent with the renewal request.
renewal_token (str): Token sent with the renewal request.
Returns:
Whether the provided token is valid.
bool: Whether the provided token is valid.
"""
try:
user_id = await self.store.get_user_from_renewal_token(renewal_token)
user_id = yield self.store.get_user_from_renewal_token(renewal_token)
except StoreError:
return False
defer.returnValue(False)
logger.debug("Renewing an account for user %s", user_id)
await self.renew_account_for_user(user_id)
yield self.renew_account_for_user(user_id)
return True
defer.returnValue(True)
async def renew_account_for_user(
self, user_id: str, expiration_ts: int = None, email_sent: bool = False
) -> int:
@defer.inlineCallbacks
def renew_account_for_user(self, user_id, expiration_ts=None, email_sent=False):
"""Renews the account attached to a given user by pushing back the
expiration date by the current validity period in the server's
configuration.
Args:
renewal_token: Token sent with the renewal request.
expiration_ts: New expiration date. Defaults to now + validity period.
email_sen: Whether an email has been sent for this validity period.
renewal_token (str): Token sent with the renewal request.
expiration_ts (int): New expiration date. Defaults to now + validity period.
email_sent (bool): Whether an email has been sent for this validity period.
Defaults to False.
Returns:
New expiration date for this account, as a timestamp in
milliseconds since epoch.
defer.Deferred[int]: New expiration date for this account, as a timestamp
in milliseconds since epoch.
"""
if expiration_ts is None:
expiration_ts = self.clock.time_msec() + self._account_validity.period
await self.store.set_account_validity_for_user(
yield self.store.set_account_validity_for_user(
user_id=user_id, expiration_ts=expiration_ts, email_sent=email_sent
)

View File

@@ -15,6 +15,8 @@
import logging
from twisted.internet import defer
from synapse.api.constants import Membership
from synapse.types import RoomStreamToken
from synapse.visibility import filter_events_for_client
@@ -31,10 +33,11 @@ class AdminHandler(BaseHandler):
self.storage = hs.get_storage()
self.state_store = self.storage.state
async def get_whois(self, user):
@defer.inlineCallbacks
def get_whois(self, user):
connections = []
sessions = await self.store.get_user_ip_and_agents(user)
sessions = yield self.store.get_user_ip_and_agents(user)
for session in sessions:
connections.append(
{
@@ -51,37 +54,37 @@ class AdminHandler(BaseHandler):
return ret
async def get_users(self):
"""Function to retrieve a list of users in users table.
@defer.inlineCallbacks
def get_users(self):
"""Function to reterive a list of users in users table.
Args:
Returns:
defer.Deferred: resolves to list[dict[str, Any]]
"""
ret = await self.store.get_users()
ret = yield self.store.get_users()
return ret
async def get_users_paginate(self, start, limit, name, guests, deactivated):
"""Function to retrieve a paginated list of users from
users list. This will return a json list of users.
@defer.inlineCallbacks
def get_users_paginate(self, order, start, limit):
"""Function to reterive a paginated list of users from
users list. This will return a json object, which contains
list of users and the total number of users in users table.
Args:
order (str): column name to order the select by this column
start (int): start number to begin the query from
limit (int): number of rows to retrieve
name (string): filter for user names
guests (bool): whether to in include guest users
deactivated (bool): whether to include deactivated users
limit (int): number of rows to reterive
Returns:
defer.Deferred: resolves to json list[dict[str, Any]]
defer.Deferred: resolves to json object {list[dict[str, Any]], count}
"""
ret = await self.store.get_users_paginate(
start, limit, name, guests, deactivated
)
ret = yield self.store.get_users_paginate(order, start, limit)
return ret
async def search_users(self, term):
@defer.inlineCallbacks
def search_users(self, term):
"""Function to search users list for one or more users with
the matched term.
@@ -90,7 +93,7 @@ class AdminHandler(BaseHandler):
Returns:
defer.Deferred: resolves to list[dict[str, Any]]
"""
ret = await self.store.search_users(term)
ret = yield self.store.search_users(term)
return ret
@@ -113,7 +116,8 @@ class AdminHandler(BaseHandler):
"""
return self.store.set_server_admin(user, admin)
async def export_user_data(self, user_id, writer):
@defer.inlineCallbacks
def export_user_data(self, user_id, writer):
"""Write all data we have on the user to the given writer.
Args:
@@ -125,7 +129,7 @@ class AdminHandler(BaseHandler):
The returned value is that returned by `writer.finished()`.
"""
# Get all rooms the user is in or has been in
rooms = await self.store.get_rooms_for_user_where_membership_is(
rooms = yield self.store.get_rooms_for_user_where_membership_is(
user_id,
membership_list=(
Membership.JOIN,
@@ -138,7 +142,7 @@ class AdminHandler(BaseHandler):
# We only try and fetch events for rooms the user has been in. If
# they've been e.g. invited to a room without joining then we handle
# those seperately.
rooms_user_has_been_in = await self.store.get_rooms_user_has_been_in(user_id)
rooms_user_has_been_in = yield self.store.get_rooms_user_has_been_in(user_id)
for index, room in enumerate(rooms):
room_id = room.room_id
@@ -147,7 +151,7 @@ class AdminHandler(BaseHandler):
"[%s] Handling room %s, %d/%d", user_id, room_id, index + 1, len(rooms)
)
forgotten = await self.store.did_forget(user_id, room_id)
forgotten = yield self.store.did_forget(user_id, room_id)
if forgotten:
logger.info("[%s] User forgot room %d, ignoring", user_id, room_id)
continue
@@ -159,7 +163,7 @@ class AdminHandler(BaseHandler):
if room.membership == Membership.INVITE:
event_id = room.event_id
invite = await self.store.get_event(event_id, allow_none=True)
invite = yield self.store.get_event(event_id, allow_none=True)
if invite:
invited_state = invite.unsigned["invite_room_state"]
writer.write_invite(room_id, invite, invited_state)
@@ -170,7 +174,7 @@ class AdminHandler(BaseHandler):
# were joined. We estimate that point by looking at the
# stream_ordering of the last membership if it wasn't a join.
if room.membership == Membership.JOIN:
stream_ordering = self.store.get_room_max_stream_ordering()
stream_ordering = yield self.store.get_room_max_stream_ordering()
else:
stream_ordering = room.stream_ordering
@@ -196,7 +200,7 @@ class AdminHandler(BaseHandler):
# events that we have and then filtering, this isn't the most
# efficient method perhaps but it does guarantee we get everything.
while True:
events, _ = await self.store.paginate_room_events(
events, _ = yield self.store.paginate_room_events(
room_id, from_key, to_key, limit=100, direction="f"
)
if not events:
@@ -204,7 +208,7 @@ class AdminHandler(BaseHandler):
from_key = events[-1].internal_metadata.after
events = await filter_events_for_client(self.storage, user_id, events)
events = yield filter_events_for_client(self.storage, user_id, events)
writer.write_events(room_id, events)
@@ -240,7 +244,7 @@ class AdminHandler(BaseHandler):
for event_id in extremities:
if not event_to_unseen_prevs[event_id]:
continue
state = await self.state_store.get_state_for_event(event_id)
state = yield self.state_store.get_state_for_event(event_id)
writer.write_state(room_id, event_id, state)
return writer.finished()

View File

@@ -15,6 +15,8 @@
# limitations under the License.
import logging
from twisted.internet import defer
from synapse.api.errors import SynapseError
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import UserID, create_requester
@@ -44,7 +46,8 @@ class DeactivateAccountHandler(BaseHandler):
self._account_validity_enabled = hs.config.account_validity.enabled
async def deactivate_account(self, user_id, erase_data, id_server=None):
@defer.inlineCallbacks
def deactivate_account(self, user_id, erase_data, id_server=None):
"""Deactivate a user's account
Args:
@@ -71,11 +74,11 @@ class DeactivateAccountHandler(BaseHandler):
identity_server_supports_unbinding = True
# Retrieve the 3PIDs this user has bound to an identity server
threepids = await self.store.user_get_bound_threepids(user_id)
threepids = yield self.store.user_get_bound_threepids(user_id)
for threepid in threepids:
try:
result = await self._identity_handler.try_unbind_threepid(
result = yield self._identity_handler.try_unbind_threepid(
user_id,
{
"medium": threepid["medium"],
@@ -88,33 +91,30 @@ class DeactivateAccountHandler(BaseHandler):
# Do we want this to be a fatal error or should we carry on?
logger.exception("Failed to remove threepid from ID server")
raise SynapseError(400, "Failed to remove threepid from ID server")
await self.store.user_delete_threepid(
yield self.store.user_delete_threepid(
user_id, threepid["medium"], threepid["address"]
)
# Remove all 3PIDs this user has bound to the homeserver
await self.store.user_delete_threepids(user_id)
# delete any devices belonging to the user, which will also
# delete corresponding access tokens.
await self._device_handler.delete_all_devices_for_user(user_id)
yield self._device_handler.delete_all_devices_for_user(user_id)
# then delete any remaining access tokens which weren't associated with
# a device.
await self._auth_handler.delete_access_tokens_for_user(user_id)
yield self._auth_handler.delete_access_tokens_for_user(user_id)
await self.store.user_set_password_hash(user_id, None)
yield self.store.user_set_password_hash(user_id, None)
# Add the user to a table of users pending deactivation (ie.
# removal from all the rooms they're a member of)
await self.store.add_user_pending_deactivation(user_id)
yield self.store.add_user_pending_deactivation(user_id)
# delete from user directory
await self.user_directory_handler.handle_user_deactivated(user_id)
yield self.user_directory_handler.handle_user_deactivated(user_id)
# Mark the user as erased, if they asked for that
if erase_data:
logger.info("Marking %s as erased", user_id)
await self.store.mark_user_erased(user_id)
yield self.store.mark_user_erased(user_id)
# Now start the process that goes through that list and
# parts users from rooms (if it isn't already running)
@@ -122,29 +122,30 @@ class DeactivateAccountHandler(BaseHandler):
# Reject all pending invites for the user, so that the user doesn't show up in the
# "invited" section of rooms' members list.
await self._reject_pending_invites_for_user(user_id)
yield self._reject_pending_invites_for_user(user_id)
# Remove all information on the user from the account_validity table.
if self._account_validity_enabled:
await self.store.delete_account_validity_for_user(user_id)
yield self.store.delete_account_validity_for_user(user_id)
# Mark the user as deactivated.
await self.store.set_user_deactivated_status(user_id, True)
yield self.store.set_user_deactivated_status(user_id, True)
return identity_server_supports_unbinding
async def _reject_pending_invites_for_user(self, user_id):
@defer.inlineCallbacks
def _reject_pending_invites_for_user(self, user_id):
"""Reject pending invites addressed to a given user ID.
Args:
user_id (str): The user ID to reject pending invites for.
"""
user = UserID.from_string(user_id)
pending_invites = await self.store.get_invited_rooms_for_user(user_id)
pending_invites = yield self.store.get_invited_rooms_for_user(user_id)
for room in pending_invites:
try:
await self._room_member_handler.update_membership(
yield self._room_member_handler.update_membership(
create_requester(user),
user,
room.room_id,
@@ -176,7 +177,8 @@ class DeactivateAccountHandler(BaseHandler):
if not self._user_parter_running:
run_as_background_process("user_parter_loop", self._user_parter_loop)
async def _user_parter_loop(self):
@defer.inlineCallbacks
def _user_parter_loop(self):
"""Loop that parts deactivated users from rooms
Returns:
@@ -186,18 +188,19 @@ class DeactivateAccountHandler(BaseHandler):
logger.info("Starting user parter")
try:
while True:
user_id = await self.store.get_user_pending_deactivation()
user_id = yield self.store.get_user_pending_deactivation()
if user_id is None:
break
logger.info("User parter parting %r", user_id)
await self._part_user(user_id)
await self.store.del_user_pending_deactivation(user_id)
yield self._part_user(user_id)
yield self.store.del_user_pending_deactivation(user_id)
logger.info("User parter finished parting %r", user_id)
logger.info("User parter finished: stopping")
finally:
self._user_parter_running = False
async def _part_user(self, user_id):
@defer.inlineCallbacks
def _part_user(self, user_id):
"""Causes the given user_id to leave all the rooms they're joined to
Returns:
@@ -205,11 +208,11 @@ class DeactivateAccountHandler(BaseHandler):
"""
user = UserID.from_string(user_id)
rooms_for_user = await self.store.get_rooms_for_user(user_id)
rooms_for_user = yield self.store.get_rooms_for_user(user_id)
for room_id in rooms_for_user:
logger.info("User parter parting %r from %r", user_id, room_id)
try:
await self._room_member_handler.update_membership(
yield self._room_member_handler.update_membership(
create_requester(user),
user,
room_id,

View File

@@ -30,7 +30,6 @@ from twisted.internet import defer
from synapse.api.errors import CodeMessageException, Codes, NotFoundError, SynapseError
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentracing import log_kv, set_tag, tag_args, trace
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
from synapse.types import (
UserID,
get_domain_from_id,
@@ -54,12 +53,6 @@ class E2eKeysHandler(object):
self._edu_updater = SigningKeyEduUpdater(hs, self)
self._is_master = hs.config.worker_app is None
if not self._is_master:
self._user_device_resync_client = ReplicationUserDevicesResyncRestServlet.make_client(
hs
)
federation_registry = hs.get_federation_registry()
# FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec
@@ -198,15 +191,9 @@ class E2eKeysHandler(object):
# probably be tracking their device lists. However, we haven't
# done an initial sync on the device list so we do it now.
try:
if self._is_master:
user_devices = yield self.device_handler.device_list_updater.user_device_resync(
user_id
)
else:
user_devices = yield self._user_device_resync_client(
user_id=user_id
)
user_devices = yield self.device_handler.device_list_updater.user_device_resync(
user_id
)
user_devices = user_devices["devices"]
for device in user_devices:
results[user_id] = {device["device_id"]: device["keys"]}
@@ -284,26 +271,29 @@ class E2eKeysHandler(object):
self_signing_keys = {}
user_signing_keys = {}
user_ids = list(query)
for user_id in query:
# XXX: consider changing the store functions to allow querying
# multiple users simultaneously.
key = yield self.store.get_e2e_cross_signing_key(
user_id, "master", from_user_id
)
if key:
master_keys[user_id] = key
keys = yield self.store.get_e2e_cross_signing_keys_bulk(user_ids, from_user_id)
key = yield self.store.get_e2e_cross_signing_key(
user_id, "self_signing", from_user_id
)
if key:
self_signing_keys[user_id] = key
for user_id, user_info in keys.items():
if user_info is None:
continue
if "master" in user_info:
master_keys[user_id] = user_info["master"]
if "self_signing" in user_info:
self_signing_keys[user_id] = user_info["self_signing"]
if (
from_user_id in keys
and keys[from_user_id] is not None
and "user_signing" in keys[from_user_id]
):
# users can see other users' master and self-signing keys, but can
# only see their own user-signing keys
user_signing_keys[from_user_id] = keys[from_user_id]["user_signing"]
if from_user_id == user_id:
key = yield self.store.get_e2e_cross_signing_key(
user_id, "user_signing", from_user_id
)
if key:
user_signing_keys[user_id] = key
return {
"master_keys": master_keys,

View File

@@ -1,6 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2017, 2018 New Vector Ltd
# Copyright 2019 Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -104,35 +103,14 @@ class E2eRoomKeysHandler(object):
rooms
session_id(string): session ID to delete keys for, for None to delete keys
for all sessions
Raises:
NotFoundError: if the backup version does not exist
Returns:
A dict containing the count and etag for the backup version
A deferred of the deletion transaction
"""
# lock for consistency with uploading
with (yield self._upload_linearizer.queue(user_id)):
# make sure the backup version exists
try:
version_info = yield self.store.get_e2e_room_keys_version_info(
user_id, version
)
except StoreError as e:
if e.code == 404:
raise NotFoundError("Unknown backup version")
else:
raise
yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)
version_etag = version_info["etag"] + 1
yield self.store.update_e2e_room_keys_version(
user_id, version, None, version_etag
)
count = yield self.store.count_e2e_room_keys(user_id, version)
return {"etag": str(version_etag), "count": count}
@trace
@defer.inlineCallbacks
def upload_room_keys(self, user_id, version, room_keys):
@@ -160,9 +138,6 @@ class E2eRoomKeysHandler(object):
}
}
Returns:
A dict containing the count and etag for the backup version
Raises:
NotFoundError: if there are no versions defined
RoomKeysVersionError: if the uploaded version is not the current version
@@ -196,62 +171,59 @@ class E2eRoomKeysHandler(object):
else:
raise
# Fetch any existing room keys for the sessions that have been
# submitted. Then compare them with the submitted keys. If the
# key is new, insert it; if the key should be updated, then update
# it; otherwise, drop it.
existing_keys = yield self.store.get_e2e_room_keys_multi(
user_id, version, room_keys["rooms"]
)
to_insert = [] # batch the inserts together
changed = False # if anything has changed, we need to update the etag
# go through the room_keys.
# XXX: this should/could be done concurrently, given we're in a lock.
for room_id, room in iteritems(room_keys["rooms"]):
for session_id, room_key in iteritems(room["sessions"]):
log_kv(
{
"message": "Trying to upload room key",
"room_id": room_id,
"session_id": session_id,
"user_id": user_id,
}
for session_id, session in iteritems(room["sessions"]):
yield self._upload_room_key(
user_id, version, room_id, session_id, session
)
current_room_key = existing_keys.get(room_id, {}).get(session_id)
if current_room_key:
if self._should_replace_room_key(current_room_key, room_key):
log_kv({"message": "Replacing room key."})
# updates are done one at a time in the DB, so send
# updates right away rather than batching them up,
# like we do with the inserts
yield self.store.update_e2e_room_key(
user_id, version, room_id, session_id, room_key
)
changed = True
else:
log_kv({"message": "Not replacing room_key."})
else:
log_kv(
{
"message": "Room key not found.",
"room_id": room_id,
"user_id": user_id,
}
)
log_kv({"message": "Replacing room key."})
to_insert.append((room_id, session_id, room_key))
changed = True
if len(to_insert):
yield self.store.add_e2e_room_keys(user_id, version, to_insert)
@defer.inlineCallbacks
def _upload_room_key(self, user_id, version, room_id, session_id, room_key):
"""Upload a given room_key for a given room and session into a given
version of the backup. Merges the key with any which might already exist.
version_etag = version_info["etag"]
if changed:
version_etag = version_etag + 1
yield self.store.update_e2e_room_keys_version(
user_id, version, None, version_etag
Args:
user_id(str): the user whose backup we're setting
version(str): the version ID of the backup we're updating
room_id(str): the ID of the room whose keys we're setting
session_id(str): the session whose room_key we're setting
room_key(dict): the room_key being set
"""
log_kv(
{
"message": "Trying to upload room key",
"room_id": room_id,
"session_id": session_id,
"user_id": user_id,
}
)
# get the room_key for this particular row
current_room_key = None
try:
current_room_key = yield self.store.get_e2e_room_key(
user_id, version, room_id, session_id
)
except StoreError as e:
if e.code == 404:
log_kv(
{
"message": "Room key not found.",
"room_id": room_id,
"user_id": user_id,
}
)
else:
raise
count = yield self.store.count_e2e_room_keys(user_id, version)
return {"etag": str(version_etag), "count": count}
if self._should_replace_room_key(current_room_key, room_key):
log_kv({"message": "Replacing room key."})
yield self.store.set_e2e_room_key(
user_id, version, room_id, session_id, room_key
)
else:
log_kv({"message": "Not replacing room_key."})
@staticmethod
def _should_replace_room_key(current_room_key, room_key):
@@ -342,8 +314,6 @@ class E2eRoomKeysHandler(object):
raise NotFoundError("Unknown backup version")
else:
raise
res["count"] = yield self.store.count_e2e_room_keys(user_id, res["version"])
return res
@trace

View File

@@ -16,6 +16,8 @@
import logging
import random
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, SynapseError
from synapse.events import EventBase
@@ -48,8 +50,9 @@ class EventStreamHandler(BaseHandler):
self._server_notices_sender = hs.get_server_notices_sender()
self._event_serializer = hs.get_event_client_serializer()
@defer.inlineCallbacks
@log_function
async def get_stream(
def get_stream(
self,
auth_user_id,
pagin_config,
@@ -66,17 +69,17 @@ class EventStreamHandler(BaseHandler):
"""
if room_id:
blocked = await self.store.is_room_blocked(room_id)
blocked = yield self.store.is_room_blocked(room_id)
if blocked:
raise SynapseError(403, "This room has been blocked on this server")
# send any outstanding server notices to the user.
await self._server_notices_sender.on_user_syncing(auth_user_id)
yield self._server_notices_sender.on_user_syncing(auth_user_id)
auth_user = UserID.from_string(auth_user_id)
presence_handler = self.hs.get_presence_handler()
context = await presence_handler.user_syncing(
context = yield presence_handler.user_syncing(
auth_user_id, affect_presence=affect_presence
)
with context:
@@ -88,7 +91,7 @@ class EventStreamHandler(BaseHandler):
# thundering herds on restart.
timeout = random.randint(int(timeout * 0.9), int(timeout * 1.1))
events, tokens = await self.notifier.get_events_for(
events, tokens = yield self.notifier.get_events_for(
auth_user,
pagin_config,
timeout,
@@ -109,14 +112,14 @@ class EventStreamHandler(BaseHandler):
# Send down presence.
if event.state_key == auth_user_id:
# Send down presence for everyone in the room.
users = await self.state.get_current_users_in_room(
users = yield self.state.get_current_users_in_room(
event.room_id
)
states = await presence_handler.get_states(users, as_event=True)
states = yield presence_handler.get_states(users, as_event=True)
to_add.extend(states)
else:
ev = await presence_handler.get_state(
ev = yield presence_handler.get_state(
UserID.from_string(event.state_key), as_event=True
)
to_add.append(ev)
@@ -125,7 +128,7 @@ class EventStreamHandler(BaseHandler):
time_now = self.clock.time_msec()
chunks = await self._event_serializer.serialize_events(
chunks = yield self._event_serializer.serialize_events(
events,
time_now,
as_client_event=as_client_event,
@@ -148,7 +151,8 @@ class EventHandler(BaseHandler):
super(EventHandler, self).__init__(hs)
self.storage = hs.get_storage()
async def get_event(self, user, room_id, event_id):
@defer.inlineCallbacks
def get_event(self, user, room_id, event_id):
"""Retrieve a single specified event.
Args:
@@ -163,15 +167,15 @@ class EventHandler(BaseHandler):
AuthError if the user does not have the rights to inspect this
event.
"""
event = await self.store.get_event(event_id, check_room_id=room_id)
event = yield self.store.get_event(event_id, check_room_id=room_id)
if not event:
return None
users = await self.store.get_users_in_room(event.room_id)
users = yield self.store.get_users_in_room(event.room_id)
is_peeking = user.to_string() not in users
filtered = await filter_events_for_client(
filtered = yield filter_events_for_client(
self.storage, user.to_string(), [event], is_peeking=is_peeking
)

File diff suppressed because it is too large Load Diff

View File

@@ -26,7 +26,7 @@ from synapse.streams.config import PaginationConfig
from synapse.types import StreamToken, UserID
from synapse.util import unwrapFirstError
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.caches.snapshot_cache import SnapshotCache
from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
@@ -41,7 +41,7 @@ class InitialSyncHandler(BaseHandler):
self.state = hs.get_state_handler()
self.clock = hs.get_clock()
self.validator = EventValidator()
self.snapshot_cache = ResponseCache(hs, "initial_sync_cache")
self.snapshot_cache = SnapshotCache()
self._event_serializer = hs.get_event_client_serializer()
self.storage = hs.get_storage()
self.state_store = self.storage.state
@@ -79,17 +79,21 @@ class InitialSyncHandler(BaseHandler):
as_client_event,
include_archived,
)
now_ms = self.clock.time_msec()
result = self.snapshot_cache.get(now_ms, key)
if result is not None:
return result
return self.snapshot_cache.wrap(
return self.snapshot_cache.set(
now_ms,
key,
self._snapshot_all_rooms,
user_id,
pagin_config,
as_client_event,
include_archived,
self._snapshot_all_rooms(
user_id, pagin_config, as_client_event, include_archived
),
)
async def _snapshot_all_rooms(
@defer.inlineCallbacks
def _snapshot_all_rooms(
self,
user_id=None,
pagin_config=None,
@@ -101,7 +105,7 @@ class InitialSyncHandler(BaseHandler):
if include_archived:
memberships.append(Membership.LEAVE)
room_list = await self.store.get_rooms_for_user_where_membership_is(
room_list = yield self.store.get_rooms_for_user_where_membership_is(
user_id=user_id, membership_list=memberships
)
@@ -109,32 +113,33 @@ class InitialSyncHandler(BaseHandler):
rooms_ret = []
now_token = await self.hs.get_event_sources().get_current_token()
now_token = yield self.hs.get_event_sources().get_current_token()
presence_stream = self.hs.get_event_sources().sources["presence"]
pagination_config = PaginationConfig(from_token=now_token)
presence, _ = await presence_stream.get_pagination_rows(
presence, _ = yield presence_stream.get_pagination_rows(
user, pagination_config.get_source_config("presence"), None
)
receipt_stream = self.hs.get_event_sources().sources["receipt"]
receipt, _ = await receipt_stream.get_pagination_rows(
receipt, _ = yield receipt_stream.get_pagination_rows(
user, pagination_config.get_source_config("receipt"), None
)
tags_by_room = await self.store.get_tags_for_user(user_id)
tags_by_room = yield self.store.get_tags_for_user(user_id)
account_data, account_data_by_room = await self.store.get_account_data_for_user(
account_data, account_data_by_room = yield self.store.get_account_data_for_user(
user_id
)
public_room_ids = await self.store.get_public_room_ids()
public_room_ids = yield self.store.get_public_room_ids()
limit = pagin_config.limit
if limit is None:
limit = 10
async def handle_room(event):
@defer.inlineCallbacks
def handle_room(event):
d = {
"room_id": event.room_id,
"membership": event.membership,
@@ -147,8 +152,8 @@ class InitialSyncHandler(BaseHandler):
time_now = self.clock.time_msec()
d["inviter"] = event.sender
invite_event = await self.store.get_event(event.event_id)
d["invite"] = await self._event_serializer.serialize_event(
invite_event = yield self.store.get_event(event.event_id)
d["invite"] = yield self._event_serializer.serialize_event(
invite_event, time_now, as_client_event
)
@@ -172,7 +177,7 @@ class InitialSyncHandler(BaseHandler):
lambda states: states[event.event_id]
)
(messages, token), current_state = await make_deferred_yieldable(
(messages, token), current_state = yield make_deferred_yieldable(
defer.gatherResults(
[
run_in_background(
@@ -186,7 +191,7 @@ class InitialSyncHandler(BaseHandler):
)
).addErrback(unwrapFirstError)
messages = await filter_events_for_client(
messages = yield filter_events_for_client(
self.storage, user_id, messages
)
@@ -196,7 +201,7 @@ class InitialSyncHandler(BaseHandler):
d["messages"] = {
"chunk": (
await self._event_serializer.serialize_events(
yield self._event_serializer.serialize_events(
messages, time_now=time_now, as_client_event=as_client_event
)
),
@@ -204,7 +209,7 @@ class InitialSyncHandler(BaseHandler):
"end": end_token.to_string(),
}
d["state"] = await self._event_serializer.serialize_events(
d["state"] = yield self._event_serializer.serialize_events(
current_state.values(),
time_now=time_now,
as_client_event=as_client_event,
@@ -227,7 +232,7 @@ class InitialSyncHandler(BaseHandler):
except Exception:
logger.exception("Failed to get snapshot")
await concurrently_execute(handle_room, room_list, 10)
yield concurrently_execute(handle_room, room_list, 10)
account_data_events = []
for account_data_type, content in account_data.items():
@@ -251,7 +256,8 @@ class InitialSyncHandler(BaseHandler):
return ret
async def room_initial_sync(self, requester, room_id, pagin_config=None):
@defer.inlineCallbacks
def room_initial_sync(self, requester, room_id, pagin_config=None):
"""Capture the a snapshot of a room. If user is currently a member of
the room this will be what is currently in the room. If the user left
the room this will be what was in the room when they left.
@@ -268,32 +274,32 @@ class InitialSyncHandler(BaseHandler):
A JSON serialisable dict with the snapshot of the room.
"""
blocked = await self.store.is_room_blocked(room_id)
blocked = yield self.store.is_room_blocked(room_id)
if blocked:
raise SynapseError(403, "This room has been blocked on this server")
user_id = requester.user.to_string()
membership, member_event_id = await self._check_in_room_or_world_readable(
membership, member_event_id = yield self._check_in_room_or_world_readable(
room_id, user_id
)
is_peeking = member_event_id is None
if membership == Membership.JOIN:
result = await self._room_initial_sync_joined(
result = yield self._room_initial_sync_joined(
user_id, room_id, pagin_config, membership, is_peeking
)
elif membership == Membership.LEAVE:
result = await self._room_initial_sync_parted(
result = yield self._room_initial_sync_parted(
user_id, room_id, pagin_config, membership, member_event_id, is_peeking
)
account_data_events = []
tags = await self.store.get_tags_for_room(user_id, room_id)
tags = yield self.store.get_tags_for_room(user_id, room_id)
if tags:
account_data_events.append({"type": "m.tag", "content": {"tags": tags}})
account_data = await self.store.get_account_data_for_room(user_id, room_id)
account_data = yield self.store.get_account_data_for_room(user_id, room_id)
for account_data_type, content in account_data.items():
account_data_events.append({"type": account_data_type, "content": content})
@@ -301,10 +307,11 @@ class InitialSyncHandler(BaseHandler):
return result
async def _room_initial_sync_parted(
@defer.inlineCallbacks
def _room_initial_sync_parted(
self, user_id, room_id, pagin_config, membership, member_event_id, is_peeking
):
room_state = await self.state_store.get_state_for_events([member_event_id])
room_state = yield self.state_store.get_state_for_events([member_event_id])
room_state = room_state[member_event_id]
@@ -312,13 +319,13 @@ class InitialSyncHandler(BaseHandler):
if limit is None:
limit = 10
stream_token = await self.store.get_stream_token_for_event(member_event_id)
stream_token = yield self.store.get_stream_token_for_event(member_event_id)
messages, token = await self.store.get_recent_events_for_room(
messages, token = yield self.store.get_recent_events_for_room(
room_id, limit=limit, end_token=stream_token
)
messages = await filter_events_for_client(
messages = yield filter_events_for_client(
self.storage, user_id, messages, is_peeking=is_peeking
)
@@ -332,13 +339,13 @@ class InitialSyncHandler(BaseHandler):
"room_id": room_id,
"messages": {
"chunk": (
await self._event_serializer.serialize_events(messages, time_now)
yield self._event_serializer.serialize_events(messages, time_now)
),
"start": start_token.to_string(),
"end": end_token.to_string(),
},
"state": (
await self._event_serializer.serialize_events(
yield self._event_serializer.serialize_events(
room_state.values(), time_now
)
),
@@ -346,18 +353,19 @@ class InitialSyncHandler(BaseHandler):
"receipts": [],
}
async def _room_initial_sync_joined(
@defer.inlineCallbacks
def _room_initial_sync_joined(
self, user_id, room_id, pagin_config, membership, is_peeking
):
current_state = await self.state.get_current_state(room_id=room_id)
current_state = yield self.state.get_current_state(room_id=room_id)
# TODO: These concurrently
time_now = self.clock.time_msec()
state = await self._event_serializer.serialize_events(
state = yield self._event_serializer.serialize_events(
current_state.values(), time_now
)
now_token = await self.hs.get_event_sources().get_current_token()
now_token = yield self.hs.get_event_sources().get_current_token()
limit = pagin_config.limit if pagin_config else None
if limit is None:
@@ -372,26 +380,28 @@ class InitialSyncHandler(BaseHandler):
presence_handler = self.hs.get_presence_handler()
async def get_presence():
@defer.inlineCallbacks
def get_presence():
# If presence is disabled, return an empty list
if not self.hs.config.use_presence:
return []
states = await presence_handler.get_states(
states = yield presence_handler.get_states(
[m.user_id for m in room_members], as_event=True
)
return states
async def get_receipts():
receipts = await self.store.get_linearized_receipts_for_room(
@defer.inlineCallbacks
def get_receipts():
receipts = yield self.store.get_linearized_receipts_for_room(
room_id, to_key=now_token.receipt_key
)
if not receipts:
receipts = []
return receipts
presence, receipts, (messages, token) = await make_deferred_yieldable(
presence, receipts, (messages, token) = yield make_deferred_yieldable(
defer.gatherResults(
[
run_in_background(get_presence),
@@ -407,7 +417,7 @@ class InitialSyncHandler(BaseHandler):
).addErrback(unwrapFirstError)
)
messages = await filter_events_for_client(
messages = yield filter_events_for_client(
self.storage, user_id, messages, is_peeking=is_peeking
)
@@ -420,7 +430,7 @@ class InitialSyncHandler(BaseHandler):
"room_id": room_id,
"messages": {
"chunk": (
await self._event_serializer.serialize_events(messages, time_now)
yield self._event_serializer.serialize_events(messages, time_now)
),
"start": start_token.to_string(),
"end": end_token.to_string(),
@@ -434,17 +444,18 @@ class InitialSyncHandler(BaseHandler):
return ret
async def _check_in_room_or_world_readable(self, room_id, user_id):
@defer.inlineCallbacks
def _check_in_room_or_world_readable(self, room_id, user_id):
try:
# check_user_was_in_room will return the most recent membership
# event for the user if:
# * The user is a non-guest user, and was ever in the room
# * The user is a guest user, and has joined the room
# else it will throw.
member_event = await self.auth.check_user_was_in_room(room_id, user_id)
member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
return member_event.membership, member_event.event_id
except AuthError:
visibility = await self.state_handler.get_current_state(
visibility = yield self.state_handler.get_current_state(
room_id, EventTypes.RoomHistoryVisibility, ""
)
if (

View File

@@ -15,7 +15,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Optional
from six import iteritems, itervalues, string_types
@@ -23,16 +22,9 @@ from canonicaljson import encode_canonical_json, json
from twisted.internet import defer
from twisted.internet.defer import succeed
from twisted.internet.interfaces import IDelayedCall
from synapse import event_auth
from synapse.api.constants import (
EventContentFields,
EventTypes,
Membership,
RelationTypes,
UserTypes,
)
from synapse.api.constants import EventTypes, Membership, RelationTypes, UserTypes
from synapse.api.errors import (
AuthError,
Codes,
@@ -46,9 +38,8 @@ from synapse.events.validator import EventValidator
from synapse.logging.context import run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour
from synapse.storage.state import StateFilter
from synapse.types import Collection, RoomAlias, UserID, create_requester
from synapse.types import RoomAlias, UserID, create_requester
from synapse.util.async_helpers import Linearizer
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.metrics import measure_func
@@ -71,17 +62,6 @@ class MessageHandler(object):
self.storage = hs.get_storage()
self.state_store = self.storage.state
self._event_serializer = hs.get_event_client_serializer()
self._ephemeral_events_enabled = hs.config.enable_ephemeral_messages
self._is_worker_app = bool(hs.config.worker_app)
# The scheduled call to self._expire_event. None if no call is currently
# scheduled.
self._scheduled_expiry = None # type: Optional[IDelayedCall]
if not hs.config.worker_app:
run_as_background_process(
"_schedule_next_expiry", self._schedule_next_expiry
)
@defer.inlineCallbacks
def get_room_data(
@@ -158,7 +138,7 @@ class MessageHandler(object):
raise NotFoundError("Can't find event for token %s" % (at_token,))
visible_events = yield filter_events_for_client(
self.storage, user_id, last_events, apply_retention_policies=False
self.storage, user_id, last_events
)
event = last_events[0]
@@ -245,100 +225,6 @@ class MessageHandler(object):
for user_id, profile in iteritems(users_with_profile)
}
def maybe_schedule_expiry(self, event):
"""Schedule the expiry of an event if there's not already one scheduled,
or if the one running is for an event that will expire after the provided
timestamp.
This function needs to invalidate the event cache, which is only possible on
the master process, and therefore needs to be run on there.
Args:
event (EventBase): The event to schedule the expiry of.
"""
assert not self._is_worker_app
expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER)
if not isinstance(expiry_ts, int) or event.is_state():
return
# _schedule_expiry_for_event won't actually schedule anything if there's already
# a task scheduled for a timestamp that's sooner than the provided one.
self._schedule_expiry_for_event(event.event_id, expiry_ts)
@defer.inlineCallbacks
def _schedule_next_expiry(self):
"""Retrieve the ID and the expiry timestamp of the next event to be expired,
and schedule an expiry task for it.
If there's no event left to expire, set _expiry_scheduled to None so that a
future call to save_expiry_ts can schedule a new expiry task.
"""
# Try to get the expiry timestamp of the next event to expire.
res = yield self.store.get_next_event_to_expire()
if res:
event_id, expiry_ts = res
self._schedule_expiry_for_event(event_id, expiry_ts)
def _schedule_expiry_for_event(self, event_id, expiry_ts):
"""Schedule an expiry task for the provided event if there's not already one
scheduled at a timestamp that's sooner than the provided one.
Args:
event_id (str): The ID of the event to expire.
expiry_ts (int): The timestamp at which to expire the event.
"""
if self._scheduled_expiry:
# If the provided timestamp refers to a time before the scheduled time of the
# next expiry task, cancel that task and reschedule it for this timestamp.
next_scheduled_expiry_ts = self._scheduled_expiry.getTime() * 1000
if expiry_ts < next_scheduled_expiry_ts:
self._scheduled_expiry.cancel()
else:
return
# Figure out how many seconds we need to wait before expiring the event.
now_ms = self.clock.time_msec()
delay = (expiry_ts - now_ms) / 1000
# callLater doesn't support negative delays, so trim the delay to 0 if we're
# in that case.
if delay < 0:
delay = 0
logger.info("Scheduling expiry for event %s in %.3fs", event_id, delay)
self._scheduled_expiry = self.clock.call_later(
delay,
run_as_background_process,
"_expire_event",
self._expire_event,
event_id,
)
@defer.inlineCallbacks
def _expire_event(self, event_id):
"""Retrieve and expire an event that needs to be expired from the database.
If the event doesn't exist in the database, log it and delete the expiry date
from the database (so that we don't try to expire it again).
"""
assert self._ephemeral_events_enabled
self._scheduled_expiry = None
logger.info("Expiring event %s", event_id)
try:
# Expire the event if we know about it. This function also deletes the expiry
# date from the database in the same database transaction.
yield self.store.expire_event(event_id)
except Exception as e:
logger.error("Could not expire event %s: %r", event_id, e)
# Schedule the expiry of the next event to expire.
yield self._schedule_next_expiry()
# The duration (in ms) after which rooms should be removed
# `_rooms_to_exclude_from_dummy_event_insertion` (with the effect that we will try
@@ -364,8 +250,6 @@ class EventCreationHandler(object):
self.config = hs.config
self.require_membership_for_aliases = hs.config.require_membership_for_aliases
self.room_invite_state_types = self.hs.config.room_invite_state_types
self.send_event_to_master = ReplicationSendEventRestServlet.make_client(hs)
# This is only used to get at ratelimit function, and maybe_kick_guest_users
@@ -411,10 +295,6 @@ class EventCreationHandler(object):
5 * 60 * 1000,
)
self._message_handler = hs.get_message_handler()
self._ephemeral_events_enabled = hs.config.enable_ephemeral_messages
@defer.inlineCallbacks
def create_event(
self,
@@ -422,7 +302,7 @@ class EventCreationHandler(object):
event_dict,
token_id=None,
txn_id=None,
prev_event_ids: Optional[Collection[str]] = None,
prev_events_and_hashes=None,
require_consent=True,
):
"""
@@ -439,9 +319,10 @@ class EventCreationHandler(object):
token_id (str)
txn_id (str)
prev_event_ids:
prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
the forward extremities to use as the prev_events for the
new event.
new event. For each event, a tuple of (event_id, hashes, depth)
where *hashes* is a map from algorithm to hash.
If None, they will be requested from the database.
@@ -497,7 +378,9 @@ class EventCreationHandler(object):
builder.internal_metadata.txn_id = txn_id
event, context = yield self.create_new_client_event(
builder=builder, requester=requester, prev_event_ids=prev_event_ids,
builder=builder,
requester=requester,
prev_events_and_hashes=prev_events_and_hashes,
)
# In an ideal world we wouldn't need the second part of this condition. However,
@@ -512,7 +395,7 @@ class EventCreationHandler(object):
# federation as well as those created locally. As of room v3, aliases events
# can be created by users that are not in the room, therefore we have to
# tolerate them in event_auth.check().
prev_state_ids = yield context.get_prev_state_ids()
prev_state_ids = yield context.get_prev_state_ids(self.store)
prev_event_id = prev_state_ids.get((EventTypes.Member, event.sender))
prev_event = (
yield self.store.get_event(prev_event_id, allow_none=True)
@@ -534,7 +417,7 @@ class EventCreationHandler(object):
403, "You must be in the room to create an alias for it"
)
self.validator.validate_new(event, self.config)
self.validator.validate_new(event)
return (event, context)
@@ -662,7 +545,7 @@ class EventCreationHandler(object):
If so, returns the version of the event in context.
Otherwise, returns None.
"""
prev_state_ids = yield context.get_prev_state_ids()
prev_state_ids = yield context.get_prev_state_ids(self.store)
prev_event_id = prev_state_ids.get((event.type, event.state_key))
if not prev_event_id:
return
@@ -711,7 +594,7 @@ class EventCreationHandler(object):
@measure_func("create_new_client_event")
@defer.inlineCallbacks
def create_new_client_event(
self, builder, requester=None, prev_event_ids: Optional[Collection[str]] = None
self, builder, requester=None, prev_events_and_hashes=None
):
"""Create a new event for a local client
@@ -720,9 +603,10 @@ class EventCreationHandler(object):
requester (synapse.types.Requester|None):
prev_event_ids:
prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
the forward extremities to use as the prev_events for the
new event.
new event. For each event, a tuple of (event_id, hashes, depth)
where *hashes* is a map from algorithm to hash.
If None, they will be requested from the database.
@@ -730,20 +614,27 @@ class EventCreationHandler(object):
Deferred[(synapse.events.EventBase, synapse.events.snapshot.EventContext)]
"""
if prev_event_ids is not None:
assert len(prev_event_ids) <= 10, (
if prev_events_and_hashes is not None:
assert len(prev_events_and_hashes) <= 10, (
"Attempting to create an event with %i prev_events"
% (len(prev_event_ids),)
% (len(prev_events_and_hashes),)
)
else:
prev_event_ids = yield self.store.get_prev_events_for_room(builder.room_id)
prev_events_and_hashes = yield self.store.get_prev_events_for_room(
builder.room_id
)
event = yield builder.build(prev_event_ids=prev_event_ids)
prev_events = [
(event_id, prev_hashes)
for event_id, prev_hashes, _ in prev_events_and_hashes
]
event = yield builder.build(prev_event_ids=[p for p, _ in prev_events])
context = yield self.state.compute_event_context(event)
if requester:
context.app_service = requester.app_service
self.validator.validate_new(event, self.config)
self.validator.validate_new(event)
# If this event is an annotation then we check that that the sender
# can't annotate the same way twice (e.g. stops users from liking an
@@ -865,7 +756,7 @@ class EventCreationHandler(object):
if event.type == EventTypes.Redaction:
original_event = yield self.store.get_event(
event.redacts,
redact_behaviour=EventRedactBehaviour.AS_IS,
check_redacted=False,
get_prev_content=False,
allow_rejected=False,
allow_none=True,
@@ -903,12 +794,12 @@ class EventCreationHandler(object):
def is_inviter_member_event(e):
return e.type == EventTypes.Member and e.sender == event.sender
current_state_ids = yield context.get_current_state_ids()
current_state_ids = yield context.get_current_state_ids(self.store)
state_to_include_ids = [
e_id
for k, e_id in iteritems(current_state_ids)
if k[0] in self.room_invite_state_types
if k[0] in self.hs.config.room_invite_state_types
or k == (EventTypes.Member, event.sender)
]
@@ -942,7 +833,7 @@ class EventCreationHandler(object):
if event.type == EventTypes.Redaction:
original_event = yield self.store.get_event(
event.redacts,
redact_behaviour=EventRedactBehaviour.AS_IS,
check_redacted=False,
get_prev_content=False,
allow_rejected=False,
allow_none=True,
@@ -956,7 +847,7 @@ class EventCreationHandler(object):
if original_event.room_id != event.room_id:
raise SynapseError(400, "Cannot redact event from a different room")
prev_state_ids = yield context.get_prev_state_ids()
prev_state_ids = yield context.get_prev_state_ids(self.store)
auth_events_ids = yield self.auth.compute_auth_events(
event, prev_state_ids, for_verification=True
)
@@ -978,7 +869,7 @@ class EventCreationHandler(object):
event.internal_metadata.recheck_redaction = False
if event.type == EventTypes.Create:
prev_state_ids = yield context.get_prev_state_ids()
prev_state_ids = yield context.get_prev_state_ids(self.store)
if prev_state_ids:
raise AuthError(403, "Changing the room create event is forbidden")
@@ -986,10 +877,6 @@ class EventCreationHandler(object):
event, context=context
)
if self._ephemeral_events_enabled:
# If there's an expiry timestamp on the event, schedule its expiry.
self._message_handler.maybe_schedule_expiry(event)
yield self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id)
def _notify():
@@ -1031,7 +918,9 @@ class EventCreationHandler(object):
# For each room we need to find a joined member we can use to send
# the dummy event with.
latest_event_ids = yield self.store.get_prev_events_for_room(room_id)
prev_events_and_hashes = yield self.store.get_prev_events_for_room(room_id)
latest_event_ids = (event_id for (event_id, _, _) in prev_events_and_hashes)
members = yield self.state.get_current_users_in_room(
room_id, latest_event_ids=latest_event_ids
@@ -1050,7 +939,7 @@ class EventCreationHandler(object):
"room_id": room_id,
"sender": user_id,
},
prev_event_ids=latest_event_ids,
prev_events_and_hashes=prev_events_and_hashes,
)
event.internal_metadata.proactively_send = False

View File

@@ -15,15 +15,12 @@
# limitations under the License.
import logging
from six import iteritems
from twisted.internet import defer
from twisted.python.failure import Failure
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.logging.context import run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.state import StateFilter
from synapse.types import RoomStreamToken
from synapse.util.async_helpers import ReadWriteLock
@@ -83,109 +80,6 @@ class PaginationHandler(object):
self._purges_by_id = {}
self._event_serializer = hs.get_event_client_serializer()
self._retention_default_max_lifetime = hs.config.retention_default_max_lifetime
if hs.config.retention_enabled:
# Run the purge jobs described in the configuration file.
for job in hs.config.retention_purge_jobs:
self.clock.looping_call(
run_as_background_process,
job["interval"],
"purge_history_for_rooms_in_range",
self.purge_history_for_rooms_in_range,
job["shortest_max_lifetime"],
job["longest_max_lifetime"],
)
@defer.inlineCallbacks
def purge_history_for_rooms_in_range(self, min_ms, max_ms):
"""Purge outdated events from rooms within the given retention range.
If a default retention policy is defined in the server's configuration and its
'max_lifetime' is within this range, also targets rooms which don't have a
retention policy.
Args:
min_ms (int|None): Duration in milliseconds that define the lower limit of
the range to handle (exclusive). If None, it means that the range has no
lower limit.
max_ms (int|None): Duration in milliseconds that define the upper limit of
the range to handle (inclusive). If None, it means that the range has no
upper limit.
"""
# We want the storage layer to to include rooms with no retention policy in its
# return value only if a default retention policy is defined in the server's
# configuration and that policy's 'max_lifetime' is either lower (or equal) than
# max_ms or higher than min_ms (or both).
if self._retention_default_max_lifetime is not None:
include_null = True
if min_ms is not None and min_ms >= self._retention_default_max_lifetime:
# The default max_lifetime is lower than (or equal to) min_ms.
include_null = False
if max_ms is not None and max_ms < self._retention_default_max_lifetime:
# The default max_lifetime is higher than max_ms.
include_null = False
else:
include_null = False
rooms = yield self.store.get_rooms_for_retention_period_in_range(
min_ms, max_ms, include_null
)
for room_id, retention_policy in iteritems(rooms):
if room_id in self._purges_in_progress_by_room:
logger.warning(
"[purge] not purging room %s as there's an ongoing purge running"
" for this room",
room_id,
)
continue
max_lifetime = retention_policy["max_lifetime"]
if max_lifetime is None:
# If max_lifetime is None, it means that include_null equals True,
# therefore we can safely assume that there is a default policy defined
# in the server's configuration.
max_lifetime = self._retention_default_max_lifetime
# Figure out what token we should start purging at.
ts = self.clock.time_msec() - max_lifetime
stream_ordering = yield self.store.find_first_stream_ordering_after_ts(ts)
r = yield self.store.get_room_event_after_stream_ordering(
room_id, stream_ordering,
)
if not r:
logger.warning(
"[purge] purging events not possible: No event found "
"(ts %i => stream_ordering %i)",
ts,
stream_ordering,
)
continue
(stream, topo, _event_id) = r
token = "t%d-%d" % (topo, stream)
purge_id = random_string(16)
self._purges_by_id[purge_id] = PurgeStatus()
logger.info(
"Starting purging events in room %s (purge_id %s)" % (room_id, purge_id)
)
# We want to purge everything, including local events, and to run the purge in
# the background so that it's not blocking any other operation apart from
# other purges in the same room.
run_as_background_process(
"_purge_history", self._purge_history, purge_id, room_id, token, True,
)
def start_purge_history(self, room_id, token, delete_local_events=False):
"""Start off a history purge on a room.
@@ -280,7 +174,8 @@ class PaginationHandler(object):
await self.storage.purge_events.purge_room(room_id)
async def get_messages(
@defer.inlineCallbacks
def get_messages(
self,
requester,
room_id=None,
@@ -306,7 +201,7 @@ class PaginationHandler(object):
room_token = pagin_config.from_token.room_key
else:
pagin_config.from_token = (
await self.hs.get_event_sources().get_current_token_for_pagination()
yield self.hs.get_event_sources().get_current_token_for_pagination()
)
room_token = pagin_config.from_token.room_key
@@ -318,11 +213,11 @@ class PaginationHandler(object):
source_config = pagin_config.get_source_config("room")
with (await self.pagination_lock.read(room_id)):
with (yield self.pagination_lock.read(room_id)):
(
membership,
member_event_id,
) = await self.auth.check_in_room_or_world_readable(room_id, user_id)
) = yield self.auth.check_in_room_or_world_readable(room_id, user_id)
if source_config.direction == "b":
# if we're going backwards, we might need to backfill. This
@@ -330,7 +225,7 @@ class PaginationHandler(object):
if room_token.topological:
max_topo = room_token.topological
else:
max_topo = await self.store.get_max_topological_token(
max_topo = yield self.store.get_max_topological_token(
room_id, room_token.stream
)
@@ -338,18 +233,18 @@ class PaginationHandler(object):
# If they have left the room then clamp the token to be before
# they left the room, to save the effort of loading from the
# database.
leave_token = await self.store.get_topological_token_for_event(
leave_token = yield self.store.get_topological_token_for_event(
member_event_id
)
leave_token = RoomStreamToken.parse(leave_token)
if leave_token.topological < max_topo:
source_config.from_key = str(leave_token)
await self.hs.get_handlers().federation_handler.maybe_backfill(
yield self.hs.get_handlers().federation_handler.maybe_backfill(
room_id, max_topo
)
events, next_key = await self.store.paginate_room_events(
events, next_key = yield self.store.paginate_room_events(
room_id=room_id,
from_key=source_config.from_key,
to_key=source_config.to_key,
@@ -364,7 +259,7 @@ class PaginationHandler(object):
if event_filter:
events = event_filter.filter(events)
events = await filter_events_for_client(
events = yield filter_events_for_client(
self.storage, user_id, events, is_peeking=(member_event_id is None)
)
@@ -384,19 +279,19 @@ class PaginationHandler(object):
(EventTypes.Member, event.sender) for event in events
)
state_ids = await self.state_store.get_state_ids_for_event(
state_ids = yield self.state_store.get_state_ids_for_event(
events[0].event_id, state_filter=state_filter
)
if state_ids:
state = await self.store.get_events(list(state_ids.values()))
state = yield self.store.get_events(list(state_ids.values()))
state = state.values()
time_now = self.clock.time_msec()
chunk = {
"chunk": (
await self._event_serializer.serialize_events(
yield self._event_serializer.serialize_events(
events, time_now, as_client_event=as_client_event
)
),
@@ -405,7 +300,7 @@ class PaginationHandler(object):
}
if state:
chunk["state"] = await self._event_serializer.serialize_events(
chunk["state"] = yield self._event_serializer.serialize_events(
state, time_now, as_client_event=as_client_event
)

View File

@@ -95,7 +95,12 @@ assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
class PresenceHandler(object):
def __init__(self, hs: "synapse.server.HomeServer"):
def __init__(self, hs):
"""
Args:
hs (synapse.server.HomeServer):
"""
self.hs = hs
self.is_mine = hs.is_mine
self.is_mine_id = hs.is_mine_id
@@ -225,7 +230,7 @@ class PresenceHandler(object):
is some spurious presence changes that will self-correct.
"""
# If the DB pool has already terminated, don't try updating
if not self.store.db.is_running():
if not self.hs.get_db_pool().running:
return
logger.info(

View File

@@ -295,16 +295,12 @@ class BaseProfileHandler(BaseHandler):
be found to be in any room the server is in, and therefore the query
is denied.
"""
# Implementation of MSC1301: don't allow looking up profiles if the
# requester isn't in the same room as the target. We expect requester to
# be None when this function is called outside of a profile query, e.g.
# when building a membership event. In this case, we must allow the
# lookup.
if (
not self.hs.config.limit_profile_requests_to_users_who_share_rooms
or not requester
):
if not self.hs.config.require_auth_for_profile_requests or not requester:
return
# Always allow the user to query their own profile.

View File

@@ -266,7 +266,7 @@ class RegistrationHandler(BaseHandler):
}
# Bind email to new account
yield self._register_email_threepid(user_id, threepid_dict, None)
yield self._register_email_threepid(user_id, threepid_dict, None, False)
return user_id

View File

@@ -1,7 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014 - 2016 OpenMarket Ltd
# Copyright 2018-2019 New Vector Ltd
# Copyright 2019 The Matrix.org Foundation C.I.C.
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -16,7 +15,6 @@
# limitations under the License.
"""Contains functions for performing events on rooms."""
import itertools
import logging
import math
@@ -185,7 +183,7 @@ class RoomCreationHandler(BaseHandler):
requester, tombstone_event, tombstone_context
)
old_room_state = yield tombstone_context.get_current_state_ids()
old_room_state = yield tombstone_context.get_current_state_ids(self.store)
# update any aliases
yield self._move_aliases_to_new_room(
@@ -200,21 +198,21 @@ class RoomCreationHandler(BaseHandler):
# finally, shut down the PLs in the old room, and update them in the new
# room.
yield self._update_upgraded_room_pls(
requester, old_room_id, new_room_id, old_room_state,
requester, old_room_id, new_room_id, old_room_state
)
return new_room_id
@defer.inlineCallbacks
def _update_upgraded_room_pls(
self, requester, old_room_id, new_room_id, old_room_state,
self, requester, old_room_id, new_room_id, old_room_state
):
"""Send updated power levels in both rooms after an upgrade
Args:
requester (synapse.types.Requester): the user requesting the upgrade
old_room_id (str): the id of the room to be replaced
new_room_id (str): the id of the replacement room
old_room_id (unicode): the id of the room to be replaced
new_room_id (unicode): the id of the replacement room
old_room_state (dict[tuple[str, str], str]): the state map for the old room
Returns:
@@ -272,7 +270,7 @@ class RoomCreationHandler(BaseHandler):
except AuthError as e:
logger.warning("Unable to update PLs in old room: %s", e)
logger.info("Setting correct PLs in new room to %s", old_room_pl_state.content)
logger.info("Setting correct PLs in new room")
yield self.event_creation_handler.create_and_send_nonmember_event(
requester,
{
@@ -300,7 +298,7 @@ class RoomCreationHandler(BaseHandler):
tombstone_event_id (unicode|str): the ID of the tombstone event in the old
room.
Returns:
Deferred
Deferred[None]
"""
user_id = requester.user.to_string()
@@ -335,7 +333,6 @@ class RoomCreationHandler(BaseHandler):
(EventTypes.Encryption, ""),
(EventTypes.ServerACL, ""),
(EventTypes.RelatedGroups, ""),
(EventTypes.PowerLevels, ""),
)
old_room_state_ids = yield self.store.get_filtered_current_state_ids(
@@ -349,36 +346,6 @@ class RoomCreationHandler(BaseHandler):
if old_event:
initial_state[k] = old_event.content
# Resolve the minimum power level required to send any state event
# We will give the upgrading user this power level temporarily (if necessary) such that
# they are able to copy all of the state events over, then revert them back to their
# original power level afterwards in _update_upgraded_room_pls
# Copy over user power levels now as this will not be possible with >100PL users once
# the room has been created
power_levels = initial_state[(EventTypes.PowerLevels, "")]
# Calculate the minimum power level needed to clone the room
event_power_levels = power_levels.get("events", {})
state_default = power_levels.get("state_default", 0)
ban = power_levels.get("ban")
needed_power_level = max(state_default, ban, max(event_power_levels.values()))
# Raise the requester's power level in the new room if necessary
current_power_level = power_levels["users"][user_id]
if current_power_level < needed_power_level:
# make sure we copy the event content rather than overwriting it.
# note that if frozen_dicts are enabled, `power_levels` will be a frozen
# dict so we can't just copy.deepcopy it.
new_power_levels = {k: v for k, v in power_levels.items() if k != "users"}
new_power_levels["users"] = {
k: v for k, v in power_levels.get("users", {}).items() if k != user_id
}
new_power_levels["users"][user_id] = needed_power_level
initial_state[(EventTypes.PowerLevels, "")] = new_power_levels
yield self._send_events_for_new_room(
requester,
new_room_id,
@@ -739,7 +706,7 @@ class RoomCreationHandler(BaseHandler):
initial_state,
creation_content,
room_alias=None,
power_level_content_override=None, # Doesn't apply when initial state has power level state event content
power_level_content_override=None,
creator_join_profile=None,
):
def create(etype, content, **kwargs):
@@ -907,16 +874,9 @@ class RoomContextHandler(object):
room_id, event_id, before_limit, after_limit, event_filter
)
if event_filter:
results["events_before"] = event_filter.filter(results["events_before"])
results["events_after"] = event_filter.filter(results["events_after"])
results["events_before"] = yield filter_evts(results["events_before"])
results["events_after"] = yield filter_evts(results["events_after"])
# filter_evts can return a pruned event in case the user is allowed to see that
# there's something there but not see the content, so use the event that's in
# `filtered` rather than the event we retrieved from the datastore.
results["event"] = filtered[0]
results["event"] = event
if results["events_after"]:
last_event_id = results["events_after"][-1].event_id
@@ -942,12 +902,7 @@ class RoomContextHandler(object):
state = yield self.state_store.get_state_for_events(
[last_event_id], state_filter=state_filter
)
state_events = list(state[last_event_id].values())
if event_filter:
state_events = event_filter.filter(state_events)
results["state"] = yield filter_evts(state_events)
results["state"] = list(state[last_event_id].values())
# We use a dummy token here as we only care about the room portion of
# the token, which we replace.
@@ -1017,3 +972,15 @@ class RoomEventSource(object):
def get_current_key_for_room(self, room_id):
return self.store.get_room_events_max_id(room_id)
@defer.inlineCallbacks
def get_pagination_rows(self, user, config, key):
events, next_key = yield self.store.paginate_room_events(
room_id=key,
from_key=config.from_key,
to_key=config.to_key,
direction=config.direction,
limit=config.limit,
)
return (events, next_key)

View File

@@ -25,7 +25,7 @@ from twisted.internet import defer
from synapse import types
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, Codes, SynapseError
from synapse.types import Collection, RoomID, UserID
from synapse.types import RoomID, UserID
from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_joined_room, user_left_room
@@ -94,9 +94,7 @@ class RoomMemberHandler(object):
raise NotImplementedError()
@abc.abstractmethod
def _remote_reject_invite(
self, requester, remote_room_hosts, room_id, target, content
):
def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target):
"""Attempt to reject an invite for a room this server is not in. If we
fail to do so we locally mark the invite as rejected.
@@ -106,7 +104,6 @@ class RoomMemberHandler(object):
reject invite
room_id (str)
target (UserID): The user rejecting the invite
content (dict): The content for the rejection event
Returns:
Deferred[dict]: A dictionary to be returned to the client, may
@@ -149,7 +146,7 @@ class RoomMemberHandler(object):
target,
room_id,
membership,
prev_event_ids: Collection[str],
prev_events_and_hashes,
txn_id=None,
ratelimit=True,
content=None,
@@ -177,7 +174,7 @@ class RoomMemberHandler(object):
},
token_id=requester.access_token_id,
txn_id=txn_id,
prev_event_ids=prev_event_ids,
prev_events_and_hashes=prev_events_and_hashes,
require_consent=require_consent,
)
@@ -193,7 +190,7 @@ class RoomMemberHandler(object):
requester, event, context, extra_users=[target], ratelimit=ratelimit
)
prev_state_ids = yield context.get_prev_state_ids()
prev_state_ids = yield context.get_prev_state_ids(self.store)
prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)
@@ -370,7 +367,8 @@ class RoomMemberHandler(object):
if block_invite:
raise SynapseError(403, "Invites have been disabled on this server")
latest_event_ids = yield self.store.get_prev_events_for_room(room_id)
prev_events_and_hashes = yield self.store.get_prev_events_for_room(room_id)
latest_event_ids = (event_id for (event_id, _, _) in prev_events_and_hashes)
current_state_ids = yield self.state_handler.get_current_state_ids(
room_id, latest_event_ids=latest_event_ids
@@ -473,7 +471,7 @@ class RoomMemberHandler(object):
# send the rejection to the inviter's HS.
remote_room_hosts = remote_room_hosts + [inviter.domain]
res = yield self._remote_reject_invite(
requester, remote_room_hosts, room_id, target, content,
requester, remote_room_hosts, room_id, target
)
return res
@@ -484,7 +482,7 @@ class RoomMemberHandler(object):
membership=effective_membership_state,
txn_id=txn_id,
ratelimit=ratelimit,
prev_event_ids=latest_event_ids,
prev_events_and_hashes=prev_events_and_hashes,
content=content,
require_consent=require_consent,
)
@@ -506,8 +504,6 @@ class RoomMemberHandler(object):
Returns:
Deferred
"""
logger.info("Transferring room state from %s to %s", old_room_id, room_id)
# Find all local users that were in the old room and copy over each user's state
users = yield self.store.get_users_in_room(old_room_id)
yield self.copy_user_state_on_room_upgrade(old_room_id, room_id, users)
@@ -602,7 +598,7 @@ class RoomMemberHandler(object):
if prev_event is not None:
return
prev_state_ids = yield context.get_prev_state_ids()
prev_state_ids = yield context.get_prev_state_ids(self.store)
if event.membership == Membership.JOIN:
if requester.is_guest:
guest_can_join = yield self._can_guest_join(prev_state_ids)
@@ -975,15 +971,13 @@ class RoomMemberMasterHandler(RoomMemberHandler):
)
@defer.inlineCallbacks
def _remote_reject_invite(
self, requester, remote_room_hosts, room_id, target, content
):
def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target):
"""Implements RoomMemberHandler._remote_reject_invite
"""
fed_handler = self.federation_handler
try:
ret = yield fed_handler.do_remotely_reject_invite(
remote_room_hosts, room_id, target.to_string(), content=content,
remote_room_hosts, room_id, target.to_string()
)
return ret
except Exception as e:

View File

@@ -55,9 +55,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
return ret
def _remote_reject_invite(
self, requester, remote_room_hosts, room_id, target, content
):
def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target):
"""Implements RoomMemberHandler._remote_reject_invite
"""
return self._remote_reject_client(
@@ -65,7 +63,6 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
remote_room_hosts=remote_room_hosts,
room_id=room_id,
user_id=target.to_string(),
content=content,
)
def _user_joined_room(self, target, room_id):

View File

@@ -13,36 +13,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import re
from typing import Tuple
import attr
import saml2
import saml2.response
from saml2.client import Saml2Client
from synapse.api.errors import SynapseError
from synapse.config import ConfigError
from synapse.http.servlet import parse_string
from synapse.rest.client.v1.login import SSOAuthHandler
from synapse.types import (
UserID,
map_username_to_mxid_localpart,
mxid_localpart_allowed_characters,
)
from synapse.types import UserID, map_username_to_mxid_localpart
from synapse.util.async_helpers import Linearizer
logger = logging.getLogger(__name__)
@attr.s
class Saml2SessionData:
"""Data we track about SAML2 sessions"""
# time the session was created, in milliseconds
creation_time = attr.ib()
class SamlHandler:
def __init__(self, hs):
self._saml_client = Saml2Client(hs.config.saml2_sp_config)
@@ -53,14 +37,11 @@ class SamlHandler:
self._datastore = hs.get_datastore()
self._hostname = hs.hostname
self._saml2_session_lifetime = hs.config.saml2_session_lifetime
self._mxid_source_attribute = hs.config.saml2_mxid_source_attribute
self._grandfathered_mxid_source_attribute = (
hs.config.saml2_grandfathered_mxid_source_attribute
)
# plugin to do custom mapping from saml response to mxid
self._user_mapping_provider = hs.config.saml2_user_mapping_provider_class(
hs.config.saml2_user_mapping_provider_config
)
self._mxid_mapper = hs.config.saml2_mxid_mapper
# identifier for the external_ids table
self._auth_provider_id = "saml"
@@ -137,10 +118,22 @@ class SamlHandler:
remote_user_id = saml2_auth.ava["uid"][0]
except KeyError:
logger.warning("SAML2 response lacks a 'uid' attestation")
raise SynapseError(400, "'uid' not in SAML2 response")
raise SynapseError(400, "uid not in SAML2 response")
try:
mxid_source = saml2_auth.ava[self._mxid_source_attribute][0]
except KeyError:
logger.warning(
"SAML2 response lacks a '%s' attestation", self._mxid_source_attribute
)
raise SynapseError(
400, "%s not in SAML2 response" % (self._mxid_source_attribute,)
)
self._outstanding_requests_dict.pop(saml2_auth.in_response_to, None)
displayName = saml2_auth.ava.get("displayName", [None])[0]
with (await self._mapping_lock.queue(self._auth_provider_id)):
# first of all, check if we already have a mapping for this user
logger.info(
@@ -180,46 +173,22 @@ class SamlHandler:
)
return registered_user_id
# Map saml response to user attributes using the configured mapping provider
for i in range(1000):
attribute_dict = self._user_mapping_provider.saml_response_to_user_attributes(
saml2_auth, i
)
# figure out a new mxid for this user
base_mxid_localpart = self._mxid_mapper(mxid_source)
logger.debug(
"Retrieved SAML attributes from user mapping provider: %s "
"(attempt %d)",
attribute_dict,
i,
)
localpart = attribute_dict.get("mxid_localpart")
if not localpart:
logger.error(
"SAML mapping provider plugin did not return a "
"mxid_localpart object"
)
raise SynapseError(500, "Error parsing SAML2 response")
displayname = attribute_dict.get("displayname")
# Check if this mxid already exists
suffix = 0
while True:
localpart = base_mxid_localpart + (str(suffix) if suffix else "")
if not await self._datastore.get_users_by_id_case_insensitive(
UserID(localpart, self._hostname).to_string()
):
# This mxid is free
break
else:
# Unable to generate a username in 1000 iterations
# Break and return error to the user
raise SynapseError(
500, "Unable to generate a Matrix ID from the SAML response"
)
suffix += 1
logger.info("Allocating mxid for new user with localpart %s", localpart)
registered_user_id = await self._registration_handler.register_user(
localpart=localpart, default_display_name=displayname
localpart=localpart, default_display_name=displayName
)
await self._datastore.record_user_external_id(
self._auth_provider_id, remote_user_id, registered_user_id
)
@@ -236,120 +205,9 @@ class SamlHandler:
del self._outstanding_requests_dict[reqid]
DOT_REPLACE_PATTERN = re.compile(
("[^%s]" % (re.escape("".join(mxid_localpart_allowed_characters)),))
)
def dot_replace_for_mxid(username: str) -> str:
username = username.lower()
username = DOT_REPLACE_PATTERN.sub(".", username)
# regular mxids aren't allowed to start with an underscore either
username = re.sub("^_", "", username)
return username
MXID_MAPPER_MAP = {
"hexencode": map_username_to_mxid_localpart,
"dotreplace": dot_replace_for_mxid,
}
@attr.s
class SamlConfig(object):
mxid_source_attribute = attr.ib()
mxid_mapper = attr.ib()
class Saml2SessionData:
"""Data we track about SAML2 sessions"""
class DefaultSamlMappingProvider(object):
__version__ = "0.0.1"
def __init__(self, parsed_config: SamlConfig):
"""The default SAML user mapping provider
Args:
parsed_config: Module configuration
"""
self._mxid_source_attribute = parsed_config.mxid_source_attribute
self._mxid_mapper = parsed_config.mxid_mapper
def saml_response_to_user_attributes(
self, saml_response: saml2.response.AuthnResponse, failures: int = 0,
) -> dict:
"""Maps some text from a SAML response to attributes of a new user
Args:
saml_response: A SAML auth response object
failures: How many times a call to this function with this
saml_response has resulted in a failure
Returns:
dict: A dict containing new user attributes. Possible keys:
* mxid_localpart (str): Required. The localpart of the user's mxid
* displayname (str): The displayname of the user
"""
try:
mxid_source = saml_response.ava[self._mxid_source_attribute][0]
except KeyError:
logger.warning(
"SAML2 response lacks a '%s' attestation", self._mxid_source_attribute,
)
raise SynapseError(
400, "%s not in SAML2 response" % (self._mxid_source_attribute,)
)
# Use the configured mapper for this mxid_source
base_mxid_localpart = self._mxid_mapper(mxid_source)
# Append suffix integer if last call to this function failed to produce
# a usable mxid
localpart = base_mxid_localpart + (str(failures) if failures else "")
# Retrieve the display name from the saml response
# If displayname is None, the mxid_localpart will be used instead
displayname = saml_response.ava.get("displayName", [None])[0]
return {
"mxid_localpart": localpart,
"displayname": displayname,
}
@staticmethod
def parse_config(config: dict) -> SamlConfig:
"""Parse the dict provided by the homeserver's config
Args:
config: A dictionary containing configuration options for this provider
Returns:
SamlConfig: A custom config object for this module
"""
# Parse config options and use defaults where necessary
mxid_source_attribute = config.get("mxid_source_attribute", "uid")
mapping_type = config.get("mxid_mapping", "hexencode")
# Retrieve the associating mapping function
try:
mxid_mapper = MXID_MAPPER_MAP[mapping_type]
except KeyError:
raise ConfigError(
"saml2_config.user_mapping_provider.config: '%s' is not a valid "
"mxid_mapping value" % (mapping_type,)
)
return SamlConfig(mxid_source_attribute, mxid_mapper)
@staticmethod
def get_saml_attributes(config: SamlConfig) -> Tuple[set, set]:
"""Returns the required attributes of a SAML
Args:
config: A SamlConfig object containing configuration params for this provider
Returns:
tuple[set,set]: The first set equates to the saml auth response
attributes that are required for the module to function, whereas the
second set consists of those attributes which can be used if
available, but are not necessary
"""
return {"uid", config.mxid_source_attribute}, {"displayName"}
# time the session was created, in milliseconds
creation_time = attr.ib()

View File

@@ -21,7 +21,7 @@ from unpaddedbase64 import decode_base64, encode_base64
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import NotFoundError, SynapseError
from synapse.api.errors import SynapseError
from synapse.api.filtering import Filter
from synapse.storage.state import StateFilter
from synapse.visibility import filter_events_for_client
@@ -37,7 +37,6 @@ class SearchHandler(BaseHandler):
self._event_serializer = hs.get_event_client_serializer()
self.storage = hs.get_storage()
self.state_store = self.storage.state
self.auth = hs.get_auth()
@defer.inlineCallbacks
def get_old_rooms_from_upgraded_room(self, room_id):
@@ -54,38 +53,23 @@ class SearchHandler(BaseHandler):
room_id (str): id of the room to search through.
Returns:
Deferred[iterable[str]]: predecessor room ids
Deferred[iterable[unicode]]: predecessor room ids
"""
historical_room_ids = []
# The initial room must have been known for us to get this far
predecessor = yield self.store.get_room_predecessor(room_id)
while True:
predecessor = yield self.store.get_room_predecessor(room_id)
# If no predecessor, assume we've hit a dead end
if not predecessor:
# We have reached the end of the chain of predecessors
break
if not isinstance(predecessor.get("room_id"), str):
# This predecessor object is malformed. Exit here
break
# Add predecessor's room ID
historical_room_ids.append(predecessor["room_id"])
predecessor_room_id = predecessor["room_id"]
# Don't add it to the list until we have checked that we are in the room
try:
next_predecessor_room = yield self.store.get_room_predecessor(
predecessor_room_id
)
except NotFoundError:
# The predecessor is not a known room, so we are done here
break
historical_room_ids.append(predecessor_room_id)
# And repeat
predecessor = next_predecessor_room
# Scan through the old room for further predecessors
room_id = predecessor["room_id"]
return historical_room_ids

View File

@@ -22,6 +22,8 @@ from six import iteritems, itervalues
from prometheus_client import Counter
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.logging.context import LoggingContext
from synapse.push.clientformat import format_push_rules_for_user
@@ -239,7 +241,8 @@ class SyncHandler(object):
expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
)
async def wait_for_sync_for_user(
@defer.inlineCallbacks
def wait_for_sync_for_user(
self, sync_config, since_token=None, timeout=0, full_state=False
):
"""Get the sync for a client if we have new data for it now. Otherwise
@@ -252,9 +255,9 @@ class SyncHandler(object):
# not been exceeded (if not part of the group by this point, almost certain
# auth_blocking will occur)
user_id = sync_config.user.to_string()
await self.auth.check_auth_blocking(user_id)
yield self.auth.check_auth_blocking(user_id)
res = await self.response_cache.wrap(
res = yield self.response_cache.wrap(
sync_config.request_key,
self._wait_for_sync_for_user,
sync_config,
@@ -264,9 +267,8 @@ class SyncHandler(object):
)
return res
async def _wait_for_sync_for_user(
self, sync_config, since_token, timeout, full_state
):
@defer.inlineCallbacks
def _wait_for_sync_for_user(self, sync_config, since_token, timeout, full_state):
if since_token is None:
sync_type = "initial_sync"
elif full_state:
@@ -281,7 +283,7 @@ class SyncHandler(object):
if timeout == 0 or since_token is None or full_state:
# we are going to return immediately, so don't bother calling
# notifier.wait_for_events.
result = await self.current_sync_for_user(
result = yield self.current_sync_for_user(
sync_config, since_token, full_state=full_state
)
else:
@@ -289,7 +291,7 @@ class SyncHandler(object):
def current_sync_callback(before_token, after_token):
return self.current_sync_for_user(sync_config, since_token)
result = await self.notifier.wait_for_events(
result = yield self.notifier.wait_for_events(
sync_config.user.to_string(),
timeout,
current_sync_callback,
@@ -312,13 +314,15 @@ class SyncHandler(object):
"""
return self.generate_sync_result(sync_config, since_token, full_state)
async def push_rules_for_user(self, user):
@defer.inlineCallbacks
def push_rules_for_user(self, user):
user_id = user.to_string()
rules = await self.store.get_push_rules_for_user(user_id)
rules = yield self.store.get_push_rules_for_user(user_id)
rules = format_push_rules_for_user(user, rules)
return rules
async def ephemeral_by_room(self, sync_result_builder, now_token, since_token=None):
@defer.inlineCallbacks
def ephemeral_by_room(self, sync_result_builder, now_token, since_token=None):
"""Get the ephemeral events for each room the user is in
Args:
sync_result_builder(SyncResultBuilder)
@@ -339,7 +343,7 @@ class SyncHandler(object):
room_ids = sync_result_builder.joined_room_ids
typing_source = self.event_sources.sources["typing"]
typing, typing_key = await typing_source.get_new_events(
typing, typing_key = yield typing_source.get_new_events(
user=sync_config.user,
from_key=typing_key,
limit=sync_config.filter_collection.ephemeral_limit(),
@@ -361,7 +365,7 @@ class SyncHandler(object):
receipt_key = since_token.receipt_key if since_token else "0"
receipt_source = self.event_sources.sources["receipt"]
receipts, receipt_key = await receipt_source.get_new_events(
receipts, receipt_key = yield receipt_source.get_new_events(
user=sync_config.user,
from_key=receipt_key,
limit=sync_config.filter_collection.ephemeral_limit(),
@@ -378,7 +382,8 @@ class SyncHandler(object):
return now_token, ephemeral_by_room
async def _load_filtered_recents(
@defer.inlineCallbacks
def _load_filtered_recents(
self,
room_id,
sync_config,
@@ -410,10 +415,10 @@ class SyncHandler(object):
# ensure that we always include current state in the timeline
current_state_ids = frozenset()
if any(e.is_state() for e in recents):
current_state_ids = await self.state.get_current_state_ids(room_id)
current_state_ids = yield self.state.get_current_state_ids(room_id)
current_state_ids = frozenset(itervalues(current_state_ids))
recents = await filter_events_for_client(
recents = yield filter_events_for_client(
self.storage,
sync_config.user.to_string(),
recents,
@@ -444,14 +449,14 @@ class SyncHandler(object):
# Otherwise, we want to return the last N events in the room
# in toplogical ordering.
if since_key:
events, end_key = await self.store.get_room_events_stream_for_room(
events, end_key = yield self.store.get_room_events_stream_for_room(
room_id,
limit=load_limit + 1,
from_key=since_key,
to_key=end_key,
)
else:
events, end_key = await self.store.get_recent_events_for_room(
events, end_key = yield self.store.get_recent_events_for_room(
room_id, limit=load_limit + 1, end_token=end_key
)
loaded_recents = sync_config.filter_collection.filter_room_timeline(
@@ -463,10 +468,10 @@ class SyncHandler(object):
# ensure that we always include current state in the timeline
current_state_ids = frozenset()
if any(e.is_state() for e in loaded_recents):
current_state_ids = await self.state.get_current_state_ids(room_id)
current_state_ids = yield self.state.get_current_state_ids(room_id)
current_state_ids = frozenset(itervalues(current_state_ids))
loaded_recents = await filter_events_for_client(
loaded_recents = yield filter_events_for_client(
self.storage,
sync_config.user.to_string(),
loaded_recents,
@@ -493,7 +498,8 @@ class SyncHandler(object):
limited=limited or newly_joined_room,
)
async def get_state_after_event(self, event, state_filter=StateFilter.all()):
@defer.inlineCallbacks
def get_state_after_event(self, event, state_filter=StateFilter.all()):
"""
Get the room state after the given event
@@ -505,7 +511,7 @@ class SyncHandler(object):
Returns:
A Deferred map from ((type, state_key)->Event)
"""
state_ids = await self.state_store.get_state_ids_for_event(
state_ids = yield self.state_store.get_state_ids_for_event(
event.event_id, state_filter=state_filter
)
if event.is_state():
@@ -513,9 +519,8 @@ class SyncHandler(object):
state_ids[(event.type, event.state_key)] = event.event_id
return state_ids
async def get_state_at(
self, room_id, stream_position, state_filter=StateFilter.all()
):
@defer.inlineCallbacks
def get_state_at(self, room_id, stream_position, state_filter=StateFilter.all()):
""" Get the room state at a particular stream position
Args:
@@ -531,13 +536,13 @@ class SyncHandler(object):
# get_recent_events_for_room operates by topo ordering. This therefore
# does not reliably give you the state at the given stream position.
# (https://github.com/matrix-org/synapse/issues/3305)
last_events, _ = await self.store.get_recent_events_for_room(
last_events, _ = yield self.store.get_recent_events_for_room(
room_id, end_token=stream_position.room_key, limit=1
)
if last_events:
last_event = last_events[-1]
state = await self.get_state_after_event(
state = yield self.get_state_after_event(
last_event, state_filter=state_filter
)
@@ -546,7 +551,8 @@ class SyncHandler(object):
state = {}
return state
async def compute_summary(self, room_id, sync_config, batch, state, now_token):
@defer.inlineCallbacks
def compute_summary(self, room_id, sync_config, batch, state, now_token):
""" Works out a room summary block for this room, summarising the number
of joined members in the room, and providing the 'hero' members if the
room has no name so clients can consistently name rooms. Also adds
@@ -568,7 +574,7 @@ class SyncHandler(object):
# FIXME: we could/should get this from room_stats when matthew/stats lands
# FIXME: this promulgates https://github.com/matrix-org/synapse/issues/3305
last_events, _ = await self.store.get_recent_event_ids_for_room(
last_events, _ = yield self.store.get_recent_event_ids_for_room(
room_id, end_token=now_token.room_key, limit=1
)
@@ -576,7 +582,7 @@ class SyncHandler(object):
return None
last_event = last_events[-1]
state_ids = await self.state_store.get_state_ids_for_event(
state_ids = yield self.state_store.get_state_ids_for_event(
last_event.event_id,
state_filter=StateFilter.from_types(
[(EventTypes.Name, ""), (EventTypes.CanonicalAlias, "")]
@@ -584,7 +590,7 @@ class SyncHandler(object):
)
# this is heavily cached, thus: fast.
details = await self.store.get_room_summary(room_id)
details = yield self.store.get_room_summary(room_id)
name_id = state_ids.get((EventTypes.Name, ""))
canonical_alias_id = state_ids.get((EventTypes.CanonicalAlias, ""))
@@ -602,12 +608,12 @@ class SyncHandler(object):
# calculating heroes. Empty strings are falsey, so we check
# for the "name" value and default to an empty string.
if name_id:
name = await self.store.get_event(name_id, allow_none=True)
name = yield self.store.get_event(name_id, allow_none=True)
if name and name.content.get("name"):
return summary
if canonical_alias_id:
canonical_alias = await self.store.get_event(
canonical_alias = yield self.store.get_event(
canonical_alias_id, allow_none=True
)
if canonical_alias and canonical_alias.content.get("alias"):
@@ -672,7 +678,7 @@ class SyncHandler(object):
)
]
missing_hero_state = await self.store.get_events(missing_hero_event_ids)
missing_hero_state = yield self.store.get_events(missing_hero_event_ids)
missing_hero_state = missing_hero_state.values()
for s in missing_hero_state:
@@ -691,7 +697,8 @@ class SyncHandler(object):
logger.debug("found LruCache for %r", cache_key)
return cache
async def compute_state_delta(
@defer.inlineCallbacks
def compute_state_delta(
self, room_id, batch, sync_config, since_token, now_token, full_state
):
""" Works out the difference in state between the start of the timeline
@@ -752,16 +759,16 @@ class SyncHandler(object):
if full_state:
if batch:
current_state_ids = await self.state_store.get_state_ids_for_event(
current_state_ids = yield self.state_store.get_state_ids_for_event(
batch.events[-1].event_id, state_filter=state_filter
)
state_ids = await self.state_store.get_state_ids_for_event(
state_ids = yield self.state_store.get_state_ids_for_event(
batch.events[0].event_id, state_filter=state_filter
)
else:
current_state_ids = await self.get_state_at(
current_state_ids = yield self.get_state_at(
room_id, stream_position=now_token, state_filter=state_filter
)
@@ -776,13 +783,13 @@ class SyncHandler(object):
)
elif batch.limited:
if batch:
state_at_timeline_start = await self.state_store.get_state_ids_for_event(
state_at_timeline_start = yield self.state_store.get_state_ids_for_event(
batch.events[0].event_id, state_filter=state_filter
)
else:
# We can get here if the user has ignored the senders of all
# the recent events.
state_at_timeline_start = await self.get_state_at(
state_at_timeline_start = yield self.get_state_at(
room_id, stream_position=now_token, state_filter=state_filter
)
@@ -800,19 +807,19 @@ class SyncHandler(object):
# about them).
state_filter = StateFilter.all()
state_at_previous_sync = await self.get_state_at(
state_at_previous_sync = yield self.get_state_at(
room_id, stream_position=since_token, state_filter=state_filter
)
if batch:
current_state_ids = await self.state_store.get_state_ids_for_event(
current_state_ids = yield self.state_store.get_state_ids_for_event(
batch.events[-1].event_id, state_filter=state_filter
)
else:
# Its not clear how we get here, but empirically we do
# (#5407). Logging has been added elsewhere to try and
# figure out where this state comes from.
current_state_ids = await self.get_state_at(
current_state_ids = yield self.get_state_at(
room_id, stream_position=now_token, state_filter=state_filter
)
@@ -836,7 +843,7 @@ class SyncHandler(object):
# So we fish out all the member events corresponding to the
# timeline here, and then dedupe any redundant ones below.
state_ids = await self.state_store.get_state_ids_for_event(
state_ids = yield self.state_store.get_state_ids_for_event(
batch.events[0].event_id,
# we only want members!
state_filter=StateFilter.from_types(
@@ -876,7 +883,7 @@ class SyncHandler(object):
state = {}
if state_ids:
state = await self.store.get_events(list(state_ids.values()))
state = yield self.store.get_events(list(state_ids.values()))
return {
(e.type, e.state_key): e
@@ -885,9 +892,10 @@ class SyncHandler(object):
)
}
async def unread_notifs_for_room_id(self, room_id, sync_config):
@defer.inlineCallbacks
def unread_notifs_for_room_id(self, room_id, sync_config):
with Measure(self.clock, "unread_notifs_for_room_id"):
last_unread_event_id = await self.store.get_last_receipt_event_id_for_user(
last_unread_event_id = yield self.store.get_last_receipt_event_id_for_user(
user_id=sync_config.user.to_string(),
room_id=room_id,
receipt_type="m.read",
@@ -895,7 +903,7 @@ class SyncHandler(object):
notifs = []
if last_unread_event_id:
notifs = await self.store.get_unread_event_push_actions_by_room_for_user(
notifs = yield self.store.get_unread_event_push_actions_by_room_for_user(
room_id, sync_config.user.to_string(), last_unread_event_id
)
return notifs
@@ -904,9 +912,8 @@ class SyncHandler(object):
# count is whatever it was last time.
return None
async def generate_sync_result(
self, sync_config, since_token=None, full_state=False
):
@defer.inlineCallbacks
def generate_sync_result(self, sync_config, since_token=None, full_state=False):
"""Generates a sync result.
Args:
@@ -921,7 +928,7 @@ class SyncHandler(object):
# this is due to some of the underlying streams not supporting the ability
# to query up to a given point.
# Always use the `now_token` in `SyncResultBuilder`
now_token = await self.event_sources.get_current_token()
now_token = yield self.event_sources.get_current_token()
logger.info(
"Calculating sync response for %r between %s and %s",
@@ -937,9 +944,10 @@ class SyncHandler(object):
# See https://github.com/matrix-org/matrix-doc/issues/1144
raise NotImplementedError()
else:
joined_room_ids = await self.get_rooms_for_user_at(
joined_room_ids = yield self.get_rooms_for_user_at(
user_id, now_token.room_stream_id
)
sync_result_builder = SyncResultBuilder(
sync_config,
full_state,
@@ -948,11 +956,11 @@ class SyncHandler(object):
joined_room_ids=joined_room_ids,
)
account_data_by_room = await self._generate_sync_entry_for_account_data(
account_data_by_room = yield self._generate_sync_entry_for_account_data(
sync_result_builder
)
res = await self._generate_sync_entry_for_rooms(
res = yield self._generate_sync_entry_for_rooms(
sync_result_builder, account_data_by_room
)
newly_joined_rooms, newly_joined_or_invited_users, _, _ = res
@@ -962,13 +970,13 @@ class SyncHandler(object):
since_token is None and sync_config.filter_collection.blocks_all_presence()
)
if self.hs_config.use_presence and not block_all_presence_data:
await self._generate_sync_entry_for_presence(
yield self._generate_sync_entry_for_presence(
sync_result_builder, newly_joined_rooms, newly_joined_or_invited_users
)
await self._generate_sync_entry_for_to_device(sync_result_builder)
yield self._generate_sync_entry_for_to_device(sync_result_builder)
device_lists = await self._generate_sync_entry_for_device_list(
device_lists = yield self._generate_sync_entry_for_device_list(
sync_result_builder,
newly_joined_rooms=newly_joined_rooms,
newly_joined_or_invited_users=newly_joined_or_invited_users,
@@ -979,11 +987,11 @@ class SyncHandler(object):
device_id = sync_config.device_id
one_time_key_counts = {}
if device_id:
one_time_key_counts = await self.store.count_e2e_one_time_keys(
one_time_key_counts = yield self.store.count_e2e_one_time_keys(
user_id, device_id
)
await self._generate_sync_entry_for_groups(sync_result_builder)
yield self._generate_sync_entry_for_groups(sync_result_builder)
# debug for https://github.com/matrix-org/synapse/issues/4422
for joined_room in sync_result_builder.joined:
@@ -1007,17 +1015,18 @@ class SyncHandler(object):
)
@measure_func("_generate_sync_entry_for_groups")
async def _generate_sync_entry_for_groups(self, sync_result_builder):
@defer.inlineCallbacks
def _generate_sync_entry_for_groups(self, sync_result_builder):
user_id = sync_result_builder.sync_config.user.to_string()
since_token = sync_result_builder.since_token
now_token = sync_result_builder.now_token
if since_token and since_token.groups_key:
results = await self.store.get_groups_changes_for_user(
results = yield self.store.get_groups_changes_for_user(
user_id, since_token.groups_key, now_token.groups_key
)
else:
results = await self.store.get_all_groups_for_user(
results = yield self.store.get_all_groups_for_user(
user_id, now_token.groups_key
)
@@ -1050,7 +1059,8 @@ class SyncHandler(object):
)
@measure_func("_generate_sync_entry_for_device_list")
async def _generate_sync_entry_for_device_list(
@defer.inlineCallbacks
def _generate_sync_entry_for_device_list(
self,
sync_result_builder,
newly_joined_rooms,
@@ -1098,32 +1108,32 @@ class SyncHandler(object):
# room with by looking at all users that have left a room plus users
# that were in a room we've left.
users_who_share_room = await self.store.get_users_who_share_room_with_user(
users_who_share_room = yield self.store.get_users_who_share_room_with_user(
user_id
)
# Step 1a, check for changes in devices of users we share a room with
users_that_have_changed = await self.store.get_users_whose_devices_changed(
users_that_have_changed = yield self.store.get_users_whose_devices_changed(
since_token.device_list_key, users_who_share_room
)
# Step 1b, check for newly joined rooms
for room_id in newly_joined_rooms:
joined_users = await self.state.get_current_users_in_room(room_id)
joined_users = yield self.state.get_current_users_in_room(room_id)
newly_joined_or_invited_users.update(joined_users)
# TODO: Check that these users are actually new, i.e. either they
# weren't in the previous sync *or* they left and rejoined.
users_that_have_changed.update(newly_joined_or_invited_users)
user_signatures_changed = await self.store.get_users_whose_signatures_changed(
user_signatures_changed = yield self.store.get_users_whose_signatures_changed(
user_id, since_token.device_list_key
)
users_that_have_changed.update(user_signatures_changed)
# Now find users that we no longer track
for room_id in newly_left_rooms:
left_users = await self.state.get_current_users_in_room(room_id)
left_users = yield self.state.get_current_users_in_room(room_id)
newly_left_users.update(left_users)
# Remove any users that we still share a room with.
@@ -1133,7 +1143,8 @@ class SyncHandler(object):
else:
return DeviceLists(changed=[], left=[])
async def _generate_sync_entry_for_to_device(self, sync_result_builder):
@defer.inlineCallbacks
def _generate_sync_entry_for_to_device(self, sync_result_builder):
"""Generates the portion of the sync response. Populates
`sync_result_builder` with the result.
@@ -1154,14 +1165,14 @@ class SyncHandler(object):
# We only delete messages when a new message comes in, but that's
# fine so long as we delete them at some point.
deleted = await self.store.delete_messages_for_device(
deleted = yield self.store.delete_messages_for_device(
user_id, device_id, since_stream_id
)
logger.debug(
"Deleted %d to-device messages up to %d", deleted, since_stream_id
)
messages, stream_id = await self.store.get_new_messages_for_device(
messages, stream_id = yield self.store.get_new_messages_for_device(
user_id, device_id, since_stream_id, now_token.to_device_key
)
@@ -1179,7 +1190,8 @@ class SyncHandler(object):
else:
sync_result_builder.to_device = []
async def _generate_sync_entry_for_account_data(self, sync_result_builder):
@defer.inlineCallbacks
def _generate_sync_entry_for_account_data(self, sync_result_builder):
"""Generates the account data portion of the sync response. Populates
`sync_result_builder` with the result.
@@ -1197,25 +1209,25 @@ class SyncHandler(object):
(
account_data,
account_data_by_room,
) = await self.store.get_updated_account_data_for_user(
) = yield self.store.get_updated_account_data_for_user(
user_id, since_token.account_data_key
)
push_rules_changed = await self.store.have_push_rules_changed_for_user(
push_rules_changed = yield self.store.have_push_rules_changed_for_user(
user_id, int(since_token.push_rules_key)
)
if push_rules_changed:
account_data["m.push_rules"] = await self.push_rules_for_user(
account_data["m.push_rules"] = yield self.push_rules_for_user(
sync_config.user
)
else:
(
account_data,
account_data_by_room,
) = await self.store.get_account_data_for_user(sync_config.user.to_string())
) = yield self.store.get_account_data_for_user(sync_config.user.to_string())
account_data["m.push_rules"] = await self.push_rules_for_user(
account_data["m.push_rules"] = yield self.push_rules_for_user(
sync_config.user
)
@@ -1230,7 +1242,8 @@ class SyncHandler(object):
return account_data_by_room
async def _generate_sync_entry_for_presence(
@defer.inlineCallbacks
def _generate_sync_entry_for_presence(
self, sync_result_builder, newly_joined_rooms, newly_joined_or_invited_users
):
"""Generates the presence portion of the sync response. Populates the
@@ -1258,7 +1271,7 @@ class SyncHandler(object):
presence_key = None
include_offline = False
presence, presence_key = await presence_source.get_new_events(
presence, presence_key = yield presence_source.get_new_events(
user=user,
from_key=presence_key,
is_guest=sync_config.is_guest,
@@ -1270,12 +1283,12 @@ class SyncHandler(object):
extra_users_ids = set(newly_joined_or_invited_users)
for room_id in newly_joined_rooms:
users = await self.state.get_current_users_in_room(room_id)
users = yield self.state.get_current_users_in_room(room_id)
extra_users_ids.update(users)
extra_users_ids.discard(user.to_string())
if extra_users_ids:
states = await self.presence_handler.get_states(extra_users_ids)
states = yield self.presence_handler.get_states(extra_users_ids)
presence.extend(states)
# Deduplicate the presence entries so that there's at most one per user
@@ -1285,9 +1298,8 @@ class SyncHandler(object):
sync_result_builder.presence = presence
async def _generate_sync_entry_for_rooms(
self, sync_result_builder, account_data_by_room
):
@defer.inlineCallbacks
def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_room):
"""Generates the rooms portion of the sync response. Populates the
`sync_result_builder` with the result.
@@ -1309,7 +1321,7 @@ class SyncHandler(object):
if block_all_room_ephemeral:
ephemeral_by_room = {}
else:
now_token, ephemeral_by_room = await self.ephemeral_by_room(
now_token, ephemeral_by_room = yield self.ephemeral_by_room(
sync_result_builder,
now_token=sync_result_builder.now_token,
since_token=sync_result_builder.since_token,
@@ -1321,16 +1333,16 @@ class SyncHandler(object):
since_token = sync_result_builder.since_token
if not sync_result_builder.full_state:
if since_token and not ephemeral_by_room and not account_data_by_room:
have_changed = await self._have_rooms_changed(sync_result_builder)
have_changed = yield self._have_rooms_changed(sync_result_builder)
if not have_changed:
tags_by_room = await self.store.get_updated_tags(
tags_by_room = yield self.store.get_updated_tags(
user_id, since_token.account_data_key
)
if not tags_by_room:
logger.debug("no-oping sync")
return [], [], [], []
ignored_account_data = await self.store.get_global_account_data_by_type_for_user(
ignored_account_data = yield self.store.get_global_account_data_by_type_for_user(
"m.ignored_user_list", user_id=user_id
)
@@ -1340,18 +1352,18 @@ class SyncHandler(object):
ignored_users = frozenset()
if since_token:
res = await self._get_rooms_changed(sync_result_builder, ignored_users)
res = yield self._get_rooms_changed(sync_result_builder, ignored_users)
room_entries, invited, newly_joined_rooms, newly_left_rooms = res
tags_by_room = await self.store.get_updated_tags(
tags_by_room = yield self.store.get_updated_tags(
user_id, since_token.account_data_key
)
else:
res = await self._get_all_rooms(sync_result_builder, ignored_users)
res = yield self._get_all_rooms(sync_result_builder, ignored_users)
room_entries, invited, newly_joined_rooms = res
newly_left_rooms = []
tags_by_room = await self.store.get_tags_for_user(user_id)
tags_by_room = yield self.store.get_tags_for_user(user_id)
def handle_room_entries(room_entry):
return self._generate_room_entry(
@@ -1364,7 +1376,7 @@ class SyncHandler(object):
always_include=sync_result_builder.full_state,
)
await concurrently_execute(handle_room_entries, room_entries, 10)
yield concurrently_execute(handle_room_entries, room_entries, 10)
sync_result_builder.invited.extend(invited)
@@ -1398,7 +1410,8 @@ class SyncHandler(object):
newly_left_users,
)
async def _have_rooms_changed(self, sync_result_builder):
@defer.inlineCallbacks
def _have_rooms_changed(self, sync_result_builder):
"""Returns whether there may be any new events that should be sent down
the sync. Returns True if there are.
"""
@@ -1409,7 +1422,7 @@ class SyncHandler(object):
assert since_token
# Get a list of membership change events that have happened.
rooms_changed = await self.store.get_membership_changes_for_user(
rooms_changed = yield self.store.get_membership_changes_for_user(
user_id, since_token.room_key, now_token.room_key
)
@@ -1422,7 +1435,8 @@ class SyncHandler(object):
return True
return False
async def _get_rooms_changed(self, sync_result_builder, ignored_users):
@defer.inlineCallbacks
def _get_rooms_changed(self, sync_result_builder, ignored_users):
"""Gets the the changes that have happened since the last sync.
Args:
@@ -1447,7 +1461,7 @@ class SyncHandler(object):
assert since_token
# Get a list of membership change events that have happened.
rooms_changed = await self.store.get_membership_changes_for_user(
rooms_changed = yield self.store.get_membership_changes_for_user(
user_id, since_token.room_key, now_token.room_key
)
@@ -1485,11 +1499,11 @@ class SyncHandler(object):
continue
if room_id in sync_result_builder.joined_room_ids or has_join:
old_state_ids = await self.get_state_at(room_id, since_token)
old_state_ids = yield self.get_state_at(room_id, since_token)
old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None)
old_mem_ev = None
if old_mem_ev_id:
old_mem_ev = await self.store.get_event(
old_mem_ev = yield self.store.get_event(
old_mem_ev_id, allow_none=True
)
@@ -1522,13 +1536,13 @@ class SyncHandler(object):
newly_left_rooms.append(room_id)
else:
if not old_state_ids:
old_state_ids = await self.get_state_at(room_id, since_token)
old_state_ids = yield self.get_state_at(room_id, since_token)
old_mem_ev_id = old_state_ids.get(
(EventTypes.Member, user_id), None
)
old_mem_ev = None
if old_mem_ev_id:
old_mem_ev = await self.store.get_event(
old_mem_ev = yield self.store.get_event(
old_mem_ev_id, allow_none=True
)
if old_mem_ev and old_mem_ev.membership == Membership.JOIN:
@@ -1552,7 +1566,7 @@ class SyncHandler(object):
if leave_events:
leave_event = leave_events[-1]
leave_stream_token = await self.store.get_stream_token_for_event(
leave_stream_token = yield self.store.get_stream_token_for_event(
leave_event.event_id
)
leave_token = since_token.copy_and_replace(
@@ -1589,7 +1603,7 @@ class SyncHandler(object):
timeline_limit = sync_config.filter_collection.timeline_limit()
# Get all events for rooms we're currently joined to.
room_to_events = await self.store.get_room_events_stream_for_rooms(
room_to_events = yield self.store.get_room_events_stream_for_rooms(
room_ids=sync_result_builder.joined_room_ids,
from_key=since_token.room_key,
to_key=now_token.room_key,
@@ -1638,7 +1652,8 @@ class SyncHandler(object):
return room_entries, invited, newly_joined_rooms, newly_left_rooms
async def _get_all_rooms(self, sync_result_builder, ignored_users):
@defer.inlineCallbacks
def _get_all_rooms(self, sync_result_builder, ignored_users):
"""Returns entries for all rooms for the user.
Args:
@@ -1662,7 +1677,7 @@ class SyncHandler(object):
Membership.BAN,
)
room_list = await self.store.get_rooms_for_user_where_membership_is(
room_list = yield self.store.get_rooms_for_user_where_membership_is(
user_id=user_id, membership_list=membership_list
)
@@ -1685,7 +1700,7 @@ class SyncHandler(object):
elif event.membership == Membership.INVITE:
if event.sender in ignored_users:
continue
invite = await self.store.get_event(event.event_id)
invite = yield self.store.get_event(event.event_id)
invited.append(InvitedSyncResult(room_id=event.room_id, invite=invite))
elif event.membership in (Membership.LEAVE, Membership.BAN):
# Always send down rooms we were banned or kicked from.
@@ -1711,7 +1726,8 @@ class SyncHandler(object):
return room_entries, invited, []
async def _generate_room_entry(
@defer.inlineCallbacks
def _generate_room_entry(
self,
sync_result_builder,
ignored_users,
@@ -1753,7 +1769,7 @@ class SyncHandler(object):
since_token = room_builder.since_token
upto_token = room_builder.upto_token
batch = await self._load_filtered_recents(
batch = yield self._load_filtered_recents(
room_id,
sync_config,
now_token=upto_token,
@@ -1780,7 +1796,7 @@ class SyncHandler(object):
# tag was added by synapse e.g. for server notice rooms.
if full_state:
user_id = sync_result_builder.sync_config.user.to_string()
tags = await self.store.get_tags_for_room(user_id, room_id)
tags = yield self.store.get_tags_for_room(user_id, room_id)
# If there aren't any tags, don't send the empty tags list down
# sync
@@ -1805,7 +1821,7 @@ class SyncHandler(object):
):
return
state = await self.compute_state_delta(
state = yield self.compute_state_delta(
room_id, batch, sync_config, since_token, now_token, full_state=full_state
)
@@ -1828,7 +1844,7 @@ class SyncHandler(object):
)
or since_token is None
):
summary = await self.compute_summary(
summary = yield self.compute_summary(
room_id, sync_config, batch, state, now_token
)
@@ -1845,7 +1861,7 @@ class SyncHandler(object):
)
if room_sync or always_include:
notifs = await self.unread_notifs_for_room_id(room_id, sync_config)
notifs = yield self.unread_notifs_for_room_id(room_id, sync_config)
if notifs is not None:
unread_notifications["notification_count"] = notifs["notify_count"]
@@ -1871,7 +1887,8 @@ class SyncHandler(object):
else:
raise Exception("Unrecognized rtype: %r", room_builder.rtype)
async def get_rooms_for_user_at(self, user_id, stream_ordering):
@defer.inlineCallbacks
def get_rooms_for_user_at(self, user_id, stream_ordering):
"""Get set of joined rooms for a user at the given stream ordering.
The stream ordering *must* be recent, otherwise this may throw an
@@ -1886,7 +1903,7 @@ class SyncHandler(object):
Deferred[frozenset[str]]: Set of room_ids the user is in at given
stream_ordering.
"""
joined_rooms = await self.store.get_rooms_for_user_with_stream_ordering(user_id)
joined_rooms = yield self.store.get_rooms_for_user_with_stream_ordering(user_id)
joined_room_ids = set()
@@ -1904,10 +1921,10 @@ class SyncHandler(object):
logger.info("User joined room after current token: %s", room_id)
extrems = await self.store.get_forward_extremeties_for_room(
extrems = yield self.store.get_forward_extremeties_for_room(
room_id, stream_ordering
)
users_in_room = await self.state.get_current_users_in_room(room_id, extrems)
users_in_room = yield self.state.get_current_users_in_room(room_id, extrems)
if user_id in users_in_room:
joined_room_ids.add(room_id)

View File

@@ -313,7 +313,10 @@ class TypingNotificationEventSource(object):
events.append(self._make_event_for(room_id))
return defer.succeed((events, handler._latest_room_serial))
return events, handler._latest_room_serial
def get_current_key(self):
return self.get_typing_handler()._latest_room_serial
def get_pagination_rows(self, user, pagination_config, key):
return [], pagination_config.from_key

View File

@@ -47,9 +47,9 @@ class SynapseRequest(Request):
logcontext(LoggingContext) : the log context for this request
"""
def __init__(self, channel, *args, **kw):
def __init__(self, site, channel, *args, **kw):
Request.__init__(self, channel, *args, **kw)
self.site = channel.site
self.site = site
self._channel = channel # this is used by the tests
self.authenticated_entity = None
self.start_time = 0
@@ -331,6 +331,18 @@ class XForwardedForRequest(SynapseRequest):
)
class SynapseRequestFactory(object):
def __init__(self, site, x_forwarded_for):
self.site = site
self.x_forwarded_for = x_forwarded_for
def __call__(self, *args, **kwargs):
if self.x_forwarded_for:
return XForwardedForRequest(self.site, *args, **kwargs)
else:
return SynapseRequest(self.site, *args, **kwargs)
class SynapseSite(Site):
"""
Subclass of a twisted http Site that does access logging with python's
@@ -352,7 +364,7 @@ class SynapseSite(Site):
self.site_tag = site_tag
proxied = config.get("x_forwarded", False)
self.requestFactory = XForwardedForRequest if proxied else SynapseRequest
self.requestFactory = SynapseRequestFactory(self, proxied)
self.access_logger = logging.getLogger(logger_name)
self.server_version_string = server_version_string.encode("ascii")

Some files were not shown because too many files have changed in this diff Show More