Compare commits
5 Commits
rei/rss_ta
...
rei/admin_
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6e827507f7 | ||
|
|
0e99412f4c | ||
|
|
7fd0c90234 | ||
|
|
ebd2cd84d5 | ||
|
|
c497e13734 |
@@ -6,7 +6,6 @@ services:
|
||||
image: postgres:9.5
|
||||
environment:
|
||||
POSTGRES_PASSWORD: postgres
|
||||
command: -c fsync=off
|
||||
|
||||
testenv:
|
||||
image: python:3.5
|
||||
@@ -17,6 +16,6 @@ services:
|
||||
SYNAPSE_POSTGRES_HOST: postgres
|
||||
SYNAPSE_POSTGRES_USER: postgres
|
||||
SYNAPSE_POSTGRES_PASSWORD: postgres
|
||||
working_dir: /src
|
||||
working_dir: /app
|
||||
volumes:
|
||||
- ..:/src
|
||||
- ..:/app
|
||||
|
||||
@@ -6,7 +6,6 @@ services:
|
||||
image: postgres:11
|
||||
environment:
|
||||
POSTGRES_PASSWORD: postgres
|
||||
command: -c fsync=off
|
||||
|
||||
testenv:
|
||||
image: python:3.7
|
||||
@@ -17,6 +16,6 @@ services:
|
||||
SYNAPSE_POSTGRES_HOST: postgres
|
||||
SYNAPSE_POSTGRES_USER: postgres
|
||||
SYNAPSE_POSTGRES_PASSWORD: postgres
|
||||
working_dir: /src
|
||||
working_dir: /app
|
||||
volumes:
|
||||
- ..:/src
|
||||
- ..:/app
|
||||
|
||||
@@ -6,7 +6,6 @@ services:
|
||||
image: postgres:9.5
|
||||
environment:
|
||||
POSTGRES_PASSWORD: postgres
|
||||
command: -c fsync=off
|
||||
|
||||
testenv:
|
||||
image: python:3.7
|
||||
@@ -17,6 +16,6 @@ services:
|
||||
SYNAPSE_POSTGRES_HOST: postgres
|
||||
SYNAPSE_POSTGRES_USER: postgres
|
||||
SYNAPSE_POSTGRES_PASSWORD: postgres
|
||||
working_dir: /src
|
||||
working_dir: /app
|
||||
volumes:
|
||||
- ..:/src
|
||||
- ..:/app
|
||||
|
||||
@@ -1,18 +1,3 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import sys
|
||||
from tap.parser import Parser
|
||||
from tap.line import Result, Unknown, Diagnostic
|
||||
|
||||
@@ -27,7 +27,7 @@ git config --global user.name "A robot"
|
||||
|
||||
# Fetch and merge. If it doesn't work, it will raise due to set -e.
|
||||
git fetch -u origin $GITBASE
|
||||
git merge --no-edit --no-commit origin/$GITBASE
|
||||
git merge --no-edit origin/$GITBASE
|
||||
|
||||
# Show what we are after.
|
||||
git --no-pager show -s
|
||||
|
||||
240
.buildkite/pipeline.yml
Normal file
240
.buildkite/pipeline.yml
Normal file
@@ -0,0 +1,240 @@
|
||||
env:
|
||||
CODECOV_TOKEN: "2dd7eb9b-0eda-45fe-a47c-9b5ac040045f"
|
||||
|
||||
steps:
|
||||
|
||||
- command:
|
||||
- "python -m pip install tox"
|
||||
- "tox -e check_codestyle"
|
||||
label: "\U0001F9F9 Check Style"
|
||||
plugins:
|
||||
- docker#v3.0.1:
|
||||
image: "python:3.6"
|
||||
|
||||
- command:
|
||||
- "python -m pip install tox"
|
||||
- "tox -e packaging"
|
||||
label: "\U0001F9F9 packaging"
|
||||
plugins:
|
||||
- docker#v3.0.1:
|
||||
image: "python:3.6"
|
||||
|
||||
- command:
|
||||
- "python -m pip install tox"
|
||||
- "tox -e check_isort"
|
||||
label: "\U0001F9F9 isort"
|
||||
plugins:
|
||||
- docker#v3.0.1:
|
||||
image: "python:3.6"
|
||||
|
||||
- command:
|
||||
- "python -m pip install tox"
|
||||
- "scripts-dev/check-newsfragment"
|
||||
label: ":newspaper: Newsfile"
|
||||
branches: "!master !develop !release-*"
|
||||
plugins:
|
||||
- docker#v3.0.1:
|
||||
image: "python:3.6"
|
||||
propagate-environment: true
|
||||
|
||||
- command:
|
||||
- "python -m pip install tox"
|
||||
- "tox -e check-sampleconfig"
|
||||
label: "\U0001F9F9 check-sample-config"
|
||||
plugins:
|
||||
- docker#v3.0.1:
|
||||
image: "python:3.6"
|
||||
|
||||
- wait
|
||||
|
||||
|
||||
- command:
|
||||
- "apt-get update && apt-get install -y python3.5 python3.5-dev python3-pip libxml2-dev libxslt-dev zlib1g-dev"
|
||||
- "python3.5 -m pip install tox"
|
||||
- "tox -e py35-old,codecov"
|
||||
label: ":python: 3.5 / SQLite / Old Deps"
|
||||
env:
|
||||
TRIAL_FLAGS: "-j 2"
|
||||
plugins:
|
||||
- docker#v3.0.1:
|
||||
image: "ubuntu:xenial" # We use xenail to get an old sqlite and python
|
||||
propagate-environment: true
|
||||
retry:
|
||||
automatic:
|
||||
- exit_status: -1
|
||||
limit: 2
|
||||
- exit_status: 2
|
||||
limit: 2
|
||||
|
||||
- command:
|
||||
- "python -m pip install tox"
|
||||
- "tox -e py35,codecov"
|
||||
label: ":python: 3.5 / SQLite"
|
||||
env:
|
||||
TRIAL_FLAGS: "-j 2"
|
||||
plugins:
|
||||
- docker#v3.0.1:
|
||||
image: "python:3.5"
|
||||
propagate-environment: true
|
||||
retry:
|
||||
automatic:
|
||||
- exit_status: -1
|
||||
limit: 2
|
||||
- exit_status: 2
|
||||
limit: 2
|
||||
|
||||
- command:
|
||||
- "python -m pip install tox"
|
||||
- "tox -e py36,codecov"
|
||||
label: ":python: 3.6 / SQLite"
|
||||
env:
|
||||
TRIAL_FLAGS: "-j 2"
|
||||
plugins:
|
||||
- docker#v3.0.1:
|
||||
image: "python:3.6"
|
||||
propagate-environment: true
|
||||
retry:
|
||||
automatic:
|
||||
- exit_status: -1
|
||||
limit: 2
|
||||
- exit_status: 2
|
||||
limit: 2
|
||||
|
||||
- command:
|
||||
- "python -m pip install tox"
|
||||
- "tox -e py37,codecov"
|
||||
label: ":python: 3.7 / SQLite"
|
||||
env:
|
||||
TRIAL_FLAGS: "-j 2"
|
||||
plugins:
|
||||
- docker#v3.0.1:
|
||||
image: "python:3.7"
|
||||
propagate-environment: true
|
||||
retry:
|
||||
automatic:
|
||||
- exit_status: -1
|
||||
limit: 2
|
||||
- exit_status: 2
|
||||
limit: 2
|
||||
|
||||
- label: ":python: 3.5 / :postgres: 9.5"
|
||||
agents:
|
||||
queue: "medium"
|
||||
env:
|
||||
TRIAL_FLAGS: "-j 8"
|
||||
command:
|
||||
- "bash -c 'python -m pip install tox && python -m tox -e py35-postgres,codecov'"
|
||||
plugins:
|
||||
- docker-compose#v2.1.0:
|
||||
run: testenv
|
||||
config:
|
||||
- .buildkite/docker-compose.py35.pg95.yaml
|
||||
retry:
|
||||
automatic:
|
||||
- exit_status: -1
|
||||
limit: 2
|
||||
- exit_status: 2
|
||||
limit: 2
|
||||
|
||||
- label: ":python: 3.7 / :postgres: 9.5"
|
||||
agents:
|
||||
queue: "medium"
|
||||
env:
|
||||
TRIAL_FLAGS: "-j 8"
|
||||
command:
|
||||
- "bash -c 'python -m pip install tox && python -m tox -e py37-postgres,codecov'"
|
||||
plugins:
|
||||
- docker-compose#v2.1.0:
|
||||
run: testenv
|
||||
config:
|
||||
- .buildkite/docker-compose.py37.pg95.yaml
|
||||
retry:
|
||||
automatic:
|
||||
- exit_status: -1
|
||||
limit: 2
|
||||
- exit_status: 2
|
||||
limit: 2
|
||||
|
||||
- label: ":python: 3.7 / :postgres: 11"
|
||||
agents:
|
||||
queue: "medium"
|
||||
env:
|
||||
TRIAL_FLAGS: "-j 8"
|
||||
command:
|
||||
- "bash -c 'python -m pip install tox && python -m tox -e py37-postgres,codecov'"
|
||||
plugins:
|
||||
- docker-compose#v2.1.0:
|
||||
run: testenv
|
||||
config:
|
||||
- .buildkite/docker-compose.py37.pg11.yaml
|
||||
retry:
|
||||
automatic:
|
||||
- exit_status: -1
|
||||
limit: 2
|
||||
- exit_status: 2
|
||||
limit: 2
|
||||
|
||||
|
||||
- label: "SyTest - :python: 3.5 / SQLite / Monolith"
|
||||
agents:
|
||||
queue: "medium"
|
||||
command:
|
||||
- "bash .buildkite/merge_base_branch.sh"
|
||||
- "bash /synapse_sytest.sh"
|
||||
plugins:
|
||||
- docker#v3.0.1:
|
||||
image: "matrixdotorg/sytest-synapse:py35"
|
||||
propagate-environment: true
|
||||
always-pull: true
|
||||
workdir: "/src"
|
||||
retry:
|
||||
automatic:
|
||||
- exit_status: -1
|
||||
limit: 2
|
||||
- exit_status: 2
|
||||
limit: 2
|
||||
|
||||
- label: "SyTest - :python: 3.5 / :postgres: 9.6 / Monolith"
|
||||
agents:
|
||||
queue: "medium"
|
||||
env:
|
||||
POSTGRES: "1"
|
||||
command:
|
||||
- "bash .buildkite/merge_base_branch.sh"
|
||||
- "bash /synapse_sytest.sh"
|
||||
plugins:
|
||||
- docker#v3.0.1:
|
||||
image: "matrixdotorg/sytest-synapse:py35"
|
||||
propagate-environment: true
|
||||
always-pull: true
|
||||
workdir: "/src"
|
||||
retry:
|
||||
automatic:
|
||||
- exit_status: -1
|
||||
limit: 2
|
||||
- exit_status: 2
|
||||
limit: 2
|
||||
|
||||
- label: "SyTest - :python: 3.5 / :postgres: 9.6 / Workers"
|
||||
agents:
|
||||
queue: "medium"
|
||||
env:
|
||||
POSTGRES: "1"
|
||||
WORKERS: "1"
|
||||
BLACKLIST: "synapse-blacklist-with-workers"
|
||||
command:
|
||||
- "bash .buildkite/merge_base_branch.sh"
|
||||
- "bash -c 'cat /src/sytest-blacklist /src/.buildkite/worker-blacklist > /src/synapse-blacklist-with-workers'"
|
||||
- "bash /synapse_sytest.sh"
|
||||
plugins:
|
||||
- docker#v3.0.1:
|
||||
image: "matrixdotorg/sytest-synapse:py35"
|
||||
propagate-environment: true
|
||||
always-pull: true
|
||||
workdir: "/src"
|
||||
retry:
|
||||
automatic:
|
||||
- exit_status: -1
|
||||
limit: 2
|
||||
- exit_status: 2
|
||||
limit: 2
|
||||
@@ -1,8 +1,7 @@
|
||||
[run]
|
||||
branch = True
|
||||
parallel = True
|
||||
include=$TOP/synapse/*
|
||||
data_file = $TOP/.coverage
|
||||
include = synapse/*
|
||||
|
||||
[report]
|
||||
precision = 2
|
||||
|
||||
5
.gitignore
vendored
5
.gitignore
vendored
@@ -20,7 +20,6 @@ _trial_temp*/
|
||||
/*.signing.key
|
||||
/env/
|
||||
/homeserver*.yaml
|
||||
/logs
|
||||
/media_store/
|
||||
/uploads
|
||||
|
||||
@@ -30,9 +29,8 @@ _trial_temp*/
|
||||
/.vscode/
|
||||
|
||||
# build products
|
||||
!/.coveragerc
|
||||
/.coverage*
|
||||
/.mypy_cache/
|
||||
!/.coveragerc
|
||||
/.tox
|
||||
/build/
|
||||
/coverage.*
|
||||
@@ -40,3 +38,4 @@ _trial_temp*/
|
||||
/docs/build/
|
||||
/htmlcov
|
||||
/pip-wheel-metadata/
|
||||
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
Lay the groundwork for structured logging output.
|
||||
@@ -1 +0,0 @@
|
||||
Make Opentracing work in worker mode.
|
||||
@@ -1 +0,0 @@
|
||||
Update opentracing docs to use the unified `trace` method.
|
||||
@@ -1 +0,0 @@
|
||||
Add an admin API to purge old rooms from the database.
|
||||
@@ -1 +0,0 @@
|
||||
Add retry to well-known lookups if we have recently seen a valid well-known record for the server.
|
||||
@@ -1 +0,0 @@
|
||||
Pass opentracing contexts between servers when transmitting EDUs.
|
||||
@@ -1 +0,0 @@
|
||||
Opentracing for room and e2e keys.
|
||||
@@ -1 +0,0 @@
|
||||
Add unstable support for MSC2197 (filtered search requests over federation), in order to allow upcoming room directory query performance improvements.
|
||||
@@ -1 +0,0 @@
|
||||
Correctly retry all hosts returned from SRV when we fail to connect.
|
||||
@@ -1 +0,0 @@
|
||||
Remove shared secret registration from client/r0/register endpoint. Contributed by Awesome Technologies Innovationslabor GmbH.
|
||||
@@ -1 +0,0 @@
|
||||
Rework room and user statistics to separate current & historical rows, as well as track stats correctly.
|
||||
@@ -1 +0,0 @@
|
||||
Fix stack overflow when recovering an appservice which had an outage.
|
||||
@@ -1 +0,0 @@
|
||||
Refactor the Appservice scheduler code.
|
||||
@@ -1 +0,0 @@
|
||||
Drop some unused tables.
|
||||
@@ -1 +0,0 @@
|
||||
Add missing index on users_in_public_rooms to improve the performance of directory queries.
|
||||
@@ -1 +0,0 @@
|
||||
Add config option to sign remote key query responses with a separate key.
|
||||
@@ -1 +0,0 @@
|
||||
Improve the logging when we have an error when fetching signing keys.
|
||||
@@ -1 +0,0 @@
|
||||
Add support for config templating.
|
||||
@@ -1 +0,0 @@
|
||||
Users with the type of "support" or "bot" are no longer required to consent.
|
||||
@@ -1 +0,0 @@
|
||||
Let synctl accept a directory of config files.
|
||||
@@ -1 +0,0 @@
|
||||
Increase max display name size to 256.
|
||||
@@ -1 +0,0 @@
|
||||
Fix error message which referred to public_base_url instead of public_baseurl. Thanks to @aaronraimist for the fix!
|
||||
@@ -1 +0,0 @@
|
||||
Add support for database engine-specific schema deltas, based on file extension.
|
||||
@@ -1 +0,0 @@
|
||||
Add admin API endpoint for getting whether or not a user is a server administrator.
|
||||
@@ -1 +0,0 @@
|
||||
Fix a cache-invalidation bug for worker-based deployments.
|
||||
@@ -1 +0,0 @@
|
||||
Update Buildkite pipeline to use plugins instead of buildkite-agent commands.
|
||||
@@ -1 +0,0 @@
|
||||
Add link in sample config to the logging config schema.
|
||||
@@ -1 +0,0 @@
|
||||
Remove unnecessary parentheses in return statements.
|
||||
@@ -1 +0,0 @@
|
||||
Remove unused jenkins/prepare_sytest.sh file.
|
||||
@@ -1 +0,0 @@
|
||||
Move Buildkite pipeline config to the pipelines repo.
|
||||
@@ -17,7 +17,7 @@ By default, the image expects a single volume, located at ``/data``, that will h
|
||||
* the appservices configuration.
|
||||
|
||||
You are free to use separate volumes depending on storage endpoints at your
|
||||
disposal. For instance, ``/data/media`` could be stored on a large but low
|
||||
disposal. For instance, ``/data/media`` coud be stored on a large but low
|
||||
performance hdd storage while other files could be stored on high performance
|
||||
endpoints.
|
||||
|
||||
@@ -27,8 +27,8 @@ configuration file there. Multiple application services are supported.
|
||||
|
||||
## Generating a configuration file
|
||||
|
||||
The first step is to generate a valid config file. To do this, you can run the
|
||||
image with the `generate` command line option.
|
||||
The first step is to genearte a valid config file. To do this, you can run the
|
||||
image with the `generate` commandline option.
|
||||
|
||||
You will need to specify values for the `SYNAPSE_SERVER_NAME` and
|
||||
`SYNAPSE_REPORT_STATS` environment variable, and mount a docker volume to store
|
||||
@@ -59,7 +59,7 @@ The following environment variables are supported in `generate` mode:
|
||||
* `SYNAPSE_CONFIG_PATH`: path to the file to be generated. Defaults to
|
||||
`<SYNAPSE_CONFIG_DIR>/homeserver.yaml`.
|
||||
* `SYNAPSE_DATA_DIR`: where the generated config will put persistent data
|
||||
such as the database and media store. Defaults to `/data`.
|
||||
such as the datatase and media store. Defaults to `/data`.
|
||||
* `UID`, `GID`: the user id and group id to use for creating the data
|
||||
directories. Defaults to `991`, `991`.
|
||||
|
||||
@@ -115,7 +115,7 @@ not given).
|
||||
|
||||
To migrate from a dynamic configuration file to a static one, run the docker
|
||||
container once with the environment variables set, and `migrate_config`
|
||||
command line option. For example:
|
||||
commandline option. For example:
|
||||
|
||||
```
|
||||
docker run -it --rm \
|
||||
|
||||
@@ -1,18 +0,0 @@
|
||||
Purge room API
|
||||
==============
|
||||
|
||||
This API will remove all trace of a room from your database.
|
||||
|
||||
All local users must have left the room before it can be removed.
|
||||
|
||||
The API is:
|
||||
|
||||
```
|
||||
POST /_synapse/admin/v1/purge_room
|
||||
|
||||
{
|
||||
"room_id": "!room:id"
|
||||
}
|
||||
```
|
||||
|
||||
You must authenticate using the access token of an admin user.
|
||||
@@ -86,25 +86,6 @@ with a body of:
|
||||
including an ``access_token`` of a server admin.
|
||||
|
||||
|
||||
Get whether a user is a server administrator or not
|
||||
===================================================
|
||||
|
||||
|
||||
The api is::
|
||||
|
||||
GET /_synapse/admin/v1/users/<user_id>/admin
|
||||
|
||||
including an ``access_token`` of a server admin.
|
||||
|
||||
A response body like the following is returned:
|
||||
|
||||
.. code:: json
|
||||
|
||||
{
|
||||
"admin": true
|
||||
}
|
||||
|
||||
|
||||
Change whether a user is a server administrator or not
|
||||
======================================================
|
||||
|
||||
|
||||
@@ -32,7 +32,7 @@ It is up to the remote server to decide what it does with the spans
|
||||
it creates. This is called the sampling policy and it can be configured
|
||||
through Jaeger's settings.
|
||||
|
||||
For OpenTracing concepts see
|
||||
For OpenTracing concepts see
|
||||
https://opentracing.io/docs/overview/what-is-tracing/.
|
||||
|
||||
For more information about Jaeger's implementation see
|
||||
@@ -79,7 +79,7 @@ Homeserver whitelisting
|
||||
|
||||
The homeserver whitelist is configured using regular expressions. A list of regular
|
||||
expressions can be given and their union will be compared when propagating any
|
||||
spans contexts to another homeserver.
|
||||
spans contexts to another homeserver.
|
||||
|
||||
Though it's mostly safe to send and receive span contexts to and from
|
||||
untrusted users since span contexts are usually opaque ids it can lead to
|
||||
@@ -92,29 +92,6 @@ two problems, namely:
|
||||
but that doesn't prevent another server sending you baggage which will be logged
|
||||
to OpenTracing's logs.
|
||||
|
||||
==========
|
||||
EDU FORMAT
|
||||
==========
|
||||
|
||||
EDUs can contain tracing data in their content. This is not specced but
|
||||
it could be of interest for other homeservers.
|
||||
|
||||
EDU format (if you're using jaeger):
|
||||
|
||||
.. code-block:: json
|
||||
|
||||
{
|
||||
"edu_type": "type",
|
||||
"content": {
|
||||
"org.matrix.opentracing_context": {
|
||||
"uber-trace-id": "fe57cf3e65083289"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Though you don't have to use jaeger you must inject the span context into
|
||||
`org.matrix.opentracing_context` using the opentracing `Format.TEXT_MAP` inject method.
|
||||
|
||||
==================
|
||||
Configuring Jaeger
|
||||
==================
|
||||
|
||||
@@ -1,136 +0,0 @@
|
||||
Room and User Statistics
|
||||
========================
|
||||
|
||||
Synapse maintains room and user statistics (as well as a cache of room state),
|
||||
in various tables.
|
||||
|
||||
These can be used for administrative purposes but are also used when generating
|
||||
the public room directory. If these tables get stale or out of sync (possibly
|
||||
after database corruption), you may wish to regenerate them.
|
||||
|
||||
|
||||
# Synapse Administrator Documentation
|
||||
|
||||
## Various SQL scripts that you may find useful
|
||||
|
||||
### Delete stats, including historical stats
|
||||
|
||||
```sql
|
||||
DELETE FROM room_stats_current;
|
||||
DELETE FROM room_stats_historical;
|
||||
DELETE FROM user_stats_current;
|
||||
DELETE FROM user_stats_historical;
|
||||
```
|
||||
|
||||
### Regenerate stats (all subjects)
|
||||
|
||||
```sql
|
||||
BEGIN;
|
||||
DELETE FROM stats_incremental_position;
|
||||
INSERT INTO stats_incremental_position (
|
||||
state_delta_stream_id,
|
||||
total_events_min_stream_ordering,
|
||||
total_events_max_stream_ordering,
|
||||
is_background_contract
|
||||
) VALUES (NULL, NULL, NULL, FALSE), (NULL, NULL, NULL, TRUE);
|
||||
COMMIT;
|
||||
|
||||
DELETE FROM room_stats_current;
|
||||
DELETE FROM user_stats_current;
|
||||
```
|
||||
|
||||
then follow the steps below for **'Regenerate stats (missing subjects only)'**
|
||||
|
||||
### Regenerate stats (missing subjects only)
|
||||
|
||||
```sql
|
||||
-- Set up staging tables
|
||||
-- we depend on current_state_events_membership because this is used
|
||||
-- in our counting.
|
||||
INSERT INTO background_updates (update_name, progress_json) VALUES
|
||||
('populate_stats_prepare', '{}', 'current_state_events_membership');
|
||||
|
||||
-- Run through each room and update stats
|
||||
INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
|
||||
('populate_stats_process_rooms', '{}', 'populate_stats_prepare');
|
||||
|
||||
-- Run through each user and update stats.
|
||||
INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
|
||||
('populate_stats_process_users', '{}', 'populate_stats_process_rooms');
|
||||
|
||||
-- Clean up staging tables
|
||||
INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
|
||||
('populate_stats_cleanup', '{}', 'populate_stats_process_users');
|
||||
```
|
||||
|
||||
then **restart Synapse**.
|
||||
|
||||
|
||||
# Synapse Developer Documentation
|
||||
|
||||
## High-Level Concepts
|
||||
|
||||
### Definitions
|
||||
|
||||
* **subject**: Something we are tracking stats about – currently a room or user.
|
||||
* **current row**: An entry for a subject in the appropriate current statistics
|
||||
table. Each subject can have only one.
|
||||
* **historical row**: An entry for a subject in the appropriate historical
|
||||
statistics table. Each subject can have any number of these.
|
||||
|
||||
### Overview
|
||||
|
||||
Stats are maintained as time series. There are two kinds of column:
|
||||
|
||||
* absolute columns – where the value is correct for the time given by `end_ts`
|
||||
in the stats row. (Imagine a line graph for these values)
|
||||
* They can also be thought of as 'gauges' in Prometheus, if you are familiar.
|
||||
* per-slice columns – where the value corresponds to how many of the occurrences
|
||||
occurred within the time slice given by `(end_ts − bucket_size)…end_ts`
|
||||
or `start_ts…end_ts`. (Imagine a histogram for these values)
|
||||
|
||||
Currently, only absolute columns are in use.
|
||||
|
||||
Stats are maintained in two tables (for each type): current and historical.
|
||||
|
||||
Current stats correspond to the present values. Each subject can only have one
|
||||
entry.
|
||||
|
||||
Historical stats correspond to values in the past. Subjects may have multiple
|
||||
entries.
|
||||
|
||||
## Concepts around the management of stats
|
||||
|
||||
### current rows
|
||||
|
||||
Current rows contain the most up-to-date statistics for a room.
|
||||
They only contain absolute columns
|
||||
|
||||
#### incomplete current rows
|
||||
|
||||
There are also **incomplete** current rows, which are current rows that do not
|
||||
contain a full count yet – this is because they are waiting for the regeneration
|
||||
process to give them an initial count. Incomplete current rows DO NOT contain
|
||||
correct and up-to-date values. As such, *incomplete rows are not old-collected*.
|
||||
Instead, old incomplete rows will be extended so they are no longer old.
|
||||
|
||||
### historical rows
|
||||
|
||||
Historical rows can always be considered to be valid for the time slice and
|
||||
end time specified. (This, of course, assumes a lack of defects in the code
|
||||
to track the statistics, and assumes integrity of the database).
|
||||
|
||||
Even still, there are two considerations that we may need to bear in mind:
|
||||
|
||||
* historical rows will not exist for every time slice – they will be omitted
|
||||
if there were no changes. In this case, the following assumptions can be
|
||||
made to interpolate/recreate missing rows:
|
||||
- absolute fields have the same values as in the preceding row
|
||||
- per-slice fields are zero (`0`)
|
||||
* historical rows will not be retained forever – rows older than a configurable
|
||||
time will be purged.
|
||||
|
||||
#### purge
|
||||
|
||||
The purging of historical rows is not yet implemented.
|
||||
|
||||
@@ -205,9 +205,9 @@ listeners:
|
||||
#
|
||||
- port: 8008
|
||||
tls: false
|
||||
bind_addresses: ['::1', '127.0.0.1']
|
||||
type: http
|
||||
x_forwarded: true
|
||||
bind_addresses: ['::1', '127.0.0.1']
|
||||
|
||||
resources:
|
||||
- names: [client, federation]
|
||||
@@ -392,10 +392,10 @@ listeners:
|
||||
# permission to listen on port 80.
|
||||
#
|
||||
acme:
|
||||
# ACME support is disabled by default. Set this to `true` and uncomment
|
||||
# tls_certificate_path and tls_private_key_path above to enable it.
|
||||
# ACME support is disabled by default. Uncomment the following line
|
||||
# (and tls_certificate_path and tls_private_key_path above) to enable it.
|
||||
#
|
||||
enabled: False
|
||||
#enabled: true
|
||||
|
||||
# Endpoint to use to request certificates. If you only want to test,
|
||||
# use Let's Encrypt's staging url:
|
||||
@@ -406,17 +406,17 @@ acme:
|
||||
# Port number to listen on for the HTTP-01 challenge. Change this if
|
||||
# you are forwarding connections through Apache/Nginx/etc.
|
||||
#
|
||||
port: 80
|
||||
#port: 80
|
||||
|
||||
# Local addresses to listen on for incoming connections.
|
||||
# Again, you may want to change this if you are forwarding connections
|
||||
# through Apache/Nginx/etc.
|
||||
#
|
||||
bind_addresses: ['::', '0.0.0.0']
|
||||
#bind_addresses: ['::', '0.0.0.0']
|
||||
|
||||
# How many days remaining on a certificate before it is renewed.
|
||||
#
|
||||
reprovision_threshold: 30
|
||||
#reprovision_threshold: 30
|
||||
|
||||
# The domain that the certificate should be for. Normally this
|
||||
# should be the same as your Matrix domain (i.e., 'server_name'), but,
|
||||
@@ -430,7 +430,7 @@ acme:
|
||||
#
|
||||
# If not set, defaults to your 'server_name'.
|
||||
#
|
||||
domain: matrix.example.com
|
||||
#domain: matrix.example.com
|
||||
|
||||
# file to use for the account key. This will be generated if it doesn't
|
||||
# exist.
|
||||
@@ -485,8 +485,7 @@ database:
|
||||
|
||||
## Logging ##
|
||||
|
||||
# A yaml python logging config file as described by
|
||||
# https://docs.python.org/3.7/library/logging.config.html#configuration-dictionary-schema
|
||||
# A yaml python logging config file
|
||||
#
|
||||
log_config: "CONFDIR/SERVERNAME.log.config"
|
||||
|
||||
@@ -1028,14 +1027,6 @@ signing_key_path: "CONFDIR/SERVERNAME.signing.key"
|
||||
#
|
||||
#trusted_key_servers:
|
||||
# - server_name: "matrix.org"
|
||||
#
|
||||
|
||||
# The signing keys to use when acting as a trusted key server. If not specified
|
||||
# defaults to the server signing key.
|
||||
#
|
||||
# Can contain multiple keys, one per line.
|
||||
#
|
||||
#key_server_signing_keys_path: "key_server_signing_keys.key"
|
||||
|
||||
|
||||
# Enable SAML2 for registration and login. Uses pysaml2.
|
||||
|
||||
@@ -1,83 +0,0 @@
|
||||
# Structured Logging
|
||||
|
||||
A structured logging system can be useful when your logs are destined for a machine to parse and process. By maintaining its machine-readable characteristics, it enables more efficient searching and aggregations when consumed by software such as the "ELK stack".
|
||||
|
||||
Synapse's structured logging system is configured via the file that Synapse's `log_config` config option points to. The file must be YAML and contain `structured: true`. It must contain a list of "drains" (places where logs go to).
|
||||
|
||||
A structured logging configuration looks similar to the following:
|
||||
|
||||
```yaml
|
||||
structured: true
|
||||
|
||||
loggers:
|
||||
synapse:
|
||||
level: INFO
|
||||
synapse.storage.SQL:
|
||||
level: WARNING
|
||||
|
||||
drains:
|
||||
console:
|
||||
type: console
|
||||
location: stdout
|
||||
file:
|
||||
type: file_json
|
||||
location: homeserver.log
|
||||
```
|
||||
|
||||
The above logging config will set Synapse as 'INFO' logging level by default, with the SQL layer at 'WARNING', and will have two logging drains (to the console and to a file, stored as JSON).
|
||||
|
||||
## Drain Types
|
||||
|
||||
Drain types can be specified by the `type` key.
|
||||
|
||||
### `console`
|
||||
|
||||
Outputs human-readable logs to the console.
|
||||
|
||||
Arguments:
|
||||
|
||||
- `location`: Either `stdout` or `stderr`.
|
||||
|
||||
### `console_json`
|
||||
|
||||
Outputs machine-readable JSON logs to the console.
|
||||
|
||||
Arguments:
|
||||
|
||||
- `location`: Either `stdout` or `stderr`.
|
||||
|
||||
### `console_json_terse`
|
||||
|
||||
Outputs machine-readable JSON logs to the console, separated by newlines. This
|
||||
format is not designed to be read and re-formatted into human-readable text, but
|
||||
is optimal for a logging aggregation system.
|
||||
|
||||
Arguments:
|
||||
|
||||
- `location`: Either `stdout` or `stderr`.
|
||||
|
||||
### `file`
|
||||
|
||||
Outputs human-readable logs to a file.
|
||||
|
||||
Arguments:
|
||||
|
||||
- `location`: An absolute path to the file to log to.
|
||||
|
||||
### `file_json`
|
||||
|
||||
Outputs machine-readable logs to a file.
|
||||
|
||||
Arguments:
|
||||
|
||||
- `location`: An absolute path to the file to log to.
|
||||
|
||||
### `network_json_terse`
|
||||
|
||||
Delivers machine-readable JSON logs to a log aggregator over TCP. This is
|
||||
compatible with LogStash's TCP input with the codec set to `json_lines`.
|
||||
|
||||
Arguments:
|
||||
|
||||
- `host`: Hostname or IP address of the log aggregator.
|
||||
- `port`: Numerical port to contact on the host.
|
||||
16
jenkins/prepare_synapse.sh
Executable file
16
jenkins/prepare_synapse.sh
Executable file
@@ -0,0 +1,16 @@
|
||||
#! /bin/bash
|
||||
|
||||
set -eux
|
||||
|
||||
cd "`dirname $0`/.."
|
||||
|
||||
TOX_DIR=$WORKSPACE/.tox
|
||||
|
||||
mkdir -p $TOX_DIR
|
||||
|
||||
if ! [ $TOX_DIR -ef .tox ]; then
|
||||
ln -s "$TOX_DIR" .tox
|
||||
fi
|
||||
|
||||
# set up the virtualenv
|
||||
tox -e py27 --notest -v
|
||||
@@ -276,25 +276,25 @@ class Auth(object):
|
||||
self.get_access_token_from_request(request)
|
||||
)
|
||||
if app_service is None:
|
||||
return None, None
|
||||
return (None, None)
|
||||
|
||||
if app_service.ip_range_whitelist:
|
||||
ip_address = IPAddress(self.hs.get_ip_from_request(request))
|
||||
if ip_address not in app_service.ip_range_whitelist:
|
||||
return None, None
|
||||
return (None, None)
|
||||
|
||||
if b"user_id" not in request.args:
|
||||
return app_service.sender, app_service
|
||||
return (app_service.sender, app_service)
|
||||
|
||||
user_id = request.args[b"user_id"][0].decode("utf8")
|
||||
if app_service.sender == user_id:
|
||||
return app_service.sender, app_service
|
||||
return (app_service.sender, app_service)
|
||||
|
||||
if not app_service.is_interested_in_user(user_id):
|
||||
raise AuthError(403, "Application service cannot masquerade as this user.")
|
||||
if not (yield self.store.get_user_by_id(user_id)):
|
||||
raise AuthError(403, "Application service has not registered this user")
|
||||
return user_id, app_service
|
||||
return (user_id, app_service)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_user_by_access_token(self, token, rights="access"):
|
||||
@@ -694,7 +694,7 @@ class Auth(object):
|
||||
# * The user is a guest user, and has joined the room
|
||||
# else it will throw.
|
||||
member_event = yield self.check_user_was_in_room(room_id, user_id)
|
||||
return member_event.membership, member_event.event_id
|
||||
return (member_event.membership, member_event.event_id)
|
||||
except AuthError:
|
||||
visibility = yield self.state.get_current_state(
|
||||
room_id, EventTypes.RoomHistoryVisibility, ""
|
||||
@@ -703,7 +703,7 @@ class Auth(object):
|
||||
visibility
|
||||
and visibility.content["history_visibility"] == "world_readable"
|
||||
):
|
||||
return Membership.JOIN, None
|
||||
return (Membership.JOIN, None)
|
||||
return
|
||||
raise AuthError(
|
||||
403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
|
||||
|
||||
@@ -122,8 +122,7 @@ class UserTypes(object):
|
||||
"""
|
||||
|
||||
SUPPORT = "support"
|
||||
BOT = "bot"
|
||||
ALL_USER_TYPES = (SUPPORT, BOT)
|
||||
ALL_USER_TYPES = (SUPPORT,)
|
||||
|
||||
|
||||
class RelationTypes(object):
|
||||
|
||||
@@ -36,20 +36,18 @@ from synapse.util.versionstring import get_version_string
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# list of tuples of function, args list, kwargs dict
|
||||
_sighup_callbacks = []
|
||||
|
||||
|
||||
def register_sighup(func, *args, **kwargs):
|
||||
def register_sighup(func):
|
||||
"""
|
||||
Register a function to be called when a SIGHUP occurs.
|
||||
|
||||
Args:
|
||||
func (function): Function to be called when sent a SIGHUP signal.
|
||||
Will be called with a single default argument, the homeserver.
|
||||
*args, **kwargs: args and kwargs to be passed to the target function.
|
||||
Will be called with a single argument, the homeserver.
|
||||
"""
|
||||
_sighup_callbacks.append((func, args, kwargs))
|
||||
_sighup_callbacks.append(func)
|
||||
|
||||
|
||||
def start_worker_reactor(appname, config, run_command=reactor.run):
|
||||
@@ -250,8 +248,8 @@ def start(hs, listeners=None):
|
||||
# we're not using systemd.
|
||||
sdnotify(b"RELOADING=1")
|
||||
|
||||
for i, args, kwargs in _sighup_callbacks:
|
||||
i(hs, *args, **kwargs)
|
||||
for i in _sighup_callbacks:
|
||||
i(hs)
|
||||
|
||||
sdnotify(b"READY=1")
|
||||
|
||||
|
||||
@@ -227,6 +227,8 @@ def start(config_options):
|
||||
config.start_pushers = False
|
||||
config.send_federation = False
|
||||
|
||||
setup_logging(config, use_worker_options=True)
|
||||
|
||||
synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
|
||||
database_engine = create_engine(config.database_config)
|
||||
@@ -239,8 +241,6 @@ def start(config_options):
|
||||
database_engine=database_engine,
|
||||
)
|
||||
|
||||
setup_logging(ss, config, use_worker_options=True)
|
||||
|
||||
ss.setup()
|
||||
|
||||
# We use task.react as the basic run command as it correctly handles tearing
|
||||
|
||||
@@ -141,6 +141,8 @@ def start(config_options):
|
||||
|
||||
assert config.worker_app == "synapse.app.appservice"
|
||||
|
||||
setup_logging(config, use_worker_options=True)
|
||||
|
||||
events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
|
||||
database_engine = create_engine(config.database_config)
|
||||
@@ -165,8 +167,6 @@ def start(config_options):
|
||||
database_engine=database_engine,
|
||||
)
|
||||
|
||||
setup_logging(ps, config, use_worker_options=True)
|
||||
|
||||
ps.setup()
|
||||
reactor.addSystemEventTrigger(
|
||||
"before", "startup", _base.start, ps, config.worker_listeners
|
||||
|
||||
@@ -179,6 +179,8 @@ def start(config_options):
|
||||
|
||||
assert config.worker_app == "synapse.app.client_reader"
|
||||
|
||||
setup_logging(config, use_worker_options=True)
|
||||
|
||||
events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
|
||||
database_engine = create_engine(config.database_config)
|
||||
@@ -191,8 +193,6 @@ def start(config_options):
|
||||
database_engine=database_engine,
|
||||
)
|
||||
|
||||
setup_logging(ss, config, use_worker_options=True)
|
||||
|
||||
ss.setup()
|
||||
reactor.addSystemEventTrigger(
|
||||
"before", "startup", _base.start, ss, config.worker_listeners
|
||||
|
||||
@@ -175,6 +175,8 @@ def start(config_options):
|
||||
|
||||
assert config.worker_replication_http_port is not None
|
||||
|
||||
setup_logging(config, use_worker_options=True)
|
||||
|
||||
# This should only be done on the user directory worker or the master
|
||||
config.update_user_directory = False
|
||||
|
||||
@@ -190,8 +192,6 @@ def start(config_options):
|
||||
database_engine=database_engine,
|
||||
)
|
||||
|
||||
setup_logging(ss, config, use_worker_options=True)
|
||||
|
||||
ss.setup()
|
||||
reactor.addSystemEventTrigger(
|
||||
"before", "startup", _base.start, ss, config.worker_listeners
|
||||
|
||||
@@ -160,6 +160,8 @@ def start(config_options):
|
||||
|
||||
assert config.worker_app == "synapse.app.federation_reader"
|
||||
|
||||
setup_logging(config, use_worker_options=True)
|
||||
|
||||
events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
|
||||
database_engine = create_engine(config.database_config)
|
||||
@@ -172,8 +174,6 @@ def start(config_options):
|
||||
database_engine=database_engine,
|
||||
)
|
||||
|
||||
setup_logging(ss, config, use_worker_options=True)
|
||||
|
||||
ss.setup()
|
||||
reactor.addSystemEventTrigger(
|
||||
"before", "startup", _base.start, ss, config.worker_listeners
|
||||
|
||||
@@ -171,6 +171,8 @@ def start(config_options):
|
||||
|
||||
assert config.worker_app == "synapse.app.federation_sender"
|
||||
|
||||
setup_logging(config, use_worker_options=True)
|
||||
|
||||
events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
|
||||
database_engine = create_engine(config.database_config)
|
||||
@@ -195,8 +197,6 @@ def start(config_options):
|
||||
database_engine=database_engine,
|
||||
)
|
||||
|
||||
setup_logging(ss, config, use_worker_options=True)
|
||||
|
||||
ss.setup()
|
||||
reactor.addSystemEventTrigger(
|
||||
"before", "startup", _base.start, ss, config.worker_listeners
|
||||
|
||||
@@ -70,12 +70,12 @@ class PresenceStatusStubServlet(RestServlet):
|
||||
except HttpResponseException as e:
|
||||
raise e.to_synapse_error()
|
||||
|
||||
return 200, result
|
||||
return (200, result)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_PUT(self, request, user_id):
|
||||
yield self.auth.get_user_by_req(request)
|
||||
return 200, {}
|
||||
return (200, {})
|
||||
|
||||
|
||||
class KeyUploadServlet(RestServlet):
|
||||
@@ -126,11 +126,11 @@ class KeyUploadServlet(RestServlet):
|
||||
self.main_uri + request.uri.decode("ascii"), body, headers=headers
|
||||
)
|
||||
|
||||
return 200, result
|
||||
return (200, result)
|
||||
else:
|
||||
# Just interested in counts.
|
||||
result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
|
||||
return 200, {"one_time_key_counts": result}
|
||||
return (200, {"one_time_key_counts": result})
|
||||
|
||||
|
||||
class FrontendProxySlavedStore(
|
||||
@@ -232,6 +232,8 @@ def start(config_options):
|
||||
|
||||
assert config.worker_main_http_uri is not None
|
||||
|
||||
setup_logging(config, use_worker_options=True)
|
||||
|
||||
events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
|
||||
database_engine = create_engine(config.database_config)
|
||||
@@ -244,8 +246,6 @@ def start(config_options):
|
||||
database_engine=database_engine,
|
||||
)
|
||||
|
||||
setup_logging(ss, config, use_worker_options=True)
|
||||
|
||||
ss.setup()
|
||||
reactor.addSystemEventTrigger(
|
||||
"before", "startup", _base.start, ss, config.worker_listeners
|
||||
|
||||
@@ -341,6 +341,8 @@ def setup(config_options):
|
||||
# generating config files and shouldn't try to continue.
|
||||
sys.exit(0)
|
||||
|
||||
synapse.config.logger.setup_logging(config, use_worker_options=False)
|
||||
|
||||
events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
|
||||
database_engine = create_engine(config.database_config)
|
||||
@@ -354,8 +356,6 @@ def setup(config_options):
|
||||
database_engine=database_engine,
|
||||
)
|
||||
|
||||
synapse.config.logger.setup_logging(hs, config, use_worker_options=False)
|
||||
|
||||
logger.info("Preparing database: %s...", config.database_config["name"])
|
||||
|
||||
try:
|
||||
|
||||
@@ -155,6 +155,8 @@ def start(config_options):
|
||||
"Please add ``enable_media_repo: false`` to the main config\n"
|
||||
)
|
||||
|
||||
setup_logging(config, use_worker_options=True)
|
||||
|
||||
events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
|
||||
database_engine = create_engine(config.database_config)
|
||||
@@ -167,8 +169,6 @@ def start(config_options):
|
||||
database_engine=database_engine,
|
||||
)
|
||||
|
||||
setup_logging(ss, config, use_worker_options=True)
|
||||
|
||||
ss.setup()
|
||||
reactor.addSystemEventTrigger(
|
||||
"before", "startup", _base.start, ss, config.worker_listeners
|
||||
|
||||
@@ -184,6 +184,8 @@ def start(config_options):
|
||||
|
||||
assert config.worker_app == "synapse.app.pusher"
|
||||
|
||||
setup_logging(config, use_worker_options=True)
|
||||
|
||||
events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
|
||||
if config.start_pushers:
|
||||
@@ -208,8 +210,6 @@ def start(config_options):
|
||||
database_engine=database_engine,
|
||||
)
|
||||
|
||||
setup_logging(ps, config, use_worker_options=True)
|
||||
|
||||
ps.setup()
|
||||
|
||||
def start():
|
||||
|
||||
@@ -435,6 +435,8 @@ def start(config_options):
|
||||
|
||||
assert config.worker_app == "synapse.app.synchrotron"
|
||||
|
||||
setup_logging(config, use_worker_options=True)
|
||||
|
||||
synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
|
||||
database_engine = create_engine(config.database_config)
|
||||
@@ -448,8 +450,6 @@ def start(config_options):
|
||||
application_service_handler=SynchrotronApplicationService(),
|
||||
)
|
||||
|
||||
setup_logging(ss, config, use_worker_options=True)
|
||||
|
||||
ss.setup()
|
||||
reactor.addSystemEventTrigger(
|
||||
"before", "startup", _base.start, ss, config.worker_listeners
|
||||
|
||||
@@ -197,6 +197,8 @@ def start(config_options):
|
||||
|
||||
assert config.worker_app == "synapse.app.user_dir"
|
||||
|
||||
setup_logging(config, use_worker_options=True)
|
||||
|
||||
events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
|
||||
database_engine = create_engine(config.database_config)
|
||||
@@ -221,8 +223,6 @@ def start(config_options):
|
||||
database_engine=database_engine,
|
||||
)
|
||||
|
||||
setup_logging(ss, config, use_worker_options=True)
|
||||
|
||||
ss.setup()
|
||||
reactor.addSystemEventTrigger(
|
||||
"before", "startup", _base.start, ss, config.worker_listeners
|
||||
|
||||
@@ -70,37 +70,35 @@ class ApplicationServiceScheduler(object):
|
||||
self.store = hs.get_datastore()
|
||||
self.as_api = hs.get_application_service_api()
|
||||
|
||||
self.txn_ctrl = _TransactionController(self.clock, self.store, self.as_api)
|
||||
def create_recoverer(service, callback):
|
||||
return _Recoverer(self.clock, self.store, self.as_api, service, callback)
|
||||
|
||||
self.txn_ctrl = _TransactionController(
|
||||
self.clock, self.store, self.as_api, create_recoverer
|
||||
)
|
||||
self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def start(self):
|
||||
logger.info("Starting appservice scheduler")
|
||||
|
||||
# check for any DOWN ASes and start recoverers for them.
|
||||
services = yield self.store.get_appservices_by_state(
|
||||
ApplicationServiceState.DOWN
|
||||
recoverers = yield _Recoverer.start(
|
||||
self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered
|
||||
)
|
||||
|
||||
for service in services:
|
||||
self.txn_ctrl.start_recoverer(service)
|
||||
self.txn_ctrl.add_recoverers(recoverers)
|
||||
|
||||
def submit_event_for_as(self, service, event):
|
||||
self.queuer.enqueue(service, event)
|
||||
|
||||
|
||||
class _ServiceQueuer(object):
|
||||
"""Queue of events waiting to be sent to appservices.
|
||||
|
||||
Groups events into transactions per-appservice, and sends them on to the
|
||||
TransactionController. Makes sure that we only have one transaction in flight per
|
||||
appservice at a given time.
|
||||
"""Queues events for the same application service together, sending
|
||||
transactions as soon as possible. Once a transaction is sent successfully,
|
||||
this schedules any other events in the queue to run.
|
||||
"""
|
||||
|
||||
def __init__(self, txn_ctrl, clock):
|
||||
self.queued_events = {} # dict of {service_id: [events]}
|
||||
|
||||
# the appservices which currently have a transaction in flight
|
||||
self.requests_in_flight = set()
|
||||
self.txn_ctrl = txn_ctrl
|
||||
self.clock = clock
|
||||
@@ -138,29 +136,13 @@ class _ServiceQueuer(object):
|
||||
|
||||
|
||||
class _TransactionController(object):
|
||||
"""Transaction manager.
|
||||
|
||||
Builds AppServiceTransactions and runs their lifecycle. Also starts a Recoverer
|
||||
if a transaction fails.
|
||||
|
||||
(Note we have only have one of these in the homeserver.)
|
||||
|
||||
Args:
|
||||
clock (synapse.util.Clock):
|
||||
store (synapse.storage.DataStore):
|
||||
as_api (synapse.appservice.api.ApplicationServiceApi):
|
||||
"""
|
||||
|
||||
def __init__(self, clock, store, as_api):
|
||||
def __init__(self, clock, store, as_api, recoverer_fn):
|
||||
self.clock = clock
|
||||
self.store = store
|
||||
self.as_api = as_api
|
||||
|
||||
# map from service id to recoverer instance
|
||||
self.recoverers = {}
|
||||
|
||||
# for UTs
|
||||
self.RECOVERER_CLASS = _Recoverer
|
||||
self.recoverer_fn = recoverer_fn
|
||||
# keep track of how many recoverers there are
|
||||
self.recoverers = []
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def send(self, service, events):
|
||||
@@ -172,45 +154,42 @@ class _TransactionController(object):
|
||||
if sent:
|
||||
yield txn.complete(self.store)
|
||||
else:
|
||||
run_in_background(self._on_txn_fail, service)
|
||||
run_in_background(self._start_recoverer, service)
|
||||
except Exception:
|
||||
logger.exception("Error creating appservice transaction")
|
||||
run_in_background(self._on_txn_fail, service)
|
||||
run_in_background(self._start_recoverer, service)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_recovered(self, recoverer):
|
||||
self.recoverers.remove(recoverer)
|
||||
logger.info(
|
||||
"Successfully recovered application service AS ID %s", recoverer.service.id
|
||||
)
|
||||
self.recoverers.pop(recoverer.service.id)
|
||||
logger.info("Remaining active recoverers: %s", len(self.recoverers))
|
||||
yield self.store.set_appservice_state(
|
||||
recoverer.service, ApplicationServiceState.UP
|
||||
)
|
||||
|
||||
def add_recoverers(self, recoverers):
|
||||
for r in recoverers:
|
||||
self.recoverers.append(r)
|
||||
if len(recoverers) > 0:
|
||||
logger.info("New active recoverers: %s", len(self.recoverers))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _on_txn_fail(self, service):
|
||||
def _start_recoverer(self, service):
|
||||
try:
|
||||
yield self.store.set_appservice_state(service, ApplicationServiceState.DOWN)
|
||||
self.start_recoverer(service)
|
||||
logger.info(
|
||||
"Application service falling behind. Starting recoverer. AS ID %s",
|
||||
service.id,
|
||||
)
|
||||
recoverer = self.recoverer_fn(service, self.on_recovered)
|
||||
self.add_recoverers([recoverer])
|
||||
recoverer.recover()
|
||||
except Exception:
|
||||
logger.exception("Error starting AS recoverer")
|
||||
|
||||
def start_recoverer(self, service):
|
||||
"""Start a Recoverer for the given service
|
||||
|
||||
Args:
|
||||
service (synapse.appservice.ApplicationService):
|
||||
"""
|
||||
logger.info("Starting recoverer for AS ID %s", service.id)
|
||||
assert service.id not in self.recoverers
|
||||
recoverer = self.RECOVERER_CLASS(
|
||||
self.clock, self.store, self.as_api, service, self.on_recovered
|
||||
)
|
||||
self.recoverers[service.id] = recoverer
|
||||
recoverer.recover()
|
||||
logger.info("Now %i active recoverers", len(self.recoverers))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _is_service_up(self, service):
|
||||
state = yield self.store.get_appservice_state(service)
|
||||
@@ -218,17 +197,18 @@ class _TransactionController(object):
|
||||
|
||||
|
||||
class _Recoverer(object):
|
||||
"""Manages retries and backoff for a DOWN appservice.
|
||||
|
||||
We have one of these for each appservice which is currently considered DOWN.
|
||||
|
||||
Args:
|
||||
clock (synapse.util.Clock):
|
||||
store (synapse.storage.DataStore):
|
||||
as_api (synapse.appservice.api.ApplicationServiceApi):
|
||||
service (synapse.appservice.ApplicationService): the service we are managing
|
||||
callback (callable[_Recoverer]): called once the service recovers.
|
||||
"""
|
||||
@staticmethod
|
||||
@defer.inlineCallbacks
|
||||
def start(clock, store, as_api, callback):
|
||||
services = yield store.get_appservices_by_state(ApplicationServiceState.DOWN)
|
||||
recoverers = [_Recoverer(clock, store, as_api, s, callback) for s in services]
|
||||
for r in recoverers:
|
||||
logger.info(
|
||||
"Starting recoverer for AS ID %s which was marked as " "DOWN",
|
||||
r.service.id,
|
||||
)
|
||||
r.recover()
|
||||
return recoverers
|
||||
|
||||
def __init__(self, clock, store, as_api, service, callback):
|
||||
self.clock = clock
|
||||
@@ -244,9 +224,7 @@ class _Recoverer(object):
|
||||
"as-recoverer-%s" % (self.service.id,), self.retry
|
||||
)
|
||||
|
||||
delay = 2 ** self.backoff_counter
|
||||
logger.info("Scheduling retries on %s in %fs", self.service.id, delay)
|
||||
self.clock.call_later(delay, _retry)
|
||||
self.clock.call_later((2 ** self.backoff_counter), _retry)
|
||||
|
||||
def _backoff(self):
|
||||
# cap the backoff to be around 8.5min => (2^9) = 512 secs
|
||||
@@ -256,30 +234,25 @@ class _Recoverer(object):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def retry(self):
|
||||
logger.info("Starting retries on %s", self.service.id)
|
||||
try:
|
||||
while True:
|
||||
txn = yield self.store.get_oldest_unsent_txn(self.service)
|
||||
if not txn:
|
||||
# nothing left: we're done!
|
||||
self.callback(self)
|
||||
return
|
||||
|
||||
txn = yield self.store.get_oldest_unsent_txn(self.service)
|
||||
if txn:
|
||||
logger.info(
|
||||
"Retrying transaction %s for AS ID %s", txn.id, txn.service.id
|
||||
)
|
||||
sent = yield txn.send(self.as_api)
|
||||
if not sent:
|
||||
break
|
||||
if sent:
|
||||
yield txn.complete(self.store)
|
||||
# reset the backoff counter and retry immediately
|
||||
self.backoff_counter = 1
|
||||
yield self.retry()
|
||||
else:
|
||||
self._backoff()
|
||||
else:
|
||||
self._set_service_recovered()
|
||||
except Exception as e:
|
||||
logger.exception(e)
|
||||
self._backoff()
|
||||
|
||||
yield txn.complete(self.store)
|
||||
|
||||
# reset the backoff counter and then process the next transaction
|
||||
self.backoff_counter = 1
|
||||
|
||||
except Exception:
|
||||
logger.exception("Unexpected error running retries")
|
||||
|
||||
# we didn't manage to send all of the transactions before we got an error of
|
||||
# some flavour: reschedule the next retry.
|
||||
self._backoff()
|
||||
def _set_service_recovered(self):
|
||||
self.callback(self)
|
||||
|
||||
@@ -13,9 +13,8 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from ._base import ConfigError, find_config_files
|
||||
from ._base import ConfigError
|
||||
|
||||
# export ConfigError and find_config_files if somebody does
|
||||
# import *
|
||||
# export ConfigError if somebody does import *
|
||||
# this is largely a fudge to stop PEP8 moaning about the import
|
||||
__all__ = ["ConfigError", "find_config_files"]
|
||||
__all__ = ["ConfigError"]
|
||||
|
||||
@@ -181,11 +181,6 @@ class Config(object):
|
||||
generate_secrets=False,
|
||||
report_stats=None,
|
||||
open_private_ports=False,
|
||||
listeners=None,
|
||||
database_conf=None,
|
||||
tls_certificate_path=None,
|
||||
tls_private_key_path=None,
|
||||
acme_domain=None,
|
||||
):
|
||||
"""Build a default configuration file
|
||||
|
||||
@@ -212,33 +207,6 @@ class Config(object):
|
||||
open_private_ports (bool): True to leave private ports (such as the non-TLS
|
||||
HTTP listener) open to the internet.
|
||||
|
||||
listeners (list(dict)|None): A list of descriptions of the listeners
|
||||
synapse should start with each of which specifies a port (str), a list of
|
||||
resources (list(str)), tls (bool) and type (str). For example:
|
||||
[{
|
||||
"port": 8448,
|
||||
"resources": [{"names": ["federation"]}],
|
||||
"tls": True,
|
||||
"type": "http",
|
||||
},
|
||||
{
|
||||
"port": 443,
|
||||
"resources": [{"names": ["client"]}],
|
||||
"tls": False,
|
||||
"type": "http",
|
||||
}],
|
||||
|
||||
|
||||
database (str|None): The database type to configure, either `psycog2`
|
||||
or `sqlite3`.
|
||||
|
||||
tls_certificate_path (str|None): The path to the tls certificate.
|
||||
|
||||
tls_private_key_path (str|None): The path to the tls private key.
|
||||
|
||||
acme_domain (str|None): The domain acme will try to validate. If
|
||||
specified acme will be enabled.
|
||||
|
||||
Returns:
|
||||
str: the yaml config file
|
||||
"""
|
||||
@@ -252,11 +220,6 @@ class Config(object):
|
||||
generate_secrets=generate_secrets,
|
||||
report_stats=report_stats,
|
||||
open_private_ports=open_private_ports,
|
||||
listeners=listeners,
|
||||
database_conf=database_conf,
|
||||
tls_certificate_path=tls_certificate_path,
|
||||
tls_private_key_path=tls_private_key_path,
|
||||
acme_domain=acme_domain,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@@ -13,9 +13,6 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import os
|
||||
from textwrap import indent
|
||||
|
||||
import yaml
|
||||
|
||||
from ._base import Config
|
||||
|
||||
@@ -41,28 +38,20 @@ class DatabaseConfig(Config):
|
||||
|
||||
self.set_databasepath(config.get("database_path"))
|
||||
|
||||
def generate_config_section(self, data_dir_path, database_conf, **kwargs):
|
||||
if not database_conf:
|
||||
database_path = os.path.join(data_dir_path, "homeserver.db")
|
||||
database_conf = (
|
||||
"""# The database engine name
|
||||
name: "sqlite3"
|
||||
# Arguments to pass to the engine
|
||||
args:
|
||||
# Path to the database
|
||||
database: "%(database_path)s"
|
||||
"""
|
||||
% locals()
|
||||
)
|
||||
else:
|
||||
database_conf = indent(yaml.dump(database_conf), " " * 10).lstrip()
|
||||
|
||||
def generate_config_section(self, data_dir_path, **kwargs):
|
||||
database_path = os.path.join(data_dir_path, "homeserver.db")
|
||||
return (
|
||||
"""\
|
||||
## Database ##
|
||||
|
||||
database:
|
||||
%(database_conf)s
|
||||
# The database engine name
|
||||
name: "sqlite3"
|
||||
# Arguments to pass to the engine
|
||||
args:
|
||||
# Path to the database
|
||||
database: "%(database_path)s"
|
||||
|
||||
# Number of events to cache in memory.
|
||||
#
|
||||
#event_cache_size: 10K
|
||||
|
||||
@@ -115,7 +115,7 @@ class EmailConfig(Config):
|
||||
missing.append("email." + k)
|
||||
|
||||
if config.get("public_baseurl") is None:
|
||||
missing.append("public_baseurl")
|
||||
missing.append("public_base_url")
|
||||
|
||||
if len(missing) > 0:
|
||||
raise RuntimeError(
|
||||
|
||||
@@ -76,7 +76,7 @@ class KeyConfig(Config):
|
||||
config_dir_path, config["server_name"] + ".signing.key"
|
||||
)
|
||||
|
||||
self.signing_key = self.read_signing_keys(signing_key_path, "signing_key")
|
||||
self.signing_key = self.read_signing_key(signing_key_path)
|
||||
|
||||
self.old_signing_keys = self.read_old_signing_keys(
|
||||
config.get("old_signing_keys", {})
|
||||
@@ -85,14 +85,6 @@ class KeyConfig(Config):
|
||||
config.get("key_refresh_interval", "1d")
|
||||
)
|
||||
|
||||
key_server_signing_keys_path = config.get("key_server_signing_keys_path")
|
||||
if key_server_signing_keys_path:
|
||||
self.key_server_signing_keys = self.read_signing_keys(
|
||||
key_server_signing_keys_path, "key_server_signing_keys_path"
|
||||
)
|
||||
else:
|
||||
self.key_server_signing_keys = list(self.signing_key)
|
||||
|
||||
# if neither trusted_key_servers nor perspectives are given, use the default.
|
||||
if "perspectives" not in config and "trusted_key_servers" not in config:
|
||||
key_servers = [{"server_name": "matrix.org"}]
|
||||
@@ -218,34 +210,16 @@ class KeyConfig(Config):
|
||||
#
|
||||
#trusted_key_servers:
|
||||
# - server_name: "matrix.org"
|
||||
#
|
||||
|
||||
# The signing keys to use when acting as a trusted key server. If not specified
|
||||
# defaults to the server signing key.
|
||||
#
|
||||
# Can contain multiple keys, one per line.
|
||||
#
|
||||
#key_server_signing_keys_path: "key_server_signing_keys.key"
|
||||
"""
|
||||
% locals()
|
||||
)
|
||||
|
||||
def read_signing_keys(self, signing_key_path, name):
|
||||
"""Read the signing keys in the given path.
|
||||
|
||||
Args:
|
||||
signing_key_path (str)
|
||||
name (str): Associated config key name
|
||||
|
||||
Returns:
|
||||
list[SigningKey]
|
||||
"""
|
||||
|
||||
signing_keys = self.read_file(signing_key_path, name)
|
||||
def read_signing_key(self, signing_key_path):
|
||||
signing_keys = self.read_file(signing_key_path, "signing_key")
|
||||
try:
|
||||
return read_signing_keys(signing_keys.splitlines(True))
|
||||
except Exception as e:
|
||||
raise ConfigError("Error reading %s: %s" % (name, str(e)))
|
||||
raise ConfigError("Error reading signing_key: %s" % (str(e)))
|
||||
|
||||
def read_old_signing_keys(self, old_signing_keys):
|
||||
keys = {}
|
||||
|
||||
@@ -25,10 +25,6 @@ from twisted.logger import STDLibLogObserver, globalLogBeginner
|
||||
|
||||
import synapse
|
||||
from synapse.app import _base as appbase
|
||||
from synapse.logging._structured import (
|
||||
reload_structured_logging,
|
||||
setup_structured_logging,
|
||||
)
|
||||
from synapse.logging.context import LoggingContextFilter
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
||||
@@ -89,8 +85,7 @@ class LoggingConfig(Config):
|
||||
"""\
|
||||
## Logging ##
|
||||
|
||||
# A yaml python logging config file as described by
|
||||
# https://docs.python.org/3.7/library/logging.config.html#configuration-dictionary-schema
|
||||
# A yaml python logging config file
|
||||
#
|
||||
log_config: "%(log_config)s"
|
||||
"""
|
||||
@@ -124,10 +119,21 @@ class LoggingConfig(Config):
|
||||
log_config_file.write(DEFAULT_LOG_CONFIG.substitute(log_file=log_file))
|
||||
|
||||
|
||||
def _setup_stdlib_logging(config, log_config):
|
||||
"""
|
||||
Set up Python stdlib logging.
|
||||
def setup_logging(config, use_worker_options=False):
|
||||
""" Set up python logging
|
||||
|
||||
Args:
|
||||
config (LoggingConfig | synapse.config.workers.WorkerConfig):
|
||||
configuration data
|
||||
|
||||
use_worker_options (bool): True to use the 'worker_log_config' option
|
||||
instead of 'log_config'.
|
||||
|
||||
register_sighup (func | None): Function to call to register a
|
||||
sighup handler.
|
||||
"""
|
||||
log_config = config.worker_log_config if use_worker_options else config.log_config
|
||||
|
||||
if log_config is None:
|
||||
log_format = (
|
||||
"%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s"
|
||||
@@ -145,10 +151,35 @@ def _setup_stdlib_logging(config, log_config):
|
||||
handler.addFilter(LoggingContextFilter(request=""))
|
||||
logger.addHandler(handler)
|
||||
else:
|
||||
logging.config.dictConfig(log_config)
|
||||
|
||||
# Route Twisted's native logging through to the standard library logging
|
||||
# system.
|
||||
def load_log_config():
|
||||
with open(log_config, "r") as f:
|
||||
logging.config.dictConfig(yaml.safe_load(f))
|
||||
|
||||
def sighup(*args):
|
||||
# it might be better to use a file watcher or something for this.
|
||||
load_log_config()
|
||||
logging.info("Reloaded log config from %s due to SIGHUP", log_config)
|
||||
|
||||
load_log_config()
|
||||
appbase.register_sighup(sighup)
|
||||
|
||||
# make sure that the first thing we log is a thing we can grep backwards
|
||||
# for
|
||||
logging.warn("***** STARTING SERVER *****")
|
||||
logging.warn("Server %s version %s", sys.argv[0], get_version_string(synapse))
|
||||
logging.info("Server hostname: %s", config.server_name)
|
||||
|
||||
# It's critical to point twisted's internal logging somewhere, otherwise it
|
||||
# stacks up and leaks kup to 64K object;
|
||||
# see: https://twistedmatrix.com/trac/ticket/8164
|
||||
#
|
||||
# Routing to the python logging framework could be a performance problem if
|
||||
# the handlers blocked for a long time as python.logging is a blocking API
|
||||
# see https://twistedmatrix.com/documents/current/core/howto/logger.html
|
||||
# filed as https://github.com/matrix-org/synapse/issues/1727
|
||||
#
|
||||
# However this may not be too much of a problem if we are just writing to a file.
|
||||
observer = STDLibLogObserver()
|
||||
|
||||
def _log(event):
|
||||
@@ -170,54 +201,3 @@ def _setup_stdlib_logging(config, log_config):
|
||||
)
|
||||
if not config.no_redirect_stdio:
|
||||
print("Redirected stdout/stderr to logs")
|
||||
|
||||
|
||||
def _reload_stdlib_logging(*args, log_config=None):
|
||||
logger = logging.getLogger("")
|
||||
|
||||
if not log_config:
|
||||
logger.warn("Reloaded a blank config?")
|
||||
|
||||
logging.config.dictConfig(log_config)
|
||||
|
||||
|
||||
def setup_logging(hs, config, use_worker_options=False):
|
||||
"""
|
||||
Set up the logging subsystem.
|
||||
|
||||
Args:
|
||||
config (LoggingConfig | synapse.config.workers.WorkerConfig):
|
||||
configuration data
|
||||
|
||||
use_worker_options (bool): True to use the 'worker_log_config' option
|
||||
instead of 'log_config'.
|
||||
"""
|
||||
log_config = config.worker_log_config if use_worker_options else config.log_config
|
||||
|
||||
def read_config(*args, callback=None):
|
||||
if log_config is None:
|
||||
return None
|
||||
|
||||
with open(log_config, "rb") as f:
|
||||
log_config_body = yaml.safe_load(f.read())
|
||||
|
||||
if callback:
|
||||
callback(log_config=log_config_body)
|
||||
logging.info("Reloaded log config from %s due to SIGHUP", log_config)
|
||||
|
||||
return log_config_body
|
||||
|
||||
log_config_body = read_config()
|
||||
|
||||
if log_config_body and log_config_body.get("structured") is True:
|
||||
setup_structured_logging(hs, config, log_config_body)
|
||||
appbase.register_sighup(read_config, callback=reload_structured_logging)
|
||||
else:
|
||||
_setup_stdlib_logging(config, log_config_body)
|
||||
appbase.register_sighup(read_config, callback=_reload_stdlib_logging)
|
||||
|
||||
# make sure that the first thing we log is a thing we can grep backwards
|
||||
# for
|
||||
logging.warn("***** STARTING SERVER *****")
|
||||
logging.warn("Server %s version %s", sys.argv[0], get_version_string(synapse))
|
||||
logging.info("Server hostname: %s", config.server_name)
|
||||
|
||||
@@ -17,11 +17,8 @@
|
||||
|
||||
import logging
|
||||
import os.path
|
||||
import re
|
||||
from textwrap import indent
|
||||
|
||||
import attr
|
||||
import yaml
|
||||
from netaddr import IPSet
|
||||
|
||||
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
|
||||
@@ -355,7 +352,7 @@ class ServerConfig(Config):
|
||||
return any(l["tls"] for l in self.listeners)
|
||||
|
||||
def generate_config_section(
|
||||
self, server_name, data_dir_path, open_private_ports, listeners, **kwargs
|
||||
self, server_name, data_dir_path, open_private_ports, **kwargs
|
||||
):
|
||||
_, bind_port = parse_and_validate_server_name(server_name)
|
||||
if bind_port is not None:
|
||||
@@ -369,68 +366,11 @@ class ServerConfig(Config):
|
||||
# Bring DEFAULT_ROOM_VERSION into the local-scope for use in the
|
||||
# default config string
|
||||
default_room_version = DEFAULT_ROOM_VERSION
|
||||
secure_listeners = []
|
||||
unsecure_listeners = []
|
||||
private_addresses = ["::1", "127.0.0.1"]
|
||||
if listeners:
|
||||
for listener in listeners:
|
||||
if listener["tls"]:
|
||||
secure_listeners.append(listener)
|
||||
else:
|
||||
# If we don't want open ports we need to bind the listeners
|
||||
# to some address other than 0.0.0.0. Here we chose to use
|
||||
# localhost.
|
||||
# If the addresses are already bound we won't overwrite them
|
||||
# however.
|
||||
if not open_private_ports:
|
||||
listener.setdefault("bind_addresses", private_addresses)
|
||||
|
||||
unsecure_listeners.append(listener)
|
||||
|
||||
secure_http_bindings = indent(
|
||||
yaml.dump(secure_listeners), " " * 10
|
||||
).lstrip()
|
||||
|
||||
unsecure_http_bindings = indent(
|
||||
yaml.dump(unsecure_listeners), " " * 10
|
||||
).lstrip()
|
||||
|
||||
if not unsecure_listeners:
|
||||
unsecure_http_bindings = (
|
||||
"""- port: %(unsecure_port)s
|
||||
tls: false
|
||||
type: http
|
||||
x_forwarded: true"""
|
||||
% locals()
|
||||
)
|
||||
|
||||
if not open_private_ports:
|
||||
unsecure_http_bindings += (
|
||||
"\n bind_addresses: ['::1', '127.0.0.1']"
|
||||
)
|
||||
|
||||
unsecure_http_bindings += """
|
||||
|
||||
resources:
|
||||
- names: [client, federation]
|
||||
compress: false"""
|
||||
|
||||
if listeners:
|
||||
# comment out this block
|
||||
unsecure_http_bindings = "#" + re.sub(
|
||||
"\n {10}",
|
||||
lambda match: match.group(0) + "#",
|
||||
unsecure_http_bindings,
|
||||
)
|
||||
|
||||
if not secure_listeners:
|
||||
secure_http_bindings = (
|
||||
"""#- port: %(bind_port)s
|
||||
# type: http
|
||||
# tls: true
|
||||
# resources:
|
||||
# - names: [client, federation]"""
|
||||
% locals()
|
||||
unsecure_http_binding = "port: %i\n tls: false" % (unsecure_port,)
|
||||
if not open_private_ports:
|
||||
unsecure_http_binding += (
|
||||
"\n bind_addresses: ['::1', '127.0.0.1']"
|
||||
)
|
||||
|
||||
return (
|
||||
@@ -616,7 +556,11 @@ class ServerConfig(Config):
|
||||
# will also need to give Synapse a TLS key and certificate: see the TLS section
|
||||
# below.)
|
||||
#
|
||||
%(secure_http_bindings)s
|
||||
#- port: %(bind_port)s
|
||||
# type: http
|
||||
# tls: true
|
||||
# resources:
|
||||
# - names: [client, federation]
|
||||
|
||||
# Unsecure HTTP listener: for when matrix traffic passes through a reverse proxy
|
||||
# that unwraps TLS.
|
||||
@@ -624,7 +568,13 @@ class ServerConfig(Config):
|
||||
# If you plan to use a reverse proxy, please see
|
||||
# https://github.com/matrix-org/synapse/blob/master/docs/reverse_proxy.rst.
|
||||
#
|
||||
%(unsecure_http_bindings)s
|
||||
- %(unsecure_http_binding)s
|
||||
type: http
|
||||
x_forwarded: true
|
||||
|
||||
resources:
|
||||
- names: [client, federation]
|
||||
compress: false
|
||||
|
||||
# example additional_resources:
|
||||
#
|
||||
|
||||
@@ -27,16 +27,19 @@ class StatsConfig(Config):
|
||||
|
||||
def read_config(self, config, **kwargs):
|
||||
self.stats_enabled = True
|
||||
self.stats_bucket_size = 86400 * 1000
|
||||
self.stats_bucket_size = 86400
|
||||
self.stats_retention = sys.maxsize
|
||||
stats_config = config.get("stats", None)
|
||||
if stats_config:
|
||||
self.stats_enabled = stats_config.get("enabled", self.stats_enabled)
|
||||
self.stats_bucket_size = self.parse_duration(
|
||||
stats_config.get("bucket_size", "1d")
|
||||
self.stats_bucket_size = (
|
||||
self.parse_duration(stats_config.get("bucket_size", "1d")) / 1000
|
||||
)
|
||||
self.stats_retention = self.parse_duration(
|
||||
stats_config.get("retention", "%ds" % (sys.maxsize,))
|
||||
self.stats_retention = (
|
||||
self.parse_duration(
|
||||
stats_config.get("retention", "%ds" % (sys.maxsize,))
|
||||
)
|
||||
/ 1000
|
||||
)
|
||||
|
||||
def generate_config_section(self, config_dir_path, server_name, **kwargs):
|
||||
|
||||
@@ -239,38 +239,12 @@ class TlsConfig(Config):
|
||||
self.tls_fingerprints.append({"sha256": sha256_fingerprint})
|
||||
|
||||
def generate_config_section(
|
||||
self,
|
||||
config_dir_path,
|
||||
server_name,
|
||||
data_dir_path,
|
||||
tls_certificate_path,
|
||||
tls_private_key_path,
|
||||
acme_domain,
|
||||
**kwargs
|
||||
self, config_dir_path, server_name, data_dir_path, **kwargs
|
||||
):
|
||||
"""If the acme_domain is specified acme will be enabled.
|
||||
If the TLS paths are not specified the default will be certs in the
|
||||
config directory"""
|
||||
|
||||
base_key_name = os.path.join(config_dir_path, server_name)
|
||||
|
||||
if bool(tls_certificate_path) != bool(tls_private_key_path):
|
||||
raise ConfigError(
|
||||
"Please specify both a cert path and a key path or neither."
|
||||
)
|
||||
|
||||
tls_enabled = (
|
||||
"" if tls_certificate_path and tls_private_key_path or acme_domain else "#"
|
||||
)
|
||||
|
||||
if not tls_certificate_path:
|
||||
tls_certificate_path = base_key_name + ".tls.crt"
|
||||
if not tls_private_key_path:
|
||||
tls_private_key_path = base_key_name + ".tls.key"
|
||||
|
||||
acme_enabled = bool(acme_domain)
|
||||
acme_domain = "matrix.example.com"
|
||||
|
||||
tls_certificate_path = base_key_name + ".tls.crt"
|
||||
tls_private_key_path = base_key_name + ".tls.key"
|
||||
default_acme_account_file = os.path.join(data_dir_path, "acme_account.key")
|
||||
|
||||
# this is to avoid the max line length. Sorrynotsorry
|
||||
@@ -295,11 +269,11 @@ class TlsConfig(Config):
|
||||
# instance, if using certbot, use `fullchain.pem` as your certificate,
|
||||
# not `cert.pem`).
|
||||
#
|
||||
%(tls_enabled)stls_certificate_path: "%(tls_certificate_path)s"
|
||||
#tls_certificate_path: "%(tls_certificate_path)s"
|
||||
|
||||
# PEM-encoded private key for TLS
|
||||
#
|
||||
%(tls_enabled)stls_private_key_path: "%(tls_private_key_path)s"
|
||||
#tls_private_key_path: "%(tls_private_key_path)s"
|
||||
|
||||
# Whether to verify TLS server certificates for outbound federation requests.
|
||||
#
|
||||
@@ -366,10 +340,10 @@ class TlsConfig(Config):
|
||||
# permission to listen on port 80.
|
||||
#
|
||||
acme:
|
||||
# ACME support is disabled by default. Set this to `true` and uncomment
|
||||
# tls_certificate_path and tls_private_key_path above to enable it.
|
||||
# ACME support is disabled by default. Uncomment the following line
|
||||
# (and tls_certificate_path and tls_private_key_path above) to enable it.
|
||||
#
|
||||
enabled: %(acme_enabled)s
|
||||
#enabled: true
|
||||
|
||||
# Endpoint to use to request certificates. If you only want to test,
|
||||
# use Let's Encrypt's staging url:
|
||||
@@ -380,17 +354,17 @@ class TlsConfig(Config):
|
||||
# Port number to listen on for the HTTP-01 challenge. Change this if
|
||||
# you are forwarding connections through Apache/Nginx/etc.
|
||||
#
|
||||
port: 80
|
||||
#port: 80
|
||||
|
||||
# Local addresses to listen on for incoming connections.
|
||||
# Again, you may want to change this if you are forwarding connections
|
||||
# through Apache/Nginx/etc.
|
||||
#
|
||||
bind_addresses: ['::', '0.0.0.0']
|
||||
#bind_addresses: ['::', '0.0.0.0']
|
||||
|
||||
# How many days remaining on a certificate before it is renewed.
|
||||
#
|
||||
reprovision_threshold: 30
|
||||
#reprovision_threshold: 30
|
||||
|
||||
# The domain that the certificate should be for. Normally this
|
||||
# should be the same as your Matrix domain (i.e., 'server_name'), but,
|
||||
@@ -404,7 +378,7 @@ class TlsConfig(Config):
|
||||
#
|
||||
# If not set, defaults to your 'server_name'.
|
||||
#
|
||||
domain: %(acme_domain)s
|
||||
#domain: matrix.example.com
|
||||
|
||||
# file to use for the account key. This will be generated if it doesn't
|
||||
# exist.
|
||||
|
||||
@@ -83,7 +83,7 @@ def compute_content_hash(event_dict, hash_algorithm):
|
||||
event_json_bytes = encode_canonical_json(event_dict)
|
||||
|
||||
hashed = hash_algorithm(event_json_bytes)
|
||||
return hashed.name, hashed.digest()
|
||||
return (hashed.name, hashed.digest())
|
||||
|
||||
|
||||
def compute_event_reference_hash(event, hash_algorithm=hashlib.sha256):
|
||||
@@ -106,7 +106,7 @@ def compute_event_reference_hash(event, hash_algorithm=hashlib.sha256):
|
||||
event_dict.pop("unsigned", None)
|
||||
event_json_bytes = encode_canonical_json(event_dict)
|
||||
hashed = hash_algorithm(event_json_bytes)
|
||||
return hashed.name, hashed.digest()
|
||||
return (hashed.name, hashed.digest())
|
||||
|
||||
|
||||
def compute_event_signature(event_dict, signature_name, signing_key):
|
||||
|
||||
@@ -18,6 +18,7 @@ import logging
|
||||
from collections import defaultdict
|
||||
|
||||
import six
|
||||
from six import raise_from
|
||||
from six.moves import urllib
|
||||
|
||||
import attr
|
||||
@@ -29,6 +30,7 @@ from signedjson.key import (
|
||||
from signedjson.sign import (
|
||||
SignatureVerifyException,
|
||||
encode_canonical_json,
|
||||
sign_json,
|
||||
signature_ids,
|
||||
verify_signed_json,
|
||||
)
|
||||
@@ -538,7 +540,13 @@ class BaseV2KeyFetcher(object):
|
||||
verify_key=verify_key, valid_until_ts=key_data["expired_ts"]
|
||||
)
|
||||
|
||||
key_json_bytes = encode_canonical_json(response_json)
|
||||
# re-sign the json with our own key, so that it is ready if we are asked to
|
||||
# give it out as a notary server
|
||||
signed_key_json = sign_json(
|
||||
response_json, self.config.server_name, self.config.signing_key[0]
|
||||
)
|
||||
|
||||
signed_key_json_bytes = encode_canonical_json(signed_key_json)
|
||||
|
||||
yield make_deferred_yieldable(
|
||||
defer.gatherResults(
|
||||
@@ -550,7 +558,7 @@ class BaseV2KeyFetcher(object):
|
||||
from_server=from_server,
|
||||
ts_now_ms=time_added_ms,
|
||||
ts_expires_ms=ts_valid_until_ms,
|
||||
key_json_bytes=key_json_bytes,
|
||||
key_json_bytes=signed_key_json_bytes,
|
||||
)
|
||||
for key_id in verify_keys
|
||||
],
|
||||
@@ -649,10 +657,9 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
|
||||
},
|
||||
)
|
||||
except (NotRetryingDestination, RequestSendFailed) as e:
|
||||
# these both have str() representations which we can't really improve upon
|
||||
raise KeyLookupError(str(e))
|
||||
raise_from(KeyLookupError("Failed to connect to remote server"), e)
|
||||
except HttpResponseException as e:
|
||||
raise KeyLookupError("Remote server returned an error: %s" % (e,))
|
||||
raise_from(KeyLookupError("Remote server returned an error"), e)
|
||||
|
||||
keys = {}
|
||||
added_keys = []
|
||||
@@ -814,11 +821,9 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
|
||||
timeout=10000,
|
||||
)
|
||||
except (NotRetryingDestination, RequestSendFailed) as e:
|
||||
# these both have str() representations which we can't really improve
|
||||
# upon
|
||||
raise KeyLookupError(str(e))
|
||||
raise_from(KeyLookupError("Failed to connect to remote server"), e)
|
||||
except HttpResponseException as e:
|
||||
raise KeyLookupError("Remote server returned an error: %s" % (e,))
|
||||
raise_from(KeyLookupError("Remote server returned an error"), e)
|
||||
|
||||
if response["server_name"] != server_name:
|
||||
raise KeyLookupError(
|
||||
|
||||
@@ -355,7 +355,7 @@ class FederationClient(FederationBase):
|
||||
|
||||
auth_chain.sort(key=lambda e: e.depth)
|
||||
|
||||
return pdus, auth_chain
|
||||
return (pdus, auth_chain)
|
||||
except HttpResponseException as e:
|
||||
if e.code == 400 or e.code == 404:
|
||||
logger.info("Failed to use get_room_state_ids API, falling back")
|
||||
@@ -404,7 +404,7 @@ class FederationClient(FederationBase):
|
||||
|
||||
signed_auth.sort(key=lambda e: e.depth)
|
||||
|
||||
return signed_pdus, signed_auth
|
||||
return (signed_pdus, signed_auth)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_events_from_store_or_dest(self, destination, room_id, event_ids):
|
||||
@@ -429,7 +429,7 @@ class FederationClient(FederationBase):
|
||||
missing_events.discard(k)
|
||||
|
||||
if not missing_events:
|
||||
return signed_events, failed_to_fetch
|
||||
return (signed_events, failed_to_fetch)
|
||||
|
||||
logger.debug(
|
||||
"Fetching unknown state/auth events %s for room %s",
|
||||
@@ -465,7 +465,7 @@ class FederationClient(FederationBase):
|
||||
# We removed all events we successfully fetched from `batch`
|
||||
failed_to_fetch.update(batch)
|
||||
|
||||
return signed_events, failed_to_fetch
|
||||
return (signed_events, failed_to_fetch)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
|
||||
@@ -43,7 +43,6 @@ from synapse.federation.persistence import TransactionActions
|
||||
from synapse.federation.units import Edu, Transaction
|
||||
from synapse.http.endpoint import parse_server_name
|
||||
from synapse.logging.context import nested_logging_context
|
||||
from synapse.logging.opentracing import log_kv, start_active_span_from_edu, trace
|
||||
from synapse.logging.utils import log_function
|
||||
from synapse.replication.http.federation import (
|
||||
ReplicationFederationSendEduRestServlet,
|
||||
@@ -100,7 +99,7 @@ class FederationServer(FederationBase):
|
||||
|
||||
res = self._transaction_from_pdus(pdus).get_dict()
|
||||
|
||||
return 200, res
|
||||
return (200, res)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
@@ -163,7 +162,7 @@ class FederationServer(FederationBase):
|
||||
yield self.transaction_actions.set_response(
|
||||
origin, transaction, 400, response
|
||||
)
|
||||
return 400, response
|
||||
return (400, response)
|
||||
|
||||
received_pdus_counter.inc(len(transaction.pdus))
|
||||
|
||||
@@ -265,7 +264,7 @@ class FederationServer(FederationBase):
|
||||
logger.debug("Returning: %s", str(response))
|
||||
|
||||
yield self.transaction_actions.set_response(origin, transaction, 200, response)
|
||||
return 200, response
|
||||
return (200, response)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def received_edu(self, origin, edu_type, content):
|
||||
@@ -298,7 +297,7 @@ class FederationServer(FederationBase):
|
||||
event_id,
|
||||
)
|
||||
|
||||
return 200, resp
|
||||
return (200, resp)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_state_ids_request(self, origin, room_id, event_id):
|
||||
@@ -315,7 +314,7 @@ class FederationServer(FederationBase):
|
||||
state_ids = yield self.handler.get_state_ids_for_pdu(room_id, event_id)
|
||||
auth_chain_ids = yield self.store.get_auth_chain_ids(state_ids)
|
||||
|
||||
return 200, {"pdu_ids": state_ids, "auth_chain_ids": auth_chain_ids}
|
||||
return (200, {"pdu_ids": state_ids, "auth_chain_ids": auth_chain_ids})
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _on_context_state_request_compute(self, room_id, event_id):
|
||||
@@ -345,15 +344,15 @@ class FederationServer(FederationBase):
|
||||
pdu = yield self.handler.get_persisted_pdu(origin, event_id)
|
||||
|
||||
if pdu:
|
||||
return 200, self._transaction_from_pdus([pdu]).get_dict()
|
||||
return (200, self._transaction_from_pdus([pdu]).get_dict())
|
||||
else:
|
||||
return 404, ""
|
||||
return (404, "")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_query_request(self, query_type, args):
|
||||
received_queries_counter.labels(query_type).inc()
|
||||
resp = yield self.registry.on_query(query_type, args)
|
||||
return 200, resp
|
||||
return (200, resp)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_make_join_request(self, origin, room_id, user_id, supported_versions):
|
||||
@@ -435,7 +434,7 @@ class FederationServer(FederationBase):
|
||||
|
||||
logger.debug("on_send_leave_request: pdu sigs: %s", pdu.signatures)
|
||||
yield self.handler.on_send_leave_request(origin, pdu)
|
||||
return 200, {}
|
||||
return (200, {})
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_event_auth(self, origin, room_id, event_id):
|
||||
@@ -446,7 +445,7 @@ class FederationServer(FederationBase):
|
||||
time_now = self._clock.time_msec()
|
||||
auth_pdus = yield self.handler.on_event_auth(event_id)
|
||||
res = {"auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus]}
|
||||
return 200, res
|
||||
return (200, res)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_query_auth_request(self, origin, content, room_id, event_id):
|
||||
@@ -499,7 +498,7 @@ class FederationServer(FederationBase):
|
||||
"missing": ret.get("missing", []),
|
||||
}
|
||||
|
||||
return 200, send_content
|
||||
return (200, send_content)
|
||||
|
||||
@log_function
|
||||
def on_query_client_keys(self, origin, content):
|
||||
@@ -508,7 +507,6 @@ class FederationServer(FederationBase):
|
||||
def on_query_user_devices(self, origin, user_id):
|
||||
return self.on_query_request("user_devices", user_id)
|
||||
|
||||
@trace
|
||||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
def on_claim_client_keys(self, origin, content):
|
||||
@@ -517,7 +515,6 @@ class FederationServer(FederationBase):
|
||||
for device_id, algorithm in device_keys.items():
|
||||
query.append((user_id, device_id, algorithm))
|
||||
|
||||
log_kv({"message": "Claiming one time keys.", "user, device pairs": query})
|
||||
results = yield self.store.claim_e2e_one_time_keys(query)
|
||||
|
||||
json_result = {}
|
||||
@@ -811,13 +808,12 @@ class FederationHandlerRegistry(object):
|
||||
if not handler:
|
||||
logger.warn("No handler registered for EDU type %s", edu_type)
|
||||
|
||||
with start_active_span_from_edu(content, "handle_edu"):
|
||||
try:
|
||||
yield handler(origin, content)
|
||||
except SynapseError as e:
|
||||
logger.info("Failed to handle edu %r: %r", edu_type, e)
|
||||
except Exception:
|
||||
logger.exception("Failed to handle edu %r", edu_type)
|
||||
try:
|
||||
yield handler(origin, content)
|
||||
except SynapseError as e:
|
||||
logger.info("Failed to handle edu %r: %r", edu_type, e)
|
||||
except Exception:
|
||||
logger.exception("Failed to handle edu %r", edu_type)
|
||||
|
||||
def on_query(self, query_type, args):
|
||||
handler = self.query_handlers.get(query_type)
|
||||
|
||||
@@ -14,19 +14,11 @@
|
||||
# limitations under the License.
|
||||
import logging
|
||||
|
||||
from canonicaljson import json
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.errors import HttpResponseException
|
||||
from synapse.federation.persistence import TransactionActions
|
||||
from synapse.federation.units import Transaction
|
||||
from synapse.logging.opentracing import (
|
||||
extract_text_map,
|
||||
set_tag,
|
||||
start_active_span_follows_from,
|
||||
tags,
|
||||
)
|
||||
from synapse.util.metrics import measure_func
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -52,109 +44,93 @@ class TransactionManager(object):
|
||||
@defer.inlineCallbacks
|
||||
def send_new_transaction(self, destination, pending_pdus, pending_edus):
|
||||
|
||||
# Make a transaction-sending opentracing span. This span follows on from
|
||||
# all the edus in that transaction. This needs to be done since there is
|
||||
# no active span here, so if the edus were not received by the remote the
|
||||
# span would have no causality and it would be forgotten.
|
||||
# The span_contexts is a generator so that it won't be evaluated if
|
||||
# opentracing is disabled. (Yay speed!)
|
||||
# Sort based on the order field
|
||||
pending_pdus.sort(key=lambda t: t[1])
|
||||
pdus = [x[0] for x in pending_pdus]
|
||||
edus = pending_edus
|
||||
|
||||
span_contexts = (
|
||||
extract_text_map(json.loads(edu.get_context())) for edu in pending_edus
|
||||
success = True
|
||||
|
||||
logger.debug("TX [%s] _attempt_new_transaction", destination)
|
||||
|
||||
txn_id = str(self._next_txn_id)
|
||||
|
||||
logger.debug(
|
||||
"TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)",
|
||||
destination,
|
||||
txn_id,
|
||||
len(pdus),
|
||||
len(edus),
|
||||
)
|
||||
|
||||
with start_active_span_follows_from("send_transaction", span_contexts):
|
||||
transaction = Transaction.create_new(
|
||||
origin_server_ts=int(self.clock.time_msec()),
|
||||
transaction_id=txn_id,
|
||||
origin=self._server_name,
|
||||
destination=destination,
|
||||
pdus=pdus,
|
||||
edus=edus,
|
||||
)
|
||||
|
||||
# Sort based on the order field
|
||||
pending_pdus.sort(key=lambda t: t[1])
|
||||
pdus = [x[0] for x in pending_pdus]
|
||||
edus = pending_edus
|
||||
self._next_txn_id += 1
|
||||
|
||||
success = True
|
||||
logger.info(
|
||||
"TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)",
|
||||
destination,
|
||||
txn_id,
|
||||
transaction.transaction_id,
|
||||
len(pdus),
|
||||
len(edus),
|
||||
)
|
||||
|
||||
logger.debug("TX [%s] _attempt_new_transaction", destination)
|
||||
# Actually send the transaction
|
||||
|
||||
txn_id = str(self._next_txn_id)
|
||||
# FIXME (erikj): This is a bit of a hack to make the Pdu age
|
||||
# keys work
|
||||
def json_data_cb():
|
||||
data = transaction.get_dict()
|
||||
now = int(self.clock.time_msec())
|
||||
if "pdus" in data:
|
||||
for p in data["pdus"]:
|
||||
if "age_ts" in p:
|
||||
unsigned = p.setdefault("unsigned", {})
|
||||
unsigned["age"] = now - int(p["age_ts"])
|
||||
del p["age_ts"]
|
||||
return data
|
||||
|
||||
logger.debug(
|
||||
"TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)",
|
||||
destination,
|
||||
txn_id,
|
||||
len(pdus),
|
||||
len(edus),
|
||||
try:
|
||||
response = yield self._transport_layer.send_transaction(
|
||||
transaction, json_data_cb
|
||||
)
|
||||
code = 200
|
||||
except HttpResponseException as e:
|
||||
code = e.code
|
||||
response = e.response
|
||||
|
||||
transaction = Transaction.create_new(
|
||||
origin_server_ts=int(self.clock.time_msec()),
|
||||
transaction_id=txn_id,
|
||||
origin=self._server_name,
|
||||
destination=destination,
|
||||
pdus=pdus,
|
||||
edus=edus,
|
||||
)
|
||||
if e.code in (401, 404, 429) or 500 <= e.code:
|
||||
logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
|
||||
raise e
|
||||
|
||||
self._next_txn_id += 1
|
||||
logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
|
||||
|
||||
logger.info(
|
||||
"TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)",
|
||||
destination,
|
||||
txn_id,
|
||||
transaction.transaction_id,
|
||||
len(pdus),
|
||||
len(edus),
|
||||
)
|
||||
|
||||
# Actually send the transaction
|
||||
|
||||
# FIXME (erikj): This is a bit of a hack to make the Pdu age
|
||||
# keys work
|
||||
def json_data_cb():
|
||||
data = transaction.get_dict()
|
||||
now = int(self.clock.time_msec())
|
||||
if "pdus" in data:
|
||||
for p in data["pdus"]:
|
||||
if "age_ts" in p:
|
||||
unsigned = p.setdefault("unsigned", {})
|
||||
unsigned["age"] = now - int(p["age_ts"])
|
||||
del p["age_ts"]
|
||||
return data
|
||||
|
||||
try:
|
||||
response = yield self._transport_layer.send_transaction(
|
||||
transaction, json_data_cb
|
||||
)
|
||||
code = 200
|
||||
except HttpResponseException as e:
|
||||
code = e.code
|
||||
response = e.response
|
||||
|
||||
if e.code in (401, 404, 429) or 500 <= e.code:
|
||||
logger.info(
|
||||
"TX [%s] {%s} got %d response", destination, txn_id, code
|
||||
)
|
||||
raise e
|
||||
|
||||
logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
|
||||
|
||||
if code == 200:
|
||||
for e_id, r in response.get("pdus", {}).items():
|
||||
if "error" in r:
|
||||
logger.warn(
|
||||
"TX [%s] {%s} Remote returned error for %s: %s",
|
||||
destination,
|
||||
txn_id,
|
||||
e_id,
|
||||
r,
|
||||
)
|
||||
else:
|
||||
for p in pdus:
|
||||
if code == 200:
|
||||
for e_id, r in response.get("pdus", {}).items():
|
||||
if "error" in r:
|
||||
logger.warn(
|
||||
"TX [%s] {%s} Failed to send event %s",
|
||||
"TX [%s] {%s} Remote returned error for %s: %s",
|
||||
destination,
|
||||
txn_id,
|
||||
p.event_id,
|
||||
e_id,
|
||||
r,
|
||||
)
|
||||
success = False
|
||||
else:
|
||||
for p in pdus:
|
||||
logger.warn(
|
||||
"TX [%s] {%s} Failed to send event %s",
|
||||
destination,
|
||||
txn_id,
|
||||
p.event_id,
|
||||
)
|
||||
success = False
|
||||
|
||||
set_tag(tags.ERROR, not success)
|
||||
return success
|
||||
return success
|
||||
|
||||
@@ -327,37 +327,21 @@ class TransportLayerClient(object):
|
||||
include_all_networks=False,
|
||||
third_party_instance_id=None,
|
||||
):
|
||||
if search_filter:
|
||||
# this uses MSC2197 (Search Filtering over Federation)
|
||||
path = _create_v1_path("/publicRooms")
|
||||
path = _create_v1_path("/publicRooms")
|
||||
|
||||
data = {"include_all_networks": "true" if include_all_networks else "false"}
|
||||
if third_party_instance_id:
|
||||
data["third_party_instance_id"] = third_party_instance_id
|
||||
if limit:
|
||||
data["limit"] = str(limit)
|
||||
if since_token:
|
||||
data["since"] = since_token
|
||||
args = {"include_all_networks": "true" if include_all_networks else "false"}
|
||||
if third_party_instance_id:
|
||||
args["third_party_instance_id"] = (third_party_instance_id,)
|
||||
if limit:
|
||||
args["limit"] = [str(limit)]
|
||||
if since_token:
|
||||
args["since"] = [since_token]
|
||||
|
||||
data["filter"] = search_filter
|
||||
# TODO(erikj): Actually send the search_filter across federation.
|
||||
|
||||
response = yield self.client.post_json(
|
||||
destination=remote_server, path=path, data=data, ignore_backoff=True
|
||||
)
|
||||
else:
|
||||
path = _create_v1_path("/publicRooms")
|
||||
|
||||
args = {"include_all_networks": "true" if include_all_networks else "false"}
|
||||
if third_party_instance_id:
|
||||
args["third_party_instance_id"] = (third_party_instance_id,)
|
||||
if limit:
|
||||
args["limit"] = [str(limit)]
|
||||
if since_token:
|
||||
args["since"] = [since_token]
|
||||
|
||||
response = yield self.client.get_json(
|
||||
destination=remote_server, path=path, args=args, ignore_backoff=True
|
||||
)
|
||||
response = yield self.client.get_json(
|
||||
destination=remote_server, path=path, args=args, ignore_backoff=True
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
|
||||
@@ -38,12 +38,7 @@ from synapse.http.servlet import (
|
||||
parse_string_from_args,
|
||||
)
|
||||
from synapse.logging.context import run_in_background
|
||||
from synapse.logging.opentracing import (
|
||||
start_active_span,
|
||||
start_active_span_from_request,
|
||||
tags,
|
||||
whitelisted_homeserver,
|
||||
)
|
||||
from synapse.logging.opentracing import start_active_span_from_context, tags
|
||||
from synapse.types import ThirdPartyInstanceID, get_domain_from_id
|
||||
from synapse.util.ratelimitutils import FederationRateLimiter
|
||||
from synapse.util.versionstring import get_version_string
|
||||
@@ -293,28 +288,20 @@ class BaseFederationServlet(object):
|
||||
logger.warn("authenticate_request failed: %s", e)
|
||||
raise
|
||||
|
||||
request_tags = {
|
||||
"request_id": request.get_request_id(),
|
||||
tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
|
||||
tags.HTTP_METHOD: request.get_method(),
|
||||
tags.HTTP_URL: request.get_redacted_uri(),
|
||||
tags.PEER_HOST_IPV6: request.getClientIP(),
|
||||
"authenticated_entity": origin,
|
||||
"servlet_name": request.request_metrics.name,
|
||||
}
|
||||
|
||||
# Only accept the span context if the origin is authenticated
|
||||
# and whitelisted
|
||||
if origin and whitelisted_homeserver(origin):
|
||||
scope = start_active_span_from_request(
|
||||
request, "incoming-federation-request", tags=request_tags
|
||||
)
|
||||
else:
|
||||
scope = start_active_span(
|
||||
"incoming-federation-request", tags=request_tags
|
||||
)
|
||||
|
||||
with scope:
|
||||
# Start an opentracing span
|
||||
with start_active_span_from_context(
|
||||
request.requestHeaders,
|
||||
"incoming-federation-request",
|
||||
tags={
|
||||
"request_id": request.get_request_id(),
|
||||
tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
|
||||
tags.HTTP_METHOD: request.get_method(),
|
||||
tags.HTTP_URL: request.get_redacted_uri(),
|
||||
tags.PEER_HOST_IPV6: request.getClientIP(),
|
||||
"authenticated_entity": origin,
|
||||
"servlet_name": request.request_metrics.name,
|
||||
},
|
||||
):
|
||||
if origin:
|
||||
with ratelimiter.ratelimit(origin) as d:
|
||||
await d
|
||||
@@ -770,42 +757,6 @@ class PublicRoomList(BaseFederationServlet):
|
||||
)
|
||||
return 200, data
|
||||
|
||||
async def on_POST(self, origin, content, query):
|
||||
# This implements MSC2197 (Search Filtering over Federation)
|
||||
if not self.allow_access:
|
||||
raise FederationDeniedError(origin)
|
||||
|
||||
limit = int(content.get("limit", 100))
|
||||
since_token = content.get("since", None)
|
||||
search_filter = content.get("filter", None)
|
||||
|
||||
include_all_networks = content.get("include_all_networks", False)
|
||||
third_party_instance_id = content.get("third_party_instance_id", None)
|
||||
|
||||
if include_all_networks:
|
||||
network_tuple = None
|
||||
if third_party_instance_id is not None:
|
||||
raise SynapseError(
|
||||
400, "Can't use include_all_networks with an explicit network"
|
||||
)
|
||||
elif third_party_instance_id is None:
|
||||
network_tuple = ThirdPartyInstanceID(None, None)
|
||||
else:
|
||||
network_tuple = ThirdPartyInstanceID.from_string(third_party_instance_id)
|
||||
|
||||
if search_filter is None:
|
||||
logger.warning("Nonefilter")
|
||||
|
||||
data = await self.handler.get_local_public_room_list(
|
||||
limit=limit,
|
||||
since_token=since_token,
|
||||
search_filter=search_filter,
|
||||
network_tuple=network_tuple,
|
||||
from_federation=True,
|
||||
)
|
||||
|
||||
return 200, data
|
||||
|
||||
|
||||
class FederationVersionServlet(BaseFederationServlet):
|
||||
PATH = "/version"
|
||||
|
||||
@@ -38,9 +38,6 @@ class Edu(JsonEncodedObject):
|
||||
|
||||
internal_keys = ["origin", "destination"]
|
||||
|
||||
def get_context(self):
|
||||
return getattr(self, "content", {}).get("org.matrix.opentracing_context", "{}")
|
||||
|
||||
|
||||
class Transaction(JsonEncodedObject):
|
||||
""" A transaction is a list of Pdus and Edus to be sent to a remote home
|
||||
|
||||
@@ -51,8 +51,8 @@ class AccountDataEventSource(object):
|
||||
{"type": account_data_type, "content": content, "room_id": room_id}
|
||||
)
|
||||
|
||||
return results, current_stream_id
|
||||
return (results, current_stream_id)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_pagination_rows(self, user, config, key):
|
||||
return [], config.to_id
|
||||
return ([], config.to_id)
|
||||
|
||||
@@ -94,15 +94,6 @@ class AdminHandler(BaseHandler):
|
||||
|
||||
return ret
|
||||
|
||||
def get_user_server_admin(self, user):
|
||||
"""
|
||||
Get the admin bit on a user.
|
||||
|
||||
Args:
|
||||
user_id (UserID): the (necessarily local) user to manipulate
|
||||
"""
|
||||
return self.store.is_server_admin(user)
|
||||
|
||||
def set_user_server_admin(self, user, admin):
|
||||
"""
|
||||
Set the admin bit on a user.
|
||||
|
||||
@@ -280,7 +280,7 @@ class AuthHandler(BaseHandler):
|
||||
creds,
|
||||
list(clientdict),
|
||||
)
|
||||
return creds, clientdict, session["id"]
|
||||
return (creds, clientdict, session["id"])
|
||||
|
||||
ret = self._auth_dict_for_flows(flows, session)
|
||||
ret["completed"] = list(creds)
|
||||
@@ -722,7 +722,7 @@ class AuthHandler(BaseHandler):
|
||||
known_login_type = True
|
||||
is_valid = yield provider.check_password(qualified_user_id, password)
|
||||
if is_valid:
|
||||
return qualified_user_id, None
|
||||
return (qualified_user_id, None)
|
||||
|
||||
if not hasattr(provider, "get_supported_login_types") or not hasattr(
|
||||
provider, "check_auth"
|
||||
@@ -766,7 +766,7 @@ class AuthHandler(BaseHandler):
|
||||
)
|
||||
|
||||
if canonical_user_id:
|
||||
return canonical_user_id, None
|
||||
return (canonical_user_id, None)
|
||||
|
||||
if not known_login_type:
|
||||
raise SynapseError(400, "Unknown login type %s" % login_type)
|
||||
@@ -816,7 +816,7 @@ class AuthHandler(BaseHandler):
|
||||
result = (result, None)
|
||||
return result
|
||||
|
||||
return None, None
|
||||
return (None, None)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _check_local_password(self, user_id, password):
|
||||
|
||||
@@ -15,17 +15,9 @@
|
||||
|
||||
import logging
|
||||
|
||||
from canonicaljson import json
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.logging.opentracing import (
|
||||
get_active_span_text_map,
|
||||
set_tag,
|
||||
start_active_span,
|
||||
whitelisted_homeserver,
|
||||
)
|
||||
from synapse.types import UserID, get_domain_from_id
|
||||
from synapse.util.stringutils import random_string
|
||||
|
||||
@@ -108,21 +100,14 @@ class DeviceMessageHandler(object):
|
||||
|
||||
message_id = random_string(16)
|
||||
|
||||
context = get_active_span_text_map()
|
||||
|
||||
remote_edu_contents = {}
|
||||
for destination, messages in remote_messages.items():
|
||||
with start_active_span("to_device_for_user"):
|
||||
set_tag("destination", destination)
|
||||
remote_edu_contents[destination] = {
|
||||
"messages": messages,
|
||||
"sender": sender_user_id,
|
||||
"type": message_type,
|
||||
"message_id": message_id,
|
||||
"org.matrix.opentracing_context": json.dumps(context)
|
||||
if whitelisted_homeserver(destination)
|
||||
else None,
|
||||
}
|
||||
remote_edu_contents[destination] = {
|
||||
"messages": messages,
|
||||
"sender": sender_user_id,
|
||||
"type": message_type,
|
||||
"message_id": message_id,
|
||||
}
|
||||
|
||||
stream_id = yield self.store.add_messages_to_device_inbox(
|
||||
local_messages, remote_edu_contents
|
||||
|
||||
@@ -24,7 +24,6 @@ from twisted.internet import defer
|
||||
|
||||
from synapse.api.errors import CodeMessageException, SynapseError
|
||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||
from synapse.logging.opentracing import log_kv, set_tag, tag_args, trace
|
||||
from synapse.types import UserID, get_domain_from_id
|
||||
from synapse.util import unwrapFirstError
|
||||
from synapse.util.retryutils import NotRetryingDestination
|
||||
@@ -47,7 +46,6 @@ class E2eKeysHandler(object):
|
||||
"client_keys", self.on_federation_query_client_keys
|
||||
)
|
||||
|
||||
@trace
|
||||
@defer.inlineCallbacks
|
||||
def query_devices(self, query_body, timeout):
|
||||
""" Handle a device key query from a client
|
||||
@@ -83,9 +81,6 @@ class E2eKeysHandler(object):
|
||||
else:
|
||||
remote_queries[user_id] = device_ids
|
||||
|
||||
set_tag("local_key_query", local_query)
|
||||
set_tag("remote_key_query", remote_queries)
|
||||
|
||||
# First get local devices.
|
||||
failures = {}
|
||||
results = {}
|
||||
@@ -126,7 +121,6 @@ class E2eKeysHandler(object):
|
||||
r[user_id] = remote_queries[user_id]
|
||||
|
||||
# Now fetch any devices that we don't have in our cache
|
||||
@trace
|
||||
@defer.inlineCallbacks
|
||||
def do_remote_query(destination):
|
||||
"""This is called when we are querying the device list of a user on
|
||||
@@ -191,8 +185,6 @@ class E2eKeysHandler(object):
|
||||
except Exception as e:
|
||||
failure = _exception_to_failure(e)
|
||||
failures[destination] = failure
|
||||
set_tag("error", True)
|
||||
set_tag("reason", failure)
|
||||
|
||||
yield make_deferred_yieldable(
|
||||
defer.gatherResults(
|
||||
@@ -206,7 +198,6 @@ class E2eKeysHandler(object):
|
||||
|
||||
return {"device_keys": results, "failures": failures}
|
||||
|
||||
@trace
|
||||
@defer.inlineCallbacks
|
||||
def query_local_devices(self, query):
|
||||
"""Get E2E device keys for local users
|
||||
@@ -219,7 +210,6 @@ class E2eKeysHandler(object):
|
||||
defer.Deferred: (resolves to dict[string, dict[string, dict]]):
|
||||
map from user_id -> device_id -> device details
|
||||
"""
|
||||
set_tag("local_query", query)
|
||||
local_query = []
|
||||
|
||||
result_dict = {}
|
||||
@@ -227,14 +217,6 @@ class E2eKeysHandler(object):
|
||||
# we use UserID.from_string to catch invalid user ids
|
||||
if not self.is_mine(UserID.from_string(user_id)):
|
||||
logger.warning("Request for keys for non-local user %s", user_id)
|
||||
log_kv(
|
||||
{
|
||||
"message": "Requested a local key for a user which"
|
||||
" was not local to the homeserver",
|
||||
"user_id": user_id,
|
||||
}
|
||||
)
|
||||
set_tag("error", True)
|
||||
raise SynapseError(400, "Not a user here")
|
||||
|
||||
if not device_ids:
|
||||
@@ -259,7 +241,6 @@ class E2eKeysHandler(object):
|
||||
r["unsigned"]["device_display_name"] = display_name
|
||||
result_dict[user_id][device_id] = r
|
||||
|
||||
log_kv(results)
|
||||
return result_dict
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@@ -270,7 +251,6 @@ class E2eKeysHandler(object):
|
||||
res = yield self.query_local_devices(device_keys_query)
|
||||
return {"device_keys": res}
|
||||
|
||||
@trace
|
||||
@defer.inlineCallbacks
|
||||
def claim_one_time_keys(self, query, timeout):
|
||||
local_query = []
|
||||
@@ -285,9 +265,6 @@ class E2eKeysHandler(object):
|
||||
domain = get_domain_from_id(user_id)
|
||||
remote_queries.setdefault(domain, {})[user_id] = device_keys
|
||||
|
||||
set_tag("local_key_query", local_query)
|
||||
set_tag("remote_key_query", remote_queries)
|
||||
|
||||
results = yield self.store.claim_e2e_one_time_keys(local_query)
|
||||
|
||||
json_result = {}
|
||||
@@ -299,10 +276,8 @@ class E2eKeysHandler(object):
|
||||
key_id: json.loads(json_bytes)
|
||||
}
|
||||
|
||||
@trace
|
||||
@defer.inlineCallbacks
|
||||
def claim_client_keys(destination):
|
||||
set_tag("destination", destination)
|
||||
device_keys = remote_queries[destination]
|
||||
try:
|
||||
remote_result = yield self.federation.claim_client_keys(
|
||||
@@ -315,8 +290,6 @@ class E2eKeysHandler(object):
|
||||
except Exception as e:
|
||||
failure = _exception_to_failure(e)
|
||||
failures[destination] = failure
|
||||
set_tag("error", True)
|
||||
set_tag("reason", failure)
|
||||
|
||||
yield make_deferred_yieldable(
|
||||
defer.gatherResults(
|
||||
@@ -340,11 +313,9 @@ class E2eKeysHandler(object):
|
||||
),
|
||||
)
|
||||
|
||||
log_kv({"one_time_keys": json_result, "failures": failures})
|
||||
return {"one_time_keys": json_result, "failures": failures}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@tag_args
|
||||
def upload_keys_for_user(self, user_id, device_id, keys):
|
||||
|
||||
time_now = self.clock.time_msec()
|
||||
@@ -358,13 +329,6 @@ class E2eKeysHandler(object):
|
||||
user_id,
|
||||
time_now,
|
||||
)
|
||||
log_kv(
|
||||
{
|
||||
"message": "Updating device_keys for user.",
|
||||
"user_id": user_id,
|
||||
"device_id": device_id,
|
||||
}
|
||||
)
|
||||
# TODO: Sign the JSON with the server key
|
||||
changed = yield self.store.set_e2e_device_keys(
|
||||
user_id, device_id, time_now, device_keys
|
||||
@@ -372,24 +336,12 @@ class E2eKeysHandler(object):
|
||||
if changed:
|
||||
# Only notify about device updates *if* the keys actually changed
|
||||
yield self.device_handler.notify_device_update(user_id, [device_id])
|
||||
else:
|
||||
log_kv({"message": "Not updating device_keys for user", "user_id": user_id})
|
||||
|
||||
one_time_keys = keys.get("one_time_keys", None)
|
||||
if one_time_keys:
|
||||
log_kv(
|
||||
{
|
||||
"message": "Updating one_time_keys for device.",
|
||||
"user_id": user_id,
|
||||
"device_id": device_id,
|
||||
}
|
||||
)
|
||||
yield self._upload_one_time_keys_for_user(
|
||||
user_id, device_id, time_now, one_time_keys
|
||||
)
|
||||
else:
|
||||
log_kv(
|
||||
{"message": "Did not update one_time_keys", "reason": "no keys given"}
|
||||
)
|
||||
|
||||
# the device should have been registered already, but it may have been
|
||||
# deleted due to a race with a DELETE request. Or we may be using an
|
||||
@@ -400,7 +352,6 @@ class E2eKeysHandler(object):
|
||||
|
||||
result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
|
||||
|
||||
set_tag("one_time_key_counts", result)
|
||||
return {"one_time_key_counts": result}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@@ -444,7 +395,6 @@ class E2eKeysHandler(object):
|
||||
(algorithm, key_id, encode_canonical_json(key).decode("ascii"))
|
||||
)
|
||||
|
||||
log_kv({"message": "Inserting new one_time_keys.", "keys": new_keys})
|
||||
yield self.store.add_e2e_one_time_keys(user_id, device_id, time_now, new_keys)
|
||||
|
||||
|
||||
|
||||
@@ -26,7 +26,6 @@ from synapse.api.errors import (
|
||||
StoreError,
|
||||
SynapseError,
|
||||
)
|
||||
from synapse.logging.opentracing import log_kv, trace
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -50,7 +49,6 @@ class E2eRoomKeysHandler(object):
|
||||
# changed.
|
||||
self._upload_linearizer = Linearizer("upload_room_keys_lock")
|
||||
|
||||
@trace
|
||||
@defer.inlineCallbacks
|
||||
def get_room_keys(self, user_id, version, room_id=None, session_id=None):
|
||||
"""Bulk get the E2E room keys for a given backup, optionally filtered to a given
|
||||
@@ -86,10 +84,8 @@ class E2eRoomKeysHandler(object):
|
||||
user_id, version, room_id, session_id
|
||||
)
|
||||
|
||||
log_kv(results)
|
||||
return results
|
||||
|
||||
@trace
|
||||
@defer.inlineCallbacks
|
||||
def delete_room_keys(self, user_id, version, room_id=None, session_id=None):
|
||||
"""Bulk delete the E2E room keys for a given backup, optionally filtered to a given
|
||||
@@ -111,7 +107,6 @@ class E2eRoomKeysHandler(object):
|
||||
with (yield self._upload_linearizer.queue(user_id)):
|
||||
yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)
|
||||
|
||||
@trace
|
||||
@defer.inlineCallbacks
|
||||
def upload_room_keys(self, user_id, version, room_keys):
|
||||
"""Bulk upload a list of room keys into a given backup version, asserting
|
||||
@@ -191,14 +186,7 @@ class E2eRoomKeysHandler(object):
|
||||
session_id(str): the session whose room_key we're setting
|
||||
room_key(dict): the room_key being set
|
||||
"""
|
||||
log_kv(
|
||||
{
|
||||
"message": "Trying to upload room key",
|
||||
"room_id": room_id,
|
||||
"session_id": session_id,
|
||||
"user_id": user_id,
|
||||
}
|
||||
)
|
||||
|
||||
# get the room_key for this particular row
|
||||
current_room_key = None
|
||||
try:
|
||||
@@ -207,23 +195,14 @@ class E2eRoomKeysHandler(object):
|
||||
)
|
||||
except StoreError as e:
|
||||
if e.code == 404:
|
||||
log_kv(
|
||||
{
|
||||
"message": "Room key not found.",
|
||||
"room_id": room_id,
|
||||
"user_id": user_id,
|
||||
}
|
||||
)
|
||||
pass
|
||||
else:
|
||||
raise
|
||||
|
||||
if self._should_replace_room_key(current_room_key, room_key):
|
||||
log_kv({"message": "Replacing room key."})
|
||||
yield self.store.set_e2e_room_key(
|
||||
user_id, version, room_id, session_id, room_key
|
||||
)
|
||||
else:
|
||||
log_kv({"message": "Not replacing room_key."})
|
||||
|
||||
@staticmethod
|
||||
def _should_replace_room_key(current_room_key, room_key):
|
||||
@@ -257,7 +236,6 @@ class E2eRoomKeysHandler(object):
|
||||
return False
|
||||
return True
|
||||
|
||||
@trace
|
||||
@defer.inlineCallbacks
|
||||
def create_version(self, user_id, version_info):
|
||||
"""Create a new backup version. This automatically becomes the new
|
||||
@@ -316,7 +294,6 @@ class E2eRoomKeysHandler(object):
|
||||
raise
|
||||
return res
|
||||
|
||||
@trace
|
||||
@defer.inlineCallbacks
|
||||
def delete_version(self, user_id, version=None):
|
||||
"""Deletes a given version of the user's e2e_room_keys backup
|
||||
@@ -337,7 +314,6 @@ class E2eRoomKeysHandler(object):
|
||||
else:
|
||||
raise
|
||||
|
||||
@trace
|
||||
@defer.inlineCallbacks
|
||||
def update_version(self, user_id, version, version_info):
|
||||
"""Update the info about a given version of the user's backup
|
||||
|
||||
@@ -326,9 +326,8 @@ class FederationHandler(BaseHandler):
|
||||
ours = yield self.store.get_state_groups_ids(room_id, seen)
|
||||
|
||||
# state_maps is a list of mappings from (type, state_key) to event_id
|
||||
state_maps = list(
|
||||
ours.values()
|
||||
) # type: list[dict[tuple[str, str], str]]
|
||||
# type: list[dict[tuple[str, str], str]]
|
||||
state_maps = list(ours.values())
|
||||
|
||||
# we don't need this any more, let's delete it.
|
||||
del ours
|
||||
@@ -1428,7 +1427,7 @@ class FederationHandler(BaseHandler):
|
||||
assert event.user_id == user_id
|
||||
assert event.state_key == user_id
|
||||
assert event.room_id == room_id
|
||||
return origin, event, format_ver
|
||||
return (origin, event, format_ver)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
|
||||
@@ -449,7 +449,7 @@ class InitialSyncHandler(BaseHandler):
|
||||
# * The user is a guest user, and has joined the room
|
||||
# else it will throw.
|
||||
member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
|
||||
return member_event.membership, member_event.event_id
|
||||
return (member_event.membership, member_event.event_id)
|
||||
return
|
||||
except AuthError:
|
||||
visibility = yield self.state_handler.get_current_state(
|
||||
@@ -459,7 +459,7 @@ class InitialSyncHandler(BaseHandler):
|
||||
visibility
|
||||
and visibility.content["history_visibility"] == "world_readable"
|
||||
):
|
||||
return Membership.JOIN, None
|
||||
return (Membership.JOIN, None)
|
||||
return
|
||||
raise AuthError(
|
||||
403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
|
||||
|
||||
@@ -24,7 +24,7 @@ from twisted.internet import defer
|
||||
from twisted.internet.defer import succeed
|
||||
|
||||
from synapse import event_auth
|
||||
from synapse.api.constants import EventTypes, Membership, RelationTypes, UserTypes
|
||||
from synapse.api.constants import EventTypes, Membership, RelationTypes
|
||||
from synapse.api.errors import (
|
||||
AuthError,
|
||||
Codes,
|
||||
@@ -469,9 +469,6 @@ class EventCreationHandler(object):
|
||||
|
||||
u = yield self.store.get_user_by_id(user_id)
|
||||
assert u is not None
|
||||
if u["user_type"] in (UserTypes.SUPPORT, UserTypes.BOT):
|
||||
# support and bot users are not required to consent
|
||||
return
|
||||
if u["appservice_id"] is not None:
|
||||
# users registered by an appservice are exempt
|
||||
return
|
||||
|
||||
@@ -70,7 +70,6 @@ class PaginationHandler(object):
|
||||
self.auth = hs.get_auth()
|
||||
self.store = hs.get_datastore()
|
||||
self.clock = hs.get_clock()
|
||||
self._server_name = hs.hostname
|
||||
|
||||
self.pagination_lock = ReadWriteLock()
|
||||
self._purges_in_progress_by_room = set()
|
||||
@@ -154,22 +153,6 @@ class PaginationHandler(object):
|
||||
"""
|
||||
return self._purges_by_id.get(purge_id)
|
||||
|
||||
async def purge_room(self, room_id):
|
||||
"""Purge the given room from the database"""
|
||||
with (await self.pagination_lock.write(room_id)):
|
||||
# check we know about the room
|
||||
await self.store.get_room_version(room_id)
|
||||
|
||||
# first check that we have no users in this room
|
||||
joined = await defer.maybeDeferred(
|
||||
self.store.is_host_joined, room_id, self._server_name
|
||||
)
|
||||
|
||||
if joined:
|
||||
raise SynapseError(400, "Users are still joined to this room")
|
||||
|
||||
await self.store.purge_room(room_id)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_messages(
|
||||
self,
|
||||
|
||||
@@ -1032,7 +1032,7 @@ class PresenceEventSource(object):
|
||||
#
|
||||
# Hence this guard where we just return nothing so that the sync
|
||||
# doesn't return. C.f. #5503.
|
||||
return [], max_token
|
||||
return ([], max_token)
|
||||
|
||||
presence = self.get_presence_handler()
|
||||
stream_change_cache = self.store.presence_stream_cache
|
||||
@@ -1279,7 +1279,7 @@ def get_interested_parties(store, states):
|
||||
# Always notify self
|
||||
users_to_states.setdefault(state.user_id, []).append(state)
|
||||
|
||||
return room_ids_to_states, users_to_states
|
||||
return (room_ids_to_states, users_to_states)
|
||||
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
||||
@@ -34,7 +34,7 @@ from ._base import BaseHandler
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
MAX_DISPLAYNAME_LEN = 256
|
||||
MAX_DISPLAYNAME_LEN = 100
|
||||
MAX_AVATAR_URL_LEN = 1000
|
||||
|
||||
|
||||
|
||||
@@ -148,7 +148,7 @@ class ReceiptEventSource(object):
|
||||
to_key = yield self.get_current_key()
|
||||
|
||||
if from_key == to_key:
|
||||
return [], to_key
|
||||
return ([], to_key)
|
||||
|
||||
events = yield self.store.get_linearized_receipts_for_rooms(
|
||||
room_ids, from_key=from_key, to_key=to_key
|
||||
|
||||
@@ -622,7 +622,7 @@ class RegistrationHandler(BaseHandler):
|
||||
initial_display_name=initial_display_name,
|
||||
is_guest=is_guest,
|
||||
)
|
||||
return r["device_id"], r["access_token"]
|
||||
return (r["device_id"], r["access_token"])
|
||||
|
||||
valid_until_ms = None
|
||||
if self.session_lifetime is not None:
|
||||
|
||||
@@ -25,7 +25,6 @@ from unpaddedbase64 import decode_base64, encode_base64
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.constants import EventTypes, JoinRules
|
||||
from synapse.api.errors import Codes, HttpResponseException
|
||||
from synapse.types import ThirdPartyInstanceID
|
||||
from synapse.util.async_helpers import concurrently_execute
|
||||
from synapse.util.caches.descriptors import cachedInlineCallbacks
|
||||
@@ -486,33 +485,7 @@ class RoomListHandler(BaseHandler):
|
||||
return {"chunk": [], "total_room_count_estimate": 0}
|
||||
|
||||
if search_filter:
|
||||
# Searching across federation is defined in MSC2197.
|
||||
# However, the remote homeserver may or may not actually support it.
|
||||
# So we first try an MSC2197 remote-filtered search, then fall back
|
||||
# to a locally-filtered search if we must.
|
||||
|
||||
try:
|
||||
res = yield self._get_remote_list_cached(
|
||||
server_name,
|
||||
limit=limit,
|
||||
since_token=since_token,
|
||||
include_all_networks=include_all_networks,
|
||||
third_party_instance_id=third_party_instance_id,
|
||||
search_filter=search_filter,
|
||||
)
|
||||
return res
|
||||
except HttpResponseException as hre:
|
||||
syn_err = hre.to_synapse_error()
|
||||
if hre.code in (404, 405) or syn_err.errcode in (
|
||||
Codes.UNRECOGNIZED,
|
||||
Codes.NOT_FOUND,
|
||||
):
|
||||
logger.debug("Falling back to locally-filtered /publicRooms")
|
||||
else:
|
||||
raise # Not an error that should trigger a fallback.
|
||||
|
||||
# if we reach this point, then we fall back to the situation where
|
||||
# we currently don't support searching across federation, so we have
|
||||
# We currently don't support searching across federation, so we have
|
||||
# to do it manually without pagination
|
||||
limit = None
|
||||
since_token = None
|
||||
|
||||
@@ -903,7 +903,7 @@ class RoomMemberHandler(object):
|
||||
if not public_keys:
|
||||
public_keys.append(fallback_public_key)
|
||||
display_name = data["display_name"]
|
||||
return token, public_keys, fallback_public_key, display_name
|
||||
return (token, public_keys, fallback_public_key, display_name)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _is_host_in_room(self, current_state_ids):
|
||||
|
||||
@@ -49,6 +49,9 @@ class StatsHandler(StateDeltasHandler):
|
||||
# The current position in the current_state_delta stream
|
||||
self.pos = None
|
||||
|
||||
# Guard to ensure we only process deltas one at a time
|
||||
self._is_processing = False
|
||||
|
||||
if hs.config.stats_enabled:
|
||||
self.notifier.add_replication_callback(self.notify_new_event)
|
||||
|
||||
@@ -62,61 +65,43 @@ class StatsHandler(StateDeltasHandler):
|
||||
if not self.hs.config.stats_enabled:
|
||||
return
|
||||
|
||||
lock = self.store.stats_delta_processing_lock
|
||||
if self._is_processing:
|
||||
return
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def process():
|
||||
yield lock.acquire()
|
||||
try:
|
||||
yield self._unsafe_process()
|
||||
finally:
|
||||
yield lock.release()
|
||||
self._is_processing = False
|
||||
|
||||
if not lock.locked:
|
||||
# we only want to run this process one-at-a-time,
|
||||
# and also, if the initial background updater wants us to keep out,
|
||||
# we should respect that.
|
||||
run_as_background_process("stats.notify_new_event", process)
|
||||
self._is_processing = True
|
||||
run_as_background_process("stats.notify_new_event", process)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _unsafe_process(self):
|
||||
# If self.pos is None then means we haven't fetched it from DB
|
||||
# If None is one of the values, then means that the stats regenerator has not (or had not) yet unwedged us
|
||||
# but note that this might be outdated, so we retrieve the positions again.
|
||||
if self.pos is None or None in self.pos.values():
|
||||
self.pos = yield self.store.get_stats_positions()
|
||||
if self.pos is None:
|
||||
self.pos = yield self.store.get_stats_stream_pos()
|
||||
|
||||
# If still contains a None position, then the stats regenerator hasn't started yet
|
||||
if None in self.pos.values():
|
||||
# If still None then the initial background update hasn't happened yet
|
||||
if self.pos is None:
|
||||
return None
|
||||
|
||||
# Loop round handling deltas until we're up to date
|
||||
|
||||
while True:
|
||||
with Measure(self.clock, "stats_delta"):
|
||||
deltas = yield self.store.get_current_state_deltas(
|
||||
self.pos["state_delta_stream_id"]
|
||||
)
|
||||
deltas = yield self.store.get_current_state_deltas(self.pos)
|
||||
if not deltas:
|
||||
return
|
||||
|
||||
if deltas:
|
||||
logger.debug("Handling %d state deltas", len(deltas))
|
||||
yield self._handle_deltas(deltas)
|
||||
logger.info("Handling %d state deltas", len(deltas))
|
||||
yield self._handle_deltas(deltas)
|
||||
|
||||
self.pos["state_delta_stream_id"] = deltas[-1]["stream_id"]
|
||||
yield self.store.update_stats_positions(self.pos)
|
||||
self.pos = deltas[-1]["stream_id"]
|
||||
yield self.store.update_stats_stream_pos(self.pos)
|
||||
|
||||
event_processing_positions.labels("stats").set(
|
||||
self.pos["state_delta_stream_id"]
|
||||
)
|
||||
|
||||
# Then count deltas for total_events and total_event_bytes.
|
||||
with Measure(self.clock, "stats_total_events_and_bytes"):
|
||||
self.pos, had_counts = yield self.store.incremental_update_room_total_events_and_bytes(
|
||||
self.pos
|
||||
)
|
||||
|
||||
if not deltas and not had_counts:
|
||||
break
|
||||
event_processing_positions.labels("stats").set(self.pos)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _handle_deltas(self, deltas):
|
||||
@@ -130,10 +115,11 @@ class StatsHandler(StateDeltasHandler):
|
||||
event_id = delta["event_id"]
|
||||
stream_id = delta["stream_id"]
|
||||
prev_event_id = delta["prev_event_id"]
|
||||
stream_pos = delta["stream_id"]
|
||||
|
||||
logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
|
||||
|
||||
token = yield self.store.get_earliest_token_for_stats("room", room_id)
|
||||
token = yield self.store.get_earliest_token_for_room_stats(room_id)
|
||||
|
||||
# If the earliest token to begin from is larger than our current
|
||||
# stream ID, skip processing this delta.
|
||||
@@ -145,10 +131,7 @@ class StatsHandler(StateDeltasHandler):
|
||||
continue
|
||||
|
||||
if event_id is None and prev_event_id is None:
|
||||
logger.error(
|
||||
"event ID is None and so is the previous event ID. stream_id: %s",
|
||||
stream_id,
|
||||
)
|
||||
# Errr...
|
||||
continue
|
||||
|
||||
event_content = {}
|
||||
@@ -158,86 +141,94 @@ class StatsHandler(StateDeltasHandler):
|
||||
if event:
|
||||
event_content = event.content or {}
|
||||
|
||||
# We can't afford for this time to stray into the past, so we count
|
||||
# it as now.
|
||||
stream_timestamp = int(self.clock.time_msec())
|
||||
# We use stream_pos here rather than fetch by event_id as event_id
|
||||
# may be None
|
||||
now = yield self.store.get_received_ts_by_stream_pos(stream_pos)
|
||||
|
||||
# All the values in this dict are deltas (RELATIVE changes)
|
||||
room_stats_delta = {}
|
||||
is_newly_created = False
|
||||
|
||||
if prev_event_id is None:
|
||||
# this state event doesn't overwrite another,
|
||||
# so it is a new effective/current state event
|
||||
room_stats_delta["current_state_events"] = 1
|
||||
# quantise time to the nearest bucket
|
||||
now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size
|
||||
|
||||
if typ == EventTypes.Member:
|
||||
# we could use _get_key_change here but it's a bit inefficient
|
||||
# given we're not testing for a specific result; might as well
|
||||
# just grab the prev_membership and membership strings and
|
||||
# compare them.
|
||||
# We take None rather than leave as a previous membership
|
||||
# in the absence of a previous event because we do not want to
|
||||
# reduce the leave count when a new-to-the-room user joins.
|
||||
prev_membership = None
|
||||
prev_event_content = {}
|
||||
if prev_event_id is not None:
|
||||
prev_event = yield self.store.get_event(
|
||||
prev_event_id, allow_none=True
|
||||
)
|
||||
if prev_event:
|
||||
prev_event_content = prev_event.content
|
||||
prev_membership = prev_event_content.get(
|
||||
"membership", Membership.LEAVE
|
||||
)
|
||||
|
||||
membership = event_content.get("membership", Membership.LEAVE)
|
||||
prev_membership = prev_event_content.get("membership", Membership.LEAVE)
|
||||
|
||||
if prev_membership is None:
|
||||
logger.debug("No previous membership for this user.")
|
||||
elif membership == prev_membership:
|
||||
pass # noop
|
||||
elif prev_membership == Membership.JOIN:
|
||||
room_stats_delta["joined_members"] = -1
|
||||
elif prev_membership == Membership.INVITE:
|
||||
room_stats_delta["invited_members"] = -1
|
||||
elif prev_membership == Membership.LEAVE:
|
||||
room_stats_delta["left_members"] = -1
|
||||
elif prev_membership == Membership.BAN:
|
||||
room_stats_delta["banned_members"] = -1
|
||||
else:
|
||||
raise ValueError(
|
||||
"%r is not a valid prev_membership" % (prev_membership,)
|
||||
if prev_membership == membership:
|
||||
continue
|
||||
|
||||
if prev_membership == Membership.JOIN:
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "joined_members", -1
|
||||
)
|
||||
elif prev_membership == Membership.INVITE:
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "invited_members", -1
|
||||
)
|
||||
elif prev_membership == Membership.LEAVE:
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "left_members", -1
|
||||
)
|
||||
elif prev_membership == Membership.BAN:
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "banned_members", -1
|
||||
)
|
||||
|
||||
if membership == prev_membership:
|
||||
pass # noop
|
||||
if membership == Membership.JOIN:
|
||||
room_stats_delta["joined_members"] = +1
|
||||
elif membership == Membership.INVITE:
|
||||
room_stats_delta["invited_members"] = +1
|
||||
elif membership == Membership.LEAVE:
|
||||
room_stats_delta["left_members"] = +1
|
||||
elif membership == Membership.BAN:
|
||||
room_stats_delta["banned_members"] = +1
|
||||
else:
|
||||
raise ValueError("%r is not a valid membership" % (membership,))
|
||||
err = "%s is not a valid prev_membership" % (repr(prev_membership),)
|
||||
logger.error(err)
|
||||
raise ValueError(err)
|
||||
|
||||
if membership == Membership.JOIN:
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "joined_members", +1
|
||||
)
|
||||
elif membership == Membership.INVITE:
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "invited_members", +1
|
||||
)
|
||||
elif membership == Membership.LEAVE:
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "left_members", +1
|
||||
)
|
||||
elif membership == Membership.BAN:
|
||||
yield self.store.update_stats_delta(
|
||||
now, "room", room_id, "banned_members", +1
|
||||
)
|
||||
else:
|
||||
err = "%s is not a valid membership" % (repr(membership),)
|
||||
logger.error(err)
|
||||
raise ValueError(err)
|
||||
|
||||
user_id = state_key
|
||||
if self.is_mine_id(user_id):
|
||||
# this accounts for transitions like leave → ban and so on.
|
||||
has_changed_joinedness = (prev_membership == Membership.JOIN) != (
|
||||
membership == Membership.JOIN
|
||||
)
|
||||
|
||||
if has_changed_joinedness:
|
||||
# update user_stats as it's one of our users
|
||||
public = yield self._is_public_room(room_id)
|
||||
|
||||
field = "public_rooms" if public else "private_rooms"
|
||||
delta = +1 if membership == Membership.JOIN else -1
|
||||
# update user_stats as it's one of our users
|
||||
public = yield self._is_public_room(room_id)
|
||||
|
||||
if membership == Membership.LEAVE:
|
||||
yield self.store.update_stats_delta(
|
||||
stream_timestamp, "user", user_id, {field: delta}
|
||||
now,
|
||||
"user",
|
||||
user_id,
|
||||
"public_rooms" if public else "private_rooms",
|
||||
-1,
|
||||
)
|
||||
elif membership == Membership.JOIN:
|
||||
yield self.store.update_stats_delta(
|
||||
now,
|
||||
"user",
|
||||
user_id,
|
||||
"public_rooms" if public else "private_rooms",
|
||||
+1,
|
||||
)
|
||||
|
||||
elif typ == EventTypes.Create:
|
||||
@@ -255,50 +246,28 @@ class StatsHandler(StateDeltasHandler):
|
||||
},
|
||||
)
|
||||
|
||||
is_newly_created = True
|
||||
|
||||
elif typ == EventTypes.JoinRules:
|
||||
old_room_state = yield self.store.get_room_stats_state(room_id)
|
||||
yield self.store.update_room_state(
|
||||
room_id, {"join_rules": event_content.get("join_rule")}
|
||||
)
|
||||
|
||||
# whether the room would be public anyway,
|
||||
# because of history_visibility
|
||||
other_field_gives_publicity = (
|
||||
old_room_state["history_visibility"] == "world_readable"
|
||||
is_public = yield self._get_key_change(
|
||||
prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
|
||||
)
|
||||
|
||||
if not other_field_gives_publicity:
|
||||
is_public = yield self._get_key_change(
|
||||
prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
|
||||
)
|
||||
if is_public is not None:
|
||||
yield self.update_public_room_stats(
|
||||
stream_timestamp, room_id, is_public
|
||||
)
|
||||
if is_public is not None:
|
||||
yield self.update_public_room_stats(now, room_id, is_public)
|
||||
|
||||
elif typ == EventTypes.RoomHistoryVisibility:
|
||||
old_room_state = yield self.store.get_room_stats_state(room_id)
|
||||
yield self.store.update_room_state(
|
||||
room_id,
|
||||
{"history_visibility": event_content.get("history_visibility")},
|
||||
)
|
||||
|
||||
# whether the room would be public anyway,
|
||||
# because of join_rule
|
||||
other_field_gives_publicity = (
|
||||
old_room_state["join_rules"] == JoinRules.PUBLIC
|
||||
is_public = yield self._get_key_change(
|
||||
prev_event_id, event_id, "history_visibility", "world_readable"
|
||||
)
|
||||
|
||||
if not other_field_gives_publicity:
|
||||
is_public = yield self._get_key_change(
|
||||
prev_event_id, event_id, "history_visibility", "world_readable"
|
||||
)
|
||||
if is_public is not None:
|
||||
yield self.update_public_room_stats(
|
||||
stream_timestamp, room_id, is_public
|
||||
)
|
||||
if is_public is not None:
|
||||
yield self.update_public_room_stats(now, room_id, is_public)
|
||||
|
||||
elif typ == EventTypes.Encryption:
|
||||
yield self.store.update_room_state(
|
||||
@@ -321,20 +290,6 @@ class StatsHandler(StateDeltasHandler):
|
||||
room_id, {"canonical_alias": event_content.get("alias")}
|
||||
)
|
||||
|
||||
if is_newly_created:
|
||||
yield self.store.update_stats_delta(
|
||||
stream_timestamp,
|
||||
"room",
|
||||
room_id,
|
||||
room_stats_delta,
|
||||
complete_with_stream_id=stream_id,
|
||||
)
|
||||
|
||||
elif len(room_stats_delta) > 0:
|
||||
yield self.store.update_stats_delta(
|
||||
stream_timestamp, "room", room_id, room_stats_delta
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def update_public_room_stats(self, ts, room_id, is_public):
|
||||
"""
|
||||
@@ -353,13 +308,10 @@ class StatsHandler(StateDeltasHandler):
|
||||
for user_id in user_ids:
|
||||
if self.hs.is_mine(UserID.from_string(user_id)):
|
||||
yield self.store.update_stats_delta(
|
||||
ts,
|
||||
"user",
|
||||
user_id,
|
||||
{
|
||||
"public_rooms": +1 if is_public else -1,
|
||||
"private_rooms": -1 if is_public else +1,
|
||||
},
|
||||
ts, "user", user_id, "public_rooms", +1 if is_public else -1
|
||||
)
|
||||
yield self.store.update_stats_delta(
|
||||
ts, "user", user_id, "private_rooms", -1 if is_public else +1
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
||||
@@ -378,7 +378,7 @@ class SyncHandler(object):
|
||||
event_copy = {k: v for (k, v) in iteritems(event) if k != "room_id"}
|
||||
ephemeral_by_room.setdefault(room_id, []).append(event_copy)
|
||||
|
||||
return now_token, ephemeral_by_room
|
||||
return (now_token, ephemeral_by_room)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _load_filtered_recents(
|
||||
@@ -1332,7 +1332,7 @@ class SyncHandler(object):
|
||||
)
|
||||
if not tags_by_room:
|
||||
logger.debug("no-oping sync")
|
||||
return [], [], [], []
|
||||
return ([], [], [], [])
|
||||
|
||||
ignored_account_data = yield self.store.get_global_account_data_by_type_for_user(
|
||||
"m.ignored_user_list", user_id=user_id
|
||||
@@ -1642,7 +1642,7 @@ class SyncHandler(object):
|
||||
)
|
||||
room_entries.append(entry)
|
||||
|
||||
return room_entries, invited, newly_joined_rooms, newly_left_rooms
|
||||
return (room_entries, invited, newly_joined_rooms, newly_left_rooms)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_all_rooms(self, sync_result_builder, ignored_users):
|
||||
@@ -1716,7 +1716,7 @@ class SyncHandler(object):
|
||||
)
|
||||
)
|
||||
|
||||
return room_entries, invited, []
|
||||
return (room_entries, invited, [])
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _generate_room_entry(
|
||||
|
||||
@@ -319,4 +319,4 @@ class TypingNotificationEventSource(object):
|
||||
return self.get_typing_handler()._latest_room_serial
|
||||
|
||||
def get_pagination_rows(self, user, pagination_config, key):
|
||||
return [], pagination_config.from_key
|
||||
return ([], pagination_config.from_key)
|
||||
|
||||
@@ -14,21 +14,21 @@
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import urllib
|
||||
|
||||
from netaddr import AddrFormatError, IPAddress
|
||||
import attr
|
||||
from netaddr import IPAddress
|
||||
from zope.interface import implementer
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
|
||||
from twisted.internet.interfaces import IStreamClientEndpoint
|
||||
from twisted.web.client import Agent, HTTPConnectionPool
|
||||
from twisted.web.client import URI, Agent, HTTPConnectionPool
|
||||
from twisted.web.http_headers import Headers
|
||||
from twisted.web.iweb import IAgent, IAgentEndpointFactory
|
||||
from twisted.web.iweb import IAgent
|
||||
|
||||
from synapse.http.federation.srv_resolver import Server, SrvResolver
|
||||
from synapse.http.federation.srv_resolver import SrvResolver, pick_server_from_list
|
||||
from synapse.http.federation.well_known_resolver import WellKnownResolver
|
||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||
from synapse.logging.context import make_deferred_yieldable
|
||||
from synapse.util import Clock
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -36,9 +36,8 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
@implementer(IAgent)
|
||||
class MatrixFederationAgent(object):
|
||||
"""An Agent-like thing which provides a `request` method which correctly
|
||||
handles resolving matrix server names when using matrix://. Handles standard
|
||||
https URIs as normal.
|
||||
"""An Agent-like thing which provides a `request` method which will look up a matrix
|
||||
server and send an HTTP request to it.
|
||||
|
||||
Doesn't implement any retries. (Those are done in MatrixFederationHttpClient.)
|
||||
|
||||
@@ -52,9 +51,9 @@ class MatrixFederationAgent(object):
|
||||
SRVResolver impl to use for looking up SRV records. None to use a default
|
||||
implementation.
|
||||
|
||||
_well_known_resolver (WellKnownResolver|None):
|
||||
WellKnownResolver to use to perform well-known lookups. None to use a
|
||||
default implementation.
|
||||
_well_known_cache (TTLCache|None):
|
||||
TTLCache impl for storing cached well-known lookups. None to use a default
|
||||
implementation.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
@@ -62,49 +61,49 @@ class MatrixFederationAgent(object):
|
||||
reactor,
|
||||
tls_client_options_factory,
|
||||
_srv_resolver=None,
|
||||
_well_known_resolver=None,
|
||||
_well_known_cache=None,
|
||||
):
|
||||
self._reactor = reactor
|
||||
self._clock = Clock(reactor)
|
||||
|
||||
self._tls_client_options_factory = tls_client_options_factory
|
||||
if _srv_resolver is None:
|
||||
_srv_resolver = SrvResolver()
|
||||
self._srv_resolver = _srv_resolver
|
||||
|
||||
self._pool = HTTPConnectionPool(reactor)
|
||||
self._pool.retryAutomatically = False
|
||||
self._pool.maxPersistentPerHost = 5
|
||||
self._pool.cachedConnectionTimeout = 2 * 60
|
||||
|
||||
self._agent = Agent.usingEndpointFactory(
|
||||
self._well_known_resolver = WellKnownResolver(
|
||||
self._reactor,
|
||||
MatrixHostnameEndpointFactory(
|
||||
reactor, tls_client_options_factory, _srv_resolver
|
||||
),
|
||||
pool=self._pool,
|
||||
)
|
||||
|
||||
if _well_known_resolver is None:
|
||||
_well_known_resolver = WellKnownResolver(
|
||||
agent=Agent(
|
||||
self._reactor,
|
||||
agent=Agent(
|
||||
self._reactor,
|
||||
pool=self._pool,
|
||||
contextFactory=tls_client_options_factory,
|
||||
),
|
||||
)
|
||||
|
||||
self._well_known_resolver = _well_known_resolver
|
||||
pool=self._pool,
|
||||
contextFactory=tls_client_options_factory,
|
||||
),
|
||||
well_known_cache=_well_known_cache,
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def request(self, method, uri, headers=None, bodyProducer=None):
|
||||
"""
|
||||
Args:
|
||||
method (bytes): HTTP method: GET/POST/etc
|
||||
|
||||
uri (bytes): Absolute URI to be retrieved
|
||||
|
||||
headers (twisted.web.http_headers.Headers|None):
|
||||
HTTP headers to send with the request, or None to
|
||||
send no extra headers.
|
||||
|
||||
bodyProducer (twisted.web.iweb.IBodyProducer|None):
|
||||
An object which can generate bytes to make up the
|
||||
body of this request (for example, the properly encoded contents of
|
||||
a file for a file upload). Or None if the request is to have
|
||||
no body.
|
||||
|
||||
Returns:
|
||||
Deferred[twisted.web.iweb.IResponse]:
|
||||
fires when the header of the response has been received (regardless of the
|
||||
@@ -112,207 +111,210 @@ class MatrixFederationAgent(object):
|
||||
response from being received (including problems that prevent the request
|
||||
from being sent).
|
||||
"""
|
||||
# We use urlparse as that will set `port` to None if there is no
|
||||
# explicit port.
|
||||
parsed_uri = urllib.parse.urlparse(uri)
|
||||
|
||||
# If this is a matrix:// URI check if the server has delegated matrix
|
||||
# traffic using well-known delegation.
|
||||
#
|
||||
# We have to do this here and not in the endpoint as we need to rewrite
|
||||
# the host header with the delegated server name.
|
||||
delegated_server = None
|
||||
if (
|
||||
parsed_uri.scheme == b"matrix"
|
||||
and not _is_ip_literal(parsed_uri.hostname)
|
||||
and not parsed_uri.port
|
||||
):
|
||||
well_known_result = yield self._well_known_resolver.get_well_known(
|
||||
parsed_uri.hostname
|
||||
)
|
||||
delegated_server = well_known_result.delegated_server
|
||||
|
||||
if delegated_server:
|
||||
# Ok, the server has delegated matrix traffic to somewhere else, so
|
||||
# lets rewrite the URL to replace the server with the delegated
|
||||
# server name.
|
||||
uri = urllib.parse.urlunparse(
|
||||
(
|
||||
parsed_uri.scheme,
|
||||
delegated_server,
|
||||
parsed_uri.path,
|
||||
parsed_uri.params,
|
||||
parsed_uri.query,
|
||||
parsed_uri.fragment,
|
||||
)
|
||||
)
|
||||
parsed_uri = urllib.parse.urlparse(uri)
|
||||
|
||||
# We need to make sure the host header is set to the netloc of the
|
||||
# server.
|
||||
if headers is None:
|
||||
headers = Headers()
|
||||
else:
|
||||
headers = headers.copy()
|
||||
|
||||
if not headers.hasHeader(b"host"):
|
||||
headers.addRawHeader(b"host", parsed_uri.netloc)
|
||||
|
||||
res = yield make_deferred_yieldable(
|
||||
self._agent.request(method, uri, headers, bodyProducer)
|
||||
)
|
||||
|
||||
return res
|
||||
|
||||
|
||||
@implementer(IAgentEndpointFactory)
|
||||
class MatrixHostnameEndpointFactory(object):
|
||||
"""Factory for MatrixHostnameEndpoint for parsing to an Agent.
|
||||
"""
|
||||
|
||||
def __init__(self, reactor, tls_client_options_factory, srv_resolver):
|
||||
self._reactor = reactor
|
||||
self._tls_client_options_factory = tls_client_options_factory
|
||||
|
||||
if srv_resolver is None:
|
||||
srv_resolver = SrvResolver()
|
||||
|
||||
self._srv_resolver = srv_resolver
|
||||
|
||||
def endpointForURI(self, parsed_uri):
|
||||
return MatrixHostnameEndpoint(
|
||||
self._reactor,
|
||||
self._tls_client_options_factory,
|
||||
self._srv_resolver,
|
||||
parsed_uri,
|
||||
)
|
||||
|
||||
|
||||
@implementer(IStreamClientEndpoint)
|
||||
class MatrixHostnameEndpoint(object):
|
||||
"""An endpoint that resolves matrix:// URLs using Matrix server name
|
||||
resolution (i.e. via SRV). Does not check for well-known delegation.
|
||||
|
||||
Args:
|
||||
reactor (IReactor)
|
||||
tls_client_options_factory (ClientTLSOptionsFactory|None):
|
||||
factory to use for fetching client tls options, or none to disable TLS.
|
||||
srv_resolver (SrvResolver): The SRV resolver to use
|
||||
parsed_uri (twisted.web.client.URI): The parsed URI that we're wanting
|
||||
to connect to.
|
||||
"""
|
||||
|
||||
def __init__(self, reactor, tls_client_options_factory, srv_resolver, parsed_uri):
|
||||
self._reactor = reactor
|
||||
|
||||
self._parsed_uri = parsed_uri
|
||||
parsed_uri = URI.fromBytes(uri, defaultPort=-1)
|
||||
res = yield self._route_matrix_uri(parsed_uri)
|
||||
|
||||
# set up the TLS connection params
|
||||
#
|
||||
# XXX disabling TLS is really only supported here for the benefit of the
|
||||
# unit tests. We should make the UTs cope with TLS rather than having to make
|
||||
# the code support the unit tests.
|
||||
|
||||
if tls_client_options_factory is None:
|
||||
self._tls_options = None
|
||||
if self._tls_client_options_factory is None:
|
||||
tls_options = None
|
||||
else:
|
||||
self._tls_options = tls_client_options_factory.get_options(
|
||||
self._parsed_uri.host.decode("ascii")
|
||||
tls_options = self._tls_client_options_factory.get_options(
|
||||
res.tls_server_name.decode("ascii")
|
||||
)
|
||||
|
||||
self._srv_resolver = srv_resolver
|
||||
# make sure that the Host header is set correctly
|
||||
if headers is None:
|
||||
headers = Headers()
|
||||
else:
|
||||
headers = headers.copy()
|
||||
|
||||
def connect(self, protocol_factory):
|
||||
"""Implements IStreamClientEndpoint interface
|
||||
"""
|
||||
if not headers.hasHeader(b"host"):
|
||||
headers.addRawHeader(b"host", res.host_header)
|
||||
|
||||
return run_in_background(self._do_connect, protocol_factory)
|
||||
class EndpointFactory(object):
|
||||
@staticmethod
|
||||
def endpointForURI(_uri):
|
||||
ep = LoggingHostnameEndpoint(
|
||||
self._reactor, res.target_host, res.target_port
|
||||
)
|
||||
if tls_options is not None:
|
||||
ep = wrapClientTLS(tls_options, ep)
|
||||
return ep
|
||||
|
||||
agent = Agent.usingEndpointFactory(self._reactor, EndpointFactory(), self._pool)
|
||||
res = yield make_deferred_yieldable(
|
||||
agent.request(method, uri, headers, bodyProducer)
|
||||
)
|
||||
return res
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _do_connect(self, protocol_factory):
|
||||
first_exception = None
|
||||
def _route_matrix_uri(self, parsed_uri, lookup_well_known=True):
|
||||
"""Helper for `request`: determine the routing for a Matrix URI
|
||||
|
||||
server_list = yield self._resolve_server()
|
||||
Args:
|
||||
parsed_uri (twisted.web.client.URI): uri to route. Note that it should be
|
||||
parsed with URI.fromBytes(uri, defaultPort=-1) to set the `port` to -1
|
||||
if there is no explicit port given.
|
||||
|
||||
for server in server_list:
|
||||
host = server.host
|
||||
port = server.port
|
||||
|
||||
try:
|
||||
logger.info("Connecting to %s:%i", host.decode("ascii"), port)
|
||||
endpoint = HostnameEndpoint(self._reactor, host, port)
|
||||
if self._tls_options:
|
||||
endpoint = wrapClientTLS(self._tls_options, endpoint)
|
||||
result = yield make_deferred_yieldable(
|
||||
endpoint.connect(protocol_factory)
|
||||
)
|
||||
|
||||
return result
|
||||
except Exception as e:
|
||||
logger.info(
|
||||
"Failed to connect to %s:%i: %s", host.decode("ascii"), port, e
|
||||
)
|
||||
if not first_exception:
|
||||
first_exception = e
|
||||
|
||||
# We return the first failure because that's probably the most interesting.
|
||||
if first_exception:
|
||||
raise first_exception
|
||||
|
||||
# This shouldn't happen as we should always have at least one host/port
|
||||
# to try and if that doesn't work then we'll have an exception.
|
||||
raise Exception("Failed to resolve server %r" % (self._parsed_uri.netloc,))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _resolve_server(self):
|
||||
"""Resolves the server name to a list of hosts and ports to attempt to
|
||||
connect to.
|
||||
lookup_well_known (bool): True if we should look up the .well-known file if
|
||||
there is no SRV record.
|
||||
|
||||
Returns:
|
||||
Deferred[list[Server]]
|
||||
Deferred[_RoutingResult]
|
||||
"""
|
||||
# check for an IP literal
|
||||
try:
|
||||
ip_address = IPAddress(parsed_uri.host.decode("ascii"))
|
||||
except Exception:
|
||||
# not an IP address
|
||||
ip_address = None
|
||||
|
||||
if self._parsed_uri.scheme != b"matrix":
|
||||
return [Server(host=self._parsed_uri.host, port=self._parsed_uri.port)]
|
||||
if ip_address:
|
||||
port = parsed_uri.port
|
||||
if port == -1:
|
||||
port = 8448
|
||||
return _RoutingResult(
|
||||
host_header=parsed_uri.netloc,
|
||||
tls_server_name=parsed_uri.host,
|
||||
target_host=parsed_uri.host,
|
||||
target_port=port,
|
||||
)
|
||||
|
||||
# Note: We don't do well-known lookup as that needs to have happened
|
||||
# before now, due to needing to rewrite the Host header of the HTTP
|
||||
# request.
|
||||
if parsed_uri.port != -1:
|
||||
# there is an explicit port
|
||||
return _RoutingResult(
|
||||
host_header=parsed_uri.netloc,
|
||||
tls_server_name=parsed_uri.host,
|
||||
target_host=parsed_uri.host,
|
||||
target_port=parsed_uri.port,
|
||||
)
|
||||
|
||||
# We reparse the URI so that defaultPort is -1 rather than 80
|
||||
parsed_uri = urllib.parse.urlparse(self._parsed_uri.toBytes())
|
||||
if lookup_well_known:
|
||||
# try a .well-known lookup
|
||||
well_known_result = yield self._well_known_resolver.get_well_known(
|
||||
parsed_uri.host
|
||||
)
|
||||
well_known_server = well_known_result.delegated_server
|
||||
|
||||
host = parsed_uri.hostname
|
||||
port = parsed_uri.port
|
||||
if well_known_server:
|
||||
# if we found a .well-known, start again, but don't do another
|
||||
# .well-known lookup.
|
||||
|
||||
# If there is an explicit port or the host is an IP address we bypass
|
||||
# SRV lookups and just use the given host/port.
|
||||
if port or _is_ip_literal(host):
|
||||
return [Server(host, port or 8448)]
|
||||
# parse the server name in the .well-known response into host/port.
|
||||
# (This code is lifted from twisted.web.client.URI.fromBytes).
|
||||
if b":" in well_known_server:
|
||||
well_known_host, well_known_port = well_known_server.rsplit(b":", 1)
|
||||
try:
|
||||
well_known_port = int(well_known_port)
|
||||
except ValueError:
|
||||
# the part after the colon could not be parsed as an int
|
||||
# - we assume it is an IPv6 literal with no port (the closing
|
||||
# ']' stops it being parsed as an int)
|
||||
well_known_host, well_known_port = well_known_server, -1
|
||||
else:
|
||||
well_known_host, well_known_port = well_known_server, -1
|
||||
|
||||
server_list = yield self._srv_resolver.resolve_service(b"_matrix._tcp." + host)
|
||||
new_uri = URI(
|
||||
scheme=parsed_uri.scheme,
|
||||
netloc=well_known_server,
|
||||
host=well_known_host,
|
||||
port=well_known_port,
|
||||
path=parsed_uri.path,
|
||||
params=parsed_uri.params,
|
||||
query=parsed_uri.query,
|
||||
fragment=parsed_uri.fragment,
|
||||
)
|
||||
|
||||
if server_list:
|
||||
return server_list
|
||||
res = yield self._route_matrix_uri(new_uri, lookup_well_known=False)
|
||||
return res
|
||||
|
||||
# No SRV records, so we fallback to host and 8448
|
||||
return [Server(host, 8448)]
|
||||
# try a SRV lookup
|
||||
service_name = b"_matrix._tcp.%s" % (parsed_uri.host,)
|
||||
server_list = yield self._srv_resolver.resolve_service(service_name)
|
||||
|
||||
if not server_list:
|
||||
target_host = parsed_uri.host
|
||||
port = 8448
|
||||
logger.debug(
|
||||
"No SRV record for %s, using %s:%i",
|
||||
parsed_uri.host.decode("ascii"),
|
||||
target_host.decode("ascii"),
|
||||
port,
|
||||
)
|
||||
else:
|
||||
target_host, port = pick_server_from_list(server_list)
|
||||
logger.debug(
|
||||
"Picked %s:%i from SRV records for %s",
|
||||
target_host.decode("ascii"),
|
||||
port,
|
||||
parsed_uri.host.decode("ascii"),
|
||||
)
|
||||
|
||||
return _RoutingResult(
|
||||
host_header=parsed_uri.netloc,
|
||||
tls_server_name=parsed_uri.host,
|
||||
target_host=target_host,
|
||||
target_port=port,
|
||||
)
|
||||
|
||||
|
||||
def _is_ip_literal(host):
|
||||
"""Test if the given host name is either an IPv4 or IPv6 literal.
|
||||
@implementer(IStreamClientEndpoint)
|
||||
class LoggingHostnameEndpoint(object):
|
||||
"""A wrapper for HostnameEndpint which logs when it connects"""
|
||||
|
||||
Args:
|
||||
host (bytes)
|
||||
def __init__(self, reactor, host, port, *args, **kwargs):
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.ep = HostnameEndpoint(reactor, host, port, *args, **kwargs)
|
||||
|
||||
Returns:
|
||||
bool
|
||||
def connect(self, protocol_factory):
|
||||
logger.info("Connecting to %s:%i", self.host.decode("ascii"), self.port)
|
||||
return self.ep.connect(protocol_factory)
|
||||
|
||||
|
||||
@attr.s
|
||||
class _RoutingResult(object):
|
||||
"""The result returned by `_route_matrix_uri`.
|
||||
|
||||
Contains the parameters needed to direct a federation connection to a particular
|
||||
server.
|
||||
|
||||
Where a SRV record points to several servers, this object contains a single server
|
||||
chosen from the list.
|
||||
"""
|
||||
|
||||
host = host.decode("ascii")
|
||||
host_header = attr.ib()
|
||||
"""
|
||||
The value we should assign to the Host header (host:port from the matrix
|
||||
URI, or .well-known).
|
||||
|
||||
try:
|
||||
IPAddress(host)
|
||||
return True
|
||||
except AddrFormatError:
|
||||
return False
|
||||
:type: bytes
|
||||
"""
|
||||
|
||||
tls_server_name = attr.ib()
|
||||
"""
|
||||
The server name we should set in the SNI (typically host, without port, from the
|
||||
matrix URI or .well-known)
|
||||
|
||||
:type: bytes
|
||||
"""
|
||||
|
||||
target_host = attr.ib()
|
||||
"""
|
||||
The hostname (or IP literal) we should route the TCP connection to (the target of the
|
||||
SRV record, or the hostname from the URL/.well-known)
|
||||
|
||||
:type: bytes
|
||||
"""
|
||||
|
||||
target_port = attr.ib()
|
||||
"""
|
||||
The port we should route the TCP connection to (the target of the SRV record, or
|
||||
the port from the URL/.well-known, or 8448)
|
||||
|
||||
:type: int
|
||||
"""
|
||||
|
||||
@@ -32,7 +32,7 @@ logger = logging.getLogger(__name__)
|
||||
SERVER_CACHE = {}
|
||||
|
||||
|
||||
@attr.s(slots=True, frozen=True)
|
||||
@attr.s
|
||||
class Server(object):
|
||||
"""
|
||||
Our record of an individual server which can be tried to reach a destination.
|
||||
@@ -53,47 +53,34 @@ class Server(object):
|
||||
expires = attr.ib(default=0)
|
||||
|
||||
|
||||
def _sort_server_list(server_list):
|
||||
"""Given a list of SRV records sort them into priority order and shuffle
|
||||
each priority with the given weight.
|
||||
def pick_server_from_list(server_list):
|
||||
"""Randomly choose a server from the server list
|
||||
|
||||
Args:
|
||||
server_list (list[Server]): list of candidate servers
|
||||
|
||||
Returns:
|
||||
Tuple[bytes, int]: (host, port) pair for the chosen server
|
||||
"""
|
||||
priority_map = {}
|
||||
if not server_list:
|
||||
raise RuntimeError("pick_server_from_list called with empty list")
|
||||
|
||||
for server in server_list:
|
||||
priority_map.setdefault(server.priority, []).append(server)
|
||||
# TODO: currently we only use the lowest-priority servers. We should maintain a
|
||||
# cache of servers known to be "down" and filter them out
|
||||
|
||||
results = []
|
||||
for priority in sorted(priority_map):
|
||||
servers = priority_map[priority]
|
||||
min_priority = min(s.priority for s in server_list)
|
||||
eligible_servers = list(s for s in server_list if s.priority == min_priority)
|
||||
total_weight = sum(s.weight for s in eligible_servers)
|
||||
target_weight = random.randint(0, total_weight)
|
||||
|
||||
# This algorithms roughly follows the algorithm described in RFC2782,
|
||||
# changed to remove an off-by-one error.
|
||||
#
|
||||
# N.B. Weights can be zero, which means that they should be picked
|
||||
# rarely.
|
||||
for s in eligible_servers:
|
||||
target_weight -= s.weight
|
||||
|
||||
total_weight = sum(s.weight for s in servers)
|
||||
if target_weight <= 0:
|
||||
return s.host, s.port
|
||||
|
||||
# Total weight can become zero if there are only zero weight servers
|
||||
# left, which we handle by just shuffling and appending to the results.
|
||||
while servers and total_weight:
|
||||
target_weight = random.randint(1, total_weight)
|
||||
|
||||
for s in servers:
|
||||
target_weight -= s.weight
|
||||
|
||||
if target_weight <= 0:
|
||||
break
|
||||
|
||||
results.append(s)
|
||||
servers.remove(s)
|
||||
total_weight -= s.weight
|
||||
|
||||
if servers:
|
||||
random.shuffle(servers)
|
||||
results.extend(servers)
|
||||
|
||||
return results
|
||||
# this should be impossible.
|
||||
raise RuntimeError("pick_server_from_list got to end of eligible server list.")
|
||||
|
||||
|
||||
class SrvResolver(object):
|
||||
@@ -133,7 +120,7 @@ class SrvResolver(object):
|
||||
if cache_entry:
|
||||
if all(s.expires > now for s in cache_entry):
|
||||
servers = list(cache_entry)
|
||||
return _sort_server_list(servers)
|
||||
return servers
|
||||
|
||||
try:
|
||||
answers, _, _ = yield make_deferred_yieldable(
|
||||
@@ -182,4 +169,4 @@ class SrvResolver(object):
|
||||
)
|
||||
|
||||
self._cache[service_name] = list(servers)
|
||||
return _sort_server_list(servers)
|
||||
return servers
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user