1
0

Compare commits

..

5 Commits

Author SHA1 Message Date
Olivier Wilkinson (reivilibre)
6e827507f7 Return empty object on success rather than null.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-20 10:29:36 +01:00
Olivier Wilkinson (reivilibre)
0e99412f4c Add admin API docs for setting admin bits on users
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-20 10:24:38 +01:00
Olivier Wilkinson (reivilibre)
7fd0c90234 Newsfile
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-19 14:44:53 +01:00
Olivier Wilkinson (reivilibre)
ebd2cd84d5 Add admin API for setting the admin bit of a user. 2019-08-19 14:42:55 +01:00
Olivier Wilkinson (reivilibre)
c497e13734 Introduce set_server_admin as dual to is_server_admin. 2019-08-19 14:41:07 +01:00
238 changed files with 2546 additions and 5886 deletions

View File

@@ -6,7 +6,6 @@ services:
image: postgres:9.5
environment:
POSTGRES_PASSWORD: postgres
command: -c fsync=off
testenv:
image: python:3.5
@@ -17,6 +16,6 @@ services:
SYNAPSE_POSTGRES_HOST: postgres
SYNAPSE_POSTGRES_USER: postgres
SYNAPSE_POSTGRES_PASSWORD: postgres
working_dir: /src
working_dir: /app
volumes:
- ..:/src
- ..:/app

View File

@@ -6,7 +6,6 @@ services:
image: postgres:11
environment:
POSTGRES_PASSWORD: postgres
command: -c fsync=off
testenv:
image: python:3.7
@@ -17,6 +16,6 @@ services:
SYNAPSE_POSTGRES_HOST: postgres
SYNAPSE_POSTGRES_USER: postgres
SYNAPSE_POSTGRES_PASSWORD: postgres
working_dir: /src
working_dir: /app
volumes:
- ..:/src
- ..:/app

View File

@@ -6,7 +6,6 @@ services:
image: postgres:9.5
environment:
POSTGRES_PASSWORD: postgres
command: -c fsync=off
testenv:
image: python:3.7
@@ -17,6 +16,6 @@ services:
SYNAPSE_POSTGRES_HOST: postgres
SYNAPSE_POSTGRES_USER: postgres
SYNAPSE_POSTGRES_PASSWORD: postgres
working_dir: /src
working_dir: /app
volumes:
- ..:/src
- ..:/app

View File

@@ -1,18 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from tap.parser import Parser
from tap.line import Result, Unknown, Diagnostic

View File

@@ -27,7 +27,7 @@ git config --global user.name "A robot"
# Fetch and merge. If it doesn't work, it will raise due to set -e.
git fetch -u origin $GITBASE
git merge --no-edit --no-commit origin/$GITBASE
git merge --no-edit origin/$GITBASE
# Show what we are after.
git --no-pager show -s

240
.buildkite/pipeline.yml Normal file
View File

@@ -0,0 +1,240 @@
env:
CODECOV_TOKEN: "2dd7eb9b-0eda-45fe-a47c-9b5ac040045f"
steps:
- command:
- "python -m pip install tox"
- "tox -e check_codestyle"
label: "\U0001F9F9 Check Style"
plugins:
- docker#v3.0.1:
image: "python:3.6"
- command:
- "python -m pip install tox"
- "tox -e packaging"
label: "\U0001F9F9 packaging"
plugins:
- docker#v3.0.1:
image: "python:3.6"
- command:
- "python -m pip install tox"
- "tox -e check_isort"
label: "\U0001F9F9 isort"
plugins:
- docker#v3.0.1:
image: "python:3.6"
- command:
- "python -m pip install tox"
- "scripts-dev/check-newsfragment"
label: ":newspaper: Newsfile"
branches: "!master !develop !release-*"
plugins:
- docker#v3.0.1:
image: "python:3.6"
propagate-environment: true
- command:
- "python -m pip install tox"
- "tox -e check-sampleconfig"
label: "\U0001F9F9 check-sample-config"
plugins:
- docker#v3.0.1:
image: "python:3.6"
- wait
- command:
- "apt-get update && apt-get install -y python3.5 python3.5-dev python3-pip libxml2-dev libxslt-dev zlib1g-dev"
- "python3.5 -m pip install tox"
- "tox -e py35-old,codecov"
label: ":python: 3.5 / SQLite / Old Deps"
env:
TRIAL_FLAGS: "-j 2"
plugins:
- docker#v3.0.1:
image: "ubuntu:xenial" # We use xenail to get an old sqlite and python
propagate-environment: true
retry:
automatic:
- exit_status: -1
limit: 2
- exit_status: 2
limit: 2
- command:
- "python -m pip install tox"
- "tox -e py35,codecov"
label: ":python: 3.5 / SQLite"
env:
TRIAL_FLAGS: "-j 2"
plugins:
- docker#v3.0.1:
image: "python:3.5"
propagate-environment: true
retry:
automatic:
- exit_status: -1
limit: 2
- exit_status: 2
limit: 2
- command:
- "python -m pip install tox"
- "tox -e py36,codecov"
label: ":python: 3.6 / SQLite"
env:
TRIAL_FLAGS: "-j 2"
plugins:
- docker#v3.0.1:
image: "python:3.6"
propagate-environment: true
retry:
automatic:
- exit_status: -1
limit: 2
- exit_status: 2
limit: 2
- command:
- "python -m pip install tox"
- "tox -e py37,codecov"
label: ":python: 3.7 / SQLite"
env:
TRIAL_FLAGS: "-j 2"
plugins:
- docker#v3.0.1:
image: "python:3.7"
propagate-environment: true
retry:
automatic:
- exit_status: -1
limit: 2
- exit_status: 2
limit: 2
- label: ":python: 3.5 / :postgres: 9.5"
agents:
queue: "medium"
env:
TRIAL_FLAGS: "-j 8"
command:
- "bash -c 'python -m pip install tox && python -m tox -e py35-postgres,codecov'"
plugins:
- docker-compose#v2.1.0:
run: testenv
config:
- .buildkite/docker-compose.py35.pg95.yaml
retry:
automatic:
- exit_status: -1
limit: 2
- exit_status: 2
limit: 2
- label: ":python: 3.7 / :postgres: 9.5"
agents:
queue: "medium"
env:
TRIAL_FLAGS: "-j 8"
command:
- "bash -c 'python -m pip install tox && python -m tox -e py37-postgres,codecov'"
plugins:
- docker-compose#v2.1.0:
run: testenv
config:
- .buildkite/docker-compose.py37.pg95.yaml
retry:
automatic:
- exit_status: -1
limit: 2
- exit_status: 2
limit: 2
- label: ":python: 3.7 / :postgres: 11"
agents:
queue: "medium"
env:
TRIAL_FLAGS: "-j 8"
command:
- "bash -c 'python -m pip install tox && python -m tox -e py37-postgres,codecov'"
plugins:
- docker-compose#v2.1.0:
run: testenv
config:
- .buildkite/docker-compose.py37.pg11.yaml
retry:
automatic:
- exit_status: -1
limit: 2
- exit_status: 2
limit: 2
- label: "SyTest - :python: 3.5 / SQLite / Monolith"
agents:
queue: "medium"
command:
- "bash .buildkite/merge_base_branch.sh"
- "bash /synapse_sytest.sh"
plugins:
- docker#v3.0.1:
image: "matrixdotorg/sytest-synapse:py35"
propagate-environment: true
always-pull: true
workdir: "/src"
retry:
automatic:
- exit_status: -1
limit: 2
- exit_status: 2
limit: 2
- label: "SyTest - :python: 3.5 / :postgres: 9.6 / Monolith"
agents:
queue: "medium"
env:
POSTGRES: "1"
command:
- "bash .buildkite/merge_base_branch.sh"
- "bash /synapse_sytest.sh"
plugins:
- docker#v3.0.1:
image: "matrixdotorg/sytest-synapse:py35"
propagate-environment: true
always-pull: true
workdir: "/src"
retry:
automatic:
- exit_status: -1
limit: 2
- exit_status: 2
limit: 2
- label: "SyTest - :python: 3.5 / :postgres: 9.6 / Workers"
agents:
queue: "medium"
env:
POSTGRES: "1"
WORKERS: "1"
BLACKLIST: "synapse-blacklist-with-workers"
command:
- "bash .buildkite/merge_base_branch.sh"
- "bash -c 'cat /src/sytest-blacklist /src/.buildkite/worker-blacklist > /src/synapse-blacklist-with-workers'"
- "bash /synapse_sytest.sh"
plugins:
- docker#v3.0.1:
image: "matrixdotorg/sytest-synapse:py35"
propagate-environment: true
always-pull: true
workdir: "/src"
retry:
automatic:
- exit_status: -1
limit: 2
- exit_status: 2
limit: 2

View File

@@ -1,8 +1,7 @@
[run]
branch = True
parallel = True
include=$TOP/synapse/*
data_file = $TOP/.coverage
include = synapse/*
[report]
precision = 2

5
.gitignore vendored
View File

@@ -20,7 +20,6 @@ _trial_temp*/
/*.signing.key
/env/
/homeserver*.yaml
/logs
/media_store/
/uploads
@@ -30,9 +29,8 @@ _trial_temp*/
/.vscode/
# build products
!/.coveragerc
/.coverage*
/.mypy_cache/
!/.coveragerc
/.tox
/build/
/coverage.*
@@ -40,3 +38,4 @@ _trial_temp*/
/docs/build/
/htmlcov
/pip-wheel-metadata/

View File

@@ -36,7 +36,7 @@ that your email address is probably `user@example.com` rather than
System requirements:
- POSIX-compliant system (tested on Linux & OS X)
- Python 3.5, 3.6, or 3.7
- Python 3.5, 3.6, 3.7, or 2.7
- At least 1GB of free RAM if you want to join large public rooms like #matrix:matrix.org
Synapse is written in Python but some of the libraries it uses are written in
@@ -421,7 +421,7 @@ If Synapse is not configured with an SMTP server, password reset via email will
The easiest way to create a new user is to do so from a client like [Riot](https://riot.im).
Alternatively you can do so from the command line if you have installed via pip.
Alternatively you can do so from the command line if you have installed via pip.
This can be done as follows:

View File

@@ -1 +0,0 @@
Lay the groundwork for structured logging output.

View File

@@ -1 +0,0 @@
Make Opentracing work in worker mode.

View File

@@ -1 +0,0 @@
Update opentracing docs to use the unified `trace` method.

View File

@@ -1 +0,0 @@
Add an admin API to purge old rooms from the database.

View File

@@ -1 +0,0 @@
Add retry to well-known lookups if we have recently seen a valid well-known record for the server.

View File

@@ -1 +0,0 @@
Pass opentracing contexts between servers when transmitting EDUs.

View File

@@ -1 +0,0 @@
Opentracing for device list updates.

View File

@@ -1 +0,0 @@
Opentracing for room and e2e keys.

View File

@@ -1 +0,0 @@
Add unstable support for MSC2197 (filtered search requests over federation), in order to allow upcoming room directory query performance improvements.

View File

@@ -1 +0,0 @@
Correctly retry all hosts returned from SRV when we fail to connect.

View File

@@ -1 +0,0 @@
Remove shared secret registration from client/r0/register endpoint. Contributed by Awesome Technologies Innovationslabor GmbH.

View File

@@ -1 +0,0 @@
Fix stack overflow when recovering an appservice which had an outage.

View File

@@ -1 +0,0 @@
Refactor the Appservice scheduler code.

View File

@@ -1 +0,0 @@
Compatibility with v2 Identity Service APIs other than /lookup.

View File

@@ -1 +0,0 @@
Drop some unused tables.

View File

@@ -1 +0,0 @@
Add missing index on users_in_public_rooms to improve the performance of directory queries.

View File

@@ -1 +0,0 @@
Add config option to sign remote key query responses with a separate key.

View File

@@ -1 +0,0 @@
Improve the logging when we have an error when fetching signing keys.

View File

@@ -1 +0,0 @@
Add support for config templating.

View File

@@ -1 +0,0 @@
Users with the type of "support" or "bot" are no longer required to consent.

View File

@@ -1 +0,0 @@
Let synctl accept a directory of config files.

View File

@@ -1 +0,0 @@
Increase max display name size to 256.

View File

@@ -1 +0,0 @@
Fix error message which referred to public_base_url instead of public_baseurl. Thanks to @aaronraimist for the fix!

View File

@@ -1 +0,0 @@
Add support for database engine-specific schema deltas, based on file extension.

View File

@@ -1 +0,0 @@
Add admin API endpoint for getting whether or not a user is a server administrator.

View File

@@ -1 +0,0 @@
Fix 404 for thumbnail download when `dynamic_thumbnails` is `false` and the thumbnail was dynamically generated. Fix reported by rkfg.

View File

@@ -1 +0,0 @@
Fix a cache-invalidation bug for worker-based deployments.

View File

@@ -1 +0,0 @@
Update Buildkite pipeline to use plugins instead of buildkite-agent commands.

View File

@@ -1 +0,0 @@
Add link in sample config to the logging config schema.

View File

@@ -1 +0,0 @@
Remove unnecessary parentheses in return statements.

View File

@@ -1 +0,0 @@
Remove unused jenkins/prepare_sytest.sh file.

View File

@@ -1 +0,0 @@
Move Buildkite pipeline config to the pipelines repo.

View File

@@ -1 +0,0 @@
Update INSTALL.md to say that Python 2 is no longer supported.

View File

@@ -1 +0,0 @@
Remove unnecessary return statements in the codebase which were the result of a regex run.

View File

@@ -1 +0,0 @@
Remove left-over methods from C/S registration API.

View File

@@ -1 +0,0 @@
Remove `bind_email` and `bind_msisdn` parameters from /register ala MSC2140.

View File

@@ -1 +0,0 @@
Fix admin API for listing media in a room not being available with an external media repo.

View File

@@ -1 +0,0 @@
Fix list media admin API always returning an error.

View File

@@ -1 +0,0 @@
Avoid changing UID/GID if they are already correct.

View File

@@ -1 +0,0 @@
Fix room and user stats tracking.

View File

@@ -1 +0,0 @@
Cleanup event auth type initialisation.

View File

@@ -1 +0,0 @@
Add POST /_matrix/client/r0/account/3pid/unbind endpoint from MSC2140 for unbinding a 3PID from an identity server without removing it from the homeserver user account.

View File

@@ -1 +0,0 @@
Include missing opentracing contexts in outbout replication requests.

View File

@@ -1 +0,0 @@
Add minimum opentracing for client servlets.

View File

@@ -1 +0,0 @@
Fix sending of EDUs when opentracing is enabled with an empty whitelist.

View File

@@ -1 +0,0 @@
Trace replication send times.

View File

@@ -1 +0,0 @@
Fix invalid references to None while opentracing if the log context slips.

View File

@@ -1 +0,0 @@
Support a way for clients to not send read receipts to other users/servers.

View File

@@ -268,7 +268,6 @@ class SynapseCmd(cmd.Cmd):
@defer.inlineCallbacks
def _do_emailrequest(self, args):
# TODO: Update to use v2 Identity Service API endpoint
url = (
self._identityServerUrl()
+ "/_matrix/identity/api/v1/validate/email/requestToken"
@@ -303,7 +302,6 @@ class SynapseCmd(cmd.Cmd):
@defer.inlineCallbacks
def _do_emailvalidate(self, args):
# TODO: Update to use v2 Identity Service API endpoint
url = (
self._identityServerUrl()
+ "/_matrix/identity/api/v1/validate/email/submitToken"
@@ -332,7 +330,6 @@ class SynapseCmd(cmd.Cmd):
@defer.inlineCallbacks
def _do_3pidbind(self, args):
# TODO: Update to use v2 Identity Service API endpoint
url = self._identityServerUrl() + "/_matrix/identity/api/v1/3pid/bind"
json_res = yield self.http_client.do_request(
@@ -401,7 +398,6 @@ class SynapseCmd(cmd.Cmd):
@defer.inlineCallbacks
def _do_invite(self, roomid, userstring):
if not userstring.startswith("@") and self._is_on("complete_usernames"):
# TODO: Update to use v2 Identity Service API endpoint
url = self._identityServerUrl() + "/_matrix/identity/api/v1/lookup"
json_res = yield self.http_client.do_request(
@@ -411,7 +407,6 @@ class SynapseCmd(cmd.Cmd):
mxid = None
if "mxid" in json_res and "signatures" in json_res:
# TODO: Update to use v2 Identity Service API endpoint
url = (
self._identityServerUrl()
+ "/_matrix/identity/api/v1/pubkey/ed25519"

View File

@@ -17,7 +17,7 @@ By default, the image expects a single volume, located at ``/data``, that will h
* the appservices configuration.
You are free to use separate volumes depending on storage endpoints at your
disposal. For instance, ``/data/media`` could be stored on a large but low
disposal. For instance, ``/data/media`` coud be stored on a large but low
performance hdd storage while other files could be stored on high performance
endpoints.
@@ -27,8 +27,8 @@ configuration file there. Multiple application services are supported.
## Generating a configuration file
The first step is to generate a valid config file. To do this, you can run the
image with the `generate` command line option.
The first step is to genearte a valid config file. To do this, you can run the
image with the `generate` commandline option.
You will need to specify values for the `SYNAPSE_SERVER_NAME` and
`SYNAPSE_REPORT_STATS` environment variable, and mount a docker volume to store
@@ -59,7 +59,7 @@ The following environment variables are supported in `generate` mode:
* `SYNAPSE_CONFIG_PATH`: path to the file to be generated. Defaults to
`<SYNAPSE_CONFIG_DIR>/homeserver.yaml`.
* `SYNAPSE_DATA_DIR`: where the generated config will put persistent data
such as the database and media store. Defaults to `/data`.
such as the datatase and media store. Defaults to `/data`.
* `UID`, `GID`: the user id and group id to use for creating the data
directories. Defaults to `991`, `991`.
@@ -115,7 +115,7 @@ not given).
To migrate from a dynamic configuration file to a static one, run the docker
container once with the environment variables set, and `migrate_config`
command line option. For example:
commandline option. For example:
```
docker run -it --rm \

View File

@@ -41,8 +41,8 @@ def generate_config_from_template(config_dir, config_path, environ, ownership):
config_dir (str): where to put generated config files
config_path (str): where to put the main config file
environ (dict): environment dictionary
ownership (str|None): "<user>:<group>" string which will be used to set
ownership of the generated configs. If None, ownership will not change.
ownership (str): "<user>:<group>" string which will be used to set
ownership of the generated configs
"""
for v in ("SYNAPSE_SERVER_NAME", "SYNAPSE_REPORT_STATS"):
if v not in environ:
@@ -105,24 +105,24 @@ def generate_config_from_template(config_dir, config_path, environ, ownership):
log("Generating log config file " + log_config_file)
convert("/conf/log.config", log_config_file, environ)
subprocess.check_output(["chown", "-R", ownership, "/data"])
# Hopefully we already have a signing key, but generate one if not.
args = [
"python",
"-m",
"synapse.app.homeserver",
"--config-path",
config_path,
# tell synapse to put generated keys in /data rather than /compiled
"--keys-directory",
config_dir,
"--generate-keys",
]
if ownership is not None:
subprocess.check_output(["chown", "-R", ownership, "/data"])
args = ["su-exec", ownership] + args
subprocess.check_output(args)
subprocess.check_output(
[
"su-exec",
ownership,
"python",
"-m",
"synapse.app.homeserver",
"--config-path",
config_path,
# tell synapse to put generated keys in /data rather than /compiled
"--keys-directory",
config_dir,
"--generate-keys",
]
)
def run_generate_config(environ, ownership):
@@ -130,7 +130,7 @@ def run_generate_config(environ, ownership):
Args:
environ (dict): env var dict
ownership (str|None): "userid:groupid" arg for chmod. If None, ownership will not change.
ownership (str): "userid:groupid" arg for chmod
Never returns.
"""
@@ -149,6 +149,9 @@ def run_generate_config(environ, ownership):
log("Creating log config %s" % (log_config_file,))
convert("/conf/log.config", log_config_file, environ)
# make sure that synapse has perms to write to the data dir.
subprocess.check_output(["chown", ownership, data_dir])
args = [
"python",
"-m",
@@ -167,33 +170,12 @@ def run_generate_config(environ, ownership):
"--open-private-ports",
]
# log("running %s" % (args, ))
if ownership is not None:
args = ["su-exec", ownership] + args
os.execv("/sbin/su-exec", args)
# make sure that synapse has perms to write to the data dir.
subprocess.check_output(["chown", ownership, data_dir])
else:
os.execv("/usr/local/bin/python", args)
os.execv("/usr/local/bin/python", args)
def main(args, environ):
mode = args[1] if len(args) > 1 else None
desired_uid = int(environ.get("UID", "991"))
desired_gid = int(environ.get("GID", "991"))
if (desired_uid == os.getuid()) and (desired_gid == os.getgid()):
ownership = None
else:
ownership = "{}:{}".format(desired_uid, desired_gid)
log(
"Container running as UserID %s:%s, ENV (or defaults) requests %s:%s"
% (os.getuid(), os.getgid(), desired_uid, desired_gid)
)
if ownership is None:
log("Will not perform chmod/su-exec as UserID already matches request")
ownership = "{}:{}".format(environ.get("UID", 991), environ.get("GID", 991))
# In generate mode, generate a configuration and missing keys, then exit
if mode == "generate":
@@ -245,12 +227,16 @@ def main(args, environ):
log("Starting synapse with config file " + config_path)
args = ["python", "-m", "synapse.app.homeserver", "--config-path", config_path]
if ownership is not None:
args = ["su-exec", ownership] + args
os.execv("/sbin/su-exec", args)
else:
os.execv("/usr/local/bin/python", args)
args = [
"su-exec",
ownership,
"python",
"-m",
"synapse.app.homeserver",
"--config-path",
config_path,
]
os.execv("/sbin/su-exec", args)
if __name__ == "__main__":

View File

@@ -1,18 +0,0 @@
Purge room API
==============
This API will remove all trace of a room from your database.
All local users must have left the room before it can be removed.
The API is:
```
POST /_synapse/admin/v1/purge_room
{
"room_id": "!room:id"
}
```
You must authenticate using the access token of an admin user.

View File

@@ -86,25 +86,6 @@ with a body of:
including an ``access_token`` of a server admin.
Get whether a user is a server administrator or not
===================================================
The api is::
GET /_synapse/admin/v1/users/<user_id>/admin
including an ``access_token`` of a server admin.
A response body like the following is returned:
.. code:: json
{
"admin": true
}
Change whether a user is a server administrator or not
======================================================

View File

@@ -32,7 +32,7 @@ It is up to the remote server to decide what it does with the spans
it creates. This is called the sampling policy and it can be configured
through Jaeger's settings.
For OpenTracing concepts see
For OpenTracing concepts see
https://opentracing.io/docs/overview/what-is-tracing/.
For more information about Jaeger's implementation see
@@ -79,7 +79,7 @@ Homeserver whitelisting
The homeserver whitelist is configured using regular expressions. A list of regular
expressions can be given and their union will be compared when propagating any
spans contexts to another homeserver.
spans contexts to another homeserver.
Though it's mostly safe to send and receive span contexts to and from
untrusted users since span contexts are usually opaque ids it can lead to
@@ -92,29 +92,6 @@ two problems, namely:
but that doesn't prevent another server sending you baggage which will be logged
to OpenTracing's logs.
==========
EDU FORMAT
==========
EDUs can contain tracing data in their content. This is not specced but
it could be of interest for other homeservers.
EDU format (if you're using jaeger):
.. code-block:: json
{
"edu_type": "type",
"content": {
"org.matrix.opentracing_context": {
"uber-trace-id": "fe57cf3e65083289"
}
}
}
Though you don't have to use jaeger you must inject the span context into
`org.matrix.opentracing_context` using the opentracing `Format.TEXT_MAP` inject method.
==================
Configuring Jaeger
==================

View File

@@ -1,62 +0,0 @@
Room and User Statistics
========================
Synapse maintains room and user statistics (as well as a cache of room state),
in various tables. These can be used for administrative purposes but are also
used when generating the public room directory.
# Synapse Developer Documentation
## High-Level Concepts
### Definitions
* **subject**: Something we are tracking stats about currently a room or user.
* **current row**: An entry for a subject in the appropriate current statistics
table. Each subject can have only one.
* **historical row**: An entry for a subject in the appropriate historical
statistics table. Each subject can have any number of these.
### Overview
Stats are maintained as time series. There are two kinds of column:
* absolute columns where the value is correct for the time given by `end_ts`
in the stats row. (Imagine a line graph for these values)
* They can also be thought of as 'gauges' in Prometheus, if you are familiar.
* per-slice columns where the value corresponds to how many of the occurrences
occurred within the time slice given by `(end_ts bucket_size)…end_ts`
or `start_ts…end_ts`. (Imagine a histogram for these values)
Stats are maintained in two tables (for each type): current and historical.
Current stats correspond to the present values. Each subject can only have one
entry.
Historical stats correspond to values in the past. Subjects may have multiple
entries.
## Concepts around the management of stats
### Current rows
Current rows contain the most up-to-date statistics for a room.
They only contain absolute columns
### Historical rows
Historical rows can always be considered to be valid for the time slice and
end time specified.
* historical rows will not exist for every time slice they will be omitted
if there were no changes. In this case, the following assumptions can be
made to interpolate/recreate missing rows:
- absolute fields have the same values as in the preceding row
- per-slice fields are zero (`0`)
* historical rows will not be retained forever rows older than a configurable
time will be purged.
#### Purge
The purging of historical rows is not yet implemented.

View File

@@ -205,9 +205,9 @@ listeners:
#
- port: 8008
tls: false
bind_addresses: ['::1', '127.0.0.1']
type: http
x_forwarded: true
bind_addresses: ['::1', '127.0.0.1']
resources:
- names: [client, federation]
@@ -392,10 +392,10 @@ listeners:
# permission to listen on port 80.
#
acme:
# ACME support is disabled by default. Set this to `true` and uncomment
# tls_certificate_path and tls_private_key_path above to enable it.
# ACME support is disabled by default. Uncomment the following line
# (and tls_certificate_path and tls_private_key_path above) to enable it.
#
enabled: False
#enabled: true
# Endpoint to use to request certificates. If you only want to test,
# use Let's Encrypt's staging url:
@@ -406,17 +406,17 @@ acme:
# Port number to listen on for the HTTP-01 challenge. Change this if
# you are forwarding connections through Apache/Nginx/etc.
#
port: 80
#port: 80
# Local addresses to listen on for incoming connections.
# Again, you may want to change this if you are forwarding connections
# through Apache/Nginx/etc.
#
bind_addresses: ['::', '0.0.0.0']
#bind_addresses: ['::', '0.0.0.0']
# How many days remaining on a certificate before it is renewed.
#
reprovision_threshold: 30
#reprovision_threshold: 30
# The domain that the certificate should be for. Normally this
# should be the same as your Matrix domain (i.e., 'server_name'), but,
@@ -430,7 +430,7 @@ acme:
#
# If not set, defaults to your 'server_name'.
#
domain: matrix.example.com
#domain: matrix.example.com
# file to use for the account key. This will be generated if it doesn't
# exist.
@@ -485,8 +485,7 @@ database:
## Logging ##
# A yaml python logging config file as described by
# https://docs.python.org/3.7/library/logging.config.html#configuration-dictionary-schema
# A yaml python logging config file
#
log_config: "CONFDIR/SERVERNAME.log.config"
@@ -1028,14 +1027,6 @@ signing_key_path: "CONFDIR/SERVERNAME.signing.key"
#
#trusted_key_servers:
# - server_name: "matrix.org"
#
# The signing keys to use when acting as a trusted key server. If not specified
# defaults to the server signing key.
#
# Can contain multiple keys, one per line.
#
#key_server_signing_keys_path: "key_server_signing_keys.key"
# Enable SAML2 for registration and login. Uses pysaml2.

View File

@@ -1,83 +0,0 @@
# Structured Logging
A structured logging system can be useful when your logs are destined for a machine to parse and process. By maintaining its machine-readable characteristics, it enables more efficient searching and aggregations when consumed by software such as the "ELK stack".
Synapse's structured logging system is configured via the file that Synapse's `log_config` config option points to. The file must be YAML and contain `structured: true`. It must contain a list of "drains" (places where logs go to).
A structured logging configuration looks similar to the following:
```yaml
structured: true
loggers:
synapse:
level: INFO
synapse.storage.SQL:
level: WARNING
drains:
console:
type: console
location: stdout
file:
type: file_json
location: homeserver.log
```
The above logging config will set Synapse as 'INFO' logging level by default, with the SQL layer at 'WARNING', and will have two logging drains (to the console and to a file, stored as JSON).
## Drain Types
Drain types can be specified by the `type` key.
### `console`
Outputs human-readable logs to the console.
Arguments:
- `location`: Either `stdout` or `stderr`.
### `console_json`
Outputs machine-readable JSON logs to the console.
Arguments:
- `location`: Either `stdout` or `stderr`.
### `console_json_terse`
Outputs machine-readable JSON logs to the console, separated by newlines. This
format is not designed to be read and re-formatted into human-readable text, but
is optimal for a logging aggregation system.
Arguments:
- `location`: Either `stdout` or `stderr`.
### `file`
Outputs human-readable logs to a file.
Arguments:
- `location`: An absolute path to the file to log to.
### `file_json`
Outputs machine-readable logs to a file.
Arguments:
- `location`: An absolute path to the file to log to.
### `network_json_terse`
Delivers machine-readable JSON logs to a log aggregator over TCP. This is
compatible with LogStash's TCP input with the codec set to `json_lines`.
Arguments:
- `host`: Hostname or IP address of the log aggregator.
- `port`: Numerical port to contact on the host.

16
jenkins/prepare_synapse.sh Executable file
View File

@@ -0,0 +1,16 @@
#! /bin/bash
set -eux
cd "`dirname $0`/.."
TOX_DIR=$WORKSPACE/.tox
mkdir -p $TOX_DIR
if ! [ $TOX_DIR -ef .tox ]; then
ln -s "$TOX_DIR" .tox
fi
# set up the virtualenv
tox -e py27 --notest -v

View File

@@ -276,25 +276,25 @@ class Auth(object):
self.get_access_token_from_request(request)
)
if app_service is None:
return None, None
return (None, None)
if app_service.ip_range_whitelist:
ip_address = IPAddress(self.hs.get_ip_from_request(request))
if ip_address not in app_service.ip_range_whitelist:
return None, None
return (None, None)
if b"user_id" not in request.args:
return app_service.sender, app_service
return (app_service.sender, app_service)
user_id = request.args[b"user_id"][0].decode("utf8")
if app_service.sender == user_id:
return app_service.sender, app_service
return (app_service.sender, app_service)
if not app_service.is_interested_in_user(user_id):
raise AuthError(403, "Application service cannot masquerade as this user.")
if not (yield self.store.get_user_by_id(user_id)):
raise AuthError(403, "Application service has not registered this user")
return user_id, app_service
return (user_id, app_service)
@defer.inlineCallbacks
def get_user_by_access_token(self, token, rights="access"):
@@ -694,7 +694,7 @@ class Auth(object):
# * The user is a guest user, and has joined the room
# else it will throw.
member_event = yield self.check_user_was_in_room(room_id, user_id)
return member_event.membership, member_event.event_id
return (member_event.membership, member_event.event_id)
except AuthError:
visibility = yield self.state.get_current_state(
room_id, EventTypes.RoomHistoryVisibility, ""
@@ -703,7 +703,8 @@ class Auth(object):
visibility
and visibility.content["history_visibility"] == "world_readable"
):
return Membership.JOIN, None
return (Membership.JOIN, None)
return
raise AuthError(
403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
)

View File

@@ -122,8 +122,7 @@ class UserTypes(object):
"""
SUPPORT = "support"
BOT = "bot"
ALL_USER_TYPES = (SUPPORT, BOT)
ALL_USER_TYPES = (SUPPORT,)
class RelationTypes(object):

View File

@@ -36,20 +36,18 @@ from synapse.util.versionstring import get_version_string
logger = logging.getLogger(__name__)
# list of tuples of function, args list, kwargs dict
_sighup_callbacks = []
def register_sighup(func, *args, **kwargs):
def register_sighup(func):
"""
Register a function to be called when a SIGHUP occurs.
Args:
func (function): Function to be called when sent a SIGHUP signal.
Will be called with a single default argument, the homeserver.
*args, **kwargs: args and kwargs to be passed to the target function.
Will be called with a single argument, the homeserver.
"""
_sighup_callbacks.append((func, args, kwargs))
_sighup_callbacks.append(func)
def start_worker_reactor(appname, config, run_command=reactor.run):
@@ -250,8 +248,8 @@ def start(hs, listeners=None):
# we're not using systemd.
sdnotify(b"RELOADING=1")
for i, args, kwargs in _sighup_callbacks:
i(hs, *args, **kwargs)
for i in _sighup_callbacks:
i(hs)
sdnotify(b"READY=1")

View File

@@ -227,6 +227,8 @@ def start(config_options):
config.start_pushers = False
config.send_federation = False
setup_logging(config, use_worker_options=True)
synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
@@ -239,8 +241,6 @@ def start(config_options):
database_engine=database_engine,
)
setup_logging(ss, config, use_worker_options=True)
ss.setup()
# We use task.react as the basic run command as it correctly handles tearing

View File

@@ -141,6 +141,8 @@ def start(config_options):
assert config.worker_app == "synapse.app.appservice"
setup_logging(config, use_worker_options=True)
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
@@ -165,8 +167,6 @@ def start(config_options):
database_engine=database_engine,
)
setup_logging(ps, config, use_worker_options=True)
ps.setup()
reactor.addSystemEventTrigger(
"before", "startup", _base.start, ps, config.worker_listeners

View File

@@ -179,6 +179,8 @@ def start(config_options):
assert config.worker_app == "synapse.app.client_reader"
setup_logging(config, use_worker_options=True)
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
@@ -191,8 +193,6 @@ def start(config_options):
database_engine=database_engine,
)
setup_logging(ss, config, use_worker_options=True)
ss.setup()
reactor.addSystemEventTrigger(
"before", "startup", _base.start, ss, config.worker_listeners

View File

@@ -175,6 +175,8 @@ def start(config_options):
assert config.worker_replication_http_port is not None
setup_logging(config, use_worker_options=True)
# This should only be done on the user directory worker or the master
config.update_user_directory = False
@@ -190,8 +192,6 @@ def start(config_options):
database_engine=database_engine,
)
setup_logging(ss, config, use_worker_options=True)
ss.setup()
reactor.addSystemEventTrigger(
"before", "startup", _base.start, ss, config.worker_listeners

View File

@@ -160,6 +160,8 @@ def start(config_options):
assert config.worker_app == "synapse.app.federation_reader"
setup_logging(config, use_worker_options=True)
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
@@ -172,8 +174,6 @@ def start(config_options):
database_engine=database_engine,
)
setup_logging(ss, config, use_worker_options=True)
ss.setup()
reactor.addSystemEventTrigger(
"before", "startup", _base.start, ss, config.worker_listeners

View File

@@ -171,6 +171,8 @@ def start(config_options):
assert config.worker_app == "synapse.app.federation_sender"
setup_logging(config, use_worker_options=True)
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
@@ -195,8 +197,6 @@ def start(config_options):
database_engine=database_engine,
)
setup_logging(ss, config, use_worker_options=True)
ss.setup()
reactor.addSystemEventTrigger(
"before", "startup", _base.start, ss, config.worker_listeners
@@ -262,8 +262,6 @@ class FederationSenderHandler(object):
# we only want to send on receipts for our own users
if not self._is_mine_id(receipt.user_id):
continue
if receipt.data.get("hidden", False):
return # do not send over federation
receipt_info = ReadReceipt(
receipt.room_id,
receipt.receipt_type,

View File

@@ -70,12 +70,12 @@ class PresenceStatusStubServlet(RestServlet):
except HttpResponseException as e:
raise e.to_synapse_error()
return 200, result
return (200, result)
@defer.inlineCallbacks
def on_PUT(self, request, user_id):
yield self.auth.get_user_by_req(request)
return 200, {}
return (200, {})
class KeyUploadServlet(RestServlet):
@@ -126,11 +126,11 @@ class KeyUploadServlet(RestServlet):
self.main_uri + request.uri.decode("ascii"), body, headers=headers
)
return 200, result
return (200, result)
else:
# Just interested in counts.
result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
return 200, {"one_time_key_counts": result}
return (200, {"one_time_key_counts": result})
class FrontendProxySlavedStore(
@@ -232,6 +232,8 @@ def start(config_options):
assert config.worker_main_http_uri is not None
setup_logging(config, use_worker_options=True)
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
@@ -244,8 +246,6 @@ def start(config_options):
database_engine=database_engine,
)
setup_logging(ss, config, use_worker_options=True)
ss.setup()
reactor.addSystemEventTrigger(
"before", "startup", _base.start, ss, config.worker_listeners

View File

@@ -341,6 +341,8 @@ def setup(config_options):
# generating config files and shouldn't try to continue.
sys.exit(0)
synapse.config.logger.setup_logging(config, use_worker_options=False)
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
@@ -354,8 +356,6 @@ def setup(config_options):
database_engine=database_engine,
)
synapse.config.logger.setup_logging(hs, config, use_worker_options=False)
logger.info("Preparing database: %s...", config.database_config["name"])
try:

View File

@@ -155,6 +155,8 @@ def start(config_options):
"Please add ``enable_media_repo: false`` to the main config\n"
)
setup_logging(config, use_worker_options=True)
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
@@ -167,8 +169,6 @@ def start(config_options):
database_engine=database_engine,
)
setup_logging(ss, config, use_worker_options=True)
ss.setup()
reactor.addSystemEventTrigger(
"before", "startup", _base.start, ss, config.worker_listeners

View File

@@ -184,6 +184,8 @@ def start(config_options):
assert config.worker_app == "synapse.app.pusher"
setup_logging(config, use_worker_options=True)
events.USE_FROZEN_DICTS = config.use_frozen_dicts
if config.start_pushers:
@@ -208,8 +210,6 @@ def start(config_options):
database_engine=database_engine,
)
setup_logging(ps, config, use_worker_options=True)
ps.setup()
def start():

View File

@@ -435,6 +435,8 @@ def start(config_options):
assert config.worker_app == "synapse.app.synchrotron"
setup_logging(config, use_worker_options=True)
synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
@@ -448,8 +450,6 @@ def start(config_options):
application_service_handler=SynchrotronApplicationService(),
)
setup_logging(ss, config, use_worker_options=True)
ss.setup()
reactor.addSystemEventTrigger(
"before", "startup", _base.start, ss, config.worker_listeners

View File

@@ -197,6 +197,8 @@ def start(config_options):
assert config.worker_app == "synapse.app.user_dir"
setup_logging(config, use_worker_options=True)
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
@@ -221,8 +223,6 @@ def start(config_options):
database_engine=database_engine,
)
setup_logging(ss, config, use_worker_options=True)
ss.setup()
reactor.addSystemEventTrigger(
"before", "startup", _base.start, ss, config.worker_listeners

View File

@@ -107,6 +107,7 @@ class ApplicationServiceApi(SimpleHttpClient):
except CodeMessageException as e:
if e.code == 404:
return False
return
logger.warning("query_user to %s received %s", uri, e.code)
except Exception as ex:
logger.warning("query_user to %s threw exception %s", uri, ex)
@@ -126,6 +127,7 @@ class ApplicationServiceApi(SimpleHttpClient):
logger.warning("query_alias to %s received %s", uri, e.code)
if e.code == 404:
return False
return
except Exception as ex:
logger.warning("query_alias to %s threw exception %s", uri, ex)
return False
@@ -228,6 +230,7 @@ class ApplicationServiceApi(SimpleHttpClient):
sent_transactions_counter.labels(service.id).inc()
sent_events_counter.labels(service.id).inc(len(events))
return True
return
except CodeMessageException as e:
logger.warning("push_bulk to %s received %s", uri, e.code)
except Exception as ex:

View File

@@ -70,37 +70,35 @@ class ApplicationServiceScheduler(object):
self.store = hs.get_datastore()
self.as_api = hs.get_application_service_api()
self.txn_ctrl = _TransactionController(self.clock, self.store, self.as_api)
def create_recoverer(service, callback):
return _Recoverer(self.clock, self.store, self.as_api, service, callback)
self.txn_ctrl = _TransactionController(
self.clock, self.store, self.as_api, create_recoverer
)
self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock)
@defer.inlineCallbacks
def start(self):
logger.info("Starting appservice scheduler")
# check for any DOWN ASes and start recoverers for them.
services = yield self.store.get_appservices_by_state(
ApplicationServiceState.DOWN
recoverers = yield _Recoverer.start(
self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered
)
for service in services:
self.txn_ctrl.start_recoverer(service)
self.txn_ctrl.add_recoverers(recoverers)
def submit_event_for_as(self, service, event):
self.queuer.enqueue(service, event)
class _ServiceQueuer(object):
"""Queue of events waiting to be sent to appservices.
Groups events into transactions per-appservice, and sends them on to the
TransactionController. Makes sure that we only have one transaction in flight per
appservice at a given time.
"""Queues events for the same application service together, sending
transactions as soon as possible. Once a transaction is sent successfully,
this schedules any other events in the queue to run.
"""
def __init__(self, txn_ctrl, clock):
self.queued_events = {} # dict of {service_id: [events]}
# the appservices which currently have a transaction in flight
self.requests_in_flight = set()
self.txn_ctrl = txn_ctrl
self.clock = clock
@@ -138,29 +136,13 @@ class _ServiceQueuer(object):
class _TransactionController(object):
"""Transaction manager.
Builds AppServiceTransactions and runs their lifecycle. Also starts a Recoverer
if a transaction fails.
(Note we have only have one of these in the homeserver.)
Args:
clock (synapse.util.Clock):
store (synapse.storage.DataStore):
as_api (synapse.appservice.api.ApplicationServiceApi):
"""
def __init__(self, clock, store, as_api):
def __init__(self, clock, store, as_api, recoverer_fn):
self.clock = clock
self.store = store
self.as_api = as_api
# map from service id to recoverer instance
self.recoverers = {}
# for UTs
self.RECOVERER_CLASS = _Recoverer
self.recoverer_fn = recoverer_fn
# keep track of how many recoverers there are
self.recoverers = []
@defer.inlineCallbacks
def send(self, service, events):
@@ -172,45 +154,42 @@ class _TransactionController(object):
if sent:
yield txn.complete(self.store)
else:
run_in_background(self._on_txn_fail, service)
run_in_background(self._start_recoverer, service)
except Exception:
logger.exception("Error creating appservice transaction")
run_in_background(self._on_txn_fail, service)
run_in_background(self._start_recoverer, service)
@defer.inlineCallbacks
def on_recovered(self, recoverer):
self.recoverers.remove(recoverer)
logger.info(
"Successfully recovered application service AS ID %s", recoverer.service.id
)
self.recoverers.pop(recoverer.service.id)
logger.info("Remaining active recoverers: %s", len(self.recoverers))
yield self.store.set_appservice_state(
recoverer.service, ApplicationServiceState.UP
)
def add_recoverers(self, recoverers):
for r in recoverers:
self.recoverers.append(r)
if len(recoverers) > 0:
logger.info("New active recoverers: %s", len(self.recoverers))
@defer.inlineCallbacks
def _on_txn_fail(self, service):
def _start_recoverer(self, service):
try:
yield self.store.set_appservice_state(service, ApplicationServiceState.DOWN)
self.start_recoverer(service)
logger.info(
"Application service falling behind. Starting recoverer. AS ID %s",
service.id,
)
recoverer = self.recoverer_fn(service, self.on_recovered)
self.add_recoverers([recoverer])
recoverer.recover()
except Exception:
logger.exception("Error starting AS recoverer")
def start_recoverer(self, service):
"""Start a Recoverer for the given service
Args:
service (synapse.appservice.ApplicationService):
"""
logger.info("Starting recoverer for AS ID %s", service.id)
assert service.id not in self.recoverers
recoverer = self.RECOVERER_CLASS(
self.clock, self.store, self.as_api, service, self.on_recovered
)
self.recoverers[service.id] = recoverer
recoverer.recover()
logger.info("Now %i active recoverers", len(self.recoverers))
@defer.inlineCallbacks
def _is_service_up(self, service):
state = yield self.store.get_appservice_state(service)
@@ -218,17 +197,18 @@ class _TransactionController(object):
class _Recoverer(object):
"""Manages retries and backoff for a DOWN appservice.
We have one of these for each appservice which is currently considered DOWN.
Args:
clock (synapse.util.Clock):
store (synapse.storage.DataStore):
as_api (synapse.appservice.api.ApplicationServiceApi):
service (synapse.appservice.ApplicationService): the service we are managing
callback (callable[_Recoverer]): called once the service recovers.
"""
@staticmethod
@defer.inlineCallbacks
def start(clock, store, as_api, callback):
services = yield store.get_appservices_by_state(ApplicationServiceState.DOWN)
recoverers = [_Recoverer(clock, store, as_api, s, callback) for s in services]
for r in recoverers:
logger.info(
"Starting recoverer for AS ID %s which was marked as " "DOWN",
r.service.id,
)
r.recover()
return recoverers
def __init__(self, clock, store, as_api, service, callback):
self.clock = clock
@@ -244,9 +224,7 @@ class _Recoverer(object):
"as-recoverer-%s" % (self.service.id,), self.retry
)
delay = 2 ** self.backoff_counter
logger.info("Scheduling retries on %s in %fs", self.service.id, delay)
self.clock.call_later(delay, _retry)
self.clock.call_later((2 ** self.backoff_counter), _retry)
def _backoff(self):
# cap the backoff to be around 8.5min => (2^9) = 512 secs
@@ -256,30 +234,25 @@ class _Recoverer(object):
@defer.inlineCallbacks
def retry(self):
logger.info("Starting retries on %s", self.service.id)
try:
while True:
txn = yield self.store.get_oldest_unsent_txn(self.service)
if not txn:
# nothing left: we're done!
self.callback(self)
return
txn = yield self.store.get_oldest_unsent_txn(self.service)
if txn:
logger.info(
"Retrying transaction %s for AS ID %s", txn.id, txn.service.id
)
sent = yield txn.send(self.as_api)
if not sent:
break
if sent:
yield txn.complete(self.store)
# reset the backoff counter and retry immediately
self.backoff_counter = 1
yield self.retry()
else:
self._backoff()
else:
self._set_service_recovered()
except Exception as e:
logger.exception(e)
self._backoff()
yield txn.complete(self.store)
# reset the backoff counter and then process the next transaction
self.backoff_counter = 1
except Exception:
logger.exception("Unexpected error running retries")
# we didn't manage to send all of the transactions before we got an error of
# some flavour: reschedule the next retry.
self._backoff()
def _set_service_recovered(self):
self.callback(self)

View File

@@ -13,9 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import ConfigError, find_config_files
from ._base import ConfigError
# export ConfigError and find_config_files if somebody does
# import *
# export ConfigError if somebody does import *
# this is largely a fudge to stop PEP8 moaning about the import
__all__ = ["ConfigError", "find_config_files"]
__all__ = ["ConfigError"]

View File

@@ -181,11 +181,6 @@ class Config(object):
generate_secrets=False,
report_stats=None,
open_private_ports=False,
listeners=None,
database_conf=None,
tls_certificate_path=None,
tls_private_key_path=None,
acme_domain=None,
):
"""Build a default configuration file
@@ -212,33 +207,6 @@ class Config(object):
open_private_ports (bool): True to leave private ports (such as the non-TLS
HTTP listener) open to the internet.
listeners (list(dict)|None): A list of descriptions of the listeners
synapse should start with each of which specifies a port (str), a list of
resources (list(str)), tls (bool) and type (str). For example:
[{
"port": 8448,
"resources": [{"names": ["federation"]}],
"tls": True,
"type": "http",
},
{
"port": 443,
"resources": [{"names": ["client"]}],
"tls": False,
"type": "http",
}],
database (str|None): The database type to configure, either `psycog2`
or `sqlite3`.
tls_certificate_path (str|None): The path to the tls certificate.
tls_private_key_path (str|None): The path to the tls private key.
acme_domain (str|None): The domain acme will try to validate. If
specified acme will be enabled.
Returns:
str: the yaml config file
"""
@@ -252,11 +220,6 @@ class Config(object):
generate_secrets=generate_secrets,
report_stats=report_stats,
open_private_ports=open_private_ports,
listeners=listeners,
database_conf=database_conf,
tls_certificate_path=tls_certificate_path,
tls_private_key_path=tls_private_key_path,
acme_domain=acme_domain,
)
)

View File

@@ -13,9 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from textwrap import indent
import yaml
from ._base import Config
@@ -41,28 +38,20 @@ class DatabaseConfig(Config):
self.set_databasepath(config.get("database_path"))
def generate_config_section(self, data_dir_path, database_conf, **kwargs):
if not database_conf:
database_path = os.path.join(data_dir_path, "homeserver.db")
database_conf = (
"""# The database engine name
name: "sqlite3"
# Arguments to pass to the engine
args:
# Path to the database
database: "%(database_path)s"
"""
% locals()
)
else:
database_conf = indent(yaml.dump(database_conf), " " * 10).lstrip()
def generate_config_section(self, data_dir_path, **kwargs):
database_path = os.path.join(data_dir_path, "homeserver.db")
return (
"""\
## Database ##
database:
%(database_conf)s
# The database engine name
name: "sqlite3"
# Arguments to pass to the engine
args:
# Path to the database
database: "%(database_path)s"
# Number of events to cache in memory.
#
#event_cache_size: 10K

View File

@@ -115,7 +115,7 @@ class EmailConfig(Config):
missing.append("email." + k)
if config.get("public_baseurl") is None:
missing.append("public_baseurl")
missing.append("public_base_url")
if len(missing) > 0:
raise RuntimeError(

View File

@@ -76,7 +76,7 @@ class KeyConfig(Config):
config_dir_path, config["server_name"] + ".signing.key"
)
self.signing_key = self.read_signing_keys(signing_key_path, "signing_key")
self.signing_key = self.read_signing_key(signing_key_path)
self.old_signing_keys = self.read_old_signing_keys(
config.get("old_signing_keys", {})
@@ -85,14 +85,6 @@ class KeyConfig(Config):
config.get("key_refresh_interval", "1d")
)
key_server_signing_keys_path = config.get("key_server_signing_keys_path")
if key_server_signing_keys_path:
self.key_server_signing_keys = self.read_signing_keys(
key_server_signing_keys_path, "key_server_signing_keys_path"
)
else:
self.key_server_signing_keys = list(self.signing_key)
# if neither trusted_key_servers nor perspectives are given, use the default.
if "perspectives" not in config and "trusted_key_servers" not in config:
key_servers = [{"server_name": "matrix.org"}]
@@ -218,34 +210,16 @@ class KeyConfig(Config):
#
#trusted_key_servers:
# - server_name: "matrix.org"
#
# The signing keys to use when acting as a trusted key server. If not specified
# defaults to the server signing key.
#
# Can contain multiple keys, one per line.
#
#key_server_signing_keys_path: "key_server_signing_keys.key"
"""
% locals()
)
def read_signing_keys(self, signing_key_path, name):
"""Read the signing keys in the given path.
Args:
signing_key_path (str)
name (str): Associated config key name
Returns:
list[SigningKey]
"""
signing_keys = self.read_file(signing_key_path, name)
def read_signing_key(self, signing_key_path):
signing_keys = self.read_file(signing_key_path, "signing_key")
try:
return read_signing_keys(signing_keys.splitlines(True))
except Exception as e:
raise ConfigError("Error reading %s: %s" % (name, str(e)))
raise ConfigError("Error reading signing_key: %s" % (str(e)))
def read_old_signing_keys(self, old_signing_keys):
keys = {}

View File

@@ -25,10 +25,6 @@ from twisted.logger import STDLibLogObserver, globalLogBeginner
import synapse
from synapse.app import _base as appbase
from synapse.logging._structured import (
reload_structured_logging,
setup_structured_logging,
)
from synapse.logging.context import LoggingContextFilter
from synapse.util.versionstring import get_version_string
@@ -89,8 +85,7 @@ class LoggingConfig(Config):
"""\
## Logging ##
# A yaml python logging config file as described by
# https://docs.python.org/3.7/library/logging.config.html#configuration-dictionary-schema
# A yaml python logging config file
#
log_config: "%(log_config)s"
"""
@@ -124,10 +119,21 @@ class LoggingConfig(Config):
log_config_file.write(DEFAULT_LOG_CONFIG.substitute(log_file=log_file))
def _setup_stdlib_logging(config, log_config):
"""
Set up Python stdlib logging.
def setup_logging(config, use_worker_options=False):
""" Set up python logging
Args:
config (LoggingConfig | synapse.config.workers.WorkerConfig):
configuration data
use_worker_options (bool): True to use the 'worker_log_config' option
instead of 'log_config'.
register_sighup (func | None): Function to call to register a
sighup handler.
"""
log_config = config.worker_log_config if use_worker_options else config.log_config
if log_config is None:
log_format = (
"%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s"
@@ -145,10 +151,35 @@ def _setup_stdlib_logging(config, log_config):
handler.addFilter(LoggingContextFilter(request=""))
logger.addHandler(handler)
else:
logging.config.dictConfig(log_config)
# Route Twisted's native logging through to the standard library logging
# system.
def load_log_config():
with open(log_config, "r") as f:
logging.config.dictConfig(yaml.safe_load(f))
def sighup(*args):
# it might be better to use a file watcher or something for this.
load_log_config()
logging.info("Reloaded log config from %s due to SIGHUP", log_config)
load_log_config()
appbase.register_sighup(sighup)
# make sure that the first thing we log is a thing we can grep backwards
# for
logging.warn("***** STARTING SERVER *****")
logging.warn("Server %s version %s", sys.argv[0], get_version_string(synapse))
logging.info("Server hostname: %s", config.server_name)
# It's critical to point twisted's internal logging somewhere, otherwise it
# stacks up and leaks kup to 64K object;
# see: https://twistedmatrix.com/trac/ticket/8164
#
# Routing to the python logging framework could be a performance problem if
# the handlers blocked for a long time as python.logging is a blocking API
# see https://twistedmatrix.com/documents/current/core/howto/logger.html
# filed as https://github.com/matrix-org/synapse/issues/1727
#
# However this may not be too much of a problem if we are just writing to a file.
observer = STDLibLogObserver()
def _log(event):
@@ -170,54 +201,3 @@ def _setup_stdlib_logging(config, log_config):
)
if not config.no_redirect_stdio:
print("Redirected stdout/stderr to logs")
def _reload_stdlib_logging(*args, log_config=None):
logger = logging.getLogger("")
if not log_config:
logger.warn("Reloaded a blank config?")
logging.config.dictConfig(log_config)
def setup_logging(hs, config, use_worker_options=False):
"""
Set up the logging subsystem.
Args:
config (LoggingConfig | synapse.config.workers.WorkerConfig):
configuration data
use_worker_options (bool): True to use the 'worker_log_config' option
instead of 'log_config'.
"""
log_config = config.worker_log_config if use_worker_options else config.log_config
def read_config(*args, callback=None):
if log_config is None:
return None
with open(log_config, "rb") as f:
log_config_body = yaml.safe_load(f.read())
if callback:
callback(log_config=log_config_body)
logging.info("Reloaded log config from %s due to SIGHUP", log_config)
return log_config_body
log_config_body = read_config()
if log_config_body and log_config_body.get("structured") is True:
setup_structured_logging(hs, config, log_config_body)
appbase.register_sighup(read_config, callback=reload_structured_logging)
else:
_setup_stdlib_logging(config, log_config_body)
appbase.register_sighup(read_config, callback=_reload_stdlib_logging)
# make sure that the first thing we log is a thing we can grep backwards
# for
logging.warn("***** STARTING SERVER *****")
logging.warn("Server %s version %s", sys.argv[0], get_version_string(synapse))
logging.info("Server hostname: %s", config.server_name)

View File

@@ -17,11 +17,8 @@
import logging
import os.path
import re
from textwrap import indent
import attr
import yaml
from netaddr import IPSet
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
@@ -355,7 +352,7 @@ class ServerConfig(Config):
return any(l["tls"] for l in self.listeners)
def generate_config_section(
self, server_name, data_dir_path, open_private_ports, listeners, **kwargs
self, server_name, data_dir_path, open_private_ports, **kwargs
):
_, bind_port = parse_and_validate_server_name(server_name)
if bind_port is not None:
@@ -369,68 +366,11 @@ class ServerConfig(Config):
# Bring DEFAULT_ROOM_VERSION into the local-scope for use in the
# default config string
default_room_version = DEFAULT_ROOM_VERSION
secure_listeners = []
unsecure_listeners = []
private_addresses = ["::1", "127.0.0.1"]
if listeners:
for listener in listeners:
if listener["tls"]:
secure_listeners.append(listener)
else:
# If we don't want open ports we need to bind the listeners
# to some address other than 0.0.0.0. Here we chose to use
# localhost.
# If the addresses are already bound we won't overwrite them
# however.
if not open_private_ports:
listener.setdefault("bind_addresses", private_addresses)
unsecure_listeners.append(listener)
secure_http_bindings = indent(
yaml.dump(secure_listeners), " " * 10
).lstrip()
unsecure_http_bindings = indent(
yaml.dump(unsecure_listeners), " " * 10
).lstrip()
if not unsecure_listeners:
unsecure_http_bindings = (
"""- port: %(unsecure_port)s
tls: false
type: http
x_forwarded: true"""
% locals()
)
if not open_private_ports:
unsecure_http_bindings += (
"\n bind_addresses: ['::1', '127.0.0.1']"
)
unsecure_http_bindings += """
resources:
- names: [client, federation]
compress: false"""
if listeners:
# comment out this block
unsecure_http_bindings = "#" + re.sub(
"\n {10}",
lambda match: match.group(0) + "#",
unsecure_http_bindings,
)
if not secure_listeners:
secure_http_bindings = (
"""#- port: %(bind_port)s
# type: http
# tls: true
# resources:
# - names: [client, federation]"""
% locals()
unsecure_http_binding = "port: %i\n tls: false" % (unsecure_port,)
if not open_private_ports:
unsecure_http_binding += (
"\n bind_addresses: ['::1', '127.0.0.1']"
)
return (
@@ -616,7 +556,11 @@ class ServerConfig(Config):
# will also need to give Synapse a TLS key and certificate: see the TLS section
# below.)
#
%(secure_http_bindings)s
#- port: %(bind_port)s
# type: http
# tls: true
# resources:
# - names: [client, federation]
# Unsecure HTTP listener: for when matrix traffic passes through a reverse proxy
# that unwraps TLS.
@@ -624,7 +568,13 @@ class ServerConfig(Config):
# If you plan to use a reverse proxy, please see
# https://github.com/matrix-org/synapse/blob/master/docs/reverse_proxy.rst.
#
%(unsecure_http_bindings)s
- %(unsecure_http_binding)s
type: http
x_forwarded: true
resources:
- names: [client, federation]
compress: false
# example additional_resources:
#

View File

@@ -27,16 +27,19 @@ class StatsConfig(Config):
def read_config(self, config, **kwargs):
self.stats_enabled = True
self.stats_bucket_size = 86400 * 1000
self.stats_bucket_size = 86400
self.stats_retention = sys.maxsize
stats_config = config.get("stats", None)
if stats_config:
self.stats_enabled = stats_config.get("enabled", self.stats_enabled)
self.stats_bucket_size = self.parse_duration(
stats_config.get("bucket_size", "1d")
self.stats_bucket_size = (
self.parse_duration(stats_config.get("bucket_size", "1d")) / 1000
)
self.stats_retention = self.parse_duration(
stats_config.get("retention", "%ds" % (sys.maxsize,))
self.stats_retention = (
self.parse_duration(
stats_config.get("retention", "%ds" % (sys.maxsize,))
)
/ 1000
)
def generate_config_section(self, config_dir_path, server_name, **kwargs):

View File

@@ -239,38 +239,12 @@ class TlsConfig(Config):
self.tls_fingerprints.append({"sha256": sha256_fingerprint})
def generate_config_section(
self,
config_dir_path,
server_name,
data_dir_path,
tls_certificate_path,
tls_private_key_path,
acme_domain,
**kwargs
self, config_dir_path, server_name, data_dir_path, **kwargs
):
"""If the acme_domain is specified acme will be enabled.
If the TLS paths are not specified the default will be certs in the
config directory"""
base_key_name = os.path.join(config_dir_path, server_name)
if bool(tls_certificate_path) != bool(tls_private_key_path):
raise ConfigError(
"Please specify both a cert path and a key path or neither."
)
tls_enabled = (
"" if tls_certificate_path and tls_private_key_path or acme_domain else "#"
)
if not tls_certificate_path:
tls_certificate_path = base_key_name + ".tls.crt"
if not tls_private_key_path:
tls_private_key_path = base_key_name + ".tls.key"
acme_enabled = bool(acme_domain)
acme_domain = "matrix.example.com"
tls_certificate_path = base_key_name + ".tls.crt"
tls_private_key_path = base_key_name + ".tls.key"
default_acme_account_file = os.path.join(data_dir_path, "acme_account.key")
# this is to avoid the max line length. Sorrynotsorry
@@ -295,11 +269,11 @@ class TlsConfig(Config):
# instance, if using certbot, use `fullchain.pem` as your certificate,
# not `cert.pem`).
#
%(tls_enabled)stls_certificate_path: "%(tls_certificate_path)s"
#tls_certificate_path: "%(tls_certificate_path)s"
# PEM-encoded private key for TLS
#
%(tls_enabled)stls_private_key_path: "%(tls_private_key_path)s"
#tls_private_key_path: "%(tls_private_key_path)s"
# Whether to verify TLS server certificates for outbound federation requests.
#
@@ -366,10 +340,10 @@ class TlsConfig(Config):
# permission to listen on port 80.
#
acme:
# ACME support is disabled by default. Set this to `true` and uncomment
# tls_certificate_path and tls_private_key_path above to enable it.
# ACME support is disabled by default. Uncomment the following line
# (and tls_certificate_path and tls_private_key_path above) to enable it.
#
enabled: %(acme_enabled)s
#enabled: true
# Endpoint to use to request certificates. If you only want to test,
# use Let's Encrypt's staging url:
@@ -380,17 +354,17 @@ class TlsConfig(Config):
# Port number to listen on for the HTTP-01 challenge. Change this if
# you are forwarding connections through Apache/Nginx/etc.
#
port: 80
#port: 80
# Local addresses to listen on for incoming connections.
# Again, you may want to change this if you are forwarding connections
# through Apache/Nginx/etc.
#
bind_addresses: ['::', '0.0.0.0']
#bind_addresses: ['::', '0.0.0.0']
# How many days remaining on a certificate before it is renewed.
#
reprovision_threshold: 30
#reprovision_threshold: 30
# The domain that the certificate should be for. Normally this
# should be the same as your Matrix domain (i.e., 'server_name'), but,
@@ -404,7 +378,7 @@ class TlsConfig(Config):
#
# If not set, defaults to your 'server_name'.
#
domain: %(acme_domain)s
#domain: matrix.example.com
# file to use for the account key. This will be generated if it doesn't
# exist.

View File

@@ -83,7 +83,7 @@ def compute_content_hash(event_dict, hash_algorithm):
event_json_bytes = encode_canonical_json(event_dict)
hashed = hash_algorithm(event_json_bytes)
return hashed.name, hashed.digest()
return (hashed.name, hashed.digest())
def compute_event_reference_hash(event, hash_algorithm=hashlib.sha256):
@@ -106,7 +106,7 @@ def compute_event_reference_hash(event, hash_algorithm=hashlib.sha256):
event_dict.pop("unsigned", None)
event_json_bytes = encode_canonical_json(event_dict)
hashed = hash_algorithm(event_json_bytes)
return hashed.name, hashed.digest()
return (hashed.name, hashed.digest())
def compute_event_signature(event_dict, signature_name, signing_key):

View File

@@ -18,6 +18,7 @@ import logging
from collections import defaultdict
import six
from six import raise_from
from six.moves import urllib
import attr
@@ -29,6 +30,7 @@ from signedjson.key import (
from signedjson.sign import (
SignatureVerifyException,
encode_canonical_json,
sign_json,
signature_ids,
verify_signed_json,
)
@@ -538,7 +540,13 @@ class BaseV2KeyFetcher(object):
verify_key=verify_key, valid_until_ts=key_data["expired_ts"]
)
key_json_bytes = encode_canonical_json(response_json)
# re-sign the json with our own key, so that it is ready if we are asked to
# give it out as a notary server
signed_key_json = sign_json(
response_json, self.config.server_name, self.config.signing_key[0]
)
signed_key_json_bytes = encode_canonical_json(signed_key_json)
yield make_deferred_yieldable(
defer.gatherResults(
@@ -550,7 +558,7 @@ class BaseV2KeyFetcher(object):
from_server=from_server,
ts_now_ms=time_added_ms,
ts_expires_ms=ts_valid_until_ms,
key_json_bytes=key_json_bytes,
key_json_bytes=signed_key_json_bytes,
)
for key_id in verify_keys
],
@@ -649,10 +657,9 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
},
)
except (NotRetryingDestination, RequestSendFailed) as e:
# these both have str() representations which we can't really improve upon
raise KeyLookupError(str(e))
raise_from(KeyLookupError("Failed to connect to remote server"), e)
except HttpResponseException as e:
raise KeyLookupError("Remote server returned an error: %s" % (e,))
raise_from(KeyLookupError("Remote server returned an error"), e)
keys = {}
added_keys = []
@@ -814,11 +821,9 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
timeout=10000,
)
except (NotRetryingDestination, RequestSendFailed) as e:
# these both have str() representations which we can't really improve
# upon
raise KeyLookupError(str(e))
raise_from(KeyLookupError("Failed to connect to remote server"), e)
except HttpResponseException as e:
raise KeyLookupError("Remote server returned an error: %s" % (e,))
raise_from(KeyLookupError("Remote server returned an error"), e)
if response["server_name"] != server_name:
raise KeyLookupError(

View File

@@ -637,11 +637,11 @@ def auth_types_for_event(event):
if event.type == EventTypes.Create:
return []
auth_types = [
(EventTypes.PowerLevels, ""),
(EventTypes.Member, event.sender),
(EventTypes.Create, ""),
]
auth_types = []
auth_types.append((EventTypes.PowerLevels, ""))
auth_types.append((EventTypes.Member, event.sender))
auth_types.append((EventTypes.Create, ""))
if event.type == EventTypes.Member:
membership = event.content["membership"]

View File

@@ -355,7 +355,7 @@ class FederationClient(FederationBase):
auth_chain.sort(key=lambda e: e.depth)
return pdus, auth_chain
return (pdus, auth_chain)
except HttpResponseException as e:
if e.code == 400 or e.code == 404:
logger.info("Failed to use get_room_state_ids API, falling back")
@@ -404,7 +404,7 @@ class FederationClient(FederationBase):
signed_auth.sort(key=lambda e: e.depth)
return signed_pdus, signed_auth
return (signed_pdus, signed_auth)
@defer.inlineCallbacks
def get_events_from_store_or_dest(self, destination, room_id, event_ids):
@@ -429,7 +429,7 @@ class FederationClient(FederationBase):
missing_events.discard(k)
if not missing_events:
return signed_events, failed_to_fetch
return (signed_events, failed_to_fetch)
logger.debug(
"Fetching unknown state/auth events %s for room %s",
@@ -465,7 +465,7 @@ class FederationClient(FederationBase):
# We removed all events we successfully fetched from `batch`
failed_to_fetch.update(batch)
return signed_events, failed_to_fetch
return (signed_events, failed_to_fetch)
@defer.inlineCallbacks
@log_function

View File

@@ -43,7 +43,6 @@ from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
from synapse.http.endpoint import parse_server_name
from synapse.logging.context import nested_logging_context
from synapse.logging.opentracing import log_kv, start_active_span_from_edu, trace
from synapse.logging.utils import log_function
from synapse.replication.http.federation import (
ReplicationFederationSendEduRestServlet,
@@ -100,7 +99,7 @@ class FederationServer(FederationBase):
res = self._transaction_from_pdus(pdus).get_dict()
return 200, res
return (200, res)
@defer.inlineCallbacks
@log_function
@@ -163,7 +162,7 @@ class FederationServer(FederationBase):
yield self.transaction_actions.set_response(
origin, transaction, 400, response
)
return 400, response
return (400, response)
received_pdus_counter.inc(len(transaction.pdus))
@@ -265,7 +264,7 @@ class FederationServer(FederationBase):
logger.debug("Returning: %s", str(response))
yield self.transaction_actions.set_response(origin, transaction, 200, response)
return 200, response
return (200, response)
@defer.inlineCallbacks
def received_edu(self, origin, edu_type, content):
@@ -298,7 +297,7 @@ class FederationServer(FederationBase):
event_id,
)
return 200, resp
return (200, resp)
@defer.inlineCallbacks
def on_state_ids_request(self, origin, room_id, event_id):
@@ -315,7 +314,7 @@ class FederationServer(FederationBase):
state_ids = yield self.handler.get_state_ids_for_pdu(room_id, event_id)
auth_chain_ids = yield self.store.get_auth_chain_ids(state_ids)
return 200, {"pdu_ids": state_ids, "auth_chain_ids": auth_chain_ids}
return (200, {"pdu_ids": state_ids, "auth_chain_ids": auth_chain_ids})
@defer.inlineCallbacks
def _on_context_state_request_compute(self, room_id, event_id):
@@ -345,15 +344,15 @@ class FederationServer(FederationBase):
pdu = yield self.handler.get_persisted_pdu(origin, event_id)
if pdu:
return 200, self._transaction_from_pdus([pdu]).get_dict()
return (200, self._transaction_from_pdus([pdu]).get_dict())
else:
return 404, ""
return (404, "")
@defer.inlineCallbacks
def on_query_request(self, query_type, args):
received_queries_counter.labels(query_type).inc()
resp = yield self.registry.on_query(query_type, args)
return 200, resp
return (200, resp)
@defer.inlineCallbacks
def on_make_join_request(self, origin, room_id, user_id, supported_versions):
@@ -435,7 +434,7 @@ class FederationServer(FederationBase):
logger.debug("on_send_leave_request: pdu sigs: %s", pdu.signatures)
yield self.handler.on_send_leave_request(origin, pdu)
return 200, {}
return (200, {})
@defer.inlineCallbacks
def on_event_auth(self, origin, room_id, event_id):
@@ -446,7 +445,7 @@ class FederationServer(FederationBase):
time_now = self._clock.time_msec()
auth_pdus = yield self.handler.on_event_auth(event_id)
res = {"auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus]}
return 200, res
return (200, res)
@defer.inlineCallbacks
def on_query_auth_request(self, origin, content, room_id, event_id):
@@ -499,7 +498,7 @@ class FederationServer(FederationBase):
"missing": ret.get("missing", []),
}
return 200, send_content
return (200, send_content)
@log_function
def on_query_client_keys(self, origin, content):
@@ -508,7 +507,6 @@ class FederationServer(FederationBase):
def on_query_user_devices(self, origin, user_id):
return self.on_query_request("user_devices", user_id)
@trace
@defer.inlineCallbacks
@log_function
def on_claim_client_keys(self, origin, content):
@@ -517,7 +515,6 @@ class FederationServer(FederationBase):
for device_id, algorithm in device_keys.items():
query.append((user_id, device_id, algorithm))
log_kv({"message": "Claiming one time keys.", "user, device pairs": query})
results = yield self.store.claim_e2e_one_time_keys(query)
json_result = {}
@@ -811,13 +808,12 @@ class FederationHandlerRegistry(object):
if not handler:
logger.warn("No handler registered for EDU type %s", edu_type)
with start_active_span_from_edu(content, "handle_edu"):
try:
yield handler(origin, content)
except SynapseError as e:
logger.info("Failed to handle edu %r: %r", edu_type, e)
except Exception:
logger.exception("Failed to handle edu %r", edu_type)
try:
yield handler(origin, content)
except SynapseError as e:
logger.info("Failed to handle edu %r: %r", edu_type, e)
except Exception:
logger.exception("Failed to handle edu %r", edu_type)
def on_query(self, query_type, args):
handler = self.query_handlers.get(query_type)

View File

@@ -14,20 +14,11 @@
# limitations under the License.
import logging
from canonicaljson import json
from twisted.internet import defer
from synapse.api.errors import HttpResponseException
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Transaction
from synapse.logging.opentracing import (
extract_text_map,
set_tag,
start_active_span_follows_from,
tags,
whitelisted_homeserver,
)
from synapse.util.metrics import measure_func
logger = logging.getLogger(__name__)
@@ -53,115 +44,93 @@ class TransactionManager(object):
@defer.inlineCallbacks
def send_new_transaction(self, destination, pending_pdus, pending_edus):
# Make a transaction-sending opentracing span. This span follows on from
# all the edus in that transaction. This needs to be done since there is
# no active span here, so if the edus were not received by the remote the
# span would have no causality and it would be forgotten.
# The span_contexts is a generator so that it won't be evaluated if
# opentracing is disabled. (Yay speed!)
# Sort based on the order field
pending_pdus.sort(key=lambda t: t[1])
pdus = [x[0] for x in pending_pdus]
edus = pending_edus
span_contexts = []
keep_destination = whitelisted_homeserver(destination)
success = True
for edu in pending_edus:
context = edu.get_context()
if context:
span_contexts.append(extract_text_map(json.loads(context)))
if keep_destination:
edu.strip_context()
logger.debug("TX [%s] _attempt_new_transaction", destination)
with start_active_span_follows_from("send_transaction", span_contexts):
txn_id = str(self._next_txn_id)
# Sort based on the order field
pending_pdus.sort(key=lambda t: t[1])
pdus = [x[0] for x in pending_pdus]
edus = pending_edus
logger.debug(
"TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)",
destination,
txn_id,
len(pdus),
len(edus),
)
success = True
transaction = Transaction.create_new(
origin_server_ts=int(self.clock.time_msec()),
transaction_id=txn_id,
origin=self._server_name,
destination=destination,
pdus=pdus,
edus=edus,
)
logger.debug("TX [%s] _attempt_new_transaction", destination)
self._next_txn_id += 1
txn_id = str(self._next_txn_id)
logger.info(
"TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)",
destination,
txn_id,
transaction.transaction_id,
len(pdus),
len(edus),
)
logger.debug(
"TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)",
destination,
txn_id,
len(pdus),
len(edus),
# Actually send the transaction
# FIXME (erikj): This is a bit of a hack to make the Pdu age
# keys work
def json_data_cb():
data = transaction.get_dict()
now = int(self.clock.time_msec())
if "pdus" in data:
for p in data["pdus"]:
if "age_ts" in p:
unsigned = p.setdefault("unsigned", {})
unsigned["age"] = now - int(p["age_ts"])
del p["age_ts"]
return data
try:
response = yield self._transport_layer.send_transaction(
transaction, json_data_cb
)
code = 200
except HttpResponseException as e:
code = e.code
response = e.response
transaction = Transaction.create_new(
origin_server_ts=int(self.clock.time_msec()),
transaction_id=txn_id,
origin=self._server_name,
destination=destination,
pdus=pdus,
edus=edus,
)
if e.code in (401, 404, 429) or 500 <= e.code:
logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
raise e
self._next_txn_id += 1
logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
logger.info(
"TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)",
destination,
txn_id,
transaction.transaction_id,
len(pdus),
len(edus),
)
# Actually send the transaction
# FIXME (erikj): This is a bit of a hack to make the Pdu age
# keys work
def json_data_cb():
data = transaction.get_dict()
now = int(self.clock.time_msec())
if "pdus" in data:
for p in data["pdus"]:
if "age_ts" in p:
unsigned = p.setdefault("unsigned", {})
unsigned["age"] = now - int(p["age_ts"])
del p["age_ts"]
return data
try:
response = yield self._transport_layer.send_transaction(
transaction, json_data_cb
)
code = 200
except HttpResponseException as e:
code = e.code
response = e.response
if e.code in (401, 404, 429) or 500 <= e.code:
logger.info(
"TX [%s] {%s} got %d response", destination, txn_id, code
)
raise e
logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
if code == 200:
for e_id, r in response.get("pdus", {}).items():
if "error" in r:
logger.warn(
"TX [%s] {%s} Remote returned error for %s: %s",
destination,
txn_id,
e_id,
r,
)
else:
for p in pdus:
if code == 200:
for e_id, r in response.get("pdus", {}).items():
if "error" in r:
logger.warn(
"TX [%s] {%s} Failed to send event %s",
"TX [%s] {%s} Remote returned error for %s: %s",
destination,
txn_id,
p.event_id,
e_id,
r,
)
success = False
else:
for p in pdus:
logger.warn(
"TX [%s] {%s} Failed to send event %s",
destination,
txn_id,
p.event_id,
)
success = False
set_tag(tags.ERROR, not success)
return success
return success

View File

@@ -327,37 +327,21 @@ class TransportLayerClient(object):
include_all_networks=False,
third_party_instance_id=None,
):
if search_filter:
# this uses MSC2197 (Search Filtering over Federation)
path = _create_v1_path("/publicRooms")
path = _create_v1_path("/publicRooms")
data = {"include_all_networks": "true" if include_all_networks else "false"}
if third_party_instance_id:
data["third_party_instance_id"] = third_party_instance_id
if limit:
data["limit"] = str(limit)
if since_token:
data["since"] = since_token
args = {"include_all_networks": "true" if include_all_networks else "false"}
if third_party_instance_id:
args["third_party_instance_id"] = (third_party_instance_id,)
if limit:
args["limit"] = [str(limit)]
if since_token:
args["since"] = [since_token]
data["filter"] = search_filter
# TODO(erikj): Actually send the search_filter across federation.
response = yield self.client.post_json(
destination=remote_server, path=path, data=data, ignore_backoff=True
)
else:
path = _create_v1_path("/publicRooms")
args = {"include_all_networks": "true" if include_all_networks else "false"}
if third_party_instance_id:
args["third_party_instance_id"] = (third_party_instance_id,)
if limit:
args["limit"] = [str(limit)]
if since_token:
args["since"] = [since_token]
response = yield self.client.get_json(
destination=remote_server, path=path, args=args, ignore_backoff=True
)
response = yield self.client.get_json(
destination=remote_server, path=path, args=args, ignore_backoff=True
)
return response

Some files were not shown because too many files have changed in this diff Show More