1
0

Compare commits

...

169 Commits

Author SHA1 Message Date
Neil Johnson
4d1a8718b5 Merge branch 'develop' of github.com:matrix-org/synapse into neilj/update_limits_error_codes 2018-08-15 16:28:30 +01:00
Erik Johnston
dc56c47dc0 Merge pull request #3653 from matrix-org/erikj/split_federation
Move more federation APIs to workers
2018-08-15 14:59:02 +01:00
Neil Johnson
4601129c44 Merge pull request #3687 from matrix-org/neilj/admin_email
support admin_email config and pass through into blocking errors,
2018-08-15 13:52:25 +00:00
Erik Johnston
ef184caf30 Merge branch 'develop' of github.com:matrix-org/synapse into erikj/split_federation 2018-08-15 14:25:46 +01:00
Erik Johnston
488ffe6fdb Use federation handler function rather than duplicate
This involves renaming _persist_events to be a public function.
2018-08-15 14:17:18 +01:00
Erik Johnston
773db62a22 Rename slave TransactionStore to SlaveTransactionStore 2018-08-15 14:17:06 +01:00
Neil Johnson
39176f27f7 remove changelog referencing another PR 2018-08-15 13:53:51 +01:00
Neil Johnson
fdb612c1dd fix typo 2018-08-15 12:05:51 +01:00
Neil Johnson
87a824bad1 clean up AuthError 2018-08-15 11:58:03 +01:00
Neil Johnson
c4eb97518f Merge branch 'neilj/update_limits_error_codes' of github.com:matrix-org/synapse into neilj/admin_email 2018-08-15 11:53:01 +01:00
Neil Johnson
55afba0fc5 update admin email to uri 2018-08-15 11:41:18 +01:00
Neil Johnson
75c663c7b9 update error codes 2018-08-15 11:27:48 +01:00
Neil Johnson
1c5e690a6b Merge pull request #3690 from matrix-org/neilj/change_prometheus_mau_metric_name
combine mau metrics into one group
2018-08-15 09:53:59 +00:00
Erik Johnston
fef2e65d12 Merge pull request #3667 from matrix-org/erikj/fixup_unbind
Don't fail requests to unbind 3pids for non supporting ID servers
2018-08-15 10:32:12 +01:00
Neil Johnson
596ce63576 update resource limit error codes 2018-08-15 10:23:28 +01:00
Neil Johnson
b8429c7c81 update error codes for resource limiting 2018-08-15 10:19:48 +01:00
Neil Johnson
ab035bdeac replace admin_email with admin_uri for greater flexibility 2018-08-15 10:16:41 +01:00
Neil Johnson
19b433e3f4 Merge branch 'develop' of github.com:matrix-org/synapse into neilj/admin_email 2018-08-14 17:44:46 +01:00
Neil Johnson
70e48cbbb1 Merge branch 'develop' of github.com:matrix-org/synapse into neilj/change_prometheus_mau_metric_name 2018-08-14 17:40:59 +01:00
Neil Johnson
aa3220df6a Merge pull request #3692 from matrix-org/neil/fix_postgres_test_initialise_reserved_users
Neil/fix postgres test initialise reserved users
2018-08-14 16:40:11 +00:00
Neil Johnson
7277216d01 fix setup_test_homeserver to be postgres compatible 2018-08-14 17:14:39 +01:00
Neil Johnson
cd0c749c4f Fix missing yield in synapse.storage.monthly_active_users.initialise_reserved_users 2018-08-14 17:06:51 +01:00
Neil Johnson
9ecbaf8ba8 adding missing yield 2018-08-14 16:55:28 +01:00
Neil Johnson
1522ed9c07 in case max_mau is less than I think 2018-08-14 16:52:30 +01:00
Neil Johnson
629f390035 Rename MAU prometheus metric 2018-08-14 16:38:05 +01:00
Neil Johnson
e5962f845c pep8 2018-08-14 16:36:14 +01:00
Neil Johnson
e7d091fb86 combine mau metrics into one group 2018-08-14 16:26:55 +01:00
Neil Johnson
414d54b61a Merge pull request #3670 from matrix-org/neilj/mau_sync_block
Block ability to read via sync if mau limit exceeded
2018-08-14 15:21:31 +00:00
Neil Johnson
2545993ce4 make comments clearer 2018-08-14 15:48:12 +01:00
Neil Johnson
c74c71128d remove blank line 2018-08-14 15:06:24 +01:00
Neil Johnson
99ebaed8e6 Update register.py
remove comments
2018-08-14 14:55:55 +01:00
Amber Brown
614e6d517d Dockerised sytest (#3660) 2018-08-14 21:30:34 +10:00
Amber Brown
0bdc362598 Merge pull request #3681 from matrix-org/neilj/fix_reap_users_in_postgres
Neilj/fix reap users in postgres
2018-08-14 21:29:35 +10:00
Amber Brown
591bf87c6a Merge remote-tracking branch 'origin/develop' into neilj/fix_reap_users_in_postgres 2018-08-14 20:56:23 +10:00
Amber Brown
bdfbd934d6 Implement a new test baseclass to cut down on boilerplate (#3684) 2018-08-14 20:53:43 +10:00
Jan Christian Grünhage
8f0c430ca4 Merge pull request #3669 from matrix-org/jcgruenhage/update-docker-base
update docker base-image to alpine 3.8
2018-08-14 11:43:59 +02:00
Neil Johnson
1f24d8681b set admin email via config 2018-08-13 21:26:43 +01:00
Neil Johnson
f4b49152e2 support admin_email config and pass through into blocking errors, return AuthError in all cases 2018-08-13 21:09:47 +01:00
Richard van der Hoff
8ba2dac6ea Name changelog after PR, not bug 2018-08-13 18:54:01 +01:00
Richard van der Hoff
50bcaf1a27 Merge pull request #3677 from andrewshadura/master
Use recaptcha_ajax.js directly from Google
2018-08-13 18:52:56 +01:00
Neil Johnson
ce7de9ae6b Revert "support admin_email config and pass through into blocking errors, return AuthError in all cases"
This reverts commit 0d43f991a1.
2018-08-13 18:06:18 +01:00
Neil Johnson
0d43f991a1 support admin_email config and pass through into blocking errors, return AuthError in all cases 2018-08-13 18:00:23 +01:00
Amber Brown
99dd975dae Run tests under PostgreSQL (#3423) 2018-08-13 16:47:46 +10:00
Neil Johnson
ac205a54b2 Fixes test_reap_monthly_active_users so it passes under postgres 2018-08-11 22:50:09 +01:00
Neil Johnson
31fa743567 fix sqlite/postgres incompatibility in reap_monthly_active_users 2018-08-11 22:38:34 +01:00
Amber Brown
a001038b92 Merge pull request #3679 from matrix-org/hawkowl/blackify-tests
Blackify the tests
2018-08-11 00:12:56 +10:00
Amber Brown
807449d8f2 fix up a forced long line 2018-08-11 00:01:43 +10:00
Richard van der Hoff
c31793a784 Merge branch 'rav/fix_linearizer_cancellation' into develop 2018-08-10 14:57:27 +01:00
Richard van der Hoff
c08f9d95b2 log *after* reloading log config
... because logging *before* reloading means the log message gets lost in the old MemoryLogger
2018-08-10 14:56:48 +01:00
Amber Brown
dd16e7dfcc changelog 2018-08-10 23:54:46 +10:00
black
8b3d9b6b19 Run black. 2018-08-10 23:54:09 +10:00
Amber Brown
b37c472419 Rename async to async_helpers because async is a keyword on Python 3.7 (#3678) 2018-08-10 23:50:21 +10:00
Andrej Shadura
c75b71a397 Use recaptcha_ajax.js directly from Google
The script recaptcha_ajax.js contains minified non-free code which
we probably cannot redistribute. Since it talks to Google servers
anyway, it is better to just download it from Google directly instead
of shipping it.

This fixes #1932.
2018-08-10 13:36:14 +02:00
Richard van der Hoff
3c0213a217 Merge pull request #3439 from vojeroen/send_sni_for_federation_requests
send SNI for federation requests
2018-08-10 12:23:54 +01:00
Richard van der Hoff
178ab76ac0 changelog 2018-08-10 11:05:23 +01:00
Richard van der Hoff
638d35ef08 Fix linearizer cancellation on twisted < 18.7
Turns out that cancellation of inlineDeferreds didn't really work properly
until Twisted 18.7. This commit refactors Linearizer.queue to avoid
inlineCallbacks.
2018-08-10 10:59:09 +01:00
Jeroen
2e9c73e8ca more generic conversion of str/bytes to unicode 2018-08-09 21:31:26 +02:00
Jeroen
64899341dc include private functions from twisted 2018-08-09 21:04:22 +02:00
Neil Johnson
885ea9c602 rename _user_last_seen_monthly_active 2018-08-09 18:02:12 +01:00
Neil Johnson
c1f9dec92a fix errant parenthesis 2018-08-09 17:43:26 +01:00
Neil Johnson
04df714259 fix imports 2018-08-09 17:41:52 +01:00
Neil Johnson
09cf130898 only block on sync where user is not part of the mau cohort 2018-08-09 17:39:12 +01:00
Neil Johnson
c6b28fb479 Where server is disabled, block ability for locked out users to read new messages 2018-08-09 12:45:58 +01:00
Jan Christian Grünhage
d967653705 update docker base-image to alpine 3.8 2018-08-09 13:31:10 +02:00
Neil Johnson
69ce057ea6 block sync if auth checks fail 2018-08-09 12:26:27 +01:00
Neil Johnson
3dce9050cf Merge branch 'develop' of github.com:matrix-org/synapse into neilj/mau_sync_block 2018-08-09 11:42:02 +01:00
Neil Johnson
0ad98e38d0 Merge pull request #3655 from matrix-org/neilj/disable_hs
Flag to disable HS without disabling federation
2018-08-09 10:41:43 +00:00
Neil Johnson
a5ef110749 Merge branch 'develop' of github.com:matrix-org/synapse into neilj/mau_sync_block 2018-08-09 11:40:37 +01:00
Erik Johnston
5c6226707d Update docs/workers.rst 2018-08-09 10:37:42 +01:00
Erik Johnston
8876ce7f77 Newsfile 2018-08-09 10:37:42 +01:00
Erik Johnston
b179537f2a Move clean_room_for_join to master 2018-08-09 10:37:38 +01:00
Erik Johnston
72d1902bbe Fixup doc comments 2018-08-09 10:23:49 +01:00
Amber Brown
984376745b Merge branch 'master' into develop 2018-08-09 19:23:47 +10:00
Erik Johnston
5785b93711 Merge branch 'develop' of github.com:matrix-org/synapse into erikj/split_federation 2018-08-09 10:16:16 +01:00
Erik Johnston
bf7598f582 Log when we 3pid/unbind request fails 2018-08-09 10:09:56 +01:00
Erik Johnston
2bdafaf3c1 Merge pull request #3632 from matrix-org/erikj/refactor_repl_servlet
Add helper base class for generating new replication endpoints
2018-08-09 10:06:23 +01:00
Erik Johnston
62564797f5 Fixup wording and remove dead code 2018-08-09 09:56:10 +01:00
Amber Brown
2511f3f8a0 Test fixes for Python 3 (#3647) 2018-08-09 12:22:01 +10:00
Richard van der Hoff
bb89c84614 Merge pull request #3664 from matrix-org/rav/federation_metrics
more metrics for the federation and appservice senders
2018-08-08 23:16:58 +01:00
Jeroen
d5c0ce4cad updated docstring for ServerContextFactory 2018-08-08 19:25:01 +02:00
Neil Johnson
e92fb00f32 sync auth blocking 2018-08-08 17:54:49 +01:00
Neil Johnson
839a317c96 fix pep8 too many lines 2018-08-08 17:39:04 +01:00
Neil Johnson
5298d79fb5 Merge branch 'develop' into neilj/disable_hs 2018-08-08 16:13:03 +00:00
Richard van der Hoff
8521ae13e3 Merge pull request #3654 from matrix-org/rav/room_versions
Support for room versioning
2018-08-08 17:10:53 +01:00
Neil Johnson
d2f3ef98ac Merge branch 'develop' into neilj/disable_hs 2018-08-08 15:55:47 +00:00
Neil Johnson
54685d294d Ability to disable client/server Synapse via conf toggle 2018-08-08 15:38:54 +01:00
Neil Johnson
cc187debf3 Merge pull request #3662 from matrix-org/neilj/reserved_users
Reserved users for MAU limits
2018-08-08 14:33:17 +00:00
Neil Johnson
2b5baebeba Merge branch 'develop' of github.com:matrix-org/synapse into neilj/disable_hs 2018-08-08 13:47:07 +01:00
Neil Johnson
be59910b93 Merge branch 'develop' of github.com:matrix-org/synapse into neilj/reserved_users 2018-08-08 13:45:21 +01:00
Neil Johnson
990fe9fc23 Merge pull request #3633 from matrix-org/neilj/mau_tracker
API for monthly_active_users table
2018-08-08 12:44:15 +00:00
Neil Johnson
312ae74746 typos 2018-08-08 13:33:16 +01:00
Neil Johnson
d92675a486 Ability to whitelist specific threepids against monthly active user limiting 2018-08-08 12:06:42 +01:00
Erik Johnston
360ba89c50 Don't fail requests to unbind 3pids for non supporting ID servers
Older identity servers may not support the unbind 3pid request, so we
shouldn't fail the requests if we received one of 400/404/501. The
request still fails if we receive e.g. 500 responses, allowing clients
to retry requests on transient identity server errors that otherwise do
support the API.

Fixes #3661
2018-08-08 12:06:18 +01:00
Neil Johnson
7f3d897e7a mock config.max_mau_value 2018-08-08 11:46:23 +01:00
Erik Johnston
bebe325e6c Rename POST param to METHOD 2018-08-08 10:36:18 +01:00
Erik Johnston
5011417632 Fixup logging and docstrings 2018-08-08 10:29:58 +01:00
Richard van der Hoff
e6d73b8582 changelog 2018-08-07 22:14:35 +01:00
Richard van der Hoff
bab94da79c fix metric name 2018-08-07 22:11:45 +01:00
Richard van der Hoff
53bca4690b more metrics for the federation and appservice senders 2018-08-07 19:09:48 +01:00
Neil Johnson
ef3589063a prevent total number of reserved users being too large 2018-08-07 17:59:27 +01:00
Neil Johnson
e8eba2b4e3 implement reserved users for mau limits 2018-08-07 17:49:43 +01:00
Richard van der Hoff
e5d2c67844 Merge pull request #3658 from matrix-org/rav/fix_event_persisted_position_metrics
Fix occasional glitches in the synapse_event_persisted_position metric
2018-08-07 14:48:08 +01:00
Richard van der Hoff
3523f5432a Don't expose default_room_version as config opt 2018-08-07 12:51:57 +01:00
Richard van der Hoff
9b92720d88 fix event lag graph 2018-08-07 12:27:34 +01:00
Richard van der Hoff
865d07cd38 changelog 2018-08-07 10:02:01 +01:00
Richard van der Hoff
ca9bc1f4fe Fix occasional glitches in the synapse_event_persisted_position metric
Every so often this metric glitched to a negative number. I'm assuming it was
due to backfilled events.
2018-08-07 10:00:03 +01:00
Neil Johnson
a74b25faaa WIP building out mau reserved users 2018-08-06 23:25:25 +01:00
Neil Johnson
fbe255f9a4 add default mau_limits_reserved_threepids 2018-08-06 23:24:54 +01:00
Neil Johnson
7daa8a78c5 load mau limit threepids 2018-08-06 22:55:05 +01:00
Neil Johnson
89834d9a29 Merge branch 'neilj/mau_tracker' of github.com:matrix-org/synapse into neilj/disable_hs 2018-08-06 21:56:56 +01:00
Neil Johnson
e54794f5b6 Fix postgres compatibility bug 2018-08-06 21:55:54 +01:00
Neil Johnson
7bcf126b18 Merge branch 'neilj/mau_tracker' of github.com:matrix-org/synapse into neilj/disable_hs 2018-08-06 21:39:44 +01:00
Neil Johnson
1911c037cb update comments to reflect new sig 2018-08-06 18:01:46 +01:00
Neil Johnson
33bd07d062 Merge branch 'neilj/mau_tracker' of github.com:matrix-org/synapse into neilj/disable_hs 2018-08-06 17:52:28 +01:00
Neil Johnson
16d78be315 make use of _simple_select_one_onecol, improved comments 2018-08-06 17:51:15 +01:00
Richard van der Hoff
7f3f108561 changelog 2018-08-06 16:30:52 +01:00
Richard van der Hoff
19a17068f1 Check m.room.create for sane room_versions 2018-08-06 16:11:24 +01:00
Erik Johnston
96a9a29645 Pull in necessary stores in federation_reader 2018-08-06 15:23:57 +01:00
Erik Johnston
62ace05c45 Move necessary storage functions to worker classes 2018-08-06 15:23:38 +01:00
Erik Johnston
1e2bed9656 Import all functions from TransactionStore 2018-08-06 15:23:38 +01:00
Erik Johnston
a3f5bf79a0 Add EDU/query handling over replication 2018-08-06 15:23:31 +01:00
Erik Johnston
e26dbd82ef Add replication APIs for persisting federation events 2018-08-06 15:02:28 +01:00
Erik Johnston
051a99c400 Fix isort 2018-08-06 14:29:31 +01:00
Richard van der Hoff
f900d50824 include known room versions in outgoing make_joins 2018-08-06 13:45:37 +01:00
Neil Johnson
42c6823827 disable HS from config 2018-08-04 22:07:04 +01:00
Neil Johnson
e40a510fbf py3 fix 2018-08-03 23:19:13 +01:00
Neil Johnson
d08296f9f2 remove unused import 2018-08-03 23:00:16 +01:00
Neil Johnson
886be75ad1 bug fixes 2018-08-03 22:29:03 +01:00
Neil Johnson
e10830e976 wip commit - tests failing 2018-08-03 17:55:50 +01:00
Richard van der Hoff
3777fa26aa sanity check response from make_join 2018-08-03 16:08:32 +01:00
Richard van der Hoff
0d63d93ca8 Enforce compatibility when processing make_join requests
Reject make_join requests from servers which do not support the room version.

Also include the room version in the response.
2018-08-03 16:08:32 +01:00
Richard van der Hoff
0ca459ea33 Basic support for room versioning
This is the first tranche of support for room versioning. It includes:
 * setting the default room version in the config file
 * new room_version param on the createRoom API
 * storing the version of newly-created rooms in the m.room.create event
 * fishing the version of existing rooms out of the m.room.create event
2018-08-03 16:08:32 +01:00
Richard van der Hoff
15c1ae45e5 Docstrings for BaseFederationServlet
... to save me reverse-engineering this stuff again.
2018-08-03 16:08:32 +01:00
Neil Johnson
5593ff6773 fix (lots of) py3 test failures 2018-08-03 14:59:17 +01:00
Neil Johnson
b2aab04d2c fix py3 test failure 2018-08-03 14:12:56 +01:00
Neil Johnson
da90337d89 Add ability to limit number of monthly active users on the server 2018-08-03 14:05:20 +01:00
Neil Johnson
950807d93a fix caching and tests 2018-08-03 13:49:53 +01:00
Neil Johnson
897c51d274 Merge branch 'develop' of github.com:matrix-org/synapse into neilj/mau_tracker 2018-08-03 13:40:47 +01:00
Erik Johnston
cb298ff623 Merge branch 'develop' of github.com:matrix-org/synapse into erikj/refactor_repl_servlet 2018-08-03 09:25:15 +01:00
Neil Johnson
c0affa7b4f update generate_monthly_active_users, and reap_monthly_active_users 2018-08-02 23:03:01 +01:00
Neil Johnson
5e2f7b8084 typo 2018-08-02 22:47:48 +01:00
Neil Johnson
4ecb4bdac9 wip attempt at caching 2018-08-02 22:41:05 +01:00
Neil Johnson
74b1d46ad9 do mau checks based on monthly_active_users table 2018-08-02 16:57:35 +01:00
Neil Johnson
9180061b49 remove unused count_monthly_users 2018-08-02 15:55:29 +01:00
Neil Johnson
c4ffbecb68 fix test, update constructor call 2018-08-02 13:51:05 +01:00
Neil Johnson
00f99f74b1 insertion into monthly_active_users 2018-08-02 13:47:19 +01:00
Neil Johnson
4a6725d9d1 Merge branch 'neilj/mau_tracker' of github.com:matrix-org/synapse into neilj/mau_tracker 2018-08-02 11:04:18 +01:00
Neil Johnson
165e067033 Revert "change monthly_active_users table to be a single column"
This reverts commit ec716a35b2.
2018-08-02 10:59:58 +01:00
Neil Johnson
08281fe6b7 self.db_conn unused 2018-08-01 23:26:24 +01:00
Neil Johnson
c21d82bab3 normalise reaping query 2018-08-01 23:24:38 +01:00
Neil Johnson
ec716a35b2 change monthly_active_users table to be a single column 2018-08-01 17:54:37 +01:00
Neil Johnson
d766f26de9 Merge branch 'develop' of github.com:matrix-org/synapse into neilj/mau_tracker 2018-08-01 17:49:41 +01:00
Neil Johnson
4e5ac901dd clean up 2018-08-01 12:03:57 +01:00
Neil Johnson
f9f5559971 fix comment 2018-08-01 12:03:42 +01:00
Neil Johnson
6ef983ce5c api into monthly_active_users table 2018-07-31 16:36:24 +01:00
Erik Johnston
16bd63f32f Newsfile 2018-07-31 14:48:43 +01:00
Erik Johnston
443da003bc Use new helper base class for membership requests 2018-07-31 14:32:23 +01:00
Erik Johnston
729b672823 Use new helper base class for ReplicationSendEventRestServlet 2018-07-31 14:32:23 +01:00
Erik Johnston
d81602b75a Add helper base class for generating new replication endpoints
This will hopefully reduce the boiler plate required to implement new
internal HTTP requests.
2018-07-31 14:32:20 +01:00
Jeroen
2903e65aff fix isort 2018-07-29 19:47:08 +02:00
Jeroen
8e3f75b39a fix accidental removal of hs 2018-07-27 12:17:31 +02:00
Richard van der Hoff
7041cd872b Merge branch 'develop' into send_sni_for_federation_requests 2018-07-27 09:17:11 +01:00
Jeroen
505530f36a Merge remote-tracking branch 'upstream/develop' into send_sni_for_federation_requests
# Conflicts:
#	synapse/crypto/context_factory.py
2018-07-14 20:24:46 +02:00
Jeroen
b5e157d895 Merge branch 'develop' into send_sni_for_federation_requests
# Conflicts:
#	synapse/http/endpoint.py
2018-07-09 08:51:11 +02:00
Jeroen
95341a8f6f take idna implementation from twisted 2018-06-26 21:15:14 +02:00
Jeroen
26651b0d6a towncrier changelog 2018-06-26 21:10:45 +02:00
Jeroen
b7f34ee348 allow self-signed certificates 2018-06-26 20:41:05 +02:00
Jeroen
07b4f88de9 formatting changes for pep8 2018-06-25 12:31:16 +02:00
Jeroen
3d605853c8 send SNI for federation requests 2018-06-24 22:38:43 +02:00
192 changed files with 4287 additions and 3311 deletions

48
.circleci/config.yml Normal file
View File

@@ -0,0 +1,48 @@
version: 2
jobs:
sytestpy2:
machine: true
steps:
- checkout
- run: docker pull matrixdotorg/sytest-synapsepy2
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs matrixdotorg/sytest-synapsepy2
- store_artifacts:
path: ~/project/logs
destination: logs
sytestpy2postgres:
machine: true
steps:
- checkout
- run: docker pull matrixdotorg/sytest-synapsepy2
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs -e POSTGRES=1 matrixdotorg/sytest-synapsepy2
- store_artifacts:
path: ~/project/logs
destination: logs
sytestpy3:
machine: true
steps:
- checkout
- run: docker pull matrixdotorg/sytest-synapsepy3
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs hawkowl/sytestpy3
- store_artifacts:
path: ~/project/logs
destination: logs
sytestpy3postgres:
machine: true
steps:
- checkout
- run: docker pull matrixdotorg/sytest-synapsepy3
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs -e POSTGRES=1 matrixdotorg/sytest-synapsepy3
- store_artifacts:
path: ~/project/logs
destination: logs
workflows:
version: 2
build:
jobs:
- sytestpy2
- sytestpy2postgres
# Currently broken while the Python 3 port is incomplete
# - sytestpy3
# - sytestpy3postgres

View File

@@ -3,3 +3,6 @@ Dockerfile
.gitignore
demo/etc
tox.ini
synctl
.git/*
.tox/*

View File

@@ -8,6 +8,9 @@ before_script:
- git remote set-branches --add origin develop
- git fetch origin develop
services:
- postgresql
matrix:
fast_finish: true
include:
@@ -20,6 +23,9 @@ matrix:
- python: 2.7
env: TOX_ENV=py27
- python: 2.7
env: TOX_ENV=py27-postgres TRIAL_FLAGS="-j 4"
- python: 3.6
env: TOX_ENV=py36
@@ -29,6 +35,10 @@ matrix:
- python: 3.6
env: TOX_ENV=check-newsfragment
allow_failures:
- python: 2.7
env: TOX_ENV=py27-postgres TRIAL_FLAGS="-j 4"
install:
- pip install tox

View File

@@ -36,3 +36,4 @@ recursive-include changelog.d *
prune .github
prune demo/etc
prune docker
prune .circleci

1
changelog.d/1491.feature Normal file
View File

@@ -0,0 +1 @@
Add support for the SNI extension to federation TLS connections

1
changelog.d/3423.misc Normal file
View File

@@ -0,0 +1 @@
The test suite now can run under PostgreSQL.

1
changelog.d/3632.misc Normal file
View File

@@ -0,0 +1 @@
Refactor HTTP replication endpoints to reduce code duplication

1
changelog.d/3633.feature Normal file
View File

@@ -0,0 +1 @@
Add ability to limit number of monthly active users on the server

1
changelog.d/3647.misc Normal file
View File

@@ -0,0 +1 @@
Tests now correctly execute on Python 3.

1
changelog.d/3653.feature Normal file
View File

@@ -0,0 +1 @@
Support more federation endpoints on workers

1
changelog.d/3654.feature Normal file
View File

@@ -0,0 +1 @@
Basic support for room versioning

1
changelog.d/3655.feature Normal file
View File

@@ -0,0 +1 @@
Ability to disable client/server Synapse via conf toggle

1
changelog.d/3658.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix occasional glitches in the synapse_event_persisted_position metric

1
changelog.d/3660.misc Normal file
View File

@@ -0,0 +1 @@
Sytests can now be run inside a Docker container.

1
changelog.d/3661.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix bug on deleting 3pid when using identity servers that don't support unbind API

1
changelog.d/3662.feature Normal file
View File

@@ -0,0 +1 @@
Ability to whitelist specific threepids against monthly active user limiting

1
changelog.d/3664.feature Normal file
View File

@@ -0,0 +1 @@
Add some metrics for the appservice and federation event sending loops

1
changelog.d/3669.misc Normal file
View File

@@ -0,0 +1 @@
Update docker base image from alpine 3.7 to 3.8.

1
changelog.d/3670.feature Normal file
View File

@@ -0,0 +1 @@
Where server is disabled, block ability for locked out users to read new messages

1
changelog.d/3676.bugfix Normal file
View File

@@ -0,0 +1 @@
Make the tests pass on Twisted < 18.7.0

1
changelog.d/3677.bugfix Normal file
View File

@@ -0,0 +1 @@
Dont ship recaptcha_ajax.js, use it directly from Google

1
changelog.d/3678.misc Normal file
View File

@@ -0,0 +1 @@
Rename synapse.util.async to synapse.util.async_helpers to mitigate async becoming a keyword on Python 3.7.

1
changelog.d/3679.misc Normal file
View File

@@ -0,0 +1 @@
Synapse's tests are now formatted with the black autoformatter.

1
changelog.d/3681.bugfix Normal file
View File

@@ -0,0 +1 @@
Fixes test_reap_monthly_active_users so it passes under postgres

1
changelog.d/3684.misc Normal file
View File

@@ -0,0 +1 @@
Implemented a new testing base class to reduce test boilerplate.

1
changelog.d/3687.feature Normal file
View File

@@ -0,0 +1 @@
set admin uri via config, to be used in error messages where the user should contact the administrator

1
changelog.d/3690.misc Normal file
View File

@@ -0,0 +1 @@
Rename MAU prometheus metrics

1
changelog.d/3692.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix missing yield in synapse.storage.monthly_active_users.initialise_reserved_users

View File

@@ -54,7 +54,7 @@
"gnetId": null,
"graphTooltip": 0,
"id": null,
"iteration": 1533026624326,
"iteration": 1533598785368,
"links": [
{
"asDropdown": true,
@@ -4629,7 +4629,7 @@
"h": 9,
"w": 12,
"x": 0,
"y": 11
"y": 29
},
"id": 67,
"legend": {
@@ -4655,11 +4655,11 @@
"steppedLine": false,
"targets": [
{
"expr": " synapse_event_persisted_position{instance=\"$instance\"} - ignoring(index, job, name) group_right(instance) synapse_event_processing_positions{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}",
"expr": " synapse_event_persisted_position{instance=\"$instance\",job=\"synapse\"} - ignoring(index, job, name) group_right() synapse_event_processing_positions{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}",
"format": "time_series",
"interval": "",
"intervalFactor": 1,
"legendFormat": "{{job}}-{{index}}",
"legendFormat": "{{job}}-{{index}} ",
"refId": "A"
}
],
@@ -4697,7 +4697,11 @@
"min": null,
"show": true
}
]
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
@@ -4710,7 +4714,7 @@
"h": 9,
"w": 12,
"x": 12,
"y": 11
"y": 29
},
"id": 71,
"legend": {
@@ -4778,7 +4782,11 @@
"min": null,
"show": true
}
]
],
"yaxis": {
"align": false,
"alignLevel": null
}
}
],
"title": "Event processing loop positions",
@@ -4957,5 +4965,5 @@
"timezone": "",
"title": "Synapse",
"uid": "000000012",
"version": 125
"version": 127
}

View File

@@ -1,4 +1,4 @@
FROM docker.io/python:2-alpine3.7
FROM docker.io/python:2-alpine3.8
RUN apk add --no-cache --virtual .nacl_deps \
build-base \

View File

@@ -173,10 +173,23 @@ endpoints matching the following regular expressions::
^/_matrix/federation/v1/backfill/
^/_matrix/federation/v1/get_missing_events/
^/_matrix/federation/v1/publicRooms
^/_matrix/federation/v1/query/
^/_matrix/federation/v1/make_join/
^/_matrix/federation/v1/make_leave/
^/_matrix/federation/v1/send_join/
^/_matrix/federation/v1/send_leave/
^/_matrix/federation/v1/invite/
^/_matrix/federation/v1/query_auth/
^/_matrix/federation/v1/event_auth/
^/_matrix/federation/v1/exchange_third_party_invite/
^/_matrix/federation/v1/send/
The above endpoints should all be routed to the federation_reader worker by the
reverse-proxy configuration.
The `^/_matrix/federation/v1/send/` endpoint must only be handled by a single
instance.
``synapse.app.federation_sender``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

View File

@@ -213,7 +213,7 @@ class Auth(object):
default=[b""]
)[0]
if user and access_token and ip_addr:
self.store.insert_client_ip(
yield self.store.insert_client_ip(
user_id=user.to_string(),
access_token=access_token,
ip=ip_addr,
@@ -773,3 +773,33 @@ class Auth(object):
raise AuthError(
403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
)
@defer.inlineCallbacks
def check_auth_blocking(self, user_id=None):
"""Checks if the user should be rejected for some external reason,
such as monthly active user limiting or global disable flag
Args:
user_id(str|None): If present, checks for presence against existing
MAU cohort
"""
if self.hs.config.hs_disabled:
raise AuthError(
403, self.hs.config.hs_disabled_message,
errcode=Codes.RESOURCE_LIMIT_EXCEEDED,
admin_uri=self.hs.config.admin_uri,
)
if self.hs.config.limit_usage_by_mau is True:
# If the user is already part of the MAU cohort
if user_id:
timestamp = yield self.store.user_last_seen_monthly_active(user_id)
if timestamp:
return
# Else if there is no room in the MAU bucket, bail
current_mau = yield self.store.get_monthly_active_count()
if current_mau >= self.hs.config.max_mau_value:
raise AuthError(
403, "Monthly Active User Limits AU Limit Exceeded",
admin_uri=self.hs.config.admin_uri,
errcode=Codes.RESOURCE_LIMIT_EXCEEDED
)

View File

@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2017 Vector Creations Ltd
# Copyright 2018 New Vector Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -94,3 +95,11 @@ class RoomCreationPreset(object):
class ThirdPartyEntityKind(object):
USER = "user"
LOCATION = "location"
# the version we will give rooms which are created on this server
DEFAULT_ROOM_VERSION = "1"
# vdh-test-version is a placeholder to get room versioning support working and tested
# until we have a working v2.
KNOWN_ROOM_VERSIONS = {"1", "vdh-test-version"}

View File

@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -55,7 +56,9 @@ class Codes(object):
SERVER_NOT_TRUSTED = "M_SERVER_NOT_TRUSTED"
CONSENT_NOT_GIVEN = "M_CONSENT_NOT_GIVEN"
CANNOT_LEAVE_SERVER_NOTICE_ROOM = "M_CANNOT_LEAVE_SERVER_NOTICE_ROOM"
MAU_LIMIT_EXCEEDED = "M_MAU_LIMIT_EXCEEDED"
RESOURCE_LIMIT_EXCEEDED = "M_RESOURCE_LIMIT_EXCEEDED"
UNSUPPORTED_ROOM_VERSION = "M_UNSUPPORTED_ROOM_VERSION"
INCOMPATIBLE_ROOM_VERSION = "M_INCOMPATIBLE_ROOM_VERSION"
class CodeMessageException(RuntimeError):
@@ -221,11 +224,16 @@ class NotFoundError(SynapseError):
class AuthError(SynapseError):
"""An error raised when there was a problem authorising an event."""
def __init__(self, code, msg, errcode=Codes.FORBIDDEN, admin_uri=None):
self.admin_uri = admin_uri
super(AuthError, self).__init__(code, msg, errcode=errcode)
def __init__(self, *args, **kwargs):
if "errcode" not in kwargs:
kwargs["errcode"] = Codes.FORBIDDEN
super(AuthError, self).__init__(*args, **kwargs)
def error_dict(self):
return cs_error(
self.msg,
self.errcode,
admin_uri=self.admin_uri,
)
class EventSizeError(SynapseError):
@@ -285,6 +293,27 @@ class LimitExceededError(SynapseError):
)
class IncompatibleRoomVersionError(SynapseError):
"""A server is trying to join a room whose version it does not support."""
def __init__(self, room_version):
super(IncompatibleRoomVersionError, self).__init__(
code=400,
msg="Your homeserver does not support the features required to "
"join this room",
errcode=Codes.INCOMPATIBLE_ROOM_VERSION,
)
self._room_version = room_version
def error_dict(self):
return cs_error(
self.msg,
self.errcode,
room_version=self._room_version,
)
def cs_error(msg, code=Codes.UNKNOWN, **kwargs):
""" Utility method for constructing an error response for client-server
interactions.

View File

@@ -39,7 +39,7 @@ from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.keys import SlavedKeyStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.room import (
JoinedRoomMemberListRestServlet,
@@ -66,7 +66,7 @@ class ClientReaderSlavedStore(
DirectoryStore,
SlavedApplicationServiceStore,
SlavedRegistrationStore,
TransactionStore,
SlavedTransactionStore,
SlavedClientIpStore,
BaseSlavedStore,
):
@@ -168,11 +168,13 @@ def start(config_options):
database_engine = create_engine(config.database_config)
tls_server_context_factory = context_factory.ServerContextFactory(config)
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
ss = ClientReaderServer(
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
tls_client_options_factory=tls_client_options_factory,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,

View File

@@ -43,7 +43,7 @@ from synapse.replication.slave.storage.pushers import SlavedPusherStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.room import (
JoinRoomAliasServlet,
@@ -63,7 +63,7 @@ logger = logging.getLogger("synapse.app.event_creator")
class EventCreatorSlavedStore(
DirectoryStore,
TransactionStore,
SlavedTransactionStore,
SlavedProfileStore,
SlavedAccountDataStore,
SlavedPusherStore,
@@ -174,11 +174,13 @@ def start(config_options):
database_engine = create_engine(config.database_config)
tls_server_context_factory = context_factory.ServerContextFactory(config)
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
ss = EventCreatorServer(
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
tls_client_options_factory=tls_client_options_factory,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,

View File

@@ -32,11 +32,16 @@ from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.keys import SlavedKeyStore
from synapse.replication.slave.storage.profile import SlavedProfileStore
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
from synapse.replication.slave.storage.pushers import SlavedPusherStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
@@ -49,11 +54,16 @@ logger = logging.getLogger("synapse.app.federation_reader")
class FederationReaderSlavedStore(
SlavedProfileStore,
SlavedApplicationServiceStore,
SlavedPusherStore,
SlavedPushRuleStore,
SlavedReceiptsStore,
SlavedEventStore,
SlavedKeyStore,
RoomStore,
DirectoryStore,
TransactionStore,
SlavedTransactionStore,
BaseSlavedStore,
):
pass
@@ -143,11 +153,13 @@ def start(config_options):
database_engine = create_engine(config.database_config)
tls_server_context_factory = context_factory.ServerContextFactory(config)
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
ss = FederationReaderServer(
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
tls_client_options_factory=tls_client_options_factory,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,

View File

@@ -36,11 +36,11 @@ from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.presence import SlavedPresenceStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.async import Linearizer
from synapse.util.async_helpers import Linearizer
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole
@@ -50,7 +50,7 @@ logger = logging.getLogger("synapse.app.federation_sender")
class FederationSenderSlaveStore(
SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore,
SlavedDeviceInboxStore, SlavedTransactionStore, SlavedReceiptsStore, SlavedEventStore,
SlavedRegistrationStore, SlavedDeviceStore, SlavedPresenceStore,
):
def __init__(self, db_conn, hs):
@@ -186,11 +186,13 @@ def start(config_options):
config.send_federation = True
tls_server_context_factory = context_factory.ServerContextFactory(config)
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
ps = FederationSenderServer(
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
tls_client_options_factory=tls_client_options_factory,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,

View File

@@ -208,11 +208,13 @@ def start(config_options):
database_engine = create_engine(config.database_config)
tls_server_context_factory = context_factory.ServerContextFactory(config)
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
ss = FrontendProxyServer(
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
tls_client_options_factory=tls_client_options_factory,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,

View File

@@ -303,8 +303,8 @@ class SynapseHomeServer(HomeServer):
# Gauges to expose monthly active user control metrics
current_mau_gauge = Gauge("synapse_admin_current_mau", "Current MAU")
max_mau_value_gauge = Gauge("synapse_admin_max_mau_value", "MAU Limit")
current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU")
max_mau_gauge = Gauge("synapse_admin_mau:max", "MAU Limit")
def setup(config_options):
@@ -338,6 +338,7 @@ def setup(config_options):
events.USE_FROZEN_DICTS = config.use_frozen_dicts
tls_server_context_factory = context_factory.ServerContextFactory(config)
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
database_engine = create_engine(config.database_config)
config.database_config["args"]["cp_openfun"] = database_engine.on_new_connection
@@ -346,6 +347,7 @@ def setup(config_options):
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
tls_client_options_factory=tls_client_options_factory,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
@@ -519,17 +521,26 @@ def run(hs):
# table will decrease
clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000)
# monthly active user limiting functionality
clock.looping_call(
hs.get_datastore().reap_monthly_active_users, 1000 * 60 * 60
)
@defer.inlineCallbacks
def generate_monthly_active_users():
count = 0
if hs.config.limit_usage_by_mau:
count = yield hs.get_datastore().count_monthly_users()
count = yield hs.get_datastore().get_monthly_active_count()
current_mau_gauge.set(float(count))
max_mau_value_gauge.set(float(hs.config.max_mau_value))
max_mau_gauge.set(float(hs.config.max_mau_value))
hs.get_datastore().initialise_reserved_users(
hs.config.mau_limits_reserved_threepids
)
generate_monthly_active_users()
if hs.config.limit_usage_by_mau:
clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000)
# End of monthly active user settings
if hs.config.report_stats:
logger.info("Scheduling stats reporting for 3 hour intervals")

View File

@@ -34,7 +34,7 @@ from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.media.v0.content_repository import ContentRepoResource
from synapse.server import HomeServer
@@ -52,7 +52,7 @@ class MediaRepositorySlavedStore(
SlavedApplicationServiceStore,
SlavedRegistrationStore,
SlavedClientIpStore,
TransactionStore,
SlavedTransactionStore,
BaseSlavedStore,
MediaRepositoryStore,
):
@@ -155,11 +155,13 @@ def start(config_options):
database_engine = create_engine(config.database_config)
tls_server_context_factory = context_factory.ServerContextFactory(config)
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
ss = MediaRepositoryServer(
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
tls_client_options_factory=tls_client_options_factory,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,

View File

@@ -214,11 +214,13 @@ def start(config_options):
config.update_user_directory = True
tls_server_context_factory = context_factory.ServerContextFactory(config)
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
ps = UserDirectoryServer(
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
tls_client_options_factory=tls_client_options_factory,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,

View File

@@ -193,9 +193,8 @@ def setup_logging(config, use_worker_options=False):
def sighup(signum, stack):
# it might be better to use a file watcher or something for this.
logging.info("Reloading log config from %s due to SIGHUP",
log_config)
load_log_config()
logging.info("Reloaded log config from %s due to SIGHUP", log_config)
load_log_config()

View File

@@ -69,12 +69,23 @@ class ServerConfig(Config):
# Options to control access by tracking MAU
self.limit_usage_by_mau = config.get("limit_usage_by_mau", False)
self.max_mau_value = 0
if self.limit_usage_by_mau:
self.max_mau_value = config.get(
"max_mau_value", 0,
)
else:
self.max_mau_value = 0
self.mau_limits_reserved_threepids = config.get(
"mau_limit_reserved_threepids", []
)
# Options to disable HS
self.hs_disabled = config.get("hs_disabled", False)
self.hs_disabled_message = config.get("hs_disabled_message", "")
# Admin uri to direct users at should their instance become blocked
# due to resource constraints
self.admin_uri = config.get("admin_uri", None)
# FIXME: federation_domain_whitelist needs sytests
self.federation_domain_whitelist = None
federation_domain_whitelist = config.get(

View File

@@ -11,19 +11,22 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from zope.interface import implementer
from OpenSSL import SSL, crypto
from twisted.internet import ssl
from twisted.internet._sslverify import _defaultCurveName
from twisted.internet.interfaces import IOpenSSLClientConnectionCreator
from twisted.internet.ssl import CertificateOptions, ContextFactory
from twisted.python.failure import Failure
logger = logging.getLogger(__name__)
class ServerContextFactory(ssl.ContextFactory):
class ServerContextFactory(ContextFactory):
"""Factory for PyOpenSSL SSL contexts that are used to handle incoming
connections and to make connections to remote servers."""
connections."""
def __init__(self, config):
self._context = SSL.Context(SSL.SSLv23_METHOD)
@@ -48,3 +51,78 @@ class ServerContextFactory(ssl.ContextFactory):
def getContext(self):
return self._context
def _idnaBytes(text):
"""
Convert some text typed by a human into some ASCII bytes. This is a
copy of twisted.internet._idna._idnaBytes. For documentation, see the
twisted documentation.
"""
try:
import idna
except ImportError:
return text.encode("idna")
else:
return idna.encode(text)
def _tolerateErrors(wrapped):
"""
Wrap up an info_callback for pyOpenSSL so that if something goes wrong
the error is immediately logged and the connection is dropped if possible.
This is a copy of twisted.internet._sslverify._tolerateErrors. For
documentation, see the twisted documentation.
"""
def infoCallback(connection, where, ret):
try:
return wrapped(connection, where, ret)
except: # noqa: E722, taken from the twisted implementation
f = Failure()
logger.exception("Error during info_callback")
connection.get_app_data().failVerification(f)
return infoCallback
@implementer(IOpenSSLClientConnectionCreator)
class ClientTLSOptions(object):
"""
Client creator for TLS without certificate identity verification. This is a
copy of twisted.internet._sslverify.ClientTLSOptions with the identity
verification left out. For documentation, see the twisted documentation.
"""
def __init__(self, hostname, ctx):
self._ctx = ctx
self._hostname = hostname
self._hostnameBytes = _idnaBytes(hostname)
ctx.set_info_callback(
_tolerateErrors(self._identityVerifyingInfoCallback)
)
def clientConnectionForTLS(self, tlsProtocol):
context = self._ctx
connection = SSL.Connection(context, None)
connection.set_app_data(tlsProtocol)
return connection
def _identityVerifyingInfoCallback(self, connection, where, ret):
if where & SSL.SSL_CB_HANDSHAKE_START:
connection.set_tlsext_host_name(self._hostnameBytes)
class ClientTLSOptionsFactory(object):
"""Factory for Twisted ClientTLSOptions that are used to make connections
to remote servers for federation."""
def __init__(self, config):
# We don't use config options yet
pass
def get_options(self, host):
return ClientTLSOptions(
host.decode('utf-8'),
CertificateOptions(verify=False).getContext()
)

View File

@@ -30,14 +30,14 @@ KEY_API_V1 = b"/_matrix/key/v1/"
@defer.inlineCallbacks
def fetch_server_key(server_name, ssl_context_factory, path=KEY_API_V1):
def fetch_server_key(server_name, tls_client_options_factory, path=KEY_API_V1):
"""Fetch the keys for a remote server."""
factory = SynapseKeyClientFactory()
factory.path = path
factory.host = server_name
endpoint = matrix_federation_endpoint(
reactor, server_name, ssl_context_factory, timeout=30
reactor, server_name, tls_client_options_factory, timeout=30
)
for i in range(5):

View File

@@ -512,7 +512,7 @@ class Keyring(object):
continue
(response, tls_certificate) = yield fetch_server_key(
server_name, self.hs.tls_server_context_factory,
server_name, self.hs.tls_client_options_factory,
path=(b"/_matrix/key/v2/server/%s" % (
urllib.quote(requested_key_id),
)).encode("ascii"),
@@ -655,7 +655,7 @@ class Keyring(object):
# Try to fetch the key from the remote server.
(response, tls_certificate) = yield fetch_server_key(
server_name, self.hs.tls_server_context_factory
server_name, self.hs.tls_client_options_factory
)
# Check the response.

View File

@@ -20,7 +20,7 @@ from signedjson.key import decode_verify_key_bytes
from signedjson.sign import SignatureVerifyException, verify_signed_json
from unpaddedbase64 import decode_base64
from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventTypes, JoinRules, Membership
from synapse.api.errors import AuthError, EventSizeError, SynapseError
from synapse.types import UserID, get_domain_from_id
@@ -83,6 +83,14 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
403,
"Creation event's room_id domain does not match sender's"
)
room_version = event.content.get("room_version", "1")
if room_version not in KNOWN_ROOM_VERSIONS:
raise AuthError(
403,
"room appears to have unsupported version %s" % (
room_version,
))
# FIXME
logger.debug("Allowing! %s", event)
return

View File

@@ -25,7 +25,7 @@ from prometheus_client import Counter
from twisted.internet import defer
from synapse.api.constants import Membership
from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventTypes, Membership
from synapse.api.errors import (
CodeMessageException,
FederationDeniedError,
@@ -518,10 +518,10 @@ class FederationClient(FederationBase):
description, destination, exc_info=1,
)
raise RuntimeError("Failed to %s via any server", description)
raise RuntimeError("Failed to %s via any server" % (description, ))
def make_membership_event(self, destinations, room_id, user_id, membership,
content={},):
content, params):
"""
Creates an m.room.member event, with context, without participating in the room.
@@ -537,8 +537,10 @@ class FederationClient(FederationBase):
user_id (str): The user whose membership is being evented.
membership (str): The "membership" property of the event. Must be
one of "join" or "leave".
content (object): Any additional data to put into the content field
content (dict): Any additional data to put into the content field
of the event.
params (dict[str, str|Iterable[str]]): Query parameters to include in the
request.
Return:
Deferred: resolves to a tuple of (origin (str), event (object))
where origin is the remote homeserver which generated the event.
@@ -558,10 +560,12 @@ class FederationClient(FederationBase):
@defer.inlineCallbacks
def send_request(destination):
ret = yield self.transport_layer.make_membership_event(
destination, room_id, user_id, membership
destination, room_id, user_id, membership, params,
)
pdu_dict = ret["event"]
pdu_dict = ret.get("event", None)
if not isinstance(pdu_dict, dict):
raise InvalidResponseError("Bad 'event' field in response")
logger.debug("Got response to make_%s: %s", membership, pdu_dict)
@@ -605,6 +609,26 @@ class FederationClient(FederationBase):
Fails with a ``RuntimeError`` if no servers were reachable.
"""
def check_authchain_validity(signed_auth_chain):
for e in signed_auth_chain:
if e.type == EventTypes.Create:
create_event = e
break
else:
raise InvalidResponseError(
"no %s in auth chain" % (EventTypes.Create,),
)
# the room version should be sane.
room_version = create_event.content.get("room_version", "1")
if room_version not in KNOWN_ROOM_VERSIONS:
# This shouldn't be possible, because the remote server should have
# rejected the join attempt during make_join.
raise InvalidResponseError(
"room appears to have unsupported version %s" % (
room_version,
))
@defer.inlineCallbacks
def send_request(destination):
time_now = self._clock.time_msec()
@@ -661,7 +685,7 @@ class FederationClient(FederationBase):
for s in signed_state:
s.internal_metadata = copy.deepcopy(s.internal_metadata)
auth_chain.sort(key=lambda e: e.depth)
check_authchain_validity(signed_auth)
defer.returnValue({
"state": signed_state,

View File

@@ -27,14 +27,24 @@ from twisted.internet.abstract import isIPAddress
from twisted.python import failure
from synapse.api.constants import EventTypes
from synapse.api.errors import AuthError, FederationError, NotFoundError, SynapseError
from synapse.api.errors import (
AuthError,
FederationError,
IncompatibleRoomVersionError,
NotFoundError,
SynapseError,
)
from synapse.crypto.event_signing import compute_event_signature
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
from synapse.http.endpoint import parse_server_name
from synapse.replication.http.federation import (
ReplicationFederationSendEduRestServlet,
ReplicationGetQueryRestServlet,
)
from synapse.types import get_domain_from_id
from synapse.util import async
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.logutils import log_function
@@ -61,8 +71,8 @@ class FederationServer(FederationBase):
self.auth = hs.get_auth()
self.handler = hs.get_handlers().federation_handler
self._server_linearizer = async.Linearizer("fed_server")
self._transaction_linearizer = async.Linearizer("fed_txn_handler")
self._server_linearizer = Linearizer("fed_server")
self._transaction_linearizer = Linearizer("fed_txn_handler")
self.transaction_actions = TransactionActions(self.store)
@@ -194,7 +204,7 @@ class FederationServer(FederationBase):
event_id, f.getTraceback().rstrip(),
)
yield async.concurrently_execute(
yield concurrently_execute(
process_pdus_for_room, pdus_by_room.keys(),
TRANSACTION_CONCURRENCY_LIMIT,
)
@@ -323,12 +333,21 @@ class FederationServer(FederationBase):
defer.returnValue((200, resp))
@defer.inlineCallbacks
def on_make_join_request(self, origin, room_id, user_id):
def on_make_join_request(self, origin, room_id, user_id, supported_versions):
origin_host, _ = parse_server_name(origin)
yield self.check_server_matches_acl(origin_host, room_id)
room_version = yield self.store.get_room_version(room_id)
if room_version not in supported_versions:
logger.warn("Room version %s not in %s", room_version, supported_versions)
raise IncompatibleRoomVersionError(room_version=room_version)
pdu = yield self.handler.on_make_join_request(room_id, user_id)
time_now = self._clock.time_msec()
defer.returnValue({"event": pdu.get_pdu_json(time_now)})
defer.returnValue({
"event": pdu.get_pdu_json(time_now),
"room_version": room_version,
})
@defer.inlineCallbacks
def on_invite_request(self, origin, content):
@@ -745,6 +764,8 @@ class FederationHandlerRegistry(object):
if edu_type in self.edu_handlers:
raise KeyError("Already have an EDU handler for %s" % (edu_type,))
logger.info("Registering federation EDU handler for %r", edu_type)
self.edu_handlers[edu_type] = handler
def register_query_handler(self, query_type, handler):
@@ -763,6 +784,8 @@ class FederationHandlerRegistry(object):
"Already have a Query handler for %s" % (query_type,)
)
logger.info("Registering federation query handler for %r", query_type)
self.query_handlers[query_type] = handler
@defer.inlineCallbacks
@@ -785,3 +808,49 @@ class FederationHandlerRegistry(object):
raise NotFoundError("No handler for Query type '%s'" % (query_type,))
return handler(args)
class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
"""A FederationHandlerRegistry for worker processes.
When receiving EDU or queries it will check if an appropriate handler has
been registered on the worker, if there isn't one then it calls off to the
master process.
"""
def __init__(self, hs):
self.config = hs.config
self.http_client = hs.get_simple_http_client()
self.clock = hs.get_clock()
self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs)
self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs)
super(ReplicationFederationHandlerRegistry, self).__init__()
def on_edu(self, edu_type, origin, content):
"""Overrides FederationHandlerRegistry
"""
handler = self.edu_handlers.get(edu_type)
if handler:
return super(ReplicationFederationHandlerRegistry, self).on_edu(
edu_type, origin, content,
)
return self._send_edu(
edu_type=edu_type,
origin=origin,
content=content,
)
def on_query(self, query_type, args):
"""Overrides FederationHandlerRegistry
"""
handler = self.query_handlers.get(query_type)
if handler:
return handler(args)
return self._get_query_client(
query_type=query_type,
args=args,
)

View File

@@ -26,6 +26,8 @@ from synapse.api.errors import FederationDeniedError, HttpResponseException
from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
from synapse.metrics import (
LaterGauge,
event_processing_loop_counter,
event_processing_loop_room_count,
events_processed_counter,
sent_edus_counter,
sent_transactions_counter,
@@ -253,7 +255,13 @@ class TransactionQueue(object):
synapse.metrics.event_processing_last_ts.labels(
"federation_sender").set(ts)
events_processed_counter.inc(len(events))
events_processed_counter.inc(len(events))
event_processing_loop_room_count.labels(
"federation_sender"
).inc(len(events_by_room))
event_processing_loop_counter.labels("federation_sender").inc()
synapse.metrics.event_processing_positions.labels(
"federation_sender").set(next_token)

View File

@@ -195,7 +195,7 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
def make_membership_event(self, destination, room_id, user_id, membership):
def make_membership_event(self, destination, room_id, user_id, membership, params):
"""Asks a remote server to build and sign us a membership event
Note that this does not append any events to any graphs.
@@ -205,6 +205,8 @@ class TransportLayerClient(object):
room_id (str): room to join/leave
user_id (str): user to be joined/left
membership (str): one of join/leave
params (dict[str, str|Iterable[str]]): Query parameters to include in the
request.
Returns:
Deferred: Succeeds when we get a 2xx HTTP response. The result
@@ -241,6 +243,7 @@ class TransportLayerClient(object):
content = yield self.client.get_json(
destination=destination,
path=path,
args=params,
retry_on_dns_fail=retry_on_dns_fail,
timeout=20000,
ignore_backoff=ignore_backoff,

View File

@@ -190,6 +190,41 @@ def _parse_auth_header(header_bytes):
class BaseFederationServlet(object):
"""Abstract base class for federation servlet classes.
The servlet object should have a PATH attribute which takes the form of a regexp to
match against the request path (excluding the /federation/v1 prefix).
The servlet should also implement one or more of on_GET, on_POST, on_PUT, to match
the appropriate HTTP method. These methods have the signature:
on_<METHOD>(self, origin, content, query, **kwargs)
With arguments:
origin (unicode|None): The authenticated server_name of the calling server,
unless REQUIRE_AUTH is set to False and authentication failed.
content (unicode|None): decoded json body of the request. None if the
request was a GET.
query (dict[bytes, list[bytes]]): Query params from the request. url-decoded
(ie, '+' and '%xx' are decoded) but note that it is *not* utf8-decoded
yet.
**kwargs (dict[unicode, unicode]): the dict mapping keys to path
components as specified in the path match regexp.
Returns:
Deferred[(int, object)|None]: either (response code, response object) to
return a JSON response, or None if the request has already been handled.
Raises:
SynapseError: to return an error code
Exception: other exceptions will be caught, logged, and a 500 will be
returned.
"""
REQUIRE_AUTH = True
def __init__(self, handler, authenticator, ratelimiter, server_name):
@@ -204,6 +239,18 @@ class BaseFederationServlet(object):
@defer.inlineCallbacks
@functools.wraps(func)
def new_func(request, *args, **kwargs):
""" A callback which can be passed to HttpServer.RegisterPaths
Args:
request (twisted.web.http.Request):
*args: unused?
**kwargs (dict[unicode, unicode]): the dict mapping keys to path
components as specified in the path match regexp.
Returns:
Deferred[(int, object)|None]: (response code, response object) as returned
by the callback method. None if the request has already been handled.
"""
content = None
if request.method in ["PUT", "POST"]:
# TODO: Handle other method types? other content types?
@@ -384,9 +431,31 @@ class FederationMakeJoinServlet(BaseFederationServlet):
PATH = "/make_join/(?P<context>[^/]*)/(?P<user_id>[^/]*)"
@defer.inlineCallbacks
def on_GET(self, origin, content, query, context, user_id):
def on_GET(self, origin, _content, query, context, user_id):
"""
Args:
origin (unicode): The authenticated server_name of the calling server
_content (None): (GETs don't have bodies)
query (dict[bytes, list[bytes]]): Query params from the request.
**kwargs (dict[unicode, unicode]): the dict mapping keys to path
components as specified in the path match regexp.
Returns:
Deferred[(int, object)|None]: either (response code, response object) to
return a JSON response, or None if the request has already been handled.
"""
versions = query.get(b'ver')
if versions is not None:
supported_versions = [v.decode("utf-8") for v in versions]
else:
supported_versions = ["1"]
content = yield self.handler.on_make_join_request(
origin, context, user_id,
supported_versions=supported_versions,
)
defer.returnValue((200, content))

View File

@@ -23,6 +23,10 @@ from twisted.internet import defer
import synapse
from synapse.api.constants import EventTypes
from synapse.metrics import (
event_processing_loop_counter,
event_processing_loop_room_count,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.metrics import Measure
@@ -136,6 +140,12 @@ class ApplicationServicesHandler(object):
events_processed_counter.inc(len(events))
event_processing_loop_room_count.labels(
"appservice_sender"
).inc(len(events_by_room))
event_processing_loop_counter.labels("appservice_sender").inc()
synapse.metrics.event_processing_lag.labels(
"appservice_sender").set(now - ts)
synapse.metrics.event_processing_last_ts.labels(

View File

@@ -520,7 +520,7 @@ class AuthHandler(BaseHandler):
"""
logger.info("Logging in user %s on device %s", user_id, device_id)
access_token = yield self.issue_access_token(user_id, device_id)
yield self._check_mau_limits()
yield self.auth.check_auth_blocking()
# the device *should* have been registered before we got here; however,
# it's possible we raced against a DELETE operation. The thing we
@@ -734,7 +734,7 @@ class AuthHandler(BaseHandler):
@defer.inlineCallbacks
def validate_short_term_login_token_and_get_user_id(self, login_token):
yield self._check_mau_limits()
yield self.auth.check_auth_blocking()
auth_api = self.hs.get_auth()
user_id = None
try:
@@ -828,12 +828,26 @@ class AuthHandler(BaseHandler):
@defer.inlineCallbacks
def delete_threepid(self, user_id, medium, address):
"""Attempts to unbind the 3pid on the identity servers and deletes it
from the local database.
Args:
user_id (str)
medium (str)
address (str)
Returns:
Deferred[bool]: Returns True if successfully unbound the 3pid on
the identity server, False if identity server doesn't support the
unbind API.
"""
# 'Canonicalise' email addresses as per above
if medium == 'email':
address = address.lower()
identity_handler = self.hs.get_handlers().identity_handler
yield identity_handler.unbind_threepid(
result = yield identity_handler.try_unbind_threepid(
user_id,
{
'medium': medium,
@@ -841,10 +855,10 @@ class AuthHandler(BaseHandler):
},
)
ret = yield self.store.user_delete_threepid(
yield self.store.user_delete_threepid(
user_id, medium, address,
)
defer.returnValue(ret)
defer.returnValue(result)
def _save_session(self, session):
# TODO: Persistent storage
@@ -907,19 +921,6 @@ class AuthHandler(BaseHandler):
else:
return defer.succeed(False)
@defer.inlineCallbacks
def _check_mau_limits(self):
"""
Ensure that if mau blocking is enabled that invalid users cannot
log in.
"""
if self.hs.config.limit_usage_by_mau is True:
current_mau = yield self.store.count_monthly_users()
if current_mau >= self.hs.config.max_mau_value:
raise AuthError(
403, "MAU Limit Exceeded", errcode=Codes.MAU_LIMIT_EXCEEDED
)
@attr.s
class MacaroonGenerator(object):

View File

@@ -51,7 +51,8 @@ class DeactivateAccountHandler(BaseHandler):
erase_data (bool): whether to GDPR-erase the user's data
Returns:
Deferred
Deferred[bool]: True if identity server supports removing
threepids, otherwise False.
"""
# FIXME: Theoretically there is a race here wherein user resets
# password using threepid.
@@ -60,16 +61,22 @@ class DeactivateAccountHandler(BaseHandler):
# leave the user still active so they can try again.
# Ideally we would prevent password resets and then do this in the
# background thread.
# This will be set to false if the identity server doesn't support
# unbinding
identity_server_supports_unbinding = True
threepids = yield self.store.user_get_threepids(user_id)
for threepid in threepids:
try:
yield self._identity_handler.unbind_threepid(
result = yield self._identity_handler.try_unbind_threepid(
user_id,
{
'medium': threepid['medium'],
'address': threepid['address'],
},
)
identity_server_supports_unbinding &= result
except Exception:
# Do we want this to be a fatal error or should we carry on?
logger.exception("Failed to remove threepid from ID server")
@@ -103,6 +110,8 @@ class DeactivateAccountHandler(BaseHandler):
# parts users from rooms (if it isn't already running)
self._start_user_parting()
defer.returnValue(identity_server_supports_unbinding)
def _start_user_parting(self):
"""
Start the process that goes through the table of users

View File

@@ -23,7 +23,7 @@ from synapse.api.constants import EventTypes
from synapse.api.errors import FederationDeniedError
from synapse.types import RoomStreamToken, get_domain_from_id
from synapse.util import stringutils
from synapse.util.async import Linearizer
from synapse.util.async_helpers import Linearizer
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.metrics import measure_func
from synapse.util.retryutils import NotRetryingDestination

View File

@@ -30,7 +30,12 @@ from unpaddedbase64 import decode_base64
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership, RejectedReason
from synapse.api.constants import (
KNOWN_ROOM_VERSIONS,
EventTypes,
Membership,
RejectedReason,
)
from synapse.api.errors import (
AuthError,
CodeMessageException,
@@ -44,10 +49,15 @@ from synapse.crypto.event_signing import (
compute_event_signature,
)
from synapse.events.validator import EventValidator
from synapse.replication.http.federation import (
ReplicationCleanRoomRestServlet,
ReplicationFederationSendEventsRestServlet,
)
from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
from synapse.state import resolve_events_with_factory
from synapse.types import UserID, get_domain_from_id
from synapse.util import logcontext, unwrapFirstError
from synapse.util.async import Linearizer
from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_joined_room
from synapse.util.frozenutils import unfreeze
from synapse.util.logutils import log_function
@@ -86,6 +96,18 @@ class FederationHandler(BaseHandler):
self.spam_checker = hs.get_spam_checker()
self.event_creation_handler = hs.get_event_creation_handler()
self._server_notices_mxid = hs.config.server_notices_mxid
self.config = hs.config
self.http_client = hs.get_simple_http_client()
self._send_events_to_master = (
ReplicationFederationSendEventsRestServlet.make_client(hs)
)
self._notify_user_membership_change = (
ReplicationUserJoinedLeftRoomRestServlet.make_client(hs)
)
self._clean_room_for_join_client = (
ReplicationCleanRoomRestServlet.make_client(hs)
)
# When joining a room we need to queue any events for that room up
self.room_queues = {}
@@ -922,6 +944,9 @@ class FederationHandler(BaseHandler):
joinee,
"join",
content,
params={
"ver": KNOWN_ROOM_VERSIONS,
},
)
# This shouldn't happen, because the RoomMemberHandler has a
@@ -1150,7 +1175,7 @@ class FederationHandler(BaseHandler):
)
context = yield self.state_handler.compute_event_context(event)
yield self._persist_events([(event, context)])
yield self.persist_events_and_notify([(event, context)])
defer.returnValue(event)
@@ -1181,19 +1206,20 @@ class FederationHandler(BaseHandler):
)
context = yield self.state_handler.compute_event_context(event)
yield self._persist_events([(event, context)])
yield self.persist_events_and_notify([(event, context)])
defer.returnValue(event)
@defer.inlineCallbacks
def _make_and_verify_event(self, target_hosts, room_id, user_id, membership,
content={},):
content={}, params=None):
origin, pdu = yield self.federation_client.make_membership_event(
target_hosts,
room_id,
user_id,
membership,
content,
params=params,
)
logger.debug("Got response to make_%s: %s", membership, pdu)
@@ -1423,7 +1449,7 @@ class FederationHandler(BaseHandler):
event, context
)
yield self._persist_events(
yield self.persist_events_and_notify(
[(event, context)],
backfilled=backfilled,
)
@@ -1461,7 +1487,7 @@ class FederationHandler(BaseHandler):
], consumeErrors=True,
))
yield self._persist_events(
yield self.persist_events_and_notify(
[
(ev_info["event"], context)
for ev_info, context in zip(event_infos, contexts)
@@ -1549,7 +1575,7 @@ class FederationHandler(BaseHandler):
raise
events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
yield self._persist_events(
yield self.persist_events_and_notify(
[
(e, events_to_context[e.event_id])
for e in itertools.chain(auth_events, state)
@@ -1560,7 +1586,7 @@ class FederationHandler(BaseHandler):
event, old_state=state
)
yield self._persist_events(
yield self.persist_events_and_notify(
[(event, new_event_context)],
)
@@ -2288,7 +2314,7 @@ class FederationHandler(BaseHandler):
for revocation.
"""
try:
response = yield self.hs.get_simple_http_client().get_json(
response = yield self.http_client.get_json(
url,
{"public_key": public_key}
)
@@ -2301,7 +2327,7 @@ class FederationHandler(BaseHandler):
raise AuthError(403, "Third party certificate was invalid")
@defer.inlineCallbacks
def _persist_events(self, event_and_contexts, backfilled=False):
def persist_events_and_notify(self, event_and_contexts, backfilled=False):
"""Persists events and tells the notifier/pushers about them, if
necessary.
@@ -2313,14 +2339,21 @@ class FederationHandler(BaseHandler):
Returns:
Deferred
"""
max_stream_id = yield self.store.persist_events(
event_and_contexts,
backfilled=backfilled,
)
if self.config.worker_app:
yield self._send_events_to_master(
store=self.store,
event_and_contexts=event_and_contexts,
backfilled=backfilled
)
else:
max_stream_id = yield self.store.persist_events(
event_and_contexts,
backfilled=backfilled,
)
if not backfilled: # Never notify for backfilled events
for event, _ in event_and_contexts:
self._notify_persisted_event(event, max_stream_id)
if not backfilled: # Never notify for backfilled events
for event, _ in event_and_contexts:
self._notify_persisted_event(event, max_stream_id)
def _notify_persisted_event(self, event, max_stream_id):
"""Checks to see if notifier/pushers should be notified about the
@@ -2359,9 +2392,25 @@ class FederationHandler(BaseHandler):
)
def _clean_room_for_join(self, room_id):
return self.store.clean_room_for_join(room_id)
"""Called to clean up any data in DB for a given room, ready for the
server to join the room.
Args:
room_id (str)
"""
if self.config.worker_app:
return self._clean_room_for_join_client(room_id)
else:
return self.store.clean_room_for_join(room_id)
def user_joined_room(self, user, room_id):
"""Called when a new user has joined the room
"""
return user_joined_room(self.distributor, user, room_id)
if self.config.worker_app:
return self._notify_user_membership_change(
room_id=room_id,
user_id=user.to_string(),
change="joined",
)
else:
return user_joined_room(self.distributor, user, room_id)

View File

@@ -137,15 +137,19 @@ class IdentityHandler(BaseHandler):
defer.returnValue(data)
@defer.inlineCallbacks
def unbind_threepid(self, mxid, threepid):
"""
Removes a binding from an identity server
def try_unbind_threepid(self, mxid, threepid):
"""Removes a binding from an identity server
Args:
mxid (str): Matrix user ID of binding to be removed
threepid (dict): Dict with medium & address of binding to be removed
Raises:
SynapseError: If we failed to contact the identity server
Returns:
Deferred[bool]: True on success, otherwise False
Deferred[bool]: True on success, otherwise False if the identity
server doesn't support unbinding
"""
logger.debug("unbinding threepid %r from %s", threepid, mxid)
if not self.trusted_id_servers:
@@ -175,11 +179,21 @@ class IdentityHandler(BaseHandler):
content=content,
destination_is=id_server,
)
yield self.http_client.post_json_get_json(
url,
content,
headers,
)
try:
yield self.http_client.post_json_get_json(
url,
content,
headers,
)
except HttpResponseException as e:
if e.code in (400, 404, 501,):
# The remote server probably doesn't support unbinding (yet)
logger.warn("Received %d response while unbinding threepid", e.code)
defer.returnValue(False)
else:
logger.error("Failed to unbind threepid on identity server: %s", e)
raise SynapseError(502, "Failed to contact identity server")
defer.returnValue(True)
@defer.inlineCallbacks

View File

@@ -25,7 +25,7 @@ from synapse.handlers.presence import format_user_presence_state
from synapse.streams.config import PaginationConfig
from synapse.types import StreamToken, UserID
from synapse.util import unwrapFirstError
from synapse.util.async import concurrently_execute
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.snapshot_cache import SnapshotCache
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.visibility import filter_events_for_client

View File

@@ -30,9 +30,9 @@ from synapse.api.urls import ConsentURIBuilder
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.replication.http.send_event import send_event_to_master
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.types import RoomAlias, UserID
from synapse.util.async import Linearizer
from synapse.util.async_helpers import Linearizer
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import measure_func
@@ -171,7 +171,7 @@ class EventCreationHandler(object):
self.notifier = hs.get_notifier()
self.config = hs.config
self.http_client = hs.get_simple_http_client()
self.send_event_to_master = ReplicationSendEventRestServlet.make_client(hs)
# This is only used to get at ratelimit function, and maybe_kick_guest_users
self.base_handler = BaseHandler(hs)
@@ -559,12 +559,9 @@ class EventCreationHandler(object):
try:
# If we're a worker we need to hit out to the master.
if self.config.worker_app:
yield send_event_to_master(
clock=self.hs.get_clock(),
yield self.send_event_to_master(
event_id=event.event_id,
store=self.store,
client=self.http_client,
host=self.config.worker_replication_host,
port=self.config.worker_replication_http_port,
requester=requester,
event=event,
context=context,

View File

@@ -22,7 +22,7 @@ from synapse.api.constants import Membership
from synapse.api.errors import SynapseError
from synapse.events.utils import serialize_event
from synapse.types import RoomStreamToken
from synapse.util.async import ReadWriteLock
from synapse.util.async_helpers import ReadWriteLock
from synapse.util.logcontext import run_in_background
from synapse.util.stringutils import random_string
from synapse.visibility import filter_events_for_client

View File

@@ -36,7 +36,7 @@ from synapse.api.errors import SynapseError
from synapse.metrics import LaterGauge
from synapse.storage.presence import UserPresenceState
from synapse.types import UserID, get_domain_from_id
from synapse.util.async import Linearizer
from synapse.util.async_helpers import Linearizer
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.util.logcontext import run_in_background
from synapse.util.logutils import log_function
@@ -95,6 +95,7 @@ class PresenceHandler(object):
Args:
hs (synapse.server.HomeServer):
"""
self.hs = hs
self.is_mine = hs.is_mine
self.is_mine_id = hs.is_mine_id
self.clock = hs.get_clock()
@@ -230,6 +231,10 @@ class PresenceHandler(object):
earlier than they should when synapse is restarted. This affect of this
is some spurious presence changes that will self-correct.
"""
# If the DB pool has already terminated, don't try updating
if not self.hs.get_db_pool().running:
return
logger.info(
"Performing _on_shutdown. Persisting %d unpersisted changes",
len(self.user_to_current_state)

View File

@@ -17,7 +17,7 @@ import logging
from twisted.internet import defer
from synapse.util.async import Linearizer
from synapse.util.async_helpers import Linearizer
from ._base import BaseHandler

View File

@@ -28,7 +28,7 @@ from synapse.api.errors import (
)
from synapse.http.client import CaptchaServerHttpClient
from synapse.types import RoomAlias, RoomID, UserID, create_requester
from synapse.util.async import Linearizer
from synapse.util.async_helpers import Linearizer
from synapse.util.threepids import check_3pid_allowed
from ._base import BaseHandler
@@ -144,7 +144,8 @@ class RegistrationHandler(BaseHandler):
Raises:
RegistrationError if there was a problem registering.
"""
yield self._check_mau_limits()
yield self.auth.check_auth_blocking()
password_hash = None
if password:
password_hash = yield self.auth_handler().hash(password)
@@ -289,7 +290,7 @@ class RegistrationHandler(BaseHandler):
400,
"User ID can only contain characters a-z, 0-9, or '=_-./'",
)
yield self._check_mau_limits()
yield self.auth.check_auth_blocking()
user = UserID(localpart, self.hs.hostname)
user_id = user.to_string()
@@ -439,7 +440,7 @@ class RegistrationHandler(BaseHandler):
"""
if localpart is None:
raise SynapseError(400, "Request must include user id")
yield self._check_mau_limits()
yield self.auth.check_auth_blocking()
need_register = True
try:
@@ -533,16 +534,3 @@ class RegistrationHandler(BaseHandler):
remote_room_hosts=remote_room_hosts,
action="join",
)
@defer.inlineCallbacks
def _check_mau_limits(self):
"""
Do not accept registrations if monthly active user limits exceeded
and limiting is enabled
"""
if self.hs.config.limit_usage_by_mau is True:
current_mau = yield self.store.count_monthly_users()
if current_mau >= self.hs.config.max_mau_value:
raise RegistrationError(
403, "MAU Limit Exceeded", Codes.MAU_LIMIT_EXCEEDED
)

View File

@@ -21,9 +21,17 @@ import math
import string
from collections import OrderedDict
from six import string_types
from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset
from synapse.api.constants import (
DEFAULT_ROOM_VERSION,
KNOWN_ROOM_VERSIONS,
EventTypes,
JoinRules,
RoomCreationPreset,
)
from synapse.api.errors import AuthError, Codes, StoreError, SynapseError
from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID
from synapse.util import stringutils
@@ -99,6 +107,21 @@ class RoomCreationHandler(BaseHandler):
if ratelimit:
yield self.ratelimit(requester)
room_version = config.get("room_version", DEFAULT_ROOM_VERSION)
if not isinstance(room_version, string_types):
raise SynapseError(
400,
"room_version must be a string",
Codes.BAD_JSON,
)
if room_version not in KNOWN_ROOM_VERSIONS:
raise SynapseError(
400,
"Your homeserver does not support this room version",
Codes.UNSUPPORTED_ROOM_VERSION,
)
if "room_alias_name" in config:
for wchar in string.whitespace:
if wchar in config["room_alias_name"]:
@@ -184,6 +207,9 @@ class RoomCreationHandler(BaseHandler):
creation_content = config.get("creation_content", {})
# override any attempt to set room versions via the creation_content
creation_content["room_version"] = room_version
room_member_handler = self.hs.get_room_member_handler()
yield self._send_events_for_new_room(

View File

@@ -26,7 +26,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules
from synapse.types import ThirdPartyInstanceID
from synapse.util.async import concurrently_execute
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.util.caches.response_cache import ResponseCache

View File

@@ -30,7 +30,7 @@ import synapse.types
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, Codes, SynapseError
from synapse.types import RoomID, UserID
from synapse.util.async import Linearizer
from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_joined_room, user_left_room
logger = logging.getLogger(__name__)

View File

@@ -20,16 +20,24 @@ from twisted.internet import defer
from synapse.api.errors import SynapseError
from synapse.handlers.room_member import RoomMemberHandler
from synapse.replication.http.membership import (
get_or_register_3pid_guest,
notify_user_membership_change,
remote_join,
remote_reject_invite,
ReplicationRegister3PIDGuestRestServlet as Repl3PID,
ReplicationRemoteJoinRestServlet as ReplRemoteJoin,
ReplicationRemoteRejectInviteRestServlet as ReplRejectInvite,
ReplicationUserJoinedLeftRoomRestServlet as ReplJoinedLeft,
)
logger = logging.getLogger(__name__)
class RoomMemberWorkerHandler(RoomMemberHandler):
def __init__(self, hs):
super(RoomMemberWorkerHandler, self).__init__(hs)
self._get_register_3pid_client = Repl3PID.make_client(hs)
self._remote_join_client = ReplRemoteJoin.make_client(hs)
self._remote_reject_client = ReplRejectInvite.make_client(hs)
self._notify_change_client = ReplJoinedLeft.make_client(hs)
@defer.inlineCallbacks
def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
"""Implements RoomMemberHandler._remote_join
@@ -37,10 +45,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
if len(remote_room_hosts) == 0:
raise SynapseError(404, "No known servers")
ret = yield remote_join(
self.simple_http_client,
host=self.config.worker_replication_host,
port=self.config.worker_replication_http_port,
ret = yield self._remote_join_client(
requester=requester,
remote_room_hosts=remote_room_hosts,
room_id=room_id,
@@ -55,10 +60,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target):
"""Implements RoomMemberHandler._remote_reject_invite
"""
return remote_reject_invite(
self.simple_http_client,
host=self.config.worker_replication_host,
port=self.config.worker_replication_http_port,
return self._remote_reject_client(
requester=requester,
remote_room_hosts=remote_room_hosts,
room_id=room_id,
@@ -68,10 +70,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
def _user_joined_room(self, target, room_id):
"""Implements RoomMemberHandler._user_joined_room
"""
return notify_user_membership_change(
self.simple_http_client,
host=self.config.worker_replication_host,
port=self.config.worker_replication_http_port,
return self._notify_change_client(
user_id=target.to_string(),
room_id=room_id,
change="joined",
@@ -80,10 +79,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
def _user_left_room(self, target, room_id):
"""Implements RoomMemberHandler._user_left_room
"""
return notify_user_membership_change(
self.simple_http_client,
host=self.config.worker_replication_host,
port=self.config.worker_replication_http_port,
return self._notify_change_client(
user_id=target.to_string(),
room_id=room_id,
change="left",
@@ -92,10 +88,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id):
"""Implements RoomMemberHandler.get_or_register_3pid_guest
"""
return get_or_register_3pid_guest(
self.simple_http_client,
host=self.config.worker_replication_host,
port=self.config.worker_replication_http_port,
return self._get_register_3pid_client(
requester=requester,
medium=medium,
address=address,

View File

@@ -25,7 +25,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.push.clientformat import format_push_rules_for_user
from synapse.types import RoomStreamToken
from synapse.util.async import concurrently_execute
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.response_cache import ResponseCache
@@ -191,6 +191,7 @@ class SyncHandler(object):
self.clock = hs.get_clock()
self.response_cache = ResponseCache(hs, "sync")
self.state = hs.get_state_handler()
self.auth = hs.get_auth()
# ExpiringCache((User, Device)) -> LruCache(state_key => event_id)
self.lazy_loaded_members_cache = ExpiringCache(
@@ -198,19 +199,27 @@ class SyncHandler(object):
max_len=0, expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
)
@defer.inlineCallbacks
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
full_state=False):
"""Get the sync for a client if we have new data for it now. Otherwise
wait for new data to arrive on the server. If the timeout expires, then
return an empty sync result.
Returns:
A Deferred SyncResult.
Deferred[SyncResult]
"""
return self.response_cache.wrap(
# If the user is not part of the mau group, then check that limits have
# not been exceeded (if not part of the group by this point, almost certain
# auth_blocking will occur)
user_id = sync_config.user.to_string()
yield self.auth.check_auth_blocking(user_id)
res = yield self.response_cache.wrap(
sync_config.request_key,
self._wait_for_sync_for_user,
sync_config, since_token, timeout, full_state,
)
defer.returnValue(res)
@defer.inlineCallbacks
def _wait_for_sync_for_user(self, sync_config, since_token, timeout,

View File

@@ -42,7 +42,7 @@ from twisted.web.http_headers import Headers
from synapse.api.errors import Codes, HttpResponseException, SynapseError
from synapse.http import cancelled_to_request_timed_out_error, redact_uri
from synapse.http.endpoint import SpiderEndpoint
from synapse.util.async import add_timeout_to_deferred
from synapse.util.async_helpers import add_timeout_to_deferred
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.logcontext import make_deferred_yieldable

View File

@@ -26,7 +26,6 @@ from twisted.names.error import DNSNameError, DomainError
logger = logging.getLogger(__name__)
SERVER_CACHE = {}
# our record of an individual server which can be tried to reach a destination.
@@ -103,15 +102,16 @@ def parse_and_validate_server_name(server_name):
return host, port
def matrix_federation_endpoint(reactor, destination, ssl_context_factory=None,
def matrix_federation_endpoint(reactor, destination, tls_client_options_factory=None,
timeout=None):
"""Construct an endpoint for the given matrix destination.
Args:
reactor: Twisted reactor.
destination (bytes): The name of the server to connect to.
ssl_context_factory (twisted.internet.ssl.ContextFactory): Factory
which generates SSL contexts to use for TLS.
tls_client_options_factory
(synapse.crypto.context_factory.ClientTLSOptionsFactory):
Factory which generates TLS options for client connections.
timeout (int): connection timeout in seconds
"""
@@ -122,13 +122,13 @@ def matrix_federation_endpoint(reactor, destination, ssl_context_factory=None,
if timeout is not None:
endpoint_kw_args.update(timeout=timeout)
if ssl_context_factory is None:
if tls_client_options_factory is None:
transport_endpoint = HostnameEndpoint
default_port = 8008
else:
def transport_endpoint(reactor, host, port, timeout):
return wrapClientTLS(
ssl_context_factory,
tls_client_options_factory.get_options(host),
HostnameEndpoint(reactor, host, port, timeout=timeout))
default_port = 8448

View File

@@ -43,7 +43,7 @@ from synapse.api.errors import (
from synapse.http import cancelled_to_request_timed_out_error
from synapse.http.endpoint import matrix_federation_endpoint
from synapse.util import logcontext
from synapse.util.async import add_timeout_to_deferred
from synapse.util.async_helpers import add_timeout_to_deferred
from synapse.util.logcontext import make_deferred_yieldable
logger = logging.getLogger(__name__)
@@ -61,14 +61,14 @@ MAX_SHORT_RETRIES = 3
class MatrixFederationEndpointFactory(object):
def __init__(self, hs):
self.tls_server_context_factory = hs.tls_server_context_factory
self.tls_client_options_factory = hs.tls_client_options_factory
def endpointForURI(self, uri):
destination = uri.netloc
return matrix_federation_endpoint(
reactor, destination, timeout=10,
ssl_context_factory=self.tls_server_context_factory
tls_client_options_factory=self.tls_client_options_factory
)
@@ -439,7 +439,7 @@ class MatrixFederationHttpClient(object):
defer.returnValue(json.loads(body))
@defer.inlineCallbacks
def get_json(self, destination, path, args={}, retry_on_dns_fail=True,
def get_json(self, destination, path, args=None, retry_on_dns_fail=True,
timeout=None, ignore_backoff=False):
""" GETs some json from the given host homeserver and path
@@ -447,7 +447,7 @@ class MatrixFederationHttpClient(object):
destination (str): The remote server to send the HTTP request
to.
path (str): The HTTP path.
args (dict): A dictionary used to create query strings, defaults to
args (dict|None): A dictionary used to create query strings, defaults to
None.
timeout (int): How long to try (in ms) the destination for before
giving up. None indicates no timeout and that the request will
@@ -702,6 +702,9 @@ def check_content_type_is_json(headers):
def encode_query_args(args):
if args is None:
return b""
encoded_args = {}
for k, vs in args.items():
if isinstance(vs, string_types):

View File

@@ -174,6 +174,19 @@ sent_transactions_counter = Counter("synapse_federation_client_sent_transactions
events_processed_counter = Counter("synapse_federation_client_events_processed", "")
event_processing_loop_counter = Counter(
"synapse_event_processing_loop_count",
"Event processing loop iterations",
["name"],
)
event_processing_loop_room_count = Counter(
"synapse_event_processing_loop_room_count",
"Rooms seen per event processing loop iteration",
["name"],
)
# Used to track where various components have processed in the event stream,
# e.g. federation sending, appservice sending, etc.
event_processing_positions = Gauge("synapse_event_processing_positions", "", ["name"])

View File

@@ -25,7 +25,7 @@ from synapse.api.errors import AuthError
from synapse.handlers.presence import format_user_presence_state
from synapse.metrics import LaterGauge
from synapse.types import StreamToken
from synapse.util.async import (
from synapse.util.async_helpers import (
DeferredTimeoutError,
ObservableDeferred,
add_timeout_to_deferred,

View File

@@ -26,7 +26,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.event_auth import get_user_power_level
from synapse.state import POWER_KEY
from synapse.util.async import Linearizer
from synapse.util.async_helpers import Linearizer
from synapse.util.caches import register_cache
from synapse.util.caches.descriptors import cached

View File

@@ -35,7 +35,7 @@ from synapse.push.presentable_names import (
name_from_member_event,
)
from synapse.types import UserID
from synapse.util.async import concurrently_execute
from synapse.util.async_helpers import concurrently_execute
from synapse.visibility import filter_events_for_client
logger = logging.getLogger(__name__)

View File

@@ -14,7 +14,7 @@
# limitations under the License.
from synapse.http.server import JsonResource
from synapse.replication.http import membership, send_event
from synapse.replication.http import federation, membership, send_event
REPLICATION_PREFIX = "/_synapse/replication"
@@ -27,3 +27,4 @@ class ReplicationRestResource(JsonResource):
def register_servlets(self, hs):
send_event.register_servlets(hs, self)
membership.register_servlets(hs, self)
federation.register_servlets(hs, self)

View File

@@ -0,0 +1,215 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import logging
import re
from six.moves import urllib
from twisted.internet import defer
from synapse.api.errors import CodeMessageException, HttpResponseException
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.stringutils import random_string
logger = logging.getLogger(__name__)
class ReplicationEndpoint(object):
"""Helper base class for defining new replication HTTP endpoints.
This creates an endpoint under `/_synapse/replication/:NAME/:PATH_ARGS..`
(with an `/:txn_id` prefix for cached requests.), where NAME is a name,
PATH_ARGS are a tuple of parameters to be encoded in the URL.
For example, if `NAME` is "send_event" and `PATH_ARGS` is `("event_id",)`,
with `CACHE` set to true then this generates an endpoint:
/_synapse/replication/send_event/:event_id/:txn_id
For POST/PUT requests the payload is serialized to json and sent as the
body, while for GET requests the payload is added as query parameters. See
`_serialize_payload` for details.
Incoming requests are handled by overriding `_handle_request`. Servers
must call `register` to register the path with the HTTP server.
Requests can be sent by calling the client returned by `make_client`.
Attributes:
NAME (str): A name for the endpoint, added to the path as well as used
in logging and metrics.
PATH_ARGS (tuple[str]): A list of parameters to be added to the path.
Adding parameters to the path (rather than payload) can make it
easier to follow along in the log files.
METHOD (str): The method of the HTTP request, defaults to POST. Can be
one of POST, PUT or GET. If GET then the payload is sent as query
parameters rather than a JSON body.
CACHE (bool): Whether server should cache the result of the request/
If true then transparently adds a txn_id to all requests, and
`_handle_request` must return a Deferred.
RETRY_ON_TIMEOUT(bool): Whether or not to retry the request when a 504
is received.
"""
__metaclass__ = abc.ABCMeta
NAME = abc.abstractproperty()
PATH_ARGS = abc.abstractproperty()
METHOD = "POST"
CACHE = True
RETRY_ON_TIMEOUT = True
def __init__(self, hs):
if self.CACHE:
self.response_cache = ResponseCache(
hs, "repl." + self.NAME,
timeout_ms=30 * 60 * 1000,
)
assert self.METHOD in ("PUT", "POST", "GET")
@abc.abstractmethod
def _serialize_payload(**kwargs):
"""Static method that is called when creating a request.
Concrete implementations should have explicit parameters (rather than
kwargs) so that an appropriate exception is raised if the client is
called with unexpected parameters. All PATH_ARGS must appear in
argument list.
Returns:
Deferred[dict]|dict: If POST/PUT request then dictionary must be
JSON serialisable, otherwise must be appropriate for adding as
query args.
"""
return {}
@abc.abstractmethod
def _handle_request(self, request, **kwargs):
"""Handle incoming request.
This is called with the request object and PATH_ARGS.
Returns:
Deferred[dict]: A JSON serialisable dict to be used as response
body of request.
"""
pass
@classmethod
def make_client(cls, hs):
"""Create a client that makes requests.
Returns a callable that accepts the same parameters as `_serialize_payload`.
"""
clock = hs.get_clock()
host = hs.config.worker_replication_host
port = hs.config.worker_replication_http_port
client = hs.get_simple_http_client()
@defer.inlineCallbacks
def send_request(**kwargs):
data = yield cls._serialize_payload(**kwargs)
url_args = [urllib.parse.quote(kwargs[name]) for name in cls.PATH_ARGS]
if cls.CACHE:
txn_id = random_string(10)
url_args.append(txn_id)
if cls.METHOD == "POST":
request_func = client.post_json_get_json
elif cls.METHOD == "PUT":
request_func = client.put_json
elif cls.METHOD == "GET":
request_func = client.get_json
else:
# We have already asserted in the constructor that a
# compatible was picked, but lets be paranoid.
raise Exception(
"Unknown METHOD on %s replication endpoint" % (cls.NAME,)
)
uri = "http://%s:%s/_synapse/replication/%s/%s" % (
host, port, cls.NAME, "/".join(url_args)
)
try:
# We keep retrying the same request for timeouts. This is so that we
# have a good idea that the request has either succeeded or failed on
# the master, and so whether we should clean up or not.
while True:
try:
result = yield request_func(uri, data)
break
except CodeMessageException as e:
if e.code != 504 or not cls.RETRY_ON_TIMEOUT:
raise
logger.warn("%s request timed out", cls.NAME)
# If we timed out we probably don't need to worry about backing
# off too much, but lets just wait a little anyway.
yield clock.sleep(1)
except HttpResponseException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the master process that we should send to the client. (And
# importantly, not stack traces everywhere)
raise e.to_synapse_error()
defer.returnValue(result)
return send_request
def register(self, http_server):
"""Called by the server to register this as a handler to the
appropriate path.
"""
url_args = list(self.PATH_ARGS)
handler = self._handle_request
method = self.METHOD
if self.CACHE:
handler = self._cached_handler
url_args.append("txn_id")
args = "/".join("(?P<%s>[^/]+)" % (arg,) for arg in url_args)
pattern = re.compile("^/_synapse/replication/%s/%s$" % (
self.NAME,
args
))
http_server.register_paths(method, [pattern], handler)
def _cached_handler(self, request, txn_id, **kwargs):
"""Called on new incoming requests when caching is enabled. Checks
if there is a cached response for the request and returns that,
otherwise calls `_handle_request` and caches its response.
"""
# We just use the txn_id here, but we probably also want to use the
# other PATH_ARGS as well.
assert self.CACHE
return self.response_cache.wrap(
txn_id,
self._handle_request,
request, **kwargs
)

View File

@@ -0,0 +1,259 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from twisted.internet import defer
from synapse.events import FrozenEvent
from synapse.events.snapshot import EventContext
from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint
from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
"""Handles events newly received from federation, including persisting and
notifying.
The API looks like:
POST /_synapse/replication/fed_send_events/:txn_id
{
"events": [{
"event": { .. serialized event .. },
"internal_metadata": { .. serialized internal_metadata .. },
"rejected_reason": .., // The event.rejected_reason field
"context": { .. serialized event context .. },
}],
"backfilled": false
"""
NAME = "fed_send_events"
PATH_ARGS = ()
def __init__(self, hs):
super(ReplicationFederationSendEventsRestServlet, self).__init__(hs)
self.store = hs.get_datastore()
self.clock = hs.get_clock()
self.federation_handler = hs.get_handlers().federation_handler
@staticmethod
@defer.inlineCallbacks
def _serialize_payload(store, event_and_contexts, backfilled):
"""
Args:
store
event_and_contexts (list[tuple[FrozenEvent, EventContext]])
backfilled (bool): Whether or not the events are the result of
backfilling
"""
event_payloads = []
for event, context in event_and_contexts:
serialized_context = yield context.serialize(event, store)
event_payloads.append({
"event": event.get_pdu_json(),
"internal_metadata": event.internal_metadata.get_dict(),
"rejected_reason": event.rejected_reason,
"context": serialized_context,
})
payload = {
"events": event_payloads,
"backfilled": backfilled,
}
defer.returnValue(payload)
@defer.inlineCallbacks
def _handle_request(self, request):
with Measure(self.clock, "repl_fed_send_events_parse"):
content = parse_json_object_from_request(request)
backfilled = content["backfilled"]
event_payloads = content["events"]
event_and_contexts = []
for event_payload in event_payloads:
event_dict = event_payload["event"]
internal_metadata = event_payload["internal_metadata"]
rejected_reason = event_payload["rejected_reason"]
event = FrozenEvent(event_dict, internal_metadata, rejected_reason)
context = yield EventContext.deserialize(
self.store, event_payload["context"],
)
event_and_contexts.append((event, context))
logger.info(
"Got %d events from federation",
len(event_and_contexts),
)
yield self.federation_handler.persist_events_and_notify(
event_and_contexts, backfilled,
)
defer.returnValue((200, {}))
class ReplicationFederationSendEduRestServlet(ReplicationEndpoint):
"""Handles EDUs newly received from federation, including persisting and
notifying.
Request format:
POST /_synapse/replication/fed_send_edu/:edu_type/:txn_id
{
"origin": ...,
"content: { ... }
}
"""
NAME = "fed_send_edu"
PATH_ARGS = ("edu_type",)
def __init__(self, hs):
super(ReplicationFederationSendEduRestServlet, self).__init__(hs)
self.store = hs.get_datastore()
self.clock = hs.get_clock()
self.registry = hs.get_federation_registry()
@staticmethod
def _serialize_payload(edu_type, origin, content):
return {
"origin": origin,
"content": content,
}
@defer.inlineCallbacks
def _handle_request(self, request, edu_type):
with Measure(self.clock, "repl_fed_send_edu_parse"):
content = parse_json_object_from_request(request)
origin = content["origin"]
edu_content = content["content"]
logger.info(
"Got %r edu from $s",
edu_type, origin,
)
result = yield self.registry.on_edu(edu_type, origin, edu_content)
defer.returnValue((200, result))
class ReplicationGetQueryRestServlet(ReplicationEndpoint):
"""Handle responding to queries from federation.
Request format:
POST /_synapse/replication/fed_query/:query_type
{
"args": { ... }
}
"""
NAME = "fed_query"
PATH_ARGS = ("query_type",)
# This is a query, so let's not bother caching
CACHE = False
def __init__(self, hs):
super(ReplicationGetQueryRestServlet, self).__init__(hs)
self.store = hs.get_datastore()
self.clock = hs.get_clock()
self.registry = hs.get_federation_registry()
@staticmethod
def _serialize_payload(query_type, args):
"""
Args:
query_type (str)
args (dict): The arguments received for the given query type
"""
return {
"args": args,
}
@defer.inlineCallbacks
def _handle_request(self, request, query_type):
with Measure(self.clock, "repl_fed_query_parse"):
content = parse_json_object_from_request(request)
args = content["args"]
logger.info(
"Got %r query",
query_type,
)
result = yield self.registry.on_query(query_type, args)
defer.returnValue((200, result))
class ReplicationCleanRoomRestServlet(ReplicationEndpoint):
"""Called to clean up any data in DB for a given room, ready for the
server to join the room.
Request format:
POST /_synapse/replication/fed_query/:fed_cleanup_room/:txn_id
{}
"""
NAME = "fed_cleanup_room"
PATH_ARGS = ("room_id",)
def __init__(self, hs):
super(ReplicationCleanRoomRestServlet, self).__init__(hs)
self.store = hs.get_datastore()
@staticmethod
def _serialize_payload(room_id, args):
"""
Args:
room_id (str)
"""
return {}
@defer.inlineCallbacks
def _handle_request(self, request, room_id):
yield self.store.clean_room_for_join(room_id)
defer.returnValue((200, {}))
def register_servlets(hs, http_server):
ReplicationFederationSendEventsRestServlet(hs).register(http_server)
ReplicationFederationSendEduRestServlet(hs).register(http_server)
ReplicationGetQueryRestServlet(hs).register(http_server)
ReplicationCleanRoomRestServlet(hs).register(http_server)

View File

@@ -14,182 +14,63 @@
# limitations under the License.
import logging
import re
from twisted.internet import defer
from synapse.api.errors import HttpResponseException
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint
from synapse.types import Requester, UserID
from synapse.util.distributor import user_joined_room, user_left_room
logger = logging.getLogger(__name__)
@defer.inlineCallbacks
def remote_join(client, host, port, requester, remote_room_hosts,
room_id, user_id, content):
"""Ask the master to do a remote join for the given user to the given room
class ReplicationRemoteJoinRestServlet(ReplicationEndpoint):
"""Does a remote join for the given user to the given room
Args:
client (SimpleHttpClient)
host (str): host of master
port (int): port on master listening for HTTP replication
requester (Requester)
remote_room_hosts (list[str]): Servers to try and join via
room_id (str)
user_id (str)
content (dict): The event content to use for the join event
Request format:
Returns:
Deferred
"""
uri = "http://%s:%s/_synapse/replication/remote_join" % (host, port)
POST /_synapse/replication/remote_join/:room_id/:user_id
payload = {
"requester": requester.serialize(),
"remote_room_hosts": remote_room_hosts,
"room_id": room_id,
"user_id": user_id,
"content": content,
}
try:
result = yield client.post_json_get_json(uri, payload)
except HttpResponseException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the master process that we should send to the client. (And
# importantly, not stack traces everywhere)
raise e.to_synapse_error()
defer.returnValue(result)
@defer.inlineCallbacks
def remote_reject_invite(client, host, port, requester, remote_room_hosts,
room_id, user_id):
"""Ask master to reject the invite for the user and room.
Args:
client (SimpleHttpClient)
host (str): host of master
port (int): port on master listening for HTTP replication
requester (Requester)
remote_room_hosts (list[str]): Servers to try and reject via
room_id (str)
user_id (str)
Returns:
Deferred
"""
uri = "http://%s:%s/_synapse/replication/remote_reject_invite" % (host, port)
payload = {
"requester": requester.serialize(),
"remote_room_hosts": remote_room_hosts,
"room_id": room_id,
"user_id": user_id,
}
try:
result = yield client.post_json_get_json(uri, payload)
except HttpResponseException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the master process that we should send to the client. (And
# importantly, not stack traces everywhere)
raise e.to_synapse_error()
defer.returnValue(result)
@defer.inlineCallbacks
def get_or_register_3pid_guest(client, host, port, requester,
medium, address, inviter_user_id):
"""Ask the master to get/create a guest account for given 3PID.
Args:
client (SimpleHttpClient)
host (str): host of master
port (int): port on master listening for HTTP replication
requester (Requester)
medium (str)
address (str)
inviter_user_id (str): The user ID who is trying to invite the
3PID
Returns:
Deferred[(str, str)]: A 2-tuple of `(user_id, access_token)` of the
3PID guest account.
{
"requester": ...,
"remote_room_hosts": [...],
"content": { ... }
}
"""
uri = "http://%s:%s/_synapse/replication/get_or_register_3pid_guest" % (host, port)
payload = {
"requester": requester.serialize(),
"medium": medium,
"address": address,
"inviter_user_id": inviter_user_id,
}
try:
result = yield client.post_json_get_json(uri, payload)
except HttpResponseException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the master process that we should send to the client. (And
# importantly, not stack traces everywhere)
raise e.to_synapse_error()
defer.returnValue(result)
@defer.inlineCallbacks
def notify_user_membership_change(client, host, port, user_id, room_id, change):
"""Notify master that a user has joined or left the room
Args:
client (SimpleHttpClient)
host (str): host of master
port (int): port on master listening for HTTP replication.
user_id (str)
room_id (str)
change (str): Either "join" or "left"
Returns:
Deferred
"""
assert change in ("joined", "left")
uri = "http://%s:%s/_synapse/replication/user_%s_room" % (host, port, change)
payload = {
"user_id": user_id,
"room_id": room_id,
}
try:
result = yield client.post_json_get_json(uri, payload)
except HttpResponseException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the master process that we should send to the client. (And
# importantly, not stack traces everywhere)
raise e.to_synapse_error()
defer.returnValue(result)
class ReplicationRemoteJoinRestServlet(RestServlet):
PATTERNS = [re.compile("^/_synapse/replication/remote_join$")]
NAME = "remote_join"
PATH_ARGS = ("room_id", "user_id",)
def __init__(self, hs):
super(ReplicationRemoteJoinRestServlet, self).__init__()
super(ReplicationRemoteJoinRestServlet, self).__init__(hs)
self.federation_handler = hs.get_handlers().federation_handler
self.store = hs.get_datastore()
self.clock = hs.get_clock()
@staticmethod
def _serialize_payload(requester, room_id, user_id, remote_room_hosts,
content):
"""
Args:
requester(Requester)
room_id (str)
user_id (str)
remote_room_hosts (list[str]): Servers to try and join via
content(dict): The event content to use for the join event
"""
return {
"requester": requester.serialize(),
"remote_room_hosts": remote_room_hosts,
"content": content,
}
@defer.inlineCallbacks
def on_POST(self, request):
def _handle_request(self, request, room_id, user_id):
content = parse_json_object_from_request(request)
remote_room_hosts = content["remote_room_hosts"]
room_id = content["room_id"]
user_id = content["user_id"]
event_content = content["content"]
requester = Requester.deserialize(self.store, content["requester"])
@@ -212,23 +93,48 @@ class ReplicationRemoteJoinRestServlet(RestServlet):
defer.returnValue((200, {}))
class ReplicationRemoteRejectInviteRestServlet(RestServlet):
PATTERNS = [re.compile("^/_synapse/replication/remote_reject_invite$")]
class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
"""Rejects the invite for the user and room.
Request format:
POST /_synapse/replication/remote_reject_invite/:room_id/:user_id
{
"requester": ...,
"remote_room_hosts": [...],
}
"""
NAME = "remote_reject_invite"
PATH_ARGS = ("room_id", "user_id",)
def __init__(self, hs):
super(ReplicationRemoteRejectInviteRestServlet, self).__init__()
super(ReplicationRemoteRejectInviteRestServlet, self).__init__(hs)
self.federation_handler = hs.get_handlers().federation_handler
self.store = hs.get_datastore()
self.clock = hs.get_clock()
@staticmethod
def _serialize_payload(requester, room_id, user_id, remote_room_hosts):
"""
Args:
requester(Requester)
room_id (str)
user_id (str)
remote_room_hosts (list[str]): Servers to try and reject via
"""
return {
"requester": requester.serialize(),
"remote_room_hosts": remote_room_hosts,
}
@defer.inlineCallbacks
def on_POST(self, request):
def _handle_request(self, request, room_id, user_id):
content = parse_json_object_from_request(request)
remote_room_hosts = content["remote_room_hosts"]
room_id = content["room_id"]
user_id = content["user_id"]
requester = Requester.deserialize(self.store, content["requester"])
@@ -264,18 +170,50 @@ class ReplicationRemoteRejectInviteRestServlet(RestServlet):
defer.returnValue((200, ret))
class ReplicationRegister3PIDGuestRestServlet(RestServlet):
PATTERNS = [re.compile("^/_synapse/replication/get_or_register_3pid_guest$")]
class ReplicationRegister3PIDGuestRestServlet(ReplicationEndpoint):
"""Gets/creates a guest account for given 3PID.
Request format:
POST /_synapse/replication/get_or_register_3pid_guest/
{
"requester": ...,
"medium": ...,
"address": ...,
"inviter_user_id": ...
}
"""
NAME = "get_or_register_3pid_guest"
PATH_ARGS = ()
def __init__(self, hs):
super(ReplicationRegister3PIDGuestRestServlet, self).__init__()
super(ReplicationRegister3PIDGuestRestServlet, self).__init__(hs)
self.registeration_handler = hs.get_handlers().registration_handler
self.store = hs.get_datastore()
self.clock = hs.get_clock()
@staticmethod
def _serialize_payload(requester, medium, address, inviter_user_id):
"""
Args:
requester(Requester)
medium (str)
address (str)
inviter_user_id (str): The user ID who is trying to invite the
3PID
"""
return {
"requester": requester.serialize(),
"medium": medium,
"address": address,
"inviter_user_id": inviter_user_id,
}
@defer.inlineCallbacks
def on_POST(self, request):
def _handle_request(self, request):
content = parse_json_object_from_request(request)
medium = content["medium"]
@@ -296,23 +234,41 @@ class ReplicationRegister3PIDGuestRestServlet(RestServlet):
defer.returnValue((200, ret))
class ReplicationUserJoinedLeftRoomRestServlet(RestServlet):
PATTERNS = [re.compile("^/_synapse/replication/user_(?P<change>joined|left)_room$")]
class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint):
"""Notifies that a user has joined or left the room
Request format:
POST /_synapse/replication/membership_change/:room_id/:user_id/:change
{}
"""
NAME = "membership_change"
PATH_ARGS = ("room_id", "user_id", "change")
CACHE = False # No point caching as should return instantly.
def __init__(self, hs):
super(ReplicationUserJoinedLeftRoomRestServlet, self).__init__()
super(ReplicationUserJoinedLeftRoomRestServlet, self).__init__(hs)
self.registeration_handler = hs.get_handlers().registration_handler
self.store = hs.get_datastore()
self.clock = hs.get_clock()
self.distributor = hs.get_distributor()
def on_POST(self, request, change):
content = parse_json_object_from_request(request)
@staticmethod
def _serialize_payload(room_id, user_id, change):
"""
Args:
room_id (str)
user_id (str)
change (str): Either "joined" or "left"
"""
assert change in ("joined", "left",)
user_id = content["user_id"]
room_id = content["room_id"]
return {}
def _handle_request(self, request, room_id, user_id, change):
logger.info("user membership change: %s in %s", user_id, room_id)
user = UserID.from_string(user_id)

View File

@@ -14,86 +14,26 @@
# limitations under the License.
import logging
import re
from twisted.internet import defer
from synapse.api.errors import CodeMessageException, HttpResponseException
from synapse.events import FrozenEvent
from synapse.events.snapshot import EventContext
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint
from synapse.types import Requester, UserID
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
@defer.inlineCallbacks
def send_event_to_master(clock, store, client, host, port, requester, event, context,
ratelimit, extra_users):
"""Send event to be handled on the master
Args:
clock (synapse.util.Clock)
store (DataStore)
client (SimpleHttpClient)
host (str): host of master
port (int): port on master listening for HTTP replication
requester (Requester)
event (FrozenEvent)
context (EventContext)
ratelimit (bool)
extra_users (list(UserID)): Any extra users to notify about event
"""
uri = "http://%s:%s/_synapse/replication/send_event/%s" % (
host, port, event.event_id,
)
serialized_context = yield context.serialize(event, store)
payload = {
"event": event.get_pdu_json(),
"internal_metadata": event.internal_metadata.get_dict(),
"rejected_reason": event.rejected_reason,
"context": serialized_context,
"requester": requester.serialize(),
"ratelimit": ratelimit,
"extra_users": [u.to_string() for u in extra_users],
}
try:
# We keep retrying the same request for timeouts. This is so that we
# have a good idea that the request has either succeeded or failed on
# the master, and so whether we should clean up or not.
while True:
try:
result = yield client.put_json(uri, payload)
break
except CodeMessageException as e:
if e.code != 504:
raise
logger.warn("send_event request timed out")
# If we timed out we probably don't need to worry about backing
# off too much, but lets just wait a little anyway.
yield clock.sleep(1)
except HttpResponseException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the master process that we should send to the client. (And
# importantly, not stack traces everywhere)
raise e.to_synapse_error()
defer.returnValue(result)
class ReplicationSendEventRestServlet(RestServlet):
class ReplicationSendEventRestServlet(ReplicationEndpoint):
"""Handles events newly created on workers, including persisting and
notifying.
The API looks like:
POST /_synapse/replication/send_event/:event_id
POST /_synapse/replication/send_event/:event_id/:txn_id
{
"event": { .. serialized event .. },
@@ -105,27 +45,47 @@ class ReplicationSendEventRestServlet(RestServlet):
"extra_users": [],
}
"""
PATTERNS = [re.compile("^/_synapse/replication/send_event/(?P<event_id>[^/]+)$")]
NAME = "send_event"
PATH_ARGS = ("event_id",)
def __init__(self, hs):
super(ReplicationSendEventRestServlet, self).__init__()
super(ReplicationSendEventRestServlet, self).__init__(hs)
self.event_creation_handler = hs.get_event_creation_handler()
self.store = hs.get_datastore()
self.clock = hs.get_clock()
# The responses are tiny, so we may as well cache them for a while
self.response_cache = ResponseCache(hs, "send_event", timeout_ms=30 * 60 * 1000)
@staticmethod
@defer.inlineCallbacks
def _serialize_payload(event_id, store, event, context, requester,
ratelimit, extra_users):
"""
Args:
event_id (str)
store (DataStore)
requester (Requester)
event (FrozenEvent)
context (EventContext)
ratelimit (bool)
extra_users (list(UserID)): Any extra users to notify about event
"""
def on_PUT(self, request, event_id):
return self.response_cache.wrap(
event_id,
self._handle_request,
request
)
serialized_context = yield context.serialize(event, store)
payload = {
"event": event.get_pdu_json(),
"internal_metadata": event.internal_metadata.get_dict(),
"rejected_reason": event.rejected_reason,
"context": serialized_context,
"requester": requester.serialize(),
"ratelimit": ratelimit,
"extra_users": [u.to_string() for u in extra_users],
}
defer.returnValue(payload)
@defer.inlineCallbacks
def _handle_request(self, request):
def _handle_request(self, request, event_id):
with Measure(self.clock, "repl_send_event_parse"):
content = parse_json_object_from_request(request)

View File

@@ -44,8 +44,8 @@ class SlavedEventStore(EventFederationWorkerStore,
RoomMemberWorkerStore,
EventPushActionsWorkerStore,
StreamWorkerStore,
EventsWorkerStore,
StateGroupWorkerStore,
EventsWorkerStore,
SignatureWorkerStore,
UserErasureWorkerStore,
BaseSlavedStore):

View File

@@ -13,19 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.storage import DataStore
from synapse.storage.transactions import TransactionStore
from ._base import BaseSlavedStore
class TransactionStore(BaseSlavedStore):
get_destination_retry_timings = TransactionStore.__dict__[
"get_destination_retry_timings"
]
_get_destination_retry_timings = DataStore._get_destination_retry_timings.__func__
set_destination_retry_timings = DataStore.set_destination_retry_timings.__func__
_set_destination_retry_timings = DataStore._set_destination_retry_timings.__func__
prep_send_transaction = DataStore.prep_send_transaction.__func__
delivered_txn = DataStore.delivered_txn.__func__
class SlavedTransactionStore(TransactionStore, BaseSlavedStore):
pass

View File

@@ -17,7 +17,7 @@
to ensure idempotency when performing PUTs using the REST API."""
import logging
from synapse.util.async import ObservableDeferred
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
logger = logging.getLogger(__name__)

View File

@@ -391,10 +391,17 @@ class DeactivateAccountRestServlet(ClientV1RestServlet):
if not is_admin:
raise AuthError(403, "You are not a server admin")
yield self._deactivate_account_handler.deactivate_account(
result = yield self._deactivate_account_handler.deactivate_account(
target_user_id, erase,
)
defer.returnValue((200, {}))
if result:
id_server_unbind_result = "success"
else:
id_server_unbind_result = "no-support"
defer.returnValue((200, {
"id_server_unbind_result": id_server_unbind_result,
}))
class ShutdownRoomRestServlet(ClientV1RestServlet):

View File

@@ -209,10 +209,17 @@ class DeactivateAccountRestServlet(RestServlet):
yield self.auth_handler.validate_user_via_ui_auth(
requester, body, self.hs.get_ip_from_request(request),
)
yield self._deactivate_account_handler.deactivate_account(
result = yield self._deactivate_account_handler.deactivate_account(
requester.user.to_string(), erase,
)
defer.returnValue((200, {}))
if result:
id_server_unbind_result = "success"
else:
id_server_unbind_result = "no-support"
defer.returnValue((200, {
"id_server_unbind_result": id_server_unbind_result,
}))
class EmailThreepidRequestTokenRestServlet(RestServlet):
@@ -364,7 +371,7 @@ class ThreepidDeleteRestServlet(RestServlet):
user_id = requester.user.to_string()
try:
yield self.auth_handler.delete_threepid(
ret = yield self.auth_handler.delete_threepid(
user_id, body['medium'], body['address']
)
except Exception:
@@ -374,7 +381,14 @@ class ThreepidDeleteRestServlet(RestServlet):
logger.exception("Failed to remove threepid")
raise SynapseError(500, "Failed to remove threepid")
defer.returnValue((200, {}))
if ret:
id_server_unbind_result = "success"
else:
id_server_unbind_result = "no-support"
defer.returnValue((200, {
"id_server_unbind_result": id_server_unbind_result,
}))
class WhoamiRestServlet(RestServlet):

View File

@@ -36,7 +36,7 @@ from synapse.api.errors import (
)
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.async import Linearizer
from synapse.util.async_helpers import Linearizer
from synapse.util.logcontext import make_deferred_yieldable
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import is_ascii, random_string

View File

@@ -42,7 +42,7 @@ from synapse.http.server import (
)
from synapse.http.servlet import parse_integer, parse_string
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.async import ObservableDeferred
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.stringutils import is_ascii, random_string

View File

@@ -36,6 +36,7 @@ from synapse.federation.federation_client import FederationClient
from synapse.federation.federation_server import (
FederationHandlerRegistry,
FederationServer,
ReplicationFederationHandlerRegistry,
)
from synapse.federation.send_queue import FederationRemoteSendQueue
from synapse.federation.transaction_queue import TransactionQueue
@@ -423,7 +424,10 @@ class HomeServer(object):
return RoomMemberMasterHandler(self)
def build_federation_registry(self):
return FederationHandlerRegistry()
if self.config.worker_app:
return ReplicationFederationHandlerRegistry(self)
else:
return FederationHandlerRegistry()
def build_server_notices_manager(self):
if self.config.worker_app:

View File

@@ -28,7 +28,7 @@ from synapse import event_auth
from synapse.api.constants import EventTypes
from synapse.api.errors import AuthError
from synapse.events.snapshot import EventContext
from synapse.util.async import Linearizer
from synapse.util.async_helpers import Linearizer
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.logutils import log_function

View File

@@ -4,7 +4,7 @@
<meta name='viewport' content='width=device-width, initial-scale=1, user-scalable=no, minimum-scale=1.0, maximum-scale=1.0'>
<link rel="stylesheet" href="style.css">
<script src="js/jquery-2.1.3.min.js"></script>
<script src="js/recaptcha_ajax.js"></script>
<script src="https://www.google.com/recaptcha/api/js/recaptcha_ajax.js"></script>
<script src="register_config.js"></script>
<script src="js/register.js"></script>
</head>

File diff suppressed because one or more lines are too long

View File

@@ -39,6 +39,7 @@ from .filtering import FilteringStore
from .group_server import GroupServerStore
from .keys import KeyStore
from .media_repository import MediaRepositoryStore
from .monthly_active_users import MonthlyActiveUsersStore
from .openid import OpenIdStore
from .presence import PresenceStore, UserPresenceState
from .profile import ProfileStore
@@ -87,6 +88,7 @@ class DataStore(RoomMemberStore, RoomStore,
UserDirectoryStore,
GroupServerStore,
UserErasureStore,
MonthlyActiveUsersStore,
):
def __init__(self, db_conn, hs):
@@ -94,7 +96,6 @@ class DataStore(RoomMemberStore, RoomStore,
self._clock = hs.get_clock()
self.database_engine = hs.database_engine
self.db_conn = db_conn
self._stream_id_gen = StreamIdGenerator(
db_conn, "events", "stream_ordering",
extra_tables=[("local_invites", "stream_id")]
@@ -267,31 +268,6 @@ class DataStore(RoomMemberStore, RoomStore,
return self.runInteraction("count_users", _count_users)
def count_monthly_users(self):
"""Counts the number of users who used this homeserver in the last 30 days
This method should be refactored with count_daily_users - the only
reason not to is waiting on definition of mau
Returns:
Defered[int]
"""
def _count_monthly_users(txn):
thirty_days_ago = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30)
sql = """
SELECT COALESCE(count(*), 0) FROM (
SELECT user_id FROM user_ips
WHERE last_seen > ?
GROUP BY user_id
) u
"""
txn.execute(sql, (thirty_days_ago,))
count, = txn.fetchone()
return count
return self.runInteraction("count_monthly_users", _count_monthly_users)
def count_r30_users(self):
"""
Counts the number of 30 day retained users, defined as:-

View File

@@ -35,6 +35,7 @@ LAST_SEEN_GRANULARITY = 120 * 1000
class ClientIpStore(background_updates.BackgroundUpdateStore):
def __init__(self, db_conn, hs):
self.client_ip_last_seen = Cache(
name="client_ip_last_seen",
keylen=4,
@@ -74,6 +75,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
"before", "shutdown", self._update_client_ips_batch
)
@defer.inlineCallbacks
def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id,
now=None):
if not now:
@@ -84,7 +86,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
last_seen = self.client_ip_last_seen.get(key)
except KeyError:
last_seen = None
yield self.populate_monthly_active_users(user_id)
# Rate-limited inserts
if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
return
@@ -94,6 +96,11 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
self._batch_row_update[key] = (user_agent, device_id, now)
def _update_client_ips_batch(self):
# If the DB pool has already terminated, don't try updating
if not self.hs.get_db_pool().running:
return
def update():
to_update = self._batch_row_update
self._batch_row_update = {}

View File

@@ -38,7 +38,7 @@ from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.storage.event_federation import EventFederationStore
from synapse.storage.events_worker import EventsWorkerStore
from synapse.types import RoomStreamToken, get_domain_from_id
from synapse.util.async import ObservableDeferred
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.logcontext import PreserveLoggingContext, make_deferred_yieldable
@@ -485,9 +485,14 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
new_forward_extremeties=new_forward_extremeties,
)
persist_event_counter.inc(len(chunk))
synapse.metrics.event_persisted_position.set(
chunk[-1][0].internal_metadata.stream_ordering,
)
if not backfilled:
# backfilled events have negative stream orderings, so we don't
# want to set the event_persisted_position to that.
synapse.metrics.event_persisted_position.set(
chunk[-1][0].internal_metadata.stream_ordering,
)
for event, context in chunk:
if context.app_service:
origin_type = "local"
@@ -1430,88 +1435,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
(event.event_id, event.redacts)
)
@defer.inlineCallbacks
def have_events_in_timeline(self, event_ids):
"""Given a list of event ids, check if we have already processed and
stored them as non outliers.
"""
rows = yield self._simple_select_many_batch(
table="events",
retcols=("event_id",),
column="event_id",
iterable=list(event_ids),
keyvalues={"outlier": False},
desc="have_events_in_timeline",
)
defer.returnValue(set(r["event_id"] for r in rows))
@defer.inlineCallbacks
def have_seen_events(self, event_ids):
"""Given a list of event ids, check if we have already processed them.
Args:
event_ids (iterable[str]):
Returns:
Deferred[set[str]]: The events we have already seen.
"""
results = set()
def have_seen_events_txn(txn, chunk):
sql = (
"SELECT event_id FROM events as e WHERE e.event_id IN (%s)"
% (",".join("?" * len(chunk)), )
)
txn.execute(sql, chunk)
for (event_id, ) in txn:
results.add(event_id)
# break the input up into chunks of 100
input_iterator = iter(event_ids)
for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)),
[]):
yield self.runInteraction(
"have_seen_events",
have_seen_events_txn,
chunk,
)
defer.returnValue(results)
def get_seen_events_with_rejections(self, event_ids):
"""Given a list of event ids, check if we rejected them.
Args:
event_ids (list[str])
Returns:
Deferred[dict[str, str|None):
Has an entry for each event id we already have seen. Maps to
the rejected reason string if we rejected the event, else maps
to None.
"""
if not event_ids:
return defer.succeed({})
def f(txn):
sql = (
"SELECT e.event_id, reason FROM events as e "
"LEFT JOIN rejections as r ON e.event_id = r.event_id "
"WHERE e.event_id = ?"
)
res = {}
for event_id in event_ids:
txn.execute(sql, (event_id,))
row = txn.fetchone()
if row:
_, rejected = row
res[event_id] = rejected
return res
return self.runInteraction("get_rejection_reasons", f)
@defer.inlineCallbacks
def count_daily_messages(self):
"""

View File

@@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import logging
from collections import namedtuple
@@ -442,3 +443,85 @@ class EventsWorkerStore(SQLBaseStore):
self._get_event_cache.prefill((original_ev.event_id,), cache_entry)
defer.returnValue(cache_entry)
@defer.inlineCallbacks
def have_events_in_timeline(self, event_ids):
"""Given a list of event ids, check if we have already processed and
stored them as non outliers.
"""
rows = yield self._simple_select_many_batch(
table="events",
retcols=("event_id",),
column="event_id",
iterable=list(event_ids),
keyvalues={"outlier": False},
desc="have_events_in_timeline",
)
defer.returnValue(set(r["event_id"] for r in rows))
@defer.inlineCallbacks
def have_seen_events(self, event_ids):
"""Given a list of event ids, check if we have already processed them.
Args:
event_ids (iterable[str]):
Returns:
Deferred[set[str]]: The events we have already seen.
"""
results = set()
def have_seen_events_txn(txn, chunk):
sql = (
"SELECT event_id FROM events as e WHERE e.event_id IN (%s)"
% (",".join("?" * len(chunk)), )
)
txn.execute(sql, chunk)
for (event_id, ) in txn:
results.add(event_id)
# break the input up into chunks of 100
input_iterator = iter(event_ids)
for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)),
[]):
yield self.runInteraction(
"have_seen_events",
have_seen_events_txn,
chunk,
)
defer.returnValue(results)
def get_seen_events_with_rejections(self, event_ids):
"""Given a list of event ids, check if we rejected them.
Args:
event_ids (list[str])
Returns:
Deferred[dict[str, str|None):
Has an entry for each event id we already have seen. Maps to
the rejected reason string if we rejected the event, else maps
to None.
"""
if not event_ids:
return defer.succeed({})
def f(txn):
sql = (
"SELECT e.event_id, reason FROM events as e "
"LEFT JOIN rejections as r ON e.event_id = r.event_id "
"WHERE e.event_id = ?"
)
res = {}
for event_id in event_ids:
txn.execute(sql, (event_id,))
row = txn.fetchone()
if row:
_, rejected = row
res[event_id] = rejected
return res
return self.runInteraction("get_rejection_reasons", f)

View File

@@ -0,0 +1,213 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from twisted.internet import defer
from synapse.util.caches.descriptors import cached
from ._base import SQLBaseStore
logger = logging.getLogger(__name__)
# Number of msec of granularity to store the monthly_active_user timestamp
# This means it is not necessary to update the table on every request
LAST_SEEN_GRANULARITY = 60 * 60 * 1000
class MonthlyActiveUsersStore(SQLBaseStore):
def __init__(self, dbconn, hs):
super(MonthlyActiveUsersStore, self).__init__(None, hs)
self._clock = hs.get_clock()
self.hs = hs
self.reserved_users = ()
@defer.inlineCallbacks
def initialise_reserved_users(self, threepids):
# TODO Why can't I do this in init?
store = self.hs.get_datastore()
reserved_user_list = []
# Do not add more reserved users than the total allowable number
for tp in threepids[:self.hs.config.max_mau_value]:
user_id = yield store.get_user_id_by_threepid(
tp["medium"], tp["address"]
)
if user_id:
yield self.upsert_monthly_active_user(user_id)
reserved_user_list.append(user_id)
else:
logger.warning(
"mau limit reserved threepid %s not found in db" % tp
)
self.reserved_users = tuple(reserved_user_list)
@defer.inlineCallbacks
def reap_monthly_active_users(self):
"""
Cleans out monthly active user table to ensure that no stale
entries exist.
Returns:
Deferred[]
"""
def _reap_users(txn):
# Purge stale users
thirty_days_ago = (
int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30)
)
query_args = [thirty_days_ago]
base_sql = "DELETE FROM monthly_active_users WHERE timestamp < ?"
# Need if/else since 'AND user_id NOT IN ({})' fails on Postgres
# when len(reserved_users) == 0. Works fine on sqlite.
if len(self.reserved_users) > 0:
# questionmarks is a hack to overcome sqlite not supporting
# tuples in 'WHERE IN %s'
questionmarks = '?' * len(self.reserved_users)
query_args.extend(self.reserved_users)
sql = base_sql + """ AND user_id NOT IN ({})""".format(
','.join(questionmarks)
)
else:
sql = base_sql
txn.execute(sql, query_args)
# If MAU user count still exceeds the MAU threshold, then delete on
# a least recently active basis.
# Note it is not possible to write this query using OFFSET due to
# incompatibilities in how sqlite and postgres support the feature.
# sqlite requires 'LIMIT -1 OFFSET ?', the LIMIT must be present
# While Postgres does not require 'LIMIT', but also does not support
# negative LIMIT values. So there is no way to write it that both can
# support
query_args = [self.hs.config.max_mau_value]
base_sql = """
DELETE FROM monthly_active_users
WHERE user_id NOT IN (
SELECT user_id FROM monthly_active_users
ORDER BY timestamp DESC
LIMIT ?
)
"""
# Need if/else since 'AND user_id NOT IN ({})' fails on Postgres
# when len(reserved_users) == 0. Works fine on sqlite.
if len(self.reserved_users) > 0:
query_args.extend(self.reserved_users)
sql = base_sql + """ AND user_id NOT IN ({})""".format(
','.join(questionmarks)
)
else:
sql = base_sql
txn.execute(sql, query_args)
yield self.runInteraction("reap_monthly_active_users", _reap_users)
# It seems poor to invalidate the whole cache, Postgres supports
# 'Returning' which would allow me to invalidate only the
# specific users, but sqlite has no way to do this and instead
# I would need to SELECT and the DELETE which without locking
# is racy.
# Have resolved to invalidate the whole cache for now and do
# something about it if and when the perf becomes significant
self.user_last_seen_monthly_active.invalidate_all()
self.get_monthly_active_count.invalidate_all()
@cached(num_args=0)
def get_monthly_active_count(self):
"""Generates current count of monthly active users
Returns:
Defered[int]: Number of current monthly active users
"""
def _count_users(txn):
sql = "SELECT COALESCE(count(*), 0) FROM monthly_active_users"
txn.execute(sql)
count, = txn.fetchone()
return count
return self.runInteraction("count_users", _count_users)
def upsert_monthly_active_user(self, user_id):
"""
Updates or inserts monthly active user member
Arguments:
user_id (str): user to add/update
Deferred[bool]: True if a new entry was created, False if an
existing one was updated.
"""
is_insert = self._simple_upsert(
desc="upsert_monthly_active_user",
table="monthly_active_users",
keyvalues={
"user_id": user_id,
},
values={
"timestamp": int(self._clock.time_msec()),
},
lock=False,
)
if is_insert:
self.user_last_seen_monthly_active.invalidate((user_id,))
self.get_monthly_active_count.invalidate(())
@cached(num_args=1)
def user_last_seen_monthly_active(self, user_id):
"""
Checks if a given user is part of the monthly active user group
Arguments:
user_id (str): user to add/update
Return:
Deferred[int] : timestamp since last seen, None if never seen
"""
return(self._simple_select_one_onecol(
table="monthly_active_users",
keyvalues={
"user_id": user_id,
},
retcol="timestamp",
allow_none=True,
desc="user_last_seen_monthly_active",
))
@defer.inlineCallbacks
def populate_monthly_active_users(self, user_id):
"""Checks on the state of monthly active user limits and optionally
add the user to the monthly active tables
Args:
user_id(str): the user_id to query
"""
if self.hs.config.limit_usage_by_mau:
last_seen_timestamp = yield self.user_last_seen_monthly_active(user_id)
now = self.hs.get_clock().time_msec()
# We want to reduce to the total number of db writes, and are happy
# to trade accuracy of timestamp in order to lighten load. This means
# We always insert new users (where MAU threshold has not been reached),
# but only update if we have not previously seen the user for
# LAST_SEEN_GRANULARITY ms
if last_seen_timestamp is None:
count = yield self.get_monthly_active_count()
if count < self.hs.config.max_mau_value:
yield self.upsert_monthly_active_user(user_id)
elif now - last_seen_timestamp > LAST_SEEN_GRANULARITY:
yield self.upsert_monthly_active_user(user_id)

View File

@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
SCHEMA_VERSION = 50
SCHEMA_VERSION = 51
dir_path = os.path.abspath(os.path.dirname(__file__))

View File

@@ -41,6 +41,22 @@ RatelimitOverride = collections.namedtuple(
class RoomWorkerStore(SQLBaseStore):
def get_room(self, room_id):
"""Retrieve a room.
Args:
room_id (str): The ID of the room to retrieve.
Returns:
A namedtuple containing the room information, or an empty list.
"""
return self._simple_select_one(
table="rooms",
keyvalues={"room_id": room_id},
retcols=("room_id", "is_public", "creator"),
desc="get_room",
allow_none=True,
)
def get_public_room_ids(self):
return self._simple_select_onecol(
table="rooms",
@@ -215,22 +231,6 @@ class RoomStore(RoomWorkerStore, SearchStore):
logger.error("store_room with room_id=%s failed: %s", room_id, e)
raise StoreError(500, "Problem creating room.")
def get_room(self, room_id):
"""Retrieve a room.
Args:
room_id (str): The ID of the room to retrieve.
Returns:
A namedtuple containing the room information, or an empty list.
"""
return self._simple_select_one(
table="rooms",
keyvalues={"room_id": room_id},
retcols=("room_id", "is_public", "creator"),
desc="get_room",
allow_none=True,
)
@defer.inlineCallbacks
def set_room_is_public(self, room_id, is_public):
def set_room_is_public_txn(txn, next_id):

Some files were not shown because too many files have changed in this diff Show More