
Compare commits


109 Commits

Author SHA1 Message Date
Andrew Morgan
460f09d32a Merge branch 'develop' into anoa/v2_is 2019-09-02 11:54:41 +01:00
Aaron Raimist
cee00a3584 Update INSTALL.md to say that Python 2 is no longer supported (#5953)
Signed-off-by: Aaron Raimist <aaron@raim.ist>
2019-09-02 11:27:39 +01:00
Andrew Morgan
bed1e94ca0 Merge branch 'develop' into anoa/v2_is 2019-08-30 17:36:38 +01:00
Andrew Morgan
0766ef23cf Merge branch 'anoa/m_id_access_token' into anoa/v2_is 2019-08-30 17:18:57 +01:00
Andrew Morgan
2a012e8a04 Revert "Add m.id_access_token flag (#5930)" (#5945)
This reverts commit 4765f0cfd9.
2019-08-30 17:13:37 +01:00
Andrew Morgan
4548d1f87e Remove unnecessary parentheses around return statements (#5931)
Python will return a tuple whether there are parentheses around the returned values or not.

I'm just sick of my editor complaining about this all over the place :)
2019-08-30 16:28:26 +01:00
Amber Brown
4fca313389 Move buildkite config to the pipelines repo (#5943) 2019-08-31 01:01:57 +10:00
Andrew Morgan
4765f0cfd9 Add m.id_access_token flag (#5930)
Adds a flag to `/versions`' `unstable_features` section indicating that this Synapse understands what an `id_access_token` is, as per https://github.com/matrix-org/synapse/issues/5927#issuecomment-523566043

Fixes #5927
2019-08-30 15:22:51 +01:00
Amber Brown
d19505a8c1 Removed unused jenkins/ folder and script (#5938) 2019-08-30 23:13:16 +10:00
Andrew Morgan
2be726a4dd Merge branch 'develop' into anoa/v2_is 2019-08-30 13:18:37 +01:00
Andrew Morgan
3057095a5d Revert "Use the v2 lookup API for 3PID invites (#5897)" (#5937)
This reverts commit 71fc04069a.

This broke 3PID invites as #5892 was required for it to work correctly.
2019-08-30 12:00:20 +01:00
Andrew Morgan
a578c422f4 Merge branch 'develop' into anoa/m_id_access_token 2019-08-30 10:33:00 +01:00
Amber Brown
5625abe503 Fix buildkite pipeline plugin matrix-org/annotate using the wrong variable config 2019-08-30 15:06:40 +10:00
Andrew Morgan
60e447d627 Merge branch 'develop' into anoa/m_id_access_token 2019-08-29 13:40:26 +01:00
Amber Brown
e7011280c7 Fix coverage in sytest and use plugins for buildkite (#5922) 2019-08-29 22:19:57 +10:00
Andrew Morgan
6fcab5b4cf Add changelog 2019-08-29 11:56:15 +01:00
Andrew Morgan
a640aa6e18 Add m.id_access_token flag 2019-08-29 11:53:49 +01:00
Jorik Schellekens
92c1550f4a Add a link to python's logging config schema (#5926) 2019-08-28 19:08:32 +01:00
Will Hunt
c8fa620d7a Merge pull request #5902 from matrix-org/hs/exempt-support-users-from-consent
Exempt support users from consent
2019-08-28 16:31:40 +01:00
Jorik Schellekens
deca277d09 Let synctl use a config directory. (#5904)
* Let synctl use a config directory.
2019-08-28 15:55:58 +01:00
Will Hunt
5798a134c0 Removing entry for 5903 2019-08-28 14:25:05 +01:00
Andrew Morgan
71fc04069a Use the v2 lookup API for 3PID invites (#5897)
Fixes https://github.com/matrix-org/synapse/issues/5861

Adds support for the v2 lookup API as defined in [MSC2134](https://github.com/matrix-org/matrix-doc/pull/2134). Currently this is only used for 3PID invites.

Sytest PR: https://github.com/matrix-org/sytest/pull/679
2019-08-28 14:59:26 +02:00
Andrew Morgan
849d8dc19f Merge branch 'anoa/v2_lookup' of github.com:matrix-org/synapse into anoa/v2_lookup 2019-08-28 13:44:02 +01:00
Andrew Morgan
4dc08495b8 lint 2019-08-28 13:43:52 +01:00
Andrew Morgan
8f1346d82b Apply suggestions from code review
Co-Authored-By: Erik Johnston <erik@matrix.org>
2019-08-28 14:43:05 +02:00
Andrew Morgan
38dac2774f Warn user when the id_server they chose does not support any of the hs' desired lookup algos 2019-08-28 13:41:29 +01:00
Jorik Schellekens
6d97843793 Config templating (#5900)
Template config files

* Imagine a system composed entirely of x, y, z etc and the basic operations..

Wait George, why XOR? Why not just neq?

George: Eh, I didn't think of that..

Co-Authored-By: Erik Johnston <erik@matrix.org>
2019-08-28 13:12:22 +01:00
Amber Brown
7dc398586c Implement a structured logging output system. (#5680) 2019-08-28 21:18:53 +10:00
Andrew Morgan
e68d648594 small fixes and remove unnecessary Enum 2019-08-28 11:10:32 +01:00
Richard van der Hoff
49ef8ec399 Fix a cache-invalidation bug for worker-based deployments (#5920)
Some of the caches on worker processes were not being correctly invalidated
when a room's state was changed in a way that did not affect the membership
list of the room.

We need to make sure we send out cache invalidations even when no memberships
are changing.
2019-08-28 10:18:16 +01:00
reivilibre
a3f0635686 Merge pull request #5914 from matrix-org/rei/admin_getadmin
Add GET method to admin API /users/@user:dom/admin
2019-08-28 09:44:22 +01:00
Victor Goff
1196ee32b3 Typographical corrections in docker/README (#5921) 2019-08-28 09:34:49 +01:00
reivilibre
7ccc251415 Merge pull request #5859 from matrix-org/rei/msc2197
MSC2197 Search Filters over Federation
2019-08-28 09:00:21 +01:00
Erik Johnston
dfd10f5133 Merge pull request #5864 from matrix-org/erikj/reliable_lookups
Refactor MatrixFederationAgent to retry SRV.
2019-08-27 16:54:06 +01:00
Erik Johnston
91caa5b430 Fix off by one error in SRV result shuffling 2019-08-27 13:56:42 +01:00
Olivier Wilkinson (reivilibre)
1b959b6977 Document GET method for retrieving admin bit of user in admin API
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 13:19:19 +01:00
Olivier Wilkinson (reivilibre)
c88a119259 Add GET method to admin API /users/@user:dom/admin
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 13:12:27 +01:00
Andrew Morgan
75ef0f8b1d lint 2019-08-27 13:08:14 +01:00
Andrew Morgan
7bfccadf31 Address review comments 2019-08-27 13:06:29 +01:00
reivilibre
322ccac33f Allow schema deltas to be engine-specific (#5911)
* Allow schema deltas to be engine-specific

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>

* Newsfile

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>

* Code style (Black)

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 11:53:21 +01:00
Richard van der Hoff
ccb15a5bbe Merge pull request #5906 from matrix-org/neilj/increase_display_name_limit
Increase profile display name limit
2019-08-27 11:52:59 +01:00
Erik Johnston
f5b50d0871 Merge pull request #5895 from matrix-org/erikj/notary_key
Add config option to sign remote key query responses with a separate key.
2019-08-27 11:51:37 +01:00
Richard van der Hoff
e7577427c9 Update 5909.misc 2019-08-27 11:50:52 +01:00
Richard van der Hoff
7837a5f2ea Merge pull request #5909 from aaronraimist/public_base_url
public_base_url is actually public_baseurl
2019-08-27 11:49:59 +01:00
reivilibre
1a7e6eb633 Add Admin API capability to set adminship of a user (#5878)
Admin API: Set adminship of a user
2019-08-27 10:14:00 +01:00
Olivier Wilkinson (reivilibre)
d1e0b91083 Code style (Black)
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 09:39:11 +01:00
Olivier Wilkinson (reivilibre)
62a1639287 Newsfile
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 09:36:12 +01:00
Olivier Wilkinson (reivilibre)
aefa76f5cd Allow schema deltas to be engine-specific
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 09:14:00 +01:00
Aaron Raimist
c25137a99f Add changelog
Signed-off-by: Aaron Raimist <aaron@raim.ist>
2019-08-26 21:06:10 -05:00
Aaron Raimist
e8e3e033ee public_base_url is actually public_baseurl
Signed-off-by: Aaron Raimist <aaron@raim.ist>
2019-08-26 21:01:56 -05:00
Neil Johnson
27d3fc421a Increase max display name limit 2019-08-24 22:33:43 +01:00
Erik Johnston
fbb758a7ce Fixup comments 2019-08-23 15:37:20 +01:00
Erik Johnston
e70f0081da Fix logcontexts 2019-08-23 15:37:20 +01:00
Erik Johnston
fe0ac98e66 Don't implicitly include server signing key 2019-08-23 15:36:28 +01:00
Erik Johnston
7af5a63063 Fixup review comments 2019-08-23 15:36:28 +01:00
Will Hunt
c998f25006 Apply suggestions from code review
Co-Authored-By: Erik Johnston <erik@matrix.org>
2019-08-23 10:28:54 +01:00
Half-Shot
4a2d2c2b6f Update changelog 2019-08-23 09:57:07 +01:00
Half-Shot
9ba32f6573 Exempt bot users 2019-08-23 09:56:31 +01:00
Half-Shot
ffa5b757c7 Merge branch 'hs/bot-user-type' into hs/exempt-support-users-from-consent 2019-08-23 09:55:57 +01:00
Half-Shot
971c980c6e Add changelog 2019-08-23 09:53:48 +01:00
Half-Shot
d9b8cf81be Add bot type 2019-08-23 09:52:09 +01:00
Half-Shot
0fb5189072 Fix registration test 2019-08-23 09:25:35 +01:00
Half-Shot
80793e813c newsfile 5902 2019-08-23 09:20:31 +01:00
Half-Shot
ae38e0569f Ignore consent for support users 2019-08-23 09:15:10 +01:00
Half-Shot
886eceba3e Return user_type in get_user_by_id 2019-08-23 09:14:52 +01:00
Jorik Schellekens
8767b63a82 Propagate opentracing contexts through EDUs (#5852)
Propagate opentracing contexts through EDUs
Co-Authored-By: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
2019-08-22 18:21:10 +01:00
Richard van der Hoff
0b39fa53b6 Merge pull request #5877 from Awesome-Technologies/remove_shared_secret_registration
Remove shared secret registration
2019-08-22 18:12:25 +01:00
Jorik Schellekens
812ed6b0d5 Opentracing across workers (#5771)
Propagate opentracing contexts across workers


Also includes some convenience modifications to opentracing for servlets, notably:
- Add a boolean to skip the whitelisting check on the inject/extract methods.
  Useful when injecting into carriers locally, since otherwise we'd always
  have to include our own servername and whitelist it
- start_active_span_from_request instead of header
- Add a boolean to decide whether to extract context
  from a request to a servlet
2019-08-22 18:08:07 +01:00
Manuel Stahl
0bab582fd6 Remove shared secret registration from client/r0/register endpoint
This type of registration was probably never used. It only includes the
user name in the HMAC but not the password.

Shared secret registration is still available via
client/r0/admin/register.

Signed-off-by: Manuel Stahl <manuel.stahl@awesome-technologies.de>
2019-08-22 18:04:08 +02:00
Brendan Abolivier
dbd46decad Revert "Do not send consent notices if "no-consent-required" is set"
This reverts commit 27a686e53b.
2019-08-22 14:47:43 +01:00
Brendan Abolivier
1c5b8c6222 Revert "Add "require_consent" parameter for registration"
This reverts commit 3320aaab3a.
2019-08-22 14:47:34 +01:00
Half-Shot
27a686e53b Do not send consent notices if "no-consent-required" is set 2019-08-22 14:22:04 +01:00
Half-Shot
3320aaab3a Add "require_consent" parameter for registration 2019-08-22 14:21:54 +01:00
Erik Johnston
1e4b4d85e7 Merge branch 'develop' of github.com:matrix-org/synapse into erikj/reliable_lookups 2019-08-22 13:41:57 +01:00
Erik Johnston
1b09cf8658 Merge pull request #5850 from matrix-org/erikj/retry_well_known_on_fail
Retry well known on fail
2019-08-22 13:17:05 +01:00
Jorik Schellekens
9a6f2be572 Opentrace e2e keys (#5855)
Add opentracing tags and logs for e2e keys
2019-08-22 11:28:12 +01:00
Richard van der Hoff
c9f11d09fc Add missing index on users_in_public_rooms. (#5894) 2019-08-22 10:43:13 +01:00
Richard van der Hoff
119aa31b10 Servlet to purge old rooms (#5845) 2019-08-22 10:42:59 +01:00
Richard van der Hoff
ef1c524bb3 Improve error msg when key-fetch fails (#5896)
There's no point doing a raise_from here, because the exception is always
logged at warn with no stacktrace in the caller. Instead, let's try to give
better messages to reduce confusion.

In particular, this means that we won't log 'Failed to connect to remote
server' when we don't even attempt to connect to the remote server due to
blacklisting.
2019-08-22 10:42:06 +01:00
Andrew Morgan
2472e2e40d lint 2019-08-21 18:24:37 +02:00
Andrew Morgan
73fb6f3723 Continue to support v1 lookup 2019-08-21 18:23:34 +02:00
Andrew Morgan
5426e13168 Merge branch 'develop' into anoa/v2_lookup
* develop:
  Drop some unused tables. (#5893)
  Avoid deep recursion in appservice recovery (#5885)
  Opentracing doc update (#5776)
  Refactor the Appservice scheduler code
2019-08-21 18:15:59 +02:00
Andrew Morgan
3a114fe105 linter fight 2019-08-21 16:31:06 +02:00
Andrew Morgan
902ef397af add changelog 2019-08-21 16:29:06 +02:00
Andrew Morgan
24ee3aecd5 lint 2019-08-21 16:27:42 +02:00
Andrew Morgan
1954438610 Use the v2 lookup API 2019-08-21 16:25:00 +02:00
Richard van der Hoff
4dab867288 Drop some unused tables. (#5893)
These tables are never used, so we may as well drop them.
2019-08-21 13:16:28 +01:00
Erik Johnston
62fb643cdc Newsfile 2019-08-21 11:21:58 +01:00
Erik Johnston
97cbc96093 Only sign when we respond to remote key requests 2019-08-21 11:21:58 +01:00
Erik Johnston
5906be8589 Add config option for keys to use to sign keys
This allows servers to separate keys that are used to sign remote keys
when acting as a notary server.
2019-08-21 10:44:58 +01:00
Richard van der Hoff
72bc285669 Refactor the Appservice scheduler code (#5886)
Get rid of the labyrinthine `recoverer_fn` code, and clean up the startup code
(it seemed to be previously inexplicably split between
`ApplicationServiceScheduler.start` and `_Recoverer.start`).

Add some docstrings too.
2019-08-20 17:42:45 +01:00
Richard van der Hoff
baa3f4a80d Avoid deep recursion in appservice recovery (#5885)
Hopefully, this will fix a stack overflow when recovering an appservice.

The recursion here leads to a huge chain of deferred callbacks, which then
overflows the stack when the chain completes. `inlineCallbacks` makes a better
job of this if we use iteration instead.

Clean up the code a bit too, while we're there.
2019-08-20 17:39:38 +01:00
Jorik Schellekens
c886f976e0 Opentracing doc update (#5776)
Update opentracing docs to use the unified 'trace' method
2019-08-20 13:56:03 +01:00
Erik Johnston
29763f01c6 Make changelog entry be a feature 2019-08-20 12:38:06 +01:00
Erik Johnston
74f016d343 Remove now unused pick_server_from_list 2019-08-20 12:37:08 +01:00
Erik Johnston
1f9df1cc7b Fixup _sort_server_list to be slightly more efficient
Also document that we are using the algorithm described in RFC2782 and
ensure we handle zero weight correctly.
2019-08-20 12:36:11 +01:00
Richard van der Hoff
5019945828 Refactor the Appservice scheduler code
Get rid of the labyrinthine `recoverer_fn` code, and clean up the startup code
(it seemed to be previously inexplicably split between
`ApplicationServiceScheduler.start` and `_Recoverer.start`).

Add some docstrings too.
2019-08-20 11:50:23 +01:00
Erik Johnston
7777d353bf Remove test debugs 2019-08-20 11:46:59 +01:00
Erik Johnston
1dec31560e Change jitter to be a factor rather than absolute value 2019-08-20 11:46:00 +01:00
Olivier Wilkinson (reivilibre)
502728777c Newsfile on one line
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-20 08:49:53 +01:00
Olivier Wilkinson (reivilibre)
bb29bc2937 Use MSC2197 on stable prefix as it has almost finished FCP
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-20 08:49:31 +01:00
Erik Johnston
861d663c15 Fixup changelog and remove debug logging 2019-08-16 13:15:26 +01:00
Erik Johnston
c03e3e8301 Newsfile 2019-08-15 15:43:22 +01:00
Erik Johnston
f299c5414c Refactor MatrixFederationAgent to retry SRV.
This refactors MatrixFederationAgent to move the SRV lookup into the
endpoint code, which has two benefits:
	1. It's easier to retry different host/ports in the same way as
	   HostnameEndpoint.
	2. We avoid SRV lookups if we have a free connection in the pool
2019-08-15 15:43:22 +01:00
Olivier Wilkinson (reivilibre)
a3df04a899 Newsfile
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-15 11:09:07 +01:00
Olivier Wilkinson (reivilibre)
2253b083d9 Add support for inbound MSC2197 requests on unstable Federation API
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-15 11:06:21 +01:00
Olivier Wilkinson (reivilibre)
6fadb560fc Support MSC2197 outbound with unstable prefix
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-15 10:59:37 +01:00
Erik Johnston
1771f0045d Newsfile 2019-08-15 09:28:58 +01:00
Erik Johnston
e6e136decc Retry well known on fail.
If we have recently seen a valid well-known for a domain we want to
retry on (non-final) errors a few times, to handle temporary blips in
networking/etc.
2019-08-15 09:28:58 +01:00
200 changed files with 3838 additions and 1451 deletions

.buildkite/docker-compose.py35.pg95.yaml

@@ -6,6 +6,7 @@ services:
image: postgres:9.5
environment:
POSTGRES_PASSWORD: postgres
command: -c fsync=off
testenv:
image: python:3.5
@@ -16,6 +17,6 @@ services:
SYNAPSE_POSTGRES_HOST: postgres
SYNAPSE_POSTGRES_USER: postgres
SYNAPSE_POSTGRES_PASSWORD: postgres
working_dir: /app
working_dir: /src
volumes:
- ..:/app
- ..:/src

.buildkite/docker-compose.py37.pg11.yaml

@@ -6,6 +6,7 @@ services:
image: postgres:11
environment:
POSTGRES_PASSWORD: postgres
command: -c fsync=off
testenv:
image: python:3.7
@@ -16,6 +17,6 @@ services:
SYNAPSE_POSTGRES_HOST: postgres
SYNAPSE_POSTGRES_USER: postgres
SYNAPSE_POSTGRES_PASSWORD: postgres
working_dir: /app
working_dir: /src
volumes:
- ..:/app
- ..:/src

.buildkite/docker-compose.py37.pg95.yaml

@@ -6,6 +6,7 @@ services:
image: postgres:9.5
environment:
POSTGRES_PASSWORD: postgres
command: -c fsync=off
testenv:
image: python:3.7
@@ -16,6 +17,6 @@ services:
SYNAPSE_POSTGRES_HOST: postgres
SYNAPSE_POSTGRES_USER: postgres
SYNAPSE_POSTGRES_PASSWORD: postgres
working_dir: /app
working_dir: /src
volumes:
- ..:/app
- ..:/src


@@ -1,3 +1,18 @@
# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from tap.parser import Parser
from tap.line import Result, Unknown, Diagnostic

.buildkite/merge_base_branch.sh

@@ -27,7 +27,7 @@ git config --global user.name "A robot"
# Fetch and merge. If it doesn't work, it will raise due to set -e.
git fetch -u origin $GITBASE
git merge --no-edit origin/$GITBASE
git merge --no-edit --no-commit origin/$GITBASE
# Show what we are after.
git --no-pager show -s


@@ -1,240 +0,0 @@
env:
CODECOV_TOKEN: "2dd7eb9b-0eda-45fe-a47c-9b5ac040045f"
steps:
- command:
- "python -m pip install tox"
- "tox -e check_codestyle"
label: "\U0001F9F9 Check Style"
plugins:
- docker#v3.0.1:
image: "python:3.6"
- command:
- "python -m pip install tox"
- "tox -e packaging"
label: "\U0001F9F9 packaging"
plugins:
- docker#v3.0.1:
image: "python:3.6"
- command:
- "python -m pip install tox"
- "tox -e check_isort"
label: "\U0001F9F9 isort"
plugins:
- docker#v3.0.1:
image: "python:3.6"
- command:
- "python -m pip install tox"
- "scripts-dev/check-newsfragment"
label: ":newspaper: Newsfile"
branches: "!master !develop !release-*"
plugins:
- docker#v3.0.1:
image: "python:3.6"
propagate-environment: true
- command:
- "python -m pip install tox"
- "tox -e check-sampleconfig"
label: "\U0001F9F9 check-sample-config"
plugins:
- docker#v3.0.1:
image: "python:3.6"
- wait
- command:
- "apt-get update && apt-get install -y python3.5 python3.5-dev python3-pip libxml2-dev libxslt-dev zlib1g-dev"
- "python3.5 -m pip install tox"
- "tox -e py35-old,codecov"
label: ":python: 3.5 / SQLite / Old Deps"
env:
TRIAL_FLAGS: "-j 2"
plugins:
- docker#v3.0.1:
image: "ubuntu:xenial" # We use xenail to get an old sqlite and python
propagate-environment: true
retry:
automatic:
- exit_status: -1
limit: 2
- exit_status: 2
limit: 2
- command:
- "python -m pip install tox"
- "tox -e py35,codecov"
label: ":python: 3.5 / SQLite"
env:
TRIAL_FLAGS: "-j 2"
plugins:
- docker#v3.0.1:
image: "python:3.5"
propagate-environment: true
retry:
automatic:
- exit_status: -1
limit: 2
- exit_status: 2
limit: 2
- command:
- "python -m pip install tox"
- "tox -e py36,codecov"
label: ":python: 3.6 / SQLite"
env:
TRIAL_FLAGS: "-j 2"
plugins:
- docker#v3.0.1:
image: "python:3.6"
propagate-environment: true
retry:
automatic:
- exit_status: -1
limit: 2
- exit_status: 2
limit: 2
- command:
- "python -m pip install tox"
- "tox -e py37,codecov"
label: ":python: 3.7 / SQLite"
env:
TRIAL_FLAGS: "-j 2"
plugins:
- docker#v3.0.1:
image: "python:3.7"
propagate-environment: true
retry:
automatic:
- exit_status: -1
limit: 2
- exit_status: 2
limit: 2
- label: ":python: 3.5 / :postgres: 9.5"
agents:
queue: "medium"
env:
TRIAL_FLAGS: "-j 8"
command:
- "bash -c 'python -m pip install tox && python -m tox -e py35-postgres,codecov'"
plugins:
- docker-compose#v2.1.0:
run: testenv
config:
- .buildkite/docker-compose.py35.pg95.yaml
retry:
automatic:
- exit_status: -1
limit: 2
- exit_status: 2
limit: 2
- label: ":python: 3.7 / :postgres: 9.5"
agents:
queue: "medium"
env:
TRIAL_FLAGS: "-j 8"
command:
- "bash -c 'python -m pip install tox && python -m tox -e py37-postgres,codecov'"
plugins:
- docker-compose#v2.1.0:
run: testenv
config:
- .buildkite/docker-compose.py37.pg95.yaml
retry:
automatic:
- exit_status: -1
limit: 2
- exit_status: 2
limit: 2
- label: ":python: 3.7 / :postgres: 11"
agents:
queue: "medium"
env:
TRIAL_FLAGS: "-j 8"
command:
- "bash -c 'python -m pip install tox && python -m tox -e py37-postgres,codecov'"
plugins:
- docker-compose#v2.1.0:
run: testenv
config:
- .buildkite/docker-compose.py37.pg11.yaml
retry:
automatic:
- exit_status: -1
limit: 2
- exit_status: 2
limit: 2
- label: "SyTest - :python: 3.5 / SQLite / Monolith"
agents:
queue: "medium"
command:
- "bash .buildkite/merge_base_branch.sh"
- "bash /synapse_sytest.sh"
plugins:
- docker#v3.0.1:
image: "matrixdotorg/sytest-synapse:py35"
propagate-environment: true
always-pull: true
workdir: "/src"
retry:
automatic:
- exit_status: -1
limit: 2
- exit_status: 2
limit: 2
- label: "SyTest - :python: 3.5 / :postgres: 9.6 / Monolith"
agents:
queue: "medium"
env:
POSTGRES: "1"
command:
- "bash .buildkite/merge_base_branch.sh"
- "bash /synapse_sytest.sh"
plugins:
- docker#v3.0.1:
image: "matrixdotorg/sytest-synapse:py35"
propagate-environment: true
always-pull: true
workdir: "/src"
retry:
automatic:
- exit_status: -1
limit: 2
- exit_status: 2
limit: 2
- label: "SyTest - :python: 3.5 / :postgres: 9.6 / Workers"
agents:
queue: "medium"
env:
POSTGRES: "1"
WORKERS: "1"
BLACKLIST: "synapse-blacklist-with-workers"
command:
- "bash .buildkite/merge_base_branch.sh"
- "bash -c 'cat /src/sytest-blacklist /src/.buildkite/worker-blacklist > /src/synapse-blacklist-with-workers'"
- "bash /synapse_sytest.sh"
plugins:
- docker#v3.0.1:
image: "matrixdotorg/sytest-synapse:py35"
propagate-environment: true
always-pull: true
workdir: "/src"
retry:
automatic:
- exit_status: -1
limit: 2
- exit_status: 2
limit: 2

.coveragerc

@@ -1,7 +1,8 @@
[run]
branch = True
parallel = True
include = synapse/*
include=$TOP/synapse/*
data_file = $TOP/.coverage
[report]
precision = 2

.gitignore

@@ -20,6 +20,7 @@ _trial_temp*/
/*.signing.key
/env/
/homeserver*.yaml
/logs
/media_store/
/uploads
@@ -29,8 +30,9 @@ _trial_temp*/
/.vscode/
# build products
/.coverage*
!/.coveragerc
/.coverage*
/.mypy_cache/
/.tox
/build/
/coverage.*
@@ -38,4 +40,3 @@ _trial_temp*/
/docs/build/
/htmlcov
/pip-wheel-metadata/

INSTALL.md

@@ -36,7 +36,7 @@ that your email address is probably `user@example.com` rather than
System requirements:
- POSIX-compliant system (tested on Linux & OS X)
- Python 3.5, 3.6, 3.7, or 2.7
- Python 3.5, 3.6, or 3.7
- At least 1GB of free RAM if you want to join large public rooms like #matrix:matrix.org
Synapse is written in Python but some of the libraries it uses are written in
@@ -421,7 +421,7 @@ If Synapse is not configured with an SMTP server, password reset via email will
The easiest way to create a new user is to do so from a client like [Riot](https://riot.im).
Alternatively you can do so from the command line if you have installed via pip.
Alternatively you can do so from the command line if you have installed via pip.
This can be done as follows:

changelog.d/5680.misc

@@ -0,0 +1 @@
Lay the groundwork for structured logging output.

changelog.d/5771.feature

@@ -0,0 +1 @@
Make Opentracing work in worker mode.

changelog.d/5776.misc

@@ -0,0 +1 @@
Update opentracing docs to use the unified `trace` method.

changelog.d/5845.feature

@@ -0,0 +1 @@
Add an admin API to purge old rooms from the database.

changelog.d/5850.feature

@@ -0,0 +1 @@
Add retry to well-known lookups if we have recently seen a valid well-known record for the server.

changelog.d/5852.feature

@@ -0,0 +1 @@
Pass opentracing contexts between servers when transmitting EDUs.

changelog.d/5855.misc

@@ -0,0 +1 @@
Opentracing for room and e2e keys.

changelog.d/5859.feature

@@ -0,0 +1 @@
Add unstable support for MSC2197 (filtered search requests over federation), in order to allow upcoming room directory query performance improvements.

changelog.d/5864.feature

@@ -0,0 +1 @@
Correctly retry all hosts returned from SRV when we fail to connect.

changelog.d/5877.removal

@@ -0,0 +1 @@
Remove shared secret registration from client/r0/register endpoint. Contributed by Awesome Technologies Innovationslabor GmbH.

changelog.d/5885.bugfix

@@ -0,0 +1 @@
Fix stack overflow when recovering an appservice which had an outage.

changelog.d/5886.misc

@@ -0,0 +1 @@
Refactor the Appservice scheduler code.

changelog.d/5893.misc

@@ -0,0 +1 @@
Drop some unused tables.

changelog.d/5894.misc

@@ -0,0 +1 @@
Add missing index on users_in_public_rooms to improve the performance of directory queries.

changelog.d/5895.feature

@@ -0,0 +1 @@
Add config option to sign remote key query responses with a separate key.

changelog.d/5896.misc

@@ -0,0 +1 @@
Improve the logging when we have an error when fetching signing keys.

changelog.d/5897.feature

@@ -0,0 +1 @@
Switch to the v2 lookup API for 3PID invites.
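For background, MSC2134's hashed lookup is a two-step flow: fetch the pepper and supported algorithms, then post hashes of `<address> <medium> <pepper>`. A minimal sketch in Python, assuming a placeholder identity server and ignoring authentication:

```python
import hashlib

import requests
from unpaddedbase64 import encode_base64  # the same helper synapse uses

ID_SERVER = "https://id.example.com"  # placeholder

# 1. Ask the identity server for its lookup pepper and supported algorithms.
hash_details = requests.get(f"{ID_SERVER}/_matrix/identity/v2/hash_details").json()
pepper = hash_details["lookup_pepper"]

# 2. Hash "<address> <medium> <pepper>" with sha256, url-safe unpadded base64.
address, medium = "alice@example.com", "email"
hashed = encode_base64(
    hashlib.sha256(f"{address} {medium} {pepper}".encode()).digest(), urlsafe=True
)

# 3. Look up the hashes; "mappings" pairs each hash with a Matrix ID, if known.
resp = requests.post(
    f"{ID_SERVER}/_matrix/identity/v2/lookup",
    json={"algorithm": "sha256", "pepper": pepper, "addresses": [hashed]},
)
print(resp.json().get("mappings", {}))
```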

changelog.d/5900.feature

@@ -0,0 +1 @@
Add support for config templating.

changelog.d/5902.feature

@@ -0,0 +1 @@
Users with the type of "support" or "bot" are no longer required to consent.

changelog.d/5904.feature

@@ -0,0 +1 @@
Let synctl accept a directory of config files.

changelog.d/5906.feature

@@ -0,0 +1 @@
Increase max display name size to 256.

changelog.d/5909.misc

@@ -0,0 +1 @@
Fix error message which referred to public_base_url instead of public_baseurl. Thanks to @aaronraimist for the fix!

changelog.d/5911.misc

@@ -0,0 +1 @@
Add support for database engine-specific schema deltas, based on file extension.

changelog.d/5914.feature

@@ -0,0 +1 @@
Add admin API endpoint for getting whether or not a user is a server administrator.

changelog.d/5920.bugfix

@@ -0,0 +1 @@
Fix a cache-invalidation bug for worker-based deployments.

changelog.d/5922.misc

@@ -0,0 +1 @@
Update Buildkite pipeline to use plugins instead of buildkite-agent commands.

changelog.d/5926.misc

@@ -0,0 +1 @@
Add link in sample config to the logging config schema.

changelog.d/5930.misc

@@ -0,0 +1 @@
Add temporary flag to /versions in unstable_features to indicate this Synapse supports receiving id_access_token parameters on calls to identity server-proxying endpoints.
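Clients can feature-detect this flag with a plain `/versions` request; a minimal sketch (the homeserver URL is a placeholder):

```python
import requests

versions = requests.get("https://matrix.example.com/_matrix/client/versions").json()
if versions.get("unstable_features", {}).get("m.id_access_token", False):
    print("homeserver accepts id_access_token parameters")
```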

changelog.d/5931.misc

@@ -0,0 +1 @@
Remove unnecessary parentheses in return statements.

changelog.d/5938.misc

@@ -0,0 +1 @@
Remove unused jenkins/prepare_sytest.sh file.

changelog.d/5943.misc

@@ -0,0 +1 @@
Move Buildkite pipeline config to the pipelines repo.

changelog.d/5953.misc

@@ -0,0 +1 @@
Update INSTALL.md to say that Python 2 is no longer supported.

docker/README.md

@@ -17,7 +17,7 @@ By default, the image expects a single volume, located at ``/data``, that will h
* the appservices configuration.
You are free to use separate volumes depending on storage endpoints at your
disposal. For instance, ``/data/media`` coud be stored on a large but low
disposal. For instance, ``/data/media`` could be stored on a large but low
performance hdd storage while other files could be stored on high performance
endpoints.
@@ -27,8 +27,8 @@ configuration file there. Multiple application services are supported.
## Generating a configuration file
The first step is to genearte a valid config file. To do this, you can run the
image with the `generate` commandline option.
The first step is to generate a valid config file. To do this, you can run the
image with the `generate` command line option.
You will need to specify values for the `SYNAPSE_SERVER_NAME` and
`SYNAPSE_REPORT_STATS` environment variable, and mount a docker volume to store
@@ -59,7 +59,7 @@ The following environment variables are supported in `generate` mode:
* `SYNAPSE_CONFIG_PATH`: path to the file to be generated. Defaults to
`<SYNAPSE_CONFIG_DIR>/homeserver.yaml`.
* `SYNAPSE_DATA_DIR`: where the generated config will put persistent data
such as the datatase and media store. Defaults to `/data`.
such as the database and media store. Defaults to `/data`.
* `UID`, `GID`: the user id and group id to use for creating the data
directories. Defaults to `991`, `991`.
@@ -115,7 +115,7 @@ not given).
To migrate from a dynamic configuration file to a static one, run the docker
container once with the environment variables set, and `migrate_config`
commandline option. For example:
command line option. For example:
```
docker run -it --rm \


@@ -0,0 +1,18 @@
Purge room API
==============
This API will remove all trace of a room from your database.
All local users must have left the room before it can be removed.
The API is:
```
POST /_synapse/admin/v1/purge_room
{
"room_id": "!room:id"
}
```
You must authenticate using the access token of an admin user.
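A minimal sketch of calling the endpoint with Python's `requests` (the base URL and token are placeholders):

```python
import requests

resp = requests.post(
    "https://localhost:8008/_synapse/admin/v1/purge_room",
    headers={"Authorization": "Bearer <admin_access_token>"},
    json={"room_id": "!room:id"},
)
resp.raise_for_status()
```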


@@ -86,6 +86,25 @@ with a body of:
including an ``access_token`` of a server admin.
Get whether a user is a server administrator or not
===================================================
The api is::
GET /_synapse/admin/v1/users/<user_id>/admin
including an ``access_token`` of a server admin.
A response body like the following is returned:
.. code:: json
{
"admin": true
}
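A minimal sketch of querying the new GET endpoint (base URL and token are placeholders):

```python
import requests

resp = requests.get(
    "https://localhost:8008/_synapse/admin/v1/users/@user:dom/admin",
    headers={"Authorization": "Bearer <admin_access_token>"},
)
print(resp.json())  # e.g. {"admin": True}
```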
Change whether a user is a server administrator or not
======================================================


@@ -32,7 +32,7 @@ It is up to the remote server to decide what it does with the spans
it creates. This is called the sampling policy and it can be configured
through Jaeger's settings.
For OpenTracing concepts see
For OpenTracing concepts see
https://opentracing.io/docs/overview/what-is-tracing/.
For more information about Jaeger's implementation see
@@ -79,7 +79,7 @@ Homeserver whitelisting
The homeserver whitelist is configured using regular expressions. A list of regular
expressions can be given and their union will be compared when propagating any
spans contexts to another homeserver.
spans contexts to another homeserver.
Though it's mostly safe to send and receive span contexts to and from
untrusted users since span contexts are usually opaque ids it can lead to
@@ -92,6 +92,29 @@ two problems, namely:
but that doesn't prevent another server sending you baggage which will be logged
to OpenTracing's logs.
==========
EDU FORMAT
==========
EDUs can contain tracing data in their content. This is not specced but
it could be of interest for other homeservers.
EDU format (if you're using jaeger):
.. code-block:: json
{
"edu_type": "type",
"content": {
"org.matrix.opentracing_context": {
"uber-trace-id": "fe57cf3e65083289"
}
}
}
Though you don't have to use jaeger you must inject the span context into
`org.matrix.opentracing_context` using the opentracing `Format.TEXT_MAP` inject method.
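A minimal sketch of that inject step with the `opentracing` Python package (the operation name and EDU type are illustrative):

```python
import opentracing
from opentracing.propagation import Format

with opentracing.tracer.start_active_span("send_edu") as scope:
    # serialise the active span context into the EDU content
    carrier = {}
    opentracing.tracer.inject(scope.span.context, Format.TEXT_MAP, carrier)
    edu = {
        "edu_type": "type",
        "content": {"org.matrix.opentracing_context": carrier},
    }
```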
==================
Configuring Jaeger
==================

docs/sample_config.yaml

@@ -205,9 +205,9 @@ listeners:
#
- port: 8008
tls: false
bind_addresses: ['::1', '127.0.0.1']
type: http
x_forwarded: true
bind_addresses: ['::1', '127.0.0.1']
resources:
- names: [client, federation]
@@ -392,10 +392,10 @@ listeners:
# permission to listen on port 80.
#
acme:
# ACME support is disabled by default. Uncomment the following line
# (and tls_certificate_path and tls_private_key_path above) to enable it.
# ACME support is disabled by default. Set this to `true` and uncomment
# tls_certificate_path and tls_private_key_path above to enable it.
#
#enabled: true
enabled: False
# Endpoint to use to request certificates. If you only want to test,
# use Let's Encrypt's staging url:
@@ -406,17 +406,17 @@ acme:
# Port number to listen on for the HTTP-01 challenge. Change this if
# you are forwarding connections through Apache/Nginx/etc.
#
#port: 80
port: 80
# Local addresses to listen on for incoming connections.
# Again, you may want to change this if you are forwarding connections
# through Apache/Nginx/etc.
#
#bind_addresses: ['::', '0.0.0.0']
bind_addresses: ['::', '0.0.0.0']
# How many days remaining on a certificate before it is renewed.
#
#reprovision_threshold: 30
reprovision_threshold: 30
# The domain that the certificate should be for. Normally this
# should be the same as your Matrix domain (i.e., 'server_name'), but,
@@ -430,7 +430,7 @@ acme:
#
# If not set, defaults to your 'server_name'.
#
#domain: matrix.example.com
domain: matrix.example.com
# file to use for the account key. This will be generated if it doesn't
# exist.
@@ -485,7 +485,8 @@ database:
## Logging ##
# A yaml python logging config file
# A yaml python logging config file as described by
# https://docs.python.org/3.7/library/logging.config.html#configuration-dictionary-schema
#
log_config: "CONFDIR/SERVERNAME.log.config"
@@ -1027,6 +1028,14 @@ signing_key_path: "CONFDIR/SERVERNAME.signing.key"
#
#trusted_key_servers:
# - server_name: "matrix.org"
#
# The signing keys to use when acting as a trusted key server. If not specified
# defaults to the server signing key.
#
# Can contain multiple keys, one per line.
#
#key_server_signing_keys_path: "key_server_signing_keys.key"
# Enable SAML2 for registration and login. Uses pysaml2.

docs/structured_logging.md

@@ -0,0 +1,83 @@
# Structured Logging
A structured logging system can be useful when your logs are destined for a machine to parse and process. By maintaining its machine-readable characteristics, it enables more efficient searching and aggregations when consumed by software such as the "ELK stack".
Synapse's structured logging system is configured via the file that Synapse's `log_config` config option points to. The file must be YAML and contain `structured: true`. It must contain a list of "drains" (places where logs go to).
A structured logging configuration looks similar to the following:
```yaml
structured: true
loggers:
synapse:
level: INFO
synapse.storage.SQL:
level: WARNING
drains:
console:
type: console
location: stdout
file:
type: file_json
location: homeserver.log
```
The above logging config will set Synapse as 'INFO' logging level by default, with the SQL layer at 'WARNING', and will have two logging drains (to the console and to a file, stored as JSON).
## Drain Types
Drain types can be specified by the `type` key.
### `console`
Outputs human-readable logs to the console.
Arguments:
- `location`: Either `stdout` or `stderr`.
### `console_json`
Outputs machine-readable JSON logs to the console.
Arguments:
- `location`: Either `stdout` or `stderr`.
### `console_json_terse`
Outputs machine-readable JSON logs to the console, separated by newlines. This
format is not designed to be read and re-formatted into human-readable text, but
is optimal for a logging aggregation system.
Arguments:
- `location`: Either `stdout` or `stderr`.
### `file`
Outputs human-readable logs to a file.
Arguments:
- `location`: An absolute path to the file to log to.
### `file_json`
Outputs machine-readable logs to a file.
Arguments:
- `location`: An absolute path to the file to log to.
### `network_json_terse`
Delivers machine-readable JSON logs to a log aggregator over TCP. This is
compatible with LogStash's TCP input with the codec set to `json_lines`.
Arguments:
- `host`: Hostname or IP address of the log aggregator.
- `port`: Numerical port to contact on the host.
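For example, a hypothetical drain shipping terse JSON to a LogStash TCP input would extend the config above like this (host and port are placeholders):

```yaml
drains:
  logstash:
    type: network_json_terse
    host: logstash.example.com
    port: 5000
```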

jenkins/prepare_sytest.sh

@@ -1,16 +0,0 @@
#! /bin/bash
set -eux
cd "`dirname $0`/.."
TOX_DIR=$WORKSPACE/.tox
mkdir -p $TOX_DIR
if ! [ $TOX_DIR -ef .tox ]; then
ln -s "$TOX_DIR" .tox
fi
# set up the virtualenv
tox -e py27 --notest -v

synapse/api/auth.py

@@ -276,25 +276,25 @@ class Auth(object):
self.get_access_token_from_request(request)
)
if app_service is None:
return (None, None)
return None, None
if app_service.ip_range_whitelist:
ip_address = IPAddress(self.hs.get_ip_from_request(request))
if ip_address not in app_service.ip_range_whitelist:
return (None, None)
return None, None
if b"user_id" not in request.args:
return (app_service.sender, app_service)
return app_service.sender, app_service
user_id = request.args[b"user_id"][0].decode("utf8")
if app_service.sender == user_id:
return (app_service.sender, app_service)
return app_service.sender, app_service
if not app_service.is_interested_in_user(user_id):
raise AuthError(403, "Application service cannot masquerade as this user.")
if not (yield self.store.get_user_by_id(user_id)):
raise AuthError(403, "Application service has not registered this user")
return (user_id, app_service)
return user_id, app_service
@defer.inlineCallbacks
def get_user_by_access_token(self, token, rights="access"):
@@ -694,7 +694,7 @@ class Auth(object):
# * The user is a guest user, and has joined the room
# else it will throw.
member_event = yield self.check_user_was_in_room(room_id, user_id)
return (member_event.membership, member_event.event_id)
return member_event.membership, member_event.event_id
except AuthError:
visibility = yield self.state.get_current_state(
room_id, EventTypes.RoomHistoryVisibility, ""
@@ -703,7 +703,7 @@ class Auth(object):
visibility
and visibility.content["history_visibility"] == "world_readable"
):
return (Membership.JOIN, None)
return Membership.JOIN, None
return
raise AuthError(
403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
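These hunks implement #5931; the underlying equivalence, in miniature:

```python
# parentheses around returned values are redundant: both forms build a tuple
def with_parens():
    return (1, 2)

def without_parens():
    return 1, 2

assert with_parens() == without_parens() == (1, 2)
```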

synapse/api/constants.py

@@ -122,7 +122,8 @@ class UserTypes(object):
"""
SUPPORT = "support"
ALL_USER_TYPES = (SUPPORT,)
BOT = "bot"
ALL_USER_TYPES = (SUPPORT, BOT)
class RelationTypes(object):

synapse/app/_base.py

@@ -36,18 +36,20 @@ from synapse.util.versionstring import get_version_string
logger = logging.getLogger(__name__)
# list of tuples of function, args list, kwargs dict
_sighup_callbacks = []
def register_sighup(func):
def register_sighup(func, *args, **kwargs):
"""
Register a function to be called when a SIGHUP occurs.
Args:
func (function): Function to be called when sent a SIGHUP signal.
Will be called with a single argument, the homeserver.
Will be called with a single default argument, the homeserver.
*args, **kwargs: args and kwargs to be passed to the target function.
"""
_sighup_callbacks.append(func)
_sighup_callbacks.append((func, args, kwargs))
def start_worker_reactor(appname, config, run_command=reactor.run):
@@ -248,8 +250,8 @@ def start(hs, listeners=None):
# we're not using systemd.
sdnotify(b"RELOADING=1")
for i in _sighup_callbacks:
i(hs)
for i, args, kwargs in _sighup_callbacks:
i(hs, *args, **kwargs)
sdnotify(b"READY=1")

synapse/app/admin_cmd.py

@@ -227,8 +227,6 @@ def start(config_options):
config.start_pushers = False
config.send_federation = False
setup_logging(config, use_worker_options=True)
synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
@@ -241,6 +239,8 @@ def start(config_options):
database_engine=database_engine,
)
setup_logging(ss, config, use_worker_options=True)
ss.setup()
# We use task.react as the basic run command as it correctly handles tearing

synapse/app/appservice.py

@@ -141,8 +141,6 @@ def start(config_options):
assert config.worker_app == "synapse.app.appservice"
setup_logging(config, use_worker_options=True)
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
@@ -167,6 +165,8 @@ def start(config_options):
database_engine=database_engine,
)
setup_logging(ps, config, use_worker_options=True)
ps.setup()
reactor.addSystemEventTrigger(
"before", "startup", _base.start, ps, config.worker_listeners

synapse/app/client_reader.py

@@ -179,8 +179,6 @@ def start(config_options):
assert config.worker_app == "synapse.app.client_reader"
setup_logging(config, use_worker_options=True)
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
@@ -193,6 +191,8 @@ def start(config_options):
database_engine=database_engine,
)
setup_logging(ss, config, use_worker_options=True)
ss.setup()
reactor.addSystemEventTrigger(
"before", "startup", _base.start, ss, config.worker_listeners

synapse/app/event_creator.py

@@ -175,8 +175,6 @@ def start(config_options):
assert config.worker_replication_http_port is not None
setup_logging(config, use_worker_options=True)
# This should only be done on the user directory worker or the master
config.update_user_directory = False
@@ -192,6 +190,8 @@ def start(config_options):
database_engine=database_engine,
)
setup_logging(ss, config, use_worker_options=True)
ss.setup()
reactor.addSystemEventTrigger(
"before", "startup", _base.start, ss, config.worker_listeners

synapse/app/federation_reader.py

@@ -160,8 +160,6 @@ def start(config_options):
assert config.worker_app == "synapse.app.federation_reader"
setup_logging(config, use_worker_options=True)
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
@@ -174,6 +172,8 @@ def start(config_options):
database_engine=database_engine,
)
setup_logging(ss, config, use_worker_options=True)
ss.setup()
reactor.addSystemEventTrigger(
"before", "startup", _base.start, ss, config.worker_listeners

synapse/app/federation_sender.py

@@ -171,8 +171,6 @@ def start(config_options):
assert config.worker_app == "synapse.app.federation_sender"
setup_logging(config, use_worker_options=True)
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
@@ -197,6 +195,8 @@ def start(config_options):
database_engine=database_engine,
)
setup_logging(ss, config, use_worker_options=True)
ss.setup()
reactor.addSystemEventTrigger(
"before", "startup", _base.start, ss, config.worker_listeners

synapse/app/frontend_proxy.py

@@ -70,12 +70,12 @@ class PresenceStatusStubServlet(RestServlet):
except HttpResponseException as e:
raise e.to_synapse_error()
return (200, result)
return 200, result
@defer.inlineCallbacks
def on_PUT(self, request, user_id):
yield self.auth.get_user_by_req(request)
return (200, {})
return 200, {}
class KeyUploadServlet(RestServlet):
@@ -126,11 +126,11 @@ class KeyUploadServlet(RestServlet):
self.main_uri + request.uri.decode("ascii"), body, headers=headers
)
return (200, result)
return 200, result
else:
# Just interested in counts.
result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
return (200, {"one_time_key_counts": result})
return 200, {"one_time_key_counts": result}
class FrontendProxySlavedStore(
@@ -232,8 +232,6 @@ def start(config_options):
assert config.worker_main_http_uri is not None
setup_logging(config, use_worker_options=True)
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
@@ -246,6 +244,8 @@ def start(config_options):
database_engine=database_engine,
)
setup_logging(ss, config, use_worker_options=True)
ss.setup()
reactor.addSystemEventTrigger(
"before", "startup", _base.start, ss, config.worker_listeners

synapse/app/homeserver.py

@@ -341,8 +341,6 @@ def setup(config_options):
# generating config files and shouldn't try to continue.
sys.exit(0)
synapse.config.logger.setup_logging(config, use_worker_options=False)
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
@@ -356,6 +354,8 @@ def setup(config_options):
database_engine=database_engine,
)
synapse.config.logger.setup_logging(hs, config, use_worker_options=False)
logger.info("Preparing database: %s...", config.database_config["name"])
try:

synapse/app/media_repository.py

@@ -155,8 +155,6 @@ def start(config_options):
"Please add ``enable_media_repo: false`` to the main config\n"
)
setup_logging(config, use_worker_options=True)
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
@@ -169,6 +167,8 @@ def start(config_options):
database_engine=database_engine,
)
setup_logging(ss, config, use_worker_options=True)
ss.setup()
reactor.addSystemEventTrigger(
"before", "startup", _base.start, ss, config.worker_listeners

synapse/app/pusher.py

@@ -184,8 +184,6 @@ def start(config_options):
assert config.worker_app == "synapse.app.pusher"
setup_logging(config, use_worker_options=True)
events.USE_FROZEN_DICTS = config.use_frozen_dicts
if config.start_pushers:
@@ -210,6 +208,8 @@ def start(config_options):
database_engine=database_engine,
)
setup_logging(ps, config, use_worker_options=True)
ps.setup()
def start():

synapse/app/synchrotron.py

@@ -435,8 +435,6 @@ def start(config_options):
assert config.worker_app == "synapse.app.synchrotron"
setup_logging(config, use_worker_options=True)
synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
@@ -450,6 +448,8 @@ def start(config_options):
application_service_handler=SynchrotronApplicationService(),
)
setup_logging(ss, config, use_worker_options=True)
ss.setup()
reactor.addSystemEventTrigger(
"before", "startup", _base.start, ss, config.worker_listeners

synapse/app/user_dir.py

@@ -197,8 +197,6 @@ def start(config_options):
assert config.worker_app == "synapse.app.user_dir"
setup_logging(config, use_worker_options=True)
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
@@ -223,6 +221,8 @@ def start(config_options):
database_engine=database_engine,
)
setup_logging(ss, config, use_worker_options=True)
ss.setup()
reactor.addSystemEventTrigger(
"before", "startup", _base.start, ss, config.worker_listeners

synapse/appservice/scheduler.py

@@ -70,35 +70,37 @@ class ApplicationServiceScheduler(object):
self.store = hs.get_datastore()
self.as_api = hs.get_application_service_api()
def create_recoverer(service, callback):
return _Recoverer(self.clock, self.store, self.as_api, service, callback)
self.txn_ctrl = _TransactionController(
self.clock, self.store, self.as_api, create_recoverer
)
self.txn_ctrl = _TransactionController(self.clock, self.store, self.as_api)
self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock)
@defer.inlineCallbacks
def start(self):
logger.info("Starting appservice scheduler")
# check for any DOWN ASes and start recoverers for them.
recoverers = yield _Recoverer.start(
self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered
services = yield self.store.get_appservices_by_state(
ApplicationServiceState.DOWN
)
self.txn_ctrl.add_recoverers(recoverers)
for service in services:
self.txn_ctrl.start_recoverer(service)
def submit_event_for_as(self, service, event):
self.queuer.enqueue(service, event)
class _ServiceQueuer(object):
"""Queues events for the same application service together, sending
transactions as soon as possible. Once a transaction is sent successfully,
this schedules any other events in the queue to run.
"""Queue of events waiting to be sent to appservices.
Groups events into transactions per-appservice, and sends them on to the
TransactionController. Makes sure that we only have one transaction in flight per
appservice at a given time.
"""
def __init__(self, txn_ctrl, clock):
self.queued_events = {} # dict of {service_id: [events]}
# the appservices which currently have a transaction in flight
self.requests_in_flight = set()
self.txn_ctrl = txn_ctrl
self.clock = clock
@@ -136,13 +138,29 @@ class _ServiceQueuer(object):
class _TransactionController(object):
def __init__(self, clock, store, as_api, recoverer_fn):
"""Transaction manager.
Builds AppServiceTransactions and runs their lifecycle. Also starts a Recoverer
if a transaction fails.
(Note we only have one of these in the homeserver.)
Args:
clock (synapse.util.Clock):
store (synapse.storage.DataStore):
as_api (synapse.appservice.api.ApplicationServiceApi):
"""
def __init__(self, clock, store, as_api):
self.clock = clock
self.store = store
self.as_api = as_api
self.recoverer_fn = recoverer_fn
# keep track of how many recoverers there are
self.recoverers = []
# map from service id to recoverer instance
self.recoverers = {}
# for UTs
self.RECOVERER_CLASS = _Recoverer
@defer.inlineCallbacks
def send(self, service, events):
@@ -154,42 +172,45 @@ class _TransactionController(object):
if sent:
yield txn.complete(self.store)
else:
run_in_background(self._start_recoverer, service)
run_in_background(self._on_txn_fail, service)
except Exception:
logger.exception("Error creating appservice transaction")
run_in_background(self._start_recoverer, service)
run_in_background(self._on_txn_fail, service)
@defer.inlineCallbacks
def on_recovered(self, recoverer):
self.recoverers.remove(recoverer)
logger.info(
"Successfully recovered application service AS ID %s", recoverer.service.id
)
self.recoverers.pop(recoverer.service.id)
logger.info("Remaining active recoverers: %s", len(self.recoverers))
yield self.store.set_appservice_state(
recoverer.service, ApplicationServiceState.UP
)
def add_recoverers(self, recoverers):
for r in recoverers:
self.recoverers.append(r)
if len(recoverers) > 0:
logger.info("New active recoverers: %s", len(self.recoverers))
@defer.inlineCallbacks
def _start_recoverer(self, service):
def _on_txn_fail(self, service):
try:
yield self.store.set_appservice_state(service, ApplicationServiceState.DOWN)
logger.info(
"Application service falling behind. Starting recoverer. AS ID %s",
service.id,
)
recoverer = self.recoverer_fn(service, self.on_recovered)
self.add_recoverers([recoverer])
recoverer.recover()
self.start_recoverer(service)
except Exception:
logger.exception("Error starting AS recoverer")
def start_recoverer(self, service):
"""Start a Recoverer for the given service
Args:
service (synapse.appservice.ApplicationService):
"""
logger.info("Starting recoverer for AS ID %s", service.id)
assert service.id not in self.recoverers
recoverer = self.RECOVERER_CLASS(
self.clock, self.store, self.as_api, service, self.on_recovered
)
self.recoverers[service.id] = recoverer
recoverer.recover()
logger.info("Now %i active recoverers", len(self.recoverers))
@defer.inlineCallbacks
def _is_service_up(self, service):
state = yield self.store.get_appservice_state(service)
@@ -197,18 +218,17 @@ class _TransactionController(object):
class _Recoverer(object):
@staticmethod
@defer.inlineCallbacks
def start(clock, store, as_api, callback):
services = yield store.get_appservices_by_state(ApplicationServiceState.DOWN)
recoverers = [_Recoverer(clock, store, as_api, s, callback) for s in services]
for r in recoverers:
logger.info(
"Starting recoverer for AS ID %s which was marked as " "DOWN",
r.service.id,
)
r.recover()
return recoverers
"""Manages retries and backoff for a DOWN appservice.
We have one of these for each appservice which is currently considered DOWN.
Args:
clock (synapse.util.Clock):
store (synapse.storage.DataStore):
as_api (synapse.appservice.api.ApplicationServiceApi):
service (synapse.appservice.ApplicationService): the service we are managing
callback (callable[_Recoverer]): called once the service recovers.
"""
def __init__(self, clock, store, as_api, service, callback):
self.clock = clock
@@ -224,7 +244,9 @@ class _Recoverer(object):
"as-recoverer-%s" % (self.service.id,), self.retry
)
self.clock.call_later((2 ** self.backoff_counter), _retry)
delay = 2 ** self.backoff_counter
logger.info("Scheduling retries on %s in %fs", self.service.id, delay)
self.clock.call_later(delay, _retry)
def _backoff(self):
# cap the backoff to be around 8.5min => (2^9) = 512 secs
@@ -234,25 +256,30 @@ class _Recoverer(object):
@defer.inlineCallbacks
def retry(self):
logger.info("Starting retries on %s", self.service.id)
try:
txn = yield self.store.get_oldest_unsent_txn(self.service)
if txn:
while True:
txn = yield self.store.get_oldest_unsent_txn(self.service)
if not txn:
# nothing left: we're done!
self.callback(self)
return
logger.info(
"Retrying transaction %s for AS ID %s", txn.id, txn.service.id
)
sent = yield txn.send(self.as_api)
if sent:
yield txn.complete(self.store)
# reset the backoff counter and retry immediately
self.backoff_counter = 1
yield self.retry()
else:
self._backoff()
else:
self._set_service_recovered()
except Exception as e:
logger.exception(e)
self._backoff()
if not sent:
break
def _set_service_recovered(self):
self.callback(self)
yield txn.complete(self.store)
# reset the backoff counter and then process the next transaction
self.backoff_counter = 1
except Exception:
logger.exception("Unexpected error running retries")
# we didn't manage to send all of the transactions before we got an error of
# some flavour: reschedule the next retry.
self._backoff()
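A standalone sketch of the `_Recoverer` backoff behaviour above, assuming the same doubling and cap:

```python
class Backoff:
    """Doubling delay starting at 2s, capped at 2**9 = 512s (~8.5 min)."""

    def __init__(self):
        self.backoff_counter = 1

    def next_delay(self):
        delay = 2 ** self.backoff_counter  # seconds until the next retry
        if self.backoff_counter < 9:  # cap the exponent at 9
            self.backoff_counter += 1
        return delay
```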

synapse/config/__init__.py

@@ -13,8 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import ConfigError
from ._base import ConfigError, find_config_files
# export ConfigError if somebody does import *
# export ConfigError and find_config_files if somebody does
# import *
# this is largely a fudge to stop PEP8 moaning about the import
__all__ = ["ConfigError"]
__all__ = ["ConfigError", "find_config_files"]

synapse/config/_base.py

@@ -181,6 +181,11 @@ class Config(object):
generate_secrets=False,
report_stats=None,
open_private_ports=False,
listeners=None,
database_conf=None,
tls_certificate_path=None,
tls_private_key_path=None,
acme_domain=None,
):
"""Build a default configuration file
@@ -207,6 +212,33 @@ class Config(object):
open_private_ports (bool): True to leave private ports (such as the non-TLS
HTTP listener) open to the internet.
listeners (list(dict)|None): A list of descriptions of the listeners
synapse should start with each of which specifies a port (str), a list of
resources (list(str)), tls (bool) and type (str). For example:
[{
"port": 8448,
"resources": [{"names": ["federation"]}],
"tls": True,
"type": "http",
},
{
"port": 443,
"resources": [{"names": ["client"]}],
"tls": False,
"type": "http",
}],
database (str|None): The database type to configure, either `psycopg2`
or `sqlite3`.
tls_certificate_path (str|None): The path to the tls certificate.
tls_private_key_path (str|None): The path to the tls private key.
acme_domain (str|None): The domain acme will try to validate. If
specified acme will be enabled.
Returns:
str: the yaml config file
"""
@@ -220,6 +252,11 @@ class Config(object):
generate_secrets=generate_secrets,
report_stats=report_stats,
open_private_ports=open_private_ports,
listeners=listeners,
database_conf=database_conf,
tls_certificate_path=tls_certificate_path,
tls_private_key_path=tls_private_key_path,
acme_domain=acme_domain,
)
)

synapse/config/database.py

@@ -13,6 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from textwrap import indent
import yaml
from ._base import Config
@@ -38,20 +41,28 @@ class DatabaseConfig(Config):
self.set_databasepath(config.get("database_path"))
def generate_config_section(self, data_dir_path, **kwargs):
database_path = os.path.join(data_dir_path, "homeserver.db")
return (
"""\
## Database ##
database:
# The database engine name
def generate_config_section(self, data_dir_path, database_conf, **kwargs):
if not database_conf:
database_path = os.path.join(data_dir_path, "homeserver.db")
database_conf = (
"""# The database engine name
name: "sqlite3"
# Arguments to pass to the engine
args:
# Path to the database
database: "%(database_path)s"
"""
% locals()
)
else:
database_conf = indent(yaml.dump(database_conf), " " * 10).lstrip()
return (
"""\
## Database ##
database:
%(database_conf)s
# Number of events to cache in memory.
#
#event_cache_size: 10K
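To see what the `indent(yaml.dump(...))` branch produces, a standalone sketch (the `database_conf` values are illustrative):

```python
from textwrap import indent

import yaml

database_conf = {
    "name": "psycopg2",
    "args": {"user": "synapse", "database": "synapse", "host": "db"},
}
# renders the dict as YAML, indented to sit under the `database:` key
print(indent(yaml.dump(database_conf), " " * 10).lstrip())
```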

synapse/config/emailconfig.py

@@ -115,7 +115,7 @@ class EmailConfig(Config):
missing.append("email." + k)
if config.get("public_baseurl") is None:
missing.append("public_base_url")
missing.append("public_baseurl")
if len(missing) > 0:
raise RuntimeError(

View File

@@ -76,7 +76,7 @@ class KeyConfig(Config):
config_dir_path, config["server_name"] + ".signing.key"
)
self.signing_key = self.read_signing_key(signing_key_path)
self.signing_key = self.read_signing_keys(signing_key_path, "signing_key")
self.old_signing_keys = self.read_old_signing_keys(
config.get("old_signing_keys", {})
@@ -85,6 +85,14 @@ class KeyConfig(Config):
config.get("key_refresh_interval", "1d")
)
key_server_signing_keys_path = config.get("key_server_signing_keys_path")
if key_server_signing_keys_path:
self.key_server_signing_keys = self.read_signing_keys(
key_server_signing_keys_path, "key_server_signing_keys_path"
)
else:
self.key_server_signing_keys = list(self.signing_key)
# if neither trusted_key_servers nor perspectives are given, use the default.
if "perspectives" not in config and "trusted_key_servers" not in config:
key_servers = [{"server_name": "matrix.org"}]
@@ -210,16 +218,34 @@ class KeyConfig(Config):
#
#trusted_key_servers:
# - server_name: "matrix.org"
#
# The signing keys to use when acting as a trusted key server. If not specified
# defaults to the server signing key.
#
# Can contain multiple keys, one per line.
#
#key_server_signing_keys_path: "key_server_signing_keys.key"
"""
% locals()
)
def read_signing_key(self, signing_key_path):
signing_keys = self.read_file(signing_key_path, "signing_key")
def read_signing_keys(self, signing_key_path, name):
"""Read the signing keys in the given path.
Args:
signing_key_path (str)
name (str): Associated config key name
Returns:
list[SigningKey]
"""
signing_keys = self.read_file(signing_key_path, name)
try:
return read_signing_keys(signing_keys.splitlines(True))
except Exception as e:
raise ConfigError("Error reading signing_key: %s" % (str(e)))
raise ConfigError("Error reading %s: %s" % (name, str(e)))
def read_old_signing_keys(self, old_signing_keys):
keys = {}

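A minimal sketch of the input read_signing_keys consumes: one key per line in "algorithm version unpadded-base64-seed" form, parsed via signedjson. The key material below is an all-zero placeholder, not a real key.

from signedjson.key import read_signing_keys

# 43 unpadded base64 chars encode a 32-byte ed25519 seed (all zeros here)
key_file_body = "ed25519 key1 " + "A" * 43 + "\n"
keys = read_signing_keys(key_file_body.splitlines(True))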
View File

@@ -25,6 +25,10 @@ from twisted.logger import STDLibLogObserver, globalLogBeginner
import synapse
from synapse.app import _base as appbase
from synapse.logging._structured import (
reload_structured_logging,
setup_structured_logging,
)
from synapse.logging.context import LoggingContextFilter
from synapse.util.versionstring import get_version_string
@@ -85,7 +89,8 @@ class LoggingConfig(Config):
"""\
## Logging ##
# A yaml python logging config file
# A yaml python logging config file as described by
# https://docs.python.org/3.7/library/logging.config.html#configuration-dictionary-schema
#
log_config: "%(log_config)s"
"""
@@ -119,21 +124,10 @@ class LoggingConfig(Config):
log_config_file.write(DEFAULT_LOG_CONFIG.substitute(log_file=log_file))
def setup_logging(config, use_worker_options=False):
""" Set up python logging
Args:
config (LoggingConfig | synapse.config.workers.WorkerConfig):
configuration data
use_worker_options (bool): True to use the 'worker_log_config' option
instead of 'log_config'.
register_sighup (func | None): Function to call to register a
sighup handler.
def _setup_stdlib_logging(config, log_config):
"""
Set up Python stdlib logging.
"""
log_config = config.worker_log_config if use_worker_options else config.log_config
if log_config is None:
log_format = (
"%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s"
@@ -151,35 +145,10 @@ def setup_logging(config, use_worker_options=False):
handler.addFilter(LoggingContextFilter(request=""))
logger.addHandler(handler)
else:
logging.config.dictConfig(log_config)
def load_log_config():
with open(log_config, "r") as f:
logging.config.dictConfig(yaml.safe_load(f))
def sighup(*args):
# it might be better to use a file watcher or something for this.
load_log_config()
logging.info("Reloaded log config from %s due to SIGHUP", log_config)
load_log_config()
appbase.register_sighup(sighup)
# make sure that the first thing we log is a thing we can grep backwards
# for
logging.warn("***** STARTING SERVER *****")
logging.warn("Server %s version %s", sys.argv[0], get_version_string(synapse))
logging.info("Server hostname: %s", config.server_name)
# It's critical to point twisted's internal logging somewhere, otherwise it
# stacks up and leaks up to 64K objects;
# see: https://twistedmatrix.com/trac/ticket/8164
#
# Routing to the python logging framework could be a performance problem if
# the handlers blocked for a long time as python.logging is a blocking API
# see https://twistedmatrix.com/documents/current/core/howto/logger.html
# filed as https://github.com/matrix-org/synapse/issues/1727
#
# However this may not be too much of a problem if we are just writing to a file.
# Route Twisted's native logging through to the standard library logging
# system.
observer = STDLibLogObserver()
def _log(event):
@@ -201,3 +170,54 @@ def setup_logging(config, use_worker_options=False):
)
if not config.no_redirect_stdio:
print("Redirected stdout/stderr to logs")
def _reload_stdlib_logging(*args, log_config=None):
logger = logging.getLogger("")
if not log_config:
logger.warn("Reloaded a blank config?")
logging.config.dictConfig(log_config)
def setup_logging(hs, config, use_worker_options=False):
"""
Set up the logging subsystem.
Args:
config (LoggingConfig | synapse.config.workers.WorkerConfig):
configuration data
use_worker_options (bool): True to use the 'worker_log_config' option
instead of 'log_config'.
"""
log_config = config.worker_log_config if use_worker_options else config.log_config
def read_config(*args, callback=None):
if log_config is None:
return None
with open(log_config, "rb") as f:
log_config_body = yaml.safe_load(f.read())
if callback:
callback(log_config=log_config_body)
logging.info("Reloaded log config from %s due to SIGHUP", log_config)
return log_config_body
log_config_body = read_config()
if log_config_body and log_config_body.get("structured") is True:
setup_structured_logging(hs, config, log_config_body)
appbase.register_sighup(read_config, callback=reload_structured_logging)
else:
_setup_stdlib_logging(config, log_config_body)
appbase.register_sighup(read_config, callback=_reload_stdlib_logging)
# make sure that the first thing we log is a thing we can grep backwards
# for
logging.warn("***** STARTING SERVER *****")
logging.warn("Server %s version %s", sys.argv[0], get_version_string(synapse))
logging.info("Server hostname: %s", config.server_name)

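A hypothetical minimal log config showing the branch point above: when the loaded YAML body carries structured: true, setup_structured_logging is chosen; otherwise the stdlib dictConfig path runs. The drains section follows the structured-logging format this changeset introduces, but its exact fields here are an assumption.

import yaml

log_config_body = yaml.safe_load(
    """
    structured: true
    drains:
      console:
        type: console
        location: stdout
    """
)
assert log_config_body.get("structured") is True  # selects the structured path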
View File

@@ -17,8 +17,11 @@
import logging
import os.path
import re
from textwrap import indent
import attr
import yaml
from netaddr import IPSet
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
@@ -352,7 +355,7 @@ class ServerConfig(Config):
return any(l["tls"] for l in self.listeners)
def generate_config_section(
self, server_name, data_dir_path, open_private_ports, **kwargs
self, server_name, data_dir_path, open_private_ports, listeners, **kwargs
):
_, bind_port = parse_and_validate_server_name(server_name)
if bind_port is not None:
@@ -366,11 +369,68 @@ class ServerConfig(Config):
# Bring DEFAULT_ROOM_VERSION into the local-scope for use in the
# default config string
default_room_version = DEFAULT_ROOM_VERSION
secure_listeners = []
unsecure_listeners = []
private_addresses = ["::1", "127.0.0.1"]
if listeners:
for listener in listeners:
if listener["tls"]:
secure_listeners.append(listener)
else:
# If we don't want open ports we need to bind the listeners
# to some address other than 0.0.0.0. Here we chose to use
# localhost.
# If the addresses are already bound, however, we won't overwrite them.
if not open_private_ports:
listener.setdefault("bind_addresses", private_addresses)
unsecure_http_binding = "port: %i\n tls: false" % (unsecure_port,)
if not open_private_ports:
unsecure_http_binding += (
"\n bind_addresses: ['::1', '127.0.0.1']"
unsecure_listeners.append(listener)
secure_http_bindings = indent(
yaml.dump(secure_listeners), " " * 10
).lstrip()
unsecure_http_bindings = indent(
yaml.dump(unsecure_listeners), " " * 10
).lstrip()
if not unsecure_listeners:
unsecure_http_bindings = (
"""- port: %(unsecure_port)s
tls: false
type: http
x_forwarded: true"""
% locals()
)
if not open_private_ports:
unsecure_http_bindings += (
"\n bind_addresses: ['::1', '127.0.0.1']"
)
unsecure_http_bindings += """
resources:
- names: [client, federation]
compress: false"""
if listeners:
# comment out this block
unsecure_http_bindings = "#" + re.sub(
"\n {10}",
lambda match: match.group(0) + "#",
unsecure_http_bindings,
)
if not secure_listeners:
secure_http_bindings = (
"""#- port: %(bind_port)s
# type: http
# tls: true
# resources:
# - names: [client, federation]"""
% locals()
)
return (
@@ -556,11 +616,7 @@ class ServerConfig(Config):
# will also need to give Synapse a TLS key and certificate: see the TLS section
# below.)
#
#- port: %(bind_port)s
# type: http
# tls: true
# resources:
# - names: [client, federation]
%(secure_http_bindings)s
# Unsecure HTTP listener: for when matrix traffic passes through a reverse proxy
# that unwraps TLS.
@@ -568,13 +624,7 @@ class ServerConfig(Config):
# If you plan to use a reverse proxy, please see
# https://github.com/matrix-org/synapse/blob/master/docs/reverse_proxy.rst.
#
- %(unsecure_http_binding)s
type: http
x_forwarded: true
resources:
- names: [client, federation]
compress: false
%(unsecure_http_bindings)s
# example additional_resources:
#

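A standalone illustration of the comment-out trick above: prefix the block, and every 10-space-indented continuation line, with '#' (the input string is illustrative).

import re

block = "- port: 8008\n          tls: false"
commented = "#" + re.sub("\n {10}", lambda match: match.group(0) + "#", block)
assert commented == "#- port: 8008\n          #tls: false"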
View File

@@ -239,12 +239,38 @@ class TlsConfig(Config):
self.tls_fingerprints.append({"sha256": sha256_fingerprint})
def generate_config_section(
self, config_dir_path, server_name, data_dir_path, **kwargs
self,
config_dir_path,
server_name,
data_dir_path,
tls_certificate_path,
tls_private_key_path,
acme_domain,
**kwargs
):
"""If the acme_domain is specified acme will be enabled.
If the TLS paths are not specified the default will be certs in the
config directory"""
base_key_name = os.path.join(config_dir_path, server_name)
tls_certificate_path = base_key_name + ".tls.crt"
tls_private_key_path = base_key_name + ".tls.key"
if bool(tls_certificate_path) != bool(tls_private_key_path):
raise ConfigError(
"Please specify both a cert path and a key path or neither."
)
tls_enabled = (
"" if tls_certificate_path and tls_private_key_path or acme_domain else "#"
)
if not tls_certificate_path:
tls_certificate_path = base_key_name + ".tls.crt"
if not tls_private_key_path:
tls_private_key_path = base_key_name + ".tls.key"
acme_enabled = bool(acme_domain)
acme_domain = "matrix.example.com"
default_acme_account_file = os.path.join(data_dir_path, "acme_account.key")
# this is to avoid the max line length. Sorrynotsorry
@@ -269,11 +295,11 @@ class TlsConfig(Config):
# instance, if using certbot, use `fullchain.pem` as your certificate,
# not `cert.pem`).
#
#tls_certificate_path: "%(tls_certificate_path)s"
%(tls_enabled)stls_certificate_path: "%(tls_certificate_path)s"
# PEM-encoded private key for TLS
#
#tls_private_key_path: "%(tls_private_key_path)s"
%(tls_enabled)stls_private_key_path: "%(tls_private_key_path)s"
# Whether to verify TLS server certificates for outbound federation requests.
#
@@ -340,10 +366,10 @@ class TlsConfig(Config):
# permission to listen on port 80.
#
acme:
# ACME support is disabled by default. Uncomment the following line
# (and tls_certificate_path and tls_private_key_path above) to enable it.
# ACME support is disabled by default. Set this to `true` and uncomment
# tls_certificate_path and tls_private_key_path above to enable it.
#
#enabled: true
enabled: %(acme_enabled)s
# Endpoint to use to request certificates. If you only want to test,
# use Let's Encrypt's staging url:
@@ -354,17 +380,17 @@ class TlsConfig(Config):
# Port number to listen on for the HTTP-01 challenge. Change this if
# you are forwarding connections through Apache/Nginx/etc.
#
#port: 80
port: 80
# Local addresses to listen on for incoming connections.
# Again, you may want to change this if you are forwarding connections
# through Apache/Nginx/etc.
#
#bind_addresses: ['::', '0.0.0.0']
bind_addresses: ['::', '0.0.0.0']
# How many days remaining on a certificate before it is renewed.
#
#reprovision_threshold: 30
reprovision_threshold: 30
# The domain that the certificate should be for. Normally this
# should be the same as your Matrix domain (i.e., 'server_name'), but,
@@ -378,7 +404,7 @@ class TlsConfig(Config):
#
# If not set, defaults to your 'server_name'.
#
#domain: matrix.example.com
domain: %(acme_domain)s
# file to use for the account key. This will be generated if it doesn't
# exist.

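One subtlety in the hunk above: the tls_enabled expression relies on `and` binding tighter than `or`, so it reads as (cert_path and key_path) or acme_domain. A quick check with illustrative values:

tls_certificate_path, tls_private_key_path, acme_domain = None, None, "example.com"
tls_enabled = (
    "" if tls_certificate_path and tls_private_key_path or acme_domain else "#"
)
assert tls_enabled == ""  # an ACME domain alone uncomments the TLS path lines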
View File

@@ -83,7 +83,7 @@ def compute_content_hash(event_dict, hash_algorithm):
event_json_bytes = encode_canonical_json(event_dict)
hashed = hash_algorithm(event_json_bytes)
return (hashed.name, hashed.digest())
return hashed.name, hashed.digest()
def compute_event_reference_hash(event, hash_algorithm=hashlib.sha256):
@@ -106,7 +106,7 @@ def compute_event_reference_hash(event, hash_algorithm=hashlib.sha256):
event_dict.pop("unsigned", None)
event_json_bytes = encode_canonical_json(event_dict)
hashed = hash_algorithm(event_json_bytes)
return (hashed.name, hashed.digest())
return hashed.name, hashed.digest()
def compute_event_signature(event_dict, signature_name, signing_key):

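A self-contained sketch of the content-hash step these hunks touch (the event dict is illustrative): canonical-JSON-encode the event, hash it, and return the algorithm name alongside the raw digest.

import hashlib

from canonicaljson import encode_canonical_json

event_dict = {"type": "m.room.message", "content": {"body": "hi"}}
hashed = hashlib.sha256(encode_canonical_json(event_dict))
name, digest = hashed.name, hashed.digest()  # ("sha256", 32 raw digest bytes)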
View File

@@ -18,7 +18,6 @@ import logging
from collections import defaultdict
import six
from six import raise_from
from six.moves import urllib
import attr
@@ -30,7 +29,6 @@ from signedjson.key import (
from signedjson.sign import (
SignatureVerifyException,
encode_canonical_json,
sign_json,
signature_ids,
verify_signed_json,
)
@@ -540,13 +538,7 @@ class BaseV2KeyFetcher(object):
verify_key=verify_key, valid_until_ts=key_data["expired_ts"]
)
# re-sign the json with our own key, so that it is ready if we are asked to
# give it out as a notary server
signed_key_json = sign_json(
response_json, self.config.server_name, self.config.signing_key[0]
)
signed_key_json_bytes = encode_canonical_json(signed_key_json)
key_json_bytes = encode_canonical_json(response_json)
yield make_deferred_yieldable(
defer.gatherResults(
@@ -558,7 +550,7 @@ class BaseV2KeyFetcher(object):
from_server=from_server,
ts_now_ms=time_added_ms,
ts_expires_ms=ts_valid_until_ms,
key_json_bytes=signed_key_json_bytes,
key_json_bytes=key_json_bytes,
)
for key_id in verify_keys
],
@@ -657,9 +649,10 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
},
)
except (NotRetryingDestination, RequestSendFailed) as e:
raise_from(KeyLookupError("Failed to connect to remote server"), e)
# these both have str() representations which we can't really improve upon
raise KeyLookupError(str(e))
except HttpResponseException as e:
raise_from(KeyLookupError("Remote server returned an error"), e)
raise KeyLookupError("Remote server returned an error: %s" % (e,))
keys = {}
added_keys = []
@@ -821,9 +814,11 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
timeout=10000,
)
except (NotRetryingDestination, RequestSendFailed) as e:
raise_from(KeyLookupError("Failed to connect to remote server"), e)
# these both have str() representations which we can't really improve
# upon
raise KeyLookupError(str(e))
except HttpResponseException as e:
raise_from(KeyLookupError("Remote server returned an error"), e)
raise KeyLookupError("Remote server returned an error: %s" % (e,))
if response["server_name"] != server_name:
raise KeyLookupError(

View File

@@ -355,7 +355,7 @@ class FederationClient(FederationBase):
auth_chain.sort(key=lambda e: e.depth)
return (pdus, auth_chain)
return pdus, auth_chain
except HttpResponseException as e:
if e.code == 400 or e.code == 404:
logger.info("Failed to use get_room_state_ids API, falling back")
@@ -404,7 +404,7 @@ class FederationClient(FederationBase):
signed_auth.sort(key=lambda e: e.depth)
return (signed_pdus, signed_auth)
return signed_pdus, signed_auth
@defer.inlineCallbacks
def get_events_from_store_or_dest(self, destination, room_id, event_ids):
@@ -429,7 +429,7 @@ class FederationClient(FederationBase):
missing_events.discard(k)
if not missing_events:
return (signed_events, failed_to_fetch)
return signed_events, failed_to_fetch
logger.debug(
"Fetching unknown state/auth events %s for room %s",
@@ -465,7 +465,7 @@ class FederationClient(FederationBase):
# We removed all events we successfully fetched from `batch`
failed_to_fetch.update(batch)
return (signed_events, failed_to_fetch)
return signed_events, failed_to_fetch
@defer.inlineCallbacks
@log_function

View File

@@ -43,6 +43,7 @@ from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
from synapse.http.endpoint import parse_server_name
from synapse.logging.context import nested_logging_context
from synapse.logging.opentracing import log_kv, start_active_span_from_edu, trace
from synapse.logging.utils import log_function
from synapse.replication.http.federation import (
ReplicationFederationSendEduRestServlet,
@@ -99,7 +100,7 @@ class FederationServer(FederationBase):
res = self._transaction_from_pdus(pdus).get_dict()
return (200, res)
return 200, res
@defer.inlineCallbacks
@log_function
@@ -162,7 +163,7 @@ class FederationServer(FederationBase):
yield self.transaction_actions.set_response(
origin, transaction, 400, response
)
return (400, response)
return 400, response
received_pdus_counter.inc(len(transaction.pdus))
@@ -264,7 +265,7 @@ class FederationServer(FederationBase):
logger.debug("Returning: %s", str(response))
yield self.transaction_actions.set_response(origin, transaction, 200, response)
return (200, response)
return 200, response
@defer.inlineCallbacks
def received_edu(self, origin, edu_type, content):
@@ -297,7 +298,7 @@ class FederationServer(FederationBase):
event_id,
)
return (200, resp)
return 200, resp
@defer.inlineCallbacks
def on_state_ids_request(self, origin, room_id, event_id):
@@ -314,7 +315,7 @@ class FederationServer(FederationBase):
state_ids = yield self.handler.get_state_ids_for_pdu(room_id, event_id)
auth_chain_ids = yield self.store.get_auth_chain_ids(state_ids)
return (200, {"pdu_ids": state_ids, "auth_chain_ids": auth_chain_ids})
return 200, {"pdu_ids": state_ids, "auth_chain_ids": auth_chain_ids}
@defer.inlineCallbacks
def _on_context_state_request_compute(self, room_id, event_id):
@@ -344,15 +345,15 @@ class FederationServer(FederationBase):
pdu = yield self.handler.get_persisted_pdu(origin, event_id)
if pdu:
return (200, self._transaction_from_pdus([pdu]).get_dict())
return 200, self._transaction_from_pdus([pdu]).get_dict()
else:
return (404, "")
return 404, ""
@defer.inlineCallbacks
def on_query_request(self, query_type, args):
received_queries_counter.labels(query_type).inc()
resp = yield self.registry.on_query(query_type, args)
return (200, resp)
return 200, resp
@defer.inlineCallbacks
def on_make_join_request(self, origin, room_id, user_id, supported_versions):
@@ -434,7 +435,7 @@ class FederationServer(FederationBase):
logger.debug("on_send_leave_request: pdu sigs: %s", pdu.signatures)
yield self.handler.on_send_leave_request(origin, pdu)
return (200, {})
return 200, {}
@defer.inlineCallbacks
def on_event_auth(self, origin, room_id, event_id):
@@ -445,7 +446,7 @@ class FederationServer(FederationBase):
time_now = self._clock.time_msec()
auth_pdus = yield self.handler.on_event_auth(event_id)
res = {"auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus]}
return (200, res)
return 200, res
@defer.inlineCallbacks
def on_query_auth_request(self, origin, content, room_id, event_id):
@@ -498,7 +499,7 @@ class FederationServer(FederationBase):
"missing": ret.get("missing", []),
}
return (200, send_content)
return 200, send_content
@log_function
def on_query_client_keys(self, origin, content):
@@ -507,6 +508,7 @@ class FederationServer(FederationBase):
def on_query_user_devices(self, origin, user_id):
return self.on_query_request("user_devices", user_id)
@trace
@defer.inlineCallbacks
@log_function
def on_claim_client_keys(self, origin, content):
@@ -515,6 +517,7 @@ class FederationServer(FederationBase):
for device_id, algorithm in device_keys.items():
query.append((user_id, device_id, algorithm))
log_kv({"message": "Claiming one time keys.", "user, device pairs": query})
results = yield self.store.claim_e2e_one_time_keys(query)
json_result = {}
@@ -808,12 +811,13 @@ class FederationHandlerRegistry(object):
if not handler:
logger.warn("No handler registered for EDU type %s", edu_type)
try:
yield handler(origin, content)
except SynapseError as e:
logger.info("Failed to handle edu %r: %r", edu_type, e)
except Exception:
logger.exception("Failed to handle edu %r", edu_type)
with start_active_span_from_edu(content, "handle_edu"):
try:
yield handler(origin, content)
except SynapseError as e:
logger.info("Failed to handle edu %r: %r", edu_type, e)
except Exception:
logger.exception("Failed to handle edu %r", edu_type)
def on_query(self, query_type, args):
handler = self.query_handlers.get(query_type)

View File

@@ -14,11 +14,19 @@
# limitations under the License.
import logging
from canonicaljson import json
from twisted.internet import defer
from synapse.api.errors import HttpResponseException
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Transaction
from synapse.logging.opentracing import (
extract_text_map,
set_tag,
start_active_span_follows_from,
tags,
)
from synapse.util.metrics import measure_func
logger = logging.getLogger(__name__)
@@ -44,93 +52,109 @@ class TransactionManager(object):
@defer.inlineCallbacks
def send_new_transaction(self, destination, pending_pdus, pending_edus):
# Sort based on the order field
pending_pdus.sort(key=lambda t: t[1])
pdus = [x[0] for x in pending_pdus]
edus = pending_edus
# Make a transaction-sending opentracing span. This span follows on from
# all the edus in that transaction. This needs to be done since there is
# no active span here, so if the edus were not received by the remote the
# span would have no causality and it would be forgotten.
# The span_contexts is a generator so that it won't be evaluated if
# opentracing is disabled. (Yay speed!)
success = True
logger.debug("TX [%s] _attempt_new_transaction", destination)
txn_id = str(self._next_txn_id)
logger.debug(
"TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)",
destination,
txn_id,
len(pdus),
len(edus),
span_contexts = (
extract_text_map(json.loads(edu.get_context())) for edu in pending_edus
)
transaction = Transaction.create_new(
origin_server_ts=int(self.clock.time_msec()),
transaction_id=txn_id,
origin=self._server_name,
destination=destination,
pdus=pdus,
edus=edus,
)
with start_active_span_follows_from("send_transaction", span_contexts):
self._next_txn_id += 1
# Sort based on the order field
pending_pdus.sort(key=lambda t: t[1])
pdus = [x[0] for x in pending_pdus]
edus = pending_edus
logger.info(
"TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)",
destination,
txn_id,
transaction.transaction_id,
len(pdus),
len(edus),
)
success = True
# Actually send the transaction
logger.debug("TX [%s] _attempt_new_transaction", destination)
# FIXME (erikj): This is a bit of a hack to make the Pdu age
# keys work
def json_data_cb():
data = transaction.get_dict()
now = int(self.clock.time_msec())
if "pdus" in data:
for p in data["pdus"]:
if "age_ts" in p:
unsigned = p.setdefault("unsigned", {})
unsigned["age"] = now - int(p["age_ts"])
del p["age_ts"]
return data
txn_id = str(self._next_txn_id)
try:
response = yield self._transport_layer.send_transaction(
transaction, json_data_cb
logger.debug(
"TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)",
destination,
txn_id,
len(pdus),
len(edus),
)
code = 200
except HttpResponseException as e:
code = e.code
response = e.response
if e.code in (401, 404, 429) or 500 <= e.code:
logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
raise e
transaction = Transaction.create_new(
origin_server_ts=int(self.clock.time_msec()),
transaction_id=txn_id,
origin=self._server_name,
destination=destination,
pdus=pdus,
edus=edus,
)
logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
self._next_txn_id += 1
if code == 200:
for e_id, r in response.get("pdus", {}).items():
if "error" in r:
logger.info(
"TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)",
destination,
txn_id,
transaction.transaction_id,
len(pdus),
len(edus),
)
# Actually send the transaction
# FIXME (erikj): This is a bit of a hack to make the Pdu age
# keys work
def json_data_cb():
data = transaction.get_dict()
now = int(self.clock.time_msec())
if "pdus" in data:
for p in data["pdus"]:
if "age_ts" in p:
unsigned = p.setdefault("unsigned", {})
unsigned["age"] = now - int(p["age_ts"])
del p["age_ts"]
return data
try:
response = yield self._transport_layer.send_transaction(
transaction, json_data_cb
)
code = 200
except HttpResponseException as e:
code = e.code
response = e.response
if e.code in (401, 404, 429) or 500 <= e.code:
logger.info(
"TX [%s] {%s} got %d response", destination, txn_id, code
)
raise e
logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
if code == 200:
for e_id, r in response.get("pdus", {}).items():
if "error" in r:
logger.warn(
"TX [%s] {%s} Remote returned error for %s: %s",
destination,
txn_id,
e_id,
r,
)
else:
for p in pdus:
logger.warn(
"TX [%s] {%s} Remote returned error for %s: %s",
"TX [%s] {%s} Failed to send event %s",
destination,
txn_id,
e_id,
r,
p.event_id,
)
else:
for p in pdus:
logger.warn(
"TX [%s] {%s} Failed to send event %s",
destination,
txn_id,
p.event_id,
)
success = False
success = False
return success
set_tag(tags.ERROR, not success)
return success

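The json_data_cb above does one non-obvious thing: just before the transaction is sent, each PDU's absolute age_ts is rewritten as a relative age. A standalone sketch (function name and sample values are illustrative):

def fix_up_pdu_ages(data, now_ms):
    # Convert each PDU's absolute age_ts into a relative unsigned.age
    for p in data.get("pdus", []):
        if "age_ts" in p:
            p.setdefault("unsigned", {})["age"] = now_ms - int(p.pop("age_ts"))
    return data

txn = {"pdus": [{"event_id": "$e1", "age_ts": 1560000000000}]}
fix_up_pdu_ages(txn, now_ms=1560000060000)  # unsigned.age becomes 60000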
View File

@@ -327,21 +327,37 @@ class TransportLayerClient(object):
include_all_networks=False,
third_party_instance_id=None,
):
path = _create_v1_path("/publicRooms")
if search_filter:
# this uses MSC2197 (Search Filtering over Federation)
path = _create_v1_path("/publicRooms")
args = {"include_all_networks": "true" if include_all_networks else "false"}
if third_party_instance_id:
args["third_party_instance_id"] = (third_party_instance_id,)
if limit:
args["limit"] = [str(limit)]
if since_token:
args["since"] = [since_token]
data = {"include_all_networks": "true" if include_all_networks else "false"}
if third_party_instance_id:
data["third_party_instance_id"] = third_party_instance_id
if limit:
data["limit"] = str(limit)
if since_token:
data["since"] = since_token
# TODO(erikj): Actually send the search_filter across federation.
data["filter"] = search_filter
response = yield self.client.get_json(
destination=remote_server, path=path, args=args, ignore_backoff=True
)
response = yield self.client.post_json(
destination=remote_server, path=path, data=data, ignore_backoff=True
)
else:
path = _create_v1_path("/publicRooms")
args = {"include_all_networks": "true" if include_all_networks else "false"}
if third_party_instance_id:
args["third_party_instance_id"] = (third_party_instance_id,)
if limit:
args["limit"] = [str(limit)]
if since_token:
args["since"] = [since_token]
response = yield self.client.get_json(
destination=remote_server, path=path, args=args, ignore_backoff=True
)
return response

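For reference, the filtered branch above ends up POSTing a body shaped roughly like this to /publicRooms (values illustrative; the diff forwards search_filter as given, and generic_search_term is the usual filter key):

data = {
    "include_all_networks": "false",
    "limit": "20",
    "since": "t123_456",
    "filter": {"generic_search_term": "synapse"},
}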
View File

@@ -38,7 +38,12 @@ from synapse.http.servlet import (
parse_string_from_args,
)
from synapse.logging.context import run_in_background
from synapse.logging.opentracing import start_active_span_from_context, tags
from synapse.logging.opentracing import (
start_active_span,
start_active_span_from_request,
tags,
whitelisted_homeserver,
)
from synapse.types import ThirdPartyInstanceID, get_domain_from_id
from synapse.util.ratelimitutils import FederationRateLimiter
from synapse.util.versionstring import get_version_string
@@ -288,20 +293,28 @@ class BaseFederationServlet(object):
logger.warn("authenticate_request failed: %s", e)
raise
# Start an opentracing span
with start_active_span_from_context(
request.requestHeaders,
"incoming-federation-request",
tags={
"request_id": request.get_request_id(),
tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
tags.HTTP_METHOD: request.get_method(),
tags.HTTP_URL: request.get_redacted_uri(),
tags.PEER_HOST_IPV6: request.getClientIP(),
"authenticated_entity": origin,
"servlet_name": request.request_metrics.name,
},
):
request_tags = {
"request_id": request.get_request_id(),
tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
tags.HTTP_METHOD: request.get_method(),
tags.HTTP_URL: request.get_redacted_uri(),
tags.PEER_HOST_IPV6: request.getClientIP(),
"authenticated_entity": origin,
"servlet_name": request.request_metrics.name,
}
# Only accept the span context if the origin is authenticated
# and whitelisted
if origin and whitelisted_homeserver(origin):
scope = start_active_span_from_request(
request, "incoming-federation-request", tags=request_tags
)
else:
scope = start_active_span(
"incoming-federation-request", tags=request_tags
)
with scope:
if origin:
with ratelimiter.ratelimit(origin) as d:
await d
@@ -757,6 +770,42 @@ class PublicRoomList(BaseFederationServlet):
)
return 200, data
async def on_POST(self, origin, content, query):
# This implements MSC2197 (Search Filtering over Federation)
if not self.allow_access:
raise FederationDeniedError(origin)
limit = int(content.get("limit", 100))
since_token = content.get("since", None)
search_filter = content.get("filter", None)
include_all_networks = content.get("include_all_networks", False)
third_party_instance_id = content.get("third_party_instance_id", None)
if include_all_networks:
network_tuple = None
if third_party_instance_id is not None:
raise SynapseError(
400, "Can't use include_all_networks with an explicit network"
)
elif third_party_instance_id is None:
network_tuple = ThirdPartyInstanceID(None, None)
else:
network_tuple = ThirdPartyInstanceID.from_string(third_party_instance_id)
if search_filter is None:
logger.warning("Nonefilter")
data = await self.handler.get_local_public_room_list(
limit=limit,
since_token=since_token,
search_filter=search_filter,
network_tuple=network_tuple,
from_federation=True,
)
return 200, data
class FederationVersionServlet(BaseFederationServlet):
PATH = "/version"

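The span handling above hinges on whitelisted_homeserver. A rough sketch of the kind of check it performs; the pattern list and matching rule here are assumptions, not the verbatim implementation:

import re

def whitelisted_homeserver(destination, whitelist=(r"synapse\.example\.com",)):
    # Accept a remote's tracing context only if its name matches the whitelist
    return any(re.match(pattern, destination) for pattern in whitelist)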
View File

@@ -38,6 +38,9 @@ class Edu(JsonEncodedObject):
internal_keys = ["origin", "destination"]
def get_context(self):
return getattr(self, "content", {}).get("org.matrix.opentracing_context", "{}")
class Transaction(JsonEncodedObject):
""" A transaction is a list of Pdus and Edus to be sent to a remote home

View File

@@ -51,8 +51,8 @@ class AccountDataEventSource(object):
{"type": account_data_type, "content": content, "room_id": room_id}
)
return (results, current_stream_id)
return results, current_stream_id
@defer.inlineCallbacks
def get_pagination_rows(self, user, config, key):
return ([], config.to_id)
return [], config.to_id

View File

@@ -94,6 +94,15 @@ class AdminHandler(BaseHandler):
return ret
def get_user_server_admin(self, user):
"""
Get the admin bit on a user.
Args:
user (UserID): the (necessarily local) user to query
"""
return self.store.is_server_admin(user)
def set_user_server_admin(self, user, admin):
"""
Set the admin bit on a user.

View File

@@ -280,7 +280,7 @@ class AuthHandler(BaseHandler):
creds,
list(clientdict),
)
return (creds, clientdict, session["id"])
return creds, clientdict, session["id"]
ret = self._auth_dict_for_flows(flows, session)
ret["completed"] = list(creds)
@@ -722,7 +722,7 @@ class AuthHandler(BaseHandler):
known_login_type = True
is_valid = yield provider.check_password(qualified_user_id, password)
if is_valid:
return (qualified_user_id, None)
return qualified_user_id, None
if not hasattr(provider, "get_supported_login_types") or not hasattr(
provider, "check_auth"
@@ -766,7 +766,7 @@ class AuthHandler(BaseHandler):
)
if canonical_user_id:
return (canonical_user_id, None)
return canonical_user_id, None
if not known_login_type:
raise SynapseError(400, "Unknown login type %s" % login_type)
@@ -816,7 +816,7 @@ class AuthHandler(BaseHandler):
result = (result, None)
return result
return (None, None)
return None, None
@defer.inlineCallbacks
def _check_local_password(self, user_id, password):

View File

@@ -15,9 +15,17 @@
import logging
from canonicaljson import json
from twisted.internet import defer
from synapse.api.errors import SynapseError
from synapse.logging.opentracing import (
get_active_span_text_map,
set_tag,
start_active_span,
whitelisted_homeserver,
)
from synapse.types import UserID, get_domain_from_id
from synapse.util.stringutils import random_string
@@ -100,14 +108,21 @@ class DeviceMessageHandler(object):
message_id = random_string(16)
context = get_active_span_text_map()
remote_edu_contents = {}
for destination, messages in remote_messages.items():
remote_edu_contents[destination] = {
"messages": messages,
"sender": sender_user_id,
"type": message_type,
"message_id": message_id,
}
with start_active_span("to_device_for_user"):
set_tag("destination", destination)
remote_edu_contents[destination] = {
"messages": messages,
"sender": sender_user_id,
"type": message_type,
"message_id": message_id,
"org.matrix.opentracing_context": json.dumps(context)
if whitelisted_homeserver(destination)
else None,
}
stream_id = yield self.store.add_messages_to_device_inbox(
local_messages, remote_edu_contents

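Tying this to the Edu.get_context change later in this set: the sender stashes the serialised span context under org.matrix.opentracing_context, and the receiving side reads the same key back with a "{}" default. A minimal round-trip sketch with illustrative values:

import json

edu_content = {
    "sender": "@alice:example.com",
    "type": "m.example",
    "message_id": "abcdefgh12345678",
    "org.matrix.opentracing_context": json.dumps({"uber-trace-id": "abc:def"}),
}
context = json.loads(edu_content.get("org.matrix.opentracing_context", "{}"))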
View File

@@ -24,6 +24,7 @@ from twisted.internet import defer
from synapse.api.errors import CodeMessageException, SynapseError
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentracing import log_kv, set_tag, tag_args, trace
from synapse.types import UserID, get_domain_from_id
from synapse.util import unwrapFirstError
from synapse.util.retryutils import NotRetryingDestination
@@ -46,6 +47,7 @@ class E2eKeysHandler(object):
"client_keys", self.on_federation_query_client_keys
)
@trace
@defer.inlineCallbacks
def query_devices(self, query_body, timeout):
""" Handle a device key query from a client
@@ -81,6 +83,9 @@ class E2eKeysHandler(object):
else:
remote_queries[user_id] = device_ids
set_tag("local_key_query", local_query)
set_tag("remote_key_query", remote_queries)
# First get local devices.
failures = {}
results = {}
@@ -121,6 +126,7 @@ class E2eKeysHandler(object):
r[user_id] = remote_queries[user_id]
# Now fetch any devices that we don't have in our cache
@trace
@defer.inlineCallbacks
def do_remote_query(destination):
"""This is called when we are querying the device list of a user on
@@ -185,6 +191,8 @@ class E2eKeysHandler(object):
except Exception as e:
failure = _exception_to_failure(e)
failures[destination] = failure
set_tag("error", True)
set_tag("reason", failure)
yield make_deferred_yieldable(
defer.gatherResults(
@@ -198,6 +206,7 @@ class E2eKeysHandler(object):
return {"device_keys": results, "failures": failures}
@trace
@defer.inlineCallbacks
def query_local_devices(self, query):
"""Get E2E device keys for local users
@@ -210,6 +219,7 @@ class E2eKeysHandler(object):
defer.Deferred: (resolves to dict[string, dict[string, dict]]):
map from user_id -> device_id -> device details
"""
set_tag("local_query", query)
local_query = []
result_dict = {}
@@ -217,6 +227,14 @@ class E2eKeysHandler(object):
# we use UserID.from_string to catch invalid user ids
if not self.is_mine(UserID.from_string(user_id)):
logger.warning("Request for keys for non-local user %s", user_id)
log_kv(
{
"message": "Requested a local key for a user which"
" was not local to the homeserver",
"user_id": user_id,
}
)
set_tag("error", True)
raise SynapseError(400, "Not a user here")
if not device_ids:
@@ -241,6 +259,7 @@ class E2eKeysHandler(object):
r["unsigned"]["device_display_name"] = display_name
result_dict[user_id][device_id] = r
log_kv(results)
return result_dict
@defer.inlineCallbacks
@@ -251,6 +270,7 @@ class E2eKeysHandler(object):
res = yield self.query_local_devices(device_keys_query)
return {"device_keys": res}
@trace
@defer.inlineCallbacks
def claim_one_time_keys(self, query, timeout):
local_query = []
@@ -265,6 +285,9 @@ class E2eKeysHandler(object):
domain = get_domain_from_id(user_id)
remote_queries.setdefault(domain, {})[user_id] = device_keys
set_tag("local_key_query", local_query)
set_tag("remote_key_query", remote_queries)
results = yield self.store.claim_e2e_one_time_keys(local_query)
json_result = {}
@@ -276,8 +299,10 @@ class E2eKeysHandler(object):
key_id: json.loads(json_bytes)
}
@trace
@defer.inlineCallbacks
def claim_client_keys(destination):
set_tag("destination", destination)
device_keys = remote_queries[destination]
try:
remote_result = yield self.federation.claim_client_keys(
@@ -290,6 +315,8 @@ class E2eKeysHandler(object):
except Exception as e:
failure = _exception_to_failure(e)
failures[destination] = failure
set_tag("error", True)
set_tag("reason", failure)
yield make_deferred_yieldable(
defer.gatherResults(
@@ -313,9 +340,11 @@ class E2eKeysHandler(object):
),
)
log_kv({"one_time_keys": json_result, "failures": failures})
return {"one_time_keys": json_result, "failures": failures}
@defer.inlineCallbacks
@tag_args
def upload_keys_for_user(self, user_id, device_id, keys):
time_now = self.clock.time_msec()
@@ -329,6 +358,13 @@ class E2eKeysHandler(object):
user_id,
time_now,
)
log_kv(
{
"message": "Updating device_keys for user.",
"user_id": user_id,
"device_id": device_id,
}
)
# TODO: Sign the JSON with the server key
changed = yield self.store.set_e2e_device_keys(
user_id, device_id, time_now, device_keys
@@ -336,12 +372,24 @@ class E2eKeysHandler(object):
if changed:
# Only notify about device updates *if* the keys actually changed
yield self.device_handler.notify_device_update(user_id, [device_id])
else:
log_kv({"message": "Not updating device_keys for user", "user_id": user_id})
one_time_keys = keys.get("one_time_keys", None)
if one_time_keys:
log_kv(
{
"message": "Updating one_time_keys for device.",
"user_id": user_id,
"device_id": device_id,
}
)
yield self._upload_one_time_keys_for_user(
user_id, device_id, time_now, one_time_keys
)
else:
log_kv(
{"message": "Did not update one_time_keys", "reason": "no keys given"}
)
# the device should have been registered already, but it may have been
# deleted due to a race with a DELETE request. Or we may be using an
@@ -352,6 +400,7 @@ class E2eKeysHandler(object):
result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
set_tag("one_time_key_counts", result)
return {"one_time_key_counts": result}
@defer.inlineCallbacks
@@ -395,6 +444,7 @@ class E2eKeysHandler(object):
(algorithm, key_id, encode_canonical_json(key).decode("ascii"))
)
log_kv({"message": "Inserting new one_time_keys.", "keys": new_keys})
yield self.store.add_e2e_one_time_keys(user_id, device_id, time_now, new_keys)

View File

@@ -26,6 +26,7 @@ from synapse.api.errors import (
StoreError,
SynapseError,
)
from synapse.logging.opentracing import log_kv, trace
from synapse.util.async_helpers import Linearizer
logger = logging.getLogger(__name__)
@@ -49,6 +50,7 @@ class E2eRoomKeysHandler(object):
# changed.
self._upload_linearizer = Linearizer("upload_room_keys_lock")
@trace
@defer.inlineCallbacks
def get_room_keys(self, user_id, version, room_id=None, session_id=None):
"""Bulk get the E2E room keys for a given backup, optionally filtered to a given
@@ -84,8 +86,10 @@ class E2eRoomKeysHandler(object):
user_id, version, room_id, session_id
)
log_kv(results)
return results
@trace
@defer.inlineCallbacks
def delete_room_keys(self, user_id, version, room_id=None, session_id=None):
"""Bulk delete the E2E room keys for a given backup, optionally filtered to a given
@@ -107,6 +111,7 @@ class E2eRoomKeysHandler(object):
with (yield self._upload_linearizer.queue(user_id)):
yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)
@trace
@defer.inlineCallbacks
def upload_room_keys(self, user_id, version, room_keys):
"""Bulk upload a list of room keys into a given backup version, asserting
@@ -186,7 +191,14 @@ class E2eRoomKeysHandler(object):
session_id(str): the session whose room_key we're setting
room_key(dict): the room_key being set
"""
log_kv(
{
"message": "Trying to upload room key",
"room_id": room_id,
"session_id": session_id,
"user_id": user_id,
}
)
# get the room_key for this particular row
current_room_key = None
try:
@@ -195,14 +207,23 @@ class E2eRoomKeysHandler(object):
)
except StoreError as e:
if e.code == 404:
pass
log_kv(
{
"message": "Room key not found.",
"room_id": room_id,
"user_id": user_id,
}
)
else:
raise
if self._should_replace_room_key(current_room_key, room_key):
log_kv({"message": "Replacing room key."})
yield self.store.set_e2e_room_key(
user_id, version, room_id, session_id, room_key
)
else:
log_kv({"message": "Not replacing room_key."})
@staticmethod
def _should_replace_room_key(current_room_key, room_key):
@@ -236,6 +257,7 @@ class E2eRoomKeysHandler(object):
return False
return True
@trace
@defer.inlineCallbacks
def create_version(self, user_id, version_info):
"""Create a new backup version. This automatically becomes the new
@@ -294,6 +316,7 @@ class E2eRoomKeysHandler(object):
raise
return res
@trace
@defer.inlineCallbacks
def delete_version(self, user_id, version=None):
"""Deletes a given version of the user's e2e_room_keys backup
@@ -314,6 +337,7 @@ class E2eRoomKeysHandler(object):
else:
raise
@trace
@defer.inlineCallbacks
def update_version(self, user_id, version, version_info):
"""Update the info about a given version of the user's backup

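For orientation, a hedged sketch of the choice _should_replace_room_key makes (the authoritative criteria live in the handler; field names follow the e2e key backup format): prefer verified keys, then a lower first_message_index, then a lower forwarded_count.

def should_replace(current, new):
    if current is None:
        return True  # no existing key, accept the upload
    if new["is_verified"] != current["is_verified"]:
        return new["is_verified"]
    if new["first_message_index"] != current["first_message_index"]:
        return new["first_message_index"] < current["first_message_index"]
    return new["forwarded_count"] < current["forwarded_count"]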
View File

@@ -326,8 +326,9 @@ class FederationHandler(BaseHandler):
ours = yield self.store.get_state_groups_ids(room_id, seen)
# state_maps is a list of mappings from (type, state_key) to event_id
# type: list[dict[tuple[str, str], str]]
state_maps = list(ours.values())
state_maps = list(
ours.values()
) # type: list[dict[tuple[str, str], str]]
# we don't need this any more, let's delete it.
del ours
@@ -1427,7 +1428,7 @@ class FederationHandler(BaseHandler):
assert event.user_id == user_id
assert event.state_key == user_id
assert event.room_id == room_id
return (origin, event, format_ver)
return origin, event, format_ver
@defer.inlineCallbacks
@log_function

View File

@@ -282,3 +282,16 @@ class IdentityHandler(BaseHandler):
except HttpResponseException as e:
logger.info("Proxied requestToken failed: %r", e)
raise e.to_synapse_error()
class LookupAlgorithm:
"""
Supported hashing algorithms when performing a 3PID lookup.
SHA256 - Hashing an (address, medium, pepper) combo with sha256, then url-safe base64
encoding
NONE - Not performing any hashing. Simply sending an (address, medium) combo in plaintext
"""
SHA256 = "sha256"
NONE = "none"

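A concrete sketch of the SHA256 variant, mirroring sha256_and_url_safe_base64 from synapse.util.hash as imported by the room-member changes below (the address, medium and pepper are illustrative):

import hashlib

from unpaddedbase64 import encode_base64

def sha256_and_url_safe_base64(input_text):
    # sha256 the input, then url-safe, unpadded base64-encode the digest
    digest = hashlib.sha256(input_text.encode()).digest()
    return encode_base64(digest, urlsafe=True)

lookup_value = sha256_and_url_safe_base64("foo@example.com email some_pepper")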
View File

@@ -449,7 +449,7 @@ class InitialSyncHandler(BaseHandler):
# * The user is a guest user, and has joined the room
# else it will throw.
member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
return (member_event.membership, member_event.event_id)
return member_event.membership, member_event.event_id
return
except AuthError:
visibility = yield self.state_handler.get_current_state(
@@ -459,7 +459,7 @@ class InitialSyncHandler(BaseHandler):
visibility
and visibility.content["history_visibility"] == "world_readable"
):
return (Membership.JOIN, None)
return Membership.JOIN, None
return
raise AuthError(
403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN

View File

@@ -24,7 +24,7 @@ from twisted.internet import defer
from twisted.internet.defer import succeed
from synapse import event_auth
from synapse.api.constants import EventTypes, Membership, RelationTypes
from synapse.api.constants import EventTypes, Membership, RelationTypes, UserTypes
from synapse.api.errors import (
AuthError,
Codes,
@@ -469,6 +469,9 @@ class EventCreationHandler(object):
u = yield self.store.get_user_by_id(user_id)
assert u is not None
if u["user_type"] in (UserTypes.SUPPORT, UserTypes.BOT):
# support and bot users are not required to consent
return
if u["appservice_id"] is not None:
# users registered by an appservice are exempt
return

View File

@@ -70,6 +70,7 @@ class PaginationHandler(object):
self.auth = hs.get_auth()
self.store = hs.get_datastore()
self.clock = hs.get_clock()
self._server_name = hs.hostname
self.pagination_lock = ReadWriteLock()
self._purges_in_progress_by_room = set()
@@ -153,6 +154,22 @@ class PaginationHandler(object):
"""
return self._purges_by_id.get(purge_id)
async def purge_room(self, room_id):
"""Purge the given room from the database"""
with (await self.pagination_lock.write(room_id)):
# check we know about the room
await self.store.get_room_version(room_id)
# first check that we have no users in this room
joined = await defer.maybeDeferred(
self.store.is_host_joined, room_id, self._server_name
)
if joined:
raise SynapseError(400, "Users are still joined to this room")
await self.store.purge_room(room_id)
@defer.inlineCallbacks
def get_messages(
self,

View File

@@ -1032,7 +1032,7 @@ class PresenceEventSource(object):
#
# Hence this guard where we just return nothing so that the sync
# doesn't return. C.f. #5503.
return ([], max_token)
return [], max_token
presence = self.get_presence_handler()
stream_change_cache = self.store.presence_stream_cache
@@ -1279,7 +1279,7 @@ def get_interested_parties(store, states):
# Always notify self
users_to_states.setdefault(state.user_id, []).append(state)
return (room_ids_to_states, users_to_states)
return room_ids_to_states, users_to_states
@defer.inlineCallbacks

View File

@@ -34,7 +34,7 @@ from ._base import BaseHandler
logger = logging.getLogger(__name__)
MAX_DISPLAYNAME_LEN = 100
MAX_DISPLAYNAME_LEN = 256
MAX_AVATAR_URL_LEN = 1000

View File

@@ -148,7 +148,7 @@ class ReceiptEventSource(object):
to_key = yield self.get_current_key()
if from_key == to_key:
return ([], to_key)
return [], to_key
events = yield self.store.get_linearized_receipts_for_rooms(
room_ids, from_key=from_key, to_key=to_key

View File

@@ -622,7 +622,7 @@ class RegistrationHandler(BaseHandler):
initial_display_name=initial_display_name,
is_guest=is_guest,
)
return (r["device_id"], r["access_token"])
return r["device_id"], r["access_token"]
valid_until_ms = None
if self.session_lifetime is not None:

View File

@@ -25,6 +25,7 @@ from unpaddedbase64 import decode_base64, encode_base64
from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules
from synapse.api.errors import Codes, HttpResponseException
from synapse.types import ThirdPartyInstanceID
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.descriptors import cachedInlineCallbacks
@@ -485,7 +486,33 @@ class RoomListHandler(BaseHandler):
return {"chunk": [], "total_room_count_estimate": 0}
if search_filter:
# We currently don't support searching across federation, so we have
# Searching across federation is defined in MSC2197.
# However, the remote homeserver may or may not actually support it.
# So we first try an MSC2197 remote-filtered search, then fall back
# to a locally-filtered search if we must.
try:
res = yield self._get_remote_list_cached(
server_name,
limit=limit,
since_token=since_token,
include_all_networks=include_all_networks,
third_party_instance_id=third_party_instance_id,
search_filter=search_filter,
)
return res
except HttpResponseException as hre:
syn_err = hre.to_synapse_error()
if hre.code in (404, 405) or syn_err.errcode in (
Codes.UNRECOGNIZED,
Codes.NOT_FOUND,
):
logger.debug("Falling back to locally-filtered /publicRooms")
else:
raise # Not an error that should trigger a fallback.
# if we reach this point, the remote didn't support a filtered search,
# so we fall back to fetching the full (unpaginated) list and filtering
# it locally
limit = None
since_token = None

View File

@@ -29,9 +29,11 @@ from twisted.internet import defer
from synapse import types
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, Codes, HttpResponseException, SynapseError
from synapse.handlers.identity import LookupAlgorithm
from synapse.types import RoomID, UserID
from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_joined_room, user_left_room
from synapse.util.hash import sha256_and_url_safe_base64
from ._base import BaseHandler
@@ -523,7 +525,7 @@ class RoomMemberHandler(object):
event (SynapseEvent): The membership event.
context: The context of the event.
is_guest (bool): Whether the sender is a guest.
room_hosts ([str]): Homeservers which are likely to already be in
remote_room_hosts (list[str]|None): Homeservers which are likely to already be in
the room, and could be danced with in order to join this
homeserver for the first time.
ratelimit (bool): Whether to rate limit this request.
@@ -634,7 +636,7 @@ class RoomMemberHandler(object):
servers.remove(room_alias.domain)
servers.insert(0, room_alias.domain)
return (RoomID.from_string(room_id), servers)
return RoomID.from_string(room_id), servers
@defer.inlineCallbacks
def _get_inviter(self, user_id, room_id):
@@ -697,6 +699,44 @@ class RoomMemberHandler(object):
raise SynapseError(
403, "Looking up third-party identifiers is denied from this server"
)
# Check what hashing details are supported by this identity server
use_v1 = False
hash_details = None
try:
hash_details = yield self.simple_http_client.get_json(
"%s%s/_matrix/identity/v2/hash_details" % (id_server_scheme, id_server)
)
except (HttpResponseException, ValueError) as e:
# Catch HttpResponseException for a non-200 response code
# Catch ValueError for a non-JSON response body
# Check if this identity server does not know about v2 lookups
# (note: a ValueError carries no code attribute, hence the isinstance check)
if isinstance(e, HttpResponseException) and e.code == 404:
# This is an old identity server that does not yet support v2 lookups
use_v1 = True
else:
logger.warn("Error when looking up hashing details: %s" % (e,))
return None
if use_v1:
return (yield self._lookup_3pid_v1(id_server, medium, address))
return (yield self._lookup_3pid_v2(id_server, medium, address, hash_details))
@defer.inlineCallbacks
def _lookup_3pid_v1(self, id_server, medium, address):
"""Looks up a 3pid in the passed identity server using v1 lookup.
Args:
id_server (str): The server name (including port, if required)
of the identity server to use.
medium (str): The type of the third party identifier (e.g. "email").
address (str): The third party identifier (e.g. "foo@example.com").
Returns:
str: the matrix ID of the 3pid, or None if it is not recognized.
"""
try:
data = yield self.simple_http_client.get_json(
"%s%s/_matrix/identity/api/v1/lookup" % (id_server_scheme, id_server),
@@ -711,8 +751,83 @@ class RoomMemberHandler(object):
except IOError as e:
logger.warn("Error from identity server lookup: %s" % (e,))
return None
@defer.inlineCallbacks
def _lookup_3pid_v2(self, id_server, medium, address, hash_details):
"""Looks up a 3pid in the passed identity server using v2 lookup.
Args:
id_server (str): The server name (including port, if required)
of the identity server to use.
medium (str): The type of the third party identifier (e.g. "email").
address (str): The third party identifier (e.g. "foo@example.com").
hash_details (dict[str, str|list]): A dictionary containing hashing information
provided by an identity server.
Returns:
Deferred[str|None]: the matrix ID of the 3pid, or None if it is not recognised.
"""
# Extract information from hash_details
supported_lookup_algorithms = hash_details["algorithms"]
lookup_pepper = hash_details["lookup_pepper"]
# Check if any of the supported lookup algorithms are present
if LookupAlgorithm.SHA256 in supported_lookup_algorithms:
# Perform a hashed lookup
lookup_algorithm = LookupAlgorithm.SHA256
# Hash address, medium and the pepper with sha256
to_hash = "%s %s %s" % (address, medium, lookup_pepper)
lookup_value = sha256_and_url_safe_base64(to_hash)
elif LookupAlgorithm.NONE in supported_lookup_algorithms:
# Perform a non-hashed lookup
lookup_algorithm = LookupAlgorithm.NONE
# Combine together plaintext address and medium
lookup_value = "%s %s" % (address, medium)
else:
logger.warn(
"None of the provided lookup algorithms of %s%s are supported: %s",
id_server_scheme,
id_server,
hash_details["algorithms"],
)
raise SynapseError(
400,
"Provided identity server does not support any v2 lookup "
"algorithms that this homeserver supports.",
)
try:
lookup_results = yield self.simple_http_client.post_json_get_json(
"%s%s/_matrix/identity/v2/lookup" % (id_server_scheme, id_server),
{
"addresses": [lookup_value],
"algorithm": lookup_algorithm,
"pepper": lookup_pepper,
},
)
except (HttpResponseException, ValueError) as e:
# Catch HttpResponseException for a non-200 response code
# Catch ValueError for a non-JSON response body
logger.warn("Error when performing a 3pid lookup: %s" % (e,))
return None
# Check for a mapping from what we looked up to an MXID
if "mappings" not in lookup_results or not isinstance(
lookup_results["mappings"], dict
):
logger.debug("No results from 3pid lookup")
return None
# Return the MXID if it's available, or None otherwise
mxid = lookup_results["mappings"].get(lookup_value)
return mxid
@defer.inlineCallbacks
def _verify_any_signature(self, data, server_hostname):
if server_hostname not in data["signatures"]:
@@ -903,7 +1018,7 @@ class RoomMemberHandler(object):
if not public_keys:
public_keys.append(fallback_public_key)
display_name = data["display_name"]
return (token, public_keys, fallback_public_key, display_name)
return token, public_keys, fallback_public_key, display_name
@defer.inlineCallbacks
def _is_host_in_room(self, current_state_ids):
@@ -962,9 +1077,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
)
if complexity:
if complexity["v1"] > max_complexity:
return True
return False
return complexity["v1"] > max_complexity
return None
@defer.inlineCallbacks
@@ -980,10 +1093,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
max_complexity = self.hs.config.limit_remote_rooms.complexity
complexity = yield self.store.get_room_complexity(room_id)
if complexity["v1"] > max_complexity:
return True
return False
return complexity["v1"] > max_complexity
@defer.inlineCallbacks
def _remote_join(self, requester, remote_room_hosts, room_id, user, content):

View File

@@ -378,7 +378,7 @@ class SyncHandler(object):
event_copy = {k: v for (k, v) in iteritems(event) if k != "room_id"}
ephemeral_by_room.setdefault(room_id, []).append(event_copy)
return (now_token, ephemeral_by_room)
return now_token, ephemeral_by_room
@defer.inlineCallbacks
def _load_filtered_recents(
@@ -1332,7 +1332,7 @@ class SyncHandler(object):
)
if not tags_by_room:
logger.debug("no-oping sync")
return ([], [], [], [])
return [], [], [], []
ignored_account_data = yield self.store.get_global_account_data_by_type_for_user(
"m.ignored_user_list", user_id=user_id
@@ -1642,7 +1642,7 @@ class SyncHandler(object):
)
room_entries.append(entry)
return (room_entries, invited, newly_joined_rooms, newly_left_rooms)
return room_entries, invited, newly_joined_rooms, newly_left_rooms
@defer.inlineCallbacks
def _get_all_rooms(self, sync_result_builder, ignored_users):
@@ -1716,7 +1716,7 @@ class SyncHandler(object):
)
)
return (room_entries, invited, [])
return room_entries, invited, []
@defer.inlineCallbacks
def _generate_room_entry(

View File

@@ -319,4 +319,4 @@ class TypingNotificationEventSource(object):
return self.get_typing_handler()._latest_room_serial
def get_pagination_rows(self, user, pagination_config, key):
return ([], pagination_config.from_key)
return [], pagination_config.from_key

View File

@@ -14,21 +14,21 @@
# limitations under the License.
import logging
import urllib
import attr
from netaddr import IPAddress
from netaddr import AddrFormatError, IPAddress
from zope.interface import implementer
from twisted.internet import defer
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
from twisted.internet.interfaces import IStreamClientEndpoint
from twisted.web.client import URI, Agent, HTTPConnectionPool
from twisted.web.client import Agent, HTTPConnectionPool
from twisted.web.http_headers import Headers
from twisted.web.iweb import IAgent
from twisted.web.iweb import IAgent, IAgentEndpointFactory
from synapse.http.federation.srv_resolver import SrvResolver, pick_server_from_list
from synapse.http.federation.srv_resolver import Server, SrvResolver
from synapse.http.federation.well_known_resolver import WellKnownResolver
from synapse.logging.context import make_deferred_yieldable
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.util import Clock
logger = logging.getLogger(__name__)
@@ -36,8 +36,9 @@ logger = logging.getLogger(__name__)
 @implementer(IAgent)
 class MatrixFederationAgent(object):
-    """An Agent-like thing which provides a `request` method which will look up a matrix
-    server and send an HTTP request to it.
+    """An Agent-like thing which provides a `request` method which correctly
+    handles resolving matrix server names when using matrix://. Handles standard
+    https URIs as normal.

     Doesn't implement any retries. (Those are done in MatrixFederationHttpClient.)
@@ -51,9 +52,9 @@ class MatrixFederationAgent(object):
             SRVResolver impl to use for looking up SRV records. None to use a default
             implementation.

-        _well_known_cache (TTLCache|None):
-            TTLCache impl for storing cached well-known lookups. None to use a default
-            implementation.
+        _well_known_resolver (WellKnownResolver|None):
+            WellKnownResolver to use to perform well-known lookups. None to use a
+            default implementation.
     """
     def __init__(
@@ -61,49 +62,49 @@ class MatrixFederationAgent(object):
         reactor,
         tls_client_options_factory,
         _srv_resolver=None,
-        _well_known_cache=None,
+        _well_known_resolver=None,
     ):
         self._reactor = reactor
         self._clock = Clock(reactor)

-        self._tls_client_options_factory = tls_client_options_factory
-        if _srv_resolver is None:
-            _srv_resolver = SrvResolver()
-        self._srv_resolver = _srv_resolver
-
         self._pool = HTTPConnectionPool(reactor)
         self._pool.retryAutomatically = False
         self._pool.maxPersistentPerHost = 5
         self._pool.cachedConnectionTimeout = 2 * 60

-        self._well_known_resolver = WellKnownResolver(
+        self._agent = Agent.usingEndpointFactory(
             self._reactor,
-            agent=Agent(
-                self._reactor,
-                pool=self._pool,
-                contextFactory=tls_client_options_factory,
+            MatrixHostnameEndpointFactory(
+                reactor, tls_client_options_factory, _srv_resolver
             ),
-            well_known_cache=_well_known_cache,
+            pool=self._pool,
         )
+
+        if _well_known_resolver is None:
+            _well_known_resolver = WellKnownResolver(
+                self._reactor,
+                agent=Agent(
+                    self._reactor,
+                    pool=self._pool,
+                    contextFactory=tls_client_options_factory,
+                ),
+            )
+
+        self._well_known_resolver = _well_known_resolver
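The rewritten constructor above delegates per-request connection decisions to an IAgentEndpointFactory instead of building a throwaway Agent per request. A minimal sketch of that Twisted pattern, with a hypothetical FixedHostEndpointFactory standing in for MatrixHostnameEndpointFactory:

    # Minimal sketch of the Agent.usingEndpointFactory pattern used above.
    # FixedHostEndpointFactory is hypothetical: it connects to whatever
    # host/port the URI names, with no SRV or well-known logic.
    from twisted.internet import reactor
    from twisted.internet.endpoints import HostnameEndpoint
    from twisted.web.client import Agent, HTTPConnectionPool
    from twisted.web.iweb import IAgentEndpointFactory
    from zope.interface import implementer

    @implementer(IAgentEndpointFactory)
    class FixedHostEndpointFactory(object):
        def endpointForURI(self, parsed_uri):
            # Called for every request; returns the endpoint to connect with.
            return HostnameEndpoint(reactor, parsed_uri.host, parsed_uri.port)

    pool = HTTPConnectionPool(reactor)
    agent = Agent.usingEndpointFactory(reactor, FixedHostEndpointFactory(), pool=pool)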
     @defer.inlineCallbacks
     def request(self, method, uri, headers=None, bodyProducer=None):
         """
         Args:
             method (bytes): HTTP method: GET/POST/etc
             uri (bytes): Absolute URI to be retrieved
             headers (twisted.web.http_headers.Headers|None):
                 HTTP headers to send with the request, or None to
                 send no extra headers.
             bodyProducer (twisted.web.iweb.IBodyProducer|None):
                 An object which can generate bytes to make up the
                 body of this request (for example, the properly encoded contents of
                 a file for a file upload). Or None if the request is to have
                 no body.

         Returns:
             Deferred[twisted.web.iweb.IResponse]:
                 fires when the header of the response has been received (regardless of the
@@ -111,210 +112,207 @@ class MatrixFederationAgent(object):
                 response from being received (including problems that prevent the request
                 from being sent).
         """
-        parsed_uri = URI.fromBytes(uri, defaultPort=-1)
-        res = yield self._route_matrix_uri(parsed_uri)
+        # We use urlparse as that will set `port` to None if there is no
+        # explicit port.
+        parsed_uri = urllib.parse.urlparse(uri)

-        # set up the TLS connection params
+        # If this is a matrix:// URI check if the server has delegated matrix
+        # traffic using well-known delegation.
         #
-        # XXX disabling TLS is really only supported here for the benefit of the
-        # unit tests. We should make the UTs cope with TLS rather than having to make
-        # the code support the unit tests.
-        if self._tls_client_options_factory is None:
-            tls_options = None
-        else:
-            tls_options = self._tls_client_options_factory.get_options(
-                res.tls_server_name.decode("ascii")
+        # We have to do this here and not in the endpoint as we need to rewrite
+        # the host header with the delegated server name.
+        delegated_server = None
+        if (
+            parsed_uri.scheme == b"matrix"
+            and not _is_ip_literal(parsed_uri.hostname)
+            and not parsed_uri.port
+        ):
+            well_known_result = yield self._well_known_resolver.get_well_known(
+                parsed_uri.hostname
             )
+            delegated_server = well_known_result.delegated_server

-        # make sure that the Host header is set correctly
+        if delegated_server:
+            # Ok, the server has delegated matrix traffic to somewhere else, so
+            # lets rewrite the URL to replace the server with the delegated
+            # server name.
+            uri = urllib.parse.urlunparse(
+                (
+                    parsed_uri.scheme,
+                    delegated_server,
+                    parsed_uri.path,
+                    parsed_uri.params,
+                    parsed_uri.query,
+                    parsed_uri.fragment,
+                )
+            )
+            parsed_uri = urllib.parse.urlparse(uri)

+        # We need to make sure the host header is set to the netloc of the
+        # server.
         if headers is None:
             headers = Headers()
         else:
             headers = headers.copy()

         if not headers.hasHeader(b"host"):
-            headers.addRawHeader(b"host", res.host_header)
-
-        class EndpointFactory(object):
-            @staticmethod
-            def endpointForURI(_uri):
-                ep = LoggingHostnameEndpoint(
-                    self._reactor, res.target_host, res.target_port
-                )
-                if tls_options is not None:
-                    ep = wrapClientTLS(tls_options, ep)
-                return ep
+            headers.addRawHeader(b"host", parsed_uri.netloc)

-        agent = Agent.usingEndpointFactory(self._reactor, EndpointFactory(), self._pool)
         res = yield make_deferred_yieldable(
-            agent.request(method, uri, headers, bodyProducer)
+            self._agent.request(method, uri, headers, bodyProducer)
         )
+
         return res
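The delegation rewrite in the new request body can be reproduced standalone with only the stdlib; here delegated_server is hardcoded where Synapse would consult .well-known:

    # Standalone sketch of the netloc rewrite above. The delegated server is
    # hardcoded here; in the real code it comes from a .well-known lookup.
    import urllib.parse

    uri = b"matrix://example.com/_matrix/federation/v1/version"
    parsed = urllib.parse.urlparse(uri)

    delegated_server = b"synapse.example.com:443"  # hypothetical delegation
    uri = urllib.parse.urlunparse(
        (parsed.scheme, delegated_server, parsed.path,
         parsed.params, parsed.query, parsed.fragment)
    )
    parsed = urllib.parse.urlparse(uri)

    # The Host header must now carry the delegated name.
    print(parsed.netloc)  # b'synapse.example.com:443'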
-    @defer.inlineCallbacks
-    def _route_matrix_uri(self, parsed_uri, lookup_well_known=True):
-        """Helper for `request`: determine the routing for a Matrix URI
-
-        Args:
-            parsed_uri (twisted.web.client.URI): uri to route. Note that it should be
-                parsed with URI.fromBytes(uri, defaultPort=-1) to set the `port` to -1
-                if there is no explicit port given.
-
-            lookup_well_known (bool): True if we should look up the .well-known file if
-                there is no SRV record.
-
-        Returns:
-            Deferred[_RoutingResult]
-        """
-        # check for an IP literal
-        try:
-            ip_address = IPAddress(parsed_uri.host.decode("ascii"))
-        except Exception:
-            # not an IP address
-            ip_address = None
-
-        if ip_address:
-            port = parsed_uri.port
-            if port == -1:
-                port = 8448
-            return _RoutingResult(
-                host_header=parsed_uri.netloc,
-                tls_server_name=parsed_uri.host,
-                target_host=parsed_uri.host,
-                target_port=port,
-            )
-
-        if parsed_uri.port != -1:
-            # there is an explicit port
-            return _RoutingResult(
-                host_header=parsed_uri.netloc,
-                tls_server_name=parsed_uri.host,
-                target_host=parsed_uri.host,
-                target_port=parsed_uri.port,
-            )
-
-        if lookup_well_known:
-            # try a .well-known lookup
-            well_known_result = yield self._well_known_resolver.get_well_known(
-                parsed_uri.host
-            )
-            well_known_server = well_known_result.delegated_server
-
-            if well_known_server:
-                # if we found a .well-known, start again, but don't do another
-                # .well-known lookup.
-
-                # parse the server name in the .well-known response into host/port.
-                # (This code is lifted from twisted.web.client.URI.fromBytes).
-                if b":" in well_known_server:
-                    well_known_host, well_known_port = well_known_server.rsplit(b":", 1)
-                    try:
-                        well_known_port = int(well_known_port)
-                    except ValueError:
-                        # the part after the colon could not be parsed as an int
-                        # - we assume it is an IPv6 literal with no port (the closing
-                        # ']' stops it being parsed as an int)
-                        well_known_host, well_known_port = well_known_server, -1
-                else:
-                    well_known_host, well_known_port = well_known_server, -1
-
-                new_uri = URI(
-                    scheme=parsed_uri.scheme,
-                    netloc=well_known_server,
-                    host=well_known_host,
-                    port=well_known_port,
-                    path=parsed_uri.path,
-                    params=parsed_uri.params,
-                    query=parsed_uri.query,
-                    fragment=parsed_uri.fragment,
-                )
-
-                res = yield self._route_matrix_uri(new_uri, lookup_well_known=False)
-                return res
-
-        # try a SRV lookup
-        service_name = b"_matrix._tcp.%s" % (parsed_uri.host,)
-        server_list = yield self._srv_resolver.resolve_service(service_name)
-
-        if not server_list:
-            target_host = parsed_uri.host
-            port = 8448
-            logger.debug(
-                "No SRV record for %s, using %s:%i",
-                parsed_uri.host.decode("ascii"),
-                target_host.decode("ascii"),
-                port,
-            )
-        else:
-            target_host, port = pick_server_from_list(server_list)
-            logger.debug(
-                "Picked %s:%i from SRV records for %s",
-                target_host.decode("ascii"),
-                port,
-                parsed_uri.host.decode("ascii"),
-            )
-
-        return _RoutingResult(
-            host_header=parsed_uri.netloc,
-            tls_server_name=parsed_uri.host,
-            target_host=target_host,
-            target_port=port,
-        )
+@implementer(IAgentEndpointFactory)
+class MatrixHostnameEndpointFactory(object):
+    """Factory for MatrixHostnameEndpoint for passing to an Agent.
+    """
+
+    def __init__(self, reactor, tls_client_options_factory, srv_resolver):
+        self._reactor = reactor
+        self._tls_client_options_factory = tls_client_options_factory
+
+        if srv_resolver is None:
+            srv_resolver = SrvResolver()
+
+        self._srv_resolver = srv_resolver
+
+    def endpointForURI(self, parsed_uri):
+        return MatrixHostnameEndpoint(
+            self._reactor,
+            self._tls_client_options_factory,
+            self._srv_resolver,
+            parsed_uri,
+        )
 @implementer(IStreamClientEndpoint)
-class LoggingHostnameEndpoint(object):
-    """A wrapper for HostnameEndpoint which logs when it connects"""
+class MatrixHostnameEndpoint(object):
+    """An endpoint that resolves matrix:// URLs using Matrix server name
+    resolution (i.e. via SRV). Does not check for well-known delegation.

-    def __init__(self, reactor, host, port, *args, **kwargs):
-        self.host = host
-        self.port = port
-        self.ep = HostnameEndpoint(reactor, host, port, *args, **kwargs)
+    Args:
+        reactor (IReactor)
+        tls_client_options_factory (ClientTLSOptionsFactory|None):
+            factory to use for fetching client tls options, or None to disable TLS.
+        srv_resolver (SrvResolver): The SRV resolver to use
+        parsed_uri (twisted.web.client.URI): The parsed URI that we're wanting
+            to connect to.
+    """
+
+    def __init__(self, reactor, tls_client_options_factory, srv_resolver, parsed_uri):
+        self._reactor = reactor
+        self._parsed_uri = parsed_uri
+
+        # set up the TLS connection params
+        #
+        # XXX disabling TLS is really only supported here for the benefit of the
+        # unit tests. We should make the UTs cope with TLS rather than having to make
+        # the code support the unit tests.
+        if tls_client_options_factory is None:
+            self._tls_options = None
+        else:
+            self._tls_options = tls_client_options_factory.get_options(
+                self._parsed_uri.host.decode("ascii")
+            )
+
+        self._srv_resolver = srv_resolver

     def connect(self, protocol_factory):
-        logger.info("Connecting to %s:%i", self.host.decode("ascii"), self.port)
-        return self.ep.connect(protocol_factory)
+        """Implements IStreamClientEndpoint interface
+        """
+
+        return run_in_background(self._do_connect, protocol_factory)
+    @defer.inlineCallbacks
+    def _do_connect(self, protocol_factory):
+        first_exception = None
+
+        server_list = yield self._resolve_server()
+
+        for server in server_list:
+            host = server.host
+            port = server.port
+
+            try:
+                logger.info("Connecting to %s:%i", host.decode("ascii"), port)
+                endpoint = HostnameEndpoint(self._reactor, host, port)
+                if self._tls_options:
+                    endpoint = wrapClientTLS(self._tls_options, endpoint)
+                result = yield make_deferred_yieldable(
+                    endpoint.connect(protocol_factory)
+                )
+
+                return result
+            except Exception as e:
+                logger.info(
+                    "Failed to connect to %s:%i: %s", host.decode("ascii"), port, e
+                )
+                if not first_exception:
+                    first_exception = e
+
+        # We return the first failure because that's probably the most interesting.
+        if first_exception:
+            raise first_exception
+
+        # This shouldn't happen as we should always have at least one host/port
+        # to try and if that doesn't work then we'll have an exception.
+        raise Exception("Failed to resolve server %r" % (self._parsed_uri.netloc,))
+    @defer.inlineCallbacks
+    def _resolve_server(self):
+        """Resolves the server name to a list of hosts and ports to attempt to
+        connect to.
+
+        Returns:
+            Deferred[list[Server]]
+        """
+
+        if self._parsed_uri.scheme != b"matrix":
+            return [Server(host=self._parsed_uri.host, port=self._parsed_uri.port)]
+
+        # Note: We don't do well-known lookup as that needs to have happened
+        # before now, due to needing to rewrite the Host header of the HTTP
+        # request.
+
+        # We reparse the URI so that defaultPort is -1 rather than 80
+        parsed_uri = urllib.parse.urlparse(self._parsed_uri.toBytes())
+
+        host = parsed_uri.hostname
+        port = parsed_uri.port
+
+        # If there is an explicit port or the host is an IP address we bypass
+        # SRV lookups and just use the given host/port.
+        if port or _is_ip_literal(host):
+            return [Server(host, port or 8448)]
+
+        server_list = yield self._srv_resolver.resolve_service(b"_matrix._tcp." + host)
+
+        if server_list:
+            return server_list
+
+        # No SRV records, so we fallback to host and 8448
+        return [Server(host, 8448)]
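The resolution order in _resolve_server (explicit port or IP literal first, then SRV, then the default port 8448) can be sketched on its own; lookup_srv below is a hypothetical stand-in for an SRV query, not a Synapse API:

    # Hypothetical standalone sketch of the resolution order above.
    def lookup_srv(service_name):
        return []  # stand-in: pretend there are no SRV records

    def resolve_matrix_server(hostname, port=None, is_ip=False):
        # 1. An explicit port or an IP literal bypasses SRV entirely.
        if port or is_ip:
            return [(hostname, port or 8448)]
        # 2. Otherwise consult the _matrix._tcp SRV record.
        servers = lookup_srv(b"_matrix._tcp." + hostname)
        if servers:
            return servers
        # 3. No SRV records: fall back to the hostname on port 8448.
        return [(hostname, 8448)]

    print(resolve_matrix_server(b"example.com"))  # [(b'example.com', 8448)]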
-@attr.s
-class _RoutingResult(object):
-    """The result returned by `_route_matrix_uri`.
+def _is_ip_literal(host):
+    """Test if the given host name is either an IPv4 or IPv6 literal.

-    Contains the parameters needed to direct a federation connection to a particular
-    server.
+    Args:
+        host (bytes)

-    Where a SRV record points to several servers, this object contains a single server
-    chosen from the list.
+    Returns:
+        bool
     """

-    host_header = attr.ib()
-    """
-    The value we should assign to the Host header (host:port from the matrix
-    URI, or .well-known).
-
-    :type: bytes
-    """
-
-    tls_server_name = attr.ib()
-    """
-    The server name we should set in the SNI (typically host, without port, from the
-    matrix URI or .well-known)
-
-    :type: bytes
-    """
-
-    target_host = attr.ib()
-    """
-    The hostname (or IP literal) we should route the TCP connection to (the target of the
-    SRV record, or the hostname from the URL/.well-known)
-
-    :type: bytes
-    """
-
-    target_port = attr.ib()
-    """
-    The port we should route the TCP connection to (the target of the SRV record, or
-    the port from the URL/.well-known, or 8448)
-
-    :type: int
-    """
+    host = host.decode("ascii")
+
+    try:
+        IPAddress(host)
+        return True
+    except AddrFormatError:
+        return False
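The new _is_ip_literal helper can be sanity-checked directly against netaddr (assuming it is installed):

    # Quick sanity check of the netaddr-based test above.
    from netaddr import AddrFormatError, IPAddress

    def is_ip_literal(host):
        try:
            IPAddress(host)
            return True
        except AddrFormatError:
            return False

    assert is_ip_literal("10.0.0.1")
    assert is_ip_literal("::1")
    assert not is_ip_literal("example.com")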

Some files were not shown because too many files have changed in this diff.