Compare commits
347 Commits
neilj/upda
...
v0.33.5
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e3aa2c0b75 | ||
|
|
e302f40e20 | ||
|
|
1e70f1dbab | ||
|
|
b56ef14629 | ||
|
|
fe88907d04 | ||
|
|
c6363f7269 | ||
|
|
2a8996b67d | ||
|
|
e7b3b4d8c2 | ||
|
|
9d13ff4da8 | ||
|
|
c8642720c9 | ||
|
|
24efb2a70d | ||
|
|
c30cfff572 | ||
|
|
335b23a078 | ||
|
|
fcfe7a850d | ||
|
|
024be6cf18 | ||
|
|
3e6e94fe9f | ||
|
|
8b3652831c | ||
|
|
bc9af88a2d | ||
|
|
90f8e606e2 | ||
|
|
d0f6c1ce21 | ||
|
|
9e2f9a7b57 | ||
|
|
941ac0f085 | ||
|
|
f6e82dcddb | ||
|
|
0a81038ea0 | ||
|
|
ad9198cc34 | ||
|
|
984db8bb08 | ||
|
|
c971aa7b9d | ||
|
|
8f08d848f5 | ||
|
|
f1a7264663 | ||
|
|
7c33ab76da | ||
|
|
63755fa4c2 | ||
|
|
73884ebac5 | ||
|
|
7c27c4d51c | ||
|
|
1c3f4d9ca5 | ||
|
|
6c0f8d9d50 | ||
|
|
ed5331a627 | ||
|
|
cb64fe2cb7 | ||
|
|
13193a6e2b | ||
|
|
3126b88d35 | ||
|
|
89a76d1889 | ||
|
|
bfa0b759e0 | ||
|
|
9cbd0094f0 | ||
|
|
9dbe38ea7d | ||
|
|
93139a1fb8 | ||
|
|
e7cd7cb0f0 | ||
|
|
c857f5ef9b | ||
|
|
b7d2fb5eb9 | ||
|
|
f30a303590 | ||
|
|
2ac1abbc7e | ||
|
|
fa0d464fa4 | ||
|
|
0403cf0783 | ||
|
|
0e200e366d | ||
|
|
11bfc2af1c | ||
|
|
3db016b641 | ||
|
|
8decd6233d | ||
|
|
8c5b84441b | ||
|
|
54f8616d2c | ||
|
|
65cd8ccc79 | ||
|
|
7ca097f77e | ||
|
|
5cea4e16c7 | ||
|
|
0ddf486724 | ||
|
|
546aee7e52 | ||
|
|
33716c4aea | ||
|
|
bc635026c5 | ||
|
|
02aa41809b | ||
|
|
8fd93b5eea | ||
|
|
4073f73edc | ||
|
|
649c647955 | ||
|
|
4084a774a8 | ||
|
|
b041115415 | ||
|
|
9a68778ac2 | ||
|
|
9e05c8d309 | ||
|
|
037a06e8f0 | ||
|
|
af10fa6536 | ||
|
|
e957428a15 | ||
|
|
e586916cda | ||
|
|
3572a206d3 | ||
|
|
7bc22539ff | ||
|
|
b7e7712f07 | ||
|
|
1e4c7fff5f | ||
|
|
9a5ea511b5 | ||
|
|
e33a538af3 | ||
|
|
edda9f5cac | ||
|
|
b8ad756bd0 | ||
|
|
771d213ac5 | ||
|
|
b60749a1ec | ||
|
|
6febd8e8f7 | ||
|
|
cd7ef43872 | ||
|
|
806964b5de | ||
|
|
52ec6e9dfa | ||
|
|
c5440b2ca0 | ||
|
|
84a750e0c3 | ||
|
|
7298efd361 | ||
|
|
f60c9e2a01 | ||
|
|
7baf66ef5d | ||
|
|
654324eded | ||
|
|
1d371fc5b3 | ||
|
|
b07a2cbee9 | ||
|
|
70fd75cd1d | ||
|
|
5d848992bf | ||
|
|
3b4223aa23 | ||
|
|
28f5bfdcf7 | ||
|
|
ee7c8bd2b5 | ||
|
|
6707a3212c | ||
|
|
135f3b4390 | ||
|
|
2608ebc04c | ||
|
|
599f65bb89 | ||
|
|
417e7077aa | ||
|
|
d64b24dfe6 | ||
|
|
4f8baab0c4 | ||
|
|
b3c2ebba32 | ||
|
|
625542878d | ||
|
|
2fd17b5ad1 | ||
|
|
10587f7f32 | ||
|
|
80189ed27c | ||
|
|
0cd7b209e2 | ||
|
|
78d1042c10 | ||
|
|
af3125226d | ||
|
|
9c8cd855da | ||
|
|
92657be7d0 | ||
|
|
61b05727fa | ||
|
|
dfba1d843d | ||
|
|
2254790ae4 | ||
|
|
7419764351 | ||
|
|
2d2828dcbc | ||
|
|
c127c8d042 | ||
|
|
804dd41e18 | ||
|
|
c91bd295f5 | ||
|
|
87c18d12ee | ||
|
|
a6cf7d9d9a | ||
|
|
7e9ced4178 | ||
|
|
70fc599ede | ||
|
|
bae37cd811 | ||
|
|
c42f7fd7b9 | ||
|
|
3e242dc149 | ||
|
|
87b111f96a | ||
|
|
b13836da7f | ||
|
|
77055dba92 | ||
|
|
567363e497 | ||
|
|
06ee2b7cc5 | ||
|
|
30cf40ff30 | ||
|
|
905f8de673 | ||
|
|
4fc4b881c5 | ||
|
|
81942c109d | ||
|
|
a395f1ddb3 | ||
|
|
99178f8602 | ||
|
|
301cb60d0b | ||
|
|
0b01281e77 | ||
|
|
e8e540630e | ||
|
|
a796bdd35e | ||
|
|
09f3cf1a7e | ||
|
|
3d6aa06577 | ||
|
|
ea068d6f3c | ||
|
|
14e4d4f4bf | ||
|
|
475253a88e | ||
|
|
7f0399586d | ||
|
|
8c0c51ecb3 | ||
|
|
79a8a347a6 | ||
|
|
82276a18d1 | ||
|
|
b1580f50fe | ||
|
|
414fa36f3e | ||
|
|
32eb1dedd2 | ||
|
|
71990b3cae | ||
|
|
0b07f02e19 | ||
|
|
9fbaed325f | ||
|
|
9db2476991 | ||
|
|
69f7f1418f | ||
|
|
48b04c4a4c | ||
|
|
08abe8e13c | ||
|
|
05077e06fa | ||
|
|
01a5a8b9e3 | ||
|
|
897d976c1e | ||
|
|
45fc0ead10 | ||
|
|
cdd24449ee | ||
|
|
14d49c51db | ||
|
|
84b4e76fed | ||
|
|
c780d84d66 | ||
|
|
1d67b13674 | ||
|
|
92d50e3c2a | ||
|
|
e94cdbaecf | ||
|
|
9b1bc593c5 | ||
|
|
7f147d623b | ||
|
|
15e8dd2ccc | ||
|
|
05fe8a6c1c | ||
|
|
f584d6108f | ||
|
|
28d7b546cb | ||
|
|
f2cbbda956 | ||
|
|
cd77270a66 | ||
|
|
242a0483eb | ||
|
|
60cf1c6e16 | ||
|
|
7e6e588e60 | ||
|
|
1ca2744621 | ||
|
|
dddb5aa7bb | ||
|
|
4eb8408ed2 | ||
|
|
c5842dff1a | ||
|
|
7a0da69eee | ||
|
|
a5806aba27 | ||
|
|
fd2dbf1836 | ||
|
|
9643a6f7f2 | ||
|
|
5c261107c9 | ||
|
|
c7181dcc6c | ||
|
|
74854a9719 | ||
|
|
48fec67536 | ||
|
|
3504982cb7 | ||
|
|
4e5a4549b6 | ||
|
|
9b7d9d8ba0 | ||
|
|
db10f553ba | ||
|
|
764030cf63 | ||
|
|
8432e2ebd7 | ||
|
|
a81f140880 | ||
|
|
47b25ba5f3 | ||
|
|
3bf8bab8f9 | ||
|
|
a4cf660a32 | ||
|
|
0d568ff403 | ||
|
|
d7585a4c83 | ||
|
|
365f588d98 | ||
|
|
eb7be75a10 | ||
|
|
dd0ac1614c | ||
|
|
bb81e78ec6 | ||
|
|
afb4b490a4 | ||
|
|
f7bf181a90 | ||
|
|
f7baff6f7b | ||
|
|
a52f276990 | ||
|
|
46c832eaac | ||
|
|
2b1a4b2596 | ||
|
|
cd6937fb26 | ||
|
|
c2c153dd3b | ||
|
|
79d3b4689e | ||
|
|
808d8e06aa | ||
|
|
3f6762f0bb | ||
|
|
3b5b64ac99 | ||
|
|
23d7e63a4a | ||
|
|
8bd585b09b | ||
|
|
012d612f9d | ||
|
|
f89f6b7c09 | ||
|
|
be6527325a | ||
|
|
55e6bdf287 | ||
|
|
e2c0aa2c26 | ||
|
|
b01a755498 | ||
|
|
1058d14127 | ||
|
|
80bf7d3580 | ||
|
|
9a2f960736 | ||
|
|
324525f40c | ||
|
|
4d664278af | ||
|
|
8dee601054 | ||
|
|
e21c368b8b | ||
|
|
cf6f9a8b53 | ||
|
|
48a910e128 | ||
|
|
f2a48d87df | ||
|
|
2aa7cc6a46 | ||
|
|
e07970165f | ||
|
|
c5171bf171 | ||
|
|
ba1fbf7d5b | ||
|
|
3cef867cc1 | ||
|
|
c144252a8c | ||
|
|
c334ca67bb | ||
|
|
04f5d2db62 | ||
|
|
ab822a2d1f | ||
|
|
91cdb6de08 | ||
|
|
d49b77404b | ||
|
|
63260397c6 | ||
|
|
3f8709ffe4 | ||
|
|
3ee57bdcbb | ||
|
|
4c22b4047b | ||
|
|
782689bd40 | ||
|
|
ca87ad1def | ||
|
|
b5f638f1f4 | ||
|
|
9fd161c6fb | ||
|
|
0195dfbf52 | ||
|
|
69c49d3fa3 | ||
|
|
a2d872e7b3 | ||
|
|
fa27073b14 | ||
|
|
38f708a2bb | ||
|
|
73737bd0f9 | ||
|
|
3b2dcfff78 | ||
|
|
521d369e7a | ||
|
|
b99a0f3941 | ||
|
|
a8ffc27db7 | ||
|
|
4c9da1440f | ||
|
|
d9efd87d55 | ||
|
|
0e8d78f6aa | ||
|
|
66f7dc8c87 | ||
|
|
7e51342196 | ||
|
|
bcfeb44afe | ||
|
|
372bf073c1 | ||
|
|
7edd11623d | ||
|
|
13ad9930c8 | ||
|
|
51b17ec566 | ||
|
|
3c1080b6e4 | ||
|
|
eff3ae3b9a | ||
|
|
b4d6db5c4a | ||
|
|
aae86a81ef | ||
|
|
7d5b1a60a3 | ||
|
|
bfb6c58624 | ||
|
|
a675f9c556 | ||
|
|
c151b32b1d | ||
|
|
762a758fea | ||
|
|
25d2b5d55f | ||
|
|
df1e4f259f | ||
|
|
c055c91655 | ||
|
|
3f543dc021 | ||
|
|
859989f958 | ||
|
|
8cfad2e686 | ||
|
|
81d727efa9 | ||
|
|
da7fe43899 | ||
|
|
e2e846628d | ||
|
|
2f78f432c4 | ||
|
|
fc5d937550 | ||
|
|
86a00e05e1 | ||
|
|
d82fa0e9a6 | ||
|
|
a87af25fbb | ||
|
|
eabc5f8271 | ||
|
|
c24fc9797b | ||
|
|
afcd655ab6 | ||
|
|
2de813a611 | ||
|
|
eaaa2248ff | ||
|
|
b586b8b986 | ||
|
|
06b331ff40 | ||
|
|
8f9a7eb58d | ||
|
|
ed4bc3d2fc | ||
|
|
bd92c8eaa7 | ||
|
|
9b5bf3d858 | ||
|
|
e25d87d97b | ||
|
|
e2c9fe0a6a | ||
|
|
9b75c78b4d | ||
|
|
63417c31e9 | ||
|
|
b6d55588e1 | ||
|
|
cd32c19a60 | ||
|
|
34f51babc5 | ||
|
|
8226e5597a | ||
|
|
c79c4c0a7d | ||
|
|
6c6aba76e1 | ||
|
|
01021c812f | ||
|
|
5075e444f4 | ||
|
|
3e19beb941 | ||
|
|
bb99b1f550 | ||
|
|
ce6db0e547 | ||
|
|
152c0aa58e | ||
|
|
119451dcd1 | ||
|
|
a6c813761a | ||
|
|
54a9bea88c | ||
|
|
484a0ebdfc | ||
|
|
f81f421086 | ||
|
|
cd9765805e | ||
|
|
495cb100d1 | ||
|
|
37be52ac34 | ||
|
|
76c80e3fdf |
@@ -9,6 +9,8 @@ jobs:
|
||||
- store_artifacts:
|
||||
path: ~/project/logs
|
||||
destination: logs
|
||||
- store_test_results:
|
||||
path: logs
|
||||
sytestpy2postgres:
|
||||
machine: true
|
||||
steps:
|
||||
@@ -18,15 +20,45 @@ jobs:
|
||||
- store_artifacts:
|
||||
path: ~/project/logs
|
||||
destination: logs
|
||||
- store_test_results:
|
||||
path: logs
|
||||
sytestpy2merged:
|
||||
machine: true
|
||||
steps:
|
||||
- checkout
|
||||
- run: bash .circleci/merge_base_branch.sh
|
||||
- run: docker pull matrixdotorg/sytest-synapsepy2
|
||||
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs matrixdotorg/sytest-synapsepy2
|
||||
- store_artifacts:
|
||||
path: ~/project/logs
|
||||
destination: logs
|
||||
- store_test_results:
|
||||
path: logs
|
||||
|
||||
sytestpy2postgresmerged:
|
||||
machine: true
|
||||
steps:
|
||||
- checkout
|
||||
- run: bash .circleci/merge_base_branch.sh
|
||||
- run: docker pull matrixdotorg/sytest-synapsepy2
|
||||
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs -e POSTGRES=1 matrixdotorg/sytest-synapsepy2
|
||||
- store_artifacts:
|
||||
path: ~/project/logs
|
||||
destination: logs
|
||||
- store_test_results:
|
||||
path: logs
|
||||
|
||||
sytestpy3:
|
||||
machine: true
|
||||
steps:
|
||||
- checkout
|
||||
- run: docker pull matrixdotorg/sytest-synapsepy3
|
||||
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs hawkowl/sytestpy3
|
||||
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs matrixdotorg/sytest-synapsepy3
|
||||
- store_artifacts:
|
||||
path: ~/project/logs
|
||||
destination: logs
|
||||
- store_test_results:
|
||||
path: logs
|
||||
sytestpy3postgres:
|
||||
machine: true
|
||||
steps:
|
||||
@@ -36,6 +68,32 @@ jobs:
|
||||
- store_artifacts:
|
||||
path: ~/project/logs
|
||||
destination: logs
|
||||
- store_test_results:
|
||||
path: logs
|
||||
sytestpy3merged:
|
||||
machine: true
|
||||
steps:
|
||||
- checkout
|
||||
- run: bash .circleci/merge_base_branch.sh
|
||||
- run: docker pull matrixdotorg/sytest-synapsepy3
|
||||
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs matrixdotorg/sytest-synapsepy3
|
||||
- store_artifacts:
|
||||
path: ~/project/logs
|
||||
destination: logs
|
||||
- store_test_results:
|
||||
path: logs
|
||||
sytestpy3postgresmerged:
|
||||
machine: true
|
||||
steps:
|
||||
- checkout
|
||||
- run: bash .circleci/merge_base_branch.sh
|
||||
- run: docker pull matrixdotorg/sytest-synapsepy3
|
||||
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs -e POSTGRES=1 matrixdotorg/sytest-synapsepy3
|
||||
- store_artifacts:
|
||||
path: ~/project/logs
|
||||
destination: logs
|
||||
- store_test_results:
|
||||
path: logs
|
||||
|
||||
workflows:
|
||||
version: 2
|
||||
@@ -43,6 +101,21 @@ workflows:
|
||||
jobs:
|
||||
- sytestpy2
|
||||
- sytestpy2postgres
|
||||
# Currently broken while the Python 3 port is incomplete
|
||||
# - sytestpy3
|
||||
# - sytestpy3postgres
|
||||
- sytestpy3
|
||||
- sytestpy3postgres
|
||||
- sytestpy2merged:
|
||||
filters:
|
||||
branches:
|
||||
ignore: /develop|master/
|
||||
- sytestpy2postgresmerged:
|
||||
filters:
|
||||
branches:
|
||||
ignore: /develop|master/
|
||||
- sytestpy3merged:
|
||||
filters:
|
||||
branches:
|
||||
ignore: /develop|master/
|
||||
- sytestpy3postgresmerged:
|
||||
filters:
|
||||
branches:
|
||||
ignore: /develop|master/
|
||||
|
||||
31
.circleci/merge_base_branch.sh
Executable file
31
.circleci/merge_base_branch.sh
Executable file
@@ -0,0 +1,31 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
set -e
|
||||
|
||||
# CircleCI doesn't give CIRCLE_PR_NUMBER in the environment for non-forked PRs. Wonderful.
|
||||
# In this case, we just need to do some ~shell magic~ to strip it out of the PULL_REQUEST URL.
|
||||
echo 'export CIRCLE_PR_NUMBER="${CIRCLE_PR_NUMBER:-${CIRCLE_PULL_REQUEST##*/}}"' >> $BASH_ENV
|
||||
source $BASH_ENV
|
||||
|
||||
if [[ -z "${CIRCLE_PR_NUMBER}" ]]
|
||||
then
|
||||
echo "Can't figure out what the PR number is!"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Get the reference, using the GitHub API
|
||||
GITBASE=`curl -q https://api.github.com/repos/matrix-org/synapse/pulls/${CIRCLE_PR_NUMBER} | jq -r '.base.ref'`
|
||||
|
||||
# Show what we are before
|
||||
git show -s
|
||||
|
||||
# Set up username so it can do a merge
|
||||
git config --global user.email bot@matrix.org
|
||||
git config --global user.name "A robot"
|
||||
|
||||
# Fetch and merge. If it doesn't work, it will raise due to set -e.
|
||||
git fetch -u origin $GITBASE
|
||||
git merge --no-edit origin/$GITBASE
|
||||
|
||||
# Show what we are after.
|
||||
git show -s
|
||||
@@ -3,6 +3,5 @@ Dockerfile
|
||||
.gitignore
|
||||
demo/etc
|
||||
tox.ini
|
||||
synctl
|
||||
.git/*
|
||||
.tox/*
|
||||
|
||||
1
.gitignore
vendored
1
.gitignore
vendored
@@ -44,6 +44,7 @@ media_store/
|
||||
build/
|
||||
venv/
|
||||
venv*/
|
||||
*venv/
|
||||
|
||||
localhost-800*/
|
||||
static/client/register/register_config.js
|
||||
|
||||
12
.travis.yml
12
.travis.yml
@@ -8,9 +8,6 @@ before_script:
|
||||
- git remote set-branches --add origin develop
|
||||
- git fetch origin develop
|
||||
|
||||
services:
|
||||
- postgresql
|
||||
|
||||
matrix:
|
||||
fast_finish: true
|
||||
include:
|
||||
@@ -25,6 +22,11 @@ matrix:
|
||||
|
||||
- python: 2.7
|
||||
env: TOX_ENV=py27-postgres TRIAL_FLAGS="-j 4"
|
||||
services:
|
||||
- postgresql
|
||||
|
||||
- python: 3.5
|
||||
env: TOX_ENV=py35
|
||||
|
||||
- python: 3.6
|
||||
env: TOX_ENV=py36
|
||||
@@ -35,10 +37,6 @@ matrix:
|
||||
- python: 3.6
|
||||
env: TOX_ENV=check-newsfragment
|
||||
|
||||
allow_failures:
|
||||
- python: 2.7
|
||||
env: TOX_ENV=py27-postgres TRIAL_FLAGS="-j 4"
|
||||
|
||||
install:
|
||||
- pip install tox
|
||||
|
||||
|
||||
222
CHANGES.md
222
CHANGES.md
@@ -1,3 +1,223 @@
|
||||
Synapse 0.33.5 (2018-09-24)
|
||||
===========================
|
||||
|
||||
No significant changes.
|
||||
|
||||
|
||||
Synapse 0.33.5rc1 (2018-09-17)
|
||||
==============================
|
||||
|
||||
Features
|
||||
--------
|
||||
|
||||
- Python 3.5 and 3.6 support is now in beta. ([\#3576](https://github.com/matrix-org/synapse/issues/3576))
|
||||
- Implement `event_format` filter param in `/sync` ([\#3790](https://github.com/matrix-org/synapse/issues/3790))
|
||||
- Add synapse_admin_mau:registered_reserved_users metric to expose number of real reaserved users ([\#3846](https://github.com/matrix-org/synapse/issues/3846))
|
||||
|
||||
|
||||
Bugfixes
|
||||
--------
|
||||
|
||||
- Remove connection ID for replication prometheus metrics, as it creates a large number of new series. ([\#3788](https://github.com/matrix-org/synapse/issues/3788))
|
||||
- guest users should not be part of mau total ([\#3800](https://github.com/matrix-org/synapse/issues/3800))
|
||||
- Bump dependency on pyopenssl 16.x, to avoid incompatibility with recent Twisted. ([\#3804](https://github.com/matrix-org/synapse/issues/3804))
|
||||
- Fix existing room tags not coming down sync when joining a room ([\#3810](https://github.com/matrix-org/synapse/issues/3810))
|
||||
- Fix jwt import check ([\#3824](https://github.com/matrix-org/synapse/issues/3824))
|
||||
- fix VOIP crashes under Python 3 (#3821) ([\#3835](https://github.com/matrix-org/synapse/issues/3835))
|
||||
- Fix manhole so that it works with latest openssh clients ([\#3841](https://github.com/matrix-org/synapse/issues/3841))
|
||||
- Fix outbound requests occasionally wedging, which can result in federation breaking between servers. ([\#3845](https://github.com/matrix-org/synapse/issues/3845))
|
||||
- Show heroes if room name/canonical alias has been deleted ([\#3851](https://github.com/matrix-org/synapse/issues/3851))
|
||||
- Fix handling of redacted events from federation ([\#3859](https://github.com/matrix-org/synapse/issues/3859))
|
||||
- ([\#3874](https://github.com/matrix-org/synapse/issues/3874))
|
||||
- Mitigate outbound federation randomly becoming wedged ([\#3875](https://github.com/matrix-org/synapse/issues/3875))
|
||||
|
||||
|
||||
Internal Changes
|
||||
----------------
|
||||
|
||||
- CircleCI tests now run on the potential merge of a PR. ([\#3704](https://github.com/matrix-org/synapse/issues/3704))
|
||||
- http/ is now ported to Python 3. ([\#3771](https://github.com/matrix-org/synapse/issues/3771))
|
||||
- Improve human readable error messages for threepid registration/account update ([\#3789](https://github.com/matrix-org/synapse/issues/3789))
|
||||
- Make /sync slightly faster by avoiding needless copies ([\#3795](https://github.com/matrix-org/synapse/issues/3795))
|
||||
- handlers/ is now ported to Python 3. ([\#3803](https://github.com/matrix-org/synapse/issues/3803))
|
||||
- Limit the number of PDUs/EDUs per federation transaction ([\#3805](https://github.com/matrix-org/synapse/issues/3805))
|
||||
- Only start postgres instance for postgres tests on Travis CI ([\#3806](https://github.com/matrix-org/synapse/issues/3806))
|
||||
- tests/ is now ported to Python 3. ([\#3808](https://github.com/matrix-org/synapse/issues/3808))
|
||||
- crypto/ is now ported to Python 3. ([\#3822](https://github.com/matrix-org/synapse/issues/3822))
|
||||
- rest/ is now ported to Python 3. ([\#3823](https://github.com/matrix-org/synapse/issues/3823))
|
||||
- add some logging for the keyring queue ([\#3826](https://github.com/matrix-org/synapse/issues/3826))
|
||||
- speed up lazy loading by 2-3x ([\#3827](https://github.com/matrix-org/synapse/issues/3827))
|
||||
- Improved Dockerfile to remove build requirements after building reducing the image size. ([\#3834](https://github.com/matrix-org/synapse/issues/3834))
|
||||
- Disable lazy loading for incremental syncs for now ([\#3840](https://github.com/matrix-org/synapse/issues/3840))
|
||||
- federation/ is now ported to Python 3. ([\#3847](https://github.com/matrix-org/synapse/issues/3847))
|
||||
- Log when we retry outbound requests ([\#3853](https://github.com/matrix-org/synapse/issues/3853))
|
||||
- Removed some excess logging messages. ([\#3855](https://github.com/matrix-org/synapse/issues/3855))
|
||||
- Speed up purge history for rooms that have been previously purged ([\#3856](https://github.com/matrix-org/synapse/issues/3856))
|
||||
- Refactor some HTTP timeout code. ([\#3857](https://github.com/matrix-org/synapse/issues/3857))
|
||||
- Fix running merged builds on CircleCI ([\#3858](https://github.com/matrix-org/synapse/issues/3858))
|
||||
- Fix typo in replication stream exception. ([\#3860](https://github.com/matrix-org/synapse/issues/3860))
|
||||
- Add in flight real time metrics for Measure blocks ([\#3871](https://github.com/matrix-org/synapse/issues/3871))
|
||||
- Disable buffering and automatic retrying in treq requests to prevent timeouts. ([\#3872](https://github.com/matrix-org/synapse/issues/3872))
|
||||
- mention jemalloc in the README ([\#3877](https://github.com/matrix-org/synapse/issues/3877))
|
||||
- Remove unmaintained "nuke-room-from-db.sh" script ([\#3888](https://github.com/matrix-org/synapse/issues/3888))
|
||||
|
||||
|
||||
Synapse 0.33.4 (2018-09-07)
|
||||
===========================
|
||||
|
||||
Internal Changes
|
||||
----------------
|
||||
|
||||
- Unignore synctl in .dockerignore to fix docker builds ([\#3802](https://github.com/matrix-org/synapse/issues/3802))
|
||||
|
||||
|
||||
Synapse 0.33.4rc2 (2018-09-06)
|
||||
==============================
|
||||
|
||||
Pull in security fixes from v0.33.3.1
|
||||
|
||||
|
||||
Synapse 0.33.3.1 (2018-09-06)
|
||||
=============================
|
||||
|
||||
SECURITY FIXES
|
||||
--------------
|
||||
|
||||
- Fix an issue where event signatures were not always correctly validated ([\#3796](https://github.com/matrix-org/synapse/issues/3796))
|
||||
- Fix an issue where server_acls could be circumvented for incoming events ([\#3796](https://github.com/matrix-org/synapse/issues/3796))
|
||||
|
||||
|
||||
Internal Changes
|
||||
----------------
|
||||
|
||||
- Unignore synctl in .dockerignore to fix docker builds ([\#3802](https://github.com/matrix-org/synapse/issues/3802))
|
||||
|
||||
|
||||
Synapse 0.33.4rc1 (2018-09-04)
|
||||
==============================
|
||||
|
||||
Features
|
||||
--------
|
||||
|
||||
- Support profile API endpoints on workers ([\#3659](https://github.com/matrix-org/synapse/issues/3659))
|
||||
- Server notices for resource limit blocking ([\#3680](https://github.com/matrix-org/synapse/issues/3680))
|
||||
- Allow guests to use /rooms/:roomId/event/:eventId ([\#3724](https://github.com/matrix-org/synapse/issues/3724))
|
||||
- Add mau_trial_days config param, so that users only get counted as MAU after N days. ([\#3749](https://github.com/matrix-org/synapse/issues/3749))
|
||||
- Require twisted 17.1 or later (fixes [#3741](https://github.com/matrix-org/synapse/issues/3741)). ([\#3751](https://github.com/matrix-org/synapse/issues/3751))
|
||||
|
||||
|
||||
Bugfixes
|
||||
--------
|
||||
|
||||
- Fix error collecting prometheus metrics when run on dedicated thread due to threading concurrency issues ([\#3722](https://github.com/matrix-org/synapse/issues/3722))
|
||||
- Fix bug where we resent "limit exceeded" server notices repeatedly ([\#3747](https://github.com/matrix-org/synapse/issues/3747))
|
||||
- Fix bug where we broke sync when using limit_usage_by_mau but hadn't configured server notices ([\#3753](https://github.com/matrix-org/synapse/issues/3753))
|
||||
- Fix 'federation_domain_whitelist' such that an empty list correctly blocks all outbound federation traffic ([\#3754](https://github.com/matrix-org/synapse/issues/3754))
|
||||
- Fix tagging of server notice rooms ([\#3755](https://github.com/matrix-org/synapse/issues/3755), [\#3756](https://github.com/matrix-org/synapse/issues/3756))
|
||||
- Fix 'admin_uri' config variable and error parameter to be 'admin_contact' to match the spec. ([\#3758](https://github.com/matrix-org/synapse/issues/3758))
|
||||
- Don't return non-LL-member state in incremental sync state blocks ([\#3760](https://github.com/matrix-org/synapse/issues/3760))
|
||||
- Fix bug in sending presence over federation ([\#3768](https://github.com/matrix-org/synapse/issues/3768))
|
||||
- Fix bug where preserved threepid user comes to sign up and server is mau blocked ([\#3777](https://github.com/matrix-org/synapse/issues/3777))
|
||||
|
||||
Internal Changes
|
||||
----------------
|
||||
|
||||
- Removed the link to the unmaintained matrix-synapse-auto-deploy project from the readme. ([\#3378](https://github.com/matrix-org/synapse/issues/3378))
|
||||
- Refactor state module to support multiple room versions ([\#3673](https://github.com/matrix-org/synapse/issues/3673))
|
||||
- The synapse.storage module has been ported to Python 3. ([\#3725](https://github.com/matrix-org/synapse/issues/3725))
|
||||
- Split the state_group_cache into member and non-member state events (and so speed up LL /sync) ([\#3726](https://github.com/matrix-org/synapse/issues/3726))
|
||||
- Log failure to authenticate remote servers as warnings (without stack traces) ([\#3727](https://github.com/matrix-org/synapse/issues/3727))
|
||||
- The CONTRIBUTING guidelines have been updated to mention our use of Markdown and that .misc files have content. ([\#3730](https://github.com/matrix-org/synapse/issues/3730))
|
||||
- Reference the need for an HTTP replication port when using the federation_reader worker ([\#3734](https://github.com/matrix-org/synapse/issues/3734))
|
||||
- Fix minor spelling error in federation client documentation. ([\#3735](https://github.com/matrix-org/synapse/issues/3735))
|
||||
- Remove redundant state resolution function ([\#3737](https://github.com/matrix-org/synapse/issues/3737))
|
||||
- The test suite now passes on PostgreSQL. ([\#3740](https://github.com/matrix-org/synapse/issues/3740))
|
||||
- Fix MAU cache invalidation due to missing yield ([\#3746](https://github.com/matrix-org/synapse/issues/3746))
|
||||
- Make sure that we close db connections opened during init ([\#3764](https://github.com/matrix-org/synapse/issues/3764))
|
||||
|
||||
|
||||
Synapse 0.33.3 (2018-08-22)
|
||||
===========================
|
||||
|
||||
Bugfixes
|
||||
--------
|
||||
|
||||
- Fix bug introduced in v0.33.3rc1 which made the ToS give a 500 error ([\#3732](https://github.com/matrix-org/synapse/issues/3732))
|
||||
|
||||
|
||||
Synapse 0.33.3rc2 (2018-08-21)
|
||||
==============================
|
||||
|
||||
Bugfixes
|
||||
--------
|
||||
|
||||
- Fix bug in v0.33.3rc1 which caused infinite loops and OOMs ([\#3723](https://github.com/matrix-org/synapse/issues/3723))
|
||||
|
||||
|
||||
Synapse 0.33.3rc1 (2018-08-21)
|
||||
==============================
|
||||
|
||||
Features
|
||||
--------
|
||||
|
||||
- Add support for the SNI extension to federation TLS connections. Thanks to @vojeroen! ([\#3439](https://github.com/matrix-org/synapse/issues/3439))
|
||||
- Add /_media/r0/config ([\#3184](https://github.com/matrix-org/synapse/issues/3184))
|
||||
- speed up /members API and add `at` and `membership` params as per MSC1227 ([\#3568](https://github.com/matrix-org/synapse/issues/3568))
|
||||
- implement `summary` block in /sync response as per MSC688 ([\#3574](https://github.com/matrix-org/synapse/issues/3574))
|
||||
- Add lazy-loading support to /messages as per MSC1227 ([\#3589](https://github.com/matrix-org/synapse/issues/3589))
|
||||
- Add ability to limit number of monthly active users on the server ([\#3633](https://github.com/matrix-org/synapse/issues/3633))
|
||||
- Support more federation endpoints on workers ([\#3653](https://github.com/matrix-org/synapse/issues/3653))
|
||||
- Basic support for room versioning ([\#3654](https://github.com/matrix-org/synapse/issues/3654))
|
||||
- Ability to disable client/server Synapse via conf toggle ([\#3655](https://github.com/matrix-org/synapse/issues/3655))
|
||||
- Ability to whitelist specific threepids against monthly active user limiting ([\#3662](https://github.com/matrix-org/synapse/issues/3662))
|
||||
- Add some metrics for the appservice and federation event sending loops ([\#3664](https://github.com/matrix-org/synapse/issues/3664))
|
||||
- Where server is disabled, block ability for locked out users to read new messages ([\#3670](https://github.com/matrix-org/synapse/issues/3670))
|
||||
- set admin uri via config, to be used in error messages where the user should contact the administrator ([\#3687](https://github.com/matrix-org/synapse/issues/3687))
|
||||
- Synapse's presence functionality can now be disabled with the "use_presence" configuration option. ([\#3694](https://github.com/matrix-org/synapse/issues/3694))
|
||||
- For resource limit blocked users, prevent writing into rooms ([\#3708](https://github.com/matrix-org/synapse/issues/3708))
|
||||
|
||||
|
||||
Bugfixes
|
||||
--------
|
||||
|
||||
- Fix occasional glitches in the synapse_event_persisted_position metric ([\#3658](https://github.com/matrix-org/synapse/issues/3658))
|
||||
- Fix bug on deleting 3pid when using identity servers that don't support unbind API ([\#3661](https://github.com/matrix-org/synapse/issues/3661))
|
||||
- Make the tests pass on Twisted < 18.7.0 ([\#3676](https://github.com/matrix-org/synapse/issues/3676))
|
||||
- Don’t ship recaptcha_ajax.js, use it directly from Google ([\#3677](https://github.com/matrix-org/synapse/issues/3677))
|
||||
- Fixes test_reap_monthly_active_users so it passes under postgres ([\#3681](https://github.com/matrix-org/synapse/issues/3681))
|
||||
- Fix mau blocking calulation bug on login ([\#3689](https://github.com/matrix-org/synapse/issues/3689))
|
||||
- Fix missing yield in synapse.storage.monthly_active_users.initialise_reserved_users ([\#3692](https://github.com/matrix-org/synapse/issues/3692))
|
||||
- Improve HTTP request logging to include all requests ([\#3700](https://github.com/matrix-org/synapse/issues/3700))
|
||||
- Avoid timing out requests while we are streaming back the response ([\#3701](https://github.com/matrix-org/synapse/issues/3701))
|
||||
- Support more federation endpoints on workers ([\#3705](https://github.com/matrix-org/synapse/issues/3705), [\#3713](https://github.com/matrix-org/synapse/issues/3713))
|
||||
- Fix "Starting db txn 'get_all_updated_receipts' from sentinel context" warning ([\#3710](https://github.com/matrix-org/synapse/issues/3710))
|
||||
- Fix bug where `state_cache` cache factor ignored environment variables ([\#3719](https://github.com/matrix-org/synapse/issues/3719))
|
||||
|
||||
|
||||
Deprecations and Removals
|
||||
-------------------------
|
||||
|
||||
- The Shared-Secret registration method of the legacy v1/register REST endpoint has been removed. For a replacement, please see [the admin/register API documentation](https://github.com/matrix-org/synapse/blob/master/docs/admin_api/register_api.rst). ([\#3703](https://github.com/matrix-org/synapse/issues/3703))
|
||||
|
||||
|
||||
Internal Changes
|
||||
----------------
|
||||
|
||||
- The test suite now can run under PostgreSQL. ([\#3423](https://github.com/matrix-org/synapse/issues/3423))
|
||||
- Refactor HTTP replication endpoints to reduce code duplication ([\#3632](https://github.com/matrix-org/synapse/issues/3632))
|
||||
- Tests now correctly execute on Python 3. ([\#3647](https://github.com/matrix-org/synapse/issues/3647))
|
||||
- Sytests can now be run inside a Docker container. ([\#3660](https://github.com/matrix-org/synapse/issues/3660))
|
||||
- Port over enough to Python 3 to allow the sytests to start. ([\#3668](https://github.com/matrix-org/synapse/issues/3668))
|
||||
- Update docker base image from alpine 3.7 to 3.8. ([\#3669](https://github.com/matrix-org/synapse/issues/3669))
|
||||
- Rename synapse.util.async to synapse.util.async_helpers to mitigate async becoming a keyword on Python 3.7. ([\#3678](https://github.com/matrix-org/synapse/issues/3678))
|
||||
- Synapse's tests are now formatted with the black autoformatter. ([\#3679](https://github.com/matrix-org/synapse/issues/3679))
|
||||
- Implemented a new testing base class to reduce test boilerplate. ([\#3684](https://github.com/matrix-org/synapse/issues/3684))
|
||||
- Rename MAU prometheus metrics ([\#3690](https://github.com/matrix-org/synapse/issues/3690))
|
||||
- add new error type ResourceLimit ([\#3707](https://github.com/matrix-org/synapse/issues/3707))
|
||||
- Logcontexts for replication command handlers ([\#3709](https://github.com/matrix-org/synapse/issues/3709))
|
||||
- Update admin register API documentation to reference a real user ID. ([\#3712](https://github.com/matrix-org/synapse/issues/3712))
|
||||
|
||||
|
||||
Synapse 0.33.2 (2018-08-09)
|
||||
===========================
|
||||
|
||||
@@ -24,7 +244,7 @@ Features
|
||||
Bugfixes
|
||||
--------
|
||||
|
||||
- Make /directory/list API return 404 for room not found instead of 400 ([\#2952](https://github.com/matrix-org/synapse/issues/2952))
|
||||
- Make /directory/list API return 404 for room not found instead of 400. Thanks to @fuzzmz! ([\#3620](https://github.com/matrix-org/synapse/issues/3620))
|
||||
- Default inviter_display_name to mxid for email invites ([\#3391](https://github.com/matrix-org/synapse/issues/3391))
|
||||
- Don't generate TURN credentials if no TURN config options are set ([\#3514](https://github.com/matrix-org/synapse/issues/3514))
|
||||
- Correctly announce deleted devices over federation ([\#3520](https://github.com/matrix-org/synapse/issues/3520))
|
||||
|
||||
@@ -30,11 +30,11 @@ use github's pull request workflow to review the contribution, and either ask
|
||||
you to make any refinements needed or merge it and make them ourselves. The
|
||||
changes will then land on master when we next do a release.
|
||||
|
||||
We use `Jenkins <http://matrix.org/jenkins>`_ and
|
||||
We use `Jenkins <http://matrix.org/jenkins>`_ and
|
||||
`Travis <https://travis-ci.org/matrix-org/synapse>`_ for continuous
|
||||
integration. All pull requests to synapse get automatically tested by Travis;
|
||||
the Jenkins builds require an adminstrator to start them. If your change
|
||||
breaks the build, this will be shown in github, so please keep an eye on the
|
||||
integration. All pull requests to synapse get automatically tested by Travis;
|
||||
the Jenkins builds require an adminstrator to start them. If your change
|
||||
breaks the build, this will be shown in github, so please keep an eye on the
|
||||
pull request for feedback.
|
||||
|
||||
Code style
|
||||
@@ -56,17 +56,18 @@ entry. These are managed by Towncrier
|
||||
(https://github.com/hawkowl/towncrier).
|
||||
|
||||
To create a changelog entry, make a new file in the ``changelog.d``
|
||||
file named in the format of ``issuenumberOrPR.type``. The type can be
|
||||
file named in the format of ``PRnumber.type``. The type can be
|
||||
one of ``feature``, ``bugfix``, ``removal`` (also used for
|
||||
deprecations), or ``misc`` (for internal-only changes). The content of
|
||||
the file is your changelog entry, which can contain RestructuredText
|
||||
formatting. A note of contributors is welcomed in changelogs for
|
||||
non-misc changes (the content of misc changes is not displayed).
|
||||
the file is your changelog entry, which can contain Markdown
|
||||
formatting. Adding credits to the changelog is encouraged, we value
|
||||
your contributions and would like to have you shouted out in the
|
||||
release notes!
|
||||
|
||||
For example, a fix for a bug reported in #1234 would have its
|
||||
changelog entry in ``changelog.d/1234.bugfix``, and contain content
|
||||
like "The security levels of Florbs are now validated when
|
||||
recieved over federation. Contributed by Jane Matrix".
|
||||
For example, a fix in PR #1234 would have its changelog entry in
|
||||
``changelog.d/1234.bugfix``, and contain content like "The security levels of
|
||||
Florbs are now validated when recieved over federation. Contributed by Jane
|
||||
Matrix".
|
||||
|
||||
Attribution
|
||||
~~~~~~~~~~~
|
||||
@@ -125,7 +126,7 @@ the contribution or otherwise have the right to contribute it to Matrix::
|
||||
personal information I submit with it, including my sign-off) is
|
||||
maintained indefinitely and may be redistributed consistent with
|
||||
this project or the open source license(s) involved.
|
||||
|
||||
|
||||
If you agree to this for your contribution, then all that's needed is to
|
||||
include the line in your commit or pull request comment::
|
||||
|
||||
|
||||
25
README.rst
25
README.rst
@@ -167,11 +167,6 @@ Alternatively, Andreas Peters (previously Silvio Fricke) has contributed a
|
||||
Dockerfile to automate a synapse server in a single Docker image, at
|
||||
https://hub.docker.com/r/avhost/docker-matrix/tags/
|
||||
|
||||
Also, Martin Giess has created an auto-deployment process with vagrant/ansible,
|
||||
tested with VirtualBox/AWS/DigitalOcean - see
|
||||
https://github.com/EMnify/matrix-synapse-auto-deploy
|
||||
for details.
|
||||
|
||||
Configuring synapse
|
||||
-------------------
|
||||
|
||||
@@ -747,6 +742,18 @@ so an example nginx configuration might look like::
|
||||
}
|
||||
}
|
||||
|
||||
and an example apache configuration may look like::
|
||||
|
||||
<VirtualHost *:443>
|
||||
SSLEngine on
|
||||
ServerName matrix.example.com;
|
||||
|
||||
<Location /_matrix>
|
||||
ProxyPass http://127.0.0.1:8008/_matrix nocanon
|
||||
ProxyPassReverse http://127.0.0.1:8008/_matrix
|
||||
</Location>
|
||||
</VirtualHost>
|
||||
|
||||
You will also want to set ``bind_addresses: ['127.0.0.1']`` and ``x_forwarded: true``
|
||||
for port 8008 in ``homeserver.yaml`` to ensure that client IP addresses are
|
||||
recorded correctly.
|
||||
@@ -956,5 +963,13 @@ variable. The default is 0.5, which can be decreased to reduce RAM usage
|
||||
in memory constrained enviroments, or increased if performance starts to
|
||||
degrade.
|
||||
|
||||
Using `libjemalloc <http://jemalloc.net/>`_ can also yield a significant
|
||||
improvement in overall amount, and especially in terms of giving back RAM
|
||||
to the OS. To use it, the library must simply be put in the LD_PRELOAD
|
||||
environment variable when launching Synapse. On Debian, this can be done
|
||||
by installing the ``libjemalloc1`` package and adding this line to
|
||||
``/etc/default/matrix-synaspse``::
|
||||
|
||||
LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.1
|
||||
|
||||
.. _`key_management`: https://matrix.org/docs/spec/server_server/unstable.html#retrieving-server-keys
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
Add support for the SNI extension to federation TLS connections
|
||||
@@ -1 +0,0 @@
|
||||
The test suite now can run under PostgreSQL.
|
||||
@@ -1 +0,0 @@
|
||||
Refactor HTTP replication endpoints to reduce code duplication
|
||||
@@ -1 +0,0 @@
|
||||
Add ability to limit number of monthly active users on the server
|
||||
@@ -1 +0,0 @@
|
||||
Tests now correctly execute on Python 3.
|
||||
@@ -1 +0,0 @@
|
||||
Support more federation endpoints on workers
|
||||
@@ -1 +0,0 @@
|
||||
Basic support for room versioning
|
||||
@@ -1 +0,0 @@
|
||||
Ability to disable client/server Synapse via conf toggle
|
||||
@@ -1 +0,0 @@
|
||||
Fix occasional glitches in the synapse_event_persisted_position metric
|
||||
@@ -1 +0,0 @@
|
||||
Sytests can now be run inside a Docker container.
|
||||
@@ -1 +0,0 @@
|
||||
Fix bug on deleting 3pid when using identity servers that don't support unbind API
|
||||
@@ -1 +0,0 @@
|
||||
Ability to whitelist specific threepids against monthly active user limiting
|
||||
@@ -1 +0,0 @@
|
||||
Add some metrics for the appservice and federation event sending loops
|
||||
@@ -1 +0,0 @@
|
||||
Update docker base image from alpine 3.7 to 3.8.
|
||||
@@ -1 +0,0 @@
|
||||
Where server is disabled, block ability for locked out users to read new messages
|
||||
@@ -1 +0,0 @@
|
||||
Make the tests pass on Twisted < 18.7.0
|
||||
@@ -1 +0,0 @@
|
||||
Don’t ship recaptcha_ajax.js, use it directly from Google
|
||||
@@ -1 +0,0 @@
|
||||
Rename synapse.util.async to synapse.util.async_helpers to mitigate async becoming a keyword on Python 3.7.
|
||||
@@ -1 +0,0 @@
|
||||
Synapse's tests are now formatted with the black autoformatter.
|
||||
@@ -1 +0,0 @@
|
||||
Fixes test_reap_monthly_active_users so it passes under postgres
|
||||
@@ -1 +0,0 @@
|
||||
Implemented a new testing base class to reduce test boilerplate.
|
||||
@@ -1 +0,0 @@
|
||||
set admin uri via config, to be used in error messages where the user should contact the administrator
|
||||
@@ -1 +0,0 @@
|
||||
Rename MAU prometheus metrics
|
||||
@@ -1 +0,0 @@
|
||||
Fix missing yield in synapse.storage.monthly_active_users.initialise_reserved_users
|
||||
@@ -1,6 +1,8 @@
|
||||
FROM docker.io/python:2-alpine3.8
|
||||
|
||||
RUN apk add --no-cache --virtual .nacl_deps \
|
||||
COPY . /synapse
|
||||
|
||||
RUN apk add --no-cache --virtual .build_deps \
|
||||
build-base \
|
||||
libffi-dev \
|
||||
libjpeg-turbo-dev \
|
||||
@@ -8,13 +10,16 @@ RUN apk add --no-cache --virtual .nacl_deps \
|
||||
libxslt-dev \
|
||||
linux-headers \
|
||||
postgresql-dev \
|
||||
su-exec \
|
||||
zlib-dev
|
||||
|
||||
COPY . /synapse
|
||||
|
||||
# A wheel cache may be provided in ./cache for faster build
|
||||
RUN cd /synapse \
|
||||
zlib-dev \
|
||||
&& cd /synapse \
|
||||
&& apk add --no-cache --virtual .runtime_deps \
|
||||
libffi \
|
||||
libjpeg-turbo \
|
||||
libressl \
|
||||
libxslt \
|
||||
libpq \
|
||||
zlib \
|
||||
su-exec \
|
||||
&& pip install --upgrade \
|
||||
lxml \
|
||||
pip \
|
||||
@@ -26,8 +31,9 @@ RUN cd /synapse \
|
||||
&& rm -rf \
|
||||
setup.cfg \
|
||||
setup.py \
|
||||
synapse
|
||||
|
||||
synapse \
|
||||
&& apk del .build_deps
|
||||
|
||||
VOLUME ["/data"]
|
||||
|
||||
EXPOSE 8008/tcp 8448/tcp
|
||||
|
||||
@@ -33,7 +33,7 @@ As an example::
|
||||
|
||||
< {
|
||||
"access_token": "token_here",
|
||||
"user_id": "@pepper_roni@test",
|
||||
"user_id": "@pepper_roni:localhost",
|
||||
"home_server": "test",
|
||||
"device_id": "device_id_here"
|
||||
}
|
||||
|
||||
@@ -74,7 +74,7 @@ replication endpoints that it's talking to on the main synapse process.
|
||||
``worker_replication_port`` should point to the TCP replication listener port and
|
||||
``worker_replication_http_port`` should point to the HTTP replication port.
|
||||
|
||||
Currently, only the ``event_creator`` worker requires specifying
|
||||
Currently, the ``event_creator`` and ``federation_reader`` workers require specifying
|
||||
``worker_replication_http_port``.
|
||||
|
||||
For instance::
|
||||
@@ -241,6 +241,14 @@ regular expressions::
|
||||
|
||||
^/_matrix/client/(api/v1|r0|unstable)/keys/upload
|
||||
|
||||
If ``use_presence`` is False in the homeserver config, it can also handle REST
|
||||
endpoints matching the following regular expressions::
|
||||
|
||||
^/_matrix/client/(api/v1|r0|unstable)/presence/[^/]+/status
|
||||
|
||||
This "stub" presence handler will pass through ``GET`` request but make the
|
||||
``PUT`` effectively a no-op.
|
||||
|
||||
It will proxy any requests it cannot handle to the main synapse instance. It
|
||||
must therefore be configured with the location of the main instance, via
|
||||
the ``worker_main_http_uri`` setting in the frontend_proxy worker configuration
|
||||
@@ -257,6 +265,7 @@ Handles some event creation. It can handle REST endpoints matching::
|
||||
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/send
|
||||
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$
|
||||
^/_matrix/client/(api/v1|r0|unstable)/join/
|
||||
^/_matrix/client/(api/v1|r0|unstable)/profile/
|
||||
|
||||
It will create events locally and then send them on to the main synapse
|
||||
instance to be persisted and handled.
|
||||
|
||||
@@ -31,5 +31,5 @@ $TOX_BIN/pip install 'setuptools>=18.5'
|
||||
$TOX_BIN/pip install 'pip>=10'
|
||||
|
||||
{ python synapse/python_dependencies.py
|
||||
echo lxml psycopg2
|
||||
echo lxml
|
||||
} | xargs $TOX_BIN/pip install
|
||||
|
||||
@@ -1,57 +0,0 @@
|
||||
#!/bin/bash
|
||||
|
||||
## CAUTION:
|
||||
## This script will remove (hopefully) all trace of the given room ID from
|
||||
## your homeserver.db
|
||||
|
||||
## Do not run it lightly.
|
||||
|
||||
set -e
|
||||
|
||||
if [ "$1" == "-h" ] || [ "$1" == "" ]; then
|
||||
echo "Call with ROOM_ID as first option and then pipe it into the database. So for instance you might run"
|
||||
echo " nuke-room-from-db.sh <room_id> | sqlite3 homeserver.db"
|
||||
echo "or"
|
||||
echo " nuke-room-from-db.sh <room_id> | psql --dbname=synapse"
|
||||
exit
|
||||
fi
|
||||
|
||||
ROOMID="$1"
|
||||
|
||||
cat <<EOF
|
||||
DELETE FROM event_forward_extremities WHERE room_id = '$ROOMID';
|
||||
DELETE FROM event_backward_extremities WHERE room_id = '$ROOMID';
|
||||
DELETE FROM event_edges WHERE room_id = '$ROOMID';
|
||||
DELETE FROM room_depth WHERE room_id = '$ROOMID';
|
||||
DELETE FROM state_forward_extremities WHERE room_id = '$ROOMID';
|
||||
DELETE FROM events WHERE room_id = '$ROOMID';
|
||||
DELETE FROM event_json WHERE room_id = '$ROOMID';
|
||||
DELETE FROM state_events WHERE room_id = '$ROOMID';
|
||||
DELETE FROM current_state_events WHERE room_id = '$ROOMID';
|
||||
DELETE FROM room_memberships WHERE room_id = '$ROOMID';
|
||||
DELETE FROM feedback WHERE room_id = '$ROOMID';
|
||||
DELETE FROM topics WHERE room_id = '$ROOMID';
|
||||
DELETE FROM room_names WHERE room_id = '$ROOMID';
|
||||
DELETE FROM rooms WHERE room_id = '$ROOMID';
|
||||
DELETE FROM room_hosts WHERE room_id = '$ROOMID';
|
||||
DELETE FROM room_aliases WHERE room_id = '$ROOMID';
|
||||
DELETE FROM state_groups WHERE room_id = '$ROOMID';
|
||||
DELETE FROM state_groups_state WHERE room_id = '$ROOMID';
|
||||
DELETE FROM receipts_graph WHERE room_id = '$ROOMID';
|
||||
DELETE FROM receipts_linearized WHERE room_id = '$ROOMID';
|
||||
DELETE FROM event_search WHERE room_id = '$ROOMID';
|
||||
DELETE FROM guest_access WHERE room_id = '$ROOMID';
|
||||
DELETE FROM history_visibility WHERE room_id = '$ROOMID';
|
||||
DELETE FROM room_tags WHERE room_id = '$ROOMID';
|
||||
DELETE FROM room_tags_revisions WHERE room_id = '$ROOMID';
|
||||
DELETE FROM room_account_data WHERE room_id = '$ROOMID';
|
||||
DELETE FROM event_push_actions WHERE room_id = '$ROOMID';
|
||||
DELETE FROM local_invites WHERE room_id = '$ROOMID';
|
||||
DELETE FROM pusher_throttle WHERE room_id = '$ROOMID';
|
||||
DELETE FROM event_reports WHERE room_id = '$ROOMID';
|
||||
DELETE FROM public_room_list_stream WHERE room_id = '$ROOMID';
|
||||
DELETE FROM stream_ordering_to_exterm WHERE room_id = '$ROOMID';
|
||||
DELETE FROM event_auth WHERE room_id = '$ROOMID';
|
||||
DELETE FROM appservice_room_list WHERE room_id = '$ROOMID';
|
||||
VACUUM;
|
||||
EOF
|
||||
@@ -17,13 +17,14 @@ ignore =
|
||||
[pep8]
|
||||
max-line-length = 90
|
||||
# W503 requires that binary operators be at the end, not start, of lines. Erik
|
||||
# doesn't like it. E203 is contrary to PEP8.
|
||||
ignore = W503,E203
|
||||
# doesn't like it. E203 is contrary to PEP8. E731 is silly.
|
||||
ignore = W503,E203,E731
|
||||
|
||||
[flake8]
|
||||
# note that flake8 inherits the "ignore" settings from "pep8" (because it uses
|
||||
# pep8 to do those checks), but not the "max-line-length" setting
|
||||
max-line-length = 90
|
||||
ignore=W503,E203,E731
|
||||
|
||||
[isort]
|
||||
line_length = 89
|
||||
|
||||
@@ -17,4 +17,14 @@
|
||||
""" This is a reference implementation of a Matrix home server.
|
||||
"""
|
||||
|
||||
__version__ = "0.33.2"
|
||||
try:
|
||||
from twisted.internet import protocol
|
||||
from twisted.internet.protocol import Factory
|
||||
from twisted.names.dns import DNSDatagramProtocol
|
||||
protocol.Factory.noisy = False
|
||||
Factory.noisy = False
|
||||
DNSDatagramProtocol.noisy = False
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
__version__ = "0.33.5"
|
||||
|
||||
@@ -25,7 +25,8 @@ from twisted.internet import defer
|
||||
import synapse.types
|
||||
from synapse import event_auth
|
||||
from synapse.api.constants import EventTypes, JoinRules, Membership
|
||||
from synapse.api.errors import AuthError, Codes
|
||||
from synapse.api.errors import AuthError, Codes, ResourceLimitError
|
||||
from synapse.config.server import is_threepid_reserved
|
||||
from synapse.types import UserID
|
||||
from synapse.util.caches import CACHE_SIZE_FACTOR, register_cache
|
||||
from synapse.util.caches.lrucache import LruCache
|
||||
@@ -211,7 +212,7 @@ class Auth(object):
|
||||
user_agent = request.requestHeaders.getRawHeaders(
|
||||
b"User-Agent",
|
||||
default=[b""]
|
||||
)[0]
|
||||
)[0].decode('ascii', 'surrogateescape')
|
||||
if user and access_token and ip_addr:
|
||||
yield self.store.insert_client_ip(
|
||||
user_id=user.to_string(),
|
||||
@@ -682,7 +683,7 @@ class Auth(object):
|
||||
Returns:
|
||||
bool: False if no access_token was given, True otherwise.
|
||||
"""
|
||||
query_params = request.args.get("access_token")
|
||||
query_params = request.args.get(b"access_token")
|
||||
auth_headers = request.requestHeaders.getRawHeaders(b"Authorization")
|
||||
return bool(query_params) or bool(auth_headers)
|
||||
|
||||
@@ -698,7 +699,7 @@ class Auth(object):
|
||||
401 since some of the old clients depended on auth errors returning
|
||||
403.
|
||||
Returns:
|
||||
str: The access_token
|
||||
unicode: The access_token
|
||||
Raises:
|
||||
AuthError: If there isn't an access_token in the request.
|
||||
"""
|
||||
@@ -720,9 +721,9 @@ class Auth(object):
|
||||
"Too many Authorization headers.",
|
||||
errcode=Codes.MISSING_TOKEN,
|
||||
)
|
||||
parts = auth_headers[0].split(" ")
|
||||
if parts[0] == "Bearer" and len(parts) == 2:
|
||||
return parts[1]
|
||||
parts = auth_headers[0].split(b" ")
|
||||
if parts[0] == b"Bearer" and len(parts) == 2:
|
||||
return parts[1].decode('ascii')
|
||||
else:
|
||||
raise AuthError(
|
||||
token_not_found_http_status,
|
||||
@@ -738,7 +739,7 @@ class Auth(object):
|
||||
errcode=Codes.MISSING_TOKEN
|
||||
)
|
||||
|
||||
return query_params[0]
|
||||
return query_params[0].decode('ascii')
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def check_in_room_or_world_readable(self, room_id, user_id):
|
||||
@@ -775,31 +776,56 @@ class Auth(object):
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def check_auth_blocking(self, user_id=None):
|
||||
def check_auth_blocking(self, user_id=None, threepid=None):
|
||||
"""Checks if the user should be rejected for some external reason,
|
||||
such as monthly active user limiting or global disable flag
|
||||
|
||||
Args:
|
||||
user_id(str|None): If present, checks for presence against existing
|
||||
MAU cohort
|
||||
|
||||
threepid(dict|None): If present, checks for presence against configured
|
||||
reserved threepid. Used in cases where the user is trying register
|
||||
with a MAU blocked server, normally they would be rejected but their
|
||||
threepid is on the reserved list. user_id and
|
||||
threepid should never be set at the same time.
|
||||
"""
|
||||
|
||||
# Never fail an auth check for the server notices users
|
||||
# This can be a problem where event creation is prohibited due to blocking
|
||||
if user_id == self.hs.config.server_notices_mxid:
|
||||
return
|
||||
|
||||
if self.hs.config.hs_disabled:
|
||||
raise AuthError(
|
||||
raise ResourceLimitError(
|
||||
403, self.hs.config.hs_disabled_message,
|
||||
errcode=Codes.RESOURCE_LIMIT_EXCEED,
|
||||
admin_uri=self.hs.config.admin_uri,
|
||||
errcode=Codes.RESOURCE_LIMIT_EXCEEDED,
|
||||
admin_contact=self.hs.config.admin_contact,
|
||||
limit_type=self.hs.config.hs_disabled_limit_type
|
||||
)
|
||||
if self.hs.config.limit_usage_by_mau is True:
|
||||
# If the user is already part of the MAU cohort
|
||||
assert not (user_id and threepid)
|
||||
|
||||
# If the user is already part of the MAU cohort or a trial user
|
||||
if user_id:
|
||||
timestamp = yield self.store.user_last_seen_monthly_active(user_id)
|
||||
if timestamp:
|
||||
return
|
||||
|
||||
is_trial = yield self.store.is_trial_user(user_id)
|
||||
if is_trial:
|
||||
return
|
||||
elif threepid:
|
||||
# If the user does not exist yet, but is signing up with a
|
||||
# reserved threepid then pass auth check
|
||||
if is_threepid_reserved(self.hs.config, threepid):
|
||||
return
|
||||
# Else if there is no room in the MAU bucket, bail
|
||||
current_mau = yield self.store.get_monthly_active_count()
|
||||
if current_mau >= self.hs.config.max_mau_value:
|
||||
raise AuthError(
|
||||
403, "Monthly Active User Limits AU Limit Exceeded",
|
||||
admin_uri=self.hs.config.admin_uri,
|
||||
errcode=Codes.RESOURCE_LIMIT_EXCEED
|
||||
raise ResourceLimitError(
|
||||
403, "Monthly Active User Limit Exceeded",
|
||||
admin_contact=self.hs.config.admin_contact,
|
||||
errcode=Codes.RESOURCE_LIMIT_EXCEEDED,
|
||||
limit_type="monthly_active_user"
|
||||
)
|
||||
|
||||
@@ -78,6 +78,7 @@ class EventTypes(object):
|
||||
Name = "m.room.name"
|
||||
|
||||
ServerACL = "m.room.server_acl"
|
||||
Pinned = "m.room.pinned_events"
|
||||
|
||||
|
||||
class RejectedReason(object):
|
||||
@@ -97,9 +98,17 @@ class ThirdPartyEntityKind(object):
|
||||
LOCATION = "location"
|
||||
|
||||
|
||||
class RoomVersions(object):
|
||||
V1 = "1"
|
||||
VDH_TEST = "vdh-test-version"
|
||||
|
||||
|
||||
# the version we will give rooms which are created on this server
|
||||
DEFAULT_ROOM_VERSION = "1"
|
||||
DEFAULT_ROOM_VERSION = RoomVersions.V1
|
||||
|
||||
# vdh-test-version is a placeholder to get room versioning support working and tested
|
||||
# until we have a working v2.
|
||||
KNOWN_ROOM_VERSIONS = {"1", "vdh-test-version"}
|
||||
KNOWN_ROOM_VERSIONS = {RoomVersions.V1, RoomVersions.VDH_TEST}
|
||||
|
||||
ServerNoticeMsgType = "m.server_notice"
|
||||
ServerNoticeLimitReached = "m.server_notice.usage_limit_reached"
|
||||
|
||||
@@ -56,7 +56,7 @@ class Codes(object):
|
||||
SERVER_NOT_TRUSTED = "M_SERVER_NOT_TRUSTED"
|
||||
CONSENT_NOT_GIVEN = "M_CONSENT_NOT_GIVEN"
|
||||
CANNOT_LEAVE_SERVER_NOTICE_ROOM = "M_CANNOT_LEAVE_SERVER_NOTICE_ROOM"
|
||||
RESOURCE_LIMIT_EXCEED = "M_RESOURCE_LIMIT_EXCEED"
|
||||
RESOURCE_LIMIT_EXCEEDED = "M_RESOURCE_LIMIT_EXCEEDED"
|
||||
UNSUPPORTED_ROOM_VERSION = "M_UNSUPPORTED_ROOM_VERSION"
|
||||
INCOMPATIBLE_ROOM_VERSION = "M_INCOMPATIBLE_ROOM_VERSION"
|
||||
|
||||
@@ -224,15 +224,34 @@ class NotFoundError(SynapseError):
|
||||
|
||||
class AuthError(SynapseError):
|
||||
"""An error raised when there was a problem authorising an event."""
|
||||
def __init__(self, code, msg, errcode=Codes.FORBIDDEN, admin_uri=None):
|
||||
self.admin_uri = admin_uri
|
||||
super(AuthError, self).__init__(code, msg, errcode=errcode)
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
if "errcode" not in kwargs:
|
||||
kwargs["errcode"] = Codes.FORBIDDEN
|
||||
super(AuthError, self).__init__(*args, **kwargs)
|
||||
|
||||
|
||||
class ResourceLimitError(SynapseError):
|
||||
"""
|
||||
Any error raised when there is a problem with resource usage.
|
||||
For instance, the monthly active user limit for the server has been exceeded
|
||||
"""
|
||||
def __init__(
|
||||
self, code, msg,
|
||||
errcode=Codes.RESOURCE_LIMIT_EXCEEDED,
|
||||
admin_contact=None,
|
||||
limit_type=None,
|
||||
):
|
||||
self.admin_contact = admin_contact
|
||||
self.limit_type = limit_type
|
||||
super(ResourceLimitError, self).__init__(code, msg, errcode=errcode)
|
||||
|
||||
def error_dict(self):
|
||||
return cs_error(
|
||||
self.msg,
|
||||
self.errcode,
|
||||
admin_uri=self.admin_uri,
|
||||
admin_contact=self.admin_contact,
|
||||
limit_type=self.limit_type
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -251,6 +251,7 @@ class FilterCollection(object):
|
||||
"include_leave", False
|
||||
)
|
||||
self.event_fields = filter_json.get("event_fields", [])
|
||||
self.event_format = filter_json.get("event_format", "client")
|
||||
|
||||
def __repr__(self):
|
||||
return "<FilterCollection %s>" % (json.dumps(self._filter_json),)
|
||||
|
||||
@@ -72,7 +72,7 @@ class Ratelimiter(object):
|
||||
return allowed, time_allowed
|
||||
|
||||
def prune_message_counts(self, time_now_s):
|
||||
for user_id in self.message_counts.keys():
|
||||
for user_id in list(self.message_counts.keys()):
|
||||
message_count, time_start, msg_rate_hz = (
|
||||
self.message_counts[user_id]
|
||||
)
|
||||
|
||||
@@ -140,7 +140,7 @@ def listen_metrics(bind_addresses, port):
|
||||
logger.info("Metrics now reporting on %s:%d", host, port)
|
||||
|
||||
|
||||
def listen_tcp(bind_addresses, port, factory, backlog=50):
|
||||
def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50):
|
||||
"""
|
||||
Create a TCP socket for a port and several addresses
|
||||
"""
|
||||
@@ -156,7 +156,9 @@ def listen_tcp(bind_addresses, port, factory, backlog=50):
|
||||
check_bind_error(e, address, bind_addresses)
|
||||
|
||||
|
||||
def listen_ssl(bind_addresses, port, factory, context_factory, backlog=50):
|
||||
def listen_ssl(
|
||||
bind_addresses, port, factory, context_factory, reactor=reactor, backlog=50
|
||||
):
|
||||
"""
|
||||
Create an SSL socket for a port and several addresses
|
||||
"""
|
||||
|
||||
@@ -51,10 +51,7 @@ class AppserviceSlaveStore(
|
||||
|
||||
|
||||
class AppserviceServer(HomeServer):
|
||||
def setup(self):
|
||||
logger.info("Setting up.")
|
||||
self.datastore = AppserviceSlaveStore(self.get_db_conn(), self)
|
||||
logger.info("Finished setting up.")
|
||||
DATASTORE_CLASS = AppserviceSlaveStore
|
||||
|
||||
def _listen_http(self, listener_config):
|
||||
port = listener_config["port"]
|
||||
@@ -117,8 +114,9 @@ class ASReplicationHandler(ReplicationClientHandler):
|
||||
super(ASReplicationHandler, self).__init__(hs.get_datastore())
|
||||
self.appservice_handler = hs.get_application_service_handler()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_rdata(self, stream_name, token, rows):
|
||||
super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
|
||||
yield super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
|
||||
|
||||
if stream_name == "events":
|
||||
max_stream_id = self.store.get_room_max_stream_ordering()
|
||||
|
||||
@@ -74,10 +74,7 @@ class ClientReaderSlavedStore(
|
||||
|
||||
|
||||
class ClientReaderServer(HomeServer):
|
||||
def setup(self):
|
||||
logger.info("Setting up.")
|
||||
self.datastore = ClientReaderSlavedStore(self.get_db_conn(), self)
|
||||
logger.info("Finished setting up.")
|
||||
DATASTORE_CLASS = ClientReaderSlavedStore
|
||||
|
||||
def _listen_http(self, listener_config):
|
||||
port = listener_config["port"]
|
||||
|
||||
@@ -45,6 +45,11 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
|
||||
from synapse.replication.slave.storage.room import RoomStore
|
||||
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
|
||||
from synapse.replication.tcp.client import ReplicationClientHandler
|
||||
from synapse.rest.client.v1.profile import (
|
||||
ProfileAvatarURLRestServlet,
|
||||
ProfileDisplaynameRestServlet,
|
||||
ProfileRestServlet,
|
||||
)
|
||||
from synapse.rest.client.v1.room import (
|
||||
JoinRoomAliasServlet,
|
||||
RoomMembershipRestServlet,
|
||||
@@ -53,6 +58,7 @@ from synapse.rest.client.v1.room import (
|
||||
)
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.engines import create_engine
|
||||
from synapse.storage.user_directory import UserDirectoryStore
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.logcontext import LoggingContext
|
||||
from synapse.util.manhole import manhole
|
||||
@@ -62,6 +68,9 @@ logger = logging.getLogger("synapse.app.event_creator")
|
||||
|
||||
|
||||
class EventCreatorSlavedStore(
|
||||
# FIXME(#3714): We need to add UserDirectoryStore as we write directly
|
||||
# rather than going via the correct worker.
|
||||
UserDirectoryStore,
|
||||
DirectoryStore,
|
||||
SlavedTransactionStore,
|
||||
SlavedProfileStore,
|
||||
@@ -81,10 +90,7 @@ class EventCreatorSlavedStore(
|
||||
|
||||
|
||||
class EventCreatorServer(HomeServer):
|
||||
def setup(self):
|
||||
logger.info("Setting up.")
|
||||
self.datastore = EventCreatorSlavedStore(self.get_db_conn(), self)
|
||||
logger.info("Finished setting up.")
|
||||
DATASTORE_CLASS = EventCreatorSlavedStore
|
||||
|
||||
def _listen_http(self, listener_config):
|
||||
port = listener_config["port"]
|
||||
@@ -101,6 +107,9 @@ class EventCreatorServer(HomeServer):
|
||||
RoomMembershipRestServlet(self).register(resource)
|
||||
RoomStateEventRestServlet(self).register(resource)
|
||||
JoinRoomAliasServlet(self).register(resource)
|
||||
ProfileAvatarURLRestServlet(self).register(resource)
|
||||
ProfileDisplaynameRestServlet(self).register(resource)
|
||||
ProfileRestServlet(self).register(resource)
|
||||
resources.update({
|
||||
"/_matrix/client/r0": resource,
|
||||
"/_matrix/client/unstable": resource,
|
||||
|
||||
@@ -32,6 +32,7 @@ from synapse.http.site import SynapseSite
|
||||
from synapse.metrics import RegistryProxy
|
||||
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
|
||||
from synapse.replication.slave.storage._base import BaseSlavedStore
|
||||
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
|
||||
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
|
||||
from synapse.replication.slave.storage.directory import DirectoryStore
|
||||
from synapse.replication.slave.storage.events import SlavedEventStore
|
||||
@@ -54,6 +55,7 @@ logger = logging.getLogger("synapse.app.federation_reader")
|
||||
|
||||
|
||||
class FederationReaderSlavedStore(
|
||||
SlavedAccountDataStore,
|
||||
SlavedProfileStore,
|
||||
SlavedApplicationServiceStore,
|
||||
SlavedPusherStore,
|
||||
@@ -70,10 +72,7 @@ class FederationReaderSlavedStore(
|
||||
|
||||
|
||||
class FederationReaderServer(HomeServer):
|
||||
def setup(self):
|
||||
logger.info("Setting up.")
|
||||
self.datastore = FederationReaderSlavedStore(self.get_db_conn(), self)
|
||||
logger.info("Finished setting up.")
|
||||
DATASTORE_CLASS = FederationReaderSlavedStore
|
||||
|
||||
def _listen_http(self, listener_config):
|
||||
port = listener_config["port"]
|
||||
|
||||
@@ -78,10 +78,7 @@ class FederationSenderSlaveStore(
|
||||
|
||||
|
||||
class FederationSenderServer(HomeServer):
|
||||
def setup(self):
|
||||
logger.info("Setting up.")
|
||||
self.datastore = FederationSenderSlaveStore(self.get_db_conn(), self)
|
||||
logger.info("Finished setting up.")
|
||||
DATASTORE_CLASS = FederationSenderSlaveStore
|
||||
|
||||
def _listen_http(self, listener_config):
|
||||
port = listener_config["port"]
|
||||
@@ -144,8 +141,9 @@ class FederationSenderReplicationHandler(ReplicationClientHandler):
|
||||
super(FederationSenderReplicationHandler, self).__init__(hs.get_datastore())
|
||||
self.send_handler = FederationSenderHandler(hs, self)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_rdata(self, stream_name, token, rows):
|
||||
super(FederationSenderReplicationHandler, self).on_rdata(
|
||||
yield super(FederationSenderReplicationHandler, self).on_rdata(
|
||||
stream_name, token, rows
|
||||
)
|
||||
self.send_handler.process_replication_rows(stream_name, token, rows)
|
||||
|
||||
@@ -38,6 +38,7 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
|
||||
from synapse.replication.slave.storage.devices import SlavedDeviceStore
|
||||
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
|
||||
from synapse.replication.tcp.client import ReplicationClientHandler
|
||||
from synapse.rest.client.v1.base import ClientV1RestServlet, client_path_patterns
|
||||
from synapse.rest.client.v2_alpha._base import client_v2_patterns
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.engines import create_engine
|
||||
@@ -49,6 +50,35 @@ from synapse.util.versionstring import get_version_string
|
||||
logger = logging.getLogger("synapse.app.frontend_proxy")
|
||||
|
||||
|
||||
class PresenceStatusStubServlet(ClientV1RestServlet):
|
||||
PATTERNS = client_path_patterns("/presence/(?P<user_id>[^/]*)/status")
|
||||
|
||||
def __init__(self, hs):
|
||||
super(PresenceStatusStubServlet, self).__init__(hs)
|
||||
self.http_client = hs.get_simple_http_client()
|
||||
self.auth = hs.get_auth()
|
||||
self.main_uri = hs.config.worker_main_http_uri
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_GET(self, request, user_id):
|
||||
# Pass through the auth headers, if any, in case the access token
|
||||
# is there.
|
||||
auth_headers = request.requestHeaders.getRawHeaders("Authorization", [])
|
||||
headers = {
|
||||
"Authorization": auth_headers,
|
||||
}
|
||||
result = yield self.http_client.get_json(
|
||||
self.main_uri + request.uri,
|
||||
headers=headers,
|
||||
)
|
||||
defer.returnValue((200, result))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_PUT(self, request, user_id):
|
||||
yield self.auth.get_user_by_req(request)
|
||||
defer.returnValue((200, {}))
|
||||
|
||||
|
||||
class KeyUploadServlet(RestServlet):
|
||||
PATTERNS = client_v2_patterns("/keys/upload(/(?P<device_id>[^/]+))?$")
|
||||
|
||||
@@ -118,10 +148,7 @@ class FrontendProxySlavedStore(
|
||||
|
||||
|
||||
class FrontendProxyServer(HomeServer):
|
||||
def setup(self):
|
||||
logger.info("Setting up.")
|
||||
self.datastore = FrontendProxySlavedStore(self.get_db_conn(), self)
|
||||
logger.info("Finished setting up.")
|
||||
DATASTORE_CLASS = FrontendProxySlavedStore
|
||||
|
||||
def _listen_http(self, listener_config):
|
||||
port = listener_config["port"]
|
||||
@@ -135,6 +162,12 @@ class FrontendProxyServer(HomeServer):
|
||||
elif name == "client":
|
||||
resource = JsonResource(self, canonical_json=False)
|
||||
KeyUploadServlet(self).register(resource)
|
||||
|
||||
# If presence is disabled, use the stub servlet that does
|
||||
# not allow sending presence
|
||||
if not self.config.use_presence:
|
||||
PresenceStatusStubServlet(self).register(resource)
|
||||
|
||||
resources.update({
|
||||
"/_matrix/client/r0": resource,
|
||||
"/_matrix/client/unstable": resource,
|
||||
@@ -153,7 +186,8 @@ class FrontendProxyServer(HomeServer):
|
||||
listener_config,
|
||||
root_resource,
|
||||
self.version_string,
|
||||
)
|
||||
),
|
||||
reactor=self.get_reactor()
|
||||
)
|
||||
|
||||
logger.info("Synapse client reader now listening on port %d", port)
|
||||
|
||||
@@ -62,7 +62,7 @@ from synapse.rest.key.v1.server_key_resource import LocalKey
|
||||
from synapse.rest.key.v2 import KeyApiV2Resource
|
||||
from synapse.rest.media.v0.content_repository import ContentRepoResource
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage import are_all_users_on_domain
|
||||
from synapse.storage import DataStore, are_all_users_on_domain
|
||||
from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
|
||||
from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
|
||||
from synapse.util.caches import CACHE_SIZE_FACTOR
|
||||
@@ -111,6 +111,8 @@ def build_resource_for_web_client(hs):
|
||||
|
||||
|
||||
class SynapseHomeServer(HomeServer):
|
||||
DATASTORE_CLASS = DataStore
|
||||
|
||||
def _listener_http(self, config, listener_config):
|
||||
port = listener_config["port"]
|
||||
bind_addresses = listener_config["bind_addresses"]
|
||||
@@ -305,6 +307,10 @@ class SynapseHomeServer(HomeServer):
|
||||
# Gauges to expose monthly active user control metrics
|
||||
current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU")
|
||||
max_mau_gauge = Gauge("synapse_admin_mau:max", "MAU Limit")
|
||||
registered_reserved_users_mau_gauge = Gauge(
|
||||
"synapse_admin_mau:registered_reserved_users",
|
||||
"Registered users with reserved threepids"
|
||||
)
|
||||
|
||||
|
||||
def setup(config_options):
|
||||
@@ -356,13 +362,13 @@ def setup(config_options):
|
||||
logger.info("Preparing database: %s...", config.database_config['name'])
|
||||
|
||||
try:
|
||||
db_conn = hs.get_db_conn(run_new_connection=False)
|
||||
prepare_database(db_conn, database_engine, config=config)
|
||||
database_engine.on_new_connection(db_conn)
|
||||
with hs.get_db_conn(run_new_connection=False) as db_conn:
|
||||
prepare_database(db_conn, database_engine, config=config)
|
||||
database_engine.on_new_connection(db_conn)
|
||||
|
||||
hs.run_startup_checks(db_conn, database_engine)
|
||||
hs.run_startup_checks(db_conn, database_engine)
|
||||
|
||||
db_conn.commit()
|
||||
db_conn.commit()
|
||||
except UpgradeDatabaseException:
|
||||
sys.stderr.write(
|
||||
"\nFailed to upgrade database.\n"
|
||||
@@ -525,13 +531,18 @@ def run(hs):
|
||||
clock.looping_call(
|
||||
hs.get_datastore().reap_monthly_active_users, 1000 * 60 * 60
|
||||
)
|
||||
hs.get_datastore().reap_monthly_active_users()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def generate_monthly_active_users():
|
||||
count = 0
|
||||
current_mau_count = 0
|
||||
reserved_count = 0
|
||||
store = hs.get_datastore()
|
||||
if hs.config.limit_usage_by_mau:
|
||||
count = yield hs.get_datastore().get_monthly_active_count()
|
||||
current_mau_gauge.set(float(count))
|
||||
current_mau_count = yield store.get_monthly_active_count()
|
||||
reserved_count = yield store.get_registered_reserved_users_count()
|
||||
current_mau_gauge.set(float(current_mau_count))
|
||||
registered_reserved_users_mau_gauge.set(float(reserved_count))
|
||||
max_mau_gauge.set(float(hs.config.max_mau_value))
|
||||
|
||||
hs.get_datastore().initialise_reserved_users(
|
||||
|
||||
@@ -60,10 +60,7 @@ class MediaRepositorySlavedStore(
|
||||
|
||||
|
||||
class MediaRepositoryServer(HomeServer):
|
||||
def setup(self):
|
||||
logger.info("Setting up.")
|
||||
self.datastore = MediaRepositorySlavedStore(self.get_db_conn(), self)
|
||||
logger.info("Finished setting up.")
|
||||
DATASTORE_CLASS = MediaRepositorySlavedStore
|
||||
|
||||
def _listen_http(self, listener_config):
|
||||
port = listener_config["port"]
|
||||
|
||||
@@ -78,10 +78,7 @@ class PusherSlaveStore(
|
||||
|
||||
|
||||
class PusherServer(HomeServer):
|
||||
def setup(self):
|
||||
logger.info("Setting up.")
|
||||
self.datastore = PusherSlaveStore(self.get_db_conn(), self)
|
||||
logger.info("Finished setting up.")
|
||||
DATASTORE_CLASS = PusherSlaveStore
|
||||
|
||||
def remove_pusher(self, app_id, push_key, user_id):
|
||||
self.get_tcp_replication().send_remove_pusher(app_id, push_key, user_id)
|
||||
@@ -148,8 +145,9 @@ class PusherReplicationHandler(ReplicationClientHandler):
|
||||
|
||||
self.pusher_pool = hs.get_pusherpool()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_rdata(self, stream_name, token, rows):
|
||||
super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
|
||||
yield super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
|
||||
run_in_background(self.poke_pushers, stream_name, token, rows)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@@ -162,11 +160,11 @@ class PusherReplicationHandler(ReplicationClientHandler):
|
||||
else:
|
||||
yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
|
||||
elif stream_name == "events":
|
||||
yield self.pusher_pool.on_new_notifications(
|
||||
self.pusher_pool.on_new_notifications(
|
||||
token, token,
|
||||
)
|
||||
elif stream_name == "receipts":
|
||||
yield self.pusher_pool.on_new_receipts(
|
||||
self.pusher_pool.on_new_receipts(
|
||||
token, token, set(row.room_id for row in rows)
|
||||
)
|
||||
except Exception:
|
||||
|
||||
@@ -114,7 +114,10 @@ class SynchrotronPresence(object):
|
||||
logger.info("Presence process_id is %r", self.process_id)
|
||||
|
||||
def send_user_sync(self, user_id, is_syncing, last_sync_ms):
|
||||
self.hs.get_tcp_replication().send_user_sync(user_id, is_syncing, last_sync_ms)
|
||||
if self.hs.config.use_presence:
|
||||
self.hs.get_tcp_replication().send_user_sync(
|
||||
user_id, is_syncing, last_sync_ms
|
||||
)
|
||||
|
||||
def mark_as_coming_online(self, user_id):
|
||||
"""A user has started syncing. Send a UserSync to the master, unless they
|
||||
@@ -211,10 +214,13 @@ class SynchrotronPresence(object):
|
||||
yield self.notify_from_replication(states, stream_id)
|
||||
|
||||
def get_currently_syncing_users(self):
|
||||
return [
|
||||
user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
|
||||
if count > 0
|
||||
]
|
||||
if self.hs.config.use_presence:
|
||||
return [
|
||||
user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
|
||||
if count > 0
|
||||
]
|
||||
else:
|
||||
return set()
|
||||
|
||||
|
||||
class SynchrotronTyping(object):
|
||||
@@ -243,10 +249,7 @@ class SynchrotronApplicationService(object):
|
||||
|
||||
|
||||
class SynchrotronServer(HomeServer):
|
||||
def setup(self):
|
||||
logger.info("Setting up.")
|
||||
self.datastore = SynchrotronSlavedStore(self.get_db_conn(), self)
|
||||
logger.info("Finished setting up.")
|
||||
DATASTORE_CLASS = SynchrotronSlavedStore
|
||||
|
||||
def _listen_http(self, listener_config):
|
||||
port = listener_config["port"]
|
||||
@@ -332,8 +335,9 @@ class SyncReplicationHandler(ReplicationClientHandler):
|
||||
self.presence_handler = hs.get_presence_handler()
|
||||
self.notifier = hs.get_notifier()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_rdata(self, stream_name, token, rows):
|
||||
super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
|
||||
yield super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
|
||||
run_in_background(self.process_and_notify, stream_name, token, rows)
|
||||
|
||||
def get_streams_to_replicate(self):
|
||||
|
||||
@@ -94,10 +94,7 @@ class UserDirectorySlaveStore(
|
||||
|
||||
|
||||
class UserDirectoryServer(HomeServer):
|
||||
def setup(self):
|
||||
logger.info("Setting up.")
|
||||
self.datastore = UserDirectorySlaveStore(self.get_db_conn(), self)
|
||||
logger.info("Finished setting up.")
|
||||
DATASTORE_CLASS = UserDirectorySlaveStore
|
||||
|
||||
def _listen_http(self, listener_config):
|
||||
port = listener_config["port"]
|
||||
@@ -169,8 +166,9 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler):
|
||||
super(UserDirectoryReplicationHandler, self).__init__(hs.get_datastore())
|
||||
self.user_directory = hs.get_user_directory_handler()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_rdata(self, stream_name, token, rows):
|
||||
super(UserDirectoryReplicationHandler, self).on_rdata(
|
||||
yield super(UserDirectoryReplicationHandler, self).on_rdata(
|
||||
stream_name, token, rows
|
||||
)
|
||||
if stream_name == "current_state_deltas":
|
||||
|
||||
@@ -13,7 +13,8 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
import urllib
|
||||
|
||||
from six.moves import urllib
|
||||
|
||||
from prometheus_client import Counter
|
||||
|
||||
@@ -98,7 +99,7 @@ class ApplicationServiceApi(SimpleHttpClient):
|
||||
def query_user(self, service, user_id):
|
||||
if service.url is None:
|
||||
defer.returnValue(False)
|
||||
uri = service.url + ("/users/%s" % urllib.quote(user_id))
|
||||
uri = service.url + ("/users/%s" % urllib.parse.quote(user_id))
|
||||
response = None
|
||||
try:
|
||||
response = yield self.get_json(uri, {
|
||||
@@ -119,7 +120,7 @@ class ApplicationServiceApi(SimpleHttpClient):
|
||||
def query_alias(self, service, alias):
|
||||
if service.url is None:
|
||||
defer.returnValue(False)
|
||||
uri = service.url + ("/rooms/%s" % urllib.quote(alias))
|
||||
uri = service.url + ("/rooms/%s" % urllib.parse.quote(alias))
|
||||
response = None
|
||||
try:
|
||||
response = yield self.get_json(uri, {
|
||||
@@ -153,7 +154,7 @@ class ApplicationServiceApi(SimpleHttpClient):
|
||||
service.url,
|
||||
APP_SERVICE_PREFIX,
|
||||
kind,
|
||||
urllib.quote(protocol)
|
||||
urllib.parse.quote(protocol)
|
||||
)
|
||||
try:
|
||||
response = yield self.get_json(uri, fields)
|
||||
@@ -188,7 +189,7 @@ class ApplicationServiceApi(SimpleHttpClient):
|
||||
uri = "%s%s/thirdparty/protocol/%s" % (
|
||||
service.url,
|
||||
APP_SERVICE_PREFIX,
|
||||
urllib.quote(protocol)
|
||||
urllib.parse.quote(protocol)
|
||||
)
|
||||
try:
|
||||
info = yield self.get_json(uri, {})
|
||||
@@ -228,7 +229,7 @@ class ApplicationServiceApi(SimpleHttpClient):
|
||||
txn_id = str(txn_id)
|
||||
|
||||
uri = service.url + ("/transactions/%s" %
|
||||
urllib.quote(txn_id))
|
||||
urllib.parse.quote(txn_id))
|
||||
try:
|
||||
yield self.put_json(
|
||||
uri=uri,
|
||||
|
||||
@@ -21,7 +21,7 @@ from .consent_config import ConsentConfig
|
||||
from .database import DatabaseConfig
|
||||
from .emailconfig import EmailConfig
|
||||
from .groups import GroupsConfig
|
||||
from .jwt import JWTConfig
|
||||
from .jwt_config import JWTConfig
|
||||
from .key import KeyConfig
|
||||
from .logger import LoggingConfig
|
||||
from .metrics import MetricsConfig
|
||||
|
||||
@@ -168,7 +168,8 @@ def setup_logging(config, use_worker_options=False):
|
||||
if log_file:
|
||||
# TODO: Customisable file size / backup count
|
||||
handler = logging.handlers.RotatingFileHandler(
|
||||
log_file, maxBytes=(1000 * 1000 * 100), backupCount=3
|
||||
log_file, maxBytes=(1000 * 1000 * 100), backupCount=3,
|
||||
encoding='utf8'
|
||||
)
|
||||
|
||||
def sighup(signum, stack):
|
||||
@@ -226,7 +227,22 @@ def setup_logging(config, use_worker_options=False):
|
||||
#
|
||||
# However this may not be too much of a problem if we are just writing to a file.
|
||||
observer = STDLibLogObserver()
|
||||
|
||||
def _log(event):
|
||||
|
||||
if "log_text" in event:
|
||||
if event["log_text"].startswith("DNSDatagramProtocol starting on "):
|
||||
return
|
||||
|
||||
if event["log_text"].startswith("(UDP Port "):
|
||||
return
|
||||
|
||||
if event["log_text"].startswith("Timing out client"):
|
||||
return
|
||||
|
||||
return observer(event)
|
||||
|
||||
globalLogBeginner.beginLoggingTo(
|
||||
[observer],
|
||||
[_log],
|
||||
redirectStandardIO=not config.no_redirect_stdio,
|
||||
)
|
||||
|
||||
@@ -49,6 +49,9 @@ class ServerConfig(Config):
|
||||
# "disable" federation
|
||||
self.send_federation = config.get("send_federation", True)
|
||||
|
||||
# Whether to enable user presence.
|
||||
self.use_presence = config.get("use_presence", True)
|
||||
|
||||
# Whether to update the user directory or not. This should be set to
|
||||
# false only if we are updating the user directory in a worker
|
||||
self.update_user_directory = config.get("update_user_directory", True)
|
||||
@@ -74,17 +77,23 @@ class ServerConfig(Config):
|
||||
self.max_mau_value = config.get(
|
||||
"max_mau_value", 0,
|
||||
)
|
||||
|
||||
self.mau_limits_reserved_threepids = config.get(
|
||||
"mau_limit_reserved_threepids", []
|
||||
)
|
||||
|
||||
self.mau_trial_days = config.get(
|
||||
"mau_trial_days", 0,
|
||||
)
|
||||
|
||||
# Options to disable HS
|
||||
self.hs_disabled = config.get("hs_disabled", False)
|
||||
self.hs_disabled_message = config.get("hs_disabled_message", "")
|
||||
self.hs_disabled_limit_type = config.get("hs_disabled_limit_type", "")
|
||||
|
||||
# Admin uri to direct users at should their instance become blocked
|
||||
# due to resource constraints
|
||||
self.admin_uri = config.get("admin_uri", None)
|
||||
self.admin_contact = config.get("admin_contact", None)
|
||||
|
||||
# FIXME: federation_domain_whitelist needs sytests
|
||||
self.federation_domain_whitelist = None
|
||||
@@ -249,6 +258,9 @@ class ServerConfig(Config):
|
||||
# hard limit.
|
||||
soft_file_limit: 0
|
||||
|
||||
# Set to false to disable presence tracking on this homeserver.
|
||||
use_presence: true
|
||||
|
||||
# The GC threshold parameters to pass to `gc.set_threshold`, if defined
|
||||
# gc_thresholds: [700, 10, 10]
|
||||
|
||||
@@ -340,6 +352,33 @@ class ServerConfig(Config):
|
||||
# - port: 9000
|
||||
# bind_addresses: ['::1', '127.0.0.1']
|
||||
# type: manhole
|
||||
|
||||
|
||||
# Homeserver blocking
|
||||
#
|
||||
# How to reach the server admin, used in ResourceLimitError
|
||||
# admin_contact: 'mailto:admin@server.com'
|
||||
#
|
||||
# Global block config
|
||||
#
|
||||
# hs_disabled: False
|
||||
# hs_disabled_message: 'Human readable reason for why the HS is blocked'
|
||||
# hs_disabled_limit_type: 'error code(str), to help clients decode reason'
|
||||
#
|
||||
# Monthly Active User Blocking
|
||||
#
|
||||
# Enables monthly active user checking
|
||||
# limit_usage_by_mau: False
|
||||
# max_mau_value: 50
|
||||
# mau_trial_days: 2
|
||||
#
|
||||
# Sometimes the server admin will want to ensure certain accounts are
|
||||
# never blocked by mau checking. These accounts are specified here.
|
||||
#
|
||||
# mau_limit_reserved_threepids:
|
||||
# - medium: 'email'
|
||||
# address: 'reserved_user@example.com'
|
||||
|
||||
""" % locals()
|
||||
|
||||
def read_arguments(self, args):
|
||||
@@ -365,6 +404,23 @@ class ServerConfig(Config):
|
||||
" service on the given port.")
|
||||
|
||||
|
||||
def is_threepid_reserved(config, threepid):
|
||||
"""Check the threepid against the reserved threepid config
|
||||
Args:
|
||||
config(ServerConfig) - to access server config attributes
|
||||
threepid(dict) - The threepid to test for
|
||||
|
||||
Returns:
|
||||
boolean Is the threepid undertest reserved_user
|
||||
"""
|
||||
|
||||
for tp in config.mau_limits_reserved_threepids:
|
||||
if (threepid['medium'] == tp['medium']
|
||||
and threepid['address'] == tp['address']):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def read_gc_thresholds(thresholds):
|
||||
"""Reads the three integer thresholds for garbage collection. Ensures that
|
||||
the thresholds are integers if thresholds are supplied.
|
||||
|
||||
@@ -123,6 +123,6 @@ class ClientTLSOptionsFactory(object):
|
||||
|
||||
def get_options(self, host):
|
||||
return ClientTLSOptions(
|
||||
host.decode('utf-8'),
|
||||
host,
|
||||
CertificateOptions(verify=False).getContext()
|
||||
)
|
||||
|
||||
@@ -18,7 +18,9 @@ import logging
|
||||
from canonicaljson import json
|
||||
|
||||
from twisted.internet import defer, reactor
|
||||
from twisted.internet.error import ConnectError
|
||||
from twisted.internet.protocol import Factory
|
||||
from twisted.names.error import DomainError
|
||||
from twisted.web.http import HTTPClient
|
||||
|
||||
from synapse.http.endpoint import matrix_federation_endpoint
|
||||
@@ -47,12 +49,14 @@ def fetch_server_key(server_name, tls_client_options_factory, path=KEY_API_V1):
|
||||
server_response, server_certificate = yield protocol.remote_key
|
||||
defer.returnValue((server_response, server_certificate))
|
||||
except SynapseKeyClientError as e:
|
||||
logger.exception("Error getting key for %r" % (server_name,))
|
||||
if e.status.startswith("4"):
|
||||
logger.warn("Error getting key for %r: %s", server_name, e)
|
||||
if e.status.startswith(b"4"):
|
||||
# Don't retry for 4xx responses.
|
||||
raise IOError("Cannot get key for %r" % server_name)
|
||||
except (ConnectError, DomainError) as e:
|
||||
logger.warn("Error getting key for %r: %s", server_name, e)
|
||||
except Exception as e:
|
||||
logger.exception(e)
|
||||
logger.exception("Error getting key for %r", server_name)
|
||||
raise IOError("Cannot get key for %r" % server_name)
|
||||
|
||||
|
||||
@@ -78,6 +82,12 @@ class SynapseKeyClientProtocol(HTTPClient):
|
||||
self._peer = self.transport.getPeer()
|
||||
logger.debug("Connected to %s", self._peer)
|
||||
|
||||
if not isinstance(self.path, bytes):
|
||||
self.path = self.path.encode('ascii')
|
||||
|
||||
if not isinstance(self.host, bytes):
|
||||
self.host = self.host.encode('ascii')
|
||||
|
||||
self.sendCommand(b"GET", self.path)
|
||||
if self.host:
|
||||
self.sendHeader(b"Host", self.host)
|
||||
|
||||
@@ -16,9 +16,10 @@
|
||||
|
||||
import hashlib
|
||||
import logging
|
||||
import urllib
|
||||
from collections import namedtuple
|
||||
|
||||
from six.moves import urllib
|
||||
|
||||
from signedjson.key import (
|
||||
decode_verify_key_bytes,
|
||||
encode_verify_key_base64,
|
||||
@@ -40,6 +41,7 @@ from synapse.api.errors import Codes, SynapseError
|
||||
from synapse.crypto.keyclient import fetch_server_key
|
||||
from synapse.util import logcontext, unwrapFirstError
|
||||
from synapse.util.logcontext import (
|
||||
LoggingContext,
|
||||
PreserveLoggingContext,
|
||||
preserve_fn,
|
||||
run_in_background,
|
||||
@@ -216,23 +218,34 @@ class Keyring(object):
|
||||
servers have completed. Follows the synapse rules of logcontext
|
||||
preservation.
|
||||
"""
|
||||
loop_count = 1
|
||||
while True:
|
||||
wait_on = [
|
||||
self.key_downloads[server_name]
|
||||
(server_name, self.key_downloads[server_name])
|
||||
for server_name in server_names
|
||||
if server_name in self.key_downloads
|
||||
]
|
||||
if wait_on:
|
||||
with PreserveLoggingContext():
|
||||
yield defer.DeferredList(wait_on)
|
||||
else:
|
||||
if not wait_on:
|
||||
break
|
||||
logger.info(
|
||||
"Waiting for existing lookups for %s to complete [loop %i]",
|
||||
[w[0] for w in wait_on], loop_count,
|
||||
)
|
||||
with PreserveLoggingContext():
|
||||
yield defer.DeferredList((w[1] for w in wait_on))
|
||||
|
||||
loop_count += 1
|
||||
|
||||
ctx = LoggingContext.current_context()
|
||||
|
||||
def rm(r, server_name_):
|
||||
self.key_downloads.pop(server_name_, None)
|
||||
with PreserveLoggingContext(ctx):
|
||||
logger.debug("Releasing key lookup lock on %s", server_name_)
|
||||
self.key_downloads.pop(server_name_, None)
|
||||
return r
|
||||
|
||||
for server_name, deferred in server_to_deferred.items():
|
||||
logger.debug("Got key lookup lock on %s", server_name)
|
||||
self.key_downloads[server_name] = deferred
|
||||
deferred.addBoth(rm, server_name)
|
||||
|
||||
@@ -432,7 +445,7 @@ class Keyring(object):
|
||||
# an incoming request.
|
||||
query_response = yield self.client.post_json(
|
||||
destination=perspective_name,
|
||||
path=b"/_matrix/key/v2/query",
|
||||
path="/_matrix/key/v2/query",
|
||||
data={
|
||||
u"server_keys": {
|
||||
server_name: {
|
||||
@@ -513,8 +526,8 @@ class Keyring(object):
|
||||
|
||||
(response, tls_certificate) = yield fetch_server_key(
|
||||
server_name, self.hs.tls_client_options_factory,
|
||||
path=(b"/_matrix/key/v2/server/%s" % (
|
||||
urllib.quote(requested_key_id),
|
||||
path=("/_matrix/key/v2/server/%s" % (
|
||||
urllib.parse.quote(requested_key_id),
|
||||
)).encode("ascii"),
|
||||
)
|
||||
|
||||
|
||||
@@ -13,6 +13,8 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import six
|
||||
|
||||
from synapse.util.caches import intern_dict
|
||||
from synapse.util.frozenutils import freeze
|
||||
|
||||
@@ -147,6 +149,9 @@ class EventBase(object):
|
||||
def items(self):
|
||||
return list(self._event_dict.items())
|
||||
|
||||
def keys(self):
|
||||
return six.iterkeys(self._event_dict)
|
||||
|
||||
|
||||
class FrozenEvent(EventBase):
|
||||
def __init__(self, event_dict, internal_metadata_dict={}, rejected_reason=None):
|
||||
|
||||
@@ -13,17 +13,20 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
from collections import namedtuple
|
||||
|
||||
import six
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.internet.defer import DeferredList
|
||||
|
||||
from synapse.api.constants import MAX_DEPTH
|
||||
from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
|
||||
from synapse.api.errors import Codes, SynapseError
|
||||
from synapse.crypto.event_signing import check_event_content_hash
|
||||
from synapse.events import FrozenEvent
|
||||
from synapse.events.utils import prune_event
|
||||
from synapse.http.servlet import assert_params_in_dict
|
||||
from synapse.types import get_domain_from_id
|
||||
from synapse.util import logcontext, unwrapFirstError
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -133,34 +136,45 @@ class FederationBase(object):
|
||||
* throws a SynapseError if the signature check failed.
|
||||
The deferreds run their callbacks in the sentinel logcontext.
|
||||
"""
|
||||
|
||||
redacted_pdus = [
|
||||
prune_event(pdu)
|
||||
for pdu in pdus
|
||||
]
|
||||
|
||||
deferreds = self.keyring.verify_json_objects_for_server([
|
||||
(p.origin, p.get_pdu_json())
|
||||
for p in redacted_pdus
|
||||
])
|
||||
deferreds = _check_sigs_on_pdus(self.keyring, pdus)
|
||||
|
||||
ctx = logcontext.LoggingContext.current_context()
|
||||
|
||||
def callback(_, pdu, redacted):
|
||||
def callback(_, pdu):
|
||||
with logcontext.PreserveLoggingContext(ctx):
|
||||
if not check_event_content_hash(pdu):
|
||||
logger.warn(
|
||||
"Event content has been tampered, redacting %s: %s",
|
||||
pdu.event_id, pdu.get_pdu_json()
|
||||
)
|
||||
return redacted
|
||||
# let's try to distinguish between failures because the event was
|
||||
# redacted (which are somewhat expected) vs actual ball-tampering
|
||||
# incidents.
|
||||
#
|
||||
# This is just a heuristic, so we just assume that if the keys are
|
||||
# about the same between the redacted and received events, then the
|
||||
# received event was probably a redacted copy (but we then use our
|
||||
# *actual* redacted copy to be on the safe side.)
|
||||
redacted_event = prune_event(pdu)
|
||||
if (
|
||||
set(redacted_event.keys()) == set(pdu.keys()) and
|
||||
set(six.iterkeys(redacted_event.content))
|
||||
== set(six.iterkeys(pdu.content))
|
||||
):
|
||||
logger.info(
|
||||
"Event %s seems to have been redacted; using our redacted "
|
||||
"copy",
|
||||
pdu.event_id,
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
"Event %s content has been tampered, redacting",
|
||||
pdu.event_id, pdu.get_pdu_json(),
|
||||
)
|
||||
return redacted_event
|
||||
|
||||
if self.spam_checker.check_event_for_spam(pdu):
|
||||
logger.warn(
|
||||
"Event contains spam, redacting %s: %s",
|
||||
pdu.event_id, pdu.get_pdu_json()
|
||||
)
|
||||
return redacted
|
||||
return prune_event(pdu)
|
||||
|
||||
return pdu
|
||||
|
||||
@@ -168,21 +182,121 @@ class FederationBase(object):
|
||||
failure.trap(SynapseError)
|
||||
with logcontext.PreserveLoggingContext(ctx):
|
||||
logger.warn(
|
||||
"Signature check failed for %s",
|
||||
pdu.event_id,
|
||||
"Signature check failed for %s: %s",
|
||||
pdu.event_id, failure.getErrorMessage(),
|
||||
)
|
||||
return failure
|
||||
|
||||
for deferred, pdu, redacted in zip(deferreds, pdus, redacted_pdus):
|
||||
for deferred, pdu in zip(deferreds, pdus):
|
||||
deferred.addCallbacks(
|
||||
callback, errback,
|
||||
callbackArgs=[pdu, redacted],
|
||||
callbackArgs=[pdu],
|
||||
errbackArgs=[pdu],
|
||||
)
|
||||
|
||||
return deferreds
|
||||
|
||||
|
||||
class PduToCheckSig(namedtuple("PduToCheckSig", [
|
||||
"pdu", "redacted_pdu_json", "event_id_domain", "sender_domain", "deferreds",
|
||||
])):
|
||||
pass
|
||||
|
||||
|
||||
def _check_sigs_on_pdus(keyring, pdus):
|
||||
"""Check that the given events are correctly signed
|
||||
|
||||
Args:
|
||||
keyring (synapse.crypto.Keyring): keyring object to do the checks
|
||||
pdus (Collection[EventBase]): the events to be checked
|
||||
|
||||
Returns:
|
||||
List[Deferred]: a Deferred for each event in pdus, which will either succeed if
|
||||
the signatures are valid, or fail (with a SynapseError) if not.
|
||||
"""
|
||||
|
||||
# (currently this is written assuming the v1 room structure; we'll probably want a
|
||||
# separate function for checking v2 rooms)
|
||||
|
||||
# we want to check that the event is signed by:
|
||||
#
|
||||
# (a) the server which created the event_id
|
||||
#
|
||||
# (b) the sender's server.
|
||||
#
|
||||
# - except in the case of invites created from a 3pid invite, which are exempt
|
||||
# from this check, because the sender has to match that of the original 3pid
|
||||
# invite, but the event may come from a different HS, for reasons that I don't
|
||||
# entirely grok (why do the senders have to match? and if they do, why doesn't the
|
||||
# joining server ask the inviting server to do the switcheroo with
|
||||
# exchange_third_party_invite?).
|
||||
#
|
||||
# That's pretty awful, since redacting such an invite will render it invalid
|
||||
# (because it will then look like a regular invite without a valid signature),
|
||||
# and signatures are *supposed* to be valid whether or not an event has been
|
||||
# redacted. But this isn't the worst of the ways that 3pid invites are broken.
|
||||
#
|
||||
# let's start by getting the domain for each pdu, and flattening the event back
|
||||
# to JSON.
|
||||
pdus_to_check = [
|
||||
PduToCheckSig(
|
||||
pdu=p,
|
||||
redacted_pdu_json=prune_event(p).get_pdu_json(),
|
||||
event_id_domain=get_domain_from_id(p.event_id),
|
||||
sender_domain=get_domain_from_id(p.sender),
|
||||
deferreds=[],
|
||||
)
|
||||
for p in pdus
|
||||
]
|
||||
|
||||
# first make sure that the event is signed by the event_id's domain
|
||||
deferreds = keyring.verify_json_objects_for_server([
|
||||
(p.event_id_domain, p.redacted_pdu_json)
|
||||
for p in pdus_to_check
|
||||
])
|
||||
|
||||
for p, d in zip(pdus_to_check, deferreds):
|
||||
p.deferreds.append(d)
|
||||
|
||||
# now let's look for events where the sender's domain is different to the
|
||||
# event id's domain (normally only the case for joins/leaves), and add additional
|
||||
# checks.
|
||||
pdus_to_check_sender = [
|
||||
p for p in pdus_to_check
|
||||
if p.sender_domain != p.event_id_domain and not _is_invite_via_3pid(p.pdu)
|
||||
]
|
||||
|
||||
more_deferreds = keyring.verify_json_objects_for_server([
|
||||
(p.sender_domain, p.redacted_pdu_json)
|
||||
for p in pdus_to_check_sender
|
||||
])
|
||||
|
||||
for p, d in zip(pdus_to_check_sender, more_deferreds):
|
||||
p.deferreds.append(d)
|
||||
|
||||
# replace lists of deferreds with single Deferreds
|
||||
return [_flatten_deferred_list(p.deferreds) for p in pdus_to_check]
|
||||
|
||||
|
||||
def _flatten_deferred_list(deferreds):
|
||||
"""Given a list of one or more deferreds, either return the single deferred, or
|
||||
combine into a DeferredList.
|
||||
"""
|
||||
if len(deferreds) > 1:
|
||||
return DeferredList(deferreds, fireOnOneErrback=True, consumeErrors=True)
|
||||
else:
|
||||
assert len(deferreds) == 1
|
||||
return deferreds[0]
|
||||
|
||||
|
||||
def _is_invite_via_3pid(event):
|
||||
return (
|
||||
event.type == EventTypes.Member
|
||||
and event.membership == Membership.INVITE
|
||||
and "third_party_invite" in event.content
|
||||
)
|
||||
|
||||
|
||||
def event_from_pdu_json(pdu_json, outlier=False):
|
||||
"""Construct a FrozenEvent from an event json received over federation
|
||||
|
||||
|
||||
@@ -271,10 +271,10 @@ class FederationClient(FederationBase):
|
||||
event_id, destination, e,
|
||||
)
|
||||
except NotRetryingDestination as e:
|
||||
logger.info(e.message)
|
||||
logger.info(str(e))
|
||||
continue
|
||||
except FederationDeniedError as e:
|
||||
logger.info(e.message)
|
||||
logger.info(str(e))
|
||||
continue
|
||||
except Exception as e:
|
||||
pdu_attempts[destination] = now
|
||||
@@ -510,7 +510,7 @@ class FederationClient(FederationBase):
|
||||
else:
|
||||
logger.warn(
|
||||
"Failed to %s via %s: %i %s",
|
||||
description, destination, e.code, e.message,
|
||||
description, destination, e.code, e.args[0],
|
||||
)
|
||||
except Exception:
|
||||
logger.warn(
|
||||
@@ -875,7 +875,7 @@ class FederationClient(FederationBase):
|
||||
except Exception as e:
|
||||
logger.exception(
|
||||
"Failed to send_third_party_invite via %s: %s",
|
||||
destination, e.message
|
||||
destination, str(e)
|
||||
)
|
||||
|
||||
raise RuntimeError("Failed to send to any server.")
|
||||
|
||||
@@ -99,7 +99,7 @@ class FederationServer(FederationBase):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
def on_incoming_transaction(self, transaction_data):
|
||||
def on_incoming_transaction(self, origin, transaction_data):
|
||||
# keep this as early as possible to make the calculated origin ts as
|
||||
# accurate as possible.
|
||||
request_time = self._clock.time_msec()
|
||||
@@ -108,34 +108,33 @@ class FederationServer(FederationBase):
|
||||
|
||||
if not transaction.transaction_id:
|
||||
raise Exception("Transaction missing transaction_id")
|
||||
if not transaction.origin:
|
||||
raise Exception("Transaction missing origin")
|
||||
|
||||
logger.debug("[%s] Got transaction", transaction.transaction_id)
|
||||
|
||||
# use a linearizer to ensure that we don't process the same transaction
|
||||
# multiple times in parallel.
|
||||
with (yield self._transaction_linearizer.queue(
|
||||
(transaction.origin, transaction.transaction_id),
|
||||
(origin, transaction.transaction_id),
|
||||
)):
|
||||
result = yield self._handle_incoming_transaction(
|
||||
transaction, request_time,
|
||||
origin, transaction, request_time,
|
||||
)
|
||||
|
||||
defer.returnValue(result)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _handle_incoming_transaction(self, transaction, request_time):
|
||||
def _handle_incoming_transaction(self, origin, transaction, request_time):
|
||||
""" Process an incoming transaction and return the HTTP response
|
||||
|
||||
Args:
|
||||
origin (unicode): the server making the request
|
||||
transaction (Transaction): incoming transaction
|
||||
request_time (int): timestamp that the HTTP request arrived at
|
||||
|
||||
Returns:
|
||||
Deferred[(int, object)]: http response code and body
|
||||
"""
|
||||
response = yield self.transaction_actions.have_responded(transaction)
|
||||
response = yield self.transaction_actions.have_responded(origin, transaction)
|
||||
|
||||
if response:
|
||||
logger.debug(
|
||||
@@ -149,7 +148,7 @@ class FederationServer(FederationBase):
|
||||
|
||||
received_pdus_counter.inc(len(transaction.pdus))
|
||||
|
||||
origin_host, _ = parse_server_name(transaction.origin)
|
||||
origin_host, _ = parse_server_name(origin)
|
||||
|
||||
pdus_by_room = {}
|
||||
|
||||
@@ -190,7 +189,7 @@ class FederationServer(FederationBase):
|
||||
event_id = pdu.event_id
|
||||
try:
|
||||
yield self._handle_received_pdu(
|
||||
transaction.origin, pdu
|
||||
origin, pdu
|
||||
)
|
||||
pdu_results[event_id] = {}
|
||||
except FederationError as e:
|
||||
@@ -212,7 +211,7 @@ class FederationServer(FederationBase):
|
||||
if hasattr(transaction, "edus"):
|
||||
for edu in (Edu(**x) for x in transaction.edus):
|
||||
yield self.received_edu(
|
||||
transaction.origin,
|
||||
origin,
|
||||
edu.edu_type,
|
||||
edu.content
|
||||
)
|
||||
@@ -224,6 +223,7 @@ class FederationServer(FederationBase):
|
||||
logger.debug("Returning: %s", str(response))
|
||||
|
||||
yield self.transaction_actions.set_response(
|
||||
origin,
|
||||
transaction,
|
||||
200, response
|
||||
)
|
||||
@@ -838,9 +838,9 @@ class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
|
||||
)
|
||||
|
||||
return self._send_edu(
|
||||
edu_type=edu_type,
|
||||
origin=origin,
|
||||
content=content,
|
||||
edu_type=edu_type,
|
||||
origin=origin,
|
||||
content=content,
|
||||
)
|
||||
|
||||
def on_query(self, query_type, args):
|
||||
@@ -851,6 +851,6 @@ class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
|
||||
return handler(args)
|
||||
|
||||
return self._get_query_client(
|
||||
query_type=query_type,
|
||||
args=args,
|
||||
query_type=query_type,
|
||||
args=args,
|
||||
)
|
||||
|
||||
@@ -36,7 +36,7 @@ class TransactionActions(object):
|
||||
self.store = datastore
|
||||
|
||||
@log_function
|
||||
def have_responded(self, transaction):
|
||||
def have_responded(self, origin, transaction):
|
||||
""" Have we already responded to a transaction with the same id and
|
||||
origin?
|
||||
|
||||
@@ -50,11 +50,11 @@ class TransactionActions(object):
|
||||
"transaction_id")
|
||||
|
||||
return self.store.get_received_txn_response(
|
||||
transaction.transaction_id, transaction.origin
|
||||
transaction.transaction_id, origin
|
||||
)
|
||||
|
||||
@log_function
|
||||
def set_response(self, transaction, code, response):
|
||||
def set_response(self, origin, transaction, code, response):
|
||||
""" Persist how we responded to a transaction.
|
||||
|
||||
Returns:
|
||||
@@ -66,7 +66,7 @@ class TransactionActions(object):
|
||||
|
||||
return self.store.set_received_txn_response(
|
||||
transaction.transaction_id,
|
||||
transaction.origin,
|
||||
origin,
|
||||
code,
|
||||
response,
|
||||
)
|
||||
|
||||
@@ -32,7 +32,7 @@ Events are replicated via a separate events stream.
|
||||
import logging
|
||||
from collections import namedtuple
|
||||
|
||||
from six import iteritems, itervalues
|
||||
from six import iteritems
|
||||
|
||||
from sortedcontainers import SortedDict
|
||||
|
||||
@@ -117,7 +117,7 @@ class FederationRemoteSendQueue(object):
|
||||
|
||||
user_ids = set(
|
||||
user_id
|
||||
for uids in itervalues(self.presence_changed)
|
||||
for uids in self.presence_changed.values()
|
||||
for user_id in uids
|
||||
)
|
||||
|
||||
|
||||
@@ -58,6 +58,7 @@ class TransactionQueue(object):
|
||||
"""
|
||||
|
||||
def __init__(self, hs):
|
||||
self.hs = hs
|
||||
self.server_name = hs.hostname
|
||||
|
||||
self.store = hs.get_datastore()
|
||||
@@ -308,6 +309,9 @@ class TransactionQueue(object):
|
||||
Args:
|
||||
states (list(UserPresenceState))
|
||||
"""
|
||||
if not self.hs.config.use_presence:
|
||||
# No-op if presence is disabled.
|
||||
return
|
||||
|
||||
# First we queue up the new presence by user ID, so multiple presence
|
||||
# updates in quick successtion are correctly handled
|
||||
@@ -459,7 +463,19 @@ class TransactionQueue(object):
|
||||
# pending_transactions flag.
|
||||
|
||||
pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
|
||||
|
||||
# We can only include at most 50 PDUs per transactions
|
||||
pending_pdus, leftover_pdus = pending_pdus[:50], pending_pdus[50:]
|
||||
if leftover_pdus:
|
||||
self.pending_pdus_by_dest[destination] = leftover_pdus
|
||||
|
||||
pending_edus = self.pending_edus_by_dest.pop(destination, [])
|
||||
|
||||
# We can only include at most 100 EDUs per transactions
|
||||
pending_edus, leftover_edus = pending_edus[:100], pending_edus[100:]
|
||||
if leftover_edus:
|
||||
self.pending_edus_by_dest[destination] = leftover_edus
|
||||
|
||||
pending_presence = self.pending_presence_by_dest.pop(destination, {})
|
||||
|
||||
pending_edus.extend(
|
||||
|
||||
@@ -15,7 +15,8 @@
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import urllib
|
||||
|
||||
from six.moves import urllib
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
@@ -106,7 +107,7 @@ class TransportLayerClient(object):
|
||||
dest (str)
|
||||
room_id (str)
|
||||
event_tuples (list)
|
||||
limt (int)
|
||||
limit (int)
|
||||
|
||||
Returns:
|
||||
Deferred: Results in a dict received from the remote homeserver.
|
||||
@@ -951,4 +952,4 @@ def _create_path(prefix, path, *args):
|
||||
Returns:
|
||||
str
|
||||
"""
|
||||
return prefix + path % tuple(urllib.quote(arg, "") for arg in args)
|
||||
return prefix + path % tuple(urllib.parse.quote(arg, "") for arg in args)
|
||||
|
||||
@@ -90,8 +90,8 @@ class Authenticator(object):
|
||||
@defer.inlineCallbacks
|
||||
def authenticate_request(self, request, content):
|
||||
json_request = {
|
||||
"method": request.method,
|
||||
"uri": request.uri,
|
||||
"method": request.method.decode('ascii'),
|
||||
"uri": request.uri.decode('ascii'),
|
||||
"destination": self.server_name,
|
||||
"signatures": {},
|
||||
}
|
||||
@@ -252,7 +252,7 @@ class BaseFederationServlet(object):
|
||||
by the callback method. None if the request has already been handled.
|
||||
"""
|
||||
content = None
|
||||
if request.method in ["PUT", "POST"]:
|
||||
if request.method in [b"PUT", b"POST"]:
|
||||
# TODO: Handle other method types? other content types?
|
||||
content = parse_json_object_from_request(request)
|
||||
|
||||
@@ -261,10 +261,10 @@ class BaseFederationServlet(object):
|
||||
except NoAuthenticationError:
|
||||
origin = None
|
||||
if self.REQUIRE_AUTH:
|
||||
logger.exception("authenticate_request failed")
|
||||
logger.warn("authenticate_request failed: missing authentication")
|
||||
raise
|
||||
except Exception:
|
||||
logger.exception("authenticate_request failed")
|
||||
except Exception as e:
|
||||
logger.warn("authenticate_request failed: %s", e)
|
||||
raise
|
||||
|
||||
if origin:
|
||||
@@ -353,7 +353,7 @@ class FederationSendServlet(BaseFederationServlet):
|
||||
|
||||
try:
|
||||
code, response = yield self.handler.on_incoming_transaction(
|
||||
transaction_data
|
||||
origin, transaction_data,
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("on_incoming_transaction failed")
|
||||
@@ -386,7 +386,7 @@ class FederationStateServlet(BaseFederationServlet):
|
||||
return self.handler.on_context_state_request(
|
||||
origin,
|
||||
context,
|
||||
query.get("event_id", [None])[0],
|
||||
parse_string_from_args(query, "event_id", None),
|
||||
)
|
||||
|
||||
|
||||
@@ -397,7 +397,7 @@ class FederationStateIdsServlet(BaseFederationServlet):
|
||||
return self.handler.on_state_ids_request(
|
||||
origin,
|
||||
room_id,
|
||||
query.get("event_id", [None])[0],
|
||||
parse_string_from_args(query, "event_id", None),
|
||||
)
|
||||
|
||||
|
||||
@@ -405,14 +405,12 @@ class FederationBackfillServlet(BaseFederationServlet):
|
||||
PATH = "/backfill/(?P<context>[^/]*)/"
|
||||
|
||||
def on_GET(self, origin, content, query, context):
|
||||
versions = query["v"]
|
||||
limits = query["limit"]
|
||||
versions = [x.decode('ascii') for x in query[b"v"]]
|
||||
limit = parse_integer_from_args(query, "limit", None)
|
||||
|
||||
if not limits:
|
||||
if not limit:
|
||||
return defer.succeed((400, {"error": "Did not include limit param"}))
|
||||
|
||||
limit = int(limits[-1])
|
||||
|
||||
return self.handler.on_backfill_request(origin, context, versions, limit)
|
||||
|
||||
|
||||
@@ -423,7 +421,7 @@ class FederationQueryServlet(BaseFederationServlet):
|
||||
def on_GET(self, origin, content, query, query_type):
|
||||
return self.handler.on_query_request(
|
||||
query_type,
|
||||
{k: v[0].decode("utf-8") for k, v in query.items()}
|
||||
{k.decode('utf8'): v[0].decode("utf-8") for k, v in query.items()}
|
||||
)
|
||||
|
||||
|
||||
@@ -630,14 +628,14 @@ class OpenIdUserInfo(BaseFederationServlet):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_GET(self, origin, content, query):
|
||||
token = query.get("access_token", [None])[0]
|
||||
token = query.get(b"access_token", [None])[0]
|
||||
if token is None:
|
||||
defer.returnValue((401, {
|
||||
"errcode": "M_MISSING_TOKEN", "error": "Access Token required"
|
||||
}))
|
||||
return
|
||||
|
||||
user_id = yield self.handler.on_openid_userinfo(token)
|
||||
user_id = yield self.handler.on_openid_userinfo(token.decode('ascii'))
|
||||
|
||||
if user_id is None:
|
||||
defer.returnValue((401, {
|
||||
|
||||
@@ -520,7 +520,7 @@ class AuthHandler(BaseHandler):
|
||||
"""
|
||||
logger.info("Logging in user %s on device %s", user_id, device_id)
|
||||
access_token = yield self.issue_access_token(user_id, device_id)
|
||||
yield self.auth.check_auth_blocking()
|
||||
yield self.auth.check_auth_blocking(user_id)
|
||||
|
||||
# the device *should* have been registered before we got here; however,
|
||||
# it's possible we raced against a DELETE operation. The thing we
|
||||
@@ -734,7 +734,6 @@ class AuthHandler(BaseHandler):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def validate_short_term_login_token_and_get_user_id(self, login_token):
|
||||
yield self.auth.check_auth_blocking()
|
||||
auth_api = self.hs.get_auth()
|
||||
user_id = None
|
||||
try:
|
||||
@@ -743,6 +742,7 @@ class AuthHandler(BaseHandler):
|
||||
auth_api.validate_macaroon(macaroon, "login", True, user_id)
|
||||
except Exception:
|
||||
raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN)
|
||||
yield self.auth.check_auth_blocking(user_id)
|
||||
defer.returnValue(user_id)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@@ -895,22 +895,24 @@ class AuthHandler(BaseHandler):
|
||||
|
||||
Args:
|
||||
password (unicode): Password to hash.
|
||||
stored_hash (unicode): Expected hash value.
|
||||
stored_hash (bytes): Expected hash value.
|
||||
|
||||
Returns:
|
||||
Deferred(bool): Whether self.hash(password) == stored_hash.
|
||||
"""
|
||||
|
||||
def _do_validate_hash():
|
||||
# Normalise the Unicode in the password
|
||||
pw = unicodedata.normalize("NFKC", password)
|
||||
|
||||
return bcrypt.checkpw(
|
||||
pw.encode('utf8') + self.hs.config.password_pepper.encode("utf8"),
|
||||
stored_hash.encode('utf8')
|
||||
stored_hash
|
||||
)
|
||||
|
||||
if stored_hash:
|
||||
if not isinstance(stored_hash, bytes):
|
||||
stored_hash = stored_hash.encode('ascii')
|
||||
|
||||
return make_deferred_yieldable(
|
||||
threads.deferToThreadPool(
|
||||
self.hs.get_reactor(),
|
||||
|
||||
@@ -330,7 +330,8 @@ class E2eKeysHandler(object):
|
||||
(algorithm, key_id, ex_json, key)
|
||||
)
|
||||
else:
|
||||
new_keys.append((algorithm, key_id, encode_canonical_json(key)))
|
||||
new_keys.append((
|
||||
algorithm, key_id, encode_canonical_json(key).decode('ascii')))
|
||||
|
||||
yield self.store.add_e2e_one_time_keys(
|
||||
user_id, device_id, time_now, new_keys
|
||||
@@ -358,7 +359,7 @@ def _exception_to_failure(e):
|
||||
# Note that some Exceptions (notably twisted's ResponseFailed etc) don't
|
||||
# give a string for e.message, which json then fails to serialize.
|
||||
return {
|
||||
"status": 503, "message": str(e.message),
|
||||
"status": 503, "message": str(e),
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -291,8 +291,9 @@ class FederationHandler(BaseHandler):
|
||||
ev_ids, get_prev_content=False, check_redacted=False
|
||||
)
|
||||
|
||||
room_version = yield self.store.get_room_version(pdu.room_id)
|
||||
state_map = yield resolve_events_with_factory(
|
||||
state_groups, {pdu.event_id: pdu}, fetch
|
||||
room_version, state_groups, {pdu.event_id: pdu}, fetch
|
||||
)
|
||||
|
||||
state = (yield self.store.get_events(state_map.values())).values()
|
||||
@@ -593,7 +594,7 @@ class FederationHandler(BaseHandler):
|
||||
|
||||
required_auth = set(
|
||||
a_id
|
||||
for event in events + state_events.values() + auth_events.values()
|
||||
for event in events + list(state_events.values()) + list(auth_events.values())
|
||||
for a_id, _ in event.auth_events
|
||||
)
|
||||
auth_events.update({
|
||||
@@ -801,7 +802,7 @@ class FederationHandler(BaseHandler):
|
||||
)
|
||||
continue
|
||||
except NotRetryingDestination as e:
|
||||
logger.info(e.message)
|
||||
logger.info(str(e))
|
||||
continue
|
||||
except FederationDeniedError as e:
|
||||
logger.info(e)
|
||||
@@ -1357,7 +1358,7 @@ class FederationHandler(BaseHandler):
|
||||
)
|
||||
|
||||
if state_groups:
|
||||
_, state = state_groups.items().pop()
|
||||
_, state = list(state_groups.items()).pop()
|
||||
results = state
|
||||
|
||||
if event.is_state():
|
||||
@@ -1828,7 +1829,10 @@ class FederationHandler(BaseHandler):
|
||||
(d.type, d.state_key): d for d in different_events if d
|
||||
})
|
||||
|
||||
new_state = self.state_handler.resolve_events(
|
||||
room_version = yield self.store.get_room_version(event.room_id)
|
||||
|
||||
new_state = yield self.state_handler.resolve_events(
|
||||
room_version,
|
||||
[list(local_view.values()), list(remote_view.values())],
|
||||
event
|
||||
)
|
||||
@@ -2386,8 +2390,7 @@ class FederationHandler(BaseHandler):
|
||||
extra_users=extra_users
|
||||
)
|
||||
|
||||
logcontext.run_in_background(
|
||||
self.pusher_pool.on_new_notifications,
|
||||
self.pusher_pool.on_new_notifications(
|
||||
event_stream_id, max_stream_id,
|
||||
)
|
||||
|
||||
|
||||
@@ -372,6 +372,10 @@ class InitialSyncHandler(BaseHandler):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_presence():
|
||||
# If presence is disabled, return an empty list
|
||||
if not self.hs.config.use_presence:
|
||||
defer.returnValue([])
|
||||
|
||||
states = yield presence_handler.get_states(
|
||||
[m.user_id for m in room_members],
|
||||
as_event=True,
|
||||
|
||||
@@ -25,7 +25,13 @@ from twisted.internet import defer
|
||||
from twisted.internet.defer import succeed
|
||||
|
||||
from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
|
||||
from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError
|
||||
from synapse.api.errors import (
|
||||
AuthError,
|
||||
Codes,
|
||||
ConsentNotGivenError,
|
||||
NotFoundError,
|
||||
SynapseError,
|
||||
)
|
||||
from synapse.api.urls import ConsentURIBuilder
|
||||
from synapse.crypto.event_signing import add_hashes_and_signatures
|
||||
from synapse.events.utils import serialize_event
|
||||
@@ -36,6 +42,7 @@ from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.frozenutils import frozendict_json_encoder
|
||||
from synapse.util.logcontext import run_in_background
|
||||
from synapse.util.metrics import measure_func
|
||||
from synapse.visibility import filter_events_for_client
|
||||
|
||||
from ._base import BaseHandler
|
||||
|
||||
@@ -82,28 +89,85 @@ class MessageHandler(object):
|
||||
defer.returnValue(data)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_state_events(self, user_id, room_id, is_guest=False):
|
||||
def get_state_events(
|
||||
self, user_id, room_id, types=None, filtered_types=None,
|
||||
at_token=None, is_guest=False,
|
||||
):
|
||||
"""Retrieve all state events for a given room. If the user is
|
||||
joined to the room then return the current state. If the user has
|
||||
left the room return the state events from when they left.
|
||||
left the room return the state events from when they left. If an explicit
|
||||
'at' parameter is passed, return the state events as of that event, if
|
||||
visible.
|
||||
|
||||
Args:
|
||||
user_id(str): The user requesting state events.
|
||||
room_id(str): The room ID to get all state events from.
|
||||
types(list[(str, str|None)]|None): List of (type, state_key) tuples
|
||||
which are used to filter the state fetched. If `state_key` is None,
|
||||
all events are returned of the given type.
|
||||
May be None, which matches any key.
|
||||
filtered_types(list[str]|None): Only apply filtering via `types` to this
|
||||
list of event types. Other types of events are returned unfiltered.
|
||||
If None, `types` filtering is applied to all events.
|
||||
at_token(StreamToken|None): the stream token of the at which we are requesting
|
||||
the stats. If the user is not allowed to view the state as of that
|
||||
stream token, we raise a 403 SynapseError. If None, returns the current
|
||||
state based on the current_state_events table.
|
||||
is_guest(bool): whether this user is a guest
|
||||
Returns:
|
||||
A list of dicts representing state events. [{}, {}, {}]
|
||||
"""
|
||||
membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
|
||||
room_id, user_id
|
||||
)
|
||||
Raises:
|
||||
NotFoundError (404) if the at token does not yield an event
|
||||
|
||||
if membership == Membership.JOIN:
|
||||
room_state = yield self.state.get_current_state(room_id)
|
||||
elif membership == Membership.LEAVE:
|
||||
room_state = yield self.store.get_state_for_events(
|
||||
[membership_event_id], None
|
||||
AuthError (403) if the user doesn't have permission to view
|
||||
members of this room.
|
||||
"""
|
||||
if at_token:
|
||||
# FIXME this claims to get the state at a stream position, but
|
||||
# get_recent_events_for_room operates by topo ordering. This therefore
|
||||
# does not reliably give you the state at the given stream position.
|
||||
# (https://github.com/matrix-org/synapse/issues/3305)
|
||||
last_events, _ = yield self.store.get_recent_events_for_room(
|
||||
room_id, end_token=at_token.room_key, limit=1,
|
||||
)
|
||||
room_state = room_state[membership_event_id]
|
||||
|
||||
if not last_events:
|
||||
raise NotFoundError("Can't find event for token %s" % (at_token, ))
|
||||
|
||||
visible_events = yield filter_events_for_client(
|
||||
self.store, user_id, last_events,
|
||||
)
|
||||
|
||||
event = last_events[0]
|
||||
if visible_events:
|
||||
room_state = yield self.store.get_state_for_events(
|
||||
[event.event_id], types, filtered_types=filtered_types,
|
||||
)
|
||||
room_state = room_state[event.event_id]
|
||||
else:
|
||||
raise AuthError(
|
||||
403,
|
||||
"User %s not allowed to view events in room %s at token %s" % (
|
||||
user_id, room_id, at_token,
|
||||
)
|
||||
)
|
||||
else:
|
||||
membership, membership_event_id = (
|
||||
yield self.auth.check_in_room_or_world_readable(
|
||||
room_id, user_id,
|
||||
)
|
||||
)
|
||||
|
||||
if membership == Membership.JOIN:
|
||||
state_ids = yield self.store.get_filtered_current_state_ids(
|
||||
room_id, types, filtered_types=filtered_types,
|
||||
)
|
||||
room_state = yield self.store.get_events(state_ids.values())
|
||||
elif membership == Membership.LEAVE:
|
||||
room_state = yield self.store.get_state_for_events(
|
||||
[membership_event_id], types, filtered_types=filtered_types,
|
||||
)
|
||||
room_state = room_state[membership_event_id]
|
||||
|
||||
now = self.clock.time_msec()
|
||||
defer.returnValue(
|
||||
@@ -212,10 +276,14 @@ class EventCreationHandler(object):
|
||||
where *hashes* is a map from algorithm to hash.
|
||||
|
||||
If None, they will be requested from the database.
|
||||
|
||||
Raises:
|
||||
ResourceLimitError if server is blocked to some resource being
|
||||
exceeded
|
||||
Returns:
|
||||
Tuple of created event (FrozenEvent), Context
|
||||
"""
|
||||
yield self.auth.check_auth_blocking(requester.user.to_string())
|
||||
|
||||
builder = self.event_builder_factory.new(event_dict)
|
||||
|
||||
self.validator.validate_new(builder)
|
||||
@@ -710,11 +778,8 @@ class EventCreationHandler(object):
|
||||
event, context=context
|
||||
)
|
||||
|
||||
# this intentionally does not yield: we don't care about the result
|
||||
# and don't need to wait for it.
|
||||
run_in_background(
|
||||
self.pusher_pool.on_new_notifications,
|
||||
event_stream_id, max_stream_id
|
||||
self.pusher_pool.on_new_notifications(
|
||||
event_stream_id, max_stream_id,
|
||||
)
|
||||
|
||||
def _notify():
|
||||
|
||||
@@ -18,7 +18,7 @@ import logging
|
||||
from twisted.internet import defer
|
||||
from twisted.python.failure import Failure
|
||||
|
||||
from synapse.api.constants import Membership
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.events.utils import serialize_event
|
||||
from synapse.types import RoomStreamToken
|
||||
@@ -251,6 +251,26 @@ class PaginationHandler(object):
|
||||
is_peeking=(member_event_id is None),
|
||||
)
|
||||
|
||||
state = None
|
||||
if event_filter and event_filter.lazy_load_members():
|
||||
# TODO: remove redundant members
|
||||
|
||||
types = [
|
||||
(EventTypes.Member, state_key)
|
||||
for state_key in set(
|
||||
event.sender # FIXME: we also care about invite targets etc.
|
||||
for event in events
|
||||
)
|
||||
]
|
||||
|
||||
state_ids = yield self.store.get_state_ids_for_event(
|
||||
events[0].event_id, types=types,
|
||||
)
|
||||
|
||||
if state_ids:
|
||||
state = yield self.store.get_events(list(state_ids.values()))
|
||||
state = state.values()
|
||||
|
||||
time_now = self.clock.time_msec()
|
||||
|
||||
chunk = {
|
||||
@@ -262,4 +282,10 @@ class PaginationHandler(object):
|
||||
"end": next_token.to_string(),
|
||||
}
|
||||
|
||||
if state:
|
||||
chunk["state"] = [
|
||||
serialize_event(e, time_now, as_client_event)
|
||||
for e in state
|
||||
]
|
||||
|
||||
defer.returnValue(chunk)
|
||||
|
||||
@@ -395,6 +395,10 @@ class PresenceHandler(object):
|
||||
"""We've seen the user do something that indicates they're interacting
|
||||
with the app.
|
||||
"""
|
||||
# If presence is disabled, no-op
|
||||
if not self.hs.config.use_presence:
|
||||
return
|
||||
|
||||
user_id = user.to_string()
|
||||
|
||||
bump_active_time_counter.inc()
|
||||
@@ -424,6 +428,11 @@ class PresenceHandler(object):
|
||||
Useful for streams that are not associated with an actual
|
||||
client that is being used by a user.
|
||||
"""
|
||||
# Override if it should affect the user's presence, if presence is
|
||||
# disabled.
|
||||
if not self.hs.config.use_presence:
|
||||
affect_presence = False
|
||||
|
||||
if affect_presence:
|
||||
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
|
||||
self.user_to_num_current_syncs[user_id] = curr_sync + 1
|
||||
@@ -469,13 +478,16 @@ class PresenceHandler(object):
|
||||
Returns:
|
||||
set(str): A set of user_id strings.
|
||||
"""
|
||||
syncing_user_ids = {
|
||||
user_id for user_id, count in self.user_to_num_current_syncs.items()
|
||||
if count
|
||||
}
|
||||
for user_ids in self.external_process_to_current_syncs.values():
|
||||
syncing_user_ids.update(user_ids)
|
||||
return syncing_user_ids
|
||||
if self.hs.config.use_presence:
|
||||
syncing_user_ids = {
|
||||
user_id for user_id, count in self.user_to_num_current_syncs.items()
|
||||
if count
|
||||
}
|
||||
for user_ids in self.external_process_to_current_syncs.values():
|
||||
syncing_user_ids.update(user_ids)
|
||||
return syncing_user_ids
|
||||
else:
|
||||
return set()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec):
|
||||
|
||||
@@ -32,12 +32,16 @@ from ._base import BaseHandler
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ProfileHandler(BaseHandler):
|
||||
PROFILE_UPDATE_MS = 60 * 1000
|
||||
PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000
|
||||
class BaseProfileHandler(BaseHandler):
|
||||
"""Handles fetching and updating user profile information.
|
||||
|
||||
BaseProfileHandler can be instantiated directly on workers and will
|
||||
delegate to master when necessary. The master process should use the
|
||||
subclass MasterProfileHandler
|
||||
"""
|
||||
|
||||
def __init__(self, hs):
|
||||
super(ProfileHandler, self).__init__(hs)
|
||||
super(BaseProfileHandler, self).__init__(hs)
|
||||
|
||||
self.federation = hs.get_federation_client()
|
||||
hs.get_federation_registry().register_query_handler(
|
||||
@@ -46,11 +50,6 @@ class ProfileHandler(BaseHandler):
|
||||
|
||||
self.user_directory_handler = hs.get_user_directory_handler()
|
||||
|
||||
if hs.config.worker_app is None:
|
||||
self.clock.looping_call(
|
||||
self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS,
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_profile(self, user_id):
|
||||
target_user = UserID.from_string(user_id)
|
||||
@@ -282,6 +281,20 @@ class ProfileHandler(BaseHandler):
|
||||
room_id, str(e.message)
|
||||
)
|
||||
|
||||
|
||||
class MasterProfileHandler(BaseProfileHandler):
|
||||
PROFILE_UPDATE_MS = 60 * 1000
|
||||
PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000
|
||||
|
||||
def __init__(self, hs):
|
||||
super(MasterProfileHandler, self).__init__(hs)
|
||||
|
||||
assert hs.config.worker_app is None
|
||||
|
||||
self.clock.looping_call(
|
||||
self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS,
|
||||
)
|
||||
|
||||
def _start_update_remote_profile_cache(self):
|
||||
return run_as_background_process(
|
||||
"Update remote profile", self._update_remote_profile_cache,
|
||||
|
||||
@@ -18,7 +18,6 @@ from twisted.internet import defer
|
||||
|
||||
from synapse.types import get_domain_from_id
|
||||
from synapse.util import logcontext
|
||||
from synapse.util.logcontext import PreserveLoggingContext
|
||||
|
||||
from ._base import BaseHandler
|
||||
|
||||
@@ -116,16 +115,15 @@ class ReceiptsHandler(BaseHandler):
|
||||
|
||||
affected_room_ids = list(set([r["room_id"] for r in receipts]))
|
||||
|
||||
with PreserveLoggingContext():
|
||||
self.notifier.on_new_event(
|
||||
"receipt_key", max_batch_id, rooms=affected_room_ids
|
||||
)
|
||||
# Note that the min here shouldn't be relied upon to be accurate.
|
||||
self.hs.get_pusherpool().on_new_receipts(
|
||||
min_batch_id, max_batch_id, affected_room_ids
|
||||
)
|
||||
self.notifier.on_new_event(
|
||||
"receipt_key", max_batch_id, rooms=affected_room_ids
|
||||
)
|
||||
# Note that the min here shouldn't be relied upon to be accurate.
|
||||
self.hs.get_pusherpool().on_new_receipts(
|
||||
min_batch_id, max_batch_id, affected_room_ids,
|
||||
)
|
||||
|
||||
defer.returnValue(True)
|
||||
defer.returnValue(True)
|
||||
|
||||
@logcontext.preserve_fn # caller should not yield on this
|
||||
@defer.inlineCallbacks
|
||||
|
||||
@@ -125,6 +125,7 @@ class RegistrationHandler(BaseHandler):
|
||||
guest_access_token=None,
|
||||
make_guest=False,
|
||||
admin=False,
|
||||
threepid=None,
|
||||
):
|
||||
"""Registers a new client on the server.
|
||||
|
||||
@@ -145,7 +146,7 @@ class RegistrationHandler(BaseHandler):
|
||||
RegistrationError if there was a problem registering.
|
||||
"""
|
||||
|
||||
yield self.auth.check_auth_blocking()
|
||||
yield self.auth.check_auth_blocking(threepid=threepid)
|
||||
password_hash = None
|
||||
if password:
|
||||
password_hash = yield self.auth_handler().hash(password)
|
||||
|
||||
@@ -98,9 +98,13 @@ class RoomCreationHandler(BaseHandler):
|
||||
Raises:
|
||||
SynapseError if the room ID couldn't be stored, or something went
|
||||
horribly wrong.
|
||||
ResourceLimitError if server is blocked to some resource being
|
||||
exceeded
|
||||
"""
|
||||
user_id = requester.user.to_string()
|
||||
|
||||
self.auth.check_auth_blocking(user_id)
|
||||
|
||||
if not self.spam_checker.user_may_create_room(user_id):
|
||||
raise SynapseError(403, "You are not permitted to create rooms")
|
||||
|
||||
|
||||
@@ -162,7 +162,7 @@ class RoomListHandler(BaseHandler):
|
||||
# Filter out rooms that we don't want to return
|
||||
rooms_to_scan = [
|
||||
r for r in sorted_rooms
|
||||
if r not in newly_unpublished and rooms_to_num_joined[room_id] > 0
|
||||
if r not in newly_unpublished and rooms_to_num_joined[r] > 0
|
||||
]
|
||||
|
||||
total_room_count = len(rooms_to_scan)
|
||||
|
||||
@@ -344,6 +344,7 @@ class RoomMemberHandler(object):
|
||||
latest_event_ids = (
|
||||
event_id for (event_id, _, _) in prev_events_and_hashes
|
||||
)
|
||||
|
||||
current_state_ids = yield self.state_handler.get_current_state_ids(
|
||||
room_id, latest_event_ids=latest_event_ids,
|
||||
)
|
||||
|
||||
@@ -54,7 +54,7 @@ class SearchHandler(BaseHandler):
|
||||
batch_token = None
|
||||
if batch:
|
||||
try:
|
||||
b = decode_base64(batch)
|
||||
b = decode_base64(batch).decode('ascii')
|
||||
batch_group, batch_group_key, batch_token = b.split("\n")
|
||||
|
||||
assert batch_group is not None
|
||||
@@ -258,18 +258,18 @@ class SearchHandler(BaseHandler):
|
||||
# it returns more from the same group (if applicable) rather
|
||||
# than reverting to searching all results again.
|
||||
if batch_group and batch_group_key:
|
||||
global_next_batch = encode_base64("%s\n%s\n%s" % (
|
||||
global_next_batch = encode_base64(("%s\n%s\n%s" % (
|
||||
batch_group, batch_group_key, pagination_token
|
||||
))
|
||||
)).encode('ascii'))
|
||||
else:
|
||||
global_next_batch = encode_base64("%s\n%s\n%s" % (
|
||||
global_next_batch = encode_base64(("%s\n%s\n%s" % (
|
||||
"all", "", pagination_token
|
||||
))
|
||||
)).encode('ascii'))
|
||||
|
||||
for room_id, group in room_groups.items():
|
||||
group["next_batch"] = encode_base64("%s\n%s\n%s" % (
|
||||
group["next_batch"] = encode_base64(("%s\n%s\n%s" % (
|
||||
"room_id", room_id, pagination_token
|
||||
))
|
||||
)).encode('ascii'))
|
||||
|
||||
allowed_events.extend(room_events)
|
||||
|
||||
|
||||
@@ -24,6 +24,7 @@ from twisted.internet import defer
|
||||
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.push.clientformat import format_push_rules_for_user
|
||||
from synapse.storage.roommember import MemberSummary
|
||||
from synapse.types import RoomStreamToken
|
||||
from synapse.util.async_helpers import concurrently_execute
|
||||
from synapse.util.caches.expiringcache import ExpiringCache
|
||||
@@ -75,6 +76,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
|
||||
"ephemeral",
|
||||
"account_data",
|
||||
"unread_notifications",
|
||||
"summary",
|
||||
])):
|
||||
__slots__ = []
|
||||
|
||||
@@ -184,6 +186,7 @@ class SyncResult(collections.namedtuple("SyncResult", [
|
||||
class SyncHandler(object):
|
||||
|
||||
def __init__(self, hs):
|
||||
self.hs_config = hs.config
|
||||
self.store = hs.get_datastore()
|
||||
self.notifier = hs.get_notifier()
|
||||
self.presence_handler = hs.get_presence_handler()
|
||||
@@ -503,10 +506,171 @@ class SyncHandler(object):
|
||||
state = {}
|
||||
defer.returnValue(state)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def compute_summary(self, room_id, sync_config, batch, state, now_token):
|
||||
""" Works out a room summary block for this room, summarising the number
|
||||
of joined members in the room, and providing the 'hero' members if the
|
||||
room has no name so clients can consistently name rooms. Also adds
|
||||
state events to 'state' if needed to describe the heroes.
|
||||
|
||||
Args:
|
||||
room_id(str):
|
||||
sync_config(synapse.handlers.sync.SyncConfig):
|
||||
batch(synapse.handlers.sync.TimelineBatch): The timeline batch for
|
||||
the room that will be sent to the user.
|
||||
state(dict): dict of (type, state_key) -> Event as returned by
|
||||
compute_state_delta
|
||||
now_token(str): Token of the end of the current batch.
|
||||
|
||||
Returns:
|
||||
A deferred dict describing the room summary
|
||||
"""
|
||||
|
||||
# FIXME: we could/should get this from room_stats when matthew/stats lands
|
||||
|
||||
# FIXME: this promulgates https://github.com/matrix-org/synapse/issues/3305
|
||||
last_events, _ = yield self.store.get_recent_event_ids_for_room(
|
||||
room_id, end_token=now_token.room_key, limit=1,
|
||||
)
|
||||
|
||||
if not last_events:
|
||||
defer.returnValue(None)
|
||||
return
|
||||
|
||||
last_event = last_events[-1]
|
||||
state_ids = yield self.store.get_state_ids_for_event(
|
||||
last_event.event_id, [
|
||||
(EventTypes.Name, ''),
|
||||
(EventTypes.CanonicalAlias, ''),
|
||||
]
|
||||
)
|
||||
|
||||
# this is heavily cached, thus: fast.
|
||||
details = yield self.store.get_room_summary(room_id)
|
||||
|
||||
name_id = state_ids.get((EventTypes.Name, ''))
|
||||
canonical_alias_id = state_ids.get((EventTypes.CanonicalAlias, ''))
|
||||
|
||||
summary = {}
|
||||
empty_ms = MemberSummary([], 0)
|
||||
|
||||
# TODO: only send these when they change.
|
||||
summary["m.joined_member_count"] = (
|
||||
details.get(Membership.JOIN, empty_ms).count
|
||||
)
|
||||
summary["m.invited_member_count"] = (
|
||||
details.get(Membership.INVITE, empty_ms).count
|
||||
)
|
||||
|
||||
# if the room has a name or canonical_alias set, we can skip
|
||||
# calculating heroes. we assume that if the event has contents, it'll
|
||||
# be a valid name or canonical_alias - i.e. we're checking that they
|
||||
# haven't been "deleted" by blatting {} over the top.
|
||||
if name_id:
|
||||
name = yield self.store.get_event(name_id, allow_none=False)
|
||||
if name and name.content:
|
||||
defer.returnValue(summary)
|
||||
|
||||
if canonical_alias_id:
|
||||
canonical_alias = yield self.store.get_event(
|
||||
canonical_alias_id, allow_none=False,
|
||||
)
|
||||
if canonical_alias and canonical_alias.content:
|
||||
defer.returnValue(summary)
|
||||
|
||||
joined_user_ids = [
|
||||
r[0] for r in details.get(Membership.JOIN, empty_ms).members
|
||||
]
|
||||
invited_user_ids = [
|
||||
r[0] for r in details.get(Membership.INVITE, empty_ms).members
|
||||
]
|
||||
gone_user_ids = (
|
||||
[r[0] for r in details.get(Membership.LEAVE, empty_ms).members] +
|
||||
[r[0] for r in details.get(Membership.BAN, empty_ms).members]
|
||||
)
|
||||
|
||||
# FIXME: only build up a member_ids list for our heroes
|
||||
member_ids = {}
|
||||
for membership in (
|
||||
Membership.JOIN,
|
||||
Membership.INVITE,
|
||||
Membership.LEAVE,
|
||||
Membership.BAN
|
||||
):
|
||||
for user_id, event_id in details.get(membership, empty_ms).members:
|
||||
member_ids[user_id] = event_id
|
||||
|
||||
# FIXME: order by stream ordering rather than as returned by SQL
|
||||
me = sync_config.user.to_string()
|
||||
if (joined_user_ids or invited_user_ids):
|
||||
summary['m.heroes'] = sorted(
|
||||
[
|
||||
user_id
|
||||
for user_id in (joined_user_ids + invited_user_ids)
|
||||
if user_id != me
|
||||
]
|
||||
)[0:5]
|
||||
else:
|
||||
summary['m.heroes'] = sorted(
|
||||
[
|
||||
user_id
|
||||
for user_id in gone_user_ids
|
||||
if user_id != me
|
||||
]
|
||||
)[0:5]
|
||||
|
||||
if not sync_config.filter_collection.lazy_load_members():
|
||||
defer.returnValue(summary)
|
||||
|
||||
# ensure we send membership events for heroes if needed
|
||||
cache_key = (sync_config.user.to_string(), sync_config.device_id)
|
||||
cache = self.get_lazy_loaded_members_cache(cache_key)
|
||||
|
||||
# track which members the client should already know about via LL:
|
||||
# Ones which are already in state...
|
||||
existing_members = set(
|
||||
user_id for (typ, user_id) in state.keys()
|
||||
if typ == EventTypes.Member
|
||||
)
|
||||
|
||||
# ...or ones which are in the timeline...
|
||||
for ev in batch.events:
|
||||
if ev.type == EventTypes.Member:
|
||||
existing_members.add(ev.state_key)
|
||||
|
||||
# ...and then ensure any missing ones get included in state.
|
||||
missing_hero_event_ids = [
|
||||
member_ids[hero_id]
|
||||
for hero_id in summary['m.heroes']
|
||||
if (
|
||||
cache.get(hero_id) != member_ids[hero_id] and
|
||||
hero_id not in existing_members
|
||||
)
|
||||
]
|
||||
|
||||
missing_hero_state = yield self.store.get_events(missing_hero_event_ids)
|
||||
missing_hero_state = missing_hero_state.values()
|
||||
|
||||
for s in missing_hero_state:
|
||||
cache.set(s.state_key, s.event_id)
|
||||
state[(EventTypes.Member, s.state_key)] = s
|
||||
|
||||
defer.returnValue(summary)
|
||||
|
||||
def get_lazy_loaded_members_cache(self, cache_key):
|
||||
cache = self.lazy_loaded_members_cache.get(cache_key)
|
||||
if cache is None:
|
||||
logger.debug("creating LruCache for %r", cache_key)
|
||||
cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE)
|
||||
self.lazy_loaded_members_cache[cache_key] = cache
|
||||
else:
|
||||
logger.debug("found LruCache for %r", cache_key)
|
||||
return cache
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def compute_state_delta(self, room_id, batch, sync_config, since_token, now_token,
|
||||
full_state):
|
||||
""" Works out the differnce in state between the start of the timeline
|
||||
""" Works out the difference in state between the start of the timeline
|
||||
and the previous sync.
|
||||
|
||||
Args:
|
||||
@@ -520,7 +684,7 @@ class SyncHandler(object):
|
||||
full_state(bool): Whether to force returning the full state.
|
||||
|
||||
Returns:
|
||||
A deferred new event dictionary
|
||||
A deferred dict of (type, state_key) -> Event
|
||||
"""
|
||||
# TODO(mjark) Check if the state events were received by the server
|
||||
# after the previous sync, since we need to include those state
|
||||
@@ -585,6 +749,26 @@ class SyncHandler(object):
|
||||
lazy_load_members=lazy_load_members,
|
||||
)
|
||||
elif batch.limited:
|
||||
state_at_timeline_start = yield self.store.get_state_ids_for_event(
|
||||
batch.events[0].event_id, types=types,
|
||||
filtered_types=filtered_types,
|
||||
)
|
||||
|
||||
# for now, we disable LL for gappy syncs - see
|
||||
# https://github.com/vector-im/riot-web/issues/7211#issuecomment-419976346
|
||||
# N.B. this slows down incr syncs as we are now processing way
|
||||
# more state in the server than if we were LLing.
|
||||
#
|
||||
# We still have to filter timeline_start to LL entries (above) in order
|
||||
# for _calculate_state's LL logic to work, as we have to include LL
|
||||
# members for timeline senders in case they weren't loaded in the initial
|
||||
# sync. We do this by (counterintuitively) by filtering timeline_start
|
||||
# members to just be ones which were timeline senders, which then ensures
|
||||
# all of the rest get included in the state block (if we need to know
|
||||
# about them).
|
||||
types = None
|
||||
filtered_types = None
|
||||
|
||||
state_at_previous_sync = yield self.get_state_at(
|
||||
room_id, stream_position=since_token, types=types,
|
||||
filtered_types=filtered_types,
|
||||
@@ -595,36 +779,34 @@ class SyncHandler(object):
|
||||
filtered_types=filtered_types,
|
||||
)
|
||||
|
||||
state_at_timeline_start = yield self.store.get_state_ids_for_event(
|
||||
batch.events[0].event_id, types=types,
|
||||
filtered_types=filtered_types,
|
||||
)
|
||||
|
||||
state_ids = _calculate_state(
|
||||
timeline_contains=timeline_state,
|
||||
timeline_start=state_at_timeline_start,
|
||||
previous=state_at_previous_sync,
|
||||
current=current_state_ids,
|
||||
# we have to include LL members in case LL initial sync missed them
|
||||
lazy_load_members=lazy_load_members,
|
||||
)
|
||||
else:
|
||||
state_ids = {}
|
||||
if lazy_load_members:
|
||||
if types:
|
||||
# We're returning an incremental sync, with no
|
||||
# "gap" since the previous sync, so normally there would be
|
||||
# no state to return.
|
||||
# But we're lazy-loading, so the client might need some more
|
||||
# member events to understand the events in this timeline.
|
||||
# So we fish out all the member events corresponding to the
|
||||
# timeline here, and then dedupe any redundant ones below.
|
||||
|
||||
state_ids = yield self.store.get_state_ids_for_event(
|
||||
batch.events[0].event_id, types=types,
|
||||
filtered_types=filtered_types,
|
||||
filtered_types=None, # we only want members!
|
||||
)
|
||||
|
||||
if lazy_load_members and not include_redundant_members:
|
||||
cache_key = (sync_config.user.to_string(), sync_config.device_id)
|
||||
cache = self.lazy_loaded_members_cache.get(cache_key)
|
||||
if cache is None:
|
||||
logger.debug("creating LruCache for %r", cache_key)
|
||||
cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE)
|
||||
self.lazy_loaded_members_cache[cache_key] = cache
|
||||
else:
|
||||
logger.debug("found LruCache for %r", cache_key)
|
||||
cache = self.get_lazy_loaded_members_cache(cache_key)
|
||||
|
||||
# if it's a new sync sequence, then assume the client has had
|
||||
# amnesia and doesn't want any recent lazy-loaded members
|
||||
@@ -639,7 +821,7 @@ class SyncHandler(object):
|
||||
logger.debug("filtering state from %r...", state_ids)
|
||||
state_ids = {
|
||||
t: event_id
|
||||
for t, event_id in state_ids.iteritems()
|
||||
for t, event_id in iteritems(state_ids)
|
||||
if cache.get(t[1]) != event_id
|
||||
}
|
||||
logger.debug("...to %r", state_ids)
|
||||
@@ -733,7 +915,7 @@ class SyncHandler(object):
|
||||
since_token is None and
|
||||
sync_config.filter_collection.blocks_all_presence()
|
||||
)
|
||||
if not block_all_presence_data:
|
||||
if self.hs_config.use_presence and not block_all_presence_data:
|
||||
yield self._generate_sync_entry_for_presence(
|
||||
sync_result_builder, newly_joined_rooms, newly_joined_users
|
||||
)
|
||||
@@ -1425,7 +1607,6 @@ class SyncHandler(object):
|
||||
if events == [] and tags is None:
|
||||
return
|
||||
|
||||
since_token = sync_result_builder.since_token
|
||||
now_token = sync_result_builder.now_token
|
||||
sync_config = sync_result_builder.sync_config
|
||||
|
||||
@@ -1441,6 +1622,19 @@ class SyncHandler(object):
|
||||
newly_joined_room=newly_joined,
|
||||
)
|
||||
|
||||
# When we join the room (or the client requests full_state), we should
|
||||
# send down any existing tags. Usually the user won't have tags in a
|
||||
# newly joined room, unless either a) they've joined before or b) the
|
||||
# tag was added by synapse e.g. for server notice rooms.
|
||||
if full_state:
|
||||
user_id = sync_result_builder.sync_config.user.to_string()
|
||||
tags = yield self.store.get_tags_for_room(user_id, room_id)
|
||||
|
||||
# If there aren't any tags, don't send the empty tags list down
|
||||
# sync
|
||||
if not tags:
|
||||
tags = None
|
||||
|
||||
account_data_events = []
|
||||
if tags is not None:
|
||||
account_data_events.append({
|
||||
@@ -1468,6 +1662,32 @@ class SyncHandler(object):
|
||||
full_state=full_state
|
||||
)
|
||||
|
||||
summary = {}
|
||||
|
||||
# we include a summary in room responses when we're lazy loading
|
||||
# members (as the client otherwise doesn't have enough info to form
|
||||
# the name itself).
|
||||
if (
|
||||
sync_config.filter_collection.lazy_load_members() and
|
||||
(
|
||||
# we recalulate the summary:
|
||||
# if there are membership changes in the timeline, or
|
||||
# if membership has changed during a gappy sync, or
|
||||
# if this is an initial sync.
|
||||
any(ev.type == EventTypes.Member for ev in batch.events) or
|
||||
(
|
||||
# XXX: this may include false positives in the form of LL
|
||||
# members which have snuck into state
|
||||
batch.limited and
|
||||
any(t == EventTypes.Member for (t, k) in state)
|
||||
) or
|
||||
since_token is None
|
||||
)
|
||||
):
|
||||
summary = yield self.compute_summary(
|
||||
room_id, sync_config, batch, state, now_token
|
||||
)
|
||||
|
||||
if room_builder.rtype == "joined":
|
||||
unread_notifications = {}
|
||||
room_sync = JoinedSyncResult(
|
||||
@@ -1477,6 +1697,7 @@ class SyncHandler(object):
|
||||
ephemeral=ephemeral,
|
||||
account_data=account_data_events,
|
||||
unread_notifications=unread_notifications,
|
||||
summary=summary,
|
||||
)
|
||||
|
||||
if room_sync or always_include:
|
||||
@@ -1489,6 +1710,16 @@ class SyncHandler(object):
|
||||
unread_notifications["highlight_count"] = notifs["highlight_count"]
|
||||
|
||||
sync_result_builder.joined.append(room_sync)
|
||||
|
||||
if batch.limited and since_token:
|
||||
user_id = sync_result_builder.sync_config.user.to_string()
|
||||
logger.info(
|
||||
"Incremental gappy sync of %s for user %s with %d state events" % (
|
||||
room_id,
|
||||
user_id,
|
||||
len(state),
|
||||
)
|
||||
)
|
||||
elif room_builder.rtype == "archived":
|
||||
room_sync = ArchivedSyncResult(
|
||||
room_id=room_id,
|
||||
@@ -1582,17 +1813,17 @@ def _calculate_state(
|
||||
event_id_to_key = {
|
||||
e: key
|
||||
for key, e in itertools.chain(
|
||||
timeline_contains.items(),
|
||||
previous.items(),
|
||||
timeline_start.items(),
|
||||
current.items(),
|
||||
iteritems(timeline_contains),
|
||||
iteritems(previous),
|
||||
iteritems(timeline_start),
|
||||
iteritems(current),
|
||||
)
|
||||
}
|
||||
|
||||
c_ids = set(e for e in current.values())
|
||||
ts_ids = set(e for e in timeline_start.values())
|
||||
p_ids = set(e for e in previous.values())
|
||||
tc_ids = set(e for e in timeline_contains.values())
|
||||
c_ids = set(e for e in itervalues(current))
|
||||
ts_ids = set(e for e in itervalues(timeline_start))
|
||||
p_ids = set(e for e in itervalues(previous))
|
||||
tc_ids = set(e for e in itervalues(timeline_contains))
|
||||
|
||||
# If we are lazyloading room members, we explicitly add the membership events
|
||||
# for the senders in the timeline into the state block returned by /sync,
|
||||
@@ -1606,7 +1837,7 @@ def _calculate_state(
|
||||
|
||||
if lazy_load_members:
|
||||
p_ids.difference_update(
|
||||
e for t, e in timeline_start.iteritems()
|
||||
e for t, e in iteritems(timeline_start)
|
||||
if t[0] == EventTypes.Member
|
||||
)
|
||||
|
||||
|
||||
@@ -119,6 +119,8 @@ class UserDirectoryHandler(object):
|
||||
"""Called to update index of our local user profiles when they change
|
||||
irrespective of any rooms the user may be in.
|
||||
"""
|
||||
# FIXME(#3714): We should probably do this in the same worker as all
|
||||
# the other changes.
|
||||
yield self.store.update_profile_in_user_dir(
|
||||
user_id, profile.display_name, profile.avatar_url, None,
|
||||
)
|
||||
@@ -127,6 +129,8 @@ class UserDirectoryHandler(object):
|
||||
def handle_user_deactivated(self, user_id):
|
||||
"""Called when a user ID is deactivated
|
||||
"""
|
||||
# FIXME(#3714): We should probably do this in the same worker as all
|
||||
# the other changes.
|
||||
yield self.store.remove_from_user_dir(user_id)
|
||||
yield self.store.remove_from_user_in_public_room(user_id)
|
||||
|
||||
|
||||
@@ -38,12 +38,12 @@ def cancelled_to_request_timed_out_error(value, timeout):
|
||||
return value
|
||||
|
||||
|
||||
ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
|
||||
ACCESS_TOKEN_RE = re.compile(r'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
|
||||
|
||||
|
||||
def redact_uri(uri):
|
||||
"""Strips access tokens from the uri replaces with <redacted>"""
|
||||
return ACCESS_TOKEN_RE.sub(
|
||||
br'\1<redacted>\3',
|
||||
r'\1<redacted>\3',
|
||||
uri
|
||||
)
|
||||
|
||||
@@ -13,24 +13,25 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import urllib
|
||||
|
||||
from six import StringIO
|
||||
from six import text_type
|
||||
from six.moves import urllib
|
||||
|
||||
import treq
|
||||
from canonicaljson import encode_canonical_json, json
|
||||
from prometheus_client import Counter
|
||||
|
||||
from OpenSSL import SSL
|
||||
from OpenSSL.SSL import VERIFY_NONE
|
||||
from twisted.internet import defer, protocol, reactor, ssl, task
|
||||
from twisted.internet import defer, protocol, reactor, ssl
|
||||
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
|
||||
from twisted.web._newclient import ResponseDone
|
||||
from twisted.web.client import (
|
||||
Agent,
|
||||
BrowserLikeRedirectAgent,
|
||||
ContentDecoderAgent,
|
||||
FileBodyProducer as TwistedFileBodyProducer,
|
||||
GzipDecoder,
|
||||
HTTPConnectionPool,
|
||||
PartialDownloadError,
|
||||
@@ -83,8 +84,10 @@ class SimpleHttpClient(object):
|
||||
if hs.config.user_agent_suffix:
|
||||
self.user_agent = "%s %s" % (self.user_agent, hs.config.user_agent_suffix,)
|
||||
|
||||
self.user_agent = self.user_agent.encode('ascii')
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def request(self, method, uri, *args, **kwargs):
|
||||
def request(self, method, uri, data=b'', headers=None):
|
||||
# A small wrapper around self.agent.request() so we can easily attach
|
||||
# counters to it
|
||||
outgoing_requests_counter.labels(method).inc()
|
||||
@@ -93,8 +96,8 @@ class SimpleHttpClient(object):
|
||||
logger.info("Sending request %s %s", method, redact_uri(uri))
|
||||
|
||||
try:
|
||||
request_deferred = self.agent.request(
|
||||
method, uri, *args, **kwargs
|
||||
request_deferred = treq.request(
|
||||
method, uri, agent=self.agent, data=data, headers=headers
|
||||
)
|
||||
add_timeout_to_deferred(
|
||||
request_deferred, 60, self.hs.get_reactor(),
|
||||
@@ -112,7 +115,7 @@ class SimpleHttpClient(object):
|
||||
incoming_responses_counter.labels(method, "ERR").inc()
|
||||
logger.info(
|
||||
"Error sending request to %s %s: %s %s",
|
||||
method, redact_uri(uri), type(e).__name__, e.message
|
||||
method, redact_uri(uri), type(e).__name__, e.args[0]
|
||||
)
|
||||
raise
|
||||
|
||||
@@ -137,7 +140,8 @@ class SimpleHttpClient(object):
|
||||
# TODO: Do we ever want to log message contents?
|
||||
logger.debug("post_urlencoded_get_json args: %s", args)
|
||||
|
||||
query_bytes = urllib.urlencode(encode_urlencode_args(args), True)
|
||||
query_bytes = urllib.parse.urlencode(
|
||||
encode_urlencode_args(args), True).encode("utf8")
|
||||
|
||||
actual_headers = {
|
||||
b"Content-Type": [b"application/x-www-form-urlencoded"],
|
||||
@@ -148,15 +152,14 @@ class SimpleHttpClient(object):
|
||||
|
||||
response = yield self.request(
|
||||
"POST",
|
||||
uri.encode("ascii"),
|
||||
uri,
|
||||
headers=Headers(actual_headers),
|
||||
bodyProducer=FileBodyProducer(StringIO(query_bytes))
|
||||
data=query_bytes
|
||||
)
|
||||
|
||||
body = yield make_deferred_yieldable(readBody(response))
|
||||
|
||||
if 200 <= response.code < 300:
|
||||
defer.returnValue(json.loads(body))
|
||||
body = yield make_deferred_yieldable(treq.json_content(response))
|
||||
defer.returnValue(body)
|
||||
else:
|
||||
raise HttpResponseException(response.code, response.phrase, body)
|
||||
|
||||
@@ -191,9 +194,9 @@ class SimpleHttpClient(object):
|
||||
|
||||
response = yield self.request(
|
||||
"POST",
|
||||
uri.encode("ascii"),
|
||||
uri,
|
||||
headers=Headers(actual_headers),
|
||||
bodyProducer=FileBodyProducer(StringIO(json_str))
|
||||
data=json_str
|
||||
)
|
||||
|
||||
body = yield make_deferred_yieldable(readBody(response))
|
||||
@@ -248,7 +251,7 @@ class SimpleHttpClient(object):
|
||||
ValueError: if the response was not JSON
|
||||
"""
|
||||
if len(args):
|
||||
query_bytes = urllib.urlencode(args, True)
|
||||
query_bytes = urllib.parse.urlencode(args, True)
|
||||
uri = "%s?%s" % (uri, query_bytes)
|
||||
|
||||
json_str = encode_canonical_json(json_body)
|
||||
@@ -262,9 +265,9 @@ class SimpleHttpClient(object):
|
||||
|
||||
response = yield self.request(
|
||||
"PUT",
|
||||
uri.encode("ascii"),
|
||||
uri,
|
||||
headers=Headers(actual_headers),
|
||||
bodyProducer=FileBodyProducer(StringIO(json_str))
|
||||
data=json_str
|
||||
)
|
||||
|
||||
body = yield make_deferred_yieldable(readBody(response))
|
||||
@@ -293,7 +296,7 @@ class SimpleHttpClient(object):
|
||||
HttpResponseException on a non-2xx HTTP response.
|
||||
"""
|
||||
if len(args):
|
||||
query_bytes = urllib.urlencode(args, True)
|
||||
query_bytes = urllib.parse.urlencode(args, True)
|
||||
uri = "%s?%s" % (uri, query_bytes)
|
||||
|
||||
actual_headers = {
|
||||
@@ -304,7 +307,7 @@ class SimpleHttpClient(object):
|
||||
|
||||
response = yield self.request(
|
||||
"GET",
|
||||
uri.encode("ascii"),
|
||||
uri,
|
||||
headers=Headers(actual_headers),
|
||||
)
|
||||
|
||||
@@ -339,13 +342,14 @@ class SimpleHttpClient(object):
|
||||
|
||||
response = yield self.request(
|
||||
"GET",
|
||||
url.encode("ascii"),
|
||||
url,
|
||||
headers=Headers(actual_headers),
|
||||
)
|
||||
|
||||
resp_headers = dict(response.headers.getAllRawHeaders())
|
||||
|
||||
if 'Content-Length' in resp_headers and resp_headers['Content-Length'] > max_size:
|
||||
if (b'Content-Length' in resp_headers and
|
||||
int(resp_headers[b'Content-Length']) > max_size):
|
||||
logger.warn("Requested URL is too large > %r bytes" % (self.max_size,))
|
||||
raise SynapseError(
|
||||
502,
|
||||
@@ -378,7 +382,12 @@ class SimpleHttpClient(object):
|
||||
)
|
||||
|
||||
defer.returnValue(
|
||||
(length, resp_headers, response.request.absoluteURI, response.code),
|
||||
(
|
||||
length,
|
||||
resp_headers,
|
||||
response.request.absoluteURI.decode('ascii'),
|
||||
response.code,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@@ -434,12 +443,12 @@ class CaptchaServerHttpClient(SimpleHttpClient):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def post_urlencoded_get_raw(self, url, args={}):
|
||||
query_bytes = urllib.urlencode(encode_urlencode_args(args), True)
|
||||
query_bytes = urllib.parse.urlencode(encode_urlencode_args(args), True)
|
||||
|
||||
response = yield self.request(
|
||||
"POST",
|
||||
url.encode("ascii"),
|
||||
bodyProducer=FileBodyProducer(StringIO(query_bytes)),
|
||||
url,
|
||||
data=query_bytes,
|
||||
headers=Headers({
|
||||
b"Content-Type": [b"application/x-www-form-urlencoded"],
|
||||
b"User-Agent": [self.user_agent],
|
||||
@@ -463,9 +472,9 @@ class SpiderEndpointFactory(object):
|
||||
def endpointForURI(self, uri):
|
||||
logger.info("Getting endpoint for %s", uri.toBytes())
|
||||
|
||||
if uri.scheme == "http":
|
||||
if uri.scheme == b"http":
|
||||
endpoint_factory = HostnameEndpoint
|
||||
elif uri.scheme == "https":
|
||||
elif uri.scheme == b"https":
|
||||
tlsCreator = self.policyForHTTPS.creatorForNetloc(uri.host, uri.port)
|
||||
|
||||
def endpoint_factory(reactor, host, port, **kw):
|
||||
@@ -510,7 +519,7 @@ def encode_urlencode_args(args):
|
||||
|
||||
|
||||
def encode_urlencode_arg(arg):
|
||||
if isinstance(arg, unicode):
|
||||
if isinstance(arg, text_type):
|
||||
return arg.encode('utf-8')
|
||||
elif isinstance(arg, list):
|
||||
return [encode_urlencode_arg(i) for i in arg]
|
||||
@@ -542,26 +551,3 @@ class InsecureInterceptableContextFactory(ssl.ContextFactory):
|
||||
|
||||
def creatorForNetloc(self, hostname, port):
|
||||
return self
|
||||
|
||||
|
||||
class FileBodyProducer(TwistedFileBodyProducer):
|
||||
"""Workaround for https://twistedmatrix.com/trac/ticket/8473
|
||||
|
||||
We override the pauseProducing and resumeProducing methods in twisted's
|
||||
FileBodyProducer so that they do not raise exceptions if the task has
|
||||
already completed.
|
||||
"""
|
||||
|
||||
def pauseProducing(self):
|
||||
try:
|
||||
super(FileBodyProducer, self).pauseProducing()
|
||||
except task.TaskDone:
|
||||
# task has already completed
|
||||
pass
|
||||
|
||||
def resumeProducing(self):
|
||||
try:
|
||||
super(FileBodyProducer, self).resumeProducing()
|
||||
except task.NotPaused:
|
||||
# task was not paused (probably because it had already completed)
|
||||
pass
|
||||
|
||||
@@ -17,19 +17,19 @@ import cgi
|
||||
import logging
|
||||
import random
|
||||
import sys
|
||||
import urllib
|
||||
|
||||
from six import string_types
|
||||
from six.moves.urllib import parse as urlparse
|
||||
from six import PY3, string_types
|
||||
from six.moves import urllib
|
||||
|
||||
from canonicaljson import encode_canonical_json, json
|
||||
import treq
|
||||
from canonicaljson import encode_canonical_json
|
||||
from prometheus_client import Counter
|
||||
from signedjson.sign import sign_json
|
||||
|
||||
from twisted.internet import defer, protocol, reactor
|
||||
from twisted.internet import defer, protocol
|
||||
from twisted.internet.error import DNSLookupError
|
||||
from twisted.web._newclient import ResponseDone
|
||||
from twisted.web.client import Agent, HTTPConnectionPool, readBody
|
||||
from twisted.web.client import Agent, HTTPConnectionPool
|
||||
from twisted.web.http_headers import Headers
|
||||
|
||||
import synapse.metrics
|
||||
@@ -40,11 +40,11 @@ from synapse.api.errors import (
|
||||
HttpResponseException,
|
||||
SynapseError,
|
||||
)
|
||||
from synapse.http import cancelled_to_request_timed_out_error
|
||||
from synapse.http.endpoint import matrix_federation_endpoint
|
||||
from synapse.util import logcontext
|
||||
from synapse.util.async_helpers import add_timeout_to_deferred
|
||||
from synapse.util.async_helpers import timeout_no_seriously
|
||||
from synapse.util.logcontext import make_deferred_yieldable
|
||||
from synapse.util.metrics import Measure
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
outbound_logger = logging.getLogger("synapse.http.outbound")
|
||||
@@ -58,16 +58,22 @@ incoming_responses_counter = Counter("synapse_http_matrixfederationclient_respon
|
||||
MAX_LONG_RETRIES = 10
|
||||
MAX_SHORT_RETRIES = 3
|
||||
|
||||
if PY3:
|
||||
MAXINT = sys.maxsize
|
||||
else:
|
||||
MAXINT = sys.maxint
|
||||
|
||||
|
||||
class MatrixFederationEndpointFactory(object):
|
||||
def __init__(self, hs):
|
||||
self.reactor = hs.get_reactor()
|
||||
self.tls_client_options_factory = hs.tls_client_options_factory
|
||||
|
||||
def endpointForURI(self, uri):
|
||||
destination = uri.netloc
|
||||
destination = uri.netloc.decode('ascii')
|
||||
|
||||
return matrix_federation_endpoint(
|
||||
reactor, destination, timeout=10,
|
||||
self.reactor, destination, timeout=10,
|
||||
tls_client_options_factory=self.tls_client_options_factory
|
||||
)
|
||||
|
||||
@@ -85,7 +91,9 @@ class MatrixFederationHttpClient(object):
|
||||
self.hs = hs
|
||||
self.signing_key = hs.config.signing_key[0]
|
||||
self.server_name = hs.hostname
|
||||
reactor = hs.get_reactor()
|
||||
pool = HTTPConnectionPool(reactor)
|
||||
pool.retryAutomatically = False
|
||||
pool.maxPersistentPerHost = 5
|
||||
pool.cachedConnectionTimeout = 2 * 60
|
||||
self.agent = Agent.usingEndpointFactory(
|
||||
@@ -93,26 +101,33 @@ class MatrixFederationHttpClient(object):
|
||||
)
|
||||
self.clock = hs.get_clock()
|
||||
self._store = hs.get_datastore()
|
||||
self.version_string = hs.version_string
|
||||
self.version_string = hs.version_string.encode('ascii')
|
||||
self._next_id = 1
|
||||
self.default_timeout = 60
|
||||
|
||||
def _create_url(self, destination, path_bytes, param_bytes, query_bytes):
|
||||
return urlparse.urlunparse(
|
||||
("matrix", destination, path_bytes, param_bytes, query_bytes, "")
|
||||
return urllib.parse.urlunparse(
|
||||
(b"matrix", destination, path_bytes, param_bytes, query_bytes, b"")
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _request(self, destination, method, path,
|
||||
body_callback, headers_dict={}, param_bytes=b"",
|
||||
query_bytes=b"", retry_on_dns_fail=True,
|
||||
json=None, json_callback=None,
|
||||
param_bytes=b"",
|
||||
query=None, retry_on_dns_fail=True,
|
||||
timeout=None, long_retries=False,
|
||||
ignore_backoff=False,
|
||||
backoff_on_404=False):
|
||||
""" Creates and sends a request to the given server
|
||||
"""
|
||||
Creates and sends a request to the given server.
|
||||
|
||||
Args:
|
||||
destination (str): The remote server to send the HTTP request to.
|
||||
method (str): HTTP method
|
||||
path (str): The HTTP path
|
||||
json (dict or None): JSON to send in the body.
|
||||
json_callback (func or None): A callback to generate the JSON.
|
||||
query (dict or None): Query arguments.
|
||||
ignore_backoff (bool): true to ignore the historical backoff data
|
||||
and try the request anyway.
|
||||
backoff_on_404 (bool): Back off if we get a 404
|
||||
@@ -132,8 +147,13 @@ class MatrixFederationHttpClient(object):
|
||||
(May also fail with plenty of other Exceptions for things like DNS
|
||||
failures, connection failures, SSL failures.)
|
||||
"""
|
||||
if timeout:
|
||||
_sec_timeout = timeout / 1000
|
||||
else:
|
||||
_sec_timeout = self.default_timeout
|
||||
|
||||
if (
|
||||
self.hs.config.federation_domain_whitelist and
|
||||
self.hs.config.federation_domain_whitelist is not None and
|
||||
destination not in self.hs.config.federation_domain_whitelist
|
||||
):
|
||||
raise FederationDeniedError(destination)
|
||||
@@ -146,23 +166,25 @@ class MatrixFederationHttpClient(object):
|
||||
ignore_backoff=ignore_backoff,
|
||||
)
|
||||
|
||||
destination = destination.encode("ascii")
|
||||
headers_dict = {}
|
||||
path_bytes = path.encode("ascii")
|
||||
with limiter:
|
||||
headers_dict[b"User-Agent"] = [self.version_string]
|
||||
headers_dict[b"Host"] = [destination]
|
||||
if query:
|
||||
query_bytes = encode_query_args(query)
|
||||
else:
|
||||
query_bytes = b""
|
||||
|
||||
url_bytes = self._create_url(
|
||||
destination, path_bytes, param_bytes, query_bytes
|
||||
)
|
||||
headers_dict = {
|
||||
"User-Agent": [self.version_string],
|
||||
"Host": [destination],
|
||||
}
|
||||
|
||||
with limiter:
|
||||
url = self._create_url(
|
||||
destination.encode("ascii"), path_bytes, param_bytes, query_bytes
|
||||
).decode('ascii')
|
||||
|
||||
txn_id = "%s-O-%s" % (method, self._next_id)
|
||||
self._next_id = (self._next_id + 1) % (sys.maxint - 1)
|
||||
|
||||
outbound_logger.info(
|
||||
"{%s} [%s] Sending request: %s %s",
|
||||
txn_id, destination, method, url_bytes
|
||||
)
|
||||
self._next_id = (self._next_id + 1) % (MAXINT - 1)
|
||||
|
||||
# XXX: Would be much nicer to retry only at the transaction-layer
|
||||
# (once we have reliable transactions in place)
|
||||
@@ -171,80 +193,110 @@ class MatrixFederationHttpClient(object):
|
||||
else:
|
||||
retries_left = MAX_SHORT_RETRIES
|
||||
|
||||
http_url_bytes = urlparse.urlunparse(
|
||||
("", "", path_bytes, param_bytes, query_bytes, "")
|
||||
)
|
||||
http_url = urllib.parse.urlunparse(
|
||||
(b"", b"", path_bytes, param_bytes, query_bytes, b"")
|
||||
).decode('ascii')
|
||||
|
||||
log_result = None
|
||||
try:
|
||||
while True:
|
||||
producer = None
|
||||
if body_callback:
|
||||
producer = body_callback(method, http_url_bytes, headers_dict)
|
||||
while True:
|
||||
try:
|
||||
if json_callback:
|
||||
json = json_callback()
|
||||
|
||||
try:
|
||||
request_deferred = self.agent.request(
|
||||
method,
|
||||
url_bytes,
|
||||
Headers(headers_dict),
|
||||
producer
|
||||
)
|
||||
add_timeout_to_deferred(
|
||||
request_deferred,
|
||||
timeout / 1000. if timeout else 60,
|
||||
self.hs.get_reactor(),
|
||||
cancelled_to_request_timed_out_error,
|
||||
if json:
|
||||
data = encode_canonical_json(json)
|
||||
headers_dict["Content-Type"] = ["application/json"]
|
||||
self.sign_request(
|
||||
destination, method, http_url, headers_dict, json
|
||||
)
|
||||
else:
|
||||
data = None
|
||||
self.sign_request(destination, method, http_url, headers_dict)
|
||||
|
||||
outbound_logger.info(
|
||||
"{%s} [%s] Sending request: %s %s",
|
||||
txn_id, destination, method, url
|
||||
)
|
||||
|
||||
request_deferred = treq.request(
|
||||
method,
|
||||
url,
|
||||
headers=Headers(headers_dict),
|
||||
data=data,
|
||||
agent=self.agent,
|
||||
reactor=self.hs.get_reactor(),
|
||||
unbuffered=True
|
||||
)
|
||||
request_deferred.addTimeout(_sec_timeout, self.hs.get_reactor())
|
||||
|
||||
# Sometimes the timeout above doesn't work, so lets hack yet
|
||||
# another layer of timeouts in in the vain hope that at some
|
||||
# point the world made sense and this really really really
|
||||
# should work.
|
||||
request_deferred = timeout_no_seriously(
|
||||
request_deferred,
|
||||
timeout=_sec_timeout * 2,
|
||||
reactor=self.hs.get_reactor(),
|
||||
)
|
||||
|
||||
with Measure(self.clock, "outbound_request"):
|
||||
response = yield make_deferred_yieldable(
|
||||
request_deferred,
|
||||
)
|
||||
|
||||
log_result = "%d %s" % (response.code, response.phrase,)
|
||||
break
|
||||
except Exception as e:
|
||||
if not retry_on_dns_fail and isinstance(e, DNSLookupError):
|
||||
logger.warn(
|
||||
"DNS Lookup failed to %s with %s",
|
||||
destination,
|
||||
e
|
||||
)
|
||||
log_result = "DNS Lookup failed to %s with %s" % (
|
||||
destination, e
|
||||
)
|
||||
raise
|
||||
|
||||
log_result = "%d %s" % (response.code, response.phrase,)
|
||||
break
|
||||
except Exception as e:
|
||||
if not retry_on_dns_fail and isinstance(e, DNSLookupError):
|
||||
logger.warn(
|
||||
"{%s} Sending request failed to %s: %s %s: %s",
|
||||
txn_id,
|
||||
"DNS Lookup failed to %s with %s",
|
||||
destination,
|
||||
method,
|
||||
url_bytes,
|
||||
_flatten_response_never_received(e),
|
||||
e
|
||||
)
|
||||
log_result = "DNS Lookup failed to %s with %s" % (
|
||||
destination, e
|
||||
)
|
||||
raise
|
||||
|
||||
logger.warn(
|
||||
"{%s} Sending request failed to %s: %s %s: %s",
|
||||
txn_id,
|
||||
destination,
|
||||
method,
|
||||
url,
|
||||
_flatten_response_never_received(e),
|
||||
)
|
||||
|
||||
log_result = _flatten_response_never_received(e)
|
||||
|
||||
if retries_left and not timeout:
|
||||
if long_retries:
|
||||
delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
|
||||
delay = min(delay, 60)
|
||||
delay *= random.uniform(0.8, 1.4)
|
||||
else:
|
||||
delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
|
||||
delay = min(delay, 2)
|
||||
delay *= random.uniform(0.8, 1.4)
|
||||
|
||||
logger.debug(
|
||||
"{%s} Waiting %s before sending to %s...",
|
||||
txn_id,
|
||||
delay,
|
||||
destination
|
||||
)
|
||||
|
||||
log_result = _flatten_response_never_received(e)
|
||||
|
||||
if retries_left and not timeout:
|
||||
if long_retries:
|
||||
delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
|
||||
delay = min(delay, 60)
|
||||
delay *= random.uniform(0.8, 1.4)
|
||||
else:
|
||||
delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
|
||||
delay = min(delay, 2)
|
||||
delay *= random.uniform(0.8, 1.4)
|
||||
|
||||
yield self.clock.sleep(delay)
|
||||
retries_left -= 1
|
||||
else:
|
||||
raise
|
||||
finally:
|
||||
outbound_logger.info(
|
||||
"{%s} [%s] Result: %s",
|
||||
txn_id,
|
||||
destination,
|
||||
log_result,
|
||||
)
|
||||
yield self.clock.sleep(delay)
|
||||
retries_left -= 1
|
||||
else:
|
||||
raise
|
||||
finally:
|
||||
outbound_logger.info(
|
||||
"{%s} [%s] Result: %s",
|
||||
txn_id,
|
||||
destination,
|
||||
log_result,
|
||||
)
|
||||
|
||||
if 200 <= response.code < 300:
|
||||
pass
|
||||
@@ -252,7 +304,9 @@ class MatrixFederationHttpClient(object):
|
||||
# :'(
|
||||
# Update transactions table?
|
||||
with logcontext.PreserveLoggingContext():
|
||||
body = yield readBody(response)
|
||||
d = treq.content(response)
|
||||
d.addTimeout(_sec_timeout, self.hs.get_reactor())
|
||||
body = yield make_deferred_yieldable(d)
|
||||
raise HttpResponseException(
|
||||
response.code, response.phrase, body
|
||||
)
|
||||
@@ -297,11 +351,11 @@ class MatrixFederationHttpClient(object):
|
||||
auth_headers = []
|
||||
|
||||
for key, sig in request["signatures"][self.server_name].items():
|
||||
auth_headers.append(bytes(
|
||||
auth_headers.append((
|
||||
"X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % (
|
||||
self.server_name, key, sig,
|
||||
)
|
||||
))
|
||||
)).encode('ascii')
|
||||
)
|
||||
|
||||
headers_dict[b"Authorization"] = auth_headers
|
||||
|
||||
@@ -347,24 +401,14 @@ class MatrixFederationHttpClient(object):
|
||||
"""
|
||||
|
||||
if not json_data_callback:
|
||||
def json_data_callback():
|
||||
return data
|
||||
|
||||
def body_callback(method, url_bytes, headers_dict):
|
||||
json_data = json_data_callback()
|
||||
self.sign_request(
|
||||
destination, method, url_bytes, headers_dict, json_data
|
||||
)
|
||||
producer = _JsonProducer(json_data)
|
||||
return producer
|
||||
json_data_callback = lambda: data
|
||||
|
||||
response = yield self._request(
|
||||
destination,
|
||||
"PUT",
|
||||
path,
|
||||
body_callback=body_callback,
|
||||
headers_dict={"Content-Type": ["application/json"]},
|
||||
query_bytes=encode_query_args(args),
|
||||
json_callback=json_data_callback,
|
||||
query=args,
|
||||
long_retries=long_retries,
|
||||
timeout=timeout,
|
||||
ignore_backoff=ignore_backoff,
|
||||
@@ -376,8 +420,10 @@ class MatrixFederationHttpClient(object):
|
||||
check_content_type_is_json(response.headers)
|
||||
|
||||
with logcontext.PreserveLoggingContext():
|
||||
body = yield readBody(response)
|
||||
defer.returnValue(json.loads(body))
|
||||
d = treq.json_content(response)
|
||||
d.addTimeout(self.default_timeout, self.hs.get_reactor())
|
||||
body = yield make_deferred_yieldable(d)
|
||||
defer.returnValue(body)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def post_json(self, destination, path, data={}, long_retries=False,
|
||||
@@ -410,20 +456,12 @@ class MatrixFederationHttpClient(object):
|
||||
Fails with ``FederationDeniedError`` if this destination
|
||||
is not on our federation whitelist
|
||||
"""
|
||||
|
||||
def body_callback(method, url_bytes, headers_dict):
|
||||
self.sign_request(
|
||||
destination, method, url_bytes, headers_dict, data
|
||||
)
|
||||
return _JsonProducer(data)
|
||||
|
||||
response = yield self._request(
|
||||
destination,
|
||||
"POST",
|
||||
path,
|
||||
query_bytes=encode_query_args(args),
|
||||
body_callback=body_callback,
|
||||
headers_dict={"Content-Type": ["application/json"]},
|
||||
query=args,
|
||||
json=data,
|
||||
long_retries=long_retries,
|
||||
timeout=timeout,
|
||||
ignore_backoff=ignore_backoff,
|
||||
@@ -434,9 +472,16 @@ class MatrixFederationHttpClient(object):
|
||||
check_content_type_is_json(response.headers)
|
||||
|
||||
with logcontext.PreserveLoggingContext():
|
||||
body = yield readBody(response)
|
||||
d = treq.json_content(response)
|
||||
if timeout:
|
||||
_sec_timeout = timeout / 1000
|
||||
else:
|
||||
_sec_timeout = self.default_timeout
|
||||
|
||||
defer.returnValue(json.loads(body))
|
||||
d.addTimeout(_sec_timeout, self.hs.get_reactor())
|
||||
body = yield make_deferred_yieldable(d)
|
||||
|
||||
defer.returnValue(body)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_json(self, destination, path, args=None, retry_on_dns_fail=True,
|
||||
@@ -471,16 +516,11 @@ class MatrixFederationHttpClient(object):
|
||||
|
||||
logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
|
||||
|
||||
def body_callback(method, url_bytes, headers_dict):
|
||||
self.sign_request(destination, method, url_bytes, headers_dict)
|
||||
return None
|
||||
|
||||
response = yield self._request(
|
||||
destination,
|
||||
"GET",
|
||||
path,
|
||||
query_bytes=encode_query_args(args),
|
||||
body_callback=body_callback,
|
||||
query=args,
|
||||
retry_on_dns_fail=retry_on_dns_fail,
|
||||
timeout=timeout,
|
||||
ignore_backoff=ignore_backoff,
|
||||
@@ -491,9 +531,11 @@ class MatrixFederationHttpClient(object):
|
||||
check_content_type_is_json(response.headers)
|
||||
|
||||
with logcontext.PreserveLoggingContext():
|
||||
body = yield readBody(response)
|
||||
d = treq.json_content(response)
|
||||
d.addTimeout(self.default_timeout, self.hs.get_reactor())
|
||||
body = yield make_deferred_yieldable(d)
|
||||
|
||||
defer.returnValue(json.loads(body))
|
||||
defer.returnValue(body)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def delete_json(self, destination, path, long_retries=False,
|
||||
@@ -523,13 +565,11 @@ class MatrixFederationHttpClient(object):
|
||||
Fails with ``FederationDeniedError`` if this destination
|
||||
is not on our federation whitelist
|
||||
"""
|
||||
|
||||
response = yield self._request(
|
||||
destination,
|
||||
"DELETE",
|
||||
path,
|
||||
query_bytes=encode_query_args(args),
|
||||
headers_dict={"Content-Type": ["application/json"]},
|
||||
query=args,
|
||||
long_retries=long_retries,
|
||||
timeout=timeout,
|
||||
ignore_backoff=ignore_backoff,
|
||||
@@ -540,9 +580,11 @@ class MatrixFederationHttpClient(object):
|
||||
check_content_type_is_json(response.headers)
|
||||
|
||||
with logcontext.PreserveLoggingContext():
|
||||
body = yield readBody(response)
|
||||
d = treq.json_content(response)
|
||||
d.addTimeout(self.default_timeout, self.hs.get_reactor())
|
||||
body = yield make_deferred_yieldable(d)
|
||||
|
||||
defer.returnValue(json.loads(body))
|
||||
defer.returnValue(body)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_file(self, destination, path, output_stream, args={},
|
||||
@@ -569,26 +611,11 @@ class MatrixFederationHttpClient(object):
|
||||
Fails with ``FederationDeniedError`` if this destination
|
||||
is not on our federation whitelist
|
||||
"""
|
||||
|
||||
encoded_args = {}
|
||||
for k, vs in args.items():
|
||||
if isinstance(vs, string_types):
|
||||
vs = [vs]
|
||||
encoded_args[k] = [v.encode("UTF-8") for v in vs]
|
||||
|
||||
query_bytes = urllib.urlencode(encoded_args, True)
|
||||
logger.debug("Query bytes: %s Retry DNS: %s", query_bytes, retry_on_dns_fail)
|
||||
|
||||
def body_callback(method, url_bytes, headers_dict):
|
||||
self.sign_request(destination, method, url_bytes, headers_dict)
|
||||
return None
|
||||
|
||||
response = yield self._request(
|
||||
destination,
|
||||
"GET",
|
||||
path,
|
||||
query_bytes=query_bytes,
|
||||
body_callback=body_callback,
|
||||
query=args,
|
||||
retry_on_dns_fail=retry_on_dns_fail,
|
||||
ignore_backoff=ignore_backoff,
|
||||
)
|
||||
@@ -597,9 +624,9 @@ class MatrixFederationHttpClient(object):
|
||||
|
||||
try:
|
||||
with logcontext.PreserveLoggingContext():
|
||||
length = yield _readBodyToFile(
|
||||
response, output_stream, max_size
|
||||
)
|
||||
d = _readBodyToFile(response, output_stream, max_size)
|
||||
d.addTimeout(self.default_timeout, self.hs.get_reactor())
|
||||
length = yield make_deferred_yieldable(d)
|
||||
except Exception:
|
||||
logger.exception("Failed to download body")
|
||||
raise
|
||||
@@ -639,30 +666,6 @@ def _readBodyToFile(response, stream, max_size):
|
||||
return d
|
||||
|
||||
|
||||
class _JsonProducer(object):
|
||||
""" Used by the twisted http client to create the HTTP body from json
|
||||
"""
|
||||
def __init__(self, jsn):
|
||||
self.reset(jsn)
|
||||
|
||||
def reset(self, jsn):
|
||||
self.body = encode_canonical_json(jsn)
|
||||
self.length = len(self.body)
|
||||
|
||||
def startProducing(self, consumer):
|
||||
consumer.write(self.body)
|
||||
return defer.succeed(None)
|
||||
|
||||
def pauseProducing(self):
|
||||
pass
|
||||
|
||||
def stopProducing(self):
|
||||
pass
|
||||
|
||||
def resumeProducing(self):
|
||||
pass
|
||||
|
||||
|
||||
def _flatten_response_never_received(e):
|
||||
if hasattr(e, "reasons"):
|
||||
reasons = ", ".join(
|
||||
@@ -693,7 +696,7 @@ def check_content_type_is_json(headers):
|
||||
"No Content-Type header"
|
||||
)
|
||||
|
||||
c_type = c_type[0] # only the first header
|
||||
c_type = c_type[0].decode('ascii') # only the first header
|
||||
val, options = cgi.parse_header(c_type)
|
||||
if val != "application/json":
|
||||
raise RuntimeError(
|
||||
@@ -711,6 +714,6 @@ def encode_query_args(args):
|
||||
vs = [vs]
|
||||
encoded_args[k] = [v.encode("UTF-8") for v in vs]
|
||||
|
||||
query_bytes = urllib.urlencode(encoded_args, True)
|
||||
query_bytes = urllib.parse.urlencode(encoded_args, True)
|
||||
|
||||
return query_bytes
|
||||
return query_bytes.encode('utf8')
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import threading
|
||||
|
||||
from prometheus_client.core import Counter, Histogram
|
||||
|
||||
@@ -111,6 +112,9 @@ in_flight_requests_db_sched_duration = Counter(
|
||||
# The set of all in flight requests, set[RequestMetrics]
|
||||
_in_flight_requests = set()
|
||||
|
||||
# Protects the _in_flight_requests set from concurrent accesss
|
||||
_in_flight_requests_lock = threading.Lock()
|
||||
|
||||
|
||||
def _get_in_flight_counts():
|
||||
"""Returns a count of all in flight requests by (method, server_name)
|
||||
@@ -120,7 +124,8 @@ def _get_in_flight_counts():
|
||||
"""
|
||||
# Cast to a list to prevent it changing while the Prometheus
|
||||
# thread is collecting metrics
|
||||
reqs = list(_in_flight_requests)
|
||||
with _in_flight_requests_lock:
|
||||
reqs = list(_in_flight_requests)
|
||||
|
||||
for rm in reqs:
|
||||
rm.update_metrics()
|
||||
@@ -154,10 +159,12 @@ class RequestMetrics(object):
|
||||
# to the "in flight" metrics.
|
||||
self._request_stats = self.start_context.get_resource_usage()
|
||||
|
||||
_in_flight_requests.add(self)
|
||||
with _in_flight_requests_lock:
|
||||
_in_flight_requests.add(self)
|
||||
|
||||
def stop(self, time_sec, request):
|
||||
_in_flight_requests.discard(self)
|
||||
with _in_flight_requests_lock:
|
||||
_in_flight_requests.discard(self)
|
||||
|
||||
context = LoggingContext.current_context()
|
||||
|
||||
|
||||
@@ -25,8 +25,9 @@ from canonicaljson import encode_canonical_json, encode_pretty_printed_json, jso
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.python import failure
|
||||
from twisted.web import resource, server
|
||||
from twisted.web import resource
|
||||
from twisted.web.server import NOT_DONE_YET
|
||||
from twisted.web.static import NoRangeStaticProducer
|
||||
from twisted.web.util import redirectTo
|
||||
|
||||
import synapse.events
|
||||
@@ -37,10 +38,13 @@ from synapse.api.errors import (
|
||||
SynapseError,
|
||||
UnrecognizedRequestError,
|
||||
)
|
||||
from synapse.http.request_metrics import requests_counter
|
||||
from synapse.util.caches import intern_dict
|
||||
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
|
||||
from synapse.util.metrics import Measure
|
||||
from synapse.util.logcontext import preserve_fn
|
||||
|
||||
if PY3:
|
||||
from io import BytesIO
|
||||
else:
|
||||
from cStringIO import StringIO as BytesIO
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -60,11 +64,10 @@ HTML_ERROR_TEMPLATE = """<!DOCTYPE html>
|
||||
def wrap_json_request_handler(h):
|
||||
"""Wraps a request handler method with exception handling.
|
||||
|
||||
Also adds logging as per wrap_request_handler_with_logging.
|
||||
Also does the wrapping with request.processing as per wrap_async_request_handler.
|
||||
|
||||
The handler method must have a signature of "handle_foo(self, request)",
|
||||
where "self" must have a "clock" attribute (and "request" must be a
|
||||
SynapseRequest).
|
||||
where "request" must be a SynapseRequest.
|
||||
|
||||
The handler must return a deferred. If the deferred succeeds we assume that
|
||||
a response has been sent. If the deferred fails with a SynapseError we use
|
||||
@@ -108,24 +111,23 @@ def wrap_json_request_handler(h):
|
||||
pretty_print=_request_user_agent_is_curl(request),
|
||||
)
|
||||
|
||||
return wrap_request_handler_with_logging(wrapped_request_handler)
|
||||
return wrap_async_request_handler(wrapped_request_handler)
|
||||
|
||||
|
||||
def wrap_html_request_handler(h):
|
||||
"""Wraps a request handler method with exception handling.
|
||||
|
||||
Also adds logging as per wrap_request_handler_with_logging.
|
||||
Also does the wrapping with request.processing as per wrap_async_request_handler.
|
||||
|
||||
The handler method must have a signature of "handle_foo(self, request)",
|
||||
where "self" must have a "clock" attribute (and "request" must be a
|
||||
SynapseRequest).
|
||||
where "request" must be a SynapseRequest.
|
||||
"""
|
||||
def wrapped_request_handler(self, request):
|
||||
d = defer.maybeDeferred(h, self, request)
|
||||
d.addErrback(_return_html_error, request)
|
||||
return d
|
||||
|
||||
return wrap_request_handler_with_logging(wrapped_request_handler)
|
||||
return wrap_async_request_handler(wrapped_request_handler)
|
||||
|
||||
|
||||
def _return_html_error(f, request):
|
||||
@@ -170,46 +172,26 @@ def _return_html_error(f, request):
|
||||
finish_request(request)
|
||||
|
||||
|
||||
def wrap_request_handler_with_logging(h):
|
||||
"""Wraps a request handler to provide logging and metrics
|
||||
def wrap_async_request_handler(h):
|
||||
"""Wraps an async request handler so that it calls request.processing.
|
||||
|
||||
This helps ensure that work done by the request handler after the request is completed
|
||||
is correctly recorded against the request metrics/logs.
|
||||
|
||||
The handler method must have a signature of "handle_foo(self, request)",
|
||||
where "self" must have a "clock" attribute (and "request" must be a
|
||||
SynapseRequest).
|
||||
where "request" must be a SynapseRequest.
|
||||
|
||||
As well as calling `request.processing` (which will log the response and
|
||||
duration for this request), the wrapped request handler will insert the
|
||||
request id into the logging context.
|
||||
The handler may return a deferred, in which case the completion of the request isn't
|
||||
logged until the deferred completes.
|
||||
"""
|
||||
@defer.inlineCallbacks
|
||||
def wrapped_request_handler(self, request):
|
||||
"""
|
||||
Args:
|
||||
self:
|
||||
request (synapse.http.site.SynapseRequest):
|
||||
"""
|
||||
def wrapped_async_request_handler(self, request):
|
||||
with request.processing():
|
||||
yield h(self, request)
|
||||
|
||||
request_id = request.get_request_id()
|
||||
with LoggingContext(request_id) as request_context:
|
||||
request_context.request = request_id
|
||||
with Measure(self.clock, "wrapped_request_handler"):
|
||||
# we start the request metrics timer here with an initial stab
|
||||
# at the servlet name. For most requests that name will be
|
||||
# JsonResource (or a subclass), and JsonResource._async_render
|
||||
# will update it once it picks a servlet.
|
||||
servlet_name = self.__class__.__name__
|
||||
with request.processing(servlet_name):
|
||||
with PreserveLoggingContext(request_context):
|
||||
d = defer.maybeDeferred(h, self, request)
|
||||
|
||||
# record the arrival of the request *after*
|
||||
# dispatching to the handler, so that the handler
|
||||
# can update the servlet name in the request
|
||||
# metrics
|
||||
requests_counter.labels(request.method,
|
||||
request.request_metrics.name).inc()
|
||||
yield d
|
||||
return wrapped_request_handler
|
||||
# we need to preserve_fn here, because the synchronous render method won't yield for
|
||||
# us (obviously)
|
||||
return preserve_fn(wrapped_async_request_handler)
|
||||
|
||||
|
||||
class HttpServer(object):
|
||||
@@ -272,7 +254,7 @@ class JsonResource(HttpServer, resource.Resource):
|
||||
""" This gets called by twisted every time someone sends us a request.
|
||||
"""
|
||||
self._async_render(request)
|
||||
return server.NOT_DONE_YET
|
||||
return NOT_DONE_YET
|
||||
|
||||
@wrap_json_request_handler
|
||||
@defer.inlineCallbacks
|
||||
@@ -413,8 +395,7 @@ def respond_with_json(request, code, json_object, send_cors=False,
|
||||
return
|
||||
|
||||
if pretty_print:
|
||||
json_bytes = (encode_pretty_printed_json(json_object) + "\n"
|
||||
).encode("utf-8")
|
||||
json_bytes = encode_pretty_printed_json(json_object) + b"\n"
|
||||
else:
|
||||
if canonical_json or synapse.events.USE_FROZEN_DICTS:
|
||||
# canonicaljson already encodes to bytes
|
||||
@@ -450,8 +431,12 @@ def respond_with_json_bytes(request, code, json_bytes, send_cors=False,
|
||||
if send_cors:
|
||||
set_cors_headers(request)
|
||||
|
||||
request.write(json_bytes)
|
||||
finish_request(request)
|
||||
# todo: we can almost certainly avoid this copy and encode the json straight into
|
||||
# the bytesIO, but it would involve faffing around with string->bytes wrappers.
|
||||
bytes_io = BytesIO(json_bytes)
|
||||
|
||||
producer = NoRangeStaticProducer(request, bytes_io)
|
||||
producer.start()
|
||||
return NOT_DONE_YET
|
||||
|
||||
|
||||
|
||||
@@ -29,7 +29,7 @@ def parse_integer(request, name, default=None, required=False):
|
||||
|
||||
Args:
|
||||
request: the twisted HTTP request.
|
||||
name (str): the name of the query parameter.
|
||||
name (bytes/unicode): the name of the query parameter.
|
||||
default (int|None): value to use if the parameter is absent, defaults
|
||||
to None.
|
||||
required (bool): whether to raise a 400 SynapseError if the
|
||||
@@ -46,6 +46,10 @@ def parse_integer(request, name, default=None, required=False):
|
||||
|
||||
|
||||
def parse_integer_from_args(args, name, default=None, required=False):
|
||||
|
||||
if not isinstance(name, bytes):
|
||||
name = name.encode('ascii')
|
||||
|
||||
if name in args:
|
||||
try:
|
||||
return int(args[name][0])
|
||||
@@ -65,7 +69,7 @@ def parse_boolean(request, name, default=None, required=False):
|
||||
|
||||
Args:
|
||||
request: the twisted HTTP request.
|
||||
name (str): the name of the query parameter.
|
||||
name (bytes/unicode): the name of the query parameter.
|
||||
default (bool|None): value to use if the parameter is absent, defaults
|
||||
to None.
|
||||
required (bool): whether to raise a 400 SynapseError if the
|
||||
@@ -83,11 +87,15 @@ def parse_boolean(request, name, default=None, required=False):
|
||||
|
||||
|
||||
def parse_boolean_from_args(args, name, default=None, required=False):
|
||||
|
||||
if not isinstance(name, bytes):
|
||||
name = name.encode('ascii')
|
||||
|
||||
if name in args:
|
||||
try:
|
||||
return {
|
||||
"true": True,
|
||||
"false": False,
|
||||
b"true": True,
|
||||
b"false": False,
|
||||
}[args[name][0]]
|
||||
except Exception:
|
||||
message = (
|
||||
@@ -104,21 +112,29 @@ def parse_boolean_from_args(args, name, default=None, required=False):
|
||||
|
||||
|
||||
def parse_string(request, name, default=None, required=False,
|
||||
allowed_values=None, param_type="string"):
|
||||
"""Parse a string parameter from the request query string.
|
||||
allowed_values=None, param_type="string", encoding='ascii'):
|
||||
"""
|
||||
Parse a string parameter from the request query string.
|
||||
|
||||
If encoding is not None, the content of the query param will be
|
||||
decoded to Unicode using the encoding, otherwise it will be encoded
|
||||
|
||||
Args:
|
||||
request: the twisted HTTP request.
|
||||
name (str): the name of the query parameter.
|
||||
default (str|None): value to use if the parameter is absent, defaults
|
||||
to None.
|
||||
name (bytes/unicode): the name of the query parameter.
|
||||
default (bytes/unicode|None): value to use if the parameter is absent,
|
||||
defaults to None. Must be bytes if encoding is None.
|
||||
required (bool): whether to raise a 400 SynapseError if the
|
||||
parameter is absent, defaults to False.
|
||||
allowed_values (list[str]): List of allowed values for the string,
|
||||
or None if any value is allowed, defaults to None
|
||||
allowed_values (list[bytes/unicode]): List of allowed values for the
|
||||
string, or None if any value is allowed, defaults to None. Must be
|
||||
the same type as name, if given.
|
||||
encoding: The encoding to decode the name to, and decode the string
|
||||
content with.
|
||||
|
||||
Returns:
|
||||
str|None: A string value or the default.
|
||||
bytes/unicode|None: A string value or the default. Unicode if encoding
|
||||
was given, bytes otherwise.
|
||||
|
||||
Raises:
|
||||
SynapseError if the parameter is absent and required, or if the
|
||||
@@ -126,14 +142,22 @@ def parse_string(request, name, default=None, required=False,
|
||||
is not one of those allowed values.
|
||||
"""
|
||||
return parse_string_from_args(
|
||||
request.args, name, default, required, allowed_values, param_type,
|
||||
request.args, name, default, required, allowed_values, param_type, encoding
|
||||
)
|
||||
|
||||
|
||||
def parse_string_from_args(args, name, default=None, required=False,
|
||||
allowed_values=None, param_type="string"):
|
||||
allowed_values=None, param_type="string", encoding='ascii'):
|
||||
|
||||
if not isinstance(name, bytes):
|
||||
name = name.encode('ascii')
|
||||
|
||||
if name in args:
|
||||
value = args[name][0]
|
||||
|
||||
if encoding:
|
||||
value = value.decode(encoding)
|
||||
|
||||
if allowed_values is not None and value not in allowed_values:
|
||||
message = "Query parameter %r must be one of [%s]" % (
|
||||
name, ", ".join(repr(v) for v in allowed_values)
|
||||
@@ -146,6 +170,10 @@ def parse_string_from_args(args, name, default=None, required=False,
|
||||
message = "Missing %s query parameter %r" % (param_type, name)
|
||||
raise SynapseError(400, message, errcode=Codes.MISSING_PARAM)
|
||||
else:
|
||||
|
||||
if encoding and isinstance(default, bytes):
|
||||
return default.decode(encoding)
|
||||
|
||||
return default
|
||||
|
||||
|
||||
|
||||
@@ -11,7 +11,6 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import contextlib
|
||||
import logging
|
||||
import time
|
||||
@@ -19,8 +18,8 @@ import time
|
||||
from twisted.web.server import Request, Site
|
||||
|
||||
from synapse.http import redact_uri
|
||||
from synapse.http.request_metrics import RequestMetrics
|
||||
from synapse.util.logcontext import ContextResourceUsage, LoggingContext
|
||||
from synapse.http.request_metrics import RequestMetrics, requests_counter
|
||||
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -34,25 +33,43 @@ class SynapseRequest(Request):
|
||||
|
||||
It extends twisted's twisted.web.server.Request, and adds:
|
||||
* Unique request ID
|
||||
* A log context associated with the request
|
||||
* Redaction of access_token query-params in __repr__
|
||||
* Logging at start and end
|
||||
* Metrics to record CPU, wallclock and DB time by endpoint.
|
||||
|
||||
It provides a method `processing` which should be called by the Resource
|
||||
which is handling the request, and returns a context manager.
|
||||
It also provides a method `processing`, which returns a context manager. If this
|
||||
method is called, the request won't be logged until the context manager is closed;
|
||||
this is useful for asynchronous request handlers which may go on processing the
|
||||
request even after the client has disconnected.
|
||||
|
||||
Attributes:
|
||||
logcontext(LoggingContext) : the log context for this request
|
||||
"""
|
||||
def __init__(self, site, channel, *args, **kw):
|
||||
Request.__init__(self, channel, *args, **kw)
|
||||
self.site = site
|
||||
self._channel = channel
|
||||
self._channel = channel # this is used by the tests
|
||||
self.authenticated_entity = None
|
||||
self.start_time = 0
|
||||
|
||||
# we can't yet create the logcontext, as we don't know the method.
|
||||
self.logcontext = None
|
||||
|
||||
global _next_request_seq
|
||||
self.request_seq = _next_request_seq
|
||||
_next_request_seq += 1
|
||||
|
||||
# whether an asynchronous request handler has called processing()
|
||||
self._is_processing = False
|
||||
|
||||
# the time when the asynchronous request handler completed its processing
|
||||
self._processing_finished_time = None
|
||||
|
||||
# what time we finished sending the response to the client (or the connection
|
||||
# dropped)
|
||||
self.finish_time = None
|
||||
|
||||
def __repr__(self):
|
||||
# We overwrite this so that we don't log ``access_token``
|
||||
return '<%s at 0x%x method=%r uri=%r clientproto=%r site=%r>' % (
|
||||
@@ -68,44 +85,166 @@ class SynapseRequest(Request):
|
||||
return "%s-%i" % (self.method, self.request_seq)
|
||||
|
||||
def get_redacted_uri(self):
|
||||
return redact_uri(self.uri)
|
||||
uri = self.uri
|
||||
if isinstance(uri, bytes):
|
||||
uri = self.uri.decode('ascii')
|
||||
return redact_uri(uri)
|
||||
|
||||
def get_user_agent(self):
|
||||
return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1]
|
||||
|
||||
def render(self, resrc):
|
||||
# this is called once a Resource has been found to serve the request; in our
|
||||
# case the Resource in question will normally be a JsonResource.
|
||||
|
||||
# create a LogContext for this request
|
||||
request_id = self.get_request_id()
|
||||
logcontext = self.logcontext = LoggingContext(request_id)
|
||||
logcontext.request = request_id
|
||||
|
||||
# override the Server header which is set by twisted
|
||||
self.setHeader("Server", self.site.server_version_string)
|
||||
return Request.render(self, resrc)
|
||||
|
||||
with PreserveLoggingContext(self.logcontext):
|
||||
# we start the request metrics timer here with an initial stab
|
||||
# at the servlet name. For most requests that name will be
|
||||
# JsonResource (or a subclass), and JsonResource._async_render
|
||||
# will update it once it picks a servlet.
|
||||
servlet_name = resrc.__class__.__name__
|
||||
self._started_processing(servlet_name)
|
||||
|
||||
Request.render(self, resrc)
|
||||
|
||||
# record the arrival of the request *after*
|
||||
# dispatching to the handler, so that the handler
|
||||
# can update the servlet name in the request
|
||||
# metrics
|
||||
requests_counter.labels(self.method,
|
||||
self.request_metrics.name).inc()
|
||||
|
||||
@contextlib.contextmanager
|
||||
def processing(self):
|
||||
"""Record the fact that we are processing this request.
|
||||
|
||||
Returns a context manager; the correct way to use this is:
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def handle_request(request):
|
||||
with request.processing("FooServlet"):
|
||||
yield really_handle_the_request()
|
||||
|
||||
Once the context manager is closed, the completion of the request will be logged,
|
||||
and the various metrics will be updated.
|
||||
"""
|
||||
if self._is_processing:
|
||||
raise RuntimeError("Request is already processing")
|
||||
self._is_processing = True
|
||||
|
||||
try:
|
||||
yield
|
||||
except Exception:
|
||||
# this should already have been caught, and sent back to the client as a 500.
|
||||
logger.exception("Asynchronous messge handler raised an uncaught exception")
|
||||
finally:
|
||||
# the request handler has finished its work and either sent the whole response
|
||||
# back, or handed over responsibility to a Producer.
|
||||
|
||||
self._processing_finished_time = time.time()
|
||||
self._is_processing = False
|
||||
|
||||
# if we've already sent the response, log it now; otherwise, we wait for the
|
||||
# response to be sent.
|
||||
if self.finish_time is not None:
|
||||
self._finished_processing()
|
||||
|
||||
def finish(self):
|
||||
"""Called when all response data has been written to this Request.
|
||||
|
||||
Overrides twisted.web.server.Request.finish to record the finish time and do
|
||||
logging.
|
||||
"""
|
||||
self.finish_time = time.time()
|
||||
Request.finish(self)
|
||||
if not self._is_processing:
|
||||
with PreserveLoggingContext(self.logcontext):
|
||||
self._finished_processing()
|
||||
|
||||
def connectionLost(self, reason):
|
||||
"""Called when the client connection is closed before the response is written.
|
||||
|
||||
Overrides twisted.web.server.Request.connectionLost to record the finish time and
|
||||
do logging.
|
||||
"""
|
||||
self.finish_time = time.time()
|
||||
Request.connectionLost(self, reason)
|
||||
|
||||
# we only get here if the connection to the client drops before we send
|
||||
# the response.
|
||||
#
|
||||
# It's useful to log it here so that we can get an idea of when
|
||||
# the client disconnects.
|
||||
with PreserveLoggingContext(self.logcontext):
|
||||
logger.warn(
|
||||
"Error processing request %r: %s %s", self, reason.type, reason.value,
|
||||
)
|
||||
|
||||
if not self._is_processing:
|
||||
self._finished_processing()
|
||||
|
||||
def _started_processing(self, servlet_name):
|
||||
"""Record the fact that we are processing this request.
|
||||
|
||||
This will log the request's arrival. Once the request completes,
|
||||
be sure to call finished_processing.
|
||||
|
||||
Args:
|
||||
servlet_name (str): the name of the servlet which will be
|
||||
processing this request. This is used in the metrics.
|
||||
|
||||
It is possible to update this afterwards by updating
|
||||
self.request_metrics.name.
|
||||
"""
|
||||
self.start_time = time.time()
|
||||
self.request_metrics = RequestMetrics()
|
||||
self.request_metrics.start(
|
||||
self.start_time, name=servlet_name, method=self.method,
|
||||
self.start_time, name=servlet_name, method=self.method.decode('ascii'),
|
||||
)
|
||||
|
||||
self.site.access_logger.info(
|
||||
"%s - %s - Received request: %s %s",
|
||||
self.getClientIP(),
|
||||
self.site.site_tag,
|
||||
self.method,
|
||||
self.method.decode('ascii'),
|
||||
self.get_redacted_uri()
|
||||
)
|
||||
|
||||
def _finished_processing(self):
|
||||
try:
|
||||
context = LoggingContext.current_context()
|
||||
usage = context.get_resource_usage()
|
||||
except Exception:
|
||||
usage = ContextResourceUsage()
|
||||
"""Log the completion of this request and update the metrics
|
||||
"""
|
||||
|
||||
end_time = time.time()
|
||||
if self.logcontext is None:
|
||||
# this can happen if the connection closed before we read the
|
||||
# headers (so render was never called). In that case we'll already
|
||||
# have logged a warning, so just bail out.
|
||||
return
|
||||
|
||||
usage = self.logcontext.get_resource_usage()
|
||||
|
||||
if self._processing_finished_time is None:
|
||||
# we completed the request without anything calling processing()
|
||||
self._processing_finished_time = time.time()
|
||||
|
||||
# the time between receiving the request and the request handler finishing
|
||||
processing_time = self._processing_finished_time - self.start_time
|
||||
|
||||
# the time between the request handler finishing and the response being sent
|
||||
# to the client (nb may be negative)
|
||||
response_send_time = self.finish_time - self._processing_finished_time
|
||||
|
||||
# need to decode as it could be raw utf-8 bytes
|
||||
# from a IDN servname in an auth header
|
||||
authenticated_entity = self.authenticated_entity
|
||||
if authenticated_entity is not None:
|
||||
if authenticated_entity is not None and isinstance(authenticated_entity, bytes):
|
||||
authenticated_entity = authenticated_entity.decode("utf-8", "replace")
|
||||
|
||||
# ...or could be raw utf-8 bytes in the User-Agent header.
|
||||
@@ -116,22 +255,31 @@ class SynapseRequest(Request):
|
||||
user_agent = self.get_user_agent()
|
||||
if user_agent is not None:
|
||||
user_agent = user_agent.decode("utf-8", "replace")
|
||||
else:
|
||||
user_agent = "-"
|
||||
|
||||
code = str(self.code)
|
||||
if not self.finished:
|
||||
# we didn't send the full response before we gave up (presumably because
|
||||
# the connection dropped)
|
||||
code += "!"
|
||||
|
||||
self.site.access_logger.info(
|
||||
"%s - %s - {%s}"
|
||||
" Processed request: %.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)"
|
||||
" Processed request: %.3fsec/%.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)"
|
||||
" %sB %s \"%s %s %s\" \"%s\" [%d dbevts]",
|
||||
self.getClientIP(),
|
||||
self.site.site_tag,
|
||||
authenticated_entity,
|
||||
end_time - self.start_time,
|
||||
processing_time,
|
||||
response_send_time,
|
||||
usage.ru_utime,
|
||||
usage.ru_stime,
|
||||
usage.db_sched_duration_sec,
|
||||
usage.db_txn_duration_sec,
|
||||
int(usage.db_txn_count),
|
||||
self.sentLength,
|
||||
self.code,
|
||||
code,
|
||||
self.method,
|
||||
self.get_redacted_uri(),
|
||||
self.clientproto,
|
||||
@@ -140,38 +288,10 @@ class SynapseRequest(Request):
|
||||
)
|
||||
|
||||
try:
|
||||
self.request_metrics.stop(end_time, self)
|
||||
self.request_metrics.stop(self.finish_time, self)
|
||||
except Exception as e:
|
||||
logger.warn("Failed to stop metrics: %r", e)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def processing(self, servlet_name):
|
||||
"""Record the fact that we are processing this request.
|
||||
|
||||
Returns a context manager; the correct way to use this is:
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def handle_request(request):
|
||||
with request.processing("FooServlet"):
|
||||
yield really_handle_the_request()
|
||||
|
||||
This will log the request's arrival. Once the context manager is
|
||||
closed, the completion of the request will be logged, and the various
|
||||
metrics will be updated.
|
||||
|
||||
Args:
|
||||
servlet_name (str): the name of the servlet which will be
|
||||
processing this request. This is used in the metrics.
|
||||
|
||||
It is possible to update this afterwards by updating
|
||||
self.request_metrics.servlet_name.
|
||||
"""
|
||||
# TODO: we should probably just move this into render() and finish(),
|
||||
# to save having to call a separate method.
|
||||
self._started_processing(servlet_name)
|
||||
yield
|
||||
self._finished_processing()
|
||||
|
||||
|
||||
class XForwardedForRequest(SynapseRequest):
|
||||
def __init__(self, *args, **kw):
|
||||
@@ -217,7 +337,7 @@ class SynapseSite(Site):
|
||||
proxied = config.get("x_forwarded", False)
|
||||
self.requestFactory = SynapseRequestFactory(self, proxied)
|
||||
self.access_logger = logging.getLogger(logger_name)
|
||||
self.server_version_string = server_version_string
|
||||
self.server_version_string = server_version_string.encode('ascii')
|
||||
|
||||
def log(self, request):
|
||||
pass
|
||||
|
||||
@@ -18,8 +18,11 @@ import gc
|
||||
import logging
|
||||
import os
|
||||
import platform
|
||||
import threading
|
||||
import time
|
||||
|
||||
import six
|
||||
|
||||
import attr
|
||||
from prometheus_client import Counter, Gauge, Histogram
|
||||
from prometheus_client.core import REGISTRY, GaugeMetricFamily
|
||||
@@ -68,7 +71,7 @@ class LaterGauge(object):
|
||||
return
|
||||
|
||||
if isinstance(calls, dict):
|
||||
for k, v in calls.items():
|
||||
for k, v in six.iteritems(calls):
|
||||
g.add_metric(k, v)
|
||||
else:
|
||||
g.add_metric([], calls)
|
||||
@@ -87,6 +90,109 @@ class LaterGauge(object):
|
||||
all_gauges[self.name] = self
|
||||
|
||||
|
||||
class InFlightGauge(object):
|
||||
"""Tracks number of things (e.g. requests, Measure blocks, etc) in flight
|
||||
at any given time.
|
||||
|
||||
Each InFlightGauge will create a metric called `<name>_total` that counts
|
||||
the number of in flight blocks, as well as a metrics for each item in the
|
||||
given `sub_metrics` as `<name>_<sub_metric>` which will get updated by the
|
||||
callbacks.
|
||||
|
||||
Args:
|
||||
name (str)
|
||||
desc (str)
|
||||
labels (list[str])
|
||||
sub_metrics (list[str]): A list of sub metrics that the callbacks
|
||||
will update.
|
||||
"""
|
||||
|
||||
def __init__(self, name, desc, labels, sub_metrics):
|
||||
self.name = name
|
||||
self.desc = desc
|
||||
self.labels = labels
|
||||
self.sub_metrics = sub_metrics
|
||||
|
||||
# Create a class which have the sub_metrics values as attributes, which
|
||||
# default to 0 on initialization. Used to pass to registered callbacks.
|
||||
self._metrics_class = attr.make_class(
|
||||
"_MetricsEntry",
|
||||
attrs={x: attr.ib(0) for x in sub_metrics},
|
||||
slots=True,
|
||||
)
|
||||
|
||||
# Counts number of in flight blocks for a given set of label values
|
||||
self._registrations = {}
|
||||
|
||||
# Protects access to _registrations
|
||||
self._lock = threading.Lock()
|
||||
|
||||
self._register_with_collector()
|
||||
|
||||
def register(self, key, callback):
|
||||
"""Registers that we've entered a new block with labels `key`.
|
||||
|
||||
`callback` gets called each time the metrics are collected. The same
|
||||
value must also be given to `unregister`.
|
||||
|
||||
`callback` gets called with an object that has an attribute per
|
||||
sub_metric, which should be updated with the necessary values. Note that
|
||||
the metrics object is shared between all callbacks registered with the
|
||||
same key.
|
||||
|
||||
Note that `callback` may be called on a separate thread.
|
||||
"""
|
||||
with self._lock:
|
||||
self._registrations.setdefault(key, set()).add(callback)
|
||||
|
||||
def unregister(self, key, callback):
|
||||
"""Registers that we've exited a block with labels `key`.
|
||||
"""
|
||||
|
||||
with self._lock:
|
||||
self._registrations.setdefault(key, set()).discard(callback)
|
||||
|
||||
def collect(self):
|
||||
"""Called by prometheus client when it reads metrics.
|
||||
|
||||
Note: may be called by a separate thread.
|
||||
"""
|
||||
in_flight = GaugeMetricFamily(self.name + "_total", self.desc, labels=self.labels)
|
||||
|
||||
metrics_by_key = {}
|
||||
|
||||
# We copy so that we don't mutate the list while iterating
|
||||
with self._lock:
|
||||
keys = list(self._registrations)
|
||||
|
||||
for key in keys:
|
||||
with self._lock:
|
||||
callbacks = set(self._registrations[key])
|
||||
|
||||
in_flight.add_metric(key, len(callbacks))
|
||||
|
||||
metrics = self._metrics_class()
|
||||
metrics_by_key[key] = metrics
|
||||
for callback in callbacks:
|
||||
callback(metrics)
|
||||
|
||||
yield in_flight
|
||||
|
||||
for name in self.sub_metrics:
|
||||
gauge = GaugeMetricFamily("_".join([self.name, name]), "", labels=self.labels)
|
||||
for key, metrics in six.iteritems(metrics_by_key):
|
||||
gauge.add_metric(key, getattr(metrics, name))
|
||||
yield gauge
|
||||
|
||||
def _register_with_collector(self):
|
||||
if self.name in all_gauges.keys():
|
||||
logger.warning("%s already registered, reregistering" % (self.name,))
|
||||
REGISTRY.unregister(all_gauges.pop(self.name))
|
||||
|
||||
REGISTRY.register(self)
|
||||
all_gauges[self.name] = self
|
||||
|
||||
|
||||
#
|
||||
# Detailed CPU metrics
|
||||
#
|
||||
|
||||
@@ -13,6 +13,8 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import threading
|
||||
|
||||
import six
|
||||
|
||||
from prometheus_client.core import REGISTRY, Counter, GaugeMetricFamily
|
||||
@@ -78,6 +80,9 @@ _background_process_counts = dict() # type: dict[str, int]
|
||||
# of process descriptions that no longer have any active processes.
|
||||
_background_processes = dict() # type: dict[str, set[_BackgroundProcess]]
|
||||
|
||||
# A lock that covers the above dicts
|
||||
_bg_metrics_lock = threading.Lock()
|
||||
|
||||
|
||||
class _Collector(object):
|
||||
"""A custom metrics collector for the background process metrics.
|
||||
@@ -92,7 +97,11 @@ class _Collector(object):
|
||||
labels=["name"],
|
||||
)
|
||||
|
||||
for desc, processes in six.iteritems(_background_processes):
|
||||
# We copy the dict so that it doesn't change from underneath us
|
||||
with _bg_metrics_lock:
|
||||
_background_processes_copy = dict(_background_processes)
|
||||
|
||||
for desc, processes in six.iteritems(_background_processes_copy):
|
||||
background_process_in_flight_count.add_metric(
|
||||
(desc,), len(processes),
|
||||
)
|
||||
@@ -167,19 +176,26 @@ def run_as_background_process(desc, func, *args, **kwargs):
|
||||
"""
|
||||
@defer.inlineCallbacks
|
||||
def run():
|
||||
count = _background_process_counts.get(desc, 0)
|
||||
_background_process_counts[desc] = count + 1
|
||||
with _bg_metrics_lock:
|
||||
count = _background_process_counts.get(desc, 0)
|
||||
_background_process_counts[desc] = count + 1
|
||||
|
||||
_background_process_start_count.labels(desc).inc()
|
||||
|
||||
with LoggingContext(desc) as context:
|
||||
context.request = "%s-%i" % (desc, count)
|
||||
proc = _BackgroundProcess(desc, context)
|
||||
_background_processes.setdefault(desc, set()).add(proc)
|
||||
|
||||
with _bg_metrics_lock:
|
||||
_background_processes.setdefault(desc, set()).add(proc)
|
||||
|
||||
try:
|
||||
yield func(*args, **kwargs)
|
||||
finally:
|
||||
proc.update_metrics()
|
||||
_background_processes[desc].remove(proc)
|
||||
|
||||
with _bg_metrics_lock:
|
||||
_background_processes[desc].remove(proc)
|
||||
|
||||
with PreserveLoggingContext():
|
||||
return run()
|
||||
|
||||
@@ -15,6 +15,8 @@
|
||||
# limitations under the License.
|
||||
import logging
|
||||
|
||||
import six
|
||||
|
||||
from prometheus_client import Counter
|
||||
|
||||
from twisted.internet import defer
|
||||
@@ -26,6 +28,9 @@ from synapse.util.metrics import Measure
|
||||
|
||||
from . import push_rule_evaluator, push_tools
|
||||
|
||||
if six.PY3:
|
||||
long = int
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
http_push_processed_counter = Counter("synapse_http_httppusher_http_pushes_processed", "")
|
||||
@@ -96,7 +101,7 @@ class HttpPusher(object):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
|
||||
self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
|
||||
self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering or 0)
|
||||
yield self._process()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
||||
@@ -17,10 +17,11 @@ import email.mime.multipart
|
||||
import email.utils
|
||||
import logging
|
||||
import time
|
||||
import urllib
|
||||
from email.mime.multipart import MIMEMultipart
|
||||
from email.mime.text import MIMEText
|
||||
|
||||
from six.moves import urllib
|
||||
|
||||
import bleach
|
||||
import jinja2
|
||||
|
||||
@@ -474,7 +475,7 @@ class Mailer(object):
|
||||
# XXX: make r0 once API is stable
|
||||
return "%s_matrix/client/unstable/pushers/remove?%s" % (
|
||||
self.hs.config.public_baseurl,
|
||||
urllib.urlencode(params),
|
||||
urllib.parse.urlencode(params),
|
||||
)
|
||||
|
||||
|
||||
@@ -561,7 +562,7 @@ def _create_mxc_to_http_filter(config):
|
||||
return "%s_matrix/media/v1/thumbnail/%s?%s%s" % (
|
||||
config.public_baseurl,
|
||||
serverAndMediaId,
|
||||
urllib.urlencode(params),
|
||||
urllib.parse.urlencode(params),
|
||||
fragment or "",
|
||||
)
|
||||
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user