1
0

Compare commits

..

3 Commits

Author SHA1 Message Date
Richard van der Hoff
42044c1a5c prepare v0.33.2.1 2018-09-05 17:27:23 +01:00
Richard van der Hoff
a5a0bf5cf7 Fix origin handling for pushed transactions
Use the actual origin for push transactions, rather than whatever the remote
server claimed.
2018-09-05 17:06:57 +01:00
Richard van der Hoff
5bf8bc79eb Check that signatures on events are valid
We should check that both the sender's server, and the server which created the
event_id (which may be different from whatever the remote server has told us
the origin is), have signed the event.
2018-09-05 17:05:08 +01:00
287 changed files with 5267 additions and 9267 deletions

View File

@@ -1,114 +0,0 @@
version: 2
jobs:
sytestpy2:
machine: true
steps:
- checkout
- run: docker pull matrixdotorg/sytest-synapsepy2
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs matrixdotorg/sytest-synapsepy2
- store_artifacts:
path: ~/project/logs
destination: logs
- store_test_results:
path: logs
sytestpy2postgres:
machine: true
steps:
- checkout
- run: docker pull matrixdotorg/sytest-synapsepy2
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs -e POSTGRES=1 matrixdotorg/sytest-synapsepy2
- store_artifacts:
path: ~/project/logs
destination: logs
- store_test_results:
path: logs
sytestpy2merged:
machine: true
steps:
- checkout
- run: bash .circleci/merge_base_branch.sh
- run: docker pull matrixdotorg/sytest-synapsepy2
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs matrixdotorg/sytest-synapsepy2
- store_artifacts:
path: ~/project/logs
destination: logs
- store_test_results:
path: logs
sytestpy2postgresmerged:
machine: true
steps:
- checkout
- run: bash .circleci/merge_base_branch.sh
- run: docker pull matrixdotorg/sytest-synapsepy2
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs -e POSTGRES=1 matrixdotorg/sytest-synapsepy2
- store_artifacts:
path: ~/project/logs
destination: logs
- store_test_results:
path: logs
sytestpy3:
machine: true
steps:
- checkout
- run: docker pull matrixdotorg/sytest-synapsepy3
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs hawkowl/sytestpy3
- store_artifacts:
path: ~/project/logs
destination: logs
- store_test_results:
path: logs
sytestpy3postgres:
machine: true
steps:
- checkout
- run: docker pull matrixdotorg/sytest-synapsepy3
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs -e POSTGRES=1 matrixdotorg/sytest-synapsepy3
- store_artifacts:
path: ~/project/logs
destination: logs
- store_test_results:
path: logs
sytestpy3merged:
machine: true
steps:
- checkout
- run: bash .circleci/merge_base_branch.sh
- run: docker pull matrixdotorg/sytest-synapsepy3
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs hawkowl/sytestpy3
- store_artifacts:
path: ~/project/logs
destination: logs
- store_test_results:
path: logs
sytestpy3postgresmerged:
machine: true
steps:
- checkout
- run: bash .circleci/merge_base_branch.sh
- run: docker pull matrixdotorg/sytest-synapsepy3
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs -e POSTGRES=1 matrixdotorg/sytest-synapsepy3
- store_artifacts:
path: ~/project/logs
destination: logs
- store_test_results:
path: logs
workflows:
version: 2
build:
jobs:
- sytestpy2
- sytestpy2postgres
- sytestpy2merged:
filters:
branches:
ignore: /develop|master/
- sytestpy2postgresmerged:
filters:
branches:
ignore: /develop|master/
# Currently broken while the Python 3 port is incomplete
# - sytestpy3
# - sytestpy3postgres

View File

@@ -1,31 +0,0 @@
#!/usr/bin/env bash
set -e
# CircleCI doesn't give CIRCLE_PR_NUMBER in the environment for non-forked PRs. Wonderful.
# In this case, we just need to do some ~shell magic~ to strip it out of the PULL_REQUEST URL.
echo 'export CIRCLE_PR_NUMBER="${CIRCLE_PR_NUMBER:-${CIRCLE_PULL_REQUEST##*/}}"' >> $BASH_ENV
source $BASH_ENV
if [[ -z "${CIRCLE_PR_NUMBER}" ]]
then
echo "Can't figure out what the PR number is!"
exit 1
fi
# Get the reference, using the GitHub API
GITBASE=`curl -q https://api.github.com/repos/matrix-org/synapse/pulls/${CIRCLE_PR_NUMBER} | jq -r '.base.ref'`
# Show what we are before
git show -s
# Set up username so it can do a merge
git config --global user.email bot@matrix.org
git config --global user.name "A robot"
# Fetch and merge. If it doesn't work, it will raise due to set -e.
git fetch -u origin $GITBASE
git merge --no-edit origin/$GITBASE
# Show what we are after.
git show -s

View File

@@ -3,5 +3,3 @@ Dockerfile
.gitignore
demo/etc
tox.ini
.git/*
.tox/*

1
.gitignore vendored
View File

@@ -44,7 +44,6 @@ media_store/
build/
venv/
venv*/
*venv/
localhost-800*/
static/client/register/register_config.js

View File

@@ -20,11 +20,6 @@ matrix:
- python: 2.7
env: TOX_ENV=py27
- python: 2.7
env: TOX_ENV=py27-postgres TRIAL_FLAGS="-j 4"
services:
- postgresql
- python: 3.6
env: TOX_ENV=py36

View File

@@ -1,19 +1,4 @@
Synapse 0.33.4 (2018-09-07)
===========================
Internal Changes
----------------
- Unignore synctl in .dockerignore to fix docker builds ([\#3802](https://github.com/matrix-org/synapse/issues/3802))
Synapse 0.33.4rc2 (2018-09-06)
==============================
Pull in security fixes from v0.33.3.1
Synapse 0.33.3.1 (2018-09-06)
Synapse 0.33.2.1 (2018-09-06)
=============================
SECURITY FIXES
@@ -22,138 +7,6 @@ SECURITY FIXES
- Fix an issue where event signatures were not always correctly validated ([\#3796](https://github.com/matrix-org/synapse/issues/3796))
- Fix an issue where server_acls could be circumvented for incoming events ([\#3796](https://github.com/matrix-org/synapse/issues/3796))
Internal Changes
----------------
- Unignore synctl in .dockerignore to fix docker builds ([\#3802](https://github.com/matrix-org/synapse/issues/3802))
Synapse 0.33.4rc1 (2018-09-04)
==============================
Features
--------
- Support profile API endpoints on workers ([\#3659](https://github.com/matrix-org/synapse/issues/3659))
- Server notices for resource limit blocking ([\#3680](https://github.com/matrix-org/synapse/issues/3680))
- Allow guests to use /rooms/:roomId/event/:eventId ([\#3724](https://github.com/matrix-org/synapse/issues/3724))
- Add mau_trial_days config param, so that users only get counted as MAU after N days. ([\#3749](https://github.com/matrix-org/synapse/issues/3749))
- Require twisted 17.1 or later (fixes [#3741](https://github.com/matrix-org/synapse/issues/3741)). ([\#3751](https://github.com/matrix-org/synapse/issues/3751))
Bugfixes
--------
- Fix error collecting prometheus metrics when run on dedicated thread due to threading concurrency issues ([\#3722](https://github.com/matrix-org/synapse/issues/3722))
- Fix bug where we resent "limit exceeded" server notices repeatedly ([\#3747](https://github.com/matrix-org/synapse/issues/3747))
- Fix bug where we broke sync when using limit_usage_by_mau but hadn't configured server notices ([\#3753](https://github.com/matrix-org/synapse/issues/3753))
- Fix 'federation_domain_whitelist' such that an empty list correctly blocks all outbound federation traffic ([\#3754](https://github.com/matrix-org/synapse/issues/3754))
- Fix tagging of server notice rooms ([\#3755](https://github.com/matrix-org/synapse/issues/3755), [\#3756](https://github.com/matrix-org/synapse/issues/3756))
- Fix 'admin_uri' config variable and error parameter to be 'admin_contact' to match the spec. ([\#3758](https://github.com/matrix-org/synapse/issues/3758))
- Don't return non-LL-member state in incremental sync state blocks ([\#3760](https://github.com/matrix-org/synapse/issues/3760))
- Fix bug in sending presence over federation ([\#3768](https://github.com/matrix-org/synapse/issues/3768))
- Fix bug where preserved threepid user comes to sign up and server is mau blocked ([\#3777](https://github.com/matrix-org/synapse/issues/3777))
Internal Changes
----------------
- Removed the link to the unmaintained matrix-synapse-auto-deploy project from the readme. ([\#3378](https://github.com/matrix-org/synapse/issues/3378))
- Refactor state module to support multiple room versions ([\#3673](https://github.com/matrix-org/synapse/issues/3673))
- The synapse.storage module has been ported to Python 3. ([\#3725](https://github.com/matrix-org/synapse/issues/3725))
- Split the state_group_cache into member and non-member state events (and so speed up LL /sync) ([\#3726](https://github.com/matrix-org/synapse/issues/3726))
- Log failure to authenticate remote servers as warnings (without stack traces) ([\#3727](https://github.com/matrix-org/synapse/issues/3727))
- The CONTRIBUTING guidelines have been updated to mention our use of Markdown and that .misc files have content. ([\#3730](https://github.com/matrix-org/synapse/issues/3730))
- Reference the need for an HTTP replication port when using the federation_reader worker ([\#3734](https://github.com/matrix-org/synapse/issues/3734))
- Fix minor spelling error in federation client documentation. ([\#3735](https://github.com/matrix-org/synapse/issues/3735))
- Remove redundant state resolution function ([\#3737](https://github.com/matrix-org/synapse/issues/3737))
- The test suite now passes on PostgreSQL. ([\#3740](https://github.com/matrix-org/synapse/issues/3740))
- Fix MAU cache invalidation due to missing yield ([\#3746](https://github.com/matrix-org/synapse/issues/3746))
- Make sure that we close db connections opened during init ([\#3764](https://github.com/matrix-org/synapse/issues/3764))
Synapse 0.33.3 (2018-08-22)
===========================
Bugfixes
--------
- Fix bug introduced in v0.33.3rc1 which made the ToS give a 500 error ([\#3732](https://github.com/matrix-org/synapse/issues/3732))
Synapse 0.33.3rc2 (2018-08-21)
==============================
Bugfixes
--------
- Fix bug in v0.33.3rc1 which caused infinite loops and OOMs ([\#3723](https://github.com/matrix-org/synapse/issues/3723))
Synapse 0.33.3rc1 (2018-08-21)
==============================
Features
--------
- Add support for the SNI extension to federation TLS connections. Thanks to @vojeroen! ([\#3439](https://github.com/matrix-org/synapse/issues/3439))
- Add /_media/r0/config ([\#3184](https://github.com/matrix-org/synapse/issues/3184))
- speed up /members API and add `at` and `membership` params as per MSC1227 ([\#3568](https://github.com/matrix-org/synapse/issues/3568))
- implement `summary` block in /sync response as per MSC688 ([\#3574](https://github.com/matrix-org/synapse/issues/3574))
- Add lazy-loading support to /messages as per MSC1227 ([\#3589](https://github.com/matrix-org/synapse/issues/3589))
- Add ability to limit number of monthly active users on the server ([\#3633](https://github.com/matrix-org/synapse/issues/3633))
- Support more federation endpoints on workers ([\#3653](https://github.com/matrix-org/synapse/issues/3653))
- Basic support for room versioning ([\#3654](https://github.com/matrix-org/synapse/issues/3654))
- Ability to disable client/server Synapse via conf toggle ([\#3655](https://github.com/matrix-org/synapse/issues/3655))
- Ability to whitelist specific threepids against monthly active user limiting ([\#3662](https://github.com/matrix-org/synapse/issues/3662))
- Add some metrics for the appservice and federation event sending loops ([\#3664](https://github.com/matrix-org/synapse/issues/3664))
- Where server is disabled, block ability for locked out users to read new messages ([\#3670](https://github.com/matrix-org/synapse/issues/3670))
- set admin uri via config, to be used in error messages where the user should contact the administrator ([\#3687](https://github.com/matrix-org/synapse/issues/3687))
- Synapse's presence functionality can now be disabled with the "use_presence" configuration option. ([\#3694](https://github.com/matrix-org/synapse/issues/3694))
- For resource limit blocked users, prevent writing into rooms ([\#3708](https://github.com/matrix-org/synapse/issues/3708))
Bugfixes
--------
- Fix occasional glitches in the synapse_event_persisted_position metric ([\#3658](https://github.com/matrix-org/synapse/issues/3658))
- Fix bug on deleting 3pid when using identity servers that don't support unbind API ([\#3661](https://github.com/matrix-org/synapse/issues/3661))
- Make the tests pass on Twisted < 18.7.0 ([\#3676](https://github.com/matrix-org/synapse/issues/3676))
- Dont ship recaptcha_ajax.js, use it directly from Google ([\#3677](https://github.com/matrix-org/synapse/issues/3677))
- Fixes test_reap_monthly_active_users so it passes under postgres ([\#3681](https://github.com/matrix-org/synapse/issues/3681))
- Fix mau blocking calulation bug on login ([\#3689](https://github.com/matrix-org/synapse/issues/3689))
- Fix missing yield in synapse.storage.monthly_active_users.initialise_reserved_users ([\#3692](https://github.com/matrix-org/synapse/issues/3692))
- Improve HTTP request logging to include all requests ([\#3700](https://github.com/matrix-org/synapse/issues/3700))
- Avoid timing out requests while we are streaming back the response ([\#3701](https://github.com/matrix-org/synapse/issues/3701))
- Support more federation endpoints on workers ([\#3705](https://github.com/matrix-org/synapse/issues/3705), [\#3713](https://github.com/matrix-org/synapse/issues/3713))
- Fix "Starting db txn 'get_all_updated_receipts' from sentinel context" warning ([\#3710](https://github.com/matrix-org/synapse/issues/3710))
- Fix bug where `state_cache` cache factor ignored environment variables ([\#3719](https://github.com/matrix-org/synapse/issues/3719))
Deprecations and Removals
-------------------------
- The Shared-Secret registration method of the legacy v1/register REST endpoint has been removed. For a replacement, please see [the admin/register API documentation](https://github.com/matrix-org/synapse/blob/master/docs/admin_api/register_api.rst). ([\#3703](https://github.com/matrix-org/synapse/issues/3703))
Internal Changes
----------------
- The test suite now can run under PostgreSQL. ([\#3423](https://github.com/matrix-org/synapse/issues/3423))
- Refactor HTTP replication endpoints to reduce code duplication ([\#3632](https://github.com/matrix-org/synapse/issues/3632))
- Tests now correctly execute on Python 3. ([\#3647](https://github.com/matrix-org/synapse/issues/3647))
- Sytests can now be run inside a Docker container. ([\#3660](https://github.com/matrix-org/synapse/issues/3660))
- Port over enough to Python 3 to allow the sytests to start. ([\#3668](https://github.com/matrix-org/synapse/issues/3668))
- Update docker base image from alpine 3.7 to 3.8. ([\#3669](https://github.com/matrix-org/synapse/issues/3669))
- Rename synapse.util.async to synapse.util.async_helpers to mitigate async becoming a keyword on Python 3.7. ([\#3678](https://github.com/matrix-org/synapse/issues/3678))
- Synapse's tests are now formatted with the black autoformatter. ([\#3679](https://github.com/matrix-org/synapse/issues/3679))
- Implemented a new testing base class to reduce test boilerplate. ([\#3684](https://github.com/matrix-org/synapse/issues/3684))
- Rename MAU prometheus metrics ([\#3690](https://github.com/matrix-org/synapse/issues/3690))
- add new error type ResourceLimit ([\#3707](https://github.com/matrix-org/synapse/issues/3707))
- Logcontexts for replication command handlers ([\#3709](https://github.com/matrix-org/synapse/issues/3709))
- Update admin register API documentation to reference a real user ID. ([\#3712](https://github.com/matrix-org/synapse/issues/3712))
Synapse 0.33.2 (2018-08-09)
===========================
@@ -180,7 +33,7 @@ Features
Bugfixes
--------
- Make /directory/list API return 404 for room not found instead of 400. Thanks to @fuzzmz! ([\#3620](https://github.com/matrix-org/synapse/issues/3620))
- Make /directory/list API return 404 for room not found instead of 400 ([\#2952](https://github.com/matrix-org/synapse/issues/2952))
- Default inviter_display_name to mxid for email invites ([\#3391](https://github.com/matrix-org/synapse/issues/3391))
- Don't generate TURN credentials if no TURN config options are set ([\#3514](https://github.com/matrix-org/synapse/issues/3514))
- Correctly announce deleted devices over federation ([\#3520](https://github.com/matrix-org/synapse/issues/3520))

View File

@@ -30,11 +30,11 @@ use github's pull request workflow to review the contribution, and either ask
you to make any refinements needed or merge it and make them ourselves. The
changes will then land on master when we next do a release.
We use `Jenkins <http://matrix.org/jenkins>`_ and
We use `Jenkins <http://matrix.org/jenkins>`_ and
`Travis <https://travis-ci.org/matrix-org/synapse>`_ for continuous
integration. All pull requests to synapse get automatically tested by Travis;
the Jenkins builds require an adminstrator to start them. If your change
breaks the build, this will be shown in github, so please keep an eye on the
integration. All pull requests to synapse get automatically tested by Travis;
the Jenkins builds require an adminstrator to start them. If your change
breaks the build, this will be shown in github, so please keep an eye on the
pull request for feedback.
Code style
@@ -56,18 +56,17 @@ entry. These are managed by Towncrier
(https://github.com/hawkowl/towncrier).
To create a changelog entry, make a new file in the ``changelog.d``
file named in the format of ``PRnumber.type``. The type can be
file named in the format of ``issuenumberOrPR.type``. The type can be
one of ``feature``, ``bugfix``, ``removal`` (also used for
deprecations), or ``misc`` (for internal-only changes). The content of
the file is your changelog entry, which can contain Markdown
formatting. Adding credits to the changelog is encouraged, we value
your contributions and would like to have you shouted out in the
release notes!
the file is your changelog entry, which can contain RestructuredText
formatting. A note of contributors is welcomed in changelogs for
non-misc changes (the content of misc changes is not displayed).
For example, a fix in PR #1234 would have its changelog entry in
``changelog.d/1234.bugfix``, and contain content like "The security levels of
Florbs are now validated when recieved over federation. Contributed by Jane
Matrix".
For example, a fix for a bug reported in #1234 would have its
changelog entry in ``changelog.d/1234.bugfix``, and contain content
like "The security levels of Florbs are now validated when
recieved over federation. Contributed by Jane Matrix".
Attribution
~~~~~~~~~~~
@@ -126,7 +125,7 @@ the contribution or otherwise have the right to contribute it to Matrix::
personal information I submit with it, including my sign-off) is
maintained indefinitely and may be redistributed consistent with
this project or the open source license(s) involved.
If you agree to this for your contribution, then all that's needed is to
include the line in your commit or pull request comment::

View File

@@ -36,4 +36,3 @@ recursive-include changelog.d *
prune .github
prune demo/etc
prune docker
prune .circleci

View File

@@ -167,6 +167,11 @@ Alternatively, Andreas Peters (previously Silvio Fricke) has contributed a
Dockerfile to automate a synapse server in a single Docker image, at
https://hub.docker.com/r/avhost/docker-matrix/tags/
Also, Martin Giess has created an auto-deployment process with vagrant/ansible,
tested with VirtualBox/AWS/DigitalOcean - see
https://github.com/EMnify/matrix-synapse-auto-deploy
for details.
Configuring synapse
-------------------
@@ -742,18 +747,6 @@ so an example nginx configuration might look like::
}
}
and an example apache configuration may look like::
<VirtualHost *:443>
SSLEngine on
ServerName matrix.example.com;
<Location /_matrix>
ProxyPass http://127.0.0.1:8008/_matrix nocanon
ProxyPassReverse http://127.0.0.1:8008/_matrix
</Location>
</VirtualHost>
You will also want to set ``bind_addresses: ['127.0.0.1']`` and ``x_forwarded: true``
for port 8008 in ``homeserver.yaml`` to ensure that client IP addresses are
recorded correctly.

View File

@@ -1 +0,0 @@
CircleCI tests now run on the potential merge of a PR.

View File

@@ -1 +0,0 @@
http/ is now ported to Python 3.

View File

@@ -1 +0,0 @@
Remove connection ID for replication prometheus metrics, as it creates a large number of new series.

View File

@@ -1 +0,0 @@
Improve human readable error messages for threepid registration/account update

View File

@@ -1 +0,0 @@
Implement `event_format` filter param in `/sync`

View File

@@ -1 +0,0 @@
Make /sync slightly faster by avoiding needless copies

View File

@@ -1 +0,0 @@
guest users should not be part of mau total

View File

@@ -1 +0,0 @@
handlers/ is now ported to Python 3.

View File

@@ -1 +0,0 @@
Bump dependency on pyopenssl 16.x, to avoid incompatibility with recent Twisted.

View File

@@ -1 +0,0 @@
Limit the number of PDUs/EDUs per federation transaction

View File

@@ -1 +0,0 @@
Only start postgres instance for postgres tests on Travis CI

View File

@@ -1 +0,0 @@
tests/ is now ported to Python 3.

View File

@@ -1 +0,0 @@
Fix existing room tags not coming down sync when joining a room

View File

@@ -1 +0,0 @@
crypto/ is now ported to Python 3.

View File

@@ -1 +0,0 @@
rest/ is now ported to Python 3.

View File

@@ -1 +0,0 @@
Fix jwt import check

View File

@@ -1 +0,0 @@
add some logging for the keyring queue

View File

@@ -1 +0,0 @@
speed up lazy loading by 2-3x

View File

@@ -1 +0,0 @@
Improved Dockerfile to remove build requirements after building reducing the image size.

View File

@@ -1 +0,0 @@
fix VOIP crashes under Python 3 (#3821)

View File

@@ -1 +0,0 @@
Disable lazy loading for incremental syncs for now

View File

@@ -1 +0,0 @@
Fix manhole so that it works with latest openssh clients

View File

@@ -1 +0,0 @@
Fix outbound requests occasionally wedging, which can result in federation breaking between servers.

View File

@@ -1 +0,0 @@
Add synapse_admin_mau:registered_reserved_users metric to expose number of real reaserved users

View File

@@ -1 +0,0 @@
federation/ is now ported to Python 3.

View File

@@ -1 +0,0 @@
Show heroes if room name/canonical alias has been deleted

View File

@@ -1 +0,0 @@
Log when we retry outbound requests

View File

@@ -1 +0,0 @@
Removed some excess logging messages.

View File

@@ -1 +0,0 @@
Speed up purge history for rooms that have been previously purged

View File

@@ -1 +0,0 @@
Refactor some HTTP timeout code.

View File

@@ -1 +0,0 @@
Fix running merged builds on CircleCI

View File

@@ -1 +0,0 @@
Fix handling of redacted events from federation

View File

@@ -54,7 +54,7 @@
"gnetId": null,
"graphTooltip": 0,
"id": null,
"iteration": 1533598785368,
"iteration": 1533026624326,
"links": [
{
"asDropdown": true,
@@ -4629,7 +4629,7 @@
"h": 9,
"w": 12,
"x": 0,
"y": 29
"y": 11
},
"id": 67,
"legend": {
@@ -4655,11 +4655,11 @@
"steppedLine": false,
"targets": [
{
"expr": " synapse_event_persisted_position{instance=\"$instance\",job=\"synapse\"} - ignoring(index, job, name) group_right() synapse_event_processing_positions{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}",
"expr": " synapse_event_persisted_position{instance=\"$instance\"} - ignoring(index, job, name) group_right(instance) synapse_event_processing_positions{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}",
"format": "time_series",
"interval": "",
"intervalFactor": 1,
"legendFormat": "{{job}}-{{index}} ",
"legendFormat": "{{job}}-{{index}}",
"refId": "A"
}
],
@@ -4697,11 +4697,7 @@
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
]
},
{
"aliasColors": {},
@@ -4714,7 +4710,7 @@
"h": 9,
"w": 12,
"x": 12,
"y": 29
"y": 11
},
"id": 71,
"legend": {
@@ -4782,11 +4778,7 @@
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
]
}
],
"title": "Event processing loop positions",
@@ -4965,5 +4957,5 @@
"timezone": "",
"title": "Synapse",
"uid": "000000012",
"version": 127
"version": 125
}

View File

@@ -1,8 +1,6 @@
FROM docker.io/python:2-alpine3.8
FROM docker.io/python:2-alpine3.7
COPY . /synapse
RUN apk add --no-cache --virtual .build_deps \
RUN apk add --no-cache --virtual .nacl_deps \
build-base \
libffi-dev \
libjpeg-turbo-dev \
@@ -10,16 +8,13 @@ RUN apk add --no-cache --virtual .build_deps \
libxslt-dev \
linux-headers \
postgresql-dev \
zlib-dev \
&& cd /synapse \
&& apk add --no-cache --virtual .runtime_deps \
libffi \
libjpeg-turbo \
libressl \
libxslt \
libpq \
zlib \
su-exec \
su-exec \
zlib-dev
COPY . /synapse
# A wheel cache may be provided in ./cache for faster build
RUN cd /synapse \
&& pip install --upgrade \
lxml \
pip \
@@ -31,9 +26,8 @@ RUN apk add --no-cache --virtual .build_deps \
&& rm -rf \
setup.cfg \
setup.py \
synapse \
&& apk del .build_deps
synapse
VOLUME ["/data"]
EXPOSE 8008/tcp 8448/tcp

View File

@@ -33,7 +33,7 @@ As an example::
< {
"access_token": "token_here",
"user_id": "@pepper_roni:localhost",
"user_id": "@pepper_roni@test",
"home_server": "test",
"device_id": "device_id_here"
}

View File

@@ -74,7 +74,7 @@ replication endpoints that it's talking to on the main synapse process.
``worker_replication_port`` should point to the TCP replication listener port and
``worker_replication_http_port`` should point to the HTTP replication port.
Currently, the ``event_creator`` and ``federation_reader`` workers require specifying
Currently, only the ``event_creator`` worker requires specifying
``worker_replication_http_port``.
For instance::
@@ -173,23 +173,10 @@ endpoints matching the following regular expressions::
^/_matrix/federation/v1/backfill/
^/_matrix/federation/v1/get_missing_events/
^/_matrix/federation/v1/publicRooms
^/_matrix/federation/v1/query/
^/_matrix/federation/v1/make_join/
^/_matrix/federation/v1/make_leave/
^/_matrix/federation/v1/send_join/
^/_matrix/federation/v1/send_leave/
^/_matrix/federation/v1/invite/
^/_matrix/federation/v1/query_auth/
^/_matrix/federation/v1/event_auth/
^/_matrix/federation/v1/exchange_third_party_invite/
^/_matrix/federation/v1/send/
The above endpoints should all be routed to the federation_reader worker by the
reverse-proxy configuration.
The `^/_matrix/federation/v1/send/` endpoint must only be handled by a single
instance.
``synapse.app.federation_sender``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -241,14 +228,6 @@ regular expressions::
^/_matrix/client/(api/v1|r0|unstable)/keys/upload
If ``use_presence`` is False in the homeserver config, it can also handle REST
endpoints matching the following regular expressions::
^/_matrix/client/(api/v1|r0|unstable)/presence/[^/]+/status
This "stub" presence handler will pass through ``GET`` request but make the
``PUT`` effectively a no-op.
It will proxy any requests it cannot handle to the main synapse instance. It
must therefore be configured with the location of the main instance, via
the ``worker_main_http_uri`` setting in the frontend_proxy worker configuration
@@ -265,7 +244,6 @@ Handles some event creation. It can handle REST endpoints matching::
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/send
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$
^/_matrix/client/(api/v1|r0|unstable)/join/
^/_matrix/client/(api/v1|r0|unstable)/profile/
It will create events locally and then send them on to the main synapse
instance to be persisted and handled.

View File

@@ -31,5 +31,5 @@ $TOX_BIN/pip install 'setuptools>=18.5'
$TOX_BIN/pip install 'pip>=10'
{ python synapse/python_dependencies.py
echo lxml
echo lxml psycopg2
} | xargs $TOX_BIN/pip install

View File

@@ -17,14 +17,13 @@ ignore =
[pep8]
max-line-length = 90
# W503 requires that binary operators be at the end, not start, of lines. Erik
# doesn't like it. E203 is contrary to PEP8. E731 is silly.
ignore = W503,E203,E731
# doesn't like it. E203 is contrary to PEP8.
ignore = W503,E203
[flake8]
# note that flake8 inherits the "ignore" settings from "pep8" (because it uses
# pep8 to do those checks), but not the "max-line-length" setting
max-line-length = 90
ignore=W503,E203,E731
[isort]
line_length = 89

View File

@@ -17,14 +17,4 @@
""" This is a reference implementation of a Matrix home server.
"""
try:
from twisted.internet import protocol
from twisted.internet.protocol import Factory
from twisted.names.dns import DNSDatagramProtocol
protocol.Factory.noisy = False
Factory.noisy = False
DNSDatagramProtocol.noisy = False
except ImportError:
pass
__version__ = "0.33.4"
__version__ = "0.33.2.1"

View File

@@ -25,8 +25,7 @@ from twisted.internet import defer
import synapse.types
from synapse import event_auth
from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.api.errors import AuthError, Codes, ResourceLimitError
from synapse.config.server import is_threepid_reserved
from synapse.api.errors import AuthError, Codes
from synapse.types import UserID
from synapse.util.caches import CACHE_SIZE_FACTOR, register_cache
from synapse.util.caches.lrucache import LruCache
@@ -212,9 +211,9 @@ class Auth(object):
user_agent = request.requestHeaders.getRawHeaders(
b"User-Agent",
default=[b""]
)[0].decode('ascii', 'surrogateescape')
)[0]
if user and access_token and ip_addr:
yield self.store.insert_client_ip(
self.store.insert_client_ip(
user_id=user.to_string(),
access_token=access_token,
ip=ip_addr,
@@ -683,7 +682,7 @@ class Auth(object):
Returns:
bool: False if no access_token was given, True otherwise.
"""
query_params = request.args.get(b"access_token")
query_params = request.args.get("access_token")
auth_headers = request.requestHeaders.getRawHeaders(b"Authorization")
return bool(query_params) or bool(auth_headers)
@@ -699,7 +698,7 @@ class Auth(object):
401 since some of the old clients depended on auth errors returning
403.
Returns:
unicode: The access_token
str: The access_token
Raises:
AuthError: If there isn't an access_token in the request.
"""
@@ -721,9 +720,9 @@ class Auth(object):
"Too many Authorization headers.",
errcode=Codes.MISSING_TOKEN,
)
parts = auth_headers[0].split(b" ")
if parts[0] == b"Bearer" and len(parts) == 2:
return parts[1].decode('ascii')
parts = auth_headers[0].split(" ")
if parts[0] == "Bearer" and len(parts) == 2:
return parts[1]
else:
raise AuthError(
token_not_found_http_status,
@@ -739,7 +738,7 @@ class Auth(object):
errcode=Codes.MISSING_TOKEN
)
return query_params[0].decode('ascii')
return query_params[0]
@defer.inlineCallbacks
def check_in_room_or_world_readable(self, room_id, user_id):
@@ -774,58 +773,3 @@ class Auth(object):
raise AuthError(
403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
)
@defer.inlineCallbacks
def check_auth_blocking(self, user_id=None, threepid=None):
"""Checks if the user should be rejected for some external reason,
such as monthly active user limiting or global disable flag
Args:
user_id(str|None): If present, checks for presence against existing
MAU cohort
threepid(dict|None): If present, checks for presence against configured
reserved threepid. Used in cases where the user is trying register
with a MAU blocked server, normally they would be rejected but their
threepid is on the reserved list. user_id and
threepid should never be set at the same time.
"""
# Never fail an auth check for the server notices users
# This can be a problem where event creation is prohibited due to blocking
if user_id == self.hs.config.server_notices_mxid:
return
if self.hs.config.hs_disabled:
raise ResourceLimitError(
403, self.hs.config.hs_disabled_message,
errcode=Codes.RESOURCE_LIMIT_EXCEEDED,
admin_contact=self.hs.config.admin_contact,
limit_type=self.hs.config.hs_disabled_limit_type
)
if self.hs.config.limit_usage_by_mau is True:
assert not (user_id and threepid)
# If the user is already part of the MAU cohort or a trial user
if user_id:
timestamp = yield self.store.user_last_seen_monthly_active(user_id)
if timestamp:
return
is_trial = yield self.store.is_trial_user(user_id)
if is_trial:
return
elif threepid:
# If the user does not exist yet, but is signing up with a
# reserved threepid then pass auth check
if is_threepid_reserved(self.hs.config, threepid):
return
# Else if there is no room in the MAU bucket, bail
current_mau = yield self.store.get_monthly_active_count()
if current_mau >= self.hs.config.max_mau_value:
raise ResourceLimitError(
403, "Monthly Active User Limit Exceeded",
admin_contact=self.hs.config.admin_contact,
errcode=Codes.RESOURCE_LIMIT_EXCEEDED,
limit_type="monthly_active_user"
)

View File

@@ -1,7 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2017 Vector Creations Ltd
# Copyright 2018 New Vector Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -78,7 +77,6 @@ class EventTypes(object):
Name = "m.room.name"
ServerACL = "m.room.server_acl"
Pinned = "m.room.pinned_events"
class RejectedReason(object):
@@ -96,19 +94,3 @@ class RoomCreationPreset(object):
class ThirdPartyEntityKind(object):
USER = "user"
LOCATION = "location"
class RoomVersions(object):
V1 = "1"
VDH_TEST = "vdh-test-version"
# the version we will give rooms which are created on this server
DEFAULT_ROOM_VERSION = RoomVersions.V1
# vdh-test-version is a placeholder to get room versioning support working and tested
# until we have a working v2.
KNOWN_ROOM_VERSIONS = {RoomVersions.V1, RoomVersions.VDH_TEST}
ServerNoticeMsgType = "m.server_notice"
ServerNoticeLimitReached = "m.server_notice.usage_limit_reached"

View File

@@ -1,6 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -56,9 +55,7 @@ class Codes(object):
SERVER_NOT_TRUSTED = "M_SERVER_NOT_TRUSTED"
CONSENT_NOT_GIVEN = "M_CONSENT_NOT_GIVEN"
CANNOT_LEAVE_SERVER_NOTICE_ROOM = "M_CANNOT_LEAVE_SERVER_NOTICE_ROOM"
RESOURCE_LIMIT_EXCEEDED = "M_RESOURCE_LIMIT_EXCEEDED"
UNSUPPORTED_ROOM_VERSION = "M_UNSUPPORTED_ROOM_VERSION"
INCOMPATIBLE_ROOM_VERSION = "M_INCOMPATIBLE_ROOM_VERSION"
MAU_LIMIT_EXCEEDED = "M_MAU_LIMIT_EXCEEDED"
class CodeMessageException(RuntimeError):
@@ -231,30 +228,6 @@ class AuthError(SynapseError):
super(AuthError, self).__init__(*args, **kwargs)
class ResourceLimitError(SynapseError):
"""
Any error raised when there is a problem with resource usage.
For instance, the monthly active user limit for the server has been exceeded
"""
def __init__(
self, code, msg,
errcode=Codes.RESOURCE_LIMIT_EXCEEDED,
admin_contact=None,
limit_type=None,
):
self.admin_contact = admin_contact
self.limit_type = limit_type
super(ResourceLimitError, self).__init__(code, msg, errcode=errcode)
def error_dict(self):
return cs_error(
self.msg,
self.errcode,
admin_contact=self.admin_contact,
limit_type=self.limit_type
)
class EventSizeError(SynapseError):
"""An error raised when an event is too big."""
@@ -312,27 +285,6 @@ class LimitExceededError(SynapseError):
)
class IncompatibleRoomVersionError(SynapseError):
"""A server is trying to join a room whose version it does not support."""
def __init__(self, room_version):
super(IncompatibleRoomVersionError, self).__init__(
code=400,
msg="Your homeserver does not support the features required to "
"join this room",
errcode=Codes.INCOMPATIBLE_ROOM_VERSION,
)
self._room_version = room_version
def error_dict(self):
return cs_error(
self.msg,
self.errcode,
room_version=self._room_version,
)
def cs_error(msg, code=Codes.UNKNOWN, **kwargs):
""" Utility method for constructing an error response for client-server
interactions.

View File

@@ -251,7 +251,6 @@ class FilterCollection(object):
"include_leave", False
)
self.event_fields = filter_json.get("event_fields", [])
self.event_format = filter_json.get("event_format", "client")
def __repr__(self):
return "<FilterCollection %s>" % (json.dumps(self._filter_json),)

View File

@@ -72,7 +72,7 @@ class Ratelimiter(object):
return allowed, time_allowed
def prune_message_counts(self, time_now_s):
for user_id in list(self.message_counts.keys()):
for user_id in self.message_counts.keys():
message_count, time_start, msg_rate_hz = (
self.message_counts[user_id]
)

View File

@@ -140,7 +140,7 @@ def listen_metrics(bind_addresses, port):
logger.info("Metrics now reporting on %s:%d", host, port)
def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50):
def listen_tcp(bind_addresses, port, factory, backlog=50):
"""
Create a TCP socket for a port and several addresses
"""
@@ -156,9 +156,7 @@ def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50):
check_bind_error(e, address, bind_addresses)
def listen_ssl(
bind_addresses, port, factory, context_factory, reactor=reactor, backlog=50
):
def listen_ssl(bind_addresses, port, factory, context_factory, backlog=50):
"""
Create an SSL socket for a port and several addresses
"""

View File

@@ -51,7 +51,10 @@ class AppserviceSlaveStore(
class AppserviceServer(HomeServer):
DATASTORE_CLASS = AppserviceSlaveStore
def setup(self):
logger.info("Setting up.")
self.datastore = AppserviceSlaveStore(self.get_db_conn(), self)
logger.info("Finished setting up.")
def _listen_http(self, listener_config):
port = listener_config["port"]
@@ -114,9 +117,8 @@ class ASReplicationHandler(ReplicationClientHandler):
super(ASReplicationHandler, self).__init__(hs.get_datastore())
self.appservice_handler = hs.get_application_service_handler()
@defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
yield super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
if stream_name == "events":
max_stream_id = self.store.get_room_max_stream_ordering()

View File

@@ -39,7 +39,7 @@ from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.keys import SlavedKeyStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.room import (
JoinedRoomMemberListRestServlet,
@@ -66,7 +66,7 @@ class ClientReaderSlavedStore(
DirectoryStore,
SlavedApplicationServiceStore,
SlavedRegistrationStore,
SlavedTransactionStore,
TransactionStore,
SlavedClientIpStore,
BaseSlavedStore,
):
@@ -74,7 +74,10 @@ class ClientReaderSlavedStore(
class ClientReaderServer(HomeServer):
DATASTORE_CLASS = ClientReaderSlavedStore
def setup(self):
logger.info("Setting up.")
self.datastore = ClientReaderSlavedStore(self.get_db_conn(), self)
logger.info("Finished setting up.")
def _listen_http(self, listener_config):
port = listener_config["port"]
@@ -165,13 +168,11 @@ def start(config_options):
database_engine = create_engine(config.database_config)
tls_server_context_factory = context_factory.ServerContextFactory(config)
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
ss = ClientReaderServer(
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
tls_client_options_factory=tls_client_options_factory,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,

View File

@@ -43,13 +43,8 @@ from synapse.replication.slave.storage.pushers import SlavedPusherStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.profile import (
ProfileAvatarURLRestServlet,
ProfileDisplaynameRestServlet,
ProfileRestServlet,
)
from synapse.rest.client.v1.room import (
JoinRoomAliasServlet,
RoomMembershipRestServlet,
@@ -58,7 +53,6 @@ from synapse.rest.client.v1.room import (
)
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.storage.user_directory import UserDirectoryStore
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext
from synapse.util.manhole import manhole
@@ -68,11 +62,8 @@ logger = logging.getLogger("synapse.app.event_creator")
class EventCreatorSlavedStore(
# FIXME(#3714): We need to add UserDirectoryStore as we write directly
# rather than going via the correct worker.
UserDirectoryStore,
DirectoryStore,
SlavedTransactionStore,
TransactionStore,
SlavedProfileStore,
SlavedAccountDataStore,
SlavedPusherStore,
@@ -90,7 +81,10 @@ class EventCreatorSlavedStore(
class EventCreatorServer(HomeServer):
DATASTORE_CLASS = EventCreatorSlavedStore
def setup(self):
logger.info("Setting up.")
self.datastore = EventCreatorSlavedStore(self.get_db_conn(), self)
logger.info("Finished setting up.")
def _listen_http(self, listener_config):
port = listener_config["port"]
@@ -107,9 +101,6 @@ class EventCreatorServer(HomeServer):
RoomMembershipRestServlet(self).register(resource)
RoomStateEventRestServlet(self).register(resource)
JoinRoomAliasServlet(self).register(resource)
ProfileAvatarURLRestServlet(self).register(resource)
ProfileDisplaynameRestServlet(self).register(resource)
ProfileRestServlet(self).register(resource)
resources.update({
"/_matrix/client/r0": resource,
"/_matrix/client/unstable": resource,
@@ -183,13 +174,11 @@ def start(config_options):
database_engine = create_engine(config.database_config)
tls_server_context_factory = context_factory.ServerContextFactory(config)
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
ss = EventCreatorServer(
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
tls_client_options_factory=tls_client_options_factory,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,

View File

@@ -32,17 +32,11 @@ from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.keys import SlavedKeyStore
from synapse.replication.slave.storage.profile import SlavedProfileStore
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
from synapse.replication.slave.storage.pushers import SlavedPusherStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
@@ -55,24 +49,21 @@ logger = logging.getLogger("synapse.app.federation_reader")
class FederationReaderSlavedStore(
SlavedAccountDataStore,
SlavedProfileStore,
SlavedApplicationServiceStore,
SlavedPusherStore,
SlavedPushRuleStore,
SlavedReceiptsStore,
SlavedEventStore,
SlavedKeyStore,
RoomStore,
DirectoryStore,
SlavedTransactionStore,
TransactionStore,
BaseSlavedStore,
):
pass
class FederationReaderServer(HomeServer):
DATASTORE_CLASS = FederationReaderSlavedStore
def setup(self):
logger.info("Setting up.")
self.datastore = FederationReaderSlavedStore(self.get_db_conn(), self)
logger.info("Finished setting up.")
def _listen_http(self, listener_config):
port = listener_config["port"]
@@ -152,13 +143,11 @@ def start(config_options):
database_engine = create_engine(config.database_config)
tls_server_context_factory = context_factory.ServerContextFactory(config)
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
ss = FederationReaderServer(
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
tls_client_options_factory=tls_client_options_factory,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,

View File

@@ -36,11 +36,11 @@ from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.presence import SlavedPresenceStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.async_helpers import Linearizer
from synapse.util.async import Linearizer
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole
@@ -50,7 +50,7 @@ logger = logging.getLogger("synapse.app.federation_sender")
class FederationSenderSlaveStore(
SlavedDeviceInboxStore, SlavedTransactionStore, SlavedReceiptsStore, SlavedEventStore,
SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore,
SlavedRegistrationStore, SlavedDeviceStore, SlavedPresenceStore,
):
def __init__(self, db_conn, hs):
@@ -78,7 +78,10 @@ class FederationSenderSlaveStore(
class FederationSenderServer(HomeServer):
DATASTORE_CLASS = FederationSenderSlaveStore
def setup(self):
logger.info("Setting up.")
self.datastore = FederationSenderSlaveStore(self.get_db_conn(), self)
logger.info("Finished setting up.")
def _listen_http(self, listener_config):
port = listener_config["port"]
@@ -141,9 +144,8 @@ class FederationSenderReplicationHandler(ReplicationClientHandler):
super(FederationSenderReplicationHandler, self).__init__(hs.get_datastore())
self.send_handler = FederationSenderHandler(hs, self)
@defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
yield super(FederationSenderReplicationHandler, self).on_rdata(
super(FederationSenderReplicationHandler, self).on_rdata(
stream_name, token, rows
)
self.send_handler.process_replication_rows(stream_name, token, rows)
@@ -184,13 +186,11 @@ def start(config_options):
config.send_federation = True
tls_server_context_factory = context_factory.ServerContextFactory(config)
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
ps = FederationSenderServer(
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
tls_client_options_factory=tls_client_options_factory,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,

View File

@@ -38,7 +38,6 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.base import ClientV1RestServlet, client_path_patterns
from synapse.rest.client.v2_alpha._base import client_v2_patterns
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
@@ -50,35 +49,6 @@ from synapse.util.versionstring import get_version_string
logger = logging.getLogger("synapse.app.frontend_proxy")
class PresenceStatusStubServlet(ClientV1RestServlet):
PATTERNS = client_path_patterns("/presence/(?P<user_id>[^/]*)/status")
def __init__(self, hs):
super(PresenceStatusStubServlet, self).__init__(hs)
self.http_client = hs.get_simple_http_client()
self.auth = hs.get_auth()
self.main_uri = hs.config.worker_main_http_uri
@defer.inlineCallbacks
def on_GET(self, request, user_id):
# Pass through the auth headers, if any, in case the access token
# is there.
auth_headers = request.requestHeaders.getRawHeaders("Authorization", [])
headers = {
"Authorization": auth_headers,
}
result = yield self.http_client.get_json(
self.main_uri + request.uri,
headers=headers,
)
defer.returnValue((200, result))
@defer.inlineCallbacks
def on_PUT(self, request, user_id):
yield self.auth.get_user_by_req(request)
defer.returnValue((200, {}))
class KeyUploadServlet(RestServlet):
PATTERNS = client_v2_patterns("/keys/upload(/(?P<device_id>[^/]+))?$")
@@ -148,7 +118,10 @@ class FrontendProxySlavedStore(
class FrontendProxyServer(HomeServer):
DATASTORE_CLASS = FrontendProxySlavedStore
def setup(self):
logger.info("Setting up.")
self.datastore = FrontendProxySlavedStore(self.get_db_conn(), self)
logger.info("Finished setting up.")
def _listen_http(self, listener_config):
port = listener_config["port"]
@@ -162,12 +135,6 @@ class FrontendProxyServer(HomeServer):
elif name == "client":
resource = JsonResource(self, canonical_json=False)
KeyUploadServlet(self).register(resource)
# If presence is disabled, use the stub servlet that does
# not allow sending presence
if not self.config.use_presence:
PresenceStatusStubServlet(self).register(resource)
resources.update({
"/_matrix/client/r0": resource,
"/_matrix/client/unstable": resource,
@@ -186,8 +153,7 @@ class FrontendProxyServer(HomeServer):
listener_config,
root_resource,
self.version_string,
),
reactor=self.get_reactor()
)
)
logger.info("Synapse client reader now listening on port %d", port)
@@ -242,13 +208,11 @@ def start(config_options):
database_engine = create_engine(config.database_config)
tls_server_context_factory = context_factory.ServerContextFactory(config)
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
ss = FrontendProxyServer(
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
tls_client_options_factory=tls_client_options_factory,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,

View File

@@ -62,7 +62,7 @@ from synapse.rest.key.v1.server_key_resource import LocalKey
from synapse.rest.key.v2 import KeyApiV2Resource
from synapse.rest.media.v0.content_repository import ContentRepoResource
from synapse.server import HomeServer
from synapse.storage import DataStore, are_all_users_on_domain
from synapse.storage import are_all_users_on_domain
from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
from synapse.util.caches import CACHE_SIZE_FACTOR
@@ -111,8 +111,6 @@ def build_resource_for_web_client(hs):
class SynapseHomeServer(HomeServer):
DATASTORE_CLASS = DataStore
def _listener_http(self, config, listener_config):
port = listener_config["port"]
bind_addresses = listener_config["bind_addresses"]
@@ -305,12 +303,8 @@ class SynapseHomeServer(HomeServer):
# Gauges to expose monthly active user control metrics
current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU")
max_mau_gauge = Gauge("synapse_admin_mau:max", "MAU Limit")
registered_reserved_users_mau_gauge = Gauge(
"synapse_admin_mau:registered_reserved_users",
"Registered users with reserved threepids"
)
current_mau_gauge = Gauge("synapse_admin_current_mau", "Current MAU")
max_mau_value_gauge = Gauge("synapse_admin_max_mau_value", "MAU Limit")
def setup(config_options):
@@ -344,7 +338,6 @@ def setup(config_options):
events.USE_FROZEN_DICTS = config.use_frozen_dicts
tls_server_context_factory = context_factory.ServerContextFactory(config)
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
database_engine = create_engine(config.database_config)
config.database_config["args"]["cp_openfun"] = database_engine.on_new_connection
@@ -353,7 +346,6 @@ def setup(config_options):
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
tls_client_options_factory=tls_client_options_factory,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
@@ -362,13 +354,13 @@ def setup(config_options):
logger.info("Preparing database: %s...", config.database_config['name'])
try:
with hs.get_db_conn(run_new_connection=False) as db_conn:
prepare_database(db_conn, database_engine, config=config)
database_engine.on_new_connection(db_conn)
db_conn = hs.get_db_conn(run_new_connection=False)
prepare_database(db_conn, database_engine, config=config)
database_engine.on_new_connection(db_conn)
hs.run_startup_checks(db_conn, database_engine)
hs.run_startup_checks(db_conn, database_engine)
db_conn.commit()
db_conn.commit()
except UpgradeDatabaseException:
sys.stderr.write(
"\nFailed to upgrade database.\n"
@@ -527,31 +519,17 @@ def run(hs):
# table will decrease
clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000)
# monthly active user limiting functionality
clock.looping_call(
hs.get_datastore().reap_monthly_active_users, 1000 * 60 * 60
)
hs.get_datastore().reap_monthly_active_users()
@defer.inlineCallbacks
def generate_monthly_active_users():
current_mau_count = 0
reserved_count = 0
store = hs.get_datastore()
count = 0
if hs.config.limit_usage_by_mau:
current_mau_count = yield store.get_monthly_active_count()
reserved_count = yield store.get_registered_reserved_users_count()
current_mau_gauge.set(float(current_mau_count))
registered_reserved_users_mau_gauge.set(float(reserved_count))
max_mau_gauge.set(float(hs.config.max_mau_value))
count = yield hs.get_datastore().count_monthly_users()
current_mau_gauge.set(float(count))
max_mau_value_gauge.set(float(hs.config.max_mau_value))
hs.get_datastore().initialise_reserved_users(
hs.config.mau_limits_reserved_threepids
)
generate_monthly_active_users()
if hs.config.limit_usage_by_mau:
clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000)
# End of monthly active user settings
if hs.config.report_stats:
logger.info("Scheduling stats reporting for 3 hour intervals")

View File

@@ -34,7 +34,7 @@ from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.media.v0.content_repository import ContentRepoResource
from synapse.server import HomeServer
@@ -52,7 +52,7 @@ class MediaRepositorySlavedStore(
SlavedApplicationServiceStore,
SlavedRegistrationStore,
SlavedClientIpStore,
SlavedTransactionStore,
TransactionStore,
BaseSlavedStore,
MediaRepositoryStore,
):
@@ -60,7 +60,10 @@ class MediaRepositorySlavedStore(
class MediaRepositoryServer(HomeServer):
DATASTORE_CLASS = MediaRepositorySlavedStore
def setup(self):
logger.info("Setting up.")
self.datastore = MediaRepositorySlavedStore(self.get_db_conn(), self)
logger.info("Finished setting up.")
def _listen_http(self, listener_config):
port = listener_config["port"]
@@ -152,13 +155,11 @@ def start(config_options):
database_engine = create_engine(config.database_config)
tls_server_context_factory = context_factory.ServerContextFactory(config)
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
ss = MediaRepositoryServer(
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
tls_client_options_factory=tls_client_options_factory,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,

View File

@@ -78,7 +78,10 @@ class PusherSlaveStore(
class PusherServer(HomeServer):
DATASTORE_CLASS = PusherSlaveStore
def setup(self):
logger.info("Setting up.")
self.datastore = PusherSlaveStore(self.get_db_conn(), self)
logger.info("Finished setting up.")
def remove_pusher(self, app_id, push_key, user_id):
self.get_tcp_replication().send_remove_pusher(app_id, push_key, user_id)
@@ -145,9 +148,8 @@ class PusherReplicationHandler(ReplicationClientHandler):
self.pusher_pool = hs.get_pusherpool()
@defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
yield super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
run_in_background(self.poke_pushers, stream_name, token, rows)
@defer.inlineCallbacks
@@ -160,11 +162,11 @@ class PusherReplicationHandler(ReplicationClientHandler):
else:
yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
elif stream_name == "events":
self.pusher_pool.on_new_notifications(
yield self.pusher_pool.on_new_notifications(
token, token,
)
elif stream_name == "receipts":
self.pusher_pool.on_new_receipts(
yield self.pusher_pool.on_new_receipts(
token, token, set(row.room_id for row in rows)
)
except Exception:

View File

@@ -114,10 +114,7 @@ class SynchrotronPresence(object):
logger.info("Presence process_id is %r", self.process_id)
def send_user_sync(self, user_id, is_syncing, last_sync_ms):
if self.hs.config.use_presence:
self.hs.get_tcp_replication().send_user_sync(
user_id, is_syncing, last_sync_ms
)
self.hs.get_tcp_replication().send_user_sync(user_id, is_syncing, last_sync_ms)
def mark_as_coming_online(self, user_id):
"""A user has started syncing. Send a UserSync to the master, unless they
@@ -214,13 +211,10 @@ class SynchrotronPresence(object):
yield self.notify_from_replication(states, stream_id)
def get_currently_syncing_users(self):
if self.hs.config.use_presence:
return [
user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
if count > 0
]
else:
return set()
return [
user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
if count > 0
]
class SynchrotronTyping(object):
@@ -249,7 +243,10 @@ class SynchrotronApplicationService(object):
class SynchrotronServer(HomeServer):
DATASTORE_CLASS = SynchrotronSlavedStore
def setup(self):
logger.info("Setting up.")
self.datastore = SynchrotronSlavedStore(self.get_db_conn(), self)
logger.info("Finished setting up.")
def _listen_http(self, listener_config):
port = listener_config["port"]
@@ -335,9 +332,8 @@ class SyncReplicationHandler(ReplicationClientHandler):
self.presence_handler = hs.get_presence_handler()
self.notifier = hs.get_notifier()
@defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
yield super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
run_in_background(self.process_and_notify, stream_name, token, rows)
def get_streams_to_replicate(self):

View File

@@ -94,7 +94,10 @@ class UserDirectorySlaveStore(
class UserDirectoryServer(HomeServer):
DATASTORE_CLASS = UserDirectorySlaveStore
def setup(self):
logger.info("Setting up.")
self.datastore = UserDirectorySlaveStore(self.get_db_conn(), self)
logger.info("Finished setting up.")
def _listen_http(self, listener_config):
port = listener_config["port"]
@@ -166,9 +169,8 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler):
super(UserDirectoryReplicationHandler, self).__init__(hs.get_datastore())
self.user_directory = hs.get_user_directory_handler()
@defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
yield super(UserDirectoryReplicationHandler, self).on_rdata(
super(UserDirectoryReplicationHandler, self).on_rdata(
stream_name, token, rows
)
if stream_name == "current_state_deltas":
@@ -212,13 +214,11 @@ def start(config_options):
config.update_user_directory = True
tls_server_context_factory = context_factory.ServerContextFactory(config)
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
ps = UserDirectoryServer(
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
tls_client_options_factory=tls_client_options_factory,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,

View File

@@ -13,8 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from six.moves import urllib
import urllib
from prometheus_client import Counter
@@ -99,7 +98,7 @@ class ApplicationServiceApi(SimpleHttpClient):
def query_user(self, service, user_id):
if service.url is None:
defer.returnValue(False)
uri = service.url + ("/users/%s" % urllib.parse.quote(user_id))
uri = service.url + ("/users/%s" % urllib.quote(user_id))
response = None
try:
response = yield self.get_json(uri, {
@@ -120,7 +119,7 @@ class ApplicationServiceApi(SimpleHttpClient):
def query_alias(self, service, alias):
if service.url is None:
defer.returnValue(False)
uri = service.url + ("/rooms/%s" % urllib.parse.quote(alias))
uri = service.url + ("/rooms/%s" % urllib.quote(alias))
response = None
try:
response = yield self.get_json(uri, {
@@ -154,7 +153,7 @@ class ApplicationServiceApi(SimpleHttpClient):
service.url,
APP_SERVICE_PREFIX,
kind,
urllib.parse.quote(protocol)
urllib.quote(protocol)
)
try:
response = yield self.get_json(uri, fields)
@@ -189,7 +188,7 @@ class ApplicationServiceApi(SimpleHttpClient):
uri = "%s%s/thirdparty/protocol/%s" % (
service.url,
APP_SERVICE_PREFIX,
urllib.parse.quote(protocol)
urllib.quote(protocol)
)
try:
info = yield self.get_json(uri, {})
@@ -229,7 +228,7 @@ class ApplicationServiceApi(SimpleHttpClient):
txn_id = str(txn_id)
uri = service.url + ("/transactions/%s" %
urllib.parse.quote(txn_id))
urllib.quote(txn_id))
try:
yield self.put_json(
uri=uri,

View File

@@ -21,7 +21,7 @@ from .consent_config import ConsentConfig
from .database import DatabaseConfig
from .emailconfig import EmailConfig
from .groups import GroupsConfig
from .jwt_config import JWTConfig
from .jwt import JWTConfig
from .key import KeyConfig
from .logger import LoggingConfig
from .metrics import MetricsConfig

View File

@@ -168,8 +168,7 @@ def setup_logging(config, use_worker_options=False):
if log_file:
# TODO: Customisable file size / backup count
handler = logging.handlers.RotatingFileHandler(
log_file, maxBytes=(1000 * 1000 * 100), backupCount=3,
encoding='utf8'
log_file, maxBytes=(1000 * 1000 * 100), backupCount=3
)
def sighup(signum, stack):
@@ -194,8 +193,9 @@ def setup_logging(config, use_worker_options=False):
def sighup(signum, stack):
# it might be better to use a file watcher or something for this.
logging.info("Reloading log config from %s due to SIGHUP",
log_config)
load_log_config()
logging.info("Reloaded log config from %s due to SIGHUP", log_config)
load_log_config()
@@ -227,22 +227,7 @@ def setup_logging(config, use_worker_options=False):
#
# However this may not be too much of a problem if we are just writing to a file.
observer = STDLibLogObserver()
def _log(event):
if "log_text" in event:
if event["log_text"].startswith("DNSDatagramProtocol starting on "):
return
if event["log_text"].startswith("(UDP Port "):
return
if event["log_text"].startswith("Timing out client"):
return
return observer(event)
globalLogBeginner.beginLoggingTo(
[_log],
[observer],
redirectStandardIO=not config.no_redirect_stdio,
)

View File

@@ -49,9 +49,6 @@ class ServerConfig(Config):
# "disable" federation
self.send_federation = config.get("send_federation", True)
# Whether to enable user presence.
self.use_presence = config.get("use_presence", True)
# Whether to update the user directory or not. This should be set to
# false only if we are updating the user directory in a worker
self.update_user_directory = config.get("update_user_directory", True)
@@ -72,29 +69,12 @@ class ServerConfig(Config):
# Options to control access by tracking MAU
self.limit_usage_by_mau = config.get("limit_usage_by_mau", False)
self.max_mau_value = 0
if self.limit_usage_by_mau:
self.max_mau_value = config.get(
"max_mau_value", 0,
)
self.mau_limits_reserved_threepids = config.get(
"mau_limit_reserved_threepids", []
)
self.mau_trial_days = config.get(
"mau_trial_days", 0,
)
# Options to disable HS
self.hs_disabled = config.get("hs_disabled", False)
self.hs_disabled_message = config.get("hs_disabled_message", "")
self.hs_disabled_limit_type = config.get("hs_disabled_limit_type", "")
# Admin uri to direct users at should their instance become blocked
# due to resource constraints
self.admin_contact = config.get("admin_contact", None)
else:
self.max_mau_value = 0
# FIXME: federation_domain_whitelist needs sytests
self.federation_domain_whitelist = None
federation_domain_whitelist = config.get(
@@ -258,9 +238,6 @@ class ServerConfig(Config):
# hard limit.
soft_file_limit: 0
# Set to false to disable presence tracking on this homeserver.
use_presence: true
# The GC threshold parameters to pass to `gc.set_threshold`, if defined
# gc_thresholds: [700, 10, 10]
@@ -352,33 +329,6 @@ class ServerConfig(Config):
# - port: 9000
# bind_addresses: ['::1', '127.0.0.1']
# type: manhole
# Homeserver blocking
#
# How to reach the server admin, used in ResourceLimitError
# admin_contact: 'mailto:admin@server.com'
#
# Global block config
#
# hs_disabled: False
# hs_disabled_message: 'Human readable reason for why the HS is blocked'
# hs_disabled_limit_type: 'error code(str), to help clients decode reason'
#
# Monthly Active User Blocking
#
# Enables monthly active user checking
# limit_usage_by_mau: False
# max_mau_value: 50
# mau_trial_days: 2
#
# Sometimes the server admin will want to ensure certain accounts are
# never blocked by mau checking. These accounts are specified here.
#
# mau_limit_reserved_threepids:
# - medium: 'email'
# address: 'reserved_user@example.com'
""" % locals()
def read_arguments(self, args):
@@ -404,23 +354,6 @@ class ServerConfig(Config):
" service on the given port.")
def is_threepid_reserved(config, threepid):
"""Check the threepid against the reserved threepid config
Args:
config(ServerConfig) - to access server config attributes
threepid(dict) - The threepid to test for
Returns:
boolean Is the threepid undertest reserved_user
"""
for tp in config.mau_limits_reserved_threepids:
if (threepid['medium'] == tp['medium']
and threepid['address'] == tp['address']):
return True
return False
def read_gc_thresholds(thresholds):
"""Reads the three integer thresholds for garbage collection. Ensures that
the thresholds are integers if thresholds are supplied.

View File

@@ -11,22 +11,19 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from zope.interface import implementer
from OpenSSL import SSL, crypto
from twisted.internet import ssl
from twisted.internet._sslverify import _defaultCurveName
from twisted.internet.interfaces import IOpenSSLClientConnectionCreator
from twisted.internet.ssl import CertificateOptions, ContextFactory
from twisted.python.failure import Failure
logger = logging.getLogger(__name__)
class ServerContextFactory(ContextFactory):
class ServerContextFactory(ssl.ContextFactory):
"""Factory for PyOpenSSL SSL contexts that are used to handle incoming
connections."""
connections and to make connections to remote servers."""
def __init__(self, config):
self._context = SSL.Context(SSL.SSLv23_METHOD)
@@ -51,78 +48,3 @@ class ServerContextFactory(ContextFactory):
def getContext(self):
return self._context
def _idnaBytes(text):
"""
Convert some text typed by a human into some ASCII bytes. This is a
copy of twisted.internet._idna._idnaBytes. For documentation, see the
twisted documentation.
"""
try:
import idna
except ImportError:
return text.encode("idna")
else:
return idna.encode(text)
def _tolerateErrors(wrapped):
"""
Wrap up an info_callback for pyOpenSSL so that if something goes wrong
the error is immediately logged and the connection is dropped if possible.
This is a copy of twisted.internet._sslverify._tolerateErrors. For
documentation, see the twisted documentation.
"""
def infoCallback(connection, where, ret):
try:
return wrapped(connection, where, ret)
except: # noqa: E722, taken from the twisted implementation
f = Failure()
logger.exception("Error during info_callback")
connection.get_app_data().failVerification(f)
return infoCallback
@implementer(IOpenSSLClientConnectionCreator)
class ClientTLSOptions(object):
"""
Client creator for TLS without certificate identity verification. This is a
copy of twisted.internet._sslverify.ClientTLSOptions with the identity
verification left out. For documentation, see the twisted documentation.
"""
def __init__(self, hostname, ctx):
self._ctx = ctx
self._hostname = hostname
self._hostnameBytes = _idnaBytes(hostname)
ctx.set_info_callback(
_tolerateErrors(self._identityVerifyingInfoCallback)
)
def clientConnectionForTLS(self, tlsProtocol):
context = self._ctx
connection = SSL.Connection(context, None)
connection.set_app_data(tlsProtocol)
return connection
def _identityVerifyingInfoCallback(self, connection, where, ret):
if where & SSL.SSL_CB_HANDSHAKE_START:
connection.set_tlsext_host_name(self._hostnameBytes)
class ClientTLSOptionsFactory(object):
"""Factory for Twisted ClientTLSOptions that are used to make connections
to remote servers for federation."""
def __init__(self, config):
# We don't use config options yet
pass
def get_options(self, host):
return ClientTLSOptions(
host,
CertificateOptions(verify=False).getContext()
)

View File

@@ -18,9 +18,7 @@ import logging
from canonicaljson import json
from twisted.internet import defer, reactor
from twisted.internet.error import ConnectError
from twisted.internet.protocol import Factory
from twisted.names.error import DomainError
from twisted.web.http import HTTPClient
from synapse.http.endpoint import matrix_federation_endpoint
@@ -32,14 +30,14 @@ KEY_API_V1 = b"/_matrix/key/v1/"
@defer.inlineCallbacks
def fetch_server_key(server_name, tls_client_options_factory, path=KEY_API_V1):
def fetch_server_key(server_name, ssl_context_factory, path=KEY_API_V1):
"""Fetch the keys for a remote server."""
factory = SynapseKeyClientFactory()
factory.path = path
factory.host = server_name
endpoint = matrix_federation_endpoint(
reactor, server_name, tls_client_options_factory, timeout=30
reactor, server_name, ssl_context_factory, timeout=30
)
for i in range(5):
@@ -49,14 +47,12 @@ def fetch_server_key(server_name, tls_client_options_factory, path=KEY_API_V1):
server_response, server_certificate = yield protocol.remote_key
defer.returnValue((server_response, server_certificate))
except SynapseKeyClientError as e:
logger.warn("Error getting key for %r: %s", server_name, e)
if e.status.startswith(b"4"):
logger.exception("Error getting key for %r" % (server_name,))
if e.status.startswith("4"):
# Don't retry for 4xx responses.
raise IOError("Cannot get key for %r" % server_name)
except (ConnectError, DomainError) as e:
logger.warn("Error getting key for %r: %s", server_name, e)
except Exception as e:
logger.exception("Error getting key for %r", server_name)
logger.exception(e)
raise IOError("Cannot get key for %r" % server_name)
@@ -82,12 +78,6 @@ class SynapseKeyClientProtocol(HTTPClient):
self._peer = self.transport.getPeer()
logger.debug("Connected to %s", self._peer)
if not isinstance(self.path, bytes):
self.path = self.path.encode('ascii')
if not isinstance(self.host, bytes):
self.host = self.host.encode('ascii')
self.sendCommand(b"GET", self.path)
if self.host:
self.sendHeader(b"Host", self.host)

View File

@@ -16,10 +16,9 @@
import hashlib
import logging
import urllib
from collections import namedtuple
from six.moves import urllib
from signedjson.key import (
decode_verify_key_bytes,
encode_verify_key_base64,
@@ -41,7 +40,6 @@ from synapse.api.errors import Codes, SynapseError
from synapse.crypto.keyclient import fetch_server_key
from synapse.util import logcontext, unwrapFirstError
from synapse.util.logcontext import (
LoggingContext,
PreserveLoggingContext,
preserve_fn,
run_in_background,
@@ -218,34 +216,23 @@ class Keyring(object):
servers have completed. Follows the synapse rules of logcontext
preservation.
"""
loop_count = 1
while True:
wait_on = [
(server_name, self.key_downloads[server_name])
self.key_downloads[server_name]
for server_name in server_names
if server_name in self.key_downloads
]
if not wait_on:
if wait_on:
with PreserveLoggingContext():
yield defer.DeferredList(wait_on)
else:
break
logger.info(
"Waiting for existing lookups for %s to complete [loop %i]",
[w[0] for w in wait_on], loop_count,
)
with PreserveLoggingContext():
yield defer.DeferredList((w[1] for w in wait_on))
loop_count += 1
ctx = LoggingContext.current_context()
def rm(r, server_name_):
with PreserveLoggingContext(ctx):
logger.debug("Releasing key lookup lock on %s", server_name_)
self.key_downloads.pop(server_name_, None)
self.key_downloads.pop(server_name_, None)
return r
for server_name, deferred in server_to_deferred.items():
logger.debug("Got key lookup lock on %s", server_name)
self.key_downloads[server_name] = deferred
deferred.addBoth(rm, server_name)
@@ -445,7 +432,7 @@ class Keyring(object):
# an incoming request.
query_response = yield self.client.post_json(
destination=perspective_name,
path="/_matrix/key/v2/query",
path=b"/_matrix/key/v2/query",
data={
u"server_keys": {
server_name: {
@@ -525,9 +512,9 @@ class Keyring(object):
continue
(response, tls_certificate) = yield fetch_server_key(
server_name, self.hs.tls_client_options_factory,
path=("/_matrix/key/v2/server/%s" % (
urllib.parse.quote(requested_key_id),
server_name, self.hs.tls_server_context_factory,
path=(b"/_matrix/key/v2/server/%s" % (
urllib.quote(requested_key_id),
)).encode("ascii"),
)
@@ -668,7 +655,7 @@ class Keyring(object):
# Try to fetch the key from the remote server.
(response, tls_certificate) = yield fetch_server_key(
server_name, self.hs.tls_client_options_factory
server_name, self.hs.tls_server_context_factory
)
# Check the response.

View File

@@ -20,7 +20,7 @@ from signedjson.key import decode_verify_key_bytes
from signedjson.sign import SignatureVerifyException, verify_signed_json
from unpaddedbase64 import decode_base64
from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventTypes, JoinRules, Membership
from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.api.errors import AuthError, EventSizeError, SynapseError
from synapse.types import UserID, get_domain_from_id
@@ -83,14 +83,6 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
403,
"Creation event's room_id domain does not match sender's"
)
room_version = event.content.get("room_version", "1")
if room_version not in KNOWN_ROOM_VERSIONS:
raise AuthError(
403,
"room appears to have unsupported version %s" % (
room_version,
))
# FIXME
logger.debug("Allowing! %s", event)
return

View File

@@ -13,8 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import six
from synapse.util.caches import intern_dict
from synapse.util.frozenutils import freeze
@@ -149,9 +147,6 @@ class EventBase(object):
def items(self):
return list(self._event_dict.items())
def keys(self):
return six.iterkeys(self._event_dict)
class FrozenEvent(EventBase):
def __init__(self, event_dict, internal_metadata_dict={}, rejected_reason=None):

View File

@@ -143,31 +143,11 @@ class FederationBase(object):
def callback(_, pdu):
with logcontext.PreserveLoggingContext(ctx):
if not check_event_content_hash(pdu):
# let's try to distinguish between failures because the event was
# redacted (which are somewhat expected) vs actual ball-tampering
# incidents.
#
# This is just a heuristic, so we just assume that if the keys are
# about the same between the redacted and received events, then the
# received event was probably a redacted copy (but we then use our
# *actual* redacted copy to be on the safe side.)
redacted_event = prune_event(pdu)
if (
set(redacted_event.keys()) == set(pdu.keys()) and
set(six.iterkeys(redacted_event.content))
== set(six.iterkeys(pdu.content))
):
logger.info(
"Event %s seems to have been redacted; using our redacted "
"copy",
pdu.event_id,
)
else:
logger.warning(
"Event %s content has been tampered, redacting",
pdu.event_id, pdu.get_pdu_json(),
)
return redacted_event
logger.warn(
"Event content has been tampered, redacting %s: %s",
pdu.event_id, pdu.get_pdu_json()
)
return prune_event(pdu)
if self.spam_checker.check_event_for_spam(pdu):
logger.warn(
@@ -182,8 +162,8 @@ class FederationBase(object):
failure.trap(SynapseError)
with logcontext.PreserveLoggingContext(ctx):
logger.warn(
"Signature check failed for %s: %s",
pdu.event_id, failure.getErrorMessage(),
"Signature check failed for %s",
pdu.event_id,
)
return failure

View File

@@ -25,7 +25,7 @@ from prometheus_client import Counter
from twisted.internet import defer
from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventTypes, Membership
from synapse.api.constants import Membership
from synapse.api.errors import (
CodeMessageException,
FederationDeniedError,
@@ -271,10 +271,10 @@ class FederationClient(FederationBase):
event_id, destination, e,
)
except NotRetryingDestination as e:
logger.info(str(e))
logger.info(e.message)
continue
except FederationDeniedError as e:
logger.info(str(e))
logger.info(e.message)
continue
except Exception as e:
pdu_attempts[destination] = now
@@ -510,7 +510,7 @@ class FederationClient(FederationBase):
else:
logger.warn(
"Failed to %s via %s: %i %s",
description, destination, e.code, e.args[0],
description, destination, e.code, e.message,
)
except Exception:
logger.warn(
@@ -518,10 +518,10 @@ class FederationClient(FederationBase):
description, destination, exc_info=1,
)
raise RuntimeError("Failed to %s via any server" % (description, ))
raise RuntimeError("Failed to %s via any server", description)
def make_membership_event(self, destinations, room_id, user_id, membership,
content, params):
content={},):
"""
Creates an m.room.member event, with context, without participating in the room.
@@ -537,10 +537,8 @@ class FederationClient(FederationBase):
user_id (str): The user whose membership is being evented.
membership (str): The "membership" property of the event. Must be
one of "join" or "leave".
content (dict): Any additional data to put into the content field
content (object): Any additional data to put into the content field
of the event.
params (dict[str, str|Iterable[str]]): Query parameters to include in the
request.
Return:
Deferred: resolves to a tuple of (origin (str), event (object))
where origin is the remote homeserver which generated the event.
@@ -560,12 +558,10 @@ class FederationClient(FederationBase):
@defer.inlineCallbacks
def send_request(destination):
ret = yield self.transport_layer.make_membership_event(
destination, room_id, user_id, membership, params,
destination, room_id, user_id, membership
)
pdu_dict = ret.get("event", None)
if not isinstance(pdu_dict, dict):
raise InvalidResponseError("Bad 'event' field in response")
pdu_dict = ret["event"]
logger.debug("Got response to make_%s: %s", membership, pdu_dict)
@@ -609,26 +605,6 @@ class FederationClient(FederationBase):
Fails with a ``RuntimeError`` if no servers were reachable.
"""
def check_authchain_validity(signed_auth_chain):
for e in signed_auth_chain:
if e.type == EventTypes.Create:
create_event = e
break
else:
raise InvalidResponseError(
"no %s in auth chain" % (EventTypes.Create,),
)
# the room version should be sane.
room_version = create_event.content.get("room_version", "1")
if room_version not in KNOWN_ROOM_VERSIONS:
# This shouldn't be possible, because the remote server should have
# rejected the join attempt during make_join.
raise InvalidResponseError(
"room appears to have unsupported version %s" % (
room_version,
))
@defer.inlineCallbacks
def send_request(destination):
time_now = self._clock.time_msec()
@@ -685,7 +661,7 @@ class FederationClient(FederationBase):
for s in signed_state:
s.internal_metadata = copy.deepcopy(s.internal_metadata)
check_authchain_validity(signed_auth)
auth_chain.sort(key=lambda e: e.depth)
defer.returnValue({
"state": signed_state,
@@ -875,7 +851,7 @@ class FederationClient(FederationBase):
except Exception as e:
logger.exception(
"Failed to send_third_party_invite via %s: %s",
destination, str(e)
destination, e.message
)
raise RuntimeError("Failed to send to any server.")

View File

@@ -27,24 +27,14 @@ from twisted.internet.abstract import isIPAddress
from twisted.python import failure
from synapse.api.constants import EventTypes
from synapse.api.errors import (
AuthError,
FederationError,
IncompatibleRoomVersionError,
NotFoundError,
SynapseError,
)
from synapse.api.errors import AuthError, FederationError, NotFoundError, SynapseError
from synapse.crypto.event_signing import compute_event_signature
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
from synapse.http.endpoint import parse_server_name
from synapse.replication.http.federation import (
ReplicationFederationSendEduRestServlet,
ReplicationGetQueryRestServlet,
)
from synapse.types import get_domain_from_id
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util import async
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.logutils import log_function
@@ -71,8 +61,8 @@ class FederationServer(FederationBase):
self.auth = hs.get_auth()
self.handler = hs.get_handlers().federation_handler
self._server_linearizer = Linearizer("fed_server")
self._transaction_linearizer = Linearizer("fed_txn_handler")
self._server_linearizer = async.Linearizer("fed_server")
self._transaction_linearizer = async.Linearizer("fed_txn_handler")
self.transaction_actions = TransactionActions(self.store)
@@ -203,7 +193,7 @@ class FederationServer(FederationBase):
event_id, f.getTraceback().rstrip(),
)
yield concurrently_execute(
yield async.concurrently_execute(
process_pdus_for_room, pdus_by_room.keys(),
TRANSACTION_CONCURRENCY_LIMIT,
)
@@ -333,21 +323,12 @@ class FederationServer(FederationBase):
defer.returnValue((200, resp))
@defer.inlineCallbacks
def on_make_join_request(self, origin, room_id, user_id, supported_versions):
def on_make_join_request(self, origin, room_id, user_id):
origin_host, _ = parse_server_name(origin)
yield self.check_server_matches_acl(origin_host, room_id)
room_version = yield self.store.get_room_version(room_id)
if room_version not in supported_versions:
logger.warn("Room version %s not in %s", room_version, supported_versions)
raise IncompatibleRoomVersionError(room_version=room_version)
pdu = yield self.handler.on_make_join_request(room_id, user_id)
time_now = self._clock.time_msec()
defer.returnValue({
"event": pdu.get_pdu_json(time_now),
"room_version": room_version,
})
defer.returnValue({"event": pdu.get_pdu_json(time_now)})
@defer.inlineCallbacks
def on_invite_request(self, origin, content):
@@ -764,8 +745,6 @@ class FederationHandlerRegistry(object):
if edu_type in self.edu_handlers:
raise KeyError("Already have an EDU handler for %s" % (edu_type,))
logger.info("Registering federation EDU handler for %r", edu_type)
self.edu_handlers[edu_type] = handler
def register_query_handler(self, query_type, handler):
@@ -784,8 +763,6 @@ class FederationHandlerRegistry(object):
"Already have a Query handler for %s" % (query_type,)
)
logger.info("Registering federation query handler for %r", query_type)
self.query_handlers[query_type] = handler
@defer.inlineCallbacks
@@ -808,49 +785,3 @@ class FederationHandlerRegistry(object):
raise NotFoundError("No handler for Query type '%s'" % (query_type,))
return handler(args)
class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
"""A FederationHandlerRegistry for worker processes.
When receiving EDU or queries it will check if an appropriate handler has
been registered on the worker, if there isn't one then it calls off to the
master process.
"""
def __init__(self, hs):
self.config = hs.config
self.http_client = hs.get_simple_http_client()
self.clock = hs.get_clock()
self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs)
self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs)
super(ReplicationFederationHandlerRegistry, self).__init__()
def on_edu(self, edu_type, origin, content):
"""Overrides FederationHandlerRegistry
"""
handler = self.edu_handlers.get(edu_type)
if handler:
return super(ReplicationFederationHandlerRegistry, self).on_edu(
edu_type, origin, content,
)
return self._send_edu(
edu_type=edu_type,
origin=origin,
content=content,
)
def on_query(self, query_type, args):
"""Overrides FederationHandlerRegistry
"""
handler = self.query_handlers.get(query_type)
if handler:
return handler(args)
return self._get_query_client(
query_type=query_type,
args=args,
)

View File

@@ -32,7 +32,7 @@ Events are replicated via a separate events stream.
import logging
from collections import namedtuple
from six import iteritems
from six import iteritems, itervalues
from sortedcontainers import SortedDict
@@ -117,7 +117,7 @@ class FederationRemoteSendQueue(object):
user_ids = set(
user_id
for uids in self.presence_changed.values()
for uids in itervalues(self.presence_changed)
for user_id in uids
)

View File

@@ -26,8 +26,6 @@ from synapse.api.errors import FederationDeniedError, HttpResponseException
from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
from synapse.metrics import (
LaterGauge,
event_processing_loop_counter,
event_processing_loop_room_count,
events_processed_counter,
sent_edus_counter,
sent_transactions_counter,
@@ -58,7 +56,6 @@ class TransactionQueue(object):
"""
def __init__(self, hs):
self.hs = hs
self.server_name = hs.hostname
self.store = hs.get_datastore()
@@ -256,13 +253,7 @@ class TransactionQueue(object):
synapse.metrics.event_processing_last_ts.labels(
"federation_sender").set(ts)
events_processed_counter.inc(len(events))
event_processing_loop_room_count.labels(
"federation_sender"
).inc(len(events_by_room))
event_processing_loop_counter.labels("federation_sender").inc()
events_processed_counter.inc(len(events))
synapse.metrics.event_processing_positions.labels(
"federation_sender").set(next_token)
@@ -309,9 +300,6 @@ class TransactionQueue(object):
Args:
states (list(UserPresenceState))
"""
if not self.hs.config.use_presence:
# No-op if presence is disabled.
return
# First we queue up the new presence by user ID, so multiple presence
# updates in quick successtion are correctly handled
@@ -463,19 +451,7 @@ class TransactionQueue(object):
# pending_transactions flag.
pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
# We can only include at most 50 PDUs per transactions
pending_pdus, leftover_pdus = pending_pdus[:50], pending_pdus[50:]
if leftover_pdus:
self.pending_pdus_by_dest[destination] = leftover_pdus
pending_edus = self.pending_edus_by_dest.pop(destination, [])
# We can only include at most 100 EDUs per transactions
pending_edus, leftover_edus = pending_edus[:100], pending_edus[100:]
if leftover_edus:
self.pending_edus_by_dest[destination] = leftover_edus
pending_presence = self.pending_presence_by_dest.pop(destination, {})
pending_edus.extend(

View File

@@ -15,8 +15,7 @@
# limitations under the License.
import logging
from six.moves import urllib
import urllib
from twisted.internet import defer
@@ -107,7 +106,7 @@ class TransportLayerClient(object):
dest (str)
room_id (str)
event_tuples (list)
limit (int)
limt (int)
Returns:
Deferred: Results in a dict received from the remote homeserver.
@@ -196,7 +195,7 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
def make_membership_event(self, destination, room_id, user_id, membership, params):
def make_membership_event(self, destination, room_id, user_id, membership):
"""Asks a remote server to build and sign us a membership event
Note that this does not append any events to any graphs.
@@ -206,8 +205,6 @@ class TransportLayerClient(object):
room_id (str): room to join/leave
user_id (str): user to be joined/left
membership (str): one of join/leave
params (dict[str, str|Iterable[str]]): Query parameters to include in the
request.
Returns:
Deferred: Succeeds when we get a 2xx HTTP response. The result
@@ -244,7 +241,6 @@ class TransportLayerClient(object):
content = yield self.client.get_json(
destination=destination,
path=path,
args=params,
retry_on_dns_fail=retry_on_dns_fail,
timeout=20000,
ignore_backoff=ignore_backoff,
@@ -952,4 +948,4 @@ def _create_path(prefix, path, *args):
Returns:
str
"""
return prefix + path % tuple(urllib.parse.quote(arg, "") for arg in args)
return prefix + path % tuple(urllib.quote(arg, "") for arg in args)

View File

@@ -90,8 +90,8 @@ class Authenticator(object):
@defer.inlineCallbacks
def authenticate_request(self, request, content):
json_request = {
"method": request.method.decode('ascii'),
"uri": request.uri.decode('ascii'),
"method": request.method,
"uri": request.uri,
"destination": self.server_name,
"signatures": {},
}
@@ -190,41 +190,6 @@ def _parse_auth_header(header_bytes):
class BaseFederationServlet(object):
"""Abstract base class for federation servlet classes.
The servlet object should have a PATH attribute which takes the form of a regexp to
match against the request path (excluding the /federation/v1 prefix).
The servlet should also implement one or more of on_GET, on_POST, on_PUT, to match
the appropriate HTTP method. These methods have the signature:
on_<METHOD>(self, origin, content, query, **kwargs)
With arguments:
origin (unicode|None): The authenticated server_name of the calling server,
unless REQUIRE_AUTH is set to False and authentication failed.
content (unicode|None): decoded json body of the request. None if the
request was a GET.
query (dict[bytes, list[bytes]]): Query params from the request. url-decoded
(ie, '+' and '%xx' are decoded) but note that it is *not* utf8-decoded
yet.
**kwargs (dict[unicode, unicode]): the dict mapping keys to path
components as specified in the path match regexp.
Returns:
Deferred[(int, object)|None]: either (response code, response object) to
return a JSON response, or None if the request has already been handled.
Raises:
SynapseError: to return an error code
Exception: other exceptions will be caught, logged, and a 500 will be
returned.
"""
REQUIRE_AUTH = True
def __init__(self, handler, authenticator, ratelimiter, server_name):
@@ -239,20 +204,8 @@ class BaseFederationServlet(object):
@defer.inlineCallbacks
@functools.wraps(func)
def new_func(request, *args, **kwargs):
""" A callback which can be passed to HttpServer.RegisterPaths
Args:
request (twisted.web.http.Request):
*args: unused?
**kwargs (dict[unicode, unicode]): the dict mapping keys to path
components as specified in the path match regexp.
Returns:
Deferred[(int, object)|None]: (response code, response object) as returned
by the callback method. None if the request has already been handled.
"""
content = None
if request.method in [b"PUT", b"POST"]:
if request.method in ["PUT", "POST"]:
# TODO: Handle other method types? other content types?
content = parse_json_object_from_request(request)
@@ -261,10 +214,10 @@ class BaseFederationServlet(object):
except NoAuthenticationError:
origin = None
if self.REQUIRE_AUTH:
logger.warn("authenticate_request failed: missing authentication")
logger.exception("authenticate_request failed")
raise
except Exception as e:
logger.warn("authenticate_request failed: %s", e)
except Exception:
logger.exception("authenticate_request failed")
raise
if origin:
@@ -386,7 +339,7 @@ class FederationStateServlet(BaseFederationServlet):
return self.handler.on_context_state_request(
origin,
context,
parse_string_from_args(query, "event_id", None),
query.get("event_id", [None])[0],
)
@@ -397,7 +350,7 @@ class FederationStateIdsServlet(BaseFederationServlet):
return self.handler.on_state_ids_request(
origin,
room_id,
parse_string_from_args(query, "event_id", None),
query.get("event_id", [None])[0],
)
@@ -405,12 +358,14 @@ class FederationBackfillServlet(BaseFederationServlet):
PATH = "/backfill/(?P<context>[^/]*)/"
def on_GET(self, origin, content, query, context):
versions = [x.decode('ascii') for x in query[b"v"]]
limit = parse_integer_from_args(query, "limit", None)
versions = query["v"]
limits = query["limit"]
if not limit:
if not limits:
return defer.succeed((400, {"error": "Did not include limit param"}))
limit = int(limits[-1])
return self.handler.on_backfill_request(origin, context, versions, limit)
@@ -421,7 +376,7 @@ class FederationQueryServlet(BaseFederationServlet):
def on_GET(self, origin, content, query, query_type):
return self.handler.on_query_request(
query_type,
{k.decode('utf8'): v[0].decode("utf-8") for k, v in query.items()}
{k: v[0].decode("utf-8") for k, v in query.items()}
)
@@ -429,31 +384,9 @@ class FederationMakeJoinServlet(BaseFederationServlet):
PATH = "/make_join/(?P<context>[^/]*)/(?P<user_id>[^/]*)"
@defer.inlineCallbacks
def on_GET(self, origin, _content, query, context, user_id):
"""
Args:
origin (unicode): The authenticated server_name of the calling server
_content (None): (GETs don't have bodies)
query (dict[bytes, list[bytes]]): Query params from the request.
**kwargs (dict[unicode, unicode]): the dict mapping keys to path
components as specified in the path match regexp.
Returns:
Deferred[(int, object)|None]: either (response code, response object) to
return a JSON response, or None if the request has already been handled.
"""
versions = query.get(b'ver')
if versions is not None:
supported_versions = [v.decode("utf-8") for v in versions]
else:
supported_versions = ["1"]
def on_GET(self, origin, content, query, context, user_id):
content = yield self.handler.on_make_join_request(
origin, context, user_id,
supported_versions=supported_versions,
)
defer.returnValue((200, content))
@@ -628,14 +561,14 @@ class OpenIdUserInfo(BaseFederationServlet):
@defer.inlineCallbacks
def on_GET(self, origin, content, query):
token = query.get(b"access_token", [None])[0]
token = query.get("access_token", [None])[0]
if token is None:
defer.returnValue((401, {
"errcode": "M_MISSING_TOKEN", "error": "Access Token required"
}))
return
user_id = yield self.handler.on_openid_userinfo(token.decode('ascii'))
user_id = yield self.handler.on_openid_userinfo(token)
if user_id is None:
defer.returnValue((401, {

View File

@@ -23,10 +23,6 @@ from twisted.internet import defer
import synapse
from synapse.api.constants import EventTypes
from synapse.metrics import (
event_processing_loop_counter,
event_processing_loop_room_count,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.metrics import Measure
@@ -140,12 +136,6 @@ class ApplicationServicesHandler(object):
events_processed_counter.inc(len(events))
event_processing_loop_room_count.labels(
"appservice_sender"
).inc(len(events_by_room))
event_processing_loop_counter.labels("appservice_sender").inc()
synapse.metrics.event_processing_lag.labels(
"appservice_sender").set(now - ts)
synapse.metrics.event_processing_last_ts.labels(

View File

@@ -520,7 +520,7 @@ class AuthHandler(BaseHandler):
"""
logger.info("Logging in user %s on device %s", user_id, device_id)
access_token = yield self.issue_access_token(user_id, device_id)
yield self.auth.check_auth_blocking(user_id)
yield self._check_mau_limits()
# the device *should* have been registered before we got here; however,
# it's possible we raced against a DELETE operation. The thing we
@@ -734,6 +734,7 @@ class AuthHandler(BaseHandler):
@defer.inlineCallbacks
def validate_short_term_login_token_and_get_user_id(self, login_token):
yield self._check_mau_limits()
auth_api = self.hs.get_auth()
user_id = None
try:
@@ -742,7 +743,6 @@ class AuthHandler(BaseHandler):
auth_api.validate_macaroon(macaroon, "login", True, user_id)
except Exception:
raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN)
yield self.auth.check_auth_blocking(user_id)
defer.returnValue(user_id)
@defer.inlineCallbacks
@@ -828,26 +828,12 @@ class AuthHandler(BaseHandler):
@defer.inlineCallbacks
def delete_threepid(self, user_id, medium, address):
"""Attempts to unbind the 3pid on the identity servers and deletes it
from the local database.
Args:
user_id (str)
medium (str)
address (str)
Returns:
Deferred[bool]: Returns True if successfully unbound the 3pid on
the identity server, False if identity server doesn't support the
unbind API.
"""
# 'Canonicalise' email addresses as per above
if medium == 'email':
address = address.lower()
identity_handler = self.hs.get_handlers().identity_handler
result = yield identity_handler.try_unbind_threepid(
yield identity_handler.unbind_threepid(
user_id,
{
'medium': medium,
@@ -855,10 +841,10 @@ class AuthHandler(BaseHandler):
},
)
yield self.store.user_delete_threepid(
ret = yield self.store.user_delete_threepid(
user_id, medium, address,
)
defer.returnValue(result)
defer.returnValue(ret)
def _save_session(self, session):
# TODO: Persistent storage
@@ -895,24 +881,22 @@ class AuthHandler(BaseHandler):
Args:
password (unicode): Password to hash.
stored_hash (bytes): Expected hash value.
stored_hash (unicode): Expected hash value.
Returns:
Deferred(bool): Whether self.hash(password) == stored_hash.
"""
def _do_validate_hash():
# Normalise the Unicode in the password
pw = unicodedata.normalize("NFKC", password)
return bcrypt.checkpw(
pw.encode('utf8') + self.hs.config.password_pepper.encode("utf8"),
stored_hash
stored_hash.encode('utf8')
)
if stored_hash:
if not isinstance(stored_hash, bytes):
stored_hash = stored_hash.encode('ascii')
return make_deferred_yieldable(
threads.deferToThreadPool(
self.hs.get_reactor(),
@@ -923,6 +907,19 @@ class AuthHandler(BaseHandler):
else:
return defer.succeed(False)
@defer.inlineCallbacks
def _check_mau_limits(self):
"""
Ensure that if mau blocking is enabled that invalid users cannot
log in.
"""
if self.hs.config.limit_usage_by_mau is True:
current_mau = yield self.store.count_monthly_users()
if current_mau >= self.hs.config.max_mau_value:
raise AuthError(
403, "MAU Limit Exceeded", errcode=Codes.MAU_LIMIT_EXCEEDED
)
@attr.s
class MacaroonGenerator(object):

View File

@@ -51,8 +51,7 @@ class DeactivateAccountHandler(BaseHandler):
erase_data (bool): whether to GDPR-erase the user's data
Returns:
Deferred[bool]: True if identity server supports removing
threepids, otherwise False.
Deferred
"""
# FIXME: Theoretically there is a race here wherein user resets
# password using threepid.
@@ -61,22 +60,16 @@ class DeactivateAccountHandler(BaseHandler):
# leave the user still active so they can try again.
# Ideally we would prevent password resets and then do this in the
# background thread.
# This will be set to false if the identity server doesn't support
# unbinding
identity_server_supports_unbinding = True
threepids = yield self.store.user_get_threepids(user_id)
for threepid in threepids:
try:
result = yield self._identity_handler.try_unbind_threepid(
yield self._identity_handler.unbind_threepid(
user_id,
{
'medium': threepid['medium'],
'address': threepid['address'],
},
)
identity_server_supports_unbinding &= result
except Exception:
# Do we want this to be a fatal error or should we carry on?
logger.exception("Failed to remove threepid from ID server")
@@ -110,8 +103,6 @@ class DeactivateAccountHandler(BaseHandler):
# parts users from rooms (if it isn't already running)
self._start_user_parting()
defer.returnValue(identity_server_supports_unbinding)
def _start_user_parting(self):
"""
Start the process that goes through the table of users

View File

@@ -23,7 +23,7 @@ from synapse.api.constants import EventTypes
from synapse.api.errors import FederationDeniedError
from synapse.types import RoomStreamToken, get_domain_from_id
from synapse.util import stringutils
from synapse.util.async_helpers import Linearizer
from synapse.util.async import Linearizer
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.metrics import measure_func
from synapse.util.retryutils import NotRetryingDestination

View File

@@ -330,8 +330,7 @@ class E2eKeysHandler(object):
(algorithm, key_id, ex_json, key)
)
else:
new_keys.append((
algorithm, key_id, encode_canonical_json(key).decode('ascii')))
new_keys.append((algorithm, key_id, encode_canonical_json(key)))
yield self.store.add_e2e_one_time_keys(
user_id, device_id, time_now, new_keys
@@ -359,7 +358,7 @@ def _exception_to_failure(e):
# Note that some Exceptions (notably twisted's ResponseFailed etc) don't
# give a string for e.message, which json then fails to serialize.
return {
"status": 503, "message": str(e),
"status": 503, "message": str(e.message),
}

View File

@@ -30,12 +30,7 @@ from unpaddedbase64 import decode_base64
from twisted.internet import defer
from synapse.api.constants import (
KNOWN_ROOM_VERSIONS,
EventTypes,
Membership,
RejectedReason,
)
from synapse.api.constants import EventTypes, Membership, RejectedReason
from synapse.api.errors import (
AuthError,
CodeMessageException,
@@ -49,15 +44,10 @@ from synapse.crypto.event_signing import (
compute_event_signature,
)
from synapse.events.validator import EventValidator
from synapse.replication.http.federation import (
ReplicationCleanRoomRestServlet,
ReplicationFederationSendEventsRestServlet,
)
from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
from synapse.state import resolve_events_with_factory
from synapse.types import UserID, get_domain_from_id
from synapse.util import logcontext, unwrapFirstError
from synapse.util.async_helpers import Linearizer
from synapse.util.async import Linearizer
from synapse.util.distributor import user_joined_room
from synapse.util.frozenutils import unfreeze
from synapse.util.logutils import log_function
@@ -96,18 +86,6 @@ class FederationHandler(BaseHandler):
self.spam_checker = hs.get_spam_checker()
self.event_creation_handler = hs.get_event_creation_handler()
self._server_notices_mxid = hs.config.server_notices_mxid
self.config = hs.config
self.http_client = hs.get_simple_http_client()
self._send_events_to_master = (
ReplicationFederationSendEventsRestServlet.make_client(hs)
)
self._notify_user_membership_change = (
ReplicationUserJoinedLeftRoomRestServlet.make_client(hs)
)
self._clean_room_for_join_client = (
ReplicationCleanRoomRestServlet.make_client(hs)
)
# When joining a room we need to queue any events for that room up
self.room_queues = {}
@@ -291,9 +269,8 @@ class FederationHandler(BaseHandler):
ev_ids, get_prev_content=False, check_redacted=False
)
room_version = yield self.store.get_room_version(pdu.room_id)
state_map = yield resolve_events_with_factory(
room_version, state_groups, {pdu.event_id: pdu}, fetch
state_groups, {pdu.event_id: pdu}, fetch
)
state = (yield self.store.get_events(state_map.values())).values()
@@ -594,7 +571,7 @@ class FederationHandler(BaseHandler):
required_auth = set(
a_id
for event in events + list(state_events.values()) + list(auth_events.values())
for event in events + state_events.values() + auth_events.values()
for a_id, _ in event.auth_events
)
auth_events.update({
@@ -802,7 +779,7 @@ class FederationHandler(BaseHandler):
)
continue
except NotRetryingDestination as e:
logger.info(str(e))
logger.info(e.message)
continue
except FederationDeniedError as e:
logger.info(e)
@@ -945,9 +922,6 @@ class FederationHandler(BaseHandler):
joinee,
"join",
content,
params={
"ver": KNOWN_ROOM_VERSIONS,
},
)
# This shouldn't happen, because the RoomMemberHandler has a
@@ -1176,7 +1150,7 @@ class FederationHandler(BaseHandler):
)
context = yield self.state_handler.compute_event_context(event)
yield self.persist_events_and_notify([(event, context)])
yield self._persist_events([(event, context)])
defer.returnValue(event)
@@ -1207,20 +1181,19 @@ class FederationHandler(BaseHandler):
)
context = yield self.state_handler.compute_event_context(event)
yield self.persist_events_and_notify([(event, context)])
yield self._persist_events([(event, context)])
defer.returnValue(event)
@defer.inlineCallbacks
def _make_and_verify_event(self, target_hosts, room_id, user_id, membership,
content={}, params=None):
content={},):
origin, pdu = yield self.federation_client.make_membership_event(
target_hosts,
room_id,
user_id,
membership,
content,
params=params,
)
logger.debug("Got response to make_%s: %s", membership, pdu)
@@ -1358,7 +1331,7 @@ class FederationHandler(BaseHandler):
)
if state_groups:
_, state = list(state_groups.items()).pop()
_, state = state_groups.items().pop()
results = state
if event.is_state():
@@ -1450,7 +1423,7 @@ class FederationHandler(BaseHandler):
event, context
)
yield self.persist_events_and_notify(
yield self._persist_events(
[(event, context)],
backfilled=backfilled,
)
@@ -1488,7 +1461,7 @@ class FederationHandler(BaseHandler):
], consumeErrors=True,
))
yield self.persist_events_and_notify(
yield self._persist_events(
[
(ev_info["event"], context)
for ev_info, context in zip(event_infos, contexts)
@@ -1576,7 +1549,7 @@ class FederationHandler(BaseHandler):
raise
events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
yield self.persist_events_and_notify(
yield self._persist_events(
[
(e, events_to_context[e.event_id])
for e in itertools.chain(auth_events, state)
@@ -1587,7 +1560,7 @@ class FederationHandler(BaseHandler):
event, old_state=state
)
yield self.persist_events_and_notify(
yield self._persist_events(
[(event, new_event_context)],
)
@@ -1829,10 +1802,7 @@ class FederationHandler(BaseHandler):
(d.type, d.state_key): d for d in different_events if d
})
room_version = yield self.store.get_room_version(event.room_id)
new_state = yield self.state_handler.resolve_events(
room_version,
new_state = self.state_handler.resolve_events(
[list(local_view.values()), list(remote_view.values())],
event
)
@@ -2318,7 +2288,7 @@ class FederationHandler(BaseHandler):
for revocation.
"""
try:
response = yield self.http_client.get_json(
response = yield self.hs.get_simple_http_client().get_json(
url,
{"public_key": public_key}
)
@@ -2331,7 +2301,7 @@ class FederationHandler(BaseHandler):
raise AuthError(403, "Third party certificate was invalid")
@defer.inlineCallbacks
def persist_events_and_notify(self, event_and_contexts, backfilled=False):
def _persist_events(self, event_and_contexts, backfilled=False):
"""Persists events and tells the notifier/pushers about them, if
necessary.
@@ -2343,21 +2313,14 @@ class FederationHandler(BaseHandler):
Returns:
Deferred
"""
if self.config.worker_app:
yield self._send_events_to_master(
store=self.store,
event_and_contexts=event_and_contexts,
backfilled=backfilled
)
else:
max_stream_id = yield self.store.persist_events(
event_and_contexts,
backfilled=backfilled,
)
max_stream_id = yield self.store.persist_events(
event_and_contexts,
backfilled=backfilled,
)
if not backfilled: # Never notify for backfilled events
for event, _ in event_and_contexts:
self._notify_persisted_event(event, max_stream_id)
if not backfilled: # Never notify for backfilled events
for event, _ in event_and_contexts:
self._notify_persisted_event(event, max_stream_id)
def _notify_persisted_event(self, event, max_stream_id):
"""Checks to see if notifier/pushers should be notified about the
@@ -2390,30 +2353,15 @@ class FederationHandler(BaseHandler):
extra_users=extra_users
)
self.pusher_pool.on_new_notifications(
logcontext.run_in_background(
self.pusher_pool.on_new_notifications,
event_stream_id, max_stream_id,
)
def _clean_room_for_join(self, room_id):
"""Called to clean up any data in DB for a given room, ready for the
server to join the room.
Args:
room_id (str)
"""
if self.config.worker_app:
return self._clean_room_for_join_client(room_id)
else:
return self.store.clean_room_for_join(room_id)
return self.store.clean_room_for_join(room_id)
def user_joined_room(self, user, room_id):
"""Called when a new user has joined the room
"""
if self.config.worker_app:
return self._notify_user_membership_change(
room_id=room_id,
user_id=user.to_string(),
change="joined",
)
else:
return user_joined_room(self.distributor, user, room_id)
return user_joined_room(self.distributor, user, room_id)

View File

@@ -137,19 +137,15 @@ class IdentityHandler(BaseHandler):
defer.returnValue(data)
@defer.inlineCallbacks
def try_unbind_threepid(self, mxid, threepid):
"""Removes a binding from an identity server
def unbind_threepid(self, mxid, threepid):
"""
Removes a binding from an identity server
Args:
mxid (str): Matrix user ID of binding to be removed
threepid (dict): Dict with medium & address of binding to be removed
Raises:
SynapseError: If we failed to contact the identity server
Returns:
Deferred[bool]: True on success, otherwise False if the identity
server doesn't support unbinding
Deferred[bool]: True on success, otherwise False
"""
logger.debug("unbinding threepid %r from %s", threepid, mxid)
if not self.trusted_id_servers:
@@ -179,21 +175,11 @@ class IdentityHandler(BaseHandler):
content=content,
destination_is=id_server,
)
try:
yield self.http_client.post_json_get_json(
url,
content,
headers,
)
except HttpResponseException as e:
if e.code in (400, 404, 501,):
# The remote server probably doesn't support unbinding (yet)
logger.warn("Received %d response while unbinding threepid", e.code)
defer.returnValue(False)
else:
logger.error("Failed to unbind threepid on identity server: %s", e)
raise SynapseError(502, "Failed to contact identity server")
yield self.http_client.post_json_get_json(
url,
content,
headers,
)
defer.returnValue(True)
@defer.inlineCallbacks

View File

@@ -25,7 +25,7 @@ from synapse.handlers.presence import format_user_presence_state
from synapse.streams.config import PaginationConfig
from synapse.types import StreamToken, UserID
from synapse.util import unwrapFirstError
from synapse.util.async_helpers import concurrently_execute
from synapse.util.async import concurrently_execute
from synapse.util.caches.snapshot_cache import SnapshotCache
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.visibility import filter_events_for_client
@@ -372,10 +372,6 @@ class InitialSyncHandler(BaseHandler):
@defer.inlineCallbacks
def get_presence():
# If presence is disabled, return an empty list
if not self.hs.config.use_presence:
defer.returnValue([])
states = yield presence_handler.get_states(
[m.user_id for m in room_members],
as_event=True,

View File

@@ -25,24 +25,17 @@ from twisted.internet import defer
from twisted.internet.defer import succeed
from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
from synapse.api.errors import (
AuthError,
Codes,
ConsentNotGivenError,
NotFoundError,
SynapseError,
)
from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError
from synapse.api.urls import ConsentURIBuilder
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.replication.http.send_event import send_event_to_master
from synapse.types import RoomAlias, UserID
from synapse.util.async_helpers import Linearizer
from synapse.util.async import Linearizer
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import measure_func
from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
@@ -89,85 +82,28 @@ class MessageHandler(object):
defer.returnValue(data)
@defer.inlineCallbacks
def get_state_events(
self, user_id, room_id, types=None, filtered_types=None,
at_token=None, is_guest=False,
):
def get_state_events(self, user_id, room_id, is_guest=False):
"""Retrieve all state events for a given room. If the user is
joined to the room then return the current state. If the user has
left the room return the state events from when they left. If an explicit
'at' parameter is passed, return the state events as of that event, if
visible.
left the room return the state events from when they left.
Args:
user_id(str): The user requesting state events.
room_id(str): The room ID to get all state events from.
types(list[(str, str|None)]|None): List of (type, state_key) tuples
which are used to filter the state fetched. If `state_key` is None,
all events are returned of the given type.
May be None, which matches any key.
filtered_types(list[str]|None): Only apply filtering via `types` to this
list of event types. Other types of events are returned unfiltered.
If None, `types` filtering is applied to all events.
at_token(StreamToken|None): the stream token of the at which we are requesting
the stats. If the user is not allowed to view the state as of that
stream token, we raise a 403 SynapseError. If None, returns the current
state based on the current_state_events table.
is_guest(bool): whether this user is a guest
Returns:
A list of dicts representing state events. [{}, {}, {}]
Raises:
NotFoundError (404) if the at token does not yield an event
AuthError (403) if the user doesn't have permission to view
members of this room.
"""
if at_token:
# FIXME this claims to get the state at a stream position, but
# get_recent_events_for_room operates by topo ordering. This therefore
# does not reliably give you the state at the given stream position.
# (https://github.com/matrix-org/synapse/issues/3305)
last_events, _ = yield self.store.get_recent_events_for_room(
room_id, end_token=at_token.room_key, limit=1,
membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
room_id, user_id
)
if membership == Membership.JOIN:
room_state = yield self.state.get_current_state(room_id)
elif membership == Membership.LEAVE:
room_state = yield self.store.get_state_for_events(
[membership_event_id], None
)
if not last_events:
raise NotFoundError("Can't find event for token %s" % (at_token, ))
visible_events = yield filter_events_for_client(
self.store, user_id, last_events,
)
event = last_events[0]
if visible_events:
room_state = yield self.store.get_state_for_events(
[event.event_id], types, filtered_types=filtered_types,
)
room_state = room_state[event.event_id]
else:
raise AuthError(
403,
"User %s not allowed to view events in room %s at token %s" % (
user_id, room_id, at_token,
)
)
else:
membership, membership_event_id = (
yield self.auth.check_in_room_or_world_readable(
room_id, user_id,
)
)
if membership == Membership.JOIN:
state_ids = yield self.store.get_filtered_current_state_ids(
room_id, types, filtered_types=filtered_types,
)
room_state = yield self.store.get_events(state_ids.values())
elif membership == Membership.LEAVE:
room_state = yield self.store.get_state_for_events(
[membership_event_id], types, filtered_types=filtered_types,
)
room_state = room_state[membership_event_id]
room_state = room_state[membership_event_id]
now = self.clock.time_msec()
defer.returnValue(
@@ -235,7 +171,7 @@ class EventCreationHandler(object):
self.notifier = hs.get_notifier()
self.config = hs.config
self.send_event_to_master = ReplicationSendEventRestServlet.make_client(hs)
self.http_client = hs.get_simple_http_client()
# This is only used to get at ratelimit function, and maybe_kick_guest_users
self.base_handler = BaseHandler(hs)
@@ -276,14 +212,10 @@ class EventCreationHandler(object):
where *hashes* is a map from algorithm to hash.
If None, they will be requested from the database.
Raises:
ResourceLimitError if server is blocked to some resource being
exceeded
Returns:
Tuple of created event (FrozenEvent), Context
"""
yield self.auth.check_auth_blocking(requester.user.to_string())
builder = self.event_builder_factory.new(event_dict)
self.validator.validate_new(builder)
@@ -627,9 +559,12 @@ class EventCreationHandler(object):
try:
# If we're a worker we need to hit out to the master.
if self.config.worker_app:
yield self.send_event_to_master(
event_id=event.event_id,
yield send_event_to_master(
clock=self.hs.get_clock(),
store=self.store,
client=self.http_client,
host=self.config.worker_replication_host,
port=self.config.worker_replication_http_port,
requester=requester,
event=event,
context=context,
@@ -778,8 +713,11 @@ class EventCreationHandler(object):
event, context=context
)
self.pusher_pool.on_new_notifications(
event_stream_id, max_stream_id,
# this intentionally does not yield: we don't care about the result
# and don't need to wait for it.
run_in_background(
self.pusher_pool.on_new_notifications,
event_stream_id, max_stream_id
)
def _notify():

View File

@@ -18,11 +18,11 @@ import logging
from twisted.internet import defer
from twisted.python.failure import Failure
from synapse.api.constants import EventTypes, Membership
from synapse.api.constants import Membership
from synapse.api.errors import SynapseError
from synapse.events.utils import serialize_event
from synapse.types import RoomStreamToken
from synapse.util.async_helpers import ReadWriteLock
from synapse.util.async import ReadWriteLock
from synapse.util.logcontext import run_in_background
from synapse.util.stringutils import random_string
from synapse.visibility import filter_events_for_client
@@ -251,33 +251,6 @@ class PaginationHandler(object):
is_peeking=(member_event_id is None),
)
state = None
if event_filter and event_filter.lazy_load_members():
# TODO: remove redundant members
types = [
(EventTypes.Member, state_key)
for state_key in set(
event.sender # FIXME: we also care about invite targets etc.
for event in events
)
]
state_ids = yield self.store.get_state_ids_for_event(
events[0].event_id, types=types,
)
if state_ids:
state = yield self.store.get_events(list(state_ids.values()))
if state:
state = yield filter_events_for_client(
self.store,
user_id,
state.values(),
is_peeking=(member_event_id is None),
)
time_now = self.clock.time_msec()
chunk = {
@@ -289,10 +262,4 @@ class PaginationHandler(object):
"end": next_token.to_string(),
}
if state:
chunk["state"] = [
serialize_event(e, time_now, as_client_event)
for e in state
]
defer.returnValue(chunk)

View File

@@ -36,7 +36,7 @@ from synapse.api.errors import SynapseError
from synapse.metrics import LaterGauge
from synapse.storage.presence import UserPresenceState
from synapse.types import UserID, get_domain_from_id
from synapse.util.async_helpers import Linearizer
from synapse.util.async import Linearizer
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.util.logcontext import run_in_background
from synapse.util.logutils import log_function
@@ -95,7 +95,6 @@ class PresenceHandler(object):
Args:
hs (synapse.server.HomeServer):
"""
self.hs = hs
self.is_mine = hs.is_mine
self.is_mine_id = hs.is_mine_id
self.clock = hs.get_clock()
@@ -231,10 +230,6 @@ class PresenceHandler(object):
earlier than they should when synapse is restarted. This affect of this
is some spurious presence changes that will self-correct.
"""
# If the DB pool has already terminated, don't try updating
if not self.hs.get_db_pool().running:
return
logger.info(
"Performing _on_shutdown. Persisting %d unpersisted changes",
len(self.user_to_current_state)
@@ -395,10 +390,6 @@ class PresenceHandler(object):
"""We've seen the user do something that indicates they're interacting
with the app.
"""
# If presence is disabled, no-op
if not self.hs.config.use_presence:
return
user_id = user.to_string()
bump_active_time_counter.inc()
@@ -428,11 +419,6 @@ class PresenceHandler(object):
Useful for streams that are not associated with an actual
client that is being used by a user.
"""
# Override if it should affect the user's presence, if presence is
# disabled.
if not self.hs.config.use_presence:
affect_presence = False
if affect_presence:
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
self.user_to_num_current_syncs[user_id] = curr_sync + 1
@@ -478,16 +464,13 @@ class PresenceHandler(object):
Returns:
set(str): A set of user_id strings.
"""
if self.hs.config.use_presence:
syncing_user_ids = {
user_id for user_id, count in self.user_to_num_current_syncs.items()
if count
}
for user_ids in self.external_process_to_current_syncs.values():
syncing_user_ids.update(user_ids)
return syncing_user_ids
else:
return set()
syncing_user_ids = {
user_id for user_id, count in self.user_to_num_current_syncs.items()
if count
}
for user_ids in self.external_process_to_current_syncs.values():
syncing_user_ids.update(user_ids)
return syncing_user_ids
@defer.inlineCallbacks
def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec):

View File

@@ -32,16 +32,12 @@ from ._base import BaseHandler
logger = logging.getLogger(__name__)
class BaseProfileHandler(BaseHandler):
"""Handles fetching and updating user profile information.
BaseProfileHandler can be instantiated directly on workers and will
delegate to master when necessary. The master process should use the
subclass MasterProfileHandler
"""
class ProfileHandler(BaseHandler):
PROFILE_UPDATE_MS = 60 * 1000
PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000
def __init__(self, hs):
super(BaseProfileHandler, self).__init__(hs)
super(ProfileHandler, self).__init__(hs)
self.federation = hs.get_federation_client()
hs.get_federation_registry().register_query_handler(
@@ -50,6 +46,11 @@ class BaseProfileHandler(BaseHandler):
self.user_directory_handler = hs.get_user_directory_handler()
if hs.config.worker_app is None:
self.clock.looping_call(
self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS,
)
@defer.inlineCallbacks
def get_profile(self, user_id):
target_user = UserID.from_string(user_id)
@@ -281,20 +282,6 @@ class BaseProfileHandler(BaseHandler):
room_id, str(e.message)
)
class MasterProfileHandler(BaseProfileHandler):
PROFILE_UPDATE_MS = 60 * 1000
PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000
def __init__(self, hs):
super(MasterProfileHandler, self).__init__(hs)
assert hs.config.worker_app is None
self.clock.looping_call(
self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS,
)
def _start_update_remote_profile_cache(self):
return run_as_background_process(
"Update remote profile", self._update_remote_profile_cache,

View File

@@ -17,7 +17,7 @@ import logging
from twisted.internet import defer
from synapse.util.async_helpers import Linearizer
from synapse.util.async import Linearizer
from ._base import BaseHandler

View File

@@ -18,6 +18,7 @@ from twisted.internet import defer
from synapse.types import get_domain_from_id
from synapse.util import logcontext
from synapse.util.logcontext import PreserveLoggingContext
from ._base import BaseHandler
@@ -115,15 +116,16 @@ class ReceiptsHandler(BaseHandler):
affected_room_ids = list(set([r["room_id"] for r in receipts]))
self.notifier.on_new_event(
"receipt_key", max_batch_id, rooms=affected_room_ids
)
# Note that the min here shouldn't be relied upon to be accurate.
self.hs.get_pusherpool().on_new_receipts(
min_batch_id, max_batch_id, affected_room_ids,
)
with PreserveLoggingContext():
self.notifier.on_new_event(
"receipt_key", max_batch_id, rooms=affected_room_ids
)
# Note that the min here shouldn't be relied upon to be accurate.
self.hs.get_pusherpool().on_new_receipts(
min_batch_id, max_batch_id, affected_room_ids
)
defer.returnValue(True)
defer.returnValue(True)
@logcontext.preserve_fn # caller should not yield on this
@defer.inlineCallbacks

View File

@@ -28,7 +28,7 @@ from synapse.api.errors import (
)
from synapse.http.client import CaptchaServerHttpClient
from synapse.types import RoomAlias, RoomID, UserID, create_requester
from synapse.util.async_helpers import Linearizer
from synapse.util.async import Linearizer
from synapse.util.threepids import check_3pid_allowed
from ._base import BaseHandler
@@ -125,7 +125,6 @@ class RegistrationHandler(BaseHandler):
guest_access_token=None,
make_guest=False,
admin=False,
threepid=None,
):
"""Registers a new client on the server.
@@ -145,8 +144,7 @@ class RegistrationHandler(BaseHandler):
Raises:
RegistrationError if there was a problem registering.
"""
yield self.auth.check_auth_blocking(threepid=threepid)
yield self._check_mau_limits()
password_hash = None
if password:
password_hash = yield self.auth_handler().hash(password)
@@ -291,7 +289,7 @@ class RegistrationHandler(BaseHandler):
400,
"User ID can only contain characters a-z, 0-9, or '=_-./'",
)
yield self.auth.check_auth_blocking()
yield self._check_mau_limits()
user = UserID(localpart, self.hs.hostname)
user_id = user.to_string()
@@ -441,7 +439,7 @@ class RegistrationHandler(BaseHandler):
"""
if localpart is None:
raise SynapseError(400, "Request must include user id")
yield self.auth.check_auth_blocking()
yield self._check_mau_limits()
need_register = True
try:
@@ -535,3 +533,16 @@ class RegistrationHandler(BaseHandler):
remote_room_hosts=remote_room_hosts,
action="join",
)
@defer.inlineCallbacks
def _check_mau_limits(self):
"""
Do not accept registrations if monthly active user limits exceeded
and limiting is enabled
"""
if self.hs.config.limit_usage_by_mau is True:
current_mau = yield self.store.count_monthly_users()
if current_mau >= self.hs.config.max_mau_value:
raise RegistrationError(
403, "MAU Limit Exceeded", Codes.MAU_LIMIT_EXCEEDED
)

View File

@@ -21,17 +21,9 @@ import math
import string
from collections import OrderedDict
from six import string_types
from twisted.internet import defer
from synapse.api.constants import (
DEFAULT_ROOM_VERSION,
KNOWN_ROOM_VERSIONS,
EventTypes,
JoinRules,
RoomCreationPreset,
)
from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset
from synapse.api.errors import AuthError, Codes, StoreError, SynapseError
from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID
from synapse.util import stringutils
@@ -98,34 +90,15 @@ class RoomCreationHandler(BaseHandler):
Raises:
SynapseError if the room ID couldn't be stored, or something went
horribly wrong.
ResourceLimitError if server is blocked to some resource being
exceeded
"""
user_id = requester.user.to_string()
self.auth.check_auth_blocking(user_id)
if not self.spam_checker.user_may_create_room(user_id):
raise SynapseError(403, "You are not permitted to create rooms")
if ratelimit:
yield self.ratelimit(requester)
room_version = config.get("room_version", DEFAULT_ROOM_VERSION)
if not isinstance(room_version, string_types):
raise SynapseError(
400,
"room_version must be a string",
Codes.BAD_JSON,
)
if room_version not in KNOWN_ROOM_VERSIONS:
raise SynapseError(
400,
"Your homeserver does not support this room version",
Codes.UNSUPPORTED_ROOM_VERSION,
)
if "room_alias_name" in config:
for wchar in string.whitespace:
if wchar in config["room_alias_name"]:
@@ -211,9 +184,6 @@ class RoomCreationHandler(BaseHandler):
creation_content = config.get("creation_content", {})
# override any attempt to set room versions via the creation_content
creation_content["room_version"] = room_version
room_member_handler = self.hs.get_room_member_handler()
yield self._send_events_for_new_room(

View File

@@ -26,7 +26,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules
from synapse.types import ThirdPartyInstanceID
from synapse.util.async_helpers import concurrently_execute
from synapse.util.async import concurrently_execute
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.util.caches.response_cache import ResponseCache
@@ -162,7 +162,7 @@ class RoomListHandler(BaseHandler):
# Filter out rooms that we don't want to return
rooms_to_scan = [
r for r in sorted_rooms
if r not in newly_unpublished and rooms_to_num_joined[r] > 0
if r not in newly_unpublished and rooms_to_num_joined[room_id] > 0
]
total_room_count = len(rooms_to_scan)

View File

@@ -30,7 +30,7 @@ import synapse.types
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, Codes, SynapseError
from synapse.types import RoomID, UserID
from synapse.util.async_helpers import Linearizer
from synapse.util.async import Linearizer
from synapse.util.distributor import user_joined_room, user_left_room
logger = logging.getLogger(__name__)
@@ -344,7 +344,6 @@ class RoomMemberHandler(object):
latest_event_ids = (
event_id for (event_id, _, _) in prev_events_and_hashes
)
current_state_ids = yield self.state_handler.get_current_state_ids(
room_id, latest_event_ids=latest_event_ids,
)

Some files were not shown because too many files have changed in this diff Show More