Compare commits
119 Commits
| SHA1 | Author | Date | |
|---|---|---|---|
| 757205d718 | |||
| 6c582d7ccb | |||
| 4c13f2b282 | |||
| 9dbf42af8a | |||
| 5625abe503 | |||
| 60481031f2 | |||
| 7c0224d5c0 | |||
| f7ececb0ac | |||
| 39dbee2a3e | |||
| e7011280c7 | |||
| 4444b9a1b3 | |||
| 3b69bf3e74 | |||
| 92c1550f4a | |||
| c8fa620d7a | |||
| 73d552a05d | |||
| deca277d09 | |||
| b06f2947e4 | |||
| d7a692f860 | |||
| a13ad21abf | |||
| bc2c284dbe | |||
| 5798a134c0 | |||
| 71fc04069a | |||
| 3cdce28d3b | |||
| 6d97843793 | |||
| 7dc398586c | |||
| 81aa6d53b0 | |||
| 49ef8ec399 | |||
| dfb22fec48 | |||
| cc66cf1238 | |||
| a3f0635686 | |||
| 1196ee32b3 | |||
| a344ad3d3f | |||
| b9f1adc370 | |||
| 7ccc251415 | |||
| dfd10f5133 | |||
| 1af7866562 | |||
| 064143c130 | |||
| 324f21b216 | |||
| 10c1a233f9 | |||
| 44d3c2e80b | |||
| 07c267c516 | |||
| 62b1250629 | |||
| 11c4e506bd | |||
| 491eaf0808 | |||
| dd8e6020d8 | |||
| 99c88ac84e | |||
| 3b09a37682 | |||
| 91caa5b430 | |||
| bc754cdeed | |||
| c775f310e9 | |||
| 09cbc3a8e9 | |||
| 736ac58e11 | |||
| a6c102009e | |||
| 544ba2c2e9 | |||
| 81c5289c83 | |||
| 4b7bf2e413 | |||
| 1b959b6977 | |||
| c88a119259 | |||
| 5043ef801a | |||
| baeaf00a12 | |||
| 322ccac33f | |||
| ccb15a5bbe | |||
| f5b50d0871 | |||
| e7577427c9 | |||
| 7837a5f2ea | |||
| 1a7e6eb633 | |||
| 1ecd1a6a5f | |||
| d1e0b91083 | |||
| 62a1639287 | |||
| aefa76f5cd | |||
| c3d2bf2807 | |||
| c25137a99f | |||
| e8e3e033ee | |||
| 27d3fc421a | |||
| fbb758a7ce | |||
| e70f0081da | |||
| fe0ac98e66 | |||
| 7af5a63063 | |||
| c998f25006 | |||
| 4a2d2c2b6f | |||
| 9ba32f6573 | |||
| ffa5b757c7 | |||
| 971c980c6e | |||
| d9b8cf81be | |||
| 0fb5189072 | |||
| 80793e813c | |||
| ae38e0569f | |||
| 886eceba3e | |||
| 79252d1c83 | |||
| e8fc180d4d | |||
| 7b657f1148 | |||
| 1e4b4d85e7 | |||
| 62fb643cdc | |||
| 97cbc96093 | |||
| 5906be8589 | |||
| 18a4c03c50 | |||
| eafa8d3c54 | |||
| 977310ee27 | |||
| 981c6cf544 | |||
| 6a19f7e101 | |||
| 4a97eef0dc | |||
| b5573c0ffb | |||
| 1819563640 | |||
| e4cbea6c46 | |||
| 80a1c6e9e5 | |||
| d7675e79e1 | |||
| 8de9ebe35d | |||
| 29763f01c6 | |||
| 74f016d343 | |||
| 1f9df1cc7b | |||
| 7777d353bf | |||
| 502728777c | |||
| bb29bc2937 | |||
| 8374bcb0a8 | |||
| c03e3e8301 | |||
| f299c5414c | |||
| a3df04a899 | |||
| 2253b083d9 | |||
| 6fadb560fc |
@@ -6,6 +6,7 @@ services:
image: postgres:9.5
environment:
POSTGRES_PASSWORD: postgres
command: -c fsync=off

testenv:
image: python:3.5
@@ -16,6 +17,6 @@ services:
SYNAPSE_POSTGRES_HOST: postgres
SYNAPSE_POSTGRES_USER: postgres
SYNAPSE_POSTGRES_PASSWORD: postgres
working_dir: /app
working_dir: /src
volumes:
- ..:/app
- ..:/src

@@ -6,6 +6,7 @@ services:
image: postgres:11
environment:
POSTGRES_PASSWORD: postgres
command: -c fsync=off

testenv:
image: python:3.7
@@ -16,6 +17,6 @@ services:
SYNAPSE_POSTGRES_HOST: postgres
SYNAPSE_POSTGRES_USER: postgres
SYNAPSE_POSTGRES_PASSWORD: postgres
working_dir: /app
working_dir: /src
volumes:
- ..:/app
- ..:/src

@@ -6,6 +6,7 @@ services:
image: postgres:9.5
environment:
POSTGRES_PASSWORD: postgres
command: -c fsync=off

testenv:
image: python:3.7
@@ -16,6 +17,6 @@ services:
SYNAPSE_POSTGRES_HOST: postgres
SYNAPSE_POSTGRES_USER: postgres
SYNAPSE_POSTGRES_PASSWORD: postgres
working_dir: /app
working_dir: /src
volumes:
- ..:/app
- ..:/src

@@ -27,7 +27,7 @@ git config --global user.name "A robot"

# Fetch and merge. If it doesn't work, it will raise due to set -e.
git fetch -u origin $GITBASE
git merge --no-edit origin/$GITBASE
git merge --no-edit --no-commit origin/$GITBASE

# Show what we are after.
git --no-pager show -s
+82
-12
@@ -1,8 +1,7 @@
|
||||
env:
|
||||
CODECOV_TOKEN: "2dd7eb9b-0eda-45fe-a47c-9b5ac040045f"
|
||||
COVERALLS_REPO_TOKEN: wsJWOby6j0uCYFiCes3r0XauxO27mx8lD
|
||||
|
||||
steps:
|
||||
|
||||
- command:
|
||||
- "python -m pip install tox"
|
||||
- "tox -e check_codestyle"
|
||||
@@ -10,6 +9,7 @@ steps:
|
||||
plugins:
|
||||
- docker#v3.0.1:
|
||||
image: "python:3.6"
|
||||
mount-buildkite-agent: false
|
||||
|
||||
- command:
|
||||
- "python -m pip install tox"
|
||||
@@ -18,6 +18,7 @@ steps:
|
||||
plugins:
|
||||
- docker#v3.0.1:
|
||||
image: "python:3.6"
|
||||
mount-buildkite-agent: false
|
||||
|
||||
- command:
|
||||
- "python -m pip install tox"
|
||||
@@ -26,6 +27,7 @@ steps:
|
||||
plugins:
|
||||
- docker#v3.0.1:
|
||||
image: "python:3.6"
|
||||
mount-buildkite-agent: false
|
||||
|
||||
- command:
|
||||
- "python -m pip install tox"
|
||||
@@ -36,6 +38,7 @@ steps:
|
||||
- docker#v3.0.1:
|
||||
image: "python:3.6"
|
||||
propagate-environment: true
|
||||
mount-buildkite-agent: false
|
||||
|
||||
- command:
|
||||
- "python -m pip install tox"
|
||||
@@ -44,21 +47,35 @@ steps:
|
||||
plugins:
|
||||
- docker#v3.0.1:
|
||||
image: "python:3.6"
|
||||
mount-buildkite-agent: false
|
||||
|
||||
- command:
|
||||
- "python -m pip install tox"
|
||||
- "tox -e mypy"
|
||||
label: ":mypy: mypy"
|
||||
plugins:
|
||||
- docker#v3.0.1:
|
||||
image: "python:3.5"
|
||||
mount-buildkite-agent: false
|
||||
|
||||
- wait
|
||||
|
||||
|
||||
- command:
|
||||
- "apt-get update && apt-get install -y python3.5 python3.5-dev python3-pip libxml2-dev libxslt-dev zlib1g-dev"
|
||||
- "python3.5 -m pip install tox"
|
||||
- "tox -e py35-old,codecov"
|
||||
- "tox -e py35-old,combine"
|
||||
label: ":python: 3.5 / SQLite / Old Deps"
|
||||
env:
|
||||
TRIAL_FLAGS: "-j 2"
|
||||
LANG: "C.UTF-8"
|
||||
plugins:
|
||||
- docker#v3.0.1:
|
||||
image: "ubuntu:xenial" # We use xenail to get an old sqlite and python
|
||||
image: "ubuntu:xenial" # We use xenial to get an old sqlite and python
|
||||
workdir: "/src"
|
||||
mount-buildkite-agent: false
|
||||
propagate-environment: true
|
||||
- matrix-org/coveralls#v1.0:
|
||||
parallel: "true"
|
||||
retry:
|
||||
automatic:
|
||||
- exit_status: -1
|
||||
@@ -68,14 +85,18 @@ steps:
|
||||
|
||||
- command:
|
||||
- "python -m pip install tox"
|
||||
- "tox -e py35,codecov"
|
||||
- "tox -e py35,combine"
|
||||
label: ":python: 3.5 / SQLite"
|
||||
env:
|
||||
TRIAL_FLAGS: "-j 2"
|
||||
plugins:
|
||||
- docker#v3.0.1:
|
||||
image: "python:3.5"
|
||||
workdir: "/src"
|
||||
mount-buildkite-agent: false
|
||||
propagate-environment: true
|
||||
- matrix-org/coveralls#v1.0:
|
||||
parallel: "true"
|
||||
retry:
|
||||
automatic:
|
||||
- exit_status: -1
|
||||
@@ -85,14 +106,18 @@ steps:
|
||||
|
||||
- command:
|
||||
- "python -m pip install tox"
|
||||
- "tox -e py36,codecov"
|
||||
- "tox -e py36,combine"
|
||||
label: ":python: 3.6 / SQLite"
|
||||
env:
|
||||
TRIAL_FLAGS: "-j 2"
|
||||
plugins:
|
||||
- docker#v3.0.1:
|
||||
image: "python:3.6"
|
||||
workdir: "/src"
|
||||
mount-buildkite-agent: false
|
||||
propagate-environment: true
|
||||
- matrix-org/coveralls#v1.0:
|
||||
parallel: "true"
|
||||
retry:
|
||||
automatic:
|
||||
- exit_status: -1
|
||||
@@ -102,14 +127,18 @@ steps:
|
||||
|
||||
- command:
|
||||
- "python -m pip install tox"
|
||||
- "tox -e py37,codecov"
|
||||
- "tox -e py37,combine"
|
||||
label: ":python: 3.7 / SQLite"
|
||||
env:
|
||||
TRIAL_FLAGS: "-j 2"
|
||||
plugins:
|
||||
- docker#v3.0.1:
|
||||
image: "python:3.7"
|
||||
workdir: "/src"
|
||||
mount-buildkite-agent: false
|
||||
propagate-environment: true
|
||||
- matrix-org/coveralls#v1.0:
|
||||
parallel: "true"
|
||||
retry:
|
||||
automatic:
|
||||
- exit_status: -1
|
||||
@@ -123,12 +152,14 @@ steps:
|
||||
env:
|
||||
TRIAL_FLAGS: "-j 8"
|
||||
command:
|
||||
- "bash -c 'python -m pip install tox && python -m tox -e py35-postgres,codecov'"
|
||||
- "bash -c 'python -m pip install tox && python -m tox -e py35-postgres,combine'"
|
||||
plugins:
|
||||
- docker-compose#v2.1.0:
|
||||
run: testenv
|
||||
config:
|
||||
- .buildkite/docker-compose.py35.pg95.yaml
|
||||
- matrix-org/coveralls#v1.0:
|
||||
parallel: "true"
|
||||
retry:
|
||||
automatic:
|
||||
- exit_status: -1
|
||||
@@ -142,12 +173,14 @@ steps:
|
||||
env:
|
||||
TRIAL_FLAGS: "-j 8"
|
||||
command:
|
||||
- "bash -c 'python -m pip install tox && python -m tox -e py37-postgres,codecov'"
|
||||
- "bash -c 'python -m pip install tox && python -m tox -e py37-postgres,combine'"
|
||||
plugins:
|
||||
- docker-compose#v2.1.0:
|
||||
run: testenv
|
||||
config:
|
||||
- .buildkite/docker-compose.py37.pg95.yaml
|
||||
- matrix-org/coveralls#v1.0:
|
||||
parallel: "true"
|
||||
retry:
|
||||
automatic:
|
||||
- exit_status: -1
|
||||
@@ -161,12 +194,14 @@ steps:
|
||||
env:
|
||||
TRIAL_FLAGS: "-j 8"
|
||||
command:
|
||||
- "bash -c 'python -m pip install tox && python -m tox -e py37-postgres,codecov'"
|
||||
- "bash -c 'python -m pip install tox && python -m tox -e py37-postgres,combine'"
|
||||
plugins:
|
||||
- docker-compose#v2.1.0:
|
||||
run: testenv
|
||||
config:
|
||||
- .buildkite/docker-compose.py37.pg11.yaml
|
||||
- matrix-org/coveralls#v1.0:
|
||||
parallel: "true"
|
||||
retry:
|
||||
automatic:
|
||||
- exit_status: -1
|
||||
@@ -174,7 +209,6 @@ steps:
|
||||
- exit_status: 2
|
||||
limit: 2
|
||||
|
||||
|
||||
- label: "SyTest - :python: 3.5 / SQLite / Monolith"
|
||||
agents:
|
||||
queue: "medium"
|
||||
@@ -187,6 +221,16 @@ steps:
|
||||
propagate-environment: true
|
||||
always-pull: true
|
||||
workdir: "/src"
|
||||
entrypoint: ["/bin/sh", "-e", "-c"]
|
||||
mount-buildkite-agent: false
|
||||
volumes: ["./logs:/logs"]
|
||||
- artifacts#v1.2.0:
|
||||
upload: [ "logs/**/*.log", "logs/**/*.log.*", "logs/coverage.xml" ]
|
||||
- matrix-org/annotate:
|
||||
path: "logs/annotate.md"
|
||||
style: "error"
|
||||
- matrix-org/coveralls#v1.0:
|
||||
parallel: "true"
|
||||
retry:
|
||||
automatic:
|
||||
- exit_status: -1
|
||||
@@ -208,6 +252,16 @@ steps:
|
||||
propagate-environment: true
|
||||
always-pull: true
|
||||
workdir: "/src"
|
||||
entrypoint: ["/bin/sh", "-e", "-c"]
|
||||
mount-buildkite-agent: false
|
||||
volumes: ["./logs:/logs"]
|
||||
- artifacts#v1.2.0:
|
||||
upload: [ "logs/**/*.log", "logs/**/*.log.*", "logs/coverage.xml" ]
|
||||
- matrix-org/annotate:
|
||||
path: "logs/annotate.md"
|
||||
style: "error"
|
||||
- matrix-org/coveralls#v1.0:
|
||||
parallel: "true"
|
||||
retry:
|
||||
automatic:
|
||||
- exit_status: -1
|
||||
@@ -232,9 +286,25 @@ steps:
|
||||
propagate-environment: true
|
||||
always-pull: true
|
||||
workdir: "/src"
|
||||
entrypoint: ["/bin/sh", "-e", "-c"]
|
||||
mount-buildkite-agent: false
|
||||
volumes: ["./logs:/logs"]
|
||||
- artifacts#v1.2.0:
|
||||
upload: [ "logs/**/*.log", "logs/**/*.log.*", "logs/coverage.xml" ]
|
||||
- matrix-org/annotate:
|
||||
path: "logs/annotate.md"
|
||||
style: "error"
|
||||
- matrix-org/coveralls#v1.0:
|
||||
parallel: "true"
|
||||
retry:
|
||||
automatic:
|
||||
- exit_status: -1
|
||||
limit: 2
|
||||
- exit_status: 2
|
||||
limit: 2
|
||||
|
||||
- wait: ~
|
||||
continue_on_failure: true
|
||||
|
||||
- label: Trigger webhook
|
||||
command: "curl -k https://coveralls.io/webhook?repo_token=$COVERALLS_REPO_TOKEN -d \"payload[build_num]=$BUILDKITE_BUILD_NUMBER&payload[status]=done\""
|
||||
|
||||
+2 -1
@@ -1,7 +1,8 @@
[run]
branch = True
parallel = True
include = synapse/*
include=$TOP/synapse/*
data_file = $TOP/.coverage

[report]
precision = 2
+3 -2
@@ -20,6 +20,7 @@ _trial_temp*/
/*.signing.key
/env/
/homeserver*.yaml
/logs
/media_store/
/uploads

@@ -29,8 +30,9 @@ _trial_temp*/
/.vscode/

# build products
/.coverage*
!/.coveragerc
/.coverage*
/.mypy_cache/
/.tox
/build/
/coverage.*
@@ -38,4 +40,3 @@ _trial_temp*/
/docs/build/
/htmlcov
/pip-wheel-metadata/
@@ -0,0 +1 @@
Lay the groundwork for structured logging output.
@@ -0,0 +1 @@
Add unstable support for MSC2197 (filtered search requests over federation), in order to allow upcoming room directory query performance improvements.
@@ -0,0 +1 @@
Correctly retry all hosts returned from SRV when we fail to connect.
@@ -0,0 +1 @@
Add admin API endpoint for setting whether or not a user is a server administrator.
@@ -0,0 +1 @@
Rework room and user statistics to separate current & historical rows, as well as track stats correctly.
@@ -0,0 +1 @@
Add config option to sign remote key query responses with a separate key.
@@ -0,0 +1 @@
Switch to the v2 lookup API for 3PID invites.
@@ -0,0 +1 @@
Add support for config templating.
@@ -0,0 +1 @@
Users with the type of "support" or "bot" are no longer required to consent.
@@ -0,0 +1 @@
Let synctl accept a directory of config files.
@@ -0,0 +1 @@
Increase max display name size to 256.
@@ -0,0 +1 @@
Fix error message which referred to public_base_url instead of public_baseurl. Thanks to @aaronraimist for the fix!
@@ -0,0 +1 @@
Add support for database engine-specific schema deltas, based on file extension.
@@ -0,0 +1 @@
Add admin API endpoint for getting whether or not a user is a server administrator.
@@ -0,0 +1 @@
Fix a cache-invalidation bug for worker-based deployments.
@@ -0,0 +1 @@
Update Buildkite pipeline to use plugins instead of buildkite-agent commands.
@@ -0,0 +1 @@
Add link in sample config to the logging config schema.
+5 -5
@@ -17,7 +17,7 @@ By default, the image expects a single volume, located at ``/data``, that will h
* the appservices configuration.

You are free to use separate volumes depending on storage endpoints at your
disposal. For instance, ``/data/media`` coud be stored on a large but low
disposal. For instance, ``/data/media`` could be stored on a large but low
performance hdd storage while other files could be stored on high performance
endpoints.

@@ -27,8 +27,8 @@ configuration file there. Multiple application services are supported.

## Generating a configuration file

The first step is to genearte a valid config file. To do this, you can run the
image with the `generate` commandline option.
The first step is to generate a valid config file. To do this, you can run the
image with the `generate` command line option.

You will need to specify values for the `SYNAPSE_SERVER_NAME` and
`SYNAPSE_REPORT_STATS` environment variable, and mount a docker volume to store
@@ -59,7 +59,7 @@ The following environment variables are supported in `generate` mode:
* `SYNAPSE_CONFIG_PATH`: path to the file to be generated. Defaults to
`<SYNAPSE_CONFIG_DIR>/homeserver.yaml`.
* `SYNAPSE_DATA_DIR`: where the generated config will put persistent data
such as the datatase and media store. Defaults to `/data`.
such as the database and media store. Defaults to `/data`.
* `UID`, `GID`: the user id and group id to use for creating the data
directories. Defaults to `991`, `991`.

@@ -115,7 +115,7 @@ not given).

To migrate from a dynamic configuration file to a static one, run the docker
container once with the environment variables set, and `migrate_config`
commandline option. For example:
command line option. For example:

```
docker run -it --rm \
```
@@ -84,3 +84,42 @@ with a body of:
    }

including an ``access_token`` of a server admin.


Get whether a user is a server administrator or not
===================================================

The api is::

    GET /_synapse/admin/v1/users/<user_id>/admin

including an ``access_token`` of a server admin.

A response body like the following is returned:

.. code:: json

    {
        "admin": true
    }


Change whether a user is a server administrator or not
======================================================

Note that you cannot demote yourself.

The api is::

    PUT /_synapse/admin/v1/users/<user_id>/admin

with a body of:

.. code:: json

    {
        "admin": true
    }

including an ``access_token`` of a server admin.
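For illustration only, here is a minimal Python sketch (not part of this changeset) of calling the two endpoints documented above. The homeserver URL, user ID and access token are placeholders, and the use of the `requests` library is an assumption.

```python
# Hypothetical client for the admin endpoints documented above.
# Base URL, user ID and token are placeholders for your own deployment.
import requests

BASE = "https://homeserver.example.com"
USER_ID = "@alice:example.com"
HEADERS = {"Authorization": "Bearer <admin access_token>"}

# GET /_synapse/admin/v1/users/<user_id>/admin returns e.g. {"admin": true}
resp = requests.get(
    "%s/_synapse/admin/v1/users/%s/admin" % (BASE, USER_ID), headers=HEADERS
)
print(resp.json())

# PUT with a body of {"admin": true} promotes the user to server admin.
resp = requests.put(
    "%s/_synapse/admin/v1/users/%s/admin" % (BASE, USER_ID),
    headers=HEADERS,
    json={"admin": True},
)
resp.raise_for_status()
```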
+18
-9
@@ -205,9 +205,9 @@ listeners:
|
||||
#
|
||||
- port: 8008
|
||||
tls: false
|
||||
bind_addresses: ['::1', '127.0.0.1']
|
||||
type: http
|
||||
x_forwarded: true
|
||||
bind_addresses: ['::1', '127.0.0.1']
|
||||
|
||||
resources:
|
||||
- names: [client, federation]
|
||||
@@ -392,10 +392,10 @@ listeners:
|
||||
# permission to listen on port 80.
|
||||
#
|
||||
acme:
|
||||
# ACME support is disabled by default. Uncomment the following line
|
||||
# (and tls_certificate_path and tls_private_key_path above) to enable it.
|
||||
# ACME support is disabled by default. Set this to `true` and uncomment
|
||||
# tls_certificate_path and tls_private_key_path above to enable it.
|
||||
#
|
||||
#enabled: true
|
||||
enabled: False
|
||||
|
||||
# Endpoint to use to request certificates. If you only want to test,
|
||||
# use Let's Encrypt's staging url:
|
||||
@@ -406,17 +406,17 @@ acme:
|
||||
# Port number to listen on for the HTTP-01 challenge. Change this if
|
||||
# you are forwarding connections through Apache/Nginx/etc.
|
||||
#
|
||||
#port: 80
|
||||
port: 80
|
||||
|
||||
# Local addresses to listen on for incoming connections.
|
||||
# Again, you may want to change this if you are forwarding connections
|
||||
# through Apache/Nginx/etc.
|
||||
#
|
||||
#bind_addresses: ['::', '0.0.0.0']
|
||||
bind_addresses: ['::', '0.0.0.0']
|
||||
|
||||
# How many days remaining on a certificate before it is renewed.
|
||||
#
|
||||
#reprovision_threshold: 30
|
||||
reprovision_threshold: 30
|
||||
|
||||
# The domain that the certificate should be for. Normally this
|
||||
# should be the same as your Matrix domain (i.e., 'server_name'), but,
|
||||
@@ -430,7 +430,7 @@ acme:
|
||||
#
|
||||
# If not set, defaults to your 'server_name'.
|
||||
#
|
||||
#domain: matrix.example.com
|
||||
domain: matrix.example.com
|
||||
|
||||
# file to use for the account key. This will be generated if it doesn't
|
||||
# exist.
|
||||
@@ -485,7 +485,8 @@ database:
|
||||
|
||||
## Logging ##
|
||||
|
||||
# A yaml python logging config file
|
||||
# A yaml python logging config file as described by
|
||||
# https://docs.python.org/3.7/library/logging.config.html#configuration-dictionary-schema
|
||||
#
|
||||
log_config: "CONFDIR/SERVERNAME.log.config"
|
||||
|
||||
@@ -1027,6 +1028,14 @@ signing_key_path: "CONFDIR/SERVERNAME.signing.key"
|
||||
#
|
||||
#trusted_key_servers:
|
||||
# - server_name: "matrix.org"
|
||||
#
|
||||
|
||||
# The signing keys to use when acting as a trusted key server. If not specified
|
||||
# defaults to the server signing key.
|
||||
#
|
||||
# Can contain multiple keys, one per line.
|
||||
#
|
||||
#key_server_signing_keys_path: "key_server_signing_keys.key"
|
||||
|
||||
|
||||
# Enable SAML2 for registration and login. Uses pysaml2.
|
||||
|
||||
@@ -0,0 +1,83 @@
# Structured Logging

A structured logging system can be useful when your logs are destined for a machine to parse and process. By maintaining its machine-readable characteristics, it enables more efficient searching and aggregations when consumed by software such as the "ELK stack".

Synapse's structured logging system is configured via the file that Synapse's `log_config` config option points to. The file must be YAML and contain `structured: true`. It must contain a list of "drains" (places where logs go to).

A structured logging configuration looks similar to the following:

```yaml
structured: true

loggers:
    synapse:
        level: INFO
    synapse.storage.SQL:
        level: WARNING

drains:
    console:
        type: console
        location: stdout
    file:
        type: file_json
        location: homeserver.log
```

The above logging config will set Synapse as 'INFO' logging level by default, with the SQL layer at 'WARNING', and will have two logging drains (to the console and to a file, stored as JSON).

## Drain Types

Drain types can be specified by the `type` key.

### `console`

Outputs human-readable logs to the console.

Arguments:

- `location`: Either `stdout` or `stderr`.

### `console_json`

Outputs machine-readable JSON logs to the console.

Arguments:

- `location`: Either `stdout` or `stderr`.

### `console_json_terse`

Outputs machine-readable JSON logs to the console, separated by newlines. This
format is not designed to be read and re-formatted into human-readable text, but
is optimal for a logging aggregation system.

Arguments:

- `location`: Either `stdout` or `stderr`.

### `file`

Outputs human-readable logs to a file.

Arguments:

- `location`: An absolute path to the file to log to.

### `file_json`

Outputs machine-readable logs to a file.

Arguments:

- `location`: An absolute path to the file to log to.

### `network_json_terse`

Delivers machine-readable JSON logs to a log aggregator over TCP. This is
compatible with LogStash's TCP input with the codec set to `json_lines`.

Arguments:

- `host`: Hostname or IP address of the log aggregator.
- `port`: Numerical port to contact on the host.
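As a rough sketch of what consuming these JSON drains might look like: the snippet below reads newline-delimited JSON such as `file_json` writes. The field names used (`level`, `log`) are assumptions for illustration, not a documented schema.

```python
# Sketch: iterate over newline-delimited JSON log events, e.g. from a
# file_json drain. Field names here are illustrative assumptions.
import json

def iter_log_events(path):
    with open(path, "r") as f:
        for line in f:
            line = line.strip()
            if line:
                yield json.loads(line)

for event in iter_log_events("homeserver.log"):
    if event.get("level") == "WARNING":
        print(event.get("log", event))
```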
@@ -122,7 +122,8 @@ class UserTypes(object):
"""

SUPPORT = "support"
ALL_USER_TYPES = (SUPPORT,)
BOT = "bot"
ALL_USER_TYPES = (SUPPORT, BOT)


class RelationTypes(object):

@@ -36,18 +36,20 @@ from synapse.util.versionstring import get_version_string

logger = logging.getLogger(__name__)

# list of tuples of function, args list, kwargs dict
_sighup_callbacks = []


def register_sighup(func):
def register_sighup(func, *args, **kwargs):
"""
Register a function to be called when a SIGHUP occurs.

Args:
func (function): Function to be called when sent a SIGHUP signal.
Will be called with a single argument, the homeserver.
Will be called with a single default argument, the homeserver.
*args, **kwargs: args and kwargs to be passed to the target function.
"""
_sighup_callbacks.append(func)
_sighup_callbacks.append((func, args, kwargs))


def start_worker_reactor(appname, config, run_command=reactor.run):

@@ -248,8 +250,8 @@ def start(hs, listeners=None):
# we're not using systemd.
sdnotify(b"RELOADING=1")

for i in _sighup_callbacks:
i(hs)
for i, args, kwargs in _sighup_callbacks:
i(hs, *args, **kwargs)

sdnotify(b"READY=1")
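The hunk above changes `register_sighup` to store `(func, args, kwargs)` tuples and to replay those arguments after the homeserver when SIGHUP fires. A standalone sketch of that pattern (names other than `register_sighup` are illustrative):

```python
# Standalone sketch of the (func, args, kwargs) callback pattern introduced
# above; "hs" stands in for the homeserver object.
_sighup_callbacks = []

def register_sighup(func, *args, **kwargs):
    _sighup_callbacks.append((func, args, kwargs))

def fire_sighup(hs):
    for func, args, kwargs in _sighup_callbacks:
        func(hs, *args, **kwargs)

def reload_log_config(hs, path, verbose=False):
    print("reloading %s for %s (verbose=%s)" % (path, hs, verbose))

register_sighup(reload_log_config, "/etc/synapse/log.yaml", verbose=True)
fire_sighup(hs="homeserver")
```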
@@ -227,8 +227,6 @@ def start(config_options):
|
||||
config.start_pushers = False
|
||||
config.send_federation = False
|
||||
|
||||
setup_logging(config, use_worker_options=True)
|
||||
|
||||
synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
|
||||
database_engine = create_engine(config.database_config)
|
||||
@@ -241,6 +239,8 @@ def start(config_options):
|
||||
database_engine=database_engine,
|
||||
)
|
||||
|
||||
setup_logging(ss, config, use_worker_options=True)
|
||||
|
||||
ss.setup()
|
||||
|
||||
# We use task.react as the basic run command as it correctly handles tearing
|
||||
|
||||
@@ -141,8 +141,6 @@ def start(config_options):
|
||||
|
||||
assert config.worker_app == "synapse.app.appservice"
|
||||
|
||||
setup_logging(config, use_worker_options=True)
|
||||
|
||||
events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
|
||||
database_engine = create_engine(config.database_config)
|
||||
@@ -167,6 +165,8 @@ def start(config_options):
|
||||
database_engine=database_engine,
|
||||
)
|
||||
|
||||
setup_logging(ps, config, use_worker_options=True)
|
||||
|
||||
ps.setup()
|
||||
reactor.addSystemEventTrigger(
|
||||
"before", "startup", _base.start, ps, config.worker_listeners
|
||||
|
||||
@@ -179,8 +179,6 @@ def start(config_options):
|
||||
|
||||
assert config.worker_app == "synapse.app.client_reader"
|
||||
|
||||
setup_logging(config, use_worker_options=True)
|
||||
|
||||
events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
|
||||
database_engine = create_engine(config.database_config)
|
||||
@@ -193,6 +191,8 @@ def start(config_options):
|
||||
database_engine=database_engine,
|
||||
)
|
||||
|
||||
setup_logging(ss, config, use_worker_options=True)
|
||||
|
||||
ss.setup()
|
||||
reactor.addSystemEventTrigger(
|
||||
"before", "startup", _base.start, ss, config.worker_listeners
|
||||
|
||||
@@ -175,8 +175,6 @@ def start(config_options):
|
||||
|
||||
assert config.worker_replication_http_port is not None
|
||||
|
||||
setup_logging(config, use_worker_options=True)
|
||||
|
||||
# This should only be done on the user directory worker or the master
|
||||
config.update_user_directory = False
|
||||
|
||||
@@ -192,6 +190,8 @@ def start(config_options):
|
||||
database_engine=database_engine,
|
||||
)
|
||||
|
||||
setup_logging(ss, config, use_worker_options=True)
|
||||
|
||||
ss.setup()
|
||||
reactor.addSystemEventTrigger(
|
||||
"before", "startup", _base.start, ss, config.worker_listeners
|
||||
|
||||
@@ -160,8 +160,6 @@ def start(config_options):
|
||||
|
||||
assert config.worker_app == "synapse.app.federation_reader"
|
||||
|
||||
setup_logging(config, use_worker_options=True)
|
||||
|
||||
events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
|
||||
database_engine = create_engine(config.database_config)
|
||||
@@ -174,6 +172,8 @@ def start(config_options):
|
||||
database_engine=database_engine,
|
||||
)
|
||||
|
||||
setup_logging(ss, config, use_worker_options=True)
|
||||
|
||||
ss.setup()
|
||||
reactor.addSystemEventTrigger(
|
||||
"before", "startup", _base.start, ss, config.worker_listeners
|
||||
|
||||
@@ -171,8 +171,6 @@ def start(config_options):
|
||||
|
||||
assert config.worker_app == "synapse.app.federation_sender"
|
||||
|
||||
setup_logging(config, use_worker_options=True)
|
||||
|
||||
events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
|
||||
database_engine = create_engine(config.database_config)
|
||||
@@ -197,6 +195,8 @@ def start(config_options):
|
||||
database_engine=database_engine,
|
||||
)
|
||||
|
||||
setup_logging(ss, config, use_worker_options=True)
|
||||
|
||||
ss.setup()
|
||||
reactor.addSystemEventTrigger(
|
||||
"before", "startup", _base.start, ss, config.worker_listeners
|
||||
|
||||
@@ -232,8 +232,6 @@ def start(config_options):
|
||||
|
||||
assert config.worker_main_http_uri is not None
|
||||
|
||||
setup_logging(config, use_worker_options=True)
|
||||
|
||||
events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
|
||||
database_engine = create_engine(config.database_config)
|
||||
@@ -246,6 +244,8 @@ def start(config_options):
|
||||
database_engine=database_engine,
|
||||
)
|
||||
|
||||
setup_logging(ss, config, use_worker_options=True)
|
||||
|
||||
ss.setup()
|
||||
reactor.addSystemEventTrigger(
|
||||
"before", "startup", _base.start, ss, config.worker_listeners
|
||||
|
||||
@@ -341,8 +341,6 @@ def setup(config_options):
|
||||
# generating config files and shouldn't try to continue.
|
||||
sys.exit(0)
|
||||
|
||||
synapse.config.logger.setup_logging(config, use_worker_options=False)
|
||||
|
||||
events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
|
||||
database_engine = create_engine(config.database_config)
|
||||
@@ -356,6 +354,8 @@ def setup(config_options):
|
||||
database_engine=database_engine,
|
||||
)
|
||||
|
||||
synapse.config.logger.setup_logging(hs, config, use_worker_options=False)
|
||||
|
||||
logger.info("Preparing database: %s...", config.database_config["name"])
|
||||
|
||||
try:
|
||||
|
||||
@@ -155,8 +155,6 @@ def start(config_options):
|
||||
"Please add ``enable_media_repo: false`` to the main config\n"
|
||||
)
|
||||
|
||||
setup_logging(config, use_worker_options=True)
|
||||
|
||||
events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
|
||||
database_engine = create_engine(config.database_config)
|
||||
@@ -169,6 +167,8 @@ def start(config_options):
|
||||
database_engine=database_engine,
|
||||
)
|
||||
|
||||
setup_logging(ss, config, use_worker_options=True)
|
||||
|
||||
ss.setup()
|
||||
reactor.addSystemEventTrigger(
|
||||
"before", "startup", _base.start, ss, config.worker_listeners
|
||||
|
||||
@@ -184,8 +184,6 @@ def start(config_options):
|
||||
|
||||
assert config.worker_app == "synapse.app.pusher"
|
||||
|
||||
setup_logging(config, use_worker_options=True)
|
||||
|
||||
events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
|
||||
if config.start_pushers:
|
||||
@@ -210,6 +208,8 @@ def start(config_options):
|
||||
database_engine=database_engine,
|
||||
)
|
||||
|
||||
setup_logging(ps, config, use_worker_options=True)
|
||||
|
||||
ps.setup()
|
||||
|
||||
def start():
|
||||
|
||||
@@ -435,8 +435,6 @@ def start(config_options):
|
||||
|
||||
assert config.worker_app == "synapse.app.synchrotron"
|
||||
|
||||
setup_logging(config, use_worker_options=True)
|
||||
|
||||
synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
|
||||
database_engine = create_engine(config.database_config)
|
||||
@@ -450,6 +448,8 @@ def start(config_options):
|
||||
application_service_handler=SynchrotronApplicationService(),
|
||||
)
|
||||
|
||||
setup_logging(ss, config, use_worker_options=True)
|
||||
|
||||
ss.setup()
|
||||
reactor.addSystemEventTrigger(
|
||||
"before", "startup", _base.start, ss, config.worker_listeners
|
||||
|
||||
@@ -197,8 +197,6 @@ def start(config_options):
|
||||
|
||||
assert config.worker_app == "synapse.app.user_dir"
|
||||
|
||||
setup_logging(config, use_worker_options=True)
|
||||
|
||||
events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
|
||||
database_engine = create_engine(config.database_config)
|
||||
@@ -223,6 +221,8 @@ def start(config_options):
|
||||
database_engine=database_engine,
|
||||
)
|
||||
|
||||
setup_logging(ss, config, use_worker_options=True)
|
||||
|
||||
ss.setup()
|
||||
reactor.addSystemEventTrigger(
|
||||
"before", "startup", _base.start, ss, config.worker_listeners
|
||||
|
||||
@@ -13,8 +13,9 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from ._base import ConfigError
|
||||
from ._base import ConfigError, find_config_files
|
||||
|
||||
# export ConfigError if somebody does import *
|
||||
# export ConfigError and find_config_files if somebody does
|
||||
# import *
|
||||
# this is largely a fudge to stop PEP8 moaning about the import
|
||||
__all__ = ["ConfigError"]
|
||||
__all__ = ["ConfigError", "find_config_files"]
|
||||
|
||||
@@ -181,6 +181,11 @@ class Config(object):
|
||||
generate_secrets=False,
|
||||
report_stats=None,
|
||||
open_private_ports=False,
|
||||
listeners=None,
|
||||
database_conf=None,
|
||||
tls_certificate_path=None,
|
||||
tls_private_key_path=None,
|
||||
acme_domain=None,
|
||||
):
|
||||
"""Build a default configuration file
|
||||
|
||||
@@ -207,6 +212,33 @@ class Config(object):
|
||||
open_private_ports (bool): True to leave private ports (such as the non-TLS
|
||||
HTTP listener) open to the internet.
|
||||
|
||||
listeners (list(dict)|None): A list of descriptions of the listeners
|
||||
synapse should start with each of which specifies a port (str), a list of
|
||||
resources (list(str)), tls (bool) and type (str). For example:
|
||||
[{
|
||||
"port": 8448,
|
||||
"resources": [{"names": ["federation"]}],
|
||||
"tls": True,
|
||||
"type": "http",
|
||||
},
|
||||
{
|
||||
"port": 443,
|
||||
"resources": [{"names": ["client"]}],
|
||||
"tls": False,
|
||||
"type": "http",
|
||||
}],
|
||||
|
||||
|
||||
database (str|None): The database type to configure, either `psycog2`
|
||||
or `sqlite3`.
|
||||
|
||||
tls_certificate_path (str|None): The path to the tls certificate.
|
||||
|
||||
tls_private_key_path (str|None): The path to the tls private key.
|
||||
|
||||
acme_domain (str|None): The domain acme will try to validate. If
|
||||
specified acme will be enabled.
|
||||
|
||||
Returns:
|
||||
str: the yaml config file
|
||||
"""
|
||||
@@ -220,6 +252,11 @@ class Config(object):
|
||||
generate_secrets=generate_secrets,
|
||||
report_stats=report_stats,
|
||||
open_private_ports=open_private_ports,
|
||||
listeners=listeners,
|
||||
database_conf=database_conf,
|
||||
tls_certificate_path=tls_certificate_path,
|
||||
tls_private_key_path=tls_private_key_path,
|
||||
acme_domain=acme_domain,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@@ -13,6 +13,9 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import os
|
||||
from textwrap import indent
|
||||
|
||||
import yaml
|
||||
|
||||
from ._base import Config
|
||||
|
||||
@@ -38,20 +41,28 @@ class DatabaseConfig(Config):
|
||||
|
||||
self.set_databasepath(config.get("database_path"))
|
||||
|
||||
def generate_config_section(self, data_dir_path, **kwargs):
|
||||
database_path = os.path.join(data_dir_path, "homeserver.db")
|
||||
return (
|
||||
"""\
|
||||
## Database ##
|
||||
|
||||
database:
|
||||
# The database engine name
|
||||
def generate_config_section(self, data_dir_path, database_conf, **kwargs):
|
||||
if not database_conf:
|
||||
database_path = os.path.join(data_dir_path, "homeserver.db")
|
||||
database_conf = (
|
||||
"""# The database engine name
|
||||
name: "sqlite3"
|
||||
# Arguments to pass to the engine
|
||||
args:
|
||||
# Path to the database
|
||||
database: "%(database_path)s"
|
||||
"""
|
||||
% locals()
|
||||
)
|
||||
else:
|
||||
database_conf = indent(yaml.dump(database_conf), " " * 10).lstrip()
|
||||
|
||||
return (
|
||||
"""\
|
||||
## Database ##
|
||||
|
||||
database:
|
||||
%(database_conf)s
|
||||
# Number of events to cache in memory.
|
||||
#
|
||||
#event_cache_size: 10K
|
||||
|
||||
@@ -115,7 +115,7 @@ class EmailConfig(Config):
missing.append("email." + k)

if config.get("public_baseurl") is None:
missing.append("public_base_url")
missing.append("public_baseurl")

if len(missing) > 0:
raise RuntimeError(
+30
-4
@@ -76,7 +76,7 @@ class KeyConfig(Config):
|
||||
config_dir_path, config["server_name"] + ".signing.key"
|
||||
)
|
||||
|
||||
self.signing_key = self.read_signing_key(signing_key_path)
|
||||
self.signing_key = self.read_signing_keys(signing_key_path, "signing_key")
|
||||
|
||||
self.old_signing_keys = self.read_old_signing_keys(
|
||||
config.get("old_signing_keys", {})
|
||||
@@ -85,6 +85,14 @@ class KeyConfig(Config):
|
||||
config.get("key_refresh_interval", "1d")
|
||||
)
|
||||
|
||||
key_server_signing_keys_path = config.get("key_server_signing_keys_path")
|
||||
if key_server_signing_keys_path:
|
||||
self.key_server_signing_keys = self.read_signing_keys(
|
||||
key_server_signing_keys_path, "key_server_signing_keys_path"
|
||||
)
|
||||
else:
|
||||
self.key_server_signing_keys = list(self.signing_key)
|
||||
|
||||
# if neither trusted_key_servers nor perspectives are given, use the default.
|
||||
if "perspectives" not in config and "trusted_key_servers" not in config:
|
||||
key_servers = [{"server_name": "matrix.org"}]
|
||||
@@ -210,16 +218,34 @@ class KeyConfig(Config):
|
||||
#
|
||||
#trusted_key_servers:
|
||||
# - server_name: "matrix.org"
|
||||
#
|
||||
|
||||
# The signing keys to use when acting as a trusted key server. If not specified
|
||||
# defaults to the server signing key.
|
||||
#
|
||||
# Can contain multiple keys, one per line.
|
||||
#
|
||||
#key_server_signing_keys_path: "key_server_signing_keys.key"
|
||||
"""
|
||||
% locals()
|
||||
)
|
||||
|
||||
def read_signing_key(self, signing_key_path):
|
||||
signing_keys = self.read_file(signing_key_path, "signing_key")
|
||||
def read_signing_keys(self, signing_key_path, name):
|
||||
"""Read the signing keys in the given path.
|
||||
|
||||
Args:
|
||||
signing_key_path (str)
|
||||
name (str): Associated config key name
|
||||
|
||||
Returns:
|
||||
list[SigningKey]
|
||||
"""
|
||||
|
||||
signing_keys = self.read_file(signing_key_path, name)
|
||||
try:
|
||||
return read_signing_keys(signing_keys.splitlines(True))
|
||||
except Exception as e:
|
||||
raise ConfigError("Error reading signing_key: %s" % (str(e)))
|
||||
raise ConfigError("Error reading %s: %s" % (name, str(e)))
|
||||
|
||||
def read_old_signing_keys(self, old_signing_keys):
|
||||
keys = {}
|
||||
|
||||
+63
-43
@@ -25,6 +25,10 @@ from twisted.logger import STDLibLogObserver, globalLogBeginner
|
||||
|
||||
import synapse
|
||||
from synapse.app import _base as appbase
|
||||
from synapse.logging._structured import (
|
||||
reload_structured_logging,
|
||||
setup_structured_logging,
|
||||
)
|
||||
from synapse.logging.context import LoggingContextFilter
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
||||
@@ -85,7 +89,8 @@ class LoggingConfig(Config):
|
||||
"""\
|
||||
## Logging ##
|
||||
|
||||
# A yaml python logging config file
|
||||
# A yaml python logging config file as described by
|
||||
# https://docs.python.org/3.7/library/logging.config.html#configuration-dictionary-schema
|
||||
#
|
||||
log_config: "%(log_config)s"
|
||||
"""
|
||||
@@ -119,21 +124,10 @@ class LoggingConfig(Config):
|
||||
log_config_file.write(DEFAULT_LOG_CONFIG.substitute(log_file=log_file))
|
||||
|
||||
|
||||
def setup_logging(config, use_worker_options=False):
|
||||
""" Set up python logging
|
||||
|
||||
Args:
|
||||
config (LoggingConfig | synapse.config.workers.WorkerConfig):
|
||||
configuration data
|
||||
|
||||
use_worker_options (bool): True to use the 'worker_log_config' option
|
||||
instead of 'log_config'.
|
||||
|
||||
register_sighup (func | None): Function to call to register a
|
||||
sighup handler.
|
||||
def _setup_stdlib_logging(config, log_config):
|
||||
"""
|
||||
Set up Python stdlib logging.
|
||||
"""
|
||||
log_config = config.worker_log_config if use_worker_options else config.log_config
|
||||
|
||||
if log_config is None:
|
||||
log_format = (
|
||||
"%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s"
|
||||
@@ -151,35 +145,10 @@ def setup_logging(config, use_worker_options=False):
|
||||
handler.addFilter(LoggingContextFilter(request=""))
|
||||
logger.addHandler(handler)
|
||||
else:
|
||||
logging.config.dictConfig(log_config)
|
||||
|
||||
def load_log_config():
|
||||
with open(log_config, "r") as f:
|
||||
logging.config.dictConfig(yaml.safe_load(f))
|
||||
|
||||
def sighup(*args):
|
||||
# it might be better to use a file watcher or something for this.
|
||||
load_log_config()
|
||||
logging.info("Reloaded log config from %s due to SIGHUP", log_config)
|
||||
|
||||
load_log_config()
|
||||
appbase.register_sighup(sighup)
|
||||
|
||||
# make sure that the first thing we log is a thing we can grep backwards
|
||||
# for
|
||||
logging.warn("***** STARTING SERVER *****")
|
||||
logging.warn("Server %s version %s", sys.argv[0], get_version_string(synapse))
|
||||
logging.info("Server hostname: %s", config.server_name)
|
||||
|
||||
# It's critical to point twisted's internal logging somewhere, otherwise it
|
||||
# stacks up and leaks kup to 64K object;
|
||||
# see: https://twistedmatrix.com/trac/ticket/8164
|
||||
#
|
||||
# Routing to the python logging framework could be a performance problem if
|
||||
# the handlers blocked for a long time as python.logging is a blocking API
|
||||
# see https://twistedmatrix.com/documents/current/core/howto/logger.html
|
||||
# filed as https://github.com/matrix-org/synapse/issues/1727
|
||||
#
|
||||
# However this may not be too much of a problem if we are just writing to a file.
|
||||
# Route Twisted's native logging through to the standard library logging
|
||||
# system.
|
||||
observer = STDLibLogObserver()
|
||||
|
||||
def _log(event):
|
||||
@@ -201,3 +170,54 @@ def setup_logging(config, use_worker_options=False):
|
||||
)
|
||||
if not config.no_redirect_stdio:
|
||||
print("Redirected stdout/stderr to logs")
|
||||
|
||||
|
||||
def _reload_stdlib_logging(*args, log_config=None):
|
||||
logger = logging.getLogger("")
|
||||
|
||||
if not log_config:
|
||||
logger.warn("Reloaded a blank config?")
|
||||
|
||||
logging.config.dictConfig(log_config)
|
||||
|
||||
|
||||
def setup_logging(hs, config, use_worker_options=False):
|
||||
"""
|
||||
Set up the logging subsystem.
|
||||
|
||||
Args:
|
||||
config (LoggingConfig | synapse.config.workers.WorkerConfig):
|
||||
configuration data
|
||||
|
||||
use_worker_options (bool): True to use the 'worker_log_config' option
|
||||
instead of 'log_config'.
|
||||
"""
|
||||
log_config = config.worker_log_config if use_worker_options else config.log_config
|
||||
|
||||
def read_config(*args, callback=None):
|
||||
if log_config is None:
|
||||
return None
|
||||
|
||||
with open(log_config, "rb") as f:
|
||||
log_config_body = yaml.safe_load(f.read())
|
||||
|
||||
if callback:
|
||||
callback(log_config=log_config_body)
|
||||
logging.info("Reloaded log config from %s due to SIGHUP", log_config)
|
||||
|
||||
return log_config_body
|
||||
|
||||
log_config_body = read_config()
|
||||
|
||||
if log_config_body and log_config_body.get("structured") is True:
|
||||
setup_structured_logging(hs, config, log_config_body)
|
||||
appbase.register_sighup(read_config, callback=reload_structured_logging)
|
||||
else:
|
||||
_setup_stdlib_logging(config, log_config_body)
|
||||
appbase.register_sighup(read_config, callback=_reload_stdlib_logging)
|
||||
|
||||
# make sure that the first thing we log is a thing we can grep backwards
|
||||
# for
|
||||
logging.warn("***** STARTING SERVER *****")
|
||||
logging.warn("Server %s version %s", sys.argv[0], get_version_string(synapse))
|
||||
logging.info("Server hostname: %s", config.server_name)
|
||||
|
||||
+67
-17
@@ -17,8 +17,11 @@
|
||||
|
||||
import logging
|
||||
import os.path
|
||||
import re
|
||||
from textwrap import indent
|
||||
|
||||
import attr
|
||||
import yaml
|
||||
from netaddr import IPSet
|
||||
|
||||
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
|
||||
@@ -352,7 +355,7 @@ class ServerConfig(Config):
|
||||
return any(l["tls"] for l in self.listeners)
|
||||
|
||||
def generate_config_section(
|
||||
self, server_name, data_dir_path, open_private_ports, **kwargs
|
||||
self, server_name, data_dir_path, open_private_ports, listeners, **kwargs
|
||||
):
|
||||
_, bind_port = parse_and_validate_server_name(server_name)
|
||||
if bind_port is not None:
|
||||
@@ -366,11 +369,68 @@ class ServerConfig(Config):
|
||||
# Bring DEFAULT_ROOM_VERSION into the local-scope for use in the
|
||||
# default config string
|
||||
default_room_version = DEFAULT_ROOM_VERSION
|
||||
secure_listeners = []
|
||||
unsecure_listeners = []
|
||||
private_addresses = ["::1", "127.0.0.1"]
|
||||
if listeners:
|
||||
for listener in listeners:
|
||||
if listener["tls"]:
|
||||
secure_listeners.append(listener)
|
||||
else:
|
||||
# If we don't want open ports we need to bind the listeners
|
||||
# to some address other than 0.0.0.0. Here we chose to use
|
||||
# localhost.
|
||||
# If the addresses are already bound we won't overwrite them
|
||||
# however.
|
||||
if not open_private_ports:
|
||||
listener.setdefault("bind_addresses", private_addresses)
|
||||
|
||||
unsecure_http_binding = "port: %i\n tls: false" % (unsecure_port,)
|
||||
if not open_private_ports:
|
||||
unsecure_http_binding += (
|
||||
"\n bind_addresses: ['::1', '127.0.0.1']"
|
||||
unsecure_listeners.append(listener)
|
||||
|
||||
secure_http_bindings = indent(
|
||||
yaml.dump(secure_listeners), " " * 10
|
||||
).lstrip()
|
||||
|
||||
unsecure_http_bindings = indent(
|
||||
yaml.dump(unsecure_listeners), " " * 10
|
||||
).lstrip()
|
||||
|
||||
if not unsecure_listeners:
|
||||
unsecure_http_bindings = (
|
||||
"""- port: %(unsecure_port)s
|
||||
tls: false
|
||||
type: http
|
||||
x_forwarded: true"""
|
||||
% locals()
|
||||
)
|
||||
|
||||
if not open_private_ports:
|
||||
unsecure_http_bindings += (
|
||||
"\n bind_addresses: ['::1', '127.0.0.1']"
|
||||
)
|
||||
|
||||
unsecure_http_bindings += """
|
||||
|
||||
resources:
|
||||
- names: [client, federation]
|
||||
compress: false"""
|
||||
|
||||
if listeners:
|
||||
# comment out this block
|
||||
unsecure_http_bindings = "#" + re.sub(
|
||||
"\n {10}",
|
||||
lambda match: match.group(0) + "#",
|
||||
unsecure_http_bindings,
|
||||
)
|
||||
|
||||
if not secure_listeners:
|
||||
secure_http_bindings = (
|
||||
"""#- port: %(bind_port)s
|
||||
# type: http
|
||||
# tls: true
|
||||
# resources:
|
||||
# - names: [client, federation]"""
|
||||
% locals()
|
||||
)
|
||||
|
||||
return (
|
||||
@@ -556,11 +616,7 @@ class ServerConfig(Config):
|
||||
# will also need to give Synapse a TLS key and certificate: see the TLS section
|
||||
# below.)
|
||||
#
|
||||
#- port: %(bind_port)s
|
||||
# type: http
|
||||
# tls: true
|
||||
# resources:
|
||||
# - names: [client, federation]
|
||||
%(secure_http_bindings)s
|
||||
|
||||
# Unsecure HTTP listener: for when matrix traffic passes through a reverse proxy
|
||||
# that unwraps TLS.
|
||||
@@ -568,13 +624,7 @@ class ServerConfig(Config):
|
||||
# If you plan to use a reverse proxy, please see
|
||||
# https://github.com/matrix-org/synapse/blob/master/docs/reverse_proxy.rst.
|
||||
#
|
||||
- %(unsecure_http_binding)s
|
||||
type: http
|
||||
x_forwarded: true
|
||||
|
||||
resources:
|
||||
- names: [client, federation]
|
||||
compress: false
|
||||
%(unsecure_http_bindings)s
|
||||
|
||||
# example additional_resources:
|
||||
#
|
||||
|
||||
@@ -27,19 +27,16 @@ class StatsConfig(Config):

def read_config(self, config, **kwargs):
self.stats_enabled = True
self.stats_bucket_size = 86400
self.stats_bucket_size = 86400 * 1000
self.stats_retention = sys.maxsize
stats_config = config.get("stats", None)
if stats_config:
self.stats_enabled = stats_config.get("enabled", self.stats_enabled)
self.stats_bucket_size = (
self.parse_duration(stats_config.get("bucket_size", "1d")) / 1000
self.stats_bucket_size = self.parse_duration(
stats_config.get("bucket_size", "1d")
)
self.stats_retention = (
self.parse_duration(
stats_config.get("retention", "%ds" % (sys.maxsize,))
)
/ 1000
self.stats_retention = self.parse_duration(
stats_config.get("retention", "%ds" % (sys.maxsize,))
)

def generate_config_section(self, config_dir_path, server_name, **kwargs):
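The rewritten config keeps `stats_bucket_size` and `stats_retention` in milliseconds rather than dividing the parsed duration by 1000. A toy duration parser showing that millisecond convention (Synapse's real `parse_duration` accepts more formats than this):

```python
# Toy parser illustrating the millisecond convention adopted above;
# only integer values with a single unit suffix are handled here.
def parse_duration(value):
    units = {"s": 1000, "m": 60 * 1000, "h": 3600 * 1000, "d": 86400 * 1000}
    if isinstance(value, int):
        return value  # already milliseconds
    return int(value[:-1]) * units[value[-1]]

assert parse_duration("1d") == 86400 * 1000  # stored as ms, no "/ 1000"
```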
+38
-12
@@ -239,12 +239,38 @@ class TlsConfig(Config):
|
||||
self.tls_fingerprints.append({"sha256": sha256_fingerprint})
|
||||
|
||||
def generate_config_section(
|
||||
self, config_dir_path, server_name, data_dir_path, **kwargs
|
||||
self,
|
||||
config_dir_path,
|
||||
server_name,
|
||||
data_dir_path,
|
||||
tls_certificate_path,
|
||||
tls_private_key_path,
|
||||
acme_domain,
|
||||
**kwargs
|
||||
):
|
||||
"""If the acme_domain is specified acme will be enabled.
|
||||
If the TLS paths are not specified the default will be certs in the
|
||||
config directory"""
|
||||
|
||||
base_key_name = os.path.join(config_dir_path, server_name)
|
||||
|
||||
tls_certificate_path = base_key_name + ".tls.crt"
|
||||
tls_private_key_path = base_key_name + ".tls.key"
|
||||
if bool(tls_certificate_path) != bool(tls_private_key_path):
|
||||
raise ConfigError(
|
||||
"Please specify both a cert path and a key path or neither."
|
||||
)
|
||||
|
||||
tls_enabled = (
|
||||
"" if tls_certificate_path and tls_private_key_path or acme_domain else "#"
|
||||
)
|
||||
|
||||
if not tls_certificate_path:
|
||||
tls_certificate_path = base_key_name + ".tls.crt"
|
||||
if not tls_private_key_path:
|
||||
tls_private_key_path = base_key_name + ".tls.key"
|
||||
|
||||
acme_enabled = bool(acme_domain)
|
||||
acme_domain = "matrix.example.com"
|
||||
|
||||
default_acme_account_file = os.path.join(data_dir_path, "acme_account.key")
|
||||
|
||||
# this is to avoid the max line length. Sorrynotsorry
|
||||
@@ -269,11 +295,11 @@ class TlsConfig(Config):
|
||||
# instance, if using certbot, use `fullchain.pem` as your certificate,
|
||||
# not `cert.pem`).
|
||||
#
|
||||
#tls_certificate_path: "%(tls_certificate_path)s"
|
||||
%(tls_enabled)stls_certificate_path: "%(tls_certificate_path)s"
|
||||
|
||||
# PEM-encoded private key for TLS
|
||||
#
|
||||
#tls_private_key_path: "%(tls_private_key_path)s"
|
||||
%(tls_enabled)stls_private_key_path: "%(tls_private_key_path)s"
|
||||
|
||||
# Whether to verify TLS server certificates for outbound federation requests.
|
||||
#
|
||||
@@ -340,10 +366,10 @@ class TlsConfig(Config):
|
||||
# permission to listen on port 80.
|
||||
#
|
||||
acme:
|
||||
# ACME support is disabled by default. Uncomment the following line
|
||||
# (and tls_certificate_path and tls_private_key_path above) to enable it.
|
||||
# ACME support is disabled by default. Set this to `true` and uncomment
|
||||
# tls_certificate_path and tls_private_key_path above to enable it.
|
||||
#
|
||||
#enabled: true
|
||||
enabled: %(acme_enabled)s
|
||||
|
||||
# Endpoint to use to request certificates. If you only want to test,
|
||||
# use Let's Encrypt's staging url:
|
||||
@@ -354,17 +380,17 @@ class TlsConfig(Config):
|
||||
# Port number to listen on for the HTTP-01 challenge. Change this if
|
||||
# you are forwarding connections through Apache/Nginx/etc.
|
||||
#
|
||||
#port: 80
|
||||
port: 80
|
||||
|
||||
# Local addresses to listen on for incoming connections.
|
||||
# Again, you may want to change this if you are forwarding connections
|
||||
# through Apache/Nginx/etc.
|
||||
#
|
||||
#bind_addresses: ['::', '0.0.0.0']
|
||||
bind_addresses: ['::', '0.0.0.0']
|
||||
|
||||
# How many days remaining on a certificate before it is renewed.
|
||||
#
|
||||
#reprovision_threshold: 30
|
||||
reprovision_threshold: 30
|
||||
|
||||
# The domain that the certificate should be for. Normally this
|
||||
# should be the same as your Matrix domain (i.e., 'server_name'), but,
|
||||
@@ -378,7 +404,7 @@ class TlsConfig(Config):
|
||||
#
|
||||
# If not set, defaults to your 'server_name'.
|
||||
#
|
||||
#domain: matrix.example.com
|
||||
domain: %(acme_domain)s
|
||||
|
||||
# file to use for the account key. This will be generated if it doesn't
|
||||
# exist.
|
||||
|
||||
@@ -29,7 +29,6 @@ from signedjson.key import (
|
||||
from signedjson.sign import (
|
||||
SignatureVerifyException,
|
||||
encode_canonical_json,
|
||||
sign_json,
|
||||
signature_ids,
|
||||
verify_signed_json,
|
||||
)
|
||||
@@ -539,13 +538,7 @@ class BaseV2KeyFetcher(object):
|
||||
verify_key=verify_key, valid_until_ts=key_data["expired_ts"]
|
||||
)
|
||||
|
||||
# re-sign the json with our own key, so that it is ready if we are asked to
|
||||
# give it out as a notary server
|
||||
signed_key_json = sign_json(
|
||||
response_json, self.config.server_name, self.config.signing_key[0]
|
||||
)
|
||||
|
||||
signed_key_json_bytes = encode_canonical_json(signed_key_json)
|
||||
key_json_bytes = encode_canonical_json(response_json)
|
||||
|
||||
yield make_deferred_yieldable(
|
||||
defer.gatherResults(
|
||||
@@ -557,7 +550,7 @@ class BaseV2KeyFetcher(object):
|
||||
from_server=from_server,
|
||||
ts_now_ms=time_added_ms,
|
||||
ts_expires_ms=ts_valid_until_ms,
|
||||
key_json_bytes=signed_key_json_bytes,
|
||||
key_json_bytes=key_json_bytes,
|
||||
)
|
||||
for key_id in verify_keys
|
||||
],
|
||||
|
||||
@@ -327,21 +327,37 @@ class TransportLayerClient(object):
include_all_networks=False,
third_party_instance_id=None,
):
path = _create_v1_path("/publicRooms")
if search_filter:
# this uses MSC2197 (Search Filtering over Federation)
path = _create_v1_path("/publicRooms")

args = {"include_all_networks": "true" if include_all_networks else "false"}
if third_party_instance_id:
args["third_party_instance_id"] = (third_party_instance_id,)
if limit:
args["limit"] = [str(limit)]
if since_token:
args["since"] = [since_token]
data = {"include_all_networks": "true" if include_all_networks else "false"}
if third_party_instance_id:
data["third_party_instance_id"] = third_party_instance_id
if limit:
data["limit"] = str(limit)
if since_token:
data["since"] = since_token

# TODO(erikj): Actually send the search_filter across federation.
data["filter"] = search_filter

response = yield self.client.get_json(
destination=remote_server, path=path, args=args, ignore_backoff=True
)
response = yield self.client.post_json(
destination=remote_server, path=path, data=data, ignore_backoff=True
)
else:
path = _create_v1_path("/publicRooms")

args = {"include_all_networks": "true" if include_all_networks else "false"}
if third_party_instance_id:
args["third_party_instance_id"] = (third_party_instance_id,)
if limit:
args["limit"] = [str(limit)]
if since_token:
args["since"] = [since_token]

response = yield self.client.get_json(
destination=remote_server, path=path, args=args, ignore_backoff=True
)

return response
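In short, when a search filter is present the client now POSTs a JSON body (per MSC2197) instead of issuing a GET with query parameters. A rough sketch of the body shape, with placeholder values:

```python
# Sketch of the MSC2197-style filtered /publicRooms request body built above;
# every value shown is a placeholder, and the optional keys are only set when
# the corresponding argument was supplied.
body = {"include_all_networks": "false"}
body["third_party_instance_id"] = "irc"             # optional
body["limit"] = str(20)                             # optional
body["since"] = "pagination_token"                  # optional
body["filter"] = {"generic_search_term": "matrix"}  # the search filter

# The transport layer POSTs this body to the v1 /publicRooms federation path
# on the remote server, falling back to the old GET form when no filter is set.
print(body)
```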
@@ -770,6 +770,42 @@ class PublicRoomList(BaseFederationServlet):
|
||||
)
|
||||
return 200, data
|
||||
|
||||
async def on_POST(self, origin, content, query):
|
||||
# This implements MSC2197 (Search Filtering over Federation)
|
||||
if not self.allow_access:
|
||||
raise FederationDeniedError(origin)
|
||||
|
||||
limit = int(content.get("limit", 100))
|
||||
since_token = content.get("since", None)
|
||||
search_filter = content.get("filter", None)
|
||||
|
||||
include_all_networks = content.get("include_all_networks", False)
|
||||
third_party_instance_id = content.get("third_party_instance_id", None)
|
||||
|
||||
if include_all_networks:
|
||||
network_tuple = None
|
||||
if third_party_instance_id is not None:
|
||||
raise SynapseError(
|
||||
400, "Can't use include_all_networks with an explicit network"
|
||||
)
|
||||
elif third_party_instance_id is None:
|
||||
network_tuple = ThirdPartyInstanceID(None, None)
|
||||
else:
|
||||
network_tuple = ThirdPartyInstanceID.from_string(third_party_instance_id)
|
||||
|
||||
if search_filter is None:
|
||||
logger.warning("Nonefilter")
|
||||
|
||||
data = await self.handler.get_local_public_room_list(
|
||||
limit=limit,
|
||||
since_token=since_token,
|
||||
search_filter=search_filter,
|
||||
network_tuple=network_tuple,
|
||||
from_federation=True,
|
||||
)
|
||||
|
||||
return 200, data
|
||||
|
||||
|
||||
class FederationVersionServlet(BaseFederationServlet):
|
||||
PATH = "/version"
|
||||
|
||||
@@ -94,6 +94,25 @@ class AdminHandler(BaseHandler):
|
||||
|
||||
return ret
|
||||
|
||||
def get_user_server_admin(self, user):
|
||||
"""
|
||||
Get the admin bit on a user.
|
||||
|
||||
Args:
|
||||
user_id (UserID): the (necessarily local) user to manipulate
|
||||
"""
|
||||
return self.store.is_server_admin(user)
|
||||
|
||||
def set_user_server_admin(self, user, admin):
|
||||
"""
|
||||
Set the admin bit on a user.
|
||||
|
||||
Args:
|
||||
user_id (UserID): the (necessarily local) user to manipulate
|
||||
admin (bool): whether or not the user should be an admin of this server
|
||||
"""
|
||||
return self.store.set_server_admin(user, admin)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def export_user_data(self, user_id, writer):
|
||||
"""Write all data we have on the user to the given writer.
|
||||
|
||||
@@ -326,8 +326,9 @@ class FederationHandler(BaseHandler):
|
||||
ours = yield self.store.get_state_groups_ids(room_id, seen)
|
||||
|
||||
# state_maps is a list of mappings from (type, state_key) to event_id
|
||||
# type: list[dict[tuple[str, str], str]]
|
||||
state_maps = list(ours.values())
|
||||
state_maps = list(
|
||||
ours.values()
|
||||
) # type: list[dict[tuple[str, str], str]]
|
||||
|
||||
# we don't need this any more, let's delete it.
|
||||
del ours
|
||||
|
||||
@@ -282,3 +282,16 @@ class IdentityHandler(BaseHandler):
|
||||
except HttpResponseException as e:
|
||||
logger.info("Proxied requestToken failed: %r", e)
|
||||
raise e.to_synapse_error()
|
||||
|
||||
|
||||
class LookupAlgorithm:
"""
Supported hashing algorithms when performing a 3PID lookup.

SHA256 - Hashing an (address, medium, pepper) combo with sha256, then url-safe base64
encoding
NONE - Not performing any hashing. Simply sending an (address, medium) combo in plaintext
"""

SHA256 = "sha256"
NONE = "none"
|
||||
|
||||
@@ -24,7 +24,7 @@ from twisted.internet import defer
|
||||
from twisted.internet.defer import succeed
|
||||
|
||||
from synapse import event_auth
|
||||
from synapse.api.constants import EventTypes, Membership, RelationTypes
|
||||
from synapse.api.constants import EventTypes, Membership, RelationTypes, UserTypes
|
||||
from synapse.api.errors import (
|
||||
AuthError,
|
||||
Codes,
|
||||
@@ -469,6 +469,9 @@ class EventCreationHandler(object):
|
||||
|
||||
u = yield self.store.get_user_by_id(user_id)
|
||||
assert u is not None
|
||||
if u["user_type"] in (UserTypes.SUPPORT, UserTypes.BOT):
|
||||
# support and bot users are not required to consent
|
||||
return
|
||||
if u["appservice_id"] is not None:
|
||||
# users registered by an appservice are exempt
|
||||
return
|
||||
|
||||
@@ -34,7 +34,7 @@ from ._base import BaseHandler
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
MAX_DISPLAYNAME_LEN = 100
|
||||
MAX_DISPLAYNAME_LEN = 256
|
||||
MAX_AVATAR_URL_LEN = 1000
|
||||
|
||||
|
||||
|
||||
@@ -25,6 +25,7 @@ from unpaddedbase64 import decode_base64, encode_base64
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.constants import EventTypes, JoinRules
|
||||
from synapse.api.errors import Codes, HttpResponseException
|
||||
from synapse.types import ThirdPartyInstanceID
|
||||
from synapse.util.async_helpers import concurrently_execute
|
||||
from synapse.util.caches.descriptors import cachedInlineCallbacks
|
||||
@@ -485,7 +486,33 @@ class RoomListHandler(BaseHandler):
|
||||
return {"chunk": [], "total_room_count_estimate": 0}
|
||||
|
||||
if search_filter:
|
||||
# We currently don't support searching across federation, so we have
|
||||
# Searching across federation is defined in MSC2197.
|
||||
# However, the remote homeserver may or may not actually support it.
|
||||
# So we first try an MSC2197 remote-filtered search, then fall back
|
||||
# to a locally-filtered search if we must.
|
||||
|
||||
try:
|
||||
res = yield self._get_remote_list_cached(
|
||||
server_name,
|
||||
limit=limit,
|
||||
since_token=since_token,
|
||||
include_all_networks=include_all_networks,
|
||||
third_party_instance_id=third_party_instance_id,
|
||||
search_filter=search_filter,
|
||||
)
|
||||
return res
|
||||
except HttpResponseException as hre:
|
||||
syn_err = hre.to_synapse_error()
|
||||
if hre.code in (404, 405) or syn_err.errcode in (
|
||||
Codes.UNRECOGNIZED,
|
||||
Codes.NOT_FOUND,
|
||||
):
|
||||
logger.debug("Falling back to locally-filtered /publicRooms")
|
||||
else:
|
||||
raise # Not an error that should trigger a fallback.
|
||||
|
||||
# if we reach this point, the remote server doesn't support MSC2197
# filtered search, so we fall back to fetching the full list and
# filtering it locally, without pagination
|
||||
limit = None
|
||||
since_token = None
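The control flow above boils down to "try the MSC2197 filtered request first, and only fall back to local filtering when the remote clearly lacks the endpoint". A standalone sketch of that pattern with purely illustrative names (this is not the handler's real API):

class RemoteHttpError(Exception):
    """Stand-in for an HTTP error carrying a status code."""
    def __init__(self, code):
        super().__init__("HTTP %d" % (code,))
        self.code = code

def search_public_rooms(fetch_filtered, fetch_all, search_term):
    try:
        # First attempt: ask the remote server to apply the filter (MSC2197).
        return fetch_filtered(search_term)
    except RemoteHttpError as e:
        if e.code not in (404, 405):
            raise  # a real error; don't mask it with a fallback
        # 404/405: the remote doesn't implement the filtered endpoint.
    # Fallback: fetch the unfiltered list and filter it ourselves.
    rooms = fetch_all()
    return [r for r in rooms if search_term.lower() in r.get("name", "").lower()]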
|
||||
|
||||
@@ -29,9 +29,11 @@ from twisted.internet import defer
|
||||
from synapse import types
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.api.errors import AuthError, Codes, HttpResponseException, SynapseError
|
||||
from synapse.handlers.identity import LookupAlgorithm
|
||||
from synapse.types import RoomID, UserID
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.distributor import user_joined_room, user_left_room
|
||||
from synapse.util.hash import sha256_and_url_safe_base64
|
||||
|
||||
from ._base import BaseHandler
|
||||
|
||||
@@ -523,7 +525,7 @@ class RoomMemberHandler(object):
|
||||
event (SynapseEvent): The membership event.
|
||||
context: The context of the event.
|
||||
is_guest (bool): Whether the sender is a guest.
|
||||
room_hosts ([str]): Homeservers which are likely to already be in
|
||||
remote_room_hosts (list[str]|None): Homeservers which are likely to already be in
|
||||
the room, and could be danced with in order to join this
|
||||
homeserver for the first time.
|
||||
ratelimit (bool): Whether to rate limit this request.
|
||||
@@ -634,7 +636,7 @@ class RoomMemberHandler(object):
|
||||
servers.remove(room_alias.domain)
|
||||
servers.insert(0, room_alias.domain)
|
||||
|
||||
return (RoomID.from_string(room_id), servers)
|
||||
return RoomID.from_string(room_id), servers
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_inviter(self, user_id, room_id):
|
||||
@@ -697,6 +699,44 @@ class RoomMemberHandler(object):
|
||||
raise SynapseError(
|
||||
403, "Looking up third-party identifiers is denied from this server"
|
||||
)
|
||||
|
||||
# Check what hashing details are supported by this identity server
|
||||
use_v1 = False
|
||||
hash_details = None
|
||||
try:
|
||||
hash_details = yield self.simple_http_client.get_json(
|
||||
"%s%s/_matrix/identity/v2/hash_details" % (id_server_scheme, id_server)
|
||||
)
|
||||
except (HttpResponseException, ValueError) as e:
|
||||
# Catch HttpResponseException for a non-200 response code
|
||||
# Catch ValueError for non-JSON response body
|
||||
|
||||
# Check if this identity server does not know about v2 lookups
|
||||
if e.code == 404:
|
||||
# This is an old identity server that does not yet support v2 lookups
|
||||
use_v1 = True
|
||||
else:
|
||||
logger.warn("Error when looking up hashing details: %s" % (e,))
|
||||
return None
|
||||
|
||||
if use_v1:
|
||||
return (yield self._lookup_3pid_v1(id_server, medium, address))
|
||||
|
||||
return (yield self._lookup_3pid_v2(id_server, medium, address, hash_details))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _lookup_3pid_v1(self, id_server, medium, address):
|
||||
"""Looks up a 3pid in the passed identity server using v1 lookup.
|
||||
|
||||
Args:
|
||||
id_server (str): The server name (including port, if required)
|
||||
of the identity server to use.
|
||||
medium (str): The type of the third party identifier (e.g. "email").
|
||||
address (str): The third party identifier (e.g. "foo@example.com").
|
||||
|
||||
Returns:
|
||||
str: the matrix ID of the 3pid, or None if it is not recognized.
|
||||
"""
|
||||
try:
|
||||
data = yield self.simple_http_client.get_json(
|
||||
"%s%s/_matrix/identity/api/v1/lookup" % (id_server_scheme, id_server),
|
||||
@@ -711,8 +751,83 @@ class RoomMemberHandler(object):
|
||||
|
||||
except IOError as e:
|
||||
logger.warn("Error from identity server lookup: %s" % (e,))
|
||||
|
||||
return None
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _lookup_3pid_v2(self, id_server, medium, address, hash_details):
|
||||
"""Looks up a 3pid in the passed identity server using v2 lookup.
|
||||
|
||||
Args:
|
||||
id_server (str): The server name (including port, if required)
|
||||
of the identity server to use.
|
||||
medium (str): The type of the third party identifier (e.g. "email").
|
||||
address (str): The third party identifier (e.g. "foo@example.com").
|
||||
hash_details (dict[str, str|list]): A dictionary containing hashing information
|
||||
provided by an identity server.
|
||||
|
||||
Returns:
|
||||
Deferred[str|None]: the matrix ID of the 3pid, or None if it is not recognised.
|
||||
"""
|
||||
# Extract information from hash_details
|
||||
supported_lookup_algorithms = hash_details["algorithms"]
|
||||
lookup_pepper = hash_details["lookup_pepper"]
|
||||
|
||||
# Check if any of the supported lookup algorithms are present
|
||||
if LookupAlgorithm.SHA256 in supported_lookup_algorithms:
|
||||
# Perform a hashed lookup
|
||||
lookup_algorithm = LookupAlgorithm.SHA256
|
||||
|
||||
# Hash address, medium and the pepper with sha256
|
||||
to_hash = "%s %s %s" % (address, medium, lookup_pepper)
|
||||
lookup_value = sha256_and_url_safe_base64(to_hash)
|
||||
|
||||
elif LookupAlgorithm.NONE in supported_lookup_algorithms:
|
||||
# Perform a non-hashed lookup
|
||||
lookup_algorithm = LookupAlgorithm.NONE
|
||||
|
||||
# Combine together plaintext address and medium
|
||||
lookup_value = "%s %s" % (address, medium)
|
||||
|
||||
else:
|
||||
logger.warn(
|
||||
"None of the provided lookup algorithms of %s%s are supported: %s",
|
||||
id_server_scheme,
|
||||
id_server,
|
||||
hash_details["algorithms"],
|
||||
)
|
||||
raise SynapseError(
|
||||
400,
|
||||
"Provided identity server does not support any v2 lookup "
|
||||
"algorithms that this homeserver supports.",
|
||||
)
|
||||
|
||||
try:
|
||||
lookup_results = yield self.simple_http_client.post_json_get_json(
|
||||
"%s%s/_matrix/identity/v2/lookup" % (id_server_scheme, id_server),
|
||||
{
|
||||
"addresses": [lookup_value],
|
||||
"algorithm": lookup_algorithm,
|
||||
"pepper": lookup_pepper,
|
||||
},
|
||||
)
|
||||
except (HttpResponseException, ValueError) as e:
|
||||
# Catch HttpResponseException for a non-200 response code
|
||||
# Catch ValueError for non-JSON response body
|
||||
logger.warn("Error when performing a 3pid lookup: %s" % (e,))
|
||||
return None
|
||||
|
||||
# Check for a mapping from what we looked up to an MXID
|
||||
if "mappings" not in lookup_results or not isinstance(
|
||||
lookup_results["mappings"], dict
|
||||
):
|
||||
logger.debug("No results from 3pid lookup")
|
||||
return None
|
||||
|
||||
# Return the MXID if it's available, or None otherwise
|
||||
mxid = lookup_results["mappings"].get(lookup_value)
|
||||
return mxid
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _verify_any_signature(self, data, server_hostname):
|
||||
if server_hostname not in data["signatures"]:
|
||||
@@ -962,9 +1077,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
|
||||
)
|
||||
|
||||
if complexity:
|
||||
if complexity["v1"] > max_complexity:
|
||||
return True
|
||||
return False
|
||||
return complexity["v1"] > max_complexity
|
||||
return None
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@@ -980,10 +1093,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
|
||||
max_complexity = self.hs.config.limit_remote_rooms.complexity
|
||||
complexity = yield self.store.get_room_complexity(room_id)
|
||||
|
||||
if complexity["v1"] > max_complexity:
|
||||
return True
|
||||
|
||||
return False
|
||||
return complexity["v1"] > max_complexity
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
|
||||
|
||||
@@ -49,9 +49,6 @@ class StatsHandler(StateDeltasHandler):
|
||||
# The current position in the current_state_delta stream
|
||||
self.pos = None
|
||||
|
||||
# Guard to ensure we only process deltas one at a time
|
||||
self._is_processing = False
|
||||
|
||||
if hs.config.stats_enabled:
|
||||
self.notifier.add_replication_callback(self.notify_new_event)
|
||||
|
||||
@@ -65,43 +62,60 @@ class StatsHandler(StateDeltasHandler):
|
||||
if not self.hs.config.stats_enabled:
|
||||
return
|
||||
|
||||
if self._is_processing:
|
||||
return
|
||||
lock = self.store.stats_delta_processing_lock
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def process():
|
||||
yield lock.acquire()
|
||||
try:
|
||||
yield self._unsafe_process()
|
||||
finally:
|
||||
self._is_processing = False
|
||||
yield lock.release()
|
||||
|
||||
self._is_processing = True
|
||||
run_as_background_process("stats.notify_new_event", process)
|
||||
if not lock.locked:
|
||||
# we only want to run this process one-at-a-time,
|
||||
# and also, if the initial background updater wants us to keep out,
|
||||
# we should respect that.
|
||||
run_as_background_process("stats.notify_new_event", process)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _unsafe_process(self):
|
||||
# If self.pos is None then it means we haven't fetched it from the DB
|
||||
if self.pos is None:
|
||||
self.pos = yield self.store.get_stats_stream_pos()
|
||||
# If None is one of the values, it means that the stats regenerator has not (or had not) yet unwedged us
|
||||
# but note that this might be outdated, so we retrieve the positions again.
|
||||
if self.pos is None or None in self.pos.values():
|
||||
self.pos = yield self.store.get_stats_positions()
|
||||
|
||||
# If still None then the initial background update hasn't happened yet
|
||||
if self.pos is None:
|
||||
# If still contains a None position, then the stats regenerator hasn't started yet
|
||||
if None in self.pos.values():
|
||||
return None
|
||||
|
||||
# Loop round handling deltas until we're up to date
|
||||
|
||||
while True:
|
||||
with Measure(self.clock, "stats_delta"):
|
||||
deltas = yield self.store.get_current_state_deltas(self.pos)
|
||||
if not deltas:
|
||||
return
|
||||
deltas = yield self.store.get_current_state_deltas(
|
||||
self.pos["state_delta_stream_id"]
|
||||
)
|
||||
|
||||
logger.info("Handling %d state deltas", len(deltas))
|
||||
logger.debug("Handling %d state deltas", len(deltas))
|
||||
yield self._handle_deltas(deltas)
|
||||
|
||||
self.pos = deltas[-1]["stream_id"]
|
||||
yield self.store.update_stats_stream_pos(self.pos)
|
||||
self.pos["state_delta_stream_id"] = deltas[-1]["stream_id"]
|
||||
yield self.store.update_stats_positions(self.pos)
|
||||
|
||||
event_processing_positions.labels("stats").set(self.pos)
|
||||
event_processing_positions.labels("stats").set(
|
||||
self.pos["state_delta_stream_id"]
|
||||
)
|
||||
|
||||
# Then count deltas for total_events and total_event_bytes.
|
||||
with Measure(self.clock, "stats_total_events_and_bytes"):
|
||||
self.pos, had_counts = yield self.store.incremental_update_room_total_events_and_bytes(
|
||||
self.pos
|
||||
)
|
||||
|
||||
if not deltas and not had_counts:
|
||||
break
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _handle_deltas(self, deltas):
|
||||
@@ -119,7 +133,7 @@ class StatsHandler(StateDeltasHandler):
|
||||
|
||||
logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
|
||||
|
||||
token = yield self.store.get_earliest_token_for_room_stats(room_id)
|
||||
token = yield self.store.get_earliest_token_for_stats("room", room_id)
|
||||
|
||||
# If the earliest token to begin from is larger than our current
|
||||
# stream ID, skip processing this delta.
|
||||
@@ -131,7 +145,10 @@ class StatsHandler(StateDeltasHandler):
|
||||
continue
|
||||
|
||||
if event_id is None and prev_event_id is None:
|
||||
# Errr...
|
||||
logger.error(
|
||||
"event ID is None and so is the previous event ID. stream_id: %s",
|
||||
stream_id,
|
||||
)
|
||||
continue
|
||||
|
||||
event_content = {}
|
||||
@@ -143,92 +160,87 @@ class StatsHandler(StateDeltasHandler):
|
||||
|
||||
# We use stream_pos here rather than fetch by event_id as event_id
|
||||
# may be None
|
||||
now = yield self.store.get_received_ts_by_stream_pos(stream_pos)
|
||||
stream_timestamp = yield self.store.get_received_ts_by_stream_pos(
|
||||
stream_pos
|
||||
)
|
||||
stream_timestamp = int(stream_timestamp)
|
||||
|
||||
# quantise time to the nearest bucket
|
||||
now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size
|
||||
# All the values in this dict are deltas (RELATIVE changes)
|
||||
room_stats_delta = {}
|
||||
is_newly_created = False
|
||||
|
||||
if prev_event_id is None:
|
||||
# this state event doesn't overwrite another,
|
||||
# so it is a new effective/current state event
|
||||
room_stats_delta["current_state_events"] = 1
|
||||
|
||||
if typ == EventTypes.Member:
|
||||
# we could use _get_key_change here but it's a bit inefficient
|
||||
# given we're not testing for a specific result; might as well
|
||||
# just grab the prev_membership and membership strings and
|
||||
# compare them.
|
||||
prev_event_content = {}
|
||||
# We take None rather than leave as a previous membership
|
||||
# in the absence of a previous event because we do not want to
|
||||
# reduce the leave count when a new-to-the-room user joins.
|
||||
prev_membership = None
|
||||
if prev_event_id is not None:
|
||||
prev_event = yield self.store.get_event(
|
||||
prev_event_id, allow_none=True
|
||||
)
|
||||
if prev_event:
|
||||
prev_event_content = prev_event.content
|
||||
prev_membership = prev_event_content.get(
|
||||
"membership", Membership.LEAVE
|
||||
)
|
||||
|
||||
membership = event_content.get("membership", Membership.LEAVE)
|
||||
prev_membership = prev_event_content.get("membership", Membership.LEAVE)
|
||||
|
||||
if prev_membership == membership:
|
||||
continue
|
||||
|
||||
if prev_membership == Membership.JOIN:
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "joined_members", -1
|
||||
)
|
||||
if prev_membership is None:
|
||||
logger.debug("No previous membership for this user.")
|
||||
elif membership == prev_membership:
|
||||
pass # noop
|
||||
elif prev_membership == Membership.JOIN:
|
||||
room_stats_delta["joined_members"] = -1
|
||||
elif prev_membership == Membership.INVITE:
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "invited_members", -1
|
||||
)
|
||||
room_stats_delta["invited_members"] = -1
|
||||
elif prev_membership == Membership.LEAVE:
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "left_members", -1
|
||||
)
|
||||
room_stats_delta["left_members"] = -1
|
||||
elif prev_membership == Membership.BAN:
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "banned_members", -1
|
||||
)
|
||||
room_stats_delta["banned_members"] = -1
|
||||
else:
|
||||
err = "%s is not a valid prev_membership" % (repr(prev_membership),)
|
||||
logger.error(err)
|
||||
raise ValueError(err)
|
||||
raise ValueError(
|
||||
"%r is not a valid prev_membership" % (prev_membership,)
|
||||
)
|
||||
|
||||
if membership == prev_membership:
|
||||
pass # noop
|
||||
if membership == Membership.JOIN:
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "joined_members", +1
|
||||
)
|
||||
room_stats_delta["joined_members"] = +1
|
||||
elif membership == Membership.INVITE:
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "invited_members", +1
|
||||
)
|
||||
room_stats_delta["invited_members"] = +1
|
||||
elif membership == Membership.LEAVE:
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "left_members", +1
|
||||
)
|
||||
room_stats_delta["left_members"] = +1
|
||||
elif membership == Membership.BAN:
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "banned_members", +1
|
||||
)
|
||||
room_stats_delta["banned_members"] = +1
|
||||
else:
|
||||
err = "%s is not a valid membership" % (repr(membership),)
|
||||
logger.error(err)
|
||||
raise ValueError(err)
|
||||
raise ValueError("%r is not a valid membership" % (membership,))
|
||||
|
||||
user_id = state_key
|
||||
if self.is_mine_id(user_id):
|
||||
# update user_stats as it's one of our users
|
||||
public = yield self._is_public_room(room_id)
|
||||
# this accounts for transitions like leave → ban and so on.
|
||||
has_changed_joinedness = (prev_membership == Membership.JOIN) != (
|
||||
membership == Membership.JOIN
|
||||
)
|
||||
|
||||
if has_changed_joinedness:
|
||||
# update user_stats as it's one of our users
|
||||
public = yield self._is_public_room(room_id)
|
||||
|
||||
field = "public_rooms" if public else "private_rooms"
|
||||
delta = +1 if membership == Membership.JOIN else -1
|
||||
|
||||
if membership == Membership.LEAVE:
|
||||
yield self.store.update_stats_delta(
|
||||
now,
|
||||
"user",
|
||||
user_id,
|
||||
"public_rooms" if public else "private_rooms",
|
||||
-1,
|
||||
)
|
||||
elif membership == Membership.JOIN:
|
||||
yield self.store.update_stats_delta(
|
||||
now,
|
||||
"user",
|
||||
user_id,
|
||||
"public_rooms" if public else "private_rooms",
|
||||
+1,
|
||||
stream_timestamp, "user", user_id, {field: delta}
|
||||
)
|
||||
|
||||
elif typ == EventTypes.Create:
|
||||
@@ -246,28 +258,50 @@ class StatsHandler(StateDeltasHandler):
|
||||
},
|
||||
)
|
||||
|
||||
is_newly_created = True
|
||||
|
||||
elif typ == EventTypes.JoinRules:
|
||||
old_room_state = yield self.store.get_room_state(room_id)
|
||||
yield self.store.update_room_state(
|
||||
room_id, {"join_rules": event_content.get("join_rule")}
|
||||
)
|
||||
|
||||
is_public = yield self._get_key_change(
|
||||
prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
|
||||
# whether the room would be public anyway,
|
||||
# because of history_visibility
|
||||
other_field_gives_publicity = (
|
||||
old_room_state["history_visibility"] == "world_readable"
|
||||
)
|
||||
if is_public is not None:
|
||||
yield self.update_public_room_stats(now, room_id, is_public)
|
||||
|
||||
if not other_field_gives_publicity:
|
||||
is_public = yield self._get_key_change(
|
||||
prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
|
||||
)
|
||||
if is_public is not None:
|
||||
yield self.update_public_room_stats(
|
||||
stream_timestamp, room_id, is_public
|
||||
)
|
||||
|
||||
elif typ == EventTypes.RoomHistoryVisibility:
|
||||
old_room_state = yield self.store.get_room_state(room_id)
|
||||
yield self.store.update_room_state(
|
||||
room_id,
|
||||
{"history_visibility": event_content.get("history_visibility")},
|
||||
)
|
||||
|
||||
is_public = yield self._get_key_change(
|
||||
prev_event_id, event_id, "history_visibility", "world_readable"
|
||||
# whether the room would be public anyway,
|
||||
# because of join_rule
|
||||
other_field_gives_publicity = (
|
||||
old_room_state["join_rules"] == JoinRules.PUBLIC
|
||||
)
|
||||
if is_public is not None:
|
||||
yield self.update_public_room_stats(now, room_id, is_public)
|
||||
|
||||
if not other_field_gives_publicity:
|
||||
is_public = yield self._get_key_change(
|
||||
prev_event_id, event_id, "history_visibility", "world_readable"
|
||||
)
|
||||
if is_public is not None:
|
||||
yield self.update_public_room_stats(
|
||||
stream_timestamp, room_id, is_public
|
||||
)
|
||||
|
||||
elif typ == EventTypes.Encryption:
|
||||
yield self.store.update_room_state(
|
||||
@@ -290,6 +324,20 @@ class StatsHandler(StateDeltasHandler):
|
||||
room_id, {"canonical_alias": event_content.get("alias")}
|
||||
)
|
||||
|
||||
if is_newly_created:
|
||||
yield self.store.update_stats_delta(
|
||||
stream_timestamp,
|
||||
"room",
|
||||
room_id,
|
||||
room_stats_delta,
|
||||
complete_with_stream_id=stream_id,
|
||||
)
|
||||
|
||||
elif len(room_stats_delta) > 0:
|
||||
yield self.store.update_stats_delta(
|
||||
stream_timestamp, "room", room_id, room_stats_delta
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def update_public_room_stats(self, ts, room_id, is_public):
|
||||
"""
|
||||
@@ -308,10 +356,13 @@ class StatsHandler(StateDeltasHandler):
|
||||
for user_id in user_ids:
|
||||
if self.hs.is_mine(UserID.from_string(user_id)):
|
||||
yield self.store.update_stats_delta(
|
||||
ts, "user", user_id, "public_rooms", +1 if is_public else -1
|
||||
)
|
||||
yield self.store.update_stats_delta(
|
||||
ts, "user", user_id, "private_rooms", -1 if is_public else +1
|
||||
ts,
|
||||
"user",
|
||||
user_id,
|
||||
{
|
||||
"public_rooms": +1 if is_public else -1,
|
||||
"private_rooms": -1 if is_public else +1,
|
||||
},
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
||||
@@ -14,21 +14,21 @@
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import urllib
|
||||
|
||||
import attr
|
||||
from netaddr import IPAddress
|
||||
from netaddr import AddrFormatError, IPAddress
|
||||
from zope.interface import implementer
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
|
||||
from twisted.internet.interfaces import IStreamClientEndpoint
|
||||
from twisted.web.client import URI, Agent, HTTPConnectionPool
|
||||
from twisted.web.client import Agent, HTTPConnectionPool
|
||||
from twisted.web.http_headers import Headers
|
||||
from twisted.web.iweb import IAgent
|
||||
from twisted.web.iweb import IAgent, IAgentEndpointFactory
|
||||
|
||||
from synapse.http.federation.srv_resolver import SrvResolver, pick_server_from_list
|
||||
from synapse.http.federation.srv_resolver import Server, SrvResolver
|
||||
from synapse.http.federation.well_known_resolver import WellKnownResolver
|
||||
from synapse.logging.context import make_deferred_yieldable
|
||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||
from synapse.util import Clock
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -36,8 +36,9 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
@implementer(IAgent)
|
||||
class MatrixFederationAgent(object):
|
||||
"""An Agent-like thing which provides a `request` method which will look up a matrix
|
||||
server and send an HTTP request to it.
|
||||
"""An Agent-like thing which provides a `request` method which correctly
|
||||
handles resolving matrix server names when using matrix://. Handles standard
|
||||
https URIs as normal.
|
||||
|
||||
Doesn't implement any retries. (Those are done in MatrixFederationHttpClient.)
|
||||
|
||||
@@ -65,17 +66,19 @@ class MatrixFederationAgent(object):
|
||||
):
|
||||
self._reactor = reactor
|
||||
self._clock = Clock(reactor)
|
||||
|
||||
self._tls_client_options_factory = tls_client_options_factory
|
||||
if _srv_resolver is None:
|
||||
_srv_resolver = SrvResolver()
|
||||
self._srv_resolver = _srv_resolver
|
||||
|
||||
self._pool = HTTPConnectionPool(reactor)
|
||||
self._pool.retryAutomatically = False
|
||||
self._pool.maxPersistentPerHost = 5
|
||||
self._pool.cachedConnectionTimeout = 2 * 60
|
||||
|
||||
self._agent = Agent.usingEndpointFactory(
|
||||
self._reactor,
|
||||
MatrixHostnameEndpointFactory(
|
||||
reactor, tls_client_options_factory, _srv_resolver
|
||||
),
|
||||
pool=self._pool,
|
||||
)
|
||||
|
||||
if _well_known_resolver is None:
|
||||
_well_known_resolver = WellKnownResolver(
|
||||
self._reactor,
|
||||
@@ -93,19 +96,15 @@ class MatrixFederationAgent(object):
|
||||
"""
|
||||
Args:
|
||||
method (bytes): HTTP method: GET/POST/etc
|
||||
|
||||
uri (bytes): Absolute URI to be retrieved
|
||||
|
||||
headers (twisted.web.http_headers.Headers|None):
|
||||
HTTP headers to send with the request, or None to
|
||||
send no extra headers.
|
||||
|
||||
bodyProducer (twisted.web.iweb.IBodyProducer|None):
|
||||
An object which can generate bytes to make up the
|
||||
body of this request (for example, the properly encoded contents of
|
||||
a file for a file upload). Or None if the request is to have
|
||||
no body.
|
||||
|
||||
Returns:
|
||||
Deferred[twisted.web.iweb.IResponse]:
|
||||
fires when the header of the response has been received (regardless of the
|
||||
@@ -113,210 +112,207 @@ class MatrixFederationAgent(object):
|
||||
response from being received (including problems that prevent the request
|
||||
from being sent).
|
||||
"""
|
||||
parsed_uri = URI.fromBytes(uri, defaultPort=-1)
|
||||
res = yield self._route_matrix_uri(parsed_uri)
|
||||
# We use urlparse as that will set `port` to None if there is no
|
||||
# explicit port.
|
||||
parsed_uri = urllib.parse.urlparse(uri)
|
||||
|
||||
# set up the TLS connection params
|
||||
# If this is a matrix:// URI check if the server has delegated matrix
|
||||
# traffic using well-known delegation.
|
||||
#
|
||||
# XXX disabling TLS is really only supported here for the benefit of the
|
||||
# unit tests. We should make the UTs cope with TLS rather than having to make
|
||||
# the code support the unit tests.
|
||||
if self._tls_client_options_factory is None:
|
||||
tls_options = None
|
||||
else:
|
||||
tls_options = self._tls_client_options_factory.get_options(
|
||||
res.tls_server_name.decode("ascii")
|
||||
# We have to do this here and not in the endpoint as we need to rewrite
|
||||
# the host header with the delegated server name.
|
||||
delegated_server = None
|
||||
if (
|
||||
parsed_uri.scheme == b"matrix"
|
||||
and not _is_ip_literal(parsed_uri.hostname)
|
||||
and not parsed_uri.port
|
||||
):
|
||||
well_known_result = yield self._well_known_resolver.get_well_known(
|
||||
parsed_uri.hostname
|
||||
)
|
||||
delegated_server = well_known_result.delegated_server
|
||||
|
||||
# make sure that the Host header is set correctly
|
||||
if delegated_server:
|
||||
# Ok, the server has delegated matrix traffic to somewhere else, so
|
||||
# lets rewrite the URL to replace the server with the delegated
|
||||
# server name.
|
||||
uri = urllib.parse.urlunparse(
|
||||
(
|
||||
parsed_uri.scheme,
|
||||
delegated_server,
|
||||
parsed_uri.path,
|
||||
parsed_uri.params,
|
||||
parsed_uri.query,
|
||||
parsed_uri.fragment,
|
||||
)
|
||||
)
|
||||
parsed_uri = urllib.parse.urlparse(uri)
|
||||
|
||||
# We need to make sure the host header is set to the netloc of the
|
||||
# server.
|
||||
if headers is None:
|
||||
headers = Headers()
|
||||
else:
|
||||
headers = headers.copy()
|
||||
|
||||
if not headers.hasHeader(b"host"):
|
||||
headers.addRawHeader(b"host", res.host_header)
|
||||
headers.addRawHeader(b"host", parsed_uri.netloc)
|
||||
|
||||
class EndpointFactory(object):
|
||||
@staticmethod
|
||||
def endpointForURI(_uri):
|
||||
ep = LoggingHostnameEndpoint(
|
||||
self._reactor, res.target_host, res.target_port
|
||||
)
|
||||
if tls_options is not None:
|
||||
ep = wrapClientTLS(tls_options, ep)
|
||||
return ep
|
||||
|
||||
agent = Agent.usingEndpointFactory(self._reactor, EndpointFactory(), self._pool)
|
||||
res = yield make_deferred_yieldable(
|
||||
agent.request(method, uri, headers, bodyProducer)
|
||||
self._agent.request(method, uri, headers, bodyProducer)
|
||||
)
|
||||
|
||||
return res
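The .well-known handling above amounts to swapping the netloc for the delegated server name before the request reaches the endpoint factory. Roughly, as a hedged sketch (this helper does not exist in the agent; it only restates the urlunparse call above):

import urllib.parse

def rewrite_uri_for_delegation(uri: bytes, delegated_server: bytes) -> bytes:
    # Keep scheme, path, params, query and fragment; replace only the netloc.
    parsed = urllib.parse.urlparse(uri)
    return urllib.parse.urlunparse(
        (
            parsed.scheme,
            delegated_server,
            parsed.path,
            parsed.params,
            parsed.query,
            parsed.fragment,
        )
    )

# rewrite_uri_for_delegation(b"matrix://example.com/_matrix/key/v2/server",
#                            b"synapse.example.com:443")
# -> b"matrix://synapse.example.com:443/_matrix/key/v2/server"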
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _route_matrix_uri(self, parsed_uri, lookup_well_known=True):
|
||||
"""Helper for `request`: determine the routing for a Matrix URI
|
||||
|
||||
Args:
|
||||
parsed_uri (twisted.web.client.URI): uri to route. Note that it should be
|
||||
parsed with URI.fromBytes(uri, defaultPort=-1) to set the `port` to -1
|
||||
if there is no explicit port given.
|
||||
@implementer(IAgentEndpointFactory)
|
||||
class MatrixHostnameEndpointFactory(object):
|
||||
"""Factory for MatrixHostnameEndpoint for parsing to an Agent.
|
||||
"""
|
||||
|
||||
lookup_well_known (bool): True if we should look up the .well-known file if
|
||||
there is no SRV record.
|
||||
def __init__(self, reactor, tls_client_options_factory, srv_resolver):
|
||||
self._reactor = reactor
|
||||
self._tls_client_options_factory = tls_client_options_factory
|
||||
|
||||
Returns:
|
||||
Deferred[_RoutingResult]
|
||||
"""
|
||||
# check for an IP literal
|
||||
try:
|
||||
ip_address = IPAddress(parsed_uri.host.decode("ascii"))
|
||||
except Exception:
|
||||
# not an IP address
|
||||
ip_address = None
|
||||
if srv_resolver is None:
|
||||
srv_resolver = SrvResolver()
|
||||
|
||||
if ip_address:
|
||||
port = parsed_uri.port
|
||||
if port == -1:
|
||||
port = 8448
|
||||
return _RoutingResult(
|
||||
host_header=parsed_uri.netloc,
|
||||
tls_server_name=parsed_uri.host,
|
||||
target_host=parsed_uri.host,
|
||||
target_port=port,
|
||||
)
|
||||
self._srv_resolver = srv_resolver
|
||||
|
||||
if parsed_uri.port != -1:
|
||||
# there is an explicit port
|
||||
return _RoutingResult(
|
||||
host_header=parsed_uri.netloc,
|
||||
tls_server_name=parsed_uri.host,
|
||||
target_host=parsed_uri.host,
|
||||
target_port=parsed_uri.port,
|
||||
)
|
||||
|
||||
if lookup_well_known:
|
||||
# try a .well-known lookup
|
||||
well_known_result = yield self._well_known_resolver.get_well_known(
|
||||
parsed_uri.host
|
||||
)
|
||||
well_known_server = well_known_result.delegated_server
|
||||
|
||||
if well_known_server:
|
||||
# if we found a .well-known, start again, but don't do another
|
||||
# .well-known lookup.
|
||||
|
||||
# parse the server name in the .well-known response into host/port.
|
||||
# (This code is lifted from twisted.web.client.URI.fromBytes).
|
||||
if b":" in well_known_server:
|
||||
well_known_host, well_known_port = well_known_server.rsplit(b":", 1)
|
||||
try:
|
||||
well_known_port = int(well_known_port)
|
||||
except ValueError:
|
||||
# the part after the colon could not be parsed as an int
|
||||
# - we assume it is an IPv6 literal with no port (the closing
|
||||
# ']' stops it being parsed as an int)
|
||||
well_known_host, well_known_port = well_known_server, -1
|
||||
else:
|
||||
well_known_host, well_known_port = well_known_server, -1
|
||||
|
||||
new_uri = URI(
|
||||
scheme=parsed_uri.scheme,
|
||||
netloc=well_known_server,
|
||||
host=well_known_host,
|
||||
port=well_known_port,
|
||||
path=parsed_uri.path,
|
||||
params=parsed_uri.params,
|
||||
query=parsed_uri.query,
|
||||
fragment=parsed_uri.fragment,
|
||||
)
|
||||
|
||||
res = yield self._route_matrix_uri(new_uri, lookup_well_known=False)
|
||||
return res
|
||||
|
||||
# try a SRV lookup
|
||||
service_name = b"_matrix._tcp.%s" % (parsed_uri.host,)
|
||||
server_list = yield self._srv_resolver.resolve_service(service_name)
|
||||
|
||||
if not server_list:
|
||||
target_host = parsed_uri.host
|
||||
port = 8448
|
||||
logger.debug(
|
||||
"No SRV record for %s, using %s:%i",
|
||||
parsed_uri.host.decode("ascii"),
|
||||
target_host.decode("ascii"),
|
||||
port,
|
||||
)
|
||||
else:
|
||||
target_host, port = pick_server_from_list(server_list)
|
||||
logger.debug(
|
||||
"Picked %s:%i from SRV records for %s",
|
||||
target_host.decode("ascii"),
|
||||
port,
|
||||
parsed_uri.host.decode("ascii"),
|
||||
)
|
||||
|
||||
return _RoutingResult(
|
||||
host_header=parsed_uri.netloc,
|
||||
tls_server_name=parsed_uri.host,
|
||||
target_host=target_host,
|
||||
target_port=port,
|
||||
def endpointForURI(self, parsed_uri):
|
||||
return MatrixHostnameEndpoint(
|
||||
self._reactor,
|
||||
self._tls_client_options_factory,
|
||||
self._srv_resolver,
|
||||
parsed_uri,
|
||||
)
|
||||
|
||||
|
||||
@implementer(IStreamClientEndpoint)
|
||||
class LoggingHostnameEndpoint(object):
|
||||
"""A wrapper for HostnameEndpint which logs when it connects"""
|
||||
class MatrixHostnameEndpoint(object):
|
||||
"""An endpoint that resolves matrix:// URLs using Matrix server name
|
||||
resolution (i.e. via SRV). Does not check for well-known delegation.
|
||||
|
||||
def __init__(self, reactor, host, port, *args, **kwargs):
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.ep = HostnameEndpoint(reactor, host, port, *args, **kwargs)
|
||||
Args:
|
||||
reactor (IReactor)
|
||||
tls_client_options_factory (ClientTLSOptionsFactory|None):
|
||||
factory to use for fetching client tls options, or none to disable TLS.
|
||||
srv_resolver (SrvResolver): The SRV resolver to use
|
||||
parsed_uri (twisted.web.client.URI): The parsed URI that we're wanting
|
||||
to connect to.
|
||||
"""
|
||||
|
||||
def __init__(self, reactor, tls_client_options_factory, srv_resolver, parsed_uri):
|
||||
self._reactor = reactor
|
||||
|
||||
self._parsed_uri = parsed_uri
|
||||
|
||||
# set up the TLS connection params
|
||||
#
|
||||
# XXX disabling TLS is really only supported here for the benefit of the
|
||||
# unit tests. We should make the UTs cope with TLS rather than having to make
|
||||
# the code support the unit tests.
|
||||
|
||||
if tls_client_options_factory is None:
|
||||
self._tls_options = None
|
||||
else:
|
||||
self._tls_options = tls_client_options_factory.get_options(
|
||||
self._parsed_uri.host.decode("ascii")
|
||||
)
|
||||
|
||||
self._srv_resolver = srv_resolver
|
||||
|
||||
def connect(self, protocol_factory):
|
||||
logger.info("Connecting to %s:%i", self.host.decode("ascii"), self.port)
|
||||
return self.ep.connect(protocol_factory)
|
||||
"""Implements IStreamClientEndpoint interface
|
||||
"""
|
||||
|
||||
return run_in_background(self._do_connect, protocol_factory)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _do_connect(self, protocol_factory):
|
||||
first_exception = None
|
||||
|
||||
server_list = yield self._resolve_server()
|
||||
|
||||
for server in server_list:
|
||||
host = server.host
|
||||
port = server.port
|
||||
|
||||
try:
|
||||
logger.info("Connecting to %s:%i", host.decode("ascii"), port)
|
||||
endpoint = HostnameEndpoint(self._reactor, host, port)
|
||||
if self._tls_options:
|
||||
endpoint = wrapClientTLS(self._tls_options, endpoint)
|
||||
result = yield make_deferred_yieldable(
|
||||
endpoint.connect(protocol_factory)
|
||||
)
|
||||
|
||||
return result
|
||||
except Exception as e:
|
||||
logger.info(
|
||||
"Failed to connect to %s:%i: %s", host.decode("ascii"), port, e
|
||||
)
|
||||
if not first_exception:
|
||||
first_exception = e
|
||||
|
||||
# We return the first failure because that's probably the most interesting.
|
||||
if first_exception:
|
||||
raise first_exception
|
||||
|
||||
# This shouldn't happen as we should always have at least one host/port
|
||||
# to try and if that doesn't work then we'll have an exception.
|
||||
raise Exception("Failed to resolve server %r" % (self._parsed_uri.netloc,))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _resolve_server(self):
|
||||
"""Resolves the server name to a list of hosts and ports to attempt to
|
||||
connect to.
|
||||
|
||||
Returns:
|
||||
Deferred[list[Server]]
|
||||
"""
|
||||
|
||||
if self._parsed_uri.scheme != b"matrix":
|
||||
return [Server(host=self._parsed_uri.host, port=self._parsed_uri.port)]
|
||||
|
||||
# Note: We don't do well-known lookup as that needs to have happened
|
||||
# before now, due to needing to rewrite the Host header of the HTTP
|
||||
# request.
|
||||
|
||||
# We reparse the URI so that defaultPort is -1 rather than 80
|
||||
parsed_uri = urllib.parse.urlparse(self._parsed_uri.toBytes())
|
||||
|
||||
host = parsed_uri.hostname
|
||||
port = parsed_uri.port
|
||||
|
||||
# If there is an explicit port or the host is an IP address we bypass
|
||||
# SRV lookups and just use the given host/port.
|
||||
if port or _is_ip_literal(host):
|
||||
return [Server(host, port or 8448)]
|
||||
|
||||
server_list = yield self._srv_resolver.resolve_service(b"_matrix._tcp." + host)
|
||||
|
||||
if server_list:
|
||||
return server_list
|
||||
|
||||
# No SRV records, so we fall back to the hostname with port 8448
|
||||
return [Server(host, 8448)]
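Condensed, the resolution order of _resolve_server is: non-matrix scheme → use the URI as given; explicit port or IP literal → skip SRV; otherwise try SRV for _matrix._tcp.<host> and fall back to port 8448. A standalone restatement with illustrative helper names (the real code returns Server objects and goes through SrvResolver's cache):

import ipaddress

def _is_ip(host: str) -> bool:
    # True for IPv4/IPv6 literals, with or without surrounding brackets.
    try:
        ipaddress.ip_address(host.strip("[]"))
        return True
    except ValueError:
        return False

def resolve_targets(scheme, host, port, srv_lookup):
    if scheme != "matrix":
        # Plain https etc.: use exactly what the URI gave us.
        return [(host, port)]
    if port or _is_ip(host):
        # Explicit port or IP literal: bypass SRV.
        return [(host, port or 8448)]
    # Otherwise consult SRV, falling back to the hostname on port 8448.
    return srv_lookup("_matrix._tcp." + host) or [(host, 8448)]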
|
||||
|
||||
|
||||
@attr.s
|
||||
class _RoutingResult(object):
|
||||
"""The result returned by `_route_matrix_uri`.
|
||||
def _is_ip_literal(host):
|
||||
"""Test if the given host name is either an IPv4 or IPv6 literal.
|
||||
|
||||
Contains the parameters needed to direct a federation connection to a particular
|
||||
server.
|
||||
Args:
|
||||
host (bytes)
|
||||
|
||||
Where a SRV record points to several servers, this object contains a single server
|
||||
chosen from the list.
|
||||
Returns:
|
||||
bool
|
||||
"""
|
||||
|
||||
host_header = attr.ib()
|
||||
"""
|
||||
The value we should assign to the Host header (host:port from the matrix
|
||||
URI, or .well-known).
|
||||
host = host.decode("ascii")
|
||||
|
||||
:type: bytes
|
||||
"""
|
||||
|
||||
tls_server_name = attr.ib()
|
||||
"""
|
||||
The server name we should set in the SNI (typically host, without port, from the
|
||||
matrix URI or .well-known)
|
||||
|
||||
:type: bytes
|
||||
"""
|
||||
|
||||
target_host = attr.ib()
|
||||
"""
|
||||
The hostname (or IP literal) we should route the TCP connection to (the target of the
|
||||
SRV record, or the hostname from the URL/.well-known)
|
||||
|
||||
:type: bytes
|
||||
"""
|
||||
|
||||
target_port = attr.ib()
|
||||
"""
|
||||
The port we should route the TCP connection to (the target of the SRV record, or
|
||||
the port from the URL/.well-known, or 8448)
|
||||
|
||||
:type: int
|
||||
"""
|
||||
try:
|
||||
IPAddress(host)
|
||||
return True
|
||||
except AddrFormatError:
|
||||
return False
|
||||
|
||||
@@ -32,7 +32,7 @@ logger = logging.getLogger(__name__)
|
||||
SERVER_CACHE = {}
|
||||
|
||||
|
||||
@attr.s
|
||||
@attr.s(slots=True, frozen=True)
|
||||
class Server(object):
|
||||
"""
|
||||
Our record of an individual server which can be tried to reach a destination.
|
||||
@@ -53,34 +53,47 @@ class Server(object):
|
||||
expires = attr.ib(default=0)
|
||||
|
||||
|
||||
def pick_server_from_list(server_list):
|
||||
"""Randomly choose a server from the server list
|
||||
|
||||
Args:
|
||||
server_list (list[Server]): list of candidate servers
|
||||
|
||||
Returns:
|
||||
Tuple[bytes, int]: (host, port) pair for the chosen server
|
||||
def _sort_server_list(server_list):
|
||||
"""Given a list of SRV records sort them into priority order and shuffle
|
||||
each priority with the given weight.
|
||||
"""
|
||||
if not server_list:
|
||||
raise RuntimeError("pick_server_from_list called with empty list")
|
||||
priority_map = {}
|
||||
|
||||
# TODO: currently we only use the lowest-priority servers. We should maintain a
|
||||
# cache of servers known to be "down" and filter them out
|
||||
for server in server_list:
|
||||
priority_map.setdefault(server.priority, []).append(server)
|
||||
|
||||
min_priority = min(s.priority for s in server_list)
|
||||
eligible_servers = list(s for s in server_list if s.priority == min_priority)
|
||||
total_weight = sum(s.weight for s in eligible_servers)
|
||||
target_weight = random.randint(0, total_weight)
|
||||
results = []
|
||||
for priority in sorted(priority_map):
|
||||
servers = priority_map[priority]
|
||||
|
||||
for s in eligible_servers:
|
||||
target_weight -= s.weight
|
||||
# This roughly follows the algorithm described in RFC2782,
|
||||
# changed to remove an off-by-one error.
|
||||
#
|
||||
# N.B. Weights can be zero, which means that they should be picked
|
||||
# rarely.
|
||||
|
||||
if target_weight <= 0:
|
||||
return s.host, s.port
|
||||
total_weight = sum(s.weight for s in servers)
|
||||
|
||||
# this should be impossible.
|
||||
raise RuntimeError("pick_server_from_list got to end of eligible server list.")
|
||||
# Total weight can become zero if there are only zero weight servers
|
||||
# left, which we handle by just shuffling and appending to the results.
|
||||
while servers and total_weight:
|
||||
target_weight = random.randint(1, total_weight)
|
||||
|
||||
for s in servers:
|
||||
target_weight -= s.weight
|
||||
|
||||
if target_weight <= 0:
|
||||
break
|
||||
|
||||
results.append(s)
|
||||
servers.remove(s)
|
||||
total_weight -= s.weight
|
||||
|
||||
if servers:
|
||||
random.shuffle(servers)
|
||||
results.extend(servers)
|
||||
|
||||
return results
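A toy, self-contained demonstration of the per-priority weighted pick (RFC2782 style) that the loop above performs; only the selection step is reproduced here, on (name, weight) tuples rather than Server records:

import random
from collections import Counter

def weighted_order(servers):
    # servers: list of (name, weight) tuples within a single priority band.
    servers = list(servers)
    results = []
    total_weight = sum(w for _, w in servers)
    while servers and total_weight:
        target_weight = random.randint(1, total_weight)
        for s in servers:
            target_weight -= s[1]
            if target_weight <= 0:
                break
        results.append(s)
        servers.remove(s)
        total_weight -= s[1]
    # Zero-weight stragglers are shuffled onto the end, as above.
    random.shuffle(servers)
    results.extend(servers)
    return results

firsts = Counter(weighted_order([("a", 60), ("b", 30), ("c", 10)])[0][0] for _ in range(1000))
# Roughly: a ~600, b ~300, c ~100 first-place finishes, matching the weights.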
|
||||
|
||||
|
||||
class SrvResolver(object):
|
||||
@@ -120,7 +133,7 @@ class SrvResolver(object):
|
||||
if cache_entry:
|
||||
if all(s.expires > now for s in cache_entry):
|
||||
servers = list(cache_entry)
|
||||
return servers
|
||||
return _sort_server_list(servers)
|
||||
|
||||
try:
|
||||
answers, _, _ = yield make_deferred_yieldable(
|
||||
@@ -169,4 +182,4 @@ class SrvResolver(object):
|
||||
)
|
||||
|
||||
self._cache[service_name] = list(servers)
|
||||
return servers
|
||||
return _sort_server_list(servers)
|
||||
|
||||
@@ -0,0 +1,374 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import os.path
|
||||
import sys
|
||||
import typing
|
||||
import warnings
|
||||
|
||||
import attr
|
||||
from constantly import NamedConstant, Names, ValueConstant, Values
|
||||
from zope.interface import implementer
|
||||
|
||||
from twisted.logger import (
|
||||
FileLogObserver,
|
||||
FilteringLogObserver,
|
||||
ILogObserver,
|
||||
LogBeginner,
|
||||
Logger,
|
||||
LogLevel,
|
||||
LogLevelFilterPredicate,
|
||||
LogPublisher,
|
||||
eventAsText,
|
||||
globalLogBeginner,
|
||||
jsonFileLogObserver,
|
||||
)
|
||||
|
||||
from synapse.config._base import ConfigError
|
||||
from synapse.logging._terse_json import (
|
||||
TerseJSONToConsoleLogObserver,
|
||||
TerseJSONToTCPLogObserver,
|
||||
)
|
||||
from synapse.logging.context import LoggingContext
|
||||
|
||||
|
||||
def stdlib_log_level_to_twisted(level: str) -> LogLevel:
|
||||
"""
|
||||
Convert a stdlib log level to Twisted's log level.
|
||||
"""
|
||||
lvl = level.lower().replace("warning", "warn")
|
||||
return LogLevel.levelWithName(lvl)
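For reference, WARNING is the only level whose name differs between the stdlib and Twisted:

# stdlib "DEBUG"    -> LogLevel.debug
# stdlib "INFO"     -> LogLevel.info
# stdlib "WARNING"  -> LogLevel.warn
# stdlib "ERROR"    -> LogLevel.error
# stdlib "CRITICAL" -> LogLevel.critical
# e.g. stdlib_log_level_to_twisted("WARNING") is LogLevel.warn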
|
||||
|
||||
|
||||
@attr.s
|
||||
@implementer(ILogObserver)
|
||||
class LogContextObserver(object):
|
||||
"""
|
||||
An ILogObserver which adds Synapse-specific log context information.
|
||||
|
||||
Attributes:
|
||||
observer (ILogObserver): The target parent observer.
|
||||
"""
|
||||
|
||||
observer = attr.ib()
|
||||
|
||||
def __call__(self, event: dict) -> None:
|
||||
"""
|
||||
Consume a log event and emit it to the parent observer after filtering
|
||||
and adding log context information.
|
||||
|
||||
Args:
|
||||
event (dict)
|
||||
"""
|
||||
# Filter out some useless events that Twisted outputs
|
||||
if "log_text" in event:
|
||||
if event["log_text"].startswith("DNSDatagramProtocol starting on "):
|
||||
return
|
||||
|
||||
if event["log_text"].startswith("(UDP Port "):
|
||||
return
|
||||
|
||||
if event["log_text"].startswith("Timing out client") or event[
|
||||
"log_format"
|
||||
].startswith("Timing out client"):
|
||||
return
|
||||
|
||||
context = LoggingContext.current_context()
|
||||
|
||||
# Copy the context information to the log event.
|
||||
if context is not None:
|
||||
context.copy_to_twisted_log_entry(event)
|
||||
else:
|
||||
# If there's no logging context, not even the root one, we might be
|
||||
# starting up or it might be from non-Synapse code. Log it as if it
|
||||
# came from the root logger.
|
||||
event["request"] = None
|
||||
event["scope"] = None
|
||||
|
||||
self.observer(event)
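In setup_structured_logging() further down, this observer is the outermost layer, so both stdlib records and Twisted events pick up request/scope keys before any drain sees them. Roughly:

# publisher = LogPublisher(*drain_observers)
# filtered  = FilteringLogObserver(publisher, [LogLevelFilterPredicate()])
# observer  = LogContextObserver(filtered)   # adds request/scope keys
# logging.getLogger().addHandler(PythonStdlibToTwistedLogger(observer))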
|
||||
|
||||
|
||||
class PythonStdlibToTwistedLogger(logging.Handler):
|
||||
"""
|
||||
Transform a Python stdlib log message into a Twisted one.
|
||||
"""
|
||||
|
||||
def __init__(self, observer, *args, **kwargs):
|
||||
"""
|
||||
Args:
|
||||
observer (ILogObserver): A Twisted logging observer.
|
||||
*args, **kwargs: Args/kwargs to be passed to logging.Handler.
|
||||
"""
|
||||
self.observer = observer
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def emit(self, record: logging.LogRecord) -> None:
|
||||
"""
|
||||
Emit a record to Twisted's observer.
|
||||
|
||||
Args:
|
||||
record (logging.LogRecord)
|
||||
"""
|
||||
|
||||
self.observer(
|
||||
{
|
||||
"log_time": record.created,
|
||||
"log_text": record.getMessage(),
|
||||
"log_format": "{log_text}",
|
||||
"log_namespace": record.name,
|
||||
"log_level": stdlib_log_level_to_twisted(record.levelname),
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def SynapseFileLogObserver(outFile: typing.io.TextIO) -> FileLogObserver:
|
||||
"""
|
||||
A log observer that formats events like the traditional log formatter and
|
||||
sends them to `outFile`.
|
||||
|
||||
Args:
|
||||
outFile (file object): The file object to write to.
|
||||
"""
|
||||
|
||||
def formatEvent(_event: dict) -> str:
|
||||
event = dict(_event)
|
||||
event["log_level"] = event["log_level"].name.upper()
|
||||
event["log_format"] = "- {log_namespace} - {log_level} - {request} - " + (
|
||||
event.get("log_format", "{log_text}") or "{log_text}"
|
||||
)
|
||||
return eventAsText(event, includeSystem=False) + "\n"
|
||||
|
||||
return FileLogObserver(outFile, formatEvent)
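An output line from this observer looks roughly like the following (the timestamp prefix comes from FileLogObserver/eventAsText; the namespace and request values are illustrative):

# 2019-08-28T10:00:00+0000 - synapse.storage - INFO - GET-42 - hello world
#
# observer = SynapseFileLogObserver(sys.stdout)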
|
||||
|
||||
|
||||
class DrainType(Names):
|
||||
CONSOLE = NamedConstant()
|
||||
CONSOLE_JSON = NamedConstant()
|
||||
CONSOLE_JSON_TERSE = NamedConstant()
|
||||
FILE = NamedConstant()
|
||||
FILE_JSON = NamedConstant()
|
||||
NETWORK_JSON_TERSE = NamedConstant()
|
||||
|
||||
|
||||
class OutputPipeType(Values):
|
||||
stdout = ValueConstant(sys.__stdout__)
|
||||
stderr = ValueConstant(sys.__stderr__)
|
||||
|
||||
|
||||
@attr.s
|
||||
class DrainConfiguration(object):
|
||||
name = attr.ib()
|
||||
type = attr.ib()
|
||||
location = attr.ib()
|
||||
options = attr.ib(default=None)
|
||||
|
||||
|
||||
@attr.s
|
||||
class NetworkJSONTerseOptions(object):
|
||||
maximum_buffer = attr.ib(type=int)
|
||||
|
||||
|
||||
DEFAULT_LOGGERS = {"synapse": {"level": "INFO"}}
|
||||
|
||||
|
||||
def parse_drain_configs(
|
||||
drains: dict
|
||||
) -> typing.Generator[DrainConfiguration, None, None]:
|
||||
"""
|
||||
Parse the drain configurations.
|
||||
|
||||
Args:
|
||||
drains (dict): A dict of drain configurations, keyed by drain name.
|
||||
|
||||
Yields:
|
||||
DrainConfiguration instances.
|
||||
|
||||
Raises:
|
||||
ConfigError: If any of the drain configuration items are invalid.
|
||||
"""
|
||||
for name, config in drains.items():
|
||||
if "type" not in config:
|
||||
raise ConfigError("Logging drains require a 'type' key.")
|
||||
|
||||
try:
|
||||
logging_type = DrainType.lookupByName(config["type"].upper())
|
||||
except ValueError:
|
||||
raise ConfigError(
|
||||
"%s is not a known logging drain type." % (config["type"],)
|
||||
)
|
||||
|
||||
if logging_type in [
|
||||
DrainType.CONSOLE,
|
||||
DrainType.CONSOLE_JSON,
|
||||
DrainType.CONSOLE_JSON_TERSE,
|
||||
]:
|
||||
location = config.get("location")
|
||||
if location is None or location not in ["stdout", "stderr"]:
|
||||
raise ConfigError(
|
||||
(
|
||||
"The %s drain needs the 'location' key set to "
|
||||
"either 'stdout' or 'stderr'."
|
||||
)
|
||||
% (logging_type,)
|
||||
)
|
||||
|
||||
pipe = OutputPipeType.lookupByName(location).value
|
||||
|
||||
yield DrainConfiguration(name=name, type=logging_type, location=pipe)
|
||||
|
||||
elif logging_type in [DrainType.FILE, DrainType.FILE_JSON]:
|
||||
if "location" not in config:
|
||||
raise ConfigError(
|
||||
"The %s drain needs the 'location' key set." % (logging_type,)
|
||||
)
|
||||
|
||||
location = config.get("location")
|
||||
if os.path.abspath(location) != location:
|
||||
raise ConfigError(
|
||||
"File paths need to be absolute, '%s' is a relative path"
|
||||
% (location,)
|
||||
)
|
||||
yield DrainConfiguration(name=name, type=logging_type, location=location)
|
||||
|
||||
elif logging_type in [DrainType.NETWORK_JSON_TERSE]:
|
||||
host = config.get("host")
|
||||
port = config.get("port")
|
||||
maximum_buffer = config.get("maximum_buffer", 1000)
|
||||
yield DrainConfiguration(
|
||||
name=name,
|
||||
type=logging_type,
|
||||
location=(host, port),
|
||||
options=NetworkJSONTerseOptions(maximum_buffer=maximum_buffer),
|
||||
)
|
||||
|
||||
else:
|
||||
raise ConfigError(
|
||||
"The %s drain type is currently not implemented."
|
||||
% (config["type"].upper(),)
|
||||
)
|
||||
|
||||
|
||||
def setup_structured_logging(
|
||||
hs,
|
||||
config,
|
||||
log_config: dict,
|
||||
logBeginner: LogBeginner = globalLogBeginner,
|
||||
redirect_stdlib_logging: bool = True,
|
||||
) -> LogPublisher:
|
||||
"""
|
||||
Set up Twisted's structured logging system.
|
||||
|
||||
Args:
|
||||
hs: The homeserver to use.
|
||||
config (HomeserverConfig): The configuration of the Synapse homeserver.
|
||||
log_config (dict): The log configuration to use.
|
||||
"""
|
||||
if config.no_redirect_stdio:
|
||||
raise ConfigError(
|
||||
"no_redirect_stdio cannot be defined using structured logging."
|
||||
)
|
||||
|
||||
logger = Logger()
|
||||
|
||||
if "drains" not in log_config:
|
||||
raise ConfigError("The logging configuration requires a list of drains.")
|
||||
|
||||
observers = []
|
||||
|
||||
for observer in parse_drain_configs(log_config["drains"]):
|
||||
# Pipe drains
|
||||
if observer.type == DrainType.CONSOLE:
|
||||
logger.debug(
|
||||
"Starting up the {name} console logger drain", name=observer.name
|
||||
)
|
||||
observers.append(SynapseFileLogObserver(observer.location))
|
||||
elif observer.type == DrainType.CONSOLE_JSON:
|
||||
logger.debug(
|
||||
"Starting up the {name} JSON console logger drain", name=observer.name
|
||||
)
|
||||
observers.append(jsonFileLogObserver(observer.location))
|
||||
elif observer.type == DrainType.CONSOLE_JSON_TERSE:
|
||||
logger.debug(
|
||||
"Starting up the {name} terse JSON console logger drain",
|
||||
name=observer.name,
|
||||
)
|
||||
observers.append(
|
||||
TerseJSONToConsoleLogObserver(observer.location, metadata={})
|
||||
)
|
||||
|
||||
# File drains
|
||||
elif observer.type == DrainType.FILE:
|
||||
logger.debug("Starting up the {name} file logger drain", name=observer.name)
|
||||
log_file = open(observer.location, "at", buffering=1, encoding="utf8")
|
||||
observers.append(SynapseFileLogObserver(log_file))
|
||||
elif observer.type == DrainType.FILE_JSON:
|
||||
logger.debug(
|
||||
"Starting up the {name} JSON file logger drain", name=observer.name
|
||||
)
|
||||
log_file = open(observer.location, "at", buffering=1, encoding="utf8")
|
||||
observers.append(jsonFileLogObserver(log_file))
|
||||
|
||||
elif observer.type == DrainType.NETWORK_JSON_TERSE:
|
||||
metadata = {"server_name": hs.config.server_name}
|
||||
log_observer = TerseJSONToTCPLogObserver(
|
||||
hs=hs,
|
||||
host=observer.location[0],
|
||||
port=observer.location[1],
|
||||
metadata=metadata,
|
||||
maximum_buffer=observer.options.maximum_buffer,
|
||||
)
|
||||
log_observer.start()
|
||||
observers.append(log_observer)
|
||||
else:
|
||||
# We should never get here, but, just in case, throw an error.
|
||||
raise ConfigError("%s drain type cannot be configured" % (observer.type,))
|
||||
|
||||
publisher = LogPublisher(*observers)
|
||||
log_filter = LogLevelFilterPredicate()
|
||||
|
||||
for namespace, namespace_config in log_config.get(
|
||||
"loggers", DEFAULT_LOGGERS
|
||||
).items():
|
||||
# Set the log level for twisted.logger.Logger namespaces
|
||||
log_filter.setLogLevelForNamespace(
|
||||
namespace,
|
||||
stdlib_log_level_to_twisted(namespace_config.get("level", "INFO")),
|
||||
)
|
||||
|
||||
# Also set the log levels for the stdlib logger namespaces, to prevent
|
||||
# them getting to PythonStdlibToTwistedLogger and having to be formatted
|
||||
if "level" in namespace_config:
|
||||
logging.getLogger(namespace).setLevel(namespace_config.get("level"))
|
||||
|
||||
f = FilteringLogObserver(publisher, [log_filter])
|
||||
lco = LogContextObserver(f)
|
||||
|
||||
if redirect_stdlib_logging:
|
||||
stuff_into_twisted = PythonStdlibToTwistedLogger(lco)
|
||||
stdliblogger = logging.getLogger()
|
||||
stdliblogger.addHandler(stuff_into_twisted)
|
||||
|
||||
# Always redirect standard I/O, otherwise other logging outputs might miss
|
||||
# it.
|
||||
logBeginner.beginLoggingTo([lco], redirectStandardIO=True)
|
||||
|
||||
return publisher
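A config this function accepts looks roughly like the following; the values are illustrative, but the keys follow parse_drain_configs() and the logger loop above:

# log_config = {
#     "drains": {
#         "console": {"type": "console", "location": "stdout"},
#         "remote": {
#             "type": "network_json_terse",
#             "host": "10.1.2.3",
#             "port": 9999,
#             "maximum_buffer": 1000,
#         },
#     },
#     "loggers": {
#         "synapse": {"level": "INFO"},
#         "synapse.storage.SQL": {"level": "WARNING"},
#     },
# }
# publisher = setup_structured_logging(hs, hs.config, log_config)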
|
||||
|
||||
|
||||
def reload_structured_logging(*args, log_config=None) -> None:
|
||||
warnings.warn(
|
||||
"Currently the structured logging system can not be reloaded, doing nothing"
|
||||
)
|
||||
@@ -0,0 +1,278 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
Log formatters that output terse JSON.
|
||||
"""
|
||||
|
||||
import sys
|
||||
from collections import deque
|
||||
from ipaddress import IPv4Address, IPv6Address, ip_address
|
||||
from math import floor
|
||||
from typing.io import TextIO
|
||||
|
||||
import attr
|
||||
from simplejson import dumps
|
||||
|
||||
from twisted.application.internet import ClientService
|
||||
from twisted.internet.endpoints import (
|
||||
HostnameEndpoint,
|
||||
TCP4ClientEndpoint,
|
||||
TCP6ClientEndpoint,
|
||||
)
|
||||
from twisted.internet.protocol import Factory, Protocol
|
||||
from twisted.logger import FileLogObserver, Logger
|
||||
from twisted.python.failure import Failure
|
||||
|
||||
|
||||
def flatten_event(event: dict, metadata: dict, include_time: bool = False):
|
||||
"""
|
||||
Flatten a Twisted logging event to a dictionary capable of being sent
|
||||
as a log event to a logging aggregation system.
|
||||
|
||||
The format is vastly simplified and is not designed to be a "human readable
|
||||
string" in the sense that traditional logs are. Instead, the structure is
|
||||
optimised for searchability and filtering, with human-understandable log
|
||||
keys.
|
||||
|
||||
Args:
|
||||
event (dict): The Twisted logging event we are flattening.
|
||||
metadata (dict): Additional data to include with each log message. This
|
||||
can be information like the server name. Since the target log
|
||||
consumer does not know who we are other than by host IP, this
|
||||
allows us to forward through static information.
|
||||
include_time (bool): Should we include the `time` key? If False, the
|
||||
event time is stripped from the event.
|
||||
"""
|
||||
new_event = {}
|
||||
|
||||
# If it's a failure, make the new event's log_failure be the traceback text.
|
||||
if "log_failure" in event:
|
||||
new_event["log_failure"] = event["log_failure"].getTraceback()
|
||||
|
||||
# If it's a warning, copy over a string representation of the warning.
|
||||
if "warning" in event:
|
||||
new_event["warning"] = str(event["warning"])
|
||||
|
||||
# Stdlib logging events have "log_text" as their human-readable portion,
|
||||
# Twisted ones have "log_format". For now, include the log_format, so that
|
||||
# context only given in the log format (e.g. what is being logged) is
|
||||
# available.
|
||||
if "log_text" in event:
|
||||
new_event["log"] = event["log_text"]
|
||||
else:
|
||||
new_event["log"] = event["log_format"]
|
||||
|
||||
# We want to include the timestamp when forwarding over the network, but
|
||||
# exclude it when we are writing to stdout. This is because the log ingester
|
||||
# (e.g. logstash, fluentd) can add its own timestamp.
|
||||
if include_time:
|
||||
new_event["time"] = round(event["log_time"], 2)
|
||||
|
||||
# Convert the log level to a textual representation.
|
||||
new_event["level"] = event["log_level"].name.upper()
|
||||
|
||||
# Ignore these keys, and do not transfer them over to the new log object.
|
||||
# They are either useless (isError), transferred manually above (log_time,
|
||||
# log_level, etc), or contain Python objects which are not useful for output
|
||||
# (log_logger, log_source).
|
||||
keys_to_delete = [
|
||||
"isError",
|
||||
"log_failure",
|
||||
"log_format",
|
||||
"log_level",
|
||||
"log_logger",
|
||||
"log_source",
|
||||
"log_system",
|
||||
"log_time",
|
||||
"log_text",
|
||||
"observer",
|
||||
"warning",
|
||||
]
|
||||
|
||||
# If it's from the Twisted legacy logger (twisted.python.log), it adds some
|
||||
# more keys we want to purge.
|
||||
if event.get("log_namespace") == "log_legacy":
|
||||
keys_to_delete.extend(["message", "system", "time"])
|
||||
|
||||
# Rather than modify the dictionary in place, construct a new one with only
|
||||
# the content we want. The original event should be considered 'frozen'.
|
||||
for key in event.keys():
|
||||
|
||||
if key in keys_to_delete:
|
||||
continue
|
||||
|
||||
if isinstance(event[key], (str, int, bool, float)) or event[key] is None:
|
||||
# If it's a plain type, include it as is.
|
||||
new_event[key] = event[key]
|
||||
else:
|
||||
# If it's not one of those basic types, write out a string
|
||||
# representation. This should probably be a warning in development,
|
||||
# so that we are sure we are only outputting useful data.
|
||||
new_event[key] = str(event[key])
|
||||
|
||||
# Add the metadata information to the event (e.g. the server_name).
|
||||
new_event.update(metadata)
|
||||
|
||||
return new_event
|
||||
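As a concrete illustration of the flattening above, the snippet below builds a minimal Twisted-style event by hand and shows roughly what `flatten_event` turns it into; all field values are invented for the example, and it assumes the helpers in this new module are importable.

```python
from twisted.logger import LogLevel

event = {
    "log_format": "Hello {name}",
    "log_level": LogLevel.info,
    "log_time": 1565085295.21,
    "log_namespace": "synapse.app",
    "name": "world",
}
flat = flatten_event(event, {"server_name": "example.com"}, include_time=True)
# flat is roughly:
# {"log": "Hello {name}", "level": "INFO", "time": 1565085295.21,
#  "log_namespace": "synapse.app", "name": "world",
#  "server_name": "example.com"}
```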
|
||||
|
||||
def TerseJSONToConsoleLogObserver(outFile: TextIO, metadata: dict) -> FileLogObserver:
|
||||
"""
|
||||
A log observer that formats events to a flattened JSON representation.
|
||||
|
||||
Args:
|
||||
outFile: The file object to write to.
|
||||
metadata: Metadata to be added to each log object.
|
||||
"""
|
||||
|
||||
def formatEvent(_event: dict) -> str:
|
||||
flattened = flatten_event(_event, metadata)
|
||||
return dumps(flattened, ensure_ascii=False, separators=(",", ":")) + "\n"
|
||||
|
||||
return FileLogObserver(outFile, formatEvent)
|
||||
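A minimal sketch of how this observer might be wired up by the console drain handling shown earlier (the metadata value is an assumption):

```python
import sys

# Hypothetical wiring: emit terse JSON log lines to stdout.
observer = TerseJSONToConsoleLogObserver(
    sys.stdout, metadata={"server_name": "example.com"}
)
```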
|
||||
|
||||
@attr.s
|
||||
class TerseJSONToTCPLogObserver(object):
|
||||
"""
|
||||
An IObserver that writes JSON logs to a TCP target.
|
||||
|
||||
Args:
|
||||
hs (HomeServer): The Homeserver that is being logged for.
|
||||
host: The host of the logging target.
|
||||
port: The logging target's port.
|
||||
metadata: Metadata to be added to each log entry.
|
||||
"""
|
||||
|
||||
hs = attr.ib()
|
||||
host = attr.ib(type=str)
|
||||
port = attr.ib(type=int)
|
||||
metadata = attr.ib(type=dict)
|
||||
maximum_buffer = attr.ib(type=int)
|
||||
_buffer = attr.ib(default=attr.Factory(deque), type=deque)
|
||||
_writer = attr.ib(default=None)
|
||||
_logger = attr.ib(default=attr.Factory(Logger))
|
||||
|
||||
def start(self) -> None:
|
||||
|
||||
# Connect without DNS lookups if it's a direct IP.
|
||||
try:
|
||||
ip = ip_address(self.host)
|
||||
if isinstance(ip, IPv4Address):
|
||||
endpoint = TCP4ClientEndpoint(
|
||||
self.hs.get_reactor(), self.host, self.port
|
||||
)
|
||||
elif isinstance(ip, IPv6Address):
|
||||
endpoint = TCP6ClientEndpoint(
|
||||
self.hs.get_reactor(), self.host, self.port
|
||||
)
|
||||
except ValueError:
|
||||
endpoint = HostnameEndpoint(self.hs.get_reactor(), self.host, self.port)
|
||||
|
||||
factory = Factory.forProtocol(Protocol)
|
||||
self._service = ClientService(endpoint, factory, clock=self.hs.get_reactor())
|
||||
self._service.startService()
|
||||
|
||||
def _write_loop(self) -> None:
|
||||
"""
|
||||
Implement the write loop.
|
||||
"""
|
||||
if self._writer:
|
||||
return
|
||||
|
||||
self._writer = self._service.whenConnected()
|
||||
|
||||
@self._writer.addBoth
|
||||
def writer(r):
|
||||
if isinstance(r, Failure):
|
||||
r.printTraceback(file=sys.__stderr__)
|
||||
self._writer = None
|
||||
self.hs.get_reactor().callLater(1, self._write_loop)
|
||||
return
|
||||
|
||||
try:
|
||||
for event in self._buffer:
|
||||
r.transport.write(
|
||||
dumps(event, ensure_ascii=False, separators=(",", ":")).encode(
|
||||
"utf8"
|
||||
)
|
||||
)
|
||||
r.transport.write(b"\n")
|
||||
self._buffer.clear()
|
||||
except Exception as e:
|
||||
sys.__stderr__.write("Failed writing out logs with %s\n" % (str(e),))
|
||||
|
||||
self._writer = False
|
||||
self.hs.get_reactor().callLater(1, self._write_loop)
|
||||
|
||||
def _handle_pressure(self) -> None:
|
||||
"""
|
||||
Handle backpressure by shedding events.
|
||||
|
||||
Events are shed from the buffer, in this order, until the buffer is below the maximum:
|
||||
- Shed DEBUG events
|
||||
- Shed INFO events
|
||||
- Shed the middle 50% of the events.
|
||||
"""
|
||||
if len(self._buffer) <= self.maximum_buffer:
|
||||
return
|
||||
|
||||
# Strip out DEBUGs
|
||||
self._buffer = deque(
|
||||
filter(lambda event: event["level"] != "DEBUG", self._buffer)
|
||||
)
|
||||
|
||||
if len(self._buffer) <= self.maximum_buffer:
|
||||
return
|
||||
|
||||
# Strip out INFOs
|
||||
self._buffer = deque(
|
||||
filter(lambda event: event["level"] != "INFO", self._buffer)
|
||||
)
|
||||
|
||||
if len(self._buffer) <= self.maximum_buffer:
|
||||
return
|
||||
|
||||
# Cut the middle entries out
|
||||
buffer_split = floor(self.maximum_buffer / 2)
|
||||
|
||||
old_buffer = self._buffer
|
||||
self._buffer = deque()
|
||||
|
||||
for i in range(buffer_split):
|
||||
self._buffer.append(old_buffer.popleft())
|
||||
|
||||
end_buffer = []
|
||||
for i in range(buffer_split):
|
||||
end_buffer.append(old_buffer.pop())
|
||||
|
||||
self._buffer.extend(reversed(end_buffer))
|
||||
|
||||
def __call__(self, event: dict) -> None:
|
||||
flattened = flatten_event(event, self.metadata, include_time=True)
|
||||
self._buffer.append(flattened)
|
||||
|
||||
# Handle backpressure, if it exists.
|
||||
try:
|
||||
self._handle_pressure()
|
||||
except Exception:
|
||||
# If handling backpressure fails, clear the buffer and log the
|
||||
# exception.
|
||||
self._buffer.clear()
|
||||
self._logger.failure("Failed clearing backpressure")
|
||||
|
||||
# Try and write immediately.
|
||||
self._write_loop()
|
||||
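To make the final shedding step above concrete, this toy snippet (values assumed) reproduces the "cut the middle out" behaviour on a small buffer:

```python
from collections import deque
from math import floor

# Assume maximum_buffer is 4 and ten events are queued; the oldest and newest
# halves survive and the middle is dropped.
buf = deque({"level": "ERROR", "log": str(i)} for i in range(10))
keep = floor(4 / 2)
head = [buf.popleft() for _ in range(keep)]
tail = [buf.pop() for _ in range(keep)]
trimmed = deque(head + list(reversed(tail)))
# trimmed now holds the events logged as "0", "1", "8" and "9"
```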
@@ -25,6 +25,7 @@ See doc/log_contexts.rst for details on how this works.
|
||||
import logging
|
||||
import threading
|
||||
import types
|
||||
from typing import Any, List
|
||||
|
||||
from twisted.internet import defer, threads
|
||||
|
||||
@@ -194,7 +195,7 @@ class LoggingContext(object):
|
||||
class Sentinel(object):
|
||||
"""Sentinel to represent the root context"""
|
||||
|
||||
__slots__ = []
|
||||
__slots__ = [] # type: List[Any]
|
||||
|
||||
def __str__(self):
|
||||
return "sentinel"
|
||||
@@ -202,6 +203,10 @@ class LoggingContext(object):
|
||||
def copy_to(self, record):
|
||||
pass
|
||||
|
||||
def copy_to_twisted_log_entry(self, record):
|
||||
record["request"] = None
|
||||
record["scope"] = None
|
||||
|
||||
def start(self):
|
||||
pass
|
||||
|
||||
@@ -330,6 +335,13 @@ class LoggingContext(object):
|
||||
# we also track the current scope:
|
||||
record.scope = self.scope
|
||||
|
||||
def copy_to_twisted_log_entry(self, record):
|
||||
"""
|
||||
Copy logging fields from this context to a Twisted log record.
|
||||
"""
|
||||
record["request"] = self.request
|
||||
record["scope"] = self.scope
|
||||
|
||||
def start(self):
|
||||
if get_thread_id() != self.main_thread:
|
||||
logger.warning("Started logcontext %s on different thread", self)
|
||||
|
||||
@@ -47,9 +47,9 @@ REQUIREMENTS = [
|
||||
"idna>=2.5",
|
||||
# validating SSL certs for IP addresses requires service_identity 18.1.
|
||||
"service_identity>=18.1.0",
|
||||
# our logcontext handling relies on the ability to cancel inlineCallbacks
|
||||
# (https://twistedmatrix.com/trac/ticket/4632) which landed in Twisted 18.7.
|
||||
"Twisted>=18.7.0",
|
||||
# Twisted 18.9 introduces some logger improvements that the structured
|
||||
# logger utilises
|
||||
"Twisted>=18.9.0",
|
||||
"treq>=15.1",
|
||||
# Twisted has required pyopenssl 16.0 since about Twisted 16.6.
|
||||
"pyopenssl>=16.0.0",
|
||||
|
||||
@@ -44,6 +44,7 @@ from synapse.rest.admin._base import (
|
||||
from synapse.rest.admin.media import register_servlets_for_media_repo
|
||||
from synapse.rest.admin.purge_room_servlet import PurgeRoomServlet
|
||||
from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet
|
||||
from synapse.rest.admin.users import UserAdminServlet
|
||||
from synapse.types import UserID, create_requester
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
||||
@@ -51,7 +52,7 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class UsersRestServlet(RestServlet):
|
||||
PATTERNS = historical_admin_path_patterns("/users/(?P<user_id>[^/]*)")
|
||||
PATTERNS = historical_admin_path_patterns("/users/(?P<user_id>[^/]*)$")
|
||||
|
||||
def __init__(self, hs):
|
||||
self.hs = hs
|
||||
@@ -742,6 +743,7 @@ def register_servlets(hs, http_server):
|
||||
PurgeRoomServlet(hs).register(http_server)
|
||||
SendServerNoticeServlet(hs).register(http_server)
|
||||
VersionServlet(hs).register(http_server)
|
||||
UserAdminServlet(hs).register(http_server)
|
||||
|
||||
|
||||
def register_servlets_for_client_rest_resource(hs, http_server):
|
||||
|
||||
@@ -0,0 +1,100 @@
# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re

from twisted.internet import defer

from synapse.api.errors import SynapseError
from synapse.http.servlet import (
    RestServlet,
    assert_params_in_dict,
    parse_json_object_from_request,
)
from synapse.rest.admin import assert_requester_is_admin, assert_user_is_admin
from synapse.types import UserID


class UserAdminServlet(RestServlet):
    """
    Get or set whether or not a user is a server administrator.

    Note that only local users can be server administrators, and that an
    administrator may not demote themselves.

    Only server administrators can use this API.

    Examples:
        * Get
            GET /_synapse/admin/v1/users/@nonadmin:example.com/admin
            response on success:
                {
                    "admin": false
                }
        * Set
            PUT /_synapse/admin/v1/users/@reivilibre:librepush.net/admin
            request body:
                {
                    "admin": true
                }
            response on success:
                {}
    """

    PATTERNS = (re.compile("^/_synapse/admin/v1/users/(?P<user_id>@[^/]*)/admin$"),)

    def __init__(self, hs):
        self.hs = hs
        self.auth = hs.get_auth()
        self.handlers = hs.get_handlers()

    @defer.inlineCallbacks
    def on_GET(self, request, user_id):
        yield assert_requester_is_admin(self.auth, request)

        target_user = UserID.from_string(user_id)

        if not self.hs.is_mine(target_user):
            raise SynapseError(400, "Only local users can be admins of this homeserver")

        is_admin = yield self.handlers.admin_handler.get_user_server_admin(target_user)
        is_admin = bool(is_admin)

        return (200, {"admin": is_admin})

    @defer.inlineCallbacks
    def on_PUT(self, request, user_id):
        requester = yield self.auth.get_user_by_req(request)
        yield assert_user_is_admin(self.auth, requester.user)
        auth_user = requester.user

        target_user = UserID.from_string(user_id)

        body = parse_json_object_from_request(request)

        assert_params_in_dict(body, ["admin"])

        if not self.hs.is_mine(target_user):
            raise SynapseError(400, "Only local users can be admins of this homeserver")

        set_admin_to = bool(body["admin"])

        if target_user == auth_user and not set_admin_to:
            raise SynapseError(400, "You may not demote yourself.")

        yield self.handlers.admin_handler.set_user_server_admin(
            target_user, set_admin_to
        )

        return (200, {})
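For reference, a hypothetical call to the new endpoint (the host name and token are placeholders, and the access token must belong to a server admin):

```python
import requests

resp = requests.get(
    "https://example.com/_synapse/admin/v1/users/@nonadmin:example.com/admin",
    headers={"Authorization": "Bearer <admin_access_token>"},
)
print(resp.json())  # e.g. {'admin': False}
```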
@@ -13,7 +13,9 @@
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
from io import BytesIO
|
||||
|
||||
from canonicaljson import encode_canonical_json, json
|
||||
from signedjson.sign import sign_json
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
@@ -95,6 +97,7 @@ class RemoteKey(DirectServeResource):
|
||||
self.store = hs.get_datastore()
|
||||
self.clock = hs.get_clock()
|
||||
self.federation_domain_whitelist = hs.config.federation_domain_whitelist
|
||||
self.config = hs.config
|
||||
|
||||
@wrap_json_request_handler
|
||||
async def _async_render_GET(self, request):
|
||||
@@ -214,15 +217,14 @@ class RemoteKey(DirectServeResource):
|
||||
yield self.fetcher.get_keys(cache_misses)
|
||||
yield self.query_keys(request, query, query_remote_on_cache_miss=False)
|
||||
else:
|
||||
result_io = BytesIO()
|
||||
result_io.write(b'{"server_keys":')
|
||||
sep = b"["
|
||||
for json_bytes in json_results:
|
||||
result_io.write(sep)
|
||||
result_io.write(json_bytes)
|
||||
sep = b","
|
||||
if sep == b"[":
|
||||
result_io.write(sep)
|
||||
result_io.write(b"]}")
|
||||
signed_keys = []
|
||||
for key_json in json_results:
|
||||
key_json = json.loads(key_json)
|
||||
for signing_key in self.config.key_server_signing_keys:
|
||||
key_json = sign_json(key_json, self.config.server_name, signing_key)
|
||||
|
||||
respond_with_json_bytes(request, 200, result_io.getvalue())
|
||||
signed_keys.append(key_json)
|
||||
|
||||
results = {"server_keys": signed_keys}
|
||||
|
||||
respond_with_json_bytes(request, 200, encode_canonical_json(results))
|
||||
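The effect of the change above is that every cached key response is re-signed with this server's own key-server signing keys before being returned as canonical JSON. A standalone sketch of that transformation (the key and server names here are purely illustrative):

```python
from canonicaljson import encode_canonical_json
from signedjson.key import generate_signing_key
from signedjson.sign import sign_json

signing_key = generate_signing_key("key_version_1")  # illustrative key only
key_json = {"server_name": "remote.example.com", "verify_keys": {}}

# Mirrors the loop above: add our own signature, then encode the collected
# results as canonical JSON for the response body.
resigned = sign_json(key_json, "example.com", signing_key)
body = encode_canonical_json({"server_keys": [resigned]})
```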
|
||||
@@ -34,7 +34,7 @@ class WellKnownBuilder(object):
|
||||
self._config = hs.config
|
||||
|
||||
def get_well_known(self):
|
||||
# if we don't have a public_base_url, we can't help much here.
|
||||
# if we don't have a public_baseurl, we can't help much here.
|
||||
if self._config.public_baseurl is None:
|
||||
return None
|
||||
|
||||
|
||||
@@ -1395,14 +1395,22 @@ class SQLBaseStore(object):
|
||||
"""
|
||||
txn.call_after(self._invalidate_state_caches, room_id, members_changed)
|
||||
|
||||
# We need to be careful that the size of the `members_changed` list
|
||||
# isn't so large that it causes problems sending over replication, so we
|
||||
# send them in chunks.
|
||||
# Max line length is 16K, and max user ID length is 255, so 50 should
|
||||
# be safe.
|
||||
for chunk in batch_iter(members_changed, 50):
|
||||
keys = itertools.chain([room_id], chunk)
|
||||
self._send_invalidation_to_replication(txn, _CURRENT_STATE_CACHE_NAME, keys)
|
||||
if members_changed:
|
||||
# We need to be careful that the size of the `members_changed` list
|
||||
# isn't so large that it causes problems sending over replication, so we
|
||||
# send them in chunks.
|
||||
# Max line length is 16K, and max user ID length is 255, so 50 should
|
||||
# be safe.
|
||||
for chunk in batch_iter(members_changed, 50):
|
||||
keys = itertools.chain([room_id], chunk)
|
||||
self._send_invalidation_to_replication(
|
||||
txn, _CURRENT_STATE_CACHE_NAME, keys
|
||||
)
|
||||
else:
|
||||
# if no members changed, we still need to invalidate the other caches.
|
||||
self._send_invalidation_to_replication(
|
||||
txn, _CURRENT_STATE_CACHE_NAME, [room_id]
|
||||
)
|
||||
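`batch_iter` is the Synapse helper used above to split the member list into chunks of 50; the sketch below (not the real implementation) shows equivalent behaviour, just to illustrate what those chunks look like:

```python
from itertools import islice

def batch_iter_sketch(iterable, size):
    """Yield tuples of at most `size` items from `iterable`."""
    it = iter(iterable)
    while True:
        chunk = tuple(islice(it, size))
        if not chunk:
            return
        yield chunk

list(batch_iter_sketch(range(5), 2))  # [(0, 1), (2, 3), (4,)]
```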
|
||||
def _invalidate_state_caches(self, room_id, members_changed):
|
||||
"""Invalidates caches that are based on the current state, but does
|
||||
|
||||
@@ -2270,8 +2270,9 @@ class EventsStore(
|
||||
"room_aliases",
|
||||
"room_depth",
|
||||
"room_memberships",
|
||||
"room_state",
|
||||
"room_stats",
|
||||
"room_stats_state",
|
||||
"room_stats_current",
|
||||
"room_stats_historical",
|
||||
"room_stats_earliest_token",
|
||||
"rooms",
|
||||
"stream_ordering_to_exterm",
|
||||
|
||||
@@ -238,6 +238,13 @@ def _upgrade_existing_database(
|
||||
|
||||
logger.debug("applied_delta_files: %s", applied_delta_files)
|
||||
|
||||
if isinstance(database_engine, PostgresEngine):
|
||||
specific_engine_extension = ".postgres"
|
||||
else:
|
||||
specific_engine_extension = ".sqlite"
|
||||
|
||||
specific_engine_extensions = (".sqlite", ".postgres")
|
||||
|
||||
for v in range(start_ver, SCHEMA_VERSION + 1):
|
||||
logger.info("Upgrading schema to v%d", v)
|
||||
|
||||
@@ -274,15 +281,22 @@ def _upgrade_existing_database(
|
||||
# Sometimes .pyc files turn up anyway even though we've
|
||||
# disabled their generation; e.g. from distribution package
|
||||
# installers. Silently skip it
|
||||
pass
|
||||
continue
|
||||
elif ext == ".sql":
|
||||
# A plain old .sql file, just read and execute it
|
||||
logger.info("Applying schema %s", relative_path)
|
||||
executescript(cur, absolute_path)
|
||||
elif ext == specific_engine_extension and root_name.endswith(".sql"):
|
||||
# A .sql file specific to our engine; just read and execute it
|
||||
logger.info("Applying engine-specific schema %s", relative_path)
|
||||
executescript(cur, absolute_path)
|
||||
elif ext in specific_engine_extensions and root_name.endswith(".sql"):
|
||||
# A .sql file for a different engine; skip it.
|
||||
continue
|
||||
else:
|
||||
# Not a valid delta file.
|
||||
logger.warn(
|
||||
"Found directory entry that did not end in .py or" " .sql: %s",
|
||||
logger.warning(
|
||||
"Found directory entry that did not end in .py or .sql: %s",
|
||||
relative_path,
|
||||
)
|
||||
continue
|
||||
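Under the new logic above, a delta directory can mix generic and engine-specific scripts. An assumed illustration of how differently named entries would be handled when running on PostgreSQL:

```python
# Hypothetical directory entries and what the branch above does with each one.
handling_on_postgres = {
    "01stats.sql": "executed (plain .sql applies on every engine)",
    "01stats.sql.postgres": "executed (matches this engine's extension)",
    "01stats.sql.sqlite": "skipped (another engine's extension)",
    "helper.pyc": "silently skipped",
    "notes.txt": "warning logged, then skipped",
}
```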
@@ -290,7 +304,7 @@ def _upgrade_existing_database(
|
||||
# Mark as done.
|
||||
cur.execute(
|
||||
database_engine.convert_param_style(
|
||||
"INSERT INTO applied_schema_deltas (version, file)" " VALUES (?,?)"
|
||||
"INSERT INTO applied_schema_deltas (version, file) VALUES (?,?)"
|
||||
),
|
||||
(v, relative_path),
|
||||
)
|
||||
@@ -298,7 +312,7 @@ def _upgrade_existing_database(
|
||||
cur.execute("DELETE FROM schema_version")
|
||||
cur.execute(
|
||||
database_engine.convert_param_style(
|
||||
"INSERT INTO schema_version (version, upgraded)" " VALUES (?,?)"
|
||||
"INSERT INTO schema_version (version, upgraded) VALUES (?,?)"
|
||||
),
|
||||
(v, True),
|
||||
)
|
||||
|
||||
@@ -56,6 +56,7 @@ class RegistrationWorkerStore(SQLBaseStore):
|
||||
"consent_server_notice_sent",
|
||||
"appservice_id",
|
||||
"creation_ts",
|
||||
"user_type",
|
||||
],
|
||||
allow_none=True,
|
||||
desc="get_user_by_id",
|
||||
@@ -272,6 +273,14 @@ class RegistrationWorkerStore(SQLBaseStore):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def is_server_admin(self, user):
|
||||
"""Determines if a user is an admin of this homeserver.
|
||||
|
||||
Args:
|
||||
user (UserID): user ID of the user to test
|
||||
|
||||
Returns (bool):
|
||||
true iff the user is a server admin, false otherwise.
|
||||
"""
|
||||
res = yield self._simple_select_one_onecol(
|
||||
table="users",
|
||||
keyvalues={"name": user.to_string()},
|
||||
@@ -282,6 +291,21 @@ class RegistrationWorkerStore(SQLBaseStore):
|
||||
|
||||
return res if res else False
|
||||
|
||||
def set_server_admin(self, user, admin):
|
||||
"""Sets whether a user is an admin of this homeserver.
|
||||
|
||||
Args:
|
||||
user (UserID): user ID of the user to test
|
||||
admin (bool): true iff the user is to be a server admin,
|
||||
false otherwise.
|
||||
"""
|
||||
return self._simple_update_one(
|
||||
table="users",
|
||||
keyvalues={"name": user.to_string()},
|
||||
updatevalues={"admin": 1 if admin else 0},
|
||||
desc="set_server_admin",
|
||||
)
|
||||
|
||||
def _query_for_auth(self, txn, token):
|
||||
sql = (
|
||||
"SELECT users.name, users.is_guest, access_tokens.id as token_id,"
|
||||
@@ -845,6 +869,17 @@ class RegistrationStore(
|
||||
(user_id_obj.localpart, create_profile_with_displayname),
|
||||
)
|
||||
|
||||
if self.hs.config.stats_enabled:
|
||||
# we create a new completed user statistics row
|
||||
|
||||
# we don't strictly need current_token since this user really can't
|
||||
# have any state deltas before now (as it is a new user), but still,
|
||||
# we include it for completeness.
|
||||
current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
|
||||
self._update_stats_delta_txn(
|
||||
txn, now, "user", user_id, {}, complete_with_stream_id=current_token
|
||||
)
|
||||
|
||||
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
|
||||
txn.call_after(self.is_guest.invalidate, (user_id,))
|
||||
|
||||
@@ -1116,6 +1151,7 @@ class RegistrationStore(
|
||||
deferred str|None: A str representing a link to redirect the user
|
||||
to if there is one.
|
||||
"""
|
||||
|
||||
# Insert everything into a transaction in order to run atomically
|
||||
def validate_threepid_session_txn(txn):
|
||||
row = self._simple_select_one_txn(
|
||||
|
||||
@@ -0,0 +1,144 @@
|
||||
/* Copyright 2018 New Vector Ltd
|
||||
* Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
|
||||
----- First clean up from previous versions of room stats.
|
||||
|
||||
-- First remove old stats stuff
|
||||
DROP TABLE IF EXISTS room_stats;
|
||||
DROP TABLE IF EXISTS user_stats;
|
||||
DROP TABLE IF EXISTS room_stats_earliest_tokens;
|
||||
DROP TABLE IF EXISTS _temp_populate_stats_position;
|
||||
DROP TABLE IF EXISTS _temp_populate_stats_rooms;
|
||||
DROP TABLE IF EXISTS stats_stream_pos;
|
||||
|
||||
-- Unschedule old background updates if they're still scheduled
|
||||
DELETE FROM background_updates WHERE update_name IN (
|
||||
'populate_stats_createtables',
|
||||
'populate_stats_process_rooms',
|
||||
'populate_stats_cleanup'
|
||||
);
|
||||
|
||||
----- Create tables for our version of room stats.
|
||||
|
||||
-- single-row table to track position of incremental updates
|
||||
CREATE TABLE IF NOT EXISTS stats_incremental_position (
|
||||
-- the stream_id of the last-processed state delta
|
||||
state_delta_stream_id BIGINT,
|
||||
|
||||
-- the stream_ordering of the last-processed backfilled event
|
||||
-- (this is negative)
|
||||
total_events_min_stream_ordering BIGINT,
|
||||
|
||||
-- the stream_ordering of the last-processed normally-created event
|
||||
-- (this is positive)
|
||||
total_events_max_stream_ordering BIGINT,
|
||||
|
||||
-- If true, this represents the contract agreed upon by the stats
|
||||
-- regenerator.
|
||||
-- If false, this is suitable for use by the delta/incremental processor.
|
||||
is_background_contract BOOLEAN NOT NULL PRIMARY KEY
|
||||
);
|
||||
|
||||
-- insert a null row and make sure it is the only one.
|
||||
DELETE FROM stats_incremental_position;
|
||||
INSERT INTO stats_incremental_position (
|
||||
state_delta_stream_id,
|
||||
total_events_min_stream_ordering,
|
||||
total_events_max_stream_ordering,
|
||||
is_background_contract
|
||||
) VALUES (NULL, NULL, NULL, (0 = 1)), (NULL, NULL, NULL, (1 = 1));
|
||||
|
||||
-- represents PRESENT room statistics for a room
|
||||
-- only holds absolute fields
|
||||
CREATE TABLE IF NOT EXISTS room_stats_current (
|
||||
room_id TEXT NOT NULL PRIMARY KEY,
|
||||
|
||||
current_state_events INT NOT NULL,
|
||||
total_events INT NOT NULL,
|
||||
total_event_bytes BIGINT NOT NULL,
|
||||
joined_members INT NOT NULL,
|
||||
invited_members INT NOT NULL,
|
||||
left_members INT NOT NULL,
|
||||
banned_members INT NOT NULL,
|
||||
|
||||
-- If initial stats regen is still to be performed: NULL
|
||||
-- If initial stats regen has been performed: the maximum delta stream
|
||||
-- position that this row takes into account.
|
||||
completed_delta_stream_id BIGINT
|
||||
);
|
||||
|
||||
|
||||
-- represents HISTORICAL room statistics for a room
|
||||
CREATE TABLE IF NOT EXISTS room_stats_historical (
|
||||
room_id TEXT NOT NULL,
|
||||
-- These stats cover the time from (end_ts - bucket_size)...end_ts (in ms).
|
||||
-- Note that end_ts is quantised.
|
||||
end_ts BIGINT NOT NULL,
|
||||
bucket_size INT NOT NULL,
|
||||
|
||||
current_state_events INT NOT NULL,
|
||||
total_events INT NOT NULL,
|
||||
total_event_bytes BIGINT NOT NULL,
|
||||
joined_members INT NOT NULL,
|
||||
invited_members INT NOT NULL,
|
||||
left_members INT NOT NULL,
|
||||
banned_members INT NOT NULL,
|
||||
|
||||
PRIMARY KEY (room_id, end_ts)
|
||||
);
|
||||
|
||||
-- We use this index to speed up deletion of ancient room stats.
|
||||
CREATE INDEX IF NOT EXISTS room_stats_historical_end_ts ON room_stats_historical (end_ts);
|
||||
|
||||
-- We don't need an index on (room_id, end_ts) because the PRIMARY KEY already
-- provides one. (That is the index we would want when reviewing the stats of a
-- particular room.)
|
||||
|
||||
|
||||
-- represents PRESENT statistics for a user
|
||||
-- only holds absolute fields
|
||||
CREATE TABLE IF NOT EXISTS user_stats_current (
|
||||
user_id TEXT NOT NULL PRIMARY KEY,
|
||||
|
||||
public_rooms INT NOT NULL,
|
||||
private_rooms INT NOT NULL,
|
||||
|
||||
-- If initial stats regen is still to be performed: NULL
|
||||
-- If initial stats regen has been performed: the maximum delta stream
|
||||
-- position that this row takes into account.
|
||||
completed_delta_stream_id BIGINT
|
||||
);
|
||||
|
||||
-- represents HISTORICAL statistics for a user
|
||||
CREATE TABLE IF NOT EXISTS user_stats_historical (
|
||||
user_id TEXT NOT NULL,
|
||||
end_ts BIGINT NOT NULL,
|
||||
bucket_size INT NOT NULL,
|
||||
|
||||
public_rooms INT NOT NULL,
|
||||
private_rooms INT NOT NULL,
|
||||
|
||||
PRIMARY KEY (user_id, end_ts)
|
||||
);
|
||||
|
||||
-- We use this index to speed up deletion of ancient user stats.
|
||||
CREATE INDEX IF NOT EXISTS user_stats_historical_end_ts ON user_stats_historical (end_ts);
|
||||
|
||||
-- We don't need an index on (user_id, end_ts) because the PRIMARY KEY already
-- provides one. (That is the index we would want when reviewing the stats of a
-- particular user.)
|
||||
|
||||
-- Also rename room_state to room_stats_state to make its ownership clear.
|
||||
ALTER TABLE room_state RENAME TO room_stats_state;
|
||||
@@ -0,0 +1,24 @@
|
||||
/* Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- These partial indices help us find incomplete stats rows
|
||||
CREATE INDEX IF NOT EXISTS room_stats_not_complete
|
||||
ON room_stats_current (room_id)
|
||||
WHERE completed_delta_stream_id IS NULL;
|
||||
|
||||
CREATE INDEX IF NOT EXISTS user_stats_not_complete
|
||||
ON user_stats_current (user_id)
|
||||
WHERE completed_delta_stream_id IS NULL;
|
||||
|
||||
@@ -0,0 +1,27 @@
|
||||
/* Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- even though SQLite >= 3.8 can support partial indices, we won't enable
|
||||
-- them, in case the SQLite database may be later used on another system.
|
||||
-- It's also the case that SQLite is only likely to be used in small
|
||||
-- deployments or testing, where the optimisations gained by use of a
|
||||
-- partial index are not a big concern.
|
||||
|
||||
CREATE INDEX IF NOT EXISTS room_stats_not_complete
|
||||
ON room_stats_current (completed_delta_stream_id, room_id);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS user_stats_not_complete
|
||||
ON user_stats_current (completed_delta_stream_id, user_id);
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2018, 2019 New Vector Ltd
|
||||
# Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@@ -14,17 +15,20 @@
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
from itertools import chain
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.internet.defer import DeferredLock
|
||||
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.storage.prepare_database import get_statements
|
||||
from synapse.storage import PostgresEngine
|
||||
from synapse.storage.state_deltas import StateDeltasStore
|
||||
from synapse.util.caches.descriptors import cached
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# these fields track absolutes (e.g. total number of rooms on the server)
|
||||
# You can think of these as Prometheus Gauges.
|
||||
# You can draw these stats on a line graph.
|
||||
# Example: number of users in a room
|
||||
ABSOLUTE_STATS_FIELDS = {
|
||||
"room": (
|
||||
"current_state_events",
|
||||
@@ -32,14 +36,18 @@ ABSOLUTE_STATS_FIELDS = {
|
||||
"invited_members",
|
||||
"left_members",
|
||||
"banned_members",
|
||||
"state_events",
|
||||
"total_events",
|
||||
"total_event_bytes",
|
||||
),
|
||||
"user": ("public_rooms", "private_rooms"),
|
||||
}
|
||||
|
||||
TYPE_TO_ROOM = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
|
||||
# these fields are per-timeslice and so should be reset to 0 upon a new slice
|
||||
# You can draw these stats on a histogram.
|
||||
# Example: number of events sent locally during a time slice
|
||||
PER_SLICE_FIELDS = {"room": (), "user": ()}
|
||||
|
||||
TEMP_TABLE = "_temp_populate_stats"
|
||||
TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
|
||||
|
||||
|
||||
class StatsStore(StateDeltasStore):
|
||||
@@ -51,291 +59,111 @@ class StatsStore(StateDeltasStore):
|
||||
self.stats_enabled = hs.config.stats_enabled
|
||||
self.stats_bucket_size = hs.config.stats_bucket_size
|
||||
|
||||
self.register_background_update_handler(
|
||||
"populate_stats_createtables", self._populate_stats_createtables
|
||||
)
|
||||
self.register_background_update_handler(
|
||||
"populate_stats_process_rooms", self._populate_stats_process_rooms
|
||||
)
|
||||
self.register_background_update_handler(
|
||||
"populate_stats_cleanup", self._populate_stats_cleanup
|
||||
)
|
||||
self.stats_delta_processing_lock = DeferredLock()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _populate_stats_createtables(self, progress, batch_size):
|
||||
self.register_noop_background_update("populate_stats_createtables")
|
||||
self.register_noop_background_update("populate_stats_process_rooms")
|
||||
self.register_noop_background_update("populate_stats_cleanup")
|
||||
|
||||
if not self.stats_enabled:
|
||||
yield self._end_background_update("populate_stats_createtables")
|
||||
return 1
|
||||
|
||||
# Get all the rooms that we want to process.
|
||||
def _make_staging_area(txn):
|
||||
# Create the temporary tables
|
||||
stmts = get_statements(
|
||||
"""
|
||||
-- We just recreate the table, we'll be reinserting the
|
||||
-- correct entries again later anyway.
|
||||
DROP TABLE IF EXISTS {temp}_rooms;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS {temp}_rooms(
|
||||
room_id TEXT NOT NULL,
|
||||
events BIGINT NOT NULL
|
||||
);
|
||||
|
||||
CREATE INDEX {temp}_rooms_events
|
||||
ON {temp}_rooms(events);
|
||||
CREATE INDEX {temp}_rooms_id
|
||||
ON {temp}_rooms(room_id);
|
||||
""".format(
|
||||
temp=TEMP_TABLE
|
||||
).splitlines()
|
||||
)
|
||||
|
||||
for statement in stmts:
|
||||
txn.execute(statement)
|
||||
|
||||
sql = (
|
||||
"CREATE TABLE IF NOT EXISTS "
|
||||
+ TEMP_TABLE
|
||||
+ "_position(position TEXT NOT NULL)"
|
||||
)
|
||||
txn.execute(sql)
|
||||
|
||||
# Get rooms we want to process from the database, only adding
|
||||
# those that we haven't (i.e. those not in room_stats_earliest_token)
|
||||
sql = """
|
||||
INSERT INTO %s_rooms (room_id, events)
|
||||
SELECT c.room_id, count(*) FROM current_state_events AS c
|
||||
LEFT JOIN room_stats_earliest_token AS t USING (room_id)
|
||||
WHERE t.room_id IS NULL
|
||||
GROUP BY c.room_id
|
||||
""" % (
|
||||
TEMP_TABLE,
|
||||
)
|
||||
txn.execute(sql)
|
||||
|
||||
new_pos = yield self.get_max_stream_id_in_current_state_deltas()
|
||||
yield self.runInteraction("populate_stats_temp_build", _make_staging_area)
|
||||
yield self._simple_insert(TEMP_TABLE + "_position", {"position": new_pos})
|
||||
self.get_earliest_token_for_room_stats.invalidate_all()
|
||||
|
||||
yield self._end_background_update("populate_stats_createtables")
|
||||
return 1
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _populate_stats_cleanup(self, progress, batch_size):
|
||||
def quantise_stats_time(self, ts):
|
||||
"""
|
||||
Update the user directory stream position, then clean up the old tables.
|
||||
Quantises a timestamp to be a multiple of the bucket size.
|
||||
|
||||
Args:
|
||||
ts (int): the timestamp to quantise, in milliseconds since the Unix
|
||||
Epoch
|
||||
|
||||
Returns:
|
||||
int: a timestamp which
|
||||
- is divisible by the bucket size;
|
||||
- is no later than `ts`; and
|
||||
- is the largest such timestamp.
|
||||
"""
|
||||
if not self.stats_enabled:
|
||||
yield self._end_background_update("populate_stats_cleanup")
|
||||
return 1
|
||||
return (ts // self.stats_bucket_size) * self.stats_bucket_size
|
||||
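A worked example of the quantisation above, taking an assumed bucket size of one day in milliseconds:

```python
bucket_size = 86400000           # one day in ms (assumed for illustration)
ts = 1565085295000               # some time part-way through a day
quantised = (ts // bucket_size) * bucket_size
# quantised == 1565049600000: divisible by bucket_size and no later than ts
```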
|
||||
position = yield self._simple_select_one_onecol(
|
||||
TEMP_TABLE + "_position", None, "position"
|
||||
)
|
||||
yield self.update_stats_stream_pos(position)
|
||||
|
||||
def _delete_staging_area(txn):
|
||||
txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms")
|
||||
txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position")
|
||||
|
||||
yield self.runInteraction("populate_stats_cleanup", _delete_staging_area)
|
||||
|
||||
yield self._end_background_update("populate_stats_cleanup")
|
||||
return 1
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _populate_stats_process_rooms(self, progress, batch_size):
|
||||
|
||||
if not self.stats_enabled:
|
||||
yield self._end_background_update("populate_stats_process_rooms")
|
||||
return 1
|
||||
|
||||
# If we don't have any progress recorded yet, delete everything.
|
||||
if not progress:
|
||||
yield self.delete_all_stats()
|
||||
|
||||
def _get_next_batch(txn):
|
||||
# Only fetch 250 rooms, so we don't fetch too many at once, even
|
||||
# if those 250 rooms have less than batch_size state events.
|
||||
sql = """
|
||||
SELECT room_id, events FROM %s_rooms
|
||||
ORDER BY events DESC
|
||||
LIMIT 250
|
||||
""" % (
|
||||
TEMP_TABLE,
|
||||
)
|
||||
txn.execute(sql)
|
||||
rooms_to_work_on = txn.fetchall()
|
||||
|
||||
if not rooms_to_work_on:
|
||||
return None
|
||||
|
||||
# Get how many are left to process, so we can give status on how
|
||||
# far we are in processing
|
||||
txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
|
||||
progress["remaining"] = txn.fetchone()[0]
|
||||
|
||||
return rooms_to_work_on
|
||||
|
||||
rooms_to_work_on = yield self.runInteraction(
|
||||
"populate_stats_temp_read", _get_next_batch
|
||||
)
|
||||
|
||||
# No more rooms -- complete the transaction.
|
||||
if not rooms_to_work_on:
|
||||
yield self._end_background_update("populate_stats_process_rooms")
|
||||
return 1
|
||||
|
||||
logger.info(
|
||||
"Processing the next %d rooms of %d remaining",
|
||||
len(rooms_to_work_on),
|
||||
progress["remaining"],
|
||||
)
|
||||
|
||||
# Number of state events we've processed by going through each room
|
||||
processed_event_count = 0
|
||||
|
||||
for room_id, event_count in rooms_to_work_on:
|
||||
|
||||
current_state_ids = yield self.get_current_state_ids(room_id)
|
||||
|
||||
join_rules_id = current_state_ids.get((EventTypes.JoinRules, ""))
|
||||
history_visibility_id = current_state_ids.get(
|
||||
(EventTypes.RoomHistoryVisibility, "")
|
||||
)
|
||||
encryption_id = current_state_ids.get((EventTypes.RoomEncryption, ""))
|
||||
name_id = current_state_ids.get((EventTypes.Name, ""))
|
||||
topic_id = current_state_ids.get((EventTypes.Topic, ""))
|
||||
avatar_id = current_state_ids.get((EventTypes.RoomAvatar, ""))
|
||||
canonical_alias_id = current_state_ids.get((EventTypes.CanonicalAlias, ""))
|
||||
|
||||
event_ids = [
|
||||
join_rules_id,
|
||||
history_visibility_id,
|
||||
encryption_id,
|
||||
name_id,
|
||||
topic_id,
|
||||
avatar_id,
|
||||
canonical_alias_id,
|
||||
]
|
||||
|
||||
state_events = yield self.get_events(
|
||||
[ev for ev in event_ids if ev is not None]
|
||||
)
|
||||
|
||||
def _get_or_none(event_id, arg):
|
||||
event = state_events.get(event_id)
|
||||
if event:
|
||||
return event.content.get(arg)
|
||||
return None
|
||||
|
||||
yield self.update_room_state(
|
||||
room_id,
|
||||
{
|
||||
"join_rules": _get_or_none(join_rules_id, "join_rule"),
|
||||
"history_visibility": _get_or_none(
|
||||
history_visibility_id, "history_visibility"
|
||||
),
|
||||
"encryption": _get_or_none(encryption_id, "algorithm"),
|
||||
"name": _get_or_none(name_id, "name"),
|
||||
"topic": _get_or_none(topic_id, "topic"),
|
||||
"avatar": _get_or_none(avatar_id, "url"),
|
||||
"canonical_alias": _get_or_none(canonical_alias_id, "alias"),
|
||||
},
|
||||
)
|
||||
|
||||
now = self.hs.get_reactor().seconds()
|
||||
|
||||
# quantise time to the nearest bucket
|
||||
now = (now // self.stats_bucket_size) * self.stats_bucket_size
|
||||
|
||||
def _fetch_data(txn):
|
||||
|
||||
# Get the current token of the room
|
||||
current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
|
||||
|
||||
current_state_events = len(current_state_ids)
|
||||
|
||||
membership_counts = self._get_user_counts_in_room_txn(txn, room_id)
|
||||
|
||||
total_state_events = self._get_total_state_event_counts_txn(
|
||||
txn, room_id
|
||||
)
|
||||
|
||||
self._update_stats_txn(
|
||||
txn,
|
||||
"room",
|
||||
room_id,
|
||||
now,
|
||||
{
|
||||
"bucket_size": self.stats_bucket_size,
|
||||
"current_state_events": current_state_events,
|
||||
"joined_members": membership_counts.get(Membership.JOIN, 0),
|
||||
"invited_members": membership_counts.get(Membership.INVITE, 0),
|
||||
"left_members": membership_counts.get(Membership.LEAVE, 0),
|
||||
"banned_members": membership_counts.get(Membership.BAN, 0),
|
||||
"state_events": total_state_events,
|
||||
},
|
||||
)
|
||||
self._simple_insert_txn(
|
||||
txn,
|
||||
"room_stats_earliest_token",
|
||||
{"room_id": room_id, "token": current_token},
|
||||
)
|
||||
|
||||
# We've finished a room. Delete it from the table.
|
||||
self._simple_delete_one_txn(
|
||||
txn, TEMP_TABLE + "_rooms", {"room_id": room_id}
|
||||
)
|
||||
|
||||
yield self.runInteraction("update_room_stats", _fetch_data)
|
||||
|
||||
# Update the remaining counter.
|
||||
progress["remaining"] -= 1
|
||||
yield self.runInteraction(
|
||||
"populate_stats",
|
||||
self._background_update_progress_txn,
|
||||
"populate_stats_process_rooms",
|
||||
progress,
|
||||
)
|
||||
|
||||
processed_event_count += event_count
|
||||
|
||||
if processed_event_count > batch_size:
|
||||
# Don't process any more rooms, we've hit our batch size.
|
||||
return processed_event_count
|
||||
|
||||
return processed_event_count
|
||||
|
||||
def delete_all_stats(self):
|
||||
def get_stats_positions(self, for_initial_processor=False):
|
||||
"""
|
||||
Delete all statistics records.
|
||||
Returns the stats processor positions.
|
||||
|
||||
Args:
|
||||
for_initial_processor (bool, optional): If true, returns the position
|
||||
promised by the latest stats regeneration, rather than the current
|
||||
incremental processor's position.
|
||||
Otherwise (if false), return the incremental processor's position.
|
||||
|
||||
Returns (dict):
|
||||
Dict containing :-
|
||||
state_delta_stream_id: stream_id of last-processed state delta
|
||||
total_events_min_stream_ordering: stream_ordering of latest-processed
|
||||
backfilled event, in the context of total_events counting.
|
||||
total_events_max_stream_ordering: stream_ordering of latest-processed
|
||||
non-backfilled event, in the context of total_events counting.
|
||||
"""
|
||||
|
||||
def _delete_all_stats_txn(txn):
|
||||
txn.execute("DELETE FROM room_state")
|
||||
txn.execute("DELETE FROM room_stats")
|
||||
txn.execute("DELETE FROM room_stats_earliest_token")
|
||||
txn.execute("DELETE FROM user_stats")
|
||||
|
||||
return self.runInteraction("delete_all_stats", _delete_all_stats_txn)
|
||||
|
||||
def get_stats_stream_pos(self):
|
||||
return self._simple_select_one_onecol(
|
||||
table="stats_stream_pos",
|
||||
keyvalues={},
|
||||
retcol="stream_id",
|
||||
desc="stats_stream_pos",
|
||||
return self._simple_select_one(
|
||||
table="stats_incremental_position",
|
||||
keyvalues={"is_background_contract": for_initial_processor},
|
||||
retcols=(
|
||||
"state_delta_stream_id",
|
||||
"total_events_min_stream_ordering",
|
||||
"total_events_max_stream_ordering",
|
||||
),
|
||||
desc="stats_incremental_position",
|
||||
)
|
||||
|
||||
def update_stats_stream_pos(self, stream_id):
|
||||
def _get_stats_positions_txn(self, txn, for_initial_processor=False):
|
||||
"""
|
||||
See L{get_stats_positions}.
|
||||
|
||||
Args:
|
||||
txn (cursor): Database cursor
|
||||
"""
|
||||
return self._simple_select_one_txn(
|
||||
txn=txn,
|
||||
table="stats_incremental_position",
|
||||
keyvalues={"is_background_contract": for_initial_processor},
|
||||
retcols=(
|
||||
"state_delta_stream_id",
|
||||
"total_events_min_stream_ordering",
|
||||
"total_events_max_stream_ordering",
|
||||
),
|
||||
)
|
||||
|
||||
def update_stats_positions(self, positions, for_initial_processor=False):
|
||||
"""
|
||||
Updates the stats processor positions.
|
||||
|
||||
Args:
|
||||
positions: See L{get_stats_positions}
|
||||
for_initial_processor: See L{get_stats_positions}
|
||||
"""
|
||||
if positions is None:
|
||||
positions = {
|
||||
"state_delta_stream_id": None,
|
||||
"total_events_min_stream_ordering": None,
|
||||
"total_events_max_stream_ordering": None,
|
||||
}
|
||||
return self._simple_update_one(
|
||||
table="stats_stream_pos",
|
||||
keyvalues={},
|
||||
updatevalues={"stream_id": stream_id},
|
||||
desc="update_stats_stream_pos",
|
||||
table="stats_incremental_position",
|
||||
keyvalues={"is_background_contract": for_initial_processor},
|
||||
updatevalues=positions,
|
||||
desc="update_stats_incremental_position",
|
||||
)
|
||||
|
||||
def _update_stats_positions_txn(self, txn, positions, for_initial_processor=False):
|
||||
"""
|
||||
See L{update_stats_positions}
|
||||
"""
|
||||
if positions is None:
|
||||
positions = {
|
||||
"state_delta_stream_id": None,
|
||||
"total_events_min_stream_ordering": None,
|
||||
"total_events_max_stream_ordering": None,
|
||||
}
|
||||
return self._simple_update_one_txn(
|
||||
txn,
|
||||
table="stats_incremental_position",
|
||||
keyvalues={"is_background_contract": for_initial_processor},
|
||||
updatevalues=positions,
|
||||
)
|
||||
|
||||
def update_room_state(self, room_id, fields):
|
||||
@@ -361,42 +189,14 @@ class StatsStore(StateDeltasStore):
|
||||
fields[col] = None
|
||||
|
||||
return self._simple_upsert(
|
||||
table="room_state",
|
||||
table="room_stats_state",
|
||||
keyvalues={"room_id": room_id},
|
||||
values=fields,
|
||||
desc="update_room_state",
|
||||
)
|
||||
|
||||
def get_deltas_for_room(self, room_id, start, size=100):
|
||||
"""
|
||||
Get statistics deltas for a given room.
|
||||
|
||||
Args:
|
||||
room_id (str)
|
||||
start (int): Pagination start. Number of entries, not timestamp.
|
||||
size (int): How many entries to return.
|
||||
|
||||
Returns:
|
||||
Deferred[list[dict]], where the dict has the keys of
|
||||
ABSOLUTE_STATS_FIELDS["room"] and "ts".
|
||||
"""
|
||||
return self._simple_select_list_paginate(
|
||||
"room_stats",
|
||||
{"room_id": room_id},
|
||||
"ts",
|
||||
start,
|
||||
size,
|
||||
retcols=(list(ABSOLUTE_STATS_FIELDS["room"]) + ["ts"]),
|
||||
order_direction="DESC",
|
||||
)
|
||||
|
||||
def get_all_room_state(self):
|
||||
return self._simple_select_list(
|
||||
"room_state", None, retcols=("name", "topic", "canonical_alias")
|
||||
)
|
||||
|
||||
@cached()
|
||||
def get_earliest_token_for_room_stats(self, room_id):
|
||||
def get_earliest_token_for_stats(self, stats_type, id):
|
||||
"""
|
||||
Fetch the "earliest token". This is used by the room stats delta
|
||||
processor to ignore deltas that have been processed between the
|
||||
@@ -406,79 +206,410 @@ class StatsStore(StateDeltasStore):
|
||||
Returns:
|
||||
Deferred[int]
|
||||
"""
|
||||
table, id_col = TYPE_TO_TABLE[stats_type]
|
||||
|
||||
return self._simple_select_one_onecol(
|
||||
"room_stats_earliest_token",
|
||||
{"room_id": room_id},
|
||||
retcol="token",
|
||||
"%s_current" % (table,),
|
||||
{id_col: id},
|
||||
retcol="completed_delta_stream_id",
|
||||
allow_none=True,
|
||||
)
|
||||
|
||||
def update_stats(self, stats_type, stats_id, ts, fields):
|
||||
table, id_col = TYPE_TO_ROOM[stats_type]
|
||||
return self._simple_upsert(
|
||||
table=table,
|
||||
keyvalues={id_col: stats_id, "ts": ts},
|
||||
values=fields,
|
||||
desc="update_stats",
|
||||
def update_stats_delta(
|
||||
self, ts, stats_type, stats_id, fields, complete_with_stream_id=None
|
||||
):
|
||||
"""
|
||||
Updates the statistics for a subject, with a delta (difference/relative
|
||||
change).
|
||||
|
||||
Args:
|
||||
ts (int): timestamp of the change
|
||||
stats_type (str): "room" or "user" – the kind of subject
|
||||
stats_id (str): the subject's ID (room ID or user ID)
|
||||
fields (dict[str, int]): Deltas of stats values.
|
||||
complete_with_stream_id (int, optional):
|
||||
If supplied, converts an incomplete row into a complete row,
|
||||
with the supplied stream_id marked as the stream_id where the
|
||||
row was completed.
|
||||
"""
|
||||
|
||||
return self.runInteraction(
|
||||
"update_stats_delta",
|
||||
self._update_stats_delta_txn,
|
||||
ts,
|
||||
stats_type,
|
||||
stats_id,
|
||||
fields,
|
||||
complete_with_stream_id=complete_with_stream_id,
|
||||
)
|
||||
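A hypothetical call to the method above, recording that one member joined and two events were sent in a room; `store` is assumed to be a StatsStore instance and `now` a millisecond timestamp:

```python
d = store.update_stats_delta(
    ts=now,
    stats_type="room",
    stats_id="!room:example.com",
    fields={"joined_members": 1, "total_events": 2},
)
# d is a Deferred, since the work runs inside a database interaction.
```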
|
||||
def _update_stats_txn(self, txn, stats_type, stats_id, ts, fields):
|
||||
table, id_col = TYPE_TO_ROOM[stats_type]
|
||||
return self._simple_upsert_txn(
|
||||
txn, table=table, keyvalues={id_col: stats_id, "ts": ts}, values=fields
|
||||
)
|
||||
def _update_stats_delta_txn(
|
||||
self,
|
||||
txn,
|
||||
ts,
|
||||
stats_type,
|
||||
stats_id,
|
||||
fields,
|
||||
complete_with_stream_id=None,
|
||||
absolute_field_overrides=None,
|
||||
):
|
||||
"""
|
||||
See L{update_stats_delta}
|
||||
Additional Args:
|
||||
absolute_field_overrides (dict[str, int]): Current stats values
|
||||
(i.e. not deltas) of absolute fields.
|
||||
Does not work with per-slice fields.
|
||||
"""
|
||||
|
||||
def update_stats_delta(self, ts, stats_type, stats_id, field, value):
|
||||
def _update_stats_delta(txn):
|
||||
table, id_col = TYPE_TO_ROOM[stats_type]
|
||||
if absolute_field_overrides is None:
|
||||
absolute_field_overrides = {}
|
||||
|
||||
sql = (
|
||||
"SELECT * FROM %s"
|
||||
" WHERE %s=? and ts=("
|
||||
" SELECT MAX(ts) FROM %s"
|
||||
" WHERE %s=?"
|
||||
")"
|
||||
) % (table, id_col, table, id_col)
|
||||
txn.execute(sql, (stats_id, stats_id))
|
||||
rows = self.cursor_to_dict(txn)
|
||||
if len(rows) == 0:
|
||||
# silently skip as we don't have anything to apply a delta to yet.
|
||||
# this tries to minimise any race between the initial sync and
|
||||
# subsequent deltas arriving.
|
||||
return
|
||||
table, id_col = TYPE_TO_TABLE[stats_type]
|
||||
|
||||
current_ts = ts
|
||||
latest_ts = rows[0]["ts"]
|
||||
if current_ts < latest_ts:
|
||||
# This one is in the past, but we're just encountering it now.
|
||||
# Mark it as part of the current bucket.
|
||||
current_ts = latest_ts
|
||||
elif ts != latest_ts:
|
||||
# we have to copy our absolute counters over to the new entry.
|
||||
values = {
|
||||
key: rows[0][key] for key in ABSOLUTE_STATS_FIELDS[stats_type]
|
||||
}
|
||||
values[id_col] = stats_id
|
||||
values["ts"] = ts
|
||||
values["bucket_size"] = self.stats_bucket_size
|
||||
quantised_ts = self.quantise_stats_time(int(ts))
|
||||
end_ts = quantised_ts + self.stats_bucket_size
|
||||
|
||||
self._simple_insert_txn(txn, table=table, values=values)
|
||||
|
||||
# actually update the new value
|
||||
if stats_type in ABSOLUTE_STATS_FIELDS[stats_type]:
|
||||
self._simple_update_txn(
|
||||
txn,
|
||||
table=table,
|
||||
keyvalues={id_col: stats_id, "ts": current_ts},
|
||||
updatevalues={field: value},
|
||||
abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type]
|
||||
slice_field_names = PER_SLICE_FIELDS[stats_type]
|
||||
for field in chain(fields.keys(), absolute_field_overrides.keys()):
|
||||
if field not in abs_field_names and field not in slice_field_names:
|
||||
# guard against potential SQL injection dodginess
|
||||
raise ValueError(
|
||||
"%s is not a recognised field"
|
||||
" for stats type %s" % (field, stats_type)
|
||||
)
|
||||
|
||||
# only absolute stats fields are tracked in the `_current` stats tables,
|
||||
# so those are the only ones that we process deltas for when
|
||||
# we upsert against the `_current` table.
|
||||
|
||||
# This calculates the deltas (`field = field + ?` values)
|
||||
# for absolute fields,
|
||||
# * defaulting to 0 if not specified
|
||||
# (required for the INSERT part of upserting to work)
|
||||
# * omitting overrides specified in `absolute_field_overrides`
|
||||
deltas_of_absolute_fields = {
|
||||
key: fields.get(key, 0)
|
||||
for key in abs_field_names
|
||||
if key not in absolute_field_overrides
|
||||
}
|
||||
|
||||
if complete_with_stream_id is not None:
|
||||
absolute_field_overrides = absolute_field_overrides.copy()
|
||||
absolute_field_overrides[
|
||||
"completed_delta_stream_id"
|
||||
] = complete_with_stream_id
|
||||
|
||||
# first upsert the `_current` table
|
||||
self._upsert_with_additive_relatives_txn(
|
||||
txn=txn,
|
||||
table=table + "_current",
|
||||
keyvalues={id_col: stats_id},
|
||||
absolutes=absolute_field_overrides,
|
||||
additive_relatives=deltas_of_absolute_fields,
|
||||
)
|
||||
|
||||
if self.has_completed_background_updates():
|
||||
# TODO want to check specifically for stats regenerator, not all
|
||||
# background updates…
|
||||
# then upsert the `_historical` table.
|
||||
# we don't support absolute_fields for per-slice fields as it makes
|
||||
# no sense.
|
||||
per_slice_additive_relatives = {
|
||||
key: fields.get(key, 0) for key in slice_field_names
|
||||
}
|
||||
self._upsert_copy_from_table_with_additive_relatives_txn(
|
||||
txn=txn,
|
||||
into_table=table + "_historical",
|
||||
keyvalues={id_col: stats_id},
|
||||
extra_dst_insvalues={"bucket_size": self.stats_bucket_size},
|
||||
extra_dst_keyvalues={"end_ts": end_ts},
|
||||
additive_relatives=per_slice_additive_relatives,
|
||||
src_table=table + "_current",
|
||||
copy_columns=abs_field_names,
|
||||
additional_where=" AND completed_delta_stream_id IS NOT NULL",
|
||||
)
|
||||
|
||||
def _upsert_with_additive_relatives_txn(
|
||||
self, txn, table, keyvalues, absolutes, additive_relatives
|
||||
):
|
||||
"""Used to update values in the stats tables.
|
||||
|
||||
Args:
|
||||
txn: Transaction
|
||||
table (str): Table name
|
||||
keyvalues (dict[str, any]): Row-identifying key values
|
||||
absolutes (dict[str, any]): Absolute (set) fields
|
||||
additive_relatives (dict[str, int]): Fields that will be added onto
|
||||
if existing row present.
|
||||
"""
|
||||
if self.database_engine.can_native_upsert:
|
||||
absolute_updates = [
|
||||
"%(field)s = EXCLUDED.%(field)s" % {"field": field}
|
||||
for field in absolutes.keys()
|
||||
]
|
||||
|
||||
relative_updates = [
|
||||
"%(field)s = EXCLUDED.%(field)s + %(table)s.%(field)s"
|
||||
% {"table": table, "field": field}
|
||||
for field in additive_relatives.keys()
|
||||
]
|
||||
|
||||
insert_cols = []
|
||||
qargs = []
|
||||
|
||||
for (key, val) in chain(
|
||||
keyvalues.items(), absolutes.items(), additive_relatives.items()
|
||||
):
|
||||
insert_cols.append(key)
|
||||
qargs.append(val)
|
||||
|
||||
sql = """
|
||||
INSERT INTO %(table)s (%(insert_cols_cs)s)
|
||||
VALUES (%(insert_vals_qs)s)
|
||||
ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
|
||||
""" % {
|
||||
"table": table,
|
||||
"insert_cols_cs": ", ".join(insert_cols),
|
||||
"insert_vals_qs": ", ".join(
|
||||
["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
|
||||
),
|
||||
"key_columns": ", ".join(keyvalues),
|
||||
"updates": ", ".join(chain(absolute_updates, relative_updates)),
|
||||
}
|
||||
|
||||
txn.execute(sql, qargs)
|
||||
else:
|
||||
self.database_engine.lock_table(txn, table)
|
||||
retcols = list(chain(absolutes.keys(), additive_relatives.keys()))
|
||||
current_row = self._simple_select_one_txn(
|
||||
txn, table, keyvalues, retcols, allow_none=True
|
||||
)
|
||||
if current_row is None:
|
||||
merged_dict = {**keyvalues, **absolutes, **additive_relatives}
|
||||
self._simple_insert_txn(txn, table, merged_dict)
|
||||
else:
|
||||
sql = ("UPDATE %s SET %s=%s+? WHERE %s=? AND ts=?") % (
|
||||
table,
|
||||
field,
|
||||
field,
|
||||
id_col,
|
||||
)
|
||||
txn.execute(sql, (value, stats_id, current_ts))
|
||||
for (key, val) in additive_relatives.items():
|
||||
current_row[key] += val
|
||||
current_row.update(absolutes)
|
||||
self._simple_update_one_txn(txn, table, keyvalues, current_row)
|
||||
|
||||
return self.runInteraction("update_stats_delta", _update_stats_delta)
|
||||
def _upsert_copy_from_table_with_additive_relatives_txn(
|
||||
self,
|
||||
txn,
|
||||
into_table,
|
||||
keyvalues,
|
||||
extra_dst_keyvalues,
|
||||
extra_dst_insvalues,
|
||||
additive_relatives,
|
||||
src_table,
|
||||
copy_columns,
|
||||
additional_where="",
|
||||
):
|
||||
"""
|
||||
Args:
|
||||
txn: Transaction
|
||||
into_table (str): The destination table to UPSERT the row into
|
||||
keyvalues (dict[str, any]): Row-identifying key values
|
||||
extra_dst_keyvalues (dict[str, any]): Additional keyvalues
|
||||
for `into_table`.
|
||||
extra_dst_insvalues (dict[str, any]): Additional values to insert
|
||||
on new row creation for `into_table`.
|
||||
additive_relatives (dict[str, any]): Fields that will be added onto
|
||||
if existing row present. (Must be disjoint from copy_columns.)
|
||||
src_table (str): The source table to copy from
|
||||
copy_columns (iterable[str]): The list of columns to copy
|
||||
additional_where (str): Additional SQL for where (prefix with AND
|
||||
if using).
|
||||
"""
|
||||
if self.database_engine.can_native_upsert:
|
||||
ins_columns = chain(
|
||||
keyvalues,
|
||||
copy_columns,
|
||||
additive_relatives,
|
||||
extra_dst_keyvalues,
|
||||
extra_dst_insvalues,
|
||||
)
|
||||
sel_exprs = chain(
|
||||
keyvalues,
|
||||
copy_columns,
|
||||
(
|
||||
"?"
|
||||
for _ in chain(
|
||||
additive_relatives, extra_dst_keyvalues, extra_dst_insvalues
|
||||
)
|
||||
),
|
||||
)
|
||||
keyvalues_where = ("%s = ?" % f for f in keyvalues)
|
||||
|
||||
sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns)
|
||||
sets_ar = (
|
||||
"%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f)
|
||||
for f in additive_relatives
|
||||
)
|
||||
|
||||
sql = """
|
||||
INSERT INTO %(into_table)s (%(ins_columns)s)
|
||||
SELECT %(sel_exprs)s
|
||||
FROM %(src_table)s
|
||||
WHERE %(keyvalues_where)s %(additional_where)s
|
||||
ON CONFLICT (%(keyvalues)s)
|
||||
DO UPDATE SET %(sets)s
|
||||
""" % {
|
||||
"into_table": into_table,
|
||||
"ins_columns": ", ".join(ins_columns),
|
||||
"sel_exprs": ", ".join(sel_exprs),
|
||||
"keyvalues_where": " AND ".join(keyvalues_where),
|
||||
"src_table": src_table,
|
||||
"keyvalues": ", ".join(
|
||||
chain(keyvalues.keys(), extra_dst_keyvalues.keys())
|
||||
),
|
||||
"sets": ", ".join(chain(sets_cc, sets_ar)),
|
||||
"additional_where": additional_where,
|
||||
}
|
||||
|
||||
qargs = list(
|
||||
chain(
|
||||
additive_relatives.values(),
|
||||
extra_dst_keyvalues.values(),
|
||||
extra_dst_insvalues.values(),
|
||||
keyvalues.values(),
|
||||
)
|
||||
)
|
||||
txn.execute(sql, qargs)
|
||||
else:
|
||||
self.database_engine.lock_table(txn, into_table)
|
||||
src_row = self._simple_select_one_txn(
|
||||
txn, src_table, keyvalues, copy_columns
|
||||
)
|
||||
dest_current_row = self._simple_select_one_txn(
|
||||
txn,
|
||||
into_table,
|
||||
keyvalues,
|
||||
retcols=list(chain(additive_relatives.keys(), copy_columns)),
|
||||
allow_none=True,
|
||||
)
|
||||
|
||||
if dest_current_row is None:
|
||||
merged_dict = {
|
||||
**keyvalues,
|
||||
**extra_dst_keyvalues,
|
||||
**extra_dst_insvalues,
|
||||
**src_row,
|
||||
**additive_relatives,
|
||||
}
|
||||
self._simple_insert_txn(txn, into_table, merged_dict)
|
||||
else:
|
||||
for (key, val) in additive_relatives.items():
|
||||
src_row[key] = dest_current_row[key] + val
|
||||
self._simple_update_txn(txn, into_table, keyvalues, src_row)
|
||||
|
||||
def incremental_update_room_total_events_and_bytes(self, in_positions):
|
||||
"""
|
||||
Counts the number of events and total event bytes per-room and then adds
|
||||
these to the respective total_events and total_event_bytes room counts.
|
||||
|
||||
Args:
|
||||
in_positions (dict): Positions,
|
||||
as retrieved from L{get_stats_positions}.
|
||||
|
||||
Returns (Deferred[tuple[dict, bool]]):
|
||||
First element (dict):
|
||||
The new positions. Note that this is for reference only –
|
||||
the new positions WILL be committed by this function.
|
||||
Second element (bool):
|
||||
true iff there was a change to the positions, false otherwise
|
||||
"""
|
||||
|
||||
def incremental_update_total_events_and_bytes_txn(txn):
|
||||
positions = in_positions.copy()
|
||||
|
||||
max_pos = self.get_room_max_stream_ordering()
|
||||
min_pos = self.get_room_min_stream_ordering()
|
||||
self.update_total_event_and_bytes_count_between_txn(
|
||||
txn,
|
||||
low_pos=positions["total_events_max_stream_ordering"],
|
||||
high_pos=max_pos,
|
||||
)
|
||||
|
||||
self.update_total_event_and_bytes_count_between_txn(
|
||||
txn,
|
||||
low_pos=min_pos,
|
||||
high_pos=positions["total_events_min_stream_ordering"],
|
||||
)
|
||||
|
||||
if (
|
||||
positions["total_events_max_stream_ordering"] != max_pos
|
||||
or positions["total_events_min_stream_ordering"] != min_pos
|
||||
):
|
||||
positions["total_events_max_stream_ordering"] = max_pos
|
||||
positions["total_events_min_stream_ordering"] = min_pos
|
||||
|
||||
self._update_stats_positions_txn(txn, positions)
|
||||
|
||||
return positions, True
|
||||
else:
|
||||
return positions, False
|
||||
|
||||
return self.runInteraction(
|
||||
"stats_incremental_total_events_and_bytes",
|
||||
incremental_update_total_events_and_bytes_txn,
|
||||
)
|
||||
|
||||
def update_total_event_and_bytes_count_between_txn(self, txn, low_pos, high_pos):
|
||||
"""
|
||||
Updates the total_events and total_event_bytes counts for rooms,
|
||||
in a range of stream_orderings.
|
||||
|
||||
Inclusivity of low_pos and high_pos is dependent upon their signs.
|
||||
This makes it intuitive to use this function for both backfilled
|
||||
and non-backfilled events.
|
||||
|
||||
Examples:
|
||||
(low, high) → (kind)
|
||||
(3, 7) → 3 < … <= 7 (normal-filled; low already processed before)
|
||||
(-4, -2) → -4 <= … < -2 (backfilled; high already processed before)
|
||||
(-7, 7) → -7 <= … <= 7 (both)
|
||||
|
||||
Args:
|
||||
txn: Database transaction.
|
||||
low_pos: Low stream ordering
|
||||
high_pos: High stream ordering
|
||||
"""
|
||||
|
||||
if low_pos >= high_pos:
|
||||
# nothing to do here.
|
||||
return
|
||||
|
||||
now = self.hs.clock.time_msec()
|
||||
|
||||
# we choose comparators based on the signs
|
||||
low_comparator = "<=" if low_pos < 0 else "<"
|
||||
high_comparator = "<" if high_pos < 0 else "<="
|
||||
|
||||
if isinstance(self.database_engine, PostgresEngine):
|
||||
new_bytes_expression = "OCTET_LENGTH(json)"
|
||||
else:
|
||||
new_bytes_expression = "LENGTH(CAST(json AS BLOB))"
|
||||
|
||||
sql = """
|
||||
SELECT room_id, COUNT(*) AS new_events, SUM(%s) AS new_bytes
|
||||
FROM events INNER JOIN event_json USING (event_id)
|
||||
WHERE ? %s stream_ordering AND stream_ordering %s ?
|
||||
GROUP BY room_id
|
||||
""" % (
|
||||
new_bytes_expression,
low_comparator,
high_comparator,
|
||||
)
|
||||
|
||||
txn.execute(sql, (low_pos, high_pos))
|
||||
|
||||
for room_id, new_events, new_bytes in txn.fetchall():
|
||||
self._update_stats_delta_txn(
|
||||
txn,
|
||||
now,
|
||||
"room",
|
||||
room_id,
|
||||
{"total_events": new_events, "total_event_bytes": new_bytes},
|
||||
)
|
||||
|
||||
@@ -0,0 +1,33 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import hashlib
|
||||
|
||||
import unpaddedbase64
|
||||
|
||||
|
||||
def sha256_and_url_safe_base64(input_text):
|
||||
"""SHA256 hash an input string, encode the digest as url-safe base64, and
|
||||
return
|
||||
|
||||
:param input_text: string to hash
|
||||
:type input_text: str
|
||||
|
||||
:returns: a SHA256 hashed and url-safe base64 encoded digest
|
||||
:rtype: str
|
||||
"""
|
||||
digest = hashlib.sha256(input_text.encode()).digest()
|
||||
return unpaddedbase64.encode_base64(digest, urlsafe=True)
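# Illustrative usage (the input string is made up): the 32-byte SHA-256 digest
# always encodes to 43 URL-safe base64 characters once padding is dropped, so:
#
#   token = sha256_and_url_safe_base64("some-3pid@example.com")
#   assert len(token) == 43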
|
||||
@@ -30,6 +30,8 @@ from six import iteritems
|
||||
|
||||
import yaml
|
||||
|
||||
from synapse.config import find_config_files
|
||||
|
||||
SYNAPSE = [sys.executable, "-B", "-m", "synapse.app.homeserver"]
|
||||
|
||||
GREEN = "\x1b[1;32m"
|
||||
@@ -135,7 +137,8 @@ def main():
|
||||
"configfile",
|
||||
nargs="?",
|
||||
default="homeserver.yaml",
|
||||
help="the homeserver config file, defaults to homeserver.yaml",
|
||||
help="the homeserver config file. Defaults to homeserver.yaml. May also be"
|
||||
" a directory with *.yaml files",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-w", "--worker", metavar="WORKERCONFIG", help="start or stop a single worker"
|
||||
@@ -176,8 +179,12 @@ def main():
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
with open(configfile) as stream:
|
||||
config = yaml.safe_load(stream)
|
||||
config_files = find_config_files([configfile])
|
||||
config = {}
|
||||
for config_file in config_files:
|
||||
with open(config_file) as file_stream:
|
||||
yaml_config = yaml.safe_load(file_stream)
|
||||
config.update(yaml_config)
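# Note: dict.update is a shallow merge, so if two config files define the same
# top-level key, the file read later wholly replaces the earlier value for it.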
|
||||
|
||||
pidfile = config["pid_file"]
|
||||
cache_factor = config.get("synctl_cache_factor")
|
||||
|
||||
@@ -0,0 +1,52 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2019 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import yaml
|
||||
|
||||
from synapse.config.database import DatabaseConfig
|
||||
|
||||
from tests import unittest
|
||||
|
||||
|
||||
class DatabaseConfigTestCase(unittest.TestCase):
|
||||
def test_database_configured_correctly_no_database_conf_param(self):
|
||||
conf = yaml.safe_load(
|
||||
DatabaseConfig().generate_config_section("/data_dir_path", None)
|
||||
)
|
||||
|
||||
expected_database_conf = {
|
||||
"name": "sqlite3",
|
||||
"args": {"database": "/data_dir_path/homeserver.db"},
|
||||
}
|
||||
|
||||
self.assertEqual(conf["database"], expected_database_conf)
|
||||
|
||||
def test_database_configured_correctly_database_conf_param(self):
|
||||
|
||||
database_conf = {
|
||||
"name": "my super fast datastore",
|
||||
"args": {
|
||||
"user": "matrix",
|
||||
"password": "synapse_database_password",
|
||||
"host": "synapse_database_host",
|
||||
"database": "matrix",
|
||||
},
|
||||
}
|
||||
|
||||
conf = yaml.safe_load(
|
||||
DatabaseConfig().generate_config_section("/data_dir_path", database_conf)
|
||||
)
|
||||
|
||||
self.assertEqual(conf["database"], database_conf)
|
||||
@@ -13,7 +13,9 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from synapse.config.server import is_threepid_reserved
|
||||
import yaml
|
||||
|
||||
from synapse.config.server import ServerConfig, is_threepid_reserved
|
||||
|
||||
from tests import unittest
|
||||
|
||||
@@ -29,3 +31,100 @@ class ServerConfigTestCase(unittest.TestCase):
|
||||
self.assertTrue(is_threepid_reserved(config, user1))
|
||||
self.assertFalse(is_threepid_reserved(config, user3))
|
||||
self.assertFalse(is_threepid_reserved(config, user1_msisdn))
|
||||
|
||||
def test_unsecure_listener_no_listeners_open_private_ports_false(self):
|
||||
conf = yaml.safe_load(
|
||||
ServerConfig().generate_config_section(
|
||||
"che.org", "/data_dir_path", False, None
|
||||
)
|
||||
)
|
||||
|
||||
expected_listeners = [
|
||||
{
|
||||
"port": 8008,
|
||||
"tls": False,
|
||||
"type": "http",
|
||||
"x_forwarded": True,
|
||||
"bind_addresses": ["::1", "127.0.0.1"],
|
||||
"resources": [{"names": ["client", "federation"], "compress": False}],
|
||||
}
|
||||
]
|
||||
|
||||
self.assertEqual(conf["listeners"], expected_listeners)
|
||||
|
||||
def test_unsecure_listener_no_listeners_open_private_ports_true(self):
|
||||
conf = yaml.safe_load(
|
||||
ServerConfig().generate_config_section(
|
||||
"che.org", "/data_dir_path", True, None
|
||||
)
|
||||
)
|
||||
|
||||
expected_listeners = [
|
||||
{
|
||||
"port": 8008,
|
||||
"tls": False,
|
||||
"type": "http",
|
||||
"x_forwarded": True,
|
||||
"resources": [{"names": ["client", "federation"], "compress": False}],
|
||||
}
|
||||
]
|
||||
|
||||
self.assertEqual(conf["listeners"], expected_listeners)
|
||||
|
||||
def test_listeners_set_correctly_open_private_ports_false(self):
|
||||
listeners = [
|
||||
{
|
||||
"port": 8448,
|
||||
"resources": [{"names": ["federation"]}],
|
||||
"tls": True,
|
||||
"type": "http",
|
||||
},
|
||||
{
|
||||
"port": 443,
|
||||
"resources": [{"names": ["client"]}],
|
||||
"tls": False,
|
||||
"type": "http",
|
||||
},
|
||||
]
|
||||
|
||||
conf = yaml.safe_load(
|
||||
ServerConfig().generate_config_section(
|
||||
"this.one.listens", "/data_dir_path", True, listeners
|
||||
)
|
||||
)
|
||||
|
||||
self.assertEqual(conf["listeners"], listeners)
|
||||
|
||||
def test_listeners_set_correctly_open_private_ports_true(self):
|
||||
listeners = [
|
||||
{
|
||||
"port": 8448,
|
||||
"resources": [{"names": ["federation"]}],
|
||||
"tls": True,
|
||||
"type": "http",
|
||||
},
|
||||
{
|
||||
"port": 443,
|
||||
"resources": [{"names": ["client"]}],
|
||||
"tls": False,
|
||||
"type": "http",
|
||||
},
|
||||
{
|
||||
"port": 1243,
|
||||
"resources": [{"names": ["client"]}],
|
||||
"tls": False,
|
||||
"type": "http",
|
||||
"bind_addresses": ["this_one_is_bound"],
|
||||
},
|
||||
]
|
||||
|
||||
expected_listeners = listeners.copy()
|
||||
expected_listeners[1]["bind_addresses"] = ["::1", "127.0.0.1"]
|
||||
|
||||
conf = yaml.safe_load(
|
||||
ServerConfig().generate_config_section(
|
||||
"this.one.listens", "/data_dir_path", True, listeners
|
||||
)
|
||||
)
|
||||
|
||||
self.assertEqual(conf["listeners"], expected_listeners)
|
||||
|
||||
@@ -16,6 +16,8 @@
|
||||
|
||||
import os
|
||||
|
||||
import yaml
|
||||
|
||||
from OpenSSL import SSL
|
||||
|
||||
from synapse.config.tls import ConfigError, TlsConfig
|
||||
@@ -191,3 +193,45 @@ s4niecZKPBizL6aucT59CsunNmmb5Glq8rlAcU+1ZTZZzGYqVYhF6axB9Qg=
|
||||
self.assertEqual(cf._verify_ssl._options & SSL.OP_NO_TLSv1, 0)
|
||||
self.assertEqual(cf._verify_ssl._options & SSL.OP_NO_TLSv1_1, 0)
|
||||
self.assertEqual(cf._verify_ssl._options & SSL.OP_NO_TLSv1_2, 0)
|
||||
|
||||
def test_acme_disabled_in_generated_config_no_acme_domain_provied(self):
|
||||
"""
|
||||
Checks acme is disabled by default.
|
||||
"""
|
||||
conf = TestConfig()
|
||||
conf.read_config(
|
||||
yaml.safe_load(
|
||||
TestConfig().generate_config_section(
|
||||
"/config_dir_path",
|
||||
"my_super_secure_server",
|
||||
"/data_dir_path",
|
||||
"/tls_cert_path",
|
||||
"tls_private_key",
|
||||
None, # This is the acme_domain
|
||||
)
|
||||
),
|
||||
"/config_dir_path",
|
||||
)
|
||||
|
||||
self.assertFalse(conf.acme_enabled)
|
||||
|
||||
def test_acme_enabled_in_generated_config_domain_provided(self):
|
||||
"""
|
||||
Checks acme is enabled if the acme_domain arg is set to some string.
|
||||
"""
|
||||
conf = TestConfig()
|
||||
conf.read_config(
|
||||
yaml.safe_load(
|
||||
TestConfig().generate_config_section(
|
||||
"/config_dir_path",
|
||||
"my_super_secure_server",
|
||||
"/data_dir_path",
|
||||
"/tls_cert_path",
|
||||
"tls_private_key",
|
||||
"my_supe_secure_server", # This is the acme_domain
|
||||
)
|
||||
),
|
||||
"/config_dir_path",
|
||||
)
|
||||
|
||||
self.assertTrue(conf.acme_enabled)
|
||||
|
||||
@@ -20,7 +20,6 @@ from synapse.federation.federation_server import server_matches_acl_event
|
||||
from tests import unittest
|
||||
|
||||
|
||||
@unittest.DEBUG
|
||||
class ServerACLsTestCase(unittest.TestCase):
|
||||
def test_blacklisted_server(self):
|
||||
e = _create_acl_event({"allow": ["*"], "deny": ["evil.com"]})
|
||||
|
||||
@@ -1,304 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2019 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from mock import Mock
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.rest import admin
|
||||
from synapse.rest.client.v1 import login, room
|
||||
|
||||
from tests import unittest
|
||||
|
||||
|
||||
class StatsRoomTests(unittest.HomeserverTestCase):
|
||||
|
||||
servlets = [
|
||||
admin.register_servlets_for_client_rest_resource,
|
||||
room.register_servlets,
|
||||
login.register_servlets,
|
||||
]
|
||||
|
||||
def prepare(self, reactor, clock, hs):
|
||||
|
||||
self.store = hs.get_datastore()
|
||||
self.handler = self.hs.get_stats_handler()
|
||||
|
||||
def _add_background_updates(self):
|
||||
"""
|
||||
Add the background updates we need to run.
|
||||
"""
|
||||
# Ugh, have to reset this flag
|
||||
self.store._all_done = False
|
||||
|
||||
self.get_success(
|
||||
self.store._simple_insert(
|
||||
"background_updates",
|
||||
{"update_name": "populate_stats_createtables", "progress_json": "{}"},
|
||||
)
|
||||
)
|
||||
self.get_success(
|
||||
self.store._simple_insert(
|
||||
"background_updates",
|
||||
{
|
||||
"update_name": "populate_stats_process_rooms",
|
||||
"progress_json": "{}",
|
||||
"depends_on": "populate_stats_createtables",
|
||||
},
|
||||
)
|
||||
)
|
||||
self.get_success(
|
||||
self.store._simple_insert(
|
||||
"background_updates",
|
||||
{
|
||||
"update_name": "populate_stats_cleanup",
|
||||
"progress_json": "{}",
|
||||
"depends_on": "populate_stats_process_rooms",
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
def test_initial_room(self):
|
||||
"""
|
||||
The background updates will build the table from scratch.
|
||||
"""
|
||||
r = self.get_success(self.store.get_all_room_state())
|
||||
self.assertEqual(len(r), 0)
|
||||
|
||||
# Disable stats
|
||||
self.hs.config.stats_enabled = False
|
||||
self.handler.stats_enabled = False
|
||||
|
||||
u1 = self.register_user("u1", "pass")
|
||||
u1_token = self.login("u1", "pass")
|
||||
|
||||
room_1 = self.helper.create_room_as(u1, tok=u1_token)
|
||||
self.helper.send_state(
|
||||
room_1, event_type="m.room.topic", body={"topic": "foo"}, tok=u1_token
|
||||
)
|
||||
|
||||
# Stats disabled, shouldn't have done anything
|
||||
r = self.get_success(self.store.get_all_room_state())
|
||||
self.assertEqual(len(r), 0)
|
||||
|
||||
# Enable stats
|
||||
self.hs.config.stats_enabled = True
|
||||
self.handler.stats_enabled = True
|
||||
|
||||
# Do the initial population of the stats tables via the background update
|
||||
self._add_background_updates()
|
||||
|
||||
while not self.get_success(self.store.has_completed_background_updates()):
|
||||
self.get_success(self.store.do_next_background_update(100), by=0.1)
|
||||
|
||||
r = self.get_success(self.store.get_all_room_state())
|
||||
|
||||
self.assertEqual(len(r), 1)
|
||||
self.assertEqual(r[0]["topic"], "foo")
|
||||
|
||||
def test_initial_earliest_token(self):
|
||||
"""
|
||||
Ingestion via notify_new_event will ignore tokens that the background
|
||||
updates have already processed.
|
||||
"""
|
||||
self.reactor.advance(86401)
|
||||
|
||||
self.hs.config.stats_enabled = False
|
||||
self.handler.stats_enabled = False
|
||||
|
||||
u1 = self.register_user("u1", "pass")
|
||||
u1_token = self.login("u1", "pass")
|
||||
|
||||
u2 = self.register_user("u2", "pass")
|
||||
u2_token = self.login("u2", "pass")
|
||||
|
||||
u3 = self.register_user("u3", "pass")
|
||||
u3_token = self.login("u3", "pass")
|
||||
|
||||
room_1 = self.helper.create_room_as(u1, tok=u1_token)
|
||||
self.helper.send_state(
|
||||
room_1, event_type="m.room.topic", body={"topic": "foo"}, tok=u1_token
|
||||
)
|
||||
|
||||
# Begin the ingestion by creating the temp tables. This will also store
|
||||
# the position that the deltas should begin at, once they take over.
|
||||
self.hs.config.stats_enabled = True
|
||||
self.handler.stats_enabled = True
|
||||
self.store._all_done = False
|
||||
self.get_success(self.store.update_stats_stream_pos(None))
|
||||
|
||||
self.get_success(
|
||||
self.store._simple_insert(
|
||||
"background_updates",
|
||||
{"update_name": "populate_stats_createtables", "progress_json": "{}"},
|
||||
)
|
||||
)
|
||||
|
||||
while not self.get_success(self.store.has_completed_background_updates()):
|
||||
self.get_success(self.store.do_next_background_update(100), by=0.1)
|
||||
|
||||
# Now, before the table is actually ingested, add some more events.
|
||||
self.helper.invite(room=room_1, src=u1, targ=u2, tok=u1_token)
|
||||
self.helper.join(room=room_1, user=u2, tok=u2_token)
|
||||
|
||||
# Now do the initial ingestion.
|
||||
self.get_success(
|
||||
self.store._simple_insert(
|
||||
"background_updates",
|
||||
{"update_name": "populate_stats_process_rooms", "progress_json": "{}"},
|
||||
)
|
||||
)
|
||||
self.get_success(
|
||||
self.store._simple_insert(
|
||||
"background_updates",
|
||||
{
|
||||
"update_name": "populate_stats_cleanup",
|
||||
"progress_json": "{}",
|
||||
"depends_on": "populate_stats_process_rooms",
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
self.store._all_done = False
|
||||
while not self.get_success(self.store.has_completed_background_updates()):
|
||||
self.get_success(self.store.do_next_background_update(100), by=0.1)
|
||||
|
||||
self.reactor.advance(86401)
|
||||
|
||||
# Now add some more events, triggering ingestion. Because of the stream
|
||||
# position being set to before the events sent in the middle, a simpler
|
||||
# implementation would reprocess those events, and say there were four
|
||||
# users, not three.
|
||||
self.helper.invite(room=room_1, src=u1, targ=u3, tok=u1_token)
|
||||
self.helper.join(room=room_1, user=u3, tok=u3_token)
|
||||
|
||||
# Get the deltas! There should be two -- day 1, and day 2.
|
||||
r = self.get_success(self.store.get_deltas_for_room(room_1, 0))
|
||||
|
||||
# The oldest has 2 joined members
|
||||
self.assertEqual(r[-1]["joined_members"], 2)
|
||||
|
||||
# The newest has 3
|
||||
self.assertEqual(r[0]["joined_members"], 3)
|
||||
|
||||
def test_incorrect_state_transition(self):
|
||||
"""
|
||||
If the state transition is not one of (JOIN, INVITE, LEAVE, BAN) to
|
||||
(JOIN, INVITE, LEAVE, BAN), an error is raised.
|
||||
"""
|
||||
events = {
|
||||
"a1": {"membership": Membership.LEAVE},
|
||||
"a2": {"membership": "not a real thing"},
|
||||
}
|
||||
|
||||
def get_event(event_id, allow_none=True):
|
||||
m = Mock()
|
||||
m.content = events[event_id]
|
||||
d = defer.Deferred()
|
||||
self.reactor.callLater(0.0, d.callback, m)
|
||||
return d
|
||||
|
||||
def get_received_ts(event_id):
|
||||
return defer.succeed(1)
|
||||
|
||||
self.store.get_received_ts = get_received_ts
|
||||
self.store.get_event = get_event
|
||||
|
||||
deltas = [
|
||||
{
|
||||
"type": EventTypes.Member,
|
||||
"state_key": "some_user",
|
||||
"room_id": "room",
|
||||
"event_id": "a1",
|
||||
"prev_event_id": "a2",
|
||||
"stream_id": 60,
|
||||
}
|
||||
]
|
||||
|
||||
f = self.get_failure(self.handler._handle_deltas(deltas), ValueError)
|
||||
self.assertEqual(
|
||||
f.value.args[0], "'not a real thing' is not a valid prev_membership"
|
||||
)
|
||||
|
||||
# And the other way...
|
||||
deltas = [
|
||||
{
|
||||
"type": EventTypes.Member,
|
||||
"state_key": "some_user",
|
||||
"room_id": "room",
|
||||
"event_id": "a2",
|
||||
"prev_event_id": "a1",
|
||||
"stream_id": 100,
|
||||
}
|
||||
]
|
||||
|
||||
f = self.get_failure(self.handler._handle_deltas(deltas), ValueError)
|
||||
self.assertEqual(
|
||||
f.value.args[0], "'not a real thing' is not a valid membership"
|
||||
)
|
||||
|
||||
def test_redacted_prev_event(self):
|
||||
"""
|
||||
If the prev_event does not exist, then it is assumed to be a LEAVE.
|
||||
"""
|
||||
u1 = self.register_user("u1", "pass")
|
||||
u1_token = self.login("u1", "pass")
|
||||
|
||||
room_1 = self.helper.create_room_as(u1, tok=u1_token)
|
||||
|
||||
# Do the initial population of the stats tables via the background update
|
||||
self._add_background_updates()
|
||||
|
||||
while not self.get_success(self.store.has_completed_background_updates()):
|
||||
self.get_success(self.store.do_next_background_update(100), by=0.1)
|
||||
|
||||
events = {"a1": None, "a2": {"membership": Membership.JOIN}}
|
||||
|
||||
def get_event(event_id, allow_none=True):
|
||||
if events.get(event_id):
|
||||
m = Mock()
|
||||
m.content = events[event_id]
|
||||
else:
|
||||
m = None
|
||||
d = defer.Deferred()
|
||||
self.reactor.callLater(0.0, d.callback, m)
|
||||
return d
|
||||
|
||||
def get_received_ts(event_id):
|
||||
return defer.succeed(1)
|
||||
|
||||
self.store.get_received_ts = get_received_ts
|
||||
self.store.get_event = get_event
|
||||
|
||||
deltas = [
|
||||
{
|
||||
"type": EventTypes.Member,
|
||||
"state_key": "some_user:test",
|
||||
"room_id": room_1,
|
||||
"event_id": "a2",
|
||||
"prev_event_id": "a1",
|
||||
"stream_id": 100,
|
||||
}
|
||||
]
|
||||
|
||||
# Handle our fake deltas, which has a user going from LEAVE -> JOIN.
|
||||
self.get_success(self.handler._handle_deltas(deltas))
|
||||
|
||||
# One delta, with two joined members -- the room creator, and our fake
|
||||
# user.
|
||||
r = self.get_success(self.store.get_deltas_for_room(room_1, 0))
|
||||
self.assertEqual(len(r), 1)
|
||||
self.assertEqual(r[0]["joined_members"], 2)
|
||||
@@ -41,9 +41,9 @@ from synapse.http.federation.well_known_resolver import (
|
||||
from synapse.logging.context import LoggingContext
|
||||
from synapse.util.caches.ttlcache import TTLCache
|
||||
|
||||
from tests import unittest
|
||||
from tests.http import TestServerTLSConnectionFactory, get_test_ca_cert_file
|
||||
from tests.server import FakeTransport, ThreadedMemoryReactorClock
|
||||
from tests.unittest import TestCase
|
||||
from tests.utils import default_config
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -67,7 +67,7 @@ def get_connection_factory():
|
||||
return test_server_connection_factory
|
||||
|
||||
|
||||
class MatrixFederationAgentTests(TestCase):
|
||||
class MatrixFederationAgentTests(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.reactor = ThreadedMemoryReactorClock()
|
||||
|
||||
@@ -1069,8 +1069,64 @@ class MatrixFederationAgentTests(TestCase):
|
||||
r = self.successResultOf(fetch_d)
|
||||
self.assertEqual(r.delegated_server, None)
|
||||
|
||||
def test_srv_fallbacks(self):
|
||||
"""Test that other SRV results are tried if the first one fails.
|
||||
"""
|
||||
|
||||
class TestCachePeriodFromHeaders(TestCase):
|
||||
self.mock_resolver.resolve_service.side_effect = lambda _: [
|
||||
Server(host=b"target.com", port=8443),
|
||||
Server(host=b"target.com", port=8444),
|
||||
]
|
||||
self.reactor.lookups["target.com"] = "1.2.3.4"
|
||||
|
||||
test_d = self._make_get_request(b"matrix://testserv/foo/bar")
|
||||
|
||||
# Nothing happened yet
|
||||
self.assertNoResult(test_d)
|
||||
|
||||
self.mock_resolver.resolve_service.assert_called_once_with(
|
||||
b"_matrix._tcp.testserv"
|
||||
)
|
||||
|
||||
# We should see an attempt to connect to the first server
|
||||
clients = self.reactor.tcpClients
|
||||
self.assertEqual(len(clients), 1)
|
||||
(host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
|
||||
self.assertEqual(host, "1.2.3.4")
|
||||
self.assertEqual(port, 8443)
|
||||
|
||||
# Fonx the connection
|
||||
client_factory.clientConnectionFailed(None, Exception("nope"))
|
||||
|
||||
# There's a 300ms delay in HostnameEndpoint
|
||||
self.reactor.pump((0.4,))
|
||||
|
||||
# Hasn't failed yet
|
||||
self.assertNoResult(test_d)
|
||||
|
||||
# We should now see an attempt to connect to the second server
|
||||
clients = self.reactor.tcpClients
|
||||
self.assertEqual(len(clients), 1)
|
||||
(host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
|
||||
self.assertEqual(host, "1.2.3.4")
|
||||
self.assertEqual(port, 8444)
|
||||
|
||||
# make a test server, and wire up the client
|
||||
http_server = self._make_connection(client_factory, expected_sni=b"testserv")
|
||||
|
||||
self.assertEqual(len(http_server.requests), 1)
|
||||
request = http_server.requests[0]
|
||||
self.assertEqual(request.method, b"GET")
|
||||
self.assertEqual(request.path, b"/foo/bar")
|
||||
self.assertEqual(request.requestHeaders.getRawHeaders(b"host"), [b"testserv"])
|
||||
|
||||
# finish the request
|
||||
request.finish()
|
||||
self.reactor.pump((0.1,))
|
||||
self.successResultOf(test_d)
|
||||
|
||||
|
||||
class TestCachePeriodFromHeaders(unittest.TestCase):
|
||||
def test_cache_control(self):
|
||||
# uppercase
|
||||
self.assertEqual(
|
||||
|
||||
@@ -83,8 +83,10 @@ class SrvResolverTestCase(unittest.TestCase):
|
||||
|
||||
service_name = b"test_service.example.com"
|
||||
|
||||
entry = Mock(spec_set=["expires"])
|
||||
entry = Mock(spec_set=["expires", "priority", "weight"])
|
||||
entry.expires = 0
|
||||
entry.priority = 0
|
||||
entry.weight = 0
|
||||
|
||||
cache = {service_name: [entry]}
|
||||
resolver = SrvResolver(dns_client=dns_client_mock, cache=cache)
|
||||
@@ -105,8 +107,10 @@ class SrvResolverTestCase(unittest.TestCase):
|
||||
|
||||
service_name = b"test_service.example.com"
|
||||
|
||||
entry = Mock(spec_set=["expires"])
|
||||
entry = Mock(spec_set=["expires", "priority", "weight"])
|
||||
entry.expires = 999999999
|
||||
entry.priority = 0
|
||||
entry.weight = 0
|
||||
|
||||
cache = {service_name: [entry]}
|
||||
resolver = SrvResolver(
|
||||
|
||||
@@ -0,0 +1,197 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import os
|
||||
import os.path
|
||||
import shutil
|
||||
import sys
|
||||
import textwrap
|
||||
|
||||
from twisted.logger import Logger, eventAsText, eventsFromJSONLogFile
|
||||
|
||||
from synapse.config.logger import setup_logging
|
||||
from synapse.logging._structured import setup_structured_logging
|
||||
from synapse.logging.context import LoggingContext
|
||||
|
||||
from tests.unittest import DEBUG, HomeserverTestCase
|
||||
|
||||
|
||||
class FakeBeginner(object):
|
||||
def beginLoggingTo(self, observers, **kwargs):
|
||||
self.observers = observers
|
||||
|
||||
|
||||
class StructuredLoggingTestCase(HomeserverTestCase):
|
||||
"""
|
||||
Tests for Synapse's structured logging support.
|
||||
"""
|
||||
|
||||
def test_output_to_json_round_trip(self):
|
||||
"""
|
||||
Synapse logs can be outputted to JSON and then read back again.
|
||||
"""
|
||||
temp_dir = self.mktemp()
|
||||
os.mkdir(temp_dir)
|
||||
self.addCleanup(shutil.rmtree, temp_dir)
|
||||
|
||||
json_log_file = os.path.abspath(os.path.join(temp_dir, "out.json"))
|
||||
|
||||
log_config = {
|
||||
"drains": {"jsonfile": {"type": "file_json", "location": json_log_file}}
|
||||
}
|
||||
|
||||
# Begin the logger with our config
|
||||
beginner = FakeBeginner()
|
||||
setup_structured_logging(
|
||||
self.hs, self.hs.config, log_config, logBeginner=beginner
|
||||
)
|
||||
|
||||
# Make a logger and send an event
|
||||
logger = Logger(
|
||||
namespace="tests.logging.test_structured", observer=beginner.observers[0]
|
||||
)
|
||||
logger.info("Hello there, {name}!", name="wally")
|
||||
|
||||
# Read the log file and check it has the event we sent
|
||||
with open(json_log_file, "r") as f:
|
||||
logged_events = list(eventsFromJSONLogFile(f))
|
||||
self.assertEqual(len(logged_events), 1)
|
||||
|
||||
# The event pulled from the file should render fine
|
||||
self.assertEqual(
|
||||
eventAsText(logged_events[0], includeTimestamp=False),
|
||||
"[tests.logging.test_structured#info] Hello there, wally!",
|
||||
)
|
||||
|
||||
def test_output_to_text(self):
|
||||
"""
|
||||
Synapse logs can be outputted to text.
|
||||
"""
|
||||
temp_dir = self.mktemp()
|
||||
os.mkdir(temp_dir)
|
||||
self.addCleanup(shutil.rmtree, temp_dir)
|
||||
|
||||
log_file = os.path.abspath(os.path.join(temp_dir, "out.log"))
|
||||
|
||||
log_config = {"drains": {"file": {"type": "file", "location": log_file}}}
|
||||
|
||||
# Begin the logger with our config
|
||||
beginner = FakeBeginner()
|
||||
setup_structured_logging(
|
||||
self.hs, self.hs.config, log_config, logBeginner=beginner
|
||||
)
|
||||
|
||||
# Make a logger and send an event
|
||||
logger = Logger(
|
||||
namespace="tests.logging.test_structured", observer=beginner.observers[0]
|
||||
)
|
||||
logger.info("Hello there, {name}!", name="wally")
|
||||
|
||||
# Read the log file and check it has the event we sent
|
||||
with open(log_file, "r") as f:
|
||||
logged_events = f.read().strip().split("\n")
|
||||
self.assertEqual(len(logged_events), 1)
|
||||
|
||||
# The event pulled from the file should render fine
|
||||
self.assertTrue(
|
||||
logged_events[0].endswith(
|
||||
" - tests.logging.test_structured - INFO - None - Hello there, wally!"
|
||||
)
|
||||
)
|
||||
|
||||
def test_collects_logcontext(self):
|
||||
"""
|
||||
Test that log outputs have the attached logging context.
|
||||
"""
|
||||
log_config = {"drains": {}}
|
||||
|
||||
# Begin the logger with our config
|
||||
beginner = FakeBeginner()
|
||||
publisher = setup_structured_logging(
|
||||
self.hs, self.hs.config, log_config, logBeginner=beginner
|
||||
)
|
||||
|
||||
logs = []
|
||||
|
||||
publisher.addObserver(logs.append)
|
||||
|
||||
# Make a logger and send an event
|
||||
logger = Logger(
|
||||
namespace="tests.logging.test_structured", observer=beginner.observers[0]
|
||||
)
|
||||
|
||||
with LoggingContext("testcontext", request="somereq"):
|
||||
logger.info("Hello there, {name}!", name="steve")
|
||||
|
||||
self.assertEqual(len(logs), 1)
|
||||
self.assertEqual(logs[0]["request"], "somereq")
|
||||
|
||||
|
||||
class StructuredLoggingConfigurationFileTestCase(HomeserverTestCase):
|
||||
def make_homeserver(self, reactor, clock):
|
||||
|
||||
tempdir = self.mktemp()
|
||||
os.mkdir(tempdir)
|
||||
log_config_file = os.path.abspath(os.path.join(tempdir, "log.config.yaml"))
|
||||
self.homeserver_log = os.path.abspath(os.path.join(tempdir, "homeserver.log"))
|
||||
|
||||
config = self.default_config()
|
||||
config["log_config"] = log_config_file
|
||||
|
||||
with open(log_config_file, "w") as f:
|
||||
f.write(
|
||||
textwrap.dedent(
|
||||
"""\
|
||||
structured: true
|
||||
|
||||
drains:
|
||||
file:
|
||||
type: file_json
|
||||
location: %s
|
||||
"""
|
||||
% (self.homeserver_log,)
|
||||
)
|
||||
)
|
||||
|
||||
self.addCleanup(self._sys_cleanup)
|
||||
|
||||
return self.setup_test_homeserver(config=config)
|
||||
|
||||
def _sys_cleanup(self):
|
||||
sys.stdout = sys.__stdout__
|
||||
sys.stderr = sys.__stderr__
|
||||
|
||||
# Do not remove! We need the logging system to be set other than WARNING.
|
||||
@DEBUG
|
||||
def test_log_output(self):
|
||||
"""
|
||||
When a structured logging config is given, Synapse will use it.
|
||||
"""
|
||||
setup_logging(self.hs, self.hs.config)
|
||||
|
||||
# Make a logger and send an event
|
||||
logger = Logger(namespace="tests.logging.test_structured")
|
||||
|
||||
with LoggingContext("testcontext", request="somereq"):
|
||||
logger.info("Hello there, {name}!", name="steve")
|
||||
|
||||
with open(self.homeserver_log, "r") as f:
|
||||
logged_events = [
|
||||
eventAsText(x, includeTimestamp=False) for x in eventsFromJSONLogFile(f)
|
||||
]
|
||||
|
||||
logs = "\n".join(logged_events)
|
||||
self.assertTrue("***** STARTING SERVER *****" in logs)
|
||||
self.assertTrue("Hello there, steve!" in logs)
|
||||
@@ -0,0 +1,234 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import json
|
||||
from collections import Counter
|
||||
|
||||
from twisted.logger import Logger
|
||||
|
||||
from synapse.logging._structured import setup_structured_logging
|
||||
|
||||
from tests.server import connect_client
|
||||
from tests.unittest import HomeserverTestCase
|
||||
|
||||
from .test_structured import FakeBeginner
|
||||
|
||||
|
||||
class TerseJSONTCPTestCase(HomeserverTestCase):
|
||||
def test_log_output(self):
|
||||
"""
|
||||
The Terse JSON outputter delivers simplified structured logs over TCP.
|
||||
"""
|
||||
log_config = {
|
||||
"drains": {
|
||||
"tersejson": {
|
||||
"type": "network_json_terse",
|
||||
"host": "127.0.0.1",
|
||||
"port": 8000,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
# Begin the logger with our config
|
||||
beginner = FakeBeginner()
|
||||
setup_structured_logging(
|
||||
self.hs, self.hs.config, log_config, logBeginner=beginner
|
||||
)
|
||||
|
||||
logger = Logger(
|
||||
namespace="tests.logging.test_terse_json", observer=beginner.observers[0]
|
||||
)
|
||||
logger.info("Hello there, {name}!", name="wally")
|
||||
|
||||
# Trigger the connection
|
||||
self.pump()
|
||||
|
||||
_, server = connect_client(self.reactor, 0)
|
||||
|
||||
# Trigger data being sent
|
||||
self.pump()
|
||||
|
||||
# One log message, with a single trailing newline
|
||||
logs = server.data.decode("utf8").splitlines()
|
||||
self.assertEqual(len(logs), 1)
|
||||
self.assertEqual(server.data.count(b"\n"), 1)
|
||||
|
||||
log = json.loads(logs[0])
|
||||
|
||||
# The terse logger should give us these keys.
|
||||
expected_log_keys = [
|
||||
"log",
|
||||
"time",
|
||||
"level",
|
||||
"log_namespace",
|
||||
"request",
|
||||
"scope",
|
||||
"server_name",
|
||||
"name",
|
||||
]
|
||||
self.assertEqual(set(log.keys()), set(expected_log_keys))
|
||||
|
||||
# It contains the data we expect.
|
||||
self.assertEqual(log["name"], "wally")
|
||||
|
||||
def test_log_backpressure_debug(self):
|
||||
"""
|
||||
When backpressure is hit, DEBUG logs will be shed.
|
||||
"""
|
||||
log_config = {
|
||||
"loggers": {"synapse": {"level": "DEBUG"}},
|
||||
"drains": {
|
||||
"tersejson": {
|
||||
"type": "network_json_terse",
|
||||
"host": "127.0.0.1",
|
||||
"port": 8000,
|
||||
"maximum_buffer": 10,
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
# Begin the logger with our config
|
||||
beginner = FakeBeginner()
|
||||
setup_structured_logging(
|
||||
self.hs,
|
||||
self.hs.config,
|
||||
log_config,
|
||||
logBeginner=beginner,
|
||||
redirect_stdlib_logging=False,
|
||||
)
|
||||
|
||||
logger = Logger(
|
||||
namespace="synapse.logging.test_terse_json", observer=beginner.observers[0]
|
||||
)
|
||||
|
||||
# Send some debug messages
|
||||
for i in range(0, 3):
|
||||
logger.debug("debug %s" % (i,))
|
||||
|
||||
# Send a bunch of useful messages
|
||||
for i in range(0, 7):
|
||||
logger.info("test message %s" % (i,))
|
||||
|
||||
# The last debug message pushes it past the maximum buffer
|
||||
logger.debug("too much debug")
|
||||
|
||||
# Allow the reconnection
|
||||
_, server = connect_client(self.reactor, 0)
|
||||
self.pump()
|
||||
|
||||
# Only the 7 infos made it through, the debugs were elided
|
||||
logs = server.data.splitlines()
|
||||
self.assertEqual(len(logs), 7)
|
||||
|
||||
def test_log_backpressure_info(self):
|
||||
"""
|
||||
When backpressure is hit, DEBUG and INFO logs will be shed.
|
||||
"""
|
||||
log_config = {
|
||||
"loggers": {"synapse": {"level": "DEBUG"}},
|
||||
"drains": {
|
||||
"tersejson": {
|
||||
"type": "network_json_terse",
|
||||
"host": "127.0.0.1",
|
||||
"port": 8000,
|
||||
"maximum_buffer": 10,
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
# Begin the logger with our config
|
||||
beginner = FakeBeginner()
|
||||
setup_structured_logging(
|
||||
self.hs,
|
||||
self.hs.config,
|
||||
log_config,
|
||||
logBeginner=beginner,
|
||||
redirect_stdlib_logging=False,
|
||||
)
|
||||
|
||||
logger = Logger(
|
||||
namespace="synapse.logging.test_terse_json", observer=beginner.observers[0]
|
||||
)
|
||||
|
||||
# Send some debug messages
|
||||
for i in range(0, 3):
|
||||
logger.debug("debug %s" % (i,))
|
||||
|
||||
# Send a bunch of useful messages
|
||||
for i in range(0, 10):
|
||||
logger.warn("test warn %s" % (i,))
|
||||
|
||||
# Send a bunch of info messages
|
||||
for i in range(0, 3):
|
||||
logger.info("test message %s" % (i,))
|
||||
|
||||
# The last debug message pushes it past the maximum buffer
|
||||
logger.debug("too much debug")
|
||||
|
||||
# Allow the reconnection
|
||||
client, server = connect_client(self.reactor, 0)
|
||||
self.pump()
|
||||
|
||||
# The 10 warnings made it through, the debugs and infos were elided
|
||||
logs = list(map(json.loads, server.data.decode("utf8").splitlines()))
|
||||
self.assertEqual(len(logs), 10)
|
||||
|
||||
self.assertEqual(Counter([x["level"] for x in logs]), {"WARN": 10})
|
||||
|
||||
def test_log_backpressure_cut_middle(self):
|
||||
"""
|
||||
When backpressure is hit, and no more DEBUG and INFOs can be culled,
|
||||
it will cut the middle messages out.
|
||||
"""
|
||||
log_config = {
|
||||
"loggers": {"synapse": {"level": "DEBUG"}},
|
||||
"drains": {
|
||||
"tersejson": {
|
||||
"type": "network_json_terse",
|
||||
"host": "127.0.0.1",
|
||||
"port": 8000,
|
||||
"maximum_buffer": 10,
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
# Begin the logger with our config
|
||||
beginner = FakeBeginner()
|
||||
setup_structured_logging(
|
||||
self.hs,
|
||||
self.hs.config,
|
||||
log_config,
|
||||
logBeginner=beginner,
|
||||
redirect_stdlib_logging=False,
|
||||
)
|
||||
|
||||
logger = Logger(
|
||||
namespace="synapse.logging.test_terse_json", observer=beginner.observers[0]
|
||||
)
|
||||
|
||||
# Send a bunch of useful messages
|
||||
for i in range(0, 20):
|
||||
logger.warn("test warn", num=i)
|
||||
|
||||
# Allow the reconnection
|
||||
client, server = connect_client(self.reactor, 0)
|
||||
self.pump()
|
||||
|
||||
# The first five and last five warnings made it through, the debugs and
|
||||
# infos were elided
|
||||
logs = list(map(json.loads, server.data.decode("utf8").splitlines()))
|
||||
self.assertEqual(len(logs), 10)
|
||||
self.assertEqual(Counter([x["level"] for x in logs]), {"WARN": 10})
|
||||
self.assertEqual([0, 1, 2, 3, 4, 15, 16, 17, 18, 19], [x["num"] for x in logs])
|
||||
@@ -11,9 +11,13 @@ from twisted.internet import address, threads, udp
|
||||
from twisted.internet._resolver import SimpleResolverComplexifier
|
||||
from twisted.internet.defer import Deferred, fail, succeed
|
||||
from twisted.internet.error import DNSLookupError
|
||||
from twisted.internet.interfaces import IReactorPluggableNameResolver, IResolverSimple
|
||||
from twisted.internet.interfaces import (
|
||||
IReactorPluggableNameResolver,
|
||||
IReactorTCP,
|
||||
IResolverSimple,
|
||||
)
|
||||
from twisted.python.failure import Failure
|
||||
from twisted.test.proto_helpers import MemoryReactorClock
|
||||
from twisted.test.proto_helpers import AccumulatingProtocol, MemoryReactorClock
|
||||
from twisted.web.http import unquote
|
||||
from twisted.web.http_headers import Headers
|
||||
|
||||
@@ -465,3 +469,22 @@ class FakeTransport(object):
|
||||
self.buffer = self.buffer[len(to_write) :]
|
||||
if self.buffer and self.autoflush:
|
||||
self._reactor.callLater(0.0, self.flush)
|
||||
|
||||
|
||||
def connect_client(reactor: IReactorTCP, client_id: int) -> AccumulatingProtocol:
|
||||
"""
|
||||
Connect a client to a fake TCP transport.
|
||||
|
||||
Args:
|
||||
reactor: The test reactor holding the pending TCP client connections.
client_id: The index into reactor.tcpClients of the connection to hook up.
|
||||
"""
|
||||
factory = reactor.tcpClients[client_id][2]
|
||||
client = factory.buildProtocol(None)
|
||||
server = AccumulatingProtocol()
|
||||
server.makeConnection(FakeTransport(client, reactor))
|
||||
client.makeConnection(FakeTransport(server, reactor))
|
||||
|
||||
reactor.tcpClients.pop(client_id)
|
||||
|
||||
return client, server
|
||||
|
||||
@@ -49,6 +49,7 @@ class RegistrationStoreTestCase(unittest.TestCase):
|
||||
"consent_server_notice_sent": None,
|
||||
"appservice_id": None,
|
||||
"creation_ts": 1000,
|
||||
"user_type": None,
|
||||
},
|
||||
(yield self.store.get_user_by_id(self.user_id)),
|
||||
)
|
||||
|
||||
@@ -74,7 +74,6 @@ class FilterEventsForServerTestCase(tests.unittest.TestCase):
|
||||
self.assertEqual(events_to_filter[i].event_id, filtered[i].event_id)
|
||||
self.assertEqual(filtered[i].content["a"], "b")
|
||||
|
||||
@tests.unittest.DEBUG
|
||||
@defer.inlineCallbacks
|
||||
def test_erased_user(self):
|
||||
# 4 message events, from erased and unerased users, with a membership
|
||||
|
||||
@@ -7,6 +7,7 @@ deps =
|
||||
python-subunit
|
||||
junitxml
|
||||
coverage
|
||||
coverage-enable-subprocess
|
||||
parameterized
|
||||
|
||||
# cyptography 2.2 requires setuptools >= 18.5
|
||||
@@ -43,13 +44,13 @@ whitelist_externals =
|
||||
setenv =
|
||||
{[base]setenv}
|
||||
postgres: SYNAPSE_POSTGRES = 1
|
||||
TOP={toxinidir}
|
||||
|
||||
passenv = *
|
||||
|
||||
commands =
|
||||
/usr/bin/find "{toxinidir}" -name '*.pyc' -delete
|
||||
# Add this so that coverage will run on subprocesses
|
||||
sh -c 'echo "import coverage; coverage.process_startup()" > {envsitepackagesdir}/../sitecustomize.py'
|
||||
{envbindir}/coverage run "{envbindir}/trial" {env:TRIAL_FLAGS:} {posargs:tests} {env:TOXSUFFIX:}
|
||||
|
||||
# As of twisted 16.4, trial tries to import the tests as a package (previously
|
||||
@@ -75,8 +76,6 @@ commands =
|
||||
# )
|
||||
usedevelop=true
|
||||
|
||||
|
||||
|
||||
# A test suite for the oldest supported versions of Python libraries, to catch
|
||||
# any uses of APIs not available in them.
|
||||
[testenv:py35-old]
|
||||
@@ -88,6 +87,7 @@ deps =
|
||||
mock
|
||||
lxml
|
||||
coverage
|
||||
coverage-enable-subprocess
|
||||
|
||||
commands =
|
||||
/usr/bin/find "{toxinidir}" -name '*.pyc' -delete
|
||||
@@ -96,15 +96,11 @@ commands =
|
||||
# OpenSSL 1.1 compiled cryptography (as older ones don't compile on Travis).
|
||||
/bin/sh -c 'python -m synapse.python_dependencies | sed -e "s/>=/==/g" -e "s/psycopg2==2.6//" -e "s/pyopenssl==16.0.0/pyopenssl==17.0.0/" | xargs -d"\n" pip install'
|
||||
|
||||
# Add this so that coverage will run on subprocesses
|
||||
/bin/sh -c 'echo "import coverage; coverage.process_startup()" > {envsitepackagesdir}/../sitecustomize.py'
|
||||
|
||||
# Install Synapse itself. This won't update any libraries.
|
||||
pip install -e .
|
||||
|
||||
{envbindir}/coverage run "{envbindir}/trial" {env:TRIAL_FLAGS:} {posargs:tests} {env:TOXSUFFIX:}
|
||||
|
||||
|
||||
[testenv:packaging]
|
||||
skip_install=True
|
||||
deps =
|
||||
@@ -137,12 +133,22 @@ basepython = python3.6
|
||||
[testenv:check-sampleconfig]
|
||||
commands = {toxinidir}/scripts-dev/generate_sample_config --check
|
||||
|
||||
[testenv:codecov]
|
||||
[testenv:combine]
|
||||
skip_install = True
|
||||
deps =
|
||||
coverage
|
||||
codecov
|
||||
commands =
|
||||
whitelist_externals =
|
||||
bash
|
||||
commands=
|
||||
coverage combine
|
||||
coverage xml
|
||||
codecov -X gcov
|
||||
coverage report
|
||||
|
||||
[testenv:mypy]
|
||||
basepython = python3.5
|
||||
deps =
|
||||
{[base]deps}
|
||||
mypy
|
||||
extras = all
|
||||
commands = mypy --ignore-missing-imports \
|
||||
synapse/logging/_structured.py \
|
||||
synapse/logging/_terse_json.py
|
||||
|
||||